| | from __future__ import annotations |
| |
|
| | import errno |
| | import os |
| | import socket |
| | import ssl |
| | import stat |
| | import sys |
| | from collections.abc import Awaitable |
| | from dataclasses import dataclass |
| | from ipaddress import IPv4Address, IPv6Address, ip_address |
| | from os import PathLike, chmod |
| | from socket import AddressFamily, SocketKind |
| | from typing import TYPE_CHECKING, Any, Literal, cast, overload |
| |
|
| | from .. import ConnectionFailed, to_thread |
| | from ..abc import ( |
| | ByteStreamConnectable, |
| | ConnectedUDPSocket, |
| | ConnectedUNIXDatagramSocket, |
| | IPAddressType, |
| | IPSockAddrType, |
| | SocketListener, |
| | SocketStream, |
| | UDPSocket, |
| | UNIXDatagramSocket, |
| | UNIXSocketStream, |
| | ) |
| | from ..streams.stapled import MultiListener |
| | from ..streams.tls import TLSConnectable, TLSStream |
| | from ._eventloop import get_async_backend |
| | from ._resources import aclose_forcefully |
| | from ._synchronization import Event |
| | from ._tasks import create_task_group, move_on_after |
| |
|
| | if TYPE_CHECKING: |
| | from _typeshed import FileDescriptorLike |
| | else: |
| | FileDescriptorLike = object |
| |
|
| | if sys.version_info < (3, 11): |
| | from exceptiongroup import ExceptionGroup |
| |
|
| | if sys.version_info >= (3, 12): |
| | from typing import override |
| | else: |
| | from typing_extensions import override |
| |
|
| | if sys.version_info < (3, 13): |
| | from typing_extensions import deprecated |
| | else: |
| | from warnings import deprecated |
| |
|
# Use the socket module's IPPROTO_IPV6 constant when it exists; otherwise fall
# back to the literal value 41.
IPPROTO_IPV6 = getattr(socket, "IPPROTO_IPV6", 41)

# Address families accepted by functions where the family may be left unspecified
AnyIPAddressFamily = Literal[
    AddressFamily.AF_UNSPEC, AddressFamily.AF_INET, AddressFamily.AF_INET6
]
# Concrete IP address families only (AF_UNSPEC excluded)
IPAddressFamily = Literal[AddressFamily.AF_INET, AddressFamily.AF_INET6]
| |
|
| |
|
| | |
# Passing ``tls_hostname`` implies TLS
@overload
async def connect_tcp(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    local_host: IPAddressType | None = ...,
    ssl_context: ssl.SSLContext | None = ...,
    tls_standard_compatible: bool = ...,
    tls_hostname: str,
    happy_eyeballs_delay: float = ...,
) -> TLSStream: ...


# Passing ``ssl_context`` implies TLS
@overload
async def connect_tcp(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    local_host: IPAddressType | None = ...,
    ssl_context: ssl.SSLContext,
    tls_standard_compatible: bool = ...,
    tls_hostname: str | None = ...,
    happy_eyeballs_delay: float = ...,
) -> TLSStream: ...


# ``tls=True`` explicitly requests TLS
@overload
async def connect_tcp(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    local_host: IPAddressType | None = ...,
    tls: Literal[True],
    ssl_context: ssl.SSLContext | None = ...,
    tls_standard_compatible: bool = ...,
    tls_hostname: str | None = ...,
    happy_eyeballs_delay: float = ...,
) -> TLSStream: ...


# ``tls=False`` is typed as returning a plain socket stream
@overload
async def connect_tcp(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    local_host: IPAddressType | None = ...,
    tls: Literal[False],
    ssl_context: ssl.SSLContext | None = ...,
    tls_standard_compatible: bool = ...,
    tls_hostname: str | None = ...,
    happy_eyeballs_delay: float = ...,
) -> SocketStream: ...


# No TLS related arguments at all: plain socket stream
@overload
async def connect_tcp(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    local_host: IPAddressType | None = ...,
    happy_eyeballs_delay: float = ...,
) -> SocketStream: ...


