| | from __future__ import annotations |
| |
|
| | __all__ = ( |
| | "TextConnectable", |
| | "TextReceiveStream", |
| | "TextSendStream", |
| | "TextStream", |
| | ) |
| |
|
| | import codecs |
| | import sys |
| | from collections.abc import Callable, Mapping |
| | from dataclasses import InitVar, dataclass, field |
| | from typing import Any |
| |
|
| | from ..abc import ( |
| | AnyByteReceiveStream, |
| | AnyByteSendStream, |
| | AnyByteStream, |
| | AnyByteStreamConnectable, |
| | ObjectReceiveStream, |
| | ObjectSendStream, |
| | ObjectStream, |
| | ObjectStreamConnectable, |
| | ) |
| |
|
| | if sys.version_info >= (3, 12): |
| | from typing import override |
| | else: |
| | from typing_extensions import override |
| |
|
| |
|
| | @dataclass(eq=False) |
| | class TextReceiveStream(ObjectReceiveStream[str]): |
| | """ |
| | Stream wrapper that decodes bytes to strings using the given encoding. |
| | |
| | Decoding is done using :class:`~codecs.IncrementalDecoder` which returns any |
| | completely received unicode characters as soon as they come in. |
| | |
| | :param transport_stream: any bytes-based receive stream |
| | :param encoding: character encoding to use for decoding bytes to strings (defaults |
| | to ``utf-8``) |
| | :param errors: handling scheme for decoding errors (defaults to ``strict``; see the |
| | `codecs module documentation`_ for a comprehensive list of options) |
| | |
| | .. _codecs module documentation: |
| | https://docs.python.org/3/library/codecs.html#codec-objects |
| | """ |
| |
|
| | transport_stream: AnyByteReceiveStream |
| | encoding: InitVar[str] = "utf-8" |
| | errors: InitVar[str] = "strict" |
| | _decoder: codecs.IncrementalDecoder = field(init=False) |
| |
|
| | def __post_init__(self, encoding: str, errors: str) -> None: |
| | decoder_class = codecs.getincrementaldecoder(encoding) |
| | self._decoder = decoder_class(errors=errors) |
| |
|
| | async def receive(self) -> str: |
| | while True: |
| | chunk = await self.transport_stream.receive() |
| | decoded = self._decoder.decode(chunk) |
| | if decoded: |
| | return decoded |
| |
|
| | async def aclose(self) -> None: |
| | await self.transport_stream.aclose() |
| | self._decoder.reset() |
| |
|
| | @property |
| | def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]: |
| | return self.transport_stream.extra_attributes |
| |
|
| |
|
| | @dataclass(eq=False) |
| | class TextSendStream(ObjectSendStream[str]): |
| | """ |
| | Sends strings to the wrapped stream as bytes using the given encoding. |
| | |
| | :param AnyByteSendStream transport_stream: any bytes-based send stream |
| | :param str encoding: character encoding to use for encoding strings to bytes |
| | (defaults to ``utf-8``) |
| | :param str errors: handling scheme for encoding errors (defaults to ``strict``; see |
| | the `codecs module documentation`_ for a comprehensive list of options) |
| | |
| | .. _codecs module documentation: |
| | https://docs.python.org/3/library/codecs.html#codec-objects |
| | """ |
| |
|
| | transport_stream: AnyByteSendStream |
| | encoding: InitVar[str] = "utf-8" |
| | errors: str = "strict" |
| | _encoder: Callable[..., tuple[bytes, int]] = field(init=False) |
| |
|
| | def __post_init__(self, encoding: str) -> None: |
| | self._encoder = codecs.getencoder(encoding) |
| |
|
| | async def send(self, item: str) -> None: |
| | encoded = self._encoder(item, self.errors)[0] |
| | await self.transport_stream.send(encoded) |
| |
|
| | async def aclose(self) -> None: |
| | await self.transport_stream.aclose() |
| |
|
| | @property |
| | def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]: |
| | return self.transport_stream.extra_attributes |
| |
|
| |
|
| | @dataclass(eq=False) |
| | class TextStream(ObjectStream[str]): |
| | """ |
| | A bidirectional stream that decodes bytes to strings on receive and encodes strings |
| | to bytes on send. |
| | |
| | Extra attributes will be provided from both streams, with the receive stream |
| | providing the values in case of a conflict. |
| | |
| | :param AnyByteStream transport_stream: any bytes-based stream |
| | :param str encoding: character encoding to use for encoding/decoding strings to/from |
| | bytes (defaults to ``utf-8``) |
| | :param str errors: handling scheme for encoding errors (defaults to ``strict``; see |
| | the `codecs module documentation`_ for a comprehensive list of options) |
| | |
| | .. _codecs module documentation: |
| | https://docs.python.org/3/library/codecs.html#codec-objects |
| | """ |
| |
|
| | transport_stream: AnyByteStream |
| | encoding: InitVar[str] = "utf-8" |
| | errors: InitVar[str] = "strict" |
| | _receive_stream: TextReceiveStream = field(init=False) |
| | _send_stream: TextSendStream = field(init=False) |
| |
|
| | def __post_init__(self, encoding: str, errors: str) -> None: |
| | self._receive_stream = TextReceiveStream( |
| | self.transport_stream, encoding=encoding, errors=errors |
| | ) |
| | self._send_stream = TextSendStream( |
| | self.transport_stream, encoding=encoding, errors=errors |
| | ) |
| |
|
| | async def receive(self) -> str: |
| | return await self._receive_stream.receive() |
| |
|
| | async def send(self, item: str) -> None: |
| | await self._send_stream.send(item) |
| |
|
| | async def send_eof(self) -> None: |
| | await self.transport_stream.send_eof() |
| |
|
| | async def aclose(self) -> None: |
| | await self._send_stream.aclose() |
| | await self._receive_stream.aclose() |
| |
|
| | @property |
| | def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]: |
| | return { |
| | **self._send_stream.extra_attributes, |
| | **self._receive_stream.extra_attributes, |
| | } |
| |
|
| |
|
| | class TextConnectable(ObjectStreamConnectable[str]): |
| | def __init__(self, connectable: AnyByteStreamConnectable): |
| | """ |
| | :param connectable: the bytestream endpoint to wrap |
| | |
| | """ |
| | self.connectable = connectable |
| |
|
| | @override |
| | async def connect(self) -> TextStream: |
| | stream = await self.connectable.connect() |
| | return TextStream(stream) |
| |
|