import base64
import urllib.parse

import requests

from fsspec import AbstractFileSystem
from fsspec.spec import AbstractBufferedFile

class DatabricksException(Exception):
    """
    Helper class for exceptions raised in this module.
    """

    def __init__(self, error_code, message):
        """Create a new DatabricksException"""
        super().__init__(message)

        self.error_code = error_code
        self.message = message
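
# For reference: this module assumes DBFS API error responses carry a JSON
# payload of the form {"error_code": "...", "message": "..."}, which
# _send_to_api() below unpacks directly into DatabricksException(**exception_json).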
|
class DatabricksFileSystem(AbstractFileSystem):
    """
    Get access to the Databricks filesystem implementation over HTTP.
    Can be used inside and outside of a Databricks cluster.
    """

    def __init__(self, instance, token, **kwargs):
        """
        Create a new DatabricksFileSystem.

        Parameters
        ----------
        instance: str
            The instance URL of the Databricks cluster.
            For example, for an Azure Databricks cluster this has the form
            adb-<some-number>.<two digits>.azuredatabricks.net.
        token: str
            Your personal access token. Find out more here:
            https://docs.databricks.com/dev-tools/api/latest/authentication.html
        """
        self.instance = instance
        self.token = token

        self.session = requests.Session()
        self.session.headers.update({"Authorization": f"Bearer {self.token}"})

        super().__init__(**kwargs)
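    # A minimal usage sketch with hypothetical instance and token values:
    #
    #     fs = DatabricksFileSystem(
    #         instance="adb-1234567890123456.7.azuredatabricks.net",
    #         token="<your-personal-access-token>",
    #     )
    #     fs.ls("/")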
|
    def ls(self, path, detail=True):
        """
        List the contents of the given path.

        Parameters
        ----------
        path: str
            Absolute path
        detail: bool
            Return not only the list of filenames,
            but also additional information on file sizes
            and types.
        """
        out = self._ls_from_cache(path)
        if not out:
            try:
                r = self._send_to_api(
                    method="get", endpoint="list", json={"path": path}
                )
            except DatabricksException as e:
                if e.error_code == "RESOURCE_DOES_NOT_EXIST":
                    raise FileNotFoundError(e.message)

                raise e
            files = r["files"]
            out = [
                {
                    "name": o["path"],
                    "type": "directory" if o["is_dir"] else "file",
                    "size": o["file_size"],
                }
                for o in files
            ]
            self.dircache[path] = out

        if detail:
            return out
        return [o["name"] for o in out]
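    # With detail=True, each ls() entry is a plain dict, e.g. (hypothetical path):
    #
    #     {"name": "/mnt/data/file.csv", "type": "file", "size": 1024}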
|
    def makedirs(self, path, exist_ok=True):
        """
        Create a given absolute path and all of its parents.

        Parameters
        ----------
        path: str
            Absolute path to create
        exist_ok: bool
            If false, check if the folder
            exists before creating it (and raise an
            Exception if it does)
        """
        if not exist_ok:
            try:
                # If the following succeeds, the path is already present
                self._send_to_api(
                    method="get", endpoint="get-status", json={"path": path}
                )
                raise FileExistsError(f"Path {path} already exists")
            except DatabricksException as e:
                if e.error_code == "RESOURCE_DOES_NOT_EXIST":
                    pass

        try:
            self._send_to_api(method="post", endpoint="mkdirs", json={"path": path})
        except DatabricksException as e:
            if e.error_code == "RESOURCE_ALREADY_EXISTS":
                raise FileExistsError(e.message)

            raise e
        self.invalidate_cache(self._parent(path))
|
    def mkdir(self, path, create_parents=True, **kwargs):
        """
        Create a given absolute path and all of its parents.

        Parameters
        ----------
        path: str
            Absolute path to create
        create_parents: bool
            Whether to create all parents or not.
            ``False`` is not implemented so far.
        """
        if not create_parents:
            raise NotImplementedError

        self.mkdirs(path, **kwargs)
|
    def rm(self, path, recursive=False):
        """
        Remove the file or folder at the given absolute path.

        Parameters
        ----------
        path: str
            Absolute path to remove
        recursive: bool
            Recursively delete all files in a folder.
        """
        try:
            self._send_to_api(
                method="post",
                endpoint="delete",
                json={"path": path, "recursive": recursive},
            )
        except DatabricksException as e:
            # PARTIAL_DELETE is not really an error: it just means not
            # everything was deleted so far, so retry until done
            if e.error_code == "PARTIAL_DELETE":
                self.rm(path=path, recursive=recursive)
            elif e.error_code == "IO_ERROR":
                # Use the same exception the os module would use here
                raise OSError(e.message)
            else:
                raise e
        self.invalidate_cache(self._parent(path))
|
    def mv(self, source_path, destination_path, recursive=False, maxdepth=None):
        """
        Move a source to a destination path.

        A note from the original [Databricks API manual]
        (https://docs.databricks.com/dev-tools/api/latest/dbfs.html#move).

        When moving a large number of files the API call will time out after
        approximately 60s, potentially resulting in partially moved data.
        Therefore, for operations that move more than 10k files, we strongly
        discourage using the DBFS REST API.

        Parameters
        ----------
        source_path: str
            From where to move (absolute path)
        destination_path: str
            To where to move (absolute path)
        recursive: bool
            Not implemented so far.
        maxdepth:
            Not implemented so far.
        """
        if recursive:
            raise NotImplementedError
        if maxdepth:
            raise NotImplementedError

        try:
            self._send_to_api(
                method="post",
                endpoint="move",
                json={"source_path": source_path, "destination_path": destination_path},
            )
        except DatabricksException as e:
            if e.error_code == "RESOURCE_DOES_NOT_EXIST":
                raise FileNotFoundError(e.message)
            elif e.error_code == "RESOURCE_ALREADY_EXISTS":
                raise FileExistsError(e.message)

            raise e
        self.invalidate_cache(self._parent(source_path))
        self.invalidate_cache(self._parent(destination_path))
|
    def _open(self, path, mode="rb", block_size="default", **kwargs):
        """
        Override the base class method to make sure a DatabricksFile is created.
        All arguments are copied from the base method.

        Only the default block size is allowed.
        """
        return DatabricksFile(self, path, mode=mode, block_size=block_size, **kwargs)
|
    def _send_to_api(self, method, endpoint, json):
        """
        Send the given json to the DBFS API
        using a get or post request (specified by the argument `method`).

        Parameters
        ----------
        method: str
            Which http method to use for communication; "get" or "post".
        endpoint: str
            Where to send the request to (last part of the API URL)
        json: dict
            Dictionary of information to send
        """
        if method == "post":
            session_call = self.session.post
        elif method == "get":
            session_call = self.session.get
        else:
            raise ValueError(f"Do not understand method {method}")

        url = urllib.parse.urljoin(f"https://{self.instance}/api/2.0/dbfs/", endpoint)

        r = session_call(url, json=json)

        # The DBFS API returns a json body also in case of an exception;
        # we want to preserve this information as well as possible.
        try:
            r.raise_for_status()
        except requests.HTTPError as e:
            # Try to extract the json error message;
            # if that fails, fall back to the original exception.
            try:
                exception_json = e.response.json()
            except Exception:
                raise e

            raise DatabricksException(**exception_json)

        return r.json()
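    # For example, endpoint="list" resolves to
    # https://<instance>/api/2.0/dbfs/list via the urljoin above.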
|
    def _create_handle(self, path, overwrite=True):
        """
        Internal function to create a handle, which can be used to
        write blocks of a file to DBFS.
        A handle has a unique identifier which needs to be passed
        with every write during this transaction.
        The handle is active for 10 minutes - after that, a new
        write transaction needs to be created.
        Make sure to close the handle after you are finished.

        Parameters
        ----------
        path: str
            Absolute path for this file.
        overwrite: bool
            If a file already exists at this location, either overwrite
            it or raise an exception.
        """
        try:
            r = self._send_to_api(
                method="post",
                endpoint="create",
                json={"path": path, "overwrite": overwrite},
            )
            return r["handle"]
        except DatabricksException as e:
            if e.error_code == "RESOURCE_ALREADY_EXISTS":
                raise FileExistsError(e.message)

            raise e
|
    def _close_handle(self, handle):
        """
        Close a handle, which was opened by :func:`_create_handle`.

        Parameters
        ----------
        handle: str
            Which handle to close.
        """
        try:
            self._send_to_api(method="post", endpoint="close", json={"handle": handle})
        except DatabricksException as e:
            if e.error_code == "RESOURCE_DOES_NOT_EXIST":
                raise FileNotFoundError(e.message)

            raise e
|
    def _add_data(self, handle, data):
        """
        Upload data to an already opened file handle
        (opened by :func:`_create_handle`).
        The maximal allowed data size is 1MB after
        conversion to base64.
        Remember to close the handle when you are finished.

        Parameters
        ----------
        handle: str
            Which handle to upload data to.
        data: bytes
            Block of data to add to the handle.
        """
        data = base64.b64encode(data).decode()
        try:
            self._send_to_api(
                method="post",
                endpoint="add-block",
                json={"handle": handle, "data": data},
            )
        except DatabricksException as e:
            if e.error_code == "RESOURCE_DOES_NOT_EXIST":
                raise FileNotFoundError(e.message)
            elif e.error_code == "MAX_BLOCK_SIZE_EXCEEDED":
                raise ValueError(e.message)

            raise e
|
    def _get_data(self, path, start, end):
        """
        Download a block of data in bytes from the given absolute path,
        covering the half-open range [start, end).
        The maximum number of allowed bytes to read is 1MB.

        Parameters
        ----------
        path: str
            Absolute path to download data from
        start: int
            Start position of the block
        end: int
            End position of the block
        """
        try:
            r = self._send_to_api(
                method="get",
                endpoint="read",
                json={"path": path, "offset": start, "length": end - start},
            )
            return base64.b64decode(r["data"])
        except DatabricksException as e:
            if e.error_code == "RESOURCE_DOES_NOT_EXIST":
                raise FileNotFoundError(e.message)
            elif e.error_code in ["INVALID_PARAMETER_VALUE", "MAX_READ_SIZE_EXCEEDED"]:
                raise ValueError(e.message)

            raise e
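    # Because a single read is capped at 1MB, larger ranges are fetched in
    # block-sized chunks by DatabricksFile._fetch_range below.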
|
    def invalidate_cache(self, path=None):
        if path is None:
            self.dircache.clear()
        else:
            self.dircache.pop(path, None)
        super().invalidate_cache(path)
|
class DatabricksFile(AbstractBufferedFile):
    """
    Helper class for files referenced in the DatabricksFileSystem.
    """

    DEFAULT_BLOCK_SIZE = 1 * 2**20
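    # Note: 1 * 2**20 bytes is 1MB, matching the per-request limit of the
    # add-block and read endpoints (see _add_data and _get_data above), which
    # is presumably why only this block size is allowed.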
|
    def __init__(
        self,
        fs,
        path,
        mode="rb",
        block_size="default",
        autocommit=True,
        cache_type="readahead",
        cache_options=None,
        **kwargs,
    ):
        """
        Create a new instance of the DatabricksFile.

        The blocksize needs to be the default one.
        """
        if block_size is None or block_size == "default":
            block_size = self.DEFAULT_BLOCK_SIZE

        assert (
            block_size == self.DEFAULT_BLOCK_SIZE
        ), f"Only the default block size is allowed, not {block_size}"

        super().__init__(
            fs,
            path,
            mode=mode,
            block_size=block_size,
            autocommit=autocommit,
            cache_type=cache_type,
            cache_options=cache_options or {},
            **kwargs,
        )
|
    def _initiate_upload(self):
        """Internal function to start a file upload"""
        self.handle = self.fs._create_handle(self.path)

    def _upload_chunk(self, final=False):
        """Internal function to add a chunk of data to a started upload"""
        self.buffer.seek(0)
        data = self.buffer.getvalue()

        data_chunks = [
            data[start:end] for start, end in self._to_sized_blocks(len(data))
        ]

        for data_chunk in data_chunks:
            self.fs._add_data(handle=self.handle, data=data_chunk)

        if final:
            self.fs._close_handle(handle=self.handle)
            return True
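    # Write path in a nutshell: fsspec's buffered-file machinery calls
    # _initiate_upload() once, then _upload_chunk() for every flushed buffer;
    # each buffer is split into 1MB pieces for the add-block endpoint, and the
    # handle is closed when the final chunk has been sent.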
|
    def _fetch_range(self, start, end):
        """Internal function to download a block of data"""
        return_buffer = b""
        length = end - start
        for chunk_start, chunk_end in self._to_sized_blocks(length, start):
            return_buffer += self.fs._get_data(
                path=self.path, start=chunk_start, end=chunk_end
            )

        return return_buffer
|
    def _to_sized_blocks(self, length, start=0):
        """Helper function to split the range [start, start + length) into block-sized chunks"""
        end = start + length
        for data_chunk in range(start, end, self.blocksize):
            data_start = data_chunk
            data_end = min(end, data_chunk + self.blocksize)
            yield data_start, data_end
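
# A minimal end-to-end sketch (hypothetical instance/token; requires network
# access to a real Databricks workspace, so it is illustrative only):
if __name__ == "__main__":
    fs = DatabricksFileSystem(
        instance="adb-1234567890123456.7.azuredatabricks.net",
        token="<your-personal-access-token>",
    )
    # Write a small file through the buffered DatabricksFile interface ...
    with fs.open("/tmp/example.txt", "wb") as f:
        f.write(b"hello dbfs")
    # ... and read it back in one ranged request.
    with fs.open("/tmp/example.txt", "rb") as f:
        print(f.read())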
|