File size: 4,809 Bytes
69fec20
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
"""
Global task lifecycle management module
管理应用程序中所有异步任务的生命周期,确保正确清理
"""

import asyncio
import weakref
from typing import Any, Dict, Set

from log import log


class TaskManager:
    """全局异步任务管理器 - 单例模式"""

    _instance = None
    _lock = asyncio.Lock()

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        if self._initialized:
            return

        self._tasks: Set[asyncio.Task] = set()
        self._resources: Set[Any] = set()  # 需要关闭的资源
        self._shutdown_event = asyncio.Event()
        self._initialized = True
        log.debug("TaskManager initialized")

    def register_task(self, task: asyncio.Task, description: str = None) -> asyncio.Task:
        """注册一个任务供生命周期管理"""
        self._tasks.add(task)
        task.add_done_callback(lambda t: self._tasks.discard(t))

        if description:
            task.set_name(description)

        log.debug(f"Registered task: {task.get_name() or 'unnamed'}")
        return task

    def create_task(self, coro, *, name: str = None) -> asyncio.Task:
        """创建并注册一个任务"""
        task = asyncio.create_task(coro, name=name)
        return self.register_task(task, name)

    def register_resource(self, resource: Any) -> Any:
        """注册一个需要清理的资源(如HTTP客户端、文件句柄等)"""
        # 使用弱引用避免循环引用
        self._resources.add(weakref.ref(resource))
        log.debug(f"Registered resource: {type(resource).__name__}")
        return resource

    async def shutdown(self, timeout: float = 30.0):
        """关闭所有任务和资源"""
        log.info("TaskManager shutdown initiated")

        # 设置关闭标志
        self._shutdown_event.set()

        # 取消所有未完成的任务
        cancelled_count = 0
        for task in list(self._tasks):
            if not task.done():
                task.cancel()
                cancelled_count += 1

        if cancelled_count > 0:
            log.info(f"Cancelled {cancelled_count} pending tasks")

        # 等待所有任务完成(包括取消)
        if self._tasks:
            try:
                await asyncio.wait_for(
                    asyncio.gather(*self._tasks, return_exceptions=True), timeout=timeout
                )
            except asyncio.TimeoutError:
                log.warning(f"Some tasks did not complete within {timeout}s timeout")

        # 清理资源 - 改进弱引用处理
        cleaned_resources = 0
        failed_resources = 0
        for resource_ref in list(self._resources):
            resource = resource_ref()
            if resource is not None:
                try:
                    if hasattr(resource, "close"):
                        if asyncio.iscoroutinefunction(resource.close):
                            await resource.close()
                        else:
                            resource.close()
                    elif hasattr(resource, "aclose"):
                        await resource.aclose()
                    cleaned_resources += 1
                except Exception as e:
                    log.warning(f"Failed to close resource {type(resource).__name__}: {e}")
                    failed_resources += 1
            # 如果弱引用已失效,资源已经被自动回收,无需操作

        if cleaned_resources > 0:
            log.info(f"Cleaned up {cleaned_resources} resources")
        if failed_resources > 0:
            log.warning(f"Failed to clean {failed_resources} resources")

        self._tasks.clear()
        self._resources.clear()
        log.info("TaskManager shutdown completed")

    @property
    def is_shutdown(self) -> bool:
        """检查是否已经开始关闭"""
        return self._shutdown_event.is_set()

    def get_stats(self) -> Dict[str, int]:
        """获取任务管理统计信息"""
        return {
            "active_tasks": len(self._tasks),
            "registered_resources": len(self._resources),
            "is_shutdown": self.is_shutdown,
        }


# 全局任务管理器实例
task_manager = TaskManager()


def create_managed_task(coro, *, name: str = None) -> asyncio.Task:
    """创建一个被管理的异步任务的便捷函数"""
    return task_manager.create_task(coro, name=name)


def register_resource(resource: Any) -> Any:
    """注册资源的便捷函数"""
    return task_manager.register_resource(resource)


async def shutdown_all_tasks(timeout: float = 30.0):
    """关闭所有任务的便捷函数"""
    await task_manager.shutdown(timeout)