| """
|
| This module contains implementations for the termui module. To keep the
|
| import time of Click down, some infrequently used functionality is
|
| placed in this module and only imported as needed.
|
| """
|
|
|
| from __future__ import annotations
|
|
|
| import collections.abc as cabc
|
| import contextlib
|
| import math
|
| import os
|
| import shlex
|
| import sys
|
| import time
|
| import typing as t
|
| from gettext import gettext as _
|
| from io import StringIO
|
| from pathlib import Path
|
| from types import TracebackType
|
|
|
| from ._compat import _default_text_stdout
|
| from ._compat import CYGWIN
|
| from ._compat import get_best_encoding
|
| from ._compat import isatty
|
| from ._compat import open_stream
|
| from ._compat import strip_ansi
|
| from ._compat import term_len
|
| from ._compat import WIN
|
| from .exceptions import ClickException
|
| from .utils import echo
|
|
|
V = t.TypeVar("V")

# Control sequences written before and after the bar. Both variants use
# "\r" to return to the start of the line; the non-"nt" variant also hides
# the cursor while the bar is drawn ("\033[?25l") and shows it again when
# the bar is finished ("\033[?25h").
if os.name == "nt":
    BEFORE_BAR = "\r"
    AFTER_BAR = "\n"
else:
    BEFORE_BAR = "\r\033[?25l"
    AFTER_BAR = "\033[?25h\n"


