| | from __future__ import annotations |
| |
|
| | import io |
| | import logging |
| | import os |
| | import re |
| | from glob import has_magic |
| | from pathlib import Path |
| |
|
| | |
| | from .caching import ( |
| | BaseCache, |
| | BlockCache, |
| | BytesCache, |
| | MMapCache, |
| | ReadAheadCache, |
| | caches, |
| | ) |
| | from .compression import compr |
| | from .registry import filesystem, get_filesystem_class |
| | from .utils import ( |
| | _unstrip_protocol, |
| | build_name_function, |
| | infer_compression, |
| | stringify_path, |
| | ) |
| |
|
| | logger = logging.getLogger("fsspec") |
| |
|
| |
|
class OpenFile:
    """
    File-like object to be used in a context

    Can layer (buffered) text-mode and compression over any file-system, which
    are typically binary-only.

    These instances are safe to serialize, as the low-level file object
    is not created until invoked using ``with``.

    Parameters
    ----------
    fs: FileSystem
        The file system to use for opening the file. Should be a subclass or
        duck-type with ``fsspec.spec.AbstractFileSystem``
    path: str
        Location to open
    mode: str like 'rb', optional
        Mode of the opened file
    compression: str or None, optional
        Compression to apply; validated/resolved through ``get_compression``
    encoding: str or None, optional
        The encoding to use if opened in text mode.
    errors: str or None, optional
        How to handle encoding errors if opened in text mode.
    newline: None or str
        Passed to TextIOWrapper in text mode, how to handle line endings.
    """

    def __init__(
        self,
        fs,
        path,
        mode="rb",
        compression=None,
        encoding=None,
        errors=None,
        newline=None,
    ):
        self.fs = fs
        self.path = path
        self.mode = mode
        self.compression = get_compression(path, compression)
        self.encoding = encoding
        self.errors = errors
        self.newline = newline
        # Stack of layered file objects, lowest level first; filled on enter.
        self.fobjects = []

    def __reduce__(self):
        """Pickle as the constructor arguments only; no open handle is kept."""
        return (
            OpenFile,
            (
                self.fs,
                self.path,
                self.mode,
                self.compression,
                self.encoding,
                self.errors,
                self.newline,
            ),
        )

    def __repr__(self):
        return f"<OpenFile '{self.path}'>"

    def __enter__(self):
        """Open the file and return the outermost layered file object.

        If building the compression or text layer fails, everything opened
        so far is closed again before the exception propagates.
        """
        # The filesystem is always asked for a binary file; text handling is
        # layered on top below.
        mode = self.mode.replace("t", "").replace("b", "") + "b"

        f = self.fs.open(self.path, mode=mode)
        self.fobjects = [f]

        try:
            if self.compression is not None:
                compress = compr[self.compression]
                # The codec only receives the first character of the mode.
                f = compress(f, mode=mode[0])
                self.fobjects.append(f)

            if "b" not in self.mode:
                # assume, for example, that 'r' is equivalent to 'rt' as in builtin
                f = PickleableTextIOWrapper(
                    f, encoding=self.encoding, errors=self.errors, newline=self.newline
                )
                self.fobjects.append(f)
        except BaseException:
            # ``__exit__`` is not invoked when ``__enter__`` raises, so the
            # already-opened objects must be released here.
            self.close()
            raise

        return self.fobjects[-1]

    def __exit__(self, *args):
        self.close()

    @property
    def full_name(self):
        """The path passed through ``_unstrip_protocol`` for this filesystem."""
        return _unstrip_protocol(self.path, self.fs)

    def open(self):
        """Materialise this as a real open file without context

        The OpenFile object should be explicitly closed to avoid enclosed file
        instances persisting. You must, therefore, keep a reference to the
        OpenFile during the life of the file-like it generates.
        """
        return self.__enter__()

    def close(self):
        """Close all encapsulated file objects"""
        # Outermost layer first, so that wrappers flush into the objects
        # beneath them before those are closed.
        for f in reversed(self.fobjects):
            if "r" not in self.mode and not f.closed:
                f.flush()
            f.close()
        self.fobjects.clear()
