File size: 6,039 Bytes
c2ea5ed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
#!/usr/bin/env python
"""
Task Router

This module provides API endpoints for task management and status tracking.
It implements a simple in-memory task queue with status tracking.
"""

import logging
import asyncio
import uuid
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional, Callable, Awaitable, Union
from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks, Path, Body
from fastapi.responses import JSONResponse
from pydantic import BaseModel

from backend.database import get_db
from sqlalchemy.orm import Session
from backend.services.task_queue import TaskQueue
from backend.services.task_service import get_task_data
from backend.services.task_store_service import task_store

# Configure logging.
# NOTE(review): basicConfig() runs at import time and configures the root
# logger; confirm this module is meant to own process-wide logging setup.
logging.basicConfig(level=logging.INFO)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Router that the task endpoints in this module are registered on; its routes
# are served under the /api/tasks prefix and tagged "tasks".
router = APIRouter(
    prefix="/api/tasks",
    tags=["tasks"],
)

# Simple in-memory task store has been moved to backend/services/task_store_service.py

# Task queue for background processing
class TaskQueue:
    """Run awaitables as asyncio tasks and track the running ones by task ID.

    The outcome of every task ("completed", "failed" or "cancelled") is
    written to ``task_store``; this class only keeps the live
    ``asyncio.Task`` handles so that a running task can be cancelled.

    NOTE(review): this class has the same name as the ``TaskQueue`` imported
    from ``backend.services.task_queue`` at the top of the module and
    therefore replaces it in this namespace -- confirm that is intended.
    """

    def __init__(self):
        # Maps task ID -> the asyncio.Task currently registered for that ID.
        self.queue: Dict[str, asyncio.Task] = {}

    async def add_task(self, task_id: str,
                       coro: Awaitable,
                       on_complete: Optional[Callable[[str, Any], None]] = None,
                       on_error: Optional[Callable[[str, Exception], None]] = None) -> None:
        """Add a task to the queue and execute it.

        Args:
            task_id: Key under which the task is tracked and under which its
                status is written to ``task_store``.
            coro: Awaitable to run in the background.
            on_complete: Optional callback invoked with ``(task_id, result)``
                after the result has been stored. An exception raised by the
                callback is handled like a failure of the task itself.
            on_error: Optional callback invoked with ``(task_id, exception)``
                after the failure has been stored.

        The method returns as soon as the task is scheduled; it does not wait
        for ``coro`` to finish. Must be called from a running event loop.
        """

        async def _wrapped_task():
            try:
                result = await coro
                task_store.update_task(task_id, status="completed", result=result)
                if on_complete:
                    on_complete(task_id, result)
            except Exception as e:
                error_message = str(e)
                logger.error("Task %s failed: %s", task_id, error_message)
                task_store.update_task(task_id, status="failed", error=error_message)
                if on_error:
                    on_error(task_id, e)

        def _forget(finished: asyncio.Task) -> None:
            # Drop the entry only if it still refers to *this* task. If the
            # same ID was re-registered in the meantime, the newer task must
            # stay in the queue so that it can still be cancelled.
            if self.queue.get(task_id) is finished:
                del self.queue[task_id]

        # Create and start the task. Cleanup is attached as a done-callback
        # (instead of a ``finally`` inside the coroutine) so that it also runs
        # when the task is cancelled before it executed its first step.
        task = asyncio.create_task(_wrapped_task())
        self.queue[task_id] = task
        task.add_done_callback(_forget)

    def cancel_task(self, task_id: str) -> bool:
        """Cancel a running task.

        Args:
            task_id: ID of the task to cancel.

        Returns:
            True if a not-yet-finished task was registered under ``task_id``
            and cancellation was requested (its stored status is set to
            "cancelled"); False if no such task is known or it is already done.
        """
        task = self.queue.get(task_id)
        if task is None or task.done():
            return False
        task.cancel()
        task_store.update_task(task_id, status="cancelled")
        return True

# Create a global task queue instance.
# It is built from the TaskQueue class defined in this module, and
# cancel_task_endpoint uses it to cancel tasks that are still running.
task_queue = TaskQueue()

