# deepshield/services/job_queue.py
# Author: ar07xd — synced from GitHub via hub-sync (commit fba30db).
"""Phase 19.3 — in-memory async job queue.
Backed by FastAPI BackgroundTasks for single-worker deployments; the API
surface matches what a future Celery/Redis migration would expose, so
callers don't need to change when we swap the transport.
"""
from __future__ import annotations

import copy
import threading
import time
import uuid
from dataclasses import dataclass, field
from typing import Any, Callable

from loguru import logger
@dataclass
class Job:
    """Mutable status record for one background job.

    ``status`` is one of ``queued``, ``running``, ``done`` or ``error``;
    ``stage`` is a free-form label for the step currently executing and
    ``progress`` is a percentage expected to stay within 0..100.
    """

    # Identifier under which the job is stored and looked up.
    id: str
    # Human-readable step label; starts out mirroring the initial status.
    stage: str = "queued"
    # Percent complete, 0..100.
    progress: int = 0
    # Lifecycle state: queued | running | done | error.
    status: str = "queued"
    # Return value of the job's callable once it has finished successfully.
    result: Any | None = None
    # Error message once the job has failed.
    error: str | None = None
    # Wall-clock timestamps from time.time() (seconds since the epoch).
    created_at: float = field(default_factory=time.time)
    updated_at: float = field(default_factory=time.time)
class _JobRegistry:
def __init__(self) -> None:
self._jobs: dict[str, Job] = {}
self._lock = threading.Lock()
def create(self) -> Job:
job = Job(id=uuid.uuid4().hex)
with self._lock:
self._jobs[job.id] = job
return job
def get(self, job_id: str) -> Job | None:
with self._lock:
return self._jobs.get(job_id)
def update(self, job_id: str, **fields) -> None:
with self._lock:
j = self._jobs.get(job_id)
if not j:
return
for k, v in fields.items():
setattr(j, k, v)
j.updated_at = time.time()
def prune(self, ttl_seconds: int = 3600) -> None:
cutoff = time.time() - ttl_seconds
with self._lock:
dead = [jid for jid, j in self._jobs.items() if j.updated_at < cutoff]
for jid in dead:
self._jobs.pop(jid, None)
registry = _JobRegistry()
def run_job(job_id: str, stages: list[str], fn: Callable[[Callable[[str, int], None]], Any]) -> None:
    """Run ``fn`` synchronously, mirroring its progress into ``registry``.

    Before ``fn`` starts, the job is set to ``running`` at ``stages[0]``
    (or ``"running"`` when ``stages`` is empty) with progress 1. ``fn`` is then
    called with a ``progress(stage, pct)`` callback. Each call marks the job
    ``running`` and records ``stage`` and ``pct``, coerced to ``int`` and
    clamped to 0..100.

    On success the job ends with ``status="done"``, ``stage="done"`` and
    ``progress=100``, and ``fn``'s return value is stored in ``result``.
    Any ``Exception`` raised by ``fn`` is logged and recorded as
    ``status="error"`` with ``progress=100``. The stage is left at whatever
    ``fn`` last reported. Exceptions from ``fn`` are not re-raised to the
    caller.
    """

    def progress(stage: str, pct: int) -> None:
        registry.update(job_id, stage=stage, progress=max(0, min(100, int(pct))), status="running")

    registry.update(job_id, status="running", stage=stages[0] if stages else "running", progress=1)
    try:
        result = fn(progress)
    except Exception as e:  # noqa: BLE001 - top-level boundary for a background job
        logger.exception("Job {} failed", job_id)
        # str() of a message-less exception (e.g. `raise ValueError()`) is "".
        # That would leave a failed job with an empty, falsy error field,
        # so fall back to the exception type name.
        registry.update(job_id, status="error", error=str(e) or type(e).__name__, progress=100)
    else:
        registry.update(job_id, status="done", stage="done", progress=100, result=result)