File size: 5,351 Bytes
5f43c7d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
"""analyze.py — the engine orchestrator.

Consumes ONLY the contract shapes from a loader (Event[]/Turn[]/session). NEVER
reads raw JSONL. NO model. Runs the deterministic pipeline in dependency order:

  1. errors      → ToolCall.errored        (loops need this)
  2. provenance  → ToolCall.{provenance,sourceTool,flowValue}, Turn.{direct,indirect}
  3. loops       → real loops + near-identical hints (per turn)
  4. rereads     → re-read patterns (per turn)
  5. heavy       → Turn.heavy (top-3 by COST) + Turn.overBudget (cost >= floor)
  6. guidance    → Turn.guide (silence unless a named pattern fires) + Findings
  7. tokens      → session rollup + cacheRead/out ratio + cost

Returns enriched {turns, events, session, findings}.
"""
from __future__ import annotations

from typing import Any, Optional

from engine.core import clusters, errors, heavy, hygiene, loops, pins, provenance, rereads, tokens
from engine.core.guidance import attach_guides, build_findings, build_recommendations
from engine.core.provenance import ProvenanceConfig


def analyze(
    loaded: dict[str, Any],
    provenance_config: Optional[ProvenanceConfig] = None,
) -> dict[str, Any]:
    """Run the full deterministic engine over a loader's output (in place + return).

    `loaded` is exactly what engine.loaders.*.load(path) returns:
      {'events': [...], 'turns': [...], 'session': {...}}
    """
    turns = loaded["turns"]
    events = loaded["events"]
    session = loaded.get("session", {})

    # 1) error signal (loops depend on it)
    errors.annotate_errors(turns)

    # 2) value-flow provenance (precision is the release gate)
    provenance.annotate_provenance(turns, session, provenance_config)

    # 3) loops (real + near-identical hints)
    loops_by_turn = loops.detect_all_loops(turns)

    # 4) re-reads
    rereads_by_turn = rereads.detect_all_rereads(turns)

    # 5) heavy turns (top-N by COST) + absolute over-budget floor
    heavy_indices = heavy.annotate_heavy(turns)
    over_budget_indices = heavy.over_budget_turns(turns)

    # 6) tool clusters: flailing on an external CLI with no skill loaded (silence
    #    unless it fired). Session-level, list-free; cited fix from the knowledge file.
    tool_clusters = clusters.detect_tool_clusters(turns)

    # 7) guidance: per-turn guides (silence unless a pattern fires) + findings
    attach_guides(turns, rereads_by_turn, loops_by_turn)
    findings = build_findings(
        turns, rereads_by_turn, loops_by_turn, heavy_indices, tool_clusters
    )
    # extra fixable signals (read-bursts, unverified edits, near-repeats, unloaded MCP)
    read_bursts = hygiene.detect_read_bursts(turns)
    unverified = hygiene.detect_unverified_edits(turns)
    near_repeats = hygiene.detect_near_repeats(loops_by_turn)
    unloaded_mcp = hygiene.detect_unloaded_mcp(turns)
    # unpinned package runners (`npx pkg` with no @version) — a generally-recommended
    # signal only (no Anthropic practice); reuses the binary splitter, never alters
    # any token/turn/loop count. Reads the contract dicts like binaries.extract_binaries.
    npx_unpinned = pins.detect_npx_unpinned([t.to_dict() for t in turns])

    # session-level "what could have been better": fixable signals, abstracted +
    # cited (silence => empty list, an honest "expensive but clean").
    recommendations = build_recommendations(
        turns, tool_clusters, read_bursts, unverified, near_repeats, unloaded_mcp,
        npx_unpinned,
    )

    # 8) session token rollup + always-present cacheRead/out ratio, plus the
    #    point-in-time context-window "fuel gauge" (peak fill, trajectory, compactions)
    sess_tokens = tokens.rollup(turns)
    sess_context = tokens.context_window(turns, model=session.get("model"))

    # totals for the session header
    total_tools = sum(len(t.tools) for t in turns)
    total_indirect = sum(t.indirect for t in turns)
    total_direct = sum(t.direct for t in turns)
    indirect_ratio = (total_indirect / total_tools) if total_tools else 0.0

    session_out = dict(session)
    session_out.update(
        {
            "tokens": sess_tokens["tokens"],
            "cost": sess_tokens["tokens"].get("cost", 0),  # cost-weighted total (ranking key)
            "cacheReadOverOut": sess_tokens["cacheReadOverOut"],
            "context": sess_context,  # fuel gauge: peak fill / 1M, trajectory, compactions

            "turns": len(turns),
            "humanTurns": sum(1 for t in turns if t.origin == "human"),
            "systemTurns": sum(1 for t in turns if t.origin == "system"),
            "tools": total_tools,
            "direct": total_direct,
            "indirect": total_indirect,
            "indirectRatio": indirect_ratio,
            "heavyTurns": heavy_indices,
            "overBudgetTurns": over_budget_indices,
            "toolClusters": tool_clusters,
        }
    )

    return {
        "turns": turns,
        "events": events,
        "session": session_out,
        "findings": findings,
        "recommendations": recommendations,
    }


def analyze_path(
    path: str,
    provenance_config: Optional[ProvenanceConfig] = None,
) -> dict[str, Any]:
    """Convenience: load a JSONL path through the seam, then analyze."""
    from engine.loaders.jsonl_loader import load

    return analyze(load(path), provenance_config)