File size: 10,798 Bytes
9fca766
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
"""The fake shell: a command registry over the VFS.

Commands are plain Python handlers — nothing is ever executed on the host.
Each invocation is counted, because the Warden takes your *most-used*
commands first. Revoked commands are removed from the registry but leave a
gravestone behind in ``Shell.revoked`` that answers with a taunt.
"""

from __future__ import annotations

import fnmatch
import shlex
from collections import Counter
from dataclasses import dataclass, field
from typing import Callable

from .vfs import VFS, DirNode, FileNode, VfsError


@dataclass
class ShellResult:
    """Outcome of a single command (or of a whole pipeline).

    A non-empty ``err`` is how handlers in this module signal failure;
    ``Shell.run`` stops the pipeline and returns the result as soon as it
    sees one.
    """

    # Text the command produced; Shell.run feeds it to the next stage as stdin.
    out: str = ""
    # Error message; empty when the command succeeded.
    err: str = ""


# Signature shared by every command handler:
#   (shell, arguments after glob expansion, stdin text from the previous stage) -> result
Handler = Callable[["Shell", list[str], str], ShellResult]


@dataclass
class Shell:
    """Command registry and pipeline runner over a virtual filesystem.

    Handlers are plain Python callables looked up by name in ``commands``.
    Every dispatched invocation is tallied in ``usage``; revoked commands are
    moved out of ``commands`` and into ``revoked`` together with an epitaph.
    """

    vfs: VFS
    # Number of times each command name has been dispatched by run().
    usage: Counter = field(default_factory=Counter)
    revoked: dict[str, str] = field(default_factory=dict)  # name -> epitaph
    # Events the game layer reads after each command (e.g. files deleted).
    last_deletions: int = 0
    # Canonical paths the player has cat'ed (puzzles check this).
    reads: set[str] = field(default_factory=set)

    def __post_init__(self) -> None:
        """Install the built-in command table."""
        self.commands: dict[str, Handler] = {
            "pwd": _pwd, "ls": _ls, "cd": _cd, "cat": _cat, "grep": _grep,
            "find": _find, "echo": _echo, "head": _head, "rm": _rm,
            "tree": _tree, "unzip": _unzip, "chmod": _chmod,
            "help": _help, "man": _man,
        }

    # ------------------------------------------------------------ control

    def revoke(self, name: str, epitaph: str = "") -> None:
        """Take command *name* away and remember it with *epitaph*.

        Does nothing when *name* is not currently registered. An empty
        *epitaph* is replaced by a default taunt.
        """
        if name in self.commands:
            del self.commands[name]
            self.revoked[name] = epitaph or "you traded it away. it is not coming back."

    def available(self) -> list[str]:
        """Return the names of the commands still registered, sorted."""
        return sorted(self.commands)

    def most_used(self, exclude: tuple[str, ...] = ("help", "man")) -> str | None:
        """The command the Warden wants: most-used, still owned, not trivial.

        Returns ``None`` when no counted command is both still registered and
        outside *exclude*.
        """
        for name, _ in self.usage.most_common():
            if name in self.commands and name not in exclude:
                return name
        return None

    # ------------------------------------------------------------ running

    @staticmethod
    def _split_stages(line: str) -> list[str]:
        """Split *line* into pipeline stages on unquoted, unescaped ``|``.

        A ``|`` inside single or double quotes, or directly after a backslash,
        belongs to the stage's text instead of separating stages. Quote and
        backslash characters are kept so the stage can still be tokenised by
        ``shlex.split``. Each stage is returned stripped; an unterminated
        quote simply swallows the rest of the line and is reported later by
        ``shlex.split``.
        """
        stages: list[str] = []
        current: list[str] = []
        quote = ""  # the quote character currently open, "" when outside quotes
        escaped = False
        for ch in line:
            if escaped:
                # Character following a backslash is never special.
                current.append(ch)
                escaped = False
            elif ch == "\\" and quote != "'":
                # Backslash is literal inside single quotes only.
                current.append(ch)
                escaped = True
            elif quote:
                current.append(ch)
                if ch == quote:
                    quote = ""
            elif ch in "'\"":
                current.append(ch)
                quote = ch
            elif ch == "|":
                stages.append("".join(current).strip())
                current = []
            else:
                current.append(ch)
        stages.append("".join(current).strip())
        return stages

    def run(self, line: str) -> ShellResult:
        """Execute *line*, which may be a ``|``-separated pipeline.

        ``last_deletions`` is reset first. Each stage is tokenised with
        ``shlex.split``, glob-expanded, counted in ``usage`` and dispatched;
        its ``out`` becomes the next stage's stdin. The pipeline stops at the
        first parse error, empty stage, unknown or revoked command, or stage
        that reports ``err``. A blank line yields an empty result.
        """
        self.last_deletions = 0
        line = line.strip()
        if not line:
            return ShellResult()
        stdin = ""
        result = ShellResult()
        # Quote-aware split: a naive line.split("|") would cut through
        # arguments such as "a | b" and turn them into parse errors.
        for stage in self._split_stages(line):
            try:
                argv = shlex.split(stage)
            except ValueError as e:
                return ShellResult(err=f"parse error: {e}")
            if not argv:
                return ShellResult(err="empty pipeline stage")
            name, args = argv[0], argv[1:]
            if name in self.revoked:
                return ShellResult(err=f"{name}: command not found. ({self.revoked[name]})")
            handler = self.commands.get(name)
            if handler is None:
                return ShellResult(err=f"{name}: command not found")
            # Counted before the handler runs, so failing invocations count too.
            self.usage[name] += 1
            result = handler(self, self._expand(args), stdin)
            if result.err:
                return result
            stdin = result.out
        return result

    def _expand(self, args: list[str]) -> list[str]:
        """Glob expansion against the current directory.

        Only arguments containing ``*``, ``?`` or ``[`` and no ``/`` are
        treated as patterns. A pattern with no match is passed through
        unchanged. Quoting does not protect a pattern here, because
        ``shlex.split`` has already removed the quotes.
        """
        out: list[str] = []
        names: list[str] | None = None  # current-directory listing, fetched at most once
        for arg in args:
            if any(c in arg for c in "*?[") and "/" not in arg:
                if names is None:
                    names = [n.name for n in self.vfs.listing(".", show_hidden=True)]
                matches = sorted(fnmatch.filter(names, arg))
                out.extend(matches or [arg])
            else:
                out.append(arg)
        return out


