"""The fake shell: a command registry over the VFS. Commands are plain Python handlers — nothing is ever executed on the host. Each invocation is counted, because the Warden takes your *most-used* commands first. Revoked commands stay in the registry as gravestones that answer with a taunt. """ from __future__ import annotations import fnmatch import shlex from collections import Counter from dataclasses import dataclass, field from typing import Callable from .vfs import VFS, DirNode, FileNode, VfsError @dataclass class ShellResult: out: str = "" err: str = "" Handler = Callable[["Shell", list[str], str], ShellResult] @dataclass class Shell: vfs: VFS usage: Counter = field(default_factory=Counter) revoked: dict[str, str] = field(default_factory=dict) # name -> epitaph # Events the game layer reads after each command (e.g. files deleted). last_deletions: int = 0 # Canonical paths the player has cat'ed (puzzles check this). reads: set[str] = field(default_factory=set) def __post_init__(self) -> None: self.commands: dict[str, Handler] = { "pwd": _pwd, "ls": _ls, "cd": _cd, "cat": _cat, "grep": _grep, "find": _find, "echo": _echo, "head": _head, "rm": _rm, "tree": _tree, "unzip": _unzip, "chmod": _chmod, "help": _help, "man": _man, } # ------------------------------------------------------------ control def revoke(self, name: str, epitaph: str = "") -> None: if name in self.commands: del self.commands[name] self.revoked[name] = epitaph or "you traded it away. it is not coming back." def available(self) -> list[str]: return sorted(self.commands) def most_used(self, exclude: tuple[str, ...] = ("help", "man")) -> str | None: """The command the Warden wants: most-used, still owned, not trivial.""" for name, _ in self.usage.most_common(): if name in self.commands and name not in exclude: return name return None # ------------------------------------------------------------ running def run(self, line: str) -> ShellResult: self.last_deletions = 0 line = line.strip() if not line: return ShellResult() stages = [s.strip() for s in line.split("|")] stdin = "" result = ShellResult() for stage in stages: try: argv = shlex.split(stage) except ValueError as e: return ShellResult(err=f"parse error: {e}") if not argv: return ShellResult(err="empty pipeline stage") name, args = argv[0], argv[1:] if name in self.revoked: return ShellResult(err=f"{name}: command not found. ({self.revoked[name]})") handler = self.commands.get(name) if handler is None: return ShellResult(err=f"{name}: command not found") self.usage[name] += 1 result = handler(self, self._expand(args), stdin) if result.err: return result stdin = result.out return result def _expand(self, args: list[str]) -> list[str]: """Glob expansion against the current directory.""" out: list[str] = [] for arg in args: if any(c in arg for c in "*?[") and "/" not in arg: names = [n.name for n in self.vfs.listing(".", show_hidden=True)] matches = sorted(fnmatch.filter(names, arg)) out.extend(matches or [arg]) else: out.append(arg) return out # ------------------------------------------------------------------ commands def _pwd(sh: Shell, args: list[str], stdin: str) -> ShellResult: return ShellResult(out=sh.vfs.cwd_path) def _ls(sh: Shell, args: list[str], stdin: str) -> ShellResult: show_hidden = "-a" in args paths = [a for a in args if not a.startswith("-")] or ["."] lines: list[str] = [] try: for p in paths: for node in sh.vfs.listing(p, show_hidden=show_hidden): suffix = "/" if isinstance(node, DirNode) else "" lock = " [locked]" if isinstance(node, FileNode) and node.locked else "" lines.append(f"{node.name}{suffix}{lock}") except VfsError as e: return ShellResult(err=f"ls: {e}") return ShellResult(out="\n".join(lines)) def _cd(sh: Shell, args: list[str], stdin: str) -> ShellResult: try: sh.vfs.chdir(args[0] if args else "~") except VfsError as e: return ShellResult(err=f"cd: {e}") return ShellResult() def _cat(sh: Shell, args: list[str], stdin: str) -> ShellResult: if not args: return ShellResult(out=stdin) try: chunks = [] for a in args: chunks.append(sh.vfs.read(a)) sh.reads.add(sh.vfs.path_of(sh.vfs._parts(a))) return ShellResult(out="\n".join(chunks)) except VfsError as e: return ShellResult(err=f"cat: {e}") def _grep(sh: Shell, args: list[str], stdin: str) -> ShellResult: flags = [a for a in args if a.startswith("-")] rest = [a for a in args if not a.startswith("-")] if not rest: return ShellResult(err="usage: grep [-i] PATTERN [FILE...]") pattern, files = rest[0], rest[1:] fold = "-i" in flags def matches(text: str) -> list[str]: needle = pattern.lower() if fold else pattern return [ line for line in text.splitlines() if needle in (line.lower() if fold else line) ] if not files: return ShellResult(out="\n".join(matches(stdin))) lines: list[str] = [] try: for f in files: hits = matches(sh.vfs.read(f)) prefix = f"{f}:" if len(files) > 1 else "" lines.extend(prefix + h for h in hits) except VfsError as e: return ShellResult(err=f"grep: {e}") return ShellResult(out="\n".join(lines)) def _find(sh: Shell, args: list[str], stdin: str) -> ShellResult: pattern = "*" if "-name" in args: i = args.index("-name") if i + 1 >= len(args): return ShellResult(err="find: -name needs a pattern") pattern = args[i + 1] start = args[0] if args and not args[0].startswith("-") else "." node = sh.vfs.resolve(start) if node is None: return ShellResult(err=f"find: {start}: no such file or directory") base = sh.vfs.cwd_path if start == "." else start lines = [ path for path, f in sh.vfs.iter_files(node, prefix=base.rstrip("/")) if fnmatch.fnmatch(path.rsplit("/", 1)[-1], pattern) ] return ShellResult(out="\n".join(lines)) def _echo(sh: Shell, args: list[str], stdin: str) -> ShellResult: return ShellResult(out=" ".join(args)) def _head(sh: Shell, args: list[str], stdin: str) -> ShellResult: n = 10 if "-n" in args: i = args.index("-n") try: n = int(args[i + 1]) args = args[:i] + args[i + 2 :] except (IndexError, ValueError): return ShellResult(err="head: bad -n argument") text = stdin if args: try: text = sh.vfs.read(args[0]) except VfsError as e: return ShellResult(err=f"head: {e}") return ShellResult(out="\n".join(text.splitlines()[:n])) def _rm(sh: Shell, args: list[str], stdin: str) -> ShellResult: recursive = any(a in ("-r", "-rf", "-fr") for a in args) targets = [a for a in args if not a.startswith("-")] if not targets: return ShellResult(err="usage: rm [-r] FILE...") removed = 0 try: for t in targets: removed += sh.vfs.remove(t, recursive=recursive) except VfsError as e: return ShellResult(err=f"rm: {e}") sh.last_deletions = removed return ShellResult() def _tree(sh: Shell, args: list[str], stdin: str) -> ShellResult: start = args[0] if args else "." node = sh.vfs.resolve(start) if not isinstance(node, DirNode): return ShellResult(err=f"tree: {start}: not a directory") lines: list[str] = ["."] def walk(d: DirNode, indent: str) -> None: names = sorted(d.children) for i, name in enumerate(names): child = d.children[name] last = i == len(names) - 1 lines.append(f"{indent}{'└── ' if last else '├── '}{name}") if isinstance(child, DirNode): walk(child, indent + (" " if last else "│ ")) walk(node, "") return ShellResult(out="\n".join(lines)) def _unzip(sh: Shell, args: list[str], stdin: str) -> ShellResult: targets = [a for a in args if not a.startswith("-")] if not targets: return ShellResult(err="usage: unzip ARCHIVE [PASSWORD]") path, password = targets[0], (targets[1] if len(targets) > 1 else "") node = sh.vfs.resolve(path) if not isinstance(node, FileNode) or node.archive is None: return ShellResult(err=f"unzip: {path}: not an archive") if node.password and password != node.password: return ShellResult(err=f"unzip: {path}: incorrect password") parent_path = "/".join(path.split("/")[:-1]) or "." for name, content in node.archive.items(): sh.vfs.write(f"{parent_path}/{name}", content) return ShellResult(out="\n".join(f" inflating: {n}" for n in node.archive)) def _chmod(sh: Shell, args: list[str], stdin: str) -> ShellResult: if len(args) != 2 or args[0] != "+x": return ShellResult(err="usage: chmod +x FILE") node = sh.vfs.resolve(args[1]) if not isinstance(node, FileNode): return ShellResult(err=f"chmod: {args[1]}: no such file") node.executable = True return ShellResult() def _help(sh: Shell, args: list[str], stdin: str) -> ShellResult: lines = ["commands you still own:", " " + " ".join(sh.available())] if sh.revoked: lines.append("commands you sold:") lines.append(" " + " ".join(sorted(sh.revoked))) lines.append("game: deck say fight exit") return ShellResult(out="\n".join(lines)) MAN_PAGES = { "grep": "search file contents for a pattern. the Warden is very fond of this one.", "ls": "list directory contents. you would miss it more than you think.", "rm": "remove files. the Warden uses a bigger version of this.", "unzip": "extract an archive. some take a password. passwords are written down somewhere. they always are.", } def _man(sh: Shell, args: list[str], stdin: str) -> ShellResult: if not args: return ShellResult(err="what manual page do you want?") return ShellResult(out=MAN_PAGES.get(args[0], f"no manual entry for {args[0]}"))