Files
lyxy-document/readers/xlsx/native_xml.py
lanyuanxiaoyao 833018d451 feat: 统一文档解析器项目 - 迁移 lyxy-reader-office 和 lyxy-reader-html
## 功能特性

- 建立统一的项目结构,包含 core/、readers/、utils/、tests/ 模块
- 迁移 lyxy-reader-office 的所有解析器(docx、xlsx、pptx、pdf)
- 迁移 lyxy-reader-html 的所有解析器(html、url 下载)
- 统一 CLI 入口为 lyxy_document_reader.py
- 统一 Markdown 后处理逻辑
- 按文件类型组织 readers,每个解析器独立文件
- 依赖分组按文件类型细分(docx、xlsx、pptx、pdf、html、http)
- PDF OCR 解析器优先,无参数控制
- 使用 logging 模块替代简单 print
- 设计完整的单元测试结构
- 重写项目文档

## 新增目录/文件

- core/ - 核心模块(异常体系、Markdown 工具、解析调度器)
- readers/ - 格式阅读器(base.py + docx/xlsx/pptx/pdf/html)
- utils/ - 工具函数(文件类型检测)
- tests/ - 测试(conftest.py + test_core/ + test_readers/ + test_utils/)
- lyxy_document_reader.py - 统一 CLI 入口

## 依赖分组

- docx - DOCX 文档解析支持
- xlsx - XLSX 文档解析支持
- pptx - PPTX 文档解析支持
- pdf - PDF 文档解析支持(含 OCR)
- html - HTML/URL 解析支持
- http - HTTP/URL 下载支持
- office - Office 格式组合(docx/xlsx/pptx/pdf)
- web - Web 格式组合(html/http)
- full - 完整功能
- dev - 开发依赖
2026-03-08 13:46:37 +08:00

226 lines
8.6 KiB
Python

