File size: 4,002 Bytes
e5dbee7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
import base64
import io
import re
import requests
import fsspec
class JupyterFileSystem(fsspec.AbstractFileSystem):
    """View of the files as seen by a Jupyter server (notebook or lab)"""

    protocol = ("jupyter", "jlab")

    def __init__(self, url, tok=None, **kwargs):
        """
        Parameters
        ----------
        url : str
            Base URL of the server, like "http://127.0.0.1:8888". May include
            token in the string, which is given by the process when starting up
        tok : str
            If the token is obtained separately, can be given here
        kwargs
            Passed on to the parent class constructor.

        Raises
        ------
        ValueError
            If ``url`` has a query string, ``tok`` is not given, and no
            ``token=...`` value can be found in ``url``.
        """
        if "?" in url:
            if tok is None:
                found = re.search(r"token=([a-z0-9]+)", url)
                if found is None:
                    raise ValueError("Could not determine token")
                tok = found.group(1)
            # The query string is dropped either way; only the base is kept.
            url = url.split("?", 1)[0]
        self.url = url.rstrip("/") + "/api/contents"
        self.session = requests.Session()
        if tok:
            self.session.headers["Authorization"] = f"token {tok}"
        super().__init__(**kwargs)

    @staticmethod
    def _check_response(response, path):
        """Raise if the server reported a failure for ``path``.

        A 404 becomes ``FileNotFoundError(path)``; any other error status is
        raised via ``response.raise_for_status()``.
        """
        if response.status_code == 404:
            raise FileNotFoundError(path)
        response.raise_for_status()

    def ls(self, path, detail=True, **kwargs):
        """List the entry at ``path``, or its children if it is a directory.

        Parameters
        ----------
        path : str
            Location on the server; a protocol prefix is stripped.
        detail : bool
            If True, return the list of info dicts; otherwise only the names.

        Returns
        -------
        list of dict, or list of str

        Raises
        ------
        FileNotFoundError
            If the server answers with 404.
        """
        path = self._strip_protocol(path)
        r = self.session.get(f"{self.url}/{path}")
        self._check_response(r, path)
        out = r.json()

        if out["type"] == "directory":
            entries = out["content"]
        else:
            entries = [out]
        for entry in entries:
            # Reshape each server record in place: "path" becomes "name".
            entry["name"] = entry.pop("path")
            # Listing never exposes file contents; tolerate a missing key.
            entry.pop("content", None)
            if entry["type"] == "notebook":
                entry["type"] = "file"
        if detail:
            return entries
        return [entry["name"] for entry in entries]

    def cat_file(self, path, start=None, end=None, **kwargs):
        """Fetch the contents of ``path`` as bytes.

        Parameters
        ----------
        path : str
            Location on the server; a protocol prefix is stripped.
        start, end : int or None
            Slice bounds applied to the downloaded bytes.

        Returns
        -------
        bytes

        Raises
        ------
        FileNotFoundError
            If the server answers with 404.
        """
        path = self._strip_protocol(path)
        r = self.session.get(f"{self.url}/{path}")
        self._check_response(r, path)
        out = r.json()
        fmt = out["format"]
        content = out["content"]
        if fmt == "text":
            # data should be binary
            data = content.encode()
        elif fmt == "json":
            # Content arrives as an already-parsed JSON object (presumably a
            # notebook model), which base64 cannot decode. Re-serialise it;
            # the bytes may differ in whitespace from the file on disk.
            import json

            data = json.dumps(content).encode()
        else:
            data = base64.b64decode(content)
        # The whole file is downloaded first; start/end only trim the result.
        return data[start:end]

    def pipe_file(self, path, value, **_):
        """Write the bytes ``value`` to ``path`` in a single request.

        Parameters
        ----------
        path : str
            Destination on the server; a protocol prefix is stripped.
        value : bytes
            Complete file contents, sent base64-encoded.

        Raises
        ------
        FileNotFoundError
            If the server answers with 404.
        requests.HTTPError
            For any other error status.
        """
        path = self._strip_protocol(path)
        payload = {
            "name": path.rsplit("/", 1)[-1],
            "path": path,
            "size": len(value),
            "content": base64.b64encode(value).decode(),
            "format": "base64",
            "type": "file",
        }
        r = self.session.put(f"{self.url}/{path}", json=payload)
        # Do not let a rejected upload pass silently.
        self._check_response(r, path)

    def mkdir(self, path, create_parents=True, **kwargs):
        """Create a directory at ``path``.

        Parameters
        ----------
        path : str
            Directory to create; a protocol prefix is stripped.
        create_parents : bool
            If True, a request is first issued for every ancestor of
            ``path``, outermost first.

        Raises
        ------
        FileNotFoundError
            If the server answers with 404.
        requests.HTTPError
            For any other error status.
        """
        path = self._strip_protocol(path)
        if create_parents and "/" in path:
            self.mkdir(path.rsplit("/", 1)[0], True)
        payload = {
            "name": path.rsplit("/", 1)[-1],
            "path": path,
            "size": None,
            "content": None,
            "type": "directory",
        }
        r = self.session.put(f"{self.url}/{path}", json=payload)
        self._check_response(r, path)

    def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs):
        """Rename ``path1`` to ``path2`` on the server.

        ``recursive`` and ``maxdepth`` are accepted but not used by this
        implementation. Nothing happens if both paths are the same.

        Raises
        ------
        FileNotFoundError
            If the server answers with 404.
        requests.HTTPError
            For any other error status.
        """
        # Strip the protocol like every other method, so that
        # "jupyter://..." style paths do not leak into the request.
        path1 = self._strip_protocol(path1)
        path2 = self._strip_protocol(path2)
        if path1 == path2:
            return
        r = self.session.patch(f"{self.url}/{path1}", json={"path": path2})
        self._check_response(r, path1)

    def _rm(self, path):
        """Delete the single entry at ``path``.

        Raises
        ------
        FileNotFoundError
            If the server answers with 404.
        requests.HTTPError
            For any other error status.
        """
        path = self._strip_protocol(path)
        r = self.session.delete(f"{self.url}/{path}")
        self._check_response(r, path)

    def _open(self, path, mode="rb", **kwargs):
        """Return a file-like object for ``path``.

        For ``mode == "rb"`` the whole file is downloaded and wrapped in an
        in-memory ``io.BytesIO``. Any other mode yields a ``SimpleFileWriter``
        opened with mode "wb". Extra ``kwargs`` are not forwarded.
        """
        path = self._strip_protocol(path)
        if mode == "rb":
            data = self.cat_file(path)
            return io.BytesIO(data)
        else:
            return SimpleFileWriter(self, path, mode="wb")
class SimpleFileWriter(fsspec.spec.AbstractBufferedFile):
    """Buffered writer that sends the whole file in one request."""

    def _upload_chunk(self, final=False):
        """Send the buffered data, but only once the file is complete.

        Intermediate calls return False without sending anything. On the
        final call the entire buffer is read and handed to the filesystem's
        ``pipe_file``. Not suitable for large files, since all data is held
        in the buffer until then.
        """
        if final is not False:
            # Rewind so the read below picks up everything written so far.
            self.buffer.seek(0)
            self.fs.pipe_file(self.path, self.buffer.read())
            return None
        return False
|