| | from fastapi import APIRouter, HTTPException, Request, Depends, status |
| | from fastapi.responses import JSONResponse, StreamingResponse |
| | from app.models import ChatCompletionRequest, ChatCompletionResponse, ErrorResponse, ModelList |
| | from app.services import GeminiClient, ResponseWrapper |
| | from app.utils import ( |
| | handle_gemini_error, |
| | protect_from_abuse, |
| | APIKeyManager, |
| | test_api_key, |
| | format_log_message, |
| | log_manager, |
| | generate_cache_key, |
| | cache_response, |
| | create_chat_response, |
| | create_error_response, |
| | handle_api_error, |
| | update_api_call_stats |
| | ) |
| | import json |
| | import asyncio |
| | import time |
| | import logging |
| | import random |
| | from typing import Literal |
| | from app.config.settings import ( |
| | api_call_stats |
| | ) |
| | |
# Module-level logger; the name matches the logging configuration used elsewhere in the app.
logger = logging.getLogger("my_logger")


router = APIRouter()


# Module-level state, all populated by init_router() at application startup.
# Every name stays None until init_router() is called.
key_manager = None                  # presumably an APIKeyManager instance — rotates/tracks Gemini API keys
response_cache_manager = None       # cache of completed responses, keyed by generate_cache_key(request)
active_requests_manager = None      # pool of in-flight request tasks used for request deduplication
safety_settings = None              # default Gemini safety settings passed to the client
safety_settings_g2 = None           # safety settings used when the model name contains 'gemini-2.0-flash-exp'
current_api_key = None              # the API key currently in use (reassigned per attempt in process_request)
FAKE_STREAMING = None               # truthy: emulate streaming from a non-stream completion (see process_stream_request)
FAKE_STREAMING_INTERVAL = None      # not referenced in this chunk — presumably the keep-alive interval; TODO confirm
PASSWORD = None                     # bearer token enforced by verify_password when set
MAX_REQUESTS_PER_MINUTE = None      # abuse-protection limit forwarded to protect_from_abuse
MAX_REQUESTS_PER_DAY_PER_IP = None  # abuse-protection limit forwarded to protect_from_abuse
| |
|
| | |
def init_router(
    _key_manager,
    _response_cache_manager,
    _active_requests_manager,
    _safety_settings,
    _safety_settings_g2,
    _current_api_key,
    _fake_streaming,
    _fake_streaming_interval,
    _password,
    _max_requests_per_minute,
    _max_requests_per_day_per_ip
):
    """Wire this router's module-level state.

    Called once at startup: copies every injected dependency and config value
    into the corresponding module global so the route handlers can reach them.
    """
    global key_manager, response_cache_manager, active_requests_manager
    global safety_settings, safety_settings_g2, current_api_key
    global FAKE_STREAMING, FAKE_STREAMING_INTERVAL
    global PASSWORD, MAX_REQUESTS_PER_MINUTE, MAX_REQUESTS_PER_DAY_PER_IP

    (key_manager, response_cache_manager, active_requests_manager,
     safety_settings, safety_settings_g2, current_api_key,
     FAKE_STREAMING, FAKE_STREAMING_INTERVAL, PASSWORD,
     MAX_REQUESTS_PER_MINUTE, MAX_REQUESTS_PER_DAY_PER_IP) = (
        _key_manager, _response_cache_manager, _active_requests_manager,
        _safety_settings, _safety_settings_g2, _current_api_key,
        _fake_streaming, _fake_streaming_interval, _password,
        _max_requests_per_minute, _max_requests_per_day_per_ip)
| |
|
| | |
def log(level: str, message: str, **extra):
    """Unified logging helper.

    Formats *message* through format_log_message (any keyword arguments are
    forwarded as the ``extra`` payload) and emits it on the module logger at
    the method named by *level* (e.g. 'info', 'warning', 'error').
    """
    formatted = format_log_message(level.upper(), message, extra=extra)
    emit = getattr(logger, level.lower())
    emit(formatted)
| |
|
| | |
async def verify_password(request: Request):
    """FastAPI dependency enforcing bearer-token auth when PASSWORD is set.

    Raises HTTPException(401) on a missing/malformed Authorization header or
    on a token mismatch. A no-op when no password is configured.
    """
    if not PASSWORD:
        return
    auth_header = request.headers.get("Authorization")
    if not (auth_header and auth_header.startswith("Bearer ")):
        raise HTTPException(
            status_code=401, detail="Unauthorized: Missing or invalid token")
    if auth_header.split(" ")[1] != PASSWORD:
        raise HTTPException(
            status_code=401, detail="Unauthorized: Invalid token")
