"""Live game runtime: lazily builds the in-process llama.cpp backend, serves cases, and
holds live ``Session`` objects per run.

Single-flight is MANDATORY: ``llama_cpp.Llama`` is not thread-safe, so every model call
(generation + interrogation) runs under one lock - never concurrently, on any machine.

Where cases come from, in order of preference on New Case:

1. a freshly generated case sitting in the one-slot buffer (only filled in the background
   on a machine with more than 4 effective CPUs, see ``GameRuntime._gen_ahead``);
2. the pre-baked pool shipped with the Space (served instantly, round-robin);
3. synchronous generation while the player waits on the loading screen.

On a small (e.g. 2-vCPU) Space no background generation is started, so the model lock
stays free for interrogations and the LLM is never oversubscribed.
"""

from __future__ import annotations

import logging
import threading
import time
import uuid
from dataclasses import dataclass

from ..config import effective_cpus, get_settings
from ..engine.session import Session
from ..generator.pipeline import generate_case
from ..llm.backend import LLMBackend, LLMError, make_backend
from ..persistence.case_store import load_case, save_runtime_case
from ..persistence.paths import prebaked_cases_dir, runtime_cases_dir
from ..schemas.accusation import Accusation
from ..schemas.case import CaseFile
from ..schemas.enums import Relevance
from .case_adapter import casefile_to_public
from .public_view import PublicCase

logger = logging.getLogger(__name__)


@dataclass
class LiveRun:
    """Everything the runtime keeps for one in-progress game."""

    run_id: str  # opaque hex id handed back to the client
    case: CaseFile  # full case, including the solution
    session: Session  # live interrogation/accusation state
    public: PublicCase  # player-facing projection of ``case``
    baselines: dict[str, int]  # suspect id -> starting suspicion


