hajimi / app /api /request_handlers.py
clash-linux's picture
Upload 32 files
8fea9ab verified
import asyncio
import json
from typing import Literal
from fastapi import HTTPException, Request, status
from fastapi.responses import StreamingResponse
from app.models import ChatCompletionRequest
from app.services import GeminiClient
from app.utils import protect_from_abuse, handle_gemini_error, handle_api_error
from .logging_utils import log
from .stream_handlers import process_stream_request
from .nonstream_handlers import process_nonstream_request
# 请求处理函数
async def process_request(
chat_request: ChatCompletionRequest,
http_request: Request,
request_type: Literal['stream', 'non-stream'],
key_manager,
response_cache_manager,
active_requests_manager,
safety_settings,
safety_settings_g2,
api_call_stats,
FAKE_STREAMING,
FAKE_STREAMING_INTERVAL,
MAX_REQUESTS_PER_MINUTE,
MAX_REQUESTS_PER_DAY_PER_IP,
cache_key: str = None,
client_ip: str = None
):
"""处理API请求的主函数,根据需要处理流式或非流式请求"""
global current_api_key
# 请求前基本检查
protect_from_abuse(
http_request, MAX_REQUESTS_PER_MINUTE, MAX_REQUESTS_PER_DAY_PER_IP)
if chat_request.model not in GeminiClient.AVAILABLE_MODELS:
error_msg = "无效的模型"
extra_log = {'request_type': request_type, 'model': chat_request.model, 'status_code': 400, 'error_message': error_msg}
log('error', error_msg, extra=extra_log)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=error_msg)
# 重置已尝试的密钥
key_manager.reset_tried_keys_for_request()
# 转换消息格式
contents, system_instruction = GeminiClient.convert_messages(
GeminiClient, chat_request.messages)
# 设置重试次数(使用可用API密钥数量作为最大重试次数)
retry_attempts = len(key_manager.api_keys) if key_manager.api_keys else 1
# 尝试使用不同API密钥
for attempt in range(1, retry_attempts + 1):
# 获取下一个密钥
current_api_key = key_manager.get_available_key()
# 检查API密钥是否可用
if current_api_key is None:
log('warning', "没有可用的 API 密钥,跳过本次尝试",
extra={'request_type': request_type, 'model': chat_request.model, 'status_code': 'N/A'})
break
# 记录当前尝试的密钥信息
log('info', f"第 {attempt}/{retry_attempts} 次尝试 ... 使用密钥: {current_api_key[:8]}...",
extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})
# 服务器错误重试逻辑
server_error_retries = 3
for server_retry in range(1, server_error_retries + 1):
try:
# 根据请求类型分别处理
if chat_request.stream:
try:
return await process_stream_request(
chat_request,
http_request,
contents,
system_instruction,
current_api_key,
key_manager,
safety_settings,
safety_settings_g2,
api_call_stats,
FAKE_STREAMING,
FAKE_STREAMING_INTERVAL
)
except Exception as e:
# 捕获流式请求的异常,但不立即返回错误
# 记录错误并继续尝试下一个API密钥
error_detail = handle_gemini_error(e, current_api_key, key_manager)
log('error', f"流式请求失败: {error_detail}",
extra={'key': current_api_key[:8], 'request_type': 'stream', 'model': chat_request.model})
# 不返回错误,而是抛出异常让外层循环处理
raise
else:
return await process_nonstream_request(
chat_request,
http_request,
request_type,
contents,
system_instruction,
current_api_key,
response_cache_manager,
active_requests_manager,
safety_settings,
safety_settings_g2,
api_call_stats,
cache_key,
client_ip
)
except HTTPException as e:
if e.status_code == status.HTTP_408_REQUEST_TIMEOUT:
log('error', "客户端连接中断",
extra={'key': current_api_key[:8], 'request_type': request_type,
'model': chat_request.model, 'status_code': 408})
raise
else:
raise
except Exception as e:
# 使用统一的API错误处理函数
error_result = await handle_api_error(
e,
current_api_key,
key_manager,
request_type,
chat_request.model,
server_retry - 1
)
# 如果需要删除缓存,清除缓存
if error_result.get('remove_cache', False) and cache_key and cache_key in response_cache_manager.cache:
log('info', f"因API错误,删除缓存: {cache_key[:8]}...",
extra={'cache_operation': 'remove-on-error', 'request_type': request_type})
del response_cache_manager.cache[cache_key]
if error_result.get('should_retry', False):
# 服务器错误需要重试(等待已在handle_api_error中完成)
continue
elif error_result.get('should_switch_key', False) and attempt < retry_attempts:
# 跳出服务器错误重试循环,获取下一个可用密钥
log('info', f"API密钥 {current_api_key[:8]}... 失败,准备尝试下一个密钥",
extra={'key': current_api_key[:8], 'request_type': request_type})
break
else:
# 无法处理的错误或已达到重试上限
break
# 如果所有尝试都失败
msg = "所有API密钥均请求失败,请稍后重试"
log('error', "API key 替换失败,所有API key都已尝试,请重新配置或稍后重试", extra={'key': 'N/A', 'request_type': 'switch_key', 'status_code': 'N/A'})
# 对于流式请求,创建一个特殊的StreamingResponse返回错误
if chat_request.stream:
async def error_generator():
error_json = json.dumps({'error': {'message': msg, 'type': 'api_error'}})
yield f"data: {error_json}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(error_generator(), media_type="text/event-stream")
else:
# 非流式请求使用标准HTTP异常
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=msg)