| |
| """ |
| 处理 Gemini 原生 API (v2) 相关的路由。 |
| """ |
| import logging |
| from fastapi import APIRouter, Request, HTTPException, status, Depends, Path |
| from fastapi.responses import JSONResponse |
| from typing import Dict, Any, Optional |
|
|
| |
| from app.core.services.gemini import GeminiClient |
| from app.core.utils.response_wrapper import ResponseWrapper |
| from app.core.context.store import load_context, save_context, convert_openai_to_gemini_contents, convert_gemini_to_storage_format, load_context_as_gemini |
| from app.core.keys.manager import APIKeyManager |
| import httpx |
| |
| from app.core.dependencies import get_key_manager, get_http_client |
| |
| from app.core.utils.request_helpers import get_client_ip, get_current_timestamps |
| from app.core.processing.utils import check_rate_limits_and_update_counts, update_token_counts |
| from app import config as app_config |
| from app.core.database.utils import IS_MEMORY_DB |
| from app.config import ENABLE_CONTEXT_COMPLETION |
|
|
| |
| from app.api.middleware import verify_proxy_key |
|
|
| |
| from app.api.models import GeminiGenerateContentRequestV2, GeminiGenerateContentResponseV2 |
|
|
| logger = logging.getLogger('my_logger') |
|
|
| v2_router = APIRouter() |
|
|
async def _load_and_inject_context_v2(
    proxy_key: str,
    request_body: GeminiGenerateContentRequestV2,
    enable_context: bool
):
    """
    Put the stored conversation history in front of the request contents.

    Nothing is loaded when ``enable_context`` is false. Otherwise the history
    returned by ``load_context_as_gemini`` for ``proxy_key`` is concatenated
    with ``request_body.contents`` and the result is assigned back to
    ``request_body.contents``. The attribute is reassigned, not modified in
    place, so a reference to the previous contents object taken by the caller
    beforehand still points at the un-injected contents.

    Args:
        proxy_key: Key whose stored context is looked up; only its first
            eight characters appear in log messages.
        request_body: Request model whose ``contents`` may be replaced.
        enable_context: Whether context completion is enabled for this key.
    """
    if enable_context:
        history = await load_context_as_gemini(proxy_key)
        if not history:
            # An empty/None result covers both "nothing stored" and "load failed".
            logger.debug(f"Key {proxy_key[:8]}... 没有找到上下文或加载失败,跳过注入。")
            return

        merged_contents = history + request_body.contents
        request_body.contents = merged_contents
        logger.debug(f"为 Key {proxy_key[:8]}... 注入了 {len(history)} 条上下文消息。")
    else:
        logger.debug(f"Key {proxy_key[:8]}... 的上下文补全已禁用,跳过加载和注入。")
|
|
|
|
async def _save_context_after_v2_success(
    proxy_key: str,
    original_contents: list,
    gemini_response: Dict[str, Any],
    enable_context: bool
):
    """
    Store the newest user/model exchange after a successful v2 API call.

    The last item of ``original_contents`` is used as the user message and the
    ``content`` of the first candidate in ``gemini_response`` as the model
    reply. The pair is converted with ``convert_gemini_to_storage_format``,
    appended to whatever ``load_context`` returns for ``proxy_key`` and written
    back with ``save_context``.

    Nothing is stored, and no exception is raised by the extraction step, when
    context completion is disabled, when the response is not a dict, when it
    has no candidates, or when its first candidate carries no ``content``.

    Args:
        proxy_key: Key whose stored context is extended; only its first eight
            characters appear in log messages.
        original_contents: Request contents whose last element is treated as
            the user's message for this turn.
        gemini_response: Response returned by the Gemini client.
        enable_context: Whether context completion is enabled for this key.
    """
    if not enable_context:
        return

    # Guard against a non-dict response instead of letting `.get` raise.
    candidates = gemini_response.get('candidates') if isinstance(gemini_response, dict) else None
    if not isinstance(candidates, (list, tuple)) or not candidates:
        logger.debug(f"为 Key {proxy_key[:8]}... 跳过上下文存储:上下文补全未启用或响应无效。")
        return

    user_message_gemini = original_contents[-1] if original_contents else None

    # A candidate is not guaranteed to carry 'content' (presumably the case when
    # generation is stopped early, e.g. by a safety filter -- see the Gemini API
    # Candidate reference), so look it up defensively rather than indexing.
    first_candidate = candidates[0]
    model_response_gemini = first_candidate.get('content') if isinstance(first_candidate, dict) else None

    if not (user_message_gemini and model_response_gemini):
        logger.warning(f"为 Key {proxy_key[:8]}... 存储上下文失败:无法提取用户消息或模型响应。")
        return

    new_context_entry = convert_gemini_to_storage_format(user_message_gemini, model_response_gemini)
    if not new_context_entry:
        logger.warning(f"为 Key {proxy_key[:8]}... 转换 Gemini 请求/响应为存储格式失败,跳过上下文存储。")
        return

    # load_context may return None/empty when nothing has been stored yet.
    existing_context = await load_context(proxy_key)
    updated_context = (existing_context or []) + new_context_entry
    await save_context(proxy_key, updated_context)
    logger.debug(f"为 Key {proxy_key[:8]}... 存储了新的上下文回合。")
