wu981526092's picture
🚀 Deploy AgentGraph: Complete agent monitoring and knowledge graph system
c2ea5ed
#!/usr/bin/env python
"""
Task Router
This module provides API endpoints for task management and status tracking.
It implements a simple in-memory task queue with status tracking.
"""
import logging
import asyncio
import uuid
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional, Callable, Awaitable, Union
from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks, Path, Body
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from backend.database import get_db
from sqlalchemy.orm import Session
from backend.services.task_queue import TaskQueue
from backend.services.task_service import get_task_data
from backend.services.task_store_service import task_store
# Configure logging.
# NOTE(review): basicConfig runs when this module is imported, so importing the router
# configures the root logger at INFO level (basicConfig does nothing if the root logger
# already has handlers).
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Router that collects the task endpoints defined below; every route registered on it
# is served under the /api/tasks prefix and carries the "tasks" tag.
router = APIRouter(
prefix="/api/tasks",
tags=["tasks"],
)
# Simple in-memory task store has been moved to backend/services/task_store_service.py
# Task queue for background processing
class TaskQueue:
    """Registry of in-flight background tasks, keyed by task ID.

    Every submitted awaitable is wrapped in an ``asyncio.Task``. The wrapper
    records the outcome in ``task_store`` and invokes the optional callbacks.

    NOTE(review): this definition rebinds the ``TaskQueue`` name imported from
    ``backend.services.task_queue`` at the top of the file -- confirm which
    implementation is meant to be used here.
    """

    def __init__(self):
        # task_id -> asyncio.Task that has not finished (or been cleaned up) yet.
        self.queue: Dict[str, asyncio.Task] = {}

    async def add_task(self, task_id: str,
                       coro: Awaitable,
                       on_complete: Optional[Callable[[str, Any], None]] = None,
                       on_error: Optional[Callable[[str, Exception], None]] = None) -> None:
        """Add a task to the queue and execute it.

        The awaitable is scheduled on the running event loop and this method
        returns immediately; it does not wait for the work to finish.

        Args:
            task_id: Key under which the task is tracked and reported to
                ``task_store``.
            coro: Awaitable that performs the work.
            on_complete: Optional callback invoked as ``on_complete(task_id, result)``
                after the result has been stored.
            on_error: Optional callback invoked as ``on_error(task_id, exc)``
                after the failure has been stored.
        """
        async def _wrapped_task():
            try:
                result = await coro
                task_store.update_task(task_id, status="completed", result=result)
                if on_complete:
                    on_complete(task_id, result)
            except Exception as e:
                # The store update and on_complete above run inside the same
                # try block, so an exception raised by either of them is also
                # recorded here as a task failure.
                logger.error(f"Task {task_id} failed: {str(e)}")
                error_message = str(e)
                task_store.update_task(task_id, status="failed", error=error_message)
                if on_error:
                    on_error(task_id, e)

        # Create and start the task
        task = asyncio.create_task(_wrapped_task())
        self.queue[task_id] = task

        def _forget(finished: asyncio.Task) -> None:
            # Done callbacks fire for every outcome, including a task that is
            # cancelled before its first step (in which case the wrapper body,
            # and therefore any try/finally inside it, never executes).
            # The identity check keeps a stale task from evicting a newer task
            # that was registered under the same ID.
            if self.queue.get(task_id) is finished:
                del self.queue[task_id]

        task.add_done_callback(_forget)

    def cancel_task(self, task_id: str) -> bool:
        """Cancel a running task.

        Returns:
            True if a tracked, unfinished task was found; it is cancelled and
            the store entry is marked "cancelled". False otherwise.
        """
        task = self.queue.get(task_id)
        if task is None or task.done():
            return False
        task.cancel()
        task_store.update_task(task_id, status="cancelled")
        return True
# Create a global task queue instance.
# cancel_task_endpoint calls task_queue.cancel_task() on this instance.
task_queue = TaskQueue()
@router.get("/{task_id}/status")
async def get_task_status_endpoint(task_id: str):
    """Report the status data of a background task.

    Returns whatever ``get_task_data`` yields for the ID. Responds with a 404
    JSON body when nothing is found and with a 500 JSON body when the lookup
    raises.
    """
    try:
        payload = get_task_data(task_id)
        if payload:
            return payload
        not_found_body = {"status": "not_found", "message": f"Task with ID {task_id} not found"}
        return JSONResponse(status_code=404, content=not_found_body)
    except Exception as exc:
        logger.error(f"Error getting task status: {exc}")
        error_body = {"status": "error", "message": str(exc)}
        return JSONResponse(status_code=500, content=error_body)
