引入 Canonical Model 和 ProtocolAdapter 架构,支持 OpenAI/Anthropic 协议间无缝转换,统一 ProxyHandler 替代分散的 OpenAI/Anthropic Handler,简化 ProviderClient 为协议无关的 HTTP 发送器,Provider 新增 protocol 字段。
189 lines
5.0 KiB
Go
189 lines
5.0 KiB
Go
package anthropic
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
|
|
"nex/backend/internal/conversion/canonical"
|
|
)
|
|
|
|
// StreamEncoder encodes canonical stream events as Anthropic-style named SSE
// events. It has no fields and therefore carries no state between calls.
type StreamEncoder struct{}
|
|
|
|
// NewStreamEncoder 创建 Anthropic 流式编码器
|
|
func NewStreamEncoder() *StreamEncoder {
|
|
return &StreamEncoder{}
|
|
}
|
|
|
|
// EncodeEvent 编码 Canonical 事件为 Anthropic 命名 SSE 事件
|
|
func (e *StreamEncoder) EncodeEvent(event canonical.CanonicalStreamEvent) [][]byte {
|
|
switch event.Type {
|
|
case canonical.EventMessageStart:
|
|
return e.encodeMessageStart(event)
|
|
case canonical.EventContentBlockStart:
|
|
return e.encodeContentBlockStart(event)
|
|
case canonical.EventContentBlockDelta:
|
|
return e.encodeContentBlockDelta(event)
|
|
case canonical.EventContentBlockStop:
|
|
return e.encodeContentBlockStop(event)
|
|
case canonical.EventMessageDelta:
|
|
return e.encodeMessageDelta(event)
|
|
case canonical.EventMessageStop:
|
|
return e.encodeMessageStop(event)
|
|
case canonical.EventPing:
|
|
return e.encodePing()
|
|
case canonical.EventError:
|
|
return e.encodeError(event)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Flush 刷新缓冲区(无缓冲)
|
|
func (e *StreamEncoder) Flush() [][]byte {
|
|
return nil
|
|
}
|
|
|
|
// encodeMessageStart 编码消息开始事件
|
|
func (e *StreamEncoder) encodeMessageStart(event canonical.CanonicalStreamEvent) [][]byte {
|
|
payload := map[string]any{
|
|
"type": "message_start",
|
|
}
|
|
if event.Message != nil {
|
|
msg := map[string]any{
|
|
"id": event.Message.ID,
|
|
"model": event.Message.Model,
|
|
"role": "assistant",
|
|
}
|
|
if event.Message.Usage != nil {
|
|
usage := map[string]any{
|
|
"input_tokens": event.Message.Usage.InputTokens,
|
|
"output_tokens": event.Message.Usage.OutputTokens,
|
|
}
|
|
msg["usage"] = usage
|
|
}
|
|
payload["message"] = msg
|
|
}
|
|
return e.marshalEvent("message_start", payload)
|
|
}
|
|
|
|
// encodeContentBlockStart 编码内容块开始事件
|
|
func (e *StreamEncoder) encodeContentBlockStart(event canonical.CanonicalStreamEvent) [][]byte {
|
|
if event.ContentBlock == nil || event.Index == nil {
|
|
return nil
|
|
}
|
|
|
|
cb := map[string]any{
|
|
"type": event.ContentBlock.Type,
|
|
}
|
|
switch event.ContentBlock.Type {
|
|
case "text":
|
|
cb["text"] = ""
|
|
case "tool_use":
|
|
cb["id"] = event.ContentBlock.ID
|
|
cb["name"] = event.ContentBlock.Name
|
|
cb["input"] = map[string]any{}
|
|
case "thinking":
|
|
cb["thinking"] = ""
|
|
}
|
|
|
|
payload := map[string]any{
|
|
"type": "content_block_start",
|
|
"index": *event.Index,
|
|
"content_block": cb,
|
|
}
|
|
return e.marshalEvent("content_block_start", payload)
|
|
}
|
|
|
|
// encodeContentBlockDelta 编码内容块增量事件
|
|
func (e *StreamEncoder) encodeContentBlockDelta(event canonical.CanonicalStreamEvent) [][]byte {
|
|
if event.Delta == nil || event.Index == nil {
|
|
return nil
|
|
}
|
|
|
|
delta := map[string]any{
|
|
"type": event.Delta.Type,
|
|
}
|
|
switch canonical.DeltaType(event.Delta.Type) {
|
|
case canonical.DeltaTypeText:
|
|
delta["text"] = event.Delta.Text
|
|
case canonical.DeltaTypeInputJSON:
|
|
delta["partial_json"] = event.Delta.PartialJSON
|
|
case canonical.DeltaTypeThinking:
|
|
delta["thinking"] = event.Delta.Thinking
|
|
}
|
|
|
|
payload := map[string]any{
|
|
"type": "content_block_delta",
|
|
"index": *event.Index,
|
|
"delta": delta,
|
|
}
|
|
return e.marshalEvent("content_block_delta", payload)
|
|
}
|
|
|
|
// encodeContentBlockStop 编码内容块结束事件
|
|
func (e *StreamEncoder) encodeContentBlockStop(event canonical.CanonicalStreamEvent) [][]byte {
|
|
if event.Index == nil {
|
|
return nil
|
|
}
|
|
payload := map[string]any{
|
|
"type": "content_block_stop",
|
|
"index": *event.Index,
|
|
}
|
|
return e.marshalEvent("content_block_stop", payload)
|
|
}
|
|
|
|
// encodeMessageDelta 编码消息增量事件
|
|
func (e *StreamEncoder) encodeMessageDelta(event canonical.CanonicalStreamEvent) [][]byte {
|
|
delta := map[string]any{}
|
|
if event.StopReason != nil {
|
|
delta["stop_reason"] = mapCanonicalStopReason(*event.StopReason)
|
|
}
|
|
|
|
payload := map[string]any{
|
|
"type": "message_delta",
|
|
"delta": delta,
|
|
}
|
|
if event.Usage != nil {
|
|
payload["usage"] = map[string]any{
|
|
"output_tokens": event.Usage.OutputTokens,
|
|
}
|
|
}
|
|
return e.marshalEvent("message_delta", payload)
|
|
}
|
|
|
|
// encodeMessageStop 编码消息结束事件
|
|
func (e *StreamEncoder) encodeMessageStop(event canonical.CanonicalStreamEvent) [][]byte {
|
|
payload := map[string]any{"type": "message_stop"}
|
|
return e.marshalEvent("message_stop", payload)
|
|
}
|
|
|
|
// encodePing 编码心跳事件
|
|
func (e *StreamEncoder) encodePing() [][]byte {
|
|
payload := map[string]any{"type": "ping"}
|
|
return e.marshalEvent("ping", payload)
|
|
}
|
|
|
|
// encodeError 编码错误事件
|
|
func (e *StreamEncoder) encodeError(event canonical.CanonicalStreamEvent) [][]byte {
|
|
if event.Error == nil {
|
|
return nil
|
|
}
|
|
payload := map[string]any{
|
|
"type": "error",
|
|
"error": map[string]any{
|
|
"type": event.Error.Type,
|
|
"message": event.Error.Message,
|
|
},
|
|
}
|
|
return e.marshalEvent("error", payload)
|
|
}
|
|
|
|
// marshalEvent 序列化为 Anthropic 命名 SSE 事件
|
|
func (e *StreamEncoder) marshalEvent(eventType string, payload map[string]any) [][]byte {
|
|
data, err := json.Marshal(payload)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
return [][]byte{[]byte(fmt.Sprintf("event: %s\ndata: %s\n\n", eventType, data))}
|
|
}
|