async def connect_tcp(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    local_host: IPAddressType | None = None,
    tls: bool = False,
    ssl_context: ssl.SSLContext | None = None,
    tls_standard_compatible: bool = True,
    tls_hostname: str | None = None,
    happy_eyeballs_delay: float = 0.25,
) -> SocketStream | TLSStream:
    """
    Connect to a host using the TCP protocol.

    This function implements the stateless version of the Happy Eyeballs algorithm (RFC
    6555). If ``remote_host`` is a host name that resolves to multiple IP addresses,
    each one is tried until one connection attempt succeeds. If an attempt has not
    connected within ``happy_eyeballs_delay`` seconds (250 milliseconds by default),
    another attempt is started using the next address in the list, and so on. If the
    name resolves to both IPv6 and IPv4 addresses, an IPv6 address is tried first.

    When the connection has been established, a TLS handshake will be done if either
    ``ssl_context`` or ``tls_hostname`` is not ``None``, or if ``tls`` is ``True``.

    :param remote_host: the IP address or host name to connect to
    :param remote_port: port on the target host to connect to
    :param local_host: the interface address or name to bind the socket to before
        connecting
    :param tls: ``True`` to do a TLS handshake with the connected stream and return a
        :class:`~anyio.streams.tls.TLSStream` instead
    :param ssl_context: the SSL context object to use (if omitted, a default context is
        created)
    :param tls_standard_compatible: If ``True``, performs the TLS shutdown handshake
        before closing the stream and requires that the server does this as well.
        Otherwise, :exc:`~ssl.SSLEOFError` may be raised during reads from the stream.
        Some protocols, such as HTTP, require this option to be ``False``.
        See :meth:`~ssl.SSLContext.wrap_socket` for details.
    :param tls_hostname: host name to check the server certificate against (defaults to
        the value of ``remote_host``)
    :param happy_eyeballs_delay: delay (in seconds) before starting the next connection
        attempt
    :return: a socket stream object if no TLS handshake was done, otherwise a TLS stream
    :raises OSError: if all connection attempts fail (the individual error, or an
        exception group of them, is chained as the cause)

    """
    # Placed here due to https://github.com/python/mypy/issues/7057 style scoping:
    # the nested coroutine below assigns to this via ``nonlocal``
    connected_stream: SocketStream | None = None

    async def try_connect(remote_host: str, event: Event) -> None:
        nonlocal connected_stream
        try:
            stream = await asynclib.connect_tcp(remote_host, remote_port, local_address)
        except OSError as exc:
            oserrors.append(exc)
            return
        else:
            if connected_stream is None:
                # First successful attempt wins; cancel the remaining attempts
                connected_stream = stream
                tg.cancel_scope.cancel()
            else:
                # Another attempt already succeeded, so this stream is surplus
                await stream.aclose()
        finally:
            # Wake up the loop below so it can start the next attempt right away
            event.set()

    asynclib = get_async_backend()
    local_address: IPSockAddrType | None = None
    family = socket.AF_UNSPEC
    if local_host:
        gai_res = await getaddrinfo(str(local_host), None)
        family, *_, local_address = gai_res[0]

    target_host = str(remote_host)
    try:
        addr_obj = ip_address(remote_host)
    except ValueError:
        addr_obj = None

    if addr_obj is not None:
        # A literal IP address needs no name resolution
        if isinstance(addr_obj, IPv6Address):
            target_addrs = [(socket.AF_INET6, addr_obj.compressed)]
        else:
            target_addrs = [(socket.AF_INET, addr_obj.compressed)]
    else:
        gai_res = await getaddrinfo(
            target_host, remote_port, family=family, type=socket.SOCK_STREAM
        )

        # Organize the list so that the first address is an IPv6 address (if
        # available) and the second one is an IPv4 address. The rest keep the order
        # in which they were returned.
        v6_found = v4_found = False
        target_addrs = []
        for af, *_, sa in gai_res:
            if af == socket.AF_INET6 and not v6_found:
                v6_found = True
                target_addrs.insert(0, (af, sa[0]))
            elif af == socket.AF_INET and not v4_found and v6_found:
                v4_found = True
                target_addrs.insert(1, (af, sa[0]))
            else:
                target_addrs.append((af, sa[0]))

    oserrors: list[OSError] = []
    try:
        async with create_task_group() as tg:
            for _af, addr in target_addrs:
                event = Event()
                tg.start_soon(try_connect, addr, event)
                # Wait for this attempt to finish, but no longer than the delay
                with move_on_after(happy_eyeballs_delay):
                    await event.wait()

        if connected_stream is None:
            failure = OSError("All connection attempts failed")
            if not oserrors:
                # Nothing was attempted (no candidate addresses). An ExceptionGroup
                # cannot be built from an empty list, so raise without a cause.
                raise failure

            cause = (
                oserrors[0]
                if len(oserrors) == 1
                else ExceptionGroup("multiple connection attempts failed", oserrors)
            )
            raise failure from cause
    finally:
        # Drop the references to the collected exceptions
        oserrors.clear()

    if tls or tls_hostname or ssl_context:
        try:
            return await TLSStream.wrap(
                connected_stream,
                server_side=False,
                hostname=tls_hostname or str(remote_host),
                ssl_context=ssl_context,
                standard_compatible=tls_standard_compatible,
            )
        except BaseException:
            # Don't leak the TCP connection if the handshake fails or is cancelled
            await aclose_forcefully(connected_stream)
            raise

    return connected_stream
