Spaces:
Running on Zero
Running on Zero
| #!/usr/bin/env python3 | |
| """Her · हेर — engine CLI. | |
| Deterministic. NO model. 100% local; reads a session JSONL through the loader | |
| seam, runs the engine, writes the enriched contract JSON. | |
| Usage: | |
| python3 engine/cli.py analyze "<session.jsonl>" --out "<file.json>" | |
| Output JSON shape: | |
| { "session": {...}, "turns": [Turn...], "events": [Event...], "findings": [Finding...] } | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import sys | |
| from pathlib import Path | |
# Make the repo root importable regardless of CWD (agent threads reset cwd).
# REPO is two directory levels above this file's resolved path; it is put at
# the front of sys.path (only if not already listed) so the `engine.*` imports
# that follow resolve against this checkout.
REPO = Path(__file__).resolve().parent.parent
if str(REPO) not in sys.path:
    sys.path.insert(0, str(REPO))
| from engine.contract import to_jsonable # noqa: E402 | |
| from engine.core.analyze import analyze_path # noqa: E402 | |
| from engine.core.binaries_db import load_registry # noqa: E402 | |
| from engine.core.impact import detect_impact # noqa: E402 | |
| from engine.binaries import extract_binaries # noqa: E402 | |
| from engine.entities import extract_entities # noqa: E402 | |
def _serialize(result: dict) -> dict:
    """Flatten the enriched engine result into JSON-serializable builtins.

    ``result["session"]`` and ``result["findings"]`` are passed through
    untouched (they are already plain dicts/lists). Each item of
    ``result["turns"]`` and ``result["events"]`` is a dataclass exposing
    ``to_dict()`` and is converted with ``to_jsonable``.

    Returns:
        A dict with the keys ``session``, ``turns``, ``events``, ``findings``,
        ``recommendations`` (empty list when the engine supplied none),
        ``entities``, ``binaries`` and ``impact``.
    """
    plain_turns = list(map(to_jsonable, result["turns"]))
    # Binaries actually run via Bash (npx remotion -> remotion, railway, …):
    # a separate entity dimension from tool calls, enriched from the registry.
    run_binaries = extract_binaries(plain_turns, load_registry())

    payload = {
        "session": result["session"],
        "turns": plain_turns,
        "events": list(map(to_jsonable, result["events"])),
        "findings": result["findings"],
        "recommendations": result.get("recommendations", []),
    }
    # Named entities (skills / sub-agents / MCP servers), per session and
    # traceable to the launching turn. The server adds the same field, so the
    # offline/demo path and the UI's session-level inventory get it as well.
    payload["entities"] = extract_entities(plain_turns)
    payload["binaries"] = run_binaries
    # Actions worth reviewing + risk level + outcome (deterministic, suggest-only).
    payload["impact"] = detect_impact(plain_turns, run_binaries)
    return payload
def cmd_analyze(args: argparse.Namespace) -> int:
    """Handle the ``analyze`` subcommand.

    Passes ``args.session`` to ``analyze_path``, serializes the result, and
    emits it as indented UTF-8 JSON: into ``args.out`` (creating parent
    directories and printing a one-line summary to stdout) when that option
    is set, otherwise straight to stdout.

    Args:
        args: Parsed CLI namespace carrying ``session`` and ``out``.

    Returns:
        0 on success, 1 when the session path does not exist (a message is
        written to stderr in that case).
    """
    src = Path(args.session)
    if not src.exists():
        print(f"FAIL — session not found: {src}", file=sys.stderr)
        return 1

    payload = _serialize(analyze_path(str(src)))

    if not args.out:
        json.dump(payload, sys.stdout, indent=2, ensure_ascii=False)
        sys.stdout.write("\n")
        return 0

    out = Path(args.out)
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(json.dumps(payload, indent=2, ensure_ascii=False), encoding="utf-8")

    sess = payload["session"]
    # Every summary field is read with .get(), so the ratio may be absent.
    # Applying ":.4f" to None raises TypeError *after* the file has been
    # written, so fall back to a placeholder instead of crashing.
    ratio = sess.get("indirectRatio")
    ratio_text = f"{ratio:.4f}" if isinstance(ratio, (int, float)) else "n/a"
    print(
        f"OK — analyzed {sess.get('tools')} tools across {sess.get('turns')} turns "
        f"({sess.get('indirect')} indirect / {sess.get('direct')} direct, "
        f"ratio {ratio_text}); "
        f"{len(payload['findings'])} findings → {out}"
    )
    return 0
def main(argv=None) -> int:
    """Parse the command line and dispatch to the chosen subcommand.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv[1:]``.

    Returns:
        The integer status returned by the subcommand handler.
    """
    cli = argparse.ArgumentParser(prog="trace-engine", description="Her · हेर engine")
    commands = cli.add_subparsers(dest="command", required=True)

    analyze = commands.add_parser("analyze", help="analyze a session JSONL")
    analyze.add_argument("session", help="path to the session .jsonl")
    analyze.add_argument("--out", default=None, help="write enriched analysis JSON here")
    # Each subparser records its handler under `func`; dispatch reads it below.
    analyze.set_defaults(func=cmd_analyze)

    parsed = cli.parse_args(argv)
    handler = parsed.func
    return handler(parsed)
# Script entry point: the status returned by main() becomes the exit code.
if __name__ == "__main__":
    raise SystemExit(main())