File size: 15,313 Bytes
ca4b037
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
from __future__ import annotations

import atexit
import logging
import os
import pathlib
import sqlite3
import threading
import time
from contextlib import contextmanager, suppress
from typing import TYPE_CHECKING, Literal
from weakref import WeakValueDictionary

from ._api import AcquireReturnProxy
from ._error import Timeout

if TYPE_CHECKING:
    from collections.abc import Generator

_LOGGER = logging.getLogger("filelock")

_all_connections: set[sqlite3.Connection] = set()
_all_connections_lock = threading.Lock()


def _cleanup_connections() -> None:
    with _all_connections_lock:
        for con in list(_all_connections):
            with suppress(Exception):
                con.close()
        _all_connections.clear()


atexit.register(_cleanup_connections)

# sqlite3_busy_timeout() accepts a C int, max 2_147_483_647 on 32-bit. Use a lower value to be safe (~23 days).
_MAX_SQLITE_TIMEOUT_MS = 2_000_000_000 - 1


def timeout_for_sqlite(timeout: float, *, blocking: bool, already_waited: float) -> int:
    if blocking is False:
        return 0

    if timeout == -1:
        return _MAX_SQLITE_TIMEOUT_MS

    if timeout < 0:
        msg = "timeout must be a non-negative number or -1"
        raise ValueError(msg)

    remaining = max(timeout - already_waited, 0) if timeout > 0 else timeout
    timeout_ms = int(remaining * 1000)
    if timeout_ms > _MAX_SQLITE_TIMEOUT_MS or timeout_ms < 0:
        _LOGGER.warning("timeout %s is too large for SQLite, using %s ms instead", timeout, _MAX_SQLITE_TIMEOUT_MS)
        return _MAX_SQLITE_TIMEOUT_MS
    return timeout_ms


class _ReadWriteLockMeta(type):
    """
    Metaclass that handles singleton resolution when is_singleton=True.

    Singleton logic lives here rather than in ReadWriteLock.get_lock so that ``ReadWriteLock(path)`` transparently
    returns cached instances without a 2-arg ``super()`` call that type checkers cannot verify.

    """

    _instances: WeakValueDictionary[pathlib.Path, ReadWriteLock]
    _instances_lock: threading.Lock

    def __call__(
        cls,
        lock_file: str | os.PathLike[str],
        timeout: float = -1,
        *,
        blocking: bool = True,
        is_singleton: bool = True,
    ) -> ReadWriteLock:
        if not is_singleton:
            return super().__call__(lock_file, timeout, blocking=blocking, is_singleton=is_singleton)

        normalized = pathlib.Path(lock_file).resolve()
        with cls._instances_lock:
            if normalized not in cls._instances:
                instance = super().__call__(lock_file, timeout, blocking=blocking, is_singleton=is_singleton)
                cls._instances[normalized] = instance
            else:
                instance = cls._instances[normalized]

            if instance.timeout != timeout or instance.blocking != blocking:
                msg = (
                    f"Singleton lock created with timeout={instance.timeout}, blocking={instance.blocking},"
                    f" cannot be changed to timeout={timeout}, blocking={blocking}"
                )
                raise ValueError(msg)
            return instance