| |
|
| |
|
async def connect_unix(path: str | bytes | PathLike[Any]) -> UNIXSocketStream:
    """
    Open a stream connection to a UNIX socket.

    Not available on Windows.

    :param path: file system path of the socket to connect to
    :return: a socket stream object
    :raises ConnectionFailed: if the connection fails

    """
    # Normalize path-like objects to str/bytes before handing off to the backend
    socket_path = os.fspath(path)
    backend = get_async_backend()
    return await backend.connect_unix(socket_path)
| |
|
| |
|
async def create_tcp_listener(
    *,
    local_host: IPAddressType | None = None,
    local_port: int = 0,
    family: AnyIPAddressFamily = socket.AddressFamily.AF_UNSPEC,
    backlog: int = 65536,
    reuse_port: bool = False,
) -> MultiListener[SocketStream]:
    """
    Create a TCP socket listener.

    :param local_port: port number to listen on
    :param local_host: IP address of the interface to listen on. If omitted, listen on
        all IPv4 and IPv6 interfaces. To listen on all interfaces on a specific address
        family, use ``0.0.0.0`` for IPv4 or ``::`` for IPv6.
    :param family: address family (used if ``local_host`` was omitted)
    :param backlog: maximum number of queued incoming connections (up to a maximum of
        2**16, or 65536; larger values are capped)
    :param reuse_port: ``True`` to allow multiple sockets to bind to the same
        address/port (not supported on Windows)
    :return: a multi-listener object containing one or more socket listeners
    :raises OSError: if there's an error creating a socket, binding to one or more
        interfaces failed, or the address lookup produced no stream socket addresses

    """
    asynclib = get_async_backend()
    backlog = min(backlog, 65536)
    local_host = str(local_host) if local_host is not None else None

    def setup_raw_socket(
        fam: AddressFamily,
        bind_addr: tuple[str, int] | tuple[str, int, int, int],
        *,
        v6only: bool = True,
    ) -> socket.socket:
        # Create, configure, bind and start listening on a non-blocking socket.
        # The socket is closed if any of these steps fails.
        sock = socket.socket(fam)
        try:
            sock.setblocking(False)

            if fam == AddressFamily.AF_INET6:
                sock.setsockopt(IPPROTO_IPV6, socket.IPV6_V6ONLY, v6only)

            # On Windows, enable exclusive address use. Elsewhere, enable address
            # reuse.
            if sys.platform == "win32":
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
            else:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

            if reuse_port:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)

            # Expand an "address%scope" host into the 4-tuple form with a numeric
            # scope ID before binding
            if fam == socket.AF_INET6 and "%" in bind_addr[0]:
                addr, scope_id = bind_addr[0].split("%", 1)
                bind_addr = (addr, bind_addr[1], 0, int(scope_id))

            sock.bind(bind_addr)
            sock.listen(backlog)
        except BaseException:
            sock.close()
            raise

        return sock

    # On non-Windows platforms the lookup is made with type=0 (any socket type) and
    # the results are filtered for SOCK_STREAM below
    gai_res = await getaddrinfo(
        local_host,
        local_port,
        family=family,
        type=socket.SOCK_STREAM if sys.platform == "win32" else 0,
        flags=socket.AI_PASSIVE | socket.AI_ADDRCONFIG,
    )

    # The set comprehension removes any duplicates produced by getaddrinfo(), and
    # sorting gives a deterministic bind order
    sockaddrs = sorted({res for res in gai_res if res[1] == SocketKind.SOCK_STREAM})

    # Special case for dual-stack binding on the "any" interface: a single IPv6
    # socket with IPV6_V6ONLY turned off serves both address families
    if (
        local_host is None
        and family == AddressFamily.AF_UNSPEC
        and socket.has_dualstack_ipv6()
        and any(fam == AddressFamily.AF_INET6 for fam, *_ in gai_res)
    ):
        raw_socket = setup_raw_socket(
            AddressFamily.AF_INET6, ("::", local_port), v6only=False
        )
        listener = asynclib.create_tcp_listener(raw_socket)
        return MultiListener([listener])

    if not sockaddrs:
        # Without this guard the loop below would not run at all and the final
        # ``raise`` would fail with ValueError while building an empty
        # ExceptionGroup.
        raise OSError("Address lookup returned no stream socket addresses to bind to")

    errors: list[OSError] = []
    try:
        # One attempt per address: each retry rotates the address list (see below),
        # so the count is taken up front instead of iterating the list itself
        for _ in range(len(sockaddrs)):
            listeners: list[SocketListener] = []
            bound_ephemeral_port = local_port
            try:
                for fam, *_, sockaddr in sockaddrs:
                    sockaddr = sockaddr[0], bound_ephemeral_port, *sockaddr[2:]
                    raw_socket = setup_raw_socket(fam, sockaddr)

                    # Store the assigned port if an ephemeral port was requested, so
                    # the same port is used for the remaining addresses
                    if local_port == 0 and len(gai_res) > 1:
                        bound_ephemeral_port = raw_socket.getsockname()[1]

                    listeners.append(asynclib.create_tcp_listener(raw_socket))
            except BaseException as exc:
                for listener in listeners:
                    await listener.aclose()

                # If an ephemeral port was requested but the port assigned to an
                # earlier socket was already in use on another address, rotate the
                # address list and try again
                if (
                    isinstance(exc, OSError)
                    and exc.errno == errno.EADDRINUSE
                    and local_port == 0
                    and bound_ephemeral_port
                ):
                    errors.append(exc)
                    sockaddrs.append(sockaddrs.pop(0))
                    continue

                raise

            return MultiListener(listeners)

        # Every attempt ended with EADDRINUSE, so ``errors`` is non-empty here
        raise OSError(
            f"Could not create {len(sockaddrs)} listeners with a consistent port"
        ) from ExceptionGroup("Several bind attempts failed", errors)
    finally:
        # Drop the local reference to the collected exceptions
        del errors
