|
|
from fastapi import APIRouter, HTTPException, Request, Depends, status |
|
|
from fastapi.responses import JSONResponse, StreamingResponse |
|
|
from app.models import ChatCompletionRequest, ChatCompletionResponse, ErrorResponse, ModelList |
|
|
from app.services import GeminiClient, ResponseWrapper |
|
|
from app.utils import ( |
|
|
handle_gemini_error, |
|
|
protect_from_abuse, |
|
|
APIKeyManager, |
|
|
test_api_key, |
|
|
format_log_message, |
|
|
log_manager, |
|
|
generate_cache_key, |
|
|
cache_response, |
|
|
create_chat_response, |
|
|
create_error_response, |
|
|
handle_api_error, |
|
|
update_api_call_stats |
|
|
) |
|
|
import json |
|
|
import asyncio |
|
|
import time |
|
|
import logging |
|
|
import random |
|
|
from typing import Literal |
|
|
from app.config.settings import ( |
|
|
api_call_stats |
|
|
) |
|
|
|
|
|
logger = logging.getLogger("my_logger")

router = APIRouter()

# Module-level shared state. All of these start as None and are installed by
# init_router() at application startup; route handlers read them afterwards.
key_manager = None                    # APIKeyManager: rotates/tracks Gemini API keys
response_cache_manager = None         # cache of finished responses, keyed by request hash
response_cache_manager = response_cache_manager  # (no-op removed — see note)
active_requests_manager = None        # pool of in-flight tasks, dedupes identical requests
safety_settings = None                # default Gemini safety settings
safety_settings_g2 = None             # safety settings used for 'gemini-2.0-flash-exp' models
current_api_key = None                # key used by the most recent upstream attempt
FAKE_STREAMING = None                 # truthy: emulate streaming over a non-stream API call
FAKE_STREAMING_INTERVAL = None        # keep-alive cadence for fake streaming — TODO confirm usage
PASSWORD = None                       # bearer token checked by verify_password (falsy disables auth)
MAX_REQUESTS_PER_MINUTE = None        # per-minute limit handed to protect_from_abuse
MAX_REQUESTS_PER_DAY_PER_IP = None    # per-IP daily limit handed to protect_from_abuse
|
|
|
|
|
|
|
|
def init_router(
    _key_manager,
    _response_cache_manager,
    _active_requests_manager,
    _safety_settings,
    _safety_settings_g2,
    _current_api_key,
    _fake_streaming,
    _fake_streaming_interval,
    _password,
    _max_requests_per_minute,
    _max_requests_per_day_per_ip
):
    """Install the application's shared dependencies into this module.

    Called once at startup. Copies every argument into the matching
    module-level global that the route handlers read at request time.
    """
    global key_manager, response_cache_manager, active_requests_manager
    global safety_settings, safety_settings_g2, current_api_key
    global FAKE_STREAMING, FAKE_STREAMING_INTERVAL
    global PASSWORD, MAX_REQUESTS_PER_MINUTE, MAX_REQUESTS_PER_DAY_PER_IP

    # One grouped assignment keeps globals and parameters visibly aligned.
    (key_manager, response_cache_manager, active_requests_manager,
     safety_settings, safety_settings_g2, current_api_key,
     FAKE_STREAMING, FAKE_STREAMING_INTERVAL,
     PASSWORD, MAX_REQUESTS_PER_MINUTE, MAX_REQUESTS_PER_DAY_PER_IP) = (
        _key_manager, _response_cache_manager, _active_requests_manager,
        _safety_settings, _safety_settings_g2, _current_api_key,
        _fake_streaming, _fake_streaming_interval,
        _password, _max_requests_per_minute, _max_requests_per_day_per_ip,
    )
|
|
|
|
|
|
|
|
def log(level: str, message: str, **extra):
    """Unified logging helper: format *message* and emit it at *level*.

    Any keyword arguments are forwarded as the ``extra`` payload to
    format_log_message; *level* selects the logger method by name.
    """
    emit = getattr(logger, level.lower())
    emit(format_log_message(level.upper(), message, extra=extra))
|
|
|
|
|
|
|
|
async def verify_password(request: Request):
    """FastAPI dependency enforcing the configured bearer-token password.

    No-op when PASSWORD is unset/empty. Otherwise the request must carry an
    ``Authorization: Bearer <token>`` header whose token equals PASSWORD;
    anything else raises HTTPException(401).
    """
    if not PASSWORD:
        return
    auth_header = request.headers.get("Authorization")
    if not auth_header or not auth_header.startswith("Bearer "):
        raise HTTPException(
            status_code=401, detail="Unauthorized: Missing or invalid token")
    if auth_header.split(" ")[1] != PASSWORD:
        raise HTTPException(
            status_code=401, detail="Unauthorized: Invalid token")
