Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import os | |
| import sys | |
| from abc import abstractmethod | |
| from asyncio import get_running_loop | |
| from contextlib import contextmanager | |
| from ..utils import SPHINX_AUTODOC_RUNNING | |
| assert sys.platform == "win32" | |
| # Do not import win32-specific stuff when generating documentation. | |
| # Otherwise RTD would be unable to generate docs for this module. | |
| if not SPHINX_AUTODOC_RUNNING: | |
| import msvcrt | |
| from ctypes import windll | |
| from ctypes import Array, pointer | |
| from ctypes.wintypes import DWORD, HANDLE | |
| from typing import Callable, ContextManager, Iterable, Iterator, TextIO | |
| from prompt_toolkit.eventloop import run_in_executor_with_context | |
| from prompt_toolkit.eventloop.win32 import create_win32_event, wait_for_handles | |
| from prompt_toolkit.key_binding.key_processor import KeyPress | |
| from prompt_toolkit.keys import Keys | |
| from prompt_toolkit.mouse_events import MouseButton, MouseEventType | |
| from prompt_toolkit.win32_types import ( | |
| INPUT_RECORD, | |
| KEY_EVENT_RECORD, | |
| MOUSE_EVENT_RECORD, | |
| STD_INPUT_HANDLE, | |
| EventTypes, | |
| ) | |
| from .ansi_escape_sequences import REVERSE_ANSI_SEQUENCES | |
| from .base import Input | |
| __all__ = [ | |
| "Win32Input", | |
| "ConsoleInputReader", | |
| "raw_mode", | |
| "cooked_mode", | |
| "attach_win32_input", | |
| "detach_win32_input", | |
| ] | |
| # Win32 Constants for MOUSE_EVENT_RECORD. | |
| # See: https://docs.microsoft.com/en-us/windows/console/mouse-event-record-str | |
| FROM_LEFT_1ST_BUTTON_PRESSED = 0x1 | |
| RIGHTMOST_BUTTON_PRESSED = 0x2 | |
| MOUSE_MOVED = 0x0001 | |
| MOUSE_WHEELED = 0x0004 | |
| class _Win32InputBase(Input): | |
| """ | |
| Base class for `Win32Input` and `Win32PipeInput`. | |
| """ | |
| def __init__(self) -> None: | |
| self.win32_handles = _Win32Handles() | |
| def handle(self) -> HANDLE: | |
| pass | |
| class Win32Input(_Win32InputBase): | |
| """ | |
| `Input` class that reads from the Windows console. | |
| """ | |
| def __init__(self, stdin: TextIO | None = None) -> None: | |
| super().__init__() | |
| self.console_input_reader = ConsoleInputReader() | |
| def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]: | |
| """ | |
| Return a context manager that makes this input active in the current | |
| event loop. | |
| """ | |
| return attach_win32_input(self, input_ready_callback) | |
| def detach(self) -> ContextManager[None]: | |
| """ | |
| Return a context manager that makes sure that this input is not active | |
| in the current event loop. | |
| """ | |
| return detach_win32_input(self) | |
| def read_keys(self) -> list[KeyPress]: | |
| return list(self.console_input_reader.read()) | |
| def flush(self) -> None: | |
| pass | |
| def closed(self) -> bool: | |
| return False | |
| def raw_mode(self) -> ContextManager[None]: | |
| return raw_mode() | |
| def cooked_mode(self) -> ContextManager[None]: | |
| return cooked_mode() | |
| def fileno(self) -> int: | |
| # The windows console doesn't depend on the file handle, so | |
| # this is not used for the event loop (which uses the | |
| # handle instead). But it's used in `Application.run_system_command` | |
| # which opens a subprocess with a given stdin/stdout. | |
| return sys.stdin.fileno() | |
| def typeahead_hash(self) -> str: | |
| return "win32-input" | |
| def close(self) -> None: | |
| self.console_input_reader.close() | |
| def handle(self) -> HANDLE: | |
| return self.console_input_reader.handle | |
| class ConsoleInputReader: | |
| """ | |
| :param recognize_paste: When True, try to discover paste actions and turn | |
| the event into a BracketedPaste. | |
| """ | |
| # Keys with character data. | |
| mappings = { | |
| b"\x1b": Keys.Escape, | |
| b"\x00": Keys.ControlSpace, # Control-Space (Also for Ctrl-@) | |
| b"\x01": Keys.ControlA, # Control-A (home) | |
| b"\x02": Keys.ControlB, # Control-B (emacs cursor left) | |
| b"\x03": Keys.ControlC, # Control-C (interrupt) | |
| b"\x04": Keys.ControlD, # Control-D (exit) | |
| b"\x05": Keys.ControlE, # Control-E (end) | |
| b"\x06": Keys.ControlF, # Control-F (cursor forward) | |
| b"\x07": Keys.ControlG, # Control-G | |
| b"\x08": Keys.ControlH, # Control-H (8) (Identical to '\b') | |
| b"\x09": Keys.ControlI, # Control-I (9) (Identical to '\t') | |
| b"\x0a": Keys.ControlJ, # Control-J (10) (Identical to '\n') | |
| b"\x0b": Keys.ControlK, # Control-K (delete until end of line; vertical tab) | |
| b"\x0c": Keys.ControlL, # Control-L (clear; form feed) | |
| b"\x0d": Keys.ControlM, # Control-M (enter) | |
| b"\x0e": Keys.ControlN, # Control-N (14) (history forward) | |
| b"\x0f": Keys.ControlO, # Control-O (15) | |
| b"\x10": Keys.ControlP, # Control-P (16) (history back) | |
| b"\x11": Keys.ControlQ, # Control-Q | |
| b"\x12": Keys.ControlR, # Control-R (18) (reverse search) | |
| b"\x13": Keys.ControlS, # Control-S (19) (forward search) | |
| b"\x14": Keys.ControlT, # Control-T | |
| b"\x15": Keys.ControlU, # Control-U | |
| b"\x16": Keys.ControlV, # Control-V | |
| b"\x17": Keys.ControlW, # Control-W | |
| b"\x18": Keys.ControlX, # Control-X | |
| b"\x19": Keys.ControlY, # Control-Y (25) | |
| b"\x1a": Keys.ControlZ, # Control-Z | |
| b"\x1c": Keys.ControlBackslash, # Both Control-\ and Ctrl-| | |
| b"\x1d": Keys.ControlSquareClose, # Control-] | |
| b"\x1e": Keys.ControlCircumflex, # Control-^ | |
| b"\x1f": Keys.ControlUnderscore, # Control-underscore (Also for Ctrl-hyphen.) | |
| b"\x7f": Keys.Backspace, # (127) Backspace (ASCII Delete.) | |
| } | |
| # Keys that don't carry character data. | |
| keycodes = { | |
| # Home/End | |
| 33: Keys.PageUp, | |
| 34: Keys.PageDown, | |
| 35: Keys.End, | |
| 36: Keys.Home, | |
| # Arrows | |
| 37: Keys.Left, | |
| 38: Keys.Up, | |
| 39: Keys.Right, | |
| 40: Keys.Down, | |
| 45: Keys.Insert, | |
| 46: Keys.Delete, | |
| # F-keys. | |
| 112: Keys.F1, | |
| 113: Keys.F2, | |
| 114: Keys.F3, | |
| 115: Keys.F4, | |
| 116: Keys.F5, | |
| 117: Keys.F6, | |
| 118: Keys.F7, | |
| 119: Keys.F8, | |
| 120: Keys.F9, | |
| 121: Keys.F10, | |
| 122: Keys.F11, | |
| 123: Keys.F12, | |
| } | |
| LEFT_ALT_PRESSED = 0x0002 | |
| RIGHT_ALT_PRESSED = 0x0001 | |
| SHIFT_PRESSED = 0x0010 | |
| LEFT_CTRL_PRESSED = 0x0008 | |
| RIGHT_CTRL_PRESSED = 0x0004 | |
| def __init__(self, recognize_paste: bool = True) -> None: | |
| self._fdcon = None | |
| self.recognize_paste = recognize_paste | |
| # When stdin is a tty, use that handle, otherwise, create a handle from | |
| # CONIN$. | |
| self.handle: HANDLE | |
| if sys.stdin.isatty(): | |
| self.handle = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE)) | |
| else: | |
| self._fdcon = os.open("CONIN$", os.O_RDWR | os.O_BINARY) | |
| self.handle = HANDLE(msvcrt.get_osfhandle(self._fdcon)) | |
| def close(self) -> None: | |
| "Close fdcon." | |
| if self._fdcon is not None: | |
| os.close(self._fdcon) | |
| def read(self) -> Iterable[KeyPress]: | |
| """ | |
| Return a list of `KeyPress` instances. It won't return anything when | |
| there was nothing to read. (This function doesn't block.) | |
| http://msdn.microsoft.com/en-us/library/windows/desktop/ms684961(v=vs.85).aspx | |
| """ | |
| max_count = 2048 # Max events to read at the same time. | |
| read = DWORD(0) | |
| arrtype = INPUT_RECORD * max_count | |
| input_records = arrtype() | |
| # Check whether there is some input to read. `ReadConsoleInputW` would | |
| # block otherwise. | |
| # (Actually, the event loop is responsible to make sure that this | |
| # function is only called when there is something to read, but for some | |
| # reason this happened in the asyncio_win32 loop, and it's better to be | |
| # safe anyway.) | |
| if not wait_for_handles([self.handle], timeout=0): | |
| return | |
| # Get next batch of input event. | |
| windll.kernel32.ReadConsoleInputW( | |
| self.handle, pointer(input_records), max_count, pointer(read) | |
| ) | |
| # First, get all the keys from the input buffer, in order to determine | |
| # whether we should consider this a paste event or not. | |
| all_keys = list(self._get_keys(read, input_records)) | |
| # Fill in 'data' for key presses. | |
| all_keys = [self._insert_key_data(key) for key in all_keys] | |
| # Correct non-bmp characters that are passed as separate surrogate codes | |
| all_keys = list(self._merge_paired_surrogates(all_keys)) | |
| if self.recognize_paste and self._is_paste(all_keys): | |
| gen = iter(all_keys) | |
| k: KeyPress | None | |
| for k in gen: | |
| # Pasting: if the current key consists of text or \n, turn it | |
| # into a BracketedPaste. | |
| data = [] | |
| while k and ( | |
| not isinstance(k.key, Keys) | |
| or k.key in {Keys.ControlJ, Keys.ControlM} | |
| ): | |
| data.append(k.data) | |
| try: | |
| k = next(gen) | |
| except StopIteration: | |
| k = None | |
| if data: | |
| yield KeyPress(Keys.BracketedPaste, "".join(data)) | |
| if k is not None: | |
| yield k | |
| else: | |
| yield from all_keys | |
| def _insert_key_data(self, key_press: KeyPress) -> KeyPress: | |
| """ | |
| Insert KeyPress data, for vt100 compatibility. | |
| """ | |
| if key_press.data: | |
| return key_press | |
| if isinstance(key_press.key, Keys): | |
| data = REVERSE_ANSI_SEQUENCES.get(key_press.key, "") | |
| else: | |
| data = "" | |
| return KeyPress(key_press.key, data) | |
| def _get_keys( | |
| self, read: DWORD, input_records: Array[INPUT_RECORD] | |
| ) -> Iterator[KeyPress]: | |
| """ | |
| Generator that yields `KeyPress` objects from the input records. | |
| """ | |
| for i in range(read.value): | |
| ir = input_records[i] | |
| # Get the right EventType from the EVENT_RECORD. | |
| # (For some reason the Windows console application 'cmder' | |
| # [http://gooseberrycreative.com/cmder/] can return '0' for | |
| # ir.EventType. -- Just ignore that.) | |
| if ir.EventType in EventTypes: | |
| ev = getattr(ir.Event, EventTypes[ir.EventType]) | |
| # Process if this is a key event. (We also have mouse, menu and | |
| # focus events.) | |
| if isinstance(ev, KEY_EVENT_RECORD) and ev.KeyDown: | |
| yield from self._event_to_key_presses(ev) | |
| elif isinstance(ev, MOUSE_EVENT_RECORD): | |
| yield from self._handle_mouse(ev) | |
| def _merge_paired_surrogates(key_presses: list[KeyPress]) -> Iterator[KeyPress]: | |
| """ | |
| Combines consecutive KeyPresses with high and low surrogates into | |
| single characters | |
| """ | |
| buffered_high_surrogate = None | |
| for key in key_presses: | |
| is_text = not isinstance(key.key, Keys) | |
| is_high_surrogate = is_text and "\ud800" <= key.key <= "\udbff" | |
| is_low_surrogate = is_text and "\udc00" <= key.key <= "\udfff" | |
| if buffered_high_surrogate: | |
| if is_low_surrogate: | |
| # convert high surrogate + low surrogate to single character | |
| fullchar = ( | |
| (buffered_high_surrogate.key + key.key) | |
| .encode("utf-16-le", "surrogatepass") | |
| .decode("utf-16-le") | |
| ) | |
| key = KeyPress(fullchar, fullchar) | |
| else: | |
| yield buffered_high_surrogate | |
| buffered_high_surrogate = None | |
| if is_high_surrogate: | |
| buffered_high_surrogate = key | |
| else: | |
| yield key | |
| if buffered_high_surrogate: | |
| yield buffered_high_surrogate | |
| def _is_paste(keys: list[KeyPress]) -> bool: | |
| """ | |
| Return `True` when we should consider this list of keys as a paste | |
| event. Pasted text on windows will be turned into a | |
| `Keys.BracketedPaste` event. (It's not 100% correct, but it is probably | |
| the best possible way to detect pasting of text and handle that | |
| correctly.) | |
| """ | |
| # Consider paste when it contains at least one newline and at least one | |
| # other character. | |
| text_count = 0 | |
| newline_count = 0 | |
| for k in keys: | |
| if not isinstance(k.key, Keys): | |
| text_count += 1 | |
| if k.key == Keys.ControlM: | |
| newline_count += 1 | |
| return newline_count >= 1 and text_count >= 1 | |
| def _event_to_key_presses(self, ev: KEY_EVENT_RECORD) -> list[KeyPress]: | |
| """ | |
| For this `KEY_EVENT_RECORD`, return a list of `KeyPress` instances. | |
| """ | |
| assert isinstance(ev, KEY_EVENT_RECORD) and ev.KeyDown | |
| result: KeyPress | None = None | |
| control_key_state = ev.ControlKeyState | |
| u_char = ev.uChar.UnicodeChar | |
| # Use surrogatepass because u_char may be an unmatched surrogate | |
| ascii_char = u_char.encode("utf-8", "surrogatepass") | |
| # NOTE: We don't use `ev.uChar.AsciiChar`. That appears to be the | |
| # unicode code point truncated to 1 byte. See also: | |
| # https://github.com/ipython/ipython/issues/10004 | |
| # https://github.com/jonathanslenders/python-prompt-toolkit/issues/389 | |
| if u_char == "\x00": | |
| if ev.VirtualKeyCode in self.keycodes: | |
| result = KeyPress(self.keycodes[ev.VirtualKeyCode], "") | |
| else: | |
| if ascii_char in self.mappings: | |
| if self.mappings[ascii_char] == Keys.ControlJ: | |
| u_char = ( | |
| "\n" # Windows sends \n, turn into \r for unix compatibility. | |
| ) | |
| result = KeyPress(self.mappings[ascii_char], u_char) | |
| else: | |
| result = KeyPress(u_char, u_char) | |
| # First we handle Shift-Control-Arrow/Home/End (need to do this first) | |
| if ( | |
| ( | |
| control_key_state & self.LEFT_CTRL_PRESSED | |
| or control_key_state & self.RIGHT_CTRL_PRESSED | |
| ) | |
| and control_key_state & self.SHIFT_PRESSED | |
| and result | |
| ): | |
| mapping: dict[str, str] = { | |
| Keys.Left: Keys.ControlShiftLeft, | |
| Keys.Right: Keys.ControlShiftRight, | |
| Keys.Up: Keys.ControlShiftUp, | |
| Keys.Down: Keys.ControlShiftDown, | |
| Keys.Home: Keys.ControlShiftHome, | |
| Keys.End: Keys.ControlShiftEnd, | |
| Keys.Insert: Keys.ControlShiftInsert, | |
| Keys.PageUp: Keys.ControlShiftPageUp, | |
| Keys.PageDown: Keys.ControlShiftPageDown, | |
| } | |
| result.key = mapping.get(result.key, result.key) | |
| # Correctly handle Control-Arrow/Home/End and Control-Insert/Delete keys. | |
| if ( | |
| control_key_state & self.LEFT_CTRL_PRESSED | |
| or control_key_state & self.RIGHT_CTRL_PRESSED | |
| ) and result: | |
| mapping = { | |
| Keys.Left: Keys.ControlLeft, | |
| Keys.Right: Keys.ControlRight, | |
| Keys.Up: Keys.ControlUp, | |
| Keys.Down: Keys.ControlDown, | |
| Keys.Home: Keys.ControlHome, | |
| Keys.End: Keys.ControlEnd, | |
| Keys.Insert: Keys.ControlInsert, | |
| Keys.Delete: Keys.ControlDelete, | |
| Keys.PageUp: Keys.ControlPageUp, | |
| Keys.PageDown: Keys.ControlPageDown, | |
| } | |
| result.key = mapping.get(result.key, result.key) | |
| # Turn 'Tab' into 'BackTab' when shift was pressed. | |
| # Also handle other shift-key combination | |
| if control_key_state & self.SHIFT_PRESSED and result: | |
| mapping = { | |
| Keys.Tab: Keys.BackTab, | |
| Keys.Left: Keys.ShiftLeft, | |
| Keys.Right: Keys.ShiftRight, | |
| Keys.Up: Keys.ShiftUp, | |
| Keys.Down: Keys.ShiftDown, | |
| Keys.Home: Keys.ShiftHome, | |
| Keys.End: Keys.ShiftEnd, | |
| Keys.Insert: Keys.ShiftInsert, | |
| Keys.Delete: Keys.ShiftDelete, | |
| Keys.PageUp: Keys.ShiftPageUp, | |
| Keys.PageDown: Keys.ShiftPageDown, | |
| } | |
| result.key = mapping.get(result.key, result.key) | |
| # Turn 'Space' into 'ControlSpace' when control was pressed. | |
| if ( | |
| ( | |
| control_key_state & self.LEFT_CTRL_PRESSED | |
| or control_key_state & self.RIGHT_CTRL_PRESSED | |
| ) | |
| and result | |
| and result.data == " " | |
| ): | |
| result = KeyPress(Keys.ControlSpace, " ") | |
| # Turn Control-Enter into META-Enter. (On a vt100 terminal, we cannot | |
| # detect this combination. But it's really practical on Windows.) | |
| if ( | |
| ( | |
| control_key_state & self.LEFT_CTRL_PRESSED | |
| or control_key_state & self.RIGHT_CTRL_PRESSED | |
| ) | |
| and result | |
| and result.key == Keys.ControlJ | |
| ): | |
| return [KeyPress(Keys.Escape, ""), result] | |
| # Return result. If alt was pressed, prefix the result with an | |
| # 'Escape' key, just like unix VT100 terminals do. | |
| # NOTE: Only replace the left alt with escape. The right alt key often | |
| # acts as altgr and is used in many non US keyboard layouts for | |
| # typing some special characters, like a backslash. We don't want | |
| # all backslashes to be prefixed with escape. (Esc-\ has a | |
| # meaning in E-macs, for instance.) | |
| if result: | |
| meta_pressed = control_key_state & self.LEFT_ALT_PRESSED | |
| if meta_pressed: | |
| return [KeyPress(Keys.Escape, ""), result] | |
| else: | |
| return [result] | |
| else: | |
| return [] | |
| def _handle_mouse(self, ev: MOUSE_EVENT_RECORD) -> list[KeyPress]: | |
| """ | |
| Handle mouse events. Return a list of KeyPress instances. | |
| """ | |
| event_flags = ev.EventFlags | |
| button_state = ev.ButtonState | |
| event_type: MouseEventType | None = None | |
| button: MouseButton = MouseButton.NONE | |
| # Scroll events. | |
| if event_flags & MOUSE_WHEELED: | |
| if button_state > 0: | |
| event_type = MouseEventType.SCROLL_UP | |
| else: | |
| event_type = MouseEventType.SCROLL_DOWN | |
| else: | |
| # Handle button state for non-scroll events. | |
| if button_state == FROM_LEFT_1ST_BUTTON_PRESSED: | |
| button = MouseButton.LEFT | |
| elif button_state == RIGHTMOST_BUTTON_PRESSED: | |
| button = MouseButton.RIGHT | |
| # Move events. | |
| if event_flags & MOUSE_MOVED: | |
| event_type = MouseEventType.MOUSE_MOVE | |
| # No key pressed anymore: mouse up. | |
| if event_type is None: | |
| if button_state > 0: | |
| # Some button pressed. | |
| event_type = MouseEventType.MOUSE_DOWN | |
| else: | |
| # No button pressed. | |
| event_type = MouseEventType.MOUSE_UP | |
| data = ";".join( | |
| [ | |
| button.value, | |
| event_type.value, | |
| str(ev.MousePosition.X), | |
| str(ev.MousePosition.Y), | |
| ] | |
| ) | |
| return [KeyPress(Keys.WindowsMouseEvent, data)] | |
| class _Win32Handles: | |
| """ | |
| Utility to keep track of which handles are connectod to which callbacks. | |
| `add_win32_handle` starts a tiny event loop in another thread which waits | |
| for the Win32 handle to become ready. When this happens, the callback will | |
| be called in the current asyncio event loop using `call_soon_threadsafe`. | |
| `remove_win32_handle` will stop this tiny event loop. | |
| NOTE: We use this technique, so that we don't have to use the | |
| `ProactorEventLoop` on Windows and we can wait for things like stdin | |
| in a `SelectorEventLoop`. This is important, because our inputhook | |
| mechanism (used by IPython), only works with the `SelectorEventLoop`. | |
| """ | |
| def __init__(self) -> None: | |
| self._handle_callbacks: dict[int, Callable[[], None]] = {} | |
| # Windows Events that are triggered when we have to stop watching this | |
| # handle. | |
| self._remove_events: dict[int, HANDLE] = {} | |
| def add_win32_handle(self, handle: HANDLE, callback: Callable[[], None]) -> None: | |
| """ | |
| Add a Win32 handle to the event loop. | |
| """ | |
| handle_value = handle.value | |
| if handle_value is None: | |
| raise ValueError("Invalid handle.") | |
| # Make sure to remove a previous registered handler first. | |
| self.remove_win32_handle(handle) | |
| loop = get_running_loop() | |
| self._handle_callbacks[handle_value] = callback | |
| # Create remove event. | |
| remove_event = create_win32_event() | |
| self._remove_events[handle_value] = remove_event | |
| # Add reader. | |
| def ready() -> None: | |
| # Tell the callback that input's ready. | |
| try: | |
| callback() | |
| finally: | |
| run_in_executor_with_context(wait, loop=loop) | |
| # Wait for the input to become ready. | |
| # (Use an executor for this, the Windows asyncio event loop doesn't | |
| # allow us to wait for handles like stdin.) | |
| def wait() -> None: | |
| # Wait until either the handle becomes ready, or the remove event | |
| # has been set. | |
| result = wait_for_handles([remove_event, handle]) | |
| if result is remove_event: | |
| windll.kernel32.CloseHandle(remove_event) | |
| return | |
| else: | |
| loop.call_soon_threadsafe(ready) | |
| run_in_executor_with_context(wait, loop=loop) | |
| def remove_win32_handle(self, handle: HANDLE) -> Callable[[], None] | None: | |
| """ | |
| Remove a Win32 handle from the event loop. | |
| Return either the registered handler or `None`. | |
| """ | |
| if handle.value is None: | |
| return None # Ignore. | |
| # Trigger remove events, so that the reader knows to stop. | |
| try: | |
| event = self._remove_events.pop(handle.value) | |
| except KeyError: | |
| pass | |
| else: | |
| windll.kernel32.SetEvent(event) | |
| try: | |
| return self._handle_callbacks.pop(handle.value) | |
| except KeyError: | |
| return None | |
| def attach_win32_input( | |
| input: _Win32InputBase, callback: Callable[[], None] | |
| ) -> Iterator[None]: | |
| """ | |
| Context manager that makes this input active in the current event loop. | |
| :param input: :class:`~prompt_toolkit.input.Input` object. | |
| :param input_ready_callback: Called when the input is ready to read. | |
| """ | |
| win32_handles = input.win32_handles | |
| handle = input.handle | |
| if handle.value is None: | |
| raise ValueError("Invalid handle.") | |
| # Add reader. | |
| previous_callback = win32_handles.remove_win32_handle(handle) | |
| win32_handles.add_win32_handle(handle, callback) | |
| try: | |
| yield | |
| finally: | |
| win32_handles.remove_win32_handle(handle) | |
| if previous_callback: | |
| win32_handles.add_win32_handle(handle, previous_callback) | |
| def detach_win32_input(input: _Win32InputBase) -> Iterator[None]: | |
| win32_handles = input.win32_handles | |
| handle = input.handle | |
| if handle.value is None: | |
| raise ValueError("Invalid handle.") | |
| previous_callback = win32_handles.remove_win32_handle(handle) | |
| try: | |
| yield | |
| finally: | |
| if previous_callback: | |
| win32_handles.add_win32_handle(handle, previous_callback) | |
| class raw_mode: | |
| """ | |
| :: | |
| with raw_mode(stdin): | |
| ''' the windows terminal is now in 'raw' mode. ''' | |
| The ``fileno`` attribute is ignored. This is to be compatible with the | |
| `raw_input` method of `.vt100_input`. | |
| """ | |
| def __init__(self, fileno: int | None = None) -> None: | |
| self.handle = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE)) | |
| def __enter__(self) -> None: | |
| # Remember original mode. | |
| original_mode = DWORD() | |
| windll.kernel32.GetConsoleMode(self.handle, pointer(original_mode)) | |
| self.original_mode = original_mode | |
| self._patch() | |
| def _patch(self) -> None: | |
| # Set raw | |
| ENABLE_ECHO_INPUT = 0x0004 | |
| ENABLE_LINE_INPUT = 0x0002 | |
| ENABLE_PROCESSED_INPUT = 0x0001 | |
| windll.kernel32.SetConsoleMode( | |
| self.handle, | |
| self.original_mode.value | |
| & ~(ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT), | |
| ) | |
| def __exit__(self, *a: object) -> None: | |
| # Restore original mode | |
| windll.kernel32.SetConsoleMode(self.handle, self.original_mode) | |
| class cooked_mode(raw_mode): | |
| """ | |
| :: | |
| with cooked_mode(stdin): | |
| ''' The pseudo-terminal stdin is now used in cooked mode. ''' | |
| """ | |
| def _patch(self) -> None: | |
| # Set cooked. | |
| ENABLE_ECHO_INPUT = 0x0004 | |
| ENABLE_LINE_INPUT = 0x0002 | |
| ENABLE_PROCESSED_INPUT = 0x0001 | |
| windll.kernel32.SetConsoleMode( | |
| self.handle, | |
| self.original_mode.value | |
| | (ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT), | |
| ) | |