Spaces:
Running
Running
| from __future__ import annotations | |
| import threading, uuid, traceback | |
| from concurrent.futures import ThreadPoolExecutor | |
| from typing import Callable, Dict, Any, Optional | |
| ProgressCB = Callable[[str, int, str], None] | |
| class TaskManager: | |
| def __init__(self, max_workers: int = 8): | |
| self._exec = ThreadPoolExecutor(max_workers=max_workers) | |
| self._lock = threading.Lock() | |
| self._jobs: Dict[str, dict] = {} | |
| self._registry: Dict[str, Callable[..., Any]] = {} | |
| # ---- job bookkeeping ---- | |
| def create_job(self, name: str) -> str: | |
| job_id = uuid.uuid4().hex | |
| with self._lock: | |
| self._jobs[job_id] = {"name": name, "status": "queued", "progress": 0, "message": "", "result": None, "error": None} | |
| return job_id | |
| def register(self, name: str, worker: Callable[..., Any]) -> None: | |
| self._registry[name] = worker | |
| def progress_update(self, job_id: str, percent: int, message: str = "") -> None: | |
| with self._lock: | |
| job = self._jobs.get(job_id) | |
| if job: | |
| job["progress"] = max(0, min(100, int(percent))) | |
| if message: | |
| job["message"] = message | |
| def _set_status(self, job_id: str, status: str) -> None: | |
| with self._lock: | |
| job = self._jobs.get(job_id) | |
| if job: | |
| job["status"] = status | |
| def submit(self, job_id: str, worker: Callable[..., Any], **kwargs) -> None: | |
| self._set_status(job_id, "running") | |
| def _runner(): | |
| try: | |
| result = worker(job_id, self.progress_update, **kwargs) | |
| with self._lock: | |
| self._jobs[job_id]["result"] = result | |
| self._jobs[job_id]["status"] = "completed" | |
| self._jobs[job_id]["progress"] = 100 | |
| except Exception: | |
| err = traceback.format_exc() | |
| with self._lock: | |
| self._jobs[job_id]["status"] = "failed" | |
| self._jobs[job_id]["error"] = err | |
| self._exec.submit(_runner) | |
| # ---- accessors for UI ---- | |
| def get(self, job_id: str) -> Optional[dict]: | |
| with self._lock: | |
| return dict(self._jobs.get(job_id) or {}) | |
| def list_jobs(self) -> Dict[str, dict]: | |
| with self._lock: | |
| return {k: dict(v) for k, v in self._jobs.items()} | |