| | from __future__ import annotations |
| |
|
| | import math |
| | import sys |
| | from abc import ABCMeta, abstractmethod |
| | from collections.abc import AsyncIterator, Awaitable, Callable, Sequence |
| | from contextlib import AbstractContextManager |
| | from os import PathLike |
| | from signal import Signals |
| | from socket import AddressFamily, SocketKind, socket |
| | from typing import ( |
| | IO, |
| | TYPE_CHECKING, |
| | Any, |
| | TypeVar, |
| | Union, |
| | overload, |
| | ) |
| |
|
| | if sys.version_info >= (3, 11): |
| | from typing import TypeVarTuple, Unpack |
| | else: |
| | from typing_extensions import TypeVarTuple, Unpack |
| |
|
| | if sys.version_info >= (3, 10): |
| | from typing import TypeAlias |
| | else: |
| | from typing_extensions import TypeAlias |
| |
|
| | if TYPE_CHECKING: |
| | from _typeshed import FileDescriptorLike |
| |
|
| | from .._core._synchronization import CapacityLimiter, Event, Lock, Semaphore |
| | from .._core._tasks import CancelScope |
| | from .._core._testing import TaskInfo |
| | from ._sockets import ( |
| | ConnectedUDPSocket, |
| | ConnectedUNIXDatagramSocket, |
| | IPSockAddrType, |
| | SocketListener, |
| | SocketStream, |
| | UDPSocket, |
| | UNIXDatagramSocket, |
| | UNIXSocketStream, |
| | ) |
| | from ._subprocesses import Process |
| | from ._tasks import TaskGroup |
| | from ._testing import TestRunner |
| |
|
| | T_Retval = TypeVar("T_Retval") |
| | PosArgsT = TypeVarTuple("PosArgsT") |
| | StrOrBytesPath: TypeAlias = Union[str, bytes, "PathLike[str]", "PathLike[bytes]"] |
| |
|
| |
|
| | class AsyncBackend(metaclass=ABCMeta): |
| | @classmethod |
| | @abstractmethod |
| | def run( |
| | cls, |
| | func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]], |
| | args: tuple[Unpack[PosArgsT]], |
| | kwargs: dict[str, Any], |
| | options: dict[str, Any], |
| | ) -> T_Retval: |
| | """ |
| | Run the given coroutine function in an asynchronous event loop. |
| | |
| | The current thread must not be already running an event loop. |
| | |
| | :param func: a coroutine function |
| | :param args: positional arguments to ``func`` |
| | :param kwargs: positional arguments to ``func`` |
| | :param options: keyword arguments to call the backend ``run()`` implementation |
| | with |
| | :return: the return value of the coroutine function |
| | """ |
| |
|
| | @classmethod |
| | @abstractmethod |
| | def current_token(cls) -> object: |
| | """ |
| | Return an object that allows other threads to run code inside the event loop. |
| | |
| | :return: a token object, specific to the event loop running in the current |
| | thread |
| | """ |
| |
|
| | @classmethod |
| | @abstractmethod |
| | def current_time(cls) -> float: |
| | """ |
| | Return the current value of the event loop's internal clock. |
| | |
| | :return: the clock value (seconds) |
| | """ |
| |
|
| | @classmethod |
| | @abstractmethod |
| | def cancelled_exception_class(cls) -> type[BaseException]: |
| | """Return the exception class that is raised in a task if it's cancelled.""" |
| |
|
| | @classmethod |
| | @abstractmethod |
| | async def checkpoint(cls) -> None: |
| | """ |
| | Check if the task has been cancelled, and allow rescheduling of other tasks. |
| | |
| | This is effectively the same as running :meth:`checkpoint_if_cancelled` and then |
| | :meth:`cancel_shielded_checkpoint`. |
| | """ |
| |
|
| | @classmethod |
| | async def checkpoint_if_cancelled(cls) -> None: |
| | """ |
| | Check if the current task group has been cancelled. |
| | |
| | This will check if the task has been cancelled, but will not allow other tasks |
| | to be scheduled if not. |
| | |
| | """ |
| | if cls.current_effective_deadline() == -math.inf: |
| | await cls.checkpoint() |
| |
|
| | @classmethod |
| | async def cancel_shielded_checkpoint(cls) -> None: |
| | """ |
| | Allow the rescheduling of other tasks. |
| | |
| | This will give other tasks the opportunity to run, but without checking if the |
| | current task group has been cancelled, unlike with :meth:`checkpoint`. |
| | |
| | """ |
| | with cls.create_cancel_scope(shield=True): |
| | await cls.sleep(0) |
| |
|
| | @classmethod |
| | @abstractmethod |
| | async def sleep(cls, delay: float) -> None: |
| | """ |
| | Pause the current task for the specified duration. |
| | |
| | :param delay: the duration, in seconds |
| | """ |
| |
|
| | @classmethod |
| | @abstractmethod |
| | def create_cancel_scope( |
| | cls, *, deadline: float = math.inf, shield: bool = False |
| | ) -> CancelScope: |
| | pass |
| |
|
| | @classmethod |
| | @abstractmethod |
| | def current_effective_deadline(cls) -> float: |
| | """ |
| | Return the nearest deadline among all the cancel scopes effective for the |
| | current task. |
| | |
| | :return: |
| | - a clock value from the event loop's internal clock |
| | - ``inf`` if there is no deadline in effect |
| | - ``-inf`` if the current scope has been cancelled |
| | :rtype: float |
| | """ |
| |
|
| | @classmethod |
| | @abstractmethod |
| | def create_task_group(cls) -> TaskGroup: |
| | pass |
| |
|
| | @classmethod |
| | @abstractmethod |
| | def create_event(cls) -> Event: |
| | pass |
| |
|
| | @classmethod |
| | @abstractmethod |
| | def create_lock(cls, *, fast_acquire: bool) -> Lock: |
| | pass |
| |
|
| | @classmethod |
| | @abstractmethod |
| | def create_semaphore( |
| | cls, |
| | initial_value: int, |
| | *, |
| | max_value: int | None = None, |
| | fast_acquire: bool = False, |
| | ) -> Semaphore: |
| | pass |
| |
|
| | @classmethod |
| | @abstractmethod |
| | def create_capacity_limiter(cls, total_tokens: float) -> CapacityLimiter: |
| | pass |
| |
|
| | @classmethod |
| | @abstractmethod |
| | async def run_sync_in_worker_thread( |
| | cls, |
| | func: Callable[[Unpack[PosArgsT]], T_Retval], |
| | args: tuple[Unpack[PosArgsT]], |
| | abandon_on_cancel: bool = False, |
| | limiter: CapacityLimiter | None = None, |
| | ) -> T_Retval: |
| | pass |
| |
|
| | @classmethod |
| | @abstractmethod |
| | def check_cancelled(cls) -> None: |
| | pass |
| |
|
| | @classmethod |
| | @abstractmethod |
| | def run_async_from_thread( |
| | cls, |
| | func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]], |
| | args: tuple[Unpack[PosArgsT]], |
| | token: object, |
| | ) -> T_Retval: |
| | pass |
| |
|
| | @classmethod |
| | @abstractmethod |
| | def run_sync_from_thread( |
| | cls, |
| | func: Callable[[Unpack[PosArgsT]], T_Retval], |
| | args: tuple[Unpack[PosArgsT]], |
| | token: object, |
| | ) -> T_Retval: |
| | pass |
| |
|
| | @classmethod |
| | @abstractmethod |
| | async def open_process( |
| | cls, |
| | command: StrOrBytesPath | Sequence[StrOrBytesPath], |
| | *, |
| | stdin: int | IO[Any] | None, |
| | stdout: int | IO[Any] | None, |
| | stderr: int | IO[Any] | None, |
| | **kwargs: Any, |
| | ) -> Process: |
| | pass |
| |
|
| | @classmethod |
| | @abstractmethod |
| | def setup_process_pool_exit_at_shutdown(cls, workers: set[Process]) -> None: |
| | pass |
| |
|
| | @classmethod |
| | @abstractmethod |
| | async def connect_tcp( |
| | cls, host: str, port: int, local_address: IPSockAddrType | None = None |
| | ) -> SocketStream: |
| | pass |
| |
|
| | @classmethod |
| | @abstractmethod |
| | async def connect_unix(cls, path: str | bytes) -> UNIXSocketStream: |
| | pass |
| |
|
| | @classmethod |
| | @abstractmethod |
| | def create_tcp_listener(cls, sock: socket) -> SocketListener: |
| | pass |
| |
|
| | @classmethod |
| | @abstractmethod |
| | def create_unix_listener(cls, sock: socket) -> SocketListener: |
| | pass |
| |
|
| | @classmethod |
| | @abstractmethod |
| | async def create_udp_socket( |
| | cls, |
| | family: AddressFamily, |
| | local_address: IPSockAddrType | None, |
| | remote_address: IPSockAddrType | None, |
| | reuse_port: bool, |
| | ) -> UDPSocket | ConnectedUDPSocket: |
| | pass |
| |
|
| | @classmethod |
| | @overload |
| | async def create_unix_datagram_socket( |
| | cls, raw_socket: socket, remote_path: None |
| | ) -> UNIXDatagramSocket: ... |
| |
|
| | @classmethod |
| | @overload |
| | async def create_unix_datagram_socket( |
| | cls, raw_socket: socket, remote_path: str | bytes |
| | ) -> ConnectedUNIXDatagramSocket: ... |
| |
|
| | @classmethod |
| | @abstractmethod |
| | async def create_unix_datagram_socket( |
| | cls, raw_socket: socket, remote_path: str | bytes | None |
| | ) -> UNIXDatagramSocket | ConnectedUNIXDatagramSocket: |
| | pass |
| |
|
| | @classmethod |
| | @abstractmethod |
| | async def getaddrinfo( |
| | cls, |
| | host: bytes | str | None, |
| | port: str | int | None, |
| | *, |
| | family: int | AddressFamily = 0, |
| | type: int | SocketKind = 0, |
| | proto: int = 0, |
| | flags: int = 0, |
| | ) -> Sequence[ |
| | tuple[ |
| | AddressFamily, |
| | SocketKind, |
| | int, |
| | str, |
| | tuple[str, int] | tuple[str, int, int, int] | tuple[int, bytes], |
| | ] |
| | ]: |
| | pass |
| |
|
| | @classmethod |
| | @abstractmethod |
| | async def getnameinfo( |
| | cls, sockaddr: IPSockAddrType, flags: int = 0 |
| | ) -> tuple[str, str]: |
| | pass |
| |
|
| | @classmethod |
| | @abstractmethod |
| | async def wait_readable(cls, obj: FileDescriptorLike) -> None: |
| | pass |
| |
|
| | @classmethod |
| | @abstractmethod |
| | async def wait_writable(cls, obj: FileDescriptorLike) -> None: |
| | pass |
| |
|
| | @classmethod |
| | @abstractmethod |
| | def notify_closing(cls, obj: FileDescriptorLike) -> None: |
| | pass |
| |
|
| | @classmethod |
| | @abstractmethod |
| | async def wrap_listener_socket(cls, sock: socket) -> SocketListener: |
| | pass |
| |
|
| | @classmethod |
| | @abstractmethod |
| | async def wrap_stream_socket(cls, sock: socket) -> SocketStream: |
| | pass |
| |
|
| | @classmethod |
| | @abstractmethod |
| | async def wrap_unix_stream_socket(cls, sock: socket) -> UNIXSocketStream: |
| | pass |
| |
|
| | @classmethod |
| | @abstractmethod |
| | async def wrap_udp_socket(cls, sock: socket) -> UDPSocket: |
| | pass |
| |
|
| | @classmethod |
| | @abstractmethod |
| | async def wrap_connected_udp_socket(cls, sock: socket) -> ConnectedUDPSocket: |
| | pass |
| |
|
| | @classmethod |
| | @abstractmethod |
| | async def wrap_unix_datagram_socket(cls, sock: socket) -> UNIXDatagramSocket: |
| | pass |
| |
|
| | @classmethod |
| | @abstractmethod |
| | async def wrap_connected_unix_datagram_socket( |
| | cls, sock: socket |
| | ) -> ConnectedUNIXDatagramSocket: |
| | pass |
| |
|
| | @classmethod |
| | @abstractmethod |
| | def current_default_thread_limiter(cls) -> CapacityLimiter: |
| | pass |
| |
|
| | @classmethod |
| | @abstractmethod |
| | def open_signal_receiver( |
| | cls, *signals: Signals |
| | ) -> AbstractContextManager[AsyncIterator[Signals]]: |
| | pass |
| |
|
| | @classmethod |
| | @abstractmethod |
| | def get_current_task(cls) -> TaskInfo: |
| | pass |
| |
|
| | @classmethod |
| | @abstractmethod |
| | def get_running_tasks(cls) -> Sequence[TaskInfo]: |
| | pass |
| |
|
| | @classmethod |
| | @abstractmethod |
| | async def wait_all_tasks_blocked(cls) -> None: |
| | pass |
| |
|
| | @classmethod |
| | @abstractmethod |
| | def create_test_runner(cls, options: dict[str, Any]) -> TestRunner: |
| | pass |
| |
|