File size: 15,313 Bytes
ca4b037 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 | from __future__ import annotations
import atexit
import logging
import os
import pathlib
import sqlite3
import threading
import time
from contextlib import contextmanager, suppress
from typing import TYPE_CHECKING, Literal
from weakref import WeakValueDictionary
from ._api import AcquireReturnProxy
from ._error import Timeout
if TYPE_CHECKING:
from collections.abc import Generator
_LOGGER = logging.getLogger("filelock")
_all_connections: set[sqlite3.Connection] = set()
_all_connections_lock = threading.Lock()
def _cleanup_connections() -> None:
with _all_connections_lock:
for con in list(_all_connections):
with suppress(Exception):
con.close()
_all_connections.clear()
atexit.register(_cleanup_connections)
# sqlite3_busy_timeout() accepts a C int, max 2_147_483_647 on 32-bit. Use a lower value to be safe (~23 days).
_MAX_SQLITE_TIMEOUT_MS = 2_000_000_000 - 1
def timeout_for_sqlite(timeout: float, *, blocking: bool, already_waited: float) -> int:
if blocking is False:
return 0
if timeout == -1:
return _MAX_SQLITE_TIMEOUT_MS
if timeout < 0:
msg = "timeout must be a non-negative number or -1"
raise ValueError(msg)
remaining = max(timeout - already_waited, 0) if timeout > 0 else timeout
timeout_ms = int(remaining * 1000)
if timeout_ms > _MAX_SQLITE_TIMEOUT_MS or timeout_ms < 0:
_LOGGER.warning("timeout %s is too large for SQLite, using %s ms instead", timeout, _MAX_SQLITE_TIMEOUT_MS)
return _MAX_SQLITE_TIMEOUT_MS
return timeout_ms
class _ReadWriteLockMeta(type):
"""
Metaclass that handles singleton resolution when is_singleton=True.
Singleton logic lives here rather than in ReadWriteLock.get_lock so that ``ReadWriteLock(path)`` transparently
returns cached instances without a 2-arg ``super()`` call that type checkers cannot verify.
"""
_instances: WeakValueDictionary[pathlib.Path, ReadWriteLock]
_instances_lock: threading.Lock
def __call__(
cls,
lock_file: str | os.PathLike[str],
timeout: float = -1,
*,
blocking: bool = True,
is_singleton: bool = True,
) -> ReadWriteLock:
if not is_singleton:
return super().__call__(lock_file, timeout, blocking=blocking, is_singleton=is_singleton)
normalized = pathlib.Path(lock_file).resolve()
with cls._instances_lock:
if normalized not in cls._instances:
instance = super().__call__(lock_file, timeout, blocking=blocking, is_singleton=is_singleton)
cls._instances[normalized] = instance
else:
instance = cls._instances[normalized]
if instance.timeout != timeout or instance.blocking != blocking:
msg = (
f"Singleton lock created with timeout={instance.timeout}, blocking={instance.blocking},"
f" cannot be changed to timeout={timeout}, blocking={blocking}"
)
raise ValueError(msg)
return instance
class ReadWriteLock(metaclass=_ReadWriteLockMeta):
"""
Cross-process read-write lock backed by SQLite.
Allows concurrent shared readers or a single exclusive writer. The lock is reentrant within the same mode (multiple
``acquire_read`` calls nest, as do multiple ``acquire_write`` calls from the same thread), but upgrading from read
to write or downgrading from write to read raises :class:`RuntimeError`. Write locks are pinned to the thread that
acquired them.
By default, ``is_singleton=True``: calling ``ReadWriteLock(path)`` with the same resolved path returns the same
instance. The lock file must use a ``.db`` extension (SQLite database).
:param lock_file: path to the SQLite database file used as the lock
:param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
:param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
:param is_singleton: if ``True``, reuse existing instances for the same resolved path
.. versionadded:: 3.21.0
"""
_instances: WeakValueDictionary[pathlib.Path, ReadWriteLock] = WeakValueDictionary()
_instances_lock = threading.Lock()
@classmethod
def get_lock(
cls, lock_file: str | os.PathLike[str], timeout: float = -1, *, blocking: bool = True
) -> ReadWriteLock:
"""
Return the singleton :class:`ReadWriteLock` for *lock_file*.
:param lock_file: path to the SQLite database file used as the lock
:param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
:param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
:returns: the singleton lock instance
:raises ValueError: if an instance already exists for this path with different *timeout* or *blocking* values
"""
return cls(lock_file, timeout, blocking=blocking)
def __init__(
self,
lock_file: str | os.PathLike[str],
timeout: float = -1,
*,
blocking: bool = True,
is_singleton: bool = True, # noqa: ARG002 # consumed by _ReadWriteLockMeta.__call__
) -> None:
self.lock_file = os.fspath(lock_file)
self.timeout = timeout
self.blocking = blocking
self._transaction_lock = threading.Lock() # serializes the (possibly blocking) SQLite transaction work
self._internal_lock = threading.Lock() # protects _lock_level / _current_mode updates and rollback
self._lock_level = 0
self._current_mode: Literal["read", "write"] | None = None
self._write_thread_id: int | None = None
self._con = sqlite3.connect(self.lock_file, check_same_thread=False)
with _all_connections_lock:
_all_connections.add(self._con)
def _acquire_transaction_lock(self, *, blocking: bool, timeout: float) -> None:
if timeout == -1:
# blocking=True with no timeout means wait indefinitely per threading.Lock.acquire semantics
acquired = self._transaction_lock.acquire(blocking)
else:
acquired = self._transaction_lock.acquire(blocking, timeout)
if not acquired:
raise Timeout(self.lock_file) from None
def _validate_reentrant(self, mode: Literal["read", "write"], opposite: str, direction: str) -> AcquireReturnProxy:
if self._current_mode != mode:
msg = (
f"Cannot acquire {mode} lock on {self.lock_file} (lock id: {id(self)}): "
f"already holding a {opposite} lock ({direction} not allowed)"
)
raise RuntimeError(msg)
if mode == "write" and (cur := threading.get_ident()) != self._write_thread_id:
msg = (
f"Cannot acquire write lock on {self.lock_file} (lock id: {id(self)}) "
f"from thread {cur} while it is held by thread {self._write_thread_id}"
)
raise RuntimeError(msg)
self._lock_level += 1
return AcquireReturnProxy(lock=self)
def _configure_and_begin(
self, mode: Literal["read", "write"], timeout: float, *, blocking: bool, start_time: float
) -> None:
waited = time.perf_counter() - start_time
timeout_ms = timeout_for_sqlite(timeout, blocking=blocking, already_waited=waited)
self._con.execute(f"PRAGMA busy_timeout={timeout_ms};").close()
# Use legacy journal mode (not WAL) because WAL does not block readers when a concurrent EXCLUSIVE
# write transaction is active, making read-write locking impossible without modifying table data.
# MEMORY is safe here since no actual writes happen — crashes cannot corrupt the DB.
# See https://sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions
#
# Set here (not in __init__) because this pragma itself may block on a locked database,
# so it must run after busy_timeout is configured above.
self._con.execute("PRAGMA journal_mode=MEMORY;").close()
# Recompute remaining timeout after the potentially blocking journal_mode pragma.
waited = time.perf_counter() - start_time
if (recomputed := timeout_for_sqlite(timeout, blocking=blocking, already_waited=waited)) != timeout_ms:
self._con.execute(f"PRAGMA busy_timeout={recomputed};").close()
stmt = "BEGIN EXCLUSIVE TRANSACTION;" if mode == "write" else "BEGIN TRANSACTION;"
self._con.execute(stmt).close()
if mode == "read":
# A SELECT is needed to force SQLite to actually acquire the SHARED lock on the database.
# https://www.sqlite.org/lockingv3.html#transaction_control
self._con.execute("SELECT name FROM sqlite_schema LIMIT 1;").close()
def _acquire(self, mode: Literal["read", "write"], timeout: float, *, blocking: bool) -> AcquireReturnProxy:
opposite = "write" if mode == "read" else "read"
direction = "downgrade" if mode == "read" else "upgrade"
with self._internal_lock:
if self._lock_level > 0:
return self._validate_reentrant(mode, opposite, direction)
start_time = time.perf_counter()
self._acquire_transaction_lock(blocking=blocking, timeout=timeout)
try:
# Double-check: another thread may have acquired the lock while we waited on _transaction_lock.
with self._internal_lock:
if self._lock_level > 0:
return self._validate_reentrant(mode, opposite, direction)
self._configure_and_begin(mode, timeout, blocking=blocking, start_time=start_time)
with self._internal_lock:
self._current_mode = mode
self._lock_level = 1
if mode == "write":
self._write_thread_id = threading.get_ident()
return AcquireReturnProxy(lock=self)
except sqlite3.OperationalError as exc:
if "database is locked" not in str(exc):
raise
raise Timeout(self.lock_file) from None
finally:
self._transaction_lock.release()
def acquire_read(self, timeout: float = -1, *, blocking: bool = True) -> AcquireReturnProxy:
"""
Acquire a shared read lock.
If this instance already holds a read lock, the lock level is incremented (reentrant). Attempting to acquire a
read lock while holding a write lock raises :class:`RuntimeError` (downgrade not allowed).
:param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
:param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
:returns: a proxy that can be used as a context manager to release the lock
:raises RuntimeError: if a write lock is already held on this instance
:raises Timeout: if the lock cannot be acquired within *timeout* seconds
"""
return self._acquire("read", timeout, blocking=blocking)
def acquire_write(self, timeout: float = -1, *, blocking: bool = True) -> AcquireReturnProxy:
"""
Acquire an exclusive write lock.
If this instance already holds a write lock from the same thread, the lock level is incremented (reentrant).
Attempting to acquire a write lock while holding a read lock raises :class:`RuntimeError` (upgrade not allowed).
Write locks are pinned to the acquiring thread: a different thread trying to re-enter also raises
:class:`RuntimeError`.
:param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
:param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
:returns: a proxy that can be used as a context manager to release the lock
:raises RuntimeError: if a read lock is already held, or a write lock is held by a different thread
:raises Timeout: if the lock cannot be acquired within *timeout* seconds
"""
return self._acquire("write", timeout, blocking=blocking)
def release(self, *, force: bool = False) -> None:
"""
Release one level of the current lock.
When the lock level reaches zero the underlying SQLite transaction is rolled back, releasing the database lock.
:param force: if ``True``, release the lock completely regardless of the current lock level
:raises RuntimeError: if no lock is currently held and *force* is ``False``
"""
should_rollback = False
with self._internal_lock:
if self._lock_level == 0:
if force:
return
msg = f"Cannot release a lock on {self.lock_file} (lock id: {id(self)}) that is not held"
raise RuntimeError(msg)
if force:
self._lock_level = 0
else:
self._lock_level -= 1
if self._lock_level == 0:
self._current_mode = None
self._write_thread_id = None
should_rollback = True
if should_rollback:
self._con.rollback()
@contextmanager
def read_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> Generator[None]:
"""
Context manager that acquires and releases a shared read lock.
Falls back to instance defaults for *timeout* and *blocking* when ``None``.
:param timeout: maximum wait time in seconds, or ``None`` to use the instance default
:param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default
"""
if timeout is None:
timeout = self.timeout
if blocking is None:
blocking = self.blocking
self.acquire_read(timeout, blocking=blocking)
try:
yield
finally:
self.release()
@contextmanager
def write_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> Generator[None]:
"""
Context manager that acquires and releases an exclusive write lock.
Falls back to instance defaults for *timeout* and *blocking* when ``None``.
:param timeout: maximum wait time in seconds, or ``None`` to use the instance default
:param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default
"""
if timeout is None:
timeout = self.timeout
if blocking is None:
blocking = self.blocking
self.acquire_write(timeout, blocking=blocking)
try:
yield
finally:
self.release()
def close(self) -> None:
"""
Release the lock (if held) and close the underlying SQLite connection.
After calling this method, the lock instance is no longer usable.
"""
self.release(force=True)
self._con.close()
with _all_connections_lock:
_all_connections.discard(self._con)
|