# app/utils/distributed_lock.py
"""
Redis-based distributed lock for APScheduler jobs.
Prevents duplicate job execution when multiple gunicorn/uvicorn workers
are running the same AsyncIOScheduler independently.
Usage (in main.py):
from app.utils.distributed_lock import make_locked_job
locked = make_locked_job(redis_client, my_job, "lock:my_job", ttl_seconds=840)
scheduler.add_job(locked, "interval", minutes=15)
"""
import functools
import logging
import uuid
from typing import Awaitable, Callable
logger = logging.getLogger(__name__)
# Lua script executed atomically by Redis: delete the lock key only while it
# still holds the token written by the invocation that is releasing it.
_RELEASE_LOCK_SCRIPT = """
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
end
return 0
"""


def make_locked_job(
    redis_client,
    fn: Callable[[], Awaitable[None]],
    lock_key: str,
    ttl_seconds: int,
) -> Callable[[], Awaitable[None]]:
    """
    Return an async wrapper around `fn` guarded by a Redis SET NX lock.

    Each invocation of the wrapper tries to create `lock_key` with a fresh
    random token and an expiry of `ttl_seconds`. If the key already exists
    (another worker is executing the same job), the invocation returns
    without running `fn`.

    The lock is released in a `finally` block so a job crash does not block
    subsequent runs. Release is a compare-and-delete: the key is removed only
    if it still holds this invocation's token. If `fn` outlives the TTL and
    another worker has since taken the lock, that worker's lock is left
    untouched and a warning is logged.

    Args:
        redis_client: Async Redis client, or None to run `fn` without any
            locking (a warning is logged). Assumed to expose awaitable
            `set(key, value, nx=..., ex=...)` and
            `eval(script, numkeys, *keys_and_args)` in the style of
            `redis.asyncio.Redis` -- confirm against the client in use.
        fn: Zero-argument coroutine function to run under the lock.
        lock_key: Redis key that represents the lock.
        ttl_seconds: Expiry of the lock key, a safety net for worker
            crashes -- set it to the job interval minus a small buffer.

    Returns:
        A zero-argument coroutine function wrapping `fn`.
    """
    # Callables such as functools.partial have no __name__; fall back to repr
    # so logging never raises.
    job_name = getattr(fn, "__name__", repr(fn))

    @functools.wraps(fn)
    async def _locked() -> None:
        if redis_client is None:
            # No Redis available — fall back to running without a lock.
            # Acceptable for local dev; logs a warning so it is visible.
            logger.warning(
                "distributed_lock: Redis not available, running %s without lock",
                job_name,
            )
            await fn()
            return

        # A unique value per acquisition lets the release step prove ownership.
        token = uuid.uuid4().hex
        acquired = await redis_client.set(
            lock_key, token, nx=True, ex=ttl_seconds
        )
        if not acquired:
            logger.debug(
                "distributed_lock: skipping %s — lock held by another worker",
                job_name,
            )
            return

        try:
            await fn()
        finally:
            # Never delete unconditionally: if the TTL elapsed while fn ran,
            # the key may now belong to a different worker.
            released = await redis_client.eval(
                _RELEASE_LOCK_SCRIPT, 1, lock_key, token
            )
            if not released:
                logger.warning(
                    "distributed_lock: lock for %s expired before the job "
                    "finished; not releasing a lock this worker no longer owns",
                    job_name,
                )

    return _locked
|