| | |
| | """Per-test stdout/stderr capturing mechanism.""" |
| |
|
| | from __future__ import annotations |
| |
|
| | import abc |
| | import collections |
| | import contextlib |
| | import io |
| | from io import UnsupportedOperation |
| | import os |
| | import sys |
| | from tempfile import TemporaryFile |
| | from types import TracebackType |
| | from typing import Any |
| | from typing import AnyStr |
| | from typing import BinaryIO |
| | from typing import Final |
| | from typing import final |
| | from typing import Generator |
| | from typing import Generic |
| | from typing import Iterable |
| | from typing import Iterator |
| | from typing import Literal |
| | from typing import NamedTuple |
| | from typing import TextIO |
| | from typing import TYPE_CHECKING |
| |
|
| |
|
| | if TYPE_CHECKING: |
| | from typing_extensions import Self |
| |
|
| | from _pytest.config import Config |
| | from _pytest.config import hookimpl |
| | from _pytest.config.argparsing import Parser |
| | from _pytest.deprecated import check_ispytest |
| | from _pytest.fixtures import fixture |
| | from _pytest.fixtures import SubRequest |
| | from _pytest.nodes import Collector |
| | from _pytest.nodes import File |
| | from _pytest.nodes import Item |
| | from _pytest.reports import CollectReport |
| |
|
| |
|
| | _CaptureMethod = Literal["fd", "sys", "no", "tee-sys"] |
| |
|
| |
|
| | def pytest_addoption(parser: Parser) -> None: |
| | group = parser.getgroup("general") |
| | group._addoption( |
| | "--capture", |
| | action="store", |
| | default="fd", |
| | metavar="method", |
| | choices=["fd", "sys", "no", "tee-sys"], |
| | help="Per-test capturing method: one of fd|sys|no|tee-sys", |
| | ) |
| | group._addoption( |
| | "-s", |
| | action="store_const", |
| | const="no", |
| | dest="capture", |
| | help="Shortcut for --capture=no", |
| | ) |
| |
|
| |
|
| | def _colorama_workaround() -> None: |
| | """Ensure colorama is imported so that it attaches to the correct stdio |
| | handles on Windows. |
| | |
| | colorama uses the terminal on import time. So if something does the |
| | first import of colorama while I/O capture is active, colorama will |
| | fail in various ways. |
| | """ |
| | if sys.platform.startswith("win32"): |
| | try: |
| | import colorama |
| | except ImportError: |
| | pass |
| |
|
| |
|
| | def _windowsconsoleio_workaround(stream: TextIO) -> None: |
| | """Workaround for Windows Unicode console handling. |
| | |
| | Python 3.6 implemented Unicode console handling for Windows. This works |
| | by reading/writing to the raw console handle using |
| | ``{Read,Write}ConsoleW``. |
| | |
| | The problem is that we are going to ``dup2`` over the stdio file |
| | descriptors when doing ``FDCapture`` and this will ``CloseHandle`` the |
| | handles used by Python to write to the console. Though there is still some |
| | weirdness and the console handle seems to only be closed randomly and not |
| | on the first call to ``CloseHandle``, or maybe it gets reopened with the |
| | same handle value when we suspend capturing. |
| | |
| | The workaround in this case will reopen stdio with a different fd which |
| | also means a different handle by replicating the logic in |
| | "Py_lifecycle.c:initstdio/create_stdio". |
| | |
| | :param stream: |
| | In practice ``sys.stdout`` or ``sys.stderr``, but given |
| | here as parameter for unittesting purposes. |
| | |
| | See https://github.com/pytest-dev/py/issues/103. |
| | """ |
| | if not sys.platform.startswith("win32") or hasattr(sys, "pypy_version_info"): |
| | return |
| |
|
| | |
| | if not hasattr(stream, "buffer"): |
| | return |
| |
|
| | raw_stdout = stream.buffer.raw if hasattr(stream.buffer, "raw") else stream.buffer |
| |
|
| | if not isinstance(raw_stdout, io._WindowsConsoleIO): |
| | return |
| |
|
| | def _reopen_stdio(f, mode): |
| | if not hasattr(stream.buffer, "raw") and mode[0] == "w": |
| | buffering = 0 |
| | else: |
| | buffering = -1 |
| |
|
| | return io.TextIOWrapper( |
| | open(os.dup(f.fileno()), mode, buffering), |
| | f.encoding, |
| | f.errors, |
| | f.newlines, |
| | f.line_buffering, |
| | ) |
| |
|
| | sys.stdin = _reopen_stdio(sys.stdin, "rb") |
| | sys.stdout = _reopen_stdio(sys.stdout, "wb") |
| | sys.stderr = _reopen_stdio(sys.stderr, "wb") |
| |
|
| |
|
| | @hookimpl(wrapper=True) |
| | def pytest_load_initial_conftests(early_config: Config) -> Generator[None]: |
| | ns = early_config.known_args_namespace |
| | if ns.capture == "fd": |
| | _windowsconsoleio_workaround(sys.stdout) |
| | _colorama_workaround() |
| | pluginmanager = early_config.pluginmanager |
| | capman = CaptureManager(ns.capture) |
| | pluginmanager.register(capman, "capturemanager") |
| |
|
| | |
| | early_config.add_cleanup(capman.stop_global_capturing) |
| |
|
| | |
| | capman.start_global_capturing() |
| | try: |
| | try: |
| | yield |
| | finally: |
| | capman.suspend_global_capture() |
| | except BaseException: |
| | out, err = capman.read_global_capture() |
| | sys.stdout.write(out) |
| | sys.stderr.write(err) |
| | raise |
| |
|
| |
|
| | |
| |
|
| |
|
| | class EncodedFile(io.TextIOWrapper): |
| | __slots__ = () |
| |
|
| | @property |
| | def name(self) -> str: |
| | |
| | |
| | return repr(self.buffer) |
| |
|
| | @property |
| | def mode(self) -> str: |
| | |
| | |
| | return self.buffer.mode.replace("b", "") |
| |
|
| |
|
| | class CaptureIO(io.TextIOWrapper): |
| | def __init__(self) -> None: |
| | super().__init__(io.BytesIO(), encoding="UTF-8", newline="", write_through=True) |
| |
|
| | def getvalue(self) -> str: |
| | assert isinstance(self.buffer, io.BytesIO) |
| | return self.buffer.getvalue().decode("UTF-8") |
| |
|
| |
|
| | class TeeCaptureIO(CaptureIO): |
| | def __init__(self, other: TextIO) -> None: |
| | self._other = other |
| | super().__init__() |
| |
|
| | def write(self, s: str) -> int: |
| | super().write(s) |
| | return self._other.write(s) |
| |
|
| |
|
| | class DontReadFromInput(TextIO): |
| | @property |
| | def encoding(self) -> str: |
| | assert sys.__stdin__ is not None |
| | return sys.__stdin__.encoding |
| |
|
| | def read(self, size: int = -1) -> str: |
| | raise OSError( |
| | "pytest: reading from stdin while output is captured! Consider using `-s`." |
| | ) |
| |
|
| | readline = read |
| |
|
| | def __next__(self) -> str: |
| | return self.readline() |
| |
|
| | def readlines(self, hint: int | None = -1) -> list[str]: |
| | raise OSError( |
| | "pytest: reading from stdin while output is captured! Consider using `-s`." |
| | ) |
| |
|
| | def __iter__(self) -> Iterator[str]: |
| | return self |
| |
|
| | def fileno(self) -> int: |
| | raise UnsupportedOperation("redirected stdin is pseudofile, has no fileno()") |
| |
|
| | def flush(self) -> None: |
| | raise UnsupportedOperation("redirected stdin is pseudofile, has no flush()") |
| |
|
| | def isatty(self) -> bool: |
| | return False |
| |
|
| | def close(self) -> None: |
| | pass |
| |
|
| | def readable(self) -> bool: |
| | return False |
| |
|
| | def seek(self, offset: int, whence: int = 0) -> int: |
| | raise UnsupportedOperation("redirected stdin is pseudofile, has no seek(int)") |
| |
|
| | def seekable(self) -> bool: |
| | return False |
| |
|
| | def tell(self) -> int: |
| | raise UnsupportedOperation("redirected stdin is pseudofile, has no tell()") |
| |
|
| | def truncate(self, size: int | None = None) -> int: |
| | raise UnsupportedOperation("cannot truncate stdin") |
| |
|
| | def write(self, data: str) -> int: |
| | raise UnsupportedOperation("cannot write to stdin") |
| |
|
| | def writelines(self, lines: Iterable[str]) -> None: |
| | raise UnsupportedOperation("Cannot write to stdin") |
| |
|
| | def writable(self) -> bool: |
| | return False |
| |
|
| | def __enter__(self) -> Self: |
| | return self |
| |
|
| | def __exit__( |
| | self, |
| | type: type[BaseException] | None, |
| | value: BaseException | None, |
| | traceback: TracebackType | None, |
| | ) -> None: |
| | pass |
| |
|
| | @property |
| | def buffer(self) -> BinaryIO: |
| | |
| | return self |
| |
|
| |
|
| | |
| |
|
| |
|
| | class CaptureBase(abc.ABC, Generic[AnyStr]): |
| | EMPTY_BUFFER: AnyStr |
| |
|
| | @abc.abstractmethod |
| | def __init__(self, fd: int) -> None: |
| | raise NotImplementedError() |
| |
|
| | @abc.abstractmethod |
| | def start(self) -> None: |
| | raise NotImplementedError() |
| |
|
| | @abc.abstractmethod |
| | def done(self) -> None: |
| | raise NotImplementedError() |
| |
|
| | @abc.abstractmethod |
| | def suspend(self) -> None: |
| | raise NotImplementedError() |
| |
|
| | @abc.abstractmethod |
| | def resume(self) -> None: |
| | raise NotImplementedError() |
| |
|
| | @abc.abstractmethod |
| | def writeorg(self, data: AnyStr) -> None: |
| | raise NotImplementedError() |
| |
|
| | @abc.abstractmethod |
| | def snap(self) -> AnyStr: |
| | raise NotImplementedError() |
| |
|
| |
|
| | patchsysdict = {0: "stdin", 1: "stdout", 2: "stderr"} |
| |
|
| |
|
| | class NoCapture(CaptureBase[str]): |
| | EMPTY_BUFFER = "" |
| |
|
| | def __init__(self, fd: int) -> None: |
| | pass |
| |
|
| | def start(self) -> None: |
| | pass |
| |
|
| | def done(self) -> None: |
| | pass |
| |
|
| | def suspend(self) -> None: |
| | pass |
| |
|
| | def resume(self) -> None: |
| | pass |
| |
|
| | def snap(self) -> str: |
| | return "" |
| |
|
| | def writeorg(self, data: str) -> None: |
| | pass |
| |
|
| |
|
| | class SysCaptureBase(CaptureBase[AnyStr]): |
| | def __init__( |
| | self, fd: int, tmpfile: TextIO | None = None, *, tee: bool = False |
| | ) -> None: |
| | name = patchsysdict[fd] |
| | self._old: TextIO = getattr(sys, name) |
| | self.name = name |
| | if tmpfile is None: |
| | if name == "stdin": |
| | tmpfile = DontReadFromInput() |
| | else: |
| | tmpfile = CaptureIO() if not tee else TeeCaptureIO(self._old) |
| | self.tmpfile = tmpfile |
| | self._state = "initialized" |
| |
|
| | def repr(self, class_name: str) -> str: |
| | return "<{} {} _old={} _state={!r} tmpfile={!r}>".format( |
| | class_name, |
| | self.name, |
| | hasattr(self, "_old") and repr(self._old) or "<UNSET>", |
| | self._state, |
| | self.tmpfile, |
| | ) |
| |
|
| | def __repr__(self) -> str: |
| | return "<{} {} _old={} _state={!r} tmpfile={!r}>".format( |
| | self.__class__.__name__, |
| | self.name, |
| | hasattr(self, "_old") and repr(self._old) or "<UNSET>", |
| | self._state, |
| | self.tmpfile, |
| | ) |
| |
|
| | def _assert_state(self, op: str, states: tuple[str, ...]) -> None: |
| | assert ( |
| | self._state in states |
| | ), "cannot {} in state {!r}: expected one of {}".format( |
| | op, self._state, ", ".join(states) |
| | ) |
| |
|
| | def start(self) -> None: |
| | self._assert_state("start", ("initialized",)) |
| | setattr(sys, self.name, self.tmpfile) |
| | self._state = "started" |
| |
|
| | def done(self) -> None: |
| | self._assert_state("done", ("initialized", "started", "suspended", "done")) |
| | if self._state == "done": |
| | return |
| | setattr(sys, self.name, self._old) |
| | del self._old |
| | self.tmpfile.close() |
| | self._state = "done" |
| |
|
| | def suspend(self) -> None: |
| | self._assert_state("suspend", ("started", "suspended")) |
| | setattr(sys, self.name, self._old) |
| | self._state = "suspended" |
| |
|
| | def resume(self) -> None: |
| | self._assert_state("resume", ("started", "suspended")) |
| | if self._state == "started": |
| | return |
| | setattr(sys, self.name, self.tmpfile) |
| | self._state = "started" |
| |
|
| |
|
| | class SysCaptureBinary(SysCaptureBase[bytes]): |
| | EMPTY_BUFFER = b"" |
| |
|
| | def snap(self) -> bytes: |
| | self._assert_state("snap", ("started", "suspended")) |
| | self.tmpfile.seek(0) |
| | res = self.tmpfile.buffer.read() |
| | self.tmpfile.seek(0) |
| | self.tmpfile.truncate() |
| | return res |
| |
|
| | def writeorg(self, data: bytes) -> None: |
| | self._assert_state("writeorg", ("started", "suspended")) |
| | self._old.flush() |
| | self._old.buffer.write(data) |
| | self._old.buffer.flush() |
| |
|
| |
|
| | class SysCapture(SysCaptureBase[str]): |
| | EMPTY_BUFFER = "" |
| |
|
| | def snap(self) -> str: |
| | self._assert_state("snap", ("started", "suspended")) |
| | assert isinstance(self.tmpfile, CaptureIO) |
| | res = self.tmpfile.getvalue() |
| | self.tmpfile.seek(0) |
| | self.tmpfile.truncate() |
| | return res |
| |
|
| | def writeorg(self, data: str) -> None: |
| | self._assert_state("writeorg", ("started", "suspended")) |
| | self._old.write(data) |
| | self._old.flush() |
| |
|
| |
|
| | class FDCaptureBase(CaptureBase[AnyStr]): |
| | def __init__(self, targetfd: int) -> None: |
| | self.targetfd = targetfd |
| |
|
| | try: |
| | os.fstat(targetfd) |
| | except OSError: |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | self.targetfd_invalid: int | None = os.open(os.devnull, os.O_RDWR) |
| | os.dup2(self.targetfd_invalid, targetfd) |
| | else: |
| | self.targetfd_invalid = None |
| | self.targetfd_save = os.dup(targetfd) |
| |
|
| | if targetfd == 0: |
| | self.tmpfile = open(os.devnull, encoding="utf-8") |
| | self.syscapture: CaptureBase[str] = SysCapture(targetfd) |
| | else: |
| | self.tmpfile = EncodedFile( |
| | TemporaryFile(buffering=0), |
| | encoding="utf-8", |
| | errors="replace", |
| | newline="", |
| | write_through=True, |
| | ) |
| | if targetfd in patchsysdict: |
| | self.syscapture = SysCapture(targetfd, self.tmpfile) |
| | else: |
| | self.syscapture = NoCapture(targetfd) |
| |
|
| | self._state = "initialized" |
| |
|
| | def __repr__(self) -> str: |
| | return ( |
| | f"<{self.__class__.__name__} {self.targetfd} oldfd={self.targetfd_save} " |
| | f"_state={self._state!r} tmpfile={self.tmpfile!r}>" |
| | ) |
| |
|
| | def _assert_state(self, op: str, states: tuple[str, ...]) -> None: |
| | assert ( |
| | self._state in states |
| | ), "cannot {} in state {!r}: expected one of {}".format( |
| | op, self._state, ", ".join(states) |
| | ) |
| |
|
| | def start(self) -> None: |
| | """Start capturing on targetfd using memorized tmpfile.""" |
| | self._assert_state("start", ("initialized",)) |
| | os.dup2(self.tmpfile.fileno(), self.targetfd) |
| | self.syscapture.start() |
| | self._state = "started" |
| |
|
| | def done(self) -> None: |
| | """Stop capturing, restore streams, return original capture file, |
| | seeked to position zero.""" |
| | self._assert_state("done", ("initialized", "started", "suspended", "done")) |
| | if self._state == "done": |
| | return |
| | os.dup2(self.targetfd_save, self.targetfd) |
| | os.close(self.targetfd_save) |
| | if self.targetfd_invalid is not None: |
| | if self.targetfd_invalid != self.targetfd: |
| | os.close(self.targetfd) |
| | os.close(self.targetfd_invalid) |
| | self.syscapture.done() |
| | self.tmpfile.close() |
| | self._state = "done" |
| |
|
| | def suspend(self) -> None: |
| | self._assert_state("suspend", ("started", "suspended")) |
| | if self._state == "suspended": |
| | return |
| | self.syscapture.suspend() |
| | os.dup2(self.targetfd_save, self.targetfd) |
| | self._state = "suspended" |
| |
|
| | def resume(self) -> None: |
| | self._assert_state("resume", ("started", "suspended")) |
| | if self._state == "started": |
| | return |
| | self.syscapture.resume() |
| | os.dup2(self.tmpfile.fileno(), self.targetfd) |
| | self._state = "started" |
| |
|
| |
|
| | class FDCaptureBinary(FDCaptureBase[bytes]): |
| | """Capture IO to/from a given OS-level file descriptor. |
| | |
| | snap() produces `bytes`. |
| | """ |
| |
|
| | EMPTY_BUFFER = b"" |
| |
|
| | def snap(self) -> bytes: |
| | self._assert_state("snap", ("started", "suspended")) |
| | self.tmpfile.seek(0) |
| | res = self.tmpfile.buffer.read() |
| | self.tmpfile.seek(0) |
| | self.tmpfile.truncate() |
| | return res |
| |
|
| | def writeorg(self, data: bytes) -> None: |
| | """Write to original file descriptor.""" |
| | self._assert_state("writeorg", ("started", "suspended")) |
| | os.write(self.targetfd_save, data) |
| |
|
| |
|
| | class FDCapture(FDCaptureBase[str]): |
| | """Capture IO to/from a given OS-level file descriptor. |
| | |
| | snap() produces text. |
| | """ |
| |
|
| | EMPTY_BUFFER = "" |
| |
|
| | def snap(self) -> str: |
| | self._assert_state("snap", ("started", "suspended")) |
| | self.tmpfile.seek(0) |
| | res = self.tmpfile.read() |
| | self.tmpfile.seek(0) |
| | self.tmpfile.truncate() |
| | return res |
| |
|
| | def writeorg(self, data: str) -> None: |
| | """Write to original file descriptor.""" |
| | self._assert_state("writeorg", ("started", "suspended")) |
| | |
| | os.write(self.targetfd_save, data.encode("utf-8")) |
| |
|
| |
|
| | |
| |
|
| |
|
| | |
| | if sys.version_info >= (3, 11) or TYPE_CHECKING: |
| |
|
| | @final |
| | class CaptureResult(NamedTuple, Generic[AnyStr]): |
| | """The result of :method:`caplog.readouterr() <pytest.CaptureFixture.readouterr>`.""" |
| |
|
| | out: AnyStr |
| | err: AnyStr |
| |
|
| | else: |
| |
|
| | class CaptureResult( |
| | collections.namedtuple("CaptureResult", ["out", "err"]), |
| | Generic[AnyStr], |
| | ): |
| | """The result of :method:`caplog.readouterr() <pytest.CaptureFixture.readouterr>`.""" |
| |
|
| | __slots__ = () |
| |
|
| |
|
| | class MultiCapture(Generic[AnyStr]): |
| | _state = None |
| | _in_suspended = False |
| |
|
| | def __init__( |
| | self, |
| | in_: CaptureBase[AnyStr] | None, |
| | out: CaptureBase[AnyStr] | None, |
| | err: CaptureBase[AnyStr] | None, |
| | ) -> None: |
| | self.in_: CaptureBase[AnyStr] | None = in_ |
| | self.out: CaptureBase[AnyStr] | None = out |
| | self.err: CaptureBase[AnyStr] | None = err |
| |
|
| | def __repr__(self) -> str: |
| | return ( |
| | f"<MultiCapture out={self.out!r} err={self.err!r} in_={self.in_!r} " |
| | f"_state={self._state!r} _in_suspended={self._in_suspended!r}>" |
| | ) |
| |
|
| | def start_capturing(self) -> None: |
| | self._state = "started" |
| | if self.in_: |
| | self.in_.start() |
| | if self.out: |
| | self.out.start() |
| | if self.err: |
| | self.err.start() |
| |
|
| | def pop_outerr_to_orig(self) -> tuple[AnyStr, AnyStr]: |
| | """Pop current snapshot out/err capture and flush to orig streams.""" |
| | out, err = self.readouterr() |
| | if out: |
| | assert self.out is not None |
| | self.out.writeorg(out) |
| | if err: |
| | assert self.err is not None |
| | self.err.writeorg(err) |
| | return out, err |
| |
|
| | def suspend_capturing(self, in_: bool = False) -> None: |
| | self._state = "suspended" |
| | if self.out: |
| | self.out.suspend() |
| | if self.err: |
| | self.err.suspend() |
| | if in_ and self.in_: |
| | self.in_.suspend() |
| | self._in_suspended = True |
| |
|
| | def resume_capturing(self) -> None: |
| | self._state = "started" |
| | if self.out: |
| | self.out.resume() |
| | if self.err: |
| | self.err.resume() |
| | if self._in_suspended: |
| | assert self.in_ is not None |
| | self.in_.resume() |
| | self._in_suspended = False |
| |
|
| | def stop_capturing(self) -> None: |
| | """Stop capturing and reset capturing streams.""" |
| | if self._state == "stopped": |
| | raise ValueError("was already stopped") |
| | self._state = "stopped" |
| | if self.out: |
| | self.out.done() |
| | if self.err: |
| | self.err.done() |
| | if self.in_: |
| | self.in_.done() |
| |
|
| | def is_started(self) -> bool: |
| | """Whether actively capturing -- not suspended or stopped.""" |
| | return self._state == "started" |
| |
|
| | def readouterr(self) -> CaptureResult[AnyStr]: |
| | out = self.out.snap() if self.out else "" |
| | err = self.err.snap() if self.err else "" |
| | |
| | return CaptureResult(out, err) |
| |
|
| |
|
| | def _get_multicapture(method: _CaptureMethod) -> MultiCapture[str]: |
| | if method == "fd": |
| | return MultiCapture(in_=FDCapture(0), out=FDCapture(1), err=FDCapture(2)) |
| | elif method == "sys": |
| | return MultiCapture(in_=SysCapture(0), out=SysCapture(1), err=SysCapture(2)) |
| | elif method == "no": |
| | return MultiCapture(in_=None, out=None, err=None) |
| | elif method == "tee-sys": |
| | return MultiCapture( |
| | in_=None, out=SysCapture(1, tee=True), err=SysCapture(2, tee=True) |
| | ) |
| | raise ValueError(f"unknown capturing method: {method!r}") |
| |
|
| |
|
| | |
| |
|
| |
|
| | class CaptureManager: |
| | """The capture plugin. |
| | |
| | Manages that the appropriate capture method is enabled/disabled during |
| | collection and each test phase (setup, call, teardown). After each of |
| | those points, the captured output is obtained and attached to the |
| | collection/runtest report. |
| | |
| | There are two levels of capture: |
| | |
| | * global: enabled by default and can be suppressed by the ``-s`` |
| | option. This is always enabled/disabled during collection and each test |
| | phase. |
| | |
| | * fixture: when a test function or one of its fixture depend on the |
| | ``capsys`` or ``capfd`` fixtures. In this case special handling is |
| | needed to ensure the fixtures take precedence over the global capture. |
| | """ |
| |
|
| | def __init__(self, method: _CaptureMethod) -> None: |
| | self._method: Final = method |
| | self._global_capturing: MultiCapture[str] | None = None |
| | self._capture_fixture: CaptureFixture[Any] | None = None |
| |
|
| | def __repr__(self) -> str: |
| | return ( |
| | f"<CaptureManager _method={self._method!r} _global_capturing={self._global_capturing!r} " |
| | f"_capture_fixture={self._capture_fixture!r}>" |
| | ) |
| |
|
| | def is_capturing(self) -> str | bool: |
| | if self.is_globally_capturing(): |
| | return "global" |
| | if self._capture_fixture: |
| | return f"fixture {self._capture_fixture.request.fixturename}" |
| | return False |
| |
|
| | |
| |
|
| | def is_globally_capturing(self) -> bool: |
| | return self._method != "no" |
| |
|
| | def start_global_capturing(self) -> None: |
| | assert self._global_capturing is None |
| | self._global_capturing = _get_multicapture(self._method) |
| | self._global_capturing.start_capturing() |
| |
|
| | def stop_global_capturing(self) -> None: |
| | if self._global_capturing is not None: |
| | self._global_capturing.pop_outerr_to_orig() |
| | self._global_capturing.stop_capturing() |
| | self._global_capturing = None |
| |
|
| | def resume_global_capture(self) -> None: |
| | |
| | |
| | if self._global_capturing is not None: |
| | self._global_capturing.resume_capturing() |
| |
|
| | def suspend_global_capture(self, in_: bool = False) -> None: |
| | if self._global_capturing is not None: |
| | self._global_capturing.suspend_capturing(in_=in_) |
| |
|
| | def suspend(self, in_: bool = False) -> None: |
| | |
| | self.suspend_fixture() |
| | self.suspend_global_capture(in_) |
| |
|
| | def resume(self) -> None: |
| | self.resume_global_capture() |
| | self.resume_fixture() |
| |
|
| | def read_global_capture(self) -> CaptureResult[str]: |
| | assert self._global_capturing is not None |
| | return self._global_capturing.readouterr() |
| |
|
| | |
| |
|
| | def set_fixture(self, capture_fixture: CaptureFixture[Any]) -> None: |
| | if self._capture_fixture: |
| | current_fixture = self._capture_fixture.request.fixturename |
| | requested_fixture = capture_fixture.request.fixturename |
| | capture_fixture.request.raiseerror( |
| | f"cannot use {requested_fixture} and {current_fixture} at the same time" |
| | ) |
| | self._capture_fixture = capture_fixture |
| |
|
| | def unset_fixture(self) -> None: |
| | self._capture_fixture = None |
| |
|
| | def activate_fixture(self) -> None: |
| | """If the current item is using ``capsys`` or ``capfd``, activate |
| | them so they take precedence over the global capture.""" |
| | if self._capture_fixture: |
| | self._capture_fixture._start() |
| |
|
| | def deactivate_fixture(self) -> None: |
| | """Deactivate the ``capsys`` or ``capfd`` fixture of this item, if any.""" |
| | if self._capture_fixture: |
| | self._capture_fixture.close() |
| |
|
| | def suspend_fixture(self) -> None: |
| | if self._capture_fixture: |
| | self._capture_fixture._suspend() |
| |
|
| | def resume_fixture(self) -> None: |
| | if self._capture_fixture: |
| | self._capture_fixture._resume() |
| |
|
| | |
| |
|
| | @contextlib.contextmanager |
| | def global_and_fixture_disabled(self) -> Generator[None]: |
| | """Context manager to temporarily disable global and current fixture capturing.""" |
| | do_fixture = self._capture_fixture and self._capture_fixture._is_started() |
| | if do_fixture: |
| | self.suspend_fixture() |
| | do_global = self._global_capturing and self._global_capturing.is_started() |
| | if do_global: |
| | self.suspend_global_capture() |
| | try: |
| | yield |
| | finally: |
| | if do_global: |
| | self.resume_global_capture() |
| | if do_fixture: |
| | self.resume_fixture() |
| |
|
| | @contextlib.contextmanager |
| | def item_capture(self, when: str, item: Item) -> Generator[None]: |
| | self.resume_global_capture() |
| | self.activate_fixture() |
| | try: |
| | yield |
| | finally: |
| | self.deactivate_fixture() |
| | self.suspend_global_capture(in_=False) |
| |
|
| | out, err = self.read_global_capture() |
| | item.add_report_section(when, "stdout", out) |
| | item.add_report_section(when, "stderr", err) |
| |
|
| | |
| |
|
| | @hookimpl(wrapper=True) |
| | def pytest_make_collect_report( |
| | self, collector: Collector |
| | ) -> Generator[None, CollectReport, CollectReport]: |
| | if isinstance(collector, File): |
| | self.resume_global_capture() |
| | try: |
| | rep = yield |
| | finally: |
| | self.suspend_global_capture() |
| | out, err = self.read_global_capture() |
| | if out: |
| | rep.sections.append(("Captured stdout", out)) |
| | if err: |
| | rep.sections.append(("Captured stderr", err)) |
| | else: |
| | rep = yield |
| | return rep |
| |
|
| | @hookimpl(wrapper=True) |
| | def pytest_runtest_setup(self, item: Item) -> Generator[None]: |
| | with self.item_capture("setup", item): |
| | return (yield) |
| |
|
| | @hookimpl(wrapper=True) |
| | def pytest_runtest_call(self, item: Item) -> Generator[None]: |
| | with self.item_capture("call", item): |
| | return (yield) |
| |
|
| | @hookimpl(wrapper=True) |
| | def pytest_runtest_teardown(self, item: Item) -> Generator[None]: |
| | with self.item_capture("teardown", item): |
| | return (yield) |
| |
|
| | @hookimpl(tryfirst=True) |
| | def pytest_keyboard_interrupt(self) -> None: |
| | self.stop_global_capturing() |
| |
|
| | @hookimpl(tryfirst=True) |
| | def pytest_internalerror(self) -> None: |
| | self.stop_global_capturing() |
| |
|
| |
|
| | class CaptureFixture(Generic[AnyStr]): |
| | """Object returned by the :fixture:`capsys`, :fixture:`capsysbinary`, |
| | :fixture:`capfd` and :fixture:`capfdbinary` fixtures.""" |
| |
|
| | def __init__( |
| | self, |
| | captureclass: type[CaptureBase[AnyStr]], |
| | request: SubRequest, |
| | *, |
| | _ispytest: bool = False, |
| | ) -> None: |
| | check_ispytest(_ispytest) |
| | self.captureclass: type[CaptureBase[AnyStr]] = captureclass |
| | self.request = request |
| | self._capture: MultiCapture[AnyStr] | None = None |
| | self._captured_out: AnyStr = self.captureclass.EMPTY_BUFFER |
| | self._captured_err: AnyStr = self.captureclass.EMPTY_BUFFER |
| |
|
| | def _start(self) -> None: |
| | if self._capture is None: |
| | self._capture = MultiCapture( |
| | in_=None, |
| | out=self.captureclass(1), |
| | err=self.captureclass(2), |
| | ) |
| | self._capture.start_capturing() |
| |
|
| | def close(self) -> None: |
| | if self._capture is not None: |
| | out, err = self._capture.pop_outerr_to_orig() |
| | self._captured_out += out |
| | self._captured_err += err |
| | self._capture.stop_capturing() |
| | self._capture = None |
| |
|
| | def readouterr(self) -> CaptureResult[AnyStr]: |
| | """Read and return the captured output so far, resetting the internal |
| | buffer. |
| | |
| | :returns: |
| | The captured content as a namedtuple with ``out`` and ``err`` |
| | string attributes. |
| | """ |
| | captured_out, captured_err = self._captured_out, self._captured_err |
| | if self._capture is not None: |
| | out, err = self._capture.readouterr() |
| | captured_out += out |
| | captured_err += err |
| | self._captured_out = self.captureclass.EMPTY_BUFFER |
| | self._captured_err = self.captureclass.EMPTY_BUFFER |
| | return CaptureResult(captured_out, captured_err) |
| |
|
| | def _suspend(self) -> None: |
| | """Suspend this fixture's own capturing temporarily.""" |
| | if self._capture is not None: |
| | self._capture.suspend_capturing() |
| |
|
| | def _resume(self) -> None: |
| | """Resume this fixture's own capturing temporarily.""" |
| | if self._capture is not None: |
| | self._capture.resume_capturing() |
| |
|
| | def _is_started(self) -> bool: |
| | """Whether actively capturing -- not disabled or closed.""" |
| | if self._capture is not None: |
| | return self._capture.is_started() |
| | return False |
| |
|
| | @contextlib.contextmanager |
| | def disabled(self) -> Generator[None]: |
| | """Temporarily disable capturing while inside the ``with`` block.""" |
| | capmanager: CaptureManager = self.request.config.pluginmanager.getplugin( |
| | "capturemanager" |
| | ) |
| | with capmanager.global_and_fixture_disabled(): |
| | yield |
| |
|
| |
|
| | |
| |
|
| |
|
| | @fixture |
| | def capsys(request: SubRequest) -> Generator[CaptureFixture[str]]: |
| | r"""Enable text capturing of writes to ``sys.stdout`` and ``sys.stderr``. |
| | |
| | The captured output is made available via ``capsys.readouterr()`` method |
| | calls, which return a ``(out, err)`` namedtuple. |
| | ``out`` and ``err`` will be ``text`` objects. |
| | |
| | Returns an instance of :class:`CaptureFixture[str] <pytest.CaptureFixture>`. |
| | |
| | Example: |
| | |
| | .. code-block:: python |
| | |
| | def test_output(capsys): |
| | print("hello") |
| | captured = capsys.readouterr() |
| | assert captured.out == "hello\n" |
| | """ |
| | capman: CaptureManager = request.config.pluginmanager.getplugin("capturemanager") |
| | capture_fixture = CaptureFixture(SysCapture, request, _ispytest=True) |
| | capman.set_fixture(capture_fixture) |
| | capture_fixture._start() |
| | yield capture_fixture |
| | capture_fixture.close() |
| | capman.unset_fixture() |
| |
|
| |
|
| | @fixture |
| | def capsysbinary(request: SubRequest) -> Generator[CaptureFixture[bytes]]: |
| | r"""Enable bytes capturing of writes to ``sys.stdout`` and ``sys.stderr``. |
| | |
| | The captured output is made available via ``capsysbinary.readouterr()`` |
| | method calls, which return a ``(out, err)`` namedtuple. |
| | ``out`` and ``err`` will be ``bytes`` objects. |
| | |
| | Returns an instance of :class:`CaptureFixture[bytes] <pytest.CaptureFixture>`. |
| | |
| | Example: |
| | |
| | .. code-block:: python |
| | |
| | def test_output(capsysbinary): |
| | print("hello") |
| | captured = capsysbinary.readouterr() |
| | assert captured.out == b"hello\n" |
| | """ |
| | capman: CaptureManager = request.config.pluginmanager.getplugin("capturemanager") |
| | capture_fixture = CaptureFixture(SysCaptureBinary, request, _ispytest=True) |
| | capman.set_fixture(capture_fixture) |
| | capture_fixture._start() |
| | yield capture_fixture |
| | capture_fixture.close() |
| | capman.unset_fixture() |
| |
|
| |
|
| | @fixture |
| | def capfd(request: SubRequest) -> Generator[CaptureFixture[str]]: |
| | r"""Enable text capturing of writes to file descriptors ``1`` and ``2``. |
| | |
| | The captured output is made available via ``capfd.readouterr()`` method |
| | calls, which return a ``(out, err)`` namedtuple. |
| | ``out`` and ``err`` will be ``text`` objects. |
| | |
| | Returns an instance of :class:`CaptureFixture[str] <pytest.CaptureFixture>`. |
| | |
| | Example: |
| | |
| | .. code-block:: python |
| | |
| | def test_system_echo(capfd): |
| | os.system('echo "hello"') |
| | captured = capfd.readouterr() |
| | assert captured.out == "hello\n" |
| | """ |
| | capman: CaptureManager = request.config.pluginmanager.getplugin("capturemanager") |
| | capture_fixture = CaptureFixture(FDCapture, request, _ispytest=True) |
| | capman.set_fixture(capture_fixture) |
| | capture_fixture._start() |
| | yield capture_fixture |
| | capture_fixture.close() |
| | capman.unset_fixture() |
| |
|
| |
|
| | @fixture |
| | def capfdbinary(request: SubRequest) -> Generator[CaptureFixture[bytes]]: |
| | r"""Enable bytes capturing of writes to file descriptors ``1`` and ``2``. |
| | |
| | The captured output is made available via ``capfd.readouterr()`` method |
| | calls, which return a ``(out, err)`` namedtuple. |
| | ``out`` and ``err`` will be ``byte`` objects. |
| | |
| | Returns an instance of :class:`CaptureFixture[bytes] <pytest.CaptureFixture>`. |
| | |
| | Example: |
| | |
| | .. code-block:: python |
| | |
| | def test_system_echo(capfdbinary): |
| | os.system('echo "hello"') |
| | captured = capfdbinary.readouterr() |
| | assert captured.out == b"hello\n" |
| | |
| | """ |
| | capman: CaptureManager = request.config.pluginmanager.getplugin("capturemanager") |
| | capture_fixture = CaptureFixture(FDCaptureBinary, request, _ispytest=True) |
| | capman.set_fixture(capture_fixture) |
| | capture_fixture._start() |
| | yield capture_fixture |
| | capture_fixture.close() |
| | capman.unset_fixture() |
| |
|