File size: 4,057 Bytes
58ace4b
 
0493349
58ace4b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
"""Distributed locks to prevent concurrent syncing of same source."""

import asyncio
import uuid
from datetime import datetime, timedelta
from typing import Optional

import redis


class DistributedLock:
    """
    Redis-backed distributed lock for preventing concurrent operations on same resource.

    Prevents multiple Celery workers from syncing the same source simultaneously.

    Every instance generates its own random ``lock_id`` and stores it as the
    value of the lock key, so ``release`` only deletes keys this instance set.
    The lock is not reentrant: a second ``acquire`` on the same resource fails
    while the key exists, even when issued by the instance that holds it.

    NOTE(review): the methods are coroutines but invoke the Redis client
    synchronously; with a synchronous ``redis.Redis`` client (as annotated)
    each call blocks the event loop for one round trip. Confirm this is
    acceptable for the callers, or switch to an async client.
    """

    # Delay between acquisition attempts in ``wait_and_acquire``.
    _POLL_INTERVAL_SECONDS = 0.1

    # Atomic check-and-delete: remove the key only if it still holds our
    # lock_id, so a lock that expired and was re-acquired by someone else is
    # never freed by us.
    _RELEASE_SCRIPT = """
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end
    """

    def __init__(self, redis_client: redis.Redis, key_prefix: str = "locks"):
        self.redis = redis_client
        self.key_prefix = key_prefix
        self.lock_id = str(uuid.uuid4())
        # Script handle is created on first release and then reused.
        self._release_script = None

    def _make_lock_key(self, resource: str) -> str:
        """Build lock key."""
        return f"{self.key_prefix}:{resource}"

    async def acquire(
        self,
        resource: str,
        timeout_seconds: int = 3600,
    ) -> bool:
        """
        Try to acquire a lock on a resource.

        Args:
            resource: Unique identifier (e.g., "notion:workspace-123")
            timeout_seconds: How long lock lasts before auto-releasing

        Returns:
            True if lock acquired, False if resource already locked
        """
        lock_key = self._make_lock_key(resource)

        # SET NX: only set if not exists
        # SET with EX: auto-expire after timeout, so a crashed holder cannot
        # block the resource forever (and a slow holder can lose the lock).
        result = self.redis.set(
            lock_key,
            self.lock_id,
            nx=True,
            ex=timeout_seconds,
        )

        # Any falsy reply (None or False) means the key already existed.
        return bool(result)

    async def release(self, resource: str) -> bool:
        """
        Release a lock (only if we own it).

        Uses Lua script for atomic check-and-delete.

        Returns:
            True if the key was deleted, False if it was missing or is held
            under a different lock id.
        """
        lock_key = self._make_lock_key(resource)

        # Register the script once per instance instead of on every call.
        if self._release_script is None:
            self._release_script = self.redis.register_script(self._RELEASE_SCRIPT)

        result = self._release_script(keys=[lock_key], args=[self.lock_id])

        return bool(result)

    async def is_locked(self, resource: str) -> bool:
        """Check if resource is currently locked (by anyone, not only by us)."""
        lock_key = self._make_lock_key(resource)
        return self.redis.exists(lock_key) > 0

    async def wait_and_acquire(
        self,
        resource: str,
        timeout_seconds: int = 3600,
        max_wait_seconds: int = 300,
    ) -> bool:
        """
        Poll and wait until lock is available, then acquire it.

        Useful for ensuring sequential processing of the same resource.

        Args:
            resource: Unique identifier of the resource to lock.
            timeout_seconds: Expiry passed to ``acquire`` once the lock is free.
            max_wait_seconds: Upper bound on the total time spent polling.
                With a value of 0 or less exactly one attempt is made.

        Returns:
            True if the lock was acquired within the wait budget, else False.
        """
        # The loop clock is monotonic, so wall-clock adjustments cannot
        # stretch or shorten the wait.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + max_wait_seconds

        while True:
            # Attempt first so that a zero wait budget still tries once.
            if await self.acquire(resource, timeout_seconds):
                return True

            remaining = deadline - loop.time()
            if remaining <= 0:
                return False

            # Back off before retrying, but never sleep past the deadline.
            await asyncio.sleep(min(self._POLL_INTERVAL_SECONDS, remaining))


class LockPool:
    """
    Manage multiple locks efficiently.

    Keeps one ``DistributedLock`` per resource name, all sharing the Redis
    client given to the pool, so the locks can be acquired in bulk and
    released together.
    """

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.locks = {}  # resource -> DistributedLock instance

    def get_lock(self, resource: str) -> DistributedLock:
        """Get or create a lock for a resource."""
        if resource not in self.locks:
            self.locks[resource] = DistributedLock(self.redis)

        return self.locks[resource]

    async def acquire_many(
        self,
        resources: list[str],
        timeout_seconds: int = 3600,
    ) -> dict[str, bool]:
        """
        Try to acquire locks on multiple resources.

        Args:
            resources: Resource identifiers to lock; duplicates are allowed.
            timeout_seconds: Expiry applied to every lock that is acquired.

        Returns:
            Mapping of resource -> whether this pool acquired its lock.
            Acquisition is not all-or-nothing: earlier successes are kept
            even when a later resource is already locked.
        """
        results: dict[str, bool] = {}

        for resource in resources:
            # A resource listed twice must not be retried once we hold it:
            # the second attempt would fail against our own lock and
            # overwrite the successful result with False.
            if results.get(resource):
                continue

            lock = self.get_lock(resource)
            results[resource] = await lock.acquire(resource, timeout_seconds)

        return results

    async def release_all(self):
        """
        Release all locks held by this pool.

        Every lock is attempted even if an earlier release fails. Locks that
        were released are removed from the pool; locks whose release raised
        stay registered so the call can be retried.

        Raises:
            Exception: the first error raised by a release, re-raised after
                all remaining locks have been attempted.
        """
        first_error = None

        # Iterate over a snapshot: the pool is mutated inside the loop.
        for resource, lock in list(self.locks.items()):
            try:
                await lock.release(resource)
            except Exception as exc:
                # Keep going so one failure does not leave the rest held
                # until their expiry.
                if first_error is None:
                    first_error = exc
            else:
                self.locks.pop(resource, None)

        if first_error is not None:
            raise first_error