Spaces:
Sleeping
Sleeping
| import requests | |
| from ..spec import AbstractFileSystem | |
| from ..utils import infer_storage_options | |
| from .memory import MemoryFile | |
class GistFileSystem(AbstractFileSystem):
    """
    Interface to files in a single GitHub Gist.

    Provides read-only access to a gist's files. Gists do not contain
    subdirectories, so file listing is straightforward.

    Parameters
    ----------
    gist_id : str
        The ID of the gist you want to access (the long hex value from the URL).
    filenames : list[str] (optional)
        If provided, only these files are exposed by the file system. The gist
        metadata is still fetched once at construction time, and a
        ``FileNotFoundError`` is raised if any requested name is not in it.
    sha : str (optional)
        If provided, fetch a particular revision of the gist. If omitted,
        the latest revision is used.
    username : str (optional)
        GitHub username for authentication (required if token is given).
    token : str (optional)
        GitHub personal access token (required if username is given).
    timeout : (float, float) or float, optional
        Connect and read timeouts for requests (default 60s each).
    kwargs : dict
        Stored on `self.request_kw` and passed to `requests.get` when fetching Gist
        metadata or reading ("opening") a file.
    """

    protocol = "gist"
    gist_url = "https://api.github.com/gists/{gist_id}"
    gist_rev_url = "https://api.github.com/gists/{gist_id}/{sha}"

    def __init__(
        self,
        gist_id,
        filenames=None,
        sha=None,
        username=None,
        token=None,
        timeout=None,
        **kwargs,
    ):
        """
        Store the connection settings and eagerly load the gist's file listing.

        Raises
        ------
        ValueError
            If exactly one of ``username`` / ``token`` is supplied (and non-empty).
        FileNotFoundError
            If the gist (or revision) does not exist, or a requested filename
            is not part of it.
        """
        super().__init__()
        self.gist_id = gist_id
        self.filenames = filenames
        self.sha = sha  # revision of the gist (optional)
        if (username is None) ^ (token is None):
            # Exactly one of the two is None. Only complain when the other one
            # carries an actual (non-empty) value.
            if username or token:
                raise ValueError("Auth requires both username and token, or neither.")
        self.username = username
        self.token = token
        self.request_kw = kwargs
        # Default timeouts to 60s connect/read if none provided
        self.timeout = timeout if timeout is not None else (60, 60)

        # We use a single-level "directory" cache, because a gist is essentially flat
        self.dircache[""] = self._fetch_file_list()

    @property
    def kw(self):
        """
        Keyword arguments passed to every ``requests.get`` call.

        Includes ``auth=(username, token)`` when both are set. Because the
        user-supplied ``request_kw`` is merged last, an explicit ``auth`` entry
        there takes precedence.

        This must be a property: callers unpack it as ``**self.kw``.
        """
        if self.username is not None and self.token is not None:
            return {"auth": (self.username, self.token), **self.request_kw}
        return self.request_kw

    def _fetch_gist_metadata(self):
        """
        Fetch the JSON metadata for this gist (possibly for a specific revision).

        Raises
        ------
        FileNotFoundError
            If the API answers with HTTP 404.
        requests.HTTPError
            For any other unsuccessful status code (via ``raise_for_status``).
        """
        if self.sha:
            url = self.gist_rev_url.format(gist_id=self.gist_id, sha=self.sha)
        else:
            url = self.gist_url.format(gist_id=self.gist_id)

        r = requests.get(url, timeout=self.timeout, **self.kw)
        if r.status_code == 404:
            raise FileNotFoundError(
                f"Gist not found: {self.gist_id}@{self.sha or 'latest'}"
            )
        r.raise_for_status()
        return r.json()

    def _fetch_file_list(self):
        """
        Returns a list of dicts describing each file in the gist. These get stored
        in self.dircache[""].

        When ``self.filenames`` is set, only those files are included and a
        ``FileNotFoundError`` is raised for the first name missing from the gist.
        """
        meta = self._fetch_gist_metadata()
        available_files = meta.get("files", {})

        if self.filenames:
            files = {}
            for fn in self.filenames:
                if fn not in available_files:
                    raise FileNotFoundError(fn)
                files[fn] = available_files[fn]
        else:
            files = available_files

        return [
            {
                "name": fname,  # file's name
                "type": "file",  # gists have no subdirectories
                "size": finfo.get("size", 0),  # file size in bytes
                "raw_url": finfo.get("raw_url"),
            }
            for fname, finfo in files.items()
            # Occasionally GitHub returns a file entry with null if it was deleted
            if finfo is not None
        ]

    @classmethod
    def _strip_protocol(cls, path):
        """
        Remove 'gist://' from the path, if present.

        Returns the "path" component reported by ``infer_storage_options``
        (falling back to the input) with leading slashes removed.
        """
        path = infer_storage_options(path).get("path", path)
        return path.lstrip("/")

    @staticmethod
    def _get_kwargs_from_urls(path):
        """
        Parse 'gist://' style URLs into GistFileSystem constructor kwargs.

        For example:
            gist://:TOKEN@<gist_id>/file.txt
            gist://username:TOKEN@<gist_id>/file.txt

        Returns a dict that may contain ``username``, ``token``, ``gist_id``,
        ``sha`` and ``filenames``; keys whose URL component is empty are omitted.
        """
        so = infer_storage_options(path)
        out = {}
        if so.get("username"):
            out["username"] = so["username"]
        if so.get("password"):
            out["token"] = so["password"]
        if so.get("host"):
            # We interpret 'host' as the gist ID
            out["gist_id"] = so["host"]

        # Extract SHA and filename from path: the last two "/"-separated
        # components are taken as (sha, filename); an empty component is skipped.
        if so.get("path"):
            path_parts = so["path"].rsplit("/", 2)[-2:]
            if len(path_parts) == 2:
                if path_parts[0]:  # SHA present
                    out["sha"] = path_parts[0]
                if path_parts[1]:  # filename also present
                    out["filenames"] = [path_parts[1]]

        return out

    def ls(self, path="", detail=False, **kwargs):
        """
        List files in the gist. Gists are single-level, so any 'path' is basically
        the filename, or empty for all files.

        Parameters
        ----------
        path : str, optional
            The filename to list. If empty, returns all files in the gist.
        detail : bool, default False
            If True, return a list of dicts; if False, return a list of filenames.

        Raises
        ------
        FileNotFoundError
            If ``path`` is non-empty and no file of that name is cached.
        """
        path = self._strip_protocol(path or "")
        # If path is empty, return all
        if path == "":
            results = self.dircache[""]
        else:
            # We want just the single file with this name
            all_files = self.dircache[""]
            results = [f for f in all_files if f["name"] == path]
            if not results:
                raise FileNotFoundError(path)
        if detail:
            return results
        else:
            return sorted(f["name"] for f in results)

    def _open(self, path, mode="rb", block_size=None, **kwargs):
        """
        Read a single file from the gist.

        The whole file is downloaded from its ``raw_url`` and returned as an
        in-memory file object.

        Raises
        ------
        NotImplementedError
            If ``mode`` is anything other than ``"rb"``.
        FileNotFoundError
            If the file is not in the cached listing, has no ``raw_url``,
            or the download answers with HTTP 404.
        """
        if mode != "rb":
            raise NotImplementedError("GitHub Gist FS is read-only (no write).")

        path = self._strip_protocol(path)
        # Find the file entry in our dircache
        matches = [f for f in self.dircache[""] if f["name"] == path]
        if not matches:
            raise FileNotFoundError(path)
        finfo = matches[0]

        raw_url = finfo.get("raw_url")
        if not raw_url:
            raise FileNotFoundError(f"No raw_url for file: {path}")

        r = requests.get(raw_url, timeout=self.timeout, **self.kw)
        if r.status_code == 404:
            raise FileNotFoundError(path)
        r.raise_for_status()
        return MemoryFile(path, None, r.content)

    def cat(self, path, recursive=False, on_error="raise", **kwargs):
        """
        Return {path: contents} for the given file or files. If 'recursive' is True,
        and path is empty, returns all files in the gist.

        Parameters
        ----------
        on_error : str
            How a ``FileNotFoundError`` for an individual file is handled:
            ``"raise"`` re-raises it, ``"omit"`` leaves the file out of the
            result, and any other value stores the exception as the file's value.
        """
        paths = self.expand_path(path, recursive=recursive)
        out = {}
        for p in paths:
            try:
                with self.open(p, "rb") as f:
                    out[p] = f.read()
            except FileNotFoundError as e:
                if on_error == "raise":
                    raise
                elif on_error == "omit":
                    pass  # skip
                else:
                    out[p] = e
        return out