| | from __future__ import annotations |
| |
|
| | import collections.abc as cabc |
| | import contextlib |
| | import io |
| | import os |
| | import shlex |
| | import sys |
| | import tempfile |
| | import typing as t |
| | from types import TracebackType |
| |
|
| | from . import _compat |
| | from . import formatting |
| | from . import termui |
| | from . import utils |
| | from ._compat import _find_binary_reader |
| |
|
| | if t.TYPE_CHECKING: |
| | from _typeshed import ReadableBuffer |
| |
|
| | from .core import Command |
| |
|
| |
|
| | class EchoingStdin: |
| | def __init__(self, input: t.BinaryIO, output: t.BinaryIO) -> None: |
| | self._input = input |
| | self._output = output |
| | self._paused = False |
| |
|
| | def __getattr__(self, x: str) -> t.Any: |
| | return getattr(self._input, x) |
| |
|
| | def _echo(self, rv: bytes) -> bytes: |
| | if not self._paused: |
| | self._output.write(rv) |
| |
|
| | return rv |
| |
|
| | def read(self, n: int = -1) -> bytes: |
| | return self._echo(self._input.read(n)) |
| |
|
| | def read1(self, n: int = -1) -> bytes: |
| | return self._echo(self._input.read1(n)) |
| |
|
| | def readline(self, n: int = -1) -> bytes: |
| | return self._echo(self._input.readline(n)) |
| |
|
| | def readlines(self) -> list[bytes]: |
| | return [self._echo(x) for x in self._input.readlines()] |
| |
|
| | def __iter__(self) -> cabc.Iterator[bytes]: |
| | return iter(self._echo(x) for x in self._input) |
| |
|
| | def __repr__(self) -> str: |
| | return repr(self._input) |
| |
|
| |
|
| | @contextlib.contextmanager |
| | def _pause_echo(stream: EchoingStdin | None) -> cabc.Iterator[None]: |
| | if stream is None: |
| | yield |
| | else: |
| | stream._paused = True |
| | yield |
| | stream._paused = False |
| |
|
| |
|
| | class BytesIOCopy(io.BytesIO): |
| | """Patch ``io.BytesIO`` to let the written stream be copied to another. |
| | |
| | .. versionadded:: 8.2 |
| | """ |
| |
|
| | def __init__(self, copy_to: io.BytesIO) -> None: |
| | super().__init__() |
| | self.copy_to = copy_to |
| |
|
| | def flush(self) -> None: |
| | super().flush() |
| | self.copy_to.flush() |
| |
|
| | def write(self, b: ReadableBuffer) -> int: |
| | self.copy_to.write(b) |
| | return super().write(b) |
| |
|
| |
|
| | class StreamMixer: |
| | """Mixes `<stdout>` and `<stderr>` streams. |
| | |
| | The result is available in the ``output`` attribute. |
| | |
| | .. versionadded:: 8.2 |
| | """ |
| |
|
| | def __init__(self) -> None: |
| | self.output: io.BytesIO = io.BytesIO() |
| | self.stdout: io.BytesIO = BytesIOCopy(copy_to=self.output) |
| | self.stderr: io.BytesIO = BytesIOCopy(copy_to=self.output) |
| |
|
| | def __del__(self) -> None: |
| | """ |
| | Guarantee that embedded file-like objects are closed in a |
| | predictable order, protecting against races between |
| | self.output being closed and other streams being flushed on close |
| | |
| | .. versionadded:: 8.2.2 |
| | """ |
| | self.stderr.close() |
| | self.stdout.close() |
| | self.output.close() |
| |
|
| |
|
| | class _NamedTextIOWrapper(io.TextIOWrapper): |
| | def __init__( |
| | self, buffer: t.BinaryIO, name: str, mode: str, **kwargs: t.Any |
| | ) -> None: |
| | super().__init__(buffer, **kwargs) |
| | self._name = name |
| | self._mode = mode |
| |
|
| | @property |
| | def name(self) -> str: |
| | return self._name |
| |
|
| | @property |
| | def mode(self) -> str: |
| | return self._mode |
| |
|
| |
|
| | def make_input_stream( |
| | input: str | bytes | t.IO[t.Any] | None, charset: str |
| | ) -> t.BinaryIO: |
| | |
| | if hasattr(input, "read"): |
| | rv = _find_binary_reader(t.cast("t.IO[t.Any]", input)) |
| |
|
| | if rv is not None: |
| | return rv |
| |
|
| | raise TypeError("Could not find binary reader for input stream.") |
| |
|
| | if input is None: |
| | input = b"" |
| | elif isinstance(input, str): |
| | input = input.encode(charset) |
| |
|
| | return io.BytesIO(input) |
| |
|
| |
|
| | class Result: |
| | """Holds the captured result of an invoked CLI script. |
| | |
| | :param runner: The runner that created the result |
| | :param stdout_bytes: The standard output as bytes. |
| | :param stderr_bytes: The standard error as bytes. |
| | :param output_bytes: A mix of ``stdout_bytes`` and ``stderr_bytes``, as the |
| | user would see it in its terminal. |
| | :param return_value: The value returned from the invoked command. |
| | :param exit_code: The exit code as integer. |
| | :param exception: The exception that happened if one did. |
| | :param exc_info: Exception information (exception type, exception instance, |
| | traceback type). |
| | |
| | .. versionchanged:: 8.2 |
| | ``stderr_bytes`` no longer optional, ``output_bytes`` introduced and |
| | ``mix_stderr`` has been removed. |
| | |
| | .. versionadded:: 8.0 |
| | Added ``return_value``. |
| | """ |
| |
|
| | def __init__( |
| | self, |
| | runner: CliRunner, |
| | stdout_bytes: bytes, |
| | stderr_bytes: bytes, |
| | output_bytes: bytes, |
| | return_value: t.Any, |
| | exit_code: int, |
| | exception: BaseException | None, |
| | exc_info: tuple[type[BaseException], BaseException, TracebackType] |
| | | None = None, |
| | ): |
| | self.runner = runner |
| | self.stdout_bytes = stdout_bytes |
| | self.stderr_bytes = stderr_bytes |
| | self.output_bytes = output_bytes |
| | self.return_value = return_value |
| | self.exit_code = exit_code |
| | self.exception = exception |
| | self.exc_info = exc_info |
| |
|
| | @property |
| | def output(self) -> str: |
| | """The terminal output as unicode string, as the user would see it. |
| | |
| | .. versionchanged:: 8.2 |
| | No longer a proxy for ``self.stdout``. Now has its own independent stream |
| | that is mixing `<stdout>` and `<stderr>`, in the order they were written. |
| | """ |
| | return self.output_bytes.decode(self.runner.charset, "replace").replace( |
| | "\r\n", "\n" |
| | ) |
| |
|
| | @property |
| | def stdout(self) -> str: |
| | """The standard output as unicode string.""" |
| | return self.stdout_bytes.decode(self.runner.charset, "replace").replace( |
| | "\r\n", "\n" |
| | ) |
| |
|
| | @property |
| | def stderr(self) -> str: |
| | """The standard error as unicode string. |
| | |
| | .. versionchanged:: 8.2 |
| | No longer raise an exception, always returns the `<stderr>` string. |
| | """ |
| | return self.stderr_bytes.decode(self.runner.charset, "replace").replace( |
| | "\r\n", "\n" |
| | ) |
| |
|
| | def __repr__(self) -> str: |
| | exc_str = repr(self.exception) if self.exception else "okay" |
| | return f"<{type(self).__name__} {exc_str}>" |
| |
|
| |
|
| | class CliRunner: |
| | """The CLI runner provides functionality to invoke a Click command line |
| | script for unittesting purposes in a isolated environment. This only |
| | works in single-threaded systems without any concurrency as it changes the |
| | global interpreter state. |
| | |
| | :param charset: the character set for the input and output data. |
| | :param env: a dictionary with environment variables for overriding. |
| | :param echo_stdin: if this is set to `True`, then reading from `<stdin>` writes |
| | to `<stdout>`. This is useful for showing examples in |
| | some circumstances. Note that regular prompts |
| | will automatically echo the input. |
| | :param catch_exceptions: Whether to catch any exceptions other than |
| | ``SystemExit`` when running :meth:`~CliRunner.invoke`. |
| | |
| | .. versionchanged:: 8.2 |
| | Added the ``catch_exceptions`` parameter. |
| | |
| | .. versionchanged:: 8.2 |
| | ``mix_stderr`` parameter has been removed. |
| | """ |
| |
|
| | def __init__( |
| | self, |
| | charset: str = "utf-8", |
| | env: cabc.Mapping[str, str | None] | None = None, |
| | echo_stdin: bool = False, |
| | catch_exceptions: bool = True, |
| | ) -> None: |
| | self.charset = charset |
| | self.env: cabc.Mapping[str, str | None] = env or {} |
| | self.echo_stdin = echo_stdin |
| | self.catch_exceptions = catch_exceptions |
| |
|
| | def get_default_prog_name(self, cli: Command) -> str: |
| | """Given a command object it will return the default program name |
| | for it. The default is the `name` attribute or ``"root"`` if not |
| | set. |
| | """ |
| | return cli.name or "root" |
| |
|
| | def make_env( |
| | self, overrides: cabc.Mapping[str, str | None] | None = None |
| | ) -> cabc.Mapping[str, str | None]: |
| | """Returns the environment overrides for invoking a script.""" |
| | rv = dict(self.env) |
| | if overrides: |
| | rv.update(overrides) |
| | return rv |
| |
|
| | @contextlib.contextmanager |
| | def isolation( |
| | self, |
| | input: str | bytes | t.IO[t.Any] | None = None, |
| | env: cabc.Mapping[str, str | None] | None = None, |
| | color: bool = False, |
| | ) -> cabc.Iterator[tuple[io.BytesIO, io.BytesIO, io.BytesIO]]: |
| | """A context manager that sets up the isolation for invoking of a |
| | command line tool. This sets up `<stdin>` with the given input data |
| | and `os.environ` with the overrides from the given dictionary. |
| | This also rebinds some internals in Click to be mocked (like the |
| | prompt functionality). |
| | |
| | This is automatically done in the :meth:`invoke` method. |
| | |
| | :param input: the input stream to put into `sys.stdin`. |
| | :param env: the environment overrides as dictionary. |
| | :param color: whether the output should contain color codes. The |
| | application can still override this explicitly. |
| | |
| | .. versionadded:: 8.2 |
| | An additional output stream is returned, which is a mix of |
| | `<stdout>` and `<stderr>` streams. |
| | |
| | .. versionchanged:: 8.2 |
| | Always returns the `<stderr>` stream. |
| | |
| | .. versionchanged:: 8.0 |
| | `<stderr>` is opened with ``errors="backslashreplace"`` |
| | instead of the default ``"strict"``. |
| | |
| | .. versionchanged:: 4.0 |
| | Added the ``color`` parameter. |
| | """ |
| | bytes_input = make_input_stream(input, self.charset) |
| | echo_input = None |
| |
|
| | old_stdin = sys.stdin |
| | old_stdout = sys.stdout |
| | old_stderr = sys.stderr |
| | old_forced_width = formatting.FORCED_WIDTH |
| | formatting.FORCED_WIDTH = 80 |
| |
|
| | env = self.make_env(env) |
| |
|
| | stream_mixer = StreamMixer() |
| |
|
| | if self.echo_stdin: |
| | bytes_input = echo_input = t.cast( |
| | t.BinaryIO, EchoingStdin(bytes_input, stream_mixer.stdout) |
| | ) |
| |
|
| | sys.stdin = text_input = _NamedTextIOWrapper( |
| | bytes_input, encoding=self.charset, name="<stdin>", mode="r" |
| | ) |
| |
|
| | if self.echo_stdin: |
| | |
| | |
| | text_input._CHUNK_SIZE = 1 |
| |
|
| | sys.stdout = _NamedTextIOWrapper( |
| | stream_mixer.stdout, encoding=self.charset, name="<stdout>", mode="w" |
| | ) |
| |
|
| | sys.stderr = _NamedTextIOWrapper( |
| | stream_mixer.stderr, |
| | encoding=self.charset, |
| | name="<stderr>", |
| | mode="w", |
| | errors="backslashreplace", |
| | ) |
| |
|
| | @_pause_echo(echo_input) |
| | def visible_input(prompt: str | None = None) -> str: |
| | sys.stdout.write(prompt or "") |
| | try: |
| | val = next(text_input).rstrip("\r\n") |
| | except StopIteration as e: |
| | raise EOFError() from e |
| | sys.stdout.write(f"{val}\n") |
| | sys.stdout.flush() |
| | return val |
| |
|
| | @_pause_echo(echo_input) |
| | def hidden_input(prompt: str | None = None) -> str: |
| | sys.stdout.write(f"{prompt or ''}\n") |
| | sys.stdout.flush() |
| | try: |
| | return next(text_input).rstrip("\r\n") |
| | except StopIteration as e: |
| | raise EOFError() from e |
| |
|
| | @_pause_echo(echo_input) |
| | def _getchar(echo: bool) -> str: |
| | char = sys.stdin.read(1) |
| |
|
| | if echo: |
| | sys.stdout.write(char) |
| |
|
| | sys.stdout.flush() |
| | return char |
| |
|
| | default_color = color |
| |
|
| | def should_strip_ansi( |
| | stream: t.IO[t.Any] | None = None, color: bool | None = None |
| | ) -> bool: |
| | if color is None: |
| | return not default_color |
| | return not color |
| |
|
| | old_visible_prompt_func = termui.visible_prompt_func |
| | old_hidden_prompt_func = termui.hidden_prompt_func |
| | old__getchar_func = termui._getchar |
| | old_should_strip_ansi = utils.should_strip_ansi |
| | old__compat_should_strip_ansi = _compat.should_strip_ansi |
| | termui.visible_prompt_func = visible_input |
| | termui.hidden_prompt_func = hidden_input |
| | termui._getchar = _getchar |
| | utils.should_strip_ansi = should_strip_ansi |
| | _compat.should_strip_ansi = should_strip_ansi |
| |
|
| | old_env = {} |
| | try: |
| | for key, value in env.items(): |
| | old_env[key] = os.environ.get(key) |
| | if value is None: |
| | try: |
| | del os.environ[key] |
| | except Exception: |
| | pass |
| | else: |
| | os.environ[key] = value |
| | yield (stream_mixer.stdout, stream_mixer.stderr, stream_mixer.output) |
| | finally: |
| | for key, value in old_env.items(): |
| | if value is None: |
| | try: |
| | del os.environ[key] |
| | except Exception: |
| | pass |
| | else: |
| | os.environ[key] = value |
| | sys.stdout = old_stdout |
| | sys.stderr = old_stderr |
| | sys.stdin = old_stdin |
| | termui.visible_prompt_func = old_visible_prompt_func |
| | termui.hidden_prompt_func = old_hidden_prompt_func |
| | termui._getchar = old__getchar_func |
| | utils.should_strip_ansi = old_should_strip_ansi |
| | _compat.should_strip_ansi = old__compat_should_strip_ansi |
| | formatting.FORCED_WIDTH = old_forced_width |
| |
|
| | def invoke( |
| | self, |
| | cli: Command, |
| | args: str | cabc.Sequence[str] | None = None, |
| | input: str | bytes | t.IO[t.Any] | None = None, |
| | env: cabc.Mapping[str, str | None] | None = None, |
| | catch_exceptions: bool | None = None, |
| | color: bool = False, |
| | **extra: t.Any, |
| | ) -> Result: |
| | """Invokes a command in an isolated environment. The arguments are |
| | forwarded directly to the command line script, the `extra` keyword |
| | arguments are passed to the :meth:`~clickpkg.Command.main` function of |
| | the command. |
| | |
| | This returns a :class:`Result` object. |
| | |
| | :param cli: the command to invoke |
| | :param args: the arguments to invoke. It may be given as an iterable |
| | or a string. When given as string it will be interpreted |
| | as a Unix shell command. More details at |
| | :func:`shlex.split`. |
| | :param input: the input data for `sys.stdin`. |
| | :param env: the environment overrides. |
| | :param catch_exceptions: Whether to catch any other exceptions than |
| | ``SystemExit``. If :data:`None`, the value |
| | from :class:`CliRunner` is used. |
| | :param extra: the keyword arguments to pass to :meth:`main`. |
| | :param color: whether the output should contain color codes. The |
| | application can still override this explicitly. |
| | |
| | .. versionadded:: 8.2 |
| | The result object has the ``output_bytes`` attribute with |
| | the mix of ``stdout_bytes`` and ``stderr_bytes``, as the user would |
| | see it in its terminal. |
| | |
| | .. versionchanged:: 8.2 |
| | The result object always returns the ``stderr_bytes`` stream. |
| | |
| | .. versionchanged:: 8.0 |
| | The result object has the ``return_value`` attribute with |
| | the value returned from the invoked command. |
| | |
| | .. versionchanged:: 4.0 |
| | Added the ``color`` parameter. |
| | |
| | .. versionchanged:: 3.0 |
| | Added the ``catch_exceptions`` parameter. |
| | |
| | .. versionchanged:: 3.0 |
| | The result object has the ``exc_info`` attribute with the |
| | traceback if available. |
| | """ |
| | exc_info = None |
| | if catch_exceptions is None: |
| | catch_exceptions = self.catch_exceptions |
| |
|
| | with self.isolation(input=input, env=env, color=color) as outstreams: |
| | return_value = None |
| | exception: BaseException | None = None |
| | exit_code = 0 |
| |
|
| | if isinstance(args, str): |
| | args = shlex.split(args) |
| |
|
| | try: |
| | prog_name = extra.pop("prog_name") |
| | except KeyError: |
| | prog_name = self.get_default_prog_name(cli) |
| |
|
| | try: |
| | return_value = cli.main(args=args or (), prog_name=prog_name, **extra) |
| | except SystemExit as e: |
| | exc_info = sys.exc_info() |
| | e_code = t.cast("int | t.Any | None", e.code) |
| |
|
| | if e_code is None: |
| | e_code = 0 |
| |
|
| | if e_code != 0: |
| | exception = e |
| |
|
| | if not isinstance(e_code, int): |
| | sys.stdout.write(str(e_code)) |
| | sys.stdout.write("\n") |
| | e_code = 1 |
| |
|
| | exit_code = e_code |
| |
|
| | except Exception as e: |
| | if not catch_exceptions: |
| | raise |
| | exception = e |
| | exit_code = 1 |
| | exc_info = sys.exc_info() |
| | finally: |
| | sys.stdout.flush() |
| | sys.stderr.flush() |
| | stdout = outstreams[0].getvalue() |
| | stderr = outstreams[1].getvalue() |
| | output = outstreams[2].getvalue() |
| |
|
| | return Result( |
| | runner=self, |
| | stdout_bytes=stdout, |
| | stderr_bytes=stderr, |
| | output_bytes=output, |
| | return_value=return_value, |
| | exit_code=exit_code, |
| | exception=exception, |
| | exc_info=exc_info, |
| | ) |
| |
|
| | @contextlib.contextmanager |
| | def isolated_filesystem( |
| | self, temp_dir: str | os.PathLike[str] | None = None |
| | ) -> cabc.Iterator[str]: |
| | """A context manager that creates a temporary directory and |
| | changes the current working directory to it. This isolates tests |
| | that affect the contents of the CWD to prevent them from |
| | interfering with each other. |
| | |
| | :param temp_dir: Create the temporary directory under this |
| | directory. If given, the created directory is not removed |
| | when exiting. |
| | |
| | .. versionchanged:: 8.0 |
| | Added the ``temp_dir`` parameter. |
| | """ |
| | cwd = os.getcwd() |
| | dt = tempfile.mkdtemp(dir=temp_dir) |
| | os.chdir(dt) |
| |
|
| | try: |
| | yield dt |
| | finally: |
| | os.chdir(cwd) |
| |
|
| | if temp_dir is None: |
| | import shutil |
| |
|
| | try: |
| | shutil.rmtree(dt) |
| | except OSError: |
| | pass |
| |
|