|
|
|
|
|
|
|
|
@router.get("/v1/models", response_model=ModelList) |
|
|
def list_models(): |
|
|
log('info', "Received request to list models", extra={'request_type': 'list_models', 'status_code': 200}) |
|
|
return ModelList(data=[{"id": model, "object": "model", "created": 1678888888, "owned_by": "organization-owner"} for model in GeminiClient.AVAILABLE_MODELS]) |
|
|
|
|
|
@router.post("/v1/chat/completions", response_model=ChatCompletionResponse)
async def chat_completions(request: ChatCompletionRequest, http_request: Request, _: None = Depends(verify_password)):
    """OpenAI-compatible chat-completions endpoint.

    Streaming requests go straight to process_request(). Non-stream requests
    are deduplicated: identical requests (same cache key) share one in-flight
    task, and finished responses are served from — and then evicted from —
    the single-use response cache.
    """
    client_ip = http_request.client.host if http_request.client else "unknown"

    # Streaming path: no cache / dedup handling.
    if request.stream:
        return await process_request(request, http_request, "stream")

    # Key identifying semantically identical requests; used for both the
    # response cache and the active-request pool.
    cache_key = generate_cache_key(request)

    log('info', f"请求缓存键: {cache_key[:8]}...",
        extra={'cache_key': cache_key[:8], 'request_type': 'non-stream'})

    # Fast path: a previous identical request already produced a response.
    cached_response, cache_hit = response_cache_manager.get(cache_key)
    if cache_hit:
        log('info', f"精确缓存命中: {cache_key[:8]}...",
            extra={'cache_operation': 'hit', 'request_type': 'non-stream'})

        # Drop any in-flight task registered under the same key.
        active_requests_manager.remove_by_prefix(f"cache:{cache_key}")

        # Cache entries are single-use: evict after serving.
        if cache_key in response_cache_manager.cache:
            del response_cache_manager.cache[cache_key]
            log('info', f"缓存使用后已删除: {cache_key[:8]}...",
                extra={'cache_operation': 'used-and-removed', 'request_type': 'non-stream'})

        return cached_response

    pool_key = f"cache:{cache_key}"

    # Dedup path: an identical request is already being processed — wait for
    # its task instead of issuing a second upstream call.
    active_task = active_requests_manager.get(pool_key)
    if active_task and not active_task.done():
        log('info', f"发现相同请求的进行中任务",
            extra={'request_type': 'non-stream', 'model': request.model})

        try:
            # Wait up to 3 minutes for the in-flight task to finish.
            await asyncio.wait_for(active_task, timeout=180)

            # Prefer the cache entry the task should have written.
            cached_response, cache_hit = response_cache_manager.get(cache_key)
            if cache_hit:
                if cache_key in response_cache_manager.cache:
                    del response_cache_manager.cache[cache_key]
                    log('info', f"使用已完成任务的缓存后删除: {cache_key[:8]}...",
                        extra={'cache_operation': 'used-and-removed', 'request_type': 'non-stream'})

                return cached_response

            # No cache entry: fall back to the task's return value, rewrapped
            # with a fresh id/timestamp so the response looks newly minted.
            if active_task.done() and not active_task.cancelled():
                result = active_task.result()
                if result:
                    new_response = ChatCompletionResponse(
                        id=f"chatcmpl-{int(time.time()*1000)}",
                        object="chat.completion",
                        created=int(time.time()),
                        model=result.model,
                        choices=result.choices
                    )

                    return new_response
        except (asyncio.TimeoutError, asyncio.CancelledError) as e:
            error_type = "超时" if isinstance(e, asyncio.TimeoutError) else "被取消"
            log('warning', f"等待已有任务{error_type}: {pool_key}",
                extra={'request_type': 'non-stream', 'model': request.model})

            # Evict the dead task so a fresh one can be registered below.
            if active_task.done() or active_task.cancelled():
                active_requests_manager.remove(pool_key)
                log('info', f"已从活跃请求池移除{error_type}任务: {pool_key}",
                    extra={'request_type': 'non-stream'})

    # No usable cache or in-flight task: start processing, and register the
    # task so concurrent identical requests can piggyback on it.
    process_task = asyncio.create_task(
        process_request(request, http_request, "non-stream", cache_key=cache_key, client_ip=client_ip)
    )

    active_requests_manager.add(pool_key, process_task)

    try:
        response = await process_task
        return response
    except Exception as e:
        # Unregister the failed task, then fall back to any cache entry a
        # partially-completed run may have written before failing.
        active_requests_manager.remove(pool_key)

        cached_response, cache_hit = response_cache_manager.get(cache_key)
        if cache_hit:
            log('info', f"任务失败但找到缓存,使用缓存结果: {cache_key[:8]}...",
                extra={'request_type': 'non-stream', 'model': request.model})
            return cached_response

        raise
