1
0
Files
nex/backend/internal/conversion/stream.go
lanyuanxiaoyao 395887667d feat: 实现统一模型 ID 机制
实现统一模型 ID 格式 (provider_id/model_name),支持跨协议模型标识和 Smart Passthrough。

核心变更:
- 新增 pkg/modelid 包:解析、格式化、校验统一模型 ID
- 数据库迁移:models 表使用 UUID 主键 + UNIQUE(provider_id, model_name) 约束
- Repository 层:FindByProviderAndModelName、ListEnabled 方法
- Service 层:联合唯一校验、provider ID 字符集校验
- Conversion 层:ExtractModelName、RewriteRequestModelName/RewriteResponseModelName 方法
- Handler 层:统一模型 ID 路由、Smart Passthrough、Models API 本地聚合
- 新增 error-responses、unified-model-id 规范

测试覆盖:
- 单元测试:modelid、conversion、handler、service、repository
- 集成测试:统一模型 ID 路由、Smart Passthrough 保真性、跨协议转换
- 迁移测试:UUID 主键、UNIQUE 约束、级联删除

OpenSpec:
- 归档 unified-model-id 变更到 archive/2026-04-21-unified-model-id
- 同步 11 个 delta specs 到 main specs
- 新增 error-responses、unified-model-id 规范文件
2026-04-21 18:14:10 +08:00

156 lines
4.6 KiB
Go

package conversion
import "nex/backend/internal/conversion/canonical"
// StreamDecoder is the streaming decoder interface: it consumes raw chunks
// and yields canonical stream events.
type StreamDecoder interface {
// ProcessChunk accepts one raw chunk and returns the canonical stream events
// obtained from it.
ProcessChunk(rawChunk []byte) []canonical.CanonicalStreamEvent
// Flush returns canonical stream events; CanonicalStreamConverter.Flush calls
// it to collect whatever the decoder still has to emit.
Flush() []canonical.CanonicalStreamEvent
}
// StreamEncoder is the streaming encoder interface: it serializes canonical
// stream events into output byte chunks.
type StreamEncoder interface {
// EncodeEvent encodes one canonical stream event into zero or more byte chunks.
EncodeEvent(event canonical.CanonicalStreamEvent) [][]byte
// Flush returns byte chunks; CanonicalStreamConverter.Flush appends them after
// all encoded events.
Flush() [][]byte
}
// StreamConverter is the streaming converter interface: raw chunks in,
// output byte chunks out.
type StreamConverter interface {
// ProcessChunk converts one raw chunk into the byte chunks to emit for it.
ProcessChunk(rawChunk []byte) [][]byte
// Flush returns any remaining byte chunks; the pass-through implementations
// in this file buffer nothing and return nil.
Flush() [][]byte
}
// PassthroughStreamConverter is the same-protocol pass-through stream
// converter: every chunk is forwarded exactly as it was received.
type PassthroughStreamConverter struct{}

// NewPassthroughStreamConverter creates a pass-through stream converter.
func NewPassthroughStreamConverter() *PassthroughStreamConverter {
	return new(PassthroughStreamConverter)
}

// ProcessChunk hands the raw bytes back untouched (no copy is made), wrapped
// in a one-element slice.
func (p *PassthroughStreamConverter) ProcessChunk(rawChunk []byte) [][]byte {
	out := make([][]byte, 1)
	out[0] = rawChunk
	return out
}

// Flush has no buffered data to emit and always returns nil.
func (p *PassthroughStreamConverter) Flush() [][]byte {
	var pending [][]byte
	return pending
}
// SmartPassthroughStreamConverter is the same-protocol Smart Passthrough
// stream converter. It rewrites the model field chunk by chunk.
type SmartPassthroughStreamConverter struct {
	adapter       ProtocolAdapter
	modelOverride string
	interfaceType InterfaceType
}

// NewSmartPassthroughStreamConverter creates a Smart Passthrough stream
// converter that rewrites chunks through adapter using modelOverride and
// interfaceType.
func NewSmartPassthroughStreamConverter(adapter ProtocolAdapter, modelOverride string, interfaceType InterfaceType) *SmartPassthroughStreamConverter {
	conv := new(SmartPassthroughStreamConverter)
	conv.adapter = adapter
	conv.modelOverride = modelOverride
	conv.interfaceType = interfaceType
	return conv
}

// ProcessChunk rewrites the model field of one chunk.
//
// An empty chunk yields nil without consulting the adapter. If the adapter
// reports an error, the original chunk is forwarded unchanged.
func (s *SmartPassthroughStreamConverter) ProcessChunk(rawChunk []byte) [][]byte {
	if len(rawChunk) == 0 {
		return nil
	}
	out := rawChunk
	// Only swap in the rewritten bytes when the rewrite succeeded; otherwise
	// fall back to the raw chunk.
	if rewritten, err := s.adapter.RewriteResponseModelName(rawChunk, s.modelOverride, s.interfaceType); err == nil {
		out = rewritten
	}
	return [][]byte{out}
}

