| """ |
| Durable job storage abstraction. |
| |
| Why this exists: |
| - The API launches background work and needs a place to store job status/results. |
| - In-memory dicts break across process restarts and multi-worker deployments. |
| - We keep Redis optional to avoid forcing a dependency for local dev/tests. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| import threading |
| from dataclasses import dataclass |
| from typing import Any, Dict, Optional, Protocol |
|
|
# A job record is a free-form mapping of string field names to arbitrary
# values; this module does not impose a schema on it.
JobRecord = Dict[str, Any]
|
|
|
|
class JobStore(Protocol):
    """Structural interface shared by the job-store backends in this module.

    Any object exposing these five methods with compatible signatures can be
    used wherever a ``JobStore`` is expected. The bodies below only raise
    ``NotImplementedError``; concrete behaviour lives in the implementations.
    """

    def get(self, job_id: str) -> Optional[JobRecord]:
        """Look up the record for ``job_id``; the return type allows ``None``."""
        raise NotImplementedError

    def set(self, job_id: str, record: JobRecord) -> None:
        """Store ``record`` under ``job_id``."""
        raise NotImplementedError

    def update(self, job_id: str, patch: Dict[str, Any]) -> JobRecord:
        """Apply ``patch`` to the record for ``job_id`` and return a record."""
        raise NotImplementedError

    def delete(self, job_id: str) -> None:
        """Remove the record stored under ``job_id``."""
        raise NotImplementedError

    def list(self) -> Dict[str, JobRecord]:
        """Return the stored records as a mapping keyed by job id."""
        raise NotImplementedError
|
|
|
|
class InMemoryJobStore:
    """Thread-safe in-memory store (default).

    Records live in a plain dict guarded by a re-entrant lock. Records are
    shallow-copied on the way in and on the way out, so callers never hold a
    reference to the top-level dict kept inside the store.
    """

    def __init__(self) -> None:
        self._jobs: Dict[str, JobRecord] = {}
        self._lock = threading.RLock()

    def get(self, job_id: str) -> Optional[JobRecord]:
        """Return a shallow copy of the record for ``job_id``, or ``None``."""
        with self._lock:
            stored = self._jobs.get(job_id)
            if stored is None:
                return None
            return stored.copy()

    def set(self, job_id: str, record: JobRecord) -> None:
        """Store a shallow copy of ``record`` under ``job_id``, replacing any old one."""
        snapshot = dict(record)
        with self._lock:
            self._jobs[job_id] = snapshot

    def update(self, job_id: str, patch: Dict[str, Any]) -> JobRecord:
        """Merge ``patch`` into an existing record and return a copy of the result.

        Raises:
            KeyError: if no record is stored under ``job_id``.
        """
        with self._lock:
            # Stored values are always dicts, so ``None`` can only mean "absent".
            target = self._jobs.get(job_id)
            if target is None:
                raise KeyError(f"Job not found: {job_id}")
            target.update(patch)
            return target.copy()

    def delete(self, job_id: str) -> None:
        """Drop the record for ``job_id``; unknown ids are ignored."""
        with self._lock:
            self._jobs.pop(job_id, None)

    def list(self) -> Dict[str, JobRecord]:
        """Return a mapping of every job id to a shallow copy of its record."""
        snapshot: Dict[str, JobRecord] = {}
        with self._lock:
            for key, stored in self._jobs.items():
                snapshot[key] = stored.copy()
        return snapshot
|
|
|
|
@dataclass(frozen=True)
class RedisJobStoreConfig:
    """Immutable settings consumed by ``RedisJobStore``."""

    # Connection URL passed to ``redis.Redis.from_url``.
    redis_url: str
    # Name of the Redis hash that holds all job records.
    key_prefix: str = "ylff:jobs"
|
|
|
|
class RedisJobStore:
    """
    Redis-backed store using a single Redis hash.

    Storage:
    - hash key: <key_prefix>
    - field: <job_id>
    - value: json(record)

    Records are serialised with ``json.dumps(..., default=str)``, so values
    that are not JSON-native are stored as their ``str()`` form and come back
    as strings.
    """

    def __init__(self, cfg: RedisJobStoreConfig) -> None:
        """Connect to the Redis instance described by ``cfg``.

        Raises:
            ImportError: if the optional ``redis`` package is not installed.
        """
        # Imported lazily so the module stays usable without the dependency.
        try:
            import redis
        except ImportError as e:
            raise ImportError(
                "RedisJobStore requires the optional 'redis' package. "
                "Install with: pip install redis"
            ) from e

        self._cfg = cfg
        # decode_responses=True makes the client return ``str`` rather than bytes.
        self._redis = redis.Redis.from_url(cfg.redis_url, decode_responses=True)
        self._hash_key = cfg.key_prefix

    def get(self, job_id: str) -> Optional[JobRecord]:
        """Return the decoded record for ``job_id``, or ``None`` if the field is absent."""
        raw = self._redis.hget(self._hash_key, job_id)
        if raw is None:
            return None
        return json.loads(raw)

    def set(self, job_id: str, record: JobRecord) -> None:
        """Serialise ``record`` to JSON and store it under ``job_id``."""
        self._redis.hset(self._hash_key, job_id, json.dumps(record, default=str))

    def update(self, job_id: str, patch: Dict[str, Any]) -> JobRecord:
        """Atomically merge ``patch`` into the stored record and return the result.

        The read-merge-write runs as an optimistic transaction
        (WATCH / MULTI / EXEC) on the hash key. If another client modifies the
        hash between the read and the write, the write is discarded and the
        whole step is retried, so concurrent updates are not lost and a job
        deleted in the meantime is not silently re-created.

        Raises:
            KeyError: if no record is stored under ``job_id``.
        """

        def _merge(pipe: Any) -> JobRecord:
            # While the key is WATCHed the pipeline executes reads immediately.
            raw = pipe.hget(self._hash_key, job_id)
            if raw is None:
                raise KeyError(f"Job not found: {job_id}")
            merged = json.loads(raw)
            merged.update(patch)
            # Commands issued after multi() are queued and applied on EXEC.
            pipe.multi()
            pipe.hset(self._hash_key, job_id, json.dumps(merged, default=str))
            return merged

        # ``transaction`` watches the key, runs ``_merge``, executes the queue,
        # and re-runs ``_merge`` whenever the watched key changed underneath it.
        return self._redis.transaction(
            _merge, self._hash_key, value_from_callable=True
        )

    def delete(self, job_id: str) -> None:
        """Remove the field for ``job_id`` from the hash."""
        self._redis.hdel(self._hash_key, job_id)

    def list(self) -> Dict[str, JobRecord]:
        """Return every record in the hash, decoded and keyed by job id."""
        raw_map = self._redis.hgetall(self._hash_key)
        return {job_id: json.loads(raw) for job_id, raw in raw_map.items()}
|
|
|
|
# Module-level in-memory store, created once at import time. It is what
# get_job_store() hands back when the app carries no store of its own.
_default_store: JobStore = InMemoryJobStore()
|
|
|
|
def get_job_store(app: Any) -> JobStore:
    """
    Return ``app.state.job_store`` when it is set, otherwise the module-level
    in-memory default.

    A missing ``state`` attribute, a missing ``job_store`` attribute, and a
    ``job_store`` of ``None`` are all treated as "not configured".
    """
    state = getattr(app, "state", None)
    configured = getattr(state, "job_store", None)
    if configured is None:
        return _default_store
    return configured
|
|
|
|
def build_job_store(
    *,
    backend: str = "memory",
    redis_url: Optional[str] = None,
    redis_key_prefix: str = "ylff:jobs",
) -> JobStore:
    """Construct a job store for the requested backend.

    Args:
        backend: Backend name, matched case-insensitively after stripping
            whitespace. ``"memory"``, ``"inmemory"`` and ``"in-memory"`` select
            the in-memory store; ``"redis"`` selects the Redis store. A falsy
            value is treated as ``"memory"``.
        redis_url: Connection URL; required when the backend is ``"redis"``.
        redis_key_prefix: Name of the Redis hash used for storage.

    Raises:
        ValueError: if the backend is unknown, or ``redis_url`` is missing for
            the Redis backend.
    """
    kind = (backend or "memory").lower().strip()

    if kind == "redis":
        if not redis_url:
            raise ValueError("redis_url is required when backend='redis'")
        cfg = RedisJobStoreConfig(redis_url=redis_url, key_prefix=redis_key_prefix)
        return RedisJobStore(cfg)

    if kind in ("memory", "inmemory", "in-memory"):
        return InMemoryJobStore()

    raise ValueError(f"Unknown job store backend: {kind}")
|
|