| |
| """ |
| 主请求处理程序,包含 process_request 和 _attempt_api_call 逻辑。 |
| 此模块负责协调处理来自 API 端点的请求,包括: |
| - 解析请求数据 |
| - 加载和处理上下文 |
| - 选择合适的 API Key |
| - 调用 Gemini API (流式或非流式) |
| - 处理 API 响应和错误 |
| - 更新速率限制和 Token 计数 |
| - 创建缓存条目 |
| - 保存上下文 |
| """ |
| import asyncio |
| import json |
| import logging |
| import time |
| import pytz |
| import uuid |
| from datetime import datetime |
| import random |
| import hashlib |
| from typing import Literal, List, Tuple, Dict, Any, Optional, Union |
| from fastapi import HTTPException, Request, status, Depends |
| from fastapi.responses import StreamingResponse |
| from collections import Counter, defaultdict |
| import httpx |
| from sqlalchemy.orm import Session |
| from sqlalchemy.ext.asyncio import AsyncSession |
| from aiosqlite import Connection |
|
|
| |
| from app.api.models import ChatCompletionRequest, ChatCompletionResponse, ResponseMessage |
|
|
| |
| from app.core.services.gemini import GeminiClient |
| from app.core.utils.response_wrapper import ResponseWrapper |
| from app.core.context import store as context_store |
| from app.core.database import utils as db_utils |
| from app.core.context.converter import convert_messages |
| from app.core.keys.manager import APIKeyManager |
| from app.core.cache.manager import CacheManager |
| from app.core.processing.error_handler import _handle_api_call_exception |
| from app.core.utils.request_helpers import get_client_ip, get_current_timestamps |
| from app.core.security.rate_limit import protect_from_abuse |
| |
| from app.core.dependencies import get_db_session, get_key_manager, get_cache_manager, get_http_client |
|
|
| |
| from app.core.processing.utils import ( |
| estimate_token_count, truncate_context, check_rate_limits_and_update_counts, |
| update_token_counts, save_context_after_success |
| ) |
|
|
| |
| from app.core.processing.stream_handler import generate_stream_response |
|
|
| |
| from app import config |
| from app.config import ( |
| DISABLE_SAFETY_FILTERING, |
| MAX_REQUESTS_PER_MINUTE, |
| ENABLE_NATIVE_CACHING, |
| ENABLE_STICKY_SESSION, |
| STREAM_SAVE_REPLY, |
| MAX_REQUESTS_PER_DAY_PER_IP, |
| safety_settings, |
| safety_settings_g2 |
| ) |
|
|
| |
| from app.core.tracking import ( |
| usage_data, usage_lock, RPM_WINDOW_SECONDS, TPM_WINDOW_SECONDS, |
| ip_daily_input_token_counts, ip_input_token_counts_lock, |
| increment_cache_hit_count, increment_cache_miss_count, add_tokens_saved, |
| track_cache_hit, track_cache_miss |
| ) |
|
|
| |
| |
|
|
# Module logger. The explicit name "my_logger" (rather than __name__) is kept
# as-is; it presumably matches a logger configured elsewhere - confirm before renaming.
logger: logging.Logger = logging.getLogger("my_logger")
|
|
| |
async def _create_native_cache_after_success(
    *,
    current_api_key: str,
    key_manager: APIKeyManager,
    cache_manager_instance: CacheManager,
    content_to_cache_on_success: Dict[str, Any],
    user_id: Optional[str],
    db: Optional[AsyncSession],
) -> None:
    """Best-effort creation of a native cache entry after a successful non-stream call.

    Skips creation (logging a warning) when there is no usable DB session or
    user id, or when the API key's database id cannot be resolved. Never raises:
    any error from the key manager or cache manager is logged, so a successful
    API response is never lost because caching failed.
    """
    key_prefix = current_api_key[:8]
    logger.debug(f"非流式请求成功且是缓存未命中,尝试创建新缓存 (Key: {key_prefix}...)")
    try:
        if not db or user_id is None:
            logger.warning(f"db session ({'有效' if db else '无效'}) 或 user_id ({user_id if user_id is not None else '无效'}) 无效,跳过缓存创建。")
            return

        api_key_id = await key_manager.get_key_id(current_api_key)
        if api_key_id is None:
            logger.warning(f"无法获取 Key {key_prefix}... 的 ID,跳过缓存创建。")
            return

        new_cache_id = await cache_manager_instance.create_cache(
            db=db,
            user_id=user_id,
            api_key_id=api_key_id,
            content=content_to_cache_on_success,
            ttl=3600,  # presumably seconds - confirm against CacheManager.create_cache
        )
        if new_cache_id:
            logger.info(f"新缓存创建成功: {new_cache_id} (Key: {key_prefix}...)")
        else:
            logger.warning(f"创建新缓存失败 (Key: {key_prefix}...)")
    except Exception as cache_create_err:
        logger.error(f"创建缓存时发生异常 (Key: {key_prefix}...): {cache_create_err}", exc_info=True)


