1
0
Files
nex/backend/internal/conversion/anthropic/stream_decoder.go
lanyuanxiaoyao 1dac347d3b refactor: 实现 ConversionEngine 协议转换引擎,替代旧 protocol 包
引入 Canonical Model 和 ProtocolAdapter 架构,支持 OpenAI/Anthropic 协议间
无缝转换,统一 ProxyHandler 替代分散的 OpenAI/Anthropic Handler,简化
ProviderClient 为协议无关的 HTTP 发送器,Provider 新增 protocol 字段。
2026-04-20 00:36:27 +08:00

284 lines
7.3 KiB
Go

package anthropic
import (
"encoding/json"
"fmt"
"strings"
"unicode/utf8"
"nex/backend/internal/conversion/canonical"
)
// StreamDecoder decodes an Anthropic server-sent-event stream into canonical
// stream events. It carries state between ProcessChunk calls, so one decoder
// should be used per stream.
type StreamDecoder struct {
// messageStarted is set to true once a message_start event has been handled.
messageStarted bool
// redactedBlocks holds the indices of content blocks whose events are being
// dropped; an entry is added when such a block starts and removed when the
// block stops.
redactedBlocks map[int]bool
// utf8Remainder holds trailing bytes of the previous chunk that did not form
// valid UTF-8; they are prepended to the next chunk.
utf8Remainder []byte
// accumulatedUsage is the usage reported by message_start (nil if none was
// reported); its output token count is updated by message_delta events.
accumulatedUsage *canonical.CanonicalUsage
}
// NewStreamDecoder returns a StreamDecoder with an empty set of dropped
// content-block indices and no buffered state.
func NewStreamDecoder() *StreamDecoder {
	decoder := new(StreamDecoder)
	decoder.redactedBlocks = map[int]bool{}
	return decoder
}
// ProcessChunk decodes one raw chunk of an Anthropic SSE stream and returns
// the canonical events it contains, in order.
//
// Bytes of a multi-byte UTF-8 character cut off at the end of the chunk are
// held back and prepended to the next chunk. Bytes that are simply invalid
// are passed through instead of being buffered, so a single bad byte cannot
// stall the stream.
//
// An event is dispatched when its data line is seen. The event name is not
// retained between calls, so an "event:" line and its "data:" line must
// arrive in the same chunk. Data lines without a preceding event name, and
// data lines with an empty payload, are ignored.
func (d *StreamDecoder) ProcessChunk(rawChunk []byte) []canonical.CanonicalStreamEvent {
	data := rawChunk
	if len(d.utf8Remainder) > 0 {
		data = append(d.utf8Remainder, rawChunk...)
		d.utf8Remainder = nil
	}

	data, tail := splitTrailingPartialRune(data)
	if len(tail) > 0 {
		// Copy the tail: it may alias the caller's buffer.
		d.utf8Remainder = append([]byte(nil), tail...)
	}

	var events []canonical.CanonicalStreamEvent
	var eventType string
	for _, line := range strings.Split(string(data), "\n") {
		line = strings.TrimRight(line, "\r")

		if name, ok := sseFieldValue(line, "event"); ok {
			eventType = name
			continue
		}

		payload, ok := sseFieldValue(line, "data")
		if !ok {
			// Blank separators, comments and other fields carry nothing we need.
			continue
		}
		if eventType != "" && payload != "" {
			events = append(events, d.processEvent(eventType, []byte(payload))...)
		}
		// A data line always terminates the current named event.
		eventType = ""
	}
	return events
}

// splitTrailingPartialRune splits data into the part that can be decoded now
// and a trailing, not yet complete UTF-8 sequence (nil if there is none).
// Only the last utf8.UTFMax-1 bytes are inspected, because an incomplete
// encoding can never be longer than that.
func splitTrailingPartialRune(data []byte) (complete, tail []byte) {
	for back := 1; back < utf8.UTFMax && back <= len(data); back++ {
		start := len(data) - back
		if !utf8.RuneStart(data[start]) {
			continue
		}
		if utf8.FullRune(data[start:]) {
			return data, nil
		}
		return data[:start], data[start:]
	}
	return data, nil
}

