refactor: 将核心代码迁移到 scripts 目录

- 创建 scripts/ 目录作为核心代码根目录
- 移动 core/, readers/, utils/ 到 scripts/ 下
- 移动 config.py, lyxy_document_reader.py 到 scripts/
- 移动 encoding_detection.py 到 scripts/utils/
- 更新 pyproject.toml 中的入口点路径和 pytest 配置
- 更新所有内部导入语句为 scripts.* 模块
- 更新 README.md 目录结构说明
- 更新 openspec/config.yaml 添加目录结构说明
- 删除无用的 main.py

此变更使项目结构更清晰,便于区分核心代码与测试、文档等支撑文件。
This commit is contained in:
2026-03-08 17:41:03 +08:00
parent 750ef50a8d
commit 15b63800a8
50 changed files with 66 additions and 60 deletions

19
scripts/config.py Normal file
View File

@@ -0,0 +1,19 @@
"""统一配置类,集中管理所有配置项。"""
class Config:
"""统一配置类"""
# 编码检测
# 回退编码列表,当 chardet 检测失败时依次尝试
FALLBACK_ENCODINGS = ['utf-8', 'gbk', 'gb2312', 'latin-1']
# HTML 下载
# 下载超时时间(秒)
DOWNLOAD_TIMEOUT = 30
# HTTP User-Agent 标识
USER_AGENT = "lyxy-document/0.1.0"
# 日志
# 日志等级,默认只输出 ERROR 级别避免干扰 Markdown 输出
LOG_LEVEL = "ERROR"

47
scripts/core/__init__.py Normal file
View File

@@ -0,0 +1,47 @@
"""Core module for lyxy-document."""
from .exceptions import (
LyxyDocumentError,
FileDetectionError,
ReaderNotFoundError,
ParseError,
DownloadError,
)
from .markdown import (
parse_with_markitdown,
parse_with_docling,
build_markdown_table,
flush_list_stack,
safe_open_zip,
normalize_markdown_whitespace,
remove_markdown_images,
get_heading_level,
extract_titles,
extract_title_content,
search_markdown,
_unstructured_elements_to_markdown,
)
from .parser import parse_input, process_content, output_result
__all__ = [
"LyxyDocumentError",
"FileDetectionError",
"ReaderNotFoundError",
"ParseError",
"DownloadError",
"parse_with_markitdown",
"parse_with_docling",
"build_markdown_table",
"flush_list_stack",
"safe_open_zip",
"normalize_markdown_whitespace",
"remove_markdown_images",
"get_heading_level",
"extract_titles",
"extract_title_content",
"search_markdown",
"_unstructured_elements_to_markdown",
"parse_input",
"process_content",
"output_result",
]

View File

@@ -0,0 +1,26 @@
"""自定义异常体系,用于文档处理过程中的各种错误场景。"""
class LyxyDocumentError(Exception):
"""文档处理基异常,所有自定义异常的父类。"""
pass
class FileDetectionError(LyxyDocumentError):
"""文件类型检测失败时抛出。"""
pass
class ReaderNotFoundError(LyxyDocumentError):
"""未找到适配的阅读器时抛出。"""
pass
class ParseError(LyxyDocumentError):
"""解析失败时抛出。"""
pass
class DownloadError(LyxyDocumentError):
"""下载失败时抛出。"""
pass

290
scripts/core/markdown.py Normal file
View File

