File size: 12,612 Bytes
9fca766
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
"""The sandbox terminal screen: explore, solve, prepare — then fight.

The Warden is resident here: it watches every command through the
deterministic watcher, interjects as wall(1)-style broadcasts, and can be
addressed directly with `say`. Dismisses with the Rewards earned.
"""

from __future__ import annotations

import random
import time

from rich.text import Text
from textual import events
from textual.app import ComposeResult
from textual.containers import Vertical
from textual.screen import Screen
from textual.widgets import Input, RichLog, Static

from scrypt.sandbox.puzzles import Puzzle, Reward
from scrypt.sandbox.shell import Shell
from scrypt.sandbox.vfs import DirNode, VfsError
from scrypt.ui import fx as fxmod
from scrypt.ui import palette as pal
from scrypt.warden import watcher
from scrypt.warden.guardrails import wrap_player_text
from scrypt.warden.presence import ANSWER, WardenPresence

# Default banner written to the terminal log when the screen mounts and the
# caller supplied no motd of its own. The dash on the first line is a real
# em dash (U+2014); it had been stored as mis-decoded UTF-8 residue.
MOTD = """\
scryptOS 0.1 — restricted session
this machine is mine. the files were yours. type `help` to see
what you can still do, `say <words>` if you must talk to me,
and `fight` when you are done stalling.
"""

# Seconds without a submitted line after which the idle check files an idle
# notice with the Warden presence. The check itself is polled on a 5 s
# interval, and only while a presence is attached.
IDLE_AFTER_S = 40.0

# Printed once, the very first time a new player lands in the shell
# (written right after the banner when the screen is built with guide=True).
FIRST_VISIT_GUIDE = """\
─── first time in here? ───
  ls           see what is lying around    cat <file>    read it
  cd <dir>     move around                 grep <w> <f>  search inside
files that look out of place usually are. solving what you find pays
cycles and cards before the next fight. `help` lists what you still own.
[tab] completes, [↑/↓] recall history, [ctrl+l] clears the screen.
"""

# Game verbs the shell layer owns; completed alongside real commands.
# They are handled by the screen itself and never reach Shell.run.
GAME_COMMANDS = ("continue", "deck", "exit", "fight", "say")


def _common_prefix(names: list[str]) -> str:
    if not names:
        return ""
    lo, hi = min(names), max(names)
    i = 0
    while i < len(lo) and lo[i] == hi[i]:
        i += 1
    return lo[:i]


class ShellPrompt(Input):
    """Input widget with terminal manners.

    ↑/↓ walk the command history, tab completes and ctrl+l wipes the log.
    Those keys are stopped here, before Textual's focus-switching and cursor
    bindings get a chance to act on them. Every other key passes through
    untouched, and nothing is intercepted unless the prompt lives on a
    ShellScreen.
    """

    async def on_key(self, event: events.Key) -> None:
        """Route the terminal keys to the owning ShellScreen."""
        owner = self.screen
        if not isinstance(owner, ShellScreen):
            return
        pressed = event.key
        if pressed not in ("up", "down", "tab", "ctrl+l"):
            return
        # Swallow the key first so no default binding fires alongside ours.
        event.stop()
        event.prevent_default()
        if pressed == "tab":
            owner.complete()
        elif pressed == "ctrl+l":
            owner.query_one("#term", RichLog).clear()
        else:
            owner.history_step(-1 if pressed == "up" else 1)


