| | """ |
| | Similar to `PyOS_InputHook` of the Python API, we can plug in an input hook in |
| | the asyncio event loop. |
| | |
| | The way this works is by using a custom 'selector' that runs the other event |
| | loop until the real selector is ready. |
| | |
| | It's the responsibility of this event hook to return when there is input ready. |
| | There are two ways to detect when input is ready: |
| | |
| | The inputhook itself is a callable that receives an `InputHookContext`. This |
| | callable should run the other event loop, and return when the main loop has |
| | stuff to do. There are two ways to detect when to return: |
| | |
| | - Call the `input_is_ready` method periodically. Quit when this returns `True`. |
| | |
| | - Add the `fileno` as a watch to the external eventloop. Quit when file descriptor |
| | becomes readable. (But don't read from it.) |
| | |
| | Note that this is not the same as checking for `sys.stdin.fileno()`. The |
| | eventloop of prompt-toolkit allows thread-based executors, for example for |
| | asynchronous autocompletion. When the completion for instance is ready, we |
| | also want prompt-toolkit to gain control again in order to display that. |
| | """ |
| |
|
| | from __future__ import annotations |
| |
|
| | import asyncio |
| | import os |
| | import select |
| | import selectors |
| | import sys |
| | import threading |
| | from asyncio import AbstractEventLoop, get_running_loop |
| | from selectors import BaseSelector, SelectorKey |
| | from typing import TYPE_CHECKING, Any, Callable, Mapping |
| |
|
| | __all__ = [ |
| | "new_eventloop_with_inputhook", |
| | "set_eventloop_with_inputhook", |
| | "InputHookSelector", |
| | "InputHookContext", |
| | "InputHook", |
| | ] |
| |
|
| | if TYPE_CHECKING: |
| | from _typeshed import FileDescriptorLike |
| | from typing_extensions import TypeAlias |
| |
|
| | _EventMask = int |
| |
|
| |
|
| | class InputHookContext: |
| | """ |
| | Given as a parameter to the inputhook. |
| | """ |
| |
|
| | def __init__(self, fileno: int, input_is_ready: Callable[[], bool]) -> None: |
| | self._fileno = fileno |
| | self.input_is_ready = input_is_ready |
| |
|
| | def fileno(self) -> int: |
| | return self._fileno |
| |
|
| |
|
| | InputHook: TypeAlias = Callable[[InputHookContext], None] |
| |
|
| |
|
| | def new_eventloop_with_inputhook( |
| | inputhook: Callable[[InputHookContext], None], |
| | ) -> AbstractEventLoop: |
| | """ |
| | Create a new event loop with the given inputhook. |
| | """ |
| | selector = InputHookSelector(selectors.DefaultSelector(), inputhook) |
| | loop = asyncio.SelectorEventLoop(selector) |
| | return loop |
| |
|
| |
|
| | def set_eventloop_with_inputhook( |
| | inputhook: Callable[[InputHookContext], None], |
| | ) -> AbstractEventLoop: |
| | """ |
| | Create a new event loop with the given inputhook, and activate it. |
| | """ |
| | |
| |
|
| | loop = new_eventloop_with_inputhook(inputhook) |
| | asyncio.set_event_loop(loop) |
| | return loop |
| |
|
| |
|
| | class InputHookSelector(BaseSelector): |
| | """ |
| | Usage: |
| | |
| | selector = selectors.SelectSelector() |
| | loop = asyncio.SelectorEventLoop(InputHookSelector(selector, inputhook)) |
| | asyncio.set_event_loop(loop) |
| | """ |
| |
|
| | def __init__( |
| | self, selector: BaseSelector, inputhook: Callable[[InputHookContext], None] |
| | ) -> None: |
| | self.selector = selector |
| | self.inputhook = inputhook |
| | self._r, self._w = os.pipe() |
| |
|
| | def register( |
| | self, fileobj: FileDescriptorLike, events: _EventMask, data: Any = None |
| | ) -> SelectorKey: |
| | return self.selector.register(fileobj, events, data=data) |
| |
|
| | def unregister(self, fileobj: FileDescriptorLike) -> SelectorKey: |
| | return self.selector.unregister(fileobj) |
| |
|
| | def modify( |
| | self, fileobj: FileDescriptorLike, events: _EventMask, data: Any = None |
| | ) -> SelectorKey: |
| | return self.selector.modify(fileobj, events, data=None) |
| |
|
| | def select( |
| | self, timeout: float | None = None |
| | ) -> list[tuple[SelectorKey, _EventMask]]: |
| | |
| | |
| | if len(getattr(get_running_loop(), "_ready", [])) > 0: |
| | return self.selector.select(timeout=timeout) |
| |
|
| | ready = False |
| | result = None |
| |
|
| | |
| | def run_selector() -> None: |
| | nonlocal ready, result |
| | result = self.selector.select(timeout=timeout) |
| | os.write(self._w, b"x") |
| | ready = True |
| |
|
| | th = threading.Thread(target=run_selector) |
| | th.start() |
| |
|
| | def input_is_ready() -> bool: |
| | return ready |
| |
|
| | |
| | |
| | |
| | |
| | self.inputhook(InputHookContext(self._r, input_is_ready)) |
| |
|
| | |
| | try: |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | if sys.platform != "win32": |
| | select.select([self._r], [], [], None) |
| |
|
| | os.read(self._r, 1024) |
| | except OSError: |
| | |
| | |
| | |
| | pass |
| |
|
| | |
| | th.join() |
| | assert result is not None |
| | return result |
| |
|
| | def close(self) -> None: |
| | """ |
| | Clean up resources. |
| | """ |
| | if self._r: |
| | os.close(self._r) |
| | os.close(self._w) |
| |
|
| | self._r = self._w = -1 |
| | self.selector.close() |
| |
|
| | def get_map(self) -> Mapping[FileDescriptorLike, SelectorKey]: |
| | return self.selector.get_map() |
| |
|