|
|
from fastapi import APIRouter, HTTPException, Request, Depends, status
|
|
|
from fastapi.responses import JSONResponse, StreamingResponse
|
|
|
from app.models import ChatCompletionRequest, ChatCompletionResponse, ErrorResponse, ModelList
|
|
|
from app.services import GeminiClient, ResponseWrapper
|
|
|
from app.utils import (
|
|
|
handle_gemini_error,
|
|
|
protect_from_abuse,
|
|
|
APIKeyManager,
|
|
|
test_api_key,
|
|
|
format_log_message,
|
|
|
log_manager,
|
|
|
generate_cache_key,
|
|
|
cache_response,
|
|
|
create_chat_response,
|
|
|
create_error_response,
|
|
|
handle_api_error,
|
|
|
update_api_call_stats
|
|
|
)
|
|
|
import json
|
|
|
import asyncio
|
|
|
import time
|
|
|
import logging
|
|
|
import random
|
|
|
from typing import Literal
|
|
|
from app.config.settings import (
|
|
|
api_call_stats,
|
|
|
BLOCKED_MODELS
|
|
|
)
|
|
|
|
|
|
# Module-level logger; handlers/levels are configured elsewhere in the app.
logger = logging.getLogger("my_logger")


router = APIRouter()


# Module-level singletons and configuration. All of these are populated by
# init_router() at application startup; until then they remain None.
key_manager = None                    # APIKeyManager: rotates/tracks Gemini API keys
response_cache_manager = None         # cache of completed responses, keyed by request hash
active_requests_manager = None        # pool of in-flight tasks used for request deduplication
safety_settings = None                # default Gemini safety settings
safety_settings_g2 = None             # safety settings used for 'gemini-2.0-flash-exp' models
current_api_key = None                # API key used by the most recent attempt
FAKE_STREAMING = None                 # truthy -> emulate streaming on top of a non-stream call
FAKE_STREAMING_INTERVAL = None        # keep-alive interval for fake streaming — not read in this module; TODO confirm consumer
PASSWORD = None                       # optional bearer token enforced by verify_password()
MAX_REQUESTS_PER_MINUTE = None        # per-minute rate limit passed to protect_from_abuse()
MAX_REQUESTS_PER_DAY_PER_IP = None    # daily per-IP limit passed to protect_from_abuse()
|
|
|
|
|
|
|
|
|
def init_router(
    _key_manager,
    _response_cache_manager,
    _active_requests_manager,
    _safety_settings,
    _safety_settings_g2,
    _current_api_key,
    _fake_streaming,
    _fake_streaming_interval,
    _password,
    _max_requests_per_minute,
    _max_requests_per_day_per_ip
):
    """Populate this module's global singletons and configuration.

    Called once at application startup. Each ``_x`` argument is copied
    verbatim into the module-level global ``x`` so the route handlers and
    helpers in this module can use them.
    """
    global key_manager, response_cache_manager, active_requests_manager
    global safety_settings, safety_settings_g2, current_api_key
    global FAKE_STREAMING, FAKE_STREAMING_INTERVAL
    global PASSWORD, MAX_REQUESTS_PER_MINUTE, MAX_REQUESTS_PER_DAY_PER_IP

    key_manager = _key_manager
    response_cache_manager = _response_cache_manager
    active_requests_manager = _active_requests_manager
    safety_settings = _safety_settings
    safety_settings_g2 = _safety_settings_g2
    current_api_key = _current_api_key
    FAKE_STREAMING = _fake_streaming
    FAKE_STREAMING_INTERVAL = _fake_streaming_interval
    PASSWORD = _password
    MAX_REQUESTS_PER_MINUTE = _max_requests_per_minute
    MAX_REQUESTS_PER_DAY_PER_IP = _max_requests_per_day_per_ip
|
|
|
|
|
|
|
|
|
def log(level: str, message: str, **extra):
    """Format *message* via format_log_message and emit it at *level*.

    Keyword arguments are forwarded to the formatter as its ``extra``
    payload. NOTE(review): every caller in this module passes ``extra={...}``
    as a keyword, so the formatter receives ``{'extra': {...}}`` (nested) —
    confirm format_log_message expects that shape.
    """
    emit = getattr(logger, level.lower())
    emit(format_log_message(level.upper(), message, extra=extra))
