bbkdevops's picture
download
raw
25.4 kB
"""Isolated filesystem sandbox environments for commands, builds, and tests."""
from __future__ import annotations
from dataclasses import asdict, dataclass
import base64
import hashlib
import json
import os
from pathlib import Path
import shutil
import signal
import subprocess
import time
from typing import Any
from model.sandbox_rl import ALLOWED_CMD
# Command names merged into the default command allowlist of
# SandboxEnvironmentPolicy. Grouped by purpose for readability only; the value
# is an unordered set, so ordering carries no meaning.
BASE_LINUX_PACKAGE_COMMANDS = {
    # compression / archives
    "bzip2", "bunzip2", "bzcat",
    "gzip", "gunzip",
    "zstd", "unzstd", "zstdcat",
    "tar", "unzip",
    # DNS / network lookups
    "dig", "host", "nslookup", "ping", "whois",
    # general tooling
    "find", "git", "openssl", "ps", "which",
}
def _sha256_bytes(data: bytes) -> str:
return hashlib.sha256(data).hexdigest()
def _safe_rel(path: str | Path) -> Path:
rel = Path(path)
if rel.is_absolute() or ".." in rel.parts:
raise ValueError("path_escape")
return rel
def _resolve_inside(root: Path, rel: str | Path) -> Path:
    """Resolve *rel* beneath *root* and guarantee the result stays inside *root*.

    Args:
        root: Directory that must contain the result.
        rel: Relative path to resolve; validated by ``_safe_rel`` first.

    Returns:
        The fully resolved path, which is either *root* itself or one of its
        descendants.

    Raises:
        ValueError: with the message ``"path_escape"`` when *rel* is absolute,
            contains ``..``, or resolves (e.g. through a symlink) outside *root*.
    """
    # Resolve the root as well: comparing an unresolved (relative or symlinked)
    # root against a resolved target would reject every path as an escape.
    base = root.resolve()
    target = (base / _safe_rel(rel)).resolve()
    if target != base and base not in target.parents:
        raise ValueError("path_escape")
    return target
def _manifest(root: Path, max_files: int = 10_000) -> dict:
files: list[dict] = []
for path in sorted(p for p in root.rglob("*") if p.is_file()):
if len(files) >= max_files:
break
data = path.read_bytes()
files.append(
{
"path": path.relative_to(root).as_posix(),
"bytes": len(data),
"sha256": _sha256_bytes(data),
}
)
digest = _sha256_bytes(json.dumps(files, sort_keys=True, separators=(",", ":")).encode("utf-8"))
return {"file_count": len(files), "sha256": digest, "truncated": len(files) >= max_files, "files": files}
@dataclass(frozen=True)
class SandboxEnvironmentPolicy:
    """Immutable limits and switches consulted by ``SandboxEnvironmentManager``."""

    # Executable base names that may be run; computed once when the class is defined.
    command_allowlist: tuple[str, ...] = tuple(
        sorted(set(ALLOWED_CMD).union(BASE_LINUX_PACKAGE_COMMANDS, {"pytest"}))
    )
    # Timeout applied to a command when the caller does not supply one.
    timeout_s: float = 30.0
    # Upper bound, in bytes, for a single copy / file_put / file_get payload.
    max_copy_bytes: int = 64_000_000
    # Cap on the number of files listed in a manifest.
    max_snapshot_files: int = 10_000
    # Echoed in resource reports.
    provisioned_memory_mb: int = 1024
    # ``run`` refuses to execute when this is False.
    active_cpu: bool = True
    # Echoed in resource reports.
    active_gpu: bool = False
    # Maximum number of successful ``create`` rows allowed in the ledger.
    max_creations: int = 32
    # Cumulative byte budget across successful copy_in / copy_out operations.
    max_total_transfer_bytes: int = 1_000_000_000
    # Combined size limit for the envs and snapshots trees.
    max_storage_bytes: int = 2_000_000_000
    # Largest timeout a caller may request for a command.
    max_runtime_duration_s: float = 300.0
    # Maximum number of environments whose status is "running".
    max_concurrent_sandboxes: int = 4