@@ -0,0 +1,290 @@
"""Markdown 后处理模块,包含所有格式共享的工具函数。"""
import re
import zipfile
from pathlib import Path
from typing import List, Optional, Tuple
IMAGE_PATTERN = re.compile(r"!\[[^\]]*\]\([^)]+\)")
_CONSECUTIVE_BLANK_LINES = re.compile(r"\n{3,}")
# unstructured 噪声匹配: pptx 中的 RGB 颜色值(如 "R:255 G:128 B:0"
_RGB_PATTERN = re.compile(r"^R:\d+\s+G:\d+\s+B:\d+$")
# unstructured 噪声匹配: 破折号页码(如 "— 3 —"
_PAGE_NUMBER_PATTERN = re.compile(r"^—\s*\d+\s*—$")
def parse_with_markitdown(
file_path: str,
) -> Tuple[Optional[str], Optional[str]]:
"""使用 MarkItDown 库解析文件"""
try:
from markitdown import MarkItDown
md = MarkItDown()
result = md.convert(file_path)
if not result.text_content.strip():
return None, "文档为空"
return result.text_content, None
except ImportError:
return None, "MarkItDown 库未安装"
except Exception as e:
return None, f"MarkItDown 解析失败: {str(e)}"
def parse_with_docling(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 docling 库解析文件"""
try:
from docling.document_converter import DocumentConverter
except ImportError:
return None, "docling 库未安装"
try:
converter = DocumentConverter()
result = converter.convert(file_path)
markdown_content = result.document.export_to_markdown()
if not markdown_content.strip():
return None, "文档为空"
return markdown_content, None
except Exception as e:
return None, f"docling 解析失败: {str(e)}"
def build_markdown_table(rows_data: List[List[str]]) -> str:
"""将二维列表转换为 Markdown 表格格式"""
if not rows_data or not rows_data[0]:
return ""
md_lines = []
for i, row_data in enumerate(rows_data):
row_text = [cell if cell else "" for cell in row_data]
md_lines.append("| " + " | ".join(row_text) + " |")
if i == 0:
md_lines.append("| " + " | ".join(["---"] * len(row_text)) + " |")
return "\n".join(md_lines) + "\n\n"
def flush_list_stack(list_stack: List[str], target: List[str]) -> None:
"""将列表堆栈中的非空项添加到目标列表并清空堆栈"""
for item in list_stack:
if item:
target.append(item + "\n")
list_stack.clear()
def safe_open_zip(zip_file: zipfile.ZipFile, name: str) -> Optional[zipfile.ZipExtFile]:
"""安全地从 ZipFile 中打开文件,防止路径遍历攻击"""
if not name:
return None
try:
normalized = Path(name).as_posix()
# 检查是否包含父目录引用
if ".." in Path(normalized).parts:
return None
# 检查是否为绝对路径
if Path(normalized).is_absolute():
return None
return zip_file.open(name)
except (ValueError, OSError):
return None
def normalize_markdown_whitespace(content: str) -> str:
"""规范化 Markdown 空白字符,保留单行空行"""
return _CONSECUTIVE_BLANK_LINES.sub("\n\n", content)
def remove_markdown_images(markdown_text: str) -> str:
"""移除 Markdown 文本中的图片标记"""
return IMAGE_PATTERN.sub("", markdown_text)
def get_heading_level(line: str) -> int:
"""获取 Markdown 行的标题级别1-6非标题返回 0"""
stripped = line.lstrip()
if not stripped.startswith("#"):
return 0
without_hash = stripped.lstrip("#")
level = len(stripped) - len(without_hash)
if not (1 <= level <= 6):
return 0
if len(stripped) == level:
return level
if stripped[level] != " ":
return 0
return level
def extract_titles(markdown_text: str) -> List[str]:
"""提取 markdown 文本中的所有标题行1-6级"""
title_lines = []
for line in markdown_text.split("\n"):
if get_heading_level(line) > 0:
title_lines.append(line.lstrip())
return title_lines
def extract_title_content(markdown_text: str, title_name: str) -> Optional[str]:
"""提取所有指定标题及其下级内容(每个包含上级标题)"""
lines = markdown_text.split("\n")
match_indices = []
for i, line in enumerate(lines):
level = get_heading_level(line)
if level > 0:
stripped = line.lstrip()
title_text = stripped[level:].strip()
if title_text == title_name:
match_indices.append(i)
if not match_indices:
return None
result_lines = []
for match_num, idx in enumerate(match_indices):
if match_num > 0:
result_lines.append("\n---\n")
target_level = get_heading_level(lines[idx])
parent_titles = []
current_level = target_level
for i in range(idx - 1, -1, -1):
line_level = get_heading_level(lines[i])
if line_level > 0 and line_level < current_level:
parent_titles.append(lines[i])
current_level = line_level
if current_level == 1:
break
parent_titles.reverse()
result_lines.extend(parent_titles)
result_lines.append(lines[idx])
for i in range(idx + 1, len(lines)):
line = lines[i]
line_level = get_heading_level(line)
if line_level == 0 or line_level > target_level:
result_lines.append(line)
else:
break
return "\n".join(result_lines)
def search_markdown(
content: str, pattern: str, context_lines: int = 0
) -> Optional[str]:
"""使用正则表达式搜索 markdown 文档,返回匹配结果及其上下文"""
# 边界检查
if not content:
return None
if context_lines < 0:
raise ValueError("context_lines 必须为非负整数")
try:
regex = re.compile(pattern)
except re.error:
return None
lines = content.split("\n")
non_empty_indices = []
non_empty_to_original = {}
for i, line in enumerate(lines):
if line.strip():
non_empty_indices.append(i)
non_empty_to_original[i] = len(non_empty_indices) - 1
matched_non_empty_indices = []
for orig_idx in non_empty_indices:
if regex.search(lines[orig_idx]):
matched_non_empty_indices.append(non_empty_to_original[orig_idx])
if not matched_non_empty_indices:
return None
merged_ranges = []
current_start = matched_non_empty_indices[0]
current_end = matched_non_empty_indices[0]
for idx in matched_non_empty_indices[1:]:
if idx - current_end <= context_lines * 2:
current_end = idx
else:
merged_ranges.append((current_start, current_end))
current_start = idx
current_end = idx
merged_ranges.append((current_start, current_end))
results = []
for start, end in merged_ranges:
context_start_idx = max(0, start - context_lines)
context_end_idx = min(len(non_empty_indices) - 1, end + context_lines)
start_line_idx = non_empty_indices[context_start_idx]
end_line_idx = non_empty_indices[context_end_idx]
result_lines = [
line
for i, line in enumerate(lines)
if start_line_idx <= i <= end_line_idx
]
results.append("\n".join(result_lines))
return "\n---\n".join(results)
def _unstructured_elements_to_markdown(
elements: list, trust_titles: bool = True
) -> str:
"""将 unstructured 解析出的元素列表转换为 Markdown 文本"""
try:
import markdownify as md_lib
from unstructured.documents.elements import (
Footer,
Header,
Image,
ListItem,
PageBreak,
PageNumber,
Table,
Title,
)
except ImportError:
return "\n\n".join(
el.text for el in elements if hasattr(el, "text") and el.text and el.text.strip()
)
skip_types = (Header, Footer, PageBreak, PageNumber)
parts = []
for el in elements:
if isinstance(el, skip_types):
continue
text = el.text.strip() if hasattr(el, "text") else str(el).strip()
if not text or _RGB_PATTERN.match(text) or _PAGE_NUMBER_PATTERN.match(text):
continue
if isinstance(el, Table):
html = getattr(el.metadata, "text_as_html", None)
if html:
parts.append(md_lib.markdownify(html, strip=["img"]).strip())
else:
parts.append(str(el))
elif isinstance(el, Title) and trust_titles:
depth = getattr(el.metadata, "category_depth", None) or 1
depth = min(max(depth, 1), 4)
parts.append(f"{'#' * depth} {text}")
elif isinstance(el, ListItem):
parts.append(f"- {text}")
elif isinstance(el, Image):
path = getattr(el.metadata, "image_path", None) or ""
if path:
parts.append(f"![image]({path})")
else:
parts.append(text)
return "\n\n".join(parts)

75
scripts/core/parser.py Normal file
View File

@@ -0,0 +1,75 @@
"""统一解析调度器,负责根据输入类型选择合适的 reader 进行解析。"""
import argparse
import sys
from typing import List, Optional, Tuple
from scripts.core.exceptions import FileDetectionError, ReaderNotFoundError
from scripts.core.markdown import (
normalize_markdown_whitespace,
remove_markdown_images,
)
from scripts.readers import BaseReader
def parse_input(
input_path: str,
readers: List[BaseReader],
) -> Tuple[Optional[str], List[str]]:
"""
统一解析入口函数,遍历 readers 选择合适的 reader 进行解析。
返回: (content, failures)
- content: 成功时返回 Markdown 内容,失败时返回 None
- failures: 各解析器的失败原因列表
"""
if not input_path:
raise FileDetectionError("输入路径不能为空")
for reader in readers:
if reader.supports(input_path):
return reader.parse(input_path)
raise ReaderNotFoundError(f"未找到支持的 reader: {input_path}")
def process_content(content: str) -> str:
"""处理解析后的 Markdown 内容"""
content = remove_markdown_images(content)
content = normalize_markdown_whitespace(content)
return content
def output_result(
content: str,
args: argparse.Namespace,
) -> None:
"""根据命令行参数输出结果"""
if args.count:
print(len(content.replace("\n", "")))
elif args.lines:
print(len(content.split("\n")))
elif args.titles:
from core.markdown import extract_titles
titles = extract_titles(content)
for title in titles:
print(title)
elif args.title_content:
from core.markdown import extract_title_content
title_content = extract_title_content(content, args.title_content)
if title_content is None:
print(f"错误: 未找到标题 '{args.title_content}'")
sys.exit(1)
print(title_content, end="")
elif args.search:
from core.markdown import search_markdown
search_result = search_markdown(content, args.search, args.context)
if search_result is None:
print(f"错误: 正则表达式无效或未找到匹配: '{args.search}'")
sys.exit(1)
print(search_result, end="")
else:
print(content, end="")

View File

@@ -0,0 +1,100 @@
#!/usr/bin/env python3
"""文档解析器命令行交互模块,提供命令行接口。支持 DOCX、PPTX、XLSX、PDF、HTML 和 URL。"""
import argparse
import logging
import os
import sys
import warnings
# 抑制第三方库的进度条和日志,仅保留解析结果输出
os.environ["HF_HUB_DISABLE_PROGRESS_BARS"] = "1"
os.environ["HF_HUB_DISABLE_TELEMETRY"] = "1"
os.environ["TQDM_DISABLE"] = "1"
warnings.filterwarnings("ignore")
# 配置日志系统,只输出 ERROR 级别
logging.basicConfig(level=logging.ERROR, format='%(levelname)s: %(message)s')
# 设置第三方库日志等级
logging.getLogger('docling').setLevel(logging.ERROR)
logging.getLogger('unstructured').setLevel(logging.ERROR)
from scripts.core import (
FileDetectionError,
ReaderNotFoundError,
output_result,
parse_input,
process_content,
)
from scripts.readers import READERS
def main() -> None:
parser = argparse.ArgumentParser(
description="将 DOCX、PPTX、XLSX、PDF、HTML 文件或 URL 解析为 Markdown"
)
parser.add_argument("input_path", help="DOCX、PPTX、XLSX、PDF、HTML 文件或 URL")
parser.add_argument(
"-n",
"--context",
type=int,
default=2,
help="与 -s 配合使用,指定每个检索结果包含的前后行数(不包含空行)",
)
group = parser.add_mutually_exclusive_group()
group.add_argument(
"-c", "--count", action="store_true", help="返回解析后的 markdown 文档的总字数"
)
group.add_argument(
"-l", "--lines", action="store_true", help="返回解析后的 markdown 文档的总行数"
)
group.add_argument(
"-t",
"--titles",
action="store_true",
help="返回解析后的 markdown 文档的标题行1-6级",
)
group.add_argument(
"-tc",
"--title-content",
help="指定标题名称,输出该标题及其下级内容(不包含#号)",
)
group.add_argument(
"-s",
"--search",
help="使用正则表达式搜索文档,返回所有匹配结果(用---分隔)",
)
args = parser.parse_args()
# 实例化所有 readers
readers = [ReaderCls() for ReaderCls in READERS]
try:
content, failures = parse_input(args.input_path, readers)
except FileDetectionError as e:
print(f"错误: {e}")
sys.exit(1)
except ReaderNotFoundError as e:
print(f"错误: {e}")
sys.exit(1)
if content is None:
print("所有解析方法均失败:")
for failure in failures:
print(failure)
sys.exit(1)
# 处理内容
content = process_content(content)
# 输出结果
output_result(content, args)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,26 @@
"""Readers module for lyxy-document."""
from .base import BaseReader
from .docx import DocxReader
from .xlsx import XlsxReader
from .pptx import PptxReader
from .pdf import PdfReader
from .html import HtmlReader
READERS = [
DocxReader,
XlsxReader,
PptxReader,
PdfReader,
HtmlReader,
]
__all__ = [
"BaseReader",
"DocxReader",
"XlsxReader",
"PptxReader",
"PdfReader",
"HtmlReader",
"READERS",
]

48
scripts/readers/base.py Normal file
View File

@@ -0,0 +1,48 @@
"""Reader 基类,定义所有文档阅读器的公共接口。"""
from abc import ABC, abstractmethod
from pathlib import Path
from typing import List, Optional, Tuple
class BaseReader(ABC):
"""文档阅读器基类。"""
@property
@abstractmethod
def supported_extensions(self) -> List[str]:
"""返回支持的文件扩展名列表(如 ['.docx', '.doc'])。"""
pass
@abstractmethod
def supports(self, file_path: str) -> bool:
"""
判断是否支持给定的输入(轻量检查)。
仅做初步判断如扩展名、URL 模式),不进行完整验证。
完整验证(文件存在、格式有效性)在 parse() 中进行。
不访问文件系统,不打开文件。
Args:
file_path: 文件路径或 URL
Returns:
True 如果可能支持False 否则
"""
pass
@abstractmethod
def parse(self, file_path: str) -> Tuple[Optional[str], List[str]]:
"""
解析文件并返回 Markdown 内容。
需要检查文件存在和格式有效性,然后执行实际解析逻辑。
Args:
file_path: 文件路径或 URL
Returns: (content, failures)
- content: 成功时返回 Markdown 内容,失败时返回 None
- failures: 各解析器的失败原因列表
"""
pass

View File

@@ -0,0 +1,57 @@
"""DOCX 文件阅读器,支持多种解析方法。"""
import os
from typing import List, Optional, Tuple
from scripts.readers.base import BaseReader
from scripts.utils import is_valid_docx
from . import docling
from . import unstructured
from . import markitdown
from . import pypandoc
from . import python_docx
from . import native_xml
PARSERS = [
("docling", docling.parse),
("unstructured", unstructured.parse),
("pypandoc-binary", pypandoc.parse),
("MarkItDown", markitdown.parse),
("python-docx", python_docx.parse),
("XML 原生解析", native_xml.parse),
]
class DocxReader(BaseReader):
"""DOCX 文件阅读器"""
@property
def supported_extensions(self) -> List[str]:
return [".docx"]
def supports(self, file_path: str) -> bool:
return file_path.endswith('.docx')
def parse(self, file_path: str) -> Tuple[Optional[str], List[str]]:
failures = []
# 检查文件是否存在
if not os.path.exists(file_path):
return None, ["文件不存在"]
# 验证文件格式
if not is_valid_docx(file_path):
return None, ["不是有效的 DOCX 文件"]
content = None
for parser_name, parser_func in PARSERS:
content, error = parser_func(file_path)
if content is not None:
return content, failures
else:
failures.append(f"- {parser_name}: {error}")
return None, failures

View File

@@ -0,0 +1,10 @@
"""使用 docling 库解析 DOCX 文件"""
from typing import Optional, Tuple
from scripts.core import parse_with_docling
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 docling 库解析 DOCX 文件"""
return parse_with_docling(file_path)

