hermes / tools /environments /singularity.py
lenson78's picture
initial upload: v2026.3.23 with HF Spaces deployment
9aa5185 verified
"""Singularity/Apptainer persistent container environment.
Security-hardened with --containall, --no-home, capability dropping.
Supports configurable resource limits and optional filesystem persistence
via writable overlay directories that survive across sessions.
"""
import json
import logging
import os
import shutil
import subprocess
import tempfile
import threading
import uuid
from pathlib import Path
from typing import Any, Dict, Optional
from hermes_cli.config import get_hermes_home
from tools.environments.base import BaseEnvironment
from tools.interrupt import is_interrupted
logger = logging.getLogger(__name__)
_SNAPSHOT_STORE = get_hermes_home() / "singularity_snapshots.json"
def _find_singularity_executable() -> str:
"""Locate the apptainer or singularity CLI binary.
Returns the executable name (``"apptainer"`` or ``"singularity"``).
Raises ``RuntimeError`` with install instructions if neither is found.
"""
if shutil.which("apptainer"):
return "apptainer"
if shutil.which("singularity"):
return "singularity"
raise RuntimeError(
"Neither 'apptainer' nor 'singularity' was found in PATH. "
"Install Apptainer (https://apptainer.org/docs/admin/main/installation.html) "
"or Singularity and ensure the CLI is available."
)
def _ensure_singularity_available() -> str:
"""Preflight check: resolve the executable and verify it responds.
Returns the executable name on success.
Raises ``RuntimeError`` with an actionable message on failure.
"""
exe = _find_singularity_executable()
try:
result = subprocess.run(
[exe, "version"],
capture_output=True,
text=True,
timeout=10,
)
except FileNotFoundError:
raise RuntimeError(
f"Singularity backend selected but the resolved executable '{exe}' "
"could not be executed. Check your installation."
)
except subprocess.TimeoutExpired:
raise RuntimeError(
f"'{exe} version' timed out. The runtime may be misconfigured."
)
if result.returncode != 0:
stderr = result.stderr.strip()[:200]
raise RuntimeError(
f"'{exe} version' failed (exit code {result.returncode}): {stderr}"
)
return exe
def _load_snapshots() -> Dict[str, str]:
if _SNAPSHOT_STORE.exists():
try:
return json.loads(_SNAPSHOT_STORE.read_text())
except Exception:
pass
return {}
def _save_snapshots(data: Dict[str, str]) -> None:
_SNAPSHOT_STORE.parent.mkdir(parents=True, exist_ok=True)
_SNAPSHOT_STORE.write_text(json.dumps(data, indent=2))
# -------------------------------------------------------------------------
# Singularity helpers (scratch dir, SIF cache, SIF building)
# -------------------------------------------------------------------------
def _get_scratch_dir() -> Path:
"""Get the best directory for Singularity sandboxes.
Resolution order:
1. TERMINAL_SCRATCH_DIR (explicit override)
2. TERMINAL_SANDBOX_DIR / singularity (shared sandbox root)
3. /scratch (common on HPC clusters)
4. ~/.hermes/sandboxes/singularity (fallback)
"""
custom_scratch = os.getenv("TERMINAL_SCRATCH_DIR")
if custom_scratch:
scratch_path = Path(custom_scratch)
scratch_path.mkdir(parents=True, exist_ok=True)
return scratch_path
from tools.environments.base import get_sandbox_dir
sandbox = get_sandbox_dir() / "singularity"
scratch = Path("/scratch")
if scratch.exists() and os.access(scratch, os.W_OK):
user_scratch = scratch / os.getenv("USER", "hermes") / "hermes-agent"
user_scratch.mkdir(parents=True, exist_ok=True)
logger.info("Using /scratch for sandboxes: %s", user_scratch)
return user_scratch
sandbox.mkdir(parents=True, exist_ok=True)
return sandbox
def _get_apptainer_cache_dir() -> Path:
"""Get the Apptainer cache directory for SIF images."""
cache_dir = os.getenv("APPTAINER_CACHEDIR")
if cache_dir:
cache_path = Path(cache_dir)
cache_path.mkdir(parents=True, exist_ok=True)
return cache_path
scratch = _get_scratch_dir()
cache_path = scratch / ".apptainer"
cache_path.mkdir(parents=True, exist_ok=True)
return cache_path
_sif_build_lock = threading.Lock()
def _get_or_build_sif(image: str, executable: str = "apptainer") -> str:
"""Get or build a SIF image from a docker:// URL.
Returns the path unchanged if it's already a .sif file.
For docker:// URLs, checks the cache and builds if needed.
"""
if image.endswith('.sif') and Path(image).exists():
return image
if not image.startswith('docker://'):
return image
image_name = image.replace('docker://', '').replace('/', '-').replace(':', '-')
cache_dir = _get_apptainer_cache_dir()
sif_path = cache_dir / f"{image_name}.sif"
if sif_path.exists():
return str(sif_path)
with _sif_build_lock:
if sif_path.exists():
return str(sif_path)
logger.info("Building SIF image (one-time setup)...")
logger.info(" Source: %s", image)
logger.info(" Target: %s", sif_path)
tmp_dir = cache_dir / "tmp"
tmp_dir.mkdir(parents=True, exist_ok=True)
env = os.environ.copy()
env["APPTAINER_TMPDIR"] = str(tmp_dir)
env["APPTAINER_CACHEDIR"] = str(cache_dir)
try:
result = subprocess.run(
[executable, "build", str(sif_path), image],
capture_output=True, text=True, timeout=600, env=env,
)
if result.returncode != 0:
logger.warning("SIF build failed, falling back to docker:// URL")
logger.warning(" Error: %s", result.stderr[:500])
return image
logger.info("SIF image built successfully")
return str(sif_path)
except subprocess.TimeoutExpired:
logger.warning("SIF build timed out, falling back to docker:// URL")
if sif_path.exists():
sif_path.unlink()
return image
except Exception as e:
logger.warning("SIF build error: %s, falling back to docker:// URL", e)
return image
# -------------------------------------------------------------------------
# SingularityEnvironment
# -------------------------------------------------------------------------
class SingularityEnvironment(BaseEnvironment):
"""Hardened Singularity/Apptainer container with resource limits and persistence.
Security: --containall (isolated PID/IPC/mount namespaces, no host home mount),
--no-home, writable-tmpfs for scratch space. The container cannot see or modify
the host filesystem outside of explicitly bound paths.
Persistence: when enabled, the writable overlay directory is preserved across
sessions so installed packages and files survive cleanup/restore.
"""
def __init__(
self,
image: str,
cwd: str = "~",
timeout: int = 60,
cpu: float = 0,
memory: int = 0,
disk: int = 0,
persistent_filesystem: bool = False,
task_id: str = "default",
):
super().__init__(cwd=cwd, timeout=timeout)
self.executable = _ensure_singularity_available()
self.image = _get_or_build_sif(image, self.executable)
self.instance_id = f"hermes_{uuid.uuid4().hex[:12]}"
self._instance_started = False
self._persistent = persistent_filesystem
self._task_id = task_id
self._overlay_dir: Optional[Path] = None
# Resource limits
self._cpu = cpu
self._memory = memory
# Persistent overlay directory
if self._persistent:
overlay_base = _get_scratch_dir() / "hermes-overlays"
overlay_base.mkdir(parents=True, exist_ok=True)
self._overlay_dir = overlay_base / f"overlay-{task_id}"
self._overlay_dir.mkdir(parents=True, exist_ok=True)
self._start_instance()
def _start_instance(self):
cmd = [self.executable, "instance", "start"]
# Security: full isolation from host
cmd.extend(["--containall", "--no-home"])
# Writable layer
if self._persistent and self._overlay_dir:
# Persistent writable overlay -- survives across restarts
cmd.extend(["--overlay", str(self._overlay_dir)])
else:
cmd.append("--writable-tmpfs")
# Resource limits (cgroup-based, may require root or appropriate config)
if self._memory > 0:
cmd.extend(["--memory", f"{self._memory}M"])
if self._cpu > 0:
cmd.extend(["--cpus", str(self._cpu)])
cmd.extend([str(self.image), self.instance_id])
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
if result.returncode != 0:
raise RuntimeError(f"Failed to start instance: {result.stderr}")
self._instance_started = True
logger.info("Singularity instance %s started (persistent=%s)",
self.instance_id, self._persistent)
except subprocess.TimeoutExpired:
raise RuntimeError("Instance start timed out")
def execute(self, command: str, cwd: str = "", *,
timeout: int | None = None,
stdin_data: str | None = None) -> dict:
if not self._instance_started:
return {"output": "Instance not started", "returncode": -1}
effective_timeout = timeout or self.timeout
work_dir = cwd or self.cwd
exec_command, sudo_stdin = self._prepare_command(command)
# Merge sudo password (if any) with caller-supplied stdin_data.
if sudo_stdin is not None and stdin_data is not None:
effective_stdin = sudo_stdin + stdin_data
elif sudo_stdin is not None:
effective_stdin = sudo_stdin
else:
effective_stdin = stdin_data
# apptainer exec --pwd doesn't expand ~, so prepend a cd into the command
if work_dir == "~" or work_dir.startswith("~/"):
exec_command = f"cd {work_dir} && {exec_command}"
work_dir = "/tmp"
cmd = [self.executable, "exec", "--pwd", work_dir,
f"instance://{self.instance_id}",
"bash", "-c", exec_command]
try:
import time as _time
_output_chunks = []
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
stdin=subprocess.PIPE if effective_stdin else subprocess.DEVNULL,
text=True,
)
if effective_stdin:
try:
proc.stdin.write(effective_stdin)
proc.stdin.close()
except Exception:
pass
def _drain():
try:
for line in proc.stdout:
_output_chunks.append(line)
except Exception:
pass
reader = threading.Thread(target=_drain, daemon=True)
reader.start()
deadline = _time.monotonic() + effective_timeout
while proc.poll() is None:
if is_interrupted():
proc.terminate()
try:
proc.wait(timeout=1)
except subprocess.TimeoutExpired:
proc.kill()
reader.join(timeout=2)
return {
"output": "".join(_output_chunks) + "\n[Command interrupted]",
"returncode": 130,
}
if _time.monotonic() > deadline:
proc.kill()
reader.join(timeout=2)
return self._timeout_result(effective_timeout)
_time.sleep(0.2)
reader.join(timeout=5)
return {"output": "".join(_output_chunks), "returncode": proc.returncode}
except Exception as e:
return {"output": f"Singularity execution error: {e}", "returncode": 1}
def cleanup(self):
"""Stop the instance. If persistent, the overlay dir survives for next creation."""
if self._instance_started:
try:
subprocess.run(
[self.executable, "instance", "stop", self.instance_id],
capture_output=True, text=True, timeout=30,
)
logger.info("Singularity instance %s stopped", self.instance_id)
except Exception as e:
logger.warning("Failed to stop Singularity instance %s: %s", self.instance_id, e)
self._instance_started = False
# Record overlay path for persistence restoration
if self._persistent and self._overlay_dir:
snapshots = _load_snapshots()
snapshots[self._task_id] = str(self._overlay_dir)
_save_snapshots(snapshots)