Files
lyxy-document/scripts/core/markdown.py
lanyuanxiaoyao 15b63800a8 refactor: 将核心代码迁移到 scripts 目录
- 创建 scripts/ 目录作为核心代码根目录
- 移动 core/, readers/, utils/ 到 scripts/ 下
- 移动 config.py, lyxy_document_reader.py 到 scripts/
- 移动 encoding_detection.py 到 scripts/utils/
- 更新 pyproject.toml 中的入口点路径和 pytest 配置
- 更新所有内部导入语句为 scripts.* 模块
- 更新 README.md 目录结构说明
- 更新 openspec/config.yaml 添加目录结构说明
- 删除无用的 main.py

此变更使项目结构更清晰,便于区分核心代码与测试、文档等支撑文件。
2026-03-08 17:41:03 +08:00

291 lines
9.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Markdown 后处理模块,包含所有格式共享的工具函数。"""
import re
import zipfile
from pathlib import Path
from typing import List, Optional, Tuple
IMAGE_PATTERN = re.compile(r"!\[[^\]]*\]\([^)]+\)")
_CONSECUTIVE_BLANK_LINES = re.compile(r"\n{3,}")
# unstructured 噪声匹配: pptx 中的 RGB 颜色值(如 "R:255 G:128 B:0"
_RGB_PATTERN = re.compile(r"^R:\d+\s+G:\d+\s+B:\d+$")
# unstructured 噪声匹配: 破折号页码(如 "— 3 —"
_PAGE_NUMBER_PATTERN = re.compile(r"^—\s*\d+\s*—$")
def parse_with_markitdown(
file_path: str,
) -> Tuple[Optional[str], Optional[str]]:
"""使用 MarkItDown 库解析文件"""
try:
from markitdown import MarkItDown
md = MarkItDown()
result = md.convert(file_path)
if not result.text_content.strip():
return None, "文档为空"
return result.text_content, None
except ImportError:
return None, "MarkItDown 库未安装"
except Exception as e:
return None, f"MarkItDown 解析失败: {str(e)}"
def parse_with_docling(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 docling 库解析文件"""
try:
from docling.document_converter import DocumentConverter
except ImportError:
return None, "docling 库未安装"
try:
converter = DocumentConverter()
result = converter.convert(file_path)
markdown_content = result.document.export_to_markdown()
if not markdown_content.strip():
return None, "文档为空"
return markdown_content, None
except Exception as e:
return None, f"docling 解析失败: {str(e)}"
def build_markdown_table(rows_data: List[List[str]]) -> str:
"""将二维列表转换为 Markdown 表格格式"""
if not rows_data or not rows_data[0]:
return ""
md_lines = []
for i, row_data in enumerate(rows_data):
row_text = [cell if cell else "" for cell in row_data]
md_lines.append("| " + " | ".join(row_text) + " |")
if i == 0:
md_lines.append("| " + " | ".join(["---"] * len(row_text)) + " |")
return "\n".join(md_lines) + "\n\n"
def flush_list_stack(list_stack: List[str], target: List[str]) -> None:
"""将列表堆栈中的非空项添加到目标列表并清空堆栈"""
for item in list_stack:
if item:
target.append(item + "\n")
list_stack.clear()
def safe_open_zip(zip_file: zipfile.ZipFile, name: str) -> Optional[zipfile.ZipExtFile]:
"""安全地从 ZipFile 中打开文件,防止路径遍历攻击"""
if not name:
return None
try:
normalized = Path(name).as_posix()
# 检查是否包含父目录引用
if ".." in Path(normalized).parts:
return None
# 检查是否为绝对路径
if Path(normalized).is_absolute():
return None
return zip_file.open(name)
except (ValueError, OSError):
return None
def normalize_markdown_whitespace(content: str) -> str:
"""规范化 Markdown 空白字符,保留单行空行"""
return _CONSECUTIVE_BLANK_LINES.sub("\n\n", content)
def remove_markdown_images(markdown_text: str) -> str:
"""移除 Markdown 文本中的图片标记"""
return IMAGE_PATTERN.sub("", markdown_text)
def get_heading_level(line: str) -> int:
"""获取 Markdown 行的标题级别1-6非标题返回 0"""
stripped = line.lstrip()
if not stripped.startswith("#"):
return 0
without_hash = stripped.lstrip("#")
level = len(stripped) - len(without_hash)
if not (1 <= level <= 6):
return 0
if len(stripped) == level:
return level
if stripped[level] != " ":
return 0
return level
def extract_titles(markdown_text: str) -> List[str]:
"""提取 markdown 文本中的所有标题行1-6级"""
title_lines = []
for line in markdown_text.split("\n"):
if get_heading_level(line) > 0:
title_lines.append(line.lstrip())
return title_lines
def extract_title_content(markdown_text: str, title_name: str) -> Optional[str]:
"""提取所有指定标题及其下级内容(每个包含上级标题)"""
lines = markdown_text.split("\n")
match_indices = []
for i, line in enumerate(lines):
level = get_heading_level(line)
if level > 0:
stripped = line.lstrip()
title_text = stripped[level:].strip()
if title_text == title_name:
match_indices.append(i)
if not match_indices:
return None
result_lines = []
for match_num, idx in enumerate(match_indices):
if match_num > 0:
result_lines.append("\n---\n")
target_level = get_heading_level(lines[idx])
parent_titles = []
current_level = target_level
for i in range(idx - 1, -1, -1):
line_level = get_heading_level(lines[i])
if line_level > 0 and line_level < current_level:
parent_titles.append(lines[i])
current_level = line_level
if current_level == 1:
break
parent_titles.reverse()
result_lines.extend(parent_titles)
result_lines.append(lines[idx])
for i in range(idx + 1, len(lines)):
line = lines[i]
line_level = get_heading_level(line)
if line_level == 0 or line_level > target_level:
result_lines.append(line)
else:
break
return "\n".join(result_lines)
def search_markdown(
content: str, pattern: str, context_lines: int = 0
) -> Optional[str]:
"""使用正则表达式搜索 markdown 文档,返回匹配结果及其上下文"""
# 边界检查
if not content:
return None
if context_lines < 0:
raise ValueError("context_lines 必须为非负整数")
try:
regex = re.compile(pattern)
except re.error:
return None
lines = content.split("\n")
non_empty_indices = []
non_empty_to_original = {}
for i, line in enumerate(lines):
if line.strip():
non_empty_indices.append(i)
non_empty_to_original[i] = len(non_empty_indices) - 1
matched_non_empty_indices = []
for orig_idx in non_empty_indices:
if regex.search(lines[orig_idx]):
matched_non_empty_indices.append(non_empty_to_original[orig_idx])
if not matched_non_empty_indices:
return None
merged_ranges = []
current_start = matched_non_empty_indices[0]
current_end = matched_non_empty_indices[0]
for idx in matched_non_empty_indices[1:]:
if idx - current_end <= context_lines * 2:
current_end = idx
else:
merged_ranges.append((current_start, current_end))
current_start = idx
current_end = idx
merged_ranges.append((current_start, current_end))
results = []
for start, end in merged_ranges:
context_start_idx = max(0, start - context_lines)
context_end_idx = min(len(non_empty_indices) - 1, end + context_lines)
start_line_idx = non_empty_indices[context_start_idx]
end_line_idx = non_empty_indices[context_end_idx]
result_lines = [
line
for i, line in enumerate(lines)
if start_line_idx <= i <= end_line_idx
]
results.append("\n".join(result_lines))
return "\n---\n".join(results)
def _unstructured_elements_to_markdown(
elements: list, trust_titles: bool = True
) -> str:
"""将 unstructured 解析出的元素列表转换为 Markdown 文本"""
try:
import markdownify as md_lib
from unstructured.documents.elements import (
Footer,
Header,
Image,
ListItem,
PageBreak,
PageNumber,
Table,
Title,
)
except ImportError:
return "\n\n".join(
el.text for el in elements if hasattr(el, "text") and el.text and el.text.strip()
)
skip_types = (Header, Footer, PageBreak, PageNumber)
parts = []
for el in elements:
if isinstance(el, skip_types):
continue
text = el.text.strip() if hasattr(el, "text") else str(el).strip()
if not text or _RGB_PATTERN.match(text) or _PAGE_NUMBER_PATTERN.match(text):
continue
if isinstance(el, Table):
html = getattr(el.metadata, "text_as_html", None)
if html:
parts.append(md_lib.markdownify(html, strip=["img"]).strip())
else:
parts.append(str(el))
elif isinstance(el, Title) and trust_titles:
depth = getattr(el.metadata, "category_depth", None) or 1
depth = min(max(depth, 1), 4)
parts.append(f"{'#' * depth} {text}")
elif isinstance(el, ListItem):
parts.append(f"- {text}")
elif isinstance(el, Image):
path = getattr(el.metadata, "image_path", None) or ""
if path:
parts.append(f"![image]({path})")
else:
parts.append(text)
return "\n\n".join(parts)