|
|
|
|
|
|
|
|
|
async def verify_password(request: Request):
    """FastAPI dependency enforcing optional bearer-token authentication.

    When the module-level ``PASSWORD`` is set, the request must carry an
    ``Authorization: Bearer <token>`` header whose token matches it;
    otherwise a 401 is raised. When ``PASSWORD`` is falsy, authentication
    is disabled and every request passes.

    Raises:
        HTTPException: 401 when the header is missing/malformed or the
            token does not match.
    """
    if PASSWORD:
        auth_header = request.headers.get("Authorization")
        if not auth_header or not auth_header.startswith("Bearer "):
            raise HTTPException(
                status_code=401, detail="Unauthorized: Missing or invalid token")
        token = auth_header.split(" ")[1]
        # Constant-time comparison prevents leaking the password one
        # character at a time via response-timing differences.
        import secrets
        if not secrets.compare_digest(token.encode("utf-8"), PASSWORD.encode("utf-8")):
            raise HTTPException(
                status_code=401, detail="Unauthorized: Invalid token")
|
|
|
|
|
|
|
|
|
@router.get("/v1/models", response_model=ModelList)
|
|
|
def list_models():
|
|
|
log('info', "Received request to list models", extra={'request_type': 'list_models', 'status_code': 200})
|
|
|
filtered_models = [model for model in GeminiClient.AVAILABLE_MODELS if model not in BLOCKED_MODELS]
|
|
|
return ModelList(data=[{"id": model, "object": "model", "created": 1678888888, "owned_by": "organization-owner"} for model in filtered_models])
|
|
|
|
|
|
@router.post("/v1/chat/completions", response_model=ChatCompletionResponse)
|
|
|
async def chat_completions(request: ChatCompletionRequest, http_request: Request, _: None = Depends(verify_password)):
|
|
|
|
|
|
client_ip = http_request.client.host if http_request.client else "unknown"
|
|
|
|
|
|
|
|
|
if request.stream:
|
|
|
return await process_request(request, http_request, "stream")
|
|
|
|
|
|
|
|
|
cache_key = generate_cache_key(request)
|
|
|
|
|
|
|
|
|
log('info', f"请求缓存键: {cache_key[:8]}...",
|
|
|
extra={'cache_key': cache_key[:8], 'request_type': 'non-stream'})
|
|
|
|
|
|
|
|
|
cached_response, cache_hit = response_cache_manager.get(cache_key)
|
|
|
if cache_hit:
|
|
|
|
|
|
log('info', f"精确缓存命中: {cache_key[:8]}...",
|
|
|
extra={'cache_operation': 'hit', 'request_type': 'non-stream'})
|
|
|
|
|
|
|
|
|
active_requests_manager.remove_by_prefix(f"cache:{cache_key}")
|
|
|
|
|
|
|
|
|
if cache_key in response_cache_manager.cache:
|
|
|
del response_cache_manager.cache[cache_key]
|
|
|
log('info', f"缓存使用后已删除: {cache_key[:8]}...",
|
|
|
extra={'cache_operation': 'used-and-removed', 'request_type': 'non-stream'})
|
|
|
|
|
|
|
|
|
return cached_response
|
|
|
|
|
|
|
|
|
pool_key = f"cache:{cache_key}"
|
|
|
|
|
|
|
|
|
active_task = active_requests_manager.get(pool_key)
|
|
|
if active_task and not active_task.done():
|
|
|
log('info', f"发现相同请求的进行中任务",
|
|
|
extra={'request_type': 'non-stream', 'model': request.model})
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
await asyncio.wait_for(active_task, timeout=180)
|
|
|
|
|
|
|
|
|
cached_response, cache_hit = response_cache_manager.get(cache_key)
|
|
|
if cache_hit:
|
|
|
|
|
|
if cache_key in response_cache_manager.cache:
|
|
|
del response_cache_manager.cache[cache_key]
|
|
|
log('info', f"使用已完成任务的缓存后删除: {cache_key[:8]}...",
|
|
|
extra={'cache_operation': 'used-and-removed', 'request_type': 'non-stream'})
|
|
|
|
|
|
return cached_response
|
|
|
|
|
|
|
|
|
if active_task.done() and not active_task.cancelled():
|
|
|
result = active_task.result()
|
|
|
if result:
|
|
|
|
|
|
|
|
|
new_response = ChatCompletionResponse(
|
|
|
id=f"chatcmpl-{int(time.time()*1000)}",
|
|
|
object="chat.completion",
|
|
|
created=int(time.time()),
|
|
|
model=result.model,
|
|
|
choices=result.choices
|
|
|
)
|
|
|
|
|
|
|
|
|
return new_response
|
|
|
except (asyncio.TimeoutError, asyncio.CancelledError) as e:
|
|
|
|
|
|
error_type = "超时" if isinstance(e, asyncio.TimeoutError) else "被取消"
|
|
|
log('warning', f"等待已有任务{error_type}: {pool_key}",
|
|
|
extra={'request_type': 'non-stream', 'model': request.model})
|
|
|
|
|
|
|
|
|
if active_task.done() or active_task.cancelled():
|
|
|
active_requests_manager.remove(pool_key)
|
|
|
log('info', f"已从活跃请求池移除{error_type}任务: {pool_key}",
|
|
|
extra={'request_type': 'non-stream'})
|
|
|
|
|
|
|
|
|
process_task = asyncio.create_task(
|
|
|
process_request(request, http_request, "non-stream", cache_key=cache_key, client_ip=client_ip)
|
|
|
)
|
|
|
|
|
|
|
|
|
active_requests_manager.add(pool_key, process_task)
|
|
|
|
|
|
|
|
|
try:
|
|
|
response = await process_task
|
|
|
return response
|
|
|
except Exception as e:
|
|
|
|
|
|
active_requests_manager.remove(pool_key)
|
|
|
|
|
|
|
|
|
cached_response, cache_hit = response_cache_manager.get(cache_key)
|
|
|
if cache_hit:
|
|
|
log('info', f"任务失败但找到缓存,使用缓存结果: {cache_key[:8]}...",
|
|
|
extra={'request_type': 'non-stream', 'model': request.model})
|
|
|
return cached_response
|
|
|
|
|
|
|
|
|
raise
|
|
|
|
|
|
|
|
|
async def process_request(chat_request: ChatCompletionRequest, http_request: Request, request_type: Literal['stream', 'non-stream'], cache_key: str = None, client_ip: str = None):
    """Core request processor for both streaming and non-streaming calls.

    Applies abuse protection and model validation, then iterates over the
    available API keys (outer loop); for each key up to 3 inner retries are
    allowed for transient server errors. Once every key is exhausted, a
    streaming request gets an SSE error payload and a non-streaming request
    gets a 500 HTTPException.
    """
    global current_api_key

    # Rate limiting (per-minute and per-day-per-IP).
    protect_from_abuse(
        http_request, MAX_REQUESTS_PER_MINUTE, MAX_REQUESTS_PER_DAY_PER_IP)
    if chat_request.model not in GeminiClient.AVAILABLE_MODELS:
        error_msg = "无效的模型"
        extra_log = {'request_type': request_type, 'model': chat_request.model, 'status_code': 400, 'error_message': error_msg}
        log('error', error_msg, extra=extra_log)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail=error_msg)

    # Begin a fresh key-rotation cycle for this request.
    key_manager.reset_tried_keys_for_request()

    # Convert OpenAI-style messages into Gemini contents + system instruction.
    contents, system_instruction = GeminiClient.convert_messages(
        GeminiClient, chat_request.messages)

    # Try each configured key at most once.
    retry_attempts = len(key_manager.api_keys) if key_manager.api_keys else 1

    for attempt in range(1, retry_attempts + 1):
        current_api_key = key_manager.get_available_key()

        if current_api_key is None:
            log('warning', "没有可用的 API 密钥,跳过本次尝试",
                extra={'request_type': request_type, 'model': chat_request.model, 'status_code': 'N/A'})
            break

        log('info', f"第 {attempt}/{retry_attempts} 次尝试 ... 使用密钥: {current_api_key[:8]}...",
            extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})

        # Inner retry loop: transient server errors retried on the same key.
        server_error_retries = 3
        for server_retry in range(1, server_error_retries + 1):
            try:
                if chat_request.stream:
                    try:
                        return await process_stream_request(
                            chat_request,
                            http_request,
                            contents,
                            system_instruction,
                            current_api_key
                        )
                    except Exception as e:
                        # Record the failure, then let the outer handler classify it.
                        error_detail = handle_gemini_error(e, current_api_key, key_manager)
                        log('error', f"流式请求失败: {error_detail}",
                            extra={'key': current_api_key[:8], 'request_type': 'stream', 'model': chat_request.model})
                        raise
                else:
                    return await process_nonstream_request(
                        chat_request,
                        http_request,
                        request_type,
                        contents,
                        system_instruction,
                        current_api_key,
                        cache_key,
                        client_ip
                    )
            except HTTPException as e:
                if e.status_code == status.HTTP_408_REQUEST_TIMEOUT:
                    # Client disconnected: stop retrying entirely.
                    log('error', "客户端连接中断",
                        extra={'key': current_api_key[:8], 'request_type': request_type,
                               'model': chat_request.model, 'status_code': 408})
                    raise
                else:
                    raise
            except Exception as e:
                # Classify the failure: retry same key, switch key, or give up.
                error_result = await handle_api_error(
                    e,
                    current_api_key,
                    key_manager,
                    request_type,
                    chat_request.model,
                    server_retry - 1
                )

                # An upstream error invalidates any cached partial result.
                if error_result.get('remove_cache', False) and cache_key and cache_key in response_cache_manager.cache:
                    log('info', f"因API错误,删除缓存: {cache_key[:8]}...",
                        extra={'cache_operation': 'remove-on-error', 'request_type': request_type})
                    del response_cache_manager.cache[cache_key]

                if error_result.get('should_retry', False):
                    continue  # retry with the same key
                elif error_result.get('should_switch_key', False) and attempt < retry_attempts:
                    log('info', f"API密钥 {current_api_key[:8]}... 失败,准备尝试下一个密钥",
                        extra={'key': current_api_key[:8], 'request_type': request_type})
                    break  # advance to the next key
                else:
                    # NOTE(review): this only exits the inner loop; the outer
                    # loop still moves on to the next key — confirm intended.
                    break

    # All keys exhausted without a successful response.
    msg = "所有API密钥均请求失败,请稍后重试"
    log('error', "API key 替换失败,所有API key都已尝试,请重新配置或稍后重试", extra={'key': 'N/A', 'request_type': 'switch_key', 'status_code': 'N/A'})

    if chat_request.stream:
        # Streaming clients receive the failure as an SSE payload.
        async def error_generator():
            error_json = json.dumps({'error': {'message': msg, 'type': 'api_error'}})
            yield f"data: {error_json}\n\n"
            yield "data: [DONE]\n\n"

        return StreamingResponse(error_generator(), media_type="text/event-stream")
    else:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=msg)
