Spaces:
Running
Running
| """Phase 19.3 — in-memory async job queue. | |
| Backed by FastAPI BackgroundTasks for single-worker deployments; the API | |
| surface matches what a future Celery/Redis migration would expose, so | |
| callers don't need to change when we swap the transport. | |
| """ | |
| from __future__ import annotations | |
| import threading | |
| import time | |
| import uuid | |
| from dataclasses import dataclass, field | |
| from typing import Any, Callable | |
| from loguru import logger | |
| class Job: | |
| id: str | |
| stage: str = "queued" | |
| progress: int = 0 # 0..100 | |
| status: str = "queued" # queued | running | done | error | |
| result: Any | None = None | |
| error: str | None = None | |
| created_at: float = field(default_factory=time.time) | |
| updated_at: float = field(default_factory=time.time) | |
| class _JobRegistry: | |
| def __init__(self) -> None: | |
| self._jobs: dict[str, Job] = {} | |
| self._lock = threading.Lock() | |
| def create(self) -> Job: | |
| job = Job(id=uuid.uuid4().hex) | |
| with self._lock: | |
| self._jobs[job.id] = job | |
| return job | |
| def get(self, job_id: str) -> Job | None: | |
| with self._lock: | |
| return self._jobs.get(job_id) | |
| def update(self, job_id: str, **fields) -> None: | |
| with self._lock: | |
| j = self._jobs.get(job_id) | |
| if not j: | |
| return | |
| for k, v in fields.items(): | |
| setattr(j, k, v) | |
| j.updated_at = time.time() | |
| def prune(self, ttl_seconds: int = 3600) -> None: | |
| cutoff = time.time() - ttl_seconds | |
| with self._lock: | |
| dead = [jid for jid, j in self._jobs.items() if j.updated_at < cutoff] | |
| for jid in dead: | |
| self._jobs.pop(jid, None) | |
| registry = _JobRegistry() | |
| def run_job(job_id: str, stages: list[str], fn: Callable[[Callable[[str, int], None]], Any]) -> None: | |
| """Wrap a callable so it advances stage/progress through `registry`. | |
| `fn` receives a `progress(stage, pct)` updater it can call. | |
| """ | |
| def progress(stage: str, pct: int) -> None: | |
| registry.update(job_id, stage=stage, progress=max(0, min(100, int(pct))), status="running") | |
| registry.update(job_id, status="running", stage=stages[0] if stages else "running", progress=1) | |
| try: | |
| result = fn(progress) | |
| registry.update(job_id, status="done", stage="done", progress=100, result=result) | |
| except Exception as e: # noqa: BLE001 | |
| logger.exception(f"Job {job_id} failed") | |
| registry.update(job_id, status="error", error=str(e), progress=100) | |