Spaces:
Paused
Paused
| import atexit | |
| import contextlib | |
| from enum import Enum | |
| from errno import EBADF | |
| from errno import ELOOP | |
| from errno import ENOENT | |
| from errno import ENOTDIR | |
| import fnmatch | |
| from functools import partial | |
| from importlib.machinery import ModuleSpec | |
| import importlib.util | |
| import itertools | |
| import os | |
| from os.path import expanduser | |
| from os.path import expandvars | |
| from os.path import isabs | |
| from os.path import sep | |
| from pathlib import Path | |
| from pathlib import PurePath | |
| from posixpath import sep as posix_sep | |
| import shutil | |
| import sys | |
| import types | |
| from types import ModuleType | |
| from typing import Any | |
| from typing import Callable | |
| from typing import Dict | |
| from typing import Iterable | |
| from typing import Iterator | |
| from typing import List | |
| from typing import Optional | |
| from typing import Set | |
| from typing import Tuple | |
| from typing import Type | |
| from typing import TypeVar | |
| from typing import Union | |
| import uuid | |
| import warnings | |
| from _pytest.compat import assert_never | |
| from _pytest.outcomes import skip | |
| from _pytest.warning_types import PytestWarning | |
# Three days, expressed in seconds.
LOCK_TIMEOUT = 3 * 24 * 60 * 60

# Type variable letting helpers return the same PurePath subclass they were given.
_AnyPurePath = TypeVar("_AnyPurePath", bound=PurePath)

# The following function, variables and comments were
# copied from cpython 3.9 Lib/pathlib.py file.

# EBADF - guard against macOS `stat` throwing EBADF
_IGNORED_ERRORS = (ENOENT, ENOTDIR, EBADF, ELOOP)

_IGNORED_WINERRORS = (
    21,  # ERROR_NOT_READY - drive exists but is not accessible
    1921,  # ERROR_CANT_RESOLVE_FILENAME - fix for broken symlink pointing to itself
)
def _ignore_error(exception: Exception) -> bool:
    """Return True when ``exception`` carries an errno or winerror we tolerate.

    The exception's ``errno`` is looked up in ``_IGNORED_ERRORS`` and its
    ``winerror`` in ``_IGNORED_WINERRORS``; a missing attribute counts as None.
    """
    if getattr(exception, "errno", None) in _IGNORED_ERRORS:
        return True
    return getattr(exception, "winerror", None) in _IGNORED_WINERRORS
def get_lock_path(path: _AnyPurePath) -> _AnyPurePath:
    """Return the location of the ``.lock`` entry directly inside ``path``."""
    lock_name = ".lock"
    return path.joinpath(lock_name)
def on_rm_rf_error(
    func: Optional[Callable[..., Any]],
    path: str,
    excinfo: Union[
        BaseException,
        Tuple[Type[BaseException], BaseException, Optional[types.TracebackType]],
    ],
    *,
    start_path: Path,
) -> bool:
    """Error callback for ``shutil.rmtree`` that retries permission failures.

    :param func: The function that failed while removing ``path``.
    :param path: The filesystem entry that could not be removed.
    :param excinfo: Either the exception itself or an ``exc_info``-style triple.
    :param start_path: The path originally handed to ``rm_rf``; the upward
        chmod walk stops once it reaches this directory.
    :returns:
        True if permissions were relaxed and ``func`` was retried, False
        otherwise. The returned value is used only by our own tests.
    """
    # Accept both callback flavours: a bare exception or an exc_info triple.
    exc = excinfo if isinstance(excinfo, BaseException) else excinfo[1]

    # Another process removed the file in the middle of the "rm_rf" (xdist for example).
    # More context: https://github.com/pytest-dev/pytest/issues/5974#issuecomment-543799018
    if isinstance(exc, FileNotFoundError):
        return False

    # Anything that is not a permission problem is only reported, never retried.
    if not isinstance(exc, PermissionError):
        warnings.warn(
            PytestWarning(f"(rm_rf) error removing {path}\n{type(exc)}: {exc}")
        )
        return False

    retryable = (os.rmdir, os.remove, os.unlink)
    if func not in retryable:
        # os.open failures are silently skipped; any other function is reported.
        if func not in (os.open,):
            warnings.warn(
                PytestWarning(
                    f"(rm_rf) unknown function {func} when removing {path}:\n{type(exc)}: {exc}"
                )
            )
        return False

    # Chmod + retry.
    import stat

    def make_user_rw(target: str) -> None:
        current_mode = os.stat(target).st_mode
        os.chmod(target, current_mode | stat.S_IRUSR | stat.S_IWUSR)

    # For files, we need to recursively go upwards in the directories to
    # ensure they all are also writable.
    failed_entry = Path(path)
    if failed_entry.is_file():
        for ancestor in failed_entry.parents:
            make_user_rw(str(ancestor))
            # Stop when we reach the original path passed to rm_rf.
            if ancestor == start_path:
                break
    make_user_rw(str(path))

    func(path)
    return True
