| import os |
| import re |
| import tempfile |
| import threading |
| from collections import deque |
| from contextlib import ExitStack |
| from copy import deepcopy |
| from dataclasses import dataclass, field |
| from datetime import datetime |
| from itertools import chain |
| from pathlib import Path |
| from typing import Any, Iterator, NoReturn, Optional, Union |
| from urllib.parse import quote, unquote |
|
|
| import fsspec |
| import httpx |
| from fsspec.callbacks import _DEFAULT_CALLBACK, NoOpCallback, TqdmCallback |
| from fsspec.utils import isfilelike |
|
|
| from . import constants |
| from ._commit_api import CommitOperationCopy, CommitOperationDelete |
| from .errors import EntryNotFoundError, HfHubHTTPError, RepositoryNotFoundError, RevisionNotFoundError |
| from .file_download import hf_hub_url, http_get |
| from .hf_api import HfApi, LastCommitInfo, RepoFile |
| from .utils import HFValidationError, hf_raise_for_status, http_backoff, http_stream_backoff |
| from .utils.insecure_hashlib import md5 |
|
|
|
|
| |
# Regex matching "special ref" revisions that contain slashes (e.g. `refs/convert/parquet`,
# `refs/pr/12`), so they can be told apart from a path inside the repo when parsing
# "repo_id@revision/path_in_repo" strings in `resolve_path`.
SPECIAL_REFS_REVISION_REGEX = re.compile(
    r"""
(^refs\/convert\/\w+) # `refs/convert/parquet` revisions
|
(^refs\/pr\/\d+) # PR revisions
""",
    re.VERBOSE,
)
|
|
|
|
@dataclass
class HfFileSystemResolvedPath:
    """Data structure containing information about a resolved Hugging Face file system path."""

    repo_type: str
    repo_id: str
    revision: str
    path_in_repo: str
    # Revision exactly as it appeared after "@" in the original path, if any.
    _raw_revision: Optional[str] = field(default=None, repr=False)

    def unresolve(self) -> str:
        """Rebuild the string form of the path: "[prefix/]repo_id[@revision]/path_in_repo"."""
        repo_path = constants.REPO_TYPES_URL_PREFIXES.get(self.repo_type, "") + self.repo_id
        if self._raw_revision:
            revision_suffix = f"@{self._raw_revision}"
        elif self.revision != constants.DEFAULT_REVISION:
            # Only quote the revision when it wasn't taken verbatim from the input path.
            revision_suffix = f"@{safe_revision(self.revision)}"
        else:
            revision_suffix = ""
        return f"{repo_path}{revision_suffix}/{self.path_in_repo}".rstrip("/")
