# app/utils/distributed_lock.py
"""
Redis-based distributed lock for APScheduler jobs.
Prevents duplicate job execution when multiple gunicorn/uvicorn workers
are running the same AsyncIOScheduler independently.
Usage (in main.py):
from app.utils.distributed_lock import make_locked_job
locked = make_locked_job(redis_client, my_job, "lock:my_job", ttl_seconds=840)
scheduler.add_job(locked, "interval", minutes=15)
"""
import functools
import logging
from typing import Awaitable, Callable

logger = logging.getLogger(__name__)
def make_locked_job(
    redis_client,
    fn: Callable[[], Awaitable[None]],
    lock_key: str,
    ttl_seconds: int,
) -> Callable[[], Awaitable[None]]:
    """
    Return an async wrapper around `fn` that acquires a Redis SET NX lock
    before running. If the lock is already held (another worker is executing
    the same job), this invocation silently exits without running `fn`.

    The lock is always released in a `finally` block so a job crash does not
    permanently block subsequent runs. The TTL acts as a safety net for
    worker crashes — set it to the job interval minus a small buffer.
    """

    @functools.wraps(fn)
    async def _locked() -> None:
        if redis_client is None:
            # No Redis available — fall back to running without a lock.
            # Acceptable for local dev; logs a warning so it is visible.
            logger.warning(
                "distributed_lock: Redis not available, running %s without lock",
                fn.__name__,
            )
            await fn()
            return

        acquired = await redis_client.set(lock_key, "1", nx=True, ex=ttl_seconds)
        if not acquired:
            logger.debug(
                "distributed_lock: skipping %s — lock held by another worker",
                fn.__name__,
            )
            return

        try:
            await fn()
        finally:
            await redis_client.delete(lock_key)

    return _locked
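

# --- Hedged sketch (not part of the original module) -------------------------
# The plain delete() above can, in the rare case where a job runs longer than
# its TTL, remove a lock that another worker has since re-acquired. A common
# refinement is to store a per-invocation token and release with a
# compare-and-delete Lua script, so only the current owner can delete the key.
# This is a minimal sketch assuming an async redis-py client (redis.asyncio);
# `make_locked_job_safe` is a hypothetical name, not referenced elsewhere.

import uuid

_RELEASE_IF_OWNER = """
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
else
    return 0
end
"""


def make_locked_job_safe(
    redis_client,
    fn: Callable[[], Awaitable[None]],
    lock_key: str,
    ttl_seconds: int,
) -> Callable[[], Awaitable[None]]:
    """Like make_locked_job, but only the lock's current owner can release it."""

    @functools.wraps(fn)
    async def _locked() -> None:
        if redis_client is None:
            logger.warning(
                "distributed_lock: Redis not available, running %s without lock",
                fn.__name__,
            )
            await fn()
            return

        token = uuid.uuid4().hex  # identifies this invocation as the lock owner
        acquired = await redis_client.set(lock_key, token, nx=True, ex=ttl_seconds)
        if not acquired:
            logger.debug(
                "distributed_lock: skipping %s, lock held by another worker",
                fn.__name__,
            )
            return

        try:
            await fn()
        finally:
            # Compare-and-delete: removes the key only if it still holds our token.
            await redis_client.eval(_RELEASE_IF_OWNER, 1, lock_key, token)

    return _locked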