| |
| """Implementation of the cache provider.""" |
|
|
| |
| |
| from __future__ import annotations |
|
|
| from collections.abc import Generator |
| from collections.abc import Iterable |
| import dataclasses |
| import errno |
| import json |
| import os |
| from pathlib import Path |
| import tempfile |
| from typing import final |
|
|
| from .pathlib import resolve_from_str |
| from .pathlib import rm_rf |
| from .reports import CollectReport |
| from _pytest import nodes |
| from _pytest._io import TerminalWriter |
| from _pytest.config import Config |
| from _pytest.config import ExitCode |
| from _pytest.config import hookimpl |
| from _pytest.config.argparsing import Parser |
| from _pytest.deprecated import check_ispytest |
| from _pytest.fixtures import fixture |
| from _pytest.fixtures import FixtureRequest |
| from _pytest.main import Session |
| from _pytest.nodes import Directory |
| from _pytest.nodes import File |
| from _pytest.reports import TestReport |
|
|
|
|
# Text of the ``README.md`` file that is written into a newly created cache
# directory (see ``Cache._ensure_cache_dir_and_supporting_files``).
README_CONTENT = """\
# pytest cache directory #

This directory contains data from the pytest's cache plugin,
which provides the `--lf` and `--ff` options, as well as the `cache` fixture.

**Do not** commit this to version control.

See [the docs](https://docs.pytest.org/en/stable/how-to/cache.html) for more information.
"""
|
|
# Bytes of the ``CACHEDIR.TAG`` file that is written into a newly created
# cache directory. The embedded comment lines point at the cache directory
# tagging specification that defines this file format.
CACHEDIR_TAG_CONTENT = b"""\
Signature: 8a477f597d28d172789f06886806bc55
# This file is a cache directory tag created by pytest.
# For information about cache directory tags, see:
#	https://bford.info/cachedir/spec.html
"""
|
|
|
|
@final
@dataclasses.dataclass
class Cache:
    """Persistent value and directory store behind the ``cache`` fixture."""

    _cachedir: Path = dataclasses.field(repr=False)
    _config: Config = dataclasses.field(repr=False)

    # Name of the cache sub-directory holding directories made by ``mkdir``.
    _CACHE_PREFIX_DIRS = "d"

    # Name of the cache sub-directory holding values stored by ``set``.
    _CACHE_PREFIX_VALUES = "v"

    def __init__(
        self, cachedir: Path, config: Config, *, _ispytest: bool = False
    ) -> None:
        check_ispytest(_ispytest)
        self._cachedir = cachedir
        self._config = config

    @classmethod
    def for_config(cls, config: Config, *, _ispytest: bool = False) -> Cache:
        """Build the Cache for ``config``, clearing it first if requested.

        The existing cache is cleared only when the ``cacheclear`` option is
        set and the cache directory already exists.

        :meta private:
        """
        check_ispytest(_ispytest)
        location = cls.cache_dir_from_config(config, _ispytest=True)
        wants_clear = config.getoption("cacheclear")
        if wants_clear and location.is_dir():
            cls.clear_cache(location, _ispytest=True)
        return cls(location, config, _ispytest=True)

    @classmethod
    def clear_cache(cls, cachedir: Path, _ispytest: bool = False) -> None:
        """Remove the directory and value sub-directories of ``cachedir``.

        Only the ``d`` and ``v`` sub-directories are touched; anything else
        inside ``cachedir`` is left alone.

        :meta private:
        """
        check_ispytest(_ispytest)
        for prefix in (cls._CACHE_PREFIX_DIRS, cls._CACHE_PREFIX_VALUES):
            subdir = cachedir / prefix
            if not subdir.is_dir():
                continue
            rm_rf(subdir)

    @staticmethod
    def cache_dir_from_config(config: Config, *, _ispytest: bool = False) -> Path:
        """Return the cache directory path configured for ``config``.

        The ``cache_dir`` ini value is resolved against ``config.rootpath``.

        :meta private:
        """
        check_ispytest(_ispytest)
        configured = config.getini("cache_dir")
        return resolve_from_str(configured, config.rootpath)

    def warn(self, fmt: str, *, _ispytest: bool = False, **args: object) -> None:
        """Emit a ``PytestCacheWarning``.

        ``fmt`` is formatted with ``args`` only when keyword arguments were
        given; otherwise it is used verbatim.

        :meta private:
        """
        check_ispytest(_ispytest)
        import warnings

        from _pytest.warning_types import PytestCacheWarning

        message = fmt.format(**args) if args else fmt
        warnings.warn(
            PytestCacheWarning(message),
            self._config.hook,
            stacklevel=3,
        )

    def _mkdir(self, path: Path) -> None:
        # The cache dir (with its supporting files) has to exist before
        # anything is created below it.
        self._ensure_cache_dir_and_supporting_files()
        path.mkdir(exist_ok=True, parents=True)

    def mkdir(self, name: str) -> Path:
        """Return the path of a cache-managed directory called ``name``.

        The directory is created when it does not exist yet. It can be used
        to keep files (for example database dumps) around between test
        sessions.

        .. versionadded:: 7.0

        :param name:
            A single path component, i.e. a string without a ``/``
            separator. Include your plugin or application identifier in it
            to avoid clashing with other users of the cache.
        :raises ValueError:
            If ``name`` consists of more than one path component.
        """
        requested = Path(name)
        if len(requested.parts) > 1:
            raise ValueError("name is not allowed to contain path separators")
        target = self._cachedir / self._CACHE_PREFIX_DIRS / requested
        self._mkdir(target)
        return target

    def _getvaluepath(self, key: str) -> Path:
        # Values live below the "v" sub-directory, one file per key.
        return self._cachedir / self._CACHE_PREFIX_VALUES / Path(key)

    def get(self, key: str, default):
        """Look up the value cached under ``key``.

        :param key:
            A ``/`` separated value. The first component is usually the
            name of your plugin or application.
        :param default:
            Returned when nothing is cached under ``key`` or when the stored
            content cannot be read or decoded.
        """
        location = self._getvaluepath(key)
        try:
            stream = location.open("r", encoding="UTF-8")
            with stream:
                return json.load(stream)
        except (ValueError, OSError):
            # Missing file, unreadable file or invalid JSON: act as a miss.
            return default

    def set(self, key: str, value: object) -> None:
        """Store ``value`` under ``key``.

        Failures to create the parent directory or to open the value file
        are reported as cache warnings instead of being raised.

        :param key:
            A ``/`` separated value. The first component is usually the
            name of your plugin or application.
        :param value:
            Any combination of basic python types, including nested ones
            such as lists of dictionaries.
        """
        location = self._getvaluepath(key)
        try:
            self._mkdir(location.parent)
        except OSError as exc:
            self.warn(
                f"could not create cache path {location}: {exc}",
                _ispytest=True,
            )
            return

        serialized = json.dumps(value, ensure_ascii=False, indent=2)
        try:
            handle = location.open("w", encoding="UTF-8")
        except OSError as exc:
            self.warn(
                f"cache could not write path {location}: {exc}",
                _ispytest=True,
            )
            return
        with handle:
            handle.write(serialized)

    def _ensure_cache_dir_and_supporting_files(self) -> None:
        """Create the cache dir together with its supporting files.

        Nothing happens when the cache dir already exists. Otherwise the
        directory is populated under a temporary name next to the final
        location and then renamed into place.
        """
        if self._cachedir.is_dir():
            return

        parent = self._cachedir.parent
        parent.mkdir(parents=True, exist_ok=True)
        with tempfile.TemporaryDirectory(
            prefix="pytest-cache-files-",
            dir=parent,
        ) as staging_name:
            staging = Path(staging_name)

            # Give the staging dir the default permissions for new
            # directories. ``os.umask`` can only report the current mask
            # while setting one, hence the set-then-restore pair of calls.
            current_umask = os.umask(0o022)
            os.umask(current_umask)
            staging.chmod(0o777 - current_umask)

            # "x" mode: each supporting file must be newly created.
            with open(staging / "README.md", "x", encoding="UTF-8") as f:
                f.write(README_CONTENT)
            with open(staging / ".gitignore", "x", encoding="UTF-8") as f:
                f.write("# Created by pytest automatically.\n*\n")
            with open(staging / "CACHEDIR.TAG", "xb") as f:
                f.write(CACHEDIR_TAG_CONTENT)

            try:
                staging.rename(self._cachedir)
            except OSError as exc:
                # A non-empty or already existing destination is tolerated
                # (presumably something else created the cache dir in the
                # meantime); the staging dir is then simply cleaned up on
                # leaving the ``with`` block. Any other error propagates.
                destination_taken = exc.errno in (errno.ENOTEMPTY, errno.EEXIST)
                if not destination_taken:
                    raise
            else:
                # The staging dir was moved away; put an empty directory
                # back so the ``TemporaryDirectory`` cleanup has something
                # to remove.
                staging.mkdir()