async def _attempt_api_call(
    chat_request: ChatCompletionRequest,
    contents: List[Dict[str, Any]],
    system_instruction: Optional[str],
    current_api_key: str,
    http_client: httpx.AsyncClient,
    key_manager: APIKeyManager,
    model_name: str,
    limits: Optional[Dict[str, Any]],
    client_ip: str,
    today_date_str_pt: str,
    enable_native_caching: bool,
    cache_manager_instance: CacheManager,
    request_id: Optional[str] = None,
    cached_content_id_to_use: Optional[str] = None,
    content_to_cache_on_success: Optional[Dict[str, Any]] = None,
    user_id: Optional[str] = None,
    db: Optional[AsyncSession] = None
) -> Tuple[Optional[Union[StreamingResponse, ChatCompletionResponse]], Optional[Dict[str, Any]], bool]:
    """Make a single Gemini API call with one API key.

    Streaming requests return a ``StreamingResponse`` wrapping
    ``generate_stream_response`` immediately. Non-streaming requests await
    ``GeminiClient.complete_chat``, convert the result into a
    ``ChatCompletionResponse``, and then do local bookkeeping:
    - update ``last_used_timestamp`` in ``usage_data``;
    - update token counts;
    - optionally create a native cache entry.

    Args:
        chat_request: Parsed request body. ``stream`` selects the mode and ``model``
            influences the safety settings.
        contents: Gemini-format contents to send (already truncated by the caller).
        system_instruction: Optional system instruction text.
        current_api_key: The Gemini API key used for this attempt.
        http_client: Shared HTTP client passed to ``GeminiClient``.
        key_manager: Key manager, passed to the error handler and the stream generator.
        model_name: Canonical model name, used for usage and token bookkeeping.
        limits: Model limits passed to token-count updates.
        client_ip: Client IP, used for per-IP token accounting.
        today_date_str_pt: Date string passed to token-count updates.
        enable_native_caching: Whether a native cache entry may be created on success.
        cache_manager_instance: Cache manager used to create cache entries.
        request_id: Request id forwarded to the error handler for logging.
        cached_content_id_to_use: Existing cached-content id to send, if any.
        content_to_cache_on_success: Content to cache after success (set on a cache miss).
        user_id: User id used for cache creation.
        db: Database session used for cache creation.

    Returns:
        ``(response, error_info, needs_retry)``:
        - ``response`` is the FastAPI response on success, else ``None``.
        - ``error_info`` is the error dict from ``_handle_api_call_exception`` on
          failure, else ``None``.
        - ``needs_retry`` tells the caller whether to try another key.
    """
    # Read before the ``try``: the exception handler needs ``is_stream`` even when
    # the failure happens before the API call itself (e.g. while building the
    # client). Reading it inside the ``try`` made such failures raise
    # UnboundLocalError from within the handler.
    is_stream = chat_request.stream

    try:
        key_prefix = current_api_key[:8]
        use_g2_safety = config.DISABLE_SAFETY_FILTERING or 'gemini-2.0-flash-exp' in chat_request.model
        current_safety_settings = safety_settings_g2 if use_g2_safety else safety_settings

        gemini_client_instance = GeminiClient(current_api_key, http_client)

        if is_stream:
            response_id = f"chatcmpl-{int(time.time() * 1000)}"
            # Post-stream bookkeeping (token counts, cache creation) is presumably
            # done by generate_stream_response, which receives everything it needs.
            response = StreamingResponse(generate_stream_response(
                gemini_client_instance=gemini_client_instance,
                chat_request=chat_request,
                contents=contents,
                safety_settings=current_safety_settings,
                system_instruction=system_instruction,
                cached_content_id=cached_content_id_to_use,
                response_id=response_id,
                enable_native_caching=enable_native_caching,
                cache_manager_instance=cache_manager_instance,
                content_to_cache_on_success=content_to_cache_on_success,
                db_for_cache=db,
                user_id_for_mapping=user_id,
                key_manager=key_manager,
                selected_key=current_api_key,
                model_name=model_name,
                limits=limits,
                client_ip=client_ip,
                today_date_str_pt=today_date_str_pt,
            ), media_type="text/event-stream")
            logger.info(f"流式响应已启动 (Key: {current_api_key[:8]}, ID: {response_id})")
            return response, None, False

        response_obj = await gemini_client_instance.complete_chat(
            request=chat_request,
            contents=contents,
            safety_settings=current_safety_settings,
            system_instruction=system_instruction,
            cached_content_id=cached_content_id_to_use
        )
        if not isinstance(response_obj, ResponseWrapper):
            logger.error(f"complete_chat 返回了意外的类型: {type(response_obj)}")
            raise TypeError("API 调用返回了非预期的响应类型")

        response = ChatCompletionResponse(
            id=f"chatcmpl-{int(time.time() * 1000)}",
            object="chat.completion",
            created=int(time.time()),
            model=chat_request.model,
            choices=[{
                "index": 0,
                "message": ResponseMessage(
                    role="assistant",
                    content=response_obj.text,
                    tool_calls=response_obj.tool_calls
                ),
                "finish_reason": response_obj.finish_reason
            }],
            usage={
                "prompt_tokens": response_obj.prompt_token_count or 0,
                "completion_tokens": response_obj.candidates_token_count or 0,
                "total_tokens": response_obj.total_token_count or 0
            }
        )
    except Exception as api_exc:
        error_info, needs_retry_from_exception = await _handle_api_call_exception(
            exc=api_exc,
            current_api_key=current_api_key,
            key_manager=key_manager,
            is_stream=is_stream,
            request_id=request_id
        )
        return None, error_info, needs_retry_from_exception

    # The upstream call has succeeded at this point. Local bookkeeping failures
    # are logged here instead of being routed to _handle_api_call_exception.
    # Routing them there would discard a valid reply and make the caller retry
    # (a duplicate upstream call) with another key.
    try:
        with usage_lock:
            key_usage = usage_data.setdefault(current_api_key, defaultdict(lambda: defaultdict(int)))[model_name]
            key_usage['last_used_timestamp'] = time.time()
        logger.debug(f"非流式请求成功,更新 Key {key_prefix}... ({model_name}) 的 last_used_timestamp")

        usage = response.usage
        if usage:
            # ``usage`` was passed as a dict above; depending on the model
            # definition it may stay a dict or be coerced to an object.
            prompt_tokens = usage.get("prompt_tokens", 0) if isinstance(usage, dict) else usage.prompt_tokens
            await update_token_counts(current_api_key, model_name, limits, prompt_tokens, client_ip, today_date_str_pt)
            logger.debug(f"非流式请求成功,更新 Key {key_prefix}... ({model_name}) 的 Token 计数 (占位符)。")
        else:
            logger.warning(f"非流式响应成功但未找到 usage metadata (Key: {key_prefix}...). Token counts not updated.")
    except Exception as bookkeeping_err:
        logger.error(f"非流式请求成功,但更新用量/Token 计数时发生异常 (Key: {key_prefix}...): {bookkeeping_err}", exc_info=True)

    # A cache entry is only created on a cache miss (the caller sets
    # content_to_cache_on_success only in that case).
    if enable_native_caching and content_to_cache_on_success:
        await _create_native_cache_after_success(
            current_api_key=current_api_key,
            key_manager=key_manager,
            cache_manager_instance=cache_manager_instance,
            content_to_cache_on_success=content_to_cache_on_success,
            user_id=user_id,
            db=db,
        )

    return response, None, False
