| """Core implementation of the testing process: init, session, runtest loop.""" |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| from collections.abc import Callable |
| from collections.abc import Iterable |
| from collections.abc import Iterator |
| from collections.abc import Sequence |
| from collections.abc import Set as AbstractSet |
| import dataclasses |
| import fnmatch |
| import functools |
| import importlib |
| import importlib.util |
| import os |
| from pathlib import Path |
| import sys |
| from typing import final |
| from typing import Literal |
| from typing import overload |
| from typing import TYPE_CHECKING |
| import warnings |
|
|
| import pluggy |
|
|
| from _pytest import nodes |
| import _pytest._code |
| from _pytest.config import Config |
| from _pytest.config import directory_arg |
| from _pytest.config import ExitCode |
| from _pytest.config import hookimpl |
| from _pytest.config import PytestPluginManager |
| from _pytest.config import UsageError |
| from _pytest.config.argparsing import Parser |
| from _pytest.config.compat import PathAwareHookProxy |
| from _pytest.outcomes import exit |
| from _pytest.pathlib import absolutepath |
| from _pytest.pathlib import bestrelpath |
| from _pytest.pathlib import fnmatch_ex |
| from _pytest.pathlib import safe_exists |
| from _pytest.pathlib import scandir |
| from _pytest.reports import CollectReport |
| from _pytest.reports import TestReport |
| from _pytest.runner import collect_one_node |
| from _pytest.runner import SetupState |
| from _pytest.warning_types import PytestWarning |
|
|
|
|
| if TYPE_CHECKING: |
| from typing_extensions import Self |
|
|
| from _pytest.fixtures import FixtureManager |
|
|
|
|
| def pytest_addoption(parser: Parser) -> None: |
| group = parser.getgroup("general", "Running and selection options") |
| group._addoption( |
| "-x", |
| "--exitfirst", |
| action="store_const", |
| dest="maxfail", |
| const=1, |
| help="Exit instantly on first error or failed test", |
| ) |
| group.addoption( |
| "--maxfail", |
| metavar="num", |
| action="store", |
| type=int, |
| dest="maxfail", |
| default=0, |
| help="Exit after first num failures or errors", |
| ) |
| group.addoption( |
| "--strict-config", |
| action="store_true", |
| help="Any warnings encountered while parsing the `pytest` section of the " |
| "configuration file raise errors", |
| ) |
| group.addoption( |
| "--strict-markers", |
| action="store_true", |
| help="Markers not registered in the `markers` section of the configuration " |
| "file raise errors", |
| ) |
| group.addoption( |
| "--strict", |
| action="store_true", |
| help="(Deprecated) alias to --strict-markers", |
| ) |
|
|
| group = parser.getgroup("pytest-warnings") |
| group.addoption( |
| "-W", |
| "--pythonwarnings", |
| action="append", |
| help="Set which warnings to report, see -W option of Python itself", |
| ) |
| parser.addini( |
| "filterwarnings", |
| type="linelist", |
| help="Each line specifies a pattern for " |
| "warnings.filterwarnings. " |
| "Processed after -W/--pythonwarnings.", |
| ) |
|
|
| group = parser.getgroup("collect", "collection") |
| group.addoption( |
| "--collectonly", |
| "--collect-only", |
| "--co", |
| action="store_true", |
| help="Only collect tests, don't execute them", |
| ) |
| group.addoption( |
| "--pyargs", |
| action="store_true", |
| help="Try to interpret all arguments as Python packages", |
| ) |
| group.addoption( |
| "--ignore", |
| action="append", |
| metavar="path", |
| help="Ignore path during collection (multi-allowed)", |
| ) |
| group.addoption( |
| "--ignore-glob", |
| action="append", |
| metavar="path", |
| help="Ignore path pattern during collection (multi-allowed)", |
| ) |
| group.addoption( |
| "--deselect", |
| action="append", |
| metavar="nodeid_prefix", |
| help="Deselect item (via node id prefix) during collection (multi-allowed)", |
| ) |
| group.addoption( |
| "--confcutdir", |
| dest="confcutdir", |
| default=None, |
| metavar="dir", |
| type=functools.partial(directory_arg, optname="--confcutdir"), |
| help="Only load conftest.py's relative to specified dir", |
| ) |
| group.addoption( |
| "--noconftest", |
| action="store_true", |
| dest="noconftest", |
| default=False, |
| help="Don't load any conftest.py files", |
| ) |
| group.addoption( |
| "--keepduplicates", |
| "--keep-duplicates", |
| action="store_true", |
| dest="keepduplicates", |
| default=False, |
| help="Keep duplicate tests", |
| ) |
| group.addoption( |
| "--collect-in-virtualenv", |
| action="store_true", |
| dest="collect_in_virtualenv", |
| default=False, |
| help="Don't ignore tests in a local virtualenv directory", |
| ) |
| group.addoption( |
| "--continue-on-collection-errors", |
| action="store_true", |
| default=False, |
| dest="continue_on_collection_errors", |
| help="Force test execution even if collection errors occur", |
| ) |
| group.addoption( |
| "--import-mode", |
| default="prepend", |
| choices=["prepend", "append", "importlib"], |
| dest="importmode", |
| help="Prepend/append to sys.path when importing test modules and conftest " |
| "files. Default: prepend.", |
| ) |
| parser.addini( |
| "norecursedirs", |
| "Directory patterns to avoid for recursion", |
| type="args", |
| default=[ |
| "*.egg", |
| ".*", |
| "_darcs", |
| "build", |
| "CVS", |
| "dist", |
| "node_modules", |
| "venv", |
| "{arch}", |
| ], |
| ) |
| parser.addini( |
| "testpaths", |
| "Directories to search for tests when no files or directories are given on the " |
| "command line", |
| type="args", |
| default=[], |
| ) |
| parser.addini( |
| "collect_imported_tests", |
| "Whether to collect tests in imported modules outside `testpaths`", |
| type="bool", |
| default=True, |
| ) |
| parser.addini( |
| "consider_namespace_packages", |
| type="bool", |
| default=False, |
| help="Consider namespace packages when resolving module names during import", |
| ) |
|
|
| group = parser.getgroup("debugconfig", "test session debugging and configuration") |
| group._addoption( |
| "-c", |
| "--config-file", |
| metavar="FILE", |
| type=str, |
| dest="inifilename", |
| help="Load configuration from `FILE` instead of trying to locate one of the " |
| "implicit configuration files.", |
| ) |
| group.addoption( |
| "--rootdir", |
| action="store", |
| dest="rootdir", |
| help="Define root directory for tests. Can be relative path: 'root_dir', './root_dir', " |
| "'root_dir/another_dir/'; absolute path: '/home/user/root_dir'; path with variables: " |
| "'$HOME/root_dir'.", |
| ) |
| group.addoption( |
| "--basetemp", |
| dest="basetemp", |
| default=None, |
| type=validate_basetemp, |
| metavar="dir", |
| help=( |
| "Base temporary directory for this test run. " |
| "(Warning: this directory is removed if it exists.)" |
| ), |
| ) |
|
|
|
|
| def validate_basetemp(path: str) -> str: |
| |
| msg = "basetemp must not be empty, the current working directory or any parent directory of it" |
|
|
| |
| if not path: |
| raise argparse.ArgumentTypeError(msg) |
|
|
| def is_ancestor(base: Path, query: Path) -> bool: |
| """Return whether query is an ancestor of base.""" |
| if base == query: |
| return True |
| return query in base.parents |
|
|
| |
| if is_ancestor(Path.cwd(), Path(path).absolute()): |
| raise argparse.ArgumentTypeError(msg) |
|
|
| |
| if is_ancestor(Path.cwd().resolve(), Path(path).resolve()): |
| raise argparse.ArgumentTypeError(msg) |
|
|
| return path |
|
|
|
|
| def wrap_session( |
| config: Config, doit: Callable[[Config, Session], int | ExitCode | None] |
| ) -> int | ExitCode: |
| """Skeleton command line program.""" |
| session = Session.from_config(config) |
| session.exitstatus = ExitCode.OK |
| initstate = 0 |
| try: |
| try: |
| config._do_configure() |
| initstate = 1 |
| config.hook.pytest_sessionstart(session=session) |
| initstate = 2 |
| session.exitstatus = doit(config, session) or 0 |
| except UsageError: |
| session.exitstatus = ExitCode.USAGE_ERROR |
| raise |
| except Failed: |
| session.exitstatus = ExitCode.TESTS_FAILED |
| except (KeyboardInterrupt, exit.Exception): |
| excinfo = _pytest._code.ExceptionInfo.from_current() |
| exitstatus: int | ExitCode = ExitCode.INTERRUPTED |
| if isinstance(excinfo.value, exit.Exception): |
| if excinfo.value.returncode is not None: |
| exitstatus = excinfo.value.returncode |
| if initstate < 2: |
| sys.stderr.write(f"{excinfo.typename}: {excinfo.value.msg}\n") |
| config.hook.pytest_keyboard_interrupt(excinfo=excinfo) |
| session.exitstatus = exitstatus |
| except BaseException: |
| session.exitstatus = ExitCode.INTERNAL_ERROR |
| excinfo = _pytest._code.ExceptionInfo.from_current() |
| try: |
| config.notify_exception(excinfo, config.option) |
| except exit.Exception as exc: |
| if exc.returncode is not None: |
| session.exitstatus = exc.returncode |
| sys.stderr.write(f"{type(exc).__name__}: {exc}\n") |
| else: |
| if isinstance(excinfo.value, SystemExit): |
| sys.stderr.write("mainloop: caught unexpected SystemExit!\n") |
|
|
| finally: |
| |
| excinfo = None |
| os.chdir(session.startpath) |
| if initstate >= 2: |
| try: |
| config.hook.pytest_sessionfinish( |
| session=session, exitstatus=session.exitstatus |
| ) |
| except exit.Exception as exc: |
| if exc.returncode is not None: |
| session.exitstatus = exc.returncode |
| sys.stderr.write(f"{type(exc).__name__}: {exc}\n") |
| config._ensure_unconfigure() |
| return session.exitstatus |
|
|
|
|
| def pytest_cmdline_main(config: Config) -> int | ExitCode: |
| return wrap_session(config, _main) |
|
|
|
|
| def _main(config: Config, session: Session) -> int | ExitCode | None: |
| """Default command line protocol for initialization, session, |
| running tests and reporting.""" |
| config.hook.pytest_collection(session=session) |
| config.hook.pytest_runtestloop(session=session) |
|
|
| if session.testsfailed: |
| return ExitCode.TESTS_FAILED |
| elif session.testscollected == 0: |
| return ExitCode.NO_TESTS_COLLECTED |
| return None |
|
|
|
|
| def pytest_collection(session: Session) -> None: |
| session.perform_collect() |
|
|
|
|
| def pytest_runtestloop(session: Session) -> bool: |
| if session.testsfailed and not session.config.option.continue_on_collection_errors: |
| raise session.Interrupted( |
| f"{session.testsfailed} error{'s' if session.testsfailed != 1 else ''} during collection" |
| ) |
|
|
| if session.config.option.collectonly: |
| return True |
|
|
| for i, item in enumerate(session.items): |
| nextitem = session.items[i + 1] if i + 1 < len(session.items) else None |
| item.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem) |
| if session.shouldfail: |
| raise session.Failed(session.shouldfail) |
| if session.shouldstop: |
| raise session.Interrupted(session.shouldstop) |
| return True |
|
|
|
|
| def _in_venv(path: Path) -> bool: |
| """Attempt to detect if ``path`` is the root of a Virtual Environment by |
| checking for the existence of the pyvenv.cfg file. |
| |
| [https://peps.python.org/pep-0405/] |
| |
| For regression protection we also check for conda environments that do not include pyenv.cfg yet -- |
| https://github.com/conda/conda/issues/13337 is the conda issue tracking adding pyenv.cfg. |
| |
| Checking for the `conda-meta/history` file per https://github.com/pytest-dev/pytest/issues/12652#issuecomment-2246336902. |
| |
| """ |
| try: |
| return ( |
| path.joinpath("pyvenv.cfg").is_file() |
| or path.joinpath("conda-meta", "history").is_file() |
| ) |
| except OSError: |
| return False |
|
|
|
|
| def pytest_ignore_collect(collection_path: Path, config: Config) -> bool | None: |
| if collection_path.name == "__pycache__": |
| return True |
|
|
| ignore_paths = config._getconftest_pathlist( |
| "collect_ignore", path=collection_path.parent |
| ) |
| ignore_paths = ignore_paths or [] |
| excludeopt = config.getoption("ignore") |
| if excludeopt: |
| ignore_paths.extend(absolutepath(x) for x in excludeopt) |
|
|
| if collection_path in ignore_paths: |
| return True |
|
|
| ignore_globs = config._getconftest_pathlist( |
| "collect_ignore_glob", path=collection_path.parent |
| ) |
| ignore_globs = ignore_globs or [] |
| excludeglobopt = config.getoption("ignore_glob") |
| if excludeglobopt: |
| ignore_globs.extend(absolutepath(x) for x in excludeglobopt) |
|
|
| if any(fnmatch.fnmatch(str(collection_path), str(glob)) for glob in ignore_globs): |
| return True |
|
|
| allow_in_venv = config.getoption("collect_in_virtualenv") |
| if not allow_in_venv and _in_venv(collection_path): |
| return True |
|
|
| if collection_path.is_dir(): |
| norecursepatterns = config.getini("norecursedirs") |
| if any(fnmatch_ex(pat, collection_path) for pat in norecursepatterns): |
| return True |
|
|
| return None |
|
|
|
|
| def pytest_collect_directory( |
| path: Path, parent: nodes.Collector |
| ) -> nodes.Collector | None: |
| return Dir.from_parent(parent, path=path) |
|
|
|
|
| def pytest_collection_modifyitems(items: list[nodes.Item], config: Config) -> None: |
| deselect_prefixes = tuple(config.getoption("deselect") or []) |
| if not deselect_prefixes: |
| return |
|
|
| remaining = [] |
| deselected = [] |
| for colitem in items: |
| if colitem.nodeid.startswith(deselect_prefixes): |
| deselected.append(colitem) |
| else: |
| remaining.append(colitem) |
|
|
| if deselected: |
| config.hook.pytest_deselected(items=deselected) |
| items[:] = remaining |
|
|
|
|
| class FSHookProxy: |
| def __init__( |
| self, |
| pm: PytestPluginManager, |
| remove_mods: AbstractSet[object], |
| ) -> None: |
| self.pm = pm |
| self.remove_mods = remove_mods |
|
|
| def __getattr__(self, name: str) -> pluggy.HookCaller: |
| x = self.pm.subset_hook_caller(name, remove_plugins=self.remove_mods) |
| self.__dict__[name] = x |
| return x |
|
|
|
|
| class Interrupted(KeyboardInterrupt): |
| """Signals that the test run was interrupted.""" |
|
|
| __module__ = "builtins" |
|
|
|
|
| class Failed(Exception): |
| """Signals a stop as failed test run.""" |
|
|
|
|
| @dataclasses.dataclass |
| class _bestrelpath_cache(dict[Path, str]): |
| __slots__ = ("path",) |
|
|
| path: Path |
|
|
| def __missing__(self, path: Path) -> str: |
| r = bestrelpath(self.path, path) |
| self[path] = r |
| return r |
|
|
|
|
| @final |
| class Dir(nodes.Directory): |
| """Collector of files in a file system directory. |
| |
| .. versionadded:: 8.0 |
| |
| .. note:: |
| |
| Python directories with an `__init__.py` file are instead collected by |
| :class:`~pytest.Package` by default. Both are :class:`~pytest.Directory` |
| collectors. |
| """ |
|
|
| @classmethod |
| def from_parent( |
| cls, |
| parent: nodes.Collector, |
| *, |
| path: Path, |
| ) -> Self: |
| """The public constructor. |
| |
| :param parent: The parent collector of this Dir. |
| :param path: The directory's path. |
| :type path: pathlib.Path |
| """ |
| return super().from_parent(parent=parent, path=path) |
|
|
| def collect(self) -> Iterable[nodes.Item | nodes.Collector]: |
| config = self.config |
| col: nodes.Collector | None |
| cols: Sequence[nodes.Collector] |
| ihook = self.ihook |
| for direntry in scandir(self.path): |
| if direntry.is_dir(): |
| path = Path(direntry.path) |
| if not self.session.isinitpath(path, with_parents=True): |
| if ihook.pytest_ignore_collect(collection_path=path, config=config): |
| continue |
| col = ihook.pytest_collect_directory(path=path, parent=self) |
| if col is not None: |
| yield col |
|
|
| elif direntry.is_file(): |
| path = Path(direntry.path) |
| if not self.session.isinitpath(path): |
| if ihook.pytest_ignore_collect(collection_path=path, config=config): |
| continue |
| cols = ihook.pytest_collect_file(file_path=path, parent=self) |
| yield from cols |
|
|
|
|
| @final |
| class Session(nodes.Collector): |
| """The root of the collection tree. |
| |
| ``Session`` collects the initial paths given as arguments to pytest. |
| """ |
|
|
| Interrupted = Interrupted |
| Failed = Failed |
| |
| _setupstate: SetupState |
| |
| _fixturemanager: FixtureManager |
| exitstatus: int | ExitCode |
|
|
| def __init__(self, config: Config) -> None: |
| super().__init__( |
| name="", |
| path=config.rootpath, |
| fspath=None, |
| parent=None, |
| config=config, |
| session=self, |
| nodeid="", |
| ) |
| self.testsfailed = 0 |
| self.testscollected = 0 |
| self._shouldstop: bool | str = False |
| self._shouldfail: bool | str = False |
| self.trace = config.trace.root.get("collection") |
| self._initialpaths: frozenset[Path] = frozenset() |
| self._initialpaths_with_parents: frozenset[Path] = frozenset() |
| self._notfound: list[tuple[str, Sequence[nodes.Collector]]] = [] |
| self._initial_parts: list[CollectionArgument] = [] |
| self._collection_cache: dict[nodes.Collector, CollectReport] = {} |
| self.items: list[nodes.Item] = [] |
|
|
| self._bestrelpathcache: dict[Path, str] = _bestrelpath_cache(config.rootpath) |
|
|
| self.config.pluginmanager.register(self, name="session") |
|
|
| @classmethod |
| def from_config(cls, config: Config) -> Session: |
| session: Session = cls._create(config=config) |
| return session |
|
|
| def __repr__(self) -> str: |
| return ( |
| f"<{self.__class__.__name__} {self.name} " |
| f"exitstatus=%r " |
| f"testsfailed={self.testsfailed} " |
| f"testscollected={self.testscollected}>" |
| ) % getattr(self, "exitstatus", "<UNSET>") |
|
|
| @property |
| def shouldstop(self) -> bool | str: |
| return self._shouldstop |
|
|
| @shouldstop.setter |
| def shouldstop(self, value: bool | str) -> None: |
| |
| |
| if value is False and self._shouldstop: |
| warnings.warn( |
| PytestWarning( |
| "session.shouldstop cannot be unset after it has been set; ignoring." |
| ), |
| stacklevel=2, |
| ) |
| return |
| self._shouldstop = value |
|
|
| @property |
| def shouldfail(self) -> bool | str: |
| return self._shouldfail |
|
|
| @shouldfail.setter |
| def shouldfail(self, value: bool | str) -> None: |
| |
| |
| if value is False and self._shouldfail: |
| warnings.warn( |
| PytestWarning( |
| "session.shouldfail cannot be unset after it has been set; ignoring." |
| ), |
| stacklevel=2, |
| ) |
| return |
| self._shouldfail = value |
|
|
| @property |
| def startpath(self) -> Path: |
| """The path from which pytest was invoked. |
| |
| .. versionadded:: 7.0.0 |
| """ |
| return self.config.invocation_params.dir |
|
|
| def _node_location_to_relpath(self, node_path: Path) -> str: |
| |
| return self._bestrelpathcache[node_path] |
|
|
| @hookimpl(tryfirst=True) |
| def pytest_collectstart(self) -> None: |
| if self.shouldfail: |
| raise self.Failed(self.shouldfail) |
| if self.shouldstop: |
| raise self.Interrupted(self.shouldstop) |
|
|
| @hookimpl(tryfirst=True) |
| def pytest_runtest_logreport(self, report: TestReport | CollectReport) -> None: |
| if report.failed and not hasattr(report, "wasxfail"): |
| self.testsfailed += 1 |
| maxfail = self.config.getvalue("maxfail") |
| if maxfail and self.testsfailed >= maxfail: |
| self.shouldfail = f"stopping after {self.testsfailed} failures" |
|
|
| pytest_collectreport = pytest_runtest_logreport |
|
|
| def isinitpath( |
| self, |
| path: str | os.PathLike[str], |
| *, |
| with_parents: bool = False, |
| ) -> bool: |
| """Is path an initial path? |
| |
| An initial path is a path explicitly given to pytest on the command |
| line. |
| |
| :param with_parents: |
| If set, also return True if the path is a parent of an initial path. |
| |
| .. versionchanged:: 8.0 |
| Added the ``with_parents`` parameter. |
| """ |
| |
| path_ = path if isinstance(path, Path) else Path(path) |
| if with_parents: |
| return path_ in self._initialpaths_with_parents |
| else: |
| return path_ in self._initialpaths |
|
|
| def gethookproxy(self, fspath: os.PathLike[str]) -> pluggy.HookRelay: |
| |
| path = fspath if isinstance(fspath, Path) else Path(fspath) |
| pm = self.config.pluginmanager |
| |
| |
| my_conftestmodules = pm._getconftestmodules(path) |
| remove_mods = pm._conftest_plugins.difference(my_conftestmodules) |
| proxy: pluggy.HookRelay |
| if remove_mods: |
| |
| proxy = PathAwareHookProxy(FSHookProxy(pm, remove_mods)) |
| else: |
| |
| proxy = self.config.hook |
| return proxy |
|
|
| def _collect_path( |
| self, |
| path: Path, |
| path_cache: dict[Path, Sequence[nodes.Collector]], |
| ) -> Sequence[nodes.Collector]: |
| """Create a Collector for the given path. |
| |
| `path_cache` makes it so the same Collectors are returned for the same |
| path. |
| """ |
| if path in path_cache: |
| return path_cache[path] |
|
|
| if path.is_dir(): |
| ihook = self.gethookproxy(path.parent) |
| col: nodes.Collector | None = ihook.pytest_collect_directory( |
| path=path, parent=self |
| ) |
| cols: Sequence[nodes.Collector] = (col,) if col is not None else () |
|
|
| elif path.is_file(): |
| ihook = self.gethookproxy(path) |
| cols = ihook.pytest_collect_file(file_path=path, parent=self) |
|
|
| else: |
| |
| cols = () |
|
|
| path_cache[path] = cols |
| return cols |
|
|
| @overload |
| def perform_collect( |
| self, args: Sequence[str] | None = ..., genitems: Literal[True] = ... |
| ) -> Sequence[nodes.Item]: ... |
|
|
| @overload |
| def perform_collect( |
| self, args: Sequence[str] | None = ..., genitems: bool = ... |
| ) -> Sequence[nodes.Item | nodes.Collector]: ... |
|
|
| def perform_collect( |
| self, args: Sequence[str] | None = None, genitems: bool = True |
| ) -> Sequence[nodes.Item | nodes.Collector]: |
| """Perform the collection phase for this session. |
| |
| This is called by the default :hook:`pytest_collection` hook |
| implementation; see the documentation of this hook for more details. |
| For testing purposes, it may also be called directly on a fresh |
| ``Session``. |
| |
| This function normally recursively expands any collectors collected |
| from the session to their items, and only items are returned. For |
| testing purposes, this may be suppressed by passing ``genitems=False``, |
| in which case the return value contains these collectors unexpanded, |
| and ``session.items`` is empty. |
| """ |
| if args is None: |
| args = self.config.args |
|
|
| self.trace("perform_collect", self, args) |
| self.trace.root.indent += 1 |
|
|
| hook = self.config.hook |
|
|
| self._notfound = [] |
| self._initial_parts = [] |
| self._collection_cache = {} |
| self.items = [] |
| items: Sequence[nodes.Item | nodes.Collector] = self.items |
| try: |
| initialpaths: list[Path] = [] |
| initialpaths_with_parents: list[Path] = [] |
| for arg in args: |
| collection_argument = resolve_collection_argument( |
| self.config.invocation_params.dir, |
| arg, |
| as_pypath=self.config.option.pyargs, |
| ) |
| self._initial_parts.append(collection_argument) |
| initialpaths.append(collection_argument.path) |
| initialpaths_with_parents.append(collection_argument.path) |
| initialpaths_with_parents.extend(collection_argument.path.parents) |
| self._initialpaths = frozenset(initialpaths) |
| self._initialpaths_with_parents = frozenset(initialpaths_with_parents) |
|
|
| rep = collect_one_node(self) |
| self.ihook.pytest_collectreport(report=rep) |
| self.trace.root.indent -= 1 |
| if self._notfound: |
| errors = [] |
| for arg, collectors in self._notfound: |
| if collectors: |
| errors.append( |
| f"not found: {arg}\n(no match in any of {collectors!r})" |
| ) |
| else: |
| errors.append(f"found no collectors for {arg}") |
|
|
| raise UsageError(*errors) |
|
|
| if not genitems: |
| items = rep.result |
| else: |
| if rep.passed: |
| for node in rep.result: |
| self.items.extend(self.genitems(node)) |
|
|
| self.config.pluginmanager.check_pending() |
| hook.pytest_collection_modifyitems( |
| session=self, config=self.config, items=items |
| ) |
| finally: |
| self._notfound = [] |
| self._initial_parts = [] |
| self._collection_cache = {} |
| hook.pytest_collection_finish(session=self) |
|
|
| if genitems: |
| self.testscollected = len(items) |
|
|
| return items |
|
|
| def _collect_one_node( |
| self, |
| node: nodes.Collector, |
| handle_dupes: bool = True, |
| ) -> tuple[CollectReport, bool]: |
| if node in self._collection_cache and handle_dupes: |
| rep = self._collection_cache[node] |
| return rep, True |
| else: |
| rep = collect_one_node(node) |
| self._collection_cache[node] = rep |
| return rep, False |
|
|
| def collect(self) -> Iterator[nodes.Item | nodes.Collector]: |
| |
| |
| |
| path_cache: dict[Path, Sequence[nodes.Collector]] = {} |
|
|
| pm = self.config.pluginmanager |
|
|
| for collection_argument in self._initial_parts: |
| self.trace("processing argument", collection_argument) |
| self.trace.root.indent += 1 |
|
|
| argpath = collection_argument.path |
| names = collection_argument.parts |
| module_name = collection_argument.module_name |
|
|
| |
| if argpath.is_dir(): |
| assert not names, f"invalid arg {(argpath, names)!r}" |
|
|
| paths = [argpath] |
| |
| |
| if module_name is None: |
| |
| for path in argpath.parents: |
| if not pm._is_in_confcutdir(path): |
| break |
| paths.insert(0, path) |
| else: |
| |
| |
| module_name_parts = module_name.split(".") |
| for i, path in enumerate(argpath.parents, 2): |
| if i > len(module_name_parts) or path.stem != module_name_parts[-i]: |
| break |
| paths.insert(0, path) |
|
|
| |
| |
| any_matched_in_initial_part = False |
| notfound_collectors = [] |
| work: list[tuple[nodes.Collector | nodes.Item, list[Path | str]]] = [ |
| (self, [*paths, *names]) |
| ] |
| while work: |
| matchnode, matchparts = work.pop() |
|
|
| |
| if not matchparts: |
| yield matchnode |
| any_matched_in_initial_part = True |
| continue |
|
|
| |
| if not isinstance(matchnode, nodes.Collector): |
| continue |
|
|
| |
| |
| |
| subnodes: Sequence[nodes.Collector | nodes.Item] |
| if isinstance(matchnode, Session): |
| assert isinstance(matchparts[0], Path) |
| subnodes = matchnode._collect_path(matchparts[0], path_cache) |
| else: |
| |
| |
| handle_dupes = not ( |
| len(matchparts) == 1 |
| and isinstance(matchparts[0], Path) |
| and matchparts[0].is_file() |
| ) |
| rep, duplicate = self._collect_one_node(matchnode, handle_dupes) |
| if not duplicate and not rep.passed: |
| |
| |
| |
| matchnode.ihook.pytest_collectreport(report=rep) |
| if not rep.passed: |
| continue |
| subnodes = rep.result |
|
|
| |
| any_matched_in_collector = False |
| for node in reversed(subnodes): |
| |
| if isinstance(matchparts[0], Path): |
| is_match = node.path == matchparts[0] |
| if sys.platform == "win32" and not is_match: |
| |
| |
| same_file = os.path.samefile(node.path, matchparts[0]) |
| |
| |
| is_match = same_file and ( |
| os.path.islink(node.path) |
| == os.path.islink(matchparts[0]) |
| ) |
|
|
| |
| else: |
| |
| |
| is_match = ( |
| node.name == matchparts[0] |
| or node.name.split("[")[0] == matchparts[0] |
| ) |
| if is_match: |
| work.append((node, matchparts[1:])) |
| any_matched_in_collector = True |
|
|
| if not any_matched_in_collector: |
| notfound_collectors.append(matchnode) |
|
|
| if not any_matched_in_initial_part: |
| report_arg = "::".join((str(argpath), *names)) |
| self._notfound.append((report_arg, notfound_collectors)) |
|
|
| self.trace.root.indent -= 1 |
|
|
| def genitems(self, node: nodes.Item | nodes.Collector) -> Iterator[nodes.Item]: |
| self.trace("genitems", node) |
| if isinstance(node, nodes.Item): |
| node.ihook.pytest_itemcollected(item=node) |
| yield node |
| else: |
| assert isinstance(node, nodes.Collector) |
| keepduplicates = self.config.getoption("keepduplicates") |
| |
| handle_dupes = not (keepduplicates and isinstance(node, nodes.File)) |
| rep, duplicate = self._collect_one_node(node, handle_dupes) |
| if duplicate and not keepduplicates: |
| return |
| if rep.passed: |
| for subnode in rep.result: |
| yield from self.genitems(subnode) |
| if not duplicate: |
| node.ihook.pytest_collectreport(report=rep) |
|
|
|
|
| def search_pypath(module_name: str) -> str | None: |
| """Search sys.path for the given a dotted module name, and return its file |
| system path if found.""" |
| try: |
| spec = importlib.util.find_spec(module_name) |
| |
| |
| |
| except (AttributeError, ImportError, ValueError): |
| return None |
| if spec is None or spec.origin is None or spec.origin == "namespace": |
| return None |
| elif spec.submodule_search_locations: |
| return os.path.dirname(spec.origin) |
| else: |
| return spec.origin |
|
|
|
|
| @dataclasses.dataclass(frozen=True) |
| class CollectionArgument: |
| """A resolved collection argument.""" |
|
|
| path: Path |
| parts: Sequence[str] |
| module_name: str | None |
|
|
|
|
| def resolve_collection_argument( |
| invocation_path: Path, arg: str, *, as_pypath: bool = False |
| ) -> CollectionArgument: |
| """Parse path arguments optionally containing selection parts and return (fspath, names). |
| |
| Command-line arguments can point to files and/or directories, and optionally contain |
| parts for specific tests selection, for example: |
| |
| "pkg/tests/test_foo.py::TestClass::test_foo" |
| |
| This function ensures the path exists, and returns a resolved `CollectionArgument`: |
| |
| CollectionArgument( |
| path=Path("/full/path/to/pkg/tests/test_foo.py"), |
| parts=["TestClass", "test_foo"], |
| module_name=None, |
| ) |
| |
| When as_pypath is True, expects that the command-line argument actually contains |
| module paths instead of file-system paths: |
| |
| "pkg.tests.test_foo::TestClass::test_foo" |
| |
| In which case we search sys.path for a matching module, and then return the *path* to the |
| found module, which may look like this: |
| |
| CollectionArgument( |
| path=Path("/home/u/myvenv/lib/site-packages/pkg/tests/test_foo.py"), |
| parts=["TestClass", "test_foo"], |
| module_name="pkg.tests.test_foo", |
| ) |
| |
| If the path doesn't exist, raise UsageError. |
| If the path is a directory and selection parts are present, raise UsageError. |
| """ |
| base, squacket, rest = str(arg).partition("[") |
| strpath, *parts = base.split("::") |
| if parts: |
| parts[-1] = f"{parts[-1]}{squacket}{rest}" |
| module_name = None |
| if as_pypath: |
| pyarg_strpath = search_pypath(strpath) |
| if pyarg_strpath is not None: |
| module_name = strpath |
| strpath = pyarg_strpath |
| fspath = invocation_path / strpath |
| fspath = absolutepath(fspath) |
| if not safe_exists(fspath): |
| msg = ( |
| "module or package not found: {arg} (missing __init__.py?)" |
| if as_pypath |
| else "file or directory not found: {arg}" |
| ) |
| raise UsageError(msg.format(arg=arg)) |
| if parts and fspath.is_dir(): |
| msg = ( |
| "package argument cannot contain :: selection parts: {arg}" |
| if as_pypath |
| else "directory argument cannot contain :: selection parts: {arg}" |
| ) |
| raise UsageError(msg.format(arg=arg)) |
| return CollectionArgument( |
| path=fspath, |
| parts=parts, |
| module_name=module_name, |
| ) |
|
|