// 上游客户端实现 // 基于高性能通讯接口封装 use dashmap::DashMap; use rquest::{header, Client, Response, StatusCode}; use serde_json::Value; use std::sync::Arc; use tokio::sync::RwLock; use tokio::time::Duration; /// 端点降级尝试的记录信息 #[derive(Debug, Clone)] pub struct FallbackAttemptLog { /// 尝试的端点 URL pub endpoint_url: String, /// HTTP 状态码 (网络错误时为 None) pub status: Option, /// 错误描述 pub error: String, } /// 上游调用结果,包含响应和降级尝试记录 pub struct UpstreamCallResult { /// 最终的 HTTP 响应 pub response: Response, /// 降级过程中失败的端点尝试记录 (成功时为空) pub fallback_attempts: Vec, } /// 邮箱脱敏:只显示前3位 + *** + @域名前2位 + *** /// 例: "userexample@gmail.com" → "use***@gm***" pub fn mask_email(email: &str) -> String { if let Some(at_pos) = email.find('@') { let local = &email[..at_pos]; let domain = &email[at_pos + 1..]; let local_prefix: String = local.chars().take(3).collect(); let domain_prefix: String = domain.chars().take(2).collect(); format!("{}***@{}***", local_prefix, domain_prefix) } else { // 不是合法邮箱格式,直接截取前5位 let prefix: String = email.chars().take(5).collect(); format!("{}***", prefix) } } /// [NEW] 错误日志脱敏:抹除报错信息中的 access_token, proxy_url 等敏感凭证 pub fn sanitize_error_for_log(error_text: &str) -> String { // 抹除常见敏感 key 的值 let re = regex::Regex::new(r#"(?i)(access_token|refresh_token|id_token|authorization|api_key|secret|password|proxy_url|http_proxy|https_proxy)\s*[:=]\s*[^"'\\\s,}\]]+"#).unwrap(); let redacted = re.replace_all(error_text, "$1="); // 抹除 Bearer token let re_bearer = regex::Regex::new(r#"(?i)(bearer\s+)[^"'\\\s,}\]]+"#).unwrap(); let redacted = re_bearer.replace_all(&redacted, "$1"); // 限制长度防止日志炸弹 if redacted.len() > 1000 { format!("{}... (truncated)", &redacted[..1000]) } else { redacted.into_owned() } } // Cloud Code v1internal endpoints (fallback order: Sandbox → Daily → Prod) // 优先使用 Sandbox/Daily 环境以避免 Prod环境的 429 错误 (Ref: Issue #1176) const V1_INTERNAL_BASE_URL_PROD: &str = "https://cloudcode-pa.googleapis.com/v1internal"; const V1_INTERNAL_BASE_URL_DAILY: &str = "https://daily-cloudcode-pa.googleapis.com/v1internal"; const V1_INTERNAL_BASE_URL_SANDBOX: &str = "https://daily-cloudcode-pa.sandbox.googleapis.com/v1internal"; const V1_INTERNAL_BASE_URL_FALLBACKS: [&str; 3] = [ V1_INTERNAL_BASE_URL_SANDBOX, // 优先级 1: Sandbox (已知有效且稳定) V1_INTERNAL_BASE_URL_DAILY, // 优先级 2: Daily (备用) V1_INTERNAL_BASE_URL_PROD, // 优先级 3: Prod (仅作为兜底) ]; pub struct UpstreamClient { default_client: Client, proxy_pool: Option>, client_cache: DashMap, // proxy_id -> Client user_agent_override: RwLock>, } impl UpstreamClient { pub fn new( proxy_config: Option, proxy_pool: Option>, ) -> Self { let default_client = match Self::build_client_internal(proxy_config.clone()) { Ok(client) => client, Err(err_with_proxy) => { tracing::error!( error = %err_with_proxy, "Failed to create default HTTP client with configured upstream proxy; retrying without proxy" ); match Self::build_client_internal(None) { Ok(client) => client, Err(err_without_proxy) => { tracing::error!( error = %err_without_proxy, "Failed to create default HTTP client without proxy; falling back to bare client" ); Client::new() } } } }; Self { default_client, proxy_pool, client_cache: DashMap::new(), user_agent_override: RwLock::new(None), } } /// Internal helper to build a client with optional upstream proxy config fn build_client_internal( proxy_config: Option, ) -> Result { let mut builder = Client::builder() .emulation(rquest_util::Emulation::Chrome123) // Connection settings (优化连接复用,减少建立开销) .connect_timeout(Duration::from_secs(20)) .pool_max_idle_per_host(20) // 每主机最多 20 个空闲连接 (对齐官方指纹) .pool_idle_timeout(Duration::from_secs(90)) // 空闲连接保持 90 秒 .tcp_keepalive(Duration::from_secs(60)) // TCP 保活探测 60 秒 // 强制开启 HTTP/2 协议,并支持在 SOCKS/HTTPS 代理下通过 ALPN 强制降级/协商 .timeout(Duration::from_secs(600)); builder = Self::apply_default_user_agent(builder); if let Some(config) = proxy_config { if config.enabled && !config.url.is_empty() { let url = crate::proxy::config::normalize_proxy_url(&config.url); if let Ok(proxy) = rquest::Proxy::all(&url) { builder = builder.proxy(proxy); tracing::info!("UpstreamClient enabled proxy: {}", url); } } } builder.build() } /// Build a client with a specific PoolProxyConfig (from ProxyPool) fn build_client_with_proxy( &self, proxy_config: crate::proxy::proxy_pool::PoolProxyConfig, ) -> Result { // Reuse base settings similar to default client but with specific proxy let builder = Client::builder() .emulation(rquest_util::Emulation::Chrome123) .connect_timeout(Duration::from_secs(20)) .pool_max_idle_per_host(20) .pool_idle_timeout(Duration::from_secs(90)) .tcp_keepalive(Duration::from_secs(60)) .timeout(Duration::from_secs(600)) .proxy(proxy_config.proxy); // Apply the specific proxy Self::apply_default_user_agent(builder).build() } fn apply_default_user_agent(builder: rquest::ClientBuilder) -> rquest::ClientBuilder { let ua = crate::constants::USER_AGENT.as_str(); if header::HeaderValue::from_str(ua).is_ok() { builder.user_agent(ua) } else { tracing::warn!( user_agent = %ua, "Invalid default User-Agent value, using fallback" ); builder.user_agent("antigravity") } } /// Set dynamic User-Agent override pub async fn set_user_agent_override(&self, ua: Option) { let mut lock = self.user_agent_override.write().await; *lock = ua; tracing::debug!("UpstreamClient User-Agent override updated: {:?}", lock); } /// Get current User-Agent pub async fn get_user_agent(&self) -> String { let ua_override = self.user_agent_override.read().await; ua_override .as_ref() .cloned() .unwrap_or_else(|| crate::constants::USER_AGENT.clone()) } /// Get client for a specific account (or default if no proxy bound) pub async fn get_client(&self, account_id: Option<&str>) -> Client { if let Some(pool) = &self.proxy_pool { if let Some(acc_id) = account_id { // Try to get per-account proxy match pool.get_proxy_for_account(acc_id).await { Ok(Some(proxy_cfg)) => { // Check cache if let Some(client) = self.client_cache.get(&proxy_cfg.entry_id) { return client.clone(); } // Build new client and cache it match self.build_client_with_proxy(proxy_cfg.clone()) { Ok(client) => { self.client_cache .insert(proxy_cfg.entry_id.clone(), client.clone()); tracing::info!( "Using ProxyPool proxy ID: {} for account: {}", proxy_cfg.entry_id, acc_id ); return client; } Err(e) => { tracing::error!("Failed to build client for proxy {}: {}, falling back to default", proxy_cfg.entry_id, e); } } } Ok(None) => { // No proxy found or required for this account, use default } Err(e) => { tracing::error!( "Error getting proxy for account {}: {}, falling back to default", acc_id, e ); } } } } // Fallback to default client self.default_client.clone() } /// Build v1internal URL fn build_url(base_url: &str, method: &str, query_string: Option<&str>) -> String { if let Some(qs) = query_string { format!("{}:{}?{}", base_url, method, qs) } else { format!("{}:{}", base_url, method) } } /// Determine if we should try next endpoint (fallback logic) fn should_try_next_endpoint(status: StatusCode) -> bool { status == StatusCode::TOO_MANY_REQUESTS || status == StatusCode::REQUEST_TIMEOUT || status == StatusCode::NOT_FOUND || status.is_server_error() } /// Call v1internal API (Basic Method) /// /// Initiates a basic network request, supporting multi-endpoint auto-fallback. /// [UPDATED] Takes optional account_id for per-account proxy selection. pub async fn call_v1_internal( &self, method: &str, access_token: &str, body: Value, query_string: Option<&str>, account_id: Option<&str>, // [NEW] Account ID for proxy selection ) -> Result { self.call_v1_internal_with_headers( method, access_token, body, query_string, std::collections::HashMap::new(), account_id, ) .await } /// [FIX #765] 调用 v1internal API,支持透传额外的 Headers /// [ENHANCED] 返回 UpstreamCallResult,包含降级尝试记录,用于 debug 日志 pub async fn call_v1_internal_with_headers( &self, method: &str, access_token: &str, body: Value, query_string: Option<&str>, extra_headers: std::collections::HashMap, account_id: Option<&str>, // [NEW] Account ID ) -> Result { // [NEW] Get client based on account (cached in proxy pool manager) let client = self.get_client(account_id).await; // 构建 Headers (所有端点复用) let mut headers = header::HeaderMap::new(); headers.insert( header::CONTENT_TYPE, header::HeaderValue::from_static("application/json"), ); headers.insert( header::AUTHORIZATION, header::HeaderValue::from_str(&format!("Bearer {}", access_token)) .map_err(|e| e.to_string())?, ); headers.insert( header::USER_AGENT, header::HeaderValue::from_str(&self.get_user_agent().await).unwrap_or_else(|e| { tracing::warn!("Invalid User-Agent header value, using fallback: {}", e); header::HeaderValue::from_static("antigravity") }), ); // [ENHANCED] 注入 Antigravity 官方客户端关键特征 Headers // 1. Client Identity headers.insert( "x-client-name", header::HeaderValue::from_static("antigravity"), ); if let Ok(ver) = header::HeaderValue::from_str(&crate::constants::CURRENT_VERSION) { headers.insert("x-client-version", ver); } // 2. Device & Session Identity // Machine ID (Persistent) if let Ok(mid) = machine_uid::get() { if let Ok(mid_val) = header::HeaderValue::from_str(&mid) { headers.insert("x-machine-id", mid_val); } } // Session ID (Per App Launch) if let Ok(sess_val) = header::HeaderValue::from_str(&crate::constants::SESSION_ID) { headers.insert("x-vscode-sessionid", sess_val); } // [REMOVED v4.1.24] x-goog-api-client (gl-node/fire/grpc) header has been removed. // This header belongs to the IDE's JS layer, not the official client's egress. // Sending it creates a contradictory "Electron + Node.js" fingerprint. // [NEW] 深度解析 body 中的 project_id 并注入 Header // 只有当 Body 包含 project 字段且非测试项目时,注入 x-goog-user-project if let Some(proj) = body.get("project").and_then(|v| v.as_str()) { if !proj.is_empty() && proj != "test-project" && proj != "project-id" { if let Ok(hv) = header::HeaderValue::from_str(proj) { headers.insert("x-goog-user-project", hv); } } } // 注入额外的 Headers (如 anthropic-beta) for (k, v) in extra_headers { if let Ok(hk) = header::HeaderName::from_bytes(k.as_bytes()) { if let Ok(hv) = header::HeaderValue::from_str(&v) { headers.insert(hk, hv); } } } // [DEBUG] Log headers for verification tracing::debug!(?headers, "Final Upstream Request Headers"); let mut last_err: Option = None; // [NEW] 收集降级尝试记录 let mut fallback_attempts: Vec = Vec::new(); // 遍历所有端点,失败时自动切换 for (idx, base_url) in V1_INTERNAL_BASE_URL_FALLBACKS.iter().enumerate() { let url = Self::build_url(base_url, method, query_string); let has_next = idx + 1 < V1_INTERNAL_BASE_URL_FALLBACKS.len(); let body_bytes = serde_json::to_vec(&body).map_err(|e| e.to_string())?; let response = client .post(&url) .headers(headers.clone()) // [NEW] 强制分块传输仿真: 包装为流以触发 Transfer-Encoding: chunked // 这对齐了官方 Go Worker 通过遮蔽 Content-Length 来模拟 IDE 流量的行为 .body(rquest::Body::wrap_stream(futures::stream::once(async move { Ok::<_, std::io::Error>(body_bytes) }))) .send() .await; match response { Ok(resp) => { let status = resp.status(); if status.is_success() { if idx > 0 { tracing::info!( "✓ Upstream fallback succeeded | Endpoint: {} | Status: {} | Next endpoints available: {}", base_url, status, V1_INTERNAL_BASE_URL_FALLBACKS.len() - idx - 1 ); } else { tracing::debug!( "✓ Upstream request succeeded | Endpoint: {} | Status: {}", base_url, status ); } return Ok(UpstreamCallResult { response: resp, fallback_attempts, }); } // 如果有下一个端点且当前错误可重试,则切换 if has_next && Self::should_try_next_endpoint(status) { let err_msg = format!("Upstream {} returned {}", base_url, status); tracing::warn!( "Upstream endpoint returned {} at {} (method={}), trying next endpoint", status, base_url, method ); // [NEW] 记录降级尝试 fallback_attempts.push(FallbackAttemptLog { endpoint_url: url.clone(), status: Some(status.as_u16()), error: err_msg.clone(), }); last_err = Some(err_msg); continue; } // 不可重试的错误或已是最后一个端点,直接返回 return Ok(UpstreamCallResult { response: resp, fallback_attempts, }); } Err(e) => { let msg = format!("HTTP request failed at {}: {}", base_url, e); tracing::debug!("{}", msg); // [NEW] 记录网络错误的降级尝试 fallback_attempts.push(FallbackAttemptLog { endpoint_url: url.clone(), status: None, error: msg.clone(), }); last_err = Some(msg); // 如果是最后一个端点,退出循环 if !has_next { break; } continue; } } } Err(last_err.unwrap_or_else(|| "All endpoints failed".to_string())) } /// 调用 v1internal API(带 429 重试,支持闭包) /// /// 带容错和重试的核心请求逻辑 /// /// # Arguments /// * `method` - API method (e.g., "generateContent") /// * `query_string` - Optional query string (e.g., "?alt=sse") /// * `get_credentials` - 闭包,获取凭证(支持账号轮换) /// * `build_body` - 闭包,接收 project_id 构建请求体 /// * `max_attempts` - 最大重试次数 /// /// # Returns /// HTTP Response // 已移除弃用的重试方法 (call_v1_internal_with_retry) // 已移除弃用的辅助方法 (parse_retry_delay) // 已移除弃用的辅助方法 (parse_duration_ms) /// 获取可用模型列表 /// /// 获取远端模型列表,支持多端点自动 Fallback #[allow(dead_code)] // API ready for future model discovery feature pub async fn fetch_available_models( &self, access_token: &str, account_id: Option<&str>, ) -> Result { // 复用 call_v1_internal,然后解析 JSON let result = self .call_v1_internal( "fetchAvailableModels", access_token, serde_json::json!({}), None, account_id, ) .await?; let json: Value = result .response .json() .await .map_err(|e| format!("Parse json failed: {}", e))?; Ok(json) } } #[cfg(test)] mod tests { use super::*; #[test] fn test_build_url() { let base_url = "https://cloudcode-pa.googleapis.com/v1internal"; let url1 = UpstreamClient::build_url(base_url, "generateContent", None); assert_eq!( url1, "https://cloudcode-pa.googleapis.com/v1internal:generateContent" ); let url2 = UpstreamClient::build_url(base_url, "streamGenerateContent", Some("alt=sse")); assert_eq!( url2, "https://cloudcode-pa.googleapis.com/v1internal:streamGenerateContent?alt=sse" ); } }