""" 任务状态管理 用于跟踪长时间运行的任务(如图谱构建) """ import uuid import threading from datetime import datetime from enum import Enum from typing import Dict, Any, Optional from dataclasses import dataclass, field class TaskStatus(str, Enum): """任务状态枚举""" PENDING = "pending" # 等待中 PROCESSING = "processing" # 处理中 COMPLETED = "completed" # 已完成 FAILED = "failed" # 失败 @dataclass class Task: """任务数据类""" task_id: str task_type: str status: TaskStatus created_at: datetime updated_at: datetime progress: int = 0 # 总进度百分比 0-100 message: str = "" # 状态消息 result: Optional[Dict] = None # 任务结果 error: Optional[str] = None # 错误信息 metadata: Dict = field(default_factory=dict) # 额外元数据 progress_detail: Dict = field(default_factory=dict) # 详细进度信息 def to_dict(self) -> Dict[str, Any]: """转换为字典""" return { "task_id": self.task_id, "task_type": self.task_type, "status": self.status.value, "created_at": self.created_at.isoformat(), "updated_at": self.updated_at.isoformat(), "progress": self.progress, "message": self.message, "progress_detail": self.progress_detail, "result": self.result, "error": self.error, "metadata": self.metadata, } class TaskManager: """ 任务管理器 线程安全的任务状态管理 """ _instance = None _lock = threading.Lock() def __new__(cls): """单例模式""" if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._tasks: Dict[str, Task] = {} cls._instance._task_lock = threading.Lock() return cls._instance def create_task(self, task_type: str, metadata: Optional[Dict] = None) -> str: """ 创建新任务 Args: task_type: 任务类型 metadata: 额外元数据 Returns: 任务ID """ task_id = str(uuid.uuid4()) now = datetime.now() task = Task( task_id=task_id, task_type=task_type, status=TaskStatus.PENDING, created_at=now, updated_at=now, metadata=metadata or {} ) with self._task_lock: self._tasks[task_id] = task return task_id def get_task(self, task_id: str) -> Optional[Task]: """获取任务""" with self._task_lock: return self._tasks.get(task_id) def update_task( self, task_id: str, status: Optional[TaskStatus] = None, progress: Optional[int] = None, message: Optional[str] = None, result: Optional[Dict] = None, error: Optional[str] = None, progress_detail: Optional[Dict] = None ): """ 更新任务状态 Args: task_id: 任务ID status: 新状态 progress: 进度 message: 消息 result: 结果 error: 错误信息 progress_detail: 详细进度信息 """ with self._task_lock: task = self._tasks.get(task_id) if task: task.updated_at = datetime.now() if status is not None: task.status = status if progress is not None: task.progress = progress if message is not None: task.message = message if result is not None: task.result = result if error is not None: task.error = error if progress_detail is not None: task.progress_detail = progress_detail def complete_task(self, task_id: str, result: Dict): """标记任务完成""" self.update_task( task_id, status=TaskStatus.COMPLETED, progress=100, message="任务完成", result=result ) def fail_task(self, task_id: str, error: str): """标记任务失败""" self.update_task( task_id, status=TaskStatus.FAILED, message="任务失败", error=error ) def list_tasks(self, task_type: Optional[str] = None) -> list: """列出任务""" with self._task_lock: tasks = list(self._tasks.values()) if task_type: tasks = [t for t in tasks if t.task_type == task_type] return [t.to_dict() for t in sorted(tasks, key=lambda x: x.created_at, reverse=True)] def cleanup_old_tasks(self, max_age_hours: int = 24): """清理旧任务""" from datetime import timedelta cutoff = datetime.now() - timedelta(hours=max_age_hours) with self._task_lock: old_ids = [ tid for tid, task in self._tasks.items() if task.created_at < cutoff and task.status in [TaskStatus.COMPLETED, TaskStatus.FAILED] ] for tid in old_ids: del self._tasks[tid]