#!/usr/bin/env python3
"""Her · हेर — engine CLI.

Deterministic. NO model. 100% local; reads a session JSONL through the
loader seam, runs the engine, writes the enriched contract JSON.

Usage:
    python3 engine/cli.py analyze <session.jsonl> --out <out.json>

Output JSON shape:
    {
      "session": {...},
      "turns": [Turn...],
      "events": [Event...],
      "findings": [Finding...],
      "recommendations": [...],
      "entities": ...,
      "binaries": ...,
      "impact": ...
    }
"""
from __future__ import annotations

import argparse
import json
import sys
from pathlib import Path

# Make the repo root importable regardless of CWD (agent threads reset cwd).
REPO = Path(__file__).resolve().parent.parent
if str(REPO) not in sys.path:
    sys.path.insert(0, str(REPO))

from engine.contract import to_jsonable  # noqa: E402
from engine.core.analyze import analyze_path  # noqa: E402
from engine.core.binaries_db import load_registry  # noqa: E402
from engine.core.impact import detect_impact  # noqa: E402
from engine.binaries import extract_binaries  # noqa: E402
from engine.entities import extract_entities  # noqa: E402


def _serialize(result: dict) -> dict:
    """Turn the enriched engine result into plain JSON-serializable structures.

    `session` and `findings` are already plain dicts/lists; `turns` and
    `events` are dataclasses with to_dict() (handled by to_jsonable).
    `recommendations` is passed through when present (defaults to an empty
    list); `entities`, `binaries` and `impact` are derived here from the
    serialized turns.

    Args:
        result: engine result with at least the keys "session", "turns",
            "events" and "findings".

    Returns:
        The payload dict that the CLI dumps as JSON.
    """
    turns = [to_jsonable(t) for t in result["turns"]]
    # Computed once: needed both as an output key and as input to detect_impact.
    binaries = extract_binaries(turns, load_registry())
    return {
        "session": result["session"],
        "turns": turns,
        "events": [to_jsonable(e) for e in result["events"]],
        "findings": result["findings"],
        "recommendations": result.get("recommendations", []),
        # named entities (skills / sub-agents / MCP servers), per-session, traceable
        # to the launching turn — same as the server adds, so the offline/demo path
        # (and the UI's session-level inventory) has them too.
        "entities": extract_entities(turns),
        # binaries actually run via Bash (npx remotion -> remotion, railway, …),
        # a SEPARATE entity dimension from tool calls, enriched from the registry.
        "binaries": binaries,
        # actions worth reviewing + risk level + outcome (deterministic, suggest-only)
        "impact": detect_impact(turns, binaries),
    }


def cmd_analyze(args: argparse.Namespace) -> int:
    """Run the `analyze` subcommand.

    Analyzes the session file at `args.session`. With `--out`, writes the
    payload as UTF-8 JSON to that path (creating parent directories) and
    prints a one-line summary to stdout; without it, dumps the JSON to stdout.

    Args:
        args: parsed arguments carrying `session` and `out`.

    Returns:
        0 on success, 1 when the session path does not exist.
    """
    src = Path(args.session)
    if not src.exists():
        print(f"FAIL — session not found: {src}", file=sys.stderr)
        return 1

    result = analyze_path(str(src))
    payload = _serialize(result)

    if args.out:
        out = Path(args.out)
        out.parent.mkdir(parents=True, exist_ok=True)
        out.write_text(
            json.dumps(payload, indent=2, ensure_ascii=False),
            encoding="utf-8",
        )
        sess = payload["session"]
        # The ratio may be absent or None; applying ':.4f' to None raises
        # TypeError, which would fail the command after the file was already
        # written. Format it only when it is numeric.
        ratio = sess.get("indirectRatio")
        ratio_text = f"{ratio:.4f}" if isinstance(ratio, (int, float)) else "n/a"
        print(
            f"OK — analyzed {sess.get('tools')} tools across {sess.get('turns')} turns "
            f"({sess.get('indirect')} indirect / {sess.get('direct')} direct, "
            f"ratio {ratio_text}); "
            f"{len(payload['findings'])} findings → {out}"
        )
    else:
        json.dump(payload, sys.stdout, indent=2, ensure_ascii=False)
        sys.stdout.write("\n")
    return 0


def main(argv: list[str] | None = None) -> int:
    """Parse command-line arguments and dispatch to the chosen subcommand.

    Args:
        argv: argument list to parse; None lets argparse read sys.argv[1:].

    Returns:
        The exit code returned by the subcommand handler.
    """
    parser = argparse.ArgumentParser(prog="trace-engine", description="Her · हेर engine")
    sub = parser.add_subparsers(dest="command", required=True)

    p_an = sub.add_parser("analyze", help="analyze a session JSONL")
    p_an.add_argument("session", help="path to the session .jsonl")
    p_an.add_argument("--out", default=None, help="write enriched analysis JSON here")
    p_an.set_defaults(func=cmd_analyze)

    args = parser.parse_args(argv)
    return args.func(args)


if __name__ == "__main__":
    sys.exit(main())