"""Reader 内部共享工具模块。 此模块包含各 reader 实现共享的内部工具函数,仅供 readers 包内部使用。 """ import re import subprocess import tempfile import shutil import zipfile from pathlib import Path from typing import List, Optional, Tuple # ============================================================================ # 通用解析器包装函数 # ============================================================================ def parse_via_markitdown( file_path: str, ) -> Tuple[Optional[str], Optional[str]]: """使用 MarkItDown 库解析文件。 Args: file_path: 文件路径 Returns: (markdown_content, error_message): 成功时 (content, None),失败时 (None, error) """ try: from markitdown import MarkItDown md = MarkItDown() result = md.convert(file_path) if not result.text_content.strip(): return None, "文档为空" return result.text_content, None except ImportError: return None, "MarkItDown 库未安装" except Exception as e: return None, f"MarkItDown 解析失败: {str(e)}" def parse_via_docling(file_path: str) -> Tuple[Optional[str], Optional[str]]: """使用 docling 库解析文件。 Args: file_path: 文件路径 Returns: (markdown_content, error_message): 成功时 (content, None),失败时 (None, error) """ try: from docling.document_converter import DocumentConverter except ImportError: return None, "docling 库未安装" try: converter = DocumentConverter() result = converter.convert(file_path) markdown_content = result.document.export_to_markdown() if not markdown_content.strip(): return None, "文档为空" return markdown_content, None except Exception as e: return None, f"docling 解析失败: {str(e)}" def convert_via_libreoffice( input_path: str, target_format: str, output_dir: Path, output_suffix: Optional[str] = None, timeout: int = 60 ) -> Tuple[Optional[Path], Optional[str]]: """使用 LibreOffice soffice 命令行转换文件格式。 Args: input_path: 输入文件路径 target_format: 目标格式(如 "md", "pptx") output_dir: 输出目录(调用者负责生命周期管理) output_suffix: 可选,输出文件后缀(不指定则使用 target_format) timeout: 超时时间(秒) Returns: (output_path, error_message): 成功时 (Path, None),失败时 (None, error) """ # 检测 soffice 是否在 PATH 中 soffice_path = shutil.which("soffice") if not soffice_path: return None, "LibreOffice 未安装" input_file = Path(input_path) suffix = output_suffix if output_suffix else target_format expected_output = output_dir / (input_file.stem + "." + suffix) # 构建命令 cmd = [ soffice_path, "--headless", "--convert-to", target_format, "--outdir", str(output_dir), str(input_file) ] # 执行命令 try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=timeout ) except subprocess.TimeoutExpired: return None, f"LibreOffice 转换超时 ({timeout}秒)" # 检查返回码 if result.returncode != 0: return None, f"LibreOffice 转换失败 (code: {result.returncode})" # 检查输出文件是否存在 output_file = None if expected_output.exists(): output_file = expected_output else: # Fallback: 遍历目录找任意匹配后缀的文件 pattern = "*." + suffix files = list(output_dir.glob(pattern)) if files: output_file = files[0] if not output_file: return None, "LibreOffice 未生成输出文件" return output_file, None def parse_via_libreoffice(file_path: str) -> Tuple[Optional[str], Optional[str]]: """使用 LibreOffice soffice 命令行转换文件为 Markdown。 支持 .doc/.docx/.odt 等 LibreOffice 可处理的格式。 Args: file_path: 文件路径 Returns: (markdown_content, error_message): 成功时 (content, None),失败时 (None, error) """ with tempfile.TemporaryDirectory() as temp_dir: output_path, error = convert_via_libreoffice( input_path=file_path, target_format="md", output_dir=Path(temp_dir), timeout=60 ) if error: return None, error # 读取输出内容 content = output_path.read_text(encoding="utf-8", errors="replace") content = content.strip() if not content: return None, "LibreOffice 输出为空" return content, None # ============================================================================ # 格式化工具 # ============================================================================ def build_markdown_table(rows_data: List[List[str]]) -> str: """将二维列表格式化为 Markdown 表格。 Args: rows_data: 二维列表,第一行为表头 Returns: Markdown 格式的表格字符串 """ if not rows_data or not rows_data[0]: return "" md_lines = [] for i, row_data in enumerate(rows_data): row_text = [cell if cell else "" for cell in row_data] md_lines.append("| " + " | ".join(row_text) + " |") if i == 0: md_lines.append("| " + " | ".join(["---"] * len(row_text)) + " |") return "\n".join(md_lines) + "\n\n" # ============================================================================ # 列表处理工具 # ============================================================================ def flush_list_stack(list_stack: List[str], target: List[str]) -> None: """将列表堆栈中的非空项添加到目标列表并清空堆栈。 用于处理嵌套列表的格式化输出。 Args: list_stack: 列表堆栈 target: 目标列表 """ for item in list_stack: if item: target.append(item + "\n") list_stack.clear() # ============================================================================ # ZIP 文件安全处理 # ============================================================================ def safe_open_zip(zip_file: zipfile.ZipFile, name: str) -> Optional[zipfile.ZipExtFile]: """安全地从 ZipFile 中打开文件,防止路径遍历攻击。 Args: zip_file: ZipFile 对象 name: 文件名 Returns: ZipExtFile 对象,如果路径不安全则返回 None """ if not name: return None try: normalized = Path(name).as_posix() # 检查是否包含父目录引用 if ".." in Path(normalized).parts: return None # 检查是否为绝对路径 if Path(normalized).is_absolute(): return None return zip_file.open(name) except (ValueError, OSError, KeyError): return None # ============================================================================ # unstructured 库相关 # ============================================================================ # unstructured 噪声匹配模式 _UNSTRUCTURED_RGB_PATTERN = re.compile(r"^R:\d+\s+G:\d+\s+B:\d+$") _UNSTRUCTURED_PAGE_NUMBER_PATTERN = re.compile(r"^—\s*\d+\s*—$") def convert_unstructured_to_markdown( elements: list, trust_titles: bool = True ) -> str: """将 unstructured 解析出的元素列表转换为 Markdown 文本。 Args: elements: unstructured 解析的元素列表 trust_titles: 是否信任 unstructured 的标题检测 Returns: Markdown 格式的文本 """ try: import markdownify as md_lib from unstructured.documents.elements import ( Footer, Header, Image, ListItem, PageBreak, PageNumber, Table, Title, ) except ImportError: return "\n\n".join( el.text for el in elements if hasattr(el, "text") and el.text and el.text.strip() ) skip_types = (Header, Footer, PageBreak, PageNumber) parts = [] for el in elements: if isinstance(el, skip_types): continue text = el.text.strip() if hasattr(el, "text") else str(el).strip() if not text or _UNSTRUCTURED_RGB_PATTERN.match(text) or _UNSTRUCTURED_PAGE_NUMBER_PATTERN.match(text): continue if isinstance(el, Table): html = getattr(el.metadata, "text_as_html", None) if html: parts.append(md_lib.markdownify(html, strip=["img"]).strip()) else: parts.append(str(el)) elif isinstance(el, Title) and trust_titles: depth = getattr(el.metadata, "category_depth", None) or 1 depth = min(max(depth, 1), 4) parts.append(f"{'#' * depth} {text}") elif isinstance(el, ListItem): parts.append(f"- {text}") elif isinstance(el, Image): path = getattr(el.metadata, "image_path", None) or "" if path: parts.append(f"![image]({path})") else: parts.append(text) return "\n\n".join(parts)