1
0

fix: 修复 statsRepo 并发竞态条件,使用 upsert 保证原子性

- 使用 GORM clause.OnConflict 替代事务包装
- Record 和 BatchUpdate 方法改用 upsert 模式
- 修复 UsageStats 的 GORM struct tag,确保 AutoMigrate 创建正确的 UNIQUE 约束
- 更新 usage-statistics spec 以反映 upsert 操作

MySQL 并发测试验证:10 并发调用 → request_count = 10
This commit is contained in:
2026-04-23 15:54:56 +08:00
parent 5b401e29cb
commit 1522c87c74
3 changed files with 63 additions and 46 deletions

View File

@@ -29,8 +29,8 @@ type Model struct {
// UsageStats 用量统计
type UsageStats struct {
ID uint `gorm:"primaryKey;autoIncrement" json:"id"`
ProviderID string `gorm:"not null;index" json:"provider_id"`
ModelName string `gorm:"not null;index" json:"model_name"`
ProviderID string `gorm:"not null;index;uniqueIndex:idx_provider_model_date" json:"provider_id"`
ModelName string `gorm:"not null;index;uniqueIndex:idx_provider_model_date" json:"model_name"`
RequestCount int `gorm:"default:0" json:"request_count"`
Date time.Time `gorm:"type:date;not null;uniqueIndex:idx_provider_model_date" json:"date"`
}

View File

@@ -1,10 +1,10 @@
package repository
import (
"errors"
"time"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"nex/backend/internal/config"
"nex/backend/internal/domain"
@@ -22,47 +22,43 @@ func (r *statsRepository) Record(providerID, modelName string) error {
today := time.Now().Format("2006-01-02")
todayTime, _ := time.Parse("2006-01-02", today)
return r.db.Transaction(func(tx *gorm.DB) error {
var stats config.UsageStats
err := tx.Where("provider_id = ? AND model_name = ? AND date = ?",
providerID, modelName, todayTime).First(&stats).Error
stats := config.UsageStats{
ProviderID: providerID,
ModelName: modelName,
RequestCount: 1,
Date: todayTime,
}
if errors.Is(err, gorm.ErrRecordNotFound) {
stats = config.UsageStats{
ProviderID: providerID,
ModelName: modelName,
RequestCount: 1,
Date: todayTime,
}
return tx.Create(&stats).Error
} else if err != nil {
return err
}
return tx.Model(&stats).Update("request_count", gorm.Expr("request_count + 1")).Error
})
return r.db.Clauses(clause.OnConflict{
Columns: []clause.Column{
{Name: "provider_id"},
{Name: "model_name"},
{Name: "date"},
},
DoUpdates: clause.Assignments(map[string]interface{}{
"request_count": gorm.Expr("request_count + 1"),
}),
}).Create(&stats).Error
}
func (r *statsRepository) BatchUpdate(providerID, modelName string, date time.Time, delta int) error {
return r.db.Transaction(func(tx *gorm.DB) error {
var stats config.UsageStats
err := tx.Where("provider_id = ? AND model_name = ? AND date = ?",
providerID, modelName, date).First(&stats).Error
stats := config.UsageStats{
ProviderID: providerID,
ModelName: modelName,
RequestCount: delta,
Date: date,
}
if errors.Is(err, gorm.ErrRecordNotFound) {
return tx.Create(&config.UsageStats{
ProviderID: providerID,
ModelName: modelName,
RequestCount: delta,
Date: date,
}).Error
} else if err != nil {
return err
}
return tx.Model(&stats).
Update("request_count", gorm.Expr("request_count + ?", delta)).Error
})
return r.db.Clauses(clause.OnConflict{
Columns: []clause.Column{
{Name: "provider_id"},
{Name: "model_name"},
{Name: "date"},
},
DoUpdates: clause.Assignments(map[string]interface{}{
"request_count": gorm.Expr("request_count + ?", delta),
}),
}).Create(&stats).Error
}
func (r *statsRepository) Query(providerID, modelName string, startDate, endDate *time.Time) ([]domain.UsageStats, error) {

View File

@@ -93,7 +93,21 @@
- **WHEN** 同时处理多个并发请求
- **THEN** 网关 SHALL 使用原子操作正确增加每个请求的计数
- **THEN** 不 SHALL 因并发写入而丢失统计
- **THEN** SHALL 使用 StatsBuffer 的内存计数器
- **THEN** SHALL 使用 upsert 操作保证原子性
#### Scenario: 并发调用 Record 方法
- **WHEN** 多个 goroutine 并发调用 StatsRepository.Record
- **THEN** SHALL 使用 INSERT ... ON DUPLICATE KEY UPDATE (MySQL) 或 INSERT ... ON CONFLICT DO UPDATE (SQLite)
- **THEN** SHALL 保证所有并发调用的计数都被正确累加
- **THEN** 不 SHALL 因 UNIQUE 约束冲突而丢失数据
#### Scenario: 并发调用 BatchUpdate 方法
- **WHEN** 多个 goroutine 并发调用 StatsRepository.BatchUpdate
- **THEN** SHALL 使用 upsert 操作保证原子性
- **THEN** SHALL 正确累加所有 delta 值
- **THEN** 不 SHALL 因并发写入而丢失统计
### Requirement: 使用 service 层处理业务逻辑
@@ -125,14 +139,14 @@ Service SHALL 通过 StatsRepository 访问数据。
- **WHEN** StatsBuffer 刷新统计
- **THEN** SHALL 调用 StatsRepository.BatchUpdate
- **THEN** SHALL 使用事务更新或创建统计记录
- **THEN** SHALL 使用 upsert 操作更新或创建统计记录
- **THEN** SHALL 支持增量更新request_count + delta
#### Scenario: 事务处理
#### Scenario: upsert 操作
- **WHEN** 记录统计
- **THEN** SHALL 在 repository 层使用数据库事务
- **THEN** SHALL 保并发安全
- **THEN** SHALL 在 repository 层使用 upsert 操作
- **THEN** SHALL 保证原子性和并发安全
### Requirement: 统计查询优化
@@ -168,11 +182,18 @@ StatsRepository SHALL 新增 BatchUpdate 方法支持批量增量更新。
#### Scenario: BatchUpdate 更新现有记录
- **WHEN** 调用 BatchUpdate 且当日记录存在
- **THEN** SHALL 使用事务更新 request_count = request_count + delta
- **THEN** SHALL 使用 upsert 操作更新 request_count = request_count + delta
- **THEN** SHALL 保证原子性,无竞态条件
- **THEN** SHALL 不创建新记录
#### Scenario: BatchUpdate 创建新记录
- **WHEN** 调用 BatchUpdate 且当日记录不存在
- **THEN** SHALL 创建新记录request_count = delta
- **THEN** SHALL 使用事务保证原子性
- **THEN** SHALL 使用 upsert 操作保证原子性
#### Scenario: BatchUpdate 并发安全
- **WHEN** 多个 BatchUpdate 调用同时执行
- **THEN** SHALL 保证所有 delta 都被正确累加
- **THEN** SHALL 不因并发冲突而丢失数据