1
0
Files
nex/backend/internal/service/stats_buffer_test.go
lanyuanxiaoyao df253559a5 feat(cache): 实现 RoutingCache 和 StatsBuffer 优化数据库写入
- 新增 RoutingCache 组件,使用 sync.Map 缓存 Provider 和 Model
- 新增 StatsBuffer 组件,使用 sync.Map + atomic.Int64 缓冲统计数据
- 扩展 StatsRepository.BatchUpdate 支持批量增量更新
- 改造 RoutingService/StatsService/ProviderService/ModelService 集成缓存
- 更新 usage-statistics spec,新增 routing-cache 和 stats-buffer spec
- 新增单元测试覆盖缓存命中/失效/并发场景
2026-04-22 19:24:36 +08:00

252 lines
5.3 KiB
Go

package service
import (
"errors"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"nex/backend/internal/domain"
)
// mockStatsRepo is an in-memory fake of the stats repository consumed by
// StatsBuffer. It captures every successful BatchUpdate call so tests can
// inspect what was flushed, and can be switched into a failing mode to
// exercise retry behavior.
type mockStatsRepo struct {
	// records accumulates the arguments of each successful BatchUpdate call.
	records []struct {
		providerID string
		modelName  string
		date       time.Time
		delta      int
	}
	// fail, when true, makes BatchUpdate return an error to simulate a DB outage.
	fail bool
	// mu guards records (and reads of fail) because flush may run on a
	// background goroutine while the test inspects records.
	mu sync.Mutex
}
// Record satisfies the repository interface. The buffer under test writes
// through BatchUpdate only, so this is a deliberate no-op in the mock.
func (m *mockStatsRepo) Record(providerID, modelName string) error {
	return nil
}
// BatchUpdate captures one batched increment for later inspection by the
// test. When the fail flag is set it returns an error instead, simulating
// a database failure so retry/requeue behavior can be exercised.
func (m *mockStatsRepo) BatchUpdate(providerID, modelName string, date time.Time, delta int) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.fail {
		return errors.New("db error")
	}
	// Build the entry with a keyed literal; the anonymous struct type must
	// match the element type of records exactly.
	entry := struct {
		providerID string
		modelName  string
		date       time.Time
		delta      int
	}{
		providerID: providerID,
		modelName:  modelName,
		date:       date,
		delta:      delta,
	}
	m.records = append(m.records, entry)
	return nil
}
// Query satisfies the repository interface; no test in this file reads
// stats back, so the stub returns an empty result.
func (m *mockStatsRepo) Query(providerID, modelName string, startDate, endDate *time.Time) ([]domain.UsageStats, error) {
	return nil, nil
}
func TestStatsBuffer_Increment(t *testing.T) {
logger := zap.NewNop()
statsRepo := &mockStatsRepo{}
buffer := NewStatsBuffer(statsRepo, logger)
buffer.Increment("openai", "gpt-4")
buffer.Increment("openai", "gpt-4")
buffer.Increment("openai", "gpt-3.5")
var count int64
buffer.counters.Range(func(key, value interface{}) bool {
counter := value.(*int64)
count += atomic.LoadInt64(counter)
return true
})
assert.Equal(t, int64(3), count)
}
// TestStatsBuffer_ConcurrentIncrement verifies that 100 concurrent
// Increment calls on the same key are not lost (the counter is atomic).
func TestStatsBuffer_ConcurrentIncrement(t *testing.T) {
	logger := zap.NewNop()
	statsRepo := &mockStatsRepo{}
	buffer := NewStatsBuffer(statsRepo, logger)
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			buffer.Increment("openai", "gpt-4")
		}()
	}
	wg.Wait()
	var count int64
	buffer.counters.Range(func(key, value interface{}) bool {
		counter := value.(*int64)
		// Sum rather than overwrite: the original `count = ...` only worked
		// because a single key exists; summing keeps the assertion correct
		// regardless of how many counter entries the buffer holds.
		count += atomic.LoadInt64(counter)
		return true
	})
	assert.Equal(t, int64(100), count)
}
func TestStatsBuffer_LoadOrStore(t *testing.T) {
logger := zap.NewNop()
statsRepo := &mockStatsRepo{}
buffer := NewStatsBuffer(statsRepo, logger)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
buffer.Increment("openai", "gpt-4")
}()
}
wg.Wait()
var counterCount int
buffer.counters.Range(func(key, value interface{}) bool {
counterCount++
return true
})
assert.Equal(t, 1, counterCount)
}
func TestStatsBuffer_FlushByInterval(t *testing.T) {
logger := zap.NewNop()
statsRepo := &mockStatsRepo{}
buffer := NewStatsBuffer(statsRepo, logger,
WithFlushInterval(100*time.Millisecond))
buffer.Start()
defer buffer.Stop()
buffer.Increment("openai", "gpt-4")
buffer.Increment("openai", "gpt-4")
time.Sleep(200 * time.Millisecond)
statsRepo.mu.Lock()
assert.GreaterOrEqual(t, len(statsRepo.records), 1)
statsRepo.mu.Unlock()
}
func TestStatsBuffer_FlushByThreshold(t *testing.T) {
logger := zap.NewNop()
statsRepo := &mockStatsRepo{}
buffer := NewStatsBuffer(statsRepo, logger,
WithFlushThreshold(10))
buffer.Start()
defer buffer.Stop()
for i := 0; i < 10; i++ {
buffer.Increment("openai", "gpt-4")
}
time.Sleep(50 * time.Millisecond)
statsRepo.mu.Lock()
assert.GreaterOrEqual(t, len(statsRepo.records), 1)
statsRepo.mu.Unlock()
}
// TestStatsBuffer_SwapInt64 verifies that flush atomically drains the
// counters: before flush the buffered total is 2, afterwards it is 0.
func TestStatsBuffer_SwapInt64(t *testing.T) {
	logger := zap.NewNop()
	statsRepo := &mockStatsRepo{}
	buffer := NewStatsBuffer(statsRepo, logger)
	buffer.Increment("openai", "gpt-4")
	buffer.Increment("openai", "gpt-4")
	var beforeCount int64
	buffer.counters.Range(func(key, value interface{}) bool {
		counter := value.(*int64)
		// Sum rather than overwrite: the original `beforeCount = ...` only
		// worked because a single key exists; summing stays correct for any
		// number of counter entries.
		beforeCount += atomic.LoadInt64(counter)
		return true
	})
	assert.Equal(t, int64(2), beforeCount)
	buffer.flush()
	var afterCount int64
	buffer.counters.Range(func(key, value interface{}) bool {
		counter := value.(*int64)
		afterCount += atomic.LoadInt64(counter)
		return true
	})
	assert.Equal(t, int64(0), afterCount)
}
// TestStatsBuffer_FailRetry verifies that when the repository write fails,
// flush does not drop the data: the counts are restored to the buffer so a
// later flush can retry them.
func TestStatsBuffer_FailRetry(t *testing.T) {
	logger := zap.NewNop()
	statsRepo := &mockStatsRepo{fail: true}
	buffer := NewStatsBuffer(statsRepo, logger)
	buffer.Increment("openai", "gpt-4")
	buffer.Increment("openai", "gpt-4")
	buffer.flush()
	var count int64
	buffer.counters.Range(func(key, value interface{}) bool {
		counter := value.(*int64)
		// Sum rather than overwrite: the original `count = ...` only worked
		// because a single key exists; summing stays correct for any number
		// of counter entries.
		count += atomic.LoadInt64(counter)
		return true
	})
	assert.Equal(t, int64(2), count)
}
func TestStatsBuffer_Stop(t *testing.T) {
logger := zap.NewNop()
statsRepo := &mockStatsRepo{}
buffer := NewStatsBuffer(statsRepo, logger,
WithFlushInterval(10*time.Second))
buffer.Start()
buffer.Increment("openai", "gpt-4")
buffer.Increment("openai", "gpt-4")
start := time.Now()
buffer.Stop()
elapsed := time.Since(start)
assert.Less(t, elapsed, 1*time.Second)
statsRepo.mu.Lock()
assert.GreaterOrEqual(t, len(statsRepo.records), 1)
statsRepo.mu.Unlock()
}
// TestStatsBuffer_ConcurrentIncrementAndFlush verifies that no increments
// are lost when flushes race with concurrent Increment calls: the deltas
// written to the repository must eventually total exactly 100.
func TestStatsBuffer_ConcurrentIncrementAndFlush(t *testing.T) {
	logger := zap.NewNop()
	statsRepo := &mockStatsRepo{}
	buffer := NewStatsBuffer(statsRepo, logger,
		WithFlushInterval(50*time.Millisecond))
	buffer.Start()
	defer buffer.Stop()
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			buffer.Increment("openai", "gpt-4")
		}()
	}
	wg.Wait()
	// Poll instead of a fixed 100ms sleep: on a loaded CI machine the
	// periodic flush may not have fired within the sleep window, making the
	// old assertion flaky. Eventually retries until the total is complete.
	assert.Eventually(t, func() bool {
		statsRepo.mu.Lock()
		defer statsRepo.mu.Unlock()
		totalDelta := 0
		for _, r := range statsRepo.records {
			totalDelta += r.delta
		}
		return totalDelta == 100
	}, 2*time.Second, 10*time.Millisecond)
}