class ShellScreen(Screen):
    """The sandbox terminal screen.

    Lays out a scrolling log (#term), a one-line status bar (#shell-status)
    and a prompt (#shell-prompt). Submitted lines are either game verbs
    handled here (fight/exit/continue, deck, say) or are passed to the
    Shell; after each shell command the puzzles are polled and any Reward
    they hand back is collected. The screen dismisses with the list of
    rewards gathered during the visit.
    """

    CSS = """
    #term { height: 1fr; background: $surface; padding: 0 1; }
    #shell-prompt { dock: bottom; }
    #shell-status { height: 1; dock: bottom; background: $panel; padding: 0 1; }
    """

    def __init__(
        self,
        shell: Shell,
        puzzles: list[Puzzle],
        deck_names: list[str],
        motd: str | None = None,
        presence: WardenPresence | None = None,
        track: Text | None = None,
        guide: bool = False,
    ):
        """Prepare the state for one shell visit.

        Args:
            shell: runs every line that is not a game verb.
            puzzles: polled against the shell after each shell command.
            deck_names: card names listed by the `deck` verb.
            motd: banner written on mount; a falsy value selects MOTD.
            presence: the Warden channel, or None to run without one.
            track: where this shell sits in the run; appended to the
                status bar when given.
            guide: True for the first shell of a player's first game ever;
                the first-visit guide is then printed after the banner.
        """
        super().__init__()

        # Collaborators and presentation handed in by the caller.
        self.shell, self.puzzles, self.deck_names = shell, puzzles, deck_names
        self.motd = motd if motd else MOTD
        self.presence, self.track, self.guide = presence, track, guide

        # Dismiss payload: rewards earned during this visit.
        self.rewards: list[Reward] = []

        # `say` allowance, rolled fresh for every shell visit: some nights
        # it tolerates you.
        self._say_budget = random.randint(2, 7)
        self._say_spent = 0

        # Idle tracking; one ambient nudge per visit is plenty.
        self._last_input = time.monotonic()
        self._idled = False

        # Command history. _hist_pos is None while at the live prompt;
        # _draft keeps what was typed before browsing history.
        self._history: list[str] = []
        self._hist_pos: int | None = None
        self._draft = ""

    def compose(self) -> ComposeResult:
        """Build the column: log on top, status line and prompt docked below."""
        with Vertical():
            yield from (
                RichLog(id="term", wrap=True, markup=False),
                Static(id="shell-status"),
                ShellPrompt(placeholder="$", id="shell-prompt"),
            )

    def on_mount(self) -> None:
        """Print the banner, focus the prompt and hook up the Warden.

        The first-visit guide follows the banner only when `guide` is set.
        Without a presence there is nothing to attach and no idle polling.
        """
        term = self.query_one("#term", RichLog)
        term.write(self.motd)
        if self.guide:
            term.write(Text(FIRST_VISIT_GUIDE, style=pal.MUTED))
        self._update_status()
        self.query_one("#shell-prompt", Input).focus()
        if self.presence is None:
            return
        # Broadcasts land in this terminal; idleness is polled every 5 s.
        self.presence.attach(self._broadcast)
        self.set_interval(5.0, self._check_idle)

    def on_unmount(self) -> None:
        """Unhook the broadcast callback registered in on_mount, if any."""
        presence = self.presence
        if presence is None:
            return
        presence.detach(self._broadcast)

    # -------------------------------------------------- the Warden's mouth

    def _broadcast(self, line: str, priority: int) -> None:
        """Write one Warden message to the log, wall(1)-style.

        Everything before the last newline of `line` is treated as a leaked
        reasoning block and printed as a dim comment above the spoken part;
        the text after the last newline is what the Warden says. `priority`
        is accepted but not used here.
        """
        term = self.query_one("#term", RichLog)
        banner = Text(
            "\nBroadcast message from warden@scryptos (tty1):",
            style=f"bold {pal.WARDEN}",
        )
        term.write(banner)
        aside, _, spoken = line.rpartition("\n")
        if aside:
            term.write(Text(f"  {aside}", style=pal.GHOST))
        term.write(Text(f"  {spoken}\n", style=f"{pal.CYAN_SOFT} italic"))
        if fxmod.reduced_motion():
            return
        # wall(1) seizes the terminal: the background flips to BG_DEEP and
        # is restored by a timer 0.15 s later.
        self.styles.background = pal.BG_DEEP
        self.set_timer(0.15, self._unflash)

    def _unflash(self) -> None:
        """Clear the background override set by the broadcast flash."""
        self.styles.background = None

    def _check_idle(self) -> None:
        """Submit a single idle notice once the player has gone quiet.

        Does nothing without a presence, or after the one notice for this
        visit has already been sent. Quiet time is measured from the last
        submitted line.
        """
        if self.presence is None or self._idled:
            return
        quiet_for = time.monotonic() - self._last_input
        if quiet_for < IDLE_AFTER_S:
            return
        self._idled = True
        notice = watcher.idle_notice(quiet_for)
        self.presence.submit(
            notice.moment,
            fallback=notice.fallback,
            priority=notice.priority,
            tags=set(notice.tags),
        )

    async def _burn_in(self, text: str) -> None:
        """Write `text` to the log one dim line at a time, 60 ms apart."""
        from asyncio import sleep

        term = self.query_one("#term", RichLog)
        for row in text.splitlines():
            term.write(Text(row, style=pal.FG_DIM))
            await sleep(0.06)

    def _handle_say(self, words: str) -> None:
        """Pass the player's words to the Warden, or brush them off.

        With no presence, or once the per-visit `say` budget is spent, a
        canned brush-off is broadcast instead of a real answer.
        """
        term = self.query_one("#term", RichLog)
        may_answer = self.presence is not None and self._say_spent < self._say_budget
        if not may_answer:
            # NOTE(review): _say_spent does not advance on this path, so the
            # same brush-off repeats for the rest of the visit - confirm
            # that is intended.
            brush_offs = watcher.BRUSH_OFFS
            self._broadcast(brush_offs[self._say_spent % len(brush_offs)], ANSWER)
            return
        self._say_spent += 1
        term.write(Text("(the machine is listening)", style=f"{pal.MUTED} italic"))
        moment = watcher.say_moment(wrap_player_text(words))
        self.presence.submit(
            moment,
            fallback="I heard you. The scale is my answer to everything.",
            priority=ANSWER,
            tags={"shell", "player"},
            taboo=words,  # the reply must never parrot the input back
        )

    # ----------------------------------------------- terminal conveniences

    def history_step(self, delta: int) -> None:
        """Move through past commands: -1 is ↑ (older), +1 is ↓ (newer).

        Leaving the live prompt stashes what was typed as the draft;
        stepping down past the newest entry restores it. Stepping up stops
        at the oldest entry. The cursor always ends up at the end of the
        line.
        """
        past = self._history
        if not past:
            return
        prompt = self.query_one("#shell-prompt", Input)
        pos = self._hist_pos
        if pos is None:
            if delta > 0:
                return  # already at the live prompt; nothing newer exists
            self._draft = prompt.value
            pos = len(past)
        pos = max(0, pos + delta)
        if pos < len(past):
            self._hist_pos = pos
            prompt.value = past[pos]
        else:
            self._hist_pos = None
            prompt.value = self._draft
        prompt.cursor_position = len(prompt.value)

    def _completions(self, token: str, first_word: bool) -> list[str]:
        """List the ways `token` could be completed.

        For the first word the candidates are the commands the shell still
        has available plus the game verbs, each followed by a space (sold
        commands are gone, so they do not complete). For later words they
        are VFS entries under the directory part of the token: directories
        end in "/", files in a space. Hidden entries are listed only when
        the typed name already starts with a dot. A path the VFS rejects
        yields no candidates.
        """
        if first_word:
            verbs = set(self.shell.available()).union(GAME_COMMANDS)
            return [f"{verb} " for verb in sorted(verbs) if verb.startswith(token)]

        dirpart, slash, prefix = token.rpartition("/")
        if slash:
            base = dirpart if dirpart else "/"
        else:
            base = "."
        try:
            nodes = self.shell.vfs.listing(base, show_hidden=prefix.startswith("."))
        except VfsError:
            return []
        return [
            f"{dirpart}{slash}{node.name}{'/' if isinstance(node, DirNode) else ' '}"
            for node in nodes
            if node.name.startswith(prefix)
        ]

    def complete(self) -> None:
        """Tab: extend to the common prefix; show candidates when stuck.

        Only the text left of the cursor is completed; whatever sits to the
        right of it is kept and re-attached after the completed token. A
        single candidate is inserted whole (with its trailing space or
        slash). Several candidates extend the token to their common prefix,
        and when that makes no progress their names are listed in the log
        instead, like bash. No candidates leaves the prompt untouched.
        """
        prompt = self.query_one("#shell-prompt", Input)
        cursor = prompt.cursor_position
        value = prompt.value[:cursor]
        head, sep, token = value.rpartition(" ")
        # The token is the command word when nothing but whitespace precedes
        # it. Looking at `sep` alone would file " ls" (leading space) under
        # path arguments, even though the submitted line is stripped before
        # it is run.
        matches = self._completions(token, first_word=not head.strip())
        if not matches:
            return
        if len(matches) == 1:
            completed = matches[0]
        else:
            completed = _common_prefix([m.rstrip(" ") for m in matches])
            if completed == token:  # no progress: list the options, like bash
                names = []
                for m in matches:
                    is_dir = m.endswith("/")
                    stem = m.rstrip("/ ")
                    # Show the bare entry name, keeping the directory mark.
                    names.append(stem.rpartition("/")[2] + ("/" if is_dir else ""))
                self.query_one("#term", RichLog).write(
                    Text("  ".join(sorted(set(names))), style=pal.MUTED)
                )
                return
        prompt.value = head + sep + completed + prompt.value[cursor:]
        prompt.cursor_position = len(head + sep + completed)

    def _update_status(self) -> None:
        """Refresh the status bar.

        Shows the working directory and the number of commands still owned,
        the number sold when there are any, and the run track when one was
        given.
        """
        owned = len(self.shell.available())
        sold = len(self.shell.revoked)
        summary = f"{self.shell.vfs.cwd_path}   commands {owned}"
        if sold:
            summary += f" (sold {sold})"
        status = Text(summary)
        if self.track is not None:
            status.append("   ")
            status.append_text(self.track)
        self.query_one("#shell-status", Static).update(status)

    def on_input_submitted(self, event: Input.Submitted) -> None:
        """Handle one submitted line.

        The line is stripped, echoed and recorded in history (blank lines
        and immediate repeats are not recorded). Game verbs are handled
        here: fight/exit/continue dismiss the screen with the rewards, deck
        lists the deck, say talks to the Warden. Anything else runs in the
        shell. Afterwards the puzzles are polled, the Warden is told what
        it saw unless a puzzle was just solved, and the status bar is
        refreshed.
        """
        entered = event.value.strip()
        event.input.value = ""
        self._last_input = time.monotonic()
        # Any submit returns to the live prompt and forgets the draft.
        self._hist_pos = None
        self._draft = ""
        if entered:
            is_repeat = bool(self._history) and self._history[-1] == entered
            if not is_repeat:
                self._history.append(entered)
        term = self.query_one("#term", RichLog)
        term.write(Text(f"$ {entered}", style="bold"))
        if not entered:
            return

        verb = entered.split()[0]
        if verb in ("fight", "exit", "continue"):
            self.dismiss(self.rewards)
            return
        if verb == "deck":
            term.write("your deck: " + ", ".join(self.deck_names))
            return
        if verb == "say":
            words = entered[len("say"):].strip()
            self._handle_say(words if words else "...")
            return

        result = self.shell.run(entered)
        if result.err:
            term.write(Text(result.err, style=pal.DANGER))
        elif result.out:
            burn = "CORE DUMP" in result.out and not fxmod.reduced_motion()
            if burn:
                # Your own death record burns in line by line.
                self.run_worker(self._burn_in(result.out))
            else:
                term.write(result.out)
        if self.shell.last_deletions:
            unlink_note = Text(
                f"({self.shell.last_deletions} file(s) unlinked. they are not coming back.)",
                style=f"{pal.MUTED} italic",
            )
            term.write(unlink_note)

        rewards_before = len(self.rewards)
        for puzzle in self.puzzles:
            reward = puzzle.poll(self.shell)
            if reward is None:
                continue
            self.rewards.append(reward)
            term.write(Text(f'\n"{reward.line}"\n', style=f"italic {pal.TREASURE}"))
        solved_now = len(self.rewards) > rewards_before

        # The Warden watched that. Puzzle lines already speak for it.
        if self.presence is not None and not solved_now:
            seen = watcher.notice(self.shell, entered, result)
            if seen is not None:
                self.presence.submit(
                    seen.moment,
                    fallback=seen.fallback,
                    priority=seen.priority,
                    tags=set(seen.tags),
                )
        self._update_status()