sysadmin-env / sysadmin_env /sandbox.py
huggingmenfordays's picture
Upload folder using huggingface_hub
474686b verified
import asyncio
import shutil
import subprocess
import time
from dataclasses import dataclass
from pathlib import Path
from sysadmin_env.overlayfs import OverlayFSManager
@dataclass
class CommandResult:
    """Outcome of one command run through the sandbox.

    A freshly constructed instance describes a command that has not produced
    a result yet: empty output, exit code ``-1``, zero duration, not timed
    out. The sandbox fills the fields in after the command finishes or is
    aborted.
    """

    # Captured standard output of the command, decoded to text.
    stdout: str = ""
    # Captured standard error of the command, decoded to text.
    stderr: str = ""
    # Process return code; stays at -1 when no real exit status is available.
    exit_code: int = -1
    # Wall-clock duration of the run, in seconds.
    execution_time: float = 0.0
    # True when the command was aborted because it exceeded its timeout.
    timed_out: bool = False
class Sandbox:
_HOST_RO_BINDS = [
"/usr/bin",
"/usr/sbin",
"/usr/lib",
"/usr/lib64",
"/usr/share",
"/bin",
"/sbin",
"/lib",
"/lib64",
"/etc/alternatives",
"/etc/ld.so.cache",
]
def __init__(
self,
lowerdir: str | Path,
*,
timeout: float = 30.0,
isolate_network: bool = True,
overlay_base_dir: str | None = None,
):
self._lowerdir = Path(lowerdir).resolve()
self._timeout = timeout
self._isolate_network = isolate_network
self._overlay = OverlayFSManager(base_dir=overlay_base_dir)
self._created = False
self._destroyed = False
@property
def is_created(self) -> bool:
return self._created
@property
def is_destroyed(self) -> bool:
return self._destroyed
@property
def overlay(self) -> OverlayFSManager:
return self._overlay
@property
def merged_root(self) -> Path:
return Path("/")
@property
def state_root(self) -> Path | None:
return self._overlay.merged
def create(self) -> None:
if self._created:
raise RuntimeError("sandbox already created")
if self._destroyed:
raise RuntimeError("sandbox has been destroyed and cannot be recreated")
print("sandbox verify bwrap start")
self._verify_bwrap_available()
print("sandbox verify bwrap complete")
print(f"sandbox create stack {self._lowerdir}")
self._overlay.create_stack(self._lowerdir)
print("sandbox overlay mount start")
try:
self._overlay.mount()
except Exception as exc:
print(f"sandbox overlay mount failed {type(exc).__name__.lower()}")
raise
print("sandbox overlay mount complete")
print("sandbox runtime layout start")
self._ensure_runtime_layout()
print("sandbox runtime layout complete")
self._created = True
print("sandbox created")
def _verify_bwrap_available(self) -> None:
bwrap_bin = shutil.which("bwrap")
if bwrap_bin is None:
raise FileNotFoundError("bwrap binary not found in path")
print(f"sandbox bwrap found {bwrap_bin}")
def _ensure_runtime_layout(self) -> None:
if self._overlay.merged is None:
raise RuntimeError("overlay stack not ready")
for relative in [
Path("bin"),
Path("sbin"),
Path("lib"),
Path("lib64"),
Path("usr"),
Path("usr/bin"),
Path("usr/sbin"),
Path("usr/lib"),
Path("usr/lib64"),
Path("usr/share"),
Path("usr/local"),
Path("usr/local/bin"),
Path("etc"),
Path("etc/alternatives"),
Path("var"),
Path("var/tmp"),
Path("tmp"),
Path("dev"),
Path("proc"),
Path("run"),
Path("root"),
Path("home"),
]:
(self._overlay.merged / relative).mkdir(parents=True, exist_ok=True)
def _build_bwrap_command(self, command: str) -> list[str]:
if self._overlay.merged is None:
raise RuntimeError("sandbox storage not ready")
merged = str(self._overlay.merged)
cmd = [
"bwrap",
"--bind",
merged,
"/",
"--proc",
"/proc",
"--dev",
"/dev",
"--tmpfs",
"/tmp",
"--unshare-pid",
"--unshare-uts",
"--unshare-cgroup-try",
"--die-with-parent",
"--hostname",
"sandbox",
"--clearenv",
"--setenv",
"PATH",
"/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin",
"--setenv",
"HOME",
"/root",
"--setenv",
"TERM",
"xterm",
"--uid",
"0",
"--gid",
"0",
"--cap-drop",
"ALL",
]
if self._isolate_network:
cmd.append("--unshare-net")
for host_path in self._HOST_RO_BINDS:
if Path(host_path).exists():
cmd.extend(["--ro-bind", host_path, host_path])
cmd.extend([
"--chdir",
"/",
"--",
"/bin/sh",
"-c",
command,
])
return cmd
def execute(self, command: str, *, timeout: float | None = None) -> CommandResult:
if not self._created:
raise RuntimeError("sandbox not created call create first")
if self._destroyed:
raise RuntimeError("sandbox has been destroyed")
effective_timeout = timeout if timeout is not None else self._timeout
bwrap_cmd = self._build_bwrap_command(command)
result = CommandResult()
start = time.perf_counter()
try:
proc = subprocess.run(
bwrap_cmd,
capture_output=True,
text=True,
timeout=effective_timeout,
)
result.stdout = proc.stdout
result.stderr = proc.stderr
result.exit_code = proc.returncode
except subprocess.TimeoutExpired as exc:
result.stdout = exc.stdout if isinstance(exc.stdout, str) else (exc.stdout or b"").decode("utf-8", errors="replace")
result.stderr = exc.stderr if isinstance(exc.stderr, str) else (exc.stderr or b"").decode("utf-8", errors="replace")
result.exit_code = -1
result.timed_out = True
result.execution_time = time.perf_counter() - start
return result
async def execute_async(self, command: str, *, timeout: float | None = None) -> CommandResult:
if not self._created:
raise RuntimeError("sandbox not created call create first")
if self._destroyed:
raise RuntimeError("sandbox has been destroyed")
effective_timeout = timeout if timeout is not None else self._timeout
bwrap_cmd = self._build_bwrap_command(command)
result = CommandResult()
start = time.perf_counter()
try:
proc = await asyncio.create_subprocess_exec(
*bwrap_cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
try:
stdout_bytes, stderr_bytes = await asyncio.wait_for(
proc.communicate(),
timeout=effective_timeout,
)
result.stdout = stdout_bytes.decode("utf-8", errors="replace")
result.stderr = stderr_bytes.decode("utf-8", errors="replace")
result.exit_code = proc.returncode
except asyncio.TimeoutError:
proc.kill()
await proc.wait()
result.exit_code = -1
result.timed_out = True
except OSError as exc:
result.stderr = str(exc)
result.exit_code = -1
result.execution_time = time.perf_counter() - start
return result
def reset(self) -> float:
if not self._created:
raise RuntimeError("sandbox not created call create first")
if self._destroyed:
raise RuntimeError("sandbox has been destroyed")
latency = self._overlay.reset()
self._ensure_runtime_layout()
print(f"sandbox reset {latency:.1f}ms")
return latency
def destroy(self) -> None:
if self._destroyed:
return
self._overlay.cleanup()
self._created = False
self._destroyed = True
print("sandbox destroyed")
def __enter__(self):
self.create()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.destroy()
return False