OpenAI Protocol Adaptation Checklist
Compiled from the Appendix D template in conversion_design.md; covers every integration detail of the OpenAI API.
1. Protocol Basics
| Item | Description |
|---|---|
| Protocol name | "openai" |
| Protocol version | No fixed version header; the API evolves continuously |
| Base URL | https://api.openai.com |
| Authentication | Authorization: Bearer <api_key> |
2. Interface Identification
2.1 URL Path Patterns
| URL path | InterfaceType |
|---|---|
| /v1/chat/completions | CHAT |
| /v1/models | MODELS |
| /v1/models/{model} | MODEL_INFO |
| /v1/embeddings | EMBEDDINGS |
| /v1/rerank | RERANK |
2.2 Interface Capability Matrix
OpenAI.supportsInterface(type):
CHAT: return true
MODELS: return true
MODEL_INFO: return true
EMBEDDINGS: return true
RERANK: return true
AUDIO: return true
IMAGES: return true
default: return false
2.3 URL Mapping Table
OpenAI.buildUrl(nativePath, interfaceType):
switch interfaceType:
case CHAT: return "/v1/chat/completions"
case MODELS: return "/v1/models"
case MODEL_INFO: return "/v1/models/{modelId}"
case EMBEDDINGS: return "/v1/embeddings"
case RERANK: return "/v1/rerank"
default: return nativePath
3. Request Header Construction
3.1 buildHeaders
OpenAI.buildHeaders(provider):
result = {}
result["Authorization"] = "Bearer " + provider.api_key
if provider.adapter_config["organization"]:
result["OpenAI-Organization"] = provider.adapter_config["organization"]
result["Content-Type"] = "application/json"
return result
3.2 adapter_config Contract
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| organization | String | No | — | OpenAI organization ID, mapped to the OpenAI-Organization header |
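As an illustration, buildHeaders can be sketched in Python. The dict-shaped provider record is an assumption based on the pseudocode above; only the header names come from the spec.

```python
def build_headers(provider: dict) -> dict:
    """Build the outgoing OpenAI request headers for a provider record."""
    headers = {
        "Authorization": f"Bearer {provider['api_key']}",
        "Content-Type": "application/json",
    }
    # Optional organization header sourced from adapter_config.
    org = provider.get("adapter_config", {}).get("organization")
    if org:
        headers["OpenAI-Organization"] = org
    return headers
```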
4. Core Layer — Chat Request Codec
4.1 Decoder (OpenAI → Canonical)
System messages
OpenAI supports two system-instruction roles: system and developer (developer is recommended for o1 and newer models). Both are extracted into canonical.system.
decodeSystemPrompt(messages):
systemMsgs = messages.filter(m => m.role == "system" || m.role == "developer")
remaining = messages.filter(m => m.role != "system" && m.role != "developer")
if systemMsgs.length == 0: return {system: None, messages: remaining}
return {system: systemMsgs.map(m => extractText(m.content)).join("\n\n"), messages: remaining}
Messages with role="system" and role="developer" are extracted from the messages array and merged into canonical.system (String); the remaining messages become canonical.messages.
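A runnable sketch of decodeSystemPrompt, assuming string message content (list content would go through extractText, which is not shown here):

```python
def decode_system_prompt(messages: list[dict]) -> dict:
    """Extract system/developer messages into a single system string."""
    system_msgs = [m for m in messages if m["role"] in ("system", "developer")]
    remaining = [m for m in messages if m["role"] not in ("system", "developer")]
    if not system_msgs:
        return {"system": None, "messages": remaining}
    # Multiple system-style messages are joined with a blank line.
    return {"system": "\n\n".join(m["content"] for m in system_msgs),
            "messages": remaining}
```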
Message role mapping
| OpenAI role | Canonical role | Notes |
|---|---|---|
| system | extracted to canonical.system | not placed in the messages array |
| developer | extracted to canonical.system | not placed in the messages array; recommended for o1+ |
| user | user | direct mapping |
| assistant | assistant | tool_calls structural differences must be handled |
| tool | tool | tool_call_id → tool_use_id |
| function | tool | deprecated; converted to the tool role (see deprecated-field handling below) |
Content block decoding
decodeUserContent(content):
if content is String: return [{type: "text", text: content}]
return content.map(part => {
switch part.type:
"text" → {type: "text", text: part.text}
"image_url" → {type: "image", source: {url: part.image_url.url, detail: part.image_url.detail}}
"input_audio" → {type: "audio", source: {data: part.input_audio.data, format: part.input_audio.format}}
"file" → {type: "file", source: {file_data: part.file.file_data, file_id: part.file.file_id, filename: part.file.filename}}
})
decodeMessage(msg):
switch msg.role:
case "user":
return {role: "user", content: decodeUserContent(msg.content)}
case "assistant":
blocks = []
if msg.content:
if msg.content is String:
blocks.append({type: "text", text: msg.content})
else:
blocks.append(...msg.content.filter(p => p.type == "text").map(p => ({type: "text", text: p.text})))
for refusal in msg.content.filter(p => p.type == "refusal"):
blocks.append({type: "text", text: refusal.refusal})
if msg.refusal: blocks.append({type: "text", text: msg.refusal})
if msg.tool_calls:
for tc in msg.tool_calls:
switch tc.type:
"function" → blocks.append({type: "tool_use", id: tc.id, name: tc.function.name,
input: JSON.parse(tc.function.arguments)})
"custom" → blocks.append({type: "tool_use", id: tc.id, name: tc.custom.name,
input: tc.custom.input})
if msg.function_call: // deprecated; handled for compatibility
blocks.append({type: "tool_use", id: generateId(), name: msg.function_call.name,
input: JSON.parse(msg.function_call.arguments)})
return {role: "assistant", content: blocks}
case "tool":
return {role: "tool", content: [{
type: "tool_result", tool_use_id: msg.tool_call_id,
content: msg.content is String ? msg.content : extractText(msg.content),
is_error: false}]}
case "function": // deprecated; handled for compatibility
return {role: "tool", content: [{
type: "tool_result", tool_use_id: msg.name,
content: msg.content, is_error: false}]}
Key differences:
- OpenAI puts tool_calls at the top level of the message; Canonical places them in the content array as ToolUseBlock
- OpenAI identifies tool results with tool_call_id; Canonical uses tool_use_id
- refusal is encoded as a text block
- The developer role is semantically identical to system; both are extracted to canonical.system
- For custom tools (type: "custom") the input is a plain string; for function tools the arguments is a JSON string
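The tool_calls branch of the decoder can be sketched as follows; the dict shapes mirror the OpenAI wire format, and the block layout follows the pseudocode above:

```python
import json

def decode_tool_calls(tool_calls: list[dict]) -> list[dict]:
    """Turn OpenAI tool_calls entries into canonical tool_use blocks."""
    blocks = []
    for tc in tool_calls:
        if tc["type"] == "function":
            # Function arguments arrive as a JSON string and are parsed.
            blocks.append({"type": "tool_use", "id": tc["id"],
                           "name": tc["function"]["name"],
                           "input": json.loads(tc["function"]["arguments"])})
        elif tc["type"] == "custom":
            # Custom-tool input is a plain string, kept as-is.
            blocks.append({"type": "tool_use", "id": tc["id"],
                           "name": tc["custom"]["name"],
                           "input": tc["custom"]["input"]})
    return blocks
```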
Tool definitions
OpenAI has two kinds of tools:
Function tools (type: "function"):
| OpenAI | Canonical | Notes |
|---|---|---|
| tools[].type: "function" | — | OpenAI adds an extra function wrapper layer |
| tools[].function.name | tools[].name | direct mapping |
| tools[].function.description | tools[].description | direct mapping |
| tools[].function.parameters | tools[].input_schema | different field name |
| tools[].function.strict | — | protocol-specific; ignored |
Custom tools (type: "custom"): no input_schema; they use a custom format (text/grammar). Not mapped to CanonicalTool; dropped when crossing protocols.
decodeTools(tools):
result = []
for tool in (tools ?? []):
if tool.type == "function":
result.append(CanonicalTool {
name: tool.function.name,
description: tool.function.description,
input_schema: tool.function.parameters
})
// type == "custom": dropped when crossing protocols
return result.length > 0 ? result : None
Tool choice
OpenAI tool_choice takes several forms:
| OpenAI tool_choice | Canonical ToolChoice | Notes |
|---|---|---|
| "auto" | {type: "auto"} | direct mapping |
| "none" | {type: "none"} | direct mapping |
| "required" | {type: "any"} | semantically equivalent |
| {type: "function", function: {name}} | {type: "tool", name} | named tool |
| {type: "custom", custom: {name}} | {type: "tool", name} | custom tool |
| {type: "allowed_tools", allowed_tools: {mode, tools}} | — | protocol-specific; degraded to a mode mapping (auto→auto, required→any) |
decodeToolChoice(tool_choice):
if tool_choice is String:
switch tool_choice:
"auto" → {type: "auto"}
"none" → {type: "none"}
"required" → {type: "any"}
elif tool_choice.type == "function":
return {type: "tool", name: tool_choice.function.name}
elif tool_choice.type == "custom":
return {type: "tool", name: tool_choice.custom.name}
elif tool_choice.type == "allowed_tools":
mode = tool_choice.allowed_tools.mode // "auto" or "required"
return mode == "required" ? {type: "any"} : {type: "auto"}
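A minimal Python sketch of decodeToolChoice following the table above (the plain-dict return shape is an assumption):

```python
def decode_tool_choice(tool_choice):
    """Map OpenAI tool_choice forms to canonical ToolChoice dicts."""
    if isinstance(tool_choice, str):
        return {"auto": {"type": "auto"},
                "none": {"type": "none"},
                "required": {"type": "any"}}[tool_choice]
    if tool_choice["type"] == "function":
        return {"type": "tool", "name": tool_choice["function"]["name"]}
    if tool_choice["type"] == "custom":
        return {"type": "tool", "name": tool_choice["custom"]["name"]}
    if tool_choice["type"] == "allowed_tools":
        mode = tool_choice["allowed_tools"]["mode"]  # "auto" or "required"
        return {"type": "any"} if mode == "required" else {"type": "auto"}
```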
Parameter mapping
| OpenAI | Canonical | Notes |
|---|---|---|
| max_completion_tokens | parameters.max_tokens | takes priority; used by o-series models |
| max_tokens | parameters.max_tokens | deprecated; used as fallback |
| temperature | parameters.temperature | direct mapping |
| top_p | parameters.top_p | direct mapping |
| frequency_penalty | parameters.frequency_penalty | direct mapping |
| presence_penalty | parameters.presence_penalty | direct mapping |
| stop (String or Array) | parameters.stop_sequences (Array) | normalized to an Array by the Decoder |
| stream | stream | direct mapping |
decodeParameters(raw):
return RequestParameters {
max_tokens: raw.max_completion_tokens ?? raw.max_tokens,
temperature: raw.temperature,
top_p: raw.top_p,
frequency_penalty: raw.frequency_penalty,
presence_penalty: raw.presence_penalty,
stop_sequences: normalizeStop(raw.stop)
}
normalizeStop(stop):
if stop is String: return [stop]
if stop is Array: return stop
return None
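The fallback priority and stop normalization can be sketched as runnable Python (field names follow the mapping table; the plain-dict return shape is an assumption):

```python
def normalize_stop(stop):
    """Normalize OpenAI's stop (string or list) to a list, or None."""
    if isinstance(stop, str):
        return [stop]
    if isinstance(stop, list):
        return stop
    return None

def decode_parameters(raw: dict) -> dict:
    # max_completion_tokens takes priority over the deprecated max_tokens.
    max_tokens = raw.get("max_completion_tokens")
    if max_tokens is None:
        max_tokens = raw.get("max_tokens")
    return {"max_tokens": max_tokens,
            "temperature": raw.get("temperature"),
            "top_p": raw.get("top_p"),
            "frequency_penalty": raw.get("frequency_penalty"),
            "presence_penalty": raw.get("presence_penalty"),
            "stop_sequences": normalize_stop(raw.get("stop"))}
```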
Common fields
| OpenAI | Canonical | Extraction rule |
|---|---|---|
| user | user_id | top-level field, extracted directly |
| response_format | output_format | decoded by type |
| parallel_tool_calls | parallel_tool_use | boolean, direct mapping |
| reasoning_effort | thinking | mapped to a thinking config |
decodeOutputFormat(format):
if format is None: return None
switch format.type:
"json_object" → {type: "json_object"}
"json_schema" → {type: "json_schema", json_schema: format.json_schema}
"text" → None // default format; nothing to set
decodeThinking(reasoning_effort):
if reasoning_effort is None: return None
if reasoning_effort == "none": return ThinkingConfig {type: "disabled"}
effort = reasoning_effort == "minimal" ? "low" : reasoning_effort
return ThinkingConfig {type: "enabled", effort: effort}
reasoning_effort mapping:
- "none" → thinking.type = "disabled" (no reasoning performed)
- "minimal" → thinking.effort = "low" (Canonical has no minimal level; degraded to low)
- "low" / "medium" / "high" / "xhigh" → direct mapping
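A sketch of decodeThinking implementing this mapping (the dict-shaped ThinkingConfig is an assumption):

```python
def decode_thinking(reasoning_effort):
    """Map OpenAI reasoning_effort to a canonical thinking config."""
    if reasoning_effort is None:
        return None
    if reasoning_effort == "none":
        return {"type": "disabled"}
    # Canonical has no "minimal" level; degrade it to "low".
    effort = "low" if reasoning_effort == "minimal" else reasoning_effort
    return {"type": "enabled", "effort": effort}
```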
Deprecated-field compatibility
| Deprecated field | Replacement | Decoder handling |
|---|---|---|
| functions | tools | converted to the tools format (wrapped in type: "function") |
| function_call | tool_choice | converted to the tool_choice format |
decodeDeprecatedFields(raw):
// functions → tools (only when tools is unset)
if raw.tools is None && raw.functions:
raw.tools = raw.functions.map(f => ({
type: "function",
function: {name: f.name, description: f.description, parameters: f.parameters}}))
// function_call → tool_choice (only when tool_choice is unset)
if raw.tool_choice is None && raw.function_call:
if raw.function_call == "none": raw.tool_choice = "none"
elif raw.function_call == "auto": raw.tool_choice = "auto"
else: raw.tool_choice = {type: "function", function: {name: raw.function_call.name}}
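decodeDeprecatedFields can be sketched as follows, with the same in-place mutation as the pseudocode above:

```python
def decode_deprecated_fields(raw: dict) -> None:
    """Upgrade deprecated functions/function_call to tools/tool_choice."""
    # functions → tools (only when tools is unset)
    if raw.get("tools") is None and raw.get("functions"):
        raw["tools"] = [{"type": "function",
                         "function": {"name": f["name"],
                                      "description": f.get("description"),
                                      "parameters": f.get("parameters")}}
                        for f in raw["functions"]]
    # function_call → tool_choice (only when tool_choice is unset)
    if raw.get("tool_choice") is None and raw.get("function_call"):
        fc = raw["function_call"]
        if fc in ("none", "auto"):
            raw["tool_choice"] = fc
        else:
            raw["tool_choice"] = {"type": "function",
                                  "function": {"name": fc["name"]}}
```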
Protocol-specific fields
| Field | Handling |
|---|---|
| seed | ignored (no cross-protocol equivalent) |
| logprobs | ignored |
| top_logprobs | ignored |
| logit_bias | ignored |
| n | ignored (only a single choice is supported) |
| service_tier | ignored |
| store | ignored |
| metadata | ignored |
| modalities | ignored (to be enabled with the multimodal extension) |
| audio | ignored (to be enabled with the multimodal extension) |
| prediction | ignored |
| stream_options | ignored |
| safety_identifier | ignored |
| prompt_cache_key | ignored |
| prompt_cache_retention | ignored |
| verbosity | ignored |
| web_search_options | ignored |
| tools[].function.strict | ignored |
| tools[].custom (custom tools) | dropped when crossing protocols |
Protocol constraints
- A tool-role message in messages must immediately follow the corresponding assistant message (the one carrying tool_calls)
- A tool message's tool_call_id must match a tool_calls[].id in that assistant message
- stream_options.include_usage is optional and OpenAI-specific
- The stop parameter is unavailable on the newest reasoning models such as o3/o4-mini
4.2 Encoder (Canonical → OpenAI)
Model name
canonical.model is overridden with provider.model_name.
System message injection
canonical.system is encoded as a message with role="system" and prepended as messages[0].
encodeSystemPrompt(canonical):
messages = []
if canonical.system is String:
messages.append({role: "system", content: canonical.system})
elif canonical.system is Array:
text = canonical.system.map(s => s.text).join("\n\n")
messages.append({role: "system", content: text})
return messages + encodeMessages(canonical.messages)
Message encoding
encodeUserContent(blocks):
if blocks.length == 1 && blocks[0].type == "text":
return blocks[0].text
return blocks.map(b => {
switch b.type:
"text" → {type: "text", text: b.text}
"image" → {type: "image_url", image_url: {url: b.source.url, detail: b.source.detail}}
"audio" → {type: "input_audio", input_audio: {data: b.source.data, format: b.source.format}}
"file" → {type: "file", file: {file_data: b.source.file_data, file_id: b.source.file_id, filename: b.source.filename}}
})
encodeMessage(msg):
switch msg.role:
case "user":
return {role: "user", content: encodeUserContent(msg.content)}
case "assistant":
message = {}
textParts = msg.content.filter(b => b.type == "text")
toolUses = msg.content.filter(b => b.type == "tool_use")
if textParts.length > 0:
message.content = textParts.map(b => b.text).join("")
elif toolUses.length > 0:
message.content = null
else:
message.content = ""
if toolUses.length > 0:
message.tool_calls = toolUses.map(tu => ({
id: tu.id, type: "function",
function: {name: tu.name, arguments: JSON.stringify(tu.input)}}))
return {role: "assistant", ...message}
case "tool":
results = msg.content.filter(b => b.type == "tool_result")
if results.length > 0:
return {role: "tool", tool_call_id: results[0].tool_use_id,
content: results[0].content}
Role-constraint handling
OpenAI expects assistant and user roles to alternate strictly. When the Canonical message sequence contains consecutive messages with the same role, they must be merged into a single message.
Tool encoding
encodeTools(canonical):
if canonical.tools:
result.tools = canonical.tools.map(t => ({
type: "function",
function: {name: t.name, description: t.description, parameters: t.input_schema}}))
if canonical.tool_choice:
result.tool_choice = encodeToolChoice(canonical.tool_choice)
encodeToolChoice(choice):
switch choice.type:
"auto" → "auto"
"none" → "none"
"any" → "required"
"tool" → {type: "function", function: {name: choice.name}}
Common field encoding
encodeOutputFormat(format):
if format is None: return None
switch format.type:
"json_object" → {type: "json_object"}
"json_schema" → {type: "json_schema", json_schema: format.json_schema}
encodeRequest(canonical, provider):
result = {
model: provider.model_name,
messages: encodeSystemPrompt(canonical) + canonical.messages.flatMap(encodeMessage),
stream: canonical.stream
}
// parameters
if canonical.parameters.max_tokens:
result.max_completion_tokens = canonical.parameters.max_tokens
if canonical.parameters.temperature is not None:
result.temperature = canonical.parameters.temperature
if canonical.parameters.top_p is not None:
result.top_p = canonical.parameters.top_p
if canonical.parameters.frequency_penalty is not None:
result.frequency_penalty = canonical.parameters.frequency_penalty
if canonical.parameters.presence_penalty is not None:
result.presence_penalty = canonical.parameters.presence_penalty
if canonical.parameters.stop_sequences:
result.stop = canonical.parameters.stop_sequences
// tools
if canonical.tools:
result.tools = canonical.tools.map(t => ({
type: "function",
function: {name: t.name, description: t.description, parameters: t.input_schema}}))
if canonical.tool_choice:
result.tool_choice = encodeToolChoice(canonical.tool_choice)
// common fields
if canonical.user_id:
result.user = canonical.user_id
if canonical.output_format:
result.response_format = encodeOutputFormat(canonical.output_format)
if canonical.parallel_tool_use != null:
result.parallel_tool_calls = canonical.parallel_tool_use
if canonical.thinking:
if canonical.thinking.type == "disabled":
result.reasoning_effort = "none"
elif canonical.thinking.effort:
result.reasoning_effort = canonical.thinking.effort
else:
result.reasoning_effort = "medium"
return result
Encoding notes:
- max_completion_tokens (not the deprecated max_tokens) is used for the output token limit
- frequency_penalty and presence_penalty are emitted only when non-null
- thinking maps to reasoning_effort: disabled → "none"; an explicit effort value maps directly; otherwise the default is "medium"
Degradation handling
Checked against the three-level degradation strategy of architecture document §8.4 for every unsupported field:
| Canonical field | When OpenAI lacks support | Degradation strategy |
|---|---|---|
| thinking.budget_tokens | OpenAI uses reasoning_effort rather than token-level control | substitution: approximate mapping to an effort level |
| stop_reason: "content_filter" | finish_reason: "content_filter" | automatic mapping (OpenAI supports this value) |
| stop_reason: "stop_sequence" | no dedicated OpenAI value | automatically mapped to "stop" |
| parameters.top_k | OpenAI does not support top_k | dropped |
5. Core Layer — Chat Response Codec
Mapping confirmed field by field against §4.7 CanonicalResponse.
5.1 Response structure
Top-level structure of an OpenAI response:
{
id: String,
object: "chat.completion",
created: Number,
model: String,
choices: [{
index: 0,
message: {
role: "assistant",
content: String | null,
refusal: String | null,
tool_calls: [{
id: String,
type: "function",
function: { name: String, arguments: String }
}] | null,
annotations: [{
type: "url_citation",
url_citation: { start_index, end_index, title, url }
}] | null,
audio: { id, data, expires_at, transcript } | null
},
finish_reason: String,
logprobs: { content, refusal } | null
}],
usage: {
prompt_tokens: Number,
completion_tokens: Number,
total_tokens: Number,
prompt_tokens_details: { cached_tokens, audio_tokens },
completion_tokens_details: { reasoning_tokens, audio_tokens, accepted_prediction_tokens, rejected_prediction_tokens }
},
service_tier: String | null,
system_fingerprint: String | null
}
Compatibility note: some OpenAI-compatible providers (e.g. DeepSeek) return a non-standard reasoning_content field in the response. The Decoder detects this field and decodes it into a ThinkingBlock.
5.2 Decoder (OpenAI → Canonical)
decodeResponse(openaiResp):
choice = openaiResp.choices[0]
blocks = []
if choice.message.content: blocks.append({type: "text", text: choice.message.content})
if choice.message.refusal: blocks.append({type: "text", text: choice.message.refusal})
// reasoning_content: non-standard field, from compatible providers
if choice.message.reasoning_content:
blocks.append({type: "thinking", thinking: choice.message.reasoning_content})
if choice.message.tool_calls:
for tc in choice.message.tool_calls:
switch tc.type:
"function" → blocks.append({type: "tool_use", id: tc.id, name: tc.function.name,
input: JSON.parse(tc.function.arguments)})
"custom" → blocks.append({type: "tool_use", id: tc.id, name: tc.custom.name,
input: tc.custom.input})
return CanonicalResponse {
id: openaiResp.id,
model: openaiResp.model,
content: blocks,
stop_reason: mapFinishReason(choice.finish_reason),
usage: decodeUsage(openaiResp.usage)
}
Content block decoding:
- content → TextBlock
- refusal → TextBlock
- reasoning_content → ThinkingBlock (non-standard field, from compatible providers)
- tool_calls[].type: "function" → ToolUseBlock (lifted from the message top level into the content array)
- tool_calls[].type: "custom" → ToolUseBlock (input is a string)
Stop reason mapping:
| OpenAI finish_reason | Canonical stop_reason | Notes |
|---|---|---|
| "stop" | "end_turn" | natural end, or a stop sequence matched |
| "length" | "max_tokens" | token limit reached |
| "tool_calls" | "tool_use" | the model invoked a tool |
| "content_filter" | "content_filter" | content filtering |
| "function_call" | "tool_use" | deprecated; equivalent to tool_calls |
| (other) | "end_turn" | fallback |
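The table above reduces to a lookup with a fallback; a minimal sketch:

```python
def map_finish_reason(finish_reason: str) -> str:
    """Map an OpenAI finish_reason to the canonical stop_reason."""
    return {"stop": "end_turn",
            "length": "max_tokens",
            "tool_calls": "tool_use",
            "content_filter": "content_filter",
            "function_call": "tool_use",  # deprecated alias of tool_calls
            }.get(finish_reason, "end_turn")  # anything else falls back
```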
Token usage mapping:
| OpenAI usage | Canonical Usage |
|---|---|
| prompt_tokens | input_tokens |
| completion_tokens | output_tokens |
| prompt_tokens_details.cached_tokens | cache_read_tokens |
| — | cache_creation_tokens (null) |
| completion_tokens_details.reasoning_tokens | reasoning_tokens |
decodeUsage(usage):
if usage is None: return CanonicalUsage {input_tokens: 0, output_tokens: 0}
return CanonicalUsage {
input_tokens: usage.prompt_tokens,
output_tokens: usage.completion_tokens,
cache_read_tokens: usage.prompt_tokens_details?.cached_tokens,
cache_creation_tokens: null,
reasoning_tokens: usage.completion_tokens_details?.reasoning_tokens
}
Protocol-specific content:
| Field | Handling |
|---|---|
| refusal | decoded as a text block |
| reasoning_content | decoded as a ThinkingBlock (non-standard, from compatible providers) |
| annotations | ignored (protocol-specific; not promoted to a common field) |
| audio | ignored (to be enabled with the multimodal extension) |
| logprobs | ignored |
| service_tier | ignored |
| system_fingerprint | ignored |
| created | ignored |
5.3 Encoder (Canonical → OpenAI)
encodeResponse(canonical):
textParts = canonical.content.filter(b => b.type == "text")
thinkingParts = canonical.content.filter(b => b.type == "thinking")
toolUses = canonical.content.filter(b => b.type == "tool_use")
message = {role: "assistant"}
if textParts.length > 0:
message.content = textParts.map(b => b.text).join("")
elif toolUses.length > 0:
message.content = null
else:
message.content = ""
// reasoning_content: non-standard field, emitted for compatible providers
if thinkingParts.length > 0:
message.reasoning_content = thinkingParts.map(b => b.thinking).join("")
if toolUses.length > 0:
message.tool_calls = toolUses.map(tu => ({
id: tu.id, type: "function",
function: {name: tu.name, arguments: JSON.stringify(tu.input)}}))
return {
id: canonical.id,
object: "chat.completion",
model: canonical.model,
choices: [{
index: 0,
message: message,
finish_reason: mapCanonicalToFinishReason(canonical.stop_reason)
}],
usage: encodeUsage(canonical.usage)
}
encodeUsage(usage):
return {
prompt_tokens: usage.input_tokens,
completion_tokens: usage.output_tokens,
total_tokens: usage.input_tokens + usage.output_tokens,
prompt_tokens_details: usage.cache_read_tokens ? {cached_tokens: usage.cache_read_tokens} : null,
completion_tokens_details: usage.reasoning_tokens ? {reasoning_tokens: usage.reasoning_tokens} : null
}
Content block encoding:
- TextBlock → message.content
- ToolUseBlock → message.tool_calls (lifted from the content array to the message top level)
- ThinkingBlock → reasoning_content (non-standard field, used by compatible providers)
Stop reason mapping:
| Canonical stop_reason | OpenAI finish_reason |
|---|---|
| "end_turn" | "stop" |
| "max_tokens" | "length" |
| "tool_use" | "tool_calls" |
| "content_filter" | "content_filter" |
| "stop_sequence" | "stop" |
| "refusal" | "stop" |
| (other) | "stop" |
Degradation handling:
| Canonical field | When OpenAI lacks support | Degradation strategy |
|---|---|---|
| stop_reason: "stop_sequence" | no dedicated OpenAI value | mapped to "stop" (automatic) |
| stop_reason: "refusal" | no dedicated OpenAI value | mapped to "stop" (automatic) |
| cache_creation_tokens | no such field in OpenAI | dropped |
6. Core Layer — Streaming Codec
6.1 SSE format
OpenAI uses unnamed SSE delta chunks:
data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","model":"gpt-4",
"choices":[{"index":0,"delta":{...},"finish_reason":null}]}
data: [DONE]
delta structure:
delta: {
role?: "assistant" | "user" | "system" | "developer" | "tool",
content?: String,
tool_calls?: [{index: Number, id?: String, function?: {name?: String, arguments?: String}, type?: "function"}],
refusal?: String,
function_call?: {name?: String, arguments?: String} // deprecated
}
Compatibility note: some compatible providers return a non-standard reasoning_content field in the delta. The StreamDecoder detects and handles it.
6.2 StreamDecoder (OpenAI SSE → Canonical events)
| OpenAI chunk | Canonical event | Notes |
|---|---|---|
| first chunk (id/model) | MessageStartEvent | id and model extracted from the top-level fields |
| first delta.content | ContentBlockStart(text) + ContentBlockDelta(text_delta) | a new text block begins |
| subsequent delta.content | ContentBlockDelta(text_delta) | text appended |
| first delta.tool_calls[i] | ContentBlockStart(tool_use) | new tool block; id and name extracted |
| delta.tool_calls[i].function.arguments | ContentBlockDelta(input_json_delta) | incremental JSON arguments |
| first delta.reasoning_content | ContentBlockStart(thinking) + ContentBlockDelta(thinking_delta) | new thinking block (non-standard, from compatible providers) |
| subsequent delta.reasoning_content | ContentBlockDelta(thinking_delta) | thinking content appended |
| first delta.refusal | ContentBlockStart(text) + ContentBlockDelta(text_delta) | new text block |
| non-null finish_reason | ContentBlockStop × N + MessageDeltaEvent + MessageStopEvent | all open blocks closed |
| usage chunk (choices=[]) | MessageDeltaEvent(usage) | usage chunk triggered by stream_options.include_usage |
| [DONE] | flush() | decoder flush triggered |
Decoder pseudocode:
StreamDecoder.processChunk(rawChunk):
events = []
// parse SSE data
if rawChunk == "[DONE]":
// close all open blocks
for idx in openBlocks:
events.append(ContentBlockStopEvent {index: idx})
if messageStarted:
events.append(MessageStopEvent {})
return events
data = JSON.parse(rawChunk)
// first chunk: MessageStart
if !messageStarted:
events.append(MessageStartEvent {message: {id: data.id, model: data.model, usage: null}})
messageStarted = true
for choice in data.choices:
delta = choice.delta
// a role field produces no event (it only marks the first chunk)
// text content
if delta.content != null:
if !openBlocks.has(textBlockIndex):
events.append(ContentBlockStartEvent {index: textBlockIndex, content_block: {type: "text", text: ""}})
openBlocks.add(textBlockIndex)
currentBlockType[textBlockIndex] = "text"
events.append(ContentBlockDeltaEvent {index: textBlockIndex, delta: {type: "text_delta", text: delta.content}})
// reasoning_content (non-standard, from compatible providers)
if delta.reasoning_content != null:
if !openBlocks.has(thinkingBlockIndex):
events.append(ContentBlockStartEvent {index: thinkingBlockIndex, content_block: {type: "thinking", thinking: ""}})
openBlocks.add(thinkingBlockIndex)
currentBlockType[thinkingBlockIndex] = "thinking"
events.append(ContentBlockDeltaEvent {index: thinkingBlockIndex, delta: {type: "thinking_delta", thinking: delta.reasoning_content}})
// refusal
if delta.refusal != null:
if !openBlocks.has(refusalBlockIndex):
events.append(ContentBlockStartEvent {index: refusalBlockIndex, content_block: {type: "text", text: ""}})
openBlocks.add(refusalBlockIndex)
events.append(ContentBlockDeltaEvent {index: refusalBlockIndex, delta: {type: "text_delta", text: delta.refusal}})
// tool calls
if delta.tool_calls:
for tc in delta.tool_calls:
idx = tc.index
if tc.id != null:
// new tool call block
toolCallIdMap[idx] = tc.id
toolCallNameMap[idx] = tc.function?.name
toolCallArguments[idx] = ""
blockIndex = allocateBlockIndex(idx)
events.append(ContentBlockStartEvent {
index: blockIndex,
content_block: {type: "tool_use", id: tc.id, name: tc.function?.name, input: {}}})
openBlocks.add(blockIndex)
currentBlockType[blockIndex] = "tool_use"
currentBlockId[blockIndex] = idx
if tc.function?.arguments:
blockIndex = allocateBlockIndex(idx) // resolves to the block index already allocated for idx
toolCallArguments[idx] += tc.function.arguments
events.append(ContentBlockDeltaEvent {
index: blockIndex,
delta: {type: "input_json_delta", partial_json: tc.function.arguments}})
// finish_reason
if choice.finish_reason != null:
for idx in openBlocks:
events.append(ContentBlockStopEvent {index: idx})
openBlocks.clear()
events.append(MessageDeltaEvent {delta: {stop_reason: mapFinishReason(choice.finish_reason)}, usage: null})
events.append(MessageStopEvent {})
// usage chunk (choices is empty)
if data.choices.length == 0 && data.usage:
accumulatedUsage = decodeUsage(data.usage)
events.append(MessageDeltaEvent {delta: {stop_reason: null}, usage: accumulatedUsage})
return events
6.3 StreamDecoder 状态机
StreamDecoderState {
messageStarted: Boolean
openBlocks: Set<Integer>
currentBlockType: Map<Integer, String>
currentBlockId: Map<Integer, String>
toolCallIdMap: Map<Integer, String> // OpenAI tool_calls array index → id
toolCallNameMap: Map<Integer, String> // OpenAI tool_calls array index → name
toolCallArguments: Map<Integer, StringBuffer> // OpenAI tool_calls array index → accumulated arguments
textBlockStarted: Boolean // tracks the text block lifecycle
thinkingBlockStarted: Boolean // tracks the thinking block lifecycle (non-standard)
utf8Remainder: Option<ByteArray> // UTF-8 safety across chunk boundaries
accumulatedUsage: Option<CanonicalUsage>
}
Key handling:
- Tool-call index mapping: the i in OpenAI tool_calls[i] is not necessarily contiguous; a Map maintains the index → id/name mapping
- Argument accumulation: tool_calls[i].function.arguments arrives as incremental JSON fragments, accumulated until the block ends
- UTF-8 safety: UTF-8 bytes truncated across chunk boundaries are buffered in utf8Remainder
- reasoning_content: non-standard field from compatible providers (e.g. DeepSeek); handled like content
- usage chunk: when stream_options.include_usage is enabled, the final chunk has an empty choices array and carries only usage
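The utf8Remainder buffering corresponds to what an incremental UTF-8 decoder does: bytes that end mid-codepoint are held back and prepended to the next chunk. A minimal illustration using Python's standard library:

```python
import codecs

# An incremental decoder buffers trailing bytes of an incomplete
# codepoint instead of raising, mirroring the utf8Remainder field.
decoder = codecs.getincrementaldecoder("utf-8")()
data = "你好".encode("utf-8")       # 6 bytes, 3 per character
part1, part2 = data[:4], data[4:]  # split inside the second character
out = decoder.decode(part1) + decoder.decode(part2)
```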
6.4 StreamEncoder (Canonical → OpenAI SSE)
| Canonical event | OpenAI chunk | Notes |
|---|---|---|
| MessageStartEvent | {id, model, object: "chat.completion.chunk", choices:[{delta:{role:"assistant"}, index:0}]} | first chunk |
| ContentBlockStart(text) | buffered, not emitted | merged into the first delta |
| ContentBlockDelta(text_delta) | {choices:[{delta:{content:"..."}}]} | block_start info merged into the first emission |
| ContentBlockStart(tool_use) | buffered, not emitted | merged into the first delta |
| ContentBlockDelta(input_json_delta) | {choices:[{delta:{tool_calls:[{index, id?, function:{name?, arguments}}]}}]} | the first chunk carries id and name; later ones only arguments |
| ContentBlockStart(thinking) | buffered, not emitted | waits for the first delta |
| ContentBlockDelta(thinking_delta) | {choices:[{delta:{reasoning_content:"..."}}]} | non-standard field (used by compatible providers) |
| ContentBlockStop | not emitted | silent |
| MessageDeltaEvent | {choices:[{delta:{}, finish_reason:"..."}]} | carries the stop_reason mapping |
| MessageDeltaEvent(usage only) | {choices:[], usage: {...}} | usage chunk |
| MessageStopEvent | data: [DONE] | end of stream |
| PingEvent | dropped | not emitted |
| ErrorEvent | dropped | not emitted (OpenAI has no streaming error event) |
Encoder pseudocode:
StreamEncoderState {
bufferedStart: Option<ContentBlockStartEvent> // buffered block start event
toolCallIndexMap: Map<String, Integer> // tool_use_id → OpenAI tool_calls array index
nextToolCallIndex: Integer // next available index
}
StreamEncoder.encodeEvent(event):
switch event.type:
case "message_start":
return [{id: event.message.id, model: event.message.model,
object: "chat.completion.chunk", created: now(),
choices: [{index: 0, delta: {role: "assistant"}, finish_reason: null}]}]
case "content_block_start":
bufferedStart = event // buffer; do not emit immediately
if event.content_block.type == "tool_use":
idx = nextToolCallIndex++
toolCallIndexMap[event.content_block.id] = idx
return []
case "content_block_delta":
chunks = []
switch event.delta.type:
"text_delta":
delta = {content: event.delta.text}
if bufferedStart:
// first delta; merge start info (OpenAI needs no extra start marker for text)
bufferedStart = null
chunks.append({choices: [{index: 0, delta: delta, finish_reason: null}]})
"input_json_delta":
tcIdx = toolCallIndexMap[currentBlockId[event.index]]
delta = {}
if bufferedStart:
// first delta; carries id and name
start = bufferedStart.content_block
delta.tool_calls = [{index: tcIdx, id: start.id,
function: {name: start.name, arguments: event.delta.partial_json},
type: "function"}]
bufferedStart = null
else:
delta.tool_calls = [{index: tcIdx,
function: {arguments: event.delta.partial_json}}]
chunks.append({choices: [{index: 0, delta: delta, finish_reason: null}]})
"thinking_delta":
delta = {reasoning_content: event.delta.thinking}
if bufferedStart:
bufferedStart = null
chunks.append({choices: [{index: 0, delta: delta, finish_reason: null}]})
return chunks
case "content_block_stop":
return []
case "message_delta":
chunks = []
if event.delta.stop_reason:
finishReason = mapCanonicalToFinishReason(event.delta.stop_reason)
chunks.append({choices: [{index: 0, delta: {}, finish_reason: finishReason}]})
if event.usage:
chunks.append({choices: [], usage: encodeUsage(event.usage)})
return chunks
case "message_stop":
return ["[DONE]"]
Buffering strategy:
- ContentBlockStart is not emitted immediately; it is merged with the first ContentBlockDelta
- When the first delta is emitted, the start info (e.g. tool id/name) is encoded along with it
7. Extension-Layer Interfaces
7.1 /models & /models/
List endpoint GET /v1/models:
| Item | Description |
|---|---|
| Endpoint exists | yes |
| Request format | GET request, no body |
Response Decoder (OpenAI → Canonical):
decodeModelsResponse(openaiResp):
return CanonicalModelList {
models: openaiResp.data.map(m => CanonicalModel {
id: m.id, name: m.id, created: m.created, owned_by: m.owned_by })}
Response Encoder (Canonical → OpenAI):
encodeModelsResponse(canonical):
return {object: "list",
data: canonical.models.map(m => ({id: m.id, object: "model",
created: m.created ?? 0, owned_by: m.owned_by ?? "unknown"}))}
Detail endpoint GET /v1/models/{model}:
| Item | Description |
|---|---|
| Endpoint exists | yes |
| Request format | GET request, no body |
Response Decoder (OpenAI → Canonical):
decodeModelInfoResponse(openaiResp):
return CanonicalModelInfo {
id: openaiResp.id, name: openaiResp.id,
created: openaiResp.created, owned_by: openaiResp.owned_by }
Response Encoder (Canonical → OpenAI):
encodeModelInfoResponse(canonical):
return {id: canonical.id, object: "model",
created: canonical.created ?? 0, owned_by: canonical.owned_by ?? "unknown"}
Field mapping (shared by list and detail):
| OpenAI | Canonical | Notes |
|---|---|---|
| data[].id | models[].id | direct mapping |
| data[].object: "model" | — | fixed value |
| data[].created | models[].created | Unix timestamp |
| data[].owned_by | models[].owned_by | direct mapping |
7.2 /embeddings
| Item | Description |
|---|---|
| Endpoint exists | yes |
| URL path | POST /v1/embeddings |
Request Decoder (OpenAI → Canonical):
decodeEmbeddingRequest(raw):
return CanonicalEmbeddingRequest {
model: raw.model,
input: raw.input,
encoding_format: raw.encoding_format,
dimensions: raw.dimensions
}
Request Encoder (Canonical → OpenAI):
encodeEmbeddingRequest(canonical, provider):
result = {model: provider.model_name, input: canonical.input}
if canonical.encoding_format: result.encoding_format = canonical.encoding_format
if canonical.dimensions: result.dimensions = canonical.dimensions
return result
Response Decoder (OpenAI → Canonical):
decodeEmbeddingResponse(openaiResp):
return CanonicalEmbeddingResponse {
data: openaiResp.data, model: openaiResp.model, usage: openaiResp.usage }
Response Encoder (Canonical → OpenAI):
encodeEmbeddingResponse(canonical):
return {object: "list", data: canonical.data, model: canonical.model, usage: canonical.usage}
7.3 /rerank
| Item | Description |
|---|---|
| Endpoint exists | yes |
| URL path | POST /v1/rerank |
Request/response codec: mapped according to the CanonicalRerankRequest / CanonicalRerankResponse formats.
8. Error Encoding
8.1 Error response format
{
"error": {
"message": "Error message",
"type": "invalid_request_error",
"param": null,
"code": null
}
}
8.2 encodeError
OpenAI.encodeError(error):
return {
error: {
message: error.message,
type: mapErrorCode(error.code),
param: error.details?.param ?? null,
code: error.code
}
}
mapErrorCode(code):
switch code:
INVALID_INPUT → "invalid_request_error"
MISSING_REQUIRED_FIELD → "invalid_request_error"
INCOMPATIBLE_FEATURE → "invalid_request_error"
TOOL_CALL_PARSE_ERROR → "invalid_request_error"
JSON_PARSE_ERROR → "invalid_request_error"
RATE_LIMIT → "rate_limit_error"
AUTHENTICATION → "authentication_error"
default → "server_error"
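mapErrorCode as a runnable sketch, following the switch above (the canonical error-code names are taken from the pseudocode):

```python
def map_error_code(code: str) -> str:
    """Map a canonical error code to an OpenAI error type string."""
    invalid = {"INVALID_INPUT", "MISSING_REQUIRED_FIELD",
               "INCOMPATIBLE_FEATURE", "TOOL_CALL_PARSE_ERROR",
               "JSON_PARSE_ERROR"}
    if code in invalid:
        return "invalid_request_error"
    return {"RATE_LIMIT": "rate_limit_error",
            "AUTHENTICATION": "authentication_error"}.get(code, "server_error")
```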
8.3 Common HTTP status codes
| HTTP Status | Description |
|---|---|
| 400 | malformed request |
| 401 | authentication failure |
| 403 | permission denied |
| 404 | endpoint not found |
| 429 | rate limited |
| 500 | internal server error |
| 503 | service unavailable |
9. Self-Check List
| Section | Check item |
|---|---|
| §2 | [x] supportsInterface return values determined for every InterfaceType |
| §2 | [x] buildUrl mapping determined for every InterfaceType |
| §3 | [x] buildHeaders(provider) implemented; adapter_config contract documented |
| §4 | [x] Chat request Decoder and Encoder implemented (field by field against §4.1/§4.2) |
| §4 | [x] System message extraction covers both the system and developer roles |
| §4 | [x] Role mapping and message-order constraints handled (assistant/user alternation merge) |
| §4 | [x] Tool-call codec (tool_calls / tool_use / tool_result) handled, including the custom type |
| §4 | [x] frequency_penalty and presence_penalty mapped to Canonical (not ignored) |
| §4 | [x] max_completion_tokens vs max_tokens priority handled |
| §4 | [x] reasoning_effort → thinking mapping handled (including "none" and "minimal") |
| §4 | [x] Deprecated-field compatibility (functions / function_call) implemented |
| §4 | [x] Protocol-specific fields identified with handling decided (logprobs/n/seed/modalities/web_search_options etc. ignored) |
| §5 | [x] Chat response Decoder and Encoder implemented (field by field against §4.7) |
| §5 | [x] stop_reason / finish_reason mapping table confirmed |
| §5 | [x] usage field mapping confirmed (prompt_tokens ↔ input_tokens) |
| §5 | [x] reasoning_content (non-standard) codec handled |
| §5 | [x] annotations and other protocol-specific response fields identified with handling decided |
| §6 | [x] Streaming StreamDecoder and StreamEncoder implemented (against §4.8) |
| §6 | [x] Streaming reasoning_content (non-standard) handling covered |
| §6 | [x] usage chunk (empty choices) handling covered |
| §7 | [x] Extension-layer codecs implemented (/models, /models/{model}, /embeddings, /rerank) |
| §8 | [x] encodeError implemented |