| | from __future__ import annotations |
| |
|
| | import collections.abc as cabc |
| | import os |
| | import re |
| | import sys |
| | import typing as t |
| | from functools import update_wrapper |
| | from types import ModuleType |
| | from types import TracebackType |
| |
|
| | from ._compat import _default_text_stderr |
| | from ._compat import _default_text_stdout |
| | from ._compat import _find_binary_writer |
| | from ._compat import auto_wrap_for_ansi |
| | from ._compat import binary_streams |
| | from ._compat import open_stream |
| | from ._compat import should_strip_ansi |
| | from ._compat import strip_ansi |
| | from ._compat import text_streams |
| | from ._compat import WIN |
| | from .globals import resolve_color_default |
| |
|
if t.TYPE_CHECKING:
    # Imported for static analysis only; this branch does not run at runtime.
    import typing_extensions as te

    # Parameter specification used to type the callable wrapped by ``safecall``.
    P = te.ParamSpec("P")

# Return type of the callable wrapped by ``safecall``.
R = t.TypeVar("R")
| |
|
| |
|
| | def _posixify(name: str) -> str: |
| | return "-".join(name.split()).lower() |
| |
|
| |
|
def safecall(func: t.Callable[P, R]) -> t.Callable[P, R | None]:
    """Wrap ``func`` so that any :exc:`Exception` it raises is discarded.

    The returned callable forwards all arguments to ``func`` and returns
    its result. If ``func`` raises an :exc:`Exception`, the error is
    silently dropped and ``None`` is returned instead. Exceptions that
    derive only from :exc:`BaseException` still propagate.

    :param func: The callable to guard.
    :return: A wrapper carrying the metadata of ``func``.
    """

    def wrapper(*args: P.args, **kwargs: P.kwargs) -> R | None:
        try:
            result = func(*args, **kwargs)
        except Exception:
            # Best effort by design: the failure is intentionally ignored.
            return None
        return result

    return update_wrapper(wrapper, func)
| |
|
| |
|
def make_str(value: t.Any) -> str:
    """Convert ``value`` into a string.

    Bytes are decoded with the filesystem encoding. If that fails they
    are decoded as UTF-8 with undecodable bytes replaced. Any other
    object is passed through :class:`str`.

    :param value: The object to convert.
    """
    if not isinstance(value, bytes):
        return str(value)

    fs_encoding = sys.getfilesystemencoding()

    try:
        return value.decode(fs_encoding)
    except UnicodeError:
        return value.decode("utf-8", "replace")
| |
|
| |
|
def make_default_short_help(help: str, max_length: int = 45) -> str:
    """Return a condensed, single-line version of a help string.

    Only the first paragraph (text before the first blank line) is
    used, and a leading ``\\b`` marker word is dropped. The result is the
    first sentence if it ends within ``max_length`` characters, or the
    whole paragraph if that fits. Otherwise words are removed from the
    end until the text plus a trailing ``...`` fits.

    :param help: The full help text.
    :param max_length: Maximum length of the returned string.
    """
    # Restrict the text to its first paragraph.
    first_paragraph = help.partition("\n\n")[0]

    # Splitting on whitespace also collapses newlines and repeated spaces.
    tokens = first_paragraph.split()

    if not tokens:
        return ""

    # A leading "\b" word is a marker, not content.
    if tokens[0] == "\b":
        tokens = tokens[1:]

    used = 0
    final_pos = len(tokens) - 1

    for pos, token in enumerate(tokens):
        # Every word after the first also costs one separating space.
        used += len(token) + (1 if pos > 0 else 0)

        if used > max_length:
            # This word does not fit; truncate below.
            break

        if token.endswith("."):
            # The first sentence fits completely.
            return " ".join(tokens[: pos + 1])

        if used == max_length and pos != final_pos:
            # Exactly full, but more words follow; truncate below.
            break
    else:
        # The loop never broke, so the whole paragraph fits.
        return " ".join(tokens)

    # Reserve room for the ellipsis, then drop words from the end until
    # the remaining text fits.
    used += len("...")

    while pos > 0:
        # The dropped word always has a preceding space here since pos > 0.
        used -= len(tokens[pos]) + 1

        if used <= max_length:
            break

        pos -= 1

    return " ".join(tokens[:pos]) + "..."
