""" Base API Client - 共用的 API 客户端基础功能 提供错误处理、自动封禁、重试逻辑等共同功能 """ import asyncio import json from datetime import datetime, timezone from typing import Any, Dict, Optional from fastapi import Response from config import ( get_auto_ban_enabled, get_auto_ban_error_codes, get_retry_429_enabled, get_retry_429_interval, get_retry_429_max_retries, ) from log import log from src.credential_manager import CredentialManager # ==================== 错误检查与处理 ==================== async def check_should_auto_ban(status_code: int) -> bool: """ 检查是否应该触发自动封禁 Args: status_code: HTTP状态码 Returns: bool: 是否应该触发自动封禁 """ return ( await get_auto_ban_enabled() and status_code in await get_auto_ban_error_codes() ) async def handle_auto_ban( credential_manager: CredentialManager, status_code: int, credential_name: str, mode: str = "geminicli" ) -> None: """ 处理自动封禁:直接禁用凭证 Args: credential_manager: 凭证管理器实例 status_code: HTTP状态码 credential_name: 凭证名称 mode: 模式(geminicli 或 antigravity) """ if credential_manager and credential_name: log.warning( f"[{mode.upper()} AUTO_BAN] Status {status_code} triggers auto-ban for credential: {credential_name}" ) await credential_manager.set_cred_disabled( credential_name, True, mode=mode ) async def handle_error_with_retry( credential_manager: CredentialManager, status_code: int, credential_name: str, retry_enabled: bool, attempt: int, max_retries: int, retry_interval: float, mode: str = "geminicli" ) -> bool: """ 统一处理错误和重试逻辑 仅在以下情况下进行自动重试: 1. 429错误(速率限制) 2. 导致凭证封禁的错误(AUTO_BAN_ERROR_CODES配置) Args: credential_manager: 凭证管理器实例 status_code: HTTP状态码 credential_name: 凭证名称 retry_enabled: 是否启用重试 attempt: 当前重试次数 max_retries: 最大重试次数 retry_interval: 重试间隔 mode: 模式(geminicli 或 antigravity) Returns: bool: True表示需要继续重试,False表示不需要重试 """ # 优先检查自动封禁 should_auto_ban = await check_should_auto_ban(status_code) if should_auto_ban: # 触发自动封禁 await handle_auto_ban(credential_manager, status_code, credential_name, mode) # 自动封禁后,仍然尝试重试(会在下次循环中自动获取新凭证) if retry_enabled and attempt < max_retries: log.info( f"[{mode.upper()} RETRY] Retrying with next credential after auto-ban " f"(status {status_code}, attempt {attempt + 1}/{max_retries})" ) await asyncio.sleep(retry_interval) return True return False # 如果不触发自动封禁,仅对429错误进行重试 if status_code == 429 and retry_enabled and attempt < max_retries: log.info( f"[{mode.upper()} RETRY] 429 rate limit encountered, retrying " f"(attempt {attempt + 1}/{max_retries})" ) await asyncio.sleep(retry_interval) return True # 其他错误不进行重试 return False # ==================== 重试配置获取 ==================== async def get_retry_config() -> Dict[str, Any]: """ 获取重试配置 Returns: 包含重试配置的字典 """ return { "retry_enabled": await get_retry_429_enabled(), "max_retries": await get_retry_429_max_retries(), "retry_interval": await get_retry_429_interval(), } # ==================== API调用结果记录 ==================== async def record_api_call_success( credential_manager: CredentialManager, credential_name: str, mode: str = "geminicli", model_key: Optional[str] = None ) -> None: """ 记录API调用成功 Args: credential_manager: 凭证管理器实例 credential_name: 凭证名称 mode: 模式(geminicli 或 antigravity) model_key: 模型键(用于模型级CD) """ if credential_manager and credential_name: await credential_manager.record_api_call_result( credential_name, True, mode=mode, model_key=model_key ) async def record_api_call_error( credential_manager: CredentialManager, credential_name: str, status_code: int, cooldown_until: Optional[float] = None, mode: str = "geminicli", model_key: Optional[str] = None ) -> None: """ 记录API调用错误 Args: credential_manager: 凭证管理器实例 credential_name: 凭证名称 status_code: HTTP状态码 cooldown_until: 冷却截止时间(Unix时间戳) mode: 模式(geminicli 或 antigravity) model_key: 模型键(用于模型级CD) """ if credential_manager and credential_name: await credential_manager.record_api_call_result( credential_name, False, status_code, cooldown_until=cooldown_until, mode=mode, model_key=model_key ) # ==================== 429错误处理 ==================== async def parse_and_log_cooldown( error_text: str, mode: str = "geminicli" ) -> Optional[float]: """ 解析并记录冷却时间 Args: error_text: 错误响应文本 mode: 模式(geminicli 或 antigravity) Returns: 冷却截止时间(Unix时间戳),如果解析失败则返回None """ try: error_data = json.loads(error_text) cooldown_until = parse_quota_reset_timestamp(error_data) if cooldown_until: log.info( f"[{mode.upper()}] 检测到quota冷却时间: " f"{datetime.fromtimestamp(cooldown_until, timezone.utc).isoformat()}" ) return cooldown_until except Exception as parse_err: log.debug(f"[{mode.upper()}] Failed to parse cooldown time: {parse_err}") return None # ==================== 流式响应收集 ==================== async def collect_streaming_response(stream_generator) -> Response: """ 将Gemini流式响应收集为一条完整的非流式响应 Args: stream_generator: 流式响应生成器,产生 "data: {json}" 格式的行或Response对象 Returns: Response: 合并后的完整响应对象 Example: >>> async for line in stream_generator: ... # line format: "data: {...}" or Response object >>> response = await collect_streaming_response(stream_generator) """ # 初始化响应结构 merged_response = { "response": { "candidates": [{ "content": { "parts": [], "role": "model" }, "finishReason": None, "safetyRatings": [], "citationMetadata": None }], "usageMetadata": { "promptTokenCount": 0, "candidatesTokenCount": 0, "totalTokenCount": 0 } } } collected_text = [] # 用于收集文本内容 collected_thought_text = [] # 用于收集思维链内容 collected_other_parts = [] # 用于收集其他类型的parts(图片、文件等) has_data = False line_count = 0 log.debug("[STREAM COLLECTOR] Starting to collect streaming response") try: async for line in stream_generator: line_count += 1 # 如果收到的是Response对象(错误),直接返回 if isinstance(line, Response): log.debug(f"[STREAM COLLECTOR] 收到错误Response,状态码: {line.status_code}") return line # 处理 bytes 类型 if isinstance(line, bytes): line_str = line.decode('utf-8', errors='ignore') log.debug(f"[STREAM COLLECTOR] Processing bytes line {line_count}: {line_str[:200] if line_str else 'empty'}") elif isinstance(line, str): line_str = line log.debug(f"[STREAM COLLECTOR] Processing line {line_count}: {line_str[:200] if line_str else 'empty'}") else: log.debug(f"[STREAM COLLECTOR] Skipping non-string/bytes line: {type(line)}") continue # 解析流式数据行 if not line_str.startswith("data: "): log.debug(f"[STREAM COLLECTOR] Skipping line without 'data: ' prefix: {line_str[:100]}") continue raw = line_str[6:].strip() if raw == "[DONE]": log.debug("[STREAM COLLECTOR] Received [DONE] marker") break try: log.debug(f"[STREAM COLLECTOR] Parsing JSON: {raw[:200]}") chunk = json.loads(raw) has_data = True log.debug(f"[STREAM COLLECTOR] Chunk keys: {chunk.keys() if isinstance(chunk, dict) else type(chunk)}") # 提取响应对象 response_obj = chunk.get("response", {}) if not response_obj: log.debug("[STREAM COLLECTOR] No 'response' key in chunk, trying direct access") response_obj = chunk # 尝试直接使用chunk candidates = response_obj.get("candidates", []) log.debug(f"[STREAM COLLECTOR] Found {len(candidates)} candidates") if not candidates: log.debug(f"[STREAM COLLECTOR] No candidates in chunk, chunk structure: {list(chunk.keys()) if isinstance(chunk, dict) else type(chunk)}") continue candidate = candidates[0] # 收集文本内容 content = candidate.get("content", {}) parts = content.get("parts", []) log.debug(f"[STREAM COLLECTOR] Processing {len(parts)} parts from candidate") for part in parts: if not isinstance(part, dict): continue # 处理文本内容 text = part.get("text", "") if text: # 区分普通文本和思维链 if part.get("thought", False): collected_thought_text.append(text) log.debug(f"[STREAM COLLECTOR] Collected thought text: {text[:100]}") else: collected_text.append(text) log.debug(f"[STREAM COLLECTOR] Collected regular text: {text[:100]}") # 处理非文本内容(图片、文件等) elif "inlineData" in part or "fileData" in part or "executableCode" in part or "codeExecutionResult" in part: collected_other_parts.append(part) log.debug(f"[STREAM COLLECTOR] Collected non-text part: {list(part.keys())}") # 收集其他信息(使用最后一个块的值) if candidate.get("finishReason"): merged_response["response"]["candidates"][0]["finishReason"] = candidate["finishReason"] if candidate.get("safetyRatings"): merged_response["response"]["candidates"][0]["safetyRatings"] = candidate["safetyRatings"] if candidate.get("citationMetadata"): merged_response["response"]["candidates"][0]["citationMetadata"] = candidate["citationMetadata"] # 更新使用元数据 usage = response_obj.get("usageMetadata", {}) if usage: merged_response["response"]["usageMetadata"].update(usage) except json.JSONDecodeError as e: log.debug(f"[STREAM COLLECTOR] Failed to parse JSON chunk: {e}") continue except Exception as e: log.debug(f"[STREAM COLLECTOR] Error processing chunk: {e}") continue except Exception as e: log.error(f"[STREAM COLLECTOR] Error collecting stream after {line_count} lines: {e}") return Response( content=json.dumps({"error": f"收集流式响应失败: {str(e)}"}), status_code=500, media_type="application/json" ) log.debug(f"[STREAM COLLECTOR] Finished iteration, has_data={has_data}, line_count={line_count}") # 如果没有收集到任何数据,返回错误 if not has_data: log.error(f"[STREAM COLLECTOR] No data collected from stream after {line_count} lines") return Response( content=json.dumps({"error": "No data collected from stream"}), status_code=500, media_type="application/json" ) # 组装最终的parts final_parts = [] # 先添加思维链内容(如果有) if collected_thought_text: final_parts.append({ "text": "".join(collected_thought_text), "thought": True }) # 再添加普通文本内容 if collected_text: final_parts.append({ "text": "".join(collected_text) }) # 添加其他类型的parts(图片、文件等) final_parts.extend(collected_other_parts) # 如果没有任何内容,添加空文本 if not final_parts: final_parts.append({"text": ""}) merged_response["response"]["candidates"][0]["content"]["parts"] = final_parts log.info(f"[STREAM COLLECTOR] Collected {len(collected_text)} text chunks, {len(collected_thought_text)} thought chunks, and {len(collected_other_parts)} other parts") # 去掉嵌套的 "response" 包装(Antigravity格式 -> 标准Gemini格式) if "response" in merged_response and "candidates" not in merged_response: log.debug(f"[STREAM COLLECTOR] 展开response包装") merged_response = merged_response["response"] # 返回纯JSON格式 return Response( content=json.dumps(merged_response, ensure_ascii=False).encode('utf-8'), status_code=200, headers={}, media_type="application/json" ) def parse_quota_reset_timestamp(error_response: dict) -> Optional[float]: """ 从Google API错误响应中提取quota重置时间戳 Args: error_response: Google API返回的错误响应字典 Returns: Unix时间戳(秒),如果无法解析则返回None 示例错误响应: { "error": { "code": 429, "message": "You have exhausted your capacity...", "status": "RESOURCE_EXHAUSTED", "details": [ { "@type": "type.googleapis.com/google.rpc.ErrorInfo", "reason": "QUOTA_EXHAUSTED", "metadata": { "quotaResetTimeStamp": "2025-11-30T14:57:24Z", "quotaResetDelay": "13h19m1.20964964s" } } ] } } """ try: details = error_response.get("error", {}).get("details", []) for detail in details: if detail.get("@type") == "type.googleapis.com/google.rpc.ErrorInfo": reset_timestamp_str = detail.get("metadata", {}).get("quotaResetTimeStamp") if reset_timestamp_str: if reset_timestamp_str.endswith("Z"): reset_timestamp_str = reset_timestamp_str.replace("Z", "+00:00") reset_dt = datetime.fromisoformat(reset_timestamp_str) if reset_dt.tzinfo is None: reset_dt = reset_dt.replace(tzinfo=timezone.utc) return reset_dt.astimezone(timezone.utc).timestamp() return None except Exception: return None def get_model_group(model_name: str) -> str: """ 获取模型组,用于 GCLI CD 机制。 Args: model_name: 模型名称 Returns: "pro" 或 "flash" 说明: - pro 组: gemini-2.5-pro, gemini-3-pro-preview 共享额度 - flash 组: gemini-2.5-flash 单独额度 """ # 判断模型组 if "flash" in model_name.lower(): return "flash" else: # pro 模型(包括 gemini-2.5-pro 和 gemini-3-pro-preview) return "pro"