| |
|
| |
|
class OpenFiles(list):
    """List of OpenFile instances

    Can be used in a single context, which opens and closes all of the
    contained files. Normal list access to get the elements works as
    normal.

    A special case is made for caching filesystems - the files will
    be down/uploaded together at the start or end of the context, and
    this may happen concurrently, if the target filesystem supports it.

    Parameters
    ----------
    *args:
        Forwarded to ``list``.
    mode: str
        Mode the contained files are opened with.
    fs: FileSystem or None
        Filesystem the contained files belong to; required to enter the
        context.
    """

    def __init__(self, *args, mode="rb", fs=None):
        self.mode = mode
        self.fs = fs
        # Result of ``fs.open_many`` when a filesystem provides it.
        self.files = []
        super().__init__(*args)

    def __enter__(self):
        """Open every contained file and return the list of file objects.

        Walks from ``self.fs`` through nested ``.fs`` attributes; the first
        filesystem exposing ``open_many`` opens all files in one call.
        Otherwise each element is entered individually, and if one of them
        fails the ones already entered are exited before re-raising.
        """
        if self.fs is None:
            raise ValueError("Context has already been used")

        fs = self.fs
        while True:
            if hasattr(fs, "open_many"):
                # check for concurrent cache download; or set up for upload
                self.files = fs.open_many(self)
                return self.files
            if hasattr(fs, "fs") and fs.fs is not None:
                fs = fs.fs
            else:
                break

        entered = []
        handles = []
        try:
            for member in self:
                handles.append(member.__enter__())
                entered.append(member)
        except BaseException:
            # ``__exit__`` will not run for a failed ``__enter__``; release
            # whatever was opened before the failure.
            for member in reversed(entered):
                member.__exit__(None, None, None)
            raise
        return handles

    def __exit__(self, *args):
        """Exit every contained file; in non-read modes also commit them."""
        fs = self.fs
        for member in self:
            member.__exit__(*args)
        if "r" not in self.mode:
            while True:
                if hasattr(fs, "open_many"):
                    # check for concurrent cache upload
                    fs.commit_many(self.files)
                    return
                if hasattr(fs, "fs") and fs.fs is not None:
                    fs = fs.fs
                else:
                    break

    def __getitem__(self, item):
        out = super().__getitem__(item)
        if isinstance(item, slice):
            # Slices stay OpenFiles so they remain usable as a context.
            return OpenFiles(out, mode=self.mode, fs=self.fs)
        return out

    def __repr__(self):
        return f"<List of {len(self)} OpenFile instances>"
| |
|
| |
|
def open_files(
    urlpath,
    mode="rb",
    compression=None,
    encoding="utf8",
    errors=None,
    name_function=None,
    num=1,
    protocol=None,
    newline=None,
    auto_mkdir=True,
    expand=True,
    **kwargs,
):
    """Build an ``OpenFiles`` list of ``OpenFile`` objects for one or more paths.

    When writing, a str path must contain the "*" character, which is
    replaced by increasing numbers, e.g., "part*" -> "part1", "part2" if
    num=2. For both reading and writing an explicit list of paths may be
    given instead.

    Parameters
    ----------
    urlpath: string or list
        Absolute or relative filepath(s). Prefix with a protocol like
        ``s3://`` to read from alternative filesystems. Several files can be
        addressed with a globstring or a list of paths, provided they all
        share one protocol.
    mode: 'rb', 'wt', etc.
    compression: string or None
        Compression codec to open the files with: either a key of
        ``fsspec.compression.compr`` or "infer" to guess from the filename
        suffix.
    encoding: str
        For text mode only
    errors: None or str
        Passed to TextIOWrapper in text mode
    name_function: function or None
        Files opened for writing do not exist yet, so their names are
        generated by formatting the urlpath for each sequence number.
    num: int [1]
        In writing mode, the number of files expected to be created (passed
        to name_function).
    protocol: str or None
        If given, overrides the protocol found in the URL.
    newline: bytes or None
        Used for line terminator in text mode. If None, uses system default;
        if blank, uses no translation.
    auto_mkdir: bool (True)
        If in write mode, this will ensure the target directory exists before
        writing, by calling ``fs.makedirs(exist_ok=True)``.
    expand: bool
        Forwarded unchanged to ``get_fs_token_paths``.
    **kwargs: dict
        Extra options that make sense to a particular storage connection,
        e.g. host, port, username, password, etc.

    Examples
    --------
    >>> files = open_files('2015-*-*.csv')  # doctest: +SKIP
    >>> files = open_files(
    ...     's3://bucket/2015-*-*.csv.gz', compression='gzip'
    ... )  # doctest: +SKIP

    Returns
    -------
    An ``OpenFiles`` instance, which is a list of ``OpenFile`` objects that
    can be used as a single context

    Notes
    -----
    For a full list of the available protocols and the implementations that
    they map across to see the latest online documentation:

    - For implementations built into ``fsspec`` see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations
    - For implementations in separate packages see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations
    """
    fs, _token, paths = get_fs_token_paths(
        urlpath,
        mode,
        num=num,
        name_function=name_function,
        storage_options=kwargs,
        protocol=protocol,
        expand=expand,
    )

    if fs.protocol == "file":
        # The "file" filesystem gets the flag and creates directories itself.
        fs.auto_mkdir = auto_mkdir
    elif "r" not in mode and auto_mkdir:
        for parent in {fs._parent(target) for target in paths}:
            try:
                fs.makedirs(parent, exist_ok=True)
            except PermissionError:
                # Best effort only: a directory that cannot be created here
                # does not stop the OpenFile objects from being built.
                pass

    members = []
    for target in paths:
        members.append(
            OpenFile(
                fs,
                target,
                mode=mode,
                compression=compression,
                encoding=encoding,
                errors=errors,
                newline=newline,
            )
        )
    return OpenFiles(members, mode=mode, fs=fs)
