# Author: yashppawar
# Commit message: Initial DiskPanic OpenEnv submission
# Commit: 569c142 (verified)
"""A tiny in-memory virtual filesystem + bash-lite parser.
We don't touch the real filesystem — everything is a Python dict.
This keeps the env safe (no escape), deterministic, and fast.
"""
from __future__ import annotations
import hashlib
import re
import shlex
from dataclasses import dataclass, field
from typing import Dict, List, Optional
@dataclass
class VFSFile:
    """One file stored in the virtual filesystem.

    The logical ``size`` is tracked separately from ``content`` so that a
    file can report a large size without holding that many bytes in memory.
    """

    # Raw bytes held for this file.
    content: bytes
    # Logical size in bytes (may be >> len(content) for simulated fat logs).
    size: int
    # If True, any modification is a reward-tanker.
    protected: bool = False
@dataclass
class VFS:
    """Dict-based virtual filesystem. Keys are absolute paths.

    Only files are stored; a directory exists implicitly whenever at least
    one stored path starts with ``<dir>/``.  Methods that mimic shell tools
    return the text the tool would print (including error messages) instead
    of raising.
    """

    files: Dict[str, "VFSFile"] = field(default_factory=dict)
    disk_total_bytes: int = 10 * 1024 ** 3  # 10 GiB
    # background "runaway writer": bytes added per step to this path
    runaway_path: Optional[str] = None
    runaway_rate: int = 0
    # services we simulate: name -> "active" | "inactive" | "failed"
    services: Dict[str, str] = field(default_factory=dict)

    def used_bytes(self) -> int:
        """Return the sum of the logical sizes of all files."""
        return sum(f.size for f in self.files.values())

    def free_bytes(self) -> int:
        """Return the remaining capacity, clamped at zero when over-full."""
        return max(0, self.disk_total_bytes - self.used_bytes())

    def usage_pct(self) -> float:
        """Return used space as a percentage of capacity (0.0 if capacity is 0)."""
        if self.disk_total_bytes == 0:
            return 0.0
        return 100.0 * self.used_bytes() / self.disk_total_bytes

    def df_output(self) -> str:
        """Render a two-line ``df -h``-style report for the single root mount."""
        used = self.used_bytes()
        total = self.disk_total_bytes
        # Use the clamped value so an over-full disk shows 0 available
        # instead of a negative size, matching free_bytes().
        avail = self.free_bytes()
        pct = int(round(self.usage_pct()))
        return (
            "Filesystem Size Used Avail Use% Mounted on\n"
            f"/dev/root {_h(total)} {_h(used)} {_h(avail)} {pct}% /"
        )

    def ls(self, path: str) -> str:
        """List a file, or the immediate children of a directory.

        Returns an ``ls``-style error line when nothing lives at ``path``.
        """
        path = _norm(path)
        entry = self.files.get(path)
        if entry is not None:
            return f"-rw-r--r-- 1 root root {entry.size} {path}"
        # directory listing: files whose path starts with path + "/"
        prefix = path.rstrip("/") + "/"
        kids: List[str] = []
        seen = set()  # O(1) duplicate check; `kids` keeps the sorted order
        for p in sorted(self.files):
            if not p.startswith(prefix):
                continue
            rel = p[len(prefix):].split("/", 1)[0]
            if rel and rel not in seen:
                seen.add(rel)
                kids.append(rel)
        if not kids:
            return f"ls: cannot access '{path}': No such file or directory"
        lines = []
        for k in kids:
            child = self.files.get(prefix + k)
            if child is not None:
                lines.append(f"-rw-r--r-- 1 root root {child.size:>10} {k}")
            else:
                lines.append(f"drwxr-xr-x 2 root root 0 {k}")
        return "\n".join(lines)

    def du(self, path: str) -> str:
        """Report disk usage for a file, or per-child usage plus a total for a directory."""
        path = _norm(path)
        if path in self.files:
            return f"{_h(self.files[path].size)}\t{path}"
        prefix = path.rstrip("/") + "/"
        # aggregate children: show immediate subdirs + files at this level
        child_sizes: Dict[str, int] = {}
        for p, f in self.files.items():
            if not p.startswith(prefix):
                continue
            first = p[len(prefix):].split("/", 1)[0]
            key = prefix + first
            child_sizes[key] = child_sizes.get(key, 0) + f.size
        if not child_sizes:
            return f"0\t{path}"
        # Largest children first, then the grand total for the directory.
        lines = [
            f"{_h(sz)}\t{p}"
            for p, sz in sorted(child_sizes.items(), key=lambda x: -x[1])
        ]
        total = sum(child_sizes.values())
        lines.append(f"{_h(total)}\t{path}")
        return "\n".join(lines)

    def expand_glob(self, pattern: str) -> List[str]:
        """Minimal glob support: trailing /* matches immediate children.

        Any other pattern (with or without ``*``) is returned as a single
        normalized literal path.  A trailing ``/*`` with no matching
        children yields an empty list.
        """
        if "*" not in pattern:
            return [_norm(pattern)]
        if pattern.endswith("/*"):
            parent = _norm(pattern[:-2])
            prefix = parent.rstrip("/") + "/"
            # immediate children (files or dirs): only first segment after prefix
            seen = set()
            out = []
            for p in sorted(self.files):
                if not p.startswith(prefix):
                    continue
                first = p[len(prefix):].split("/", 1)[0]
                child = prefix + first
                if child not in seen:
                    seen.add(child)
                    out.append(child)
            return out
        return [_norm(pattern)]

    def cat(self, path: str) -> str:
        """Return up to the first 4096 decoded characters of a file, or an error line."""
        path = _norm(path)
        if path not in self.files:
            return f"cat: {path}: No such file or directory"
        try:
            return self.files[path].content.decode("utf-8", errors="replace")[:4096]
        except Exception:
            # Best-effort fallback if the stored content cannot be decoded.
            return "<binary>"

    def rm(self, path: str, recursive: bool = False) -> str:
        """Delete a file, or (with ``recursive``) every file under a directory.

        Returns an empty string on success and an ``rm``-style error line
        otherwise.
        """
        path = _norm(path)
        if path in self.files:
            del self.files[path]
            return ""
        prefix = path.rstrip("/") + "/"
        doomed = [p for p in self.files if p.startswith(prefix)]
        if not doomed:
            return f"rm: cannot remove '{path}': No such file or directory"
        if not recursive:
            # The directory exists; report that rather than claiming it is missing.
            return f"rm: cannot remove '{path}': Is a directory"
        for p in doomed:
            del self.files[p]
        return ""

    def find(self, path: str) -> str:
        """Return the sorted paths at or under ``path``, one per line ('' if none)."""
        path = _norm(path)
        prefix = path.rstrip("/") + "/"
        hits = [p for p in sorted(self.files) if p == path or p.startswith(prefix)]
        return "\n".join(hits) if hits else ""

    def sha256(self, path: str) -> str:
        """Return ``<hex digest> <path>`` for a file or a whole directory tree."""
        path = _norm(path)
        if path in self.files:
            h = hashlib.sha256(self.files[path].content).hexdigest()
            return f"{h} {path}"
        # directory: hash of concatenated (path, NUL, content) of all children
        prefix = path.rstrip("/") + "/"
        children = sorted(p for p in self.files if p.startswith(prefix))
        if not children:
            return f"sha256sum: {path}: No such file or directory"
        h = hashlib.sha256()
        for p in children:
            h.update(p.encode())
            h.update(b"\0")
            h.update(self.files[p].content)
        return f"{h.hexdigest()} {path}"

    def write(self, path: str, content: str) -> str:
        """Create or overwrite a file with UTF-8 encoded ``content``.

        The logical size becomes the encoded length.  Overwriting keeps the
        existing file's ``protected`` flag so a write cannot silently
        un-protect it.
        """
        path = _norm(path)
        data = content.encode("utf-8")
        previous = self.files.get(path)
        keep_protected = previous.protected if previous is not None else False
        self.files[path] = VFSFile(
            content=data, size=len(data), protected=keep_protected
        )
        return ""

    def systemctl(self, verb: str, service: str) -> str:
        """Simulate ``systemctl <verb> <service>`` and return its output.

        Supported verbs: is-active, status, start, stop, restart.  Starting
        or restarting fails (and marks the unit "failed") when less than
        1 GiB is free.
        """
        suffix = ".service"
        # Strip only a trailing ".service"; the text may legitimately occur
        # elsewhere in a unit name.
        svc = service[: -len(suffix)] if service.endswith(suffix) else service
        state = self.services.get(svc, "not-found")
        if state == "not-found":
            return f"Unit {svc}.service could not be found."
        if verb == "is-active":
            return state
        if verb == "status":
            return f"● {svc}.service\n Active: {state}"
        if verb in ("start", "restart"):
            # Needs at least 1 GiB free to come up (realistic constraint).
            if self.free_bytes() < 1024 ** 3:
                self.services[svc] = "failed"
                return f"Job for {svc}.service failed: not enough disk space"
            self.services[svc] = "active"
            return ""
        if verb == "stop":
            self.services[svc] = "inactive"
            return ""
        return f"Unknown verb {verb}"

    def tick(self) -> None:
        """Advance the simulated world by one step: runaway writer grows."""
        if self.runaway_path and self.runaway_rate > 0:
            if self.runaway_path in self.files:
                self.files[self.runaway_path].size += self.runaway_rate
            else:
                # The writer re-creates its file if it was deleted.
                self.files[self.runaway_path] = VFSFile(
                    content=b"", size=self.runaway_rate
                )
