| from __future__ import annotations |
|
|
| import errno |
| import socket |
| import sys |
| from abc import abstractmethod |
| from collections.abc import Callable, Collection, Mapping |
| from contextlib import AsyncExitStack |
| from io import IOBase |
| from ipaddress import IPv4Address, IPv6Address |
| from socket import AddressFamily |
| from typing import Any, TypeVar, Union |
|
|
| from .._core._eventloop import get_async_backend |
| from .._core._typedattr import ( |
| TypedAttributeProvider, |
| TypedAttributeSet, |
| typed_attribute, |
| ) |
| from ._streams import ByteStream, Listener, UnreliableObjectStream |
| from ._tasks import TaskGroup |
|
|
| if sys.version_info >= (3, 10): |
| from typing import TypeAlias |
| else: |
| from typing_extensions import TypeAlias |
|
|
| IPAddressType: TypeAlias = Union[str, IPv4Address, IPv6Address] |
| IPSockAddrType: TypeAlias = tuple[str, int] |
| SockAddrType: TypeAlias = Union[IPSockAddrType, str] |
| UDPPacketType: TypeAlias = tuple[bytes, IPSockAddrType] |
| UNIXDatagramPacketType: TypeAlias = tuple[bytes, str] |
| T_Retval = TypeVar("T_Retval") |
|
|
|
|
| def _validate_socket( |
| sock_or_fd: socket.socket | int, |
| sock_type: socket.SocketKind, |
| addr_family: socket.AddressFamily = socket.AF_UNSPEC, |
| *, |
| require_connected: bool = False, |
| require_bound: bool = False, |
| ) -> socket.socket: |
| if isinstance(sock_or_fd, int): |
| try: |
| sock = socket.socket(fileno=sock_or_fd) |
| except OSError as exc: |
| if exc.errno == errno.ENOTSOCK: |
| raise ValueError( |
| "the file descriptor does not refer to a socket" |
| ) from exc |
| elif require_connected: |
| raise ValueError("the socket must be connected") from exc |
| elif require_bound: |
| raise ValueError("the socket must be bound to a local address") from exc |
| else: |
| raise |
| elif isinstance(sock_or_fd, socket.socket): |
| sock = sock_or_fd |
| else: |
| raise TypeError( |
| f"expected an int or socket, got {type(sock_or_fd).__qualname__} instead" |
| ) |
|
|
| try: |
| if require_connected: |
| try: |
| sock.getpeername() |
| except OSError as exc: |
| raise ValueError("the socket must be connected") from exc |
|
|
| if require_bound: |
| try: |
| if sock.family in (socket.AF_INET, socket.AF_INET6): |
| bound_addr = sock.getsockname()[1] |
| else: |
| bound_addr = sock.getsockname() |
| except OSError: |
| bound_addr = None |
|
|
| if not bound_addr: |
| raise ValueError("the socket must be bound to a local address") |
|
|
| if addr_family != socket.AF_UNSPEC and sock.family != addr_family: |
| raise ValueError( |
| f"address family mismatch: expected {addr_family.name}, got " |
| f"{sock.family.name}" |
| ) |
|
|
| if sock.type != sock_type: |
| raise ValueError( |
| f"socket type mismatch: expected {sock_type.name}, got {sock.type.name}" |
| ) |
| except BaseException: |
| |
| if isinstance(sock_or_fd, int): |
| sock.detach() |
|
|
| raise |
|
|
| sock.setblocking(False) |
| return sock |
|
|
|
|
| class SocketAttribute(TypedAttributeSet): |
| """ |
| .. attribute:: family |
| :type: socket.AddressFamily |
| |
| the address family of the underlying socket |
| |
| .. attribute:: local_address |
| :type: tuple[str, int] | str |
| |
| the local address the underlying socket is connected to |
| |
| .. attribute:: local_port |
| :type: int |
| |
| for IP based sockets, the local port the underlying socket is bound to |
| |
| .. attribute:: raw_socket |
| :type: socket.socket |
| |
| the underlying stdlib socket object |
| |
| .. attribute:: remote_address |
| :type: tuple[str, int] | str |
| |
| the remote address the underlying socket is connected to |
| |
| .. attribute:: remote_port |
| :type: int |
| |
| for IP based sockets, the remote port the underlying socket is connected to |
| """ |
|
|
| family: AddressFamily = typed_attribute() |
| local_address: SockAddrType = typed_attribute() |
| local_port: int = typed_attribute() |
| raw_socket: socket.socket = typed_attribute() |
| remote_address: SockAddrType = typed_attribute() |
| remote_port: int = typed_attribute() |
|
|
|
|
| class _SocketProvider(TypedAttributeProvider): |
| @property |
| def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]: |
| from .._core._sockets import convert_ipv6_sockaddr as convert |
|
|
| attributes: dict[Any, Callable[[], Any]] = { |
| SocketAttribute.family: lambda: self._raw_socket.family, |
| SocketAttribute.local_address: lambda: convert( |
| self._raw_socket.getsockname() |
| ), |
| SocketAttribute.raw_socket: lambda: self._raw_socket, |
| } |
| try: |
| peername: tuple[str, int] | None = convert(self._raw_socket.getpeername()) |
| except OSError: |
| peername = None |
|
|
| |
| if peername is not None: |
| attributes[SocketAttribute.remote_address] = lambda: peername |
|
|
| |
| if self._raw_socket.family in (AddressFamily.AF_INET, AddressFamily.AF_INET6): |
| attributes[SocketAttribute.local_port] = ( |
| lambda: self._raw_socket.getsockname()[1] |
| ) |
| if peername is not None: |
| remote_port = peername[1] |
| attributes[SocketAttribute.remote_port] = lambda: remote_port |
|
|
| return attributes |
|
|
| @property |
| @abstractmethod |
| def _raw_socket(self) -> socket.socket: |
| pass |
|
|
|
|
| class SocketStream(ByteStream, _SocketProvider): |
| """ |
| Transports bytes over a socket. |
| |
| Supports all relevant extra attributes from :class:`~SocketAttribute`. |
| """ |
|
|
| @classmethod |
| async def from_socket(cls, sock_or_fd: socket.socket | int) -> SocketStream: |
| """ |
| Wrap an existing socket object or file descriptor as a socket stream. |
| |
| The newly created socket wrapper takes ownership of the socket being passed in. |
| The existing socket must already be connected. |
| |
| :param sock_or_fd: a socket object or file descriptor |
| :return: a socket stream |
| |
| """ |
| sock = _validate_socket(sock_or_fd, socket.SOCK_STREAM, require_connected=True) |
| return await get_async_backend().wrap_stream_socket(sock) |
|
|
|
|
| class UNIXSocketStream(SocketStream): |
| @classmethod |
| async def from_socket(cls, sock_or_fd: socket.socket | int) -> UNIXSocketStream: |
| """ |
| Wrap an existing socket object or file descriptor as a UNIX socket stream. |
| |
| The newly created socket wrapper takes ownership of the socket being passed in. |
| The existing socket must already be connected. |
| |
| :param sock_or_fd: a socket object or file descriptor |
| :return: a UNIX socket stream |
| |
| """ |
| sock = _validate_socket( |
| sock_or_fd, socket.SOCK_STREAM, socket.AF_UNIX, require_connected=True |
| ) |
| return await get_async_backend().wrap_unix_stream_socket(sock) |
|
|
| @abstractmethod |
| async def send_fds(self, message: bytes, fds: Collection[int | IOBase]) -> None: |
| """ |
| Send file descriptors along with a message to the peer. |
| |
| :param message: a non-empty bytestring |
| :param fds: a collection of files (either numeric file descriptors or open file |
| or socket objects) |
| """ |
|
|
| @abstractmethod |
| async def receive_fds(self, msglen: int, maxfds: int) -> tuple[bytes, list[int]]: |
| """ |
| Receive file descriptors along with a message from the peer. |
| |
| :param msglen: length of the message to expect from the peer |
| :param maxfds: maximum number of file descriptors to expect from the peer |
| :return: a tuple of (message, file descriptors) |
| """ |
|
|
|
|
| class SocketListener(Listener[SocketStream], _SocketProvider): |
| """ |
| Listens to incoming socket connections. |
| |
| Supports all relevant extra attributes from :class:`~SocketAttribute`. |
| """ |
|
|
| @classmethod |
| async def from_socket( |
| cls, |
| sock_or_fd: socket.socket | int, |
| ) -> SocketListener: |
| """ |
| Wrap an existing socket object or file descriptor as a socket listener. |
| |
| The newly created listener takes ownership of the socket being passed in. |
| |
| :param sock_or_fd: a socket object or file descriptor |
| :return: a socket listener |
| |
| """ |
| sock = _validate_socket(sock_or_fd, socket.SOCK_STREAM, require_bound=True) |
| return await get_async_backend().wrap_listener_socket(sock) |
|
|
| @abstractmethod |
| async def accept(self) -> SocketStream: |
| """Accept an incoming connection.""" |
|
|
| async def serve( |
| self, |
| handler: Callable[[SocketStream], Any], |
| task_group: TaskGroup | None = None, |
| ) -> None: |
| from .. import create_task_group |
|
|
| async with AsyncExitStack() as stack: |
| if task_group is None: |
| task_group = await stack.enter_async_context(create_task_group()) |
|
|
| while True: |
| stream = await self.accept() |
| task_group.start_soon(handler, stream) |
|
|
|
|
| class UDPSocket(UnreliableObjectStream[UDPPacketType], _SocketProvider): |
| """ |
| Represents an unconnected UDP socket. |
| |
| Supports all relevant extra attributes from :class:`~SocketAttribute`. |
| """ |
|
|
| @classmethod |
| async def from_socket(cls, sock_or_fd: socket.socket | int) -> UDPSocket: |
| """ |
| Wrap an existing socket object or file descriptor as a UDP socket. |
| |
| The newly created socket wrapper takes ownership of the socket being passed in. |
| The existing socket must be bound to a local address. |
| |
| :param sock_or_fd: a socket object or file descriptor |
| :return: a UDP socket |
| |
| """ |
| sock = _validate_socket(sock_or_fd, socket.SOCK_DGRAM, require_bound=True) |
| return await get_async_backend().wrap_udp_socket(sock) |
|
|
| async def sendto(self, data: bytes, host: str, port: int) -> None: |
| """ |
| Alias for :meth:`~.UnreliableObjectSendStream.send` ((data, (host, port))). |
| |
| """ |
| return await self.send((data, (host, port))) |
|
|
|
|
| class ConnectedUDPSocket(UnreliableObjectStream[bytes], _SocketProvider): |
| """ |
| Represents an connected UDP socket. |
| |
| Supports all relevant extra attributes from :class:`~SocketAttribute`. |
| """ |
|
|
| @classmethod |
| async def from_socket(cls, sock_or_fd: socket.socket | int) -> ConnectedUDPSocket: |
| """ |
| Wrap an existing socket object or file descriptor as a connected UDP socket. |
| |
| The newly created socket wrapper takes ownership of the socket being passed in. |
| The existing socket must already be connected. |
| |
| :param sock_or_fd: a socket object or file descriptor |
| :return: a connected UDP socket |
| |
| """ |
| sock = _validate_socket( |
| sock_or_fd, |
| socket.SOCK_DGRAM, |
| require_connected=True, |
| ) |
| return await get_async_backend().wrap_connected_udp_socket(sock) |
|
|
|
|
| class UNIXDatagramSocket( |
| UnreliableObjectStream[UNIXDatagramPacketType], _SocketProvider |
| ): |
| """ |
| Represents an unconnected Unix datagram socket. |
| |
| Supports all relevant extra attributes from :class:`~SocketAttribute`. |
| """ |
|
|
| @classmethod |
| async def from_socket( |
| cls, |
| sock_or_fd: socket.socket | int, |
| ) -> UNIXDatagramSocket: |
| """ |
| Wrap an existing socket object or file descriptor as a UNIX datagram |
| socket. |
| |
| The newly created socket wrapper takes ownership of the socket being passed in. |
| |
| :param sock_or_fd: a socket object or file descriptor |
| :return: a UNIX datagram socket |
| |
| """ |
| sock = _validate_socket(sock_or_fd, socket.SOCK_DGRAM, socket.AF_UNIX) |
| return await get_async_backend().wrap_unix_datagram_socket(sock) |
|
|
| async def sendto(self, data: bytes, path: str) -> None: |
| """Alias for :meth:`~.UnreliableObjectSendStream.send` ((data, path)).""" |
| return await self.send((data, path)) |
|
|
|
|
| class ConnectedUNIXDatagramSocket(UnreliableObjectStream[bytes], _SocketProvider): |
| """ |
| Represents a connected Unix datagram socket. |
| |
| Supports all relevant extra attributes from :class:`~SocketAttribute`. |
| """ |
|
|
| @classmethod |
| async def from_socket( |
| cls, |
| sock_or_fd: socket.socket | int, |
| ) -> ConnectedUNIXDatagramSocket: |
| """ |
| Wrap an existing socket object or file descriptor as a connected UNIX datagram |
| socket. |
| |
| The newly created socket wrapper takes ownership of the socket being passed in. |
| The existing socket must already be connected. |
| |
| :param sock_or_fd: a socket object or file descriptor |
| :return: a connected UNIX datagram socket |
| |
| """ |
| sock = _validate_socket( |
| sock_or_fd, socket.SOCK_DGRAM, socket.AF_UNIX, require_connected=True |
| ) |
| return await get_async_backend().wrap_connected_unix_datagram_socket(sock) |
|
|