"""Integration tests for ``sysadmin_env.sandbox.Sandbox``.

Run as a standalone script: it verifies that ``bwrap`` and
``fuse-overlayfs`` are available, runs every ``test_*`` function in turn,
prints a pass/fail summary, and exits with status 0 only if all passed.
"""

import contextlib
import os
import shutil
import subprocess
import sys
import tempfile
import time
import traceback
from pathlib import Path

from sysadmin_env.sandbox import Sandbox
from sysadmin_env.sandbox import CommandResult


def check_bwrap_available():
    """Exit with status 1 unless ``bwrap`` is on PATH; otherwise print its version."""
    bwrap = shutil.which("bwrap")
    if bwrap is None:
        print("bwrap not installed")
        sys.exit(1)
    result = subprocess.run(["bwrap", "--version"], capture_output=True, text=True)
    print(f"bwrap found at {bwrap} version {result.stdout.strip()}")


def check_fuse_overlayfs_available():
    """Ensure ``fuse-overlayfs`` is on PATH, installing it via pacman if missing.

    Exits with status 1 when the pacman install fails.
    """
    fuse_bin = shutil.which("fuse-overlayfs")
    if fuse_bin is not None:
        print(f"fuse-overlayfs found at {fuse_bin}")
        return

    print("fuse-overlayfs not installed attempting pacman install")
    result = subprocess.run(
        ["sudo", "pacman", "-S", "--noconfirm", "fuse-overlayfs"],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        print(f"failed to install fuse-overlayfs {result.stderr.strip()}")
        sys.exit(1)
    print("fuse-overlayfs installed successfully")


def create_test_lowerdir(path: Path) -> None:
    """Populate *path* with a minimal lowerdir tree of test files.

    Creates ``etc/config.txt``, ``var/log/app.log`` and ``home/readme.txt``.
    No binaries are placed here; per the original author's note the sandbox
    bind mounts host system dirs, so none are needed.
    """
    (path / "etc").mkdir(exist_ok=True)
    (path / "var" / "log").mkdir(parents=True, exist_ok=True)
    (path / "home").mkdir(exist_ok=True)
    (path / "etc" / "config.txt").write_text("setting=default\n")
    (path / "var" / "log" / "app.log").write_text("startup complete\n")
    (path / "home" / "readme.txt").write_text("task workspace\n")


@contextlib.contextmanager
def _populated_lowerdir():
    """Yield the path (as ``str``) of a temporary, pre-populated lowerdir.

    The directory is removed when the context exits, including on error.
    """
    with tempfile.TemporaryDirectory(prefix="test_sandbox_lower_") as lowerdir:
        create_test_lowerdir(Path(lowerdir))
        yield lowerdir


def test_basic_command_execution():
    """A simple ``echo`` returns its output, exit code 0 and a positive duration."""
    print("\n--- test basic command execution ---")
    with _populated_lowerdir() as lowerdir:
        with Sandbox(lowerdir) as sandbox:
            result = sandbox.execute("echo hello")
            assert result.stdout.strip() == "hello", f"expected hello got {result.stdout.strip()!r}"
            assert result.exit_code == 0, f"expected exit 0 got {result.exit_code}"
            assert result.execution_time > 0
            assert not result.timed_out
            print(
                f"basic execution passed stdout={result.stdout.strip()!r} "
                f"exit={result.exit_code} time={result.execution_time:.3f}s"
            )


def test_command_stderr():
    """Output written to stderr is captured in ``result.stderr``."""
    print("\n--- test command stderr ---")
    with _populated_lowerdir() as lowerdir:
        with Sandbox(lowerdir) as sandbox:
            result = sandbox.execute("echo error message >&2")
            assert "error message" in result.stderr, f"expected error in stderr got {result.stderr!r}"
            assert result.exit_code == 0
            print(f"stderr capture passed stderr={result.stderr.strip()!r}")


def test_nonzero_exit_code():
    """A non-zero exit status is reported as-is and is not flagged as a timeout."""
    print("\n--- test nonzero exit code ---")
    with _populated_lowerdir() as lowerdir:
        with Sandbox(lowerdir) as sandbox:
            result = sandbox.execute("exit 42")
            assert result.exit_code == 42, f"expected exit 42 got {result.exit_code}"
            assert not result.timed_out
            print(f"nonzero exit passed exit_code={result.exit_code}")


def test_read_lowerdir_files():
    """Files from the lowerdir are readable under ``sandbox.merged_root``."""
    print("\n--- test read lowerdir files ---")
    with _populated_lowerdir() as lowerdir:
        with Sandbox(lowerdir) as sandbox:
            merged_path = str(sandbox.merged_root)
            result = sandbox.execute(f"cat {merged_path}/etc/config.txt")
            assert "setting=default" in result.stdout, f"lowerdir read failed {result.stdout!r}"
            assert result.exit_code == 0
            print(f"read lowerdir files passed content={result.stdout.strip()!r}")


def test_file_write_and_reset():
    """Files written in the sandbox vanish after ``reset()``; lowerdir data stays."""
    print("\n--- test file write and reset ---")
    with _populated_lowerdir() as lowerdir:
        with Sandbox(lowerdir) as sandbox:
            merged = str(sandbox.merged_root)
            sandbox.execute(f"echo agent data > {merged}/agent_file.txt")
            verify = sandbox.execute(f"cat {merged}/agent_file.txt")
            assert "agent data" in verify.stdout, f"file write failed {verify.stdout!r} {verify.stderr!r}"

            latency = sandbox.reset()

            # After the reset the file must either be unreadable or no longer
            # contain what the sandboxed command wrote.
            after_reset = sandbox.execute(f"cat {merged}/agent_file.txt 2>/dev/null")
            assert after_reset.exit_code != 0 or "agent data" not in after_reset.stdout

            original = sandbox.execute(f"cat {merged}/etc/config.txt")
            assert "setting=default" in original.stdout, "original lowerdir data lost after reset"
            print(f"file write and reset passed reset latency {latency:.1f}ms")


def test_timeout_enforcement():
    """A per-command timeout cuts a long sleep short and sets ``timed_out``."""
    print("\n--- test timeout enforcement ---")
    with _populated_lowerdir() as lowerdir:
        with Sandbox(lowerdir, timeout=60.0) as sandbox:
            start = time.perf_counter()
            # The 2s per-call timeout is far shorter than the 60s sleep and
            # the 60s sandbox-level timeout passed to the constructor.
            result = sandbox.execute("sleep 60", timeout=2.0)
            elapsed = time.perf_counter() - start
            assert result.timed_out, "expected timeout flag"
            assert result.exit_code == -1, f"expected exit -1 got {result.exit_code}"
            assert elapsed < 10.0, f"timeout took too long {elapsed:.1f}s"
            print(f"timeout enforcement passed elapsed={elapsed:.1f}s timed_out={result.timed_out}")


def test_host_isolation():
    """A marker file in the host home directory is not readable from the sandbox.

    The marker is created only if it did not already exist, and is removed
    afterwards only in that case, so a pre-existing file is left untouched.
    """
    print("\n--- test host isolation ---")
    with _populated_lowerdir() as lowerdir:
        host_home = os.path.expanduser("~")
        host_marker = Path(host_home) / ".sandbox_test_marker"
        host_marker_existed = host_marker.exists()
        try:
            if not host_marker_existed:
                host_marker.write_text("test marker")
            with Sandbox(lowerdir) as sandbox:
                result = sandbox.execute(
                    f"cat {host_home}/.sandbox_test_marker 2>/dev/null && echo found || echo blocked"
                )
                assert "blocked" in result.stdout or result.exit_code != 0, "sandbox accessed host home"
                # Exercised for its side effect only: the outcome is not
                # asserted, the call just has to complete without raising.
                sandbox.execute("ls /root 2>/dev/null || echo no_access")
                print(f"host isolation check passed home_result={result.stdout.strip()!r}")
        finally:
            if not host_marker_existed and host_marker.exists():
                host_marker.unlink()


def test_sandbox_not_created_errors():
    """``execute()`` and ``reset()`` raise ``RuntimeError`` before ``create()``."""
    print("\n--- test sandbox not created errors ---")
    with _populated_lowerdir() as lowerdir:
        sandbox = Sandbox(lowerdir)
        # try/finally so the sandbox is torn down even when a check fails.
        try:
            try:
                sandbox.execute("echo hello")
            except RuntimeError:
                print("execute before create correctly rejected")
            else:
                # Explicit raise rather than ``assert False``, which is
                # stripped when Python runs with -O.
                raise AssertionError("should have raised")

            try:
                sandbox.reset()
            except RuntimeError:
                print("reset before create correctly rejected")
            else:
                raise AssertionError("should have raised")
        finally:
            sandbox.destroy()


def test_double_create_rejected():
    """Calling ``create()`` twice raises ``RuntimeError``."""
    print("\n--- test double create rejected ---")
    with _populated_lowerdir() as lowerdir:
        sandbox = Sandbox(lowerdir)
        sandbox.create()
        # try/finally so the created sandbox is always destroyed, even when
        # the second create() unexpectedly succeeds.
        try:
            try:
                sandbox.create()
            except RuntimeError:
                print("double create correctly rejected")
            else:
                raise AssertionError("should have raised")
        finally:
            sandbox.destroy()


def test_destroy_idempotent():
    """``destroy()`` may be called twice without raising."""
    print("\n--- test destroy idempotent ---")
    with _populated_lowerdir() as lowerdir:
        sandbox = Sandbox(lowerdir)
        sandbox.create()
        sandbox.destroy()
        sandbox.destroy()
        print("destroy idempotent confirmed")


def test_context_manager():
    """The context manager creates the sandbox on entry and destroys it on exit."""
    print("\n--- test context manager ---")
    with _populated_lowerdir() as lowerdir:
        with Sandbox(lowerdir) as sandbox:
            assert sandbox.is_created
            assert not sandbox.is_destroyed
            result = sandbox.execute("echo context works")
            assert result.exit_code == 0, f"context command failed exit={result.exit_code} stderr={result.stderr!r}"
            assert "context works" in result.stdout
        # Checked after the ``with`` block has exited.
        assert sandbox.is_destroyed
        print("context manager passed")


def test_network_isolation():
    """Commands run with ``isolate_network`` set to either True or False."""
    print("\n--- test network isolation ---")
    with _populated_lowerdir() as lowerdir:
        with Sandbox(lowerdir, isolate_network=True) as sandbox:
            result = sandbox.execute("echo network isolated")
            assert result.exit_code == 0, f"network isolated failed {result.stderr!r}"
            print("network isolated sandbox runs commands")

        with Sandbox(lowerdir, isolate_network=False) as sandbox:
            result = sandbox.execute("echo network shared")
            assert result.exit_code == 0, f"network shared failed {result.stderr!r}"
            print("network shared sandbox runs commands")

        print("network isolation test passed")


def test_multiple_commands_sequential():
    """Several commands can be executed one after another in one sandbox."""
    print("\n--- test multiple commands sequential ---")
    with _populated_lowerdir() as lowerdir:
        with Sandbox(lowerdir) as sandbox:
            r1 = sandbox.execute("echo first")
            assert r1.stdout.strip() == "first", f"first command failed {r1.stdout!r} {r1.stderr!r}"
            r2 = sandbox.execute("echo second")
            assert r2.stdout.strip() == "second"
            r3 = sandbox.execute("echo third")
            assert r3.stdout.strip() == "third"
            print("multiple sequential commands passed")


def test_environment_clearenv():
    """Inside the sandbox ``HOME`` contains ``/root`` and ``PATH`` contains ``/usr/bin``."""
    print("\n--- test environment clearenv ---")
    with _populated_lowerdir() as lowerdir:
        with Sandbox(lowerdir) as sandbox:
            result = sandbox.execute("echo $HOME")
            assert "/root" in result.stdout, f"expected /root in HOME got {result.stdout.strip()!r}"
            result2 = sandbox.execute("echo $PATH")
            assert "/usr/bin" in result2.stdout
            print(f"env clearenv passed HOME={result.stdout.strip()!r}")


def test_hostname_isolation():
    """``uname -n`` inside the sandbox reports the hostname ``sandbox``."""
    print("\n--- test hostname isolation ---")
    with _populated_lowerdir() as lowerdir:
        with Sandbox(lowerdir) as sandbox:
            result = sandbox.execute("uname -n")
            assert result.stdout.strip() == "sandbox", f"expected hostname sandbox got {result.stdout.strip()!r}"
            print(f"hostname isolation passed hostname={result.stdout.strip()!r}")


def test_lowerdir_immutability():
    """Writes made through the merged root never modify the lowerdir on disk."""
    print("\n--- test lowerdir immutability ---")
    with _populated_lowerdir() as lowerdir:
        config_path = Path(lowerdir) / "etc" / "config.txt"
        original_config = config_path.read_text()
        original_files = set(os.listdir(lowerdir))

        with Sandbox(lowerdir) as sandbox:
            merged = str(sandbox.merged_root)
            sandbox.execute(f"echo corrupted > {merged}/etc/config.txt")
            sandbox.execute(f"echo new > {merged}/new_file.txt")
            sandbox.reset()

            # Inspect the lowerdir directly on the host, not through the sandbox.
            assert config_path.read_text() == original_config
            assert set(os.listdir(lowerdir)) == original_files
            print("lowerdir immutability confirmed")


def main():
    """Check prerequisites, run every test, and print a summary.

    Returns:
        True if no test failed, False otherwise.
    """
    check_bwrap_available()
    check_fuse_overlayfs_available()

    tests = [
        test_basic_command_execution,
        test_command_stderr,
        test_nonzero_exit_code,
        test_read_lowerdir_files,
        test_file_write_and_reset,
        test_timeout_enforcement,
        test_host_isolation,
        test_sandbox_not_created_errors,
        test_double_create_rejected,
        test_destroy_idempotent,
        test_context_manager,
        test_network_isolation,
        test_multiple_commands_sequential,
        test_environment_clearenv,
        test_hostname_isolation,
        test_lowerdir_immutability,
    ]

    passed = 0
    failed = 0
    for test in tests:
        # Top-level boundary: a failing test is reported and counted so the
        # remaining tests still run.
        try:
            test()
        except Exception as e:
            print(f"FAILED {test.__name__} {e}")
            traceback.print_exc()
            failed += 1
        else:
            passed += 1

    print(f"\n--- results {passed} passed {failed} failed ---")
    return failed == 0


if __name__ == "__main__":
    success = main()
    sys.exit(0 if success else 1)