264 lines
10 KiB
Python
264 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""XLSX 文件解析模块,提供三种解析方法。"""
|
|
|
|
import xml.etree.ElementTree as ET
|
|
import zipfile
|
|
from typing import List, Optional, Tuple
|
|
|
|
from common import parse_with_markitdown
|
|
|
|
|
|
def parse_xlsx_with_markitdown(file_path: str) -> Tuple[Optional[str], Optional[str]]:
|
|
"""使用 MarkItDown 库解析 XLSX 文件"""
|
|
return parse_with_markitdown(file_path)
|
|
|
|
|
|
def parse_xlsx_with_pandas(file_path: str) -> Tuple[Optional[str], Optional[str]]:
|
|
"""使用 pandas 库解析 XLSX 文件"""
|
|
try:
|
|
import pandas as pd
|
|
from tabulate import tabulate
|
|
except ImportError as e:
|
|
missing_lib = "pandas" if "pandas" in str(e) else "tabulate"
|
|
return None, f"{missing_lib} 库未安装"
|
|
|
|
try:
|
|
sheets = pd.read_excel(file_path, sheet_name=None)
|
|
|
|
markdown_parts = []
|
|
for sheet_name, df in sheets.items():
|
|
if len(df) == 0:
|
|
markdown_parts.append(f"## {sheet_name}\n\n*工作表为空*")
|
|
continue
|
|
|
|
table_md = tabulate(
|
|
df, headers="keys", tablefmt="pipe", showindex=True, missingval=""
|
|
)
|
|
markdown_parts.append(f"## {sheet_name}\n\n{table_md}")
|
|
|
|
if not markdown_parts:
|
|
return None, "Excel 文件为空"
|
|
|
|
markdown_content = "# Excel数据转换结果\n\n" + "\n\n".join(markdown_parts)
|
|
|
|
return markdown_content, None
|
|
except Exception as e:
|
|
return None, f"pandas 解析失败: {str(e)}"
|
|
|
|
|
|
def parse_xlsx_with_xml(file_path: str) -> Tuple[Optional[str], Optional[str]]:
|
|
"""使用 XML 原生解析 XLSX 文件"""
|
|
xlsx_namespace = {
|
|
"main": "http://schemas.openxmlformats.org/spreadsheetml/2006/main"
|
|
}
|
|
|
|
def parse_col_index(cell_ref: str) -> int:
|
|
col_index = 0
|
|
for char in cell_ref:
|
|
if char.isalpha():
|
|
col_index = col_index * 26 + (ord(char) - ord("A") + 1)
|
|
else:
|
|
break
|
|
return col_index - 1
|
|
|
|
def parse_cell_value(cell: ET.Element, shared_strings: List[str]) -> str:
|
|
cell_type = cell.attrib.get("t")
|
|
cell_value_elem = cell.find("main:v", xlsx_namespace)
|
|
|
|
if cell_value_elem is not None and cell_value_elem.text:
|
|
cell_value = cell_value_elem.text
|
|
|
|
if cell_type == "s":
|
|
try:
|
|
idx = int(cell_value)
|
|
if 0 <= idx < len(shared_strings):
|
|
text = shared_strings[idx]
|
|
return text.replace("\n", " ").replace("\r", "")
|
|
except (ValueError, IndexError):
|
|
pass
|
|
return ""
|
|
elif cell_type == "b":
|
|
return "TRUE" if cell_value == "1" else "FALSE"
|
|
elif cell_type == "str":
|
|
return cell_value.replace("\n", " ").replace("\r", "")
|
|
elif cell_type == "inlineStr":
|
|
is_elem = cell.find("main:is", xlsx_namespace)
|
|
if is_elem is not None:
|
|
t_elem = is_elem.find("main:t", xlsx_namespace)
|
|
if t_elem is not None and t_elem.text:
|
|
return t_elem.text.replace("\n", " ").replace("\r", "")
|
|
return ""
|
|
elif cell_type == "e":
|
|
error_codes = {
|
|
"#NULL!": "空引用错误",
|
|
"#DIV/0!": "除零错误",
|
|
"#VALUE!": "值类型错误",
|
|
"#REF!": "无效引用",
|
|
"#NAME?": "名称错误",
|
|
"#NUM!": "数值错误",
|
|
"#N/A": "值不可用",
|
|
}
|
|
return error_codes.get(cell_value, f"错误: {cell_value}")
|
|
elif cell_type == "d":
|
|
return f"[日期] {cell_value}"
|
|
elif cell_type == "n":
|
|
return cell_value
|
|
elif cell_type is None:
|
|
try:
|
|
float_val = float(cell_value)
|
|
if float_val.is_integer():
|
|
return str(int(float_val))
|
|
return cell_value
|
|
except ValueError:
|
|
return cell_value
|
|
else:
|
|
return cell_value
|
|
else:
|
|
return ""
|
|
|
|
def get_non_empty_columns(data: List[List[str]]) -> set:
|
|
non_empty_cols = set()
|
|
for row in data:
|
|
for col_idx, cell in enumerate(row):
|
|
if cell and cell.strip():
|
|
non_empty_cols.add(col_idx)
|
|
return non_empty_cols
|
|
|
|
def filter_columns(row: List[str], non_empty_cols: set) -> List[str]:
|
|
return [row[i] if i < len(row) else "" for i in sorted(non_empty_cols)]
|
|
|
|
def data_to_markdown(data: List[List[str]], sheet_name: str) -> str:
|
|
if not data or not data[0]:
|
|
return f"## {sheet_name}\n\n*工作表为空*"
|
|
|
|
md_lines = []
|
|
md_lines.append(f"## {sheet_name}")
|
|
md_lines.append("")
|
|
|
|
headers = data[0]
|
|
|
|
non_empty_cols = get_non_empty_columns(data)
|
|
|
|
if not non_empty_cols:
|
|
return f"## {sheet_name}\n\n*工作表为空*"
|
|
|
|
filtered_headers = filter_columns(headers, non_empty_cols)
|
|
header_line = "| " + " | ".join(filtered_headers) + " |"
|
|
md_lines.append(header_line)
|
|
|
|
separator_line = "| " + " | ".join(["---"] * len(filtered_headers)) + " |"
|
|
md_lines.append(separator_line)
|
|
|
|
for row in data[1:]:
|
|
filtered_row = filter_columns(row, non_empty_cols)
|
|
row_line = "| " + " | ".join(filtered_row) + " |"
|
|
md_lines.append(row_line)
|
|
|
|
md_lines.append("")
|
|
|
|
return "\n".join(md_lines)
|
|
|
|
try:
|
|
with zipfile.ZipFile(file_path, "r") as zip_file:
|
|
sheet_names = []
|
|
sheet_rids = []
|
|
try:
|
|
with zip_file.open("xl/workbook.xml") as f:
|
|
root = ET.parse(f).getroot()
|
|
rel_ns = "http://schemas.openxmlformats.org/officeDocument/2006/relationships"
|
|
sheet_elements = root.findall(".//main:sheet", xlsx_namespace)
|
|
for sheet in sheet_elements:
|
|
sheet_name = sheet.attrib.get("name", "")
|
|
rid = sheet.attrib.get(f"{{{rel_ns}}}id", "")
|
|
if sheet_name:
|
|
sheet_names.append(sheet_name)
|
|
sheet_rids.append(rid)
|
|
except KeyError:
|
|
return None, "无法解析工作表名称"
|
|
|
|
if not sheet_names:
|
|
return None, "未找到工作表"
|
|
|
|
rid_to_target = {}
|
|
try:
|
|
rels_ns = "http://schemas.openxmlformats.org/package/2006/relationships"
|
|
with zip_file.open("xl/_rels/workbook.xml.rels") as f:
|
|
rels_root = ET.parse(f).getroot()
|
|
for rel in rels_root.findall(f"{{{rels_ns}}}Relationship"):
|
|
rid = rel.attrib.get("Id", "")
|
|
target = rel.attrib.get("Target", "")
|
|
if rid and target:
|
|
rid_to_target[rid] = target
|
|
except KeyError:
|
|
pass
|
|
|
|
shared_strings = []
|
|
try:
|
|
with zip_file.open("xl/sharedStrings.xml") as f:
|
|
root = ET.parse(f).getroot()
|
|
for si in root.findall(".//main:si", xlsx_namespace):
|
|
t_elem = si.find(".//main:t", xlsx_namespace)
|
|
if t_elem is not None and t_elem.text:
|
|
shared_strings.append(t_elem.text)
|
|
else:
|
|
shared_strings.append("")
|
|
except KeyError:
|
|
pass
|
|
|
|
markdown_content = "# Excel数据转换结果 (原生XML解析)\n\n"
|
|
|
|
for sheet_index, sheet_name in enumerate(sheet_names):
|
|
rid = sheet_rids[sheet_index] if sheet_index < len(sheet_rids) else ""
|
|
target = rid_to_target.get(rid, "")
|
|
if target:
|
|
if target.startswith("/"):
|
|
worksheet_path = target.lstrip("/")
|
|
else:
|
|
worksheet_path = f"xl/{target}"
|
|
else:
|
|
worksheet_path = f"xl/worksheets/sheet{sheet_index + 1}.xml"
|
|
|
|
try:
|
|
with zip_file.open(worksheet_path) as f:
|
|
root = ET.parse(f).getroot()
|
|
sheet_data = root.find("main:sheetData", xlsx_namespace)
|
|
|
|
rows = []
|
|
if sheet_data is not None:
|
|
row_elements = sheet_data.findall(
|
|
"main:row", xlsx_namespace
|
|
)
|
|
|
|
for row_elem in row_elements:
|
|
cells = row_elem.findall("main:c", xlsx_namespace)
|
|
|
|
col_dict = {}
|
|
for cell in cells:
|
|
cell_ref = cell.attrib.get("r", "")
|
|
if not cell_ref:
|
|
continue
|
|
|
|
col_index = parse_col_index(cell_ref)
|
|
cell_value = parse_cell_value(cell, shared_strings)
|
|
col_dict[col_index] = cell_value
|
|
|
|
if col_dict:
|
|
max_col = max(col_dict.keys())
|
|
row_data = [
|
|
col_dict.get(i, "") for i in range(max_col + 1)
|
|
]
|
|
rows.append(row_data)
|
|
|
|
table_md = data_to_markdown(rows, sheet_name)
|
|
markdown_content += table_md + "\n\n"
|
|
|
|
except KeyError:
|
|
markdown_content += f"## {sheet_name}\n\n*工作表解析失败*\n\n"
|
|
|
|
if not markdown_content.strip():
|
|
return None, "解析结果为空"
|
|
|
|
return markdown_content, None
|
|
except Exception as e:
|
|
return None, f"XML 解析失败: {str(e)}"
|