| | import datetime |
| | import logging |
| | import os |
| | import types |
| | import uuid |
| | from stat import S_ISDIR, S_ISLNK |
| |
|
| | import paramiko |
| |
|
| | from .. import AbstractFileSystem |
| | from ..utils import infer_storage_options |
| |
|
| | logger = logging.getLogger("fsspec.sftp") |
| |
|
| |
|
| | class SFTPFileSystem(AbstractFileSystem): |
| | """Files over SFTP/SSH |
| | |
| | Peer-to-peer filesystem over SSH using paramiko. |
| | |
| | Note: if using this with the ``open`` or ``open_files``, with full URLs, |
| | there is no way to tell if a path is relative, so all paths are assumed |
| | to be absolute. |
| | """ |
| |
|
| | protocol = "sftp", "ssh" |
| |
|
| | def __init__(self, host, **ssh_kwargs): |
| | """ |
| | |
| | Parameters |
| | ---------- |
| | host: str |
| | Hostname or IP as a string |
| | temppath: str |
| | Location on the server to put files, when within a transaction |
| | ssh_kwargs: dict |
| | Parameters passed on to connection. See details in |
| | https://docs.paramiko.org/en/3.3/api/client.html#paramiko.client.SSHClient.connect |
| | May include port, username, password... |
| | """ |
| | if self._cached: |
| | return |
| | super().__init__(**ssh_kwargs) |
| | self.temppath = ssh_kwargs.pop("temppath", "/tmp") |
| | self.host = host |
| | self.ssh_kwargs = ssh_kwargs |
| | self._connect() |
| |
|
| | def _connect(self): |
| | logger.debug("Connecting to SFTP server %s", self.host) |
| | self.client = paramiko.SSHClient() |
| | self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) |
| | self.client.connect(self.host, **self.ssh_kwargs) |
| | self.ftp = self.client.open_sftp() |
| |
|
| | @classmethod |
| | def _strip_protocol(cls, path): |
| | return infer_storage_options(path)["path"] |
| |
|
| | @staticmethod |
| | def _get_kwargs_from_urls(urlpath): |
| | out = infer_storage_options(urlpath) |
| | out.pop("path", None) |
| | out.pop("protocol", None) |
| | return out |
| |
|
| | def mkdir(self, path, create_parents=True, mode=511): |
| | logger.debug("Creating folder %s", path) |
| | if self.exists(path): |
| | raise FileExistsError(f"File exists: {path}") |
| |
|
| | if create_parents: |
| | self.makedirs(path) |
| | else: |
| | self.ftp.mkdir(path, mode) |
| |
|
| | def makedirs(self, path, exist_ok=False, mode=511): |
| | if self.exists(path) and not exist_ok: |
| | raise FileExistsError(f"File exists: {path}") |
| |
|
| | parts = path.split("/") |
| | new_path = "/" if path[:1] == "/" else "" |
| |
|
| | for part in parts: |
| | if part: |
| | new_path = f"{new_path}/{part}" if new_path else part |
| | if not self.exists(new_path): |
| | self.ftp.mkdir(new_path, mode) |
| |
|
| | def rmdir(self, path): |
| | logger.debug("Removing folder %s", path) |
| | self.ftp.rmdir(path) |
| |
|
| | def info(self, path): |
| | stat = self._decode_stat(self.ftp.stat(path)) |
| | stat["name"] = path |
| | return stat |
| |
|
| | @staticmethod |
| | def _decode_stat(stat, parent_path=None): |
| | if S_ISDIR(stat.st_mode): |
| | t = "directory" |
| | elif S_ISLNK(stat.st_mode): |
| | t = "link" |
| | else: |
| | t = "file" |
| | out = { |
| | "name": "", |
| | "size": stat.st_size, |
| | "type": t, |
| | "uid": stat.st_uid, |
| | "gid": stat.st_gid, |
| | "time": datetime.datetime.fromtimestamp( |
| | stat.st_atime, tz=datetime.timezone.utc |
| | ), |
| | "mtime": datetime.datetime.fromtimestamp( |
| | stat.st_mtime, tz=datetime.timezone.utc |
| | ), |
| | } |
| | if parent_path: |
| | out["name"] = "/".join([parent_path.rstrip("/"), stat.filename]) |
| | return out |
| |
|
| | def ls(self, path, detail=False): |
| | logger.debug("Listing folder %s", path) |
| | stats = [self._decode_stat(stat, path) for stat in self.ftp.listdir_iter(path)] |
| | if detail: |
| | return stats |
| | else: |
| | paths = [stat["name"] for stat in stats] |
| | return sorted(paths) |
| |
|
| | def put(self, lpath, rpath, callback=None, **kwargs): |
| | logger.debug("Put file %s into %s", lpath, rpath) |
| | self.ftp.put(lpath, rpath) |
| |
|
| | def get_file(self, rpath, lpath, **kwargs): |
| | if self.isdir(rpath): |
| | os.makedirs(lpath, exist_ok=True) |
| | else: |
| | self.ftp.get(self._strip_protocol(rpath), lpath) |
| |
|
| | def _open(self, path, mode="rb", block_size=None, **kwargs): |
| | """ |
| | block_size: int or None |
| | If 0, no buffering, if 1, line buffering, if >1, buffer that many |
| | bytes, if None use default from paramiko. |
| | """ |
| | logger.debug("Opening file %s", path) |
| | if kwargs.get("autocommit", True) is False: |
| | |
| | path2 = "/".join([self.temppath, str(uuid.uuid4())]) |
| | f = self.ftp.open(path2, mode, bufsize=block_size if block_size else -1) |
| | f.temppath = path2 |
| | f.targetpath = path |
| | f.fs = self |
| | f.commit = types.MethodType(commit_a_file, f) |
| | f.discard = types.MethodType(discard_a_file, f) |
| | else: |
| | f = self.ftp.open(path, mode, bufsize=block_size if block_size else -1) |
| | return f |
| |
|
| | def _rm(self, path): |
| | if self.isdir(path): |
| | self.ftp.rmdir(path) |
| | else: |
| | self.ftp.remove(path) |
| |
|
| | def mv(self, old, new): |
| | logger.debug("Renaming %s into %s", old, new) |
| | self.ftp.posix_rename(old, new) |
| |
|
| |
|
| | def commit_a_file(self): |
| | self.fs.mv(self.temppath, self.targetpath) |
| |
|
| |
|
| | def discard_a_file(self): |
| | self.fs._rm(self.temppath) |
| |
|