hajimi / app /api /nonstream_handlers.py
clash-linux's picture
Upload 32 files
8fea9ab verified
import asyncio
from fastapi import HTTPException, status, Request
from app.models import ChatCompletionRequest
from app.services import GeminiClient
from app.utils import cache_response, update_api_call_stats
from .logging_utils import log
from .client_disconnect import check_client_disconnect, handle_client_disconnect
from .gemini_handlers import run_gemini_completion
# 非流式请求处理函数
async def process_nonstream_request(
chat_request: ChatCompletionRequest,
http_request: Request,
request_type: str,
contents,
system_instruction,
current_api_key: str,
response_cache_manager,
active_requests_manager,
safety_settings,
safety_settings_g2,
api_call_stats,
cache_key: str = None,
client_ip: str = None
):
"""处理非流式API请求"""
gemini_client = GeminiClient(current_api_key)
# 创建任务
gemini_task = asyncio.create_task(
run_gemini_completion(
gemini_client,
chat_request,
contents,
system_instruction,
request_type,
current_api_key,
safety_settings,
safety_settings_g2
)
)
disconnect_task = asyncio.create_task(
check_client_disconnect(
http_request,
current_api_key,
request_type,
chat_request.model
)
)
try:
# 先等待看是否API任务先完成,或者客户端先断开连接
done, pending = await asyncio.wait(
[gemini_task, disconnect_task],
return_when=asyncio.FIRST_COMPLETED
)
if disconnect_task in done:
# 客户端已断开连接,但我们仍继续完成API请求以便缓存结果
return await handle_client_disconnect(
gemini_task,
chat_request,
request_type,
current_api_key,
response_cache_manager,
cache_key,
client_ip
)
else:
# API任务先完成,取消断开检测任务
disconnect_task.cancel()
# 获取响应内容
response_content = await gemini_task
# 检查缓存是否已经存在,如果存在则不再创建新缓存
cached_response, cache_hit = response_cache_manager.get(cache_key)
if cache_hit:
log('info', f"缓存已存在,直接返回: {cache_key[:8]}...",
extra={'cache_operation': 'use-existing', 'request_type': request_type})
# 安全删除缓存
if cache_key in response_cache_manager.cache:
del response_cache_manager.cache[cache_key]
log('info', f"缓存使用后已删除: {cache_key[:8]}...",
extra={'cache_operation': 'used-and-removed', 'request_type': request_type})
return cached_response
# 创建响应
from app.utils.response import create_response
response = create_response(chat_request, response_content)
# 缓存响应
cache_response(response, cache_key, client_ip, response_cache_manager, update_api_call_stats, api_key=current_api_key)
# 立即删除缓存,确保只能使用一次
if cache_key and cache_key in response_cache_manager.cache:
del response_cache_manager.cache[cache_key]
log('info', f"缓存创建后立即删除: {cache_key[:8]}...",
extra={'cache_operation': 'store-and-remove', 'request_type': request_type})
# 返回响应
return response
except asyncio.CancelledError:
extra_log = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model, 'error_message':"请求被取消"}
log('info', "请求取消", extra=extra_log)
# 在请求被取消时先检查缓存中是否已有结果
cached_response, cache_hit = response_cache_manager.get(cache_key)
if cache_hit:
log('info', f"请求取消但找到有效缓存,使用缓存响应: {cache_key[:8]}...",
extra={'cache_operation': 'use-cache-on-cancel', 'request_type': request_type})
# 安全删除缓存
if cache_key in response_cache_manager.cache:
del response_cache_manager.cache[cache_key]
log('info', f"缓存使用后已删除: {cache_key[:8]}...",
extra={'cache_operation': 'used-and-removed', 'request_type': request_type})
return cached_response
# 尝试完成正在进行的API请求
if not gemini_task.done():
log('info', "请求取消但API请求尚未完成,继续等待...",
extra={'key': current_api_key[:8], 'request_type': request_type})
# 使用shield确保任务不会被取消
response_content = await asyncio.shield(gemini_task)
# 创建响应
from app.utils.response import create_response
response = create_response(chat_request, response_content)
# 不缓存这个响应,直接返回
return response
else:
# 任务已完成,获取结果
response_content = gemini_task.result()
# 创建响应
from app.utils.response import create_response
response = create_response(chat_request, response_content)
# 不缓存这个响应,直接返回
return response
except HTTPException as e:
if e.status_code == status.HTTP_408_REQUEST_TIMEOUT:
extra_log = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model,
'status_code': 408, 'error_message': '客户端连接中断'}
log('error', "客户端连接中断,终止后续重试", extra=extra_log)
raise
else:
raise