Spaces:
Running
Running
| #!/usr/bin/env python | |
| """ | |
| Task Queue Service | |
| A lightweight task queue implementation that runs background tasks as | |
| asyncio tasks and tracks their status in memory. Persisting task status | |
| to the database is not implemented. | |
| """ | |
| import asyncio | |
| import logging | |
| import uuid | |
| from typing import Dict, Any, Callable, Awaitable, Optional | |
| from datetime import datetime | |
| from sqlalchemy.orm import Session | |
# Configure root logging at INFO level when this module is imported.
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module, used for task status messages.
logger = logging.getLogger(__name__)
class TaskQueue:
    """
    Simple in-memory task queue manager.

    Each submitted coroutine function is scheduled on the running asyncio
    event loop, and a status record for it is kept in ``self.tasks`` under a
    generated task ID. Nothing in this class removes records once created.

    A record's ``"status"`` is one of ``"queued"``, ``"running"``,
    ``"completed"``, ``"failed"`` or ``"cancelled"``.
    """

    def __init__(self):
        # task_id -> record with keys: status, created_at, updated_at,
        # progress, result, error (plus "message" once a progress update
        # supplies one).
        self.tasks: Dict[str, Dict[str, Any]] = {}
        # Not read anywhere in this class; kept for code that may inspect it.
        self._running = True
        # Strong references to in-flight asyncio tasks. The event loop only
        # keeps weak references, so a task nobody else references could be
        # garbage-collected before it finishes.
        self._background_tasks = set()

    def generate_task_id(self) -> str:
        """Generate a unique task ID (a random UUID4 rendered as a string)."""
        return str(uuid.uuid4())

    async def add_task(self,
                       func: Callable[..., Awaitable[Any]],
                       session: Session,
                       **kwargs) -> str:
        """
        Add a task to the queue and schedule it on the running event loop.

        Args:
            func: Async function to execute. It is called as
                ``func(task_id=..., session=..., **kwargs)``.
            session: Database session. The queue does not use it itself; it
                is only forwarded to ``func``.
            **kwargs: Extra keyword arguments to pass to the function.

        Returns:
            task_id: Unique identifier for the task. The task has only been
                scheduled, not necessarily started, when this returns.
        """
        task_id = self.generate_task_id()

        # One timestamp so created_at and updated_at start out identical.
        # Naive UTC is kept so existing consumers of these fields see the
        # same kind of datetime as before.
        now = datetime.utcnow()
        self.tasks[task_id] = {
            "status": "queued",
            "created_at": now,
            "updated_at": now,
            "progress": 0,
            "result": None,
            "error": None,
        }

        # NOTE: task status is only tracked in memory; persisting a task
        # record through `session` is not implemented.

        # Schedule the task and hold a reference until it is done, so it
        # cannot be garbage-collected mid-execution.
        background = asyncio.create_task(
            self._run_task(task_id, func, session, **kwargs)
        )
        self._background_tasks.add(background)
        background.add_done_callback(self._background_tasks.discard)
        return task_id

    async def _run_task(self,
                        task_id: str,
                        func: Callable[..., Awaitable[Any]],
                        session: Session,
                        **kwargs):
        """
        Execute a task and update its status record.

        Exceptions raised by ``func`` are logged and recorded on the task
        (status ``"failed"``) instead of propagating. Cancellation while the
        task is running is recorded as ``"cancelled"`` and then re-raised so
        the asyncio task still ends up cancelled.

        Raises:
            KeyError: If ``task_id`` has no record in ``self.tasks``.
        """
        record = self.tasks[task_id]
        try:
            record["status"] = "running"
            record["updated_at"] = datetime.utcnow()
            result = await func(task_id=task_id, session=session, **kwargs)
        except asyncio.CancelledError:
            # CancelledError is not an Exception subclass on Python 3.8+, so
            # without this branch the record would stay "running" forever.
            record["status"] = "cancelled"
            record["error"] = "Task was cancelled"
            record["updated_at"] = datetime.utcnow()
            logger.warning("Task %s was cancelled", task_id)
            raise
        except Exception as e:
            # Boundary handler: a failing task must not take the queue down.
            # logger.exception keeps the traceback in the log.
            logger.exception("Task %s failed: %s", task_id, e)
            record["status"] = "failed"
            record["error"] = str(e)
            record["updated_at"] = datetime.utcnow()
        else:
            record["status"] = "completed"
            record["progress"] = 100
            record["result"] = result
            record["updated_at"] = datetime.utcnow()
            logger.info("Task %s completed successfully", task_id)

    def get_task_status(self, task_id: str) -> Optional[Dict[str, Any]]:
        """
        Get the current status record of a task.

        Returns the live record stored in ``self.tasks`` (not a copy), or
        ``None`` if the task ID is unknown.
        """
        return self.tasks.get(task_id)

    def update_task_progress(self, task_id: str, progress: int, message: Optional[str] = None):
        """
        Update the progress of a task.

        Unknown task IDs are ignored. ``message`` is stored under the
        ``"message"`` key only when it is non-empty; a previous message is
        otherwise left in place.
        """
        record = self.tasks.get(task_id)
        if record is None:
            return
        record["progress"] = progress
        record["updated_at"] = datetime.utcnow()
        if message:
            record["message"] = message
        logger.debug("Task %s progress: %s%%", task_id, progress)
# Singleton instance: one module-level TaskQueue created at import time
# (presumably the shared queue that callers import — confirm against usage).
task_queue = TaskQueue()