@router.get("/{task_id}/status")
async def get_task_status_endpoint(task_id: str):
    """Return the status data of a background task.

    Args:
        task_id: ID of the task to look up.

    Returns:
        Whatever ``get_task_data`` yields for the task; a 404 JSON response
        when that is empty; a 500 JSON response carrying the error text when
        the lookup raises.
    """
    try:
        payload = get_task_data(task_id)
        if payload:
            return payload
        missing_body = {"status": "not_found", "message": f"Task with ID {task_id} not found"}
        return JSONResponse(status_code=404, content=missing_body)
    except Exception as exc:
        # Lookup errors are reported to the client instead of propagating.
        logger.error(f"Error getting task status: {exc}")
        failure_body = {"status": "error", "message": str(exc)}
        return JSONResponse(status_code=500, content=failure_body)

@router.get("/{task_id}")
async def get_task_endpoint(task_id: str):
    """Return the stored record of a task.

    Args:
        task_id: ID of the task to fetch from ``task_store``.

    Raises:
        HTTPException: 404 when the store has no (or an empty) record for
            ``task_id``.
    """
    record = task_store.get_task(task_id)
    if record:
        return record
    raise HTTPException(status_code=404, detail=f"Task with ID {task_id} not found")

@router.post("/{task_id}/cancel")
async def cancel_task_endpoint(task_id: str):
    """
    Cancel a task.

    Args:
        task_id: ID of the task

    Returns:
        A dict with a "message" describing the outcome

    Raises:
        HTTPException: 404 when no task with this ID is stored
    """
    record = task_store.get_task(task_id)
    if not record:
        raise HTTPException(status_code=404, detail=f"Task with ID {task_id} not found")

    # Tasks that already reached a final state are reported, not changed.
    current_status = record["status"]
    if current_status in ("completed", "failed", "cancelled"):
        return {"message": f"Task is already in {current_status} state and cannot be cancelled"}

    # The queue marks the task as cancelled itself when it was still running
    # there; otherwise the stored status is updated here.
    if not task_queue.cancel_task(task_id):
        task_store.update_task(task_id, status="cancelled")

    return {"message": "Task cancelled successfully"}

@router.get("")
async def list_tasks(
    task_type: Optional[str] = None,
    status: Optional[str] = None,
    limit: Optional[int] = None
):
    """
    List all tasks with optional filtering.

    Args:
        task_type: Filter by task type
        status: Filter by task status
        limit: Limit the number of tasks returned (ignored unless positive)

    Returns:
        Dict with "tasks" (newest first, each enriched with an integer
        "progress" and, where both timestamps are present and parseable,
        "elapsed_seconds" / "elapsed_formatted") and "total" (the number of
        tasks in that list, i.e. counted after ``limit`` is applied)
    """
    tasks = task_store.list_tasks(task_type=task_type, status=status)

    # Sort tasks by creation time (newest first). sorted() leaves the list
    # object handed out by the store untouched, and a task without a
    # "created_at" value sorts last instead of raising KeyError.
    tasks = sorted(tasks, key=lambda t: t.get("created_at") or "", reverse=True)

    # Limit results if requested
    if isinstance(limit, int) and limit > 0:
        tasks = tasks[:limit]

    # Enhance task information
    for task in tasks:
        _enrich_task(task)

    return {"tasks": tasks, "total": len(tasks)}


def _enrich_task(task: Dict[str, Any]) -> None:
    """Normalise "progress" and attach elapsed-time fields to *task* in place."""
    # Ensure progress is an integer; a missing or unusable value is reported
    # as 0 rather than failing the whole listing.
    progress = task.get("progress")
    if progress is None:
        task["progress"] = 0
    else:
        try:
            task["progress"] = int(progress)
        except (TypeError, ValueError, OverflowError):
            logger.warning("Task has non-numeric progress %r; reporting 0", progress)
            task["progress"] = 0

    # Calculate time elapsed between creation and last update
    if "created_at" in task and "updated_at" in task:
        try:
            created = datetime.fromisoformat(task["created_at"])
            updated = datetime.fromisoformat(task["updated_at"])
            elapsed_seconds = (updated - created).total_seconds()
            task["elapsed_seconds"] = elapsed_seconds

            # Format elapsed time as HH:MM:SS
            hours, remainder = divmod(int(elapsed_seconds), 3600)
            minutes, seconds = divmod(remainder, 60)
            task["elapsed_formatted"] = f"{hours:02}:{minutes:02}:{seconds:02}"
        except Exception as e:
            # Best effort: a task with unusable timestamps is still listed,
            # just without the elapsed-time fields.
            logger.error("Error calculating elapsed time: %s", e)