| | import os |
| | import zipfile |
| |
|
| | import fsspec |
| | from fsspec.archive import AbstractArchiveFileSystem |
| |
|
| |
|
class ZipFileSystem(AbstractArchiveFileSystem):
    """Read/Write contents of ZIP archive as a file-system

    Keeps file object open while instance lives.

    This class is pickleable, but not necessarily thread-safe
    """

    root_marker = ""
    protocol = "zip"
    cachable = False

    def __init__(
        self,
        fo="",
        mode="r",
        target_protocol=None,
        target_options=None,
        compression=zipfile.ZIP_STORED,
        allowZip64=True,
        compresslevel=None,
        **kwargs,
    ):
        """
        Parameters
        ----------
        fo: str, os.PathLike or file-like
            The ZIP archive. A str or path-like is opened with
            :func:`fsspec.open` in binary mode (``mode + "b"``, or ``"r+b"``
            when ``mode`` is ``"a"``). Any other object is used as given; it
            must support ``__enter__``, whose return value is handed to
            :class:`zipfile.ZipFile`.
        mode: str
            Accept: "r", "w", "a"
        target_protocol: str (optional)
            If ``fo`` is a string, this value can be used to override the
            FS protocol inferred from a URL
        target_options: dict (optional)
            Kwargs passed when instantiating the target FS, if ``fo`` is
            a string.
        compression, allowZip64, compresslevel: passed to ZipFile
            Only relevant when creating a ZIP
        **kwargs:
            Forwarded to the parent class constructor.

        Raises
        ------
        ValueError
            If ``mode`` is not one of "r", "w" or "a".
        """
        # Only keyword options are forwarded; ``self`` is already bound by
        # ``super()`` and must not be passed again as a positional argument.
        super().__init__(**kwargs)
        if mode not in set("rwa"):
            raise ValueError(f"mode '{mode}' no understood")
        self.mode = mode
        if isinstance(fo, (str, os.PathLike)):
            # Appending needs a readable and writable handle on the target.
            m = "r+b" if mode == "a" else mode + "b"
            fo = fsspec.open(
                fo, mode=m, protocol=target_protocol, **(target_options or {})
            )
        self.force_zip_64 = allowZip64
        self.of = fo
        # Entered here and never exited by this class: the file object stays
        # open for as long as the instance holds it.
        self.fo = fo.__enter__()
        self.zip = zipfile.ZipFile(
            self.fo,
            mode=mode,
            compression=compression,
            allowZip64=allowZip64,
            compresslevel=compresslevel,
        )
        self.dir_cache = None

    @classmethod
    def _strip_protocol(cls, path):
        """Normalise ``path`` via the parent class and drop leading slashes.

        Paths used by this class carry no leading "/" (``root_marker`` is the
        empty string).
        """
        return super()._strip_protocol(path).lstrip("/")

    def __del__(self):
        # ``zip`` is missing when ``__init__`` failed before creating it.
        if hasattr(self, "zip"):
            self.close()
            del self.zip

    def close(self):
        """Commits any write changes to the file. Done on ``del`` too.

        Only the ``ZipFile`` is closed; the underlying file object
        (``self.fo``) is not closed by this method.
        """
        self.zip.close()

    def _get_dirs(self):
        """Populate ``self.dir_cache`` with one info dict per archive entry.

        The cache maps each entry name (trailing "/" removed) to a dict that
        holds every ``zipfile.ZipInfo`` slot plus ``name``, ``size`` and
        ``type`` ("file" or "directory"). Directory names produced by
        ``_all_dirnames`` from the member list are added first with size 0,
        then explicit archive entries overwrite them.
        """
        # In "w"/"a" mode the listing is rebuilt on every call rather than
        # reused, since the archive is open for modification.
        if self.dir_cache is not None and self.mode not in set("wa"):
            return

        files = self.zip.infolist()
        self.dir_cache = {
            dirname.rstrip("/"): {
                "name": dirname.rstrip("/"),
                "size": 0,
                "type": "directory",
            }
            for dirname in self._all_dirnames(self.zip.namelist())
        }
        for z in files:
            f = {s: getattr(z, s, None) for s in zipfile.ZipInfo.__slots__}
            f.update(
                {
                    "name": z.filename.rstrip("/"),
                    "size": z.file_size,
                    "type": ("directory" if z.is_dir() else "file"),
                }
            )
            self.dir_cache[f["name"]] = f

    def pipe_file(self, path, value, **kwargs):
        """Write ``value`` to the archive member ``path`` in one call.

        Parameters
        ----------
        path: str
            Member name; normalised with ``_strip_protocol`` so that the
            stored name matches what ``_open`` would look up.
        value: bytes or str
            Content handed to ``ZipFile.writestr``.
        **kwargs:
            Forwarded to ``ZipFile.writestr``.
        """
        # Normalise exactly like ``_open`` does; otherwise a path given with
        # a protocol prefix or a leading "/" would be stored under a name
        # that the rest of this class cannot address.
        path = self._strip_protocol(path)
        self.zip.writestr(path, value, **kwargs)

    def _open(
        self,
        path,
        mode="rb",
        block_size=None,
        autocommit=True,
        cache_options=None,
        **kwargs,
    ):
        """Open an archive member through ``ZipFile.open``.

        Parameters
        ----------
        path: str
            Member name; normalised with ``_strip_protocol``.
        mode: str
            Requested mode; any "b" is stripped before it is passed to
            ``ZipFile.open``.
        block_size, autocommit, cache_options, **kwargs:
            Accepted but not used by this implementation.

        Returns
        -------
        The object returned by ``ZipFile.open``. When reading, its ``size``
        and ``name`` attributes are set from ``self.info(path)``.

        Raises
        ------
        OSError
            If a read is requested on an archive opened with "w"/"a" and the
            path exists, or a write is requested on an archive opened with
            "r".
        FileNotFoundError
            If a read is requested on an archive opened with "w"/"a" and the
            path does not exist.
        """
        path = self._strip_protocol(path)
        if "r" in mode and self.mode in set("wa"):
            if self.exists(path):
                raise OSError("ZipFS can only be open for reading or writing, not both")
            raise FileNotFoundError(path)
        if "r" in self.mode and "w" in mode:
            raise OSError("ZipFS can only be open for reading or writing, not both")
        out = self.zip.open(path, mode.strip("b"), force_zip64=self.force_zip_64)
        if "r" in mode:
            info = self.info(path)
            out.size = info["size"]
            out.name = info["name"]
        return out

    def find(self, path, maxdepth=None, withdirs=False, detail=False, **kwargs):
        """List the archive entries located at or below ``path``.

        Parameters
        ----------
        path: str or object convertible with ``str()``
            Starting point; leading slashes are removed. If it names a file
            entry exactly, only that entry is returned.
        maxdepth: int or None
            If given, must be at least 1; entries nested more than
            ``maxdepth`` levels below ``path`` are dropped.
        withdirs: bool
            Whether directory entries are included in the result.
        detail: bool
            If True return a dict mapping entry name to its info dict,
            otherwise a list of entry names (sorted, except for the
            single-element exact-file case).
        **kwargs:
            Accepted but not used by this implementation.

        Raises
        ------
        ValueError
            If ``maxdepth`` is given and is smaller than 1.
        """
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        def to_parts(_path: str):
            # Split on either separator and drop empty components.
            return list(filter(None, _path.replace("\\", "/").split("/")))

        if not isinstance(path, str):
            path = str(path)

        # Keys of ``dir_cache`` carry no leading slash, so remove it here too.
        path = path.lstrip("/")
        path_parts = to_parts(path)
        path_depth = len(path_parts)

        self._get_dirs()

        result = {}
        # An exact file name short-circuits: return just that entry.
        if path in self.dir_cache and self.dir_cache[path]["type"] == "file":
            result[path] = self.dir_cache[path]
            return result if detail else [path]

        for file_path, file_info in self.dir_cache.items():
            if len(file_parts := to_parts(file_path)) < path_depth or any(
                a != b for a, b in zip(path_parts, file_parts)
            ):
                # Entry is shallower than ``path`` or diverges from it.
                continue

            if file_info["type"] == "directory":
                if withdirs and file_path not in result:
                    result[file_path.strip("/")] = file_info
                continue

            if file_path not in result:
                result[file_path] = file_info if detail else None

        if maxdepth:
            # Depth is measured by the number of "/" in the entry name,
            # relative to the number of components in ``path``.
            result = {
                k: v for k, v in result.items() if k.count("/") < maxdepth + path_depth
            }
        return result if detail else sorted(result)
| |
|