class SandboxEnvironmentManager:
    """Manage reusable isolated working directories with audited operations.

    Layout beneath ``root``:
      * ``envs/<name>``              - working directory of one sandbox
      * ``snapshots/<name>/files``   - copied contents, plus ``manifest.json``
      * ``jobs/<job_id>``            - ``stdout.log``, ``stderr.log`` and ``meta.json``
      * ``sandbox_env_ledger.jsonl`` - append-only audit ledger

    Every public operation except ``dashboard`` and ``resource_report`` appends
    one row to the ledger and returns that row. Failures are reported through
    ``ok=False`` plus an ``error`` code; an environment or snapshot *name* that
    escapes its directory still raises ``ValueError("path_escape")``.

    Commands are started with ``subprocess`` using the environment directory as
    the working directory; this class applies no further confinement.
    """

    def __init__(self, root: str | Path, policy: SandboxEnvironmentPolicy | None = None):
        """Create the directory layout under *root* and remember *policy*."""
        self.root = Path(root).resolve()
        self.root.mkdir(parents=True, exist_ok=True)
        self.policy = policy or SandboxEnvironmentPolicy()
        self.ledger_path = self.root / "sandbox_env_ledger.jsonl"
        self.envs_dir = self.root / "envs"
        self.snapshots_dir = self.root / "snapshots"
        self.jobs_dir = self.root / "jobs"
        for directory in (self.envs_dir, self.snapshots_dir, self.jobs_dir):
            directory.mkdir(exist_ok=True)

    # ------------------------------------------------------------------ lifecycle

    def create(self, name: str, from_snapshot: str | None = None) -> dict:
        """Create environment *name*, optionally seeded from a snapshot.

        Error codes: ``sandbox_creation_limit``, ``concurrent_sandbox_limit``,
        ``env_exists``, ``snapshot_not_found``.
        """
        quota = self.resource_report()
        if quota["sandbox_creations"] >= self.policy.max_creations:
            return self._record("create", False, name, error="sandbox_creation_limit")
        if quota["concurrent_sandboxes"] >= self.policy.max_concurrent_sandboxes:
            return self._record("create", False, name, error="concurrent_sandbox_limit")
        env = self._env_path(name)
        if env.exists():
            return self._record("create", False, name, error="env_exists")
        # Validate the snapshot before touching the filesystem, so a missing
        # snapshot does not leave an empty environment directory behind.
        snap = None
        if from_snapshot:
            snap = self._snapshot_path(from_snapshot)
            if not snap.exists():
                return self._record("create", False, name, error="snapshot_not_found")
        env.mkdir(parents=True)
        if snap is not None:
            self._copy_tree_contents(snap / "files", env)
        self._write_env_status(name, "running")
        return self._record("create", True, name, result={"env_path": str(env), "manifest": _manifest(env)})

    def stop(self, name: str, snapshot: bool = True) -> dict:
        """Terminate the environment's running jobs and mark it stopped.

        When *snapshot* is true the environment is first saved as
        ``<name>-auto-stop``.
        """
        env = self._env_path(name)
        if not env.exists():
            return self._record("stop", False, name, error="env_not_found")
        stopped_jobs: list[str] = []
        for meta_path in self.jobs_dir.glob("*/meta.json"):
            meta = json.loads(meta_path.read_text(encoding="utf-8"))
            if meta.get("env") != name or meta.get("status") != "running" or not meta.get("pid"):
                continue
            try:
                os.kill(int(meta["pid"]), signal.SIGTERM)
            except OSError:
                # Best effort: the process has already gone away.
                pass
            meta["status"] = "stopped"
            meta["ended_at"] = time.time()
            meta_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2, sort_keys=True), encoding="utf-8")
            stopped_jobs.append(str(meta.get("job_id")))
        snapshot_result = self.save_snapshot(name, f"{name}-auto-stop") if snapshot else None
        self._write_env_status(name, "stopped")
        return self._record("stop", True, name, result={"stopped_jobs": stopped_jobs, "auto_snapshot": snapshot_result})

    def fork(self, source_name: str, child_name: str) -> dict:
        """Create *child_name* from the newest snapshot of *source_name*.

        A fresh snapshot of the source is taken only when none exists yet.
        """
        latest = self._latest_snapshot_for_env(source_name)
        if latest is None:
            snap_result = self.save_snapshot(source_name, f"{source_name}-fork-{int(time.time())}")
            if not snap_result.get("ok"):
                return self._record("fork", False, source_name, error="source_snapshot_failed", details=snap_result)
            latest = str((snap_result.get("result") or {}).get("snapshot"))
        created = self.create(child_name, from_snapshot=latest)
        return self._record(
            "fork",
            created.get("ok") is True,
            source_name,
            result={"child": child_name, "snapshot": latest, "create": created},
        )

    # ------------------------------------------------------------------ commands

    def run(self, name: str, argv: list[str], timeout_s: float | None = None) -> dict:
        """Run *argv* inside environment *name* and wait for it to finish.

        ``echo`` is answered in-process without spawning anything. ``ok``
        mirrors a zero return code.

        Error codes: ``env_not_found``, ``runtime_duration_limit``,
        ``cpu_disabled``, ``empty_command``, ``command_not_allowlisted``,
        ``timeout``, ``spawn_failed``.
        """
        failure, env, requested_timeout, executable = self._preflight("run", name, argv, timeout_s)
        if failure is not None:
            return failure
        before = _manifest(env, self.policy.max_snapshot_files)
        started = time.time()
        if executable == "echo":
            result = {
                "returncode": 0,
                "stdout": " ".join(argv[1:]) + "\n",
                "stderr": "",
                "elapsed_ms": (time.time() - started) * 1000.0,
                "manifest_before": before,
                "manifest_after": _manifest(env, self.policy.max_snapshot_files),
                "builtin": True,
            }
            return self._record("run", True, name, result=result)
        try:
            proc = subprocess.run(
                argv,
                cwd=env,
                capture_output=True,
                text=True,
                shell=False,
                timeout=requested_timeout,
                check=False,
            )
        except subprocess.TimeoutExpired as exc:
            # Captured output on TimeoutExpired is bytes even with text=True;
            # decode it so the ledger row stays JSON-serializable.
            return self._record(
                "run",
                False,
                name,
                error="timeout",
                details={
                    "elapsed_ms": (time.time() - started) * 1000.0,
                    "stdout": self._as_text(exc.stdout),
                    "stderr": self._as_text(exc.stderr),
                },
            )
        except OSError as exc:
            # E.g. the allowlisted executable is not installed on this host.
            return self._record("run", False, name, error="spawn_failed", details={"argv": argv, "message": str(exc)})
        result = {
            "returncode": proc.returncode,
            "stdout": proc.stdout,
            "stderr": proc.stderr,
            "elapsed_ms": (time.time() - started) * 1000.0,
            "manifest_before": before,
            "manifest_after": _manifest(env, self.policy.max_snapshot_files),
        }
        return self._record("run", proc.returncode == 0, name, result=result)

    def run_detached(self, name: str, argv: list[str], timeout_s: float | None = None) -> dict:
        """Start *argv* in the background and return its job metadata.

        Output goes to ``jobs/<job_id>/stdout.log`` and ``stderr.log``.
        ``timeout_s`` is stored in the job metadata; nothing in this class
        enforces it after launch.

        Error codes: the same validation codes as ``run``, plus ``spawn_failed``.
        """
        failure, env, requested_timeout, executable = self._preflight("run_detached", name, argv, timeout_s)
        if failure is not None:
            return failure
        argv_tag = hashlib.sha1(json.dumps(argv).encode("utf-8")).hexdigest()[:8]
        job_id = f"job-{int(time.time() * 1000)}-{argv_tag}"
        job_dir = self.jobs_dir / job_id
        job_dir.mkdir()
        stdout_path = job_dir / "stdout.log"
        stderr_path = job_dir / "stderr.log"
        before = _manifest(env, self.policy.max_snapshot_files)
        if executable == "echo":
            stdout_path.write_text(" ".join(argv[1:]) + "\n", encoding="utf-8")
            stderr_path.write_text("", encoding="utf-8")
            meta = {
                "job_id": job_id,
                "env": name,
                "argv": argv,
                "pid": None,
                "status": "exited",
                "returncode": 0,
                "started_at": time.time(),
                "ended_at": time.time(),
                "stdout_path": str(stdout_path),
                "stderr_path": str(stderr_path),
                "manifest_before": before,
                "manifest_after": _manifest(env, self.policy.max_snapshot_files),
                "builtin": True,
            }
            self._write_job_meta(job_id, meta)
            return self._record("run_detached", True, name, result=meta)
        try:
            # The child inherits the descriptors; our copies are closed on
            # leaving the block, whether or not the spawn succeeded.
            with stdout_path.open("wb") as stdout_f, stderr_path.open("wb") as stderr_f:
                proc = subprocess.Popen(argv, cwd=env, stdout=stdout_f, stderr=stderr_f, shell=False)
        except OSError as exc:
            # No job was started, so do not leave a half-initialised job directory.
            shutil.rmtree(job_dir, ignore_errors=True)
            return self._record(
                "run_detached", False, name, error="spawn_failed", details={"argv": argv, "message": str(exc)}
            )
        meta = {
            "job_id": job_id,
            "env": name,
            "argv": argv,
            "pid": proc.pid,
            "status": "running",
            "returncode": None,
            "started_at": time.time(),
            "ended_at": None,
            "timeout_s": requested_timeout,
            "stdout_path": str(stdout_path),
            "stderr_path": str(stderr_path),
            "manifest_before": before,
        }
        self._write_job_meta(job_id, meta)
        return self._record("run_detached", True, name, result=meta)

    def job_status(self, job_id: str) -> dict:
        """Return the job metadata, refreshing it when the process has ended."""
        meta = self._read_job_meta(job_id)
        if not meta:
            return self._record("job_status", False, job_id, error="job_not_found")
        if meta.get("status") == "running" and meta.get("pid"):
            alive, returncode = self._poll_pid(int(meta["pid"]))
            if not alive:
                meta["status"] = "exited"
                meta["ended_at"] = time.time()
                if returncode is not None:
                    meta["returncode"] = returncode
                self._write_job_meta(job_id, meta)
        return self._record("job_status", True, str(meta.get("env", job_id)), result=meta)

    def job_stream(self, job_id: str, stream: str = "stdout", offset: int = 0, max_bytes: int = 64_000) -> dict:
        """Return up to *max_bytes* of a job log starting at byte *offset*.

        *stream* selects ``stderr`` when equal to ``"stderr"`` and stdout
        otherwise. Negative *offset* / *max_bytes* values are treated as 0.
        """
        meta = self._read_job_meta(job_id)
        if not meta:
            return self._record("job_stream", False, job_id, error="job_not_found")
        path = Path(meta["stderr_path"] if stream == "stderr" else meta["stdout_path"])
        start = max(0, offset)
        limit = max(0, max_bytes)
        chunk = b""
        total = 0
        if path.exists():
            # Seek instead of loading the whole log only to slice a window out of it.
            with path.open("rb") as handle:
                handle.seek(start)
                chunk = handle.read(limit)
                # Measure after reading so a log that is still growing is not
                # reported as finished too early.
                handle.seek(0, os.SEEK_END)
                total = handle.tell()
        next_offset = start + len(chunk)
        result = {
            "job_id": job_id,
            "stream": stream,
            "offset": start,
            "next_offset": next_offset,
            "content": chunk.decode("utf-8", errors="replace"),
            "eof": next_offset >= total,
        }
        return self._record("job_stream", True, str(meta.get("env", job_id)), result=result)

    # ------------------------------------------------------------------ file transfer

    def copy_in(self, name: str, src: str | Path, dest: str | Path) -> dict:
        """Copy host path *src* to *dest* inside environment *name*.

        The copy is rolled back when it pushes usage past the transfer or
        storage limit.

        Error codes: ``env_not_found``, ``source_not_found``, ``path_escape``,
        ``copy_too_large``, ``data_transfer_limit``, ``storage_limit``.
        """
        env = self._env_path(name)
        if not env.exists():
            return self._record("copy_in", False, name, error="env_not_found")
        source = Path(src).resolve()
        if not source.exists():
            return self._record("copy_in", False, name, error="source_not_found")
        try:
            # An escaping destination is reported the same way as an oversized copy.
            target = _resolve_inside(env, dest)
            copied = self._copy_path(source, target)
        except ValueError as exc:
            return self._record("copy_in", False, name, error=str(exc))
        usage = self.resource_report()
        if usage["sandbox_data_transfer_bytes"] + copied > self.policy.max_total_transfer_bytes:
            self._remove_path(target)
            return self._record("copy_in", False, name, error="data_transfer_limit")
        if usage["sandbox_storage_bytes"] > self.policy.max_storage_bytes:
            self._remove_path(target)
            return self._record("copy_in", False, name, error="storage_limit")
        return self._record("copy_in", True, name, result={"bytes": copied, "dest": str(target.relative_to(env))})

    def copy_out(self, name: str, src: str | Path, dest: str | Path) -> dict:
        """Copy *src* from inside environment *name* to host path *dest*.

        Error codes: ``env_not_found``, ``path_escape``, ``source_not_found``,
        ``copy_too_large``, ``data_transfer_limit``.
        """
        env = self._env_path(name)
        if not env.exists():
            return self._record("copy_out", False, name, error="env_not_found")
        try:
            source = _resolve_inside(env, src)
        except ValueError as exc:
            return self._record("copy_out", False, name, error=str(exc))
        if not source.exists():
            return self._record("copy_out", False, name, error="source_not_found")
        target = Path(dest).resolve()
        try:
            copied = self._copy_path(source, target)
        except ValueError as exc:
            # An oversized copy is reported as an error row, matching copy_in.
            return self._record("copy_out", False, name, error=str(exc))
        if self.resource_report()["sandbox_data_transfer_bytes"] + copied > self.policy.max_total_transfer_bytes:
            self._remove_path(target)
            return self._record("copy_out", False, name, error="data_transfer_limit")
        return self._record("copy_out", True, name, result={"bytes": copied, "dest": str(target)})

    def file_put(self, name: str, path: str | Path, content: str, encoding: str = "utf-8") -> dict:
        """Write *content* to *path* inside environment *name*.

        *encoding* is ``"utf-8"`` (text is encoded) or ``"base64"`` (content is
        decoded to raw bytes).

        Error codes: ``env_not_found``, ``path_escape``, ``invalid_base64``,
        ``unsupported_encoding``, ``copy_too_large``.
        """
        env = self._env_path(name)
        if not env.exists():
            return self._record("file_put", False, name, error="env_not_found")
        try:
            target = _resolve_inside(env, path)
        except ValueError as exc:
            return self._record("file_put", False, name, error=str(exc))
        if encoding == "base64":
            try:
                data = base64.b64decode(content.encode("ascii"))
            except ValueError:
                # Covers binascii.Error (bad padding) and UnicodeEncodeError (non-ASCII input).
                return self._record("file_put", False, name, error="invalid_base64")
        elif encoding == "utf-8":
            data = content.encode("utf-8")
        else:
            return self._record("file_put", False, name, error="unsupported_encoding")
        if len(data) > self.policy.max_copy_bytes:
            return self._record("file_put", False, name, error="copy_too_large")
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_bytes(data)
        return self._record("file_put", True, name, result={"path": str(target.relative_to(env)), "bytes": len(data)})

    def file_get(self, name: str, path: str | Path, encoding: str = "utf-8") -> dict:
        """Read *path* from environment *name* as UTF-8 text or base64.

        Error codes: ``env_not_found``, ``path_escape``, ``source_not_found``,
        ``copy_too_large``, ``decode_error``, ``unsupported_encoding``.
        """
        env = self._env_path(name)
        if not env.exists():
            return self._record("file_get", False, name, error="env_not_found")
        try:
            source = _resolve_inside(env, path)
        except ValueError as exc:
            return self._record("file_get", False, name, error=str(exc))
        if not source.is_file():
            return self._record("file_get", False, name, error="source_not_found")
        data = source.read_bytes()
        if len(data) > self.policy.max_copy_bytes:
            return self._record("file_get", False, name, error="copy_too_large")
        if encoding == "base64":
            content = base64.b64encode(data).decode("ascii")
        elif encoding == "utf-8":
            try:
                content = data.decode("utf-8")
            except UnicodeDecodeError:
                # Binary content; the caller should ask for base64 instead.
                return self._record("file_get", False, name, error="decode_error")
        else:
            return self._record("file_get", False, name, error="unsupported_encoding")
        return self._record(
            "file_get",
            True,
            name,
            result={"path": str(source.relative_to(env)), "bytes": len(data), "encoding": encoding, "content": content},
        )

    # ------------------------------------------------------------------ snapshots

    def save_snapshot(self, name: str, snapshot_name: str) -> dict:
        """Copy environment *name* into snapshot *snapshot_name*, replacing any old one.

        Error codes: ``env_not_found``, ``storage_limit``.
        """
        env = self._env_path(name)
        if not env.exists():
            return self._record("snapshot", False, name, error="env_not_found")
        snap = self._snapshot_path(snapshot_name)
        if snap.exists():
            shutil.rmtree(snap)
        files_dir = snap / "files"
        files_dir.mkdir(parents=True)
        self._copy_tree_contents(env, files_dir)
        manifest = _manifest(files_dir, self.policy.max_snapshot_files)
        if self.resource_report()["sandbox_storage_bytes"] > self.policy.max_storage_bytes:
            shutil.rmtree(snap, ignore_errors=True)
            return self._record("snapshot", False, name, error="storage_limit")
        (snap / "manifest.json").write_text(json.dumps(manifest, indent=2, sort_keys=True), encoding="utf-8")
        return self._record("snapshot", True, name, result={"snapshot": snapshot_name, "manifest": manifest})

    def restore_snapshot(self, name: str, snapshot_name: str, replace: bool = True) -> dict:
        """Copy snapshot *snapshot_name* into environment *name*.

        With ``replace=True`` the environment is wiped first; otherwise the
        snapshot files are merged over the existing contents.
        """
        env = self._env_path(name)
        snap = self._snapshot_path(snapshot_name)
        if not snap.exists():
            return self._record("restore", False, name, error="snapshot_not_found")
        if env.exists() and replace:
            shutil.rmtree(env)
        env.mkdir(parents=True, exist_ok=True)
        self._copy_tree_contents(snap / "files", env)
        self._write_env_status(name, "running")
        return self._record("restore", True, name, result={"env_path": str(env), "manifest": _manifest(env)})

    # ------------------------------------------------------------------ reporting

    def dashboard(self) -> dict:
        """Return an overview of environments, jobs, snapshots and usage (not ledgered)."""
        env_rows = [
            {
                "name": env.name,
                "path": str(env),
                "status": self._read_env_status(env.name),
                "manifest": _manifest(env, self.policy.max_snapshot_files),
            }
            for env in sorted(p for p in self.envs_dir.iterdir() if p.is_dir())
        ]
        job_rows = [
            json.loads(meta_path.read_text(encoding="utf-8"))
            for meta_path in sorted(self.jobs_dir.glob("*/meta.json"))
        ]
        snapshots = []
        for snap in sorted(p for p in self.snapshots_dir.iterdir() if p.is_dir()):
            manifest_path = snap / "manifest.json"
            if manifest_path.exists():
                manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
            else:
                manifest = _manifest(snap / "files")
            snapshots.append({"name": snap.name, "path": str(snap), "manifest": manifest})
        return {
            "sandboxes": env_rows,
            "running": [row for row in env_rows if row["status"] == "running"],
            "stopped": [row for row in env_rows if row["status"] == "stopped"],
            "commands": job_rows,
            "snapshots": snapshots,
            "usage": self.resource_report(),
        }

    def resource_report(self) -> dict:
        """Summarise current usage (from disk and the ledger) next to the policy limits."""
        active_sandboxes = sum(
            1 for p in self.envs_dir.iterdir() if p.is_dir() and self._read_env_status(p.name) == "running"
        )
        snapshot_bytes = self._tree_bytes(self.snapshots_dir)  # walked once, used twice below
        storage_bytes = self._tree_bytes(self.envs_dir) + snapshot_bytes
        data_transfer_bytes = 0
        sandbox_creations = 0
        if self.ledger_path.exists():
            # Iterate the file rather than str.splitlines(): rows are written
            # with ensure_ascii=False, so a raw U+2028 / U+0085 inside a string
            # would be treated as a line break by splitlines and corrupt the row.
            with self.ledger_path.open("r", encoding="utf-8") as ledger:
                for line in ledger:
                    try:
                        row = json.loads(line)
                    except json.JSONDecodeError:
                        continue
                    if not isinstance(row, dict) or row.get("ok") is not True:
                        continue
                    action = row.get("action")
                    if action == "create":
                        sandbox_creations += 1
                    elif action in {"copy_in", "copy_out"}:
                        data_transfer_bytes += int((row.get("result") or {}).get("bytes") or 0)
        return {
            "sandbox_provisioned_memory_mb": self.policy.provisioned_memory_mb,
            "sandbox_active_cpu": self.policy.active_cpu,
            "sandbox_active_gpu": self.policy.active_gpu,
            "sandbox_creations": sandbox_creations,
            "sandbox_data_transfer_bytes": data_transfer_bytes,
            "sandbox_storage_bytes": storage_bytes,
            "max_runtime_duration_s": self.policy.max_runtime_duration_s,
            "concurrent_sandboxes": active_sandboxes,
            "snapshot_storage_bytes": snapshot_bytes,
            "limits": {
                "max_creations": self.policy.max_creations,
                "max_total_transfer_bytes": self.policy.max_total_transfer_bytes,
                "max_storage_bytes": self.policy.max_storage_bytes,
                "max_concurrent_sandboxes": self.policy.max_concurrent_sandboxes,
            },
        }

    # ------------------------------------------------------------------ helpers

    def _preflight(
        self, action: str, name: str, argv: list[str], timeout_s: float | None
    ) -> tuple[dict | None, Path, float, str]:
        """Validate a command request shared by ``run`` and ``run_detached``.

        Returns ``(failure, env, timeout, executable)``. ``failure`` is the
        already-recorded ledger row when a check fails and ``None`` otherwise.
        """
        env = self._env_path(name)
        if not env.exists():
            return self._record(action, False, name, error="env_not_found"), env, 0.0, ""
        requested_timeout = timeout_s or self.policy.timeout_s
        if requested_timeout > self.policy.max_runtime_duration_s:
            return self._record(action, False, name, error="runtime_duration_limit"), env, requested_timeout, ""
        if not self.policy.active_cpu:
            return self._record(action, False, name, error="cpu_disabled"), env, requested_timeout, ""
        if not argv:
            return self._record(action, False, name, error="empty_command"), env, requested_timeout, ""
        # NOTE(review): only the lower-cased base name is matched, so any path
        # ending in an allowlisted name passes this check - confirm intended.
        executable = Path(argv[0]).name.lower()
        if executable not in self.policy.command_allowlist:
            failure = self._record(action, False, name, error="command_not_allowlisted", details={"argv": argv})
            return failure, env, requested_timeout, executable
        return None, env, requested_timeout, executable

    @staticmethod
    def _as_text(value: Any) -> Any:
        """Decode bytes to text (undecodable bytes replaced); pass anything else through."""
        if isinstance(value, (bytes, bytearray)):
            return bytes(value).decode("utf-8", errors="replace")
        return value

    @staticmethod
    def _poll_pid(pid: int) -> tuple[bool, int | None]:
        """Return ``(alive, returncode)`` for *pid*; ``returncode`` is None when unknown."""
        if pid <= 0:
            return False, None
        if hasattr(os, "WNOHANG"):
            # Reap our own finished children: an unreaped (zombie) child still
            # answers a signal-0 probe and would otherwise look alive forever.
            try:
                reaped, status = os.waitpid(pid, os.WNOHANG)
            except ChildProcessError:
                pass  # not a child of this process; fall back to the probe below
            else:
                if reaped == 0:
                    return True, None
                if os.WIFEXITED(status):
                    return False, os.WEXITSTATUS(status)
                if os.WIFSIGNALED(status):
                    return False, -os.WTERMSIG(status)
                return False, None
        try:
            os.kill(pid, 0)
        except OSError:
            return False, None
        return True, None

    @staticmethod
    def _remove_path(target: Path) -> None:
        """Best-effort removal of a file or directory tree (used to roll back a copy)."""
        if target.is_file():
            target.unlink(missing_ok=True)
        else:
            shutil.rmtree(target, ignore_errors=True)

    def _copy_path(self, source: Path, target: Path) -> int:
        """Copy a file or directory to *target*, replacing it, and return the bytes copied.

        Raises:
            ValueError: ``"copy_too_large"`` when the payload exceeds ``max_copy_bytes``.
        """
        if source.is_dir():
            total = sum(p.stat().st_size for p in source.rglob("*") if p.is_file())
            if total > self.policy.max_copy_bytes:
                raise ValueError("copy_too_large")
            if target.is_dir():
                shutil.rmtree(target)
            elif target.exists():
                # A plain file in the way cannot be removed with rmtree.
                target.unlink()
            shutil.copytree(source, target)
            return total
        size = source.stat().st_size
        if size > self.policy.max_copy_bytes:
            raise ValueError("copy_too_large")
        target.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(source, target)
        return size

    def _copy_tree_contents(self, source: Path, target: Path) -> None:
        """Copy every entry of *source* into *target*, merging into existing directories."""
        for item in source.iterdir():
            dest = target / item.name
            if item.is_dir():
                # dirs_exist_ok lets a restore with replace=False merge instead of failing.
                shutil.copytree(item, dest, dirs_exist_ok=True)
            else:
                shutil.copy2(item, dest)

    def _tree_bytes(self, root: Path) -> int:
        """Return the total size in bytes of all regular files under *root*."""
        return sum(p.stat().st_size for p in root.rglob("*") if p.is_file())

    def _env_path(self, name: str) -> Path:
        """Return the directory of environment *name*; raises ValueError on escape."""
        return _resolve_inside(self.envs_dir, name)

    def _snapshot_path(self, name: str) -> Path:
        """Return the directory of snapshot *name*; raises ValueError on escape."""
        return _resolve_inside(self.snapshots_dir, name)

    def _env_status_path(self, name: str) -> Path:
        """Return the status file stored inside environment *name*."""
        return self._env_path(name) / ".sandbox_status.json"

    def _write_env_status(self, name: str, status: str) -> None:
        """Persist *status* for environment *name* with the current timestamp."""
        body = {"name": name, "status": status, "updated_at": time.time()}
        self._env_status_path(name).write_text(
            json.dumps(body, ensure_ascii=False, sort_keys=True),
            encoding="utf-8",
        )

    def _read_env_status(self, name: str) -> str:
        """Return the stored status; ``"running"`` when absent, ``"unknown"`` when unreadable."""
        path = self._env_status_path(name)
        if not path.exists():
            return "running"
        try:
            return str(json.loads(path.read_text(encoding="utf-8")).get("status", "running"))
        except json.JSONDecodeError:
            return "unknown"

    def _job_meta_path(self, job_id: str) -> Path:
        """Return the ``meta.json`` path for *job_id*; raises ValueError on escape."""
        return self.jobs_dir / _safe_rel(job_id) / "meta.json"

    def _write_job_meta(self, job_id: str, meta: dict) -> None:
        """Store *meta* for *job_id*, stamping the id into the record."""
        meta["job_id"] = job_id
        path = self._job_meta_path(job_id)
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(meta, ensure_ascii=False, indent=2, sort_keys=True), encoding="utf-8")

    def _read_job_meta(self, job_id: str) -> dict:
        """Return the stored metadata for *job_id*, or ``{}`` when there is none."""
        path = self._job_meta_path(job_id)
        if not path.exists():
            return {}
        return json.loads(path.read_text(encoding="utf-8"))

    def _latest_snapshot_for_env(self, name: str) -> str | None:
        """Return the most recently modified snapshot whose name starts with ``<name>-``.

        NOTE(review): matching is by name prefix only, so a snapshot of an
        environment called ``<name>-something`` matches as well.
        """
        candidates = [
            (snap.stat().st_mtime, snap.name)
            for snap in self.snapshots_dir.iterdir()
            if snap.is_dir() and snap.name.startswith(f"{name}-")
        ]
        if not candidates:
            return None
        return max(candidates)[1]

    def _record(
        self,
        action: str,
        ok: bool,
        env: str,
        result: Any = None,
        error: str | None = None,
        details: dict | None = None,
    ) -> dict:
        """Append one audit row to the ledger and return it.

        Each row carries the full policy and a fresh resource report.
        """
        dirs_ready = self.envs_dir.exists() and self.snapshots_dir.exists()
        payload = {
            "timestamp": time.time(),
            "action": action,
            "ok": ok,
            "env": env,
            "result": result,
            "error": error,
            "details": details or {},
            "policy": asdict(self.policy),
            "resources": self.resource_report() if dirs_ready else {},
        }
        with self.ledger_path.open("a", encoding="utf-8", newline="\n") as f:
            f.write(json.dumps(payload, ensure_ascii=False, sort_keys=True) + "\n")
        return payload

Xet Storage Details

Size:
25.4 kB
·
Xet hash:
b6a3a6b1ad65bf1eba4ccec64adced8369bbfef3d1c101b3ec1dc46174d3e745

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.