"""使用 XML 原生解析 XLSX 文件"""
import xml.etree.ElementTree as ET
import zipfile
from typing import List, Optional, Tuple
from core import build_markdown_table, safe_open_zip
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
    """Convert an XLSX workbook to Markdown by reading its XML parts directly.

    The workbook is opened as a ZIP archive. Sheet names come from
    ``xl/workbook.xml``, sheet locations from ``xl/_rels/workbook.xml.rels``
    (falling back to ``xl/worksheets/sheet<N>.xml`` when no relationship is
    found) and string values from ``xl/sharedStrings.xml``. Every sheet is
    rendered as a ``## <name>`` section holding one Markdown table whose
    header is the first non-empty row; columns that are blank in every row
    are omitted.

    Args:
        file_path: Path of the XLSX file to read.

    Returns:
        ``(markdown, None)`` on success, or ``(None, error_message)`` when
        the workbook part is missing, no sheet is declared, or any other
        exception is raised while reading the archive. This function does
        not raise.
    """
    xlsx_namespace = {
        "main": "http://schemas.openxmlformats.org/spreadsheetml/2006/main"
    }
    text_tag = f"{{{xlsx_namespace['main']}}}t"
    run_tag = f"{{{xlsx_namespace['main']}}}r"
    # Built once instead of on every error cell.
    error_codes = {
        "#NULL!": "空引用错误",
        "#DIV/0!": "除零错误",
        "#VALUE!": "值类型错误",
        "#REF!": "无效引用",
        "#NAME?": "名称错误",
        "#NUM!": "数值错误",
        "#N/A": "值不可用",
    }

    def clean_text(text: str) -> str:
        """Flatten line breaks so the value stays on one table row."""
        return text.replace("\n", " ").replace("\r", "")

    def collect_text(container: ET.Element) -> str:
        """Return the full text of an ``<si>`` or ``<is>`` string item.

        A string item holds either a plain ``<t>`` or a sequence of rich-text
        runs ``<r><t>..</t></r>``. All of them are concatenated in document
        order. Other children (for example phonetic ``<rPh>`` runs) are
        ignored so that reading hints do not leak into the cell text.
        """
        parts = []
        for child in container:
            if child.tag == text_tag:
                parts.append(child.text or "")
            elif child.tag == run_tag:
                parts.extend(
                    t_elem.text or ""
                    for t_elem in child.findall("main:t", xlsx_namespace)
                )
        return "".join(parts)

    def parse_col_index(cell_ref: str) -> int:
        """Return the zero-based column of a reference such as ``"AB12"``.

        Returns ``-1`` when the reference does not start with a letter.
        """
        col_index = 0
        for char in cell_ref:
            if not char.isalpha():
                break
            # upper() makes lowercase references map to the same column.
            col_index = col_index * 26 + (ord(char.upper()) - ord("A") + 1)
        return col_index - 1

    def parse_cell_value(cell: ET.Element, shared_strings: List[str]) -> str:
        """Return the display text of a ``<c>`` element based on its ``t`` type."""
        cell_type = cell.attrib.get("t")
        if cell_type == "inlineStr":
            is_elem = cell.find("main:is", xlsx_namespace)
            if is_elem is None:
                return ""
            return clean_text(collect_text(is_elem))

        value_elem = cell.find("main:v", xlsx_namespace)
        if value_elem is None or not value_elem.text:
            return ""
        cell_value = value_elem.text

        if cell_type == "s":
            # The value is an index into the shared string table.
            try:
                idx = int(cell_value)
            except ValueError:
                return ""
            if 0 <= idx < len(shared_strings):
                return clean_text(shared_strings[idx])
            return ""
        if cell_type == "b":
            return "TRUE" if cell_value == "1" else "FALSE"
        if cell_type == "str":
            return clean_text(cell_value)
        if cell_type == "e":
            return error_codes.get(cell_value, f"错误: {cell_value}")
        if cell_type == "d":
            return f"[日期] {cell_value}"
        if cell_type is None:
            # Untyped cells: show whole numbers without a trailing ".0".
            try:
                float_val = float(cell_value)
            except ValueError:
                return cell_value
            return str(int(float_val)) if float_val.is_integer() else cell_value
        # "n" and any unrecognised type are passed through unchanged.
        return cell_value

    def get_non_empty_columns(data: List[List[str]]) -> set:
        """Return the indices of columns with a non-blank cell in any row."""
        return {
            col_idx
            for row in data
            for col_idx, cell in enumerate(row)
            if cell and cell.strip()
        }

    def filter_columns(row: List[str], non_empty_cols: set) -> List[str]:
        """Keep only the selected columns, padding short rows with ``""``."""
        return [row[i] if i < len(row) else "" for i in sorted(non_empty_cols)]

    def data_to_markdown(data: List[List[str]], sheet_name: str) -> str:
        """Render the rows of one sheet as a Markdown section with a table."""
        if not data or not data[0]:
            return f"## {sheet_name}\n\n*工作表为空*"
        non_empty_cols = get_non_empty_columns(data)
        if not non_empty_cols:
            return f"## {sheet_name}\n\n*工作表为空*"

        filtered_headers = filter_columns(data[0], non_empty_cols)
        md_lines = [
            f"## {sheet_name}",
            "",
            "| " + " | ".join(filtered_headers) + " |",
            "| " + " | ".join(["---"] * len(filtered_headers)) + " |",
        ]
        for row in data[1:]:
            md_lines.append(
                "| " + " | ".join(filter_columns(row, non_empty_cols)) + " |"
            )
        md_lines.append("")
        return "\n".join(md_lines)

    def read_rows(sheet_root: ET.Element, shared_strings: List[str]) -> List[List[str]]:
        """Return the non-empty rows of a worksheet as lists of cell text."""
        sheet_data = sheet_root.find("main:sheetData", xlsx_namespace)
        if sheet_data is None:
            return []
        rows = []
        for row_elem in sheet_data.findall("main:row", xlsx_namespace):
            col_dict = {}
            next_col = 0
            for cell in row_elem.findall("main:c", xlsx_namespace):
                cell_ref = cell.attrib.get("r", "")
                col_index = parse_col_index(cell_ref) if cell_ref else -1
                if col_index < 0:
                    # The "r" attribute is optional; a cell without a usable
                    # reference sits right after the previous cell.
                    col_index = next_col
                next_col = col_index + 1
                col_dict[col_index] = parse_cell_value(cell, shared_strings)
            if col_dict:
                rows.append([col_dict.get(i, "") for i in range(max(col_dict) + 1)])
        return rows

    try:
        with zipfile.ZipFile(file_path, "r") as zip_file:
            # ZipFile.open raises KeyError when the archive member is missing.
            try:
                with zip_file.open("xl/workbook.xml") as f:
                    root = ET.parse(f).getroot()
            except KeyError:
                return None, "无法解析工作表名称"

            rel_ns = "http://schemas.openxmlformats.org/officeDocument/2006/relationships"
            sheets = []  # (sheet name, relationship id) pairs in workbook order
            for sheet in root.findall(".//main:sheet", xlsx_namespace):
                sheet_name = sheet.attrib.get("name", "")
                if sheet_name:
                    sheets.append((sheet_name, sheet.attrib.get(f"{{{rel_ns}}}id", "")))
            if not sheets:
                return None, "未找到工作表"

            # Relationship id -> part path. Best effort: without the rels part
            # the conventional sheet<N>.xml naming is used further below.
            rid_to_target = {}
            try:
                rels_ns = "http://schemas.openxmlformats.org/package/2006/relationships"
                with zip_file.open("xl/_rels/workbook.xml.rels") as f:
                    rels_root = ET.parse(f).getroot()
                for rel in rels_root.findall(f"{{{rels_ns}}}Relationship"):
                    rid = rel.attrib.get("Id", "")
                    target = rel.attrib.get("Target", "")
                    if rid and target:
                        rid_to_target[rid] = target
            except KeyError:
                pass

            # The shared string table is optional (absent when the workbook
            # holds no shared strings).
            shared_strings = []
            try:
                with zip_file.open("xl/sharedStrings.xml") as f:
                    root = ET.parse(f).getroot()
                shared_strings = [
                    collect_text(si)
                    for si in root.findall(".//main:si", xlsx_namespace)
                ]
            except KeyError:
                pass

            sections = ["# Excel数据转换结果 (原生XML解析)\n\n"]
            for sheet_index, (sheet_name, rid) in enumerate(sheets):
                target = rid_to_target.get(rid, "")
                if not target:
                    worksheet_path = f"xl/worksheets/sheet{sheet_index + 1}.xml"
                elif target.startswith("/"):
                    # Absolute targets are relative to the package root.
                    worksheet_path = target.lstrip("/")
                else:
                    worksheet_path = f"xl/{target}"

                try:
                    with zip_file.open(worksheet_path) as f:
                        root = ET.parse(f).getroot()
                except KeyError:
                    sections.append(f"## {sheet_name}\n\n*工作表解析失败*\n\n")
                    continue

                rows = read_rows(root, shared_strings)
                sections.append(data_to_markdown(rows, sheet_name) + "\n\n")

            markdown_content = "".join(sections)
            # Defensive only: the title line above is always present, so this
            # cannot currently trigger.
            if not markdown_content.strip():
                return None, "解析结果为空"
            return markdown_content, None
    except Exception as e:
        # Reader boundary: report every failure as an error tuple.
        return None, f"XML 解析失败: {str(e)}"