@router.get("/{task_id}")
async def get_task_endpoint(task_id: str):
    """Return the task store's record for ``task_id``.

    Raises:
        HTTPException: 404 when the store has no (truthy) entry for the ID.
    """
    record = task_store.get_task(task_id)
    if record:
        return record
    raise HTTPException(status_code=404, detail=f"Task with ID {task_id} not found")
@router.post("/{task_id}/cancel")
async def cancel_task_endpoint(task_id: str):
    """
    Request cancellation of a task.

    Args:
        task_id: ID of the task

    Returns:
        A dict with a single "message" entry: either a notice that the task
        is already in a finished state, or a confirmation of the cancellation.

    Raises:
        HTTPException: 404 when the task store has no entry for the ID
    """
    task = task_store.get_task(task_id)
    if not task:
        raise HTTPException(status_code=404, detail=f"Task with ID {task_id} not found")

    finished_states = ("completed", "failed", "cancelled")
    if task["status"] in finished_states:
        return {"message": f"Task is already in {task['status']} state and cannot be cancelled"}

    # When the queue had no live task to cancel, record the cancellation in
    # the store directly so the task still ends up marked "cancelled".
    if not task_queue.cancel_task(task_id):
        task_store.update_task(task_id, status="cancelled")
    return {"message": "Task cancelled successfully"}
def _task_progress_as_int(task: Dict[str, Any]) -> int:
    """Return the task's ``progress`` as an int; 0 when missing, None, or not numeric."""
    raw_progress = task.get("progress")
    if raw_progress is None:
        return 0
    try:
        return int(raw_progress)
    except (TypeError, ValueError, OverflowError):
        # One malformed record should not make the whole listing fail.
        logger.error(f"Invalid progress value {raw_progress!r}; defaulting to 0")
        return 0


def _add_elapsed_fields(task: Dict[str, Any]) -> None:
    """Add ``elapsed_seconds`` and ``elapsed_formatted`` (HH:MM:SS) to *task* in place.

    Does nothing unless both ``created_at`` and ``updated_at`` are present.
    Any error while parsing or formatting is logged and otherwise ignored.
    """
    if "created_at" not in task or "updated_at" not in task:
        return
    try:
        created = datetime.fromisoformat(task["created_at"])
        updated = datetime.fromisoformat(task["updated_at"])
        elapsed_seconds = (updated - created).total_seconds()
        task["elapsed_seconds"] = elapsed_seconds
        # Format elapsed time as HH:MM:SS
        hours, remainder = divmod(int(elapsed_seconds), 3600)
        minutes, seconds = divmod(remainder, 60)
        task["elapsed_formatted"] = f"{hours:02}:{minutes:02}:{seconds:02}"
    except Exception as e:
        logger.error(f"Error calculating elapsed time: {str(e)}")


@router.get("")
async def list_tasks(
    task_type: Optional[str] = None,
    status: Optional[str] = None,
    limit: Optional[int] = None
):
    """
    List all tasks with optional filtering.

    Args:
        task_type: Filter by task type
        status: Filter by task status
        limit: Limit the number of tasks returned; ignored unless it is a
            positive integer

    Returns:
        A dict with "tasks" (newest first, each with an integer "progress"
        and, when both timestamps are present and parseable, "elapsed_seconds"
        and "elapsed_formatted") and "total".

    NOTE(review): "total" is the number of tasks returned after ``limit`` is
    applied, not the number of tasks matching the filters -- confirm that this
    is what clients expect.
    """
    # sorted() builds a new list, so the list handed back by the store is not
    # reordered. Tasks without a created_at value sort as oldest instead of
    # raising KeyError, matching the tolerance of the elapsed-time step.
    tasks = sorted(
        task_store.list_tasks(task_type=task_type, status=status),
        key=lambda t: t.get("created_at") or "",
        reverse=True,
    )

    # Limit results if requested
    if limit and isinstance(limit, int) and limit > 0:
        tasks = tasks[:limit]

    # Enhance task information (the task dicts themselves are updated in place)
    for task in tasks:
        task["progress"] = _task_progress_as_int(task)
        _add_elapsed_fields(task)

    return {"tasks": tasks, "total": len(tasks)}