| |
| from __future__ import annotations |
|
|
| from collections.abc import Iterable |
| from collections.abc import Iterator |
| from collections.abc import Mapping |
| from collections.abc import Sequence |
| import dataclasses |
| from io import StringIO |
| import os |
| from pprint import pprint |
| from typing import Any |
| from typing import cast |
| from typing import final |
| from typing import Literal |
| from typing import NoReturn |
| from typing import TYPE_CHECKING |
|
|
| from _pytest._code.code import ExceptionChainRepr |
| from _pytest._code.code import ExceptionInfo |
| from _pytest._code.code import ExceptionRepr |
| from _pytest._code.code import ReprEntry |
| from _pytest._code.code import ReprEntryNative |
| from _pytest._code.code import ReprExceptionInfo |
| from _pytest._code.code import ReprFileLocation |
| from _pytest._code.code import ReprFuncArgs |
| from _pytest._code.code import ReprLocals |
| from _pytest._code.code import ReprTraceback |
| from _pytest._code.code import TerminalRepr |
| from _pytest._io import TerminalWriter |
| from _pytest.config import Config |
| from _pytest.nodes import Collector |
| from _pytest.nodes import Item |
| from _pytest.outcomes import fail |
| from _pytest.outcomes import skip |
|
|
|
|
| if TYPE_CHECKING: |
| from typing_extensions import Self |
|
|
| from _pytest.runner import CallInfo |
|
|
|
|
def getworkerinfoline(node):
    """Return a one-line description of the worker behind *node*.

    The line is built from ``node.workerinfo`` (keys ``id``, ``sysplatform``,
    ``version_info`` and ``executable``) the first time it is requested and
    is then stored on the node as ``_workerinfocache``; later calls return
    the stored string without looking at ``workerinfo`` again.
    """
    try:
        return node._workerinfocache
    except AttributeError:
        info = node.workerinfo
        # Only the first three components of version_info are shown.
        version = "{}.{}.{}".format(*info["version_info"][:3])
        line = (
            f"[{info['id']}] {info['sysplatform']} "
            f"-- Python {version} {info['executable']}"
        )
        node._workerinfocache = line
        return line
