|
|
import asyncio |
|
|
import json |
|
|
from typing import Literal |
|
|
from fastapi import HTTPException, Request, status |
|
|
from fastapi.responses import StreamingResponse |
|
|
from app.models import ChatCompletionRequest |
|
|
from app.services import GeminiClient |
|
|
from app.utils import protect_from_abuse, handle_gemini_error, handle_api_error |
|
|
from app.utils.logging import log |
|
|
from .stream_handlers import process_stream_request |
|
|
from .nonstream_handlers import process_nonstream_request |
|
|
|
|
|
|
|
|
async def process_request(
    chat_request: ChatCompletionRequest,
    http_request: Request,
    request_type: Literal['stream', 'non-stream'],
    key_manager,
    response_cache_manager,
    active_requests_manager,
    safety_settings,
    safety_settings_g2,
    api_call_stats,
    FAKE_STREAMING,
    FAKE_STREAMING_INTERVAL,
    MAX_REQUESTS_PER_MINUTE,
    MAX_REQUESTS_PER_DAY_PER_IP,
    # Both default to None, so the annotation is "str | None" (string form keeps
    # compatibility with pre-3.10 runtimes; no typing import needed).
    cache_key: "str | None" = None,
    client_ip: "str | None" = None
):
    """Main entry point for handling an API request.

    Dispatches the chat-completion request to the streaming or
    non-streaming handler, rotating through the configured Gemini API
    keys on failure and retrying transient server errors per key.

    Args:
        chat_request: Parsed OpenAI-style chat completion request body.
        http_request: Incoming FastAPI request (used for abuse checks and
            client-disconnect detection by the handlers).
        request_type: Label used for logging/stat purposes
            ('stream' or 'non-stream'). NOTE(review): the actual
            stream/non-stream dispatch below keys off
            ``chat_request.stream``, not this argument — confirm the two
            are always consistent at the call site.
        key_manager: Provides API-key rotation (``get_available_key``,
            ``reset_tried_keys_for_request``, ``api_keys``).
        response_cache_manager: Response cache; entries may be evicted
            here when an API error invalidates a cached response.
        active_requests_manager: Passed through to the non-stream handler
            (request de-duplication, presumably — not used directly here).
        safety_settings, safety_settings_g2: Gemini safety configs
            (the *_g2 variant is presumably for Gemini 2 models — passed
            through untouched).
        api_call_stats: Mutable stats object passed through to handlers.
        FAKE_STREAMING, FAKE_STREAMING_INTERVAL: Fake-streaming config
            forwarded to the stream handler.
        MAX_REQUESTS_PER_MINUTE, MAX_REQUESTS_PER_DAY_PER_IP: Rate
            limits enforced by ``protect_from_abuse``.
        cache_key: Optional cache key for this request's response.
        client_ip: Optional client IP forwarded to the non-stream handler.

    Returns:
        The handler's response (a StreamingResponse for streaming
        requests, whatever ``process_nonstream_request`` returns
        otherwise), or an SSE error stream / raises HTTP 500 when every
        key has failed.

    Raises:
        HTTPException: 400 for an unknown model, 408 propagated on client
            disconnect, 500 when all keys are exhausted (non-stream path).
    """
    # NOTE(review): `current_api_key` is not defined in this view; this
    # presumably publishes the in-use key at module level for status
    # reporting elsewhere — confirm before removing the global.
    global current_api_key

    # Rate-limit by client before doing any work; raises on abuse.
    protect_from_abuse(
        http_request, MAX_REQUESTS_PER_MINUTE, MAX_REQUESTS_PER_DAY_PER_IP)

    # Reject models we do not serve with a 400 up front.
    if chat_request.model not in GeminiClient.AVAILABLE_MODELS:
        log('error', "无效的模型",
            extra={'request_type': request_type, 'model': chat_request.model, 'status_code': 400})
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="无效的模型")

    # Fresh request: every configured key is eligible again.
    key_manager.reset_tried_keys_for_request()

    # Convert OpenAI-style messages into Gemini contents + system
    # instruction. NOTE(review): `convert_messages` is called with the
    # class itself as the first (self-like) argument — unusual but
    # preserved; confirm it is intentional.
    contents, system_instruction = GeminiClient.convert_messages(
        GeminiClient, chat_request.messages,model=chat_request.model)

    # One outer attempt per configured key (at least one attempt even if
    # the key list is empty, so the "no key" warning below can fire).
    retry_attempts = len(key_manager.api_keys) if key_manager.api_keys else 1

    for attempt in range(1, retry_attempts + 1):
        current_api_key = key_manager.get_available_key()

        # All keys already tried (or none configured): give up early and
        # fall through to the all-keys-failed handling below.
        if current_api_key is None:
            log('warning', "没有可用的 API 密钥,跳过本次尝试",
                extra={'request_type': request_type, 'model': chat_request.model})
            break

        log('info', f"第 {attempt}/{retry_attempts} 次尝试 ... 使用密钥: {current_api_key[:8]}...",
            extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})

        # Inner loop: retry transient server-side errors up to 3 times on
        # the SAME key before rotating to the next one.
        server_error_retries = 3
        for server_retry in range(1, server_error_retries + 1):
            try:
                # Dispatch on the request body's own stream flag (see the
                # reviewer note on `request_type` in the docstring).
                if chat_request.stream:
                    try:
                        return await process_stream_request(
                            chat_request,
                            http_request,
                            contents,
                            system_instruction,
                            current_api_key,
                            key_manager,
                            safety_settings,
                            safety_settings_g2,
                            api_call_stats,
                            FAKE_STREAMING,
                            FAKE_STREAMING_INTERVAL
                        )
                    except Exception as e:
                        # Record/classify the failure against this key,
                        # then re-raise so the outer handler below decides
                        # whether to retry or rotate.
                        error_detail = handle_gemini_error(e, current_api_key, key_manager)
                        log('info', f"流式请求失败: {error_detail}",
                            extra={'key': current_api_key[:8], 'request_type': 'stream', 'model': chat_request.model})
                        raise
                else:
                    return await process_nonstream_request(
                        chat_request,
                        http_request,
                        request_type,
                        contents,
                        system_instruction,
                        current_api_key,
                        response_cache_manager,
                        active_requests_manager,
                        safety_settings,
                        safety_settings_g2,
                        api_call_stats,
                        cache_key,
                        client_ip
                    )
            except HTTPException as e:
                # 408 means the client went away mid-request: log it for
                # observability; all HTTPExceptions propagate unchanged.
                if e.status_code == status.HTTP_408_REQUEST_TIMEOUT:
                    log('info', "客户端连接中断",
                        extra={'key': current_api_key[:8], 'request_type': request_type,
                               'model': chat_request.model, 'status_code': 408})
                    raise
                else:
                    raise
            except Exception as e:
                # Let the shared error handler classify the failure and
                # decide cleanup (server_retry - 1 = zero-based retry count).
                error_result = await handle_api_error(
                    e,
                    current_api_key,
                    key_manager,
                    request_type,
                    chat_request.model,
                    server_retry - 1
                )

                # If the error invalidates the cached response, evict it
                # and retry the same key; otherwise stop retrying this key
                # and rotate to the next one.
                if error_result.get('remove_cache', False) and cache_key and cache_key in response_cache_manager.cache:
                    log('info', f"因API错误,删除缓存: {cache_key[:8]}...",
                        extra={'cache_operation': 'remove-on-error', 'request_type': request_type})
                    del response_cache_manager.cache[cache_key]
                else:
                    break

    # Every key has been tried (or none were available): report failure.
    msg = "所有API密钥均请求失败,请稍后重试"
    log('error', "API key 替换失败,所有API key都已尝试,请重新配置或稍后重试", extra={'request_type': 'switch_key'})

    if chat_request.stream:
        # For streaming clients, emit the error as a well-formed SSE
        # stream (data event + [DONE]) instead of an HTTP error status.
        async def error_generator():
            error_json = json.dumps({'error': {'message': msg, 'type': 'api_error'}})
            yield f"data: {error_json}\n\n"
            yield "data: [DONE]\n\n"

        return StreamingResponse(error_generator(), media_type="text/event-stream")
    else:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=msg)