Files
lyxy-document/core/markdown.py
lanyuanxiaoyao 833018d451 feat: 统一文档解析器项目 - 迁移 lyxy-reader-office 和 lyxy-reader-html
## 功能特性

- 建立统一的项目结构,包含 core/、readers/、utils/、tests/ 模块
- 迁移 lyxy-reader-office 的所有解析器(docx、xlsx、pptx、pdf)
- 迁移 lyxy-reader-html 的所有解析器(html、url 下载)
- 统一 CLI 入口为 lyxy_document_reader.py
- 统一 Markdown 后处理逻辑
- 按文件类型组织 readers,每个解析器独立文件
- 依赖分组按文件类型细分(docx、xlsx、pptx、pdf、html、http)
- PDF OCR 解析器优先,无参数控制
- 使用 logging 模块替代简单 print
- 设计完整的单元测试结构
- 重写项目文档

## 新增目录/文件

- core/ - 核心模块(异常体系、Markdown 工具、解析调度器)
- readers/ - 格式阅读器(base.py + docx/xlsx/pptx/pdf/html)
- utils/ - 工具函数(文件类型检测)
- tests/ - 测试(conftest.py + test_core/ + test_readers/ + test_utils/)
- lyxy_document_reader.py - 统一 CLI 入口

## 依赖分组

- docx - DOCX 文档解析支持
- xlsx - XLSX 文档解析支持
- pptx - PPTX 文档解析支持
- pdf - PDF 文档解析支持(含 OCR)
- html - HTML/URL 解析支持
- http - HTTP/URL 下载支持
- office - Office 格式组合(docx/xlsx/pptx/pdf)
- web - Web 格式组合(html/http)
- full - 完整功能
- dev - 开发依赖
2026-03-08 13:46:37 +08:00

