Files
lyxy-document/scripts/readers/html/downloader.py
lanyuanxiaoyao 15b63800a8 refactor: 将核心代码迁移到 scripts 目录
- 创建 scripts/ 目录作为核心代码根目录
- 移动 core/, readers/, utils/ 到 scripts/ 下
- 移动 config.py, lyxy_document_reader.py 到 scripts/
- 移动 encoding_detection.py 到 scripts/utils/
- 更新 pyproject.toml 中的入口点路径和 pytest 配置
- 更新所有内部导入语句为 scripts.* 模块
- 更新 README.md 目录结构说明
- 更新 openspec/config.yaml 添加目录结构说明
- 删除无用的 main.py

此变更使项目结构更清晰,便于区分核心代码与测试、文档等支撑文件。
2026-03-08 17:41:03 +08:00

263 lines
8.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""URL 下载模块,按 pyppeteer → selenium → httpx → urllib 优先级尝试下载。"""
import os
import asyncio
import tempfile
import urllib.request
import urllib.error
from typing import Optional, Tuple
# Shared configuration used by every downloader in this module.
USER_AGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/122.0.0.0 Safari/537.36"
)
WINDOW_SIZE = "1920,1080"
LANGUAGE_SETTING = "zh-CN,zh"

# Chrome launch arguments (shared by pyppeteer and selenium).
CHROME_ARGS = [
    # Sandbox / rendering switches.
    "--no-sandbox",
    "--disable-dev-shm-usage",
    "--disable-gpu",
    "--disable-software-rasterizer",
    # Switch off browser features a one-shot page fetch does not need.
    "--disable-extensions",
    "--disable-background-networking",
    "--disable-default-apps",
    "--disable-sync",
    "--disable-translate",
    "--hide-scrollbars",
    "--metrics-recording-only",
    "--mute-audio",
    "--no-first-run",
    "--safebrowsing-disable-auto-update",
    "--blink-settings=imagesEnabled=false",
    "--disable-plugins",
    "--disable-ipc-flooding-protection",
    "--disable-renderer-backgrounding",
    "--disable-background-timer-throttling",
    "--disable-hang-monitor",
    "--disable-prompt-on-repost",
    "--disable-client-side-phishing-detection",
    "--disable-component-update",
    "--disable-domain-reliability",
    # NOTE(review): the same switch is passed four times below. Chromium may
    # honour only the last occurrence of a repeated switch -- confirm, and if
    # so merge these into one comma-separated --disable-features value.
    "--disable-features=site-per-process",
    "--disable-features=IsolateOrigins",
    "--disable-features=VizDisplayCompositor",
    "--disable-features=WebRTC",
    # Flags derived from the shared settings above.
    f"--window-size={WINDOW_SIZE}",
    f"--lang={LANGUAGE_SETTING}",
    f"--user-agent={USER_AGENT}",
]
# JavaScript that hides automation fingerprints (shared by pyppeteer and selenium).
# It redefines navigator.webdriver (undefined), navigator.plugins (a five-element
# placeholder array) and navigator.languages (['zh-CN', 'zh']).
# The string is an arrow-function *expression*: it only takes effect when the
# API that injects it also invokes the function.
HIDE_AUTOMATION_SCRIPT = """
() => {
Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5] });
Object.defineProperty(navigator, 'languages', { get: () => ['zh-CN', 'zh'] });
}
"""
# Extended variant of the hiding script for pyppeteer. In addition to the three
# navigator overrides it replaces navigator.permissions.query so that a
# 'notifications' query resolves with Notification.permission; every other
# query is forwarded to the original function.
# NOTE(review): originalQuery is called without being bound to
# navigator.permissions, which may raise "Illegal invocation" for
# non-notification queries -- confirm in a real browser.
HIDE_AUTOMATION_SCRIPT_PUPPETEER = """
() => {
Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5] });
Object.defineProperty(navigator, 'languages', { get: () => ['zh-CN', 'zh'] });
const originalQuery = window.navigator.permissions.query;
window.navigator.permissions.query = (parameters) => (
parameters.name === 'notifications' ?
Promise.resolve({ state: Notification.permission }) :
originalQuery(parameters)
);
}
"""
def download_with_pyppeteer(url: str) -> Tuple[Optional[str], Optional[str]]:
    """Download *url* with pyppeteer (headless Chromium, JavaScript rendering supported).

    The Chromium binary named by the ``LYXY_CHROMIUM_BINARY`` environment
    variable is used when that file exists. When the variable is unset,
    ``PYPPETEER_HOME`` is pointed at a directory under the system temp dir.

    Args:
        url: Address of the page to fetch.

    Returns:
        ``(content, None)`` on success, or ``(None, reason)`` when pyppeteer
        is not installed, the page is empty, or any step raises.
    """
    chromium_path = os.environ.get("LYXY_CHROMIUM_BINARY")
    if not chromium_path:
        # pyppeteer resolves PYPPETEER_HOME when the package is first imported,
        # so the override has to be in place *before* the import below;
        # assigning it afterwards has no effect on where pyppeteer looks.
        os.environ["PYPPETEER_HOME"] = os.path.join(
            tempfile.gettempdir(), "pyppeteer_home"
        )

    try:
        from pyppeteer import launch
    except ImportError:
        return None, "pyppeteer 库未安装"

    # Only hand an explicit binary to pyppeteer when the file really exists;
    # None lets pyppeteer fall back to its own Chromium.
    executable_path = (
        chromium_path if (chromium_path and os.path.exists(chromium_path)) else None
    )

    async def _download():
        browser = None
        try:
            browser = await launch(
                headless=True,
                executablePath=executable_path,
                args=CHROME_ARGS,
            )
            page = await browser.newPage()
            await page.evaluateOnNewDocument(HIDE_AUTOMATION_SCRIPT_PUPPETEER)
            await page.setJavaScriptEnabled(True)
            await page.goto(url, {"waitUntil": "networkidle2", "timeout": 30000})
            return await page.content()
        finally:
            if browser is not None:
                try:
                    await browser.close()
                except Exception:
                    # Best-effort cleanup: a failure to close must not mask
                    # the download result or the original error.
                    pass

    try:
        content = asyncio.run(_download())
        if not content or not content.strip():
            return None, "下载内容为空"
        return content, None
    except Exception as e:
        return None, f"pyppeteer 下载失败: {str(e)}"
def download_with_selenium(url: str) -> Tuple[Optional[str], Optional[str]]:
    """Download *url* with selenium-driven Chrome (JavaScript rendering supported).

    Requires two environment variables that must both point at existing
    files: ``LYXY_CHROMIUM_DRIVER`` (chromedriver) and
    ``LYXY_CHROMIUM_BINARY`` (the browser binary).

    Args:
        url: Address of the page to fetch.

    Returns:
        ``(content, None)`` on success, or ``(None, reason)`` when selenium
        is not installed, an environment variable is missing or invalid, the
        page is empty, or any step raises.
    """
    try:
        from selenium import webdriver
        from selenium.webdriver.chrome.service import Service
        from selenium.webdriver.chrome.options import Options
        from selenium.webdriver.support.ui import WebDriverWait
    except ImportError:
        return None, "selenium 库未安装"

    driver_path = os.environ.get("LYXY_CHROMIUM_DRIVER")
    binary_path = os.environ.get("LYXY_CHROMIUM_BINARY")
    if not driver_path or not os.path.exists(driver_path):
        return None, "LYXY_CHROMIUM_DRIVER 环境变量未设置或文件不存在"
    if not binary_path or not os.path.exists(binary_path):
        return None, "LYXY_CHROMIUM_BINARY 环境变量未设置或文件不存在"

    chrome_options = Options()
    chrome_options.binary_location = binary_path
    chrome_options.add_argument("--headless=new")
    for arg in CHROME_ARGS:
        chrome_options.add_argument(arg)
    # Hide automation fingerprints.
    chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
    chrome_options.add_experimental_option("useAutomationExtension", False)

    driver = None
    try:
        import time

        service = Service(driver_path)
        driver = webdriver.Chrome(service=service, options=chrome_options)
        # HIDE_AUTOMATION_SCRIPT is an arrow-function expression. The CDP
        # command evaluates the source verbatim, so the function has to be
        # invoked explicitly -- otherwise it is created and discarded and the
        # navigator overrides never run.
        driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
            "source": f"({HIDE_AUTOMATION_SCRIPT})()"
        })
        driver.get(url)
        # Wait for the document to finish loading.
        WebDriverWait(driver, 30).until(
            lambda d: d.execute_script("return document.readyState") == "complete"
        )
        # Then poll until the page source length is unchanged on two
        # consecutive polls (at most 30 polls, 0.5 s apart).
        last_len = 0
        stable_count = 0
        for _ in range(30):
            current_len = len(driver.page_source)
            if current_len == last_len:
                stable_count += 1
                if stable_count >= 2:
                    break
            else:
                stable_count = 0
            last_len = current_len
            time.sleep(0.5)
        content = driver.page_source
        if not content or not content.strip():
            return None, "下载内容为空"
        return content, None
    except Exception as e:
        return None, f"selenium 下载失败: {str(e)}"
    finally:
        if driver is not None:
            try:
                driver.quit()
            except Exception:
                # Best-effort cleanup: a failure to quit must not mask the
                # download result or the original error.
                pass
def download_with_httpx(url: str) -> Tuple[Optional[str], Optional[str]]:
    """Download *url* with httpx (lightweight HTTP client, no JavaScript).

    Args:
        url: Address of the page to fetch.

    Returns:
        ``(content, None)`` for a non-empty HTTP 200 response, otherwise
        ``(None, reason)`` -- httpx not installed, a non-200 final status,
        an empty body, or any exception raised by the request.
    """
    try:
        import httpx
    except ImportError:
        return None, "httpx 库未安装"

    headers = {
        "User-Agent": USER_AGENT
    }
    try:
        # httpx does not follow redirects unless asked to; without this an
        # ordinary 301/302 (e.g. http -> https) is reported as a failure.
        with httpx.Client(timeout=30.0, follow_redirects=True) as client:
            response = client.get(url, headers=headers)
            if response.status_code == 200:
                content = response.text
                if not content or not content.strip():
                    return None, "下载内容为空"
                return content, None
            return None, f"HTTP {response.status_code}"
    except Exception as e:
        return None, f"httpx 下载失败: {str(e)}"
def _decode_html_bytes(raw: bytes, header_charset: Optional[str]) -> str:
    """Decode an HTML payload, trying UTF-8 first and declared charsets after.

    Args:
        raw: Undecoded response body.
        header_charset: Charset taken from the Content-Type header, if any.

    Returns:
        The decoded text.

    Raises:
        UnicodeDecodeError: If no candidate encoding can decode *raw*.
    """
    import re

    # UTF-8 goes first so every payload that decoded before still decodes to
    # exactly the same text; declared charsets are only consulted afterwards.
    candidates = ["utf-8"]
    if header_charset:
        candidates.append(header_charset)
    meta = re.search(
        rb"<meta[^>]+charset\s*=\s*[\"']?\s*([\w-]+)", raw[:4096], re.IGNORECASE
    )
    if meta:
        candidates.append(meta.group(1).decode("ascii"))

    for encoding in candidates:
        # Pages labelled gb2312/gbk often contain characters outside those
        # sets; gb18030 is a superset of both and decodes them as well.
        if encoding.lower() in ("gb2312", "gbk"):
            encoding = "gb18030"
        try:
            return raw.decode(encoding)
        except (LookupError, UnicodeDecodeError):
            continue
    # Nothing worked: re-raise the UTF-8 error so the caller reports it.
    return raw.decode("utf-8")


def download_with_urllib(url: str) -> Tuple[Optional[str], Optional[str]]:
    """Download *url* with urllib (standard-library fallback, no JavaScript).

    The body is decoded as UTF-8 when possible; otherwise the charset from
    the Content-Type header or from a ``<meta>`` tag is tried, so non-UTF-8
    pages (e.g. GBK) no longer fail outright.

    Args:
        url: Address of the page to fetch.

    Returns:
        ``(content, None)`` for a non-empty HTTP 200 response, otherwise
        ``(None, reason)`` -- a non-200 status, an empty body, or any
        exception raised while requesting or decoding.
    """
    headers = {
        "User-Agent": USER_AGENT
    }
    try:
        req = urllib.request.Request(url, headers=headers)
        with urllib.request.urlopen(req, timeout=30) as response:
            if response.status != 200:
                return None, f"HTTP {response.status}"
            raw = response.read()
            header_charset = response.headers.get_content_charset()
        content = _decode_html_bytes(raw, header_charset)
        if not content or not content.strip():
            return None, "下载内容为空"
        return content, None
    except Exception as e:
        return None, f"urllib 下载失败: {str(e)}"
def download_html(url: str) -> Tuple[Optional[str], list]:
    """Fetch the HTML at *url*, trying each downloader in priority order.

    The order is pyppeteer, selenium, httpx, urllib. The first downloader
    that yields content wins and the remaining ones are not called.

    Args:
        url: Address of the page to fetch.

    Returns:
        ``(content, failures)``. ``content`` is the HTML text, or ``None``
        when every downloader failed. ``failures`` holds one
        ``"- <name>: <reason>"`` entry per downloader that was tried and
        failed before the result was returned.
    """
    attempts = (
        ("pyppeteer", download_with_pyppeteer),
        ("selenium", download_with_selenium),
        ("httpx", download_with_httpx),
        ("urllib", download_with_urllib),
    )
    failures = []
    for label, fetch in attempts:
        html, reason = fetch(url)
        if html is not None:
            return html, failures
        failures.append(f"- {label}: {reason}")
    return None, failures