WillHbx's picture
Merge remote-tracking branch 'origin/main'
a750c39
Raw
History Blame Contribute Delete
20.7 kB
"""The Engine — the single façade the UI talks to.

Example::

    engine = Engine()
    view = engine.start(SetupForm(theme="fantasy_forest", tone="cozy"))
    view = engine.play_turn("hello? who are you?")

`start` / `play_turn` return a `ViewState` (speaker, dialogue, emotion, backdrop_url,
present sprites, beat, ending). The engine owns the session `GameState` and orchestrates::

    stt -> direct_turn -> apply_directives -> paint (cached) -> memory -> trace -> view

It deliberately holds ONE game (single-session, per the hackathon scope). For per-user
sessions on a busy Space, key engines by session id instead.
"""
from __future__ import annotations
import base64
import json
import os
import sys
import tempfile
import time
from pathlib import Path
# ZeroGPU dispatches each @spaces.GPU call to a worker subprocess; in-memory state
# is invisible across workers. Every state mutation writes to this file so any
# worker can restore the current session on its next call -- so ALL workers must
# resolve it to the SAME path.
#
# On Linux (ZeroGPU + Modal) we keep the literal shared /tmp: every worker on the
# instance sees it, and we must NOT use tempfile.gettempdir() there because ZeroGPU
# may hand each worker an isolated $TMPDIR, which would silently break state sharing.
# Windows has no /tmp, so locally we fall back to the per-user temp dir (single
# process there, so a stable per-user path is fine). Override with VN_STATE_FILE.
# Resolution order: explicit VN_STATE_FILE override, then the per-user temp dir on
# Windows, then the literal shared /tmp everywhere else.
_STATE_FILE = (
    Path(os.environ["VN_STATE_FILE"])
    if os.environ.get("VN_STATE_FILE")
    else (
        Path(tempfile.gettempdir()) / "vn_game_state.json"
        if sys.platform == "win32"
        else Path("/tmp/vn_game_state.json")
    )
)
def _save_state(
    state: GameState,
    pending_out: DirectorOutput | None = None,
    intro: str = "",
    notifications: list[str] | None = None,
) -> None:
    """Persist the session to `_STATE_FILE` as JSON, atomically.

    Args:
        state: Game state to persist (stored under the ``"state"`` key).
        pending_out: Output of the text phase still waiting for its image phase;
            the ``"pending_out"`` key is omitted from the file when this is None.
        intro: Intro text stored alongside the pending output.
        notifications: Notifications stored alongside the pending output
            (None is written as an empty list).

    Raises:
        OSError: If the temp file cannot be written or renamed into place.
    """
    data: dict = {"state": state.model_dump()}
    if pending_out is not None:
        data["pending_out"] = pending_out.model_dump()
    data["intro"] = intro
    data["notifications"] = notifications or []

    # An overridden VN_STATE_FILE may point into a directory that does not exist yet.
    _STATE_FILE.parent.mkdir(parents=True, exist_ok=True)

    # Atomic write: write to a sibling tmp file then rename so readers never see a
    # partial file. The tmp name carries the pid because several worker processes
    # write this file; with one shared tmp name they could truncate each other's
    # half-written file or fail on the rename after the other worker moved it away.
    tmp_path = _STATE_FILE.with_name(f"{_STATE_FILE.name}.{os.getpid()}.tmp")
    try:
        tmp_path.write_text(json.dumps(data), encoding="utf-8")
        tmp_path.replace(_STATE_FILE)
    finally:
        # After a successful rename the tmp file is already gone; after a failure
        # this removes the leftover so broken writes do not pile up next to the state.
        try:
            tmp_path.unlink()
        except OSError:
            pass
