openskynet / src /agents /sandbox /fs-bridge-mutation-helper.ts
Darochin's picture
Mirror OpenSkyNet workspace snapshot from Git HEAD
fc93158 verified
import { PATH_ALIAS_POLICIES } from "../../infra/path-alias-guards.js";
import type {
PathSafetyCheck,
PinnedSandboxDirectoryEntry,
PinnedSandboxEntry,
} from "./fs-bridge-path-safety.js";
import type { SandboxFsCommandPlan } from "./fs-bridge-shell-command-plans.js";
export const SANDBOX_PINNED_MUTATION_PYTHON = [
"import errno",
"import os",
"import secrets",
"import stat",
"import sys",
"",
"operation = sys.argv[1]",
"",
"DIR_FLAGS = os.O_RDONLY",
"if hasattr(os, 'O_DIRECTORY'):",
" DIR_FLAGS |= os.O_DIRECTORY",
"if hasattr(os, 'O_NOFOLLOW'):",
" DIR_FLAGS |= os.O_NOFOLLOW",
"",
"READ_FLAGS = os.O_RDONLY",
"if hasattr(os, 'O_NOFOLLOW'):",
" READ_FLAGS |= os.O_NOFOLLOW",
"",
"WRITE_FLAGS = os.O_WRONLY | os.O_CREAT | os.O_EXCL",
"if hasattr(os, 'O_NOFOLLOW'):",
" WRITE_FLAGS |= os.O_NOFOLLOW",
"",
"def split_relative(path_value):",
" segments = []",
" for segment in path_value.split('/'):",
" if not segment or segment == '.':",
" continue",
" if segment == '..':",
" raise OSError(errno.EPERM, 'path traversal is not allowed', segment)",
" segments.append(segment)",
" return segments",
"",
"def open_dir(path_value, dir_fd=None):",
" return os.open(path_value, DIR_FLAGS, dir_fd=dir_fd)",
"",
"def walk_dir(root_fd, rel_path, mkdir_enabled):",
" current_fd = os.dup(root_fd)",
" try:",
" for segment in split_relative(rel_path):",
" try:",
" next_fd = open_dir(segment, dir_fd=current_fd)",
" except FileNotFoundError:",
" if not mkdir_enabled:",
" raise",
" os.mkdir(segment, 0o777, dir_fd=current_fd)",
" next_fd = open_dir(segment, dir_fd=current_fd)",
" os.close(current_fd)",
" current_fd = next_fd",
" return current_fd",
" except Exception:",
" os.close(current_fd)",
" raise",
"",
"def create_temp_file(parent_fd, basename):",
" prefix = '.openclaw-write-' + basename + '.'",
" for _ in range(128):",
" candidate = prefix + secrets.token_hex(6)",
" try:",
" fd = os.open(candidate, WRITE_FLAGS, 0o600, dir_fd=parent_fd)",
" return candidate, fd",
" except FileExistsError:",
" continue",
" raise RuntimeError('failed to allocate sandbox temp file')",
"",
"def create_temp_dir(parent_fd, basename, mode):",
" prefix = '.openclaw-move-' + basename + '.'",
" for _ in range(128):",
" candidate = prefix + secrets.token_hex(6)",
" try:",
" os.mkdir(candidate, mode, dir_fd=parent_fd)",
" return candidate",
" except FileExistsError:",
" continue",
" raise RuntimeError('failed to allocate sandbox temp directory')",
"",
"def write_atomic(parent_fd, basename, stdin_buffer):",
" temp_fd = None",
" temp_name = None",
" try:",
" temp_name, temp_fd = create_temp_file(parent_fd, basename)",
" while True:",
" chunk = stdin_buffer.read(65536)",
" if not chunk:",
" break",
" os.write(temp_fd, chunk)",
" os.fsync(temp_fd)",
" os.close(temp_fd)",
" temp_fd = None",
" os.replace(temp_name, basename, src_dir_fd=parent_fd, dst_dir_fd=parent_fd)",
" temp_name = None",
" os.fsync(parent_fd)",
" finally:",
" if temp_fd is not None:",
" os.close(temp_fd)",
" if temp_name is not None:",
" try:",
" os.unlink(temp_name, dir_fd=parent_fd)",
" except FileNotFoundError:",
" pass",
"",
"def remove_tree(parent_fd, basename):",
" entry_stat = os.lstat(basename, dir_fd=parent_fd)",
" if not stat.S_ISDIR(entry_stat.st_mode) or stat.S_ISLNK(entry_stat.st_mode):",
" os.unlink(basename, dir_fd=parent_fd)",
" return",
" dir_fd = open_dir(basename, dir_fd=parent_fd)",
" try:",
" for child in os.listdir(dir_fd):",
" remove_tree(dir_fd, child)",
" finally:",
" os.close(dir_fd)",
" os.rmdir(basename, dir_fd=parent_fd)",
"",
"def move_entry(src_parent_fd, src_basename, dst_parent_fd, dst_basename):",
" try:",
" os.rename(src_basename, dst_basename, src_dir_fd=src_parent_fd, dst_dir_fd=dst_parent_fd)",
" os.fsync(dst_parent_fd)",
" os.fsync(src_parent_fd)",
" return",
" except OSError as err:",
" if err.errno != errno.EXDEV:",
" raise",
" src_stat = os.lstat(src_basename, dir_fd=src_parent_fd)",
" if stat.S_ISDIR(src_stat.st_mode) and not stat.S_ISLNK(src_stat.st_mode):",
" temp_dir_name = create_temp_dir(dst_parent_fd, dst_basename, stat.S_IMODE(src_stat.st_mode) or 0o755)",
" temp_dir_fd = open_dir(temp_dir_name, dir_fd=dst_parent_fd)",
" src_dir_fd = open_dir(src_basename, dir_fd=src_parent_fd)",
" try:",
" for child in os.listdir(src_dir_fd):",
" move_entry(src_dir_fd, child, temp_dir_fd, child)",
" finally:",
" os.close(src_dir_fd)",
" os.close(temp_dir_fd)",
" os.rename(temp_dir_name, dst_basename, src_dir_fd=dst_parent_fd, dst_dir_fd=dst_parent_fd)",
" os.rmdir(src_basename, dir_fd=src_parent_fd)",
" os.fsync(dst_parent_fd)",
" os.fsync(src_parent_fd)",
" return",
" if stat.S_ISLNK(src_stat.st_mode):",
" link_target = os.readlink(src_basename, dir_fd=src_parent_fd)",
" try:",
" os.unlink(dst_basename, dir_fd=dst_parent_fd)",
" except FileNotFoundError:",
" pass",
" os.symlink(link_target, dst_basename, dir_fd=dst_parent_fd)",
" os.unlink(src_basename, dir_fd=src_parent_fd)",
" os.fsync(dst_parent_fd)",
" os.fsync(src_parent_fd)",
" return",
" src_fd = os.open(src_basename, READ_FLAGS, dir_fd=src_parent_fd)",
" temp_fd = None",
" temp_name = None",
" try:",
" temp_name, temp_fd = create_temp_file(dst_parent_fd, dst_basename)",
" while True:",
" chunk = os.read(src_fd, 65536)",
" if not chunk:",
" break",
" os.write(temp_fd, chunk)",
" try:",
" os.fchmod(temp_fd, stat.S_IMODE(src_stat.st_mode))",
" except AttributeError:",
" pass",
" os.fsync(temp_fd)",
" os.close(temp_fd)",
" temp_fd = None",
" os.replace(temp_name, dst_basename, src_dir_fd=dst_parent_fd, dst_dir_fd=dst_parent_fd)",
" temp_name = None",
" os.unlink(src_basename, dir_fd=src_parent_fd)",
" os.fsync(dst_parent_fd)",
" os.fsync(src_parent_fd)",
" finally:",
" if temp_fd is not None:",
" os.close(temp_fd)",
" if temp_name is not None:",
" try:",
" os.unlink(temp_name, dir_fd=dst_parent_fd)",
" except FileNotFoundError:",
" pass",
" os.close(src_fd)",
"",
"if operation == 'write':",
" root_fd = open_dir(sys.argv[2])",
" parent_fd = None",
" try:",
" parent_fd = walk_dir(root_fd, sys.argv[3], sys.argv[5] == '1')",
" write_atomic(parent_fd, sys.argv[4], sys.stdin.buffer)",
" finally:",
" if parent_fd is not None:",
" os.close(parent_fd)",
" os.close(root_fd)",
"elif operation == 'mkdirp':",
" root_fd = open_dir(sys.argv[2])",
" target_fd = None",
" try:",
" target_fd = walk_dir(root_fd, sys.argv[3], True)",
" os.fsync(target_fd)",
" finally:",
" if target_fd is not None:",
" os.close(target_fd)",
" os.close(root_fd)",
"elif operation == 'remove':",
" root_fd = open_dir(sys.argv[2])",
" parent_fd = None",
" try:",
" parent_fd = walk_dir(root_fd, sys.argv[3], False)",
" try:",
" if sys.argv[5] == '1':",
" remove_tree(parent_fd, sys.argv[4])",
" else:",
" entry_stat = os.lstat(sys.argv[4], dir_fd=parent_fd)",
" if stat.S_ISDIR(entry_stat.st_mode) and not stat.S_ISLNK(entry_stat.st_mode):",
" os.rmdir(sys.argv[4], dir_fd=parent_fd)",
" else:",
" os.unlink(sys.argv[4], dir_fd=parent_fd)",
" os.fsync(parent_fd)",
" except FileNotFoundError:",
" if sys.argv[6] != '1':",
" raise",
" finally:",
" if parent_fd is not None:",
" os.close(parent_fd)",
" os.close(root_fd)",
"elif operation == 'rename':",
" src_root_fd = open_dir(sys.argv[2])",
" dst_root_fd = open_dir(sys.argv[5])",
" src_parent_fd = None",
" dst_parent_fd = None",
" try:",
" src_parent_fd = walk_dir(src_root_fd, sys.argv[3], False)",
" dst_parent_fd = walk_dir(dst_root_fd, sys.argv[6], sys.argv[8] == '1')",
" move_entry(src_parent_fd, sys.argv[4], dst_parent_fd, sys.argv[7])",
" finally:",
" if src_parent_fd is not None:",
" os.close(src_parent_fd)",
" if dst_parent_fd is not None:",
" os.close(dst_parent_fd)",
" os.close(src_root_fd)",
" os.close(dst_root_fd)",
"else:",
" raise RuntimeError('unknown sandbox mutation operation: ' + operation)",
].join("\n");
function buildPinnedMutationPlan(params: {
args: string[];
checks: PathSafetyCheck[];
}): SandboxFsCommandPlan {
return {
checks: params.checks,
recheckBeforeCommand: true,
// Feed the helper source over fd 3 so stdin stays available for write payload bytes.
script: [
"set -eu",
"python3 /dev/fd/3 \"$@\" 3<<'PY'",
SANDBOX_PINNED_MUTATION_PYTHON,
"PY",
].join("\n"),
args: params.args,
};
}
export function buildPinnedWritePlan(params: {
check: PathSafetyCheck;
pinned: PinnedSandboxEntry;
mkdir: boolean;
}): SandboxFsCommandPlan {
return buildPinnedMutationPlan({
checks: [params.check],
args: [
"write",
params.pinned.mountRootPath,
params.pinned.relativeParentPath,
params.pinned.basename,
params.mkdir ? "1" : "0",
],
});
}
export function buildPinnedMkdirpPlan(params: {
check: PathSafetyCheck;
pinned: PinnedSandboxDirectoryEntry;
}): SandboxFsCommandPlan {
return buildPinnedMutationPlan({
checks: [params.check],
args: ["mkdirp", params.pinned.mountRootPath, params.pinned.relativePath],
});
}
export function buildPinnedRemovePlan(params: {
check: PathSafetyCheck;
pinned: PinnedSandboxEntry;
recursive?: boolean;
force?: boolean;
}): SandboxFsCommandPlan {
return buildPinnedMutationPlan({
checks: [
{
target: params.check.target,
options: {
...params.check.options,
aliasPolicy: PATH_ALIAS_POLICIES.unlinkTarget,
},
},
],
args: [
"remove",
params.pinned.mountRootPath,
params.pinned.relativeParentPath,
params.pinned.basename,
params.recursive ? "1" : "0",
params.force === false ? "0" : "1",
],
});
}
export function buildPinnedRenamePlan(params: {
fromCheck: PathSafetyCheck;
toCheck: PathSafetyCheck;
from: PinnedSandboxEntry;
to: PinnedSandboxEntry;
}): SandboxFsCommandPlan {
return buildPinnedMutationPlan({
checks: [
{
target: params.fromCheck.target,
options: {
...params.fromCheck.options,
aliasPolicy: PATH_ALIAS_POLICIES.unlinkTarget,
},
},
params.toCheck,
],
args: [
"rename",
params.from.mountRootPath,
params.from.relativeParentPath,
params.from.basename,
params.to.mountRootPath,
params.to.relativeParentPath,
params.to.basename,
"1",
],
});
}