| | |
| | """(Disabled by default) support for testing pytest and pytest plugins. |
| | |
| | PYTEST_DONT_REWRITE |
| | """ |
| |
|
| | from __future__ import annotations |
| |
|
| | import collections.abc |
| | import contextlib |
| | from fnmatch import fnmatch |
| | import gc |
| | import importlib |
| | from io import StringIO |
| | import locale |
| | import os |
| | from pathlib import Path |
| | import platform |
| | import re |
| | import shutil |
| | import subprocess |
| | import sys |
| | import traceback |
| | from typing import Any |
| | from typing import Callable |
| | from typing import Final |
| | from typing import final |
| | from typing import Generator |
| | from typing import IO |
| | from typing import Iterable |
| | from typing import Literal |
| | from typing import overload |
| | from typing import Sequence |
| | from typing import TextIO |
| | from typing import TYPE_CHECKING |
| | from weakref import WeakKeyDictionary |
| |
|
| | from iniconfig import IniConfig |
| | from iniconfig import SectionWrapper |
| |
|
| | from _pytest import timing |
| | from _pytest._code import Source |
| | from _pytest.capture import _get_multicapture |
| | from _pytest.compat import NOTSET |
| | from _pytest.compat import NotSetType |
| | from _pytest.config import _PluggyPlugin |
| | from _pytest.config import Config |
| | from _pytest.config import ExitCode |
| | from _pytest.config import hookimpl |
| | from _pytest.config import main |
| | from _pytest.config import PytestPluginManager |
| | from _pytest.config.argparsing import Parser |
| | from _pytest.deprecated import check_ispytest |
| | from _pytest.fixtures import fixture |
| | from _pytest.fixtures import FixtureRequest |
| | from _pytest.main import Session |
| | from _pytest.monkeypatch import MonkeyPatch |
| | from _pytest.nodes import Collector |
| | from _pytest.nodes import Item |
| | from _pytest.outcomes import fail |
| | from _pytest.outcomes import importorskip |
| | from _pytest.outcomes import skip |
| | from _pytest.pathlib import bestrelpath |
| | from _pytest.pathlib import make_numbered_dir |
| | from _pytest.reports import CollectReport |
| | from _pytest.reports import TestReport |
| | from _pytest.tmpdir import TempPathFactory |
| | from _pytest.warning_types import PytestWarning |
| |
|
| |
|
| | if TYPE_CHECKING: |
| | import pexpect |
| |
|
| |
|
# Plugin modules declared alongside pytester.
pytest_plugins = ["pytester_assertions"]


# Filenames that LsofFdLeakChecker.get_open_files() never reports as open.
IGNORE_PAM = ["/var/lib/sss/mc/passwd"]
| |
|
| |
|
def pytest_addoption(parser: Parser) -> None:
    """Register pytester's command-line options and its ini setting.

    Adds the ``--lsof`` flag, the ``--runpytest`` choice option and the
    ``pytester_example_dir`` ini value.

    :param parser: The option parser to extend.
    """
    runpytest_help = (
        "Run pytest sub runs in tests using an 'inprocess' "
        "or 'subprocess' (python -m main) method"
    )

    parser.addoption(
        "--lsof",
        dest="lsof",
        action="store_true",
        default=False,
        help="Run FD checks if lsof is available",
    )
    parser.addoption(
        "--runpytest",
        dest="runpytest",
        choices=("inprocess", "subprocess"),
        default="inprocess",
        help=runpytest_help,
    )
    parser.addini(
        "pytester_example_dir",
        help="Directory to take the pytester example files from",
    )
| |
|
| |
|
def pytest_configure(config: Config) -> None:
    """Enable the lsof leak checker on request and declare pytester's marker.

    The checker is only registered when ``--lsof`` was given *and*
    ``lsof`` can actually be run on this platform.

    :param config: The pytest config being set up.
    """
    lsof_requested = config.getvalue("lsof")
    if lsof_requested:
        leak_checker = LsofFdLeakChecker()
        if leak_checker.matching_platform():
            config.pluginmanager.register(leak_checker)

    marker_description = (
        "pytester_example_path(*path_segments): join the given path "
        "segments to `pytester_example_dir` for this test."
    )
    config.addinivalue_line("markers", marker_description)
| |
|
| |
|
class LsofFdLeakChecker:
    """Plugin that warns when a test leaves additional files open.

    Open files are listed by running ``lsof`` on the current process before
    and after each test's run protocol.
    """

    def get_open_files(self) -> list[tuple[str, str]]:
        """Return ``(fd, filename)`` pairs for the files this process has open.

        Only entries whose name is an absolute path and is not listed in
        ``IGNORE_PAM`` are returned.

        :raises subprocess.CalledProcessError: If ``lsof`` exits non-zero.
        """
        if sys.version_info >= (3, 11):
            encoding = locale.getencoding()
        else:
            encoding = locale.getpreferredencoding(False)
        completed = subprocess.run(
            ("lsof", "-Ffn0", "-p", str(os.getpid())),
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            check=True,
            text=True,
            encoding=encoding,
        )

        # NOTE(review): these markers are matched against the whole output
        # line, so a file whose *path* contains e.g. "txt" is skipped too.
        skipped_markers = ("deleted", "mem", "txt", "cwd")

        def isopen(line: str) -> bool:
            if not line.startswith("f"):
                return False
            return not any(marker in line for marker in skipped_markers)

        open_files = []
        for line in completed.stdout.split("\n"):
            if not isopen(line):
                continue
            # Fields are NUL-separated; each one starts with a one-letter tag.
            fields = line.split("\0")
            fd = fields[0][1:]
            filename = fields[1][1:]
            if filename in IGNORE_PAM:
                continue
            if filename.startswith("/"):
                open_files.append((fd, filename))
        return open_files

    def matching_platform(self) -> bool:
        """Return whether ``lsof -v`` can be executed successfully."""
        try:
            subprocess.run(("lsof", "-v"), check=True)
        except (OSError, subprocess.CalledProcessError):
            return False
        return True

    @hookimpl(wrapper=True, tryfirst=True)
    def pytest_runtest_protocol(self, item: Item) -> Generator[None, object, object]:
        """Wrap the run protocol of ``item`` and warn about newly opened FDs."""
        before = self.get_open_files()
        try:
            return (yield)
        finally:
            if hasattr(sys, "pypy_version_info"):
                gc.collect()
            after = self.get_open_files()

            # A leak is any FD present afterwards that was not there before.
            fds_before = {entry[0] for entry in before}
            leaked_files = [entry for entry in after if entry[0] not in fds_before]
            if leaked_files:
                headline = f"***** {len(leaked_files)} FD leakage detected"
                report = [headline]
                report.extend(str(entry) for entry in leaked_files)
                report.append("*** Before:")
                report.extend(str(entry) for entry in before)
                report.append("*** After:")
                report.extend(str(entry) for entry in after)
                report.append(headline)
                report.append("*** function {}:{}: {} ".format(*item.location))
                report.append("See issue #2366")
                item.warn(PytestWarning("\n".join(report)))
| |
|
| |
|
| | |
| |
|
| |
|
@fixture
def _pytest(request: FixtureRequest) -> PytestArg:
    """Provide a :class:`PytestArg` helper bound to the requesting test.

    Its ``gethookrecorder(hook)`` method returns a :class:`HookRecorder`
    that helps to make assertions about called hooks.
    """
    helper = PytestArg(request)
    return helper
| |
|
| |
|
class PytestArg:
    """Helper returned by the ``_pytest`` fixture to create hook recorders."""

    def __init__(self, request: FixtureRequest) -> None:
        # Kept so that recorders can register their cleanup as a finalizer.
        self._request = request

    def gethookrecorder(self, hook) -> HookRecorder:
        """Create a recorder for ``hook``'s plugin manager.

        Recording is stopped automatically when the request is finalized.
        """
        recorder = HookRecorder(hook._pm)
        self._request.addfinalizer(recorder.finish_recording)
        return recorder
| |
|
| |
|
def get_public_names(values: Iterable[str]) -> list[str]:
    """Only return names from iterator values without a leading underscore.

    :param values: The candidate names.
    :returns: The names that do not start with ``_``, in input order.
    """
    # startswith() instead of indexing, so an empty string does not raise
    # IndexError; it has no leading underscore and is therefore kept.
    return [name for name in values if not name.startswith("_")]