def ensure_extended_length_path(path: Path) -> Path:
    """Return the extended-length form of ``path`` on Windows.

    On Windows, by default, the maximum length of a path (MAX_PATH) is 260
    characters, and operations on paths longer than that fail. Converting the
    path to "extended-length" form before the operation lifts that limit:
    https://docs.microsoft.com/en-us/windows/win32/fileio/naming-a-file#maximum-path-length-limitation

    On Windows the path is resolved and returned in extended-length form.
    On every other platform the argument is handed back unchanged.
    """
    if not sys.platform.startswith("win32"):
        return path
    resolved = path.resolve()
    return Path(get_extended_length_path_str(str(resolved)))
def get_extended_length_path_str(path: str) -> str:
    """Prefix ``path`` so Windows treats it as an extended-length path.

    Paths that already carry an extended-length prefix are returned as-is;
    UNC paths (leading double backslash) get the UNC variant of the prefix.
    """
    extended = "\\\\?\\"
    extended_unc = "\\\\?\\UNC\\"
    already_extended = path.startswith((extended, extended_unc))
    if already_extended:
        return path
    # UNC
    is_unc = path.startswith("\\\\")
    if is_unc:
        return extended_unc + path[2:]
    return extended + path
def rm_rf(path: Path) -> None:
    """Recursively delete ``path``, coping with read-only entries.

    Removal errors are routed to ``on_rm_rf_error``, bound to the
    (extended-length) path being removed.
    """
    target = ensure_extended_length_path(path)
    error_handler = partial(on_rm_rf_error, start_path=target)
    target_str = str(target)
    # Python 3.12+ takes the handler as ``onexc``; older versions as ``onerror``.
    if sys.version_info >= (3, 12):
        shutil.rmtree(target_str, onexc=error_handler)
    else:
        shutil.rmtree(target_str, onerror=error_handler)
def find_prefixed(root: Path, prefix: str) -> Iterator["os.DirEntry[str]"]:
    """Yield the entries of ``root`` whose name starts with ``prefix``, ignoring case."""
    wanted = prefix.lower()
    yield from (
        entry for entry in os.scandir(root) if entry.name.lower().startswith(wanted)
    )
def extract_suffixes(iter: Iterable["os.DirEntry[str]"], prefix: str) -> Iterator[str]:
    """Yield each entry's name with the first ``len(prefix)`` characters removed.

    :param iter: Iterable of directory entries.
    :param prefix: Expected prefix of the entry names.
    """
    cut = len(prefix)
    yield from map(lambda entry: entry.name[cut:], iter)
def find_suffixes(root: Path, prefix: str) -> Iterator[str]:
    """Combine find_prefixed and extract_suffixes."""
    matching_entries = find_prefixed(root, prefix)
    return extract_suffixes(matching_entries, prefix)
def parse_num(maybe_num: str) -> int:
    """Convert a numeric path suffix to ``int``; anything ``int()`` rejects becomes -1."""
    try:
        number = int(maybe_num)
    except ValueError:
        number = -1
    return number
def _force_symlink(
    root: Path, target: Union[str, PurePath], link_to: Union[str, Path]
) -> None:
    """Best-effort (re)creation of the symlink ``root/target`` pointing at ``link_to``.

    It's full of race conditions that are reasonably OK to ignore
    for the context of best effort linking to the latest test run.

    The presumption being that in case of much parallelism
    the inaccuracy is going to be acceptable.
    """
    link = root.joinpath(target)
    # Drop whatever currently occupies the link location, if anything.
    with contextlib.suppress(OSError):
        link.unlink()
    # Failing to create the link is deliberately not an error.
    with contextlib.suppress(Exception):
        link.symlink_to(link_to)
def make_numbered_dir(root: Path, prefix: str, mode: int = 0o700) -> Path:
    """Create ``root/<prefix><N>`` where N is one above the highest existing number.

    On success the ``<prefix>current`` symlink is pointed at the new directory
    (best effort). Creation is attempted up to 10 times, re-scanning ``root``
    each time.

    :raises OSError: If no directory could be created after 10 tries.
    """
    for _ in range(10):
        # try up to 10 times to create the folder
        highest = max(map(parse_num, find_suffixes(root, prefix)), default=-1)
        candidate = root.joinpath(f"{prefix}{highest + 1}")
        try:
            candidate.mkdir(mode=mode)
        except Exception:
            # Creation failed; re-scan and try again.
            continue
        _force_symlink(root, prefix + "current", candidate)
        return candidate
    raise OSError(
        "could not create numbered dir with prefix "
        f"{prefix} in {root} after 10 tries"
    )