|
|
|
|
|
|
|
|
async def process_request(chat_request: ChatCompletionRequest, http_request: Request, request_type: Literal['stream', 'non-stream'], cache_key: str = None, client_ip: str = None):
    """Main request pipeline: rate-limit, validate, then call Gemini.

    Iterates over the configured API keys (one outer attempt per key) and,
    per key, retries server-side failures up to three times. Dispatches to
    the stream or non-stream handler. If every key fails, returns an SSE
    error stream (stream mode) or raises HTTP 500 (non-stream mode).
    """
    global current_api_key

    # Per-IP abuse protection; raises when limits are exceeded.
    protect_from_abuse(
        http_request, MAX_REQUESTS_PER_MINUTE, MAX_REQUESTS_PER_DAY_PER_IP)
    if chat_request.model not in GeminiClient.AVAILABLE_MODELS:
        error_msg = "无效的模型"
        extra_log = {'request_type': request_type, 'model': chat_request.model, 'status_code': 400, 'error_message': error_msg}
        log('error', error_msg, extra=extra_log)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail=error_msg)

    # Start this request with a clean "already tried" key set.
    key_manager.reset_tried_keys_for_request()

    # Convert OpenAI-style messages into Gemini contents + system prompt.
    contents, system_instruction = GeminiClient.convert_messages(
        GeminiClient, chat_request.messages)

    # One outer attempt per configured API key (at least one attempt).
    retry_attempts = len(key_manager.api_keys) if key_manager.api_keys else 1

    for attempt in range(1, retry_attempts + 1):
        current_api_key = key_manager.get_available_key()

        if current_api_key is None:
            # Key pool exhausted — stop retrying entirely.
            log('warning', "没有可用的 API 密钥,跳过本次尝试",
                extra={'request_type': request_type, 'model': chat_request.model, 'status_code': 'N/A'})
            break

        log('info', f"第 {attempt}/{retry_attempts} 次尝试 ... 使用密钥: {current_api_key[:8]}...",
            extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})

        # Inner loop: retry transient server errors with the SAME key.
        server_error_retries = 3
        for server_retry in range(1, server_error_retries + 1):
            try:
                if chat_request.stream:
                    try:
                        return await process_stream_request(
                            chat_request,
                            http_request,
                            contents,
                            system_instruction,
                            current_api_key
                        )
                    except Exception as e:
                        # Log with full detail, then re-raise so the outer
                        # handler can decide retry / key switch.
                        error_detail = handle_gemini_error(e, current_api_key, key_manager)
                        log('error', f"流式请求失败: {error_detail}",
                            extra={'key': current_api_key[:8], 'request_type': 'stream', 'model': chat_request.model})

                        raise
                else:
                    return await process_nonstream_request(
                        chat_request,
                        http_request,
                        request_type,
                        contents,
                        system_instruction,
                        current_api_key,
                        cache_key,
                        client_ip
                    )
            except HTTPException as e:
                if e.status_code == status.HTTP_408_REQUEST_TIMEOUT:
                    # Client went away — no point retrying; propagate.
                    log('error', "客户端连接中断",
                        extra={'key': current_api_key[:8], 'request_type': request_type,
                               'model': chat_request.model, 'status_code': 408})
                    raise
                else:
                    raise
            except Exception as e:
                # Classify the failure: retry same key, switch key, or stop.
                error_result = await handle_api_error(
                    e,
                    current_api_key,
                    key_manager,
                    request_type,
                    chat_request.model,
                    server_retry - 1
                )

                # A poisoned cache entry must not survive the error.
                if error_result.get('remove_cache', False) and cache_key and cache_key in response_cache_manager.cache:
                    log('info', f"因API错误,删除缓存: {cache_key[:8]}...",
                        extra={'cache_operation': 'remove-on-error', 'request_type': request_type})
                    del response_cache_manager.cache[cache_key]

                if error_result.get('should_retry', False):
                    # Transient server error: retry with the same key.
                    continue
                elif error_result.get('should_switch_key', False) and attempt < retry_attempts:
                    log('info', f"API密钥 {current_api_key[:8]}... 失败,准备尝试下一个密钥",
                        extra={'key': current_api_key[:8], 'request_type': request_type})
                    break
                else:
                    # NOTE(review): this break exits only the inner retry
                    # loop, so the outer loop still advances to the next key
                    # — confirm that falling through to another key (rather
                    # than aborting) is the intended behavior here.
                    break

    # All keys/attempts exhausted without a successful response.
    msg = "所有API密钥均请求失败,请稍后重试"
    log('error', "API key 替换失败,所有API key都已尝试,请重新配置或稍后重试", extra={'key': 'N/A', 'request_type': 'switch_key', 'status_code': 'N/A'})

    if chat_request.stream:
        # Streaming clients expect SSE, so deliver the failure as a stream.
        async def error_generator():
            error_json = json.dumps({'error': {'message': msg, 'type': 'api_error'}})
            yield f"data: {error_json}\n\n"
            yield "data: [DONE]\n\n"

        return StreamingResponse(error_generator(), media_type="text/event-stream")
    else:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=msg)