|
|
|
|
class LFPluginCollWrapper:
    """Collect-report wrapper that is registered by ``LFPlugin`` for ``--lf``."""

    def __init__(self, lfplugin: LFPlugin) -> None:
        self.lfplugin = lfplugin
        self._collected_at_least_one_failure = False

    @hookimpl(wrapper=True)
    def pytest_make_collect_report(
        self, collector: nodes.Collector
    ) -> Generator[None, CollectReport, CollectReport]:
        """Reorder or filter the collected nodes of ``collector``.

        For a ``Session`` or ``Directory`` the children whose path belongs to
        the last-failed paths are moved to the front. For a ``File`` that
        belongs to the last-failed paths the result is narrowed down to the
        known failures (plus init paths and sub-collectors).
        """
        report = yield

        if isinstance(collector, (Session, Directory)):
            failed_paths = self.lfplugin._last_failed_paths
            # ``sorted`` is stable, so with a boolean key the relative order
            # within the "failed" and the "other" group is preserved.
            report.result = sorted(
                report.result,
                key=lambda node: node.path in failed_paths,
                reverse=True,
            )
            return report

        if not isinstance(collector, File):
            return report
        if collector.path not in self.lfplugin._last_failed_paths:
            return report

        collected = report.result
        known_failures = self.lfplugin.lastfailed

        if not self._collected_at_least_one_failure:
            # Do not filter anything until a file actually yields a known
            # failure.
            if not any(node.nodeid in known_failures for node in collected):
                return report
            # From the first such file on, a second plugin is registered
            # that short-circuits collection of files outside the
            # last-failed paths.
            self.lfplugin.config.pluginmanager.register(
                LFPluginCollSkipfiles(self.lfplugin), "lfplugin-collskip"
            )
            self._collected_at_least_one_failure = True

        session = collector.session
        # Filter in place: keep known failures, nodes whose path was given
        # as an init path, and every sub-collector.
        collected[:] = [
            node
            for node in collected
            if node.nodeid in known_failures
            or session.isinitpath(node.path)
            or isinstance(node, nodes.Collector)
        ]
        return report
