"""Session orchestrator (Section 7 & 11): turn cycle + crash-safe persistence. Every state-changing action flushes to disk, so a crash or quit never loses the ledger or progress. Holds the deterministic engine pieces and routes the in-session verbs (talk / look / confront / notes / accuse). """ from __future__ import annotations import uuid from dataclasses import dataclass, field from datetime import UTC, datetime from ..config import Config from ..llm.client import LLMClient from ..llm.prompts import PromptRegistry from ..llm.usage import UsageLedger from ..models import ( AccusationResult, CharacterCard, SessionState, SessionStatus, TranscriptEvent, ) from ..worldio import World, load_world from .accuse import score_accusation from .character import CharacterPipeline, TurnOutcome from .clues import ClueGraph from .confront import ConfrontResult, verify_confrontation from .environment import EnvironmentChat, WorldAnswer, WorldDelta from .ledger import LedgerStore def _new_session_id(world_id: str) -> str: stamp = datetime.now(UTC).strftime("%Y%m%d-%H%M%S") return f"{world_id}-{stamp}-{uuid.uuid4().hex[:6]}" @dataclass class Notes: """The player's case file (typed view returned by ``Session.notes``).""" discovered_clues: list[dict[str, str]] = field(default_factory=list) cracked: list[str] = field(default_factory=list) ledgers: dict[str, list[dict[str, str]]] = field(default_factory=dict) class Session: def __init__( self, *, config: Config, world: World, state: SessionState, prompts: PromptRegistry, base_client: LLMClient, ) -> None: self.config = config self.world = world self.state = state self.prompts = prompts self.dir = config.runtime_dir / state.session_id usage = UsageLedger(self.dir / "usage.jsonl") self.client = base_client.bind( world_id=world.meta.world_id, session_id=state.session_id, usage=usage, ) self.clue_graph = ClueGraph(world.clues) self.ledger = LedgerStore(self.dir / "ledgers") self.delta = WorldDelta(self.dir / "world_delta.json") self.transcript_path = self.dir / "transcript.jsonl" self.pipeline = CharacterPipeline( world=world, client=self.client, prompts=prompts, ledger=self.ledger, clue_graph=self.clue_graph, regenerate_retries=config.engine.regenerate_retries, ) self.env = EnvironmentChat( world=world, client=self.client, prompts=prompts, clue_graph=self.clue_graph, delta=self.delta, ) self._confront_counts: dict[str, int] = {} # -- construction ------------------------------------------------------- @classmethod def start( cls, config: Config, world_id: str, prompts: PromptRegistry, base_client: LLMClient, ) -> Session: world = load_world(config.worlds_dir / world_id) state = SessionState(session_id=_new_session_id(world_id), world_id=world_id) sess = cls(config=config, world=world, state=state, prompts=prompts, base_client=base_client) sess.dir.mkdir(parents=True, exist_ok=True) sess._flush_state() sess._append_transcript(TranscriptEvent( turn=0, kind="system", text=f"Session started for world {world_id}.", )) return sess @classmethod def resume( cls, config: Config, session_id: str, prompts: PromptRegistry, base_client: LLMClient, ) -> Session: sdir = config.runtime_dir / session_id state = SessionState.model_validate_json( (sdir / "session.json").read_text("utf-8") ) world = load_world(config.worlds_dir / state.world_id) sess = cls(config=config, world=world, state=state, prompts=prompts, base_client=base_client) return sess # -- persistence -------------------------------------------------------- def _flush_state(self) -> None: (self.dir / "session.json").write_text( self.state.model_dump_json(indent=2), "utf-8" ) def _append_transcript(self, event: TranscriptEvent) -> None: with self.transcript_path.open("a", encoding="utf-8") as fh: fh.write(event.model_dump_json() + "\n") def transcript(self) -> list[TranscriptEvent]: if not self.transcript_path.exists(): return [] return [ TranscriptEvent.model_validate_json(line) for line in self.transcript_path.read_text("utf-8").splitlines() if line.strip() ] def _tail_for(self, actor: str, n: int = 8) -> list[TranscriptEvent]: rel = [ e for e in self.transcript() if e.kind in ("player", "character", "crack") and (e.actor == actor or e.meta.get("target") == actor) ] return rel[-n:] def _next_turn(self) -> int: self.state.turn += 1 return self.state.turn # -- verbs -------------------------------------------------------------- def _resolve_character(self, character: str) -> CharacterCard: card = self.world.character(character) if card is not None: return card matches = self.world.candidates(character) if len(matches) > 1: names = ", ".join(c.name for c in matches) raise KeyError(f"{character} matches {names} — be more specific") raise KeyError(f"no such character: {character}") def talk(self, character: str, message: str) -> TurnOutcome: card = self._resolve_character(character) turn = self._next_turn() self._append_transcript(TranscriptEvent( turn=turn, kind="player", actor="player", text=message, meta={"target": card.name}, )) outcome = self.pipeline.run_turn( card=card, player_message=message, turn=turn, transcript_tail=self._tail_for(card.name), discovered=set(self.state.discovered_clues), confront_count=self._confront_counts.get(card.name, 0), already_cracked=card.name in self.state.cracked, ) kind = "crack" if outcome.cracked else "character" self._append_transcript(TranscriptEvent( turn=turn, kind=kind, actor=card.name, text=outcome.text, meta={"behavior": outcome.crack_behavior, "tell": outcome.tell}, )) if outcome.cracked and card.name not in self.state.cracked: self.state.cracked.append(card.name) # Testimony-sourced clues surface when the character speaks to them. outcome.discovered_clues = self._discover_from_testimony( card.name, message, outcome.text ) self._flush_state() return outcome def _discover_from_testimony( self, character: str, question: str, reply: str ) -> list[str]: """Mark unlocked, testimony-sourced clues discovered when the character actually speaks to their substance. The engine owns *whether* a clue is unlocked (gating); a cheap extractor-tier check judges, robustly, whether the witness genuinely confirmed the fact (names/times may be reworded).""" slug = character.lower().replace(" ", "_").replace(".", "") source_tag = f"{slug}_testimony" discovered = set(self.state.discovered_clues) candidates = [ {"id": node.id, "reveals": node.reveals} for node in self.world.clues if node.id not in discovered and self.clue_graph.is_unlocked(node.id, discovered) and any( source_tag in s.lower().replace(" ", "_").replace(".", "") for s in node.sources ) ] if not candidates: return [] confirmed = self.pipeline.extractor.confirmed_testimony( question=question, reply=reply, candidates=candidates, ) newly: list[str] = [] for cid in confirmed: if cid not in self.state.discovered_clues: self.state.discovered_clues.append(cid) newly.append(cid) return newly def look(self, query: str, location: str | None = None) -> WorldAnswer: turn = self._next_turn() self._append_transcript(TranscriptEvent( turn=turn, kind="player", actor="player", text=f"[look @ {location or 'scene'}] {query}", )) answer = self.env.ask( query=query, location=location, discovered=set(self.state.discovered_clues), ) if answer.discovered_clue and answer.discovered_clue not in self.state.discovered_clues: self.state.discovered_clues.append(answer.discovered_clue) self._append_transcript(TranscriptEvent( turn=turn, kind="world", actor="world", text=answer.text, meta={"source": answer.source, "clue": answer.discovered_clue}, )) self._flush_state() return answer def confront(self, character: str, claim_a: str, claim_b: str) -> ConfrontResult: card = self._resolve_character(character) result = verify_confrontation(self.ledger, card.name, claim_a, claim_b) turn = self._next_turn() if result.verified: self._confront_counts[card.name] = self._confront_counts.get(card.name, 0) + 1 self._append_transcript(TranscriptEvent( turn=turn, kind="confront", actor=card.name, text=result.reason, meta={"verified": result.verified, "a": claim_a, "b": claim_b}, )) self._flush_state() return result def notes(self) -> Notes: """The player's case file: committed claims + discovered clues.""" ledgers: dict[str, list[dict[str, str]]] = {} for name in self.world.characters: entries = self.ledger.get(name).entries if not entries: continue ledgers[name] = [ { "claim_id": c.claim_id, "topic": c.topic, "proposition": c.proposition, } for e in entries for c in e.claims ] clues = [ {"id": cid, "reveals": self.clue_graph.nodes[cid].reveals} for cid in self.state.discovered_clues if cid in self.clue_graph.nodes ] return Notes( discovered_clues=clues, cracked=list(self.state.cracked), ledgers=ledgers, ) def accuse( self, culprit: str, means: str, motive: str, opportunity: str ) -> AccusationResult: result = score_accusation( world=self.world, clue_graph=self.clue_graph, discovered=set(self.state.discovered_clues), culprit=culprit, means=means, motive=motive, opportunity=opportunity, ) turn = self._next_turn() self.state.accusation = result self.state.status = ( SessionStatus.solved if result.grade == "solved" else SessionStatus.closed ) self._append_transcript(TranscriptEvent( turn=turn, kind="accuse", actor="player", text=f"Accused {culprit}. Grade: {result.grade}", meta={"grade": result.grade}, )) self._flush_state() return result