""" Utilities for reconstructing an AI decision trace from a logged game session. The analyzer intentionally works from files already written by older sessions: prompt_N.json, response_N.json, intermediate tool-call responses, optional tool follow-up prompts, and tool_executions.json. """ from __future__ import annotations import copy import json from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional def build_decision_analysis( source_session: Path, decision: Dict[str, Any], action_result: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """Build a human-readable decision trace for one replay decision.""" session_dir = _resolve_session_for_decision(source_session, decision) player_name = str(decision.get("player_name") or "") request_number = int(decision.get("request_number") or 0) prompt_doc = _load_prompt(session_dir, player_name, request_number) response_doc = _load_response(session_dir, player_name, request_number) if not isinstance(prompt_doc, dict): prompt_doc = {} if not isinstance(response_doc, dict): response_doc = {} parsed = ( copy.deepcopy(response_doc.get("parsed")) if isinstance(response_doc, dict) and isinstance(response_doc.get("parsed"), dict) else copy.deepcopy(decision.get("parsed") or {}) ) prompt = prompt_doc.get("prompt") if isinstance(prompt_doc, dict) else {} if not isinstance(prompt, dict): prompt = {} tool_trace = _load_tool_trace(session_dir, player_name, request_number) memory_before = copy.deepcopy(prompt.get("memory") or {}) social_context = copy.deepcopy(prompt.get("social_context") or {}) constraints = copy.deepcopy(prompt.get("constraints") or {}) allowed_actions = copy.deepcopy(prompt_doc.get("allowed_actions") or constraints.get("allowed_actions") or []) compact_state_text = prompt.get("game_state") or "" compact_state_json = _extract_embedded_json(compact_state_text) observed_facts = _build_observed_facts( compact_state_json, allowed_actions, prompt.get("task_context") or {}, ) action_type = parsed.get("action_type") or (parsed.get("action") or {}).get("type") action_parameters = parsed.get("parameters") if action_parameters is None and isinstance(parsed.get("action"), dict): action_parameters = parsed["action"].get("parameters") return { "available": bool(prompt_doc or response_doc or parsed), "session": session_dir.name if session_dir else "", "session_path": str(session_dir) if session_dir else "", "player_name": player_name, "request_number": request_number, "timestamp": ( response_doc.get("timestamp") or prompt_doc.get("timestamp") or decision.get("timestamp") or "" ), "label": f"{player_name} #{request_number}: {action_type or 'decision'}", "worldview": { "task_context": copy.deepcopy(prompt.get("task_context") or {}), "memory_before": memory_before, "social_context": social_context, "constraints": constraints, "compact_game_state": compact_state_text, "compact_game_state_json": compact_state_json, "observed_facts": observed_facts, "allowed_actions": allowed_actions, }, "tool_trace": tool_trace, "thinking": parsed.get("internal_thinking") or "", "memory_write": parsed.get("note_to_self") or "", "say_outloud": parsed.get("say_outloud") or "", "action": { "type": action_type, "parameters": action_parameters, }, "engine_result": copy.deepcopy(action_result or {}), "raw": { "prompt": prompt_doc, "response": response_doc, }, } def build_turn_flow( source_session: Path, decisions: List[Dict[str, Any]], ) -> List[Dict[str, Any]]: """Build lightweight summaries for every decision in the selected turn.""" flow: List[Dict[str, Any]] = [] for item in decisions: decision = item.get("decision") or {} action_result = item.get("action_result") or {} parsed = decision.get("parsed") or {} action_type = parsed.get("action_type") or (parsed.get("action") or {}).get("type") response_doc = _load_response( _resolve_session_for_decision(source_session, decision), str(decision.get("player_name") or ""), int(decision.get("request_number") or 0), ) if isinstance(response_doc.get("parsed"), dict): parsed = response_doc["parsed"] action_type = parsed.get("action_type") or (parsed.get("action") or {}).get("type") flow.append({ "snapshot_index": item.get("snapshot_index"), "label": item.get("label") or "", "player_name": decision.get("player_name") or "", "request_number": decision.get("request_number") or 0, "action_type": action_type, "say_outloud": parsed.get("say_outloud") or "", "memory_write": parsed.get("note_to_self") or "", "success": action_result.get("success"), "message": action_result.get("message") or "", "turn_number": action_result.get("turn_number"), }) return flow def _load_tool_trace(session_dir: Path, player_name: str, request_number: int) -> List[Dict[str, Any]]: intermediate_responses = _load_intermediate_responses(session_dir, player_name, request_number) followups = _load_tool_followups(session_dir, player_name, request_number) execution_batches = _load_tool_executions(session_dir) used_batch_indexes: set[int] = set() trace = [] for intermediate in intermediate_responses: iteration = int(intermediate.get("iteration") or 0) tool_calls = copy.deepcopy(intermediate.get("tool_calls") or []) followup = next((item for item in followups if int(item.get("iteration") or 0) == iteration), {}) batch_index = _match_tool_execution_batch( execution_batches, intermediate, tool_calls, used_batch_indexes, ) batch = execution_batches[batch_index] if batch_index is not None else {} if batch_index is not None: used_batch_indexes.add(batch_index) trace.append({ "iteration": iteration, "timestamp": intermediate.get("timestamp") or followup.get("timestamp") or batch.get("timestamp") or "", "tool_calls": tool_calls, "tool_results_text": followup.get("tool_results") or _format_batch_results(batch), "execution_batch": batch, "followup_context_available": bool(followup.get("full_context_sent")), "full_context_sent": followup.get("full_context_sent") or "", }) for followup in followups: iteration = int(followup.get("iteration") or 0) if any(item["iteration"] == iteration for item in trace): continue trace.append({ "iteration": iteration, "timestamp": followup.get("timestamp") or "", "tool_calls": [], "tool_results_text": followup.get("tool_results") or "", "execution_batch": {}, "followup_context_available": bool(followup.get("full_context_sent")), "full_context_sent": followup.get("full_context_sent") or "", }) trace.sort(key=lambda item: item.get("iteration") or 0) return trace def _resolve_session_for_decision(source_session: Path, decision: Dict[str, Any]) -> Path: source_file = decision.get("source_file") if source_file: response_path = Path(source_file) if response_path.exists(): return response_path.parent.parent.parent return Path(source_session) def _player_dir(session_dir: Path, player_name: str) -> Optional[Path]: direct = session_dir / player_name if direct.exists(): return direct wanted = player_name.lower() for child in session_dir.iterdir() if session_dir.exists() else []: if child.is_dir() and child.name.lower() == wanted: return child return None def _load_prompt(session_dir: Path, player_name: str, request_number: int) -> Dict[str, Any]: player_dir = _player_dir(session_dir, player_name) if not player_dir: return {} return _read_json(player_dir / "prompts" / f"prompt_{request_number}.json") def _load_response(session_dir: Path, player_name: str, request_number: int) -> Dict[str, Any]: player_dir = _player_dir(session_dir, player_name) if not player_dir: return {} return _read_json(player_dir / "responses" / f"response_{request_number}.json") def _load_intermediate_responses(session_dir: Path, player_name: str, request_number: int) -> List[Dict[str, Any]]: player_dir = _player_dir(session_dir, player_name) if not player_dir: return [] intermediate_dir = player_dir / "responses" / "intermediate" items = [] for path in sorted(intermediate_dir.glob(f"response_{request_number}_iter*.json")): data = _read_json(path) if data: items.append(data) return items def _load_tool_followups(session_dir: Path, player_name: str, request_number: int) -> List[Dict[str, Any]]: player_dir = _player_dir(session_dir, player_name) if not player_dir: return [] iterations_dir = player_dir / "prompts" / "iterations" items = [] for path in sorted(iterations_dir.glob(f"prompt_{request_number}_iter*.json")): data = _read_json(path) if data: items.append(data) return items def _load_tool_executions(session_dir: Path) -> List[Dict[str, Any]]: data = _read_json(session_dir / "tool_executions.json") return data if isinstance(data, list) else [] def _match_tool_execution_batch( batches: List[Dict[str, Any]], intermediate: Dict[str, Any], tool_calls: List[Dict[str, Any]], used_batch_indexes: set[int], ) -> Optional[int]: expected_names = [str(call.get("name") or "") for call in tool_calls] intermediate_ts = _parse_timestamp(intermediate.get("timestamp")) best_index = None best_delta = None for index, batch in enumerate(batches): if index in used_batch_indexes: continue batch_names = [str(call.get("name") or "") for call in batch.get("calls") or []] if expected_names and batch_names[: len(expected_names)] != expected_names: continue batch_ts = _parse_timestamp(batch.get("timestamp")) if intermediate_ts and batch_ts and batch_ts < intermediate_ts: continue delta = ( (batch_ts - intermediate_ts).total_seconds() if intermediate_ts and batch_ts else float(index) ) if best_delta is None or delta < best_delta: best_delta = delta best_index = index return best_index def _format_batch_results(batch: Dict[str, Any]) -> str: calls = batch.get("calls") or [] if not calls: return "" lines = ["=== Tool Results ===\n"] for call in calls: lines.append(f"Tool: {call.get('name', '')}") lines.append(f"Parameters: {json.dumps(call.get('parameters') or {}, indent=2, ensure_ascii=False)}") if call.get("success", True): lines.append("Result:") lines.append(json.dumps(call.get("result"), indent=2, ensure_ascii=False)) else: lines.append(f"Error: {call.get('error') or ''}") lines.append("---\n") return "\n".join(lines) def _extract_embedded_json(text: str) -> Optional[Dict[str, Any]]: marker = "JSON:" if not isinstance(text, str) or marker not in text: return None candidate = text.split(marker, 1)[1].strip() try: return json.loads(candidate) except Exception: return None def _build_observed_facts( compact_state: Optional[Dict[str, Any]], allowed_actions: List[Dict[str, Any]], task_context: Dict[str, Any], ) -> Dict[str, Any]: """Extract the high-signal facts that were visible in compact game_state.""" if not isinstance(compact_state, dict): return { "expected_action": _expected_action_from_allowed(allowed_actions), "prompt_warnings": _prompt_consistency_warnings(allowed_actions, task_context), } meta = compact_state.get("meta") or {} dice = meta.get("dice") dice_total = sum(dice) if isinstance(dice, list) and all(isinstance(x, (int, float)) for x in dice) else None current_player = meta.get("curr") players = compact_state.get("players") or {} current_player_state = ( copy.deepcopy(players.get(current_player) or {}) if current_player is not None and isinstance(players, dict) else {} ) return { "current_player": current_player, "phase": meta.get("phase"), "robber_hex": meta.get("robber"), "dice": dice, "dice_total": dice_total, "expected_action": _expected_action_from_allowed(allowed_actions), "prompt_warnings": _prompt_consistency_warnings(allowed_actions, task_context), "current_player_state": current_player_state, "players": copy.deepcopy(players) if isinstance(players, dict) else {}, } def _allowed_types(allowed_actions: List[Dict[str, Any]]) -> set[str]: result = set() for action in allowed_actions or []: if isinstance(action, dict): value = action.get("type") else: value = str(action) if value: result.add(str(value).lower()) return result def _expected_action_from_allowed(allowed_actions: List[Dict[str, Any]]) -> str: allowed = _allowed_types(allowed_actions) if "roll_dice" in allowed and allowed <= {"roll_dice", "use_dev_card"}: if "use_dev_card" in allowed: return "Start the turn: roll dice, or optionally use a development card before rolling." return "Start the turn: roll dice." if {"build_settlement", "build_city", "build_road", "trade_propose", "trade_bank", "buy_dev_card", "end_turn"} & allowed: return "Post-roll actions: build, trade, buy/use development card, or end turn." if allowed: return "Allowed now: " + ", ".join(sorted(allowed)) return "" def _prompt_consistency_warnings( allowed_actions: List[Dict[str, Any]], task_context: Dict[str, Any], ) -> List[str]: allowed = _allowed_types(allowed_actions) what_happened = str((task_context or {}).get("what_just_happened") or "").lower() warnings = [] if "roll_dice" in allowed and allowed <= {"roll_dice", "use_dev_card"}: if "build, trade, or end" in what_happened: warnings.append( "The prompt text says build/trade/end, but the allowed actions show this is a pre-roll decision." ) return warnings def _parse_timestamp(value: Any) -> Optional[datetime]: if not value: return None try: return datetime.fromisoformat(str(value)) except Exception: return None def _read_json(path: Path) -> Any: try: if path.exists(): return json.loads(path.read_text(encoding="utf-8")) except Exception: return {} return {}