"""GameController - per-session glue between the engine and the Gradio UI. Holds the Session plus the visual/TTS providers and the audio data URIs. One instance lives in gr.State per browser session (single-player). """ from __future__ import annotations import base64 import queue import random import threading import time from pathlib import Path from ..audio import SFX_EVENTS, generate_placeholder_pack from ..audio.manifest import MUSIC_DIR, SFX_DIR from ..config import Settings, effective_cpus, get_settings from ..engine import Session from ..llm import LLMError, make_backend from ..persistence import load_seed_case from ..schemas.accusation import Accusation from ..schemas.suspect import VoiceAssignment from ..tts import assign_voice, make_tts_provider from ..visuals import make_visual_provider _TTS_DIR = Path(".cache") / "tts" def _data_uri(path: Path, mime: str) -> str: return f"data:{mime};base64,{base64.b64encode(path.read_bytes()).decode('ascii')}" # Imported background tracks (mp3/ogg) take priority over the procedural .wav, so a # real royalty-free track can be dropped into assets/ui/music/ to replace it. _MUSIC_CANDIDATES: tuple[tuple[str, str], ...] = ( ("ambient_theme.mp3", "audio/mpeg"), ("ambient_theme.ogg", "audio/ogg"), ("ambient_theme.wav", "audio/wav"), ) def _music_data_uri() -> str: for name, mime in _MUSIC_CANDIDATES: path = MUSIC_DIR / name if path.exists(): return _data_uri(path, mime) return "" def _load_audio() -> tuple[dict[str, str], str]: """Return (sfx event -> data uri, music data uri). Generates the procedural SFX on first run if missing; the background track prefers an imported mp3/ogg.""" if not (SFX_DIR / "click.wav").exists(): from ..audio import generate_sfx generate_sfx() if not _music_data_uri(): generate_placeholder_pack() # falls back to the generated loop if none imported sfx = {ev: _data_uri(SFX_DIR / fn, "audio/wav") for ev, fn in SFX_EVENTS.items() if (SFX_DIR / fn).exists()} return sfx, _music_data_uri() # The LLM is loaded once and shared by every session, so the model is never loaded # more than once per process. _SHARED_BACKEND: object | None = None _BACKEND_LOCK = threading.Lock() _SHARED_TTS: object | None = None _TTS_LOCK = threading.Lock() def _obtain_shared_tts(settings: Settings): global _SHARED_TTS with _TTS_LOCK: # load the TTS models once, not per session if _SHARED_TTS is None: _SHARED_TTS = make_tts_provider(settings) return _SHARED_TTS def _obtain_shared_backend(settings: Settings): global _SHARED_BACKEND with _BACKEND_LOCK: # never load the model more than once, even under a race if _SHARED_BACKEND is None: _SHARED_BACKEND = make_backend(settings) return _SHARED_BACKEND # A SECOND model instance dedicated to background case generation, so generating the next # case never blocks live interrogation (which uses the primary backend). RAM is ample # (two ~1GB Q4 instances); llama.cpp is safe to run as independent instances concurrently. _GEN_BACKEND: object | None = None _GEN_BACKEND_LOCK = threading.Lock() _LAST_INTERACTION = [0.0] # monotonic ts of the last interrogation; the worker yields to it _GEN_YIELD_SECS = 5.0 def note_interaction() -> None: """Mark that the player just used the LLM, so the background generator backs off and leaves the CPU to interrogation.""" _LAST_INTERACTION[0] = time.monotonic() def _obtain_gen_backend(settings: Settings): global _GEN_BACKEND with _GEN_BACKEND_LOCK: if _GEN_BACKEND is None: _GEN_BACKEND = make_backend(settings) return _GEN_BACKEND # Continuous background refill is only safe when there are spare cores. On a 2-vCPU CPU # Space, a case generation (two long LLM calls) running alongside an interrogation starves # the reply - so on low-core hosts we prebuild exactly ONE case for an instant first load, # then stop, and let New Case generate on demand behind the loading screen instead. # (effective_cpus reads the real cgroup quota, so a container that lies about its core # count via os.cpu_count() doesn't trick us into refilling on only 2 vCPUs.) _CONTINUOUS_GEN = effective_cpus() > 4 class _CaseBuffer: """Keeps a freshly-generated case ready so the first New Case is instant. Every case is live and unique. On a many-core machine it refills continuously (always instant); on a low-core Space it prebuilds one case and then stops so it never competes with a reply.""" def __init__(self) -> None: self._q: queue.Queue = queue.Queue(maxsize=1) self._started = False self._lock = threading.Lock() self._seed = random.randint(1, 1_000_000_000) self._continuous = _CONTINUOUS_GEN def start(self, settings: Settings) -> None: with self._lock: if self._started: return self._started = True threading.Thread(target=self._run, args=(settings,), daemon=True).start() def _run(self, settings: Settings) -> None: from ..generator import generate_case from ..schemas.case import GenerationKnobs backend = _obtain_gen_backend(settings) while True: if self._q.full(): if not self._continuous: return # low-core: one case prebuilt, never refill during play time.sleep(0.5) continue # Yield to the player: only generate when interrogation has paused a moment. while time.monotonic() - _LAST_INTERACTION[0] < _GEN_YIELD_SECS: time.sleep(0.5) try: self._seed = (self._seed + 7919) % 1_000_000_000 result = generate_case(backend, seed=self._seed, knobs=GenerationKnobs(n_suspects=3)) if result.report.ok: self._q.put(result.case) else: time.sleep(1.0) except Exception: time.sleep(2.0) def take(self, timeout: float = 0.0): try: return self._q.get(timeout=timeout) if timeout > 0 else self._q.get_nowait() except queue.Empty: return None _CASE_BUFFER = _CaseBuffer() def start_case_buffer(settings: Settings) -> None: """Begin generating cases ahead of demand (call at app startup so the first case is already being built before the player connects).""" _CASE_BUFFER.start(settings) def take_ready_case(timeout: float = 0.0): """Return a pre-generated case if one is ready (optionally waiting up to ``timeout``).""" return _CASE_BUFFER.take(timeout) class GameController: def __init__(self, settings: Settings | None = None) -> None: self.settings = settings or get_settings() self.backend = _obtain_shared_backend(self.settings) self.visuals = make_visual_provider(self.settings) self.tts = _obtain_shared_tts(self.settings) self.session: Session | None = None self.current_sus: str | None = None self.current_room: str | None = None self._voices: dict[str, VoiceAssignment] = {} self._tts_counter = 0 self._img_cache: dict[str, str] = {} # -- case lifecycle ------------------------------------------------------- def _begin(self, case) -> None: # type: ignore[no-untyped-def] self.session = Session(case, self.backend) self.current_sus = case.suspects[0].sus_id # The stage backdrop starts on the crime scene and changes as rooms are searched. self.current_room = case.victim.found_at_loc_id self._voices = {s.sus_id: assign_voice(s) for s in case.suspects} def start(self, source: str = "tutorial", seed: int | None = None) -> None: self._begin(self._load_case(source, seed)) def start_buffered(self, wait_secs: float = 0.0) -> bool: """Begin a pre-generated case from the background buffer if one is ready (waiting up to ``wait_secs``). Returns True if a buffered case was used, False if the caller should fall back to live generation behind the overlay.""" case = take_ready_case(timeout=wait_secs) if case is None: return False self._begin(case) return True def _load_case(self, source: str, seed: int | None): if source == "generate": try: from ..generator import generate_case from ..schemas.case import GenerationKnobs result = generate_case(self.backend, seed=seed if seed is not None else 0, knobs=GenerationKnobs(n_suspects=3)) if result.report.ok: return result.case # A model that can't produce a solvable case must not strand the player. except LLMError: pass # fall back to the tutorial when no model is available return load_seed_case("tutorial") @property def case(self): assert self.session is not None return self.session.case # -- images as data URIs (no file serving; fully offline) ----------------- def _uri(self, path) -> str: key = str(path) if key not in self._img_cache: self._img_cache[key] = ( f"data:image/png;base64,{base64.b64encode(Path(path).read_bytes()).decode('ascii')}" ) return self._img_cache[key] def _portrait_key(self, s) -> str: # type: ignore[no-untyped-def] # ONE key for both the roster thumbnail and the interrogation sprite, so a # suspect looks identical in the menu and on the stage. return f"{self.case.case_id}:{s.name}" def portrait_uri(self, sus_id: str) -> str: s = self.case.suspect(sus_id) return self._uri(self.visuals.portrait_path(s.visual, self._portrait_key(s))) def portrait_sheet_uri(self, sus_id: str) -> str: s = self.case.suspect(sus_id) return self._uri(self.visuals.portrait_sheet_path(s.visual, self._portrait_key(s))) def scene_uri(self, loc_id: str) -> str: loc = next((x for x in self.case.setting.locations if x.loc_id == loc_id), self.case.setting.locations[0]) return self._uri(self.visuals.scene_path(loc.name, f"{self.case.case_id}-{loc.loc_id}")) def interrogation_uri(self) -> str: return self._uri(self.visuals.scene_path("Interrogation Room", f"{self.case.case_id}-interro")) def prop_uri(self, clue) -> str: return self._uri(self.visuals.prop_path(clue.name, f"{self.case.case_id}-{clue.clue_id}")) def loc_name_for_id(self, loc_id: str) -> str: return next((x.name for x in self.case.setting.locations if x.loc_id == loc_id), loc_id) # -- selectors ------------------------------------------------------------ def roster(self) -> list[tuple[str, str]]: # Same key as the interrogation sprite so the thumbnail matches the stage exactly. return [(str(self.visuals.portrait_path(s.visual, self._portrait_key(s))), s.name) for s in self.case.suspects] def select_by_index(self, index: int) -> str: suspects = self.case.suspects if 0 <= index < len(suspects): self.current_sus = suspects[index].sus_id return self.current_name() def current_name(self) -> str: return self.case.suspect(self.current_sus).name if self.current_sus else "" def evidence_choices(self) -> list[str]: return [c.name for c in self.session.evidence()] if self.session else [] def clue_id_for_name(self, name: str | None) -> str | None: if not name or not self.session: return None for c in self.case.clues: if c.name == name: return c.clue_id return None def relevance_breaking(self, clue_id: str | None) -> bool: """True if presenting this clue to the current suspect is their breaking evidence - used only to pick the right tone for the (rare) confession-scrub fallback.""" if not clue_id or not self.session or not self.current_sus: return False from ..engine.relevance import assess_relevance from ..schemas.enums import Relevance result = assess_relevance(self.case, self.case.suspect(self.current_sus), clue_id) return result.relevance is Relevance.BREAKING def location_choices(self) -> list[str]: return [loc.name for loc in self.case.setting.locations] def loc_id_for_name(self, name: str) -> str | None: for loc in self.case.setting.locations: if loc.name == name: return loc.loc_id return None # -- actions -------------------------------------------------------------- def search(self, loc_name: str): loc_id = self.loc_id_for_name(loc_name) return self.session.search(loc_id) if (self.session and loc_id) else () def add_note(self, text: str) -> None: if self.session and text and text.strip(): self.session.add_note(text) def accuse(self, accused_name: str, weapon_ok: bool, motive_ok: bool, cited_names: list[str]): accused = next((s.sus_id for s in self.case.suspects if s.name == accused_name), None) cited = tuple(cid for cid in (self.clue_id_for_name(n) for n in cited_names) if cid) accusation = Accusation( # An unmatched name scores zero rather than silently accusing suspect #1. accused_sus_id=accused or "", weapon_id=self.case.weapon.weapon_id if weapon_ok else None, motive_id=self.case.culprit.true_motive.motive_id if motive_ok else None, cited_clue_ids=cited, ) return self.session.accuse(accusation) def speak(self, text: str) -> str: """Synthesize the line and return a base64 WAV data URI (or '' if unavailable), for reliable client-side playback. Temp files are cleaned up after encoding.""" if not getattr(self.tts, "available", False) or not self.current_sus: return "" self._tts_counter += 1 out = _TTS_DIR / f"line_{self._tts_counter}.wav" path = self.tts.synth_to_file(text, self._voices.get(self.current_sus), out) if not path: return "" try: uri = f"data:audio/wav;base64,{base64.b64encode(Path(path).read_bytes()).decode('ascii')}" finally: Path(path).unlink(missing_ok=True) return uri