|
|
|
|
class BaseReport:
    """Behaviour shared by the report classes defined in this module.

    Every keyword argument given to the constructor is stored as an
    instance attribute of the same name.
    """

    when: str | None
    location: tuple[str, int | None, str] | None
    longrepr: (
        None | ExceptionInfo[BaseException] | tuple[str, int, str] | str | TerminalRepr
    )
    sections: list[tuple[str, str]]
    nodeid: str
    outcome: Literal["passed", "failed", "skipped"]

    def __init__(self, **kw: Any) -> None:
        vars(self).update(kw)

    if TYPE_CHECKING:
        # Stub seen by type checkers only, so that access to the arbitrary
        # extra attributes stored by ``__init__`` is not flagged.
        def __getattr__(self, key: str) -> Any: ...

    def toterminal(self, out: TerminalWriter) -> None:
        """Write this report to *out*.

        If the report has a ``node`` attribute, its worker info line is
        written first. ``longrepr`` then renders itself through its own
        ``toterminal`` method when it has one; otherwise its ``str()`` is
        written as a single line. Nothing more is written when ``longrepr``
        is ``None``.
        """
        if hasattr(self, "node"):
            worker_line = getworkerinfoline(self.node)
            if worker_line:
                out.line(worker_line)

        representation = self.longrepr
        if representation is None:
            return

        if not hasattr(representation, "toterminal"):
            try:
                text = str(representation)
            except UnicodeEncodeError:
                text = "<unprintable longrepr>"
            out.line(text)
            return

        cast(TerminalRepr, representation).toterminal(out)

    def get_sections(self, prefix: str) -> Iterator[tuple[str, str]]:
        """Yield ``(prefix, content)`` for each section whose name starts
        with *prefix*.

        Note that the given *prefix* is yielded, not the full section name.
        """
        yield from (
            (prefix, content)
            for name, content in self.sections
            if name.startswith(prefix)
        )

    @property
    def longreprtext(self) -> str:
        """Read-only property that returns the full string representation of
        ``longrepr``.

        The report is rendered with markup disabled and surrounding
        whitespace is stripped.

        .. versionadded:: 3.0
        """
        buffer = StringIO()
        writer = TerminalWriter(buffer)
        writer.hasmarkup = False
        self.toterminal(writer)
        return buffer.getvalue().strip()

    @property
    def caplog(self) -> str:
        """Return captured log lines, if log capturing is enabled.

        The contents of all "Captured log" sections, joined by newlines.

        .. versionadded:: 3.5
        """
        log_chunks = [content for _, content in self.get_sections("Captured log")]
        return "\n".join(log_chunks)

    @property
    def capstdout(self) -> str:
        """Return captured text from stdout, if capturing is enabled.

        The contents of all "Captured stdout" sections, concatenated.

        .. versionadded:: 3.0
        """
        out_chunks = [content for _, content in self.get_sections("Captured stdout")]
        return "".join(out_chunks)

    @property
    def capstderr(self) -> str:
        """Return captured text from stderr, if capturing is enabled.

        The contents of all "Captured stderr" sections, concatenated.

        .. versionadded:: 3.0
        """
        err_chunks = [content for _, content in self.get_sections("Captured stderr")]
        return "".join(err_chunks)

    @property
    def passed(self) -> bool:
        """Whether the outcome is passed."""
        return self.outcome == "passed"

    @property
    def failed(self) -> bool:
        """Whether the outcome is failed."""
        return self.outcome == "failed"

    @property
    def skipped(self) -> bool:
        """Whether the outcome is skipped."""
        return self.outcome == "skipped"

    @property
    def fspath(self) -> str:
        """The path portion of the reported node, as a string.

        This is the part of ``nodeid`` before the first ``"::"``.
        """
        path_part, _sep, _rest = self.nodeid.partition("::")
        return path_part

    @property
    def count_towards_summary(self) -> bool:
        """**Experimental** Whether this report should be counted towards the
        totals shown at the end of the test session: "1 passed, 1 failure, etc".

        .. note::

            This function is considered **experimental**, so beware that it is subject to changes
            even in patch releases.
        """
        return True

    @property
    def head_line(self) -> str | None:
        """**Experimental** The head line shown with longrepr output for this
        report, more commonly during traceback representation during
        failures::

            ________ Test.foo ________


        In the example above, the head_line is "Test.foo".

        It is the third element of ``location``, or ``None`` when
        ``location`` is ``None``.

        .. note::

            This function is considered **experimental**, so beware that it is subject to changes
            even in patch releases.
        """
        if self.location is None:
            return None
        _fspath, _lineno, domain = self.location
        return domain

    def _get_verbose_word_with_markup(
        self, config: Config, default_markup: Mapping[str, bool]
    ) -> tuple[str, Mapping[str, bool]]:
        """Return the verbose status word for this report and its markup.

        The third element of the ``pytest_report_teststatus`` hook result is
        either a plain string (paired with *default_markup*) or a
        ``(word, markup)`` pair. Any other shape is reported through
        ``fail()``.
        """
        status = config.hook.pytest_report_teststatus(report=self, config=config)
        _category, _short, verbose = status

        if isinstance(verbose, str):
            return verbose, default_markup

        looks_like_pair = isinstance(verbose, Sequence) and len(verbose) == 2
        if looks_like_pair:
            word, markup = verbose
            if isinstance(word, str) and isinstance(markup, Mapping):
                return word, markup

        fail(
            "pytest_report_teststatus() hook (from a plugin) returned "
            f"an invalid verbose value: {verbose!r}.\nExpected either a string "
            "or a tuple of (word, markup)."
        )

    def _to_json(self) -> dict[str, Any]:
        """Return the contents of this report as a dict of builtin entries,
        suitable for serialization.

        This was originally the serialize_report() function from xdist (ca03269).

        Experimental method.
        """
        return _report_to_json(self)

    @classmethod
    def _from_json(cls, reportdict: dict[str, object]) -> Self:
        """Create either a TestReport or CollectReport, depending on the calling class.

        It is the caller's responsibility to know which class to pass here.

        This was originally the serialize_report() function from xdist (ca03269).

        Experimental method.
        """
        init_kwargs = _report_kwargs_from_json(reportdict)
        return cls(**init_kwargs)
|
|
|
|
def _report_unserialization_failure(
    type_name: str, report_class: type[BaseReport], reportdict
) -> NoReturn:
    """Raise ``RuntimeError`` describing an unknown serialized entry type.

    The message is a pretty-printed dump containing the offending
    *type_name*, the *report_class*, the full *reportdict* and the URL of
    the issue tracker, framed by two separator lines.

    :raises RuntimeError: Always.
    """
    url = "https://github.com/pytest-dev/pytest/issues"
    separator = "-" * 100
    buffer = StringIO()
    dump_items = (
        separator,
        f"INTERNALERROR: Unknown entry type returned: {type_name}",
        f"report_name: {report_class}",
        reportdict,
        f"Please report this bug at {url}",
        separator,
    )
    for item in dump_items:
        pprint(item, stream=buffer)
    raise RuntimeError(buffer.getvalue())
