- 更新 openspec/config.yaml 中 git 任务相关说明
- 将 scripts.core.* 改为 core.*，scripts.readers.* 改为 readers.*
- 优化 lyxy_document_reader.py 中 sys.path 设置方式
- 同步更新所有测试文件的导入路径
226 lines
8.7 KiB
Python
226 lines
8.7 KiB
Python
"""使用 XML 原生解析 XLSX 文件"""
|
|
|
|
import xml.etree.ElementTree as ET
|
|
import zipfile
|
|
from typing import List, Optional, Tuple
|
|
|
|
from readers._utils import build_markdown_table, safe_open_zip
|
|
|
|
|
|
# XML namespaces used by the parts of an XLSX package that this reader touches.
_MAIN_NS = "http://schemas.openxmlformats.org/spreadsheetml/2006/main"
_XLSX_NS = {"main": _MAIN_NS}
_OFFICE_REL_NS = "http://schemas.openxmlformats.org/officeDocument/2006/relationships"
_PACKAGE_REL_NS = "http://schemas.openxmlformats.org/package/2006/relationships"

# Fully-qualified tag names for direct child comparisons.
_TEXT_TAG = f"{{{_MAIN_NS}}}t"
_RUN_TAG = f"{{{_MAIN_NS}}}r"

# Readable labels for the error literals stored in cells of type "e".
_ERROR_CODES = {
    "#NULL!": "空引用错误",
    "#DIV/0!": "除零错误",
    "#VALUE!": "值类型错误",
    "#REF!": "无效引用",
    "#NAME?": "名称错误",
    "#NUM!": "数值错误",
    "#N/A": "值不可用",
}


def _column_index(cell_ref: str) -> int:
    """Return the zero-based column index encoded in an A1-style reference.

    Only the leading ASCII letters are consumed ("AB12" -> 27). Letters are
    upper-cased first so that "a1" and "A1" map to the same column. Returns
    -1 when the reference does not start with a column letter.
    """
    index = 0
    for char in cell_ref.upper():
        if not "A" <= char <= "Z":
            break
        index = index * 26 + (ord(char) - ord("A") + 1)
    return index - 1


def _single_line(text: str) -> str:
    """Collapse line breaks so the text fits inside one Markdown table cell."""
    return text.replace("\n", " ").replace("\r", "")


def _escape_cell(text: str) -> str:
    """Escape pipe characters, which would otherwise split a table cell."""
    return text.replace("|", "\\|")


def _string_item_text(item: ET.Element) -> str:
    """Return the full text of a string item (``<si>`` or ``<is>``).

    A string item holds either a direct ``<t>`` child or a sequence of
    rich-text runs (``<r>`` elements, each with its own ``<t>``). All of them
    are concatenated in document order. Other children, such as phonetic
    runs (``<rPh>``), are ignored so their text does not leak into the value.
    """
    pieces = []
    for child in item:
        if child.tag == _TEXT_TAG:
            pieces.append(child.text or "")
        elif child.tag == _RUN_TAG:
            pieces.extend(
                text_elem.text or ""
                for text_elem in child.findall("main:t", _XLSX_NS)
            )
    return "".join(pieces)


def _cell_text(cell: ET.Element, shared_strings: List[str]) -> str:
    """Convert a ``<c>`` element into its display text.

    Args:
        cell: The cell element.
        shared_strings: Shared string table, indexed by cells of type "s".

    Returns:
        The cell text, or "" when the cell has no usable value.
    """
    cell_type = cell.attrib.get("t")

    if cell_type == "inlineStr":
        inline = cell.find("main:is", _XLSX_NS)
        if inline is None:
            return ""
        return _single_line(_string_item_text(inline))

    value_elem = cell.find("main:v", _XLSX_NS)
    if value_elem is None or not value_elem.text:
        return ""
    raw = value_elem.text

    if cell_type == "s":
        # The value is an index into the shared string table.
        try:
            index = int(raw)
        except ValueError:
            return ""
        if 0 <= index < len(shared_strings):
            return _single_line(shared_strings[index])
        return ""
    if cell_type == "b":
        return "TRUE" if raw == "1" else "FALSE"
    if cell_type == "str":
        return _single_line(raw)
    if cell_type == "e":
        return _ERROR_CODES.get(raw, f"错误: {raw}")
    if cell_type == "d":
        return f"[日期] {raw}"
    if cell_type is None:
        # Untyped cells: render whole numbers without a trailing ".0".
        try:
            number = float(raw)
        except ValueError:
            return raw
        return str(int(number)) if number.is_integer() else raw
    # Explicit "n" and any unknown type: return the stored text unchanged.
    return raw


