Spaces:
Paused
Paused
| """Core implementation of the testing process: init, session, runtest loop.""" | |
| import argparse | |
| import dataclasses | |
| import fnmatch | |
| import functools | |
| import importlib | |
| import importlib.util | |
| import os | |
| from pathlib import Path | |
| import sys | |
| from typing import AbstractSet | |
| from typing import Callable | |
| from typing import Dict | |
| from typing import final | |
| from typing import FrozenSet | |
| from typing import Iterable | |
| from typing import Iterator | |
| from typing import List | |
| from typing import Literal | |
| from typing import Optional | |
| from typing import overload | |
| from typing import Sequence | |
| from typing import Tuple | |
| from typing import TYPE_CHECKING | |
| from typing import Union | |
| import warnings | |
| import pluggy | |
| from _pytest import nodes | |
| import _pytest._code | |
| from _pytest.config import Config | |
| from _pytest.config import directory_arg | |
| from _pytest.config import ExitCode | |
| from _pytest.config import hookimpl | |
| from _pytest.config import PytestPluginManager | |
| from _pytest.config import UsageError | |
| from _pytest.config.argparsing import Parser | |
| from _pytest.config.compat import PathAwareHookProxy | |
| from _pytest.fixtures import FixtureManager | |
| from _pytest.outcomes import exit | |
| from _pytest.pathlib import absolutepath | |
| from _pytest.pathlib import bestrelpath | |
| from _pytest.pathlib import fnmatch_ex | |
| from _pytest.pathlib import safe_exists | |
| from _pytest.pathlib import scandir | |
| from _pytest.reports import CollectReport | |
| from _pytest.reports import TestReport | |
| from _pytest.runner import collect_one_node | |
| from _pytest.runner import SetupState | |
| from _pytest.warning_types import PytestWarning | |
| if TYPE_CHECKING: | |
| from typing import Self | |
| def pytest_addoption(parser: Parser) -> None: | |
| parser.addini( | |
| "norecursedirs", | |
| "Directory patterns to avoid for recursion", | |
| type="args", | |
| default=[ | |
| "*.egg", | |
| ".*", | |
| "_darcs", | |
| "build", | |
| "CVS", | |
| "dist", | |
| "node_modules", | |
| "venv", | |
| "{arch}", | |
| ], | |
| ) | |
| parser.addini( | |
| "testpaths", | |
| "Directories to search for tests when no files or directories are given on the " | |
| "command line", | |
| type="args", | |
| default=[], | |
| ) | |
| group = parser.getgroup("general", "Running and selection options") | |
| group._addoption( | |
| "-x", | |
| "--exitfirst", | |
| action="store_const", | |
| dest="maxfail", | |
| const=1, | |
| help="Exit instantly on first error or failed test", | |
| ) | |
| group = parser.getgroup("pytest-warnings") | |
| group.addoption( | |
| "-W", | |
| "--pythonwarnings", | |
| action="append", | |
| help="Set which warnings to report, see -W option of Python itself", | |
| ) | |
| parser.addini( | |
| "filterwarnings", | |
| type="linelist", | |
| help="Each line specifies a pattern for " | |
| "warnings.filterwarnings. " | |
| "Processed after -W/--pythonwarnings.", | |
| ) | |
| group._addoption( | |
| "--maxfail", | |
| metavar="num", | |
| action="store", | |
| type=int, | |
| dest="maxfail", | |
| default=0, | |
| help="Exit after first num failures or errors", | |
| ) | |
| group._addoption( | |
| "--strict-config", | |
| action="store_true", | |
| help="Any warnings encountered while parsing the `pytest` section of the " | |
| "configuration file raise errors", | |
| ) | |
| group._addoption( | |
| "--strict-markers", | |
| action="store_true", | |
| help="Markers not registered in the `markers` section of the configuration " | |
| "file raise errors", | |
| ) | |
| group._addoption( | |
| "--strict", | |
| action="store_true", | |
| help="(Deprecated) alias to --strict-markers", | |
| ) | |
| group._addoption( | |
| "-c", | |
| "--config-file", | |
| metavar="FILE", | |
| type=str, | |
| dest="inifilename", | |
| help="Load configuration from `FILE` instead of trying to locate one of the " | |
| "implicit configuration files.", | |
| ) | |
| group._addoption( | |
| "--continue-on-collection-errors", | |
| action="store_true", | |
| default=False, | |
| dest="continue_on_collection_errors", | |
| help="Force test execution even if collection errors occur", | |
| ) | |
| group._addoption( | |
| "--rootdir", | |
| action="store", | |
| dest="rootdir", | |
| help="Define root directory for tests. Can be relative path: 'root_dir', './root_dir', " | |
| "'root_dir/another_dir/'; absolute path: '/home/user/root_dir'; path with variables: " | |
| "'$HOME/root_dir'.", | |
| ) | |
| group = parser.getgroup("collect", "collection") | |
| group.addoption( | |
| "--collectonly", | |
| "--collect-only", | |
| "--co", | |
| action="store_true", | |
| help="Only collect tests, don't execute them", | |
| ) | |
| group.addoption( | |
| "--pyargs", | |
| action="store_true", | |
| help="Try to interpret all arguments as Python packages", | |
| ) | |
| group.addoption( | |
| "--ignore", | |
| action="append", | |
| metavar="path", | |
| help="Ignore path during collection (multi-allowed)", | |
| ) | |
| group.addoption( | |
| "--ignore-glob", | |
| action="append", | |
| metavar="path", | |
| help="Ignore path pattern during collection (multi-allowed)", | |
| ) | |
| group.addoption( | |
| "--deselect", | |
| action="append", | |
| metavar="nodeid_prefix", | |
| help="Deselect item (via node id prefix) during collection (multi-allowed)", | |
| ) | |
| group.addoption( | |
| "--confcutdir", | |
| dest="confcutdir", | |
| default=None, | |
| metavar="dir", | |
| type=functools.partial(directory_arg, optname="--confcutdir"), | |
| help="Only load conftest.py's relative to specified dir", | |
| ) | |
| group.addoption( | |
| "--noconftest", | |
| action="store_true", | |
| dest="noconftest", | |
| default=False, | |
| help="Don't load any conftest.py files", | |
| ) | |
| group.addoption( | |
| "--keepduplicates", | |
| "--keep-duplicates", | |
| action="store_true", | |
| dest="keepduplicates", | |
| default=False, | |
| help="Keep duplicate tests", | |
| ) | |
| group.addoption( | |
| "--collect-in-virtualenv", | |
| action="store_true", | |
| dest="collect_in_virtualenv", | |
| default=False, | |
| help="Don't ignore tests in a local virtualenv directory", | |
| ) | |
| group.addoption( | |
| "--import-mode", | |
| default="prepend", | |
| choices=["prepend", "append", "importlib"], | |
| dest="importmode", | |
| help="Prepend/append to sys.path when importing test modules and conftest " | |
| "files. Default: prepend.", | |
| ) | |
| parser.addini( | |
| "consider_namespace_packages", | |
| type="bool", | |
| default=False, | |
| help="Consider namespace packages when resolving module names during import", | |
| ) | |
| group = parser.getgroup("debugconfig", "test session debugging and configuration") | |
| group.addoption( | |
| "--basetemp", | |
| dest="basetemp", | |
| default=None, | |
| type=validate_basetemp, | |
| metavar="dir", | |
| help=( | |
| "Base temporary directory for this test run. " | |
| "(Warning: this directory is removed if it exists.)" | |
| ), | |
| ) | |
| def validate_basetemp(path: str) -> str: | |
| # GH 7119 | |
| msg = "basetemp must not be empty, the current working directory or any parent directory of it" | |
| # empty path | |
| if not path: | |
| raise argparse.ArgumentTypeError(msg) | |
| def is_ancestor(base: Path, query: Path) -> bool: | |
| """Return whether query is an ancestor of base.""" | |
| if base == query: | |
| return True | |
| return query in base.parents | |
| # check if path is an ancestor of cwd | |
| if is_ancestor(Path.cwd(), Path(path).absolute()): | |
| raise argparse.ArgumentTypeError(msg) | |
| # check symlinks for ancestors | |
| if is_ancestor(Path.cwd().resolve(), Path(path).resolve()): | |
| raise argparse.ArgumentTypeError(msg) | |
| return path | |
| def wrap_session( | |
| config: Config, doit: Callable[[Config, "Session"], Optional[Union[int, ExitCode]]] | |
| ) -> Union[int, ExitCode]: | |
| """Skeleton command line program.""" | |
| session = Session.from_config(config) | |
| session.exitstatus = ExitCode.OK | |
| initstate = 0 | |
| try: | |
| try: | |
| config._do_configure() | |
| initstate = 1 | |
| config.hook.pytest_sessionstart(session=session) | |
| initstate = 2 | |
| session.exitstatus = doit(config, session) or 0 | |
| except UsageError: | |
| session.exitstatus = ExitCode.USAGE_ERROR | |
| raise | |
| except Failed: | |
| session.exitstatus = ExitCode.TESTS_FAILED | |
| except (KeyboardInterrupt, exit.Exception): | |
| excinfo = _pytest._code.ExceptionInfo.from_current() | |
| exitstatus: Union[int, ExitCode] = ExitCode.INTERRUPTED | |
| if isinstance(excinfo.value, exit.Exception): | |
| if excinfo.value.returncode is not None: | |
| exitstatus = excinfo.value.returncode | |
| if initstate < 2: | |
| sys.stderr.write(f"{excinfo.typename}: {excinfo.value.msg}\n") | |
| config.hook.pytest_keyboard_interrupt(excinfo=excinfo) | |
| session.exitstatus = exitstatus | |
| except BaseException: | |
| session.exitstatus = ExitCode.INTERNAL_ERROR | |
| excinfo = _pytest._code.ExceptionInfo.from_current() | |
| try: | |
| config.notify_exception(excinfo, config.option) | |
| except exit.Exception as exc: | |
| if exc.returncode is not None: | |
| session.exitstatus = exc.returncode | |
| sys.stderr.write(f"{type(exc).__name__}: {exc}\n") | |
| else: | |
| if isinstance(excinfo.value, SystemExit): | |
| sys.stderr.write("mainloop: caught unexpected SystemExit!\n") | |
| finally: | |
| # Explicitly break reference cycle. | |
| excinfo = None # type: ignore | |
| os.chdir(session.startpath) | |
| if initstate >= 2: | |
| try: | |
| config.hook.pytest_sessionfinish( | |
| session=session, exitstatus=session.exitstatus | |
| ) | |
| except exit.Exception as exc: | |
| if exc.returncode is not None: | |
| session.exitstatus = exc.returncode | |
| sys.stderr.write(f"{type(exc).__name__}: {exc}\n") | |
| config._ensure_unconfigure() | |
| return session.exitstatus | |
| def pytest_cmdline_main(config: Config) -> Union[int, ExitCode]: | |
| return wrap_session(config, _main) | |
| def _main(config: Config, session: "Session") -> Optional[Union[int, ExitCode]]: | |
| """Default command line protocol for initialization, session, | |
| running tests and reporting.""" | |
| config.hook.pytest_collection(session=session) | |
| config.hook.pytest_runtestloop(session=session) | |
| if session.testsfailed: | |
| return ExitCode.TESTS_FAILED | |
| elif session.testscollected == 0: | |
| return ExitCode.NO_TESTS_COLLECTED | |
| return None | |
| def pytest_collection(session: "Session") -> None: | |
| session.perform_collect() | |
| def pytest_runtestloop(session: "Session") -> bool: | |
| if session.testsfailed and not session.config.option.continue_on_collection_errors: | |
| raise session.Interrupted( | |
| "%d error%s during collection" | |
| % (session.testsfailed, "s" if session.testsfailed != 1 else "") | |
| ) | |
| if session.config.option.collectonly: | |
| return True | |
| for i, item in enumerate(session.items): | |
| nextitem = session.items[i + 1] if i + 1 < len(session.items) else None | |
| item.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem) | |
| if session.shouldfail: | |
| raise session.Failed(session.shouldfail) | |
| if session.shouldstop: | |
| raise session.Interrupted(session.shouldstop) | |
| return True | |
| def _in_venv(path: Path) -> bool: | |
| """Attempt to detect if ``path`` is the root of a Virtual Environment by | |
| checking for the existence of the appropriate activate script.""" | |
| bindir = path.joinpath("Scripts" if sys.platform.startswith("win") else "bin") | |
| try: | |
| if not bindir.is_dir(): | |
| return False | |
| except OSError: | |
| return False | |
| activates = ( | |
| "activate", | |
| "activate.csh", | |
| "activate.fish", | |
| "Activate", | |
| "Activate.bat", | |
| "Activate.ps1", | |
| ) | |
| return any(fname.name in activates for fname in bindir.iterdir()) | |
| def pytest_ignore_collect(collection_path: Path, config: Config) -> Optional[bool]: | |
| if collection_path.name == "__pycache__": | |
| return True | |
| ignore_paths = config._getconftest_pathlist( | |
| "collect_ignore", path=collection_path.parent | |
| ) | |
| ignore_paths = ignore_paths or [] | |
| excludeopt = config.getoption("ignore") | |
| if excludeopt: | |
| ignore_paths.extend(absolutepath(x) for x in excludeopt) | |
| if collection_path in ignore_paths: | |
| return True | |
| ignore_globs = config._getconftest_pathlist( | |
| "collect_ignore_glob", path=collection_path.parent | |
| ) | |
| ignore_globs = ignore_globs or [] | |
| excludeglobopt = config.getoption("ignore_glob") | |
| if excludeglobopt: | |
| ignore_globs.extend(absolutepath(x) for x in excludeglobopt) | |
| if any(fnmatch.fnmatch(str(collection_path), str(glob)) for glob in ignore_globs): | |
| return True | |
| allow_in_venv = config.getoption("collect_in_virtualenv") | |
| if not allow_in_venv and _in_venv(collection_path): | |
| return True | |
| if collection_path.is_dir(): | |
| norecursepatterns = config.getini("norecursedirs") | |
| if any(fnmatch_ex(pat, collection_path) for pat in norecursepatterns): | |
| return True | |
| return None | |
| def pytest_collect_directory( | |
| path: Path, parent: nodes.Collector | |
| ) -> Optional[nodes.Collector]: | |
| return Dir.from_parent(parent, path=path) | |
| def pytest_collection_modifyitems(items: List[nodes.Item], config: Config) -> None: | |
| deselect_prefixes = tuple(config.getoption("deselect") or []) | |
| if not deselect_prefixes: | |
| return | |
| remaining = [] | |
| deselected = [] | |
| for colitem in items: | |
| if colitem.nodeid.startswith(deselect_prefixes): | |
| deselected.append(colitem) | |
| else: | |
| remaining.append(colitem) | |
| if deselected: | |
| config.hook.pytest_deselected(items=deselected) | |
| items[:] = remaining | |
| class FSHookProxy: | |
| def __init__( | |
| self, | |
| pm: PytestPluginManager, | |
| remove_mods: AbstractSet[object], | |
| ) -> None: | |
| self.pm = pm | |
| self.remove_mods = remove_mods | |
| def __getattr__(self, name: str) -> pluggy.HookCaller: | |
| x = self.pm.subset_hook_caller(name, remove_plugins=self.remove_mods) | |
| self.__dict__[name] = x | |
| return x | |
| class Interrupted(KeyboardInterrupt): | |
| """Signals that the test run was interrupted.""" | |
| __module__ = "builtins" # For py3. | |
| class Failed(Exception): | |
| """Signals a stop as failed test run.""" | |
| class _bestrelpath_cache(Dict[Path, str]): | |
| __slots__ = ("path",) | |
| path: Path | |
| def __missing__(self, path: Path) -> str: | |
| r = bestrelpath(self.path, path) | |
| self[path] = r | |
| return r | |
| class Dir(nodes.Directory): | |
| """Collector of files in a file system directory. | |
| .. versionadded:: 8.0 | |
| .. note:: | |
| Python directories with an `__init__.py` file are instead collected by | |
| :class:`~pytest.Package` by default. Both are :class:`~pytest.Directory` | |
| collectors. | |
| """ | |
| def from_parent( # type: ignore[override] | |
| cls, | |
| parent: nodes.Collector, | |
| *, | |
| path: Path, | |
| ) -> "Self": | |
| """The public constructor. | |
| :param parent: The parent collector of this Dir. | |
| :param path: The directory's path. | |
| """ | |
| return super().from_parent(parent=parent, path=path) | |
| def collect(self) -> Iterable[Union[nodes.Item, nodes.Collector]]: | |
| config = self.config | |
| col: Optional[nodes.Collector] | |
| cols: Sequence[nodes.Collector] | |
| ihook = self.ihook | |
| for direntry in scandir(self.path): | |
| if direntry.is_dir(): | |
| path = Path(direntry.path) | |
| if not self.session.isinitpath(path, with_parents=True): | |
| if ihook.pytest_ignore_collect(collection_path=path, config=config): | |
| continue | |
| col = ihook.pytest_collect_directory(path=path, parent=self) | |
| if col is not None: | |
| yield col | |
| elif direntry.is_file(): | |
| path = Path(direntry.path) | |
| if not self.session.isinitpath(path): | |
| if ihook.pytest_ignore_collect(collection_path=path, config=config): | |
| continue | |
| cols = ihook.pytest_collect_file(file_path=path, parent=self) | |
| yield from cols | |
| class Session(nodes.Collector): | |
| """The root of the collection tree. | |
| ``Session`` collects the initial paths given as arguments to pytest. | |
| """ | |
| Interrupted = Interrupted | |
| Failed = Failed | |
| # Set on the session by runner.pytest_sessionstart. | |
| _setupstate: SetupState | |
| # Set on the session by fixtures.pytest_sessionstart. | |
| _fixturemanager: FixtureManager | |
| exitstatus: Union[int, ExitCode] | |
| def __init__(self, config: Config) -> None: | |
| super().__init__( | |
| name="", | |
| path=config.rootpath, | |
| fspath=None, | |
| parent=None, | |
| config=config, | |
| session=self, | |
| nodeid="", | |
| ) | |
| self.testsfailed = 0 | |
| self.testscollected = 0 | |
| self._shouldstop: Union[bool, str] = False | |
| self._shouldfail: Union[bool, str] = False | |
| self.trace = config.trace.root.get("collection") | |
| self._initialpaths: FrozenSet[Path] = frozenset() | |
| self._initialpaths_with_parents: FrozenSet[Path] = frozenset() | |
| self._notfound: List[Tuple[str, Sequence[nodes.Collector]]] = [] | |
| self._initial_parts: List[CollectionArgument] = [] | |
| self._collection_cache: Dict[nodes.Collector, CollectReport] = {} | |
| self.items: List[nodes.Item] = [] | |
| self._bestrelpathcache: Dict[Path, str] = _bestrelpath_cache(config.rootpath) | |
| self.config.pluginmanager.register(self, name="session") | |
| def from_config(cls, config: Config) -> "Session": | |
| session: Session = cls._create(config=config) | |
| return session | |
| def __repr__(self) -> str: | |
| return "<%s %s exitstatus=%r testsfailed=%d testscollected=%d>" % ( | |
| self.__class__.__name__, | |
| self.name, | |
| getattr(self, "exitstatus", "<UNSET>"), | |
| self.testsfailed, | |
| self.testscollected, | |
| ) | |
| def shouldstop(self) -> Union[bool, str]: | |
| return self._shouldstop | |
| def shouldstop(self, value: Union[bool, str]) -> None: | |
| # The runner checks shouldfail and assumes that if it is set we are | |
| # definitely stopping, so prevent unsetting it. | |
| if value is False and self._shouldstop: | |
| warnings.warn( | |
| PytestWarning( | |
| "session.shouldstop cannot be unset after it has been set; ignoring." | |
| ), | |
| stacklevel=2, | |
| ) | |
| return | |
| self._shouldstop = value | |
| def shouldfail(self) -> Union[bool, str]: | |
| return self._shouldfail | |
| def shouldfail(self, value: Union[bool, str]) -> None: | |
| # The runner checks shouldfail and assumes that if it is set we are | |
| # definitely stopping, so prevent unsetting it. | |
| if value is False and self._shouldfail: | |
| warnings.warn( | |
| PytestWarning( | |
| "session.shouldfail cannot be unset after it has been set; ignoring." | |
| ), | |
| stacklevel=2, | |
| ) | |
| return | |
| self._shouldfail = value | |
| def startpath(self) -> Path: | |
| """The path from which pytest was invoked. | |
| .. versionadded:: 7.0.0 | |
| """ | |
| return self.config.invocation_params.dir | |
| def _node_location_to_relpath(self, node_path: Path) -> str: | |
| # bestrelpath is a quite slow function. | |
| return self._bestrelpathcache[node_path] | |
| def pytest_collectstart(self) -> None: | |
| if self.shouldfail: | |
| raise self.Failed(self.shouldfail) | |
| if self.shouldstop: | |
| raise self.Interrupted(self.shouldstop) | |
| def pytest_runtest_logreport( | |
| self, report: Union[TestReport, CollectReport] | |
| ) -> None: | |
| if report.failed and not hasattr(report, "wasxfail"): | |
| self.testsfailed += 1 | |
| maxfail = self.config.getvalue("maxfail") | |
| if maxfail and self.testsfailed >= maxfail: | |
| self.shouldfail = "stopping after %d failures" % (self.testsfailed) | |
| pytest_collectreport = pytest_runtest_logreport | |
| def isinitpath( | |
| self, | |
| path: Union[str, "os.PathLike[str]"], | |
| *, | |
| with_parents: bool = False, | |
| ) -> bool: | |
| """Is path an initial path? | |
| An initial path is a path explicitly given to pytest on the command | |
| line. | |
| :param with_parents: | |
| If set, also return True if the path is a parent of an initial path. | |
| .. versionchanged:: 8.0 | |
| Added the ``with_parents`` parameter. | |
| """ | |
| # Optimization: Path(Path(...)) is much slower than isinstance. | |
| path_ = path if isinstance(path, Path) else Path(path) | |
| if with_parents: | |
| return path_ in self._initialpaths_with_parents | |
| else: | |
| return path_ in self._initialpaths | |
| def gethookproxy(self, fspath: "os.PathLike[str]") -> pluggy.HookRelay: | |
| # Optimization: Path(Path(...)) is much slower than isinstance. | |
| path = fspath if isinstance(fspath, Path) else Path(fspath) | |
| pm = self.config.pluginmanager | |
| # Check if we have the common case of running | |
| # hooks with all conftest.py files. | |
| my_conftestmodules = pm._getconftestmodules(path) | |
| remove_mods = pm._conftest_plugins.difference(my_conftestmodules) | |
| proxy: pluggy.HookRelay | |
| if remove_mods: | |
| # One or more conftests are not in use at this path. | |
| proxy = PathAwareHookProxy(FSHookProxy(pm, remove_mods)) # type: ignore[arg-type,assignment] | |
| else: | |
| # All plugins are active for this fspath. | |
| proxy = self.config.hook | |
| return proxy | |
| def _collect_path( | |
| self, | |
| path: Path, | |
| path_cache: Dict[Path, Sequence[nodes.Collector]], | |
| ) -> Sequence[nodes.Collector]: | |
| """Create a Collector for the given path. | |
| `path_cache` makes it so the same Collectors are returned for the same | |
| path. | |
| """ | |
| if path in path_cache: | |
| return path_cache[path] | |
| if path.is_dir(): | |
| ihook = self.gethookproxy(path.parent) | |
| col: Optional[nodes.Collector] = ihook.pytest_collect_directory( | |
| path=path, parent=self | |
| ) | |
| cols: Sequence[nodes.Collector] = (col,) if col is not None else () | |
| elif path.is_file(): | |
| ihook = self.gethookproxy(path) | |
| cols = ihook.pytest_collect_file(file_path=path, parent=self) | |
| else: | |
| # Broken symlink or invalid/missing file. | |
| cols = () | |
| path_cache[path] = cols | |
| return cols | |
| def perform_collect( | |
| self, args: Optional[Sequence[str]] = ..., genitems: "Literal[True]" = ... | |
| ) -> Sequence[nodes.Item]: ... | |
| def perform_collect( | |
| self, args: Optional[Sequence[str]] = ..., genitems: bool = ... | |
| ) -> Sequence[Union[nodes.Item, nodes.Collector]]: ... | |
| def perform_collect( | |
| self, args: Optional[Sequence[str]] = None, genitems: bool = True | |
| ) -> Sequence[Union[nodes.Item, nodes.Collector]]: | |
| """Perform the collection phase for this session. | |
| This is called by the default :hook:`pytest_collection` hook | |
| implementation; see the documentation of this hook for more details. | |
| For testing purposes, it may also be called directly on a fresh | |
| ``Session``. | |
| This function normally recursively expands any collectors collected | |
| from the session to their items, and only items are returned. For | |
| testing purposes, this may be suppressed by passing ``genitems=False``, | |
| in which case the return value contains these collectors unexpanded, | |
| and ``session.items`` is empty. | |
| """ | |
| if args is None: | |
| args = self.config.args | |
| self.trace("perform_collect", self, args) | |
| self.trace.root.indent += 1 | |
| hook = self.config.hook | |
| self._notfound = [] | |
| self._initial_parts = [] | |
| self._collection_cache = {} | |
| self.items = [] | |
| items: Sequence[Union[nodes.Item, nodes.Collector]] = self.items | |
| try: | |
| initialpaths: List[Path] = [] | |
| initialpaths_with_parents: List[Path] = [] | |
| for arg in args: | |
| collection_argument = resolve_collection_argument( | |
| self.config.invocation_params.dir, | |
| arg, | |
| as_pypath=self.config.option.pyargs, | |
| ) | |
| self._initial_parts.append(collection_argument) | |
| initialpaths.append(collection_argument.path) | |
| initialpaths_with_parents.append(collection_argument.path) | |
| initialpaths_with_parents.extend(collection_argument.path.parents) | |
| self._initialpaths = frozenset(initialpaths) | |
| self._initialpaths_with_parents = frozenset(initialpaths_with_parents) | |
| rep = collect_one_node(self) | |
| self.ihook.pytest_collectreport(report=rep) | |
| self.trace.root.indent -= 1 | |
| if self._notfound: | |
| errors = [] | |
| for arg, collectors in self._notfound: | |
| if collectors: | |
| errors.append( | |
| f"not found: {arg}\n(no match in any of {collectors!r})" | |
| ) | |
| else: | |
| errors.append(f"found no collectors for {arg}") | |
| raise UsageError(*errors) | |
| if not genitems: | |
| items = rep.result | |
| else: | |
| if rep.passed: | |
| for node in rep.result: | |
| self.items.extend(self.genitems(node)) | |
| self.config.pluginmanager.check_pending() | |
| hook.pytest_collection_modifyitems( | |
| session=self, config=self.config, items=items | |
| ) | |
| finally: | |
| self._notfound = [] | |
| self._initial_parts = [] | |
| self._collection_cache = {} | |
| hook.pytest_collection_finish(session=self) | |
| if genitems: | |
| self.testscollected = len(items) | |
| return items | |
| def _collect_one_node( | |
| self, | |
| node: nodes.Collector, | |
| handle_dupes: bool = True, | |
| ) -> Tuple[CollectReport, bool]: | |
| if node in self._collection_cache and handle_dupes: | |
| rep = self._collection_cache[node] | |
| return rep, True | |
| else: | |
| rep = collect_one_node(node) | |
| self._collection_cache[node] = rep | |
| return rep, False | |
| def collect(self) -> Iterator[Union[nodes.Item, nodes.Collector]]: | |
| # This is a cache for the root directories of the initial paths. | |
| # We can't use collection_cache for Session because of its special | |
| # role as the bootstrapping collector. | |
| path_cache: Dict[Path, Sequence[nodes.Collector]] = {} | |
| pm = self.config.pluginmanager | |
| for collection_argument in self._initial_parts: | |
| self.trace("processing argument", collection_argument) | |
| self.trace.root.indent += 1 | |
| argpath = collection_argument.path | |
| names = collection_argument.parts | |
| module_name = collection_argument.module_name | |
| # resolve_collection_argument() ensures this. | |
| if argpath.is_dir(): | |
| assert not names, f"invalid arg {(argpath, names)!r}" | |
| paths = [argpath] | |
| # Add relevant parents of the path, from the root, e.g. | |
| # /a/b/c.py -> [/, /a, /a/b, /a/b/c.py] | |
| if module_name is None: | |
| # Paths outside of the confcutdir should not be considered. | |
| for path in argpath.parents: | |
| if not pm._is_in_confcutdir(path): | |
| break | |
| paths.insert(0, path) | |
| else: | |
| # For --pyargs arguments, only consider paths matching the module | |
| # name. Paths beyond the package hierarchy are not included. | |
| module_name_parts = module_name.split(".") | |
| for i, path in enumerate(argpath.parents, 2): | |
| if i > len(module_name_parts) or path.stem != module_name_parts[-i]: | |
| break | |
| paths.insert(0, path) | |
| # Start going over the parts from the root, collecting each level | |
| # and discarding all nodes which don't match the level's part. | |
| any_matched_in_initial_part = False | |
| notfound_collectors = [] | |
| work: List[ | |
| Tuple[Union[nodes.Collector, nodes.Item], List[Union[Path, str]]] | |
| ] = [(self, [*paths, *names])] | |
| while work: | |
| matchnode, matchparts = work.pop() | |
| # Pop'd all of the parts, this is a match. | |
| if not matchparts: | |
| yield matchnode | |
| any_matched_in_initial_part = True | |
| continue | |
| # Should have been matched by now, discard. | |
| if not isinstance(matchnode, nodes.Collector): | |
| continue | |
| # Collect this level of matching. | |
| # Collecting Session (self) is done directly to avoid endless | |
| # recursion to this function. | |
| subnodes: Sequence[Union[nodes.Collector, nodes.Item]] | |
| if isinstance(matchnode, Session): | |
| assert isinstance(matchparts[0], Path) | |
| subnodes = matchnode._collect_path(matchparts[0], path_cache) | |
| else: | |
| # For backward compat, files given directly multiple | |
| # times on the command line should not be deduplicated. | |
| handle_dupes = not ( | |
| len(matchparts) == 1 | |
| and isinstance(matchparts[0], Path) | |
| and matchparts[0].is_file() | |
| ) | |
| rep, duplicate = self._collect_one_node(matchnode, handle_dupes) | |
| if not duplicate and not rep.passed: | |
| # Report collection failures here to avoid failing to | |
| # run some test specified in the command line because | |
| # the module could not be imported (#134). | |
| matchnode.ihook.pytest_collectreport(report=rep) | |
| if not rep.passed: | |
| continue | |
| subnodes = rep.result | |
| # Prune this level. | |
| any_matched_in_collector = False | |
| for node in reversed(subnodes): | |
| # Path part e.g. `/a/b/` in `/a/b/test_file.py::TestIt::test_it`. | |
| if isinstance(matchparts[0], Path): | |
| is_match = node.path == matchparts[0] | |
| if sys.platform == "win32" and not is_match: | |
| # In case the file paths do not match, fallback to samefile() to | |
| # account for short-paths on Windows (#11895). | |
| same_file = os.path.samefile(node.path, matchparts[0]) | |
| # We don't want to match links to the current node, | |
| # otherwise we would match the same file more than once (#12039). | |
| is_match = same_file and ( | |
| os.path.islink(node.path) | |
| == os.path.islink(matchparts[0]) | |
| ) | |
| # Name part e.g. `TestIt` in `/a/b/test_file.py::TestIt::test_it`. | |
| else: | |
| # TODO: Remove parametrized workaround once collection structure contains | |
| # parametrization. | |
| is_match = ( | |
| node.name == matchparts[0] | |
| or node.name.split("[")[0] == matchparts[0] | |
| ) | |
| if is_match: | |
| work.append((node, matchparts[1:])) | |
| any_matched_in_collector = True | |
| if not any_matched_in_collector: | |
| notfound_collectors.append(matchnode) | |
| if not any_matched_in_initial_part: | |
| report_arg = "::".join((str(argpath), *names)) | |
| self._notfound.append((report_arg, notfound_collectors)) | |
| self.trace.root.indent -= 1 | |
| def genitems( | |
| self, node: Union[nodes.Item, nodes.Collector] | |
| ) -> Iterator[nodes.Item]: | |
| self.trace("genitems", node) | |
| if isinstance(node, nodes.Item): | |
| node.ihook.pytest_itemcollected(item=node) | |
| yield node | |
| else: | |
| assert isinstance(node, nodes.Collector) | |
| keepduplicates = self.config.getoption("keepduplicates") | |
| # For backward compat, dedup only applies to files. | |
| handle_dupes = not (keepduplicates and isinstance(node, nodes.File)) | |
| rep, duplicate = self._collect_one_node(node, handle_dupes) | |
| if duplicate and not keepduplicates: | |
| return | |
| if rep.passed: | |
| for subnode in rep.result: | |
| yield from self.genitems(subnode) | |
| if not duplicate: | |
| node.ihook.pytest_collectreport(report=rep) | |
| def search_pypath(module_name: str) -> Optional[str]: | |
| """Search sys.path for the given a dotted module name, and return its file | |
| system path if found.""" | |
| try: | |
| spec = importlib.util.find_spec(module_name) | |
| # AttributeError: looks like package module, but actually filename | |
| # ImportError: module does not exist | |
| # ValueError: not a module name | |
| except (AttributeError, ImportError, ValueError): | |
| return None | |
| if spec is None or spec.origin is None or spec.origin == "namespace": | |
| return None | |
| elif spec.submodule_search_locations: | |
| return os.path.dirname(spec.origin) | |
| else: | |
| return spec.origin | |
| class CollectionArgument: | |
| """A resolved collection argument.""" | |
| path: Path | |
| parts: Sequence[str] | |
| module_name: Optional[str] | |
| def resolve_collection_argument( | |
| invocation_path: Path, arg: str, *, as_pypath: bool = False | |
| ) -> CollectionArgument: | |
| """Parse path arguments optionally containing selection parts and return (fspath, names). | |
| Command-line arguments can point to files and/or directories, and optionally contain | |
| parts for specific tests selection, for example: | |
| "pkg/tests/test_foo.py::TestClass::test_foo" | |
| This function ensures the path exists, and returns a resolved `CollectionArgument`: | |
| CollectionArgument( | |
| path=Path("/full/path/to/pkg/tests/test_foo.py"), | |
| parts=["TestClass", "test_foo"], | |
| module_name=None, | |
| ) | |
| When as_pypath is True, expects that the command-line argument actually contains | |
| module paths instead of file-system paths: | |
| "pkg.tests.test_foo::TestClass::test_foo" | |
| In which case we search sys.path for a matching module, and then return the *path* to the | |
| found module, which may look like this: | |
| CollectionArgument( | |
| path=Path("/home/u/myvenv/lib/site-packages/pkg/tests/test_foo.py"), | |
| parts=["TestClass", "test_foo"], | |
| module_name="pkg.tests.test_foo", | |
| ) | |
| If the path doesn't exist, raise UsageError. | |
| If the path is a directory and selection parts are present, raise UsageError. | |
| """ | |
| base, squacket, rest = str(arg).partition("[") | |
| strpath, *parts = base.split("::") | |
| if parts: | |
| parts[-1] = f"{parts[-1]}{squacket}{rest}" | |
| module_name = None | |
| if as_pypath: | |
| pyarg_strpath = search_pypath(strpath) | |
| if pyarg_strpath is not None: | |
| module_name = strpath | |
| strpath = pyarg_strpath | |
| fspath = invocation_path / strpath | |
| fspath = absolutepath(fspath) | |
| if not safe_exists(fspath): | |
| msg = ( | |
| "module or package not found: {arg} (missing __init__.py?)" | |
| if as_pypath | |
| else "file or directory not found: {arg}" | |
| ) | |
| raise UsageError(msg.format(arg=arg)) | |
| if parts and fspath.is_dir(): | |
| msg = ( | |
| "package argument cannot contain :: selection parts: {arg}" | |
| if as_pypath | |
| else "directory argument cannot contain :: selection parts: {arg}" | |
| ) | |
| raise UsageError(msg.format(arg=arg)) | |
| return CollectionArgument( | |
| path=fspath, | |
| parts=parts, | |
| module_name=module_name, | |
| ) | |