Spaces:
Sleeping
Sleeping
File size: 6,039 Bytes
c2ea5ed |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
#!/usr/bin/env python
"""
Task Router
This module provides API endpoints for task management and status tracking.
It implements a simple in-memory task queue with status tracking.
"""
import logging
import asyncio
import uuid
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional, Callable, Awaitable, Union
from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks, Path, Body
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from backend.database import get_db
from sqlalchemy.orm import Session
from backend.services.task_queue import TaskQueue
from backend.services.task_service import get_task_data
from backend.services.task_store_service import task_store
# Logging: set the INFO level via basicConfig and create this module's logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Every endpoint in this module is registered on this router, which uses the
# "/api/tasks" prefix and the "tasks" tag.
router = APIRouter(prefix="/api/tasks", tags=["tasks"])
# Simple in-memory task store has been moved to backend/services/task_store_service.py
# Task queue for background processing
class TaskQueue:
    """In-memory registry of running background tasks, keyed by task ID.

    Each submitted awaitable is wrapped in an ``asyncio.Task``. Its outcome is
    written to ``task_store`` ("completed" with the result, or "failed" with
    the error message) and the optional callbacks are invoked.

    NOTE(review): this class has the same name as the ``TaskQueue`` imported
    from ``backend.services.task_queue`` at the top of this file and therefore
    shadows that import within this module -- confirm which one is intended.
    """

    def __init__(self):
        # task_id -> the asyncio.Task currently tracked for that ID
        self.queue: Dict[str, asyncio.Task] = {}

    async def add_task(self, task_id: str,
                       coro: Awaitable,
                       on_complete: Optional[Callable[[str, Any], None]] = None,
                       on_error: Optional[Callable[[str, Exception], None]] = None) -> None:
        """Schedule ``coro`` as a background task tracked under ``task_id``.

        Args:
            task_id: Key used both in this queue and in ``task_store``.
            coro: Awaitable whose result becomes the task result.
            on_complete: Optional callback ``(task_id, result)`` invoked after
                the task has been recorded as completed.
            on_error: Optional callback ``(task_id, exception)`` invoked after
                the task has been recorded as failed.

        The method does not wait for the task to finish; it only schedules it
        (``asyncio.create_task`` requires a running event loop).
        """
        async def _wrapped_task():
            try:
                result = await coro
                task_store.update_task(task_id, status="completed", result=result)
            except Exception as e:
                # Cancellation is not an ``Exception`` on Python 3.8+, so it
                # bypasses this branch; cancel_task() records that status.
                logger.exception("Task %s failed: %s", task_id, e)
                task_store.update_task(task_id, status="failed", error=str(e))
                if on_error:
                    self._safe_callback(on_error, task_id, e)
            else:
                # Run outside the try body: a faulty completion callback must
                # not turn an already-completed task into a failed one.
                if on_complete:
                    self._safe_callback(on_complete, task_id, result)

        # Create and start the task
        task = asyncio.create_task(_wrapped_task())
        self.queue[task_id] = task
        # Clean up through a done-callback rather than a ``finally`` inside the
        # coroutine: it also fires when the task is cancelled before it ever
        # starts running, and it receives the finished task for the identity
        # check in _forget().
        task.add_done_callback(lambda finished: self._forget(task_id, finished))

    def _forget(self, task_id: str, finished: asyncio.Task) -> None:
        """Drop ``task_id`` from the queue if it still maps to ``finished``."""
        # If the same ID was re-submitted in the meantime, the entry belongs
        # to the newer task and must be left alone.
        if self.queue.get(task_id) is finished:
            del self.queue[task_id]

    @staticmethod
    def _safe_callback(callback: Callable[[str, Any], None], task_id: str, payload: Any) -> None:
        """Invoke a user callback, logging (not propagating) anything it raises."""
        try:
            callback(task_id, payload)
        except Exception:
            logger.exception("Callback for task %s raised an exception", task_id)

    def cancel_task(self, task_id: str) -> bool:
        """Cancel a running task.

        Returns:
            True if a tracked, unfinished task was cancelled and recorded as
            "cancelled" in ``task_store``; False if the ID is not tracked or
            its task has already finished.
        """
        task = self.queue.get(task_id)
        if task is None or task.done():
            return False
        task.cancel()
        task_store.update_task(task_id, status="cancelled")
        return True