def create_cleanup_lock(p: Path) -> Path:
    """Create a lock to prevent premature folder cleanup.

    The lock is a ``.lock`` file inside ``p``, created exclusively and
    containing the current process id.

    :param p: Directory in which to create the lock file.
    :returns: Path of the newly created lock file.
    :raises OSError:
        If the lock file already exists, or if it is no longer a regular file
        right after having been created.
    """
    lock_path = get_lock_path(p)
    try:
        # O_EXCL makes creation fail if the lock already exists.
        fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
    except FileExistsError as e:
        raise OSError(f"cannot create lockfile in {p}") from e
    try:
        os.write(fd, str(os.getpid()).encode())
    finally:
        # Always release the descriptor, even if the write fails.
        os.close(fd)
    if not lock_path.is_file():
        raise OSError("lock path got renamed after successful creation")
    return lock_path
def register_cleanup_lock_removal(
    lock_path: Path, register: Any = atexit.register
) -> Any:
    """Register a callback that removes ``lock_path``; by default it runs at exit.

    :param lock_path: Lock file to delete.
    :param register: Callable that receives the cleanup function.
    :returns: Whatever ``register`` returns.
    """
    owner_pid = os.getpid()

    def cleanup_on_exit(lock_path: Path = lock_path, original_pid: int = owner_pid) -> None:
        if os.getpid() != original_pid:
            # fork
            return
        with contextlib.suppress(OSError):
            lock_path.unlink()

    return register(cleanup_on_exit)
def maybe_delete_a_numbered_dir(path: Path) -> None:
    """Delete a numbered directory, provided its cleanup lock can be taken.

    The directory is locked, renamed to a unique ``garbage-*`` sibling and then
    removed. Any ``OSError`` along the way silently aborts the attempt.
    """
    target = ensure_extended_length_path(path)
    held_lock = None
    try:
        held_lock = create_cleanup_lock(target)
        trash = target.parent.joinpath(f"garbage-{uuid.uuid4()}")
        target.rename(trash)
        rm_rf(trash)
    except OSError:
        # known races:
        # * other process did a cleanup at the same time
        # * deletable folder was found
        # * process cwd (Windows)
        pass
    finally:
        # If we created the lock, ensure we remove it even if we failed
        # to properly remove the numbered dir.
        if held_lock is not None:
            with contextlib.suppress(OSError):
                held_lock.unlink()
def ensure_deletable(path: Path, consider_lock_dead_if_created_before: float) -> bool:
    """Check if `path` is deletable based on whether the lock file is expired.

    :param path: Directory to examine. Symlinks are never reported as deletable.
    :param consider_lock_dead_if_created_before:
        Timestamp compared against the lock file's ``st_mtime``; a lock older
        than this is treated as dead and an attempt is made to remove it.
    :returns:
        True when there is no lock file, or when an expired lock was removed;
        False otherwise.
    """
    if path.is_symlink():
        return False
    lock = get_lock_path(path)
    try:
        # No lock file at all: nothing is protecting the directory.
        if not lock.is_file():
            return True
    except OSError:
        # we might not have access to the lock file at all, in this case assume
        # we don't have access to the entire directory (#7491).
        return False
    try:
        lock_time = lock.stat().st_mtime
    except Exception:
        # The lock's age cannot be determined, so treat it as still live.
        return False
    else:
        if lock_time < consider_lock_dead_if_created_before:
            # We want to ignore any errors while trying to remove the lock such as:
            # - PermissionDenied, like the file permissions have changed since the lock creation;
            # - FileNotFoundError, in case another pytest process got here first;
            # and any other cause of failure.
            # NOTE(review): nesting reconstructed from a flattened source; `return True`
            # is read as belonging to the `with` body, so a failed unlink falls through
            # to `return False` -- confirm against the upstream file.
            with contextlib.suppress(OSError):
                lock.unlink()
                return True
        return False
def try_cleanup(path: Path, consider_lock_dead_if_created_before: float) -> None:
    """Delete ``path`` if, and only if, ``ensure_deletable`` approves it."""
    if not ensure_deletable(path, consider_lock_dead_if_created_before):
        return
    maybe_delete_a_numbered_dir(path)
def cleanup_candidates(root: Path, prefix: str, keep: int) -> Iterator[Path]:
    """Yield the numbered directories eligible for removal - follows py.path.

    An entry qualifies when its parsed number is at most
    ``highest existing number - keep``. Entries whose suffix is not a number
    parse as -1.
    """
    highest = max(map(parse_num, find_suffixes(root, prefix)), default=-1)
    threshold = highest - keep
    prefix_len = len(prefix)
    for entry in find_prefixed(root, prefix):
        if parse_num(entry.name[prefix_len:]) <= threshold:
            yield Path(entry)
