package anthropic

import (
	"bytes"
	"encoding/json"
	"fmt"

	"nex/backend/internal/conversion/canonical"
)

// StreamDecoder decodes an Anthropic Messages API server-sent-event (SSE) stream into
// canonical stream events.
//
// The decoder is stateful: use one instance per stream, feed it raw bytes through
// ProcessChunk in arrival order, and call Flush when the stream ends.
type StreamDecoder struct {
	// messageStarted is set once a message_start event has been decoded.
	messageStarted bool
	// redactedBlocks holds the indices of content blocks whose start event was dropped,
	// so that their delta and stop events are dropped as well.
	redactedBlocks map[int]bool
	// lineBuf holds the unterminated tail of the stream (the bytes after the last '\n')
	// until the rest of that line arrives. Lines are only split at '\n', a byte that never
	// occurs inside a multi-byte UTF-8 sequence, so runes cut in half by a chunk boundary
	// are reassembled here as well.
	lineBuf []byte
	// pendingEventType is the value of the last "event:" line not yet paired with a
	// "data:" line. It survives ProcessChunk calls so an event whose lines arrive in
	// different chunks is still decoded.
	pendingEventType string
	// accumulatedUsage is a private copy of the usage reported by message_start, with
	// OutputTokens overwritten by each message_delta.
	accumulatedUsage *canonical.CanonicalUsage
}

// NewStreamDecoder creates an Anthropic stream decoder ready to receive chunks.
func NewStreamDecoder() *StreamDecoder {
	return &StreamDecoder{
		redactedBlocks: make(map[int]bool),
	}
}

// ProcessChunk feeds one raw chunk of the SSE byte stream into the decoder and returns
// the canonical events completed by it.
//
// Chunk boundaries may fall anywhere, including inside a line or inside a multi-byte
// UTF-8 sequence: complete lines are processed immediately and the unterminated tail is
// kept for the next call (or for Flush). As a special case, a trailing "data:" line whose
// payload is already a complete JSON object is processed right away, so callers that pass
// whole SSE events without a final newline get their events without waiting.
func (d *StreamDecoder) ProcessChunk(rawChunk []byte) []canonical.CanonicalStreamEvent {
	d.lineBuf = append(d.lineBuf, rawChunk...)

	var events []canonical.CanonicalStreamEvent
	rest := d.lineBuf
	for {
		nl := bytes.IndexByte(rest, '\n')
		if nl < 0 {
			break
		}
		events = append(events, d.processLine(rest[:nl])...)
		rest = rest[nl+1:]
	}

	if isCompleteSSEDataLine(rest) {
		events = append(events, d.processLine(rest)...)
		rest = nil
	}

	// Move the unterminated tail to the front of the buffer, reusing its storage.
	// copy is defined for overlapping slices, and nothing decoded above keeps a
	// reference into lineBuf (JSON decoding and string conversions copy the bytes).
	n := copy(d.lineBuf, rest)
	d.lineBuf = d.lineBuf[:n]
	return events
}

// processLine handles one SSE line without its terminating '\n'. Only the "event" and
// "data" fields are used; comment lines (starting with ':') and other fields are ignored.
// Each "data" line is dispatched immediately together with the pending event type.
func (d *StreamDecoder) processLine(line []byte) []canonical.CanonicalStreamEvent {
	line = bytes.TrimSuffix(line, []byte("\r"))
	if len(line) == 0 {
		// A blank line terminates the SSE event; forget its type so it cannot be
		// attached to a later, untyped data line.
		d.pendingEventType = ""
		return nil
	}

	field, value := cutSSEField(line)
	switch field {
	case "event":
		d.pendingEventType = string(value)
	case "data":
		eventType := d.pendingEventType
		d.pendingEventType = ""
		if eventType != "" && len(value) > 0 {
			return d.processEvent(eventType, value)
		}
	}
	return nil
}

// cutSSEField splits an SSE line into its field name and value. As in the SSE
// specification, a single space directly after the colon is not part of the value, and
// a line without a colon is a field name with an empty value.
func cutSSEField(line []byte) (string, []byte) {
	name, value, _ := bytes.Cut(line, []byte(":"))
	return string(name), bytes.TrimPrefix(value, []byte(" "))
}