| |
|
| |
|
@final
class RecordedHookCall:
    """A recorded call to a hook.

    The arguments to the hook call are set as attributes.
    For example:

    .. code-block:: python

        calls = hook_recorder.getcalls("pytest_runtest_setup")
        # Suppose pytest_runtest_setup was called once with `item=an_item`.
        assert calls[0].item is an_item
    """

    def __init__(self, name: str, kwargs) -> None:
        # Expose every hook argument as an attribute; the hook name is
        # stored last, under ``_name``.
        self.__dict__.update(kwargs)
        self._name = name

    def __repr__(self) -> str:
        arguments = {
            key: value for key, value in self.__dict__.items() if key != "_name"
        }
        return f"<RecordedHookCall {self._name!r}(**{arguments!r})>"

    if TYPE_CHECKING:
        # The class has dynamic attributes; this tells type checkers so.
        def __getattr__(self, key: str): ...
| |
|
| |
|
@final
class HookRecorder:
    """Record all hooks called in a plugin manager.

    Hook recorders are created by :class:`Pytester`.

    This wraps all the hook calls in the plugin manager, recording each call
    before propagating the normal calls.
    """

    def __init__(
        self, pluginmanager: PytestPluginManager, *, _ispytest: bool = False
    ) -> None:
        check_ispytest(_ispytest)

        self._pluginmanager = pluginmanager
        self.calls: list[RecordedHookCall] = []
        self.ret: int | ExitCode | None = None

        def before(hook_name: str, hook_impls, kwargs) -> None:
            self.calls.append(RecordedHookCall(hook_name, kwargs))

        def after(outcome, hook_name: str, hook_impls, kwargs) -> None:
            pass

        self._undo_wrapping = pluginmanager.add_hookcall_monitoring(before, after)

    def finish_recording(self) -> None:
        """Stop recording by undoing the hook-call monitoring."""
        self._undo_wrapping()

    def getcalls(self, names: str | Iterable[str]) -> list[RecordedHookCall]:
        """Get all recorded calls to hooks with the given names (or name).

        :param names:
            Either a whitespace-separated string of hook names or an
            iterable of hook names.
        :returns: The matching calls, in recording order.
        """
        if isinstance(names, str):
            names = names.split()
        # Materialise once: testing membership directly against a one-shot
        # iterator would consume it and silently drop later matching calls.
        wanted = frozenset(names)
        return [call for call in self.calls if call._name in wanted]

    def assert_contains(self, entries: Sequence[tuple[str, str]]) -> None:
        """Assert that the recorded calls contain ``entries`` in order.

        Each entry is a ``(hook_name, check)`` pair, where ``check`` is a
        Python expression evaluated with the caller's locals as globals and
        the recorded call's attributes as locals.
        """
        __tracebackhide__ = True
        i = 0
        backlocals = dict(sys._getframe(1).f_locals)
        for name, check in list(entries):
            for ind, call in enumerate(self.calls[i:]):
                if call._name == name:
                    print("NAMEMATCH", name, call)
                    # NOTE: eval() of the check expression; callers must
                    # only pass trusted, test-authored strings.
                    if eval(check, backlocals, call.__dict__):
                        print("CHECKERMATCH", repr(check), "->", call)
                    else:
                        print("NOCHECKERMATCH", repr(check), "-", call)
                        continue
                    # Later entries are only searched after this match.
                    i += ind + 1
                    break
                print("NONAMEMATCH", name, "with", call)
            else:
                fail(f"could not find {name!r} check {check!r}")

    def popcall(self, name: str) -> RecordedHookCall:
        """Remove and return the first recorded call named ``name``.

        Fails the test if no such call was recorded.
        """
        __tracebackhide__ = True
        for i, call in enumerate(self.calls):
            if call._name == name:
                del self.calls[i]
                return call
        lines = [f"could not find call {name!r}, in:"]
        lines.extend([f"  {x}" for x in self.calls])
        fail("\n".join(lines))

    def getcall(self, name: str) -> RecordedHookCall:
        """Return the single recorded call named ``name``.

        Asserts that exactly one such call exists.
        """
        values = self.getcalls(name)
        assert len(values) == 1, (name, values)
        return values[0]

    # Functionality for test reports.

    @overload
    def getreports(
        self,
        names: Literal["pytest_collectreport"],
    ) -> Sequence[CollectReport]: ...

    @overload
    def getreports(
        self,
        names: Literal["pytest_runtest_logreport"],
    ) -> Sequence[TestReport]: ...

    @overload
    def getreports(
        self,
        names: str | Iterable[str] = (
            "pytest_collectreport",
            "pytest_runtest_logreport",
        ),
    ) -> Sequence[CollectReport | TestReport]: ...

    def getreports(
        self,
        names: str | Iterable[str] = (
            "pytest_collectreport",
            "pytest_runtest_logreport",
        ),
    ) -> Sequence[CollectReport | TestReport]:
        """Return the ``report`` argument of every recorded call in ``names``."""
        return [x.report for x in self.getcalls(names)]

    def matchreport(
        self,
        inamepart: str = "",
        names: str | Iterable[str] = (
            "pytest_runtest_logreport",
            "pytest_collectreport",
        ),
        when: str | None = None,
    ) -> CollectReport | TestReport:
        """Return a testreport whose dotted import path matches.

        :param inamepart:
            A ``::``-separated component of the node id to look for; the
            empty string matches every report.
        :param names: The hook names whose reports are considered.
        :param when: If given, only reports of this phase are considered.
        :raises ValueError: If zero or more than one report matches.
        """
        values = []
        for rep in self.getreports(names=names):
            if not when and rep.when != "call" and rep.passed:
                # Without an explicit `when`, passed reports of phases
                # other than "call" are not considered.
                continue
            if when and rep.when != when:
                continue
            if not inamepart or inamepart in rep.nodeid.split("::"):
                values.append(rep)
        if not values:
            raise ValueError(
                f"could not find test report matching {inamepart!r}: "
                "no test reports at all!"
            )
        if len(values) > 1:
            raise ValueError(
                f"found 2 or more testreports matching {inamepart!r}: {values}"
            )
        return values[0]

    @overload
    def getfailures(
        self,
        names: Literal["pytest_collectreport"],
    ) -> Sequence[CollectReport]: ...

    @overload
    def getfailures(
        self,
        names: Literal["pytest_runtest_logreport"],
    ) -> Sequence[TestReport]: ...

    @overload
    def getfailures(
        self,
        names: str | Iterable[str] = (
            "pytest_collectreport",
            "pytest_runtest_logreport",
        ),
    ) -> Sequence[CollectReport | TestReport]: ...

    def getfailures(
        self,
        names: str | Iterable[str] = (
            "pytest_collectreport",
            "pytest_runtest_logreport",
        ),
    ) -> Sequence[CollectReport | TestReport]:
        """Return the failed reports among those recorded for ``names``."""
        return [rep for rep in self.getreports(names) if rep.failed]

    def getfailedcollections(self) -> Sequence[CollectReport]:
        """Return the failed collection reports."""
        return self.getfailures("pytest_collectreport")

    def listoutcomes(
        self,
    ) -> tuple[
        Sequence[TestReport],
        Sequence[CollectReport | TestReport],
        Sequence[CollectReport | TestReport],
    ]:
        """Return the recorded reports split into ``(passed, skipped, failed)``.

        Passed reports are only included for the "call" phase.
        """
        passed = []
        skipped = []
        failed = []
        for rep in self.getreports(
            ("pytest_collectreport", "pytest_runtest_logreport")
        ):
            if rep.passed:
                if rep.when == "call":
                    assert isinstance(rep, TestReport)
                    passed.append(rep)
            elif rep.skipped:
                skipped.append(rep)
            else:
                assert rep.failed, f"Unexpected outcome: {rep!r}"
                failed.append(rep)
        return passed, skipped, failed

    def countoutcomes(self) -> list[int]:
        """Return the number of passed, skipped and failed reports."""
        return [len(x) for x in self.listoutcomes()]

    def assertoutcome(self, passed: int = 0, skipped: int = 0, failed: int = 0) -> None:
        """Assert the number of passed, skipped and failed reports."""
        __tracebackhide__ = True
        from _pytest.pytester_assertions import assertoutcome

        outcomes = self.listoutcomes()
        assertoutcome(
            outcomes,
            passed=passed,
            skipped=skipped,
            failed=failed,
        )

    def clear(self) -> None:
        """Forget all recorded calls (the list object itself is kept)."""
        self.calls[:] = []
| |
|
| |
|
@fixture
def linecomp() -> LineComp:
    """A :class:`LineComp` instance for checking that an input linearly
    contains a sequence of strings."""
    comparer = LineComp()
    return comparer