|
|
|
|
@final
class TestReport(BaseReport):
    """Basic test report object (also used for setup and teardown calls if
    they fail).

    Reports can contain arbitrary extra attributes.
    """

    # NOTE(review): presumably keeps pytest from collecting this
    # ``Test``-prefixed class as a test class -- confirm.
    __test__ = False

    # Declared for type checkers only; this class never assigns it itself.
    wasxfail: str

    def __init__(
        self,
        nodeid: str,
        location: tuple[str, int | None, str],
        keywords: Mapping[str, Any],
        outcome: Literal["passed", "failed", "skipped"],
        longrepr: None
        | ExceptionInfo[BaseException]
        | tuple[str, int, str]
        | str
        | TerminalRepr,
        when: Literal["setup", "call", "teardown"],
        sections: Iterable[tuple[str, str]] = (),
        duration: float = 0,
        start: float = 0,
        stop: float = 0,
        user_properties: Iterable[tuple[str, object]] | None = None,
        **extra,
    ) -> None:
        # Node id of the reported item.
        self.nodeid = nodeid

        # (path, line number or None, domain) triple.
        self.location: tuple[str, int | None, str] = location

        # Mapping of keyword names associated with the item.
        self.keywords: Mapping[str, Any] = keywords

        # One of "passed", "failed", "skipped".
        self.outcome = outcome

        # None, or a representation of the failure / skip.
        self.longrepr = longrepr

        # Phase this report belongs to: "setup", "call" or "teardown".
        self.when: Literal["setup", "call", "teardown"] = when

        # (name, value) pairs, copied into a fresh list (empty when None).
        self.user_properties = list(user_properties or [])

        # (heading, content) pairs, copied into a fresh list.
        self.sections = list(sections)

        # Timing values as passed in by the caller.
        self.duration: float = duration
        self.start: float = start
        self.stop: float = stop

        # Extra attributes go last so they can override any of the above.
        vars(self).update(extra)

    def __repr__(self) -> str:
        class_name = self.__class__.__name__
        return (
            f"<{class_name} {self.nodeid!r} "
            f"when={self.when!r} outcome={self.outcome!r}>"
        )

    @classmethod
    def from_item_and_call(cls, item: Item, call: CallInfo[None]) -> TestReport:
        """Create and fill a TestReport with standard item and call info.

        :param item: The item.
        :param call: The call info.
        """
        when = call.when
        # A "collect" phase is not expected here; this also narrows the type.
        assert when != "collect"
        keywords = dict.fromkeys(item.keywords, 1)
        excinfo = call.excinfo

        outcome: Literal["passed", "failed", "skipped"]
        longrepr: (
            None
            | ExceptionInfo[BaseException]
            | tuple[str, int, str]
            | str
            | TerminalRepr
        )
        if not excinfo:
            outcome = "passed"
            longrepr = None
        elif not isinstance(excinfo, ExceptionInfo):
            # Not a real ExceptionInfo: use the value itself as the longrepr.
            outcome = "failed"
            longrepr = excinfo
        elif isinstance(excinfo.value, skip.Exception):
            outcome = "skipped"
            crash = excinfo._getreprcrash()
            assert crash is not None, (
                "There should always be a traceback entry for skipping a test."
            )
            if excinfo.value._use_item_location:
                # Report the item's own location instead of the crash site.
                path, line = item.reportinfo()[:2]
                assert line is not None
                longrepr = (os.fspath(path), line + 1, crash.message)
            else:
                longrepr = (str(crash.path), crash.lineno, crash.message)
        else:
            outcome = "failed"
            if when == "call":
                longrepr = item.repr_failure(excinfo)
            else:
                # Setup/teardown failures use the configured traceback style.
                longrepr = item._repr_failure_py(
                    excinfo, style=item.config.getoption("tbstyle", "auto")
                )

        sections = [
            (f"Captured {key} {rwhen}", content)
            for rwhen, key, content in item._report_sections
        ]
        return cls(
            nodeid=item.nodeid,
            location=item.location,
            keywords=keywords,
            outcome=outcome,
            longrepr=longrepr,
            when=when,
            sections=sections,
            duration=call.duration,
            start=call.start,
            stop=call.stop,
            user_properties=item.user_properties,
        )
