# XLSX support for temp/document_parser.py, reconstructed from the
# whitespace-collapsed patch 4c9effac ("add parsing of xlsx files").
# The functions below follow the same (content, error) convention as the
# DOCX/PPTX parsers: exactly one element of the returned tuple is None.


def is_valid_xlsx(file_path: str) -> bool:
    """Return True if *file_path* is a ZIP archive holding the core XLSX parts.

    Only the presence of ``[Content_Types].xml``, ``_rels/.rels`` and
    ``xl/workbook.xml`` is checked; their contents are not validated.
    A file that is not a readable ZIP archive yields False.
    """
    required_files = ("[Content_Types].xml", "_rels/.rels", "xl/workbook.xml")
    try:
        with zipfile.ZipFile(file_path, "r") as zip_file:
            # Build the member set once instead of calling namelist() for
            # every required entry.
            members = set(zip_file.namelist())
    except (zipfile.BadZipFile, zipfile.LargeZipFile):
        return False
    return all(required in members for required in required_files)


def parse_xlsx_with_markitdown(file_path: str) -> Tuple[Optional[str], Optional[str]]:
    """Convert an XLSX file to Markdown using the MarkItDown library.

    Returns ``(markdown, None)`` on success or ``(None, error_message)`` when
    the library is missing, the conversion fails, or the result is blank.
    """
    try:
        from markitdown import MarkItDown

        md = MarkItDown()
        result = md.convert(file_path)
        if not result.text_content.strip():
            return None, "文档为空"
        return result.text_content, None
    except ImportError:
        return None, "MarkItDown 库未安装"
    except Exception as e:
        return None, f"MarkItDown 解析失败: {str(e)}"


def parse_xlsx_with_pandas(file_path: str) -> Tuple[Optional[str], Optional[str]]:
    """Convert an XLSX file to Markdown tables using pandas and tabulate.

    Every worksheet is read (``sheet_name=None``) and rendered under its own
    ``## <sheet name>`` heading; worksheets without data rows are skipped.

    Returns ``(markdown, None)`` on success or ``(None, error_message)`` when a
    library is missing, no worksheet has data, or reading fails.
    """
    try:
        import pandas as pd
        from tabulate import tabulate
    except ImportError as e:
        missing_lib = "pandas" if "pandas" in str(e) else "tabulate"
        return None, f"{missing_lib} 库未安装"

    try:
        # sheet_name=None returns {sheet name: DataFrame} for the whole
        # workbook; the default (0) would silently ignore all other sheets.
        sheets = pd.read_excel(file_path, sheet_name=None)

        sections = []
        for sheet_name, df in sheets.items():
            if df.empty:
                continue
            table = tabulate(
                df, headers="keys", tablefmt="pipe", showindex=True, missingval=""
            )
            sections.append(f"## {sheet_name}\n\n{table}")

        if not sections:
            return None, "Excel 文件为空"

        markdown_content = "\n\n".join(sections)
        markdown_with_header = (
            f"# Excel数据转换结果\n\n来源: {file_path}\n\n{markdown_content}"
        )

        return markdown_with_header, None
    except Exception as e:
        return None, f"pandas 解析失败: {str(e)}"


