Spaces:
Paused
Paused
| # mypy: allow-untyped-defs | |
| """Basic collect and runtest protocol implementations.""" | |
| import bdb | |
| import dataclasses | |
| import os | |
| import sys | |
| from typing import Callable | |
| from typing import cast | |
| from typing import Dict | |
| from typing import final | |
| from typing import Generic | |
| from typing import List | |
| from typing import Literal | |
| from typing import Optional | |
| from typing import Tuple | |
| from typing import Type | |
| from typing import TYPE_CHECKING | |
| from typing import TypeVar | |
| from typing import Union | |
| from .reports import BaseReport | |
| from .reports import CollectErrorRepr | |
| from .reports import CollectReport | |
| from .reports import TestReport | |
| from _pytest import timing | |
| from _pytest._code.code import ExceptionChainRepr | |
| from _pytest._code.code import ExceptionInfo | |
| from _pytest._code.code import TerminalRepr | |
| from _pytest.config.argparsing import Parser | |
| from _pytest.deprecated import check_ispytest | |
| from _pytest.nodes import Collector | |
| from _pytest.nodes import Directory | |
| from _pytest.nodes import Item | |
| from _pytest.nodes import Node | |
| from _pytest.outcomes import Exit | |
| from _pytest.outcomes import OutcomeException | |
| from _pytest.outcomes import Skipped | |
| from _pytest.outcomes import TEST_OUTCOME | |
| if sys.version_info < (3, 11): | |
| from exceptiongroup import BaseExceptionGroup | |
| if TYPE_CHECKING: | |
| from _pytest.main import Session | |
| from _pytest.terminal import TerminalReporter | |
| # | |
| # pytest plugin hooks. | |
| def pytest_addoption(parser: Parser) -> None: | |
| group = parser.getgroup("terminal reporting", "Reporting", after="general") | |
| group.addoption( | |
| "--durations", | |
| action="store", | |
| type=int, | |
| default=None, | |
| metavar="N", | |
| help="Show N slowest setup/test durations (N=0 for all)", | |
| ) | |
| group.addoption( | |
| "--durations-min", | |
| action="store", | |
| type=float, | |
| default=0.005, | |
| metavar="N", | |
| help="Minimal duration in seconds for inclusion in slowest list. " | |
| "Default: 0.005.", | |
| ) | |
| def pytest_terminal_summary(terminalreporter: "TerminalReporter") -> None: | |
| durations = terminalreporter.config.option.durations | |
| durations_min = terminalreporter.config.option.durations_min | |
| verbose = terminalreporter.config.getvalue("verbose") | |
| if durations is None: | |
| return | |
| tr = terminalreporter | |
| dlist = [] | |
| for replist in tr.stats.values(): | |
| for rep in replist: | |
| if hasattr(rep, "duration"): | |
| dlist.append(rep) | |
| if not dlist: | |
| return | |
| dlist.sort(key=lambda x: x.duration, reverse=True) | |
| if not durations: | |
| tr.write_sep("=", "slowest durations") | |
| else: | |
| tr.write_sep("=", "slowest %s durations" % durations) | |
| dlist = dlist[:durations] | |
| for i, rep in enumerate(dlist): | |
| if verbose < 2 and rep.duration < durations_min: | |
| tr.write_line("") | |
| tr.write_line( | |
| f"({len(dlist) - i} durations < {durations_min:g}s hidden. Use -vv to show these durations.)" | |
| ) | |
| break | |
| tr.write_line(f"{rep.duration:02.2f}s {rep.when:<8} {rep.nodeid}") | |
| def pytest_sessionstart(session: "Session") -> None: | |
| session._setupstate = SetupState() | |
| def pytest_sessionfinish(session: "Session") -> None: | |
| session._setupstate.teardown_exact(None) | |
| def pytest_runtest_protocol(item: Item, nextitem: Optional[Item]) -> bool: | |
| ihook = item.ihook | |
| ihook.pytest_runtest_logstart(nodeid=item.nodeid, location=item.location) | |
| runtestprotocol(item, nextitem=nextitem) | |
| ihook.pytest_runtest_logfinish(nodeid=item.nodeid, location=item.location) | |
| return True | |
| def runtestprotocol( | |
| item: Item, log: bool = True, nextitem: Optional[Item] = None | |
| ) -> List[TestReport]: | |
| hasrequest = hasattr(item, "_request") | |
| if hasrequest and not item._request: # type: ignore[attr-defined] | |
| # This only happens if the item is re-run, as is done by | |
| # pytest-rerunfailures. | |
| item._initrequest() # type: ignore[attr-defined] | |
| rep = call_and_report(item, "setup", log) | |
| reports = [rep] | |
| if rep.passed: | |
| if item.config.getoption("setupshow", False): | |
| show_test_item(item) | |
| if not item.config.getoption("setuponly", False): | |
| reports.append(call_and_report(item, "call", log)) | |
| reports.append(call_and_report(item, "teardown", log, nextitem=nextitem)) | |
| # After all teardown hooks have been called | |
| # want funcargs and request info to go away. | |
| if hasrequest: | |
| item._request = False # type: ignore[attr-defined] | |
| item.funcargs = None # type: ignore[attr-defined] | |
| return reports | |
| def show_test_item(item: Item) -> None: | |
| """Show test function, parameters and the fixtures of the test item.""" | |
| tw = item.config.get_terminal_writer() | |
| tw.line() | |
| tw.write(" " * 8) | |
| tw.write(item.nodeid) | |
| used_fixtures = sorted(getattr(item, "fixturenames", [])) | |
| if used_fixtures: | |
| tw.write(" (fixtures used: {})".format(", ".join(used_fixtures))) | |
| tw.flush() | |
| def pytest_runtest_setup(item: Item) -> None: | |
| _update_current_test_var(item, "setup") | |
| item.session._setupstate.setup(item) | |
| def pytest_runtest_call(item: Item) -> None: | |
| _update_current_test_var(item, "call") | |
| try: | |
| del sys.last_type | |
| del sys.last_value | |
| del sys.last_traceback | |
| if sys.version_info >= (3, 12, 0): | |
| del sys.last_exc # type: ignore[attr-defined] | |
| except AttributeError: | |
| pass | |
| try: | |
| item.runtest() | |
| except Exception as e: | |
| # Store trace info to allow postmortem debugging | |
| sys.last_type = type(e) | |
| sys.last_value = e | |
| if sys.version_info >= (3, 12, 0): | |
| sys.last_exc = e # type: ignore[attr-defined] | |
| assert e.__traceback__ is not None | |
| # Skip *this* frame | |
| sys.last_traceback = e.__traceback__.tb_next | |
| raise e | |
| def pytest_runtest_teardown(item: Item, nextitem: Optional[Item]) -> None: | |
| _update_current_test_var(item, "teardown") | |
| item.session._setupstate.teardown_exact(nextitem) | |
| _update_current_test_var(item, None) | |
| def _update_current_test_var( | |
| item: Item, when: Optional[Literal["setup", "call", "teardown"]] | |
| ) -> None: | |
| """Update :envvar:`PYTEST_CURRENT_TEST` to reflect the current item and stage. | |
| If ``when`` is None, delete ``PYTEST_CURRENT_TEST`` from the environment. | |
| """ | |
| var_name = "PYTEST_CURRENT_TEST" | |
| if when: | |
| value = f"{item.nodeid} ({when})" | |
| # don't allow null bytes on environment variables (see #2644, #2957) | |
| value = value.replace("\x00", "(null)") | |
| os.environ[var_name] = value | |
| else: | |
| os.environ.pop(var_name) | |
| def pytest_report_teststatus(report: BaseReport) -> Optional[Tuple[str, str, str]]: | |
| if report.when in ("setup", "teardown"): | |
| if report.failed: | |
| # category, shortletter, verbose-word | |
| return "error", "E", "ERROR" | |
| elif report.skipped: | |
| return "skipped", "s", "SKIPPED" | |
| else: | |
| return "", "", "" | |
| return None | |
| # | |
| # Implementation | |
| def call_and_report( | |
| item: Item, when: Literal["setup", "call", "teardown"], log: bool = True, **kwds | |
| ) -> TestReport: | |
| ihook = item.ihook | |
| if when == "setup": | |
| runtest_hook: Callable[..., None] = ihook.pytest_runtest_setup | |
| elif when == "call": | |
| runtest_hook = ihook.pytest_runtest_call | |
| elif when == "teardown": | |
| runtest_hook = ihook.pytest_runtest_teardown | |
| else: | |
| assert False, f"Unhandled runtest hook case: {when}" | |
| reraise: Tuple[Type[BaseException], ...] = (Exit,) | |
| if not item.config.getoption("usepdb", False): | |
| reraise += (KeyboardInterrupt,) | |
| call = CallInfo.from_call( | |
| lambda: runtest_hook(item=item, **kwds), when=when, reraise=reraise | |
| ) | |
| report: TestReport = ihook.pytest_runtest_makereport(item=item, call=call) | |
| if log: | |
| ihook.pytest_runtest_logreport(report=report) | |
| if check_interactive_exception(call, report): | |
| ihook.pytest_exception_interact(node=item, call=call, report=report) | |
| return report | |
| def check_interactive_exception(call: "CallInfo[object]", report: BaseReport) -> bool: | |
| """Check whether the call raised an exception that should be reported as | |
| interactive.""" | |
| if call.excinfo is None: | |
| # Didn't raise. | |
| return False | |
| if hasattr(report, "wasxfail"): | |
| # Exception was expected. | |
| return False | |
| if isinstance(call.excinfo.value, (Skipped, bdb.BdbQuit)): | |
| # Special control flow exception. | |
| return False | |
| return True | |
| TResult = TypeVar("TResult", covariant=True) | |
| class CallInfo(Generic[TResult]): | |
| """Result/Exception info of a function invocation.""" | |
| _result: Optional[TResult] | |
| #: The captured exception of the call, if it raised. | |
| excinfo: Optional[ExceptionInfo[BaseException]] | |
| #: The system time when the call started, in seconds since the epoch. | |
| start: float | |
| #: The system time when the call ended, in seconds since the epoch. | |
| stop: float | |
| #: The call duration, in seconds. | |
| duration: float | |
| #: The context of invocation: "collect", "setup", "call" or "teardown". | |
| when: Literal["collect", "setup", "call", "teardown"] | |
| def __init__( | |
| self, | |
| result: Optional[TResult], | |
| excinfo: Optional[ExceptionInfo[BaseException]], | |
| start: float, | |
| stop: float, | |
| duration: float, | |
| when: Literal["collect", "setup", "call", "teardown"], | |
| *, | |
| _ispytest: bool = False, | |
| ) -> None: | |
| check_ispytest(_ispytest) | |
| self._result = result | |
| self.excinfo = excinfo | |
| self.start = start | |
| self.stop = stop | |
| self.duration = duration | |
| self.when = when | |
| def result(self) -> TResult: | |
| """The return value of the call, if it didn't raise. | |
| Can only be accessed if excinfo is None. | |
| """ | |
| if self.excinfo is not None: | |
| raise AttributeError(f"{self!r} has no valid result") | |
| # The cast is safe because an exception wasn't raised, hence | |
| # _result has the expected function return type (which may be | |
| # None, that's why a cast and not an assert). | |
| return cast(TResult, self._result) | |
| def from_call( | |
| cls, | |
| func: Callable[[], TResult], | |
| when: Literal["collect", "setup", "call", "teardown"], | |
| reraise: Optional[ | |
| Union[Type[BaseException], Tuple[Type[BaseException], ...]] | |
| ] = None, | |
| ) -> "CallInfo[TResult]": | |
| """Call func, wrapping the result in a CallInfo. | |
| :param func: | |
| The function to call. Called without arguments. | |
| :param when: | |
| The phase in which the function is called. | |
| :param reraise: | |
| Exception or exceptions that shall propagate if raised by the | |
| function, instead of being wrapped in the CallInfo. | |
| """ | |
| excinfo = None | |
| start = timing.time() | |
| precise_start = timing.perf_counter() | |
| try: | |
| result: Optional[TResult] = func() | |
| except BaseException: | |
| excinfo = ExceptionInfo.from_current() | |
| if reraise is not None and isinstance(excinfo.value, reraise): | |
| raise | |
| result = None | |
| # use the perf counter | |
| precise_stop = timing.perf_counter() | |
| duration = precise_stop - precise_start | |
| stop = timing.time() | |
| return cls( | |
| start=start, | |
| stop=stop, | |
| duration=duration, | |
| when=when, | |
| result=result, | |
| excinfo=excinfo, | |
| _ispytest=True, | |
| ) | |
| def __repr__(self) -> str: | |
| if self.excinfo is None: | |
| return f"<CallInfo when={self.when!r} result: {self._result!r}>" | |
| return f"<CallInfo when={self.when!r} excinfo={self.excinfo!r}>" | |
| def pytest_runtest_makereport(item: Item, call: CallInfo[None]) -> TestReport: | |
| return TestReport.from_item_and_call(item, call) | |
| def pytest_make_collect_report(collector: Collector) -> CollectReport: | |
| def collect() -> List[Union[Item, Collector]]: | |
| # Before collecting, if this is a Directory, load the conftests. | |
| # If a conftest import fails to load, it is considered a collection | |
| # error of the Directory collector. This is why it's done inside of the | |
| # CallInfo wrapper. | |
| # | |
| # Note: initial conftests are loaded early, not here. | |
| if isinstance(collector, Directory): | |
| collector.config.pluginmanager._loadconftestmodules( | |
| collector.path, | |
| collector.config.getoption("importmode"), | |
| rootpath=collector.config.rootpath, | |
| consider_namespace_packages=collector.config.getini( | |
| "consider_namespace_packages" | |
| ), | |
| ) | |
| return list(collector.collect()) | |
| call = CallInfo.from_call( | |
| collect, "collect", reraise=(KeyboardInterrupt, SystemExit) | |
| ) | |
| longrepr: Union[None, Tuple[str, int, str], str, TerminalRepr] = None | |
| if not call.excinfo: | |
| outcome: Literal["passed", "skipped", "failed"] = "passed" | |
| else: | |
| skip_exceptions = [Skipped] | |
| unittest = sys.modules.get("unittest") | |
| if unittest is not None: | |
| skip_exceptions.append(unittest.SkipTest) | |
| if isinstance(call.excinfo.value, tuple(skip_exceptions)): | |
| outcome = "skipped" | |
| r_ = collector._repr_failure_py(call.excinfo, "line") | |
| assert isinstance(r_, ExceptionChainRepr), repr(r_) | |
| r = r_.reprcrash | |
| assert r | |
| longrepr = (str(r.path), r.lineno, r.message) | |
| else: | |
| outcome = "failed" | |
| errorinfo = collector.repr_failure(call.excinfo) | |
| if not hasattr(errorinfo, "toterminal"): | |
| assert isinstance(errorinfo, str) | |
| errorinfo = CollectErrorRepr(errorinfo) | |
| longrepr = errorinfo | |
| result = call.result if not call.excinfo else None | |
| rep = CollectReport(collector.nodeid, outcome, longrepr, result) | |
| rep.call = call # type: ignore # see collect_one_node | |
| return rep | |
| class SetupState: | |
| """Shared state for setting up/tearing down test items or collectors | |
| in a session. | |
| Suppose we have a collection tree as follows: | |
| <Session session> | |
| <Module mod1> | |
| <Function item1> | |
| <Module mod2> | |
| <Function item2> | |
| The SetupState maintains a stack. The stack starts out empty: | |
| [] | |
| During the setup phase of item1, setup(item1) is called. What it does | |
| is: | |
| push session to stack, run session.setup() | |
| push mod1 to stack, run mod1.setup() | |
| push item1 to stack, run item1.setup() | |
| The stack is: | |
| [session, mod1, item1] | |
| While the stack is in this shape, it is allowed to add finalizers to | |
| each of session, mod1, item1 using addfinalizer(). | |
| During the teardown phase of item1, teardown_exact(item2) is called, | |
| where item2 is the next item to item1. What it does is: | |
| pop item1 from stack, run its teardowns | |
| pop mod1 from stack, run its teardowns | |
| mod1 was popped because it ended its purpose with item1. The stack is: | |
| [session] | |
| During the setup phase of item2, setup(item2) is called. What it does | |
| is: | |
| push mod2 to stack, run mod2.setup() | |
| push item2 to stack, run item2.setup() | |
| Stack: | |
| [session, mod2, item2] | |
| During the teardown phase of item2, teardown_exact(None) is called, | |
| because item2 is the last item. What it does is: | |
| pop item2 from stack, run its teardowns | |
| pop mod2 from stack, run its teardowns | |
| pop session from stack, run its teardowns | |
| Stack: | |
| [] | |
| The end! | |
| """ | |
| def __init__(self) -> None: | |
| # The stack is in the dict insertion order. | |
| self.stack: Dict[ | |
| Node, | |
| Tuple[ | |
| # Node's finalizers. | |
| List[Callable[[], object]], | |
| # Node's exception, if its setup raised. | |
| Optional[Union[OutcomeException, Exception]], | |
| ], | |
| ] = {} | |
| def setup(self, item: Item) -> None: | |
| """Setup objects along the collector chain to the item.""" | |
| needed_collectors = item.listchain() | |
| # If a collector fails its setup, fail its entire subtree of items. | |
| # The setup is not retried for each item - the same exception is used. | |
| for col, (finalizers, exc) in self.stack.items(): | |
| assert col in needed_collectors, "previous item was not torn down properly" | |
| if exc: | |
| raise exc | |
| for col in needed_collectors[len(self.stack) :]: | |
| assert col not in self.stack | |
| # Push onto the stack. | |
| self.stack[col] = ([col.teardown], None) | |
| try: | |
| col.setup() | |
| except TEST_OUTCOME as exc: | |
| self.stack[col] = (self.stack[col][0], exc) | |
| raise exc | |
| def addfinalizer(self, finalizer: Callable[[], object], node: Node) -> None: | |
| """Attach a finalizer to the given node. | |
| The node must be currently active in the stack. | |
| """ | |
| assert node and not isinstance(node, tuple) | |
| assert callable(finalizer) | |
| assert node in self.stack, (node, self.stack) | |
| self.stack[node][0].append(finalizer) | |
| def teardown_exact(self, nextitem: Optional[Item]) -> None: | |
| """Teardown the current stack up until reaching nodes that nextitem | |
| also descends from. | |
| When nextitem is None (meaning we're at the last item), the entire | |
| stack is torn down. | |
| """ | |
| needed_collectors = nextitem and nextitem.listchain() or [] | |
| exceptions: List[BaseException] = [] | |
| while self.stack: | |
| if list(self.stack.keys()) == needed_collectors[: len(self.stack)]: | |
| break | |
| node, (finalizers, _) = self.stack.popitem() | |
| these_exceptions = [] | |
| while finalizers: | |
| fin = finalizers.pop() | |
| try: | |
| fin() | |
| except TEST_OUTCOME as e: | |
| these_exceptions.append(e) | |
| if len(these_exceptions) == 1: | |
| exceptions.extend(these_exceptions) | |
| elif these_exceptions: | |
| msg = f"errors while tearing down {node!r}" | |
| exceptions.append(BaseExceptionGroup(msg, these_exceptions[::-1])) | |
| if len(exceptions) == 1: | |
| raise exceptions[0] | |
| elif exceptions: | |
| raise BaseExceptionGroup("errors during test teardown", exceptions[::-1]) | |
| if nextitem is None: | |
| assert not self.stack | |
| def collect_one_node(collector: Collector) -> CollectReport: | |
| ihook = collector.ihook | |
| ihook.pytest_collectstart(collector=collector) | |
| rep: CollectReport = ihook.pytest_make_collect_report(collector=collector) | |
| call = rep.__dict__.pop("call", None) | |
| if call and check_interactive_exception(call, rep): | |
| ihook.pytest_exception_interact(node=collector, call=call, report=rep) | |
| return rep | |