| |
|
| | |
@router.get("/v1/models", response_model=ModelList)
def list_models():
    """Return the catalogue of available models in OpenAI list format."""
    log('info', "Received request to list models", extra={'request_type': 'list_models', 'status_code': 200})
    model_entries = [
        {
            "id": model_name,
            "object": "model",
            "created": 1678888888,
            "owned_by": "organization-owner",
        }
        for model_name in GeminiClient.AVAILABLE_MODELS
    ]
    return ModelList(data=model_entries)
| |
|
@router.post("/v1/chat/completions", response_model=ChatCompletionResponse)
async def chat_completions(request: ChatCompletionRequest, http_request: Request, _: None = Depends(verify_password)):
    """OpenAI-compatible chat-completions endpoint.

    Streaming requests are handed straight to process_request(). Non-stream
    requests first consult a single-use response cache, then an active-request
    pool, so identical concurrent requests share one upstream call.
    """
    client_ip = http_request.client.host if http_request.client else "unknown"

    # Streaming requests: no caching / deduplication.
    if request.stream:
        return await process_request(request, http_request, "stream")

    # Deterministic key derived from the request payload.
    cache_key = generate_cache_key(request)

    log('info', f"请求缓存键: {cache_key[:8]}...",
        extra={'cache_key': cache_key[:8], 'request_type': 'non-stream'})

    # Exact cache hit: serve and consume (delete) the entry.
    cached_response, cache_hit = response_cache_manager.get(cache_key)
    if cache_hit:
        log('info', f"精确缓存命中: {cache_key[:8]}...",
            extra={'cache_operation': 'hit', 'request_type': 'non-stream'})

        # A result exists, so drop any in-flight tasks registered for this key.
        active_requests_manager.remove_by_prefix(f"cache:{cache_key}")

        # Single-use cache: remove the entry once it has been served.
        if cache_key in response_cache_manager.cache:
            del response_cache_manager.cache[cache_key]
            log('info', f"缓存使用后已删除: {cache_key[:8]}...",
                extra={'cache_operation': 'used-and-removed', 'request_type': 'non-stream'})

        return cached_response

    # Key under which an in-flight task for this request is registered.
    pool_key = f"cache:{cache_key}"

    # An identical request is already running: wait on it instead of issuing
    # a duplicate upstream call.
    active_task = active_requests_manager.get(pool_key)
    if active_task and not active_task.done():
        log('info', f"发现相同请求的进行中任务",
            extra={'request_type': 'non-stream', 'model': request.model})

        try:
            # Wait up to 3 minutes for the shared task to finish.
            await asyncio.wait_for(active_task, timeout=180)

            # Prefer the cache the finished task should have populated.
            cached_response, cache_hit = response_cache_manager.get(cache_key)
            if cache_hit:
                if cache_key in response_cache_manager.cache:
                    del response_cache_manager.cache[cache_key]
                    log('info', f"使用已完成任务的缓存后删除: {cache_key[:8]}...",
                        extra={'cache_operation': 'used-and-removed', 'request_type': 'non-stream'})

                return cached_response

            # No cache entry: fall back to the shared task's own result,
            # re-wrapped with a fresh id/timestamp.
            if active_task.done() and not active_task.cancelled():
                result = active_task.result()
                if result:
                    new_response = ChatCompletionResponse(
                        id=f"chatcmpl-{int(time.time()*1000)}",
                        object="chat.completion",
                        created=int(time.time()),
                        model=result.model,
                        choices=result.choices
                    )

                    return new_response
        except (asyncio.TimeoutError, asyncio.CancelledError) as e:
            # Shared task timed out or was cancelled: evict it from the pool
            # and fall through to issuing our own request.
            error_type = "超时" if isinstance(e, asyncio.TimeoutError) else "被取消"
            log('warning', f"等待已有任务{error_type}: {pool_key}",
                extra={'request_type': 'non-stream', 'model': request.model})

            if active_task.done() or active_task.cancelled():
                active_requests_manager.remove(pool_key)
                log('info', f"已从活跃请求池移除{error_type}任务: {pool_key}",
                    extra={'request_type': 'non-stream'})

    # Launch our own processing task and register it so identical requests
    # arriving later can piggyback on it.
    process_task = asyncio.create_task(
        process_request(request, http_request, "non-stream", cache_key=cache_key, client_ip=client_ip)
    )

    active_requests_manager.add(pool_key, process_task)

    try:
        response = await process_task
        return response
    except Exception as e:
        # Task failed: deregister it, then try the cache one last time in case
        # some path still stored a usable response before the failure.
        active_requests_manager.remove(pool_key)

        cached_response, cache_hit = response_cache_manager.get(cache_key)
        if cache_hit:
            log('info', f"任务失败但找到缓存,使用缓存结果: {cache_key[:8]}...",
                extra={'request_type': 'non-stream', 'model': request.model})
            return cached_response

        raise
