package executor

import (
	"bufio"
	"bytes"
	"context"
	"fmt"
	"io"
	"net/http"
	"strings"
	"time"

	"github.com/router-for-me/CLIProxyAPI/v6/internal/config"
	"github.com/router-for-me/CLIProxyAPI/v6/internal/util"
	cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth"
	cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor"
	sdktranslator "github.com/router-for-me/CLIProxyAPI/v6/sdk/translator"
	log "github.com/sirupsen/logrus"
	"github.com/tidwall/sjson"
)

// OpenAICompatExecutor implements a stateless executor for OpenAI-compatible providers.
// It performs request/response translation and executes against the provider base URL
// using per-auth credentials (API key) and per-auth HTTP transport (proxy) from context.
type OpenAICompatExecutor struct {
	// provider is the compatibility provider key (e.g., "openrouter").
	provider string
	// cfg is the global proxy configuration used for payload overrides and logging.
	cfg *config.Config
}

// NewOpenAICompatExecutor creates an executor bound to a provider key (e.g., "openrouter").
func NewOpenAICompatExecutor(provider string, cfg *config.Config) *OpenAICompatExecutor {
	return &OpenAICompatExecutor{provider: provider, cfg: cfg}
}

// Identifier implements cliproxyauth.ProviderExecutor.
func (e *OpenAICompatExecutor) Identifier() string { return e.provider }

