1
0
Files
nex/backend/internal/conversion/openai/stream_decoder_test.go
lanyuanxiaoyao 4c6b49099d feat: 配置 golangci-lint 静态分析并修复存量违规
- 新增 backend/.golangci.yml 配置 12 个 linter(forbidigo、errorlint、errcheck、staticcheck、revive、gocritic、gosec、bodyclose、noctx、nilerr、goimports、gocyclo)
- 新增 lefthook.yml 配置 pre-commit hook 自动运行 lint
- 修复存量代码违规:errors.Is/As 替换、zap.Error 替换、import 排序、errcheck 修复
- 更新 README 补充编码规范说明
- 归档 backend-code-lint 变更
2026-04-24 13:01:48 +08:00

473 lines
10 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package openai
import (
"encoding/json"
"testing"
"nex/backend/internal/conversion/canonical"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// makeSSEData wraps a JSON payload in a single SSE "data:" frame,
// terminated by the blank line that ends an SSE event.
func makeSSEData(payload string) []byte {
	frame := make([]byte, 0, len(payload)+8)
	frame = append(frame, "data: "...)
	frame = append(frame, payload...)
	frame = append(frame, "\n\n"...)
	return frame
}
// TestStreamDecoder_BasicText verifies that a plain text delta chunk yields a
// message_start event carrying the chunk ID plus a text_delta with the content.
func TestStreamDecoder_BasicText(t *testing.T) {
	dec := NewStreamDecoder()
	payload, _ := json.Marshal(map[string]any{
		"id":     "chatcmpl-1",
		"object": "chat.completion.chunk",
		"model":  "gpt-4",
		"choices": []any{
			map[string]any{
				"index": 0,
				"delta": map[string]any{"content": "你好"},
			},
		},
	})
	events := dec.ProcessChunk(makeSSEData(string(payload)))
	require.NotEmpty(t, events)

	var sawStart, sawDelta bool
	for _, ev := range events {
		switch {
		case ev.Type == canonical.EventMessageStart:
			sawStart = true
			assert.Equal(t, "chatcmpl-1", ev.Message.ID)
		case ev.Type == canonical.EventContentBlockDelta && ev.Delta != nil:
			sawDelta = true
			assert.Equal(t, "text_delta", ev.Delta.Type)
			assert.Equal(t, "你好", ev.Delta.Text)
		}
	}
	assert.True(t, sawStart)
	assert.True(t, sawDelta)
}
// TestStreamDecoder_ToolCalls verifies that a tool call delta opens a tool_use
// content block carrying the call ID and function name.
func TestStreamDecoder_ToolCalls(t *testing.T) {
	dec := NewStreamDecoder()
	// A *int and a plain int marshal to the same JSON number, so a literal
	// index is used here.
	payload, _ := json.Marshal(map[string]any{
		"id":    "chatcmpl-1",
		"model": "gpt-4",
		"choices": []any{
			map[string]any{
				"index": 0,
				"delta": map[string]any{
					"tool_calls": []any{
						map[string]any{
							"index": 0,
							"id":    "call_1",
							"type":  "function",
							"function": map[string]any{
								"name":      "get_weather",
								"arguments": "{\"city\":\"北京\"}",
							},
						},
					},
				},
			},
		},
	})
	events := dec.ProcessChunk(makeSSEData(string(payload)))
	require.NotEmpty(t, events)

	sawToolUse := false
	for _, ev := range events {
		if ev.Type != canonical.EventContentBlockStart || ev.ContentBlock == nil || ev.ContentBlock.Type != "tool_use" {
			continue
		}
		sawToolUse = true
		assert.Equal(t, "call_1", ev.ContentBlock.ID)
		assert.Equal(t, "get_weather", ev.ContentBlock.Name)
	}
	assert.True(t, sawToolUse)
}
// TestStreamDecoder_Thinking verifies that reasoning_content in a delta is
// surfaced as a thinking_delta event.
func TestStreamDecoder_Thinking(t *testing.T) {
	dec := NewStreamDecoder()
	payload, _ := json.Marshal(map[string]any{
		"id":    "chatcmpl-1",
		"model": "gpt-4",
		"choices": []any{
			map[string]any{
				"index": 0,
				"delta": map[string]any{
					"reasoning_content": "思考中",
				},
			},
		},
	})
	events := dec.ProcessChunk(makeSSEData(string(payload)))

	sawThinking := false
	for _, ev := range events {
		if ev.Type != canonical.EventContentBlockDelta || ev.Delta == nil || ev.Delta.Type != "thinking_delta" {
			continue
		}
		sawThinking = true
		assert.Equal(t, "思考中", ev.Delta.Thinking)
	}
	assert.True(t, sawThinking)
}
// TestStreamDecoder_FinishReason verifies that finish_reason "stop" maps to a
// message_delta carrying StopReasonEndTurn, followed by a message_stop event.
func TestStreamDecoder_FinishReason(t *testing.T) {
	dec := NewStreamDecoder()
	payload, _ := json.Marshal(map[string]any{
		"id":    "chatcmpl-1",
		"model": "gpt-4",
		"choices": []any{
			map[string]any{
				"index":         0,
				"delta":         map[string]any{},
				"finish_reason": "stop",
			},
		},
	})
	events := dec.ProcessChunk(makeSSEData(string(payload)))

	var sawStopReason, sawMessageStop bool
	for _, ev := range events {
		switch ev.Type {
		case canonical.EventMessageDelta:
			if ev.StopReason != nil {
				sawStopReason = true
				assert.Equal(t, canonical.StopReasonEndTurn, *ev.StopReason)
			}
		case canonical.EventMessageStop:
			sawMessageStop = true
		}
	}
	assert.True(t, sawStopReason)
	assert.True(t, sawMessageStop)
}
// TestStreamDecoder_DoneSignal verifies that a [DONE] sentinel closes any
// content block that is still open, emitting a content_block_stop event.
func TestStreamDecoder_DoneSignal(t *testing.T) {
	dec := NewStreamDecoder()
	// Send a text chunk first so a block is open when [DONE] arrives.
	payload, _ := json.Marshal(map[string]any{
		"id":    "chatcmpl-1",
		"model": "gpt-4",
		"choices": []any{
			map[string]any{
				"index": 0,
				"delta": map[string]any{"content": "hi"},
			},
		},
	})
	stream := makeSSEData(string(payload))
	stream = append(stream, []byte("data: [DONE]\n\n")...)
	events := dec.ProcessChunk(stream)

	sawBlockStop := false
	for _, ev := range events {
		if ev.Type == canonical.EventContentBlockStop {
			sawBlockStop = true
			break
		}
	}
	assert.True(t, sawBlockStop)
}
// TestStreamDecoder_RefusalReuse verifies that consecutive refusal deltas are
// appended to one shared text block instead of opening a new block per chunk.
//
// The original version discarded the per-chunk events (`_ = events`) and never
// checked that only a single block existed, so the "reuse" behavior it
// described was not actually asserted.
func TestStreamDecoder_RefusalReuse(t *testing.T) {
	d := NewStreamDecoder()
	// Feed two consecutive refusal delta chunks.
	for _, text := range []string{"拒绝", "原因"} {
		chunk := map[string]any{
			"id":    "chatcmpl-1",
			"model": "gpt-4",
			"choices": []any{
				map[string]any{
					"index": 0,
					"delta": map[string]any{"refusal": text},
				},
			},
		}
		data, _ := json.Marshal(chunk)
		events := d.ProcessChunk(makeSSEData(string(data)))
		// Each refusal chunk must produce at least one event.
		assert.NotEmpty(t, events)
	}
	// Exactly one block should be open, and it must be the refusal block:
	// refusal deltas reuse the same text block rather than opening new ones.
	assert.Len(t, d.openBlocks, 1)
	assert.Contains(t, d.openBlocks, d.refusalBlockIndex)
}
// makeChunkSSE JSON-encodes a chunk object and wraps it in a single SSE
// "data:" frame. The marshal error is ignored deliberately: test inputs are
// always encodable map literals.
func makeChunkSSE(chunk map[string]any) []byte {
	payload, _ := json.Marshal(chunk)
	frame := make([]byte, 0, len(payload)+8)
	frame = append(frame, "data: "...)
	frame = append(frame, payload...)
	frame = append(frame, "\n\n"...)
	return frame
}
// TestStreamDecoder_UsageChunk verifies that a usage-only chunk (no choices)
// produces a message_delta whose Usage mirrors the prompt/completion tokens.
func TestStreamDecoder_UsageChunk(t *testing.T) {
	dec := NewStreamDecoder()
	raw := makeChunkSSE(map[string]any{
		"id":      "chatcmpl-usage",
		"object":  "chat.completion.chunk",
		"model":   "gpt-4",
		"choices": []any{},
		"usage": map[string]any{
			"prompt_tokens":     100,
			"completion_tokens": 50,
			"total_tokens":      150,
		},
	})
	events := dec.ProcessChunk(raw)
	require.NotEmpty(t, events)

	sawUsage := false
	for _, ev := range events {
		if ev.Type != canonical.EventMessageDelta {
			continue
		}
		sawUsage = true
		require.NotNil(t, ev.Usage)
		assert.Equal(t, 100, ev.Usage.InputTokens)
		assert.Equal(t, 50, ev.Usage.OutputTokens)
	}
	assert.True(t, sawUsage)
}
// TestStreamDecoder_MultipleToolCalls verifies that two tool calls with
// distinct tool_call indices are mapped to two distinct content blocks.
func TestStreamDecoder_MultipleToolCalls(t *testing.T) {
	dec := NewStreamDecoder()
	// toolChunk builds a one-choice chunk containing a single tool call.
	toolChunk := func(callIdx int, callID, fnName string) map[string]any {
		return map[string]any{
			"id":    "chatcmpl-mt",
			"model": "gpt-4",
			"choices": []any{
				map[string]any{
					"index": 0,
					"delta": map[string]any{
						"tool_calls": []any{
							map[string]any{
								"index": callIdx,
								"id":    callID,
								"type":  "function",
								"function": map[string]any{
									"name":      fnName,
									"arguments": "{}",
								},
							},
						},
					},
				},
			},
		}
	}
	first := dec.ProcessChunk(makeChunkSSE(toolChunk(0, "call_a", "func_a")))
	require.NotEmpty(t, first)
	second := dec.ProcessChunk(makeChunkSSE(toolChunk(1, "call_b", "func_b")))
	require.NotEmpty(t, second)

	seen := map[int]bool{}
	for _, ev := range append(first, second...) {
		if ev.Type == canonical.EventContentBlockStart && ev.ContentBlock != nil && ev.ContentBlock.Type == "tool_use" {
			require.NotNil(t, ev.Index)
			seen[*ev.Index] = true
		}
	}
	assert.Equal(t, 2, len(seen), "两个 tool call 应分配不同的 block 索引")
}
// TestStreamDecoder_Flush verifies that flushing a decoder that has received
// no input yields no events.
func TestStreamDecoder_Flush(t *testing.T) {
	assert.Nil(t, NewStreamDecoder().Flush())
}
// TestStreamDecoder_MultipleChunks_Text verifies that two text chunks fed in
// one call produce two text_delta events in arrival order.
func TestStreamDecoder_MultipleChunks_Text(t *testing.T) {
	dec := NewStreamDecoder()
	// textChunk builds a one-choice chunk carrying the given content delta.
	textChunk := func(text string) map[string]any {
		return map[string]any{
			"id":    "chatcmpl-multi",
			"model": "gpt-4",
			"choices": []any{
				map[string]any{
					"index": 0,
					"delta": map[string]any{"content": text},
				},
			},
		}
	}
	stream := append(makeChunkSSE(textChunk("你好")), makeChunkSSE(textChunk("世界"))...)
	events := dec.ProcessChunk(stream)

	var got []string
	for _, ev := range events {
		if ev.Type == canonical.EventContentBlockDelta && ev.Delta != nil && ev.Delta.Type == "text_delta" {
			got = append(got, ev.Delta.Text)
		}
	}
	assert.Equal(t, []string{"你好", "世界"}, got)
}
// TestStreamDecoder_UTF8Truncation verifies that an SSE frame split across two
// ProcessChunk calls is buffered and reassembled, delivering the multi-byte
// character intact.
//
// The original version only asserted inside a loop over the first call's
// events — which come from an incomplete frame and are expected to be empty —
// and discarded the second call's events entirely, so it asserted nothing.
func TestStreamDecoder_UTF8Truncation(t *testing.T) {
	d := NewStreamDecoder()
	chunk := map[string]any{
		"id":    "chatcmpl-utf8",
		"model": "gpt-4",
		"choices": []any{
			map[string]any{
				"index": 0,
				"delta": map[string]any{"content": "你"},
			},
		},
	}
	data, _ := json.Marshal(chunk)
	sseData := []byte("data: " + string(data) + "\n\n")
	// Cut the frame 5 bytes before its end so the payload tail and the
	// frame terminator arrive in a second call.
	mid := len(sseData) - 5
	texts := []string{}
	for _, part := range [][]byte{sseData[:mid], sseData[mid:]} {
		for _, e := range d.ProcessChunk(part) {
			if e.Type == canonical.EventContentBlockDelta && e.Delta != nil && e.Delta.Type == "text_delta" {
				texts = append(texts, e.Delta.Text)
			}
		}
	}
	// The character must arrive exactly once, in one intact piece.
	assert.Equal(t, []string{"你"}, texts)
}
// TestStreamDecoder_ToolCallSubsequentDelta verifies that a follow-up tool
// call chunk carrying only arguments (same call index, no id/name) is emitted
// as an input_json_delta on the already-open block.
func TestStreamDecoder_ToolCallSubsequentDelta(t *testing.T) {
	dec := NewStreamDecoder()
	// Opening chunk: declares the tool call with an empty argument string.
	opening := map[string]any{
		"id":    "chatcmpl-tc",
		"model": "gpt-4",
		"choices": []any{
			map[string]any{
				"index": 0,
				"delta": map[string]any{
					"tool_calls": []any{
						map[string]any{
							"index": 0,
							"id":    "call_1",
							"type":  "function",
							"function": map[string]any{
								"name":      "get_weather",
								"arguments": "",
							},
						},
					},
				},
			},
		},
	}
	// Continuation chunk: same tool_call index, arguments only.
	continuation := map[string]any{
		"id":    "chatcmpl-tc",
		"model": "gpt-4",
		"choices": []any{
			map[string]any{
				"index": 0,
				"delta": map[string]any{
					"tool_calls": []any{
						map[string]any{
							"index": 0,
							"function": map[string]any{
								"arguments": "{\"city\":\"Beijing\"}",
							},
						},
					},
				},
			},
		},
	}
	require.NotEmpty(t, dec.ProcessChunk(makeChunkSSE(opening)))
	followUp := dec.ProcessChunk(makeChunkSSE(continuation))
	require.NotEmpty(t, followUp)

	sawJSONDelta := false
	for _, ev := range followUp {
		if ev.Type != canonical.EventContentBlockDelta || ev.Delta == nil || ev.Delta.Type != "input_json_delta" {
			continue
		}
		sawJSONDelta = true
		assert.Equal(t, "{\"city\":\"Beijing\"}", ev.Delta.PartialJSON)
	}
	assert.True(t, sawJSONDelta, "subsequent tool call delta should emit input_json_delta")
}
// TestStreamDecoder_InvalidJSON verifies that a malformed data payload is
// dropped without producing any events.
func TestStreamDecoder_InvalidJSON(t *testing.T) {
	events := NewStreamDecoder().ProcessChunk([]byte("data: {invalid json}\n\n"))
	assert.Nil(t, events)
}
// TestStreamDecoder_NonDataLines verifies that SSE comment lines (leading ':')
// are skipped while the following data line is still decoded.
func TestStreamDecoder_NonDataLines(t *testing.T) {
	dec := NewStreamDecoder()
	stream := []byte(": comment line\ndata: {\"id\":\"1\",\"choices\":[{\"delta\":{\"content\":\"hi\"}}]}\n\n")
	events := dec.ProcessChunk(stream)
	require.NotEmpty(t, events)

	sawText := false
	for _, ev := range events {
		if ev.Type != canonical.EventContentBlockDelta || ev.Delta == nil {
			continue
		}
		sawText = true
		assert.Equal(t, "hi", ev.Delta.Text)
	}
	assert.True(t, sawText)
}