| | """ |
| | Parser for VT100 input stream. |
| | """ |
| |
|
| | from __future__ import annotations |
| |
|
| | import re |
| | from typing import Callable, Dict, Generator |
| |
|
| | from ..key_binding.key_processor import KeyPress |
| | from ..keys import Keys |
| | from .ansi_escape_sequences import ANSI_SEQUENCES |
| |
|
# Public API of this module: only the parser class is exported.
__all__ = ["Vt100Parser"]
| |
|
| |
|
| | |
| | |
| | |
def _csi_pattern(tail: str) -> re.Pattern[str]:
    """
    Compile a regex that is anchored at the start of the input, expects the
    two characters "ESC [" and then the given `tail` pattern.
    """
    return re.compile("^" + re.escape("\x1b[") + tail)


# Complete CPR response: "ESC [ <digits> ; <digits> R".
# ('\Z' is used instead of '$', because '$' would also match just before a
# trailing newline.)
_cpr_response_re = _csi_pattern(r"\d+;\d+R\Z")

# Complete mouse event. Either "ESC [", an optional "<", digits/semicolons and
# a final "M" or "m" (e.g. "ESC[96;14;13M", "ESC[<64;85;12M"), or "ESC [ M"
# followed by exactly three characters (e.g. "ESC[MaB*").
_mouse_event_re = _csi_pattern(r"(<?[\d;]+[mM]|M...)\Z")

# Anything that can still grow into a CPR response. The final "R" is not part
# of this pattern, so a complete response is never reported as a prefix.
_cpr_response_prefix_re = _csi_pattern(r"[\d;]*\Z")

# Anything that can still grow into a mouse event: the numeric form without
# its final letter, or "ESC [ M" followed by at most two characters.
_mouse_event_prefix_re = _csi_pattern(r"(<?[\d;]*|M.{0,2})\Z")
| |
|
| |
|
class _Flush:
    """
    Marker type for the parser coroutine: sending an instance (instead of a
    character) asks the parser to flush what it has buffered.
    """
| |
|
| |
|
class _IsPrefixOfLongerMatchCache(Dict[str, bool]):
    """
    Lazily filled mapping from an input prefix to a boolean that tells whether
    further characters could still turn this prefix into a longer key sequence.
    """

    def __missing__(self, prefix: str) -> bool:
        """Compute, store and return the answer for a prefix not seen before."""
        # CPR responses and mouse events contain variable parts, so they are
        # recognised with regexes instead of through the lookup table.
        could_grow = bool(
            _cpr_response_prefix_re.match(prefix)
            or _mouse_event_prefix_re.match(prefix)
        )

        if not could_grow:
            # Otherwise look for a strictly longer known sequence that starts
            # with this prefix. Only entries with a truthy value count.
            for sequence, keys in ANSI_SEQUENCES.items():
                if sequence != prefix and sequence.startswith(prefix) and keys:
                    could_grow = True
                    break

        # Remember the result so the table is scanned once per prefix.
        self[prefix] = could_grow
        return could_grow


