| | from contextlib import contextmanager |
| | from ctypes import ( |
| | CFUNCTYPE, |
| | POINTER, |
| | c_int, |
| | c_longlong, |
| | c_void_p, |
| | cast, |
| | create_string_buffer, |
| | ) |
| |
|
| | import libarchive |
| | import libarchive.ffi as ffi |
| |
|
| | from fsspec import open_files |
| | from fsspec.archive import AbstractArchiveFileSystem |
| | from fsspec.implementations.memory import MemoryFile |
| | from fsspec.utils import DEFAULT_BLOCK_SIZE |
| |
|
| | |
| | |
| | |
| | |
| |
|
| | |
# libarchive-c does not expose archive_read_set_seek_callback, so bind it
# here through the package's own ffi helper.  The ctypes prototype mirrors
# how libarchive-c declares its other read callbacks (return la_int64_t).
SEEK_CALLBACK = CFUNCTYPE(c_longlong, c_int, c_void_p, c_longlong, c_int)
read_set_seek_callback = ffi.ffi(
    "read_set_seek_callback", [ffi.c_archive_p, SEEK_CALLBACK], c_int, ffi.check_int
)
# Newer libarchive-c versions provide NO_OPEN_CB / NO_CLOSE_CB sentinels
# instead of requiring explicit no-op open/close callbacks; detect which
# API we are running against once, at import time.
new_api = hasattr(ffi, "NO_OPEN_CB")
| |
|
| |
|
@contextmanager
def custom_reader(file, format_name="all", filter_name="all", block_size=ffi.page_size):
    """Read an archive from a seekable file-like object.

    The `file` object must support the standard `readinto` and 'seek' methods.

    Yields a ``libarchive.read.ArchiveRead`` wrapping the open archive; the
    archive is closed when the context exits.
    """
    # One reusable buffer shared by every read callback invocation; libarchive
    # consumes the data before requesting the next block.
    buf = create_string_buffer(block_size)
    buf_p = cast(buf, c_void_p)

    def read_func(archive_p, context, ptrptr):
        # Fill the buffer; readinto returns the byte count (0 signals EOF
        # to libarchive).
        length = file.readinto(buf)
        # Hand the data back through the void** out-parameter.
        ptrptr = cast(ptrptr, POINTER(c_void_p))
        ptrptr[0] = buf_p

        return length

    def seek_func(archive_p, context, offset, whence):
        file.seek(offset, whence)
        # libarchive expects the resulting absolute position as the return.
        return file.tell()

    # Keep references to the ctypes callback objects for the whole context;
    # if Python garbage-collected them, libarchive would call freed trampolines.
    read_cb = ffi.READ_CALLBACK(read_func)
    seek_cb = SEEK_CALLBACK(seek_func)

    if new_api:
        open_cb = ffi.NO_OPEN_CB
        close_cb = ffi.NO_CLOSE_CB
    else:
        # Older libarchive-c requires explicit (no-op) open/close callbacks.
        open_cb = libarchive.read.OPEN_CALLBACK(ffi.VOID_CB)
        close_cb = libarchive.read.CLOSE_CALLBACK(ffi.VOID_CB)

    with libarchive.read.new_archive_read(format_name, filter_name) as archive_p:
        # The seek callback must be registered before read_open so formats
        # that need random access (e.g. 7z, zip) can use it from the start.
        read_set_seek_callback(archive_p, seek_cb)
        ffi.read_open(archive_p, None, open_cb, read_cb, close_cb)
        yield libarchive.read.ArchiveRead(archive_p)
