Spaces:
Running on Zero
Running on Zero
| """The Engine — the single façade the UI talks to. | |
| engine = Engine() | |
| view = engine.start(SetupForm(theme="fantasy_forest", tone="cozy")) | |
| view = engine.play_turn("hello? who are you?") | |
| `start` / `play_turn` return a `ViewState` (speaker, dialogue, emotion, backdrop_url, | |
| present sprites, beat, ending). The engine owns the session `GameState` and orchestrates: | |
| stt -> direct_turn -> apply_directives -> paint (cached) -> memory -> trace -> view. | |
| It deliberately holds ONE game (single-session, per the hackathon scope). For per-user | |
| sessions on a busy Space, key engines by session id instead. | |
| """ | |
| from __future__ import annotations | |
| import base64 | |
| import json | |
| import os | |
| import sys | |
| import tempfile | |
| import time | |
| from pathlib import Path | |
# Session snapshot location.
#
# ZeroGPU runs each @spaces.GPU call in a worker subprocess, and in-memory state is
# not shared between workers. Every state mutation is therefore written to this file
# so whichever worker handles the next call can restore the session -- which only
# works if ALL workers resolve the SAME path.
#
# Resolution order:
#   1. VN_STATE_FILE, when set to a non-empty value (explicit override).
#   2. Windows: the per-user temp dir. There is no /tmp, and local runs are a single
#      process, so a stable per-user path is fine.
#   3. Everything else (ZeroGPU + Modal on Linux): the literal shared /tmp. We must
#      NOT use tempfile.gettempdir() here because ZeroGPU may hand each worker an
#      isolated $TMPDIR, which would silently break state sharing.
def _resolve_state_file() -> Path:
    """Pick the snapshot path following the resolution order described above."""
    override = os.environ.get("VN_STATE_FILE")
    if override:
        return Path(override)
    if sys.platform == "win32":
        return Path(tempfile.gettempdir()) / "vn_game_state.json"
    return Path("/tmp/vn_game_state.json")


_STATE_FILE = _resolve_state_file()
def _save_state(
    state: GameState,
    pending_out: DirectorOutput | None = None,
    intro: str = "",
    notifications: list[str] | None = None,
) -> None:
    """Persist the session snapshot to `_STATE_FILE` as JSON.

    Args:
        state: The game state; stored under ``"state"`` via ``model_dump()``.
        pending_out: Director output waiting for its image phase. The
            ``"pending_out"`` key is written only when this is not None.
        intro: Intro text stored under ``"intro"``.
        notifications: Stored under ``"notifications"``; None becomes ``[]``.

    Raises:
        OSError: If the temp file cannot be written or renamed into place.
    """
    data: dict = {"state": state.model_dump()}
    if pending_out is not None:
        data["pending_out"] = pending_out.model_dump()
    data["intro"] = intro
    data["notifications"] = notifications or []

    # Atomic write: write to a sibling temp file, then rename, so readers never see
    # a partial file. The temp name carries the pid because several worker processes
    # may save; a shared fixed temp name would let them clobber each other's
    # half-written file or rename it away from under one another.
    tmp_path = _STATE_FILE.with_name(f"{_STATE_FILE.name}.{os.getpid()}.tmp")
    try:
        tmp_path.write_text(json.dumps(data), encoding="utf-8")
        tmp_path.replace(_STATE_FILE)
    finally:
        # After a successful rename the temp path is already gone; this only
        # removes the leftover from a failed write/rename.
        tmp_path.unlink(missing_ok=True)
def _load_state() -> tuple[GameState | None, DirectorOutput | None, str, list[str]]:
    """Load the persisted session snapshot from `_STATE_FILE`.

    Returns:
        ``(state, pending_out, intro, notifications)``. ``pending_out`` is None
        when the snapshot has no ``"pending_out"`` key. When the file is missing,
        unreadable, or fails to parse/validate, returns ``(None, None, "", [])``.
        This is deliberately best-effort; callers treat a None state as
        "nothing to restore".
    """
    import logging  # noqa: PLC0415 -- only needed on the failure path below

    try:
        d = json.loads(_STATE_FILE.read_text(encoding="utf-8"))
        from .schemas import DirectorOutput, GameState  # noqa: PLC0415

        gs = GameState.model_validate(d["state"])
        out = DirectorOutput.model_validate(d["pending_out"]) if "pending_out" in d else None
        return gs, out, d.get("intro", ""), d.get("notifications", [])
    except FileNotFoundError:
        # No snapshot yet: the normal "fresh session" case, not worth logging.
        return None, None, "", []
    except Exception:
        # Still best-effort, but leave a trace: a corrupt or schema-incompatible
        # snapshot otherwise looks identical to "no session" and is undiagnosable.
        logging.getLogger(__name__).warning(
            "Ignoring unreadable session snapshot at %s", _STATE_FILE, exc_info=True
        )
        return None, None, "", []