| |
|
| |
|
class LazyFile:
    """File wrapper that postpones opening the file until it is used.

    The constructor still performs an early check: in read mode the file
    is opened and closed straight away so access errors surface
    immediately, and the name ``'-'`` is resolved to a stream at once.
    Attribute access and iteration open the file on demand. This is
    useful for safely opening files for writing.
    """

    def __init__(
        self,
        filename: str | os.PathLike[str],
        mode: str = "r",
        encoding: str | None = None,
        errors: str | None = "strict",
        atomic: bool = False,
    ):
        self.name: str = os.fspath(filename)
        self.mode = mode
        self.encoding = encoding
        self.errors = errors
        self.atomic = atomic
        self._f: t.IO[t.Any] | None
        self.should_close: bool

        if self.name == "-":
            # Nothing to defer for '-': resolve the stream immediately.
            stream, owned = open_stream(filename, mode, encoding, errors)
            self._f = stream
            self.should_close = owned
            return

        if "r" in mode:
            # Open and close right away so that an unreadable or missing
            # file fails now rather than on first access.
            open(filename, mode).close()

        self._f = None
        self.should_close = True

    def __getattr__(self, name: str) -> t.Any:
        # Only reached for attributes not set on the wrapper itself;
        # these are looked up on the (lazily opened) underlying file.
        underlying = self.open()
        return getattr(underlying, name)

    def __repr__(self) -> str:
        if self._f is None:
            return f"<unopened file '{format_filename(self.name)}' {self.mode}>"
        return repr(self._f)

    def open(self) -> t.IO[t.Any]:
        """Open the file if it is not open yet and return it.

        :raises FileError: If opening fails with an :exc:`OSError`. Not
            handling this error will produce an error that Click shows.
        """
        if self._f is None:
            try:
                stream, self.should_close = open_stream(
                    self.name,
                    self.mode,
                    self.encoding,
                    self.errors,
                    atomic=self.atomic,
                )
            except OSError as e:
                from .exceptions import FileError

                raise FileError(self.name, hint=e.strerror) from e

            self._f = stream

        return self._f

    def close(self) -> None:
        """Close the underlying file if one is open, no matter what."""
        if self._f is None:
            return
        self._f.close()

    def close_intelligently(self) -> None:
        """Close the file only when ``should_close`` is set, that is when
        the wrapper is responsible for closing it.
        """
        if not self.should_close:
            return
        self.close()

    def __enter__(self) -> LazyFile:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        self.close_intelligently()

    def __iter__(self) -> cabc.Iterator[t.AnyStr]:
        # Iterating forces the file to be opened.
        return iter(self.open())
| |
|
| |
|
class KeepOpenFile:
    """Proxy around a file object whose context manager does not close it.

    All attribute access, ``repr`` and iteration are forwarded to the
    wrapped file. Leaving a ``with`` block is a no-op.
    """

    def __init__(self, file: t.IO[t.Any]) -> None:
        self._file: t.IO[t.Any] = file

    def __getattr__(self, name: str) -> t.Any:
        # Anything not defined on the proxy is resolved on the wrapped file.
        wrapped = self._file
        return getattr(wrapped, name)

    def __enter__(self) -> KeepOpenFile:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        # Intentionally leave the wrapped file open.
        return None

    def __repr__(self) -> str:
        wrapped = self._file
        return repr(wrapped)

    def __iter__(self) -> cabc.Iterator[t.AnyStr]:
        wrapped = self._file
        return iter(wrapped)
