import asyncio from fastapi import HTTPException, status, Request from app.models import ChatCompletionRequest from app.services import GeminiClient from app.utils import cache_response, update_api_call_stats from .logging_utils import log from .client_disconnect import check_client_disconnect, handle_client_disconnect from .gemini_handlers import run_gemini_completion # 非流式请求处理函数 async def process_nonstream_request( chat_request: ChatCompletionRequest, http_request: Request, request_type: str, contents, system_instruction, current_api_key: str, response_cache_manager, active_requests_manager, safety_settings, safety_settings_g2, api_call_stats, cache_key: str = None, client_ip: str = None ): """处理非流式API请求""" gemini_client = GeminiClient(current_api_key) # 创建任务 gemini_task = asyncio.create_task( run_gemini_completion( gemini_client, chat_request, contents, system_instruction, request_type, current_api_key, safety_settings, safety_settings_g2 ) ) disconnect_task = asyncio.create_task( check_client_disconnect( http_request, current_api_key, request_type, chat_request.model ) ) try: # 先等待看是否API任务先完成,或者客户端先断开连接 done, pending = await asyncio.wait( [gemini_task, disconnect_task], return_when=asyncio.FIRST_COMPLETED ) if disconnect_task in done: # 客户端已断开连接,但我们仍继续完成API请求以便缓存结果 return await handle_client_disconnect( gemini_task, chat_request, request_type, current_api_key, response_cache_manager, cache_key, client_ip ) else: # API任务先完成,取消断开检测任务 disconnect_task.cancel() # 获取响应内容 response_content = await gemini_task # 检查缓存是否已经存在,如果存在则不再创建新缓存 cached_response, cache_hit = response_cache_manager.get(cache_key) if cache_hit: log('info', f"缓存已存在,直接返回: {cache_key[:8]}...", extra={'cache_operation': 'use-existing', 'request_type': request_type}) # 安全删除缓存 if cache_key in response_cache_manager.cache: del response_cache_manager.cache[cache_key] log('info', f"缓存使用后已删除: {cache_key[:8]}...", extra={'cache_operation': 'used-and-removed', 'request_type': request_type}) return cached_response # 创建响应 from app.utils.response import create_response response = create_response(chat_request, response_content) # 缓存响应 cache_response(response, cache_key, client_ip, response_cache_manager, update_api_call_stats, api_key=current_api_key) # 立即删除缓存,确保只能使用一次 if cache_key and cache_key in response_cache_manager.cache: del response_cache_manager.cache[cache_key] log('info', f"缓存创建后立即删除: {cache_key[:8]}...", extra={'cache_operation': 'store-and-remove', 'request_type': request_type}) # 返回响应 return response except asyncio.CancelledError: extra_log = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model, 'error_message':"请求被取消"} log('info', "请求取消", extra=extra_log) # 在请求被取消时先检查缓存中是否已有结果 cached_response, cache_hit = response_cache_manager.get(cache_key) if cache_hit: log('info', f"请求取消但找到有效缓存,使用缓存响应: {cache_key[:8]}...", extra={'cache_operation': 'use-cache-on-cancel', 'request_type': request_type}) # 安全删除缓存 if cache_key in response_cache_manager.cache: del response_cache_manager.cache[cache_key] log('info', f"缓存使用后已删除: {cache_key[:8]}...", extra={'cache_operation': 'used-and-removed', 'request_type': request_type}) return cached_response # 尝试完成正在进行的API请求 if not gemini_task.done(): log('info', "请求取消但API请求尚未完成,继续等待...", extra={'key': current_api_key[:8], 'request_type': request_type}) # 使用shield确保任务不会被取消 response_content = await asyncio.shield(gemini_task) # 创建响应 from app.utils.response import create_response response = create_response(chat_request, response_content) # 不缓存这个响应,直接返回 return response else: # 任务已完成,获取结果 response_content = gemini_task.result() # 创建响应 from app.utils.response import create_response response = create_response(chat_request, response_content) # 不缓存这个响应,直接返回 return response except HTTPException as e: if e.status_code == status.HTTP_408_REQUEST_TIMEOUT: extra_log = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model, 'status_code': 408, 'error_message': '客户端连接中断'} log('error', "客户端连接中断,终止后续重试", extra=extra_log) raise else: raise