| |
|
| |
|
async def create_unix_listener(
    path: str | bytes | PathLike[Any],
    *,
    mode: int | None = None,
    backlog: int = 65536,
) -> SocketListener:
    """
    Create a listener for incoming UNIX socket connections.

    Not available on Windows.

    :param path: file system path of the listening socket
    :param mode: permissions to apply to the socket
    :param backlog: maximum number of queued incoming connections (capped at 2**16,
        or 65536)
    :return: a listener object

    .. versionchanged:: 3.0
        If a socket already exists on the file system in the given path, it will be
        removed first.

    """
    capped_backlog = min(backlog, 65536)
    sock = await setup_unix_local_socket(path, mode, socket.SOCK_STREAM)
    try:
        sock.listen(capped_backlog)
        listener = get_async_backend().create_unix_listener(sock)
    except BaseException:
        # Don't leak the bound socket if listening or wrapping fails
        sock.close()
        raise
    else:
        return listener
| |
|
| |
|
async def create_udp_socket(
    family: AnyIPAddressFamily = AddressFamily.AF_UNSPEC,
    *,
    local_host: IPAddressType | None = None,
    local_port: int = 0,
    reuse_port: bool = False,
) -> UDPSocket:
    """
    Create a UDP socket.

    If ``local_port`` has been given, the socket will be bound to this port on the
    local machine, making this socket suitable for providing UDP based services. When
    ``local_host`` is omitted, the socket is bound to the wildcard address of the
    given address family.

    :param family: address family (``AF_INET`` or ``AF_INET6``) – automatically
        determined from ``local_host`` if omitted
    :param local_host: IP address or host name of the local interface to bind to
    :param local_port: local port to bind to
    :param reuse_port: ``True`` to allow multiple sockets to bind to the same
        address/port (not supported on Windows)
    :return: a UDP socket
    :raises ValueError: if neither ``family`` nor ``local_host`` was given

    """
    # Compare by value so that a plain integer family (e.g. 0) is treated the same
    # as the corresponding AddressFamily member
    if family == AddressFamily.AF_UNSPEC and not local_host:
        raise ValueError('Either "family" or "local_host" must be given')

    if local_host:
        gai_res = await getaddrinfo(
            str(local_host),
            local_port,
            family=family,
            type=socket.SOCK_DGRAM,
            flags=socket.AI_PASSIVE | socket.AI_ADDRCONFIG,
        )
        family = cast(AnyIPAddressFamily, gai_res[0][0])
        local_address = gai_res[0][-1]
    elif family == AddressFamily.AF_INET6:
        # No host given: bind to the IPv6 wildcard address, honoring local_port
        local_address = ("::", local_port)
    else:
        # No host given: bind to the IPv4 wildcard address, honoring local_port
        local_address = ("0.0.0.0", local_port)

    sock = await get_async_backend().create_udp_socket(
        family, local_address, None, reuse_port
    )
    return cast(UDPSocket, sock)
