case0 / src /case_zero /ui /app_state.py
HusseinEid's picture
Case Zero - initial public release (fully local: Qwen2.5-1.5B via llama.cpp + Supertonic, custom pixel-noir SPA via gradio.Server)
414dc55
"""GameController - per-session glue between the engine and the Gradio UI.
Holds the Session plus the visual/TTS providers and the audio data URIs. One instance
lives in gr.State per browser session (single-player).
"""
from __future__ import annotations
import base64
import queue
import random
import threading
import time
from pathlib import Path
from ..audio import SFX_EVENTS, generate_placeholder_pack
from ..audio.manifest import MUSIC_DIR, SFX_DIR
from ..config import Settings, effective_cpus, get_settings
from ..engine import Session
from ..llm import LLMError, make_backend
from ..persistence import load_seed_case
from ..schemas.accusation import Accusation
from ..schemas.suspect import VoiceAssignment
from ..tts import assign_voice, make_tts_provider
from ..visuals import make_visual_provider
_TTS_DIR = Path(".cache") / "tts"
def _data_uri(path: Path, mime: str) -> str:
return f"data:{mime};base64,{base64.b64encode(path.read_bytes()).decode('ascii')}"
# Imported background tracks (mp3/ogg) take priority over the procedural .wav, so a
# real royalty-free track can be dropped into assets/ui/music/ to replace it.
_MUSIC_CANDIDATES: tuple[tuple[str, str], ...] = (
("ambient_theme.mp3", "audio/mpeg"),
("ambient_theme.ogg", "audio/ogg"),
("ambient_theme.wav", "audio/wav"),
)
def _music_data_uri() -> str:
for name, mime in _MUSIC_CANDIDATES:
path = MUSIC_DIR / name
if path.exists():
return _data_uri(path, mime)
return ""
def _load_audio() -> tuple[dict[str, str], str]:
"""Return (sfx event -> data uri, music data uri). Generates the procedural SFX
on first run if missing; the background track prefers an imported mp3/ogg."""
if not (SFX_DIR / "click.wav").exists():
from ..audio import generate_sfx
generate_sfx()
if not _music_data_uri():
generate_placeholder_pack() # falls back to the generated loop if none imported
sfx = {ev: _data_uri(SFX_DIR / fn, "audio/wav") for ev, fn in SFX_EVENTS.items()
if (SFX_DIR / fn).exists()}
return sfx, _music_data_uri()
# The LLM is loaded once and shared by every session, so the model is never loaded
# more than once per process.
_SHARED_BACKEND: object | None = None
_BACKEND_LOCK = threading.Lock()
_SHARED_TTS: object | None = None
_TTS_LOCK = threading.Lock()
def _obtain_shared_tts(settings: Settings):
global _SHARED_TTS
with _TTS_LOCK: # load the TTS models once, not per session
if _SHARED_TTS is None:
_SHARED_TTS = make_tts_provider(settings)
return _SHARED_TTS
def _obtain_shared_backend(settings: Settings):
global _SHARED_BACKEND
with _BACKEND_LOCK: # never load the model more than once, even under a race
if _SHARED_BACKEND is None:
_SHARED_BACKEND = make_backend(settings)
return _SHARED_BACKEND
# A SECOND model instance dedicated to background case generation, so generating the next
# case never blocks live interrogation (which uses the primary backend). RAM is ample
# (two ~1GB Q4 instances); llama.cpp is safe to run as independent instances concurrently.
_GEN_BACKEND: object | None = None
_GEN_BACKEND_LOCK = threading.Lock()
_LAST_INTERACTION = [0.0] # monotonic ts of the last interrogation; the worker yields to it
_GEN_YIELD_SECS = 5.0
def note_interaction() -> None:
"""Mark that the player just used the LLM, so the background generator backs off and
leaves the CPU to interrogation."""
_LAST_INTERACTION[0] = time.monotonic()
def _obtain_gen_backend(settings: Settings):
global _GEN_BACKEND
with _GEN_BACKEND_LOCK:
if _GEN_BACKEND is None:
_GEN_BACKEND = make_backend(settings)
return _GEN_BACKEND
# Continuous background refill is only safe when there are spare cores. On a 2-vCPU CPU
# Space, a case generation (two long LLM calls) running alongside an interrogation starves
# the reply - so on low-core hosts we prebuild exactly ONE case for an instant first load,
# then stop, and let New Case generate on demand behind the loading screen instead.
# (effective_cpus reads the real cgroup quota, so a container that lies about its core
# count via os.cpu_count() doesn't trick us into refilling on only 2 vCPUs.)
_CONTINUOUS_GEN = effective_cpus() > 4
class _CaseBuffer:
"""Keeps a freshly-generated case ready so the first New Case is instant. Every case is
live and unique. On a many-core machine it refills continuously (always instant); on a
low-core Space it prebuilds one case and then stops so it never competes with a reply."""
def __init__(self) -> None:
self._q: queue.Queue = queue.Queue(maxsize=1)
self._started = False
self._lock = threading.Lock()
self._seed = random.randint(1, 1_000_000_000)
self._continuous = _CONTINUOUS_GEN
def start(self, settings: Settings) -> None:
with self._lock:
if self._started:
return
self._started = True
threading.Thread(target=self._run, args=(settings,), daemon=True).start()
def _run(self, settings: Settings) -> None:
from ..generator import generate_case
from ..schemas.case import GenerationKnobs
backend = _obtain_gen_backend(settings)
while True:
if self._q.full():
if not self._continuous:
return # low-core: one case prebuilt, never refill during play
time.sleep(0.5)
continue
# Yield to the player: only generate when interrogation has paused a moment.
while time.monotonic() - _LAST_INTERACTION[0] < _GEN_YIELD_SECS:
time.sleep(0.5)
try:
self._seed = (self._seed + 7919) % 1_000_000_000
result = generate_case(backend, seed=self._seed,
knobs=GenerationKnobs(n_suspects=3))
if result.report.ok:
self._q.put(result.case)
else:
time.sleep(1.0)
except Exception:
time.sleep(2.0)
def take(self, timeout: float = 0.0):
try:
return self._q.get(timeout=timeout) if timeout > 0 else self._q.get_nowait()
except queue.Empty:
return None
_CASE_BUFFER = _CaseBuffer()
def start_case_buffer(settings: Settings) -> None:
"""Begin generating cases ahead of demand (call at app startup so the first case is
already being built before the player connects)."""
_CASE_BUFFER.start(settings)
def take_ready_case(timeout: float = 0.0):
"""Return a pre-generated case if one is ready (optionally waiting up to ``timeout``)."""
return _CASE_BUFFER.take(timeout)
class GameController:
def __init__(self, settings: Settings | None = None) -> None:
self.settings = settings or get_settings()
self.backend = _obtain_shared_backend(self.settings)
self.visuals = make_visual_provider(self.settings)
self.tts = _obtain_shared_tts(self.settings)
self.session: Session | None = None
self.current_sus: str | None = None
self.current_room: str | None = None
self._voices: dict[str, VoiceAssignment] = {}
self._tts_counter = 0
self._img_cache: dict[str, str] = {}
# -- case lifecycle -------------------------------------------------------
def _begin(self, case) -> None: # type: ignore[no-untyped-def]
self.session = Session(case, self.backend)
self.current_sus = case.suspects[0].sus_id
# The stage backdrop starts on the crime scene and changes as rooms are searched.
self.current_room = case.victim.found_at_loc_id
self._voices = {s.sus_id: assign_voice(s) for s in case.suspects}
def start(self, source: str = "tutorial", seed: int | None = None) -> None:
self._begin(self._load_case(source, seed))
def start_buffered(self, wait_secs: float = 0.0) -> bool:
"""Begin a pre-generated case from the background buffer if one is ready (waiting up
to ``wait_secs``). Returns True if a buffered case was used, False if the caller
should fall back to live generation behind the overlay."""
case = take_ready_case(timeout=wait_secs)
if case is None:
return False
self._begin(case)
return True
def _load_case(self, source: str, seed: int | None):
if source == "generate":
try:
from ..generator import generate_case
from ..schemas.case import GenerationKnobs
result = generate_case(self.backend, seed=seed if seed is not None else 0,
knobs=GenerationKnobs(n_suspects=3))
if result.report.ok:
return result.case
# A model that can't produce a solvable case must not strand the player.
except LLMError:
pass # fall back to the tutorial when no model is available
return load_seed_case("tutorial")
@property
def case(self):
assert self.session is not None
return self.session.case
# -- images as data URIs (no file serving; fully offline) -----------------
def _uri(self, path) -> str:
key = str(path)
if key not in self._img_cache:
self._img_cache[key] = (
f"data:image/png;base64,{base64.b64encode(Path(path).read_bytes()).decode('ascii')}"
)
return self._img_cache[key]
def _portrait_key(self, s) -> str: # type: ignore[no-untyped-def]
# ONE key for both the roster thumbnail and the interrogation sprite, so a
# suspect looks identical in the menu and on the stage.
return f"{self.case.case_id}:{s.name}"
def portrait_uri(self, sus_id: str) -> str:
s = self.case.suspect(sus_id)
return self._uri(self.visuals.portrait_path(s.visual, self._portrait_key(s)))
def portrait_sheet_uri(self, sus_id: str) -> str:
s = self.case.suspect(sus_id)
return self._uri(self.visuals.portrait_sheet_path(s.visual, self._portrait_key(s)))
def scene_uri(self, loc_id: str) -> str:
loc = next((x for x in self.case.setting.locations if x.loc_id == loc_id),
self.case.setting.locations[0])
return self._uri(self.visuals.scene_path(loc.name, f"{self.case.case_id}-{loc.loc_id}"))
def interrogation_uri(self) -> str:
return self._uri(self.visuals.scene_path("Interrogation Room", f"{self.case.case_id}-interro"))
def prop_uri(self, clue) -> str:
return self._uri(self.visuals.prop_path(clue.name, f"{self.case.case_id}-{clue.clue_id}"))
def loc_name_for_id(self, loc_id: str) -> str:
return next((x.name for x in self.case.setting.locations if x.loc_id == loc_id), loc_id)
# -- selectors ------------------------------------------------------------
def roster(self) -> list[tuple[str, str]]:
# Same key as the interrogation sprite so the thumbnail matches the stage exactly.
return [(str(self.visuals.portrait_path(s.visual, self._portrait_key(s))), s.name)
for s in self.case.suspects]
def select_by_index(self, index: int) -> str:
suspects = self.case.suspects
if 0 <= index < len(suspects):
self.current_sus = suspects[index].sus_id
return self.current_name()
def current_name(self) -> str:
return self.case.suspect(self.current_sus).name if self.current_sus else ""
def evidence_choices(self) -> list[str]:
return [c.name for c in self.session.evidence()] if self.session else []
def clue_id_for_name(self, name: str | None) -> str | None:
if not name or not self.session:
return None
for c in self.case.clues:
if c.name == name:
return c.clue_id
return None
def relevance_breaking(self, clue_id: str | None) -> bool:
"""True if presenting this clue to the current suspect is their breaking evidence -
used only to pick the right tone for the (rare) confession-scrub fallback."""
if not clue_id or not self.session or not self.current_sus:
return False
from ..engine.relevance import assess_relevance
from ..schemas.enums import Relevance
result = assess_relevance(self.case, self.case.suspect(self.current_sus), clue_id)
return result.relevance is Relevance.BREAKING
def location_choices(self) -> list[str]:
return [loc.name for loc in self.case.setting.locations]
def loc_id_for_name(self, name: str) -> str | None:
for loc in self.case.setting.locations:
if loc.name == name:
return loc.loc_id
return None
# -- actions --------------------------------------------------------------
def search(self, loc_name: str):
loc_id = self.loc_id_for_name(loc_name)
return self.session.search(loc_id) if (self.session and loc_id) else ()
def add_note(self, text: str) -> None:
if self.session and text and text.strip():
self.session.add_note(text)
def accuse(self, accused_name: str, weapon_ok: bool, motive_ok: bool, cited_names: list[str]):
accused = next((s.sus_id for s in self.case.suspects if s.name == accused_name), None)
cited = tuple(cid for cid in (self.clue_id_for_name(n) for n in cited_names) if cid)
accusation = Accusation(
# An unmatched name scores zero rather than silently accusing suspect #1.
accused_sus_id=accused or "",
weapon_id=self.case.weapon.weapon_id if weapon_ok else None,
motive_id=self.case.culprit.true_motive.motive_id if motive_ok else None,
cited_clue_ids=cited,
)
return self.session.accuse(accusation)
def speak(self, text: str) -> str:
"""Synthesize the line and return a base64 WAV data URI (or '' if unavailable),
for reliable client-side playback. Temp files are cleaned up after encoding."""
if not getattr(self.tts, "available", False) or not self.current_sus:
return ""
self._tts_counter += 1
out = _TTS_DIR / f"line_{self._tts_counter}.wav"
path = self.tts.synth_to_file(text, self._voices.get(self.current_sus), out)
if not path:
return ""
try:
uri = f"data:audio/wav;base64,{base64.b64encode(Path(path).read_bytes()).decode('ascii')}"
finally:
Path(path).unlink(missing_ok=True)
return uri