1
0
Files
nex/backend/internal/conversion/anthropic/stream_decoder_test.go
lanyuanxiaoyao bc1ee612d9 refactor: 实现 ConversionEngine 协议转换引擎,替代旧 protocol 包
- 新增 ConversionEngine 核心引擎,支持 OpenAI 和 Anthropic 协议转换
- 添加 stream decoder/encoder 实现
- 更新 provider client 支持新引擎
- 补充单元测试和集成测试
- 更新 specs 文档
2026-04-20 13:02:28 +08:00

490 lines
13 KiB
Go

package anthropic
import (
"encoding/json"
"fmt"
"testing"
"nex/backend/internal/conversion/canonical"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// makeAnthropicEvent builds a raw SSE chunk in the Anthropic wire format
// ("event: <type>\ndata: <json>\n\n") from the given event type and a
// JSON-serializable payload.
func makeAnthropicEvent(eventType string, data any) []byte {
	dataBytes, err := json.Marshal(data)
	if err != nil {
		// Test fixtures must always be serializable; fail loudly instead of
		// silently emitting an empty data payload.
		panic(fmt.Sprintf("makeAnthropicEvent: marshal %T: %v", data, err))
	}
	return []byte(fmt.Sprintf("event: %s\ndata: %s\n\n", eventType, dataBytes))
}
// TestStreamDecoder_MessageStart verifies that a message_start event is
// decoded into a canonical EventMessageStart carrying the message ID and model.
func TestStreamDecoder_MessageStart(t *testing.T) {
	dec := NewStreamDecoder()
	chunk := makeAnthropicEvent("message_start", map[string]any{
		"type": "message_start",
		"message": map[string]any{
			"id":    "msg_1",
			"model": "claude-3",
			"usage": map[string]any{"input_tokens": 10, "output_tokens": 0},
		},
	})
	evs := dec.ProcessChunk(chunk)
	require.NotEmpty(t, evs)
	first := evs[0]
	assert.Equal(t, canonical.EventMessageStart, first.Type)
	assert.Equal(t, "msg_1", first.Message.ID)
	assert.Equal(t, "claude-3", first.Message.Model)
}
// TestStreamDecoder_ContentBlockDelta covers the delta payload variants
// (text_delta, input_json_delta, thinking_delta) carried by a
// content_block_delta event, checking that each one is mapped onto the
// corresponding canonical Delta field.
func TestStreamDecoder_ContentBlockDelta(t *testing.T) {
	d := NewStreamDecoder()
	tests := []struct {
		name       string // subtest name; matches the delta "type" in deltaData
		deltaData  map[string]any
		checkField string
		checkValue string
	}{
		{
			name:       "text_delta",
			deltaData:  map[string]any{"type": "text_delta", "text": "你好"},
			checkField: "text",
			checkValue: "你好",
		},
		{
			name:       "input_json_delta",
			deltaData:  map[string]any{"type": "input_json_delta", "partial_json": "{\"key\":"},
			checkField: "partial_json",
			checkValue: "{\"key\":",
		},
		{
			name:       "thinking_delta",
			deltaData:  map[string]any{"type": "thinking_delta", "thinking": "思考中"},
			checkField: "thinking",
			checkValue: "思考中",
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			payload := map[string]any{
				"type":  "content_block_delta",
				"index": 0,
				"delta": tt.deltaData,
			}
			raw := makeAnthropicEvent("content_block_delta", payload)
			events := d.ProcessChunk(raw)
			require.NotEmpty(t, events)
			assert.Equal(t, canonical.EventContentBlockDelta, events[0].Type)
			assert.NotNil(t, events[0].Delta)
			// Dispatch on which canonical field this delta variant populates.
			switch tt.checkField {
			case "text":
				assert.Equal(t, tt.checkValue, events[0].Delta.Text)
			case "partial_json":
				assert.Equal(t, tt.checkValue, events[0].Delta.PartialJSON)
			case "thinking":
				assert.Equal(t, tt.checkValue, events[0].Delta.Thinking)
			}
		})
	}
}
// TestStreamDecoder_RedactedThinking verifies that a content_block_start for a
// redacted_thinking block is suppressed (no event emitted) and that the block
// index is recorded in redactedBlocks.
func TestStreamDecoder_RedactedThinking(t *testing.T) {
	dec := NewStreamDecoder()
	chunk := makeAnthropicEvent("content_block_start", map[string]any{
		"type":  "content_block_start",
		"index": 1,
		"content_block": map[string]any{
			"type": "redacted_thinking",
			"data": "redacted-data",
		},
	})
	evs := dec.ProcessChunk(chunk)
	assert.Empty(t, evs)
	assert.True(t, dec.redactedBlocks[1])
}
// TestStreamDecoder_RedactedBlockStopSuppressed verifies that a
// content_block_stop for a block previously marked redacted emits nothing and
// also removes the index from the redacted-block bookkeeping.
func TestStreamDecoder_RedactedBlockStopSuppressed(t *testing.T) {
	dec := NewStreamDecoder()
	dec.redactedBlocks[2] = true
	chunk := makeAnthropicEvent("content_block_stop", map[string]any{
		"type":  "content_block_stop",
		"index": 2,
	})
	evs := dec.ProcessChunk(chunk)
	assert.Empty(t, evs)
	// The decoder should have dropped the bookkeeping entry for index 2.
	_, stillTracked := dec.redactedBlocks[2]
	assert.False(t, stillTracked)
}
// TestStreamDecoder_ContentBlockStart verifies that a plain text
// content_block_start is decoded with the block type and index preserved.
func TestStreamDecoder_ContentBlockStart(t *testing.T) {
	dec := NewStreamDecoder()
	chunk := makeAnthropicEvent("content_block_start", map[string]any{
		"type":  "content_block_start",
		"index": 0,
		"content_block": map[string]any{
			"type": "text",
			"text": "",
		},
	})
	evs := dec.ProcessChunk(chunk)
	require.Len(t, evs, 1)
	ev := evs[0]
	assert.Equal(t, canonical.EventContentBlockStart, ev.Type)
	require.NotNil(t, ev.ContentBlock)
	assert.Equal(t, "text", ev.ContentBlock.Type)
	require.NotNil(t, ev.Index)
	assert.Equal(t, 0, *ev.Index)
}
// TestStreamDecoder_ContentBlockStart_ToolUse verifies that a tool_use
// content_block_start carries through the tool ID and name.
func TestStreamDecoder_ContentBlockStart_ToolUse(t *testing.T) {
	dec := NewStreamDecoder()
	chunk := makeAnthropicEvent("content_block_start", map[string]any{
		"type":  "content_block_start",
		"index": 1,
		"content_block": map[string]any{
			"type": "tool_use",
			"id":   "toolu_1",
			"name": "search",
		},
	})
	evs := dec.ProcessChunk(chunk)
	require.Len(t, evs, 1)
	ev := evs[0]
	assert.Equal(t, canonical.EventContentBlockStart, ev.Type)
	require.NotNil(t, ev.ContentBlock)
	assert.Equal(t, "tool_use", ev.ContentBlock.Type)
	assert.Equal(t, "toolu_1", ev.ContentBlock.ID)
	assert.Equal(t, "search", ev.ContentBlock.Name)
}
// TestStreamDecoder_ContentBlockStop verifies that a content_block_stop for a
// non-redacted block is decoded with its index intact.
func TestStreamDecoder_ContentBlockStop(t *testing.T) {
	dec := NewStreamDecoder()
	chunk := makeAnthropicEvent("content_block_stop", map[string]any{
		"type":  "content_block_stop",
		"index": 0,
	})
	evs := dec.ProcessChunk(chunk)
	require.Len(t, evs, 1)
	assert.Equal(t, canonical.EventContentBlockStop, evs[0].Type)
	require.NotNil(t, evs[0].Index)
	assert.Equal(t, 0, *evs[0].Index)
}
// TestStreamDecoder_MessageDelta verifies that a message_delta event yields
// the canonical stop reason and the usage counters from the payload.
func TestStreamDecoder_MessageDelta(t *testing.T) {
	dec := NewStreamDecoder()
	chunk := makeAnthropicEvent("message_delta", map[string]any{
		"type": "message_delta",
		"delta": map[string]any{
			"stop_reason": "end_turn",
		},
		"usage": map[string]any{
			"output_tokens": 42,
		},
	})
	evs := dec.ProcessChunk(chunk)
	require.Len(t, evs, 1)
	ev := evs[0]
	assert.Equal(t, canonical.EventMessageDelta, ev.Type)
	require.NotNil(t, ev.StopReason)
	assert.Equal(t, canonical.StopReasonEndTurn, *ev.StopReason)
	require.NotNil(t, ev.Usage)
	assert.Equal(t, 42, ev.Usage.OutputTokens)
}
// TestStreamDecoder_MessageStop verifies that message_stop maps to
// canonical.EventMessageStop.
func TestStreamDecoder_MessageStop(t *testing.T) {
	dec := NewStreamDecoder()
	evs := dec.ProcessChunk(makeAnthropicEvent("message_stop", map[string]any{"type": "message_stop"}))
	require.Len(t, evs, 1)
	assert.Equal(t, canonical.EventMessageStop, evs[0].Type)
}
// TestStreamDecoder_Ping verifies that ping maps to canonical.EventPing.
func TestStreamDecoder_Ping(t *testing.T) {
	dec := NewStreamDecoder()
	evs := dec.ProcessChunk(makeAnthropicEvent("ping", map[string]any{"type": "ping"}))
	require.Len(t, evs, 1)
	assert.Equal(t, canonical.EventPing, evs[0].Type)
}
// TestStreamDecoder_Error verifies that an error event is decoded into a
// canonical error event preserving the provider's error type and message.
func TestStreamDecoder_Error(t *testing.T) {
	dec := NewStreamDecoder()
	chunk := makeAnthropicEvent("error", map[string]any{
		"type": "error",
		"error": map[string]any{
			"type":    "overloaded_error",
			"message": "服务过载",
		},
	})
	evs := dec.ProcessChunk(chunk)
	require.Len(t, evs, 1)
	assert.Equal(t, canonical.EventError, evs[0].Type)
	require.NotNil(t, evs[0].Error)
	assert.Equal(t, "overloaded_error", evs[0].Error.Type)
	assert.Equal(t, "服务过载", evs[0].Error.Message)
}
// TestStreamDecoder_RedactedDeltaSuppressed verifies that a
// content_block_delta targeting a redacted block index emits no events.
func TestStreamDecoder_RedactedDeltaSuppressed(t *testing.T) {
	dec := NewStreamDecoder()
	dec.redactedBlocks[1] = true
	chunk := makeAnthropicEvent("content_block_delta", map[string]any{
		"type":  "content_block_delta",
		"index": 1,
		"delta": map[string]any{
			"type": "text_delta",
			"text": "被抑制的内容",
		},
	})
	assert.Empty(t, dec.ProcessChunk(chunk))
}
// TestStreamDecoder_ServerToolUse_Suppressed verifies that a server_tool_use
// block start is suppressed and its index marked as redacted.
func TestStreamDecoder_ServerToolUse_Suppressed(t *testing.T) {
	dec := NewStreamDecoder()
	chunk := makeAnthropicEvent("content_block_start", map[string]any{
		"type":  "content_block_start",
		"index": 2,
		"content_block": map[string]any{
			"type": "server_tool_use",
			"id":   "server_tool_1",
			"name": "web_search",
		},
	})
	assert.Empty(t, dec.ProcessChunk(chunk))
	assert.True(t, dec.redactedBlocks[2])
}
// TestStreamDecoder_WebSearchToolResult_Suppressed verifies that a
// web_search_tool_result block start is suppressed and marked redacted.
func TestStreamDecoder_WebSearchToolResult_Suppressed(t *testing.T) {
	dec := NewStreamDecoder()
	chunk := makeAnthropicEvent("content_block_start", map[string]any{
		"type":  "content_block_start",
		"index": 3,
		"content_block": map[string]any{
			"type":        "web_search_tool_result",
			"tool_use_id": "search_1",
		},
	})
	assert.Empty(t, dec.ProcessChunk(chunk))
	assert.True(t, dec.redactedBlocks[3])
}
// TestStreamDecoder_CodeExecutionToolResult_Suppressed verifies that a
// code_execution_tool_result block start is suppressed and marked redacted.
func TestStreamDecoder_CodeExecutionToolResult_Suppressed(t *testing.T) {
	dec := NewStreamDecoder()
	chunk := makeAnthropicEvent("content_block_start", map[string]any{
		"type":  "content_block_start",
		"index": 4,
		"content_block": map[string]any{
			"type": "code_execution_tool_result",
		},
	})
	assert.Empty(t, dec.ProcessChunk(chunk))
	assert.True(t, dec.redactedBlocks[4])
}
// TestStreamDecoder_CitationsDelta_Discarded verifies that a citations_delta
// produces no canonical events.
func TestStreamDecoder_CitationsDelta_Discarded(t *testing.T) {
	dec := NewStreamDecoder()
	chunk := makeAnthropicEvent("content_block_delta", map[string]any{
		"type":  "content_block_delta",
		"index": 0,
		"delta": map[string]any{
			"type":     "citations_delta",
			"citation": map[string]any{"title": "ref1"},
		},
	})
	assert.Empty(t, dec.ProcessChunk(chunk))
}
// TestStreamDecoder_SignatureDelta_Discarded verifies that a signature_delta
// produces no canonical events.
func TestStreamDecoder_SignatureDelta_Discarded(t *testing.T) {
	dec := NewStreamDecoder()
	chunk := makeAnthropicEvent("content_block_delta", map[string]any{
		"type":  "content_block_delta",
		"index": 0,
		"delta": map[string]any{
			"type":      "signature_delta",
			"signature": "sig_123",
		},
	})
	assert.Empty(t, dec.ProcessChunk(chunk))
}
// TestStreamDecoder_UnknownEventType verifies that unrecognized event types
// are ignored rather than surfaced.
func TestStreamDecoder_UnknownEventType(t *testing.T) {
	dec := NewStreamDecoder()
	chunk := makeAnthropicEvent("unknown_event", map[string]any{"type": "unknown_event"})
	assert.Empty(t, dec.ProcessChunk(chunk))
}
// TestStreamDecoder_InvalidJSON verifies that a message_start event with a
// malformed JSON body is dropped without emitting events.
func TestStreamDecoder_InvalidJSON(t *testing.T) {
	dec := NewStreamDecoder()
	evs := dec.ProcessChunk([]byte("event: message_start\ndata: {invalid}\n\n"))
	assert.Empty(t, evs)
}
// TestStreamDecoder_MultipleEventsInSingleChunk verifies that several SSE
// events concatenated into one chunk are decoded, in order, into one
// canonical event each.
func TestStreamDecoder_MultipleEventsInSingleChunk(t *testing.T) {
	dec := NewStreamDecoder()
	pieces := [][]byte{
		makeAnthropicEvent("message_start", map[string]any{
			"type": "message_start",
			"message": map[string]any{
				"id":    "msg_multi",
				"model": "claude-3",
			},
		}),
		makeAnthropicEvent("content_block_delta", map[string]any{
			"type":  "content_block_delta",
			"index": 0,
			"delta": map[string]any{
				"type": "text_delta",
				"text": "Hello",
			},
		}),
		makeAnthropicEvent("message_stop", map[string]any{"type": "message_stop"}),
	}
	// Concatenate all three events into a single chunk.
	var stream []byte
	for _, p := range pieces {
		stream = append(stream, p...)
	}
	evs := dec.ProcessChunk(stream)
	require.Len(t, evs, 3)
	assert.Equal(t, canonical.EventMessageStart, evs[0].Type)
	assert.Equal(t, canonical.EventContentBlockDelta, evs[1].Type)
	assert.Equal(t, canonical.EventMessageStop, evs[2].Type)
}
// TestStreamDecoder_ErrorInvalidJSON verifies that an error event with a
// malformed JSON body still surfaces as a canonical error event describing
// the parse failure.
func TestStreamDecoder_ErrorInvalidJSON(t *testing.T) {
	dec := NewStreamDecoder()
	evs := dec.ProcessChunk([]byte("event: error\ndata: {invalid}\n\n"))
	require.Len(t, evs, 1)
	assert.Equal(t, canonical.EventError, evs[0].Type)
	assert.Contains(t, evs[0].Error.Message, "解析错误事件失败")
}
// TestStreamDecoder_MessageStartWithUsage verifies that input token usage on
// a message_start event is carried through to the canonical message.
func TestStreamDecoder_MessageStartWithUsage(t *testing.T) {
	dec := NewStreamDecoder()
	chunk := makeAnthropicEvent("message_start", map[string]any{
		"type": "message_start",
		"message": map[string]any{
			"id":    "msg_usage",
			"model": "claude-3",
			"usage": map[string]any{"input_tokens": 25, "output_tokens": 0},
		},
	})
	evs := dec.ProcessChunk(chunk)
	require.Len(t, evs, 1)
	assert.Equal(t, canonical.EventMessageStart, evs[0].Type)
	require.NotNil(t, evs[0].Message.Usage)
	assert.Equal(t, 25, evs[0].Message.Usage.InputTokens)
}
// TestStreamDecoder_ThinkingBlockStart verifies that a thinking block start
// (unlike redacted_thinking) is passed through as a content block start.
func TestStreamDecoder_ThinkingBlockStart(t *testing.T) {
	dec := NewStreamDecoder()
	chunk := makeAnthropicEvent("content_block_start", map[string]any{
		"type":  "content_block_start",
		"index": 0,
		"content_block": map[string]any{
			"type":     "thinking",
			"thinking": "",
		},
	})
	evs := dec.ProcessChunk(chunk)
	require.Len(t, evs, 1)
	assert.Equal(t, canonical.EventContentBlockStart, evs[0].Type)
	require.NotNil(t, evs[0].ContentBlock)
	assert.Equal(t, "thinking", evs[0].ContentBlock.Type)
}
// TestStreamDecoder_MessageDelta_UsageNotAccumulated verifies that each
// message_delta reports the provider's absolute output_tokens value:
// successive deltas replace the previous count rather than summing with it.
func TestStreamDecoder_MessageDelta_UsageNotAccumulated(t *testing.T) {
	dec := NewStreamDecoder()
	dec.ProcessChunk(makeAnthropicEvent("message_start", map[string]any{
		"type": "message_start",
		"message": map[string]any{
			"id":    "msg_usage_test",
			"model": "claude-3",
			"usage": map[string]any{"input_tokens": 10, "output_tokens": 0},
		},
	}))
	// mkDelta builds a message_delta chunk with the given output token count.
	mkDelta := func(outputTokens int) []byte {
		return makeAnthropicEvent("message_delta", map[string]any{
			"type":  "message_delta",
			"delta": map[string]any{"stop_reason": "end_turn"},
			"usage": map[string]any{"output_tokens": outputTokens},
		})
	}
	evs := dec.ProcessChunk(mkDelta(25))
	require.Len(t, evs, 1)
	assert.Equal(t, 25, evs[0].Usage.OutputTokens)
	evs = dec.ProcessChunk(mkDelta(30))
	require.Len(t, evs, 1)
	assert.Equal(t, 30, evs[0].Usage.OutputTokens, "output_tokens should be replaced, not accumulated")
	assert.Equal(t, 30, dec.accumulatedUsage.OutputTokens, "accumulated usage should match last value")
}