from __future__ import annotations import threading, uuid, traceback from concurrent.futures import ThreadPoolExecutor from typing import Callable, Dict, Any, Optional ProgressCB = Callable[[str, int, str], None] class TaskManager: def __init__(self, max_workers: int = 8): self._exec = ThreadPoolExecutor(max_workers=max_workers) self._lock = threading.Lock() self._jobs: Dict[str, dict] = {} self._registry: Dict[str, Callable[..., Any]] = {} # ---- job bookkeeping ---- def create_job(self, name: str) -> str: job_id = uuid.uuid4().hex with self._lock: self._jobs[job_id] = {"name": name, "status": "queued", "progress": 0, "message": "", "result": None, "error": None} return job_id def register(self, name: str, worker: Callable[..., Any]) -> None: self._registry[name] = worker def progress_update(self, job_id: str, percent: int, message: str = "") -> None: with self._lock: job = self._jobs.get(job_id) if job: job["progress"] = max(0, min(100, int(percent))) if message: job["message"] = message def _set_status(self, job_id: str, status: str) -> None: with self._lock: job = self._jobs.get(job_id) if job: job["status"] = status def submit(self, job_id: str, worker: Callable[..., Any], **kwargs) -> None: self._set_status(job_id, "running") def _runner(): try: result = worker(job_id, self.progress_update, **kwargs) with self._lock: self._jobs[job_id]["result"] = result self._jobs[job_id]["status"] = "completed" self._jobs[job_id]["progress"] = 100 except Exception: err = traceback.format_exc() with self._lock: self._jobs[job_id]["status"] = "failed" self._jobs[job_id]["error"] = err self._exec.submit(_runner) # ---- accessors for UI ---- def get(self, job_id: str) -> Optional[dict]: with self._lock: return dict(self._jobs.get(job_id) or {}) def list_jobs(self) -> Dict[str, dict]: with self._lock: return {k: dict(v) for k, v in self._jobs.items()}