def _sheet_to_markdown(rows: List[List[str]], sheet_name: str) -> str:
    """Render sheet rows as a Markdown section with a table.

    The first row is used as the table header. Columns that are blank in
    every row are dropped, and short rows are padded with empty cells.
    """
    empty_section = f"## {sheet_name}\n\n*工作表为空*"
    if not rows or not rows[0]:
        return empty_section

    used_columns = sorted(
        {
            col
            for row in rows
            for col, cell in enumerate(row)
            if cell and cell.strip()
        }
    )
    if not used_columns:
        return empty_section

    def render(row: List[str]) -> str:
        cells = [
            _escape_cell(row[col]) if col < len(row) else ""
            for col in used_columns
        ]
        return "| " + " | ".join(cells) + " |"

    lines = [
        f"## {sheet_name}",
        "",
        render(rows[0]),
        "| " + " | ".join(["---"] * len(used_columns)) + " |",
    ]
    lines.extend(render(row) for row in rows[1:])
    lines.append("")
    return "\n".join(lines)


def _read_sheets(archive: zipfile.ZipFile) -> List[Tuple[str, str]]:
    """Return ``(sheet name, relationship id)`` pairs from ``xl/workbook.xml``.

    Sheets without a name are skipped. Raises ``KeyError`` when the workbook
    part is missing from the archive.
    """
    with archive.open("xl/workbook.xml") as handle:
        root = ET.parse(handle).getroot()
    rid_attr = f"{{{_OFFICE_REL_NS}}}id"
    sheets = []
    for sheet in root.findall(".//main:sheet", _XLSX_NS):
        name = sheet.attrib.get("name", "")
        if name:
            sheets.append((name, sheet.attrib.get(rid_attr, "")))
    return sheets


def _read_relationships(archive: zipfile.ZipFile) -> dict:
    """Map relationship ids to targets; empty when the rels part is absent."""
    try:
        with archive.open("xl/_rels/workbook.xml.rels") as handle:
            root = ET.parse(handle).getroot()
    except KeyError:
        return {}
    targets = {}
    for rel in root.findall(f"{{{_PACKAGE_REL_NS}}}Relationship"):
        rid = rel.attrib.get("Id", "")
        target = rel.attrib.get("Target", "")
        if rid and target:
            targets[rid] = target
    return targets


def _read_shared_strings(archive: zipfile.ZipFile) -> List[str]:
    """Load the shared string table; empty when the part is absent."""
    try:
        with archive.open("xl/sharedStrings.xml") as handle:
            root = ET.parse(handle).getroot()
    except KeyError:
        return []
    return [
        _string_item_text(item)
        for item in root.findall(".//main:si", _XLSX_NS)
    ]


def _worksheet_path(target: str, position: int) -> str:
    """Resolve the archive member name of a worksheet.

    Args:
        target: Relationship target, or "" when none was found.
        position: One-based position of the sheet, used for the conventional
            ``sheetN.xml`` fallback name.
    """
    if not target:
        return f"xl/worksheets/sheet{position}.xml"
    if target.startswith("/"):
        # Absolute targets are relative to the package root.
        return target.lstrip("/")
    return f"xl/{target}"


def _read_rows(
    archive: zipfile.ZipFile, worksheet_path: str, shared_strings: List[str]
) -> List[List[str]]:
    """Read the non-empty rows of one worksheet as lists of cell text.

    Raises ``KeyError`` when ``worksheet_path`` is not in the archive.
    """
    with archive.open(worksheet_path) as handle:
        root = ET.parse(handle).getroot()

    sheet_data = root.find("main:sheetData", _XLSX_NS)
    if sheet_data is None:
        return []

    rows = []
    for row_elem in sheet_data.findall("main:row", _XLSX_NS):
        by_column = {}
        for cell in row_elem.findall("main:c", _XLSX_NS):
            cell_ref = cell.attrib.get("r", "")
            if not cell_ref:
                continue
            column = _column_index(cell_ref)
            if column < 0:
                # No column letters in the reference; the cell cannot be placed.
                continue
            by_column[column] = _cell_text(cell, shared_strings)
        if by_column:
            width = max(by_column) + 1
            rows.append([by_column.get(col, "") for col in range(width)])
    return rows


def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
    """Convert an XLSX file to Markdown by reading its XML parts directly.

    Each named sheet becomes a ``##`` section containing a table whose first
    row is the header. A sheet whose worksheet part is missing gets a
    failure note instead of a table.

    Args:
        file_path: Path to the ``.xlsx`` file.

    Returns:
        ``(markdown, None)`` on success, or ``(None, error_message)`` when
        the file cannot be opened or parsed.
    """
    try:
        with zipfile.ZipFile(file_path, "r") as archive:
            try:
                sheets = _read_sheets(archive)
            except KeyError:
                return None, "无法解析工作表名称"

            if not sheets:
                return None, "未找到工作表"

            targets = _read_relationships(archive)
            shared_strings = _read_shared_strings(archive)

            sections = ["# Excel数据转换结果 (原生XML解析)\n\n"]
            for position, (sheet_name, rid) in enumerate(sheets, start=1):
                worksheet_path = _worksheet_path(targets.get(rid, ""), position)
                try:
                    rows = _read_rows(archive, worksheet_path, shared_strings)
                except KeyError:
                    sections.append(f"## {sheet_name}\n\n*工作表解析失败*\n\n")
                    continue
                sections.append(_sheet_to_markdown(rows, sheet_name) + "\n\n")

            return "".join(sections), None
    except Exception as e:
        # Boundary of the reader: report any failure through the error slot.
        return None, f"XML 解析失败: {str(e)}"