|
|
|
|
| |
def _normalize_model_name(model_name: str, request_id: str) -> str:
    """Return the ``config.MODEL_LIMITS`` key matching *model_name* case-insensitively.

    The lower-cased name is preferred when it is itself a key. Otherwise the first
    key whose lower-cased form matches is returned.

    Raises:
        HTTPException: 400 if no supported model matches.
    """
    original_model_name_for_error = model_name
    supported_models_keys = config.MODEL_LIMITS.keys()
    normalized_model_name = model_name.lower()

    if normalized_model_name in supported_models_keys:
        if model_name != normalized_model_name:
            logger.info(f"请求 {request_id}: 模型名称 '{original_model_name_for_error}' 规范化为 '{normalized_model_name}'。")
        else:
            logger.info(f"请求 {request_id}: 模型名称 '{model_name}' 有效且大小写规范。")
        return normalized_model_name

    for m_key in supported_models_keys:
        if m_key.lower() == normalized_model_name:
            logger.info(f"请求 {request_id}: 模型名称 '{original_model_name_for_error}' 通过大小写不敏感匹配规范化为 '{m_key}'。")
            return m_key

    logger.error(f"请求 {request_id}: 不支持的模型名称 '{original_model_name_for_error}' (规范化尝试: '{normalized_model_name}')。支持的模型: {list(supported_models_keys)}")
    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail=f"不支持的模型: '{original_model_name_for_error}'. 支持的模型包括: {', '.join(supported_models_keys)}."
    )