class ProgressBar(t.Generic[V]):
    """Iterate over items while rendering a single-line text progress bar.

    The bar must be used as a context manager; iterating it outside a
    ``with`` block raises :exc:`RuntimeError`. When the output file is not
    a TTY only the label is printed (once) and items are passed through
    without per-item rendering.
    """

    def __init__(
        self,
        iterable: cabc.Iterable[V] | None,
        length: int | None = None,
        fill_char: str = "#",
        empty_char: str = " ",
        bar_template: str = "%(bar)s",
        info_sep: str = "  ",
        hidden: bool = False,
        show_eta: bool = True,
        show_percent: bool | None = None,
        show_pos: bool = False,
        item_show_func: t.Callable[[V | None], str | None] | None = None,
        label: str | None = None,
        file: t.TextIO | None = None,
        color: bool | None = None,
        update_min_steps: int = 1,
        width: int = 30,
    ) -> None:
        """Configure the bar.

        :param iterable: Items to iterate over. If ``None``,
            ``range(length)`` is iterated instead.
        :param length: Number of items. If ``None`` it is taken from
            ``operator.length_hint(iterable)``; when no hint is available
            the length stays unknown.
        :param fill_char: Character used for the filled part of the bar.
        :param empty_char: Character used for the unfilled part.
        :param bar_template: ``%``-style template receiving the keys
            ``label``, ``bar`` and ``info``.
        :param info_sep: Separator placed between the info fragments.
        :param hidden: If true nothing is rendered at all.
        :param show_eta: Include the ETA once it is known.
        :param show_percent: Include the percentage. ``None`` means "show
            it when the length is known and ``show_pos`` is false".
        :param show_pos: Include the absolute position.
        :param item_show_func: Called with the current item; a non-``None``
            result is appended to the info fragments.
        :param label: Label substituted into the template. ``None`` is
            stored as ``""``.
        :param file: Output stream. Defaults to ``_default_text_stdout()``,
            or an in-memory buffer if that returns ``None``.
        :param color: Forwarded to :func:`echo`.
        :param update_min_steps: Render only once this many steps have
            accumulated through :meth:`update`.
        :param width: Bar width in characters; ``0`` enables automatic
            sizing from the terminal width.
        :raises TypeError: If neither ``iterable`` nor ``length`` is given.
        """
        self.fill_char = fill_char
        self.empty_char = empty_char
        self.bar_template = bar_template
        self.info_sep = info_sep
        self.hidden = hidden
        self.show_eta = show_eta
        self.show_percent = show_percent
        self.show_pos = show_pos
        self.item_show_func = item_show_func
        self.label: str = label or ""

        if file is None:
            file = _default_text_stdout()

            # The default stream lookup may itself yield None; fall back to
            # a throwaway buffer so later writes do not fail.
            if file is None:
                file = StringIO()

        self.file = file
        self.color = color
        self.update_min_steps = update_min_steps
        # Steps reported through update() that have not been rendered yet.
        self._completed_intervals = 0
        self.width: int = width
        self.autowidth: bool = width == 0

        if length is None:
            from operator import length_hint

            length = length_hint(iterable, -1)

            # -1 is the sentinel passed above for "no hint available".
            if length == -1:
                length = None
        if iterable is None:
            if length is None:
                raise TypeError("iterable or length is required")
            iterable = t.cast("cabc.Iterable[V]", range(length))
        self.iter: cabc.Iterable[V] = iter(iterable)
        self.length = length
        self.pos: int = 0
        # Rolling window of "seconds per step" samples, see make_step().
        self.avg: list[float] = []
        self.last_eta: float
        self.start: float
        self.start = self.last_eta = time.time()
        self.eta_known: bool = False
        self.finished: bool = False
        # Widest line rendered so far; used to blank out leftovers.
        self.max_width: int | None = None
        self.entered: bool = False
        self.current_item: V | None = None
        self._is_atty = isatty(self.file)
        # Last text written, used to skip redundant redraws.
        self._last_line: str | None = None

    def __enter__(self) -> ProgressBar[V]:
        """Mark the bar as entered and draw its initial state."""
        self.entered = True
        self.render_progress()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        """Write the closing sequence; exceptions are not suppressed."""
        self.render_finish()

    def __iter__(self) -> cabc.Iterator[V]:
        """Return the item generator.

        :raises RuntimeError: If the bar was not entered with ``with``.
        """
        if not self.entered:
            raise RuntimeError("You need to use progress bars in a with block.")
        self.render_progress()
        return self.generator()

    def __next__(self) -> V:
        """Return the next item.

        Each call builds a fresh generator through ``iter(self)``; this
        still advances correctly because every such generator consumes
        the one shared ``self.iter`` iterator.
        """
        return next(iter(self))

    def render_finish(self) -> None:
        """Write ``AFTER_BAR`` unless the bar is hidden or not on a TTY."""
        if self.hidden or not self._is_atty:
            return
        self.file.write(AFTER_BAR)
        self.file.flush()

    @property
    def pct(self) -> float:
        """Completed fraction in ``[0.0, 1.0]``; ``1.0`` once finished."""
        if self.finished:
            return 1.0
        # "or 1" guards against a length of None or 0.
        return min(self.pos / (float(self.length or 1) or 1), 1.0)

    @property
    def time_per_iteration(self) -> float:
        """Mean of the recorded step durations, ``0.0`` if none exist."""
        if not self.avg:
            return 0.0
        return sum(self.avg) / float(len(self.avg))

    @property
    def eta(self) -> float:
        """Estimated remaining seconds; ``0.0`` if unknown or finished."""
        if self.length is not None and not self.finished:
            return self.time_per_iteration * (self.length - self.pos)
        return 0.0

    def format_eta(self) -> str:
        """Format the ETA as ``HH:MM:SS`` (prefixed with ``Nd`` for days).

        Returns an empty string while the ETA is not known.
        """
        if self.eta_known:
            remaining = int(self.eta)
            seconds = remaining % 60
            remaining //= 60
            minutes = remaining % 60
            remaining //= 60
            hours = remaining % 24
            remaining //= 24
            if remaining > 0:
                return f"{remaining}d {hours:02}:{minutes:02}:{seconds:02}"
            else:
                return f"{hours:02}:{minutes:02}:{seconds:02}"
        return ""

    def format_pos(self) -> str:
        """Return ``"pos"`` or ``"pos/length"`` when the length is known."""
        pos = str(self.pos)
        if self.length is not None:
            pos += f"/{self.length}"
        return pos

    def format_pct(self) -> str:
        """Return the percentage right-aligned in three columns plus ``%``."""
        return f"{int(self.pct * 100): 4}%"[1:]

    def format_bar(self) -> str:
        """Build the bar itself.

        With a known length the bar is filled proportionally. With an
        unknown length a single ``fill_char`` marker moves back and forth
        (cosine of the progress) until the bar is finished, at which point
        it is drawn completely filled.
        """
        if self.length is not None:
            bar_length = int(self.pct * self.width)
            bar = self.fill_char * bar_length
            bar += self.empty_char * (self.width - bar_length)
        elif self.finished:
            bar = self.fill_char * self.width
        else:
            chars = list(self.empty_char * (self.width or 1))
            if self.time_per_iteration != 0:
                marker = int(
                    (math.cos(self.pos * self.time_per_iteration) / 2.0 + 0.5)
                    * self.width
                )
                # cos() can be exactly 1.0 (for instance when pos is 0),
                # which maps to index == width, one past the last cell.
                # Clamp so the marker lands on the last cell instead of
                # raising IndexError.
                chars[min(marker, len(chars) - 1)] = self.fill_char
            bar = "".join(chars)
        return bar

    def format_progress_line(self) -> str:
        """Render the full line from ``bar_template``.

        The ``info`` key is built from position, percentage, ETA and the
        ``item_show_func`` result, in that order, joined by ``info_sep``.
        Trailing whitespace is stripped.
        """
        show_percent = self.show_percent

        info_bits = []
        if self.length is not None and show_percent is None:
            show_percent = not self.show_pos

        if self.show_pos:
            info_bits.append(self.format_pos())
        if show_percent:
            info_bits.append(self.format_pct())
        if self.show_eta and self.eta_known and not self.finished:
            info_bits.append(self.format_eta())
        if self.item_show_func is not None:
            item_info = self.item_show_func(self.current_item)
            if item_info is not None:
                info_bits.append(item_info)

        return (
            self.bar_template
            % {
                "label": self.label,
                "bar": self.format_bar(),
                "info": self.info_sep.join(info_bits),
            }
        ).rstrip()

    def render_progress(self) -> None:
        """Draw the current state to ``self.file``.

        Nothing is written when hidden. On a non-TTY only the label is
        echoed, and only when it differs from what was last written. On a
        TTY the line is redrawn in place, padded with spaces so remnants
        of a previously longer line are overwritten, and only written when
        it changed.
        """
        if self.hidden:
            return

        if not self._is_atty:
            # Avoid flooding a non-interactive stream: print the label
            # once per distinct value.
            if self._last_line != self.label:
                self._last_line = self.label
                echo(self.label, file=self.file, color=self.color)
            return

        buf = []

        # Re-measure on every render so the bar follows terminal resizes.
        if self.autowidth:
            import shutil

            old_width = self.width
            # Measure everything except the bar by rendering with width 0.
            self.width = 0
            clutter_length = term_len(self.format_progress_line())
            new_width = max(0, shutil.get_terminal_size().columns - clutter_length)
            if new_width < old_width and self.max_width is not None:
                # The bar shrank: blank the old, wider line first.
                buf.append(BEFORE_BAR)
                buf.append(" " * self.max_width)
                self.max_width = new_width
            self.width = new_width

        clear_width = self.width
        if self.max_width is not None:
            clear_width = self.max_width

        buf.append(BEFORE_BAR)
        line = self.format_progress_line()
        line_len = term_len(line)
        if self.max_width is None or self.max_width < line_len:
            self.max_width = line_len

        buf.append(line)
        # A non-positive count yields "", so no padding is added when the
        # new line is at least as long as the widest one so far.
        buf.append(" " * (clear_width - line_len))
        line = "".join(buf)

        # Skip the write entirely when nothing changed.
        if line != self._last_line:
            self._last_line = line
            echo(line, file=self.file, color=self.color, nl=False)
            self.file.flush()

    def make_step(self, n_steps: int) -> None:
        """Advance the position and refresh the ETA statistics.

        The statistics are updated at most once per second. Each sample is
        the elapsed time since start divided by the current position; the
        last seven samples are kept.
        """
        self.pos += n_steps
        if self.length is not None and self.pos >= self.length:
            self.finished = True

        if (time.time() - self.last_eta) < 1.0:
            return

        self.last_eta = time.time()

        # Average seconds per step over the whole run so far; avoid
        # dividing by zero when no progress has been made yet.
        if self.pos:
            step = (time.time() - self.start) / self.pos
        else:
            step = time.time() - self.start

        self.avg = self.avg[-6:] + [step]

        self.eta_known = self.length is not None

    def update(self, n_steps: int, current_item: V | None = None) -> None:
        """Update the progress bar by advancing a specified number of
        steps, and optionally set the ``current_item`` for this new
        position.

        :param n_steps: Number of steps to advance.
        :param current_item: Optional item to set as ``current_item``
            for the updated position.

        .. versionchanged:: 8.0
            Added the ``current_item`` optional parameter.

        .. versionchanged:: 8.0
            Only render when the number of steps meets the
            ``update_min_steps`` threshold.
        """
        if current_item is not None:
            self.current_item = current_item

        self._completed_intervals += n_steps

        if self._completed_intervals >= self.update_min_steps:
            self.make_step(self._completed_intervals)
            self.render_progress()
            self._completed_intervals = 0

    def finish(self) -> None:
        """Mark the bar as finished and clear ETA and current item."""
        self.eta_known = False
        self.current_item = None
        self.finished = True

    def generator(self) -> cabc.Iterator[V]:
        """Return a generator which yields the items added to the bar
        during construction, and updates the progress bar *after* the
        yielded block returns.
        """
        # __next__ creates a new generator on every call, so this function
        # must not keep state of its own: all progress lives on ``self``
        # and in the shared ``self.iter`` iterator.
        if not self.entered:
            raise RuntimeError("You need to use progress bars in a with block.")

        if not self._is_atty:
            yield from self.iter
        else:
            for rv in self.iter:
                self.current_item = rv

                # Redraw before handing out the item so item_show_func
                # reflects it, but only at the start of an update interval.
                if self._completed_intervals == 0:
                    self.render_progress()

                yield rv
                self.update(1)

            self.finish()
            self.render_progress()