| |
|
| |
|
def echo(
    message: t.Any | None = None,
    file: t.IO[t.Any] | None = None,
    nl: bool = True,
    err: bool = False,
    color: bool | None = None,
) -> None:
    """Print a message and newline to stdout or a file. This should be
    used instead of :func:`print` because it provides better support
    for different data, files, and environments.

    Compared to :func:`print`, this does the following:

    -   Ensures that the output encoding is not misconfigured on Linux.
    -   Supports Unicode in the Windows console.
    -   Supports writing to binary outputs, and supports writing bytes
        to text outputs.
    -   Supports colors and styles on Windows.
    -   Removes ANSI color and style codes if the output does not look
        like an interactive terminal.
    -   Always flushes the output.

    The ``message`` object itself is never modified; when a newline is
    added to a ``bytearray`` a new object is written instead.

    :param message: The string or bytes to output. Other objects are
        converted to strings.
    :param file: The file to write to. Defaults to ``stdout``.
    :param err: Write to ``stderr`` instead of ``stdout``.
    :param nl: Print a newline after the message. Enabled by default.
    :param color: Force showing or hiding colors and other styles. By
        default Click will remove color if the output does not look like
        an interactive terminal.

    .. versionchanged:: 6.0
        Support Unicode output on the Windows console. Click does not
        modify ``sys.stdout``, so ``sys.stdout.write()`` and ``print()``
        will still not support Unicode.

    .. versionchanged:: 4.0
        Added the ``color`` parameter.

    .. versionadded:: 3.0
        Added the ``err`` parameter.

    .. versionchanged:: 2.0
        Support colors on Windows if colorama is installed.
    """
    if file is None:
        if err:
            file = _default_text_stderr()
        else:
            file = _default_text_stdout()

        # The default stream lookup may yield no stream at all, in which
        # case there is nothing to write to.
        if file is None:
            return

    # Anything that is not already text or bytes is converted with str().
    if message is not None and not isinstance(message, (str, bytes, bytearray)):
        out: str | bytes | bytearray | None = str(message)
    else:
        out = message

    if nl:
        out = out or ""
        if isinstance(out, str):
            out += "\n"
        else:
            # Build a new object instead of using ``+=``: on a bytearray
            # the in-place form would append the newline to the caller's
            # own object.
            out = out + b"\n"

    if not out:
        # Nothing to write, but the stream is still flushed.
        file.flush()
        return

    if isinstance(out, (bytes, bytearray)):
        # Bytes go to a binary writer behind the stream when one can be
        # found. ``file`` is flushed first, presumably so pending text
        # output is not reordered behind the raw bytes.
        binary_file = _find_binary_writer(file)

        if binary_file is not None:
            file.flush()
            binary_file.write(out)
            binary_file.flush()
            return

        # No binary writer found: fall through and write the bytes to
        # ``file`` directly.
    else:
        # Text output: decide whether ANSI codes are kept or removed.
        color = resolve_color_default(color)

        if should_strip_ansi(file, color):
            out = strip_ansi(out)
        elif WIN:
            if auto_wrap_for_ansi is not None:
                file = auto_wrap_for_ansi(file, color)
            elif not color:
                out = strip_ansi(out)

    file.write(out)
    file.flush()
| |
|
| |
|
def get_binary_stream(name: t.Literal["stdin", "stdout", "stderr"]) -> t.BinaryIO:
    """Return a system stream for byte processing.

    :param name: The name of the stream to open. Valid names are
        ``'stdin'``, ``'stdout'`` and ``'stderr'``.
    :raises TypeError: If ``name`` is not a known stream name.
    """
    factory = binary_streams.get(name)

    if factory is not None:
        return factory()

    raise TypeError(f"Unknown standard stream '{name}'")
| |
|
| |
|
def get_text_stream(
    name: t.Literal["stdin", "stdout", "stderr"],
    encoding: str | None = None,
    errors: str | None = "strict",
) -> t.TextIO:
    """Return a system stream for text processing. This usually returns
    a wrapped stream around a binary stream returned from
    :func:`get_binary_stream` but it also can take shortcuts for already
    correctly configured streams.

    :param name: The name of the stream to open. Valid names are
        ``'stdin'``, ``'stdout'`` and ``'stderr'``.
    :param encoding: Overrides the detected default encoding.
    :param errors: Overrides the default error mode.
    :raises TypeError: If ``name`` is not a known stream name.
    """
    factory = text_streams.get(name)

    if factory is not None:
        return factory(encoding, errors)

    raise TypeError(f"Unknown standard stream '{name}'")
| |
|
| |
|
def open_file(
    filename: str | os.PathLike[str],
    mode: str = "r",
    encoding: str | None = None,
    errors: str | None = "strict",
    lazy: bool = False,
    atomic: bool = False,
) -> t.IO[t.Any]:
    """Open a file, with extra behavior to handle ``'-'`` to indicate
    a standard stream, lazy open on write, and atomic write. Similar to
    the behavior of the :class:`~click.File` param type.

    If ``'-'`` is given to open ``stdout`` or ``stdin``, the stream is
    wrapped so that using it in a context manager will not close it.
    This makes it possible to use the function without accidentally
    closing a standard stream:

    .. code-block:: python

        with open_file(filename) as f:
            ...

    :param filename: The name or Path of the file to open, or ``'-'`` for
        ``stdin``/``stdout``.
    :param mode: The mode in which to open the file.
    :param encoding: The encoding to decode or encode a file opened in
        text mode.
    :param errors: The error handling mode.
    :param lazy: Wait to open the file until it is accessed. For read
        mode, the file is temporarily opened to raise access errors
        early, then closed until it is read again.
    :param atomic: Write to a temporary file and replace the given file
        on close.

    .. versionadded:: 3.0
    """
    if lazy:
        deferred = LazyFile(filename, mode, encoding, errors, atomic=atomic)
        return t.cast("t.IO[t.Any]", deferred)

    stream, owned = open_stream(filename, mode, encoding, errors, atomic=atomic)

    if owned:
        return stream

    # The stream must not be closed by the caller's ``with`` block, so it
    # is handed out behind a proxy whose context exit does nothing.
    return t.cast("t.IO[t.Any]", KeepOpenFile(stream))
