File size: 1,995 Bytes
7cd10a9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# app/utils/distributed_lock.py
"""
Redis-based distributed lock for APScheduler jobs.

Prevents duplicate job execution when multiple gunicorn/uvicorn workers
are running the same AsyncIOScheduler independently.

Usage (in main.py):
    from app.utils.distributed_lock import make_locked_job
    locked = make_locked_job(redis_client, my_job, "lock:my_job", ttl_seconds=840)
    scheduler.add_job(locked, "interval", minutes=15)
"""

import functools
import logging
from typing import Awaitable, Callable

logger = logging.getLogger(__name__)


def make_locked_job(
    redis_client,
    fn: Callable[[], Awaitable[None]],
    lock_key: str,
    ttl_seconds: int,
) -> Callable[[], Awaitable[None]]:
    """
    Return an async wrapper around `fn` that acquires a Redis SET NX lock
    before running. If the lock is already held (another worker is executing
    the same job), this invocation silently exits without running `fn`.

    The lock is always released in a `finally` block so a job crash does not
    permanently block subsequent runs. The TTL acts as a safety net for
    worker crashes — set it to the job interval minus a small buffer.
    """

    @functools.wraps(fn)
    async def _locked() -> None:
        if redis_client is None:
            # No Redis available — fall back to running without a lock.
            # Acceptable for local dev; logs a warning so it is visible.
            logger.warning(
                "distributed_lock: Redis not available, running %s without lock",
                fn.__name__,
            )
            await fn()
            return

        acquired = await redis_client.set(lock_key, "1", nx=True, ex=ttl_seconds)
        if not acquired:
            logger.debug(
                "distributed_lock: skipping %s — lock held by another worker",
                fn.__name__,
            )
            return

        try:
            await fn()
        finally:
            await redis_client.delete(lock_key)

    return _locked