| |
|
| |
|
@fixture(name="LineMatcher")
def LineMatcher_fixture(request: FixtureRequest) -> type[LineMatcher]:
    """A reference to the :class:`LineMatcher` class itself (not an instance).

    The class is instantiable with a list of lines (without their trailing
    newlines), which is useful for testing large texts such as the output
    of commands.
    """
    matcher_class = LineMatcher
    return matcher_class
| |
|
| |
|
@fixture
def pytester(
    request: FixtureRequest, tmp_path_factory: TempPathFactory, monkeypatch: MonkeyPatch
) -> Pytester:
    """
    Facilities to write tests/configuration files, execute pytest in isolation, and match
    against expected output, perfect for black-box testing of pytest plugins.

    It attempts to isolate the test run from external factors as much as possible, modifying
    the current working directory to ``path`` and environment variables during initialization.

    It is particularly useful for testing plugins. It is similar to the :fixture:`tmp_path`
    fixture but provides methods which aid in testing pytest itself.
    """
    return Pytester(
        request=request,
        tmp_path_factory=tmp_path_factory,
        monkeypatch=monkeypatch,
        _ispytest=True,
    )
| |
|
| |
|
@fixture
def _sys_snapshot() -> Generator[None]:
    """Snapshot ``sys.path``/``sys.meta_path`` and ``sys.modules`` around a test.

    Both are restored on teardown, the modules first.
    """
    path_state = SysPathsSnapshot()
    module_state = SysModulesSnapshot()
    yield
    module_state.restore()
    path_state.restore()
| |
|
| |
|
@fixture
def _config_for_test() -> Generator[Config]:
    """Yield a config from ``get_config()`` and unconfigure it on teardown."""
    from _pytest.config import get_config

    fresh_config = get_config()
    yield fresh_config
    fresh_config._ensure_unconfigure()
| |
|
| |
|
| | |
# Matches a duration such as "0.13s"; RunResult.parse_summary_nouns() uses it
# to locate the terminal summary line.
rex_session_duration = re.compile(r"\d+\.\d\ds")

# Matches every "<count> <noun>" pair, e.g. "34 passed" or "111 skipped".
rex_outcome = re.compile(r"(\d+) (\w+)")
| |
|
| |
|
@final
class RunResult:
    """The result of running a command from :class:`~pytest.Pytester`."""

    def __init__(
        self,
        ret: int | ExitCode,
        outlines: list[str],
        errlines: list[str],
        duration: float,
    ) -> None:
        try:
            self.ret: int | ExitCode = ExitCode(ret)
            """The return value."""
        except ValueError:
            # Not one of the known exit codes: keep the raw value.
            self.ret = ret
        self.outlines = outlines
        """List of lines captured from stdout."""
        self.errlines = errlines
        """List of lines captured from stderr."""
        self.stdout = LineMatcher(outlines)
        """:class:`~pytest.LineMatcher` of stdout.

        Use e.g. :func:`str(stdout) <pytest.LineMatcher.__str__()>` to reconstruct stdout, or the commonly used
        :func:`stdout.fnmatch_lines() <pytest.LineMatcher.fnmatch_lines()>` method.
        """
        self.stderr = LineMatcher(errlines)
        """:class:`~pytest.LineMatcher` of stderr."""
        self.duration = duration
        """Duration in seconds."""

    def __repr__(self) -> str:
        out_count = len(self.stdout.lines)
        err_count = len(self.stderr.lines)
        return (
            f"<RunResult ret={self.ret!s} len(stdout.lines)={out_count:d} "
            f"len(stderr.lines)={err_count:d} duration={self.duration:.2f}s>"
        )

    def parseoutcomes(self) -> dict[str, int]:
        """Return a dictionary of outcome noun -> count from parsing the terminal
        output that the test process produced.

        The returned nouns will always be in plural form::

            ======= 1 failed, 1 passed, 1 warning, 1 error in 0.13s ====

        Will return ``{"failed": 1, "passed": 1, "warnings": 1, "errors": 1}``.
        """
        return self.parse_summary_nouns(self.outlines)

    @classmethod
    def parse_summary_nouns(cls, lines) -> dict[str, int]:
        """Extract the nouns from a pytest terminal summary line.

        It always returns the plural noun for consistency::

            ======= 1 failed, 1 passed, 1 warning, 1 error in 0.13s ====

        Will return ``{"failed": 1, "passed": 1, "warnings": 1, "errors": 1}``.

        :raises ValueError: If no line containing a session duration is found.
        """
        # The summary is the last line that carries a session duration.
        summary_line = next(
            (line for line in reversed(lines) if rex_session_duration.search(line)),
            None,
        )
        if summary_line is None:
            raise ValueError("Pytest terminal summary report not found")

        counts = {
            noun: int(count) for count, noun in rex_outcome.findall(summary_line)
        }
        plural_of = {"warning": "warnings", "error": "errors"}
        return {plural_of.get(noun, noun): count for noun, count in counts.items()}

    def assert_outcomes(
        self,
        passed: int = 0,
        skipped: int = 0,
        failed: int = 0,
        errors: int = 0,
        xpassed: int = 0,
        xfailed: int = 0,
        warnings: int | None = None,
        deselected: int | None = None,
    ) -> None:
        """
        Assert that the specified outcomes appear with the respective
        numbers (0 means it didn't occur) in the text output from a test run.

        ``warnings`` and ``deselected`` are only checked if not None.
        """
        __tracebackhide__ = True
        from _pytest.pytester_assertions import assert_outcomes

        assert_outcomes(
            self.parseoutcomes(),
            passed=passed,
            skipped=skipped,
            failed=failed,
            errors=errors,
            xpassed=xpassed,
            xfailed=xfailed,
            warnings=warnings,
            deselected=deselected,
        )
| |
|
| |
|
class SysModulesSnapshot:
    """Snapshot of ``sys.modules`` that can be restored later.

    :param preserve:
        Optional predicate on module names; modules for which it returns
        true at restore time keep their current entry instead of being
        rolled back.
    """

    def __init__(self, preserve: Callable[[str], bool] | None = None) -> None:
        self.__preserve = preserve
        self.__saved = dict(sys.modules)

    def restore(self) -> None:
        """Reset ``sys.modules`` in place to the saved (plus preserved) state."""
        keep = self.__preserve
        if keep:
            preserved = {
                name: module for name, module in sys.modules.items() if keep(name)
            }
            self.__saved.update(preserved)
        # Mutate the existing dict rather than rebinding sys.modules.
        sys.modules.clear()
        sys.modules.update(self.__saved)
| |
|
| |
|
class SysPathsSnapshot:
    """Snapshot of ``sys.path`` and ``sys.meta_path`` that can be restored."""

    def __init__(self) -> None:
        self.__saved = (sys.path.copy(), sys.meta_path.copy())

    def restore(self) -> None:
        """Write the saved contents back into the existing list objects."""
        saved_path, saved_meta_path = self.__saved
        sys.path[:] = saved_path
        sys.meta_path[:] = saved_meta_path
| |
|
| |
|
| | @final |
| | class Pytester: |
| | """ |
| | Facilities to write tests/configuration files, execute pytest in isolation, and match |
| | against expected output, perfect for black-box testing of pytest plugins. |
| | |
| | It attempts to isolate the test run from external factors as much as possible, modifying |
| | the current working directory to :attr:`path` and environment variables during initialization. |
| | """ |
| |
|
| | __test__ = False |
| |
|
| | CLOSE_STDIN: Final = NOTSET |
| |
|
| | class TimeoutExpired(Exception): |
| | pass |
| |
|
def __init__(
    self,
    request: FixtureRequest,
    tmp_path_factory: TempPathFactory,
    monkeypatch: MonkeyPatch,
    *,
    _ispytest: bool = False,
) -> None:
    """Create the temporary directories and isolate the environment.

    :param request: The request of the test using this instance.
    :param tmp_path_factory: Used to create the working directories.
    :param monkeypatch: Used for the cwd and environment changes.
    """
    check_ispytest(_ispytest)
    self._request = request
    self._mod_collections: WeakKeyDictionary[Collector, list[Item | Collector]] = (
        WeakKeyDictionary()
    )
    # Name the directories after the test function when there is one.
    name: str = request.function.__name__ if request.function else request.node.name
    self._name = name
    self._path: Path = tmp_path_factory.mktemp(name, numbered=True)
    self.plugins: list[str | _PluggyPlugin] = []
    # Snapshots are taken now and restored by the finalizer.
    self._sys_path_snapshot = SysPathsSnapshot()
    self._sys_modules_snapshot = self.__take_sys_modules_snapshot()
    self._request.addfinalizer(self._finalize)
    self._method = self._request.config.getoption("--runpytest")
    self._test_tmproot = tmp_path_factory.mktemp(f"tmp-{name}", numbered=True)

    self._monkeypatch = monkeypatch
    self.chdir()
    monkeypatch.setenv("PYTEST_DEBUG_TEMPROOT", str(self._test_tmproot))
    # Drop settings of the outer environment.
    monkeypatch.delenv("TOX_ENV_DIR", raising=False)
    monkeypatch.delenv("PYTEST_ADDOPTS", raising=False)
    # Point the home directory at the temporary directory.
    home_dir = str(self.path)
    monkeypatch.setenv("HOME", home_dir)
    monkeypatch.setenv("USERPROFILE", home_dir)
    monkeypatch.setenv("PY_COLORS", "0")
