| | """ |
| | Utility for running a prompt_toolkit application in an asyncssh server. |
| | """ |
| |
|
| | from __future__ import annotations |
| |
|
| | import asyncio |
| | import traceback |
| | from asyncio import get_running_loop |
| | from typing import Any, Callable, Coroutine, TextIO, cast |
| |
|
| | import asyncssh |
| |
|
| | from prompt_toolkit.application.current import AppSession, create_app_session |
| | from prompt_toolkit.data_structures import Size |
| | from prompt_toolkit.input import PipeInput, create_pipe_input |
| | from prompt_toolkit.output.vt100 import Vt100_Output |
| |
|
| | __all__ = ["PromptToolkitSSHSession", "PromptToolkitSSHServer"] |
| |
|
| |
|
| | class PromptToolkitSSHSession(asyncssh.SSHServerSession): |
| | def __init__( |
| | self, |
| | interact: Callable[[PromptToolkitSSHSession], Coroutine[Any, Any, None]], |
| | *, |
| | enable_cpr: bool, |
| | ) -> None: |
| | self.interact = interact |
| | self.enable_cpr = enable_cpr |
| | self.interact_task: asyncio.Task[None] | None = None |
| | self._chan: Any | None = None |
| | self.app_session: AppSession | None = None |
| |
|
| | |
| | |
| | |
| | self._input: PipeInput | None = None |
| | self._output: Vt100_Output | None = None |
| |
|
| | |
| | |
| | class Stdout: |
| | def write(s, data: str) -> None: |
| | try: |
| | if self._chan is not None: |
| | self._chan.write(data.replace("\n", "\r\n")) |
| | except BrokenPipeError: |
| | pass |
| |
|
| | def isatty(s) -> bool: |
| | return True |
| |
|
| | def flush(s) -> None: |
| | pass |
| |
|
| | @property |
| | def encoding(s) -> str: |
| | assert self._chan is not None |
| | return str(self._chan._orig_chan.get_encoding()[0]) |
| |
|
| | self.stdout = cast(TextIO, Stdout()) |
| |
|
| | def _get_size(self) -> Size: |
| | """ |
| | Callable that returns the current `Size`, required by Vt100_Output. |
| | """ |
| | if self._chan is None: |
| | return Size(rows=20, columns=79) |
| | else: |
| | width, height, pixwidth, pixheight = self._chan.get_terminal_size() |
| | return Size(rows=height, columns=width) |
| |
|
| | def connection_made(self, chan: Any) -> None: |
| | self._chan = chan |
| |
|
| | def shell_requested(self) -> bool: |
| | return True |
| |
|
| | def session_started(self) -> None: |
| | self.interact_task = get_running_loop().create_task(self._interact()) |
| |
|
| | async def _interact(self) -> None: |
| | if self._chan is None: |
| | |
| | raise Exception("`_interact` called before `connection_made`.") |
| |
|
| | if hasattr(self._chan, "set_line_mode") and self._chan._editor is not None: |
| | |
| | |
| | self._chan.set_line_mode(False) |
| |
|
| | term = self._chan.get_terminal_type() |
| |
|
| | self._output = Vt100_Output( |
| | self.stdout, self._get_size, term=term, enable_cpr=self.enable_cpr |
| | ) |
| |
|
| | with create_pipe_input() as self._input: |
| | with create_app_session(input=self._input, output=self._output) as session: |
| | self.app_session = session |
| | try: |
| | await self.interact(self) |
| | except BaseException: |
| | traceback.print_exc() |
| | finally: |
| | |
| | self._chan.close() |
| | self._input.close() |
| |
|
| | def terminal_size_changed( |
| | self, width: int, height: int, pixwidth: object, pixheight: object |
| | ) -> None: |
| | |
| | if self.app_session and self.app_session.app: |
| | self.app_session.app._on_resize() |
| |
|
| | def data_received(self, data: str, datatype: object) -> None: |
| | if self._input is None: |
| | |
| | return |
| |
|
| | self._input.send_text(data) |
| |
|
| |
|
| | class PromptToolkitSSHServer(asyncssh.SSHServer): |
| | """ |
| | Run a prompt_toolkit application over an asyncssh server. |
| | |
| | This takes one argument, an `interact` function, which is called for each |
| | connection. This should be an asynchronous function that runs the |
| | prompt_toolkit applications. This function runs in an `AppSession`, which |
| | means that we can have multiple UI interactions concurrently. |
| | |
| | Example usage: |
| | |
| | .. code:: python |
| | |
| | async def interact(ssh_session: PromptToolkitSSHSession) -> None: |
| | await yes_no_dialog("my title", "my text").run_async() |
| | |
| | prompt_session = PromptSession() |
| | text = await prompt_session.prompt_async("Type something: ") |
| | print_formatted_text('You said: ', text) |
| | |
| | server = PromptToolkitSSHServer(interact=interact) |
| | loop = get_running_loop() |
| | loop.run_until_complete( |
| | asyncssh.create_server( |
| | lambda: MySSHServer(interact), |
| | "", |
| | port, |
| | server_host_keys=["/etc/ssh/..."], |
| | ) |
| | ) |
| | loop.run_forever() |
| | |
| | :param enable_cpr: When `True`, the default, try to detect whether the SSH |
| | client runs in a terminal that responds to "cursor position requests". |
| | That way, we can properly determine how much space there is available |
| | for the UI (especially for drop down menus) to render. |
| | """ |
| |
|
| | def __init__( |
| | self, |
| | interact: Callable[[PromptToolkitSSHSession], Coroutine[Any, Any, None]], |
| | *, |
| | enable_cpr: bool = True, |
| | ) -> None: |
| | self.interact = interact |
| | self.enable_cpr = enable_cpr |
| |
|
| | def begin_auth(self, username: str) -> bool: |
| | |
| | return False |
| |
|
| | def session_requested(self) -> PromptToolkitSSHSession: |
| | return PromptToolkitSSHSession(self.interact, enable_cpr=self.enable_cpr) |
| |
|