| |
|
| | |
async def process_request(chat_request: ChatCompletionRequest, http_request: Request, request_type: Literal['stream', 'non-stream'], cache_key: str = None, client_ip: str = None):
    """Core request handler for both stream and non-stream requests.

    Validates the request, then retries across the configured API keys (and,
    per key, across up to 3 transient server errors) until a response is
    produced or every key has failed.
    """
    global current_api_key

    # Rate limiting / abuse protection (raises on violation).
    protect_from_abuse(
        http_request, MAX_REQUESTS_PER_MINUTE, MAX_REQUESTS_PER_DAY_PER_IP)
    if chat_request.model not in GeminiClient.AVAILABLE_MODELS:
        error_msg = "无效的模型"
        extra_log = {'request_type': request_type, 'model': chat_request.model, 'status_code': 400, 'error_message': error_msg}
        log('error', error_msg, extra=extra_log)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail=error_msg)

    # Start a fresh key-rotation cycle for this request.
    key_manager.reset_tried_keys_for_request()

    # Convert OpenAI-style messages into Gemini contents + system instruction.
    contents, system_instruction = GeminiClient.convert_messages(
        GeminiClient, chat_request.messages)

    # One attempt per configured API key (at least one attempt overall).
    retry_attempts = len(key_manager.api_keys) if key_manager.api_keys else 1

    for attempt in range(1, retry_attempts + 1):
        current_api_key = key_manager.get_available_key()

        # No key left to try: give up early.
        if current_api_key is None:
            log('warning', "没有可用的 API 密钥,跳过本次尝试",
                extra={'request_type': request_type, 'model': chat_request.model, 'status_code': 'N/A'})
            break

        log('info', f"第 {attempt}/{retry_attempts} 次尝试 ... 使用密钥: {current_api_key[:8]}...",
            extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})

        # Inner loop: up to 3 retries on the same key for transient errors.
        server_error_retries = 3
        for server_retry in range(1, server_error_retries + 1):
            try:
                if chat_request.stream:
                    try:
                        return await process_stream_request(
                            chat_request,
                            http_request,
                            contents,
                            system_instruction,
                            current_api_key
                        )
                    except Exception as e:
                        # Log the stream failure, then re-raise so the outer
                        # handler decides whether to retry or switch keys.
                        error_detail = handle_gemini_error(e, current_api_key, key_manager)
                        log('error', f"流式请求失败: {error_detail}",
                            extra={'key': current_api_key[:8], 'request_type': 'stream', 'model': chat_request.model})
                        raise
                else:
                    return await process_nonstream_request(
                        chat_request,
                        http_request,
                        request_type,
                        contents,
                        system_instruction,
                        current_api_key,
                        cache_key,
                        client_ip
                    )
            except HTTPException as e:
                # 408 means the client went away — propagate without retrying.
                if e.status_code == status.HTTP_408_REQUEST_TIMEOUT:
                    log('error', "客户端连接中断",
                        extra={'key': current_api_key[:8], 'request_type': request_type,
                               'model': chat_request.model, 'status_code': 408})
                    raise
                else:
                    raise
            except Exception as e:
                # Classify the error; the helper decides retry / switch-key /
                # give-up and whether the cache entry is now tainted.
                error_result = await handle_api_error(
                    e,
                    current_api_key,
                    key_manager,
                    request_type,
                    chat_request.model,
                    server_retry - 1
                )

                # Invalidate any cache entry tainted by this failure.
                if error_result.get('remove_cache', False) and cache_key and cache_key in response_cache_manager.cache:
                    log('info', f"因API错误,删除缓存: {cache_key[:8]}...",
                        extra={'cache_operation': 'remove-on-error', 'request_type': request_type})
                    del response_cache_manager.cache[cache_key]

                if error_result.get('should_retry', False):
                    # Transient: retry with the same key.
                    continue
                elif error_result.get('should_switch_key', False) and attempt < retry_attempts:
                    # Key-level failure: move on to the next key.
                    log('info', f"API密钥 {current_api_key[:8]}... 失败,准备尝试下一个密钥",
                        extra={'key': current_api_key[:8], 'request_type': request_type})
                    break
                else:
                    # Unrecoverable for this key: stop the inner retry loop.
                    break

    # Every key exhausted without a successful response.
    msg = "所有API密钥均请求失败,请稍后重试"
    log('error', "API key 替换失败,所有API key都已尝试,请重新配置或稍后重试", extra={'key': 'N/A', 'request_type': 'switch_key', 'status_code': 'N/A'})

    # For streams, report the failure in-band as SSE; otherwise raise 500.
    if chat_request.stream:
        async def error_generator():
            # Minimal SSE error event followed by the stream terminator.
            error_json = json.dumps({'error': {'message': msg, 'type': 'api_error'}})
            yield f"data: {error_json}\n\n"
            yield "data: [DONE]\n\n"

        return StreamingResponse(error_generator(), media_type="text/event-stream")
    else:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=msg)
