| | from __future__ import annotations |
| |
|
| | import sys |
| |
|
| | assert sys.platform != "win32" |
| |
|
| | import contextlib |
| | import io |
| | import termios |
| | import tty |
| | from asyncio import AbstractEventLoop, get_running_loop |
| | from typing import Callable, ContextManager, Generator, TextIO |
| |
|
| | from ..key_binding import KeyPress |
| | from .base import Input |
| | from .posix_utils import PosixStdinReader |
| | from .vt100_parser import Vt100Parser |
| |
|
| | __all__ = [ |
| | "Vt100Input", |
| | "raw_mode", |
| | "cooked_mode", |
| | ] |
| |
|
| |
|
| | class Vt100Input(Input): |
| | """ |
| | Vt100 input for Posix systems. |
| | (This uses a posix file descriptor that can be registered in the event loop.) |
| | """ |
| |
|
| | |
| | |
| | _fds_not_a_terminal: set[int] = set() |
| |
|
| | def __init__(self, stdin: TextIO) -> None: |
| | |
| | |
| | try: |
| | |
| | stdin.fileno() |
| | except io.UnsupportedOperation as e: |
| | if "idlelib.run" in sys.modules: |
| | raise io.UnsupportedOperation( |
| | "Stdin is not a terminal. Running from Idle is not supported." |
| | ) from e |
| | else: |
| | raise io.UnsupportedOperation("Stdin is not a terminal.") from e |
| |
|
| | |
| | |
| | |
| | |
| | |
| | isatty = stdin.isatty() |
| | fd = stdin.fileno() |
| |
|
| | if not isatty and fd not in Vt100Input._fds_not_a_terminal: |
| | msg = "Warning: Input is not a terminal (fd=%r).\n" |
| | sys.stderr.write(msg % fd) |
| | sys.stderr.flush() |
| | Vt100Input._fds_not_a_terminal.add(fd) |
| |
|
| | |
| | self.stdin = stdin |
| |
|
| | |
| | |
| | self._fileno = stdin.fileno() |
| |
|
| | self._buffer: list[KeyPress] = [] |
| | self.stdin_reader = PosixStdinReader(self._fileno, encoding=stdin.encoding) |
| | self.vt100_parser = Vt100Parser( |
| | lambda key_press: self._buffer.append(key_press) |
| | ) |
| |
|
| | def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]: |
| | """ |
| | Return a context manager that makes this input active in the current |
| | event loop. |
| | """ |
| | return _attached_input(self, input_ready_callback) |
| |
|
| | def detach(self) -> ContextManager[None]: |
| | """ |
| | Return a context manager that makes sure that this input is not active |
| | in the current event loop. |
| | """ |
| | return _detached_input(self) |
| |
|
| | def read_keys(self) -> list[KeyPress]: |
| | "Read list of KeyPress." |
| | |
| | data = self.stdin_reader.read() |
| |
|
| | |
| | self.vt100_parser.feed(data) |
| |
|
| | |
| | result = self._buffer |
| | self._buffer = [] |
| | return result |
| |
|
| | def flush_keys(self) -> list[KeyPress]: |
| | """ |
| | Flush pending keys and return them. |
| | (Used for flushing the 'escape' key.) |
| | """ |
| | |
| | |
| | self.vt100_parser.flush() |
| |
|
| | |
| | result = self._buffer |
| | self._buffer = [] |
| | return result |
| |
|
| | @property |
| | def closed(self) -> bool: |
| | return self.stdin_reader.closed |
| |
|
| | def raw_mode(self) -> ContextManager[None]: |
| | return raw_mode(self.stdin.fileno()) |
| |
|
| | def cooked_mode(self) -> ContextManager[None]: |
| | return cooked_mode(self.stdin.fileno()) |
| |
|
| | def fileno(self) -> int: |
| | return self.stdin.fileno() |
| |
|
| | def typeahead_hash(self) -> str: |
| | return f"fd-{self._fileno}" |
| |
|
| |
|
| | _current_callbacks: dict[ |
| | tuple[AbstractEventLoop, int], Callable[[], None] | None |
| | ] = {} |
| |
|
| |
|
| | @contextlib.contextmanager |
| | def _attached_input( |
| | input: Vt100Input, callback: Callable[[], None] |
| | ) -> Generator[None, None, None]: |
| | """ |
| | Context manager that makes this input active in the current event loop. |
| | |
| | :param input: :class:`~prompt_toolkit.input.Input` object. |
| | :param callback: Called when the input is ready to read. |
| | """ |
| | loop = get_running_loop() |
| | fd = input.fileno() |
| | previous = _current_callbacks.get((loop, fd)) |
| |
|
| | def callback_wrapper() -> None: |
| | """Wrapper around the callback that already removes the reader when |
| | the input is closed. Otherwise, we keep continuously calling this |
| | callback, until we leave the context manager (which can happen a bit |
| | later). This fixes issues when piping /dev/null into a prompt_toolkit |
| | application.""" |
| | if input.closed: |
| | loop.remove_reader(fd) |
| | callback() |
| |
|
| | try: |
| | loop.add_reader(fd, callback_wrapper) |
| | except PermissionError: |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | raise EOFError |
| |
|
| | _current_callbacks[loop, fd] = callback |
| |
|
| | try: |
| | yield |
| | finally: |
| | loop.remove_reader(fd) |
| |
|
| | if previous: |
| | loop.add_reader(fd, previous) |
| | _current_callbacks[loop, fd] = previous |
| | else: |
| | del _current_callbacks[loop, fd] |
| |
|
| |
|
| | @contextlib.contextmanager |
| | def _detached_input(input: Vt100Input) -> Generator[None, None, None]: |
| | loop = get_running_loop() |
| | fd = input.fileno() |
| | previous = _current_callbacks.get((loop, fd)) |
| |
|
| | if previous: |
| | loop.remove_reader(fd) |
| | _current_callbacks[loop, fd] = None |
| |
|
| | try: |
| | yield |
| | finally: |
| | if previous: |
| | loop.add_reader(fd, previous) |
| | _current_callbacks[loop, fd] = previous |
| |
|
| |
|
| | class raw_mode: |
| | """ |
| | :: |
| | |
| | with raw_mode(stdin): |
| | ''' the pseudo-terminal stdin is now used in raw mode ''' |
| | |
| | We ignore errors when executing `tcgetattr` fails. |
| | """ |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | def __init__(self, fileno: int) -> None: |
| | self.fileno = fileno |
| | self.attrs_before: list[int | list[bytes | int]] | None |
| | try: |
| | self.attrs_before = termios.tcgetattr(fileno) |
| | except termios.error: |
| | |
| | self.attrs_before = None |
| |
|
| | def __enter__(self) -> None: |
| | |
| | try: |
| | newattr = termios.tcgetattr(self.fileno) |
| | except termios.error: |
| | pass |
| | else: |
| | newattr[tty.LFLAG] = self._patch_lflag(newattr[tty.LFLAG]) |
| | newattr[tty.IFLAG] = self._patch_iflag(newattr[tty.IFLAG]) |
| |
|
| | |
| | |
| | |
| | |
| | |
| | newattr[tty.CC][termios.VMIN] = 1 |
| |
|
| | termios.tcsetattr(self.fileno, termios.TCSANOW, newattr) |
| |
|
| | @classmethod |
| | def _patch_lflag(cls, attrs: int) -> int: |
| | return attrs & ~(termios.ECHO | termios.ICANON | termios.IEXTEN | termios.ISIG) |
| |
|
| | @classmethod |
| | def _patch_iflag(cls, attrs: int) -> int: |
| | return attrs & ~( |
| | |
| | |
| | |
| | termios.IXON |
| | | termios.IXOFF |
| | | |
| | |
| | termios.ICRNL |
| | | termios.INLCR |
| | | termios.IGNCR |
| | ) |
| |
|
| | def __exit__(self, *a: object) -> None: |
| | if self.attrs_before is not None: |
| | try: |
| | termios.tcsetattr(self.fileno, termios.TCSANOW, self.attrs_before) |
| | except termios.error: |
| | pass |
| |
|
| | |
| | |
| |
|
| |
|
| | class cooked_mode(raw_mode): |
| | """ |
| | The opposite of ``raw_mode``, used when we need cooked mode inside a |
| | `raw_mode` block. Used in `Application.run_in_terminal`.:: |
| | |
| | with cooked_mode(stdin): |
| | ''' the pseudo-terminal stdin is now used in cooked mode. ''' |
| | """ |
| |
|
| | @classmethod |
| | def _patch_lflag(cls, attrs: int) -> int: |
| | return attrs | (termios.ECHO | termios.ICANON | termios.IEXTEN | termios.ISIG) |
| |
|
| | @classmethod |
| | def _patch_iflag(cls, attrs: int) -> int: |
| | |
| | |
| | |
| | |
| | return attrs | termios.ICRNL |
| |
|