|
|
|
|
@final
class CollectReport(BaseReport):
    """Collection report object.

    Reports can contain arbitrary extra attributes.
    """

    # Collection reports always carry this fixed phase name.
    when = "collect"

    def __init__(
        self,
        nodeid: str,
        outcome: Literal["passed", "failed", "skipped"],
        longrepr: None
        | ExceptionInfo[BaseException]
        | tuple[str, int, str]
        | str
        | TerminalRepr,
        result: list[Item | Collector] | None,
        sections: Iterable[tuple[str, str]] = (),
        **extra,
    ) -> None:
        # Node id of the reported collector.
        self.nodeid = nodeid

        # One of "passed", "failed", "skipped".
        self.outcome = outcome

        # None, or a representation of the failure / skip.
        self.longrepr = longrepr

        # Collected items and collectors; a falsy value becomes an empty list.
        self.result = result or []

        # (heading, content) pairs, copied into a fresh list.
        self.sections = list(sections)

        # Extra attributes go last so they can override any of the above.
        vars(self).update(extra)

    @property
    def location(
        self,
    ) -> tuple[str, int | None, str] | None:
        """``(fspath, None, fspath)``: the path portion of the node id is
        used for both the path and the domain, with no line number."""
        path = self.fspath
        return (path, None, path)

    def __repr__(self) -> str:
        result_count = len(self.result)
        return (
            f"<CollectReport {self.nodeid!r} "
            f"lenresult={result_count} outcome={self.outcome!r}>"
        )
|
|
|
|
class CollectErrorRepr(TerminalRepr):
    """Terminal representation wrapping a plain message string."""

    def __init__(self, msg: str) -> None:
        # Kept under the name ``longrepr``; ``toterminal`` prints it as-is.
        self.longrepr = msg

    def toterminal(self, out: TerminalWriter) -> None:
        """Write the stored message to *out* as a single red line."""
        message = self.longrepr
        out.line(message, red=True)
|
|
|
|
def pytest_report_to_serializable(
    report: CollectReport | TestReport,
) -> dict[str, Any] | None:
    """Serialize *report* to a dict of builtin entries.

    The dict produced by ``report._to_json()`` is tagged with the report's
    class name under the ``"$report_type"`` key. ``None`` is returned for
    anything that is neither a ``TestReport`` nor a ``CollectReport``.
    """
    if not isinstance(report, (TestReport, CollectReport)):
        return None

    payload = report._to_json()
    payload["$report_type"] = report.__class__.__name__
    return payload
|
|
|
|
def pytest_report_from_serializable(
    data: dict[str, Any],
) -> CollectReport | TestReport | None:
    """Rebuild a report from a dict tagged with ``"$report_type"``.

    :param data:
        A serialized report. Its ``"$report_type"`` entry selects the class
        whose ``_from_json`` rebuilds the report.
    :returns:
        A ``TestReport`` or ``CollectReport``, or ``None`` when *data* has
        no ``"$report_type"`` key.
    :raises AssertionError:
        If ``"$report_type"`` names an unknown report class.
    """
    if "$report_type" not in data:
        return None

    report_type = data["$report_type"]
    if report_type == "TestReport":
        return TestReport._from_json(data)
    if report_type == "CollectReport":
        return CollectReport._from_json(data)

    # Raised explicitly rather than via ``assert False``: an assert statement
    # is stripped under ``python -O``, which would turn an unknown report
    # type into a silent ``None``.
    raise AssertionError(f"Unknown report_type unserialize data: {report_type}")
|
|
|
|
def _report_to_json(report: BaseReport) -> dict[str, Any]:
    """Return the contents of this report as a dict of builtin entries,
    suitable for serialization.

    The result starts as a shallow copy of ``report.__dict__``. ``longrepr``
    is then replaced by a nested dict (exception representations), by its
    ``str()`` (other objects with a ``toterminal`` method) or kept as is.
    Finally every ``os.PathLike`` value is converted with ``os.fspath`` and
    a ``"result"`` entry is replaced by ``None``.

    This was originally the serialize_report() function from xdist (ca03269).
    """

    def serialize_repr_entry(
        entry: ReprEntry | ReprEntryNative,
    ) -> dict[str, Any]:
        payload = dataclasses.asdict(entry)
        for key, value in payload.items():
            if hasattr(value, "__dict__"):
                payload[key] = dataclasses.asdict(value)
        # The class name is stored so the matching entry type can be rebuilt.
        return {"type": type(entry).__name__, "data": payload}

    def serialize_repr_traceback(reprtraceback: ReprTraceback) -> dict[str, Any]:
        serialized = dataclasses.asdict(reprtraceback)
        # Entries are re-serialized one by one to attach their type names.
        serialized["reprentries"] = [
            serialize_repr_entry(entry) for entry in reprtraceback.reprentries
        ]
        return serialized

    def serialize_repr_crash(
        reprcrash: ReprFileLocation | None,
    ) -> dict[str, Any] | None:
        return None if reprcrash is None else dataclasses.asdict(reprcrash)

    def serialize_exception_longrepr(rep: BaseReport) -> dict[str, Any]:
        assert rep.longrepr is not None
        # The caller has already checked for the required attributes.
        longrepr = cast(ExceptionRepr, rep.longrepr)
        serialized: dict[str, Any] = {
            "reprcrash": serialize_repr_crash(longrepr.reprcrash),
            "reprtraceback": serialize_repr_traceback(longrepr.reprtraceback),
            "sections": longrepr.sections,
        }
        if isinstance(longrepr, ExceptionChainRepr):
            serialized["chain"] = [
                (
                    serialize_repr_traceback(repr_traceback),
                    serialize_repr_crash(repr_crash),
                    description,
                )
                for repr_traceback, repr_crash, description in longrepr.chain
            ]
        else:
            serialized["chain"] = None
        return serialized

    data = report.__dict__.copy()
    longrepr = report.longrepr
    if not hasattr(longrepr, "toterminal"):
        data["longrepr"] = longrepr
    elif hasattr(longrepr, "reprtraceback") and hasattr(longrepr, "reprcrash"):
        data["longrepr"] = serialize_exception_longrepr(report)
    else:
        data["longrepr"] = str(longrepr)

    # Only values of existing keys are replaced, so iterating is safe.
    for name, value in data.items():
        if isinstance(value, os.PathLike):
            data[name] = os.fspath(value)
        elif name == "result":
            data[name] = None
    return data
