Spaces:
Running
Running
Case Zero - initial public release (fully local: Qwen2.5-1.5B via llama.cpp + Supertonic, custom pixel-noir SPA via gradio.Server)
414dc55 | """GameController - per-session glue between the engine and the Gradio UI. | |
| Holds the Session plus the visual/TTS providers and the audio data URIs. One instance | |
| lives in gr.State per browser session (single-player). | |
| """ | |
| from __future__ import annotations | |
| import base64 | |
| import queue | |
| import random | |
| import threading | |
| import time | |
| from pathlib import Path | |
| from ..audio import SFX_EVENTS, generate_placeholder_pack | |
| from ..audio.manifest import MUSIC_DIR, SFX_DIR | |
| from ..config import Settings, effective_cpus, get_settings | |
| from ..engine import Session | |
| from ..llm import LLMError, make_backend | |
| from ..persistence import load_seed_case | |
| from ..schemas.accusation import Accusation | |
| from ..schemas.suspect import VoiceAssignment | |
| from ..tts import assign_voice, make_tts_provider | |
| from ..visuals import make_visual_provider | |
| _TTS_DIR = Path(".cache") / "tts" | |
| def _data_uri(path: Path, mime: str) -> str: | |
| return f"data:{mime};base64,{base64.b64encode(path.read_bytes()).decode('ascii')}" | |
| # Imported background tracks (mp3/ogg) take priority over the procedural .wav, so a | |
| # real royalty-free track can be dropped into assets/ui/music/ to replace it. | |
| _MUSIC_CANDIDATES: tuple[tuple[str, str], ...] = ( | |
| ("ambient_theme.mp3", "audio/mpeg"), | |
| ("ambient_theme.ogg", "audio/ogg"), | |
| ("ambient_theme.wav", "audio/wav"), | |
| ) | |
| def _music_data_uri() -> str: | |
| for name, mime in _MUSIC_CANDIDATES: | |
| path = MUSIC_DIR / name | |
| if path.exists(): | |
| return _data_uri(path, mime) | |
| return "" | |
| def _load_audio() -> tuple[dict[str, str], str]: | |
| """Return (sfx event -> data uri, music data uri). Generates the procedural SFX | |
| on first run if missing; the background track prefers an imported mp3/ogg.""" | |
| if not (SFX_DIR / "click.wav").exists(): | |
| from ..audio import generate_sfx | |
| generate_sfx() | |
| if not _music_data_uri(): | |
| generate_placeholder_pack() # falls back to the generated loop if none imported | |
| sfx = {ev: _data_uri(SFX_DIR / fn, "audio/wav") for ev, fn in SFX_EVENTS.items() | |
| if (SFX_DIR / fn).exists()} | |
| return sfx, _music_data_uri() | |
| # The LLM is loaded once and shared by every session, so the model is never loaded | |
| # more than once per process. | |
| _SHARED_BACKEND: object | None = None | |
| _BACKEND_LOCK = threading.Lock() | |
| _SHARED_TTS: object | None = None | |
| _TTS_LOCK = threading.Lock() | |
| def _obtain_shared_tts(settings: Settings): | |
| global _SHARED_TTS | |
| with _TTS_LOCK: # load the TTS models once, not per session | |
| if _SHARED_TTS is None: | |
| _SHARED_TTS = make_tts_provider(settings) | |
| return _SHARED_TTS | |
| def _obtain_shared_backend(settings: Settings): | |
| global _SHARED_BACKEND | |
| with _BACKEND_LOCK: # never load the model more than once, even under a race | |
| if _SHARED_BACKEND is None: | |
| _SHARED_BACKEND = make_backend(settings) | |
| return _SHARED_BACKEND | |
| # A SECOND model instance dedicated to background case generation, so generating the next | |
| # case never blocks live interrogation (which uses the primary backend). RAM is ample | |
| # (two ~1GB Q4 instances); llama.cpp is safe to run as independent instances concurrently. | |
| _GEN_BACKEND: object | None = None | |
| _GEN_BACKEND_LOCK = threading.Lock() | |
| _LAST_INTERACTION = [0.0] # monotonic ts of the last interrogation; the worker yields to it | |
| _GEN_YIELD_SECS = 5.0 | |
| def note_interaction() -> None: | |
| """Mark that the player just used the LLM, so the background generator backs off and | |
| leaves the CPU to interrogation.""" | |
| _LAST_INTERACTION[0] = time.monotonic() | |
| def _obtain_gen_backend(settings: Settings): | |
| global _GEN_BACKEND | |
| with _GEN_BACKEND_LOCK: | |
| if _GEN_BACKEND is None: | |
| _GEN_BACKEND = make_backend(settings) | |
| return _GEN_BACKEND | |
| # Continuous background refill is only safe when there are spare cores. On a 2-vCPU CPU | |
| # Space, a case generation (two long LLM calls) running alongside an interrogation starves | |
| # the reply - so on low-core hosts we prebuild exactly ONE case for an instant first load, | |
| # then stop, and let New Case generate on demand behind the loading screen instead. | |
| # (effective_cpus reads the real cgroup quota, so a container that lies about its core | |
| # count via os.cpu_count() doesn't trick us into refilling on only 2 vCPUs.) | |
| _CONTINUOUS_GEN = effective_cpus() > 4 | |
| class _CaseBuffer: | |
| """Keeps a freshly-generated case ready so the first New Case is instant. Every case is | |
| live and unique. On a many-core machine it refills continuously (always instant); on a | |
| low-core Space it prebuilds one case and then stops so it never competes with a reply.""" | |
| def __init__(self) -> None: | |
| self._q: queue.Queue = queue.Queue(maxsize=1) | |
| self._started = False | |
| self._lock = threading.Lock() | |
| self._seed = random.randint(1, 1_000_000_000) | |
| self._continuous = _CONTINUOUS_GEN | |
| def start(self, settings: Settings) -> None: | |
| with self._lock: | |
| if self._started: | |
| return | |
| self._started = True | |
| threading.Thread(target=self._run, args=(settings,), daemon=True).start() | |
| def _run(self, settings: Settings) -> None: | |
| from ..generator import generate_case | |
| from ..schemas.case import GenerationKnobs | |
| backend = _obtain_gen_backend(settings) | |
| while True: | |
| if self._q.full(): | |
| if not self._continuous: | |
| return # low-core: one case prebuilt, never refill during play | |
| time.sleep(0.5) | |
| continue | |
| # Yield to the player: only generate when interrogation has paused a moment. | |
| while time.monotonic() - _LAST_INTERACTION[0] < _GEN_YIELD_SECS: | |
| time.sleep(0.5) | |
| try: | |
| self._seed = (self._seed + 7919) % 1_000_000_000 | |
| result = generate_case(backend, seed=self._seed, | |
| knobs=GenerationKnobs(n_suspects=3)) | |
| if result.report.ok: | |
| self._q.put(result.case) | |
| else: | |
| time.sleep(1.0) | |
| except Exception: | |
| time.sleep(2.0) | |
| def take(self, timeout: float = 0.0): | |
| try: | |
| return self._q.get(timeout=timeout) if timeout > 0 else self._q.get_nowait() | |
| except queue.Empty: | |
| return None | |
| _CASE_BUFFER = _CaseBuffer() | |
| def start_case_buffer(settings: Settings) -> None: | |
| """Begin generating cases ahead of demand (call at app startup so the first case is | |
| already being built before the player connects).""" | |
| _CASE_BUFFER.start(settings) | |
| def take_ready_case(timeout: float = 0.0): | |
| """Return a pre-generated case if one is ready (optionally waiting up to ``timeout``).""" | |
| return _CASE_BUFFER.take(timeout) | |
| class GameController: | |
| def __init__(self, settings: Settings | None = None) -> None: | |
| self.settings = settings or get_settings() | |
| self.backend = _obtain_shared_backend(self.settings) | |
| self.visuals = make_visual_provider(self.settings) | |
| self.tts = _obtain_shared_tts(self.settings) | |
| self.session: Session | None = None | |
| self.current_sus: str | None = None | |
| self.current_room: str | None = None | |
| self._voices: dict[str, VoiceAssignment] = {} | |
| self._tts_counter = 0 | |
| self._img_cache: dict[str, str] = {} | |
| # -- case lifecycle ------------------------------------------------------- | |
| def _begin(self, case) -> None: # type: ignore[no-untyped-def] | |
| self.session = Session(case, self.backend) | |
| self.current_sus = case.suspects[0].sus_id | |
| # The stage backdrop starts on the crime scene and changes as rooms are searched. | |
| self.current_room = case.victim.found_at_loc_id | |
| self._voices = {s.sus_id: assign_voice(s) for s in case.suspects} | |
| def start(self, source: str = "tutorial", seed: int | None = None) -> None: | |
| self._begin(self._load_case(source, seed)) | |
| def start_buffered(self, wait_secs: float = 0.0) -> bool: | |
| """Begin a pre-generated case from the background buffer if one is ready (waiting up | |
| to ``wait_secs``). Returns True if a buffered case was used, False if the caller | |
| should fall back to live generation behind the overlay.""" | |
| case = take_ready_case(timeout=wait_secs) | |
| if case is None: | |
| return False | |
| self._begin(case) | |
| return True | |
| def _load_case(self, source: str, seed: int | None): | |
| if source == "generate": | |
| try: | |
| from ..generator import generate_case | |
| from ..schemas.case import GenerationKnobs | |
| result = generate_case(self.backend, seed=seed if seed is not None else 0, | |
| knobs=GenerationKnobs(n_suspects=3)) | |
| if result.report.ok: | |
| return result.case | |
| # A model that can't produce a solvable case must not strand the player. | |
| except LLMError: | |
| pass # fall back to the tutorial when no model is available | |
| return load_seed_case("tutorial") | |
| def case(self): | |
| assert self.session is not None | |
| return self.session.case | |
| # -- images as data URIs (no file serving; fully offline) ----------------- | |
| def _uri(self, path) -> str: | |
| key = str(path) | |
| if key not in self._img_cache: | |
| self._img_cache[key] = ( | |
| f"data:image/png;base64,{base64.b64encode(Path(path).read_bytes()).decode('ascii')}" | |
| ) | |
| return self._img_cache[key] | |
| def _portrait_key(self, s) -> str: # type: ignore[no-untyped-def] | |
| # ONE key for both the roster thumbnail and the interrogation sprite, so a | |
| # suspect looks identical in the menu and on the stage. | |
| return f"{self.case.case_id}:{s.name}" | |
| def portrait_uri(self, sus_id: str) -> str: | |
| s = self.case.suspect(sus_id) | |
| return self._uri(self.visuals.portrait_path(s.visual, self._portrait_key(s))) | |
| def portrait_sheet_uri(self, sus_id: str) -> str: | |
| s = self.case.suspect(sus_id) | |
| return self._uri(self.visuals.portrait_sheet_path(s.visual, self._portrait_key(s))) | |
| def scene_uri(self, loc_id: str) -> str: | |
| loc = next((x for x in self.case.setting.locations if x.loc_id == loc_id), | |
| self.case.setting.locations[0]) | |
| return self._uri(self.visuals.scene_path(loc.name, f"{self.case.case_id}-{loc.loc_id}")) | |
| def interrogation_uri(self) -> str: | |
| return self._uri(self.visuals.scene_path("Interrogation Room", f"{self.case.case_id}-interro")) | |
| def prop_uri(self, clue) -> str: | |
| return self._uri(self.visuals.prop_path(clue.name, f"{self.case.case_id}-{clue.clue_id}")) | |
| def loc_name_for_id(self, loc_id: str) -> str: | |
| return next((x.name for x in self.case.setting.locations if x.loc_id == loc_id), loc_id) | |
| # -- selectors ------------------------------------------------------------ | |
| def roster(self) -> list[tuple[str, str]]: | |
| # Same key as the interrogation sprite so the thumbnail matches the stage exactly. | |
| return [(str(self.visuals.portrait_path(s.visual, self._portrait_key(s))), s.name) | |
| for s in self.case.suspects] | |
| def select_by_index(self, index: int) -> str: | |
| suspects = self.case.suspects | |
| if 0 <= index < len(suspects): | |
| self.current_sus = suspects[index].sus_id | |
| return self.current_name() | |
| def current_name(self) -> str: | |
| return self.case.suspect(self.current_sus).name if self.current_sus else "" | |
| def evidence_choices(self) -> list[str]: | |
| return [c.name for c in self.session.evidence()] if self.session else [] | |
| def clue_id_for_name(self, name: str | None) -> str | None: | |
| if not name or not self.session: | |
| return None | |
| for c in self.case.clues: | |
| if c.name == name: | |
| return c.clue_id | |
| return None | |
| def relevance_breaking(self, clue_id: str | None) -> bool: | |
| """True if presenting this clue to the current suspect is their breaking evidence - | |
| used only to pick the right tone for the (rare) confession-scrub fallback.""" | |
| if not clue_id or not self.session or not self.current_sus: | |
| return False | |
| from ..engine.relevance import assess_relevance | |
| from ..schemas.enums import Relevance | |
| result = assess_relevance(self.case, self.case.suspect(self.current_sus), clue_id) | |
| return result.relevance is Relevance.BREAKING | |
| def location_choices(self) -> list[str]: | |
| return [loc.name for loc in self.case.setting.locations] | |
| def loc_id_for_name(self, name: str) -> str | None: | |
| for loc in self.case.setting.locations: | |
| if loc.name == name: | |
| return loc.loc_id | |
| return None | |
| # -- actions -------------------------------------------------------------- | |
| def search(self, loc_name: str): | |
| loc_id = self.loc_id_for_name(loc_name) | |
| return self.session.search(loc_id) if (self.session and loc_id) else () | |
| def add_note(self, text: str) -> None: | |
| if self.session and text and text.strip(): | |
| self.session.add_note(text) | |
| def accuse(self, accused_name: str, weapon_ok: bool, motive_ok: bool, cited_names: list[str]): | |
| accused = next((s.sus_id for s in self.case.suspects if s.name == accused_name), None) | |
| cited = tuple(cid for cid in (self.clue_id_for_name(n) for n in cited_names) if cid) | |
| accusation = Accusation( | |
| # An unmatched name scores zero rather than silently accusing suspect #1. | |
| accused_sus_id=accused or "", | |
| weapon_id=self.case.weapon.weapon_id if weapon_ok else None, | |
| motive_id=self.case.culprit.true_motive.motive_id if motive_ok else None, | |
| cited_clue_ids=cited, | |
| ) | |
| return self.session.accuse(accusation) | |
| def speak(self, text: str) -> str: | |
| """Synthesize the line and return a base64 WAV data URI (or '' if unavailable), | |
| for reliable client-side playback. Temp files are cleaned up after encoding.""" | |
| if not getattr(self.tts, "available", False) or not self.current_sus: | |
| return "" | |
| self._tts_counter += 1 | |
| out = _TTS_DIR / f"line_{self._tts_counter}.wav" | |
| path = self.tts.synth_to_file(text, self._voices.get(self.current_sus), out) | |
| if not path: | |
| return "" | |
| try: | |
| uri = f"data:audio/wav;base64,{base64.b64encode(Path(path).read_bytes()).decode('ascii')}" | |
| finally: | |
| Path(path).unlink(missing_ok=True) | |
| return uri | |