# Create a global task queue instance
# (module-level object; the cancel endpoint in this module calls
# task_queue.cancel_task on it. TaskQueue here resolves to the class defined
# in this module, which shadows the one imported from
# backend.services.task_queue.)
task_queue = TaskQueue()
@router.get("/{task_id}/status")
async def get_task_status_endpoint(task_id: str):
    """Report the status of a background task.

    Returns whatever ``get_task_data`` yields for ``task_id``. When that is
    empty, a 404 JSON response is returned; when the lookup raises, the error
    is logged and a 500 JSON response carrying the error text is returned.
    """
    try:
        task_data = get_task_data(task_id)
        if task_data:
            return task_data

        # Nothing known under this ID
        not_found_body = {"status": "not_found", "message": f"Task with ID {task_id} not found"}
        return JSONResponse(status_code=404, content=not_found_body)
    except Exception as e:
        logger.error(f"Error getting task status: {e}")
        error_body = {"status": "error", "message": str(e)}
        return JSONResponse(status_code=500, content=error_body)
@router.get("/{task_id}")
async def get_task_endpoint(task_id: str):
    """Return the stored record for ``task_id``.

    Raises:
        HTTPException: 404 when ``task_store`` has no task under that ID.
    """
    record = task_store.get_task(task_id)
    if record:
        return record
    raise HTTPException(status_code=404, detail=f"Task with ID {task_id} not found")
@router.post("/{task_id}/cancel")
async def cancel_task_endpoint(task_id: str):
    """
    Request cancellation of a task.

    Args:
        task_id: ID of the task

    Returns:
        A dict with a "message" key describing the outcome.

    Raises:
        HTTPException: 404 when the task is not in ``task_store``.
    """
    record = task_store.get_task(task_id)
    if not record:
        raise HTTPException(status_code=404, detail=f"Task with ID {task_id} not found")

    # Tasks in one of these states are reported back unchanged
    finished_states = ("completed", "failed", "cancelled")
    if record["status"] in finished_states:
        return {"message": f"Task is already in {record['status']} state and cannot be cancelled"}

    # If the queue had nothing running to cancel, still record the
    # cancellation in the store.
    if not task_queue.cancel_task(task_id):
        task_store.update_task(task_id, status="cancelled")

    return {"message": "Task cancelled successfully"}
def _format_elapsed(elapsed_seconds: float) -> str:
    """Render a duration given in seconds as HH:MM:SS."""
    hours, remainder = divmod(int(elapsed_seconds), 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{hours:02}:{minutes:02}:{seconds:02}"


def _present_task(task: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of ``task`` with integer progress and elapsed-time fields."""
    # Work on a shallow copy so the derived fields are not written into
    # whatever object the store handed out.
    # NOTE(review): assumes tasks are plain dicts -- confirm against task_store.
    view = dict(task)

    # Ensure progress is an integer; missing, None or unparsable values become 0
    progress = view.get("progress")
    try:
        view["progress"] = int(progress) if progress is not None else 0
    except (TypeError, ValueError, OverflowError):
        logger.warning("Ignoring non-numeric task progress: %r", progress)
        view["progress"] = 0

    # Elapsed time is best-effort: a timestamp that cannot be parsed (or a
    # naive/aware mismatch between the two) only skips these two fields.
    if "created_at" in view and "updated_at" in view:
        try:
            created = datetime.fromisoformat(view["created_at"])
            updated = datetime.fromisoformat(view["updated_at"])
            elapsed_seconds = (updated - created).total_seconds()
        except (TypeError, ValueError) as e:
            logger.error("Error calculating elapsed time: %s", e)
        else:
            view["elapsed_seconds"] = elapsed_seconds
            view["elapsed_formatted"] = _format_elapsed(elapsed_seconds)

    return view


@router.get("")
async def list_tasks(
    task_type: Optional[str] = None,
    status: Optional[str] = None,
    limit: Optional[int] = None
):
    """
    List all tasks with optional filtering.

    Args:
        task_type: Filter by task type
        status: Filter by task status
        limit: Limit the number of tasks returned (ignored unless positive)

    Returns:
        Dict with "tasks" (newest first, each with an integer "progress" and,
        when both timestamps parse, "elapsed_seconds" / "elapsed_formatted")
        and "total" (the number of tasks in the returned list).
    """
    # Sort by creation time (newest first) into a new list. A task without a
    # creation time sorts as the empty string, i.e. last, instead of raising.
    tasks = sorted(
        task_store.list_tasks(task_type=task_type, status=status),
        key=lambda t: t.get("created_at") or "",
        reverse=True,
    )

    # Limit results if requested
    if isinstance(limit, int) and limit > 0:
        tasks = tasks[:limit]

    # Enhance task information
    tasks = [_present_task(task) for task in tasks]

    # "total" counts the tasks actually returned (after applying the limit)
    return {"tasks": tasks, "total": len(tasks)}