Spaces:
Sleeping
Sleeping
File size: 2,523 Bytes
c2ea5ed |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
import logging
import uuid
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional
logger = logging.getLogger(__name__)
class TaskStore:
def __init__(self):
self.tasks: Dict[str, Dict[str, Any]] = {}
def create_task(self, task_type: str, params: Dict[str, Any] = None) -> str:
"""Create a new task and return its ID"""
task_id = str(uuid.uuid4())
self.tasks[task_id] = {
"id": task_id,
"type": task_type,
"status": "pending",
"created_at": datetime.now(timezone.utc).isoformat(),
"updated_at": datetime.now(timezone.utc).isoformat(),
"params": params or {},
"result": None,
"error": None,
"progress": 0
}
return task_id
def update_task(self, task_id: str, status: Optional[str] = None,
result: Any = None, error: Optional[str] = None,
progress: Optional[int] = None) -> None:
"""Update a task's status and result"""
if task_id not in self.tasks:
# Instead of raising an error, just log it. This can happen in complex async scenarios.
logger.warning(f"Attempted to update a non-existent task with ID {task_id}")
return
task = self.tasks[task_id]
if status:
task["status"] = status
if result is not None:
task["result"] = result
if error is not None:
task["error"] = error
if progress is not None:
task["progress"] = progress
task["updated_at"] = datetime.now(timezone.utc).isoformat()
logger.info(f"TASK_UPDATE: id={task_id}, status={status}, progress={progress}, message={task.get('error')}")
def get_task(self, task_id: str) -> Optional[Dict[str, Any]]:
"""Get a task by ID"""
return self.tasks.get(task_id)
def list_tasks(self, task_type: Optional[str] = None,
status: Optional[str] = None) -> List[Dict[str, Any]]:
"""List tasks with optional filtering"""
tasks = list(self.tasks.values())
if task_type:
tasks = [t for t in tasks if t["type"] == task_type]
if status:
tasks = [t for t in tasks if t["status"] == status]
return sorted(tasks, key=lambda t: t["created_at"], reverse=True)
# Create a global task store instance
task_store = TaskStore() |