Buckets:
bbkdevops/unicosys-hypergraph-bucket / tinymind-native-colab-handoff /bundle /model /sandbox_environment.py
| """Isolated filesystem sandbox environments for commands, builds, and tests.""" | |
| from __future__ import annotations | |
| from dataclasses import asdict, dataclass | |
| import base64 | |
| import hashlib | |
| import json | |
| import os | |
| from pathlib import Path | |
| import shutil | |
| import signal | |
| import subprocess | |
| import time | |
| from typing import Any | |
| from model.sandbox_rl import ALLOWED_CMD | |
# Extra command names merged into the default sandbox allowlist (see the
# policy's ``command_allowlist``), grouped here by purpose for readability.
BASE_LINUX_PACKAGE_COMMANDS = {
    # compression / decompression
    "bzip2", "bunzip2", "bzcat",
    "gzip", "gunzip",
    "zstd", "unzstd", "zstdcat",
    # archives
    "tar", "unzip",
    # network lookups and diagnostics
    "dig", "host", "nslookup", "ping", "whois",
    # general tooling
    "find", "git", "openssl", "ps", "which",
}
| def _sha256_bytes(data: bytes) -> str: | |
| return hashlib.sha256(data).hexdigest() | |
| def _safe_rel(path: str | Path) -> Path: | |
| rel = Path(path) | |
| if rel.is_absolute() or ".." in rel.parts: | |
| raise ValueError("path_escape") | |
| return rel | |
def _resolve_inside(root: Path, rel: str | Path) -> Path:
    """Resolve ``rel`` under ``root`` and ensure the result stays inside it.

    Args:
        root: Directory that must contain the result.
        rel: Relative path, validated with ``_safe_rel`` first.

    Returns:
        The fully resolved target path (``root`` itself is allowed).

    Raises:
        ValueError: ``"path_escape"`` when ``rel`` is rejected by
            ``_safe_rel`` or the resolved target lies outside ``root``
            (for example through a symlink).
    """
    # Resolve the base as well: the target is resolved, so comparing it with
    # an unresolved (relative or symlinked) root would reject every path.
    base = root.resolve()
    target = (base / _safe_rel(rel)).resolve()
    if target != base and base not in target.parents:
        raise ValueError("path_escape")
    return target
| def _manifest(root: Path, max_files: int = 10_000) -> dict: | |
| files: list[dict] = [] | |
| for path in sorted(p for p in root.rglob("*") if p.is_file()): | |
| if len(files) >= max_files: | |
| break | |
| data = path.read_bytes() | |
| files.append( | |
| { | |
| "path": path.relative_to(root).as_posix(), | |
| "bytes": len(data), | |
| "sha256": _sha256_bytes(data), | |
| } | |
| ) | |
| digest = _sha256_bytes(json.dumps(files, sort_keys=True, separators=(",", ":")).encode("utf-8")) | |
| return {"file_count": len(files), "sha256": digest, "truncated": len(files) >= max_files, "files": files} | |
@dataclass
class SandboxEnvironmentPolicy:
    """Limits and switches applied by ``SandboxEnvironmentManager``.

    Declared as a dataclass so instances accept keyword overrides and can be
    serialised with ``dataclasses.asdict`` (the manager embeds the policy in
    every ledger record).
    """

    # Lower-cased executable names that may be run inside an environment.
    command_allowlist: tuple[str, ...] = tuple(sorted(ALLOWED_CMD | BASE_LINUX_PACKAGE_COMMANDS | {"pytest"}))
    # Default per-command timeout in seconds when the caller passes none.
    timeout_s: float = 30.0
    # Largest single copy / file_put / file_get payload, in bytes.
    max_copy_bytes: int = 64_000_000
    # Cap on the number of files listed in a manifest.
    max_snapshot_files: int = 10_000
    # Reported in resource reports; not enforced by the visible manager code.
    provisioned_memory_mb: int = 1024
    # When False, ``run`` refuses to execute commands.
    active_cpu: bool = True
    # Reported in resource reports only.
    active_gpu: bool = False
    # Maximum number of successful ``create`` operations recorded in the ledger.
    max_creations: int = 32
    # Cumulative byte budget for successful copy_in / copy_out operations.
    max_total_transfer_bytes: int = 1_000_000_000
    # Byte budget for environments plus snapshots on disk.
    max_storage_bytes: int = 2_000_000_000
    # Upper bound for any requested command timeout, in seconds.
    max_runtime_duration_s: float = 300.0
    # Maximum number of environments whose status is "running".
    max_concurrent_sandboxes: int = 4
