# 3d_model/ylff/utils/job_store.py
# Azan
# Clean deployment build (Squashed)
# 7a87926
"""
Durable job storage abstraction.
Why this exists:
- The API launches background work and needs a place to store job status/results.
- In-memory dicts break across process restarts and multi-worker deployments.
- We keep Redis optional to avoid forcing a dependency for local dev/tests.
"""
from __future__ import annotations

import copy
import json
import threading
from dataclasses import dataclass
from typing import Any, Dict, Optional, Protocol
# A job record is a plain dict mapping string field names to arbitrary values.
JobRecord = Dict[str, Any]
class JobStore(Protocol):
    """Structural interface shared by the job-store backends in this module.

    Any object exposing these five methods with compatible signatures can be
    used wherever a ``JobStore`` is expected; inheriting from this class is
    not required. The stubs here only raise ``NotImplementedError``.
    """

    def get(self, job_id: str) -> Optional[JobRecord]:
        """Return the record stored under ``job_id``.

        The backends in this module return ``None`` when the job is unknown.
        """
        raise NotImplementedError()

    def set(self, job_id: str, record: JobRecord) -> None:
        """Store ``record`` under ``job_id``, replacing any existing record."""
        raise NotImplementedError()

    def update(self, job_id: str, patch: Dict[str, Any]) -> JobRecord:
        """Merge ``patch`` into the stored record and return the merged record.

        The backends in this module raise ``KeyError`` when the job is unknown.
        """
        raise NotImplementedError()

    def delete(self, job_id: str) -> None:
        """Remove the record stored under ``job_id``."""
        raise NotImplementedError()

    def list(self) -> Dict[str, JobRecord]:
        """Return every stored record, keyed by job id."""
        raise NotImplementedError()
class InMemoryJobStore:
    """Thread-safe in-memory store (default).

    All access to the backing dict goes through one re-entrant lock.

    Records are deep-copied on the way in and on the way out. A shallow copy
    would leave nested lists/dicts shared between callers and the store, so
    they could be mutated outside the lock. Deep copies also make this
    backend behave like the Redis one, which always hands back independent
    records because it round-trips them through JSON.
    """

    def __init__(self) -> None:
        # RLock: a thread already holding the lock can re-enter store methods.
        self._lock = threading.RLock()
        self._jobs: Dict[str, JobRecord] = {}

    def get(self, job_id: str) -> Optional[JobRecord]:
        """Return an independent copy of the record, or ``None`` if unknown."""
        with self._lock:
            job = self._jobs.get(job_id)
            return copy.deepcopy(job) if job is not None else None

    def set(self, job_id: str, record: JobRecord) -> None:
        """Store an independent copy of ``record`` under ``job_id``."""
        # dict(...) keeps the stored value a plain dict; deepcopy detaches
        # nested containers from the caller's object.
        snapshot = copy.deepcopy(dict(record))
        with self._lock:
            self._jobs[job_id] = snapshot

    def update(self, job_id: str, patch: Dict[str, Any]) -> JobRecord:
        """Merge ``patch`` into the stored record and return a copy of it.

        Raises:
            KeyError: if no record exists for ``job_id``.
        """
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                raise KeyError(f"Job not found: {job_id}")
            # Copy the patch so later mutation of the caller's nested values
            # cannot leak into the stored record.
            job.update(copy.deepcopy(dict(patch)))
            return copy.deepcopy(job)

    def delete(self, job_id: str) -> None:
        """Remove the record for ``job_id``; unknown ids are ignored."""
        with self._lock:
            self._jobs.pop(job_id, None)

    def list(self) -> Dict[str, JobRecord]:
        """Return independent copies of all records, keyed by job id."""
        with self._lock:
            return {job_id: copy.deepcopy(job) for job_id, job in self._jobs.items()}
@dataclass(frozen=True)
class RedisJobStoreConfig:
    """Immutable settings consumed by ``RedisJobStore``."""

    # Connection URL passed to ``redis.Redis.from_url``.
    redis_url: str
    # Name of the single Redis hash that holds every job record.
    key_prefix: str = "ylff:jobs"
