"""analyze.py — the engine orchestrator. Consumes ONLY the contract shapes from a loader (Event[]/Turn[]/session). NEVER reads raw JSONL. NO model. Runs the deterministic pipeline in dependency order: 1. errors → ToolCall.errored (loops need this) 2. provenance → ToolCall.{provenance,sourceTool,flowValue}, Turn.{direct,indirect} 3. loops → real loops + near-identical hints (per turn) 4. rereads → re-read patterns (per turn) 5. heavy → Turn.heavy (top-3 by COST) + Turn.overBudget (cost >= floor) 6. guidance → Turn.guide (silence unless a named pattern fires) + Findings 7. tokens → session rollup + cacheRead/out ratio + cost Returns enriched {turns, events, session, findings}. """ from __future__ import annotations from typing import Any, Optional from engine.core import clusters, errors, heavy, hygiene, loops, pins, provenance, rereads, tokens from engine.core.guidance import attach_guides, build_findings, build_recommendations from engine.core.provenance import ProvenanceConfig def analyze( loaded: dict[str, Any], provenance_config: Optional[ProvenanceConfig] = None, ) -> dict[str, Any]: """Run the full deterministic engine over a loader's output (in place + return). `loaded` is exactly what engine.loaders.*.load(path) returns: {'events': [...], 'turns': [...], 'session': {...}} """ turns = loaded["turns"] events = loaded["events"] session = loaded.get("session", {}) # 1) error signal (loops depend on it) errors.annotate_errors(turns) # 2) value-flow provenance (precision is the release gate) provenance.annotate_provenance(turns, session, provenance_config) # 3) loops (real + near-identical hints) loops_by_turn = loops.detect_all_loops(turns) # 4) re-reads rereads_by_turn = rereads.detect_all_rereads(turns) # 5) heavy turns (top-N by COST) + absolute over-budget floor heavy_indices = heavy.annotate_heavy(turns) over_budget_indices = heavy.over_budget_turns(turns) # 6) tool clusters: flailing on an external CLI with no skill loaded (silence # unless it fired). Session-level, list-free; cited fix from the knowledge file. tool_clusters = clusters.detect_tool_clusters(turns) # 7) guidance: per-turn guides (silence unless a pattern fires) + findings attach_guides(turns, rereads_by_turn, loops_by_turn) findings = build_findings( turns, rereads_by_turn, loops_by_turn, heavy_indices, tool_clusters ) # extra fixable signals (read-bursts, unverified edits, near-repeats, unloaded MCP) read_bursts = hygiene.detect_read_bursts(turns) unverified = hygiene.detect_unverified_edits(turns) near_repeats = hygiene.detect_near_repeats(loops_by_turn) unloaded_mcp = hygiene.detect_unloaded_mcp(turns) # unpinned package runners (`npx pkg` with no @version) — a generally-recommended # signal only (no Anthropic practice); reuses the binary splitter, never alters # any token/turn/loop count. Reads the contract dicts like binaries.extract_binaries. npx_unpinned = pins.detect_npx_unpinned([t.to_dict() for t in turns]) # session-level "what could have been better": fixable signals, abstracted + # cited (silence => empty list, an honest "expensive but clean"). recommendations = build_recommendations( turns, tool_clusters, read_bursts, unverified, near_repeats, unloaded_mcp, npx_unpinned, ) # 8) session token rollup + always-present cacheRead/out ratio, plus the # point-in-time context-window "fuel gauge" (peak fill, trajectory, compactions) sess_tokens = tokens.rollup(turns) sess_context = tokens.context_window(turns, model=session.get("model")) # totals for the session header total_tools = sum(len(t.tools) for t in turns) total_indirect = sum(t.indirect for t in turns) total_direct = sum(t.direct for t in turns) indirect_ratio = (total_indirect / total_tools) if total_tools else 0.0 session_out = dict(session) session_out.update( { "tokens": sess_tokens["tokens"], "cost": sess_tokens["tokens"].get("cost", 0), # cost-weighted total (ranking key) "cacheReadOverOut": sess_tokens["cacheReadOverOut"], "context": sess_context, # fuel gauge: peak fill / 1M, trajectory, compactions "turns": len(turns), "humanTurns": sum(1 for t in turns if t.origin == "human"), "systemTurns": sum(1 for t in turns if t.origin == "system"), "tools": total_tools, "direct": total_direct, "indirect": total_indirect, "indirectRatio": indirect_ratio, "heavyTurns": heavy_indices, "overBudgetTurns": over_budget_indices, "toolClusters": tool_clusters, } ) return { "turns": turns, "events": events, "session": session_out, "findings": findings, "recommendations": recommendations, } def analyze_path( path: str, provenance_config: Optional[ProvenanceConfig] = None, ) -> dict[str, Any]: """Convenience: load a JSONL path through the seam, then analyze.""" from engine.loaders.jsonl_loader import load return analyze(load(path), provenance_config)