1
0
Files
nex/backend/internal/provider/client.go

353 lines
7.9 KiB
Go

package provider
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net"
"net/http"
"strings"
"syscall"
"time"
"go.uber.org/zap"
"nex/backend/internal/conversion"
pkgErrors "nex/backend/pkg/errors"
pkglogger "nex/backend/pkg/logger"
)
// StreamConfig holds the tuning parameters for streaming response handling.
type StreamConfig struct {
// InitialBufferSize is the size, in bytes, of the first read buffer
// allocated when a stream starts.
InitialBufferSize int
// MaxBufferSize is the upper bound, in bytes, that the read buffer is
// allowed to grow to.
MaxBufferSize int
// Timeout bounds the lifetime of a single streaming request.
Timeout time.Duration
// ChannelBufferSize is the capacity of the channel that carries
// StreamEvent values to the consumer.
ChannelBufferSize int
}
// DefaultStreamConfig returns the stream settings used by default: a 4 KiB
// initial read buffer that may grow to 64 KiB, a five-minute stream timeout,
// and an event channel with room for 100 events.
func DefaultStreamConfig() StreamConfig {
	var cfg StreamConfig
	cfg.InitialBufferSize = 4 * 1024
	cfg.MaxBufferSize = 64 * 1024
	cfg.Timeout = 5 * time.Minute
	cfg.ChannelBufferSize = 100
	return cfg
}
// StreamEvent is a single item delivered on a streaming response channel.
type StreamEvent struct {
// Data holds the raw bytes of one SSE frame as received from upstream.
Data []byte
// Error is set when reading the stream failed or its context ended.
Error error
// Done is set on the event emitted right after a "[DONE]" frame.
Done bool
}
// StreamResponse represents an upstream streaming HTTP response.
type StreamResponse struct {
// StatusCode is the HTTP status code returned by upstream.
StatusCode int
// Headers maps each response header name to its first value.
Headers map[string]string
// Body holds the fully read response body; it is populated only for
// non-2xx responses.
Body []byte
// Events delivers the stream frames; it is populated only for 2xx
// responses and is closed when the stream ends.
Events <-chan StreamEvent
}
// Client is a protocol-agnostic client for talking to upstream providers.
type Client struct {
// httpClient performs the outgoing HTTP requests.
httpClient *http.Client
// logger records request and stream diagnostics.
logger *zap.Logger
// streamCfg supplies the timeout and buffer sizes for streaming requests.
streamCfg StreamConfig
}
// ProviderClient is the provider client interface.
//
//go:generate go run go.uber.org/mock/mockgen -source=client.go -destination=../../tests/mocks/mock_provider_client.go -package=mocks
type ProviderClient interface {
// Send issues a non-streaming request and returns the complete response.
Send(ctx context.Context, spec conversion.HTTPRequestSpec) (*conversion.HTTPResponseSpec, error)
// SendStream issues a streaming request and returns a response whose
// frames are delivered through a channel.
SendStream(ctx context.Context, spec conversion.HTTPRequestSpec) (*StreamResponse, error)
}
// NewClient builds a provider Client that uses an HTTP client with a
// 30-second timeout, a logger tagged with the "provider.client" module, and
// the default stream configuration.
func NewClient(logger *zap.Logger) *Client {
	httpClient := &http.Client{Timeout: 30 * time.Second}

	client := &Client{
		httpClient: httpClient,
		logger:     pkglogger.WithModule(logger, "provider.client"),
		streamCfg:  DefaultStreamConfig(),
	}
	return client
}
// Send performs a non-streaming request described by spec and returns the
// upstream status code, headers and fully buffered body.
//
// The response is returned for any HTTP status; only failures to build the
// request, to send it, or to read the body are reported as errors (wrapped in
// ErrRequestCreate, ErrRequestSend and ErrResponseRead respectively).
func (c *Client) Send(ctx context.Context, spec conversion.HTTPRequestSpec) (*conversion.HTTPResponseSpec, error) {
	// Leave the reader nil when there is no payload so the request is
	// created without a body.
	var bodyReader io.Reader
	if len(spec.Body) > 0 {
		bodyReader = bytes.NewReader(spec.Body)
	}

	httpReq, err := http.NewRequestWithContext(ctx, spec.Method, spec.URL, bodyReader)
	if err != nil {
		return nil, pkgErrors.ErrRequestCreate.WithCause(err)
	}
	for k, v := range spec.Headers {
		httpReq.Header.Set(k, v)
	}

	c.logger.Debug("发送请求",
		zap.String("url", spec.URL),
		zap.String("method", spec.Method),
	)

	resp, err := c.httpClient.Do(httpReq)
	if err != nil {
		return nil, pkgErrors.ErrRequestSend.WithCause(err)
	}
	defer resp.Body.Close()

	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, pkgErrors.ErrResponseRead.WithCause(err)
	}

	return &conversion.HTTPResponseSpec{
		StatusCode: resp.StatusCode,
		// Use the same header flattening as SendStream.
		Headers: extractResponseHeaders(resp.Header),
		Body:    respBody,
	}, nil
}
// SendStream performs a streaming request described by spec.
//
// For a 2xx response the returned StreamResponse carries an Events channel
// that is fed by a background goroutine until the stream ends, fails, or the
// stream timeout from the client's StreamConfig expires. For any other status
// the body is read in full and returned in Body, and Events is nil.
//
// Failures to build the request, to send it, or to read a non-2xx body are
// reported as ErrRequestCreate, ErrRequestSend and ErrResponseRead.
func (c *Client) SendStream(ctx context.Context, spec conversion.HTTPRequestSpec) (*StreamResponse, error) {
	var bodyReader io.Reader
	if len(spec.Body) > 0 {
		bodyReader = bytes.NewReader(spec.Body)
	}

	// The stream's lifetime is bounded by this context; cancel must be called
	// on every path, either here or by the reader goroutine.
	streamCtx, cancel := context.WithTimeout(ctx, c.streamCfg.Timeout)

	httpReq, err := http.NewRequestWithContext(streamCtx, spec.Method, spec.URL, bodyReader)
	if err != nil {
		cancel()
		return nil, pkgErrors.ErrRequestCreate.WithCause(err)
	}
	for k, v := range spec.Headers {
		httpReq.Header.Set(k, v)
	}

	// http.Client.Timeout also covers reading the response body, so the
	// shared client's timeout would cut every stream short regardless of the
	// configured stream timeout. Use a shallow copy (same Transport, Jar and
	// redirect policy) with no client-level timeout and rely on streamCtx.
	streamClient := *c.httpClient
	streamClient.Timeout = 0

	resp, err := streamClient.Do(httpReq)
	if err != nil {
		cancel()
		return nil, pkgErrors.ErrRequestSend.WithCause(err)
	}

	respHeaders := extractResponseHeaders(resp.Header)

	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
		// Read the error payload before cancelling: cancelling the request
		// context first can abort the body read.
		errBody, readErr := io.ReadAll(resp.Body)
		resp.Body.Close()
		cancel()
		if readErr != nil {
			return nil, pkgErrors.ErrResponseRead.WithCause(readErr)
		}
		return &StreamResponse{
			StatusCode: resp.StatusCode,
			Headers:    respHeaders,
			Body:       errBody,
		}, nil
	}

	// readStream takes ownership of the body, the cancel func and the channel.
	eventChan := make(chan StreamEvent, c.streamCfg.ChannelBufferSize)
	go c.readStream(streamCtx, cancel, resp.Body, eventChan)

	return &StreamResponse{
		StatusCode: resp.StatusCode,
		Headers:    respHeaders,
		Events:     eventChan,
	}, nil
}
// readStream reads an SSE body and forwards every complete frame on eventChan.
//
// A frame is everything up to and including a blank-line separator. The
// function returns, closing eventChan, closing body and calling cancel, when:
//   - a "[DONE]" frame is seen (the frame is forwarded, then a Done event);
//   - the body reaches EOF (any unterminated trailing bytes are forwarded);
//   - reading fails (an Error event is sent);
//   - ctx is done (an Error event carrying the context error is sent).
//
// Sends on eventChan block, so the consumer is expected to drain the channel
// until it is closed.
func (c *Client) readStream(ctx context.Context, cancel context.CancelFunc, body io.ReadCloser, eventChan chan<- StreamEvent) {
	// Deferred calls run last-in first-out: cancel, then close the body,
	// then close the channel.
	defer close(eventChan)
	defer body.Close()
	defer cancel()

	bufSize := c.streamCfg.InitialBufferSize
	buf := make([]byte, bufSize)
	var dataBuf []byte // bytes received but not yet emitted as a frame

	for {
		// Check for cancellation between reads without blocking.
		select {
		case <-ctx.Done():
			ctxErr := ctx.Err()
			if errors.Is(ctxErr, context.DeadlineExceeded) {
				c.logger.Warn("流读取超时")
				eventChan <- StreamEvent{Error: fmt.Errorf("流读取超时: %w", ctxErr)}
			} else {
				eventChan <- StreamEvent{Error: ctxErr}
			}
			return
		default:
		}

		n, err := body.Read(buf)
		if n > 0 {
			dataBuf = append(dataBuf, buf[:n]...)
		}

		// Grow the read buffer (up to MaxBufferSize) while pending data is
		// piling up; pointless once the read has failed or hit EOF.
		if err == nil && len(dataBuf) > bufSize/2 && bufSize < c.streamCfg.MaxBufferSize {
			newSize := bufSize * 2
			if newSize > c.streamCfg.MaxBufferSize {
				newSize = c.streamCfg.MaxBufferSize
			}
			buf = make([]byte, newSize)
			bufSize = newSize
		}

		// Emit every complete frame before looking at err: a Read may return
		// data together with an error, and frames already received must not
		// be dropped.
		for {
			idx, sepLen := findSSEFrameSeparator(dataBuf)
			if idx == -1 {
				break
			}
			frameEnd := idx + sepLen
			// Copy the frame so the event does not alias dataBuf's storage.
			rawEvent := append([]byte(nil), dataBuf[:frameEnd]...)
			dataBuf = dataBuf[frameEnd:]

			eventChan <- StreamEvent{Data: rawEvent}
			if isSSEDoneFrame(rawEvent) {
				eventChan <- StreamEvent{Done: true}
				return
			}
		}

		if err == nil {
			continue
		}
		if err == io.EOF {
			// Forward whatever is left even though it lacks a separator.
			if len(dataBuf) > 0 {
				eventChan <- StreamEvent{Data: dataBuf}
			}
			return
		}
		if isNetworkError(err) {
			c.logger.Error("流网络错误", zap.Error(err))
			eventChan <- StreamEvent{Error: fmt.Errorf("网络错误: %w", err)}
		} else {
			c.logger.Error("流读取错误", zap.Error(err))
			eventChan <- StreamEvent{Error: fmt.Errorf("读取错误: %w", err)}
		}
		return
	}
}
// isSSEDoneFrame reports whether frame is an SSE frame whose data payload,
// ignoring surrounding whitespace, is the "[DONE]" terminator.
func isSSEDoneFrame(frame []byte) bool {
	payload, hasData := sseFrameDataPayload(frame)
	if !hasData {
		return false
	}
	return strings.TrimSpace(payload) == "[DONE]"
}
// sseFrameDataPayload extracts the data payload of a single SSE frame.
//
// Every line starting with "data:" contributes its value, with one optional
// leading space removed; values from multiple data lines are joined with
// "\n". Both LF and CRLF line endings are accepted. The boolean result is
// false when the frame has no data line at all.
func sseFrameDataPayload(frame []byte) (string, bool) {
	const dataField = "data:"

	body := strings.TrimRight(string(frame), "\r\n")

	var payload strings.Builder
	found := false
	for _, raw := range strings.Split(body, "\n") {
		line := strings.TrimRight(raw, "\r")
		if !strings.HasPrefix(line, dataField) {
			continue
		}
		// Drop the field name and at most one space after the colon.
		value := strings.TrimPrefix(line[len(dataField):], " ")
		if found {
			payload.WriteByte('\n')
		}
		payload.WriteString(value)
		found = true
	}

	if !found {
		return "", false
	}
	return payload.String(), true
}
// extractResponseHeaders flattens an http.Header into a map from header name
// to the first value of that header. Headers with no values are skipped. The
// result is always a non-nil map.
func extractResponseHeaders(header http.Header) map[string]string {
	flat := make(map[string]string)
	for name := range header {
		values := header[name]
		if len(values) == 0 {
			continue
		}
		flat[name] = values[0]
	}
	return flat
}
// findSSEFrameSeparator locates the first blank-line frame separator in data.
//
// It recognises "\n\n" and "\r\n\r\n" and returns the index where the
// earliest one starts together with its length in bytes. When neither is
// present it returns (-1, 0).
func findSSEFrameSeparator(data []byte) (int, int) {
	const (
		lfSep   = "\n\n"
		crlfSep = "\r\n\r\n"
	)

	lfAt := bytes.Index(data, []byte(lfSep))
	crlfAt := bytes.Index(data, []byte(crlfSep))

	if lfAt < 0 && crlfAt < 0 {
		return -1, 0
	}
	// Prefer the CRLF separator when it is the only one found or it starts
	// no later than the LF one.
	if crlfAt >= 0 && (lfAt < 0 || crlfAt <= lfAt) {
		return crlfAt, len(crlfSep)
	}
	return lfAt, len(lfSep)
}
// isNetworkError reports whether err should be treated as a network-related
// failure. A nil error is never a network error.
//
// An error qualifies when anything in its chain implements net.Error (which
// includes *net.OpError), or when it matches one of the sentinels below:
// connection reset, broken pipe, timeout, context deadline/cancellation, or
// io.EOF.
func isNetworkError(err error) bool {
	if err == nil {
		return false
	}

	// *net.OpError itself satisfies net.Error, so this single check also
	// covers operation errors; no separate *net.OpError branch is needed.
	var netErr net.Error
	if errors.As(err, &netErr) {
		return true
	}

	sentinels := [...]error{
		syscall.ECONNRESET,       // connection reset by peer
		syscall.EPIPE,            // broken pipe
		syscall.ETIMEDOUT,        // operation timed out
		context.DeadlineExceeded, // context deadline passed
		context.Canceled,         // context cancelled
		io.EOF,                   // stream ended
	}
	for _, target := range sentinels {
		if errors.Is(err, target) {
			return true
		}
	}
	return false
}