class ReadWriteLock(metaclass=_ReadWriteLockMeta):
    """
    Cross-process read-write lock backed by SQLite.

    Allows concurrent shared readers or a single exclusive writer. The lock is reentrant within the same mode (multiple
    ``acquire_read`` calls nest, as do multiple ``acquire_write`` calls from the same thread), but upgrading from read
    to write or downgrading from write to read raises :class:`RuntimeError`. Write locks are pinned to the thread that
    acquired them.

    By default, ``is_singleton=True``: calling ``ReadWriteLock(path)`` with the same resolved path returns the same
    instance. The lock file must use a ``.db`` extension (SQLite database).

    :param lock_file: path to the SQLite database file used as the lock
    :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
    :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
    :param is_singleton: if ``True``, reuse existing instances for the same resolved path

    .. versionadded:: 3.21.0

    """

    _instances: WeakValueDictionary[pathlib.Path, ReadWriteLock] = WeakValueDictionary()
    _instances_lock = threading.Lock()

    @classmethod
    def get_lock(
        cls, lock_file: str | os.PathLike[str], timeout: float = -1, *, blocking: bool = True
    ) -> ReadWriteLock:
        """
        Return the singleton :class:`ReadWriteLock` for *lock_file*.

        :param lock_file: path to the SQLite database file used as the lock
        :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable

        :returns: the singleton lock instance

        :raises ValueError: if an instance already exists for this path with different *timeout* or *blocking* values

        """
        return cls(lock_file, timeout, blocking=blocking)

    def __init__(
        self,
        lock_file: str | os.PathLike[str],
        timeout: float = -1,
        *,
        blocking: bool = True,
        is_singleton: bool = True,  # noqa: ARG002  # consumed by _ReadWriteLockMeta.__call__
    ) -> None:
        self.lock_file = os.fspath(lock_file)
        self.timeout = timeout
        self.blocking = blocking
        self._transaction_lock = threading.Lock()  # serializes the (possibly blocking) SQLite transaction work
        self._internal_lock = threading.Lock()  # protects _lock_level / _current_mode updates and rollback
        self._lock_level = 0
        self._current_mode: Literal["read", "write"] | None = None
        self._write_thread_id: int | None = None
        self._con = sqlite3.connect(self.lock_file, check_same_thread=False)
        with _all_connections_lock:
            _all_connections.add(self._con)

    def _acquire_transaction_lock(self, *, blocking: bool, timeout: float) -> None:
        if timeout == -1:
            # blocking=True with no timeout means wait indefinitely per threading.Lock.acquire semantics
            acquired = self._transaction_lock.acquire(blocking)
        else:
            acquired = self._transaction_lock.acquire(blocking, timeout)
        if not acquired:
            raise Timeout(self.lock_file) from None

    def _validate_reentrant(self, mode: Literal["read", "write"], opposite: str, direction: str) -> AcquireReturnProxy:
        if self._current_mode != mode:
            msg = (
                f"Cannot acquire {mode} lock on {self.lock_file} (lock id: {id(self)}): "
                f"already holding a {opposite} lock ({direction} not allowed)"
            )
            raise RuntimeError(msg)
        if mode == "write" and (cur := threading.get_ident()) != self._write_thread_id:
            msg = (
                f"Cannot acquire write lock on {self.lock_file} (lock id: {id(self)}) "
                f"from thread {cur} while it is held by thread {self._write_thread_id}"
            )
            raise RuntimeError(msg)
        self._lock_level += 1
        return AcquireReturnProxy(lock=self)

    def _configure_and_begin(
        self, mode: Literal["read", "write"], timeout: float, *, blocking: bool, start_time: float
    ) -> None:
        waited = time.perf_counter() - start_time
        timeout_ms = timeout_for_sqlite(timeout, blocking=blocking, already_waited=waited)
        self._con.execute(f"PRAGMA busy_timeout={timeout_ms};").close()
        # Use legacy journal mode (not WAL) because WAL does not block readers when a concurrent EXCLUSIVE
        # write transaction is active, making read-write locking impossible without modifying table data.
        # MEMORY is safe here since no actual writes happen — crashes cannot corrupt the DB.
        # See https://sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions
        #
        # Set here (not in __init__) because this pragma itself may block on a locked database,
        # so it must run after busy_timeout is configured above.
        self._con.execute("PRAGMA journal_mode=MEMORY;").close()
        # Recompute remaining timeout after the potentially blocking journal_mode pragma.
        waited = time.perf_counter() - start_time
        if (recomputed := timeout_for_sqlite(timeout, blocking=blocking, already_waited=waited)) != timeout_ms:
            self._con.execute(f"PRAGMA busy_timeout={recomputed};").close()
        stmt = "BEGIN EXCLUSIVE TRANSACTION;" if mode == "write" else "BEGIN TRANSACTION;"
        self._con.execute(stmt).close()
        if mode == "read":
            # A SELECT is needed to force SQLite to actually acquire the SHARED lock on the database.
            # https://www.sqlite.org/lockingv3.html#transaction_control
            self._con.execute("SELECT name FROM sqlite_schema LIMIT 1;").close()

    def _acquire(self, mode: Literal["read", "write"], timeout: float, *, blocking: bool) -> AcquireReturnProxy:
        opposite = "write" if mode == "read" else "read"
        direction = "downgrade" if mode == "read" else "upgrade"

        with self._internal_lock:
            if self._lock_level > 0:
                return self._validate_reentrant(mode, opposite, direction)

        start_time = time.perf_counter()
        self._acquire_transaction_lock(blocking=blocking, timeout=timeout)
        try:
            # Double-check: another thread may have acquired the lock while we waited on _transaction_lock.
            with self._internal_lock:
                if self._lock_level > 0:
                    return self._validate_reentrant(mode, opposite, direction)

            self._configure_and_begin(mode, timeout, blocking=blocking, start_time=start_time)

            with self._internal_lock:
                self._current_mode = mode
                self._lock_level = 1
                if mode == "write":
                    self._write_thread_id = threading.get_ident()

            return AcquireReturnProxy(lock=self)

        except sqlite3.OperationalError as exc:
            if "database is locked" not in str(exc):
                raise
            raise Timeout(self.lock_file) from None
        finally:
            self._transaction_lock.release()

    def acquire_read(self, timeout: float = -1, *, blocking: bool = True) -> AcquireReturnProxy:
        """
        Acquire a shared read lock.

        If this instance already holds a read lock, the lock level is incremented (reentrant). Attempting to acquire a
        read lock while holding a write lock raises :class:`RuntimeError` (downgrade not allowed).

        :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable

        :returns: a proxy that can be used as a context manager to release the lock

        :raises RuntimeError: if a write lock is already held on this instance
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        return self._acquire("read", timeout, blocking=blocking)

    def acquire_write(self, timeout: float = -1, *, blocking: bool = True) -> AcquireReturnProxy:
        """
        Acquire an exclusive write lock.

        If this instance already holds a write lock from the same thread, the lock level is incremented (reentrant).
        Attempting to acquire a write lock while holding a read lock raises :class:`RuntimeError` (upgrade not allowed).
        Write locks are pinned to the acquiring thread: a different thread trying to re-enter also raises
        :class:`RuntimeError`.

        :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable

        :returns: a proxy that can be used as a context manager to release the lock

        :raises RuntimeError: if a read lock is already held, or a write lock is held by a different thread
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        return self._acquire("write", timeout, blocking=blocking)

    def release(self, *, force: bool = False) -> None:
        """
        Release one level of the current lock.

        When the lock level reaches zero the underlying SQLite transaction is rolled back, releasing the database lock.

        :param force: if ``True``, release the lock completely regardless of the current lock level

        :raises RuntimeError: if no lock is currently held and *force* is ``False``

        """
        should_rollback = False
        with self._internal_lock:
            if self._lock_level == 0:
                if force:
                    return
                msg = f"Cannot release a lock on {self.lock_file} (lock id: {id(self)}) that is not held"
                raise RuntimeError(msg)
            if force:
                self._lock_level = 0
            else:
                self._lock_level -= 1
            if self._lock_level == 0:
                self._current_mode = None
                self._write_thread_id = None
                should_rollback = True
        if should_rollback:
            self._con.rollback()

    @contextmanager
    def read_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> Generator[None]:
        """
        Context manager that acquires and releases a shared read lock.

        Falls back to instance defaults for *timeout* and *blocking* when ``None``.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default

        """
        if timeout is None:
            timeout = self.timeout
        if blocking is None:
            blocking = self.blocking
        self.acquire_read(timeout, blocking=blocking)
        try:
            yield
        finally:
            self.release()

    @contextmanager
    def write_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> Generator[None]:
        """
        Context manager that acquires and releases an exclusive write lock.

        Falls back to instance defaults for *timeout* and *blocking* when ``None``.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default

        """
        if timeout is None:
            timeout = self.timeout
        if blocking is None:
            blocking = self.blocking
        self.acquire_write(timeout, blocking=blocking)
        try:
            yield
        finally:
            self.release()

    def close(self) -> None:
        """
        Release the lock (if held) and close the underlying SQLite connection.

        After calling this method, the lock instance is no longer usable.

        """
        self.release(force=True)
        self._con.close()
        with _all_connections_lock:
            _all_connections.discard(self._con)