Spaces:
Paused
Paused
| import asyncio | |
| import json | |
| from typing import Literal | |
| from fastapi import HTTPException, Request, status | |
| from fastapi.responses import StreamingResponse | |
| from app.models import ChatCompletionRequest | |
| from app.services import GeminiClient | |
| from app.utils import protect_from_abuse, handle_gemini_error, handle_api_error | |
| from .logging_utils import log | |
| from .stream_handlers import process_stream_request | |
| from .nonstream_handlers import process_nonstream_request | |
| # 请求处理函数 | |
async def process_request(
    chat_request: ChatCompletionRequest,
    http_request: Request,
    request_type: Literal['stream', 'non-stream'],
    key_manager,
    response_cache_manager,
    active_requests_manager,
    safety_settings,
    safety_settings_g2,
    api_call_stats,
    FAKE_STREAMING,
    FAKE_STREAMING_INTERVAL,
    MAX_REQUESTS_PER_MINUTE,
    MAX_REQUESTS_PER_DAY_PER_IP,
    cache_key: str = None,
    client_ip: str = None
):
    """Dispatch a chat-completion request, rotating through API keys on failure.

    The request is validated, its messages are converted once via
    ``GeminiClient.convert_messages``, and then up to
    ``len(key_manager.api_keys)`` keys (at least one attempt) are tried in
    turn.  For each key the call is attempted up to three times; whether a
    failure is retried on the same key or abandoned is decided by the dict
    returned from ``handle_api_error`` (``should_retry``,
    ``should_switch_key``, ``remove_cache``).

    Args:
        chat_request: Parsed request body; ``model``, ``messages`` and
            ``stream`` are read here.
        http_request: Incoming request, forwarded to ``protect_from_abuse``
            and to the stream / non-stream handlers.
        request_type: Label used for logging and forwarded to the
            non-stream handler and ``handle_api_error``.  Note that the
            stream / non-stream branch itself is chosen from
            ``chat_request.stream``, not from this value.
        key_manager: Supplies keys through ``get_available_key()`` and
            exposes ``api_keys`` and ``reset_tried_keys_for_request()``.
        response_cache_manager: Object whose ``cache`` mapping is pruned
            when ``handle_api_error`` asks for it.
        active_requests_manager: Forwarded to the non-stream handler.
        safety_settings: Forwarded to the handlers.
        safety_settings_g2: Forwarded to the handlers.
        api_call_stats: Forwarded to the handlers.
        FAKE_STREAMING: Forwarded to the stream handler.
        FAKE_STREAMING_INTERVAL: Forwarded to the stream handler.
        MAX_REQUESTS_PER_MINUTE: Forwarded to ``protect_from_abuse``.
        MAX_REQUESTS_PER_DAY_PER_IP: Forwarded to ``protect_from_abuse``.
        cache_key: Optional cache entry to drop on error.
        client_ip: Forwarded to the non-stream handler.

    Returns:
        Whatever the stream / non-stream handler returns on success.  If
        every attempt fails and ``chat_request.stream`` is truthy, a
        ``StreamingResponse`` carrying one SSE error event followed by
        ``[DONE]``.

    Raises:
        HTTPException: 400 when the model is not in
            ``GeminiClient.AVAILABLE_MODELS``; any ``HTTPException`` raised
            by a handler is propagated unchanged; 500 when every attempt
            fails for a non-streaming request.
    """
    # The module-level name is still updated for any external reader, but all
    # logic below uses the local ``api_key``.  Reading the shared global after
    # an ``await`` would let a concurrent request overwrite it, so errors could
    # be logged against (and reported to the key manager for) the wrong key.
    global current_api_key

    # Basic pre-flight checks.
    protect_from_abuse(
        http_request, MAX_REQUESTS_PER_MINUTE, MAX_REQUESTS_PER_DAY_PER_IP)
    if chat_request.model not in GeminiClient.AVAILABLE_MODELS:
        error_msg = "无效的模型"
        extra_log = {'request_type': request_type, 'model': chat_request.model,
                     'status_code': 400, 'error_message': error_msg}
        log('error', error_msg, extra=extra_log)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail=error_msg)

    # Forget which keys were tried by a previous request.
    key_manager.reset_tried_keys_for_request()

    # Convert the messages once; the result is reused for every attempt.
    contents, system_instruction = GeminiClient.convert_messages(
        GeminiClient, chat_request.messages)

    # One attempt per configured key, and at least one attempt overall.
    retry_attempts = len(key_manager.api_keys) if key_manager.api_keys else 1
    # Maximum tries on the same key before moving on.
    server_error_retries = 3

    for attempt in range(1, retry_attempts + 1):
        api_key = key_manager.get_available_key()
        current_api_key = api_key  # keep the module-level name in sync

        if api_key is None:
            log('warning', "没有可用的 API 密钥,跳过本次尝试",
                extra={'request_type': request_type, 'model': chat_request.model, 'status_code': 'N/A'})
            break

        # Only the first 8 characters of a key ever reach the logs.
        key_prefix = api_key[:8]
        log('info', f"第 {attempt}/{retry_attempts} 次尝试 ... 使用密钥: {key_prefix}...",
            extra={'key': key_prefix, 'request_type': request_type, 'model': chat_request.model})

        for server_retry in range(1, server_error_retries + 1):
            try:
                if chat_request.stream:
                    try:
                        return await process_stream_request(
                            chat_request,
                            http_request,
                            contents,
                            system_instruction,
                            api_key,
                            key_manager,
                            safety_settings,
                            safety_settings_g2,
                            api_call_stats,
                            FAKE_STREAMING,
                            FAKE_STREAMING_INTERVAL
                        )
                    except Exception as e:
                        # Record the streaming failure, then re-raise so the
                        # outer handlers decide between retry / next key.
                        # NOTE(review): this also sees HTTPException (e.g. a
                        # 408) before it is re-raised to the client — confirm
                        # that handle_gemini_error is meant to run for those.
                        error_detail = handle_gemini_error(e, api_key, key_manager)
                        log('error', f"流式请求失败: {error_detail}",
                            extra={'key': key_prefix, 'request_type': 'stream', 'model': chat_request.model})
                        raise
                else:
                    return await process_nonstream_request(
                        chat_request,
                        http_request,
                        request_type,
                        contents,
                        system_instruction,
                        api_key,
                        response_cache_manager,
                        active_requests_manager,
                        safety_settings,
                        safety_settings_g2,
                        api_call_stats,
                        cache_key,
                        client_ip
                    )
            except HTTPException as e:
                # HTTP errors are never retried; a 408 is additionally logged.
                if e.status_code == status.HTTP_408_REQUEST_TIMEOUT:
                    log('error', "客户端连接中断",
                        extra={'key': key_prefix, 'request_type': request_type,
                               'model': chat_request.model, 'status_code': 408})
                raise
            except Exception as e:
                # Shared error handling; the last argument is the zero-based
                # index of the current try on this key.
                error_result = await handle_api_error(
                    e,
                    api_key,
                    key_manager,
                    request_type,
                    chat_request.model,
                    server_retry - 1
                )

                # Drop the cache entry when the handler asks for it.
                if error_result.get('remove_cache', False) and cache_key and cache_key in response_cache_manager.cache:
                    log('info', f"因API错误,删除缓存: {cache_key[:8]}...",
                        extra={'cache_operation': 'remove-on-error', 'request_type': request_type})
                    del response_cache_manager.cache[cache_key]

                if error_result.get('should_retry', False):
                    # Retry on the same key (per the original comment, any
                    # back-off wait already happened inside handle_api_error).
                    continue

                if error_result.get('should_switch_key', False) and attempt < retry_attempts:
                    log('info', f"API密钥 {key_prefix}... 失败,准备尝试下一个密钥",
                        extra={'key': key_prefix, 'request_type': request_type})
                # Whether switching keys or giving up on this one, leave the
                # per-key retry loop; the outer loop moves on to the next key.
                break

    # Every attempt failed (or no key was available).
    msg = "所有API密钥均请求失败,请稍后重试"
    log('error', "API key 替换失败,所有API key都已尝试,请重新配置或稍后重试",
        extra={'key': 'N/A', 'request_type': 'switch_key', 'status_code': 'N/A'})

    if chat_request.stream:
        # Streaming clients receive the error as an SSE event rather than an
        # HTTP error status.
        async def error_generator():
            error_json = json.dumps({'error': {'message': msg, 'type': 'api_error'}})
            yield f"data: {error_json}\n\n"
            yield "data: [DONE]\n\n"

        return StreamingResponse(error_generator(), media_type="text/event-stream")

    # Non-streaming clients get a regular HTTP error.
    raise HTTPException(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=msg)