# HPCOpenenv / sysadmin_env / sandbox.py
# huggingmenfordays's picture
# deploy: ccyloopss/HPCOpenenv — with OPENENV_API_KEY auth guard
# bc35a94
import asyncio
import os
import shutil
import subprocess
import time
from dataclasses import dataclass
from pathlib import Path
from sysadmin_env.overlayfs import OverlayFSManager
@dataclass
class CommandResult:
    """Outcome of a single command run through ``Sandbox``.

    A freshly constructed instance describes a command that has not
    completed: empty output, ``exit_code`` of ``-1`` and no timeout.
    """

    # Captured standard output as text.
    stdout: str = ""
    # Captured standard error as text.
    stderr: str = ""
    # Exit status of the wrapped command; stays -1 when none was obtained.
    exit_code: int = -1
    # Wall-clock duration of the run, in seconds.
    execution_time: float = 0.0
    # True when the run was aborted because the timeout elapsed.
    timed_out: bool = False
class Sandbox:
_HOST_RO_BINDS = [
"/usr/bin",
"/usr/sbin",
"/usr/lib",
"/usr/lib64",
"/usr/share",
"/bin",
"/sbin",
"/lib",
"/lib64",
"/etc/alternatives",
"/etc/ld.so.cache",
]
def __init__(
self,
lowerdir: str | Path,
*,
timeout: float = 30.0,
isolate_network: bool = True,
overlay_base_dir: str | None = None,
allow_nested_sandbox: bool = False,
):
self._lowerdir = Path(lowerdir).resolve()
self._timeout = timeout
self._isolate_network = isolate_network
self._overlay = OverlayFSManager(base_dir=overlay_base_dir)
self._allow_nested_sandbox = allow_nested_sandbox
self._created = False
self._destroyed = False
self._can_mount_proc = False
self._runtime_backend = "bwrap"
@property
def is_created(self) -> bool:
return self._created
@property
def is_destroyed(self) -> bool:
return self._destroyed
@property
def overlay(self) -> OverlayFSManager:
return self._overlay
@property
def merged_root(self) -> Path:
return Path("/")
@property
def state_root(self) -> Path | None:
return self._overlay.merged
@property
def runtime_backend(self) -> str:
return self._runtime_backend
def create(self) -> None:
if self._created:
raise RuntimeError("sandbox already created")
if self._destroyed:
raise RuntimeError("sandbox has been destroyed and cannot be recreated")
print("sandbox verify bwrap start")
self._verify_bwrap_available()
print("sandbox verify bwrap complete")
print(f"sandbox create stack {self._lowerdir}")
self._overlay.create_stack(self._lowerdir)
print("sandbox overlay mount start")
try:
self._overlay.mount()
except Exception as exc:
print(f"sandbox overlay mount failed {type(exc).__name__.lower()}")
raise
print("sandbox overlay mount complete")
print("sandbox runtime layout start")
self._ensure_runtime_layout()
print("sandbox runtime layout complete")
self._select_runtime_backend()
self._created = True
print("sandbox created")
def _verify_bwrap_available(self) -> None:
bwrap_bin = shutil.which("bwrap")
if bwrap_bin is None:
raise FileNotFoundError("bwrap binary not found in path")
print(f"sandbox bwrap found {bwrap_bin}")
self._probe_proc_capability()
def _probe_proc_capability(self) -> None:
try:
result = subprocess.run(
["bwrap", "--ro-bind", "/", "/", "--proc", "/proc",
"--dev", "/dev", "--unshare-pid", "--", "/bin/true"],
capture_output=True, timeout=5,
)
self._can_mount_proc = result.returncode == 0
except Exception:
self._can_mount_proc = False
print(f"sandbox proc mount {'supported' if self._can_mount_proc else 'unavailable, using ro-bind fallback'}")
def _select_runtime_backend(self) -> None:
preferred = os.environ.get("OPENENV_SANDBOX_BACKEND", "auto").strip().lower()
bwrap_ok, bwrap_error = self._probe_bwrap_runtime()
proot_path = shutil.which("proot")
if preferred == "bwrap":
if not bwrap_ok:
raise RuntimeError(f"forced bwrap backend is unavailable: {bwrap_error}")
self._runtime_backend = "bwrap"
print("sandbox runtime backend bwrap")
return
if preferred == "proot":
if proot_path is None:
raise RuntimeError("forced proot backend requested but proot binary not found in path")
self._runtime_backend = "proot"
print("sandbox runtime backend proot")
return
if bwrap_ok:
self._runtime_backend = "bwrap"
print("sandbox runtime backend bwrap")
return
if proot_path is None:
raise RuntimeError(f"bwrap unavailable ({bwrap_error}) and proot binary not found")
self._runtime_backend = "proot"
print(f"sandbox runtime backend proot fallback reason {bwrap_error}")
def _probe_bwrap_runtime(self) -> tuple[bool, str]:
if self._overlay.merged is None:
return False, "overlay stack not ready"
probe_command = self._build_bwrap_command("true")
try:
result = subprocess.run(
probe_command,
capture_output=True,
text=True,
timeout=5,
env=self._command_env(),
)
except Exception as exc:
return False, str(exc)
if result.returncode == 0:
return True, ""
message = (result.stderr or result.stdout or f"exit {result.returncode}").strip()
return False, message
def _ensure_runtime_layout(self) -> None:
if self._overlay.merged is None:
raise RuntimeError("overlay stack not ready")
for relative in [
Path("bin"),
Path("sbin"),
Path("lib"),
Path("lib64"),
Path("usr"),
Path("usr/bin"),
Path("usr/sbin"),
Path("usr/lib"),
Path("usr/lib64"),
Path("usr/share"),
Path("usr/local"),
Path("usr/local/bin"),
Path("etc"),
Path("etc/alternatives"),
Path("var"),
Path("var/tmp"),
Path("tmp"),
Path("dev"),
Path("proc"),
Path("run"),
Path("root"),
Path("home"),
]:
(self._overlay.merged / relative).mkdir(parents=True, exist_ok=True)
def _build_bwrap_command(self, command: str) -> list[str]:
if self._overlay.merged is None:
raise RuntimeError("sandbox storage not ready")
merged = str(self._overlay.merged)
cmd = [
"bwrap",
"--bind",
merged,
"/",
]
if self._can_mount_proc:
cmd.extend(["--proc", "/proc", "--dev", "/dev", "--unshare-pid"])
else:
cmd.extend(["--ro-bind", "/proc", "/proc", "--dev-bind", "/dev", "/dev"])
cmd.extend([
"--tmpfs",
"/tmp",
"--unshare-uts",
"--unshare-cgroup-try",
"--die-with-parent",
"--hostname",
"sandbox",
"--clearenv",
"--setenv",
"PATH",
"/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin",
"--setenv",
"HOME",
"/root",
"--setenv",
"TERM",
"xterm",
"--uid",
"0",
"--gid",
"0",
])
if self._allow_nested_sandbox:
if self._can_mount_proc:
cmd.extend(["--unshare-user", "--cap-add", "CAP_SYS_ADMIN"])
else:
cmd.extend(["--cap-drop", "ALL"])
if self._isolate_network:
cmd.append("--unshare-net")
for host_path in self._HOST_RO_BINDS:
if Path(host_path).exists():
cmd.extend(["--ro-bind", host_path, host_path])
cmd.extend([
"--chdir",
"/",
"--",
"/bin/sh",
"-c",
command,
])
return cmd
def _build_proot_command(self, command: str) -> list[str]:
if self._overlay.merged is None:
raise RuntimeError("sandbox storage not ready")
merged = str(self._overlay.merged)
cmd = [
"proot",
"-R",
merged,
"-b",
"/proc:/proc",
"-b",
"/dev:/dev",
"-b",
"/tmp:/tmp",
]
for host_path in self._HOST_RO_BINDS:
if Path(host_path).exists():
cmd.extend(["-b", f"{host_path}:{host_path}"])
# Keep task-provided /usr/local/bin tools (sinfo, squeue, etc.) visible,
# and inject only the Python runtime bits needed by /usr/bin/env python3.
if Path("/usr/local/bin/python3").exists():
cmd.extend(["-b", "/usr/local/bin/python3:/usr/local/bin/python3"])
if Path("/usr/local/lib").exists():
cmd.extend(["-b", "/usr/local/lib:/usr/local/lib"])
cmd.extend([
"-w",
"/",
"/bin/sh",
"-c",
command,
])
return cmd
def _build_runtime_command(self, command: str) -> list[str]:
if self._runtime_backend == "proot":
return self._build_proot_command(command)
return self._build_bwrap_command(command)
def _command_env(self) -> dict[str, str]:
return {
"PATH": "/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin",
"HOME": "/root",
"TERM": "xterm",
"HOSTNAME": "sandbox",
"LANG": "C.UTF-8",
}
def execute(self, command: str, *, timeout: float | None = None) -> CommandResult:
if not self._created:
raise RuntimeError("sandbox not created call create first")
if self._destroyed:
raise RuntimeError("sandbox has been destroyed")
effective_timeout = timeout if timeout is not None else self._timeout
runtime_cmd = self._build_runtime_command(command)
result = CommandResult()
start = time.perf_counter()
try:
proc = subprocess.run(
runtime_cmd,
capture_output=True,
text=True,
timeout=effective_timeout,
env=self._command_env(),
)
result.stdout = proc.stdout
result.stderr = proc.stderr
result.exit_code = proc.returncode
except subprocess.TimeoutExpired as exc:
result.stdout = exc.stdout if isinstance(exc.stdout, str) else (exc.stdout or b"").decode("utf-8", errors="replace")
result.stderr = exc.stderr if isinstance(exc.stderr, str) else (exc.stderr or b"").decode("utf-8", errors="replace")
result.exit_code = -1
result.timed_out = True
result.execution_time = time.perf_counter() - start
return result
async def execute_async(self, command: str, *, timeout: float | None = None) -> CommandResult:
if not self._created:
raise RuntimeError("sandbox not created call create first")
if self._destroyed:
raise RuntimeError("sandbox has been destroyed")
effective_timeout = timeout if timeout is not None else self._timeout
runtime_cmd = self._build_runtime_command(command)
result = CommandResult()
start = time.perf_counter()
try:
proc = await asyncio.create_subprocess_exec(
*runtime_cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=self._command_env(),
)
try:
stdout_bytes, stderr_bytes = await asyncio.wait_for(
proc.communicate(),
timeout=effective_timeout,
)
result.stdout = stdout_bytes.decode("utf-8", errors="replace")
result.stderr = stderr_bytes.decode("utf-8", errors="replace")
result.exit_code = proc.returncode
except asyncio.TimeoutError:
proc.kill()
await proc.wait()
result.exit_code = -1
result.timed_out = True
except OSError as exc:
result.stderr = str(exc)
result.exit_code = -1
result.execution_time = time.perf_counter() - start
return result
def reset(self) -> float:
if not self._created:
raise RuntimeError("sandbox not created call create first")
if self._destroyed:
raise RuntimeError("sandbox has been destroyed")
latency = self._overlay.reset()
self._ensure_runtime_layout()
print(f"sandbox reset {latency:.1f}ms")
return latency
def destroy(self) -> None:
if self._destroyed:
return
self._overlay.cleanup()
self._created = False
self._destroyed = True
print("sandbox destroyed")
def __enter__(self):
self.create()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.destroy()
return False