# app/utils/distributed_lock.py
"""
Redis-based distributed lock for APScheduler jobs.

Prevents duplicate job execution when multiple gunicorn/uvicorn workers
are running the same AsyncIOScheduler independently.

Usage (in main.py):

    from app.utils.distributed_lock import make_locked_job

    locked = make_locked_job(redis_client, my_job, "lock:my_job", ttl_seconds=840)
    scheduler.add_job(locked, "interval", minutes=15)
"""

import functools
import logging
import uuid
from typing import Awaitable, Callable

logger = logging.getLogger(__name__)

# Atomic compare-and-delete: remove the key only while it still holds the
# token written by this acquisition. A plain DEL would also remove a lock
# that another worker acquired after our TTL expired.
_RELEASE_SCRIPT = """
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
else
    return 0
end
"""


def make_locked_job(
    redis_client,
    fn: Callable[[], Awaitable[None]],
    lock_key: str,
    ttl_seconds: int,
) -> Callable[[], Awaitable[None]]:
    """
    Return an async wrapper around `fn` that takes a Redis lock before running.

    The lock is acquired with ``SET lock_key <token> NX EX ttl_seconds``, where
    ``<token>`` is a random value generated for each invocation. If the key
    already exists (another worker is executing the same job), the invocation
    returns without running `fn`.

    The lock is released in a ``finally`` block, so an exception raised by
    `fn` does not leave the key behind. Release is a compare-and-delete on the
    token: if the TTL expired while `fn` was still running and another worker
    has since taken the lock, that worker's lock is left untouched and a
    warning is logged.

    The TTL acts as a safety net for worker crashes — set it to the job
    interval minus a small buffer.

    Args:
        redis_client: Async Redis client, or ``None``. Must provide awaitable
            ``set(key, value, nx=..., ex=...)`` and
            ``eval(script, numkeys, *keys_and_args)`` methods (assumed to
            follow the redis-py asyncio client signatures). When ``None``,
            `fn` runs without any locking and a warning is logged.
        fn: Zero-argument async callable to protect.
        lock_key: Redis key used as the lock.
        ttl_seconds: Expiry, in seconds, applied to the lock key.

    Returns:
        A zero-argument async callable wrapping `fn`.

    Raises:
        Nothing at construction time. The returned wrapper propagates any
        exception raised by `fn` or by the Redis calls.
    """
    # Callables such as functools.partial objects have no __name__; fall back
    # to repr() so that logging never raises AttributeError.
    job_name = getattr(fn, "__name__", repr(fn))

    @functools.wraps(fn)
    async def _locked() -> None:
        if redis_client is None:
            # No Redis available — fall back to running without a lock.
            # Acceptable for local dev; logs a warning so it is visible.
            logger.warning(
                "distributed_lock: Redis not available, running %s without lock",
                job_name,
            )
            await fn()
            return

        # A fresh token per acquisition lets the release step verify that the
        # key still belongs to this invocation.
        token = uuid.uuid4().hex
        acquired = await redis_client.set(lock_key, token, nx=True, ex=ttl_seconds)
        if not acquired:
            logger.debug(
                "distributed_lock: skipping %s — lock held by another worker",
                job_name,
            )
            return

        try:
            await fn()
        finally:
            released = await redis_client.eval(_RELEASE_SCRIPT, 1, lock_key, token)
            if not released:
                # The key expired (and may have been re-acquired elsewhere)
                # before the job finished; ttl_seconds is likely too short.
                logger.warning(
                    "distributed_lock: lock %s for %s was no longer held at release",
                    lock_key,
                    job_name,
                )

    return _locked