async def _load_initial_contents(
    user_id: Optional[str], enable_context: bool, request_id: str
) -> List[Dict[str, Any]]:
    """Load stored conversation history for *user_id* when context completion is on.

    Returns an empty list when context is disabled, when no user id is given, or
    when loading fails (the failure is logged).
    """
    if not enable_context:
        logger.debug(f"请求 {request_id}: 传统上下文已禁用或未提供用户 ID,跳过上下文加载。")
        return []
    if not user_id:
        logger.warning(f"请求 {request_id}: 传统上下文已启用但未提供用户 ID,跳过上下文加载。")
        return []
    try:
        initial_contents = await context_store.load_context(user_id) or []
        logger.debug(f"请求 {request_id}: 传统上下文已启用 (用户: {user_id}), 加载了 {len(initial_contents)} 条历史。")
        return initial_contents
    except Exception as context_load_err:
        logger.error(f"请求 {request_id}: 加载上下文失败 (用户: {user_id}): {context_load_err}", exc_info=True)
        return []


def _convert_request_messages(
    messages: List[Any], request_id: str
) -> Tuple[List[Dict[str, Any]], Optional[str]]:
    """Convert request messages to Gemini contents and extract the system instruction text.

    Raises:
        HTTPException: 400 with the converter's error details when
            ``convert_messages`` reports validation errors (it returns a list of
            them). Also 400 with a generic detail for any other error.
    """
    try:
        conversion_result = convert_messages(messages)
        if isinstance(conversion_result, list):
            error_detail = "; ".join(str(err) for err in conversion_result)
            logger.error(f"请求 {request_id}: 转换用户消息失败: {error_detail}")
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"消息格式错误: {error_detail}")

        gemini_contents, system_instruction_dict = conversion_result
        system_instruction_text = None
        if system_instruction_dict:
            # Tolerate a missing/empty "parts" list instead of raising IndexError.
            parts = system_instruction_dict.get("parts") or [{}]
            system_instruction_text = parts[0].get("text")
    except HTTPException:
        # Propagate the specific 400 raised above. Previously the generic handler
        # below caught it and replaced its detail with "处理消息时出错。".
        raise
    except Exception as e:
        logger.error(f"请求 {request_id}: 转换消息时发生意外错误: {e}", exc_info=True)
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="处理消息时出错。") from e
    return gemini_contents, system_instruction_text