| |
|
| |
|
def _un_chain(path, kwargs):
    """Split a ``::``-chained URL into per-filesystem links.

    Parameters
    ----------
    path: str
        URL, optionally made of several parts joined with ``::``.
    kwargs: dict
        Options. A ``protocol`` key overrides the protocol of the last part;
        a key equal to a protocol name supplies options for that part; all
        remaining keys go to the first part. Neither this dict nor the
        per-protocol dicts inside it are modified.

    Returns
    -------
    list of ``(path, protocol, kwargs)`` tuples, in the same order as the
    parts appear in ``path``.
    """
    # Matches any part that contains a character outside a-z; parts made only
    # of lowercase letters are treated as bare protocol names.
    x = re.compile(".*[^a-z]+.*")
    bits = (
        [p if "://" in p or x.match(p) else p + "://" for p in path.split("::")]
        if "::" in path
        else [path]
    )
    # [[url, protocol, kwargs], ...]
    out = []
    previous_bit = None
    kwargs = kwargs.copy()
    for bit in reversed(bits):
        protocol = kwargs.pop("protocol", None) or split_protocol(bit)[0] or "file"
        cls = get_filesystem_class(protocol)
        extra_kwargs = cls._get_kwargs_from_urls(bit)
        # ``kwargs.copy()`` above is shallow: copy the per-protocol options
        # as well so the update below never writes into the caller's dict.
        kws = dict(kwargs.pop(protocol, {}))
        if bit is bits[0]:
            # The first part receives whatever options are left over.
            kws.update(kwargs)
        kw = dict(**extra_kwargs, **kws)
        bit = cls._strip_protocol(bit)
        if (
            protocol in {"blockcache", "filecache", "simplecache"}
            and "target_protocol" not in kw
        ):
            # These cache layers take their path from the part that follows
            # them in the chain.
            bit = previous_bit
        out.append((bit, protocol, kw))
        previous_bit = bit
    out = list(reversed(out))
    return out
