Scrypt / scrypt /sandbox /shell.py
IMJONEZZ's picture
SCRYPT: initial commit — game, sandbox, Warden, Space web layer
9fca766
Raw
History Blame Contribute Delete
10.8 kB
"""The fake shell: a command registry over the VFS.
Commands are plain Python handlers — nothing is ever executed on the host.
Each invocation is counted, because the Warden takes your *most-used*
commands first. Revoked commands stay in the registry as gravestones that
answer with a taunt.
"""
from __future__ import annotations
import fnmatch
import shlex
from collections import Counter
from dataclasses import dataclass, field
from typing import Callable
from .vfs import VFS, DirNode, FileNode, VfsError
@dataclass
class ShellResult:
    """Outcome of one command (or of a whole pipeline).

    A non-empty ``err`` marks the command as failed; ``Shell.run`` stops the
    pipeline at the first result whose ``err`` is set.
    """

    # Text written to standard output; fed to the next pipeline stage.
    out: str = ""
    # Error message; the empty string means success.
    err: str = ""
# Signature shared by every command handler: it is called with the shell, the
# glob-expanded argument list (command name removed) and the previous pipeline
# stage's output as stdin, and returns a ShellResult.
Handler = Callable[["Shell", list[str], str], ShellResult]
@dataclass
class Shell:
    """The fake shell: parses command lines and dispatches to Python handlers.

    Nothing is executed on the host; every command is a function operating on
    the virtual filesystem ``vfs``. Each successful dispatch is counted in
    ``usage`` so the game can find the player's most-used command, and
    revoked commands are remembered in ``revoked`` together with an epitaph
    that is shown when the player tries to use them again.
    """

    vfs: VFS
    usage: Counter = field(default_factory=Counter)
    revoked: dict[str, str] = field(default_factory=dict)  # name -> epitaph
    # Events the game layer reads after each command (e.g. files deleted).
    last_deletions: int = 0
    # Canonical paths the player has cat'ed (puzzles check this).
    reads: set[str] = field(default_factory=set)

    def __post_init__(self) -> None:
        """Build the per-instance command registry (name -> handler)."""
        self.commands: dict[str, Handler] = {
            "pwd": _pwd, "ls": _ls, "cd": _cd, "cat": _cat, "grep": _grep,
            "find": _find, "echo": _echo, "head": _head, "rm": _rm,
            "tree": _tree, "unzip": _unzip, "chmod": _chmod,
            "help": _help, "man": _man,
        }

    # ------------------------------------------------------------ control
    def revoke(self, name: str, epitaph: str = "") -> None:
        """Remove *name* from the registry and record its epitaph.

        Unknown (or already revoked) names are ignored. An empty *epitaph*
        is replaced by a default taunt.
        """
        if name in self.commands:
            del self.commands[name]
            self.revoked[name] = epitaph or "you traded it away. it is not coming back."

    def available(self) -> list[str]:
        """Return the names of the commands still owned, sorted."""
        return sorted(self.commands)

    def most_used(self, exclude: tuple[str, ...] = ("help", "man")) -> str | None:
        """The command the Warden wants: most-used, still owned, not trivial."""
        for name, _ in self.usage.most_common():
            if name in self.commands and name not in exclude:
                return name
        return None

    # ------------------------------------------------------------ running
    def run(self, line: str) -> ShellResult:
        """Run one command line, which may be a ``|``-separated pipeline.

        Each stage's ``out`` becomes the next stage's stdin. The pipeline
        stops at the first stage that fails to parse, names an unknown or
        revoked command, or returns a non-empty ``err``; that result is
        returned. Otherwise the last stage's result is returned.
        ``last_deletions`` is reset to 0 before anything runs.
        """
        self.last_deletions = 0
        line = line.strip()
        if not line:
            return ShellResult()
        stdin = ""
        result = ShellResult()
        for stage in self._split_pipeline(line):
            try:
                argv = shlex.split(stage)
            except ValueError as e:
                return ShellResult(err=f"parse error: {e}")
            if not argv:
                return ShellResult(err="empty pipeline stage")
            name, args = argv[0], argv[1:]
            if name in self.revoked:
                return ShellResult(err=f"{name}: command not found. ({self.revoked[name]})")
            handler = self.commands.get(name)
            if handler is None:
                return ShellResult(err=f"{name}: command not found")
            # Counted before the handler runs, so failed invocations count too.
            self.usage[name] += 1
            result = handler(self, self._expand(args), stdin)
            if result.err:
                return result
            stdin = result.out
        return result

    @staticmethod
    def _split_pipeline(line: str) -> list[str]:
        """Split *line* into stripped stages on unquoted, unescaped ``|``.

        A plain ``line.split("|")`` would cut ``grep "a|b" file`` in the
        middle of the quoted pattern. Quoting follows the POSIX rules that
        ``shlex.split`` applies to each stage afterwards: single quotes are
        fully literal, a backslash escapes the next character outside quotes
        and inside double quotes. Quotes and backslashes are kept in the
        returned stages so ``shlex`` can still interpret them.
        """
        stages: list[str] = []
        current: list[str] = []
        quote = ""  # the open quote character, or "" when outside quotes
        escaped = False  # previous character was an active backslash
        for ch in line:
            if escaped:
                current.append(ch)
                escaped = False
            elif ch == "\\" and quote != "'":
                current.append(ch)
                escaped = True
            elif quote:
                current.append(ch)
                if ch == quote:
                    quote = ""
            elif ch in "'\"":
                current.append(ch)
                quote = ch
            elif ch == "|":
                stages.append("".join(current).strip())
                current = []
            else:
                current.append(ch)
        stages.append("".join(current).strip())
        return stages

    def _expand(self, args: list[str]) -> list[str]:
        """Glob expansion against the current directory.

        Only arguments that contain a glob character and no ``/`` are
        expanded; hidden entries are candidates too. A pattern without any
        match is passed through unchanged.

        NOTE(review): quoting is not tracked (``shlex.split`` has already
        removed the quotes), so a quoted pattern such as the one in
        ``find -name "*.txt"`` is still expanded when the cwd has a match.
        """
        out: list[str] = []
        names: list[str] | None = None  # cwd listing, fetched at most once
        for arg in args:
            if any(c in arg for c in "*?[") and "/" not in arg:
                if names is None:
                    names = [n.name for n in self.vfs.listing(".", show_hidden=True)]
                matches = sorted(fnmatch.filter(names, arg))
                out.extend(matches or [arg])
            else:
                out.append(arg)
        return out