|
|
|
|
class LFPluginCollSkipfiles:
    """Collect-report hook that skips files outside the last-failed paths."""

    def __init__(self, lfplugin: LFPlugin) -> None:
        self.lfplugin = lfplugin

    @hookimpl
    def pytest_make_collect_report(
        self, collector: nodes.Collector
    ) -> CollectReport | None:
        """Return an empty passed report for files without known failures.

        ``None`` is returned for every other collector, and for files that
        are part of the last-failed paths.
        """
        if not isinstance(collector, File):
            return None
        if collector.path in self.lfplugin._last_failed_paths:
            return None

        # Count the file so the run-last-failure status can mention it.
        self.lfplugin._skipped_files += 1
        return CollectReport(collector.nodeid, "passed", longrepr=None, result=[])
|
|
|
|
class LFPlugin:
    """Plugin behind the ``--lf`` (last-failed) and ``--ff`` (failed-first) options."""

    def __init__(self, config: Config) -> None:
        self.config = config
        # The plugin reorders/deselects only when one of these is given.
        self.active = any(config.getoption(name) for name in ("lf", "failedfirst"))
        assert config.cache
        self.lastfailed: dict[str, bool] = config.cache.get("cache/lastfailed", {})
        self._previously_failed_count: int | None = None
        self._report_status: str | None = None
        self._skipped_files = 0

        if not config.getoption("lf"):
            return
        # Only ``--lf`` needs the path set and the collection wrapper.
        self._last_failed_paths = self.get_last_failed_paths()
        config.pluginmanager.register(
            LFPluginCollWrapper(self), "lfplugin-collwrapper"
        )

    def get_last_failed_paths(self) -> set[Path]:
        """Return the existing paths of all previously failed nodeids,
        including every parent directory of those paths."""
        root = self.config.rootpath
        candidates = set()
        for nodeid in self.lastfailed:
            # The part of a nodeid before the first "::" is the file path.
            node_path = root / nodeid.split("::", 1)[0]
            candidates.add(node_path)
            candidates.update(node_path.parents)
        return {candidate for candidate in candidates if candidate.exists()}

    def pytest_report_collectionfinish(self) -> str | None:
        """Return the run-last-failure status line, if one should be shown."""
        if not self.active or self.config.get_verbosity() < 0:
            return None
        return f"run-last-failure: {self._report_status}"

    def pytest_runtest_logreport(self, report: TestReport) -> None:
        """Track the outcome of a test report in ``lastfailed``."""
        passed_call = report.when == "call" and report.passed
        if passed_call or report.skipped:
            self.lastfailed.pop(report.nodeid, None)
        elif report.failed:
            self.lastfailed[report.nodeid] = True

    def pytest_collectreport(self, report: CollectReport) -> None:
        """Track the outcome of a collection report in ``lastfailed``."""
        if report.outcome not in ("passed", "skipped"):
            self.lastfailed[report.nodeid] = True
            return

        if report.nodeid in self.lastfailed:
            # A collector that failed before collects fine now: replace its
            # entry with entries for everything it collected.
            del self.lastfailed[report.nodeid]
            for node in report.result:
                self.lastfailed[node.nodeid] = True

    @hookimpl(wrapper=True, tryfirst=True)
    def pytest_collection_modifyitems(
        self, config: Config, items: list[nodes.Item]
    ) -> Generator[None]:
        """Select or reorder ``items`` based on the recorded failures and
        compose the status text reported after collection."""
        res = yield

        if not self.active:
            return res

        if not self.lastfailed:
            self._report_status = "no previously failed tests, "
            if self.config.getoption("last_failed_no_failures") == "none":
                self._report_status += "deselecting all items."
                config.hook.pytest_deselected(items=items[:])
                items[:] = []
            else:
                self._report_status += "not deselecting items."
            return res

        failed_items = [item for item in items if item.nodeid in self.lastfailed]
        passed_items = [item for item in items if item.nodeid not in self.lastfailed]
        failed_count = len(failed_items)
        self._previously_failed_count = failed_count

        if failed_items:
            if self.config.getoption("lf"):
                items[:] = failed_items
                config.hook.pytest_deselected(items=passed_items)
            else:
                # Not ``--lf``, so the plugin is active because of
                # ``--failed-first``: keep everything, failures first.
                items[:] = failed_items + passed_items

            noun = "failure" if failed_count == 1 else "failures"
            suffix = " first" if self.config.getoption("failedfirst") else ""
            self._report_status = f"rerun previous {failed_count} {noun}{suffix}"
        else:
            # Failures are recorded, but none of them is among the items
            # selected for this run.
            self._report_status = (
                f"{len(self.lastfailed)} known failures not in selected tests"
            )

        if self._skipped_files > 0:
            files_noun = "file" if self._skipped_files == 1 else "files"
            self._report_status += f" (skipped {self._skipped_files} {files_noun})"

        return res

    def pytest_sessionfinish(self, session: Session) -> None:
        """Write ``lastfailed`` back to the cache when it has changed."""
        config = self.config
        if config.getoption("cacheshow") or hasattr(config, "workerinput"):
            return

        assert config.cache is not None
        previously_saved = config.cache.get("cache/lastfailed", {})
        if previously_saved == self.lastfailed:
            return
        config.cache.set("cache/lastfailed", self.lastfailed)