| |
|
| |
|
def format_filename(
    filename: str | bytes | os.PathLike[str] | os.PathLike[bytes],
    shorten: bool = False,
) -> str:
    """Format a filename as a string for display. Ensures the filename can be
    displayed by replacing any invalid bytes or surrogate escapes in the name
    with the replacement character ``�``.

    Invalid bytes or surrogate escapes will raise an error when written to a
    stream with ``errors="strict"``. This will typically happen with ``stdout``
    when the locale is something like ``en_GB.UTF-8``.

    Many scenarios *are* safe to write surrogates though, due to PEP 538 and
    PEP 540, including:

    -   Writing to ``stderr``, which uses ``errors="backslashreplace"``.
    -   The system has ``LANG=C.UTF-8``, ``C``, or ``POSIX``. Python opens
        stdout and stderr with ``errors="surrogateescape"``.
    -   None of ``LANG/LC_*`` are set. Python assumes ``LANG=C.UTF-8``.
    -   Python is started in UTF-8 mode with ``PYTHONUTF8=1`` or ``-X utf8``.
        Python opens stdout and stderr with ``errors="surrogateescape"``.

    :param filename: The filename to format for UI display. It is
        converted to a string without failing.
    :param shorten: Strip the leading path and keep only the final
        component of the filename.
    """
    raw = os.path.basename(filename) if shorten else os.fspath(filename)

    if isinstance(raw, bytes):
        # Undecodable bytes become the replacement character.
        return raw.decode(sys.getfilesystemencoding(), "replace")

    # Round-trip through bytes so surrogate escapes turn back into the
    # bytes they stand for, which are then replaced on decoding.
    as_bytes = raw.encode("utf-8", "surrogateescape")
    return as_bytes.decode("utf-8", "replace")
| |
|
| |
|
def get_app_dir(app_name: str, roaming: bool = True, force_posix: bool = False) -> str:
    r"""Return the config folder for the application. The default behavior
    is to return whatever is most appropriate for the operating system.

    To give you an idea, for an app called ``"Foo Bar"``, something like
    the following folders could be returned:

    Mac OS X:
      ``~/Library/Application Support/Foo Bar``
    Mac OS X (POSIX):
      ``~/.foo-bar``
    Unix:
      ``~/.config/foo-bar``
    Unix (POSIX):
      ``~/.foo-bar``
    Windows (roaming):
      ``C:\Users\<user>\AppData\Roaming\Foo Bar``
    Windows (not roaming):
      ``C:\Users\<user>\AppData\Local\Foo Bar``

    .. versionadded:: 2.0

    :param app_name: The application name. This should be properly
        capitalized and can contain whitespace.
    :param roaming: Controls if the folder should be roaming or not on
        Windows. Has no effect otherwise.
    :param force_posix: If this is set to ``True`` then on any POSIX
        system the folder will be stored in the home folder with a
        leading dot instead of the XDG config home or darwin's
        application support folder.
    """
    if WIN:
        env_var = "APPDATA" if roaming else "LOCALAPPDATA"
        base = os.environ.get(env_var)

        if base is None:
            # Fall back to the home directory when the variable is unset.
            base = os.path.expanduser("~")

        return os.path.join(base, app_name)

    if force_posix:
        return os.path.expanduser(f"~/.{_posixify(app_name)}")

    if sys.platform == "darwin":
        support_dir = os.path.expanduser("~/Library/Application Support")
        return os.path.join(support_dir, app_name)

    config_home = os.environ.get(
        "XDG_CONFIG_HOME", os.path.expanduser("~/.config")
    )
    return os.path.join(config_home, _posixify(app_name))
