#!/usr/bin/env python
"""
Task Queue Service

A lightweight task queue implementation that schedules background
coroutines on the running asyncio event loop and tracks their status
in memory. Persisting status to a database is left as a hook in
``TaskQueue.add_task``.
"""

import asyncio
import logging
import uuid
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, Optional, Set

if TYPE_CHECKING:
    # Session is referenced only in (string) annotations, so the import is
    # not needed at runtime.
    from sqlalchemy.orm import Session

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _utcnow() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Produces the same kind of value as the deprecated ``datetime.utcnow()``
    (UTC wall-clock time, ``tzinfo`` is ``None``) so stored timestamps keep
    their existing shape.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


class TaskQueue:
    """
    Simple in-memory task queue manager.

    Tracks task status and manages asynchronous execution. Each task is
    described by a dict stored in ``self.tasks`` under its task ID, with
    the keys ``status`` ("queued", "running", "completed" or "failed"),
    ``created_at``, ``updated_at``, ``progress``, ``result`` and ``error``,
    plus an optional ``message`` set by ``update_task_progress``.
    """

    def __init__(self) -> None:
        # task_id -> status record (see class docstring for the keys).
        self.tasks: Dict[str, Dict[str, Any]] = {}
        self._running = True
        # Strong references to in-flight asyncio tasks. The event loop only
        # keeps weak references, so without this a task could be garbage
        # collected before it finishes.
        self._background_tasks: Set["asyncio.Task[Any]"] = set()

    def generate_task_id(self) -> str:
        """Generate a unique task ID (a random UUID4 string)."""
        return str(uuid.uuid4())

    async def add_task(
        self,
        func: Callable[..., Awaitable[Any]],
        session: "Session",
        **kwargs: Any,
    ) -> str:
        """
        Add a task to the queue and schedule it on the running event loop.

        Args:
            func: Async function to execute. It is called as
                ``func(task_id=..., session=..., **kwargs)``.
            session: Database session, passed through to ``func`` unchanged.
            **kwargs: Additional keyword arguments to pass to ``func``.

        Returns:
            task_id: Unique identifier for the task.
        """
        task_id = self.generate_task_id()

        # Store task data; both timestamps share one clock reading.
        now = _utcnow()
        self.tasks[task_id] = {
            "status": "queued",
            "created_at": now,
            "updated_at": now,
            "progress": 0,
            "result": None,
            "error": None,
        }

        # NOTE: if task status should also be stored in the database, create
        # the record here (and handle/log any persistence error).

        # Schedule the task to run, keeping a reference until it is done.
        background = asyncio.create_task(
            self._run_task(task_id, func, session, **kwargs)
        )
        self._background_tasks.add(background)
        background.add_done_callback(self._background_tasks.discard)

        return task_id

    async def _run_task(
        self,
        task_id: str,
        func: Callable[..., Awaitable[Any]],
        session: "Session",
        **kwargs: Any,
    ) -> None:
        """
        Execute a task and update its status record.

        The record moves to "running", then to "completed" (with ``result``
        set and ``progress`` at 100) or to "failed" (with ``error`` set to
        the exception text). Exceptions raised by ``func`` are logged and
        recorded, not propagated. Cancellation is recorded as "failed" and
        then re-raised so the asyncio task still ends up cancelled.

        Raises:
            KeyError: If ``task_id`` has no record in ``self.tasks``.
            asyncio.CancelledError: If the task is cancelled while running.
        """
        record = self.tasks[task_id]
        try:
            # Update task status
            record["status"] = "running"
            record["updated_at"] = _utcnow()

            # Execute the task
            result = await func(task_id=task_id, session=session, **kwargs)
        except asyncio.CancelledError:
            # CancelledError is not an Exception subclass on Python 3.8+;
            # without this branch the record would stay "running" forever.
            logger.warning("Task %s was cancelled", task_id)
            record["status"] = "failed"
            record["error"] = "Task was cancelled"
            record["updated_at"] = _utcnow()
            raise
        except Exception as exc:
            # Update task status on error (traceback is included in the log).
            logger.exception("Task %s failed: %s", task_id, exc)
            record["status"] = "failed"
            record["error"] = str(exc)
            record["updated_at"] = _utcnow()
        else:
            # Update task status
            record["status"] = "completed"
            record["progress"] = 100
            record["result"] = result
            record["updated_at"] = _utcnow()
            logger.info("Task %s completed successfully", task_id)

    def get_task_status(self, task_id: str) -> Optional[Dict[str, Any]]:
        """
        Get the current status of a task.

        Returns the live status record for ``task_id`` (the same dict the
        queue updates), or ``None`` if the ID is unknown.
        """
        return self.tasks.get(task_id)

    def update_task_progress(
        self, task_id: str, progress: int, message: Optional[str] = None
    ) -> None:
        """
        Update the progress of a task.

        Unknown task IDs are ignored. ``message`` is stored under the
        "message" key only when it is non-empty.
        """
        record = self.tasks.get(task_id)
        if record is None:
            return

        record["progress"] = progress
        record["updated_at"] = _utcnow()
        if message:
            record["message"] = message
        logger.debug("Task %s progress: %s%%", task_id, progress)


# Singleton instance
task_queue = TaskQueue()