View File

@@ -0,0 +1,10 @@
"""使用 MarkItDown 库解析 DOCX 文件"""
from typing import Optional, Tuple
from scripts.core import parse_with_markitdown
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 MarkItDown 库解析 DOCX 文件"""
return parse_with_markitdown(file_path)

View File

@@ -0,0 +1,135 @@
"""使用 XML 原生解析 DOCX 文件"""
import xml.etree.ElementTree as ET
import zipfile
from typing import Any, Dict, List, Optional, Tuple
from scripts.core import build_markdown_table, safe_open_zip
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 XML 原生解析 DOCX 文件"""
word_namespace = "http://schemas.openxmlformats.org/wordprocessingml/2006/main"
namespaces = {"w": word_namespace}
_STYLE_NAME_TO_HEADING = {
"title": 1, "heading 1": 1, "heading 2": 2, "heading 3": 3,
"heading 4": 4, "heading 5": 5, "heading 6": 6,
}
def get_heading_level(style_id: Optional[str], style_to_level: dict) -> int:
return style_to_level.get(style_id, 0)
def get_list_style(style_id: Optional[str], style_to_list: dict) -> Optional[str]:
return style_to_list.get(style_id, None)
def extract_text_with_formatting(para: Any, namespaces: dict) -> str:
texts = []
for run in para.findall(".//w:r", namespaces=namespaces):
text_elem = run.find(".//w:t", namespaces=namespaces)
if text_elem is not None and text_elem.text:
text = text_elem.text
bold = run.find(".//w:b", namespaces=namespaces) is not None
italic = run.find(".//w:i", namespaces=namespaces) is not None
if bold:
text = f"**{text}**"
if italic:
text = f"*{text}*"
texts.append(text)
return "".join(texts).strip()
def convert_table_to_markdown(table_elem: Any, namespaces: dict) -> str:
rows = table_elem.findall(".//w:tr", namespaces=namespaces)
if not rows:
return ""
rows_data = []
for row in rows:
cells = row.findall(".//w:tc", namespaces=namespaces)
cell_texts = []
for cell in cells:
cell_text = extract_text_with_formatting(cell, namespaces)
cell_text = cell_text.replace("\n", " ").strip()
cell_texts.append(cell_text if cell_text else "")
if cell_texts:
rows_data.append(cell_texts)
return build_markdown_table(rows_data)
try:
style_to_level = {}
style_to_list = {}
markdown_lines = []
with zipfile.ZipFile(file_path) as zip_file:
try:
styles_file = safe_open_zip(zip_file, "word/styles.xml")
if styles_file:
styles_root = ET.parse(styles_file).getroot()
for style in styles_root.findall(
".//w:style", namespaces=namespaces
):
style_id = style.get(f"{{{word_namespace}}}styleId")
style_name_elem = style.find("w:name", namespaces=namespaces)
if style_id and style_name_elem is not None:
style_name = style_name_elem.get(f"{{{word_namespace}}}val")
if style_name:
style_name_lower = style_name.lower()
if style_name_lower in _STYLE_NAME_TO_HEADING:
style_to_level[style_id] = _STYLE_NAME_TO_HEADING[style_name_lower]
elif (
style_name_lower.startswith("list bullet")
or style_name_lower == "bullet"
):
style_to_list[style_id] = "bullet"
elif (
style_name_lower.startswith("list number")
or style_name_lower == "number"
):
style_to_list[style_id] = "number"
except Exception:
pass
document_file = safe_open_zip(zip_file, "word/document.xml")
if not document_file:
return None, "document.xml 不存在或无法访问"
root = ET.parse(document_file).getroot()
body = root.find(".//w:body", namespaces=namespaces)
if body is None:
return None, "document.xml 中未找到 w:body 元素"
for child in body.findall("./*", namespaces=namespaces):
if child.tag.endswith("}p"):
style_elem = child.find(".//w:pStyle", namespaces=namespaces)
style_id = (
style_elem.get(f"{{{word_namespace}}}val")
if style_elem is not None
else None
)
heading_level = get_heading_level(style_id, style_to_level)
list_style = get_list_style(style_id, style_to_list)
para_text = extract_text_with_formatting(child, namespaces)
if para_text:
if heading_level > 0:
markdown_lines.append(f"{'#' * heading_level} {para_text}")
elif list_style == "bullet":
markdown_lines.append(f"- {para_text}")
elif list_style == "number":
markdown_lines.append(f"1. {para_text}")
else:
markdown_lines.append(para_text)
markdown_lines.append("")
elif child.tag.endswith("}tbl"):
table_md = convert_table_to_markdown(child, namespaces)
if table_md:
markdown_lines.append(table_md)
markdown_lines.append("")
content = "\n".join(markdown_lines)
if not content.strip():
return None, "文档为空"
return content, None
except Exception as e:
return None, f"XML 解析失败: {str(e)}"

View File

@@ -0,0 +1,29 @@
"""使用 pypandoc-binary 库解析 DOCX 文件"""
from typing import Optional, Tuple
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 pypandoc-binary 库解析 DOCX 文件"""
try:
import pypandoc
except ImportError:
return None, "pypandoc-binary 库未安装"
try:
content = pypandoc.convert_file(
source_file=file_path,
to="md",
format="docx",
outputfile=None,
extra_args=["--wrap=none"],
)
except OSError as exc:
return None, f"pypandoc-binary 缺少 Pandoc 可执行文件: {exc}"
except RuntimeError as exc:
return None, f"pypandoc-binary 解析失败: {exc}"
content = content.strip()
if not content:
return None, "文档为空"
return content, None

View File

@@ -0,0 +1,118 @@
"""使用 python-docx 库解析 DOCX 文件"""
from typing import Any, List, Optional, Tuple
from scripts.core import build_markdown_table
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 python-docx 库解析 DOCX 文件"""
try:
from docx import Document
except ImportError:
return None, "python-docx 库未安装"
try:
doc = Document(file_path)
_HEADING_LEVELS = {
"Title": 1, "Heading 1": 1, "Heading 2": 2, "Heading 3": 3,
"Heading 4": 4, "Heading 5": 5, "Heading 6": 6,
}
def get_heading_level(para: Any) -> int:
if para.style and para.style.name:
return _HEADING_LEVELS.get(para.style.name, 0)
return 0
_LIST_STYLES = {
"Bullet": "bullet", "Number": "number",
}
def get_list_style(para: Any) -> Optional[str]:
if not para.style or not para.style.name:
return None
style_name = para.style.name
if style_name in _LIST_STYLES:
return _LIST_STYLES[style_name]
if style_name.startswith("List Bullet"):
return "bullet"
if style_name.startswith("List Number"):
return "number"
return None
def convert_runs_to_markdown(runs: List[Any]) -> str:
result = []
for run in runs:
text = run.text
if not text:
continue
if run.bold:
text = f"**{text}**"
if run.italic:
text = f"*{text}*"
if run.underline:
text = f"<u>{text}</u>"
result.append(text)
return "".join(result)
def convert_table_to_markdown(table: Any) -> str:
rows_data = []
for row in table.rows:
row_data = []
for cell in row.cells:
cell_text = cell.text.strip().replace("\n", " ")
row_data.append(cell_text)
rows_data.append(row_data)
return build_markdown_table(rows_data)
markdown_lines = []
prev_was_list = False
from docx.table import Table as DocxTable
from docx.text.paragraph import Paragraph
for element in doc.element.body:
if element.tag.endswith('}p'):
para = Paragraph(element, doc)
text = convert_runs_to_markdown(para.runs)
if not text.strip():
continue
heading_level = get_heading_level(para)
if heading_level > 0:
markdown_lines.append(f"{'#' * heading_level} {text}")
prev_was_list = False
else:
list_style = get_list_style(para)
if list_style == "bullet":
if not prev_was_list and markdown_lines:
markdown_lines.append("")
markdown_lines.append(f"- {text}")
prev_was_list = True
elif list_style == "number":
if not prev_was_list and markdown_lines:
markdown_lines.append("")
markdown_lines.append(f"1. {text}")
prev_was_list = True
else:
if prev_was_list and markdown_lines:
markdown_lines.append("")
markdown_lines.append(text)
markdown_lines.append("")
prev_was_list = False
elif element.tag.endswith('}tbl'):
table = DocxTable(element, doc)
table_md = convert_table_to_markdown(table)
if table_md:
markdown_lines.append(table_md)
markdown_lines.append("")
prev_was_list = False
content = "\n".join(markdown_lines)
if not content.strip():
return None, "文档为空"
return content, None
except Exception as e:
return None, f"python-docx 解析失败: {str(e)}"

