71 KiB
LLM API Protocol Conversion Layer — Architecture Design
语言无关的 HTTP 层 SDK。以 Hub-and-Spoke 架构实现 OpenAI / Anthropic 协议间的双向转换,覆盖完整 HTTP 接口体系,同协议自动透传,为新协议和多模态预留扩展点。
目录
- 设计目标与约束
- 架构总览
- 接口体系分层
- Canonical Model(规范模型)
- Protocol Adapter 接口
- Conversion Engine(转换引擎)
- 流式转换架构
- OpenAI 协议适配器
- Anthropic 协议适配器
- 字段映射参考
- 扩展点设计
- 错误处理
- 参考实现对比
1. 设计目标与约束
1.1 核心目标
| 目标 | 说明 |
|---|---|
| 完整 HTTP 接口体系转换 | 覆盖 /models、/embeddings、/files、/rerank 等全部接口的 URL 路由映射、请求头转换、请求体/响应体格式转换 |
| 输入输出解耦 | 客户端协议和上游协议独立指定,任意组合 |
| 同协议透传 | source == target 时跳过转换,零语义损失、零序列化开销 |
| 尽力转换 | 能对接的参数尽可能对接,不能对接的各自忽略,保障最大覆盖面 |
| 协议可扩展 | 添加新协议只需实现 Adapter,不修改核心引擎 |
| 流式优先 | SSE 流式转换作为核心能力,与非流式同等地位 |
| Tool Calling 核心 | 工具调用是编程场景的一等公民 |
| 语言无关 | 不绑定编程语言,用伪类型描述接口 |
1.2 约束
| 约束 | 说明 |
|---|---|
| 部署形态 | HTTP 层 SDK,处理 HTTP 请求/响应转换(URL + Headers + Body),不包含 HTTP 服务器启动/监听 |
| 当前协议 | OpenAI API、Anthropic API |
| 当前模态 | 仅文本(含 Tool Calling),接口体系覆盖 /models、/count_tokens、/embeddings、/files、/rerank |
| Provider 必传 | 每次转换调用需传入 TargetProvider,提供目标上游的地址、认证、模型名等信息 |
| 适配器注册 | 所有 ProtocolAdapter 通过代码注册,不支持动态增减 |
| 有状态特性 | 初始不实现,架构预留扩展点 |
1.3 设计决策溯源
| 决策 | 依据 |
|---|---|
| HTTP 层 SDK | 编程工具启动时调用 /models、/count_tokens 等接口,缺失会报错或功能降级 |
| Hub-and-Spoke | new-api 验证了以规范格式为枢纽的可行性,O(n²) 降为 O(n) |
| 自定义 Canonical Model | 不选用厂商格式,避免语义损失和厂商锁定 |
| 协议前缀路由 | CC-Switch 的 /claude/、/codex/、/gemini/ 前缀验证了 URL 前缀区分协议的可行性 |
| 接口分层 + 尽力转换 | 不同接口差异程度不同,分层处理最大化覆盖面 |
| 同协议透传 | CC-Switch 的 Anthropic→Anthropic 直通验证了零转换性能优势 |
2. 架构总览
2.1 分层架构
┌─────────────────────────────────────────────────────────────────────────┐
│ 上层 HTTP 框架(用户自选) │
│ Express / FastAPI / Axum / Gin / 任意框架 │
└───────────────────────────────┬─────────────────────────────────────────┘
│ HTTP 请求/响应
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ Conversion Engine (SDK) │
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ Protocol Prefix Router │ │
│ │ │ │
│ │ 入站: /{protocol}/{native_path} │ │
│ │ │ │
│ │ /openai/v1/chat/completions → source=openai, CHAT │ │
│ │ /anthropic/v1/messages → source=anthropic, CHAT │ │
│ │ /openai/v1/models → source=openai, MODELS │ │
│ │ /anthropic/v1/models → source=anthropic, MODELS │ │
│ │ /openai/v1/* → source=openai, UNKNOWN │ │
│ │ /anthropic/v1/* → source=anthropic, UNKNOWN │ │
│ │ │ │
│ │ Step 1: URL 前缀 → source protocol(唯一依据,无歧义) │ │
│ │ Step 2: 剥离前缀 → 识别 InterfaceType │ │
│ │ Step 3: target protocol 由配置决定 │ │
│ └─────────────────────────────┬───────────────────────────────────┘ │
│ │ │
│ ┌────────────┼────────────┐ │
│ ▼ ▼ ▼ │
│ ┌──────────────┐ ┌─────────┐ ┌──────────┐ │
│ │ Passthrough │ │核心接口层│ │扩展接口层 │ │
│ │ 同协议直通 │ │ Chat │ │ Models │ │
│ │ 或未知接口 │ │ 流式 │ │ Embed │ │
│ │ 直接转发 │ │ Tool │ │ Files │ │
│ └──────────────┘ │ Thinking│ │ Rerank │ │
│ └────┬────┘ └────┬─────┘ │
│ ┌──────────▼──────────▼───────┐ │
│ │ ProtocolAdapter 层 │ │
│ │ ┌────────┐ ┌───────────┐ │ │
│ │ │OpenAI │ │Anthropic │ │ │
│ │ │Adapter │ │Adapter │ │ │
│ │ └────────┘ └───────────┘ │ │
│ └──────────────┬───────────────┘ │
│ ┌──────────────▼───────────────┐ │
│ │ Canonical Model │ │
│ └──────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
2.2 URL 路由规则
入站 URL 采用协议前缀:/{protocol}/{native_path}。前缀是协议识别的唯一依据。
入站 URL 剥离前缀后 出站(target=anthropic)
──────────────────────────────────────────────────────────────────────────────────
/openai/v1/chat/completions → /v1/chat/completions → /v1/messages
/anthropic/v1/messages → /v1/messages → /v1/chat/completions
/openai/v1/models → /v1/models → /v1/models
/anthropic/v1/models → /v1/models → /v1/models
/openai/v1/embeddings → /v1/embeddings → 不支持,透传或返回错误
/anthropic/v1/messages/count_tokens→ /v1/messages/count_tokens→ OpenAI 无此接口
协议前缀仅用于入站侧。出站到上游 API 时使用目标协议原生路径(无前缀)。
2.3 请求处理流程
每个 HTTP 请求的转换流程:
客户端入站 SDK 内部处理 上游出站
┌──────────────────┐ ┌──────────────────┐
│ URL: │ 1. 前缀提取: /anthropic/ → source=anthropic │ URL: │
│ /anthropic/ │ 2. 前缀剥离: /v1/messages │ /v1/chat/ │
│ v1/messages │ 3. 接口识别: CHAT │ completions │
│ Headers: │ 4. 同协议? ──yes──▶ 剥离前缀后直接转发 │ Headers: │
│ x-api-key │ └──no──▶ 继续转换 │ Authorization │
│ Body: │ 5. URL 映射: /v1/messages → /v1/chat/completions │ Body: │
│ {model,system, │ 6. Header 转换: x-api-key → Authorization: Bearer │ {model,messages,│
│ messages,...} │ 7. Body 转换: Anthropic → Canonical → OpenAI │ tools,...} │
└──────────────────┘ └──────────────────┘
响应方向同理(含流式)。
同协议透传:source == target 时,仅剥离前缀后原样转发到上游。 未知接口透传:无法识别的路径,URL+Header 适配后 Body 原样转发。
3. 接口体系分层
3.1 分层策略
| 层级 | 接口 | 转换方式 |
|---|---|---|
| 核心层 | Chat Completions, Messages | Canonical Model 深度转换 |
| 扩展层 | /models, /embeddings, /files, /rerank, /count_tokens | 轻量级字段映射 |
| 透传层 | /batches, /fine-tuning, /audio/, /images/, /video/*, /moderations 等未知路径 | URL+Header 适配后 Body 原样转发 |
接口纳入原则:只有 ≥2 个协议提供相同功能的接口才纳入转换层(核心层或扩展层),其余走透传。
尽力转换策略:
- 双方都支持且可映射 → 完整转换(如 /models)
- 目标协议不支持该接口 → 透传到上游或返回空响应/错误(可配置)
- 未知接口 → 始终透传
3.2 OpenAI 接口体系
OpenAI API
├── /v1/chat/completions [核心层]
├── /v1/models [扩展层]
├── /v1/models/{model} [扩展层]
├── /v1/embeddings [扩展层]
├── /v1/files [扩展层]
├── /v1/files/{id}/content [扩展层]
├── /v1/rerank [扩展层]
├── /v1/batches [透传层]
├── /v1/fine_tuning/jobs [透传层]
├── /v1/audio/* [透传层]
├── /v1/images/* [透传层]
├── /v1/moderations [透传层]
├── /v1/realtime [透传层]
└── /v1/* [透传层]
3.3 Anthropic 接口体系
Anthropic API
├── /v1/messages [核心层]
├── /v1/messages/count_tokens [扩展层]
├── /v1/models [扩展层]
├── /v1/batches [扩展层]
└── /v1/* [透传层]
4. Canonical Model(规范模型)
协议无关的统一内部表示,Hub-and-Spoke 架构的枢纽。核心层使用完整 Canonical Model 深度转换;扩展层使用轻量 Canonical Models 做字段映射;透传层不使用。
设计原则:超集设计、类型化 discriminated union、前向兼容。Canonical Model 面向当前已适配协议(OpenAI、Anthropic)的公共语义进行抽象,字段随协议扩展演进(详见附录 D:字段晋升规范)。
4.1 CanonicalRequest
CanonicalRequest {
model: String
system: Union<None, String, Array<SystemBlock>>
SystemBlock {
text: String
}
messages: Array<CanonicalMessage>
tools: Option<Array<CanonicalTool>>
tool_choice: Option<ToolChoice>
parameters: RequestParameters
thinking: Option<ThinkingConfig>
stream: Boolean
user_id: Option<String>
output_format: Option<OutputFormat>
parallel_tool_use: Option<Boolean> // true = 允许并行, false = 禁止并行
}
4.2 RequestParameters
RequestParameters {
max_tokens: Option<Integer>
temperature: Option<Float>
top_p: Option<Float>
stop_sequences: Option<Array<String>>
}
4.3 CanonicalMessage / ContentBlock
CanonicalMessage {
role: Enum<system, user, assistant, tool>
content: Array<ContentBlock>
}
ContentBlock = Union<
TextBlock, { type: "text", text: String }
ToolUseBlock, { type: "tool_use", id: String, name: String, input: Object }
ToolResultBlock, { type: "tool_result", tool_use_id: String,
content: Union<String, Array<ContentBlock>>,
is_error: Option<Boolean> }
ThinkingBlock, { type: "thinking", thinking: String }
ImageBlock, { type: "image", source: ... } // 多模态预留
AudioBlock, { type: "audio", source: ... } // 多模态预留
VideoBlock, { type: "video", source: ... } // 多模态预留
FileBlock { type: "file", source: ... } // 多模态预留
>
4.4 CanonicalTool / ToolChoice
CanonicalTool {
name: String
description: Option<String>
input_schema: Object
}
ToolChoice = Union<
{type: "auto"},
{type: "none"},
{type: "any"},
{type: "tool", name: String}
>
4.5 ThinkingConfig
ThinkingConfig {
type: Enum<enabled, disabled> // Anthropic 启用时 budget_tokens 必填
budget_tokens: Option<Integer> // Anthropic: thinking tokens 不计入 max_tokens
effort: Option<Enum<low, medium, high, xhigh>> // OpenAI
}
4.6 OutputFormat
OutputFormat = Union<
{ type: "json_object" },
{ type: "json_schema", json_schema: { name: String, schema: Object, strict: Option<Boolean> } },
{ type: "text" }
>
4.7 CanonicalResponse
CanonicalResponse {
id: String
model: String
content: Array<ContentBlock>
stop_reason: Option<StopReason> // end_turn | max_tokens | tool_use | stop_sequence | content_filter
usage: CanonicalUsage
}
CanonicalUsage {
input_tokens: Integer
output_tokens: Integer
cache_read_tokens: Option<Integer>
cache_creation_tokens: Option<Integer>
reasoning_tokens: Option<Integer>
}
4.7 CanonicalStreamEvent
采用 Anthropic 风格的类型化事件模型(比 OpenAI delta 模型语义更明确,双向转换更容易):
CanonicalStreamEvent = Union<
MessageStartEvent, { type: "message_start", message: {id, model, usage} }
ContentBlockStartEvent, { type: "content_block_start", index, content_block }
ContentBlockDeltaEvent, { type: "content_block_delta", index, delta }
ContentBlockStopEvent, { type: "content_block_stop", index }
MessageDeltaEvent, { type: "message_delta", delta: {stop_reason}, usage }
MessageStopEvent, { type: "message_stop" }
ErrorEvent, { type: "error", error: {type, message} }
PingEvent { type: "ping" }
>
content_block: Union<
{type: "text", text: ""},
{type: "tool_use", id, name, input: {}},
{type: "thinking", thinking: ""},
{type: "image"} | {type: "audio"} | {type: "video"} // 预留
>
delta: Union<
{type: "text_delta", text},
{type: "input_json_delta", partial_json},
{type: "thinking_delta", thinking},
{type: "image_delta", data} | {type: "audio_delta", data} // 预留
>
4.8 扩展层 Canonical Models
// /models
CanonicalModelList { models: Array<CanonicalModel> }
CanonicalModel { id, name, created, owned_by }
// /embeddings
CanonicalEmbeddingRequest { model, input, encoding_format?, dimensions? }
CanonicalEmbeddingResponse { data: [{index, embedding}], model, usage }
// /count_tokens
CanonicalTokenCountRequest { model, messages, system?, tools? }
CanonicalTokenCountResponse { input_tokens }
// /files
CanonicalFileObject { id, object, filename?, bytes?, created_at?, purpose?, status? }
// /rerank
CanonicalRerankRequest { model, query, documents, top_n?, return_documents? }
CanonicalRerankResponse { results: [{index, relevance_score, document?}], model }
4.9 设计说明
system 为什么独立于 messages?
Anthropic 用顶层 system 字段,OpenAI 用 messages[role="system"]。独立 system 语义更清晰,Decoder 提取、Encoder 注入。
流式事件为什么用 Anthropic 风格? 类型化事件有显式的 start/stop 生命周期,语义明确;OpenAI delta 模型需要状态机推断语义,双向转换更复杂。
user_id 为什么从 metadata 中提升?
OpenAI 用顶层 user,Anthropic 用 metadata.user_id。两者语义相同,提升为独立字段更清晰。
output_format 为什么替代 response_format?
OpenAI 的 response_format 和 Anthropic 新模型的 output_format 表达相同意图(控制输出格式),统一为 output_format 避免命名偏向。
parallel_tool_use 为什么独立为顶层字段?
OpenAI 用 parallel_tool_calls(允许),Anthropic 用 disable_parallel_tool_use(禁止),语义反转但表达相同控制。统一为正向语义。
5. Protocol Adapter 接口
5.1 TargetProvider
每次转换调用传入的目标上游信息。Adapter 通过 Provider 获取认证和配置,不需要理解其他协议的 Header 格式。
TargetProvider {
base_url: String // 上游 API 地址
api_key: String // 上游认证密钥
model_name: String // 目标模型名(调用方完成映射后传入)
adapter_config: Map<String, Any> // 协议专属配置(如 anthropic-version)
}
adapter_config 由各 Adapter 自行定义所需的 key,引擎不感知含义,原样透传。示例:
- Anthropic:
{ "anthropic_version": "2023-06-01", "anthropic_beta": ["..."] } - OpenAI:
{ "organization": "org-xxx" }或空
5.2 ProtocolAdapter
每种协议的完整适配器,融入所有接口类型的处理能力。所有 Adapter 通过代码注册,不支持动态增减。
interface ProtocolAdapter {
protocolName(): String
protocolVersion(): String
supportsPassthrough(): Boolean // 同协议透传开关,默认 true
// HTTP 级别
mapUrl(nativePath: String, interfaceType: InterfaceType): Option<String>
buildHeaders(provider: TargetProvider): Map<String, String>
supportsInterface(interfaceType: InterfaceType): Boolean
// 核心层:Chat
decodeRequest(raw): CanonicalRequest
encodeRequest(canonical, provider): RawRequest
decodeResponse(raw): CanonicalResponse
encodeResponse(canonical): RawResponse
// 核心层:流式
createStreamDecoder(): StreamDecoder
createStreamEncoder(): StreamEncoder
// 错误编码
encodeError(error: ConversionError): RawResponse
// 扩展层
decodeModelsResponse(raw): CanonicalModelList
encodeModelsResponse(canonical): RawResponse
decodeEmbeddingRequest(raw): CanonicalEmbeddingRequest
encodeEmbeddingRequest(canonical, provider): RawRequest
decodeEmbeddingResponse(raw): CanonicalEmbeddingResponse
encodeEmbeddingResponse(canonical): RawResponse
decodeTokenCountRequest(raw): CanonicalTokenCountRequest
encodeTokenCountRequest(canonical, provider): RawRequest
decodeTokenCountResponse(raw): CanonicalTokenCountResponse
encodeTokenCountResponse(canonical): RawResponse
encodeFileListResponse(canonical): RawResponse
decodeRerankRequest(raw): CanonicalRerankRequest
encodeRerankRequest(canonical, provider): RawRequest
decodeRerankResponse(raw): CanonicalRerankResponse
encodeRerankResponse(canonical): RawResponse
}
buildHeaders vs 原 mapHeaders:Adapter 只需从 provider 中提取自己协议需要的认证和配置信息,构建自己的 Header 格式。不再需要理解其他协议的 Header。
5.3 InterfaceType
InterfaceType = Enum<
CHAT, MODELS, MODEL_INFO, EMBEDDINGS, FILES, RERANK,
COUNT_TOKENS, BATCHES, FINE_TUNING,
AUDIO, IMAGES, REALTIME, UNKNOWN
>
注:MODERATIONS 移除,仅 OpenAI 单协议支持,走透传层。
5.4 StreamDecoder / StreamEncoder
interface StreamDecoder {
processChunk(rawChunk): Array<CanonicalStreamEvent>
flush(): Array<CanonicalStreamEvent>
}
interface StreamEncoder {
encodeEvent(event): Array<RawSSEChunk>
flush(): Array<RawSSEChunk>
}
5.5 AdapterRegistry
interface AdapterRegistry {
register(adapter: ProtocolAdapter): void
get(protocolName: String): ProtocolAdapter
listProtocols(): Array<String>
}
协议识别规则:URL 第一个路径段即协议标识。
/openai/v1/... → "openai"
/anthropic/v1/... → "anthropic"
/gemini/v1beta/... → "gemini" (未来)
未知前缀 → null, 上层框架自行处理
6. Conversion Engine(转换引擎)
6.1 ConversionEngine
class ConversionEngine {
registry: AdapterRegistry
middlewareChain: MiddlewareChain
registerAdapter(adapter): void
use(middleware): void
isPassthrough(source, target): Boolean {
return source == target && registry.get(source).supportsPassthrough()
}
// 非流式请求转换
convertHttpRequest(request, sourceProtocol, targetProtocol, provider): HttpRequest {
nativePath = stripProtocolPrefix(request.url)
interfaceType = detectInterfaceType(nativePath)
if isPassthrough(sourceProtocol, targetProtocol):
targetAdapter = registry.get(targetProtocol)
return {url: provider.base_url + nativePath, method: request.method,
headers: targetAdapter.buildHeaders(provider), body: request.body}
sourceAdapter = registry.get(sourceProtocol)
targetAdapter = registry.get(targetProtocol)
targetUrl = targetAdapter.mapUrl(nativePath, interfaceType) ?? nativePath
targetHeaders = targetAdapter.buildHeaders(provider)
targetBody = convertBody(interfaceType, sourceAdapter, targetAdapter, provider, request.body)
return {url: provider.base_url + targetUrl, method: request.method,
headers: targetHeaders, body: targetBody}
}
// 非流式响应转换
convertHttpResponse(response, sourceProtocol, targetProtocol, interfaceType): HttpResponse {
if isPassthrough(sourceProtocol, targetProtocol): return response
sourceAdapter = registry.get(sourceProtocol)
targetAdapter = registry.get(targetProtocol)
targetBody = convertResponseBody(interfaceType, sourceAdapter, targetAdapter, response.body)
return {status: response.status, headers: response.headers, body: targetBody}
}
// 流式转换
createStreamConverter(sourceProtocol, targetProtocol, provider): StreamConverter {
if isPassthrough(sourceProtocol, targetProtocol):
targetAdapter = registry.get(targetProtocol)
return new PassthroughStreamConverter(targetAdapter.buildHeaders(provider))
source = registry.get(sourceProtocol)
target = registry.get(targetProtocol)
return new CanonicalStreamConverter(
source.createStreamDecoder(), target.createStreamEncoder(), middlewareChain)
}
}
6.2 Body 转换分发
function convertBody(interfaceType, sourceAdapter, targetAdapter, provider, body):
switch interfaceType:
case CHAT:
canonical = sourceAdapter.decodeRequest(body)
canonical = middlewareChain.apply(canonical)
return targetAdapter.encodeRequest(canonical, provider)
case MODELS:
return body // GET 请求,无 body
case EMBEDDINGS:
if !sourceAdapter.supportsInterface(EMBEDDINGS)
|| !targetAdapter.supportsInterface(EMBEDDINGS):
return body // 尽力转换:不支持则透传
return targetAdapter.encodeEmbeddingRequest(
sourceAdapter.decodeEmbeddingRequest(body), provider)
case RERANK:
// 同 EMBEDDINGS 模式
case COUNT_TOKENS:
// 同 EMBEDDINGS 模式
default:
return body // 透传层:原样转发
function convertResponseBody(interfaceType, sourceAdapter, targetAdapter, body):
// 结构与 convertBody 对称,CHAT 走 Canonical 深度转换,扩展层走轻量映射,默认透传
6.3 StreamConverter
interface StreamConverter {
processChunk(rawChunk): Array<RawSSEChunk>
flush(): Array<RawSSEChunk>
}
class PassthroughStreamConverter implements StreamConverter {
headers: Map<String, String>
constructor(headers) { this.headers = headers }
processChunk(rawChunk): Array<RawSSEChunk> { return [rawChunk] }
flush(): Array<RawSSEChunk> { return [] }
}
class CanonicalStreamConverter implements StreamConverter {
decoder: StreamDecoder
encoder: StreamEncoder
middleware: MiddlewareChain
processChunk(rawChunk):
events = decoder.processChunk(rawChunk).map(e => middleware.applyStreamEvent(e))
return events.flatMap(e => encoder.encodeEvent(e))
flush():
return decoder.flush().flatMap(e => encoder.encodeEvent(e)) + encoder.flush()
}
6.4 Middleware
引擎内部的拦截钩子,在 decode → encode 之间对 Canonical 进行变换。
interface ConversionMiddleware {
intercept(canonical, sourceProtocol, targetProtocol, context): canonical | error
interceptStreamEvent?(event, sourceProtocol, targetProtocol, context): event | error
}
ConversionContext { conversionId, interfaceType, timestamp, metadata }
intercept返回修改后的 canonical,或返回 ConversionError 以中断转换interceptStreamEvent同理,返回错误可中断流式转换- 多个 Middleware 按注册顺序链式执行,任一中断则后续不再执行
6.5 使用示例
engine = new ConversionEngine()
engine.registerAdapter(new OpenAIAdapter())
engine.registerAdapter(new AnthropicAdapter())
// 场景1: OpenAI→Anthropic Chat
// 入站: /openai/v1/chat/completions
provider = TargetProvider {
base_url: "https://api.anthropic.com",
api_key: "sk-ant-xxx",
model_name: "claude-sonnet-4-20250514",
adapter_config: { "anthropic_version": "2023-06-01" }
}
out = engine.convertHttpRequest(inRequest, "openai", "anthropic", provider)
// 出站 /v1/messages, headers: x-api-key + anthropic-version
// 场景2: /models 跨协议
// 入站: /anthropic/v1/models
provider = TargetProvider {
base_url: "https://api.openai.com",
api_key: "sk-xxx", model_name: "", adapter_config: {}
}
out = engine.convertHttpRequest(inRequest, "anthropic", "openai", provider)
// source=anthropic, 出站 /v1/models + Authorization: Bearer
// 场景3: 同协议透传
// 入站: /openai/v1/chat/completions
provider = TargetProvider {
base_url: "https://api.openai.com",
api_key: "sk-xxx", model_name: "", adapter_config: {}
}
out = engine.convertHttpRequest(inRequest, "openai", "openai", provider)
// source=openai == target=openai → 剥离前缀, 用 provider 重建 headers 后原样转发
// 场景4: 流式转换
converter = engine.createStreamConverter("anthropic", "openai", provider)
for chunk in upstreamSSE {
for out in converter.processChunk(chunk) { sendToClient(out) }
}
converter.flush()
7. 流式转换架构
7.1 转换管道
上游 SSE 流
│
├── 同协议: PassthroughStreamConverter(用 provider 重建 Headers 后逐块转发)
│
└── 跨协议: CanonicalStreamConverter
StreamDecoder StreamEncoder
┌───────────┐ ┌───────────┐
│ SSE Parser│ │SSE Writer │
└─────┬─────┘ └─────▲─────┘
│ │
┌─────▼─────┐ CanonicalEvent[] ┌─────┴─────┐
│ Event │──────────────────────▶│ Event │
│ Translator │ ┌──────────┐ │ Translator │
│ (状态机) │ │Middleware│ │ │
└───────────┘ └──────────┘ └───────────┘
7.2 StreamDecoder 状态
StreamDecoderState {
messageStarted: Boolean
openBlocks: Set<Integer>
currentBlockType: Map<Integer, String>
currentBlockId: Map<Integer, String>
toolCallIdMap: Map<Integer, String> // OpenAI 特有
toolCallNameMap: Map<Integer, String> // OpenAI 特有
toolCallArguments: Map<Integer, StringBuffer> // OpenAI 特有
utf8Remainder: Option<ByteArray> // UTF-8 跨 chunk 安全
accumulatedUsage: Option<CanonicalUsage>
}
7.3 事件映射
Anthropic SSE ↔ Canonical:几乎 1:1 映射,事件类型完全对应。
OpenAI SSE → Canonical(需要状态机):
| OpenAI chunk | Canonical 事件 |
|---|---|
| 首个 chunk (id/model) | MessageStartEvent |
| delta.content 首次 | ContentBlockStart(text) + ContentBlockDelta(text_delta) |
| delta.content 后续 | ContentBlockDelta(text_delta) |
| delta.tool_calls[i] 首次 | ContentBlockStart(tool_use) |
| delta.tool_calls[i].arguments | ContentBlockDelta(input_json_delta) |
| delta.reasoning_content | ContentBlockStart(thinking) + ContentBlockDelta(thinking_delta) |
| finish_reason | ContentBlockStop × N + MessageDeltaEvent + MessageStopEvent |
| [DONE] | flush() |
Canonical → OpenAI SSE(Encoder 端):
| Canonical 事件 | OpenAI chunk |
|---|---|
| MessageStartEvent | 首个 chunk (id, model, delta:{}) |
| ContentBlockStart(text) | 缓冲,不输出 |
| ContentBlockDelta(text) | {choices:[{delta:{content:"..."}}]}(首次合并 block_start) |
| ContentBlockStart(tool) | 缓冲 |
| ContentBlockDelta(input_json) | {choices:[{delta:{tool_calls:[...]}}]}(首次含 id/name) |
| ContentBlockStop | 不输出 |
| MessageDeltaEvent | {choices:[{finish_reason:"..."}]} |
| MessageStopEvent | [DONE] |
| PingEvent | 丢弃 |
8. OpenAI 协议适配器
8.1 URL 路径映射
OpenAI.mapUrl(nativePath, interfaceType):
switch interfaceType:
case CHAT: return "/v1/chat/completions"
case MODELS: return "/v1/models"
case MODEL_INFO: return "/v1/models/{modelId}"
case EMBEDDINGS: return "/v1/embeddings"
case FILES: return nativePath
case RERANK: return "/v1/rerank"
default: return nativePath
8.2 请求头构建
OpenAI.buildHeaders(provider):
result = {}
result["Authorization"] = "Bearer " + provider.api_key
if provider.adapter_config["organization"]:
result["OpenAI-Organization"] = provider.adapter_config["organization"]
result["Content-Type"] = "application/json"
return result
8.3 接口能力声明
OpenAI.supportsInterface(type):
CHAT...IMAGES: return true // 全部支持
COUNT_TOKENS: return false // OpenAI 无此接口
default: return false
8.4 Chat 请求 Decoder/Encoder
Decoder: decodeRequest → CanonicalRequest
decodeSystemPrompt(messages):
systemMsgs = messages.filter(m => m.role == "system")
remaining = messages.filter(m => m.role != "system")
if systemMsgs.length == 0: return {system: None, messages: remaining}
return {system: systemMsgs.map(m => extractText(m.content)).join("\n\n"), messages: remaining}
decodeMessage(msg):
switch msg.role:
case "user":
return {role: "user", content: decodeUserContent(msg.content)}
case "assistant":
blocks = []
if msg.content: blocks.append({type: "text", text: extractText(msg.content)})
if msg.refusal: blocks.append({type: "text", text: msg.refusal})
if msg.tool_calls:
for tc in msg.tool_calls:
blocks.append({type: "tool_use", id: tc.id, name: tc.function.name,
input: JSON.parse(tc.function.arguments)})
return {role: "assistant", content: blocks}
case "tool":
return {role: "tool", content: [{
type: "tool_result", tool_use_id: msg.tool_call_id,
content: msg.content, is_error: false}]}
Encoder: encodeRequest ← CanonicalRequest
encodeRequest(canonical, provider):
result = {
model: provider.model_name,
messages: encodeSystemPrompt(canonical) + canonical.messages.flatMap(encodeMessage),
max_tokens: canonical.parameters.max_tokens,
temperature: canonical.parameters.temperature,
top_p: canonical.parameters.top_p,
stream: canonical.stream
}
if canonical.parameters.stop_sequences:
result.stop = canonical.parameters.stop_sequences
if canonical.user_id:
result.user = canonical.user_id
if canonical.output_format:
result.response_format = encodeOutputFormat(canonical.output_format)
if canonical.parallel_tool_use != null:
result.parallel_tool_calls = canonical.parallel_tool_use
if canonical.tools:
result.tools = canonical.tools.map(t => ({
type: "function", function: {name: t.name, description: t.description, parameters: t.input_schema}}))
if canonical.tool_choice:
result.tool_choice = encodeToolChoice(canonical.tool_choice)
if canonical.thinking && canonical.thinking.type == "enabled":
result.reasoning_effort = canonical.thinking.effort ?? "medium"
return result
encodeOutputFormat(format):
switch format.type:
"json_object" → {type: "json_object"}
"json_schema" → {type: "json_schema", json_schema: format.json_schema}
"text" → null
encodeToolChoice(choice):
switch choice.type:
"auto" → "auto"
"none" → "none"
"any" → "required"
"tool" → {type: "function", function: {name: choice.name}}
8.5 Chat 响应 Decoder/Encoder
// Decoder
mapStopReason(reason):
"stop" → "end_turn" | "length" → "max_tokens" | "tool_calls" → "tool_use" | "content_filter" → "content_filter" | _ → "end_turn"
decodeResponse(openaiResp):
choice = openaiResp.choices[0]
blocks = []
if choice.message.content: blocks.append({type: "text", text: choice.message.content})
if choice.message.refusal: blocks.append({type: "text", text: choice.message.refusal})
if choice.message.reasoning_content:
blocks.append({type: "thinking", thinking: choice.message.reasoning_content})
if choice.message.tool_calls:
for tc: blocks.append({type: "tool_use", id: tc.id, name: tc.function.name,
input: JSON.parse(tc.function.arguments)})
return CanonicalResponse {id, model, content: blocks, stop_reason: mapStopReason(choice.finish_reason), usage: decodeUsage(openaiResp.usage)}
// Encoder
mapCanonicalToFinishReason(reason):
"end_turn" → "stop" | "max_tokens" → "length" | "tool_use" → "tool_calls" | "content_filter" → "content_filter" | _ → "stop"
encodeResponse(canonical):
textParts = canonical.content.filter(b => b.type == "text")
toolUses = canonical.content.filter(b => b.type == "tool_use")
message = {}
if textParts.length > 0: message.content = textParts.map(b => b.text).join("")
elif toolUses.length > 0: message.content = null
else: message.content = ""
if toolUses.length > 0:
message.tool_calls = toolUses.map(tu => ({
id: tu.id, type: "function",
function: {name: tu.name, arguments: JSON.stringify(tu.input)}}))
return {id: canonical.id, object: "chat.completion", model: canonical.model,
choices: [{index: 0, message, finish_reason: mapCanonicalToFinishReason(canonical.stop_reason)}],
usage: encodeUsage(canonical.usage)}
8.6 /models 响应编解码
// Decoder: OpenAI → Canonical
decodeModelsResponse(openaiResp):
return CanonicalModelList {
models: openaiResp.data.map(m => CanonicalModel {
id: m.id, name: m.id, created: m.created, owned_by: m.owned_by })}
// Encoder: Canonical → OpenAI
encodeModelsResponse(canonical):
return {object: "list",
data: canonical.models.map(m => ({id: m.id, object: "model",
created: m.created ?? 0, owned_by: m.owned_by ?? "unknown"}))}
跨协议示例(入站 /openai/v1/models,目标 Anthropic):
入站: GET /openai/v1/models, Authorization: Bearer sk-xxx
→ source=openai, target=anthropic
→ URL: /v1/models, Headers: x-api-key: sk-xxx, anthropic-version: ...
Anthropic 上游响应: {data: [{id: "claude-sonnet-4", display_name: "Claude Sonnet 4", ...}], has_more: false}
→ Anthropic.decodeModelsResponse → CanonicalModelList
→ OpenAI.encodeModelsResponse
返回客户端: {object: "list", data: [{id: "claude-sonnet-4", object: "model", created: 0, owned_by: "anthropic"}]}
9. Anthropic 协议适配器
9.1 URL 路径映射
Anthropic.mapUrl(nativePath, interfaceType):
switch interfaceType:
case CHAT: return "/v1/messages"
case MODELS: return "/v1/models"
case MODEL_INFO: return "/v1/models/{modelId}"
case COUNT_TOKENS: return "/v1/messages/count_tokens"
case EMBEDDINGS: return null // 不支持
case RERANK: return null
case FILES: return null
default: return nativePath
9.2 请求头构建
Anthropic.buildHeaders(provider):
result = {}
result["x-api-key"] = provider.api_key
result["anthropic-version"] = provider.adapter_config["anthropic_version"] ?? "2023-06-01"
if provider.adapter_config["anthropic_beta"]:
result["anthropic-beta"] = provider.adapter_config["anthropic_beta"].join(",")
result["Content-Type"] = "application/json"
return result
9.3 接口能力声明
Anthropic.supportsInterface(type):
CHAT: return true
MODELS: return true
COUNT_TOKENS: return true
BATCHES: return true
default: return false
9.4 Chat 请求 Decoder/Encoder
Decoder
decodeSystem(system):
if system is None: return None
if system is String: return system
return system.map(s => SystemBlock {text: s.text})
decodeMessage(msg):
switch msg.role:
case "user":
blocks = decodeContentBlocks(msg.content)
toolResults = blocks.filter(b => b.type == "tool_result")
others = blocks.filter(b => b.type != "tool_result")
if toolResults.length > 0:
return [
...(others.length > 0 ? [{role: "user", content: others}] : []),
{role: "tool", content: toolResults}]
return [{role: "user", content: blocks}]
case "assistant":
return [{role: "assistant", content: decodeContentBlocks(msg.content)}]
decodeContentBlocks(content):
if content is String: return [{type: "text", text: content}]
return content.map(block => {
switch block.type:
"text" → TextBlock{text: block.text}
"tool_use" → ToolUseBlock{id: block.id, name: block.name, input: block.input}
"tool_result" → ToolResultBlock{tool_use_id: block.tool_use_id, ...}
"thinking" → ThinkingBlock{thinking: block.thinking}
"redacted_thinking" → 丢弃 }) // 仅 Anthropic 使用,不在中间层保留
// 额外字段提取
decodeExtras(raw):
user_id = raw.metadata?.user_id
output_format = null // Anthropic 旧模型无 output_format,新模型通过 output_config 提取
parallel_tool_use = raw.disable_parallel_tool_use == true ? false : null
thinking = raw.thinking ? ThinkingConfig {
type: raw.thinking.type, // "enabled" | "disabled"
budget_tokens: raw.thinking.budget_tokens,
effort: null } : null
Encoder
encodeRequest(canonical, provider):
result = {
model: provider.model_name,
messages: encodeMessages(canonical),
max_tokens: canonical.parameters.max_tokens,
temperature: canonical.parameters.temperature,
top_p: canonical.parameters.top_p,
stream: canonical.stream
}
if canonical.system:
result.system = encodeSystem(canonical.system)
if canonical.parameters.stop_sequences:
result.stop_sequences = canonical.parameters.stop_sequences
if canonical.user_id:
result.metadata = {user_id: canonical.user_id}
if canonical.output_format:
result.output_format = encodeOutputFormat(canonical.output_format)
if canonical.parallel_tool_use == false:
result.disable_parallel_tool_use = true
if canonical.tools:
result.tools = canonical.tools.map(t => ({
name: t.name, description: t.description, input_schema: t.input_schema}))
if canonical.tool_choice:
result.tool_choice = encodeToolChoice(canonical.tool_choice)
if canonical.thinking:
result.thinking = {type: canonical.thinking.type}
if canonical.thinking.budget_tokens:
result.thinking.budget_tokens = canonical.thinking.budget_tokens
return result
encodeSystem(system):
if system is String: return system
return system.map(s => ({text: s.text}))
encodeToolChoice(choice):
switch choice.type:
"auto" → {type: "auto"}
"none" → {type: "none"}
"any" → {type: "any"}
"tool" → {type: "tool", name: choice.name}
9.5 Chat 响应 Decoder/Encoder
// Decoder
decodeResponse(anthropicResp):
blocks = []
for block in anthropicResp.content:
switch block.type:
"text" → blocks.append({type: "text", text: block.text})
"tool_use" → blocks.append({type: "tool_use", id: block.id, name: block.name, input: block.input})
"thinking" → blocks.append({type: "thinking", thinking: block.thinking})
"redacted_thinking" → 丢弃 // 仅 Anthropic 使用,不在中间层保留
return CanonicalResponse {id, model, content: blocks, stop_reason: anthropicResp.stop_reason,
usage: CanonicalUsage {input_tokens, output_tokens,
cache_read_tokens: anthropicResp.usage.cache_read_input_tokens,
cache_creation_tokens: anthropicResp.usage.cache_creation_input_tokens}}
// Encoder
encodeResponse(canonical):
blocks = canonical.content.map(block => {
switch block.type:
"text" → {type: "text", text: block.text}
"tool_use" → {type: "tool_use", id: block.id, name: block.name, input: block.input}
"thinking" → {type: "thinking", thinking: block.thinking}})
return {id: canonical.id, type: "message", role: "assistant", model: canonical.model,
content: blocks,
stop_reason: canonical.stop_reason == "content_filter" ? "end_turn" : canonical.stop_reason,
stop_sequence: None,
usage: {input_tokens: canonical.usage.input_tokens, output_tokens: canonical.usage.output_tokens,
cache_read_input_tokens: canonical.usage.cache_read_tokens,
cache_creation_input_tokens: canonical.usage.cache_creation_tokens}}
// 错误编码
encodeError(error):
return {type: "error", error: {type: error.code, message: error.message}}
9.6 /models 响应编解码
// Decoder: Anthropic → Canonical
decodeModelsResponse(anthropicResp):
return CanonicalModelList {
models: anthropicResp.data.map(m => CanonicalModel {
id: m.id, name: m.display_name ?? m.id, created: m.created_at,
owned_by: "anthropic"})}
// Encoder: Canonical → Anthropic
encodeModelsResponse(canonical):
return {data: canonical.models.map(m => ({
id: m.id,
display_name: m.name ?? m.id,
created_at: m.created ?? 0, type: "model"})),
has_more: false,
first_id: canonical.models[0]?.id, last_id: canonical.models.last?.id}
9.7 /count_tokens 编解码
// Decoder
decodeTokenCountRequest(raw):
return CanonicalTokenCountRequest {
model: raw.model, messages: decodeMessages(raw.messages),
system: decodeSystem(raw.system), tools: decodeTools(raw.tools)}
// Encoder
encodeTokenCountRequest(canonical):
return {model: canonical.model, messages: encodeMessages(canonical),
system: encodeSystem(canonical), tools: encodeTools(canonical.tools)}
decodeTokenCountResponse(raw):
return CanonicalTokenCountResponse {input_tokens: raw.input_tokens}
encodeTokenCountResponse(canonical):
return {input_tokens: canonical.input_tokens}
跨协议对接策略(入站 /anthropic/v1/messages/count_tokens,目标 OpenAI):
OpenAI 不提供 /count_tokens 接口。Anthropic Claude Code 启动时会调用此接口。
| 策略 | 实现 |
|---|---|
| 透传 | URL+Header 适配后发送到 OpenAI,由上游返回 404 |
| 模拟(推荐) | 解码请求,用估算器返回近似 token 数 |
| 精确估算 | 解码 messages,调用 OpenAI Chat Completions (max_tokens=1),从 usage.prompt_tokens 获取 |
策略选择通过配置或 Middleware 决定。
10. 字段映射参考
10.1 Chat 请求字段映射
| Canonical | OpenAI | Anthropic | 说明 |
|---|---|---|---|
system |
messages[0].role="system" |
system (顶层) |
位置差异 |
user_id |
user (顶层) |
metadata.user_id |
嵌套差异 |
output_format |
response_format |
output_format (新模型) |
字段名差异;旧 Anthropic 模型需降级 |
parallel_tool_use |
parallel_tool_calls (bool) |
disable_parallel_tool_use (bool, 反转) |
语义反转 |
ToolUseBlock |
tool_calls[{id,function}] |
content[{type:"tool_use"}] |
OpenAI 在 message 顶层 |
ToolResultBlock |
{role:"tool",tool_call_id} |
content[{type:"tool_result"}] in user |
结构性差异 |
tools[].input_schema |
tools[].function.parameters |
tools[].input_schema |
字段名不同 |
tool_choice: "any" |
"required" |
{type:"any"} |
语义映射 |
max_tokens |
max_tokens / max_completion_tokens |
max_tokens |
o-series 差异 |
stop_sequences |
stop (String or Array) |
stop_sequences (Array) |
OpenAI Decoder 规范化为 Array |
thinking.type |
— | thinking.type |
Anthropic 特有 |
thinking.effort |
reasoning_effort |
— | OpenAI 参数级 |
thinking.budget_tokens |
— | thinking.budget_tokens |
Anthropic token 级 |
10.2 Chat 响应字段映射
| Canonical | OpenAI | Anthropic |
|---|---|---|
stop_reason: "end_turn" |
finish_reason: "stop" |
stop_reason: "end_turn" |
stop_reason: "max_tokens" |
finish_reason: "length" |
stop_reason: "max_tokens" |
stop_reason: "tool_use" |
finish_reason: "tool_calls" |
stop_reason: "tool_use" |
ThinkingBlock |
reasoning_content (非流式) |
content[{type:"thinking"}] |
usage.input_tokens |
usage.prompt_tokens |
usage.input_tokens |
usage.output_tokens |
usage.completion_tokens |
usage.output_tokens |
usage.cache_read_tokens |
usage.prompt_tokens_details.cached_tokens |
usage.cache_read_input_tokens |
10.3 HTTP 头映射
Headers 由各 Adapter 的 buildHeaders(provider) 方法构建,从 provider.api_key 和 provider.adapter_config 获取信息,无需理解其他协议。
| 场景 | OpenAI | Anthropic |
|---|---|---|
| 认证 | Authorization: Bearer <key> |
x-api-key: <key> |
| 版本 | — | anthropic-version(从 adapter_config 读取) |
| Beta | — | anthropic-beta(从 adapter_config 读取) |
10.4 扩展层接口映射
/models
| 维度 | OpenAI | Anthropic |
|---|---|---|
| 响应格式 | {object:"list", data:[{id, object:"model", created, owned_by}]} |
{data:[{id, display_name, created_at, type}], has_more, first_id, last_id} |
| 映射 | id↔id, created↔created_at, owned_by→"anthropic", display_name→id |
/count_tokens
| 维度 | OpenAI | Anthropic |
|---|---|---|
| 存在性 | 不存在 | POST /v1/messages/count_tokens |
| 策略 | Anthropic→OpenAI: 模拟/估算 | 原生支持 |
/embeddings
| 维度 | OpenAI | Anthropic |
|---|---|---|
| 存在性 | POST /v1/embeddings |
不存在 |
| 策略 | 原生支持 | OpenAI→Anthropic: 返回不支持错误 |
11. 扩展点设计
11.1 新协议接入
- 实现 ProtocolAdapter(URL 映射 + Header 映射 + 各接口编解码)
- 注册到 AdapterRegistry
- 完成
11.2 多模态扩展
Canonical Model 已预留 ImageBlock / AudioBlock / VideoBlock / FileBlock。实现路径:
- 在各 ProtocolAdapter 中实现多模态块的编解码
- 在 StreamDecoder/StreamEncoder 中处理多模态增量数据
11.3 有状态特性扩展
interface StatefulMiddleware extends ConversionMiddleware {
stateStore: SessionStateStore
}
适用场景:Gemini thoughtSignature 跨轮次保留。
11.4 特性降级策略
| 源特性 | 目标协议 | 降级方式 |
|---|---|---|
output_format |
Anthropic(旧模型) | 注入合成工具实现 JSON 模式 |
thinking.effort |
Anthropic | 转为 thinking.budget_tokens |
thinking.budget_tokens |
OpenAI | 转为 reasoning_effort |
count_tokens |
OpenAI | 模拟/估算 |
/embeddings |
Anthropic | 返回不支持的错误 |
11.5 自定义接口支持
interface CustomInterfaceHandler {
interfaceType(): InterfaceType
matchUrl(url): Boolean
convertRequest(source, target, raw): raw
convertResponse(source, target, raw): raw
}
engine.registerCustomHandler(handler)
12. 错误处理
12.1 错误分类
ConversionError { code: ErrorCode, message, sourceProtocol?, targetProtocol?,
interfaceType?, details?, cause? }
ErrorCode = Enum<
INVALID_INPUT, // 请求格式不符合协议规范
MISSING_REQUIRED_FIELD, // 缺少必填字段
INCOMPATIBLE_FEATURE, // 特性在目标协议不可用
FIELD_MAPPING_FAILURE, // 字段映射逻辑错误
TOOL_CALL_PARSE_ERROR, // 工具调用参数解析失败
JSON_PARSE_ERROR, // JSON 解析失败
STREAM_STATE_ERROR, // 流式状态机异常
UTF8_DECODE_ERROR, // UTF-8 解码错误
PROTOCOL_CONSTRAINT_VIOLATION, // 违反协议约束
ENCODING_FAILURE, // 编码失败
INTERFACE_NOT_SUPPORTED // 目标协议不支持此接口
>
12.2 错误处理策略
ErrorHandler { mode: "strict" | "lenient" }
strict: 任何错误抛出异常
lenient: 尽力继续
INCOMPATIBLE_FEATURE → 降级继续
INTERFACE_NOT_SUPPORTED → 透传或返回空响应
TOOL_CALL_PARSE_ERROR → 保留原始内容继续
PROTOCOL_CONSTRAINT_VIOLATION → 自动修复
不支持接口的处理(INTERFACE_NOT_SUPPORTED):
| 策略 | 适用场景 | 实现 |
|---|---|---|
| 透传 | 上游可能有自己的实现 | URL+Header 适配后 Body 原样转发 |
| 返回空响应 | 不影响核心功能 | 返回空列表 {data: []} |
| 返回错误 | 客户端明确需要此功能 | 返回 501 或协议格式错误 |
具体策略通过配置或 Middleware 决定。
12.3 错误响应格式
转换失败时,错误响应用**客户端协议(source protocol)**的格式编码。由 sourceAdapter.encodeError(error) 完成:
- OpenAI 格式:
{error: {message, type, code, param}} - Anthropic 格式:
{type: "error", error: {type, message}}
Middleware 中断转换时同理,引擎调用 sourceAdapter.encodeError 将 ConversionError 编码为客户端可理解的格式。
13. 参考实现对比
13.1 关键差异
| 维度 | CC-Switch | LiteLLM | new-api | one-api | 本设计 |
|---|---|---|---|---|---|
| 定位 | 桌面代理 | 完整网关+SDK | 完整网关 | 完整网关 | HTTP 层 SDK |
| 架构 | 直接转换 | 策略+工厂 | Hub-and-Spoke (OpenAI) | Hub-and-Spoke (OpenAI) | Hub-and-Spoke (自定义) |
| 转换方向 | 双向(有限) | 单向→厂商 | 单向→OpenAI | 单向→OpenAI | 双向(任意协议对) |
| Canonical 模型 | 无 | 无 | OpenAI 格式 | OpenAI 格式 | 自定义超集,字段随协议扩展演进 |
| 厂商覆盖 | 2 | 100+ | 50+ | 56 渠道/19 API 类型 | 协议级(当前 2, 可扩展) |
| 管理接口 | /health, /status | 全接口 | 全接口 | 全接口 | /models, /embeddings, /count_tokens, /files, /rerank + 透传层 |
| 同协议透传 | Anthropic 直通 | 无 | 开关控制 | OpenAI 兼容渠道零拷贝 | 自动检测 |
| 协议前缀 | /claude/, /codex/, /gemini/ | 无 | 无 | 无 | /openai/, /anthropic/ |
| 流式处理 | 简单转发 | 简单解析 | Scanner+SSE | Scanner+SSE | StreamDecoder/Encoder 状态机 |
| 错误处理 | 基础 | 基础 | 统一格式归一化 | 多格式归一化 | strict/lenient 模式 + ErrorCode 枚举 |
| 扩展机制 | 硬编码 | 策略模式 | 实现 Adaptor 接口 | 实现 Adaptor 接口 | ProtocolAdapter + Middleware + 自定义接口 |
13.2 设计取舍
| 取舍 | 选择 | 理由 |
|---|---|---|
| HTTP 层 SDK vs 纯 chat 库 | HTTP 层 SDK | 编程工具需要管理接口 |
| 自定义 Canonical vs OpenAI | 自定义 | 避免厂商锁定 |
| 接口分层 vs 全部深度转换 | 分层 | 核心深度转换,扩展轻量映射,透传零转换 |
| 尽力转换 vs 严格转换 | 尽力转换 | 最大化覆盖面 |
| 透传 vs 总是 Canonical | 自动透传 + 可选禁用 | 同协议和未知接口的性能优化 |
13.3 关键经验
- CC-Switch: UTF-8 跨 chunk 截断处理必要;工具调用参数可能乱序到达;同协议直通是最高频路径
- LiteLLM: BaseConfig 抽象 + 编排器不变模式扩展性好;全接口覆盖对用户体验至关重要
- new-api: 多格式输入的 Hub-and-Spoke 已生产验证;/models 的协议感知转换价值最高
- one-api: 以 OpenAI 格式为 Canonical 的 Hub-and-Spoke 架构在 19 种 API 类型、56 个渠道的生产环境验证了可行性;大量厂商(DepSeek/Groq/Mistral/Moonshot 等)采用 OpenAI 兼容格式使得同协议透传成为最高频路径;适配器模式的 ChannelType→APIType→Adaptor 三层映射提供了厂商级精细控制但增加了维护复杂度
附录 A:完整转换流程示例
客户端发送:
POST /openai/v1/chat/completions HTTP/1.1 ← /openai/ 前缀标识源协议
Authorization: Bearer sk-xxx
Content-Type: application/json
{ "model": "gpt-4",
"messages": [
{"role": "system", "content": "You are a coding assistant"},
{"role": "user", "content": "Read the file main.py"},
{"role": "assistant", "content": null, "tool_calls": [{
"id": "call_abc123", "type": "function",
"function": {"name": "read_file", "arguments": "{\"path\": \"main.py\"}"}}]},
{"role": "tool", "tool_call_id": "call_abc123", "content": "print('hello')"},
{"role": "user", "content": "What does this code do?"}],
"tools": [{"type": "function", "function": {"name": "read_file", "parameters": {...}}}],
"max_tokens": 1024, "stream": true }
│
▼
engine.convertHttpRequest(request, "openai", "anthropic", provider)
provider = TargetProvider {
base_url: "https://api.anthropic.com",
api_key: "sk-ant-xxx",
model_name: "claude-sonnet-4-20250514",
adapter_config: { "anthropic_version": "2023-06-01" }
}
Step 0: 前缀 /openai/ → source=openai, 剥离后 /v1/chat/completions
Step 1: 接口识别 → CHAT
Step 2: URL 映射 → /v1/messages
Step 3: Header 构建 → x-api-key: sk-ant-xxx, anthropic-version: 2023-06-01
Step 4: Body 转换:
OpenAI.decodeRequest → CanonicalRequest {
model: "gpt-4",
system: "You are a coding assistant",
messages: [
{user, [{text, "Read the file main.py"}]},
{assistant, [{tool_use, id:"call_abc123", name:"read_file", input:{path:"main.py"}}]},
{tool, [{tool_result, tool_use_id:"call_abc123", content:"print('hello')"}]},
{user, [{text, "What does this code do?"}]}],
parameters: {max_tokens: 1024},
stream: true }
Anthropic.encodeRequest(canonical, provider) → {
"model": "claude-sonnet-4-20250514", ← 使用 provider.model_name
"system": "You are a coding assistant",
"messages": [
{"role": "user", "content": [{"type": "text", "text": "Read the file main.py"}]},
{"role": "assistant", "content": [{"type": "tool_use", "id": "call_abc123", ...}]},
{"role": "user", "content": [{"type": "tool_result", "tool_use_id": "call_abc123", ...}]},
{"role": "user", "content": [{"type": "text", "text": "What does this code do?"}]}],
"max_tokens": 1024, "stream": true }
发往 Anthropic 上游:
POST https://api.anthropic.com/v1/messages HTTP/1.1 ← provider.base_url + 目标路径
x-api-key: sk-ant-xxx
anthropic-version: 2023-06-01
附录 B:模块依赖
┌──────────────────────────────────────────────────┐
│ ConversionEngine │
│ 门面:HTTP 转换 / 协议路由 / 透传判断 / 流式转换 │
│ ┌────────────────────────────────────────────┐ │
│ │ Protocol Prefix Router │ │
│ │ URL 前缀 → source protocol │ │
│ └────────────────────────────────────────────┘ │
├──────────────────────────────────────────────────┤
│ TargetProvider │
│ base_url / api_key / model_name / adapter_config │
├──────────────────┬───────────────────────────────┤
│ AdapterRegistry │ MiddlewareChain │
├──────────────────┴───────────────────────────────┤
│ StreamConverter: Passthrough | Canonical │
├──────────────────────────────────────────────────┤
│ ProtocolAdapter: OpenAI | Anthropic | Future... │
│ · buildHeaders(provider) · URL 映射 │
│ · Chat/Models/Embeddings/Rerank/... 编解码 │
│ · encodeError · StreamDecoder / StreamEncoder │
├──────────────────────────────────────────────────┤
│ Canonical Model (Core + Extended) │
├──────────────────────────────────────────────────┤
│ Error Handling │
├──────────────────────────────────────────────────┤
│ Utility: UTF-8 Buffer / SSE Parser / Detector │
└──────────────────────────────────────────────────┘
附录 C:接口速查
// ─── 核心入口 ───
ConversionEngine
.registerAdapter(adapter)
.use(middleware)
.isPassthrough(source, target): Boolean
.convertHttpRequest(request, sourceProtocol, targetProtocol, provider): HttpRequest
.convertHttpResponse(response, sourceProtocol, targetProtocol, interfaceType): HttpResponse
.createStreamConverter(sourceProtocol, targetProtocol, provider): StreamConverter
// ─── 目标上游信息 ───
TargetProvider
.base_url: String
.api_key: String
.model_name: String
.adapter_config: Map<String, Any>
// ─── URL 路由 ───
// 入站: /{protocol}/{native_path}
// /openai/v1/chat/completions → source=openai
// /anthropic/v1/messages → source=anthropic
// 出站: provider.base_url + 目标协议原生路径(无前缀)
// ─── 协议适配器 ───
ProtocolAdapter
.protocolName() / .protocolVersion() / .supportsPassthrough()
.mapUrl(nativePath, type) / .buildHeaders(provider) / .supportsInterface(type)
.decodeRequest(raw) / .encodeRequest(canonical, provider)
.decodeResponse(raw) / .encodeResponse(canonical)
.createStreamDecoder() / .createStreamEncoder()
.encodeError(error): RawResponse
.decodeModelsResponse / .encodeModelsResponse
.decodeEmbeddingRequest / .encodeEmbeddingRequest(canonical, provider) / ...Response
.decodeTokenCountRequest / .encodeTokenCountRequest(canonical, provider) / ...Response
.decodeRerankRequest / .encodeRerankRequest(canonical, provider) / ...Response
// ─── 流式处理 ───
StreamConverter: .processChunk(raw) / .flush()
├─ PassthroughStreamConverter [raw] → [raw](用 provider 重建 Headers)
└─ CanonicalStreamConverter decode → middleware → encode
// ─── 接口类型 ───
InterfaceType = CHAT | MODELS | MODEL_INFO | EMBEDDINGS | FILES |
RERANK | COUNT_TOKENS | BATCHES |
FINE_TUNING | AUDIO | IMAGES | REALTIME | UNKNOWN
附录 D:字段晋升规范
原则
Canonical Model 是活的公共契约,不是固定不变的。其字段集反映的是当前已适配协议的公共语义,随协议扩展而演进。
字段分类
| 分类 | 判定条件 | 处理方式 |
|---|---|---|
| 公共字段 | ≥2 个协议表达相同含义 | 升级为 Canonical Model 的正式字段 |
| 协议特有字段 | 仅 1 个协议使用 | 不在中间层传播;同协议透传时自然保留;跨协议时丢弃 |
| 未知字段 | 新协议带来的新概念 | 先按特有字段处理;当第二个协议也出现时晋升 |
晋升流程
1. 发现:适配新协议时,识别出无法映射到现有 Canonical 字段的语义
2. 判定:该语义是否已被 ≥1 个现有协议所表达?
- 是 → 晋升为 Canonical 公共字段
- 否 → 暂不纳入,记录在适配清单中
3. 设计:确定字段名(协议中立)、类型、在 Canonical 中的位置
4. 更新:修改 Canonical Model 定义,所有现有 Adapter 的 Decoder/Encoder 同步更新
5. 文档:更新 §10 字段映射参考表
晋升示例
| 字段 | 原始位置 | 晋升原因 |
|---|---|---|
user_id |
OpenAI user / Anthropic metadata.user_id |
两协议均支持用户标识 |
output_format |
OpenAI response_format / Anthropic output_format |
两协议均支持控制输出格式 |
parallel_tool_use |
OpenAI parallel_tool_calls / Anthropic disable_parallel_tool_use |
两协议均支持并行工具调用控制 |
cache_control |
仅 Anthropic | 不晋升,仅 Anthropic 使用 |
reasoning_content |
仅 OpenAI | 不晋升(但 ThinkingBlock 已覆盖 thinking 语义) |
降级规则
当公共字段的目标协议不支持时:
- 有语义等价物 → 自动映射(如
parallel_tool_use→disable_parallel_tool_use) - 无等价物 → 丢弃,日志 warn(如
output_format→ Anthropic 旧模型丢弃) - 有替代方案 → 降级策略处理(如
output_format→ 注入合成工具)
附录 E:协议适配清单模板
适配新协议时,按以下清单逐项确认。所有项目确认后即可与引擎对接。
E.1 协议基本信息
| 项目 | 说明 |
|---|---|
| 协议名称 | 用于 URL 前缀和 Adapter 注册的唯一标识(如 "openai", "anthropic") |
| 协议版本 | 当前适配的 API 版本(如 "2023-06-01") |
| Base URL | API 服务地址 |
| 认证方式 | Header 名称和格式(如 Authorization: Bearer / x-api-key) |
E.2 接口识别
| 项目 | 说明 |
|---|---|
| URL 路径模式 | 列出所有接口的 URL 路径和对应的 InterfaceType |
| 接口能力矩阵 | 每种 InterfaceType 的支持状态(supportsInterface) |
| URL 映射表 | 每种 InterfaceType 的目标 URL 路径(mapUrl) |
E.3 请求头构建
| 项目 | 说明 |
|---|---|
| 认证头 | 如何从 provider.api_key 构建认证 Header |
| 必需 Header | 协议要求的固定 Header(如 Content-Type) |
| 可选 Header | 根据功能动态添加的 Header(如 anthropic-beta) |
| adapter_config 契约 | 定义本 Adapter 从 provider.adapter_config 读取的 key 列表和默认值 |
E.4 核心层 — Chat 请求编解码
Decoder(协议 → Canonical)
| 项目 | 说明 |
|---|---|
| 系统消息 | 如何提取为 canonical.system(顶层字段 / messages 中 / 不支持) |
| 消息角色 | 协议角色与 Canonical 角色(system/user/assistant/tool)的映射 |
| 内容块 | 每种内容类型(text/tool_use/tool_result/thinking)的解码规则 |
| 工具定义 | 如何映射为 canonical.tools(字段名差异:input_schema vs parameters) |
| 工具选择 | tool_choice 各变体的映射规则 |
| 参数映射 | max_tokens/temperature/top_p/stop_sequences 等参数的映射 |
| 新增公共字段 | user_id/output_format/parallel_tool_use/thinking 的提取规则 |
| 协议特有字段 | 仅本协议使用的字段列表,以及处理方式(忽略/记录) |
| 协议约束 | 消息顺序要求、角色交替要求、必填字段等 |
Encoder(Canonical → 协议)
| 项目 | 说明 |
|---|---|
| 模型名称 | 使用 provider.model_name 覆盖 canonical.model |
| 系统消息注入 | 如何将 canonical.system 编码为协议格式 |
| 消息编码 | 各角色的编码规则(特别注意 role 映射、content 结构差异) |
| 角色约束处理 | 是否需要 enforceAlternation?具体策略? |
| 工具编码 | tools/tool_choice 的编码规则 |
| 参数编码 | Canonical 参数到协议参数的映射 |
| 公共字段编码 | user_id/output_format/parallel_tool_use/thinking 的注入规则 |
| 降级处理 | 目标协议不支持的 Canonical 字段的降级策略 |
E.5 核心层 — Chat 响应编解码
| 项目 | 说明 |
|---|---|
| 响应结构 | 协议响应的顶层结构解析 |
| 内容块解码 | 各 content block 类型的解码规则 |
| 停止原因 | stop_reason / finish_reason 的映射表 |
| Token 用量 | usage 各字段的映射(注意命名差异:input_tokens vs prompt_tokens) |
| 推理内容 | reasoning_content / thinking block 的处理 |
| 协议特有内容 | 仅本协议返回的特有字段的处理 |
E.6 核心层 — 流式编解码
StreamDecoder(协议 SSE → Canonical 事件)
| 项目 | 说明 |
|---|---|
| SSE 格式 | 协议的 SSE 事件格式(named events vs delta chunks) |
| 事件映射表 | 每种协议 SSE 事件 → CanonicalStreamEvent 的映射规则 |
| 状态机设计 | 需要跟踪的状态(当前 block index、open blocks、tool call 映射等) |
| UTF-8 安全 | 是否需要处理跨 chunk 的 UTF-8 截断 |
| 特殊情况 | 工具调用参数乱序、无限空白检测、延迟字段等 |
StreamEncoder(Canonical 事件 → 协议 SSE)
| 项目 | 说明 |
|---|---|
| 事件映射表 | 每个 CanonicalStreamEvent → 协议 SSE chunk 的映射规则 |
| 缓冲策略 | 哪些事件需要缓冲、何时输出 |
| SSE 格式 | event type / data 字段的编码方式 |
| 结束标记 | 如何输出流结束信号(如 [DONE]) |
E.7 扩展层接口
对每种支持的扩展层接口(/models、/embeddings、/files、/rerank、/count_tokens),确认:
| 项目 | 说明 |
|---|---|
| 接口是否存在 | 该协议是否原生支持此接口 |
| URL 路径 | 接口的 URL 路径 |
| 请求格式 | 请求体的协议特有格式 → Canonical 格式的映射 |
| 响应格式 | 响应体的协议特有格式 ← Canonical 格式的映射 |
| 不支持时策略 | 透传 / 返回空响应 / 返回错误 |
E.8 错误编码
| 项目 | 说明 |
|---|---|
| 错误响应格式 | 协议的错误响应 JSON 结构 |
| encodeError | ConversionError → 协议错误格式的编码规则 |
| HTTP 状态码 | 协议常用的错误状态码映射 |
E.9 自检清单
- 所有 InterfaceType 的
supportsInterface返回值已确定 - 所有 InterfaceType 的
mapUrl映射已确定 - Chat 请求的 Decoder 和 Encoder 已实现
- Chat 响应的 Decoder 和 Encoder 已实现
- 流式 StreamDecoder 和 StreamEncoder 已实现
buildHeaders(provider)已实现encodeError已实现- 扩展层接口的编解码已实现(支持的接口)
- 角色映射和消息顺序约束已处理
- 工具调用(tool_calls / tool_use / tool_result)的编解码已处理
- stop_reason / finish_reason 映射表已确认
- usage 字段映射已确认
- 协议特有字段已识别并确定处理方式(忽略/降级)
- adapter_config 契约已文档化