|
|
|
|
|
|
|
|
|
async def process_stream_request(
    chat_request: ChatCompletionRequest,
    http_request: Request,
    contents,
    system_instruction,
    current_api_key: str
) -> StreamingResponse:
    """Handle a streaming API request.

    Two modes:

    * ``FAKE_STREAMING`` truthy — run a non-streaming completion in a
      background task while a keep-alive task pushes empty delta chunks,
      then slice the final text into ~10 SSE chunks. Both tasks publish
      SSE strings to a shared queue; ``None`` is the end-of-stream sentinel.
    * otherwise — proxy Gemini's real streaming API chunk by chunk.
    """

    async def stream_response_generator():
        if FAKE_STREAMING:
            # Queue through which both background tasks publish SSE strings;
            # a None sentinel terminates the stream.
            queue = asyncio.Queue()
            keep_alive_task = None
            api_request_task = None

            try:
                async def keep_alive_sender():
                    """Emit empty delta chunks so the client connection stays open."""
                    try:
                        keep_alive_client = GeminiClient(current_api_key)

                        keep_alive_generator = keep_alive_client.stream_chat(
                            chat_request,
                            contents,
                            safety_settings_g2 if 'gemini-2.0-flash-exp' in chat_request.model else safety_settings,
                            system_instruction
                        )

                        async for line in keep_alive_generator:
                            # A bare newline from the upstream generator is
                            # treated as a heartbeat signal.
                            if line == "\n":
                                formatted_chunk = {
                                    "id": "chatcmpl-keepalive",
                                    "object": "chat.completion.chunk",
                                    "created": int(time.time()),
                                    "model": chat_request.model,
                                    "choices": [{"delta": {"content": ""}, "index": 0, "finish_reason": None}]
                                }

                                await queue.put(f"data: {json.dumps(formatted_chunk)}\n\n")
                    except asyncio.CancelledError:
                        log('info', "保持连接任务被取消",
                            extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})
                        raise
                    except Exception as e:
                        log('error', f"保持连接任务出错: {str(e)}",
                            extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})
                        # Unblock the consumer loop before propagating.
                        await queue.put(None)
                        raise

                async def api_request_handler():
                    """Run the real (non-streaming) completion, trying every key."""
                    success = False
                    try:
                        key_manager.reset_tried_keys_for_request()

                        # Randomize key order to spread load across keys.
                        available_keys = key_manager.api_keys.copy()
                        random.shuffle(available_keys)

                        for attempt, api_key in enumerate(available_keys, 1):
                            try:
                                log('info', f"假流式模式: 尝试API密钥 {api_key[:8]}... ({attempt}/{len(available_keys)})",
                                    extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})

                                non_stream_client = GeminiClient(api_key)

                                # complete_chat is blocking; off-load to a thread.
                                response_content = await asyncio.to_thread(
                                    non_stream_client.complete_chat,
                                    chat_request,
                                    contents,
                                    safety_settings_g2 if 'gemini-2.0-flash-exp' in chat_request.model else safety_settings,
                                    system_instruction
                                )

                                if response_content and response_content.text:
                                    log('info', f"假流式模式: API密钥 {api_key[:8]}... 成功获取响应",
                                        extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})

                                    # Slice the complete text into ~10 chunks to
                                    # imitate incremental streaming.
                                    full_text = response_content.text
                                    chunk_size = max(len(full_text) // 10, 1)

                                    for i in range(0, len(full_text), chunk_size):
                                        chunk = full_text[i:i+chunk_size]
                                        formatted_chunk = {
                                            "id": "chatcmpl-someid",
                                            "object": "chat.completion.chunk",
                                            "created": int(time.time()),
                                            "model": chat_request.model,
                                            "choices": [{"delta": {"role": "assistant", "content": chunk}, "index": 0, "finish_reason": None}]
                                        }

                                        await queue.put(f"data: {json.dumps(formatted_chunk)}\n\n")

                                    success = True

                                    from app.utils.stats import update_api_call_stats
                                    update_api_call_stats(api_call_stats, api_key)
                                    break
                                else:
                                    log('warning', f"假流式模式: API密钥 {api_key[:8]}... 返回空响应",
                                        extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})
                            except Exception as e:
                                # Record the failure and move on to the next key.
                                error_detail = handle_gemini_error(e, api_key, key_manager)
                                log('error', f"假流式模式: API密钥 {api_key[:8]}... 请求失败: {error_detail}",
                                    extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})

                        if not success:
                            error_msg = "所有API密钥均请求失败,请稍后重试"
                            log('error', error_msg,
                                extra={'key': 'ALL', 'request_type': 'fake-stream', 'model': chat_request.model})

                            # Deliver the failure inside the SSE stream itself.
                            error_json = {
                                "id": "chatcmpl-error",
                                "object": "chat.completion.chunk",
                                "created": int(time.time()),
                                "model": chat_request.model,
                                "choices": [{"delta": {"content": f"\n\n[错误: {error_msg}]"}, "index": 0, "finish_reason": "error"}]
                            }
                            await queue.put(f"data: {json.dumps(error_json)}\n\n")

                        await queue.put("data: [DONE]\n\n")

                        # End-of-stream sentinel for the consumer loop.
                        await queue.put(None)

                    except asyncio.CancelledError:
                        log('info', "API请求任务被取消",
                            extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})

                        await queue.put(None)
                        raise
                    except Exception as e:
                        log('error', f"API请求任务出错: {str(e)}",
                            extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})

                        error_json = {
                            "id": "chatcmpl-error",
                            "object": "chat.completion.chunk",
                            "created": int(time.time()),
                            "model": chat_request.model,
                            "choices": [{"delta": {"content": f"\n\n[错误: {str(e)}]"}, "index": 0, "finish_reason": "error"}]
                        }
                        await queue.put(f"data: {json.dumps(error_json)}\n\n")
                        await queue.put("data: [DONE]\n\n")

                        await queue.put(None)
                        raise

                keep_alive_task = asyncio.create_task(keep_alive_sender())

                api_request_task = asyncio.create_task(api_request_handler())

                # Consume from the queue until the None sentinel arrives.
                while True:
                    chunk = await queue.get()
                    if chunk is None:
                        break
                    yield chunk

                # The real request finished: stop the heartbeat task.
                if api_request_task.done() and not keep_alive_task.done():
                    keep_alive_task.cancel()

            except asyncio.CancelledError:
                log('info', "流式响应生成器被取消",
                    extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})

                if keep_alive_task and not keep_alive_task.done():
                    keep_alive_task.cancel()
                if api_request_task and not api_request_task.done():
                    api_request_task.cancel()
            except Exception as e:
                log('error', f"流式响应生成器出错: {str(e)}",
                    extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})

                if keep_alive_task and not keep_alive_task.done():
                    keep_alive_task.cancel()
                if api_request_task and not api_request_task.done():
                    api_request_task.cancel()

                error_json = {
                    "id": "chatcmpl-error",
                    "object": "chat.completion.chunk",
                    "created": int(time.time()),
                    "model": chat_request.model,
                    "choices": [{"delta": {"content": f"\n\n[错误: {str(e)}]"}, "index": 0, "finish_reason": "error"}]
                }
                yield f"data: {json.dumps(error_json)}\n\n"
                yield "data: [DONE]\n\n"
            finally:
                # Belt-and-braces: never leave background tasks running.
                if keep_alive_task and not keep_alive_task.done():
                    keep_alive_task.cancel()
                if api_request_task and not api_request_task.done():
                    api_request_task.cancel()
        else:
            # Real streaming: proxy Gemini's stream chunk by chunk.
            gemini_client = GeminiClient(current_api_key)
            success = False

            try:
                async for chunk in gemini_client.stream_chat(
                    chat_request,
                    contents,
                    safety_settings_g2 if 'gemini-2.0-flash-exp' in chat_request.model else safety_settings,
                    system_instruction
                ):
                    if not chunk:
                        continue

                    formatted_chunk = {
                        "id": "chatcmpl-someid",
                        "object": "chat.completion.chunk",
                        "created": int(time.time()),
                        "model": chat_request.model,
                        "choices": [{"delta": {"role": "assistant", "content": chunk}, "index": 0, "finish_reason": None}]
                    }
                    success = True
                    yield f"data: {json.dumps(formatted_chunk)}\n\n"

                # Only count the call once at least one chunk was delivered.
                if success:
                    from app.utils.stats import update_api_call_stats
                    update_api_call_stats(api_call_stats, current_api_key)

                yield "data: [DONE]\n\n"

            except asyncio.CancelledError:
                extra_log_cancel = {'key': current_api_key[:8], 'request_type': 'stream', 'model': chat_request.model, 'error_message': '客户端已断开连接'}
                log('info', "客户端连接已中断", extra=extra_log_cancel)
            except Exception as e:
                error_detail = handle_gemini_error(e, current_api_key, key_manager)
                log('error', f"流式请求失败: {error_detail}",
                    extra={'key': current_api_key[:8], 'request_type': 'stream', 'model': chat_request.model})

                error_json = {
                    "id": "chatcmpl-error",
                    "object": "chat.completion.chunk",
                    "created": int(time.time()),
                    "model": chat_request.model,
                    "choices": [{"delta": {"content": f"\n\n[错误: {error_detail}]"}, "index": 0, "finish_reason": "error"}]
                }
                yield f"data: {json.dumps(error_json)}\n\n"
                yield "data: [DONE]\n\n"

                # NOTE(review): re-raising after yielding inside a streaming
                # response lets process_request retry with another key, but the
                # client has already received [DONE] — confirm intended.
                raise e

    return StreamingResponse(stream_response_generator(), media_type="text/event-stream")