// PrepareRequest is a no-op for now (credentials are added via headers at execution time).
func (e *OpenAICompatExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil } func (e *OpenAICompatExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) { reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) defer reporter.trackFailure(ctx, &err) baseURL, apiKey := e.resolveCredentials(auth) if baseURL == "" { err = statusErr{code: http.StatusUnauthorized, msg: "missing provider baseURL"} return } // Translate inbound request to OpenAI format from := opts.SourceFormat to := sdktranslator.FromString("openai") originalPayload := bytes.Clone(req.Payload) if len(opts.OriginalRequest) > 0 { originalPayload = bytes.Clone(opts.OriginalRequest) } originalTranslated := sdktranslator.TranslateRequest(from, to, req.Model, originalPayload, opts.Stream) translated := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), opts.Stream) modelOverride := e.resolveUpstreamModel(req.Model, auth) if modelOverride != "" { translated = e.overrideModel(translated, modelOverride) } translated = applyPayloadConfigWithRoot(e.cfg, req.Model, to.String(), "", translated, originalTranslated) allowCompat := e.allowCompatReasoningEffort(req.Model, auth) translated = ApplyReasoningEffortMetadata(translated, req.Metadata, req.Model, "reasoning_effort", allowCompat) translated = NormalizeThinkingConfig(translated, req.Model, allowCompat) if errValidate := ValidateThinkingConfig(translated, req.Model); errValidate != nil { return resp, errValidate } url := strings.TrimSuffix(baseURL, "/") + "/chat/completions" httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(translated)) if err != nil { return resp, err } httpReq.Header.Set("Content-Type", "application/json") if apiKey != "" { httpReq.Header.Set("Authorization", "Bearer "+apiKey) } httpReq.Header.Set("User-Agent", 
"cli-proxy-openai-compat") var attrs map[string]string if auth != nil { attrs = auth.Attributes } util.ApplyCustomHeadersFromAttrs(httpReq, attrs) var authID, authLabel, authType, authValue string if auth != nil { authID = auth.ID authLabel = auth.Label authType, authValue = auth.AccountInfo() } recordAPIRequest(ctx, e.cfg, upstreamRequestLog{ URL: url, Method: http.MethodPost, Headers: httpReq.Header.Clone(), Body: translated, Provider: e.Identifier(), AuthID: authID, AuthLabel: authLabel, AuthType: authType, AuthValue: authValue, }) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) httpResp, err := httpClient.Do(httpReq) if err != nil { recordAPIResponseError(ctx, e.cfg, err) return resp, err } defer func() { if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("openai compat executor: close response body error: %v", errClose) } }() recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) log.Warnf("openai-compat executor: upstream API error, status: %d, body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) err = statusErr{code: httpResp.StatusCode, msg: string(b)} return resp, err } body, err := io.ReadAll(httpResp.Body) if err != nil { recordAPIResponseError(ctx, e.cfg, err) return resp, err } appendAPIResponseChunk(ctx, e.cfg, body) reporter.publish(ctx, parseOpenAIUsage(body)) // Ensure we at least record the request even if upstream doesn't return usage reporter.ensurePublished(ctx) // Translate response back to source format when needed var param any out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, body, ¶m) resp = cliproxyexecutor.Response{Payload: []byte(out)} return resp, nil } func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req 
cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) { reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) defer reporter.trackFailure(ctx, &err) baseURL, apiKey := e.resolveCredentials(auth) if baseURL == "" { err = statusErr{code: http.StatusUnauthorized, msg: "missing provider baseURL"} return nil, err } from := opts.SourceFormat to := sdktranslator.FromString("openai") originalPayload := bytes.Clone(req.Payload) if len(opts.OriginalRequest) > 0 { originalPayload = bytes.Clone(opts.OriginalRequest) } originalTranslated := sdktranslator.TranslateRequest(from, to, req.Model, originalPayload, true) translated := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), true) modelOverride := e.resolveUpstreamModel(req.Model, auth) if modelOverride != "" { translated = e.overrideModel(translated, modelOverride) } translated = applyPayloadConfigWithRoot(e.cfg, req.Model, to.String(), "", translated, originalTranslated) allowCompat := e.allowCompatReasoningEffort(req.Model, auth) translated = ApplyReasoningEffortMetadata(translated, req.Metadata, req.Model, "reasoning_effort", allowCompat) translated = NormalizeThinkingConfig(translated, req.Model, allowCompat) if errValidate := ValidateThinkingConfig(translated, req.Model); errValidate != nil { return nil, errValidate } url := strings.TrimSuffix(baseURL, "/") + "/chat/completions" httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(translated)) if err != nil { return nil, err } httpReq.Header.Set("Content-Type", "application/json") if apiKey != "" { httpReq.Header.Set("Authorization", "Bearer "+apiKey) } httpReq.Header.Set("User-Agent", "cli-proxy-openai-compat") var attrs map[string]string if auth != nil { attrs = auth.Attributes } util.ApplyCustomHeadersFromAttrs(httpReq, attrs) httpReq.Header.Set("Accept", "text/event-stream") httpReq.Header.Set("Cache-Control", "no-cache") var authID, 
authLabel, authType, authValue string if auth != nil { authID = auth.ID authLabel = auth.Label authType, authValue = auth.AccountInfo() } recordAPIRequest(ctx, e.cfg, upstreamRequestLog{ URL: url, Method: http.MethodPost, Headers: httpReq.Header.Clone(), Body: translated, Provider: e.Identifier(), AuthID: authID, AuthLabel: authLabel, AuthType: authType, AuthValue: authValue, }) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) httpResp, err := httpClient.Do(httpReq) if err != nil { recordAPIResponseError(ctx, e.cfg, err) return nil, err } recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) log.Warnf("openai-compat executor: upstream API error, status: %d, body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("openai compat executor: close response body error: %v", errClose) } err = statusErr{code: httpResp.StatusCode, msg: string(b)} return nil, err } out := make(chan cliproxyexecutor.StreamChunk) stream = out go func() { defer close(out) defer func() { if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("openai compat executor: close response body error: %v", errClose) } }() scanner := bufio.NewScanner(httpResp.Body) scanner.Buffer(nil, 52_428_800) // 50MB var param any for scanner.Scan() { line := scanner.Bytes() appendAPIResponseChunk(ctx, e.cfg, line) if detail, ok := parseOpenAIStreamUsage(line); ok { reporter.publish(ctx, detail) } if len(line) == 0 { continue } // OpenAI-compatible streams are SSE: lines typically prefixed with "data: ". // Pass through translator; it yields one or more chunks for the target schema. 
chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, bytes.Clone(line), ¶m) for i := range chunks { out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])} } } if errScan := scanner.Err(); errScan != nil { recordAPIResponseError(ctx, e.cfg, errScan) reporter.publishFailure(ctx) out <- cliproxyexecutor.StreamChunk{Err: errScan} } // Ensure we record the request if no usage chunk was ever seen reporter.ensurePublished(ctx) }() return stream, nil } func (e *OpenAICompatExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { from := opts.SourceFormat to := sdktranslator.FromString("openai") translated := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), false) modelForCounting := req.Model if modelOverride := e.resolveUpstreamModel(req.Model, auth); modelOverride != "" { translated = e.overrideModel(translated, modelOverride) modelForCounting = modelOverride } enc, err := tokenizerForModel(modelForCounting) if err != nil { return cliproxyexecutor.Response{}, fmt.Errorf("openai compat executor: tokenizer init failed: %w", err) } count, err := countOpenAIChatTokens(enc, translated) if err != nil { return cliproxyexecutor.Response{}, fmt.Errorf("openai compat executor: token counting failed: %w", err) } usageJSON := buildOpenAIUsageJSON(count) translatedUsage := sdktranslator.TranslateTokenCount(ctx, to, from, count, usageJSON) return cliproxyexecutor.Response{Payload: []byte(translatedUsage)}, nil } // Refresh is a no-op for API-key based compatibility providers. 
func (e *OpenAICompatExecutor) Refresh(ctx context.Context, auth *cliproxyauth.Auth) (*cliproxyauth.Auth, error) { log.Debugf("openai compat executor: refresh called") _ = ctx return auth, nil } func (e *OpenAICompatExecutor) resolveCredentials(auth *cliproxyauth.Auth) (baseURL, apiKey string) { if auth == nil { return "", "" } if auth.Attributes != nil { baseURL = strings.TrimSpace(auth.Attributes["base_url"]) apiKey = strings.TrimSpace(auth.Attributes["api_key"]) } return } func (e *OpenAICompatExecutor) resolveUpstreamModel(alias string, auth *cliproxyauth.Auth) string { if alias == "" || auth == nil || e.cfg == nil { return "" } compat := e.resolveCompatConfig(auth) if compat == nil { return "" } for i := range compat.Models { model := compat.Models[i] if model.Alias != "" { if strings.EqualFold(model.Alias, alias) { if model.Name != "" { return model.Name } return alias } continue } if strings.EqualFold(model.Name, alias) { return model.Name } } return "" } func (e *OpenAICompatExecutor) allowCompatReasoningEffort(model string, auth *cliproxyauth.Auth) bool { trimmed := strings.TrimSpace(model) if trimmed == "" || e == nil || e.cfg == nil { return false } compat := e.resolveCompatConfig(auth) if compat == nil || len(compat.Models) == 0 { return false } for i := range compat.Models { entry := compat.Models[i] if strings.EqualFold(strings.TrimSpace(entry.Alias), trimmed) { return true } if strings.EqualFold(strings.TrimSpace(entry.Name), trimmed) { return true } } return false } func (e *OpenAICompatExecutor) resolveCompatConfig(auth *cliproxyauth.Auth) *config.OpenAICompatibility { if auth == nil || e.cfg == nil { return nil } candidates := make([]string, 0, 3) if auth.Attributes != nil { if v := strings.TrimSpace(auth.Attributes["compat_name"]); v != "" { candidates = append(candidates, v) } if v := strings.TrimSpace(auth.Attributes["provider_key"]); v != "" { candidates = append(candidates, v) } } if v := strings.TrimSpace(auth.Provider); v != "" { candidates 
= append(candidates, v) } for i := range e.cfg.OpenAICompatibility { compat := &e.cfg.OpenAICompatibility[i] for _, candidate := range candidates { if candidate != "" && strings.EqualFold(strings.TrimSpace(candidate), compat.Name) { return compat } } } return nil } func (e *OpenAICompatExecutor) overrideModel(payload []byte, model string) []byte { if len(payload) == 0 || model == "" { return payload } payload, _ = sjson.SetBytes(payload, "model", model) return payload } type statusErr struct { code int msg string retryAfter *time.Duration } func (e statusErr) Error() string { if e.msg != "" { return e.msg } return fmt.Sprintf("status %d", e.code) } func (e statusErr) StatusCode() int { return e.code } func (e statusErr) RetryAfter() *time.Duration { return e.retryAfter }