#!/usr/bin/env python3
"""Shared helpers for the document parsers.

Contains the utility and validation functions used by every supported format.
"""

import os
import re
import zipfile
from typing import List, Optional, Tuple

IMAGE_PATTERN = re.compile(r"!\[[^\]]*\]\([^)]+\)")
MEDIA_LINK_PATTERN = re.compile(
    r'^\[.*?\]\(.*\.(png|jpg|jpeg|gif|mp4|avi|mov|pdf)\s*["\']?.*?["\']?\)$'
)
RGB_COLOR_PATTERN = re.compile(r"^R:\d+\s+G:\d+\s+B:\d+$")

# Patterns used by filter_markdown_content, compiled once at import time.
_STYLED_SPAN_PATTERN = re.compile(r'<span[^>]*style="[^"]*"[^>]*>(.*?)</span>')
_SPAN_PATTERN = re.compile(r"<span[^>]*>(.*?)</span>")
_WHITESPACE_RUN_PATTERN = re.compile(r"\s+")


def build_markdown_table(rows_data: List[List[str]]) -> str:
    """Render a two-dimensional list as a Markdown table.

    The first row is used as the header and is followed by a ``---``
    separator row with one column per header cell. Falsy cells (``None``,
    ``""``) are rendered as empty strings.

    Returns:
        The table text followed by a blank line, or ``""`` when there are no
        rows or the first row is empty.
    """
    if not rows_data or not rows_data[0]:
        return ""

    md_lines = []
    for row_number, row in enumerate(rows_data):
        cells = [cell or "" for cell in row]
        md_lines.append("| " + " | ".join(cells) + " |")
        if row_number == 0:
            # The separator row is sized from the header row only.
            md_lines.append("| " + " | ".join(["---"] * len(cells)) + " |")
    return "\n".join(md_lines) + "\n\n"


def flush_list_stack(list_stack: List[str], target: List[str]) -> None:
    """Move the non-empty entries of ``list_stack`` into ``target``.

    Each moved entry gets a trailing newline appended. ``list_stack`` is
    emptied in place afterwards.
    """
    target.extend(item + "\n" for item in list_stack if item)
    list_stack.clear()


def safe_open_zip(zip_file: zipfile.ZipFile, name: str) -> Optional[zipfile.ZipExtFile]:
    """Open a member of ``zip_file``, refusing names that look like traversal.

    Returns:
        The opened member, or ``None`` when ``name`` is empty, contains a
        backslash, starts with ``/`` or ``..``, contains ``/../`` or ends
        with ``/..``.

    Raises:
        KeyError: Propagated from ``ZipFile.open`` when the (accepted) name
            is not present in the archive.
    """
    if not name or "\\" in name:
        return None
    if name.startswith(("/", "..")):
        return None
    if "/../" in name or name.endswith("/.."):
        return None
    return zip_file.open(name)


def normalize_markdown_whitespace(content: str) -> str:
    """Collapse runs of blank lines so that at most one blank line remains.

    A line counts as blank when it is empty after stripping whitespace. The
    first blank line of every run is kept unchanged; non-blank lines are
    never modified.
    """
    kept = []
    previous_blank = False
    for line in content.split("\n"):
        is_blank = not line.strip()
        if not (is_blank and previous_blank):
            kept.append(line)
        previous_blank = is_blank
    return "\n".join(kept)


def _zip_has_members(file_path: str, required: Tuple[str, ...]) -> bool:
    """Return True when ``file_path`` is a readable ZIP holding every member."""
    try:
        with zipfile.ZipFile(file_path, "r") as zip_file:
            names = set(zip_file.namelist())
    except (zipfile.BadZipFile, zipfile.LargeZipFile, OSError):
        # OSError covers missing or unreadable files, so the ZIP-based
        # validators answer False exactly like is_valid_pdf does.
        return False
    return all(member in names for member in required)


def is_valid_docx(file_path: str) -> bool:
    """Return True when ``file_path`` is a ZIP with the core DOCX parts."""
    return _zip_has_members(
        file_path, ("[Content_Types].xml", "_rels/.rels", "word/document.xml")
    )


def is_valid_pptx(file_path: str) -> bool:
    """Return True when ``file_path`` is a ZIP with the core PPTX parts."""
    return _zip_has_members(
        file_path, ("[Content_Types].xml", "_rels/.rels", "ppt/presentation.xml")
    )


def is_valid_xlsx(file_path: str) -> bool:
    """Return True when ``file_path`` is a ZIP with the core XLSX parts."""
    return _zip_has_members(
        file_path, ("[Content_Types].xml", "_rels/.rels", "xl/workbook.xml")
    )


def is_valid_pdf(file_path: str) -> bool:
    """Return True when the file starts with the ``%PDF`` magic bytes.

    Returns False when the file cannot be opened or read.
    """
    try:
        with open(file_path, "rb") as f:
            return f.read(4) == b"%PDF"
    except OSError:
        return False


def remove_markdown_images(markdown_text: str) -> str:
    """Remove every ``![alt](target)`` image marker from ``markdown_text``."""
    return IMAGE_PATTERN.sub("", markdown_text)


