Spaces:
Running
Running
| import logging | |
| import uuid | |
| from datetime import datetime, timezone | |
| from typing import Dict, Any, List, Optional | |
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class TaskStore:
    """In-memory registry of task records keyed by task ID.

    Each record is a plain dict with the keys ``id``, ``type``, ``status``,
    ``created_at``, ``updated_at``, ``params``, ``result``, ``error`` and
    ``progress``. Records are held only in ``self.tasks``.
    """

    def __init__(self):
        """Start with an empty store."""
        # Maps task ID -> task record dict.
        self.tasks: Dict[str, Dict[str, Any]] = {}

    def create_task(self, task_type: str,
                    params: Optional[Dict[str, Any]] = None) -> str:
        """Create a new task and return its ID.

        Args:
            task_type: Value stored under the record's ``type`` key.
            params: Optional parameters stored under ``params``; a falsy
                value (``None`` or an empty dict) is stored as a new ``{}``.

        Returns:
            The generated task ID (a UUID4 string).
        """
        task_id = str(uuid.uuid4())
        # Read the clock once so a freshly created task reports identical
        # created_at and updated_at values.
        now = datetime.now(timezone.utc).isoformat()
        self.tasks[task_id] = {
            "id": task_id,
            "type": task_type,
            "status": "pending",
            "created_at": now,
            "updated_at": now,
            "params": params or {},
            "result": None,
            "error": None,
            "progress": 0,
        }
        return task_id

    def update_task(self, task_id: str, status: Optional[str] = None,
                    result: Any = None, error: Optional[str] = None,
                    progress: Optional[int] = None) -> None:
        """Update a task's status, result, error and/or progress.

        Only the fields that are supplied are written: ``status`` is applied
        when truthy, while ``result``, ``error`` and ``progress`` are applied
        when they are not ``None`` (so none of them can be reset to ``None``
        through this method). ``updated_at`` is refreshed on every call that
        finds the task.

        Args:
            task_id: ID of the task to update.
            status: New status, or ``None``/empty to leave it unchanged.
            result: New result, or ``None`` to leave it unchanged.
            error: New error message, or ``None`` to leave it unchanged.
            progress: New progress value, or ``None`` to leave it unchanged.
        """
        if task_id not in self.tasks:
            # Instead of raising an error, just log it. This can happen in
            # complex async scenarios.
            logger.warning(
                "Attempted to update a non-existent task with ID %s", task_id
            )
            return

        task = self.tasks[task_id]
        if status:
            task["status"] = status
        if result is not None:
            task["result"] = result
        if error is not None:
            task["error"] = error
        if progress is not None:
            task["progress"] = progress
        task["updated_at"] = datetime.now(timezone.utc).isoformat()

        # status/progress are the values passed to this call (possibly None);
        # message is the error currently stored on the record.
        logger.info(
            "TASK_UPDATE: id=%s, status=%s, progress=%s, message=%s",
            task_id, status, progress, task.get("error"),
        )

    def get_task(self, task_id: str) -> Optional[Dict[str, Any]]:
        """Return the stored record for ``task_id``, or ``None`` if unknown.

        The returned dict is the stored record itself, not a copy.
        """
        return self.tasks.get(task_id)

    def list_tasks(self, task_type: Optional[str] = None,
                   status: Optional[str] = None) -> List[Dict[str, Any]]:
        """List tasks, optionally filtered, newest first.

        Args:
            task_type: When truthy, keep only tasks whose ``type`` equals it.
            status: When truthy, keep only tasks whose ``status`` equals it.

        Returns:
            Matching records sorted by their ``created_at`` string in
            descending order.
        """
        tasks = list(self.tasks.values())
        if task_type:
            tasks = [t for t in tasks if t["type"] == task_type]
        if status:
            tasks = [t for t in tasks if t["status"] == status]
        return sorted(tasks, key=lambda t: t["created_at"], reverse=True)
# Create a global task store instance.
# This runs when the module is imported, producing one module-level TaskStore object.
task_store = TaskStore()