File size: 3,909 Bytes
c2ea5ed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#!/usr/bin/env python
"""
Task Queue Service

A lightweight task queue implementation that manages background tasks
and tracks their status in the database.
"""

import asyncio
import logging
import uuid
from typing import Dict, Any, Callable, Awaitable, Optional
from datetime import datetime
from sqlalchemy.orm import Session

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class TaskQueue:
    """
    Simple in-memory task queue manager.
    
    Tracks task status and manages asynchronous execution.
    """
    
    def __init__(self):
        self.tasks: Dict[str, Dict[str, Any]] = {}
        self._running = True
    
    def generate_task_id(self) -> str:
        """Generate a unique task ID."""
        return str(uuid.uuid4())
    
    async def add_task(self, 
                     func: Callable[..., Awaitable[Any]], 
                     session: Session,
                     **kwargs) -> str:
        """
        Add a task to the queue.
        
        Args:
            func: Async function to execute
            session: Database session
            **kwargs: Arguments to pass to the function
            
        Returns:
            task_id: Unique identifier for the task
        """
        task_id = self.generate_task_id()
        
        # Store task data
        self.tasks[task_id] = {
            "status": "queued",
            "created_at": datetime.utcnow(),
            "updated_at": datetime.utcnow(),
            "progress": 0,
            "result": None,
            "error": None
        }
        
        # Create task in database
        try:
            # If you want to store task status in your database, do it here
            pass
        except Exception as e:
            logger.error(f"Error creating task record: {str(e)}")
        
        # Schedule the task to run
        asyncio.create_task(self._run_task(task_id, func, session, **kwargs))
        
        return task_id
    
    async def _run_task(self, 
                      task_id: str, 
                      func: Callable[..., Awaitable[Any]], 
                      session: Session,
                      **kwargs):
        """Execute a task and update its status."""
        try:
            # Update task status
            self.tasks[task_id]["status"] = "running"
            self.tasks[task_id]["updated_at"] = datetime.utcnow()
            
            # Execute the task
            result = await func(task_id=task_id, session=session, **kwargs)
            
            # Update task status
            self.tasks[task_id]["status"] = "completed"
            self.tasks[task_id]["progress"] = 100
            self.tasks[task_id]["result"] = result
            self.tasks[task_id]["updated_at"] = datetime.utcnow()
            logger.info(f"Task {task_id} completed successfully")
            
        except Exception as e:
            # Update task status on error
            logger.error(f"Task {task_id} failed: {str(e)}")
            self.tasks[task_id]["status"] = "failed"
            self.tasks[task_id]["error"] = str(e)
            self.tasks[task_id]["updated_at"] = datetime.utcnow()
    
    def get_task_status(self, task_id: str) -> Optional[Dict[str, Any]]:
        """Get the current status of a task."""
        if task_id not in self.tasks:
            return None
        
        return self.tasks[task_id]
    
    def update_task_progress(self, task_id: str, progress: int, message: Optional[str] = None):
        """Update the progress of a task."""
        if task_id in self.tasks:
            self.tasks[task_id]["progress"] = progress
            self.tasks[task_id]["updated_at"] = datetime.utcnow()
            if message:
                self.tasks[task_id]["message"] = message
            logger.debug(f"Task {task_id} progress: {progress}%")

# Singleton instance
task_queue = TaskQueue()