Spaces:
Paused
Paused
| from fastapi import APIRouter, HTTPException, Request, Depends, status | |
| from fastapi.responses import JSONResponse, StreamingResponse | |
| from app.models import ChatCompletionRequest, ChatCompletionResponse, ErrorResponse, ModelList | |
| from app.services import GeminiClient | |
| from app.utils import ( | |
| generate_cache_key, | |
| cache_response, | |
| create_chat_response, | |
| create_error_response | |
| ) | |
| from app.config.settings import ( | |
| api_call_stats, | |
| BLOCKED_MODELS | |
| ) | |
| import asyncio | |
| import time | |
| import logging | |
| # 导入拆分后的模块 | |
| from .auth import verify_password | |
| from .logging_utils import log | |
| from .request_handlers import process_request | |
# FastAPI router for the endpoints defined in this module.
router = APIRouter()

# Module-level state. Every name below starts as None and is meant to be
# filled in by init_router(), which main.py is expected to call at startup.

# Shared service objects.
key_manager = None
current_api_key = None
response_cache_manager = None
active_requests_manager = None

# Safety settings handed through to process_request.
safety_settings = None
safety_settings_g2 = None

# Behaviour flags, auth password and rate limits.
FAKE_STREAMING = None
FAKE_STREAMING_INTERVAL = None
PASSWORD = None
MAX_REQUESTS_PER_MINUTE = None
MAX_REQUESTS_PER_DAY_PER_IP = None
# Startup hook: main.py is expected to call this to inject services/settings.
def init_router(
    _key_manager,
    _response_cache_manager,
    _active_requests_manager,
    _safety_settings,
    _safety_settings_g2,
    _current_api_key,
    _fake_streaming,
    _fake_streaming_interval,
    _password,
    _max_requests_per_minute,
    _max_requests_per_day_per_ip
):
    """Store startup-time services and settings in this module's globals.

    The request handlers in this module read (most of) these globals instead
    of receiving them as arguments. Until this function runs, every one of
    them is ``None``. Each ``_name`` argument is stored as-is into the
    matching global, with no validation or copying.
    """
    # Shared service objects.
    global key_manager, current_api_key
    global response_cache_manager, active_requests_manager
    key_manager, current_api_key = _key_manager, _current_api_key
    response_cache_manager, active_requests_manager = (
        _response_cache_manager,
        _active_requests_manager,
    )

    # Safety settings forwarded to process_request.
    global safety_settings, safety_settings_g2
    safety_settings, safety_settings_g2 = _safety_settings, _safety_settings_g2

    # Behaviour flags, auth password and rate limits.
    global FAKE_STREAMING, FAKE_STREAMING_INTERVAL, PASSWORD
    global MAX_REQUESTS_PER_MINUTE, MAX_REQUESTS_PER_DAY_PER_IP
    FAKE_STREAMING, FAKE_STREAMING_INTERVAL = _fake_streaming, _fake_streaming_interval
    PASSWORD = _password
    MAX_REQUESTS_PER_MINUTE, MAX_REQUESTS_PER_DAY_PER_IP = (
        _max_requests_per_minute,
        _max_requests_per_day_per_ip,
    )
# FastAPI dependency wrapping verify_password with this module's PASSWORD.
async def custom_verify_password(request: Request):
    """Check *request* against the password injected via init_router().

    PASSWORD is looked up each time the dependency runs, not at import time,
    so the value set at startup is the one enforced. The outcome, including
    any exception, is whatever ``verify_password`` produces.
    """
    expected = PASSWORD
    await verify_password(request, expected)
# ---------------------------------------------------------------------------
# API routes
# ---------------------------------------------------------------------------
# NOTE(review): no route decorator (e.g. ``@router.get(...)``) is visible on
# this handler in this copy of the file; confirm it is registered on ``router``.
def list_models():
    """Return GeminiClient.AVAILABLE_MODELS minus anything in BLOCKED_MODELS.

    Each entry is a dict with keys ``id``, ``object``, ``created`` and
    ``owned_by``. ``created`` and ``owned_by`` are fixed placeholder values.
    """
    log('info', "Received request to list models", extra={'request_type': 'list_models', 'status_code': 200})
    entries = []
    for model_id in GeminiClient.AVAILABLE_MODELS:
        if model_id in BLOCKED_MODELS:
            continue
        entries.append({
            "id": model_id,
            "object": "model",
            "created": 1678888888,
            "owned_by": "organization-owner",
        })
    return ModelList(data=entries)
