her / engine /core /analyze.py
geekwrestler's picture
Squash history (purge pre-scrub demo session blobs)
5f43c7d
"""analyze.py — the engine orchestrator.
Consumes ONLY the contract shapes from a loader (Event[]/Turn[]/session). NEVER
reads raw JSONL. NO model. Runs the deterministic pipeline in dependency order:
1. errors → ToolCall.errored (loops need this)
2. provenance → ToolCall.{provenance,sourceTool,flowValue}, Turn.{direct,indirect}
3. loops → real loops + near-identical hints (per turn)
4. rereads → re-read patterns (per turn)
5. heavy → Turn.heavy (top-3 by COST) + Turn.overBudget (cost >= floor)
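6. clusters → tool clusters (session-level; silence unless one fired)
7. guidance → Turn.guide (silence unless a named pattern fires) + Findings
8. tokens → session rollup + cacheRead/out ratio + cost + context-window gauge
Returns enriched {turns, events, session, findings, recommendations}.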
"""
from __future__ import annotations
from typing import Any, Optional
from engine.core import clusters, errors, heavy, hygiene, loops, pins, provenance, rereads, tokens
from engine.core.guidance import attach_guides, build_findings, build_recommendations
from engine.core.provenance import ProvenanceConfig
def analyze(
    loaded: dict[str, Any],
    provenance_config: Optional[ProvenanceConfig] = None,
) -> dict[str, Any]:
    """Drive every deterministic engine pass over one loaded session.

    The passes run in a fixed order because later ones consume what earlier
    ones produce (for example, loop detection needs the error flags). Each
    pass receives the same ``turns`` object taken from ``loaded``, and that
    same object is handed back in the result.

    Args:
        loaded: Loader output. Must contain ``'turns'`` and ``'events'``;
            ``'session'`` is optional and treated as ``{}`` when absent.
        provenance_config: Optional settings forwarded unchanged to the
            provenance pass.

    Returns:
        A dict with keys ``turns``, ``events``, ``session``, ``findings`` and
        ``recommendations``. ``session`` is a shallow copy of the loader's
        session dict with the rollup/header fields merged on top.

    Raises:
        KeyError: If ``loaded`` has no ``'turns'`` or ``'events'`` entry.
    """
    turns = loaded["turns"]
    events = loaded["events"]
    session = loaded.get("session", {})

    # Pass 1 -- error signal; must precede loop detection.
    errors.annotate_errors(turns)

    # Pass 2 -- value-flow provenance (precision is the release gate).
    provenance.annotate_provenance(turns, session, provenance_config)

    # Pass 3 -- loops, both real ones and near-identical hints.
    loop_map = loops.detect_all_loops(turns)

    # Pass 4 -- re-read patterns.
    reread_map = rereads.detect_all_rereads(turns)

    # Pass 5 -- heaviest turns by cost, then the absolute over-budget floor.
    heavy_turns = heavy.annotate_heavy(turns)
    over_budget = heavy.over_budget_turns(turns)

    # Pass 6 -- tool clusters: flailing on an external CLI with no skill
    # loaded. Session-level; stays silent unless the pattern fired.
    cluster_hits = clusters.detect_tool_clusters(turns)

    # Pass 7 -- per-turn guides (silent unless a pattern fires), then findings.
    attach_guides(turns, reread_map, loop_map)
    findings = build_findings(
        turns, reread_map, loop_map, heavy_turns, cluster_hits
    )

    # Additional fixable signals: read-bursts, unverified edits,
    # near-repeats (derived from the loop results) and unloaded MCP.
    burst_hits = hygiene.detect_read_bursts(turns)
    unverified_edits = hygiene.detect_unverified_edits(turns)
    near_repeat_hits = hygiene.detect_near_repeats(loop_map)
    mcp_unloaded = hygiene.detect_unloaded_mcp(turns)

    # Unpinned package runners (`npx pkg` with no @version). This detector is
    # fed the dict form of each turn rather than the turn objects themselves.
    turn_dicts = [turn.to_dict() for turn in turns]
    unpinned_runners = pins.detect_npx_unpinned(turn_dicts)

    # Session-level "what could have been better" list; an empty list is the
    # honest answer for a session that was expensive but clean.
    recommendations = build_recommendations(
        turns,
        cluster_hits,
        burst_hits,
        unverified_edits,
        near_repeat_hits,
        mcp_unloaded,
        unpinned_runners,
    )

    # Pass 8 -- session token rollup plus the context-window "fuel gauge".
    usage = tokens.rollup(turns)
    context_gauge = tokens.context_window(turns, model=session.get("model"))

    # Header totals across all turns.
    tool_total = sum(len(turn.tools) for turn in turns)
    indirect_total = sum(turn.indirect for turn in turns)
    direct_total = sum(turn.direct for turn in turns)
    if tool_total:
        indirect_share = indirect_total / tool_total
    else:
        # No tool calls at all: report 0.0 instead of dividing by zero.
        indirect_share = 0.0

    # Copy so the loader's own session dict does not gain the header keys.
    session_out = dict(session)
    token_totals = usage["tokens"]
    header_fields = {
        "tokens": token_totals,
        # cost-weighted total (ranking key)
        "cost": token_totals.get("cost", 0),
        "cacheReadOverOut": usage["cacheReadOverOut"],
        # fuel gauge: peak fill / 1M, trajectory, compactions
        "context": context_gauge,
        "turns": len(turns),
        "humanTurns": len([turn for turn in turns if turn.origin == "human"]),
        "systemTurns": len([turn for turn in turns if turn.origin == "system"]),
        "tools": tool_total,
        "direct": direct_total,
        "indirect": indirect_total,
        "indirectRatio": indirect_share,
        "heavyTurns": heavy_turns,
        "overBudgetTurns": over_budget,
        "toolClusters": cluster_hits,
    }
    session_out.update(header_fields)

    result = {
        "turns": turns,
        "events": events,
        "session": session_out,
        "findings": findings,
        "recommendations": recommendations,
    }
    return result
def analyze_path(
    path: str,
    provenance_config: Optional[ProvenanceConfig] = None,
) -> dict[str, Any]:
    """Load the JSONL file at ``path`` and run ``analyze`` on the result.

    Args:
        path: Location of the JSONL file, passed straight to the loader.
        provenance_config: Optional settings forwarded unchanged to
            ``analyze``.

    Returns:
        Whatever ``analyze`` returns for the loaded session.
    """
    # The loader is imported at call time, not at module import time.
    # NOTE(review): presumably to keep the core free of a loader dependency
    # or to avoid an import cycle -- confirm.
    from engine.loaders.jsonl_loader import load as load_jsonl

    loaded = load_jsonl(path)
    return analyze(loaded, provenance_config)