package app import ( "context" "encoding/json" "fmt" "io" "net/http" "strings" "testing" "time" "ccLoad/internal/model" "ccLoad/internal/testutil" "ccLoad/internal/util" "github.com/gin-gonic/gin" ) // TestHandleChannelTest 测试渠道测试功能 func TestHandleChannelTest(t *testing.T) { tests := []struct { name string channelID string requestBody map[string]any setupData bool expectedStatus int expectSuccess bool }{ { name: "无效的渠道ID", channelID: "invalid", requestBody: map[string]any{ "model": "test-model", "channel_type": "anthropic", }, setupData: false, expectedStatus: http.StatusBadRequest, expectSuccess: false, }, { name: "渠道不存在", channelID: "999", requestBody: map[string]any{ "model": "test-model", "channel_type": "anthropic", }, setupData: false, expectedStatus: http.StatusNotFound, expectSuccess: false, }, { name: "无效的请求体", channelID: "1", requestBody: map[string]any{ "invalid_field": "value", }, setupData: false, expectedStatus: http.StatusBadRequest, expectSuccess: false, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { // 创建测试服务器 srv := newInMemoryServer(t) ctx := context.Background() // 设置测试数据(如果需要) if tt.setupData { cfg := &model.Config{ ID: 1, Name: "test-channel", URL: "http://test.example.com", Priority: 1, ModelEntries: []model.ModelEntry{{Model: "test-model", RedirectModel: ""}}, Enabled: true, } _, err := srv.store.CreateConfig(ctx, cfg) if err != nil { t.Fatalf("创建测试渠道失败: %v", err) } } c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, "/admin/channels/"+tt.channelID+"/test", tt.requestBody)) c.Params = gin.Params{{Key: "id", Value: tt.channelID}} // 调用处理函数 srv.HandleChannelTest(c) // 验证响应状态码 if w.Code != tt.expectedStatus { t.Errorf("期望状态码 %d, 实际 %d, 响应: %s", tt.expectedStatus, w.Code, w.Body.String()) } resp := mustParseAPIResponse[json.RawMessage](t, w.Body.Bytes()) if resp.Success != tt.expectSuccess { t.Errorf("期望 success=%v, 实际=%v, error=%q", tt.expectSuccess, resp.Success, resp.Error) } }) } } func TestTestChannelAPI_MultiURLFallbackAndSelectorFeedback(t *testing.T) { failCalls := 0 okCalls := 0 failUpstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { failCalls++ w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusServiceUnavailable) _, _ = w.Write([]byte(`{"error":{"type":"server_error","message":"upstream fail"}}`)) })) defer failUpstream.Close() okUpstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { okCalls++ time.Sleep(15 * time.Millisecond) w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(`{"id":"chatcmpl-test","choices":[{"message":{"content":"ok"}}],"usage":{"prompt_tokens":10,"completion_tokens":5}}`)) })) defer okUpstream.Close() srv := newInMemoryServer(t) cfg := &model.Config{ ID: 9527, Name: "multi-url-test", URL: failUpstream.URL + "\n" + okUpstream.URL, Priority: 1, ChannelType: "openai", ModelEntries: []model.ModelEntry{{Model: "gpt-4o-mini"}}, Enabled: true, } // 强制第一跳命中失败URL,验证是否会回退到第二个URL。 srv.urlSelector.CooldownURL(cfg.ID, okUpstream.URL) req := &testutil.TestChannelRequest{ Model: "gpt-4o-mini", ChannelType: "openai", Content: "hello", } result := srv.testChannelAPI(context.Background(), cfg, "sk-test", req) success, _ := result["success"].(bool) if !success { t.Fatalf("expected fallback success, got result=%+v", result) } if failCalls < 1 || okCalls < 1 { t.Fatalf("expected both URLs attempted, failCalls=%d okCalls=%d", failCalls, okCalls) } if !srv.urlSelector.IsCooledDown(cfg.ID, failUpstream.URL) { t.Fatalf("expected failed URL to be cooled down, url=%s", failUpstream.URL) } if lat, ok := srv.urlSelector.latencies[urlKey{channelID: cfg.ID, url: okUpstream.URL}]; !ok || lat == nil || lat.value <= 0 { t.Fatalf("expected success URL latency recorded, got=%v", lat) } } func TestExecuteChannelTestWithCooldown_RespectsRPMLimitWithoutCooldown(t *testing.T) { hits := 0 upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { hits++ w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(`{"id":"chatcmpl-test","choices":[{"message":{"content":"ok"}}],"usage":{"prompt_tokens":1,"completion_tokens":1}}`)) })) defer upstream.Close() srv := newInMemoryServer(t) cfg := &model.Config{ ID: 9528, Name: "rpm-limited-test", URL: upstream.URL, Priority: 1, RPMLimit: 1, ChannelType: "openai", ModelEntries: []model.ModelEntry{{Model: "gpt-4o-mini"}}, Enabled: true, } req := &testutil.TestChannelRequest{ Model: "gpt-4o-mini", ChannelType: "openai", Content: "hello", } first := srv.executeChannelTestWithCooldown(context.Background(), cfg, 0, "sk-test", req, true) if success, _ := first["success"].(bool); !success { t.Fatalf("first test should succeed, got result=%+v", first) } second := srv.executeChannelTestWithCooldown(context.Background(), cfg, 0, "sk-test", req, true) if success, _ := second["success"].(bool); success { t.Fatalf("second test should be RPM limited, got result=%+v", second) } if limited, _ := second["rpm_limited"].(bool); !limited { t.Fatalf("expected rpm_limited marker, got result=%+v", second) } if action, _ := second["cooldown_action"].(string); action != "rpm_limited_no_cooldown" { t.Fatalf("cooldown_action=%q, want rpm_limited_no_cooldown, result=%+v", action, second) } if retryAfterMs, _ := getResultInt(second["retry_after_ms"]); retryAfterMs <= 0 { t.Fatalf("retry_after_ms=%d, want positive value, result=%+v", retryAfterMs, second) } if hits != 1 { t.Fatalf("upstream hits=%d, want 1", hits) } } func TestTestChannelAPI_MultiURLFallbackOnPlainText502(t *testing.T) { failCalls := 0 okCalls := 0 failUpstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { failCalls++ w.Header().Set("Content-Type", "text/plain; charset=UTF-8") w.WriteHeader(http.StatusBadGateway) _, _ = w.Write([]byte("error code: 502")) })) defer failUpstream.Close() okUpstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { okCalls++ w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(`{"id":"chatcmpl-test","choices":[{"message":{"content":"ok"}}],"usage":{"prompt_tokens":10,"completion_tokens":5}}`)) })) defer okUpstream.Close() srv := newInMemoryServer(t) cfg := &model.Config{ ID: 9528, Name: "multi-url-plain-502-test", URL: failUpstream.URL + "\n" + okUpstream.URL, Priority: 1, ChannelType: "openai", ModelEntries: []model.ModelEntry{{Model: "gpt-4o-mini"}}, Enabled: true, } // 强制第一跳命中 502 的坏 URL,验证 text/plain 错误体也会继续回退。 srv.urlSelector.CooldownURL(cfg.ID, okUpstream.URL) req := &testutil.TestChannelRequest{ Model: "gpt-4o-mini", ChannelType: "openai", Content: "hello", } result := srv.testChannelAPI(context.Background(), cfg, "sk-test", req) success, _ := result["success"].(bool) if !success { t.Fatalf("expected fallback success on plain 502, got result=%+v", result) } if failCalls < 1 || okCalls < 1 { t.Fatalf("expected both URLs attempted, failCalls=%d okCalls=%d", failCalls, okCalls) } if !srv.urlSelector.IsCooledDown(cfg.ID, failUpstream.URL) { t.Fatalf("expected failed URL to be cooled down, url=%s", failUpstream.URL) } if got, ok := result["response_text"].(string); !ok || got != "ok" { t.Fatalf("expected second URL success response_text=ok, got=%+v", result) } } func TestTestChannelAPI_NonStreamUsesConfiguredTimeout(t *testing.T) { upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { select { case <-r.Context().Done(): return case <-time.After(160 * time.Millisecond): } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(`{"id":"chatcmpl-test","choices":[{"message":{"content":"late"}}],"usage":{"prompt_tokens":1,"completion_tokens":1}}`)) })) defer upstream.Close() srv := newInMemoryServer(t) srv.nonStreamTimeout = 25 * time.Millisecond cfg := &model.Config{ ID: 9530, Name: "non-stream-timeout-test", URL: upstream.URL, Priority: 1, ChannelType: "openai", ModelEntries: []model.ModelEntry{{Model: "gpt-4o-mini"}}, Enabled: true, } req := &testutil.TestChannelRequest{ Model: "gpt-4o-mini", ChannelType: "openai", Content: "hello", Stream: false, } start := time.Now() result := srv.testChannelAPI(context.Background(), cfg, "sk-test", req) elapsed := time.Since(start) if success, _ := result["success"].(bool); success { t.Fatalf("expected timeout failure, got result=%+v", result) } if elapsed >= 120*time.Millisecond { t.Fatalf("expected configured timeout before delayed upstream response, elapsed=%v result=%+v", elapsed, result) } if statusCode, _ := getResultInt(result["status_code"]); statusCode != http.StatusGatewayTimeout { t.Fatalf("status_code=%d, want %d, result=%+v", statusCode, http.StatusGatewayTimeout, result) } } func TestTestChannelAPI_StreamFirstValidContentTimeoutIgnoresHeartbeats(t *testing.T) { upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/event-stream") w.WriteHeader(http.StatusOK) flusher, _ := w.(http.Flusher) ticker := time.NewTicker(10 * time.Millisecond) defer ticker.Stop() lateContent := time.NewTimer(160 * time.Millisecond) defer lateContent.Stop() for { select { case <-r.Context().Done(): return case <-ticker.C: _, _ = w.Write([]byte(": keep-alive\n\n")) flusher.Flush() case <-lateContent.C: _, _ = w.Write([]byte("data: {\"choices\":[{\"delta\":{\"content\":\"late\"}}]}\n\n")) _, _ = w.Write([]byte("data: [DONE]\n\n")) flusher.Flush() return } } })) defer upstream.Close() srv := newInMemoryServer(t) srv.firstByteTimeout = 30 * time.Millisecond cfg := &model.Config{ ID: 9531, Name: "stream-first-content-timeout-test", URL: upstream.URL, Priority: 1, ChannelType: "openai", ModelEntries: []model.ModelEntry{{Model: "gpt-4o-mini"}}, Enabled: true, } req := &testutil.TestChannelRequest{ Model: "gpt-4o-mini", ChannelType: "openai", Content: "hello", Stream: true, } start := time.Now() result := srv.testChannelAPI(context.Background(), cfg, "sk-test", req) elapsed := time.Since(start) if success, _ := result["success"].(bool); success { t.Fatalf("expected first valid stream content timeout, got result=%+v", result) } if elapsed >= 120*time.Millisecond { t.Fatalf("expected timeout before late content, elapsed=%v result=%+v", elapsed, result) } if statusCode, _ := getResultInt(result["status_code"]); statusCode != util.StatusFirstByteTimeout { t.Fatalf("status_code=%d, want %d, result=%+v", statusCode, util.StatusFirstByteTimeout, result) } if _, ok := result["first_byte_duration_ms"]; ok { t.Fatalf("heartbeat must not set first_byte_duration_ms, result=%+v", result) } } func TestHandleChannelTest_RejectsBaseURL(t *testing.T) { failCalls := 0 failUpstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { failCalls++ w.WriteHeader(http.StatusServiceUnavailable) })) defer failUpstream.Close() okCalls := 0 okUpstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { okCalls++ w.WriteHeader(http.StatusOK) })) defer okUpstream.Close() srv := newInMemoryServer(t) ctx := context.Background() cfg := &model.Config{ Name: "channel-test-reject-base-url", URL: failUpstream.URL + "\n" + okUpstream.URL, Priority: 1, ChannelType: "openai", ModelEntries: []model.ModelEntry{{Model: "gpt-4o-mini"}}, Enabled: true, } created, err := srv.store.CreateConfig(ctx, cfg) if err != nil { t.Fatalf("创建测试渠道失败: %v", err) } if err := srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{{ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-test-key"}}); err != nil { t.Fatalf("添加 API key 失败: %v", err) } c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, "/admin/channels/"+fmt.Sprintf("%d", created.ID)+"/test", map[string]any{ "model": "gpt-4o-mini", "channel_type": "openai", "base_url": okUpstream.URL, })) c.Params = gin.Params{{Key: "id", Value: fmt.Sprintf("%d", created.ID)}} srv.HandleChannelTest(c) if w.Code != http.StatusBadRequest { t.Fatalf("status=%d, want %d, body=%s", w.Code, http.StatusBadRequest, w.Body.String()) } resp := mustParseAPIResponse[json.RawMessage](t, w.Body.Bytes()) if resp.Success { t.Fatalf("expected success=false, resp=%+v", resp) } if !strings.Contains(resp.Error, "/test-url") { t.Fatalf("expected error to guide /test-url, got %q", resp.Error) } if failCalls != 0 || okCalls != 0 { t.Fatalf("expected no upstream request, failCalls=%d okCalls=%d", failCalls, okCalls) } } func TestHandleChannelURLTest_UsesForcedURL(t *testing.T) { failCalls := 0 failUpstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { failCalls++ w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusServiceUnavailable) _, _ = w.Write([]byte(`{"error":{"type":"server_error","message":"should not hit this url"}}`)) })) defer failUpstream.Close() okCalls := 0 okUpstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { okCalls++ w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(`{"id":"chatcmpl-test","choices":[{"message":{"content":"ok"}}],"usage":{"prompt_tokens":10,"completion_tokens":5}}`)) })) defer okUpstream.Close() srv := newInMemoryServer(t) ctx := context.Background() cfg := &model.Config{ Name: "single-url-test", URL: failUpstream.URL + "\n" + okUpstream.URL, Priority: 1, ChannelType: "openai", ModelEntries: []model.ModelEntry{{Model: "gpt-4o-mini"}}, Enabled: true, } created, err := srv.store.CreateConfig(ctx, cfg) if err != nil { t.Fatalf("创建测试渠道失败: %v", err) } if err := srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{{ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-test-key"}}); err != nil { t.Fatalf("添加 API key 失败: %v", err) } // selector 和多 URL 顺序都不该影响显式单 URL 测试。 srv.urlSelector.CooldownURL(created.ID, okUpstream.URL) c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, "/admin/channels/"+fmt.Sprintf("%d", created.ID)+"/test-url", map[string]any{ "model": "gpt-4o-mini", "channel_type": "openai", "base_url": okUpstream.URL, })) c.Params = gin.Params{{Key: "id", Value: fmt.Sprintf("%d", created.ID)}} srv.HandleChannelURLTest(c) if w.Code != http.StatusOK { t.Fatalf("status=%d, want %d, body=%s", w.Code, http.StatusOK, w.Body.String()) } resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes()) dataSuccess, _ := resp.Data["success"].(bool) if !dataSuccess { t.Fatalf("expected success=true, data=%+v", resp.Data) } if failCalls != 0 { t.Fatalf("expected forced base_url to skip fail url, failCalls=%d", failCalls) } if okCalls != 1 { t.Fatalf("expected forced base_url called once, okCalls=%d", okCalls) } } // TestHandleChannelTest_NoAPIKey 渠道存在但无 API key func TestHandleChannelTest_NoAPIKey(t *testing.T) { srv := newInMemoryServer(t) ctx := context.Background() // 创建渠道但不添加 API key cfg := &model.Config{ Name: "no-key-channel", URL: "http://test.example.com", Priority: 1, ModelEntries: []model.ModelEntry{{Model: "test-model"}}, Enabled: true, } created, err := srv.store.CreateConfig(ctx, cfg) if err != nil { t.Fatalf("创建测试渠道失败: %v", err) } channelID := fmt.Sprintf("%d", created.ID) reqBody := map[string]any{ "model": "test-model", "channel_type": "anthropic", } c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, "/admin/channels/"+channelID+"/test", reqBody)) c.Params = gin.Params{{Key: "id", Value: channelID}} srv.HandleChannelTest(c) // 状态码 200,但 data 中 success=false if w.Code != http.StatusOK { t.Fatalf("期望 200, 实际 %d, 响应: %s", w.Code, w.Body.String()) } // RespondJSON 包装 success=true (外层), data 内部有 success: false resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes()) if !resp.Success { t.Fatalf("外层 APIResponse.Success 应为 true, error=%q", resp.Error) } dataSuccess, _ := resp.Data["success"].(bool) if dataSuccess { t.Fatal("data.success 应为 false(渠道无 API key)") } dataError, _ := resp.Data["error"].(string) if dataError == "" { t.Fatal("data.error 不应为空") } } // TestHandleChannelTest_UnsupportedModel 渠道存在、有 Key,但模型不支持 func TestHandleChannelTest_UnsupportedModel(t *testing.T) { srv := newInMemoryServer(t) ctx := context.Background() cfg := &model.Config{ Name: "limited-model-channel", URL: "http://test.example.com", Priority: 1, ModelEntries: []model.ModelEntry{{Model: "claude-3-5-sonnet"}}, Enabled: true, } created, err := srv.store.CreateConfig(ctx, cfg) if err != nil { t.Fatalf("创建测试渠道失败: %v", err) } // 添加 API key err = srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{ {ChannelID: created.ID, KeyIndex: 0, APIKey: "test-key-001"}, }) if err != nil { t.Fatalf("添加 API key 失败: %v", err) } channelID := fmt.Sprintf("%d", created.ID) reqBody := map[string]any{ "model": "gpt-4-not-supported", "channel_type": "anthropic", } c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, "/admin/channels/"+channelID+"/test", reqBody)) c.Params = gin.Params{{Key: "id", Value: channelID}} srv.HandleChannelTest(c) if w.Code != http.StatusOK { t.Fatalf("期望 200, 实际 %d, 响应: %s", w.Code, w.Body.String()) } resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes()) dataSuccess, _ := resp.Data["success"].(bool) if dataSuccess { t.Fatal("data.success 应为 false(模型不支持)") } } func TestHandleChannelTest_DefaultsProtocolTransformToChannelType(t *testing.T) { var gotPath string upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { gotPath = r.URL.Path w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(`{"id":"chatcmpl-test","choices":[{"message":{"content":"ok"}}],"usage":{"prompt_tokens":10,"completion_tokens":5}}`)) })) defer upstream.Close() srv := newInMemoryServer(t) srv.client = upstream.Client() ctx := context.Background() created, err := srv.store.CreateConfig(ctx, &model.Config{ Name: "default-protocol-transform-openai", URL: upstream.URL, Priority: 1, ChannelType: "openai", ModelEntries: []model.ModelEntry{{Model: "gpt-4.1"}}, Enabled: true, }) if err != nil { t.Fatalf("CreateConfig failed: %v", err) } if err := srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{{ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-test-key"}}); err != nil { t.Fatalf("CreateAPIKeysBatch failed: %v", err) } c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, fmt.Sprintf("/admin/channels/%d/test", created.ID), map[string]any{ "model": "gpt-4.1", })) c.Params = gin.Params{{Key: "id", Value: fmt.Sprintf("%d", created.ID)}} srv.HandleChannelTest(c) if w.Code != http.StatusOK { t.Fatalf("status=%d, want %d, body=%s", w.Code, http.StatusOK, w.Body.String()) } resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes()) dataSuccess, _ := resp.Data["success"].(bool) if !dataSuccess { t.Fatalf("expected data.success=true, data=%+v", resp.Data) } if gotPath != "/v1/chat/completions" { t.Fatalf("path=%q, want %q", gotPath, "/v1/chat/completions") } } func TestHandleChannelTest_RejectsUnknownProtocolTransform(t *testing.T) { failCalls := 0 upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { failCalls++ w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(`{"id":"chatcmpl-test","choices":[{"message":{"content":"ok"}}]}`)) })) defer upstream.Close() srv := newInMemoryServer(t) srv.client = upstream.Client() ctx := context.Background() created, err := srv.store.CreateConfig(ctx, &model.Config{ Name: "unsupported-protocol-transform-openai", URL: upstream.URL, Priority: 1, ChannelType: "openai", ModelEntries: []model.ModelEntry{{Model: "gpt-4.1"}}, Enabled: true, }) if err != nil { t.Fatalf("CreateConfig failed: %v", err) } if err := srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{{ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-test-key"}}); err != nil { t.Fatalf("CreateAPIKeysBatch failed: %v", err) } c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, fmt.Sprintf("/admin/channels/%d/test", created.ID), map[string]any{ "model": "gpt-4.1", "protocol_transform": "unknown", })) c.Params = gin.Params{{Key: "id", Value: fmt.Sprintf("%d", created.ID)}} srv.HandleChannelTest(c) if w.Code != http.StatusOK { t.Fatalf("status=%d, want %d, body=%s", w.Code, http.StatusOK, w.Body.String()) } resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes()) dataSuccess, _ := resp.Data["success"].(bool) if dataSuccess { t.Fatalf("expected data.success=false, data=%+v", resp.Data) } dataError, _ := resp.Data["error"].(string) if !strings.Contains(dataError, "协议") { t.Fatalf("expected protocol error, got %q", dataError) } if failCalls != 0 { t.Fatalf("expected no upstream request, failCalls=%d", failCalls) } } func TestHandleChannelTest_UsesProtocolTransformForTranslatedRequest(t *testing.T) { var gotPath string var gotBody string upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { body, err := io.ReadAll(r.Body) if err != nil { t.Fatalf("ReadAll failed: %v", err) } gotPath = r.URL.Path gotBody = string(body) w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(`{ "id": "msg_test", "type": "message", "role": "assistant", "content": [{"type": "text", "text": "translated ok"}], "model": "claude-3-5-sonnet", "usage": {"input_tokens": 10, "output_tokens": 5} }`)) })) defer upstream.Close() srv := newInMemoryServer(t) srv.client = upstream.Client() ctx := context.Background() created, err := srv.store.CreateConfig(ctx, &model.Config{ Name: "anthropic-with-runtime-openai-transform", URL: upstream.URL, Priority: 1, ChannelType: "anthropic", ProtocolTransformMode: model.ProtocolTransformModeLocal, ModelEntries: []model.ModelEntry{{Model: "claude-3-5-sonnet"}}, Enabled: true, }) if err != nil { t.Fatalf("CreateConfig failed: %v", err) } if err := srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{{ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-test-key"}}); err != nil { t.Fatalf("CreateAPIKeysBatch failed: %v", err) } c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, fmt.Sprintf("/admin/channels/%d/test", created.ID), map[string]any{ "model": "claude-3-5-sonnet", "protocol_transform": "openai", "content": "hello", })) c.Params = gin.Params{{Key: "id", Value: fmt.Sprintf("%d", created.ID)}} srv.HandleChannelTest(c) if w.Code != http.StatusOK { t.Fatalf("status=%d, want %d, body=%s", w.Code, http.StatusOK, w.Body.String()) } resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes()) dataSuccess, _ := resp.Data["success"].(bool) if !dataSuccess { t.Fatalf("expected data.success=true, data=%+v", resp.Data) } if gotPath != "/v1/messages" { t.Fatalf("path=%q, want %q", gotPath, "/v1/messages") } if !strings.Contains(gotBody, `"messages"`) { t.Fatalf("expected anthropic request body, body=%s", gotBody) } apiResp, ok := resp.Data["api_response"].(map[string]any) if !ok { t.Fatalf("expected translated api_response map, data=%+v", resp.Data) } if _, ok := apiResp["choices"]; !ok { t.Fatalf("expected openai-compatible api_response, got=%+v", apiResp) } } func TestHandleChannelTest_UsesCodexProtocolTransformWithBasePathPrefix(t *testing.T) { var gotPath string var gotBody string upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { body, err := io.ReadAll(r.Body) if err != nil { t.Fatalf("ReadAll failed: %v", err) } gotPath = r.URL.Path gotBody = string(body) w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(`{ "id": "msg_test", "type": "message", "role": "assistant", "content": [{"type": "text", "text": "translated codex ok"}], "model": "claude-3-5-sonnet", "usage": {"input_tokens": 10, "output_tokens": 5} }`)) })) defer upstream.Close() srv := newInMemoryServer(t) srv.client = upstream.Client() ctx := context.Background() created, err := srv.store.CreateConfig(ctx, &model.Config{ Name: "anthropic-with-prefixed-base-path", URL: upstream.URL + "/anthropic", Priority: 1, ChannelType: "anthropic", ProtocolTransformMode: model.ProtocolTransformModeLocal, ModelEntries: []model.ModelEntry{{Model: "claude-3-5-sonnet"}}, Enabled: true, }) if err != nil { t.Fatalf("CreateConfig failed: %v", err) } if err := srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{{ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-test-key"}}); err != nil { t.Fatalf("CreateAPIKeysBatch failed: %v", err) } c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, fmt.Sprintf("/admin/channels/%d/test", created.ID), map[string]any{ "model": "claude-3-5-sonnet", "protocol_transform": "codex", "content": "hello", })) c.Params = gin.Params{{Key: "id", Value: fmt.Sprintf("%d", created.ID)}} srv.HandleChannelTest(c) if w.Code != http.StatusOK { t.Fatalf("status=%d, want %d, body=%s", w.Code, http.StatusOK, w.Body.String()) } resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes()) dataSuccess, _ := resp.Data["success"].(bool) if !dataSuccess { t.Fatalf("expected data.success=true, data=%+v", resp.Data) } if gotPath != "/anthropic/v1/messages" { t.Fatalf("path=%q, want %q", gotPath, "/anthropic/v1/messages") } if !strings.Contains(gotBody, `"messages"`) { t.Fatalf("expected anthropic request body, body=%s", gotBody) } apiResp, ok := resp.Data["api_response"].(map[string]any) if !ok { t.Fatalf("expected translated api_response map, data=%+v", resp.Data) } if _, ok := apiResp["object"]; !ok { t.Fatalf("expected codex-compatible api_response, got=%+v", apiResp) } } // TestHandleChannelTest_UpstreamModeBypassesLocalTransform 验证 mode=upstream 时 // 即使客户端选择的协议与渠道原生协议不同,也直接以客户端协议构造上游请求,不触发本地翻译。 func TestHandleChannelTest_UpstreamModeBypassesLocalTransform(t *testing.T) { var gotPath string var gotBody string upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { body, err := io.ReadAll(r.Body) if err != nil { t.Fatalf("ReadAll failed: %v", err) } gotPath = r.URL.Path gotBody = string(body) w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(`{ "id": "chatcmpl_test", "object": "chat.completion", "model": "claude-3-5-sonnet", "choices": [{"index": 0, "message": {"role": "assistant", "content": "hello"}, "finish_reason": "stop"}], "usage": {"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15} }`)) })) defer upstream.Close() srv := newInMemoryServer(t) srv.client = upstream.Client() ctx := context.Background() created, err := srv.store.CreateConfig(ctx, &model.Config{ Name: "anthropic-upstream-passthrough", URL: upstream.URL, Priority: 1, ChannelType: "anthropic", ProtocolTransformMode: model.ProtocolTransformModeUpstream, ModelEntries: []model.ModelEntry{{Model: "claude-3-5-sonnet"}}, Enabled: true, }) if err != nil { t.Fatalf("CreateConfig failed: %v", err) } if err := srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{{ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-test-key"}}); err != nil { t.Fatalf("CreateAPIKeysBatch failed: %v", err) } c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, fmt.Sprintf("/admin/channels/%d/test", created.ID), map[string]any{ "model": "claude-3-5-sonnet", "protocol_transform": "openai", "content": "hello", })) c.Params = gin.Params{{Key: "id", Value: fmt.Sprintf("%d", created.ID)}} srv.HandleChannelTest(c) if w.Code != http.StatusOK { t.Fatalf("status=%d, want %d, body=%s", w.Code, http.StatusOK, w.Body.String()) } resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes()) dataSuccess, _ := resp.Data["success"].(bool) if !dataSuccess { t.Fatalf("expected data.success=true, data=%+v", resp.Data) } if gotPath != "/v1/chat/completions" { t.Fatalf("path=%q, want %q (mode=upstream 应直通 client 协议)", gotPath, "/v1/chat/completions") } if !strings.Contains(gotBody, `"messages"`) { t.Fatalf("expected openai chat request body, body=%s", gotBody) } if strings.Contains(gotBody, `"max_tokens"`) { t.Fatalf("body should be openai shape (no anthropic max_tokens), body=%s", gotBody) } apiResp, ok := resp.Data["api_response"].(map[string]any) if !ok { t.Fatalf("expected raw api_response map, data=%+v", resp.Data) } if _, ok := apiResp["choices"]; !ok { t.Fatalf("expected openai-style api_response (choices), got=%+v", apiResp) } } // TestHandleChannelTest_SuccessfulAPI 使用 mock server 模拟成功的 API 调用 func TestHandleChannelTest_SuccessfulAPI(t *testing.T) { // 创建 mock 上游服务器,返回成功的 Anthropic 响应 mockResp := `{ "id": "msg_test", "type": "message", "role": "assistant", "content": [{"type": "text", "text": "Hello"}], "model": "claude-3-5-sonnet", "usage": {"input_tokens": 10, "output_tokens": 5} }` upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(mockResp)) })) defer upstream.Close() srv := newInMemoryServer(t) // 替换 HTTP client 以使用 mock server srv.client = upstream.Client() ctx := context.Background() cfg := &model.Config{ Name: "test-success-channel", URL: upstream.URL, Priority: 1, ModelEntries: []model.ModelEntry{{Model: "claude-3-5-sonnet"}}, Enabled: true, } created, err := srv.store.CreateConfig(ctx, cfg) if err != nil { t.Fatalf("创建测试渠道失败: %v", err) } err = srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{ {ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-test-key"}, }) if err != nil { t.Fatalf("添加 API key 失败: %v", err) } channelID := fmt.Sprintf("%d", created.ID) reqBody := map[string]any{ "model": "claude-3-5-sonnet", "channel_type": "anthropic", } c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, "/admin/channels/"+channelID+"/test", reqBody)) c.Params = gin.Params{{Key: "id", Value: channelID}} srv.HandleChannelTest(c) if w.Code != http.StatusOK { t.Fatalf("期望 200, 实际 %d, 响应: %s", w.Code, w.Body.String()) } resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes()) if !resp.Success { t.Fatalf("外层 APIResponse.Success 应为 true, error=%q", resp.Error) } dataSuccess, _ := resp.Data["success"].(bool) if !dataSuccess { t.Fatalf("data.success 应为 true(API 调用成功), data=%+v", resp.Data) } } func TestHandleChannelTest_OpenAIRequestIncludesSessionID(t *testing.T) { var gotSessionID string var gotBody []byte upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { gotSessionID = r.Header.Get("Session_id") if got := r.Header.Get("Session-Id"); got != "" { t.Fatalf("Session-Id header should be omitted, got %q", got) } gotBody, _ = io.ReadAll(r.Body) w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(`{ "id": "chatcmpl_test", "object": "chat.completion", "model": "gpt-test", "choices": [{"index": 0, "message": {"role": "assistant", "content": "ok"}, "finish_reason": "stop"}], "usage": {"prompt_tokens": 1, "completion_tokens": 1, "total_tokens": 2} }`)) })) defer upstream.Close() srv := newInMemoryServer(t) srv.client = upstream.Client() ctx := context.Background() created, err := srv.store.CreateConfig(ctx, &model.Config{ Name: "openai-test-session-id", URL: upstream.URL, Priority: 1, ChannelType: "openai", ModelEntries: []model.ModelEntry{{Model: "gpt-test"}}, Enabled: true, }) if err != nil { t.Fatalf("创建测试渠道失败: %v", err) } if err := srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{{ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-test-key"}}); err != nil { t.Fatalf("添加 API key 失败: %v", err) } channelID := fmt.Sprintf("%d", created.ID) c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, "/admin/channels/"+channelID+"/test", map[string]any{ "model": "gpt-test", "channel_type": "openai", })) c.Params = gin.Params{{Key: "id", Value: channelID}} srv.HandleChannelTest(c) if w.Code != http.StatusOK { t.Fatalf("期望 200, 实际 %d, 响应: %s", w.Code, w.Body.String()) } if !uuidPattern.MatchString(gotSessionID) { t.Fatalf("Session_id header missing or invalid: %q", gotSessionID) } var upstreamBody map[string]any if err := json.Unmarshal(gotBody, &upstreamBody); err != nil { t.Fatalf("unmarshal upstream body failed: %v; body=%s", err, gotBody) } if got, _ := upstreamBody["user"].(string); got != gotSessionID { t.Fatalf("body user = %q, want session id %q; body=%s", got, gotSessionID, gotBody) } if got, _ := upstreamBody["prompt_cache_key"].(string); got != gotSessionID { t.Fatalf("body prompt_cache_key = %q, want session id %q; body=%s", got, gotSessionID, gotBody) } resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes()) dataSuccess, _ := resp.Data["success"].(bool) if !dataSuccess { t.Fatalf("data.success 应为 true, data=%+v", resp.Data) } } // TestHandleChannelTest_FailedAPI 使用 mock server 模拟失败的 API 调用 func TestHandleChannelTest_FailedAPI(t *testing.T) { // 创建 mock 上游服务器,返回 401 错误 upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusUnauthorized) _, _ = w.Write([]byte(`{"error":{"type":"authentication_error","message":"invalid api key"}}`)) })) defer upstream.Close() srv := newInMemoryServer(t) srv.client = upstream.Client() ctx := context.Background() cfg := &model.Config{ Name: "test-fail-channel", URL: upstream.URL, Priority: 1, ModelEntries: []model.ModelEntry{{Model: "claude-3-5-sonnet"}}, Enabled: true, } created, err := srv.store.CreateConfig(ctx, cfg) if err != nil { t.Fatalf("创建测试渠道失败: %v", err) } err = srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{ {ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-invalid-key"}, }) if err != nil { t.Fatalf("添加 API key 失败: %v", err) } channelID := fmt.Sprintf("%d", created.ID) reqBody := map[string]any{ "model": "claude-3-5-sonnet", "channel_type": "anthropic", } c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, "/admin/channels/"+channelID+"/test", reqBody)) c.Params = gin.Params{{Key: "id", Value: channelID}} srv.HandleChannelTest(c) if w.Code != http.StatusOK { t.Fatalf("期望 200, 实际 %d, 响应: %s", w.Code, w.Body.String()) } resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes()) dataSuccess, _ := resp.Data["success"].(bool) if dataSuccess { t.Fatal("data.success 应为 false(API 调用失败 401)") } // 验证冷却决策被记录 if action, ok := resp.Data["cooldown_action"].(string); ok { if action == "" { t.Fatal("失败时应有冷却决策记录") } t.Logf("冷却决策: %s", action) } } func TestHandleChannelTest_HonorsRequestedKeyIndexEvenIfCooled(t *testing.T) { mockResp := `{ "id": "msg_test", "type": "message", "role": "assistant", "content": [{"type": "text", "text": "Hello"}], "model": "claude-3-5-sonnet", "usage": {"input_tokens": 10, "output_tokens": 5} }` var gotAuth string upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { gotAuth = r.Header.Get("Authorization") w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(mockResp)) })) defer upstream.Close() srv := newInMemoryServer(t) srv.client = upstream.Client() ctx := context.Background() created, err := srv.store.CreateConfig(ctx, &model.Config{ Name: "test-honor-cooled-key-channel", URL: upstream.URL, Priority: 1, ChannelType: "anthropic", ModelEntries: []model.ModelEntry{{Model: "claude-3-5-sonnet"}}, Enabled: true, }) if err != nil { t.Fatalf("CreateConfig failed: %v", err) } if err := srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{ {ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-cooled"}, {ChannelID: created.ID, KeyIndex: 1, APIKey: "sk-fresh"}, }); err != nil { t.Fatalf("CreateAPIKeysBatch failed: %v", err) } if err := srv.store.SetKeyCooldown(ctx, created.ID, 0, time.Now().Add(10*time.Minute)); err != nil { t.Fatalf("SetKeyCooldown failed: %v", err) } channelID := fmt.Sprintf("%d", created.ID) c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, "/admin/channels/"+channelID+"/test", map[string]any{ "model": "claude-3-5-sonnet", "channel_type": "anthropic", "key_index": 0, })) c.Params = gin.Params{{Key: "id", Value: channelID}} srv.HandleChannelTest(c) if w.Code != http.StatusOK { t.Fatalf("status=%d, want %d, body=%s", w.Code, http.StatusOK, w.Body.String()) } resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes()) if dataSuccess, _ := resp.Data["success"].(bool); !dataSuccess { t.Fatalf("data.success=false, data=%+v", resp.Data) } if gotAuth != "Bearer sk-cooled" { t.Fatalf("Authorization=%q, want Bearer sk-cooled (requested key must be honored even if cooled)", gotAuth) } if gotIndex, _ := resp.Data["tested_key_index"].(float64); gotIndex != 0 { t.Fatalf("tested_key_index=%v, want 0", resp.Data["tested_key_index"]) } } // TestHandleChannelTest_RejectsUnknownKeyIndex 验证:请求一个不存在的 key_index 时直接报错, // 不再静默回退到其他可用 Key(既往会调用 SelectAvailableKey)。配合 HonorsRequestedKeyIndexEvenIfCooled // 共同保证"显式 key_index 即真"语义。 func TestHandleChannelTest_RejectsUnknownKeyIndex(t *testing.T) { srv := newInMemoryServer(t) ctx := context.Background() created, err := srv.store.CreateConfig(ctx, &model.Config{ Name: "test-reject-unknown-key-channel", URL: "http://test.example.com", Priority: 1, ChannelType: "anthropic", ModelEntries: []model.ModelEntry{{Model: "claude-3-5-sonnet"}}, Enabled: true, }) if err != nil { t.Fatalf("CreateConfig failed: %v", err) } if err := srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{ {ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-only"}, }); err != nil { t.Fatalf("CreateAPIKeysBatch failed: %v", err) } channelID := fmt.Sprintf("%d", created.ID) c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, "/admin/channels/"+channelID+"/test", map[string]any{ "model": "claude-3-5-sonnet", "channel_type": "anthropic", "key_index": 99, // 不存在 })) c.Params = gin.Params{{Key: "id", Value: channelID}} srv.HandleChannelTest(c) if w.Code != http.StatusOK { t.Fatalf("status=%d, want %d, body=%s", w.Code, http.StatusOK, w.Body.String()) } resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes()) dataSuccess, _ := resp.Data["success"].(bool) if dataSuccess { t.Fatalf("data.success=true, want false; data=%+v", resp.Data) } dataError, _ := resp.Data["error"].(string) if !strings.Contains(dataError, "Key #99") { t.Fatalf("data.error=%q, want mention of Key #99", dataError) } } func TestHandleChannelTest_UsesRequestAPIKeyWithoutTouchingSavedCooldown(t *testing.T) { mockResp := `{ "id": "msg_test", "type": "message", "role": "assistant", "content": [{"type": "text", "text": "Hello"}], "model": "claude-3-5-sonnet", "usage": {"input_tokens": 10, "output_tokens": 5} }` var gotAuth string upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { gotAuth = r.Header.Get("Authorization") w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(mockResp)) })) defer upstream.Close() srv := newInMemoryServer(t) srv.client = upstream.Client() ctx := context.Background() created, err := srv.store.CreateConfig(ctx, &model.Config{ Name: "test-request-key-channel", URL: upstream.URL, Priority: 1, ChannelType: "anthropic", ModelEntries: []model.ModelEntry{{Model: "claude-3-5-sonnet"}}, Enabled: true, }) if err != nil { t.Fatalf("CreateConfig failed: %v", err) } if err := srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{ {ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-saved-key"}, }); err != nil { t.Fatalf("CreateAPIKeysBatch failed: %v", err) } coolUntil := time.Now().Add(10 * time.Minute) if err := srv.store.SetKeyCooldown(ctx, created.ID, 0, coolUntil); err != nil { t.Fatalf("SetKeyCooldown failed: %v", err) } channelID := fmt.Sprintf("%d", created.ID) c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, "/admin/channels/"+channelID+"/test", map[string]any{ "model": "claude-3-5-sonnet", "channel_type": "anthropic", "key_index": 1, "api_key": "sk-unsaved-key", })) c.Params = gin.Params{{Key: "id", Value: channelID}} srv.HandleChannelTest(c) if w.Code != http.StatusOK { t.Fatalf("status=%d, want %d, body=%s", w.Code, http.StatusOK, w.Body.String()) } resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes()) if dataSuccess, _ := resp.Data["success"].(bool); !dataSuccess { t.Fatalf("data.success=false, data=%+v", resp.Data) } if gotAuth != "Bearer sk-unsaved-key" { t.Fatalf("Authorization=%q, want request api key", gotAuth) } keys, err := srv.store.GetAPIKeys(ctx, created.ID) if err != nil { t.Fatalf("GetAPIKeys failed: %v", err) } if len(keys) != 1 { t.Fatalf("keys len=%d, want 1", len(keys)) } if keys[0].CooldownUntil == 0 { t.Fatalf("saved key cooldown was reset for an unsaved request key") } } func TestHandleChannelTest_WritesManualTestLog(t *testing.T) { upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusUnauthorized) _, _ = w.Write([]byte(`{"error":{"type":"authentication_error","message":"invalid api key"}}`)) })) defer upstream.Close() srv := newInMemoryServer(t) srv.client = upstream.Client() ctx := context.Background() now := time.Now().Add(-time.Minute) created, err := srv.store.CreateConfig(ctx, &model.Config{ Name: "manual-test-log-channel", URL: upstream.URL, Priority: 1, ChannelType: "anthropic", ModelEntries: []model.ModelEntry{{Model: "claude-3-5-sonnet"}}, Enabled: true, }) if err != nil { t.Fatalf("CreateConfig failed: %v", err) } if err := srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{{ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-invalid-key"}}); err != nil { t.Fatalf("CreateAPIKeysBatch failed: %v", err) } channelID := fmt.Sprintf("%d", created.ID) c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, "/admin/channels/"+channelID+"/test", map[string]any{ "model": "claude-3-5-sonnet", "channel_type": "anthropic", })) c.Request.RemoteAddr = "198.51.100.10:12345" c.Params = gin.Params{{Key: "id", Value: channelID}} srv.HandleChannelTest(c) if w.Code != http.StatusOK { t.Fatalf("status=%d, want %d, body=%s", w.Code, http.StatusOK, w.Body.String()) } logs, err := srv.store.ListLogs(ctx, now, 10, 0, &model.LogFilter{LogSource: model.LogSourceManualTest}) if err != nil { t.Fatalf("ListLogs failed: %v", err) } if len(logs) != 1 { t.Fatalf("expected 1 manual test log, got %d", len(logs)) } entry := logs[0] if entry.LogSource != model.LogSourceManualTest { t.Fatalf("LogSource=%q, want %q", entry.LogSource, model.LogSourceManualTest) } if entry.StatusCode != http.StatusUnauthorized { t.Fatalf("StatusCode=%d, want %d", entry.StatusCode, http.StatusUnauthorized) } if entry.ClientIP != "198.51.100.10" { t.Fatalf("ClientIP=%q, want %q", entry.ClientIP, "198.51.100.10") } if entry.AuthTokenID != 0 { t.Fatalf("AuthTokenID=%d, want 0", entry.AuthTokenID) } if entry.BaseURL != upstream.URL { t.Fatalf("BaseURL=%q, want %q", entry.BaseURL, upstream.URL) } } func TestHandleChannelTest_SSESoftErrorTriggersCooldown(t *testing.T) { upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/event-stream") w.WriteHeader(http.StatusOK) _, _ = fmt.Fprint(w, "event: \n") _, _ = fmt.Fprint(w, "data: {\"error\":{\"code\":\"1113\",\"message\":\"Insufficient balance or no resource package. Please recharge.\"},\"request_id\":\"req_1113\"}\n\n") })) defer upstream.Close() srv := newInMemoryServer(t) srv.client = upstream.Client() ctx := context.Background() cfg := &model.Config{ Name: "test-sse-soft-error", URL: upstream.URL, Priority: 1, ModelEntries: []model.ModelEntry{{Model: "claude-3-5-sonnet"}}, Enabled: true, } created, err := srv.store.CreateConfig(ctx, cfg) if err != nil { t.Fatalf("创建测试渠道失败: %v", err) } err = srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{ {ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-soft-error"}, }) if err != nil { t.Fatalf("添加 API key 失败: %v", err) } channelID := fmt.Sprintf("%d", created.ID) reqBody := map[string]any{ "model": "claude-3-5-sonnet", "channel_type": "anthropic", "stream": true, } c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, "/admin/channels/"+channelID+"/test", reqBody)) c.Params = gin.Params{{Key: "id", Value: channelID}} srv.HandleChannelTest(c) if w.Code != http.StatusOK { t.Fatalf("期望 200, 实际 %d, 响应: %s", w.Code, w.Body.String()) } resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes()) if !resp.Success { t.Fatalf("外层 APIResponse.Success 应为 true, error=%q", resp.Error) } dataSuccess, _ := resp.Data["success"].(bool) if dataSuccess { t.Fatalf("data.success 应为 false, data=%+v", resp.Data) } if got, _ := resp.Data["error"].(string); got != "Insufficient balance or no resource package. Please recharge." { t.Fatalf("错误信息不对,got=%q data=%+v", got, resp.Data) } if got, _ := resp.Data["cooldown_action"].(string); got != "channel_cooldown_applied" { t.Fatalf("1113 软错误在单 Key 渠道应升级为渠道冷却,got=%q data=%+v", got, resp.Data) } } func TestHandleChannelTest_EventStreamHeaderWithJSONBodyFallback(t *testing.T) { // 模拟“Content-Type=event-stream,但实际返回完整JSON”场景 upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/event-stream") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(`{ "id":"resp_test", "status":"completed", "output":[ { "type":"message", "content":[{"type":"output_text","text":"fallback text"}] } ], "usage":{"input_tokens":12,"output_tokens":8} }`)) })) defer upstream.Close() srv := newInMemoryServer(t) srv.client = upstream.Client() ctx := context.Background() cfg := &model.Config{ Name: "test-codex-json-fallback", URL: upstream.URL, Priority: 1, ModelEntries: []model.ModelEntry{{Model: "gpt-5.2"}}, Enabled: true, ChannelType: "codex", } created, err := srv.store.CreateConfig(ctx, cfg) if err != nil { t.Fatalf("创建测试渠道失败: %v", err) } err = srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{ {ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-test-key"}, }) if err != nil { t.Fatalf("添加 API key 失败: %v", err) } channelID := fmt.Sprintf("%d", created.ID) reqBody := map[string]any{ "model": "gpt-5.2", "channel_type": "codex", "stream": false, } c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, "/admin/channels/"+channelID+"/test", reqBody)) c.Params = gin.Params{{Key: "id", Value: channelID}} srv.HandleChannelTest(c) if w.Code != http.StatusOK { t.Fatalf("期望 200, 实际 %d, 响应: %s", w.Code, w.Body.String()) } resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes()) dataSuccess, _ := resp.Data["success"].(bool) if !dataSuccess { t.Fatalf("data.success 应为 true, data=%+v", resp.Data) } responseText, _ := resp.Data["response_text"].(string) if responseText == "" { t.Fatalf("应解析出 response_text, data=%+v", resp.Data) } if responseText != "fallback text" { t.Fatalf("response_text 解析错误: %q", responseText) } message, _ := resp.Data["message"].(string) if message != "API测试成功" { t.Fatalf("应按非流式成功文案返回,实际: %q", message) } } func TestHandleChannelTest_CodexJSONFailedResponseShouldBeFailure(t *testing.T) { upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(`{ "id":"resp_failed", "object":"response", "status":"failed", "error":{ "code":"server_error", "message":"upstream failed" }, "output":[] }`)) })) defer upstream.Close() srv := newInMemoryServer(t) srv.client = upstream.Client() ctx := context.Background() cfg := &model.Config{ Name: "test-codex-json-failed", URL: upstream.URL, Priority: 1, ModelEntries: []model.ModelEntry{{Model: "gpt-5.4"}}, Enabled: true, ChannelType: "codex", } created, err := srv.store.CreateConfig(ctx, cfg) if err != nil { t.Fatalf("创建测试渠道失败: %v", err) } err = srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{ {ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-test-key"}, }) if err != nil { t.Fatalf("添加 API key 失败: %v", err) } channelID := fmt.Sprintf("%d", created.ID) reqBody := map[string]any{ "model": "gpt-5.4", "channel_type": "codex", "stream": false, } c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, "/admin/channels/"+channelID+"/test", reqBody)) c.Params = gin.Params{{Key: "id", Value: channelID}} srv.HandleChannelTest(c) if w.Code != http.StatusOK { t.Fatalf("期望 200, 实际 %d, 响应: %s", w.Code, w.Body.String()) } resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes()) dataSuccess, _ := resp.Data["success"].(bool) if dataSuccess { t.Fatalf("data.success 应为 false, data=%+v", resp.Data) } errorMsg, _ := resp.Data["error"].(string) if errorMsg != "upstream failed" { t.Fatalf("应返回上游错误信息,实际: %q, data=%+v", errorMsg, resp.Data) } if message, _ := resp.Data["message"].(string); message != "" { t.Fatalf("失败响应不应返回成功文案,实际: %q", message) } } func TestHandleChannelTest_StringAPIErrorShouldExposeUpstreamMessage(t *testing.T) { upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json; charset=utf-8") w.WriteHeader(http.StatusTooManyRequests) _, _ = w.Write([]byte(`{ "error":"由于负载过高,为了尽量保证用户体验,本站已开启限流,当前用户本周无法使用,请下周重试", "type":"error" }`)) })) defer upstream.Close() srv := newInMemoryServer(t) srv.client = upstream.Client() ctx := context.Background() cfg := &model.Config{ Name: "test-string-api-error", URL: upstream.URL, Priority: 1, ModelEntries: []model.ModelEntry{{Model: "gpt-5.4"}}, Enabled: true, ChannelType: "openai", } created, err := srv.store.CreateConfig(ctx, cfg) if err != nil { t.Fatalf("创建测试渠道失败: %v", err) } err = srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{ {ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-test-key"}, }) if err != nil { t.Fatalf("添加 API key 失败: %v", err) } channelID := fmt.Sprintf("%d", created.ID) reqBody := map[string]any{ "model": "gpt-5.4", "channel_type": "openai", "stream": false, } c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, "/admin/channels/"+channelID+"/test", reqBody)) c.Params = gin.Params{{Key: "id", Value: channelID}} srv.HandleChannelTest(c) if w.Code != http.StatusOK { t.Fatalf("期望 200, 实际 %d, 响应: %s", w.Code, w.Body.String()) } resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes()) dataSuccess, _ := resp.Data["success"].(bool) if dataSuccess { t.Fatalf("data.success 应为 false, data=%+v", resp.Data) } errorMsg, _ := resp.Data["error"].(string) expected := "由于负载过高,为了尽量保证用户体验,本站已开启限流,当前用户本周无法使用,请下周重试" if errorMsg != expected { t.Fatalf("应返回上游字符串错误信息,实际: %q, data=%+v", errorMsg, resp.Data) } } func TestHandleChannelTest_HTMLBlockPageShouldBeFailure(t *testing.T) { upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/html; charset=utf-8") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(` 您的IP已被封锁