| |
|
| |
|
def url_to_fs(url, **kwargs):
    """
    Resolve a fully-qualified, possibly chained URL to a filesystem instance

    Parameters
    ----------
    url : str
        The fsspec-compatible URL
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc. Keys that belong to the
        file-opening functions (``mode``, ``compression``, ...) are dropped.

    Returns
    -------
    filesystem : FileSystem
        The new filesystem discovered from ``url`` and created with
        ``**kwargs``.
    urlpath : str
        The file-systems-specific URL for ``url``.
    """
    # Non-FS arguments that appear in fsspec.open()
    # inspect could keep this in sync with open()'s signature
    known_kwargs = {
        "compression",
        "encoding",
        "errors",
        "expand",
        "mode",
        "name_function",
        "newline",
        "num",
    }
    fs_options = {}
    for key, value in kwargs.items():
        if key not in known_kwargs:
            fs_options[key] = value
    chain = _un_chain(url, fs_options)

    # Fold the chain from its last link towards the first: every link but the
    # first becomes the target (protocol, options, path) of the one before it.
    nested = {}
    for target_url, target_protocol, target_kw in chain[:0:-1]:
        nested["target_options"] = dict(**target_kw, **nested)
        nested["target_protocol"] = target_protocol
        nested["fo"] = target_url

    urlpath, protocol, head_kw = chain[0]
    inkwargs = dict(**head_kw, **nested)
    fs = filesystem(protocol, **inkwargs)
    return fs, urlpath
| |
|
| |
|
def open(
    urlpath,
    mode="rb",
    compression=None,
    encoding="utf8",
    errors=None,
    protocol=None,
    newline=None,
    **kwargs,
):
    """Return a single ``OpenFile`` object for the given path.

    Parameters
    ----------
    urlpath: string or list
        Absolute or relative filepath. Prefix with a protocol like ``s3://``
        to read from alternative filesystems. Should not include glob
        character(s).
    mode: 'rb', 'wt', etc.
    compression: string or None
        Compression codec to open the file with: either a key of
        ``fsspec.compression.compr`` or "infer" to guess from the filename
        suffix.
    encoding: str
        For text mode only
    errors: None or str
        Passed to TextIOWrapper in text mode
    protocol: str or None
        If given, overrides the protocol found in the URL.
    newline: bytes or None
        Used for line terminator in text mode. If None, uses system default;
        if blank, uses no translation.
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.

    Examples
    --------
    >>> openfile = open('2015-01-01.csv')  # doctest: +SKIP
    >>> openfile = open(
    ...     's3://bucket/2015-01-01.csv.gz', compression='gzip'
    ... )  # doctest: +SKIP
    >>> with openfile as f:
    ...     df = pd.read_csv(f)  # doctest: +SKIP
    ...

    Returns
    -------
    ``OpenFile`` object.

    Raises
    ------
    FileNotFoundError
        If ``open_files`` yields no entry for ``urlpath``.

    Notes
    -----
    For a full list of the available protocols and the implementations that
    they map across to see the latest online documentation:

    - For implementations built into ``fsspec`` see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations
    - For implementations in separate packages see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations
    """
    # Delegate to the multi-file opener with a one-element list and path
    # expansion switched off.
    candidates = open_files(
        urlpath=[urlpath],
        mode=mode,
        compression=compression,
        encoding=encoding,
        errors=errors,
        protocol=protocol,
        newline=newline,
        expand=False,
        **kwargs,
    )
    if candidates:
        return candidates[0]
    raise FileNotFoundError(urlpath)
| |
|
| |
|
def open_local(
    url: str | list[str] | Path | list[Path],
    mode: str = "rb",
    **storage_options: dict,
) -> str | list[str]:
    """Open file(s) which can be resolved to local

    For files which either are local, or get downloaded upon open
    (e.g., by file caching)

    Parameters
    ----------
    url: str or list(str)
    mode: str
        Must be read mode
    storage_options:
        passed on to FS for or used by open_files (e.g., compression)

    Returns
    -------
    A single path when ``url`` is a ``Path`` or a string without glob
    characters; otherwise a list of paths (empty if a glob string matched
    nothing).

    Raises
    ------
    ValueError
        If ``mode`` is not a read mode, or the filesystem does not have a
        truthy ``local_file`` attribute.
    """
    if "r" not in mode:
        raise ValueError("Can only ensure local files when reading")
    of = open_files(url, mode=mode, **storage_options)
    # ``of.fs`` is the filesystem shared by every entry; reading it from the
    # container (rather than from ``of[0]``) also works when nothing matched.
    if not getattr(of.fs, "local_file", False):
        raise ValueError(
            "open_local can only be used on a filesystem which"
            " has attribute local_file=True"
        )
    with of as files:
        paths = [f.name for f in files]
    if (isinstance(url, str) and not has_magic(url)) or isinstance(url, Path):
        return paths[0]
    return paths