|
|
|
|
| |
# Base metaclass of fsspec filesystems (normally `fsspec.spec._Cached`); typed as `Any`
# so static type checkers accept subclassing a dynamically-obtained metaclass.
_cached_base: Any = type(fsspec.AbstractFileSystem)
|
|
|
|
class _Cached(_cached_base):
    """
    Metaclass for caching HfFileSystem instances according to the args.

    This creates an additional reference to the filesystem, which prevents the
    filesystem from being garbage collected when all *user* references go away.
    A call to the :meth:`AbstractFileSystem.clear_instance_cache` must *also*
    be made for a filesystem instance to be garbage collected.

    This is a slightly modified version of `fsspec.spec._Cached` to improve it.
    In particular in `_tokenize` the pid isn't taken into account for the
    `fs_token` used to identify cached instances. The `fs_token` logic is also
    robust to defaults values and the order of the args. Finally new instances
    reuse the states from sister instances in the main thread.
    """

    def __init__(cls, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Per-class mapping of fs_token -> filesystem instance. Holding instances
        # here is what keeps them alive beyond user references (see class docstring).
        cls._cache = {}

    def __call__(cls, *args, **kwargs):
        # `skip_instance_cache=True` lets callers opt out of instance caching entirely.
        skip = kwargs.pop("skip_instance_cache", False)
        # Tokens identify one configuration per thread: same args on the same thread
        # hit the same cached instance.
        fs_token = cls._tokenize(cls, threading.get_ident(), *args, **kwargs)
        fs_token_main_thread = cls._tokenize(cls, threading.main_thread().ident, *args, **kwargs)
        if not skip and cls.cachable and fs_token in cls._cache:
            # Cache hit: reuse the existing instance for this thread/config.
            cls._latest = fs_token
            return cls._cache[fs_token]
        else:
            # Cache miss: build a fresh instance.
            obj = type.__call__(cls, *args, **kwargs)
            if not skip and cls.cachable and fs_token_main_thread in cls._cache:
                # A sister instance (same config) exists in the main thread: copy its
                # state over so e.g. caches are shared across threads.
                # NOTE(review): `_get_instance_state` is defined elsewhere — presumably it
                # returns the attributes worth sharing; confirm against its definition.
                instance_state = cls._cache[fs_token_main_thread]._get_instance_state()
                for attr, state_value in instance_state.items():
                    setattr(obj, attr, state_value)
            obj._fs_token_ = fs_token
            obj.storage_args = args
            obj.storage_options = kwargs
            if cls.cachable and not skip:
                cls._latest = fs_token
                cls._cache[fs_token] = obj
            return obj
|
|
|
|
| class HfFileSystem(fsspec.AbstractFileSystem, metaclass=_Cached): |
| """ |
    Access a remote Hugging Face Hub repository as if it were a local file system.
| |
| > [!WARNING] |
| > [`HfFileSystem`] provides fsspec compatibility, which is useful for libraries that require it (e.g., reading |
| > Hugging Face datasets directly with `pandas`). However, it introduces additional overhead due to this compatibility |
| > layer. For better performance and reliability, it's recommended to use `HfApi` methods when possible. |
| |
| Args: |
| endpoint (`str`, *optional*): |
| Endpoint of the Hub. Defaults to <https://huggingface.co>. |
| token (`bool` or `str`, *optional*): |
| A valid user access token (string). Defaults to the locally saved |
| token, which is the recommended method for authentication (see |
| https://huggingface.co/docs/huggingface_hub/quick-start#authentication). |
| To disable authentication, pass `False`. |
| block_size (`int`, *optional*): |
| Block size for reading and writing files. |
| expand_info (`bool`, *optional*): |
| Whether to expand the information of the files. |
| **storage_options (`dict`, *optional*): |
| Additional options for the filesystem. See [fsspec documentation](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.__init__). |
| |
| Usage: |
| |
| ```python |
    >>> from huggingface_hub import HfFileSystem
    >>> hffs = HfFileSystem()
| |
| >>> # List files |
| >>> hffs.glob("my-username/my-model/*.bin") |
| ['my-username/my-model/pytorch_model.bin'] |
| >>> hffs.ls("datasets/my-username/my-dataset", detail=False) |
| ['datasets/my-username/my-dataset/.gitattributes', 'datasets/my-username/my-dataset/README.md', 'datasets/my-username/my-dataset/data.json'] |
| |
| >>> # Read/write files |
| >>> with hffs.open("my-username/my-model/pytorch_model.bin") as f: |
| ... data = f.read() |
| >>> with hffs.open("my-username/my-model/pytorch_model.bin", "wb") as f: |
| ... f.write(data) |
| ``` |
| |
| Specify a token for authentication: |
| ```python |
| >>> from huggingface_hub import HfFileSystem |
| >>> hffs = HfFileSystem(token=token) |
| ``` |
| """ |
|
|
| root_marker = "" |
| protocol = "hf" |
|
|
| def __init__( |
| self, |
| *args, |
| endpoint: Optional[str] = None, |
| token: Union[bool, str, None] = None, |
| block_size: Optional[int] = None, |
| expand_info: Optional[bool] = None, |
| **storage_options, |
| ): |
| super().__init__(*args, **storage_options) |
| self.endpoint = endpoint or constants.ENDPOINT |
| self.token = token |
| self._api = HfApi(endpoint=endpoint, token=token) |
| self.block_size = block_size |
| self.expand_info = expand_info |
| |
| |
| |
| self._repo_and_revision_exists_cache: dict[ |
| tuple[str, str, Optional[str]], tuple[bool, Optional[Exception]] |
| ] = {} |
| |
| self.dircache: dict[str, list[dict[str, Any]]] = {} |
|
|
| @classmethod |
| def _tokenize(cls, threading_ident: int, *args, **kwargs) -> str: |
| """Deterministic token for caching""" |
| |
| kwargs["endpoint"] = kwargs.get("endpoint") or constants.ENDPOINT |
| kwargs["token"] = kwargs.get("token") |
| kwargs = {key: kwargs[key] for key in sorted(kwargs)} |
| |
| tokenize_args = (cls, threading_ident, args, kwargs) |
| h = md5(str(tokenize_args).encode()) |
| return h.hexdigest() |
|
|
| def _repo_and_revision_exist( |
| self, repo_type: str, repo_id: str, revision: Optional[str] |
| ) -> tuple[bool, Optional[Exception]]: |
| if (repo_type, repo_id, revision) not in self._repo_and_revision_exists_cache: |
| try: |
| self._api.repo_info( |
| repo_id, revision=revision, repo_type=repo_type, timeout=constants.HF_HUB_ETAG_TIMEOUT |
| ) |
| except (RepositoryNotFoundError, HFValidationError) as e: |
| self._repo_and_revision_exists_cache[(repo_type, repo_id, revision)] = False, e |
| self._repo_and_revision_exists_cache[(repo_type, repo_id, None)] = False, e |
| except RevisionNotFoundError as e: |
| self._repo_and_revision_exists_cache[(repo_type, repo_id, revision)] = False, e |
| self._repo_and_revision_exists_cache[(repo_type, repo_id, None)] = True, None |
| else: |
| self._repo_and_revision_exists_cache[(repo_type, repo_id, revision)] = True, None |
| self._repo_and_revision_exists_cache[(repo_type, repo_id, None)] = True, None |
| return self._repo_and_revision_exists_cache[(repo_type, repo_id, revision)] |
|
|
    def resolve_path(self, path: str, revision: Optional[str] = None) -> HfFileSystemResolvedPath:
        """
        Resolve a Hugging Face file system path into its components.

        Args:
            path (`str`):
                Path to resolve.
            revision (`str`, *optional*):
                The revision of the repo to resolve. Defaults to the revision specified in the path.

        Returns:
            [`HfFileSystemResolvedPath`]: Resolved path information containing `repo_type`, `repo_id`, `revision` and `path_in_repo`.

        Raises:
            `ValueError`:
                If path contains conflicting revision information.
            `NotImplementedError`:
                If trying to list repositories.
        """

        def _align_revision_in_path_with_revision(
            revision_in_path: Optional[str], revision: Optional[str]
        ) -> Optional[str]:
            # A revision can come both from the path ("repo@rev/file") and from the
            # `revision` argument: when both are given they must agree.
            if revision is not None:
                if revision_in_path is not None and revision_in_path != revision:
                    raise ValueError(
                        f'Revision specified in path ("{revision_in_path}") and in `revision` argument ("{revision}")'
                        " are not the same."
                    )
            else:
                revision = revision_in_path
            return revision

        path = self._strip_protocol(path)
        if not path:
            # Empty path would mean listing all repositories, which is unsupported.
            raise NotImplementedError("Access to repositories lists is not implemented.")
        elif path.split("/")[0] + "/" in constants.REPO_TYPES_URL_PREFIXES.values():
            if "/" not in path:
                # A bare repo-type prefix ("datasets", "spaces") would list repositories too.
                raise NotImplementedError("Access to repositories lists is not implemented.")
            # Strip the repo-type prefix (e.g. "datasets/") and map it to a canonical repo type.
            repo_type, path = path.split("/", 1)
            repo_type = constants.REPO_TYPES_MAPPING[repo_type]
        else:
            repo_type = constants.REPO_TYPE_MODEL
        if path.count("/") > 0:
            if "@" in path:
                # Path of the form "repo_id@revision[/path_in_repo]".
                repo_id, revision_in_path = path.split("@", 1)
                if "/" in revision_in_path:
                    # Special refs ("refs/convert/...", "refs/pr/N") themselves contain
                    # slashes, so they must be matched before splitting on "/".
                    match = SPECIAL_REFS_REVISION_REGEX.search(revision_in_path)
                    if match is not None and revision in (None, match.group()):
                        # Treat the matched special ref as the revision; the rest is the path.
                        path_in_repo = SPECIAL_REFS_REVISION_REGEX.sub("", revision_in_path).lstrip("/")
                        revision_in_path = match.group()
                    else:
                        revision_in_path, path_in_repo = revision_in_path.split("/", 1)
                else:
                    path_in_repo = ""
                revision = _align_revision_in_path_with_revision(unquote(revision_in_path), revision)
                repo_and_revision_exist, err = self._repo_and_revision_exist(repo_type, repo_id, revision)
                if not repo_and_revision_exist:
                    _raise_file_not_found(path, err)
            else:
                # No explicit revision: the repo id is ambiguous — it may or may not
                # include a namespace ("user/repo" vs "repo"). Try with namespace first.
                revision_in_path = None
                repo_id_with_namespace = "/".join(path.split("/")[:2])
                path_in_repo_with_namespace = "/".join(path.split("/")[2:])
                repo_id_without_namespace = path.split("/")[0]
                path_in_repo_without_namespace = "/".join(path.split("/")[1:])
                repo_id = repo_id_with_namespace
                path_in_repo = path_in_repo_with_namespace
                repo_and_revision_exist, err = self._repo_and_revision_exist(repo_type, repo_id, revision)
                if not repo_and_revision_exist:
                    if isinstance(err, (RepositoryNotFoundError, HFValidationError)):
                        # Fall back to interpreting the first segment as an un-namespaced repo id.
                        repo_id = repo_id_without_namespace
                        path_in_repo = path_in_repo_without_namespace
                        repo_and_revision_exist, _ = self._repo_and_revision_exist(repo_type, repo_id, revision)
                        if not repo_and_revision_exist:
                            # Report the original (namespaced) error, which is the likelier intent.
                            _raise_file_not_found(path, err)
                    else:
                        _raise_file_not_found(path, err)
        else:
            # Single path segment: the whole path is the repo id (optionally with "@revision").
            repo_id = path
            path_in_repo = ""
            if "@" in path:
                repo_id, revision_in_path = path.split("@", 1)
                revision = _align_revision_in_path_with_revision(unquote(revision_in_path), revision)
            else:
                revision_in_path = None
            repo_and_revision_exist, _ = self._repo_and_revision_exist(repo_type, repo_id, revision)
            if not repo_and_revision_exist:
                raise NotImplementedError("Access to repositories lists is not implemented.")

        revision = revision if revision is not None else constants.DEFAULT_REVISION
        return HfFileSystemResolvedPath(repo_type, repo_id, revision, path_in_repo, _raw_revision=revision_in_path)
|
|
| def invalidate_cache(self, path: Optional[str] = None) -> None: |
| """ |
| Clear the cache for a given path. |
| |
| For more details, refer to [fsspec documentation](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.invalidate_cache). |
| |
| Args: |
| path (`str`, *optional*): |
| Path to clear from cache. If not provided, clear the entire cache. |
| |
| """ |
| if not path: |
| self.dircache.clear() |
| self._repo_and_revision_exists_cache.clear() |
| else: |
| resolved_path = self.resolve_path(path) |
| path = resolved_path.unresolve() |
| while path: |
| self.dircache.pop(path, None) |
| path = self._parent(path) |
|
|
| |
| if not resolved_path.path_in_repo: |
| self._repo_and_revision_exists_cache.pop((resolved_path.repo_type, resolved_path.repo_id, None), None) |
| self._repo_and_revision_exists_cache.pop( |
| (resolved_path.repo_type, resolved_path.repo_id, resolved_path.revision), None |
| ) |
|
|
| def _open( |
| self, |
| path: str, |
| mode: str = "rb", |
| block_size: Optional[int] = None, |
| revision: Optional[str] = None, |
| **kwargs, |
| ) -> Union["HfFileSystemFile", "HfFileSystemStreamFile"]: |
| block_size = block_size if block_size is not None else self.block_size |
| if block_size is not None: |
| kwargs["block_size"] = block_size |
| if "a" in mode: |
| raise NotImplementedError("Appending to remote files is not yet supported.") |
| if block_size == 0: |
| return HfFileSystemStreamFile(self, path, mode=mode, revision=revision, **kwargs) |
| else: |
| return HfFileSystemFile(self, path, mode=mode, revision=revision, **kwargs) |
|
|
| def _rm(self, path: str, revision: Optional[str] = None, **kwargs) -> None: |
| resolved_path = self.resolve_path(path, revision=revision) |
| self._api.delete_file( |
| path_in_repo=resolved_path.path_in_repo, |
| repo_id=resolved_path.repo_id, |
| token=self.token, |
| repo_type=resolved_path.repo_type, |
| revision=resolved_path.revision, |
| commit_message=kwargs.get("commit_message"), |
| commit_description=kwargs.get("commit_description"), |
| ) |
| self.invalidate_cache(path=resolved_path.unresolve()) |
|
|
| def rm( |
| self, |
| path: str, |
| recursive: bool = False, |
| maxdepth: Optional[int] = None, |
| revision: Optional[str] = None, |
| **kwargs, |
| ) -> None: |
| """ |
| Delete files from a repository. |
| |
| For more details, refer to [fsspec documentation](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.rm). |
| |
| > [!WARNING] |
| > Note: When possible, use `HfApi.delete_file()` for better performance. |
| |
| Args: |
| path (`str`): |
| Path to delete. |
| recursive (`bool`, *optional*): |
| If True, delete directory and all its contents. Defaults to False. |
| maxdepth (`int`, *optional*): |
| Maximum number of subdirectories to visit when deleting recursively. |
| revision (`str`, *optional*): |
| The git revision to delete from. |
| |
| """ |
| resolved_path = self.resolve_path(path, revision=revision) |
| paths = self.expand_path(path, recursive=recursive, maxdepth=maxdepth, revision=revision) |
| paths_in_repo = [self.resolve_path(path).path_in_repo for path in paths if not self.isdir(path)] |
| operations = [CommitOperationDelete(path_in_repo=path_in_repo) for path_in_repo in paths_in_repo] |
| commit_message = f"Delete {path} " |
| commit_message += "recursively " if recursive else "" |
| commit_message += f"up to depth {maxdepth} " if maxdepth is not None else "" |
| |
| self._api.create_commit( |
| repo_id=resolved_path.repo_id, |
| repo_type=resolved_path.repo_type, |
| token=self.token, |
| operations=operations, |
| revision=resolved_path.revision, |
| commit_message=kwargs.get("commit_message", commit_message), |
| commit_description=kwargs.get("commit_description"), |
| ) |
| self.invalidate_cache(path=resolved_path.unresolve()) |
|
|
| def ls( |
| self, path: str, detail: bool = True, refresh: bool = False, revision: Optional[str] = None, **kwargs |
| ) -> list[Union[str, dict[str, Any]]]: |
| """ |
| List the contents of a directory. |
| |
| For more details, refer to [fsspec documentation](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.ls). |
| |
| > [!WARNING] |
| > Note: When possible, use `HfApi.list_repo_tree()` for better performance. |
| |
| Args: |
| path (`str`): |
| Path to the directory. |
| detail (`bool`, *optional*): |
| If True, returns a list of dictionaries containing file information. If False, |
| returns a list of file paths. Defaults to True. |
| refresh (`bool`, *optional*): |
| If True, bypass the cache and fetch the latest data. Defaults to False. |
| revision (`str`, *optional*): |
| The git revision to list from. |
| |
| Returns: |
| `list[Union[str, dict[str, Any]]]`: List of file paths (if detail=False) or list of file information |
| dictionaries (if detail=True). |
| """ |
| resolved_path = self.resolve_path(path, revision=revision) |
| path = resolved_path.unresolve() |
| try: |
| out = self._ls_tree(path, refresh=refresh, revision=revision, **kwargs) |
| except EntryNotFoundError: |
| |
| if not resolved_path.path_in_repo: |
| _raise_file_not_found(path, None) |
| out = self._ls_tree(self._parent(path), refresh=refresh, revision=revision, **kwargs) |
| out = [o for o in out if o["name"] == path] |
| if len(out) == 0: |
| _raise_file_not_found(path, None) |
| return out if detail else [o["name"] for o in out] |
|
|
    def _ls_tree(
        self,
        path: str,
        recursive: bool = False,
        refresh: bool = False,
        revision: Optional[str] = None,
        expand_info: Optional[bool] = None,
        maxdepth: Optional[int] = None,
    ):
        """List the tree under `path`, serving from `self.dircache` when possible and filling it otherwise."""
        # Per-call override > instance-level setting > default (False).
        expand_info = (
            expand_info if expand_info is not None else (self.expand_info if self.expand_info is not None else False)
        )
        resolved_path = self.resolve_path(path, revision=revision)
        path = resolved_path.unresolve()
        # Repo-root path used as the prefix for all entry names.
        root_path = HfFileSystemResolvedPath(
            resolved_path.repo_type,
            resolved_path.repo_id,
            resolved_path.revision,
            path_in_repo="",
            _raw_revision=resolved_path._raw_revision,
        ).unresolve()

        out = []
        if path in self.dircache and not refresh:
            # Serve from the directory cache.
            cached_path_infos = self.dircache[path]
            out.extend(cached_path_infos)
            dirs_not_in_dircache = []
            if recursive:
                # BFS over cached subdirectories; children of `path` sit at depth 1,
                # so their own children start at depth 2.
                depth = 2
                dirs_to_visit = deque(
                    [(depth, path_info) for path_info in cached_path_infos if path_info["type"] == "directory"]
                )
                while dirs_to_visit:
                    depth, dir_info = dirs_to_visit.popleft()
                    if maxdepth is None or depth <= maxdepth:
                        if dir_info["name"] not in self.dircache:
                            # This subtree isn't cached yet — remember it for refetching below.
                            dirs_not_in_dircache.append(dir_info["name"])
                        else:
                            cached_path_infos = self.dircache[dir_info["name"]]
                            out.extend(cached_path_infos)
                            dirs_to_visit.extend(
                                [
                                    (depth + 1, path_info)
                                    for path_info in cached_path_infos
                                    if path_info["type"] == "directory"
                                ]
                            )

            dirs_not_expanded = []
            if expand_info:
                # Cached entries without `last_commit` were fetched with expand=False;
                # their parent dirs must be re-listed with expand=True.
                dirs_not_expanded = [self._parent(o["name"]) for o in out if o["last_commit"] is None]

            if (recursive and dirs_not_in_dircache) or (expand_info and dirs_not_expanded):
                # Refetch the smallest common subtree covering everything missing,
                # rather than issuing one request per missing directory.
                common_prefix = os.path.commonprefix(dirs_not_in_dircache + dirs_not_expanded)
                # `commonprefix` is character-based: snap it back to a real directory path.
                common_path = (
                    common_prefix.rstrip("/")
                    if common_prefix.endswith("/")
                    or common_prefix == root_path
                    or common_prefix in chain(dirs_not_in_dircache, dirs_not_expanded)
                    else self._parent(common_prefix)
                )
                if maxdepth is not None:
                    # The recursive call starts deeper in the tree: shrink maxdepth accordingly.
                    common_path_depth = common_path[len(path) :].count("/")
                    maxdepth -= common_path_depth
                # Drop stale results/cache entries under the subtree before refetching it.
                out = [o for o in out if not o["name"].startswith(common_path + "/")]
                for cached_path in list(self.dircache):
                    if cached_path.startswith(common_path + "/"):
                        self.dircache.pop(cached_path, None)
                self.dircache.pop(common_path, None)
                out.extend(
                    self._ls_tree(
                        common_path,
                        recursive=recursive,
                        refresh=True,
                        revision=revision,
                        expand_info=expand_info,
                        maxdepth=maxdepth,
                    )
                )
        else:
            # Cache miss (or refresh requested): fetch from the Hub and populate the cache.
            tree = self._api.list_repo_tree(
                resolved_path.repo_id,
                resolved_path.path_in_repo,
                recursive=recursive,
                expand=expand_info,
                revision=resolved_path.revision,
                repo_type=resolved_path.repo_type,
            )
            for path_info in tree:
                cache_path = root_path + "/" + path_info.path
                if isinstance(path_info, RepoFile):
                    cache_path_info = {
                        "name": cache_path,
                        "size": path_info.size,
                        "type": "file",
                        "blob_id": path_info.blob_id,
                        "lfs": path_info.lfs,
                        "last_commit": path_info.last_commit,
                        "security": path_info.security,
                    }
                else:
                    cache_path_info = {
                        "name": cache_path,
                        "size": 0,
                        "type": "directory",
                        "tree_id": path_info.tree_id,
                        "last_commit": path_info.last_commit,
                    }
                parent_path = self._parent(cache_path_info["name"])
                self.dircache.setdefault(parent_path, []).append(cache_path_info)
                # Everything is cached, but only entries within maxdepth are returned.
                depth = cache_path[len(path) :].count("/")
                if maxdepth is None or depth <= maxdepth:
                    out.append(cache_path_info)
        return out
|
|
| def walk(self, path: str, *args, **kwargs) -> Iterator[tuple[str, list[str], list[str]]]: |
| """ |
| Return all files below the given path. |
| |
| For more details, refer to [fsspec documentation](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.walk). |
| |
| Args: |
| path (`str`): |
| Root path to list files from. |
| |
| Returns: |
| `Iterator[tuple[str, list[str], list[str]]]`: An iterator of (path, list of directory names, list of file names) tuples. |
| """ |
| path = self.resolve_path(path, revision=kwargs.get("revision")).unresolve() |
| yield from super().walk(path, *args, **kwargs) |
|
|
| def glob(self, path: str, maxdepth: Optional[int] = None, **kwargs) -> list[str]: |
| """ |
| Find files by glob-matching. |
| |
| For more details, refer to [fsspec documentation](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.glob). |
| |
| Args: |
| path (`str`): |
| Path pattern to match. |
| |
| Returns: |
| `list[str]`: List of paths matching the pattern. |
| """ |
| path = self.resolve_path(path, revision=kwargs.get("revision")).unresolve() |
| return super().glob(path, maxdepth=maxdepth, **kwargs) |
|
|
| def find( |
| self, |
| path: str, |
| maxdepth: Optional[int] = None, |
| withdirs: bool = False, |
| detail: bool = False, |
| refresh: bool = False, |
| revision: Optional[str] = None, |
| **kwargs, |
| ) -> Union[list[str], dict[str, dict[str, Any]]]: |
| """ |
| List all files below path. |
| |
| For more details, refer to [fsspec documentation](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.find). |
| |
| Args: |
| path (`str`): |
| Root path to list files from. |
| maxdepth (`int`, *optional*): |
| Maximum depth to descend into subdirectories. |
| withdirs (`bool`, *optional*): |
| Include directory paths in the output. Defaults to False. |
| detail (`bool`, *optional*): |
| If True, returns a dict mapping paths to file information. Defaults to False. |
| refresh (`bool`, *optional*): |
| If True, bypass the cache and fetch the latest data. Defaults to False. |
| revision (`str`, *optional*): |
| The git revision to list from. |
| |
| Returns: |
| `Union[list[str], dict[str, dict[str, Any]]]`: List of paths or dict of file information. |
| """ |
| if maxdepth is not None and maxdepth < 1: |
| raise ValueError("maxdepth must be at least 1") |
| resolved_path = self.resolve_path(path, revision=revision) |
| path = resolved_path.unresolve() |
| try: |
| out = self._ls_tree( |
| path, recursive=True, refresh=refresh, revision=resolved_path.revision, maxdepth=maxdepth, **kwargs |
| ) |
| except EntryNotFoundError: |
| |
| try: |
| if self.info(path, revision=revision, **kwargs)["type"] == "file": |
| out = {path: {}} |
| else: |
| out = {} |
| except FileNotFoundError: |
| out = {} |
| else: |
| if not withdirs: |
| out = [o for o in out if o["type"] != "directory"] |
| else: |
| |
| path_info = self.info(path, revision=resolved_path.revision, **kwargs) |
| out = [path_info] + out if path_info["type"] == "directory" else out |
| out = {o["name"]: o for o in out} |
| names = sorted(out) |
| if not detail: |
| return names |
| else: |
| return {name: out[name] for name in names} |
|
|
| def cp_file(self, path1: str, path2: str, revision: Optional[str] = None, **kwargs) -> None: |
| """ |
| Copy a file within or between repositories. |
| |
| > [!WARNING] |
| > Note: When possible, use `HfApi.upload_file()` for better performance. |
| |
| Args: |
| path1 (`str`): |
| Source path to copy from. |
| path2 (`str`): |
| Destination path to copy to. |
| revision (`str`, *optional*): |
| The git revision to copy from. |
| |
| """ |
| resolved_path1 = self.resolve_path(path1, revision=revision) |
| resolved_path2 = self.resolve_path(path2, revision=revision) |
|
|
| same_repo = ( |
| resolved_path1.repo_type == resolved_path2.repo_type and resolved_path1.repo_id == resolved_path2.repo_id |
| ) |
|
|
| if same_repo: |
| commit_message = f"Copy {path1} to {path2}" |
| self._api.create_commit( |
| repo_id=resolved_path1.repo_id, |
| repo_type=resolved_path1.repo_type, |
| revision=resolved_path2.revision, |
| commit_message=kwargs.get("commit_message", commit_message), |
| commit_description=kwargs.get("commit_description", ""), |
| operations=[ |
| CommitOperationCopy( |
| src_path_in_repo=resolved_path1.path_in_repo, |
| path_in_repo=resolved_path2.path_in_repo, |
| src_revision=resolved_path1.revision, |
| ) |
| ], |
| ) |
| else: |
| with self.open(path1, "rb", revision=resolved_path1.revision) as f: |
| content = f.read() |
| commit_message = f"Copy {path1} to {path2}" |
| self._api.upload_file( |
| path_or_fileobj=content, |
| path_in_repo=resolved_path2.path_in_repo, |
| repo_id=resolved_path2.repo_id, |
| token=self.token, |
| repo_type=resolved_path2.repo_type, |
| revision=resolved_path2.revision, |
| commit_message=kwargs.get("commit_message", commit_message), |
| commit_description=kwargs.get("commit_description"), |
| ) |
| self.invalidate_cache(path=resolved_path1.unresolve()) |
| self.invalidate_cache(path=resolved_path2.unresolve()) |
|
|
| def modified(self, path: str, **kwargs) -> datetime: |
| """ |
| Get the last modified time of a file. |
| |
| For more details, refer to [fsspec documentation](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.modified). |
| |
| Args: |
| path (`str`): |
| Path to the file. |
| |
| Returns: |
| `datetime`: Last commit date of the file. |
| """ |
| info = self.info(path, **{**kwargs, "expand_info": True}) |
| return info["last_commit"]["date"] |
|
|
    def info(self, path: str, refresh: bool = False, revision: Optional[str] = None, **kwargs) -> dict[str, Any]:
        """
        Get information about a file or directory.

        For more details, refer to [fsspec documentation](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.info).

        > [!WARNING]
        > Note: When possible, use `HfApi.get_paths_info()` or `HfApi.repo_info()` for better performance.

        Args:
            path (`str`):
                Path to get info for.
            refresh (`bool`, *optional*):
                If True, bypass the cache and fetch the latest data. Defaults to False.
            revision (`str`, *optional*):
                The git revision to get info from.

        Returns:
            `dict[str, Any]`: Dictionary containing file information (type, size, commit info, etc.).

        """
        resolved_path = self.resolve_path(path, revision=revision)
        path = resolved_path.unresolve()
        # Per-call override > instance-level setting > default (False).
        expand_info = kwargs.get(
            "expand_info", self.expand_info if self.expand_info is not None else False
        )
        if not resolved_path.path_in_repo:
            # Path targets a repo root: synthesize a directory entry instead of querying paths.
            out = {
                "name": path,
                "size": 0,
                "type": "directory",
                "last_commit": None,
            }
            if expand_info:
                # The repo root's "last commit" is the most recent commit on the revision.
                last_commit = self._api.list_repo_commits(
                    resolved_path.repo_id, repo_type=resolved_path.repo_type, revision=resolved_path.revision
                )[-1]
                out = {
                    **out,
                    "tree_id": None,
                    "last_commit": LastCommitInfo(
                        oid=last_commit.commit_id, title=last_commit.title, date=last_commit.created_at
                    ),
                }
        else:
            out = None
            parent_path = self._parent(path)
            if not expand_info and parent_path not in self.dircache:
                # Listing the parent once fills the dircache for this and future lookups.
                self.ls(parent_path)
            if parent_path in self.dircache:
                # Try to answer from the cached parent listing.
                out1 = [o for o in self.dircache[parent_path] if o["name"] == path]
                if not out1:
                    _raise_file_not_found(path, None)
                out = out1[0]
            if refresh or out is None or (expand_info and out and out["last_commit"] is None):
                # Cache miss, forced refresh, or cached entry lacks the expanded fields:
                # query the Hub directly.
                paths_info = self._api.get_paths_info(
                    resolved_path.repo_id,
                    resolved_path.path_in_repo,
                    expand=expand_info,
                    revision=resolved_path.revision,
                    repo_type=resolved_path.repo_type,
                )
                if not paths_info:
                    _raise_file_not_found(path, None)
                path_info = paths_info[0]
                root_path = HfFileSystemResolvedPath(
                    resolved_path.repo_type,
                    resolved_path.repo_id,
                    resolved_path.revision,
                    path_in_repo="",
                    _raw_revision=resolved_path._raw_revision,
                ).unresolve()
                if isinstance(path_info, RepoFile):
                    out = {
                        "name": root_path + "/" + path_info.path,
                        "size": path_info.size,
                        "type": "file",
                        "blob_id": path_info.blob_id,
                        "lfs": path_info.lfs,
                        "last_commit": path_info.last_commit,
                        "security": path_info.security,
                    }
                else:
                    out = {
                        "name": root_path + "/" + path_info.path,
                        "size": 0,
                        "type": "directory",
                        "tree_id": path_info.tree_id,
                        "last_commit": path_info.last_commit,
                    }
        if not expand_info:
            # Trim to the minimal fsspec-style entry when expanded info wasn't requested.
            out = {k: out[k] for k in ["name", "size", "type"]}
        assert out is not None
        return out
|
|
| def exists(self, path, **kwargs): |
| """ |
| Check if a file exists. |
| |
| For more details, refer to [fsspec documentation](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.exists). |
| |
| > [!WARNING] |
| > Note: When possible, use `HfApi.file_exists()` for better performance. |
| |
| Args: |
| path (`str`): |
| Path to check. |
| |
| Returns: |
| `bool`: True if file exists, False otherwise. |
| """ |
| try: |
| if kwargs.get("refresh", False): |
| self.invalidate_cache(path) |
|
|
| self.info(path, **kwargs) |
| return True |
| except: |
| return False |
|
|
| def isdir(self, path): |
| """ |
| Check if a path is a directory. |
| |
| For more details, refer to [fsspec documentation](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.isdir). |
| |
| Args: |
| path (`str`): |
| Path to check. |
| |
| Returns: |
| `bool`: True if path is a directory, False otherwise. |
| """ |
| try: |
| return self.info(path)["type"] == "directory" |
| except OSError: |
| return False |
|
|
| def isfile(self, path): |
| """ |
| Check if a path is a file. |
| |
| For more details, refer to [fsspec documentation](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.isfile). |
| |
| Args: |
| path (`str`): |
| Path to check. |
| |
| Returns: |
| `bool`: True if path is a file, False otherwise. |
| """ |
| try: |
| return self.info(path)["type"] == "file" |
| except: |
| return False |
|
|
| def url(self, path: str) -> str: |
| """ |
| Get the HTTP URL of the given path. |
| |
| Args: |
| path (`str`): |
| Path to get URL for. |
| |
| Returns: |
| `str`: HTTP URL to access the file or directory on the Hub. |
| """ |
| resolved_path = self.resolve_path(path) |
| url = hf_hub_url( |
| resolved_path.repo_id, |
| resolved_path.path_in_repo, |
| repo_type=resolved_path.repo_type, |
| revision=resolved_path.revision, |
| endpoint=self.endpoint, |
| ) |
| if self.isdir(path): |
| url = url.replace("/resolve/", "/tree/", 1) |
| return url |
|
|
| def get_file(self, rpath, lpath, callback=_DEFAULT_CALLBACK, outfile=None, **kwargs) -> None: |
| """ |
| Copy single remote file to local. |
| |
| > [!WARNING] |
| > Note: When possible, use `HfApi.hf_hub_download()` for better performance. |
| |
| Args: |
| rpath (`str`): |
| Remote path to download from. |
| lpath (`str`): |
| Local path to download to. |
| callback (`Callback`, *optional*): |
| Optional callback to track download progress. Defaults to no callback. |
| outfile (`IO`, *optional*): |
| Optional file-like object to write to. If provided, `lpath` is ignored. |
| |
| """ |
| revision = kwargs.get("revision") |
| unhandled_kwargs = set(kwargs.keys()) - {"revision"} |
| if not isinstance(callback, (NoOpCallback, TqdmCallback)) or len(unhandled_kwargs) > 0: |
| |
| |
| return super().get_file(rpath, lpath, callback=callback, outfile=outfile, **kwargs) |
|
|
| |
| if isfilelike(lpath): |
| outfile = lpath |
| elif self.isdir(rpath): |
| os.makedirs(lpath, exist_ok=True) |
| return None |
|
|
| if isinstance(lpath, (str, Path)): |
| os.makedirs(os.path.dirname(lpath), exist_ok=True) |
|
|
| |
| close_file = False |
| if outfile is None: |
| outfile = open(lpath, "wb") |
| close_file = True |
| initial_pos = outfile.tell() |
|
|
| |
| resolve_remote_path = self.resolve_path(rpath, revision=revision) |
| expected_size = self.info(rpath, revision=revision)["size"] |
| callback.set_size(expected_size) |
| try: |
| http_get( |
| url=hf_hub_url( |
| repo_id=resolve_remote_path.repo_id, |
| revision=resolve_remote_path.revision, |
| filename=resolve_remote_path.path_in_repo, |
| repo_type=resolve_remote_path.repo_type, |
| endpoint=self.endpoint, |
| ), |
| temp_file=outfile, |
| displayed_filename=rpath, |
| expected_size=expected_size, |
| resume_size=0, |
| headers=self._api._build_hf_headers(), |
| _tqdm_bar=callback.tqdm if isinstance(callback, TqdmCallback) else None, |
| ) |
| outfile.seek(initial_pos) |
| finally: |
| |
| if close_file: |
| outfile.close() |
|
|
| @property |
| def transaction(self): |
| """A context within which files are committed together upon exit |
| |
| Requires the file class to implement `.commit()` and `.discard()` |
| for the normal and exception cases. |
| """ |
| |
| |
| raise NotImplementedError("Transactional commits are not supported.") |
|
|
| def start_transaction(self): |
| """Begin write transaction for deferring files, non-context version""" |
| |
| |
| raise NotImplementedError("Transactional commits are not supported.") |
|
|
| def __reduce__(self): |
| |
| return make_instance, ( |
| type(self), |
| self.storage_args, |
| self.storage_options, |
| self._get_instance_state(), |
| ) |
|
|
| def _get_instance_state(self): |
| return { |
| "dircache": deepcopy(self.dircache), |
| "_repo_and_revision_exists_cache": deepcopy(self._repo_and_revision_exists_cache), |
| } |
|
|
|
|
class HfFileSystemFile(fsspec.spec.AbstractBufferedFile):
    """Buffered, random-access file-like object for files hosted on the Hugging Face Hub.

    Reads are served through HTTP range requests; writes are staged to a local
    temporary file and committed to the Hub in a single upload on close.
    """

    def __init__(self, fs: HfFileSystem, path: str, revision: Optional[str] = None, **kwargs):
        try:
            self.resolved_path = fs.resolve_path(path, revision=revision)
        except FileNotFoundError as e:
            if "w" in kwargs.get("mode", ""):
                raise FileNotFoundError(
                    f"{e}.\nMake sure the repository and revision exist before writing data."
                ) from e
            raise
        super().__init__(fs, self.resolved_path.unresolve(), **kwargs)
        self.fs: HfFileSystem

    def __del__(self):
        if not hasattr(self, "resolved_path"):
            # The constructor failed before attributes were set; skip the parent
            # destructor, which would touch uninitialized state.
            return
        return super().__del__()

    def _fetch_range(self, start: int, end: int) -> bytes:
        """Download and return the byte range [start, end) of the remote file."""
        headers = {
            # The HTTP Range header is inclusive on both ends, hence `end - 1`.
            "range": f"bytes={start}-{end - 1}",
            **self.fs._api._build_hf_headers(),
        }
        url = hf_hub_url(
            repo_id=self.resolved_path.repo_id,
            revision=self.resolved_path.revision,
            filename=self.resolved_path.path_in_repo,
            repo_type=self.resolved_path.repo_type,
            endpoint=self.fs.endpoint,
        )
        r = http_backoff("GET", url, headers=headers, timeout=constants.HF_HUB_DOWNLOAD_TIMEOUT)
        hf_raise_for_status(r)
        return r.content

    def _initiate_upload(self) -> None:
        # `delete=False`: the file must outlive this handle until the final commit
        # in `_upload_chunk`; it is removed explicitly there.
        self.temp_file = tempfile.NamedTemporaryFile(prefix="hffs-", delete=False)

    def _upload_chunk(self, final: bool = False) -> None:
        """Stage the current buffer into the temp file; on the final chunk, commit to the Hub."""
        self.buffer.seek(0)
        block = self.buffer.read()
        self.temp_file.write(block)
        if final:
            self.temp_file.close()
            try:
                self.fs._api.upload_file(
                    path_or_fileobj=self.temp_file.name,
                    path_in_repo=self.resolved_path.path_in_repo,
                    repo_id=self.resolved_path.repo_id,
                    token=self.fs.token,
                    repo_type=self.resolved_path.repo_type,
                    revision=self.resolved_path.revision,
                    commit_message=self.kwargs.get("commit_message"),
                    commit_description=self.kwargs.get("commit_description"),
                )
            finally:
                # Always remove the staging file, even if the upload fails
                # (previously it was leaked on error).
                os.remove(self.temp_file.name)
            self.fs.invalidate_cache(
                path=self.resolved_path.unresolve(),
            )

    def read(self, length=-1):
        """Read remote file.

        If `length` is not provided or is -1, the entire file is downloaded and read. On POSIX systems the file is
        loaded in memory directly. Otherwise, the file is downloaded to a temporary file and read from there.
        """
        if self.mode == "rb" and (length is None or length == -1) and self.loc == 0:
            # Fast path: stream the whole file in one request instead of
            # block-by-block range requests.
            with self.fs.open(self.path, "rb", block_size=0) as f:
                out = f.read()
            self.loc += len(out)
            return out
        return super().read(length)

    def url(self) -> str:
        """Return the HTTP URL of this file on the Hub."""
        return self.fs.url(self.path)
|
|
|
|
class HfFileSystemStreamFile(fsspec.spec.AbstractBufferedFile):
    """Unbuffered, forward-only streaming file-like object for files on the Hugging Face Hub.

    Read-only (`block_size=0`, `cache_type="none"`); reads reuse a single streamed
    HTTP connection and transparently reconnect (resuming at the current offset)
    if the stream fails once.
    """

    def __init__(
        self,
        fs: HfFileSystem,
        path: str,
        mode: str = "rb",
        revision: Optional[str] = None,
        block_size: int = 0,
        cache_type: str = "none",
        **kwargs,
    ):
        if block_size != 0:
            raise ValueError(f"HfFileSystemStreamFile only supports block_size=0 but got {block_size}")
        if cache_type != "none":
            raise ValueError(f"HfFileSystemStreamFile only supports cache_type='none' but got {cache_type}")
        if "w" in mode:
            raise ValueError(f"HfFileSystemStreamFile only supports reading but got mode='{mode}'")
        try:
            self.resolved_path = fs.resolve_path(path, revision=revision)
        except FileNotFoundError as e:
            if "w" in kwargs.get("mode", ""):
                raise FileNotFoundError(
                    f"{e}.\nMake sure the repository and revision exist before writing data."
                ) from e
            # Fix: the error was previously swallowed here, which led to an
            # AttributeError on `self.resolved_path` below. Re-raise instead.
            raise
        # Size is unknown up front (no extra HEAD request): streaming does not need it.
        self.details = {"name": self.resolved_path.unresolve(), "size": None}
        super().__init__(
            fs, self.resolved_path.unresolve(), mode=mode, block_size=block_size, cache_type=cache_type, **kwargs
        )
        self.response: Optional[httpx.Response] = None
        self.fs: HfFileSystem
        self._exit_stack = ExitStack()

    def seek(self, loc: int, whence: int = 0):
        """Only no-op seeks (to the current position) are supported on a stream."""
        if loc == 0 and whence == 1:
            return
        if loc == self.loc and whence == 0:
            return
        raise ValueError("Cannot seek streaming HF file")

    def read(self, length: int = -1):
        """Read the remote file.

        If the file is already open, we reuse the connection.
        Otherwise, open a new connection and read from it.

        If reading the stream fails, we retry with a new connection.
        """
        if self.response is None:
            self._open_connection()

        retried_once = False
        while True:
            try:
                if self.response is None:
                    # Range request past EOF (HTTP 416): nothing left to read.
                    return b""
                out = _partial_read(self.response, length)
                self.loc += len(out)
                return out
            except Exception:
                if self.response is not None:
                    self.response.close()
                if retried_once:
                    raise
                # Retry once with a fresh connection, resuming at the current offset.
                self._open_connection()
                retried_once = True

    def url(self) -> str:
        """Return the HTTP URL of this file on the Hub."""
        return self.fs.url(self.path)

    def __del__(self):
        if not hasattr(self, "resolved_path"):
            # Constructor failed before attributes were set; skip cleanup.
            return
        self._exit_stack.close()
        return super().__del__()

    def __reduce__(self):
        # Pickle support: reopen the stream from scratch after unpickling.
        return reopen, (self.fs, self.path, self.mode, self.blocksize, self.cache.name)

    def _open_connection(self):
        """Open a (possibly resuming) streamed GET connection to the remote file."""
        url = hf_hub_url(
            repo_id=self.resolved_path.repo_id,
            revision=self.resolved_path.revision,
            filename=self.resolved_path.path_in_repo,
            repo_type=self.resolved_path.repo_type,
            endpoint=self.fs.endpoint,
        )
        headers = self.fs._api._build_hf_headers()
        if self.loc > 0:
            # Resume from the current offset (e.g. after a mid-stream retry).
            headers["Range"] = f"bytes={self.loc}-"
        self.response = self._exit_stack.enter_context(
            http_stream_backoff(
                "GET",
                url,
                headers=headers,
                timeout=constants.HF_HUB_DOWNLOAD_TIMEOUT,
            )
        )

        try:
            hf_raise_for_status(self.response)
        except HfHubHTTPError as e:
            if e.response.status_code == 416:
                # Requested a range starting at/after EOF: treat as end of stream.
                self.response = None
                return
            raise
|
|
|
|
def safe_revision(revision: str) -> str:
    """Quote a revision for use in a URL path, leaving special refs (e.g. `refs/pr/1`) untouched."""
    if SPECIAL_REFS_REVISION_REGEX.match(revision):
        return revision
    return safe_quote(revision)
|
|
|
|
def safe_quote(s: str) -> str:
    """Percent-encode every reserved character in `s`, including `/` (nothing is kept safe)."""
    return quote(s, safe="")
|
|
|
|
def _raise_file_not_found(path: str, err: Optional[Exception]) -> NoReturn:
    """Raise a `FileNotFoundError` for `path`, annotated with the cause of the failure."""
    suffix = ""
    if isinstance(err, RepositoryNotFoundError):
        suffix = " (repository not found)"
    elif isinstance(err, RevisionNotFoundError):
        suffix = " (revision not found)"
    elif isinstance(err, HFValidationError):
        suffix = " (invalid repository id)"
    # Chain the original error so callers can still inspect the root cause.
    raise FileNotFoundError(path + suffix) from err
|
|
|
|
def reopen(fs: HfFileSystem, path: str, mode: str, block_size: int, cache_type: str):
    """Module-level helper used by `__reduce__` to reopen a file after unpickling."""
    return fs.open(path, mode=mode, block_size=block_size, cache_type=cache_type)
|
|
|
|
def _partial_read(response: httpx.Response, length: int = -1) -> bytes:
    """
    Read up to `length` bytes from a streamed response.
    If length == -1, read until EOF.
    """
    if length < -1:
        raise ValueError("length must be -1 or >= 0")
    if length == 0:
        return b""
    if length == -1:
        # Drain the stream entirely.
        return b"".join(response.iter_bytes())

    out = bytearray()
    # Re-chunk the stream to `length`-sized pieces so the first full chunk satisfies the read.
    for piece in response.iter_bytes(chunk_size=length):
        out.extend(piece)
        if len(out) >= length:
            return bytes(out[:length])

    # Stream ended before `length` bytes were available: return what we got.
    return bytes(out)
|
|
|
|
def make_instance(cls, args, kwargs, instance_state):
    """Recreate a filesystem instance from pickled constructor args and restore its cached state."""
    instance = cls(*args, **kwargs)
    for name, value in instance_state.items():
        setattr(instance, name, value)
    return instance
|
|
|
|
# Module-level default instance with default storage options, ready to use without
# constructing `HfFileSystem` explicitly.
hffs = HfFileSystem()
|
|