class RedisJobStore:
    """
    Redis-backed store using a single Redis hash.

    Storage:
    - hash key: <key_prefix>
    - field: <job_id>
    - value: json(record)

    Records are written with ``json.dumps(..., default=str)``, so values that
    are not JSON-native are stored as their ``str()`` form and read back as
    strings.
    """

    def __init__(self, cfg: RedisJobStoreConfig) -> None:
        """Connect lazily to the Redis instance described by ``cfg``.

        Raises:
            ImportError: if the optional ``redis`` package is not installed.
        """
        try:
            import redis  # type: ignore
        except ImportError as e:  # pragma: no cover
            # Only a missing package is rewritten into the install hint; any
            # other import-time failure propagates with its own traceback.
            raise ImportError(
                "RedisJobStore requires the optional 'redis' package. "
                "Install with: pip install redis"
            ) from e
        self._cfg = cfg
        # decode_responses=True makes the client return str instead of bytes.
        self._redis = redis.Redis.from_url(cfg.redis_url, decode_responses=True)
        self._hash_key = cfg.key_prefix

    def get(self, job_id: str) -> Optional[JobRecord]:
        """Return the decoded record for ``job_id``, or ``None`` if unknown."""
        raw = self._redis.hget(self._hash_key, job_id)
        if raw is None:
            return None
        return json.loads(raw)

    def set(self, job_id: str, record: JobRecord) -> None:
        """Serialize ``record`` to JSON and store it under ``job_id``."""
        self._redis.hset(self._hash_key, job_id, json.dumps(record, default=str))

    def update(self, job_id: str, patch: Dict[str, Any]) -> JobRecord:
        """Atomically merge ``patch`` into the stored record and return it.

        A plain read-then-write lets two workers updating at the same time
        overwrite each other's changes. The merge therefore runs inside
        redis-py's ``transaction`` helper: it WATCHes the hash, runs the
        callable, EXECs the queued write, and retries the whole callable if
        the hash was modified in between.

        Raises:
            KeyError: if no record exists for ``job_id``.
        """

        def _merge(pipe: Any) -> JobRecord:
            # While watching, the pipeline runs commands immediately, so this
            # read returns the current value rather than queuing.
            raw = pipe.hget(self._hash_key, job_id)
            if raw is None:
                raise KeyError(f"Job not found: {job_id}")
            merged = json.loads(raw)
            merged.update(patch)
            # From here on commands are buffered until EXEC.
            pipe.multi()
            pipe.hset(self._hash_key, job_id, json.dumps(merged, default=str))
            return merged

        return self._redis.transaction(
            _merge, self._hash_key, value_from_callable=True
        )

    def delete(self, job_id: str) -> None:
        """Remove the hash field for ``job_id``."""
        self._redis.hdel(self._hash_key, job_id)

    def list(self) -> Dict[str, JobRecord]:
        """Return every stored record, decoded, keyed by job id."""
        raw_map = self._redis.hgetall(self._hash_key)
        return {job_id: json.loads(raw) for job_id, raw in raw_map.items()}
# Module-level fallback store: get_job_store returns it when the app has no
# job_store set on app.state.
_default_store: JobStore = InMemoryJobStore()
def get_job_store(app: Any) -> JobStore:
    """
    Return the job store attached to ``app.state.job_store``.

    Falls back to the module-level in-memory store when ``app`` has no
    ``state`` attribute, when ``state`` has no ``job_store`` attribute, or when
    that attribute is ``None``.
    """
    app_state = getattr(app, "state", None)
    configured = getattr(app_state, "job_store", None)
    if configured is None:
        return _default_store
    return configured
def build_job_store(
    *,
    backend: str = "memory",
    redis_url: Optional[str] = None,
    redis_key_prefix: str = "ylff:jobs",
) -> JobStore:
    """
    Construct a job store for the requested backend.

    Args:
        backend: Backend name, matched case-insensitively after stripping
            whitespace. "memory", "inmemory" and "in-memory" select the
            in-memory store; "redis" selects the Redis store. A falsy value
            is treated as "memory".
        redis_url: Connection URL; required when the backend is "redis".
        redis_key_prefix: Name of the Redis hash used by the Redis store.

    Raises:
        ValueError: if the backend is unknown, or if the backend is "redis"
            and ``redis_url`` is empty or ``None``.
    """
    kind = (backend or "memory").lower().strip()

    if kind == "redis":
        if not redis_url:
            raise ValueError("redis_url is required when backend='redis'")
        cfg = RedisJobStoreConfig(redis_url=redis_url, key_prefix=redis_key_prefix)
        return RedisJobStore(cfg)

    if kind in ("memory", "inmemory", "in-memory"):
        return InMemoryJobStore()

    raise ValueError(f"Unknown job store backend: {kind}")