from fastapi import APIRouter, HTTPException, Request, Depends, status from fastapi.responses import JSONResponse, StreamingResponse from app.models import ChatCompletionRequest, ChatCompletionResponse, ErrorResponse, ModelList from app.services import GeminiClient from app.utils import ( generate_cache_key, cache_response, create_chat_response, create_error_response ) from app.config.settings import ( api_call_stats, BLOCKED_MODELS ) import asyncio import time import logging # 导入拆分后的模块 from .auth import verify_password from .logging_utils import log from .request_handlers import process_request # 创建路由器 router = APIRouter() # 全局变量引用 - 这些将在main.py中初始化并传递给路由 key_manager = None response_cache_manager = None active_requests_manager = None safety_settings = None safety_settings_g2 = None current_api_key = None FAKE_STREAMING = None FAKE_STREAMING_INTERVAL = None PASSWORD = None MAX_REQUESTS_PER_MINUTE = None MAX_REQUESTS_PER_DAY_PER_IP = None # 初始化路由器的函数 def init_router( _key_manager, _response_cache_manager, _active_requests_manager, _safety_settings, _safety_settings_g2, _current_api_key, _fake_streaming, _fake_streaming_interval, _password, _max_requests_per_minute, _max_requests_per_day_per_ip ): global key_manager, response_cache_manager, active_requests_manager global safety_settings, safety_settings_g2, current_api_key global FAKE_STREAMING, FAKE_STREAMING_INTERVAL global PASSWORD, MAX_REQUESTS_PER_MINUTE, MAX_REQUESTS_PER_DAY_PER_IP key_manager = _key_manager response_cache_manager = _response_cache_manager active_requests_manager = _active_requests_manager safety_settings = _safety_settings safety_settings_g2 = _safety_settings_g2 current_api_key = _current_api_key FAKE_STREAMING = _fake_streaming FAKE_STREAMING_INTERVAL = _fake_streaming_interval PASSWORD = _password MAX_REQUESTS_PER_MINUTE = _max_requests_per_minute MAX_REQUESTS_PER_DAY_PER_IP = _max_requests_per_day_per_ip # 自定义密码验证依赖 async def custom_verify_password(request: Request): await verify_password(request, PASSWORD) # API路由 @router.get("/v1/models", response_model=ModelList) def list_models(): log('info', "Received request to list models", extra={'request_type': 'list_models', 'status_code': 200}) filtered_models = [model for model in GeminiClient.AVAILABLE_MODELS if model not in BLOCKED_MODELS] return ModelList(data=[{"id": model, "object": "model", "created": 1678888888, "owned_by": "organization-owner"} for model in filtered_models]) @router.post("/v1/chat/completions", response_model=ChatCompletionResponse) async def chat_completions(request: ChatCompletionRequest, http_request: Request, _: None = Depends(custom_verify_password)): # 获取客户端IP client_ip = http_request.client.host if http_request.client else "unknown" # 流式请求直接处理,不使用缓存 if request.stream: return await process_request( request, http_request, "stream", key_manager, response_cache_manager, active_requests_manager, safety_settings, safety_settings_g2, api_call_stats, FAKE_STREAMING, FAKE_STREAMING_INTERVAL, MAX_REQUESTS_PER_MINUTE, MAX_REQUESTS_PER_DAY_PER_IP ) # 生成完整缓存键 - 用于精确匹配 cache_key = generate_cache_key(request) # 记录请求缓存键信息 log('info', f"请求缓存键: {cache_key[:8]}...", extra={'cache_key': cache_key[:8], 'request_type': 'non-stream'}) # 检查精确缓存是否存在且未过期 cached_response, cache_hit = response_cache_manager.get(cache_key) if cache_hit: # 精确缓存命中 log('info', f"精确缓存命中: {cache_key[:8]}...", extra={'cache_operation': 'hit', 'request_type': 'non-stream'}) # 同时清理相关的活跃任务,避免后续请求等待已经不需要的任务 active_requests_manager.remove_by_prefix(f"cache:{cache_key}") # 安全删除缓存 if cache_key in response_cache_manager.cache: del response_cache_manager.cache[cache_key] log('info', f"缓存使用后已删除: {cache_key[:8]}...", extra={'cache_operation': 'used-and-removed', 'request_type': 'non-stream'}) # 返回缓存响应 return cached_response # 构建包含缓存键的活跃请求池键 pool_key = f"cache:{cache_key}" # 查找所有使用相同缓存键的活跃任务 active_task = active_requests_manager.get(pool_key) if active_task and not active_task.done(): log('info', f"发现相同请求的进行中任务", extra={'request_type': 'non-stream', 'model': request.model}) # 等待已有任务完成 try: # 设置超时,避免无限等待 await asyncio.wait_for(active_task, timeout=180) # 通过缓存管理器获取已完成任务的结果 cached_response, cache_hit = response_cache_manager.get(cache_key) if cache_hit: # 安全删除缓存 if cache_key in response_cache_manager.cache: del response_cache_manager.cache[cache_key] log('info', f"使用已完成任务的缓存后删除: {cache_key[:8]}...", extra={'cache_operation': 'used-and-removed', 'request_type': 'non-stream'}) return cached_response # 如果缓存已被清除或不存在,使用任务结果 if active_task.done() and not active_task.cancelled(): result = active_task.result() if result: # 使用原始结果时,我们需要创建一个新的响应对象 # 避免使用可能已被其他请求修改的对象 new_response = ChatCompletionResponse( id=f"chatcmpl-{int(time.time()*1000)}", object="chat.completion", created=int(time.time()), model=result.model, choices=result.choices ) # 不要缓存此结果,因为它很可能是一个已存在但被使用后清除的缓存 return new_response except (asyncio.TimeoutError, asyncio.CancelledError) as e: # 任务超时或被取消的情况下,记录日志然后让代码继续执行 error_type = "超时" if isinstance(e, asyncio.TimeoutError) else "被取消" log('warning', f"等待已有任务{error_type}: {pool_key}", extra={'request_type': 'non-stream', 'model': request.model}) # 从活跃请求池移除该任务 if active_task.done() or active_task.cancelled(): active_requests_manager.remove(pool_key) log('info', f"已从活跃请求池移除{error_type}任务: {pool_key}", extra={'request_type': 'non-stream'}) # 创建请求处理任务 process_task = asyncio.create_task( process_request( request, http_request, "non-stream", key_manager, response_cache_manager, active_requests_manager, safety_settings, safety_settings_g2, api_call_stats, FAKE_STREAMING, FAKE_STREAMING_INTERVAL, MAX_REQUESTS_PER_MINUTE, MAX_REQUESTS_PER_DAY_PER_IP, cache_key, client_ip ) ) # 将任务添加到活跃请求池 active_requests_manager.add(pool_key, process_task) # 等待任务完成 try: response = await process_task return response except Exception as e: # 如果任务失败,从活跃请求池中移除 active_requests_manager.remove(pool_key) # 检查是否已有缓存的结果(可能是由另一个任务创建的) cached_response, cache_hit = response_cache_manager.get(cache_key) if cache_hit: log('info', f"任务失败但找到缓存,使用缓存结果: {cache_key[:8]}...", extra={'request_type': 'non-stream', 'model': request.model}) return cached_response # 重新抛出异常 raise