def _load_state() -> tuple[GameState | None, DirectorOutput | None, str, list[str]]:
    """Read back the session written by `_save_state`.

    Returns:
        ``(state, pending_out, intro, notifications)``. When the file is missing,
        unreadable, or fails validation the result is ``(None, None, "", [])``;
        ``pending_out`` is also None when the file has no ``"pending_out"`` key.

    Loading is best-effort and never raises: a broken file is treated as "no
    session", but it is logged so the failure is not silently invisible.
    """
    try:
        # Read directly instead of exists()-then-read: the file can disappear
        # between the two calls.
        d = json.loads(_STATE_FILE.read_text(encoding="utf-8"))
        from .schemas import DirectorOutput, GameState  # noqa: PLC0415

        gs = GameState.model_validate(d["state"])
        out = DirectorOutput.model_validate(d["pending_out"]) if "pending_out" in d else None
        return gs, out, d.get("intro", ""), d.get("notifications", [])
    except FileNotFoundError:
        # No session has been persisted yet -- the normal case on first launch.
        return None, None, "", []
    except Exception:
        import logging  # noqa: PLC0415

        logging.getLogger(__name__).warning(
            "Ignoring unreadable session file %s", _STATE_FILE, exc_info=True
        )
        return None, None, "", []
def session_info() -> dict:
    """Cheap peek at the persisted session (no model validation, no GPU needed).

    Returns ``{"exists": False}`` when the state file is absent or cannot be read as
    expected; otherwise a dict with ``exists``, ``turn_index``, ``place`` and ``ended``
    taken from the raw JSON.
    """
    if not _STATE_FILE.exists():
        return {"exists": False}
    try:
        raw_text = _STATE_FILE.read_text(encoding="utf-8")
        snapshot = json.loads(raw_text)["state"]
        turn_index = snapshot.get("turn_index", 0)
        place = snapshot.get("scene", {}).get("place", "")
        has_ended = snapshot.get("beat") == "ended"
    except Exception:
        # Any malformed content is reported the same way as a missing file.
        return {"exists": False}
    return {
        "exists": True,
        "turn_index": turn_index,
        "place": place,
        "ended": has_ended,
    }
