Spaces:
Sleeping
Sleeping
| """Contains the base class :class:`.BaseSimplePrompt`.""" | |
| import os | |
| import re | |
| from abc import ABC, abstractmethod | |
| from typing import ( | |
| TYPE_CHECKING, | |
| Any, | |
| Callable, | |
| Dict, | |
| List, | |
| Optional, | |
| Tuple, | |
| Union, | |
| cast, | |
| ) | |
| from prompt_toolkit.enums import EditingMode | |
| from prompt_toolkit.filters.base import Condition, FilterOrBool | |
| from prompt_toolkit.key_binding.key_bindings import KeyBindings, KeyHandlerCallable | |
| from prompt_toolkit.keys import Keys | |
| from prompt_toolkit.styles.style import Style | |
| from prompt_toolkit.validation import Validator | |
| from InquirerPy.enum import INQUIRERPY_KEYBOARD_INTERRUPT | |
| from InquirerPy.exceptions import RequiredKeyNotFound | |
| from InquirerPy.utils import ( | |
| InquirerPyMessage, | |
| InquirerPySessionResult, | |
| InquirerPyStyle, | |
| InquirerPyValidate, | |
| get_style, | |
| ) | |
| if TYPE_CHECKING: | |
| from prompt_toolkit.key_binding.key_processor import KeyPressEvent | |
class BaseSimplePrompt(ABC):
    """The base class to create a simple terminal input prompt.

    Note:
        No actual :class:`~prompt_toolkit.application.Application` is created by this class.
        This class only creates some common interface and attributes that can be easily used
        by `prompt_toolkit`.

        To have a functional prompt, you'll at least have to implement the :meth:`.BaseSimplePrompt._run`
        and :meth:`.BaseSimplePrompt._get_prompt_message`.

    See Also:
        :class:`~InquirerPy.prompts.input.InputPrompt`
    """

    def __init__(
        self,
        message: InquirerPyMessage,
        style: Optional[InquirerPyStyle] = None,
        vi_mode: bool = False,
        qmark: str = "?",
        amark: str = "?",
        instruction: str = "",
        validate: Optional[InquirerPyValidate] = None,
        invalid_message: str = "Invalid input",
        transformer: Optional[Callable[[Any], Any]] = None,
        filter: Optional[Callable[[Any], Any]] = None,
        default: Any = "",
        wrap_lines: bool = True,
        raise_keyboard_interrupt: bool = True,
        mandatory: bool = True,
        mandatory_message: str = "Mandatory prompt",
        session_result: Optional[InquirerPySessionResult] = None,
    ) -> None:
        """Store the common prompt configuration and the default keybinding tables.

        Args:
            message: The question to display. When callable, it is called with the
                session result and its return value is used as the message.
            style: Style used to build the `prompt_toolkit` style. Falls back to
                the result of `get_style()` when omitted.
            vi_mode: Use VI editing mode. VI mode is also enabled when the
                `INQUIRERPY_VI_MODE` environment variable is set to a non-empty value.
            qmark: Symbol displayed in front of the question before it is answered
                (also used when the prompt is skipped).
            amark: Symbol displayed in front of the question after it is answered.
            instruction: Instruction text exposed through :attr:`instruction`.
            validate: A `Validator` instance, or a callable that is wrapped into
                one. When omitted, every input is considered valid.
            invalid_message: Error message used by the validator built from a callable.
            transformer: Callable applied to the result to build the text displayed
                once the question is answered.
            filter: Callable applied to the result before :meth:`execute` /
                :meth:`execute_async` return it.
            default: Default value. When callable, it is called with the session result.
            wrap_lines: Stored for use by concrete prompts.
            raise_keyboard_interrupt: Whether `ctrl-c` is bound to the interrupt
                action. Setting the `INQUIRERPY_NO_RAISE_KBI` environment variable
                to a non-empty value disables it regardless of this argument.
            mandatory: When True the prompt cannot be skipped.
            mandatory_message: Error message passed to :meth:`_set_error` when
                attempting to skip a mandatory prompt.
            session_result: Results of the current session, passed to callable
                `message` / `default`. Defaults to an empty dict.
        """
        self._mandatory = mandatory
        self._mandatory_message = mandatory_message
        self._result = session_result or {}
        self._message = (
            cast(Callable, message)(self._result) if callable(message) else message
        )
        self._instruction = instruction
        self._default = default(self._result) if callable(default) else default
        self._style = Style.from_dict(style.dict if style else get_style().dict)
        self._qmark = qmark
        self._amark = amark
        self._status = {"answered": False, "result": None, "skipped": False}
        self._kb = KeyBindings()
        self._lexer = "class:input"
        self._transformer = transformer
        self._filter = filter
        self._wrap_lines = wrap_lines
        self._editing_mode = (
            EditingMode.VI
            if vi_mode or bool(os.getenv("INQUIRERPY_VI_MODE", False))
            else EditingMode.EMACS
        )
        if isinstance(validate, Validator):
            self._validator = validate
        else:
            self._validator = Validator.from_callable(
                validate if validate else lambda _: True,
                invalid_message,
                move_cursor_to_end=True,
            )
        self._raise_kbi = not os.getenv(
            "INQUIRERPY_NO_RAISE_KBI", not raise_keyboard_interrupt
        )
        # Evaluated lazily on every key press, so later changes to
        # `self._raise_kbi` are picked up by the bindings below.
        self._is_rasing_kbi = Condition(lambda: self._raise_kbi)

        # action name -> list of key specifications triggering that action.
        self._kb_maps = {
            "answer": [{"key": Keys.Enter}],
            "interrupt": [
                {"key": "c-c", "filter": self._is_rasing_kbi},
                {"key": "c-d", "filter": ~self._is_rasing_kbi},
            ],
            "skip": [{"key": "c-z"}, {"key": "c-c", "filter": ~self._is_rasing_kbi}],
        }
        # action name -> list of handlers executed when the action is triggered.
        self._kb_func_lookup = {
            "answer": [{"func": self._handle_enter}],
            "interrupt": [{"func": self._handle_interrupt}],
            "skip": [{"func": self._handle_skip}],
        }

    def _keybinding_factory(self) -> None:
        """Register all keybindings in `self._kb_maps`.

        It's required to call this function at the end of prompt constructor if
        it inherits from :class:`~InquirerPy.base.simple.BaseSimplePrompt` or
        :class:`~InquirerPy.base.complex.BaseComplexPrompt`.

        Raises:
            RequiredKeyNotFound: When an action in `kb_maps` has no entry in
                `kb_func_lookup`.
        """

        def _factory(keys, filter, action):
            if action not in self.kb_func_lookup:
                raise RequiredKeyNotFound(f"keybinding action {action} not found")
            if not isinstance(keys, list):
                keys = [keys]

            # Bind the handler to the keys; without this registration the
            # handler would be created and immediately discarded.
            @self.register_kb(*keys, filter=filter)
            def _(event):
                for method in self.kb_func_lookup[action]:
                    method["func"](event, *method.get("args", []))

        for action, bindings in self.kb_maps.items():
            if not isinstance(bindings, list):
                bindings = [bindings]
            for kb in bindings:
                _factory(kb["key"], kb.get("filter", Condition(lambda: True)), action)

    def _set_error(self, message: str) -> None:
        """Set the error message for the prompt.

        The base implementation does nothing.

        Args:
            message: Error message to set.
        """
        pass

    def _handle_skip(self, event: Optional["KeyPressEvent"]) -> None:
        """Handle the event when attempting to skip a prompt.

        Skip the prompt if the `_mandatory` field is False, otherwise
        show an error message that the prompt cannot be skipped.

        Args:
            event: The key press event. When provided, its application is
                exited with a `None` result.
        """
        if self._mandatory:
            self._set_error(message=self._mandatory_message)
            return
        self.status["answered"] = True
        self.status["skipped"] = True
        self.status["result"] = None
        if event:
            event.app.exit(result=None)

    def _handle_interrupt(self, event: Optional["KeyPressEvent"]) -> None:
        """Handle the event when a KeyboardInterrupt signal is sent.

        Args:
            event: The key press event. When provided, its application is
                exited with `INQUIRERPY_KEYBOARD_INTERRUPT` as the result.
        """
        self.status["answered"] = True
        self.status["result"] = INQUIRERPY_KEYBOARD_INTERRUPT
        self.status["skipped"] = True
        if event:
            event.app.exit(result=INQUIRERPY_KEYBOARD_INTERRUPT)

    def _handle_enter(self, event: Optional["KeyPressEvent"]) -> None:
        """Handle the event when user attempt to answer the question.

        The base implementation does nothing.
        """
        pass

    @property
    def status(self) -> Dict[str, Any]:
        """Dict[str, Any]: Get current prompt status.

        The status contains 3 keys: "answered", "result" and "skipped".

        answered: If the current prompt is answered.
        result: The result of the user answer.
        skipped: If the prompt is skipped.
        """
        return self._status

    @status.setter
    def status(self, value) -> None:
        self._status = value

    def register_kb(
        self, *keys: Union[Keys, str], filter: FilterOrBool = True, **kwargs
    ) -> Callable[[KeyHandlerCallable], KeyHandlerCallable]:
        """Keybinding registration decorator.

        This decorator wraps around the :meth:`prompt_toolkit.key_binding.KeyBindings.add` with
        added feature to process `alt` related keybindings.

        By default, `prompt_toolkit` doesn't process `alt` related keybindings,
        it requires `alt-ANY` to `escape` + `ANY`.

        Args:
            keys: The keys to bind that can trigger the function.
            filter: :class:`~prompt_toolkit.filter.Condition` to indicate if this keybinding should be active.
            **kwargs: Additional keyword arguments forwarded to
                :meth:`prompt_toolkit.key_binding.KeyBindings.add`.

        Returns:
            A decorator that should be applied to the function that's intended to be active when the keys
            are pressed.

        Examples:
            >>> @self.register_kb("alt-j")
            ... def test(event):
            ...     pass
        """
        alt_pattern = re.compile(r"^alt-(.*)")

        def decorator(func: KeyHandlerCallable) -> KeyHandlerCallable:
            formatted_keys = []
            for key in keys:
                match = alt_pattern.match(key)
                if match:
                    # "alt-x" is expressed as the key sequence "escape", "x".
                    formatted_keys.append("escape")
                    formatted_keys.append(match.group(1))
                else:
                    formatted_keys.append(key)

            @self._kb.add(*formatted_keys, filter=filter, **kwargs)
            def executable(event) -> None:
                func(event)

            return executable

        return decorator

    @abstractmethod
    def _get_prompt_message(
        self, pre_answer: Tuple[str, str], post_answer: Tuple[str, str]
    ) -> List[Tuple[str, str]]:
        """Get the question message in formatted text form to display in the prompt.

        This function is mainly used to render the question message dynamically based
        on the current status (answered or not answered) of the prompt.

        Note:
            The function requires implementation when inheriting :class:`.BaseSimplePrompt`.
            You should call `super()._get_prompt_message(pre_answer, post_answer)` in
            the implemented `_get_prompt_message`.

        Args:
            pre_answer: The message to display before the question is answered.
            post_answer: The information to display after the question is answered.

        Returns:
            Formatted text in list of tuple format.
        """
        display_message = []
        if self.status["skipped"]:
            display_message.append(("class:skipped", self._qmark))
            display_message.append(
                ("class:skipped", "%s%s " % (" " if self._qmark else "", self._message))
            )
        elif self.status["answered"]:
            display_message.append(("class:answermark", self._amark))
            display_message.append(
                (
                    "class:answered_question",
                    "%s%s" % (" " if self._amark else "", self._message),
                )
            )
            # A transformer, when given, replaces `post_answer` entirely.
            display_message.append(
                post_answer
                if not self._transformer
                else (
                    "class:answer",
                    " %s" % self._transformer(self.status["result"]),
                )
            )
        else:
            display_message.append(("class:questionmark", self._qmark))
            display_message.append(
                (
                    "class:question",
                    "%s%s" % (" " if self._qmark else "", self._message),
                )
            )
            display_message.append(pre_answer)
        return display_message

    @abstractmethod
    def _run(self) -> Any:
        """Abstractmethod to enforce a run function is implemented.

        All prompt instance requires a `_run` call to initialise and run an instance of
        `PromptSession` or `Application`.
        """
        pass

    @abstractmethod
    async def _run_async(self) -> Any:
        """Abstractmethod to enforce a run function is implemented.

        All prompt instance requires a `_run_async` call to initialise and run an instance of
        `PromptSession` or `Application`.
        """
        pass

    def execute(self, raise_keyboard_interrupt: Optional[bool] = None) -> Any:
        """Run the prompt and get the result.

        Args:
            raise_keyboard_interrupt: **Deprecated**. Set this parameter on the prompt initialisation instead.

        Returns:
            Value of the user answer. Types varies depending on the prompt.

        Raises:
            KeyboardInterrupt: When `ctrl-c` is pressed and `raise_keyboard_interrupt` is True.
        """
        # The override must be applied before the prompt runs: the keybinding
        # filters read `self._raise_kbi` while the prompt is active, so changing
        # it after `_run` returned would have no effect.
        if raise_keyboard_interrupt is not None:
            self._raise_kbi = not os.getenv(
                "INQUIRERPY_NO_RAISE_KBI", not raise_keyboard_interrupt
            )
        result = self._run()
        if result == INQUIRERPY_KEYBOARD_INTERRUPT:
            raise KeyboardInterrupt
        if not self._filter:
            return result
        return self._filter(result)

    async def execute_async(self) -> Any:
        """Run the prompt asynchronously and get the result.

        Returns:
            Value of the user answer. Types varies depending on the prompt.

        Raises:
            KeyboardInterrupt: When `ctrl-c` is pressed and `raise_keyboard_interrupt` is True.
        """
        result = await self._run_async()
        if result == INQUIRERPY_KEYBOARD_INTERRUPT:
            raise KeyboardInterrupt
        if not self._filter:
            return result
        return self._filter(result)

    @property
    def instruction(self) -> str:
        """str: Instruction to display next to question."""
        return self._instruction

    @property
    def kb_maps(self) -> Dict[str, Any]:
        """Dict[str, Any]: Keybinding mappings.

        Assigning a dict merges it into the existing mappings; keys present in
        the assigned dict replace the existing entries.
        """
        return self._kb_maps

    @kb_maps.setter
    def kb_maps(self, value: Dict[str, Any]) -> None:
        self._kb_maps = {**self._kb_maps, **value}

    @property
    def kb_func_lookup(self) -> Dict[str, Any]:
        """Dict[str, Any]: Keybinding function lookup mappings.

        Assigning a dict merges it into the existing lookup; keys present in
        the assigned dict replace the existing entries.
        """
        return self._kb_func_lookup

    @kb_func_lookup.setter
    def kb_func_lookup(self, value: Dict[str, Any]) -> None:
        self._kb_func_lookup = {**self._kb_func_lookup, **value}