def _norm(path: str) -> str:
if not path.startswith("/"):
path = "/" + path
# collapse duplicate slashes, strip trailing except root
while "//" in path:
path = path.replace("//", "/")
if len(path) > 1 and path.endswith("/"):
path = path[:-1]
return path
def _h(n: int) -> str:
"""Format bytes like `df -h`."""
for unit in ("B", "K", "M", "G", "T"):
if n < 1024:
return f"{n:>4.1f}{unit}"
n /= 1024
return f"{n:.1f}P"
class CommandResult:
    """Outcome of one executed command.

    ``stdout`` holds the text the command produced; ``error`` is ``None``
    unless the command could not be parsed or is unsupported.
    """

    __slots__ = ("stdout", "error")

    def __init__(self, stdout: str = "", error: Optional[str] = None):
        self.stdout, self.error = stdout, error
def _find_redirect(cmd: str) -> Optional[tuple]:
    """Locate the first ``>`` / ``>>`` in ``cmd`` that is outside quotes.

    Returns ``(lhs, rhs, append)`` where ``lhs`` is the text before the
    operator, ``rhs`` the text after it and ``append`` is True for ``>>``;
    returns ``None`` when the command has no unquoted redirect.
    """
    quote = ""
    i = 0
    while i < len(cmd):
        ch = cmd[i]
        if quote:
            if ch == "\\" and quote == '"':
                i += 1  # skip the escaped character inside double quotes
            elif ch == quote:
                quote = ""
        elif ch == "\\":
            i += 1  # an escaped character is never an operator or a quote
        elif ch in "'\"":
            quote = ch
        elif ch == ">":
            append = cmd[i + 1:i + 2] == ">"
            rhs_start = i + 2 if append else i + 1
            return cmd[:i], cmd[rhs_start:], append
        i += 1
    return None