| |
|
@property
def path(self) -> Path:
    """The temporary directory in which files are created and tests are run."""
    working_dir = self._path
    return working_dir
| |
|
def __repr__(self) -> str:
    """Return ``<Pytester PATH>`` using the repr of :attr:`path`."""
    return "<Pytester " + repr(self.path) + ">"
| |
|
def _finalize(self) -> None:
    """
    Clean up global state artifacts.

    Restores the ``sys.modules`` snapshot and then the ``sys.path``
    snapshot taken at construction time. The temporary directory is not
    removed, so it can be inspected after the test run has finished.
    """
    for snapshot in (self._sys_modules_snapshot, self._sys_path_snapshot):
        snapshot.restore()
| |
|
def __take_sys_modules_snapshot(self) -> SysModulesSnapshot:
    """Snapshot ``sys.modules``, exempting some modules from the rollback.

    Modules whose name starts with ``zope`` or ``readline`` keep whatever
    entry they have at restore time.
    """
    # NOTE(review): the reason these prefixes are exempt is not visible in
    # this block -- confirm against the project history before changing.
    preserved_prefixes = ("zope", "readline")

    def preserve_module(name):
        return name.startswith(preserved_prefixes)

    return SysModulesSnapshot(preserve=preserve_module)
| |
|
def make_hook_recorder(self, pluginmanager: PytestPluginManager) -> HookRecorder:
    """Create a new :class:`HookRecorder` for a :class:`PytestPluginManager`.

    The recorder is also stored on the plugin manager as ``reprec`` and
    stops recording when the requesting test is finalized.
    """
    reprec = HookRecorder(pluginmanager, _ispytest=True)
    pluginmanager.reprec = reprec
    self._request.addfinalizer(reprec.finish_recording)
    return reprec
| |
|
def chdir(self) -> None:
    """Change the working directory to :attr:`path` via monkeypatch.

    This is done automatically upon instantiation.
    """
    target = self.path
    self._monkeypatch.chdir(target)
| |
|
def _makefile(
    self,
    ext: str,
    lines: Sequence[Any | bytes],
    files: dict[str, str],
    encoding: str = "utf-8",
) -> Path:
    """Write one file per entry and return the first path written.

    :param ext: Suffix applied to every file; empty or starting with a dot.
    :param lines:
        If non-empty, joined with newlines and written first, to a file
        named after the test.
    :param files: Mapping of base name to contents.
    :param encoding: Used to decode ``bytes`` input and to write the files.
    :raises TypeError: If ``ext`` is None.
    :raises ValueError: If ``ext`` is non-empty and lacks the leading dot.
    """
    entries = list(files.items())

    if ext is None:
        raise TypeError("ext must not be None")

    if ext and not ext.startswith("."):
        raise ValueError(
            f"pytester.makefile expects a file extension, try .{ext} instead of {ext}"
        )

    def to_text(s: Any | bytes) -> str:
        if isinstance(s, bytes):
            return s.decode(encoding)
        return str(s)

    if lines:
        joined = "\n".join(to_text(x) for x in lines)
        entries.insert(0, (self._name, joined))

    first_written = None
    for basename, value in entries:
        target = self.path.joinpath(basename).with_suffix(ext)
        target.parent.mkdir(parents=True, exist_ok=True)
        normalized = Source(value)
        text = "\n".join(to_text(line) for line in normalized.lines)
        target.write_text(text.strip(), encoding=encoding)
        if first_written is None:
            first_written = target
    assert first_written is not None
    return first_written
| |
|
def makefile(self, ext: str, *args: str, **kwargs: str) -> Path:
    r"""Create new text file(s) in the test directory.

    :param ext:
        The extension the file(s) should use, including the dot, e.g. `.py`.
    :param args:
        All args are treated as strings and joined using newlines.
        The result is written as contents to the file.  The name of the
        file is based on the test function requesting this fixture.
    :param kwargs:
        Each keyword is the name of a file, while the value of it will
        be written as contents of the file.
    :returns:
        The first created file.

    Examples:

    .. code-block:: python

        pytester.makefile(".txt", "line1", "line2")

        pytester.makefile(".ini", pytest="[pytest]\naddopts=-rs\n")

    To create binary files, use :meth:`pathlib.Path.write_bytes` directly:

    .. code-block:: python

        filename = pytester.path.joinpath("foo.bin")
        filename.write_bytes(b"...")
    """
    first_created = self._makefile(ext, args, kwargs)
    return first_created
| |
|
def makeconftest(self, source: str) -> Path:
    """Write ``source`` to a conftest.py file.

    :param source: The contents.
    :returns: The conftest.py file.
    """
    conftest_path = self.makepyfile(conftest=source)
    return conftest_path
| |
|
def makeini(self, source: str) -> Path:
    """Write ``source`` to a tox.ini file.

    :param source: The contents.
    :returns: The tox.ini file.
    """
    ini_path = self.makefile(".ini", tox=source)
    return ini_path
| |
|
def getinicfg(self, source: str) -> SectionWrapper:
    """Write ``source`` to tox.ini and return its parsed ``pytest`` section."""
    ini_path = self.makeini(source)
    parsed = IniConfig(str(ini_path))
    return parsed["pytest"]
| |
|
def makepyprojecttoml(self, source: str) -> Path:
    """Write ``source`` to a pyproject.toml file.

    :param source: The contents.
    :returns: The pyproject.toml file.

    .. versionadded:: 6.0
    """
    toml_path = self.makefile(".toml", pyproject=source)
    return toml_path
| |
|
def makepyfile(self, *args, **kwargs) -> Path:
    r"""Shortcut for .makefile() with a .py extension.

    Defaults to the test name with a '.py' extension, e.g test_foobar.py, overwriting
    existing files.

    Examples:

    .. code-block:: python

        def test_something(pytester):
            # Initial file is created test_something.py.
            pytester.makepyfile("foobar")
            # To create multiple files, pass kwargs accordingly.
            pytester.makepyfile(custom="foobar")
            # At this point, both 'test_something.py' & 'custom.py' exist in the test directory.

    """
    first_created = self._makefile(".py", args, kwargs)
    return first_created
| |
|
def maketxtfile(self, *args, **kwargs) -> Path:
    r"""Shortcut for .makefile() with a .txt extension.

    Defaults to the test name with a '.txt' extension, e.g test_foobar.txt, overwriting
    existing files.

    Examples:

    .. code-block:: python

        def test_something(pytester):
            # Initial file is created test_something.txt.
            pytester.maketxtfile("foobar")
            # To create multiple files, pass kwargs accordingly.
            pytester.maketxtfile(custom="foobar")
            # At this point, both 'test_something.txt' & 'custom.txt' exist in the test directory.

    """
    first_created = self._makefile(".txt", args, kwargs)
    return first_created
| |
|
def syspathinsert(self, path: str | os.PathLike[str] | None = None) -> None:
    """Prepend a directory to sys.path, defaults to :attr:`path`.

    The change is made through monkeypatch and is undone automatically at
    the end of each test.

    :param path:
        The path.
    """
    target = self.path if path is None else path
    self._monkeypatch.syspath_prepend(str(target))
| |
|
def mkdir(self, name: str | os.PathLike[str]) -> Path:
    """Create a new (sub)directory.

    :param name:
        The name of the directory, relative to the pytester path.
    :returns:
        The created directory.
    :rtype: pathlib.Path
    """
    new_dir = self.path.joinpath(name)
    new_dir.mkdir()
    return new_dir