|
|
|
|
class NFPlugin:
    """Plugin behind the ``--nf`` (new-first) option."""

    def __init__(self, config: Config) -> None:
        self.config = config
        self.active = config.option.newfirst
        assert config.cache is not None
        self.cached_nodeids = set(config.cache.get("cache/nodeids", []))

    @hookimpl(wrapper=True, tryfirst=True)
    def pytest_collection_modifyitems(self, items: list[nodes.Item]) -> Generator[None]:
        """Record the collected nodeids and, when active, put items whose
        nodeid is not in the cache yet in front of the others."""
        res = yield

        if not self.active:
            self.cached_nodeids.update(item.nodeid for item in items)
            return res

        # Keyed by nodeid, so a repeated nodeid keeps its first position and
        # its last item.
        new_items: dict[str, nodes.Item] = {}
        other_items: dict[str, nodes.Item] = {}
        for item in items:
            known = item.nodeid in self.cached_nodeids
            bucket = other_items if known else new_items
            bucket[item.nodeid] = item

        reordered = self._get_increasing_order(new_items.values())
        reordered = reordered + self._get_increasing_order(other_items.values())
        items[:] = reordered
        self.cached_nodeids.update(new_items)

        return res

    def _get_increasing_order(self, items: Iterable[nodes.Item]) -> list[nodes.Item]:
        # Despite the method name, the sort uses ``reverse=True``: the item
        # whose file has the largest mtime comes first.
        def file_mtime(item: nodes.Item) -> float:
            return item.path.stat().st_mtime

        return sorted(items, key=file_mtime, reverse=True)

    def pytest_sessionfinish(self) -> None:
        """Store the known nodeids in the cache.

        Nothing is stored with ``cacheshow`` or ``collectonly``, or when the
        config has a ``workerinput`` attribute.
        """
        config = self.config
        if (
            config.getoption("cacheshow")
            or hasattr(config, "workerinput")
            or config.getoption("collectonly")
        ):
            return

        assert config.cache is not None
        config.cache.set("cache/nodeids", sorted(self.cached_nodeids))