def execute(vfs: VFS, cmd: str) -> CommandResult:
    """Parse a bash-lite command and execute against the VFS.

    Supported verbs: df, ls, du, cat, rm, find, sha256sum, systemctl, echo
    (optionally with ``> path`` or ``>> path``) and pwd.  Heredocs are not
    supported; agents should use the ``echo ... > path`` form.

    Returns a CommandResult.  ``error`` is set for empty, unparsable or
    unsupported commands and for missing operands; messages produced by
    the simulated tools themselves are returned in ``stdout``.
    """
    cmd = cmd.strip()
    if not cmd:
        return CommandResult(error="empty command")
    try:
        tokens = shlex.split(cmd)
    except ValueError as e:
        return CommandResult(error=f"parse error: {e}")
    if not tokens:
        return CommandResult(error="empty command")
    verb = tokens[0]
    args = tokens[1:]

    if verb == "echo":
        # Only an unquoted ">" is a redirect; `echo "a > b"` just prints.
        redirect = _find_redirect(cmd)
        if redirect is None:
            return CommandResult(stdout=" ".join(args))
        lhs, rhs, append = redirect
        try:
            words = shlex.split(lhs)[1:]  # drop the leading "echo"
            targets = shlex.split(rhs)
        except ValueError as e:
            return CommandResult(error=f"parse error: {e}")
        if len(targets) != 1:
            return CommandResult(error="echo: expected exactly one redirect target")
        text = " ".join(words) + "\n"
        key = _norm(targets[0])
        existing = vfs.files.get(key)
        if append and existing is not None:
            # ">>" extends the file in place instead of replacing it.
            data = text.encode("utf-8")
            existing.content += data
            existing.size += len(data)
        else:
            vfs.write(key, text)
        return CommandResult(stdout="")

    if verb == "df":
        return CommandResult(stdout=vfs.df_output())

    if verb == "ls":
        paths = [a for a in args if not a.startswith("-")]
        target = paths[0] if paths else "/"
        expanded = vfs.expand_glob(target)
        out = "\n".join(vfs.ls(p) for p in expanded) if expanded else vfs.ls(target)
        return CommandResult(stdout=out)

    if verb == "du":
        paths = [a for a in args if not a.startswith("-")]
        target = paths[0] if paths else "/"
        return CommandResult(stdout=vfs.du(target))

    if verb == "cat":
        if not args:
            return CommandResult(error="cat: missing operand")
        return CommandResult(stdout=vfs.cat(args[0]))

    if verb == "rm":
        flags = [a for a in args if a.startswith("-")]
        # Letters from combined short options such as "-rf" or "-Rf".
        short = "".join(f[1:] for f in flags if not f.startswith("--"))
        # "-f" alone must NOT imply recursion; it only silences missing paths.
        recursive = "r" in short or "R" in short or "--recursive" in flags
        force = "f" in short or "--force" in flags
        paths = [a for a in args if not a.startswith("-")]
        if not paths:
            return CommandResult(error="rm: missing operand")
        targets: List[str] = []
        for p in paths:
            targets.extend(vfs.expand_glob(p))
        outs = [vfs.rm(t, recursive=recursive) for t in targets]
        if force:
            outs = [o for o in outs if not o.endswith("No such file or directory")]
        out = "\n".join(o for o in outs if o)
        return CommandResult(stdout=out)

    if verb == "find":
        paths = [a for a in args if not a.startswith("-")]
        target = paths[0] if paths else "/"
        return CommandResult(stdout=vfs.find(target))

    if verb == "sha256sum":
        if not args:
            return CommandResult(error="sha256sum: missing operand")
        return CommandResult(stdout=vfs.sha256(args[0]))

    if verb == "systemctl":
        # accept optional flags between verb and service
        non_flag = [a for a in args if not a.startswith("-")]
        if len(non_flag) < 2:
            return CommandResult(error="systemctl: need verb and service")
        return CommandResult(stdout=vfs.systemctl(non_flag[0], non_flag[1]))

    if verb in ("pwd",):
        return CommandResult(stdout="/")

    return CommandResult(error=f"command not supported: {verb}")