| |
|
| |
|
async def create_connected_udp_socket(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    family: AnyIPAddressFamily = AddressFamily.AF_UNSPEC,
    local_host: IPAddressType | None = None,
    local_port: int = 0,
    reuse_port: bool = False,
) -> ConnectedUDPSocket:
    """
    Create a connected UDP socket.

    Connected UDP sockets can only communicate with the specified remote host/port, and
    any packets sent from other sources are dropped.

    :param remote_host: remote host to set as the default target
    :param remote_port: port on the remote host to set as the default target
    :param family: address family (``AF_INET`` or ``AF_INET6``) – automatically
        determined from ``local_host`` or ``remote_host`` if omitted
    :param local_host: IP address or host name of the local interface to bind to
    :param local_port: local port to bind to (only used when ``local_host`` is given)
    :param reuse_port: ``True`` to allow multiple sockets to bind to the same
        address/port (not supported on Windows)
    :return: a connected UDP socket

    """
    bind_address = None
    if local_host:
        local_infos = await getaddrinfo(
            str(local_host),
            local_port,
            family=family,
            type=socket.SOCK_DGRAM,
            flags=socket.AI_PASSIVE | socket.AI_ADDRCONFIG,
        )
        first_local = local_infos[0]
        # The family of the local address constrains the remote lookup below
        family = cast(AnyIPAddressFamily, first_local[0])
        bind_address = first_local[-1]

    remote_infos = await getaddrinfo(
        str(remote_host), remote_port, family=family, type=socket.SOCK_DGRAM
    )
    first_remote = remote_infos[0]
    family = cast(AnyIPAddressFamily, first_remote[0])
    peer_address = first_remote[-1]

    backend = get_async_backend()
    udp_socket = await backend.create_udp_socket(
        family, bind_address, peer_address, reuse_port
    )
    return cast(ConnectedUDPSocket, udp_socket)
| |
|
| |
|
async def create_unix_datagram_socket(
    *,
    local_path: None | str | bytes | PathLike[Any] = None,
    local_mode: int | None = None,
) -> UNIXDatagramSocket:
    """
    Create an unconnected UNIX datagram socket.

    Not available on Windows.

    If ``local_path`` has been given, the socket will be bound to this path, making this
    socket suitable for receiving datagrams from other processes. Other processes can
    send datagrams to this socket only if ``local_path`` is set.

    If a socket already exists on the file system in the ``local_path``, it will be
    removed first.

    :param local_path: the path on which to bind to
    :param local_mode: permissions to set on the local socket
    :return: a UNIX datagram socket

    """
    sock = await setup_unix_local_socket(local_path, local_mode, socket.SOCK_DGRAM)
    backend = get_async_backend()
    # ``None`` as the remote path leaves the socket unconnected
    return await backend.create_unix_datagram_socket(sock, None)
| |
|
| |
|
async def create_connected_unix_datagram_socket(
    remote_path: str | bytes | PathLike[Any],
    *,
    local_path: None | str | bytes | PathLike[Any] = None,
    local_mode: int | None = None,
) -> ConnectedUNIXDatagramSocket:
    """
    Create a UNIX datagram socket connected to the given remote path.

    Connected datagram sockets can only communicate with the specified remote path.

    If ``local_path`` has been given, the socket will be bound to this path, making
    this socket suitable for receiving datagrams from other processes. Other processes
    can send datagrams to this socket only if ``local_path`` is set.

    If a socket already exists on the file system in the ``local_path``, it will be
    removed first.

    :param remote_path: the path to set as the default target
    :param local_path: the path on which to bind to
    :param local_mode: permissions to set on the local socket
    :return: a connected UNIX datagram socket

    """
    # Normalize the target first so an invalid path fails before a socket is created
    target_path = os.fspath(remote_path)
    sock = await setup_unix_local_socket(local_path, local_mode, socket.SOCK_DGRAM)
    backend = get_async_backend()
    return await backend.create_unix_datagram_socket(sock, target_path)