async def _lookup_native_cache(
    *,
    chat_request: ChatCompletionRequest,
    cache_manager_instance: CacheManager,
    db: AsyncSession,
    enable_native_caching: bool,
    estimated_input_tokens: Any,
    request_id: str,
) -> Tuple[Optional[str], Optional[Dict[str, Any]]]:
    """Look up a native cache entry for this request.

    Returns ``(cached_content_id_to_use, content_to_cache_on_success)``. On a hit
    the first item is the cache id. On a miss the second item is the content to
    cache once the call succeeds. Lookup errors are logged; whatever was
    determined before the error is still returned.
    """
    user_id = chat_request.user_id
    if not enable_native_caching:
        return None, None
    if not user_id:
        logger.warning(f"请求 {request_id}: 原生缓存已启用但未提供 user_id,无法进行缓存查找或创建。")
        return None, None

    cached_content_id_to_use: Optional[str] = None
    content_to_cache_on_success: Optional[Dict[str, Any]] = None
    try:
        cached_content_id_to_use = await cache_manager_instance.find_cache(
            db=db,
            user_id=user_id,
            messages=chat_request.messages
        )
        if cached_content_id_to_use:
            logger.info(f"请求 {request_id}: 缓存命中 (用户: {user_id}, 缓存 ID: {cached_content_id_to_use})")
            track_cache_hit(request_id, cached_content_id_to_use, estimated_input_tokens)
        else:
            content_to_cache_on_success = {"messages": [msg.model_dump() for msg in chat_request.messages], "model": chat_request.model}
            logger.debug(f"请求 {request_id}: 缓存未命中 (用户: {user_id}), 将在成功后创建缓存。")
            track_cache_miss(request_id, cache_manager_instance._calculate_hash(content_to_cache_on_success))
    except Exception as cache_find_err:
        logger.error(f"请求 {request_id}: 查找缓存时发生异常 (用户: {user_id}): {cache_find_err}", exc_info=True)
    return cached_content_id_to_use, content_to_cache_on_success