|
|
|
|
|
|
|
|
async def process_stream_request(
    chat_request: ChatCompletionRequest,
    http_request: Request,
    contents,
    system_instruction,
    current_api_key: str
) -> StreamingResponse:
    """Handle a streaming API request.

    Two modes, chosen by the FAKE_STREAMING flag:
    * fake streaming: run a normal (non-stream) Gemini call in a background
      task while a keep-alive task feeds empty SSE chunks, then slice the
      finished text into ~10 SSE chunks;
    * real streaming: proxy Gemini's stream chunk-by-chunk as SSE.
    """

    async def stream_response_generator():

        if FAKE_STREAMING:
            # Channel between the two background tasks and this generator;
            # a None item is the end-of-stream sentinel.
            queue = asyncio.Queue()
            keep_alive_task = None
            api_request_task = None

            try:

                async def keep_alive_sender():
                    """Forward Gemini keep-alive newlines as empty SSE deltas."""
                    try:
                        keep_alive_client = GeminiClient(current_api_key)

                        # Safety profile chosen by model family.
                        keep_alive_generator = keep_alive_client.stream_chat(
                            chat_request,
                            contents,
                            safety_settings_g2 if 'gemini-2.0-flash-exp' in chat_request.model else safety_settings,
                            system_instruction
                        )

                        async for line in keep_alive_generator:
                            if line == "\n":
                                # Empty delta keeps the client connection open.
                                formatted_chunk = {
                                    "id": "chatcmpl-keepalive",
                                    "object": "chat.completion.chunk",
                                    "created": int(time.time()),
                                    "model": chat_request.model,
                                    "choices": [{"delta": {"content": ""}, "index": 0, "finish_reason": None}]
                                }

                                await queue.put(f"data: {json.dumps(formatted_chunk)}\n\n")
                    except asyncio.CancelledError:
                        log('info', "保持连接任务被取消",
                            extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})
                        raise
                    except Exception as e:
                        log('error', f"保持连接任务出错: {str(e)}",
                            extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})

                        # Unblock the consumer loop before propagating.
                        await queue.put(None)
                        raise

                async def api_request_handler():
                    """Run the real (non-stream) completion, trying each key in turn."""
                    success = False
                    try:
                        key_manager.reset_tried_keys_for_request()

                        # Try the keys in random order until one succeeds.
                        available_keys = key_manager.api_keys.copy()
                        random.shuffle(available_keys)

                        for attempt, api_key in enumerate(available_keys, 1):
                            try:
                                log('info', f"假流式模式: 尝试API密钥 {api_key[:8]}... ({attempt}/{len(available_keys)})",
                                    extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})

                                non_stream_client = GeminiClient(api_key)

                                # complete_chat is blocking — run in a thread so
                                # the event loop (and keep-alive) stays live.
                                response_content = await asyncio.to_thread(
                                    non_stream_client.complete_chat,
                                    chat_request,
                                    contents,
                                    safety_settings_g2 if 'gemini-2.0-flash-exp' in chat_request.model else safety_settings,
                                    system_instruction
                                )

                                if response_content and response_content.text:
                                    log('info', f"假流式模式: API密钥 {api_key[:8]}... 成功获取响应",
                                        extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})

                                    # Slice the finished text into ~10 SSE chunks.
                                    full_text = response_content.text
                                    chunk_size = max(len(full_text) // 10, 1)

                                    for i in range(0, len(full_text), chunk_size):
                                        chunk = full_text[i:i+chunk_size]
                                        formatted_chunk = {
                                            "id": "chatcmpl-someid",
                                            "object": "chat.completion.chunk",
                                            "created": int(time.time()),
                                            "model": chat_request.model,
                                            "choices": [{"delta": {"role": "assistant", "content": chunk}, "index": 0, "finish_reason": None}]
                                        }

                                        await queue.put(f"data: {json.dumps(formatted_chunk)}\n\n")

                                    success = True

                                    # Record the successful upstream call.
                                    from app.utils.stats import update_api_call_stats
                                    update_api_call_stats(api_call_stats,api_key)
                                    break
                                else:
                                    log('warning', f"假流式模式: API密钥 {api_key[:8]}... 返回空响应",
                                        extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})
                            except Exception as e:
                                # Record the failure and move on to the next key.
                                error_detail = handle_gemini_error(e, api_key, key_manager)
                                log('error', f"假流式模式: API密钥 {api_key[:8]}... 请求失败: {error_detail}",
                                    extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})

                        if not success:
                            # Every key failed — report the error as a chunk.
                            error_msg = "所有API密钥均请求失败,请稍后重试"
                            log('error', error_msg,
                                extra={'key': 'ALL', 'request_type': 'fake-stream', 'model': chat_request.model})

                            error_json = {
                                "id": "chatcmpl-error",
                                "object": "chat.completion.chunk",
                                "created": int(time.time()),
                                "model": chat_request.model,
                                "choices": [{"delta": {"content": f"\n\n[错误: {error_msg}]"}, "index": 0, "finish_reason": "error"}]
                            }
                            await queue.put(f"data: {json.dumps(error_json)}\n\n")

                        await queue.put("data: [DONE]\n\n")

                        # Sentinel: tells the consumer loop to stop.
                        await queue.put(None)

                    except asyncio.CancelledError:
                        log('info', "API请求任务被取消",
                            extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})

                        await queue.put(None)
                        raise
                    except Exception as e:
                        log('error', f"API请求任务出错: {str(e)}",
                            extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})

                        error_json = {
                            "id": "chatcmpl-error",
                            "object": "chat.completion.chunk",
                            "created": int(time.time()),
                            "model": chat_request.model,
                            "choices": [{"delta": {"content": f"\n\n[错误: {str(e)}]"}, "index": 0, "finish_reason": "error"}]
                        }
                        await queue.put(f"data: {json.dumps(error_json)}\n\n")
                        await queue.put("data: [DONE]\n\n")

                        await queue.put(None)
                        raise

                keep_alive_task = asyncio.create_task(keep_alive_sender())

                api_request_task = asyncio.create_task(api_request_handler())

                # Consume queue items until the None sentinel arrives.
                while True:
                    chunk = await queue.get()
                    if chunk is None:
                        break
                    yield chunk

                    # Once the real response has finished, the keep-alive
                    # chunks are no longer needed.
                    if api_request_task.done() and not keep_alive_task.done():
                        keep_alive_task.cancel()

            except asyncio.CancelledError:
                log('info', "流式响应生成器被取消",
                    extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})

                if keep_alive_task and not keep_alive_task.done():
                    keep_alive_task.cancel()
                if api_request_task and not api_request_task.done():
                    api_request_task.cancel()
            except Exception as e:
                log('error', f"流式响应生成器出错: {str(e)}",
                    extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})

                if keep_alive_task and not keep_alive_task.done():
                    keep_alive_task.cancel()
                if api_request_task and not api_request_task.done():
                    api_request_task.cancel()

                error_json = {
                    "id": "chatcmpl-error",
                    "object": "chat.completion.chunk",
                    "created": int(time.time()),
                    "model": chat_request.model,
                    "choices": [{"delta": {"content": f"\n\n[错误: {str(e)}]"}, "index": 0, "finish_reason": "error"}]
                }
                yield f"data: {json.dumps(error_json)}\n\n"
                yield "data: [DONE]\n\n"
            finally:
                # Belt-and-braces cleanup of both background tasks.
                if keep_alive_task and not keep_alive_task.done():
                    keep_alive_task.cancel()
                if api_request_task and not api_request_task.done():
                    api_request_task.cancel()
        else:
            # Real streaming: proxy Gemini's chunks directly as SSE.
            gemini_client = GeminiClient(current_api_key)
            success = False

            try:
                async for chunk in gemini_client.stream_chat(
                    chat_request,
                    contents,
                    safety_settings_g2 if 'gemini-2.0-flash-exp' in chat_request.model else safety_settings,
                    system_instruction
                ):
                    # Skip empty chunks rather than emitting blank deltas.
                    if not chunk:
                        continue

                    formatted_chunk = {
                        "id": "chatcmpl-someid",
                        "object": "chat.completion.chunk",
                        "created": int(time.time()),
                        "model": chat_request.model,
                        "choices": [{"delta": {"role": "assistant", "content": chunk}, "index": 0, "finish_reason": None}]
                    }
                    success = True
                    yield f"data: {json.dumps(formatted_chunk)}\n\n"

                # Count the call only if at least one chunk was delivered.
                if success:
                    from app.utils.stats import update_api_call_stats
                    update_api_call_stats(api_call_stats, current_api_key)

                yield "data: [DONE]\n\n"

            except asyncio.CancelledError:
                extra_log_cancel = {'key': current_api_key[:8], 'request_type': 'stream', 'model': chat_request.model, 'error_message': '客户端已断开连接'}
                log('info', "客户端连接已中断", extra=extra_log_cancel)
            except Exception as e:
                error_detail = handle_gemini_error(e, current_api_key, key_manager)
                log('error', f"流式请求失败: {error_detail}",
                    extra={'key': current_api_key[:8], 'request_type': 'stream', 'model': chat_request.model})

                # Surface the failure to the client before re-raising so the
                # caller (process_request) can decide on a retry.
                error_json = {
                    "id": "chatcmpl-error",
                    "object": "chat.completion.chunk",
                    "created": int(time.time()),
                    "model": chat_request.model,
                    "choices": [{"delta": {"content": f"\n\n[错误: {error_detail}]"}, "index": 0, "finish_reason": "error"}]
                }
                yield f"data: {json.dumps(error_json)}\n\n"
                yield "data: [DONE]\n\n"

                raise e

    return StreamingResponse(stream_response_generator(), media_type="text/event-stream")
