copper-mind / app /lock.py
ifieryarrows's picture
Sync from GitHub
f64b002 verified
"""
Pipeline lock mechanism to prevent concurrent heavy operations.
Uses file-based locking for simplicity and cross-process compatibility.
"""
import logging
import os
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from filelock import FileLock, Timeout
from app.settings import get_settings
logger = logging.getLogger(__name__)
class PipelineLock:
"""
File-based lock for pipeline operations.
Prevents concurrent data ingestion or model training.
"""
def __init__(self, lock_file: Optional[str] = None, timeout: int = 0):
"""
Initialize the lock.
Args:
lock_file: Path to lock file. If None, uses settings.
timeout: Seconds to wait for lock. 0 = non-blocking, -1 = wait forever.
"""
settings = get_settings()
self.lock_file = Path(lock_file or settings.pipeline_lock_file)
self.timeout = timeout
self._lock: Optional[FileLock] = None
self._acquired = False
# Ensure lock directory exists
self.lock_file.parent.mkdir(parents=True, exist_ok=True)
def acquire(self) -> bool:
"""
Try to acquire the lock.
Returns:
True if lock acquired, False if already locked.
"""
if self._acquired:
return True
self._lock = FileLock(self.lock_file, timeout=self.timeout)
try:
self._lock.acquire(timeout=self.timeout)
self._acquired = True
# Write lock info
self._write_lock_info()
logger.info(f"Pipeline lock acquired: {self.lock_file}")
return True
except Timeout:
logger.warning(f"Could not acquire pipeline lock (already locked): {self.lock_file}")
return False
def release(self):
"""Release the lock."""
if self._lock and self._acquired:
self._lock.release()
self._acquired = False
# Remove lock info file
info_file = Path(str(self.lock_file) + ".info")
if info_file.exists():
try:
info_file.unlink()
except Exception:
pass
logger.info(f"Pipeline lock released: {self.lock_file}")
def _write_lock_info(self):
"""Write info about who holds the lock."""
info_file = Path(str(self.lock_file) + ".info")
try:
info = {
"pid": os.getpid(),
"acquired_at": datetime.now(timezone.utc).isoformat(),
}
info_file.write_text(str(info))
except Exception as e:
logger.debug(f"Could not write lock info: {e}")
def __enter__(self):
"""Context manager entry."""
if not self.acquire():
raise RuntimeError("Could not acquire pipeline lock")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
self.release()
return False
def is_pipeline_locked() -> bool:
"""
Check if the pipeline is currently locked.
Non-blocking check.
"""
settings = get_settings()
lock_file = Path(settings.pipeline_lock_file)
if not lock_file.exists():
return False
# Try to acquire briefly
lock = FileLock(lock_file, timeout=0)
try:
lock.acquire(timeout=0)
lock.release()
return False
except Timeout:
return True
def get_lock_info() -> Optional[dict]:
"""
Get information about the current lock holder.
"""
settings = get_settings()
info_file = Path(str(settings.pipeline_lock_file) + ".info")
if not info_file.exists():
return None
try:
content = info_file.read_text()
# Parse the dict string (simple approach)
import ast
return ast.literal_eval(content)
except Exception:
return None
@contextmanager
def pipeline_lock(timeout: int = 0):
"""
Context manager for pipeline locking.
Usage:
with pipeline_lock():
# Do heavy work
pass
Args:
timeout: Seconds to wait for lock. 0 = non-blocking, -1 = wait forever.
Raises:
RuntimeError: If lock cannot be acquired.
"""
lock = PipelineLock(timeout=timeout)
try:
if not lock.acquire():
raise RuntimeError("Pipeline is locked by another process")
yield lock
finally:
lock.release()