async def process_request(
    chat_request: ChatCompletionRequest,
    http_request: Request,
    request_type: Literal['stream', 'non-stream'],
    auth_data: Dict[str, Any],
    key_manager: APIKeyManager = Depends(get_key_manager),
    http_client: httpx.AsyncClient = Depends(get_http_client),
    cache_manager_instance: CacheManager = Depends(get_cache_manager),
    db: AsyncSession = Depends(get_db_session)
):
    """Core handler for chat-completion requests.

    Steps:
    1. IP abuse check.
    2. Model validation/normalization.
    3. Legacy context loading.
    4. Message conversion.
    5. Native cache lookup.
    6. Key selection with dynamic truncation and retries across keys.
    7. User/key association and (non-stream) context saving on success.

    Args:
        chat_request: Request body.
        http_request: FastAPI request object.
        request_type: ``'stream'`` or ``'non-stream'``.
        auth_data: Auth data with ``'key'`` (proxy key) and ``'config'``
            (per-key configuration).
        key_manager: Injected API key manager.
        http_client: Injected HTTP client.
        cache_manager_instance: Injected cache manager.
        db: Injected database session.

    Returns:
        ``StreamingResponse`` or ``ChatCompletionResponse`` on success.

    Raises:
        HTTPException: On abuse-limit violations, unsupported models, malformed
            messages, or when every API attempt fails.
    """
    proxy_key = auth_data.get("key")
    # Treat an explicit ``None`` config the same as a missing one.
    key_config = auth_data.get("config") or {}
    model_name = chat_request.model
    client_ip = get_client_ip(http_request)
    _, today_date_str_pt = get_current_timestamps()
    request_id = f"req_{uuid.uuid4().hex[:8]}"
    proxy_key_prefix = (proxy_key or "")[:8]
    logger.info(f"开始处理请求 {request_id} (类型: {request_type}, 模型: {model_name}, Key: {proxy_key_prefix}...)")

    # 1. Per-IP abuse protection.
    try:
        await protect_from_abuse(
            http_request,
            config.MAX_REQUESTS_PER_MINUTE,
            config.MAX_REQUESTS_PER_DAY_PER_IP
        )
        logger.debug(f"请求 {request_id}: IP {client_ip} 通过滥用检查。")
    except HTTPException as ip_limit_exc:
        logger.warning(f"请求 {request_id}: IP {client_ip} 未通过滥用检查: {ip_limit_exc.detail}")
        raise

    # 2. Model validation and limits lookup.
    original_model_name_for_error = model_name
    model_name = _normalize_model_name(model_name, request_id)
    limits = config.MODEL_LIMITS.get(model_name)
    if not limits:
        logger.critical(f"请求 {request_id}: 严重错误!模型 '{model_name}' 通过了名称校验,但在 MODEL_LIMITS 中未找到其限制配置。原始请求模型: '{original_model_name_for_error}'")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"模型 '{model_name}' 的内部配置错误。请联系管理员。")

    # 3. Native caching and legacy context completion are mutually exclusive.
    enable_native_caching = config.ENABLE_NATIVE_CACHING
    enable_context = key_config.get('enable_context_completion', config.ENABLE_CONTEXT_COMPLETION)
    if enable_native_caching:
        enable_context = False
        logger.info(f"请求 {request_id}: 原生缓存已启用,传统上下文补全已禁用。")

    user_id = chat_request.user_id
    initial_contents = await _load_initial_contents(user_id, enable_context, request_id)
    gemini_contents, system_instruction_text = _convert_request_messages(chat_request.messages, request_id)

    # The merged contents do not change between attempts, so estimate once.
    merged_contents = initial_contents + gemini_contents
    estimated_input_tokens = await estimate_token_count(merged_contents)
    logger.debug(f"请求 {request_id}: 估算本次请求输入 Token 数: {estimated_input_tokens}")

    cached_content_id_to_use, content_to_cache_on_success = await _lookup_native_cache(
        chat_request=chat_request,
        cache_manager_instance=cache_manager_instance,
        db=db,
        enable_native_caching=enable_native_caching,
        estimated_input_tokens=estimated_input_tokens,
        request_id=request_id,
    )

    # 4. Try keys until one succeeds, a non-retryable error occurs, or attempts run out.
    max_attempts = key_manager.get_active_keys_count() + 1
    last_error_info: Optional[Dict[str, Any]] = None

    # NOTE(review): ``tried_keys_for_request`` lives on the injected key manager;
    # if that is a shared instance, concurrent requests share (and clear) the same
    # set. Confirm whether this should be per-request state.
    key_manager.tried_keys_for_request.clear()
    logger.debug(f"请求 {request_id}: 重置已尝试 Key 列表。")

    for attempt_count in range(1, max_attempts + 1):
        logger.info(f"请求 {request_id}: 尝试 API 调用 (尝试 {attempt_count}/{max_attempts})")

        selected_key, available_input_tokens = await key_manager.select_best_key(
            model_name=model_name,
            model_limits=limits,
            estimated_input_tokens=estimated_input_tokens,
            user_id=user_id,
            enable_sticky_session=config.ENABLE_STICKY_SESSION,
            request_id=request_id,
            cached_content_id=cached_content_id_to_use,
            db=db
        )

        if not selected_key:
            logger.warning(f"请求 {request_id}: 第 {attempt_count} 次尝试未找到可用 Key。")
            last_error_info = {"message": "所有可用 API Key 均尝试失败或达到限制。", "type": "key_error", "code": status.HTTP_503_SERVICE_UNAVAILABLE}
            # Back off before the next attempt; pointless after the last one.
            if attempt_count < max_attempts:
                await asyncio.sleep(0.5)
            continue

        logger.info(f"请求 {request_id}: 第 {attempt_count} 次尝试,选定 Key: {selected_key[:8]}...")

        # Fresh list per attempt, in case truncate_context modifies its input.
        merged_contents_for_api = list(merged_contents)
        logger.debug(f"请求 {request_id}: 选定 Key {selected_key[:8]}... 剩余可用输入 Token (用于动态截断): {available_input_tokens}")

        truncated_contents_for_api, context_over_limit_after_truncation = await truncate_context(
            contents=merged_contents_for_api,
            model_name=model_name,
            dynamic_max_tokens_limit=available_input_tokens
        )
        if context_over_limit_after_truncation:
            remaining_tokens = await estimate_token_count(truncated_contents_for_api)
            logger.error(f"请求 {request_id}: 动态截断后上下文仍然超限 ({remaining_tokens} tokens)。跳过此 Key。")
            key_manager.record_selection_reason(selected_key, "Context Over Limit After Dynamic Truncation", request_id)
            key_manager.tried_keys_for_request.add(selected_key)
            continue

        response, error_info, needs_retry = await _attempt_api_call(
            chat_request=chat_request,
            contents=truncated_contents_for_api,
            system_instruction=system_instruction_text,
            current_api_key=selected_key,
            http_client=http_client,
            key_manager=key_manager,
            model_name=model_name,
            limits=limits,
            client_ip=client_ip,
            today_date_str_pt=today_date_str_pt,
            enable_native_caching=enable_native_caching,
            cache_manager_instance=cache_manager_instance,
            request_id=request_id,
            cached_content_id_to_use=cached_content_id_to_use,
            content_to_cache_on_success=content_to_cache_on_success,
            user_id=user_id,
            db=db
        )

        if response is not None:
            logger.info(f"请求 {request_id}: API 调用成功 (Key: {selected_key[:8]}..., 尝试 {attempt_count})")

            if user_id and config.KEY_STORAGE_MODE == 'database':
                try:
                    await key_manager.update_user_key_association(db, user_id, selected_key)
                    logger.debug(f"请求 {request_id}: 更新用户 {user_id} 与 Key {selected_key[:8]}... 的关联。")
                except Exception as assoc_err:
                    logger.error(f"请求 {request_id}: 更新用户 Key 关联失败: {assoc_err}", exc_info=True)

            # Only non-streaming replies are saved to the legacy context here.
            if request_type == 'non-stream' and not enable_native_caching and enable_context and user_id:
                if not isinstance(response, ChatCompletionResponse):
                    logger.warning(f"请求 {request_id}: 非流式响应类型异常 ({type(response)}),跳过上下文保存。")
                else:
                    first_message = response.choices[0].message if response.choices else None
                    model_reply_content = first_message.content if first_message else ""
                    if not model_reply_content:
                        logger.warning(f"请求 {request_id}: 非流式响应成功但回复内容为空,跳过上下文保存。")
                    else:
                        # A failed save must not turn an already-successful reply into a 500.
                        try:
                            await save_context_after_success(
                                proxy_key=user_id,
                                contents_to_send=merged_contents_for_api,
                                model_reply_content=model_reply_content,
                                model_name=model_name,
                                enable_context=True,
                                final_tool_calls=first_message.tool_calls
                            )
                        except Exception as save_err:
                            logger.error(f"请求 {request_id}: 保存上下文失败: {save_err}", exc_info=True)

            return response

        # Guard against the error handler returning no error dict.
        last_error_info = error_info
        error_message = (error_info or {}).get('message', '未知错误')
        if needs_retry:
            logger.warning(f"请求 {request_id}: API 调用失败,需要重试 (Key: {selected_key[:8]}..., 尝试 {attempt_count}). 错误: {error_message}")
            key_manager.tried_keys_for_request.add(selected_key)
            continue

        logger.error(f"请求 {request_id}: API 调用失败,无需重试 (Key: {selected_key[:8]}..., 尝试 {attempt_count}). 错误: {error_message}")
        break

    logger.error(f"请求 {request_id}: 所有 API 调用尝试均失败。")
    default_detail = "所有尝试均失败,无法处理请求。"
    if last_error_info:
        error_detail = last_error_info.get("message", default_detail)
        status_code = last_error_info.get("code", status.HTTP_503_SERVICE_UNAVAILABLE)
    else:
        error_detail = default_detail
        status_code = status.HTTP_503_SERVICE_UNAVAILABLE

    # The error dict's "code" comes from the error handler; only forward it when
    # it is a real HTTP error status, otherwise fall back to 503.
    if not isinstance(status_code, int) or not 400 <= status_code <= 599:
        logger.warning(f"请求 {request_id}: 错误码 {status_code!r} 不是有效的 HTTP 错误状态码,改用 503。")
        status_code = status.HTTP_503_SERVICE_UNAVAILABLE

    raise HTTPException(status_code=status_code, detail=error_detail)
|
|