| |
|
| | |
async def process_stream_request(
    chat_request: ChatCompletionRequest,
    http_request: Request,
    contents,
    system_instruction,
    current_api_key: str
) -> StreamingResponse:
    """Handle a streaming API request.

    Two modes:
      * FAKE_STREAMING truthy — the real completion is fetched non-streaming
        by a background task while a second task forwards keep-alive chunks;
        both feed an asyncio.Queue that this generator drains as SSE lines
        (a None queue item is the end-of-stream sentinel).
      * FAKE_STREAMING falsy — chunks from GeminiClient.stream_chat are
        relayed directly as OpenAI-style chat.completion.chunk SSE events.
    """

    async def stream_response_generator():
        if FAKE_STREAMING:
            # Queue shared by both background tasks; None terminates the stream.
            queue = asyncio.Queue()
            keep_alive_task = None
            api_request_task = None

            try:
                async def keep_alive_sender():
                    """Forward empty keep-alive chunks while the real request runs."""
                    try:
                        keep_alive_client = GeminiClient(current_api_key)

                        # The streaming call is used here only as a heartbeat source.
                        keep_alive_generator = keep_alive_client.stream_chat(
                            chat_request,
                            contents,
                            safety_settings_g2 if 'gemini-2.0-flash-exp' in chat_request.model else safety_settings,
                            system_instruction
                        )

                        # Translate bare newlines into empty-content chunks so
                        # the client connection stays alive.
                        async for line in keep_alive_generator:
                            if line == "\n":
                                formatted_chunk = {
                                    "id": "chatcmpl-keepalive",
                                    "object": "chat.completion.chunk",
                                    "created": int(time.time()),
                                    "model": chat_request.model,
                                    "choices": [{"delta": {"content": ""}, "index": 0, "finish_reason": None}]
                                }

                                await queue.put(f"data: {json.dumps(formatted_chunk)}\n\n")
                    except asyncio.CancelledError:
                        log('info', "保持连接任务被取消",
                            extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})
                        raise
                    except Exception as e:
                        log('error', f"保持连接任务出错: {str(e)}",
                            extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})
                        # Push the sentinel so the consumer loop can exit.
                        await queue.put(None)
                        raise

                async def api_request_handler():
                    """Fetch the real (non-stream) completion, trying every key."""
                    success = False
                    try:
                        key_manager.reset_tried_keys_for_request()

                        # Try all configured keys in random order.
                        available_keys = key_manager.api_keys.copy()
                        random.shuffle(available_keys)

                        for attempt, api_key in enumerate(available_keys, 1):
                            try:
                                log('info', f"假流式模式: 尝试API密钥 {api_key[:8]}... ({attempt}/{len(available_keys)})",
                                    extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})

                                non_stream_client = GeminiClient(api_key)

                                # Blocking SDK call moved off the event loop.
                                response_content = await asyncio.to_thread(
                                    non_stream_client.complete_chat,
                                    chat_request,
                                    contents,
                                    safety_settings_g2 if 'gemini-2.0-flash-exp' in chat_request.model else safety_settings,
                                    system_instruction
                                )

                                if response_content and response_content.text:
                                    log('info', f"假流式模式: API密钥 {api_key[:8]}... 成功获取响应",
                                        extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})

                                    # Slice the full text into ~10 chunks to mimic streaming.
                                    full_text = response_content.text
                                    chunk_size = max(len(full_text) // 10, 1)

                                    for i in range(0, len(full_text), chunk_size):
                                        chunk = full_text[i:i+chunk_size]
                                        formatted_chunk = {
                                            "id": "chatcmpl-someid",
                                            "object": "chat.completion.chunk",
                                            "created": int(time.time()),
                                            "model": chat_request.model,
                                            "choices": [{"delta": {"role": "assistant", "content": chunk}, "index": 0, "finish_reason": None}]
                                        }

                                        await queue.put(f"data: {json.dumps(formatted_chunk)}\n\n")

                                    success = True
                                    # Record the successful call against this key.
                                    from app.utils.stats import update_api_call_stats
                                    update_api_call_stats(api_call_stats, api_key)
                                    break
                                else:
                                    log('warning', f"假流式模式: API密钥 {api_key[:8]}... 返回空响应",
                                        extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})
                            except Exception as e:
                                # Key failed: log and fall through to the next one.
                                error_detail = handle_gemini_error(e, api_key, key_manager)
                                log('error', f"假流式模式: API密钥 {api_key[:8]}... 请求失败: {error_detail}",
                                    extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})

                        # Every key failed: surface the error in-band as a final chunk.
                        if not success:
                            error_msg = "所有API密钥均请求失败,请稍后重试"
                            log('error', error_msg,
                                extra={'key': 'ALL', 'request_type': 'fake-stream', 'model': chat_request.model})

                            error_json = {
                                "id": "chatcmpl-error",
                                "object": "chat.completion.chunk",
                                "created": int(time.time()),
                                "model": chat_request.model,
                                "choices": [{"delta": {"content": f"\n\n[错误: {error_msg}]"}, "index": 0, "finish_reason": "error"}]
                            }
                            await queue.put(f"data: {json.dumps(error_json)}\n\n")

                        # Terminate the SSE stream and wake the consumer loop.
                        await queue.put("data: [DONE]\n\n")

                        await queue.put(None)

                    except asyncio.CancelledError:
                        log('info', "API请求任务被取消",
                            extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})
                        # Make sure the consumer loop does not block forever.
                        await queue.put(None)
                        raise
                    except Exception as e:
                        log('error', f"API请求任务出错: {str(e)}",
                            extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})

                        error_json = {
                            "id": "chatcmpl-error",
                            "object": "chat.completion.chunk",
                            "created": int(time.time()),
                            "model": chat_request.model,
                            "choices": [{"delta": {"content": f"\n\n[错误: {str(e)}]"}, "index": 0, "finish_reason": "error"}]
                        }
                        await queue.put(f"data: {json.dumps(error_json)}\n\n")
                        await queue.put("data: [DONE]\n\n")

                        await queue.put(None)
                        raise

                # Start heartbeat and real-request tasks concurrently.
                keep_alive_task = asyncio.create_task(keep_alive_sender())

                api_request_task = asyncio.create_task(api_request_handler())

                # Drain the queue until the None sentinel arrives.
                while True:
                    chunk = await queue.get()
                    if chunk is None:
                        break
                    yield chunk

                # Real response delivered: stop the heartbeat task.
                if api_request_task.done() and not keep_alive_task.done():
                    keep_alive_task.cancel()

            except asyncio.CancelledError:
                log('info', "流式响应生成器被取消",
                    extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})
                # Tear down both background tasks on cancellation.
                if keep_alive_task and not keep_alive_task.done():
                    keep_alive_task.cancel()
                if api_request_task and not api_request_task.done():
                    api_request_task.cancel()
            except Exception as e:
                log('error', f"流式响应生成器出错: {str(e)}",
                    extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})

                if keep_alive_task and not keep_alive_task.done():
                    keep_alive_task.cancel()
                if api_request_task and not api_request_task.done():
                    api_request_task.cancel()

                # Report the failure in-band, then terminate the stream.
                error_json = {
                    "id": "chatcmpl-error",
                    "object": "chat.completion.chunk",
                    "created": int(time.time()),
                    "model": chat_request.model,
                    "choices": [{"delta": {"content": f"\n\n[错误: {str(e)}]"}, "index": 0, "finish_reason": "error"}]
                }
                yield f"data: {json.dumps(error_json)}\n\n"
                yield "data: [DONE]\n\n"
            finally:
                # Belt-and-braces: never leave the background tasks running.
                if keep_alive_task and not keep_alive_task.done():
                    keep_alive_task.cancel()
                if api_request_task and not api_request_task.done():
                    api_request_task.cancel()
        else:
            # Real streaming mode: relay upstream chunks directly.
            gemini_client = GeminiClient(current_api_key)
            success = False

            try:
                async for chunk in gemini_client.stream_chat(
                    chat_request,
                    contents,
                    safety_settings_g2 if 'gemini-2.0-flash-exp' in chat_request.model else safety_settings,
                    system_instruction
                ):
                    # Skip empty chunks.
                    if not chunk:
                        continue

                    formatted_chunk = {
                        "id": "chatcmpl-someid",
                        "object": "chat.completion.chunk",
                        "created": int(time.time()),
                        "model": chat_request.model,
                        "choices": [{"delta": {"role": "assistant", "content": chunk}, "index": 0, "finish_reason": None}]
                    }
                    success = True
                    yield f"data: {json.dumps(formatted_chunk)}\n\n"

                # Count the call only when at least one chunk was delivered.
                if success:
                    from app.utils.stats import update_api_call_stats
                    update_api_call_stats(api_call_stats, current_api_key)

                yield "data: [DONE]\n\n"

            except asyncio.CancelledError:
                # Client went away mid-stream; nothing more to send.
                extra_log_cancel = {'key': current_api_key[:8], 'request_type': 'stream', 'model': chat_request.model, 'error_message': '客户端已断开连接'}
                log('info', "客户端连接已中断", extra=extra_log_cancel)
            except Exception as e:
                error_detail = handle_gemini_error(e, current_api_key, key_manager)
                log('error', f"流式请求失败: {error_detail}",
                    extra={'key': current_api_key[:8], 'request_type': 'stream', 'model': chat_request.model})

                error_json = {
                    "id": "chatcmpl-error",
                    "object": "chat.completion.chunk",
                    "created": int(time.time()),
                    "model": chat_request.model,
                    "choices": [{"delta": {"content": f"\n\n[错误: {error_detail}]"}, "index": 0, "finish_reason": "error"}]
                }
                yield f"data: {json.dumps(error_json)}\n\n"
                yield "data: [DONE]\n\n"

                # Re-raise so the caller's retry logic in process_request sees
                # the failure (note: this happens after DONE was yielded).
                raise e

    return StreamingResponse(stream_response_generator(), media_type="text/event-stream")
