From 7f0f831226a7df46a52b4e29ee79744bbbc3bbc2 Mon Sep 17 00:00:00 2001 From: lanyuanxiaoyao Date: Tue, 21 Apr 2026 11:45:21 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=8A=BD=E5=8F=96=20scripts/core.py=20?= =?UTF-8?q?=E5=85=AC=E5=85=B1=E6=A8=A1=E5=9D=97=EF=BC=8C=E9=87=8D=E6=9E=84?= =?UTF-8?q?=E6=A3=80=E6=B5=8B=E8=84=9A=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 将 anthropic_detect.py 和 openai_detect.py 中的公共功能抽取到 core.py 模块,包括: - HTTP 请求(普通/流式)及重试逻辑 - SSL 上下文管理 - 测试用例/结果数据结构 (TestCase, TestResult) - 错误分类 (ErrorType) - 响应验证辅助函数 (validate_response_structure 等) - 测试执行框架 (run_test, run_test_suite) 两个检测脚本重构后更聚焦于各自 API 的测试用例定义。 --- scripts/anthropic_detect.py | 853 +++++++++++++++++++++++------------- scripts/core.py | 471 ++++++++++++++++++++ scripts/openai_detect.py | 671 ++++++++++++++++++---------- 3 files changed, 1446 insertions(+), 549 deletions(-) create mode 100644 scripts/core.py diff --git a/scripts/anthropic_detect.py b/scripts/anthropic_detect.py index 219e4ab..7ab1236 100644 --- a/scripts/anthropic_detect.py +++ b/scripts/anthropic_detect.py @@ -11,83 +11,21 @@ """ import json -import time -import ssl import argparse -import urllib.request -import urllib.error +from typing import Dict, List, Tuple, Any +from core import ( + create_ssl_context, + TestCase, + run_test_suite, + validate_response_structure, + format_validation_errors +) -TIMEOUT = 30 ANTHROPIC_VERSION = "2023-06-01" -def create_ssl_context(): - ctx = ssl.create_default_context() - ctx.check_hostname = False - ctx.verify_mode = ssl.CERT_NONE - return ctx - - -def http_request(url, method="GET", headers=None, body=None, ssl_ctx=None): - req = urllib.request.Request(url, method=method) - if headers: - for k, v in headers.items(): - req.add_header(k, v) - if body is not None: - if isinstance(body, str): - req.data = body.encode("utf-8") - else: - req.data = json.dumps(body).encode("utf-8") - - start = time.time() - try: - resp = urllib.request.urlopen(req, timeout=TIMEOUT, context=ssl_ctx) - elapsed = time.time() - start - return resp.getcode(), resp.read().decode("utf-8"), elapsed - except urllib.error.HTTPError as e: - elapsed = time.time() - start - return e.code, e.read().decode("utf-8"), elapsed - except Exception as e: - elapsed = time.time() - start - return None, str(e), elapsed - - -def http_stream_request(url, headers=None, body=None, ssl_ctx=None): - req = urllib.request.Request(url, method="POST") - if headers: - for k, v in headers.items(): - req.add_header(k, v) - if body is not None: - req.data = json.dumps(body).encode("utf-8") - - start = time.time() - try: - resp = urllib.request.urlopen(req, timeout=TIMEOUT, context=ssl_ctx) - status = resp.getcode() - lines = [] - for raw_line in resp: - line = raw_line.decode("utf-8").rstrip("\n\r") - if line: - lines.append(line) - elapsed = time.time() - start - return status, "\n".join(lines), elapsed - except urllib.error.HTTPError as e: - elapsed = time.time() - start - return e.code, e.read().decode("utf-8"), elapsed - except Exception as e: - elapsed = time.time() - start - return None, str(e), elapsed - - -def format_json(text): - try: - parsed = json.loads(text) - return json.dumps(parsed, ensure_ascii=False, indent=2) - except (json.JSONDecodeError, TypeError): - return text - - -def build_headers(api_key): +def build_headers(api_key: str) -> Dict[str, str]: + """构建 Anthropic API 请求头""" h = { "Content-Type": "application/json", "anthropic-version": ANTHROPIC_VERSION, @@ -97,32 +35,149 @@ def build_headers(api_key): return h -def run_test(index, total, desc, url, method, headers, body, stream, ssl_ctx): - print(f"\n[{index}/{total}] {desc}") - print(f">>> {method} {url}") - if body is not None: - if isinstance(body, str): - print(body) +# ==================== Anthropic 响应验证函数 ==================== + +def validate_anthropic_models_list_response(response_text: str) -> Tuple[bool, List[str]]: + """验证 Anthropic Models List 响应 + + 根据API文档,响应应包含: + - data: array of ModelInfo + - first_id: string (可选) + - has_more: boolean + - last_id: string (可选) + """ + errors = [] + + try: + data = json.loads(response_text) + except json.JSONDecodeError as e: + return False, [f"响应不是有效的JSON: {e}"] + + # 检查必需字段 + required_fields = ["data", "has_more"] + for field in required_fields: + if field not in data: + errors.append(f"缺少必需字段: {field}") + + # 检查 data 数组 + if "data" in data: + if not isinstance(data["data"], list): + errors.append(f"字段 'data' 类型错误: 期望 list, 实际 {type(data['data']).__name__}") else: - print(format_json(json.dumps(body, ensure_ascii=False))) + for i, model in enumerate(data["data"]): + if not isinstance(model, dict): + errors.append(f"data[{i}] 不是对象") + continue - if stream: - status, data, elapsed = http_stream_request(url, headers, body, ssl_ctx) - else: - status, data, elapsed = http_request(url, method, headers, body, ssl_ctx) + # 检查 model 对象的必需字段 + model_required = ["id", "type", "display_name", "created_at"] + for field in model_required: + if field not in model: + errors.append(f"data[{i}] 缺少必需字段: {field}") - if status is not None: - print(f"状态码: {status} | 耗时: {elapsed:.2f}s") - else: - print(f"请求失败 | 耗时: {elapsed:.2f}s") + # 检查 type 字段值 + if "type" in model and model["type"] != "model": + errors.append(f"data[{i}].type 值错误: 期望 'model', 实际 '{model['type']}'") - if stream and status and status < 300: - for line in data.split("\n"): - print(line) - else: - print(format_json(data)) + return len(errors) == 0, errors - return status + +def validate_anthropic_model_retrieve_response(response_text: str) -> Tuple[bool, List[str]]: + """验证 Anthropic Model Retrieve 响应 + + 根据API文档,响应应包含: + - id: string + - type: "model" + - display_name: string + - created_at: string + - max_input_tokens: number + - max_tokens: number + """ + required_fields = ["id", "type", "display_name", "created_at"] + field_types = { + "id": str, + "type": str, + "display_name": str, + "created_at": str + } + enum_values = { + "type": ["model"] + } + + return validate_response_structure(response_text, required_fields, field_types, enum_values) + + +def validate_anthropic_messages_response(response_text: str) -> Tuple[bool, List[str]]: + """验证 Anthropic Messages 响应 + + 根据API文档,响应应包含: + - id: string + - type: "message" + - role: "assistant" + - content: array + - model: string + - stop_reason: string (可选) + - usage: object + """ + errors = [] + + try: + data = json.loads(response_text) + except json.JSONDecodeError as e: + return False, [f"响应不是有效的JSON: {e}"] + + # 检查必需字段 + required_fields = ["id", "type", "role", "content", "model"] + for field in required_fields: + if field not in data: + errors.append(f"缺少必需字段: {field}") + + # 检查特定字段值 + if "type" in data and data["type"] != "message": + errors.append(f"字段 'type' 值错误: 期望 'message', 实际 '{data['type']}'") + + if "role" in data and data["role"] != "assistant": + errors.append(f"字段 'role' 值错误: 期望 'assistant', 实际 '{data['role']}'") + + # 检查 content 数组 + if "content" in data: + if not isinstance(data["content"], list): + errors.append(f"字段 'content' 类型错误: 期望 list, 实际 {type(data['content']).__name__}") + else: + for i, block in enumerate(data["content"]): + if not isinstance(block, dict): + errors.append(f"content[{i}] 不是对象") + continue + + # 检查 content block 的必需字段 + if "type" not in block: + errors.append(f"content[{i}] 缺少必需字段: type") + + # 检查 usage 对象 + if "usage" in data and data["usage"] is not None: + if not isinstance(data["usage"], dict): + errors.append(f"字段 'usage' 类型错误: 期望 object, 实际 {type(data['usage']).__name__}") + else: + usage_fields = ["input_tokens", "output_tokens"] + for field in usage_fields: + if field not in data["usage"]: + errors.append(f"usage 缺少必需字段: {field}") + + return len(errors) == 0, errors + + +def validate_anthropic_count_tokens_response(response_text: str) -> Tuple[bool, List[str]]: + """验证 Anthropic Count Tokens 响应 + + 根据API文档,响应应包含: + - input_tokens: number + """ + required_fields = ["input_tokens"] + field_types = { + "input_tokens": (int, float) + } + + return validate_response_structure(response_text, required_fields, field_types) def main(): @@ -157,59 +212,83 @@ def main(): models_url = f"{base_url}/v1/models" count_tokens_url = f"{base_url}/v1/messages/count_tokens" - # --- 收集用例: (描述, 方法, URL, 请求头, 请求体, 是否流式) --- - cases = [] + # --- 收集测试用例 --- + cases: List[TestCase] = [] # ==== Models API ==== - cases.append(( - "获取模型列表 (GET /v1/models)", - "GET", models_url, headers, None, False + cases.append(TestCase( + desc="获取模型列表 (GET /v1/models)", + method="GET", + url=models_url, + headers=headers, + validator=validate_anthropic_models_list_response )) - cases.append(( - "获取模型列表(分页 limit=3)(GET /v1/models?limit=3)", - "GET", f"{models_url}?limit=3", headers, None, False + cases.append(TestCase( + desc="获取模型列表(分页 limit=3)(GET /v1/models?limit=3)", + method="GET", + url=f"{models_url}?limit=3", + headers=headers, + validator=validate_anthropic_models_list_response )) - cases.append(( - "获取指定模型详情 (GET /v1/models/{model})", - "GET", f"{models_url}/{model}", headers, None, False + cases.append(TestCase( + desc="获取指定模型详情 (GET /v1/models/{model})", + method="GET", + url=f"{models_url}/{model}", + headers=headers, + validator=validate_anthropic_model_retrieve_response )) - cases.append(( - "获取不存在的模型 (GET /v1/models/nonexistent-model-xxx)", - "GET", f"{models_url}/nonexistent-model-xxx", headers, None, False + cases.append(TestCase( + desc="获取不存在的模型 (GET /v1/models/nonexistent-model-xxx)", + method="GET", + url=f"{models_url}/nonexistent-model-xxx", + headers=headers )) # ==== Messages API: 正面用例 ==== - cases.append(( - "基本对话(仅 user)", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="基本对话(仅 user)", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "max_tokens": 5, "messages": [{"role": "user", "content": "Hi"}] - }, False + }, + validator=validate_anthropic_messages_response )) - cases.append(( - "system prompt + user 对话", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="system prompt + user 对话", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "max_tokens": 5, "system": "You are a helpful assistant.", "messages": [{"role": "user", "content": "1+1="}] - }, False + } )) - cases.append(( - "system prompt 数组格式(带缓存控制)", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="system prompt 数组格式(带缓存控制)", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "max_tokens": 5, "system": [ {"type": "text", "text": "You are a helpful assistant.", "cache_control": {"type": "ephemeral"}} ], "messages": [{"role": "user", "content": "Hi"}] - }, False + } )) - cases.append(( - "多轮对话(含 assistant 历史)", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="多轮对话(含 assistant 历史)", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "max_tokens": 5, "messages": [ @@ -217,87 +296,114 @@ def main(): {"role": "assistant", "content": "Hello!"}, {"role": "user", "content": "1+1="} ] - }, False + } )) - cases.append(( - "assistant prefill(部分回复填充)", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="assistant prefill(部分回复填充)", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "max_tokens": 1, "messages": [ {"role": "user", "content": "What is latin for Ant? (A) Apoidea (B) Rhopalocera (C) Formicidae"}, {"role": "assistant", "content": "The answer is ("} ] - }, False + } )) - cases.append(( - "content 数组格式(多个 text block)", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="content 数组格式(多个 text block)", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "max_tokens": 5, "messages": [{"role": "user", "content": [ {"type": "text", "text": "Hello"}, {"type": "text", "text": "1+1=?"} ]}] - }, False + } )) - cases.append(( - "temperature + top_p", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="temperature + top_p", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "max_tokens": 5, "temperature": 0.5, "top_p": 0.9, "messages": [{"role": "user", "content": "Hi"}] - }, False + } )) - cases.append(( - "temperature = 0(类确定性输出)", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="temperature = 0(类确定性输出)", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "max_tokens": 5, "temperature": 0, "messages": [{"role": "user", "content": "1+1="}] - }, False + } )) - cases.append(( - "top_k 参数", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="top_k 参数", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "max_tokens": 5, "top_k": 40, "messages": [{"role": "user", "content": "Hi"}] - }, False + } )) - cases.append(( - "max_tokens 限制", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="max_tokens 限制", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "max_tokens": 10, "messages": [{"role": "user", "content": "讲一个故事"}] - }, False + } )) - cases.append(( - "stop_sequences", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="stop_sequences", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "max_tokens": 20, "stop_sequences": ["5"], "messages": [{"role": "user", "content": "数数: 1,2,3,"}] - }, False + } )) - cases.append(( - "metadata 参数(user_id)", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="metadata 参数(user_id)", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "max_tokens": 5, "metadata": {"user_id": "test-user-001"}, "messages": [{"role": "user", "content": "Hi"}] - }, False + } )) - cases.append(( - "assistant content 数组格式(text + tool_use 块)", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="assistant content 数组格式(text + tool_use 块)", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "max_tokens": 20, "messages": [ @@ -312,20 +418,27 @@ def main(): ]} ]} ] - }, False + } )) # ==== Count Tokens API ==== - cases.append(( - "计数 Token (POST /v1/messages/count_tokens)", - "POST", count_tokens_url, headers, { + cases.append(TestCase( + desc="计数 Token (POST /v1/messages/count_tokens)", + method="POST", + url=count_tokens_url, + headers=headers, + body={ "model": model, "messages": [{"role": "user", "content": "Hello, how are you?"}] - }, False + }, + validator=validate_anthropic_count_tokens_response )) - cases.append(( - "计数 Token(带 system + tools)", - "POST", count_tokens_url, headers, { + cases.append(TestCase( + desc="计数 Token(带 system + tools)", + method="POST", + url=count_tokens_url, + headers=headers, + body={ "model": model, "system": "You are a helpful assistant.", "messages": [{"role": "user", "content": "Hi"}], @@ -338,130 +451,175 @@ def main(): "required": ["location"] } }] - }, False + } )) - cases.append(( - "计数 Token 缺少 model(负面)", - "POST", count_tokens_url, headers, { + cases.append(TestCase( + desc="计数 Token 缺少 model(负面)", + method="POST", + url=count_tokens_url, + headers=headers, + body={ "messages": [{"role": "user", "content": "Hi"}] - }, False + } )) # ==== Messages API: 负面用例 ==== - cases.append(( - "缺少 x-api-key header(无认证)", - "POST", messages_url, { + cases.append(TestCase( + desc="缺少 x-api-key header(无认证)", + method="POST", + url=messages_url, + headers={ "Content-Type": "application/json", "anthropic-version": ANTHROPIC_VERSION, - }, { + }, + body={ "model": model, "max_tokens": 5, "messages": [{"role": "user", "content": "Hi"}] - }, False + } )) - cases.append(( - "错误的 anthropic-version header", - "POST", messages_url, { + cases.append(TestCase( + desc="错误的 anthropic-version header", + method="POST", + url=messages_url, + headers={ "Content-Type": "application/json", "anthropic-version": "0000-00-00", "x-api-key": api_key, } if api_key else { "Content-Type": "application/json", "anthropic-version": "0000-00-00", - }, { + }, + body={ "model": model, "max_tokens": 5, "messages": [{"role": "user", "content": "Hi"}] - }, False + } )) - cases.append(( - "缺少 model 参数", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="缺少 model 参数", + method="POST", + url=messages_url, + headers=headers, + body={ "max_tokens": 5, "messages": [{"role": "user", "content": "Hi"}] - }, False + } )) - cases.append(( - "缺少 messages 参数", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="缺少 messages 参数", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "max_tokens": 5 - }, False + } )) - cases.append(( - "缺少 max_tokens 参数", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="缺少 max_tokens 参数", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "messages": [{"role": "user", "content": "Hi"}] - }, False + } )) - cases.append(( - "messages 为空数组", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="messages 为空数组", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "max_tokens": 5, "messages": [] - }, False + } )) - cases.append(( - "无效 API key", - "POST", messages_url, headers_bad_auth, { + cases.append(TestCase( + desc="无效 API key", + method="POST", + url=messages_url, + headers=headers_bad_auth, + body={ "model": model, "max_tokens": 5, "messages": [{"role": "user", "content": "Hi"}] - }, False + } )) - cases.append(( - "不存在的模型", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="不存在的模型", + method="POST", + url=messages_url, + headers=headers, + body={ "model": "nonexistent-model-xxx", "max_tokens": 5, "messages": [{"role": "user", "content": "Hi"}] - }, False + } )) - cases.append(( - "畸形 JSON body", - "POST", messages_url, headers, "invalid json{", False + cases.append(TestCase( + desc="畸形 JSON body", + method="POST", + url=messages_url, + headers=headers, + body="invalid json{" )) - cases.append(( - "无效 role(非法消息角色)", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="无效 role(非法消息角色)", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "max_tokens": 5, "messages": [{"role": "system", "content": "You are helpful"}] - }, False + } )) - cases.append(( - "max_tokens 为负数", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="max_tokens 为负数", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "max_tokens": -1, "messages": [{"role": "user", "content": "Hi"}] - }, False + } )) - cases.append(( - "max_tokens = 0", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="max_tokens = 0", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "max_tokens": 0, "messages": [{"role": "user", "content": "Hi"}] - }, False + } )) - cases.append(( - "temperature 超出范围 (2.0)", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="temperature 超出范围 (2.0)", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "max_tokens": 5, "temperature": 2.0, "messages": [{"role": "user", "content": "Hi"}] - }, False + } )) # ==== --vision ==== if args.vision: - cases.append(( - "图片 URL 输入 (--vision)", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="图片 URL 输入 (--vision)", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "max_tokens": 10, "messages": [{"role": "user", "content": [ @@ -473,39 +631,51 @@ def main(): "2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg" }} ]}] - }, False + } )) # ==== --stream ==== if args.stream: - cases.append(( - "基本流式 (--stream)", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="基本流式 (--stream)", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "max_tokens": 5, "stream": True, "messages": [{"role": "user", "content": "Hi"}] - }, True + }, + stream=True )) - cases.append(( - "流式 + system prompt (--stream)", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="流式 + system prompt (--stream)", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "max_tokens": 5, "stream": True, "system": "Reply in one word.", "messages": [{"role": "user", "content": "1+1="}] - }, True + }, + stream=True )) - cases.append(( - "流式 + stop_sequences (--stream)", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="流式 + stop_sequences (--stream)", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "max_tokens": 20, "stream": True, "stop_sequences": ["5"], "messages": [{"role": "user", "content": "数数: 1,2,3,"}] - }, True + }, + stream=True )) # ==== --tools ==== @@ -521,49 +691,64 @@ def main(): "required": ["location"] } } - cases.append(( - "工具调用 tool_choice: auto (--tools)", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="工具调用 tool_choice: auto (--tools)", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "max_tokens": 50, "tools": [tool_weather], "tool_choice": {"type": "auto"}, "messages": [{"role": "user", "content": "北京天气怎么样?"}] - }, False + } )) - cases.append(( - "工具调用 tool_choice: any (--tools)", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="工具调用 tool_choice: any (--tools)", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "max_tokens": 50, "tools": [tool_weather], "tool_choice": {"type": "any"}, "messages": [{"role": "user", "content": "北京天气怎么样?"}] - }, False + } )) - cases.append(( - "指定工具调用 tool_choice: {name} (--tools)", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="指定工具调用 tool_choice: {name} (--tools)", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "max_tokens": 50, "tools": [tool_weather], "tool_choice": {"type": "tool", "name": "get_weather"}, "messages": [{"role": "user", "content": "北京天气怎么样?"}] - }, False + } )) - cases.append(( - "tool_choice: none (--tools)", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="tool_choice: none (--tools)", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "max_tokens": 20, "tools": [tool_weather], "tool_choice": {"type": "none"}, "messages": [{"role": "user", "content": "北京天气怎么样?"}] - }, False + } )) - cases.append(( - "多轮工具调用(tool_result 返回)(--tools)", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="多轮工具调用(tool_result 返回)(--tools)", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "max_tokens": 20, "tools": [tool_weather], @@ -577,11 +762,14 @@ def main(): {"type": "tool_result", "tool_use_id": "toolu_001", "content": "{\"temperature\": 22, \"condition\": \"晴\"}"} ]} ] - }, False + } )) - cases.append(( - "多轮工具调用(tool_result 带 is_error)(--tools)", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="多轮工具调用(tool_result 带 is_error)(--tools)", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "max_tokens": 20, "tools": [tool_weather], @@ -594,21 +782,27 @@ def main(): {"type": "tool_result", "tool_use_id": "toolu_002", "is_error": True, "content": "天气服务不可用"} ]} ] - }, False + } )) - cases.append(( - "tool_choice 指向不存在的工具(负面)(--tools)", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="tool_choice 指向不存在的工具(负面)(--tools)", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "max_tokens": 50, "tools": [tool_weather], "tool_choice": {"type": "tool", "name": "nonexistent_tool_xxx"}, "messages": [{"role": "user", "content": "北京天气怎么样?"}] - }, False + } )) - cases.append(( - "多工具定义 (--tools)", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="多工具定义 (--tools)", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "max_tokens": 50, "tools": [ @@ -627,28 +821,34 @@ def main(): ], "tool_choice": {"type": "auto"}, "messages": [{"role": "user", "content": "北京现在几点了?天气怎么样?"}] - }, False + } )) # ==== --thinking ==== if args.thinking: - cases.append(( - "扩展思维 enabled (--thinking)", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="扩展思维 enabled (--thinking)", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "max_tokens": 200, "thinking": {"type": "enabled", "budget_tokens": 100}, "messages": [{"role": "user", "content": "1+1=?"}] - }, False + } )) - cases.append(( - "扩展思维 adaptive (--thinking)", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="扩展思维 adaptive (--thinking)", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "max_tokens": 200, "thinking": {"type": "adaptive", "budget_tokens": 100}, "messages": [{"role": "user", "content": "1+1=?"}] - }, False + } )) # ==== --stream + --tools 组合 ==== @@ -664,28 +864,74 @@ def main(): "required": ["location"] } } - cases.append(( - "流式工具调用 (--stream --tools)", - "POST", messages_url, headers, { + cases.append(TestCase( + desc="流式工具调用 (--stream --tools)", + method="POST", + url=messages_url, + headers=headers, + body={ "model": model, "max_tokens": 50, "stream": True, "tools": [tool_weather_stream], "tool_choice": {"type": "auto"}, "messages": [{"role": "user", "content": "北京天气怎么样?"}] - }, True + }, + stream=True )) - # ==== 执行测试 ==== - total = len(cases) - count_2xx = 0 - count_other = 0 + # ==== 高级参数测试 ==== + # cache_control: 缓存控制 + cases.append(TestCase( + desc="cache_control 缓存控制", + method="POST", + url=messages_url, + headers=headers, + body={ + "model": model, + "max_tokens": 10, + "cache_control": {"type": "ephemeral"}, + "messages": [{"role": "user", "content": "Hello"}] + } + )) - print("=" * 60) - print("Anthropic 兼容性测试") - print(f"目标: {base_url}") - print(f"模型: {model}") - print(f"时间: {time.strftime('%Y-%m-%d %H:%M:%S')}") + # output_config: 输出配置 + cases.append(TestCase( + desc="output_config 输出配置", + method="POST", + url=messages_url, + headers=headers, + body={ + "model": model, + "max_tokens": 10, + "output_config": {"format": "text"}, + "messages": [{"role": "user", "content": "Hi"}] + } + )) + + # service_tier: 服务层级 + cases.append(TestCase( + desc="service_tier: auto", + method="POST", + url=messages_url, + headers=headers, + body={ + "model": model, + "max_tokens": 5, + "service_tier": "auto", + "messages": [{"role": "user", "content": "Hello"}] + } + )) + + # ==== Models API 分页测试 ==== + cases.append(TestCase( + desc="Models API 分页 limit=5", + method="GET", + url=f"{models_url}?limit=5", + headers=headers + )) + + # ==== 执行测试 ==== flags = [] if args.vision: flags.append("vision") @@ -695,20 +941,15 @@ def main(): flags.append("tools") if args.thinking: flags.append("thinking") - print(f"用例: {total} 个" + (f" | 扩展: {', '.join(flags)}" if flags else "")) - print("=" * 60) - for i, (desc, method, url, hdrs, body, stream) in enumerate(cases, 1): - status = run_test(i, total, desc, url, method, hdrs, body, stream, ssl_ctx) - if status is not None and 200 <= status < 300: - count_2xx += 1 - else: - count_other += 1 - - print() - print("=" * 60) - print(f"测试完成 | 总计: {total} | HTTP 2xx: {count_2xx} | 非 2xx: {count_other}") - print("=" * 60) + run_test_suite( + cases=cases, + ssl_ctx=ssl_ctx, + title="Anthropic 兼容性测试", + base_url=base_url, + model=model, + flags=flags + ) if __name__ == "__main__": diff --git a/scripts/core.py b/scripts/core.py new file mode 100644 index 0000000..a852946 --- /dev/null +++ b/scripts/core.py @@ -0,0 +1,471 @@ +#!/usr/bin/env python3 +"""兼容性测试脚本的核心公共函数 + +提供 HTTP 请求、SSL 上下文、JSON 格式化、验证辅助等通用功能。 +""" + +import json +import time +import ssl +import urllib.request +import urllib.error +from dataclasses import dataclass +from typing import Optional, Dict, Any, Tuple, List, Union, Type +from enum import Enum + +TIMEOUT = 30 +MAX_RETRIES = 2 # 最大重试次数 + + +class ErrorType(Enum): + """错误类型分类""" + NETWORK = "network" # 网络错误 + CLIENT = "client" # 4xx 错误 + SERVER = "server" # 5xx 错误 + SUCCESS = "success" # 成功 + + +@dataclass +class TestCase: + """测试用例数据结构""" + desc: str # 测试描述 + method: str # HTTP 方法 + url: str # 请求 URL + headers: Dict[str, str] # 请求头 + body: Optional[Any] = None # 请求体 + stream: bool = False # 是否流式请求 + validator: Optional[Any] = None # 响应验证函数(可选) + + +@dataclass +class TestResult: + """测试结果数据结构""" + status: Optional[int] # HTTP 状态码 + elapsed: float # 耗时(秒) + error_type: ErrorType # 错误类型 + response: str # 响应内容 + + +def create_ssl_context() -> ssl.SSLContext: + """创建不验证证书的 SSL 上下文(用于测试环境)""" + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + return ctx + + +def classify_error(status: Optional[int]) -> ErrorType: + """根据状态码分类错误类型""" + if status is None: + return ErrorType.NETWORK + if 200 <= status < 300: + return ErrorType.SUCCESS + if 400 <= status < 500: + return ErrorType.CLIENT + if status >= 500: + return ErrorType.SERVER + return ErrorType.NETWORK + + +def http_request( + url: str, + method: str = "GET", + headers: Optional[Dict[str, str]] = None, + body: Optional[Any] = None, + ssl_ctx: Optional[ssl.SSLContext] = None, + retries: int = MAX_RETRIES +) -> TestResult: + """执行普通 HTTP 请求(支持重试) + + Args: + url: 请求 URL + method: HTTP 方法 (GET/POST/PUT/DELETE) + headers: 请求头字典 + body: 请求体 (dict 或 str) + ssl_ctx: SSL 上下文 + retries: 重试次数 + + Returns: + TestResult 对象 + """ + req = urllib.request.Request(url, method=method) + if headers: + for k, v in headers.items(): + req.add_header(k, v) + if body is not None: + if isinstance(body, str): + req.data = body.encode("utf-8") + else: + req.data = json.dumps(body).encode("utf-8") + + start = time.time() + last_error = None + + for attempt in range(retries + 1): + try: + resp = urllib.request.urlopen(req, timeout=TIMEOUT, context=ssl_ctx) + elapsed = time.time() - start + status = resp.getcode() + return TestResult( + status=status, + elapsed=elapsed, + error_type=classify_error(status), + response=resp.read().decode("utf-8") + ) + except urllib.error.HTTPError as e: + elapsed = time.time() - start + return TestResult( + status=e.code, + elapsed=elapsed, + error_type=classify_error(e.code), + response=e.read().decode("utf-8") + ) + except Exception as e: + last_error = str(e) + if attempt < retries: + time.sleep(0.5 * (attempt + 1)) # 递增延迟 + continue + + elapsed = time.time() - start + return TestResult( + status=None, + elapsed=elapsed, + error_type=ErrorType.NETWORK, + response=last_error or "Unknown error" + ) + + +def http_stream_request( + url: str, + headers: Optional[Dict[str, str]] = None, + body: Optional[Any] = None, + ssl_ctx: Optional[ssl.SSLContext] = None, + retries: int = MAX_RETRIES +) -> TestResult: + """执行流式 HTTP 请求 (SSE,支持重试) + + Args: + url: 请求 URL + headers: 请求头字典 + body: 请求体 (dict) + ssl_ctx: SSL 上下文 + retries: 重试次数 + + Returns: + TestResult 对象 + """ + req = urllib.request.Request(url, method="POST") + if headers: + for k, v in headers.items(): + req.add_header(k, v) + if body is not None: + req.data = json.dumps(body).encode("utf-8") + + start = time.time() + last_error = None + + for attempt in range(retries + 1): + try: + resp = urllib.request.urlopen(req, timeout=TIMEOUT, context=ssl_ctx) + status = resp.getcode() + lines = [] + for raw_line in resp: + line = raw_line.decode("utf-8").rstrip("\n\r") + if line: + lines.append(line) + elapsed = time.time() - start + return TestResult( + status=status, + elapsed=elapsed, + error_type=classify_error(status), + response="\n".join(lines) + ) + except urllib.error.HTTPError as e: + elapsed = time.time() - start + return TestResult( + status=e.code, + elapsed=elapsed, + error_type=classify_error(e.code), + response=e.read().decode("utf-8") + ) + except Exception as e: + last_error = str(e) + if attempt < retries: + time.sleep(0.5 * (attempt + 1)) + continue + + elapsed = time.time() - start + return TestResult( + status=None, + elapsed=elapsed, + error_type=ErrorType.NETWORK, + response=last_error or "Unknown error" + ) + + +def format_json(text: str) -> str: + """格式化 JSON 文本(用于美化输出) + + Args: + text: JSON 字符串或任意文本 + + Returns: + 格式化后的 JSON 字符串,或原文本(如果不是有效 JSON) + """ + try: + parsed = json.loads(text) + return json.dumps(parsed, ensure_ascii=False, indent=2) + except (json.JSONDecodeError, TypeError): + return text + + +def run_test( + index: int, + total: int, + test_case: TestCase, + ssl_ctx: ssl.SSLContext +) -> TestResult: + """执行单个测试用例并打印结果 + + Args: + index: 测试序号 + total: 总测试数 + test_case: 测试用例对象 + ssl_ctx: SSL 上下文 + + Returns: + TestResult 对象 + """ + print(f"\n[{index}/{total}] {test_case.desc}") + print(f">>> {test_case.method} {test_case.url}") + if test_case.body is not None: + if isinstance(test_case.body, str): + print(test_case.body) + else: + print(format_json(json.dumps(test_case.body, ensure_ascii=False))) + + if test_case.stream: + result = http_stream_request( + test_case.url, + test_case.headers, + test_case.body, + ssl_ctx + ) + else: + result = http_request( + test_case.url, + test_case.method, + test_case.headers, + test_case.body, + ssl_ctx + ) + + if result.status is not None: + print(f"状态码: {result.status} | 耗时: {result.elapsed:.2f}s") + else: + print(f"请求失败 | 耗时: {result.elapsed:.2f}s") + + if test_case.stream and result.status and result.status < 300: + # 流式响应按 SSE 行逐行输出 + for line in result.response.split("\n"): + print(line) + else: + print(format_json(result.response)) + + # 执行响应验证 + if test_case.validator and result.status and 200 <= result.status < 300: + is_valid, errors = test_case.validator(result.response) + if is_valid: + print("✓ 响应验证通过") + else: + print("✗ 响应验证失败:") + for error in errors: + print(f" - {error}") + + return result + + +def run_test_suite( + cases: List[TestCase], + ssl_ctx: ssl.SSLContext, + title: str, + base_url: str, + model: str, + flags: Optional[List[str]] = None +) -> Tuple[int, int, int, int]: + """执行测试套件并打印总结 + + Args: + cases: 测试用例列表 + ssl_ctx: SSL 上下文 + title: 测试标题 + base_url: API 基础地址 + model: 模型名称 + flags: 扩展测试标记列表 + + Returns: + (总数, 成功数, 客户端错误数, 服务端错误数) + """ + total = len(cases) + count_success = 0 + count_client_error = 0 + count_server_error = 0 + count_network_error = 0 + + print("=" * 60) + print(title) + print(f"目标: {base_url}") + print(f"模型: {model}") + print(f"时间: {time.strftime('%Y-%m-%d %H:%M:%S')}") + if flags: + print(f"用例: {total} 个 | 扩展: {', '.join(flags)}") + else: + print(f"用例: {total} 个") + print("=" * 60) + + for i, test_case in enumerate(cases, 1): + result = run_test(i, total, test_case, ssl_ctx) + + if result.error_type == ErrorType.SUCCESS: + count_success += 1 + elif result.error_type == ErrorType.CLIENT: + count_client_error += 1 + elif result.error_type == ErrorType.SERVER: + count_server_error += 1 + else: + count_network_error += 1 + + print() + print("=" * 60) + print(f"测试完成 | 总计: {total} | 成功: {count_success} | " + f"客户端错误: {count_client_error} | 服务端错误: {count_server_error} | " + f"网络错误: {count_network_error}") + print("=" * 60) + + return total, count_success, count_client_error, count_server_error + + +# ==================== 通用验证辅助函数 ==================== + +def check_required_fields(data: Dict[str, Any], required_fields: List[str]) -> Tuple[bool, List[str]]: + """检查必需字段是否存在 + + Args: + data: 待检查的数据字典 + required_fields: 必需字段列表 + + Returns: + (是否全部存在, 缺失字段列表) + """ + missing = [] + for field in required_fields: + if field not in data: + missing.append(field) + return len(missing) == 0, missing + + +def check_field_type(value: Any, expected_type: Union[Type, tuple]) -> bool: + """检查字段类型是否正确 + + Args: + value: 待检查的值 + expected_type: 期望的类型(可以是类型元组) + + Returns: + 类型是否匹配 + """ + if value is None: + return True # None值通常表示可选字段,允许 + return isinstance(value, expected_type) + + +def check_enum_value(value: Any, allowed_values: List[Any]) -> bool: + """检查值是否在允许的枚举值列表中 + + Args: + value: 待检查的值 + allowed_values: 允许的值列表 + + Returns: + 值是否合法 + """ + if value is None: + return True # None值通常表示可选字段,允许 + return value in allowed_values + + +def check_array_items_type(arr: List[Any], expected_item_type: Union[Type, tuple]) -> bool: + """检查数组中所有元素的类型 + + Args: + arr: 待检查的数组 + expected_item_type: 期望的元素类型 + + Returns: + 所有元素类型是否匹配 + """ + if not isinstance(arr, list): + return False + return all(check_field_type(item, expected_item_type) for item in arr) + + +def format_validation_errors(errors: List[str]) -> str: + """格式化验证错误信息 + + Args: + errors: 错误信息列表 + + Returns: + 格式化后的错误字符串 + """ + if not errors: + return "验证通过" + return "验证失败:\n - " + "\n - ".join(errors) + + +def validate_response_structure( + response_text: str, + required_fields: List[str], + field_types: Optional[Dict[str, Union[Type, tuple]]] = None, + enum_values: Optional[Dict[str, List[Any]]] = None +) -> Tuple[bool, List[str]]: + """验证响应结构(通用验证函数) + + Args: + response_text: 响应文本 + required_fields: 必需字段列表 + field_types: 字段类型映射 {字段名: 期望类型} + enum_values: 枚举值映射 {字段名: 允许值列表} + + Returns: + (是否验证通过, 错误信息列表) + """ + errors = [] + + # 尝试解析JSON + try: + data = json.loads(response_text) + except json.JSONDecodeError as e: + errors.append(f"响应不是有效的JSON: {e}") + return False, errors + + # 检查必需字段 + has_required, missing = check_required_fields(data, required_fields) + if not has_required: + errors.append(f"缺少必需字段: {', '.join(missing)}") + + # 检查字段类型 + if field_types: + for field, expected_type in field_types.items(): + if field in data and not check_field_type(data[field], expected_type): + actual_type = type(data[field]).__name__ + expected_name = expected_type.__name__ if isinstance(expected_type, type) else str(expected_type) + errors.append(f"字段 '{field}' 类型错误: 期望 {expected_name}, 实际 {actual_type}") + + # 检查枚举值 + if enum_values: + for field, allowed in enum_values.items(): + if field in data and not check_enum_value(data[field], allowed): + errors.append(f"字段 '{field}' 值非法: {data[field]}, 允许值: {allowed}") + + return len(errors) == 0, errors diff --git a/scripts/openai_detect.py b/scripts/openai_detect.py index 2575329..abf1292 100755 --- a/scripts/openai_detect.py +++ b/scripts/openai_detect.py @@ -11,115 +11,157 @@ """ import json -import time -import ssl import argparse -import urllib.request -import urllib.error - -TIMEOUT = 30 +from typing import Dict, List, Tuple, Any +from core import ( + create_ssl_context, + TestCase, + run_test_suite, + validate_response_structure, + format_validation_errors +) -def create_ssl_context(): - ctx = ssl.create_default_context() - ctx.check_hostname = False - ctx.verify_mode = ssl.CERT_NONE - return ctx - - -def http_request(url, method="GET", headers=None, body=None, ssl_ctx=None): - req = urllib.request.Request(url, method=method) - if headers: - for k, v in headers.items(): - req.add_header(k, v) - if body is not None: - if isinstance(body, str): - req.data = body.encode("utf-8") - else: - req.data = json.dumps(body).encode("utf-8") - - start = time.time() - try: - resp = urllib.request.urlopen(req, timeout=TIMEOUT, context=ssl_ctx) - elapsed = time.time() - start - return resp.getcode(), resp.read().decode("utf-8"), elapsed - except urllib.error.HTTPError as e: - elapsed = time.time() - start - return e.code, e.read().decode("utf-8"), elapsed - except Exception as e: - elapsed = time.time() - start - return None, str(e), elapsed - - -def http_stream_request(url, headers=None, body=None, ssl_ctx=None): - req = urllib.request.Request(url, method="POST") - if headers: - for k, v in headers.items(): - req.add_header(k, v) - if body is not None: - req.data = json.dumps(body).encode("utf-8") - - start = time.time() - try: - resp = urllib.request.urlopen(req, timeout=TIMEOUT, context=ssl_ctx) - status = resp.getcode() - lines = [] - for raw_line in resp: - line = raw_line.decode("utf-8").rstrip("\n\r") - if line: - lines.append(line) - elapsed = time.time() - start - return status, "\n".join(lines), elapsed - except urllib.error.HTTPError as e: - elapsed = time.time() - start - return e.code, e.read().decode("utf-8"), elapsed - except Exception as e: - elapsed = time.time() - start - return None, str(e), elapsed - - -def format_json(text): - try: - parsed = json.loads(text) - return json.dumps(parsed, ensure_ascii=False, indent=2) - except (json.JSONDecodeError, TypeError): - return text - - -def build_headers(api_key): +def build_headers(api_key: str) -> Dict[str, str]: + """构建 OpenAI API 请求头""" h = {"Content-Type": "application/json"} if api_key: h["Authorization"] = f"Bearer {api_key}" return h -def run_test(index, total, desc, url, method, headers, body, stream, ssl_ctx): - print(f"\n[{index}/{total}] {desc}") - print(f">>> {method} {url}") - if body is not None: - if isinstance(body, str): - print(body) +# ==================== OpenAI 响应验证函数 ==================== + +def validate_openai_models_list_response(response_text: str) -> Tuple[bool, List[str]]: + """验证 OpenAI Models List 响应 + + 根据API文档,响应应包含: + - object: "list" + - data: array of Model objects + """ + errors = [] + + try: + data = json.loads(response_text) + except json.JSONDecodeError as e: + return False, [f"响应不是有效的JSON: {e}"] + + # 检查必需字段 + if "object" not in data: + errors.append("缺少必需字段: object") + elif data["object"] != "list": + errors.append(f"字段 'object' 值错误: 期望 'list', 实际 '{data['object']}'") + + if "data" not in data: + errors.append("缺少必需字段: data") + elif not isinstance(data["data"], list): + errors.append(f"字段 'data' 类型错误: 期望 list, 实际 {type(data['data']).__name__}") + else: + # 验证每个 model 对象 + for i, model in enumerate(data["data"]): + if not isinstance(model, dict): + errors.append(f"data[{i}] 不是对象") + continue + + # 检查 model 对象的必需字段 + model_required = ["id", "object", "created", "owned_by"] + for field in model_required: + if field not in model: + errors.append(f"data[{i}] 缺少必需字段: {field}") + + # 检查 object 字段值 + if "object" in model and model["object"] != "model": + errors.append(f"data[{i}].object 值错误: 期望 'model', 实际 '{model['object']}'") + + return len(errors) == 0, errors + + +def validate_openai_model_retrieve_response(response_text: str) -> Tuple[bool, List[str]]: + """验证 OpenAI Model Retrieve 响应 + + 根据API文档,响应应包含: + - id: string + - object: "model" + - created: number + - owned_by: string + """ + required_fields = ["id", "object", "created", "owned_by"] + field_types = { + "id": str, + "object": str, + "created": (int, float), + "owned_by": str + } + enum_values = { + "object": ["model"] + } + + return validate_response_structure(response_text, required_fields, field_types, enum_values) + + +def validate_openai_chat_completion_response(response_text: str) -> Tuple[bool, List[str]]: + """验证 OpenAI Chat Completion 响应 + + 根据API文档,响应应包含: + - id: string + - object: "chat.completion" + - created: number + - model: string + - choices: array + - usage: object (可选) + """ + errors = [] + + try: + data = json.loads(response_text) + except json.JSONDecodeError as e: + return False, [f"响应不是有效的JSON: {e}"] + + # 检查必需字段 + required_fields = ["id", "object", "created", "model", "choices"] + for field in required_fields: + if field not in data: + errors.append(f"缺少必需字段: {field}") + + # 检查 object 字段值 + if "object" in data and data["object"] != "chat.completion": + errors.append(f"字段 'object' 值错误: 期望 'chat.completion', 实际 '{data['object']}'") + + # 检查 choices 数组 + if "choices" in data: + if not isinstance(data["choices"], list): + errors.append(f"字段 'choices' 类型错误: 期望 list, 实际 {type(data['choices']).__name__}") else: - print(format_json(json.dumps(body, ensure_ascii=False))) + for i, choice in enumerate(data["choices"]): + if not isinstance(choice, dict): + errors.append(f"choices[{i}] 不是对象") + continue - if stream: - status, data, elapsed = http_stream_request(url, headers, body, ssl_ctx) - else: - status, data, elapsed = http_request(url, method, headers, body, ssl_ctx) + # 检查 choice 对象的必需字段 + choice_required = ["index", "message", "finish_reason"] + for field in choice_required: + if field not in choice: + errors.append(f"choices[{i}] 缺少必需字段: {field}") - if status is not None: - print(f"状态码: {status} | 耗时: {elapsed:.2f}s") - else: - print(f"请求失败 | 耗时: {elapsed:.2f}s") + # 检查 message 对象 + if "message" in choice and isinstance(choice["message"], dict): + msg = choice["message"] + if "role" not in msg: + errors.append(f"choices[{i}].message 缺少必需字段: role") + elif msg["role"] != "assistant": + errors.append(f"choices[{i}].message.role 值错误: 期望 'assistant', 实际 '{msg['role']}'") - if stream and status and status < 300: - # 流式响应按 SSE 行逐行输出 - for line in data.split("\n"): - print(line) - else: - print(format_json(data)) + # 检查 usage 对象(可选) + if "usage" in data and data["usage"] is not None: + if not isinstance(data["usage"], dict): + errors.append(f"字段 'usage' 类型错误: 期望 object, 实际 {type(data['usage']).__name__}") + else: + usage_required = ["prompt_tokens", "completion_tokens", "total_tokens"] + for field in usage_required: + if field not in data["usage"]: + errors.append(f"usage 缺少必需字段: {field}") - return status + return len(errors) == 0, errors def main(): @@ -154,57 +196,79 @@ def main(): chat_url = f"{base_url}/chat/completions" - # --- 收集用例: (描述, 方法, URL, 请求头, 请求体, 是否流式) --- - cases = [] + # --- 收集测试用例 --- + cases: List[TestCase] = [] # ---- Models API ---- - cases.append(( - "获取模型列表 (GET /models)", - "GET", f"{base_url}/models", headers, None, False + cases.append(TestCase( + desc="获取模型列表 (GET /models)", + method="GET", + url=f"{base_url}/models", + headers=headers, + validator=validate_openai_models_list_response )) - cases.append(( - "获取指定模型详情 (GET /models/{model})", - "GET", f"{base_url}/models/{model}", headers, None, False + cases.append(TestCase( + desc="获取指定模型详情 (GET /models/{model})", + method="GET", + url=f"{base_url}/models/{model}", + headers=headers, + validator=validate_openai_model_retrieve_response )) - cases.append(( - "获取不存在的模型 (GET /models/nonexistent-model-xxx)", - "GET", f"{base_url}/models/nonexistent-model-xxx", headers, None, False + cases.append(TestCase( + desc="获取不存在的模型 (GET /models/nonexistent-model-xxx)", + method="GET", + url=f"{base_url}/models/nonexistent-model-xxx", + headers=headers )) # ---- Chat Completions: 正面用例 ---- - cases.append(( - "基本对话(仅 user)", - "POST", chat_url, headers, { + cases.append(TestCase( + desc="基本对话(仅 user)", + method="POST", + url=chat_url, + headers=headers, + body={ "model": model, "messages": [{"role": "user", "content": "Hi"}], "max_tokens": 5 - }, False + }, + validator=validate_openai_chat_completion_response )) - cases.append(( - "system + user 对话", - "POST", chat_url, headers, { + cases.append(TestCase( + desc="system + user 对话", + method="POST", + url=chat_url, + headers=headers, + body={ "model": model, "messages": [ {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": "1+1="} ], "max_tokens": 5 - }, False + }, + validator=validate_openai_chat_completion_response )) - cases.append(( - "developer + user 对话", - "POST", chat_url, headers, { + cases.append(TestCase( + desc="developer + user 对话", + method="POST", + url=chat_url, + headers=headers, + body={ "model": model, "messages": [ {"role": "developer", "content": "You are a helpful assistant."}, {"role": "user", "content": "1+1="} ], "max_tokens": 5 - }, False + } )) - cases.append(( - "多轮对话(含 assistant 历史)", - "POST", chat_url, headers, { + cases.append(TestCase( + desc="多轮对话(含 assistant 历史)", + method="POST", + url=chat_url, + headers=headers, + body={ "model": model, "messages": [ {"role": "user", "content": "Hi"}, @@ -212,74 +276,98 @@ def main(): {"role": "user", "content": "1+1="} ], "max_tokens": 5 - }, False + } )) - cases.append(( - "temperature + top_p", - "POST", chat_url, headers, { + cases.append(TestCase( + desc="temperature + top_p", + method="POST", + url=chat_url, + headers=headers, + body={ "model": model, "messages": [{"role": "user", "content": "Hi"}], "max_tokens": 5, "temperature": 0.5, "top_p": 0.9 - }, False + } )) - cases.append(( - "max_tokens 限制", - "POST", chat_url, headers, { + cases.append(TestCase( + desc="max_tokens 限制", + method="POST", + url=chat_url, + headers=headers, + body={ "model": model, "messages": [{"role": "user", "content": "讲一个故事"}], "max_tokens": 10 - }, False + } )) - cases.append(( - "stop sequences", - "POST", chat_url, headers, { + cases.append(TestCase( + desc="stop sequences", + method="POST", + url=chat_url, + headers=headers, + body={ "model": model, "messages": [{"role": "user", "content": "数数: 1,2,3,"}], "max_tokens": 20, "stop": ["5"] - }, False + } )) - cases.append(( - "n=2 多候选", - "POST", chat_url, headers, { + cases.append(TestCase( + desc="n=2 多候选", + method="POST", + url=chat_url, + headers=headers, + body={ "model": model, "messages": [{"role": "user", "content": "Hi"}], "max_tokens": 5, "n": 2 - }, False + } )) - cases.append(( - "seed 参数", - "POST", chat_url, headers, { + cases.append(TestCase( + desc="seed 参数", + method="POST", + url=chat_url, + headers=headers, + body={ "model": model, "messages": [{"role": "user", "content": "Hi"}], "max_tokens": 5, "seed": 42 - }, False + } )) - cases.append(( - "frequency_penalty + presence_penalty", - "POST", chat_url, headers, { + cases.append(TestCase( + desc="frequency_penalty + presence_penalty", + method="POST", + url=chat_url, + headers=headers, + body={ "model": model, "messages": [{"role": "user", "content": "Hi"}], "max_tokens": 5, "frequency_penalty": 0.5, "presence_penalty": 0.5 - }, False + } )) - cases.append(( - "max_completion_tokens 参数", - "POST", chat_url, headers, { + cases.append(TestCase( + desc="max_completion_tokens 参数", + method="POST", + url=chat_url, + headers=headers, + body={ "model": model, "messages": [{"role": "user", "content": "讲一个故事"}], "max_completion_tokens": 10 - }, False + } )) - cases.append(( - "JSON mode (response_format: json_object)", - "POST", chat_url, headers, { + cases.append(TestCase( + desc="JSON mode (response_format: json_object)", + method="POST", + url=chat_url, + headers=headers, + body={ "model": model, "messages": [ {"role": "system", "content": "以 JSON 格式回复: {\"answer\": \"ok\"}"}, @@ -287,51 +375,69 @@ def main(): ], "max_tokens": 10, "response_format": {"type": "json_object"} - }, False + } )) # ---- Chat Completions: 负面用例 ---- - cases.append(( - "缺少 model 参数", - "POST", chat_url, headers, { + cases.append(TestCase( + desc="缺少 model 参数", + method="POST", + url=chat_url, + headers=headers, + body={ "messages": [{"role": "user", "content": "Hi"}], "max_tokens": 5 - }, False + } )) - cases.append(( - "缺少 messages 参数", - "POST", chat_url, headers, { + cases.append(TestCase( + desc="缺少 messages 参数", + method="POST", + url=chat_url, + headers=headers, + body={ "model": model, "max_tokens": 5 - }, False + } )) - cases.append(( - "messages 为空数组", - "POST", chat_url, headers, { + cases.append(TestCase( + desc="messages 为空数组", + method="POST", + url=chat_url, + headers=headers, + body={ "model": model, "messages": [], "max_tokens": 5 - }, False + } )) - cases.append(( - "无效 API key", - "POST", chat_url, headers_bad_auth, { + cases.append(TestCase( + desc="无效 API key", + method="POST", + url=chat_url, + headers=headers_bad_auth, + body={ "model": model, "messages": [{"role": "user", "content": "Hi"}], "max_tokens": 5 - }, False + } )) - cases.append(( - "不存在的模型 (chat)", - "POST", chat_url, headers, { + cases.append(TestCase( + desc="不存在的模型 (chat)", + method="POST", + url=chat_url, + headers=headers, + body={ "model": "nonexistent-model-xxx", "messages": [{"role": "user", "content": "Hi"}], "max_tokens": 5 - }, False + } )) - cases.append(( - "畸形 JSON body", - "POST", chat_url, headers, "invalid json{", False + cases.append(TestCase( + desc="畸形 JSON body", + method="POST", + url=chat_url, + headers=headers, + body="invalid json{" )) # ---- --vision ---- @@ -341,9 +447,12 @@ def main(): "Gfp-wisconsin-madison-the-nature-boardwalk.jpg/" "2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg" ) - cases.append(( - "图片 URL 输入 + detail 参数 (--vision)", - "POST", chat_url, headers, { + cases.append(TestCase( + desc="图片 URL 输入 + detail 参数 (--vision)", + method="POST", + url=chat_url, + headers=headers, + body={ "model": model, "messages": [ {"role": "system", "content": "简短描述图片"}, @@ -355,39 +464,51 @@ def main(): ]} ], "max_tokens": 10 - }, False + } )) # ---- --stream ---- if args.stream: - cases.append(( - "基本流式 (--stream)", - "POST", chat_url, headers, { + cases.append(TestCase( + desc="基本流式 (--stream)", + method="POST", + url=chat_url, + headers=headers, + body={ "model": model, "messages": [{"role": "user", "content": "Hi"}], "max_tokens": 5, "stream": True - }, True + }, + stream=True )) - cases.append(( - "流式 + include_usage (--stream)", - "POST", chat_url, headers, { + cases.append(TestCase( + desc="流式 + include_usage (--stream)", + method="POST", + url=chat_url, + headers=headers, + body={ "model": model, "messages": [{"role": "user", "content": "Hi"}], "max_tokens": 5, "stream": True, "stream_options": {"include_usage": True} - }, True + }, + stream=True )) - cases.append(( - "流式 + stop sequences (--stream)", - "POST", chat_url, headers, { + cases.append(TestCase( + desc="流式 + stop sequences (--stream)", + method="POST", + url=chat_url, + headers=headers, + body={ "model": model, "messages": [{"role": "user", "content": "数数: 1,2,3,"}], "max_tokens": 20, "stream": True, "stop": ["5"] - }, True + }, + stream=True )) # ---- --tools ---- @@ -406,29 +527,38 @@ def main(): } } } - cases.append(( - "工具调用 tool_choice: auto (--tools)", - "POST", chat_url, headers, { + cases.append(TestCase( + desc="工具调用 tool_choice: auto (--tools)", + method="POST", + url=chat_url, + headers=headers, + body={ "model": model, "messages": [{"role": "user", "content": "北京天气怎么样?"}], "max_tokens": 50, "tools": [tool_weather], "tool_choice": "auto" - }, False + } )) - cases.append(( - "工具调用 tool_choice: required (--tools)", - "POST", chat_url, headers, { + cases.append(TestCase( + desc="工具调用 tool_choice: required (--tools)", + method="POST", + url=chat_url, + headers=headers, + body={ "model": model, "messages": [{"role": "user", "content": "北京天气怎么样?"}], "max_tokens": 50, "tools": [tool_weather], "tool_choice": "required" - }, False + } )) - cases.append(( - "指定函数调用 tool_choice: {name} (--tools)", - "POST", chat_url, headers, { + cases.append(TestCase( + desc="指定函数调用 tool_choice: {name} (--tools)", + method="POST", + url=chat_url, + headers=headers, + body={ "model": model, "messages": [{"role": "user", "content": "北京天气怎么样?"}], "max_tokens": 50, @@ -437,11 +567,14 @@ def main(): "type": "function", "function": {"name": "get_weather"} } - }, False + } )) - cases.append(( - "多轮工具调用(构造 tool 结果)(--tools)", - "POST", chat_url, headers, { + cases.append(TestCase( + desc="多轮工具调用(构造 tool 结果)(--tools)", + method="POST", + url=chat_url, + headers=headers, + body={ "model": model, "messages": [ {"role": "user", "content": "北京天气怎么样?"}, @@ -457,38 +590,47 @@ def main(): ], "max_tokens": 20, "tools": [tool_weather] - }, False + } )) - cases.append(( - "parallel_tool_calls: false (--tools)", - "POST", chat_url, headers, { + cases.append(TestCase( + desc="parallel_tool_calls: false (--tools)", + method="POST", + url=chat_url, + headers=headers, + body={ "model": model, "messages": [{"role": "user", "content": "北京和上海的天气怎么样?"}], "max_tokens": 50, "tools": [tool_weather], "tool_choice": "auto", "parallel_tool_calls": False - }, False + } )) # ---- --logprobs ---- if args.logprobs: - cases.append(( - "logprobs + top_logprobs (--logprobs)", - "POST", chat_url, headers, { + cases.append(TestCase( + desc="logprobs + top_logprobs (--logprobs)", + method="POST", + url=chat_url, + headers=headers, + body={ "model": model, "messages": [{"role": "user", "content": "Hi"}], "max_tokens": 5, "logprobs": True, "top_logprobs": 2 - }, False + } )) # ---- --json-schema ---- if args.json_schema: - cases.append(( - "Structured Output json_schema (--json_schema)", - "POST", chat_url, headers, { + cases.append(TestCase( + desc="Structured Output json_schema (--json_schema)", + method="POST", + url=chat_url, + headers=headers, + body={ "model": model, "messages": [{"role": "user", "content": "1+1等于几?"}], "max_tokens": 20, @@ -508,19 +650,67 @@ def main(): } } } - }, False + } )) - # ---- 执行测试 ---- - total = len(cases) - count_2xx = 0 - count_other = 0 + # ---- 高级参数测试 ---- + # logit_bias: 修改特定token的似然 + cases.append(TestCase( + desc="logit_bias 参数测试", + method="POST", + url=chat_url, + headers=headers, + body={ + "model": model, + "messages": [{"role": "user", "content": "Hello"}], + "max_tokens": 5, + "logit_bias": {"1234": -100, "5678": 50} # token_id: bias + } + )) - print("=" * 60) - print("OpenAI 兼容性测试") - print(f"目标: {base_url}") - print(f"模型: {model}") - print(f"时间: {time.strftime('%Y-%m-%d %H:%M:%S')}") + # reasoning_effort: 推理努力级别(需要模型支持) + cases.append(TestCase( + desc="reasoning_effort: medium", + method="POST", + url=chat_url, + headers=headers, + body={ + "model": model, + "messages": [{"role": "user", "content": "1+1=?"}], + "max_tokens": 10, + "reasoning_effort": "medium" + } + )) + + # service_tier: 服务层级 + cases.append(TestCase( + desc="service_tier: auto", + method="POST", + url=chat_url, + headers=headers, + body={ + "model": model, + "messages": [{"role": "user", "content": "Hi"}], + "max_tokens": 5, + "service_tier": "auto" + } + )) + + # verbosity: 冗长程度 + cases.append(TestCase( + desc="verbosity: low", + method="POST", + url=chat_url, + headers=headers, + body={ + "model": model, + "messages": [{"role": "user", "content": "介绍一下Python"}], + "max_tokens": 50, + "verbosity": "low" + } + )) + + # ---- 执行测试 ---- flags = [] if args.vision: flags.append("vision") @@ -532,20 +722,15 @@ def main(): flags.append("logprobs") if args.json_schema: flags.append("json-schema") - print(f"用例: {total} 个" + (f" | 扩展: {', '.join(flags)}" if flags else "")) - print("=" * 60) - for i, (desc, method, url, hdrs, body, stream) in enumerate(cases, 1): - status = run_test(i, total, desc, url, method, hdrs, body, stream, ssl_ctx) - if status is not None and 200 <= status < 300: - count_2xx += 1 - else: - count_other += 1 - - print() - print("=" * 60) - print(f"测试完成 | 总计: {total} | HTTP 2xx: {count_2xx} | 非 2xx: {count_other}") - print("=" * 60) + run_test_suite( + cases=cases, + ssl_ctx=ssl_ctx, + title="OpenAI 兼容性测试", + base_url=base_url, + model=model, + flags=flags + ) if __name__ == "__main__":