|
|
|
|
|
|
|
|
|
async def run_gemini_completion(
    gemini_client,
    chat_request: ChatCompletionRequest,
    contents,
    system_instruction,
    request_type: str,
    current_api_key: str
):
    """Run a non-streaming Gemini completion in a worker thread.

    The underlying thread task is wrapped in asyncio.shield() so that a
    cancellation of this coroutine (e.g. client disconnect) does not abort
    the upstream API call; on cancellation we still try to await the result.
    """
    # Function attribute used as a "log once" flag.
    # NOTE(review): this makes the success log fire at most once per process,
    # not once per request — confirm that is intended.
    run_fn = run_gemini_completion

    try:
        # complete_chat is blocking; off-load it to a thread.
        response_future = asyncio.create_task(
            asyncio.to_thread(
                gemini_client.complete_chat,
                chat_request,
                contents,
                safety_settings_g2 if 'gemini-2.0-flash-exp' in chat_request.model else safety_settings,
                system_instruction
            )
        )

        # Shield so outer cancellation does not kill the API call itself.
        response_content = await asyncio.shield(response_future)

        if not hasattr(run_fn, 'logged_complete'):
            log('info', "非流式请求成功完成", extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})
            run_fn.logged_complete = True
        return response_content
    except asyncio.CancelledError:
        # We were cancelled (typically client disconnect), but the shielded
        # API call may still be running — try to let it finish.
        if 'response_future' in locals() and not response_future.done():
            try:
                response_content = await asyncio.shield(response_future)
                log('info', "API请求在客户端断开后完成", extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})
                return response_content
            except Exception as e:
                extra_log_gemini_cancel = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model, 'error_message': f'API请求在客户端断开后失败: {str(e)}'}
                log('info', "API调用因客户端断开而失败", extra=extra_log_gemini_cancel)
                raise

        # Task never started or already finished: propagate the cancellation.
        extra_log_gemini_cancel = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model, 'error_message': '客户端断开导致API调用取消'}
        log('info', "API调用因客户端断开而取消", extra=extra_log_gemini_cancel)
        raise