View File

@@ -0,0 +1,22 @@
"""使用 unstructured 库解析 DOCX 文件"""
from typing import Optional, Tuple
from scripts.core import _unstructured_elements_to_markdown
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 unstructured 库解析 DOCX 文件"""
try:
from unstructured.partition.docx import partition_docx
except ImportError:
return None, "unstructured 库未安装"
try:
elements = partition_docx(filename=file_path, infer_table_structure=True)
content = _unstructured_elements_to_markdown(elements)
if not content.strip():
return None, "文档为空"
return content, None
except Exception as e:
return None, f"unstructured 解析失败: {str(e)}"

View File

@@ -0,0 +1,89 @@
"""HTML/URL 文件阅读器,支持多种解析方法。"""
import os
from typing import List, Optional, Tuple
from scripts.readers.base import BaseReader
from scripts.utils import is_url
from scripts.utils import encoding_detection
from . import cleaner
from . import downloader
from . import trafilatura
from . import domscribe
from . import markitdown
from . import html2text
PARSERS = [
("trafilatura", lambda c, t: trafilatura.parse(c)),
("domscribe", lambda c, t: domscribe.parse(c)),
("MarkItDown", lambda c, t: markitdown.parse(c, t)),
("html2text", lambda c, t: html2text.parse(c)),
]
class HtmlReader(BaseReader):
"""HTML/URL 文件阅读器"""
@property
def supported_extensions(self) -> List[str]:
return [".html", ".htm"]
def supports(self, file_path: str) -> bool:
return is_url(file_path) or file_path.endswith(('.html', '.htm'))
def download_and_parse(self, url: str) -> Tuple[Optional[str], List[str]]:
"""下载 URL 并解析"""
all_failures = []
# 下载 HTML
html_content, download_failures = downloader.download_html(url)
all_failures.extend(download_failures)
if html_content is None:
return None, all_failures
# 清理 HTML
html_content = cleaner.clean_html_content(html_content)
# 解析 HTML
content, parse_failures = self._parse_html_content(html_content, None)
all_failures.extend(parse_failures)
return content, all_failures
def _parse_html_content(self, html_content: str, temp_file_path: Optional[str]) -> Tuple[Optional[str], List[str]]:
"""解析 HTML 内容"""
failures = []
content = None
for parser_name, parser_func in PARSERS:
content, error = parser_func(html_content, temp_file_path)
if content is not None:
return content, failures
else:
failures.append(f"- {parser_name}: {error}")
return None, failures
def parse(self, file_path: str) -> Tuple[Optional[str], List[str]]:
all_failures = []
# 判断输入类型
if is_url(file_path):
return self.download_and_parse(file_path)
# 读取本地 HTML 文件,使用编码检测
html_content, error = encoding_detection.read_text_file(file_path)
if error:
return None, [f"- {error}"]
# 清理 HTML
html_content = cleaner.clean_html_content(html_content)
# 解析 HTML
content, parse_failures = self._parse_html_content(html_content, file_path)
all_failures.extend(parse_failures)
return content, all_failures

View File

@@ -0,0 +1,69 @@
"""HTML 清理模块,用于清理 HTML 内容中的敏感信息。"""
import re
from bs4 import BeautifulSoup
def clean_html_content(html_content: str) -> str:
"""清理 HTML 内容,移除 script/style/link/svg 标签和 URL 属性。"""
soup = BeautifulSoup(html_content, "html.parser")
# Remove all script tags
for script in soup.find_all("script"):
script.decompose()
# Remove all style tags
for style in soup.find_all("style"):
style.decompose()
# Remove all svg tags
for svg in soup.find_all("svg"):
svg.decompose()
# Remove all link tags
for link in soup.find_all("link"):
link.decompose()
# Remove URLs from href and src attributes
for tag in soup.find_all(True):
if "href" in tag.attrs:
del tag["href"]
if "src" in tag.attrs:
del tag["src"]
if "srcset" in tag.attrs:
del tag["srcset"]
if "action" in tag.attrs:
del tag["action"]
data_attrs = [
attr
for attr in tag.attrs
if attr.startswith("data-") and "src" in attr.lower()
]
for attr in data_attrs:
del tag[attr]
# Remove all style attributes from all tags
for tag in soup.find_all(True):
if "style" in tag.attrs:
del tag["style"]
# Remove data-href attributes
for tag in soup.find_all(True):
if "data-href" in tag.attrs:
del tag["data-href"]
# Remove URLs from title attributes
for tag in soup.find_all(True):
if "title" in tag.attrs:
title = tag["title"]
cleaned_title = re.sub(r"https?://\S+", "", title, flags=re.IGNORECASE)
tag["title"] = cleaned_title
# Remove class attributes that contain URL-like patterns
for tag in soup.find_all(True):
if "class" in tag.attrs:
classes = tag["class"]
cleaned_classes = [c for c in classes if not c.startswith("url ") and not "hyperlink-href:" in c]
tag["class"] = cleaned_classes
return str(soup)

View File

@@ -0,0 +1,22 @@
"""使用 domscribe 解析 HTML"""
from typing import Optional, Tuple
def parse(html_content: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 domscribe 解析 HTML"""
try:
from domscribe import html_to_markdown
except ImportError:
return None, "domscribe 库未安装"
try:
options = {
'extract_main_content': True,
}
markdown_content = html_to_markdown(html_content, options)
if not markdown_content.strip():
return None, "解析内容为空"
return markdown_content, None
except Exception as e:
return None, f"domscribe 解析失败: {str(e)}"

View File

