Spaces:
Paused
Paused
File size: 6,450 Bytes
8fea9ab |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
import asyncio
from fastapi import HTTPException, status, Request
from app.models import ChatCompletionRequest
from app.services import GeminiClient
from app.utils import cache_response, update_api_call_stats
from .logging_utils import log
from .client_disconnect import check_client_disconnect, handle_client_disconnect
from .gemini_handlers import run_gemini_completion
def _take_cached_response(response_cache_manager, cache_key, request_type,
                          hit_message, hit_operation):
    """Look up ``cache_key`` and, on a hit, evict the entry so it is served once.

    Args:
        response_cache_manager: Object exposing ``get(key)``, which returns a
            ``(response, hit)`` pair, and a ``cache`` container supporting
            ``in`` and ``del``.
        cache_key: Key to look up; may be ``None``.
        request_type: Request type tag attached to the log records.
        hit_message: Log message prefix emitted on a hit; the first eight
            characters of the key and ``...`` are appended to it.
        hit_operation: Value logged under ``cache_operation`` on a hit.

    Returns:
        The ``(cached_response, cache_hit)`` pair returned by the manager.
    """
    cached_response, cache_hit = response_cache_manager.get(cache_key)
    if cache_hit:
        # str() is a no-op for string keys and keeps the log line from raising
        # a TypeError should a hit ever be reported for a None key.
        key_prefix = str(cache_key)[:8]
        log('info', f"{hit_message}{key_prefix}...",
            extra={'cache_operation': hit_operation, 'request_type': request_type})
        # Entries are single-use: drop the entry once it has been handed out.
        if cache_key in response_cache_manager.cache:
            del response_cache_manager.cache[cache_key]
            log('info', f"缓存使用后已删除: {key_prefix}...",
                extra={'cache_operation': 'used-and-removed', 'request_type': request_type})
    return cached_response, cache_hit


# Handler for non-streaming requests
async def process_nonstream_request(
    chat_request: ChatCompletionRequest,
    http_request: Request,
    request_type: str,
    contents,
    system_instruction,
    current_api_key: str,
    response_cache_manager,
    active_requests_manager,
    safety_settings,
    safety_settings_g2,
    api_call_stats,
    cache_key: str = None,
    client_ip: str = None
):
    """Handle a non-streaming API request.

    Runs the Gemini completion and a client-disconnect watcher as concurrent
    tasks and acts on whichever finishes first:

    * Watcher first: the still-running completion task is handed to
      ``handle_client_disconnect`` and its result is returned.
    * Completion first: if the cache already holds an entry for ``cache_key``
      that entry is returned (and evicted); otherwise a response is built,
      passed to ``cache_response`` and the cache entry is evicted right away.

    If this coroutine is cancelled, a cached entry is returned when one
    exists; otherwise the completion task is awaited (shielded) and its
    response is returned without being cached.

    Args:
        chat_request: Incoming chat completion request.
        http_request: Request object passed to the disconnect watcher.
        request_type: Request type tag forwarded to helpers and log records.
        contents: Forwarded unchanged to ``run_gemini_completion``.
        system_instruction: Forwarded unchanged to ``run_gemini_completion``.
        current_api_key: API key used for this attempt; only its first eight
            characters are written to logs.
        response_cache_manager: Cache manager exposing ``get`` and ``cache``.
        active_requests_manager: Not used by this function.
        safety_settings: Forwarded unchanged to ``run_gemini_completion``.
        safety_settings_g2: Forwarded unchanged to ``run_gemini_completion``.
        api_call_stats: Not used by this function.
        cache_key: Cache key for this request, if any.
        client_ip: Client address forwarded to the caching helpers.

    Returns:
        The response built by ``create_response``, a cached response, or the
        value returned by ``handle_client_disconnect``.

    Raises:
        HTTPException: Re-raised unchanged; a 408 is logged first.
    """
    # Function-scope import, as in the original code, resolved once up front
    # so a failure here cannot orphan the tasks created below.
    from app.utils.response import create_response

    gemini_client = GeminiClient(current_api_key)

    # Start the upstream call and the disconnect watcher side by side.
    gemini_task = asyncio.create_task(
        run_gemini_completion(
            gemini_client,
            chat_request,
            contents,
            system_instruction,
            request_type,
            current_api_key,
            safety_settings,
            safety_settings_g2
        )
    )
    disconnect_task = asyncio.create_task(
        check_client_disconnect(
            http_request,
            current_api_key,
            request_type,
            chat_request.model
        )
    )

    try:
        # Wait for whichever happens first: the API call finishing or the
        # client going away.
        done, _ = await asyncio.wait(
            [gemini_task, disconnect_task],
            return_when=asyncio.FIRST_COMPLETED
        )

        if disconnect_task in done:
            # The client is gone, but the API call is still allowed to finish
            # so that its result can be cached.
            return await handle_client_disconnect(
                gemini_task,
                chat_request,
                request_type,
                current_api_key,
                response_cache_manager,
                cache_key,
                client_ip
            )

        # The API call finished first; the watcher is no longer needed.
        disconnect_task.cancel()
        response_content = await gemini_task

        # If an entry already exists for this key, serve it instead of
        # creating a new one.
        cached_response, cache_hit = _take_cached_response(
            response_cache_manager, cache_key, request_type,
            "缓存已存在,直接返回: ", 'use-existing')
        if cache_hit:
            return cached_response

        response = create_response(chat_request, response_content)
        cache_response(response, cache_key, client_ip, response_cache_manager,
                       update_api_call_stats, api_key=current_api_key)

        # Evict immediately so the entry can only ever be used once.
        if cache_key and cache_key in response_cache_manager.cache:
            del response_cache_manager.cache[cache_key]
            log('info', f"缓存创建后立即删除: {cache_key[:8]}...",
                extra={'cache_operation': 'store-and-remove', 'request_type': request_type})

        return response

    except asyncio.CancelledError:
        # Stop the watcher right away; it would otherwise keep running while
        # the completion task is awaited below.
        disconnect_task.cancel()

        extra_log = {'key': current_api_key[:8], 'request_type': request_type,
                     'model': chat_request.model, 'error_message': "请求被取消"}
        log('info', "请求取消", extra=extra_log)

        # Prefer a result that is already cached.
        cached_response, cache_hit = _take_cached_response(
            response_cache_manager, cache_key, request_type,
            "请求取消但找到有效缓存,使用缓存响应: ", 'use-cache-on-cancel')
        if cache_hit:
            return cached_response

        if gemini_task.done():
            response_content = gemini_task.result()
        else:
            log('info', "请求取消但API请求尚未完成,继续等待...",
                extra={'key': current_api_key[:8], 'request_type': request_type})
            # shield() keeps a further cancellation of this coroutine from
            # cancelling the in-flight API call.
            response_content = await asyncio.shield(gemini_task)

        # This response is returned directly and deliberately not cached.
        return create_response(chat_request, response_content)

    except HTTPException as e:
        if e.status_code == status.HTTP_408_REQUEST_TIMEOUT:
            extra_log = {'key': current_api_key[:8], 'request_type': request_type,
                         'model': chat_request.model,
                         'status_code': 408, 'error_message': '客户端连接中断'}
            log('error', "客户端连接中断,终止后续重试", extra=extra_log)
        # Every HTTPException propagates to the caller unchanged.
        raise

    finally:
        # Safety net: never leave the watcher task running after this handler
        # exits. cancel() has no effect on a task that is already done.
        disconnect_task.cancel()