| | import asyncio |
| | import time |
| | from typing import Dict, Any |
| | from app.utils.logging import log |
| |
|
| | class ActiveRequestsManager: |
| | """管理活跃API请求的类""" |
| | |
| | def __init__(self, requests_pool: Dict[str, asyncio.Task] = None): |
| | self.active_requests = requests_pool if requests_pool is not None else {} |
| | |
| | def add(self, key: str, task: asyncio.Task): |
| | """添加新的活跃请求任务""" |
| | task.creation_time = time.time() |
| | self.active_requests[key] = task |
| | |
| | def get(self, key: str): |
| | """获取活跃请求任务""" |
| | return self.active_requests.get(key) |
| | |
| | def remove(self, key: str): |
| | """移除活跃请求任务""" |
| | if key in self.active_requests: |
| | del self.active_requests[key] |
| | return True |
| | return False |
| | |
| | def remove_by_prefix(self, prefix: str): |
| | """移除所有以特定前缀开头的活跃请求任务""" |
| | keys_to_remove = [k for k in self.active_requests.keys() if k.startswith(prefix)] |
| | for key in keys_to_remove: |
| | self.remove(key) |
| | return len(keys_to_remove) |
| | |
| | def clean_completed(self): |
| | """清理所有已完成或已取消的任务""" |
| | keys_to_remove = [] |
| | |
| | for key, task in self.active_requests.items(): |
| | if task.done() or task.cancelled(): |
| | keys_to_remove.append(key) |
| | |
| | for key in keys_to_remove: |
| | self.remove(key) |
| | |
| | |
| | |
| | |
| | def clean_long_running(self, max_age_seconds: int = 300): |
| | """清理长时间运行的任务""" |
| | now = time.time() |
| | long_running_keys = [] |
| | |
| | for key, task in list(self.active_requests.items()): |
| | if (hasattr(task, 'creation_time') and |
| | task.creation_time < now - max_age_seconds and |
| | not task.done() and not task.cancelled()): |
| | |
| | long_running_keys.append(key) |
| | task.cancel() |
| | |
| | if long_running_keys: |
| | log('warning', f"取消长时间运行的任务: {len(long_running_keys)}个", cleanup='long_running_tasks') |
| |
|
| | async def check_client_disconnect(http_request, current_api_key: str, request_type: str, model: str): |
| | """检查客户端是否断开连接""" |
| | while True: |
| | if await http_request.is_disconnected(): |
| | extra_log = {'key': current_api_key[:8], 'request_type': request_type, 'model': model, 'error_message': '检测到客户端断开连接'} |
| | log('info', "客户端连接已中断,等待API请求完成", extra=extra_log) |
| | return True |
| | await asyncio.sleep(0.5) |