Spaces:
Sleeping
Sleeping
| """ | |
| 任务状态管理 | |
| 用于跟踪长时间运行的任务(如图谱构建) | |
| """ | |
| import uuid | |
| import threading | |
| from datetime import datetime | |
| from enum import Enum | |
| from typing import Dict, Any, Optional | |
| from dataclasses import dataclass, field | |
| class TaskStatus(str, Enum): | |
| """任务状态枚举""" | |
| PENDING = "pending" # 等待中 | |
| PROCESSING = "processing" # 处理中 | |
| COMPLETED = "completed" # 已完成 | |
| FAILED = "failed" # 失败 | |
| class Task: | |
| """任务数据类""" | |
| task_id: str | |
| task_type: str | |
| status: TaskStatus | |
| created_at: datetime | |
| updated_at: datetime | |
| progress: int = 0 # 总进度百分比 0-100 | |
| message: str = "" # 状态消息 | |
| result: Optional[Dict] = None # 任务结果 | |
| error: Optional[str] = None # 错误信息 | |
| metadata: Dict = field(default_factory=dict) # 额外元数据 | |
| progress_detail: Dict = field(default_factory=dict) # 详细进度信息 | |
| def to_dict(self) -> Dict[str, Any]: | |
| """转换为字典""" | |
| return { | |
| "task_id": self.task_id, | |
| "task_type": self.task_type, | |
| "status": self.status.value, | |
| "created_at": self.created_at.isoformat(), | |
| "updated_at": self.updated_at.isoformat(), | |
| "progress": self.progress, | |
| "message": self.message, | |
| "progress_detail": self.progress_detail, | |
| "result": self.result, | |
| "error": self.error, | |
| "metadata": self.metadata, | |
| } | |
| class TaskManager: | |
| """ | |
| 任务管理器 | |
| 线程安全的任务状态管理 | |
| """ | |
| _instance = None | |
| _lock = threading.Lock() | |
| def __new__(cls): | |
| """单例模式""" | |
| if cls._instance is None: | |
| with cls._lock: | |
| if cls._instance is None: | |
| cls._instance = super().__new__(cls) | |
| cls._instance._tasks: Dict[str, Task] = {} | |
| cls._instance._task_lock = threading.Lock() | |
| return cls._instance | |
| def create_task(self, task_type: str, metadata: Optional[Dict] = None) -> str: | |
| """ | |
| 创建新任务 | |
| Args: | |
| task_type: 任务类型 | |
| metadata: 额外元数据 | |
| Returns: | |
| 任务ID | |
| """ | |
| task_id = str(uuid.uuid4()) | |
| now = datetime.now() | |
| task = Task( | |
| task_id=task_id, | |
| task_type=task_type, | |
| status=TaskStatus.PENDING, | |
| created_at=now, | |
| updated_at=now, | |
| metadata=metadata or {} | |
| ) | |
| with self._task_lock: | |
| self._tasks[task_id] = task | |
| return task_id | |
| def get_task(self, task_id: str) -> Optional[Task]: | |
| """获取任务""" | |
| with self._task_lock: | |
| return self._tasks.get(task_id) | |
| def update_task( | |
| self, | |
| task_id: str, | |
| status: Optional[TaskStatus] = None, | |
| progress: Optional[int] = None, | |
| message: Optional[str] = None, | |
| result: Optional[Dict] = None, | |
| error: Optional[str] = None, | |
| progress_detail: Optional[Dict] = None | |
| ): | |
| """ | |
| 更新任务状态 | |
| Args: | |
| task_id: 任务ID | |
| status: 新状态 | |
| progress: 进度 | |
| message: 消息 | |
| result: 结果 | |
| error: 错误信息 | |
| progress_detail: 详细进度信息 | |
| """ | |
| with self._task_lock: | |
| task = self._tasks.get(task_id) | |
| if task: | |
| task.updated_at = datetime.now() | |
| if status is not None: | |
| task.status = status | |
| if progress is not None: | |
| task.progress = progress | |
| if message is not None: | |
| task.message = message | |
| if result is not None: | |
| task.result = result | |
| if error is not None: | |
| task.error = error | |
| if progress_detail is not None: | |
| task.progress_detail = progress_detail | |
| def complete_task(self, task_id: str, result: Dict): | |
| """标记任务完成""" | |
| self.update_task( | |
| task_id, | |
| status=TaskStatus.COMPLETED, | |
| progress=100, | |
| message="任务完成", | |
| result=result | |
| ) | |
| def fail_task(self, task_id: str, error: str): | |
| """标记任务失败""" | |
| self.update_task( | |
| task_id, | |
| status=TaskStatus.FAILED, | |
| message="任务失败", | |
| error=error | |
| ) | |
| def list_tasks(self, task_type: Optional[str] = None) -> list: | |
| """列出任务""" | |
| with self._task_lock: | |
| tasks = list(self._tasks.values()) | |
| if task_type: | |
| tasks = [t for t in tasks if t.task_type == task_type] | |
| return [t.to_dict() for t in sorted(tasks, key=lambda x: x.created_at, reverse=True)] | |
| def cleanup_old_tasks(self, max_age_hours: int = 24): | |
| """清理旧任务""" | |
| from datetime import timedelta | |
| cutoff = datetime.now() - timedelta(hours=max_age_hours) | |
| with self._task_lock: | |
| old_ids = [ | |
| tid for tid, task in self._tasks.items() | |
| if task.created_at < cutoff and task.status in [TaskStatus.COMPLETED, TaskStatus.FAILED] | |
| ] | |
| for tid in old_ids: | |
| del self._tasks[tid] | |