|
|
|
|
def _report_kwargs_from_json(reportdict: dict[str, Any]) -> dict[str, Any]:
    """Return **kwargs that can be used to construct a TestReport or
    CollectReport instance.

    *reportdict* is modified in place and returned: when its ``"longrepr"``
    entry is a serialized exception representation (it contains both
    ``"reprcrash"`` and ``"reprtraceback"``), the entry is replaced by the
    rebuilt representation object. Otherwise the dict is returned untouched.

    This was originally the serialize_report() function from xdist (ca03269).
    """

    def deserialize_repr_entry(entry_data):
        data = entry_data["data"]
        entry_type = entry_data["type"]
        if entry_type == "ReprEntryNative":
            return ReprEntryNative(data["lines"])
        if entry_type != "ReprEntry":
            # Never returns: raises RuntimeError with a diagnostic dump.
            _report_unserialization_failure(entry_type, TestReport, reportdict)

        # Optional parts stay None when their serialized value is falsy.
        reprfuncargs = (
            ReprFuncArgs(**data["reprfuncargs"]) if data["reprfuncargs"] else None
        )
        reprfileloc = (
            ReprFileLocation(**data["reprfileloc"]) if data["reprfileloc"] else None
        )
        reprlocals = (
            ReprLocals(data["reprlocals"]["lines"]) if data["reprlocals"] else None
        )
        return ReprEntry(
            lines=data["lines"],
            reprfuncargs=reprfuncargs,
            reprlocals=reprlocals,
            reprfileloc=reprfileloc,
            style=data["style"],
        )

    def deserialize_repr_traceback(repr_traceback_dict):
        # The serialized entries are replaced in place before the dict is
        # expanded into ReprTraceback's keyword arguments.
        repr_traceback_dict["reprentries"] = [
            deserialize_repr_entry(entry)
            for entry in repr_traceback_dict["reprentries"]
        ]
        return ReprTraceback(**repr_traceback_dict)

    def deserialize_repr_crash(repr_crash_dict: dict[str, Any] | None):
        if repr_crash_dict is None:
            return None
        return ReprFileLocation(**repr_crash_dict)

    serialized = reportdict["longrepr"]
    is_exception_repr = (
        serialized and "reprcrash" in serialized and "reprtraceback" in serialized
    )
    if not is_exception_repr:
        return reportdict

    reprtraceback = deserialize_repr_traceback(serialized["reprtraceback"])
    reprcrash = deserialize_repr_crash(serialized["reprcrash"])

    exception_info: ExceptionChainRepr | ReprExceptionInfo
    if serialized["chain"]:
        chain = [
            (
                deserialize_repr_traceback(repr_traceback_data),
                deserialize_repr_crash(repr_crash_data),
                description,
            )
            for repr_traceback_data, repr_crash_data, description in serialized[
                "chain"
            ]
        ]
        exception_info = ExceptionChainRepr(chain)
    else:
        exception_info = ReprExceptionInfo(
            reprtraceback=reprtraceback,
            reprcrash=reprcrash,
        )

    for section in serialized["sections"]:
        exception_info.addsection(*section)
    reportdict["longrepr"] = exception_info

    return reportdict
|
|