# ------------------------------------------------------------------ commands
def _pwd(sh: Shell, args: list[str], stdin: str) -> ShellResult:
    """Report the VFS's current working directory; args and stdin are ignored."""
    cwd = sh.vfs.cwd_path
    return ShellResult(out=cwd)
def _ls(sh: Shell, args: list[str], stdin: str) -> ShellResult:
    """List the entries of each given path (default: the current directory).

    Hidden entries are included when ``-a`` appears on its own, inside a
    short-flag cluster such as ``-la`` / ``-al``, or as ``--all``. Other
    flags are accepted and ignored. Directories get a trailing ``/`` and
    locked files a ``[locked]`` marker. Any VFS error aborts the whole
    listing and is reported as ``ls: ...``.
    """
    show_hidden = any(
        a == "--all"
        or (a.startswith("-") and not a.startswith("--") and "a" in a)
        for a in args
    )
    paths = [a for a in args if not a.startswith("-")] or ["."]
    lines: list[str] = []
    try:
        for p in paths:
            for node in sh.vfs.listing(p, show_hidden=show_hidden):
                suffix = "/" if isinstance(node, DirNode) else ""
                lock = " [locked]" if isinstance(node, FileNode) and node.locked else ""
                lines.append(f"{node.name}{suffix}{lock}")
    except VfsError as e:
        return ShellResult(err=f"ls: {e}")
    return ShellResult(out="\n".join(lines))
def _cd(sh: Shell, args: list[str], stdin: str) -> ShellResult:
    """Change the working directory to the first argument, or ``~`` if none.

    Extra arguments are ignored. A VFS failure is reported as ``cd: ...``.
    """
    destination = next(iter(args), "~")
    try:
        sh.vfs.chdir(destination)
    except VfsError as exc:
        return ShellResult(err=f"cd: {exc}")
    return ShellResult()
def _cat(sh: Shell, args: list[str], stdin: str) -> ShellResult:
    """Concatenate the named files, or echo stdin when no file is given.

    Every file that is read successfully has its canonical path recorded in
    ``sh.reads``. The first VFS error aborts the command with ``cat: ...``
    and discards the text gathered so far (paths already recorded stay).
    """
    if not args:
        return ShellResult(out=stdin)
    contents: list[str] = []
    try:
        for path in args:
            contents.append(sh.vfs.read(path))
            # Only reached when the read above succeeded.
            canonical = sh.vfs.path_of(sh.vfs._parts(path))
            sh.reads.add(canonical)
    except VfsError as exc:
        return ShellResult(err=f"cat: {exc}")
    return ShellResult(out="\n".join(contents))