|
|
|
|
|
|
|
|
|
async def check_client_disconnect(http_request: Request, current_api_key: str, request_type: str, model: str):
    """Poll the connection every 0.5 s; return True once the client is gone."""
    while not await http_request.is_disconnected():
        await asyncio.sleep(0.5)

    log('info', "客户端连接已中断,等待API请求完成",
        extra={'key': current_api_key[:8], 'request_type': request_type,
               'model': model, 'error_message': '检测到客户端断开连接'})
    return True
|
|
|
|
|
|
|
|
|
async def handle_client_disconnect(
    gemini_task: asyncio.Task,
    chat_request: ChatCompletionRequest,
    request_type: str,
    current_api_key: str,
    cache_key: str = None,
    client_ip: str = None
):
    """Finish an in-flight Gemini task after the client has disconnected.

    The task is shielded to completion so its result can still be turned into
    a response (which a deduplicated duplicate request may pick up). Empty or
    failed results clean up any stale cache entry and yield an error response.
    """
    try:
        # Let the upstream call run to completion even though the client left.
        response_content = await asyncio.shield(gemini_task)

        # Treat a missing or empty-text result as a failure.
        if response_content is None or response_content.text == "":
            if response_content is None:
                log('info', "客户端断开后API任务返回None",
                    extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})
            else:
                extra_log = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model, 'status_code': 204}
                log('info', "客户端断开后Gemini API 返回空响应", extra=extra_log)

            # An empty result must not be served from cache later.
            if cache_key and cache_key in response_cache_manager.cache:
                log('info', f"因空响应,删除缓存: {cache_key[:8]}...",
                    extra={'cache_operation': 'remove-on-empty', 'request_type': request_type})
                del response_cache_manager.cache[cache_key]

            return create_error_response(chat_request.model, "AI未返回任何内容,请重试")

        # A concurrent duplicate may already have cached a response; purge it
        # so the fresh result below is authoritative.
        cached_response, cache_hit = response_cache_manager.get(cache_key)
        if cache_hit:
            log('info', f"客户端断开但找到已存在缓存,将删除: {cache_key[:8]}...",
                extra={'cache_operation': 'disconnect-found-cache', 'request_type': request_type})

            if cache_key in response_cache_manager.cache:
                del response_cache_manager.cache[cache_key]

        from app.utils.response import create_response
        response = create_response(chat_request, response_content)

        return response
    except asyncio.CancelledError:
        log('info', "客户端断开后任务被取消,但我们仍会尝试完成",
            extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})

        # If the task finished despite the cancellation, salvage its result.
        if gemini_task.done() and not gemini_task.cancelled():
            try:
                response_content = gemini_task.result()

                cached_response, cache_hit = response_cache_manager.get(cache_key)
                if cache_hit:
                    log('info', f"任务被取消但找到已存在缓存,将删除: {cache_key[:8]}...",
                        extra={'cache_operation': 'cancel-found-cache', 'request_type': request_type})

                    if cache_key in response_cache_manager.cache:
                        del response_cache_manager.cache[cache_key]

                from app.utils.response import create_response
                response = create_response(chat_request, response_content)
                return response
            except Exception as inner_e:
                log('error', f"客户端断开后从已完成任务获取结果失败: {str(inner_e)}",
                    extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})

                if cache_key and cache_key in response_cache_manager.cache:
                    log('info', f"因任务获取结果失败,删除缓存: {cache_key[:8]}...",
                        extra={'cache_operation': 'remove-on-error', 'request_type': request_type})
                    del response_cache_manager.cache[cache_key]

        return create_error_response(chat_request.model, "请求处理过程中发生错误,请重试")
    except Exception as e:
        error_msg = str(e)
        extra_log = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model, 'error_message': error_msg}
        log('error', f"客户端断开后处理API响应时出错: {error_msg}", extra=extra_log)

        # Any error here invalidates a cached entry for this request.
        if cache_key and cache_key in response_cache_manager.cache:
            log('info', f"因API响应错误,删除缓存: {cache_key[:8]}...",
                extra={'cache_operation': 'remove-on-error', 'request_type': request_type})
            del response_cache_manager.cache[cache_key]

        return create_error_response(chat_request.model, f"请求处理错误: {error_msg}")
