"""
任务状态管理
用于跟踪长时间运行的任务(如图谱构建)
"""
import uuid
import threading
from datetime import datetime
from enum import Enum
from typing import Dict, Any, Optional
from dataclasses import dataclass, field
class TaskStatus(str, Enum):
    """Enumeration of task states; each member is also a plain ``str``."""

    # Waiting to be processed.
    PENDING = "pending"
    # Currently being processed.
    PROCESSING = "processing"
    # Finished.
    COMPLETED = "completed"
    # Failed.
    FAILED = "failed"
@dataclass
class Task:
    """Record describing one tracked task and its current progress."""

    task_id: str
    task_type: str
    status: TaskStatus
    created_at: datetime
    updated_at: datetime
    progress: int = 0  # overall progress percentage, 0-100
    message: str = ""  # status message
    result: Optional[Dict] = None  # task result
    error: Optional[str] = None  # error information
    metadata: Dict = field(default_factory=dict)  # extra metadata
    progress_detail: Dict = field(default_factory=dict)  # detailed progress information

    def to_dict(self) -> Dict[str, Any]:
        """Return the task as a plain dictionary.

        ``status`` is emitted as the enum's value and both timestamps as
        ``isoformat()`` strings; every other field is passed through as-is
        (the dict fields are the same objects, not copies).
        """
        payload: Dict[str, Any] = dict(
            task_id=self.task_id,
            task_type=self.task_type,
            status=self.status.value,
            created_at=self.created_at.isoformat(),
            updated_at=self.updated_at.isoformat(),
            progress=self.progress,
            message=self.message,
            progress_detail=self.progress_detail,
            result=self.result,
            error=self.error,
            metadata=self.metadata,
        )
        return payload
class TaskManager:
    """
    Task manager.

    Singleton, in-memory registry of tasks: every ``TaskManager()`` call
    returns the same instance. All reads and writes of the task table are
    performed while holding ``_task_lock``.
    """

    _instance = None
    _lock = threading.Lock()  # guards creation of the singleton instance

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    # Build the object completely BEFORE publishing it through
                    # cls._instance. The unlocked check above lets other
                    # threads return cls._instance as soon as it is non-None,
                    # so it must never be visible without _tasks/_task_lock.
                    instance = super().__new__(cls)
                    instance._tasks = {}  # task_id -> Task
                    instance._task_lock = threading.Lock()
                    cls._instance = instance
        return cls._instance

    def create_task(self, task_type: str, metadata: Optional[Dict] = None) -> str:
        """
        Create a new task in the PENDING state and register it.

        Args:
            task_type: Task type.
            metadata: Extra metadata; an empty dict is stored when this is
                omitted or falsy.

        Returns:
            The generated task ID (a UUID4 string).
        """
        task_id = str(uuid.uuid4())
        now = datetime.now()

        task = Task(
            task_id=task_id,
            task_type=task_type,
            status=TaskStatus.PENDING,
            created_at=now,
            updated_at=now,
            metadata=metadata or {},
        )

        with self._task_lock:
            self._tasks[task_id] = task

        return task_id

    def get_task(self, task_id: str) -> Optional[Task]:
        """Return the task with the given ID, or ``None`` if it is unknown."""
        with self._task_lock:
            return self._tasks.get(task_id)

    def update_task(
        self,
        task_id: str,
        status: Optional[TaskStatus] = None,
        progress: Optional[int] = None,
        message: Optional[str] = None,
        result: Optional[Dict] = None,
        error: Optional[str] = None,
        progress_detail: Optional[Dict] = None,
    ) -> bool:
        """
        Update the fields of an existing task.

        Only arguments that are not ``None`` are applied; ``updated_at`` is
        refreshed whenever the task exists.

        Args:
            task_id: Task ID.
            status: New status.
            progress: Progress value.
            message: Status message.
            result: Task result.
            error: Error information.
            progress_detail: Detailed progress information.

        Returns:
            ``True`` if the task was found and updated, ``False`` if no task
            with ``task_id`` is registered (nothing is changed in that case).
        """
        with self._task_lock:
            task = self._tasks.get(task_id)
            if task is None:
                return False

            task.updated_at = datetime.now()
            if status is not None:
                task.status = status
            if progress is not None:
                task.progress = progress
            if message is not None:
                task.message = message
            if result is not None:
                task.result = result
            if error is not None:
                task.error = error
            if progress_detail is not None:
                task.progress_detail = progress_detail
            return True

    def complete_task(self, task_id: str, result: Dict) -> bool:
        """
        Mark a task as completed with progress 100 and store its result.

        Returns:
            ``True`` if the task exists, ``False`` otherwise.
        """
        return self.update_task(
            task_id,
            status=TaskStatus.COMPLETED,
            progress=100,
            message="任务完成",
            result=result,
        )

    def fail_task(self, task_id: str, error: str) -> bool:
        """
        Mark a task as failed and record the error text.

        Returns:
            ``True`` if the task exists, ``False`` otherwise.
        """
        return self.update_task(
            task_id,
            status=TaskStatus.FAILED,
            message="任务失败",
            error=error,
        )

    def list_tasks(self, task_type: Optional[str] = None) -> list:
        """
        List tasks as dictionaries, newest first.

        Args:
            task_type: When truthy, only tasks of this type are returned.

        Returns:
            A list of ``Task.to_dict()`` results sorted by ``created_at``
            in descending order.
        """
        # Serialise while holding the lock so a concurrent update_task()
        # cannot modify a task halfway through building its dictionary.
        with self._task_lock:
            tasks = list(self._tasks.values())
            if task_type:
                tasks = [t for t in tasks if t.task_type == task_type]
            tasks.sort(key=lambda t: t.created_at, reverse=True)
            return [t.to_dict() for t in tasks]

    def cleanup_old_tasks(self, max_age_hours: int = 24) -> int:
        """
        Remove finished tasks that were created too long ago.

        Only tasks whose status is COMPLETED or FAILED and whose
        ``created_at`` is older than ``max_age_hours`` are removed.

        Args:
            max_age_hours: Age threshold in hours.

        Returns:
            The number of tasks removed.
        """
        from datetime import timedelta

        cutoff = datetime.now() - timedelta(hours=max_age_hours)
        finished = (TaskStatus.COMPLETED, TaskStatus.FAILED)

        with self._task_lock:
            # Collect IDs first: the dict must not change size while iterating.
            old_ids = [
                tid
                for tid, task in self._tasks.items()
                if task.created_at < cutoff and task.status in finished
            ]
            for tid in old_ids:
                del self._tasks[tid]
            return len(old_ids)
|