| |
|
| | |
async def run_gemini_completion(
    gemini_client,
    chat_request: ChatCompletionRequest,
    contents,
    system_instruction,
    request_type: str,
    current_api_key: str
):
    """Run a non-streaming Gemini completion in a worker thread.

    The call is wrapped in asyncio.shield so that cancellation of this
    coroutine (e.g. on client disconnect) does not cancel the underlying API
    request; on cancellation it keeps waiting for the shielded future and
    returns its result anyway.
    """
    # Alias used to stash the "already logged" flag as a function attribute.
    run_fn = run_gemini_completion

    try:
        # Execute the blocking SDK call off the event loop.
        response_future = asyncio.create_task(
            asyncio.to_thread(
                gemini_client.complete_chat,
                chat_request,
                contents,
                safety_settings_g2 if 'gemini-2.0-flash-exp' in chat_request.model else safety_settings,
                system_instruction
            )
        )

        # shield: our own cancellation does not propagate to the API call.
        response_content = await asyncio.shield(response_future)

        # NOTE(review): the flag lives on the function object, so this success
        # line is logged at most once per process, not once per request —
        # confirm that is intentional.
        if not hasattr(run_fn, 'logged_complete'):
            log('info', "非流式请求成功完成", extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})
            run_fn.logged_complete = True
        return response_content
    except asyncio.CancelledError:
        # Client disconnected: if the API call is still running, keep waiting
        # for it (still shielded) and return its result regardless.
        if 'response_future' in locals() and not response_future.done():
            try:
                response_content = await asyncio.shield(response_future)
                log('info', "API请求在客户端断开后完成", extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})
                return response_content
            except Exception as e:
                extra_log_gemini_cancel = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model, 'error_message': f'API请求在客户端断开后失败: {str(e)}'}
                log('info', "API调用因客户端断开而失败", extra=extra_log_gemini_cancel)
                raise

        # The future already finished or never started: propagate cancellation.
        extra_log_gemini_cancel = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model, 'error_message': '客户端断开导致API调用取消'}
        log('info', "API调用因客户端断开而取消", extra=extra_log_gemini_cancel)
        raise