// isCompleteSSEDataLine reports whether an unterminated line is a "data:" line whose
// payload is already a complete JSON object, so it can be processed without its newline.
func isCompleteSSEDataLine(line []byte) bool {
	field, value := cutSSEField(bytes.TrimSuffix(line, []byte("\r")))
	// The handlers below decode every payload as a JSON object; checking the final byte
	// first avoids re-validating a long, still-incomplete line on every chunk.
	return field == "data" &&
		len(value) > 0 && value[len(value)-1] == '}' &&
		json.Valid(value)
}

// Flush finishes the stream. Bytes still buffered as an unterminated line are processed
// as a final line; afterwards the line buffer and the pending event type are cleared.
func (d *StreamDecoder) Flush() []canonical.CanonicalStreamEvent {
	var events []canonical.CanonicalStreamEvent
	if len(d.lineBuf) > 0 {
		events = d.processLine(d.lineBuf)
	}
	d.lineBuf = nil
	d.pendingEventType = ""
	return events
}

// processEvent dispatches a single named SSE event to its handler. Unknown event names
// produce no events.
func (d *StreamDecoder) processEvent(eventType string, data []byte) []canonical.CanonicalStreamEvent {
	switch eventType {
	case "message_start":
		return d.processMessageStart(data)
	case "content_block_start":
		return d.processContentBlockStart(data)
	case "content_block_delta":
		return d.processContentBlockDelta(data)
	case "content_block_stop":
		return d.processContentBlockStop(data)
	case "message_delta":
		return d.processMessageDelta(data)
	case "message_stop":
		return d.processMessageStop(data)
	case "ping":
		return []canonical.CanonicalStreamEvent{canonical.NewPingEvent()}
	case "error":
		return d.processError(data)
	}
	return nil
}

// processMessageStart decodes a message_start event (message id, model and optional
// usage). Malformed JSON yields no events. When usage is present, the emitted event
// carries it and a separate copy is stored in accumulatedUsage, so later message_delta
// updates never modify the usage held by the already-emitted event.
func (d *StreamDecoder) processMessageStart(data []byte) []canonical.CanonicalStreamEvent {
	var raw struct {
		Message struct {
			ID    string `json:"id"`
			Model string `json:"model"`
			Usage *struct {
				InputTokens  int `json:"input_tokens"`
				OutputTokens int `json:"output_tokens"`
			} `json:"usage"`
		} `json:"message"`
	}
	if err := json.Unmarshal(data, &raw); err != nil {
		return nil
	}
	msg := raw.Message

	var event canonical.CanonicalStreamEvent
	if msg.Usage != nil {
		usage := &canonical.CanonicalUsage{
			InputTokens:  msg.Usage.InputTokens,
			OutputTokens: msg.Usage.OutputTokens,
		}
		event = canonical.NewMessageStartEventWithUsage(msg.ID, msg.Model, usage)
		accumulated := *usage
		d.accumulatedUsage = &accumulated
	} else {
		event = canonical.NewMessageStartEvent(msg.ID, msg.Model)
	}
	d.messageStarted = true
	return []canonical.CanonicalStreamEvent{event}
}

// processContentBlockStart decodes a content_block_start event. Block types that have
// no canonical representation are dropped and their index is remembered, so the
// matching delta and stop events are dropped too.
func (d *StreamDecoder) processContentBlockStart(data []byte) []canonical.CanonicalStreamEvent {
	var raw struct {
		Index        int `json:"index"`
		ContentBlock struct {
			Type     string `json:"type"`
			Text     string `json:"text"`
			ID       string `json:"id"`
			Name     string `json:"name"`
			Thinking string `json:"thinking"`
		} `json:"content_block"`
	}
	if err := json.Unmarshal(data, &raw); err != nil {
		return nil
	}

	// Block types that are discarded.
	switch raw.ContentBlock.Type {
	case "redacted_thinking", "server_tool_use", "web_search_tool_result", "code_execution_tool_result":
		d.redactedBlocks[raw.Index] = true
		return nil
	}
	if d.redactedBlocks[raw.Index] {
		return nil
	}

	block := canonical.StreamContentBlock{
		Type:     raw.ContentBlock.Type,
		Text:     raw.ContentBlock.Text,
		ID:       raw.ContentBlock.ID,
		Name:     raw.ContentBlock.Name,
		Thinking: raw.ContentBlock.Thinking,
	}
	return []canonical.CanonicalStreamEvent{
		canonical.NewContentBlockStartEvent(raw.Index, block),
	}
}

