Files
lyxy-document/scripts/core/markdown.py
lanyuanxiaoyao 1aea561277 refactor: 重构 Reader 内部工具函数到独立模块
- 新增 scripts/readers/_utils.py 作为 Reader 内部共享工具模块
- 将 parse_with_markitdown 等函数从 core/markdown.py 迁移到 _utils.py
- 函数重命名:parse_with_xxx → parse_via_xxx,_unstructured_elements_to_markdown → convert_unstructured_to_markdown
- 更新 17 个 Reader 实现文件的 import 路径
- 从 core/__init__.py 移除已迁移函数的导出
- 新增测试文件 tests/test_readers/test_utils.py
- 新增 spec 文档 openspec/specs/reader-internal-utils/spec.md

这次重构明确了模块边界:core/ 提供公共 API,readers/_utils.py 提供 Reader 内部工具
2026-03-09 00:56:05 +08:00

155 lines
4.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Markdown 后处理模块,包含 Markdown 格式化的工具函数。"""
import re
from typing import List, Optional
# Matches an inline Markdown image of the form ``![alt](target)``: the alt text
# may be empty, the target must contain at least one character other than ")".
IMAGE_PATTERN = re.compile(r"!\[[^\]]*\]\([^)]+\)")
# Matches a run of three or more consecutive newline characters, i.e. two or
# more completely empty lines in a row.
_CONSECUTIVE_BLANK_LINES = re.compile(r"\n{3,}")
def normalize_markdown_whitespace(content: str) -> str:
    """Collapse excessive blank lines in Markdown text.

    Every run of three or more consecutive newlines is replaced by exactly
    two newlines, so at most one empty line separates blocks of content.

    Args:
        content: Markdown text to normalize.

    Returns:
        The text with runs of newlines collapsed; all other characters are
        left untouched.
    """
    return re.sub(_CONSECUTIVE_BLANK_LINES, "\n\n", content)
def remove_markdown_images(markdown_text: str) -> str:
    """Strip inline image markup from Markdown text.

    Args:
        markdown_text: Markdown text that may contain ``![alt](target)``
            image references.

    Returns:
        The text with every substring matched by ``IMAGE_PATTERN`` removed.
    """
    return re.sub(IMAGE_PATTERN, "", markdown_text)
def get_heading_level(line: str) -> int:
    """Return the heading level (1-6) of a Markdown line, or 0 for a non-heading.

    Leading whitespace is ignored. The line is a heading when it then starts
    with one to six ``#`` characters that either make up the whole remaining
    text or are immediately followed by a space.

    Args:
        line: A single line of Markdown text.

    Returns:
        The number of leading ``#`` characters (1-6) for a heading line,
        otherwise 0.
    """
    text = line.lstrip()
    hash_count = len(text) - len(text.lstrip("#"))
    if hash_count < 1 or hash_count > 6:
        return 0

    # The character right after the hashes decides the outcome: nothing at
    # all (a bare "###") or a single space marks a heading; anything else
    # (e.g. "#tag") does not.
    follower = text[hash_count:hash_count + 1]
    if follower in ("", " "):
        return hash_count
    return 0
def extract_titles(markdown_text: str) -> List[str]:
    """Collect every heading line (levels 1-6) from Markdown text.

    Args:
        markdown_text: The Markdown document to scan.

    Returns:
        The heading lines in document order, each with its leading
        whitespace removed. Empty when the text has no headings.
    """
    return [
        row.lstrip()
        for row in markdown_text.split("\n")
        if get_heading_level(row) > 0
    ]
def extract_title_content(markdown_text: str, title_name: str) -> Optional[str]:
    """Extract every section whose heading text equals ``title_name``.

    For each matching heading the output contains, in order: the chain of
    ancestor headings above it (nearest heading of each strictly smaller
    level), the matching heading line itself, and all following lines up to
    the next heading of the same or a higher rank. Multiple matches are
    separated by a ``---`` line surrounded by blank lines.

    Args:
        markdown_text: The Markdown document to search.
        title_name: Heading text to look for, compared after removing the
            leading ``#`` characters and surrounding whitespace.

    Returns:
        The extracted sections joined by newlines, or ``None`` when no
        heading matches.
    """
    rows = markdown_text.split("\n")
    # Heading level of every line, computed once and reused below.
    levels = [get_heading_level(row) for row in rows]

    hits = [
        pos
        for pos, (row, depth) in enumerate(zip(rows, levels))
        if depth > 0 and row.lstrip()[depth:].strip() == title_name
    ]
    if not hits:
        return None

    output: List[str] = []
    for pos in hits:
        # Anything already collected means this is not the first match.
        if output:
            output.append("\n---\n")

        own_depth = levels[pos]

        # Walk upwards gathering the nearest heading of each smaller level;
        # once a level-1 heading is reached there can be no further ancestor.
        ancestors = []
        ceiling = own_depth
        cursor = pos - 1
        while cursor >= 0 and ceiling > 1:
            depth = levels[cursor]
            if 0 < depth < ceiling:
                ancestors.append(rows[cursor])
                ceiling = depth
            cursor -= 1
        output.extend(reversed(ancestors))

        output.append(rows[pos])

        # The section body runs until a heading of equal or higher rank.
        nxt = pos + 1
        while nxt < len(rows) and not (0 < levels[nxt] <= own_depth):
            output.append(rows[nxt])
            nxt += 1

    return "\n".join(output)
def search_markdown(
    content: str, pattern: str, context_lines: int = 0
) -> Optional[str]:
    """Search a Markdown document with a regular expression.

    Only non-blank lines are searched, and context is counted in non-blank
    lines as well. Blank lines lying inside a reported range are still
    included in the output. Matches whose context windows overlap are merged
    into a single range.

    Args:
        content: The Markdown text to search.
        pattern: Regular expression applied to each non-blank line with
            ``re.search``.
        context_lines: Number of non-blank lines of context to include
            before and after each match.

    Returns:
        The matching ranges joined by a ``---`` separator line, or ``None``
        when ``content`` is empty, ``pattern`` is not a valid regular
        expression, or nothing matches.

    Raises:
        ValueError: If ``context_lines`` is negative (checked only for
            non-empty ``content``).
    """
    if not content:
        return None
    if context_lines < 0:
        raise ValueError("context_lines 必须为非负整数")
    try:
        regex = re.compile(pattern)
    except re.error:
        # Deliberate best-effort: an invalid pattern is reported the same
        # way as "no match" instead of propagating the error.
        return None

    lines = content.split("\n")

    # Original line index of every non-blank line; a position in this list
    # is the unit in which matches and context are measured.
    non_empty_indices = [i for i, line in enumerate(lines) if line.strip()]
    matched_positions = [
        pos
        for pos, line_idx in enumerate(non_empty_indices)
        if regex.search(lines[line_idx])
    ]
    if not matched_positions:
        return None

    # Merge matches whose context windows would share at least one line.
    merged_ranges = []
    range_start = range_end = matched_positions[0]
    for pos in matched_positions[1:]:
        if pos - range_end <= context_lines * 2:
            range_end = pos
        else:
            merged_ranges.append((range_start, range_end))
            range_start = range_end = pos
    merged_ranges.append((range_start, range_end))

    last_position = len(non_empty_indices) - 1
    results = []
    for start, end in merged_ranges:
        # Widen by the context, clamp to the document, then map back to
        # original line numbers and slice the range out directly.
        first_line = non_empty_indices[max(0, start - context_lines)]
        last_line = non_empty_indices[min(last_position, end + context_lines)]
        results.append("\n".join(lines[first_line:last_line + 1]))
    return "\n---\n".join(results)