|
|
|
|
|
def pager(generator: cabc.Iterable[str], color: bool | None = None) -> None:
    """Decide what method to use for paging through text.

    The strategies are tried in this order, stopping at the first one that
    reports success:

    1. If stdin or stdout is not a TTY, write the text straight through.
    2. The command from the ``PAGER`` environment variable (through a
       temporary file on Windows, through a pipe elsewhere).
    3. If ``TERM`` is ``dumb`` or ``emacs``, write the text straight
       through.
    4. ``more`` through a temporary file (Windows / OS/2 only).
    5. ``less`` through a pipe, then ``more`` through a pipe.
    6. Write the text straight through.

    :param generator: Chunks of text to display.
    :param color: Forwarded to the selected pager implementation.
    """
    stdout = _default_text_stdout()

    # The default stream lookup may yield None; substitute an in-memory
    # buffer so the fallback writer still has something to write to.
    if stdout is None:
        stdout = StringIO()

    if not isatty(sys.stdin) or not isatty(stdout):
        return _nullpager(stdout, generator, color)

    # posix=False keeps backslashes intact, which matters for Windows
    # style paths in PAGER.
    pager_cmd_parts = shlex.split(os.environ.get("PAGER", ""), posix=False)
    if pager_cmd_parts:
        if WIN:
            if _tempfilepager(generator, pager_cmd_parts, color):
                return
        elif _pipepager(generator, pager_cmd_parts, color):
            return

    if os.environ.get("TERM") in ("dumb", "emacs"):
        return _nullpager(stdout, generator, color)
    if (WIN or sys.platform.startswith("os2")) and _tempfilepager(
        generator, ["more"], color
    ):
        return
    if _pipepager(generator, ["less"], color):
        return

    # No temporary file is needed for the piped ``more`` attempt; creating
    # one only added filesystem I/O that could fail and block the
    # remaining fallbacks.
    if _pipepager(generator, ["more"], color):
        return
    return _nullpager(stdout, generator, color)
