AgentGraph / backend /services /task_queue.py
wu981526092's picture
🚀 Deploy AgentGraph: Complete agent monitoring and knowledge graph system
c2ea5ed
#!/usr/bin/env python
"""
Task Queue Service
A lightweight task queue implementation that manages background tasks
and tracks their status in the database.
"""
import asyncio
import logging
import uuid
from typing import Dict, Any, Callable, Awaitable, Optional
from datetime import datetime
from sqlalchemy.orm import Session
# Configure logging
# NOTE(review): basicConfig() runs as an import-time side effect of this
# module. It only installs a root handler when the root logger has none yet,
# so an application that configures logging before importing this module is
# unaffected.
logging.basicConfig(level=logging.INFO)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class TaskQueue:
    """
    Simple in-memory task queue manager.

    Each submitted coroutine function is scheduled on the running asyncio
    event loop, and a status record for it is kept in ``self.tasks``. Records
    live only in this object's memory; nothing is persisted by this class.

    A status record is a dict with the keys ``status`` ("queued", "running",
    "completed" or "failed"), ``created_at``, ``updated_at``, ``progress``,
    ``result``, ``error`` and, once a progress message has been reported,
    ``message``.
    """

    def __init__(self):
        # task_id -> status record (see class docstring for the keys).
        self.tasks: Dict[str, Dict[str, Any]] = {}
        # Not read anywhere inside this class; kept for external users.
        self._running = True
        # Strong references to in-flight asyncio tasks. The event loop only
        # holds weak references, so without this set a fire-and-forget task
        # could be garbage-collected before it finishes.
        self._background_tasks: set = set()

    def generate_task_id(self) -> str:
        """Generate a unique task ID (a random UUID4 rendered as a string)."""
        return str(uuid.uuid4())

    async def add_task(self,
                       func: Callable[..., Awaitable[Any]],
                       session: "Session",
                       **kwargs) -> str:
        """
        Add a task to the queue and schedule it for execution.

        Must be called while an event loop is running, because the task is
        scheduled with ``asyncio.create_task``.

        Args:
            func: Async function to execute. It is called as
                ``func(task_id=..., session=..., **kwargs)``.
            session: Database session, passed through to ``func`` unchanged.
            **kwargs: Extra keyword arguments to pass to ``func``.

        Returns:
            task_id: Unique identifier for the task.
        """
        task_id = self.generate_task_id()

        # Store task data; both timestamps start from the same instant.
        now = datetime.utcnow()
        self.tasks[task_id] = {
            "status": "queued",
            "created_at": now,
            "updated_at": now,
            "progress": 0,
            "result": None,
            "error": None,
        }

        # If you want to store task status in your database, do it here.

        # Schedule the task to run, keeping a strong reference until it is
        # done so it cannot be garbage-collected mid-execution.
        background = asyncio.create_task(
            self._run_task(task_id, func, session, **kwargs)
        )
        self._background_tasks.add(background)
        background.add_done_callback(self._background_tasks.discard)
        return task_id

    async def _run_task(self,
                        task_id: str,
                        func: Callable[..., Awaitable[Any]],
                        session: "Session",
                        **kwargs):
        """
        Execute a task and update its status record.

        Exceptions raised by ``func`` are logged and recorded on the task
        ("failed" status plus the error text) instead of being propagated.
        Cancellation is recorded the same way and then re-raised so the
        asyncio task still ends up cancelled.
        """
        record = self.tasks[task_id]
        try:
            # Update task status
            record["status"] = "running"
            record["updated_at"] = datetime.utcnow()

            # Execute the task
            result = await func(task_id=task_id, session=session, **kwargs)

            # Update task status
            record["status"] = "completed"
            record["progress"] = 100
            record["result"] = result
            record["updated_at"] = datetime.utcnow()
            logger.info(f"Task {task_id} completed successfully")
        except asyncio.CancelledError:
            # On Python 3.8+ CancelledError is not an Exception subclass, so
            # without this branch the record would stay "running" forever.
            record["status"] = "failed"
            record["error"] = "Task was cancelled"
            record["updated_at"] = datetime.utcnow()
            logger.warning(f"Task {task_id} was cancelled")
            raise
        except Exception as e:
            # Update task status on error; logger.exception adds the
            # traceback to the same message.
            logger.exception(f"Task {task_id} failed: {str(e)}")
            record["status"] = "failed"
            record["error"] = str(e)
            record["updated_at"] = datetime.utcnow()

    def get_task_status(self, task_id: str) -> Optional[Dict[str, Any]]:
        """
        Get the current status record of a task.

        Returns the live record dict (not a copy), or None when the task ID
        is unknown.
        """
        return self.tasks.get(task_id)

    def update_task_progress(self, task_id: str, progress: int, message: Optional[str] = None):
        """
        Update the progress of a task.

        Unknown task IDs are ignored. ``message`` is stored only when it is
        non-empty; a previously stored message is otherwise left in place.
        """
        record = self.tasks.get(task_id)
        if record is None:
            return
        record["progress"] = progress
        record["updated_at"] = datetime.utcnow()
        if message:
            record["message"] = message
        logger.debug(f"Task {task_id} progress: {progress}%")
# Singleton instance
# Created once at import time; every importer of this module shares this
# object and therefore the same in-memory task records.
task_queue = TaskQueue()