1
0

增加unstructured处理策略

This commit is contained in:
2026-02-17 20:12:26 +08:00
parent 856700fbe0
commit c693e23888
7 changed files with 603 additions and 730 deletions

View File

@@ -8,6 +8,11 @@ from typing import List, Optional, Tuple
IMAGE_PATTERN = re.compile(r"!\[[^\]]*\]\([^)]+\)")
# unstructured 噪声匹配: pptx 中的 RGB 颜色值(如 "R:255 G:128 B:0"
_RGB_PATTERN = re.compile(r"^R:\d+\s+G:\d+\s+B:\d+$")
# unstructured 噪声匹配: 破折号页码(如 "— 3 —"
_PAGE_NUMBER_PATTERN = re.compile(r"^—\s*\d+\s*—$")
def parse_with_markitdown(
file_path: str,
@@ -80,60 +85,41 @@ def safe_open_zip(zip_file: zipfile.ZipFile, name: str) -> Optional[zipfile.ZipE
return zip_file.open(name)
_CONSECUTIVE_BLANK_LINES = re.compile(r"\n{3,}")
def normalize_markdown_whitespace(content: str) -> str:
"""规范化 Markdown 空白字符,保留单行空行"""
lines = content.split("\n")
result = []
empty_count = 0
return _CONSECUTIVE_BLANK_LINES.sub("\n\n", content)
for line in lines:
stripped = line.strip()
if not stripped:
empty_count += 1
if empty_count == 1:
result.append(line)
else:
empty_count = 0
result.append(line)
return "\n".join(result)
def _is_valid_ooxml(file_path: str, required_files: List[str]) -> bool:
try:
with zipfile.ZipFile(file_path, "r") as zip_file:
names = set(zip_file.namelist())
return all(r in names for r in required_files)
except (zipfile.BadZipFile, zipfile.LargeZipFile):
return False
_DOCX_REQUIRED = ["[Content_Types].xml", "_rels/.rels", "word/document.xml"]
_PPTX_REQUIRED = ["[Content_Types].xml", "_rels/.rels", "ppt/presentation.xml"]
_XLSX_REQUIRED = ["[Content_Types].xml", "_rels/.rels", "xl/workbook.xml"]
def is_valid_docx(file_path: str) -> bool:
"""验证文件是否为有效的 DOCX 格式"""
try:
with zipfile.ZipFile(file_path, "r") as zip_file:
names = set(zip_file.namelist())
required_files = ["[Content_Types].xml", "_rels/.rels", "word/document.xml"]
return all(r in names for r in required_files)
except (zipfile.BadZipFile, zipfile.LargeZipFile):
return False
return _is_valid_ooxml(file_path, _DOCX_REQUIRED)
def is_valid_pptx(file_path: str) -> bool:
"""验证文件是否为有效的 PPTX 格式"""
try:
with zipfile.ZipFile(file_path, "r") as zip_file:
names = set(zip_file.namelist())
required_files = [
"[Content_Types].xml",
"_rels/.rels",
"ppt/presentation.xml",
]
return all(r in names for r in required_files)
except (zipfile.BadZipFile, zipfile.LargeZipFile):
return False
return _is_valid_ooxml(file_path, _PPTX_REQUIRED)
def is_valid_xlsx(file_path: str) -> bool:
"""验证文件是否为有效的 XLSX 格式"""
try:
with zipfile.ZipFile(file_path, "r") as zip_file:
names = set(zip_file.namelist())
required_files = ["[Content_Types].xml", "_rels/.rels", "xl/workbook.xml"]
return all(r in names for r in required_files)
except (zipfile.BadZipFile, zipfile.LargeZipFile):
return False
return _is_valid_ooxml(file_path, _XLSX_REQUIRED)
def is_valid_pdf(file_path: str) -> bool:
@@ -156,12 +142,8 @@ def get_heading_level(line: str) -> int:
stripped = line.lstrip()
if not stripped.startswith("#"):
return 0
level = 0
for char in stripped:
if char == "#":
level += 1
else:
break
without_hash = stripped.lstrip("#")
level = len(stripped) - len(without_hash)
if not (1 <= level <= 6):
return 0
if len(stripped) == level:
@@ -275,9 +257,6 @@ def search_markdown(
start_line_idx = non_empty_indices[context_start_idx]
end_line_idx = non_empty_indices[context_end_idx]
selected_indices = set(
non_empty_indices[context_start_idx : context_end_idx + 1]
)
result_lines = [
line
for i, line in enumerate(lines)
@@ -288,22 +267,71 @@ def search_markdown(
return "\n---\n".join(results)
_FILE_TYPE_VALIDATORS = {
".docx": is_valid_docx,
".pptx": is_valid_pptx,
".xlsx": is_valid_xlsx,
".pdf": is_valid_pdf,
}
def detect_file_type(file_path: str) -> Optional[str]:
"""检测文件类型,返回 'docx''pptx''xlsx''pdf'"""
_, ext = os.path.splitext(file_path)
ext = ext.lower()
if ext == ".docx":
if is_valid_docx(file_path):
return "docx"
elif ext == ".pptx":
if is_valid_pptx(file_path):
return "pptx"
elif ext == ".xlsx":
if is_valid_xlsx(file_path):
return "xlsx"
elif ext == ".pdf":
if is_valid_pdf(file_path):
return "pdf"
ext = os.path.splitext(file_path)[1].lower()
validator = _FILE_TYPE_VALIDATORS.get(ext)
if validator and validator(file_path):
return ext.lstrip(".")
return None
def _unstructured_elements_to_markdown(
elements: list, trust_titles: bool = True
) -> str:
"""将 unstructured 解析出的元素列表转换为 Markdown 文本"""
try:
import markdownify as md_lib
from unstructured.documents.elements import (
Footer,
Header,
Image,
ListItem,
PageBreak,
PageNumber,
Table,
Title,
)
except ImportError:
return "\n\n".join(
el.text for el in elements if hasattr(el, "text") and el.text and el.text.strip()
)
skip_types = (Header, Footer, PageBreak, PageNumber)
parts = []
for el in elements:
if isinstance(el, skip_types):
continue
text = el.text.strip() if hasattr(el, "text") else str(el).strip()
if not text or _RGB_PATTERN.match(text) or _PAGE_NUMBER_PATTERN.match(text):
continue
if isinstance(el, Table):
html = getattr(el.metadata, "text_as_html", None)
if html:
parts.append(md_lib.markdownify(html, strip=["img"]).strip())
else:
parts.append(str(el))
elif isinstance(el, Title) and trust_titles:
depth = getattr(el.metadata, "category_depth", None) or 1
depth = min(max(depth, 1), 4)
parts.append(f"{'#' * depth} {text}")
elif isinstance(el, ListItem):
parts.append(f"- {text}")
elif isinstance(el, Image):
path = getattr(el.metadata, "image_path", None) or ""
if path:
parts.append(f"![image]({path})")
else:
parts.append(text)
return "\n\n".join(parts)