File size: 4,656 Bytes
5868187
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
"""
Global task lifecycle management module
Manages the lifecycle of all asynchronous tasks in the application and ensures they are cleaned up properly.
"""
import asyncio
import inspect
import weakref
from typing import Any, Dict, Optional, Set

from log import log


class TaskManager:
    """Global manager for asyncio tasks and closable resources (singleton).

    Every call to ``TaskManager()`` returns the same instance. Registered
    tasks are cancelled and awaited by :meth:`shutdown`; registered resources
    are closed afterwards.
    """

    _instance = None
    # NOTE(review): not used by any method of this class; kept for backward
    # compatibility in case other code references it.
    _lock = asyncio.Lock()

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        """Set up the bookkeeping containers (only on the first construction)."""
        # __init__ runs on every TaskManager() call; initialise state once.
        if self._initialized:
            return

        self._tasks: Set[asyncio.Task] = set()
        # Weak references to resources that must be closed on shutdown.
        self._resources: Set[Any] = set()
        # Resources that cannot be weakly referenced, keyed by id() so that
        # registering the same object twice does not close it twice.
        self._strong_resources: Dict[int, Any] = {}
        self._shutdown_event = asyncio.Event()
        self._initialized = True
        log.debug("TaskManager initialized")

    def register_task(self, task: asyncio.Task, description: Optional[str] = None) -> asyncio.Task:
        """Track an existing task so that :meth:`shutdown` can cancel it.

        Args:
            task: The task to track. It is dropped from the tracking set
                automatically once it finishes.
            description: Optional name to assign to the task.

        Returns:
            The same task, to allow call chaining.
        """
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

        if description:
            task.set_name(description)

        log.debug(f"Registered task: {task.get_name() or 'unnamed'}")
        return task

    def create_task(self, coro, *, name: Optional[str] = None) -> asyncio.Task:
        """Create a task for ``coro`` on the running loop and register it.

        Args:
            coro: The coroutine to schedule.
            name: Optional task name.

        Returns:
            The newly created, registered task.
        """
        task = asyncio.create_task(coro, name=name)
        return self.register_task(task, name)

    def register_resource(self, resource: Any) -> Any:
        """Register a resource (HTTP client, file handle, ...) to close on shutdown.

        The resource is held through a weak reference where possible, so the
        manager does not keep it alive. Objects that cannot be weakly
        referenced (or hashed) are held strongly instead of raising.

        Args:
            resource: Object exposing ``close()`` or ``aclose()``.

        Returns:
            The same resource, to allow call chaining.
        """
        try:
            # The callback removes the entry once the resource is garbage
            # collected, so dead references do not accumulate in the set.
            self._resources.add(weakref.ref(resource, self._resources.discard))
        except TypeError:
            self._strong_resources[id(resource)] = resource
            log.debug(
                f"Resource {type(resource).__name__} is not weak-referenceable; "
                "holding a strong reference"
            )
        log.debug(f"Registered resource: {type(resource).__name__}")
        return resource

    def _live_resources(self) -> list:
        """Return the registered resources that are still alive."""
        alive = [
            resource
            for resource in (ref() for ref in list(self._resources))
            if resource is not None
        ]
        alive.extend(self._strong_resources.values())
        return alive

    @staticmethod
    async def _close_resource(resource: Any) -> bool:
        """Close one resource; return False if it exposes no close method.

        ``close`` takes precedence over ``aclose``. Whatever the chosen method
        returns is awaited when it is awaitable, which also covers plain
        functions that hand back a coroutine.
        """
        closer = getattr(resource, 'close', None)
        if closer is None:
            closer = getattr(resource, 'aclose', None)
        if closer is None:
            return False

        outcome = closer()
        if inspect.isawaitable(outcome):
            await outcome
        return True

    async def shutdown(self, timeout: float = 30.0):
        """Cancel all tracked tasks, wait for them, then close all resources.

        Args:
            timeout: Maximum number of seconds to wait for the cancelled
                tasks to finish before moving on to resource cleanup.
        """
        log.info("TaskManager shutdown initiated")

        # Flag the shutdown so that is_shutdown reports it.
        self._shutdown_event.set()

        # Never cancel or await the task that is running shutdown() itself:
        # doing so would abort the shutdown before resources are closed.
        current = asyncio.current_task()
        targets = [task for task in self._tasks if task is not current]

        cancelled_count = 0
        for task in targets:
            if not task.done():
                task.cancel()
                cancelled_count += 1

        if cancelled_count > 0:
            log.info(f"Cancelled {cancelled_count} pending tasks")

        # Wait for every task to settle (cancellation included).
        if targets:
            try:
                results = await asyncio.wait_for(
                    asyncio.gather(*targets, return_exceptions=True),
                    timeout=timeout
                )
            except asyncio.TimeoutError:
                log.warning(f"Some tasks did not complete within {timeout}s timeout")
            else:
                # CancelledError is not an Exception subclass, so only real
                # failures are reported here.
                for task, result in zip(targets, results):
                    if isinstance(result, Exception):
                        log.warning(f"Task {task.get_name()} failed during shutdown: {result!r}")

        # Close the resources that are still alive.
        cleaned_resources = 0
        for resource in self._live_resources():
            try:
                if await self._close_resource(resource):
                    cleaned_resources += 1
            except Exception as e:
                log.warning(f"Failed to close resource {type(resource).__name__}: {e}")

        if cleaned_resources > 0:
            log.info(f"Cleaned up {cleaned_resources} resources")

        self._tasks.clear()
        self._resources.clear()
        self._strong_resources.clear()
        log.info("TaskManager shutdown completed")

    @property
    def is_shutdown(self) -> bool:
        """Whether :meth:`shutdown` has been started."""
        return self._shutdown_event.is_set()

    def get_stats(self) -> Dict[str, Any]:
        """Return counters describing the manager's current state.

        Returns:
            A dict with ``active_tasks`` (int), ``registered_resources`` (int)
            and ``is_shutdown`` (bool).
        """
        return {
            'active_tasks': len(self._tasks),
            'registered_resources': len(self._resources) + len(self._strong_resources),
            'is_shutdown': self.is_shutdown
        }


# Global task manager instance, created when this module is imported and
# used by the module-level convenience functions below.
task_manager = TaskManager()


def create_managed_task(coro, *, name: str = None) -> asyncio.Task:
    """Convenience wrapper that creates a task through the global manager.

    Args:
        coro: The coroutine to schedule.
        name: Optional task name, forwarded to the manager.

    Returns:
        The task returned by ``task_manager.create_task``.
    """
    managed = task_manager.create_task(coro, name=name)
    return managed


def register_resource(resource: Any) -> Any:
    """Convenience wrapper that registers a resource with the global manager.

    Args:
        resource: The object to hand to ``task_manager.register_resource``.

    Returns:
        Whatever ``task_manager.register_resource`` returns.
    """
    registered = task_manager.register_resource(resource)
    return registered


async def shutdown_all_tasks(timeout: float = 30.0):
    """Convenience wrapper that runs the global manager's shutdown.

    Args:
        timeout: Forwarded to ``task_manager.shutdown``.
    """
    manager = task_manager
    await manager.shutdown(timeout)