Spaces:
Running
Running
File size: 4,580 Bytes
342e0fb 02fa899 342e0fb 02fa899 342e0fb | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 | import threading
import queue
import uuid
import time
from enum import Enum
from typing import Dict, Any, Optional, Callable
from core.logger import Logger
logger = Logger.get_logger(__name__)
class TaskStatus(str, Enum):
PENDING = "pending"
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
class TaskManager:
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(TaskManager, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self.task_queue = queue.Queue()
self.tasks: Dict[str, Dict[str, Any]] = {}
self.worker_thread = threading.Thread(target=self._worker, daemon=True)
self.worker_thread.start()
self._initialized = True
logger.info("π Task Manager initialized with background worker")
def add_task(self, task_func: Callable, *args, **kwargs) -> str:
"""
Add a task to the processing queue.
Returns the task_id.
"""
# Extract task_id if provided, otherwise generate one
task_id = kwargs.get('task_id')
if not task_id:
task_id = uuid.uuid4().hex[:8]
kwargs['task_id'] = task_id
self.tasks[task_id] = {
"id": task_id,
"status": TaskStatus.PENDING,
"submitted_at": time.time(),
"result": None,
"error": None
}
# Add to queue
self.task_queue.put((task_id, task_func, args, kwargs))
logger.info(f"π₯ Task {task_id} added to queue (Position: {self.task_queue.qsize()})")
return task_id
def get_task_status(self, task_id: str) -> Optional[Dict[str, Any]]:
"""Get the current status and result of a task."""
return self.tasks.get(task_id)
def update_task_progress(self, task_id: str, progress: int, message: str = "", result: Any = None):
"""Update the progress of a running task."""
if task_id in self.tasks:
self.tasks[task_id]["progress"] = progress
self.tasks[task_id]["message"] = message
if result is not None:
self.tasks[task_id]["result"] = result
logger.info(f"π Task {task_id} progress: {progress}% - {message}")
def _worker(self):
"""Background worker that processes tasks sequentially."""
logger.info("π· Task Worker loop started")
while True:
try:
# Block until a task is available
task_id, func, args, kwargs = self.task_queue.get()
logger.info(f"π Processing Task {task_id}...")
self.tasks[task_id]["status"] = TaskStatus.PROCESSING
self.tasks[task_id]["started_at"] = time.time()
try:
# Execute the task
result = func(*args, **kwargs)
self.tasks[task_id]["status"] = TaskStatus.COMPLETED
self.tasks[task_id]["completed_at"] = time.time()
self.tasks[task_id]["result"] = result
# If the result itself indicates an error (from our app logic)
if isinstance(result, dict) and result.get("status") == "error":
self.tasks[task_id]["status"] = TaskStatus.FAILED
self.tasks[task_id]["error"] = result.get("error")
logger.info(f"β
Task {task_id} completed successfully")
except Exception as e:
import traceback
error_trace = traceback.format_exc()
logger.error(f"β Task {task_id} failed with exception: {e}")
logger.error(error_trace)
self.tasks[task_id]["status"] = TaskStatus.FAILED
self.tasks[task_id]["error"] = str(e)
self.tasks[task_id]["traceback"] = error_trace
self.tasks[task_id]["completed_at"] = time.time()
finally:
self.task_queue.task_done()
except Exception as e:
logger.error(f"π Critical Worker Error: {e}")
time.sleep(1) # Prevent tight loop if queue is broken
|