# ------------------------------------------------------------------ commands

def _pwd(sh: Shell, args: list[str], stdin: str) -> ShellResult:
    """Report the VFS's current working directory; *args* and *stdin* are ignored."""
    here = sh.vfs.cwd_path
    return ShellResult(out=here)


def _ls(sh: Shell, args: list[str], stdin: str) -> ShellResult:
    """List the entries of each given path (default: the current directory).

    Hidden entries are included when any short-option cluster contains ``a``
    (``-a``, ``-la``, ``-al`` ...). Other flags are accepted and ignored.
    Directories are suffixed with ``/`` and locked files with `` [locked]``.
    Any ``VfsError`` from the listing is reported as ``ls: <message>``.
    """
    # Accept combined short flags: previously only the exact token "-a"
    # worked and "ls -la" silently hid dotfiles.
    show_hidden = any(
        a.startswith("-") and not a.startswith("--") and "a" in a[1:]
        for a in args
    )
    paths = [a for a in args if not a.startswith("-")] or ["."]
    lines: list[str] = []
    try:
        for p in paths:
            for node in sh.vfs.listing(p, show_hidden=show_hidden):
                suffix = "/" if isinstance(node, DirNode) else ""
                lock = " [locked]" if isinstance(node, FileNode) and node.locked else ""
                lines.append(f"{node.name}{suffix}{lock}")
    except VfsError as e:
        return ShellResult(err=f"ls: {e}")
    return ShellResult(out="\n".join(lines))


def _cd(sh: Shell, args: list[str], stdin: str) -> ShellResult:
    """Change the VFS working directory to the first argument, or ``~`` if none is given."""
    destination = args[0] if args else "~"
    try:
        sh.vfs.chdir(destination)
    except VfsError as exc:
        return ShellResult(err=f"cd: {exc}")
    else:
        return ShellResult()


def _cat(sh: Shell, args: list[str], stdin: str) -> ShellResult:
    """Concatenate the named files, newline-joined; echo *stdin* when no file is named.

    Each file that is read successfully has its canonical path recorded in
    ``sh.reads``. The first ``VfsError`` aborts with ``cat: <message>``.
    """
    if not args:
        return ShellResult(out=stdin)
    vfs = sh.vfs
    contents: list[str] = []
    try:
        for target in args:
            # Read first: a failing read must not be recorded as seen.
            contents.append(vfs.read(target))
            sh.reads.add(vfs.path_of(vfs._parts(target)))
    except VfsError as exc:
        return ShellResult(err=f"cat: {exc}")
    return ShellResult(out="\n".join(contents))