278 lines
8.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Markdown 后处理模块,包含所有格式共享的工具函数。"""
import re
import zipfile
from typing import List, Optional, Tuple
# Matches a Markdown inline image: "![", alt text without "]", then a
# non-empty "(...)" target that contains no ")".
IMAGE_PATTERN = re.compile(r"!\[[^\]]*\]\([^)]+\)")
# Matches a run of three or more consecutive newline characters.
_CONSECUTIVE_BLANK_LINES = re.compile(r"\n{3,}")
# unstructured noise: RGB color values found in pptx (e.g. "R:255 G:128 B:0")
_RGB_PATTERN = re.compile(r"^R:\d+\s+G:\d+\s+B:\d+$")
# unstructured noise: page numbers wrapped in em dashes (e.g. "— 3 —")
_PAGE_NUMBER_PATTERN = re.compile(r"^—\s*\d+\s*—$")
def parse_with_markitdown(
    file_path: str,
) -> Tuple[Optional[str], Optional[str]]:
    """Convert a file to Markdown text with the MarkItDown library.

    Args:
        file_path: Path of the file handed to ``MarkItDown.convert``.

    Returns:
        A ``(content, error)`` pair. On success ``content`` is the converted
        text and ``error`` is ``None``. On failure ``content`` is ``None`` and
        ``error`` is a human-readable message: library not installed, empty
        document, or conversion failure.
    """
    # Guard only the import with ImportError, so that an ImportError raised
    # later, during conversion, is reported as a conversion failure rather
    # than as "library not installed".
    try:
        from markitdown import MarkItDown
    except ImportError:
        return None, "MarkItDown 库未安装"

    try:
        result = MarkItDown().convert(file_path)
        text_content = result.text_content
        # A missing (None) or whitespace-only result is an empty document.
        if not text_content or not text_content.strip():
            return None, "文档为空"
        return text_content, None
    except Exception as e:
        return None, f"MarkItDown 解析失败: {str(e)}"
def parse_with_docling(file_path: str) -> Tuple[Optional[str], Optional[str]]:
    """Convert a file to Markdown text with the docling library.

    Args:
        file_path: Path of the file handed to ``DocumentConverter.convert``.

    Returns:
        A ``(content, error)`` pair. Exactly one of the two is ``None``:
        ``content`` holds the exported Markdown on success, ``error`` holds a
        message when docling is missing, the export is blank, or conversion
        raised.
    """
    try:
        from docling.document_converter import DocumentConverter
    except ImportError:
        return None, "docling 库未安装"

    try:
        conversion = DocumentConverter().convert(file_path)
        exported = conversion.document.export_to_markdown()
        # The blank check stays inside the try so that a non-string export is
        # still reported as a parse failure.
        is_blank = not exported.strip()
    except Exception as exc:
        return None, f"docling 解析失败: {str(exc)}"

    if is_blank:
        return None, "文档为空"
    return exported, None
def build_markdown_table(rows_data: List[List[str]]) -> str:
    """Render a two-dimensional list of strings as a Markdown pipe table.

    The first row is used as the header and is followed by a ``---``
    separator row with one column per header cell. Falsy cells (``None`` or
    ``""``) are rendered as empty cells.

    Cell text is sanitized so that it cannot corrupt the table structure:
    line breaks become ``<br>`` (a table row must stay on one line) and any
    ``|`` that is not already preceded by a backslash is escaped as ``\\|``.

    Args:
        rows_data: Table rows; each row is a list of cell strings.

    Returns:
        The table text terminated by a blank line, or ``""`` when
        ``rows_data`` is empty or its first row is empty.
    """
    if not rows_data or not rows_data[0]:
        return ""

    def render_cell(cell: Optional[str]) -> str:
        if not cell:
            return ""
        single_line = re.sub(r"\r\n|\r|\n", "<br>", cell)
        # The lookbehind leaves pipes that the caller already escaped alone.
        return re.sub(r"(?<!\\)\|", r"\\|", single_line)

    md_lines = []
    for i, row_data in enumerate(rows_data):
        row_text = [render_cell(cell) for cell in row_data]
        md_lines.append("| " + " | ".join(row_text) + " |")
        if i == 0:
            md_lines.append("| " + " | ".join(["---"] * len(row_text)) + " |")
    return "\n".join(md_lines) + "\n\n"
def flush_list_stack(list_stack: List[str], target: List[str]) -> None:
    """Move pending list items into ``target`` and empty the stack.

    Every non-empty entry of ``list_stack`` is appended to ``target`` with a
    trailing newline, in order; empty entries are dropped. ``list_stack`` is
    cleared in place afterwards.
    """
    target.extend(entry + "\n" for entry in list_stack if entry)
    list_stack.clear()
def safe_open_zip(zip_file: zipfile.ZipFile, name: str) -> Optional[zipfile.ZipExtFile]:
    """Open a member of ``zip_file``, refusing suspicious member names.

    Returns ``None`` without touching the archive when ``name`` is empty,
    starts with ``/`` or ``..``, contains a ``/../`` segment, ends with
    ``/..``, or contains a backslash. Otherwise the result of
    ``zip_file.open(name)`` is returned; errors raised by that call
    propagate to the caller.
    """
    rejected = (
        not name
        or name.startswith(("/", ".."))
        or "/../" in name
        or name.endswith("/..")
        or "\\" in name
    )
    return None if rejected else zip_file.open(name)
def normalize_markdown_whitespace(content: str) -> str:
    """Collapse every run of three or more newlines into exactly two.

    The effect is that at most one blank line remains between blocks of text.
    """
    collapsed = re.sub(_CONSECUTIVE_BLANK_LINES, "\n\n", content)
    return collapsed
def remove_markdown_images(markdown_text: str) -> str:
    """Return ``markdown_text`` with every ``![alt](target)`` image removed."""
    without_images = re.sub(IMAGE_PATTERN, "", markdown_text)
    return without_images
def get_heading_level(line: str) -> int:
    """Return the Markdown heading level (1-6) of ``line``, or 0 if none.

    Leading whitespace is ignored. A heading is one to six ``#`` characters
    that are either the whole remaining line or are followed by a space.
    """
    text = line.lstrip()
    hash_count = len(text) - len(text.lstrip("#"))
    if hash_count < 1 or hash_count > 6:
        return 0
    remainder = text[hash_count:]
    # Hashes alone are a (blank) heading; otherwise a space must follow them.
    if remainder and not remainder.startswith(" "):
        return 0
    return hash_count
def extract_titles(markdown_text: str) -> List[str]:
    """Return every heading line (levels 1-6) found in ``markdown_text``.

    Lines are returned in document order with leading whitespace removed.
    """
    return [
        raw_line.lstrip()
        for raw_line in markdown_text.split("\n")
        if get_heading_level(raw_line) > 0
    ]
def extract_title_content(markdown_text: str, title_name: str) -> Optional[str]:
    """Extract each section titled ``title_name`` together with its ancestors.

    For every heading whose text (after the ``#`` markers, stripped) equals
    ``title_name``, the output contains the chain of enclosing higher-level
    headings, the heading itself, and all following lines up to the next
    heading of the same or a higher level. Multiple matches are separated by
    a ``---`` rule.

    Returns:
        The joined text, or ``None`` when no heading matches.
    """
    all_lines = markdown_text.split("\n")
    levels = [get_heading_level(text) for text in all_lines]

    hits = [
        pos
        for pos, (text, lvl) in enumerate(zip(all_lines, levels))
        if lvl > 0 and text.lstrip()[lvl:].strip() == title_name
    ]
    if not hits:
        return None

    collected = []
    for ordinal, hit in enumerate(hits):
        if ordinal:
            collected.append("\n---\n")
        own_level = levels[hit]

        # Walk upwards and keep each heading that is strictly shallower than
        # the shallowest one seen so far; stop once a level-1 heading is hit.
        ancestors = []
        ceiling = own_level
        for pos in range(hit - 1, -1, -1):
            if ceiling == 1:
                break
            lvl = levels[pos]
            if 0 < lvl < ceiling:
                ancestors.append(all_lines[pos])
                ceiling = lvl
        collected.extend(reversed(ancestors))
        collected.append(all_lines[hit])

        # The section body runs until a heading of the same or higher level.
        for pos in range(hit + 1, len(all_lines)):
            if 0 < levels[pos] <= own_level:
                break
            collected.append(all_lines[pos])

    return "\n".join(collected)
def search_markdown(
    content: str, pattern: str, context_lines: int = 0
) -> Optional[str]:
    """Search Markdown text line by line with a regular expression.

    Only non-blank lines are searched, and context is counted in non-blank
    lines as well. Blank lines that fall inside a returned window are kept in
    the output. Matches whose distance (in non-blank lines) is at most
    ``2 * context_lines`` are merged into one window.

    Args:
        content: Text to search.
        pattern: Regular expression applied to each non-blank line with
            ``re.search``.
        context_lines: Number of non-blank lines of context to include
            before and after each match. Negative values are treated as 0.

    Returns:
        The matching windows joined by a ``---`` separator line, or ``None``
        when the pattern is invalid or nothing matches.
    """
    try:
        regex = re.compile(pattern)
    except re.error:
        return None

    # A negative context would invert the window bounds and silently drop the
    # matched lines themselves, so clamp it.
    context_lines = max(context_lines, 0)

    lines = content.split("\n")
    # Original line index of every non-blank line, in document order.
    non_empty_indices = [i for i, line in enumerate(lines) if line.strip()]
    # Positions within non_empty_indices of the lines that match.
    matched_positions = [
        pos
        for pos, line_idx in enumerate(non_empty_indices)
        if regex.search(lines[line_idx])
    ]
    if not matched_positions:
        return None

    merged_ranges = []
    range_start = range_end = matched_positions[0]
    for pos in matched_positions[1:]:
        if pos - range_end <= context_lines * 2:
            range_end = pos
        else:
            merged_ranges.append((range_start, range_end))
            range_start = range_end = pos
    merged_ranges.append((range_start, range_end))

    last_position = len(non_empty_indices) - 1
    results = []
    for start, end in merged_ranges:
        first_line = non_empty_indices[max(0, start - context_lines)]
        last_line = non_empty_indices[min(last_position, end + context_lines)]
        # Slice instead of rescanning every line for every range.
        results.append("\n".join(lines[first_line:last_line + 1]))
    return "\n---\n".join(results)
def _unstructured_elements_to_markdown(
    elements: list, trust_titles: bool = True
) -> str:
    """Convert a list of unstructured elements into Markdown text.

    Header, footer, page-break and page-number elements are skipped, as are
    elements whose stripped text is empty or looks like an RGB color value or
    a dash-wrapped page number. Tables are rendered from their HTML metadata
    when available, titles become ``#`` headings (only if ``trust_titles``),
    list items become ``- `` bullets, and images become image links when a
    path is recorded. Everything else is emitted as plain text.

    If ``markdownify`` or ``unstructured`` cannot be imported, the non-blank
    ``text`` attributes of the elements are joined as-is.

    Returns:
        The rendered parts joined by blank lines.
    """
    try:
        import markdownify as md_lib
        from unstructured.documents.elements import (
            Footer,
            Header,
            Image,
            ListItem,
            PageBreak,
            PageNumber,
            Table,
            Title,
        )
    except ImportError:
        plain_texts = [
            element.text
            for element in elements
            if hasattr(element, "text") and element.text and element.text.strip()
        ]
        return "\n\n".join(plain_texts)

    ignored_types = (Header, Footer, PageBreak, PageNumber)
    blocks = []
    for element in elements:
        if isinstance(element, ignored_types):
            continue

        raw = element.text if hasattr(element, "text") else str(element)
        body = raw.strip()
        if not body:
            continue
        if _RGB_PATTERN.match(body) or _PAGE_NUMBER_PATTERN.match(body):
            continue

        if isinstance(element, Table):
            table_html = getattr(element.metadata, "text_as_html", None)
            if table_html:
                rendered = md_lib.markdownify(table_html, strip=["img"]).strip()
            else:
                rendered = str(element)
        elif isinstance(element, Title) and trust_titles:
            # NOTE(review): `or 1` maps both a missing depth and depth 0 to
            # heading level 1 — confirm whether category_depth is zero-based.
            depth = getattr(element.metadata, "category_depth", None) or 1
            # Heading level is clamped to the range 1-4.
            depth = min(max(depth, 1), 4)
            rendered = f"{'#' * depth} {body}"
        elif isinstance(element, ListItem):
            rendered = f"- {body}"
        elif isinstance(element, Image):
            image_path = getattr(element.metadata, "image_path", None) or ""
            if not image_path:
                # Images without a recorded path produce no output.
                continue
            rendered = f"![image]({image_path})"
        else:
            rendered = body
        blocks.append(rendered)

    return "\n\n".join(blocks)