class GameRuntime:
    """Process-wide owner of the LLM backend, the case sources and the live runs."""

    def __init__(self) -> None:
        self._lock = threading.Lock()  # MANDATORY single-flight over all model calls
        self._backend: LLMBackend | None = None
        self._backend_failed = False
        # NOTE(review): runs are never removed, so this grows for the life of the
        # process. Consider an eviction policy if the process is long-lived.
        self._runs: dict[str, LiveRun] = {}
        self._buffer: CaseFile | None = None
        self._buffer_lock = threading.Lock()
        # True while a background prebuild thread is running; guarded by _buffer_lock.
        self._refill_pending = False
        self._seed = int(time.time()) % 900_000 + 1000
        # Pre-baked pool: full, model-authored cases shipped with the Space, served instantly
        # on New Case so nobody waits ~2 min for live generation. Interrogation is still live.
        self._prebaked: list[CaseFile] = []
        self._prebaked_idx = 0
        self._prebaked_loaded = False
        # Only generate cases ahead-of-time in the background on a box with cores to spare;
        # on the 2-vCPU Space that would steal the single model lock from an interrogation,
        # so there we rely on the pre-baked pool and generate only on demand.
        self._gen_ahead = effective_cpus() > 4

    # ---- backend ----
    def _get_backend(self) -> LLMBackend | None:
        """Return the backend, building it on first use.

        A construction failure (``LLMError``) is remembered so it is attempted only
        once; afterwards this keeps returning ``None``.
        """
        if self._backend is None and not self._backend_failed:
            try:
                self._backend = make_backend(get_settings())
            except LLMError:
                self._backend_failed = True
                logger.warning("LLM backend unavailable", exc_info=True)
        return self._backend

    def available(self) -> bool:
        """Return True when a backend exists (building it if necessary)."""
        return self._get_backend() is not None

    def _next_seed(self) -> int:
        """Advance and return the generation seed."""
        self._seed += 1
        return self._seed

    # ---- generation ----
    def _generate(self, seed: int) -> CaseFile:
        """Generate one case under the model lock and persist it.

        Raises:
            LLMError: when no backend is available. Errors from generation or
                persistence propagate unchanged.
        """
        backend = self._get_backend()
        if backend is None:
            raise LLMError("no backend")
        with self._lock:
            result = generate_case(backend, seed=seed)
        # Persisting does not touch the model, so the lock is already released here.
        save_runtime_case(result.case)
        return result.case

    def _prebuild(self) -> None:
        """Background-thread body: generate one case into the buffer (best effort)."""
        case: CaseFile | None = None
        try:
            case = self._generate(self._next_seed())
        except Exception:
            # Best effort: New Case falls back to the pool or to synchronous generation.
            logger.exception("background case prebuild failed")
        finally:
            with self._buffer_lock:
                if case is not None:
                    self._buffer = case
                self._refill_pending = False

    def _start_prebuild(self) -> None:
        """Start a background prebuild unless one is running or the buffer is full.

        Without this guard every New Case issued while the buffer is still empty
        would spawn another generation thread; they would all queue on the single
        model lock, delaying interrogations and overwriting each other's result.
        """
        with self._buffer_lock:
            if self._refill_pending or self._buffer is not None:
                return
            self._refill_pending = True
        try:
            threading.Thread(target=self._prebuild, daemon=True).start()
        except Exception:
            # The thread never ran, so clear the flag it would have cleared itself.
            with self._buffer_lock:
                self._refill_pending = False
            raise

    def _load_prebaked(self) -> None:
        """Load the shipped case pool once; unreadable files are skipped."""
        if self._prebaked_loaded:
            return
        self._prebaked_loaded = True
        pool_dir = prebaked_cases_dir()
        if not pool_dir.is_dir():
            return
        for path in sorted(pool_dir.glob("*.json")):
            try:
                self._prebaked.append(load_case(path))
            except Exception:
                logger.warning("skipping unreadable pre-baked case %s", path, exc_info=True)
                continue
        # Start the rotation at a varied offset so a Space restart doesn't always serve the
        # first case again (the seed is time-based), keeping New Case fresh across restarts.
        if self._prebaked:
            self._prebaked_idx = self._seed % len(self._prebaked)

    def start_buffer(self) -> None:
        """Make the first New Case instant: load the shipped pool now and (only on a box
        with cores to spare) prebuild one fresh live case in the background.

        On the 2-vCPU Space we skip the background prebuild so the model lock stays free
        for the first interrogation - the pre-baked pool already gives an instant case.
        """
        self._load_prebaked()
        if self._gen_ahead and self.available():
            self._start_prebuild()

    def _take_buffered(self) -> CaseFile | None:
        """Pop the buffered case, leaving the buffer empty."""
        with self._buffer_lock:
            case = self._buffer
            self._buffer = None
        return case

    def _take_prebaked(self) -> CaseFile | None:
        """Return the next pool case in round-robin order, or None if the pool is empty."""
        self._load_prebaked()
        if not self._prebaked:
            return None
        case = self._prebaked[self._prebaked_idx % len(self._prebaked)]
        self._prebaked_idx += 1
        return case

    def _maybe_refill(self) -> None:
        """Generate one fresh case in the background - capable hardware only (see _gen_ahead)."""
        if self._gen_ahead and self.available():
            self._start_prebuild()

    def new_generated_run(self) -> tuple[PublicCase, str] | None:
        """Start a new run and return ``(public case, run id)``.

        Returns None when no backend is available or synchronous generation fails.
        """
        if not self.available():
            return None
        # Prefer a freshly generated case if one is ready; else serve the pre-baked pool
        # instantly; only with neither do we generate synchronously (first run, no pool).
        case = self._take_buffered() or self._take_prebaked()
        if case is None:
            try:
                case = self._generate(self._next_seed())
            except Exception:
                logger.exception("synchronous case generation failed")
                return None
        self._maybe_refill()
        return self._register(case)

    def load_generated_run(self, case_id: str) -> tuple[PublicCase, str] | None:
        """Start a run for a known case, looked up in the pool and then on disk.

        Returns None when no backend is available, when ``case_id`` is not a plain
        file stem, or when no readable case with that id exists.
        """
        if not self.available():
            return None
        self._load_prebaked()
        case = next((c for c in self._prebaked if c.case_id == case_id), None)
        if case is None:
            filename = f"{case_id}.json"
            for directory in (prebaked_cases_dir(), runtime_cases_dir()):
                path = directory / filename
                # case_id comes from the caller: if joining it changed the final path
                # component it contained a separator (``../x``, ``/abs/x``) and would
                # escape the case directory, so refuse it outright.
                if path.name != filename:
                    logger.warning("rejected suspicious case id %r", case_id)
                    return None
                if not path.exists():
                    continue
                try:
                    case = load_case(path)
                except Exception:
                    # A corrupt copy in one directory must not hide a good one in the next.
                    logger.warning("could not load case file %s", path, exc_info=True)
                    continue
                break
        if case is None:
            return None
        return self._register(case)

    def _register(self, case: CaseFile) -> tuple[PublicCase, str]:
        """Create a session for ``case``, store it under a fresh run id and return both."""
        public = casefile_to_public(case)
        session = Session(case, self._get_backend())  # type: ignore[arg-type]
        run_id = uuid.uuid4().hex
        baselines = {s.id: s.baseline_suspicion for s in public.suspects}
        self._runs[run_id] = LiveRun(run_id, case, session, public, baselines)
        return public, run_id

    def get(self, run_id: str) -> LiveRun | None:
        """Return the live run for ``run_id``, or None if unknown."""
        return self._runs.get(run_id)

    # ---- live turn / verdict ----
    def _suspicion(self, run: LiveRun, sus_id: str) -> int:
        """Return the suspect's current suspicion, clamped to 0..100.

        Baseline (25 when the suspect has none recorded) + stress scaled by 55
        + a flat 20 once any of the suspect's lies has been broken.
        """
        st = run.session.state.state_for(sus_id)
        base = run.baselines.get(sus_id, 25)
        val = base + round(st.stress * 55) + (20 if st.broken_lie_ids else 0)
        return max(0, min(100, val))

    def interrogate_live(
        self, run: LiveRun, sus_id: str, question: str, clue_id: str | None
    ) -> dict:
        """Ask one question under the model lock and return the reply payload.

        The session yields events; the last event carrying a ``final`` supplies the
        spoken reply and adjudication. With no final event a stock refusal is
        returned and all flags are False.
        """
        prev = self._suspicion(run, sus_id)
        with self._lock:
            final = None
            for ev in run.session.interrogate(sus_id, question, presented_clue_id=clue_id):
                if ev.final is not None:
                    final = ev.final
        reply = final.turn.spoken if final else "…I have nothing to say to that."
        after = self._suspicion(run, sus_id)
        adj = final.adjudication if final else None
        rattled = bool(adj and adj.relevance in (Relevance.DIRECT, Relevance.BREAKING))
        cornered = bool(adj and adj.is_contradiction)
        return {
            "reply": reply,
            "suspicionDelta": after - prev,
            "suspicion": after,
            "flags": {"rattled": rattled, "contradictionExposed": cornered, "cornered": cornered},
        }

    def accuse_live(
        self, run: LiveRun, suspect_id: str, motive_id: str, evidence_ids: list[str]
    ) -> dict:
        """Submit the player's accusation and return the verdict payload."""
        # NOTE(review): not taken under self._lock - confirm Session.accuse never
        # calls the model, otherwise it breaks the single-flight rule.
        verdict = run.session.accuse(
            Accusation(
                accused_sus_id=suspect_id,
                motive_id=motive_id,
                cited_clue_ids=tuple(evidence_ids),
            )
        )
        culprit_id = run.case.culprit.sus_id
        killer = run.case.suspect(culprit_id)
        if verdict.culprit_correct:
            truth = verdict.rationale or run.case.culprit.method_narrative
        else:
            # An unknown suspect id gets a generic label instead of a lookup.
            known = any(s.sus_id == suspect_id for s in run.case.suspects)
            accused = run.case.suspect(suspect_id).name if known else "the accused"
            truth = (
                f"You charged {accused}. The case held for a night - but the evidence led past "
                f"them to {killer.name}, who walked out into the rain."
            )
        return {
            "correct": verdict.culprit_correct,
            "verdict": {
                "stamp": "CASE CLOSED" if verdict.culprit_correct else "MISTRIAL",
                "killerId": culprit_id,
                "killerName": killer.name,
                "truth": truth,
            },
            "score": {
                "points": verdict.score,
                "max": 100,
                "killerCorrect": verdict.culprit_correct,
                "motiveCorrect": verdict.motive_correct,
                # NOTE(review): this is the number of cited ids, not necessarily the
                # number that were relevant - confirm that is the intended meaning.
                "evidenceHits": len(evidence_ids),
            },
            "stats": [],
        }


RUNTIME = GameRuntime()