# AdGenesis-App / background_task / task_manager.py
# userIdc2024's picture
# Upload 41 files
# 9bc1376 verified
from __future__ import annotations
import threading, uuid, traceback
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, Any, Optional
# Shape of a progress callback: (job_id, percent, message) -> None.
# TaskManager.progress_update has this signature.
ProgressCB = Callable[[str, int, str], None]
class TaskManager:
    """Run worker callables on a thread pool and track their state.

    Every job is a plain dict stored under a random hex id with the keys
    ``name``, ``status``, ``progress``, ``message``, ``result`` and
    ``error``.  ``status`` moves through ``"queued"`` -> ``"running"`` ->
    ``"completed"`` or ``"failed"``.  All reads and writes of job records
    happen under a single lock, and accessors hand out shallow copies.
    """

    def __init__(self, max_workers: int = 8):
        """Create the thread pool and the empty job/worker tables.

        Args:
            max_workers: Size of the underlying ``ThreadPoolExecutor``.
        """
        self._exec = ThreadPoolExecutor(max_workers=max_workers)
        self._lock = threading.Lock()  # guards self._jobs
        self._jobs: Dict[str, dict] = {}
        self._registry: Dict[str, Callable[..., Any]] = {}

    # ---- job bookkeeping ----
    def create_job(self, name: str) -> str:
        """Add a new job record in the ``"queued"`` state and return its id.

        Args:
            name: Label stored in the record's ``name`` field.

        Returns:
            The new job id (a 32-character hex string from ``uuid4``).
        """
        job_id = uuid.uuid4().hex
        record = {
            "name": name,
            "status": "queued",
            "progress": 0,
            "message": "",
            "result": None,
            "error": None,
        }
        with self._lock:
            self._jobs[job_id] = record
        return job_id

    def register(self, name: str, worker: Callable[..., Any]) -> None:
        """Store ``worker`` in the registry under ``name`` (overwrites)."""
        self._registry[name] = worker

    def progress_update(self, job_id: str, percent: int, message: str = "") -> None:
        """Record progress for a job; unknown ids are ignored.

        Args:
            job_id: Id of the job to update.
            percent: Progress value; coerced with ``int`` and clamped to 0..100.
            message: Optional status text.  An empty string leaves the
                previously stored message untouched.
        """
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                return
            job["progress"] = max(0, min(100, int(percent)))
            if message:
                job["message"] = message

    def _update(self, job_id: str, **fields: Any) -> bool:
        """Merge ``fields`` into a job record under the lock.

        Returns:
            ``True`` if the job exists and was updated, ``False`` otherwise.
        """
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                return False
            job.update(fields)
            return True

    def _set_status(self, job_id: str, status: str) -> None:
        """Set a job's ``status``; unknown ids are ignored."""
        self._update(job_id, status=status)

    def submit(self, job_id: str, worker: Callable[..., Any], **kwargs) -> None:
        """Schedule ``worker`` on the pool and track its outcome in the job.

        The worker is called as ``worker(job_id, self.progress_update,
        **kwargs)``.  Its return value is stored in ``result`` and the job
        becomes ``"completed"`` with progress 100; if it raises
        ``Exception`` the formatted traceback is stored in ``error`` and the
        job becomes ``"failed"``.

        Args:
            job_id: Id of the job record to update.  If no such record
                exists the worker still runs, but nothing is recorded.
            worker: Callable to execute on a pool thread.
            **kwargs: Extra keyword arguments forwarded to ``worker``.

        Raises:
            RuntimeError: If the executor refuses the task (for example
                after it has been shut down).  The job is marked
                ``"failed"`` before the error is re-raised.
        """

        def _runner() -> None:
            # Flip to "running" only once a pool thread actually picks the
            # job up, so jobs waiting behind a busy pool stay "queued".
            self._set_status(job_id, "running")
            try:
                result = worker(job_id, self.progress_update, **kwargs)
            except Exception:
                self._update(job_id, status="failed", error=traceback.format_exc())
            else:
                self._update(job_id, result=result, status="completed", progress=100)

        try:
            self._exec.submit(_runner)
        except RuntimeError:
            # The task never reached the pool; do not leave the job dangling.
            self._update(job_id, status="failed", error=traceback.format_exc())
            raise

    # ---- accessors for UI ----
    def get(self, job_id: str) -> Optional[dict]:
        """Return a shallow copy of a job record.

        An unknown ``job_id`` yields an empty dict (not ``None``).
        """
        with self._lock:
            return dict(self._jobs.get(job_id) or {})

    def list_jobs(self) -> Dict[str, dict]:
        """Return ``{job_id: shallow copy of record}`` for every job."""
        with self._lock:
            return {job_id: dict(record) for job_id, record in self._jobs.items()}