增加xlsx文件的解析
This commit is contained in:
@@ -1,8 +1,8 @@
|
||||
#!/usr/bin/env python3
|
||||
"""整合的文档解析器,支持 DOCX 和 PPTX 文件:
|
||||
"""整合的文档解析器,支持 DOCX、PPTX 和 XLSX 文件:
|
||||
按优先级尝试多种解析方法:
|
||||
1. MarkItDown (微软官方库)
|
||||
2. python-docx 或 python-pptx (成熟的 Python 库)
|
||||
2. python-docx、python-pptx 或 pandas (成熟的 Python 库)
|
||||
3. XML 原生解析 (备选方案)
|
||||
|
||||
代码风格要求:
|
||||
@@ -116,6 +116,18 @@ def is_valid_pptx(file_path: str) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
def is_valid_xlsx(file_path: str) -> bool:
|
||||
try:
|
||||
with zipfile.ZipFile(file_path, "r") as zip_file:
|
||||
required_files = ["[Content_Types].xml", "_rels/.rels", "xl/workbook.xml"]
|
||||
for required in required_files:
|
||||
if required not in zip_file.namelist():
|
||||
return False
|
||||
return True
|
||||
except (zipfile.BadZipFile, zipfile.LargeZipFile):
|
||||
return False
|
||||
|
||||
|
||||
def remove_markdown_images(markdown_text: str) -> str:
|
||||
return IMAGE_PATTERN.sub("", markdown_text)
|
||||
|
||||
@@ -867,8 +879,241 @@ def parse_pptx_with_xml(file_path: str) -> Tuple[Optional[str], Optional[str]]:
|
||||
return None, f"XML 解析失败: {str(e)}"
|
||||
|
||||
|
||||
def parse_xlsx_with_markitdown(file_path: str) -> Tuple[Optional[str], Optional[str]]:
|
||||
try:
|
||||
from markitdown import MarkItDown
|
||||
|
||||
md = MarkItDown()
|
||||
result = md.convert(file_path)
|
||||
if not result.text_content.strip():
|
||||
return None, "文档为空"
|
||||
return result.text_content, None
|
||||
except ImportError:
|
||||
return None, "MarkItDown 库未安装"
|
||||
except Exception as e:
|
||||
return None, f"MarkItDown 解析失败: {str(e)}"
|
||||
|
||||
|
||||
def parse_xlsx_with_pandas(file_path: str) -> Tuple[Optional[str], Optional[str]]:
|
||||
try:
|
||||
import pandas as pd
|
||||
from tabulate import tabulate
|
||||
except ImportError as e:
|
||||
missing_lib = "pandas" if "pandas" in str(e) else "tabulate"
|
||||
return None, f"{missing_lib} 库未安装"
|
||||
|
||||
try:
|
||||
df = pd.read_excel(file_path)
|
||||
|
||||
if len(df) == 0:
|
||||
return None, "Excel 文件为空"
|
||||
|
||||
markdown_content = tabulate(
|
||||
df, headers="keys", tablefmt="pipe", showindex=True, missingval=""
|
||||
)
|
||||
|
||||
markdown_with_header = (
|
||||
f"# Excel数据转换结果\n\n来源: {file_path}\n\n{markdown_content}"
|
||||
)
|
||||
|
||||
return markdown_with_header, None
|
||||
except Exception as e:
|
||||
return None, f"pandas 解析失败: {str(e)}"
|
||||
|
||||
|
||||
def parse_xlsx_with_xml(file_path: str) -> Tuple[Optional[str], Optional[str]]:
|
||||
xlsx_namespace = {
|
||||
"main": "http://schemas.openxmlformats.org/spreadsheetml/2006/main"
|
||||
}
|
||||
|
||||
def parse_col_index(cell_ref: str) -> int:
|
||||
col_index = 0
|
||||
for char in cell_ref:
|
||||
if char.isalpha():
|
||||
col_index = col_index * 26 + (ord(char) - ord("A") + 1)
|
||||
else:
|
||||
break
|
||||
return col_index - 1
|
||||
|
||||
def parse_cell_value(cell: ET.Element, shared_strings: List[str]) -> str:
|
||||
cell_type = cell.attrib.get("t")
|
||||
cell_value_elem = cell.find("main:v", xlsx_namespace)
|
||||
|
||||
if cell_value_elem is not None and cell_value_elem.text:
|
||||
cell_value = cell_value_elem.text
|
||||
|
||||
if cell_type == "s":
|
||||
try:
|
||||
idx = int(cell_value)
|
||||
if 0 <= idx < len(shared_strings):
|
||||
text = shared_strings[idx]
|
||||
return text.replace("\n", " ").replace("\r", "")
|
||||
except (ValueError, IndexError):
|
||||
pass
|
||||
return ""
|
||||
elif cell_type == "b":
|
||||
return "TRUE" if cell_value == "1" else "FALSE"
|
||||
elif cell_type == "str":
|
||||
return cell_value.replace("\n", " ").replace("\r", "")
|
||||
elif cell_type == "inlineStr":
|
||||
is_elem = cell.find("main:is", xlsx_namespace)
|
||||
if is_elem is not None:
|
||||
t_elem = is_elem.find("main:t", xlsx_namespace)
|
||||
if t_elem is not None and t_elem.text:
|
||||
return t_elem.text.replace("\n", " ").replace("\r", "")
|
||||
return ""
|
||||
elif cell_type == "e":
|
||||
error_codes = {
|
||||
"#NULL!": "空引用错误",
|
||||
"#DIV/0!": "除零错误",
|
||||
"#VALUE!": "值类型错误",
|
||||
"#REF!": "无效引用",
|
||||
"#NAME?": "名称错误",
|
||||
"#NUM!": "数值错误",
|
||||
"#N/A": "值不可用",
|
||||
}
|
||||
return error_codes.get(cell_value, f"错误: {cell_value}")
|
||||
elif cell_type == "d":
|
||||
return f"[日期] {cell_value}"
|
||||
elif cell_type == "n":
|
||||
return cell_value
|
||||
elif cell_type is None:
|
||||
try:
|
||||
float_val = float(cell_value)
|
||||
if float_val.is_integer():
|
||||
return str(int(float_val))
|
||||
return cell_value
|
||||
except ValueError:
|
||||
return cell_value
|
||||
else:
|
||||
return cell_value
|
||||
else:
|
||||
return ""
|
||||
|
||||
def get_non_empty_columns(data: List[List[str]]) -> set:
|
||||
non_empty_cols = set()
|
||||
for row in data:
|
||||
for col_idx, cell in enumerate(row):
|
||||
if cell and cell.strip():
|
||||
non_empty_cols.add(col_idx)
|
||||
return non_empty_cols
|
||||
|
||||
def filter_columns(row: List[str], non_empty_cols: set) -> List[str]:
|
||||
return [row[i] if i < len(row) else "" for i in sorted(non_empty_cols)]
|
||||
|
||||
def data_to_markdown(data: List[List[str]], sheet_name: str) -> str:
|
||||
if not data or not data[0]:
|
||||
return f"## {sheet_name}\n\n*工作表为空*"
|
||||
|
||||
md_lines = []
|
||||
md_lines.append(f"## {sheet_name}")
|
||||
md_lines.append("")
|
||||
|
||||
headers = data[0]
|
||||
|
||||
non_empty_cols = get_non_empty_columns(data)
|
||||
|
||||
if not non_empty_cols:
|
||||
return f"## {sheet_name}\n\n*工作表为空*"
|
||||
|
||||
filtered_headers = filter_columns(headers, non_empty_cols)
|
||||
header_line = "| " + " | ".join(filtered_headers) + " |"
|
||||
md_lines.append(header_line)
|
||||
|
||||
separator_line = "|" + "|".join(["---"] * len(filtered_headers)) + "|"
|
||||
md_lines.append(separator_line)
|
||||
|
||||
for row in data[1:]:
|
||||
filtered_row = filter_columns(row, non_empty_cols)
|
||||
row_line = "| " + " | ".join(filtered_row) + " |"
|
||||
md_lines.append(row_line)
|
||||
|
||||
md_lines.append("")
|
||||
|
||||
return "\n".join(md_lines)
|
||||
|
||||
try:
|
||||
with zipfile.ZipFile(file_path, "r") as zip_file:
|
||||
sheet_names = []
|
||||
try:
|
||||
with zip_file.open("xl/workbook.xml") as f:
|
||||
root = ET.parse(f).getroot()
|
||||
sheet_elements = root.findall(".//main:sheet", xlsx_namespace)
|
||||
for sheet in sheet_elements:
|
||||
sheet_name = sheet.attrib.get("name", "")
|
||||
if sheet_name:
|
||||
sheet_names.append(sheet_name)
|
||||
except KeyError:
|
||||
return None, "无法解析工作表名称"
|
||||
|
||||
if not sheet_names:
|
||||
return None, "未找到工作表"
|
||||
|
||||
shared_strings = []
|
||||
try:
|
||||
with zip_file.open("xl/sharedStrings.xml") as f:
|
||||
root = ET.parse(f).getroot()
|
||||
for si in root.findall(".//main:si", xlsx_namespace):
|
||||
t_elem = si.find(".//main:t", xlsx_namespace)
|
||||
if t_elem is not None and t_elem.text:
|
||||
shared_strings.append(t_elem.text)
|
||||
else:
|
||||
shared_strings.append("")
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
markdown_content = "# Excel数据转换结果 (原生XML解析)\n\n"
|
||||
markdown_content += f"来源: {file_path}\n\n"
|
||||
|
||||
for sheet_index, sheet_name in enumerate(sheet_names, start=1):
|
||||
try:
|
||||
worksheet_path = f"xl/worksheets/sheet{sheet_index}.xml"
|
||||
with zip_file.open(worksheet_path) as f:
|
||||
root = ET.parse(f).getroot()
|
||||
sheet_data = root.find("main:sheetData", xlsx_namespace)
|
||||
|
||||
rows = []
|
||||
if sheet_data is not None:
|
||||
row_elements = sheet_data.findall(
|
||||
"main:row", xlsx_namespace
|
||||
)
|
||||
|
||||
for row_elem in row_elements:
|
||||
cells = row_elem.findall("main:c", xlsx_namespace)
|
||||
|
||||
col_dict = {}
|
||||
for cell in cells:
|
||||
cell_ref = cell.attrib.get("r", "")
|
||||
if not cell_ref:
|
||||
continue
|
||||
|
||||
col_index = parse_col_index(cell_ref)
|
||||
cell_value = parse_cell_value(cell, shared_strings)
|
||||
col_dict[col_index] = cell_value
|
||||
|
||||
if col_dict:
|
||||
max_col = max(col_dict.keys())
|
||||
row_data = [
|
||||
col_dict.get(i, "") for i in range(max_col + 1)
|
||||
]
|
||||
rows.append(row_data)
|
||||
|
||||
table_md = data_to_markdown(rows, sheet_name)
|
||||
markdown_content += table_md + "\n\n"
|
||||
|
||||
except KeyError:
|
||||
markdown_content += f"## {sheet_name}\n\n*工作表解析失败*\n\n"
|
||||
|
||||
if not markdown_content.strip():
|
||||
return None, "解析结果为空"
|
||||
|
||||
return markdown_content, None
|
||||
except Exception as e:
|
||||
return None, f"XML 解析失败: {str(e)}"
|
||||
|
||||
|
||||
def detect_file_type(file_path: str) -> Optional[str]:
|
||||
"""检测文件类型,返回 'docx' 或 'pptx'"""
|
||||
"""检测文件类型,返回 'docx'、'pptx' 或 'xlsx'"""
|
||||
_, ext = os.path.splitext(file_path)
|
||||
ext = ext.lower()
|
||||
|
||||
@@ -878,14 +1123,19 @@ def detect_file_type(file_path: str) -> Optional[str]:
|
||||
elif ext == ".pptx":
|
||||
if is_valid_pptx(file_path):
|
||||
return "pptx"
|
||||
elif ext == ".xlsx":
|
||||
if is_valid_xlsx(file_path):
|
||||
return "xlsx"
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description="将 DOCX 或 PPTX 文件解析为 Markdown")
|
||||
parser = argparse.ArgumentParser(
|
||||
description="将 DOCX、PPTX 或 XLSX 文件解析为 Markdown"
|
||||
)
|
||||
|
||||
parser.add_argument("file_path", help="DOCX 或 PPTX 文件的绝对路径")
|
||||
parser.add_argument("file_path", help="DOCX、PPTX 或 XLSX 文件的绝对路径")
|
||||
|
||||
parser.add_argument(
|
||||
"-n",
|
||||
@@ -927,7 +1177,7 @@ def main() -> None:
|
||||
|
||||
file_type = detect_file_type(args.file_path)
|
||||
if file_type is None:
|
||||
print(f"错误: 文件不是有效的 DOCX 或 PPTX 格式: {args.file_path}")
|
||||
print(f"错误: 文件不是有效的 DOCX、PPTX 或 XLSX 格式: {args.file_path}")
|
||||
sys.exit(1)
|
||||
|
||||
if file_type == "docx":
|
||||
@@ -936,12 +1186,18 @@ def main() -> None:
|
||||
("python-docx", parse_docx_with_python_docx),
|
||||
("XML 原生解析", parse_docx_with_xml),
|
||||
]
|
||||
else:
|
||||
elif file_type == "pptx":
|
||||
parsers = [
|
||||
("MarkItDown", parse_pptx_with_markitdown),
|
||||
("python-pptx", parse_pptx_with_python_pptx),
|
||||
("XML 原生解析", parse_pptx_with_xml),
|
||||
]
|
||||
else:
|
||||
parsers = [
|
||||
("MarkItDown", parse_xlsx_with_markitdown),
|
||||
("pandas", parse_xlsx_with_pandas),
|
||||
("XML 原生解析", parse_xlsx_with_xml),
|
||||
]
|
||||
|
||||
failures = []
|
||||
content = None
|
||||
|
||||
Reference in New Issue
Block a user