# GodSpeed / src/redis/locks.py
# Author: Samyuktha24
# Commit: feat: Implement polling adapters for logs, metrics, error traces, and business data
# Revision: 0493349
"""Distributed locks to prevent concurrent syncing of same source."""
import asyncio
import uuid
from datetime import datetime, timedelta
from typing import Optional
import redis
class DistributedLock:
    """
    Redis-backed distributed lock for preventing concurrent operations on the same resource.

    Prevents multiple Celery workers from syncing the same source simultaneously.

    Every instance generates a random ``lock_id`` token that is stored as the
    value of each lock key it sets. ``release`` only deletes a key that still
    holds this token, so one instance cannot free a lock owned by another.

    The lock is not re-entrant: ``acquire`` relies on ``SET NX``, so acquiring
    a resource this instance already holds returns False.

    The Redis client is driven through its blocking API; each call is handed
    to a worker thread via ``asyncio.to_thread`` so that the event loop is not
    stalled for the duration of the network round-trip.
    """

    # Compare-and-delete: remove the key only if it still holds our token.
    _RELEASE_SCRIPT = """
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end
    """

    def __init__(self, redis_client: "redis.Redis", key_prefix: str = "locks"):
        """
        Args:
            redis_client: Synchronous Redis client used for all lock commands.
            key_prefix: Namespace prepended to every lock key.
        """
        self.redis = redis_client
        self.key_prefix = key_prefix
        self.lock_id = str(uuid.uuid4())
        # Registered on first release and reused afterwards.
        self._release_script = None

    def _make_lock_key(self, resource: str) -> str:
        """Build the Redis key for ``resource`` as ``<key_prefix>:<resource>``."""
        return f"{self.key_prefix}:{resource}"

    async def acquire(
        self,
        resource: str,
        timeout_seconds: int = 3600,
    ) -> bool:
        """
        Try to acquire a lock on a resource (single attempt, no waiting).

        Args:
            resource: Unique identifier (e.g., "notion:workspace-123")
            timeout_seconds: How long the lock lasts before auto-releasing

        Returns:
            True if the lock was acquired, False if the resource is already locked
        """
        lock_key = self._make_lock_key(resource)
        # SET NX: only set if the key does not exist.
        # SET EX: let Redis expire the key after the timeout.
        result = await asyncio.to_thread(
            self.redis.set,
            lock_key,
            self.lock_id,
            nx=True,
            ex=timeout_seconds,
        )
        # Any falsy reply means the key was not set.
        return bool(result)

    async def release(self, resource: str) -> bool:
        """
        Release a lock (only if we own it).

        Uses a Lua script for an atomic check-and-delete, which prevents
        freeing a lock that is currently held under a different token.

        Returns:
            True if the key was deleted, False if it was absent or held by
            another token.
        """
        lock_key = self._make_lock_key(resource)
        if self._release_script is None:
            # Register once instead of on every release call.
            self._release_script = self.redis.register_script(self._RELEASE_SCRIPT)
        result = await asyncio.to_thread(
            self._release_script,
            keys=[lock_key],
            args=[self.lock_id],
        )
        return bool(result)

    async def is_locked(self, resource: str) -> bool:
        """Check if the resource's lock key currently exists, whoever holds it."""
        lock_key = self._make_lock_key(resource)
        found = await asyncio.to_thread(self.redis.exists, lock_key)
        return found > 0

    async def wait_and_acquire(
        self,
        resource: str,
        timeout_seconds: int = 3600,
        max_wait_seconds: int = 300,
        poll_interval_seconds: float = 0.1,
    ) -> bool:
        """
        Poll and wait until the lock is available, then acquire it.

        Useful for ensuring sequential processing of the same resource.

        Args:
            resource: Unique identifier of the resource to lock.
            timeout_seconds: How long the lock lasts once acquired.
            max_wait_seconds: Upper bound on the total time spent waiting.
                At least one attempt is always made, even when this is 0.
            poll_interval_seconds: Pause between attempts (default 100ms).

        Returns:
            True if the lock was acquired before the deadline, otherwise False.
        """
        # The loop clock is monotonic, so system clock changes cannot
        # shorten or stretch the wait.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + max_wait_seconds
        while True:
            if await self.acquire(resource, timeout_seconds):
                return True
            remaining = deadline - loop.time()
            if remaining <= 0:
                return False
            # Back off before retrying, but never sleep past the deadline.
            await asyncio.sleep(min(poll_interval_seconds, remaining))
class LockPool:
    """Manage multiple locks efficiently.

    Keeps one ``DistributedLock`` per resource name, all sharing the same
    Redis client, so the locks can be acquired and released as a group.
    """

    def __init__(self, redis_client: "redis.Redis"):
        """
        Args:
            redis_client: Redis client handed to every lock this pool creates.
        """
        self.redis = redis_client
        self.locks = {}  # resource -> DistributedLock instance

    def get_lock(self, resource: str) -> "DistributedLock":
        """Get or create the lock registered for a resource."""
        if resource not in self.locks:
            self.locks[resource] = DistributedLock(self.redis)
        return self.locks[resource]

    async def acquire_many(
        self,
        resources: list[str],
        timeout_seconds: int = 3600,
    ) -> dict[str, bool]:
        """Try to acquire locks on multiple resources.

        Each distinct resource is attempted exactly once, in the order given.
        Resources that could not be locked do not undo the ones that were;
        the caller decides what to do with a partial result.

        Args:
            resources: Resource names to lock. Repeated names are ignored
                after their first occurrence.
            timeout_seconds: Expiry passed to every lock acquisition.

        Returns:
            Mapping of resource name to whether its lock was acquired.
        """
        results = {}
        for resource in resources:
            if resource in results:
                # A second attempt through the same lock instance would fail
                # (the key already exists) and overwrite a True result with
                # False even though the lock is still held.
                continue
            lock = self.get_lock(resource)
            results[resource] = await lock.acquire(resource, timeout_seconds)
        return results

    async def release_all(self) -> None:
        """Release all locks held by this pool.

        Every registered lock gets a release attempt even if an earlier one
        fails. Locks whose release completed are removed from the pool; a
        lock whose release raised stays registered so the call can be
        retried.

        Raises:
            Exception: The first error raised by a release call, re-raised
                after all remaining locks have been attempted.
        """
        first_error = None
        # Iterate over a snapshot: the registry may change while awaiting.
        for resource, lock in list(self.locks.items()):
            try:
                await lock.release(resource)
            except Exception as exc:  # re-raised below, after the full pass
                if first_error is None:
                    first_error = exc
                continue
            # Only drop the entry if it is still the lock we just released.
            if self.locks.get(resource) is lock:
                del self.locks[resource]
        if first_error is not None:
            raise first_error