ccpoad / internal /app /admin_testing_test.go
anyalerob's picture
Upload folder using huggingface_hub
2986042 verified
Raw
History Blame Contribute Delete
59.4 kB
package app
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"testing"
"time"
"ccLoad/internal/model"
"ccLoad/internal/testutil"
"ccLoad/internal/util"
"github.com/gin-gonic/gin"
)
// TestHandleChannelTest 测试渠道测试功能
func TestHandleChannelTest(t *testing.T) {
tests := []struct {
name string
channelID string
requestBody map[string]any
setupData bool
expectedStatus int
expectSuccess bool
}{
{
name: "无效的渠道ID",
channelID: "invalid",
requestBody: map[string]any{
"model": "test-model",
"channel_type": "anthropic",
},
setupData: false,
expectedStatus: http.StatusBadRequest,
expectSuccess: false,
},
{
name: "渠道不存在",
channelID: "999",
requestBody: map[string]any{
"model": "test-model",
"channel_type": "anthropic",
},
setupData: false,
expectedStatus: http.StatusNotFound,
expectSuccess: false,
},
{
name: "无效的请求体",
channelID: "1",
requestBody: map[string]any{
"invalid_field": "value",
},
setupData: false,
expectedStatus: http.StatusBadRequest,
expectSuccess: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// 创建测试服务器
srv := newInMemoryServer(t)
ctx := context.Background()
// 设置测试数据(如果需要)
if tt.setupData {
cfg := &model.Config{
ID: 1,
Name: "test-channel",
URL: "http://test.example.com",
Priority: 1,
ModelEntries: []model.ModelEntry{{Model: "test-model", RedirectModel: ""}},
Enabled: true,
}
_, err := srv.store.CreateConfig(ctx, cfg)
if err != nil {
t.Fatalf("创建测试渠道失败: %v", err)
}
}
c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, "/admin/channels/"+tt.channelID+"/test", tt.requestBody))
c.Params = gin.Params{{Key: "id", Value: tt.channelID}}
// 调用处理函数
srv.HandleChannelTest(c)
// 验证响应状态码
if w.Code != tt.expectedStatus {
t.Errorf("期望状态码 %d, 实际 %d, 响应: %s", tt.expectedStatus, w.Code, w.Body.String())
}
resp := mustParseAPIResponse[json.RawMessage](t, w.Body.Bytes())
if resp.Success != tt.expectSuccess {
t.Errorf("期望 success=%v, 实际=%v, error=%q", tt.expectSuccess, resp.Success, resp.Error)
}
})
}
}
func TestTestChannelAPI_MultiURLFallbackAndSelectorFeedback(t *testing.T) {
failCalls := 0
okCalls := 0
failUpstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
failCalls++
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusServiceUnavailable)
_, _ = w.Write([]byte(`{"error":{"type":"server_error","message":"upstream fail"}}`))
}))
defer failUpstream.Close()
okUpstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
okCalls++
time.Sleep(15 * time.Millisecond)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"id":"chatcmpl-test","choices":[{"message":{"content":"ok"}}],"usage":{"prompt_tokens":10,"completion_tokens":5}}`))
}))
defer okUpstream.Close()
srv := newInMemoryServer(t)
cfg := &model.Config{
ID: 9527,
Name: "multi-url-test",
URL: failUpstream.URL + "\n" + okUpstream.URL,
Priority: 1,
ChannelType: "openai",
ModelEntries: []model.ModelEntry{{Model: "gpt-4o-mini"}},
Enabled: true,
}
// 强制第一跳命中失败URL,验证是否会回退到第二个URL。
srv.urlSelector.CooldownURL(cfg.ID, okUpstream.URL)
req := &testutil.TestChannelRequest{
Model: "gpt-4o-mini",
ChannelType: "openai",
Content: "hello",
}
result := srv.testChannelAPI(context.Background(), cfg, "sk-test", req)
success, _ := result["success"].(bool)
if !success {
t.Fatalf("expected fallback success, got result=%+v", result)
}
if failCalls < 1 || okCalls < 1 {
t.Fatalf("expected both URLs attempted, failCalls=%d okCalls=%d", failCalls, okCalls)
}
if !srv.urlSelector.IsCooledDown(cfg.ID, failUpstream.URL) {
t.Fatalf("expected failed URL to be cooled down, url=%s", failUpstream.URL)
}
if lat, ok := srv.urlSelector.latencies[urlKey{channelID: cfg.ID, url: okUpstream.URL}]; !ok || lat == nil || lat.value <= 0 {
t.Fatalf("expected success URL latency recorded, got=%v", lat)
}
}
func TestExecuteChannelTestWithCooldown_RespectsRPMLimitWithoutCooldown(t *testing.T) {
hits := 0
upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
hits++
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"id":"chatcmpl-test","choices":[{"message":{"content":"ok"}}],"usage":{"prompt_tokens":1,"completion_tokens":1}}`))
}))
defer upstream.Close()
srv := newInMemoryServer(t)
cfg := &model.Config{
ID: 9528,
Name: "rpm-limited-test",
URL: upstream.URL,
Priority: 1,
RPMLimit: 1,
ChannelType: "openai",
ModelEntries: []model.ModelEntry{{Model: "gpt-4o-mini"}},
Enabled: true,
}
req := &testutil.TestChannelRequest{
Model: "gpt-4o-mini",
ChannelType: "openai",
Content: "hello",
}
first := srv.executeChannelTestWithCooldown(context.Background(), cfg, 0, "sk-test", req, true)
if success, _ := first["success"].(bool); !success {
t.Fatalf("first test should succeed, got result=%+v", first)
}
second := srv.executeChannelTestWithCooldown(context.Background(), cfg, 0, "sk-test", req, true)
if success, _ := second["success"].(bool); success {
t.Fatalf("second test should be RPM limited, got result=%+v", second)
}
if limited, _ := second["rpm_limited"].(bool); !limited {
t.Fatalf("expected rpm_limited marker, got result=%+v", second)
}
if action, _ := second["cooldown_action"].(string); action != "rpm_limited_no_cooldown" {
t.Fatalf("cooldown_action=%q, want rpm_limited_no_cooldown, result=%+v", action, second)
}
if retryAfterMs, _ := getResultInt(second["retry_after_ms"]); retryAfterMs <= 0 {
t.Fatalf("retry_after_ms=%d, want positive value, result=%+v", retryAfterMs, second)
}
if hits != 1 {
t.Fatalf("upstream hits=%d, want 1", hits)
}
}
func TestTestChannelAPI_MultiURLFallbackOnPlainText502(t *testing.T) {
failCalls := 0
okCalls := 0
failUpstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
failCalls++
w.Header().Set("Content-Type", "text/plain; charset=UTF-8")
w.WriteHeader(http.StatusBadGateway)
_, _ = w.Write([]byte("error code: 502"))
}))
defer failUpstream.Close()
okUpstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
okCalls++
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"id":"chatcmpl-test","choices":[{"message":{"content":"ok"}}],"usage":{"prompt_tokens":10,"completion_tokens":5}}`))
}))
defer okUpstream.Close()
srv := newInMemoryServer(t)
cfg := &model.Config{
ID: 9528,
Name: "multi-url-plain-502-test",
URL: failUpstream.URL + "\n" + okUpstream.URL,
Priority: 1,
ChannelType: "openai",
ModelEntries: []model.ModelEntry{{Model: "gpt-4o-mini"}},
Enabled: true,
}
// 强制第一跳命中 502 的坏 URL,验证 text/plain 错误体也会继续回退。
srv.urlSelector.CooldownURL(cfg.ID, okUpstream.URL)
req := &testutil.TestChannelRequest{
Model: "gpt-4o-mini",
ChannelType: "openai",
Content: "hello",
}
result := srv.testChannelAPI(context.Background(), cfg, "sk-test", req)
success, _ := result["success"].(bool)
if !success {
t.Fatalf("expected fallback success on plain 502, got result=%+v", result)
}
if failCalls < 1 || okCalls < 1 {
t.Fatalf("expected both URLs attempted, failCalls=%d okCalls=%d", failCalls, okCalls)
}
if !srv.urlSelector.IsCooledDown(cfg.ID, failUpstream.URL) {
t.Fatalf("expected failed URL to be cooled down, url=%s", failUpstream.URL)
}
if got, ok := result["response_text"].(string); !ok || got != "ok" {
t.Fatalf("expected second URL success response_text=ok, got=%+v", result)
}
}
func TestTestChannelAPI_NonStreamUsesConfiguredTimeout(t *testing.T) {
upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
select {
case <-r.Context().Done():
return
case <-time.After(160 * time.Millisecond):
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"id":"chatcmpl-test","choices":[{"message":{"content":"late"}}],"usage":{"prompt_tokens":1,"completion_tokens":1}}`))
}))
defer upstream.Close()
srv := newInMemoryServer(t)
srv.nonStreamTimeout = 25 * time.Millisecond
cfg := &model.Config{
ID: 9530,
Name: "non-stream-timeout-test",
URL: upstream.URL,
Priority: 1,
ChannelType: "openai",
ModelEntries: []model.ModelEntry{{Model: "gpt-4o-mini"}},
Enabled: true,
}
req := &testutil.TestChannelRequest{
Model: "gpt-4o-mini",
ChannelType: "openai",
Content: "hello",
Stream: false,
}
start := time.Now()
result := srv.testChannelAPI(context.Background(), cfg, "sk-test", req)
elapsed := time.Since(start)
if success, _ := result["success"].(bool); success {
t.Fatalf("expected timeout failure, got result=%+v", result)
}
if elapsed >= 120*time.Millisecond {
t.Fatalf("expected configured timeout before delayed upstream response, elapsed=%v result=%+v", elapsed, result)
}
if statusCode, _ := getResultInt(result["status_code"]); statusCode != http.StatusGatewayTimeout {
t.Fatalf("status_code=%d, want %d, result=%+v", statusCode, http.StatusGatewayTimeout, result)
}
}
func TestTestChannelAPI_StreamFirstValidContentTimeoutIgnoresHeartbeats(t *testing.T) {
upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
w.WriteHeader(http.StatusOK)
flusher, _ := w.(http.Flusher)
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
lateContent := time.NewTimer(160 * time.Millisecond)
defer lateContent.Stop()
for {
select {
case <-r.Context().Done():
return
case <-ticker.C:
_, _ = w.Write([]byte(": keep-alive\n\n"))
flusher.Flush()
case <-lateContent.C:
_, _ = w.Write([]byte("data: {\"choices\":[{\"delta\":{\"content\":\"late\"}}]}\n\n"))
_, _ = w.Write([]byte("data: [DONE]\n\n"))
flusher.Flush()
return
}
}
}))
defer upstream.Close()
srv := newInMemoryServer(t)
srv.firstByteTimeout = 30 * time.Millisecond
cfg := &model.Config{
ID: 9531,
Name: "stream-first-content-timeout-test",
URL: upstream.URL,
Priority: 1,
ChannelType: "openai",
ModelEntries: []model.ModelEntry{{Model: "gpt-4o-mini"}},
Enabled: true,
}
req := &testutil.TestChannelRequest{
Model: "gpt-4o-mini",
ChannelType: "openai",
Content: "hello",
Stream: true,
}
start := time.Now()
result := srv.testChannelAPI(context.Background(), cfg, "sk-test", req)
elapsed := time.Since(start)
if success, _ := result["success"].(bool); success {
t.Fatalf("expected first valid stream content timeout, got result=%+v", result)
}
if elapsed >= 120*time.Millisecond {
t.Fatalf("expected timeout before late content, elapsed=%v result=%+v", elapsed, result)
}
if statusCode, _ := getResultInt(result["status_code"]); statusCode != util.StatusFirstByteTimeout {
t.Fatalf("status_code=%d, want %d, result=%+v", statusCode, util.StatusFirstByteTimeout, result)
}
if _, ok := result["first_byte_duration_ms"]; ok {
t.Fatalf("heartbeat must not set first_byte_duration_ms, result=%+v", result)
}
}
func TestHandleChannelTest_RejectsBaseURL(t *testing.T) {
failCalls := 0
failUpstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
failCalls++
w.WriteHeader(http.StatusServiceUnavailable)
}))
defer failUpstream.Close()
okCalls := 0
okUpstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
okCalls++
w.WriteHeader(http.StatusOK)
}))
defer okUpstream.Close()
srv := newInMemoryServer(t)
ctx := context.Background()
cfg := &model.Config{
Name: "channel-test-reject-base-url",
URL: failUpstream.URL + "\n" + okUpstream.URL,
Priority: 1,
ChannelType: "openai",
ModelEntries: []model.ModelEntry{{Model: "gpt-4o-mini"}},
Enabled: true,
}
created, err := srv.store.CreateConfig(ctx, cfg)
if err != nil {
t.Fatalf("创建测试渠道失败: %v", err)
}
if err := srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{{ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-test-key"}}); err != nil {
t.Fatalf("添加 API key 失败: %v", err)
}
c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, "/admin/channels/"+fmt.Sprintf("%d", created.ID)+"/test", map[string]any{
"model": "gpt-4o-mini",
"channel_type": "openai",
"base_url": okUpstream.URL,
}))
c.Params = gin.Params{{Key: "id", Value: fmt.Sprintf("%d", created.ID)}}
srv.HandleChannelTest(c)
if w.Code != http.StatusBadRequest {
t.Fatalf("status=%d, want %d, body=%s", w.Code, http.StatusBadRequest, w.Body.String())
}
resp := mustParseAPIResponse[json.RawMessage](t, w.Body.Bytes())
if resp.Success {
t.Fatalf("expected success=false, resp=%+v", resp)
}
if !strings.Contains(resp.Error, "/test-url") {
t.Fatalf("expected error to guide /test-url, got %q", resp.Error)
}
if failCalls != 0 || okCalls != 0 {
t.Fatalf("expected no upstream request, failCalls=%d okCalls=%d", failCalls, okCalls)
}
}
func TestHandleChannelURLTest_UsesForcedURL(t *testing.T) {
failCalls := 0
failUpstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
failCalls++
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusServiceUnavailable)
_, _ = w.Write([]byte(`{"error":{"type":"server_error","message":"should not hit this url"}}`))
}))
defer failUpstream.Close()
okCalls := 0
okUpstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
okCalls++
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"id":"chatcmpl-test","choices":[{"message":{"content":"ok"}}],"usage":{"prompt_tokens":10,"completion_tokens":5}}`))
}))
defer okUpstream.Close()
srv := newInMemoryServer(t)
ctx := context.Background()
cfg := &model.Config{
Name: "single-url-test",
URL: failUpstream.URL + "\n" + okUpstream.URL,
Priority: 1,
ChannelType: "openai",
ModelEntries: []model.ModelEntry{{Model: "gpt-4o-mini"}},
Enabled: true,
}
created, err := srv.store.CreateConfig(ctx, cfg)
if err != nil {
t.Fatalf("创建测试渠道失败: %v", err)
}
if err := srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{{ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-test-key"}}); err != nil {
t.Fatalf("添加 API key 失败: %v", err)
}
// selector 和多 URL 顺序都不该影响显式单 URL 测试。
srv.urlSelector.CooldownURL(created.ID, okUpstream.URL)
c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, "/admin/channels/"+fmt.Sprintf("%d", created.ID)+"/test-url", map[string]any{
"model": "gpt-4o-mini",
"channel_type": "openai",
"base_url": okUpstream.URL,
}))
c.Params = gin.Params{{Key: "id", Value: fmt.Sprintf("%d", created.ID)}}
srv.HandleChannelURLTest(c)
if w.Code != http.StatusOK {
t.Fatalf("status=%d, want %d, body=%s", w.Code, http.StatusOK, w.Body.String())
}
resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes())
dataSuccess, _ := resp.Data["success"].(bool)
if !dataSuccess {
t.Fatalf("expected success=true, data=%+v", resp.Data)
}
if failCalls != 0 {
t.Fatalf("expected forced base_url to skip fail url, failCalls=%d", failCalls)
}
if okCalls != 1 {
t.Fatalf("expected forced base_url called once, okCalls=%d", okCalls)
}
}
// TestHandleChannelTest_NoAPIKey 渠道存在但无 API key
func TestHandleChannelTest_NoAPIKey(t *testing.T) {
srv := newInMemoryServer(t)
ctx := context.Background()
// 创建渠道但不添加 API key
cfg := &model.Config{
Name: "no-key-channel",
URL: "http://test.example.com",
Priority: 1,
ModelEntries: []model.ModelEntry{{Model: "test-model"}},
Enabled: true,
}
created, err := srv.store.CreateConfig(ctx, cfg)
if err != nil {
t.Fatalf("创建测试渠道失败: %v", err)
}
channelID := fmt.Sprintf("%d", created.ID)
reqBody := map[string]any{
"model": "test-model",
"channel_type": "anthropic",
}
c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, "/admin/channels/"+channelID+"/test", reqBody))
c.Params = gin.Params{{Key: "id", Value: channelID}}
srv.HandleChannelTest(c)
// 状态码 200,但 data 中 success=false
if w.Code != http.StatusOK {
t.Fatalf("期望 200, 实际 %d, 响应: %s", w.Code, w.Body.String())
}
// RespondJSON 包装 success=true (外层), data 内部有 success: false
resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes())
if !resp.Success {
t.Fatalf("外层 APIResponse.Success 应为 true, error=%q", resp.Error)
}
dataSuccess, _ := resp.Data["success"].(bool)
if dataSuccess {
t.Fatal("data.success 应为 false(渠道无 API key)")
}
dataError, _ := resp.Data["error"].(string)
if dataError == "" {
t.Fatal("data.error 不应为空")
}
}
// TestHandleChannelTest_UnsupportedModel 渠道存在、有 Key,但模型不支持
func TestHandleChannelTest_UnsupportedModel(t *testing.T) {
srv := newInMemoryServer(t)
ctx := context.Background()
cfg := &model.Config{
Name: "limited-model-channel",
URL: "http://test.example.com",
Priority: 1,
ModelEntries: []model.ModelEntry{{Model: "claude-3-5-sonnet"}},
Enabled: true,
}
created, err := srv.store.CreateConfig(ctx, cfg)
if err != nil {
t.Fatalf("创建测试渠道失败: %v", err)
}
// 添加 API key
err = srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{
{ChannelID: created.ID, KeyIndex: 0, APIKey: "test-key-001"},
})
if err != nil {
t.Fatalf("添加 API key 失败: %v", err)
}
channelID := fmt.Sprintf("%d", created.ID)
reqBody := map[string]any{
"model": "gpt-4-not-supported",
"channel_type": "anthropic",
}
c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, "/admin/channels/"+channelID+"/test", reqBody))
c.Params = gin.Params{{Key: "id", Value: channelID}}
srv.HandleChannelTest(c)
if w.Code != http.StatusOK {
t.Fatalf("期望 200, 实际 %d, 响应: %s", w.Code, w.Body.String())
}
resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes())
dataSuccess, _ := resp.Data["success"].(bool)
if dataSuccess {
t.Fatal("data.success 应为 false(模型不支持)")
}
}
func TestHandleChannelTest_DefaultsProtocolTransformToChannelType(t *testing.T) {
var gotPath string
upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
gotPath = r.URL.Path
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"id":"chatcmpl-test","choices":[{"message":{"content":"ok"}}],"usage":{"prompt_tokens":10,"completion_tokens":5}}`))
}))
defer upstream.Close()
srv := newInMemoryServer(t)
srv.client = upstream.Client()
ctx := context.Background()
created, err := srv.store.CreateConfig(ctx, &model.Config{
Name: "default-protocol-transform-openai",
URL: upstream.URL,
Priority: 1,
ChannelType: "openai",
ModelEntries: []model.ModelEntry{{Model: "gpt-4.1"}},
Enabled: true,
})
if err != nil {
t.Fatalf("CreateConfig failed: %v", err)
}
if err := srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{{ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-test-key"}}); err != nil {
t.Fatalf("CreateAPIKeysBatch failed: %v", err)
}
c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, fmt.Sprintf("/admin/channels/%d/test", created.ID), map[string]any{
"model": "gpt-4.1",
}))
c.Params = gin.Params{{Key: "id", Value: fmt.Sprintf("%d", created.ID)}}
srv.HandleChannelTest(c)
if w.Code != http.StatusOK {
t.Fatalf("status=%d, want %d, body=%s", w.Code, http.StatusOK, w.Body.String())
}
resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes())
dataSuccess, _ := resp.Data["success"].(bool)
if !dataSuccess {
t.Fatalf("expected data.success=true, data=%+v", resp.Data)
}
if gotPath != "/v1/chat/completions" {
t.Fatalf("path=%q, want %q", gotPath, "/v1/chat/completions")
}
}
func TestHandleChannelTest_RejectsUnknownProtocolTransform(t *testing.T) {
failCalls := 0
upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
failCalls++
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"id":"chatcmpl-test","choices":[{"message":{"content":"ok"}}]}`))
}))
defer upstream.Close()
srv := newInMemoryServer(t)
srv.client = upstream.Client()
ctx := context.Background()
created, err := srv.store.CreateConfig(ctx, &model.Config{
Name: "unsupported-protocol-transform-openai",
URL: upstream.URL,
Priority: 1,
ChannelType: "openai",
ModelEntries: []model.ModelEntry{{Model: "gpt-4.1"}},
Enabled: true,
})
if err != nil {
t.Fatalf("CreateConfig failed: %v", err)
}
if err := srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{{ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-test-key"}}); err != nil {
t.Fatalf("CreateAPIKeysBatch failed: %v", err)
}
c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, fmt.Sprintf("/admin/channels/%d/test", created.ID), map[string]any{
"model": "gpt-4.1",
"protocol_transform": "unknown",
}))
c.Params = gin.Params{{Key: "id", Value: fmt.Sprintf("%d", created.ID)}}
srv.HandleChannelTest(c)
if w.Code != http.StatusOK {
t.Fatalf("status=%d, want %d, body=%s", w.Code, http.StatusOK, w.Body.String())
}
resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes())
dataSuccess, _ := resp.Data["success"].(bool)
if dataSuccess {
t.Fatalf("expected data.success=false, data=%+v", resp.Data)
}
dataError, _ := resp.Data["error"].(string)
if !strings.Contains(dataError, "协议") {
t.Fatalf("expected protocol error, got %q", dataError)
}
if failCalls != 0 {
t.Fatalf("expected no upstream request, failCalls=%d", failCalls)
}
}
func TestHandleChannelTest_UsesProtocolTransformForTranslatedRequest(t *testing.T) {
var gotPath string
var gotBody string
upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
t.Fatalf("ReadAll failed: %v", err)
}
gotPath = r.URL.Path
gotBody = string(body)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{
"id": "msg_test",
"type": "message",
"role": "assistant",
"content": [{"type": "text", "text": "translated ok"}],
"model": "claude-3-5-sonnet",
"usage": {"input_tokens": 10, "output_tokens": 5}
}`))
}))
defer upstream.Close()
srv := newInMemoryServer(t)
srv.client = upstream.Client()
ctx := context.Background()
created, err := srv.store.CreateConfig(ctx, &model.Config{
Name: "anthropic-with-runtime-openai-transform",
URL: upstream.URL,
Priority: 1,
ChannelType: "anthropic",
ProtocolTransformMode: model.ProtocolTransformModeLocal,
ModelEntries: []model.ModelEntry{{Model: "claude-3-5-sonnet"}},
Enabled: true,
})
if err != nil {
t.Fatalf("CreateConfig failed: %v", err)
}
if err := srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{{ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-test-key"}}); err != nil {
t.Fatalf("CreateAPIKeysBatch failed: %v", err)
}
c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, fmt.Sprintf("/admin/channels/%d/test", created.ID), map[string]any{
"model": "claude-3-5-sonnet",
"protocol_transform": "openai",
"content": "hello",
}))
c.Params = gin.Params{{Key: "id", Value: fmt.Sprintf("%d", created.ID)}}
srv.HandleChannelTest(c)
if w.Code != http.StatusOK {
t.Fatalf("status=%d, want %d, body=%s", w.Code, http.StatusOK, w.Body.String())
}
resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes())
dataSuccess, _ := resp.Data["success"].(bool)
if !dataSuccess {
t.Fatalf("expected data.success=true, data=%+v", resp.Data)
}
if gotPath != "/v1/messages" {
t.Fatalf("path=%q, want %q", gotPath, "/v1/messages")
}
if !strings.Contains(gotBody, `"messages"`) {
t.Fatalf("expected anthropic request body, body=%s", gotBody)
}
apiResp, ok := resp.Data["api_response"].(map[string]any)
if !ok {
t.Fatalf("expected translated api_response map, data=%+v", resp.Data)
}
if _, ok := apiResp["choices"]; !ok {
t.Fatalf("expected openai-compatible api_response, got=%+v", apiResp)
}
}
func TestHandleChannelTest_UsesCodexProtocolTransformWithBasePathPrefix(t *testing.T) {
var gotPath string
var gotBody string
upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
t.Fatalf("ReadAll failed: %v", err)
}
gotPath = r.URL.Path
gotBody = string(body)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{
"id": "msg_test",
"type": "message",
"role": "assistant",
"content": [{"type": "text", "text": "translated codex ok"}],
"model": "claude-3-5-sonnet",
"usage": {"input_tokens": 10, "output_tokens": 5}
}`))
}))
defer upstream.Close()
srv := newInMemoryServer(t)
srv.client = upstream.Client()
ctx := context.Background()
created, err := srv.store.CreateConfig(ctx, &model.Config{
Name: "anthropic-with-prefixed-base-path",
URL: upstream.URL + "/anthropic",
Priority: 1,
ChannelType: "anthropic",
ProtocolTransformMode: model.ProtocolTransformModeLocal,
ModelEntries: []model.ModelEntry{{Model: "claude-3-5-sonnet"}},
Enabled: true,
})
if err != nil {
t.Fatalf("CreateConfig failed: %v", err)
}
if err := srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{{ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-test-key"}}); err != nil {
t.Fatalf("CreateAPIKeysBatch failed: %v", err)
}
c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, fmt.Sprintf("/admin/channels/%d/test", created.ID), map[string]any{
"model": "claude-3-5-sonnet",
"protocol_transform": "codex",
"content": "hello",
}))
c.Params = gin.Params{{Key: "id", Value: fmt.Sprintf("%d", created.ID)}}
srv.HandleChannelTest(c)
if w.Code != http.StatusOK {
t.Fatalf("status=%d, want %d, body=%s", w.Code, http.StatusOK, w.Body.String())
}
resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes())
dataSuccess, _ := resp.Data["success"].(bool)
if !dataSuccess {
t.Fatalf("expected data.success=true, data=%+v", resp.Data)
}
if gotPath != "/anthropic/v1/messages" {
t.Fatalf("path=%q, want %q", gotPath, "/anthropic/v1/messages")
}
if !strings.Contains(gotBody, `"messages"`) {
t.Fatalf("expected anthropic request body, body=%s", gotBody)
}
apiResp, ok := resp.Data["api_response"].(map[string]any)
if !ok {
t.Fatalf("expected translated api_response map, data=%+v", resp.Data)
}
if _, ok := apiResp["object"]; !ok {
t.Fatalf("expected codex-compatible api_response, got=%+v", apiResp)
}
}
// TestHandleChannelTest_UpstreamModeBypassesLocalTransform 验证 mode=upstream 时
// 即使客户端选择的协议与渠道原生协议不同,也直接以客户端协议构造上游请求,不触发本地翻译。
func TestHandleChannelTest_UpstreamModeBypassesLocalTransform(t *testing.T) {
var gotPath string
var gotBody string
upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
t.Fatalf("ReadAll failed: %v", err)
}
gotPath = r.URL.Path
gotBody = string(body)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{
"id": "chatcmpl_test",
"object": "chat.completion",
"model": "claude-3-5-sonnet",
"choices": [{"index": 0, "message": {"role": "assistant", "content": "hello"}, "finish_reason": "stop"}],
"usage": {"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15}
}`))
}))
defer upstream.Close()
srv := newInMemoryServer(t)
srv.client = upstream.Client()
ctx := context.Background()
created, err := srv.store.CreateConfig(ctx, &model.Config{
Name: "anthropic-upstream-passthrough",
URL: upstream.URL,
Priority: 1,
ChannelType: "anthropic",
ProtocolTransformMode: model.ProtocolTransformModeUpstream,
ModelEntries: []model.ModelEntry{{Model: "claude-3-5-sonnet"}},
Enabled: true,
})
if err != nil {
t.Fatalf("CreateConfig failed: %v", err)
}
if err := srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{{ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-test-key"}}); err != nil {
t.Fatalf("CreateAPIKeysBatch failed: %v", err)
}
c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, fmt.Sprintf("/admin/channels/%d/test", created.ID), map[string]any{
"model": "claude-3-5-sonnet",
"protocol_transform": "openai",
"content": "hello",
}))
c.Params = gin.Params{{Key: "id", Value: fmt.Sprintf("%d", created.ID)}}
srv.HandleChannelTest(c)
if w.Code != http.StatusOK {
t.Fatalf("status=%d, want %d, body=%s", w.Code, http.StatusOK, w.Body.String())
}
resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes())
dataSuccess, _ := resp.Data["success"].(bool)
if !dataSuccess {
t.Fatalf("expected data.success=true, data=%+v", resp.Data)
}
if gotPath != "/v1/chat/completions" {
t.Fatalf("path=%q, want %q (mode=upstream 应直通 client 协议)", gotPath, "/v1/chat/completions")
}
if !strings.Contains(gotBody, `"messages"`) {
t.Fatalf("expected openai chat request body, body=%s", gotBody)
}
if strings.Contains(gotBody, `"max_tokens"`) {
t.Fatalf("body should be openai shape (no anthropic max_tokens), body=%s", gotBody)
}
apiResp, ok := resp.Data["api_response"].(map[string]any)
if !ok {
t.Fatalf("expected raw api_response map, data=%+v", resp.Data)
}
if _, ok := apiResp["choices"]; !ok {
t.Fatalf("expected openai-style api_response (choices), got=%+v", apiResp)
}
}
// TestHandleChannelTest_SuccessfulAPI 使用 mock server 模拟成功的 API 调用
func TestHandleChannelTest_SuccessfulAPI(t *testing.T) {
// 创建 mock 上游服务器,返回成功的 Anthropic 响应
mockResp := `{
"id": "msg_test",
"type": "message",
"role": "assistant",
"content": [{"type": "text", "text": "Hello"}],
"model": "claude-3-5-sonnet",
"usage": {"input_tokens": 10, "output_tokens": 5}
}`
upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(mockResp))
}))
defer upstream.Close()
srv := newInMemoryServer(t)
// 替换 HTTP client 以使用 mock server
srv.client = upstream.Client()
ctx := context.Background()
cfg := &model.Config{
Name: "test-success-channel",
URL: upstream.URL,
Priority: 1,
ModelEntries: []model.ModelEntry{{Model: "claude-3-5-sonnet"}},
Enabled: true,
}
created, err := srv.store.CreateConfig(ctx, cfg)
if err != nil {
t.Fatalf("创建测试渠道失败: %v", err)
}
err = srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{
{ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-test-key"},
})
if err != nil {
t.Fatalf("添加 API key 失败: %v", err)
}
channelID := fmt.Sprintf("%d", created.ID)
reqBody := map[string]any{
"model": "claude-3-5-sonnet",
"channel_type": "anthropic",
}
c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, "/admin/channels/"+channelID+"/test", reqBody))
c.Params = gin.Params{{Key: "id", Value: channelID}}
srv.HandleChannelTest(c)
if w.Code != http.StatusOK {
t.Fatalf("期望 200, 实际 %d, 响应: %s", w.Code, w.Body.String())
}
resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes())
if !resp.Success {
t.Fatalf("外层 APIResponse.Success 应为 true, error=%q", resp.Error)
}
dataSuccess, _ := resp.Data["success"].(bool)
if !dataSuccess {
t.Fatalf("data.success 应为 true(API 调用成功), data=%+v", resp.Data)
}
}
func TestHandleChannelTest_OpenAIRequestIncludesSessionID(t *testing.T) {
var gotSessionID string
var gotBody []byte
upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
gotSessionID = r.Header.Get("Session_id")
if got := r.Header.Get("Session-Id"); got != "" {
t.Fatalf("Session-Id header should be omitted, got %q", got)
}
gotBody, _ = io.ReadAll(r.Body)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{
"id": "chatcmpl_test",
"object": "chat.completion",
"model": "gpt-test",
"choices": [{"index": 0, "message": {"role": "assistant", "content": "ok"}, "finish_reason": "stop"}],
"usage": {"prompt_tokens": 1, "completion_tokens": 1, "total_tokens": 2}
}`))
}))
defer upstream.Close()
srv := newInMemoryServer(t)
srv.client = upstream.Client()
ctx := context.Background()
created, err := srv.store.CreateConfig(ctx, &model.Config{
Name: "openai-test-session-id",
URL: upstream.URL,
Priority: 1,
ChannelType: "openai",
ModelEntries: []model.ModelEntry{{Model: "gpt-test"}},
Enabled: true,
})
if err != nil {
t.Fatalf("创建测试渠道失败: %v", err)
}
if err := srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{{ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-test-key"}}); err != nil {
t.Fatalf("添加 API key 失败: %v", err)
}
channelID := fmt.Sprintf("%d", created.ID)
c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, "/admin/channels/"+channelID+"/test", map[string]any{
"model": "gpt-test",
"channel_type": "openai",
}))
c.Params = gin.Params{{Key: "id", Value: channelID}}
srv.HandleChannelTest(c)
if w.Code != http.StatusOK {
t.Fatalf("期望 200, 实际 %d, 响应: %s", w.Code, w.Body.String())
}
if !uuidPattern.MatchString(gotSessionID) {
t.Fatalf("Session_id header missing or invalid: %q", gotSessionID)
}
var upstreamBody map[string]any
if err := json.Unmarshal(gotBody, &upstreamBody); err != nil {
t.Fatalf("unmarshal upstream body failed: %v; body=%s", err, gotBody)
}
if got, _ := upstreamBody["user"].(string); got != gotSessionID {
t.Fatalf("body user = %q, want session id %q; body=%s", got, gotSessionID, gotBody)
}
if got, _ := upstreamBody["prompt_cache_key"].(string); got != gotSessionID {
t.Fatalf("body prompt_cache_key = %q, want session id %q; body=%s", got, gotSessionID, gotBody)
}
resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes())
dataSuccess, _ := resp.Data["success"].(bool)
if !dataSuccess {
t.Fatalf("data.success 应为 true, data=%+v", resp.Data)
}
}
// TestHandleChannelTest_FailedAPI 使用 mock server 模拟失败的 API 调用
func TestHandleChannelTest_FailedAPI(t *testing.T) {
// 创建 mock 上游服务器,返回 401 错误
upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusUnauthorized)
_, _ = w.Write([]byte(`{"error":{"type":"authentication_error","message":"invalid api key"}}`))
}))
defer upstream.Close()
srv := newInMemoryServer(t)
srv.client = upstream.Client()
ctx := context.Background()
cfg := &model.Config{
Name: "test-fail-channel",
URL: upstream.URL,
Priority: 1,
ModelEntries: []model.ModelEntry{{Model: "claude-3-5-sonnet"}},
Enabled: true,
}
created, err := srv.store.CreateConfig(ctx, cfg)
if err != nil {
t.Fatalf("创建测试渠道失败: %v", err)
}
err = srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{
{ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-invalid-key"},
})
if err != nil {
t.Fatalf("添加 API key 失败: %v", err)
}
channelID := fmt.Sprintf("%d", created.ID)
reqBody := map[string]any{
"model": "claude-3-5-sonnet",
"channel_type": "anthropic",
}
c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, "/admin/channels/"+channelID+"/test", reqBody))
c.Params = gin.Params{{Key: "id", Value: channelID}}
srv.HandleChannelTest(c)
if w.Code != http.StatusOK {
t.Fatalf("期望 200, 实际 %d, 响应: %s", w.Code, w.Body.String())
}
resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes())
dataSuccess, _ := resp.Data["success"].(bool)
if dataSuccess {
t.Fatal("data.success 应为 false(API 调用失败 401)")
}
// 验证冷却决策被记录
if action, ok := resp.Data["cooldown_action"].(string); ok {
if action == "" {
t.Fatal("失败时应有冷却决策记录")
}
t.Logf("冷却决策: %s", action)
}
}
func TestHandleChannelTest_HonorsRequestedKeyIndexEvenIfCooled(t *testing.T) {
mockResp := `{
"id": "msg_test",
"type": "message",
"role": "assistant",
"content": [{"type": "text", "text": "Hello"}],
"model": "claude-3-5-sonnet",
"usage": {"input_tokens": 10, "output_tokens": 5}
}`
var gotAuth string
upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
gotAuth = r.Header.Get("Authorization")
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(mockResp))
}))
defer upstream.Close()
srv := newInMemoryServer(t)
srv.client = upstream.Client()
ctx := context.Background()
created, err := srv.store.CreateConfig(ctx, &model.Config{
Name: "test-honor-cooled-key-channel",
URL: upstream.URL,
Priority: 1,
ChannelType: "anthropic",
ModelEntries: []model.ModelEntry{{Model: "claude-3-5-sonnet"}},
Enabled: true,
})
if err != nil {
t.Fatalf("CreateConfig failed: %v", err)
}
if err := srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{
{ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-cooled"},
{ChannelID: created.ID, KeyIndex: 1, APIKey: "sk-fresh"},
}); err != nil {
t.Fatalf("CreateAPIKeysBatch failed: %v", err)
}
if err := srv.store.SetKeyCooldown(ctx, created.ID, 0, time.Now().Add(10*time.Minute)); err != nil {
t.Fatalf("SetKeyCooldown failed: %v", err)
}
channelID := fmt.Sprintf("%d", created.ID)
c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, "/admin/channels/"+channelID+"/test", map[string]any{
"model": "claude-3-5-sonnet",
"channel_type": "anthropic",
"key_index": 0,
}))
c.Params = gin.Params{{Key: "id", Value: channelID}}
srv.HandleChannelTest(c)
if w.Code != http.StatusOK {
t.Fatalf("status=%d, want %d, body=%s", w.Code, http.StatusOK, w.Body.String())
}
resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes())
if dataSuccess, _ := resp.Data["success"].(bool); !dataSuccess {
t.Fatalf("data.success=false, data=%+v", resp.Data)
}
if gotAuth != "Bearer sk-cooled" {
t.Fatalf("Authorization=%q, want Bearer sk-cooled (requested key must be honored even if cooled)", gotAuth)
}
if gotIndex, _ := resp.Data["tested_key_index"].(float64); gotIndex != 0 {
t.Fatalf("tested_key_index=%v, want 0", resp.Data["tested_key_index"])
}
}
// TestHandleChannelTest_RejectsUnknownKeyIndex 验证:请求一个不存在的 key_index 时直接报错,
// 不再静默回退到其他可用 Key(既往会调用 SelectAvailableKey)。配合 HonorsRequestedKeyIndexEvenIfCooled
// 共同保证"显式 key_index 即真"语义。
func TestHandleChannelTest_RejectsUnknownKeyIndex(t *testing.T) {
srv := newInMemoryServer(t)
ctx := context.Background()
created, err := srv.store.CreateConfig(ctx, &model.Config{
Name: "test-reject-unknown-key-channel",
URL: "http://test.example.com",
Priority: 1,
ChannelType: "anthropic",
ModelEntries: []model.ModelEntry{{Model: "claude-3-5-sonnet"}},
Enabled: true,
})
if err != nil {
t.Fatalf("CreateConfig failed: %v", err)
}
if err := srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{
{ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-only"},
}); err != nil {
t.Fatalf("CreateAPIKeysBatch failed: %v", err)
}
channelID := fmt.Sprintf("%d", created.ID)
c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, "/admin/channels/"+channelID+"/test", map[string]any{
"model": "claude-3-5-sonnet",
"channel_type": "anthropic",
"key_index": 99, // 不存在
}))
c.Params = gin.Params{{Key: "id", Value: channelID}}
srv.HandleChannelTest(c)
if w.Code != http.StatusOK {
t.Fatalf("status=%d, want %d, body=%s", w.Code, http.StatusOK, w.Body.String())
}
resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes())
dataSuccess, _ := resp.Data["success"].(bool)
if dataSuccess {
t.Fatalf("data.success=true, want false; data=%+v", resp.Data)
}
dataError, _ := resp.Data["error"].(string)
if !strings.Contains(dataError, "Key #99") {
t.Fatalf("data.error=%q, want mention of Key #99", dataError)
}
}
func TestHandleChannelTest_UsesRequestAPIKeyWithoutTouchingSavedCooldown(t *testing.T) {
mockResp := `{
"id": "msg_test",
"type": "message",
"role": "assistant",
"content": [{"type": "text", "text": "Hello"}],
"model": "claude-3-5-sonnet",
"usage": {"input_tokens": 10, "output_tokens": 5}
}`
var gotAuth string
upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
gotAuth = r.Header.Get("Authorization")
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(mockResp))
}))
defer upstream.Close()
srv := newInMemoryServer(t)
srv.client = upstream.Client()
ctx := context.Background()
created, err := srv.store.CreateConfig(ctx, &model.Config{
Name: "test-request-key-channel",
URL: upstream.URL,
Priority: 1,
ChannelType: "anthropic",
ModelEntries: []model.ModelEntry{{Model: "claude-3-5-sonnet"}},
Enabled: true,
})
if err != nil {
t.Fatalf("CreateConfig failed: %v", err)
}
if err := srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{
{ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-saved-key"},
}); err != nil {
t.Fatalf("CreateAPIKeysBatch failed: %v", err)
}
coolUntil := time.Now().Add(10 * time.Minute)
if err := srv.store.SetKeyCooldown(ctx, created.ID, 0, coolUntil); err != nil {
t.Fatalf("SetKeyCooldown failed: %v", err)
}
channelID := fmt.Sprintf("%d", created.ID)
c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, "/admin/channels/"+channelID+"/test", map[string]any{
"model": "claude-3-5-sonnet",
"channel_type": "anthropic",
"key_index": 1,
"api_key": "sk-unsaved-key",
}))
c.Params = gin.Params{{Key: "id", Value: channelID}}
srv.HandleChannelTest(c)
if w.Code != http.StatusOK {
t.Fatalf("status=%d, want %d, body=%s", w.Code, http.StatusOK, w.Body.String())
}
resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes())
if dataSuccess, _ := resp.Data["success"].(bool); !dataSuccess {
t.Fatalf("data.success=false, data=%+v", resp.Data)
}
if gotAuth != "Bearer sk-unsaved-key" {
t.Fatalf("Authorization=%q, want request api key", gotAuth)
}
keys, err := srv.store.GetAPIKeys(ctx, created.ID)
if err != nil {
t.Fatalf("GetAPIKeys failed: %v", err)
}
if len(keys) != 1 {
t.Fatalf("keys len=%d, want 1", len(keys))
}
if keys[0].CooldownUntil == 0 {
t.Fatalf("saved key cooldown was reset for an unsaved request key")
}
}
func TestHandleChannelTest_WritesManualTestLog(t *testing.T) {
upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusUnauthorized)
_, _ = w.Write([]byte(`{"error":{"type":"authentication_error","message":"invalid api key"}}`))
}))
defer upstream.Close()
srv := newInMemoryServer(t)
srv.client = upstream.Client()
ctx := context.Background()
now := time.Now().Add(-time.Minute)
created, err := srv.store.CreateConfig(ctx, &model.Config{
Name: "manual-test-log-channel",
URL: upstream.URL,
Priority: 1,
ChannelType: "anthropic",
ModelEntries: []model.ModelEntry{{Model: "claude-3-5-sonnet"}},
Enabled: true,
})
if err != nil {
t.Fatalf("CreateConfig failed: %v", err)
}
if err := srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{{ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-invalid-key"}}); err != nil {
t.Fatalf("CreateAPIKeysBatch failed: %v", err)
}
channelID := fmt.Sprintf("%d", created.ID)
c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, "/admin/channels/"+channelID+"/test", map[string]any{
"model": "claude-3-5-sonnet",
"channel_type": "anthropic",
}))
c.Request.RemoteAddr = "198.51.100.10:12345"
c.Params = gin.Params{{Key: "id", Value: channelID}}
srv.HandleChannelTest(c)
if w.Code != http.StatusOK {
t.Fatalf("status=%d, want %d, body=%s", w.Code, http.StatusOK, w.Body.String())
}
logs, err := srv.store.ListLogs(ctx, now, 10, 0, &model.LogFilter{LogSource: model.LogSourceManualTest})
if err != nil {
t.Fatalf("ListLogs failed: %v", err)
}
if len(logs) != 1 {
t.Fatalf("expected 1 manual test log, got %d", len(logs))
}
entry := logs[0]
if entry.LogSource != model.LogSourceManualTest {
t.Fatalf("LogSource=%q, want %q", entry.LogSource, model.LogSourceManualTest)
}
if entry.StatusCode != http.StatusUnauthorized {
t.Fatalf("StatusCode=%d, want %d", entry.StatusCode, http.StatusUnauthorized)
}
if entry.ClientIP != "198.51.100.10" {
t.Fatalf("ClientIP=%q, want %q", entry.ClientIP, "198.51.100.10")
}
if entry.AuthTokenID != 0 {
t.Fatalf("AuthTokenID=%d, want 0", entry.AuthTokenID)
}
if entry.BaseURL != upstream.URL {
t.Fatalf("BaseURL=%q, want %q", entry.BaseURL, upstream.URL)
}
}
func TestHandleChannelTest_SSESoftErrorTriggersCooldown(t *testing.T) {
upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
w.WriteHeader(http.StatusOK)
_, _ = fmt.Fprint(w, "event: \n")
_, _ = fmt.Fprint(w, "data: {\"error\":{\"code\":\"1113\",\"message\":\"Insufficient balance or no resource package. Please recharge.\"},\"request_id\":\"req_1113\"}\n\n")
}))
defer upstream.Close()
srv := newInMemoryServer(t)
srv.client = upstream.Client()
ctx := context.Background()
cfg := &model.Config{
Name: "test-sse-soft-error",
URL: upstream.URL,
Priority: 1,
ModelEntries: []model.ModelEntry{{Model: "claude-3-5-sonnet"}},
Enabled: true,
}
created, err := srv.store.CreateConfig(ctx, cfg)
if err != nil {
t.Fatalf("创建测试渠道失败: %v", err)
}
err = srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{
{ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-soft-error"},
})
if err != nil {
t.Fatalf("添加 API key 失败: %v", err)
}
channelID := fmt.Sprintf("%d", created.ID)
reqBody := map[string]any{
"model": "claude-3-5-sonnet",
"channel_type": "anthropic",
"stream": true,
}
c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, "/admin/channels/"+channelID+"/test", reqBody))
c.Params = gin.Params{{Key: "id", Value: channelID}}
srv.HandleChannelTest(c)
if w.Code != http.StatusOK {
t.Fatalf("期望 200, 实际 %d, 响应: %s", w.Code, w.Body.String())
}
resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes())
if !resp.Success {
t.Fatalf("外层 APIResponse.Success 应为 true, error=%q", resp.Error)
}
dataSuccess, _ := resp.Data["success"].(bool)
if dataSuccess {
t.Fatalf("data.success 应为 false, data=%+v", resp.Data)
}
if got, _ := resp.Data["error"].(string); got != "Insufficient balance or no resource package. Please recharge." {
t.Fatalf("错误信息不对,got=%q data=%+v", got, resp.Data)
}
if got, _ := resp.Data["cooldown_action"].(string); got != "channel_cooldown_applied" {
t.Fatalf("1113 软错误在单 Key 渠道应升级为渠道冷却,got=%q data=%+v", got, resp.Data)
}
}
func TestHandleChannelTest_EventStreamHeaderWithJSONBodyFallback(t *testing.T) {
// 模拟“Content-Type=event-stream,但实际返回完整JSON”场景
upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{
"id":"resp_test",
"status":"completed",
"output":[
{
"type":"message",
"content":[{"type":"output_text","text":"fallback text"}]
}
],
"usage":{"input_tokens":12,"output_tokens":8}
}`))
}))
defer upstream.Close()
srv := newInMemoryServer(t)
srv.client = upstream.Client()
ctx := context.Background()
cfg := &model.Config{
Name: "test-codex-json-fallback",
URL: upstream.URL,
Priority: 1,
ModelEntries: []model.ModelEntry{{Model: "gpt-5.2"}},
Enabled: true,
ChannelType: "codex",
}
created, err := srv.store.CreateConfig(ctx, cfg)
if err != nil {
t.Fatalf("创建测试渠道失败: %v", err)
}
err = srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{
{ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-test-key"},
})
if err != nil {
t.Fatalf("添加 API key 失败: %v", err)
}
channelID := fmt.Sprintf("%d", created.ID)
reqBody := map[string]any{
"model": "gpt-5.2",
"channel_type": "codex",
"stream": false,
}
c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, "/admin/channels/"+channelID+"/test", reqBody))
c.Params = gin.Params{{Key: "id", Value: channelID}}
srv.HandleChannelTest(c)
if w.Code != http.StatusOK {
t.Fatalf("期望 200, 实际 %d, 响应: %s", w.Code, w.Body.String())
}
resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes())
dataSuccess, _ := resp.Data["success"].(bool)
if !dataSuccess {
t.Fatalf("data.success 应为 true, data=%+v", resp.Data)
}
responseText, _ := resp.Data["response_text"].(string)
if responseText == "" {
t.Fatalf("应解析出 response_text, data=%+v", resp.Data)
}
if responseText != "fallback text" {
t.Fatalf("response_text 解析错误: %q", responseText)
}
message, _ := resp.Data["message"].(string)
if message != "API测试成功" {
t.Fatalf("应按非流式成功文案返回,实际: %q", message)
}
}
func TestHandleChannelTest_CodexJSONFailedResponseShouldBeFailure(t *testing.T) {
upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{
"id":"resp_failed",
"object":"response",
"status":"failed",
"error":{
"code":"server_error",
"message":"upstream failed"
},
"output":[]
}`))
}))
defer upstream.Close()
srv := newInMemoryServer(t)
srv.client = upstream.Client()
ctx := context.Background()
cfg := &model.Config{
Name: "test-codex-json-failed",
URL: upstream.URL,
Priority: 1,
ModelEntries: []model.ModelEntry{{Model: "gpt-5.4"}},
Enabled: true,
ChannelType: "codex",
}
created, err := srv.store.CreateConfig(ctx, cfg)
if err != nil {
t.Fatalf("创建测试渠道失败: %v", err)
}
err = srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{
{ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-test-key"},
})
if err != nil {
t.Fatalf("添加 API key 失败: %v", err)
}
channelID := fmt.Sprintf("%d", created.ID)
reqBody := map[string]any{
"model": "gpt-5.4",
"channel_type": "codex",
"stream": false,
}
c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, "/admin/channels/"+channelID+"/test", reqBody))
c.Params = gin.Params{{Key: "id", Value: channelID}}
srv.HandleChannelTest(c)
if w.Code != http.StatusOK {
t.Fatalf("期望 200, 实际 %d, 响应: %s", w.Code, w.Body.String())
}
resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes())
dataSuccess, _ := resp.Data["success"].(bool)
if dataSuccess {
t.Fatalf("data.success 应为 false, data=%+v", resp.Data)
}
errorMsg, _ := resp.Data["error"].(string)
if errorMsg != "upstream failed" {
t.Fatalf("应返回上游错误信息,实际: %q, data=%+v", errorMsg, resp.Data)
}
if message, _ := resp.Data["message"].(string); message != "" {
t.Fatalf("失败响应不应返回成功文案,实际: %q", message)
}
}
func TestHandleChannelTest_StringAPIErrorShouldExposeUpstreamMessage(t *testing.T) {
upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(http.StatusTooManyRequests)
_, _ = w.Write([]byte(`{
"error":"由于负载过高,为了尽量保证用户体验,本站已开启限流,当前用户本周无法使用,请下周重试",
"type":"error"
}`))
}))
defer upstream.Close()
srv := newInMemoryServer(t)
srv.client = upstream.Client()
ctx := context.Background()
cfg := &model.Config{
Name: "test-string-api-error",
URL: upstream.URL,
Priority: 1,
ModelEntries: []model.ModelEntry{{Model: "gpt-5.4"}},
Enabled: true,
ChannelType: "openai",
}
created, err := srv.store.CreateConfig(ctx, cfg)
if err != nil {
t.Fatalf("创建测试渠道失败: %v", err)
}
err = srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{
{ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-test-key"},
})
if err != nil {
t.Fatalf("添加 API key 失败: %v", err)
}
channelID := fmt.Sprintf("%d", created.ID)
reqBody := map[string]any{
"model": "gpt-5.4",
"channel_type": "openai",
"stream": false,
}
c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, "/admin/channels/"+channelID+"/test", reqBody))
c.Params = gin.Params{{Key: "id", Value: channelID}}
srv.HandleChannelTest(c)
if w.Code != http.StatusOK {
t.Fatalf("期望 200, 实际 %d, 响应: %s", w.Code, w.Body.String())
}
resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes())
dataSuccess, _ := resp.Data["success"].(bool)
if dataSuccess {
t.Fatalf("data.success 应为 false, data=%+v", resp.Data)
}
errorMsg, _ := resp.Data["error"].(string)
expected := "由于负载过高,为了尽量保证用户体验,本站已开启限流,当前用户本周无法使用,请下周重试"
if errorMsg != expected {
t.Fatalf("应返回上游字符串错误信息,实际: %q, data=%+v", errorMsg, resp.Data)
}
}
func TestHandleChannelTest_HTMLBlockPageShouldBeFailure(t *testing.T) {
upstream := newTestHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>您的IP已被封锁</title>
</head>
<body>
<div class="container">
<h1>当前 IP 已被封锁</h1>
<p>暂时无法访问本站内容。</p>
</div>
</body>
</html>`))
}))
defer upstream.Close()
srv := newInMemoryServer(t)
srv.client = upstream.Client()
ctx := context.Background()
cfg := &model.Config{
Name: "test-html-block-page",
URL: upstream.URL,
Priority: 1,
ModelEntries: []model.ModelEntry{{Model: "gpt-5.4"}},
Enabled: true,
ChannelType: "openai",
}
created, err := srv.store.CreateConfig(ctx, cfg)
if err != nil {
t.Fatalf("创建测试渠道失败: %v", err)
}
err = srv.store.CreateAPIKeysBatch(ctx, []*model.APIKey{
{ChannelID: created.ID, KeyIndex: 0, APIKey: "sk-test-key"},
})
if err != nil {
t.Fatalf("添加 API key 失败: %v", err)
}
channelID := fmt.Sprintf("%d", created.ID)
reqBody := map[string]any{
"model": "gpt-5.4",
"channel_type": "openai",
"stream": false,
}
c, w := newTestContext(t, newJSONRequest(t, http.MethodPost, "/admin/channels/"+channelID+"/test", reqBody))
c.Params = gin.Params{{Key: "id", Value: channelID}}
srv.HandleChannelTest(c)
if w.Code != http.StatusOK {
t.Fatalf("期望 200, 实际 %d, 响应: %s", w.Code, w.Body.String())
}
resp := mustParseAPIResponse[map[string]any](t, w.Body.Bytes())
dataSuccess, _ := resp.Data["success"].(bool)
if dataSuccess {
t.Fatalf("HTML 封禁页必须判定为失败, data=%+v", resp.Data)
}
errorMsg, _ := resp.Data["error"].(string)
if !strings.Contains(errorMsg, "IP") || !strings.Contains(errorMsg, "封锁") {
t.Fatalf("应提炼出上游封禁信息,实际: %q, data=%+v", errorMsg, resp.Data)
}
rawResp, _ := resp.Data["raw_response"].(string)
if !strings.Contains(rawResp, "<title>您的IP已被封锁</title>") {
t.Fatalf("应保留原始 HTML 响应,实际: %q", rawResp)
}
if message, _ := resp.Data["message"].(string); message != "" {
t.Fatalf("失败响应不应返回成功文案,实际: %q", message)
}
}
func TestShouldFallbackToNextURL_StructuredSoftErrors(t *testing.T) {
t.Run("key_level_soft_error_should_not_fallback_or_cooldown_url", func(t *testing.T) {
result := map[string]any{
"success": false,
"status_code": http.StatusOK,
"api_error": map[string]any{
"error": map[string]any{
"code": "1113",
"message": "Insufficient balance or no resource package. Please recharge.",
},
},
"response_headers": map[string]string{
"Content-Type": "text/event-stream",
},
}
continueFallback, shouldCooldown := shouldFallbackToNextURL(result)
if continueFallback || shouldCooldown {
t.Fatalf("Key级软错误不应继续切URL或冷却URL,got fallback=%v cooldown=%v", continueFallback, shouldCooldown)
}
})
t.Run("channel_level_soft_error_should_fallback_and_cooldown_url", func(t *testing.T) {
result := map[string]any{
"success": false,
"status_code": http.StatusOK,
"api_error": map[string]any{
"type": "error",
"error": map[string]any{
"type": "api_error",
"message": "upstream overloaded",
},
},
}
continueFallback, shouldCooldown := shouldFallbackToNextURL(result)
if !continueFallback || !shouldCooldown {
t.Fatalf("渠道级软错误应继续切URL并冷却当前URL,got fallback=%v cooldown=%v", continueFallback, shouldCooldown)
}
})
}