def _grep(sh: Shell, args: list[str], stdin: str) -> ShellResult:
    """Print the lines that contain PATTERN as a plain substring.

    Arguments starting with ``-`` are flags; only ``-i`` (case-insensitive)
    has an effect. The first remaining argument is the pattern, the rest are
    files. Without files, stdin is searched. With more than one file each
    hit is prefixed with ``<file>:``. A VFS error aborts with ``grep: ...``.
    """
    flags: list[str] = []
    positional: list[str] = []
    for arg in args:
        (flags if arg.startswith("-") else positional).append(arg)
    if not positional:
        return ShellResult(err="usage: grep [-i] PATTERN [FILE...]")

    pattern, *files = positional
    ignore_case = "-i" in flags
    needle = pattern.lower() if ignore_case else pattern

    def select(text: str) -> list[str]:
        """Return the lines of *text* that contain the needle."""
        if ignore_case:
            return [ln for ln in text.splitlines() if needle in ln.lower()]
        return [ln for ln in text.splitlines() if needle in ln]

    if not files:
        return ShellResult(out="\n".join(select(stdin)))

    label_hits = len(files) > 1
    found: list[str] = []
    try:
        for name in files:
            hits = select(sh.vfs.read(name))
            tag = f"{name}:" if label_hits else ""
            for hit in hits:
                found.append(tag + hit)
    except VfsError as exc:
        return ShellResult(err=f"grep: {exc}")
    return ShellResult(out="\n".join(found))
def _find(sh: Shell, args: list[str], stdin: str) -> ShellResult:
    """List files under a start path whose base name matches ``-name PATTERN``.

    The start path is the first argument unless that argument is a flag, in
    which case (or with no arguments) it is ``.``. Without ``-name`` every
    file matches. Printed paths are prefixed with the start path as typed,
    except that ``.`` is replaced by the current working directory.
    """
    pattern = "*"
    try:
        flag_at = args.index("-name")
    except ValueError:
        flag_at = -1
    if flag_at >= 0:
        if flag_at + 1 >= len(args):
            return ShellResult(err="find: -name needs a pattern")
        pattern = args[flag_at + 1]

    if args and not args[0].startswith("-"):
        start = args[0]
    else:
        start = "."

    root = sh.vfs.resolve(start)
    if root is None:
        return ShellResult(err=f"find: {start}: no such file or directory")

    base = sh.vfs.cwd_path if start == "." else start
    results: list[str] = []
    for path, _node in sh.vfs.iter_files(root, prefix=base.rstrip("/")):
        leaf = path.rsplit("/", 1)[-1]
        if fnmatch.fnmatch(leaf, pattern):
            results.append(path)
    return ShellResult(out="\n".join(results))
def _echo(sh: Shell, args: list[str], stdin: str) -> ShellResult:
    """Emit the arguments joined by single spaces; stdin is ignored."""
    text = " ".join(args)
    return ShellResult(out=text)
def _head(sh: Shell, args: list[str], stdin: str) -> ShellResult:
    """Print the first N lines (default 10) of a file, or of stdin.

    ``-n N`` sets the count; a missing or non-integer N yields
    ``head: bad -n argument``. After removing ``-n N`` the first remaining
    argument is read as the file; with none left, stdin is used.
    """
    limit = 10
    operands = list(args)  # work on a copy; the caller's list is untouched
    if "-n" in operands:
        pos = operands.index("-n")
        if pos + 1 >= len(operands):
            return ShellResult(err="head: bad -n argument")
        try:
            limit = int(operands[pos + 1])
        except ValueError:
            return ShellResult(err="head: bad -n argument")
        del operands[pos:pos + 2]

    if operands:
        try:
            text = sh.vfs.read(operands[0])
        except VfsError as exc:
            return ShellResult(err=f"head: {exc}")
    else:
        text = stdin

    kept = text.splitlines()[:limit]
    return ShellResult(out="\n".join(kept))
def _rm(sh: Shell, args: list[str], stdin: str) -> ShellResult:
    """Remove each target from the VFS and report the count to the game layer.

    ``-r``, ``-rf`` or ``-fr`` enables recursive removal; other flags are
    ignored. Targets are removed in order and the first VFS error aborts the
    command with ``rm: ...``. ``sh.last_deletions`` is set to the number of
    entries actually removed on both the success and the error path, so
    deletions that happened before a failing target are still reported.
    """
    recursive = any(a in ("-r", "-rf", "-fr") for a in args)
    targets = [a for a in args if not a.startswith("-")]
    if not targets:
        return ShellResult(err="usage: rm [-r] FILE...")
    removed = 0
    try:
        for t in targets:
            removed += sh.vfs.remove(t, recursive=recursive)
    except VfsError as e:
        return ShellResult(err=f"rm: {e}")
    finally:
        # Runs before either return: earlier targets may already be gone
        # even though a later one failed.
        sh.last_deletions = removed
    return ShellResult()