// processContentBlockDelta decodes a content_block_delta event. Deltas for dropped
// blocks, and the protocol-specific citations_delta / signature_delta types, are
// discarded.
func (d *StreamDecoder) processContentBlockDelta(data []byte) []canonical.CanonicalStreamEvent {
	var raw struct {
		Index int `json:"index"`
		Delta struct {
			Type        string `json:"type"`
			Text        string `json:"text"`
			PartialJSON string `json:"partial_json"`
			Thinking    string `json:"thinking"`
		} `json:"delta"`
	}
	if err := json.Unmarshal(data, &raw); err != nil {
		return nil
	}

	if d.redactedBlocks[raw.Index] {
		return nil
	}

	switch raw.Delta.Type {
	case "citations_delta", "signature_delta":
		return nil
	}

	delta := canonical.StreamDelta{
		Type:        raw.Delta.Type,
		Text:        raw.Delta.Text,
		PartialJSON: raw.Delta.PartialJSON,
		Thinking:    raw.Delta.Thinking,
	}
	return []canonical.CanonicalStreamEvent{
		canonical.NewContentBlockDeltaEvent(raw.Index, delta),
	}
}

// processContentBlockStop decodes a content_block_stop event. For a dropped block the
// stop event is dropped as well and the index is forgotten.
func (d *StreamDecoder) processContentBlockStop(data []byte) []canonical.CanonicalStreamEvent {
	var raw struct {
		Index int `json:"index"`
	}
	if err := json.Unmarshal(data, &raw); err != nil {
		return nil
	}

	if _, redacted := d.redactedBlocks[raw.Index]; redacted {
		delete(d.redactedBlocks, raw.Index)
		return nil
	}

	return []canonical.CanonicalStreamEvent{
		canonical.NewContentBlockStopEvent(raw.Index),
	}
}

// processMessageDelta decodes a message_delta event into a canonical delta carrying the
// mapped stop reason and the reported output token count. It also updates the
// accumulated usage, when one was recorded by message_start.
func (d *StreamDecoder) processMessageDelta(data []byte) []canonical.CanonicalStreamEvent {
	var raw struct {
		Delta struct {
			StopReason string `json:"stop_reason"`
		} `json:"delta"`
		Usage struct {
			OutputTokens int `json:"output_tokens"`
		} `json:"usage"`
	}
	if err := json.Unmarshal(data, &raw); err != nil {
		return nil
	}

	sr := mapStopReason(raw.Delta.StopReason)
	usage := &canonical.CanonicalUsage{
		OutputTokens: raw.Usage.OutputTokens,
	}
	if d.accumulatedUsage != nil {
		d.accumulatedUsage.OutputTokens = raw.Usage.OutputTokens
	}

	return []canonical.CanonicalStreamEvent{
		canonical.NewMessageDeltaEventWithUsage(sr, usage),
	}
}

// processMessageStop emits the canonical message-stop event; the payload is not used.
func (d *StreamDecoder) processMessageStop(data []byte) []canonical.CanonicalStreamEvent {
	return []canonical.CanonicalStreamEvent{canonical.NewMessageStopEvent()}
}

// processError converts an error event into a canonical error event. If the payload
// cannot be decoded, a "stream_error" event containing the raw payload is emitted.
func (d *StreamDecoder) processError(data []byte) []canonical.CanonicalStreamEvent {
	var raw struct {
		Error struct {
			Type    string `json:"type"`
			Message string `json:"message"`
		} `json:"error"`
	}
	if err := json.Unmarshal(data, &raw); err != nil {
		return []canonical.CanonicalStreamEvent{
			canonical.NewErrorEvent("stream_error", fmt.Sprintf("解析错误事件失败: %s", string(data))),
		}
	}

	return []canonical.CanonicalStreamEvent{
		canonical.NewErrorEvent(raw.Error.Type, raw.Error.Message),
	}
}