|
|
|
|
|
|
|
|
|
async def process_nonstream_request(
    chat_request: ChatCompletionRequest,
    http_request: Request,
    request_type: str,
    contents,
    system_instruction,
    current_api_key: str,
    cache_key: str = None,
    client_ip: str = None
):
    """Handle a non-streaming request, racing the API call against disconnect.

    Runs the Gemini completion and a client-disconnect poller concurrently.
    If the client disconnects first, handle_client_disconnect() finishes the
    call; otherwise the result is returned (and briefly cached so concurrent
    duplicate requests can pick it up — the cache is single-use).
    """
    gemini_client = GeminiClient(current_api_key)

    # Upstream completion runs as its own task so it can be raced below.
    gemini_task = asyncio.create_task(
        run_gemini_completion(
            gemini_client,
            chat_request,
            contents,
            system_instruction,
            request_type,
            current_api_key
        )
    )

    # Poller that resolves as soon as the client goes away.
    disconnect_task = asyncio.create_task(
        check_client_disconnect(
            http_request,
            current_api_key,
            request_type,
            chat_request.model
        )
    )

    try:
        done, pending = await asyncio.wait(
            [gemini_task, disconnect_task],
            return_when=asyncio.FIRST_COMPLETED
        )

        if disconnect_task in done:
            # Client left first: let the dedicated handler finish the API call.
            return await handle_client_disconnect(
                gemini_task,
                chat_request,
                request_type,
                current_api_key,
                cache_key,
                client_ip
            )
        else:
            # API finished first: the poller is no longer needed.
            disconnect_task.cancel()

            response_content = await gemini_task

            # Prefer an existing cache entry (a concurrent duplicate may have
            # completed first); cache entries are single-use.
            cached_response, cache_hit = response_cache_manager.get(cache_key)
            if cache_hit:
                log('info', f"缓存已存在,直接返回: {cache_key[:8]}...",
                    extra={'cache_operation': 'use-existing', 'request_type': request_type})

                if cache_key in response_cache_manager.cache:
                    del response_cache_manager.cache[cache_key]
                    log('info', f"缓存使用后已删除: {cache_key[:8]}...",
                        extra={'cache_operation': 'used-and-removed', 'request_type': request_type})

                return cached_response

            from app.utils.response import create_response
            response = create_response(chat_request, response_content)

            # Store the response (this also updates per-key call stats) ...
            cache_response(response, cache_key, client_ip, response_cache_manager, update_api_call_stats, api_key=current_api_key)

            # ... then immediately evict it. NOTE(review): store-then-remove
            # appears to exist only for cache_response's side effects (stats) —
            # confirm no concurrent reader depends on the entry surviving.
            if cache_key and cache_key in response_cache_manager.cache:
                del response_cache_manager.cache[cache_key]
                log('info', f"缓存创建后立即删除: {cache_key[:8]}...",
                    extra={'cache_operation': 'store-and-remove', 'request_type': request_type})

            return response

    except asyncio.CancelledError:
        extra_log = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model, 'error_message': "请求被取消"}
        log('info', "请求取消", extra=extra_log)

        # A concurrent duplicate may have produced a usable cached response.
        cached_response, cache_hit = response_cache_manager.get(cache_key)
        if cache_hit:
            log('info', f"请求取消但找到有效缓存,使用缓存响应: {cache_key[:8]}...",
                extra={'cache_operation': 'use-cache-on-cancel', 'request_type': request_type})

            if cache_key in response_cache_manager.cache:
                del response_cache_manager.cache[cache_key]
                log('info', f"缓存使用后已删除: {cache_key[:8]}...",
                    extra={'cache_operation': 'used-and-removed', 'request_type': request_type})

            return cached_response

        if not gemini_task.done():
            log('info', "请求取消但API请求尚未完成,继续等待...",
                extra={'key': current_api_key[:8], 'request_type': request_type})

            # Shield the remaining work from this cancellation and still
            # deliver a response.
            response_content = await asyncio.shield(gemini_task)

            from app.utils.response import create_response
            response = create_response(chat_request, response_content)

            return response
        else:
            # Task already completed: use its result directly.
            response_content = gemini_task.result()

            from app.utils.response import create_response
            response = create_response(chat_request, response_content)

            return response

    except HTTPException as e:
        if e.status_code == status.HTTP_408_REQUEST_TIMEOUT:
            extra_log = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model,
                         'status_code': 408, 'error_message': '客户端连接中断'}
            log('error', "客户端连接中断,终止后续重试", extra=extra_log)
            raise
        else:
            raise