| """ |
| 任务状态管理 |
| 用于跟踪长时间运行的任务(如图谱构建) |
| """ |
|
|
| import uuid |
| import threading |
| from datetime import datetime |
| from enum import Enum |
| from typing import Dict, Any, Optional |
| from dataclasses import dataclass, field |
|
|
|
|
| class TaskStatus(str, Enum): |
| """任务状态枚举""" |
| PENDING = "pending" |
| PROCESSING = "processing" |
| COMPLETED = "completed" |
| FAILED = "failed" |
|
|
|
|
| @dataclass |
| class Task: |
| """任务数据类""" |
| task_id: str |
| task_type: str |
| status: TaskStatus |
| created_at: datetime |
| updated_at: datetime |
| progress: int = 0 |
| message: str = "" |
| result: Optional[Dict] = None |
| error: Optional[str] = None |
| metadata: Dict = field(default_factory=dict) |
| progress_detail: Dict = field(default_factory=dict) |
| |
| def to_dict(self) -> Dict[str, Any]: |
| """转换为字典""" |
| return { |
| "task_id": self.task_id, |
| "task_type": self.task_type, |
| "status": self.status.value, |
| "created_at": self.created_at.isoformat(), |
| "updated_at": self.updated_at.isoformat(), |
| "progress": self.progress, |
| "message": self.message, |
| "progress_detail": self.progress_detail, |
| "result": self.result, |
| "error": self.error, |
| "metadata": self.metadata, |
| } |
|
|
|
|
| class TaskManager: |
| """ |
| 任务管理器 |
| 线程安全的任务状态管理 |
| """ |
| |
| _instance = None |
| _lock = threading.Lock() |
| |
| def __new__(cls): |
| """单例模式""" |
| if cls._instance is None: |
| with cls._lock: |
| if cls._instance is None: |
| cls._instance = super().__new__(cls) |
| cls._instance._tasks: Dict[str, Task] = {} |
| cls._instance._task_lock = threading.Lock() |
| return cls._instance |
| |
| def create_task(self, task_type: str, metadata: Optional[Dict] = None) -> str: |
| """ |
| 创建新任务 |
| |
| Args: |
| task_type: 任务类型 |
| metadata: 额外元数据 |
| |
| Returns: |
| 任务ID |
| """ |
| task_id = str(uuid.uuid4()) |
| now = datetime.now() |
| |
| task = Task( |
| task_id=task_id, |
| task_type=task_type, |
| status=TaskStatus.PENDING, |
| created_at=now, |
| updated_at=now, |
| metadata=metadata or {} |
| ) |
| |
| with self._task_lock: |
| self._tasks[task_id] = task |
| |
| return task_id |
| |
| def get_task(self, task_id: str) -> Optional[Task]: |
| """获取任务""" |
| with self._task_lock: |
| return self._tasks.get(task_id) |
| |
| def update_task( |
| self, |
| task_id: str, |
| status: Optional[TaskStatus] = None, |
| progress: Optional[int] = None, |
| message: Optional[str] = None, |
| result: Optional[Dict] = None, |
| error: Optional[str] = None, |
| progress_detail: Optional[Dict] = None |
| ): |
| """ |
| 更新任务状态 |
| |
| Args: |
| task_id: 任务ID |
| status: 新状态 |
| progress: 进度 |
| message: 消息 |
| result: 结果 |
| error: 错误信息 |
| progress_detail: 详细进度信息 |
| """ |
| with self._task_lock: |
| task = self._tasks.get(task_id) |
| if task: |
| task.updated_at = datetime.now() |
| if status is not None: |
| task.status = status |
| if progress is not None: |
| task.progress = progress |
| if message is not None: |
| task.message = message |
| if result is not None: |
| task.result = result |
| if error is not None: |
| task.error = error |
| if progress_detail is not None: |
| task.progress_detail = progress_detail |
| |
| def complete_task(self, task_id: str, result: Dict): |
| """标记任务完成""" |
| self.update_task( |
| task_id, |
| status=TaskStatus.COMPLETED, |
| progress=100, |
| message="任务完成", |
| result=result |
| ) |
| |
| def fail_task(self, task_id: str, error: str): |
| """标记任务失败""" |
| self.update_task( |
| task_id, |
| status=TaskStatus.FAILED, |
| message="任务失败", |
| error=error |
| ) |
| |
| def list_tasks(self, task_type: Optional[str] = None) -> list: |
| """列出任务""" |
| with self._task_lock: |
| tasks = list(self._tasks.values()) |
| if task_type: |
| tasks = [t for t in tasks if t.task_type == task_type] |
| return [t.to_dict() for t in sorted(tasks, key=lambda x: x.created_at, reverse=True)] |
| |
| def cleanup_old_tasks(self, max_age_hours: int = 24): |
| """清理旧任务""" |
| from datetime import timedelta |
| cutoff = datetime.now() - timedelta(hours=max_age_hours) |
| |
| with self._task_lock: |
| old_ids = [ |
| tid for tid, task in self._tasks.items() |
| if task.created_at < cutoff and task.status in [TaskStatus.COMPLETED, TaskStatus.FAILED] |
| ] |
| for tid in old_ids: |
| del self._tasks[tid] |
|
|
|
|