""" GeminiCli API Client - Handles all communication with GeminiCli API. This module is used by both OpenAI compatibility layer and native Gemini endpoints. GeminiCli API 客户端 - 处理与 GeminiCli API 的所有通信 """ import sys from pathlib import Path # 添加项目根目录到Python路径(用于直接运行测试) if __name__ == "__main__": project_root = Path(__file__).resolve().parent.parent.parent if str(project_root) not in sys.path: sys.path.insert(0, str(project_root)) import asyncio import json from typing import Any, Dict, Optional from fastapi import Response from config import get_code_assist_endpoint, get_auto_ban_error_codes from src.api.utils import get_model_group from log import log from src.credential_manager import CredentialManager from src.httpx_client import stream_post_async, post_async # 导入共同的基础功能 from src.api.utils import ( handle_error_with_retry, get_retry_config, record_api_call_success, record_api_call_error, parse_and_log_cooldown, ) from src.utils import GEMINICLI_USER_AGENT # ==================== 全局凭证管理器 ==================== # 全局凭证管理器实例(单例模式) _credential_manager: Optional[CredentialManager] = None async def _get_credential_manager() -> CredentialManager: """ 获取全局凭证管理器实例 Returns: CredentialManager实例 """ global _credential_manager if not _credential_manager: _credential_manager = CredentialManager() await _credential_manager.initialize() return _credential_manager # ==================== 请求准备 ==================== async def prepare_request_headers_and_payload( payload: dict, credential_data: dict, target_url: str ): """ 从凭证数据准备请求头和最终payload Args: payload: 原始请求payload credential_data: 凭证数据字典 target_url: 目标URL Returns: 元组: (headers, final_payload, target_url) Raises: Exception: 如果凭证中缺少必要字段 """ token = credential_data.get("token") or credential_data.get("access_token", "") if not token: raise Exception("凭证中没有找到有效的访问令牌(token或access_token字段)") source_request = payload.get("request", {}) # 内部API使用Bearer Token和项目ID headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json", "User-Agent": GEMINICLI_USER_AGENT, } project_id = credential_data.get("project_id", "") if not project_id: raise Exception("项目ID不存在于凭证数据中") final_payload = { "model": payload.get("model"), "project": project_id, "request": source_request, } return headers, final_payload, target_url # ==================== 新的流式和非流式请求函数 ==================== async def stream_request( body: Dict[str, Any], native: bool = False, headers: Optional[Dict[str, str]] = None, ): """ 流式请求函数 Args: body: 请求体 native: 是否返回原生bytes流,False则返回str流 headers: 额外的请求头 Yields: Response对象(错误时)或 bytes流/str流(成功时) """ # 获取凭证管理器 credential_manager = await _get_credential_manager() model_name = body.get("model", "") model_group = get_model_group(model_name) # 1. 获取有效凭证 cred_result = await credential_manager.get_valid_credential( mode="geminicli", model_key=model_group ) if not cred_result: # 如果返回值是None,直接返回错误500 yield Response( content=json.dumps({"error": "当前无可用凭证"}), status_code=500, media_type="application/json" ) return current_file, credential_data = cred_result # 2. 构建URL和请求头 try: auth_headers, final_payload, target_url = await prepare_request_headers_and_payload( body, credential_data, f"{await get_code_assist_endpoint()}/v1internal:streamGenerateContent?alt=sse" ) # 合并自定义headers if headers: auth_headers.update(headers) except Exception as e: log.error(f"准备请求失败: {e}") yield Response( content=json.dumps({"error": f"准备请求失败: {str(e)}"}), status_code=500, media_type="application/json" ) return # 3. 调用stream_post_async进行请求 retry_config = await get_retry_config() max_retries = retry_config["max_retries"] retry_interval = retry_config["retry_interval"] DISABLE_ERROR_CODES = await get_auto_ban_error_codes() # 禁用凭证的错误码 last_error_response = None # 记录最后一次的错误响应 for attempt in range(max_retries + 1): success_recorded = False # 标记是否已记录成功 try: async for chunk in stream_post_async( url=target_url, body=final_payload, native=native, headers=auth_headers ): # 判断是否是Response对象 if isinstance(chunk, Response): status_code = chunk.status_code last_error_response = chunk # 记录最后一次错误 # 如果错误码是429或者不在禁用码当中,做好记录后进行重试 if status_code == 429 or status_code not in DISABLE_ERROR_CODES: # 解析错误响应内容 try: error_body = chunk.body.decode('utf-8') if isinstance(chunk.body, bytes) else str(chunk.body) log.warning(f"流式请求失败 (status={status_code}), 凭证: {current_file}, 响应: {error_body[:500]}") except Exception: log.warning(f"流式请求失败 (status={status_code}), 凭证: {current_file}") # 记录错误 cooldown_until = None if status_code == 429: # 尝试解析冷却时间 try: error_body = chunk.body.decode('utf-8') if isinstance(chunk.body, bytes) else str(chunk.body) cooldown_until = await parse_and_log_cooldown(error_body, mode="geminicli") except Exception: pass await record_api_call_error( credential_manager, current_file, status_code, cooldown_until, mode="geminicli", model_key=model_group ) # 检查是否应该重试 should_retry = await handle_error_with_retry( credential_manager, status_code, current_file, retry_config["retry_enabled"], attempt, max_retries, retry_interval, mode="geminicli" ) if should_retry and attempt < max_retries: # 重新获取凭证并重试 log.info(f"[STREAM] 重试请求 (attempt {attempt + 2}/{max_retries + 1})...") await asyncio.sleep(retry_interval) # 获取新凭证 cred_result = await credential_manager.get_valid_credential( mode="geminicli", model_key=model_group ) if not cred_result: log.error("[STREAM] 重试时无可用凭证") yield Response( content=json.dumps({"error": "当前无可用凭证"}), status_code=500, media_type="application/json" ) return current_file, credential_data = cred_result auth_headers, final_payload, target_url = await prepare_request_headers_and_payload( body, credential_data, f"{await get_code_assist_endpoint()}/v1internal:streamGenerateContent?alt=sse" ) if headers: auth_headers.update(headers) break # 跳出内层循环,重新请求 else: # 不重试,直接返回原始错误 log.error(f"[STREAM] 达到最大重试次数或不应重试,返回原始错误") yield chunk return else: # 错误码在禁用码当中,直接返回,无需重试 try: error_body = chunk.body.decode('utf-8') if isinstance(chunk.body, bytes) else str(chunk.body) log.error(f"流式请求失败,禁用错误码 (status={status_code}), 凭证: {current_file}, 响应: {error_body[:500]}") except Exception: log.error(f"流式请求失败,禁用错误码 (status={status_code}), 凭证: {current_file}") await record_api_call_error( credential_manager, current_file, status_code, None, mode="geminicli", model_key=model_group ) yield chunk return else: # 不是Response,说明是真流,直接yield返回 # 只在第一个chunk时记录成功 if not success_recorded: await record_api_call_success( credential_manager, current_file, mode="geminicli", model_key=model_group ) success_recorded = True yield chunk # 流式请求成功完成,退出重试循环 return except Exception as e: log.error(f"流式请求异常: {e}, 凭证: {current_file}") if attempt < max_retries: log.info(f"[STREAM] 异常后重试 (attempt {attempt + 2}/{max_retries + 1})...") await asyncio.sleep(retry_interval) continue else: # 所有重试都失败,返回最后一次的错误(如果有)或500错误 log.error(f"[STREAM] 所有重试均失败,最后异常: {e}") yield last_error_response async def non_stream_request( body: Dict[str, Any], headers: Optional[Dict[str, str]] = None, ) -> Response: """ 非流式请求函数 Args: body: 请求体 native: 保留参数以保持接口一致性(实际未使用) headers: 额外的请求头 Returns: Response对象 """ # 获取凭证管理器 credential_manager = await _get_credential_manager() model_name = body.get("model", "") model_group = get_model_group(model_name) # 1. 获取有效凭证 cred_result = await credential_manager.get_valid_credential( mode="geminicli", model_key=model_group ) if not cred_result: # 如果返回值是None,直接返回错误500 return Response( content=json.dumps({"error": "当前无可用凭证"}), status_code=500, media_type="application/json" ) current_file, credential_data = cred_result # 2. 构建URL和请求头 try: auth_headers, final_payload, target_url = await prepare_request_headers_and_payload( body, credential_data, f"{await get_code_assist_endpoint()}/v1internal:generateContent" ) # 合并自定义headers if headers: auth_headers.update(headers) except Exception as e: log.error(f"准备请求失败: {e}") return Response( content=json.dumps({"error": f"准备请求失败: {str(e)}"}), status_code=500, media_type="application/json" ) # 3. 调用post_async进行请求 retry_config = await get_retry_config() max_retries = retry_config["max_retries"] retry_interval = retry_config["retry_interval"] DISABLE_ERROR_CODES = await get_auto_ban_error_codes() # 禁用凭证的错误码 last_error_response = None # 记录最后一次的错误响应 for attempt in range(max_retries + 1): try: response = await post_async( url=target_url, json=final_payload, headers=auth_headers, timeout=300.0 ) status_code = response.status_code # 成功 if status_code == 200: await record_api_call_success( credential_manager, current_file, mode="geminicli", model_key=model_group ) # 创建响应头,移除压缩相关的header避免重复解压 response_headers = dict(response.headers) response_headers.pop('content-encoding', None) response_headers.pop('content-length', None) return Response( content=response.content, status_code=200, headers=response_headers ) # 失败 - 记录最后一次错误 # 创建响应头,移除压缩相关的header避免重复解压 error_headers = dict(response.headers) error_headers.pop('content-encoding', None) error_headers.pop('content-length', None) last_error_response = Response( content=response.content, status_code=status_code, headers=error_headers ) # 判断是否需要重试 if status_code == 429 or status_code not in DISABLE_ERROR_CODES: try: error_text = response.text log.warning(f"非流式请求失败 (status={status_code}), 凭证: {current_file}, 响应: {error_text[:500]}") except Exception: log.warning(f"非流式请求失败 (status={status_code}), 凭证: {current_file}") # 记录错误 cooldown_until = None if status_code == 429: # 尝试解析冷却时间 try: error_text = response.text cooldown_until = await parse_and_log_cooldown(error_text, mode="geminicli") except Exception: pass await record_api_call_error( credential_manager, current_file, status_code, cooldown_until, mode="geminicli", model_key=model_group ) # 检查是否应该重试 should_retry = await handle_error_with_retry( credential_manager, status_code, current_file, retry_config["retry_enabled"], attempt, max_retries, retry_interval, mode="geminicli" ) if should_retry and attempt < max_retries: # 重新获取凭证并重试 log.info(f"[NON-STREAM] 重试请求 (attempt {attempt + 2}/{max_retries + 1})...") await asyncio.sleep(retry_interval) # 获取新凭证 cred_result = await credential_manager.get_valid_credential( mode="geminicli", model_key=model_group ) if not cred_result: log.error("[NON-STREAM] 重试时无可用凭证") return Response( content=json.dumps({"error": "当前无可用凭证"}), status_code=500, media_type="application/json" ) current_file, credential_data = cred_result auth_headers, final_payload, target_url = await prepare_request_headers_and_payload( body, credential_data, f"{await get_code_assist_endpoint()}/v1internal:generateContent" ) if headers: auth_headers.update(headers) continue # 重试 else: # 不重试,直接返回原始错误 log.error(f"[NON-STREAM] 达到最大重试次数或不应重试,返回原始错误") return last_error_response else: # 错误码在禁用码当中,直接返回,无需重试 try: error_text = response.text log.error(f"非流式请求失败,禁用错误码 (status={status_code}), 凭证: {current_file}, 响应: {error_text[:500]}") except Exception: log.error(f"非流式请求失败,禁用错误码 (status={status_code}), 凭证: {current_file}") await record_api_call_error( credential_manager, current_file, status_code, None, mode="geminicli", model_key=model_group ) return last_error_response except Exception as e: log.error(f"非流式请求异常: {e}, 凭证: {current_file}") if attempt < max_retries: log.info(f"[NON-STREAM] 异常后重试 (attempt {attempt + 2}/{max_retries + 1})...") await asyncio.sleep(retry_interval) continue else: # 所有重试都失败,返回最后一次的错误(如果有)或500错误 log.error(f"[NON-STREAM] 所有重试均失败,最后异常: {e}") if last_error_response: return last_error_response else: return Response( content=json.dumps({"error": f"请求异常: {str(e)}"}), status_code=500, media_type="application/json" ) # 所有重试都失败,返回最后一次的原始错误 log.error("[NON-STREAM] 所有重试均失败") return last_error_response # ==================== 测试代码 ==================== if __name__ == "__main__": """ 测试代码:演示API返回的流式和非流式数据格式 运行方式: python src/api/geminicli.py """ print("=" * 80) print("GeminiCli API 测试") print("=" * 80) # 测试请求体 test_body = { "model": "gemini-2.5-flash", "request": { "contents": [ { "role": "user", "parts": [{"text": "Hello, tell me a joke in one sentence."}] } ] } } async def test_stream_request(): """测试流式请求""" print("\n" + "=" * 80) print("【测试1】流式请求 (stream_request with native=False)") print("=" * 80) print(f"请求体: {json.dumps(test_body, indent=2, ensure_ascii=False)}\n") print("流式响应数据 (每个chunk):") print("-" * 80) chunk_count = 0 async for chunk in stream_request(body=test_body, native=False): chunk_count += 1 if isinstance(chunk, Response): # 错误响应 print(f"\n❌ 错误响应:") print(f" 状态码: {chunk.status_code}") print(f" Content-Type: {chunk.headers.get('content-type', 'N/A')}") try: content = chunk.body.decode('utf-8') if isinstance(chunk.body, bytes) else str(chunk.body) print(f" 内容: {content}") except Exception as e: print(f" 内容解析失败: {e}") else: # 正常的流式数据块 (str类型) print(f"\nChunk #{chunk_count}:") print(f" 类型: {type(chunk).__name__}") print(f" 长度: {len(chunk) if hasattr(chunk, '__len__') else 'N/A'}") print(f" 内容预览: {repr(chunk[:200] if len(chunk) > 200 else chunk)}") # 如果是SSE格式,尝试解析 if isinstance(chunk, str) and chunk.startswith("data: "): try: data_line = chunk.strip() if data_line.startswith("data: "): json_str = data_line[6:] # 去掉 "data: " 前缀 json_data = json.loads(json_str) print(f" 解析后的JSON: {json.dumps(json_data, indent=4, ensure_ascii=False)}") except Exception as e: print(f" SSE解析尝试失败: {e}") print(f"\n总共收到 {chunk_count} 个chunk") async def test_non_stream_request(): """测试非流式请求""" print("\n" + "=" * 80) print("【测试2】非流式请求 (non_stream_request)") print("=" * 80) print(f"请求体: {json.dumps(test_body, indent=2, ensure_ascii=False)}\n") response = await non_stream_request(body=test_body) print("非流式响应数据:") print("-" * 80) print(f"状态码: {response.status_code}") print(f"Content-Type: {response.headers.get('content-type', 'N/A')}") print(f"\n响应头: {dict(response.headers)}\n") try: content = response.body.decode('utf-8') if isinstance(response.body, bytes) else str(response.body) print(f"响应内容 (原始):\n{content}\n") # 尝试解析JSON try: json_data = json.loads(content) print(f"响应内容 (格式化JSON):") print(json.dumps(json_data, indent=2, ensure_ascii=False)) except json.JSONDecodeError: print("(非JSON格式)") except Exception as e: print(f"内容解析失败: {e}") async def main(): """主测试函数""" try: # 测试流式请求 await test_stream_request() # 测试非流式请求 await test_non_stream_request() print("\n" + "=" * 80) print("测试完成") print("=" * 80) except Exception as e: print(f"\n❌ 测试过程中出现异常: {e}") import traceback traceback.print_exc() # 运行测试 asyncio.run(main())