#!/usr/bin/env python3 """ heaptrm CLI - Scan binaries for heap exploitation patterns. Usage: heaptrm scan ./binary [args...] heaptrm scan --stdin payload.bin ./binary heaptrm analyze dump.jsonl heaptrm watch ./binary # live monitoring """ import argparse import sys import os import json from pathlib import Path # Add parent to path for imports sys.path.insert(0, str(Path(__file__).parent.parent)) from heaptrm.monitor import HeapMonitor def cmd_scan(args): """Scan a binary for heap exploitation.""" monitor = HeapMonitor(model_path=args.model) stdin_data = None if args.stdin: with open(args.stdin, "rb") as f: stdin_data = f.read() result = monitor.scan( args.binary, args=args.args, stdin_data=stdin_data, timeout=args.timeout ) # Output if args.json: print(json.dumps({ "verdict": result.verdict, "confidence": result.confidence, "states": result.n_states, "flagged": result.n_flagged, "corruptions": [ {"type": c.type, "step": c.step, "detail": c.detail} for c in result.corruptions ], }, indent=2)) else: icon = {"EXPLOIT": "!!", "SUSPICIOUS": "??", "CLEAN": "OK"}[result.verdict] print(f"[{icon}] {result.verdict} (confidence: {result.confidence:.1%})") print(f" States observed: {result.n_states}") print(f" States flagged: {result.n_flagged}") if result.corruptions: print(f" Corruptions detected:") for c in result.corruptions: print(f" step {c.step}: {c.type} — {c.detail}") return 0 if result.verdict == "CLEAN" else 1 def cmd_analyze(args): """Analyze an existing heap dump.""" monitor = HeapMonitor(model_path=args.model) result = monitor.analyze_dump(args.dump) icon = {"EXPLOIT": "!!", "SUSPICIOUS": "??", "CLEAN": "OK"}[result.verdict] print(f"[{icon}] {result.verdict} (confidence: {result.confidence:.1%})") print(f" States: {result.n_states}, Flagged: {result.n_flagged}") if result.corruptions: for c in result.corruptions: print(f" step {c.step}: {c.type} — {c.detail}") def cmd_watch(args): """Live-watch a binary.""" import subprocess import tempfile import time monitor = HeapMonitor(model_path=args.model) dump_path = tempfile.mktemp(suffix=".jsonl") env = os.environ.copy() env["LD_PRELOAD"] = monitor._harness_path env["HEAPGRID_OUT"] = dump_path cmd = [args.binary] + (args.args or []) proc = subprocess.Popen(cmd, env=env) print(f"Watching {args.binary} (PID {proc.pid})...") last_pos = 0 try: while proc.poll() is None: time.sleep(0.1) if os.path.exists(dump_path): with open(dump_path) as f: f.seek(last_pos) for line in f: if line.strip(): state = json.loads(line.strip()) corruptions = state.get("corruptions", []) if corruptions: for c in corruptions: print(f" [!!] step {state['step']}: " f"{c['type']} — {c['detail']}") last_pos = f.tell() except KeyboardInterrupt: proc.terminate() # Final analysis if os.path.exists(dump_path): result = monitor.analyze_dump(dump_path) print(f"\nFinal: [{result.verdict}] {result.n_states} states, " f"{len(result.corruptions)} corruptions") os.unlink(dump_path) def main(): parser = argparse.ArgumentParser( prog="heaptrm", description="Heap exploit detection using Tiny Recursive Models" ) parser.add_argument("--model", help="Path to trained .pt model") sub = parser.add_subparsers(dest="command") # scan p_scan = sub.add_parser("scan", help="Scan a binary") p_scan.add_argument("binary") p_scan.add_argument("args", nargs="*") p_scan.add_argument("--stdin", help="File to pipe to stdin") p_scan.add_argument("--timeout", type=int, default=30) p_scan.add_argument("--json", action="store_true") # analyze p_analyze = sub.add_parser("analyze", help="Analyze a heap dump") p_analyze.add_argument("dump") # watch p_watch = sub.add_parser("watch", help="Live-watch a binary") p_watch.add_argument("binary") p_watch.add_argument("args", nargs="*") args = parser.parse_args() if args.command == "scan": sys.exit(cmd_scan(args)) elif args.command == "analyze": cmd_analyze(args) elif args.command == "watch": cmd_watch(args) else: parser.print_help() if __name__ == "__main__": main()