File size: 2,523 Bytes
c2ea5ed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import logging
import uuid
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional

# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)

class TaskStore:
    """In-memory registry of task records keyed by task ID.

    Each record is a plain dict with the keys ``id``, ``type``, ``status``,
    ``created_at``, ``updated_at``, ``params``, ``result``, ``error`` and
    ``progress``. Records are held only in ``self.tasks``.
    """

    def __init__(self) -> None:
        # Maps task ID (UUID4 string) -> task record dict.
        self.tasks: Dict[str, Dict[str, Any]] = {}

    def create_task(self, task_type: str, params: Optional[Dict[str, Any]] = None) -> str:
        """Create a new task in the ``"pending"`` state and return its ID.

        Args:
            task_type: Label stored under the record's ``"type"`` key.
            params: Optional parameters stored under ``"params"``; ``None``
                or an empty dict is stored as a new empty dict.

        Returns:
            The generated task ID (a UUID4 rendered as a string).
        """
        task_id = str(uuid.uuid4())
        # Take the timestamp once so a freshly created task always has
        # created_at == updated_at (two separate now() calls can differ).
        now = datetime.now(timezone.utc).isoformat()
        self.tasks[task_id] = {
            "id": task_id,
            "type": task_type,
            "status": "pending",
            "created_at": now,
            "updated_at": now,
            "params": params or {},
            "result": None,
            "error": None,
            "progress": 0,
        }
        return task_id

    def update_task(self, task_id: str, status: Optional[str] = None,
                    result: Any = None, error: Optional[str] = None,
                    progress: Optional[int] = None) -> None:
        """Update fields of an existing task and refresh its ``updated_at``.

        Only the fields that are supplied are written: ``status`` is applied
        when truthy (``None`` or ``""`` leaves it unchanged), while
        ``result``, ``error`` and ``progress`` are applied when not ``None``.
        As a consequence this method cannot reset those fields to ``None``.

        Args:
            task_id: ID of the task to update.
            status: New status value, if any.
            result: New result value, if any.
            error: New error message, if any.
            progress: New progress value, if any.

        An unknown ``task_id`` is logged as a warning and otherwise ignored;
        no exception is raised.
        """
        task = self.tasks.get(task_id)
        if task is None:
            # Instead of raising an error, just log it. This can happen in complex async scenarios.
            logger.warning("Attempted to update a non-existent task with ID %s", task_id)
            return

        if status:
            task["status"] = status
        if result is not None:
            task["result"] = result
        if error is not None:
            task["error"] = error
        if progress is not None:
            task["progress"] = progress

        task["updated_at"] = datetime.now(timezone.utc).isoformat()
        # Logs the status/progress arguments as passed (possibly None) and
        # the error currently stored on the task.
        logger.info(
            "TASK_UPDATE: id=%s, status=%s, progress=%s, message=%s",
            task_id, status, progress, task.get("error"),
        )

    def get_task(self, task_id: str) -> Optional[Dict[str, Any]]:
        """Return the task record for ``task_id``, or ``None`` if unknown.

        The returned dict is the stored record itself, not a copy.
        """
        return self.tasks.get(task_id)

    def list_tasks(self, task_type: Optional[str] = None,
                   status: Optional[str] = None) -> List[Dict[str, Any]]:
        """List task records, newest first, with optional filtering.

        Args:
            task_type: When truthy, keep only tasks whose ``"type"`` equals it.
            status: When truthy, keep only tasks whose ``"status"`` equals it.

        Returns:
            A new list of the matching task records sorted by their
            ``"created_at"`` string in descending order.
        """
        # A falsy filter (None or "") means "do not filter on this field".
        matches = [
            task for task in self.tasks.values()
            if (not task_type or task["type"] == task_type)
            and (not status or task["status"] == status)
        ]
        return sorted(matches, key=lambda t: t["created_at"], reverse=True)

# Create a module-level TaskStore instance when this module is imported.
# NOTE(review): presumably other modules import and share this instance — confirm against callers.
task_store = TaskStore()