- 新增 PPT (旧格式) 解析器 - 重构 _utils.py,提取通用 convert_via_libreoffice 函数 - 更新依赖配置,添加 PPT 相关依赖 - 完善文档,更新 README 和 SKILL.md - 添加 PPT 文件检测函数 - 新增 PPT 解析器测试用例
311 lines
9.3 KiB
Python
311 lines
9.3 KiB
Python
"""Reader 内部共享工具模块。
|
||
|
||
此模块包含各 reader 实现共享的内部工具函数,仅供 readers 包内部使用。
|
||
"""
|
||
|
||
import re
|
||
import subprocess
|
||
import tempfile
|
||
import shutil
|
||
import zipfile
|
||
from pathlib import Path
|
||
from typing import List, Optional, Tuple
|
||
|
||
|
||
# ============================================================================
|
||
# 通用解析器包装函数
|
||
# ============================================================================
|
||
|
||
def parse_via_markitdown(
|
||
file_path: str,
|
||
) -> Tuple[Optional[str], Optional[str]]:
|
||
"""使用 MarkItDown 库解析文件。
|
||
|
||
Args:
|
||
file_path: 文件路径
|
||
|
||
Returns:
|
||
(markdown_content, error_message): 成功时 (content, None),失败时 (None, error)
|
||
"""
|
||
try:
|
||
from markitdown import MarkItDown
|
||
|
||
md = MarkItDown()
|
||
result = md.convert(file_path)
|
||
if not result.text_content.strip():
|
||
return None, "文档为空"
|
||
return result.text_content, None
|
||
except ImportError:
|
||
return None, "MarkItDown 库未安装"
|
||
except Exception as e:
|
||
return None, f"MarkItDown 解析失败: {str(e)}"
|
||
|
||
|
||
def parse_via_docling(file_path: str) -> Tuple[Optional[str], Optional[str]]:
|
||
"""使用 docling 库解析文件。
|
||
|
||
Args:
|
||
file_path: 文件路径
|
||
|
||
Returns:
|
||
(markdown_content, error_message): 成功时 (content, None),失败时 (None, error)
|
||
"""
|
||
try:
|
||
from docling.document_converter import DocumentConverter
|
||
except ImportError:
|
||
return None, "docling 库未安装"
|
||
|
||
try:
|
||
converter = DocumentConverter()
|
||
result = converter.convert(file_path)
|
||
markdown_content = result.document.export_to_markdown()
|
||
if not markdown_content.strip():
|
||
return None, "文档为空"
|
||
return markdown_content, None
|
||
except Exception as e:
|
||
return None, f"docling 解析失败: {str(e)}"
|
||
|
||
|
||
def convert_via_libreoffice(
|
||
input_path: str,
|
||
target_format: str,
|
||
output_dir: Path,
|
||
output_suffix: Optional[str] = None,
|
||
timeout: int = 60
|
||
) -> Tuple[Optional[Path], Optional[str]]:
|
||
"""使用 LibreOffice soffice 命令行转换文件格式。
|
||
|
||
Args:
|
||
input_path: 输入文件路径
|
||
target_format: 目标格式(如 "md", "pptx")
|
||
output_dir: 输出目录(调用者负责生命周期管理)
|
||
output_suffix: 可选,输出文件后缀(不指定则使用 target_format)
|
||
timeout: 超时时间(秒)
|
||
|
||
Returns:
|
||
(output_path, error_message): 成功时 (Path, None),失败时 (None, error)
|
||
"""
|
||
# 检测 soffice 是否在 PATH 中
|
||
soffice_path = shutil.which("soffice")
|
||
if not soffice_path:
|
||
return None, "LibreOffice 未安装"
|
||
|
||
input_file = Path(input_path)
|
||
suffix = output_suffix if output_suffix else target_format
|
||
expected_output = output_dir / (input_file.stem + "." + suffix)
|
||
|
||
# 构建命令
|
||
cmd = [
|
||
soffice_path,
|
||
"--headless",
|
||
"--convert-to", target_format,
|
||
"--outdir", str(output_dir),
|
||
str(input_file)
|
||
]
|
||
|
||
# 执行命令
|
||
try:
|
||
result = subprocess.run(
|
||
cmd,
|
||
capture_output=True,
|
||
text=True,
|
||
timeout=timeout
|
||
)
|
||
except subprocess.TimeoutExpired:
|
||
return None, f"LibreOffice 转换超时 ({timeout}秒)"
|
||
|
||
# 检查返回码
|
||
if result.returncode != 0:
|
||
return None, f"LibreOffice 转换失败 (code: {result.returncode})"
|
||
|
||
# 检查输出文件是否存在
|
||
output_file = None
|
||
if expected_output.exists():
|
||
output_file = expected_output
|
||
else:
|
||
# Fallback: 遍历目录找任意匹配后缀的文件
|
||
pattern = "*." + suffix
|
||
files = list(output_dir.glob(pattern))
|
||
if files:
|
||
output_file = files[0]
|
||
|
||
if not output_file:
|
||
return None, "LibreOffice 未生成输出文件"
|
||
|
||
return output_file, None
|
||
|
||
|
||
def parse_via_libreoffice(file_path: str) -> Tuple[Optional[str], Optional[str]]:
|
||
"""使用 LibreOffice soffice 命令行转换文件为 Markdown。
|
||
|
||
支持 .doc/.docx/.odt 等 LibreOffice 可处理的格式。
|
||
|
||
Args:
|
||
file_path: 文件路径
|
||
|
||
Returns:
|
||
(markdown_content, error_message): 成功时 (content, None),失败时 (None, error)
|
||
"""
|
||
with tempfile.TemporaryDirectory() as temp_dir:
|
||
output_path, error = convert_via_libreoffice(
|
||
input_path=file_path,
|
||
target_format="md",
|
||
output_dir=Path(temp_dir),
|
||
timeout=60
|
||
)
|
||
if error:
|
||
return None, error
|
||
|
||
# 读取输出内容
|
||
content = output_path.read_text(encoding="utf-8", errors="replace")
|
||
content = content.strip()
|
||
|
||
if not content:
|
||
return None, "LibreOffice 输出为空"
|
||
|
||
return content, None
|
||
|
||
|
||
# ============================================================================
|
||
# 格式化工具
|
||
# ============================================================================
|
||
|
||
def build_markdown_table(rows_data: List[List[str]]) -> str:
|
||
"""将二维列表格式化为 Markdown 表格。
|
||
|
||
Args:
|
||
rows_data: 二维列表,第一行为表头
|
||
|
||
Returns:
|
||
Markdown 格式的表格字符串
|
||
"""
|
||
if not rows_data or not rows_data[0]:
|
||
return ""
|
||
|
||
md_lines = []
|
||
for i, row_data in enumerate(rows_data):
|
||
row_text = [cell if cell else "" for cell in row_data]
|
||
md_lines.append("| " + " | ".join(row_text) + " |")
|
||
if i == 0:
|
||
md_lines.append("| " + " | ".join(["---"] * len(row_text)) + " |")
|
||
return "\n".join(md_lines) + "\n\n"
|
||
|
||
|
||
# ============================================================================
|
||
# 列表处理工具
|
||
# ============================================================================
|
||
|
||
def flush_list_stack(list_stack: List[str], target: List[str]) -> None:
|
||
"""将列表堆栈中的非空项添加到目标列表并清空堆栈。
|
||
|
||
用于处理嵌套列表的格式化输出。
|
||
|
||
Args:
|
||
list_stack: 列表堆栈
|
||
target: 目标列表
|
||
"""
|
||
for item in list_stack:
|
||
if item:
|
||
target.append(item + "\n")
|
||
list_stack.clear()
|
||
|
||
|
||
# ============================================================================
|
||
# ZIP 文件安全处理
|
||
# ============================================================================
|
||
|
||
def safe_open_zip(zip_file: zipfile.ZipFile, name: str) -> Optional[zipfile.ZipExtFile]:
|
||
"""安全地从 ZipFile 中打开文件,防止路径遍历攻击。
|
||
|
||
Args:
|
||
zip_file: ZipFile 对象
|
||
name: 文件名
|
||
|
||
Returns:
|
||
ZipExtFile 对象,如果路径不安全则返回 None
|
||
"""
|
||
if not name:
|
||
return None
|
||
|
||
try:
|
||
normalized = Path(name).as_posix()
|
||
# 检查是否包含父目录引用
|
||
if ".." in Path(normalized).parts:
|
||
return None
|
||
# 检查是否为绝对路径
|
||
if Path(normalized).is_absolute():
|
||
return None
|
||
return zip_file.open(name)
|
||
except (ValueError, OSError, KeyError):
|
||
return None
|
||
|
||
|
||
# ============================================================================
|
||
# unstructured 库相关
|
||
# ============================================================================
|
||
|
||
# unstructured 噪声匹配模式
|
||
_UNSTRUCTURED_RGB_PATTERN = re.compile(r"^R:\d+\s+G:\d+\s+B:\d+$")
|
||
_UNSTRUCTURED_PAGE_NUMBER_PATTERN = re.compile(r"^—\s*\d+\s*—$")
|
||
|
||
|
||
def convert_unstructured_to_markdown(
|
||
elements: list, trust_titles: bool = True
|
||
) -> str:
|
||
"""将 unstructured 解析出的元素列表转换为 Markdown 文本。
|
||
|
||
Args:
|
||
elements: unstructured 解析的元素列表
|
||
trust_titles: 是否信任 unstructured 的标题检测
|
||
|
||
Returns:
|
||
Markdown 格式的文本
|
||
"""
|
||
try:
|
||
import markdownify as md_lib
|
||
from unstructured.documents.elements import (
|
||
Footer,
|
||
Header,
|
||
Image,
|
||
ListItem,
|
||
PageBreak,
|
||
PageNumber,
|
||
Table,
|
||
Title,
|
||
)
|
||
except ImportError:
|
||
return "\n\n".join(
|
||
el.text for el in elements if hasattr(el, "text") and el.text and el.text.strip()
|
||
)
|
||
|
||
skip_types = (Header, Footer, PageBreak, PageNumber)
|
||
parts = []
|
||
|
||
for el in elements:
|
||
if isinstance(el, skip_types):
|
||
continue
|
||
text = el.text.strip() if hasattr(el, "text") else str(el).strip()
|
||
if not text or _UNSTRUCTURED_RGB_PATTERN.match(text) or _UNSTRUCTURED_PAGE_NUMBER_PATTERN.match(text):
|
||
continue
|
||
|
||
if isinstance(el, Table):
|
||
html = getattr(el.metadata, "text_as_html", None)
|
||
if html:
|
||
parts.append(md_lib.markdownify(html, strip=["img"]).strip())
|
||
else:
|
||
parts.append(str(el))
|
||
elif isinstance(el, Title) and trust_titles:
|
||
depth = getattr(el.metadata, "category_depth", None) or 1
|
||
depth = min(max(depth, 1), 4)
|
||
parts.append(f"{'#' * depth} {text}")
|
||
elif isinstance(el, ListItem):
|
||
parts.append(f"- {text}")
|
||
elif isinstance(el, Image):
|
||
path = getattr(el.metadata, "image_path", None) or ""
|
||
if path:
|
||
parts.append(f"")
|
||
else:
|
||
parts.append(text)
|
||
|
||
return "\n\n".join(parts)
|