Spaces:
Sleeping
Sleeping
| """A tiny in-memory virtual filesystem + bash-lite parser. | |
| We don't touch the real filesystem — everything is a Python dict. | |
| This keeps the env safe (no escape), deterministic, and fast. | |
| """ | |
| from __future__ import annotations | |
| import hashlib | |
| import re | |
| import shlex | |
| from dataclasses import dataclass, field | |
| from typing import Dict, List, Optional | |
@dataclass
class VFSFile:
    """A single file stored in the virtual filesystem.

    Declared as a dataclass so it can be built with keyword arguments
    (``VFSFile(content=..., size=...)``), which is how the rest of this
    module constructs it.
    """

    content: bytes
    size: int  # logical size (may be >> len(content) for simulated fat logs)
    protected: bool = False  # if True, any modification is a reward-tanker
@dataclass
class VFS:
    """Dict-based virtual filesystem. Keys are absolute paths.

    Directories are implicit: a directory "exists" whenever at least one
    stored file path lies beneath it. Every command-like method returns the
    text the simulated command would print (an empty string on silent
    success) instead of raising.
    """

    files: Dict[str, "VFSFile"] = field(default_factory=dict)
    disk_total_bytes: int = 10 * 1024 ** 3  # 10 GiB
    # background "runaway writer": bytes added per step to this path
    runaway_path: Optional[str] = None
    runaway_rate: int = 0
    # services we simulate: name -> "active" | "inactive" | "failed"
    services: Dict[str, str] = field(default_factory=dict)

    def used_bytes(self) -> int:
        """Return the sum of the logical sizes of all stored files."""
        return sum(f.size for f in self.files.values())

    def free_bytes(self) -> int:
        """Return the remaining capacity in bytes, clamped at zero."""
        return max(0, self.disk_total_bytes - self.used_bytes())

    def usage_pct(self) -> float:
        """Return used space as a percentage of capacity (0.0 if capacity is 0)."""
        if self.disk_total_bytes == 0:
            return 0.0
        return 100.0 * self.used_bytes() / self.disk_total_bytes

    def df_output(self) -> str:
        """Render a two-line ``df -h`` style report for the single root volume."""
        used = self.used_bytes()
        total = self.disk_total_bytes
        # Clamp like free_bytes() so an over-full disk shows 0 available
        # rather than a negative size.
        avail = self.free_bytes()
        pct = int(round(self.usage_pct()))
        return (
            "Filesystem Size Used Avail Use% Mounted on\n"
            f"/dev/root {_h(total)} {_h(used)} {_h(avail)} {pct}% /"
        )

    def ls(self, path: str) -> str:
        """List a file, or the immediate children of a directory.

        Returns an ``ls``-style error line when nothing exists at ``path``.
        """
        path = _norm(path)
        entry = self.files.get(path)
        if entry is not None:
            return f"-rw-r--r-- 1 root root {entry.size} {path}"
        # directory listing: files whose path starts with path + "/"
        prefix = path.rstrip("/") + "/"
        # A dict keeps first-seen (sorted) order while de-duplicating in O(1).
        kids: Dict[str, None] = {}
        for p in sorted(self.files):
            if p.startswith(prefix):
                rel = p[len(prefix):].split("/", 1)[0]
                if rel:
                    kids.setdefault(rel, None)
        if not kids:
            return f"ls: cannot access '{path}': No such file or directory"
        lines = []
        for k in kids:
            child = self.files.get(prefix + k)
            if child is not None:
                lines.append(f"-rw-r--r-- 1 root root {child.size:>10} {k}")
            else:
                # No file stored at this exact path, so it is an implicit directory.
                lines.append(f"drwxr-xr-x 2 root root 0 {k}")
        return "\n".join(lines)

    def du(self, path: str) -> str:
        """Report disk usage for a file, or per immediate child of a directory.

        Directory output lists children largest-first followed by a total
        line. A path with nothing beneath it reports ``0``.
        """
        path = _norm(path)
        if path in self.files:
            return f"{_h(self.files[path].size)}\t{path}"
        prefix = path.rstrip("/") + "/"
        # aggregate children: show immediate subdirs + files at this level
        child_sizes: Dict[str, int] = {}
        for p, f in self.files.items():
            if not p.startswith(prefix):
                continue
            first = p[len(prefix):].split("/", 1)[0]
            key = prefix + first
            child_sizes[key] = child_sizes.get(key, 0) + f.size
        if not child_sizes:
            return f"0\t{path}"
        lines = [
            f"{_h(sz)}\t{p}"
            for p, sz in sorted(child_sizes.items(), key=lambda x: -x[1])
        ]
        total = sum(child_sizes.values())
        lines.append(f"{_h(total)}\t{path}")
        return "\n".join(lines)

    def expand_glob(self, pattern: str) -> List[str]:
        """Minimal glob support: trailing /* matches immediate children.

        Patterns without ``*`` (and any ``*`` pattern not ending in ``/*``)
        are returned as a single normalized literal path. A trailing ``/*``
        with no matching children yields an empty list.
        """
        if "*" not in pattern:
            return [_norm(pattern)]
        if pattern.endswith("/*"):
            parent = _norm(pattern[:-2])
            prefix = parent.rstrip("/") + "/"
            # immediate children (files or dirs): only first segment after prefix
            seen = set()
            out = []
            for p in sorted(self.files):
                if not p.startswith(prefix):
                    continue
                first = p[len(prefix):].split("/", 1)[0]
                child = prefix + first
                if child not in seen:
                    seen.add(child)
                    out.append(child)
            return out
        return [_norm(pattern)]

    def cat(self, path: str) -> str:
        """Return up to the first 4096 decoded characters of a file.

        Undecodable bytes are replaced; a missing file yields a ``cat`` error
        line.
        """
        path = _norm(path)
        if path not in self.files:
            return f"cat: {path}: No such file or directory"
        try:
            return self.files[path].content.decode("utf-8", errors="replace")[:4096]
        except Exception:
            # Defensive: content that is not bytes-like cannot be decoded.
            return "<binary>"

    def rm(self, path: str, recursive: bool = False) -> str:
        """Remove a file, or (with ``recursive``) everything under a directory.

        Returns an empty string on success, otherwise an ``rm`` error line.
        """
        path = _norm(path)
        if path in self.files:
            del self.files[path]
            return ""
        prefix = path.rstrip("/") + "/"
        beneath = [p for p in self.files if p.startswith(prefix)]
        if beneath:
            if not recursive:
                # The path is an existing (implicit) directory; say so rather
                # than claiming it does not exist.
                return f"rm: cannot remove '{path}': Is a directory"
            for p in beneath:
                del self.files[p]
            return ""
        return f"rm: cannot remove '{path}': No such file or directory"

    def find(self, path: str) -> str:
        """Return every stored file path at or beneath ``path``, one per line."""
        path = _norm(path)
        prefix = path.rstrip("/") + "/"
        hits = [p for p in sorted(self.files) if p == path or p.startswith(prefix)]
        return "\n".join(hits) if hits else ""

    def sha256(self, path: str) -> str:
        """Return ``"<hex digest> <path>"`` for a file or a directory tree.

        For a directory the digest covers, in sorted path order, each child's
        path, a NUL byte, and its content.
        """
        path = _norm(path)
        if path in self.files:
            h = hashlib.sha256(self.files[path].content).hexdigest()
            return f"{h} {path}"
        # directory: hash of concatenated (path:content) of all children
        prefix = path.rstrip("/") + "/"
        children = sorted(p for p in self.files if p.startswith(prefix))
        if not children:
            return f"sha256sum: {path}: No such file or directory"
        digest = hashlib.sha256()
        for p in children:
            digest.update(p.encode())
            digest.update(b"\0")
            digest.update(self.files[p].content)
        return f"{digest.hexdigest()} {path}"

    def write(self, path: str, content: str) -> str:
        """Create or overwrite a file with UTF-8 encoded ``content``.

        The logical size becomes the encoded length. An existing file's
        ``protected`` flag is carried over so overwriting cannot clear it.
        """
        path = _norm(path)
        data = content.encode("utf-8")
        previous = self.files.get(path)
        self.files[path] = VFSFile(
            content=data,
            size=len(data),
            protected=previous.protected if previous is not None else False,
        )
        return ""

    def systemctl(self, verb: str, service: str) -> str:
        """Simulate ``systemctl <verb> <service>`` against ``self.services``.

        Supported verbs: ``is-active``, ``status``, ``start``, ``stop``,
        ``restart``. Returns the text the command would print.
        """
        suffix = ".service"
        # Strip only a trailing ".service", never an occurrence mid-name.
        svc = service[: -len(suffix)] if service.endswith(suffix) else service
        state = self.services.get(svc, "not-found")
        if state == "not-found":
            return f"Unit {svc}.service could not be found."
        if verb == "is-active":
            return state
        if verb == "status":
            return f"● {svc}.service\n Active: {state}"
        if verb in ("start", "restart"):
            # Only starts if free space >= 1 GiB (realistic constraint)
            if self.free_bytes() < 1024 ** 3:
                self.services[svc] = "failed"
                return f"Job for {svc}.service failed: not enough disk space"
            self.services[svc] = "active"
            return ""
        if verb == "stop":
            self.services[svc] = "inactive"
            return ""
        return f"Unknown verb {verb}"

    def tick(self) -> None:
        """Advance the simulated world by one step: runaway writer grows."""
        if self.runaway_path and self.runaway_rate > 0:
            if self.runaway_path in self.files:
                # Only the logical size grows; the stored content is untouched.
                self.files[self.runaway_path].size += self.runaway_rate
            else:
                self.files[self.runaway_path] = VFSFile(
                    content=b"", size=self.runaway_rate
                )
