Spaces:
Sleeping
Sleeping
File size: 4,656 Bytes
5868187 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
"""
Global task lifecycle management module
Manages the lifecycle of all asynchronous tasks in the application and ensures they are cleaned up properly.
"""
import asyncio
import inspect
import weakref
from typing import Any, Dict, Optional, Set

from log import log
class TaskManager:
    """Global manager for asyncio tasks and closeable resources (singleton).

    Every ``TaskManager()`` call returns the same instance. Registered tasks
    are cancelled and registered resources are closed by :meth:`shutdown`.
    """

    _instance = None
    # Not referenced by any method of this class; kept in case code outside
    # this class relies on the attribute existing.
    _lock = asyncio.Lock()

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        # __init__ runs on every TaskManager() call; only the first call may
        # set up state, otherwise already-registered entries would be lost.
        if self._initialized:
            return
        self._tasks: Set[asyncio.Task] = set()
        # Zero-argument callables that return the resource, or None once a
        # weakly referenced resource has been garbage collected.
        self._resources: Set[Any] = set()
        self._shutdown_event = asyncio.Event()
        self._initialized = True
        log.debug("TaskManager initialized")

    def register_task(
        self, task: asyncio.Task, description: Optional[str] = None
    ) -> asyncio.Task:
        """Track ``task`` so that :meth:`shutdown` can cancel and await it.

        Args:
            task: The task to track. It is removed from tracking
                automatically once it finishes.
            description: Optional name to assign to the task.

        Returns:
            The same ``task`` object.
        """
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        if description:
            task.set_name(description)
        log.debug(f"Registered task: {task.get_name() or 'unnamed'}")
        return task

    def create_task(self, coro, *, name: Optional[str] = None) -> asyncio.Task:
        """Create a task from ``coro`` on the running loop and register it.

        Args:
            coro: The coroutine to schedule.
            name: Optional task name.

        Returns:
            The newly created, registered task.
        """
        task = asyncio.create_task(coro, name=name)
        return self.register_task(task, name)

    def register_resource(self, resource: Any) -> Any:
        """Register a resource (HTTP client, file handle, ...) for cleanup.

        The resource is held by weak reference where possible, so
        registration alone does not keep it alive. Objects that cannot be
        weakly referenced or hashed are held strongly until :meth:`shutdown`
        instead of raising ``TypeError``.

        Args:
            resource: Object exposing ``close()`` or ``aclose()``.

        Returns:
            The same ``resource`` object.
        """
        try:
            # The callback drops the entry once the resource is collected,
            # so dead references do not accumulate in the set.
            self._resources.add(weakref.ref(resource, self._resources.discard))
        except TypeError:
            # weakref.ref() rejects some types (e.g. __slots__ classes
            # without __weakref__), and hashing a weakref hashes its
            # referent. Fall back to a strong reference in both cases.
            self._resources.add(lambda: resource)
        log.debug(f"Registered resource: {type(resource).__name__}")
        return resource

    @staticmethod
    async def _close_resource(resource: Any) -> bool:
        """Close ``resource`` via ``close()`` or ``aclose()``; return whether one was called."""
        closer = getattr(resource, 'close', None)
        if closer is None:
            closer = getattr(resource, 'aclose', None)
        if closer is None:
            return False
        result = closer()
        # Await whatever awaitable comes back, not only results of
        # ``async def`` functions; a plain close() may return a coroutine.
        if inspect.isawaitable(result):
            await result
        return True

    async def shutdown(self, timeout: float = 30.0):
        """Cancel all tracked tasks and close all registered resources.

        Args:
            timeout: Maximum number of seconds to wait for the cancelled
                tasks to finish.

        A timeout while waiting for tasks and errors raised while closing a
        resource are logged as warnings and do not abort the shutdown.
        """
        log.info("TaskManager shutdown initiated")
        self._shutdown_event.set()

        # Skip the task that is running shutdown() itself: cancelling and
        # awaiting it would abort the cleanup below.
        current = asyncio.current_task()
        tasks = [task for task in self._tasks if task is not current]

        cancelled_count = 0
        for task in tasks:
            if not task.done():
                task.cancel()
                cancelled_count += 1
        if cancelled_count > 0:
            log.info(f"Cancelled {cancelled_count} pending tasks")

        # Wait for all tasks to finish, including the cancelled ones.
        if tasks:
            try:
                await asyncio.wait_for(
                    asyncio.gather(*tasks, return_exceptions=True),
                    timeout=timeout,
                )
            except asyncio.TimeoutError:
                log.warning(f"Some tasks did not complete within {timeout}s timeout")

        cleaned_resources = 0
        for resource_ref in list(self._resources):
            resource = resource_ref()
            if resource is None:
                continue
            try:
                if await self._close_resource(resource):
                    cleaned_resources += 1
            except Exception as e:
                # Best effort: one failing resource must not block the rest.
                log.warning(f"Failed to close resource {type(resource).__name__}: {e}")
        if cleaned_resources > 0:
            log.info(f"Cleaned up {cleaned_resources} resources")

        self._tasks.clear()
        self._resources.clear()
        log.info("TaskManager shutdown completed")

    @property
    def is_shutdown(self) -> bool:
        """Whether :meth:`shutdown` has been started."""
        return self._shutdown_event.is_set()

    def get_stats(self) -> Dict[str, Any]:
        """Return counts of tracked tasks and resources plus the shutdown flag."""
        return {
            'active_tasks': len(self._tasks),
            'registered_resources': len(self._resources),
            'is_shutdown': self.is_shutdown
        }
# Global task manager instance, used by the module-level convenience functions.
task_manager = TaskManager()
def create_managed_task(coro, *, name: str = None) -> asyncio.Task:
    """Convenience wrapper: create a managed task on the global ``task_manager``.

    Args:
        coro: The coroutine to schedule.
        name: Optional task name, passed through as a keyword argument.

    Returns:
        The task returned by ``task_manager.create_task``.
    """
    managed_task = task_manager.create_task(coro, name=name)
    return managed_task
def register_resource(resource: Any) -> Any:
    """Convenience wrapper: register ``resource`` with the global ``task_manager``.

    Args:
        resource: The object to hand to ``task_manager.register_resource``.

    Returns:
        Whatever ``task_manager.register_resource`` returns for ``resource``.
    """
    registered = task_manager.register_resource(resource)
    return registered
async def shutdown_all_tasks(timeout: float = 30.0):
    """Convenience wrapper: shut down the global ``task_manager``.

    Args:
        timeout: Passed to ``task_manager.shutdown``; defaults to 30 seconds.
    """
    # The stray trailing "|" on the await line was a syntax error; removed.
    await task_manager.shutdown(timeout)