|
|
|
|
|
| def _pipepager(
|
| generator: cabc.Iterable[str], cmd_parts: list[str], color: bool | None
|
| ) -> bool:
|
| """Page through text by feeding it to another program. Invoking a
|
| pager through this might support colors.
|
|
|
| Returns `True` if the command was found, `False` otherwise and thus another
|
| pager should be attempted.
|
| """
|
|
|
| if not cmd_parts:
|
| return False
|
|
|
| import shutil
|
|
|
| cmd = cmd_parts[0]
|
| cmd_params = cmd_parts[1:]
|
|
|
| cmd_filepath = shutil.which(cmd)
|
| if not cmd_filepath:
|
| return False
|
|
|
|
|
|
|
|
|
| cmd_path = Path(cmd_filepath).absolute()
|
| cmd_name = cmd_path.name
|
|
|
| import subprocess
|
|
|
|
|
| env = dict(os.environ)
|
|
|
|
|
|
|
| if color is None and cmd_name == "less":
|
| less_flags = f"{os.environ.get('LESS', '')}{' '.join(cmd_params)}"
|
| if not less_flags:
|
| env["LESS"] = "-R"
|
| color = True
|
| elif "r" in less_flags or "R" in less_flags:
|
| color = True
|
|
|
| c = subprocess.Popen(
|
| [str(cmd_path)] + cmd_params,
|
| shell=False,
|
| stdin=subprocess.PIPE,
|
| env=env,
|
| errors="replace",
|
| text=True,
|
| )
|
| assert c.stdin is not None
|
| try:
|
| for text in generator:
|
| if not color:
|
| text = strip_ansi(text)
|
|
|
| c.stdin.write(text)
|
| except BrokenPipeError:
|
|
|
| pass
|
| except Exception as e:
|
|
|
|
|
|
|
|
|
| c.terminate()
|
| raise e
|
| finally:
|
|
|
| try:
|
| c.stdin.close()
|
|
|
|
|
| except BrokenPipeError:
|
| pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| while True:
|
| try:
|
| c.wait()
|
| except KeyboardInterrupt:
|
| pass
|
| else:
|
| break
|
|
|
| return True
|
|
|
|
|
| def _tempfilepager(
|
| generator: cabc.Iterable[str], cmd_parts: list[str], color: bool | None
|
| ) -> bool:
|
| """Page through text by invoking a program on a temporary file.
|
|
|
| Returns `True` if the command was found, `False` otherwise and thus another
|
| pager should be attempted.
|
| """
|
|
|
| if not cmd_parts:
|
| return False
|
|
|
| import shutil
|
|
|
| cmd = cmd_parts[0]
|
|
|
| cmd_filepath = shutil.which(cmd)
|
| if not cmd_filepath:
|
| return False
|
|
|
|
|
|
|
| cmd_path = Path(cmd_filepath).absolute()
|
|
|
| import subprocess
|
| import tempfile
|
|
|
| fd, filename = tempfile.mkstemp()
|
|
|
| text = "".join(generator)
|
| if not color:
|
| text = strip_ansi(text)
|
| encoding = get_best_encoding(sys.stdout)
|
| with open_stream(filename, "wb")[0] as f:
|
| f.write(text.encode(encoding))
|
| try:
|
| subprocess.call([str(cmd_path), filename])
|
| except OSError:
|
|
|
| pass
|
| finally:
|
| os.close(fd)
|
| os.unlink(filename)
|
|
|
| return True
|
|
|
|
|
| def _nullpager(
|
| stream: t.TextIO, generator: cabc.Iterable[str], color: bool | None
|
| ) -> None:
|
| """Simply print unformatted text. This is the ultimate fallback."""
|
| for text in generator:
|
| if not color:
|
| text = strip_ansi(text)
|
| stream.write(text)
|
|
|
|
|
class Editor:
    """Launch an external editor on files or on a piece of text."""

    def __init__(
        self,
        editor: str | None = None,
        env: cabc.Mapping[str, str] | None = None,
        require_save: bool = True,
        extension: str = ".txt",
    ) -> None:
        """Store the configuration.

        :param editor: Editor command; looked up automatically if ``None``.
        :param env: Extra environment variables for the editor process.
        :param require_save: If true, :meth:`edit` returns ``None`` when
            the file's modification time did not change.
        :param extension: Suffix of the temporary file used by
            :meth:`edit`.
        """
        self.editor, self.env = editor, env
        self.require_save, self.extension = require_save, extension

    def get_editor(self) -> str:
        """Return the editor command to run.

        Preference order: the explicitly configured editor, the
        ``VISUAL`` then ``EDITOR`` environment variables, ``notepad`` on
        Windows, the first of ``sensible-editor``/``vim``/``nano`` found
        on the path, and finally ``vi``.
        """
        if self.editor is not None:
            return self.editor

        configured = next(
            (value for value in map(os.environ.get, ("VISUAL", "EDITOR")) if value),
            None,
        )
        if configured:
            return configured

        if WIN:
            return "notepad"

        from shutil import which

        return next(
            (
                candidate
                for candidate in ("sensible-editor", "vim", "nano")
                if which(candidate) is not None
            ),
            "vi",
        )

    def edit_files(self, filenames: cabc.Iterable[str]) -> None:
        """Open the given files in the editor and wait for it to exit.

        :raises ClickException: If the editor cannot be started or exits
            with a non-zero status.
        """
        import subprocess

        editor = self.get_editor()

        # Only build a custom environment when overrides were supplied;
        # None makes the child inherit the current one.
        child_env: dict[str, str] | None = None
        if self.env:
            child_env = {**os.environ, **self.env}

        # The editor value may contain arguments, so the command is run as
        # a shell string with each file name wrapped in double quotes.
        quoted_names = " ".join(map('"{}"'.format, filenames))

        try:
            proc = subprocess.Popen(
                args=f"{editor} {quoted_names}", env=child_env, shell=True
            )
            status = proc.wait()
        except OSError as e:
            raise ClickException(
                _("{editor}: Editing failed: {e}").format(editor=editor, e=e)
            ) from e
        else:
            if status != 0:
                raise ClickException(
                    _("{editor}: Editing failed").format(editor=editor)
                )

    @t.overload
    def edit(self, text: bytes | bytearray) -> bytes | None: ...

    # Text input (or no input at all) produces text output.
    @t.overload
    def edit(self, text: str | None) -> str | None: ...

    def edit(self, text: str | bytes | bytearray | None) -> str | bytes | None:
        """Edit ``text`` in a temporary file and return the new content.

        Binary input is written and returned unchanged as bytes. Text
        input gets a trailing newline if it is non-empty and lacks one, is
        written as UTF-8 (with BOM and CRLF line endings on Windows), and
        the result is decoded with ``utf-8-sig`` and normalized to ``\\n``
        line endings. The temporary file is always removed.

        :return: The edited content, or ``None`` if ``require_save`` is
            set and the modification time did not change.
        """
        import tempfile

        binary_mode = isinstance(text, (bytes, bytearray))

        payload: bytes | bytearray
        if text is None:
            payload = b""
        elif binary_mode:
            payload = text
        else:
            if text and not text.endswith("\n"):
                text += "\n"

            payload = (
                text.replace("\n", "\r\n").encode("utf-8-sig")
                if WIN
                else text.encode("utf-8")
            )

        handle, path = tempfile.mkstemp(prefix="editor-", suffix=self.extension)
        stream: t.BinaryIO

        try:
            with os.fdopen(handle, "wb") as stream:
                stream.write(payload)

            # Push the modification time two seconds into the past so that
            # a save made right away still produces a different timestamp
            # on filesystems with coarse time resolution.
            before = os.stat(path)
            os.utime(path, (before.st_atime, before.st_mtime - 2))

            # Read the value back: the filesystem may not have stored the
            # exact value that was requested.
            baseline = os.stat(path).st_mtime

            self.edit_files((path,))

            if self.require_save and os.stat(path).st_mtime == baseline:
                return None

            with open(path, "rb") as stream:
                content = stream.read()

            if binary_mode:
                return content

            return content.decode("utf-8-sig").replace("\r\n", "\n")
        finally:
            os.unlink(path)
