Spaces:
Running
Running
| #!/usr/bin/env python | |
| """ | |
| Task Router | |
| This module provides API endpoints for task management and status tracking. | |
| It implements a simple in-memory task queue with status tracking. | |
| """ | |
| import logging | |
| import asyncio | |
| import uuid | |
| from datetime import datetime, timezone | |
| from typing import Dict, Any, List, Optional, Callable, Awaitable, Union | |
| from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks, Path, Body | |
| from fastapi.responses import JSONResponse | |
| from pydantic import BaseModel | |
| from backend.database import get_db | |
| from sqlalchemy.orm import Session | |
| from backend.services.task_queue import TaskQueue | |
| from backend.services.task_service import get_task_data | |
| from backend.services.task_store_service import task_store | |
# Logging setup: basicConfig at INFO level, plus a logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Router for the task endpoints; the "/api/tasks" prefix and "tasks" tag are set here.
router = APIRouter(prefix="/api/tasks", tags=["tasks"])
| # Simple in-memory task store has been moved to backend/services/task_store_service.py | |
| # Task queue for background processing | |
class TaskQueue:
    """Track running asyncio tasks by task ID and mirror their outcome into ``task_store``.

    NOTE(review): this definition shadows the ``TaskQueue`` name imported from
    ``backend.services.task_queue`` at the top of this module -- confirm which
    implementation is meant to be used.
    """

    def __init__(self):
        # Maps task_id -> the asyncio.Task currently registered for that ID.
        self.queue: Dict[str, asyncio.Task] = {}

    async def add_task(self, task_id: str,
                       coro: Awaitable,
                       on_complete: Optional[Callable[[str, Any], None]] = None,
                       on_error: Optional[Callable[[str, Exception], None]] = None) -> None:
        """Schedule ``coro`` as an asyncio task registered under ``task_id``.

        This method returns as soon as the task is scheduled; it does not wait
        for ``coro`` to finish.

        Args:
            task_id: Key used for the queue entry and for ``task_store`` updates.
            coro: Awaitable to run. On success its result is stored with
                status "completed".
            on_complete: Optional callback invoked as ``on_complete(task_id, result)``
                after the store is updated. An exception raised by it is handled
                like a failure of ``coro`` (the task is then marked "failed").
            on_error: Optional callback invoked as ``on_error(task_id, exc)``
                after the task has been marked "failed".
        """

        async def _wrapped_task():
            # asyncio.CancelledError derives from BaseException on Python 3.8+,
            # so cancellation is not treated as a failure by this handler.
            try:
                result = await coro
                task_store.update_task(task_id, status="completed", result=result)
                if on_complete:
                    on_complete(task_id, result)
            except Exception as e:
                # logger.exception keeps the traceback alongside the message.
                logger.exception("Task %s failed: %s", task_id, e)
                task_store.update_task(task_id, status="failed", error=str(e))
                if on_error:
                    on_error(task_id, e)

        # Create and start the task
        task = asyncio.create_task(_wrapped_task())
        self.queue[task_id] = task

        def _forget(finished: asyncio.Task) -> None:
            # Only drop the entry if it still refers to this task; a newer task
            # registered under the same ID must stay cancellable.
            if self.queue.get(task_id) is finished:
                del self.queue[task_id]

        # A done-callback fires on success, failure and cancellation alike,
        # including cancellation before the coroutine ever started running
        # (in which case a try/finally inside the coroutine is never entered).
        task.add_done_callback(_forget)

    def cancel_task(self, task_id: str) -> bool:
        """Cancel the task registered under ``task_id`` if it is still running.

        Returns:
            True when a not-yet-finished task was found, cancellation was
            requested and the store was updated to "cancelled"; False otherwise.
        """
        task = self.queue.get(task_id)
        if task is not None and not task.done():
            task.cancel()
            task_store.update_task(task_id, status="cancelled")
            return True
        return False