| |
|
| |
|
async def getaddrinfo(
    host: bytes | str | None,
    port: str | int | None,
    *,
    family: int | AddressFamily = 0,
    type: int | SocketKind = 0,
    proto: int = 0,
    flags: int = 0,
) -> list[tuple[AddressFamily, SocketKind, int, str, tuple[str, int]]]:
    """
    Look up a numeric IP address given a host name.

    Internationalized domain names are translated according to the (non-transitional)
    IDNA 2008 standard.

    .. note:: 4-tuple IPv6 socket addresses are automatically converted to 2-tuples of
        (host, port), unlike what :func:`socket.getaddrinfo` does.

    :param host: host name
    :param port: port number
    :param family: socket family (``AF_INET``, ...)
    :param type: socket type (``SOCK_STREAM``, ...)
    :param proto: protocol number
    :param flags: flags to pass to upstream ``getaddrinfo()``
    :return: list of tuples containing (family, type, proto, canonname, sockaddr)

    .. seealso:: :func:`socket.getaddrinfo`

    """
    encoded_host: bytes | None
    if not isinstance(host, str):
        # bytes and None are passed through untouched
        encoded_host = host
    else:
        try:
            encoded_host = host.encode("ascii")
        except UnicodeEncodeError:
            # Only non-ASCII host names need IDNA encoding, so the import is deferred
            import idna

            encoded_host = idna.encode(host, uts46=True)

    raw_results = await get_async_backend().getaddrinfo(
        encoded_host, port, family=family, type=type, proto=proto, flags=flags
    )

    converted: list[tuple[AddressFamily, SocketKind, int, str, tuple[str, int]]] = []
    for res_family, res_type, res_proto, canonname, sockaddr in raw_results:
        # Skip results whose address component is an integer instead of a string
        if isinstance(sockaddr[0], int):
            continue

        converted.append(
            (
                res_family,
                res_type,
                res_proto,
                canonname,
                convert_ipv6_sockaddr(sockaddr),
            )
        )

    return converted
| |
|
| |
|
def getnameinfo(sockaddr: IPSockAddrType, flags: int = 0) -> Awaitable[tuple[str, str]]:
    """
    Resolve a socket address into a host name and a service name.

    :param sockaddr: socket address (e.g. (ipaddress, port) for IPv4)
    :param flags: flags to pass to upstream ``getnameinfo()``
    :return: a tuple of (host name, service name)
    :raises NoEventLoopError: if no supported asynchronous event loop is running in the
        current thread

    .. seealso:: :func:`socket.getnameinfo`

    """
    backend = get_async_backend()
    return backend.getnameinfo(sockaddr, flags)
| |
|
| |
|
@deprecated("This function is deprecated; use `wait_readable` instead")
def wait_socket_readable(sock: socket.socket) -> Awaitable[None]:
    """
    .. deprecated:: 4.7.0
       Use :func:`wait_readable` instead.

    Wait until the given socket has data to be read.

    .. warning:: Only use this on raw sockets that have not been wrapped by any higher
        level constructs like socket streams!

    :param sock: a socket object
    :raises ~anyio.ClosedResourceError: if the socket was closed while waiting for the
        socket to become readable
    :raises ~anyio.BusyResourceError: if another task is already waiting for the socket
        to become readable
    :raises NoEventLoopError: if no supported asynchronous event loop is running in the
        current thread

    """
    backend = get_async_backend()
    # The backend is handed the socket's file descriptor rather than the socket itself
    return backend.wait_readable(sock.fileno())
| |
|
| |
|
@deprecated("This function is deprecated; use `wait_writable` instead")
def wait_socket_writable(sock: socket.socket) -> Awaitable[None]:
    """
    .. deprecated:: 4.7.0
       Use :func:`wait_writable` instead.

    Wait until the given socket can be written to.

    This does **NOT** work on Windows when using the asyncio backend with a proactor
    event loop (default on py3.8+).

    .. warning:: Only use this on raw sockets that have not been wrapped by any higher
        level constructs like socket streams!

    :param sock: a socket object
    :raises ~anyio.ClosedResourceError: if the socket was closed while waiting for the
        socket to become writable
    :raises ~anyio.BusyResourceError: if another task is already waiting for the socket
        to become writable
    :raises NoEventLoopError: if no supported asynchronous event loop is running in the
        current thread

    """
    backend = get_async_backend()
    # The backend is handed the socket's file descriptor rather than the socket itself
    return backend.wait_writable(sock.fileno())
| |
|
| |
|
def wait_readable(obj: FileDescriptorLike) -> Awaitable[None]:
    """
    Wait until the given object has data to be read.

    On Unix systems, ``obj`` must either be an integer file descriptor, or else an
    object with a ``.fileno()`` method which returns an integer file descriptor. Any
    kind of file descriptor can be passed, though the exact semantics will depend on
    your kernel. For example, this probably won't do anything useful for on-disk files.

    On Windows systems, ``obj`` must either be an integer ``SOCKET`` handle, or else an
    object with a ``.fileno()`` method which returns an integer ``SOCKET`` handle. File
    descriptors aren't supported, and neither are handles that refer to anything besides
    a ``SOCKET``.

    On backends where this functionality is not natively provided (asyncio
    ``ProactorEventLoop`` on Windows), it is provided using a separate selector thread
    which is set to shut down when the interpreter shuts down.

    .. warning:: Don't use this on raw sockets that have been wrapped by any higher
        level constructs like socket streams!

    :param obj: an object with a ``.fileno()`` method or an integer handle
    :raises ~anyio.ClosedResourceError: if the object was closed while waiting for the
        object to become readable
    :raises ~anyio.BusyResourceError: if another task is already waiting for the object
        to become readable
    :raises NoEventLoopError: if no supported asynchronous event loop is running in the
        current thread

    """
    backend = get_async_backend()
    return backend.wait_readable(obj)