|
|
|
|
@v2_router.post("/models/{model}:generateContent", response_model=GeminiGenerateContentResponseV2)
async def generate_content_v2(
    request: Request,
    request_body: GeminiGenerateContentRequestV2,
    model: str = Path(..., description="要使用的 Gemini 模型名称"),
    auth_data: Dict[str, Any] = Depends(verify_proxy_key),
    key_manager: APIKeyManager = Depends(get_key_manager),
    http_client: httpx.AsyncClient = Depends(get_http_client),
):
    """
    Handle a native Gemini ``generateContent`` request (/v2).

    Steps, in order: optionally inject stored context into the request,
    enforce the per-model rate limits, forward the request to Gemini, update
    token counters, optionally store the new conversation turn, and return the
    upstream response as JSON.

    Args:
        request: Incoming request, used to determine the client IP.
        request_body: Parsed ``generateContent`` payload.
        model: Model name taken from the URL path.
        auth_data: Result of ``verify_proxy_key``; read for ``"key"`` and an
            optional ``"config"`` mapping.
        key_manager: Injected key manager (not referenced in this handler).
        http_client: Shared HTTP client handed to ``GeminiClient``.

    Returns:
        A ``JSONResponse`` wrapping the response returned by the Gemini client.

    Raises:
        HTTPException: 429 when ``check_rate_limits_and_update_counts`` reports
            the limit is reached; 500 for any other error raised while calling
            Gemini or updating token counts. An ``HTTPException`` raised inside
            that section is re-raised unchanged.
    """
    proxy_key = auth_data.get("key")
    # "config" may be missing or explicitly None; both mean "no per-key overrides".
    key_config = auth_data.get("config") or {}
    enable_context = key_config.get('enable_context_completion', ENABLE_CONTEXT_COMPLETION)

    logger.info(f"收到 /v2/models/{model}:generateContent 请求,使用 Key: {proxy_key[:8]}..., 上下文补全: {enable_context}")

    # NOTE(review): the proxy key itself is passed as the Gemini api_key and
    # key_manager is never consulted here -- confirm this is intended.
    client = GeminiClient(api_key=proxy_key, http_client=http_client)

    # Keep a reference to the client's own contents before history is prepended;
    # the injection step reassigns request_body.contents, so this stays untouched.
    original_contents = request_body.contents
    await _load_and_inject_context_v2(proxy_key, request_body, enable_context)

    client_ip = get_client_ip(request)
    _, today_date_str_pt = get_current_timestamps()

    limits = app_config.MODEL_LIMITS.get(model)
    if not check_rate_limits_and_update_counts(proxy_key, model, limits):
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail=f"API Key for model '{model}' has reached rate limits. Please try again later."
        )

    try:
        request_payload = request_body.model_dump(exclude_none=True)
        logger.debug(f"调用 Gemini API,模型: {model}, Payload: {request_payload}")

        gemini_response = await client.generate_content(model_name=model, request_payload=request_payload)
        logger.debug(f"收到 Gemini API 响应: {gemini_response}")

        # prompt_tokens stays None when the response carries no usage metadata.
        prompt_tokens = None
        if isinstance(gemini_response, dict) and 'usageMetadata' in gemini_response:
            prompt_tokens = gemini_response['usageMetadata'].get('promptTokenCount')
        update_token_counts(proxy_key, model, limits, prompt_tokens, client_ip, today_date_str_pt)

        # The upstream call has already succeeded at this point; a failure while
        # persisting context must not turn that success into a 500 for the client.
        try:
            await _save_context_after_v2_success(proxy_key, original_contents, gemini_response, enable_context)
        except Exception as ctx_err:
            logger.error(f"为 Key {proxy_key[:8]}... 保存上下文时发生错误,已忽略并继续返回响应: {ctx_err}", exc_info=True)

        # Returning a JSONResponse directly sends the upstream payload as-is.
        return JSONResponse(content=gemini_response)

    except HTTPException:
        # Already a well-formed HTTP error; propagate without rewrapping.
        raise
    except Exception as e:
        logger.error(f"处理 /v2/models/{model}:generateContent 请求时发生错误 (Key: {proxy_key[:8]}...): {e}", exc_info=True)
        # NOTE(review): the detail string exposes the raw exception text to the
        # client -- consider a generic message if that is a concern.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"处理请求时发生内部错误: {e}"
        ) from e
|
|