1
0

feat: 实现分层架构,包含 domain、service、repository 和 pkg 层

- 新增 domain 层:model、provider、route、stats 实体
- 新增 service 层:models、providers、routing、stats 业务逻辑
- 新增 repository 层:models、providers、stats 数据访问
- 新增 pkg 工具包:errors、logger、validator
- 新增中间件:CORS、logging、recovery、request ID
- 新增数据库迁移:初始 schema 和索引
- 新增单元测试和集成测试
- 新增规范文档:config-management、database-migration、error-handling、layered-architecture、middleware-system、request-validation、structured-logging、test-coverage
- 移除 config 子包和 model_router(已迁移至分层架构)
This commit is contained in:
2026-04-16 00:47:20 +08:00
parent 915b004924
commit f18904af1e
77 changed files with 5727 additions and 1257 deletions

View File

@@ -9,22 +9,59 @@ import (
"strings"
"time"
"go.uber.org/zap"
"nex/backend/internal/protocol/openai"
)
// StreamConfig holds the tunables for SSE stream processing.
type StreamConfig struct {
InitialBufferSize int // initial read-buffer size in bytes; default 4096
MaxBufferSize int // maximum read-buffer size in bytes; default 65536
Timeout time.Duration // overall stream timeout; default 5 minutes
ChannelBufferSize int // event-channel buffer size; default 100
}
// DefaultStreamConfig returns the default stream-processing settings:
// a 4 KiB initial buffer, a 64 KiB buffer cap, a 5-minute stream
// timeout and a 100-entry event-channel buffer.
func DefaultStreamConfig() StreamConfig {
	var cfg StreamConfig
	cfg.InitialBufferSize = 4096
	cfg.MaxBufferSize = 65536
	cfg.Timeout = 5 * time.Minute
	cfg.ChannelBufferSize = 100
	return cfg
}
// Client is an OpenAI-compatible provider client.
// NOTE(review): this span is a diff rendering — httpClient and adapter
// appear twice (old and new lines interleaved); the new version adds
// the logger and streamCfg fields.
type Client struct {
httpClient *http.Client
adapter *openai.Adapter
httpClient *http.Client // HTTP transport used for provider calls
adapter *openai.Adapter // request/response adapter for the OpenAI protocol
logger *zap.Logger // structured logger (zap.L() global by default)
streamCfg StreamConfig // SSE stream tunables
}
// StreamEvent is a single event delivered on the stream channel.
type StreamEvent struct {
Data []byte // raw SSE event payload
Error error // non-nil when reading the stream failed
Done bool // presumably set when the stream signals completion — confirm against readStream
}
// ProviderClient abstracts a chat-completion provider: a blocking
// request/response call plus a streaming variant that emits StreamEvents
// on the returned channel.
type ProviderClient interface {
SendRequest(ctx context.Context, req *openai.ChatCompletionRequest, apiKey, baseURL string) (*openai.ChatCompletionResponse, error)
SendStreamRequest(ctx context.Context, req *openai.ChatCompletionRequest, apiKey, baseURL string) (<-chan StreamEvent, error)
}
// NewClient creates a provider client with a 30-second timeout for
// non-streaming requests, a global zap logger and the default stream
// configuration.
// NOTE(review): diff rendering — the Timeout and adapter lines appear in
// both their old and new forms below.
func NewClient() *Client {
return &Client{
httpClient: &http.Client{
Timeout: 30 * time.Second, // timeout for non-streaming requests
Timeout: 30 * time.Second,
},
adapter: openai.NewAdapter(),
adapter: openai.NewAdapter(),
logger: zap.L(),
streamCfg: DefaultStreamConfig(),
}
}
@@ -36,10 +73,10 @@ func (c *Client) SendRequest(ctx context.Context, req *openai.ChatCompletionRequ
return nil, fmt.Errorf("准备请求失败: %w", err)
}
// 调试日志:打印完整请求信息
fmt.Printf("[DEBUG] 请求URL: %s\n", httpReq.URL.String())
fmt.Printf("[DEBUG] 请求Method: %s\n", httpReq.Method)
fmt.Printf("[DEBUG] 请求Headers: %v\n", httpReq.Header)
c.logger.Debug("发送请求",
zap.String("url", httpReq.URL.String()),
zap.String("method", httpReq.Method),
)
// 设置上下文
httpReq = httpReq.WithContext(ctx)
@@ -80,18 +117,22 @@ func (c *Client) SendStreamRequest(ctx context.Context, req *openai.ChatCompleti
return nil, fmt.Errorf("准备请求失败: %w", err)
}
// 设置上下文
httpReq = httpReq.WithContext(ctx)
// 设置带超时的上下文
streamCtx, cancel := context.WithTimeout(ctx, c.streamCfg.Timeout)
_ = cancel // cancel 在流读取结束后由 ctx 传播处理
httpReq = httpReq.WithContext(streamCtx)
// 发送请求
resp, err := c.httpClient.Do(httpReq)
if err != nil {
cancel()
return nil, fmt.Errorf("发送请求失败: %w", err)
}
// 检查状态码
if resp.StatusCode != http.StatusOK {
defer resp.Body.Close()
cancel()
errorResp, parseErr := c.adapter.ParseErrorResponse(resp)
if parseErr != nil {
return nil, fmt.Errorf("供应商返回错误: HTTP %d", resp.StatusCode)
@@ -100,33 +141,33 @@ func (c *Client) SendStreamRequest(ctx context.Context, req *openai.ChatCompleti
}
// 创建事件通道
eventChan := make(chan StreamEvent, 100)
eventChan := make(chan StreamEvent, c.streamCfg.ChannelBufferSize)
// 启动 goroutine 读取流
go c.readStream(ctx, resp.Body, eventChan)
go c.readStream(streamCtx, cancel, resp.Body, eventChan)
return eventChan, nil
}
// StreamEvent — NOTE(review): duplicate declaration in this diff view;
// this appears to be the old location being removed (the type now lives
// earlier in the file, next to StreamConfig) — confirm against the full diff.
type StreamEvent struct {
Data []byte
Error error
Done bool
}
// readStream 读取 SSE 流
func (c *Client) readStream(ctx context.Context, body io.ReadCloser, eventChan chan<- StreamEvent) {
// readStream 读取 SSE 流(支持动态缓冲区、超时控制和改进的错误处理)
func (c *Client) readStream(ctx context.Context, cancel context.CancelFunc, body io.ReadCloser, eventChan chan<- StreamEvent) {
defer close(eventChan)
defer body.Close()
defer cancel()
buf := make([]byte, 4096)
bufSize := c.streamCfg.InitialBufferSize
buf := make([]byte, bufSize)
var dataBuf []byte
for {
select {
case <-ctx.Done():
eventChan <- StreamEvent{Error: ctx.Err()}
if ctx.Err() == context.DeadlineExceeded {
c.logger.Warn("流读取超时")
eventChan <- StreamEvent{Error: fmt.Errorf("流读取超时: %w", ctx.Err())}
} else {
eventChan <- StreamEvent{Error: ctx.Err()}
}
return
default:
}
@@ -134,15 +175,32 @@ func (c *Client) readStream(ctx context.Context, body io.ReadCloser, eventChan c
n, err := body.Read(buf)
if err != nil {
if err == io.EOF {
// 流结束
// 流正常结束
return
}
eventChan <- StreamEvent{Error: err}
// 区分网络错误和其他错误
if isNetworkError(err) {
c.logger.Error("流网络错误", zap.String("error", err.Error()))
eventChan <- StreamEvent{Error: fmt.Errorf("网络错误: %w", err)}
} else {
c.logger.Error("流读取错误", zap.String("error", err.Error()))
eventChan <- StreamEvent{Error: fmt.Errorf("读取错误: %w", err)}
}
return
}
dataBuf = append(dataBuf, buf[:n]...)
// 动态调整缓冲区大小:如果数据量大,增大缓冲区
if len(dataBuf) > bufSize/2 && bufSize < c.streamCfg.MaxBufferSize {
newSize := bufSize * 2
if newSize > c.streamCfg.MaxBufferSize {
newSize = c.streamCfg.MaxBufferSize
}
buf = make([]byte, newSize)
bufSize = newSize
}
// 处理完整的 SSE 事件
for {
// 查找事件边界(双换行)
@@ -175,3 +233,16 @@ func (c *Client) readStream(ctx context.Context, body io.ReadCloser, eventChan c
}
}
// isNetworkError reports whether err's message contains one of a set of
// well-known network-failure substrings ("connection reset", "broken
// pipe", "network", "timeout", "EOF"). A nil error is never a network
// error.
func isNetworkError(err error) bool {
	if err == nil {
		return false
	}
	msg := err.Error()
	for _, marker := range []string{
		"connection reset",
		"broken pipe",
		"network",
		"timeout",
		"EOF",
	} {
		if strings.Contains(msg, marker) {
			return true
		}
	}
	return false
}

View File

@@ -0,0 +1,151 @@
package provider
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"nex/backend/internal/protocol/openai"
)
// TestNewClient verifies that NewClient wires up the HTTP client, the
// protocol adapter and the default stream configuration.
func TestNewClient(t *testing.T) {
	c := NewClient()
	require.NotNil(t, c)

	assert.NotNil(t, c.httpClient)
	assert.NotNil(t, c.adapter)

	cfg := c.streamCfg
	assert.Equal(t, 4096, cfg.InitialBufferSize)
	assert.Equal(t, 65536, cfg.MaxBufferSize)
	assert.Equal(t, 100, cfg.ChannelBufferSize)
}
// TestDefaultStreamConfig checks the documented default buffer and
// channel sizes. (The Timeout default is not asserted here.)
func TestDefaultStreamConfig(t *testing.T) {
	got := DefaultStreamConfig()

	assert.Equal(t, 4096, got.InitialBufferSize)
	assert.Equal(t, 65536, got.MaxBufferSize)
	assert.Equal(t, 100, got.ChannelBufferSize)
}
// TestClient_SendRequest_Success exercises the happy path: the outgoing
// request carries the expected method, content type and bearer token,
// and the JSON response body is decoded into a completion.
func TestClient_SendRequest_Success(t *testing.T) {
	handler := func(w http.ResponseWriter, r *http.Request) {
		assert.Equal(t, "POST", r.Method)
		assert.Equal(t, "application/json", r.Header.Get("Content-Type"))
		assert.Equal(t, "Bearer test-key", r.Header.Get("Authorization"))

		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(openai.ChatCompletionResponse{
			ID: "chatcmpl-123",
			Choices: []openai.Choice{
				{Index: 0, Message: &openai.Message{Role: "assistant", Content: "Hello!"}},
			},
		})
	}
	srv := httptest.NewServer(http.HandlerFunc(handler))
	defer srv.Close()

	req := &openai.ChatCompletionRequest{
		Model:    "gpt-4",
		Messages: []openai.Message{{Role: "user", Content: "Hi"}},
	}
	result, err := NewClient().SendRequest(context.Background(), req, "test-key", srv.URL)

	require.NoError(t, err)
	assert.Equal(t, "chatcmpl-123", result.ID)
}
// TestClient_SendRequest_ErrorResponse verifies that a non-2xx reply is
// surfaced as an error whose text includes the provider's message.
func TestClient_SendRequest_ErrorResponse(t *testing.T) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusUnauthorized)
		json.NewEncoder(w).Encode(openai.ErrorResponse{
			Error: openai.ErrorDetail{Message: "Invalid API key"},
		})
	}))
	defer srv.Close()

	req := &openai.ChatCompletionRequest{
		Model:    "gpt-4",
		Messages: []openai.Message{{Role: "user", Content: "Hi"}},
	}
	_, err := NewClient().SendRequest(context.Background(), req, "bad-key", srv.URL)

	assert.Error(t, err)
	assert.Contains(t, err.Error(), "Invalid API key")
}
func TestClient_SendRequest_ConnectionError(t *testing.T) {
client := NewClient()
req := &openai.ChatCompletionRequest{
Model: "gpt-4",
Messages: []openai.Message{{Role: "user", Content: "Hi"}},
}
_, err := client.SendRequest(context.Background(), req, "key", "http://localhost:1")
assert.Error(t, err)
}
func TestClient_SendStreamRequest_CreatesChannel(t *testing.T) {
// 使用一个慢服务器确保客户端有时间读取
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
w.WriteHeader(http.StatusOK)
}))
defer server.Close()
client := NewClient()
req := &openai.ChatCompletionRequest{
Model: "gpt-4",
Messages: []openai.Message{{Role: "user", Content: "Hi"}},
}
eventChan, err := client.SendStreamRequest(context.Background(), req, "test-key", server.URL)
require.NoError(t, err)
require.NotNil(t, eventChan)
// 读取直到 channel 关闭(服务器关闭后应产生 EOF
for range eventChan {
// 消费所有事件
}
// channel 应已关闭(不阻塞即通过)
}
func TestClient_SendStreamRequest_ErrorResponse(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
}))
defer server.Close()
client := NewClient()
req := &openai.ChatCompletionRequest{
Model: "gpt-4",
Messages: []openai.Message{{Role: "user", Content: "Hi"}},
}
_, err := client.SendStreamRequest(context.Background(), req, "key", server.URL)
assert.Error(t, err)
}
func TestIsNetworkError(t *testing.T) {
tests := []struct {
input string
want bool
}{
{"connection reset by peer", true},
{"broken pipe", true},
{"network is unreachable", true},
{"timeout waiting for response", true},
{"unexpected EOF", true},
{"normal error", false},
{"", false},
}
for _, tt := range tests {
err := fmt.Errorf("%s", tt.input) //nolint:govet
assert.Equal(t, tt.want, isNetworkError(err), "isNetworkError(%q)", tt.input)
}
}