// Package executor provides runtime execution capabilities for various AI service providers. // This file implements the Vertex AI Gemini executor that talks to Google Vertex AI // endpoints using service account credentials or API keys. package executor import ( "bufio" "bytes" "context" "encoding/json" "fmt" "io" "net/http" "strings" vertexauth "github.com/router-for-me/CLIProxyAPI/v6/internal/auth/vertex" "github.com/router-for-me/CLIProxyAPI/v6/internal/config" "github.com/router-for-me/CLIProxyAPI/v6/internal/util" cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth" cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor" sdktranslator "github.com/router-for-me/CLIProxyAPI/v6/sdk/translator" log "github.com/sirupsen/logrus" "github.com/tidwall/gjson" "github.com/tidwall/sjson" "golang.org/x/oauth2" "golang.org/x/oauth2/google" ) const ( // vertexAPIVersion aligns with current public Vertex Generative AI API. vertexAPIVersion = "v1" ) // GeminiVertexExecutor sends requests to Vertex AI Gemini endpoints using service account credentials. type GeminiVertexExecutor struct { cfg *config.Config } // NewGeminiVertexExecutor creates a new Vertex AI Gemini executor instance. // // Parameters: // - cfg: The application configuration // // Returns: // - *GeminiVertexExecutor: A new Vertex AI Gemini executor instance func NewGeminiVertexExecutor(cfg *config.Config) *GeminiVertexExecutor { return &GeminiVertexExecutor{cfg: cfg} } // Identifier returns the executor identifier. func (e *GeminiVertexExecutor) Identifier() string { return "vertex" } // PrepareRequest prepares the HTTP request for execution (no-op for Vertex). func (e *GeminiVertexExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil } // Execute performs a non-streaming request to the Vertex AI API. func (e *GeminiVertexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) { // Try API key authentication first apiKey, baseURL := vertexAPICreds(auth) // If no API key found, fall back to service account authentication if apiKey == "" { projectID, location, saJSON, errCreds := vertexCreds(auth) if errCreds != nil { return resp, errCreds } return e.executeWithServiceAccount(ctx, auth, req, opts, projectID, location, saJSON) } // Use API key authentication return e.executeWithAPIKey(ctx, auth, req, opts, apiKey, baseURL) } // ExecuteStream performs a streaming request to the Vertex AI API. func (e *GeminiVertexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) { // Try API key authentication first apiKey, baseURL := vertexAPICreds(auth) // If no API key found, fall back to service account authentication if apiKey == "" { projectID, location, saJSON, errCreds := vertexCreds(auth) if errCreds != nil { return nil, errCreds } return e.executeStreamWithServiceAccount(ctx, auth, req, opts, projectID, location, saJSON) } // Use API key authentication return e.executeStreamWithAPIKey(ctx, auth, req, opts, apiKey, baseURL) } // CountTokens counts tokens for the given request using the Vertex AI API. func (e *GeminiVertexExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { // Try API key authentication first apiKey, baseURL := vertexAPICreds(auth) // If no API key found, fall back to service account authentication if apiKey == "" { projectID, location, saJSON, errCreds := vertexCreds(auth) if errCreds != nil { return cliproxyexecutor.Response{}, errCreds } return e.countTokensWithServiceAccount(ctx, auth, req, opts, projectID, location, saJSON) } // Use API key authentication return e.countTokensWithAPIKey(ctx, auth, req, opts, apiKey, baseURL) } // Refresh refreshes the authentication credentials (no-op for Vertex). func (e *GeminiVertexExecutor) Refresh(_ context.Context, auth *cliproxyauth.Auth) (*cliproxyauth.Auth, error) { return auth, nil } // executeWithServiceAccount handles authentication using service account credentials. // This method contains the original service account authentication logic. func (e *GeminiVertexExecutor) executeWithServiceAccount(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, projectID, location string, saJSON []byte) (resp cliproxyexecutor.Response, err error) { reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) defer reporter.trackFailure(ctx, &err) from := opts.SourceFormat to := sdktranslator.FromString("gemini") originalPayload := bytes.Clone(req.Payload) if len(opts.OriginalRequest) > 0 { originalPayload = bytes.Clone(opts.OriginalRequest) } originalTranslated := sdktranslator.TranslateRequest(from, to, req.Model, originalPayload, false) body := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), false) if budgetOverride, includeOverride, ok := util.ResolveThinkingConfigFromMetadata(req.Model, req.Metadata); ok && util.ModelSupportsThinking(req.Model) { if budgetOverride != nil { norm := util.NormalizeThinkingBudget(req.Model, *budgetOverride) budgetOverride = &norm } body = util.ApplyGeminiThinkingConfig(body, budgetOverride, includeOverride) } body = util.ApplyDefaultThinkingIfNeeded(req.Model, body) body = util.NormalizeGeminiThinkingBudget(req.Model, body) body = util.StripThinkingConfigIfUnsupported(req.Model, body) body = fixGeminiImageAspectRatio(req.Model, body) body = applyPayloadConfigWithRoot(e.cfg, req.Model, to.String(), "", body, originalTranslated) body, _ = sjson.SetBytes(body, "model", req.Model) action := "generateContent" if req.Metadata != nil { if a, _ := req.Metadata["action"].(string); a == "countTokens" { action = "countTokens" } } baseURL := vertexBaseURL(location) url := fmt.Sprintf("%s/%s/projects/%s/locations/%s/publishers/google/models/%s:%s", baseURL, vertexAPIVersion, projectID, location, req.Model, action) if opts.Alt != "" && action != "countTokens" { url = url + fmt.Sprintf("?$alt=%s", opts.Alt) } body, _ = sjson.DeleteBytes(body, "session_id") httpReq, errNewReq := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) if errNewReq != nil { return resp, errNewReq } httpReq.Header.Set("Content-Type", "application/json") if token, errTok := vertexAccessToken(ctx, e.cfg, auth, saJSON); errTok == nil && token != "" { httpReq.Header.Set("Authorization", "Bearer "+token) } else if errTok != nil { log.Errorf("vertex executor: access token error: %v", errTok) return resp, statusErr{code: 500, msg: "internal server error"} } applyGeminiHeaders(httpReq, auth) var authID, authLabel, authType, authValue string if auth != nil { authID = auth.ID authLabel = auth.Label authType, authValue = auth.AccountInfo() } recordAPIRequest(ctx, e.cfg, upstreamRequestLog{ URL: url, Method: http.MethodPost, Headers: httpReq.Header.Clone(), Body: body, Provider: e.Identifier(), AuthID: authID, AuthLabel: authLabel, AuthType: authType, AuthValue: authValue, }) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) httpResp, errDo := httpClient.Do(httpReq) if errDo != nil { recordAPIResponseError(ctx, e.cfg, errDo) return resp, errDo } defer func() { if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("vertex executor: close response body error: %v", errClose) } }() recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) log.Warnf("gemini-vertex executor: upstream API error, status: %d, body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) err = statusErr{code: httpResp.StatusCode, msg: string(b)} return resp, err } data, errRead := io.ReadAll(httpResp.Body) if errRead != nil { recordAPIResponseError(ctx, e.cfg, errRead) return resp, errRead } appendAPIResponseChunk(ctx, e.cfg, data) reporter.publish(ctx, parseGeminiUsage(data)) var param any out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, ¶m) resp = cliproxyexecutor.Response{Payload: []byte(out)} return resp, nil } // executeWithAPIKey handles authentication using API key credentials. func (e *GeminiVertexExecutor) executeWithAPIKey(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, apiKey, baseURL string) (resp cliproxyexecutor.Response, err error) { reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) defer reporter.trackFailure(ctx, &err) model := req.Model if override := e.resolveUpstreamModel(req.Model, auth); override != "" { model = override } from := opts.SourceFormat to := sdktranslator.FromString("gemini") originalPayload := bytes.Clone(req.Payload) if len(opts.OriginalRequest) > 0 { originalPayload = bytes.Clone(opts.OriginalRequest) } originalTranslated := sdktranslator.TranslateRequest(from, to, model, originalPayload, false) body := sdktranslator.TranslateRequest(from, to, model, bytes.Clone(req.Payload), false) if budgetOverride, includeOverride, ok := util.ResolveThinkingConfigFromMetadata(model, req.Metadata); ok && util.ModelSupportsThinking(model) { if budgetOverride != nil { norm := util.NormalizeThinkingBudget(model, *budgetOverride) budgetOverride = &norm } body = util.ApplyGeminiThinkingConfig(body, budgetOverride, includeOverride) } body = util.ApplyDefaultThinkingIfNeeded(model, body) body = util.NormalizeGeminiThinkingBudget(model, body) body = util.StripThinkingConfigIfUnsupported(model, body) body = fixGeminiImageAspectRatio(model, body) body = applyPayloadConfigWithRoot(e.cfg, model, to.String(), "", body, originalTranslated) body, _ = sjson.SetBytes(body, "model", model) action := "generateContent" if req.Metadata != nil { if a, _ := req.Metadata["action"].(string); a == "countTokens" { action = "countTokens" } } // For API key auth, use simpler URL format without project/location if baseURL == "" { baseURL = "https://generativelanguage.googleapis.com" } url := fmt.Sprintf("%s/%s/publishers/google/models/%s:%s", baseURL, vertexAPIVersion, model, action) if opts.Alt != "" && action != "countTokens" { url = url + fmt.Sprintf("?$alt=%s", opts.Alt) } body, _ = sjson.DeleteBytes(body, "session_id") httpReq, errNewReq := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) if errNewReq != nil { return resp, errNewReq } httpReq.Header.Set("Content-Type", "application/json") if apiKey != "" { httpReq.Header.Set("x-goog-api-key", apiKey) } applyGeminiHeaders(httpReq, auth) var authID, authLabel, authType, authValue string if auth != nil { authID = auth.ID authLabel = auth.Label authType, authValue = auth.AccountInfo() } recordAPIRequest(ctx, e.cfg, upstreamRequestLog{ URL: url, Method: http.MethodPost, Headers: httpReq.Header.Clone(), Body: body, Provider: e.Identifier(), AuthID: authID, AuthLabel: authLabel, AuthType: authType, AuthValue: authValue, }) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) httpResp, errDo := httpClient.Do(httpReq) if errDo != nil { recordAPIResponseError(ctx, e.cfg, errDo) return resp, errDo } defer func() { if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("vertex executor: close response body error: %v", errClose) } }() recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) log.Warnf("gemini-vertex executor: upstream API error, status: %d, body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) err = statusErr{code: httpResp.StatusCode, msg: string(b)} return resp, err } data, errRead := io.ReadAll(httpResp.Body) if errRead != nil { recordAPIResponseError(ctx, e.cfg, errRead) return resp, errRead } appendAPIResponseChunk(ctx, e.cfg, data) reporter.publish(ctx, parseGeminiUsage(data)) var param any out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, ¶m) resp = cliproxyexecutor.Response{Payload: []byte(out)} return resp, nil } // executeStreamWithServiceAccount handles streaming authentication using service account credentials. func (e *GeminiVertexExecutor) executeStreamWithServiceAccount(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, projectID, location string, saJSON []byte) (stream <-chan cliproxyexecutor.StreamChunk, err error) { reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) defer reporter.trackFailure(ctx, &err) from := opts.SourceFormat to := sdktranslator.FromString("gemini") originalPayload := bytes.Clone(req.Payload) if len(opts.OriginalRequest) > 0 { originalPayload = bytes.Clone(opts.OriginalRequest) } originalTranslated := sdktranslator.TranslateRequest(from, to, req.Model, originalPayload, true) body := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), true) if budgetOverride, includeOverride, ok := util.ResolveThinkingConfigFromMetadata(req.Model, req.Metadata); ok && util.ModelSupportsThinking(req.Model) { if budgetOverride != nil { norm := util.NormalizeThinkingBudget(req.Model, *budgetOverride) budgetOverride = &norm } body = util.ApplyGeminiThinkingConfig(body, budgetOverride, includeOverride) } body = util.ApplyDefaultThinkingIfNeeded(req.Model, body) body = util.NormalizeGeminiThinkingBudget(req.Model, body) body = util.StripThinkingConfigIfUnsupported(req.Model, body) body = fixGeminiImageAspectRatio(req.Model, body) body = applyPayloadConfigWithRoot(e.cfg, req.Model, to.String(), "", body, originalTranslated) body, _ = sjson.SetBytes(body, "model", req.Model) baseURL := vertexBaseURL(location) url := fmt.Sprintf("%s/%s/projects/%s/locations/%s/publishers/google/models/%s:%s", baseURL, vertexAPIVersion, projectID, location, req.Model, "streamGenerateContent") if opts.Alt == "" { url = url + "?alt=sse" } else { url = url + fmt.Sprintf("?$alt=%s", opts.Alt) } body, _ = sjson.DeleteBytes(body, "session_id") httpReq, errNewReq := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) if errNewReq != nil { return nil, errNewReq } httpReq.Header.Set("Content-Type", "application/json") if token, errTok := vertexAccessToken(ctx, e.cfg, auth, saJSON); errTok == nil && token != "" { httpReq.Header.Set("Authorization", "Bearer "+token) } else if errTok != nil { log.Errorf("vertex executor: access token error: %v", errTok) return nil, statusErr{code: 500, msg: "internal server error"} } applyGeminiHeaders(httpReq, auth) var authID, authLabel, authType, authValue string if auth != nil { authID = auth.ID authLabel = auth.Label authType, authValue = auth.AccountInfo() } recordAPIRequest(ctx, e.cfg, upstreamRequestLog{ URL: url, Method: http.MethodPost, Headers: httpReq.Header.Clone(), Body: body, Provider: e.Identifier(), AuthID: authID, AuthLabel: authLabel, AuthType: authType, AuthValue: authValue, }) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) httpResp, errDo := httpClient.Do(httpReq) if errDo != nil { recordAPIResponseError(ctx, e.cfg, errDo) return nil, errDo } recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) log.Warnf("gemini-vertex executor: upstream API error, status: %d, body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("vertex executor: close response body error: %v", errClose) } return nil, statusErr{code: httpResp.StatusCode, msg: string(b)} } out := make(chan cliproxyexecutor.StreamChunk) stream = out go func() { defer close(out) defer func() { if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("vertex executor: close response body error: %v", errClose) } }() scanner := bufio.NewScanner(httpResp.Body) scanner.Buffer(nil, streamScannerBuffer) var param any for scanner.Scan() { line := scanner.Bytes() appendAPIResponseChunk(ctx, e.cfg, line) if detail, ok := parseGeminiStreamUsage(line); ok { reporter.publish(ctx, detail) } lines := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, bytes.Clone(line), ¶m) for i := range lines { out <- cliproxyexecutor.StreamChunk{Payload: []byte(lines[i])} } } lines := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, []byte("[DONE]"), ¶m) for i := range lines { out <- cliproxyexecutor.StreamChunk{Payload: []byte(lines[i])} } if errScan := scanner.Err(); errScan != nil { recordAPIResponseError(ctx, e.cfg, errScan) reporter.publishFailure(ctx) out <- cliproxyexecutor.StreamChunk{Err: errScan} } }() return stream, nil } // executeStreamWithAPIKey handles streaming authentication using API key credentials. func (e *GeminiVertexExecutor) executeStreamWithAPIKey(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, apiKey, baseURL string) (stream <-chan cliproxyexecutor.StreamChunk, err error) { reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) defer reporter.trackFailure(ctx, &err) model := req.Model if override := e.resolveUpstreamModel(req.Model, auth); override != "" { model = override } from := opts.SourceFormat to := sdktranslator.FromString("gemini") originalPayload := bytes.Clone(req.Payload) if len(opts.OriginalRequest) > 0 { originalPayload = bytes.Clone(opts.OriginalRequest) } originalTranslated := sdktranslator.TranslateRequest(from, to, model, originalPayload, true) body := sdktranslator.TranslateRequest(from, to, model, bytes.Clone(req.Payload), true) if budgetOverride, includeOverride, ok := util.ResolveThinkingConfigFromMetadata(model, req.Metadata); ok && util.ModelSupportsThinking(model) { if budgetOverride != nil { norm := util.NormalizeThinkingBudget(model, *budgetOverride) budgetOverride = &norm } body = util.ApplyGeminiThinkingConfig(body, budgetOverride, includeOverride) } body = util.ApplyDefaultThinkingIfNeeded(model, body) body = util.NormalizeGeminiThinkingBudget(model, body) body = util.StripThinkingConfigIfUnsupported(model, body) body = fixGeminiImageAspectRatio(model, body) body = applyPayloadConfigWithRoot(e.cfg, model, to.String(), "", body, originalTranslated) body, _ = sjson.SetBytes(body, "model", model) // For API key auth, use simpler URL format without project/location if baseURL == "" { baseURL = "https://generativelanguage.googleapis.com" } url := fmt.Sprintf("%s/%s/publishers/google/models/%s:%s", baseURL, vertexAPIVersion, model, "streamGenerateContent") if opts.Alt == "" { url = url + "?alt=sse" } else { url = url + fmt.Sprintf("?$alt=%s", opts.Alt) } body, _ = sjson.DeleteBytes(body, "session_id") httpReq, errNewReq := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) if errNewReq != nil { return nil, errNewReq } httpReq.Header.Set("Content-Type", "application/json") if apiKey != "" { httpReq.Header.Set("x-goog-api-key", apiKey) } applyGeminiHeaders(httpReq, auth) var authID, authLabel, authType, authValue string if auth != nil { authID = auth.ID authLabel = auth.Label authType, authValue = auth.AccountInfo() } recordAPIRequest(ctx, e.cfg, upstreamRequestLog{ URL: url, Method: http.MethodPost, Headers: httpReq.Header.Clone(), Body: body, Provider: e.Identifier(), AuthID: authID, AuthLabel: authLabel, AuthType: authType, AuthValue: authValue, }) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) httpResp, errDo := httpClient.Do(httpReq) if errDo != nil { recordAPIResponseError(ctx, e.cfg, errDo) return nil, errDo } recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) log.Warnf("gemini-vertex executor: upstream API error, status: %d, body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("vertex executor: close response body error: %v", errClose) } return nil, statusErr{code: httpResp.StatusCode, msg: string(b)} } out := make(chan cliproxyexecutor.StreamChunk) stream = out go func() { defer close(out) defer func() { if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("vertex executor: close response body error: %v", errClose) } }() scanner := bufio.NewScanner(httpResp.Body) scanner.Buffer(nil, streamScannerBuffer) var param any for scanner.Scan() { line := scanner.Bytes() appendAPIResponseChunk(ctx, e.cfg, line) if detail, ok := parseGeminiStreamUsage(line); ok { reporter.publish(ctx, detail) } lines := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, bytes.Clone(line), ¶m) for i := range lines { out <- cliproxyexecutor.StreamChunk{Payload: []byte(lines[i])} } } lines := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, []byte("[DONE]"), ¶m) for i := range lines { out <- cliproxyexecutor.StreamChunk{Payload: []byte(lines[i])} } if errScan := scanner.Err(); errScan != nil { recordAPIResponseError(ctx, e.cfg, errScan) reporter.publishFailure(ctx) out <- cliproxyexecutor.StreamChunk{Err: errScan} } }() return stream, nil } // countTokensWithServiceAccount counts tokens using service account credentials. func (e *GeminiVertexExecutor) countTokensWithServiceAccount(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, projectID, location string, saJSON []byte) (cliproxyexecutor.Response, error) { from := opts.SourceFormat to := sdktranslator.FromString("gemini") translatedReq := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), false) if budgetOverride, includeOverride, ok := util.ResolveThinkingConfigFromMetadata(req.Model, req.Metadata); ok && util.ModelSupportsThinking(req.Model) { if budgetOverride != nil { norm := util.NormalizeThinkingBudget(req.Model, *budgetOverride) budgetOverride = &norm } translatedReq = util.ApplyGeminiThinkingConfig(translatedReq, budgetOverride, includeOverride) } translatedReq = util.StripThinkingConfigIfUnsupported(req.Model, translatedReq) translatedReq = fixGeminiImageAspectRatio(req.Model, translatedReq) translatedReq, _ = sjson.SetBytes(translatedReq, "model", req.Model) respCtx := context.WithValue(ctx, "alt", opts.Alt) translatedReq, _ = sjson.DeleteBytes(translatedReq, "tools") translatedReq, _ = sjson.DeleteBytes(translatedReq, "generationConfig") translatedReq, _ = sjson.DeleteBytes(translatedReq, "safetySettings") baseURL := vertexBaseURL(location) url := fmt.Sprintf("%s/%s/projects/%s/locations/%s/publishers/google/models/%s:%s", baseURL, vertexAPIVersion, projectID, location, req.Model, "countTokens") httpReq, errNewReq := http.NewRequestWithContext(respCtx, http.MethodPost, url, bytes.NewReader(translatedReq)) if errNewReq != nil { return cliproxyexecutor.Response{}, errNewReq } httpReq.Header.Set("Content-Type", "application/json") if token, errTok := vertexAccessToken(ctx, e.cfg, auth, saJSON); errTok == nil && token != "" { httpReq.Header.Set("Authorization", "Bearer "+token) } else if errTok != nil { log.Errorf("vertex executor: access token error: %v", errTok) return cliproxyexecutor.Response{}, statusErr{code: 500, msg: "internal server error"} } applyGeminiHeaders(httpReq, auth) var authID, authLabel, authType, authValue string if auth != nil { authID = auth.ID authLabel = auth.Label authType, authValue = auth.AccountInfo() } recordAPIRequest(ctx, e.cfg, upstreamRequestLog{ URL: url, Method: http.MethodPost, Headers: httpReq.Header.Clone(), Body: translatedReq, Provider: e.Identifier(), AuthID: authID, AuthLabel: authLabel, AuthType: authType, AuthValue: authValue, }) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) httpResp, errDo := httpClient.Do(httpReq) if errDo != nil { recordAPIResponseError(ctx, e.cfg, errDo) return cliproxyexecutor.Response{}, errDo } defer func() { if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("vertex executor: close response body error: %v", errClose) } }() recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) log.Warnf("gemini-vertex executor: upstream API error, status: %d, body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) return cliproxyexecutor.Response{}, statusErr{code: httpResp.StatusCode, msg: string(b)} } data, errRead := io.ReadAll(httpResp.Body) if errRead != nil { recordAPIResponseError(ctx, e.cfg, errRead) return cliproxyexecutor.Response{}, errRead } appendAPIResponseChunk(ctx, e.cfg, data) if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { log.Warnf("gemini-vertex executor: upstream API error, status: %d, body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), data)) return cliproxyexecutor.Response{}, statusErr{code: httpResp.StatusCode, msg: string(data)} } count := gjson.GetBytes(data, "totalTokens").Int() out := sdktranslator.TranslateTokenCount(ctx, to, from, count, data) return cliproxyexecutor.Response{Payload: []byte(out)}, nil } // countTokensWithAPIKey handles token counting using API key credentials. func (e *GeminiVertexExecutor) countTokensWithAPIKey(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, apiKey, baseURL string) (cliproxyexecutor.Response, error) { model := req.Model if override := e.resolveUpstreamModel(req.Model, auth); override != "" { model = override } from := opts.SourceFormat to := sdktranslator.FromString("gemini") translatedReq := sdktranslator.TranslateRequest(from, to, model, bytes.Clone(req.Payload), false) if budgetOverride, includeOverride, ok := util.ResolveThinkingConfigFromMetadata(model, req.Metadata); ok && util.ModelSupportsThinking(model) { if budgetOverride != nil { norm := util.NormalizeThinkingBudget(model, *budgetOverride) budgetOverride = &norm } translatedReq = util.ApplyGeminiThinkingConfig(translatedReq, budgetOverride, includeOverride) } translatedReq = util.StripThinkingConfigIfUnsupported(model, translatedReq) translatedReq = fixGeminiImageAspectRatio(model, translatedReq) translatedReq, _ = sjson.SetBytes(translatedReq, "model", model) respCtx := context.WithValue(ctx, "alt", opts.Alt) translatedReq, _ = sjson.DeleteBytes(translatedReq, "tools") translatedReq, _ = sjson.DeleteBytes(translatedReq, "generationConfig") translatedReq, _ = sjson.DeleteBytes(translatedReq, "safetySettings") // For API key auth, use simpler URL format without project/location if baseURL == "" { baseURL = "https://generativelanguage.googleapis.com" } url := fmt.Sprintf("%s/%s/publishers/google/models/%s:%s", baseURL, vertexAPIVersion, model, "countTokens") httpReq, errNewReq := http.NewRequestWithContext(respCtx, http.MethodPost, url, bytes.NewReader(translatedReq)) if errNewReq != nil { return cliproxyexecutor.Response{}, errNewReq } httpReq.Header.Set("Content-Type", "application/json") if apiKey != "" { httpReq.Header.Set("x-goog-api-key", apiKey) } applyGeminiHeaders(httpReq, auth) var authID, authLabel, authType, authValue string if auth != nil { authID = auth.ID authLabel = auth.Label authType, authValue = auth.AccountInfo() } recordAPIRequest(ctx, e.cfg, upstreamRequestLog{ URL: url, Method: http.MethodPost, Headers: httpReq.Header.Clone(), Body: translatedReq, Provider: e.Identifier(), AuthID: authID, AuthLabel: authLabel, AuthType: authType, AuthValue: authValue, }) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) httpResp, errDo := httpClient.Do(httpReq) if errDo != nil { recordAPIResponseError(ctx, e.cfg, errDo) return cliproxyexecutor.Response{}, errDo } defer func() { if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("vertex executor: close response body error: %v", errClose) } }() recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) log.Warnf("gemini-vertex executor: upstream API error, status: %d, body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) return cliproxyexecutor.Response{}, statusErr{code: httpResp.StatusCode, msg: string(b)} } data, errRead := io.ReadAll(httpResp.Body) if errRead != nil { recordAPIResponseError(ctx, e.cfg, errRead) return cliproxyexecutor.Response{}, errRead } appendAPIResponseChunk(ctx, e.cfg, data) if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { log.Warnf("gemini-vertex executor: upstream API error, status: %d, body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), data)) return cliproxyexecutor.Response{}, statusErr{code: httpResp.StatusCode, msg: string(data)} } count := gjson.GetBytes(data, "totalTokens").Int() out := sdktranslator.TranslateTokenCount(ctx, to, from, count, data) return cliproxyexecutor.Response{Payload: []byte(out)}, nil } // vertexCreds extracts project, location and raw service account JSON from auth metadata. func vertexCreds(a *cliproxyauth.Auth) (projectID, location string, serviceAccountJSON []byte, err error) { if a == nil || a.Metadata == nil { return "", "", nil, fmt.Errorf("vertex executor: missing auth metadata") } if v, ok := a.Metadata["project_id"].(string); ok { projectID = strings.TrimSpace(v) } if projectID == "" { // Some service accounts may use "project"; still prefer standard field if v, ok := a.Metadata["project"].(string); ok { projectID = strings.TrimSpace(v) } } if projectID == "" { return "", "", nil, fmt.Errorf("vertex executor: missing project_id in credentials") } if v, ok := a.Metadata["location"].(string); ok && strings.TrimSpace(v) != "" { location = strings.TrimSpace(v) } else { location = "us-central1" } var sa map[string]any if raw, ok := a.Metadata["service_account"].(map[string]any); ok { sa = raw } if sa == nil { return "", "", nil, fmt.Errorf("vertex executor: missing service_account in credentials") } normalized, errNorm := vertexauth.NormalizeServiceAccountMap(sa) if errNorm != nil { return "", "", nil, fmt.Errorf("vertex executor: %w", errNorm) } saJSON, errMarshal := json.Marshal(normalized) if errMarshal != nil { return "", "", nil, fmt.Errorf("vertex executor: marshal service_account failed: %w", errMarshal) } return projectID, location, saJSON, nil } // vertexAPICreds extracts API key and base URL from auth attributes following the claudeCreds pattern. func vertexAPICreds(a *cliproxyauth.Auth) (apiKey, baseURL string) { if a == nil { return "", "" } if a.Attributes != nil { apiKey = a.Attributes["api_key"] baseURL = a.Attributes["base_url"] } if apiKey == "" && a.Metadata != nil { if v, ok := a.Metadata["access_token"].(string); ok { apiKey = v } } return } func vertexBaseURL(location string) string { loc := strings.TrimSpace(location) if loc == "" { loc = "us-central1" } return fmt.Sprintf("https://%s-aiplatform.googleapis.com", loc) } func vertexAccessToken(ctx context.Context, cfg *config.Config, auth *cliproxyauth.Auth, saJSON []byte) (string, error) { if httpClient := newProxyAwareHTTPClient(ctx, cfg, auth, 0); httpClient != nil { ctx = context.WithValue(ctx, oauth2.HTTPClient, httpClient) } // Use cloud-platform scope for Vertex AI. creds, errCreds := google.CredentialsFromJSON(ctx, saJSON, "https://www.googleapis.com/auth/cloud-platform") if errCreds != nil { return "", fmt.Errorf("vertex executor: parse service account json failed: %w", errCreds) } tok, errTok := creds.TokenSource.Token() if errTok != nil { return "", fmt.Errorf("vertex executor: get access token failed: %w", errTok) } return tok.AccessToken, nil } // resolveUpstreamModel resolves the upstream model name from vertex-api-key configuration. // It matches the requested model alias against configured models and returns the actual upstream name. func (e *GeminiVertexExecutor) resolveUpstreamModel(alias string, auth *cliproxyauth.Auth) string { trimmed := strings.TrimSpace(alias) if trimmed == "" { return "" } entry := e.resolveVertexConfig(auth) if entry == nil { return "" } normalizedModel, metadata := util.NormalizeThinkingModel(trimmed) // Candidate names to match against configured aliases/names. candidates := []string{strings.TrimSpace(normalizedModel)} if !strings.EqualFold(normalizedModel, trimmed) { candidates = append(candidates, trimmed) } if original := util.ResolveOriginalModel(normalizedModel, metadata); original != "" && !strings.EqualFold(original, normalizedModel) { candidates = append(candidates, original) } for i := range entry.Models { model := entry.Models[i] name := strings.TrimSpace(model.Name) modelAlias := strings.TrimSpace(model.Alias) for _, candidate := range candidates { if candidate == "" { continue } if modelAlias != "" && strings.EqualFold(modelAlias, candidate) { if name != "" { return name } return candidate } if name != "" && strings.EqualFold(name, candidate) { return name } } } return "" } // resolveVertexConfig finds the matching vertex-api-key configuration entry for the given auth. func (e *GeminiVertexExecutor) resolveVertexConfig(auth *cliproxyauth.Auth) *config.VertexCompatKey { if auth == nil || e.cfg == nil { return nil } var attrKey, attrBase string if auth.Attributes != nil { attrKey = strings.TrimSpace(auth.Attributes["api_key"]) attrBase = strings.TrimSpace(auth.Attributes["base_url"]) } for i := range e.cfg.VertexCompatAPIKey { entry := &e.cfg.VertexCompatAPIKey[i] cfgKey := strings.TrimSpace(entry.APIKey) cfgBase := strings.TrimSpace(entry.BaseURL) if attrKey != "" && attrBase != "" { if strings.EqualFold(cfgKey, attrKey) && strings.EqualFold(cfgBase, attrBase) { return entry } continue } if attrKey != "" && strings.EqualFold(cfgKey, attrKey) { if cfgBase == "" || strings.EqualFold(cfgBase, attrBase) { return entry } } if attrKey == "" && attrBase != "" && strings.EqualFold(cfgBase, attrBase) { return entry } } if attrKey != "" { for i := range e.cfg.VertexCompatAPIKey { entry := &e.cfg.VertexCompatAPIKey[i] if strings.EqualFold(strings.TrimSpace(entry.APIKey), attrKey) { return entry } } } return nil }