Spaces:
Runtime error
Runtime error
| import io | |
| import posixpath | |
| import zipfile | |
| import itertools | |
| import contextlib | |
| import sys | |
| import pathlib | |
# Built-in dicts preserve insertion order from Python 3.7 onward, so the
# dedicated OrderedDict type is only pulled in on older interpreters.
if sys.version_info >= (3, 7):
    OrderedDict = dict
else:
    from collections import OrderedDict
# Names exported by a star-import of this module: only ``Path``.
__all__ = ['Path']
def _parents(path):
    """
    Generate every ancestor of ``path`` (elements separated by
    ``posixpath.sep``), nearest first, excluding ``path`` itself.

    >>> list(_parents('b/d'))
    ['b']
    >>> list(_parents('/b/d/'))
    ['/b']
    >>> list(_parents('b/d/f/'))
    ['b/d', 'b']
    >>> list(_parents('b'))
    []
    >>> list(_parents(''))
    []
    """
    lineage = _ancestry(path)
    # The lineage starts with the path itself; drop that first entry.
    return itertools.islice(lineage, 1, None)
| def _ancestry(path): | |
| """ | |
| Given a path with elements separated by | |
| posixpath.sep, generate all elements of that path | |
| >>> list(_ancestry('b/d')) | |
| ['b/d', 'b'] | |
| >>> list(_ancestry('/b/d/')) | |
| ['/b/d', '/b'] | |
| >>> list(_ancestry('b/d/f/')) | |
| ['b/d/f', 'b/d', 'b'] | |
| >>> list(_ancestry('b')) | |
| ['b'] | |
| >>> list(_ancestry('')) | |
| [] | |
| """ | |
| path = path.rstrip(posixpath.sep) | |
| while path and path != posixpath.sep: | |
| yield path | |
| path, tail = posixpath.split(path) | |
# Deduplicate an iterable in original order: the keys of the mapping
# produced by ``fromkeys`` are the unique items.
_dedupe = OrderedDict.fromkeys
| def _difference(minuend, subtrahend): | |
| """ | |
| Return items in minuend not in subtrahend, retaining order | |
| with O(1) lookup. | |
| """ | |
| return itertools.filterfalse(set(subtrahend).__contains__, minuend) | |
class CompleteDirs(zipfile.ZipFile):
    """
    A ZipFile subclass that ensures that implied directories
    are always included in the namelist.
    """

    @staticmethod
    def _implied_dirs(names):
        """
        Return the directory entries implied by ``names`` but absent from it.

        Each parent of each name is rendered with a trailing separator;
        those already present in ``names`` are dropped and the remainder is
        deduplicated in first-seen order. The result iterates over the
        implied directory names.
        """
        parents = itertools.chain.from_iterable(map(_parents, names))
        as_dirs = (p + posixpath.sep for p in parents)
        return _dedupe(_difference(as_dirs, names))

    def namelist(self):
        """Return the archive's names followed by any implied directories."""
        names = super().namelist()
        return names + list(self._implied_dirs(names))

    def _name_set(self):
        """Return the names from ``namelist()`` as a set for membership tests."""
        return set(self.namelist())

    def resolve_dir(self, name):
        """
        If the name represents a directory, return that name
        as a directory (with the trailing slash).
        """
        names = self._name_set()
        dirname = name + '/'
        # Only promote to a directory when the bare name is not itself an entry.
        dir_match = name not in names and dirname in names
        return dirname if dir_match else name

    @classmethod
    def make(cls, source):
        """
        Given a source (filename or zipfile), return an
        appropriate CompleteDirs subclass.

        A source that is already a CompleteDirs is returned unchanged. A
        source that is not a ZipFile is treated as a filename and opened
        with ``cls``. An existing ZipFile has its ``__class__`` reassigned
        in place and is returned.
        """
        if isinstance(source, CompleteDirs):
            return source

        if not isinstance(source, zipfile.ZipFile):
            return cls(_pathlib_compat(source))

        # Only allow for FastLookup when supplied zipfile is read-only
        if 'r' not in source.mode:
            cls = CompleteDirs

        source.__class__ = cls
        return source
class FastLookup(CompleteDirs):
    """
    ZipFile subclass to ensure implicit
    dirs exist and are resolved rapidly.

    The results of ``namelist()`` and ``_name_set()`` are computed on
    first use and stored on the instance for later calls.
    """

    def namelist(self):
        """Return the name list, computing and storing it on first call."""
        try:
            return self.__names
        except AttributeError:
            pass
        self.__names = super().namelist()
        return self.__names

    def _name_set(self):
        """Return the set of names, computing and storing it on first call."""
        try:
            return self.__lookup
        except AttributeError:
            pass
        self.__lookup = super()._name_set()
        return self.__lookup
