| | from __future__ import annotations |
| |
|
| | import os |
| | import signal |
| | import sys |
| | import threading |
| | from collections import deque |
| | from typing import ( |
| | Callable, |
| | ContextManager, |
| | Dict, |
| | Generator, |
| | Generic, |
| | TypeVar, |
| | Union, |
| | ) |
| |
|
| | from wcwidth import wcwidth |
| |
|
| | __all__ = [ |
| | "Event", |
| | "DummyContext", |
| | "get_cwidth", |
| | "suspend_to_background_supported", |
| | "is_conemu_ansi", |
| | "is_windows", |
| | "in_main_thread", |
| | "get_bell_environment_variable", |
| | "get_term_environment_variable", |
| | "take_using_weights", |
| | "to_str", |
| | "to_int", |
| | "AnyFloat", |
| | "to_float", |
| | "is_dumb_terminal", |
| | ] |
| |
|
| | |
| | |
| | SPHINX_AUTODOC_RUNNING = "sphinx.ext.autodoc" in sys.modules |
| |
|
| | _Sender = TypeVar("_Sender", covariant=True) |
| |
|
| |
|
| | class Event(Generic[_Sender]): |
| | """ |
| | Simple event to which event handlers can be attached. For instance:: |
| | |
| | class Cls: |
| | def __init__(self): |
| | # Define event. The first parameter is the sender. |
| | self.event = Event(self) |
| | |
| | obj = Cls() |
| | |
| | def handler(sender): |
| | pass |
| | |
| | # Add event handler by using the += operator. |
| | obj.event += handler |
| | |
| | # Fire event. |
| | obj.event() |
| | """ |
| |
|
| | def __init__( |
| | self, sender: _Sender, handler: Callable[[_Sender], None] | None = None |
| | ) -> None: |
| | self.sender = sender |
| | self._handlers: list[Callable[[_Sender], None]] = [] |
| |
|
| | if handler is not None: |
| | self += handler |
| |
|
| | def __call__(self) -> None: |
| | "Fire event." |
| | for handler in self._handlers: |
| | handler(self.sender) |
| |
|
| | def fire(self) -> None: |
| | "Alias for just calling the event." |
| | self() |
| |
|
| | def add_handler(self, handler: Callable[[_Sender], None]) -> None: |
| | """ |
| | Add another handler to this callback. |
| | (Handler should be a callable that takes exactly one parameter: the |
| | sender object.) |
| | """ |
| | |
| | self._handlers.append(handler) |
| |
|
| | def remove_handler(self, handler: Callable[[_Sender], None]) -> None: |
| | """ |
| | Remove a handler from this callback. |
| | """ |
| | if handler in self._handlers: |
| | self._handlers.remove(handler) |
| |
|
| | def __iadd__(self, handler: Callable[[_Sender], None]) -> Event[_Sender]: |
| | """ |
| | `event += handler` notation for adding a handler. |
| | """ |
| | self.add_handler(handler) |
| | return self |
| |
|
| | def __isub__(self, handler: Callable[[_Sender], None]) -> Event[_Sender]: |
| | """ |
| | `event -= handler` notation for removing a handler. |
| | """ |
| | self.remove_handler(handler) |
| | return self |
| |
|
| |
|
| | class DummyContext(ContextManager[None]): |
| | """ |
| | (contextlib.nested is not available on Py3) |
| | """ |
| |
|
| | def __enter__(self) -> None: |
| | pass |
| |
|
| | def __exit__(self, *a: object) -> None: |
| | pass |
| |
|
| |
|
| | class _CharSizesCache(Dict[str, int]): |
| | """ |
| | Cache for wcwidth sizes. |
| | """ |
| |
|
| | LONG_STRING_MIN_LEN = 64 |
| | MAX_LONG_STRINGS = 16 |
| |
|
| | def __init__(self) -> None: |
| | super().__init__() |
| | |
| | self._long_strings: deque[str] = deque() |
| |
|
| | def __missing__(self, string: str) -> int: |
| | |
| | |
| | |
| | |
| | result: int |
| | if len(string) == 1: |
| | result = max(0, wcwidth(string)) |
| | else: |
| | result = sum(self[c] for c in string) |
| |
|
| | |
| | self[string] = result |
| |
|
| | |
| | |
| | if len(string) > self.LONG_STRING_MIN_LEN: |
| | long_strings = self._long_strings |
| | long_strings.append(string) |
| |
|
| | if len(long_strings) > self.MAX_LONG_STRINGS: |
| | key_to_remove = long_strings.popleft() |
| | if key_to_remove in self: |
| | del self[key_to_remove] |
| |
|
| | return result |
| |
|
| |
|
| | _CHAR_SIZES_CACHE = _CharSizesCache() |
| |
|
| |
|
| | def get_cwidth(string: str) -> int: |
| | """ |
| | Return width of a string. Wrapper around ``wcwidth``. |
| | """ |
| | return _CHAR_SIZES_CACHE[string] |
| |
|
| |
|
| | def suspend_to_background_supported() -> bool: |
| | """ |
| | Returns `True` when the Python implementation supports |
| | suspend-to-background. This is typically `False' on Windows systems. |
| | """ |
| | return hasattr(signal, "SIGTSTP") |
| |
|
| |
|
| | def is_windows() -> bool: |
| | """ |
| | True when we are using Windows. |
| | """ |
| | return sys.platform == "win32" |
| |
|
| |
|
| | def is_windows_vt100_supported() -> bool: |
| | """ |
| | True when we are using Windows, but VT100 escape sequences are supported. |
| | """ |
| | if sys.platform == "win32": |
| | |
| | from prompt_toolkit.output.windows10 import is_win_vt100_enabled |
| |
|
| | return is_win_vt100_enabled() |
| |
|
| | return False |
| |
|
| |
|
| | def is_conemu_ansi() -> bool: |
| | """ |
| | True when the ConEmu Windows console is used. |
| | """ |
| | return sys.platform == "win32" and os.environ.get("ConEmuANSI", "OFF") == "ON" |
| |
|
| |
|
| | def in_main_thread() -> bool: |
| | """ |
| | True when the current thread is the main thread. |
| | """ |
| | return threading.current_thread().__class__.__name__ == "_MainThread" |
| |
|
| |
|
| | def get_bell_environment_variable() -> bool: |
| | """ |
| | True if env variable is set to true (true, TRUE, True, 1). |
| | """ |
| | value = os.environ.get("PROMPT_TOOLKIT_BELL", "true") |
| | return value.lower() in ("1", "true") |
| |
|
| |
|
| | def get_term_environment_variable() -> str: |
| | "Return the $TERM environment variable." |
| | return os.environ.get("TERM", "") |
| |
|
| |
|
| | _T = TypeVar("_T") |
| |
|
| |
|
| | def take_using_weights( |
| | items: list[_T], weights: list[int] |
| | ) -> Generator[_T, None, None]: |
| | """ |
| | Generator that keeps yielding items from the items list, in proportion to |
| | their weight. For instance:: |
| | |
| | # Getting the first 70 items from this generator should have yielded 10 |
| | # times A, 20 times B and 40 times C, all distributed equally.. |
| | take_using_weights(['A', 'B', 'C'], [5, 10, 20]) |
| | |
| | :param items: List of items to take from. |
| | :param weights: Integers representing the weight. (Numbers have to be |
| | integers, not floats.) |
| | """ |
| | assert len(items) == len(weights) |
| | assert len(items) > 0 |
| |
|
| | |
| | items2 = [] |
| | weights2 = [] |
| | for item, w in zip(items, weights): |
| | if w > 0: |
| | items2.append(item) |
| | weights2.append(w) |
| |
|
| | items = items2 |
| | weights = weights2 |
| |
|
| | |
| | if not items: |
| | raise ValueError("Did't got any items with a positive weight.") |
| |
|
| | |
| | already_taken = [0 for i in items] |
| | item_count = len(items) |
| | max_weight = max(weights) |
| |
|
| | i = 0 |
| | while True: |
| | |
| | adding = True |
| | while adding: |
| | adding = False |
| |
|
| | for item_i, item, weight in zip(range(item_count), items, weights): |
| | if already_taken[item_i] < i * weight / float(max_weight): |
| | yield item |
| | already_taken[item_i] += 1 |
| | adding = True |
| |
|
| | i += 1 |
| |
|
| |
|
| | def to_str(value: Callable[[], str] | str) -> str: |
| | "Turn callable or string into string." |
| | if callable(value): |
| | return to_str(value()) |
| | else: |
| | return str(value) |
| |
|
| |
|
| | def to_int(value: Callable[[], int] | int) -> int: |
| | "Turn callable or int into int." |
| | if callable(value): |
| | return to_int(value()) |
| | else: |
| | return int(value) |
| |
|
| |
|
| | AnyFloat = Union[Callable[[], float], float] |
| |
|
| |
|
| | def to_float(value: AnyFloat) -> float: |
| | "Turn callable or float into float." |
| | if callable(value): |
| | return to_float(value()) |
| | else: |
| | return float(value) |
| |
|
| |
|
| | def is_dumb_terminal(term: str | None = None) -> bool: |
| | """ |
| | True if this terminal type is considered "dumb". |
| | |
| | If so, we should fall back to the simplest possible form of line editing, |
| | without cursor positioning and color support. |
| | """ |
| | if term is None: |
| | return is_dumb_terminal(os.environ.get("TERM", "")) |
| |
|
| | return term.lower() in ["dumb", "unknown"] |
| |
|