|
|
|
|
def pytest_addoption(parser: Parser) -> None:
    """Register the cache related command line options and the
    ``cache_dir`` ini value."""
    general = parser.getgroup("general")
    general.addoption(
        "--lf",
        "--last-failed",
        action="store_true",
        dest="lf",
        help="Rerun only the tests that failed at the last run (or all if none failed)",
    )
    general.addoption(
        "--ff",
        "--failed-first",
        action="store_true",
        dest="failedfirst",
        help="Run all tests, but run the last failures first. "
        "This may re-order tests and thus lead to "
        "repeated fixture setup/teardown.",
    )
    general.addoption(
        "--nf",
        "--new-first",
        action="store_true",
        dest="newfirst",
        help="Run tests from new files first, then the rest of the tests "
        "sorted by file mtime",
    )
    general.addoption(
        "--cache-show",
        action="append",
        nargs="?",
        dest="cacheshow",
        help=(
            "Show cache contents, don't perform collection or tests. "
            "Optional argument: glob (default: '*')."
        ),
    )
    general.addoption(
        "--cache-clear",
        action="store_true",
        dest="cacheclear",
        help="Remove all cache contents at start of test run",
    )

    # When TOX_ENV_DIR is set, the default cache dir is placed inside it.
    cache_dir_name = ".pytest_cache"
    cache_dir_default = (
        os.path.join(os.environ["TOX_ENV_DIR"], cache_dir_name)
        if "TOX_ENV_DIR" in os.environ
        else cache_dir_name
    )
    parser.addini("cache_dir", default=cache_dir_default, help="Cache directory path")

    general.addoption(
        "--lfnf",
        "--last-failed-no-failures",
        action="store",
        dest="last_failed_no_failures",
        choices=("all", "none"),
        default="all",
        help="With ``--lf``, determines whether to execute tests when there "
        "are no previously (known) failures or when no "
        "cached ``lastfailed`` data was found. "
        "``all`` (the default) runs the full test suite again. "
        "``none`` just emits a message about no known failures and exits successfully.",
    )