// Flush has no buffered data to emit and always returns nil.
func (s *SmartPassthroughStreamConverter) Flush() [][]byte {
	var pending [][]byte
	return pending
}
// CanonicalStreamConverter is the cross-protocol canonical stream converter.
// It couples a decoder and an encoder with an optional middleware chain and
// an optional model override.
type CanonicalStreamConverter struct {
// decoder supplies the canonical events for each raw chunk and on flush.
decoder StreamDecoder
// encoder serializes each canonical event and contributes trailing chunks on flush.
encoder StreamEncoder
// chain is the optional middleware chain; it stays nil when the converter is
// built with NewCanonicalStreamConverter, in which case middleware is skipped.
chain *MiddlewareChain
// ctx is handed to the middleware chain by pointer for every event.
ctx ConversionContext
// clientProtocol and providerProtocol are forwarded to the middleware chain.
clientProtocol string
providerProtocol string
// modelOverride, when non-empty, replaces Message.Model on events that carry a Message.
modelOverride string
}
// NewCanonicalStreamConverter creates a canonical stream converter from a
// decoder and an encoder. No middleware chain, conversion context, protocol
// names or model override are set.
func NewCanonicalStreamConverter(decoder StreamDecoder, encoder StreamEncoder) *CanonicalStreamConverter {
	conv := new(CanonicalStreamConverter)
	conv.decoder, conv.encoder = decoder, encoder
	return conv
}
// NewCanonicalStreamConverterWithMiddleware creates a canonical stream
// converter with a middleware chain. Besides the decoder and encoder it
// stores the chain, the conversion context, both protocol names and the
// model override.
func NewCanonicalStreamConverterWithMiddleware(decoder StreamDecoder, encoder StreamEncoder, chain *MiddlewareChain, ctx ConversionContext, clientProtocol, providerProtocol, modelOverride string) *CanonicalStreamConverter {
	conv := new(CanonicalStreamConverter)
	conv.decoder = decoder
	conv.encoder = encoder
	conv.chain = chain
	conv.ctx = ctx
	conv.clientProtocol = clientProtocol
	conv.providerProtocol = providerProtocol
	conv.modelOverride = modelOverride
	return conv
}
// ProcessChunk runs one raw chunk through the pipeline
// decode → middleware → model override → encode and returns the encoded
// byte chunks in event order.
//
// The middleware step is skipped when no chain is configured. An event for
// which the chain returns an error is dropped (best effort: the rest of the
// stream keeps flowing). If the chain returns a nil event together with a
// nil error, the decoded event is kept as is instead of dereferencing the
// nil pointer.
func (c *CanonicalStreamConverter) ProcessChunk(rawChunk []byte) [][]byte {
	events := c.decoder.ProcessChunk(rawChunk)
	var result [][]byte
	for i := range events {
		if c.chain != nil {
			processed, err := c.chain.ApplyStreamEvent(&events[i], c.clientProtocol, c.providerProtocol, &c.ctx)
			if err != nil {
				// Deliberately skip only this event; one failing event must
				// not abort the whole stream.
				continue
			}
			// Guard against a (nil, nil) result, which would otherwise panic.
			if processed != nil {
				events[i] = *processed
			}
		}
		c.applyModelOverride(&events[i])
		result = append(result, c.encoder.EncodeEvent(events[i])...)
	}
	return result
}
// Flush drains the decoder and the encoder buffers.
//
// Events returned by the decoder's Flush go through middleware → model
// override → encode; the chunks returned by the encoder's Flush are appended
// last. The middleware step is skipped when no chain is configured. An event
// for which the chain returns an error is dropped (best effort). If the
// chain returns a nil event together with a nil error, the decoded event is
// kept as is instead of dereferencing the nil pointer.
func (c *CanonicalStreamConverter) Flush() [][]byte {
	events := c.decoder.Flush()
	var result [][]byte
	for i := range events {
		if c.chain != nil {
			processed, err := c.chain.ApplyStreamEvent(&events[i], c.clientProtocol, c.providerProtocol, &c.ctx)
			if err != nil {
				// Deliberately skip only this event; the remaining events and
				// the encoder's trailing chunks must still be emitted.
				continue
			}
			// Guard against a (nil, nil) result, which would otherwise panic.
			if processed != nil {
				events[i] = *processed
			}
		}
		c.applyModelOverride(&events[i])
		result = append(result, c.encoder.EncodeEvent(events[i])...)
	}
	// Trailing encoder output always comes after every encoded event.
	return append(result, c.encoder.Flush()...)
}
// applyModelOverride overwrites the Model field of a stream event in the
// cross-protocol case. It does nothing when no override is configured or
// when the event carries no Message.
func (c *CanonicalStreamConverter) applyModelOverride(event *canonical.CanonicalStreamEvent) {
	if c.modelOverride == "" || event.Message == nil {
		return
	}
	event.Message.Model = c.modelOverride
}