@@ -0,0 +1,262 @@
"""URL 下载模块,按 pyppeteer → selenium → httpx → urllib 优先级尝试下载。"""
import os
import asyncio
import tempfile
import urllib.request
import urllib.error
from typing import Optional, Tuple
# 公共配置
USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
WINDOW_SIZE = "1920,1080"
LANGUAGE_SETTING = "zh-CN,zh"
# Chrome 浏览器启动参数pyppeteer 和 selenium 共用)
CHROME_ARGS = [
"--no-sandbox",
"--disable-dev-shm-usage",
"--disable-gpu",
"--disable-software-rasterizer",
"--disable-extensions",
"--disable-background-networking",
"--disable-default-apps",
"--disable-sync",
"--disable-translate",
"--hide-scrollbars",
"--metrics-recording-only",
"--mute-audio",
"--no-first-run",
"--safebrowsing-disable-auto-update",
"--blink-settings=imagesEnabled=false",
"--disable-plugins",
"--disable-ipc-flooding-protection",
"--disable-renderer-backgrounding",
"--disable-background-timer-throttling",
"--disable-hang-monitor",
"--disable-prompt-on-repost",
"--disable-client-side-phishing-detection",
"--disable-component-update",
"--disable-domain-reliability",
"--disable-features=site-per-process",
"--disable-features=IsolateOrigins",
"--disable-features=VizDisplayCompositor",
"--disable-features=WebRTC",
f"--window-size={WINDOW_SIZE}",
f"--lang={LANGUAGE_SETTING}",
f"--user-agent={USER_AGENT}",
]
# 隐藏自动化特征的脚本pyppeteer 和 selenium 共用)
HIDE_AUTOMATION_SCRIPT = """
() => {
Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5] });
Object.defineProperty(navigator, 'languages', { get: () => ['zh-CN', 'zh'] });
}
"""
# pyppeteer 额外的隐藏自动化脚本(包含 notifications 处理)
HIDE_AUTOMATION_SCRIPT_PUPPETEER = """
() => {
Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5] });
Object.defineProperty(navigator, 'languages', { get: () => ['zh-CN', 'zh'] });
const originalQuery = window.navigator.permissions.query;
window.navigator.permissions.query = (parameters) => (
parameters.name === 'notifications' ?
Promise.resolve({ state: Notification.permission }) :
originalQuery(parameters)
);
}
"""
def download_with_pyppeteer(url: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 pyppeteer 下载 URL支持 JS 渲染)"""
try:
from pyppeteer import launch
except ImportError:
return None, "pyppeteer 库未安装"
async def _download():
pyppeteer_temp_dir = os.path.join(tempfile.gettempdir(), "pyppeteer_home")
chromium_path = os.environ.get("LYXY_CHROMIUM_BINARY")
if not chromium_path:
os.environ["PYPPETEER_HOME"] = pyppeteer_temp_dir
executable_path = chromium_path if (chromium_path and os.path.exists(chromium_path)) else None
browser = None
try:
browser = await launch(
headless=True,
executablePath=executable_path,
args=CHROME_ARGS
)
page = await browser.newPage()
await page.evaluateOnNewDocument(HIDE_AUTOMATION_SCRIPT_PUPPETEER)
await page.setJavaScriptEnabled(True)
await page.goto(url, {"waitUntil": "networkidle2", "timeout": 30000})
return await page.content()
finally:
if browser is not None:
try:
await browser.close()
except Exception:
pass
try:
content = asyncio.run(_download())
if not content or not content.strip():
return None, "下载内容为空"
return content, None
except Exception as e:
return None, f"pyppeteer 下载失败: {str(e)}"
def download_with_selenium(url: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 selenium 下载 URL支持 JS 渲染)"""
try:
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
except ImportError:
return None, "selenium 库未安装"
driver_path = os.environ.get("LYXY_CHROMIUM_DRIVER")
binary_path = os.environ.get("LYXY_CHROMIUM_BINARY")
if not driver_path or not os.path.exists(driver_path):
return None, "LYXY_CHROMIUM_DRIVER 环境变量未设置或文件不存在"
if not binary_path or not os.path.exists(binary_path):
return None, "LYXY_CHROMIUM_BINARY 环境变量未设置或文件不存在"
chrome_options = Options()
chrome_options.binary_location = binary_path
chrome_options.add_argument("--headless=new")
for arg in CHROME_ARGS:
chrome_options.add_argument(arg)
# 隐藏自动化特征
chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
chrome_options.add_experimental_option("useAutomationExtension", False)
driver = None
try:
import time
service = Service(driver_path)
driver = webdriver.Chrome(service=service, options=chrome_options)
# 隐藏 webdriver 属性
driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
"source": HIDE_AUTOMATION_SCRIPT
})
driver.get(url)
# 等待页面内容稳定
WebDriverWait(driver, 30).until(
lambda d: d.execute_script("return document.readyState") == "complete"
)
last_len = 0
stable_count = 0
for _ in range(30):
current_len = len(driver.page_source)
if current_len == last_len:
stable_count += 1
if stable_count >= 2:
break
else:
stable_count = 0
last_len = current_len
time.sleep(0.5)
content = driver.page_source
if not content or not content.strip():
return None, "下载内容为空"
return content, None
except Exception as e:
return None, f"selenium 下载失败: {str(e)}"
finally:
if driver is not None:
try:
driver.quit()
except Exception:
pass
def download_with_httpx(url: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 httpx 下载 URL轻量级 HTTP 客户端)"""
try:
import httpx
except ImportError:
return None, "httpx 库未安装"
headers = {
"User-Agent": USER_AGENT
}
try:
with httpx.Client(timeout=30.0) as client:
response = client.get(url, headers=headers)
if response.status_code == 200:
content = response.text
if not content or not content.strip():
return None, "下载内容为空"
return content, None
return None, f"HTTP {response.status_code}"
except Exception as e:
return None, f"httpx 下载失败: {str(e)}"
def download_with_urllib(url: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 urllib 下载 URL标准库兜底方案"""
headers = {
"User-Agent": USER_AGENT
}
try:
req = urllib.request.Request(url, headers=headers)
with urllib.request.urlopen(req, timeout=30) as response:
if response.status == 200:
content = response.read().decode("utf-8")
if not content or not content.strip():
return None, "下载内容为空"
return content, None
return None, f"HTTP {response.status}"
except Exception as e:
return None, f"urllib 下载失败: {str(e)}"
def download_html(url: str) -> Tuple[Optional[str], list]:
"""
统一的 HTML 下载入口函数,按优先级尝试各下载器。
返回: (content, failures)
- content: 成功时返回 HTML 内容,失败时返回 None
- failures: 各下载器的失败原因列表
"""
failures = []
content = None
# 按优先级尝试各下载器
downloaders = [
("pyppeteer", download_with_pyppeteer),
("selenium", download_with_selenium),
("httpx", download_with_httpx),
("urllib", download_with_urllib),
]
for name, func in downloaders:
content, error = func(url)
if content is not None:
return content, failures
else:
failures.append(f"- {name}: {error}")
return None, failures

View File