// sseFieldValue reports whether line is an SSE "<field>:" line and returns
// its value with a single optional leading space removed.
func sseFieldValue(line, field string) (string, bool) {
	prefix := field + ":"
	if !strings.HasPrefix(line, prefix) {
		return "", false
	}
	return strings.TrimPrefix(line[len(prefix):], " "), true
}
// Flush is called to drain the decoder. This decoder never has events
// pending, so it always returns nil; any buffered partial UTF-8 bytes are
// left untouched.
func (d *StreamDecoder) Flush() []canonical.CanonicalStreamEvent {
	var pending []canonical.CanonicalStreamEvent
	return pending
}
// processEvent routes one named SSE event to the handler for its type and
// returns that handler's canonical events. A "ping" is converted directly;
// event types without a handler yield nil.
func (d *StreamDecoder) processEvent(eventType string, data []byte) []canonical.CanonicalStreamEvent {
	if eventType == "ping" {
		return []canonical.CanonicalStreamEvent{canonical.NewPingEvent()}
	}

	var handle func([]byte) []canonical.CanonicalStreamEvent
	switch eventType {
	case "message_start":
		handle = d.processMessageStart
	case "content_block_start":
		handle = d.processContentBlockStart
	case "content_block_delta":
		handle = d.processContentBlockDelta
	case "content_block_stop":
		handle = d.processContentBlockStop
	case "message_delta":
		handle = d.processMessageDelta
	case "message_stop":
		handle = d.processMessageStop
	case "error":
		handle = d.processError
	default:
		return nil
	}
	return handle(data)
}
// processMessageStart handles a message_start event. It returns a single
// message-start event carrying the message id and model, plus the reported
// usage when the payload has one, and marks the message as started.
// Malformed JSON yields nil.
//
// The usage kept on the decoder is a separate value from the one attached to
// the returned event, so later updates to the decoder's running total do not
// alter an event that has already been emitted.
func (d *StreamDecoder) processMessageStart(data []byte) []canonical.CanonicalStreamEvent {
	var payload struct {
		Message struct {
			ID    string `json:"id"`
			Model string `json:"model"`
			Usage *struct {
				InputTokens  int `json:"input_tokens"`
				OutputTokens int `json:"output_tokens"`
			} `json:"usage"`
		} `json:"message"`
	}
	if err := json.Unmarshal(data, &payload); err != nil {
		return nil
	}
	msg := payload.Message

	d.messageStarted = true
	if msg.Usage == nil {
		return []canonical.CanonicalStreamEvent{
			canonical.NewMessageStartEvent(msg.ID, msg.Model),
		}
	}

	// Decoder-owned running total; deliberately not shared with the event.
	d.accumulatedUsage = &canonical.CanonicalUsage{
		InputTokens:  msg.Usage.InputTokens,
		OutputTokens: msg.Usage.OutputTokens,
	}
	eventUsage := &canonical.CanonicalUsage{
		InputTokens:  msg.Usage.InputTokens,
		OutputTokens: msg.Usage.OutputTokens,
	}
	return []canonical.CanonicalStreamEvent{
		canonical.NewMessageStartEventWithUsage(msg.ID, msg.Model, eventUsage),
	}
}
// processContentBlockStart handles a content_block_start event.
//
// Blocks of type redacted_thinking, server_tool_use, web_search_tool_result
// and code_execution_tool_result are dropped: their index is recorded so the
// following delta and stop events for that index are dropped too, and nil is
// returned. Any other block is returned as a single content-block-start
// event. Malformed JSON yields nil.
func (d *StreamDecoder) processContentBlockStart(data []byte) []canonical.CanonicalStreamEvent {
	var raw struct {
		Index        int `json:"index"`
		ContentBlock struct {
			Type     string `json:"type"`
			Text     string `json:"text"`
			ID       string `json:"id"`
			Name     string `json:"name"`
			Thinking string `json:"thinking"`
			Data     string `json:"data"`
		} `json:"content_block"`
	}
	if err := json.Unmarshal(data, &raw); err != nil {
		return nil
	}

	// Block types that are not forwarded.
	switch raw.ContentBlock.Type {
	case "redacted_thinking", "server_tool_use", "web_search_tool_result",
		"code_execution_tool_result":
		// Allocate lazily so a zero-value StreamDecoder does not panic on
		// the map write below.
		if d.redactedBlocks == nil {
			d.redactedBlocks = make(map[int]bool)
		}
		d.redactedBlocks[raw.Index] = true
		return nil
	}

	// The index is still marked as dropped from an earlier start event.
	if d.redactedBlocks[raw.Index] {
		return nil
	}

	block := canonical.StreamContentBlock{
		Type:     raw.ContentBlock.Type,
		Text:     raw.ContentBlock.Text,
		ID:       raw.ContentBlock.ID,
		Name:     raw.ContentBlock.Name,
		Thinking: raw.ContentBlock.Thinking,
	}
	return []canonical.CanonicalStreamEvent{
		canonical.NewContentBlockStartEvent(raw.Index, block),
	}
}
// processContentBlockDelta handles a content_block_delta event and returns
// it as a single content-block-delta event. It returns nil when the JSON is
// malformed, when the block index is currently being dropped, or when the
// delta type is citations_delta or signature_delta.
func (d *StreamDecoder) processContentBlockDelta(data []byte) []canonical.CanonicalStreamEvent {
	var payload struct {
		Index int `json:"index"`
		Delta struct {
			Type        string `json:"type"`
			Text        string `json:"text"`
			PartialJSON string `json:"partial_json"`
			Thinking    string `json:"thinking"`
		} `json:"delta"`
	}
	if json.Unmarshal(data, &payload) != nil {
		return nil
	}

	deltaType := payload.Delta.Type
	dropped := d.redactedBlocks[payload.Index] ||
		deltaType == "citations_delta" ||
		deltaType == "signature_delta"
	if dropped {
		return nil
	}

	event := canonical.NewContentBlockDeltaEvent(payload.Index, canonical.StreamDelta{
		Type:        deltaType,
		Text:        payload.Delta.Text,
		PartialJSON: payload.Delta.PartialJSON,
		Thinking:    payload.Delta.Thinking,
	})
	return []canonical.CanonicalStreamEvent{event}
}
// processContentBlockStop handles a content_block_stop event. If the block
// index is marked as dropped, the mark is removed and nil is returned;
// otherwise a single content-block-stop event is returned. Malformed JSON
// yields nil.
func (d *StreamDecoder) processContentBlockStop(data []byte) []canonical.CanonicalStreamEvent {
	var payload struct {
		Index int `json:"index"`
	}
	if json.Unmarshal(data, &payload) != nil {
		return nil
	}

	idx := payload.Index
	if _, dropped := d.redactedBlocks[idx]; !dropped {
		stop := canonical.NewContentBlockStopEvent(idx)
		return []canonical.CanonicalStreamEvent{stop}
	}

	// The dropped block is finished; stop tracking its index.
	delete(d.redactedBlocks, idx)
	return nil
}
// processMessageDelta handles a message_delta event. It returns a single
// message-delta event carrying the mapped stop reason and the output token
// count reported by the event (zero when the event has no usage object).
// Malformed JSON yields nil.
//
// The output_tokens value in message_delta is cumulative for the message, so
// the decoder's running usage is overwritten with it rather than incremented.
// An event without a usage object leaves the running usage unchanged.
func (d *StreamDecoder) processMessageDelta(data []byte) []canonical.CanonicalStreamEvent {
	var raw struct {
		Delta struct {
			StopReason string `json:"stop_reason"`
		} `json:"delta"`
		Usage *struct {
			OutputTokens int `json:"output_tokens"`
		} `json:"usage"`
	}
	if err := json.Unmarshal(data, &raw); err != nil {
		return nil
	}

	outputTokens := 0
	if raw.Usage != nil {
		outputTokens = raw.Usage.OutputTokens
		if d.accumulatedUsage != nil {
			// Cumulative count: replace, do not add.
			d.accumulatedUsage.OutputTokens = outputTokens
		}
	}

	sr := mapStopReason(raw.Delta.StopReason)
	usage := &canonical.CanonicalUsage{
		OutputTokens: outputTokens,
	}
	return []canonical.CanonicalStreamEvent{
		canonical.NewMessageDeltaEventWithUsage(sr, usage),
	}
}
// processMessageStop handles a message_stop event. The payload is not
// inspected; a single message-stop event is always returned.
func (d *StreamDecoder) processMessageStop(data []byte) []canonical.CanonicalStreamEvent {
	stop := canonical.NewMessageStopEvent()
	return []canonical.CanonicalStreamEvent{stop}
}
// processError handles an error event and returns a single error event.
// When the payload parses, the event carries the upstream error type and
// message. When it does not, the event has type "stream_error" and a message
// that embeds the raw payload.
func (d *StreamDecoder) processError(data []byte) []canonical.CanonicalStreamEvent {
	var payload struct {
		Error struct {
			Type    string `json:"type"`
			Message string `json:"message"`
		} `json:"error"`
	}

	var errType, errMessage string
	if err := json.Unmarshal(data, &payload); err != nil {
		errType = "stream_error"
		errMessage = fmt.Sprintf("解析错误事件失败: %s", string(data))
	} else {
		errType = payload.Error.Type
		errMessage = payload.Error.Message
	}

	return []canonical.CanonicalStreamEvent{
		canonical.NewErrorEvent(errType, errMessage),
	}
}