| |
|
| |
|
def wait_writable(obj: FileDescriptorLike) -> Awaitable[None]:
    """
    Wait until the given object can be written to.

    :param obj: an object with a ``.fileno()`` method or an integer handle
    :raises ~anyio.ClosedResourceError: if the object was closed while waiting for the
        object to become writable
    :raises ~anyio.BusyResourceError: if another task is already waiting for the object
        to become writable
    :raises NoEventLoopError: if no supported asynchronous event loop is running in the
        current thread

    .. seealso:: See the documentation of :func:`wait_readable` for the definition of
       ``obj`` and notes on backend compatibility.

    .. warning:: Don't use this on raw sockets that have been wrapped by any higher
        level constructs like socket streams!

    """
    backend = get_async_backend()
    return backend.wait_writable(obj)
| |
|
| |
|
def notify_closing(obj: FileDescriptorLike) -> None:
    """
    Call this before closing a file descriptor (on Unix) or socket (on
    Windows). This will cause any `wait_readable` or `wait_writable`
    calls on the given object to immediately wake up and raise
    `~anyio.ClosedResourceError`.

    This doesn't actually close the object – you still have to do that
    yourself afterwards. Also, you want to be careful to make sure no
    new tasks start waiting on the object in between when you call this
    and when it's actually closed. So to close something properly, you
    usually want to do these steps in order:

    1. Explicitly mark the object as closed, so that any new attempts
       to use it will abort before they start.
    2. Call `notify_closing` to wake up any already-existing users.
    3. Actually close the object.

    It's also possible to do them in a different order if that's more
    convenient, *but only if* you make sure not to have any checkpoints in
    between the steps. This way they all happen in a single atomic
    step, so other tasks won't be able to tell what order they happened
    in anyway.

    :param obj: an object with a ``.fileno()`` method or an integer handle
    :raises NoEventLoopError: if no supported asynchronous event loop is running in the
        current thread

    """
    backend = get_async_backend()
    backend.notify_closing(obj)
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
def convert_ipv6_sockaddr(
    sockaddr: tuple[str, int, int, int] | tuple[str, int],
) -> tuple[str, int]:
    """
    Reduce a 4-tuple IPv6 socket address to the 2-tuple (address, port) form.

    A nonzero scope ID is appended to the address after a ``%`` separator, replacing
    any ``%`` suffix the address already carries. With a zero scope ID, the flow info
    and scope ID are simply dropped. Anything that is not a 4-tuple is returned as-is.

    :param sockaddr: the result of :meth:`~socket.socket.getsockname`
    :return: the converted socket address

    """
    is_ipv6_form = isinstance(sockaddr, tuple) and len(sockaddr) == 4
    if not is_ipv6_form:
        return sockaddr

    host, port, _flowinfo, scope_id = sockaddr
    if not scope_id:
        return host, port

    # Discard whatever follows "%" in the host part; only the numeric scope ID from
    # the tuple is used
    bare_host = host.split("%", 1)[0]
    return f"{bare_host}%{scope_id}", port
| |
|
| |
|
async def setup_unix_local_socket(
    path: None | str | bytes | PathLike[Any],
    mode: int | None,
    socktype: int,
) -> socket.socket:
    """
    Create a non-blocking UNIX socket, optionally bound to a file system path.

    If ``path`` is given and a socket already exists there, that socket file is
    deleted before binding. The bind (and the permission change, if ``mode`` is given)
    is run in a worker thread.

    Not available on Windows.

    :param path: path of the socket, or ``None`` to leave the socket unbound
    :param mode: permissions to set on the socket
    :param socktype: socket.SOCK_STREAM or socket.SOCK_DGRAM
    :return: the raw socket object

    """
    bind_path: str | None = None
    if path is not None:
        bind_path = os.fsdecode(path)

        # Paths starting with a NUL byte are neither stat'ed nor unlinked
        if not bind_path.startswith("\0"):
            try:
                existing = os.stat(path)
            except OSError as exc:
                # These errno values are treated as "nothing exists at this path"
                if exc.errno not in (
                    errno.ENOENT,
                    errno.ENOTDIR,
                    errno.EBADF,
                    errno.ELOOP,
                ):
                    raise
            else:
                # Only remove the existing entry if it is a socket
                if stat.S_ISSOCK(existing.st_mode):
                    os.unlink(path)

    sock = socket.socket(socket.AF_UNIX, socktype)
    sock.setblocking(False)

    if bind_path is None:
        return sock

    try:
        await to_thread.run_sync(sock.bind, bind_path, abandon_on_cancel=True)
        if mode is not None:
            await to_thread.run_sync(chmod, bind_path, mode, abandon_on_cancel=True)
    except BaseException:
        # Don't leak the socket if binding or chmod fails or is cancelled
        sock.close()
        raise

    return sock
