Spaces:
Sleeping
Sleeping
| """ | |
| Task ID Manager - Assigns unique IDs to each background task | |
| """ | |
| from datetime import datetime, timedelta | |
| import threading | |
| from typing import Optional | |
| import uuid | |
| class TaskIDManager: | |
| """Singleton class for managing task IDs""" | |
| _instance: Optional["TaskIDManager"] = None | |
| _lock = threading.Lock() | |
| _task_ids: dict[str, str] = {} # task_key -> task_id | |
| _task_metadata: dict[str, dict] = {} # task_id -> metadata | |
| def get_instance(cls) -> "TaskIDManager": | |
| """Get singleton instance""" | |
| if cls._instance is None: | |
| with cls._lock: | |
| if cls._instance is None: | |
| cls._instance = cls() | |
| return cls._instance | |
| def generate_task_id(self, task_type: str, task_key: str) -> str: | |
| """ | |
| Generate unique ID for task | |
| Args: | |
| task_type: Task type (e.g., 'kb_init', 'kb_upload', 'question_gen', 'solve', 'research') | |
| task_key: Task unique identifier (e.g., knowledge base name, question ID, etc.) | |
| Returns: | |
| Task ID (format: {task_type}_{timestamp}_{uuid}) | |
| """ | |
| with self._lock: | |
| # If task already exists, return existing ID | |
| if task_key in self._task_ids: | |
| return self._task_ids[task_key] | |
| # Generate new ID | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| unique_id = str(uuid.uuid4())[:8] | |
| task_id = f"{task_type}_{timestamp}_{unique_id}" | |
| # Save mapping and metadata | |
| self._task_ids[task_key] = task_id | |
| self._task_metadata[task_id] = { | |
| "task_type": task_type, | |
| "task_key": task_key, | |
| "created_at": datetime.now().isoformat(), | |
| "status": "running", | |
| } | |
| return task_id | |
| def get_task_id(self, task_key: str) -> str | None: | |
| """Get task ID""" | |
| with self._lock: | |
| return self._task_ids.get(task_key) | |
| def update_task_status(self, task_id: str, status: str, **kwargs): | |
| """Update task status""" | |
| with self._lock: | |
| if task_id in self._task_metadata: | |
| self._task_metadata[task_id]["status"] = status | |
| self._task_metadata[task_id].update(kwargs) | |
| if status in ["completed", "error", "cancelled"]: | |
| self._task_metadata[task_id]["finished_at"] = datetime.now().isoformat() | |
| def get_task_metadata(self, task_id: str) -> dict | None: | |
| """Get task metadata""" | |
| with self._lock: | |
| return self._task_metadata.get(task_id, {}).copy() | |
| def cleanup_old_tasks(self, max_age_hours: int = 24): | |
| """Clean up old tasks (completed tasks older than specified hours)""" | |
| with self._lock: | |
| cutoff = datetime.now() - timedelta(hours=max_age_hours) | |
| to_remove = [] | |
| for task_id, metadata in self._task_metadata.items(): | |
| if metadata.get("status") in ["completed", "error", "cancelled"]: | |
| finished_at = metadata.get("finished_at") | |
| if finished_at: | |
| try: | |
| finished_time = datetime.fromisoformat(finished_at) | |
| if finished_time < cutoff: | |
| to_remove.append(task_id) | |
| except: | |
| pass | |
| for task_id in to_remove: | |
| metadata = self._task_metadata.pop(task_id, {}) | |
| task_key = metadata.get("task_key") | |
| if task_key: | |
| self._task_ids.pop(task_key, None) | |