# Module-wide cache instance, consulted by the parser for every prefix.
_IS_PREFIX_OF_LONGER_MATCH_CACHE = _IsPrefixOfLongerMatchCache()
| |
|
| |
|
class Vt100Parser:
    """
    Parser for VT100 input stream.
    Data can be fed through the `feed` method and the given callback will be
    called with KeyPress objects.

    ::

        def callback(key):
            pass
        i = Vt100Parser(callback)
        i.feed('data\x01...')

    :attr feed_key_callback: Function that will be called when a key is parsed.
    """

    def __init__(self, feed_key_callback: Callable[[KeyPress], None]) -> None:
        """
        :param feed_key_callback: Called with a `KeyPress` for every key that
            has been parsed.
        """
        self.feed_key_callback = feed_key_callback
        self.reset()

    def reset(self, request: bool = False) -> None:
        """
        Bring the parser back to its initial state.

        Leaves bracketed-paste mode, drops any buffered paste data and restarts
        the parser coroutine (which discards a partially received sequence).

        :param request: Accepted but not used by this implementation.
        """
        self._in_bracketed_paste = False
        # Always define the paste buffer here, so that it exists on a fresh
        # parser and no stale paste data survives a reset.
        self._paste_buffer = ""
        self._start_parser()

    def _start_parser(self) -> None:
        """
        Start the parser coroutine.
        """
        self._input_parser = self._input_parser_generator()
        # Advance to the first `yield`, so that `send` can be used afterwards.
        next(self._input_parser)

    def _get_match(self, prefix: str) -> None | Keys | tuple[Keys, ...]:
        """
        Return the key (or keys) that maps to this prefix.

        :param prefix: The characters received so far.
        :returns: `Keys.CPRResponse` or `Keys.Vt100MouseEvent` when the prefix
            matches the corresponding regex, otherwise the `ANSI_SEQUENCES`
            entry for the prefix, or `None` when there is no such entry.
        """
        # These two contain variable parts, so they are matched with a regex
        # rather than looked up in the table.
        if _cpr_response_re.match(prefix):
            return Keys.CPRResponse

        if _mouse_event_re.match(prefix):
            return Keys.Vt100MouseEvent

        # Otherwise, use the mappings.
        try:
            return ANSI_SEQUENCES[prefix]
        except KeyError:
            return None

    def _input_parser_generator(self) -> Generator[None, str | _Flush, None]:
        """
        Coroutine (state machine) for the input parser.

        Each `send` delivers either one character, which is appended to the
        pending prefix, or a `_Flush` instance, which forces the pending prefix
        to be resolved even if it could still grow into a longer sequence.
        Resolved keys are reported through `_call_handler`.
        """
        prefix = ""
        retry = False

        while True:
            flush = False

            if retry:
                # Part of the prefix was consumed in the previous round; look
                # at what is left before waiting for more input.
                retry = False
            else:
                # Get next character (or flush request).
                c = yield

                if isinstance(c, _Flush):
                    flush = True
                else:
                    prefix += c

            # Nothing buffered, nothing to do.
            if not prefix:
                continue

            is_prefix_of_longer_match = _IS_PREFIX_OF_LONGER_MATCH_CACHE[prefix]
            match = self._get_match(prefix)

            # More input could still extend this into a longer sequence: keep
            # waiting, unless we were explicitly asked to flush.
            if is_prefix_of_longer_match and not flush:
                continue

            if match:
                # Exact match found, call the handler.
                self._call_handler(match, prefix)
                prefix = ""
                continue

            # No exact match found. Consume a part of the prefix and look at
            # the remainder again in the next round.
            found = False
            retry = True

            # Try the longest leading part first and shift it off on a match.
            # NOTE(review): there is no `break` after a match, so the scan
            # goes on with shorter slices of the already shortened prefix.
            # Kept as it is; confirm whether that is intended.
            for i in range(len(prefix), 0, -1):
                match = self._get_match(prefix[:i])
                if match:
                    self._call_handler(match, prefix[:i])
                    prefix = prefix[i:]
                    found = True

            if not found:
                # Nothing matched at all: report the first character as
                # itself and continue with the rest.
                self._call_handler(prefix[0], prefix[0])
                prefix = prefix[1:]

    def _call_handler(
        self, key: str | Keys | tuple[Keys, ...], insert_text: str
    ) -> None:
        """
        Report a parsed key.

        :param key: A single key, or a tuple of keys that one input sequence
            maps to.
        :param insert_text: The raw input that produced the key(s).
        """
        if isinstance(key, tuple):
            # One sequence that stands for several keys. Handle them one by
            # one, but hand the raw text to the first one only, so that it is
            # not passed along multiple times.
            for i, k in enumerate(key):
                self._call_handler(k, insert_text if i == 0 else "")
            return

        if key == Keys.BracketedPaste:
            # Start of a bracketed paste. No key press is reported yet; `feed`
            # collects the pasted text until the end mark arrives.
            self._in_bracketed_paste = True
            self._paste_buffer = ""
            return

        self.feed_key_callback(KeyPress(key, insert_text))

    def feed(self, data: str) -> None:
        """
        Feed the input stream.

        :param data: Input string (unicode).
        """
        if self._in_bracketed_paste:
            # In bracketed-paste mode the input bypasses the parser coroutine
            # and is buffered until the end mark has been received.
            self._paste_buffer += data
            end_mark = "\x1b[201~"

            paste_content, found_mark, remaining = self._paste_buffer.partition(
                end_mark
            )
            if found_mark:
                # Report everything before the end mark as one paste.
                self.feed_key_callback(KeyPress(Keys.BracketedPaste, paste_content))

                # Quit bracketed paste mode and handle what came after the
                # end mark as regular input.
                self._in_bracketed_paste = False
                self._paste_buffer = ""

                self.feed(remaining)
            return

        # Regular input: hand the characters to the parser one by one.
        for i, c in enumerate(data):
            if self._in_bracketed_paste:
                # A previous character completed the paste start sequence;
                # the rest of the data is paste content.
                self.feed(data[i:])
                break
            self._input_parser.send(c)

    def flush(self) -> None:
        """
        Flush the buffer of the input stream.

        This will allow us to handle the escape key (or maybe meta) sooner.
        The input received by the escape key is actually the same as the first
        characters of e.g. Arrow-Up, so without knowing what follows the escape
        sequence, we don't know whether escape has been pressed, or whether
        it's something else. This flush function should be called after a
        timeout, and processes everything that's still in the buffer as-is, so
        without assuming any characters will follow.
        """
        self._input_parser.send(_Flush())

    def feed_and_flush(self, data: str) -> None:
        """
        Wrapper around ``feed`` and ``flush``.

        :param data: Input string, passed to ``feed`` before flushing.
        """
        self.feed(data)
        self.flush()
| |
|