def cleanup_dead_symlinks(root: Path) -> None:
    """Remove every symlink directly inside ``root`` whose resolved target does not exist."""
    for child in root.iterdir():
        if child.is_symlink() and not child.resolve().exists():
            child.unlink()
def cleanup_numbered_dir(
    root: Path, prefix: str, keep: int, consider_lock_dead_if_created_before: float
) -> None:
    """Cleanup for lock driven numbered directories.

    Does nothing when ``root`` is missing. Otherwise tries to remove the
    surplus numbered directories, then leftover ``garbage-*`` entries, and
    finally drops dangling symlinks.
    """
    if not root.exists():
        return
    for stale in cleanup_candidates(root, prefix, keep):
        try_cleanup(stale, consider_lock_dead_if_created_before)
    for leftover in root.glob("garbage-*"):
        try_cleanup(leftover, consider_lock_dead_if_created_before)
    cleanup_dead_symlinks(root)
def make_numbered_dir_with_cleanup(
    root: Path,
    prefix: str,
    keep: int,
    lock_timeout: float,
    mode: int,
) -> Path:
    """Create a numbered dir with a cleanup lock and schedule removal of old ones.

    Up to 10 attempts are made. On success, ``cleanup_numbered_dir`` is
    registered with ``atexit``. If every attempt fails, the last exception
    is re-raised.
    """
    last_error = None
    for _ in range(10):
        try:
            new_dir = make_numbered_dir(root, prefix, mode)
            # Only lock the current dir when keep is not 0
            if keep != 0:
                register_cleanup_lock_removal(create_cleanup_lock(new_dir))
        except Exception as exc:
            last_error = exc
            continue
        # Locks older than the new directory by more than lock_timeout count as dead.
        dead_before = new_dir.stat().st_mtime - lock_timeout
        # Register a cleanup for program exit
        atexit.register(cleanup_numbered_dir, root, prefix, keep, dead_before)
        return new_dir
    assert last_error is not None
    raise last_error
def resolve_from_str(input: str, rootpath: Path) -> Path:
    """Turn ``input`` into a Path after expanding ``~`` and environment variables.

    An absolute result is returned as-is; a relative one is joined onto ``rootpath``.
    """
    expanded = expandvars(expanduser(input))
    if isabs(expanded):
        return Path(expanded)
    return rootpath.joinpath(expanded)
def fnmatch_ex(pattern: str, path: Union[str, "os.PathLike[str]"]) -> bool:
    """A port of FNMatcher from py.path.common which works with PurePath() instances.

    Unlike PurePath.match(), which matches "**" glob expressions for each part
    of the path, this algorithm matches against the whole path.

    For example:
        "tests/foo/bar/doc/test_foo.py" matches pattern "tests/**/doc/test*.py"
        with this algorithm, but not with PurePath.match().

    This algorithm was ported to keep backward-compatibility with existing
    settings which assume paths match according to this logic.

    References:
    * https://bugs.python.org/issue29249
    * https://bugs.python.org/issue34731
    """
    path = PurePath(path)
    on_windows = sys.platform.startswith("win")

    if on_windows and posix_sep in pattern and sep not in pattern:
        # Running on Windows, the pattern has no Windows path separators,
        # and the pattern has one or more Posix path separators. Replace
        # the Posix path separators with the Windows path separator.
        pattern = pattern.replace(posix_sep, sep)

    if sep in pattern:
        # Pattern mentions directories: match it against the whole path.
        name = str(path)
        if path.is_absolute() and not os.path.isabs(pattern):
            pattern = f"*{os.sep}{pattern}"
    else:
        # Pattern is a bare name: only the final path component matters.
        name = path.name
    return fnmatch.fnmatch(name, pattern)
def parts(s: str) -> Set[str]:
    """Return every leading sub-path of ``s``, split on the OS separator.

    An empty leading component (as produced by an absolute path) is reported
    as the separator itself.
    """
    pieces = s.split(sep)
    return {sep.join(pieces[:end]) or sep for end in range(1, len(pieces) + 1)}
def symlink_or_skip(
    src: Union["os.PathLike[str]", str],
    dst: Union["os.PathLike[str]", str],
    **kwargs: Any,
) -> None:
    """Create the symlink ``dst`` -> ``src``; skip the running test if that fails.

    Extra keyword arguments are forwarded to ``os.symlink``.
    """
    try:
        os.symlink(src, dst, **kwargs)
    except OSError as err:
        skip(f"symlinks not supported: {err}")