# NOTE(review): no route decorator (e.g. ``@router.post(...)``) is visible on
# this handler in this copy of the file; confirm it is registered on ``router``.
async def chat_completions(request: ChatCompletionRequest, http_request: Request, _: None = Depends(custom_verify_password)):
    """Serve a chat-completion request, de-duplicating identical non-stream calls.

    Streaming requests are forwarded directly to ``process_request`` and never
    touch the cache. Non-stream requests go through three stages:

    1. Exact cache hit in ``response_cache_manager``: return it and evict it,
       so each cached response is served once.
    2. An identical request already in flight (registered in
       ``active_requests_manager`` under ``"cache:<cache_key>"``): wait up to
       180 s for it, then reuse its cached response or its task result.
    3. Otherwise start a new ``process_request`` task, register it so later
       identical requests can wait on it, and await it.

    Args:
        request: Parsed chat-completion body.
        http_request: Incoming HTTP request. It is passed through to
            ``process_request``; the only thing read from it directly here is
            the client address.
        _: Placeholder for the password-check dependency; unused.

    Returns:
        The result of ``process_request``, a cached response, or a fresh
        ``ChatCompletionResponse`` built from an in-flight task's result.

    Raises:
        Exceptions raised by an awaited in-flight task propagate, except for
        timeout or cancellation of that task. Exceptions from the task started
        here are re-raised unless a cached response for the same key exists by
        the time it fails. Cancellation of this request itself is propagated.
    """
    # Client IP; passed to process_request on the non-stream path.
    client_ip = http_request.client.host if http_request.client else "unknown"

    # Streaming requests bypass the cache and de-duplication entirely.
    if request.stream:
        return await process_request(
            request,
            http_request,
            "stream",
            key_manager,
            response_cache_manager,
            active_requests_manager,
            safety_settings,
            safety_settings_g2,
            api_call_stats,
            FAKE_STREAMING,
            FAKE_STREAMING_INTERVAL,
            MAX_REQUESTS_PER_MINUTE,
            MAX_REQUESTS_PER_DAY_PER_IP
        )

    # Full cache key, used for exact matching.
    cache_key = generate_cache_key(request)
    log('info', f"请求缓存键: {cache_key[:8]}...",
        extra={'cache_key': cache_key[:8], 'request_type': 'non-stream'})

    # Stage 1: exact cache hit -- serve it once, then evict it.
    cached_response, cache_hit = response_cache_manager.get(cache_key)
    if cache_hit:
        log('info', f"精确缓存命中: {cache_key[:8]}...",
            extra={'cache_operation': 'hit', 'request_type': 'non-stream'})
        # Also drop in-flight tasks for this key so later requests do not wait
        # on work whose result is no longer needed.
        active_requests_manager.remove_by_prefix(f"cache:{cache_key}")
        if cache_key in response_cache_manager.cache:
            del response_cache_manager.cache[cache_key]
            log('info', f"缓存使用后已删除: {cache_key[:8]}...",
                extra={'cache_operation': 'used-and-removed', 'request_type': 'non-stream'})
        return cached_response

    # Key under which the in-flight task for this cache key is registered.
    pool_key = f"cache:{cache_key}"

    # Stage 2: piggy-back on an identical request that is already running.
    active_task = active_requests_manager.get(pool_key)
    if active_task and not active_task.done():
        log('info', "发现相同请求的进行中任务",
            extra={'request_type': 'non-stream', 'model': request.model})
        try:
            # shield(): on timeout, or if *this* request is cancelled,
            # wait_for cancels only the shield wrapper. Without it, wait_for
            # would cancel the shared task that the originating request and
            # any other waiters are still awaiting.
            await asyncio.wait_for(asyncio.shield(active_task), timeout=180)

            # Prefer the response the finished task stored in the cache.
            cached_response, cache_hit = response_cache_manager.get(cache_key)
            if cache_hit:
                if cache_key in response_cache_manager.cache:
                    del response_cache_manager.cache[cache_key]
                    log('info', f"使用已完成任务的缓存后删除: {cache_key[:8]}...",
                        extra={'cache_operation': 'used-and-removed', 'request_type': 'non-stream'})
                return cached_response

            # Cache entry already consumed or missing: fall back to the task's
            # own result.
            if active_task.done() and not active_task.cancelled():
                result = active_task.result()
                if result:
                    # Build a new response object rather than returning one
                    # that other requests may also be holding.
                    new_response = ChatCompletionResponse(
                        id=f"chatcmpl-{int(time.time()*1000)}",
                        object="chat.completion",
                        created=int(time.time()),
                        model=result.model,
                        choices=result.choices
                    )
                    # Deliberately not cached: it most likely mirrors an entry
                    # that was already consumed.
                    return new_response
        except (asyncio.TimeoutError, asyncio.CancelledError) as e:
            # If the shared task is not itself cancelled, a CancelledError
            # means *this* request was cancelled (e.g. the client
            # disconnected). Propagate it instead of starting new upstream
            # work.
            if isinstance(e, asyncio.CancelledError) and not active_task.cancelled():
                raise
            error_type = "超时" if isinstance(e, asyncio.TimeoutError) else "被取消"
            log('warning', f"等待已有任务{error_type}: {pool_key}",
                extra={'request_type': 'non-stream', 'model': request.model})
            # Deregister the finished task, but only if the slot still holds
            # it; another request may already have registered a newer task.
            if active_task.done() and active_requests_manager.get(pool_key) is active_task:
                active_requests_manager.remove(pool_key)
                log('info', f"已从活跃请求池移除{error_type}任务: {pool_key}",
                    extra={'request_type': 'non-stream'})

    # Stage 3: no reusable result -- run the request ourselves and register the
    # task so identical requests arriving meanwhile can wait on it.
    process_task = asyncio.create_task(
        process_request(
            request,
            http_request,
            "non-stream",
            key_manager,
            response_cache_manager,
            active_requests_manager,
            safety_settings,
            safety_settings_g2,
            api_call_stats,
            FAKE_STREAMING,
            FAKE_STREAMING_INTERVAL,
            MAX_REQUESTS_PER_MINUTE,
            MAX_REQUESTS_PER_DAY_PER_IP,
            cache_key,
            client_ip
        )
    )
    active_requests_manager.add(pool_key, process_task)

    try:
        return await process_task
    except Exception:
        # Deregister the failed task, but only if the slot still holds it. A
        # waiter that timed out may have replaced it with its own task, which
        # must stay discoverable.
        if active_requests_manager.get(pool_key) is process_task:
            active_requests_manager.remove(pool_key)
        # Another request may have cached a result for the same key meanwhile.
        cached_response, cache_hit = response_cache_manager.get(cache_key)
        if cache_hit:
            log('info', f"任务失败但找到缓存,使用缓存结果: {cache_key[:8]}...",
                extra={'request_type': 'non-stream', 'model': request.model})
            return cached_response
        raise