| |
|
| |
|
@dataclass
class TCPConnectable(ByteStreamConnectable):
    """
    A connectable that opens TCP connections to a fixed host and port.

    :param host: host name or IP address of the server
    :param port: TCP port number of the server (1-65535)
    """

    host: str | IPv4Address | IPv6Address
    port: int

    def __post_init__(self) -> None:
        # Reject port numbers outside the 1..65535 range at construction time
        if self.port > 65535 or self.port < 1:
            raise ValueError("TCP port number out of range")

    @override
    async def connect(self) -> SocketStream:
        try:
            stream = await connect_tcp(self.host, self.port)
        except OSError as exc:
            # Re-raise as ConnectionFailed, keeping the original error as the cause
            message = f"error connecting to {self.host}:{self.port}: {exc}"
            raise ConnectionFailed(message) from exc

        return stream
| |
|
| |
|
@dataclass
class UNIXConnectable(ByteStreamConnectable):
    """
    A connectable that opens connections to a UNIX domain socket at a fixed path.

    :param path: the file system path of the socket
    """

    path: str | bytes | PathLike[str] | PathLike[bytes]

    @override
    async def connect(self) -> UNIXSocketStream:
        try:
            stream = await connect_unix(self.path)
        except OSError as exc:
            # Re-raise as ConnectionFailed, keeping the original error as the cause
            message = f"error connecting to {self.path!r}: {exc}"
            raise ConnectionFailed(message) from exc

        return stream
| |
|
| |
|
def as_connectable(
    remote: ByteStreamConnectable
    | tuple[str | IPv4Address | IPv6Address, int]
    | str
    | bytes
    | PathLike[str],
    /,
    *,
    tls: bool = False,
    ssl_context: ssl.SSLContext | None = None,
    tls_hostname: str | None = None,
    tls_standard_compatible: bool = True,
) -> ByteStreamConnectable:
    """
    Return a byte stream connectable from the given object.

    If a bytestream connectable is given, it is returned unchanged (the TLS options
    are not applied to it).
    If a tuple of (host, port) is given, a TCP connectable is returned.
    If a string, bytes or path-like object is given, a UNIX connectable is returned.

    If ``tls=True``, the connectable will be wrapped in a
    :class:`~.streams.tls.TLSConnectable`.

    :param remote: a connectable, a tuple of (host, port) or a path to a UNIX socket
    :param tls: if ``True``, wrap the plaintext connectable in a
        :class:`~.streams.tls.TLSConnectable`, using the provided TLS settings
    :param ssl_context: if ``tls=True``, the SSLContext object to use (if not provided,
        a secure default will be created)
    :param tls_hostname: if ``tls=True``, host name of the server to use for checking
        the server certificate (defaults to the host portion of the address for TCP
        connectables)
    :param tls_standard_compatible: if ``False`` and ``tls=True``, makes the TLS stream
        skip the closing handshake when closing the connection, so it won't raise an
        exception if the server does the same
    :raises TypeError: if ``remote`` cannot be converted to a connectable

    """
    if isinstance(remote, ByteStreamConnectable):
        return remote

    plain: TCPConnectable | UNIXConnectable
    if isinstance(remote, tuple) and len(remote) == 2:
        plain = TCPConnectable(*remote)
    elif isinstance(remote, (str, bytes, PathLike)):
        plain = UNIXConnectable(remote)
    else:
        raise TypeError(f"cannot convert {remote!r} to a connectable")

    if not tls:
        return plain

    # Without an explicit host name, fall back to the TCP target's host
    hostname = tls_hostname
    if not hostname and isinstance(plain, TCPConnectable):
        hostname = str(plain.host)

    return TLSConnectable(
        plain,
        ssl_context=ssl_context,
        hostname=hostname,
        standard_compatible=tls_standard_compatible,
    )
| |
|