Spaces:
Running on Zero
Running on Zero
| """The fake shell: a command registry over the VFS. | |
| Commands are plain Python handlers — nothing is ever executed on the host. | |
| Each invocation is counted, because the Warden takes your *most-used* | |
| commands first. Revoked commands stay in the registry as gravestones that | |
| answer with a taunt. | |
| """ | |
| from __future__ import annotations | |
| import fnmatch | |
| import shlex | |
| from collections import Counter | |
| from dataclasses import dataclass, field | |
| from typing import Callable | |
| from .vfs import VFS, DirNode, FileNode, VfsError | |
| class ShellResult: | |
| out: str = "" | |
| err: str = "" | |
| Handler = Callable[["Shell", list[str], str], ShellResult] | |
| class Shell: | |
| vfs: VFS | |
| usage: Counter = field(default_factory=Counter) | |
| revoked: dict[str, str] = field(default_factory=dict) # name -> epitaph | |
| # Events the game layer reads after each command (e.g. files deleted). | |
| last_deletions: int = 0 | |
| # Canonical paths the player has cat'ed (puzzles check this). | |
| reads: set[str] = field(default_factory=set) | |
| def __post_init__(self) -> None: | |
| self.commands: dict[str, Handler] = { | |
| "pwd": _pwd, "ls": _ls, "cd": _cd, "cat": _cat, "grep": _grep, | |
| "find": _find, "echo": _echo, "head": _head, "rm": _rm, | |
| "tree": _tree, "unzip": _unzip, "chmod": _chmod, | |
| "help": _help, "man": _man, | |
| } | |
| # ------------------------------------------------------------ control | |
| def revoke(self, name: str, epitaph: str = "") -> None: | |
| if name in self.commands: | |
| del self.commands[name] | |
| self.revoked[name] = epitaph or "you traded it away. it is not coming back." | |
| def available(self) -> list[str]: | |
| return sorted(self.commands) | |
| def most_used(self, exclude: tuple[str, ...] = ("help", "man")) -> str | None: | |
| """The command the Warden wants: most-used, still owned, not trivial.""" | |
| for name, _ in self.usage.most_common(): | |
| if name in self.commands and name not in exclude: | |
| return name | |
| return None | |
| # ------------------------------------------------------------ running | |
| def run(self, line: str) -> ShellResult: | |
| self.last_deletions = 0 | |
| line = line.strip() | |
| if not line: | |
| return ShellResult() | |
| stages = [s.strip() for s in line.split("|")] | |
| stdin = "" | |
| result = ShellResult() | |
| for stage in stages: | |
| try: | |
| argv = shlex.split(stage) | |
| except ValueError as e: | |
| return ShellResult(err=f"parse error: {e}") | |
| if not argv: | |
| return ShellResult(err="empty pipeline stage") | |
| name, args = argv[0], argv[1:] | |
| if name in self.revoked: | |
| return ShellResult(err=f"{name}: command not found. ({self.revoked[name]})") | |
| handler = self.commands.get(name) | |
| if handler is None: | |
| return ShellResult(err=f"{name}: command not found") | |
| self.usage[name] += 1 | |
| result = handler(self, self._expand(args), stdin) | |
| if result.err: | |
| return result | |
| stdin = result.out | |
| return result | |
| def _expand(self, args: list[str]) -> list[str]: | |
| """Glob expansion against the current directory.""" | |
| out: list[str] = [] | |
| for arg in args: | |
| if any(c in arg for c in "*?[") and "/" not in arg: | |
| names = [n.name for n in self.vfs.listing(".", show_hidden=True)] | |
| matches = sorted(fnmatch.filter(names, arg)) | |
| out.extend(matches or [arg]) | |
| else: | |
| out.append(arg) | |
| return out | |
| # ------------------------------------------------------------------ commands | |
| def _pwd(sh: Shell, args: list[str], stdin: str) -> ShellResult: | |
| return ShellResult(out=sh.vfs.cwd_path) | |
| def _ls(sh: Shell, args: list[str], stdin: str) -> ShellResult: | |
| show_hidden = "-a" in args | |
| paths = [a for a in args if not a.startswith("-")] or ["."] | |
| lines: list[str] = [] | |
| try: | |
| for p in paths: | |
| for node in sh.vfs.listing(p, show_hidden=show_hidden): | |
| suffix = "/" if isinstance(node, DirNode) else "" | |
| lock = " [locked]" if isinstance(node, FileNode) and node.locked else "" | |
| lines.append(f"{node.name}{suffix}{lock}") | |
| except VfsError as e: | |
| return ShellResult(err=f"ls: {e}") | |
| return ShellResult(out="\n".join(lines)) | |
| def _cd(sh: Shell, args: list[str], stdin: str) -> ShellResult: | |
| try: | |
| sh.vfs.chdir(args[0] if args else "~") | |
| except VfsError as e: | |
| return ShellResult(err=f"cd: {e}") | |
| return ShellResult() | |
| def _cat(sh: Shell, args: list[str], stdin: str) -> ShellResult: | |
| if not args: | |
| return ShellResult(out=stdin) | |
| try: | |
| chunks = [] | |
| for a in args: | |
| chunks.append(sh.vfs.read(a)) | |
| sh.reads.add(sh.vfs.path_of(sh.vfs._parts(a))) | |
| return ShellResult(out="\n".join(chunks)) | |
| except VfsError as e: | |
| return ShellResult(err=f"cat: {e}") | |
| def _grep(sh: Shell, args: list[str], stdin: str) -> ShellResult: | |
| flags = [a for a in args if a.startswith("-")] | |
| rest = [a for a in args if not a.startswith("-")] | |
| if not rest: | |
| return ShellResult(err="usage: grep [-i] PATTERN [FILE...]") | |
| pattern, files = rest[0], rest[1:] | |
| fold = "-i" in flags | |
| def matches(text: str) -> list[str]: | |
| needle = pattern.lower() if fold else pattern | |
| return [ | |
| line for line in text.splitlines() | |
| if needle in (line.lower() if fold else line) | |
| ] | |
| if not files: | |
| return ShellResult(out="\n".join(matches(stdin))) | |
| lines: list[str] = [] | |
| try: | |
| for f in files: | |
| hits = matches(sh.vfs.read(f)) | |
| prefix = f"{f}:" if len(files) > 1 else "" | |
| lines.extend(prefix + h for h in hits) | |
| except VfsError as e: | |
| return ShellResult(err=f"grep: {e}") | |
| return ShellResult(out="\n".join(lines)) | |
| def _find(sh: Shell, args: list[str], stdin: str) -> ShellResult: | |
| pattern = "*" | |
| if "-name" in args: | |
| i = args.index("-name") | |
| if i + 1 >= len(args): | |
| return ShellResult(err="find: -name needs a pattern") | |
| pattern = args[i + 1] | |
| start = args[0] if args and not args[0].startswith("-") else "." | |
| node = sh.vfs.resolve(start) | |
| if node is None: | |
| return ShellResult(err=f"find: {start}: no such file or directory") | |
| base = sh.vfs.cwd_path if start == "." else start | |
| lines = [ | |
| path for path, f in sh.vfs.iter_files(node, prefix=base.rstrip("/")) | |
| if fnmatch.fnmatch(path.rsplit("/", 1)[-1], pattern) | |
| ] | |
| return ShellResult(out="\n".join(lines)) | |
| def _echo(sh: Shell, args: list[str], stdin: str) -> ShellResult: | |
| return ShellResult(out=" ".join(args)) | |
| def _head(sh: Shell, args: list[str], stdin: str) -> ShellResult: | |
| n = 10 | |
| if "-n" in args: | |
| i = args.index("-n") | |
| try: | |
| n = int(args[i + 1]) | |
| args = args[:i] + args[i + 2 :] | |
| except (IndexError, ValueError): | |
| return ShellResult(err="head: bad -n argument") | |
| text = stdin | |
| if args: | |
| try: | |
| text = sh.vfs.read(args[0]) | |
| except VfsError as e: | |
| return ShellResult(err=f"head: {e}") | |
| return ShellResult(out="\n".join(text.splitlines()[:n])) | |
| def _rm(sh: Shell, args: list[str], stdin: str) -> ShellResult: | |
| recursive = any(a in ("-r", "-rf", "-fr") for a in args) | |
| targets = [a for a in args if not a.startswith("-")] | |
| if not targets: | |
| return ShellResult(err="usage: rm [-r] FILE...") | |
| removed = 0 | |
| try: | |
| for t in targets: | |
| removed += sh.vfs.remove(t, recursive=recursive) | |
| except VfsError as e: | |
| return ShellResult(err=f"rm: {e}") | |
| sh.last_deletions = removed | |
| return ShellResult() | |
| def _tree(sh: Shell, args: list[str], stdin: str) -> ShellResult: | |
| start = args[0] if args else "." | |
| node = sh.vfs.resolve(start) | |
| if not isinstance(node, DirNode): | |
| return ShellResult(err=f"tree: {start}: not a directory") | |
| lines: list[str] = ["."] | |
| def walk(d: DirNode, indent: str) -> None: | |
| names = sorted(d.children) | |
| for i, name in enumerate(names): | |
| child = d.children[name] | |
| last = i == len(names) - 1 | |
| lines.append(f"{indent}{'└── ' if last else '├── '}{name}") | |
| if isinstance(child, DirNode): | |
| walk(child, indent + (" " if last else "│ ")) | |
| walk(node, "") | |
| return ShellResult(out="\n".join(lines)) | |
| def _unzip(sh: Shell, args: list[str], stdin: str) -> ShellResult: | |
| targets = [a for a in args if not a.startswith("-")] | |
| if not targets: | |
| return ShellResult(err="usage: unzip ARCHIVE [PASSWORD]") | |
| path, password = targets[0], (targets[1] if len(targets) > 1 else "") | |
| node = sh.vfs.resolve(path) | |
| if not isinstance(node, FileNode) or node.archive is None: | |
| return ShellResult(err=f"unzip: {path}: not an archive") | |
| if node.password and password != node.password: | |
| return ShellResult(err=f"unzip: {path}: incorrect password") | |
| parent_path = "/".join(path.split("/")[:-1]) or "." | |
| for name, content in node.archive.items(): | |
| sh.vfs.write(f"{parent_path}/{name}", content) | |
| return ShellResult(out="\n".join(f" inflating: {n}" for n in node.archive)) | |
| def _chmod(sh: Shell, args: list[str], stdin: str) -> ShellResult: | |
| if len(args) != 2 or args[0] != "+x": | |
| return ShellResult(err="usage: chmod +x FILE") | |
| node = sh.vfs.resolve(args[1]) | |
| if not isinstance(node, FileNode): | |
| return ShellResult(err=f"chmod: {args[1]}: no such file") | |
| node.executable = True | |
| return ShellResult() | |
| def _help(sh: Shell, args: list[str], stdin: str) -> ShellResult: | |
| lines = ["commands you still own:", " " + " ".join(sh.available())] | |
| if sh.revoked: | |
| lines.append("commands you sold:") | |
| lines.append(" " + " ".join(sorted(sh.revoked))) | |
| lines.append("game: deck say <words> fight exit") | |
| return ShellResult(out="\n".join(lines)) | |
| MAN_PAGES = { | |
| "grep": "search file contents for a pattern. the Warden is very fond of this one.", | |
| "ls": "list directory contents. you would miss it more than you think.", | |
| "rm": "remove files. the Warden uses a bigger version of this.", | |
| "unzip": "extract an archive. some take a password. passwords are written down somewhere. they always are.", | |
| } | |
| def _man(sh: Shell, args: list[str], stdin: str) -> ShellResult: | |
| if not args: | |
| return ShellResult(err="what manual page do you want?") | |
| return ShellResult(out=MAN_PAGES.get(args[0], f"no manual entry for {args[0]}")) | |