| |
|
| | |
async def check_client_disconnect(http_request: Request, current_api_key: str, request_type: str, model: str):
    """Poll every 0.5 s until the client drops the connection.

    Returns True once the disconnect is detected (never returns otherwise).
    """
    while not await http_request.is_disconnected():
        await asyncio.sleep(0.5)
    extra_log = {'key': current_api_key[:8], 'request_type': request_type, 'model': model, 'error_message': '检测到客户端断开连接'}
    log('info', "客户端连接已中断,等待API请求完成", extra=extra_log)
    return True
| |
|
| | |
async def handle_client_disconnect(
    gemini_task: asyncio.Task,
    chat_request: ChatCompletionRequest,
    request_type: str,
    current_api_key: str,
    cache_key: str = None,
    client_ip: str = None
):
    """Finish an in-flight Gemini task after the client has disconnected.

    The task is awaited under asyncio.shield so its result is not wasted;
    stale cache entries for the same key are removed along every path.
    Returns either a freshly built response or an error-shaped response.
    """
    try:
        # Keep waiting for the API call even though the client is gone.
        response_content = await asyncio.shield(gemini_task)

        # Empty / missing result: drop any stale cache entry and return an
        # error body instead.
        if response_content is None or response_content.text == "":
            if response_content is None:
                log('info', "客户端断开后API任务返回None",
                    extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})
            else:
                extra_log = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model, 'status_code': 204}
                log('info', "客户端断开后Gemini API 返回空响应", extra=extra_log)

            if cache_key and cache_key in response_cache_manager.cache:
                log('info', f"因空响应,删除缓存: {cache_key[:8]}...",
                    extra={'cache_operation': 'remove-on-empty', 'request_type': request_type})
                del response_cache_manager.cache[cache_key]

            return create_error_response(chat_request.model, "AI未返回任何内容,请重试")

        # Another path already stored a cache entry for this key: remove it,
        # since a fresh response is about to be produced.
        cached_response, cache_hit = response_cache_manager.get(cache_key)
        if cache_hit:
            log('info', f"客户端断开但找到已存在缓存,将删除: {cache_key[:8]}...",
                extra={'cache_operation': 'disconnect-found-cache', 'request_type': request_type})

            if cache_key in response_cache_manager.cache:
                del response_cache_manager.cache[cache_key]

        # Build the final response object from the completed task's content.
        from app.utils.response import create_response
        response = create_response(chat_request, response_content)

        return response
    except asyncio.CancelledError:
        log('info', "客户端断开后任务被取消,但我们仍会尝试完成",
            extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})

        # If the task actually finished before cancellation, salvage its result.
        if gemini_task.done() and not gemini_task.cancelled():
            try:
                response_content = gemini_task.result()

                cached_response, cache_hit = response_cache_manager.get(cache_key)
                if cache_hit:
                    log('info', f"任务被取消但找到已存在缓存,将删除: {cache_key[:8]}...",
                        extra={'cache_operation': 'cancel-found-cache', 'request_type': request_type})

                    if cache_key in response_cache_manager.cache:
                        del response_cache_manager.cache[cache_key]

                from app.utils.response import create_response
                response = create_response(chat_request, response_content)
                return response
            except Exception as inner_e:
                log('error', f"客户端断开后从已完成任务获取结果失败: {str(inner_e)}",
                    extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})

        # Could not salvage a result: invalidate the cache and report an error.
        if cache_key and cache_key in response_cache_manager.cache:
            log('info', f"因任务获取结果失败,删除缓存: {cache_key[:8]}...",
                extra={'cache_operation': 'remove-on-error', 'request_type': request_type})
            del response_cache_manager.cache[cache_key]

        return create_error_response(chat_request.model, "请求处理过程中发生错误,请重试")
    except Exception as e:
        # Any other failure while finishing the task: clean up and report.
        error_msg = str(e)
        extra_log = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model, 'error_message': error_msg}
        log('error', f"客户端断开后处理API响应时出错: {error_msg}", extra=extra_log)

        if cache_key and cache_key in response_cache_manager.cache:
            log('info', f"因API响应错误,删除缓存: {cache_key[:8]}...",
                extra={'cache_operation': 'remove-on-error', 'request_type': request_type})
            del response_cache_manager.cache[cache_key]

        return create_error_response(chat_request.model, f"请求处理错误: {error_msg}")