def filter_markdown_content(content: str) -> str:
    """Filter Markdown down to text, tables, lists and basic formatting.

    Blank lines, image lines, bare media links and ``R:.. G:.. B:..`` colour
    lines are dropped. ``<span>`` wrappers are removed while their inner text
    is kept, and whitespace runs inside each remaining line are collapsed to
    a single space.

    Returns:
        The surviving lines joined with ``"\\n"``.
    """
    filtered_lines = []
    for line in content.split("\n"):
        stripped = line.strip()
        if not stripped:
            continue
        # NOTE(review): assumes the markup to drop is single-line HTML
        # comments and <img> tags -- confirm against the converter output.
        if stripped.startswith("<!--") and stripped.endswith("-->"):
            continue
        if stripped.startswith("!["):
            continue
        if "<img" in stripped:
            continue
        if MEDIA_LINK_PATTERN.match(stripped):
            continue
        if RGB_COLOR_PATTERN.match(stripped):
            continue

        # Unwrap styled spans first, then any remaining plain spans.
        line = _STYLED_SPAN_PATTERN.sub(r"\1", line)
        line = _SPAN_PATTERN.sub(r"\1", line)
        line = _WHITESPACE_RUN_PATTERN.sub(" ", line).strip()
        if line:
            filtered_lines.append(line)
    return "\n".join(filtered_lines)


def get_heading_level(line: str) -> int:
    """Return the Markdown heading level (1-6) of ``line``, or 0 if none.

    Leading whitespace is ignored. A heading is one to six ``#`` characters
    that are either the whole line or are followed by a space.
    """
    stripped = line.lstrip()
    level = len(stripped) - len(stripped.lstrip("#"))
    if not 1 <= level <= 6:
        return 0
    if len(stripped) == level:
        # Only hash marks: an empty heading still counts.
        return level
    return level if stripped[level] == " " else 0


def extract_titles(markdown_text: str) -> List[str]:
    """Return every heading line (levels 1-6) with leading whitespace removed."""
    return [
        line.lstrip()
        for line in markdown_text.split("\n")
        if get_heading_level(line) > 0
    ]


def extract_title_content(markdown_text: str, title_name: str) -> Optional[str]:
    """Extract every section whose heading text equals ``title_name``.

    For each match the output holds the chain of ancestor headings, the
    matching heading itself, and all following lines up to (not including)
    the next heading of the same or a higher level. Multiple matches are
    separated by a ``---`` line.

    Returns:
        The combined text, or ``None`` when no heading matches.
    """
    lines = markdown_text.split("\n")

    match_indices = []
    for index, line in enumerate(lines):
        level = get_heading_level(line)
        if level > 0 and line.lstrip()[level:].strip() == title_name:
            match_indices.append(index)
    if not match_indices:
        return None

    result_lines = []
    for match_number, heading_index in enumerate(match_indices):
        if match_number > 0:
            result_lines.append("\n---\n")
        target_level = get_heading_level(lines[heading_index])

        # Walk upwards, collecting each strictly higher-level ancestor once.
        ancestors = []
        current_level = target_level
        for index in range(heading_index - 1, -1, -1):
            line_level = get_heading_level(lines[index])
            if 0 < line_level < current_level:
                ancestors.append(lines[index])
                current_level = line_level
                if current_level == 1:
                    break
        ancestors.reverse()
        result_lines.extend(ancestors)
        result_lines.append(lines[heading_index])

        # The section body runs until a heading at the same or a higher level.
        for line in lines[heading_index + 1:]:
            line_level = get_heading_level(line)
            if line_level != 0 and line_level <= target_level:
                break
            result_lines.append(line)
    return "\n".join(result_lines)


def search_markdown(
    content: str, pattern: str, context_lines: int = 0
) -> Optional[str]:
    """Search ``content`` line by line with a regular expression.

    Context is counted in non-empty lines: each match is extended by
    ``context_lines`` non-empty lines on both sides, and blank lines that
    fall inside the resulting span are included in the output. Matches whose
    distance (in non-empty lines) is at most ``2 * context_lines`` are merged
    into one span. Spans are separated by a ``---`` line.

    Args:
        content: Markdown text to search.
        pattern: Regular expression applied to each non-empty line.
        context_lines: Non-empty lines of context on each side; negative
            values are treated as 0.

    Returns:
        The matching spans, or ``None`` when the pattern is invalid or
        nothing matches.
    """
    try:
        regex = re.compile(pattern)
    except re.error:
        return None
    context_lines = max(context_lines, 0)

    lines = content.split("\n")
    non_empty_indices = [i for i, line in enumerate(lines) if line.strip()]
    # Positions within non_empty_indices (not raw line numbers) that match.
    matched_positions = [
        position
        for position, line_index in enumerate(non_empty_indices)
        if regex.search(lines[line_index])
    ]
    if not matched_positions:
        return None

    merged_ranges = []
    current_start = current_end = matched_positions[0]
    for position in matched_positions[1:]:
        if position - current_end <= context_lines * 2:
            current_end = position
        else:
            merged_ranges.append((current_start, current_end))
            current_start = current_end = position
    merged_ranges.append((current_start, current_end))

    last_position = len(non_empty_indices) - 1
    results = []
    for start, end in merged_ranges:
        first_line = non_empty_indices[max(0, start - context_lines)]
        last_line = non_empty_indices[min(last_position, end + context_lines)]
        results.append("\n".join(lines[first_line:last_line + 1]))
    return "\n---\n".join(results)


def detect_file_type(file_path: str) -> Optional[str]:
    """Detect the document type from the extension and file contents.

    Returns:
        ``'docx'``, ``'pptx'``, ``'xlsx'`` or ``'pdf'`` when the extension
        (case-insensitive) is recognised and the matching validator accepts
        the file; otherwise ``None``.
    """
    validators = {
        ".docx": ("docx", is_valid_docx),
        ".pptx": ("pptx", is_valid_pptx),
        ".xlsx": ("xlsx", is_valid_xlsx),
        ".pdf": ("pdf", is_valid_pdf),
    }
    _, ext = os.path.splitext(file_path)
    entry = validators.get(ext.lower())
    if entry is None:
        return None
    type_name, is_valid = entry
    return type_name if is_valid(file_path) else None