| |
|
def mkpydir(self, name: str | os.PathLike[str]) -> Path:
    """Create a new python package.

    This creates a (sub)directory with an empty ``__init__.py`` file so it
    gets recognised as a Python package.

    :param name: The directory name, relative to the pytester path.
    :returns: The created directory.
    """
    package_dir = self.path.joinpath(name)
    package_dir.mkdir()
    (package_dir / "__init__.py").touch()
    return package_dir
| |
|
def copy_example(self, name: str | None = None) -> Path:
    """Copy file from project's directory into the testdir.

    The example directory is the ``pytester_example_dir`` ini value
    (relative to the rootpath), extended by the arguments of any
    ``pytester_example_path`` markers on the requesting test.

    :param name:
        The name of the file to copy. If None, a directory or a ``.py``
        file named after the test is looked up instead.
    :return:
        Path to the copied directory (inside ``self.path``).
    :rtype: pathlib.Path
    :raises ValueError: If ``pytester_example_dir`` is unset.
    :raises LookupError: If no matching example can be copied.
    """
    config = self._request.config
    configured_dir = config.getini("pytester_example_dir")
    if configured_dir is None:
        raise ValueError("pytester_example_dir is unset, can't copy examples")
    example_dir: Path = config.rootpath / configured_dir

    for marker in self._request.node.iter_markers("pytester_example_path"):
        assert marker.args
        example_dir = example_dir.joinpath(*marker.args)

    if name is not None:
        example_path = example_dir.joinpath(name)
    else:
        func_name = self._name
        dir_candidate = example_dir / func_name
        file_candidate = example_dir / (func_name + ".py")

        if dir_candidate.is_dir():
            example_path = dir_candidate
        elif file_candidate.is_file():
            example_path = file_candidate
        else:
            raise LookupError(
                f"{func_name} can't be found as module or package in {example_dir}"
            )

    # Only directories without an __init__.py are copied as a tree.
    if example_path.is_dir() and not example_path.joinpath("__init__.py").is_file():
        shutil.copytree(example_path, self.path, symlinks=True, dirs_exist_ok=True)
        return self.path
    if example_path.is_file():
        destination = self.path.joinpath(example_path.name)
        shutil.copy(example_path, destination)
        return destination
    raise LookupError(
        f'example "{example_path}" is not found as a file or directory'
    )
| |
|
def getnode(self, config: Config, arg: str | os.PathLike[str]) -> Collector | Item:
    """Get the collection node of a file.

    A session is created from ``config``, started, used to collect the
    absolute path of ``arg`` (without generating items) and finished.

    :param config:
        A pytest config.
        See :py:meth:`parseconfig` and :py:meth:`parseconfigure` for creating it.
    :param arg:
        Path to the file; must not contain ``::``.
    :returns:
        The node.
    """
    session = Session.from_config(config)
    assert "::" not in str(arg)
    absolute = Path(os.path.abspath(arg))
    hooks = config.hook
    hooks.pytest_sessionstart(session=session)
    collected = session.perform_collect([str(absolute)], genitems=False)
    node = collected[0]
    hooks.pytest_sessionfinish(session=session, exitstatus=ExitCode.OK)
    return node
| |
|
def getpathnode(self, path: str | os.PathLike[str]) -> Collector | Item:
    """Return the collection node of a file.

    This is like :py:meth:`getnode` but uses :py:meth:`parseconfigure` to
    create the (configured) pytest Config instance, and collects the path
    relative to the session path.

    :param path:
        Path to the file.
    :returns:
        The node.
    """
    path = Path(path)
    config = self.parseconfigure(path)
    session = Session.from_config(config)
    relative = bestrelpath(session.path, path)
    hooks = config.hook
    hooks.pytest_sessionstart(session=session)
    collected = session.perform_collect([relative], genitems=False)
    node = collected[0]
    hooks.pytest_sessionfinish(session=session, exitstatus=ExitCode.OK)
    return node
| |
|
def genitems(self, colitems: Sequence[Item | Collector]) -> list[Item]:
    """Generate all test items from a collection node.

    This recurses into the collection node and returns a list of all the
    test items contained within.

    :param colitems:
        The collection nodes. The session of the first node is used for
        all of them.
    :returns:
        The collected items; an empty list if ``colitems`` is empty.
    """
    # Without this guard, colitems[0] raises IndexError on an empty
    # sequence.
    if not colitems:
        return []
    session = colitems[0].session
    result: list[Item] = []
    for colitem in colitems:
        result.extend(session.genitems(colitem))
    return result
| |
|
def runitem(self, source: str) -> Any:
    """Run the "test_func" Item.

    The calling test instance (class containing the test method) must
    provide a ``.getrunner()`` method which should return a runner which
    can run the test protocol for a single item, e.g.
    ``_pytest.runner.runtestprotocol``.

    :param source: Passed to :py:meth:`getitem` to obtain the item.
    :returns: Whatever the runner returns for the item.
    """
    # The item is created first, then the runner is taken from the
    # instance of the requesting test.
    item = self.getitem(source)
    run = self._request.instance.getrunner()
    return run(item)
| |
|
def inline_runsource(self, source: str, *cmdlineargs) -> HookRecorder:
    """Run a test module in process using ``pytest.main()``.

    The given "source" is written to a file via :py:meth:`makepyfile`, and
    :py:meth:`inline_run` is then invoked with the extra command line
    arguments followed by that file's path.

    :param source: The source code of the test module.
    :param cmdlineargs: Any extra command line arguments to use.
    :returns: The :py:class:`HookRecorder` produced by :py:meth:`inline_run`.
    """
    module_path = self.makepyfile(source)
    # Extra arguments come first; the generated module is the last argument.
    return self.inline_run(*cmdlineargs, module_path)
| |
|
def inline_genitems(self, *args) -> tuple[list[Item], HookRecorder]:
    """Run ``pytest.main(['--collect-only'])`` in-process.

    Delegates to :py:meth:`inline_run` with ``--collect-only`` prepended to
    ``args`` and returns a tuple of the collected items and the
    :py:class:`HookRecorder` instance of that run.
    """
    recorder = self.inline_run("--collect-only", *args)
    # Every recorded ``pytest_itemcollected`` call carries one item.
    collected = []
    for call in recorder.getcalls("pytest_itemcollected"):
        collected.append(call.item)
    return collected, recorder
| |
|
def inline_run(
    self,
    *args: str | os.PathLike[str],
    plugins=(),
    no_reraise_ctrlc: bool = False,
) -> HookRecorder:
    """Run ``pytest.main()`` in-process, returning a HookRecorder.

    Runs the :py:func:`pytest.main` function to run all of pytest inside
    the test process itself. This means it can return a
    :py:class:`HookRecorder` instance which gives more detailed results
    from that run than can be done by matching stdout/stderr from
    :py:meth:`runpytest`.

    :param args:
        Command line arguments to pass to :py:func:`pytest.main`. Each one
        is converted with ``str()`` first.
    :param plugins:
        Extra plugin instances the ``pytest.main()`` instance should use.
    :param no_reraise_ctrlc:
        Typically we reraise keyboard interrupts from the child run. If
        True, the KeyboardInterrupt exception is captured.
    :returns:
        The recorder created during ``pytest_configure``, with the exit
        status stored on its ``ret`` attribute.
    """
    # Invalidate the import system's finder caches before the in-process run.
    importlib.invalidate_caches()

    plugins = list(plugins)
    restorers = []
    try:
        # Snapshots are taken up front; their ``restore`` callables are run
        # from the ``finally`` clause whatever the outcome of the run.
        restorers.append(self.__take_sys_modules_snapshot().restore)
        restorers.append(SysPathsSnapshot().restore)

        recorders = []

        class Collect:
            # The first parameter is deliberately not called ``self``: the
            # body needs the enclosing method's ``self``.
            def pytest_configure(x, config: Config) -> None:
                recorders.append(self.make_hook_recorder(config.pluginmanager))

        plugins.append(Collect())
        ret = main(list(map(str, args)), plugins=plugins)
        if len(recorders) != 1:
            # No single recorder was created: fall back to a bare
            # placeholder that only carries ``ret``.
            class reprec:
                pass

        else:
            reprec = recorders.pop()
        reprec.ret = ret

        interrupted = ret == ExitCode.INTERRUPTED
        if interrupted and not no_reraise_ctrlc:
            # Re-raise only if the last recorded interrupt really was a
            # KeyboardInterrupt.
            calls = reprec.getcalls("pytest_keyboard_interrupt")
            if calls and calls[-1].excinfo.type == KeyboardInterrupt:
                raise KeyboardInterrupt()
        return reprec
    finally:
        for restore in restorers:
            restore()