|
|
|
|
|
def open_url(url: str, wait: bool = False, locate: bool = False) -> int:
    """Open a URL or file with the platform's default handler.

    :param url: URL or file name. A ``file://`` prefix is removed and the
        remainder percent-decoded wherever a file path is needed.
    :param wait: Wait for the launched program to finish, where the
        platform launcher supports it.
    :param locate: Show the file in a file manager (or open its
        directory) instead of opening it.
    :return: The launcher's exit code. On Windows and Cygwin ``127`` is
        returned when the launcher cannot be started. On other platforms
        ``0`` is returned when ``xdg-open`` was started without waiting or
        when the ``webbrowser`` fallback was used, and ``1`` when
        ``xdg-open`` could not be started and no fallback applied.
    """
    import subprocess

    def _unquote_file(url: str) -> str:
        """Turn a ``file://`` URL into a plain path; pass others through."""
        from urllib.parse import unquote

        if url.startswith("file://"):
            url = unquote(url[7:])

        return url

    if sys.platform == "darwin":
        args = ["open"]
        if wait:
            args.append("-W")
        if locate:
            args.append("-R")
        args.append(_unquote_file(url))
        # Discard the launcher's stderr without managing a file handle.
        return subprocess.Popen(args, stderr=subprocess.DEVNULL).wait()
    elif WIN:
        if locate:
            url = _unquote_file(url)
            args = ["explorer", f"/select,{url}"]
        else:
            # NOTE(review): ``start`` is a cmd.exe built-in rather than an
            # executable, so running it without a shell presumably raises
            # OSError and ends in the 127 path below - confirm on Windows.
            args = ["start"]
            if wait:
                args.append("/WAIT")
            # Empty first argument before the URL.
            args.append("")
            args.append(url)
        try:
            return subprocess.call(args)
        except OSError:
            # The launcher could not be started.
            return 127
    elif CYGWIN:
        if locate:
            url = _unquote_file(url)
            args = ["cygstart", os.path.dirname(url)]
        else:
            args = ["cygstart"]
            if wait:
                args.append("-w")
            args.append(url)
        try:
            return subprocess.call(args)
        except OSError:
            # The launcher could not be started.
            return 127

    try:
        if locate:
            url = os.path.dirname(_unquote_file(url)) or "."
        else:
            url = _unquote_file(url)
        c = subprocess.Popen(["xdg-open", url])
        if wait:
            return c.wait()
        return 0
    except OSError:
        # xdg-open is unavailable; plain web URLs can still be handed to
        # the standard library browser launcher.
        if url.startswith(("http://", "https://")) and not locate and not wait:
            import webbrowser

            webbrowser.open(url)
            return 0
        return 1