|
|
|
|
|
|
|
|
async def run_gemini_completion(
    gemini_client,
    chat_request: ChatCompletionRequest,
    contents,
    system_instruction,
    request_type: str,
    current_api_key: str
):
    """Run a non-streaming Gemini completion in a worker thread.

    The upstream call is wrapped in asyncio.shield() so that cancelling this
    coroutine (e.g. on client disconnect) does not abort the in-flight API
    request; on cancellation we still try to await and return its result.
    """

    # Function attribute acts as a once-per-process flag: the "completed"
    # log line is emitted only for the first successful call.
    # NOTE(review): presumably intentional log de-noising — confirm, since
    # it means later completions are never logged.
    run_fn = run_gemini_completion

    try:
        # complete_chat is blocking; run it in a thread so the event loop
        # stays responsive.
        response_future = asyncio.create_task(
            asyncio.to_thread(
                gemini_client.complete_chat,
                chat_request,
                contents,
                safety_settings_g2 if 'gemini-2.0-flash-exp' in chat_request.model else safety_settings,
                system_instruction
            )
        )

        # shield(): our cancellation does not propagate to response_future.
        response_content = await asyncio.shield(response_future)

        if not hasattr(run_fn, 'logged_complete'):
            log('info', "非流式请求成功完成", extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})
            run_fn.logged_complete = True
        return response_content
    except asyncio.CancelledError:
        # We were cancelled (client disconnect) — still try to finish the
        # in-flight API call so its result can be cached / served.
        if 'response_future' in locals() and not response_future.done():
            try:
                response_content = await asyncio.shield(response_future)
                log('info', "API请求在客户端断开后完成", extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})
                return response_content
            except Exception as e:
                extra_log_gemini_cancel = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model, 'error_message': f'API请求在客户端断开后失败: {str(e)}'}
                log('info', "API调用因客户端断开而失败", extra=extra_log_gemini_cancel)
                raise

        # The future was already done (or never created): just propagate.
        extra_log_gemini_cancel = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model, 'error_message': '客户端断开导致API调用取消'}
        log('info', "API调用因客户端断开而取消", extra=extra_log_gemini_cancel)
        raise