| |
|
| | |
async def process_nonstream_request(
    chat_request: ChatCompletionRequest,
    http_request: Request,
    request_type: str,
    contents,
    system_instruction,
    current_api_key: str,
    cache_key: str = None,
    client_ip: str = None
):
    """Handle a non-streaming API request.

    Races the Gemini completion against a client-disconnect watcher; whichever
    finishes first decides the path. The completion itself is shielded so a
    disconnect/cancellation never wastes the upstream call.
    """
    gemini_client = GeminiClient(current_api_key)

    # The actual upstream completion.
    gemini_task = asyncio.create_task(
        run_gemini_completion(
            gemini_client,
            chat_request,
            contents,
            system_instruction,
            request_type,
            current_api_key
        )
    )

    # Watches for the client dropping the connection.
    disconnect_task = asyncio.create_task(
        check_client_disconnect(
            http_request,
            current_api_key,
            request_type,
            chat_request.model
        )
    )

    try:
        # Race the two tasks; the loser stays pending.
        done, pending = await asyncio.wait(
            [gemini_task, disconnect_task],
            return_when=asyncio.FIRST_COMPLETED
        )

        if disconnect_task in done:
            # Client left first: hand the in-flight task to the dedicated
            # disconnect handler (which finishes it under shield).
            return await handle_client_disconnect(
                gemini_task,
                chat_request,
                request_type,
                current_api_key,
                cache_key,
                client_ip
            )
        else:
            # API finished first: stop watching for disconnects.
            disconnect_task.cancel()

            response_content = await gemini_task

            # If a cache entry appeared meanwhile, serve and consume it.
            cached_response, cache_hit = response_cache_manager.get(cache_key)
            if cache_hit:
                log('info', f"缓存已存在,直接返回: {cache_key[:8]}...",
                    extra={'cache_operation': 'use-existing', 'request_type': request_type})

                if cache_key in response_cache_manager.cache:
                    del response_cache_manager.cache[cache_key]
                    log('info', f"缓存使用后已删除: {cache_key[:8]}...",
                        extra={'cache_operation': 'used-and-removed', 'request_type': request_type})

                return cached_response

            from app.utils.response import create_response
            response = create_response(chat_request, response_content)

            # Store the response (helper also updates per-key call stats) ...
            cache_response(response, cache_key, client_ip, response_cache_manager, update_api_call_stats, api_key=current_api_key)

            # ... then immediately consume the entry, since it is returned now.
            if cache_key and cache_key in response_cache_manager.cache:
                del response_cache_manager.cache[cache_key]
                log('info', f"缓存创建后立即删除: {cache_key[:8]}...",
                    extra={'cache_operation': 'store-and-remove', 'request_type': request_type})

            return response

    except asyncio.CancelledError:
        extra_log = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model, 'error_message': "请求被取消"}
        log('info', "请求取消", extra=extra_log)

        # Prefer any cached result if the request got cancelled.
        cached_response, cache_hit = response_cache_manager.get(cache_key)
        if cache_hit:
            log('info', f"请求取消但找到有效缓存,使用缓存响应: {cache_key[:8]}...",
                extra={'cache_operation': 'use-cache-on-cancel', 'request_type': request_type})

            if cache_key in response_cache_manager.cache:
                del response_cache_manager.cache[cache_key]
                log('info', f"缓存使用后已删除: {cache_key[:8]}...",
                    extra={'cache_operation': 'used-and-removed', 'request_type': request_type})

            return cached_response

        if not gemini_task.done():
            # API call still running: wait it out (shielded from cancellation).
            log('info', "请求取消但API请求尚未完成,继续等待...",
                extra={'key': current_api_key[:8], 'request_type': request_type})

            response_content = await asyncio.shield(gemini_task)

            from app.utils.response import create_response
            response = create_response(chat_request, response_content)

            return response
        else:
            # API call already finished: use its result directly.
            response_content = gemini_task.result()

            from app.utils.response import create_response
            response = create_response(chat_request, response_content)

            return response

    except HTTPException as e:
        # 408 means the client went away — log and stop retrying upstream.
        if e.status_code == status.HTTP_408_REQUEST_TIMEOUT:
            extra_log = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model,
                         'status_code': 408, 'error_message': '客户端连接中断'}
            log('error', "客户端连接中断,终止后续重试", extra=extra_log)
            raise
        else:
            raise