| | from __future__ import annotations |
| |
|
| | import errno |
| | import socket |
| | import sys |
| | from abc import abstractmethod |
| | from collections.abc import Callable, Collection, Mapping |
| | from contextlib import AsyncExitStack |
| | from io import IOBase |
| | from ipaddress import IPv4Address, IPv6Address |
| | from socket import AddressFamily |
| | from typing import Any, TypeVar, Union |
| |
|
| | from .._core._eventloop import get_async_backend |
| | from .._core._typedattr import ( |
| | TypedAttributeProvider, |
| | TypedAttributeSet, |
| | typed_attribute, |
| | ) |
| | from ._streams import ByteStream, Listener, UnreliableObjectStream |
| | from ._tasks import TaskGroup |
| |
|
| | if sys.version_info >= (3, 10): |
| | from typing import TypeAlias |
| | else: |
| | from typing_extensions import TypeAlias |
| |
|
| | IPAddressType: TypeAlias = Union[str, IPv4Address, IPv6Address] |
| | IPSockAddrType: TypeAlias = tuple[str, int] |
| | SockAddrType: TypeAlias = Union[IPSockAddrType, str] |
| | UDPPacketType: TypeAlias = tuple[bytes, IPSockAddrType] |
| | UNIXDatagramPacketType: TypeAlias = tuple[bytes, str] |
| | T_Retval = TypeVar("T_Retval") |
| |
|
| |
|
| | def _validate_socket( |
| | sock_or_fd: socket.socket | int, |
| | sock_type: socket.SocketKind, |
| | addr_family: socket.AddressFamily = socket.AF_UNSPEC, |
| | *, |
| | require_connected: bool = False, |
| | require_bound: bool = False, |
| | ) -> socket.socket: |
| | if isinstance(sock_or_fd, int): |
| | try: |
| | sock = socket.socket(fileno=sock_or_fd) |
| | except OSError as exc: |
| | if exc.errno == errno.ENOTSOCK: |
| | raise ValueError( |
| | "the file descriptor does not refer to a socket" |
| | ) from exc |
| | elif require_connected: |
| | raise ValueError("the socket must be connected") from exc |
| | elif require_bound: |
| | raise ValueError("the socket must be bound to a local address") from exc |
| | else: |
| | raise |
| | elif isinstance(sock_or_fd, socket.socket): |
| | sock = sock_or_fd |
| | else: |
| | raise TypeError( |
| | f"expected an int or socket, got {type(sock_or_fd).__qualname__} instead" |
| | ) |
| |
|
| | try: |
| | if require_connected: |
| | try: |
| | sock.getpeername() |
| | except OSError as exc: |
| | raise ValueError("the socket must be connected") from exc |
| |
|
| | if require_bound: |
| | try: |
| | if sock.family in (socket.AF_INET, socket.AF_INET6): |
| | bound_addr = sock.getsockname()[1] |
| | else: |
| | bound_addr = sock.getsockname() |
| | except OSError: |
| | bound_addr = None |
| |
|
| | if not bound_addr: |
| | raise ValueError("the socket must be bound to a local address") |
| |
|
| | if addr_family != socket.AF_UNSPEC and sock.family != addr_family: |
| | raise ValueError( |
| | f"address family mismatch: expected {addr_family.name}, got " |
| | f"{sock.family.name}" |
| | ) |
| |
|
| | if sock.type != sock_type: |
| | raise ValueError( |
| | f"socket type mismatch: expected {sock_type.name}, got {sock.type.name}" |
| | ) |
| | except BaseException: |
| | |
| | if isinstance(sock_or_fd, int): |
| | sock.detach() |
| |
|
| | raise |
| |
|
| | sock.setblocking(False) |
| | return sock |
| |
|
| |
|
| | class SocketAttribute(TypedAttributeSet): |
| | """ |
| | .. attribute:: family |
| | :type: socket.AddressFamily |
| | |
| | the address family of the underlying socket |
| | |
| | .. attribute:: local_address |
| | :type: tuple[str, int] | str |
| | |
| | the local address the underlying socket is connected to |
| | |
| | .. attribute:: local_port |
| | :type: int |
| | |
| | for IP based sockets, the local port the underlying socket is bound to |
| | |
| | .. attribute:: raw_socket |
| | :type: socket.socket |
| | |
| | the underlying stdlib socket object |
| | |
| | .. attribute:: remote_address |
| | :type: tuple[str, int] | str |
| | |
| | the remote address the underlying socket is connected to |
| | |
| | .. attribute:: remote_port |
| | :type: int |
| | |
| | for IP based sockets, the remote port the underlying socket is connected to |
| | """ |
| |
|
| | family: AddressFamily = typed_attribute() |
| | local_address: SockAddrType = typed_attribute() |
| | local_port: int = typed_attribute() |
| | raw_socket: socket.socket = typed_attribute() |
| | remote_address: SockAddrType = typed_attribute() |
| | remote_port: int = typed_attribute() |
| |
|
| |
|
| | class _SocketProvider(TypedAttributeProvider): |
| | @property |
| | def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]: |
| | from .._core._sockets import convert_ipv6_sockaddr as convert |
| |
|
| | attributes: dict[Any, Callable[[], Any]] = { |
| | SocketAttribute.family: lambda: self._raw_socket.family, |
| | SocketAttribute.local_address: lambda: convert( |
| | self._raw_socket.getsockname() |
| | ), |
| | SocketAttribute.raw_socket: lambda: self._raw_socket, |
| | } |
| | try: |
| | peername: tuple[str, int] | None = convert(self._raw_socket.getpeername()) |
| | except OSError: |
| | peername = None |
| |
|
| | |
| | if peername is not None: |
| | attributes[SocketAttribute.remote_address] = lambda: peername |
| |
|
| | |
| | if self._raw_socket.family in (AddressFamily.AF_INET, AddressFamily.AF_INET6): |
| | attributes[SocketAttribute.local_port] = ( |
| | lambda: self._raw_socket.getsockname()[1] |
| | ) |
| | if peername is not None: |
| | remote_port = peername[1] |
| | attributes[SocketAttribute.remote_port] = lambda: remote_port |
| |
|
| | return attributes |
| |
|
| | @property |
| | @abstractmethod |
| | def _raw_socket(self) -> socket.socket: |
| | pass |
| |
|
| |
|
| | class SocketStream(ByteStream, _SocketProvider): |
| | """ |
| | Transports bytes over a socket. |
| | |
| | Supports all relevant extra attributes from :class:`~SocketAttribute`. |
| | """ |
| |
|
| | @classmethod |
| | async def from_socket(cls, sock_or_fd: socket.socket | int) -> SocketStream: |
| | """ |
| | Wrap an existing socket object or file descriptor as a socket stream. |
| | |
| | The newly created socket wrapper takes ownership of the socket being passed in. |
| | The existing socket must already be connected. |
| | |
| | :param sock_or_fd: a socket object or file descriptor |
| | :return: a socket stream |
| | |
| | """ |
| | sock = _validate_socket(sock_or_fd, socket.SOCK_STREAM, require_connected=True) |
| | return await get_async_backend().wrap_stream_socket(sock) |
| |
|
| |
|
| | class UNIXSocketStream(SocketStream): |
| | @classmethod |
| | async def from_socket(cls, sock_or_fd: socket.socket | int) -> UNIXSocketStream: |
| | """ |
| | Wrap an existing socket object or file descriptor as a UNIX socket stream. |
| | |
| | The newly created socket wrapper takes ownership of the socket being passed in. |
| | The existing socket must already be connected. |
| | |
| | :param sock_or_fd: a socket object or file descriptor |
| | :return: a UNIX socket stream |
| | |
| | """ |
| | sock = _validate_socket( |
| | sock_or_fd, socket.SOCK_STREAM, socket.AF_UNIX, require_connected=True |
| | ) |
| | return await get_async_backend().wrap_unix_stream_socket(sock) |
| |
|
| | @abstractmethod |
| | async def send_fds(self, message: bytes, fds: Collection[int | IOBase]) -> None: |
| | """ |
| | Send file descriptors along with a message to the peer. |
| | |
| | :param message: a non-empty bytestring |
| | :param fds: a collection of files (either numeric file descriptors or open file |
| | or socket objects) |
| | """ |
| |
|
| | @abstractmethod |
| | async def receive_fds(self, msglen: int, maxfds: int) -> tuple[bytes, list[int]]: |
| | """ |
| | Receive file descriptors along with a message from the peer. |
| | |
| | :param msglen: length of the message to expect from the peer |
| | :param maxfds: maximum number of file descriptors to expect from the peer |
| | :return: a tuple of (message, file descriptors) |
| | """ |
| |
|
| |
|
| | class SocketListener(Listener[SocketStream], _SocketProvider): |
| | """ |
| | Listens to incoming socket connections. |
| | |
| | Supports all relevant extra attributes from :class:`~SocketAttribute`. |
| | """ |
| |
|
| | @classmethod |
| | async def from_socket( |
| | cls, |
| | sock_or_fd: socket.socket | int, |
| | ) -> SocketListener: |
| | """ |
| | Wrap an existing socket object or file descriptor as a socket listener. |
| | |
| | The newly created listener takes ownership of the socket being passed in. |
| | |
| | :param sock_or_fd: a socket object or file descriptor |
| | :return: a socket listener |
| | |
| | """ |
| | sock = _validate_socket(sock_or_fd, socket.SOCK_STREAM, require_bound=True) |
| | return await get_async_backend().wrap_listener_socket(sock) |
| |
|
| | @abstractmethod |
| | async def accept(self) -> SocketStream: |
| | """Accept an incoming connection.""" |
| |
|
| | async def serve( |
| | self, |
| | handler: Callable[[SocketStream], Any], |
| | task_group: TaskGroup | None = None, |
| | ) -> None: |
| | from .. import create_task_group |
| |
|
| | async with AsyncExitStack() as stack: |
| | if task_group is None: |
| | task_group = await stack.enter_async_context(create_task_group()) |
| |
|
| | while True: |
| | stream = await self.accept() |
| | task_group.start_soon(handler, stream) |
| |
|
| |
|
| | class UDPSocket(UnreliableObjectStream[UDPPacketType], _SocketProvider): |
| | """ |
| | Represents an unconnected UDP socket. |
| | |
| | Supports all relevant extra attributes from :class:`~SocketAttribute`. |
| | """ |
| |
|
| | @classmethod |
| | async def from_socket(cls, sock_or_fd: socket.socket | int) -> UDPSocket: |
| | """ |
| | Wrap an existing socket object or file descriptor as a UDP socket. |
| | |
| | The newly created socket wrapper takes ownership of the socket being passed in. |
| | The existing socket must be bound to a local address. |
| | |
| | :param sock_or_fd: a socket object or file descriptor |
| | :return: a UDP socket |
| | |
| | """ |
| | sock = _validate_socket(sock_or_fd, socket.SOCK_DGRAM, require_bound=True) |
| | return await get_async_backend().wrap_udp_socket(sock) |
| |
|
| | async def sendto(self, data: bytes, host: str, port: int) -> None: |
| | """ |
| | Alias for :meth:`~.UnreliableObjectSendStream.send` ((data, (host, port))). |
| | |
| | """ |
| | return await self.send((data, (host, port))) |
| |
|
| |
|
| | class ConnectedUDPSocket(UnreliableObjectStream[bytes], _SocketProvider): |
| | """ |
| | Represents an connected UDP socket. |
| | |
| | Supports all relevant extra attributes from :class:`~SocketAttribute`. |
| | """ |
| |
|
| | @classmethod |
| | async def from_socket(cls, sock_or_fd: socket.socket | int) -> ConnectedUDPSocket: |
| | """ |
| | Wrap an existing socket object or file descriptor as a connected UDP socket. |
| | |
| | The newly created socket wrapper takes ownership of the socket being passed in. |
| | The existing socket must already be connected. |
| | |
| | :param sock_or_fd: a socket object or file descriptor |
| | :return: a connected UDP socket |
| | |
| | """ |
| | sock = _validate_socket( |
| | sock_or_fd, |
| | socket.SOCK_DGRAM, |
| | require_connected=True, |
| | ) |
| | return await get_async_backend().wrap_connected_udp_socket(sock) |
| |
|
| |
|
| | class UNIXDatagramSocket( |
| | UnreliableObjectStream[UNIXDatagramPacketType], _SocketProvider |
| | ): |
| | """ |
| | Represents an unconnected Unix datagram socket. |
| | |
| | Supports all relevant extra attributes from :class:`~SocketAttribute`. |
| | """ |
| |
|
| | @classmethod |
| | async def from_socket( |
| | cls, |
| | sock_or_fd: socket.socket | int, |
| | ) -> UNIXDatagramSocket: |
| | """ |
| | Wrap an existing socket object or file descriptor as a UNIX datagram |
| | socket. |
| | |
| | The newly created socket wrapper takes ownership of the socket being passed in. |
| | |
| | :param sock_or_fd: a socket object or file descriptor |
| | :return: a UNIX datagram socket |
| | |
| | """ |
| | sock = _validate_socket(sock_or_fd, socket.SOCK_DGRAM, socket.AF_UNIX) |
| | return await get_async_backend().wrap_unix_datagram_socket(sock) |
| |
|
| | async def sendto(self, data: bytes, path: str) -> None: |
| | """Alias for :meth:`~.UnreliableObjectSendStream.send` ((data, path)).""" |
| | return await self.send((data, path)) |
| |
|
| |
|
| | class ConnectedUNIXDatagramSocket(UnreliableObjectStream[bytes], _SocketProvider): |
| | """ |
| | Represents a connected Unix datagram socket. |
| | |
| | Supports all relevant extra attributes from :class:`~SocketAttribute`. |
| | """ |
| |
|
| | @classmethod |
| | async def from_socket( |
| | cls, |
| | sock_or_fd: socket.socket | int, |
| | ) -> ConnectedUNIXDatagramSocket: |
| | """ |
| | Wrap an existing socket object or file descriptor as a connected UNIX datagram |
| | socket. |
| | |
| | The newly created socket wrapper takes ownership of the socket being passed in. |
| | The existing socket must already be connected. |
| | |
| | :param sock_or_fd: a socket object or file descriptor |
| | :return: a connected UNIX datagram socket |
| | |
| | """ |
| | sock = _validate_socket( |
| | sock_or_fd, socket.SOCK_DGRAM, socket.AF_UNIX, require_connected=True |
| | ) |
| | return await get_async_backend().wrap_connected_unix_datagram_socket(sock) |
| |
|