def parse_xlsx_with_xml(file_path: str) -> Tuple[Optional[str], Optional[str]]:
    """Convert an XLSX file to Markdown by reading its XML parts directly.

    Each named worksheet becomes a ``## <sheet name>`` section whose first
    stored row is used as the table header. Columns that are blank in every
    row are omitted. Worksheet parts are located through the workbook
    relationships (``r:id``); when that mapping is unavailable the
    conventional ``xl/worksheets/sheet<N>.xml`` path is used instead.

    Returns ``(markdown, None)`` on success or ``(None, error_message)`` when
    the workbook part is missing, no named worksheet exists, or any part
    cannot be read or parsed.
    """
    import posixpath  # local import: ZIP member names always use "/"

    main_ns = "http://schemas.openxmlformats.org/spreadsheetml/2006/main"
    xlsx_namespace = {"main": main_ns}
    rel_id_attr = (
        "{http://schemas.openxmlformats.org/officeDocument/2006/relationships}id"
    )
    relationship_tag = (
        "{http://schemas.openxmlformats.org/package/2006/relationships}Relationship"
    )
    text_tag = f"{{{main_ns}}}t"
    run_tag = f"{{{main_ns}}}r"

    error_codes = {
        "#NULL!": "空引用错误",
        "#DIV/0!": "除零错误",
        "#VALUE!": "值类型错误",
        "#REF!": "无效引用",
        "#NAME?": "名称错误",
        "#NUM!": "数值错误",
        "#N/A": "值不可用",
    }

    def single_line(text: str) -> str:
        """Flatten line breaks so the text fits in one table cell."""
        return text.replace("\n", " ").replace("\r", "")

    def collect_text(container: ET.Element) -> str:
        """Join the text of a string item: a plain <t> and/or rich-text runs.

        Only direct <t> children and <r>/<t> runs are read, so text nested in
        other children (e.g. phonetic hints) is not mixed into the value.
        """
        parts = []
        for child in container:
            if child.tag == text_tag:
                parts.append(child.text or "")
            elif child.tag == run_tag:
                parts.extend(
                    t.text or "" for t in child.findall("main:t", xlsx_namespace)
                )
        return "".join(parts)

    def resolve_target(target: str) -> str:
        """Turn a relationship Target into an archive member name."""
        if target.startswith("/"):
            return target.lstrip("/")
        # Relative targets are relative to the directory of workbook.xml.
        return posixpath.normpath(posixpath.join("xl", target))

    def parse_col_index(cell_ref: str) -> int:
        """Return the zero-based column of a reference like ``"AB12"``.

        Returns -1 when the reference has no leading column letters.
        """
        col_index = 0
        for char in cell_ref.upper():
            if not "A" <= char <= "Z":
                break
            col_index = col_index * 26 + (ord(char) - ord("A") + 1)
        return col_index - 1

    def parse_cell_value(cell: ET.Element, shared_strings: List[str]) -> str:
        """Render one <c> element as display text according to its ``t`` type."""
        cell_type = cell.attrib.get("t")

        # Inline strings keep their text in <is>, not <v>, so they must be
        # handled before the <v> presence check.
        if cell_type == "inlineStr":
            is_elem = cell.find("main:is", xlsx_namespace)
            if is_elem is None:
                return ""
            return single_line(collect_text(is_elem))

        cell_value_elem = cell.find("main:v", xlsx_namespace)
        if cell_value_elem is None or not cell_value_elem.text:
            return ""
        cell_value = cell_value_elem.text

        if cell_type == "s":
            try:
                idx = int(cell_value)
            except ValueError:
                return ""
            if 0 <= idx < len(shared_strings):
                return single_line(shared_strings[idx])
            return ""
        if cell_type == "b":
            return "TRUE" if cell_value == "1" else "FALSE"
        if cell_type == "str":
            return single_line(cell_value)
        if cell_type == "e":
            return error_codes.get(cell_value, f"错误: {cell_value}")
        if cell_type == "d":
            return f"[日期] {cell_value}"
        if cell_type is None or cell_type == "n":
            # A missing type means numeric, same as an explicit "n":
            # show whole numbers without a trailing ".0".
            try:
                float_val = float(cell_value)
            except ValueError:
                return cell_value
            if float_val.is_integer():
                return str(int(float_val))
            return cell_value
        return cell_value

    def get_non_empty_columns(data: List[List[str]]) -> set:
        """Return the indexes of columns holding at least one non-blank cell."""
        return {
            col_idx
            for row in data
            for col_idx, cell in enumerate(row)
            if cell and cell.strip()
        }

    def table_row(row: List[str], columns: List[int]) -> str:
        """Format the selected columns of *row* as one Markdown table line."""
        # A literal "|" would end the cell early, so it is escaped.
        cells = [
            (row[i] if i < len(row) else "").replace("|", "\\|") for i in columns
        ]
        return "| " + " | ".join(cells) + " |"

    def data_to_markdown(data: List[List[str]], sheet_name: str) -> str:
        """Render the rows of one worksheet as a headed Markdown table."""
        empty_sheet = f"## {sheet_name}\n\n*工作表为空*"
        if not data or not data[0]:
            return empty_sheet

        columns = sorted(get_non_empty_columns(data))
        if not columns:
            return empty_sheet

        md_lines = [f"## {sheet_name}", "", table_row(data[0], columns)]
        md_lines.append("|" + "|".join(["---"] * len(columns)) + "|")
        md_lines.extend(table_row(row, columns) for row in data[1:])
        md_lines.append("")
        return "\n".join(md_lines)

    def read_rows(sheet_root: ET.Element, shared_strings: List[str]) -> List[List[str]]:
        """Extract the stored rows of a worksheet as lists of cell text."""
        sheet_data = sheet_root.find("main:sheetData", xlsx_namespace)
        if sheet_data is None:
            return []

        rows = []
        for row_elem in sheet_data.findall("main:row", xlsx_namespace):
            col_dict = {}
            next_col = 0
            for cell in row_elem.findall("main:c", xlsx_namespace):
                col_index = parse_col_index(cell.attrib.get("r", ""))
                if col_index < 0:
                    # The "r" attribute is optional; without it the cell
                    # follows the previous one.
                    col_index = next_col
                col_dict[col_index] = parse_cell_value(cell, shared_strings)
                next_col = col_index + 1

            if col_dict:
                rows.append([col_dict.get(i, "") for i in range(max(col_dict) + 1)])
        return rows

    try:
        with zipfile.ZipFile(file_path, "r") as zip_file:
            try:
                with zip_file.open("xl/workbook.xml") as f:
                    workbook_root = ET.parse(f).getroot()
            except KeyError:
                return None, "无法解析工作表名称"

            # (sheet name, relationship id) for every named worksheet.
            sheets = []
            for sheet in workbook_root.findall(".//main:sheet", xlsx_namespace):
                sheet_name = sheet.attrib.get("name", "")
                if sheet_name:
                    sheets.append((sheet_name, sheet.attrib.get(rel_id_attr)))

            if not sheets:
                return None, "未找到工作表"

            # Relationship id -> archive member; the sheet order in
            # workbook.xml does not have to match the part file names.
            rel_targets = {}
            shared_strings_path = "xl/sharedStrings.xml"
            try:
                with zip_file.open("xl/_rels/workbook.xml.rels") as f:
                    rels_root = ET.parse(f).getroot()
                for rel in rels_root.findall(relationship_tag):
                    rel_id = rel.attrib.get("Id")
                    target = rel.attrib.get("Target")
                    if not rel_id or not target:
                        continue
                    rel_targets[rel_id] = resolve_target(target)
                    if rel.attrib.get("Type", "").endswith("/sharedStrings"):
                        shared_strings_path = rel_targets[rel_id]
            except KeyError:
                pass  # no relationships part: use conventional paths

            shared_strings = []
            try:
                with zip_file.open(shared_strings_path) as f:
                    strings_root = ET.parse(f).getroot()
                shared_strings = [
                    collect_text(si)
                    for si in strings_root.findall(".//main:si", xlsx_namespace)
                ]
            except KeyError:
                pass  # workbooks without text cells have no shared strings

            parts = [
                "# Excel数据转换结果 (原生XML解析)\n\n",
                f"来源: {file_path}\n\n",
            ]

            for position, (sheet_name, rel_id) in enumerate(sheets, start=1):
                worksheet_path = rel_targets.get(
                    rel_id, f"xl/worksheets/sheet{position}.xml"
                )
                try:
                    with zip_file.open(worksheet_path) as f:
                        sheet_root = ET.parse(f).getroot()
                except KeyError:
                    parts.append(f"## {sheet_name}\n\n*工作表解析失败*\n\n")
                    continue

                rows = read_rows(sheet_root, shared_strings)
                parts.append(data_to_markdown(rows, sheet_name) + "\n\n")

            return "".join(parts), None
    except Exception as e:
        return None, f"XML 解析失败: {str(e)}"