class SandboxEnvironmentManager:
    """Manage reusable isolated working directories with audited operations.

    Layout below ``root``:

    * ``envs/<name>``: working tree of each environment, plus a
      ``.sandbox_status.json`` marker.
    * ``snapshots/<name>/files``: copied trees with a ``manifest.json``.
    * ``jobs/<job_id>``: ``stdout.log`` / ``stderr.log`` / ``meta.json`` of
      detached commands.
    * ``sandbox_env_ledger.jsonl``: one JSON record appended per operation.

    Public operations return the ledger record they appended (keys
    ``action``, ``ok``, ``env``, ``result``, ``error``, ``details``,
    ``policy``, ``resources``, ``timestamp``) rather than raising for
    expected failures. Environment or snapshot names that escape their
    directory raise ``ValueError("path_escape")``.
    """

    def __init__(self, root: str | Path, policy: SandboxEnvironmentPolicy | None = None):
        """Create the directory layout under ``root`` and bind ``policy``.

        Args:
            root: Base directory; resolved and created if missing.
            policy: Limits to enforce; a default policy is built when omitted.
        """
        self.root = Path(root).resolve()
        self.root.mkdir(parents=True, exist_ok=True)
        self.policy = policy or SandboxEnvironmentPolicy()
        self.ledger_path = self.root / "sandbox_env_ledger.jsonl"
        self.envs_dir = self.root / "envs"
        self.snapshots_dir = self.root / "snapshots"
        self.jobs_dir = self.root / "jobs"
        for directory in (self.envs_dir, self.snapshots_dir, self.jobs_dir):
            directory.mkdir(exist_ok=True)
        # Handles of detached children started by this instance, keyed by job
        # id, so job_status() can reap them and learn their return code.
        self._children = {}

    # ------------------------------------------------------------------
    # Environment lifecycle
    # ------------------------------------------------------------------

    def create(self, name: str, from_snapshot: str | None = None) -> dict:
        """Create environment ``name``, optionally seeded from a snapshot.

        Fails with ``sandbox_creation_limit``, ``concurrent_sandbox_limit``,
        ``env_exists`` or ``snapshot_not_found``.
        """
        quota = self.resource_report()
        if quota["sandbox_creations"] >= self.policy.max_creations:
            return self._record("create", False, name, error="sandbox_creation_limit")
        if quota["concurrent_sandboxes"] >= self.policy.max_concurrent_sandboxes:
            return self._record("create", False, name, error="concurrent_sandbox_limit")
        env = self._env_path(name)
        if env.exists():
            return self._record("create", False, name, error="env_exists")
        seed = None
        if from_snapshot:
            # Validate the snapshot before touching the filesystem so a bad
            # name does not leave an empty, seemingly running environment.
            seed = self._snapshot_path(from_snapshot) / "files"
            if not seed.is_dir():
                return self._record("create", False, name, error="snapshot_not_found")
        env.mkdir(parents=True)
        if seed is not None:
            self._copy_tree_contents(seed, env)
        self._write_env_status(name, "running")
        manifest = _manifest(env, self.policy.max_snapshot_files)
        return self._record("create", True, name, result={"env_path": str(env), "manifest": manifest})

    def stop(self, name: str, snapshot: bool = True) -> dict:
        """Send SIGTERM to the environment's running jobs and mark it stopped.

        When ``snapshot`` is true an ``<name>-auto-stop`` snapshot is saved
        first and its record is returned under ``auto_snapshot``.
        """
        env = self._env_path(name)
        if not env.exists():
            return self._record("stop", False, name, error="env_not_found")
        stopped_jobs: list[str] = []
        for meta_path in self.jobs_dir.glob("*/meta.json"):
            meta = json.loads(meta_path.read_text(encoding="utf-8"))
            if meta.get("env") != name or meta.get("status") != "running" or not meta.get("pid"):
                continue
            try:
                os.kill(int(meta["pid"]), signal.SIGTERM)
            except OSError:
                # Best effort: the process is already gone or not ours.
                pass
            meta["status"] = "stopped"
            meta["ended_at"] = time.time()
            meta_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2, sort_keys=True), encoding="utf-8")
            stopped_jobs.append(str(meta.get("job_id")))
            self._children.pop(str(meta.get("job_id")), None)
        snapshot_result = None
        if snapshot:
            snapshot_result = self.save_snapshot(name, f"{name}-auto-stop")
        self._write_env_status(name, "stopped")
        return self._record("stop", True, name, result={"stopped_jobs": stopped_jobs, "auto_snapshot": snapshot_result})

    def fork(self, source_name: str, child_name: str) -> dict:
        """Create ``child_name`` from the newest snapshot of ``source_name``.

        When no snapshot with the ``<source_name>-`` prefix exists, one named
        ``<source_name>-fork-<unix seconds>`` is taken first.
        """
        latest = self._latest_snapshot_for_env(source_name)
        if latest is None:
            snap_result = self.save_snapshot(source_name, f"{source_name}-fork-{int(time.time())}")
            if not snap_result.get("ok"):
                return self._record("fork", False, source_name, error="source_snapshot_failed", details=snap_result)
            latest = str((snap_result.get("result") or {}).get("snapshot"))
        created = self.create(child_name, from_snapshot=latest)
        result = {"child": child_name, "snapshot": latest, "create": created}
        return self._record("fork", created.get("ok") is True, source_name, result=result)

    # ------------------------------------------------------------------
    # Command execution
    # ------------------------------------------------------------------

    def run(self, name: str, argv: list[str], timeout_s: float | None = None) -> dict:
        """Run ``argv`` synchronously inside environment ``name``.

        ``echo`` is emulated in-process. On success or non-zero exit the
        result holds ``returncode``, ``stdout``, ``stderr``, ``elapsed_ms``
        and manifests taken before and after. Failures use the error codes
        ``env_not_found``, ``runtime_duration_limit``, ``cpu_disabled``,
        ``empty_command``, ``command_not_allowlisted``, ``timeout`` and
        ``spawn_failed``.
        """
        env = self._env_path(name)
        if not env.exists():
            return self._record("run", False, name, error="env_not_found")
        requested_timeout = timeout_s or self.policy.timeout_s
        if requested_timeout > self.policy.max_runtime_duration_s:
            return self._record("run", False, name, error="runtime_duration_limit")
        if not self.policy.active_cpu:
            return self._record("run", False, name, error="cpu_disabled")
        problem = self._command_error(env, argv)
        if problem:
            return self._record("run", False, name, error=problem, details={"argv": argv} if argv else None)
        executable = Path(argv[0]).name.lower()
        before = _manifest(env, self.policy.max_snapshot_files)
        started = time.time()
        if executable == "echo":
            result = {
                "returncode": 0,
                "stdout": " ".join(argv[1:]) + "\n",
                "stderr": "",
                "elapsed_ms": (time.time() - started) * 1000.0,
                "manifest_before": before,
                "manifest_after": _manifest(env, self.policy.max_snapshot_files),
                "builtin": True,
            }
            return self._record("run", True, name, result=result)
        try:
            proc = subprocess.run(
                argv,
                cwd=env,
                capture_output=True,
                text=True,
                errors="replace",  # non-UTF-8 output must not abort the call
                shell=False,
                timeout=requested_timeout,
                check=False,
            )
        except subprocess.TimeoutExpired as exc:
            # The partial output attached to TimeoutExpired can be bytes even
            # in text mode; decode it so the ledger record stays JSON-safe.
            details = {
                "elapsed_ms": (time.time() - started) * 1000.0,
                "stdout": self._as_text(exc.stdout),
                "stderr": self._as_text(exc.stderr),
            }
            return self._record("run", False, name, error="timeout", details=details)
        except OSError as exc:
            # Allowlisted but not installed / not executable on this host.
            return self._record("run", False, name, error="spawn_failed", details={"argv": argv, "reason": str(exc)})
        result = {
            "returncode": proc.returncode,
            "stdout": proc.stdout,
            "stderr": proc.stderr,
            "elapsed_ms": (time.time() - started) * 1000.0,
            "manifest_before": before,
            "manifest_after": _manifest(env, self.policy.max_snapshot_files),
        }
        return self._record("run", proc.returncode == 0, name, result=result)

    def run_detached(self, name: str, argv: list[str], timeout_s: float | None = None) -> dict:
        """Start ``argv`` in the background and return its job metadata.

        Output is written to ``jobs/<job_id>/stdout.log`` and ``stderr.log``;
        poll with ``job_status`` / ``job_stream``. ``echo`` is emulated and
        reported as already exited.

        NOTE(review): ``timeout_s`` is validated and stored in the job
        metadata, but nothing in this class terminates a detached job when
        it elapses.
        """
        env = self._env_path(name)
        if not env.exists():
            return self._record("run_detached", False, name, error="env_not_found")
        requested_timeout = timeout_s or self.policy.timeout_s
        if requested_timeout > self.policy.max_runtime_duration_s:
            return self._record("run_detached", False, name, error="runtime_duration_limit")
        if not self.policy.active_cpu:
            # Same switch that ``run`` honours.
            return self._record("run_detached", False, name, error="cpu_disabled")
        problem = self._command_error(env, argv)
        if problem:
            return self._record("run_detached", False, name, error=problem, details={"argv": argv} if argv else None)
        executable = Path(argv[0]).name.lower()
        # The random nonce keeps ids unique when the same argv is launched
        # twice within one millisecond (mkdir below would otherwise fail).
        fingerprint = hashlib.sha1(json.dumps(argv).encode("utf-8") + os.urandom(8)).hexdigest()[:8]
        job_id = f"job-{int(time.time() * 1000)}-{fingerprint}"
        job_dir = self.jobs_dir / job_id
        job_dir.mkdir()
        stdout_path = job_dir / "stdout.log"
        stderr_path = job_dir / "stderr.log"
        before = _manifest(env, self.policy.max_snapshot_files)
        if executable == "echo":
            stdout_path.write_text(" ".join(argv[1:]) + "\n", encoding="utf-8")
            stderr_path.write_text("", encoding="utf-8")
            meta = {
                "job_id": job_id,
                "env": name,
                "argv": argv,
                "pid": None,
                "status": "exited",
                "returncode": 0,
                "started_at": time.time(),
                "ended_at": time.time(),
                "stdout_path": str(stdout_path),
                "stderr_path": str(stderr_path),
                "manifest_before": before,
                "manifest_after": _manifest(env, self.policy.max_snapshot_files),
                "builtin": True,
            }
            self._write_job_meta(job_id, meta)
            return self._record("run_detached", True, name, result=meta)
        try:
            # The child inherits the descriptors; our copies are closed by the
            # context manager even when Popen raises.
            with stdout_path.open("wb") as out, stderr_path.open("wb") as err:
                proc = subprocess.Popen(argv, cwd=env, stdout=out, stderr=err, shell=False)
        except OSError as exc:
            shutil.rmtree(job_dir, ignore_errors=True)
            details = {"argv": argv, "reason": str(exc)}
            return self._record("run_detached", False, name, error="spawn_failed", details=details)
        self._children[job_id] = proc
        meta = {
            "job_id": job_id,
            "env": name,
            "argv": argv,
            "pid": proc.pid,
            "status": "running",
            "returncode": None,
            "started_at": time.time(),
            "ended_at": None,
            "timeout_s": requested_timeout,
            "stdout_path": str(stdout_path),
            "stderr_path": str(stderr_path),
            "manifest_before": before,
        }
        self._write_job_meta(job_id, meta)
        return self._record("run_detached", True, name, result=meta)

    def job_status(self, job_id: str) -> dict:
        """Return the metadata of a detached job, refreshing its liveness.

        Jobs started by this instance are polled through their process
        handle, which also reaps them and records ``returncode``. Jobs known
        only from disk are probed with signal 0.
        """
        meta = self._read_job_meta(job_id)
        if not meta:
            return self._record("job_status", False, job_id, error="job_not_found")
        if meta.get("status") == "running" and meta.get("pid"):
            pid = int(meta["pid"])
            child = self._children.get(job_id)
            finished = False
            if child is not None and child.pid == pid:
                # An unreaped child is a zombie that still answers signal 0,
                # so only poll() can tell that it has exited.
                returncode = child.poll()
                if returncode is not None:
                    finished = True
                    meta["returncode"] = returncode
                    del self._children[job_id]
            else:
                try:
                    os.kill(pid, 0)
                except OSError:
                    finished = True
            if finished:
                meta["status"] = "exited"
                meta["ended_at"] = time.time()
                self._write_job_meta(job_id, meta)
        return self._record("job_status", True, str(meta.get("env", job_id)), result=meta)

    def job_stream(self, job_id: str, stream: str = "stdout", offset: int = 0, max_bytes: int = 64_000) -> dict:
        """Read up to ``max_bytes`` of a job's log starting at byte ``offset``.

        Any ``stream`` other than ``"stderr"`` selects stdout. Negative
        ``offset`` / ``max_bytes`` are treated as 0. Bytes are decoded as
        UTF-8 with replacement, so a chunk boundary inside a multi-byte
        character yields replacement characters.
        """
        meta = self._read_job_meta(job_id)
        if not meta:
            return self._record("job_stream", False, job_id, error="job_not_found")
        path = Path(meta["stderr_path"] if stream == "stderr" else meta["stdout_path"])
        start = max(0, int(offset))
        limit = max(0, int(max_bytes))
        chunk = b""
        total = 0
        if path.exists():
            # Seek instead of loading the whole log for every chunk.
            with path.open("rb") as handle:
                handle.seek(start)
                chunk = handle.read(limit)
                total = os.fstat(handle.fileno()).st_size
        result = {
            "job_id": job_id,
            "stream": stream,
            "offset": start,
            "next_offset": start + len(chunk),
            "content": chunk.decode("utf-8", errors="replace"),
            "eof": start + len(chunk) >= total,
        }
        return self._record("job_stream", True, str(meta.get("env", job_id)), result=result)

    # ------------------------------------------------------------------
    # File transfer
    # ------------------------------------------------------------------

    def copy_in(self, name: str, src: str | Path, dest: str | Path) -> dict:
        """Copy host path ``src`` to ``dest`` inside environment ``name``.

        Size and transfer quotas are checked before anything is written. The
        storage quota is checked afterwards and the copy is removed again
        when it is exceeded.
        """
        env = self._env_path(name)
        if not env.exists():
            return self._record("copy_in", False, name, error="env_not_found")
        source = Path(src).resolve()
        if not source.exists():
            return self._record("copy_in", False, name, error="source_not_found")
        try:
            target = _resolve_inside(env, dest)
        except ValueError as exc:
            return self._record("copy_in", False, name, error=str(exc))
        refusal = self._transfer_error(source)
        if refusal:
            return self._record("copy_in", False, name, error=refusal)
        try:
            copied = self._copy_path(source, target)
        except ValueError as exc:
            return self._record("copy_in", False, name, error=str(exc))
        if self.resource_report()["sandbox_storage_bytes"] > self.policy.max_storage_bytes:
            self._discard(target)
            return self._record("copy_in", False, name, error="storage_limit")
        return self._record("copy_in", True, name, result={"bytes": copied, "dest": str(target.relative_to(env))})

    def copy_out(self, name: str, src: str | Path, dest: str | Path) -> dict:
        """Copy ``src`` from environment ``name`` to host path ``dest``.

        Quotas are checked before the host destination is touched, so a
        refused transfer never replaces or deletes existing host files.
        """
        env = self._env_path(name)
        if not env.exists():
            return self._record("copy_out", False, name, error="env_not_found")
        try:
            source = _resolve_inside(env, src)
        except ValueError as exc:
            return self._record("copy_out", False, name, error=str(exc))
        if not source.exists():
            return self._record("copy_out", False, name, error="source_not_found")
        refusal = self._transfer_error(source)
        if refusal:
            return self._record("copy_out", False, name, error=refusal)
        target = Path(dest).resolve()
        try:
            copied = self._copy_path(source, target)
        except ValueError as exc:
            return self._record("copy_out", False, name, error=str(exc))
        return self._record("copy_out", True, name, result={"bytes": copied, "dest": str(target)})

    def file_put(self, name: str, path: str | Path, content: str, encoding: str = "utf-8") -> dict:
        """Write ``content`` to ``path`` inside environment ``name``.

        ``encoding`` is ``"utf-8"`` (text written as UTF-8) or ``"base64"``
        (decoded to raw bytes). Fails with ``invalid_base64``,
        ``unsupported_encoding``, ``copy_too_large`` or
        ``target_is_directory``.
        """
        env = self._env_path(name)
        if not env.exists():
            return self._record("file_put", False, name, error="env_not_found")
        try:
            target = _resolve_inside(env, path)
        except ValueError as exc:
            return self._record("file_put", False, name, error=str(exc))
        if encoding == "base64":
            try:
                data = base64.b64decode(content.encode("ascii"))
            except ValueError:
                # binascii.Error and UnicodeEncodeError both derive from it.
                return self._record("file_put", False, name, error="invalid_base64")
        elif encoding == "utf-8":
            data = content.encode("utf-8")
        else:
            return self._record("file_put", False, name, error="unsupported_encoding")
        if len(data) > self.policy.max_copy_bytes:
            return self._record("file_put", False, name, error="copy_too_large")
        if target.is_dir():
            return self._record("file_put", False, name, error="target_is_directory")
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_bytes(data)
        return self._record("file_put", True, name, result={"path": str(target.relative_to(env)), "bytes": len(data)})

    def file_get(self, name: str, path: str | Path, encoding: str = "utf-8") -> dict:
        """Return the content of a file inside environment ``name``.

        ``encoding`` is ``"utf-8"`` or ``"base64"``. The size limit is
        checked from file metadata before the file is read. Fails with
        ``source_not_found``, ``copy_too_large``, ``unsupported_encoding``
        or ``decode_error`` (content is not valid UTF-8).
        """
        env = self._env_path(name)
        if not env.exists():
            return self._record("file_get", False, name, error="env_not_found")
        try:
            source = _resolve_inside(env, path)
        except ValueError as exc:
            return self._record("file_get", False, name, error=str(exc))
        if not source.is_file():
            return self._record("file_get", False, name, error="source_not_found")
        if source.stat().st_size > self.policy.max_copy_bytes:
            return self._record("file_get", False, name, error="copy_too_large")
        if encoding not in ("base64", "utf-8"):
            return self._record("file_get", False, name, error="unsupported_encoding")
        data = source.read_bytes()
        if encoding == "base64":
            content = base64.b64encode(data).decode("ascii")
        else:
            try:
                content = data.decode("utf-8")
            except UnicodeDecodeError:
                return self._record("file_get", False, name, error="decode_error")
        result = {"path": str(source.relative_to(env)), "bytes": len(data), "encoding": encoding, "content": content}
        return self._record("file_get", True, name, result=result)

    # ------------------------------------------------------------------
    # Snapshots and reporting
    # ------------------------------------------------------------------

    def save_snapshot(self, name: str, snapshot_name: str) -> dict:
        """Copy environment ``name`` into snapshot ``snapshot_name``.

        The copy is staged next to the final location and only swapped in
        once it fits the storage budget, so a failed attempt leaves an
        existing snapshot of the same name untouched.
        """
        env = self._env_path(name)
        if not env.exists():
            return self._record("snapshot", False, name, error="env_not_found")
        snap = self._snapshot_path(snapshot_name)
        staging = snap.with_name(f".{snap.name}.staging")
        shutil.rmtree(staging, ignore_errors=True)  # leftover of a crashed run
        files_dir = staging / "files"
        files_dir.mkdir(parents=True)
        try:
            self._copy_tree_contents(env, files_dir)
            manifest = _manifest(files_dir, self.policy.max_snapshot_files)
            # The snapshot being replaced will be deleted, so it does not
            # count against the budget.
            replaced_bytes = self._tree_bytes(snap) if snap.exists() else 0
            used = self.resource_report()["sandbox_storage_bytes"] - replaced_bytes
            if used > self.policy.max_storage_bytes:
                shutil.rmtree(staging, ignore_errors=True)
                return self._record("snapshot", False, name, error="storage_limit")
            (staging / "manifest.json").write_text(json.dumps(manifest, indent=2, sort_keys=True), encoding="utf-8")
            if snap.exists():
                shutil.rmtree(snap)
            staging.rename(snap)
        except Exception:
            shutil.rmtree(staging, ignore_errors=True)
            raise
        return self._record("snapshot", True, name, result={"snapshot": snapshot_name, "manifest": manifest})

    def restore_snapshot(self, name: str, snapshot_name: str, replace: bool = True) -> dict:
        """Populate environment ``name`` from ``snapshot_name``.

        With ``replace`` the environment is wiped first; otherwise snapshot
        content is copied over the existing tree. The environment is marked
        running afterwards.
        """
        env = self._env_path(name)
        files_dir = self._snapshot_path(snapshot_name) / "files"
        # Checked before the environment is wiped.
        if not files_dir.is_dir():
            return self._record("restore", False, name, error="snapshot_not_found")
        if env.exists() and replace:
            shutil.rmtree(env)
        env.mkdir(parents=True, exist_ok=True)
        self._copy_tree_contents(files_dir, env)
        self._write_env_status(name, "running")
        return self._record("restore", True, name, result={"env_path": str(env), "manifest": _manifest(env)})

    def dashboard(self) -> dict:
        """Summarise environments, detached jobs, snapshots and usage."""
        env_rows = []
        for env in sorted(p for p in self.envs_dir.iterdir() if p.is_dir()):
            env_rows.append(
                {
                    "name": env.name,
                    "path": str(env),
                    "status": self._read_env_status(env.name),
                    "manifest": _manifest(env, self.policy.max_snapshot_files),
                }
            )
        job_rows = [
            json.loads(meta_path.read_text(encoding="utf-8"))
            for meta_path in sorted(self.jobs_dir.glob("*/meta.json"))
        ]
        snapshots = []
        for snap in sorted(p for p in self.snapshots_dir.iterdir() if p.is_dir()):
            manifest_path = snap / "manifest.json"
            if manifest_path.exists():
                manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
            else:
                manifest = _manifest(snap / "files")
            snapshots.append({"name": snap.name, "path": str(snap), "manifest": manifest})
        return {
            "sandboxes": env_rows,
            "running": [row for row in env_rows if row["status"] == "running"],
            "stopped": [row for row in env_rows if row["status"] == "stopped"],
            "commands": job_rows,
            "snapshots": snapshots,
            "usage": self.resource_report(),
        }

    def resource_report(self) -> dict:
        """Compute current usage counters together with the policy limits.

        Creation and transfer totals are derived from successful ledger
        records; storage is measured on disk.
        """
        active_sandboxes = sum(
            1 for p in self.envs_dir.iterdir() if p.is_dir() and self._read_env_status(p.name) == "running"
        )
        snapshot_bytes = self._tree_bytes(self.snapshots_dir)  # walked once, used twice
        storage_bytes = self._tree_bytes(self.envs_dir) + snapshot_bytes
        data_transfer_bytes = 0
        sandbox_creations = 0
        if self.ledger_path.exists():
            # Iterate physical lines: str.splitlines() would also split on
            # characters such as U+2028 that the ledger stores unescaped.
            with self.ledger_path.open(encoding="utf-8") as ledger:
                for line in ledger:
                    try:
                        row = json.loads(line)
                    except json.JSONDecodeError:
                        continue
                    if not isinstance(row, dict) or row.get("ok") is not True:
                        continue
                    action = row.get("action")
                    if action == "create":
                        sandbox_creations += 1
                    elif action in {"copy_in", "copy_out"}:
                        data_transfer_bytes += int((row.get("result") or {}).get("bytes") or 0)
        return {
            "sandbox_provisioned_memory_mb": self.policy.provisioned_memory_mb,
            "sandbox_active_cpu": self.policy.active_cpu,
            "sandbox_active_gpu": self.policy.active_gpu,
            "sandbox_creations": sandbox_creations,
            "sandbox_data_transfer_bytes": data_transfer_bytes,
            "sandbox_storage_bytes": storage_bytes,
            "max_runtime_duration_s": self.policy.max_runtime_duration_s,
            "concurrent_sandboxes": active_sandboxes,
            "snapshot_storage_bytes": snapshot_bytes,
            "limits": {
                "max_creations": self.policy.max_creations,
                "max_total_transfer_bytes": self.policy.max_total_transfer_bytes,
                "max_storage_bytes": self.policy.max_storage_bytes,
                "max_concurrent_sandboxes": self.policy.max_concurrent_sandboxes,
            },
        }

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _command_error(self, env: Path, argv: list[str]) -> str | None:
        """Return the error code that forbids running ``argv``, or None."""
        if not argv:
            return "empty_command"
        command = str(argv[0])
        if Path(command).name.lower() not in self.policy.command_allowlist:
            return "command_not_allowlisted"
        if Path(command).name != command:
            # Only the basename is allowlisted. A path-qualified executable is
            # looked up relative to the working directory (the environment),
            # so refuse binaries that live inside the sandbox tree: a file
            # named like an allowed command would otherwise bypass the list.
            located = (env / command).resolve()
            if located == self.root or self.root in located.parents:
                return "command_not_allowlisted"
        return None

    @staticmethod
    def _as_text(value: Any) -> Any:
        """Decode bytes as UTF-8 (with replacement); pass other values through."""
        if isinstance(value, bytes):
            return value.decode("utf-8", errors="replace")
        return value

    @staticmethod
    def _discard(target: Path) -> None:
        """Remove a file or directory tree, ignoring a missing target."""
        if target.is_file():
            target.unlink(missing_ok=True)
        else:
            shutil.rmtree(target, ignore_errors=True)

    def _path_bytes(self, path: Path) -> int:
        """Return the size of a file, or the total size of a directory tree."""
        return self._tree_bytes(path) if path.is_dir() else path.stat().st_size

    def _transfer_error(self, source: Path) -> str | None:
        """Return the quota error that forbids copying ``source``, or None."""
        size = self._path_bytes(source)
        if size > self.policy.max_copy_bytes:
            return "copy_too_large"
        already = self.resource_report()["sandbox_data_transfer_bytes"]
        if already + size > self.policy.max_total_transfer_bytes:
            return "data_transfer_limit"
        return None

    def _copy_path(self, source: Path, target: Path) -> int:
        """Copy a file or directory to ``target`` and return the byte count.

        A directory source replaces whatever exists at ``target``.

        Raises:
            ValueError: ``"copy_too_large"`` when the payload exceeds
                ``policy.max_copy_bytes``; nothing is written in that case.
        """
        size = self._path_bytes(source)
        if size > self.policy.max_copy_bytes:
            raise ValueError("copy_too_large")
        if source.is_dir():
            if target.is_dir():
                shutil.rmtree(target)
            elif target.exists():
                target.unlink()  # rmtree cannot remove a plain file
            shutil.copytree(source, target)
        else:
            target.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(source, target)
        return size

    def _copy_tree_contents(self, source: Path, target: Path) -> None:
        """Copy every entry of ``source`` into the existing directory ``target``.

        Existing directories are merged and existing files overwritten, which
        is what ``restore_snapshot(replace=False)`` relies on.
        """
        for item in source.iterdir():
            dest = target / item.name
            if item.is_dir():
                shutil.copytree(item, dest, dirs_exist_ok=True)
            else:
                shutil.copy2(item, dest)

    def _tree_bytes(self, root: Path) -> int:
        """Return the summed size of all regular files below ``root``."""
        return sum(p.stat().st_size for p in root.rglob("*") if p.is_file())

    def _env_path(self, name: str) -> Path:
        """Resolve the directory of environment ``name`` inside ``envs/``.

        Raises:
            ValueError: ``"path_escape"`` for names leaving ``envs/`` or
                resolving to ``envs/`` itself (for example ``""`` or ``"."``),
                which would let lifecycle operations delete every environment.
        """
        env = _resolve_inside(self.envs_dir, name)
        if env == self.envs_dir:
            raise ValueError("path_escape")
        return env

    def _snapshot_path(self, name: str) -> Path:
        """Resolve the directory of snapshot ``name`` inside ``snapshots/``.

        Raises:
            ValueError: ``"path_escape"`` for names leaving ``snapshots/`` or
                resolving to ``snapshots/`` itself.
        """
        snap = _resolve_inside(self.snapshots_dir, name)
        if snap == self.snapshots_dir:
            raise ValueError("path_escape")
        return snap

    def _env_status_path(self, name: str) -> Path:
        """Return the path of the status marker of environment ``name``."""
        return self._env_path(name) / ".sandbox_status.json"

    def _write_env_status(self, name: str, status: str) -> None:
        """Persist ``status`` and the current time in the status marker."""
        marker = {"name": name, "status": status, "updated_at": time.time()}
        self._env_status_path(name).write_text(
            json.dumps(marker, ensure_ascii=False, sort_keys=True),
            encoding="utf-8",
        )

    def _read_env_status(self, name: str) -> str:
        """Return the recorded status of ``name``.

        A missing marker counts as ``"running"``; an unparsable or
        non-object marker yields ``"unknown"``.
        """
        path = self._env_status_path(name)
        if not path.exists():
            return "running"
        try:
            marker = json.loads(path.read_text(encoding="utf-8"))
        except json.JSONDecodeError:
            return "unknown"
        if not isinstance(marker, dict):
            return "unknown"
        return str(marker.get("status", "running"))

    def _job_meta_path(self, job_id: str) -> Path:
        """Return the ``meta.json`` path of ``job_id`` (validated as relative)."""
        return self.jobs_dir / _safe_rel(job_id) / "meta.json"

    def _write_job_meta(self, job_id: str, meta: dict) -> None:
        """Store ``meta`` for ``job_id``; ``meta["job_id"]`` is set in place."""
        meta["job_id"] = job_id
        path = self._job_meta_path(job_id)
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(meta, ensure_ascii=False, indent=2, sort_keys=True), encoding="utf-8")

    def _read_job_meta(self, job_id: str) -> dict:
        """Load the metadata of ``job_id``; an unknown job yields ``{}``."""
        path = self._job_meta_path(job_id)
        if not path.exists():
            return {}
        return json.loads(path.read_text(encoding="utf-8"))

    def _latest_snapshot_for_env(self, name: str) -> str | None:
        """Return the most recently modified snapshot named ``<name>-*``.

        Matching is by name prefix only, so snapshots of another environment
        whose name starts with ``<name>-`` are candidates too.
        """
        prefix = f"{name}-"
        candidates = [
            (snap.stat().st_mtime, snap.name)
            for snap in self.snapshots_dir.iterdir()
            if snap.is_dir() and snap.name.startswith(prefix)
        ]
        if not candidates:
            return None
        return max(candidates)[1]

    def _policy_dict(self) -> dict:
        """Return the policy as a plain dict for the ledger.

        Dataclass policies go through ``asdict``; any other object
        contributes its public, non-callable attributes, so a plain
        attribute container does not break auditing.
        """
        from dataclasses import is_dataclass

        policy = self.policy
        if is_dataclass(policy) and not isinstance(policy, type):
            return asdict(policy)
        snapshot = {}
        for attr in dir(policy):
            if attr.startswith("_"):
                continue
            value = getattr(policy, attr)
            if not callable(value):
                snapshot[attr] = value
        return snapshot

    def _record(self, action: str, ok: bool, env: str, result: Any = None, error: str | None = None, details: dict | None = None) -> dict:
        """Append one audit record to the ledger and return it.

        Args:
            action: Operation name, e.g. ``"run"``.
            ok: Whether the operation succeeded.
            env: Environment (or job id) the record refers to.
            result: Operation payload on success.
            error: Short error code on failure.
            details: Extra diagnostic data; stored as ``{}`` when omitted.
        """
        layout_ready = self.envs_dir.exists() and self.snapshots_dir.exists()
        payload = {
            "timestamp": time.time(),
            "action": action,
            "ok": ok,
            "env": env,
            "result": result,
            "error": error,
            "details": details or {},
            "policy": self._policy_dict(),
            "resources": self.resource_report() if layout_ready else {},
        }
        with self.ledger_path.open("a", encoding="utf-8", newline="\n") as ledger:
            ledger.write(json.dumps(payload, ensure_ascii=False, sort_keys=True) + "\n")
        return payload
Xet Storage Details
- Size:
- 25.4 kB
- Xet hash:
- b6a3a6b1ad65bf1eba4ccec64adced8369bbfef3d1c101b3ec1dc46174d3e745
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.