| |
|
def runpytest_inprocess(
    self, *args: str | os.PathLike[str], **kwargs: Any
) -> RunResult:
    """Return result of running pytest in-process, providing a similar
    interface to what self.runpytest() provides.

    :param args:
        Command line arguments, forwarded to :py:meth:`inline_run`.
    :param kwargs:
        Forwarded to :py:meth:`inline_run`, except for ``syspathinsert``:
        if true, :py:meth:`syspathinsert` is called before the run.
    :returns:
        A :py:class:`RunResult` built from the exit status and the captured
        stdout/stderr; the recorder (or a stand-in carrying only ``ret``) is
        attached as ``reprec``.
    """
    syspathinsert = kwargs.pop("syspathinsert", False)

    if syspathinsert:
        self.syspathinsert()
    now = timing.time()
    capture = _get_multicapture("sys")
    capture.start_capturing()
    try:
        try:
            reprec = self.inline_run(*args, **kwargs)
        except SystemExit as e:
            # A bare ``SystemExit()`` has no args, and ``None`` means a
            # successful exit, so both are reported as status 0.
            exit_status = e.args[0] if e.args else None
            if exit_status is None:
                exit_status = 0
            try:
                exit_status = ExitCode(exit_status)
            except ValueError:
                # Not a known pytest exit code: keep the raw value.
                pass

            # The attribute is set after the class statement on purpose:
            # ``ret = ret`` inside the class body would look ``ret`` up in
            # the module globals, not in this function, and raise NameError.
            class reprec:
                pass

            reprec.ret = exit_status

        except Exception:
            traceback.print_exc()

            class reprec:
                ret = ExitCode(3)

    finally:
        # Always collect and stop the capture, then replay the captured
        # output on the real streams.
        out, err = capture.readouterr()
        capture.stop_capturing()
        sys.stdout.write(out)
        sys.stderr.write(err)

    assert reprec.ret is not None
    res = RunResult(
        reprec.ret, out.splitlines(), err.splitlines(), timing.time() - now
    )
    res.reprec = reprec
    return res
| |
|
def runpytest(self, *args: str | os.PathLike[str], **kwargs: Any) -> RunResult:
    """Run pytest inline or in a subprocess, depending on the command line
    option "--runpytest" and return a :py:class:`~pytest.RunResult`.

    A ``--basetemp`` argument is added through ``_ensure_basetemp`` before
    dispatching.

    :raises RuntimeError: If the configured run method is not recognized.
    """
    new_args = self._ensure_basetemp(args)
    method = self._method
    if method == "inprocess":
        return self.runpytest_inprocess(*new_args, **kwargs)
    if method == "subprocess":
        return self.runpytest_subprocess(*new_args, **kwargs)
    raise RuntimeError(f"Unrecognized runpytest option: {self._method}")
| |
|
def _ensure_basetemp(
    self, args: Sequence[str | os.PathLike[str]]
) -> list[str | os.PathLike[str]]:
    """Return ``args`` as a list that is guaranteed to carry ``--basetemp``.

    If no argument starts with ``--basetemp``, one pointing at a
    ``basetemp`` directory next to ``self.path`` is appended.
    """
    new_args = list(args)
    has_basetemp = any(str(arg).startswith("--basetemp") for arg in new_args)
    if not has_basetemp:
        basetemp = self.path.parent.joinpath("basetemp")
        new_args.append(f"--basetemp={basetemp}")
    return new_args
| |
|
def parseconfig(self, *args: str | os.PathLike[str]) -> Config:
    """Return a new pytest :class:`pytest.Config` instance from given
    commandline args.

    This invokes the pytest bootstrapping code in _pytest.config to create a
    new :py:class:`pytest.PytestPluginManager` and call the
    :hook:`pytest_cmdline_parse` hook to create a new :class:`pytest.Config`
    instance.

    If :attr:`plugins` has been populated they should be plugin modules
    to be registered with the plugin manager.
    """
    import _pytest.config

    # ``--basetemp`` is ensured first, then every argument is stringified.
    cmdline = [str(arg) for arg in self._ensure_basetemp(args)]
    config = _pytest.config._prepareconfig(cmdline, self.plugins)
    # Whatever the caller does with the config, make sure it gets
    # unconfigured when the requesting test's finalizers run.
    self._request.addfinalizer(config._ensure_unconfigure)
    return config
| |
|
def parseconfigure(self, *args: str | os.PathLike[str]) -> Config:
    """Return a new pytest configured Config instance.

    Builds a :py:class:`pytest.Config` exactly like :py:meth:`parseconfig`
    does and then additionally calls the :hook:`pytest_configure` hook on
    it.
    """
    configured = self.parseconfig(*args)
    configured._do_configure()
    return configured
| |
|
def getitem(
    self, source: str | os.PathLike[str], funcname: str = "test_func"
) -> Item:
    """Return the test item for a test function.

    Writes the source to a python file and runs pytest's collection on
    the resulting module, returning the test item for the requested
    function name.

    :param source:
        The module source.
    :param funcname:
        The name of the test function for which to return a test item.
    :returns:
        The test item.
    :raises AssertionError:
        If no collected item is named ``funcname``.
    """
    items = self.getitems(source)
    for item in items:
        if item.name == funcname:
            return item
    # Raised explicitly rather than with ``assert 0`` so that the failure is
    # still reported (instead of silently returning None) when Python runs
    # with assertions disabled (``-O``).
    raise AssertionError(
        f"{funcname!r} item not found in module:\n{source}\nitems: {items}"
    )
| |
|
def getitems(self, source: str | os.PathLike[str]) -> list[Item]:
    """Return all test items collected from the module.

    The module collection node is obtained with :py:meth:`getmodulecol`
    and then expanded into its test items with :py:meth:`genitems`.
    """
    return self.genitems([self.getmodulecol(source)])
| |
|
def getmodulecol(
    self,
    source: str | os.PathLike[str],
    configargs=(),
    *,
    withinit: bool = False,
):
    """Return the module collection node for ``source``.

    A non-path ``source`` is written to a file with :py:meth:`makepyfile`;
    a path-like ``source`` is instead resolved against ``self.path``. The
    file is then collected and its collection node returned. The Config
    used for collection is also stored on ``self.config``.

    :param source:
        The source code of the module to collect, or a path to it.

    :param configargs:
        Any extra arguments to pass to :py:meth:`parseconfigure`.

    :param withinit:
        Whether to also write an ``__init__.py`` file to the same
        directory to ensure it is a package. Not supported when
        ``source`` is a path.
    """
    if not isinstance(source, os.PathLike):
        # Source text: the module file is named after ``self._name``.
        path = self.makepyfile(**{self._name: str(source)})
        if withinit:
            self.makepyfile(__init__="#")
    else:
        path = self.path.joinpath(source)
        assert not withinit, "not supported for paths"
    config = self.parseconfigure(path, *configargs)
    self.config = config
    return self.getnode(config, path)
| |
|
def collect_by_name(self, modcol: Collector, name: str) -> Item | Collector | None:
    """Return the collection node for name from the module collection.

    The children of ``modcol`` are collected once and cached in
    ``self._mod_collections``; the first child whose name equals ``name``
    is returned, or ``None`` if there is no such child.

    :param modcol: A module collection node; see :py:meth:`getmodulecol`.
    :param name: The name of the node to return.
    """
    cache = self._mod_collections
    if modcol not in cache:
        cache[modcol] = list(modcol.collect())
    return next((node for node in cache[modcol] if node.name == name), None)
| |
|
def popen(
    self,
    cmdargs: Sequence[str | os.PathLike[str]],
    stdout: int | TextIO = subprocess.PIPE,
    stderr: int | TextIO = subprocess.PIPE,
    stdin: NotSetType | bytes | IO[Any] | int = CLOSE_STDIN,
    **kw,
):
    """Invoke :py:class:`subprocess.Popen`.

    Calls :py:class:`subprocess.Popen` making sure the current working
    directory is in ``PYTHONPATH``.

    ``stdin`` may be ``CLOSE_STDIN`` (a pipe is created and closed right
    after start), :py:class:`bytes` (a pipe is created and the bytes are
    written to it), or anything else, which is handed to
    :py:class:`subprocess.Popen` unchanged.

    You probably want to use :py:meth:`run` instead.
    """
    env = os.environ.copy()
    # Current directory first, then any pre-existing PYTHONPATH; empty
    # entries are dropped.
    search_path = [os.getcwd(), env.get("PYTHONPATH", "")]
    env["PYTHONPATH"] = os.pathsep.join(entry for entry in search_path if entry)
    kw["env"] = env

    close_stdin = stdin is self.CLOSE_STDIN
    feed_bytes = not close_stdin and isinstance(stdin, bytes)
    # Both special cases need a pipe we control.
    kw["stdin"] = subprocess.PIPE if (close_stdin or feed_bytes) else stdin

    popen = subprocess.Popen(cmdargs, stdout=stdout, stderr=stderr, **kw)
    if close_stdin:
        assert popen.stdin is not None
        popen.stdin.close()
    elif feed_bytes:
        assert popen.stdin is not None
        popen.stdin.write(stdin)

    return popen
