#!/usr/bin/env python
"""
Task Router

This module provides API endpoints for task management and status tracking.
It implements a simple in-memory task queue with status tracking.
"""

import logging
import asyncio
import uuid
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional, Callable, Awaitable, Union

from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks, Path, Body
from fastapi.responses import JSONResponse
from pydantic import BaseModel

from backend.database import get_db
from sqlalchemy.orm import Session
# NOTE(review): this imported TaskQueue is shadowed by the class of the same
# name defined below. The import is kept so import-time behaviour is unchanged;
# confirm which implementation is meant to be authoritative.
from backend.services.task_queue import TaskQueue
from backend.services.task_service import get_task_data
from backend.services.task_store_service import task_store

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

router = APIRouter(
    prefix="/api/tasks",
    tags=["tasks"],
)

# Simple in-memory task store has been moved to
# backend/services/task_store_service.py

# Statuses from which a task can no longer be cancelled.
_TERMINAL_STATUSES = ("completed", "failed", "cancelled")


def _invoke_callback(
    task_id: str,
    callback: Optional[Callable[[str, Any], None]],
    payload: Any,
) -> None:
    """Call an optional completion/error callback, logging any exception it raises.

    A failing callback must not change the recorded outcome of the task, so
    exceptions are logged here instead of being propagated.
    """
    if callback is None:
        return
    try:
        callback(task_id, payload)
    except Exception:
        logger.exception(f"Callback for task {task_id} raised an exception")


def _coerce_progress(value: Any) -> int:
    """Return ``value`` as an int, or 0 when it is None or not convertible."""
    if value is None:
        return 0
    try:
        return int(value)
    except (TypeError, ValueError):
        logger.warning(f"Ignoring non-numeric task progress: {value!r}")
        return 0


# Task queue for background processing
class TaskQueue:
    """Registry of in-flight asyncio tasks, keyed by task ID.

    Each awaitable is wrapped so that its outcome is written to ``task_store``
    and the entry is removed from ``queue`` once the task finishes.
    """

    def __init__(self):
        # task_id -> the asyncio.Task currently executing that task
        self.queue: Dict[str, asyncio.Task] = {}

    async def add_task(
        self,
        task_id: str,
        coro: Awaitable,
        on_complete: Optional[Callable[[str, Any], None]] = None,
        on_error: Optional[Callable[[str, Exception], None]] = None,
    ) -> None:
        """Add a task to the queue and execute it.

        Args:
            task_id: Key under which the task is tracked and reported.
            coro: Awaitable to run in the background.
            on_complete: Optional callback invoked as ``(task_id, result)``
                after the task has been recorded as completed.
            on_error: Optional callback invoked as ``(task_id, exception)``
                after the task has been recorded as failed.
        """

        async def _wrapped_task():
            try:
                result = await coro
            except asyncio.CancelledError:
                # Record cancellation regardless of who cancelled the task,
                # then re-raise so asyncio still sees the task as cancelled.
                task_store.update_task(task_id, status="cancelled")
                raise
            except Exception as e:
                logger.error(f"Task {task_id} failed: {str(e)}")
                task_store.update_task(task_id, status="failed", error=str(e))
                _invoke_callback(task_id, on_error, e)
            else:
                # Kept outside the ``try`` so a raising on_complete callback
                # cannot flip an already-completed task to "failed".
                task_store.update_task(task_id, status="completed", result=result)
                _invoke_callback(task_id, on_complete, result)
            finally:
                # Remove task from queue
                self.queue.pop(task_id, None)

        # Create and start the task
        self.queue[task_id] = asyncio.create_task(_wrapped_task())

    def cancel_task(self, task_id: str) -> bool:
        """Cancel a running task.

        Returns:
            True if a not-yet-finished task was found and cancelled,
            False otherwise.
        """
        running = self.queue.get(task_id)
        if running is None or running.done():
            return False
        running.cancel()
        task_store.update_task(task_id, status="cancelled")
        return True


# Create a global task queue instance
task_queue = TaskQueue()


@router.get("/{task_id}/status")
async def get_task_status_endpoint(task_id: str):
    """Get the status of a background task.

    Returns the data from ``get_task_data``; responds with 404 when it yields
    nothing and with 500 when the lookup raises.
    """
    try:
        task_data = get_task_data(task_id)
    except Exception as e:
        logger.exception(f"Error getting task status: {e}")
        return JSONResponse(
            status_code=500,
            content={"status": "error", "message": str(e)},
        )

    if not task_data:
        return JSONResponse(
            status_code=404,
            content={
                "status": "not_found",
                "message": f"Task with ID {task_id} not found",
            },
        )
    return task_data


@router.get("/{task_id}")
async def get_task_endpoint(task_id: str):
    """Get detailed information about a task by its ID.

    Raises:
        HTTPException: 404 if the task store has no entry for ``task_id``.
    """
    task = task_store.get_task(task_id)
    if not task:
        raise HTTPException(status_code=404, detail=f"Task with ID {task_id} not found")
    return task


@router.post("/{task_id}/cancel")
async def cancel_task_endpoint(task_id: str):
    """
    Cancel a running task.

    Args:
        task_id: ID of the task

    Returns:
        Cancellation status

    Raises:
        HTTPException: 404 if the task store has no entry for ``task_id``.
    """
    task = task_store.get_task(task_id)
    if not task:
        raise HTTPException(status_code=404, detail=f"Task with ID {task_id} not found")

    if task["status"] in _TERMINAL_STATUSES:
        return {"message": f"Task is already in {task['status']} state and cannot be cancelled"}

    # If nothing is running in this process's queue for the task, still mark
    # it cancelled in the store.
    if not task_queue.cancel_task(task_id):
        task_store.update_task(task_id, status="cancelled")

    return {"message": "Task cancelled successfully"}


@router.get("")
async def list_tasks(
    task_type: Optional[str] = None,
    status: Optional[str] = None,
    limit: Optional[int] = None,
):
    """
    List all tasks with optional filtering.

    Args:
        task_type: Filter by task type
        status: Filter by task status
        limit: Limit the number of tasks returned

    Returns:
        List of tasks with detailed information. ``total`` is the number of
        tasks in the response, i.e. it is counted after ``limit`` is applied.
    """
    # Work on shallow copies so the derived fields added below are never
    # written back into the objects held by the task store.
    stored = task_store.list_tasks(task_type=task_type, status=status)

    # Sort tasks by creation time (newest first); tasks without a creation
    # time sort last instead of raising KeyError.
    tasks = sorted(
        (dict(task) for task in stored),
        key=lambda t: t.get("created_at") or "",
        reverse=True,
    )

    # Limit results if requested
    if limit is not None and limit > 0:
        tasks = tasks[:limit]

    # Enhance task information
    for task in tasks:
        # Ensure progress is an integer
        task["progress"] = _coerce_progress(task.get("progress"))

        # Calculate time elapsed
        if "created_at" in task and "updated_at" in task:
            try:
                created = datetime.fromisoformat(task["created_at"])
                updated = datetime.fromisoformat(task["updated_at"])
                elapsed_seconds = (updated - created).total_seconds()
            except (TypeError, ValueError) as e:
                logger.error(f"Error calculating elapsed time: {str(e)}")
                continue

            task["elapsed_seconds"] = elapsed_seconds

            # Format elapsed time as HH:MM:SS
            hours, remainder = divmod(int(elapsed_seconds), 3600)
            minutes, seconds = divmod(remainder, 60)
            task["elapsed_formatted"] = f"{hours:02}:{minutes:02}:{seconds:02}"

    return {"tasks": tasks, "total": len(tasks)}