| | """Core implementation of the testing process: init, session, runtest loop.""" |
| |
|
| | from __future__ import annotations |
| |
|
| | import argparse |
| | import dataclasses |
| | import fnmatch |
| | import functools |
| | import importlib |
| | import importlib.util |
| | import os |
| | from pathlib import Path |
| | import sys |
| | from typing import AbstractSet |
| | from typing import Callable |
| | from typing import Dict |
| | from typing import final |
| | from typing import Iterable |
| | from typing import Iterator |
| | from typing import Literal |
| | from typing import overload |
| | from typing import Sequence |
| | from typing import TYPE_CHECKING |
| | import warnings |
| |
|
| | import pluggy |
| |
|
| | from _pytest import nodes |
| | import _pytest._code |
| | from _pytest.config import Config |
| | from _pytest.config import directory_arg |
| | from _pytest.config import ExitCode |
| | from _pytest.config import hookimpl |
| | from _pytest.config import PytestPluginManager |
| | from _pytest.config import UsageError |
| | from _pytest.config.argparsing import Parser |
| | from _pytest.config.compat import PathAwareHookProxy |
| | from _pytest.outcomes import exit |
| | from _pytest.pathlib import absolutepath |
| | from _pytest.pathlib import bestrelpath |
| | from _pytest.pathlib import fnmatch_ex |
| | from _pytest.pathlib import safe_exists |
| | from _pytest.pathlib import scandir |
| | from _pytest.reports import CollectReport |
| | from _pytest.reports import TestReport |
| | from _pytest.runner import collect_one_node |
| | from _pytest.runner import SetupState |
| | from _pytest.warning_types import PytestWarning |
| |
|
| |
|
| | if TYPE_CHECKING: |
| | from typing_extensions import Self |
| |
|
| | from _pytest.fixtures import FixtureManager |
| |
|
| |
|
| | def pytest_addoption(parser: Parser) -> None: |
| | parser.addini( |
| | "norecursedirs", |
| | "Directory patterns to avoid for recursion", |
| | type="args", |
| | default=[ |
| | "*.egg", |
| | ".*", |
| | "_darcs", |
| | "build", |
| | "CVS", |
| | "dist", |
| | "node_modules", |
| | "venv", |
| | "{arch}", |
| | ], |
| | ) |
| | parser.addini( |
| | "testpaths", |
| | "Directories to search for tests when no files or directories are given on the " |
| | "command line", |
| | type="args", |
| | default=[], |
| | ) |
| | group = parser.getgroup("general", "Running and selection options") |
| | group._addoption( |
| | "-x", |
| | "--exitfirst", |
| | action="store_const", |
| | dest="maxfail", |
| | const=1, |
| | help="Exit instantly on first error or failed test", |
| | ) |
| | group = parser.getgroup("pytest-warnings") |
| | group.addoption( |
| | "-W", |
| | "--pythonwarnings", |
| | action="append", |
| | help="Set which warnings to report, see -W option of Python itself", |
| | ) |
| | parser.addini( |
| | "filterwarnings", |
| | type="linelist", |
| | help="Each line specifies a pattern for " |
| | "warnings.filterwarnings. " |
| | "Processed after -W/--pythonwarnings.", |
| | ) |
| | group._addoption( |
| | "--maxfail", |
| | metavar="num", |
| | action="store", |
| | type=int, |
| | dest="maxfail", |
| | default=0, |
| | help="Exit after first num failures or errors", |
| | ) |
| | group._addoption( |
| | "--strict-config", |
| | action="store_true", |
| | help="Any warnings encountered while parsing the `pytest` section of the " |
| | "configuration file raise errors", |
| | ) |
| | group._addoption( |
| | "--strict-markers", |
| | action="store_true", |
| | help="Markers not registered in the `markers` section of the configuration " |
| | "file raise errors", |
| | ) |
| | group._addoption( |
| | "--strict", |
| | action="store_true", |
| | help="(Deprecated) alias to --strict-markers", |
| | ) |
| | group._addoption( |
| | "-c", |
| | "--config-file", |
| | metavar="FILE", |
| | type=str, |
| | dest="inifilename", |
| | help="Load configuration from `FILE` instead of trying to locate one of the " |
| | "implicit configuration files.", |
| | ) |
| | group._addoption( |
| | "--continue-on-collection-errors", |
| | action="store_true", |
| | default=False, |
| | dest="continue_on_collection_errors", |
| | help="Force test execution even if collection errors occur", |
| | ) |
| | group._addoption( |
| | "--rootdir", |
| | action="store", |
| | dest="rootdir", |
| | help="Define root directory for tests. Can be relative path: 'root_dir', './root_dir', " |
| | "'root_dir/another_dir/'; absolute path: '/home/user/root_dir'; path with variables: " |
| | "'$HOME/root_dir'.", |
| | ) |
| |
|
| | group = parser.getgroup("collect", "collection") |
| | group.addoption( |
| | "--collectonly", |
| | "--collect-only", |
| | "--co", |
| | action="store_true", |
| | help="Only collect tests, don't execute them", |
| | ) |
| | group.addoption( |
| | "--pyargs", |
| | action="store_true", |
| | help="Try to interpret all arguments as Python packages", |
| | ) |
| | group.addoption( |
| | "--ignore", |
| | action="append", |
| | metavar="path", |
| | help="Ignore path during collection (multi-allowed)", |
| | ) |
| | group.addoption( |
| | "--ignore-glob", |
| | action="append", |
| | metavar="path", |
| | help="Ignore path pattern during collection (multi-allowed)", |
| | ) |
| | group.addoption( |
| | "--deselect", |
| | action="append", |
| | metavar="nodeid_prefix", |
| | help="Deselect item (via node id prefix) during collection (multi-allowed)", |
| | ) |
| | group.addoption( |
| | "--confcutdir", |
| | dest="confcutdir", |
| | default=None, |
| | metavar="dir", |
| | type=functools.partial(directory_arg, optname="--confcutdir"), |
| | help="Only load conftest.py's relative to specified dir", |
| | ) |
| | group.addoption( |
| | "--noconftest", |
| | action="store_true", |
| | dest="noconftest", |
| | default=False, |
| | help="Don't load any conftest.py files", |
| | ) |
| | group.addoption( |
| | "--keepduplicates", |
| | "--keep-duplicates", |
| | action="store_true", |
| | dest="keepduplicates", |
| | default=False, |
| | help="Keep duplicate tests", |
| | ) |
| | group.addoption( |
| | "--collect-in-virtualenv", |
| | action="store_true", |
| | dest="collect_in_virtualenv", |
| | default=False, |
| | help="Don't ignore tests in a local virtualenv directory", |
| | ) |
| | group.addoption( |
| | "--import-mode", |
| | default="prepend", |
| | choices=["prepend", "append", "importlib"], |
| | dest="importmode", |
| | help="Prepend/append to sys.path when importing test modules and conftest " |
| | "files. Default: prepend.", |
| | ) |
| | parser.addini( |
| | "consider_namespace_packages", |
| | type="bool", |
| | default=False, |
| | help="Consider namespace packages when resolving module names during import", |
| | ) |
| |
|
| | group = parser.getgroup("debugconfig", "test session debugging and configuration") |
| | group.addoption( |
| | "--basetemp", |
| | dest="basetemp", |
| | default=None, |
| | type=validate_basetemp, |
| | metavar="dir", |
| | help=( |
| | "Base temporary directory for this test run. " |
| | "(Warning: this directory is removed if it exists.)" |
| | ), |
| | ) |
| |
|
| |
|
| | def validate_basetemp(path: str) -> str: |
| | |
| | msg = "basetemp must not be empty, the current working directory or any parent directory of it" |
| |
|
| | |
| | if not path: |
| | raise argparse.ArgumentTypeError(msg) |
| |
|
| | def is_ancestor(base: Path, query: Path) -> bool: |
| | """Return whether query is an ancestor of base.""" |
| | if base == query: |
| | return True |
| | return query in base.parents |
| |
|
| | |
| | if is_ancestor(Path.cwd(), Path(path).absolute()): |
| | raise argparse.ArgumentTypeError(msg) |
| |
|
| | |
| | if is_ancestor(Path.cwd().resolve(), Path(path).resolve()): |
| | raise argparse.ArgumentTypeError(msg) |
| |
|
| | return path |
| |
|
| |
|
| | def wrap_session( |
| | config: Config, doit: Callable[[Config, Session], int | ExitCode | None] |
| | ) -> int | ExitCode: |
| | """Skeleton command line program.""" |
| | session = Session.from_config(config) |
| | session.exitstatus = ExitCode.OK |
| | initstate = 0 |
| | try: |
| | try: |
| | config._do_configure() |
| | initstate = 1 |
| | config.hook.pytest_sessionstart(session=session) |
| | initstate = 2 |
| | session.exitstatus = doit(config, session) or 0 |
| | except UsageError: |
| | session.exitstatus = ExitCode.USAGE_ERROR |
| | raise |
| | except Failed: |
| | session.exitstatus = ExitCode.TESTS_FAILED |
| | except (KeyboardInterrupt, exit.Exception): |
| | excinfo = _pytest._code.ExceptionInfo.from_current() |
| | exitstatus: int | ExitCode = ExitCode.INTERRUPTED |
| | if isinstance(excinfo.value, exit.Exception): |
| | if excinfo.value.returncode is not None: |
| | exitstatus = excinfo.value.returncode |
| | if initstate < 2: |
| | sys.stderr.write(f"{excinfo.typename}: {excinfo.value.msg}\n") |
| | config.hook.pytest_keyboard_interrupt(excinfo=excinfo) |
| | session.exitstatus = exitstatus |
| | except BaseException: |
| | session.exitstatus = ExitCode.INTERNAL_ERROR |
| | excinfo = _pytest._code.ExceptionInfo.from_current() |
| | try: |
| | config.notify_exception(excinfo, config.option) |
| | except exit.Exception as exc: |
| | if exc.returncode is not None: |
| | session.exitstatus = exc.returncode |
| | sys.stderr.write(f"{type(exc).__name__}: {exc}\n") |
| | else: |
| | if isinstance(excinfo.value, SystemExit): |
| | sys.stderr.write("mainloop: caught unexpected SystemExit!\n") |
| |
|
| | finally: |
| | |
| | excinfo = None |
| | os.chdir(session.startpath) |
| | if initstate >= 2: |
| | try: |
| | config.hook.pytest_sessionfinish( |
| | session=session, exitstatus=session.exitstatus |
| | ) |
| | except exit.Exception as exc: |
| | if exc.returncode is not None: |
| | session.exitstatus = exc.returncode |
| | sys.stderr.write(f"{type(exc).__name__}: {exc}\n") |
| | config._ensure_unconfigure() |
| | return session.exitstatus |
| |
|
| |
|
| | def pytest_cmdline_main(config: Config) -> int | ExitCode: |
| | return wrap_session(config, _main) |
| |
|
| |
|
| | def _main(config: Config, session: Session) -> int | ExitCode | None: |
| | """Default command line protocol for initialization, session, |
| | running tests and reporting.""" |
| | config.hook.pytest_collection(session=session) |
| | config.hook.pytest_runtestloop(session=session) |
| |
|
| | if session.testsfailed: |
| | return ExitCode.TESTS_FAILED |
| | elif session.testscollected == 0: |
| | return ExitCode.NO_TESTS_COLLECTED |
| | return None |
| |
|
| |
|
| | def pytest_collection(session: Session) -> None: |
| | session.perform_collect() |
| |
|
| |
|
| | def pytest_runtestloop(session: Session) -> bool: |
| | if session.testsfailed and not session.config.option.continue_on_collection_errors: |
| | raise session.Interrupted( |
| | "%d error%s during collection" |
| | % (session.testsfailed, "s" if session.testsfailed != 1 else "") |
| | ) |
| |
|
| | if session.config.option.collectonly: |
| | return True |
| |
|
| | for i, item in enumerate(session.items): |
| | nextitem = session.items[i + 1] if i + 1 < len(session.items) else None |
| | item.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem) |
| | if session.shouldfail: |
| | raise session.Failed(session.shouldfail) |
| | if session.shouldstop: |
| | raise session.Interrupted(session.shouldstop) |
| | return True |
| |
|
| |
|
| | def _in_venv(path: Path) -> bool: |
| | """Attempt to detect if ``path`` is the root of a Virtual Environment by |
| | checking for the existence of the pyvenv.cfg file. |
| | |
| | [https://peps.python.org/pep-0405/] |
| | |
| | For regression protection we also check for conda environments that do not include pyenv.cfg yet -- |
| | https://github.com/conda/conda/issues/13337 is the conda issue tracking adding pyenv.cfg. |
| | |
| | Checking for the `conda-meta/history` file per https://github.com/pytest-dev/pytest/issues/12652#issuecomment-2246336902. |
| | |
| | """ |
| | try: |
| | return ( |
| | path.joinpath("pyvenv.cfg").is_file() |
| | or path.joinpath("conda-meta", "history").is_file() |
| | ) |
| | except OSError: |
| | return False |
| |
|
| |
|
| | def pytest_ignore_collect(collection_path: Path, config: Config) -> bool | None: |
| | if collection_path.name == "__pycache__": |
| | return True |
| |
|
| | ignore_paths = config._getconftest_pathlist( |
| | "collect_ignore", path=collection_path.parent |
| | ) |
| | ignore_paths = ignore_paths or [] |
| | excludeopt = config.getoption("ignore") |
| | if excludeopt: |
| | ignore_paths.extend(absolutepath(x) for x in excludeopt) |
| |
|
| | if collection_path in ignore_paths: |
| | return True |
| |
|
| | ignore_globs = config._getconftest_pathlist( |
| | "collect_ignore_glob", path=collection_path.parent |
| | ) |
| | ignore_globs = ignore_globs or [] |
| | excludeglobopt = config.getoption("ignore_glob") |
| | if excludeglobopt: |
| | ignore_globs.extend(absolutepath(x) for x in excludeglobopt) |
| |
|
| | if any(fnmatch.fnmatch(str(collection_path), str(glob)) for glob in ignore_globs): |
| | return True |
| |
|
| | allow_in_venv = config.getoption("collect_in_virtualenv") |
| | if not allow_in_venv and _in_venv(collection_path): |
| | return True |
| |
|
| | if collection_path.is_dir(): |
| | norecursepatterns = config.getini("norecursedirs") |
| | if any(fnmatch_ex(pat, collection_path) for pat in norecursepatterns): |
| | return True |
| |
|
| | return None |
| |
|
| |
|
| | def pytest_collect_directory( |
| | path: Path, parent: nodes.Collector |
| | ) -> nodes.Collector | None: |
| | return Dir.from_parent(parent, path=path) |
| |
|
| |
|
| | def pytest_collection_modifyitems(items: list[nodes.Item], config: Config) -> None: |
| | deselect_prefixes = tuple(config.getoption("deselect") or []) |
| | if not deselect_prefixes: |
| | return |
| |
|
| | remaining = [] |
| | deselected = [] |
| | for colitem in items: |
| | if colitem.nodeid.startswith(deselect_prefixes): |
| | deselected.append(colitem) |
| | else: |
| | remaining.append(colitem) |
| |
|
| | if deselected: |
| | config.hook.pytest_deselected(items=deselected) |
| | items[:] = remaining |
| |
|
| |
|
| | class FSHookProxy: |
| | def __init__( |
| | self, |
| | pm: PytestPluginManager, |
| | remove_mods: AbstractSet[object], |
| | ) -> None: |
| | self.pm = pm |
| | self.remove_mods = remove_mods |
| |
|
| | def __getattr__(self, name: str) -> pluggy.HookCaller: |
| | x = self.pm.subset_hook_caller(name, remove_plugins=self.remove_mods) |
| | self.__dict__[name] = x |
| | return x |
| |
|
| |
|
| | class Interrupted(KeyboardInterrupt): |
| | """Signals that the test run was interrupted.""" |
| |
|
| | __module__ = "builtins" |
| |
|
| |
|
| | class Failed(Exception): |
| | """Signals a stop as failed test run.""" |
| |
|
| |
|
| | @dataclasses.dataclass |
| | class _bestrelpath_cache(Dict[Path, str]): |
| | __slots__ = ("path",) |
| |
|
| | path: Path |
| |
|
| | def __missing__(self, path: Path) -> str: |
| | r = bestrelpath(self.path, path) |
| | self[path] = r |
| | return r |
| |
|
| |
|
| | @final |
| | class Dir(nodes.Directory): |
| | """Collector of files in a file system directory. |
| | |
| | .. versionadded:: 8.0 |
| | |
| | .. note:: |
| | |
| | Python directories with an `__init__.py` file are instead collected by |
| | :class:`~pytest.Package` by default. Both are :class:`~pytest.Directory` |
| | collectors. |
| | """ |
| |
|
| | @classmethod |
| | def from_parent( |
| | cls, |
| | parent: nodes.Collector, |
| | *, |
| | path: Path, |
| | ) -> Self: |
| | """The public constructor. |
| | |
| | :param parent: The parent collector of this Dir. |
| | :param path: The directory's path. |
| | :type path: pathlib.Path |
| | """ |
| | return super().from_parent(parent=parent, path=path) |
| |
|
| | def collect(self) -> Iterable[nodes.Item | nodes.Collector]: |
| | config = self.config |
| | col: nodes.Collector | None |
| | cols: Sequence[nodes.Collector] |
| | ihook = self.ihook |
| | for direntry in scandir(self.path): |
| | if direntry.is_dir(): |
| | path = Path(direntry.path) |
| | if not self.session.isinitpath(path, with_parents=True): |
| | if ihook.pytest_ignore_collect(collection_path=path, config=config): |
| | continue |
| | col = ihook.pytest_collect_directory(path=path, parent=self) |
| | if col is not None: |
| | yield col |
| |
|
| | elif direntry.is_file(): |
| | path = Path(direntry.path) |
| | if not self.session.isinitpath(path): |
| | if ihook.pytest_ignore_collect(collection_path=path, config=config): |
| | continue |
| | cols = ihook.pytest_collect_file(file_path=path, parent=self) |
| | yield from cols |
| |
|
| |
|
| | @final |
| | class Session(nodes.Collector): |
| | """The root of the collection tree. |
| | |
| | ``Session`` collects the initial paths given as arguments to pytest. |
| | """ |
| |
|
| | Interrupted = Interrupted |
| | Failed = Failed |
| | |
| | _setupstate: SetupState |
| | |
| | _fixturemanager: FixtureManager |
| | exitstatus: int | ExitCode |
| |
|
| | def __init__(self, config: Config) -> None: |
| | super().__init__( |
| | name="", |
| | path=config.rootpath, |
| | fspath=None, |
| | parent=None, |
| | config=config, |
| | session=self, |
| | nodeid="", |
| | ) |
| | self.testsfailed = 0 |
| | self.testscollected = 0 |
| | self._shouldstop: bool | str = False |
| | self._shouldfail: bool | str = False |
| | self.trace = config.trace.root.get("collection") |
| | self._initialpaths: frozenset[Path] = frozenset() |
| | self._initialpaths_with_parents: frozenset[Path] = frozenset() |
| | self._notfound: list[tuple[str, Sequence[nodes.Collector]]] = [] |
| | self._initial_parts: list[CollectionArgument] = [] |
| | self._collection_cache: dict[nodes.Collector, CollectReport] = {} |
| | self.items: list[nodes.Item] = [] |
| |
|
| | self._bestrelpathcache: dict[Path, str] = _bestrelpath_cache(config.rootpath) |
| |
|
| | self.config.pluginmanager.register(self, name="session") |
| |
|
| | @classmethod |
| | def from_config(cls, config: Config) -> Session: |
| | session: Session = cls._create(config=config) |
| | return session |
| |
|
| | def __repr__(self) -> str: |
| | return "<%s %s exitstatus=%r testsfailed=%d testscollected=%d>" % ( |
| | self.__class__.__name__, |
| | self.name, |
| | getattr(self, "exitstatus", "<UNSET>"), |
| | self.testsfailed, |
| | self.testscollected, |
| | ) |
| |
|
| | @property |
| | def shouldstop(self) -> bool | str: |
| | return self._shouldstop |
| |
|
| | @shouldstop.setter |
| | def shouldstop(self, value: bool | str) -> None: |
| | |
| | |
| | if value is False and self._shouldstop: |
| | warnings.warn( |
| | PytestWarning( |
| | "session.shouldstop cannot be unset after it has been set; ignoring." |
| | ), |
| | stacklevel=2, |
| | ) |
| | return |
| | self._shouldstop = value |
| |
|
| | @property |
| | def shouldfail(self) -> bool | str: |
| | return self._shouldfail |
| |
|
| | @shouldfail.setter |
| | def shouldfail(self, value: bool | str) -> None: |
| | |
| | |
| | if value is False and self._shouldfail: |
| | warnings.warn( |
| | PytestWarning( |
| | "session.shouldfail cannot be unset after it has been set; ignoring." |
| | ), |
| | stacklevel=2, |
| | ) |
| | return |
| | self._shouldfail = value |
| |
|
| | @property |
| | def startpath(self) -> Path: |
| | """The path from which pytest was invoked. |
| | |
| | .. versionadded:: 7.0.0 |
| | """ |
| | return self.config.invocation_params.dir |
| |
|
| | def _node_location_to_relpath(self, node_path: Path) -> str: |
| | |
| | return self._bestrelpathcache[node_path] |
| |
|
| | @hookimpl(tryfirst=True) |
| | def pytest_collectstart(self) -> None: |
| | if self.shouldfail: |
| | raise self.Failed(self.shouldfail) |
| | if self.shouldstop: |
| | raise self.Interrupted(self.shouldstop) |
| |
|
| | @hookimpl(tryfirst=True) |
| | def pytest_runtest_logreport(self, report: TestReport | CollectReport) -> None: |
| | if report.failed and not hasattr(report, "wasxfail"): |
| | self.testsfailed += 1 |
| | maxfail = self.config.getvalue("maxfail") |
| | if maxfail and self.testsfailed >= maxfail: |
| | self.shouldfail = "stopping after %d failures" % (self.testsfailed) |
| |
|
| | pytest_collectreport = pytest_runtest_logreport |
| |
|
| | def isinitpath( |
| | self, |
| | path: str | os.PathLike[str], |
| | *, |
| | with_parents: bool = False, |
| | ) -> bool: |
| | """Is path an initial path? |
| | |
| | An initial path is a path explicitly given to pytest on the command |
| | line. |
| | |
| | :param with_parents: |
| | If set, also return True if the path is a parent of an initial path. |
| | |
| | .. versionchanged:: 8.0 |
| | Added the ``with_parents`` parameter. |
| | """ |
| | |
| | path_ = path if isinstance(path, Path) else Path(path) |
| | if with_parents: |
| | return path_ in self._initialpaths_with_parents |
| | else: |
| | return path_ in self._initialpaths |
| |
|
| | def gethookproxy(self, fspath: os.PathLike[str]) -> pluggy.HookRelay: |
| | |
| | path = fspath if isinstance(fspath, Path) else Path(fspath) |
| | pm = self.config.pluginmanager |
| | |
| | |
| | my_conftestmodules = pm._getconftestmodules(path) |
| | remove_mods = pm._conftest_plugins.difference(my_conftestmodules) |
| | proxy: pluggy.HookRelay |
| | if remove_mods: |
| | |
| | proxy = PathAwareHookProxy(FSHookProxy(pm, remove_mods)) |
| | else: |
| | |
| | proxy = self.config.hook |
| | return proxy |
| |
|
| | def _collect_path( |
| | self, |
| | path: Path, |
| | path_cache: dict[Path, Sequence[nodes.Collector]], |
| | ) -> Sequence[nodes.Collector]: |
| | """Create a Collector for the given path. |
| | |
| | `path_cache` makes it so the same Collectors are returned for the same |
| | path. |
| | """ |
| | if path in path_cache: |
| | return path_cache[path] |
| |
|
| | if path.is_dir(): |
| | ihook = self.gethookproxy(path.parent) |
| | col: nodes.Collector | None = ihook.pytest_collect_directory( |
| | path=path, parent=self |
| | ) |
| | cols: Sequence[nodes.Collector] = (col,) if col is not None else () |
| |
|
| | elif path.is_file(): |
| | ihook = self.gethookproxy(path) |
| | cols = ihook.pytest_collect_file(file_path=path, parent=self) |
| |
|
| | else: |
| | |
| | cols = () |
| |
|
| | path_cache[path] = cols |
| | return cols |
| |
|
| | @overload |
| | def perform_collect( |
| | self, args: Sequence[str] | None = ..., genitems: Literal[True] = ... |
| | ) -> Sequence[nodes.Item]: ... |
| |
|
| | @overload |
| | def perform_collect( |
| | self, args: Sequence[str] | None = ..., genitems: bool = ... |
| | ) -> Sequence[nodes.Item | nodes.Collector]: ... |
| |
|
| | def perform_collect( |
| | self, args: Sequence[str] | None = None, genitems: bool = True |
| | ) -> Sequence[nodes.Item | nodes.Collector]: |
| | """Perform the collection phase for this session. |
| | |
| | This is called by the default :hook:`pytest_collection` hook |
| | implementation; see the documentation of this hook for more details. |
| | For testing purposes, it may also be called directly on a fresh |
| | ``Session``. |
| | |
| | This function normally recursively expands any collectors collected |
| | from the session to their items, and only items are returned. For |
| | testing purposes, this may be suppressed by passing ``genitems=False``, |
| | in which case the return value contains these collectors unexpanded, |
| | and ``session.items`` is empty. |
| | """ |
| | if args is None: |
| | args = self.config.args |
| |
|
| | self.trace("perform_collect", self, args) |
| | self.trace.root.indent += 1 |
| |
|
| | hook = self.config.hook |
| |
|
| | self._notfound = [] |
| | self._initial_parts = [] |
| | self._collection_cache = {} |
| | self.items = [] |
| | items: Sequence[nodes.Item | nodes.Collector] = self.items |
| | try: |
| | initialpaths: list[Path] = [] |
| | initialpaths_with_parents: list[Path] = [] |
| | for arg in args: |
| | collection_argument = resolve_collection_argument( |
| | self.config.invocation_params.dir, |
| | arg, |
| | as_pypath=self.config.option.pyargs, |
| | ) |
| | self._initial_parts.append(collection_argument) |
| | initialpaths.append(collection_argument.path) |
| | initialpaths_with_parents.append(collection_argument.path) |
| | initialpaths_with_parents.extend(collection_argument.path.parents) |
| | self._initialpaths = frozenset(initialpaths) |
| | self._initialpaths_with_parents = frozenset(initialpaths_with_parents) |
| |
|
| | rep = collect_one_node(self) |
| | self.ihook.pytest_collectreport(report=rep) |
| | self.trace.root.indent -= 1 |
| | if self._notfound: |
| | errors = [] |
| | for arg, collectors in self._notfound: |
| | if collectors: |
| | errors.append( |
| | f"not found: {arg}\n(no match in any of {collectors!r})" |
| | ) |
| | else: |
| | errors.append(f"found no collectors for {arg}") |
| |
|
| | raise UsageError(*errors) |
| |
|
| | if not genitems: |
| | items = rep.result |
| | else: |
| | if rep.passed: |
| | for node in rep.result: |
| | self.items.extend(self.genitems(node)) |
| |
|
| | self.config.pluginmanager.check_pending() |
| | hook.pytest_collection_modifyitems( |
| | session=self, config=self.config, items=items |
| | ) |
| | finally: |
| | self._notfound = [] |
| | self._initial_parts = [] |
| | self._collection_cache = {} |
| | hook.pytest_collection_finish(session=self) |
| |
|
| | if genitems: |
| | self.testscollected = len(items) |
| |
|
| | return items |
| |
|
| | def _collect_one_node( |
| | self, |
| | node: nodes.Collector, |
| | handle_dupes: bool = True, |
| | ) -> tuple[CollectReport, bool]: |
| | if node in self._collection_cache and handle_dupes: |
| | rep = self._collection_cache[node] |
| | return rep, True |
| | else: |
| | rep = collect_one_node(node) |
| | self._collection_cache[node] = rep |
| | return rep, False |
| |
|
| | def collect(self) -> Iterator[nodes.Item | nodes.Collector]: |
| | |
| | |
| | |
| | path_cache: dict[Path, Sequence[nodes.Collector]] = {} |
| |
|
| | pm = self.config.pluginmanager |
| |
|
| | for collection_argument in self._initial_parts: |
| | self.trace("processing argument", collection_argument) |
| | self.trace.root.indent += 1 |
| |
|
| | argpath = collection_argument.path |
| | names = collection_argument.parts |
| | module_name = collection_argument.module_name |
| |
|
| | |
| | if argpath.is_dir(): |
| | assert not names, f"invalid arg {(argpath, names)!r}" |
| |
|
| | paths = [argpath] |
| | |
| | |
| | if module_name is None: |
| | |
| | for path in argpath.parents: |
| | if not pm._is_in_confcutdir(path): |
| | break |
| | paths.insert(0, path) |
| | else: |
| | |
| | |
| | module_name_parts = module_name.split(".") |
| | for i, path in enumerate(argpath.parents, 2): |
| | if i > len(module_name_parts) or path.stem != module_name_parts[-i]: |
| | break |
| | paths.insert(0, path) |
| |
|
| | |
| | |
| | any_matched_in_initial_part = False |
| | notfound_collectors = [] |
| | work: list[tuple[nodes.Collector | nodes.Item, list[Path | str]]] = [ |
| | (self, [*paths, *names]) |
| | ] |
| | while work: |
| | matchnode, matchparts = work.pop() |
| |
|
| | |
| | if not matchparts: |
| | yield matchnode |
| | any_matched_in_initial_part = True |
| | continue |
| |
|
| | |
| | if not isinstance(matchnode, nodes.Collector): |
| | continue |
| |
|
| | |
| | |
| | |
| | subnodes: Sequence[nodes.Collector | nodes.Item] |
| | if isinstance(matchnode, Session): |
| | assert isinstance(matchparts[0], Path) |
| | subnodes = matchnode._collect_path(matchparts[0], path_cache) |
| | else: |
| | |
| | |
| | handle_dupes = not ( |
| | len(matchparts) == 1 |
| | and isinstance(matchparts[0], Path) |
| | and matchparts[0].is_file() |
| | ) |
| | rep, duplicate = self._collect_one_node(matchnode, handle_dupes) |
| | if not duplicate and not rep.passed: |
| | |
| | |
| | |
| | matchnode.ihook.pytest_collectreport(report=rep) |
| | if not rep.passed: |
| | continue |
| | subnodes = rep.result |
| |
|
| | |
| | any_matched_in_collector = False |
| | for node in reversed(subnodes): |
| | |
| | if isinstance(matchparts[0], Path): |
| | is_match = node.path == matchparts[0] |
| | if sys.platform == "win32" and not is_match: |
| | |
| | |
| | same_file = os.path.samefile(node.path, matchparts[0]) |
| | |
| | |
| | is_match = same_file and ( |
| | os.path.islink(node.path) |
| | == os.path.islink(matchparts[0]) |
| | ) |
| |
|
| | |
| | else: |
| | |
| | |
| | is_match = ( |
| | node.name == matchparts[0] |
| | or node.name.split("[")[0] == matchparts[0] |
| | ) |
| | if is_match: |
| | work.append((node, matchparts[1:])) |
| | any_matched_in_collector = True |
| |
|
| | if not any_matched_in_collector: |
| | notfound_collectors.append(matchnode) |
| |
|
| | if not any_matched_in_initial_part: |
| | report_arg = "::".join((str(argpath), *names)) |
| | self._notfound.append((report_arg, notfound_collectors)) |
| |
|
| | self.trace.root.indent -= 1 |
| |
|
| | def genitems(self, node: nodes.Item | nodes.Collector) -> Iterator[nodes.Item]: |
| | self.trace("genitems", node) |
| | if isinstance(node, nodes.Item): |
| | node.ihook.pytest_itemcollected(item=node) |
| | yield node |
| | else: |
| | assert isinstance(node, nodes.Collector) |
| | keepduplicates = self.config.getoption("keepduplicates") |
| | |
| | handle_dupes = not (keepduplicates and isinstance(node, nodes.File)) |
| | rep, duplicate = self._collect_one_node(node, handle_dupes) |
| | if duplicate and not keepduplicates: |
| | return |
| | if rep.passed: |
| | for subnode in rep.result: |
| | yield from self.genitems(subnode) |
| | if not duplicate: |
| | node.ihook.pytest_collectreport(report=rep) |
| |
|
| |
|
| | def search_pypath(module_name: str) -> str | None: |
| | """Search sys.path for the given a dotted module name, and return its file |
| | system path if found.""" |
| | try: |
| | spec = importlib.util.find_spec(module_name) |
| | |
| | |
| | |
| | except (AttributeError, ImportError, ValueError): |
| | return None |
| | if spec is None or spec.origin is None or spec.origin == "namespace": |
| | return None |
| | elif spec.submodule_search_locations: |
| | return os.path.dirname(spec.origin) |
| | else: |
| | return spec.origin |
| |
|
| |
|
| | @dataclasses.dataclass(frozen=True) |
| | class CollectionArgument: |
| | """A resolved collection argument.""" |
| |
|
| | path: Path |
| | parts: Sequence[str] |
| | module_name: str | None |
| |
|
| |
|
| | def resolve_collection_argument( |
| | invocation_path: Path, arg: str, *, as_pypath: bool = False |
| | ) -> CollectionArgument: |
| | """Parse path arguments optionally containing selection parts and return (fspath, names). |
| | |
| | Command-line arguments can point to files and/or directories, and optionally contain |
| | parts for specific tests selection, for example: |
| | |
| | "pkg/tests/test_foo.py::TestClass::test_foo" |
| | |
| | This function ensures the path exists, and returns a resolved `CollectionArgument`: |
| | |
| | CollectionArgument( |
| | path=Path("/full/path/to/pkg/tests/test_foo.py"), |
| | parts=["TestClass", "test_foo"], |
| | module_name=None, |
| | ) |
| | |
| | When as_pypath is True, expects that the command-line argument actually contains |
| | module paths instead of file-system paths: |
| | |
| | "pkg.tests.test_foo::TestClass::test_foo" |
| | |
| | In which case we search sys.path for a matching module, and then return the *path* to the |
| | found module, which may look like this: |
| | |
| | CollectionArgument( |
| | path=Path("/home/u/myvenv/lib/site-packages/pkg/tests/test_foo.py"), |
| | parts=["TestClass", "test_foo"], |
| | module_name="pkg.tests.test_foo", |
| | ) |
| | |
| | If the path doesn't exist, raise UsageError. |
| | If the path is a directory and selection parts are present, raise UsageError. |
| | """ |
| | base, squacket, rest = str(arg).partition("[") |
| | strpath, *parts = base.split("::") |
| | if parts: |
| | parts[-1] = f"{parts[-1]}{squacket}{rest}" |
| | module_name = None |
| | if as_pypath: |
| | pyarg_strpath = search_pypath(strpath) |
| | if pyarg_strpath is not None: |
| | module_name = strpath |
| | strpath = pyarg_strpath |
| | fspath = invocation_path / strpath |
| | fspath = absolutepath(fspath) |
| | if not safe_exists(fspath): |
| | msg = ( |
| | "module or package not found: {arg} (missing __init__.py?)" |
| | if as_pypath |
| | else "file or directory not found: {arg}" |
| | ) |
| | raise UsageError(msg.format(arg=arg)) |
| | if parts and fspath.is_dir(): |
| | msg = ( |
| | "package argument cannot contain :: selection parts: {arg}" |
| | if as_pypath |
| | else "directory argument cannot contain :: selection parts: {arg}" |
| | ) |
| | raise UsageError(msg.format(arg=arg)) |
| | return CollectionArgument( |
| | path=fspath, |
| | parts=parts, |
| | module_name=module_name, |
| | ) |
| |
|