def _tree(sh: Shell, args: list[str], stdin: str) -> ShellResult:
    """Draw the directory tree below the first argument (default ``.``).

    The first output line is always ``.``. Children are listed in sorted
    name order, each behind a four-column connector (``├── `` or ``└── ``).
    Fails with ``tree: <path>: not a directory`` when the start path does
    not resolve to a directory.
    """
    start = args[0] if args else "."
    node = sh.vfs.resolve(start)
    if not isinstance(node, DirNode):
        return ShellResult(err=f"tree: {start}: not a directory")
    lines: list[str] = ["."]

    def walk(d: DirNode, indent: str) -> None:
        """Append one line per child of *d*, recursing into directories."""
        names = sorted(d.children)
        for i, name in enumerate(names):
            child = d.children[name]
            last = i == len(names) - 1
            lines.append(f"{indent}{'└── ' if last else '├── '}{name}")
            if isinstance(child, DirNode):
                # The continuation indent must be as wide as the connector
                # (four columns), or nested levels do not line up.
                walk(child, indent + ("    " if last else "│   "))

    walk(node, "")
    return ShellResult(out="\n".join(lines))
def _unzip(sh: Shell, args: list[str], stdin: str) -> ShellResult:
    """Extract an archive file's members into the archive's own directory.

    Usage is ``unzip ARCHIVE [PASSWORD]``; arguments starting with ``-`` are
    ignored. The target must resolve to a file node that carries an archive
    mapping. If the node has a password, the supplied one must match it.
    Each member is written next to the archive and listed in the output.
    A VFS error while writing is reported as ``unzip: ...``.
    """
    targets = [a for a in args if not a.startswith("-")]
    if not targets:
        return ShellResult(err="usage: unzip ARCHIVE [PASSWORD]")
    path = targets[0]
    password = targets[1] if len(targets) > 1 else ""
    node = sh.vfs.resolve(path)
    if not isinstance(node, FileNode) or node.archive is None:
        return ShellResult(err=f"unzip: {path}: not an archive")
    if node.password and password != node.password:
        return ShellResult(err=f"unzip: {path}: incorrect password")
    parent, slash, _ = path.rpartition("/")
    # No slash: the archive is in the cwd. With a slash, an empty parent
    # means the archive sits directly under the root, so members must go to
    # "/<name>" and not into the cwd.
    dest = parent if slash else "."
    try:
        for name, content in node.archive.items():
            sh.vfs.write(f"{dest}/{name}", content)
    except VfsError as e:
        return ShellResult(err=f"unzip: {e}")
    return ShellResult(out="\n".join(f" inflating: {n}" for n in node.archive))
def _chmod(sh: Shell, args: list[str], stdin: str) -> ShellResult:
    """Mark a file executable; the only accepted form is ``chmod +x FILE``."""
    well_formed = len(args) == 2 and args[0] == "+x"
    if not well_formed:
        return ShellResult(err="usage: chmod +x FILE")
    target = args[1]
    node = sh.vfs.resolve(target)
    if isinstance(node, FileNode):
        node.executable = True
        return ShellResult()
    # Anything that is not a file node is reported as missing.
    return ShellResult(err=f"chmod: {target}: no such file")
def _help(sh: Shell, args: list[str], stdin: str) -> ShellResult:
    """Summarise the commands still owned, those revoked, and the game verbs."""
    owned = " ".join(sh.available())
    report = ["commands you still own:", " " + owned]
    if sh.revoked:
        sold = " ".join(sorted(sh.revoked))
        report += ["commands you sold:", " " + sold]
    report.append("game: deck say <words> fight exit")
    return ShellResult(out="\n".join(report))
# In-character manual text, keyed by command name. Commands without an entry
# here are answered by `_man` with a "no manual entry" message.
MAN_PAGES = {
    "grep": (
        "search file contents for a pattern. "
        "the Warden is very fond of this one."
    ),
    "ls": (
        "list directory contents. "
        "you would miss it more than you think."
    ),
    "rm": (
        "remove files. "
        "the Warden uses a bigger version of this."
    ),
    "unzip": (
        "extract an archive. some take a password. "
        "passwords are written down somewhere. they always are."
    ),
}
def _man(sh: Shell, args: list[str], stdin: str) -> ShellResult:
    """Show the manual text for the first argument.

    With no argument an error is returned. A command without an entry in
    ``MAN_PAGES`` gets a "no manual entry" message on standard output.
    """
    if not args:
        return ShellResult(err="what manual page do you want?")
    topic = args[0]
    if topic in MAN_PAGES:
        return ShellResult(out=MAN_PAGES[topic])
    return ShellResult(out=f"no manual entry for {topic}")