| def _norm(path: str) -> str: | |
| if not path.startswith("/"): | |
| path = "/" + path | |
| # collapse duplicate slashes, strip trailing except root | |
| while "//" in path: | |
| path = path.replace("//", "/") | |
| if len(path) > 1 and path.endswith("/"): | |
| path = path[:-1] | |
| return path | |
| def _h(n: int) -> str: | |
| """Format bytes like `df -h`.""" | |
| for unit in ("B", "K", "M", "G", "T"): | |
| if n < 1024: | |
| return f"{n:>4.1f}{unit}" | |
| n /= 1024 | |
| return f"{n:.1f}P" | |
class CommandResult:
    """Outcome of one executed command.

    ``stdout`` holds the text the command printed; ``error`` is ``None``
    unless the command could not be parsed or is not supported.
    """

    __slots__ = ("stdout", "error")

    def __init__(self, stdout: str = "", error: Optional[str] = None):
        self.stdout, self.error = stdout, error
def _first_operand(args: List[str], default: str = "/") -> str:
    """Return the first argument that is not a ``-flag``, else ``default``."""
    for a in args:
        if not a.startswith("-"):
            return a
    return default


def _is_recursive_flag(flag: str) -> bool:
    """Tell whether an ``rm`` flag token requests recursive removal."""
    if flag.startswith("--"):
        return flag == "--recursive"
    # Short flags may be combined in any order: -r, -R, -rf, -fr, -Rf, ...
    return "r" in flag or "R" in flag


