| | |
| | """Implementation of the cache provider.""" |
| |
|
| | |
| | |
| | from __future__ import annotations |
| |
|
| | import dataclasses |
| | import errno |
| | import json |
| | import os |
| | from pathlib import Path |
| | import tempfile |
| | from typing import final |
| | from typing import Generator |
| | from typing import Iterable |
| |
|
| | from .pathlib import resolve_from_str |
| | from .pathlib import rm_rf |
| | from .reports import CollectReport |
| | from _pytest import nodes |
| | from _pytest._io import TerminalWriter |
| | from _pytest.config import Config |
| | from _pytest.config import ExitCode |
| | from _pytest.config import hookimpl |
| | from _pytest.config.argparsing import Parser |
| | from _pytest.deprecated import check_ispytest |
| | from _pytest.fixtures import fixture |
| | from _pytest.fixtures import FixtureRequest |
| | from _pytest.main import Session |
| | from _pytest.nodes import Directory |
| | from _pytest.nodes import File |
| | from _pytest.reports import TestReport |
| |
|
| |
|
| | README_CONTENT = """\ |
| | # pytest cache directory # |
| | |
| | This directory contains data from the pytest's cache plugin, |
| | which provides the `--lf` and `--ff` options, as well as the `cache` fixture. |
| | |
| | **Do not** commit this to version control. |
| | |
| | See [the docs](https://docs.pytest.org/en/stable/how-to/cache.html) for more information. |
| | """ |
| |
|
| | CACHEDIR_TAG_CONTENT = b"""\ |
| | Signature: 8a477f597d28d172789f06886806bc55 |
| | # This file is a cache directory tag created by pytest. |
| | # For information about cache directory tags, see: |
| | # https://bford.info/cachedir/spec.html |
| | """ |
| |
|
| |
|
| | @final |
| | @dataclasses.dataclass |
| | class Cache: |
| | """Instance of the `cache` fixture.""" |
| |
|
| | _cachedir: Path = dataclasses.field(repr=False) |
| | _config: Config = dataclasses.field(repr=False) |
| |
|
| | |
| | _CACHE_PREFIX_DIRS = "d" |
| |
|
| | |
| | _CACHE_PREFIX_VALUES = "v" |
| |
|
| | def __init__( |
| | self, cachedir: Path, config: Config, *, _ispytest: bool = False |
| | ) -> None: |
| | check_ispytest(_ispytest) |
| | self._cachedir = cachedir |
| | self._config = config |
| |
|
| | @classmethod |
| | def for_config(cls, config: Config, *, _ispytest: bool = False) -> Cache: |
| | """Create the Cache instance for a Config. |
| | |
| | :meta private: |
| | """ |
| | check_ispytest(_ispytest) |
| | cachedir = cls.cache_dir_from_config(config, _ispytest=True) |
| | if config.getoption("cacheclear") and cachedir.is_dir(): |
| | cls.clear_cache(cachedir, _ispytest=True) |
| | return cls(cachedir, config, _ispytest=True) |
| |
|
| | @classmethod |
| | def clear_cache(cls, cachedir: Path, _ispytest: bool = False) -> None: |
| | """Clear the sub-directories used to hold cached directories and values. |
| | |
| | :meta private: |
| | """ |
| | check_ispytest(_ispytest) |
| | for prefix in (cls._CACHE_PREFIX_DIRS, cls._CACHE_PREFIX_VALUES): |
| | d = cachedir / prefix |
| | if d.is_dir(): |
| | rm_rf(d) |
| |
|
| | @staticmethod |
| | def cache_dir_from_config(config: Config, *, _ispytest: bool = False) -> Path: |
| | """Get the path to the cache directory for a Config. |
| | |
| | :meta private: |
| | """ |
| | check_ispytest(_ispytest) |
| | return resolve_from_str(config.getini("cache_dir"), config.rootpath) |
| |
|
| | def warn(self, fmt: str, *, _ispytest: bool = False, **args: object) -> None: |
| | """Issue a cache warning. |
| | |
| | :meta private: |
| | """ |
| | check_ispytest(_ispytest) |
| | import warnings |
| |
|
| | from _pytest.warning_types import PytestCacheWarning |
| |
|
| | warnings.warn( |
| | PytestCacheWarning(fmt.format(**args) if args else fmt), |
| | self._config.hook, |
| | stacklevel=3, |
| | ) |
| |
|
| | def _mkdir(self, path: Path) -> None: |
| | self._ensure_cache_dir_and_supporting_files() |
| | path.mkdir(exist_ok=True, parents=True) |
| |
|
| | def mkdir(self, name: str) -> Path: |
| | """Return a directory path object with the given name. |
| | |
| | If the directory does not yet exist, it will be created. You can use |
| | it to manage files to e.g. store/retrieve database dumps across test |
| | sessions. |
| | |
| | .. versionadded:: 7.0 |
| | |
| | :param name: |
| | Must be a string not containing a ``/`` separator. |
| | Make sure the name contains your plugin or application |
| | identifiers to prevent clashes with other cache users. |
| | """ |
| | path = Path(name) |
| | if len(path.parts) > 1: |
| | raise ValueError("name is not allowed to contain path separators") |
| | res = self._cachedir.joinpath(self._CACHE_PREFIX_DIRS, path) |
| | self._mkdir(res) |
| | return res |
| |
|
| | def _getvaluepath(self, key: str) -> Path: |
| | return self._cachedir.joinpath(self._CACHE_PREFIX_VALUES, Path(key)) |
| |
|
| | def get(self, key: str, default): |
| | """Return the cached value for the given key. |
| | |
| | If no value was yet cached or the value cannot be read, the specified |
| | default is returned. |
| | |
| | :param key: |
| | Must be a ``/`` separated value. Usually the first |
| | name is the name of your plugin or your application. |
| | :param default: |
| | The value to return in case of a cache-miss or invalid cache value. |
| | """ |
| | path = self._getvaluepath(key) |
| | try: |
| | with path.open("r", encoding="UTF-8") as f: |
| | return json.load(f) |
| | except (ValueError, OSError): |
| | return default |
| |
|
| | def set(self, key: str, value: object) -> None: |
| | """Save value for the given key. |
| | |
| | :param key: |
| | Must be a ``/`` separated value. Usually the first |
| | name is the name of your plugin or your application. |
| | :param value: |
| | Must be of any combination of basic python types, |
| | including nested types like lists of dictionaries. |
| | """ |
| | path = self._getvaluepath(key) |
| | try: |
| | self._mkdir(path.parent) |
| | except OSError as exc: |
| | self.warn( |
| | f"could not create cache path {path}: {exc}", |
| | _ispytest=True, |
| | ) |
| | return |
| | data = json.dumps(value, ensure_ascii=False, indent=2) |
| | try: |
| | f = path.open("w", encoding="UTF-8") |
| | except OSError as exc: |
| | self.warn( |
| | f"cache could not write path {path}: {exc}", |
| | _ispytest=True, |
| | ) |
| | else: |
| | with f: |
| | f.write(data) |
| |
|
| | def _ensure_cache_dir_and_supporting_files(self) -> None: |
| | """Create the cache dir and its supporting files.""" |
| | if self._cachedir.is_dir(): |
| | return |
| |
|
| | self._cachedir.parent.mkdir(parents=True, exist_ok=True) |
| | with tempfile.TemporaryDirectory( |
| | prefix="pytest-cache-files-", |
| | dir=self._cachedir.parent, |
| | ) as newpath: |
| | path = Path(newpath) |
| |
|
| | |
| | |
| | umask = os.umask(0o022) |
| | os.umask(umask) |
| | path.chmod(0o777 - umask) |
| |
|
| | with open(path.joinpath("README.md"), "x", encoding="UTF-8") as f: |
| | f.write(README_CONTENT) |
| | with open(path.joinpath(".gitignore"), "x", encoding="UTF-8") as f: |
| | f.write("# Created by pytest automatically.\n*\n") |
| | with open(path.joinpath("CACHEDIR.TAG"), "xb") as f: |
| | f.write(CACHEDIR_TAG_CONTENT) |
| |
|
| | try: |
| | path.rename(self._cachedir) |
| | except OSError as e: |
| | |
| | |
| | |
| | |
| | |
| | if e.errno not in (errno.ENOTEMPTY, errno.EEXIST): |
| | raise |
| | else: |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | path.mkdir() |
| |
|
| |
|
| | class LFPluginCollWrapper: |
| | def __init__(self, lfplugin: LFPlugin) -> None: |
| | self.lfplugin = lfplugin |
| | self._collected_at_least_one_failure = False |
| |
|
| | @hookimpl(wrapper=True) |
| | def pytest_make_collect_report( |
| | self, collector: nodes.Collector |
| | ) -> Generator[None, CollectReport, CollectReport]: |
| | res = yield |
| | if isinstance(collector, (Session, Directory)): |
| | |
| | lf_paths = self.lfplugin._last_failed_paths |
| |
|
| | |
| | def sort_key(node: nodes.Item | nodes.Collector) -> bool: |
| | return node.path in lf_paths |
| |
|
| | res.result = sorted( |
| | res.result, |
| | key=sort_key, |
| | reverse=True, |
| | ) |
| |
|
| | elif isinstance(collector, File): |
| | if collector.path in self.lfplugin._last_failed_paths: |
| | result = res.result |
| | lastfailed = self.lfplugin.lastfailed |
| |
|
| | |
| | if not self._collected_at_least_one_failure: |
| | if not any(x.nodeid in lastfailed for x in result): |
| | return res |
| | self.lfplugin.config.pluginmanager.register( |
| | LFPluginCollSkipfiles(self.lfplugin), "lfplugin-collskip" |
| | ) |
| | self._collected_at_least_one_failure = True |
| |
|
| | session = collector.session |
| | result[:] = [ |
| | x |
| | for x in result |
| | if x.nodeid in lastfailed |
| | |
| | or session.isinitpath(x.path) |
| | |
| | or isinstance(x, nodes.Collector) |
| | ] |
| |
|
| | return res |
| |
|
| |
|
| | class LFPluginCollSkipfiles: |
| | def __init__(self, lfplugin: LFPlugin) -> None: |
| | self.lfplugin = lfplugin |
| |
|
| | @hookimpl |
| | def pytest_make_collect_report( |
| | self, collector: nodes.Collector |
| | ) -> CollectReport | None: |
| | if isinstance(collector, File): |
| | if collector.path not in self.lfplugin._last_failed_paths: |
| | self.lfplugin._skipped_files += 1 |
| |
|
| | return CollectReport( |
| | collector.nodeid, "passed", longrepr=None, result=[] |
| | ) |
| | return None |
| |
|
| |
|
| | class LFPlugin: |
| | """Plugin which implements the --lf (run last-failing) option.""" |
| |
|
| | def __init__(self, config: Config) -> None: |
| | self.config = config |
| | active_keys = "lf", "failedfirst" |
| | self.active = any(config.getoption(key) for key in active_keys) |
| | assert config.cache |
| | self.lastfailed: dict[str, bool] = config.cache.get("cache/lastfailed", {}) |
| | self._previously_failed_count: int | None = None |
| | self._report_status: str | None = None |
| | self._skipped_files = 0 |
| |
|
| | if config.getoption("lf"): |
| | self._last_failed_paths = self.get_last_failed_paths() |
| | config.pluginmanager.register( |
| | LFPluginCollWrapper(self), "lfplugin-collwrapper" |
| | ) |
| |
|
| | def get_last_failed_paths(self) -> set[Path]: |
| | """Return a set with all Paths of the previously failed nodeids and |
| | their parents.""" |
| | rootpath = self.config.rootpath |
| | result = set() |
| | for nodeid in self.lastfailed: |
| | path = rootpath / nodeid.split("::")[0] |
| | result.add(path) |
| | result.update(path.parents) |
| | return {x for x in result if x.exists()} |
| |
|
| | def pytest_report_collectionfinish(self) -> str | None: |
| | if self.active and self.config.get_verbosity() >= 0: |
| | return f"run-last-failure: {self._report_status}" |
| | return None |
| |
|
| | def pytest_runtest_logreport(self, report: TestReport) -> None: |
| | if (report.when == "call" and report.passed) or report.skipped: |
| | self.lastfailed.pop(report.nodeid, None) |
| | elif report.failed: |
| | self.lastfailed[report.nodeid] = True |
| |
|
| | def pytest_collectreport(self, report: CollectReport) -> None: |
| | passed = report.outcome in ("passed", "skipped") |
| | if passed: |
| | if report.nodeid in self.lastfailed: |
| | self.lastfailed.pop(report.nodeid) |
| | self.lastfailed.update((item.nodeid, True) for item in report.result) |
| | else: |
| | self.lastfailed[report.nodeid] = True |
| |
|
| | @hookimpl(wrapper=True, tryfirst=True) |
| | def pytest_collection_modifyitems( |
| | self, config: Config, items: list[nodes.Item] |
| | ) -> Generator[None]: |
| | res = yield |
| |
|
| | if not self.active: |
| | return res |
| |
|
| | if self.lastfailed: |
| | previously_failed = [] |
| | previously_passed = [] |
| | for item in items: |
| | if item.nodeid in self.lastfailed: |
| | previously_failed.append(item) |
| | else: |
| | previously_passed.append(item) |
| | self._previously_failed_count = len(previously_failed) |
| |
|
| | if not previously_failed: |
| | |
| | |
| | self._report_status = "%d known failures not in selected tests" % ( |
| | len(self.lastfailed), |
| | ) |
| | else: |
| | if self.config.getoption("lf"): |
| | items[:] = previously_failed |
| | config.hook.pytest_deselected(items=previously_passed) |
| | else: |
| | items[:] = previously_failed + previously_passed |
| |
|
| | noun = "failure" if self._previously_failed_count == 1 else "failures" |
| | suffix = " first" if self.config.getoption("failedfirst") else "" |
| | self._report_status = ( |
| | f"rerun previous {self._previously_failed_count} {noun}{suffix}" |
| | ) |
| |
|
| | if self._skipped_files > 0: |
| | files_noun = "file" if self._skipped_files == 1 else "files" |
| | self._report_status += f" (skipped {self._skipped_files} {files_noun})" |
| | else: |
| | self._report_status = "no previously failed tests, " |
| | if self.config.getoption("last_failed_no_failures") == "none": |
| | self._report_status += "deselecting all items." |
| | config.hook.pytest_deselected(items=items[:]) |
| | items[:] = [] |
| | else: |
| | self._report_status += "not deselecting items." |
| |
|
| | return res |
| |
|
| | def pytest_sessionfinish(self, session: Session) -> None: |
| | config = self.config |
| | if config.getoption("cacheshow") or hasattr(config, "workerinput"): |
| | return |
| |
|
| | assert config.cache is not None |
| | saved_lastfailed = config.cache.get("cache/lastfailed", {}) |
| | if saved_lastfailed != self.lastfailed: |
| | config.cache.set("cache/lastfailed", self.lastfailed) |
| |
|
| |
|
| | class NFPlugin: |
| | """Plugin which implements the --nf (run new-first) option.""" |
| |
|
| | def __init__(self, config: Config) -> None: |
| | self.config = config |
| | self.active = config.option.newfirst |
| | assert config.cache is not None |
| | self.cached_nodeids = set(config.cache.get("cache/nodeids", [])) |
| |
|
| | @hookimpl(wrapper=True, tryfirst=True) |
| | def pytest_collection_modifyitems(self, items: list[nodes.Item]) -> Generator[None]: |
| | res = yield |
| |
|
| | if self.active: |
| | new_items: dict[str, nodes.Item] = {} |
| | other_items: dict[str, nodes.Item] = {} |
| | for item in items: |
| | if item.nodeid not in self.cached_nodeids: |
| | new_items[item.nodeid] = item |
| | else: |
| | other_items[item.nodeid] = item |
| |
|
| | items[:] = self._get_increasing_order( |
| | new_items.values() |
| | ) + self._get_increasing_order(other_items.values()) |
| | self.cached_nodeids.update(new_items) |
| | else: |
| | self.cached_nodeids.update(item.nodeid for item in items) |
| |
|
| | return res |
| |
|
| | def _get_increasing_order(self, items: Iterable[nodes.Item]) -> list[nodes.Item]: |
| | return sorted(items, key=lambda item: item.path.stat().st_mtime, reverse=True) |
| |
|
| | def pytest_sessionfinish(self) -> None: |
| | config = self.config |
| | if config.getoption("cacheshow") or hasattr(config, "workerinput"): |
| | return |
| |
|
| | if config.getoption("collectonly"): |
| | return |
| |
|
| | assert config.cache is not None |
| | config.cache.set("cache/nodeids", sorted(self.cached_nodeids)) |
| |
|
| |
|
| | def pytest_addoption(parser: Parser) -> None: |
| | group = parser.getgroup("general") |
| | group.addoption( |
| | "--lf", |
| | "--last-failed", |
| | action="store_true", |
| | dest="lf", |
| | help="Rerun only the tests that failed " |
| | "at the last run (or all if none failed)", |
| | ) |
| | group.addoption( |
| | "--ff", |
| | "--failed-first", |
| | action="store_true", |
| | dest="failedfirst", |
| | help="Run all tests, but run the last failures first. " |
| | "This may re-order tests and thus lead to " |
| | "repeated fixture setup/teardown.", |
| | ) |
| | group.addoption( |
| | "--nf", |
| | "--new-first", |
| | action="store_true", |
| | dest="newfirst", |
| | help="Run tests from new files first, then the rest of the tests " |
| | "sorted by file mtime", |
| | ) |
| | group.addoption( |
| | "--cache-show", |
| | action="append", |
| | nargs="?", |
| | dest="cacheshow", |
| | help=( |
| | "Show cache contents, don't perform collection or tests. " |
| | "Optional argument: glob (default: '*')." |
| | ), |
| | ) |
| | group.addoption( |
| | "--cache-clear", |
| | action="store_true", |
| | dest="cacheclear", |
| | help="Remove all cache contents at start of test run", |
| | ) |
| | cache_dir_default = ".pytest_cache" |
| | if "TOX_ENV_DIR" in os.environ: |
| | cache_dir_default = os.path.join(os.environ["TOX_ENV_DIR"], cache_dir_default) |
| | parser.addini("cache_dir", default=cache_dir_default, help="Cache directory path") |
| | group.addoption( |
| | "--lfnf", |
| | "--last-failed-no-failures", |
| | action="store", |
| | dest="last_failed_no_failures", |
| | choices=("all", "none"), |
| | default="all", |
| | help="With ``--lf``, determines whether to execute tests when there " |
| | "are no previously (known) failures or when no " |
| | "cached ``lastfailed`` data was found. " |
| | "``all`` (the default) runs the full test suite again. " |
| | "``none`` just emits a message about no known failures and exits successfully.", |
| | ) |
| |
|
| |
|
| | def pytest_cmdline_main(config: Config) -> int | ExitCode | None: |
| | if config.option.cacheshow and not config.option.help: |
| | from _pytest.main import wrap_session |
| |
|
| | return wrap_session(config, cacheshow) |
| | return None |
| |
|
| |
|
| | @hookimpl(tryfirst=True) |
| | def pytest_configure(config: Config) -> None: |
| | config.cache = Cache.for_config(config, _ispytest=True) |
| | config.pluginmanager.register(LFPlugin(config), "lfplugin") |
| | config.pluginmanager.register(NFPlugin(config), "nfplugin") |
| |
|
| |
|
| | @fixture |
| | def cache(request: FixtureRequest) -> Cache: |
| | """Return a cache object that can persist state between testing sessions. |
| | |
| | cache.get(key, default) |
| | cache.set(key, value) |
| | |
| | Keys must be ``/`` separated strings, where the first part is usually the |
| | name of your plugin or application to avoid clashes with other cache users. |
| | |
| | Values can be any object handled by the json stdlib module. |
| | """ |
| | assert request.config.cache is not None |
| | return request.config.cache |
| |
|
| |
|
| | def pytest_report_header(config: Config) -> str | None: |
| | """Display cachedir with --cache-show and if non-default.""" |
| | if config.option.verbose > 0 or config.getini("cache_dir") != ".pytest_cache": |
| | assert config.cache is not None |
| | cachedir = config.cache._cachedir |
| | |
| | |
| |
|
| | try: |
| | displaypath = cachedir.relative_to(config.rootpath) |
| | except ValueError: |
| | displaypath = cachedir |
| | return f"cachedir: {displaypath}" |
| | return None |
| |
|
| |
|
| | def cacheshow(config: Config, session: Session) -> int: |
| | from pprint import pformat |
| |
|
| | assert config.cache is not None |
| |
|
| | tw = TerminalWriter() |
| | tw.line("cachedir: " + str(config.cache._cachedir)) |
| | if not config.cache._cachedir.is_dir(): |
| | tw.line("cache is empty") |
| | return 0 |
| |
|
| | glob = config.option.cacheshow[0] |
| | if glob is None: |
| | glob = "*" |
| |
|
| | dummy = object() |
| | basedir = config.cache._cachedir |
| | vdir = basedir / Cache._CACHE_PREFIX_VALUES |
| | tw.sep("-", f"cache values for {glob!r}") |
| | for valpath in sorted(x for x in vdir.rglob(glob) if x.is_file()): |
| | key = str(valpath.relative_to(vdir)) |
| | val = config.cache.get(key, dummy) |
| | if val is dummy: |
| | tw.line(f"{key} contains unreadable content, will be ignored") |
| | else: |
| | tw.line(f"{key} contains:") |
| | for line in pformat(val).splitlines(): |
| | tw.line(" " + line) |
| |
|
| | ddir = basedir / Cache._CACHE_PREFIX_DIRS |
| | if ddir.is_dir(): |
| | contents = sorted(ddir.rglob(glob)) |
| | tw.sep("-", f"cache directories for {glob!r}") |
| | for p in contents: |
| | |
| | |
| | if p.is_file(): |
| | key = str(p.relative_to(basedir)) |
| | tw.line(f"{key} is a file of length {p.stat().st_size:d}") |
| | return 0 |
| |
|