Spaces:
Sleeping
Sleeping
| """Distributed locks to prevent concurrent syncing of same source.""" | |
| import asyncio | |
| import uuid | |
| from datetime import datetime, timedelta | |
| from typing import Optional | |
| import redis | |
class DistributedLock:
    """
    Redis-backed distributed lock for preventing concurrent operations on same resource.

    Prevents multiple Celery workers from syncing the same source simultaneously.

    Each instance generates its own random ``lock_id`` token and stores it as the
    lock value, so ``release`` only deletes keys whose value still equals this
    instance's token. The lock is not reentrant: ``acquire`` on a resource this
    instance already holds returns False, because ``SET NX`` refuses to overwrite.

    NOTE: the Redis calls below are synchronous (``redis.Redis``) and are not
    awaited, so each ``async`` method blocks the event loop for one round-trip.
    A client whose methods return awaitables is not supported: the un-awaited
    results would be misread as success.
    """

    # Atomic check-and-delete: delete the key only if it still holds our token,
    # so we never free a lock that expired and was re-acquired by someone else.
    _RELEASE_LUA = """
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end
    """

    # Cached Script wrapper for _RELEASE_LUA; built lazily on first release().
    _release_script = None

    def __init__(self, redis_client: redis.Redis, key_prefix: str = "locks"):
        self.redis = redis_client
        self.key_prefix = key_prefix
        # Per-instance ownership token written as the lock value.
        self.lock_id = str(uuid.uuid4())

    def _make_lock_key(self, resource: str) -> str:
        """Build the Redis key ``"<key_prefix>:<resource>"``."""
        return f"{self.key_prefix}:{resource}"

    async def acquire(
        self,
        resource: str,
        timeout_seconds: int = 3600,
    ) -> bool:
        """
        Try to acquire a lock on a resource without waiting.

        Args:
            resource: Unique identifier (e.g., "notion:workspace-123")
            timeout_seconds: How long the lock lasts before Redis expires it.
                The TTL is never extended, so work that outlasts it can
                overlap with a new holder.

        Returns:
            True if lock acquired, False if resource already locked
            (including when this same instance already holds it).
        """
        lock_key = self._make_lock_key(resource)
        # SET NX: only set if the key does not exist.
        # EX: auto-expire so a crashed holder cannot block the resource forever.
        result = self.redis.set(
            lock_key,
            self.lock_id,
            nx=True,
            ex=timeout_seconds,
        )
        # SET ... NX yields nil (None) when the key already exists.
        return result is not None

    async def release(self, resource: str) -> bool:
        """
        Release a lock, but only if this instance still owns it.

        Uses a Lua script for an atomic check-and-delete.

        Returns:
            True if the key was deleted; False if it was absent or holds a
            different token (e.g. ours expired and someone else re-acquired).
        """
        lock_key = self._make_lock_key(resource)
        if self._release_script is None:
            # Build the Script wrapper once instead of on every release.
            self._release_script = self.redis.register_script(self._RELEASE_LUA)
        result = self._release_script(keys=[lock_key], args=[self.lock_id])
        return result > 0

    async def is_locked(self, resource: str) -> bool:
        """Check whether the resource is currently locked by anyone."""
        lock_key = self._make_lock_key(resource)
        return self.redis.exists(lock_key) > 0

    async def wait_and_acquire(
        self,
        resource: str,
        timeout_seconds: int = 3600,
        max_wait_seconds: int = 300,
        poll_interval_seconds: float = 0.1,
    ) -> bool:
        """
        Poll until the lock is available, then acquire it.

        Useful for ensuring sequential processing of the same resource.
        At least one attempt is always made, so ``max_wait_seconds=0`` means
        "try once". A final attempt is also made at the deadline.

        Args:
            resource: Unique identifier of the resource to lock.
            timeout_seconds: TTL applied to the lock once acquired.
            max_wait_seconds: Maximum time to keep polling.
            poll_interval_seconds: Delay between attempts (default 100ms).

        Returns:
            True if the lock was acquired before the deadline, else False.
        """
        # The event loop's clock is monotonic, so wall-clock jumps cannot
        # shorten or stretch the wait.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + max_wait_seconds
        while True:
            if await self.acquire(resource, timeout_seconds):
                return True
            remaining = deadline - loop.time()
            if remaining <= 0:
                return False
            # Never sleep past the deadline, so the last attempt happens on time.
            await asyncio.sleep(min(poll_interval_seconds, remaining))
class LockPool:
    """
    Manage multiple locks efficiently.

    Keeps one DistributedLock (and therefore one ownership token) per
    resource. Each lock is created on first use and reused afterwards.
    """

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        # resource -> DistributedLock instance (created lazily by get_lock)
        self.locks: dict[str, DistributedLock] = {}

    def get_lock(self, resource: str) -> DistributedLock:
        """Get or create the lock object for a resource."""
        if resource not in self.locks:
            self.locks[resource] = DistributedLock(self.redis)
        return self.locks[resource]

    async def acquire_many(
        self,
        resources: list[str],
        timeout_seconds: int = 3600,
    ) -> dict[str, bool]:
        """
        Try to acquire locks on multiple resources, one at a time.

        There is no rollback: locks acquired before a failed attempt stay held.
        Callers needing all-or-nothing semantics should inspect the result and
        call ``release_all`` on failure.

        Returns:
            Mapping of resource -> whether its lock was acquired.
        """
        results = {}
        for resource in resources:
            lock = self.get_lock(resource)
            results[resource] = await lock.release(resource) if False else await lock.acquire(resource, timeout_seconds)
        return results

    async def release_all(self) -> None:
        """
        Release every lock held by this pool, then forget it.

        Entries whose acquire failed are still safe to release, because
        DistributedLock.release only deletes keys holding its own token.

        Every lock is attempted even if an earlier release raises. Entries
        released successfully are removed from the pool. Entries whose
        release raised are kept so a later call can retry them, and the first
        such exception is re-raised after all locks have been attempted.
        """
        first_error: Optional[Exception] = None
        # Iterate over a snapshot so entries can be removed as we go.
        for resource, lock in list(self.locks.items()):
            try:
                await lock.release(resource)
            except Exception as exc:  # keep releasing the rest; report below
                if first_error is None:
                    first_error = exc
                continue
            self.locks.pop(resource, None)
        if first_error is not None:
            raise first_error