| from __future__ import annotations
|
|
|
| import collections.abc as cabc
|
| import contextlib
|
| import io
|
| import os
|
| import shlex
|
| import sys
|
| import tempfile
|
| import typing as t
|
| from types import TracebackType
|
|
|
| from . import _compat
|
| from . import formatting
|
| from . import termui
|
| from . import utils
|
| from ._compat import _find_binary_reader
|
|
|
| if t.TYPE_CHECKING:
|
| from _typeshed import ReadableBuffer
|
|
|
| from .core import Command
|
|
|
|
|
| class EchoingStdin:
|
| def __init__(self, input: t.BinaryIO, output: t.BinaryIO) -> None:
|
| self._input = input
|
| self._output = output
|
| self._paused = False
|
|
|
| def __getattr__(self, x: str) -> t.Any:
|
| return getattr(self._input, x)
|
|
|
| def _echo(self, rv: bytes) -> bytes:
|
| if not self._paused:
|
| self._output.write(rv)
|
|
|
| return rv
|
|
|
| def read(self, n: int = -1) -> bytes:
|
| return self._echo(self._input.read(n))
|
|
|
| def read1(self, n: int = -1) -> bytes:
|
| return self._echo(self._input.read1(n))
|
|
|
| def readline(self, n: int = -1) -> bytes:
|
| return self._echo(self._input.readline(n))
|
|
|
| def readlines(self) -> list[bytes]:
|
| return [self._echo(x) for x in self._input.readlines()]
|
|
|
| def __iter__(self) -> cabc.Iterator[bytes]:
|
| return iter(self._echo(x) for x in self._input)
|
|
|
| def __repr__(self) -> str:
|
| return repr(self._input)
|
|
|
|
|
| @contextlib.contextmanager
|
| def _pause_echo(stream: EchoingStdin | None) -> cabc.Iterator[None]:
|
| if stream is None:
|
| yield
|
| else:
|
| stream._paused = True
|
| yield
|
| stream._paused = False
|
|
|
|
|
| class BytesIOCopy(io.BytesIO):
|
| """Patch ``io.BytesIO`` to let the written stream be copied to another.
|
|
|
| .. versionadded:: 8.2
|
| """
|
|
|
| def __init__(self, copy_to: io.BytesIO) -> None:
|
| super().__init__()
|
| self.copy_to = copy_to
|
|
|
| def flush(self) -> None:
|
| super().flush()
|
| self.copy_to.flush()
|
|
|
| def write(self, b: ReadableBuffer) -> int:
|
| self.copy_to.write(b)
|
| return super().write(b)
|
|
|
|
|
| class StreamMixer:
|
| """Mixes `<stdout>` and `<stderr>` streams.
|
|
|
| The result is available in the ``output`` attribute.
|
|
|
| .. versionadded:: 8.2
|
| """
|
|
|
| def __init__(self) -> None:
|
| self.output: io.BytesIO = io.BytesIO()
|
| self.stdout: io.BytesIO = BytesIOCopy(copy_to=self.output)
|
| self.stderr: io.BytesIO = BytesIOCopy(copy_to=self.output)
|
|
|
| def __del__(self) -> None:
|
| """
|
| Guarantee that embedded file-like objects are closed in a
|
| predictable order, protecting against races between
|
| self.output being closed and other streams being flushed on close
|
|
|
| .. versionadded:: 8.2.2
|
| """
|
| self.stderr.close()
|
| self.stdout.close()
|
| self.output.close()
|
|
|
|
|
| class _NamedTextIOWrapper(io.TextIOWrapper):
|
| def __init__(
|
| self, buffer: t.BinaryIO, name: str, mode: str, **kwargs: t.Any
|
| ) -> None:
|
| super().__init__(buffer, **kwargs)
|
| self._name = name
|
| self._mode = mode
|
|
|
| @property
|
| def name(self) -> str:
|
| return self._name
|
|
|
| @property
|
| def mode(self) -> str:
|
| return self._mode
|
|
|
|
|
| def make_input_stream(
|
| input: str | bytes | t.IO[t.Any] | None, charset: str
|
| ) -> t.BinaryIO:
|
|
|
| if hasattr(input, "read"):
|
| rv = _find_binary_reader(t.cast("t.IO[t.Any]", input))
|
|
|
| if rv is not None:
|
| return rv
|
|
|
| raise TypeError("Could not find binary reader for input stream.")
|
|
|
| if input is None:
|
| input = b""
|
| elif isinstance(input, str):
|
| input = input.encode(charset)
|
|
|
| return io.BytesIO(input)
|
|
|
|
|
| class Result:
|
| """Holds the captured result of an invoked CLI script.
|
|
|
| :param runner: The runner that created the result
|
| :param stdout_bytes: The standard output as bytes.
|
| :param stderr_bytes: The standard error as bytes.
|
| :param output_bytes: A mix of ``stdout_bytes`` and ``stderr_bytes``, as the
|
| user would see it in its terminal.
|
| :param return_value: The value returned from the invoked command.
|
| :param exit_code: The exit code as integer.
|
| :param exception: The exception that happened if one did.
|
| :param exc_info: Exception information (exception type, exception instance,
|
| traceback type).
|
|
|
| .. versionchanged:: 8.2
|
| ``stderr_bytes`` no longer optional, ``output_bytes`` introduced and
|
| ``mix_stderr`` has been removed.
|
|
|
| .. versionadded:: 8.0
|
| Added ``return_value``.
|
| """
|
|
|
| def __init__(
|
| self,
|
| runner: CliRunner,
|
| stdout_bytes: bytes,
|
| stderr_bytes: bytes,
|
| output_bytes: bytes,
|
| return_value: t.Any,
|
| exit_code: int,
|
| exception: BaseException | None,
|
| exc_info: tuple[type[BaseException], BaseException, TracebackType]
|
| | None = None,
|
| ):
|
| self.runner = runner
|
| self.stdout_bytes = stdout_bytes
|
| self.stderr_bytes = stderr_bytes
|
| self.output_bytes = output_bytes
|
| self.return_value = return_value
|
| self.exit_code = exit_code
|
| self.exception = exception
|
| self.exc_info = exc_info
|
|
|
| @property
|
| def output(self) -> str:
|
| """The terminal output as unicode string, as the user would see it.
|
|
|
| .. versionchanged:: 8.2
|
| No longer a proxy for ``self.stdout``. Now has its own independent stream
|
| that is mixing `<stdout>` and `<stderr>`, in the order they were written.
|
| """
|
| return self.output_bytes.decode(self.runner.charset, "replace").replace(
|
| "\r\n", "\n"
|
| )
|
|
|
| @property
|
| def stdout(self) -> str:
|
| """The standard output as unicode string."""
|
| return self.stdout_bytes.decode(self.runner.charset, "replace").replace(
|
| "\r\n", "\n"
|
| )
|
|
|
| @property
|
| def stderr(self) -> str:
|
| """The standard error as unicode string.
|
|
|
| .. versionchanged:: 8.2
|
| No longer raise an exception, always returns the `<stderr>` string.
|
| """
|
| return self.stderr_bytes.decode(self.runner.charset, "replace").replace(
|
| "\r\n", "\n"
|
| )
|
|
|
| def __repr__(self) -> str:
|
| exc_str = repr(self.exception) if self.exception else "okay"
|
| return f"<{type(self).__name__} {exc_str}>"
|
|
|
|
|
| class CliRunner:
|
| """The CLI runner provides functionality to invoke a Click command line
|
| script for unittesting purposes in a isolated environment. This only
|
| works in single-threaded systems without any concurrency as it changes the
|
| global interpreter state.
|
|
|
| :param charset: the character set for the input and output data.
|
| :param env: a dictionary with environment variables for overriding.
|
| :param echo_stdin: if this is set to `True`, then reading from `<stdin>` writes
|
| to `<stdout>`. This is useful for showing examples in
|
| some circumstances. Note that regular prompts
|
| will automatically echo the input.
|
| :param catch_exceptions: Whether to catch any exceptions other than
|
| ``SystemExit`` when running :meth:`~CliRunner.invoke`.
|
|
|
| .. versionchanged:: 8.2
|
| Added the ``catch_exceptions`` parameter.
|
|
|
| .. versionchanged:: 8.2
|
| ``mix_stderr`` parameter has been removed.
|
| """
|
|
|
| def __init__(
|
| self,
|
| charset: str = "utf-8",
|
| env: cabc.Mapping[str, str | None] | None = None,
|
| echo_stdin: bool = False,
|
| catch_exceptions: bool = True,
|
| ) -> None:
|
| self.charset = charset
|
| self.env: cabc.Mapping[str, str | None] = env or {}
|
| self.echo_stdin = echo_stdin
|
| self.catch_exceptions = catch_exceptions
|
|
|
| def get_default_prog_name(self, cli: Command) -> str:
|
| """Given a command object it will return the default program name
|
| for it. The default is the `name` attribute or ``"root"`` if not
|
| set.
|
| """
|
| return cli.name or "root"
|
|
|
| def make_env(
|
| self, overrides: cabc.Mapping[str, str | None] | None = None
|
| ) -> cabc.Mapping[str, str | None]:
|
| """Returns the environment overrides for invoking a script."""
|
| rv = dict(self.env)
|
| if overrides:
|
| rv.update(overrides)
|
| return rv
|
|
|
| @contextlib.contextmanager
|
| def isolation(
|
| self,
|
| input: str | bytes | t.IO[t.Any] | None = None,
|
| env: cabc.Mapping[str, str | None] | None = None,
|
| color: bool = False,
|
| ) -> cabc.Iterator[tuple[io.BytesIO, io.BytesIO, io.BytesIO]]:
|
| """A context manager that sets up the isolation for invoking of a
|
| command line tool. This sets up `<stdin>` with the given input data
|
| and `os.environ` with the overrides from the given dictionary.
|
| This also rebinds some internals in Click to be mocked (like the
|
| prompt functionality).
|
|
|
| This is automatically done in the :meth:`invoke` method.
|
|
|
| :param input: the input stream to put into `sys.stdin`.
|
| :param env: the environment overrides as dictionary.
|
| :param color: whether the output should contain color codes. The
|
| application can still override this explicitly.
|
|
|
| .. versionadded:: 8.2
|
| An additional output stream is returned, which is a mix of
|
| `<stdout>` and `<stderr>` streams.
|
|
|
| .. versionchanged:: 8.2
|
| Always returns the `<stderr>` stream.
|
|
|
| .. versionchanged:: 8.0
|
| `<stderr>` is opened with ``errors="backslashreplace"``
|
| instead of the default ``"strict"``.
|
|
|
| .. versionchanged:: 4.0
|
| Added the ``color`` parameter.
|
| """
|
| bytes_input = make_input_stream(input, self.charset)
|
| echo_input = None
|
|
|
| old_stdin = sys.stdin
|
| old_stdout = sys.stdout
|
| old_stderr = sys.stderr
|
| old_forced_width = formatting.FORCED_WIDTH
|
| formatting.FORCED_WIDTH = 80
|
|
|
| env = self.make_env(env)
|
|
|
| stream_mixer = StreamMixer()
|
|
|
| if self.echo_stdin:
|
| bytes_input = echo_input = t.cast(
|
| t.BinaryIO, EchoingStdin(bytes_input, stream_mixer.stdout)
|
| )
|
|
|
| sys.stdin = text_input = _NamedTextIOWrapper(
|
| bytes_input, encoding=self.charset, name="<stdin>", mode="r"
|
| )
|
|
|
| if self.echo_stdin:
|
|
|
|
|
| text_input._CHUNK_SIZE = 1
|
|
|
| sys.stdout = _NamedTextIOWrapper(
|
| stream_mixer.stdout, encoding=self.charset, name="<stdout>", mode="w"
|
| )
|
|
|
| sys.stderr = _NamedTextIOWrapper(
|
| stream_mixer.stderr,
|
| encoding=self.charset,
|
| name="<stderr>",
|
| mode="w",
|
| errors="backslashreplace",
|
| )
|
|
|
| @_pause_echo(echo_input)
|
| def visible_input(prompt: str | None = None) -> str:
|
| sys.stdout.write(prompt or "")
|
| try:
|
| val = next(text_input).rstrip("\r\n")
|
| except StopIteration as e:
|
| raise EOFError() from e
|
| sys.stdout.write(f"{val}\n")
|
| sys.stdout.flush()
|
| return val
|
|
|
| @_pause_echo(echo_input)
|
| def hidden_input(prompt: str | None = None) -> str:
|
| sys.stdout.write(f"{prompt or ''}\n")
|
| sys.stdout.flush()
|
| try:
|
| return next(text_input).rstrip("\r\n")
|
| except StopIteration as e:
|
| raise EOFError() from e
|
|
|
| @_pause_echo(echo_input)
|
| def _getchar(echo: bool) -> str:
|
| char = sys.stdin.read(1)
|
|
|
| if echo:
|
| sys.stdout.write(char)
|
|
|
| sys.stdout.flush()
|
| return char
|
|
|
| default_color = color
|
|
|
| def should_strip_ansi(
|
| stream: t.IO[t.Any] | None = None, color: bool | None = None
|
| ) -> bool:
|
| if color is None:
|
| return not default_color
|
| return not color
|
|
|
| old_visible_prompt_func = termui.visible_prompt_func
|
| old_hidden_prompt_func = termui.hidden_prompt_func
|
| old__getchar_func = termui._getchar
|
| old_should_strip_ansi = utils.should_strip_ansi
|
| old__compat_should_strip_ansi = _compat.should_strip_ansi
|
| termui.visible_prompt_func = visible_input
|
| termui.hidden_prompt_func = hidden_input
|
| termui._getchar = _getchar
|
| utils.should_strip_ansi = should_strip_ansi
|
| _compat.should_strip_ansi = should_strip_ansi
|
|
|
| old_env = {}
|
| try:
|
| for key, value in env.items():
|
| old_env[key] = os.environ.get(key)
|
| if value is None:
|
| try:
|
| del os.environ[key]
|
| except Exception:
|
| pass
|
| else:
|
| os.environ[key] = value
|
| yield (stream_mixer.stdout, stream_mixer.stderr, stream_mixer.output)
|
| finally:
|
| for key, value in old_env.items():
|
| if value is None:
|
| try:
|
| del os.environ[key]
|
| except Exception:
|
| pass
|
| else:
|
| os.environ[key] = value
|
| sys.stdout = old_stdout
|
| sys.stderr = old_stderr
|
| sys.stdin = old_stdin
|
| termui.visible_prompt_func = old_visible_prompt_func
|
| termui.hidden_prompt_func = old_hidden_prompt_func
|
| termui._getchar = old__getchar_func
|
| utils.should_strip_ansi = old_should_strip_ansi
|
| _compat.should_strip_ansi = old__compat_should_strip_ansi
|
| formatting.FORCED_WIDTH = old_forced_width
|
|
|
| def invoke(
|
| self,
|
| cli: Command,
|
| args: str | cabc.Sequence[str] | None = None,
|
| input: str | bytes | t.IO[t.Any] | None = None,
|
| env: cabc.Mapping[str, str | None] | None = None,
|
| catch_exceptions: bool | None = None,
|
| color: bool = False,
|
| **extra: t.Any,
|
| ) -> Result:
|
| """Invokes a command in an isolated environment. The arguments are
|
| forwarded directly to the command line script, the `extra` keyword
|
| arguments are passed to the :meth:`~clickpkg.Command.main` function of
|
| the command.
|
|
|
| This returns a :class:`Result` object.
|
|
|
| :param cli: the command to invoke
|
| :param args: the arguments to invoke. It may be given as an iterable
|
| or a string. When given as string it will be interpreted
|
| as a Unix shell command. More details at
|
| :func:`shlex.split`.
|
| :param input: the input data for `sys.stdin`.
|
| :param env: the environment overrides.
|
| :param catch_exceptions: Whether to catch any other exceptions than
|
| ``SystemExit``. If :data:`None`, the value
|
| from :class:`CliRunner` is used.
|
| :param extra: the keyword arguments to pass to :meth:`main`.
|
| :param color: whether the output should contain color codes. The
|
| application can still override this explicitly.
|
|
|
| .. versionadded:: 8.2
|
| The result object has the ``output_bytes`` attribute with
|
| the mix of ``stdout_bytes`` and ``stderr_bytes``, as the user would
|
| see it in its terminal.
|
|
|
| .. versionchanged:: 8.2
|
| The result object always returns the ``stderr_bytes`` stream.
|
|
|
| .. versionchanged:: 8.0
|
| The result object has the ``return_value`` attribute with
|
| the value returned from the invoked command.
|
|
|
| .. versionchanged:: 4.0
|
| Added the ``color`` parameter.
|
|
|
| .. versionchanged:: 3.0
|
| Added the ``catch_exceptions`` parameter.
|
|
|
| .. versionchanged:: 3.0
|
| The result object has the ``exc_info`` attribute with the
|
| traceback if available.
|
| """
|
| exc_info = None
|
| if catch_exceptions is None:
|
| catch_exceptions = self.catch_exceptions
|
|
|
| with self.isolation(input=input, env=env, color=color) as outstreams:
|
| return_value = None
|
| exception: BaseException | None = None
|
| exit_code = 0
|
|
|
| if isinstance(args, str):
|
| args = shlex.split(args)
|
|
|
| try:
|
| prog_name = extra.pop("prog_name")
|
| except KeyError:
|
| prog_name = self.get_default_prog_name(cli)
|
|
|
| try:
|
| return_value = cli.main(args=args or (), prog_name=prog_name, **extra)
|
| except SystemExit as e:
|
| exc_info = sys.exc_info()
|
| e_code = t.cast("int | t.Any | None", e.code)
|
|
|
| if e_code is None:
|
| e_code = 0
|
|
|
| if e_code != 0:
|
| exception = e
|
|
|
| if not isinstance(e_code, int):
|
| sys.stdout.write(str(e_code))
|
| sys.stdout.write("\n")
|
| e_code = 1
|
|
|
| exit_code = e_code
|
|
|
| except Exception as e:
|
| if not catch_exceptions:
|
| raise
|
| exception = e
|
| exit_code = 1
|
| exc_info = sys.exc_info()
|
| finally:
|
| sys.stdout.flush()
|
| sys.stderr.flush()
|
| stdout = outstreams[0].getvalue()
|
| stderr = outstreams[1].getvalue()
|
| output = outstreams[2].getvalue()
|
|
|
| return Result(
|
| runner=self,
|
| stdout_bytes=stdout,
|
| stderr_bytes=stderr,
|
| output_bytes=output,
|
| return_value=return_value,
|
| exit_code=exit_code,
|
| exception=exception,
|
| exc_info=exc_info,
|
| )
|
|
|
| @contextlib.contextmanager
|
| def isolated_filesystem(
|
| self, temp_dir: str | os.PathLike[str] | None = None
|
| ) -> cabc.Iterator[str]:
|
| """A context manager that creates a temporary directory and
|
| changes the current working directory to it. This isolates tests
|
| that affect the contents of the CWD to prevent them from
|
| interfering with each other.
|
|
|
| :param temp_dir: Create the temporary directory under this
|
| directory. If given, the created directory is not removed
|
| when exiting.
|
|
|
| .. versionchanged:: 8.0
|
| Added the ``temp_dir`` parameter.
|
| """
|
| cwd = os.getcwd()
|
| dt = tempfile.mkdtemp(dir=temp_dir)
|
| os.chdir(dt)
|
|
|
| try:
|
| yield dt
|
| finally:
|
| os.chdir(cwd)
|
|
|
| if temp_dir is None:
|
| import shutil
|
|
|
| try:
|
| shutil.rmtree(dt)
|
| except OSError:
|
| pass
|
|
|