309 lines
12 KiB
Python
309 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""DOCX 文件解析模块,提供多种解析方法。"""
|
|
|
|
import xml.etree.ElementTree as ET
|
|
import zipfile
|
|
from typing import Any, List, Optional, Tuple
|
|
|
|
from common import (
|
|
_unstructured_elements_to_markdown,
|
|
build_markdown_table,
|
|
parse_with_docling,
|
|
parse_with_markitdown,
|
|
safe_open_zip,
|
|
)
|
|
|
|
|
|
def parse_docx_with_docling(file_path: str) -> Tuple[Optional[str], Optional[str]]:
    """Parse a DOCX file by delegating to the shared docling helper.

    Args:
        file_path: Path of the DOCX file, passed through unchanged.

    Returns:
        Whatever ``parse_with_docling`` returns for ``file_path``; the
        signature declares it as a ``(content, error)`` pair.
    """
    outcome = parse_with_docling(file_path)
    return outcome
|
|
|
|
|
|
def parse_docx_with_unstructured(file_path: str) -> Tuple[Optional[str], Optional[str]]:
    """Parse a DOCX file with the ``unstructured`` library.

    Args:
        file_path: Path of the DOCX file to partition.

    Returns:
        ``(markdown, None)`` when the converted text is non-blank, otherwise
        ``(None, message)`` describing why nothing was produced (library not
        installed, blank document, or an exception during partitioning or
        conversion).
    """
    try:
        from unstructured.partition.docx import partition_docx
    except ImportError:
        return None, "unstructured 库未安装"

    try:
        markdown = _unstructured_elements_to_markdown(
            partition_docx(filename=file_path, infer_table_structure=True)
        )
        # The blank check stays inside the try so that a failure while
        # inspecting the result is reported as a parse failure too.
        has_text = bool(markdown.strip())
    except Exception as exc:
        return None, f"unstructured 解析失败: {exc}"

    return (markdown, None) if has_text else (None, "文档为空")
|
|
|
|
|
|
def parse_docx_with_pypandoc(file_path: str) -> Tuple[Optional[str], Optional[str]]:
    """Parse a DOCX file to Markdown with pypandoc (pypandoc-binary).

    Args:
        file_path: Path of the DOCX file to convert.

    Returns:
        ``(markdown, None)`` on success, where ``markdown`` is the converted
        text with surrounding whitespace stripped; otherwise
        ``(None, message)``. This function never raises: every failure is
        reported through the message, like the other parsers in this module.
    """
    try:
        import pypandoc
    except ImportError:
        return None, "pypandoc-binary 库未安装"

    try:
        content = pypandoc.convert_file(
            source_file=file_path,
            to="md",
            format="docx",
            outputfile=None,
            # Disable hard wrapping so each paragraph stays on one line.
            extra_args=["--wrap=none"],
        )
    except OSError as exc:
        return None, f"pypandoc-binary 缺少 Pandoc 可执行文件: {exc}"
    except Exception as exc:
        # Previously only RuntimeError was handled here; any other exception
        # escaped to the caller and broke the (content, error) contract.
        return None, f"pypandoc-binary 解析失败: {exc}"

    content = content.strip()
    if not content:
        return None, "文档为空"
    return content, None
|
|
|
|
|
|
def parse_docx_with_markitdown(file_path: str) -> Tuple[Optional[str], Optional[str]]:
    """Parse a DOCX file by delegating to the shared MarkItDown helper.

    Args:
        file_path: Path of the DOCX file, passed through unchanged.

    Returns:
        Whatever ``parse_with_markitdown`` returns for ``file_path``; the
        signature declares it as a ``(content, error)`` pair.
    """
    outcome = parse_with_markitdown(file_path)
    return outcome
|
|
|
|
|
|
def parse_docx_with_python_docx(file_path: str) -> Tuple[Optional[str], Optional[str]]:
    """Parse a DOCX file into Markdown using python-docx.

    Body paragraphs and tables are walked in document order. Paragraph styles
    named "Title" / "Heading 1".."Heading 6" become ATX headings, bullet and
    number list styles become ``-`` / ``1.`` items, tables are rendered with
    ``build_markdown_table``, and bold / italic / underlined runs are wrapped
    in ``**``, ``*`` and ``<u>`` respectively.

    Args:
        file_path: Path of the DOCX file to read.

    Returns:
        ``(markdown, None)`` on success, or ``(None, message)`` when
        python-docx is not installed, the document yields no text, or any
        exception is raised while reading or converting it.
    """
    try:
        from docx import Document
    except ImportError:
        return None, "python-docx 库未安装"

    heading_levels = {
        "Title": 1, "Heading 1": 1, "Heading 2": 2, "Heading 3": 3,
        "Heading 4": 4, "Heading 5": 5, "Heading 6": 6,
    }
    list_styles = {"Bullet": "bullet", "Number": "number"}

    def get_heading_level(para: Any) -> int:
        """Return the heading depth for the paragraph's style, or 0."""
        if para.style and para.style.name:
            return heading_levels.get(para.style.name, 0)
        return 0

    def get_list_style(para: Any) -> Optional[str]:
        """Return "bullet" or "number" for list-styled paragraphs, else None."""
        if not para.style or not para.style.name:
            return None
        style_name = para.style.name
        if style_name in list_styles:
            return list_styles[style_name]
        if style_name.startswith("List Bullet"):
            return "bullet"
        if style_name.startswith("List Number"):
            return "number"
        return None

    def convert_runs_to_markdown(runs: List[Any]) -> str:
        """Join run texts, wrapping formatted runs in Markdown/HTML markers."""
        parts = []
        for run in runs:
            text = run.text
            if not text:
                continue
            core = text.strip()
            if not core:
                # Whitespace-only run: keep the spacing, but never emit
                # markers around nothing (e.g. "** **").
                parts.append(text)
                continue
            # Markdown emphasis delimiters must touch the text they wrap, so
            # surrounding whitespace is moved outside the markers.
            lead_len = len(text) - len(text.lstrip())
            lead = text[:lead_len]
            trail = text[lead_len + len(core):]
            if run.bold:
                core = f"**{core}**"
            if run.italic:
                core = f"*{core}*"
            if run.underline:
                core = f"<u>{core}</u>"
            parts.append(f"{lead}{core}{trail}")
        return "".join(parts)

    def convert_table_to_markdown(table: Any) -> str:
        """Render a python-docx table through ``build_markdown_table``."""
        rows_data = [
            [cell.text.strip().replace("\n", " ") for cell in row.cells]
            for row in table.rows
        ]
        return build_markdown_table(rows_data)

    try:
        # Imported inside the try so that a failure here is reported as a
        # parse failure, exactly like any other error below.
        from docx.table import Table as DocxTable
        from docx.text.paragraph import Paragraph

        doc = Document(file_path)
        markdown_lines: List[str] = []
        prev_was_list = False

        def ensure_blank_line() -> None:
            """Append a separator line unless the output already ends with one."""
            if markdown_lines and markdown_lines[-1] != "":
                markdown_lines.append("")

        for element in doc.element.body:
            if element.tag.endswith("}p"):
                para = Paragraph(element, doc)
                # Strip so leading indentation cannot turn the paragraph into
                # an indented Markdown code block.
                text = convert_runs_to_markdown(para.runs).strip()
                if not text:
                    continue

                heading_level = get_heading_level(para)
                # Heading styles take precedence over list styles.
                list_style = None if heading_level > 0 else get_list_style(para)

                if list_style in ("bullet", "number"):
                    if not prev_was_list:
                        ensure_blank_line()
                    marker = "-" if list_style == "bullet" else "1."
                    markdown_lines.append(f"{marker} {text}")
                    prev_was_list = True
                    continue

                if prev_was_list:
                    # Close the list first; otherwise the next line can be
                    # read as a continuation of the last list item.
                    ensure_blank_line()
                    prev_was_list = False

                if heading_level > 0:
                    markdown_lines.append(f"{'#' * heading_level} {text}")
                else:
                    markdown_lines.append(text)
                    markdown_lines.append("")

            elif element.tag.endswith("}tbl"):
                table_md = convert_table_to_markdown(DocxTable(element, doc))
                if table_md:
                    if prev_was_list:
                        # A table row directly after a list item would be
                        # absorbed into that item instead of starting a table.
                        ensure_blank_line()
                    markdown_lines.append(table_md)
                    markdown_lines.append("")
                    prev_was_list = False

        content = "\n".join(markdown_lines)
        if not content.strip():
            return None, "文档为空"
        return content, None
    except Exception as e:
        return None, f"python-docx 解析失败: {e}"
