| | import io |
| | import posixpath |
| | import zipfile |
| | import itertools |
| | import contextlib |
| | import sys |
| | import pathlib |
| |
|
| | if sys.version_info < (3, 7): |
| | from collections import OrderedDict |
| | else: |
| | OrderedDict = dict |
| |
|
| |
|
| | __all__ = ['Path'] |
| |
|
| |
|
| | def _parents(path): |
| | """ |
| | Given a path with elements separated by |
| | posixpath.sep, generate all parents of that path. |
| | |
| | >>> list(_parents('b/d')) |
| | ['b'] |
| | >>> list(_parents('/b/d/')) |
| | ['/b'] |
| | >>> list(_parents('b/d/f/')) |
| | ['b/d', 'b'] |
| | >>> list(_parents('b')) |
| | [] |
| | >>> list(_parents('')) |
| | [] |
| | """ |
| | return itertools.islice(_ancestry(path), 1, None) |
| |
|
| |
|
| | def _ancestry(path): |
| | """ |
| | Given a path with elements separated by |
| | posixpath.sep, generate all elements of that path |
| | |
| | >>> list(_ancestry('b/d')) |
| | ['b/d', 'b'] |
| | >>> list(_ancestry('/b/d/')) |
| | ['/b/d', '/b'] |
| | >>> list(_ancestry('b/d/f/')) |
| | ['b/d/f', 'b/d', 'b'] |
| | >>> list(_ancestry('b')) |
| | ['b'] |
| | >>> list(_ancestry('')) |
| | [] |
| | """ |
| | path = path.rstrip(posixpath.sep) |
| | while path and path != posixpath.sep: |
| | yield path |
| | path, tail = posixpath.split(path) |
| |
|
| |
|
| | _dedupe = OrderedDict.fromkeys |
| | """Deduplicate an iterable in original order""" |
| |
|
| |
|
| | def _difference(minuend, subtrahend): |
| | """ |
| | Return items in minuend not in subtrahend, retaining order |
| | with O(1) lookup. |
| | """ |
| | return itertools.filterfalse(set(subtrahend).__contains__, minuend) |
| |
|
| |
|
| | class CompleteDirs(zipfile.ZipFile): |
| | """ |
| | A ZipFile subclass that ensures that implied directories |
| | are always included in the namelist. |
| | """ |
| |
|
| | @staticmethod |
| | def _implied_dirs(names): |
| | parents = itertools.chain.from_iterable(map(_parents, names)) |
| | as_dirs = (p + posixpath.sep for p in parents) |
| | return _dedupe(_difference(as_dirs, names)) |
| |
|
| | def namelist(self): |
| | names = super(CompleteDirs, self).namelist() |
| | return names + list(self._implied_dirs(names)) |
| |
|
| | def _name_set(self): |
| | return set(self.namelist()) |
| |
|
| | def resolve_dir(self, name): |
| | """ |
| | If the name represents a directory, return that name |
| | as a directory (with the trailing slash). |
| | """ |
| | names = self._name_set() |
| | dirname = name + '/' |
| | dir_match = name not in names and dirname in names |
| | return dirname if dir_match else name |
| |
|
| | @classmethod |
| | def make(cls, source): |
| | """ |
| | Given a source (filename or zipfile), return an |
| | appropriate CompleteDirs subclass. |
| | """ |
| | if isinstance(source, CompleteDirs): |
| | return source |
| |
|
| | if not isinstance(source, zipfile.ZipFile): |
| | return cls(_pathlib_compat(source)) |
| |
|
| | |
| | if 'r' not in source.mode: |
| | cls = CompleteDirs |
| |
|
| | source.__class__ = cls |
| | return source |
| |
|
| |
|
| | class FastLookup(CompleteDirs): |
| | """ |
| | ZipFile subclass to ensure implicit |
| | dirs exist and are resolved rapidly. |
| | """ |
| |
|
| | def namelist(self): |
| | with contextlib.suppress(AttributeError): |
| | return self.__names |
| | self.__names = super(FastLookup, self).namelist() |
| | return self.__names |
| |
|
| | def _name_set(self): |
| | with contextlib.suppress(AttributeError): |
| | return self.__lookup |
| | self.__lookup = super(FastLookup, self)._name_set() |
| | return self.__lookup |
| |
|
| |
|
| | def _pathlib_compat(path): |
| | """ |
| | For path-like objects, convert to a filename for compatibility |
| | on Python 3.6.1 and earlier. |
| | """ |
| | try: |
| | return path.__fspath__() |
| | except AttributeError: |
| | return str(path) |
| |
|
| |
|
| | class Path: |
| | """ |
| | A pathlib-compatible interface for zip files. |
| | |
| | Consider a zip file with this structure:: |
| | |
| | . |
| | ├── a.txt |
| | └── b |
| | ├── c.txt |
| | └── d |
| | └── e.txt |
| | |
| | >>> data = io.BytesIO() |
| | >>> zf = zipfile.ZipFile(data, 'w') |
| | >>> zf.writestr('a.txt', 'content of a') |
| | >>> zf.writestr('b/c.txt', 'content of c') |
| | >>> zf.writestr('b/d/e.txt', 'content of e') |
| | >>> zf.filename = 'mem/abcde.zip' |
| | |
| | Path accepts the zipfile object itself or a filename |
| | |
| | >>> root = Path(zf) |
| | |
| | From there, several path operations are available. |
| | |
| | Directory iteration (including the zip file itself): |
| | |
| | >>> a, b = root.iterdir() |
| | >>> a |
| | Path('mem/abcde.zip', 'a.txt') |
| | >>> b |
| | Path('mem/abcde.zip', 'b/') |
| | |
| | name property: |
| | |
| | >>> b.name |
| | 'b' |
| | |
| | join with divide operator: |
| | |
| | >>> c = b / 'c.txt' |
| | >>> c |
| | Path('mem/abcde.zip', 'b/c.txt') |
| | >>> c.name |
| | 'c.txt' |
| | |
| | Read text: |
| | |
| | >>> c.read_text() |
| | 'content of c' |
| | |
| | existence: |
| | |
| | >>> c.exists() |
| | True |
| | >>> (b / 'missing.txt').exists() |
| | False |
| | |
| | Coercion to string: |
| | |
| | >>> import os |
| | >>> str(c).replace(os.sep, posixpath.sep) |
| | 'mem/abcde.zip/b/c.txt' |
| | |
| | At the root, ``name``, ``filename``, and ``parent`` |
| | resolve to the zipfile. Note these attributes are not |
| | valid and will raise a ``ValueError`` if the zipfile |
| | has no filename. |
| | |
| | >>> root.name |
| | 'abcde.zip' |
| | >>> str(root.filename).replace(os.sep, posixpath.sep) |
| | 'mem/abcde.zip' |
| | >>> str(root.parent) |
| | 'mem' |
| | """ |
| |
|
| | __repr = "{self.__class__.__name__}({self.root.filename!r}, {self.at!r})" |
| |
|
| | def __init__(self, root, at=""): |
| | """ |
| | Construct a Path from a ZipFile or filename. |
| | |
| | Note: When the source is an existing ZipFile object, |
| | its type (__class__) will be mutated to a |
| | specialized type. If the caller wishes to retain the |
| | original type, the caller should either create a |
| | separate ZipFile object or pass a filename. |
| | """ |
| | self.root = FastLookup.make(root) |
| | self.at = at |
| |
|
| | def open(self, mode='r', *args, pwd=None, **kwargs): |
| | """ |
| | Open this entry as text or binary following the semantics |
| | of ``pathlib.Path.open()`` by passing arguments through |
| | to io.TextIOWrapper(). |
| | """ |
| | if self.is_dir(): |
| | raise IsADirectoryError(self) |
| | zip_mode = mode[0] |
| | if not self.exists() and zip_mode == 'r': |
| | raise FileNotFoundError(self) |
| | stream = self.root.open(self.at, zip_mode, pwd=pwd) |
| | if 'b' in mode: |
| | if args or kwargs: |
| | raise ValueError("encoding args invalid for binary operation") |
| | return stream |
| | return io.TextIOWrapper(stream, *args, **kwargs) |
| |
|
| | @property |
| | def name(self): |
| | return pathlib.Path(self.at).name or self.filename.name |
| |
|
| | @property |
| | def suffix(self): |
| | return pathlib.Path(self.at).suffix or self.filename.suffix |
| |
|
| | @property |
| | def suffixes(self): |
| | return pathlib.Path(self.at).suffixes or self.filename.suffixes |
| |
|
| | @property |
| | def stem(self): |
| | return pathlib.Path(self.at).stem or self.filename.stem |
| |
|
| | @property |
| | def filename(self): |
| | return pathlib.Path(self.root.filename).joinpath(self.at) |
| |
|
| | def read_text(self, *args, **kwargs): |
| | with self.open('r', *args, **kwargs) as strm: |
| | return strm.read() |
| |
|
| | def read_bytes(self): |
| | with self.open('rb') as strm: |
| | return strm.read() |
| |
|
| | def _is_child(self, path): |
| | return posixpath.dirname(path.at.rstrip("/")) == self.at.rstrip("/") |
| |
|
| | def _next(self, at): |
| | return self.__class__(self.root, at) |
| |
|
| | def is_dir(self): |
| | return not self.at or self.at.endswith("/") |
| |
|
| | def is_file(self): |
| | return self.exists() and not self.is_dir() |
| |
|
| | def exists(self): |
| | return self.at in self.root._name_set() |
| |
|
| | def iterdir(self): |
| | if not self.is_dir(): |
| | raise ValueError("Can't listdir a file") |
| | subs = map(self._next, self.root.namelist()) |
| | return filter(self._is_child, subs) |
| |
|
| | def __str__(self): |
| | return posixpath.join(self.root.filename, self.at) |
| |
|
| | def __repr__(self): |
| | return self.__repr.format(self=self) |
| |
|
| | def joinpath(self, *other): |
| | next = posixpath.join(self.at, *map(_pathlib_compat, other)) |
| | return self._next(self.root.resolve_dir(next)) |
| |
|
| | __truediv__ = joinpath |
| |
|
| | @property |
| | def parent(self): |
| | if not self.at: |
| | return self.filename.parent |
| | parent_at = posixpath.dirname(self.at.rstrip('/')) |
| | if parent_at: |
| | parent_at += '/' |
| | return self._next(parent_at) |
| |
|