| |
|
def run(
    self,
    *cmdargs: str | os.PathLike[str],
    timeout: float | None = None,
    stdin: NotSetType | bytes | IO[Any] | int = CLOSE_STDIN,
) -> RunResult:
    """Run a command with arguments.

    Run a process using :py:class:`subprocess.Popen` saving the stdout and
    stderr.

    :param cmdargs:
        The sequence of arguments to pass to :py:class:`subprocess.Popen`,
        with path-like objects being converted to :py:class:`str`
        automatically.
    :param timeout:
        The period in seconds after which to timeout and raise
        :py:class:`Pytester.TimeoutExpired`.
    :param stdin:
        Optional standard input.

        - If it is ``CLOSE_STDIN`` (Default), then this method calls
          :py:class:`subprocess.Popen` with ``stdin=subprocess.PIPE``, and
          the standard input is closed immediately after the new command is
          started.

        - If it is of type :py:class:`bytes`, these bytes are sent to the
          standard input of the command.

        - Otherwise, it is passed through to :py:class:`subprocess.Popen`.
          For further information in this case, consult the document of the
          ``stdin`` parameter in :py:class:`subprocess.Popen`.
    :type stdin: _pytest.compat.NotSetType | bytes | IO[Any] | int
    :returns:
        The result.

    """
    __tracebackhide__ = True

    cmdargs = tuple(os.fspath(arg) for arg in cmdargs)
    # The child's output streams are redirected into these two files.
    stdout_path = self.path.joinpath("stdout")
    stderr_path = self.path.joinpath("stderr")
    print("running:", *cmdargs)
    print(" in:", Path.cwd())

    with stdout_path.open("w", encoding="utf8") as out_file, stderr_path.open(
        "w", encoding="utf8"
    ) as err_file:
        now = timing.time()
        popen = self.popen(
            cmdargs,
            stdin=stdin,
            stdout=out_file,
            stderr=err_file,
            close_fds=(sys.platform != "win32"),
        )
        if popen.stdin is not None:
            popen.stdin.close()

        try:
            # ``wait(None)`` blocks without a time limit, just like ``wait()``.
            ret = popen.wait(timeout)
        except subprocess.TimeoutExpired:
            timeout_message = f"{timeout} second timeout expired running: {cmdargs}"
            # Make sure the child is gone before reporting the timeout.
            popen.kill()
            popen.wait()
            raise self.TimeoutExpired(timeout_message)

    out = stdout_path.read_text(encoding="utf8").splitlines()
    err = stderr_path.read_text(encoding="utf8").splitlines()

    self._dump_lines(out, sys.stdout)
    self._dump_lines(err, sys.stderr)

    try:
        ret = ExitCode(ret)
    except ValueError:
        # Not one of pytest's exit codes: keep the raw return code.
        pass
    return RunResult(ret, out, err, timing.time() - now)
| |
|
def _dump_lines(self, lines, fp):
    """Print each of ``lines`` to the stream ``fp``.

    If the stream cannot encode a line, printing stops and a short notice
    is printed to standard output instead of raising.
    """
    try:
        for text in lines:
            print(text, file=fp)
    except UnicodeEncodeError:
        print(f"couldn't print to {fp} because of encoding")
| |
|
def _getpytestargs(self) -> tuple[str, ...]:
    """Return the command prefix that runs pytest with this interpreter."""
    interpreter = sys.executable
    return (interpreter, "-mpytest")
| |
|
def runpython(self, script: os.PathLike[str]) -> RunResult:
    """Run a python script using sys.executable as interpreter.

    :param script: Path of the script, passed as the only argument.
    :returns: The result of :py:meth:`run`.
    """
    interpreter = sys.executable
    return self.run(interpreter, script)
| |
|
def runpython_c(self, command: str) -> RunResult:
    """Run ``python -c "command"``.

    :param command: The code handed to the interpreter's ``-c`` option.
    :returns: The result of :py:meth:`run`.
    """
    interpreter = sys.executable
    return self.run(interpreter, "-c", command)
| |
|
def runpytest_subprocess(
    self, *args: str | os.PathLike[str], timeout: float | None = None
) -> RunResult:
    """Run pytest as a subprocess with given arguments.

    Any plugins added to the :py:attr:`plugins` list by name (i.e. as
    strings) will be added using the ``-p`` command line option; plugin
    objects cannot be passed to another process and are ignored.
    Additionally ``--basetemp`` is used to put any temporary files and
    directories in a numbered directory prefixed with "runpytest-" to not
    conflict with the normal numbered pytest location for temporary files
    and directories.

    :param args:
        The sequence of arguments to pass to the pytest subprocess.
    :param timeout:
        The period in seconds after which to timeout and raise
        :py:class:`Pytester.TimeoutExpired`.
    :returns:
        The result.
    """
    __tracebackhide__ = True
    p = make_numbered_dir(root=self.path, prefix="runpytest-", mode=0o700)
    # One ``-p NAME`` pair per named plugin, in registration order, so that
    # every named plugin is forwarded rather than only the first one.
    plugin_args: list[str] = []
    for plugin in self.plugins:
        if isinstance(plugin, str):
            plugin_args += ["-p", plugin]
    args = (*plugin_args, f"--basetemp={p}", *args)
    args = self._getpytestargs() + args
    return self.run(*args, timeout=timeout)
| |
|
def spawn_pytest(self, string: str, expect_timeout: float = 10.0) -> pexpect.spawn:
    """Run pytest using pexpect.

    The command line is assembled from the pytest invocation prefix, a
    ``--basetemp`` pointing at a freshly created ``temp-pexpect`` directory
    below ``self.path``, and ``string``; it is then handed to
    :py:meth:`spawn`.

    The pexpect child is returned.
    """
    basetemp = self.path / "temp-pexpect"
    basetemp.mkdir(mode=0o700)
    invoke = " ".join(str(part) for part in self._getpytestargs())
    return self.spawn(
        f"{invoke} --basetemp={basetemp} {string}", expect_timeout=expect_timeout
    )
| |
|
def spawn(self, cmd: str, expect_timeout: float = 10.0) -> pexpect.spawn:
    """Run a command using pexpect.

    The test is skipped if pexpect (at least version 3.0) cannot be
    imported, on 64-bit PyPy, or if ``pexpect.spawn`` is unavailable.
    The child's log is written to ``spawn.out`` below ``self.path``.

    :param cmd: The command line to spawn.
    :param expect_timeout: Timeout passed to ``pexpect.spawn``.
    :returns: The pexpect child.
    """
    pexpect = importorskip("pexpect", "3.0")
    if hasattr(sys, "pypy_version_info") and "64" in platform.machine():
        skip("pypy-64 bit not supported")
    if not hasattr(pexpect, "spawn"):
        skip("pexpect.spawn not available")
    logfile = self.path.joinpath("spawn.out").open("wb")
    # Register the close before spawning, so the log file handle is not
    # leaked when ``pexpect.spawn`` itself raises.
    self._request.addfinalizer(logfile.close)

    child = pexpect.spawn(cmd, logfile=logfile, timeout=expect_timeout)
    return child
| |
|
| |
|
class LineComp:
    """Collect text in a :class:`python:io.StringIO` and match it by lines."""

    def __init__(self) -> None:
        self.stringio = StringIO()
        """:class:`python:io.StringIO()` instance used for input."""

    def assert_contains_lines(self, lines2: Sequence[str]) -> None:
        """Assert that ``lines2`` are contained (linearly) in :attr:`stringio`'s value.

        The buffer is emptied and rewound before matching, so each call only
        sees what was written since the previous one.

        Lines are matched using :func:`LineMatcher.fnmatch_lines <pytest.LineMatcher.fnmatch_lines>`.
        """
        __tracebackhide__ = True
        buffer = self.stringio
        captured = buffer.getvalue()
        buffer.truncate(0)
        buffer.seek(0)
        LineMatcher(captured.split("\n")).fnmatch_lines(lines2)
