Spaces:
Running
Running
| # app/utils/distributed_lock.py | |
| """ | |
| Redis-based distributed lock for APScheduler jobs. | |
| Prevents duplicate job execution when multiple gunicorn/uvicorn workers | |
| are running the same AsyncIOScheduler independently. | |
| Usage (in main.py): | |
| from app.utils.distributed_lock import make_locked_job | |
| locked = make_locked_job(redis_client, my_job, "lock:my_job", ttl_seconds=840) | |
| scheduler.add_job(locked, "interval", minutes=15) | |
| """ | |
| import functools | |
| import logging | |
| from typing import Awaitable, Callable | |
| logger = logging.getLogger(__name__) | |
def make_locked_job(
    redis_client,
    fn: Callable[[], Awaitable[None]],
    lock_key: str,
    ttl_seconds: int,
) -> Callable[[], Awaitable[None]]:
    """
    Return an async wrapper around `fn` guarded by a Redis SET NX lock.

    Each invocation of the wrapper tries to create `lock_key` with a fresh
    random token and an expiry of `ttl_seconds`. If the key already exists
    (another worker is executing the same job), the invocation exits without
    running `fn`.

    The lock is released in a `finally` block so a job crash does not block
    subsequent runs. Release is a compare-and-delete on the token: the key is
    removed only if it still holds the value written by this invocation. This
    matters when `fn` runs longer than `ttl_seconds` -- the key expires, a
    different worker may acquire it, and an unconditional DEL would remove
    that worker's lock and allow duplicate execution.

    A failure while releasing is logged and swallowed so it never masks the
    job's own result or exception; the TTL then expires the key. The TTL is
    also the safety net for worker crashes -- set it to the job interval
    minus a small buffer.

    Args:
        redis_client: Async Redis client exposing awaitable `set` and `eval`,
            or None to run `fn` without any locking (local development).
        fn: Zero-argument coroutine function to protect.
        lock_key: Redis key used as the lock.
        ttl_seconds: Expiry of the lock key, in seconds.

    Returns:
        A zero-argument coroutine function suitable for `scheduler.add_job`.
    """
    # Function-scope import: the token generator is only needed here and this
    # keeps the change independent of the module's import block.
    import uuid

    # Atomic "delete only if I still own it". A GET followed by DEL from the
    # client would leave a window in which the key can expire and be
    # re-acquired by another worker between the two commands.
    release_script = (
        "if redis.call('get', KEYS[1]) == ARGV[1] then "
        "return redis.call('del', KEYS[1]) "
        "else return 0 end"
    )

    # Callables such as functools.partial objects have no __name__; fall back
    # to repr() so logging can never raise AttributeError.
    job_name = getattr(fn, "__name__", repr(fn))

    async def _locked() -> None:
        if redis_client is None:
            # No Redis available -- fall back to running without a lock.
            # Acceptable for local dev; logs a warning so it is visible.
            logger.warning(
                "distributed_lock: Redis not available, running %s without lock",
                job_name,
            )
            await fn()
            return

        # A fresh token per invocation identifies this holder of the lock.
        token = uuid.uuid4().hex
        acquired = await redis_client.set(lock_key, token, nx=True, ex=ttl_seconds)
        if not acquired:
            logger.debug(
                "distributed_lock: skipping %s — lock held by another worker",
                job_name,
            )
            return

        try:
            await fn()
        finally:
            try:
                await redis_client.eval(release_script, 1, lock_key, token)
            except Exception:
                # Best-effort release: never replace the job's own outcome
                # with a Redis error. The key still expires via its TTL.
                logger.warning(
                    "distributed_lock: failed to release %s for %s; "
                    "relying on TTL expiry",
                    lock_key,
                    job_name,
                    exc_info=True,
                )

    return _locked