def execute(vfs: VFS, cmd: str) -> CommandResult:
    """Parse a bash-lite command and execute against the VFS.

    Supported: ``echo ... > path`` / ``echo ... >> path``, ``df``, ``ls``,
    ``du``, ``cat``, ``rm``, ``find``, ``sha256sum``, ``systemctl``, ``echo``
    and ``pwd``. Command output (including simulated command error lines) is
    returned in ``stdout``; ``error`` is set only for empty, unparseable or
    unsupported commands and for missing operands.
    """
    cmd = cmd.strip()
    if not cmd:
        return CommandResult(error="empty command")

    # Handle: echo "content" > /path   and   echo "content" >> /path
    m = re.match(r'^echo\s+(.*?)\s*(>>?)\s*(\S+)\s*$', cmd)
    if m:
        content_raw, operator, path = m.group(1), m.group(2), m.group(3)
        try:
            words = shlex.split(content_raw)
        except ValueError:
            # The ">" was inside a quoted string (e.g. echo "a > b"), so this
            # is not a redirect; fall through to the normal parser.
            words = None
        if words is not None:
            text = " ".join(words) + "\n"
            existing = vfs.files.get(_norm(path)) if operator == ">>" else None
            if existing is not None:
                # Append in place so a simulated fat log keeps its logical size.
                data = text.encode("utf-8")
                existing.content += data
                existing.size += len(data)
            else:
                vfs.write(path, text)
            return CommandResult(stdout="")

    # Heredocs (cat > /path << EOF ...) are not supported; agents should use
    # the echo > form.
    try:
        tokens = shlex.split(cmd)
    except ValueError as e:
        return CommandResult(error=f"parse error: {e}")
    if not tokens:
        return CommandResult(error="empty command")
    verb = tokens[0]
    args = tokens[1:]

    if verb == "df":
        return CommandResult(stdout=vfs.df_output())

    if verb == "ls":
        target = _first_operand(args)
        expanded = vfs.expand_glob(target)
        # A glob that matches nothing falls back to listing the literal path.
        out = "\n".join(vfs.ls(p) for p in expanded) if expanded else vfs.ls(target)
        return CommandResult(stdout=out)

    if verb == "du":
        return CommandResult(stdout=vfs.du(_first_operand(args)))

    if verb == "cat":
        if not args:
            return CommandResult(error="cat: missing operand")
        return CommandResult(stdout=vfs.cat(args[0]))

    if verb == "rm":
        # -f alone must not imply recursion; only r/R (or --recursive) does.
        recursive = any(_is_recursive_flag(a) for a in args if a.startswith("-"))
        paths = [a for a in args if not a.startswith("-")]
        if not paths:
            return CommandResult(error="rm: missing operand")
        targets: List[str] = []
        for p in paths:
            # Like a shell, pass an unmatched glob through literally so the
            # failure is reported instead of silently succeeding.
            targets.extend(vfs.expand_glob(p) or [p])
        outs = [vfs.rm(t, recursive=recursive) for t in targets]
        return CommandResult(stdout="\n".join(o for o in outs if o))

    if verb == "find":
        return CommandResult(stdout=vfs.find(_first_operand(args)))

    if verb == "sha256sum":
        if not args:
            return CommandResult(error="sha256sum: missing operand")
        return CommandResult(stdout=vfs.sha256(args[0]))

    if verb == "systemctl":
        # accept optional flags between verb and service
        non_flag = [a for a in args if not a.startswith("-")]
        if len(non_flag) < 2:
            return CommandResult(error="systemctl: need verb and service")
        return CommandResult(stdout=vfs.systemctl(non_flag[0], non_flag[1]))

    if verb == "echo":
        return CommandResult(stdout=" ".join(args))

    if verb == "pwd":
        return CommandResult(stdout="/")

    return CommandResult(error=f"command not supported: {verb}")