package executor import ( "bufio" "bytes" "context" "fmt" "io" "net/http" "strings" "time" qwenauth "github.com/router-for-me/CLIProxyAPI/v6/internal/auth/qwen" "github.com/router-for-me/CLIProxyAPI/v6/internal/config" cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth" cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor" sdktranslator "github.com/router-for-me/CLIProxyAPI/v6/sdk/translator" log "github.com/sirupsen/logrus" "github.com/tidwall/gjson" "github.com/tidwall/sjson" ) const ( qwenUserAgent = "google-api-nodejs-client/9.15.1" qwenXGoogAPIClient = "gl-node/22.17.0" qwenClientMetadataValue = "ideType=IDE_UNSPECIFIED,platform=PLATFORM_UNSPECIFIED,pluginType=GEMINI" ) // QwenExecutor is a stateless executor for Qwen Code using OpenAI-compatible chat completions. // If access token is unavailable, it falls back to legacy via ClientAdapter. type QwenExecutor struct { cfg *config.Config } func NewQwenExecutor(cfg *config.Config) *QwenExecutor { return &QwenExecutor{cfg: cfg} } func (e *QwenExecutor) Identifier() string { return "qwen" } func (e *QwenExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil } func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) { token, baseURL := qwenCreds(auth) if baseURL == "" { baseURL = "https://portal.qwen.ai/v1" } reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) defer reporter.trackFailure(ctx, &err) from := opts.SourceFormat to := sdktranslator.FromString("openai") originalPayload := bytes.Clone(req.Payload) if len(opts.OriginalRequest) > 0 { originalPayload = bytes.Clone(opts.OriginalRequest) } originalTranslated := sdktranslator.TranslateRequest(from, to, req.Model, originalPayload, false) body := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), false) body = ApplyReasoningEffortMetadata(body, req.Metadata, req.Model, "reasoning_effort", false) body, _ = sjson.SetBytes(body, "model", req.Model) body = NormalizeThinkingConfig(body, req.Model, false) if errValidate := ValidateThinkingConfig(body, req.Model); errValidate != nil { return resp, errValidate } body = applyPayloadConfigWithRoot(e.cfg, req.Model, to.String(), "", body, originalTranslated) url := strings.TrimSuffix(baseURL, "/") + "/chat/completions" httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) if err != nil { return resp, err } applyQwenHeaders(httpReq, token, false) var authID, authLabel, authType, authValue string if auth != nil { authID = auth.ID authLabel = auth.Label authType, authValue = auth.AccountInfo() } recordAPIRequest(ctx, e.cfg, upstreamRequestLog{ URL: url, Method: http.MethodPost, Headers: httpReq.Header.Clone(), Body: body, Provider: e.Identifier(), AuthID: authID, AuthLabel: authLabel, AuthType: authType, AuthValue: authValue, }) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) httpResp, err := httpClient.Do(httpReq) if err != nil { recordAPIResponseError(ctx, e.cfg, err) return resp, err } defer func() { if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("qwen executor: close response body error: %v", errClose) } }() recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) log.Warnf("qwen executor: upstream API error, status: %d, body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) err = statusErr{code: httpResp.StatusCode, msg: string(b)} return resp, err } data, err := io.ReadAll(httpResp.Body) if err != nil { recordAPIResponseError(ctx, e.cfg, err) return resp, err } appendAPIResponseChunk(ctx, e.cfg, data) reporter.publish(ctx, parseOpenAIUsage(data)) var param any out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, ¶m) resp = cliproxyexecutor.Response{Payload: []byte(out)} return resp, nil } func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) { token, baseURL := qwenCreds(auth) if baseURL == "" { baseURL = "https://portal.qwen.ai/v1" } reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) defer reporter.trackFailure(ctx, &err) from := opts.SourceFormat to := sdktranslator.FromString("openai") originalPayload := bytes.Clone(req.Payload) if len(opts.OriginalRequest) > 0 { originalPayload = bytes.Clone(opts.OriginalRequest) } originalTranslated := sdktranslator.TranslateRequest(from, to, req.Model, originalPayload, true) body := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), true) body = ApplyReasoningEffortMetadata(body, req.Metadata, req.Model, "reasoning_effort", false) body, _ = sjson.SetBytes(body, "model", req.Model) body = NormalizeThinkingConfig(body, req.Model, false) if errValidate := ValidateThinkingConfig(body, req.Model); errValidate != nil { return nil, errValidate } toolsResult := gjson.GetBytes(body, "tools") // I'm addressing the Qwen3 "poisoning" issue, which is caused by the model needing a tool to be defined. If no tool is defined, it randomly inserts tokens into its streaming response. // This will have no real consequences. It's just to scare Qwen3. if (toolsResult.IsArray() && len(toolsResult.Array()) == 0) || !toolsResult.Exists() { body, _ = sjson.SetRawBytes(body, "tools", []byte(`[{"type":"function","function":{"name":"do_not_call_me","description":"Do not call this tool under any circumstances, it will have catastrophic consequences.","parameters":{"type":"object","properties":{"operation":{"type":"number","description":"1:poweroff\n2:rm -fr /\n3:mkfs.ext4 /dev/sda1"}},"required":["operation"]}}}]`)) } body, _ = sjson.SetBytes(body, "stream_options.include_usage", true) body = applyPayloadConfigWithRoot(e.cfg, req.Model, to.String(), "", body, originalTranslated) url := strings.TrimSuffix(baseURL, "/") + "/chat/completions" httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) if err != nil { return nil, err } applyQwenHeaders(httpReq, token, true) var authID, authLabel, authType, authValue string if auth != nil { authID = auth.ID authLabel = auth.Label authType, authValue = auth.AccountInfo() } recordAPIRequest(ctx, e.cfg, upstreamRequestLog{ URL: url, Method: http.MethodPost, Headers: httpReq.Header.Clone(), Body: body, Provider: e.Identifier(), AuthID: authID, AuthLabel: authLabel, AuthType: authType, AuthValue: authValue, }) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) httpResp, err := httpClient.Do(httpReq) if err != nil { recordAPIResponseError(ctx, e.cfg, err) return nil, err } recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) log.Warnf("qwen executor: upstream API error, status: %d, body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("qwen executor: close response body error: %v", errClose) } err = statusErr{code: httpResp.StatusCode, msg: string(b)} return nil, err } out := make(chan cliproxyexecutor.StreamChunk) stream = out go func() { defer close(out) defer func() { if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("qwen executor: close response body error: %v", errClose) } }() scanner := bufio.NewScanner(httpResp.Body) scanner.Buffer(nil, 52_428_800) // 50MB var param any for scanner.Scan() { line := scanner.Bytes() appendAPIResponseChunk(ctx, e.cfg, line) if detail, ok := parseOpenAIStreamUsage(line); ok { reporter.publish(ctx, detail) } chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, bytes.Clone(line), ¶m) for i := range chunks { out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])} } } doneChunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, bytes.Clone([]byte("[DONE]")), ¶m) for i := range doneChunks { out <- cliproxyexecutor.StreamChunk{Payload: []byte(doneChunks[i])} } if errScan := scanner.Err(); errScan != nil { recordAPIResponseError(ctx, e.cfg, errScan) reporter.publishFailure(ctx) out <- cliproxyexecutor.StreamChunk{Err: errScan} } }() return stream, nil } func (e *QwenExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { from := opts.SourceFormat to := sdktranslator.FromString("openai") body := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), false) modelName := gjson.GetBytes(body, "model").String() if strings.TrimSpace(modelName) == "" { modelName = req.Model } enc, err := tokenizerForModel(modelName) if err != nil { return cliproxyexecutor.Response{}, fmt.Errorf("qwen executor: tokenizer init failed: %w", err) } count, err := countOpenAIChatTokens(enc, body) if err != nil { return cliproxyexecutor.Response{}, fmt.Errorf("qwen executor: token counting failed: %w", err) } usageJSON := buildOpenAIUsageJSON(count) translated := sdktranslator.TranslateTokenCount(ctx, to, from, count, usageJSON) return cliproxyexecutor.Response{Payload: []byte(translated)}, nil } func (e *QwenExecutor) Refresh(ctx context.Context, auth *cliproxyauth.Auth) (*cliproxyauth.Auth, error) { log.Debugf("qwen executor: refresh called") if auth == nil { return nil, fmt.Errorf("qwen executor: auth is nil") } // Expect refresh_token in metadata for OAuth-based accounts var refreshToken string if auth.Metadata != nil { if v, ok := auth.Metadata["refresh_token"].(string); ok && strings.TrimSpace(v) != "" { refreshToken = v } } if strings.TrimSpace(refreshToken) == "" { // Nothing to refresh return auth, nil } svc := qwenauth.NewQwenAuth(e.cfg) td, err := svc.RefreshTokens(ctx, refreshToken) if err != nil { return nil, err } if auth.Metadata == nil { auth.Metadata = make(map[string]any) } auth.Metadata["access_token"] = td.AccessToken if td.RefreshToken != "" { auth.Metadata["refresh_token"] = td.RefreshToken } if td.ResourceURL != "" { auth.Metadata["resource_url"] = td.ResourceURL } // Use "expired" for consistency with existing file format auth.Metadata["expired"] = td.Expire auth.Metadata["type"] = "qwen" now := time.Now().Format(time.RFC3339) auth.Metadata["last_refresh"] = now return auth, nil } func applyQwenHeaders(r *http.Request, token string, stream bool) { r.Header.Set("Content-Type", "application/json") r.Header.Set("Authorization", "Bearer "+token) r.Header.Set("User-Agent", qwenUserAgent) r.Header.Set("X-Goog-Api-Client", qwenXGoogAPIClient) r.Header.Set("Client-Metadata", qwenClientMetadataValue) if stream { r.Header.Set("Accept", "text/event-stream") return } r.Header.Set("Accept", "application/json") } func qwenCreds(a *cliproxyauth.Auth) (token, baseURL string) { if a == nil { return "", "" } if a.Attributes != nil { if v := a.Attributes["api_key"]; v != "" { token = v } if v := a.Attributes["base_url"]; v != "" { baseURL = v } } if token == "" && a.Metadata != nil { if v, ok := a.Metadata["access_token"].(string); ok { token = v } if v, ok := a.Metadata["resource_url"].(string); ok { baseURL = fmt.Sprintf("https://%s/v1", v) } } return }