import asyncio import shutil import subprocess import time from dataclasses import dataclass from pathlib import Path from sysadmin_env.overlayfs import OverlayFSManager @dataclass class CommandResult: stdout: str = "" stderr: str = "" exit_code: int = -1 execution_time: float = 0.0 timed_out: bool = False class Sandbox: _HOST_RO_BINDS = [ "/usr/bin", "/usr/sbin", "/usr/lib", "/usr/lib64", "/usr/share", "/bin", "/sbin", "/lib", "/lib64", "/etc/alternatives", "/etc/ld.so.cache", ] def __init__( self, lowerdir: str | Path, *, timeout: float = 30.0, isolate_network: bool = True, overlay_base_dir: str | None = None, ): self._lowerdir = Path(lowerdir).resolve() self._timeout = timeout self._isolate_network = isolate_network self._overlay = OverlayFSManager(base_dir=overlay_base_dir) self._created = False self._destroyed = False @property def is_created(self) -> bool: return self._created @property def is_destroyed(self) -> bool: return self._destroyed @property def overlay(self) -> OverlayFSManager: return self._overlay @property def merged_root(self) -> Path: return Path("/") @property def state_root(self) -> Path | None: return self._overlay.merged def create(self) -> None: if self._created: raise RuntimeError("sandbox already created") if self._destroyed: raise RuntimeError("sandbox has been destroyed and cannot be recreated") print("sandbox verify bwrap start") self._verify_bwrap_available() print("sandbox verify bwrap complete") print(f"sandbox create stack {self._lowerdir}") self._overlay.create_stack(self._lowerdir) print("sandbox overlay mount start") try: self._overlay.mount() except Exception as exc: print(f"sandbox overlay mount failed {type(exc).__name__.lower()}") raise print("sandbox overlay mount complete") print("sandbox runtime layout start") self._ensure_runtime_layout() print("sandbox runtime layout complete") self._created = True print("sandbox created") def _verify_bwrap_available(self) -> None: bwrap_bin = shutil.which("bwrap") if bwrap_bin is None: raise FileNotFoundError("bwrap binary not found in path") print(f"sandbox bwrap found {bwrap_bin}") def _ensure_runtime_layout(self) -> None: if self._overlay.merged is None: raise RuntimeError("overlay stack not ready") for relative in [ Path("bin"), Path("sbin"), Path("lib"), Path("lib64"), Path("usr"), Path("usr/bin"), Path("usr/sbin"), Path("usr/lib"), Path("usr/lib64"), Path("usr/share"), Path("usr/local"), Path("usr/local/bin"), Path("etc"), Path("etc/alternatives"), Path("var"), Path("var/tmp"), Path("tmp"), Path("dev"), Path("proc"), Path("run"), Path("root"), Path("home"), ]: (self._overlay.merged / relative).mkdir(parents=True, exist_ok=True) def _build_bwrap_command(self, command: str) -> list[str]: if self._overlay.merged is None: raise RuntimeError("sandbox storage not ready") merged = str(self._overlay.merged) cmd = [ "bwrap", "--bind", merged, "/", "--proc", "/proc", "--dev", "/dev", "--tmpfs", "/tmp", "--unshare-pid", "--unshare-uts", "--unshare-cgroup-try", "--die-with-parent", "--hostname", "sandbox", "--clearenv", "--setenv", "PATH", "/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin", "--setenv", "HOME", "/root", "--setenv", "TERM", "xterm", "--uid", "0", "--gid", "0", "--cap-drop", "ALL", ] if self._isolate_network: cmd.append("--unshare-net") for host_path in self._HOST_RO_BINDS: if Path(host_path).exists(): cmd.extend(["--ro-bind", host_path, host_path]) cmd.extend([ "--chdir", "/", "--", "/bin/sh", "-c", command, ]) return cmd def execute(self, command: str, *, timeout: float | None = None) -> CommandResult: if not self._created: raise RuntimeError("sandbox not created call create first") if self._destroyed: raise RuntimeError("sandbox has been destroyed") effective_timeout = timeout if timeout is not None else self._timeout bwrap_cmd = self._build_bwrap_command(command) result = CommandResult() start = time.perf_counter() try: proc = subprocess.run( bwrap_cmd, capture_output=True, text=True, timeout=effective_timeout, ) result.stdout = proc.stdout result.stderr = proc.stderr result.exit_code = proc.returncode except subprocess.TimeoutExpired as exc: result.stdout = exc.stdout if isinstance(exc.stdout, str) else (exc.stdout or b"").decode("utf-8", errors="replace") result.stderr = exc.stderr if isinstance(exc.stderr, str) else (exc.stderr or b"").decode("utf-8", errors="replace") result.exit_code = -1 result.timed_out = True result.execution_time = time.perf_counter() - start return result async def execute_async(self, command: str, *, timeout: float | None = None) -> CommandResult: if not self._created: raise RuntimeError("sandbox not created call create first") if self._destroyed: raise RuntimeError("sandbox has been destroyed") effective_timeout = timeout if timeout is not None else self._timeout bwrap_cmd = self._build_bwrap_command(command) result = CommandResult() start = time.perf_counter() try: proc = await asyncio.create_subprocess_exec( *bwrap_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) try: stdout_bytes, stderr_bytes = await asyncio.wait_for( proc.communicate(), timeout=effective_timeout, ) result.stdout = stdout_bytes.decode("utf-8", errors="replace") result.stderr = stderr_bytes.decode("utf-8", errors="replace") result.exit_code = proc.returncode except asyncio.TimeoutError: proc.kill() await proc.wait() result.exit_code = -1 result.timed_out = True except OSError as exc: result.stderr = str(exc) result.exit_code = -1 result.execution_time = time.perf_counter() - start return result def reset(self) -> float: if not self._created: raise RuntimeError("sandbox not created call create first") if self._destroyed: raise RuntimeError("sandbox has been destroyed") latency = self._overlay.reset() self._ensure_runtime_layout() print(f"sandbox reset {latency:.1f}ms") return latency def destroy(self) -> None: if self._destroyed: return self._overlay.cleanup() self._created = False self._destroyed = True print("sandbox destroyed") def __enter__(self): self.create() return self def __exit__(self, exc_type, exc_val, exc_tb): self.destroy() return False