def _grep(sh: Shell, args: list[str], stdin: str) -> ShellResult:
    """Print the lines containing PATTERN as a plain substring.

    Arguments starting with ``-`` are flags; only ``-i`` (case-insensitive)
    has an effect. The first remaining argument is the pattern, the rest are
    files. With no files *stdin* is searched. With more than one file each
    hit is prefixed with ``<file>:``.
    """
    options: list[str] = []
    positional: list[str] = []
    for a in args:
        (options if a.startswith("-") else positional).append(a)
    if not positional:
        return ShellResult(err="usage: grep [-i] PATTERN [FILE...]")
    pattern, *files = positional
    ignore_case = "-i" in options
    needle = pattern.lower() if ignore_case else pattern

    def select(text: str) -> list[str]:
        """Return the lines of *text* that contain the needle."""
        picked: list[str] = []
        for row in text.splitlines():
            haystack = row.lower() if ignore_case else row
            if needle in haystack:
                picked.append(row)
        return picked

    if not files:
        return ShellResult(out="\n".join(select(stdin)))

    label_hits = len(files) > 1
    found: list[str] = []
    try:
        for name in files:
            tag = f"{name}:" if label_hits else ""
            for hit in select(sh.vfs.read(name)):
                found.append(tag + hit)
    except VfsError as exc:
        return ShellResult(err=f"grep: {exc}")
    return ShellResult(out="\n".join(found))


def _find(sh: Shell, args: list[str], stdin: str) -> ShellResult:
    """List file paths under a start point whose basename matches ``-name PATTERN``.

    The start point is the first argument unless that is a flag, in which
    case it is ``.``. The pattern defaults to ``*``. Paths are printed with
    the start point (or the current directory path for ``.``) as prefix.
    """
    pattern = "*"
    if "-name" in args:
        value_at = args.index("-name") + 1
        if value_at >= len(args):
            return ShellResult(err="find: -name needs a pattern")
        pattern = args[value_at]

    start = "."
    if args and not args[0].startswith("-"):
        start = args[0]

    root = sh.vfs.resolve(start)
    if root is None:
        return ShellResult(err=f"find: {start}: no such file or directory")

    shown_root = sh.vfs.cwd_path if start == "." else start
    hits: list[str] = []
    for path, _node in sh.vfs.iter_files(root, prefix=shown_root.rstrip("/")):
        basename = path.rsplit("/", 1)[-1]
        if fnmatch.fnmatch(basename, pattern):
            hits.append(path)
    return ShellResult(out="\n".join(hits))


def _echo(sh: Shell, args: list[str], stdin: str) -> ShellResult:
    """Emit the arguments separated by single spaces; *stdin* is ignored."""
    joined = " ".join(args)
    return ShellResult(out=joined)


def _head(sh: Shell, args: list[str], stdin: str) -> ShellResult:
    """Show the first N lines (default 10) of the first named file, or of *stdin*.

    ``-n N`` sets the count; a missing or non-integer N yields
    ``head: bad -n argument``. Only the first file argument is read.
    """
    count = 10
    if "-n" in args:
        flag_at = args.index("-n")
        if flag_at + 1 >= len(args):
            return ShellResult(err="head: bad -n argument")
        try:
            count = int(args[flag_at + 1])
        except ValueError:
            return ShellResult(err="head: bad -n argument")
        # Drop the flag and its value so only file names remain.
        args = args[:flag_at] + args[flag_at + 2 :]

    if args:
        try:
            text = sh.vfs.read(args[0])
        except VfsError as exc:
            return ShellResult(err=f"head: {exc}")
    else:
        text = stdin

    shown = text.splitlines()[:count]
    return ShellResult(out="\n".join(shown))


def _rm(sh: Shell, args: list[str], stdin: str) -> ShellResult:
    """Remove each target from the VFS and record how many entries went away.

    Removal is recursive when any short-option cluster contains ``r`` or
    ``R`` (``-r``, ``-R``, ``-rf``, ``-fr``, ``-Rf`` ...). Other flags are
    ignored. ``sh.last_deletions`` is set to the sum of the values returned
    by ``vfs.remove`` -- also when a later target fails, so deletions that
    already happened are not hidden from the game layer. The first
    ``VfsError`` aborts with ``rm: <message>``.
    """
    recursive = any(
        a.startswith("-") and not a.startswith("--") and ("r" in a or "R" in a)
        for a in args
    )
    targets = [a for a in args if not a.startswith("-")]
    if not targets:
        return ShellResult(err="usage: rm [-r] FILE...")
    removed = 0
    try:
        for t in targets:
            removed += sh.vfs.remove(t, recursive=recursive)
    except VfsError as e:
        return ShellResult(err=f"rm: {e}")
    finally:
        # Runs on success and on error: earlier targets stay deleted even if
        # a later one fails, so the count must be published either way.
        sh.last_deletions = removed
    return ShellResult()


