File size: 4,580 Bytes
342e0fb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
02fa899
342e0fb
 
 
 
02fa899
 
342e0fb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import threading
import queue
import uuid
import time
from enum import Enum
from typing import Dict, Any, Optional, Callable
from core.logger import Logger

# Module-level logger, obtained from the project's Logger helper and keyed by
# this module's name; every log call in this module goes through it.
logger = Logger.get_logger(__name__)

class TaskStatus(str, Enum):
    """Lifecycle states stored in a task record's ``"status"`` field.

    The enum mixes in ``str``, so every member compares equal to its
    lowercase string value (``TaskStatus.PENDING == "pending"``).
    """

    # Registered and waiting in the queue.
    PENDING = "pending"
    # Picked up by the worker and currently executing.
    PROCESSING = "processing"
    # Finished and produced a result.
    COMPLETED = "completed"
    # Raised an exception or returned an error result.
    FAILED = "failed"

class TaskManager:
    """Singleton that runs submitted callables one at a time on a daemon thread.

    Every call to ``TaskManager()`` returns the same instance. The first
    construction creates a FIFO ``queue.Queue``, an in-memory ``tasks``
    registry (task id -> record dict) and starts a single background worker
    thread that executes queued tasks sequentially.

    A task record starts with the keys ``id``, ``status``, ``submitted_at``,
    ``result`` and ``error``. The worker later adds ``started_at`` and
    ``completed_at`` (plus ``traceback`` when the task raised), and
    ``update_task_progress`` adds ``progress`` and ``message``.

    Notes:
        * Singleton construction is not protected by a lock; create the first
          instance before other threads may call ``TaskManager()``.
        * Records are never removed from ``tasks``.
        * NOTE(review): the emoji prefixes in the log messages look
          mis-encoded (mojibake) -- confirm the file's encoding.
    """

    _instance = None

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super(TaskManager, cls).__new__(cls)
            # __init__ runs on every TaskManager() call; this flag lets it
            # skip re-initialisation after the first one.
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        """Set up the queue, the registry and the worker thread (once only)."""
        if self._initialized:
            return

        self.task_queue = queue.Queue()
        self.tasks: Dict[str, Dict[str, Any]] = {}
        # Daemon thread: it never blocks interpreter shutdown.
        self.worker_thread = threading.Thread(target=self._worker, daemon=True)
        self.worker_thread.start()
        self._initialized = True
        logger.info("πŸš€ Task Manager initialized with background worker")

    def add_task(self, task_func: Callable, *args, **kwargs) -> str:
        """Register a task and put it on the processing queue.

        Args:
            task_func: Callable executed by the worker as
                ``task_func(*args, **kwargs)``.
            *args: Positional arguments forwarded to ``task_func``.
            **kwargs: Keyword arguments forwarded to ``task_func``. A truthy
                ``task_id`` entry is used as the task's id; otherwise an
                8-character hex id is generated and injected as ``task_id``.
                Either way ``task_func`` receives a ``task_id`` keyword.

        Returns:
            The id under which the task can be looked up. Re-using an id
            that is already registered replaces its record.
        """
        # Extract task_id if provided, otherwise generate one
        task_id = kwargs.get('task_id')
        if not task_id:
            task_id = uuid.uuid4().hex[:8]
            kwargs['task_id'] = task_id

        # Register the record before queueing so the worker always finds it.
        self.tasks[task_id] = {
            "id": task_id,
            "status": TaskStatus.PENDING,
            "submitted_at": time.time(),
            "result": None,
            "error": None
        }

        # Add to queue
        self.task_queue.put((task_id, task_func, args, kwargs))
        logger.info(f"πŸ“₯ Task {task_id} added to queue (Position: {self.task_queue.qsize()})")
        return task_id

    def get_task_status(self, task_id: str) -> Optional[Dict[str, Any]]:
        """Return the record for ``task_id``, or ``None`` if it is unknown.

        The returned dict is the registry's own record, not a copy.
        """
        return self.tasks.get(task_id)

    def update_task_progress(self, task_id: str, progress: int, message: str = "", result: Any = None):
        """Store progress information on a registered task.

        Args:
            task_id: Id of the task to update; unknown ids are ignored.
            progress: Progress value, logged as a percentage.
            message: Optional human-readable progress message.
            result: Optional partial result; stored only when not ``None``.
        """
        if task_id in self.tasks:
            self.tasks[task_id]["progress"] = progress
            self.tasks[task_id]["message"] = message
            if result is not None:
                self.tasks[task_id]["result"] = result
            logger.info(f"πŸ“ˆ Task {task_id} progress: {progress}% - {message}")

    @staticmethod
    def _result_reports_error(result: Any) -> bool:
        """Tell whether a task's return value is an application-level error.

        A result counts as an error only when it is a dict whose ``"status"``
        entry equals ``"error"``.
        """
        return isinstance(result, dict) and result.get("status") == "error"

    def _run_task(self, task_id: str, func: Callable, args: tuple, kwargs: dict) -> None:
        """Execute one task and record its outcome in its registry record.

        The terminal ``status`` is always written last, after ``result`` /
        ``error`` / ``completed_at``, so a reader that sees a terminal status
        also sees the fields that belong to it.

        Raises:
            KeyError: If ``task_id`` has no record in ``self.tasks``.
        """
        logger.info(f"πŸ”„ Processing Task {task_id}...")
        record = self.tasks[task_id]
        record["status"] = TaskStatus.PROCESSING
        record["started_at"] = time.time()

        try:
            # Execute the task
            result = func(*args, **kwargs)
        except Exception as e:
            import traceback
            error_trace = traceback.format_exc()
            logger.error(f"❌ Task {task_id} failed with exception: {e}")
            logger.error(error_trace)

            record["error"] = str(e)
            record["traceback"] = error_trace
            record["completed_at"] = time.time()
            record["status"] = TaskStatus.FAILED
            return

        record["result"] = result
        record["completed_at"] = time.time()

        # If the result itself indicates an error (from our app logic), the
        # task goes straight to FAILED without ever being marked COMPLETED.
        if self._result_reports_error(result):
            error = result.get("error")
            record["error"] = error
            record["status"] = TaskStatus.FAILED
            logger.error(f"❌ Task {task_id} returned an error result: {error}")
        else:
            record["status"] = TaskStatus.COMPLETED
            logger.info(f"βœ… Task {task_id} completed successfully")

    def _worker(self):
        """Background worker that processes tasks sequentially, forever."""
        logger.info("πŸ‘· Task Worker loop started")
        while True:
            try:
                # Block until a task is available
                item = self.task_queue.get()
            except Exception as e:
                logger.error(f"πŸ’€ Critical Worker Error: {e}")
                time.sleep(1)  # Prevent tight loop if queue is broken
                continue

            try:
                task_id, func, args, kwargs = item
                self._run_task(task_id, func, args, kwargs)
            except Exception as e:
                # Failures outside the task body itself (malformed queue
                # item, missing registry record); keep the worker alive.
                logger.error(f"πŸ’€ Critical Worker Error: {e}")
                time.sleep(1)
            finally:
                # Exactly one task_done() per dequeued item, whatever
                # happened, so Queue.join() can never hang on a lost item.
                self.task_queue.task_done()