|
|
|
|
|
|
def parse_docx_with_xml(file_path: str) -> Tuple[Optional[str], Optional[str]]:
    """Parse a DOCX file into Markdown by reading its XML parts directly.

    ``word/styles.xml`` is read (best effort) to map style ids to heading
    levels and list kinds by style name; ``word/document.xml`` is then walked
    in body order. Headings become ATX headings, list paragraphs become
    ``-`` / ``1.`` items, tables are rendered with ``build_markdown_table``,
    and bold / italic runs are wrapped in ``**`` / ``*``.

    Args:
        file_path: Path of the DOCX (zip) file to read.

    Returns:
        ``(markdown, None)`` on success, or ``(None, message)`` when
        ``document.xml`` or its body is missing, the document yields no text,
        or any exception is raised while reading or converting it.
    """
    word_namespace = "http://schemas.openxmlformats.org/wordprocessingml/2006/main"
    namespaces = {"w": word_namespace}

    def qualified(local_name: str) -> str:
        """Return the Clark-notation tag/attribute name in the w: namespace."""
        return f"{{{word_namespace}}}{local_name}"

    val_attr = qualified("val")
    paragraph_tag = qualified("p")
    table_tag = qualified("tbl")
    text_tag = qualified("t")
    # Tabs and breaks inside a run are rendered as a single space so adjacent
    # words are not fused and line-based Markdown structure is preserved.
    spacing_tags = {qualified("tab"), qualified("br"), qualified("cr")}
    # On/off properties are enabled when present unless w:val says otherwise.
    off_values = {"0", "false", "off"}

    style_name_to_heading = {
        "title": 1, "heading 1": 1, "heading 2": 2, "heading 3": 3,
        "heading 4": 4, "heading 5": 5, "heading 6": 6,
    }

    def is_toggle_on(run: Any, prop: str) -> bool:
        """Return True if the run's own rPr enables the toggle ``prop``."""
        # Direct path only: a descendant search would also match properties
        # recorded under w:rPrChange (the formatting before a tracked change).
        elem = run.find(f"w:rPr/w:{prop}", namespaces=namespaces)
        if elem is None:
            return False
        return elem.get(val_attr, "true").lower() not in off_values

    def run_to_markdown(run: Any) -> str:
        """Return the run's text with bold/italic markers applied."""
        pieces = []
        # Only direct children are read; a run may hold several w:t elements
        # separated by tabs or breaks, and all of them carry text.
        for node in run:
            if node.tag == text_tag:
                pieces.append(node.text or "")
            elif node.tag in spacing_tags:
                pieces.append(" ")
        text = "".join(pieces)
        core = text.strip()
        if not core:
            # Nothing visible to emphasise; keep spacing, emit no markers.
            return text
        # Emphasis delimiters must touch the text they wrap, so surrounding
        # whitespace is moved outside the markers.
        lead_len = len(text) - len(text.lstrip())
        lead = text[:lead_len]
        trail = text[lead_len + len(core):]
        if is_toggle_on(run, "b"):
            core = f"**{core}**"
        if is_toggle_on(run, "i"):
            core = f"*{core}*"
        return f"{lead}{core}{trail}"

    def extract_text_with_formatting(container: Any) -> str:
        """Return the stripped, formatted text of every run under ``container``."""
        runs = container.findall(".//w:r", namespaces=namespaces)
        return "".join(run_to_markdown(run) for run in runs).strip()

    def convert_table_to_markdown(table_elem: Any) -> str:
        """Render a w:tbl element through ``build_markdown_table``."""
        # NOTE: these are descendant searches, so rows and cells of tables
        # nested inside a cell are collected as well.
        rows = table_elem.findall(".//w:tr", namespaces=namespaces)
        if not rows:
            return ""
        rows_data = []
        for row in rows:
            cell_texts = []
            for cell in row.findall(".//w:tc", namespaces=namespaces):
                # Join the cell's paragraphs with a space so that
                # multi-paragraph cells do not run their lines together.
                paragraph_texts = (
                    extract_text_with_formatting(para)
                    for para in cell.findall(".//w:p", namespaces=namespaces)
                )
                cell_text = " ".join(text for text in paragraph_texts if text)
                cell_texts.append(cell_text.replace("\n", " ").strip())
            if cell_texts:
                rows_data.append(cell_texts)
        return build_markdown_table(rows_data)

    def load_style_maps(zip_file: Any) -> Tuple[dict, dict]:
        """Return ``(style_id -> heading level, style_id -> list kind)``."""
        style_to_level: dict = {}
        style_to_list: dict = {}
        try:
            styles_file = safe_open_zip(zip_file, "word/styles.xml")
            if styles_file:
                styles_root = ET.parse(styles_file).getroot()
                for style in styles_root.findall(".//w:style", namespaces=namespaces):
                    style_id = style.get(qualified("styleId"))
                    name_elem = style.find("w:name", namespaces=namespaces)
                    if not style_id or name_elem is None:
                        continue
                    style_name = name_elem.get(val_attr)
                    if not style_name:
                        continue
                    name = style_name.lower()
                    if name in style_name_to_heading:
                        style_to_level[style_id] = style_name_to_heading[name]
                    elif name.startswith("list bullet") or name == "bullet":
                        style_to_list[style_id] = "bullet"
                    elif name.startswith("list number") or name == "number":
                        style_to_list[style_id] = "number"
        except Exception:
            # Deliberately best effort: without usable styles the document is
            # still converted, just without heading/list detection. Entries
            # collected before the failure are kept.
            pass
        return style_to_level, style_to_list

    try:
        markdown_lines: List[str] = []
        prev_was_list = False

        def ensure_blank_line() -> None:
            """Append a separator line unless the output already ends with one."""
            if markdown_lines and markdown_lines[-1] != "":
                markdown_lines.append("")

        with zipfile.ZipFile(file_path) as zip_file:
            style_to_level, style_to_list = load_style_maps(zip_file)

            document_file = safe_open_zip(zip_file, "word/document.xml")
            if not document_file:
                return None, "document.xml 不存在或无法访问"

            root = ET.parse(document_file).getroot()
            body = root.find(".//w:body", namespaces=namespaces)
            if body is None:
                return None, "document.xml 中未找到 w:body 元素"

            for child in body:
                if child.tag == paragraph_tag:
                    para_text = extract_text_with_formatting(child)
                    if not para_text:
                        continue

                    # Read the paragraph's own style only; a descendant search
                    # could return a superseded style kept under w:pPrChange.
                    style_elem = child.find("w:pPr/w:pStyle", namespaces=namespaces)
                    style_id = style_elem.get(val_attr) if style_elem is not None else None
                    heading_level = style_to_level.get(style_id, 0)
                    list_style = style_to_list.get(style_id)

                    if heading_level == 0 and list_style in ("bullet", "number"):
                        if not prev_was_list:
                            ensure_blank_line()
                        marker = "-" if list_style == "bullet" else "1."
                        markdown_lines.append(f"{marker} {para_text}")
                        prev_was_list = True
                        continue

                    if prev_was_list:
                        # Close the list first; otherwise the next line can be
                        # read as a continuation of the last list item.
                        ensure_blank_line()
                        prev_was_list = False

                    if heading_level > 0:
                        markdown_lines.append(f"{'#' * heading_level} {para_text}")
                    else:
                        markdown_lines.append(para_text)
                        markdown_lines.append("")

                elif child.tag == table_tag:
                    table_md = convert_table_to_markdown(child)
                    if table_md:
                        if prev_was_list:
                            ensure_blank_line()
                        markdown_lines.append(table_md)
                        markdown_lines.append("")
                        prev_was_list = False

        content = "\n".join(markdown_lines)
        if not content.strip():
            return None, "文档为空"
        return content, None
    except Exception as e:
        return None, f"XML 解析失败: {e}"
|