|
|
|
|
def pytest_cmdline_main(config: Config) -> int | ExitCode | None:
    """Run the cache display instead of a normal session for ``--cache-show``.

    ``None`` is returned when ``--cache-show`` was not given or when help
    was requested.
    """
    options = config.option
    if not options.cacheshow or options.help:
        return None

    from _pytest.main import wrap_session

    return wrap_session(config, cacheshow)
|
|
|
|
@hookimpl(tryfirst=True)
def pytest_configure(config: Config) -> None:
    """Attach the cache to ``config`` and register the cache based plugins."""
    # The cache has to be in place first: both plugins read it on creation.
    config.cache = Cache.for_config(config, _ispytest=True)
    manager = config.pluginmanager
    for plugin_class, plugin_name in ((LFPlugin, "lfplugin"), (NFPlugin, "nfplugin")):
        manager.register(plugin_class(config), plugin_name)
|
|
|
|
@fixture
def cache(request: FixtureRequest) -> Cache:
    """Return a cache object that can persist state between testing sessions.

    cache.get(key, default)
    cache.set(key, value)

    Keys are ``/`` separated strings. The first part is usually the name of
    your plugin or application, which avoids clashes with other cache users.

    Values can be any object handled by the json stdlib module.
    """
    session_cache = request.config.cache
    assert session_cache is not None
    return session_cache
|
|
|
|
def pytest_report_header(config: Config) -> str | None:
    """Return a ``cachedir: ...`` header line in verbose mode or when the
    ``cache_dir`` ini value differs from ``.pytest_cache``."""
    wanted = config.option.verbose > 0 or config.getini("cache_dir") != ".pytest_cache"
    if not wanted:
        return None

    assert config.cache is not None
    location = config.cache._cachedir
    try:
        # Show the path relative to the rootpath when it lies below it.
        shown = location.relative_to(config.rootpath)
    except ValueError:
        shown = location
    return f"cachedir: {shown}"
|
|
|
|
def cacheshow(config: Config, session: Session) -> int:
    """Print the cache contents matching the ``--cache-show`` glob.

    Cached values are listed first, then the files found below the cache's
    directory area. Always returns 0.
    """
    from pprint import pformat

    assert config.cache is not None

    cache = config.cache
    basedir = cache._cachedir
    tw = TerminalWriter()
    tw.line(f"cachedir: {basedir}")
    if not basedir.is_dir():
        tw.line("cache is empty")
        return 0

    # Only the first ``--cache-show`` occurrence is used; ``None`` means the
    # option was given without a glob argument.
    glob = config.option.cacheshow[0]
    if glob is None:
        glob = "*"

    # Unique marker to tell "unreadable" apart from any real cached value.
    unreadable = object()
    values_dir = basedir / Cache._CACHE_PREFIX_VALUES
    tw.sep("-", f"cache values for {glob!r}")
    value_files = sorted(
        candidate for candidate in values_dir.rglob(glob) if candidate.is_file()
    )
    for value_file in value_files:
        key = str(value_file.relative_to(values_dir))
        value = cache.get(key, unreadable)
        if value is unreadable:
            tw.line(f"{key} contains unreadable content, will be ignored")
            continue
        tw.line(f"{key} contains:")
        for text_line in pformat(value).splitlines():
            tw.line("  " + text_line)

    dirs_dir = basedir / Cache._CACHE_PREFIX_DIRS
    if dirs_dir.is_dir():
        matches = sorted(dirs_dir.rglob(glob))
        tw.sep("-", f"cache directories for {glob!r}")
        for match in matches:
            # Only files are listed; matching directories produce no line.
            if not match.is_file():
                continue
            key = str(match.relative_to(basedir))
            tw.line(f"{key} is a file of length {match.stat().st_size}")
    return 0
|
|