| | import re |
| | import tokenize |
| | from io import StringIO |
| | from typing import Callable, List, Optional, Union, Generator, Tuple |
| | import warnings |
| |
|
| | from prompt_toolkit.buffer import Buffer |
| | from prompt_toolkit.key_binding import KeyPressEvent |
| | from prompt_toolkit.key_binding.bindings import named_commands as nc |
| | from prompt_toolkit.auto_suggest import AutoSuggestFromHistory, Suggestion |
| | from prompt_toolkit.document import Document |
| | from prompt_toolkit.history import History |
| | from prompt_toolkit.shortcuts import PromptSession |
| | from prompt_toolkit.layout.processors import ( |
| | Processor, |
| | Transformation, |
| | TransformationInput, |
| | ) |
| |
|
| | from IPython.core.getipython import get_ipython |
| | from IPython.utils.tokenutil import generate_tokens |
| |
|
| | from .filters import pass_through |
| |
|
| |
|
| | def _get_query(document: Document): |
| | return document.lines[document.cursor_position_row] |
| |
|
| |
|
| | class AppendAutoSuggestionInAnyLine(Processor): |
| | """ |
| | Append the auto suggestion to lines other than the last (appending to the |
| | last line is natively supported by the prompt toolkit). |
| | """ |
| |
|
| | def __init__(self, style: str = "class:auto-suggestion") -> None: |
| | self.style = style |
| |
|
| | def apply_transformation(self, ti: TransformationInput) -> Transformation: |
| | is_last_line = ti.lineno == ti.document.line_count - 1 |
| | is_active_line = ti.lineno == ti.document.cursor_position_row |
| |
|
| | if not is_last_line and is_active_line: |
| | buffer = ti.buffer_control.buffer |
| |
|
| | if buffer.suggestion and ti.document.is_cursor_at_the_end_of_line: |
| | suggestion = buffer.suggestion.text |
| | else: |
| | suggestion = "" |
| |
|
| | return Transformation(fragments=ti.fragments + [(self.style, suggestion)]) |
| | else: |
| | return Transformation(fragments=ti.fragments) |
| |
|
| |
|
| | class NavigableAutoSuggestFromHistory(AutoSuggestFromHistory): |
| | """ |
| | A subclass of AutoSuggestFromHistory that allow navigation to next/previous |
| | suggestion from history. To do so it remembers the current position, but it |
| | state need to carefully be cleared on the right events. |
| | """ |
| |
|
| | def __init__( |
| | self, |
| | ): |
| | self.skip_lines = 0 |
| | self._connected_apps = [] |
| |
|
| | def reset_history_position(self, _: Buffer): |
| | self.skip_lines = 0 |
| |
|
| | def disconnect(self): |
| | for pt_app in self._connected_apps: |
| | text_insert_event = pt_app.default_buffer.on_text_insert |
| | text_insert_event.remove_handler(self.reset_history_position) |
| |
|
| | def connect(self, pt_app: PromptSession): |
| | self._connected_apps.append(pt_app) |
| | |
| | |
| | pt_app.default_buffer.on_text_insert.add_handler(self.reset_history_position) |
| | pt_app.default_buffer.on_cursor_position_changed.add_handler(self._dismiss) |
| |
|
| | def get_suggestion( |
| | self, buffer: Buffer, document: Document |
| | ) -> Optional[Suggestion]: |
| | text = _get_query(document) |
| |
|
| | if text.strip(): |
| | for suggestion, _ in self._find_next_match( |
| | text, self.skip_lines, buffer.history |
| | ): |
| | return Suggestion(suggestion) |
| |
|
| | return None |
| |
|
| | def _dismiss(self, buffer, *args, **kwargs): |
| | buffer.suggestion = None |
| |
|
| | def _find_match( |
| | self, text: str, skip_lines: float, history: History, previous: bool |
| | ) -> Generator[Tuple[str, float], None, None]: |
| | """ |
| | text : str |
| | Text content to find a match for, the user cursor is most of the |
| | time at the end of this text. |
| | skip_lines : float |
| | number of items to skip in the search, this is used to indicate how |
| | far in the list the user has navigated by pressing up or down. |
| | The float type is used as the base value is +inf |
| | history : History |
| | prompt_toolkit History instance to fetch previous entries from. |
| | previous : bool |
| | Direction of the search, whether we are looking previous match |
| | (True), or next match (False). |
| | |
| | Yields |
| | ------ |
| | Tuple with: |
| | str: |
| | current suggestion. |
| | float: |
| | will actually yield only ints, which is passed back via skip_lines, |
| | which may be a +inf (float) |
| | |
| | |
| | """ |
| | line_number = -1 |
| | for string in reversed(list(history.get_strings())): |
| | for line in reversed(string.splitlines()): |
| | line_number += 1 |
| | if not previous and line_number < skip_lines: |
| | continue |
| | |
| | |
| | if line.startswith(text) and len(line) > len(text): |
| | yield line[len(text) :], line_number |
| | if previous and line_number >= skip_lines: |
| | return |
| |
|
| | def _find_next_match( |
| | self, text: str, skip_lines: float, history: History |
| | ) -> Generator[Tuple[str, float], None, None]: |
| | return self._find_match(text, skip_lines, history, previous=False) |
| |
|
| | def _find_previous_match(self, text: str, skip_lines: float, history: History): |
| | return reversed( |
| | list(self._find_match(text, skip_lines, history, previous=True)) |
| | ) |
| |
|
| | def up(self, query: str, other_than: str, history: History) -> None: |
| | for suggestion, line_number in self._find_next_match( |
| | query, self.skip_lines, history |
| | ): |
| | |
| | |
| | |
| | |
| | |
| | if query + suggestion != other_than: |
| | self.skip_lines = line_number |
| | break |
| | else: |
| | |
| | self.skip_lines = 0 |
| |
|
| | def down(self, query: str, other_than: str, history: History) -> None: |
| | for suggestion, line_number in self._find_previous_match( |
| | query, self.skip_lines, history |
| | ): |
| | if query + suggestion != other_than: |
| | self.skip_lines = line_number |
| | break |
| | else: |
| | |
| | for suggestion, line_number in self._find_previous_match( |
| | query, float("Inf"), history |
| | ): |
| | if query + suggestion != other_than: |
| | self.skip_lines = line_number |
| | break |
| |
|
| |
|
| | def accept_or_jump_to_end(event: KeyPressEvent): |
| | """Apply autosuggestion or jump to end of line.""" |
| | buffer = event.current_buffer |
| | d = buffer.document |
| | after_cursor = d.text[d.cursor_position :] |
| | lines = after_cursor.split("\n") |
| | end_of_current_line = lines[0].strip() |
| | suggestion = buffer.suggestion |
| | if (suggestion is not None) and (suggestion.text) and (end_of_current_line == ""): |
| | buffer.insert_text(suggestion.text) |
| | else: |
| | nc.end_of_line(event) |
| |
|
| |
|
| | def _deprected_accept_in_vi_insert_mode(event: KeyPressEvent): |
| | """Accept autosuggestion or jump to end of line. |
| | |
| | .. deprecated:: 8.12 |
| | Use `accept_or_jump_to_end` instead. |
| | """ |
| | return accept_or_jump_to_end(event) |
| |
|
| |
|
| | def accept(event: KeyPressEvent): |
| | """Accept autosuggestion""" |
| | buffer = event.current_buffer |
| | suggestion = buffer.suggestion |
| | if suggestion: |
| | buffer.insert_text(suggestion.text) |
| | else: |
| | nc.forward_char(event) |
| |
|
| |
|
| | def discard(event: KeyPressEvent): |
| | """Discard autosuggestion""" |
| | buffer = event.current_buffer |
| | buffer.suggestion = None |
| |
|
| |
|
| | def accept_word(event: KeyPressEvent): |
| | """Fill partial autosuggestion by word""" |
| | buffer = event.current_buffer |
| | suggestion = buffer.suggestion |
| | if suggestion: |
| | t = re.split(r"(\S+\s+)", suggestion.text) |
| | buffer.insert_text(next((x for x in t if x), "")) |
| | else: |
| | nc.forward_word(event) |
| |
|
| |
|
| | def accept_character(event: KeyPressEvent): |
| | """Fill partial autosuggestion by character""" |
| | b = event.current_buffer |
| | suggestion = b.suggestion |
| | if suggestion and suggestion.text: |
| | b.insert_text(suggestion.text[0]) |
| |
|
| |
|
| | def accept_and_keep_cursor(event: KeyPressEvent): |
| | """Accept autosuggestion and keep cursor in place""" |
| | buffer = event.current_buffer |
| | old_position = buffer.cursor_position |
| | suggestion = buffer.suggestion |
| | if suggestion: |
| | buffer.insert_text(suggestion.text) |
| | buffer.cursor_position = old_position |
| |
|
| |
|
| | def accept_and_move_cursor_left(event: KeyPressEvent): |
| | """Accept autosuggestion and move cursor left in place""" |
| | accept_and_keep_cursor(event) |
| | nc.backward_char(event) |
| |
|
| |
|
| | def _update_hint(buffer: Buffer): |
| | if buffer.auto_suggest: |
| | suggestion = buffer.auto_suggest.get_suggestion(buffer, buffer.document) |
| | buffer.suggestion = suggestion |
| |
|
| |
|
| | def backspace_and_resume_hint(event: KeyPressEvent): |
| | """Resume autosuggestions after deleting last character""" |
| | nc.backward_delete_char(event) |
| | _update_hint(event.current_buffer) |
| |
|
| |
|
| | def resume_hinting(event: KeyPressEvent): |
| | """Resume autosuggestions""" |
| | pass_through.reply(event) |
| | |
| | |
| | _update_hint(event.current_buffer) |
| |
|
| |
|
| | def up_and_update_hint(event: KeyPressEvent): |
| | """Go up and update hint""" |
| | current_buffer = event.current_buffer |
| |
|
| | current_buffer.auto_up(count=event.arg) |
| | _update_hint(current_buffer) |
| |
|
| |
|
| | def down_and_update_hint(event: KeyPressEvent): |
| | """Go down and update hint""" |
| | current_buffer = event.current_buffer |
| |
|
| | current_buffer.auto_down(count=event.arg) |
| | _update_hint(current_buffer) |
| |
|
| |
|
| | def accept_token(event: KeyPressEvent): |
| | """Fill partial autosuggestion by token""" |
| | b = event.current_buffer |
| | suggestion = b.suggestion |
| |
|
| | if suggestion: |
| | prefix = _get_query(b.document) |
| | text = prefix + suggestion.text |
| |
|
| | tokens: List[Optional[str]] = [None, None, None] |
| | substrings = [""] |
| | i = 0 |
| |
|
| | for token in generate_tokens(StringIO(text).readline): |
| | if token.type == tokenize.NEWLINE: |
| | index = len(text) |
| | else: |
| | index = text.index(token[1], len(substrings[-1])) |
| | substrings.append(text[:index]) |
| | tokenized_so_far = substrings[-1] |
| | if tokenized_so_far.startswith(prefix): |
| | if i == 0 and len(tokenized_so_far) > len(prefix): |
| | tokens[0] = tokenized_so_far[len(prefix) :] |
| | substrings.append(tokenized_so_far) |
| | i += 1 |
| | tokens[i] = token[1] |
| | if i == 2: |
| | break |
| | i += 1 |
| |
|
| | if tokens[0]: |
| | to_insert: str |
| | insert_text = substrings[-2] |
| | if tokens[1] and len(tokens[1]) == 1: |
| | insert_text = substrings[-1] |
| | to_insert = insert_text[len(prefix) :] |
| | b.insert_text(to_insert) |
| | return |
| |
|
| | nc.forward_word(event) |
| |
|
| |
|
| | Provider = Union[AutoSuggestFromHistory, NavigableAutoSuggestFromHistory, None] |
| |
|
| |
|
| | def _swap_autosuggestion( |
| | buffer: Buffer, |
| | provider: NavigableAutoSuggestFromHistory, |
| | direction_method: Callable, |
| | ): |
| | """ |
| | We skip most recent history entry (in either direction) if it equals the |
| | current autosuggestion because if user cycles when auto-suggestion is shown |
| | they most likely want something else than what was suggested (otherwise |
| | they would have accepted the suggestion). |
| | """ |
| | suggestion = buffer.suggestion |
| | if not suggestion: |
| | return |
| |
|
| | query = _get_query(buffer.document) |
| | current = query + suggestion.text |
| |
|
| | direction_method(query=query, other_than=current, history=buffer.history) |
| |
|
| | new_suggestion = provider.get_suggestion(buffer, buffer.document) |
| | buffer.suggestion = new_suggestion |
| |
|
| |
|
| | def swap_autosuggestion_up(event: KeyPressEvent): |
| | """Get next autosuggestion from history.""" |
| | shell = get_ipython() |
| | provider = shell.auto_suggest |
| |
|
| | if not isinstance(provider, NavigableAutoSuggestFromHistory): |
| | return |
| |
|
| | return _swap_autosuggestion( |
| | buffer=event.current_buffer, provider=provider, direction_method=provider.up |
| | ) |
| |
|
| |
|
| | def swap_autosuggestion_down(event: KeyPressEvent): |
| | """Get previous autosuggestion from history.""" |
| | shell = get_ipython() |
| | provider = shell.auto_suggest |
| |
|
| | if not isinstance(provider, NavigableAutoSuggestFromHistory): |
| | return |
| |
|
| | return _swap_autosuggestion( |
| | buffer=event.current_buffer, |
| | provider=provider, |
| | direction_method=provider.down, |
| | ) |
| |
|
| |
|
| | def __getattr__(key): |
| | if key == "accept_in_vi_insert_mode": |
| | warnings.warn( |
| | "`accept_in_vi_insert_mode` is deprecated since IPython 8.12 and " |
| | "renamed to `accept_or_jump_to_end`. Please update your configuration " |
| | "accordingly", |
| | DeprecationWarning, |
| | stacklevel=2, |
| | ) |
| | return _deprected_accept_in_vi_insert_mode |
| | raise AttributeError |
| |
|