| |
|
| |
|
def get_compression(urlpath, compression):
    """Resolve and validate a compression name.

    ``"infer"`` is replaced by the result of ``infer_compression(urlpath)``.
    ``None`` is returned unchanged; any other value must be a key of
    ``compr``, otherwise ``ValueError`` is raised.
    """
    codec = infer_compression(urlpath) if compression == "infer" else compression
    if codec is None or codec in compr:
        return codec
    raise ValueError(f"Compression type {codec} not supported")
| |
|
| |
|
def split_protocol(urlpath):
    """Return protocol, path pair

    The result is always a 2-tuple. ``protocol`` is ``None`` when the URL has
    no ``://`` separator (or only a single character before it) and does not
    start with ``data:``.
    """
    urlpath = stringify_path(urlpath)
    if "://" in urlpath:
        protocol, path = urlpath.split("://", 1)
        if len(protocol) > 1:
            # A one-character prefix is not treated as a protocol
            # (presumably to leave Windows drive letters alone).
            return protocol, path
    if urlpath.startswith("data:"):
        # ``str.split`` gives a list; convert so every branch returns a tuple.
        return tuple(urlpath.split(":", 1))
    return None, urlpath
| |
|
| |
|
def strip_protocol(urlpath):
    """Return only path part of full URL, according to appropriate backend"""
    # Only the protocol half of the split is needed to pick the backend class.
    protocol = split_protocol(urlpath)[0]
    backend = get_filesystem_class(protocol)
    return backend._strip_protocol(urlpath)
| |
|
| |
|
def expand_paths_if_needed(paths, mode, num, fs, name_function):
    """Expand paths if they have a ``*`` in them (write mode) or any of ``*?[]``
    in them (read mode).

    Parameters
    ----------
    paths: iterable of str
        Paths to examine.
    mode: str
        Mode in which to open files.
    num: int
        If opening in writing mode, number of files we expect to create.
    fs: filesystem object
        Used to glob paths in non-writing modes.
    name_function: callable
        If opening in writing mode, this callable is used to generate path
        names. Names are generated for each partition by
        ``urlpath.replace('*', name_function(partition_index))``.

    Returns
    -------
    list of paths
    """
    paths = list(paths)
    resolved = []

    if "w" not in mode:
        # Reading: any glob pattern is resolved against the filesystem.
        for candidate in paths:
            if has_magic(candidate):
                resolved.extend(fs.glob(candidate))
            else:
                resolved.append(candidate)
        return resolved

    # Writing: at most one entry may be a filename mask.
    masks = [candidate for candidate in paths if "*" in candidate]
    if len(masks) > 1:
        raise ValueError(
            "When writing data, only one filename mask can be specified."
        )
    num = max(num, len(paths))

    for candidate in paths:
        if "*" in candidate:
            # expand using name_function
            resolved.extend(_expand_paths(candidate, name_function, num))
        else:
            resolved.append(candidate)

    # if we generated more paths than asked for, trim the list
    if len(resolved) > num:
        resolved = resolved[:num]
    return resolved