| def _pathlib_compat(path): | |
| """ | |
| For path-like objects, convert to a filename for compatibility | |
| on Python 3.6.1 and earlier. | |
| """ | |
| try: | |
| return path.__fspath__() | |
| except AttributeError: | |
| return str(path) | |
class Path:
    """
    A pathlib-compatible interface for zip files.

    Consider a zip file with this structure::

        .
        |-- a.txt
        `-- b
            |-- c.txt
            `-- d
                `-- e.txt

    >>> data = io.BytesIO()
    >>> zf = zipfile.ZipFile(data, 'w')
    >>> zf.writestr('a.txt', 'content of a')
    >>> zf.writestr('b/c.txt', 'content of c')
    >>> zf.writestr('b/d/e.txt', 'content of e')
    >>> zf.filename = 'mem/abcde.zip'

    Path accepts the zipfile object itself or a filename

    >>> root = Path(zf)

    From there, several path operations are available.

    Directory iteration (including the zip file itself):

    >>> a, b = root.iterdir()
    >>> a
    Path('mem/abcde.zip', 'a.txt')
    >>> b
    Path('mem/abcde.zip', 'b/')

    name property:

    >>> b.name
    'b'

    join with divide operator:

    >>> c = b / 'c.txt'
    >>> c
    Path('mem/abcde.zip', 'b/c.txt')
    >>> c.name
    'c.txt'

    Read text:

    >>> c.read_text()
    'content of c'

    existence:

    >>> c.exists()
    True
    >>> (b / 'missing.txt').exists()
    False

    Coercion to string:

    >>> import os
    >>> str(c).replace(os.sep, posixpath.sep)
    'mem/abcde.zip/b/c.txt'

    At the root, ``name``, ``filename``, and ``parent``
    resolve to the zipfile. Note these attributes are not
    valid and will raise a ``ValueError`` if the zipfile
    has no filename.

    >>> root.name
    'abcde.zip'
    >>> str(root.filename).replace(os.sep, posixpath.sep)
    'mem/abcde.zip'
    >>> str(root.parent)
    'mem'
    """

    __repr = "{self.__class__.__name__}({self.root.filename!r}, {self.at!r})"

    def __init__(self, root, at=""):
        """
        Construct a Path from a ZipFile or filename.

        Note: When the source is an existing ZipFile object,
        its type (__class__) will be mutated to a
        specialized type. If the caller wishes to retain the
        original type, the caller should either create a
        separate ZipFile object or pass a filename.
        """
        self.root = FastLookup.make(root)
        self.at = at

    def open(self, mode='r', *args, pwd=None, **kwargs):
        """
        Open this entry as text or binary following the semantics
        of ``pathlib.Path.open()`` by passing arguments through
        to io.TextIOWrapper().

        Raises IsADirectoryError for a directory, FileNotFoundError when
        reading a missing entry, and ValueError when extra arguments are
        supplied together with a binary mode.
        """
        if self.is_dir():
            raise IsADirectoryError(self)
        # ZipFile.open only understands the leading 'r' / 'w' character.
        zip_mode = mode[0]
        if not self.exists() and zip_mode == 'r':
            raise FileNotFoundError(self)
        stream = self.root.open(self.at, zip_mode, pwd=pwd)
        if 'b' in mode:
            if args or kwargs:
                raise ValueError("encoding args invalid for binary operation")
            return stream
        return io.TextIOWrapper(stream, *args, **kwargs)

    # The accessors below are properties: the class's own code and doctests
    # read them as attributes (``self.filename.name``, ``root.parent``).

    @property
    def name(self):
        """Final component of the entry, or of the zip filename at the root."""
        return pathlib.Path(self.at).name or self.filename.name

    @property
    def suffix(self):
        """File extension of the entry, falling back to the zip filename's."""
        return pathlib.Path(self.at).suffix or self.filename.suffix

    @property
    def suffixes(self):
        """List of extensions of the entry, falling back to the zip filename's."""
        return pathlib.Path(self.at).suffixes or self.filename.suffixes

    @property
    def stem(self):
        """Final component without its suffix, falling back to the zip filename's."""
        return pathlib.Path(self.at).stem or self.filename.stem

    @property
    def filename(self):
        """``pathlib.Path`` of the zip file's filename joined with this entry."""
        return pathlib.Path(self.root.filename).joinpath(self.at)

    def read_text(self, *args, **kwargs):
        """Return the entry's content as text; arguments are passed to ``open``."""
        with self.open('r', *args, **kwargs) as strm:
            return strm.read()

    def read_bytes(self):
        """Return the entry's content as bytes."""
        with self.open('rb') as strm:
            return strm.read()

    def _is_child(self, path):
        """Return True when ``path`` sits directly inside this directory."""
        return posixpath.dirname(path.at.rstrip("/")) == self.at.rstrip("/")

    def _next(self, at):
        """Build a sibling object of the same class at location ``at``."""
        return self.__class__(self.root, at)

    def is_dir(self):
        """Return True for the root or any location ending with a slash."""
        return not self.at or self.at.endswith("/")

    def is_file(self):
        """Return True when the entry exists and is not a directory."""
        return self.exists() and not self.is_dir()

    def exists(self):
        """Return True when this location is among the archive's names."""
        return self.at in self.root._name_set()

    def iterdir(self):
        """
        Iterate over the direct children of this directory.

        Raises ValueError when called on a non-directory.
        """
        if not self.is_dir():
            raise ValueError("Can't listdir a file")
        subs = map(self._next, self.root.namelist())
        return filter(self._is_child, subs)

    def __str__(self):
        return posixpath.join(self.root.filename, self.at)

    def __repr__(self):
        return self.__repr.format(self=self)

    def joinpath(self, *other):
        """Return the path formed by appending ``other`` to this location."""
        next_at = posixpath.join(self.at, *map(_pathlib_compat, other))
        # Let the archive decide whether the result names a directory.
        return self._next(self.root.resolve_dir(next_at))

    __truediv__ = joinpath

    @property
    def parent(self):
        """
        Containing directory of this entry.

        At the root this is the parent of the zip file's own filename
        (a ``pathlib.Path``); elsewhere it is another object of this class.
        """
        if not self.at:
            return self.filename.parent
        parent_at = posixpath.dirname(self.at.rstrip('/'))
        if parent_at:
            parent_at += '/'
        return self._next(parent_at)