File size: 4,723 Bytes
f64b002
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
"""
Pipeline lock mechanism to prevent concurrent heavy operations.
Uses file-based locking for simplicity and cross-process compatibility.
"""

import logging
import os
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

from filelock import FileLock, Timeout

from app.settings import get_settings

logger = logging.getLogger(__name__)


class PipelineLock:
    """
    File-based lock for pipeline operations.
    Prevents concurrent data ingestion or model training.
    """
    
    def __init__(self, lock_file: Optional[str] = None, timeout: int = 0):
        """
        Initialize the lock.
        
        Args:
            lock_file: Path to lock file. If None, uses settings.
            timeout: Seconds to wait for lock. 0 = non-blocking, -1 = wait forever.
        """
        settings = get_settings()
        self.lock_file = Path(lock_file or settings.pipeline_lock_file)
        self.timeout = timeout
        self._lock: Optional[FileLock] = None
        self._acquired = False
        
        # Ensure lock directory exists
        self.lock_file.parent.mkdir(parents=True, exist_ok=True)
    
    def acquire(self) -> bool:
        """
        Try to acquire the lock.
        
        Returns:
            True if lock acquired, False if already locked.
        """
        if self._acquired:
            return True
        
        self._lock = FileLock(self.lock_file, timeout=self.timeout)
        
        try:
            self._lock.acquire(timeout=self.timeout)
            self._acquired = True
            
            # Write lock info
            self._write_lock_info()
            
            logger.info(f"Pipeline lock acquired: {self.lock_file}")
            return True
            
        except Timeout:
            logger.warning(f"Could not acquire pipeline lock (already locked): {self.lock_file}")
            return False
    
    def release(self):
        """Release the lock."""
        if self._lock and self._acquired:
            self._lock.release()
            self._acquired = False
            
            # Remove lock info file
            info_file = Path(str(self.lock_file) + ".info")
            if info_file.exists():
                try:
                    info_file.unlink()
                except Exception:
                    pass
            
            logger.info(f"Pipeline lock released: {self.lock_file}")
    
    def _write_lock_info(self):
        """Write info about who holds the lock."""
        info_file = Path(str(self.lock_file) + ".info")
        try:
            info = {
                "pid": os.getpid(),
                "acquired_at": datetime.now(timezone.utc).isoformat(),
            }
            info_file.write_text(str(info))
        except Exception as e:
            logger.debug(f"Could not write lock info: {e}")
    
    def __enter__(self):
        """Context manager entry."""
        if not self.acquire():
            raise RuntimeError("Could not acquire pipeline lock")
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit."""
        self.release()
        return False


def is_pipeline_locked() -> bool:
    """
    Check if the pipeline is currently locked.
    Non-blocking check.
    """
    settings = get_settings()
    lock_file = Path(settings.pipeline_lock_file)
    
    if not lock_file.exists():
        return False
    
    # Try to acquire briefly
    lock = FileLock(lock_file, timeout=0)
    try:
        lock.acquire(timeout=0)
        lock.release()
        return False
    except Timeout:
        return True


def get_lock_info() -> Optional[dict]:
    """
    Get information about the current lock holder.
    """
    settings = get_settings()
    info_file = Path(str(settings.pipeline_lock_file) + ".info")
    
    if not info_file.exists():
        return None
    
    try:
        content = info_file.read_text()
        # Parse the dict string (simple approach)
        import ast
        return ast.literal_eval(content)
    except Exception:
        return None


@contextmanager
def pipeline_lock(timeout: int = 0):
    """
    Context manager for pipeline locking.
    
    Usage:
        with pipeline_lock():
            # Do heavy work
            pass
    
    Args:
        timeout: Seconds to wait for lock. 0 = non-blocking, -1 = wait forever.
    
    Raises:
        RuntimeError: If lock cannot be acquired.
    """
    lock = PipelineLock(timeout=timeout)
    try:
        if not lock.acquire():
            raise RuntimeError("Pipeline is locked by another process")
        yield lock
    finally:
        lock.release()