|
|
|
|
|
| def _translate_ch_to_exc(ch: str) -> None:
|
| if ch == "\x03":
|
| raise KeyboardInterrupt()
|
|
|
| if ch == "\x04" and not WIN:
|
| raise EOFError()
|
|
|
| if ch == "\x1a" and WIN:
|
| raise EOFError()
|
|
|
| return None
|
|
|
|
|
if sys.platform == "win32":
    import msvcrt

    @contextlib.contextmanager
    def raw_terminal() -> cabc.Iterator[int]:
        """Placeholder context manager on Windows; yields ``-1``."""
        yield -1

    def getchar(echo: bool) -> str:
        """Read one key press from the console.

        ``msvcrt.getwche`` is used when ``echo`` is true and
        ``msvcrt.getwch`` otherwise. If the first read returns ``"\\x00"``
        or ``"\\xe0"`` a second read is performed and appended, so the
        result is then two characters long.

        :raises KeyboardInterrupt, EOFError: As decided by
            ``_translate_ch_to_exc`` for the value read.
        """
        func: t.Callable[[], str]

        if echo:
            func = t.cast(t.Callable[[], str], msvcrt.getwche)
        else:
            func = t.cast(t.Callable[[], str], msvcrt.getwch)

        rv = func()

        if rv in ("\x00", "\xe0"):
            # Prefix of a two-part key code (see the msvcrt documentation
            # for special function keys); fetch the second part.
            rv += func()

        _translate_ch_to_exc(rv)
        return rv