| |
|
| |
|
class LineMatcher:
    """Flexible matching of text.

    This is a convenience class to test large texts like the output of
    commands.

    The constructor takes a list of lines without their trailing newlines, i.e.
    ``text.splitlines()``.
    """

    def __init__(self, lines: list[str]) -> None:
        self.lines = lines
        # Messages accumulated by the matching methods; they become the
        # failure text when a match fails.
        self._log_output: list[str] = []

    def __str__(self) -> str:
        """Return the entire original text.

        .. versionadded:: 6.2
            You can use :meth:`str` in older versions.
        """
        return "\n".join(self.lines)

    def _getlines(self, lines2: str | Sequence[str] | Source) -> Sequence[str]:
        """Normalize ``lines2`` to a sequence of lines.

        A string is first wrapped in ``Source``; a ``Source`` is stripped and
        reduced to its lines; anything else is returned unchanged.
        """
        if isinstance(lines2, str):
            lines2 = Source(lines2)
        if isinstance(lines2, Source):
            lines2 = lines2.strip().lines
        return lines2

    def fnmatch_lines_random(self, lines2: Sequence[str]) -> None:
        """Check lines exist in the output in any order (using :func:`python:fnmatch.fnmatch`)."""
        __tracebackhide__ = True
        self._match_lines_random(lines2, fnmatch)

    def re_match_lines_random(self, lines2: Sequence[str]) -> None:
        """Check lines exist in the output in any order (using :func:`python:re.match`)."""
        __tracebackhide__ = True
        self._match_lines_random(lines2, lambda name, pat: bool(re.match(pat, name)))

    def _match_lines_random(
        self, lines2: Sequence[str], match_func: Callable[[str, str], bool]
    ) -> None:
        """Underlying implementation of the ``*_lines_random`` methods.

        Every pattern is searched for independently over all lines; the
        first pattern without an equal or matching line fails the check.
        """
        __tracebackhide__ = True
        lines2 = self._getlines(lines2)
        for line in lines2:
            for x in self.lines:
                if line == x or match_func(x, line):
                    self._log("matched: ", repr(line))
                    break
            else:
                msg = f"line {line!r} not found in output"
                self._log(msg)
                self._fail(msg)

    def get_lines_after(self, fnline: str) -> Sequence[str]:
        """Return all lines following the given line in the text.

        The given line can contain glob wildcards.

        :raises ValueError: If no line equals or matches ``fnline``.
        """
        for i, line in enumerate(self.lines):
            if fnline == line or fnmatch(line, fnline):
                return self.lines[i + 1 :]
        raise ValueError(f"line {fnline!r} not found in output")

    def _log(self, *args) -> None:
        """Append one log entry made of ``args`` joined by single spaces."""
        self._log_output.append(" ".join(str(x) for x in args))

    @property
    def _log_text(self) -> str:
        """The accumulated log entries, one per line."""
        return "\n".join(self._log_output)

    def fnmatch_lines(
        self, lines2: Sequence[str], *, consecutive: bool = False
    ) -> None:
        """Check lines exist in the output (using :func:`python:fnmatch.fnmatch`).

        The argument is a list of lines which have to match and can use glob
        wildcards.  If they do not match a pytest.fail() is called.  The
        matches and non-matches are also shown as part of the error message.

        :param lines2: String patterns to match.
        :param consecutive: Match lines consecutively?
        """
        __tracebackhide__ = True
        self._match_lines(lines2, fnmatch, "fnmatch", consecutive=consecutive)

    def re_match_lines(
        self, lines2: Sequence[str], *, consecutive: bool = False
    ) -> None:
        """Check lines exist in the output (using :func:`python:re.match`).

        The argument is a list of lines which have to match using ``re.match``.
        If they do not match a pytest.fail() is called.

        The matches and non-matches are also shown as part of the error message.

        :param lines2: string patterns to match.
        :param consecutive: match lines consecutively?
        """
        __tracebackhide__ = True
        self._match_lines(
            lines2,
            lambda name, pat: bool(re.match(pat, name)),
            "re.match",
            consecutive=consecutive,
        )

    def _match_lines(
        self,
        lines2: Sequence[str],
        match_func: Callable[[str, str], bool],
        match_nickname: str,
        *,
        consecutive: bool = False,
    ) -> None:
        """Underlying implementation of ``fnmatch_lines`` and ``re_match_lines``.

        :param Sequence[str] lines2:
            List of string patterns to match. The actual format depends on
            ``match_func``.
        :param match_func:
            A callable ``match_func(line, pattern)`` where line is the
            captured line from stdout/stderr and pattern is the matching
            pattern.
        :param str match_nickname:
            The nickname for the match function that will be logged to stdout
            when a match occurs.
        :param consecutive:
            Match lines consecutively?
        :raises TypeError:
            If ``lines2`` is not a sequence.
        """
        if not isinstance(lines2, collections.abc.Sequence):
            raise TypeError(f"invalid type for lines2: {type(lines2).__name__}")
        lines2 = self._getlines(lines2)
        __tracebackhide__ = True
        wnick = len(match_nickname) + 1
        # One forward-only iterator shared by all patterns: each pattern
        # resumes where the previous one stopped, and every output line is
        # consumed exactly once (no quadratic ``list.pop(0)``).
        remaining = iter(self.lines)
        started = False
        for line in lines2:
            nomatchprinted = False
            for nextline in remaining:
                if line == nextline:
                    self._log("exact match:", repr(line))
                    started = True
                    break
                if match_func(nextline, line):
                    self._log(f"{match_nickname}:", repr(line))
                    self._log("with:".rjust(wnick), repr(nextline))
                    started = True
                    break
                if consecutive and started:
                    # Once matching has started, a consecutive match may not
                    # skip over any line.
                    msg = f"no consecutive match: {line!r}"
                    self._log(msg)
                    self._log("with:".rjust(wnick), repr(nextline))
                    self._fail(msg)
                if not nomatchprinted:
                    self._log("nomatch:".rjust(wnick), repr(line))
                    nomatchprinted = True
                self._log("and:".rjust(wnick), repr(nextline))
            else:
                # The output ran out before this pattern was matched.
                msg = f"remains unmatched: {line!r}"
                self._log(msg)
                self._fail(msg)
        self._log_output = []

    def no_fnmatch_line(self, pat: str) -> None:
        """Ensure captured lines do not match the given pattern, using ``fnmatch.fnmatch``.

        :param str pat: The pattern to match lines.
        """
        __tracebackhide__ = True
        self._no_match_line(pat, fnmatch, "fnmatch")

    def no_re_match_line(self, pat: str) -> None:
        """Ensure captured lines do not match the given pattern, using ``re.match``.

        :param str pat: The regular expression to match lines.
        """
        __tracebackhide__ = True
        self._no_match_line(
            pat, lambda name, pat: bool(re.match(pat, name)), "re.match"
        )

    def _no_match_line(
        self, pat: str, match_func: Callable[[str, str], bool], match_nickname: str
    ) -> None:
        """Ensure no captured line matches the given pattern, using ``match_func``.

        :param str pat: The pattern to match lines.
        :param match_func:
            A callable ``match_func(line, pattern)`` returning whether the
            line matches.
        :param str match_nickname:
            The nickname for the match function, used in the log/failure text.
        """
        __tracebackhide__ = True
        nomatch_printed = False
        wnick = len(match_nickname) + 1
        for line in self.lines:
            if match_func(line, pat):
                msg = f"{match_nickname}: {pat!r}"
                self._log(msg)
                self._log("with:".rjust(wnick), repr(line))
                self._fail(msg)
            else:
                if not nomatch_printed:
                    self._log("nomatch:".rjust(wnick), repr(pat))
                    nomatch_printed = True
                self._log("and:".rjust(wnick), repr(line))
        self._log_output = []

    def _fail(self, msg: str) -> None:
        """Fail with the accumulated log text, clearing the log first.

        ``msg`` is not used directly: callers log it before calling, so it
        is already part of the log text.
        """
        __tracebackhide__ = True
        log_text = self._log_text
        self._log_output = []
        fail(log_text)

    # Defined last on purpose: inside the class body this name shadows the
    # ``str`` builtin for anything evaluated after it.
    def str(self) -> str:
        """Return the entire original text."""
        return str(self)
| |
|