- 新增 scripts/readers/_utils.py 作为 Reader 内部共享工具模块 - 将 parse_with_markitdown 等函数从 core/markdown.py 迁移到 _utils.py - 函数重命名:parse_with_xxx → parse_via_xxx,_unstructured_elements_to_markdown → convert_unstructured_to_markdown - 更新 17 个 Reader 实现文件的 import 路径 - 从 core/__init__.py 移除已迁移函数的导出 - 新增测试文件 tests/test_readers/test_utils.py - 新增 spec 文档 openspec/specs/reader-internal-utils/spec.md 这次重构明确了模块边界:core/ 提供公共 API,readers/_utils.py 提供 Reader 内部工具
155 lines
4.7 KiB
Python
155 lines
4.7 KiB
Python
"""Markdown 后处理模块,包含 Markdown 格式化的工具函数。"""
|
||
|
||
import re
|
||
from typing import List, Optional
|
||
|
||
IMAGE_PATTERN = re.compile(r"!\[[^\]]*\]\([^)]+\)")
|
||
_CONSECUTIVE_BLANK_LINES = re.compile(r"\n{3,}")
|
||
|
||
|
||
def normalize_markdown_whitespace(content: str) -> str:
|
||
"""规范化 Markdown 空白字符,保留单行空行"""
|
||
return _CONSECUTIVE_BLANK_LINES.sub("\n\n", content)
|
||
|
||
|
||
def remove_markdown_images(markdown_text: str) -> str:
|
||
"""移除 Markdown 文本中的图片标记"""
|
||
return IMAGE_PATTERN.sub("", markdown_text)
|
||
|
||
|
||
def get_heading_level(line: str) -> int:
|
||
"""获取 Markdown 行的标题级别(1-6),非标题返回 0"""
|
||
stripped = line.lstrip()
|
||
if not stripped.startswith("#"):
|
||
return 0
|
||
without_hash = stripped.lstrip("#")
|
||
level = len(stripped) - len(without_hash)
|
||
if not (1 <= level <= 6):
|
||
return 0
|
||
if len(stripped) == level:
|
||
return level
|
||
if stripped[level] != " ":
|
||
return 0
|
||
return level
|
||
|
||
|
||
def extract_titles(markdown_text: str) -> List[str]:
|
||
"""提取 markdown 文本中的所有标题行(1-6级)"""
|
||
title_lines = []
|
||
for line in markdown_text.split("\n"):
|
||
if get_heading_level(line) > 0:
|
||
title_lines.append(line.lstrip())
|
||
return title_lines
|
||
|
||
|
||
def extract_title_content(markdown_text: str, title_name: str) -> Optional[str]:
|
||
"""提取所有指定标题及其下级内容(每个包含上级标题)"""
|
||
lines = markdown_text.split("\n")
|
||
match_indices = []
|
||
|
||
for i, line in enumerate(lines):
|
||
level = get_heading_level(line)
|
||
if level > 0:
|
||
stripped = line.lstrip()
|
||
title_text = stripped[level:].strip()
|
||
if title_text == title_name:
|
||
match_indices.append(i)
|
||
|
||
if not match_indices:
|
||
return None
|
||
|
||
result_lines = []
|
||
for match_num, idx in enumerate(match_indices):
|
||
if match_num > 0:
|
||
result_lines.append("\n---\n")
|
||
|
||
target_level = get_heading_level(lines[idx])
|
||
|
||
parent_titles = []
|
||
current_level = target_level
|
||
for i in range(idx - 1, -1, -1):
|
||
line_level = get_heading_level(lines[i])
|
||
if line_level > 0 and line_level < current_level:
|
||
parent_titles.append(lines[i])
|
||
current_level = line_level
|
||
if current_level == 1:
|
||
break
|
||
|
||
parent_titles.reverse()
|
||
result_lines.extend(parent_titles)
|
||
|
||
result_lines.append(lines[idx])
|
||
for i in range(idx + 1, len(lines)):
|
||
line = lines[i]
|
||
line_level = get_heading_level(line)
|
||
if line_level == 0 or line_level > target_level:
|
||
result_lines.append(line)
|
||
else:
|
||
break
|
||
|
||
return "\n".join(result_lines)
|
||
|
||
|
||
def search_markdown(
|
||
content: str, pattern: str, context_lines: int = 0
|
||
) -> Optional[str]:
|
||
"""使用正则表达式搜索 markdown 文档,返回匹配结果及其上下文"""
|
||
# 边界检查
|
||
if not content:
|
||
return None
|
||
|
||
if context_lines < 0:
|
||
raise ValueError("context_lines 必须为非负整数")
|
||
|
||
try:
|
||
regex = re.compile(pattern)
|
||
except re.error:
|
||
return None
|
||
|
||
lines = content.split("\n")
|
||
|
||
non_empty_indices = []
|
||
non_empty_to_original = {}
|
||
for i, line in enumerate(lines):
|
||
if line.strip():
|
||
non_empty_indices.append(i)
|
||
non_empty_to_original[i] = len(non_empty_indices) - 1
|
||
|
||
matched_non_empty_indices = []
|
||
for orig_idx in non_empty_indices:
|
||
if regex.search(lines[orig_idx]):
|
||
matched_non_empty_indices.append(non_empty_to_original[orig_idx])
|
||
|
||
if not matched_non_empty_indices:
|
||
return None
|
||
|
||
merged_ranges = []
|
||
current_start = matched_non_empty_indices[0]
|
||
current_end = matched_non_empty_indices[0]
|
||
|
||
for idx in matched_non_empty_indices[1:]:
|
||
if idx - current_end <= context_lines * 2:
|
||
current_end = idx
|
||
else:
|
||
merged_ranges.append((current_start, current_end))
|
||
current_start = idx
|
||
current_end = idx
|
||
merged_ranges.append((current_start, current_end))
|
||
|
||
results = []
|
||
for start, end in merged_ranges:
|
||
context_start_idx = max(0, start - context_lines)
|
||
context_end_idx = min(len(non_empty_indices) - 1, end + context_lines)
|
||
|
||
start_line_idx = non_empty_indices[context_start_idx]
|
||
end_line_idx = non_empty_indices[context_end_idx]
|
||
|
||
result_lines = [
|
||
line
|
||
for i, line in enumerate(lines)
|
||
if start_line_idx <= i <= end_line_idx
|
||
]
|
||
results.append("\n".join(result_lines))
|
||
|
||
return "\n---\n".join(results)
|