Spaces:
Paused
Paused
| from __future__ import annotations | |
| import collections.abc as c | |
| import typing as t | |
| import warnings | |
| import weakref | |
| from collections import defaultdict | |
| from contextlib import AbstractContextManager | |
| from contextlib import contextmanager | |
| from functools import cached_property | |
| from inspect import iscoroutinefunction | |
| from weakref import WeakValueDictionary | |
| from ._utilities import make_id | |
| from ._utilities import make_ref | |
| from ._utilities import Symbol | |
| if t.TYPE_CHECKING: | |
| F = t.TypeVar("F", bound=c.Callable[..., t.Any]) | |
| ANY = Symbol("ANY") | |
| """Symbol for "any sender".""" | |
| ANY_ID = 0 | |
| class Signal: | |
| """A notification emitter. | |
| :param doc: The docstring for the signal. | |
| """ | |
| ANY = ANY | |
| """An alias for the :data:`~blinker.ANY` sender symbol.""" | |
| set_class: type[set[t.Any]] = set | |
| """The set class to use for tracking connected receivers and senders. | |
| Python's ``set`` is unordered. If receivers must be dispatched in the order | |
| they were connected, an ordered set implementation can be used. | |
| .. versionadded:: 1.7 | |
| """ | |
| def receiver_connected(self) -> Signal: | |
| """Emitted at the end of each :meth:`connect` call. | |
| The signal sender is the signal instance, and the :meth:`connect` | |
| arguments are passed through: ``receiver``, ``sender``, and ``weak``. | |
| .. versionadded:: 1.2 | |
| """ | |
| return Signal(doc="Emitted after a receiver connects.") | |
| def receiver_disconnected(self) -> Signal: | |
| """Emitted at the end of each :meth:`disconnect` call. | |
| The sender is the signal instance, and the :meth:`disconnect` arguments | |
| are passed through: ``receiver`` and ``sender``. | |
| This signal is emitted **only** when :meth:`disconnect` is called | |
| explicitly. This signal cannot be emitted by an automatic disconnect | |
| when a weakly referenced receiver or sender goes out of scope, as the | |
| instance is no longer be available to be used as the sender for this | |
| signal. | |
| An alternative approach is available by subscribing to | |
| :attr:`receiver_connected` and setting up a custom weakref cleanup | |
| callback on weak receivers and senders. | |
| .. versionadded:: 1.2 | |
| """ | |
| return Signal(doc="Emitted after a receiver disconnects.") | |
| def __init__(self, doc: str | None = None) -> None: | |
| if doc: | |
| self.__doc__ = doc | |
| self.receivers: dict[ | |
| t.Any, weakref.ref[c.Callable[..., t.Any]] | c.Callable[..., t.Any] | |
| ] = {} | |
| """The map of connected receivers. Useful to quickly check if any | |
| receivers are connected to the signal: ``if s.receivers:``. The | |
| structure and data is not part of the public API, but checking its | |
| boolean value is. | |
| """ | |
| self.is_muted: bool = False | |
| self._by_receiver: dict[t.Any, set[t.Any]] = defaultdict(self.set_class) | |
| self._by_sender: dict[t.Any, set[t.Any]] = defaultdict(self.set_class) | |
| self._weak_senders: dict[t.Any, weakref.ref[t.Any]] = {} | |
| def connect(self, receiver: F, sender: t.Any = ANY, weak: bool = True) -> F: | |
| """Connect ``receiver`` to be called when the signal is sent by | |
| ``sender``. | |
| :param receiver: The callable to call when :meth:`send` is called with | |
| the given ``sender``, passing ``sender`` as a positional argument | |
| along with any extra keyword arguments. | |
| :param sender: Any object or :data:`ANY`. ``receiver`` will only be | |
| called when :meth:`send` is called with this sender. If ``ANY``, the | |
| receiver will be called for any sender. A receiver may be connected | |
| to multiple senders by calling :meth:`connect` multiple times. | |
| :param weak: Track the receiver with a :mod:`weakref`. The receiver will | |
| be automatically disconnected when it is garbage collected. When | |
| connecting a receiver defined within a function, set to ``False``, | |
| otherwise it will be disconnected when the function scope ends. | |
| """ | |
| receiver_id = make_id(receiver) | |
| sender_id = ANY_ID if sender is ANY else make_id(sender) | |
| if weak: | |
| self.receivers[receiver_id] = make_ref( | |
| receiver, self._make_cleanup_receiver(receiver_id) | |
| ) | |
| else: | |
| self.receivers[receiver_id] = receiver | |
| self._by_sender[sender_id].add(receiver_id) | |
| self._by_receiver[receiver_id].add(sender_id) | |
| if sender is not ANY and sender_id not in self._weak_senders: | |
| # store a cleanup for weakref-able senders | |
| try: | |
| self._weak_senders[sender_id] = make_ref( | |
| sender, self._make_cleanup_sender(sender_id) | |
| ) | |
| except TypeError: | |
| pass | |
| if "receiver_connected" in self.__dict__ and self.receiver_connected.receivers: | |
| try: | |
| self.receiver_connected.send( | |
| self, receiver=receiver, sender=sender, weak=weak | |
| ) | |
| except TypeError: | |
| # TODO no explanation or test for this | |
| self.disconnect(receiver, sender) | |
| raise | |
| if _receiver_connected.receivers and self is not _receiver_connected: | |
| try: | |
| _receiver_connected.send( | |
| self, receiver_arg=receiver, sender_arg=sender, weak_arg=weak | |
| ) | |
| except TypeError: | |
| self.disconnect(receiver, sender) | |
| raise | |
| return receiver | |
| def connect_via(self, sender: t.Any, weak: bool = False) -> c.Callable[[F], F]: | |
| """Connect the decorated function to be called when the signal is sent | |
| by ``sender``. | |
| The decorated function will be called when :meth:`send` is called with | |
| the given ``sender``, passing ``sender`` as a positional argument along | |
| with any extra keyword arguments. | |
| :param sender: Any object or :data:`ANY`. ``receiver`` will only be | |
| called when :meth:`send` is called with this sender. If ``ANY``, the | |
| receiver will be called for any sender. A receiver may be connected | |
| to multiple senders by calling :meth:`connect` multiple times. | |
| :param weak: Track the receiver with a :mod:`weakref`. The receiver will | |
| be automatically disconnected when it is garbage collected. When | |
| connecting a receiver defined within a function, set to ``False``, | |
| otherwise it will be disconnected when the function scope ends.= | |
| .. versionadded:: 1.1 | |
| """ | |
| def decorator(fn: F) -> F: | |
| self.connect(fn, sender, weak) | |
| return fn | |
| return decorator | |
| def connected_to( | |
| self, receiver: c.Callable[..., t.Any], sender: t.Any = ANY | |
| ) -> c.Generator[None, None, None]: | |
| """A context manager that temporarily connects ``receiver`` to the | |
| signal while a ``with`` block executes. When the block exits, the | |
| receiver is disconnected. Useful for tests. | |
| :param receiver: The callable to call when :meth:`send` is called with | |
| the given ``sender``, passing ``sender`` as a positional argument | |
| along with any extra keyword arguments. | |
| :param sender: Any object or :data:`ANY`. ``receiver`` will only be | |
| called when :meth:`send` is called with this sender. If ``ANY``, the | |
| receiver will be called for any sender. | |
| .. versionadded:: 1.1 | |
| """ | |
| self.connect(receiver, sender=sender, weak=False) | |
| try: | |
| yield None | |
| finally: | |
| self.disconnect(receiver) | |
| def muted(self) -> c.Generator[None, None, None]: | |
| """A context manager that temporarily disables the signal. No receivers | |
| will be called if the signal is sent, until the ``with`` block exits. | |
| Useful for tests. | |
| """ | |
| self.is_muted = True | |
| try: | |
| yield None | |
| finally: | |
| self.is_muted = False | |
| def temporarily_connected_to( | |
| self, receiver: c.Callable[..., t.Any], sender: t.Any = ANY | |
| ) -> AbstractContextManager[None]: | |
| """Deprecated alias for :meth:`connected_to`. | |
| .. deprecated:: 1.1 | |
| Renamed to ``connected_to``. Will be removed in Blinker 1.9. | |
| .. versionadded:: 0.9 | |
| """ | |
| warnings.warn( | |
| "'temporarily_connected_to' is renamed to 'connected_to'. The old name is" | |
| " deprecated and will be removed in Blinker 1.9.", | |
| DeprecationWarning, | |
| stacklevel=2, | |
| ) | |
| return self.connected_to(receiver, sender) | |
| def send( | |
| self, | |
| sender: t.Any | None = None, | |
| /, | |
| *, | |
| _async_wrapper: c.Callable[ | |
| [c.Callable[..., c.Coroutine[t.Any, t.Any, t.Any]]], c.Callable[..., t.Any] | |
| ] | |
| | None = None, | |
| **kwargs: t.Any, | |
| ) -> list[tuple[c.Callable[..., t.Any], t.Any]]: | |
| """Call all receivers that are connected to the given ``sender`` | |
| or :data:`ANY`. Each receiver is called with ``sender`` as a positional | |
| argument along with any extra keyword arguments. Return a list of | |
| ``(receiver, return value)`` tuples. | |
| The order receivers are called is undefined, but can be influenced by | |
| setting :attr:`set_class`. | |
| If a receiver raises an exception, that exception will propagate up. | |
| This makes debugging straightforward, with an assumption that correctly | |
| implemented receivers will not raise. | |
| :param sender: Call receivers connected to this sender, in addition to | |
| those connected to :data:`ANY`. | |
| :param _async_wrapper: Will be called on any receivers that are async | |
| coroutines to turn them into sync callables. For example, could run | |
| the receiver with an event loop. | |
| :param kwargs: Extra keyword arguments to pass to each receiver. | |
| .. versionchanged:: 1.7 | |
| Added the ``_async_wrapper`` argument. | |
| """ | |
| if self.is_muted: | |
| return [] | |
| results = [] | |
| for receiver in self.receivers_for(sender): | |
| if iscoroutinefunction(receiver): | |
| if _async_wrapper is None: | |
| raise RuntimeError("Cannot send to a coroutine function.") | |
| result = _async_wrapper(receiver)(sender, **kwargs) | |
| else: | |
| result = receiver(sender, **kwargs) | |
| results.append((receiver, result)) | |
| return results | |
| async def send_async( | |
| self, | |
| sender: t.Any | None = None, | |
| /, | |
| *, | |
| _sync_wrapper: c.Callable[ | |
| [c.Callable[..., t.Any]], c.Callable[..., c.Coroutine[t.Any, t.Any, t.Any]] | |
| ] | |
| | None = None, | |
| **kwargs: t.Any, | |
| ) -> list[tuple[c.Callable[..., t.Any], t.Any]]: | |
| """Await all receivers that are connected to the given ``sender`` | |
| or :data:`ANY`. Each receiver is called with ``sender`` as a positional | |
| argument along with any extra keyword arguments. Return a list of | |
| ``(receiver, return value)`` tuples. | |
| The order receivers are called is undefined, but can be influenced by | |
| setting :attr:`set_class`. | |
| If a receiver raises an exception, that exception will propagate up. | |
| This makes debugging straightforward, with an assumption that correctly | |
| implemented receivers will not raise. | |
| :param sender: Call receivers connected to this sender, in addition to | |
| those connected to :data:`ANY`. | |
| :param _sync_wrapper: Will be called on any receivers that are sync | |
| callables to turn them into async coroutines. For example, | |
| could call the receiver in a thread. | |
| :param kwargs: Extra keyword arguments to pass to each receiver. | |
| .. versionadded:: 1.7 | |
| """ | |
| if self.is_muted: | |
| return [] | |
| results = [] | |
| for receiver in self.receivers_for(sender): | |
| if not iscoroutinefunction(receiver): | |
| if _sync_wrapper is None: | |
| raise RuntimeError("Cannot send to a non-coroutine function.") | |
| result = await _sync_wrapper(receiver)(sender, **kwargs) | |
| else: | |
| result = await receiver(sender, **kwargs) | |
| results.append((receiver, result)) | |
| return results | |
| def has_receivers_for(self, sender: t.Any) -> bool: | |
| """Check if there is at least one receiver that will be called with the | |
| given ``sender``. A receiver connected to :data:`ANY` will always be | |
| called, regardless of sender. Does not check if weakly referenced | |
| receivers are still live. See :meth:`receivers_for` for a stronger | |
| search. | |
| :param sender: Check for receivers connected to this sender, in addition | |
| to those connected to :data:`ANY`. | |
| """ | |
| if not self.receivers: | |
| return False | |
| if self._by_sender[ANY_ID]: | |
| return True | |
| if sender is ANY: | |
| return False | |
| return make_id(sender) in self._by_sender | |
| def receivers_for( | |
| self, sender: t.Any | |
| ) -> c.Generator[c.Callable[..., t.Any], None, None]: | |
| """Yield each receiver to be called for ``sender``, in addition to those | |
| to be called for :data:`ANY`. Weakly referenced receivers that are not | |
| live will be disconnected and skipped. | |
| :param sender: Yield receivers connected to this sender, in addition | |
| to those connected to :data:`ANY`. | |
| """ | |
| # TODO: test receivers_for(ANY) | |
| if not self.receivers: | |
| return | |
| sender_id = make_id(sender) | |
| if sender_id in self._by_sender: | |
| ids = self._by_sender[ANY_ID] | self._by_sender[sender_id] | |
| else: | |
| ids = self._by_sender[ANY_ID].copy() | |
| for receiver_id in ids: | |
| receiver = self.receivers.get(receiver_id) | |
| if receiver is None: | |
| continue | |
| if isinstance(receiver, weakref.ref): | |
| strong = receiver() | |
| if strong is None: | |
| self._disconnect(receiver_id, ANY_ID) | |
| continue | |
| yield strong | |
| else: | |
| yield receiver | |
| def disconnect(self, receiver: c.Callable[..., t.Any], sender: t.Any = ANY) -> None: | |
| """Disconnect ``receiver`` from being called when the signal is sent by | |
| ``sender``. | |
| :param receiver: A connected receiver callable. | |
| :param sender: Disconnect from only this sender. By default, disconnect | |
| from all senders. | |
| """ | |
| sender_id: c.Hashable | |
| if sender is ANY: | |
| sender_id = ANY_ID | |
| else: | |
| sender_id = make_id(sender) | |
| receiver_id = make_id(receiver) | |
| self._disconnect(receiver_id, sender_id) | |
| if ( | |
| "receiver_disconnected" in self.__dict__ | |
| and self.receiver_disconnected.receivers | |
| ): | |
| self.receiver_disconnected.send(self, receiver=receiver, sender=sender) | |
| def _disconnect(self, receiver_id: c.Hashable, sender_id: c.Hashable) -> None: | |
| if sender_id == ANY_ID: | |
| if self._by_receiver.pop(receiver_id, None) is not None: | |
| for bucket in self._by_sender.values(): | |
| bucket.discard(receiver_id) | |
| self.receivers.pop(receiver_id, None) | |
| else: | |
| self._by_sender[sender_id].discard(receiver_id) | |
| self._by_receiver[receiver_id].discard(sender_id) | |
| def _make_cleanup_receiver( | |
| self, receiver_id: c.Hashable | |
| ) -> c.Callable[[weakref.ref[c.Callable[..., t.Any]]], None]: | |
| """Create a callback function to disconnect a weakly referenced | |
| receiver when it is garbage collected. | |
| """ | |
| def cleanup(ref: weakref.ref[c.Callable[..., t.Any]]) -> None: | |
| self._disconnect(receiver_id, ANY_ID) | |
| return cleanup | |
| def _make_cleanup_sender( | |
| self, sender_id: c.Hashable | |
| ) -> c.Callable[[weakref.ref[t.Any]], None]: | |
| """Create a callback function to disconnect all receivers for a weakly | |
| referenced sender when it is garbage collected. | |
| """ | |
| assert sender_id != ANY_ID | |
| def cleanup(ref: weakref.ref[t.Any]) -> None: | |
| self._weak_senders.pop(sender_id, None) | |
| for receiver_id in self._by_sender.pop(sender_id, ()): | |
| self._by_receiver[receiver_id].discard(sender_id) | |
| return cleanup | |
| def _cleanup_bookkeeping(self) -> None: | |
| """Prune unused sender/receiver bookkeeping. Not threadsafe. | |
| Connecting & disconnecting leaves behind a small amount of bookkeeping | |
| data. Typical workloads using Blinker, for example in most web apps, | |
| Flask, CLI scripts, etc., are not adversely affected by this | |
| bookkeeping. | |
| With a long-running process performing dynamic signal routing with high | |
| volume, e.g. connecting to function closures, senders are all unique | |
| object instances. Doing all of this over and over may cause memory usage | |
| to grow due to extraneous bookkeeping. (An empty ``set`` for each stale | |
| sender/receiver pair.) | |
| This method will prune that bookkeeping away, with the caveat that such | |
| pruning is not threadsafe. The risk is that cleanup of a fully | |
| disconnected receiver/sender pair occurs while another thread is | |
| connecting that same pair. If you are in the highly dynamic, unique | |
| receiver/sender situation that has lead you to this method, that failure | |
| mode is perhaps not a big deal for you. | |
| """ | |
| for mapping in (self._by_sender, self._by_receiver): | |
| for ident, bucket in list(mapping.items()): | |
| if not bucket: | |
| mapping.pop(ident, None) | |
| def _clear_state(self) -> None: | |
| """Disconnect all receivers and senders. Useful for tests.""" | |
| self._weak_senders.clear() | |
| self.receivers.clear() | |
| self._by_sender.clear() | |
| self._by_receiver.clear() | |
| _receiver_connected = Signal( | |
| """\ | |
| Sent by a :class:`Signal` after a receiver connects. | |
| :argument: the Signal that was connected to | |
| :keyword receiver_arg: the connected receiver | |
| :keyword sender_arg: the sender to connect to | |
| :keyword weak_arg: true if the connection to receiver_arg is a weak reference | |
| .. deprecated:: 1.2 | |
| Individual signals have their own :attr:`~Signal.receiver_connected` and | |
| :attr:`~Signal.receiver_disconnected` signals with a slightly simplified | |
| call signature. This global signal will be removed in Blinker 1.9. | |
| """ | |
| ) | |
| class NamedSignal(Signal): | |
| """A named generic notification emitter. The name is not used by the signal | |
| itself, but matches the key in the :class:`Namespace` that it belongs to. | |
| :param name: The name of the signal within the namespace. | |
| :param doc: The docstring for the signal. | |
| """ | |
| def __init__(self, name: str, doc: str | None = None) -> None: | |
| super().__init__(doc) | |
| #: The name of this signal. | |
| self.name: str = name | |
| def __repr__(self) -> str: | |
| base = super().__repr__() | |
| return f"{base[:-1]}; {self.name!r}>" # noqa: E702 | |
| if t.TYPE_CHECKING: | |
| class PNamespaceSignal(t.Protocol): | |
| def __call__(self, name: str, doc: str | None = None) -> NamedSignal: ... | |
| # Python < 3.9 | |
| _NamespaceBase = dict[str, NamedSignal] # type: ignore[misc] | |
| else: | |
| _NamespaceBase = dict | |
| class Namespace(_NamespaceBase): | |
| """A dict mapping names to signals.""" | |
| def signal(self, name: str, doc: str | None = None) -> NamedSignal: | |
| """Return the :class:`NamedSignal` for the given ``name``, creating it | |
| if required. Repeated calls with the same name return the same signal. | |
| :param name: The name of the signal. | |
| :param doc: The docstring of the signal. | |
| """ | |
| if name not in self: | |
| self[name] = NamedSignal(name, doc) | |
| return self[name] | |
| class _WeakNamespace(WeakValueDictionary): # type: ignore[type-arg] | |
| """A weak mapping of names to signals. | |
| Automatically cleans up unused signals when the last reference goes out | |
| of scope. This namespace implementation provides similar behavior to Blinker | |
| <= 1.2. | |
| .. deprecated:: 1.3 | |
| Will be removed in Blinker 1.9. | |
| .. versionadded:: 1.3 | |
| """ | |
| def __init__(self) -> None: | |
| warnings.warn( | |
| "'WeakNamespace' is deprecated and will be removed in Blinker 1.9." | |
| " Use 'Namespace' instead.", | |
| DeprecationWarning, | |
| stacklevel=2, | |
| ) | |
| super().__init__() | |
| def signal(self, name: str, doc: str | None = None) -> NamedSignal: | |
| """Return the :class:`NamedSignal` for the given ``name``, creating it | |
| if required. Repeated calls with the same name return the same signal. | |
| :param name: The name of the signal. | |
| :param doc: The docstring of the signal. | |
| """ | |
| if name not in self: | |
| self[name] = NamedSignal(name, doc) | |
| return self[name] # type: ignore[no-any-return] | |
| default_namespace: Namespace = Namespace() | |
| """A default :class:`Namespace` for creating named signals. :func:`signal` | |
| creates a :class:`NamedSignal` in this namespace. | |
| """ | |
| signal: PNamespaceSignal = default_namespace.signal | |
| """Return a :class:`NamedSignal` in :data:`default_namespace` with the given | |
| ``name``, creating it if required. Repeated calls with the same name return the | |
| same signal. | |
| """ | |
| def __getattr__(name: str) -> t.Any: | |
| if name == "receiver_connected": | |
| warnings.warn( | |
| "The global 'receiver_connected' signal is deprecated and will be" | |
| " removed in Blinker 1.9. Use 'Signal.receiver_connected' and" | |
| " 'Signal.receiver_disconnected' instead.", | |
| DeprecationWarning, | |
| stacklevel=2, | |
| ) | |
| return _receiver_connected | |
| if name == "WeakNamespace": | |
| warnings.warn( | |
| "'WeakNamespace' is deprecated and will be removed in Blinker 1.9." | |
| " Use 'Namespace' instead.", | |
| DeprecationWarning, | |
| stacklevel=2, | |
| ) | |
| return _WeakNamespace | |
| raise AttributeError(name) | |