| | import dask |
| | from distributed.client import Client, _get_global_client |
| | from distributed.worker import Worker |
| |
|
| | from fsspec import filesystem |
| | from fsspec.spec import AbstractBufferedFile, AbstractFileSystem |
| | from fsspec.utils import infer_storage_options |
| |
|
| |
|
| | def _get_client(client): |
| | if client is None: |
| | return _get_global_client() |
| | elif isinstance(client, Client): |
| | return client |
| | else: |
| | |
| | return Client(client) |
| |
|
| |
|
| | def _in_worker(): |
| | return bool(Worker._instances) |
| |
|
| |
|
| | class DaskWorkerFileSystem(AbstractFileSystem): |
| | """View files accessible to a worker as any other remote file-system |
| | |
| | When instances are run on the worker, uses the real filesystem. When |
| | run on the client, they call the worker to provide information or data. |
| | |
| | **Warning** this implementation is experimental, and read-only for now. |
| | """ |
| |
|
| | def __init__( |
| | self, target_protocol=None, target_options=None, fs=None, client=None, **kwargs |
| | ): |
| | super().__init__(**kwargs) |
| | if not (fs is None) ^ (target_protocol is None): |
| | raise ValueError( |
| | "Please provide one of filesystem instance (fs) or" |
| | " target_protocol, not both" |
| | ) |
| | self.target_protocol = target_protocol |
| | self.target_options = target_options |
| | self.worker = None |
| | self.client = client |
| | self.fs = fs |
| | self._determine_worker() |
| |
|
| | @staticmethod |
| | def _get_kwargs_from_urls(path): |
| | so = infer_storage_options(path) |
| | if "host" in so and "port" in so: |
| | return {"client": f"{so['host']}:{so['port']}"} |
| | else: |
| | return {} |
| |
|
| | def _determine_worker(self): |
| | if _in_worker(): |
| | self.worker = True |
| | if self.fs is None: |
| | self.fs = filesystem( |
| | self.target_protocol, **(self.target_options or {}) |
| | ) |
| | else: |
| | self.worker = False |
| | self.client = _get_client(self.client) |
| | self.rfs = dask.delayed(self) |
| |
|
| | def mkdir(self, *args, **kwargs): |
| | if self.worker: |
| | self.fs.mkdir(*args, **kwargs) |
| | else: |
| | self.rfs.mkdir(*args, **kwargs).compute() |
| |
|
| | def rm(self, *args, **kwargs): |
| | if self.worker: |
| | self.fs.rm(*args, **kwargs) |
| | else: |
| | self.rfs.rm(*args, **kwargs).compute() |
| |
|
| | def copy(self, *args, **kwargs): |
| | if self.worker: |
| | self.fs.copy(*args, **kwargs) |
| | else: |
| | self.rfs.copy(*args, **kwargs).compute() |
| |
|
| | def mv(self, *args, **kwargs): |
| | if self.worker: |
| | self.fs.mv(*args, **kwargs) |
| | else: |
| | self.rfs.mv(*args, **kwargs).compute() |
| |
|
| | def ls(self, *args, **kwargs): |
| | if self.worker: |
| | return self.fs.ls(*args, **kwargs) |
| | else: |
| | return self.rfs.ls(*args, **kwargs).compute() |
| |
|
| | def _open( |
| | self, |
| | path, |
| | mode="rb", |
| | block_size=None, |
| | autocommit=True, |
| | cache_options=None, |
| | **kwargs, |
| | ): |
| | if self.worker: |
| | return self.fs._open( |
| | path, |
| | mode=mode, |
| | block_size=block_size, |
| | autocommit=autocommit, |
| | cache_options=cache_options, |
| | **kwargs, |
| | ) |
| | else: |
| | return DaskFile( |
| | fs=self, |
| | path=path, |
| | mode=mode, |
| | block_size=block_size, |
| | autocommit=autocommit, |
| | cache_options=cache_options, |
| | **kwargs, |
| | ) |
| |
|
| | def fetch_range(self, path, mode, start, end): |
| | if self.worker: |
| | with self._open(path, mode) as f: |
| | f.seek(start) |
| | return f.read(end - start) |
| | else: |
| | return self.rfs.fetch_range(path, mode, start, end).compute() |
| |
|
| |
|
| | class DaskFile(AbstractBufferedFile): |
| | def __init__(self, mode="rb", **kwargs): |
| | if mode != "rb": |
| | raise ValueError('Remote dask files can only be opened in "rb" mode') |
| | super().__init__(**kwargs) |
| |
|
| | def _upload_chunk(self, final=False): |
| | pass |
| |
|
| | def _initiate_upload(self): |
| | """Create remote file/upload""" |
| | pass |
| |
|
| | def _fetch_range(self, start, end): |
| | """Get the specified set of bytes from remote""" |
| | return self.fs.fetch_range(self.path, self.mode, start, end) |
| |
|