Spaces:
Paused
Paused
| import os | |
| import pathlib | |
| import tempfile | |
| import functools | |
| import contextlib | |
| import types | |
| import importlib | |
| import inspect | |
| import warnings | |
| import itertools | |
| from typing import Union, Optional, cast | |
| from .abc import ResourceReader, Traversable | |
| Package = Union[types.ModuleType, str] | |
| Anchor = Package | |
| def package_to_anchor(func): | |
| """ | |
| Replace 'package' parameter as 'anchor' and warn about the change. | |
| Other errors should fall through. | |
| >>> files('a', 'b') | |
| Traceback (most recent call last): | |
| TypeError: files() takes from 0 to 1 positional arguments but 2 were given | |
| Remove this compatibility in Python 3.14. | |
| """ | |
| undefined = object() | |
| def wrapper(anchor=undefined, package=undefined): | |
| if package is not undefined: | |
| if anchor is not undefined: | |
| return func(anchor, package) | |
| warnings.warn( | |
| "First parameter to files is renamed to 'anchor'", | |
| DeprecationWarning, | |
| stacklevel=2, | |
| ) | |
| return func(package) | |
| elif anchor is undefined: | |
| return func() | |
| return func(anchor) | |
| return wrapper | |
| def files(anchor: Optional[Anchor] = None) -> Traversable: | |
| """ | |
| Get a Traversable resource for an anchor. | |
| """ | |
| return from_package(resolve(anchor)) | |
| def get_resource_reader(package: types.ModuleType) -> Optional[ResourceReader]: | |
| """ | |
| Return the package's loader if it's a ResourceReader. | |
| """ | |
| # We can't use | |
| # a issubclass() check here because apparently abc.'s __subclasscheck__() | |
| # hook wants to create a weak reference to the object, but | |
| # zipimport.zipimporter does not support weak references, resulting in a | |
| # TypeError. That seems terrible. | |
| spec = package.__spec__ | |
| reader = getattr(spec.loader, 'get_resource_reader', None) # type: ignore[union-attr] | |
| if reader is None: | |
| return None | |
| return reader(spec.name) # type: ignore[union-attr] | |
| def resolve(cand: Optional[Anchor]) -> types.ModuleType: | |
| return cast(types.ModuleType, cand) | |
| def _(cand: str) -> types.ModuleType: | |
| return importlib.import_module(cand) | |
| def _(cand: None) -> types.ModuleType: | |
| return resolve(_infer_caller().f_globals['__name__']) | |
| def _infer_caller(): | |
| """ | |
| Walk the stack and find the frame of the first caller not in this module. | |
| """ | |
| def is_this_file(frame_info): | |
| return frame_info.filename == stack[0].filename | |
| def is_wrapper(frame_info): | |
| return frame_info.function == 'wrapper' | |
| stack = inspect.stack() | |
| not_this_file = itertools.filterfalse(is_this_file, stack) | |
| # also exclude 'wrapper' due to singledispatch in the call stack | |
| callers = itertools.filterfalse(is_wrapper, not_this_file) | |
| return next(callers).frame | |
| def from_package(package: types.ModuleType): | |
| """ | |
| Return a Traversable object for the given package. | |
| """ | |
| # deferred for performance (python/cpython#109829) | |
| from .future.adapters import wrap_spec | |
| spec = wrap_spec(package) | |
| reader = spec.loader.get_resource_reader(spec.name) | |
| return reader.files() | |
| def _tempfile( | |
| reader, | |
| suffix='', | |
| # gh-93353: Keep a reference to call os.remove() in late Python | |
| # finalization. | |
| *, | |
| _os_remove=os.remove, | |
| ): | |
| # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try' | |
| # blocks due to the need to close the temporary file to work on Windows | |
| # properly. | |
| fd, raw_path = tempfile.mkstemp(suffix=suffix) | |
| try: | |
| try: | |
| os.write(fd, reader()) | |
| finally: | |
| os.close(fd) | |
| del reader | |
| yield pathlib.Path(raw_path) | |
| finally: | |
| try: | |
| _os_remove(raw_path) | |
| except FileNotFoundError: | |
| pass | |
| def _temp_file(path): | |
| return _tempfile(path.read_bytes, suffix=path.name) | |
| def _is_present_dir(path: Traversable) -> bool: | |
| """ | |
| Some Traversables implement ``is_dir()`` to raise an | |
| exception (i.e. ``FileNotFoundError``) when the | |
| directory doesn't exist. This function wraps that call | |
| to always return a boolean and only return True | |
| if there's a dir and it exists. | |
| """ | |
| with contextlib.suppress(FileNotFoundError): | |
| return path.is_dir() | |
| return False | |
| def as_file(path): | |
| """ | |
| Given a Traversable object, return that object as a | |
| path on the local file system in a context manager. | |
| """ | |
| return _temp_dir(path) if _is_present_dir(path) else _temp_file(path) | |
| def _(path): | |
| """ | |
| Degenerate behavior for pathlib.Path objects. | |
| """ | |
| yield path | |
| def _temp_path(dir: tempfile.TemporaryDirectory): | |
| """ | |
| Wrap tempfile.TemporaryDirectory to return a pathlib object. | |
| """ | |
| with dir as result: | |
| yield pathlib.Path(result) | |
| def _temp_dir(path): | |
| """ | |
| Given a traversable dir, recursively replicate the whole tree | |
| to the file system in a context manager. | |
| """ | |
| assert path.is_dir() | |
| with _temp_path(tempfile.TemporaryDirectory()) as temp_dir: | |
| yield _write_contents(temp_dir, path) | |
| def _write_contents(target, source): | |
| child = target.joinpath(source.name) | |
| if source.is_dir(): | |
| child.mkdir() | |
| for item in source.iterdir(): | |
| _write_contents(child, item) | |
| else: | |
| child.write_bytes(source.read_bytes()) | |
| return child | |