Spaces:
Paused
Paused
| # mypy: allow-untyped-defs | |
| import abc | |
| from collections import defaultdict | |
| from collections import deque | |
| import dataclasses | |
| import functools | |
| import inspect | |
| import os | |
| from pathlib import Path | |
| import sys | |
| from typing import AbstractSet | |
| from typing import Any | |
| from typing import Callable | |
| from typing import cast | |
| from typing import Dict | |
| from typing import Final | |
| from typing import final | |
| from typing import Generator | |
| from typing import Generic | |
| from typing import Iterable | |
| from typing import Iterator | |
| from typing import List | |
| from typing import MutableMapping | |
| from typing import NoReturn | |
| from typing import Optional | |
| from typing import OrderedDict | |
| from typing import overload | |
| from typing import Sequence | |
| from typing import Set | |
| from typing import Tuple | |
| from typing import TYPE_CHECKING | |
| from typing import TypeVar | |
| from typing import Union | |
| import warnings | |
| import _pytest | |
| from _pytest import nodes | |
| from _pytest._code import getfslineno | |
| from _pytest._code import Source | |
| from _pytest._code.code import FormattedExcinfo | |
| from _pytest._code.code import TerminalRepr | |
| from _pytest._io import TerminalWriter | |
| from _pytest.compat import _PytestWrapper | |
| from _pytest.compat import assert_never | |
| from _pytest.compat import get_real_func | |
| from _pytest.compat import get_real_method | |
| from _pytest.compat import getfuncargnames | |
| from _pytest.compat import getimfunc | |
| from _pytest.compat import getlocation | |
| from _pytest.compat import is_generator | |
| from _pytest.compat import NOTSET | |
| from _pytest.compat import NotSetType | |
| from _pytest.compat import safe_getattr | |
| from _pytest.config import _PluggyPlugin | |
| from _pytest.config import Config | |
| from _pytest.config import ExitCode | |
| from _pytest.config.argparsing import Parser | |
| from _pytest.deprecated import check_ispytest | |
| from _pytest.deprecated import MARKED_FIXTURE | |
| from _pytest.deprecated import YIELD_FIXTURE | |
| from _pytest.mark import Mark | |
| from _pytest.mark import ParameterSet | |
| from _pytest.mark.structures import MarkDecorator | |
| from _pytest.outcomes import fail | |
| from _pytest.outcomes import skip | |
| from _pytest.outcomes import TEST_OUTCOME | |
| from _pytest.pathlib import absolutepath | |
| from _pytest.pathlib import bestrelpath | |
| from _pytest.scope import _ScopeName | |
| from _pytest.scope import HIGH_SCOPES | |
| from _pytest.scope import Scope | |
| if sys.version_info < (3, 11): | |
| from exceptiongroup import BaseExceptionGroup | |
| if TYPE_CHECKING: | |
| from _pytest.main import Session | |
| from _pytest.python import CallSpec2 | |
| from _pytest.python import Function | |
| from _pytest.python import Metafunc | |
| # The value of the fixture -- return/yield of the fixture function (type variable). | |
| FixtureValue = TypeVar("FixtureValue") | |
| # The type of the fixture function (type variable). | |
| FixtureFunction = TypeVar("FixtureFunction", bound=Callable[..., object]) | |
| # The type of a fixture function (type alias generic in fixture value). | |
| _FixtureFunc = Union[ | |
| Callable[..., FixtureValue], Callable[..., Generator[FixtureValue, None, None]] | |
| ] | |
| # The type of FixtureDef.cached_result (type alias generic in fixture value). | |
| _FixtureCachedResult = Union[ | |
| Tuple[ | |
| # The result. | |
| FixtureValue, | |
| # Cache key. | |
| object, | |
| None, | |
| ], | |
| Tuple[ | |
| None, | |
| # Cache key. | |
| object, | |
| # Exception if raised. | |
| BaseException, | |
| ], | |
| ] | |
| class PseudoFixtureDef(Generic[FixtureValue]): | |
| cached_result: "_FixtureCachedResult[FixtureValue]" | |
| _scope: Scope | |
| def pytest_sessionstart(session: "Session") -> None: | |
| session._fixturemanager = FixtureManager(session) | |
| def get_scope_package( | |
| node: nodes.Item, | |
| fixturedef: "FixtureDef[object]", | |
| ) -> Optional[nodes.Node]: | |
| from _pytest.python import Package | |
| for parent in node.iter_parents(): | |
| if isinstance(parent, Package) and parent.nodeid == fixturedef.baseid: | |
| return parent | |
| return node.session | |
| def get_scope_node(node: nodes.Node, scope: Scope) -> Optional[nodes.Node]: | |
| import _pytest.python | |
| if scope is Scope.Function: | |
| # Type ignored because this is actually safe, see: | |
| # https://github.com/python/mypy/issues/4717 | |
| return node.getparent(nodes.Item) # type: ignore[type-abstract] | |
| elif scope is Scope.Class: | |
| return node.getparent(_pytest.python.Class) | |
| elif scope is Scope.Module: | |
| return node.getparent(_pytest.python.Module) | |
| elif scope is Scope.Package: | |
| return node.getparent(_pytest.python.Package) | |
| elif scope is Scope.Session: | |
| return node.getparent(_pytest.main.Session) | |
| else: | |
| assert_never(scope) | |
| def getfixturemarker(obj: object) -> Optional["FixtureFunctionMarker"]: | |
| """Return fixturemarker or None if it doesn't exist or raised | |
| exceptions.""" | |
| return cast( | |
| Optional[FixtureFunctionMarker], | |
| safe_getattr(obj, "_pytestfixturefunction", None), | |
| ) | |
| class FixtureArgKey: | |
| argname: str | |
| param_index: int | |
| scoped_item_path: Optional[Path] | |
| item_cls: Optional[type] | |
| def get_parametrized_fixture_keys( | |
| item: nodes.Item, scope: Scope | |
| ) -> Iterator[FixtureArgKey]: | |
| """Return list of keys for all parametrized arguments which match | |
| the specified scope.""" | |
| assert scope is not Scope.Function | |
| try: | |
| callspec: CallSpec2 = item.callspec # type: ignore[attr-defined] | |
| except AttributeError: | |
| return | |
| for argname in callspec.indices: | |
| if callspec._arg2scope[argname] != scope: | |
| continue | |
| item_cls = None | |
| if scope is Scope.Session: | |
| scoped_item_path = None | |
| elif scope is Scope.Package: | |
| scoped_item_path = item.path | |
| elif scope is Scope.Module: | |
| scoped_item_path = item.path | |
| elif scope is Scope.Class: | |
| scoped_item_path = item.path | |
| item_cls = item.cls # type: ignore[attr-defined] | |
| else: | |
| assert_never(scope) | |
| param_index = callspec.indices[argname] | |
| yield FixtureArgKey(argname, param_index, scoped_item_path, item_cls) | |
| # Algorithm for sorting on a per-parametrized resource setup basis. | |
| # It is called for Session scope first and performs sorting | |
| # down to the lower scopes such as to minimize number of "high scope" | |
| # setups and teardowns. | |
| def reorder_items(items: Sequence[nodes.Item]) -> List[nodes.Item]: | |
| argkeys_cache: Dict[Scope, Dict[nodes.Item, Dict[FixtureArgKey, None]]] = {} | |
| items_by_argkey: Dict[ | |
| Scope, Dict[FixtureArgKey, OrderedDict[nodes.Item, None]] | |
| ] = {} | |
| for scope in HIGH_SCOPES: | |
| scoped_argkeys_cache = argkeys_cache[scope] = {} | |
| scoped_items_by_argkey = items_by_argkey[scope] = defaultdict(OrderedDict) | |
| for item in items: | |
| keys = dict.fromkeys(get_parametrized_fixture_keys(item, scope), None) | |
| if keys: | |
| scoped_argkeys_cache[item] = keys | |
| for key in keys: | |
| scoped_items_by_argkey[key][item] = None | |
| items_dict = dict.fromkeys(items, None) | |
| return list( | |
| reorder_items_atscope(items_dict, argkeys_cache, items_by_argkey, Scope.Session) | |
| ) | |
| def fix_cache_order( | |
| item: nodes.Item, | |
| argkeys_cache: Dict[Scope, Dict[nodes.Item, Dict[FixtureArgKey, None]]], | |
| items_by_argkey: Dict[Scope, Dict[FixtureArgKey, OrderedDict[nodes.Item, None]]], | |
| ) -> None: | |
| for scope in HIGH_SCOPES: | |
| scoped_items_by_argkey = items_by_argkey[scope] | |
| for key in argkeys_cache[scope].get(item, []): | |
| scoped_items_by_argkey[key][item] = None | |
| scoped_items_by_argkey[key].move_to_end(item, last=False) | |
| def reorder_items_atscope( | |
| items: Dict[nodes.Item, None], | |
| argkeys_cache: Dict[Scope, Dict[nodes.Item, Dict[FixtureArgKey, None]]], | |
| items_by_argkey: Dict[Scope, Dict[FixtureArgKey, OrderedDict[nodes.Item, None]]], | |
| scope: Scope, | |
| ) -> Dict[nodes.Item, None]: | |
| if scope is Scope.Function or len(items) < 3: | |
| return items | |
| ignore: Set[Optional[FixtureArgKey]] = set() | |
| items_deque = deque(items) | |
| items_done: Dict[nodes.Item, None] = {} | |
| scoped_items_by_argkey = items_by_argkey[scope] | |
| scoped_argkeys_cache = argkeys_cache[scope] | |
| while items_deque: | |
| no_argkey_group: Dict[nodes.Item, None] = {} | |
| slicing_argkey = None | |
| while items_deque: | |
| item = items_deque.popleft() | |
| if item in items_done or item in no_argkey_group: | |
| continue | |
| argkeys = dict.fromkeys( | |
| (k for k in scoped_argkeys_cache.get(item, []) if k not in ignore), None | |
| ) | |
| if not argkeys: | |
| no_argkey_group[item] = None | |
| else: | |
| slicing_argkey, _ = argkeys.popitem() | |
| # We don't have to remove relevant items from later in the | |
| # deque because they'll just be ignored. | |
| matching_items = [ | |
| i for i in scoped_items_by_argkey[slicing_argkey] if i in items | |
| ] | |
| for i in reversed(matching_items): | |
| fix_cache_order(i, argkeys_cache, items_by_argkey) | |
| items_deque.appendleft(i) | |
| break | |
| if no_argkey_group: | |
| no_argkey_group = reorder_items_atscope( | |
| no_argkey_group, argkeys_cache, items_by_argkey, scope.next_lower() | |
| ) | |
| for item in no_argkey_group: | |
| items_done[item] = None | |
| ignore.add(slicing_argkey) | |
| return items_done | |
| class FuncFixtureInfo: | |
| """Fixture-related information for a fixture-requesting item (e.g. test | |
| function). | |
| This is used to examine the fixtures which an item requests statically | |
| (known during collection). This includes autouse fixtures, fixtures | |
| requested by the `usefixtures` marker, fixtures requested in the function | |
| parameters, and the transitive closure of these. | |
| An item may also request fixtures dynamically (using `request.getfixturevalue`); | |
| these are not reflected here. | |
| """ | |
| __slots__ = ("argnames", "initialnames", "names_closure", "name2fixturedefs") | |
| # Fixture names that the item requests directly by function parameters. | |
| argnames: Tuple[str, ...] | |
| # Fixture names that the item immediately requires. These include | |
| # argnames + fixture names specified via usefixtures and via autouse=True in | |
| # fixture definitions. | |
| initialnames: Tuple[str, ...] | |
| # The transitive closure of the fixture names that the item requires. | |
| # Note: can't include dynamic dependencies (`request.getfixturevalue` calls). | |
| names_closure: List[str] | |
| # A map from a fixture name in the transitive closure to the FixtureDefs | |
| # matching the name which are applicable to this function. | |
| # There may be multiple overriding fixtures with the same name. The | |
| # sequence is ordered from furthest to closes to the function. | |
| name2fixturedefs: Dict[str, Sequence["FixtureDef[Any]"]] | |
| def prune_dependency_tree(self) -> None: | |
| """Recompute names_closure from initialnames and name2fixturedefs. | |
| Can only reduce names_closure, which means that the new closure will | |
| always be a subset of the old one. The order is preserved. | |
| This method is needed because direct parametrization may shadow some | |
| of the fixtures that were included in the originally built dependency | |
| tree. In this way the dependency tree can get pruned, and the closure | |
| of argnames may get reduced. | |
| """ | |
| closure: Set[str] = set() | |
| working_set = set(self.initialnames) | |
| while working_set: | |
| argname = working_set.pop() | |
| # Argname may be something not included in the original names_closure, | |
| # in which case we ignore it. This currently happens with pseudo | |
| # FixtureDefs which wrap 'get_direct_param_fixture_func(request)'. | |
| # So they introduce the new dependency 'request' which might have | |
| # been missing in the original tree (closure). | |
| if argname not in closure and argname in self.names_closure: | |
| closure.add(argname) | |
| if argname in self.name2fixturedefs: | |
| working_set.update(self.name2fixturedefs[argname][-1].argnames) | |
| self.names_closure[:] = sorted(closure, key=self.names_closure.index) | |
| class FixtureRequest(abc.ABC): | |
| """The type of the ``request`` fixture. | |
| A request object gives access to the requesting test context and has a | |
| ``param`` attribute in case the fixture is parametrized. | |
| """ | |
| def __init__( | |
| self, | |
| pyfuncitem: "Function", | |
| fixturename: Optional[str], | |
| arg2fixturedefs: Dict[str, Sequence["FixtureDef[Any]"]], | |
| fixture_defs: Dict[str, "FixtureDef[Any]"], | |
| *, | |
| _ispytest: bool = False, | |
| ) -> None: | |
| check_ispytest(_ispytest) | |
| #: Fixture for which this request is being performed. | |
| self.fixturename: Final = fixturename | |
| self._pyfuncitem: Final = pyfuncitem | |
| # The FixtureDefs for each fixture name requested by this item. | |
| # Starts from the statically-known fixturedefs resolved during | |
| # collection. Dynamically requested fixtures (using | |
| # `request.getfixturevalue("foo")`) are added dynamically. | |
| self._arg2fixturedefs: Final = arg2fixturedefs | |
| # The evaluated argnames so far, mapping to the FixtureDef they resolved | |
| # to. | |
| self._fixture_defs: Final = fixture_defs | |
| # Notes on the type of `param`: | |
| # -`request.param` is only defined in parametrized fixtures, and will raise | |
| # AttributeError otherwise. Python typing has no notion of "undefined", so | |
| # this cannot be reflected in the type. | |
| # - Technically `param` is only (possibly) defined on SubRequest, not | |
| # FixtureRequest, but the typing of that is still in flux so this cheats. | |
| # - In the future we might consider using a generic for the param type, but | |
| # for now just using Any. | |
| self.param: Any | |
| def _fixturemanager(self) -> "FixtureManager": | |
| return self._pyfuncitem.session._fixturemanager | |
| def _scope(self) -> Scope: | |
| raise NotImplementedError() | |
| def scope(self) -> _ScopeName: | |
| """Scope string, one of "function", "class", "module", "package", "session".""" | |
| return self._scope.value | |
| def _check_scope( | |
| self, | |
| requested_fixturedef: Union["FixtureDef[object]", PseudoFixtureDef[object]], | |
| requested_scope: Scope, | |
| ) -> None: | |
| raise NotImplementedError() | |
| def fixturenames(self) -> List[str]: | |
| """Names of all active fixtures in this request.""" | |
| result = list(self._pyfuncitem.fixturenames) | |
| result.extend(set(self._fixture_defs).difference(result)) | |
| return result | |
| def node(self): | |
| """Underlying collection node (depends on current request scope).""" | |
| raise NotImplementedError() | |
| def config(self) -> Config: | |
| """The pytest config object associated with this request.""" | |
| return self._pyfuncitem.config | |
| def function(self): | |
| """Test function object if the request has a per-function scope.""" | |
| if self.scope != "function": | |
| raise AttributeError( | |
| f"function not available in {self.scope}-scoped context" | |
| ) | |
| return self._pyfuncitem.obj | |
| def cls(self): | |
| """Class (can be None) where the test function was collected.""" | |
| if self.scope not in ("class", "function"): | |
| raise AttributeError(f"cls not available in {self.scope}-scoped context") | |
| clscol = self._pyfuncitem.getparent(_pytest.python.Class) | |
| if clscol: | |
| return clscol.obj | |
| def instance(self): | |
| """Instance (can be None) on which test function was collected.""" | |
| if self.scope != "function": | |
| return None | |
| return getattr(self._pyfuncitem, "instance", None) | |
| def module(self): | |
| """Python module object where the test function was collected.""" | |
| if self.scope not in ("function", "class", "module"): | |
| raise AttributeError(f"module not available in {self.scope}-scoped context") | |
| mod = self._pyfuncitem.getparent(_pytest.python.Module) | |
| assert mod is not None | |
| return mod.obj | |
| def path(self) -> Path: | |
| """Path where the test function was collected.""" | |
| if self.scope not in ("function", "class", "module", "package"): | |
| raise AttributeError(f"path not available in {self.scope}-scoped context") | |
| return self._pyfuncitem.path | |
| def keywords(self) -> MutableMapping[str, Any]: | |
| """Keywords/markers dictionary for the underlying node.""" | |
| node: nodes.Node = self.node | |
| return node.keywords | |
| def session(self) -> "Session": | |
| """Pytest session object.""" | |
| return self._pyfuncitem.session | |
| def addfinalizer(self, finalizer: Callable[[], object]) -> None: | |
| """Add finalizer/teardown function to be called without arguments after | |
| the last test within the requesting test context finished execution.""" | |
| raise NotImplementedError() | |
| def applymarker(self, marker: Union[str, MarkDecorator]) -> None: | |
| """Apply a marker to a single test function invocation. | |
| This method is useful if you don't want to have a keyword/marker | |
| on all function invocations. | |
| :param marker: | |
| An object created by a call to ``pytest.mark.NAME(...)``. | |
| """ | |
| self.node.add_marker(marker) | |
| def raiseerror(self, msg: Optional[str]) -> NoReturn: | |
| """Raise a FixtureLookupError exception. | |
| :param msg: | |
| An optional custom error message. | |
| """ | |
| raise FixtureLookupError(None, self, msg) | |
| def getfixturevalue(self, argname: str) -> Any: | |
| """Dynamically run a named fixture function. | |
| Declaring fixtures via function argument is recommended where possible. | |
| But if you can only decide whether to use another fixture at test | |
| setup time, you may use this function to retrieve it inside a fixture | |
| or test function body. | |
| This method can be used during the test setup phase or the test run | |
| phase, but during the test teardown phase a fixture's value may not | |
| be available. | |
| :param argname: | |
| The fixture name. | |
| :raises pytest.FixtureLookupError: | |
| If the given fixture could not be found. | |
| """ | |
| # Note that in addition to the use case described in the docstring, | |
| # getfixturevalue() is also called by pytest itself during item and fixture | |
| # setup to evaluate the fixtures that are requested statically | |
| # (using function parameters, autouse, etc). | |
| fixturedef = self._get_active_fixturedef(argname) | |
| assert fixturedef.cached_result is not None, ( | |
| f'The fixture value for "{argname}" is not available. ' | |
| "This can happen when the fixture has already been torn down." | |
| ) | |
| return fixturedef.cached_result[0] | |
| def _iter_chain(self) -> Iterator["SubRequest"]: | |
| """Yield all SubRequests in the chain, from self up. | |
| Note: does *not* yield the TopRequest. | |
| """ | |
| current = self | |
| while isinstance(current, SubRequest): | |
| yield current | |
| current = current._parent_request | |
| def _get_active_fixturedef( | |
| self, argname: str | |
| ) -> Union["FixtureDef[object]", PseudoFixtureDef[object]]: | |
| if argname == "request": | |
| cached_result = (self, [0], None) | |
| return PseudoFixtureDef(cached_result, Scope.Function) | |
| # If we already finished computing a fixture by this name in this item, | |
| # return it. | |
| fixturedef = self._fixture_defs.get(argname) | |
| if fixturedef is not None: | |
| self._check_scope(fixturedef, fixturedef._scope) | |
| return fixturedef | |
| # Find the appropriate fixturedef. | |
| fixturedefs = self._arg2fixturedefs.get(argname, None) | |
| if fixturedefs is None: | |
| # We arrive here because of a dynamic call to | |
| # getfixturevalue(argname) which was naturally | |
| # not known at parsing/collection time. | |
| fixturedefs = self._fixturemanager.getfixturedefs(argname, self._pyfuncitem) | |
| if fixturedefs is not None: | |
| self._arg2fixturedefs[argname] = fixturedefs | |
| # No fixtures defined with this name. | |
| if fixturedefs is None: | |
| raise FixtureLookupError(argname, self) | |
| # The are no fixtures with this name applicable for the function. | |
| if not fixturedefs: | |
| raise FixtureLookupError(argname, self) | |
| # A fixture may override another fixture with the same name, e.g. a | |
| # fixture in a module can override a fixture in a conftest, a fixture in | |
| # a class can override a fixture in the module, and so on. | |
| # An overriding fixture can request its own name (possibly indirectly); | |
| # in this case it gets the value of the fixture it overrides, one level | |
| # up. | |
| # Check how many `argname`s deep we are, and take the next one. | |
| # `fixturedefs` is sorted from furthest to closest, so use negative | |
| # indexing to go in reverse. | |
| index = -1 | |
| for request in self._iter_chain(): | |
| if request.fixturename == argname: | |
| index -= 1 | |
| # If already consumed all of the available levels, fail. | |
| if -index > len(fixturedefs): | |
| raise FixtureLookupError(argname, self) | |
| fixturedef = fixturedefs[index] | |
| # Prepare a SubRequest object for calling the fixture. | |
| try: | |
| callspec = self._pyfuncitem.callspec | |
| except AttributeError: | |
| callspec = None | |
| if callspec is not None and argname in callspec.params: | |
| param = callspec.params[argname] | |
| param_index = callspec.indices[argname] | |
| # The parametrize invocation scope overrides the fixture's scope. | |
| scope = callspec._arg2scope[argname] | |
| else: | |
| param = NOTSET | |
| param_index = 0 | |
| scope = fixturedef._scope | |
| self._check_fixturedef_without_param(fixturedef) | |
| self._check_scope(fixturedef, scope) | |
| subrequest = SubRequest( | |
| self, scope, param, param_index, fixturedef, _ispytest=True | |
| ) | |
| # Make sure the fixture value is cached, running it if it isn't | |
| fixturedef.execute(request=subrequest) | |
| self._fixture_defs[argname] = fixturedef | |
| return fixturedef | |
| def _check_fixturedef_without_param(self, fixturedef: "FixtureDef[object]") -> None: | |
| """Check that this request is allowed to execute this fixturedef without | |
| a param.""" | |
| funcitem = self._pyfuncitem | |
| has_params = fixturedef.params is not None | |
| fixtures_not_supported = getattr(funcitem, "nofuncargs", False) | |
| if has_params and fixtures_not_supported: | |
| msg = ( | |
| f"{funcitem.name} does not support fixtures, maybe unittest.TestCase subclass?\n" | |
| f"Node id: {funcitem.nodeid}\n" | |
| f"Function type: {type(funcitem).__name__}" | |
| ) | |
| fail(msg, pytrace=False) | |
| if has_params: | |
| frame = inspect.stack()[3] | |
| frameinfo = inspect.getframeinfo(frame[0]) | |
| source_path = absolutepath(frameinfo.filename) | |
| source_lineno = frameinfo.lineno | |
| try: | |
| source_path_str = str(source_path.relative_to(funcitem.config.rootpath)) | |
| except ValueError: | |
| source_path_str = str(source_path) | |
| location = getlocation(fixturedef.func, funcitem.config.rootpath) | |
| msg = ( | |
| "The requested fixture has no parameter defined for test:\n" | |
| f" {funcitem.nodeid}\n\n" | |
| f"Requested fixture '{fixturedef.argname}' defined in:\n" | |
| f"{location}\n\n" | |
| f"Requested here:\n" | |
| f"{source_path_str}:{source_lineno}" | |
| ) | |
| fail(msg, pytrace=False) | |
| def _get_fixturestack(self) -> List["FixtureDef[Any]"]: | |
| values = [request._fixturedef for request in self._iter_chain()] | |
| values.reverse() | |
| return values | |
| class TopRequest(FixtureRequest): | |
| """The type of the ``request`` fixture in a test function.""" | |
| def __init__(self, pyfuncitem: "Function", *, _ispytest: bool = False) -> None: | |
| super().__init__( | |
| fixturename=None, | |
| pyfuncitem=pyfuncitem, | |
| arg2fixturedefs=pyfuncitem._fixtureinfo.name2fixturedefs.copy(), | |
| fixture_defs={}, | |
| _ispytest=_ispytest, | |
| ) | |
| def _scope(self) -> Scope: | |
| return Scope.Function | |
| def _check_scope( | |
| self, | |
| requested_fixturedef: Union["FixtureDef[object]", PseudoFixtureDef[object]], | |
| requested_scope: Scope, | |
| ) -> None: | |
| # TopRequest always has function scope so always valid. | |
| pass | |
| def node(self): | |
| return self._pyfuncitem | |
| def __repr__(self) -> str: | |
| return "<FixtureRequest for %r>" % (self.node) | |
| def _fillfixtures(self) -> None: | |
| item = self._pyfuncitem | |
| for argname in item.fixturenames: | |
| if argname not in item.funcargs: | |
| item.funcargs[argname] = self.getfixturevalue(argname) | |
| def addfinalizer(self, finalizer: Callable[[], object]) -> None: | |
| self.node.addfinalizer(finalizer) | |
| class SubRequest(FixtureRequest): | |
| """The type of the ``request`` fixture in a fixture function requested | |
| (transitively) by a test function.""" | |
| def __init__( | |
| self, | |
| request: FixtureRequest, | |
| scope: Scope, | |
| param: Any, | |
| param_index: int, | |
| fixturedef: "FixtureDef[object]", | |
| *, | |
| _ispytest: bool = False, | |
| ) -> None: | |
| super().__init__( | |
| pyfuncitem=request._pyfuncitem, | |
| fixturename=fixturedef.argname, | |
| fixture_defs=request._fixture_defs, | |
| arg2fixturedefs=request._arg2fixturedefs, | |
| _ispytest=_ispytest, | |
| ) | |
| self._parent_request: Final[FixtureRequest] = request | |
| self._scope_field: Final = scope | |
| self._fixturedef: Final[FixtureDef[object]] = fixturedef | |
| if param is not NOTSET: | |
| self.param = param | |
| self.param_index: Final = param_index | |
| def __repr__(self) -> str: | |
| return f"<SubRequest {self.fixturename!r} for {self._pyfuncitem!r}>" | |
| def _scope(self) -> Scope: | |
| return self._scope_field | |
| def node(self): | |
| scope = self._scope | |
| if scope is Scope.Function: | |
| # This might also be a non-function Item despite its attribute name. | |
| node: Optional[nodes.Node] = self._pyfuncitem | |
| elif scope is Scope.Package: | |
| node = get_scope_package(self._pyfuncitem, self._fixturedef) | |
| else: | |
| node = get_scope_node(self._pyfuncitem, scope) | |
| if node is None and scope is Scope.Class: | |
| # Fallback to function item itself. | |
| node = self._pyfuncitem | |
| assert node, f'Could not obtain a node for scope "{scope}" for function {self._pyfuncitem!r}' | |
| return node | |
| def _check_scope( | |
| self, | |
| requested_fixturedef: Union["FixtureDef[object]", PseudoFixtureDef[object]], | |
| requested_scope: Scope, | |
| ) -> None: | |
| if isinstance(requested_fixturedef, PseudoFixtureDef): | |
| return | |
| if self._scope > requested_scope: | |
| # Try to report something helpful. | |
| argname = requested_fixturedef.argname | |
| fixture_stack = "\n".join( | |
| self._format_fixturedef_line(fixturedef) | |
| for fixturedef in self._get_fixturestack() | |
| ) | |
| requested_fixture = self._format_fixturedef_line(requested_fixturedef) | |
| fail( | |
| f"ScopeMismatch: You tried to access the {requested_scope.value} scoped " | |
| f"fixture {argname} with a {self._scope.value} scoped request object. " | |
| f"Requesting fixture stack:\n{fixture_stack}\n" | |
| f"Requested fixture:\n{requested_fixture}", | |
| pytrace=False, | |
| ) | |
| def _format_fixturedef_line(self, fixturedef: "FixtureDef[object]") -> str: | |
| factory = fixturedef.func | |
| path, lineno = getfslineno(factory) | |
| if isinstance(path, Path): | |
| path = bestrelpath(self._pyfuncitem.session.path, path) | |
| signature = inspect.signature(factory) | |
| return f"{path}:{lineno + 1}: def {factory.__name__}{signature}" | |
| def addfinalizer(self, finalizer: Callable[[], object]) -> None: | |
| self._fixturedef.addfinalizer(finalizer) | |
| class FixtureLookupError(LookupError): | |
| """Could not return a requested fixture (missing or invalid).""" | |
| def __init__( | |
| self, argname: Optional[str], request: FixtureRequest, msg: Optional[str] = None | |
| ) -> None: | |
| self.argname = argname | |
| self.request = request | |
| self.fixturestack = request._get_fixturestack() | |
| self.msg = msg | |
| def formatrepr(self) -> "FixtureLookupErrorRepr": | |
| tblines: List[str] = [] | |
| addline = tblines.append | |
| stack = [self.request._pyfuncitem.obj] | |
| stack.extend(map(lambda x: x.func, self.fixturestack)) | |
| msg = self.msg | |
| if msg is not None: | |
| # The last fixture raise an error, let's present | |
| # it at the requesting side. | |
| stack = stack[:-1] | |
| for function in stack: | |
| fspath, lineno = getfslineno(function) | |
| try: | |
| lines, _ = inspect.getsourcelines(get_real_func(function)) | |
| except (OSError, IndexError, TypeError): | |
| error_msg = "file %s, line %s: source code not available" | |
| addline(error_msg % (fspath, lineno + 1)) | |
| else: | |
| addline(f"file {fspath}, line {lineno + 1}") | |
| for i, line in enumerate(lines): | |
| line = line.rstrip() | |
| addline(" " + line) | |
| if line.lstrip().startswith("def"): | |
| break | |
| if msg is None: | |
| fm = self.request._fixturemanager | |
| available = set() | |
| parent = self.request._pyfuncitem.parent | |
| assert parent is not None | |
| for name, fixturedefs in fm._arg2fixturedefs.items(): | |
| faclist = list(fm._matchfactories(fixturedefs, parent)) | |
| if faclist: | |
| available.add(name) | |
| if self.argname in available: | |
| msg = ( | |
| f" recursive dependency involving fixture '{self.argname}' detected" | |
| ) | |
| else: | |
| msg = f"fixture '{self.argname}' not found" | |
| msg += "\n available fixtures: {}".format(", ".join(sorted(available))) | |
| msg += "\n use 'pytest --fixtures [testpath]' for help on them." | |
| return FixtureLookupErrorRepr(fspath, lineno, tblines, msg, self.argname) | |
| class FixtureLookupErrorRepr(TerminalRepr): | |
| def __init__( | |
| self, | |
| filename: Union[str, "os.PathLike[str]"], | |
| firstlineno: int, | |
| tblines: Sequence[str], | |
| errorstring: str, | |
| argname: Optional[str], | |
| ) -> None: | |
| self.tblines = tblines | |
| self.errorstring = errorstring | |
| self.filename = filename | |
| self.firstlineno = firstlineno | |
| self.argname = argname | |
| def toterminal(self, tw: TerminalWriter) -> None: | |
| # tw.line("FixtureLookupError: %s" %(self.argname), red=True) | |
| for tbline in self.tblines: | |
| tw.line(tbline.rstrip()) | |
| lines = self.errorstring.split("\n") | |
| if lines: | |
| tw.line( | |
| f"{FormattedExcinfo.fail_marker} {lines[0].strip()}", | |
| red=True, | |
| ) | |
| for line in lines[1:]: | |
| tw.line( | |
| f"{FormattedExcinfo.flow_marker} {line.strip()}", | |
| red=True, | |
| ) | |
| tw.line() | |
| tw.line("%s:%d" % (os.fspath(self.filename), self.firstlineno + 1)) | |
| def call_fixture_func( | |
| fixturefunc: "_FixtureFunc[FixtureValue]", request: FixtureRequest, kwargs | |
| ) -> FixtureValue: | |
| if is_generator(fixturefunc): | |
| fixturefunc = cast( | |
| Callable[..., Generator[FixtureValue, None, None]], fixturefunc | |
| ) | |
| generator = fixturefunc(**kwargs) | |
| try: | |
| fixture_result = next(generator) | |
| except StopIteration: | |
| raise ValueError(f"{request.fixturename} did not yield a value") from None | |
| finalizer = functools.partial(_teardown_yield_fixture, fixturefunc, generator) | |
| request.addfinalizer(finalizer) | |
| else: | |
| fixturefunc = cast(Callable[..., FixtureValue], fixturefunc) | |
| fixture_result = fixturefunc(**kwargs) | |
| return fixture_result | |
| def _teardown_yield_fixture(fixturefunc, it) -> None: | |
| """Execute the teardown of a fixture function by advancing the iterator | |
| after the yield and ensure the iteration ends (if not it means there is | |
| more than one yield in the function).""" | |
| try: | |
| next(it) | |
| except StopIteration: | |
| pass | |
| else: | |
| fs, lineno = getfslineno(fixturefunc) | |
| fail( | |
| f"fixture function has more than one 'yield':\n\n" | |
| f"{Source(fixturefunc).indent()}\n" | |
| f"{fs}:{lineno + 1}", | |
| pytrace=False, | |
| ) | |
| def _eval_scope_callable( | |
| scope_callable: Callable[[str, Config], _ScopeName], | |
| fixture_name: str, | |
| config: Config, | |
| ) -> _ScopeName: | |
| try: | |
| # Type ignored because there is no typing mechanism to specify | |
| # keyword arguments, currently. | |
| result = scope_callable(fixture_name=fixture_name, config=config) # type: ignore[call-arg] | |
| except Exception as e: | |
| raise TypeError( | |
| f"Error evaluating {scope_callable} while defining fixture '{fixture_name}'.\n" | |
| "Expected a function with the signature (*, fixture_name, config)" | |
| ) from e | |
| if not isinstance(result, str): | |
| fail( | |
| f"Expected {scope_callable} to return a 'str' while defining fixture '{fixture_name}', but it returned:\n" | |
| f"{result!r}", | |
| pytrace=False, | |
| ) | |
| return result | |
| class FixtureDef(Generic[FixtureValue]): | |
| """A container for a fixture definition. | |
| Note: At this time, only explicitly documented fields and methods are | |
| considered public stable API. | |
| """ | |
| def __init__( | |
| self, | |
| config: Config, | |
| baseid: Optional[str], | |
| argname: str, | |
| func: "_FixtureFunc[FixtureValue]", | |
| scope: Union[Scope, _ScopeName, Callable[[str, Config], _ScopeName], None], | |
| params: Optional[Sequence[object]], | |
| ids: Optional[ | |
| Union[Tuple[Optional[object], ...], Callable[[Any], Optional[object]]] | |
| ] = None, | |
| *, | |
| _ispytest: bool = False, | |
| ) -> None: | |
| check_ispytest(_ispytest) | |
| # The "base" node ID for the fixture. | |
| # | |
| # This is a node ID prefix. A fixture is only available to a node (e.g. | |
| # a `Function` item) if the fixture's baseid is a nodeid of a parent of | |
| # node. | |
| # | |
| # For a fixture found in a Collector's object (e.g. a `Module`s module, | |
| # a `Class`'s class), the baseid is the Collector's nodeid. | |
| # | |
| # For a fixture found in a conftest plugin, the baseid is the conftest's | |
| # directory path relative to the rootdir. | |
| # | |
| # For other plugins, the baseid is the empty string (always matches). | |
| self.baseid: Final = baseid or "" | |
| # Whether the fixture was found from a node or a conftest in the | |
| # collection tree. Will be false for fixtures defined in non-conftest | |
| # plugins. | |
| self.has_location: Final = baseid is not None | |
| # The fixture factory function. | |
| self.func: Final = func | |
| # The name by which the fixture may be requested. | |
| self.argname: Final = argname | |
| if scope is None: | |
| scope = Scope.Function | |
| elif callable(scope): | |
| scope = _eval_scope_callable(scope, argname, config) | |
| if isinstance(scope, str): | |
| scope = Scope.from_user( | |
| scope, descr=f"Fixture '{func.__name__}'", where=baseid | |
| ) | |
| self._scope: Final = scope | |
| # If the fixture is directly parametrized, the parameter values. | |
| self.params: Final = params | |
| # If the fixture is directly parametrized, a tuple of explicit IDs to | |
| # assign to the parameter values, or a callable to generate an ID given | |
| # a parameter value. | |
| self.ids: Final = ids | |
| # The names requested by the fixtures. | |
| self.argnames: Final = getfuncargnames(func, name=argname) | |
| # If the fixture was executed, the current value of the fixture. | |
| # Can change if the fixture is executed with different parameters. | |
| self.cached_result: Optional[_FixtureCachedResult[FixtureValue]] = None | |
| self._finalizers: Final[List[Callable[[], object]]] = [] | |
| def scope(self) -> _ScopeName: | |
| """Scope string, one of "function", "class", "module", "package", "session".""" | |
| return self._scope.value | |
| def addfinalizer(self, finalizer: Callable[[], object]) -> None: | |
| self._finalizers.append(finalizer) | |
| def finish(self, request: SubRequest) -> None: | |
| exceptions: List[BaseException] = [] | |
| while self._finalizers: | |
| fin = self._finalizers.pop() | |
| try: | |
| fin() | |
| except BaseException as e: | |
| exceptions.append(e) | |
| node = request.node | |
| node.ihook.pytest_fixture_post_finalizer(fixturedef=self, request=request) | |
| # Even if finalization fails, we invalidate the cached fixture | |
| # value and remove all finalizers because they may be bound methods | |
| # which will keep instances alive. | |
| self.cached_result = None | |
| self._finalizers.clear() | |
| if len(exceptions) == 1: | |
| raise exceptions[0] | |
| elif len(exceptions) > 1: | |
| msg = f'errors while tearing down fixture "{self.argname}" of {node}' | |
| raise BaseExceptionGroup(msg, exceptions[::-1]) | |
| def execute(self, request: SubRequest) -> FixtureValue: | |
| """Return the value of this fixture, executing it if not cached.""" | |
| # Ensure that the dependent fixtures requested by this fixture are loaded. | |
| # This needs to be done before checking if we have a cached value, since | |
| # if a dependent fixture has their cache invalidated, e.g. due to | |
| # parametrization, they finalize themselves and fixtures depending on it | |
| # (which will likely include this fixture) setting `self.cached_result = None`. | |
| # See #4871 | |
| requested_fixtures_that_should_finalize_us = [] | |
| for argname in self.argnames: | |
| fixturedef = request._get_active_fixturedef(argname) | |
| # Saves requested fixtures in a list so we later can add our finalizer | |
| # to them, ensuring that if a requested fixture gets torn down we get torn | |
| # down first. This is generally handled by SetupState, but still currently | |
| # needed when this fixture is not parametrized but depends on a parametrized | |
| # fixture. | |
| if not isinstance(fixturedef, PseudoFixtureDef): | |
| requested_fixtures_that_should_finalize_us.append(fixturedef) | |
| # Check for (and return) cached value/exception. | |
| my_cache_key = self.cache_key(request) | |
| if self.cached_result is not None: | |
| cache_key = self.cached_result[1] | |
| # note: comparison with `==` can fail (or be expensive) for e.g. | |
| # numpy arrays (#6497). | |
| if my_cache_key is cache_key: | |
| if self.cached_result[2] is not None: | |
| exc = self.cached_result[2] | |
| raise exc | |
| else: | |
| result = self.cached_result[0] | |
| return result | |
| # We have a previous but differently parametrized fixture instance | |
| # so we need to tear it down before creating a new one. | |
| self.finish(request) | |
| assert self.cached_result is None | |
| # Add finalizer to requested fixtures we saved previously. | |
| # We make sure to do this after checking for cached value to avoid | |
| # adding our finalizer multiple times. (#12135) | |
| finalizer = functools.partial(self.finish, request=request) | |
| for parent_fixture in requested_fixtures_that_should_finalize_us: | |
| parent_fixture.addfinalizer(finalizer) | |
| ihook = request.node.ihook | |
| try: | |
| # Setup the fixture, run the code in it, and cache the value | |
| # in self.cached_result | |
| result = ihook.pytest_fixture_setup(fixturedef=self, request=request) | |
| finally: | |
| # schedule our finalizer, even if the setup failed | |
| request.node.addfinalizer(finalizer) | |
| return result | |
| def cache_key(self, request: SubRequest) -> object: | |
| return getattr(request, "param", None) | |
| def __repr__(self) -> str: | |
| return f"<FixtureDef argname={self.argname!r} scope={self.scope!r} baseid={self.baseid!r}>" | |
| def resolve_fixture_function( | |
| fixturedef: FixtureDef[FixtureValue], request: FixtureRequest | |
| ) -> "_FixtureFunc[FixtureValue]": | |
| """Get the actual callable that can be called to obtain the fixture | |
| value.""" | |
| fixturefunc = fixturedef.func | |
| # The fixture function needs to be bound to the actual | |
| # request.instance so that code working with "fixturedef" behaves | |
| # as expected. | |
| instance = request.instance | |
| if instance is not None: | |
| # Handle the case where fixture is defined not in a test class, but some other class | |
| # (for example a plugin class with a fixture), see #2270. | |
| if hasattr(fixturefunc, "__self__") and not isinstance( | |
| instance, | |
| fixturefunc.__self__.__class__, | |
| ): | |
| return fixturefunc | |
| fixturefunc = getimfunc(fixturedef.func) | |
| if fixturefunc != fixturedef.func: | |
| fixturefunc = fixturefunc.__get__(instance) | |
| return fixturefunc | |
| def pytest_fixture_setup( | |
| fixturedef: FixtureDef[FixtureValue], request: SubRequest | |
| ) -> FixtureValue: | |
| """Execution of fixture setup.""" | |
| kwargs = {} | |
| for argname in fixturedef.argnames: | |
| kwargs[argname] = request.getfixturevalue(argname) | |
| fixturefunc = resolve_fixture_function(fixturedef, request) | |
| my_cache_key = fixturedef.cache_key(request) | |
| try: | |
| result = call_fixture_func(fixturefunc, request, kwargs) | |
| except TEST_OUTCOME as e: | |
| if isinstance(e, skip.Exception): | |
| # The test requested a fixture which caused a skip. | |
| # Don't show the fixture as the skip location, as then the user | |
| # wouldn't know which test skipped. | |
| e._use_item_location = True | |
| fixturedef.cached_result = (None, my_cache_key, e) | |
| raise | |
| fixturedef.cached_result = (result, my_cache_key, None) | |
| return result | |
| def wrap_function_to_error_out_if_called_directly( | |
| function: FixtureFunction, | |
| fixture_marker: "FixtureFunctionMarker", | |
| ) -> FixtureFunction: | |
| """Wrap the given fixture function so we can raise an error about it being called directly, | |
| instead of used as an argument in a test function.""" | |
| name = fixture_marker.name or function.__name__ | |
| message = ( | |
| f'Fixture "{name}" called directly. Fixtures are not meant to be called directly,\n' | |
| "but are created automatically when test functions request them as parameters.\n" | |
| "See https://docs.pytest.org/en/stable/explanation/fixtures.html for more information about fixtures, and\n" | |
| "https://docs.pytest.org/en/stable/deprecations.html#calling-fixtures-directly about how to update your code." | |
| ) | |
| def result(*args, **kwargs): | |
| fail(message, pytrace=False) | |
| # Keep reference to the original function in our own custom attribute so we don't unwrap | |
| # further than this point and lose useful wrappings like @mock.patch (#3774). | |
| result.__pytest_wrapped__ = _PytestWrapper(function) # type: ignore[attr-defined] | |
| return cast(FixtureFunction, result) | |
| class FixtureFunctionMarker: | |
| scope: "Union[_ScopeName, Callable[[str, Config], _ScopeName]]" | |
| params: Optional[Tuple[object, ...]] | |
| autouse: bool = False | |
| ids: Optional[ | |
| Union[Tuple[Optional[object], ...], Callable[[Any], Optional[object]]] | |
| ] = None | |
| name: Optional[str] = None | |
| _ispytest: dataclasses.InitVar[bool] = False | |
| def __post_init__(self, _ispytest: bool) -> None: | |
| check_ispytest(_ispytest) | |
| def __call__(self, function: FixtureFunction) -> FixtureFunction: | |
| if inspect.isclass(function): | |
| raise ValueError("class fixtures not supported (maybe in the future)") | |
| if getattr(function, "_pytestfixturefunction", False): | |
| raise ValueError( | |
| f"@pytest.fixture is being applied more than once to the same function {function.__name__!r}" | |
| ) | |
| if hasattr(function, "pytestmark"): | |
| warnings.warn(MARKED_FIXTURE, stacklevel=2) | |
| function = wrap_function_to_error_out_if_called_directly(function, self) | |
| name = self.name or function.__name__ | |
| if name == "request": | |
| location = getlocation(function) | |
| fail( | |
| f"'request' is a reserved word for fixtures, use another name:\n {location}", | |
| pytrace=False, | |
| ) | |
| # Type ignored because https://github.com/python/mypy/issues/2087. | |
| function._pytestfixturefunction = self # type: ignore[attr-defined] | |
| return function | |
| def fixture( | |
| fixture_function: FixtureFunction, | |
| *, | |
| scope: "Union[_ScopeName, Callable[[str, Config], _ScopeName]]" = ..., | |
| params: Optional[Iterable[object]] = ..., | |
| autouse: bool = ..., | |
| ids: Optional[ | |
| Union[Sequence[Optional[object]], Callable[[Any], Optional[object]]] | |
| ] = ..., | |
| name: Optional[str] = ..., | |
| ) -> FixtureFunction: ... | |
| def fixture( | |
| fixture_function: None = ..., | |
| *, | |
| scope: "Union[_ScopeName, Callable[[str, Config], _ScopeName]]" = ..., | |
| params: Optional[Iterable[object]] = ..., | |
| autouse: bool = ..., | |
| ids: Optional[ | |
| Union[Sequence[Optional[object]], Callable[[Any], Optional[object]]] | |
| ] = ..., | |
| name: Optional[str] = None, | |
| ) -> FixtureFunctionMarker: ... | |
| def fixture( | |
| fixture_function: Optional[FixtureFunction] = None, | |
| *, | |
| scope: "Union[_ScopeName, Callable[[str, Config], _ScopeName]]" = "function", | |
| params: Optional[Iterable[object]] = None, | |
| autouse: bool = False, | |
| ids: Optional[ | |
| Union[Sequence[Optional[object]], Callable[[Any], Optional[object]]] | |
| ] = None, | |
| name: Optional[str] = None, | |
| ) -> Union[FixtureFunctionMarker, FixtureFunction]: | |
| """Decorator to mark a fixture factory function. | |
| This decorator can be used, with or without parameters, to define a | |
| fixture function. | |
| The name of the fixture function can later be referenced to cause its | |
| invocation ahead of running tests: test modules or classes can use the | |
| ``pytest.mark.usefixtures(fixturename)`` marker. | |
| Test functions can directly use fixture names as input arguments in which | |
| case the fixture instance returned from the fixture function will be | |
| injected. | |
| Fixtures can provide their values to test functions using ``return`` or | |
| ``yield`` statements. When using ``yield`` the code block after the | |
| ``yield`` statement is executed as teardown code regardless of the test | |
| outcome, and must yield exactly once. | |
| :param scope: | |
| The scope for which this fixture is shared; one of ``"function"`` | |
| (default), ``"class"``, ``"module"``, ``"package"`` or ``"session"``. | |
| This parameter may also be a callable which receives ``(fixture_name, config)`` | |
| as parameters, and must return a ``str`` with one of the values mentioned above. | |
| See :ref:`dynamic scope` in the docs for more information. | |
| :param params: | |
| An optional list of parameters which will cause multiple invocations | |
| of the fixture function and all of the tests using it. The current | |
| parameter is available in ``request.param``. | |
| :param autouse: | |
| If True, the fixture func is activated for all tests that can see it. | |
| If False (the default), an explicit reference is needed to activate | |
| the fixture. | |
| :param ids: | |
| Sequence of ids each corresponding to the params so that they are | |
| part of the test id. If no ids are provided they will be generated | |
| automatically from the params. | |
| :param name: | |
| The name of the fixture. This defaults to the name of the decorated | |
| function. If a fixture is used in the same module in which it is | |
| defined, the function name of the fixture will be shadowed by the | |
| function arg that requests the fixture; one way to resolve this is to | |
| name the decorated function ``fixture_<fixturename>`` and then use | |
| ``@pytest.fixture(name='<fixturename>')``. | |
| """ | |
| fixture_marker = FixtureFunctionMarker( | |
| scope=scope, | |
| params=tuple(params) if params is not None else None, | |
| autouse=autouse, | |
| ids=None if ids is None else ids if callable(ids) else tuple(ids), | |
| name=name, | |
| _ispytest=True, | |
| ) | |
| # Direct decoration. | |
| if fixture_function: | |
| return fixture_marker(fixture_function) | |
| return fixture_marker | |
| def yield_fixture( | |
| fixture_function=None, | |
| *args, | |
| scope="function", | |
| params=None, | |
| autouse=False, | |
| ids=None, | |
| name=None, | |
| ): | |
| """(Return a) decorator to mark a yield-fixture factory function. | |
| .. deprecated:: 3.0 | |
| Use :py:func:`pytest.fixture` directly instead. | |
| """ | |
| warnings.warn(YIELD_FIXTURE, stacklevel=2) | |
| return fixture( | |
| fixture_function, | |
| *args, | |
| scope=scope, | |
| params=params, | |
| autouse=autouse, | |
| ids=ids, | |
| name=name, | |
| ) | |
| def pytestconfig(request: FixtureRequest) -> Config: | |
| """Session-scoped fixture that returns the session's :class:`pytest.Config` | |
| object. | |
| Example:: | |
| def test_foo(pytestconfig): | |
| if pytestconfig.getoption("verbose") > 0: | |
| ... | |
| """ | |
| return request.config | |
| def pytest_addoption(parser: Parser) -> None: | |
| parser.addini( | |
| "usefixtures", | |
| type="args", | |
| default=[], | |
| help="List of default fixtures to be used with this project", | |
| ) | |
| group = parser.getgroup("general") | |
| group.addoption( | |
| "--fixtures", | |
| "--funcargs", | |
| action="store_true", | |
| dest="showfixtures", | |
| default=False, | |
| help="Show available fixtures, sorted by plugin appearance " | |
| "(fixtures with leading '_' are only shown with '-v')", | |
| ) | |
| group.addoption( | |
| "--fixtures-per-test", | |
| action="store_true", | |
| dest="show_fixtures_per_test", | |
| default=False, | |
| help="Show fixtures per test", | |
| ) | |
| def pytest_cmdline_main(config: Config) -> Optional[Union[int, ExitCode]]: | |
| if config.option.showfixtures: | |
| showfixtures(config) | |
| return 0 | |
| if config.option.show_fixtures_per_test: | |
| show_fixtures_per_test(config) | |
| return 0 | |
| return None | |
| def _get_direct_parametrize_args(node: nodes.Node) -> Set[str]: | |
| """Return all direct parametrization arguments of a node, so we don't | |
| mistake them for fixtures. | |
| Check https://github.com/pytest-dev/pytest/issues/5036. | |
| These things are done later as well when dealing with parametrization | |
| so this could be improved. | |
| """ | |
| parametrize_argnames: Set[str] = set() | |
| for marker in node.iter_markers(name="parametrize"): | |
| if not marker.kwargs.get("indirect", False): | |
| p_argnames, _ = ParameterSet._parse_parametrize_args( | |
| *marker.args, **marker.kwargs | |
| ) | |
| parametrize_argnames.update(p_argnames) | |
| return parametrize_argnames | |
| def deduplicate_names(*seqs: Iterable[str]) -> Tuple[str, ...]: | |
| """De-duplicate the sequence of names while keeping the original order.""" | |
| # Ideally we would use a set, but it does not preserve insertion order. | |
| return tuple(dict.fromkeys(name for seq in seqs for name in seq)) | |
| class FixtureManager: | |
| """pytest fixture definitions and information is stored and managed | |
| from this class. | |
| During collection fm.parsefactories() is called multiple times to parse | |
| fixture function definitions into FixtureDef objects and internal | |
| data structures. | |
| During collection of test functions, metafunc-mechanics instantiate | |
| a FuncFixtureInfo object which is cached per node/func-name. | |
| This FuncFixtureInfo object is later retrieved by Function nodes | |
| which themselves offer a fixturenames attribute. | |
| The FuncFixtureInfo object holds information about fixtures and FixtureDefs | |
| relevant for a particular function. An initial list of fixtures is | |
| assembled like this: | |
| - ini-defined usefixtures | |
| - autouse-marked fixtures along the collection chain up from the function | |
| - usefixtures markers at module/class/function level | |
| - test function funcargs | |
| Subsequently the funcfixtureinfo.fixturenames attribute is computed | |
| as the closure of the fixtures needed to setup the initial fixtures, | |
| i.e. fixtures needed by fixture functions themselves are appended | |
| to the fixturenames list. | |
| Upon the test-setup phases all fixturenames are instantiated, retrieved | |
| by a lookup of their FuncFixtureInfo. | |
| """ | |
| def __init__(self, session: "Session") -> None: | |
| self.session = session | |
| self.config: Config = session.config | |
| # Maps a fixture name (argname) to all of the FixtureDefs in the test | |
| # suite/plugins defined with this name. Populated by parsefactories(). | |
| # TODO: The order of the FixtureDefs list of each arg is significant, | |
| # explain. | |
| self._arg2fixturedefs: Final[Dict[str, List[FixtureDef[Any]]]] = {} | |
| self._holderobjseen: Final[Set[object]] = set() | |
| # A mapping from a nodeid to a list of autouse fixtures it defines. | |
| self._nodeid_autousenames: Final[Dict[str, List[str]]] = { | |
| "": self.config.getini("usefixtures"), | |
| } | |
| session.config.pluginmanager.register(self, "funcmanage") | |
| def getfixtureinfo( | |
| self, | |
| node: nodes.Item, | |
| func: Optional[Callable[..., object]], | |
| cls: Optional[type], | |
| ) -> FuncFixtureInfo: | |
| """Calculate the :class:`FuncFixtureInfo` for an item. | |
| If ``func`` is None, or if the item sets an attribute | |
| ``nofuncargs = True``, then ``func`` is not examined at all. | |
| :param node: | |
| The item requesting the fixtures. | |
| :param func: | |
| The item's function. | |
| :param cls: | |
| If the function is a method, the method's class. | |
| """ | |
| if func is not None and not getattr(node, "nofuncargs", False): | |
| argnames = getfuncargnames(func, name=node.name, cls=cls) | |
| else: | |
| argnames = () | |
| usefixturesnames = self._getusefixturesnames(node) | |
| autousenames = self._getautousenames(node) | |
| initialnames = deduplicate_names(autousenames, usefixturesnames, argnames) | |
| direct_parametrize_args = _get_direct_parametrize_args(node) | |
| names_closure, arg2fixturedefs = self.getfixtureclosure( | |
| parentnode=node, | |
| initialnames=initialnames, | |
| ignore_args=direct_parametrize_args, | |
| ) | |
| return FuncFixtureInfo(argnames, initialnames, names_closure, arg2fixturedefs) | |
| def pytest_plugin_registered(self, plugin: _PluggyPlugin, plugin_name: str) -> None: | |
| # Fixtures defined in conftest plugins are only visible to within the | |
| # conftest's directory. This is unlike fixtures in non-conftest plugins | |
| # which have global visibility. So for conftests, construct the base | |
| # nodeid from the plugin name (which is the conftest path). | |
| if plugin_name and plugin_name.endswith("conftest.py"): | |
| # Note: we explicitly do *not* use `plugin.__file__` here -- The | |
| # difference is that plugin_name has the correct capitalization on | |
| # case-insensitive systems (Windows) and other normalization issues | |
| # (issue #11816). | |
| conftestpath = absolutepath(plugin_name) | |
| try: | |
| nodeid = str(conftestpath.parent.relative_to(self.config.rootpath)) | |
| except ValueError: | |
| nodeid = "" | |
| if nodeid == ".": | |
| nodeid = "" | |
| if os.sep != nodes.SEP: | |
| nodeid = nodeid.replace(os.sep, nodes.SEP) | |
| else: | |
| nodeid = None | |
| self.parsefactories(plugin, nodeid) | |
| def _getautousenames(self, node: nodes.Node) -> Iterator[str]: | |
| """Return the names of autouse fixtures applicable to node.""" | |
| for parentnode in node.listchain(): | |
| basenames = self._nodeid_autousenames.get(parentnode.nodeid) | |
| if basenames: | |
| yield from basenames | |
| def _getusefixturesnames(self, node: nodes.Item) -> Iterator[str]: | |
| """Return the names of usefixtures fixtures applicable to node.""" | |
| for mark in node.iter_markers(name="usefixtures"): | |
| yield from mark.args | |
| def getfixtureclosure( | |
| self, | |
| parentnode: nodes.Node, | |
| initialnames: Tuple[str, ...], | |
| ignore_args: AbstractSet[str], | |
| ) -> Tuple[List[str], Dict[str, Sequence[FixtureDef[Any]]]]: | |
| # Collect the closure of all fixtures, starting with the given | |
| # fixturenames as the initial set. As we have to visit all | |
| # factory definitions anyway, we also return an arg2fixturedefs | |
| # mapping so that the caller can reuse it and does not have | |
| # to re-discover fixturedefs again for each fixturename | |
| # (discovering matching fixtures for a given name/node is expensive). | |
| fixturenames_closure = list(initialnames) | |
| arg2fixturedefs: Dict[str, Sequence[FixtureDef[Any]]] = {} | |
| lastlen = -1 | |
| while lastlen != len(fixturenames_closure): | |
| lastlen = len(fixturenames_closure) | |
| for argname in fixturenames_closure: | |
| if argname in ignore_args: | |
| continue | |
| if argname in arg2fixturedefs: | |
| continue | |
| fixturedefs = self.getfixturedefs(argname, parentnode) | |
| if fixturedefs: | |
| arg2fixturedefs[argname] = fixturedefs | |
| for arg in fixturedefs[-1].argnames: | |
| if arg not in fixturenames_closure: | |
| fixturenames_closure.append(arg) | |
| def sort_by_scope(arg_name: str) -> Scope: | |
| try: | |
| fixturedefs = arg2fixturedefs[arg_name] | |
| except KeyError: | |
| return Scope.Function | |
| else: | |
| return fixturedefs[-1]._scope | |
| fixturenames_closure.sort(key=sort_by_scope, reverse=True) | |
| return fixturenames_closure, arg2fixturedefs | |
| def pytest_generate_tests(self, metafunc: "Metafunc") -> None: | |
| """Generate new tests based on parametrized fixtures used by the given metafunc""" | |
| def get_parametrize_mark_argnames(mark: Mark) -> Sequence[str]: | |
| args, _ = ParameterSet._parse_parametrize_args(*mark.args, **mark.kwargs) | |
| return args | |
| for argname in metafunc.fixturenames: | |
| # Get the FixtureDefs for the argname. | |
| fixture_defs = metafunc._arg2fixturedefs.get(argname) | |
| if not fixture_defs: | |
| # Will raise FixtureLookupError at setup time if not parametrized somewhere | |
| # else (e.g @pytest.mark.parametrize) | |
| continue | |
| # If the test itself parametrizes using this argname, give it | |
| # precedence. | |
| if any( | |
| argname in get_parametrize_mark_argnames(mark) | |
| for mark in metafunc.definition.iter_markers("parametrize") | |
| ): | |
| continue | |
| # In the common case we only look at the fixture def with the | |
| # closest scope (last in the list). But if the fixture overrides | |
| # another fixture, while requesting the super fixture, keep going | |
| # in case the super fixture is parametrized (#1953). | |
| for fixturedef in reversed(fixture_defs): | |
| # Fixture is parametrized, apply it and stop. | |
| if fixturedef.params is not None: | |
| metafunc.parametrize( | |
| argname, | |
| fixturedef.params, | |
| indirect=True, | |
| scope=fixturedef.scope, | |
| ids=fixturedef.ids, | |
| ) | |
| break | |
| # Not requesting the overridden super fixture, stop. | |
| if argname not in fixturedef.argnames: | |
| break | |
| # Try next super fixture, if any. | |
| def pytest_collection_modifyitems(self, items: List[nodes.Item]) -> None: | |
| # Separate parametrized setups. | |
| items[:] = reorder_items(items) | |
| def _register_fixture( | |
| self, | |
| *, | |
| name: str, | |
| func: "_FixtureFunc[object]", | |
| nodeid: Optional[str], | |
| scope: Union[ | |
| Scope, _ScopeName, Callable[[str, Config], _ScopeName], None | |
| ] = "function", | |
| params: Optional[Sequence[object]] = None, | |
| ids: Optional[ | |
| Union[Tuple[Optional[object], ...], Callable[[Any], Optional[object]]] | |
| ] = None, | |
| autouse: bool = False, | |
| ) -> None: | |
| """Register a fixture | |
| :param name: | |
| The fixture's name. | |
| :param func: | |
| The fixture's implementation function. | |
| :param nodeid: | |
| The visibility of the fixture. The fixture will be available to the | |
| node with this nodeid and its children in the collection tree. | |
| None means that the fixture is visible to the entire collection tree, | |
| e.g. a fixture defined for general use in a plugin. | |
| :param scope: | |
| The fixture's scope. | |
| :param params: | |
| The fixture's parametrization params. | |
| :param ids: | |
| The fixture's IDs. | |
| :param autouse: | |
| Whether this is an autouse fixture. | |
| """ | |
| fixture_def = FixtureDef( | |
| config=self.config, | |
| baseid=nodeid, | |
| argname=name, | |
| func=func, | |
| scope=scope, | |
| params=params, | |
| ids=ids, | |
| _ispytest=True, | |
| ) | |
| faclist = self._arg2fixturedefs.setdefault(name, []) | |
| if fixture_def.has_location: | |
| faclist.append(fixture_def) | |
| else: | |
| # fixturedefs with no location are at the front | |
| # so this inserts the current fixturedef after the | |
| # existing fixturedefs from external plugins but | |
| # before the fixturedefs provided in conftests. | |
| i = len([f for f in faclist if not f.has_location]) | |
| faclist.insert(i, fixture_def) | |
| if autouse: | |
| self._nodeid_autousenames.setdefault(nodeid or "", []).append(name) | |
| def parsefactories( | |
| self, | |
| node_or_obj: nodes.Node, | |
| ) -> None: | |
| raise NotImplementedError() | |
| def parsefactories( | |
| self, | |
| node_or_obj: object, | |
| nodeid: Optional[str], | |
| ) -> None: | |
| raise NotImplementedError() | |
| def parsefactories( | |
| self, | |
| node_or_obj: Union[nodes.Node, object], | |
| nodeid: Union[str, NotSetType, None] = NOTSET, | |
| ) -> None: | |
| """Collect fixtures from a collection node or object. | |
| Found fixtures are parsed into `FixtureDef`s and saved. | |
| If `node_or_object` is a collection node (with an underlying Python | |
| object), the node's object is traversed and the node's nodeid is used to | |
| determine the fixtures' visibility. `nodeid` must not be specified in | |
| this case. | |
| If `node_or_object` is an object (e.g. a plugin), the object is | |
| traversed and the given `nodeid` is used to determine the fixtures' | |
| visibility. `nodeid` must be specified in this case; None and "" mean | |
| total visibility. | |
| """ | |
| if nodeid is not NOTSET: | |
| holderobj = node_or_obj | |
| else: | |
| assert isinstance(node_or_obj, nodes.Node) | |
| holderobj = cast(object, node_or_obj.obj) # type: ignore[attr-defined] | |
| assert isinstance(node_or_obj.nodeid, str) | |
| nodeid = node_or_obj.nodeid | |
| if holderobj in self._holderobjseen: | |
| return | |
| self._holderobjseen.add(holderobj) | |
| for name in dir(holderobj): | |
| # The attribute can be an arbitrary descriptor, so the attribute | |
| # access below can raise. safe_getatt() ignores such exceptions. | |
| obj = safe_getattr(holderobj, name, None) | |
| marker = getfixturemarker(obj) | |
| if not isinstance(marker, FixtureFunctionMarker): | |
| # Magic globals with __getattr__ might have got us a wrong | |
| # fixture attribute. | |
| continue | |
| if marker.name: | |
| name = marker.name | |
| # During fixture definition we wrap the original fixture function | |
| # to issue a warning if called directly, so here we unwrap it in | |
| # order to not emit the warning when pytest itself calls the | |
| # fixture function. | |
| func = get_real_method(obj, holderobj) | |
| self._register_fixture( | |
| name=name, | |
| nodeid=nodeid, | |
| func=func, | |
| scope=marker.scope, | |
| params=marker.params, | |
| ids=marker.ids, | |
| autouse=marker.autouse, | |
| ) | |
| def getfixturedefs( | |
| self, argname: str, node: nodes.Node | |
| ) -> Optional[Sequence[FixtureDef[Any]]]: | |
| """Get FixtureDefs for a fixture name which are applicable | |
| to a given node. | |
| Returns None if there are no fixtures at all defined with the given | |
| name. (This is different from the case in which there are fixtures | |
| with the given name, but none applicable to the node. In this case, | |
| an empty result is returned). | |
| :param argname: Name of the fixture to search for. | |
| :param node: The requesting Node. | |
| """ | |
| try: | |
| fixturedefs = self._arg2fixturedefs[argname] | |
| except KeyError: | |
| return None | |
| return tuple(self._matchfactories(fixturedefs, node)) | |
| def _matchfactories( | |
| self, fixturedefs: Iterable[FixtureDef[Any]], node: nodes.Node | |
| ) -> Iterator[FixtureDef[Any]]: | |
| parentnodeids = {n.nodeid for n in node.iter_parents()} | |
| for fixturedef in fixturedefs: | |
| if fixturedef.baseid in parentnodeids: | |
| yield fixturedef | |
| def show_fixtures_per_test(config: Config) -> Union[int, ExitCode]: | |
| from _pytest.main import wrap_session | |
| return wrap_session(config, _show_fixtures_per_test) | |
| _PYTEST_DIR = Path(_pytest.__file__).parent | |
| def _pretty_fixture_path(invocation_dir: Path, func) -> str: | |
| loc = Path(getlocation(func, invocation_dir)) | |
| prefix = Path("...", "_pytest") | |
| try: | |
| return str(prefix / loc.relative_to(_PYTEST_DIR)) | |
| except ValueError: | |
| return bestrelpath(invocation_dir, loc) | |
| def _show_fixtures_per_test(config: Config, session: "Session") -> None: | |
| import _pytest.config | |
| session.perform_collect() | |
| invocation_dir = config.invocation_params.dir | |
| tw = _pytest.config.create_terminal_writer(config) | |
| verbose = config.getvalue("verbose") | |
| def get_best_relpath(func) -> str: | |
| loc = getlocation(func, invocation_dir) | |
| return bestrelpath(invocation_dir, Path(loc)) | |
| def write_fixture(fixture_def: FixtureDef[object]) -> None: | |
| argname = fixture_def.argname | |
| if verbose <= 0 and argname.startswith("_"): | |
| return | |
| prettypath = _pretty_fixture_path(invocation_dir, fixture_def.func) | |
| tw.write(f"{argname}", green=True) | |
| tw.write(f" -- {prettypath}", yellow=True) | |
| tw.write("\n") | |
| fixture_doc = inspect.getdoc(fixture_def.func) | |
| if fixture_doc: | |
| write_docstring( | |
| tw, fixture_doc.split("\n\n")[0] if verbose <= 0 else fixture_doc | |
| ) | |
| else: | |
| tw.line(" no docstring available", red=True) | |
| def write_item(item: nodes.Item) -> None: | |
| # Not all items have _fixtureinfo attribute. | |
| info: Optional[FuncFixtureInfo] = getattr(item, "_fixtureinfo", None) | |
| if info is None or not info.name2fixturedefs: | |
| # This test item does not use any fixtures. | |
| return | |
| tw.line() | |
| tw.sep("-", f"fixtures used by {item.name}") | |
| # TODO: Fix this type ignore. | |
| tw.sep("-", f"({get_best_relpath(item.function)})") # type: ignore[attr-defined] | |
| # dict key not used in loop but needed for sorting. | |
| for _, fixturedefs in sorted(info.name2fixturedefs.items()): | |
| assert fixturedefs is not None | |
| if not fixturedefs: | |
| continue | |
| # Last item is expected to be the one used by the test item. | |
| write_fixture(fixturedefs[-1]) | |
| for session_item in session.items: | |
| write_item(session_item) | |
| def showfixtures(config: Config) -> Union[int, ExitCode]: | |
| from _pytest.main import wrap_session | |
| return wrap_session(config, _showfixtures_main) | |
| def _showfixtures_main(config: Config, session: "Session") -> None: | |
| import _pytest.config | |
| session.perform_collect() | |
| invocation_dir = config.invocation_params.dir | |
| tw = _pytest.config.create_terminal_writer(config) | |
| verbose = config.getvalue("verbose") | |
| fm = session._fixturemanager | |
| available = [] | |
| seen: Set[Tuple[str, str]] = set() | |
| for argname, fixturedefs in fm._arg2fixturedefs.items(): | |
| assert fixturedefs is not None | |
| if not fixturedefs: | |
| continue | |
| for fixturedef in fixturedefs: | |
| loc = getlocation(fixturedef.func, invocation_dir) | |
| if (fixturedef.argname, loc) in seen: | |
| continue | |
| seen.add((fixturedef.argname, loc)) | |
| available.append( | |
| ( | |
| len(fixturedef.baseid), | |
| fixturedef.func.__module__, | |
| _pretty_fixture_path(invocation_dir, fixturedef.func), | |
| fixturedef.argname, | |
| fixturedef, | |
| ) | |
| ) | |
| available.sort() | |
| currentmodule = None | |
| for baseid, module, prettypath, argname, fixturedef in available: | |
| if currentmodule != module: | |
| if not module.startswith("_pytest."): | |
| tw.line() | |
| tw.sep("-", f"fixtures defined from {module}") | |
| currentmodule = module | |
| if verbose <= 0 and argname.startswith("_"): | |
| continue | |
| tw.write(f"{argname}", green=True) | |
| if fixturedef.scope != "function": | |
| tw.write(" [%s scope]" % fixturedef.scope, cyan=True) | |
| tw.write(f" -- {prettypath}", yellow=True) | |
| tw.write("\n") | |
| doc = inspect.getdoc(fixturedef.func) | |
| if doc: | |
| write_docstring(tw, doc.split("\n\n")[0] if verbose <= 0 else doc) | |
| else: | |
| tw.line(" no docstring available", red=True) | |
| tw.line() | |
| def write_docstring(tw: TerminalWriter, doc: str, indent: str = " ") -> None: | |
| for line in doc.split("\n"): | |
| tw.line(indent + line) | |