Spaces:
Sleeping
Sleeping
File size: 3,687 Bytes
5df8a73 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 | """
Task ID Manager - Assigns unique IDs to each background task
"""
from datetime import datetime, timedelta
import threading
from typing import Optional
import uuid
class TaskIDManager:
"""Singleton class for managing task IDs"""
_instance: Optional["TaskIDManager"] = None
_lock = threading.Lock()
_task_ids: dict[str, str] = {} # task_key -> task_id
_task_metadata: dict[str, dict] = {} # task_id -> metadata
@classmethod
def get_instance(cls) -> "TaskIDManager":
"""Get singleton instance"""
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = cls()
return cls._instance
def generate_task_id(self, task_type: str, task_key: str) -> str:
"""
Generate unique ID for task
Args:
task_type: Task type (e.g., 'kb_init', 'kb_upload', 'question_gen', 'solve', 'research')
task_key: Task unique identifier (e.g., knowledge base name, question ID, etc.)
Returns:
Task ID (format: {task_type}_{timestamp}_{uuid})
"""
with self._lock:
# If task already exists, return existing ID
if task_key in self._task_ids:
return self._task_ids[task_key]
# Generate new ID
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
unique_id = str(uuid.uuid4())[:8]
task_id = f"{task_type}_{timestamp}_{unique_id}"
# Save mapping and metadata
self._task_ids[task_key] = task_id
self._task_metadata[task_id] = {
"task_type": task_type,
"task_key": task_key,
"created_at": datetime.now().isoformat(),
"status": "running",
}
return task_id
def get_task_id(self, task_key: str) -> str | None:
"""Get task ID"""
with self._lock:
return self._task_ids.get(task_key)
def update_task_status(self, task_id: str, status: str, **kwargs):
"""Update task status"""
with self._lock:
if task_id in self._task_metadata:
self._task_metadata[task_id]["status"] = status
self._task_metadata[task_id].update(kwargs)
if status in ["completed", "error", "cancelled"]:
self._task_metadata[task_id]["finished_at"] = datetime.now().isoformat()
def get_task_metadata(self, task_id: str) -> dict | None:
"""Get task metadata"""
with self._lock:
return self._task_metadata.get(task_id, {}).copy()
def cleanup_old_tasks(self, max_age_hours: int = 24):
"""Clean up old tasks (completed tasks older than specified hours)"""
with self._lock:
cutoff = datetime.now() - timedelta(hours=max_age_hours)
to_remove = []
for task_id, metadata in self._task_metadata.items():
if metadata.get("status") in ["completed", "error", "cancelled"]:
finished_at = metadata.get("finished_at")
if finished_at:
try:
finished_time = datetime.fromisoformat(finished_at)
if finished_time < cutoff:
to_remove.append(task_id)
except:
pass
for task_id in to_remove:
metadata = self._task_metadata.pop(task_id, {})
task_key = metadata.get("task_key")
if task_key:
self._task_ids.pop(task_key, None)
|