| |
| export const SANDBOX_PINNED_FS_MUTATION_PYTHON = String.raw`import os |
| import secrets |
| import subprocess |
| import sys |
| |
| operation = sys.argv[1] |
| |
| DIR_FLAGS = os.O_RDONLY |
| if hasattr(os, "O_DIRECTORY"): |
| DIR_FLAGS |= os.O_DIRECTORY |
| if hasattr(os, "O_NOFOLLOW"): |
| DIR_FLAGS |= os.O_NOFOLLOW |
| |
| WRITE_FLAGS = os.O_WRONLY | os.O_CREAT | os.O_EXCL |
| if hasattr(os, "O_NOFOLLOW"): |
| WRITE_FLAGS |= os.O_NOFOLLOW |
| |
| |
| def open_dir(path, dir_fd=None): |
| return os.open(path, DIR_FLAGS, dir_fd=dir_fd) |
| |
| |
| def walk_parent(root_fd, rel_parent, mkdir_enabled): |
| current_fd = os.dup(root_fd) |
| try: |
| segments = [segment for segment in rel_parent.split("/") if segment and segment != "."] |
| for segment in segments: |
| if segment == "..": |
| raise OSError("path traversal is not allowed") |
| try: |
| next_fd = open_dir(segment, dir_fd=current_fd) |
| except FileNotFoundError: |
| if not mkdir_enabled: |
| raise |
| os.mkdir(segment, 0o777, dir_fd=current_fd) |
| next_fd = open_dir(segment, dir_fd=current_fd) |
| os.close(current_fd) |
| current_fd = next_fd |
| return current_fd |
| except Exception: |
| os.close(current_fd) |
| raise |
| |
| |
| def create_temp_file(parent_fd, basename): |
| prefix = ".openclaw-write-" + basename + "." |
| for _ in range(128): |
| candidate = prefix + secrets.token_hex(6) |
| try: |
| fd = os.open(candidate, WRITE_FLAGS, 0o600, dir_fd=parent_fd) |
| return candidate, fd |
| except FileExistsError: |
| continue |
| raise RuntimeError("failed to allocate sandbox temp file") |
| |
| |
| def fd_path(fd, basename=None): |
| base = f"/proc/self/fd/{fd}" |
| if basename is None: |
| return base |
| return f"{base}/{basename}" |
| |
| |
| def run_command(argv, pass_fds): |
| subprocess.run(argv, check=True, pass_fds=tuple(pass_fds)) |
| |
| |
| def write_stdin_to_fd(fd): |
| while True: |
| chunk = sys.stdin.buffer.read(65536) |
| if not chunk: |
| break |
| os.write(fd, chunk) |
| |
| |
| def run_write(args): |
| mount_root, relative_parent, basename, mkdir_enabled_raw = args |
| mkdir_enabled = mkdir_enabled_raw == "1" |
| root_fd = open_dir(mount_root) |
| parent_fd = None |
| temp_fd = None |
| temp_name = None |
| try: |
| parent_fd = walk_parent(root_fd, relative_parent, mkdir_enabled) |
| temp_name, temp_fd = create_temp_file(parent_fd, basename) |
| write_stdin_to_fd(temp_fd) |
| os.fsync(temp_fd) |
| os.close(temp_fd) |
| temp_fd = None |
| os.replace(temp_name, basename, src_dir_fd=parent_fd, dst_dir_fd=parent_fd) |
| os.fsync(parent_fd) |
| except Exception: |
| if temp_fd is not None: |
| os.close(temp_fd) |
| temp_fd = None |
| if temp_name is not None and parent_fd is not None: |
| try: |
| os.unlink(temp_name, dir_fd=parent_fd) |
| except FileNotFoundError: |
| pass |
| raise |
| finally: |
| if parent_fd is not None: |
| os.close(parent_fd) |
| os.close(root_fd) |
| |
| |
| def run_mkdirp(args): |
| mount_root, relative_parent, basename = args |
| root_fd = open_dir(mount_root) |
| parent_fd = None |
| try: |
| parent_fd = walk_parent(root_fd, relative_parent, True) |
| run_command(["mkdir", "-p", "--", fd_path(parent_fd, basename)], [parent_fd]) |
| os.fsync(parent_fd) |
| finally: |
| if parent_fd is not None: |
| os.close(parent_fd) |
| os.close(root_fd) |
| |
| |
| def run_remove(args): |
| mount_root, relative_parent, basename, recursive_raw, force_raw = args |
| root_fd = open_dir(mount_root) |
| parent_fd = None |
| try: |
| parent_fd = walk_parent(root_fd, relative_parent, False) |
| argv = ["rm"] |
| if force_raw == "1": |
| argv.append("-f") |
| if recursive_raw == "1": |
| argv.append("-r") |
| argv.extend(["--", fd_path(parent_fd, basename)]) |
| run_command(argv, [parent_fd]) |
| os.fsync(parent_fd) |
| finally: |
| if parent_fd is not None: |
| os.close(parent_fd) |
| os.close(root_fd) |
| |
| |
| def run_rename(args): |
| ( |
| from_mount_root, |
| from_relative_parent, |
| from_basename, |
| to_mount_root, |
| to_relative_parent, |
| to_basename, |
| ) = args |
| from_root_fd = open_dir(from_mount_root) |
| to_root_fd = open_dir(to_mount_root) |
| from_parent_fd = None |
| to_parent_fd = None |
| try: |
| from_parent_fd = walk_parent(from_root_fd, from_relative_parent, False) |
| to_parent_fd = walk_parent(to_root_fd, to_relative_parent, True) |
| run_command( |
| [ |
| "mv", |
| "--", |
| fd_path(from_parent_fd, from_basename), |
| fd_path(to_parent_fd, to_basename), |
| ], |
| [from_parent_fd, to_parent_fd], |
| ) |
| os.fsync(from_parent_fd) |
| if to_parent_fd != from_parent_fd: |
| os.fsync(to_parent_fd) |
| finally: |
| if from_parent_fd is not None: |
| os.close(from_parent_fd) |
| if to_parent_fd is not None: |
| os.close(to_parent_fd) |
| os.close(from_root_fd) |
| os.close(to_root_fd) |
| |
| |
| OPERATIONS = { |
| "write": run_write, |
| "mkdirp": run_mkdirp, |
| "remove": run_remove, |
| "rename": run_rename, |
| } |
| |
| if operation not in OPERATIONS: |
| raise RuntimeError(f"unknown sandbox fs mutation: {operation}") |
| |
| OPERATIONS[operation](sys.argv[2:])`; |
|
|