"""A tiny in-memory virtual filesystem + bash-lite parser. We don't touch the real filesystem — everything is a Python dict. This keeps the env safe (no escape), deterministic, and fast. """ from __future__ import annotations import hashlib import re import shlex from dataclasses import dataclass, field from typing import Dict, List, Optional @dataclass class VFSFile: content: bytes size: int # logical size (may be >> len(content) for simulated fat logs) protected: bool = False # if True, any modification is a reward-tanker @dataclass class VFS: """Dict-based virtual filesystem. Keys are absolute paths.""" files: Dict[str, VFSFile] = field(default_factory=dict) disk_total_bytes: int = 10 * 1024 ** 3 # 10 GiB # background "runaway writer": bytes added per step to this path runaway_path: Optional[str] = None runaway_rate: int = 0 # services we simulate services: Dict[str, str] = field(default_factory=dict) # name -> "active"|"inactive"|"failed" def used_bytes(self) -> int: return sum(f.size for f in self.files.values()) def free_bytes(self) -> int: return max(0, self.disk_total_bytes - self.used_bytes()) def usage_pct(self) -> float: if self.disk_total_bytes == 0: return 0.0 return 100.0 * self.used_bytes() / self.disk_total_bytes def df_output(self) -> str: used = self.used_bytes() total = self.disk_total_bytes avail = total - used pct = int(round(self.usage_pct())) return ( "Filesystem Size Used Avail Use% Mounted on\n" f"/dev/root {_h(total)} {_h(used)} {_h(avail)} {pct}% /" ) def ls(self, path: str) -> str: path = _norm(path) if path in self.files: f = self.files[path] return f"-rw-r--r-- 1 root root {f.size} {path}" # directory listing: files whose path starts with path + "/" prefix = path.rstrip("/") + "/" kids = [] for p, f in sorted(self.files.items()): if p.startswith(prefix): rel = p[len(prefix):].split("/", 1)[0] if rel and rel not in kids: kids.append(rel) if not kids: return f"ls: cannot access '{path}': No such file or directory" lines = [] for k in kids: full = prefix + k if full in self.files: f = self.files[full] lines.append(f"-rw-r--r-- 1 root root {f.size:>10} {k}") else: lines.append(f"drwxr-xr-x 2 root root 0 {k}") return "\n".join(lines) def du(self, path: str) -> str: path = _norm(path) if path in self.files: return f"{_h(self.files[path].size)}\t{path}" prefix = path.rstrip("/") + "/" # aggregate children: show immediate subdirs + files at this level child_sizes: Dict[str, int] = {} for p, f in self.files.items(): if not p.startswith(prefix): continue rel = p[len(prefix):] first = rel.split("/", 1)[0] key = prefix + first child_sizes[key] = child_sizes.get(key, 0) + f.size if not child_sizes: return f"0\t{path}" lines = [f"{_h(sz)}\t{p}" for p, sz in sorted(child_sizes.items(), key=lambda x: -x[1])] total = sum(child_sizes.values()) lines.append(f"{_h(total)}\t{path}") return "\n".join(lines) def expand_glob(self, pattern: str) -> List[str]: """Minimal glob support: trailing /* matches immediate children.""" if "*" not in pattern: return [_norm(pattern)] if pattern.endswith("/*"): parent = _norm(pattern[:-2]) prefix = parent.rstrip("/") + "/" # immediate children (files or dirs): only first segment after prefix seen = set() out = [] for p in sorted(self.files): if not p.startswith(prefix): continue rel = p[len(prefix):] first = rel.split("/", 1)[0] child = prefix + first if child not in seen: seen.add(child) out.append(child) return out return [_norm(pattern)] def cat(self, path: str) -> str: path = _norm(path) if path not in self.files: return f"cat: {path}: No such file or directory" try: return self.files[path].content.decode("utf-8", errors="replace")[:4096] except Exception: return "" def rm(self, path: str, recursive: bool = False) -> str: path = _norm(path) if path in self.files: del self.files[path] return "" if recursive: prefix = path.rstrip("/") + "/" to_delete = [p for p in self.files if p.startswith(prefix)] for p in to_delete: del self.files[p] if to_delete: return "" return f"rm: cannot remove '{path}': No such file or directory" def find(self, path: str) -> str: path = _norm(path) prefix = path.rstrip("/") + "/" hits = [p for p in sorted(self.files) if p == path or p.startswith(prefix)] return "\n".join(hits) if hits else "" def sha256(self, path: str) -> str: path = _norm(path) if path in self.files: h = hashlib.sha256(self.files[path].content).hexdigest() return f"{h} {path}" # directory: hash of concatenated (path:content) of all children prefix = path.rstrip("/") + "/" children = sorted(p for p in self.files if p.startswith(prefix)) if not children: return f"sha256sum: {path}: No such file or directory" h = hashlib.sha256() for p in children: h.update(p.encode()) h.update(b"\0") h.update(self.files[p].content) return f"{h.hexdigest()} {path}" def write(self, path: str, content: str) -> str: path = _norm(path) data = content.encode("utf-8") self.files[path] = VFSFile(content=data, size=len(data)) return "" def systemctl(self, verb: str, service: str) -> str: svc = service.replace(".service", "") state = self.services.get(svc, "not-found") if state == "not-found": return f"Unit {svc}.service could not be found." if verb == "is-active": return state if verb == "status": return f"● {svc}.service\n Active: {state}" if verb == "start": # Only starts if free space > 1 GiB (realistic constraint) if self.free_bytes() < 1024 ** 3: self.services[svc] = "failed" return f"Job for {svc}.service failed: not enough disk space" self.services[svc] = "active" return "" if verb == "stop": self.services[svc] = "inactive" return "" if verb == "restart": if self.free_bytes() < 1024 ** 3: self.services[svc] = "failed" return f"Job for {svc}.service failed: not enough disk space" self.services[svc] = "active" return "" return f"Unknown verb {verb}" def tick(self) -> None: """Advance the simulated world by one step: runaway writer grows.""" if self.runaway_path and self.runaway_rate > 0: if self.runaway_path in self.files: self.files[self.runaway_path].size += self.runaway_rate else: self.files[self.runaway_path] = VFSFile( content=b"", size=self.runaway_rate ) def _norm(path: str) -> str: if not path.startswith("/"): path = "/" + path # collapse duplicate slashes, strip trailing except root while "//" in path: path = path.replace("//", "/") if len(path) > 1 and path.endswith("/"): path = path[:-1] return path def _h(n: int) -> str: """Format bytes like `df -h`.""" for unit in ("B", "K", "M", "G", "T"): if n < 1024: return f"{n:>4.1f}{unit}" n /= 1024 return f"{n:.1f}P" class CommandResult: __slots__ = ("stdout", "error") def __init__(self, stdout: str = "", error: Optional[str] = None): self.stdout = stdout self.error = error def execute(vfs: VFS, cmd: str) -> CommandResult: """Parse a bash-lite command and execute against the VFS.""" cmd = cmd.strip() if not cmd: return CommandResult(error="empty command") # Handle: echo "content" > /path m = re.match(r'^echo\s+(.+?)\s*>\s*(\S+)\s*$', cmd) if m: content_raw, path = m.group(1), m.group(2) content = content_raw.strip().strip('"').strip("'") vfs.write(path, content + "\n") return CommandResult(stdout="") # Handle: cat > /path << EOF ... EOF (simplified, single-line) # (not supporting heredocs; agents should use echo > form) try: tokens = shlex.split(cmd) except ValueError as e: return CommandResult(error=f"parse error: {e}") if not tokens: return CommandResult(error="empty command") verb = tokens[0] args = tokens[1:] if verb == "df": return CommandResult(stdout=vfs.df_output()) if verb == "ls": paths = [a for a in args if not a.startswith("-")] target = paths[0] if paths else "/" expanded = vfs.expand_glob(target) out = "\n".join(vfs.ls(p) for p in expanded) if expanded else vfs.ls(target) return CommandResult(stdout=out) if verb == "du": paths = [a for a in args if not a.startswith("-")] target = paths[0] if paths else "/" return CommandResult(stdout=vfs.du(target)) if verb == "cat": if not args: return CommandResult(error="cat: missing operand") return CommandResult(stdout=vfs.cat(args[0])) if verb == "rm": recursive = any(a in ("-r", "-rf", "-fr", "-R", "-f") for a in args) paths = [a for a in args if not a.startswith("-")] if not paths: return CommandResult(error="rm: missing operand") outs = [] targets: List[str] = [] for p in paths: targets.extend(vfs.expand_glob(p)) for t in targets: outs.append(vfs.rm(t, recursive=recursive)) out = "\n".join(o for o in outs if o) return CommandResult(stdout=out) if verb == "find": paths = [a for a in args if not a.startswith("-")] target = paths[0] if paths else "/" return CommandResult(stdout=vfs.find(target)) if verb == "sha256sum": if not args: return CommandResult(error="sha256sum: missing operand") return CommandResult(stdout=vfs.sha256(args[0])) if verb == "systemctl": # accept optional flags between verb and service non_flag = [a for a in args if not a.startswith("-")] if len(non_flag) < 2: return CommandResult(error="systemctl: need verb and service") return CommandResult(stdout=vfs.systemctl(non_flag[0], non_flag[1])) if verb == "echo": return CommandResult(stdout=" ".join(args)) if verb in ("pwd",): return CommandResult(stdout="/") return CommandResult(error=f"command not supported: {verb}")