# Module-level TaskQueue instance; cancel_task_endpoint in this module uses it
# to cancel running tasks.
task_queue = TaskQueue()
async def get_task_status_endpoint(task_id: str):
    """
    Look up the status data of a background task.

    Args:
        task_id: ID of the task

    Returns:
        Whatever get_task_data returns for the task when that value is truthy;
        otherwise a 404 JSONResponse. Any exception raised along the way is
        logged and reported as a 500 JSONResponse carrying the exception text.
    """
    try:
        status_payload = get_task_data(task_id)
        if status_payload:
            return status_payload
        # Falsy lookup result: report the task as unknown.
        missing_body = {
            "status": "not_found",
            "message": f"Task with ID {task_id} not found",
        }
        return JSONResponse(status_code=404, content=missing_body)
    except Exception as exc:
        logger.error(f"Error getting task status: {exc}")
        failure_body = {"status": "error", "message": str(exc)}
        return JSONResponse(status_code=500, content=failure_body)
async def get_task_endpoint(task_id: str):
    """
    Return the task_store entry for a task.

    Args:
        task_id: ID of the task

    Raises:
        HTTPException: 404 when task_store has no (truthy) entry for the ID
    """
    found = task_store.get_task(task_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail=f"Task with ID {task_id} not found")
async def cancel_task_endpoint(task_id: str):
    """
    Cancel a task by ID.

    Args:
        task_id: ID of the task

    Returns:
        Dict with a "message" key: either a notice that the task is already in
        a finished state, or a confirmation that it was cancelled

    Raises:
        HTTPException: 404 when task_store has no entry for the ID
    """
    record = task_store.get_task(task_id)
    if not record:
        raise HTTPException(status_code=404, detail=f"Task with ID {task_id} not found")

    # States in which a task is reported as not cancellable.
    finished_states = ("completed", "failed", "cancelled")
    current_state = record["status"]
    if current_state in finished_states:
        return {"message": f"Task is already in {current_state} state and cannot be cancelled"}

    # When the queue has no running task for this ID, the stored status is
    # still switched to "cancelled" directly.
    if not task_queue.cancel_task(task_id):
        task_store.update_task(task_id, status="cancelled")
    return {"message": "Task cancelled successfully"}
async def list_tasks(
    task_type: Optional[str] = None,
    status: Optional[str] = None,
    limit: Optional[int] = None
):
    """
    List tasks with optional filtering, newest first.

    Args:
        task_type: Filter by task type
        status: Filter by task status
        limit: Maximum number of tasks to return; ignored unless it is a
            positive integer

    Returns:
        Dict with "tasks" (the task entries, each with "progress" normalised
        to an int and, where both timestamps parse, "elapsed_seconds" and
        "elapsed_formatted") and "total" (the number of entries returned,
        counted after ``limit`` is applied)
    """
    # sorted() builds a new list, so the list handed back by task_store is not
    # reordered in place. Entries without a "created_at" value sort last
    # instead of raising KeyError, matching the tolerance of the loop below.
    tasks = sorted(
        task_store.list_tasks(task_type=task_type, status=status),
        key=lambda t: t.get("created_at") or "",
        reverse=True,
    )

    # Limit results if requested
    if isinstance(limit, int) and limit > 0:
        tasks = tasks[:limit]

    # NOTE(review): the task entries themselves are updated in place; if
    # task_store returns its live records, the keys added below are written
    # back into the store -- confirm that this is intended.
    for task in tasks:
        # Ensure progress is an integer; one malformed value must not fail
        # the whole listing.
        raw_progress = task.get("progress")
        try:
            task["progress"] = 0 if raw_progress is None else int(raw_progress)
        except (TypeError, ValueError, OverflowError):
            logger.warning("Ignoring non-numeric task progress %r", raw_progress)
            task["progress"] = 0

        # Calculate time elapsed between creation and last update
        if "created_at" in task and "updated_at" in task:
            try:
                created = datetime.fromisoformat(task["created_at"])
                updated = datetime.fromisoformat(task["updated_at"])
                elapsed_seconds = (updated - created).total_seconds()
            except (TypeError, ValueError) as e:
                # Unparseable timestamps (or a naive/aware mix) only cost this
                # entry its elapsed-time fields.
                logger.error("Error calculating elapsed time: %s", e)
            else:
                task["elapsed_seconds"] = elapsed_seconds
                # Format elapsed time as HH:MM:SS
                hours, remainder = divmod(int(elapsed_seconds), 3600)
                minutes, seconds = divmod(remainder, 60)
                task["elapsed_formatted"] = f"{hours:02}:{minutes:02}:{seconds:02}"

    return {"tasks": tasks, "total": len(tasks)}