| from __future__ import annotations |
|
|
| import sys |
|
|
| assert sys.platform != "win32" |
|
|
| import os |
| from contextlib import contextmanager |
| from typing import ContextManager, Iterator, TextIO, cast |
|
|
| from ..utils import DummyContext |
| from .base import PipeInput |
| from .vt100 import Vt100Input |
|
|
| __all__ = [ |
| "PosixPipeInput", |
| ] |
|
|
|
|
| class _Pipe: |
| "Wrapper around os.pipe, that ensures we don't double close any end." |
|
|
| def __init__(self) -> None: |
| self.read_fd, self.write_fd = os.pipe() |
| self._read_closed = False |
| self._write_closed = False |
|
|
| def close_read(self) -> None: |
| "Close read-end if not yet closed." |
| if self._read_closed: |
| return |
|
|
| os.close(self.read_fd) |
| self._read_closed = True |
|
|
| def close_write(self) -> None: |
| "Close write-end if not yet closed." |
| if self._write_closed: |
| return |
|
|
| os.close(self.write_fd) |
| self._write_closed = True |
|
|
| def close(self) -> None: |
| "Close both read and write ends." |
| self.close_read() |
| self.close_write() |
|
|
|
|
| class PosixPipeInput(Vt100Input, PipeInput): |
| """ |
| Input that is send through a pipe. |
| This is useful if we want to send the input programmatically into the |
| application. Mostly useful for unit testing. |
| |
| Usage:: |
| |
| with PosixPipeInput.create() as input: |
| input.send_text('inputdata') |
| """ |
|
|
| _id = 0 |
|
|
| def __init__(self, _pipe: _Pipe, _text: str = "") -> None: |
| |
| self.pipe = _pipe |
|
|
| class Stdin: |
| encoding = "utf-8" |
|
|
| def isatty(stdin) -> bool: |
| return True |
|
|
| def fileno(stdin) -> int: |
| return self.pipe.read_fd |
|
|
| super().__init__(cast(TextIO, Stdin())) |
| self.send_text(_text) |
|
|
| |
| self.__class__._id += 1 |
| self._id = self.__class__._id |
|
|
| @classmethod |
| @contextmanager |
| def create(cls, text: str = "") -> Iterator[PosixPipeInput]: |
| pipe = _Pipe() |
| try: |
| yield PosixPipeInput(_pipe=pipe, _text=text) |
| finally: |
| pipe.close() |
|
|
| def send_bytes(self, data: bytes) -> None: |
| os.write(self.pipe.write_fd, data) |
|
|
| def send_text(self, data: str) -> None: |
| "Send text to the input." |
| os.write(self.pipe.write_fd, data.encode("utf-8")) |
|
|
| def raw_mode(self) -> ContextManager[None]: |
| return DummyContext() |
|
|
| def cooked_mode(self) -> ContextManager[None]: |
| return DummyContext() |
|
|
| def close(self) -> None: |
| "Close pipe fds." |
| |
| |
| |
| |
| self.pipe.close_write() |
|
|
| def typeahead_hash(self) -> str: |
| """ |
| This needs to be unique for every `PipeInput`. |
| """ |
| return f"pipe-input-{self._id}" |
|
|