"""In-memory task registry keyed by UUID string."""

import logging
import uuid
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional

logger = logging.getLogger(__name__)


class TaskStore:
    """Keep task records in a plain dict held in process memory.

    Each record is a dict with the keys ``id``, ``type``, ``status``,
    ``created_at``, ``updated_at``, ``params``, ``result``, ``error`` and
    ``progress``. Timestamps are ISO-8601 strings in UTC.
    """

    def __init__(self):
        """Start with an empty task table."""
        self.tasks: Dict[str, Dict[str, Any]] = {}

    def create_task(
        self,
        task_type: str,
        params: Optional[Dict[str, Any]] = None,
    ) -> str:
        """Create a new task and return its ID.

        Args:
            task_type: Label stored under the record's ``type`` key.
            params: Optional parameters for the task. A non-empty dict is
                stored by reference (not copied); ``None`` or an empty dict
                results in a fresh empty dict.

        Returns:
            The generated task ID (a UUID4 string).
        """
        task_id = str(uuid.uuid4())
        # Take a single timestamp so created_at and updated_at are identical
        # on a brand-new task instead of differing by a few microseconds.
        now = datetime.now(timezone.utc).isoformat()
        self.tasks[task_id] = {
            "id": task_id,
            "type": task_type,
            "status": "pending",
            "created_at": now,
            "updated_at": now,
            "params": params or {},
            "result": None,
            "error": None,
            "progress": 0,
        }
        return task_id

    def update_task(
        self,
        task_id: str,
        status: Optional[str] = None,
        result: Any = None,
        error: Optional[str] = None,
        progress: Optional[int] = None,
    ) -> None:
        """Update a task's status, result, error and/or progress.

        Only the fields that are supplied are written: ``status`` is applied
        when truthy, the others when they are not ``None``. ``updated_at`` is
        refreshed on every successful call.

        Args:
            task_id: ID of the task to update.
            status: New status value, or ``None``/empty to leave unchanged.
            result: New result, or ``None`` to leave unchanged.
            error: New error text, or ``None`` to leave unchanged.
            progress: New progress value, or ``None`` to leave unchanged.
        """
        task = self.tasks.get(task_id)
        if task is None:
            # Instead of raising an error, just log it. This can happen in
            # complex async scenarios.
            logger.warning(
                "Attempted to update a non-existent task with ID %s", task_id
            )
            return

        if status:
            task["status"] = status
        if result is not None:
            task["result"] = result
        if error is not None:
            task["error"] = error
        if progress is not None:
            task["progress"] = progress

        task["updated_at"] = datetime.now(timezone.utc).isoformat()
        # Logs the arguments as passed (possibly None) plus the error text
        # currently stored on the task.
        logger.info(
            "TASK_UPDATE: id=%s, status=%s, progress=%s, message=%s",
            task_id,
            status,
            progress,
            task.get("error"),
        )

    def get_task(self, task_id: str) -> Optional[Dict[str, Any]]:
        """Return the stored record for ``task_id``, or ``None`` if unknown.

        The returned dict is the live record, not a copy.
        """
        return self.tasks.get(task_id)

    def list_tasks(
        self,
        task_type: Optional[str] = None,
        status: Optional[str] = None,
    ) -> List[Dict[str, Any]]:
        """List tasks, newest first, with optional filtering.

        Args:
            task_type: When truthy, keep only tasks whose ``type`` equals it.
            status: When truthy, keep only tasks whose ``status`` equals it.

        Returns:
            Matching task records sorted by ``created_at`` in descending
            order.
        """
        matches = [
            task
            for task in self.tasks.values()
            if (not task_type or task["type"] == task_type)
            and (not status or task["status"] == status)
        ]
        matches.sort(key=lambda task: task["created_at"], reverse=True)
        return matches


# Create a global task store instance
task_store = TaskStore()