# AgentGraph/backend/services/task_store_service.py
# Uploaded by wu981526092
# Commit c2ea5ed: "Deploy AgentGraph: Complete agent monitoring and knowledge graph system"
import logging
import uuid
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional
# Module-level logger named after this module; TaskStore.update_task uses it
# for the "non-existent task" warning and the TASK_UPDATE info records.
logger = logging.getLogger(__name__)
class TaskStore:
    """In-memory registry of task records, keyed by task ID.

    Every record is a plain ``dict`` with the keys ``id``, ``type``,
    ``status``, ``created_at``, ``updated_at``, ``params``, ``result``,
    ``error`` and ``progress``. Timestamps are ISO-8601 strings in UTC.

    Records are held only in ``self.tasks``; this class offers no removal
    method. ``get_task`` and ``list_tasks`` hand out the stored dicts
    themselves (not copies), so callers that mutate them mutate the store.
    """

    def __init__(self):
        """Start with an empty task table."""
        # Maps task ID -> task record.
        self.tasks: Dict[str, Dict[str, Any]] = {}

    def create_task(self, task_type: str, params: Optional[Dict[str, Any]] = None) -> str:
        """Register a new task in the ``"pending"`` state and return its ID.

        Args:
            task_type: Label stored under the record's ``type`` key.
            params: Optional parameters stored under ``params``; a missing or
                empty value is stored as a fresh empty dict.

        Returns:
            The generated task ID (a random UUID4 rendered as a string).
        """
        task_id = str(uuid.uuid4())
        # Take the clock reading once so a brand-new record always has
        # created_at == updated_at.
        now = datetime.now(timezone.utc).isoformat()
        self.tasks[task_id] = {
            "id": task_id,
            "type": task_type,
            "status": "pending",
            "created_at": now,
            "updated_at": now,
            "params": params or {},
            "result": None,
            "error": None,
            "progress": 0,
        }
        return task_id

    def update_task(self, task_id: str, status: Optional[str] = None,
                    result: Any = None, error: Optional[str] = None,
                    progress: Optional[int] = None) -> None:
        """Apply a partial update to an existing task record.

        Only the fields that are actually supplied are overwritten, and
        ``updated_at`` is refreshed on every successful call. Because ``None``
        means "leave unchanged", this method cannot reset ``result`` or
        ``error`` back to ``None``.

        Args:
            task_id: ID of the task to update. An unknown ID is logged as a
                warning and otherwise ignored (no exception is raised).
            status: New status; ignored when ``None`` or an empty string.
            result: New result; ignored when ``None``.
            error: New error message; ignored when ``None``.
            progress: New progress value; ignored when ``None`` (``0`` is
                applied).
        """
        task = self.tasks.get(task_id)
        if task is None:
            # Instead of raising an error, just log it. This can happen in complex async scenarios.
            logger.warning("Attempted to update a non-existent task with ID %s", task_id)
            return

        # Status uses a truthiness check (an empty string is treated as
        # "not supplied"); the other fields only skip None so that falsy
        # values such as progress=0 or result=[] are still stored.
        if status:
            task["status"] = status
        if result is not None:
            task["result"] = result
        if error is not None:
            task["error"] = error
        if progress is not None:
            task["progress"] = progress
        task["updated_at"] = datetime.now(timezone.utc).isoformat()

        # Logs the *arguments* of this call for status/progress, and the
        # record's current error text as "message".
        logger.info(
            "TASK_UPDATE: id=%s, status=%s, progress=%s, message=%s",
            task_id, status, progress, task.get("error"),
        )

    def get_task(self, task_id: str) -> Optional[Dict[str, Any]]:
        """Return the record for ``task_id``, or ``None`` if it is unknown."""
        return self.tasks.get(task_id)

    def list_tasks(self, task_type: Optional[str] = None,
                   status: Optional[str] = None) -> List[Dict[str, Any]]:
        """Return task records, newest first, optionally filtered.

        Args:
            task_type: When truthy, keep only records whose ``type`` equals it.
            status: When truthy, keep only records whose ``status`` equals it.

        Returns:
            A new list of the matching records sorted by their ``created_at``
            string in descending order.
        """
        tasks = list(self.tasks.values())
        if task_type:
            tasks = [t for t in tasks if t["type"] == task_type]
        if status:
            tasks = [t for t in tasks if t["status"] == status]
        return sorted(tasks, key=lambda t: t["created_at"], reverse=True)
# Module-level TaskStore instance, created once when this module is imported.
# NOTE(review): presumably the single store shared by all importers — confirm against callers.
task_store = TaskStore()