-878,14 +1123,19 @@ def detect_file_type(file_path: str) -> Optional[str]: elif ext == ".pptx": if is_valid_pptx(file_path): return "pptx" + elif ext == ".xlsx": + if is_valid_xlsx(file_path): + return "xlsx" return None def main() -> None: - parser = argparse.ArgumentParser(description="将 DOCX 或 PPTX 文件解析为 Markdown") + parser = argparse.ArgumentParser( + description="将 DOCX、PPTX 或 XLSX 文件解析为 Markdown" + ) - parser.add_argument("file_path", help="DOCX 或 PPTX 文件的绝对路径") + parser.add_argument("file_path", help="DOCX、PPTX 或 XLSX 文件的绝对路径") parser.add_argument( "-n", @@ -927,7 +1177,7 @@ def main() -> None: file_type = detect_file_type(args.file_path) if file_type is None: - print(f"错误: 文件不是有效的 DOCX 或 PPTX 格式: {args.file_path}") + print(f"错误: 文件不是有效的 DOCX、PPTX 或 XLSX 格式: {args.file_path}") sys.exit(1) if file_type == "docx": @@ -936,12 +1186,18 @@ def main() -> None: ("python-docx", parse_docx_with_python_docx), ("XML 原生解析", parse_docx_with_xml), ] - else: + elif file_type == "pptx": parsers = [ ("MarkItDown", parse_pptx_with_markitdown), ("python-pptx", parse_pptx_with_python_pptx), ("XML 原生解析", parse_pptx_with_xml), ] + else: + parsers = [ + ("MarkItDown", parse_xlsx_with_markitdown), + ("pandas", parse_xlsx_with_pandas), + ("XML 原生解析", parse_xlsx_with_xml), + ] failures = [] content = None