|
|
import re |
|
|
import threading |
|
|
from contextlib import contextmanager |
|
|
from pathlib import Path |
|
|
|
|
|
from filelock import FileLock |
|
|
from platformdirs import user_cache_dir |
|
|
|
|
|
|
|
|
class KeyedMemoryLockManager: |
|
|
"""A manager for acquiring and releasing memory locks based on a key.""" |
|
|
|
|
|
def __init__(self) -> None: |
|
|
self.locks: dict[str, threading.Lock] = {} |
|
|
self.global_lock = threading.Lock() |
|
|
|
|
|
def _get_lock(self, key: str): |
|
|
with self.global_lock: |
|
|
if key not in self.locks: |
|
|
self.locks[key] = threading.Lock() |
|
|
return self.locks[key] |
|
|
|
|
|
@contextmanager |
|
|
def lock(self, key: str): |
|
|
lock = self._get_lock(key) |
|
|
lock.acquire() |
|
|
try: |
|
|
yield |
|
|
finally: |
|
|
lock.release() |
|
|
|
|
|
|
|
|
class KeyedWorkerLockManager: |
|
|
"""A manager for acquiring locks between workers based on a key.""" |
|
|
|
|
|
def __init__(self) -> None: |
|
|
self.locks_dir = Path(user_cache_dir("langflow"), ensure_exists=True) / "worker_locks" |
|
|
|
|
|
def _validate_key(self, key: str) -> bool: |
|
|
"""Validate that the string only contains alphanumeric characters and underscores. |
|
|
|
|
|
Parameters: |
|
|
s (str): The string to validate. |
|
|
|
|
|
Returns: |
|
|
bool: True if the string is valid, False otherwise. |
|
|
""" |
|
|
pattern = re.compile(r"^\w+$") |
|
|
return bool(pattern.match(key)) |
|
|
|
|
|
@contextmanager |
|
|
def lock(self, key: str): |
|
|
if not self._validate_key(key): |
|
|
msg = f"Invalid key: {key}" |
|
|
raise ValueError(msg) |
|
|
|
|
|
lock = FileLock(self.locks_dir / key) |
|
|
with lock: |
|
|
yield |
|
|
|