def session_info() -> dict:
    """Summarise the persisted session without validating models (no GPU needed).

    Returns ``{"exists": False}`` when there is no snapshot file or it cannot be
    read/parsed. Otherwise returns ``exists=True`` plus ``turn_index`` (default 0),
    ``place`` (default "") and ``ended`` (whether the stored beat is "ended").
    """
    if not _STATE_FILE.exists():
        return {"exists": False}
    try:
        raw = _STATE_FILE.read_text(encoding="utf-8")
        snapshot = json.loads(raw)["state"]
        scene = snapshot.get("scene", {})
        summary = {"exists": True}
        summary["turn_index"] = snapshot.get("turn_index", 0)
        summary["place"] = scene.get("place", "")
        summary["ended"] = snapshot.get("beat") == "ended"
    except Exception:
        # Best-effort peek: any read/shape problem is reported as "no session".
        return {"exists": False}
    return summary
| from . import config, memory, orchestrator, state | |
| from .llm import LLMBackend, get_llm | |
| from .metrics import collector | |
| from .painter import Painter, get_painter | |
| from .schemas import DirectorOutput, GameState, NPCBond, SetupForm, SpritePresence, Turn, ViewState | |
| from .stt import STTBackend, get_stt | |
| from .trace import Tracer | |
| from .tts import TTSBackend, get_tts | |
class Engine:
    """The single façade the UI talks to; holds one session's `GameState`.

    Work is split into a fast text phase (`start_text` / `play_turn_text`) and a
    slower image phase (`start_images` / `play_turn_images`). The text phase stashes
    its `DirectorOutput` on the instance AND on disk (via `_save_state`), so the
    image phase can recover it with `_load_state` when it runs in a worker that
    does not share this instance's memory.
    """

    # Setup tone -> opening music mood; tones not listed fall back to "calm".
    _TONE_MUSIC = {
        "romantic": "romantic",
        "flirty": "romantic",
        "dramatic": "dramatic",
        "bittersweet": "sad",
        "comedic": "joyful",
    }

    # Display name used when the output's speaker id is not a known character.
    _FALLBACK_SPEAKER = "The wood"

    def __init__(self) -> None:
        # NOTE (ZeroGPU): the models need to be loaded during the first call to
        # @spaces.GPU instead of at module load time, so every backend starts as
        # None and is created lazily by the properties below.
        self._llm: LLMBackend | None = None
        self._painter_instance: Painter | None = None
        self._stt: STTBackend | None = None
        self._tts: TTSBackend | None = None
        self.tracer = Tracer(config.TRACE_PATH)
        self.state: GameState | None = None
        self._pending_out: DirectorOutput | None = None
        self._pending_intro: str = ""
        self._pending_notifications: list[str] = []

    # -- lazy backends --
    # These are properties because the rest of the class uses them as attributes
    # (`self.llm` is passed as the backend, `self.stt.transcribe(...)`, ...).

    @property
    def llm(self) -> LLMBackend:
        """The LLM backend, created with `get_llm()` on first access."""
        if self._llm is None:
            self._llm = get_llm()
        return self._llm

    @property
    def painter(self) -> Painter:
        """The painter backend, created with `get_painter()` on first access."""
        if self._painter_instance is None:
            self._painter_instance = get_painter()
        return self._painter_instance

    @property
    def stt(self) -> STTBackend:
        """The speech-to-text backend, created with `get_stt()` on first access."""
        if self._stt is None:
            self._stt = get_stt()
        return self._stt

    @property
    def tts(self) -> TTSBackend:
        """The text-to-speech backend, created with `get_tts()` on first access."""
        if self._tts is None:
            self._tts = get_tts()
        return self._tts

    # -- lifecycle --

    def start(self, setup: SetupForm) -> ViewState:
        """Full start (used by smoke test / MVP UI). Calls both phases."""
        self.start_text(setup)
        return self.start_images()

    def start_text(self, setup: SetupForm) -> ViewState:
        """Phase 1 — LLM world-init only. Fast.

        Sets `self.state`, records the opening as the first turn, and stores the
        opening `DirectorOutput` for `start_images()`.

        Returns:
            A text-only ViewState (no images/TTS).
        """
        self.state, opening = orchestrator.init_world(self.llm, setup)
        self.state.flags["current_music"] = self._TONE_MUSIC.get(setup.tone, "calm")
        self.state.recent_turns.append(
            Turn(
                player="(arrives)",
                speaker=opening.speaker,
                dialogue=opening.dialogue,
                emotion=opening.emotion,
            )
        )
        state.apply_directives(self.state, opening)
        state.save_memory(self.state)
        self.tracer.log(event="start", setup=setup.model_dump(), opening=opening.model_dump())

        intro = self.state.flags.get("situation_intro", "")
        # Stash for start_images() — also written to disk so ZeroGPU workers can pick it up.
        # Notifications are reset so the in-memory stash mirrors what is saved below.
        self._pending_out = opening
        self._pending_intro = intro
        self._pending_notifications = []
        _save_state(self.state, opening, intro)
        # Text-only ViewState so the frontend can show dialogue immediately
        return self._text_view(opening, intro_text=intro)

    def start_images(self) -> ViewState:
        """Phase 2 — paint backdrop + sprite, TTS. Call after start_text().

        Falls back to the on-disk snapshot when this instance has no state or no
        pending output.

        Returns:
            The full ViewState.
        """
        if self.state is None or self._pending_out is None:
            self.state, self._pending_out, self._pending_intro, _ = _load_state()
        assert self.state is not None and self._pending_out is not None, "call start_text() first"
        return self._view(self._pending_out, intro_text=self._pending_intro)

    def transcribe(self, audio_path: str) -> str:
        """Transcribe the audio file at `audio_path` with the STT backend."""
        return self.stt.transcribe(audio_path)

    # -- save / load (file-based, works on HF Spaces) --

    def save_data(self) -> str:
        """Serialise current GameState to a JSON string for download."""
        assert self.state is not None, "call start() first"
        return json.dumps(
            {
                "version": 1,
                "saved_at": time.time(),
                "turn_index": self.state.turn_index,
                "place": self.state.scene.place,
                "characters": [ch.name for ch in self.state.characters.values()],
                "state": self.state.model_dump(),
            },
            ensure_ascii=False,
        )

    def resume(self) -> ViewState | None:
        """Restore the session persisted on disk (the file every turn already writes).

        Returns:
            The full ViewState with `history` filled in, or None when there is
            nothing to resume.
        """
        gs, _, _, _ = _load_state()
        if gs is None:
            return None
        self.state = gs
        view = self._view(self._last_turn_output())

        # Rebuild the client-side journal: condensed past first, then the recent turns
        # (speaker = display name so the frontend can show it directly).
        history: list[Turn] = []
        if gs.summary:
            history.append(Turn(player="", speaker="The tale so far", dialogue=gs.summary))
        for t in gs.recent_turns:
            name = gs.characters[t.speaker].name if t.speaker in gs.characters else t.speaker
            history.append(
                Turn(player=t.player, speaker=name, dialogue=t.dialogue, emotion=t.emotion)
            )
        view.history = history
        return view

    def load_data(self, json_str: str) -> ViewState:
        """Restore GameState from a JSON string (uploaded save file).

        The restored state is also written to the on-disk snapshot.
        """
        payload = json.loads(json_str)
        self.state = GameState.model_validate(payload["state"])
        replay = self._last_turn_output()
        _save_state(self.state)
        return self._view(replay)

    def _last_turn_output(self) -> DirectorOutput:
        """Build a stand-in DirectorOutput from the latest recent turn (placeholder if none)."""
        assert self.state is not None
        turns = self.state.recent_turns
        if not turns:
            return DirectorOutput(speaker="narrator", dialogue="…", emotion="neutral")
        last = turns[-1]
        return DirectorOutput(speaker=last.speaker, dialogue=last.dialogue, emotion=last.emotion)

    # -- turns --

    def play_turn(
        self,
        player_input: str,
        action: str = "talk",
        target: str = "",
        audio_path: str | None = None,
    ) -> ViewState:
        """Single-call turn (smoke test / MVP UI): text phase + image phase composed."""
        turn_num = (self.state.turn_index + 1) if self.state else 1
        with collector.measure("total_turn", turn=turn_num):
            self.play_turn_text(player_input, action=action, target=target, audio_path=audio_path)
            view = self.play_turn_images()
        # The split path delivers notifications with the text phase; the composed
        # path delivers everything at once.
        view.notifications = self._pending_notifications
        return view

    def play_turn_text(
        self,
        player_input: str,
        action: str = "talk",
        target: str = "",
        audio_path: str | None = None,
    ) -> ViewState:
        """Phase 1 — STT + LLM + state mutation. Fast.

        When `audio_path` is given, its transcription replaces `player_input`
        unless the transcription is empty.

        Returns:
            A text-only ViewState so the frontend shows dialogue before the
            (slower) images from play_turn_images().
        """
        if self.state is None:
            self.state, _, _, _ = _load_state()
        assert self.state is not None, "call start() first"

        # turn_num matches what the tracer logs (post-increment value)
        turn_num = self.state.turn_index + 1

        if audio_path:
            with collector.measure("stt", turn=turn_num):
                player_input = self.stt.transcribe(audio_path) or player_input

        with collector.measure("llm_direct", turn=turn_num):
            out = orchestrator.direct_turn(
                self.llm, self.state, player_input, action=action, target=target
            )

        with collector.measure("apply_directives", turn=turn_num):
            effects = state.apply_directives(self.state, out)

        notifications = self._notifications_for(effects)

        self.state.recent_turns.append(
            Turn(
                player=player_input,
                speaker=out.speaker,
                dialogue=out.dialogue,
                emotion=out.emotion,
            )
        )
        self.state.turn_index += 1  # now == turn_num

        with collector.measure("save_memory", turn=turn_num):
            state.save_memory(self.state)

        if memory.should_compact(self.state):
            with collector.measure("compact_memory", turn=turn_num):
                orchestrator.compact_memory(self.llm, self.state)

        # Stash for play_turn_images() — also on disk so ZeroGPU workers can pick it up
        self._pending_out = out
        self._pending_intro = ""
        self._pending_notifications = notifications
        _save_state(self.state, out, "", notifications)

        self.tracer.log(
            event="turn",
            turn=self.state.turn_index,
            player=player_input,
            output=out.model_dump(),
            effects=effects,
        )
        return self._text_view(out, notifications=notifications)

    def play_turn_images(self) -> ViewState:
        """Phase 2 — paint backdrop + sprites, TTS. Call after play_turn_text().

        Notifications were already delivered with the text phase (no double toast),
        so the returned view carries none. Falls back to the on-disk snapshot when
        this instance has no state or no pending output.
        """
        if self.state is None or self._pending_out is None:
            self.state, self._pending_out, self._pending_intro, self._pending_notifications = (
                _load_state()
            )
        assert self.state is not None and self._pending_out is not None, (
            "call play_turn_text() first"
        )
        return self._view(self._pending_out, turn=self.state.turn_index)

    def _notifications_for(self, effects) -> list[str]:
        """Turn unlock/milestone effect strings into human-readable notifications.

        Handles the prefixes ``unlock_trait:<cid>:<trait>``, ``unlock_goal:<cid>``
        and ``milestone50:<cid>``; every other effect is ignored.
        """
        assert self.state is not None
        characters = self.state.characters

        def display_name(cid: str) -> str:
            # Unknown ids are shown as-is rather than dropped.
            return characters[cid].name if cid in characters else cid

        notifications: list[str] = []
        for e in effects:
            if e.startswith("unlock_trait:"):
                _, cid, trait = e.split(":", 2)
                ch_name = display_name(cid)
                notifications.append(f"🔓 {ch_name} — New trait discovered: {trait}")
            elif e.startswith("unlock_goal:"):
                ch_name = display_name(e.split(":", 1)[1])
                notifications.append(f"💫 {ch_name}'s secret goal revealed!")
            elif e.startswith("milestone50:"):
                ch_name = display_name(e.split(":", 1)[1])
                notifications.append(f"💖 {ch_name} is growing close to you…")
        return notifications

    # -- rendering --

    @staticmethod
    def _presence(ch, sprite_url: str | None = None) -> SpritePresence:
        """Build the journal/stage entry for one character.

        Only traits whose index is listed in `discovered_traits` (and is within
        range of `traits`) are exposed; the secret goal is included only once
        `goal_unlocked` is set.
        """
        discovered = [ch.traits[i] for i in ch.discovered_traits if i < len(ch.traits)]
        return SpritePresence(
            id=ch.id,
            name=ch.name,
            mood=ch.mood,
            sprite_url=sprite_url,
            relationship=ch.relationship,
            public_bio=ch.one_line,
            discovered_traits=discovered,
            total_traits=len(ch.traits),
            secret_goal=ch.goals if ch.goal_unlocked else None,
        )

    def _text_view(
        self,
        out: DirectorOutput,
        notifications: list[str] | None = None,
        intro_text: str = "",
    ) -> ViewState:
        """Text-only ViewState (no painter, no TTS): dialogue + known-characters journal.

        `backdrop_url=None` / `present=[]` tell the frontend to keep its current visuals.
        """
        s = self.state
        assert s is not None
        speaker_ch = s.characters.get(out.speaker)
        speaker_name = speaker_ch.name if speaker_ch else self._FALLBACK_SPEAKER
        known = [self._presence(ch) for ch in s.characters.values()]
        return ViewState(
            speaker=speaker_name,
            dialogue=out.dialogue,
            emotion=out.emotion,
            place=s.scene.place,
            backdrop_url=None,
            present=[],
            known_characters=known,
            beat=s.beat,
            ended=s.beat == "ended",
            ending_text=s.flags.get("ending_text"),
            ending_kind=s.flags.get("ending_kind"),
            turn_index=s.turn_index,
            notifications=notifications or [],
            intro_text=intro_text,
            current_music=s.flags.get("current_music"),
            npc_bonds=[],
        )

    def _view(
        self,
        out: DirectorOutput,
        turn: int = -1,
        notifications: list[str] | None = None,
        intro_text: str = "",
    ) -> ViewState:
        """Full ViewState: backdrop, on-stage sprites, journal, NPC bonds and TTS audio.

        Args:
            out: The director output to render.
            turn: Turn number passed to the metrics collector (-1 by default).
            notifications: Notifications to attach; None becomes ``[]``.
            intro_text: Intro text to attach.
        """
        s = self.state
        assert s is not None

        with collector.measure("painter_backdrop", turn=turn):
            if s.beat == "ended":
                # Generate a dedicated ending illustration instead of the regular backdrop
                ending_kind = s.flags.get("ending_kind", "warm")
                backdrop = self.painter.ending_backdrop(s, ending_kind)
            else:
                backdrop = self.painter.backdrop(s)  # cached -> cheap even when unchanged

        present: list[SpritePresence] = []
        for cid in s.scene.present:
            ch = s.characters.get(cid)
            if not ch:
                continue  # scene references an id with no character entry
            with collector.measure("painter_sprite", turn=turn):
                sprite = self.painter.sprite(s, ch)
            present.append(self._presence(ch, _to_url(sprite)))

        # All known characters (present + off-stage) for the relations journal.
        # On-stage entries reuse the sprite URL just painted; off-stage entries get None.
        # setdefault keeps the first entry per id, should an id be listed twice on stage.
        stage_urls: dict = {}
        for sp in present:
            stage_urls.setdefault(sp.id, sp.sprite_url)
        known = [self._presence(ch, stage_urls.get(cid)) for cid, ch in s.characters.items()]

        # Collect all directed NPC↔NPC bonds for the frontend graph
        # (bonds pointing at an unknown character id are skipped).
        npc_bonds: list[NPCBond] = []
        for cid, ch in s.characters.items():
            for other_id, val in ch.npc_relations.items():
                other = s.characters.get(other_id) if other_id in s.characters else None
                if other_id not in s.characters:
                    continue
                npc_bonds.append(
                    NPCBond(
                        source_id=cid,
                        source_name=ch.name,
                        target_id=other_id,
                        target_name=other.name,
                        value=val,
                        note=ch.npc_relation_notes.get(other_id, ""),
                    )
                )

        speaker_ch = s.characters.get(out.speaker)
        speaker_name = speaker_ch.name if speaker_ch else self._FALLBACK_SPEAKER

        # Audio is synthesised only for known speakers that have a voice description.
        audio_b64: str | None = None
        if speaker_ch and speaker_ch.tts_voice_description:
            with collector.measure("tts", turn=turn):
                wav = self.tts.synthesize(
                    out.dialogue, speaker_ch.tts_voice_description, speaker_ch.sprite_seed
                )
            if wav:
                audio_b64 = "data:audio/wav;base64," + base64.b64encode(wav).decode()

        return ViewState(
            speaker=speaker_name,
            dialogue=out.dialogue,
            emotion=out.emotion,
            place=s.scene.place,
            backdrop_url=_to_url(backdrop),
            present=present,
            known_characters=known,
            beat=s.beat,
            ended=s.beat == "ended",
            ending_text=s.flags.get("ending_text"),
            ending_kind=s.flags.get("ending_kind"),
            turn_index=s.turn_index,
            notifications=notifications or [],
            intro_text=intro_text,
            audio_b64=audio_b64,
            current_music=s.flags.get("current_music"),
            npc_bonds=npc_bonds,
        )
def _to_url(p: Path | None) -> str | None:
    """Map an image path to its `/images/<filename>` URL; a falsy `p` yields None."""
    if not p:
        return None
    # served by app.py via StaticFiles mounted at /images
    return f"/images/{p.name}"