Spaces:
Running
Running
| """Live game runtime: lazily builds the in-process llama.cpp backend, generates cases, | |
| and holds live ``Session`` objects per run. | |
| Single-flight is MANDATORY: ``llama_cpp.Llama`` is not thread-safe, so every model call | |
| (generation + interrogation) runs under one lock - never concurrently, on any machine. | |
| Background case generation runs on EVERY box, but it can never make a player wait: | |
| each generation call holds the lock for just that one call (never the whole pipeline), | |
| and on a small box it streams with an interrupt check between tokens - the moment a | |
| player asks a question the in-flight generation aborts within a token, the lock frees, | |
| and the turn runs. Generation resumes once the table has been idle for a while. Fresh | |
| AI cases land in the buffer (served on the very next New Case) AND join the shuffled | |
| rotation, so the pool of mysteries grows for as long as the Space stays up. | |
| """ | |
| from __future__ import annotations | |
| import random | |
| import threading | |
| import time | |
| import uuid | |
| from collections.abc import Iterator | |
| from dataclasses import dataclass | |
| from ..config import effective_cpus, get_settings | |
| from ..engine.session import Session | |
| from ..generator.pipeline import generate_case | |
| from ..llm.backend import GenParams, LLMBackend, LLMError, make_backend | |
| from ..persistence.case_store import load_case, save_runtime_case | |
| from ..persistence.paths import prebaked_cases_dir, runtime_cases_dir | |
| from ..schemas.accusation import Accusation | |
| from ..schemas.case import CaseFile | |
| from ..schemas.enums import Relevance | |
| from .case_adapter import casefile_to_public | |
| from .public_view import PublicCase | |
# Idle threshold, in seconds: how long the table must be quiet (no interrogation turn)
# before a small box starts - or resumes - a background generation. It is compared
# against the wall-clock timestamp of the most recent player turn.
_IDLE_SECS = 90.0
class _SharedLockBackend:
    """Per-call single-flight wrapper around a backend.

    Each model call holds the shared runtime lock for JUST that call, so a player's
    turn waits behind at most one in-flight call - never a whole multi-call
    generation. When an ``interrupt`` event is supplied, ``generate`` streams
    internally and aborts between tokens as soon as the event is set, freeing the
    lock at once.
    """

    def __init__(self, inner: LLMBackend, lock: threading.Lock,
                 interrupt: threading.Event | None = None) -> None:
        """Wrap ``inner`` so every call runs under ``lock``.

        ``interrupt`` is optional; ``None`` means calls are never aborted early.
        """
        self._inner = inner
        self._lock = lock
        self._interrupt = interrupt

    def generate(self, prompt: str, params: GenParams) -> str:
        """Return the full completion for ``prompt``, holding the lock throughout.

        Without an interrupt event this delegates to the inner ``generate``. With
        one, the completion is assembled from the inner ``stream`` and the event is
        checked before starting and after every delta.

        Raises:
            LLMError: if the interrupt event is set before or during the call.
        """
        with self._lock:
            if self._interrupt is None:
                return self._inner.generate(prompt, params)
            if self._interrupt.is_set():
                raise LLMError("generation interrupted by player")
            parts: list[str] = []
            stream = self._inner.stream(prompt, params)
            try:
                for delta in stream:
                    parts.append(delta)
                    if self._interrupt.is_set():
                        raise LLMError("generation interrupted by player")
            finally:
                # Shut the inner stream down while we still hold the lock. Otherwise
                # an interrupted, half-consumed generator is only finalized later by
                # the garbage collector - outside the single-flight lock, possibly
                # while a player's turn is already using the model.
                close = getattr(stream, "close", None)
                if close is not None:
                    close()
            return "".join(parts)

    def stream(self, prompt: str, params: GenParams) -> Iterator[str]:
        """Yield deltas from the inner ``stream`` while holding the lock.

        The lock is acquired on the first ``next()`` and released only when this
        generator is exhausted or closed, so callers should consume it fully (or
        close it) rather than abandon it.
        """
        with self._lock:
            yield from self._inner.stream(prompt, params)