class ImportMode(Enum):
    """Import strategies accepted by the `mode` parameter of `import_path`."""

    # Insert the module's root directory at the start of sys.path.
    prepend = "prepend"
    # Append the module's root directory to sys.path if not already present.
    append = "append"
    # Import through importlib machinery without modifying sys.path.
    importlib = "importlib"
class ImportPathMismatchError(ImportError):
    """Raised by import_path() when the imported module's __file__ does not match.

    This can happen when `import_path` is called multiple times with different
    filenames that have the same basename but reside in different packages
    (for example "/tests1/test_foo.py" and "/tests2/test_foo.py").
    """
def import_path(
    path: Union[str, "os.PathLike[str]"],
    *,
    mode: Union[str, ImportMode] = ImportMode.prepend,
    root: Path,
    consider_namespace_packages: bool,
) -> ModuleType:
    """
    Import and return a module from the given path, which can be a file (a module) or
    a directory (a package).

    :param path:
        Path to the file to import.

    :param mode:
        Controls the underlying import mechanism that will be used:

        * ImportMode.prepend: the directory containing the module (or package, taking
          `__init__.py` files into account) will be put at the *start* of `sys.path` before
          being imported with `importlib.import_module`.

        * ImportMode.append: same as `prepend`, but the directory will be appended
          to the end of `sys.path`, if not already in `sys.path`.

        * ImportMode.importlib: uses more fine control mechanisms provided by `importlib`
          to import the module, which avoids having to muck with `sys.path` at all. It effectively
          allows having same-named test modules in different places.

    :param root:
        Used as an anchor when mode == ImportMode.importlib to obtain
        a unique name for the module being imported so it can safely be stored
        into ``sys.modules``.

    :param consider_namespace_packages:
        If True, consider namespace packages when resolving module names.

    :raises ImportError:
        If `path` does not exist, or (in `importlib` mode) no usable module spec
        could be found for it.

    :raises ImportPathMismatchError:
        If, after importing, the given `path` and the module's `__file__`
        differ. Only raised in `prepend` and `append` modes.
    """
    path = Path(path)
    # Accepts either an ImportMode member or its string value.
    mode = ImportMode(mode)

    if not path.exists():
        raise ImportError(path)

    if mode is ImportMode.importlib:
        # Try to import this module using the standard import mechanisms, but
        # without touching sys.path.
        try:
            pkg_root, module_name = resolve_pkg_root_and_module_name(
                path, consider_namespace_packages=consider_namespace_packages
            )
        except CouldNotResolvePathError:
            pass
        else:
            # If the given module name is already in sys.modules, do not import it again.
            with contextlib.suppress(KeyError):
                return sys.modules[module_name]

            mod = _import_module_using_spec(
                module_name, path, pkg_root, insert_modules=False
            )
            if mod is not None:
                return mod

        # Could not import the module with the current sys.path, so we fall back
        # to importing the file as a single module, not being a part of a package.
        module_name = module_name_from_path(path, root)
        with contextlib.suppress(KeyError):
            return sys.modules[module_name]

        mod = _import_module_using_spec(
            module_name, path, path.parent, insert_modules=True
        )
        if mod is None:
            raise ImportError(f"Can't find module {module_name} at location {path}")
        return mod

    # prepend/append modes: work out which directory must be on sys.path.
    try:
        pkg_root, module_name = resolve_pkg_root_and_module_name(
            path, consider_namespace_packages=consider_namespace_packages
        )
    except CouldNotResolvePathError:
        # Not part of a package: import it as a top-level module from its own directory.
        pkg_root, module_name = path.parent, path.stem

    # Change sys.path permanently: restoring it at the end of this function would cause surprising
    # problems because of delayed imports: for example, a conftest.py file imported by this function
    # might have local imports, which would fail at runtime if we restored sys.path.
    if mode is ImportMode.append:
        if str(pkg_root) not in sys.path:
            sys.path.append(str(pkg_root))
    elif mode is ImportMode.prepend:
        if str(pkg_root) != sys.path[0]:
            sys.path.insert(0, str(pkg_root))
    else:
        assert_never(mode)

    importlib.import_module(module_name)

    mod = sys.modules[module_name]
    # The __file__ consistency check below is skipped for __init__.py files.
    if path.name == "__init__.py":
        return mod

    # Setting PY_IGNORE_IMPORTMISMATCH=1 disables the mismatch check.
    ignore = os.environ.get("PY_IGNORE_IMPORTMISMATCH", "")
    if ignore != "1":
        module_file = mod.__file__
        if module_file is None:
            raise ImportPathMismatchError(module_name, module_file, path)

        # Normalize compiled-file names (".pyc"/".pyo") to the ".py" source name.
        if module_file.endswith((".pyc", ".pyo")):
            module_file = module_file[:-1]
        # For packages, compare against the package directory rather than its __init__.py.
        if module_file.endswith(os.sep + "__init__.py"):
            module_file = module_file[: -(len(os.sep + "__init__.py"))]

        try:
            is_same = _is_same(str(path), module_file)
        except FileNotFoundError:
            is_same = False

        if not is_same:
            raise ImportPathMismatchError(module_name, module_file, path)

    return mod
