| """SSH remote execution environment with ControlMaster connection persistence.""" |
|
|
| import hashlib |
| import logging |
| import os |
| import shlex |
| import shutil |
| import subprocess |
| import tempfile |
| from pathlib import Path |
|
|
| from tools.environments.base import BaseEnvironment, _popen_bash |
| from tools.environments.file_sync import ( |
| FileSyncManager, |
| iter_sync_files, |
| quoted_mkdir_command, |
| quoted_rm_command, |
| unique_parent_dirs, |
| ) |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| def _ensure_ssh_available() -> None: |
| """Fail fast with a clear error when the SSH client is unavailable.""" |
| if not shutil.which("ssh"): |
| raise RuntimeError( |
| "SSH is not installed or not in PATH. Install OpenSSH client: apt install openssh-client" |
| ) |
|
|
|
|
| class SSHEnvironment(BaseEnvironment): |
| """Run commands on a remote machine over SSH. |
| |
| Spawn-per-call: every execute() spawns a fresh ``ssh ... bash -c`` process. |
| Session snapshot preserves env vars across calls. |
| CWD persists via in-band stdout markers. |
| Uses SSH ControlMaster for connection reuse. |
| """ |
|
|
| def __init__(self, host: str, user: str, cwd: str = "~", |
| timeout: int = 60, port: int = 22, key_path: str = ""): |
| super().__init__(cwd=cwd, timeout=timeout) |
| self.host = host |
| self.user = user |
| self.port = port |
| self.key_path = key_path |
|
|
| self.control_dir = Path(tempfile.gettempdir()) / "hermes-ssh" |
| self.control_dir.mkdir(parents=True, exist_ok=True) |
| |
| |
| |
| |
| |
| |
| |
| |
| _socket_id = hashlib.sha256( |
| f"{user}@{host}:{port}".encode() |
| ).hexdigest()[:16] |
| self.control_socket = self.control_dir / f"{_socket_id}.sock" |
| _ensure_ssh_available() |
| self._establish_connection() |
| self._remote_home = self._detect_remote_home() |
|
|
| self._ensure_remote_dirs() |
| self._sync_manager = FileSyncManager( |
| get_files_fn=lambda: iter_sync_files(f"{self._remote_home}/.hermes"), |
| upload_fn=self._scp_upload, |
| delete_fn=self._ssh_delete, |
| bulk_upload_fn=self._ssh_bulk_upload, |
| bulk_download_fn=self._ssh_bulk_download, |
| ) |
| self._sync_manager.sync(force=True) |
|
|
| self.init_session() |
|
|
| def _build_ssh_command(self, extra_args: list | None = None) -> list: |
| cmd = ["ssh"] |
| cmd.extend(["-o", f"ControlPath={self.control_socket}"]) |
| cmd.extend(["-o", "ControlMaster=auto"]) |
| cmd.extend(["-o", "ControlPersist=300"]) |
| cmd.extend(["-o", "BatchMode=yes"]) |
| cmd.extend(["-o", "StrictHostKeyChecking=accept-new"]) |
| cmd.extend(["-o", "ConnectTimeout=10"]) |
| if self.port != 22: |
| cmd.extend(["-p", str(self.port)]) |
| if self.key_path: |
| cmd.extend(["-i", self.key_path]) |
| if extra_args: |
| cmd.extend(extra_args) |
| cmd.append(f"{self.user}@{self.host}") |
| return cmd |
|
|
| def _establish_connection(self): |
| cmd = self._build_ssh_command() |
| cmd.append("echo 'SSH connection established'") |
| try: |
| result = subprocess.run(cmd, capture_output=True, text=True, timeout=15) |
| if result.returncode != 0: |
| error_msg = result.stderr.strip() or result.stdout.strip() |
| raise RuntimeError(f"SSH connection failed: {error_msg}") |
| except subprocess.TimeoutExpired: |
| raise RuntimeError(f"SSH connection to {self.user}@{self.host} timed out") |
|
|
| def _detect_remote_home(self) -> str: |
| """Detect the remote user's home directory.""" |
| try: |
| cmd = self._build_ssh_command() |
| cmd.append("echo $HOME") |
| result = subprocess.run(cmd, capture_output=True, text=True, timeout=10) |
| home = result.stdout.strip() |
| if home and result.returncode == 0: |
| logger.debug("SSH: remote home = %s", home) |
| return home |
| except Exception: |
| pass |
| if self.user == "root": |
| return "/root" |
| return f"/home/{self.user}" |
|
|
| |
| |
| |
|
|
| def _ensure_remote_dirs(self) -> None: |
| """Create base ~/.hermes directory tree on remote in one SSH call.""" |
| base = f"{self._remote_home}/.hermes" |
| dirs = [base, f"{base}/skills", f"{base}/credentials", f"{base}/cache"] |
| cmd = self._build_ssh_command() |
| cmd.append(quoted_mkdir_command(dirs)) |
| subprocess.run(cmd, capture_output=True, text=True, timeout=10) |
|
|
| |
|
|
| def _scp_upload(self, host_path: str, remote_path: str) -> None: |
| """Upload a single file via scp over ControlMaster.""" |
| parent = str(Path(remote_path).parent) |
| mkdir_cmd = self._build_ssh_command() |
| mkdir_cmd.append(f"mkdir -p {shlex.quote(parent)}") |
| subprocess.run(mkdir_cmd, capture_output=True, text=True, timeout=10) |
|
|
| scp_cmd = ["scp", "-o", f"ControlPath={self.control_socket}"] |
| if self.port != 22: |
| scp_cmd.extend(["-P", str(self.port)]) |
| if self.key_path: |
| scp_cmd.extend(["-i", self.key_path]) |
| scp_cmd.extend([host_path, f"{self.user}@{self.host}:{remote_path}"]) |
| result = subprocess.run(scp_cmd, capture_output=True, text=True, timeout=30) |
| if result.returncode != 0: |
| raise RuntimeError(f"scp failed: {result.stderr.strip()}") |
|
|
| def _ssh_bulk_upload(self, files: list[tuple[str, str]]) -> None: |
| """Upload many files in a single tar-over-SSH stream. |
| |
| Pipes ``tar c`` on the local side through an SSH connection to |
| ``tar x`` on the remote, transferring all files in one TCP stream |
| instead of spawning a subprocess per file. Directory creation is |
| batched into a single ``mkdir -p`` call beforehand. |
| |
| Typical improvement: ~580 files goes from O(N) scp round-trips |
| to a single streaming transfer. |
| """ |
| if not files: |
| return |
|
|
| parents = unique_parent_dirs(files) |
| if parents: |
| cmd = self._build_ssh_command() |
| cmd.append(quoted_mkdir_command(parents)) |
| result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) |
| if result.returncode != 0: |
| raise RuntimeError(f"remote mkdir failed: {result.stderr.strip()}") |
|
|
| |
| with tempfile.TemporaryDirectory(prefix="hermes-ssh-bulk-") as staging: |
| for host_path, remote_path in files: |
| staged = os.path.join(staging, remote_path.lstrip("/")) |
| os.makedirs(os.path.dirname(staged), exist_ok=True) |
| os.symlink(os.path.abspath(host_path), staged) |
|
|
| tar_cmd = ["tar", "-chf", "-", "-C", staging, "."] |
| ssh_cmd = self._build_ssh_command() |
| ssh_cmd.append("tar xf - -C /") |
|
|
| tar_proc = subprocess.Popen( |
| tar_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE |
| ) |
| try: |
| ssh_proc = subprocess.Popen( |
| ssh_cmd, stdin=tar_proc.stdout, stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE, |
| ) |
| except Exception: |
| tar_proc.kill() |
| tar_proc.wait() |
| raise |
|
|
| |
| tar_proc.stdout.close() |
|
|
| try: |
| _, ssh_stderr = ssh_proc.communicate(timeout=120) |
| |
| |
| tar_stderr_raw = b"" |
| if tar_proc.poll() is None: |
| _, tar_stderr_raw = tar_proc.communicate(timeout=10) |
| else: |
| tar_stderr_raw = tar_proc.stderr.read() if tar_proc.stderr else b"" |
| except subprocess.TimeoutExpired: |
| tar_proc.kill() |
| ssh_proc.kill() |
| tar_proc.wait() |
| ssh_proc.wait() |
| raise RuntimeError("SSH bulk upload timed out") |
|
|
| if tar_proc.returncode != 0: |
| raise RuntimeError( |
| f"tar create failed (rc={tar_proc.returncode}): " |
| f"{tar_stderr_raw.decode(errors='replace').strip()}" |
| ) |
| if ssh_proc.returncode != 0: |
| raise RuntimeError( |
| f"tar extract over SSH failed (rc={ssh_proc.returncode}): " |
| f"{ssh_stderr.decode(errors='replace').strip()}" |
| ) |
|
|
| logger.debug("SSH: bulk-uploaded %d file(s) via tar pipe", len(files)) |
|
|
| def _ssh_bulk_download(self, dest: Path) -> None: |
| """Download remote .hermes/ as a tar archive.""" |
| |
| |
| rel_base = f"{self._remote_home}/.hermes".lstrip("/") |
| ssh_cmd = self._build_ssh_command() |
| ssh_cmd.append(f"tar cf - -C / {shlex.quote(rel_base)}") |
| with open(dest, "wb") as f: |
| result = subprocess.run(ssh_cmd, stdout=f, stderr=subprocess.PIPE, timeout=120) |
| if result.returncode != 0: |
| raise RuntimeError(f"SSH bulk download failed: {result.stderr.decode(errors='replace').strip()}") |
|
|
| def _ssh_delete(self, remote_paths: list[str]) -> None: |
| """Batch-delete remote files in one SSH call.""" |
| cmd = self._build_ssh_command() |
| cmd.append(quoted_rm_command(remote_paths)) |
| result = subprocess.run(cmd, capture_output=True, text=True, timeout=10) |
| if result.returncode != 0: |
| raise RuntimeError(f"remote rm failed: {result.stderr.strip()}") |
|
|
| def _before_execute(self) -> None: |
| """Sync files to remote via FileSyncManager (rate-limited internally).""" |
| self._sync_manager.sync() |
|
|
| |
| |
| |
|
|
| def _run_bash(self, cmd_string: str, *, login: bool = False, |
| timeout: int = 120, |
| stdin_data: str | None = None) -> subprocess.Popen: |
| """Spawn an SSH process that runs bash on the remote host.""" |
| cmd = self._build_ssh_command() |
| if login: |
| cmd.extend(["bash", "-l", "-c", shlex.quote(cmd_string)]) |
| else: |
| cmd.extend(["bash", "-c", shlex.quote(cmd_string)]) |
|
|
| return _popen_bash(cmd, stdin_data) |
|
|
| def cleanup(self): |
| if self._sync_manager: |
| logger.info("SSH: syncing files from sandbox...") |
| self._sync_manager.sync_back() |
|
|
| if self.control_socket.exists(): |
| try: |
| cmd = ["ssh", "-o", f"ControlPath={self.control_socket}", |
| "-O", "exit", f"{self.user}@{self.host}"] |
| subprocess.run(cmd, capture_output=True, timeout=5) |
| except (OSError, subprocess.SubprocessError): |
| pass |
| try: |
| self.control_socket.unlink() |
| except OSError: |
| pass |
|
|