| |
|
| |
|
def get_fs_token_paths(
    urlpath,
    mode="rb",
    num=1,
    name_function=None,
    storage_options=None,
    protocol=None,
    expand=True,
):
    """Filesystem, deterministic token, and paths from a urlpath and options.

    Parameters
    ----------
    urlpath: string or iterable
        Absolute or relative filepath, URL (may include protocols like
        ``s3://``), or globstring pointing to data.
    mode: str, optional
        Mode in which to open files.
    num: int, optional
        If opening in writing mode, number of files we expect to create.
    name_function: callable, optional
        If opening in writing mode, this callable is used to generate path
        names. Names are generated for each partition by
        ``urlpath.replace('*', name_function(partition_index))``.
    storage_options: dict, optional
        Additional keywords to pass to the filesystem class. The dict is
        copied and never modified.
    protocol: str or None
        To override the protocol specifier in the URL
    expand: bool
        Expand string paths for writing, assuming the path is a directory

    Returns
    -------
    fs, token, paths:
        The filesystem instance, its ``_fs_token`` and the list of paths.

    Raises
    ------
    ValueError
        If ``urlpath`` is an empty list/tuple/set, or its entries resolve to
        more than one protocol.
    """
    is_collection = isinstance(urlpath, (list, tuple, set))
    if is_collection:
        if not urlpath:
            raise ValueError("empty urlpath sequence")
        # The first entry decides which filesystem gets instantiated.
        urlpath0 = stringify_path(next(iter(urlpath)))
    else:
        urlpath0 = stringify_path(urlpath)

    # Work on a copy so the protocol override below cannot leak into the
    # dictionary owned by the caller.
    storage_options = dict(storage_options or {})
    if protocol:
        storage_options["protocol"] = protocol

    chain = _un_chain(urlpath0, storage_options)
    inkwargs = {}
    # Reverse iterate the chain, creating a nested target_* structure
    for i, ch in enumerate(reversed(chain)):
        urls, nested_protocol, kw = ch
        if i == len(chain) - 1:
            inkwargs = dict(**kw, **inkwargs)
            continue
        inkwargs["target_options"] = dict(**kw, **inkwargs)
        inkwargs["target_protocol"] = nested_protocol
        inkwargs["fo"] = urls
    paths, protocol, _ = chain[0]
    fs = filesystem(protocol, **inkwargs)

    if is_collection:
        pchains = [_un_chain(stringify_path(u), storage_options)[0] for u in urlpath]
        if len({pc[1] for pc in pchains}) > 1:
            raise ValueError(f"Protocol mismatch getting fs from {urlpath}")
        paths = [pc[0] for pc in pchains]
    else:
        paths = fs._strip_protocol(paths)

    if isinstance(paths, (list, tuple, set)):
        paths = expand_paths_if_needed(paths, mode, num, fs, name_function)
    elif ("w" in mode or "x" in mode) and expand:
        paths = _expand_paths(paths, name_function, num)
    elif "*" in paths:
        # Glob matches, keeping only entries that are not directories.
        paths = [f for f in sorted(fs.glob(paths)) if not fs.isdir(f)]
    else:
        paths = [paths]

    return fs, fs._fs_token, paths
| |
|
| |
|
def _expand_paths(path, name_function, num):
    """Produce the list of ``num`` output paths described by ``path``.

    A tuple or list is returned as a list and must already hold ``num``
    entries. A string must contain at most one ``*``; without one,
    ``*.part`` is joined onto it. The ``*`` is then replaced by
    ``name_function(i)`` for each ``i`` in ``range(num)``. When
    ``name_function`` is None one is obtained from
    ``build_name_function(num - 1)``.
    """
    if isinstance(path, (tuple, list)):
        assert len(path) == num
        return list(path)

    if not isinstance(path, str):
        raise ValueError(
            "Path should be either\n"
            "1. A list of paths: ['foo.json', 'bar.json', ...]\n"
            "2. A directory: 'foo/\n"
            "3. A path with a '*' in it: 'foo.*.json'"
        )

    star_count = path.count("*")
    if star_count > 1:
        raise ValueError("Output path spec must contain exactly one '*'.")
    if star_count == 0:
        path = os.path.join(path, "*.part")

    if name_function is None:
        name_function = build_name_function(num - 1)

    paths = []
    for index in range(num):
        paths.append(path.replace("*", name_function(index)))

    if sorted(paths) != paths:
        logger.warning(
            "In order to preserve order between partitions"
            " paths created with ``name_function`` should "
            "sort to partition order"
        )
    return paths
| |
|
| |
|
class PickleableTextIOWrapper(io.TextIOWrapper):
    """A ``TextIOWrapper`` that can be pickled.

    The constructor arguments are remembered and used to rebuild the wrapper
    on unpickling. This requires ``buffer`` itself to be pickleable, which
    all instances of AbstractBufferedFile are.
    """

    def __init__(
        self,
        buffer,
        encoding=None,
        errors=None,
        newline=None,
        line_buffering=False,
        write_through=False,
    ):
        # Stored positionally, in constructor order, for ``__reduce__``.
        self.args = (
            buffer,
            encoding,
            errors,
            newline,
            line_buffering,
            write_through,
        )
        super().__init__(
            buffer,
            encoding=encoding,
            errors=errors,
            newline=newline,
            line_buffering=line_buffering,
            write_through=write_through,
        )

    def __reduce__(self):
        return (PickleableTextIOWrapper, self.args)
| |
|