| from __future__ import annotations |
|
|
| import math |
| from collections import deque |
| from collections.abc import Callable |
| from dataclasses import dataclass |
| from types import TracebackType |
| from typing import TypeVar |
|
|
| from ..lowlevel import checkpoint_if_cancelled |
| from ._eventloop import get_async_backend |
| from ._exceptions import BusyResourceError, NoEventLoopError |
| from ._tasks import CancelScope |
| from ._testing import TaskInfo, get_current_task |
|
|
| T = TypeVar("T") |
|
|
|
|
| @dataclass(frozen=True) |
| class EventStatistics: |
| """ |
| :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Event.wait` |
| """ |
|
|
| tasks_waiting: int |
|
|
|
|
| @dataclass(frozen=True) |
| class CapacityLimiterStatistics: |
| """ |
| :ivar int borrowed_tokens: number of tokens currently borrowed by tasks |
| :ivar float total_tokens: total number of available tokens |
| :ivar tuple borrowers: tasks or other objects currently holding tokens borrowed from |
| this limiter |
| :ivar int tasks_waiting: number of tasks waiting on |
| :meth:`~.CapacityLimiter.acquire` or |
| :meth:`~.CapacityLimiter.acquire_on_behalf_of` |
| """ |
|
|
| borrowed_tokens: int |
| total_tokens: float |
| borrowers: tuple[object, ...] |
| tasks_waiting: int |
|
|
|
|
| @dataclass(frozen=True) |
| class LockStatistics: |
| """ |
| :ivar bool locked: flag indicating if this lock is locked or not |
| :ivar ~anyio.TaskInfo owner: task currently holding the lock (or ``None`` if the |
| lock is not held by any task) |
| :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Lock.acquire` |
| """ |
|
|
| locked: bool |
| owner: TaskInfo | None |
| tasks_waiting: int |
|
|
|
|
| @dataclass(frozen=True) |
| class ConditionStatistics: |
| """ |
| :ivar int tasks_waiting: number of tasks blocked on :meth:`~.Condition.wait` |
| :ivar ~anyio.LockStatistics lock_statistics: statistics of the underlying |
| :class:`~.Lock` |
| """ |
|
|
| tasks_waiting: int |
| lock_statistics: LockStatistics |
|
|
|
|
| @dataclass(frozen=True) |
| class SemaphoreStatistics: |
| """ |
| :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Semaphore.acquire` |
| |
| """ |
|
|
| tasks_waiting: int |
|
|
|
|
| class Event: |
| def __new__(cls) -> Event: |
| try: |
| return get_async_backend().create_event() |
| except NoEventLoopError: |
| return EventAdapter() |
|
|
| def set(self) -> None: |
| """Set the flag, notifying all listeners.""" |
| raise NotImplementedError |
|
|
| def is_set(self) -> bool: |
| """Return ``True`` if the flag is set, ``False`` if not.""" |
| raise NotImplementedError |
|
|
| async def wait(self) -> None: |
| """ |
| Wait until the flag has been set. |
| |
| If the flag has already been set when this method is called, it returns |
| immediately. |
| |
| """ |
| raise NotImplementedError |
|
|
| def statistics(self) -> EventStatistics: |
| """Return statistics about the current state of this event.""" |
| raise NotImplementedError |
|
|
|
|
| class EventAdapter(Event): |
| _internal_event: Event | None = None |
| _is_set: bool = False |
|
|
| def __new__(cls) -> EventAdapter: |
| return object.__new__(cls) |
|
|
| @property |
| def _event(self) -> Event: |
| if self._internal_event is None: |
| self._internal_event = get_async_backend().create_event() |
| if self._is_set: |
| self._internal_event.set() |
|
|
| return self._internal_event |
|
|
| def set(self) -> None: |
| if self._internal_event is None: |
| self._is_set = True |
| else: |
| self._event.set() |
|
|
| def is_set(self) -> bool: |
| if self._internal_event is None: |
| return self._is_set |
|
|
| return self._internal_event.is_set() |
|
|
| async def wait(self) -> None: |
| await self._event.wait() |
|
|
| def statistics(self) -> EventStatistics: |
| if self._internal_event is None: |
| return EventStatistics(tasks_waiting=0) |
|
|
| return self._internal_event.statistics() |
|
|
|
|
| class Lock: |
| def __new__(cls, *, fast_acquire: bool = False) -> Lock: |
| try: |
| return get_async_backend().create_lock(fast_acquire=fast_acquire) |
| except NoEventLoopError: |
| return LockAdapter(fast_acquire=fast_acquire) |
|
|
| async def __aenter__(self) -> None: |
| await self.acquire() |
|
|
| async def __aexit__( |
| self, |
| exc_type: type[BaseException] | None, |
| exc_val: BaseException | None, |
| exc_tb: TracebackType | None, |
| ) -> None: |
| self.release() |
|
|
| async def acquire(self) -> None: |
| """Acquire the lock.""" |
| raise NotImplementedError |
|
|
| def acquire_nowait(self) -> None: |
| """ |
| Acquire the lock, without blocking. |
| |
| :raises ~anyio.WouldBlock: if the operation would block |
| |
| """ |
| raise NotImplementedError |
|
|
| def release(self) -> None: |
| """Release the lock.""" |
| raise NotImplementedError |
|
|
| def locked(self) -> bool: |
| """Return True if the lock is currently held.""" |
| raise NotImplementedError |
|
|
| def statistics(self) -> LockStatistics: |
| """ |
| Return statistics about the current state of this lock. |
| |
| .. versionadded:: 3.0 |
| """ |
| raise NotImplementedError |
|
|
|
|
| class LockAdapter(Lock): |
| _internal_lock: Lock | None = None |
|
|
| def __new__(cls, *, fast_acquire: bool = False) -> LockAdapter: |
| return object.__new__(cls) |
|
|
| def __init__(self, *, fast_acquire: bool = False): |
| self._fast_acquire = fast_acquire |
|
|
| @property |
| def _lock(self) -> Lock: |
| if self._internal_lock is None: |
| self._internal_lock = get_async_backend().create_lock( |
| fast_acquire=self._fast_acquire |
| ) |
|
|
| return self._internal_lock |
|
|
| async def __aenter__(self) -> None: |
| await self._lock.acquire() |
|
|
| async def __aexit__( |
| self, |
| exc_type: type[BaseException] | None, |
| exc_val: BaseException | None, |
| exc_tb: TracebackType | None, |
| ) -> None: |
| if self._internal_lock is not None: |
| self._internal_lock.release() |
|
|
| async def acquire(self) -> None: |
| """Acquire the lock.""" |
| await self._lock.acquire() |
|
|
| def acquire_nowait(self) -> None: |
| """ |
| Acquire the lock, without blocking. |
| |
| :raises ~anyio.WouldBlock: if the operation would block |
| |
| """ |
| self._lock.acquire_nowait() |
|
|
| def release(self) -> None: |
| """Release the lock.""" |
| self._lock.release() |
|
|
| def locked(self) -> bool: |
| """Return True if the lock is currently held.""" |
| return self._lock.locked() |
|
|
| def statistics(self) -> LockStatistics: |
| """ |
| Return statistics about the current state of this lock. |
| |
| .. versionadded:: 3.0 |
| |
| """ |
| if self._internal_lock is None: |
| return LockStatistics(False, None, 0) |
|
|
| return self._internal_lock.statistics() |
|
|
|
|
| class Condition: |
| _owner_task: TaskInfo | None = None |
|
|
| def __init__(self, lock: Lock | None = None): |
| self._lock = lock or Lock() |
| self._waiters: deque[Event] = deque() |
|
|
| async def __aenter__(self) -> None: |
| await self.acquire() |
|
|
| async def __aexit__( |
| self, |
| exc_type: type[BaseException] | None, |
| exc_val: BaseException | None, |
| exc_tb: TracebackType | None, |
| ) -> None: |
| self.release() |
|
|
| def _check_acquired(self) -> None: |
| if self._owner_task != get_current_task(): |
| raise RuntimeError("The current task is not holding the underlying lock") |
|
|
| async def acquire(self) -> None: |
| """Acquire the underlying lock.""" |
| await self._lock.acquire() |
| self._owner_task = get_current_task() |
|
|
| def acquire_nowait(self) -> None: |
| """ |
| Acquire the underlying lock, without blocking. |
| |
| :raises ~anyio.WouldBlock: if the operation would block |
| |
| """ |
| self._lock.acquire_nowait() |
| self._owner_task = get_current_task() |
|
|
| def release(self) -> None: |
| """Release the underlying lock.""" |
| self._lock.release() |
|
|
| def locked(self) -> bool: |
| """Return True if the lock is set.""" |
| return self._lock.locked() |
|
|
| def notify(self, n: int = 1) -> None: |
| """Notify exactly n listeners.""" |
| self._check_acquired() |
| for _ in range(n): |
| try: |
| event = self._waiters.popleft() |
| except IndexError: |
| break |
|
|
| event.set() |
|
|
| def notify_all(self) -> None: |
| """Notify all the listeners.""" |
| self._check_acquired() |
| for event in self._waiters: |
| event.set() |
|
|
| self._waiters.clear() |
|
|
| async def wait(self) -> None: |
| """Wait for a notification.""" |
| await checkpoint_if_cancelled() |
| self._check_acquired() |
| event = Event() |
| self._waiters.append(event) |
| self.release() |
| try: |
| await event.wait() |
| except BaseException: |
| if not event.is_set(): |
| self._waiters.remove(event) |
|
|
| raise |
| finally: |
| with CancelScope(shield=True): |
| await self.acquire() |
|
|
| async def wait_for(self, predicate: Callable[[], T]) -> T: |
| """ |
| Wait until a predicate becomes true. |
| |
| :param predicate: a callable that returns a truthy value when the condition is |
| met |
| :return: the result of the predicate |
| |
| .. versionadded:: 4.11.0 |
| |
| """ |
| while not (result := predicate()): |
| await self.wait() |
|
|
| return result |
|
|
| def statistics(self) -> ConditionStatistics: |
| """ |
| Return statistics about the current state of this condition. |
| |
| .. versionadded:: 3.0 |
| """ |
| return ConditionStatistics(len(self._waiters), self._lock.statistics()) |
|
|
|
|
| class Semaphore: |
| def __new__( |
| cls, |
| initial_value: int, |
| *, |
| max_value: int | None = None, |
| fast_acquire: bool = False, |
| ) -> Semaphore: |
| try: |
| return get_async_backend().create_semaphore( |
| initial_value, max_value=max_value, fast_acquire=fast_acquire |
| ) |
| except NoEventLoopError: |
| return SemaphoreAdapter(initial_value, max_value=max_value) |
|
|
| def __init__( |
| self, |
| initial_value: int, |
| *, |
| max_value: int | None = None, |
| fast_acquire: bool = False, |
| ): |
| if not isinstance(initial_value, int): |
| raise TypeError("initial_value must be an integer") |
| if initial_value < 0: |
| raise ValueError("initial_value must be >= 0") |
| if max_value is not None: |
| if not isinstance(max_value, int): |
| raise TypeError("max_value must be an integer or None") |
| if max_value < initial_value: |
| raise ValueError( |
| "max_value must be equal to or higher than initial_value" |
| ) |
|
|
| self._fast_acquire = fast_acquire |
|
|
| async def __aenter__(self) -> Semaphore: |
| await self.acquire() |
| return self |
|
|
| async def __aexit__( |
| self, |
| exc_type: type[BaseException] | None, |
| exc_val: BaseException | None, |
| exc_tb: TracebackType | None, |
| ) -> None: |
| self.release() |
|
|
| async def acquire(self) -> None: |
| """Decrement the semaphore value, blocking if necessary.""" |
| raise NotImplementedError |
|
|
| def acquire_nowait(self) -> None: |
| """ |
| Acquire the underlying lock, without blocking. |
| |
| :raises ~anyio.WouldBlock: if the operation would block |
| |
| """ |
| raise NotImplementedError |
|
|
| def release(self) -> None: |
| """Increment the semaphore value.""" |
| raise NotImplementedError |
|
|
| @property |
| def value(self) -> int: |
| """The current value of the semaphore.""" |
| raise NotImplementedError |
|
|
| @property |
| def max_value(self) -> int | None: |
| """The maximum value of the semaphore.""" |
| raise NotImplementedError |
|
|
| def statistics(self) -> SemaphoreStatistics: |
| """ |
| Return statistics about the current state of this semaphore. |
| |
| .. versionadded:: 3.0 |
| """ |
| raise NotImplementedError |
|
|
|
|
| class SemaphoreAdapter(Semaphore): |
| _internal_semaphore: Semaphore | None = None |
|
|
| def __new__( |
| cls, |
| initial_value: int, |
| *, |
| max_value: int | None = None, |
| fast_acquire: bool = False, |
| ) -> SemaphoreAdapter: |
| return object.__new__(cls) |
|
|
| def __init__( |
| self, |
| initial_value: int, |
| *, |
| max_value: int | None = None, |
| fast_acquire: bool = False, |
| ) -> None: |
| super().__init__(initial_value, max_value=max_value, fast_acquire=fast_acquire) |
| self._initial_value = initial_value |
| self._max_value = max_value |
|
|
| @property |
| def _semaphore(self) -> Semaphore: |
| if self._internal_semaphore is None: |
| self._internal_semaphore = get_async_backend().create_semaphore( |
| self._initial_value, max_value=self._max_value |
| ) |
|
|
| return self._internal_semaphore |
|
|
| async def acquire(self) -> None: |
| await self._semaphore.acquire() |
|
|
| def acquire_nowait(self) -> None: |
| self._semaphore.acquire_nowait() |
|
|
| def release(self) -> None: |
| self._semaphore.release() |
|
|
| @property |
| def value(self) -> int: |
| if self._internal_semaphore is None: |
| return self._initial_value |
|
|
| return self._semaphore.value |
|
|
| @property |
| def max_value(self) -> int | None: |
| return self._max_value |
|
|
| def statistics(self) -> SemaphoreStatistics: |
| if self._internal_semaphore is None: |
| return SemaphoreStatistics(tasks_waiting=0) |
|
|
| return self._semaphore.statistics() |
|
|
|
|
| class CapacityLimiter: |
| def __new__(cls, total_tokens: float) -> CapacityLimiter: |
| try: |
| return get_async_backend().create_capacity_limiter(total_tokens) |
| except NoEventLoopError: |
| return CapacityLimiterAdapter(total_tokens) |
|
|
| async def __aenter__(self) -> None: |
| raise NotImplementedError |
|
|
| async def __aexit__( |
| self, |
| exc_type: type[BaseException] | None, |
| exc_val: BaseException | None, |
| exc_tb: TracebackType | None, |
| ) -> None: |
| raise NotImplementedError |
|
|
| @property |
| def total_tokens(self) -> float: |
| """ |
| The total number of tokens available for borrowing. |
| |
| This is a read-write property. If the total number of tokens is increased, the |
| proportionate number of tasks waiting on this limiter will be granted their |
| tokens. |
| |
| .. versionchanged:: 3.0 |
| The property is now writable. |
| .. versionchanged:: 4.12 |
| The value can now be set to 0. |
| |
| """ |
| raise NotImplementedError |
|
|
| @total_tokens.setter |
| def total_tokens(self, value: float) -> None: |
| raise NotImplementedError |
|
|
| @property |
| def borrowed_tokens(self) -> int: |
| """The number of tokens that have currently been borrowed.""" |
| raise NotImplementedError |
|
|
| @property |
| def available_tokens(self) -> float: |
| """The number of tokens currently available to be borrowed""" |
| raise NotImplementedError |
|
|
| def acquire_nowait(self) -> None: |
| """ |
| Acquire a token for the current task without waiting for one to become |
| available. |
| |
| :raises ~anyio.WouldBlock: if there are no tokens available for borrowing |
| |
| """ |
| raise NotImplementedError |
|
|
| def acquire_on_behalf_of_nowait(self, borrower: object) -> None: |
| """ |
| Acquire a token without waiting for one to become available. |
| |
| :param borrower: the entity borrowing a token |
| :raises ~anyio.WouldBlock: if there are no tokens available for borrowing |
| |
| """ |
| raise NotImplementedError |
|
|
| async def acquire(self) -> None: |
| """ |
| Acquire a token for the current task, waiting if necessary for one to become |
| available. |
| |
| """ |
| raise NotImplementedError |
|
|
| async def acquire_on_behalf_of(self, borrower: object) -> None: |
| """ |
| Acquire a token, waiting if necessary for one to become available. |
| |
| :param borrower: the entity borrowing a token |
| |
| """ |
| raise NotImplementedError |
|
|
| def release(self) -> None: |
| """ |
| Release the token held by the current task. |
| |
| :raises RuntimeError: if the current task has not borrowed a token from this |
| limiter. |
| |
| """ |
| raise NotImplementedError |
|
|
| def release_on_behalf_of(self, borrower: object) -> None: |
| """ |
| Release the token held by the given borrower. |
| |
| :raises RuntimeError: if the borrower has not borrowed a token from this |
| limiter. |
| |
| """ |
| raise NotImplementedError |
|
|
| def statistics(self) -> CapacityLimiterStatistics: |
| """ |
| Return statistics about the current state of this limiter. |
| |
| .. versionadded:: 3.0 |
| |
| """ |
| raise NotImplementedError |
|
|
|
|
| class CapacityLimiterAdapter(CapacityLimiter): |
| _internal_limiter: CapacityLimiter | None = None |
|
|
| def __new__(cls, total_tokens: float) -> CapacityLimiterAdapter: |
| return object.__new__(cls) |
|
|
| def __init__(self, total_tokens: float) -> None: |
| self.total_tokens = total_tokens |
|
|
| @property |
| def _limiter(self) -> CapacityLimiter: |
| if self._internal_limiter is None: |
| self._internal_limiter = get_async_backend().create_capacity_limiter( |
| self._total_tokens |
| ) |
|
|
| return self._internal_limiter |
|
|
| async def __aenter__(self) -> None: |
| await self._limiter.__aenter__() |
|
|
| async def __aexit__( |
| self, |
| exc_type: type[BaseException] | None, |
| exc_val: BaseException | None, |
| exc_tb: TracebackType | None, |
| ) -> None: |
| return await self._limiter.__aexit__(exc_type, exc_val, exc_tb) |
|
|
| @property |
| def total_tokens(self) -> float: |
| if self._internal_limiter is None: |
| return self._total_tokens |
|
|
| return self._internal_limiter.total_tokens |
|
|
| @total_tokens.setter |
| def total_tokens(self, value: float) -> None: |
| if not isinstance(value, int) and value is not math.inf: |
| raise TypeError("total_tokens must be an int or math.inf") |
| elif value < 1: |
| raise ValueError("total_tokens must be >= 1") |
|
|
| if self._internal_limiter is None: |
| self._total_tokens = value |
| return |
|
|
| self._limiter.total_tokens = value |
|
|
| @property |
| def borrowed_tokens(self) -> int: |
| if self._internal_limiter is None: |
| return 0 |
|
|
| return self._internal_limiter.borrowed_tokens |
|
|
| @property |
| def available_tokens(self) -> float: |
| if self._internal_limiter is None: |
| return self._total_tokens |
|
|
| return self._internal_limiter.available_tokens |
|
|
| def acquire_nowait(self) -> None: |
| self._limiter.acquire_nowait() |
|
|
| def acquire_on_behalf_of_nowait(self, borrower: object) -> None: |
| self._limiter.acquire_on_behalf_of_nowait(borrower) |
|
|
| async def acquire(self) -> None: |
| await self._limiter.acquire() |
|
|
| async def acquire_on_behalf_of(self, borrower: object) -> None: |
| await self._limiter.acquire_on_behalf_of(borrower) |
|
|
| def release(self) -> None: |
| self._limiter.release() |
|
|
| def release_on_behalf_of(self, borrower: object) -> None: |
| self._limiter.release_on_behalf_of(borrower) |
|
|
| def statistics(self) -> CapacityLimiterStatistics: |
| if self._internal_limiter is None: |
| return CapacityLimiterStatistics( |
| borrowed_tokens=0, |
| total_tokens=self.total_tokens, |
| borrowers=(), |
| tasks_waiting=0, |
| ) |
|
|
| return self._internal_limiter.statistics() |
|
|
|
|
| class ResourceGuard: |
| """ |
| A context manager for ensuring that a resource is only used by a single task at a |
| time. |
| |
| Entering this context manager while the previous has not exited it yet will trigger |
| :exc:`BusyResourceError`. |
| |
| :param action: the action to guard against (visible in the :exc:`BusyResourceError` |
| when triggered, e.g. "Another task is already {action} this resource") |
| |
| .. versionadded:: 4.1 |
| """ |
|
|
| __slots__ = "action", "_guarded" |
|
|
| def __init__(self, action: str = "using"): |
| self.action: str = action |
| self._guarded = False |
|
|
| def __enter__(self) -> None: |
| if self._guarded: |
| raise BusyResourceError(self.action) |
|
|
| self._guarded = True |
|
|
| def __exit__( |
| self, |
| exc_type: type[BaseException] | None, |
| exc_val: BaseException | None, |
| exc_tb: TracebackType | None, |
| ) -> None: |
| self._guarded = False |
|
|