@dataclass
class LiveRun:
    """Everything the runtime keeps for one live playthrough.

    Instances are built positionally as
    ``LiveRun(run_id, case, session, public, baselines)``, so the field order below
    is part of the contract.
    """

    # Unique identifier of this run; the runtime uses it as the lookup key.
    run_id: str
    # The full case file being played.
    case: CaseFile
    # Live interrogation/accusation session for this case.
    session: Session
    # Player-facing view derived from ``case``.
    public: PublicCase
    # Suspect id -> baseline suspicion, the starting point for suspicion scores.
    baselines: dict[str, int]
class GameRuntime:
    """Process-wide game state: lazy backend, case pool and buffer, and live runs.

    Every model call goes through ``self._lock``. Background generation wraps the
    backend so it takes that lock per call, and on a small box it is told to yield
    via ``self._gen_interrupt`` whenever a player asks a question.
    """

    def __init__(self) -> None:
        # Model access. The lock is the MANDATORY single-flight over all model calls.
        self._lock = threading.Lock()
        self._backend: LLMBackend | None = None
        self._backend_failed = False

        # Live runs by run id, plus a one-slot buffer holding a freshly generated case.
        self._runs: dict[str, LiveRun] = {}
        self._buffer: CaseFile | None = None
        self._buffer_lock = threading.Lock()

        # Time-derived seed: drives both case generation seeds and the pool shuffle.
        self._seed = int(time.time()) % 900_000 + 1000
        self._rng = random.Random(self._seed)

        # Pre-baked pool: complete cases shipped with the Space and served instantly
        # on New Case, so nobody waits for live generation. Interrogation stays live.
        self._prebaked: list[CaseFile] = []
        self._prebaked_idx = 0
        self._prebaked_loaded = False

        # Background generation. A fast box generates right away; a small box waits
        # for an idle table and aborts between tokens when a player shows up.
        self._fast_box = effective_cpus() > 4
        self._gen_interrupt = threading.Event()
        self._gen_running = False
        self._gen_flag_lock = threading.Lock()
        self._last_player_ts = 0.0

    # ---- backend ----

    def _get_backend(self) -> LLMBackend | None:
        """Return the backend, building it on first use; ``None`` if it cannot be built.

        A failed build is remembered so it is not attempted again.
        """
        if self._backend is not None or self._backend_failed:
            return self._backend
        try:
            self._backend = make_backend(get_settings())
        except LLMError:
            self._backend_failed = True
        return self._backend

    def available(self) -> bool:
        """Tell whether a model backend exists (building it lazily if needed)."""
        backend = self._get_backend()
        return backend is not None

    def _next_seed(self) -> int:
        """Advance the seed counter by one and return the new value."""
        self._seed = self._seed + 1
        return self._seed

    # ---- generation ----

    def _generate(self, seed: int, *, interruptible: bool = False) -> CaseFile:
        """Generate one case with ``seed``, save it as a runtime case, and return it.

        The backend is wrapped so each model call takes the runtime lock; when
        ``interruptible`` is true the wrapper also watches the interrupt event.

        Raises:
            LLMError: when no backend is available.
        """
        model = self._get_backend()
        if model is None:
            raise LLMError("no backend")
        interrupt = self._gen_interrupt if interruptible else None
        guarded = _SharedLockBackend(model, self._lock, interrupt)
        outcome = generate_case(guarded, seed=seed)
        save_runtime_case(outcome.case)
        return outcome.case

    def _wait_for_quiet_table(self) -> None:
        """Block, polling every 5 seconds, until no player turn for ``_IDLE_SECS``."""
        while time.time() - self._last_player_ts < _IDLE_SECS:
            time.sleep(5)

    def _prebuild(self) -> None:
        """Generate ONE fresh AI case in the background, yielding to players.

        Up to eight attempts are made. On a small box each attempt first waits for
        an idle table and runs interruptibly; an ``LLMError`` (player interruption
        or malformed output) moves on to the next attempt, any other error gives
        up. A finished case is placed in the buffer and added to the rotation. The
        running flag is always cleared on the way out.
        """
        try:
            for _attempt in range(8):
                try:
                    small_box = not self._fast_box
                    if small_box:
                        self._wait_for_quiet_table()
                    fresh = self._generate(
                        self._next_seed(), interruptible=not self._fast_box
                    )
                except LLMError:
                    # Interrupted by a player, or malformed output - try again.
                    continue
                except Exception:
                    break
                else:
                    with self._buffer_lock:
                        self._buffer = fresh
                    self._prebaked.append(fresh)
                    break
        finally:
            with self._gen_flag_lock:
                self._gen_running = False

    def _spawn_gen(self) -> None:
        """Start the background generation thread unless one is already running."""
        if not self.available():
            return
        with self._gen_flag_lock:
            already_running = self._gen_running
            self._gen_running = True
        if already_running:
            return
        worker = threading.Thread(target=self._prebuild, daemon=True)
        worker.start()

    def _load_prebaked(self) -> None:
        """Load the shipped case pool once; unreadable files are skipped."""
        if not self._prebaked_loaded:
            self._prebaked_loaded = True
            shipped = prebaked_cases_dir()
            if shipped.is_dir():
                for case_path in sorted(shipped.glob("*.json")):
                    try:
                        loaded = load_case(case_path)
                    except Exception:
                        continue
                    self._prebaked.append(loaded)
                # The rng seed is time-based, so the serving order differs per process
                # and a restart does not always open with the same mystery.
                self._rng.shuffle(self._prebaked)

    def start_buffer(self) -> None:
        """Load the shipped pool so the first New Case is instant, then start
        generating a fresh AI case in the background."""
        self._load_prebaked()
        self._spawn_gen()

    def _take_buffered(self) -> CaseFile | None:
        """Remove and return the buffered case, or ``None`` if the buffer is empty."""
        with self._buffer_lock:
            taken, self._buffer = self._buffer, None
        return taken

    def _take_prebaked(self) -> CaseFile | None:
        """Return the next case in the rotation, reshuffling once a lap is complete.

        Returns ``None`` when the pool is empty.
        """
        self._load_prebaked()
        pool = self._prebaked
        if not pool:
            return None
        if self._prebaked_idx >= len(pool):
            # Bag exhausted: reshuffle for a fresh order on the next lap.
            self._rng.shuffle(pool)
            self._prebaked_idx = 0
        position = self._prebaked_idx
        self._prebaked_idx = position + 1
        return pool[position]

    def _maybe_refill(self) -> None:
        """Keep one fresh AI case cooking in the background whenever the buffer is empty."""
        if self._buffer is not None:
            return
        self._spawn_gen()

    def new_generated_run(self) -> tuple[PublicCase, str] | None:
        """Start a new run and return ``(public case, run id)``.

        Returns ``None`` when no backend is available or synchronous generation fails.
        """
        if not self.available():
            return None
        # Preference order: a freshly generated buffered case, then the pre-baked
        # rotation; only with neither do we generate synchronously.
        chosen = self._take_buffered() or self._take_prebaked()
        if chosen is None:
            try:
                chosen = self._generate(self._next_seed())
            except Exception:
                return None
        self._maybe_refill()
        return self._register(chosen)

    def load_generated_run(self, case_id: str) -> tuple[PublicCase, str] | None:
        """Start a run for the case with ``case_id``.

        The in-memory pool is searched first, then ``<case_id>.json`` in the
        pre-baked and runtime case directories; the first existing file decides the
        outcome. Returns ``None`` when no backend is available or no case is found
        or loadable.
        """
        if not self.available():
            return None
        self._load_prebaked()

        found = None
        for pooled in self._prebaked:
            if pooled.case_id == case_id:
                found = pooled
                break

        if found is None:
            search_dirs = (prebaked_cases_dir(), runtime_cases_dir())
            for folder in search_dirs:
                candidate = folder / f"{case_id}.json"
                if not candidate.exists():
                    continue
                try:
                    found = load_case(candidate)
                except Exception:
                    found = None
                break

        if found is None:
            return None
        return self._register(found)

    def _register(self, case: CaseFile) -> tuple[PublicCase, str]:
        """Create and store a ``LiveRun`` for ``case``; return its public view and id."""
        public_view = casefile_to_public(case)
        live_session = Session(case, self._get_backend())  # type: ignore[arg-type]
        new_id = uuid.uuid4().hex
        starting = {}
        for suspect in public_view.suspects:
            starting[suspect.id] = suspect.baseline_suspicion
        self._runs[new_id] = LiveRun(new_id, case, live_session, public_view, starting)
        return public_view, new_id

    def get(self, run_id: str) -> LiveRun | None:
        """Look up a live run by id; ``None`` if unknown."""
        return self._runs.get(run_id)

    # ---- live turn / verdict ----

    def _suspicion(self, run: LiveRun, sus_id: str) -> int:
        """Compute the suspect's suspicion score, clamped to 0..100.

        Baseline (25 if unknown) plus ``round(stress * 55)``, plus 20 once the
        suspect has any broken lie.
        """
        suspect_state = run.session.state.state_for(sus_id)
        score = run.baselines.get(sus_id, 25) + round(suspect_state.stress * 55)
        if suspect_state.broken_lie_ids:
            score += 20
        if score < 0:
            return 0
        if score > 100:
            return 100
        return score

    def interrogate_live(
        self, run: LiveRun, sus_id: str, question: str, clue_id: str | None
    ) -> dict:
        """Run one interrogation turn and return the reply with suspicion changes."""
        before = self._suspicion(run, sus_id)
        # Ask any in-flight background generation to give up the lock NOW (it aborts
        # between tokens), then take the table ourselves.
        self._gen_interrupt.set()
        self._last_player_ts = time.time()
        with self._lock:
            self._gen_interrupt.clear()
            outcome = None
            events = run.session.interrogate(sus_id, question, presented_clue_id=clue_id)
            for event in events:
                if event.final is not None:
                    outcome = event.final
            self._last_player_ts = time.time()

        if outcome:
            reply = outcome.turn.spoken
        else:
            reply = "…I have nothing to say to that."
        current = self._suspicion(run, sus_id)
        adjudication = outcome.adjudication if outcome else None

        rattled = False
        cornered = False
        if adjudication:
            rattled = adjudication.relevance in (Relevance.DIRECT, Relevance.BREAKING)
            cornered = bool(adjudication.is_contradiction)

        flags = {"rattled": rattled, "contradictionExposed": cornered, "cornered": cornered}
        return {
            "reply": reply,
            "suspicionDelta": current - before,
            "suspicion": current,
            "flags": flags,
        }

    def accuse_live(self, run: LiveRun, suspect_id: str, motive_id: str, evidence_ids: list[str]) -> dict:
        """Submit the player's accusation and return the verdict payload."""
        charge = Accusation(
            accused_sus_id=suspect_id,
            motive_id=motive_id,
            cited_clue_ids=tuple(evidence_ids),
        )
        verdict = run.session.accuse(charge)
        culprit_id = run.case.culprit.sus_id
        killer = run.case.suspect(culprit_id)

        if verdict.culprit_correct:
            truth = verdict.rationale or run.case.culprit.method_narrative
        else:
            # Name the accused only if the id matches one of the case's suspects.
            if any(s.sus_id == suspect_id for s in run.case.suspects):
                accused = run.case.suspect(suspect_id).name
            else:
                accused = "the accused"
            truth = (
                f"You charged {accused}. The case held for a night - but the evidence led past "
                f"them to {killer.name}, who walked out into the rain."
            )

        verdict_block = {
            "stamp": "CASE CLOSED" if verdict.culprit_correct else "MISTRIAL",
            "killerId": culprit_id,
            "killerName": killer.name,
            "truth": truth,
        }
        score_block = {
            "points": verdict.score,
            "max": 100,
            "killerCorrect": verdict.culprit_correct,
            "motiveCorrect": verdict.motive_correct,
            "evidenceHits": len(evidence_ids),
        }
        return {
            "correct": verdict.culprit_correct,
            "verdict": verdict_block,
            "score": score_block,
            "stats": [],
        }
# Module-level singleton, created at import time. Construction does not build the
# model backend; that happens lazily on the first call that needs it.
RUNTIME = GameRuntime()