@@ -0,0 +1,25 @@
"""使用 html2text 解析 HTML兜底方案"""
from typing import Optional, Tuple
def parse(html_content: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 html2text 解析 HTML兜底方案"""
try:
import html2text
except ImportError:
return None, "html2text 库未安装"
try:
converter = html2text.HTML2Text()
converter.ignore_emphasis = False
converter.ignore_links = False
converter.ignore_images = True
converter.body_width = 0
converter.skip_internal_links = True
markdown_content = converter.handle(html_content)
if not markdown_content.strip():
return None, "解析内容为空"
return markdown_content, None
except Exception as e:
return None, f"html2text 解析失败: {str(e)}"

View File

@@ -0,0 +1,41 @@
"""使用 MarkItDown 解析 HTML"""
import os
import tempfile
from typing import Optional, Tuple
def parse(html_content: str, temp_file_path: Optional[str] = None) -> Tuple[Optional[str], Optional[str]]:
"""使用 MarkItDown 解析 HTML"""
try:
from markitdown import MarkItDown
except ImportError:
return None, "MarkItDown 库未安装"
try:
input_path = temp_file_path
if not input_path or not os.path.exists(input_path):
# 创建临时文件
fd, input_path = tempfile.mkstemp(suffix='.html')
with os.fdopen(fd, 'w', encoding='utf-8') as f:
f.write(html_content)
md = MarkItDown()
result = md.convert(
input_path,
heading_style="ATX",
strip=["img", "script", "style", "noscript"],
)
markdown_content = result.text_content
if not temp_file_path:
try:
os.unlink(input_path)
except Exception:
pass
if not markdown_content.strip():
return None, "解析内容为空"
return markdown_content, None
except Exception as e:
return None, f"MarkItDown 解析失败: {str(e)}"

View File

@@ -0,0 +1,30 @@
"""使用 trafilatura 解析 HTML"""
from typing import Optional, Tuple
def parse(html_content: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 trafilatura 解析 HTML"""
try:
import trafilatura
except ImportError:
return None, "trafilatura 库未安装"
try:
markdown_content = trafilatura.extract(
html_content,
output_format="markdown",
include_formatting=True,
include_links=True,
include_images=False,
include_tables=True,
favor_recall=True,
include_comments=True,
)
if markdown_content is None:
return None, "trafilatura 返回 None"
if not markdown_content.strip():
return None, "解析内容为空"
return markdown_content, None
except Exception as e:
return None, f"trafilatura 解析失败: {str(e)}"

View File

@@ -0,0 +1,57 @@
"""PDF 文件阅读器支持多种解析方法OCR 优先)。"""
import os
from typing import List, Optional, Tuple
from scripts.readers.base import BaseReader
from scripts.utils import is_valid_pdf
from . import docling_ocr
from . import unstructured_ocr
from . import docling
from . import unstructured
from . import markitdown
from . import pypdf
PARSERS = [
("docling OCR", docling_ocr.parse),
("unstructured OCR", unstructured_ocr.parse),
("docling", docling.parse),
("unstructured", unstructured.parse),
("MarkItDown", markitdown.parse),
("pypdf", pypdf.parse),
]
class PdfReader(BaseReader):
"""PDF 文件阅读器"""
@property
def supported_extensions(self) -> List[str]:
return [".pdf"]
def supports(self, file_path: str) -> bool:
return file_path.endswith('.pdf')
def parse(self, file_path: str) -> Tuple[Optional[str], List[str]]:
failures = []
# 检查文件是否存在
if not os.path.exists(file_path):
return None, ["文件不存在"]
# 验证文件格式
if not is_valid_pdf(file_path):
return None, ["不是有效的 PDF 文件"]
content = None
for parser_name, parser_func in PARSERS:
content, error = parser_func(file_path)
if content is not None:
return content, failures
else:
failures.append(f"- {parser_name}: {error}")
return None, failures

View File

@@ -0,0 +1,29 @@
"""使用 docling 库解析 PDF 文件(不启用 OCR"""
from typing import Optional, Tuple
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 docling 库解析 PDF 文件(不启用 OCR"""
try:
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import PdfPipelineOptions
from docling.document_converter import DocumentConverter, PdfFormatOption
except ImportError:
return None, "docling 库未安装"
try:
converter = DocumentConverter(
format_options={
InputFormat.PDF: PdfFormatOption(
pipeline_options=PdfPipelineOptions(do_ocr=False)
)
}
)
result = converter.convert(file_path)
markdown_content = result.document.export_to_markdown()
if not markdown_content.strip():
return None, "文档为空"
return markdown_content, None
except Exception as e:
return None, f"docling 解析失败: {str(e)}"

View File

@@ -0,0 +1,21 @@
"""使用 docling 库解析 PDF 文件(启用 OCR"""
from typing import Optional, Tuple
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 docling 库解析 PDF 文件(启用 OCR"""
try:
from docling.document_converter import DocumentConverter
except ImportError:
return None, "docling 库未安装"
try:
converter = DocumentConverter()
result = converter.convert(file_path)
markdown_content = result.document.export_to_markdown()
if not markdown_content.strip():
return None, "文档为空"
return markdown_content, None
except Exception as e:
return None, f"docling OCR 解析失败: {str(e)}"

View File

@@ -0,0 +1,10 @@
"""使用 MarkItDown 库解析 PDF 文件"""
from typing import Optional, Tuple
from scripts.core import parse_with_markitdown
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 MarkItDown 库解析 PDF 文件"""
return parse_with_markitdown(file_path)

View File

@@ -0,0 +1,28 @@
"""使用 pypdf 库解析 PDF 文件"""
from typing import Optional, Tuple
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 pypdf 库解析 PDF 文件"""
try:
from pypdf import PdfReader
except ImportError:
return None, "pypdf 库未安装"
try:
reader = PdfReader(file_path)
md_content = []
for page in reader.pages:
text = page.extract_text(extraction_mode="plain")
if text and text.strip():
md_content.append(text.strip())
md_content.append("")
content = "\n".join(md_content).strip()
if not content:
return None, "文档为空"
return content, None
except Exception as e:
return None, f"pypdf 解析失败: {str(e)}"

View File

@@ -0,0 +1,28 @@
"""使用 unstructured 库解析 PDF 文件fast 策略)"""
from typing import Optional, Tuple
from scripts.core import _unstructured_elements_to_markdown
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 unstructured 库解析 PDF 文件fast 策略)"""
try:
from unstructured.partition.pdf import partition_pdf
except ImportError:
return None, "unstructured 库未安装"
try:
elements = partition_pdf(
filename=file_path,
infer_table_structure=True,
strategy="fast",
languages=["chi_sim"],
)
# fast 策略不做版面分析Title 类型标注不可靠
content = _unstructured_elements_to_markdown(elements, trust_titles=False)
if not content.strip():
return None, "文档为空"
return content, None
except Exception as e:
return None, f"unstructured 解析失败: {str(e)}"

View File

@@ -0,0 +1,34 @@
"""使用 unstructured 库解析 PDF 文件hi_res 策略 + PaddleOCR"""
from typing import Optional, Tuple
from scripts.core import _unstructured_elements_to_markdown
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 unstructured 库解析 PDF 文件hi_res 策略 + PaddleOCR"""
try:
from unstructured.partition.pdf import partition_pdf
except ImportError:
return None, "unstructured 库未安装"
try:
from unstructured.partition.utils.constants import OCR_AGENT_PADDLE
except ImportError:
return None, "unstructured-paddleocr 库未安装"
try:
elements = partition_pdf(
filename=file_path,
infer_table_structure=True,
strategy="hi_res",
languages=["chi_sim"],
ocr_agent=OCR_AGENT_PADDLE,
table_ocr_agent=OCR_AGENT_PADDLE,
)
content = _unstructured_elements_to_markdown(elements, trust_titles=True)
if not content.strip():
return None, "文档为空"
return content, None
except Exception as e:
return None, f"unstructured OCR 解析失败: {str(e)}"

View File

@@ -0,0 +1,55 @@
"""PPTX 文件阅读器,支持多种解析方法。"""
import os
from typing import List, Optional, Tuple
from scripts.readers.base import BaseReader
from scripts.utils import is_valid_pptx
from . import docling
from . import unstructured
from . import markitdown
from . import python_pptx
from . import native_xml
PARSERS = [
("docling", docling.parse),
("unstructured", unstructured.parse),
("MarkItDown", markitdown.parse),
("python-pptx", python_pptx.parse),
("XML 原生解析", native_xml.parse),
]
class PptxReader(BaseReader):
"""PPTX 文件阅读器"""
@property
def supported_extensions(self) -> List[str]:
return [".pptx"]
def supports(self, file_path: str) -> bool:
return file_path.endswith('.pptx')
def parse(self, file_path: str) -> Tuple[Optional[str], List[str]]:
failures = []
# 检查文件是否存在
if not os.path.exists(file_path):
return None, ["文件不存在"]
# 验证文件格式
if not is_valid_pptx(file_path):
return None, ["不是有效的 PPTX 文件"]
content = None
for parser_name, parser_func in PARSERS:
content, error = parser_func(file_path)
if content is not None:
return content, failures
else:
failures.append(f"- {parser_name}: {error}")
return None, failures

View File

@@ -0,0 +1,10 @@
"""使用 docling 库解析 PPTX 文件"""
from typing import Optional, Tuple
from scripts.core import parse_with_docling
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 docling 库解析 PPTX 文件"""
return parse_with_docling(file_path)

View File

@@ -0,0 +1,10 @@
"""使用 MarkItDown 库解析 PPTX 文件"""
from typing import Optional, Tuple
from scripts.core import parse_with_markitdown
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 MarkItDown 库解析 PPTX 文件"""
return parse_with_markitdown(file_path)

View File

@@ -0,0 +1,170 @@
"""使用 XML 原生解析 PPTX 文件"""
import re
import xml.etree.ElementTree as ET
import zipfile
from typing import Any, List, Optional, Tuple
from scripts.core import build_markdown_table, flush_list_stack
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 XML 原生解析 PPTX 文件"""
pptx_namespace = {
"a": "http://schemas.openxmlformats.org/drawingml/2006/main",
"p": "http://schemas.openxmlformats.org/presentationml/2006/main",
"r": "http://schemas.openxmlformats.org/officeDocument/2006/relationships",
}
def extract_text_with_formatting(text_elem: Any, namespaces: dict) -> str:
result = []
runs = text_elem.findall(".//a:r", namespaces=namespaces)
for run in runs:
t_elem = run.find(".//a:t", namespaces=namespaces)
if t_elem is None or not t_elem.text:
continue
text = t_elem.text
rPr = run.find(".//a:rPr", namespaces=namespaces)
is_bold = False
is_italic = False
if rPr is not None:
is_bold = rPr.find(".//a:b", namespaces=namespaces) is not None
is_italic = rPr.find(".//a:i", namespaces=namespaces) is not None
if is_bold and is_italic:
text = f"***{text}***"
elif is_bold:
text = f"**{text}**"
elif is_italic:
text = f"*{text}*"
result.append(text)
return "".join(result).strip() if result else ""
def convert_table_to_md(table_elem: Any, namespaces: dict) -> str:
rows = table_elem.findall(".//a:tr", namespaces=namespaces)
if not rows:
return ""
rows_data = []
for row in rows:
cells = row.findall(".//a:tc", namespaces=namespaces)
row_data = []
for cell in cells:
cell_text = extract_text_with_formatting(cell, namespaces)
if cell_text:
cell_text = cell_text.replace("\n", " ").replace("\r", "")
row_data.append(cell_text if cell_text else "")
rows_data.append(row_data)
return build_markdown_table(rows_data)
def is_list_item(p_elem: Any, namespaces: dict) -> Tuple[bool, bool]:
if p_elem is None:
return False, False
pPr = p_elem.find(".//a:pPr", namespaces=namespaces)
if pPr is None:
return False, False
buChar = pPr.find(".//a:buChar", namespaces=namespaces)
if buChar is not None:
return True, False
buAutoNum = pPr.find(".//a:buAutoNum", namespaces=namespaces)
if buAutoNum is not None:
return True, True
return False, False
def get_indent_level(p_elem: Any, namespaces: dict) -> int:
if p_elem is None:
return 0
pPr = p_elem.find(".//a:pPr", namespaces=namespaces)
if pPr is None:
return 0
lvl = pPr.get("lvl")
return int(lvl) if lvl else 0
try:
md_content = []
with zipfile.ZipFile(file_path) as zip_file:
slide_files = [
f
for f in zip_file.namelist()
if re.match(r"ppt/slides/slide\d+\.xml$", f)
]
slide_files.sort(
key=lambda f: int(re.search(r"slide(\d+)\.xml$", f).group(1))
)
for slide_idx, slide_file in enumerate(slide_files, 1):
md_content.append("\n## Slide {}\n".format(slide_idx))
with zip_file.open(slide_file) as slide_xml:
slide_root = ET.parse(slide_xml).getroot()
tx_bodies = slide_root.findall(
".//p:sp/p:txBody", namespaces=pptx_namespace
)
tables = slide_root.findall(".//a:tbl", namespaces=pptx_namespace)
for table in tables:
table_md = convert_table_to_md(table, pptx_namespace)
if table_md:
md_content.append(table_md)
for tx_body in tx_bodies:
paragraphs = tx_body.findall(
".//a:p", namespaces=pptx_namespace
)
list_stack = []
for para in paragraphs:
is_list, is_ordered = is_list_item(para, pptx_namespace)
if is_list:
level = get_indent_level(para, pptx_namespace)
while len(list_stack) <= level:
list_stack.append("")
text = extract_text_with_formatting(
para, pptx_namespace
)
if text:
marker = "1. " if is_ordered else "- "
indent = " " * level
list_stack[level] = f"{indent}{marker}{text}"
for i in range(len(list_stack)):
if list_stack[i]:
md_content.append(list_stack[i] + "\n")
list_stack[i] = ""
else:
if list_stack:
flush_list_stack(list_stack, md_content)
text = extract_text_with_formatting(
para, pptx_namespace
)
if text:
md_content.append(f"{text}\n")
if list_stack:
flush_list_stack(list_stack, md_content)
md_content.append("---\n")
content = "\n".join(md_content)
if not content.strip():
return None, "文档为空"
return content, None
except Exception as e:
return None, f"XML 解析失败: {str(e)}"

View File

@@ -0,0 +1,127 @@
"""使用 python-pptx 库解析 PPTX 文件"""
from typing import Any, List, Optional, Tuple
from scripts.core import build_markdown_table, flush_list_stack
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 python-pptx 库解析 PPTX 文件"""
try:
from pptx import Presentation
from pptx.enum.shapes import MSO_SHAPE_TYPE
except ImportError:
return None, "python-pptx 库未安装"
_A_NS = {"a": "http://schemas.openxmlformats.org/drawingml/2006/main"}
def extract_formatted_text(runs: List[Any]) -> str:
"""从 PPTX 文本运行中提取带有格式的文本"""
result = []
for run in runs:
if not run.text:
continue
text = run.text
font = run.font
is_bold = getattr(font, "bold", False) or False
is_italic = getattr(font, "italic", False) or False
if is_bold and is_italic:
text = f"***{text}***"
elif is_bold:
text = f"**{text}**"
elif is_italic:
text = f"*{text}*"
result.append(text)
return "".join(result).strip()
def convert_table_to_md(table: Any) -> str:
"""将 PPTX 表格转换为 Markdown 格式"""
rows_data = []
for row in table.rows:
row_data = []
for cell in row.cells:
cell_content = []
for para in cell.text_frame.paragraphs:
text = extract_formatted_text(para.runs)
if text:
cell_content.append(text)
cell_text = " ".join(cell_content).strip()
row_data.append(cell_text if cell_text else "")
rows_data.append(row_data)
return build_markdown_table(rows_data)
try:
prs = Presentation(file_path)
md_content = []
for slide_num, slide in enumerate(prs.slides, 1):
md_content.append(f"\n## Slide {slide_num}\n")
list_stack = []
for shape in slide.shapes:
if shape.shape_type == MSO_SHAPE_TYPE.PICTURE:
continue
if hasattr(shape, "has_table") and shape.has_table:
if list_stack:
flush_list_stack(list_stack, md_content)
table_md = convert_table_to_md(shape.table)
md_content.append(table_md)
if hasattr(shape, "text_frame"):
for para in shape.text_frame.paragraphs:
pPr = para._element.pPr
is_list = False
if pPr is not None:
is_list = (
para.level > 0
or pPr.find(".//a:buChar", namespaces=_A_NS) is not None
or pPr.find(".//a:buAutoNum", namespaces=_A_NS) is not None
)
if is_list:
level = para.level
while len(list_stack) <= level:
list_stack.append("")
text = extract_formatted_text(para.runs)
if text:
is_ordered = (
pPr is not None
and pPr.find(".//a:buAutoNum", namespaces=_A_NS) is not None
)
marker = "1. " if is_ordered else "- "
indent = " " * level
list_stack[level] = f"{indent}{marker}{text}"
for i in range(len(list_stack)):
if list_stack[i]:
md_content.append(list_stack[i] + "\n")
list_stack[i] = ""
else:
if list_stack:
flush_list_stack(list_stack, md_content)
text = extract_formatted_text(para.runs)
if text:
md_content.append(f"{text}\n")
if list_stack:
flush_list_stack(list_stack, md_content)
md_content.append("---\n")
content = "\n".join(md_content)
if not content.strip():
return None, "文档为空"
return content, None
except Exception as e:
return None, f"python-pptx 解析失败: {str(e)}"

View File

@@ -0,0 +1,24 @@
"""使用 unstructured 库解析 PPTX 文件"""
from typing import Optional, Tuple
from scripts.core import _unstructured_elements_to_markdown
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 unstructured 库解析 PPTX 文件"""
try:
from unstructured.partition.pptx import partition_pptx
except ImportError:
return None, "unstructured 库未安装"
try:
elements = partition_pptx(
filename=file_path, infer_table_structure=True, include_metadata=True
)
content = _unstructured_elements_to_markdown(elements)
if not content.strip():
return None, "文档为空"
return content, None
except Exception as e:
return None, f"unstructured 解析失败: {str(e)}"

View File

@@ -0,0 +1,55 @@
"""XLSX 文件阅读器,支持多种解析方法。"""
import os
from typing import List, Optional, Tuple
from scripts.readers.base import BaseReader
from scripts.utils import is_valid_xlsx
from . import docling
from . import unstructured
from . import markitdown
from . import pandas
from . import native_xml
PARSERS = [
("docling", docling.parse),
("unstructured", unstructured.parse),
("MarkItDown", markitdown.parse),
("pandas", pandas.parse),
("XML 原生解析", native_xml.parse),
]
class XlsxReader(BaseReader):
"""XLSX 文件阅读器"""
@property
def supported_extensions(self) -> List[str]:
return [".xlsx"]
def supports(self, file_path: str) -> bool:
return file_path.endswith('.xlsx')
def parse(self, file_path: str) -> Tuple[Optional[str], List[str]]:
failures = []
# 检查文件是否存在
if not os.path.exists(file_path):
return None, ["文件不存在"]
# 验证文件格式
if not is_valid_xlsx(file_path):
return None, ["不是有效的 XLSX 文件"]
content = None
for parser_name, parser_func in PARSERS:
content, error = parser_func(file_path)
if content is not None:
return content, failures
else:
failures.append(f"- {parser_name}: {error}")
return None, failures

View File

@@ -0,0 +1,10 @@
"""使用 docling 库解析 XLSX 文件"""
from typing import Optional, Tuple
from scripts.core import parse_with_docling
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 docling 库解析 XLSX 文件"""
return parse_with_docling(file_path)

View File

@@ -0,0 +1,10 @@
"""使用 MarkItDown 库解析 XLSX 文件"""
from typing import Optional, Tuple
from scripts.core import parse_with_markitdown
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 MarkItDown 库解析 XLSX 文件"""
return parse_with_markitdown(file_path)

View File

@@ -0,0 +1,225 @@
"""使用 XML 原生解析 XLSX 文件"""
import xml.etree.ElementTree as ET
import zipfile
from typing import List, Optional, Tuple
from scripts.core import build_markdown_table, safe_open_zip
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 XML 原生解析 XLSX 文件"""
xlsx_namespace = {
"main": "http://schemas.openxmlformats.org/spreadsheetml/2006/main"
}
def parse_col_index(cell_ref: str) -> int:
col_index = 0
for char in cell_ref:
if char.isalpha():
col_index = col_index * 26 + (ord(char) - ord("A") + 1)
else:
break
return col_index - 1
def parse_cell_value(cell: ET.Element, shared_strings: List[str]) -> str:
cell_type = cell.attrib.get("t")
if cell_type == "inlineStr":
is_elem = cell.find("main:is", xlsx_namespace)
if is_elem is not None:
t_elem = is_elem.find("main:t", xlsx_namespace)
if t_elem is not None and t_elem.text:
return t_elem.text.replace("\n", " ").replace("\r", "")
return ""
cell_value_elem = cell.find("main:v", xlsx_namespace)
if cell_value_elem is None or not cell_value_elem.text:
return ""
cell_value = cell_value_elem.text
if cell_type == "s":
try:
idx = int(cell_value)
if 0 <= idx < len(shared_strings):
text = shared_strings[idx]
return text.replace("\n", " ").replace("\r", "")
except (ValueError, IndexError):
pass
return ""
elif cell_type == "b":
return "TRUE" if cell_value == "1" else "FALSE"
elif cell_type == "str":
return cell_value.replace("\n", " ").replace("\r", "")
elif cell_type == "e":
_ERROR_CODES = {
"#NULL!": "空引用错误",
"#DIV/0!": "除零错误",
"#VALUE!": "值类型错误",
"#REF!": "无效引用",
"#NAME?": "名称错误",
"#NUM!": "数值错误",
"#N/A": "值不可用",
}
return _ERROR_CODES.get(cell_value, f"错误: {cell_value}")
elif cell_type == "d":
return f"[日期] {cell_value}"
elif cell_type == "n":
return cell_value
elif cell_type is None:
try:
float_val = float(cell_value)
if float_val.is_integer():
return str(int(float_val))
return cell_value
except ValueError:
return cell_value
else:
return cell_value
def get_non_empty_columns(data: List[List[str]]) -> set:
non_empty_cols = set()
for row in data:
for col_idx, cell in enumerate(row):
if cell and cell.strip():
non_empty_cols.add(col_idx)
return non_empty_cols
def filter_columns(row: List[str], non_empty_cols: set) -> List[str]:
return [row[i] if i < len(row) else "" for i in sorted(non_empty_cols)]
def data_to_markdown(data: List[List[str]], sheet_name: str) -> str:
if not data or not data[0]:
return f"## {sheet_name}\n\n*工作表为空*"
md_lines = []
md_lines.append(f"## {sheet_name}")
md_lines.append("")
headers = data[0]
non_empty_cols = get_non_empty_columns(data)
if not non_empty_cols:
return f"## {sheet_name}\n\n*工作表为空*"
filtered_headers = filter_columns(headers, non_empty_cols)
header_line = "| " + " | ".join(filtered_headers) + " |"
md_lines.append(header_line)
separator_line = "| " + " | ".join(["---"] * len(filtered_headers)) + " |"
md_lines.append(separator_line)
for row in data[1:]:
filtered_row = filter_columns(row, non_empty_cols)
row_line = "| " + " | ".join(filtered_row) + " |"
md_lines.append(row_line)
md_lines.append("")
return "\n".join(md_lines)
try:
with zipfile.ZipFile(file_path, "r") as zip_file:
sheet_names = []
sheet_rids = []
try:
with zip_file.open("xl/workbook.xml") as f:
root = ET.parse(f).getroot()
rel_ns = "http://schemas.openxmlformats.org/officeDocument/2006/relationships"
sheet_elements = root.findall(".//main:sheet", xlsx_namespace)
for sheet in sheet_elements:
sheet_name = sheet.attrib.get("name", "")
rid = sheet.attrib.get(f"{{{rel_ns}}}id", "")
if sheet_name:
sheet_names.append(sheet_name)
sheet_rids.append(rid)
except KeyError:
return None, "无法解析工作表名称"
if not sheet_names:
return None, "未找到工作表"
rid_to_target = {}
try:
rels_ns = "http://schemas.openxmlformats.org/package/2006/relationships"
with zip_file.open("xl/_rels/workbook.xml.rels") as f:
rels_root = ET.parse(f).getroot()
for rel in rels_root.findall(f"{{{rels_ns}}}Relationship"):
rid = rel.attrib.get("Id", "")
target = rel.attrib.get("Target", "")
if rid and target:
rid_to_target[rid] = target
except KeyError:
pass
shared_strings = []
try:
with zip_file.open("xl/sharedStrings.xml") as f:
root = ET.parse(f).getroot()
for si in root.findall(".//main:si", xlsx_namespace):
t_elem = si.find(".//main:t", xlsx_namespace)
if t_elem is not None and t_elem.text:
shared_strings.append(t_elem.text)
else:
shared_strings.append("")
except KeyError:
pass
markdown_content = "# Excel数据转换结果 (原生XML解析)\n\n"
for sheet_index, sheet_name in enumerate(sheet_names):
rid = sheet_rids[sheet_index] if sheet_index < len(sheet_rids) else ""
target = rid_to_target.get(rid, "")
if target:
if target.startswith("/"):
worksheet_path = target.lstrip("/")
else:
worksheet_path = f"xl/{target}"
else:
worksheet_path = f"xl/worksheets/sheet{sheet_index + 1}.xml"
try:
with zip_file.open(worksheet_path) as f:
root = ET.parse(f).getroot()
sheet_data = root.find("main:sheetData", xlsx_namespace)
rows = []
if sheet_data is not None:
row_elements = sheet_data.findall(
"main:row", xlsx_namespace
)
for row_elem in row_elements:
cells = row_elem.findall("main:c", xlsx_namespace)
col_dict = {}
for cell in cells:
cell_ref = cell.attrib.get("r", "")
if not cell_ref:
continue
col_index = parse_col_index(cell_ref)
cell_value = parse_cell_value(cell, shared_strings)
col_dict[col_index] = cell_value
if col_dict:
max_col = max(col_dict.keys())
row_data = [
col_dict.get(i, "") for i in range(max_col + 1)
]
rows.append(row_data)
table_md = data_to_markdown(rows, sheet_name)
markdown_content += table_md + "\n\n"
except KeyError:
markdown_content += f"## {sheet_name}\n\n*工作表解析失败*\n\n"
if not markdown_content.strip():
return None, "解析结果为空"
return markdown_content, None
except Exception as e:
return None, f"XML 解析失败: {str(e)}"

View File

@@ -0,0 +1,36 @@
"""使用 pandas 库解析 XLSX 文件"""
from typing import Optional, Tuple
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 pandas 库解析 XLSX 文件"""
try:
import pandas as pd
from tabulate import tabulate
except ImportError as e:
missing_lib = "pandas" if "pandas" in str(e) else "tabulate"
return None, f"{missing_lib} 库未安装"
try:
sheets = pd.read_excel(file_path, sheet_name=None)
markdown_parts = []
for sheet_name, df in sheets.items():
if len(df) == 0:
markdown_parts.append(f"## {sheet_name}\n\n*工作表为空*")
continue
table_md = tabulate(
df, headers="keys", tablefmt="pipe", showindex=True, missingval=""
)
markdown_parts.append(f"## {sheet_name}\n\n{table_md}")
if not markdown_parts:
return None, "Excel 文件为空"
markdown_content = "# Excel数据转换结果\n\n" + "\n\n".join(markdown_parts)
return markdown_content, None
except Exception as e:
return None, f"pandas 解析失败: {str(e)}"

View File

@@ -0,0 +1,22 @@
"""使用 unstructured 库解析 XLSX 文件"""
from typing import Optional, Tuple
from scripts.core import _unstructured_elements_to_markdown
def parse(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""使用 unstructured 库解析 XLSX 文件"""
try:
from unstructured.partition.xlsx import partition_xlsx
except ImportError:
return None, "unstructured 库未安装"
try:
elements = partition_xlsx(filename=file_path, infer_table_structure=True)
content = _unstructured_elements_to_markdown(elements)
if not content.strip():
return None, "文档为空"
return content, None
except Exception as e:
return None, f"unstructured 解析失败: {str(e)}"

21
scripts/utils/__init__.py Normal file
View File

@@ -0,0 +1,21 @@
"""Utils module for lyxy-document."""
from .file_detection import (
is_valid_docx,
is_valid_pptx,
is_valid_xlsx,
is_valid_pdf,
is_html_file,
is_url,
detect_file_type,
)
__all__ = [
"is_valid_docx",
"is_valid_pptx",
"is_valid_xlsx",
"is_valid_pdf",
"is_html_file",
"is_url",
"detect_file_type",
]

View File

@@ -0,0 +1,62 @@
"""文件编码自动检测模块。"""
from typing import Optional, Tuple
from scripts.config import Config
def detect_encoding(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""
检测文件编码。
Args:
file_path: 文件路径
Returns:
(encoding, error): 成功时返回 (编码名称, None),失败时返回 (None, 错误信息)
"""
try:
import chardet
except ImportError:
return None, "chardet 库未安装"
try:
with open(file_path, 'rb') as f:
raw_data = f.read()
result = chardet.detect(raw_data)
return result['encoding'], None
except Exception as e:
return None, f"编码检测失败: {str(e)}"
def read_text_file(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""
读取文本文件,自动检测编码。
首先使用 chardet 检测编码,如果失败则尝试配置的回退编码列表。
Args:
file_path: 文件路径
Returns:
(content, error): 成功时返回 (文件内容, None),失败时返回 (None, 错误信息)
"""
# 尝试使用 chardet 检测编码
encoding, error = detect_encoding(file_path)
if error:
# chardet 失败,使用回退编码列表
for enc in Config.FALLBACK_ENCODINGS:
try:
with open(file_path, 'r', encoding=enc) as f:
return f.read(), None
except UnicodeDecodeError:
continue
return None, "无法识别文件编码"
# 使用检测到的编码读取文件
try:
with open(file_path, 'r', encoding=encoding) as f:
return f.read(), None
except Exception as e:
return None, f"读取文件失败: {str(e)}"

View File

@@ -0,0 +1,73 @@
"""文件类型检测模块,用于验证和检测输入文件类型。"""
import os
import zipfile
from typing import List, Optional
def _is_valid_ooxml(file_path: str, required_files: List[str]) -> bool:
"""验证 OOXML 格式文件DOCX/PPTX/XLSX"""
try:
with zipfile.ZipFile(file_path, "r") as zip_file:
names = set(zip_file.namelist())
return all(r in names for r in required_files)
except (zipfile.BadZipFile, zipfile.LargeZipFile):
return False
_DOCX_REQUIRED = ["[Content_Types].xml", "_rels/.rels", "word/document.xml"]
_PPTX_REQUIRED = ["[Content_Types].xml", "_rels/.rels", "ppt/presentation.xml"]
_XLSX_REQUIRED = ["[Content_Types].xml", "_rels/.rels", "xl/workbook.xml"]
def is_valid_docx(file_path: str) -> bool:
"""验证文件是否为有效的 DOCX 格式"""
return _is_valid_ooxml(file_path, _DOCX_REQUIRED)
def is_valid_pptx(file_path: str) -> bool:
"""验证文件是否为有效的 PPTX 格式"""
return _is_valid_ooxml(file_path, _PPTX_REQUIRED)
def is_valid_xlsx(file_path: str) -> bool:
"""验证文件是否为有效的 XLSX 格式"""
return _is_valid_ooxml(file_path, _XLSX_REQUIRED)
def is_valid_pdf(file_path: str) -> bool:
"""验证文件是否为有效的 PDF 格式"""
try:
with open(file_path, "rb") as f:
header = f.read(4)
return header == b"%PDF"
except (IOError, OSError):
return False
def is_html_file(file_path: str) -> bool:
"""判断文件是否为 HTML 文件(仅检查扩展名)"""
ext = file_path.lower()
return ext.endswith(".html") or ext.endswith(".htm")
def is_url(input_str: str) -> bool:
"""判断输入是否为 URL"""
return input_str.startswith("http://") or input_str.startswith("https://")
_FILE_TYPE_VALIDATORS = {
".docx": is_valid_docx,
".pptx": is_valid_pptx,
".xlsx": is_valid_xlsx,
".pdf": is_valid_pdf,
}
def detect_file_type(file_path: str) -> Optional[str]:
"""检测文件类型,返回 'docx''pptx''xlsx''pdf'"""
ext = os.path.splitext(file_path)[1].lower()
validator = _FILE_TYPE_VALIDATORS.get(ext)
if validator and validator(file_path):
return ext.lstrip(".")
return None