else:
    import termios
    import tty

    @contextlib.contextmanager
    def raw_terminal() -> cabc.Iterator[int]:
        """Switch the terminal to raw mode for the duration of the block.

        Uses stdin when it is a TTY and ``/dev/tty`` otherwise, yields the
        file descriptor in use, and restores the previous terminal
        settings afterwards. ``termios.error`` is swallowed.
        """
        f: t.TextIO | None
        fd: int

        if not isatty(sys.stdin):
            f = open("/dev/tty")
            fd = f.fileno()
        else:
            fd = sys.stdin.fileno()
            f = None

        try:
            # NOTE(review): if tcgetattr() raises termios.error it is
            # swallowed below before anything was yielded, which
            # contextlib reports as a RuntimeError on entry - confirm
            # whether that is intended.
            old_settings = termios.tcgetattr(fd)

            try:
                tty.setraw(fd)
                yield fd
            finally:
                termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
                sys.stdout.flush()
        except termios.error:
            pass
        finally:
            # Close the /dev/tty handle on every path, including when
            # reading or restoring the terminal settings failed.
            if f is not None:
                f.close()

    def getchar(echo: bool) -> str:
        """Read one key press (up to 32 bytes) from the raw terminal.

        The bytes are decoded with the best encoding for stdin, replacing
        undecodable data. The result is written to stdout when ``echo`` is
        true and stdout is a TTY.

        :raises KeyboardInterrupt, EOFError: As decided by
            ``_translate_ch_to_exc`` for the value read.
        """
        with raw_terminal() as fd:
            ch = os.read(fd, 32).decode(get_best_encoding(sys.stdin), "replace")

            if echo and isatty(sys.stdout):
                sys.stdout.write(ch)

            _translate_ch_to_exc(ch)
            return ch
|
|
|