1
0
Files
nex/backend/internal/conversion/openai/stream_decoder_test.go
lanyuanxiaoyao bc1ee612d9 refactor: 实现 ConversionEngine 协议转换引擎,替代旧 protocol 包
- 新增 ConversionEngine 核心引擎,支持 OpenAI 和 Anthropic 协议转换
- 添加 stream decoder/encoder 实现
- 更新 provider client 支持新引擎
- 补充单元测试和集成测试
- 更新 specs 文档
2026-04-20 13:02:28 +08:00

473 lines
10 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package openai
import (
"encoding/json"
"testing"
"nex/backend/internal/conversion/canonical"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// makeSSEData frames payload as a single Server-Sent Events "data:" line
// followed by the blank line that terminates the event.
func makeSSEData(payload string) []byte {
	const prefix, terminator = "data: ", "\n\n"
	frame := make([]byte, 0, len(prefix)+len(payload)+len(terminator))
	frame = append(frame, prefix...)
	frame = append(frame, payload...)
	return append(frame, terminator...)
}
// TestStreamDecoder_BasicText verifies that a single text chunk yields a
// message_start event carrying the upstream chunk ID and a text_delta
// content_block_delta event carrying the chunk's content.
func TestStreamDecoder_BasicText(t *testing.T) {
	d := NewStreamDecoder()
	chunk := map[string]any{
		"id":     "chatcmpl-1",
		"object": "chat.completion.chunk",
		"model":  "gpt-4",
		"choices": []any{
			map[string]any{
				"index": 0,
				"delta": map[string]any{"content": "你好"},
			},
		},
	}
	data, err := json.Marshal(chunk)
	require.NoError(t, err)
	raw := makeSSEData(string(data))
	events := d.ProcessChunk(raw)
	require.NotEmpty(t, events)
	foundStart := false
	foundDelta := false
	for _, e := range events {
		if e.Type == canonical.EventMessageStart {
			foundStart = true
			// Guard so a missing message fails the test instead of panicking.
			require.NotNil(t, e.Message)
			assert.Equal(t, "chatcmpl-1", e.Message.ID)
		}
		if e.Type == canonical.EventContentBlockDelta && e.Delta != nil {
			foundDelta = true
			assert.Equal(t, "text_delta", e.Delta.Type)
			assert.Equal(t, "你好", e.Delta.Text)
		}
	}
	assert.True(t, foundStart, "expected a message_start event")
	assert.True(t, foundDelta, "expected a content_block_delta event")
}
// TestStreamDecoder_ToolCalls verifies that a delta containing a tool_calls
// entry opens a tool_use content block carrying the call ID and the function
// name from that entry.
func TestStreamDecoder_ToolCalls(t *testing.T) {
	d := NewStreamDecoder()
	idx := 0
	chunk := map[string]any{
		"id":    "chatcmpl-1",
		"model": "gpt-4",
		"choices": []any{
			map[string]any{
				"index": 0,
				"delta": map[string]any{
					"tool_calls": []any{
						map[string]any{
							"index": &idx,
							"id":    "call_1",
							"type":  "function",
							"function": map[string]any{
								"name":      "get_weather",
								"arguments": "{\"city\":\"北京\"}",
							},
						},
					},
				},
			},
		},
	}
	data, err := json.Marshal(chunk)
	require.NoError(t, err)
	raw := makeSSEData(string(data))
	events := d.ProcessChunk(raw)
	require.NotEmpty(t, events)
	found := false
	for _, e := range events {
		if e.Type == canonical.EventContentBlockStart && e.ContentBlock != nil && e.ContentBlock.Type == "tool_use" {
			found = true
			assert.Equal(t, "call_1", e.ContentBlock.ID)
			assert.Equal(t, "get_weather", e.ContentBlock.Name)
		}
	}
	assert.True(t, found, "expected a tool_use content_block_start event")
}
// TestStreamDecoder_Thinking verifies that a reasoning_content delta is
// decoded into a thinking_delta event whose Thinking field holds the text.
func TestStreamDecoder_Thinking(t *testing.T) {
	d := NewStreamDecoder()
	chunk := map[string]any{
		"id":    "chatcmpl-1",
		"model": "gpt-4",
		"choices": []any{
			map[string]any{
				"index": 0,
				"delta": map[string]any{
					"reasoning_content": "思考中",
				},
			},
		},
	}
	data, err := json.Marshal(chunk)
	require.NoError(t, err)
	raw := makeSSEData(string(data))
	events := d.ProcessChunk(raw)
	require.NotEmpty(t, events)
	found := false
	for _, e := range events {
		if e.Type == canonical.EventContentBlockDelta && e.Delta != nil && e.Delta.Type == "thinking_delta" {
			found = true
			assert.Equal(t, "思考中", e.Delta.Thinking)
		}
	}
	assert.True(t, found, "expected a thinking_delta event")
}
// TestStreamDecoder_FinishReason verifies that finish_reason "stop" is
// reported as StopReasonEndTurn on a message_delta event, and that a
// message_stop event is also emitted.
func TestStreamDecoder_FinishReason(t *testing.T) {
	d := NewStreamDecoder()
	chunk := map[string]any{
		"id":    "chatcmpl-1",
		"model": "gpt-4",
		"choices": []any{
			map[string]any{
				"index":         0,
				"delta":         map[string]any{},
				"finish_reason": "stop",
			},
		},
	}
	data, err := json.Marshal(chunk)
	require.NoError(t, err)
	raw := makeSSEData(string(data))
	events := d.ProcessChunk(raw)
	require.NotEmpty(t, events)
	foundStop := false
	foundMsgStop := false
	for _, e := range events {
		if e.Type == canonical.EventMessageDelta && e.StopReason != nil {
			foundStop = true
			assert.Equal(t, canonical.StopReasonEndTurn, *e.StopReason)
		}
		if e.Type == canonical.EventMessageStop {
			foundMsgStop = true
		}
	}
	assert.True(t, foundStop, "expected a message_delta event with a stop reason")
	assert.True(t, foundMsgStop, "expected a message_stop event")
}
// TestStreamDecoder_DoneSignal verifies that a "data: [DONE]" sentinel that
// follows a text chunk in the same read causes a content_block_stop event to
// be emitted for the open text block.
func TestStreamDecoder_DoneSignal(t *testing.T) {
	d := NewStreamDecoder()
	// First send a text chunk so that a content block is open.
	chunk := map[string]any{
		"id":    "chatcmpl-1",
		"model": "gpt-4",
		"choices": []any{
			map[string]any{
				"index": 0,
				"delta": map[string]any{"content": "hi"},
			},
		},
	}
	data, err := json.Marshal(chunk)
	require.NoError(t, err)
	raw := append(makeSSEData(string(data)), []byte("data: [DONE]\n\n")...)
	events := d.ProcessChunk(raw)
	// A block stop event is expected here because [DONE] triggers
	// flushOpenBlocks.
	foundBlockStop := false
	for _, e := range events {
		if e.Type == canonical.EventContentBlockStop {
			foundBlockStop = true
		}
	}
	assert.True(t, foundBlockStop, "[DONE] should close the open content block")
}
// TestStreamDecoder_RefusalReuse sends two consecutive refusal deltas and
// checks that they share one content block. Exactly one content_block_start
// may be emitted across both chunks, and the decoder's refusal block index
// must still be tracked as open.
func TestStreamDecoder_RefusalReuse(t *testing.T) {
	d := NewStreamDecoder()
	blockStarts := 0
	// Two consecutive refusal delta chunks.
	for _, text := range []string{"拒绝", "原因"} {
		chunk := map[string]any{
			"id":    "chatcmpl-1",
			"model": "gpt-4",
			"choices": []any{
				map[string]any{
					"index": 0,
					"delta": map[string]any{"refusal": text},
				},
			},
		}
		data, err := json.Marshal(chunk)
		require.NoError(t, err)
		raw := makeSSEData(string(data))
		for _, e := range d.ProcessChunk(raw) {
			if e.Type == canonical.EventContentBlockStart {
				blockStarts++
			}
		}
	}
	// Only one text block may have been created; the second refusal delta
	// must reuse it rather than open another.
	assert.Equal(t, 1, blockStarts, "refusal deltas should share a single content block")
	assert.Contains(t, d.openBlocks, d.refusalBlockIndex)
}
// makeChunkSSE marshals chunk to JSON and frames it as one Server-Sent Events
// "data:" line terminated by a blank line.
//
// It panics if chunk cannot be marshaled, for example if it contains a
// channel or func value. A broken test fixture then fails loudly instead of
// silently producing an empty "data: " frame.
func makeChunkSSE(chunk map[string]any) []byte {
	data, err := json.Marshal(chunk)
	if err != nil {
		panic("makeChunkSSE: marshal chunk: " + err.Error())
	}
	return []byte("data: " + string(data) + "\n\n")
}
// TestStreamDecoder_UsageChunk feeds a usage-only chunk (empty choices) and
// checks that a message_delta event reports prompt_tokens as InputTokens and
// completion_tokens as OutputTokens.
func TestStreamDecoder_UsageChunk(t *testing.T) {
	dec := NewStreamDecoder()
	usage := map[string]any{
		"prompt_tokens":     100,
		"completion_tokens": 50,
		"total_tokens":      150,
	}
	events := dec.ProcessChunk(makeChunkSSE(map[string]any{
		"id":      "chatcmpl-usage",
		"object":  "chat.completion.chunk",
		"model":   "gpt-4",
		"choices": []any{},
		"usage":   usage,
	}))
	require.NotEmpty(t, events)

	sawMessageDelta := false
	for _, ev := range events {
		if ev.Type != canonical.EventMessageDelta {
			continue
		}
		sawMessageDelta = true
		require.NotNil(t, ev.Usage)
		assert.Equal(t, 100, ev.Usage.InputTokens)
		assert.Equal(t, 50, ev.Usage.OutputTokens)
	}
	assert.True(t, sawMessageDelta)
}
// TestStreamDecoder_MultipleToolCalls streams two tool calls with distinct
// tool_calls indexes in separate chunks. Each must open its own tool_use
// block with a distinct block index.
func TestStreamDecoder_MultipleToolCalls(t *testing.T) {
	dec := NewStreamDecoder()
	// toolCallChunk builds a chunk that opens the tool call at position index
	// with the given call ID and function name (empty JSON arguments).
	toolCallChunk := func(index int, callID, fn string) map[string]any {
		return map[string]any{
			"id":    "chatcmpl-mt",
			"model": "gpt-4",
			"choices": []any{
				map[string]any{
					"index": 0,
					"delta": map[string]any{
						"tool_calls": []any{
							map[string]any{
								"index": &index,
								"id":    callID,
								"type":  "function",
								"function": map[string]any{
									"name":      fn,
									"arguments": "{}",
								},
							},
						},
					},
				},
			},
		}
	}

	first := dec.ProcessChunk(makeChunkSSE(toolCallChunk(0, "call_a", "func_a")))
	require.NotEmpty(t, first)
	second := dec.ProcessChunk(makeChunkSSE(toolCallChunk(1, "call_b", "func_b")))
	require.NotEmpty(t, second)

	seen := make(map[int]struct{})
	for _, ev := range append(first, second...) {
		isToolStart := ev.Type == canonical.EventContentBlockStart &&
			ev.ContentBlock != nil && ev.ContentBlock.Type == "tool_use"
		if !isToolStart {
			continue
		}
		require.NotNil(t, ev.Index)
		seen[*ev.Index] = struct{}{}
	}
	assert.Len(t, seen, 2, "两个 tool call 应分配不同的 block 索引")
}
// TestStreamDecoder_Flush checks that flushing a decoder that has not been fed
// any input returns nil.
func TestStreamDecoder_Flush(t *testing.T) {
	dec := NewStreamDecoder()
	assert.Nil(t, dec.Flush())
}
// TestStreamDecoder_MultipleChunks_Text feeds two complete text events in a
// single read. It checks that each becomes its own text_delta, in order.
func TestStreamDecoder_MultipleChunks_Text(t *testing.T) {
	dec := NewStreamDecoder()
	// textChunk builds a chunk whose only delta is the given text content.
	textChunk := func(content string) map[string]any {
		return map[string]any{
			"id":    "chatcmpl-multi",
			"model": "gpt-4",
			"choices": []any{
				map[string]any{
					"index": 0,
					"delta": map[string]any{"content": content},
				},
			},
		}
	}
	var raw []byte
	for _, part := range []string{"你好", "世界"} {
		raw = append(raw, makeChunkSSE(textChunk(part))...)
	}

	got := []string{}
	for _, ev := range dec.ProcessChunk(raw) {
		isText := ev.Type == canonical.EventContentBlockDelta &&
			ev.Delta != nil && ev.Delta.Type == "text_delta"
		if isText {
			got = append(got, ev.Delta.Text)
		}
	}
	assert.Equal(t, []string{"你好", "世界"}, got)
}
// TestStreamDecoder_UTF8Truncation delivers one SSE event across two
// ProcessChunk calls, cut inside the 3-byte UTF-8 encoding of "你". It checks
// that the decoder reassembles the event into exactly one intact text delta.
func TestStreamDecoder_UTF8Truncation(t *testing.T) {
	d := NewStreamDecoder()
	chunk := map[string]any{
		"id":    "chatcmpl-utf8",
		"model": "gpt-4",
		"choices": []any{
			map[string]any{
				"index": 0,
				"delta": map[string]any{"content": "你"},
			},
		},
	}
	sseData := makeChunkSSE(chunk)

	// "你" is the only non-ASCII rune in the frame, and encoding/json emits it
	// unescaped. The first byte >= 0x80 is therefore its lead byte. Cutting
	// one byte later leaves a partial rune at the end of the first read.
	cut := -1
	for i, b := range sseData {
		if b >= 0x80 {
			cut = i + 1
			break
		}
	}
	require.NotEqual(t, -1, cut, "encoded frame should contain raw UTF-8 for 你")

	// The three-index slice stops the first part from aliasing the second
	// through spare capacity.
	var texts []string
	for _, part := range [][]byte{sseData[:cut:cut], sseData[cut:]} {
		for _, e := range d.ProcessChunk(part) {
			if e.Type == canonical.EventContentBlockDelta && e.Delta != nil && e.Delta.Type == "text_delta" {
				texts = append(texts, e.Delta.Text)
			}
		}
	}
	assert.Equal(t, []string{"你"}, texts, "split rune must be reassembled into a single delta")
}
// TestStreamDecoder_ToolCallSubsequentDelta opens a tool call with empty
// arguments, then sends a continuation chunk for the same tool_calls index
// that carries only arguments. The continuation must surface as an
// input_json_delta with the exact argument text.
func TestStreamDecoder_ToolCallSubsequentDelta(t *testing.T) {
	dec := NewStreamDecoder()
	toolIdx := 0
	// wrap embeds a single tool_calls entry in the fixed chunk envelope.
	wrap := func(toolCall map[string]any) map[string]any {
		return map[string]any{
			"id":    "chatcmpl-tc",
			"model": "gpt-4",
			"choices": []any{
				map[string]any{
					"index": 0,
					"delta": map[string]any{
						"tool_calls": []any{toolCall},
					},
				},
			},
		}
	}
	opening := wrap(map[string]any{
		"index": &toolIdx,
		"id":    "call_1",
		"type":  "function",
		"function": map[string]any{
			"name":      "get_weather",
			"arguments": "",
		},
	})
	continuation := wrap(map[string]any{
		"index": &toolIdx,
		"function": map[string]any{
			"arguments": `{"city":"Beijing"}`,
		},
	})

	require.NotEmpty(t, dec.ProcessChunk(makeChunkSSE(opening)))
	later := dec.ProcessChunk(makeChunkSSE(continuation))
	require.NotEmpty(t, later)

	const wantJSON = `{"city":"Beijing"}`
	sawInputJSON := false
	for _, ev := range later {
		if ev.Type != canonical.EventContentBlockDelta || ev.Delta == nil || ev.Delta.Type != "input_json_delta" {
			continue
		}
		sawInputJSON = true
		assert.Equal(t, wantJSON, ev.Delta.PartialJSON)
	}
	assert.True(t, sawInputJSON, "subsequent tool call delta should emit input_json_delta")
}
// TestStreamDecoder_InvalidJSON checks that a data line whose payload is not
// valid JSON yields no events (a nil slice).
func TestStreamDecoder_InvalidJSON(t *testing.T) {
	dec := NewStreamDecoder()
	got := dec.ProcessChunk(makeSSEData("{invalid json}"))
	assert.Nil(t, got)
}
// TestStreamDecoder_NonDataLines checks that an SSE comment line (leading ':')
// preceding a data line is skipped. The data line's text content must still
// be decoded.
func TestStreamDecoder_NonDataLines(t *testing.T) {
	const stream = ": comment line\n" +
		`data: {"id":"1","choices":[{"delta":{"content":"hi"}}]}` + "\n\n"
	dec := NewStreamDecoder()
	events := dec.ProcessChunk([]byte(stream))
	require.NotEmpty(t, events)

	sawDelta := false
	for _, ev := range events {
		if ev.Type != canonical.EventContentBlockDelta || ev.Delta == nil {
			continue
		}
		sawDelta = true
		assert.Equal(t, "hi", ev.Delta.Text)
	}
	assert.True(t, sawDelta)
}