package anthropic

import (
	"encoding/json"
	"fmt"

	"nex/backend/internal/conversion/canonical"
)

// StreamEncoder encodes canonical stream events as Anthropic named SSE
// events. It carries no state; every event is encoded independently.
type StreamEncoder struct{}

// NewStreamEncoder returns a new Anthropic stream encoder.
func NewStreamEncoder() *StreamEncoder {
	return &StreamEncoder{}
}

// EncodeEvent converts one canonical stream event into zero or one Anthropic
// named SSE frames. It returns nil for unrecognized event types and for
// events that lack the fields their encoder requires.
func (e *StreamEncoder) EncodeEvent(event canonical.CanonicalStreamEvent) [][]byte {
	switch event.Type {
	case canonical.EventMessageStart:
		return e.encodeMessageStart(event)
	case canonical.EventContentBlockStart:
		return e.encodeContentBlockStart(event)
	case canonical.EventContentBlockDelta:
		return e.encodeContentBlockDelta(event)
	case canonical.EventContentBlockStop:
		return e.encodeContentBlockStop(event)
	case canonical.EventMessageDelta:
		return e.encodeMessageDelta(event)
	case canonical.EventMessageStop:
		return e.encodeMessageStop(event)
	case canonical.EventPing:
		return e.encodePing()
	case canonical.EventError:
		return e.encodeError(event)
	}
	return nil
}

// Flush returns any buffered output. The encoder does not buffer, so it
// always returns nil.
func (e *StreamEncoder) Flush() [][]byte {
	return nil
}

// encodeMessageStart encodes the message_start event. When the canonical
// event carries a message, the emitted "message" object includes the
// skeleton fields shown in Anthropic's documented message_start payload
// (type, empty content, null stop_reason / stop_sequence) in addition to
// id, model, role and optional usage.
func (e *StreamEncoder) encodeMessageStart(event canonical.CanonicalStreamEvent) [][]byte {
	payload := map[string]any{
		"type": "message_start",
	}
	if m := event.Message; m != nil {
		msg := map[string]any{
			"id":    m.ID,
			"type":  "message",
			"role":  "assistant",
			"model": m.Model,
			// An empty (non-nil) slice marshals as [] rather than null, so
			// clients that append content blocks to the initial message
			// have a list to append to.
			"content":       []any{},
			"stop_reason":   nil,
			"stop_sequence": nil,
		}
		if m.Usage != nil {
			msg["usage"] = map[string]any{
				"input_tokens":  m.Usage.InputTokens,
				"output_tokens": m.Usage.OutputTokens,
			}
		}
		payload["message"] = msg
	}
	return e.marshalEvent("message_start", payload)
}

// encodeContentBlockStart encodes the content_block_start event. It returns
// nil when the event has no content block or no index.
func (e *StreamEncoder) encodeContentBlockStart(event canonical.CanonicalStreamEvent) [][]byte {
	if event.ContentBlock == nil || event.Index == nil {
		return nil
	}

	cb := map[string]any{
		"type": event.ContentBlock.Type,
	}
	// Each known block type starts with an empty body; block types not
	// listed here are emitted with only their "type" field.
	switch event.ContentBlock.Type {
	case "text":
		cb["text"] = ""
	case "tool_use":
		cb["id"] = event.ContentBlock.ID
		cb["name"] = event.ContentBlock.Name
		cb["input"] = map[string]any{}
	case "thinking":
		cb["thinking"] = ""
	}

	payload := map[string]any{
		"type":          "content_block_start",
		"index":         *event.Index,
		"content_block": cb,
	}
	return e.marshalEvent("content_block_start", payload)
}

// encodeContentBlockDelta encodes the content_block_delta event. It returns
// nil when the event has no delta or no index.
func (e *StreamEncoder) encodeContentBlockDelta(event canonical.CanonicalStreamEvent) [][]byte {
	if event.Delta == nil || event.Index == nil {
		return nil
	}

	delta := map[string]any{
		"type": event.Delta.Type,
	}
	// Copy only the field that belongs to the delta's type; delta types not
	// listed here are emitted with only their "type" field.
	switch canonical.DeltaType(event.Delta.Type) {
	case canonical.DeltaTypeText:
		delta["text"] = event.Delta.Text
	case canonical.DeltaTypeInputJSON:
		delta["partial_json"] = event.Delta.PartialJSON
	case canonical.DeltaTypeThinking:
		delta["thinking"] = event.Delta.Thinking
	}

	payload := map[string]any{
		"type":  "content_block_delta",
		"index": *event.Index,
		"delta": delta,
	}
	return e.marshalEvent("content_block_delta", payload)
}

// encodeContentBlockStop encodes the content_block_stop event. It returns
// nil when the event has no index.
func (e *StreamEncoder) encodeContentBlockStop(event canonical.CanonicalStreamEvent) [][]byte {
	if event.Index == nil {
		return nil
	}
	payload := map[string]any{
		"type":  "content_block_stop",
		"index": *event.Index,
	}
	return e.marshalEvent("content_block_stop", payload)
}

// encodeMessageDelta encodes the message_delta event. The "delta" object is
// always present (possibly empty); "stop_reason" and "usage" are included
// only when the canonical event provides them.
func (e *StreamEncoder) encodeMessageDelta(event canonical.CanonicalStreamEvent) [][]byte {
	delta := map[string]any{}
	if event.StopReason != nil {
		delta["stop_reason"] = mapCanonicalStopReason(*event.StopReason)
	}

	payload := map[string]any{
		"type":  "message_delta",
		"delta": delta,
	}
	if event.Usage != nil {
		payload["usage"] = map[string]any{
			"output_tokens": event.Usage.OutputTokens,
		}
	}
	return e.marshalEvent("message_delta", payload)
}

// encodeMessageStop encodes the message_stop event. The canonical event's
// contents are not used.
func (e *StreamEncoder) encodeMessageStop(_ canonical.CanonicalStreamEvent) [][]byte {
	return e.marshalEvent("message_stop", map[string]any{"type": "message_stop"})
}

// encodePing encodes the ping (heartbeat) event.
func (e *StreamEncoder) encodePing() [][]byte {
	return e.marshalEvent("ping", map[string]any{"type": "ping"})
}

// encodeError encodes the error event. It returns nil when the canonical
// event carries no error details.
func (e *StreamEncoder) encodeError(event canonical.CanonicalStreamEvent) [][]byte {
	if event.Error == nil {
		return nil
	}
	payload := map[string]any{
		"type": "error",
		"error": map[string]any{
			"type":    event.Error.Type,
			"message": event.Error.Message,
		},
	}
	return e.marshalEvent("error", payload)
}

// marshalEvent serializes payload as JSON and wraps it in a single Anthropic
// named SSE frame of the form "event: <eventType>\ndata: <json>\n\n".
func (e *StreamEncoder) marshalEvent(eventType string, payload map[string]any) [][]byte {
	data, err := json.Marshal(payload)
	if err != nil {
		// The method has no error return, so an unmarshalable payload is
		// dropped: the caller sees nil and emits no frame for this event.
		return nil
	}
	return [][]byte{[]byte(fmt.Sprintf("event: %s\ndata: %s\n\n", eventType, data))}
}