# Source: gcli2api / src/task_manager.py
# Hosting-page residue kept for reference: "lightspeed's picture",
# "Upload 22 files", "5868187 verified"
"""
Global task lifecycle management module
管理应用程序中所有异步任务的生命周期,确保正确清理
"""
import asyncio
import inspect
import weakref
from typing import Any, Dict, Optional, Set

from log import log
class TaskManager:
    """Global asynchronous task manager (singleton).

    Tracks asyncio tasks and closable resources so that they can all be
    cancelled / closed from a single ``shutdown()`` call.

    Resources are held through weak references only: registering a resource
    does not keep it alive, so the caller must hold its own reference for as
    long as the resource should be closed at shutdown.
    """

    _instance = None
    # Not referenced by any method of this class; kept so that existing
    # external users of the attribute keep working.
    _lock = asyncio.Lock()

    def __new__(cls):
        """Return the single shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            # __init__ runs on every TaskManager() call; this flag lets it
            # skip re-initialisation of the shared instance.
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        """Initialise the shared state once; later calls are no-ops."""
        if self._initialized:
            return
        self._tasks: Set[asyncio.Task] = set()
        # Weak references to the resources that must be closed at shutdown.
        self._resources: Set[weakref.ref] = set()
        self._shutdown_event = asyncio.Event()
        self._initialized = True
        log.debug("TaskManager initialized")

    def register_task(self, task: asyncio.Task, description: Optional[str] = None) -> asyncio.Task:
        """Register an existing task for lifecycle management.

        Args:
            task: The task to track. It is dropped from the tracked set
                automatically once it finishes.
            description: Optional name to assign to the task.

        Returns:
            The same ``task`` object, to allow call chaining.
        """
        self._tasks.add(task)
        # The set is only ever mutated in place, so binding ``discard`` now
        # is equivalent to looking it up when the task completes.
        task.add_done_callback(self._tasks.discard)
        if description:
            task.set_name(description)
        log.debug(f"Registered task: {task.get_name() or 'unnamed'}")
        return task

    def create_task(self, coro, *, name: Optional[str] = None) -> asyncio.Task:
        """Create a task from ``coro`` and register it.

        Args:
            coro: The coroutine to schedule with ``asyncio.create_task``.
            name: Optional task name.

        Returns:
            The newly created, registered task.
        """
        task = asyncio.create_task(coro, name=name)
        return self.register_task(task, name)

    def register_resource(self, resource: Any) -> Any:
        """Register a resource (HTTP client, file handle, ...) for cleanup.

        Only a weak reference is stored, which avoids reference cycles and
        does not extend the resource's lifetime.

        Args:
            resource: Object exposing ``close()`` or ``aclose()``.

        Returns:
            The same ``resource`` object.

        Raises:
            TypeError: If ``resource`` does not support weak references or
                is not hashable.
        """
        # The callback removes the reference from the set as soon as the
        # resource is garbage collected, so dead entries do not pile up and
        # get_stats() does not count them.
        self._resources.add(weakref.ref(resource, self._resources.discard))
        log.debug(f"Registered resource: {type(resource).__name__}")
        return resource

    @staticmethod
    async def _close_resource(resource: Any) -> bool:
        """Close one resource via ``close()`` or, failing that, ``aclose()``.

        The result of the call is awaited whenever it is awaitable, so a
        plain callable that returns a coroutine is handled as well as an
        ``async def`` method.

        Returns:
            True if a close method was found and invoked, False otherwise.
        """
        if hasattr(resource, 'close'):
            result = resource.close()
        elif hasattr(resource, 'aclose'):
            result = resource.aclose()
        else:
            return False
        if inspect.isawaitable(result):
            await result
        return True

    async def shutdown(self, timeout: float = 30.0):
        """Cancel all tracked tasks and close all registered resources.

        Args:
            timeout: Maximum number of seconds to wait for the cancelled
                tasks to finish before moving on to resource cleanup.
        """
        log.info("TaskManager shutdown initiated")

        # Raise the shutdown flag first so is_shutdown reflects it.
        self._shutdown_event.set()

        # shutdown() may itself be running inside a managed task; cancelling
        # or awaiting that task would abort the shutdown before any resource
        # is closed, so it is left out.
        current = asyncio.current_task()
        pending = [task for task in self._tasks if task is not current]

        # Cancel every task that has not finished yet.
        cancelled_count = 0
        for task in pending:
            if not task.done():
                task.cancel()
                cancelled_count += 1

        if cancelled_count > 0:
            log.info(f"Cancelled {cancelled_count} pending tasks")

        # Wait for all tasks to settle (including the cancelled ones).
        if pending:
            try:
                await asyncio.wait_for(
                    asyncio.gather(*pending, return_exceptions=True),
                    timeout=timeout
                )
            except asyncio.TimeoutError:
                log.warning(f"Some tasks did not complete within {timeout}s timeout")

        # Close the resources that are still alive.
        cleaned_resources = 0
        for resource_ref in list(self._resources):
            resource = resource_ref()
            if resource is None:
                continue
            try:
                if await self._close_resource(resource):
                    cleaned_resources += 1
            except Exception as e:
                # Best effort: one failing resource must not block the rest.
                log.warning(f"Failed to close resource {type(resource).__name__}: {e}")

        if cleaned_resources > 0:
            log.info(f"Cleaned up {cleaned_resources} resources")

        self._tasks.clear()
        self._resources.clear()
        log.info("TaskManager shutdown completed")

    @property
    def is_shutdown(self) -> bool:
        """Whether shutdown has been initiated."""
        return self._shutdown_event.is_set()

    def get_stats(self) -> Dict[str, int]:
        """Return counts of tracked tasks and resources plus the shutdown flag."""
        return {
            'active_tasks': len(self._tasks),
            'registered_resources': len(self._resources),
            'is_shutdown': self.is_shutdown
        }
# Global task manager instance, created when this module is imported
task_manager = TaskManager()
def create_managed_task(coro, *, name: Optional[str] = None) -> asyncio.Task:
    """Convenience function: create an asyncio task tracked by ``task_manager``.

    Args:
        coro: The coroutine to schedule.
        name: Optional task name, forwarded to ``task_manager.create_task``.

    Returns:
        The task returned by ``task_manager.create_task``.
    """
    return task_manager.create_task(coro, name=name)
def register_resource(resource: Any) -> Any:
    """Convenience function: hand ``resource`` to ``task_manager`` for cleanup.

    Returns whatever ``task_manager.register_resource`` returns.
    """
    registered = task_manager.register_resource(resource)
    return registered
async def shutdown_all_tasks(timeout: float = 30.0):
    """Convenience function: run ``task_manager.shutdown`` with ``timeout``."""
    manager = task_manager
    await manager.shutdown(timeout)