def _import_module_using_spec(
    module_name: str, module_path: Path, module_location: Path, *, insert_modules: bool
) -> Optional[ModuleType]:
    """
    Tries to import a module by its canonical name, path to the .py file, and its
    parent location.

    :param module_name:
        Dotted name under which the module is stored in ``sys.modules``.

    :param module_path:
        Path the resulting module spec must point at for the import to proceed.

    :param module_location:
        Directory handed to each meta path finder as the search location.

    :param insert_modules:
        If True, will call insert_missing_modules to create empty intermediate modules
        for made-up module names (when importing test files not reachable from sys.path).

    :returns:
        The imported module, or None if no spec matching ``module_path`` was found.
    """
    # Checking with sys.meta_path first in case one of its hooks can import this module,
    # such as our own assertion-rewrite hook.
    for meta_importer in sys.meta_path:
        spec = meta_importer.find_spec(module_name, [str(module_location)])
        if spec_matches_module_path(spec, module_path):
            break
    else:
        # No finder produced a matching spec: build one straight from the file location.
        spec = importlib.util.spec_from_file_location(module_name, str(module_path))

    if spec_matches_module_path(spec, module_path):
        assert spec is not None
        # Attempt to import the parent module, seems is our responsibility:
        # https://github.com/python/cpython/blob/73906d5c908c1e0b73c5436faeff7d93698fc074/Lib/importlib/_bootstrap.py#L1308-L1311
        parent_module_name, _, name = module_name.rpartition(".")
        parent_module: Optional[ModuleType] = None
        if parent_module_name:
            parent_module = sys.modules.get(parent_module_name)
            if parent_module is None:
                # Find the directory of this module's parent.
                parent_dir = (
                    module_path.parent.parent
                    if module_path.name == "__init__.py"
                    else module_path.parent
                )
                # Consider the parent module path as its __init__.py file, if it has one.
                parent_module_path = (
                    parent_dir / "__init__.py"
                    if (parent_dir / "__init__.py").is_file()
                    else parent_dir
                )
                # Recurse so that every missing ancestor is imported first.
                parent_module = _import_module_using_spec(
                    parent_module_name,
                    parent_module_path,
                    parent_dir,
                    insert_modules=insert_modules,
                )

        # Find spec and import this module.
        mod = importlib.util.module_from_spec(spec)
        # Registered in sys.modules before its code is executed.
        sys.modules[module_name] = mod
        spec.loader.exec_module(mod)  # type: ignore[union-attr]

        # Set this module as an attribute of the parent module (#12194).
        if parent_module is not None:
            setattr(parent_module, name, mod)

        if insert_modules:
            insert_missing_modules(sys.modules, module_name)
        return mod

    return None
def spec_matches_module_path(
    module_spec: Optional[ModuleSpec], module_path: Path
) -> bool:
    """Return true if the given ModuleSpec can be used to import the given module path.

    A missing spec, or a spec without an ``origin``, never matches.
    """
    origin = None if module_spec is None else module_spec.origin
    if origin is None:
        return False
    return Path(origin) == module_path
# Implement a special _is_same function on Windows which returns True if the two filenames
# compare equal, to circumvent os.path.samefile returning False for mounts in UNC (#7678).
if sys.platform.startswith("win"):

    def _is_same(f1: str, f2: str) -> bool:
        """Return True if both names compare equal as paths or refer to the same file."""
        if Path(f1) == Path(f2):
            return True
        return os.path.samefile(f1, f2)

else:

    def _is_same(f1: str, f2: str) -> bool:
        """Return True if both names refer to the same file."""
        return os.path.samefile(f1, f2)
def module_name_from_path(path: Path, root: Path) -> str:
    """
    Build a dotted module name from ``path``, anchored on ``root``.

    For example: path="/projects/src/tests/test_foo.py" and root="/projects", the
    resulting module name will be "src.tests.test_foo".
    """
    stem_path = path.with_suffix("")
    try:
        # Use the parts for the relative path to the root path.
        name_parts = stem_path.relative_to(root).parts
    except ValueError:
        # If we can't get a relative path to root, use the full path, except
        # for the first part ("d:\\" or "/" depending on the platform, for example).
        name_parts = stem_path.parts[1:]

    # Module name for packages do not contain the __init__ file, unless
    # the `__init__.py` file is at the root.
    if len(name_parts) >= 2 and name_parts[-1] == "__init__":
        name_parts = name_parts[:-1]

    # Module names cannot contain ".", normalize them to "_". This prevents
    # a directory having a "." in the name (".env.310" for example) causing extra intermediate modules.
    # Also, important to replace "." at the start of paths, as those are considered relative imports.
    return ".".join(part.replace(".", "_") for part in name_parts)