from . import config, memory, orchestrator, state
from .llm import LLMBackend, get_llm
from .metrics import collector
from .painter import Painter, get_painter
from .schemas import DirectorOutput, GameState, NPCBond, SetupForm, SpritePresence, Turn, ViewState
from .stt import STTBackend, get_stt
from .trace import Tracer
from .tts import TTSBackend, get_tts
class Engine:
    """Single-session game façade: owns the `GameState` and drives every turn.

    Each step is split in two phases so the frontend can show dialogue before the
    slower visuals arrive:

    * text phase  -- `start_text` / `play_turn_text`: STT + LLM + state mutation;
    * image phase -- `start_images` / `play_turn_images`: painter + TTS.

    The text phase stashes its `DirectorOutput` on the instance AND in the state file,
    so an image phase that runs without in-memory state can restore it from disk.
    `start` / `play_turn` compose both phases in one call.
    """

    # Speaker label shown when `DirectorOutput.speaker` is not a known character id.
    _FALLBACK_SPEAKER = "The wood"

    # Setup tone -> opening music mood. Tones not listed here start on "calm".
    _TONE_MUSIC = {
        "romantic": "romantic",
        "flirty": "romantic",
        "dramatic": "dramatic",
        "bittersweet": "sad",
        "comedic": "joyful",
    }

    def __init__(self) -> None:
        # NOTE (ZeroGPU): the models need to be loaded during the first call to
        # @spaces.GPU instead of at module load time, so every backend is created
        # lazily by its property below.
        self._llm: LLMBackend | None = None
        self._painter_instance: Painter | None = None
        self._stt: STTBackend | None = None
        self._tts: TTSBackend | None = None
        self.tracer = Tracer(config.TRACE_PATH)
        self.state: GameState | None = None
        # Output of the last text phase, waiting for its image phase.
        self._pending_out: DirectorOutput | None = None
        self._pending_intro: str = ""
        self._pending_notifications: list[str] = []

    # -- lazily created backends --

    @property
    def llm(self) -> LLMBackend:
        """LLM backend, created on first access."""
        if self._llm is None:
            self._llm = get_llm()
        return self._llm

    @property
    def painter(self) -> Painter:
        """Image painter, created on first access."""
        if self._painter_instance is None:
            self._painter_instance = get_painter()
        return self._painter_instance

    @property
    def stt(self) -> STTBackend:
        """Speech-to-text backend, created on first access."""
        if self._stt is None:
            self._stt = get_stt()
        return self._stt

    @property
    def tts(self) -> TTSBackend:
        """Text-to-speech backend, created on first access."""
        if self._tts is None:
            self._tts = get_tts()
        return self._tts

    # -- lifecycle --

    def start(self, setup: SetupForm) -> ViewState:
        """Full start (used by smoke test / MVP UI). Calls both phases."""
        self.start_text(setup)
        return self.start_images()

    def start_text(self, setup: SetupForm) -> ViewState:
        """Phase 1 — LLM world-init only. Fast. Returns a text-only ViewState (no images/TTS).

        Sets self.state and stores the opening DirectorOutput for start_images()."""
        self.state, opening = orchestrator.init_world(self.llm, setup)
        self.state.flags["current_music"] = self._TONE_MUSIC.get(setup.tone, "calm")
        self.state.recent_turns.append(
            Turn(
                player="(arrives)",
                speaker=opening.speaker,
                dialogue=opening.dialogue,
                emotion=opening.emotion,
            )
        )
        state.apply_directives(self.state, opening)
        state.save_memory(self.state)
        self.tracer.log(event="start", setup=setup.model_dump(), opening=opening.model_dump())

        intro = self.state.flags.get("situation_intro", "")
        # Stash for start_images() — also written to disk so ZeroGPU workers can pick it up.
        # Notifications are reset too, so the in-memory stash mirrors the file (which is
        # written with an empty list) instead of keeping leftovers from an earlier game.
        self._pending_out = opening
        self._pending_intro = intro
        self._pending_notifications = []
        _save_state(self.state, opening, intro)
        # Text-only ViewState so the frontend can show dialogue immediately
        return self._text_view(opening, intro_text=intro)

    def start_images(self) -> ViewState:
        """Phase 2 — paint backdrop + sprite, TTS. Call after start_text().

        Returns the full ViewState. Restores state and pending output from the state
        file when they are not held in memory."""
        if self.state is None or self._pending_out is None:
            self.state, self._pending_out, self._pending_intro, _ = _load_state()
        assert self.state is not None and self._pending_out is not None, "call start_text() first"
        return self._view(self._pending_out, intro_text=self._pending_intro)

    def transcribe(self, audio_path: str) -> str:
        """Transcribe the audio file at `audio_path` with the STT backend."""
        return self.stt.transcribe(audio_path)

    # -- save / load (file-based, works on HF Spaces) --

    def save_data(self) -> str:
        """Serialise current GameState to a JSON string for download."""
        assert self.state is not None, "call start() first"
        return json.dumps(
            {
                "version": 1,
                "saved_at": time.time(),
                "turn_index": self.state.turn_index,
                "place": self.state.scene.place,
                "characters": [ch.name for ch in self.state.characters.values()],
                "state": self.state.model_dump(),
            },
            ensure_ascii=False,
        )

    def resume(self) -> ViewState | None:
        """Restore the session persisted on disk (the file every turn already writes).

        Returns None when there is nothing to resume. Otherwise returns a full
        ViewState replaying the last recorded turn, with `history` rebuilt for the
        client-side journal."""
        gs, pending_out, pending_intro, pending_notifications = _load_state()
        if gs is None:
            return None
        self.state = gs
        # Adopt the persisted pending data as well: keeping whatever an earlier session
        # left on this instance would let a later image phase render stale dialogue.
        self._pending_out = pending_out
        self._pending_intro = pending_intro
        self._pending_notifications = pending_notifications

        view = self._view(self._replay_last_turn())

        # Rebuild the client-side journal: condensed past first, then the recent turns
        # (speaker = display name so the frontend can show it directly).
        history: list[Turn] = []
        if gs.summary:
            history.append(Turn(player="", speaker="The tale so far", dialogue=gs.summary))
        history.extend(
            Turn(
                player=t.player,
                speaker=self._display_name(t.speaker),
                dialogue=t.dialogue,
                emotion=t.emotion,
            )
            for t in gs.recent_turns
        )
        view.history = history
        return view

    def load_data(self, json_str: str) -> ViewState:
        """Restore GameState from a JSON string (uploaded save file).

        Raises whatever `json.loads` / model validation raise on a bad payload, and
        KeyError when the payload has no ``"state"`` entry."""
        payload = json.loads(json_str)
        self.state = GameState.model_validate(payload["state"])
        # The file is rewritten below without any pending output; drop the in-memory
        # stash too so it cannot leak the previous session into the loaded one.
        self._pending_out = None
        self._pending_intro = ""
        self._pending_notifications = []
        fake_out = self._replay_last_turn()
        _save_state(self.state)
        return self._view(fake_out)

    # -- turns --

    def play_turn(
        self,
        player_input: str,
        action: str = "talk",
        target: str = "",
        audio_path: str | None = None,
    ) -> ViewState:
        """Single-call turn (smoke test / MVP UI): text phase + image phase composed."""
        turn_num = (self.state.turn_index + 1) if self.state else 1
        with collector.measure("total_turn", turn=turn_num):
            self.play_turn_text(player_input, action=action, target=target, audio_path=audio_path)
            view = self.play_turn_images()
        # The split path delivers notifications with the text phase; the composed
        # path delivers everything at once.
        view.notifications = self._pending_notifications
        return view

    def play_turn_text(
        self,
        player_input: str,
        action: str = "talk",
        target: str = "",
        audio_path: str | None = None,
    ) -> ViewState:
        """Phase 1 — STT + LLM + state mutation. Fast. Returns a text-only ViewState so
        the frontend shows dialogue before the (slower) images from play_turn_images().

        When `audio_path` is given, its transcription replaces `player_input` unless
        the transcription is empty."""
        if self.state is None:
            self.state, _, _, _ = _load_state()
        assert self.state is not None, "call start() first"

        # turn_num matches what the tracer logs (post-increment value)
        turn_num = self.state.turn_index + 1

        if audio_path:
            with collector.measure("stt", turn=turn_num):
                player_input = self.stt.transcribe(audio_path) or player_input

        with collector.measure("llm_direct", turn=turn_num):
            out = orchestrator.direct_turn(
                self.llm, self.state, player_input, action=action, target=target
            )

        with collector.measure("apply_directives", turn=turn_num):
            effects = state.apply_directives(self.state, out)

        notifications = self._notifications_for(effects)

        self.state.recent_turns.append(
            Turn(
                player=player_input,
                speaker=out.speaker,
                dialogue=out.dialogue,
                emotion=out.emotion,
            )
        )
        self.state.turn_index += 1  # now == turn_num

        with collector.measure("save_memory", turn=turn_num):
            state.save_memory(self.state)

        if memory.should_compact(self.state):
            with collector.measure("compact_memory", turn=turn_num):
                orchestrator.compact_memory(self.llm, self.state)

        # Stash for play_turn_images() — also on disk so ZeroGPU workers can pick it up
        self._pending_out = out
        self._pending_intro = ""
        self._pending_notifications = notifications
        _save_state(self.state, out, "", notifications)

        self.tracer.log(
            event="turn",
            turn=self.state.turn_index,
            player=player_input,
            output=out.model_dump(),
            effects=effects,
        )
        return self._text_view(out, notifications=notifications)

    def play_turn_images(self) -> ViewState:
        """Phase 2 — paint backdrop + sprites, TTS. Call after play_turn_text().

        Notifications were already delivered with the text phase (no double toast)."""
        if self.state is None or self._pending_out is None:
            self.state, self._pending_out, self._pending_intro, self._pending_notifications = (
                _load_state()
            )
        assert self.state is not None and self._pending_out is not None, (
            "call play_turn_text() first"
        )
        return self._view(self._pending_out, turn=self.state.turn_index)

    # -- helpers --

    def _display_name(self, cid: str) -> str:
        """Return the character's display name, or `cid` itself when it is unknown."""
        character = self.state.characters.get(cid)
        return character.name if character is not None else cid

    def _speaker_name(self, out: DirectorOutput) -> str:
        """Display name for the speaker of `out`, with the narrator fallback."""
        speaker = self.state.characters.get(out.speaker)
        return speaker.name if speaker else self._FALLBACK_SPEAKER

    def _replay_last_turn(self) -> DirectorOutput:
        """Build a DirectorOutput that re-shows the last recorded turn (or a placeholder)."""
        turns = self.state.recent_turns
        if not turns:
            return DirectorOutput(speaker="narrator", dialogue="…", emotion="neutral")
        last = turns[-1]
        return DirectorOutput(speaker=last.speaker, dialogue=last.dialogue, emotion=last.emotion)

    def _notifications_for(self, effects) -> list[str]:
        """Turn directive effects into human-readable notifications.

        Recognised effect strings are ``unlock_trait:<cid>:<trait>``,
        ``unlock_goal:<cid>`` and ``milestone50:<cid>``. Anything else -- including an
        ``unlock_trait`` effect with no trait part -- is skipped instead of raising."""
        notes: list[str] = []
        for effect in effects:
            kind, sep, rest = effect.partition(":")
            if not sep:
                continue
            if kind == "unlock_trait":
                # The trait text itself may contain ':' -- only the first one splits.
                cid, trait_sep, trait = rest.partition(":")
                if not trait_sep:
                    continue
                notes.append(f"🔓 {self._display_name(cid)} — New trait discovered: {trait}")
            elif kind == "unlock_goal":
                notes.append(f"💫 {self._display_name(rest)}'s secret goal revealed!")
            elif kind == "milestone50":
                notes.append(f"💖 {self._display_name(rest)} is growing close to you…")
        return notes

    @staticmethod
    def _presence(ch, sprite_url: str | None) -> SpritePresence:
        """Build the journal/stage entry for character `ch` with the given sprite URL."""
        trait_count = len(ch.traits)
        # Out-of-range indices are ignored; negative ones are rejected explicitly because
        # they would otherwise wrap around and expose a trait from the end of the list.
        discovered = [ch.traits[i] for i in ch.discovered_traits if 0 <= i < trait_count]
        return SpritePresence(
            id=ch.id,
            name=ch.name,
            mood=ch.mood,
            sprite_url=sprite_url,
            relationship=ch.relationship,
            public_bio=ch.one_line,
            discovered_traits=discovered,
            total_traits=trait_count,
            # The goal stays hidden until it has been unlocked.
            secret_goal=ch.goals if ch.goal_unlocked else None,
        )

    # -- rendering --

    def _text_view(
        self,
        out: DirectorOutput,
        notifications: list[str] | None = None,
        intro_text: str = "",
    ) -> ViewState:
        """Text-only ViewState (no painter, no TTS): dialogue + known-characters journal.

        `backdrop_url=None` / `present=[]` tell the frontend to keep its current visuals."""
        s = self.state
        assert s is not None
        known = [self._presence(ch, None) for ch in s.characters.values()]
        return ViewState(
            speaker=self._speaker_name(out),
            dialogue=out.dialogue,
            emotion=out.emotion,
            place=s.scene.place,
            backdrop_url=None,
            present=[],
            known_characters=known,
            beat=s.beat,
            ended=s.beat == "ended",
            ending_text=s.flags.get("ending_text"),
            ending_kind=s.flags.get("ending_kind"),
            turn_index=s.turn_index,
            notifications=notifications or [],
            intro_text=intro_text,
            current_music=s.flags.get("current_music"),
            npc_bonds=[],
        )

    def _view(
        self,
        out: DirectorOutput,
        turn: int = -1,
        notifications: list[str] | None = None,
        intro_text: str = "",
    ) -> ViewState:
        """Full ViewState: backdrop, on-stage sprites, journal, NPC bonds and TTS audio.

        `turn` only labels the metrics collected while painting / synthesising."""
        s = self.state
        assert s is not None

        with collector.measure("painter_backdrop", turn=turn):
            if s.beat == "ended":
                # Generate a dedicated ending illustration instead of the regular backdrop
                ending_kind = s.flags.get("ending_kind", "warm")
                backdrop = self.painter.ending_backdrop(s, ending_kind)
            else:
                backdrop = self.painter.backdrop(s)  # cached -> cheap even when unchanged

        present: list[SpritePresence] = []
        for cid in s.scene.present:
            ch = s.characters.get(cid)
            if not ch:
                continue  # scene lists an id with no matching character
            with collector.measure("painter_sprite", turn=turn):
                sprite = self.painter.sprite(s, ch)
            present.append(self._presence(ch, _to_url(sprite)))

        # All known characters (present + off-stage) for the relations journal.
        # On-stage entries reuse the sprite URL painted above; off-stage entries carry
        # no sprite (sprite_url=None). The first on-stage entry wins for a given id.
        on_stage_urls: dict = {}
        for sp in present:
            on_stage_urls.setdefault(sp.id, sp.sprite_url)
        known = [
            self._presence(ch, on_stage_urls.get(cid)) for cid, ch in s.characters.items()
        ]

        # Collect all directed NPC↔NPC bonds for the frontend graph
        npc_bonds: list[NPCBond] = []
        for cid, ch in s.characters.items():
            for other_id, val in ch.npc_relations.items():
                if other_id not in s.characters:
                    continue  # relation towards a character that is not in the cast
                npc_bonds.append(
                    NPCBond(
                        source_id=cid,
                        source_name=ch.name,
                        target_id=other_id,
                        target_name=s.characters[other_id].name,
                        value=val,
                        note=ch.npc_relation_notes.get(other_id, ""),
                    )
                )

        # Voice the line only for known characters that have a voice description.
        speaker_ch = s.characters.get(out.speaker)
        audio_b64: str | None = None
        if speaker_ch and speaker_ch.tts_voice_description:
            with collector.measure("tts", turn=turn):
                wav = self.tts.synthesize(
                    out.dialogue, speaker_ch.tts_voice_description, speaker_ch.sprite_seed
                )
            if wav:
                audio_b64 = "data:audio/wav;base64," + base64.b64encode(wav).decode()

        return ViewState(
            speaker=self._speaker_name(out),
            dialogue=out.dialogue,
            emotion=out.emotion,
            place=s.scene.place,
            backdrop_url=_to_url(backdrop),
            present=present,
            known_characters=known,
            beat=s.beat,
            ended=s.beat == "ended",
            ending_text=s.flags.get("ending_text"),
            ending_kind=s.flags.get("ending_kind"),
            turn_index=s.turn_index,
            notifications=notifications or [],
            intro_text=intro_text,
            audio_b64=audio_b64,
            current_music=s.flags.get("current_music"),
            npc_bonds=npc_bonds,
        )
def _to_url(p: Path | None) -> str | None:
    """Map a painted image path to its public URL, or None when there is no image.

    Only the file name is kept: app.py serves the images via StaticFiles mounted
    at /images.
    """
    if not p:
        return None
    return "/images/" + p.name