case0 / src /case_zero /api /runtime.py
HusseinEid's picture
feat: multi-crime cases, scene+exhibit pixel art, background AI generation
80cd1f2 verified
raw
history blame
13.2 kB
"""Live game runtime: lazily builds the in-process llama.cpp backend, generates cases,
and holds live ``Session`` objects per run.
Single-flight is MANDATORY: ``llama_cpp.Llama`` is not thread-safe, so every model call
(generation + interrogation) runs under one lock - never concurrently, on any machine.
Background case generation runs on EVERY box, but it can never make a player wait:
each generation call holds the lock for just that one call (never the whole pipeline),
and on a small box it streams with an interrupt check between tokens - the moment a
player asks a question the in-flight generation aborts within a token, the lock frees,
and the turn runs. Generation resumes once the table has been idle for a while. Fresh
AI cases land in the buffer (served on the very next New Case) AND join the shuffled
rotation, so the pool of mysteries grows for as long as the Space stays up.
"""
from __future__ import annotations
import random
import threading
import time
import uuid
from collections.abc import Iterator
from dataclasses import dataclass
from ..config import effective_cpus, get_settings
from ..engine.session import Session
from ..generator.pipeline import generate_case
from ..llm.backend import GenParams, LLMBackend, LLMError, make_backend
from ..persistence.case_store import load_case, save_runtime_case
from ..persistence.paths import prebaked_cases_dir, runtime_cases_dir
from ..schemas.accusation import Accusation
from ..schemas.case import CaseFile
from ..schemas.enums import Relevance
from .case_adapter import casefile_to_public
from .public_view import PublicCase
# How long the table must be quiet (no interrogation turn) before a small box starts -
# or resumes - a background generation.
_IDLE_SECS = 90.0
class _SharedLockBackend:
"""Per-call single-flight wrapper. Each model call holds the runtime lock for JUST
that call, so a player's turn waits behind at most one in-flight call - never a
whole multi-call generation. With an ``interrupt`` event the call streams internally
and aborts between tokens the moment a player shows up, freeing the lock at once."""
def __init__(self, inner: LLMBackend, lock: threading.Lock,
interrupt: threading.Event | None = None) -> None:
self._inner = inner
self._lock = lock
self._interrupt = interrupt
def generate(self, prompt: str, params: GenParams) -> str:
with self._lock:
if self._interrupt is None:
return self._inner.generate(prompt, params)
if self._interrupt.is_set():
raise LLMError("generation interrupted by player")
parts: list[str] = []
for delta in self._inner.stream(prompt, params):
parts.append(delta)
if self._interrupt.is_set():
raise LLMError("generation interrupted by player")
return "".join(parts)
def stream(self, prompt: str, params: GenParams) -> Iterator[str]:
with self._lock:
yield from self._inner.stream(prompt, params)
@dataclass
class LiveRun:
run_id: str
case: CaseFile
session: Session
public: PublicCase
baselines: dict[str, int]
class GameRuntime:
def __init__(self) -> None:
self._lock = threading.Lock() # MANDATORY single-flight over all model calls
self._backend: LLMBackend | None = None
self._backend_failed = False
self._runs: dict[str, LiveRun] = {}
self._buffer: CaseFile | None = None
self._buffer_lock = threading.Lock()
self._seed = int(time.time()) % 900_000 + 1000
self._rng = random.Random(self._seed)
# Pre-baked pool: full, model-authored cases shipped with the Space, served instantly
# on New Case so nobody waits ~2 min for live generation. Interrogation is still live.
self._prebaked: list[CaseFile] = []
self._prebaked_idx = 0
self._prebaked_loaded = False
# Background generation: a fast box generates immediately; a small box (the 2-vCPU
# Space) waits for an idle table and aborts between tokens when a player shows up.
self._fast_box = effective_cpus() > 4
self._gen_interrupt = threading.Event()
self._gen_running = False
self._gen_flag_lock = threading.Lock()
self._last_player_ts = 0.0
# ---- backend ----
def _get_backend(self) -> LLMBackend | None:
if self._backend is None and not self._backend_failed:
try:
self._backend = make_backend(get_settings())
except LLMError:
self._backend_failed = True
return self._backend
def available(self) -> bool:
return self._get_backend() is not None
def _next_seed(self) -> int:
self._seed += 1
return self._seed
# ---- generation ----
def _generate(self, seed: int, *, interruptible: bool = False) -> CaseFile:
backend = self._get_backend()
if backend is None:
raise LLMError("no backend")
wrapped = _SharedLockBackend(backend, self._lock,
self._gen_interrupt if interruptible else None)
result = generate_case(wrapped, seed=seed)
save_runtime_case(result.case)
return result.case
def _prebuild(self) -> None:
"""Generate ONE fresh AI case in the background, yielding to players. On a small
box: wait until the table is idle, abort between tokens if a player interrupts,
then wait for idle again and retry. The finished case is served on the very next
New Case and joins the rotation for good."""
try:
for _ in range(8):
try:
if not self._fast_box:
while time.time() - self._last_player_ts < _IDLE_SECS:
time.sleep(5)
case = self._generate(self._next_seed(), interruptible=not self._fast_box)
except LLMError:
continue # interrupted by a player, or malformed output - try again
except Exception:
break
with self._buffer_lock:
self._buffer = case
self._prebaked.append(case)
break
finally:
with self._gen_flag_lock:
self._gen_running = False
def _spawn_gen(self) -> None:
if not self.available():
return
with self._gen_flag_lock:
if self._gen_running:
return
self._gen_running = True
threading.Thread(target=self._prebuild, daemon=True).start()
def _load_prebaked(self) -> None:
if self._prebaked_loaded:
return
self._prebaked_loaded = True
pool_dir = prebaked_cases_dir()
if not pool_dir.is_dir():
return
for path in sorted(pool_dir.glob("*.json")):
try:
self._prebaked.append(load_case(path))
except Exception:
continue
# Shuffle per process (the seed is time-based) so the FIRST case of every fresh
# visit/restart is randomized - never the same opening mystery twice in a row.
self._rng.shuffle(self._prebaked)
def start_buffer(self) -> None:
"""Make the first New Case instant (shipped pool, shuffled) and start growing the
pool with a fresh AI-generated case in the background."""
self._load_prebaked()
self._spawn_gen()
def _take_buffered(self) -> CaseFile | None:
with self._buffer_lock:
case = self._buffer
self._buffer = None
return case
def _take_prebaked(self) -> CaseFile | None:
self._load_prebaked()
if not self._prebaked:
return None
if self._prebaked_idx >= len(self._prebaked):
# Bag exhausted: reshuffle for a fresh order on the next lap.
self._rng.shuffle(self._prebaked)
self._prebaked_idx = 0
case = self._prebaked[self._prebaked_idx]
self._prebaked_idx += 1
return case
def _maybe_refill(self) -> None:
"""Keep one fresh AI case cooking in the background whenever the buffer is empty."""
if self._buffer is None:
self._spawn_gen()
def new_generated_run(self) -> tuple[PublicCase, str] | None:
if not self.available():
return None
# Prefer a freshly generated case if one is ready; else serve the pre-baked pool
# instantly; only with neither do we generate synchronously (first run, no pool).
case = self._take_buffered() or self._take_prebaked()
if case is None:
try:
case = self._generate(self._next_seed())
except Exception:
return None
self._maybe_refill()
return self._register(case)
def load_generated_run(self, case_id: str) -> tuple[PublicCase, str] | None:
if not self.available():
return None
self._load_prebaked()
case = next((c for c in self._prebaked if c.case_id == case_id), None)
if case is None:
for directory in (prebaked_cases_dir(), runtime_cases_dir()):
path = directory / f"{case_id}.json"
if path.exists():
try:
case = load_case(path)
except Exception:
case = None
break
if case is None:
return None
return self._register(case)
def _register(self, case: CaseFile) -> tuple[PublicCase, str]:
public = casefile_to_public(case)
session = Session(case, self._get_backend()) # type: ignore[arg-type]
run_id = uuid.uuid4().hex
baselines = {s.id: s.baseline_suspicion for s in public.suspects}
self._runs[run_id] = LiveRun(run_id, case, session, public, baselines)
return public, run_id
def get(self, run_id: str) -> LiveRun | None:
return self._runs.get(run_id)
# ---- live turn / verdict ----
def _suspicion(self, run: LiveRun, sus_id: str) -> int:
st = run.session.state.state_for(sus_id)
base = run.baselines.get(sus_id, 25)
val = base + round(st.stress * 55) + (20 if st.broken_lie_ids else 0)
return max(0, min(100, val))
def interrogate_live(
self, run: LiveRun, sus_id: str, question: str, clue_id: str | None
) -> dict:
prev = self._suspicion(run, sus_id)
# Tell any in-flight background generation to yield the lock NOW (it aborts
# between tokens), then take the table.
self._gen_interrupt.set()
self._last_player_ts = time.time()
with self._lock:
self._gen_interrupt.clear()
final = None
for ev in run.session.interrogate(sus_id, question, presented_clue_id=clue_id):
if ev.final is not None:
final = ev.final
self._last_player_ts = time.time()
reply = final.turn.spoken if final else "…I have nothing to say to that."
after = self._suspicion(run, sus_id)
adj = final.adjudication if final else None
rattled = bool(adj and adj.relevance in (Relevance.DIRECT, Relevance.BREAKING))
cornered = bool(adj and adj.is_contradiction)
return {
"reply": reply,
"suspicionDelta": after - prev,
"suspicion": after,
"flags": {"rattled": rattled, "contradictionExposed": cornered, "cornered": cornered},
}
def accuse_live(self, run: LiveRun, suspect_id: str, motive_id: str, evidence_ids: list[str]) -> dict:
verdict = run.session.accuse(
Accusation(accused_sus_id=suspect_id, motive_id=motive_id, cited_clue_ids=tuple(evidence_ids))
)
culprit_id = run.case.culprit.sus_id
killer = run.case.suspect(culprit_id)
if verdict.culprit_correct:
truth = verdict.rationale or run.case.culprit.method_narrative
else:
accused = run.case.suspect(suspect_id).name if any(s.sus_id == suspect_id for s in run.case.suspects) else "the accused"
truth = (
f"You charged {accused}. The case held for a night - but the evidence led past "
f"them to {killer.name}, who walked out into the rain."
)
return {
"correct": verdict.culprit_correct,
"verdict": {
"stamp": "CASE CLOSED" if verdict.culprit_correct else "MISTRIAL",
"killerId": culprit_id,
"killerName": killer.name,
"truth": truth,
},
"score": {
"points": verdict.score,
"max": 100,
"killerCorrect": verdict.culprit_correct,
"motiveCorrect": verdict.motive_correct,
"evidenceHits": len(evidence_ids),
},
"stats": [],
}
RUNTIME = GameRuntime()