def insert_missing_modules(modules: Dict[str, ModuleType], module_name: str) -> None:
    """
    Used by ``import_path`` to create intermediate modules when using mode=importlib.

    When we want to import a module as "src.tests.test_foo" for example, we need
    to create empty modules "src" and "src.tests" after inserting "src.tests.test_foo",
    otherwise "src.tests.test_foo" is not importable by ``__import__``.

    :param modules:
        Module registry to fill in (``sys.modules`` when called from this file).
        It must already contain an entry for ``module_name``.
    :param module_name:
        Dotted name of the module whose ancestors should be present.
    """
    module_parts = module_name.split(".")
    # Walk from the full dotted name up to its top-level ancestor.
    while module_name:
        parent_module_name, _, child_name = module_name.rpartition(".")
        if parent_module_name:
            parent_module = modules.get(parent_module_name)
            if parent_module is None:
                try:
                    # If sys.meta_path is empty, calling import_module will issue
                    # a warning and raise ModuleNotFoundError. To avoid the
                    # warning, we check sys.meta_path explicitly and raise the error
                    # ourselves to fall back to creating a dummy module.
                    if not sys.meta_path:
                        raise ModuleNotFoundError
                    parent_module = importlib.import_module(parent_module_name)
                except ModuleNotFoundError:
                    # The placeholder must carry the parent's own dotted name,
                    # not the name of the child that triggered its creation.
                    parent_module = ModuleType(
                        parent_module_name,
                        doc="Empty module created by pytest's importmode=importlib.",
                    )
                modules[parent_module_name] = parent_module

            # Add child attribute to the parent that can reference the child
            # modules.
            if not hasattr(parent_module, child_name):
                setattr(parent_module, child_name, modules[module_name])

        module_parts.pop(-1)
        module_name = ".".join(module_parts)
def resolve_package_path(path: Path) -> Optional[Path]:
    """Return the outermost package directory that contains ``path``.

    Starting at ``path`` and walking upwards, every directory that holds an
    ``__init__.py`` and whose name is a valid identifier is accepted; the walk
    stops at the first directory that fails either test.

    Returns None if it cannot be determined.
    """
    package_dir = None
    for candidate in (path, *path.parents):
        if not candidate.is_dir():
            continue
        has_init = (candidate / "__init__.py").is_file()
        if not has_init or not candidate.name.isidentifier():
            break
        package_dir = candidate
    return package_dir
def resolve_pkg_root_and_module_name(
    path: Path, *, consider_namespace_packages: bool = False
) -> Tuple[Path, str]:
    """
    Return the path to the directory of the root package that contains the
    given Python file, and its module name:

        src/
            app/
                __init__.py
                core/
                    __init__.py
                    models.py

    Passing the full path to `models.py` will yield Path("src") and "app.core.models".

    If consider_namespace_packages is True, then we additionally check upwards in the hierarchy
    for namespace packages:

    https://packaging.python.org/en/latest/guides/packaging-namespace-packages

    Raises CouldNotResolvePathError if the given path does not belong to a package (missing any __init__.py files).
    """
    package_dir = resolve_package_path(path)
    pkg_root: Optional[Path] = None if package_dir is None else package_dir.parent

    if consider_namespace_packages:
        start = path.parent if pkg_root is None else pkg_root
        for candidate in itertools.chain((start,), start.parents):
            candidate_name = compute_module_name(candidate, path)
            if candidate_name and is_importable(candidate_name, path):
                # Point the pkg_root to the root of the namespace package.
                pkg_root = candidate
                break

    if pkg_root is not None:
        resolved_name = compute_module_name(pkg_root, path)
        if resolved_name:
            return pkg_root, resolved_name

    raise CouldNotResolvePathError(f"Could not resolve for {path}")
