Spaces:
Running on Zero
Running on Zero
| """analyze.py — the engine orchestrator. | |
| Consumes ONLY the contract shapes from a loader (Event[]/Turn[]/session). NEVER | |
| reads raw JSONL. NO model. Runs the deterministic pipeline in dependency order: | |
| 1. errors → ToolCall.errored (loops need this) | |
| 2. provenance → ToolCall.{provenance,sourceTool,flowValue}, Turn.{direct,indirect} | |
| 3. loops → real loops + near-identical hints (per turn) | |
| 4. rereads → re-read patterns (per turn) | |
| 5. heavy → Turn.heavy (top-3 by COST) + Turn.overBudget (cost >= floor) | |
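| 6. clusters → session-level tool clusters (silence unless one fired) | |
| 7. guidance → Turn.guide (silence unless a named pattern fires) + Findings + Recommendations | |
| 8. tokens → session rollup + cacheRead/out ratio + cost + context-window fuel gauge | |
| Returns enriched {turns, events, session, findings, recommendations}. | |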
| """ | |
| from __future__ import annotations | |
| from typing import Any, Optional | |
| from engine.core import clusters, errors, heavy, hygiene, loops, pins, provenance, rereads, tokens | |
| from engine.core.guidance import attach_guides, build_findings, build_recommendations | |
| from engine.core.provenance import ProvenanceConfig | |
def analyze(
    loaded: dict[str, Any],
    provenance_config: Optional[ProvenanceConfig] = None,
) -> dict[str, Any]:
    """Run the full deterministic engine over a loader's output (in place + return).

    `loaded` is exactly what engine.loaders.*.load(path) returns:
        {'events': [...], 'turns': [...], 'session': {...}}

    Args:
        loaded: Loader output. ``turns`` and ``events`` are required keys;
            ``session`` may be missing or ``None`` and is then treated as
            an empty dict.
        provenance_config: Optional config forwarded unchanged to
            ``provenance.annotate_provenance``.

    Returns:
        A dict with the keys ``turns`` (the same list that was passed in,
        after the annotation passes ran over it), ``events`` (passed
        through), ``session`` (a shallow copy of the loader session with the
        rollup keys added), ``findings`` and ``recommendations``.

    Raises:
        KeyError: if ``loaded`` has no ``turns`` or ``events`` key.
    """
    turns = loaded["turns"]
    events = loaded["events"]
    # `or {}` (not a .get default) so an explicit `"session": None` is also
    # treated as "no session" instead of failing on session.get(...) below.
    session = loaded.get("session") or {}

    # 1) error signal (loops depend on it)
    errors.annotate_errors(turns)

    # 2) value-flow provenance (precision is the release gate)
    provenance.annotate_provenance(turns, session, provenance_config)

    # 3) loops (real + near-identical hints)
    loops_by_turn = loops.detect_all_loops(turns)

    # 4) re-reads
    rereads_by_turn = rereads.detect_all_rereads(turns)

    # 5) heavy turns (top-N by COST) + absolute over-budget floor
    heavy_indices = heavy.annotate_heavy(turns)
    over_budget_indices = heavy.over_budget_turns(turns)

    # 6) tool clusters: flailing on an external CLI with no skill loaded (silence
    #    unless it fired). Session-level, list-free; cited fix from the knowledge file.
    tool_clusters = clusters.detect_tool_clusters(turns)

    # 7) guidance: per-turn guides (silence unless a pattern fires) + findings
    attach_guides(turns, rereads_by_turn, loops_by_turn)
    findings = build_findings(
        turns, rereads_by_turn, loops_by_turn, heavy_indices, tool_clusters
    )

    # extra fixable signals (read-bursts, unverified edits, near-repeats, unloaded MCP)
    read_bursts = hygiene.detect_read_bursts(turns)
    unverified = hygiene.detect_unverified_edits(turns)
    near_repeats = hygiene.detect_near_repeats(loops_by_turn)
    unloaded_mcp = hygiene.detect_unloaded_mcp(turns)

    # unpinned package runners (`npx pkg` with no @version) — a generally-recommended
    # signal only (no Anthropic practice); reuses the binary splitter, never alters
    # any token/turn/loop count. Reads the contract dicts like binaries.extract_binaries.
    npx_unpinned = pins.detect_npx_unpinned([t.to_dict() for t in turns])

    # session-level "what could have been better": fixable signals, abstracted +
    # cited (silence => empty list, an honest "expensive but clean").
    recommendations = build_recommendations(
        turns, tool_clusters, read_bursts, unverified, near_repeats, unloaded_mcp,
        npx_unpinned,
    )

    # 8) session token rollup + always-present cacheRead/out ratio, plus the
    #    point-in-time context-window "fuel gauge" (peak fill, trajectory, compactions)
    sess_tokens = tokens.rollup(turns)
    sess_context = tokens.context_window(turns, model=session.get("model"))

    # totals for the session header
    total_tools = sum(len(t.tools) for t in turns)
    total_indirect = sum(t.indirect for t in turns)
    total_direct = sum(t.direct for t in turns)
    # guard the division: a session with no tool calls reports a 0.0 ratio
    indirect_ratio = (total_indirect / total_tools) if total_tools else 0.0

    # copy so the rollup keys are added to the returned session only, not to
    # the dict object the loader handed in
    session_out = dict(session)
    session_out.update(
        {
            "tokens": sess_tokens["tokens"],
            "cost": sess_tokens["tokens"].get("cost", 0),  # cost-weighted total (ranking key)
            "cacheReadOverOut": sess_tokens["cacheReadOverOut"],
            "context": sess_context,  # fuel gauge: peak fill / 1M, trajectory, compactions
            "turns": len(turns),
            "humanTurns": sum(1 for t in turns if t.origin == "human"),
            "systemTurns": sum(1 for t in turns if t.origin == "system"),
            "tools": total_tools,
            "direct": total_direct,
            "indirect": total_indirect,
            "indirectRatio": indirect_ratio,
            "heavyTurns": heavy_indices,
            "overBudgetTurns": over_budget_indices,
            "toolClusters": tool_clusters,
        }
    )

    return {
        "turns": turns,
        "events": events,
        "session": session_out,
        "findings": findings,
        "recommendations": recommendations,
    }
def analyze_path(
    path: str,
    provenance_config: Optional[ProvenanceConfig] = None,
) -> dict[str, Any]:
    """Load a JSONL file through the loader seam and run `analyze` on it.

    Args:
        path: Path handed to ``engine.loaders.jsonl_loader.load``.
        provenance_config: Optional config forwarded unchanged to `analyze`.

    Returns:
        Whatever `analyze` returns for the loaded data.
    """
    # imported at call time rather than at module import, as in the original
    from engine.loaders.jsonl_loader import load

    loaded = load(path)
    return analyze(loaded, provenance_config)