|
|
"""Core implementation of the testing process: init, session, runtest loop.""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import argparse |
|
|
import dataclasses |
|
|
import fnmatch |
|
|
import functools |
|
|
import importlib |
|
|
import importlib.util |
|
|
import os |
|
|
from pathlib import Path |
|
|
import sys |
|
|
from typing import AbstractSet |
|
|
from typing import Callable |
|
|
from typing import Dict |
|
|
from typing import final |
|
|
from typing import Iterable |
|
|
from typing import Iterator |
|
|
from typing import Literal |
|
|
from typing import overload |
|
|
from typing import Sequence |
|
|
from typing import TYPE_CHECKING |
|
|
import warnings |
|
|
|
|
|
import pluggy |
|
|
|
|
|
from _pytest import nodes |
|
|
import _pytest._code |
|
|
from _pytest.config import Config |
|
|
from _pytest.config import directory_arg |
|
|
from _pytest.config import ExitCode |
|
|
from _pytest.config import hookimpl |
|
|
from _pytest.config import PytestPluginManager |
|
|
from _pytest.config import UsageError |
|
|
from _pytest.config.argparsing import Parser |
|
|
from _pytest.config.compat import PathAwareHookProxy |
|
|
from _pytest.outcomes import exit |
|
|
from _pytest.pathlib import absolutepath |
|
|
from _pytest.pathlib import bestrelpath |
|
|
from _pytest.pathlib import fnmatch_ex |
|
|
from _pytest.pathlib import safe_exists |
|
|
from _pytest.pathlib import scandir |
|
|
from _pytest.reports import CollectReport |
|
|
from _pytest.reports import TestReport |
|
|
from _pytest.runner import collect_one_node |
|
|
from _pytest.runner import SetupState |
|
|
from _pytest.warning_types import PytestWarning |
|
|
|
|
|
|
|
|
if TYPE_CHECKING: |
|
|
from typing_extensions import Self |
|
|
|
|
|
from _pytest.fixtures import FixtureManager |
|
|
|
|
|
|
|
|
def pytest_addoption(parser: Parser) -> None:
    """Register the ini values and command-line options defined by this module.

    Registration order is kept exactly as-is, including which option group
    each option is attached to.
    """
    parser.addini(
        "norecursedirs",
        "Directory patterns to avoid for recursion",
        type="args",
        default=[
            "*.egg",
            ".*",
            "_darcs",
            "build",
            "CVS",
            "dist",
            "node_modules",
            "venv",
            "{arch}",
        ],
    )
    parser.addini(
        "testpaths",
        "Directories to search for tests when no files or directories are given on the "
        "command line",
        type="args",
        default=[],
    )

    general_group = parser.getgroup("general", "Running and selection options")
    general_group._addoption(
        "-x",
        "--exitfirst",
        action="store_const",
        dest="maxfail",
        const=1,
        help="Exit instantly on first error or failed test",
    )

    warnings_group = parser.getgroup("pytest-warnings")
    warnings_group.addoption(
        "-W",
        "--pythonwarnings",
        action="append",
        help="Set which warnings to report, see -W option of Python itself",
    )
    parser.addini(
        "filterwarnings",
        type="linelist",
        help="Each line specifies a pattern for "
        "warnings.filterwarnings. "
        "Processed after -W/--pythonwarnings.",
    )

    # NOTE: the options below are attached to the "pytest-warnings" group,
    # i.e. the same group object that -W was just added to.
    warnings_group._addoption(
        "--maxfail",
        metavar="num",
        action="store",
        type=int,
        dest="maxfail",
        default=0,
        help="Exit after first num failures or errors",
    )
    warnings_group._addoption(
        "--strict-config",
        action="store_true",
        help="Any warnings encountered while parsing the `pytest` section of the "
        "configuration file raise errors",
    )
    warnings_group._addoption(
        "--strict-markers",
        action="store_true",
        help="Markers not registered in the `markers` section of the configuration "
        "file raise errors",
    )
    warnings_group._addoption(
        "--strict",
        action="store_true",
        help="(Deprecated) alias to --strict-markers",
    )
    warnings_group._addoption(
        "-c",
        "--config-file",
        metavar="FILE",
        type=str,
        dest="inifilename",
        help="Load configuration from `FILE` instead of trying to locate one of the "
        "implicit configuration files.",
    )
    warnings_group._addoption(
        "--continue-on-collection-errors",
        action="store_true",
        default=False,
        dest="continue_on_collection_errors",
        help="Force test execution even if collection errors occur",
    )
    warnings_group._addoption(
        "--rootdir",
        action="store",
        dest="rootdir",
        help="Define root directory for tests. Can be relative path: 'root_dir', './root_dir', "
        "'root_dir/another_dir/'; absolute path: '/home/user/root_dir'; path with variables: "
        "'$HOME/root_dir'.",
    )

    collect_group = parser.getgroup("collect", "collection")
    collect_group.addoption(
        "--collectonly",
        "--collect-only",
        "--co",
        action="store_true",
        help="Only collect tests, don't execute them",
    )
    collect_group.addoption(
        "--pyargs",
        action="store_true",
        help="Try to interpret all arguments as Python packages",
    )
    collect_group.addoption(
        "--ignore",
        action="append",
        metavar="path",
        help="Ignore path during collection (multi-allowed)",
    )
    collect_group.addoption(
        "--ignore-glob",
        action="append",
        metavar="path",
        help="Ignore path pattern during collection (multi-allowed)",
    )
    collect_group.addoption(
        "--deselect",
        action="append",
        metavar="nodeid_prefix",
        help="Deselect item (via node id prefix) during collection (multi-allowed)",
    )
    collect_group.addoption(
        "--confcutdir",
        dest="confcutdir",
        default=None,
        metavar="dir",
        type=functools.partial(directory_arg, optname="--confcutdir"),
        help="Only load conftest.py's relative to specified dir",
    )
    collect_group.addoption(
        "--noconftest",
        action="store_true",
        dest="noconftest",
        default=False,
        help="Don't load any conftest.py files",
    )
    collect_group.addoption(
        "--keepduplicates",
        "--keep-duplicates",
        action="store_true",
        dest="keepduplicates",
        default=False,
        help="Keep duplicate tests",
    )
    collect_group.addoption(
        "--collect-in-virtualenv",
        action="store_true",
        dest="collect_in_virtualenv",
        default=False,
        help="Don't ignore tests in a local virtualenv directory",
    )
    collect_group.addoption(
        "--import-mode",
        default="prepend",
        choices=["prepend", "append", "importlib"],
        dest="importmode",
        help="Prepend/append to sys.path when importing test modules and conftest "
        "files. Default: prepend.",
    )
    parser.addini(
        "consider_namespace_packages",
        type="bool",
        default=False,
        help="Consider namespace packages when resolving module names during import",
    )

    debug_group = parser.getgroup(
        "debugconfig", "test session debugging and configuration"
    )
    debug_group.addoption(
        "--basetemp",
        dest="basetemp",
        default=None,
        type=validate_basetemp,
        metavar="dir",
        help=(
            "Base temporary directory for this test run. "
            "(Warning: this directory is removed if it exists.)"
        ),
    )
|
|
|
|
|
|
|
|
def validate_basetemp(path: str) -> str:
    """Argparse ``type`` callback for ``--basetemp``.

    :param path: The value given on the command line.
    :returns: ``path`` unchanged when it is acceptable.
    :raises argparse.ArgumentTypeError:
        If ``path`` is empty, or names the current working directory or one
        of its parent directories (checked both literally and after
        resolving symlinks).
    """
    msg = "basetemp must not be empty, the current working directory or any parent directory of it"

    def covers(candidate: Path, directory: Path) -> bool:
        """Return whether candidate is directory itself or one of its ancestors."""
        return candidate == directory or candidate in directory.parents

    if not path:
        raise argparse.ArgumentTypeError(msg)

    cwd = Path.cwd()
    requested = Path(path)
    # The second (symlink-resolving) comparison only runs when the literal
    # comparison did not already reject the path.
    if covers(requested.absolute(), cwd) or covers(requested.resolve(), cwd.resolve()):
        raise argparse.ArgumentTypeError(msg)

    return path
|
|
|
|
|
|
|
|
def _adopt_exit_exception(session: Session, exc: exit.Exception) -> None:
    """Take over the return code of an ``exit.Exception`` (when it has one)
    as the session exit status and echo the exception to stderr."""
    if exc.returncode is not None:
        session.exitstatus = exc.returncode
    sys.stderr.write(f"{type(exc).__name__}: {exc}\n")


def wrap_session(
    config: Config, doit: Callable[[Config, Session], int | ExitCode | None]
) -> int | ExitCode:
    """Skeleton command line program.

    Creates a session for ``config``, configures, fires the session-start
    hook and runs ``doit``, translating the exceptions raised along the way
    into ``session.exitstatus``.

    :param config: The configuration to run with.
    :param doit: The callable doing the actual work; a falsy return value is
        stored as exit status ``0``.
    :returns: The final ``session.exitstatus``.
    :raises UsageError: Re-raised after the exit status has been recorded.
    """
    session = Session.from_config(config)
    session.exitstatus = ExitCode.OK
    # 0: nothing done yet, 1: configured, 2: pytest_sessionstart has run.
    progress = 0
    try:
        try:
            config._do_configure()
            progress = 1
            config.hook.pytest_sessionstart(session=session)
            progress = 2
            session.exitstatus = doit(config, session) or 0
        except UsageError:
            session.exitstatus = ExitCode.USAGE_ERROR
            raise
        except Failed:
            session.exitstatus = ExitCode.TESTS_FAILED
        except (KeyboardInterrupt, exit.Exception):
            excinfo = _pytest._code.ExceptionInfo.from_current()
            interrupted_status: int | ExitCode = ExitCode.INTERRUPTED
            raised = excinfo.value
            if isinstance(raised, exit.Exception):
                if raised.returncode is not None:
                    interrupted_status = raised.returncode
                # Only written to stderr here when the session-start hook
                # has not completed.
                if progress < 2:
                    sys.stderr.write(f"{excinfo.typename}: {raised.msg}\n")
            config.hook.pytest_keyboard_interrupt(excinfo=excinfo)
            session.exitstatus = interrupted_status
        except BaseException:
            session.exitstatus = ExitCode.INTERNAL_ERROR
            excinfo = _pytest._code.ExceptionInfo.from_current()
            try:
                config.notify_exception(excinfo, config.option)
            except exit.Exception as exc:
                _adopt_exit_exception(session, exc)
            else:
                if isinstance(excinfo.value, SystemExit):
                    sys.stderr.write("mainloop: caught unexpected SystemExit!\n")

    finally:
        # Drop the local reference to the exception info (presumably so the
        # traceback is not kept alive through this frame).
        excinfo = None
        os.chdir(session.startpath)
        if progress >= 2:
            # The finish hook only runs when the start hook completed.
            try:
                config.hook.pytest_sessionfinish(
                    session=session, exitstatus=session.exitstatus
                )
            except exit.Exception as exc:
                _adopt_exit_exception(session, exc)
        config._ensure_unconfigure()
    return session.exitstatus
|
|
|
|
|
|
|
|
def pytest_cmdline_main(config: Config) -> int | ExitCode:
    """Run the default main protocol (``_main``) inside ``wrap_session`` and
    return the resulting exit status."""
    exit_status = wrap_session(config, _main)
    return exit_status
|
|
|
|
|
|
|
|
def _main(config: Config, session: Session) -> int | ExitCode | None:
    """Default command line protocol for initialization, session,
    running tests and reporting.

    :returns:
        ``ExitCode.TESTS_FAILED`` if the session recorded failures,
        ``ExitCode.NO_TESTS_COLLECTED`` if nothing was collected, and
        ``None`` otherwise.
    """
    config.hook.pytest_collection(session=session)
    config.hook.pytest_runtestloop(session=session)

    if session.testsfailed:
        return ExitCode.TESTS_FAILED
    if session.testscollected == 0:
        return ExitCode.NO_TESTS_COLLECTED
    return None
|
|
|
|
|
|
|
|
def pytest_collection(session: Session) -> None:
    """Hook implementation: run the collection phase of ``session``."""
    session.perform_collect()
    return None
|
|
|
|
|
|
|
|
def pytest_runtestloop(session: Session) -> bool:
    """Run the runtest protocol for every item of the session, in order.

    :returns: ``True`` (also when only collecting).
    :raises session.Interrupted:
        If failures were already recorded before running and
        ``continue_on_collection_errors`` is off, or when
        ``session.shouldstop`` is set after an item ran.
    :raises session.Failed:
        When ``session.shouldfail`` is set after an item ran.
    """
    options = session.config.option

    if session.testsfailed and not options.continue_on_collection_errors:
        plural = "s" if session.testsfailed != 1 else ""
        raise session.Interrupted(
            "%d error%s during collection" % (session.testsfailed, plural)
        )

    if options.collectonly:
        return True

    for index, item in enumerate(session.items):
        # ``session.items`` is looked up again on purpose: the next item is
        # whatever follows in the list as it is at this point.
        following = index + 1
        nextitem = session.items[following] if following < len(session.items) else None
        item.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem)
        # shouldfail takes precedence over shouldstop.
        if session.shouldfail:
            raise session.Failed(session.shouldfail)
        if session.shouldstop:
            raise session.Interrupted(session.shouldstop)
    return True
|
|
|
|
|
|
|
|
def _in_venv(path: Path) -> bool:
    """Attempt to detect if ``path`` is the root of a Virtual Environment by
    checking for the existence of the pyvenv.cfg file.

    [https://peps.python.org/pep-0405/]

    For regression protection we also check for conda environments that do not
    include pyvenv.cfg yet --
    https://github.com/conda/conda/issues/13337 is the conda issue tracking
    adding pyvenv.cfg.

    Checking for the `conda-meta/history` file per
    https://github.com/pytest-dev/pytest/issues/12652#issuecomment-2246336902.

    An ``OSError`` raised while probing the files is treated as "not a
    virtual environment".
    """
    marker_files = (("pyvenv.cfg",), ("conda-meta", "history"))
    try:
        return any(path.joinpath(*marker).is_file() for marker in marker_files)
    except OSError:
        return False
|
|
|
|
|
|
|
|
def pytest_ignore_collect(collection_path: Path, config: Config) -> bool | None:
    """Decide whether ``collection_path`` should be skipped by collection.

    Checked in order: ``__pycache__`` directories, explicitly ignored paths
    (conftest ``collect_ignore`` plus ``--ignore``), ignored glob patterns
    (conftest ``collect_ignore_glob`` plus ``--ignore-glob``), virtual
    environment roots (unless ``--collect-in-virtualenv``), and, for
    directories, the ``norecursedirs`` patterns.

    :returns: ``True`` to ignore the path, ``None`` to express no opinion.
    """
    if collection_path.name == "__pycache__":
        return True

    parent_dir = collection_path.parent

    ignored_paths = config._getconftest_pathlist("collect_ignore", path=parent_dir) or []
    cli_ignored = config.getoption("ignore")
    if cli_ignored:
        ignored_paths.extend(absolutepath(entry) for entry in cli_ignored)
    if collection_path in ignored_paths:
        return True

    ignored_globs = (
        config._getconftest_pathlist("collect_ignore_glob", path=parent_dir) or []
    )
    cli_ignored_globs = config.getoption("ignore_glob")
    if cli_ignored_globs:
        ignored_globs.extend(absolutepath(entry) for entry in cli_ignored_globs)
    path_text = str(collection_path)
    if any(fnmatch.fnmatch(path_text, str(pattern)) for pattern in ignored_globs):
        return True

    if not config.getoption("collect_in_virtualenv") and _in_venv(collection_path):
        return True

    # The norecursedirs ini value is only consulted for directories.
    if collection_path.is_dir() and any(
        fnmatch_ex(pattern, collection_path)
        for pattern in config.getini("norecursedirs")
    ):
        return True

    return None
|
|
|
|
|
|
|
|
def pytest_collect_directory(
    path: Path, parent: nodes.Collector
) -> nodes.Collector | None:
    """Hook implementation: create a plain ``Dir`` collector for ``path``
    underneath ``parent``."""
    directory = Dir.from_parent(parent, path=path)
    return directory
|
|
|
|
|
|
|
|
def pytest_collection_modifyitems(items: list[nodes.Item], config: Config) -> None:
    """Apply ``--deselect``: remove items whose node id starts with one of
    the given prefixes.

    ``items`` is modified in place, and ``pytest_deselected`` is called with
    the removed items; neither happens when nothing matches.
    """
    prefixes = tuple(config.getoption("deselect") or [])
    if not prefixes:
        return

    kept = []
    dropped = []
    for item in items:
        bucket = dropped if item.nodeid.startswith(prefixes) else kept
        bucket.append(item)

    if not dropped:
        return

    config.hook.pytest_deselected(items=dropped)
    items[:] = kept
|
|
|
|
|
|
|
|
class FSHookProxy:
    """Hook relay stand-in that calls hooks with some plugins left out.

    Hook callers are created lazily on first attribute access and then stored
    on the instance, so ``__getattr__`` runs once per hook name.
    """

    def __init__(
        self,
        pm: PytestPluginManager,
        remove_mods: AbstractSet[object],
    ) -> None:
        """
        :param pm: The plugin manager used to build the hook callers.
        :param remove_mods: The plugins to leave out of every hook call.
        """
        self.pm = pm
        self.remove_mods = remove_mods

    def __getattr__(self, name: str) -> pluggy.HookCaller:
        caller = self.pm.subset_hook_caller(name, remove_plugins=self.remove_mods)
        # Storing the caller in the instance dict means later lookups of
        # ``name`` find it directly and never reach __getattr__ again.
        vars(self)[name] = caller
        return caller
|
|
|
|
|
|
|
|
class Interrupted(KeyboardInterrupt):
    """Raised to signal that the test run was interrupted."""

    # Report the class as living in "builtins" (presumably so that it is
    # displayed without this module's name -- TODO confirm).
    __module__ = "builtins"
|
|
|
|
|
|
|
|
class Failed(Exception):
    """Raised to stop the test run and have it count as failed."""
|
|
|
|
|
|
|
|
@dataclasses.dataclass
class _bestrelpath_cache(Dict[Path, str]):
    """Dict that fills itself in: a missing key is computed with
    ``bestrelpath`` relative to ``path`` and remembered."""

    __slots__ = ("path",)

    # The directory that looked-up paths are expressed relative to.
    path: Path

    def __missing__(self, path: Path) -> str:
        # Called by dict lookup for absent keys; store the result so the
        # computation happens once per key.
        relative = self[path] = bestrelpath(self.path, path)
        return relative
|
|
|
|
|
|
|
|
@final
class Dir(nodes.Directory):
    """Collector of files in a file system directory.

    .. versionadded:: 8.0

    .. note::

        Python directories with an `__init__.py` file are instead collected by
        :class:`~pytest.Package` by default. Both are :class:`~pytest.Directory`
        collectors.
    """

    @classmethod
    def from_parent(
        cls,
        parent: nodes.Collector,
        *,
        path: Path,
    ) -> Self:
        """The public constructor.

        :param parent: The parent collector of this Dir.
        :param path: The directory's path.
        :type path: pathlib.Path
        """
        return super().from_parent(parent=parent, path=path)

    def collect(self) -> Iterable[nodes.Item | nodes.Collector]:
        """Yield a collector for every sub-directory and file of this
        directory that is not ignored.

        Entries that are initial paths (for directories: also parents of
        initial paths) are never passed to ``pytest_ignore_collect``.
        """
        config = self.config
        ihook = self.ihook
        session = self.session
        for entry in scandir(self.path):
            if entry.is_dir():
                entry_path = Path(entry.path)
                requested = session.isinitpath(entry_path, with_parents=True)
                if not requested and ihook.pytest_ignore_collect(
                    collection_path=entry_path, config=config
                ):
                    continue
                collector: nodes.Collector | None = ihook.pytest_collect_directory(
                    path=entry_path, parent=self
                )
                if collector is not None:
                    yield collector

            elif entry.is_file():
                entry_path = Path(entry.path)
                requested = session.isinitpath(entry_path)
                if not requested and ihook.pytest_ignore_collect(
                    collection_path=entry_path, config=config
                ):
                    continue
                collectors: Sequence[nodes.Collector] = ihook.pytest_collect_file(
                    file_path=entry_path, parent=self
                )
                yield from collectors
|
|
|
|
|
|
|
|
@final
class Session(nodes.Collector):
    """The root of the collection tree.

    ``Session`` collects the initial paths given as arguments to pytest.
    """

    # Made available on the class so they can be raised via the session.
    Interrupted = Interrupted
    Failed = Failed

    # Annotation-only attributes: nothing in this class assigns them.
    _setupstate: SetupState
    _fixturemanager: FixtureManager
    exitstatus: int | ExitCode

    def __init__(self, config: Config) -> None:
        super().__init__(
            name="",
            path=config.rootpath,
            fspath=None,
            parent=None,
            config=config,
            session=self,
            nodeid="",
        )
        # Counters.
        self.testsfailed = 0
        self.testscollected = 0
        # Backing fields of the ``shouldstop`` / ``shouldfail`` properties.
        self._shouldstop: bool | str = False
        self._shouldfail: bool | str = False
        self.trace = config.trace.root.get("collection")
        # Collection state; (re)populated by perform_collect().
        self._initialpaths: frozenset[Path] = frozenset()
        self._initialpaths_with_parents: frozenset[Path] = frozenset()
        self._notfound: list[tuple[str, Sequence[nodes.Collector]]] = []
        self._initial_parts: list[CollectionArgument] = []
        self._collection_cache: dict[nodes.Collector, CollectReport] = {}
        self.items: list[nodes.Item] = []

        self._bestrelpathcache: dict[Path, str] = _bestrelpath_cache(config.rootpath)

        self.config.pluginmanager.register(self, name="session")

    @classmethod
    def from_config(cls, config: Config) -> Session:
        """Create the session for ``config``."""
        created: Session = cls._create(config=config)
        return created

    def __repr__(self) -> str:
        details = (
            self.__class__.__name__,
            self.name,
            getattr(self, "exitstatus", "<UNSET>"),
            self.testsfailed,
            self.testscollected,
        )
        return "<%s %s exitstatus=%r testsfailed=%d testscollected=%d>" % details

    @property
    def shouldstop(self) -> bool | str:
        return self._shouldstop

    @shouldstop.setter
    def shouldstop(self, value: bool | str) -> None:
        # Once set, the flag cannot be reset to False: warn and keep it.
        if self._shouldstop and value is False:
            warnings.warn(
                PytestWarning(
                    "session.shouldstop cannot be unset after it has been set; ignoring."
                ),
                stacklevel=2,
            )
            return
        self._shouldstop = value

    @property
    def shouldfail(self) -> bool | str:
        return self._shouldfail

    @shouldfail.setter
    def shouldfail(self, value: bool | str) -> None:
        # Once set, the flag cannot be reset to False: warn and keep it.
        if self._shouldfail and value is False:
            warnings.warn(
                PytestWarning(
                    "session.shouldfail cannot be unset after it has been set; ignoring."
                ),
                stacklevel=2,
            )
            return
        self._shouldfail = value

    @property
    def startpath(self) -> Path:
        """The path from which pytest was invoked.

        .. versionadded:: 7.0.0
        """
        return self.config.invocation_params.dir

    def _node_location_to_relpath(self, node_path: Path) -> str:
        """Return ``node_path`` as looked up in the best-relative-path cache
        created for the rootpath."""
        return self._bestrelpathcache[node_path]

    @hookimpl(tryfirst=True)
    def pytest_collectstart(self) -> None:
        """Abort collection if a fail/stop was requested; ``shouldfail``
        is checked first."""
        if self.shouldfail:
            raise self.Failed(self.shouldfail)
        if self.shouldstop:
            raise self.Interrupted(self.shouldstop)

    @hookimpl(tryfirst=True)
    def pytest_runtest_logreport(self, report: TestReport | CollectReport) -> None:
        """Count failed reports (those without a ``wasxfail`` attribute) and
        request a failing stop once ``maxfail`` is reached."""
        if not report.failed or hasattr(report, "wasxfail"):
            return
        self.testsfailed += 1
        maxfail = self.config.getvalue("maxfail")
        # A falsy maxfail (e.g. 0) means "no limit".
        if maxfail and self.testsfailed >= maxfail:
            self.shouldfail = "stopping after %d failures" % (self.testsfailed)

    # Collection reports are counted the same way as test reports.
    pytest_collectreport = pytest_runtest_logreport

    def isinitpath(
        self,
        path: str | os.PathLike[str],
        *,
        with_parents: bool = False,
    ) -> bool:
        """Is path an initial path?

        An initial path is a path explicitly given to pytest on the command
        line.

        :param with_parents:
            If set, also return True if the path is a parent of an initial path.

        .. versionchanged:: 8.0
            Added the ``with_parents`` parameter.
        """
        path_ = path if isinstance(path, Path) else Path(path)
        known = self._initialpaths_with_parents if with_parents else self._initialpaths
        return path_ in known

    def gethookproxy(self, fspath: os.PathLike[str]) -> pluggy.HookRelay:
        """Return the hook relay to use for ``fspath``.

        If some registered conftest plugins are not among the conftest
        modules found for the path, a proxy leaving those out is returned;
        otherwise the regular ``config.hook``.
        """
        path = fspath if isinstance(fspath, Path) else Path(fspath)
        pm = self.config.pluginmanager
        applicable = pm._getconftestmodules(path)
        remove_mods = pm._conftest_plugins.difference(applicable)
        if remove_mods:
            return PathAwareHookProxy(FSHookProxy(pm, remove_mods))
        return self.config.hook

    def _collect_path(
        self,
        path: Path,
        path_cache: dict[Path, Sequence[nodes.Collector]],
    ) -> Sequence[nodes.Collector]:
        """Create a Collector for the given path.

        `path_cache` makes it so the same Collectors are returned for the same
        path.
        """
        try:
            return path_cache[path]
        except KeyError:
            pass

        collectors: Sequence[nodes.Collector]
        if path.is_dir():
            # Directories use the hook proxy of their parent directory.
            ihook = self.gethookproxy(path.parent)
            directory: nodes.Collector | None = ihook.pytest_collect_directory(
                path=path, parent=self
            )
            collectors = (directory,) if directory is not None else ()
        elif path.is_file():
            ihook = self.gethookproxy(path)
            collectors = ihook.pytest_collect_file(file_path=path, parent=self)
        else:
            # Neither a directory nor a regular file: nothing to collect.
            collectors = ()

        path_cache[path] = collectors
        return collectors

    @overload
    def perform_collect(
        self, args: Sequence[str] | None = ..., genitems: Literal[True] = ...
    ) -> Sequence[nodes.Item]: ...

    @overload
    def perform_collect(
        self, args: Sequence[str] | None = ..., genitems: bool = ...
    ) -> Sequence[nodes.Item | nodes.Collector]: ...

    def perform_collect(
        self, args: Sequence[str] | None = None, genitems: bool = True
    ) -> Sequence[nodes.Item | nodes.Collector]:
        """Perform the collection phase for this session.

        This is called by the default :hook:`pytest_collection` hook
        implementation; see the documentation of this hook for more details.
        For testing purposes, it may also be called directly on a fresh
        ``Session``.

        This function normally recursively expands any collectors collected
        from the session to their items, and only items are returned. For
        testing purposes, this may be suppressed by passing ``genitems=False``,
        in which case the return value contains these collectors unexpanded,
        and ``session.items`` is empty.

        :raises UsageError: If any argument matched nothing.
        """
        if args is None:
            args = self.config.args

        self.trace("perform_collect", self, args)
        self.trace.root.indent += 1

        hook = self.config.hook

        self._notfound = []
        self._initial_parts = []
        self._collection_cache = {}
        self.items = []
        items: Sequence[nodes.Item | nodes.Collector] = self.items
        try:
            invocation_dir = self.config.invocation_params.dir
            as_pypath = self.config.option.pyargs
            initial: set[Path] = set()
            initial_and_parents: set[Path] = set()
            for arg in args:
                resolved = resolve_collection_argument(
                    invocation_dir,
                    arg,
                    as_pypath=as_pypath,
                )
                self._initial_parts.append(resolved)
                initial.add(resolved.path)
                initial_and_parents.add(resolved.path)
                initial_and_parents.update(resolved.path.parents)
            # Published only after every argument resolved successfully.
            self._initialpaths = frozenset(initial)
            self._initialpaths_with_parents = frozenset(initial_and_parents)

            rep = collect_one_node(self)
            self.ihook.pytest_collectreport(report=rep)
            self.trace.root.indent -= 1
            if self._notfound:
                errors = [
                    f"not found: {missing}\n(no match in any of {collectors!r})"
                    if collectors
                    else f"found no collectors for {missing}"
                    for missing, collectors in self._notfound
                ]
                raise UsageError(*errors)

            if not genitems:
                items = rep.result
            elif rep.passed:
                for node in rep.result:
                    self.items.extend(self.genitems(node))

            self.config.pluginmanager.check_pending()
            hook.pytest_collection_modifyitems(
                session=self, config=self.config, items=items
            )
        finally:
            # Reset the per-collection state and always notify that
            # collection finished, whether or not it succeeded.
            self._notfound = []
            self._initial_parts = []
            self._collection_cache = {}
            hook.pytest_collection_finish(session=self)

        if genitems:
            self.testscollected = len(items)

        return items

    def _collect_one_node(
        self,
        node: nodes.Collector,
        handle_dupes: bool = True,
    ) -> tuple[CollectReport, bool]:
        """Collect ``node``, reusing a cached report when allowed.

        :returns:
            ``(report, duplicate)``; ``duplicate`` is True when the report
            came from the cache, which is only consulted if ``handle_dupes``.
        """
        if handle_dupes and node in self._collection_cache:
            return self._collection_cache[node], True
        rep = collect_one_node(node)
        self._collection_cache[node] = rep
        return rep, False

    def collect(self) -> Iterator[nodes.Item | nodes.Collector]:
        """Yield the nodes matching each resolved initial argument.

        Every argument is expanded into a list of parts (directory/file
        paths followed by names) which is matched level by level, starting
        at the session. Arguments that match nothing are recorded in
        ``self._notfound``.
        """
        # Collectors created directly by the session, keyed by path, shared
        # between all the arguments.
        path_cache: dict[Path, Sequence[nodes.Collector]] = {}

        pm = self.config.pluginmanager

        for collection_argument in self._initial_parts:
            self.trace("processing argument", collection_argument)
            self.trace.root.indent += 1

            argpath = collection_argument.path
            names = collection_argument.parts
            module_name = collection_argument.module_name

            if argpath.is_dir():
                assert not names, f"invalid arg {(argpath, names)!r}"

            # Ancestors of argpath taking part in the matching, nearest
            # first; reversed below so that matching starts from the
            # outermost one.
            ancestors: list[Path] = []
            if module_name is None:
                # Stop at the first parent outside of the confcutdir.
                for parent in argpath.parents:
                    if not pm._is_in_confcutdir(parent):
                        break
                    ancestors.append(parent)
            else:
                # Only parents whose stem lines up with the corresponding
                # component of the dotted module name are included.
                module_name_parts = module_name.split(".")
                for depth, parent in enumerate(argpath.parents, 2):
                    if (
                        depth > len(module_name_parts)
                        or parent.stem != module_name_parts[-depth]
                    ):
                        break
                    ancestors.append(parent)
            paths = [*reversed(ancestors), argpath]

            any_matched_in_initial_part = False
            notfound_collectors = []
            # Stack of (node, parts still to be matched below that node).
            work: list[tuple[nodes.Collector | nodes.Item, list[Path | str]]] = [
                (self, [*paths, *names])
            ]
            while work:
                matchnode, matchparts = work.pop()

                # No parts left: this node is a match.
                if not matchparts:
                    yield matchnode
                    any_matched_in_initial_part = True
                    continue

                # Parts remain but the node cannot be collected further.
                if not isinstance(matchnode, nodes.Collector):
                    continue

                part = matchparts[0]

                subnodes: Sequence[nodes.Collector | nodes.Item]
                if isinstance(matchnode, Session):
                    # The session's own children are created directly from
                    # the path rather than through _collect_one_node().
                    assert isinstance(part, Path)
                    subnodes = matchnode._collect_path(part, path_cache)
                else:
                    # The cached report is not reused when the only
                    # remaining part is an existing file.
                    lone_file_part = (
                        len(matchparts) == 1
                        and isinstance(part, Path)
                        and part.is_file()
                    )
                    rep, duplicate = self._collect_one_node(
                        matchnode, not lone_file_part
                    )
                    if not rep.passed:
                        # Failed collections are reported here, once.
                        if not duplicate:
                            matchnode.ihook.pytest_collectreport(report=rep)
                        continue
                    subnodes = rep.result

                # Keep only the children matching the current part.
                any_matched_in_collector = False
                for node in reversed(subnodes):
                    if isinstance(part, Path):
                        # Path part.
                        is_match = node.path == part
                        if sys.platform == "win32" and not is_match:
                            # Unequal paths may still name the same file.
                            same_file = os.path.samefile(node.path, part)
                            # ...but only count it when both or neither of
                            # them is a symlink.
                            is_match = same_file and (
                                os.path.islink(node.path) == os.path.islink(part)
                            )
                    else:
                        # Name part; the node's name is also compared with
                        # anything from its first "[" onwards removed.
                        is_match = (
                            node.name == part or node.name.split("[")[0] == part
                        )
                    if is_match:
                        work.append((node, matchparts[1:]))
                        any_matched_in_collector = True

                if not any_matched_in_collector:
                    notfound_collectors.append(matchnode)

            if not any_matched_in_initial_part:
                report_arg = "::".join((str(argpath), *names))
                self._notfound.append((report_arg, notfound_collectors))

            self.trace.root.indent -= 1

    def genitems(self, node: nodes.Item | nodes.Collector) -> Iterator[nodes.Item]:
        """Recursively expand ``node`` into the items below it.

        Items are announced through ``pytest_itemcollected``; collectors are
        collected (subject to duplicate handling) and reported through
        ``pytest_collectreport`` unless the report was a cached duplicate.
        """
        self.trace("genitems", node)
        if isinstance(node, nodes.Item):
            node.ihook.pytest_itemcollected(item=node)
            yield node
            return

        assert isinstance(node, nodes.Collector)
        keepduplicates = self.config.getoption("keepduplicates")
        # With --keepduplicates, File collectors bypass the report cache.
        handle_dupes = not (keepduplicates and isinstance(node, nodes.File))
        rep, duplicate = self._collect_one_node(node, handle_dupes)
        if duplicate and not keepduplicates:
            return
        if rep.passed:
            for subnode in rep.result:
                yield from self.genitems(subnode)
        if not duplicate:
            node.ihook.pytest_collectreport(report=rep)
|
|
|
|
|
|
|
|
def search_pypath(module_name: str) -> str | None:
    """Search sys.path for the given dotted module name, and return its file
    system path if found.

    :returns:
        The package directory for packages, the module's origin otherwise,
        or ``None`` if no spec with a usable origin was found (this includes
        specs whose origin is ``"namespace"``).
    """
    try:
        spec = importlib.util.find_spec(module_name)
    except (AttributeError, ImportError, ValueError):
        # Any of these from find_spec() is treated as "not found".
        return None

    origin = None if spec is None else spec.origin
    if origin is None or origin == "namespace":
        return None
    if spec.submodule_search_locations:
        # A package: the origin is a file inside the package directory.
        return os.path.dirname(origin)
    return origin
|
|
|
|
|
|
|
|
@dataclasses.dataclass(frozen=True)
class CollectionArgument:
    """A collection argument after it has been resolved."""

    # File system path the argument refers to.
    path: Path
    # The "::"-separated selection parts following the path, if any.
    parts: Sequence[str]
    # Dotted module name when the argument was resolved as a Python module
    # path, otherwise None.
    module_name: str | None
|
|
|
|
|
|
|
|
def resolve_collection_argument(
    invocation_path: Path, arg: str, *, as_pypath: bool = False
) -> CollectionArgument:
    """Parse a path argument optionally containing selection parts and return
    it as a resolved `CollectionArgument`.

    Command-line arguments can point to files and/or directories, and optionally contain
    parts for specific tests selection, for example:

        "pkg/tests/test_foo.py::TestClass::test_foo"

    This function ensures the path exists, and returns a resolved `CollectionArgument`:

        CollectionArgument(
            path=Path("/full/path/to/pkg/tests/test_foo.py"),
            parts=["TestClass", "test_foo"],
            module_name=None,
        )

    When as_pypath is True, expects that the command-line argument actually contains
    module paths instead of file-system paths:

        "pkg.tests.test_foo::TestClass::test_foo"

    In which case we search sys.path for a matching module, and then return the *path* to the
    found module, which may look like this:

        CollectionArgument(
            path=Path("/home/u/myvenv/lib/site-packages/pkg/tests/test_foo.py"),
            parts=["TestClass", "test_foo"],
            module_name="pkg.tests.test_foo",
        )

    :param invocation_path: Directory that relative paths are resolved against.
    :param arg: The command-line argument to resolve.
    :param as_pypath: Try to interpret the path portion as a dotted module name first.

    If the argument contains a "[" that is not preceded by a "::" selection
    part, raise UsageError.
    If the path doesn't exist, raise UsageError.
    If the path is a directory and selection parts are present, raise UsageError.
    """
    # Everything from the first "[" onwards is set aside before splitting on
    # "::", so a "::" occurring after the "[" is not treated as a separator.
    base, squacket, rest = str(arg).partition("[")
    strpath, *parts = base.split("::")
    if squacket and not parts:
        # The "[" belongs to the path portion itself. Without this check the
        # bracket and everything after it would be silently dropped and a
        # truncated, different path would be resolved instead.
        raise UsageError(f"path cannot contain [] parametrization: {arg}")
    if parts:
        # Re-attach the bracketed remainder to the last selection part.
        parts[-1] = f"{parts[-1]}{squacket}{rest}"
    module_name = None
    if as_pypath:
        pyarg_strpath = search_pypath(strpath)
        if pyarg_strpath is not None:
            module_name = strpath
            strpath = pyarg_strpath
    fspath = invocation_path / strpath
    fspath = absolutepath(fspath)
    if not safe_exists(fspath):
        msg = (
            "module or package not found: {arg} (missing __init__.py?)"
            if as_pypath
            else "file or directory not found: {arg}"
        )
        raise UsageError(msg.format(arg=arg))
    if parts and fspath.is_dir():
        msg = (
            "package argument cannot contain :: selection parts: {arg}"
            if as_pypath
            else "directory argument cannot contain :: selection parts: {arg}"
        )
        raise UsageError(msg.format(arg=arg))
    return CollectionArgument(
        path=fspath,
        parts=parts,
        module_name=module_name,
    )
|
|
|