| package sqlite_test |
|
|
| import ( |
| "context" |
| "fmt" |
| "sync" |
| "sync/atomic" |
| "testing" |
| "time" |
|
|
| "ccLoad/internal/model" |
| ) |
|
|
| |
| |
| |
|
|
| |
| func TestConcurrentConfigCreate(t *testing.T) { |
| store, cleanup := setupSQLiteTestStore(t, "concurrent-test.db") |
| defer cleanup() |
|
|
| ctx := context.Background() |
| const numGoroutines = 50 |
|
|
| var wg sync.WaitGroup |
| var successCount atomic.Int32 |
| var errorCount atomic.Int32 |
|
|
| for i := 0; i < numGoroutines; i++ { |
| wg.Add(1) |
| go func(idx int) { |
| defer wg.Done() |
|
|
| cfg := &model.Config{ |
| Name: fmt.Sprintf("concurrent-channel-%d", idx), |
| URL: "https://api.example.com", |
| Enabled: true, |
| ModelEntries: []model.ModelEntry{{Model: "gpt-4", RedirectModel: ""}}, |
| } |
|
|
| _, err := store.CreateConfig(ctx, cfg) |
| if err != nil { |
| errorCount.Add(1) |
| t.Logf("创建失败 #%d: %v", idx, err) |
| } else { |
| successCount.Add(1) |
| } |
| }(i) |
| } |
|
|
| wg.Wait() |
|
|
| success := successCount.Load() |
| errors := errorCount.Load() |
|
|
| t.Logf("[INFO] 并发创建测试完成: 成功=%d, 失败=%d, 总数=%d", success, errors, numGoroutines) |
|
|
| if success == 0 { |
| t.Fatal("所有并发创建都失败了") |
| } |
|
|
| |
| configs, err := store.ListConfigs(ctx) |
| if err != nil { |
| t.Fatalf("ListConfigs失败: %v", err) |
| } |
|
|
| if len(configs) != int(success) { |
| t.Errorf("数据不一致: 数据库中有%d个配置,期望%d个", len(configs), success) |
| } |
| } |
|
|
| |
| func TestConcurrentConfigReadWrite(t *testing.T) { |
| store, cleanup := setupSQLiteTestStore(t, "concurrent-test.db") |
| defer cleanup() |
|
|
| ctx := context.Background() |
|
|
| |
| cfg := &model.Config{ |
| Name: "test-rw-channel", |
| URL: "https://api.example.com", |
| Enabled: true, |
| ModelEntries: []model.ModelEntry{{Model: "gpt-4", RedirectModel: ""}}, |
| } |
| created, err := store.CreateConfig(ctx, cfg) |
| if err != nil { |
| t.Fatalf("创建初始配置失败: %v", err) |
| } |
|
|
| const numReaders = 20 |
| const numWriters = 10 |
|
|
| var wg sync.WaitGroup |
| var readCount atomic.Int32 |
| var writeCount atomic.Int32 |
|
|
| |
| for i := 0; i < numReaders; i++ { |
| wg.Add(1) |
| go func(_ int) { |
| defer wg.Done() |
| for j := 0; j < 10; j++ { |
| _, err := store.GetConfig(ctx, created.ID) |
| if err == nil { |
| readCount.Add(1) |
| } |
| time.Sleep(1 * time.Millisecond) |
| } |
| }(i) |
| } |
|
|
| |
| for i := 0; i < numWriters; i++ { |
| wg.Add(1) |
| go func(idx int) { |
| defer wg.Done() |
| for j := 0; j < 5; j++ { |
| updates := &model.Config{ |
| Priority: idx*10 + j, |
| } |
| _, err := store.UpdateConfig(ctx, created.ID, updates) |
| if err == nil { |
| writeCount.Add(1) |
| } |
| time.Sleep(2 * time.Millisecond) |
| } |
| }(i) |
| } |
|
|
| wg.Wait() |
|
|
| reads := readCount.Load() |
| writes := writeCount.Load() |
|
|
| t.Logf("[INFO] 并发读写测试完成: 读取=%d次, 写入=%d次", reads, writes) |
|
|
| if reads < 100 { |
| t.Errorf("读取次数过少: %d (期望至少100次)", reads) |
| } |
| if writes < 30 { |
| t.Errorf("写入次数过少: %d (期望至少30次)", writes) |
| } |
| } |
|
|
| |
| func TestConcurrentLogAdd(t *testing.T) { |
| store, cleanup := setupSQLiteTestStore(t, "concurrent-test.db") |
| defer cleanup() |
|
|
| ctx := context.Background() |
| const numGoroutines = 30 |
| const logsPerGoroutine = 10 |
|
|
| var wg sync.WaitGroup |
| var successCount atomic.Int32 |
|
|
| startTime := time.Now() |
|
|
| for i := 0; i < numGoroutines; i++ { |
| wg.Add(1) |
| go func(idx int) { |
| defer wg.Done() |
|
|
| for j := 0; j < logsPerGoroutine; j++ { |
| channelID := int64(idx + 1) |
| entry := &model.LogEntry{ |
| ChannelID: channelID, |
| StatusCode: 200, |
| Model: "gpt-4", |
| Time: model.JSONTime{Time: time.Now()}, |
| } |
|
|
| err := store.AddLog(ctx, entry) |
| if err == nil { |
| successCount.Add(1) |
| } |
| } |
| }(i) |
| } |
|
|
| wg.Wait() |
|
|
| elapsed := time.Since(startTime) |
| success := successCount.Load() |
| expected := int32(numGoroutines * logsPerGoroutine) |
|
|
| t.Logf("[INFO] 并发日志添加测试完成: 成功=%d/%d, 耗时=%v", success, expected, elapsed) |
|
|
| if success < expected*9/10 { |
| t.Errorf("成功率过低: %d/%d (%.1f%%)", success, expected, float64(success)/float64(expected)*100) |
| } |
|
|
| |
| logs, err := store.ListLogs(ctx, time.Time{}, 1000, 0, nil) |
| if err != nil { |
| t.Fatalf("ListLogs失败: %v", err) |
| } |
|
|
| if len(logs) < int(success)*9/10 { |
| t.Errorf("日志数量不匹配: 数据库中有%d条,期望至少%d条", len(logs), success*9/10) |
| } |
| } |
|
|
| |
| func TestConcurrentBatchLogAdd(t *testing.T) { |
| store, cleanup := setupSQLiteTestStore(t, "concurrent-test.db") |
| defer cleanup() |
|
|
| ctx := context.Background() |
| const numGoroutines = 20 |
| const batchSize = 50 |
|
|
| var wg sync.WaitGroup |
| var successCount atomic.Int32 |
|
|
| startTime := time.Now() |
|
|
| for i := 0; i < numGoroutines; i++ { |
| wg.Add(1) |
| go func(idx int) { |
| defer wg.Done() |
|
|
| batch := make([]*model.LogEntry, batchSize) |
| channelID := int64(idx + 1) |
| for j := 0; j < batchSize; j++ { |
| batch[j] = &model.LogEntry{ |
| ChannelID: channelID, |
| StatusCode: 200, |
| Model: "gpt-4", |
| Time: model.JSONTime{Time: time.Now()}, |
| } |
| } |
|
|
| err := store.BatchAddLogs(ctx, batch) |
| if err == nil { |
| successCount.Add(int32(batchSize)) |
| } else { |
| t.Logf("批量添加失败 #%d: %v", idx, err) |
| } |
| }(i) |
| } |
|
|
| wg.Wait() |
|
|
| elapsed := time.Since(startTime) |
| success := successCount.Load() |
| expected := int32(numGoroutines * batchSize) |
|
|
| t.Logf("[INFO] 并发批量日志测试完成: 成功=%d/%d, 耗时=%v", success, expected, elapsed) |
|
|
| if success < expected*8/10 { |
| t.Errorf("成功率过低: %d/%d (%.1f%%)", success, expected, float64(success)/float64(expected)*100) |
| } |
| } |
|
|
| |
| func TestConcurrentAPIKeyOperations(t *testing.T) { |
| store, cleanup := setupSQLiteTestStore(t, "concurrent-test.db") |
| defer cleanup() |
|
|
| ctx := context.Background() |
|
|
| |
| cfg := &model.Config{ |
| Name: "test-apikey-channel", |
| URL: "https://api.example.com", |
| Enabled: true, |
| } |
| created, err := store.CreateConfig(ctx, cfg) |
| if err != nil { |
| t.Fatalf("创建初始配置失败: %v", err) |
| } |
|
|
| const numKeys = 30 |
| var wg sync.WaitGroup |
| var createSuccess atomic.Int32 |
| var readSuccess atomic.Int32 |
|
|
| |
| for i := 0; i < numKeys; i++ { |
| wg.Add(1) |
| go func(idx int) { |
| defer wg.Done() |
|
|
| key := &model.APIKey{ |
| ChannelID: created.ID, |
| KeyIndex: idx, |
| APIKey: fmt.Sprintf("sk-test-key-%d", idx), |
| KeyStrategy: model.KeyStrategySequential, |
| } |
|
|
| err := store.CreateAPIKeysBatch(ctx, []*model.APIKey{key}) |
| if err == nil { |
| createSuccess.Add(1) |
| } else { |
| t.Logf("创建Key失败 #%d: %v", idx, err) |
| } |
| }(i) |
| } |
|
|
| wg.Wait() |
|
|
| |
| for i := 0; i < numKeys; i++ { |
| wg.Add(1) |
| go func(idx int) { |
| defer wg.Done() |
|
|
| _, err := store.GetAPIKey(ctx, created.ID, idx) |
| if err == nil { |
| readSuccess.Add(1) |
| } |
| }(i) |
| } |
|
|
| wg.Wait() |
|
|
| creates := createSuccess.Load() |
| reads := readSuccess.Load() |
|
|
| t.Logf("[INFO] 并发API Key测试完成: 创建成功=%d/%d, 读取成功=%d/%d", |
| creates, numKeys, reads, numKeys) |
|
|
| if creates < int32(numKeys)*8/10 { |
| t.Errorf("创建成功率过低: %d/%d", creates, numKeys) |
| } |
|
|
| |
| allKeys, err := store.GetAPIKeys(ctx, created.ID) |
| if err != nil { |
| t.Fatalf("GetAPIKeys失败: %v", err) |
| } |
|
|
| if len(allKeys) < int(creates)*9/10 { |
| t.Errorf("API Key数量不匹配: 数据库中有%d个,期望至少%d个", len(allKeys), creates*9/10) |
| } |
| } |
|
|
| |
| func TestConcurrentCooldownOperations(t *testing.T) { |
| if testing.Short() { |
| t.Skip("跳过并发测试(使用 -short 标志)") |
| } |
|
|
| store, cleanup := setupSQLiteTestStore(t, "concurrent-test.db") |
| defer cleanup() |
|
|
| ctx := context.Background() |
|
|
| |
| cfg := &model.Config{ |
| Name: "test-cooldown-channel", |
| URL: "https://api.example.com", |
| Enabled: true, |
| } |
| created, err := store.CreateConfig(ctx, cfg) |
| if err != nil { |
| t.Fatalf("创建初始配置失败: %v", err) |
| } |
|
|
| |
| cdKeys := make([]*model.APIKey, 3) |
| for i := 0; i < 3; i++ { |
| cdKeys[i] = &model.APIKey{ |
| ChannelID: created.ID, |
| KeyIndex: i, |
| APIKey: fmt.Sprintf("sk-cooldown-key-%d", i), |
| KeyStrategy: model.KeyStrategySequential, |
| } |
| } |
| _ = store.CreateAPIKeysBatch(ctx, cdKeys) |
|
|
| |
| sem := make(chan struct{}, 2) |
| var wg sync.WaitGroup |
| var channelCooldowns atomic.Int32 |
| var keyCooldowns atomic.Int32 |
|
|
| now := time.Now() |
|
|
| |
| for i := 0; i < 5; i++ { |
| wg.Add(1) |
| go func(idx int) { |
| defer wg.Done() |
| sem <- struct{}{} |
| defer func() { <-sem }() |
|
|
| statusCode := 500 + (idx % 5) |
| _, err := store.BumpChannelCooldown(ctx, created.ID, now, statusCode) |
| if err == nil { |
| channelCooldowns.Add(1) |
| } |
| }(i) |
| } |
|
|
| |
| for i := 0; i < 6; i++ { |
| wg.Add(1) |
| go func(idx int) { |
| defer wg.Done() |
| sem <- struct{}{} |
| defer func() { <-sem }() |
|
|
| keyIndex := idx % 3 |
| _, err := store.BumpKeyCooldown(ctx, created.ID, keyIndex, now, 401) |
| if err == nil { |
| keyCooldowns.Add(1) |
| } |
| }(i) |
| } |
|
|
| wg.Wait() |
|
|
| channelSucc := channelCooldowns.Load() |
| keySucc := keyCooldowns.Load() |
|
|
| t.Logf("[INFO] 并发冷却测试完成: 渠道冷却成功=%d/5, Key冷却成功=%d/6", |
| channelSucc, keySucc) |
|
|
| |
| if channelSucc == 0 { |
| t.Error("渠道冷却全部失败") |
| } |
| if keySucc == 0 { |
| t.Error("Key冷却全部失败") |
| } |
| } |
|
|
| |
| func TestConcurrentMixedOperations(t *testing.T) { |
| store, cleanup := setupSQLiteTestStore(t, "concurrent-test.db") |
| defer cleanup() |
|
|
| ctx := context.Background() |
| const duration = 500 * time.Millisecond |
|
|
| var wg sync.WaitGroup |
| var operations atomic.Int32 |
| stopCh := make(chan struct{}) |
|
|
| |
| wg.Add(1) |
| go func() { |
| defer wg.Done() |
| idx := 0 |
| for { |
| select { |
| case <-stopCh: |
| return |
| default: |
| cfg := &model.Config{ |
| Name: fmt.Sprintf("mixed-channel-%d", idx), |
| URL: "https://api.example.com", |
| Enabled: true, |
| } |
| _, _ = store.CreateConfig(ctx, cfg) |
| operations.Add(1) |
| idx++ |
| time.Sleep(5 * time.Millisecond) |
| } |
| } |
| }() |
|
|
| |
| wg.Add(1) |
| go func() { |
| defer wg.Done() |
| for { |
| select { |
| case <-stopCh: |
| return |
| default: |
| _, _ = store.ListConfigs(ctx) |
| operations.Add(1) |
| time.Sleep(3 * time.Millisecond) |
| } |
| } |
| }() |
|
|
| |
| wg.Add(1) |
| go func() { |
| defer wg.Done() |
| channelID := int64(1) |
| for { |
| select { |
| case <-stopCh: |
| return |
| default: |
| entry := &model.LogEntry{ |
| ChannelID: channelID, |
| StatusCode: 200, |
| Model: "gpt-4", |
| Time: model.JSONTime{Time: time.Now()}, |
| } |
| _ = store.AddLog(ctx, entry) |
| operations.Add(1) |
| time.Sleep(2 * time.Millisecond) |
| } |
| } |
| }() |
|
|
| |
| time.Sleep(duration) |
| close(stopCh) |
| wg.Wait() |
|
|
| totalOps := operations.Load() |
| t.Logf("[INFO] 混合并发测试完成: 总操作数=%d, 持续时间=%v, QPS=%.1f", |
| totalOps, duration, float64(totalOps)/duration.Seconds()) |
|
|
| if totalOps < 25 { |
| t.Errorf("操作数过少: %d (期望至少25)", totalOps) |
| } |
| } |
|
|
| |
|
|
| |
|
|