| |
|
| |
|
class PacifyFlushWrapper:
    """Proxy that suppresses broken-pipe errors raised by ``.flush()``.

    This is used to catch and suppress BrokenPipeErrors resulting from
    ``.flush()`` being called on a broken pipe during the shutdown/final-GC
    of the Python interpreter. Notably ``.flush()`` is always called on
    ``sys.stdout`` and ``sys.stderr``. So as to have minimal impact on any
    other cleanup code, and the case where the underlying file is not a
    broken pipe, all other calls and attributes are proxied.
    """

    def __init__(self, wrapped: t.IO[t.Any]) -> None:
        self.wrapped = wrapped

    def flush(self) -> None:
        """Flush the wrapped file, ignoring an :exc:`OSError` whose
        ``errno`` is ``EPIPE``. Any other :exc:`OSError` is re-raised.
        """
        try:
            self.wrapped.flush()
        except OSError as exc:
            import errno

            if exc.errno == errno.EPIPE:
                return

            raise

    def __getattr__(self, attr: str) -> t.Any:
        target = self.wrapped
        return getattr(target, attr)
| |
|
| |
|
| | def _detect_program_name( |
| | path: str | None = None, _main: ModuleType | None = None |
| | ) -> str: |
| | """Determine the command used to run the program, for use in help |
| | text. If a file or entry point was executed, the file name is |
| | returned. If ``python -m`` was used to execute a module or package, |
| | ``python -m name`` is returned. |
| | |
| | This doesn't try to be too precise, the goal is to give a concise |
| | name for help text. Files are only shown as their name without the |
| | path. ``python`` is only shown for modules, and the full path to |
| | ``sys.executable`` is not shown. |
| | |
| | :param path: The Python file being executed. Python puts this in |
| | ``sys.argv[0]``, which is used by default. |
| | :param _main: The ``__main__`` module. This should only be passed |
| | during internal testing. |
| | |
| | .. versionadded:: 8.0 |
| | Based on command args detection in the Werkzeug reloader. |
| | |
| | :meta private: |
| | """ |
| | if _main is None: |
| | _main = sys.modules["__main__"] |
| |
|
| | if not path: |
| | path = sys.argv[0] |
| |
|
| | |
| | |
| | |
| | |
| | if getattr(_main, "__package__", None) in {None, ""} or ( |
| | os.name == "nt" |
| | and _main.__package__ == "" |
| | and not os.path.exists(path) |
| | and os.path.exists(f"{path}.exe") |
| | ): |
| | |
| | return os.path.basename(path) |
| |
|
| | |
| | |
| | |
| | py_module = t.cast(str, _main.__package__) |
| | name = os.path.splitext(os.path.basename(path))[0] |
| |
|
| | |
| | if name != "__main__": |
| | py_module = f"{py_module}.{name}" |
| |
|
| | return f"python -m {py_module.lstrip('.')}" |
| |
|
| |
|
| | def _expand_args( |
| | args: cabc.Iterable[str], |
| | *, |
| | user: bool = True, |
| | env: bool = True, |
| | glob_recursive: bool = True, |
| | ) -> list[str]: |
| | """Simulate Unix shell expansion with Python functions. |
| | |
| | See :func:`glob.glob`, :func:`os.path.expanduser`, and |
| | :func:`os.path.expandvars`. |
| | |
| | This is intended for use on Windows, where the shell does not do any |
| | expansion. It may not exactly match what a Unix shell would do. |
| | |
| | :param args: List of command line arguments to expand. |
| | :param user: Expand user home directory. |
| | :param env: Expand environment variables. |
| | :param glob_recursive: ``**`` matches directories recursively. |
| | |
| | .. versionchanged:: 8.1 |
| | Invalid glob patterns are treated as empty expansions rather |
| | than raising an error. |
| | |
| | .. versionadded:: 8.0 |
| | |
| | :meta private: |
| | """ |
| | from glob import glob |
| |
|
| | out = [] |
| |
|
| | for arg in args: |
| | if user: |
| | arg = os.path.expanduser(arg) |
| |
|
| | if env: |
| | arg = os.path.expandvars(arg) |
| |
|
| | try: |
| | matches = glob(arg, recursive=glob_recursive) |
| | except re.error: |
| | matches = [] |
| |
|
| | if not matches: |
| | out.append(arg) |
| | else: |
| | out.extend(matches) |
| |
|
| | return out |
| |
|