完成多文档读取的脚本
This commit is contained in:
303
temp/scripts/common.py
Normal file
303
temp/scripts/common.py
Normal file
@@ -0,0 +1,303 @@
|
||||
#!/usr/bin/env python3
|
||||
"""文档解析器的公共模块,包含所有格式共享的工具函数和验证函数。"""
|
||||
|
||||
import os
|
||||
import re
|
||||
import zipfile
|
||||
from typing import List, Optional, Tuple
|
||||
|
||||
IMAGE_PATTERN = re.compile(r"!\[[^\]]*\]\([^)]+\)")
|
||||
MEDIA_LINK_PATTERN = re.compile(
|
||||
r'^\[.*?\]\(.*\.(png|jpg|jpeg|gif|mp4|avi|mov|pdf)\s*["\']?.*?["\']?\)$'
|
||||
)
|
||||
RGB_COLOR_PATTERN = re.compile(r"^R:\d+\s+G:\d+\s+B:\d+$")
|
||||
|
||||
|
||||
def build_markdown_table(rows_data: List[List[str]]) -> str:
|
||||
"""将二维列表转换为 Markdown 表格格式"""
|
||||
if not rows_data or not rows_data[0]:
|
||||
return ""
|
||||
|
||||
md_lines = []
|
||||
for i, row_data in enumerate(rows_data):
|
||||
row_text = [cell if cell else "" for cell in row_data]
|
||||
md_lines.append("| " + " | ".join(row_text) + " |")
|
||||
if i == 0:
|
||||
md_lines.append("|" + " | ".join(["---"] * len(row_text)) + " |")
|
||||
return "\n".join(md_lines) + "\n\n"
|
||||
|
||||
|
||||
def flush_list_stack(list_stack: List[str], target: List[str]) -> None:
|
||||
"""将列表堆栈中的非空项添加到目标列表并清空堆栈"""
|
||||
for item in list_stack:
|
||||
if item:
|
||||
target.append(item + "\n")
|
||||
list_stack.clear()
|
||||
|
||||
|
||||
def safe_open_zip(zip_file: zipfile.ZipFile, name: str) -> Optional[zipfile.ZipExtFile]:
|
||||
"""安全地从 ZipFile 中打开文件,防止路径遍历攻击"""
|
||||
if not name:
|
||||
return None
|
||||
if name.startswith("/") or name.startswith("\\"):
|
||||
return None
|
||||
if name.startswith(".."):
|
||||
return None
|
||||
if "/../" in name or name.endswith("/.."):
|
||||
return None
|
||||
if "\\" in name:
|
||||
return None
|
||||
if "/" not in name:
|
||||
return None
|
||||
return zip_file.open(name)
|
||||
|
||||
|
||||
def normalize_markdown_whitespace(content: str) -> str:
|
||||
"""规范化 Markdown 空白字符,保留单行空行"""
|
||||
lines = content.split("\n")
|
||||
result = []
|
||||
empty_count = 0
|
||||
|
||||
for line in lines:
|
||||
stripped = line.strip()
|
||||
if not stripped:
|
||||
empty_count += 1
|
||||
if empty_count == 1:
|
||||
result.append(line)
|
||||
else:
|
||||
empty_count = 0
|
||||
result.append(line)
|
||||
|
||||
return "\n".join(result)
|
||||
|
||||
|
||||
def is_valid_docx(file_path: str) -> bool:
|
||||
"""验证文件是否为有效的 DOCX 格式"""
|
||||
try:
|
||||
with zipfile.ZipFile(file_path, "r") as zip_file:
|
||||
required_files = ["[Content_Types].xml", "_rels/.rels", "word/document.xml"]
|
||||
for required in required_files:
|
||||
if required not in zip_file.namelist():
|
||||
return False
|
||||
return True
|
||||
except (zipfile.BadZipFile, zipfile.LargeZipFile):
|
||||
return False
|
||||
|
||||
|
||||
def is_valid_pptx(file_path: str) -> bool:
|
||||
"""验证文件是否为有效的 PPTX 格式"""
|
||||
try:
|
||||
with zipfile.ZipFile(file_path, "r") as zip_file:
|
||||
required_files = [
|
||||
"[Content_Types].xml",
|
||||
"_rels/.rels",
|
||||
"ppt/presentation.xml",
|
||||
]
|
||||
for required in required_files:
|
||||
if required not in zip_file.namelist():
|
||||
return False
|
||||
return True
|
||||
except (zipfile.BadZipFile, zipfile.LargeZipFile):
|
||||
return False
|
||||
|
||||
|
||||
def is_valid_xlsx(file_path: str) -> bool:
|
||||
"""验证文件是否为有效的 XLSX 格式"""
|
||||
try:
|
||||
with zipfile.ZipFile(file_path, "r") as zip_file:
|
||||
required_files = ["[Content_Types].xml", "_rels/.rels", "xl/workbook.xml"]
|
||||
for required in required_files:
|
||||
if required not in zip_file.namelist():
|
||||
return False
|
||||
return True
|
||||
except (zipfile.BadZipFile, zipfile.LargeZipFile):
|
||||
return False
|
||||
|
||||
|
||||
def remove_markdown_images(markdown_text: str) -> str:
|
||||
"""移除 Markdown 文本中的图片标记"""
|
||||
return IMAGE_PATTERN.sub("", markdown_text)
|
||||
|
||||
|
||||
def filter_markdown_content(content: str) -> str:
|
||||
"""过滤 markdown 内容,保留文本、表格、列表和基本格式"""
|
||||
lines = content.split("\n")
|
||||
filtered_lines = []
|
||||
|
||||
for line in lines:
|
||||
stripped = line.strip()
|
||||
|
||||
if not stripped:
|
||||
continue
|
||||
|
||||
if stripped.startswith("<!--") and stripped.endswith("-->"):
|
||||
continue
|
||||
|
||||
if stripped.startswith("![") or stripped.startswith("![]"):
|
||||
continue
|
||||
|
||||
if "<img" in stripped or "</img>" in stripped:
|
||||
continue
|
||||
|
||||
if MEDIA_LINK_PATTERN.match(stripped):
|
||||
continue
|
||||
|
||||
if RGB_COLOR_PATTERN.match(stripped):
|
||||
continue
|
||||
|
||||
line = re.sub(r'<span[^>]*style="[^"]*"[^>]*>(.*?)</span>', r"\1", line)
|
||||
line = re.sub(r"<span[^>]*>(.*?)</span>", r"\1", line)
|
||||
|
||||
line = re.sub(r"\s+", " ", line).strip()
|
||||
|
||||
if line:
|
||||
filtered_lines.append(line)
|
||||
|
||||
return "\n".join(filtered_lines)
|
||||
|
||||
|
||||
def get_heading_level(line: str) -> int:
|
||||
"""获取 Markdown 行的标题级别(1-6),非标题返回 0"""
|
||||
stripped = line.lstrip()
|
||||
if not stripped.startswith("#"):
|
||||
return 0
|
||||
level = 0
|
||||
for char in stripped:
|
||||
if char == "#":
|
||||
level += 1
|
||||
else:
|
||||
break
|
||||
return level if 1 <= level <= 6 else 0
|
||||
|
||||
|
||||
def extract_titles(markdown_text: str) -> List[str]:
|
||||
"""提取 markdown 文本中的所有标题行(1-6级)"""
|
||||
title_lines = []
|
||||
for line in markdown_text.split("\n"):
|
||||
if get_heading_level(line) > 0:
|
||||
title_lines.append(line.lstrip())
|
||||
return title_lines
|
||||
|
||||
|
||||
def extract_title_content(markdown_text: str, title_name: str) -> Optional[str]:
|
||||
"""提取所有指定标题及其下级内容(每个包含上级标题)"""
|
||||
lines = markdown_text.split("\n")
|
||||
match_indices = []
|
||||
|
||||
for i, line in enumerate(lines):
|
||||
level = get_heading_level(line)
|
||||
if level > 0:
|
||||
stripped = line.lstrip()
|
||||
title_text = stripped[level:].strip()
|
||||
if title_text == title_name:
|
||||
match_indices.append(i)
|
||||
|
||||
if not match_indices:
|
||||
return None
|
||||
|
||||
result_lines = []
|
||||
for idx in match_indices:
|
||||
target_level = get_heading_level(lines[idx])
|
||||
|
||||
parent_titles = []
|
||||
current_level = target_level
|
||||
for i in range(idx - 1, -1, -1):
|
||||
line_level = get_heading_level(lines[i])
|
||||
if line_level > 0 and line_level < current_level:
|
||||
parent_titles.append(lines[i])
|
||||
current_level = line_level
|
||||
if current_level == 1:
|
||||
break
|
||||
|
||||
parent_titles.reverse()
|
||||
result_lines.extend(parent_titles)
|
||||
|
||||
result_lines.append(lines[idx])
|
||||
for i in range(idx + 1, len(lines)):
|
||||
line = lines[i]
|
||||
line_level = get_heading_level(line)
|
||||
if line_level == 0 or line_level > target_level:
|
||||
result_lines.append(line)
|
||||
else:
|
||||
break
|
||||
|
||||
return "\n".join(result_lines)
|
||||
|
||||
|
||||
def search_markdown(
|
||||
content: str, pattern: str, context_lines: int = 0
|
||||
) -> Optional[str]:
|
||||
"""使用正则表达式搜索 markdown 文档,返回匹配结果及其上下文"""
|
||||
try:
|
||||
regex = re.compile(pattern)
|
||||
except re.error:
|
||||
return None
|
||||
|
||||
lines = content.split("\n")
|
||||
|
||||
non_empty_indices = []
|
||||
non_empty_to_original = {}
|
||||
for i, line in enumerate(lines):
|
||||
if line.strip():
|
||||
non_empty_indices.append(i)
|
||||
non_empty_to_original[i] = len(non_empty_indices) - 1
|
||||
|
||||
matched_non_empty_indices = []
|
||||
for orig_idx in non_empty_indices:
|
||||
if regex.search(lines[orig_idx]):
|
||||
matched_non_empty_indices.append(non_empty_to_original[orig_idx])
|
||||
|
||||
if not matched_non_empty_indices:
|
||||
return None
|
||||
|
||||
merged_ranges = []
|
||||
current_start = matched_non_empty_indices[0]
|
||||
current_end = matched_non_empty_indices[0]
|
||||
|
||||
for idx in matched_non_empty_indices[1:]:
|
||||
if idx - current_end <= context_lines * 2:
|
||||
current_end = idx
|
||||
else:
|
||||
merged_ranges.append((current_start, current_end))
|
||||
current_start = idx
|
||||
current_end = idx
|
||||
merged_ranges.append((current_start, current_end))
|
||||
|
||||
results = []
|
||||
for start, end in merged_ranges:
|
||||
context_start_idx = max(0, start - context_lines)
|
||||
context_end_idx = min(len(non_empty_indices) - 1, end + context_lines)
|
||||
|
||||
start_line_idx = non_empty_indices[context_start_idx]
|
||||
end_line_idx = non_empty_indices[context_end_idx]
|
||||
|
||||
selected_indices = set(
|
||||
non_empty_indices[context_start_idx : context_end_idx + 1]
|
||||
)
|
||||
result_lines = [
|
||||
line
|
||||
for i, line in enumerate(lines)
|
||||
if start_line_idx <= i <= end_line_idx
|
||||
and (line.strip() or i in selected_indices)
|
||||
]
|
||||
results.append("\n".join(result_lines))
|
||||
|
||||
return "\n---\n".join(results)
|
||||
|
||||
|
||||
def detect_file_type(file_path: str) -> Optional[str]:
|
||||
"""检测文件类型,返回 'docx'、'pptx' 或 'xlsx'"""
|
||||
_, ext = os.path.splitext(file_path)
|
||||
ext = ext.lower()
|
||||
|
||||
if ext == ".docx":
|
||||
if is_valid_docx(file_path):
|
||||
return "docx"
|
||||
elif ext == ".pptx":
|
||||
if is_valid_pptx(file_path):
|
||||
return "pptx"
|
||||
elif ext == ".xlsx":
|
||||
if is_valid_xlsx(file_path):
|
||||
return "xlsx"
|
||||
|
||||
return None
|
||||
Reference in New Issue
Block a user