|
|
|
|
|
|
|
|
async def check_client_disconnect(http_request: Request, current_api_key: str, request_type: str, model: str):
    """Poll the HTTP connection and return True once the client disconnects.

    Checks ``http_request.is_disconnected()`` every 0.5 seconds and never
    returns until a disconnect is observed.
    """
    while not await http_request.is_disconnected():
        await asyncio.sleep(0.5)
    extra_log = {'key': current_api_key[:8], 'request_type': request_type, 'model': model, 'error_message': '检测到客户端断开连接'}
    log('info', "客户端连接已中断,等待API请求完成", extra=extra_log)
    return True
|
|
|
|
|
|
|
|
async def handle_client_disconnect(
    gemini_task: asyncio.Task,
    chat_request: ChatCompletionRequest,
    request_type: str,
    current_api_key: str,
    cache_key: str = None,
    client_ip: str = None
):
    """Finish an upstream request after its client has disconnected.

    Awaits the shielded gemini_task to completion so the response can still
    be produced (and any stale cache entry evicted). Returns a normal
    response on success, or an error-shaped response on failure; never
    re-raises to the (already gone) client.
    """
    try:
        # Keep the upstream call alive despite our own cancellation.
        response_content = await asyncio.shield(gemini_task)

        # Empty / missing responses are reported as errors, and any cache
        # entry for this request is dropped so it cannot be served later.
        if response_content is None or response_content.text == "":
            if response_content is None:
                log('info', "客户端断开后API任务返回None",
                    extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})
            else:
                extra_log = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model, 'status_code': 204}
                log('info', "客户端断开后Gemini API 返回空响应", extra=extra_log)

            if cache_key and cache_key in response_cache_manager.cache:
                log('info', f"因空响应,删除缓存: {cache_key[:8]}...",
                    extra={'cache_operation': 'remove-on-empty', 'request_type': request_type})
                del response_cache_manager.cache[cache_key]

            return create_error_response(chat_request.model, "AI未返回任何内容,请重试")

        # A stale cache entry for this key is evicted; the fresh response
        # built below is returned instead.
        cached_response, cache_hit = response_cache_manager.get(cache_key)
        if cache_hit:
            log('info', f"客户端断开但找到已存在缓存,将删除: {cache_key[:8]}...",
                extra={'cache_operation': 'disconnect-found-cache', 'request_type': request_type})

            if cache_key in response_cache_manager.cache:
                del response_cache_manager.cache[cache_key]

        # Local import mirrors the rest of the file's late-import style.
        from app.utils.response import create_response
        response = create_response(chat_request, response_content)

        return response
    except asyncio.CancelledError:
        # Even a cancelled wrapper may wrap an upstream task that finished.
        log('info', "客户端断开后任务被取消,但我们仍会尝试完成",
            extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})

        if gemini_task.done() and not gemini_task.cancelled():
            try:
                response_content = gemini_task.result()

                cached_response, cache_hit = response_cache_manager.get(cache_key)
                if cache_hit:
                    log('info', f"任务被取消但找到已存在缓存,将删除: {cache_key[:8]}...",
                        extra={'cache_operation': 'cancel-found-cache', 'request_type': request_type})

                    if cache_key in response_cache_manager.cache:
                        del response_cache_manager.cache[cache_key]

                from app.utils.response import create_response
                response = create_response(chat_request, response_content)
                return response
            except Exception as inner_e:
                log('error', f"客户端断开后从已完成任务获取结果失败: {str(inner_e)}",
                    extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})

                if cache_key and cache_key in response_cache_manager.cache:
                    log('info', f"因任务获取结果失败,删除缓存: {cache_key[:8]}...",
                        extra={'cache_operation': 'remove-on-error', 'request_type': request_type})
                    del response_cache_manager.cache[cache_key]

        # No recoverable result — answer with a generic error payload.
        return create_error_response(chat_request.model, "请求处理过程中发生错误,请重试")
    except Exception as e:
        error_msg = str(e)
        extra_log = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model, 'error_message': error_msg}
        log('error', f"客户端断开后处理API响应时出错: {error_msg}", extra=extra_log)

        # Never leave a cache entry behind after a failed response.
        if cache_key and cache_key in response_cache_manager.cache:
            log('info', f"因API响应错误,删除缓存: {cache_key[:8]}...",
                extra={'cache_operation': 'remove-on-error', 'request_type': request_type})
            del response_cache_manager.cache[cache_key]

        return create_error_response(chat_request.model, f"请求处理错误: {error_msg}")