def is_importable(module_name: str, module_path: Path) -> bool:
    """
    Return whether ``module_name`` could be imported normally by Python, akin to the user
    entering the REPL and importing it directly, and whether the spec found that way
    corresponds to ``module_path``.

    :param module_name:
        Full module name that we want to check if is importable.
        For example, "app.models".

    :param module_path:
        Full path to the python module/package we want to check if is importable.
        For example, "/projects/src/app/models.py".
    """
    try:
        # Note this is different from what we do in ``_import_module_using_spec``, where we explicitly search through
        # sys.meta_path to be able to pass the path of the module that we want to import (``meta_importer.find_spec``).
        # Using importlib.util.find_spec() is different, it gives the same results as trying to import
        # the module normally in the REPL.
        found_spec = importlib.util.find_spec(module_name)
    except (ImportError, ValueError, ImportWarning):
        return False
    return spec_matches_module_path(found_spec, module_path)
def compute_module_name(root: Path, module_path: Path) -> Optional[str]:
    """Derive a dotted module name for ``module_path`` relative to ``root``.

    Returns None when the suffix cannot be stripped, when ``module_path`` is
    not located under ``root``, or when nothing remains after making it
    relative. A trailing ``__init__`` component is dropped.
    """
    try:
        stem_path = module_path.with_suffix("")
    except ValueError:
        # Empty paths (such as Path.cwd()) might break meta_path hooks (like our own assertion rewriter).
        return None

    try:
        names = list(stem_path.relative_to(root).parts)
    except ValueError:  # pragma: no cover
        return None

    if not names:
        return None
    if names[-1] == "__init__":
        del names[-1]
    return ".".join(names)
class CouldNotResolvePathError(Exception):
    """Raised by resolve_pkg_root_and_module_name when no package root/module name can be determined."""
def scandir(
    path: Union[str, "os.PathLike[str]"],
    sort_key: Callable[["os.DirEntry[str]"], object] = lambda entry: entry.name,
) -> List["os.DirEntry[str]"]:
    """Return the entries found directly inside ``path``, sorted.

    The returned entries are sorted according to the given key.
    The default is to sort by name.

    Entries whose ``is_file()`` check fails with an ignorable error are
    left out; any other ``OSError`` is propagated.
    """
    usable = []
    with os.scandir(path) as listing:
        # Skip entries with symlink loops and other brokenness, so the caller
        # doesn't have to deal with it.
        for entry in listing:
            try:
                entry.is_file()
            except OSError as err:
                if not _ignore_error(err):
                    raise
            else:
                usable.append(entry)
    usable.sort(key=sort_key)  # type: ignore[arg-type]
    return usable
def visit(
    path: Union[str, "os.PathLike[str]"], recurse: Callable[["os.DirEntry[str]"], bool]
) -> Iterator["os.DirEntry[str]"]:
    """Walk a directory tree, yielding each directory's entries before descending.

    All (sorted) entries of ``path`` are yielded first; afterwards each
    sub-directory accepted by the ``recurse`` predicate is visited the same way.
    """
    level = scandir(path)
    yield from level
    accepted_dirs = (entry for entry in level if entry.is_dir() and recurse(entry))
    for subdir in accepted_dirs:
        yield from visit(subdir.path, recurse)
def absolutepath(path: "Union[str, os.PathLike[str]]") -> Path:
    """Return ``path`` as an absolute Path, computed with os.path.abspath.

    Prefer this over Path.resolve() (see #6523).
    Prefer this over Path.absolute() (not public, doesn't normalize).
    """
    absolute = os.path.abspath(path)
    return Path(absolute)
def commonpath(path1: Path, path2: Path) -> Optional[Path]:
    """Return the longest path prefix shared by both paths, or None if
    os.path.commonpath cannot compute one.

    If one path is relative and one is absolute, returns None.
    """
    pair = (str(path1), str(path2))
    try:
        shared = os.path.commonpath(pair)
    except ValueError:
        return None
    return Path(shared)
def bestrelpath(directory: Path, dest: Path) -> str:
    """Return a string which is a relative path from directory to dest such
    that directory/bestrelpath == dest.

    The paths must be either both absolute or both relative.

    If no such path can be determined, returns dest.
    """
    assert isinstance(directory, Path)
    assert isinstance(dest, Path)
    if dest == directory:
        return os.curdir
    # Find the longest common directory.
    base = commonpath(directory, dest)
    # Can be the case on Windows for two absolute paths on different drives.
    # Can be the case for two relative paths without common prefix.
    # Can be the case for a relative path and an absolute path.
    if not base:
        return str(dest)
    # Back from directory to base.
    climb = [os.pardir] * len(directory.relative_to(base).parts)
    # Forward from base to dest.
    descend = dest.relative_to(base).parts
    return os.path.join(*climb, *descend)
def safe_exists(p: Path) -> bool:
    """Like Path.exists(), but report False instead of raising for unusable paths (#11394)."""
    try:
        found = p.exists()
    except (ValueError, OSError):
        # ValueError: stat: path too long for Windows
        # OSError: [WinError 123] The filename, directory name, or volume label syntax is incorrect
        found = False
    return found