当前 IP 已被封锁

暂时无法访问本站内容。

`)) })) defer upstream.Close() srv := newInMemoryServer(t) srv.client = upstream.Client() ctx := context.Background() cfg := &model.Config{ Name: "test-html-block-page", URL: upstream.URL, Priority: 1, ModelEntries: []model.ModelEntry{{Model: "gpt-5.4"}}, Enabled: true, ChannelType: "openai", } created, err := srv.store.CreateConfig(ctx, cfg) if err != nil { t.Fatalf("创建测试渠道失败: %v", err) } err = srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{ {ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-test-key"}, }) if err != nil { t.Fatalf("添加 API key 失败: %v", err) } channelID := fmt.Sprintf("%d", created.ID) reqBody := map[string]any{ "model": "gpt-5.4", "channel_type": "openai", "stream": false, } c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, "/admin/channels/"+channelID+"/test", reqBody)) c.Params = gin.Params{{Key: "id", Value: channelID}} srv.HandleChannelTest(c) if w.Code != http.StatusOK { t.Fatalf("期望 200, 实际 %d, 响应: %s", w.Code, w.Body.String()) } resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes()) dataSuccess, _ := resp.Data["success"].(bool) if dataSuccess { t.Fatalf("HTML 封禁页必须判定为失败, data=%+v", resp.Data) } errorMsg, _ := resp.Data["error"].(string) if !strings.Contains(errorMsg, "IP") || !strings.Contains(errorMsg, "封锁") { t.Fatalf("应提炼出上游封禁信息,实际: %q, data=%+v", errorMsg, resp.Data) } rawResp, _ := resp.Data["raw_response"].(string) if !strings.Contains(rawResp, "您的IP已被封锁") { t.Fatalf("应保留原始 HTML 响应,实际: %q", rawResp) } if message, _ := resp.Data["message"].(string); message != "" { t.Fatalf("失败响应不应返回成功文案,实际: %q", message) } } func TestShouldFallbackToNextURL_StructuredSoftErrors(t *testing.T) { t.Run("key_level_soft_error_should_not_fallback_or_cooldown_url", func(t *testing.T) { result := map[string]any{ "success": false, "status_code": http.StatusOK, "api_error": map[string]any{ "error": map[string]any{ "code": "1113", "message": "Insufficient balance or no resource package. Please recharge.", }, }, "response_headers": map[string]string{ "Content-Type": "text/event-stream", }, } continueFallback, shouldCooldown := shouldFallbackToNextURL(result) if continueFallback || shouldCooldown { t.Fatalf("Key级软错误不应继续切URL或冷却URL,got fallback=%v cooldown=%v", continueFallback, shouldCooldown) } }) t.Run("channel_level_soft_error_should_fallback_and_cooldown_url", func(t *testing.T) { result := map[string]any{ "success": false, "status_code": http.StatusOK, "api_error": map[string]any{ "type": "error", "error": map[string]any{ "type": "api_error", "message": "upstream overloaded", }, }, } continueFallback, shouldCooldown := shouldFallbackToNextURL(result) if !continueFallback || !shouldCooldown { t.Fatalf("渠道级软错误应继续切URL并冷却当前URL,got fallback=%v cooldown=%v", continueFallback, shouldCooldown) } }) }