refactor: 重构 Reader 内部工具函数到独立模块

- 新增 scripts/readers/_utils.py 作为 Reader 内部共享工具模块
- 将 parse_with_markitdown 等函数从 core/markdown.py 迁移到 _utils.py
- 函数重命名:parse_with_xxx → parse_via_xxx,_unstructured_elements_to_markdown → convert_unstructured_to_markdown
- 更新 17 个 Reader 实现文件的 import 路径
- 从 core/__init__.py 移除已迁移函数的导出
- 新增测试文件 tests/test_readers/test_utils.py
- 新增 spec 文档 openspec/specs/reader-internal-utils/spec.md

这次重构明确了模块边界:core/ 提供公共 API,readers/_utils.py 提供 Reader 内部工具
This commit is contained in:
2026-03-09 00:56:05 +08:00
parent b80c635f07
commit 1aea561277
22 changed files with 536 additions and 179 deletions

View File

@@ -0,0 +1,95 @@
## Purpose
提供 Reader 内部共享工具模块包含解析器包装函数、格式化工具、ZIP 安全处理和 unstructured 库集成。此模块仅供 readers 包内部使用,不作为公共 API。
## Requirements
### Requirement: 解析器包装函数
系统 SHALL 提供统一的解析器包装函数,封装第三方库的调用细节。
#### Scenario: 使用 MarkItDown 解析
- **WHEN** 调用 `parse_via_markitdown(file_path)`
- **THEN** 系统使用 MarkItDown 库解析文件
- **AND** 成功时返回 `(markdown_content, None)`
- **AND** 失败时返回 `(None, error_message)`
#### Scenario: 使用 docling 解析
- **WHEN** 调用 `parse_via_docling(file_path)`
- **THEN** 系统使用 docling 库解析文件
- **AND** 成功时返回 `(markdown_content, None)`
- **AND** 失败时返回 `(None, error_message)`
#### Scenario: 库未安装时返回友好错误
- **WHEN** 调用解析器包装函数但对应库未安装
- **THEN** 系统返回 `(None, "<库名> 库未安装")`
### Requirement: Markdown 表格格式化
系统 SHALL 提供将二维列表格式化为 Markdown 表格的工具函数。
#### Scenario: 格式化标准表格
- **WHEN** 调用 `build_markdown_table(rows_data)` 且 rows_data 包含表头和数据行
- **THEN** 系统生成标准 Markdown 表格格式
- **AND** 第一行前生成分隔行(`| --- | --- |`
#### Scenario: 空数据返回空字符串
- **WHEN** 调用 `build_markdown_table([])``build_markdown_table([[]])`
- **THEN** 系统返回空字符串
### Requirement: 列表堆栈处理
系统 SHALL 提供列表堆栈处理工具函数,用于处理嵌套列表的格式化输出。
#### Scenario: 刷新列表堆栈
- **WHEN** 调用 `flush_list_stack(list_stack, target)`
- **THEN** 系统将 list_stack 中所有非空项添加到 target 列表
- **AND** 每个项末尾添加换行符
- **AND** 清空 list_stack
#### Scenario: 跳过空项
- **WHEN** list_stack 中包含空字符串
- **THEN** 系统跳过空项,不添加到 target
### Requirement: ZIP 文件安全打开
系统 SHALL 提供安全的 ZIP 文件打开函数,防止路径遍历攻击。
#### Scenario: 打开合法文件
- **WHEN** 调用 `safe_open_zip(zip_file, "valid/file.txt")`
- **THEN** 系统返回对应的 ZipExtFile 对象
#### Scenario: 拒绝路径遍历攻击
- **WHEN** 路径包含 ".." 在 Path.parts 中
- **THEN** 系统返回 None
#### Scenario: 拒绝绝对路径
- **WHEN** 路径为绝对路径
- **THEN** 系统返回 None
#### Scenario: 处理路径异常
- **WHEN** Path() 抛出 ValueError 或 OSError
- **THEN** 系统捕获异常并返回 None
### Requirement: unstructured 元素转换
系统 SHALL 提供将 unstructured 库解析的元素转换为 Markdown 的工具函数。
#### Scenario: 转换标准元素
- **WHEN** 调用 `convert_unstructured_to_markdown(elements, trust_titles=True)`
- **THEN** 系统跳过 Header、Footer、PageBreak、PageNumber 元素
- **AND** 跳过 RGB 颜色值和页码噪声
- **AND** Table 元素转换为 Markdown 表格
- **AND** Title 元素转换为 # 标题(根据 category_depth 确定级别)
- **AND** ListItem 元素转换为 - 列表项
- **AND** Image 元素转换为 ![image](path) 格式
#### Scenario: 库未安装时回退
- **WHEN** markdownify 或 unstructured 库未安装
- **THEN** 系统提取所有元素的 text 属性并用双换行连接
### Requirement: 噪声模式匹配
系统 SHALL 定义 unstructured 库的噪声匹配模式。
#### Scenario: 匹配 RGB 颜色值
- **WHEN** 文本匹配 `_UNSTRUCTURED_RGB_PATTERN`(如 "R:255 G:128 B:0"
- **THEN** 系统将其识别为噪声并过滤
#### Scenario: 匹配页码
- **WHEN** 文本匹配 `_UNSTRUCTURED_PAGE_NUMBER_PATTERN`(如 "— 3 —"
- **THEN** 系统将其识别为噪声并过滤

View File

@@ -8,18 +8,12 @@ from .exceptions import (
DownloadError,
)
from .markdown import (
parse_with_markitdown,
parse_with_docling,
build_markdown_table,
flush_list_stack,
safe_open_zip,
normalize_markdown_whitespace,
remove_markdown_images,
get_heading_level,
extract_titles,
extract_title_content,
search_markdown,
_unstructured_elements_to_markdown,
)
from .parser import parse_input, process_content, output_result
@@ -29,18 +23,12 @@ __all__ = [
"ReaderNotFoundError",
"ParseError",
"DownloadError",
"parse_with_markitdown",
"parse_with_docling",
"build_markdown_table",
"flush_list_stack",
"safe_open_zip",
"normalize_markdown_whitespace",
"remove_markdown_images",
"get_heading_level",
"extract_titles",
"extract_title_content",
"search_markdown",
"_unstructured_elements_to_markdown",
"parse_input",
"process_content",
"output_result",

View File

@@ -1,94 +1,11 @@
"""Markdown 后处理模块,包含所有格式共享的工具函数。"""
"""Markdown 后处理模块,包含 Markdown 格式化的工具函数。"""
import re
import zipfile
from pathlib import Path
from typing import List, Optional, Tuple
from typing import List, Optional
IMAGE_PATTERN = re.compile(r"!\[[^\]]*\]\([^)]+\)")
_CONSECUTIVE_BLANK_LINES = re.compile(r"\n{3,}")
# unstructured 噪声匹配: pptx 中的 RGB 颜色值(如 "R:255 G:128 B:0"
_RGB_PATTERN = re.compile(r"^R:\d+\s+G:\d+\s+B:\d+$")
# unstructured 噪声匹配: 破折号页码(如 "— 3 —"
_PAGE_NUMBER_PATTERN = re.compile(r"^—\s*\d+\s*—$")
def parse_with_markitdown(
file_path: str,
) -> Tuple[Optional[str], Optional[str]]:
"""使用 MarkItDown 库解析文件"""
try:
from markitdown import MarkItDown
md = MarkItDown()
result = md.convert(file_path)
if not result.text_content.strip():
return None, "文档为空"
return result.text_content, None
except ImportError:
return None, "MarkItDown 库未安装"
except Exception as e:
return None, f"MarkItDown 解析失败: {str(e)}"
def parse_with_docling(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 docling 库解析文件"""
try:
from docling.document_converter import DocumentConverter
except ImportError:
return None, "docling 库未安装"
try:
converter = DocumentConverter()
result = converter.convert(file_path)
markdown_content = result.document.export_to_markdown()
if not markdown_content.strip():
return None, "文档为空"
return markdown_content, None
except Exception as e:
return None, f"docling 解析失败: {str(e)}"
def build_markdown_table(rows_data: List[List[str]]) -> str:
"""将二维列表转换为 Markdown 表格格式"""
if not rows_data or not rows_data[0]:
return ""
md_lines = []
for i, row_data in enumerate(rows_data):
row_text = [cell if cell else "" for cell in row_data]
md_lines.append("| " + " | ".join(row_text) + " |")
if i == 0:
md_lines.append("| " + " | ".join(["---"] * len(row_text)) + " |")
return "\n".join(md_lines) + "\n\n"
def flush_list_stack(list_stack: List[str], target: List[str]) -> None:
"""将列表堆栈中的非空项添加到目标列表并清空堆栈"""
for item in list_stack:
if item:
target.append(item + "\n")
list_stack.clear()
def safe_open_zip(zip_file: zipfile.ZipFile, name: str) -> Optional[zipfile.ZipExtFile]:
"""安全地从 ZipFile 中打开文件,防止路径遍历攻击"""
if not name:
return None
try:
normalized = Path(name).as_posix()
# 检查是否包含父目录引用
if ".." in Path(normalized).parts:
return None
# 检查是否为绝对路径
if Path(normalized).is_absolute():
return None
return zip_file.open(name)
except (ValueError, OSError):
return None
def normalize_markdown_whitespace(content: str) -> str:
"""规范化 Markdown 空白字符,保留单行空行"""
@@ -235,56 +152,3 @@ def search_markdown(
results.append("\n".join(result_lines))
return "\n---\n".join(results)
def _unstructured_elements_to_markdown(
elements: list, trust_titles: bool = True
) -> str:
"""将 unstructured 解析出的元素列表转换为 Markdown 文本"""
try:
import markdownify as md_lib
from unstructured.documents.elements import (
Footer,
Header,
Image,
ListItem,
PageBreak,
PageNumber,
Table,
Title,
)
except ImportError:
return "\n\n".join(
el.text for el in elements if hasattr(el, "text") and el.text and el.text.strip()
)
skip_types = (Header, Footer, PageBreak, PageNumber)
parts = []
for el in elements:
if isinstance(el, skip_types):
continue
text = el.text.strip() if hasattr(el, "text") else str(el).strip()
if not text or _RGB_PATTERN.match(text) or _PAGE_NUMBER_PATTERN.match(text):
continue
if isinstance(el, Table):
html = getattr(el.metadata, "text_as_html", None)
if html:
parts.append(md_lib.markdownify(html, strip=["img"]).strip())
else:
parts.append(str(el))
elif isinstance(el, Title) and trust_titles:
depth = getattr(el.metadata, "category_depth", None) or 1
depth = min(max(depth, 1), 4)
parts.append(f"{'#' * depth} {text}")
elif isinstance(el, ListItem):
parts.append(f"- {text}")
elif isinstance(el, Image):
path = getattr(el.metadata, "image_path", None) or ""
if path:
parts.append(f"![image]({path})")
else:
parts.append(text)
return "\n\n".join(parts)

207
scripts/readers/_utils.py Normal file
View File

@@ -0,0 +1,207 @@
"""Reader 内部共享工具模块。
此模块包含各 reader 实现共享的内部工具函数,仅供 readers 包内部使用。
"""
import re
import zipfile
from pathlib import Path
from typing import List, Optional, Tuple
# ============================================================================
# 通用解析器包装函数
# ============================================================================
def parse_via_markitdown(
file_path: str,
) -> Tuple[Optional[str], Optional[str]]:
"""使用 MarkItDown 库解析文件。
Args:
file_path: 文件路径
Returns:
(markdown_content, error_message): 成功时 (content, None),失败时 (None, error)
"""
try:
from markitdown import MarkItDown
md = MarkItDown()
result = md.convert(file_path)
if not result.text_content.strip():
return None, "文档为空"
return result.text_content, None
except ImportError:
return None, "MarkItDown 库未安装"
except Exception as e:
return None, f"MarkItDown 解析失败: {str(e)}"
def parse_via_docling(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 docling 库解析文件。
Args:
file_path: 文件路径
Returns:
(markdown_content, error_message): 成功时 (content, None),失败时 (None, error)
"""
try:
from docling.document_converter import DocumentConverter
except ImportError:
return None, "docling 库未安装"
try:
converter = DocumentConverter()
result = converter.convert(file_path)
markdown_content = result.document.export_to_markdown()
if not markdown_content.strip():
return None, "文档为空"
return markdown_content, None
except Exception as e:
return None, f"docling 解析失败: {str(e)}"
# ============================================================================
# 格式化工具
# ============================================================================
def build_markdown_table(rows_data: List[List[str]]) -> str:
"""将二维列表格式化为 Markdown 表格。
Args:
rows_data: 二维列表,第一行为表头
Returns:
Markdown 格式的表格字符串
"""
if not rows_data or not rows_data[0]:
return ""
md_lines = []
for i, row_data in enumerate(rows_data):
row_text = [cell if cell else "" for cell in row_data]
md_lines.append("| " + " | ".join(row_text) + " |")
if i == 0:
md_lines.append("| " + " | ".join(["---"] * len(row_text)) + " |")
return "\n".join(md_lines) + "\n\n"
# ============================================================================
# 列表处理工具
# ============================================================================
def flush_list_stack(list_stack: List[str], target: List[str]) -> None:
"""将列表堆栈中的非空项添加到目标列表并清空堆栈。
用于处理嵌套列表的格式化输出。
Args:
list_stack: 列表堆栈
target: 目标列表
"""
for item in list_stack:
if item:
target.append(item + "\n")
list_stack.clear()
# ============================================================================
# ZIP 文件安全处理
# ============================================================================
def safe_open_zip(zip_file: zipfile.ZipFile, name: str) -> Optional[zipfile.ZipExtFile]:
"""安全地从 ZipFile 中打开文件,防止路径遍历攻击。
Args:
zip_file: ZipFile 对象
name: 文件名
Returns:
ZipExtFile 对象,如果路径不安全则返回 None
"""
if not name:
return None
try:
normalized = Path(name).as_posix()
# 检查是否包含父目录引用
if ".." in Path(normalized).parts:
return None
# 检查是否为绝对路径
if Path(normalized).is_absolute():
return None
return zip_file.open(name)
except (ValueError, OSError, KeyError):
return None
# ============================================================================
# unstructured 库相关
# ============================================================================
# unstructured 噪声匹配模式
_UNSTRUCTURED_RGB_PATTERN = re.compile(r"^R:\d+\s+G:\d+\s+B:\d+$")
_UNSTRUCTURED_PAGE_NUMBER_PATTERN = re.compile(r"^—\s*\d+\s*—$")
def convert_unstructured_to_markdown(
elements: list, trust_titles: bool = True
) -> str:
"""将 unstructured 解析出的元素列表转换为 Markdown 文本。
Args:
elements: unstructured 解析的元素列表
trust_titles: 是否信任 unstructured 的标题检测
Returns:
Markdown 格式的文本
"""
try:
import markdownify as md_lib
from unstructured.documents.elements import (
Footer,
Header,
Image,
ListItem,
PageBreak,
PageNumber,
Table,
Title,
)
except ImportError:
return "\n\n".join(
el.text for el in elements if hasattr(el, "text") and el.text and el.text.strip()
)
skip_types = (Header, Footer, PageBreak, PageNumber)
parts = []
for el in elements:
if isinstance(el, skip_types):
continue
text = el.text.strip() if hasattr(el, "text") else str(el).strip()
if not text or _UNSTRUCTURED_RGB_PATTERN.match(text) or _UNSTRUCTURED_PAGE_NUMBER_PATTERN.match(text):
continue
if isinstance(el, Table):
html = getattr(el.metadata, "text_as_html", None)
if html:
parts.append(md_lib.markdownify(html, strip=["img"]).strip())
else:
parts.append(str(el))
elif isinstance(el, Title) and trust_titles:
depth = getattr(el.metadata, "category_depth", None) or 1
depth = min(max(depth, 1), 4)
parts.append(f"{'#' * depth} {text}")
elif isinstance(el, ListItem):
parts.append(f"- {text}")
elif isinstance(el, Image):
path = getattr(el.metadata, "image_path", None) or ""
if path:
parts.append(f"![image]({path})")
else:
parts.append(text)
return "\n\n".join(parts)

View File

@@ -2,9 +2,9 @@
from typing import Optional, Tuple
from scripts.core import parse_with_docling
from scripts.readers._utils import parse_via_docling
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 docling 库解析 DOCX 文件"""
return parse_with_docling(file_path)
return parse_via_docling(file_path)

View File

@@ -2,9 +2,9 @@
from typing import Optional, Tuple
from scripts.core import parse_with_markitdown
from scripts.readers._utils import parse_via_markitdown
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 MarkItDown 库解析 DOCX 文件"""
return parse_with_markitdown(file_path)
return parse_via_markitdown(file_path)

View File

@@ -4,7 +4,7 @@ import xml.etree.ElementTree as ET
import zipfile
from typing import Any, Dict, List, Optional, Tuple
from scripts.core import build_markdown_table, safe_open_zip
from scripts.readers._utils import build_markdown_table, safe_open_zip
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:

View File

@@ -2,7 +2,7 @@
from typing import Any, List, Optional, Tuple
from scripts.core import build_markdown_table
from scripts.readers._utils import build_markdown_table
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:

View File

@@ -2,7 +2,7 @@
from typing import Optional, Tuple
from scripts.core import _unstructured_elements_to_markdown
from scripts.readers._utils import convert_unstructured_to_markdown
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
@@ -14,7 +14,7 @@ def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
try:
elements = partition_docx(filename=file_path, infer_table_structure=True)
content = _unstructured_elements_to_markdown(elements)
content = convert_unstructured_to_markdown(elements)
if not content.strip():
return None, "文档为空"
return content, None

View File

@@ -2,9 +2,9 @@
from typing import Optional, Tuple
from scripts.core import parse_with_markitdown
from scripts.readers._utils import parse_via_markitdown
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 MarkItDown 库解析 PDF 文件"""
return parse_with_markitdown(file_path)
return parse_via_markitdown(file_path)

View File

@@ -2,7 +2,7 @@
from typing import Optional, Tuple
from scripts.core import _unstructured_elements_to_markdown
from scripts.readers._utils import convert_unstructured_to_markdown
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
@@ -20,7 +20,7 @@ def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
languages=["chi_sim"],
)
# fast 策略不做版面分析Title 类型标注不可靠
content = _unstructured_elements_to_markdown(elements, trust_titles=False)
content = convert_unstructured_to_markdown(elements, trust_titles=False)
if not content.strip():
return None, "文档为空"
return content, None

View File

@@ -2,7 +2,7 @@
from typing import Optional, Tuple
from scripts.core import _unstructured_elements_to_markdown
from scripts.readers._utils import convert_unstructured_to_markdown
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
@@ -26,7 +26,7 @@ def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
ocr_agent=OCR_AGENT_PADDLE,
table_ocr_agent=OCR_AGENT_PADDLE,
)
content = _unstructured_elements_to_markdown(elements, trust_titles=True)
content = convert_unstructured_to_markdown(elements, trust_titles=True)
if not content.strip():
return None, "文档为空"
return content, None

View File

@@ -2,9 +2,9 @@
from typing import Optional, Tuple
from scripts.core import parse_with_docling
from scripts.readers._utils import parse_via_docling
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 docling 库解析 PPTX 文件"""
return parse_with_docling(file_path)
return parse_via_docling(file_path)

View File

@@ -2,9 +2,9 @@
from typing import Optional, Tuple
from scripts.core import parse_with_markitdown
from scripts.readers._utils import parse_via_markitdown
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 MarkItDown 库解析 PPTX 文件"""
return parse_with_markitdown(file_path)
return parse_via_markitdown(file_path)

View File

@@ -5,7 +5,7 @@ import xml.etree.ElementTree as ET
import zipfile
from typing import Any, List, Optional, Tuple
from scripts.core import build_markdown_table, flush_list_stack
from scripts.readers._utils import build_markdown_table, flush_list_stack
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:

View File

@@ -2,7 +2,7 @@
from typing import Any, List, Optional, Tuple
from scripts.core import build_markdown_table, flush_list_stack
from scripts.readers._utils import build_markdown_table, flush_list_stack
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:

View File

@@ -2,7 +2,7 @@
from typing import Optional, Tuple
from scripts.core import _unstructured_elements_to_markdown
from scripts.readers._utils import convert_unstructured_to_markdown
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
@@ -16,7 +16,7 @@ def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
elements = partition_pptx(
filename=file_path, infer_table_structure=True, include_metadata=True
)
content = _unstructured_elements_to_markdown(elements)
content = convert_unstructured_to_markdown(elements)
if not content.strip():
return None, "文档为空"
return content, None

View File

@@ -2,9 +2,9 @@
from typing import Optional, Tuple
from scripts.core import parse_with_docling
from scripts.readers._utils import parse_via_docling
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 docling 库解析 XLSX 文件"""
return parse_with_docling(file_path)
return parse_via_docling(file_path)

View File

@@ -2,9 +2,9 @@
from typing import Optional, Tuple
from scripts.core import parse_with_markitdown
from scripts.readers._utils import parse_via_markitdown
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 MarkItDown 库解析 XLSX 文件"""
return parse_with_markitdown(file_path)
return parse_via_markitdown(file_path)

View File

@@ -4,7 +4,7 @@ import xml.etree.ElementTree as ET
import zipfile
from typing import List, Optional, Tuple
from scripts.core import build_markdown_table, safe_open_zip
from scripts.readers._utils import build_markdown_table, safe_open_zip
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:

View File

@@ -2,7 +2,7 @@
from typing import Optional, Tuple
from scripts.core import _unstructured_elements_to_markdown
from scripts.readers._utils import convert_unstructured_to_markdown
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
@@ -14,7 +14,7 @@ def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
try:
elements = partition_xlsx(filename=file_path, infer_table_structure=True)
content = _unstructured_elements_to_markdown(elements)
content = convert_unstructured_to_markdown(elements)
if not content.strip():
return None, "文档为空"
return content, None

View File

@@ -0,0 +1,203 @@
"""测试 Reader 内部工具函数。"""
import zipfile
import pytest
from scripts.readers._utils import (
parse_via_markitdown,
parse_via_docling,
build_markdown_table,
flush_list_stack,
safe_open_zip,
convert_unstructured_to_markdown,
_UNSTRUCTURED_RGB_PATTERN,
_UNSTRUCTURED_PAGE_NUMBER_PATTERN,
)
class TestBuildMarkdownTable:
"""测试 build_markdown_table 函数。"""
def test_standard_table(self):
"""测试标准表格格式化。"""
rows_data = [["姓名", "年龄"], ["张三", "25"], ["李四", "30"]]
result = build_markdown_table(rows_data)
assert "| 姓名 | 年龄 |" in result
assert "| --- | --- |" in result
assert "| 张三 | 25 |" in result
assert "| 李四 | 30 |" in result
def test_empty_table(self):
"""测试空表格。"""
assert build_markdown_table([]) == ""
assert build_markdown_table([[]]) == ""
def test_table_with_empty_cells(self):
"""测试包含空单元格的表格。"""
rows_data = [["A", "B"], ["", "C"], ["D", ""]]
result = build_markdown_table(rows_data)
assert "| A | B |" in result
assert "| | C |" in result
assert "| D | |" in result
class TestFlushListStack:
"""测试 flush_list_stack 函数。"""
def test_flush_non_empty_items(self):
"""测试刷新非空堆栈。"""
list_stack = ["item1\n", "", "item2\n"]
target = []
flush_list_stack(list_stack, target)
assert target == ["item1\n\n", "item2\n\n"]
assert list_stack == []
def test_flush_all_empty(self):
"""测试刷新空堆栈。"""
list_stack = ["", "", ""]
target = []
flush_list_stack(list_stack, target)
assert target == []
assert list_stack == []
class TestSafeOpenZip:
"""测试 safe_open_zip 函数。"""
def test_open_valid_file(self, tmp_path):
"""测试打开合法文件。"""
# 创建测试 ZIP 文件
zip_path = tmp_path / "test.zip"
with zipfile.ZipFile(zip_path, "w") as zf:
zf.writestr("valid.txt", "content")
with zipfile.ZipFile(zip_path, "r") as zf:
result = safe_open_zip(zf, "valid.txt")
assert result is not None
assert result.read() == b"content"
def test_reject_path_traversal(self, tmp_path):
"""测试拒绝路径遍历攻击。"""
zip_path = tmp_path / "test.zip"
with zipfile.ZipFile(zip_path, "w") as zf:
zf.writestr("safe.txt", "content")
with zipfile.ZipFile(zip_path, "r") as zf:
assert safe_open_zip(zf, "../etc/passwd") is None
assert safe_open_zip(zf, "sub/../../etc/passwd") is None
def test_reject_absolute_path(self, tmp_path):
"""测试拒绝绝对路径。"""
zip_path = tmp_path / "test.zip"
with zipfile.ZipFile(zip_path, "w") as zf:
zf.writestr("safe.txt", "content")
with zipfile.ZipFile(zip_path, "r") as zf:
assert safe_open_zip(zf, "/absolute/path.txt") is None
assert safe_open_zip(zf, "C:\\Windows\\System32\\config") is None
def test_empty_name(self):
"""测试空文件名。"""
import io
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, "w") as zf:
zf.writestr("test.txt", "content")
with zipfile.ZipFile(io.BytesIO(zip_buffer.getvalue()), "r") as zf:
assert safe_open_zip(zf, "") is None
class TestUnstructuredPatterns:
"""测试 unstructured 噪声匹配模式。"""
def test_rgb_pattern(self):
"""测试 RGB 颜色值模式。"""
assert _UNSTRUCTURED_RGB_PATTERN.match("R:255 G:128 B:0")
assert _UNSTRUCTURED_RGB_PATTERN.match("R:0 G:0 B:0")
assert _UNSTRUCTURED_RGB_PATTERN.match("R:255 G:255 B:255")
def test_rgb_pattern_invalid(self):
"""测试无效 RGB 值。"""
assert not _UNSTRUCTURED_RGB_PATTERN.match("255 128 0")
assert not _UNSTRUCTURED_RGB_PATTERN.match("RGB:255 G:128 B:0")
def test_page_number_pattern(self):
"""测试页码模式。"""
assert _UNSTRUCTURED_PAGE_NUMBER_PATTERN.match("— 3 —")
assert _UNSTRUCTURED_PAGE_NUMBER_PATTERN.match("— 123 —")
assert _UNSTRUCTURED_PAGE_NUMBER_PATTERN.match("— 1 —")
def test_page_number_pattern_invalid(self):
"""测试无效页码。"""
assert not _UNSTRUCTURED_PAGE_NUMBER_PATTERN.match("Page 3")
assert not _UNSTRUCTURED_PAGE_NUMBER_PATTERN.match("--- 3 ---")
class TestConvertUnstructuredToMarkdown:
"""测试 convert_unstructured_to_markdown 函数。"""
def test_skip_rgb_pattern(self):
"""测试跳过 RGB 噪声。"""
try:
from unstructured.documents.elements import Text
except ImportError:
pytest.skip("unstructured 库未安装")
elements = [Text("R:255 G:128 B:0"), Text("正常文本")]
result = convert_unstructured_to_markdown(elements)
assert "R:255 G:128 B:0" not in result
assert "正常文本" in result
def test_skip_page_number_pattern(self):
"""测试跳过页码噪声。"""
try:
from unstructured.documents.elements import Text
except ImportError:
pytest.skip("unstructured 库未安装")
elements = [Text("— 3 —"), Text("正常文本")]
result = convert_unstructured_to_markdown(elements)
assert "— 3 —" not in result
assert "正常文本" in result
def test_convert_without_markdownify(self):
"""测试未安装 markdownify 时的回退行为。"""
# 创建简单的 mock 对象
class MockElement:
def __init__(self, text):
self.text = text
elements = [MockElement("文本1"), MockElement("文本2")]
result = convert_unstructured_to_markdown(elements)
# 应该回退到简单连接文本
assert "文本1" in result
assert "文本2" in result
class TestParseViaMarkitdown:
"""测试 parse_via_markitdown 函数。"""
def test_parse_nonexistent_file(self):
"""测试解析不存在的文件。"""
content, error = parse_via_markitdown("/nonexistent/file.txt")
assert content is None
assert error is not None
class TestParseViaDocling:
"""测试 parse_via_docling 函数。"""
def test_parse_nonexistent_file(self):
"""测试解析不存在的文件。"""
content, error = parse_via_docling("/nonexistent/file.txt")
assert content is None
assert error is not None