def _tree(sh: Shell, args: list[str], stdin: str) -> ShellResult:
    """Draw the directory tree below the first argument (default ``.``).

    The root is always printed as ``.``. Children are listed in sorted name
    order with box-drawing connectors, and sub-directories are descended
    into recursively. Fails when the start point is not a directory.
    """
    start = args[0] if args else "."
    root = sh.vfs.resolve(start)
    if not isinstance(root, DirNode):
        return ShellResult(err=f"tree: {start}: not a directory")

    rendered: list[str] = ["."]

    def render(directory: DirNode, prefix: str) -> None:
        """Append one line per child of *directory*, depth-first."""
        ordered = sorted(directory.children)
        final = len(ordered) - 1
        for pos, name in enumerate(ordered):
            is_last = pos == final
            branch = "└── " if is_last else "├── "
            rendered.append(f"{prefix}{branch}{name}")
            entry = directory.children[name]
            if isinstance(entry, DirNode):
                # Below the last child the vertical rail stops.
                render(entry, prefix + ("    " if is_last else "│   "))

    render(root, "")
    return ShellResult(out="\n".join(rendered))


def _unzip(sh: Shell, args: list[str], stdin: str) -> ShellResult:
    """Extract an archive's members next to the archive file.

    Usage is ``unzip ARCHIVE [PASSWORD]``; arguments starting with ``-`` are
    ignored. The target must resolve to a ``FileNode`` whose ``archive`` is
    not ``None``. When the node has a password, the supplied one must match.
    Each member is written into the directory containing the archive, and
    one ``inflating`` line per member is returned.
    """
    targets = [a for a in args if not a.startswith("-")]
    if not targets:
        return ShellResult(err="usage: unzip ARCHIVE [PASSWORD]")
    path, password = targets[0], (targets[1] if len(targets) > 1 else "")
    node = sh.vfs.resolve(path)
    if not isinstance(node, FileNode) or node.archive is None:
        return ShellResult(err=f"unzip: {path}: not an archive")
    if node.password and password != node.password:
        return ShellResult(err=f"unzip: {path}: incorrect password")
    # Directory part of the archive path, with trailing slash. A bare name
    # extracts into "./"; "/a.zip" must extract into "/" (an empty head used
    # to fall back to "." and extract into the current directory instead).
    head, sep, _ = path.rpartition("/")
    dest_dir = f"{head}/" if sep else "./"
    try:
        for name, content in node.archive.items():
            sh.vfs.write(f"{dest_dir}{name}", content)
    except VfsError as e:
        # NOTE(review): assumes vfs.write reports failure with VfsError like
        # the other VFS calls handled in this module -- confirm.
        return ShellResult(err=f"unzip: {e}")
    return ShellResult(out="\n".join(f"  inflating: {n}" for n in node.archive))


def _chmod(sh: Shell, args: list[str], stdin: str) -> ShellResult:
    """Mark a file executable; the only accepted form is ``chmod +x FILE``."""
    well_formed = len(args) == 2 and args[0] == "+x"
    if not well_formed:
        return ShellResult(err="usage: chmod +x FILE")
    target = args[1]
    node = sh.vfs.resolve(target)
    if isinstance(node, FileNode):
        node.executable = True
        return ShellResult()
    return ShellResult(err=f"chmod: {target}: no such file")


def _help(sh: Shell, args: list[str], stdin: str) -> ShellResult:
    """Summarise the commands still owned, those revoked (if any), and the game verbs."""
    owned = "  ".join(sh.available())
    report = ["commands you still own:", "  " + owned]
    if sh.revoked:
        sold = "  ".join(sorted(sh.revoked))
        report += ["commands you sold:", "  " + sold]
    report.append("game:  deck  say <words>  fight  exit")
    return ShellResult(out="\n".join(report))


# Flavour text returned by the `man` command, keyed by command name.
# Names without an entry get a "no manual entry" reply from _man.
MAN_PAGES = {
    "grep": "search file contents for a pattern. the Warden is very fond of this one.",
    "ls": "list directory contents. you would miss it more than you think.",
    "rm": "remove files. the Warden uses a bigger version of this.",
    "unzip": "extract an archive. some take a password. passwords are written down somewhere. they always are.",
}


def _man(sh: Shell, args: list[str], stdin: str) -> ShellResult:
    """Return the manual text for the first argument, or a 'no manual entry' line.

    Only a missing argument is reported as an error; an unknown topic is
    answered on ``out``.
    """
    if not args:
        return ShellResult(err="what manual page do you want?")
    topic = args[0]
    page = MAN_PAGES.get(topic)
    if page is None:
        page = f"no manual entry for {topic}"
    return ShellResult(out=page)