| |
|
| |
|
class LibArchiveFileSystem(AbstractArchiveFileSystem):
    """Compressed archives as a file-system (read-only)

    Supports the following formats:
    tar, pax, cpio, ISO9660, zip, mtree, shar, ar, raw, xar, lha/lzh, rar
    Microsoft CAB, 7-Zip, WARC

    See the libarchive documentation for further restrictions.
    https://www.libarchive.org/

    Keeps file object open while instance lives. It only works in seekable
    file-like objects. In case the filesystem does not support this kind of
    file object, it is recommended to cache locally.

    This class is pickleable, but not necessarily thread-safe (depends on the
    platform). See libarchive documentation for details.
    """

    root_marker = ""
    protocol = "libarchive"
    cachable = False

    def __init__(
        self,
        fo="",
        mode="r",
        target_protocol=None,
        target_options=None,
        block_size=DEFAULT_BLOCK_SIZE,
        **kwargs,
    ):
        """
        Parameters
        ----------
        fo: str or file-like
            Contains ZIP, and must exist. If a str, will fetch file using
            :meth:`~fsspec.open_files`, which must return one file exactly.
        mode: str
            Currently, only 'r' accepted
        target_protocol: str (optional)
            If ``fo`` is a string, this value can be used to override the
            FS protocol inferred from a URL
        target_options: dict (optional)
            Kwargs passed when instantiating the target FS, if ``fo`` is
            a string.
        """
        # Fix: previously a stray positional ``self`` was passed on to
        # super().__init__ as part of *args; it was ignored but misleading.
        super().__init__(**kwargs)
        if mode != "r":
            raise ValueError("Only read from archive files accepted")
        if isinstance(fo, str):
            files = open_files(fo, protocol=target_protocol, **(target_options or {}))
            if len(files) != 1:
                raise ValueError(
                    f'Path "{fo}" did not resolve to exactly one file: "{files}"'
                )
            fo = files[0]
        self.of = fo
        # Enter the OpenFile context now; the underlying file object stays
        # open for the lifetime of this filesystem instance.
        self.fo = fo.__enter__()
        self.block_size = block_size
        self.dir_cache = None

    @contextmanager
    def _open_archive(self):
        # Rewind and wrap the seekable file in a fresh libarchive reader;
        # every use re-reads the archive from the beginning.
        self.fo.seek(0)
        with custom_reader(self.fo, block_size=self.block_size) as arc:
            yield arc

    @classmethod
    def _strip_protocol(cls, path):
        # There is no root marker inside an archive: member paths are relative.
        return super()._strip_protocol(path).lstrip("/")

    def _get_dirs(self):
        """Populate ``self.dir_cache`` with one record per archive member."""
        # Mapping of our info keys -> libarchive entry attribute names.
        fields = {
            "name": "pathname",
            "size": "size",
            "created": "ctime",
            "mode": "mode",
            "uid": "uid",
            "gid": "gid",
            "mtime": "mtime",
        }

        if self.dir_cache is not None:
            return

        self.dir_cache = {}
        list_names = []
        with self._open_archive() as arc:
            for entry in arc:
                if not entry.isdir and not entry.isfile:
                    # Skip symlinks, fifos, devices, etc.
                    continue
                # Register the parent directories of this entry.
                # Fix: was ``set(entry.name)``, which iterates the *characters*
                # of the path string, so no dirname was ever produced here.
                self.dir_cache.update(
                    {
                        dirname: {"name": dirname, "size": 0, "type": "directory"}
                        for dirname in self._all_dirnames({entry.name})
                    }
                )
                f = {key: getattr(entry, fields[key]) for key in fields}
                f["type"] = "directory" if entry.isdir else "file"
                list_names.append(entry.name)

                self.dir_cache[f["name"]] = f

        # libarchive does not emit explicit entries for directories in all
        # formats, so also derive directory records from the file names.
        self.dir_cache.update(
            {
                dirname: {"name": dirname, "size": 0, "type": "directory"}
                for dirname in self._all_dirnames(list_names)
            }
        )

    def _open(
        self,
        path,
        mode="rb",
        block_size=None,
        autocommit=True,
        cache_options=None,
        **kwargs,
    ):
        """Return the member ``path`` as an in-memory, read-only file object.

        Raises
        ------
        NotImplementedError
            If ``mode`` is anything other than ``"rb"``.
        ValueError
            If a non-empty entry yields no data block.
        """
        path = self._strip_protocol(path)
        if mode != "rb":
            raise NotImplementedError

        data = b""
        with self._open_archive() as arc:
            for entry in arc:
                if entry.pathname != path:
                    continue

                if entry.size == 0:
                    # An empty file has no data blocks at all.
                    break

                # Requesting a block the size of the entry yields the whole
                # payload in one block, so take the first and stop.
                for block in entry.get_blocks(entry.size):
                    data = block
                    break
                else:
                    # Non-empty entry produced no block: corrupt archive.
                    raise ValueError
        # NOTE(review): a path not present in the archive currently yields an
        # empty file rather than FileNotFoundError — preserved from original.
        return MemoryFile(fs=self, path=path, data=data)
| |
|