"""
Global task lifecycle management module.

Manages the lifecycle of all asynchronous tasks in the application and makes
sure they are cleaned up correctly.
"""
| |
|
import asyncio
import inspect
import weakref
from typing import Any, Dict, Optional, Set

from log import log
| |
|
| |
|
class TaskManager:
    """Global asyncio task manager implemented as a singleton.

    Keeps track of registered tasks and (weakly referenced) resources so that
    :meth:`shutdown` can cancel the tasks and close the resources. Every
    ``TaskManager()`` call returns the same instance; ``__init__`` only sets
    up state the first time it runs.
    """

    _instance = None
    # Not referenced by any method of this class; kept because it is part of
    # the existing class surface.
    _lock = asyncio.Lock()

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            # Read by __init__ so the registries are only created once.
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        """Create the task and resource registries on first construction."""
        if self._initialized:
            return

        self._tasks: Set[asyncio.Task] = set()
        # Weak references keyed by id(resource). Keying by identity (instead
        # of putting the weakrefs in a set) matters because a weakref hashes
        # and compares through its referent: unhashable resources could not
        # be stored, and distinct-but-equal resources would be merged.
        self._resources: Dict[int, weakref.ReferenceType] = {}
        self._shutdown_event = asyncio.Event()
        self._initialized = True
        log.debug("TaskManager initialized")

    def register_task(
        self, task: asyncio.Task, description: Optional[str] = None
    ) -> asyncio.Task:
        """Register an existing task for lifecycle management.

        Args:
            task: The task to track. It is removed from the registry
                automatically once it is done.
            description: Optional name; when truthy it is applied with
                ``task.set_name``.

        Returns:
            The same ``task`` object, to allow call chaining.
        """
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

        if description:
            task.set_name(description)

        log.debug(f"Registered task: {task.get_name() or 'unnamed'}")
        return task

    def create_task(self, coro, *, name: Optional[str] = None) -> asyncio.Task:
        """Create a task with ``asyncio.create_task`` and register it.

        Args:
            coro: Coroutine to schedule.
            name: Optional task name.

        Returns:
            The newly created, registered task.
        """
        task = asyncio.create_task(coro, name=name)
        return self.register_task(task, name)

    def register_resource(self, resource: Any) -> Any:
        """Register a resource (HTTP client, file handle, ...) to close on shutdown.

        Only a weak reference is kept, so registration does not extend the
        resource's lifetime; the entry is dropped as soon as the resource is
        garbage collected. Registering the same object twice keeps a single
        entry.

        Args:
            resource: Object exposing ``close()`` or ``aclose()``.

        Returns:
            The same ``resource`` object.

        Raises:
            TypeError: If ``resource`` does not support weak references.
        """
        key = id(resource)

        def _forget(dead_ref, key=key):
            # Drop the entry only if it still belongs to the collected object
            # (the slot may have been re-registered in the meantime).
            if self._resources.get(key) is dead_ref:
                del self._resources[key]

        self._resources[key] = weakref.ref(resource, _forget)
        log.debug(f"Registered resource: {type(resource).__name__}")
        return resource

    @staticmethod
    async def _close_resource(resource: Any) -> bool:
        """Close ``resource`` via ``close()`` or, failing that, ``aclose()``.

        The result of the call is awaited whenever it is awaitable, so plain
        callables that return a coroutine are handled as well as
        ``async def`` methods.

        Returns:
            ``True`` if a close method was found and called, else ``False``.
        """
        for method_name in ("close", "aclose"):
            closer = getattr(resource, method_name, None)
            if closer is None:
                continue
            outcome = closer()
            if inspect.isawaitable(outcome):
                await outcome
            return True
        return False

    async def shutdown(self, timeout: float = 30.0):
        """Cancel all registered tasks and close all registered resources.

        Args:
            timeout: Maximum number of seconds to wait for the cancelled
                tasks to finish before moving on to resource cleanup.
        """
        log.info("TaskManager shutdown initiated")

        self._shutdown_event.set()

        # If shutdown() itself runs inside a managed task, cancelling or
        # awaiting that task would abort the shutdown before any resource
        # is closed, so the current task is left out.
        current = asyncio.current_task()
        others = [task for task in self._tasks if task is not current]

        cancelled_count = 0
        for task in others:
            if not task.done():
                task.cancel()
                cancelled_count += 1

        if cancelled_count > 0:
            log.info(f"Cancelled {cancelled_count} pending tasks")

        if others:
            try:
                # return_exceptions=True also retrieves task exceptions so
                # they are not reported as "never retrieved" later.
                await asyncio.wait_for(
                    asyncio.gather(*others, return_exceptions=True), timeout=timeout
                )
            except asyncio.TimeoutError:
                log.warning(f"Some tasks did not complete within {timeout}s timeout")

        cleaned_resources = 0
        failed_resources = 0
        # Iterate over a snapshot: weakref callbacks may mutate the registry
        # while a close() call is being awaited.
        for resource_ref in list(self._resources.values()):
            resource = resource_ref()
            if resource is None:
                continue
            try:
                closed = await self._close_resource(resource)
            except Exception as e:
                # Best effort: one failing resource must not stop the others.
                log.warning(f"Failed to close resource {type(resource).__name__}: {e}")
                failed_resources += 1
            else:
                if closed:
                    cleaned_resources += 1

        if cleaned_resources > 0:
            log.info(f"Cleaned up {cleaned_resources} resources")
        if failed_resources > 0:
            log.warning(f"Failed to clean {failed_resources} resources")

        self._tasks.clear()
        self._resources.clear()
        log.info("TaskManager shutdown completed")

    @property
    def is_shutdown(self) -> bool:
        """Whether :meth:`shutdown` has been started."""
        return self._shutdown_event.is_set()

    def get_stats(self) -> Dict[str, Any]:
        """Return counters describing the manager's current state.

        Returns:
            A dict with ``active_tasks`` (tracked tasks),
            ``registered_resources`` (tracked resources that are still
            alive) and ``is_shutdown`` (bool).
        """
        return {
            "active_tasks": len(self._tasks),
            "registered_resources": len(self._resources),
            "is_shutdown": self.is_shutdown,
        }
| |
|
| |
|
| | |
# Module-level TaskManager instance used by the convenience functions in this module.
task_manager: TaskManager = TaskManager()
| |
|
| |
|
def create_managed_task(coro, *, name: str = None) -> asyncio.Task:
    """Convenience wrapper: create a task through the module-level ``task_manager``.

    Args:
        coro: Coroutine handed to ``task_manager.create_task``.
        name: Optional task name, forwarded as the ``name`` keyword.

    Returns:
        The value returned by ``task_manager.create_task``.
    """
    managed_task = task_manager.create_task(coro, name=name)
    return managed_task
| |
|
| |
|
def register_resource(resource: Any) -> Any:
    """Convenience wrapper: register ``resource`` with the module-level ``task_manager``.

    Args:
        resource: Object handed to ``task_manager.register_resource``.

    Returns:
        The value returned by ``task_manager.register_resource``.
    """
    registered = task_manager.register_resource(resource)
    return registered
| |
|
| |
|
async def shutdown_all_tasks(timeout: float = 30.0):
    """Convenience wrapper: run ``shutdown`` on the module-level ``task_manager``.

    Args:
        timeout: Forwarded unchanged to ``task_manager.shutdown``.
    """
    shutdown_coro = task_manager.shutdown(timeout)
    await shutdown_coro
| |
|