Files
lyxy-document/scripts/readers/_utils.py
lanyuanxiaoyao a490b2642c feat: 新增 PPT 旧格式支持,重构 LibreOffice 转换工具
- 新增 PPT (旧格式) 解析器
- 重构 _utils.py,提取通用 convert_via_libreoffice 函数
- 更新依赖配置,添加 PPT 相关依赖
- 完善文档,更新 README 和 SKILL.md
- 添加 PPT 文件检测函数
- 新增 PPT 解析器测试用例
2026-03-16 22:49:04 +08:00

311 lines
9.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Reader 内部共享工具模块。
此模块包含各 reader 实现共享的内部工具函数,仅供 readers 包内部使用。
"""
import re
import subprocess
import tempfile
import shutil
import zipfile
from pathlib import Path
from typing import List, Optional, Tuple
# ============================================================================
# 通用解析器包装函数
# ============================================================================
def parse_via_markitdown(
    file_path: str,
) -> Tuple[Optional[str], Optional[str]]:
    """Parse a file into Markdown with the MarkItDown library.

    The import is guarded separately from the conversion, so only a missing
    ``markitdown`` package is reported as "not installed"; an ``ImportError``
    raised while converting is reported as a parse failure instead.

    Args:
        file_path: Path of the file to convert.

    Returns:
        ``(markdown_content, error_message)``: ``(content, None)`` on
        success, ``(None, error)`` on failure.
    """
    try:
        from markitdown import MarkItDown
    except ImportError:
        return None, "MarkItDown 库未安装"

    try:
        result = MarkItDown().convert(file_path)
        content = result.text_content
        # A missing (None) or whitespace-only result both count as empty.
        if not content or not content.strip():
            return None, "文档为空"
        return content, None
    except Exception as e:
        return None, f"MarkItDown 解析失败: {e}"
def parse_via_docling(file_path: str) -> Tuple[Optional[str], Optional[str]]:
    """Parse a file into Markdown with the docling library.

    Args:
        file_path: Path of the file to convert.

    Returns:
        ``(markdown_content, error_message)``: ``(content, None)`` on
        success, ``(None, error)`` on failure.
    """
    # The optional dependency is checked on its own so that its absence gets
    # a dedicated message.
    try:
        from docling.document_converter import DocumentConverter
    except ImportError:
        return None, "docling 库未安装"

    try:
        conversion = DocumentConverter().convert(file_path)
        rendered = conversion.document.export_to_markdown()
        if rendered.strip():
            return rendered, None
        return None, "文档为空"
    except Exception as exc:
        return None, f"docling 解析失败: {str(exc)}"
def convert_via_libreoffice(
    input_path: str,
    target_format: str,
    output_dir: Path,
    output_suffix: Optional[str] = None,
    timeout: int = 60
) -> Tuple[Optional[Path], Optional[str]]:
    """Convert a file to another format with the LibreOffice ``soffice`` CLI.

    Args:
        input_path: Path of the input file.
        target_format: Value passed to ``--convert-to`` (e.g. "md", "pptx").
            It may carry a filter after a colon ("txt:Text"); only the part
            before the first colon is used as the default output suffix.
        output_dir: Output directory (the caller manages its lifetime).
        output_suffix: Optional suffix of the output file; when omitted the
            extension part of ``target_format`` is used.
        timeout: Timeout in seconds.

    Returns:
        ``(output_path, error_message)``: ``(Path, None)`` on success,
        ``(None, error)`` on failure.
    """
    # soffice must be reachable through PATH.
    soffice_path = shutil.which("soffice")
    if not soffice_path:
        return None, "LibreOffice 未安装"

    input_file = Path(input_path)
    # "--convert-to" takes "ext[:filter[:options]]"; the file on disk is only
    # named after the extension part.
    suffix = output_suffix if output_suffix else target_format.split(":", 1)[0]
    expected_output = output_dir / (input_file.stem + "." + suffix)

    cmd = [
        soffice_path,
        "--headless",
        "--convert-to", target_format,
        "--outdir", str(output_dir),
        str(input_file),
    ]

    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            # The captured output is not inspected, so undecodable bytes must
            # not abort the conversion.
            errors="replace",
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return None, f"LibreOffice 转换超时 ({timeout}秒)"
    except OSError as e:
        # The binary could not be started (permissions, bad executable, ...);
        # report it through the error tuple like every other failure.
        return None, f"LibreOffice 转换失败: {e}"

    if result.returncode != 0:
        return None, f"LibreOffice 转换失败 (code: {result.returncode})"

    if expected_output.exists():
        return expected_output, None

    # Fallback: accept any file with the wanted suffix. Sorting makes the
    # pick independent of directory listing order.
    candidates = sorted(output_dir.glob("*." + suffix))
    if candidates:
        return candidates[0], None
    return None, "LibreOffice 未生成输出文件"
def parse_via_libreoffice(file_path: str) -> Tuple[Optional[str], Optional[str]]:
    """Convert a file to Markdown through the LibreOffice ``soffice`` CLI.

    Intended for .doc/.docx/.odt and other formats LibreOffice can open.

    Args:
        file_path: Path of the file to convert.

    Returns:
        ``(markdown_content, error_message)``: ``(content, None)`` on
        success, ``(None, error)`` on failure.
    """
    with tempfile.TemporaryDirectory() as workdir:
        converted, failure = convert_via_libreoffice(
            input_path=file_path,
            target_format="md",
            output_dir=Path(workdir),
            timeout=60,
        )
        if failure:
            return None, failure

        # The file must be read before the temporary directory is removed.
        markdown = converted.read_text(encoding="utf-8", errors="replace").strip()

    if markdown:
        return markdown, None
    return None, "LibreOffice 输出为空"
# ============================================================================
# 格式化工具
# ============================================================================
def build_markdown_table(rows_data: List[List[str]]) -> str:
    """Format a two-dimensional list as a Markdown table.

    Rows shorter than the widest row are padded with empty cells so every
    line has the same number of columns. Inside a cell, unescaped ``|`` is
    escaped and line breaks become ``<br>``, because either would otherwise
    break the row structure. ``None`` cells render as empty; other
    non-string values are converted with ``str()``.

    Args:
        rows_data: Two-dimensional list; the first row is the header.

    Returns:
        The table as a Markdown string followed by a blank line, or ``""``
        when there are no rows or the header row is empty.
    """
    if not rows_data or not rows_data[0]:
        return ""

    def render_cell(cell) -> str:
        text = "" if cell is None else str(cell)
        text = text.replace("\r\n", "\n").replace("\r", "\n").replace("\n", "<br>")
        # Leave pipes that are already escaped untouched.
        return re.sub(r"(?<!\\)\|", r"\\|", text)

    width = max(len(row) for row in rows_data)
    md_lines = []
    for i, row_data in enumerate(rows_data):
        row_text = [render_cell(cell) for cell in row_data]
        row_text.extend([""] * (width - len(row_text)))
        md_lines.append("| " + " | ".join(row_text) + " |")
        if i == 0:
            # The separator line after the header is what makes it a table.
            md_lines.append("| " + " | ".join(["---"] * width) + " |")
    return "\n".join(md_lines) + "\n\n"
# ============================================================================
# 列表处理工具
# ============================================================================
def flush_list_stack(list_stack: List[str], target: List[str]) -> None:
    """Move the non-empty entries of a list stack into ``target``.

    Each non-empty entry is appended to ``target`` with a trailing newline,
    then the stack is emptied in place. Used when emitting nested lists.

    Args:
        list_stack: Pending list entries; cleared by this call.
        target: List that receives the flushed entries.
    """
    target.extend(entry + "\n" for entry in list_stack if entry)
    list_stack.clear()
# ============================================================================
# ZIP 文件安全处理
# ============================================================================
def safe_open_zip(zip_file: zipfile.ZipFile, name: str) -> Optional[zipfile.ZipExtFile]:
    """Open a member of a ZipFile, refusing names that look like path traversal.

    The name is checked with Windows path rules on every platform: they
    treat both ``/`` and ``\\`` as separators and recognise drive letters and
    UNC prefixes, so the result does not depend on the host operating system.

    Args:
        zip_file: An open ZipFile object.
        name: Name of the archive member.

    Returns:
        The opened ZipExtFile, or ``None`` when the name is empty, contains a
        ``..`` component, is absolute/anchored, or cannot be opened
        (``ValueError``, ``OSError`` or ``KeyError``).
    """
    # Local import keeps the block self-contained without touching the
    # module-level imports.
    from pathlib import PureWindowsPath

    if not name:
        return None
    try:
        candidate = PureWindowsPath(name)
        # Reject parent-directory references written with either separator.
        if ".." in candidate.parts:
            return None
        # A non-empty anchor means a root ("/x", "\\x"), a drive ("C:") or a
        # UNC share, i.e. the name is not relative to the archive.
        if candidate.anchor:
            return None
        return zip_file.open(name)
    except (ValueError, OSError, KeyError):
        return None
# ============================================================================
# unstructured 库相关
# ============================================================================
# Noise patterns for text produced by unstructured: lines of the form
# "R:12 G:34 B:56" and stand-alone page markers of the form "— 3 —".
_UNSTRUCTURED_RGB_PATTERN = re.compile(r"^R:\d+\s+G:\d+\s+B:\d+$")
_UNSTRUCTURED_PAGE_NUMBER_PATTERN = re.compile(r"^—\s*\d+\s*—$")


def convert_unstructured_to_markdown(
    elements: list, trust_titles: bool = True
) -> str:
    """Convert a list of elements parsed by unstructured into Markdown text.

    Headers, footers, page breaks and page numbers are dropped, as are
    elements whose text is empty or matches one of the noise patterns.
    When ``markdownify`` or ``unstructured`` cannot be imported, the function
    falls back to joining the non-blank ``text`` of the elements.

    Args:
        elements: Elements produced by unstructured.
        trust_titles: Whether Title elements are rendered as headings; when
            False they are emitted as plain text.

    Returns:
        The Markdown text, with blocks separated by blank lines.
    """
    try:
        import markdownify as md_lib
        from unstructured.documents.elements import (
            Footer,
            Header,
            Image,
            ListItem,
            PageBreak,
            PageNumber,
            Table,
            Title,
        )
    except ImportError:
        return "\n\n".join(
            el.text for el in elements if hasattr(el, "text") and el.text and el.text.strip()
        )

    skip_types = (Header, Footer, PageBreak, PageNumber)
    parts = []
    for el in elements:
        if isinstance(el, skip_types):
            continue
        # A None text is treated as empty (and therefore skipped), matching
        # the guard used by the import-failure fallback above.
        text = (el.text or "").strip() if hasattr(el, "text") else str(el).strip()
        if not text or _UNSTRUCTURED_RGB_PATTERN.match(text) or _UNSTRUCTURED_PAGE_NUMBER_PATTERN.match(text):
            continue
        if isinstance(el, Table):
            html = getattr(el.metadata, "text_as_html", None)
            if html:
                parts.append(md_lib.markdownify(html, strip=["img"]).strip())
            else:
                parts.append(str(el))
        elif isinstance(el, Title) and trust_titles:
            # NOTE(review): a falsy depth falls back to 1, so category_depth
            # 0 and 1 both render as "#" — confirm this is intended.
            depth = getattr(el.metadata, "category_depth", None) or 1
            depth = min(max(depth, 1), 4)
            parts.append(f"{'#' * depth} {text}")
        elif isinstance(el, ListItem):
            parts.append(f"- {text}")
        elif isinstance(el, Image):
            # Images without a recorded path produce no output.
            path = getattr(el.metadata, "image_path", None) or ""
            if path:
                parts.append(f"![image]({path})")
        else:
            parts.append(text)
    return "\n\n".join(parts)