# -*- coding: utf-8 -*- """ 处理 Gemini 原生 API (v2) 相关的路由。 """ import logging # 导入日志模块 from fastapi import APIRouter, Request, HTTPException, status, Depends, Path # 导入 FastAPI 相关组件 from fastapi.responses import JSONResponse # 导入 JSONResponse from typing import Dict, Any, Optional # 导入类型提示 # 导入自定义模块 from app.core.services.gemini import GeminiClient # 导入 GeminiClient (新路径) from app.core.utils.response_wrapper import ResponseWrapper # 导入响应包装类 (新路径) from app.core.context.store import load_context, save_context, convert_openai_to_gemini_contents, convert_gemini_to_storage_format, load_context_as_gemini # 导入上下文管理函数和转换函数 (新路径) from app.core.keys.manager import APIKeyManager # 导入类型 (新路径) import httpx # 导入 httpx 用于类型提示 # 导入依赖注入函数 from app.core.dependencies import get_key_manager, get_http_client # 导入请求工具函数和处理工具函数 from app.core.utils.request_helpers import get_client_ip, get_current_timestamps # (新路径) from app.core.processing.utils import check_rate_limits_and_update_counts, update_token_counts # (新路径) from app import config as app_config # 导入应用配置 from app.core.database.utils import IS_MEMORY_DB # 导入数据库模式标志 (新路径) from app.config import ENABLE_CONTEXT_COMPLETION # 导入全局上下文补全配置 # 导入认证依赖项 from app.api.middleware import verify_proxy_key # 导入代理 Key 验证依赖 # 导入 Pydantic 模型 from app.api.models import GeminiGenerateContentRequestV2, GeminiGenerateContentResponseV2 # 导入 v2 请求和响应模型 logger = logging.getLogger('my_logger') # 获取日志记录器实例 v2_router = APIRouter() # 创建 APIRouter 实例 async def _load_and_inject_context_v2( proxy_key: str, request_body: GeminiGenerateContentRequestV2, enable_context: bool ): """ 加载历史上下文并注入到请求体中。 """ if not enable_context: logger.debug(f"Key {proxy_key[:8]}... 的上下文补全已禁用,跳过加载和注入。") return # 加载历史上下文并转换为 Gemini 格式 gemini_context = await load_context_as_gemini(proxy_key) # 加载并转换历史上下文 if gemini_context: # 如果存在上下文历史 # 将上下文注入到当前请求内容的开头 request_body.contents = gemini_context + request_body.contents # 注入上下文 logger.debug(f"为 Key {proxy_key[:8]}... 注入了 {len(gemini_context)} 条上下文消息。") # 记录注入的上下文数量 else: logger.debug(f"Key {proxy_key[:8]}... 没有找到上下文或加载失败,跳过注入。") # 没有找到上下文或加载失败 async def _save_context_after_v2_success( proxy_key: str, original_contents: list, gemini_response: Dict[str, Any], enable_context: bool ): """ 在 v2 API 调用成功后保存上下文。 """ if not enable_context or not gemini_response or not gemini_response.get('candidates'): if enable_context: logger.debug(f"为 Key {proxy_key[:8]}... 跳过上下文存储:上下文补全未启用或响应无效。") return # 提取用户请求内容和模型响应内容 # 假设原始请求内容的最后一条是用户消息 user_message_gemini = original_contents[-1] if original_contents else None # 获取用户消息 # 假设响应的第一个候选的 content 是模型响应 model_response_gemini = gemini_response['candidates'][0]['content'] if gemini_response.get('candidates') else None # 获取模型响应 if user_message_gemini and model_response_gemini: # 如果用户消息和模型响应都存在 # 将 Gemini 格式的用户请求和模型响应转换为存储格式 (OpenAI 格式) new_context_entry = convert_gemini_to_storage_format(user_message_gemini, model_response_gemini) # 转换格式 if new_context_entry: # 如果转换成功 # 加载现有上下文,添加新条目,然后保存 existing_context = await load_context(proxy_key) # 重新加载现有上下文 updated_context = (existing_context or []) + new_context_entry # 合并新旧上下文 await save_context(proxy_key, updated_context) # 保存更新后的上下文 logger.debug(f"为 Key {proxy_key[:8]}... 存储了新的上下文回合。") # 存储了新的上下文回合 else: logger.warning(f"为 Key {proxy_key[:8]}... 转换 Gemini 请求/响应为存储格式失败,跳过上下文存储。") # 转换 Gemini 请求/响应为存储格式失败 else: logger.warning(f"为 Key {proxy_key[:8]}... 存储上下文失败:无法提取用户消息或模型响应。") # 存储上下文失败:无法提取用户消息或模型响应 @v2_router.post("/models/{model}:generateContent", response_model=GeminiGenerateContentResponseV2) # 定义 POST 请求端点,指定响应模型 async def generate_content_v2( request: Request, # FastAPI 请求对象 request_body: GeminiGenerateContentRequestV2, # 请求体,使用 Pydantic 模型进行验证 model: str = Path(..., description="要使用的 Gemini 模型名称"), # 从路径参数获取模型名称 auth_data: Dict[str, Any] = Depends(verify_proxy_key), # 依赖项:验证代理 Key 并获取 Key 和配置 key_manager: APIKeyManager = Depends(get_key_manager), # 注入 Key Manager http_client: httpx.AsyncClient = Depends(get_http_client), # 注入 HTTP Client ): """ 处理 Gemini 原生 API 的 generateContent 请求 (/v2)。 """ proxy_key = auth_data.get("key") # 从认证数据中获取代理 Key key_config = auth_data.get("config", {}) # 从认证数据中获取 Key 配置,默认为空字典 enable_context = key_config.get('enable_context_completion', ENABLE_CONTEXT_COMPLETION) # 获取 Key 的上下文补全配置,如果 Key 配置中没有则使用全局配置 logger.info(f"收到 /v2/models/{model}:generateContent 请求,使用 Key: {proxy_key[:8]}..., 上下文补全: {enable_context}") # 记录收到的请求信息 # 初始化 Gemini 客户端,传入共享的 http_client client = GeminiClient(api_key=proxy_key, http_client=http_client) # 使用代理 Key 初始化 Gemini 客户端 # 获取并注入上下文 (如果启用) - 委托给辅助函数 original_contents = request_body.contents # 保存原始请求内容 await _load_and_inject_context_v2(proxy_key, request_body, enable_context) # --- 获取客户端 IP 和时间戳 --- client_ip = get_client_ip(request) # 获取客户端 IP _, today_date_str_pt = get_current_timestamps() # 获取 PT 日期字符串 # --- 速率限制检查 --- limits = app_config.MODEL_LIMITS.get(model) # 获取模型限制 if not check_rate_limits_and_update_counts(proxy_key, model, limits): # 检查并更新速率限制计数 # 如果达到限制,check_rate_limits_and_update_counts 会记录警告 raise HTTPException( status_code=status.HTTP_429_TOO_MANY_REQUESTS, # 429 状态码 detail=f"API Key for model '{model}' has reached rate limits. Please try again later." # 错误详情 ) # 调用 Gemini API try: # 将 Pydantic 模型转换为字典,以便传递给 GeminiClient request_payload = request_body.model_dump(exclude_none=True) # 转换为字典,排除 None 值 logger.debug(f"调用 Gemini API,模型: {model}, Payload: {request_payload}") # 调用 Gemini API # 调用 GeminiClient 的 generate_content 方法 gemini_response = await client.generate_content(model_name=model, request_payload=request_payload) # 调用 Gemini API logger.debug(f"收到 Gemini API 响应: {gemini_response}") # 收到 Gemini API 响应 # --- 更新 Token 计数 --- prompt_tokens = None # 初始化 prompt_tokens if isinstance(gemini_response, dict) and 'usageMetadata' in gemini_response: # 检查响应是否为字典且包含 usageMetadata prompt_tokens = gemini_response['usageMetadata'].get('promptTokenCount') # 获取 promptTokenCount update_token_counts(proxy_key, model, limits, prompt_tokens, client_ip, today_date_str_pt) # 更新 token 计数 # 存储上下文 (如果启用) - 委托给辅助函数 await _save_context_after_v2_success(proxy_key, original_contents, gemini_response, enable_context) # 返回 Gemini 响应 # 直接返回原始 Gemini 响应,不进行 OpenAI 包装 return JSONResponse(content=gemini_response) # 返回 JSON 响应 except HTTPException as e: # 捕获 FastAPI HTTPException 并重新抛出 raise e except Exception as e: # 捕获其他所有异常,记录并返回 500 错误 logger.error(f"处理 /v2/models/{model}:generateContent 请求时发生错误 (Key: {proxy_key[:8]}...): {e}", exc_info=True) # 处理 /v2/models/{model}:generateContent 请求时发生错误 raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, # 内部服务器错误状态码 detail=f"处理请求时发生内部错误: {e}" # 错误详情 )