|
|
|
|
|
|
|
|
async def process_nonstream_request(
    chat_request: ChatCompletionRequest,
    http_request: Request,
    request_type: str,
    contents,
    system_instruction,
    current_api_key: str,
    cache_key: str = None,
    client_ip: str = None
):
    """Handle a non-streaming API request.

    Races the Gemini completion against a client-disconnect watcher. On
    disconnect the completion is handed to handle_client_disconnect() so it
    can still finish; otherwise the response is built, cached, and the
    (single-use) cache entry immediately evicted before returning.
    """
    gemini_client = GeminiClient(current_api_key)

    # Upstream completion, shielded internally against cancellation.
    gemini_task = asyncio.create_task(
        run_gemini_completion(
            gemini_client,
            chat_request,
            contents,
            system_instruction,
            request_type,
            current_api_key
        )
    )

    # Companion watcher: resolves as soon as the client goes away.
    disconnect_task = asyncio.create_task(
        check_client_disconnect(
            http_request,
            current_api_key,
            request_type,
            chat_request.model
        )
    )

    try:
        # Race the two tasks; whichever finishes first decides the path.
        done, pending = await asyncio.wait(
            [gemini_task, disconnect_task],
            return_when=asyncio.FIRST_COMPLETED
        )

        if disconnect_task in done:
            # Client left first — finish the upstream call in the background
            # handler (it also manages cache cleanup).
            return await handle_client_disconnect(
                gemini_task,
                chat_request,
                request_type,
                current_api_key,
                cache_key,
                client_ip
            )
        else:
            # Completion finished first; the watcher is no longer needed.
            disconnect_task.cancel()

            response_content = await gemini_task

            # If an identical request cached a response meanwhile, serve
            # (and evict) that instead of building a new one.
            cached_response, cache_hit = response_cache_manager.get(cache_key)
            if cache_hit:
                log('info', f"缓存已存在,直接返回: {cache_key[:8]}...",
                    extra={'cache_operation': 'use-existing', 'request_type': request_type})

                if cache_key in response_cache_manager.cache:
                    del response_cache_manager.cache[cache_key]
                    log('info', f"缓存使用后已删除: {cache_key[:8]}...",
                        extra={'cache_operation': 'used-and-removed', 'request_type': request_type})

                return cached_response

            # Local import mirrors the rest of the file's late-import style.
            from app.utils.response import create_response
            response = create_response(chat_request, response_content)

            # Store the response (also updates call stats for this key)...
            cache_response(response, cache_key, client_ip, response_cache_manager, update_api_call_stats, api_key=current_api_key)

            # ...then immediately evict it: entries are single-use and this
            # request is about to return the response directly.
            if cache_key and cache_key in response_cache_manager.cache:
                del response_cache_manager.cache[cache_key]
                log('info', f"缓存创建后立即删除: {cache_key[:8]}...",
                    extra={'cache_operation': 'store-and-remove', 'request_type': request_type})

            return response

    except asyncio.CancelledError:
        extra_log = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model, 'error_message':"请求被取消"}
        log('info', "请求取消", extra=extra_log)

        # A cached result (from a parallel identical request) still wins.
        cached_response, cache_hit = response_cache_manager.get(cache_key)
        if cache_hit:
            log('info', f"请求取消但找到有效缓存,使用缓存响应: {cache_key[:8]}...",
                extra={'cache_operation': 'use-cache-on-cancel', 'request_type': request_type})

            if cache_key in response_cache_manager.cache:
                del response_cache_manager.cache[cache_key]
                log('info', f"缓存使用后已删除: {cache_key[:8]}...",
                    extra={'cache_operation': 'used-and-removed', 'request_type': request_type})

            return cached_response

        if not gemini_task.done():
            # Upstream still running: wait (shielded) for it to finish so
            # the result is not wasted.
            log('info', "请求取消但API请求尚未完成,继续等待...",
                extra={'key': current_api_key[:8], 'request_type': request_type})

            response_content = await asyncio.shield(gemini_task)

            from app.utils.response import create_response
            response = create_response(chat_request, response_content)

            return response
        else:
            # Upstream already finished; use its result directly.
            response_content = gemini_task.result()

            from app.utils.response import create_response
            response = create_response(chat_request, response_content)

            return response

    except HTTPException as e:
        if e.status_code == status.HTTP_408_REQUEST_TIMEOUT:
            extra_log = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model,
                         'status_code': 408, 'error_message': '客户端连接中断'}
            log('error', "客户端连接中断,终止后续重试", extra=extra_log)
            raise
        else:
            raise