| |
| """ |
| heaptrm CLI - Scan binaries for heap exploitation patterns. |
| |
| Usage: |
| heaptrm scan ./binary [args...] |
| heaptrm scan --stdin payload.bin ./binary |
| heaptrm analyze dump.jsonl |
| heaptrm watch ./binary # live monitoring |
| """ |
|
|
| import argparse |
| import sys |
| import os |
| import json |
| from pathlib import Path |
|
|
| |
| sys.path.insert(0, str(Path(__file__).parent.parent)) |
|
|
| from heaptrm.monitor import HeapMonitor |
|
|
|
|
def cmd_scan(args):
    """Run a binary under the heap monitor and print the verdict.

    When ``args.stdin`` is set, that file is read as bytes and handed to the
    monitor as ``stdin_data``.  The result is printed either as a JSON
    document (``args.json``) or as a short human-readable report.

    Args:
        args: Parsed CLI namespace providing ``model``, ``binary``, ``args``,
            ``stdin``, ``timeout`` and ``json``.

    Returns:
        ``0`` when the verdict is ``"CLEAN"``, ``1`` otherwise; the caller
        uses it as the process exit status.
    """
    monitor = HeapMonitor(model_path=args.model)

    stdin_data = None
    if args.stdin:
        with open(args.stdin, "rb") as f:
            stdin_data = f.read()

    result = monitor.scan(
        args.binary,
        args=args.args,
        stdin_data=stdin_data,
        timeout=args.timeout,
    )

    if args.json:
        report = {
            "verdict": result.verdict,
            "confidence": result.confidence,
            "states": result.n_states,
            "flagged": result.n_flagged,
            "corruptions": [
                {"type": c.type, "step": c.step, "detail": c.detail}
                for c in result.corruptions
            ],
        }
        print(json.dumps(report, indent=2))
    else:
        icons = {"EXPLOIT": "!!", "SUSPICIOUS": "??", "CLEAN": "OK"}
        # An unrecognised verdict must not crash the report after the scan
        # has already completed; show it with the "needs attention" marker.
        icon = icons.get(result.verdict, "??")
        print(f"[{icon}] {result.verdict} (confidence: {result.confidence:.1%})")
        print(f" States observed: {result.n_states}")
        print(f" States flagged: {result.n_flagged}")
        if result.corruptions:
            print(" Corruptions detected:")
            for c in result.corruptions:
                print(f" step {c.step}: {c.type} — {c.detail}")

    return 0 if result.verdict == "CLEAN" else 1
|
|
|
|
def cmd_analyze(args):
    """Analyze an existing heap dump and print the verdict.

    Args:
        args: Parsed CLI namespace providing ``model`` and ``dump``.

    Returns:
        ``0`` when the verdict is ``"CLEAN"``, ``1`` otherwise -- the same
        convention ``cmd_scan`` uses for its exit status.
    """
    monitor = HeapMonitor(model_path=args.model)
    result = monitor.analyze_dump(args.dump)

    icons = {"EXPLOIT": "!!", "SUSPICIOUS": "??", "CLEAN": "OK"}
    # An unrecognised verdict must not crash the report once the analysis is
    # done; show it with the "needs attention" marker instead.
    icon = icons.get(result.verdict, "??")
    print(f"[{icon}] {result.verdict} (confidence: {result.confidence:.1%})")
    print(f" States: {result.n_states}, Flagged: {result.n_flagged}")
    for c in result.corruptions or ():
        print(f" step {c.step}: {c.type} — {c.detail}")

    return 0 if result.verdict == "CLEAN" else 1
|
|
|
|
def _report_new_states(dump_path, offset, final=False):
    """Print corruptions from dump lines appended since *offset*.

    Only newline-terminated lines are consumed, so a record that is still
    being written is left for the next call instead of being parsed half-way.

    Args:
        dump_path: Path of the JSONL dump being watched.
        offset: Byte position up to which the dump was already reported.
        final: When true, also consume a trailing line that lacks a newline
            (used for the last pass, after the process has stopped).

    Returns:
        The byte position up to which the dump has now been consumed.
    """
    try:
        with open(dump_path, "rb") as f:
            f.seek(offset)
            chunk = f.read()
    except FileNotFoundError:
        # The dump has not been created yet.
        return offset

    # rfind() returns -1 when no newline is present, giving 0 consumed bytes.
    consumed = len(chunk) if final else chunk.rfind(b"\n") + 1
    for raw_line in chunk[:consumed].splitlines():
        raw_line = raw_line.strip()
        if not raw_line:
            continue
        try:
            state = json.loads(raw_line)
        except ValueError:
            # A broken record must not take the watcher down while the
            # target is still running.
            print("warning: skipping malformed dump line", file=sys.stderr)
            continue
        for c in state.get("corruptions") or []:
            print(f" [!!] step {state['step']}: "
                  f"{c['type']} — {c['detail']}")
    return offset + consumed


def cmd_watch(args):
    """Run a binary under the heap harness and report corruptions live.

    The target is started with ``LD_PRELOAD`` set to the monitor's harness
    path and ``HEAPGRID_OUT`` set to a private JSONL dump file.  While the
    process runs, newly appended dump lines are parsed and every corruption
    they list is printed.  After the process exits (or Ctrl-C stops it) the
    whole dump is analyzed and a one-line summary is printed.

    Args:
        args: Parsed CLI namespace providing ``model``, ``binary`` and
            ``args``.
    """
    import subprocess
    import tempfile
    import time

    monitor = HeapMonitor(model_path=args.model)

    # Use a private temporary directory rather than tempfile.mktemp():
    # mktemp only returns a name, so another local user could create that
    # path first.  The context manager also removes the dump on every exit
    # path, including when the analysis below raises.
    with tempfile.TemporaryDirectory(prefix="heaptrm-") as tmp_dir:
        dump_path = os.path.join(tmp_dir, "heap.jsonl")

        env = os.environ.copy()
        env["LD_PRELOAD"] = monitor._harness_path
        env["HEAPGRID_OUT"] = dump_path

        cmd = [args.binary] + (args.args or [])
        proc = subprocess.Popen(cmd, env=env)

        print(f"Watching {args.binary} (PID {proc.pid})...")
        offset = 0

        try:
            while proc.poll() is None:
                time.sleep(0.1)
                offset = _report_new_states(dump_path, offset)
        except KeyboardInterrupt:
            proc.terminate()
            # Reap the child so it is no longer writing when the dump is
            # analyzed; escalate if it ignores SIGTERM.
            try:
                proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()

        # Report whatever was written between the last poll and process exit.
        _report_new_states(dump_path, offset, final=True)

        if os.path.exists(dump_path):
            result = monitor.analyze_dump(dump_path)
            print(f"\nFinal: [{result.verdict}] {result.n_states} states, "
                  f"{len(result.corruptions)} corruptions")
|
|
|
|
def _build_parser():
    """Create the ``heaptrm`` argument parser with its three subcommands."""
    parser = argparse.ArgumentParser(
        prog="heaptrm",
        description="Heap exploit detection using Tiny Recursive Models",
    )
    parser.add_argument("--model", help="Path to trained .pt model")

    sub = parser.add_subparsers(dest="command")

    p_scan = sub.add_parser("scan", help="Scan a binary")
    p_scan.add_argument("binary")
    p_scan.add_argument("args", nargs="*")
    p_scan.add_argument("--stdin", help="File to pipe to stdin")
    p_scan.add_argument("--timeout", type=int, default=30)
    p_scan.add_argument("--json", action="store_true")

    p_analyze = sub.add_parser("analyze", help="Analyze a heap dump")
    p_analyze.add_argument("dump")

    p_watch = sub.add_parser("watch", help="Live-watch a binary")
    p_watch.add_argument("binary")
    p_watch.add_argument("args", nargs="*")

    return parser


def main(argv=None):
    """Parse the command line and dispatch to the selected subcommand.

    Args:
        argv: Argument list to parse.  ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is the behaviour of a plain
            ``main()`` call.

    Raises:
        SystemExit: When the subcommand handler returns an exit status, or
            when argparse rejects the command line.
    """
    parser = _build_parser()
    args = parser.parse_args(argv)

    # No subcommand given: show usage and return normally.
    if args.command is None:
        parser.print_help()
        return

    handlers = {
        "scan": cmd_scan,
        "analyze": cmd_analyze,
        "watch": cmd_watch,
    }
    status = handlers[args.command](args)
    # A handler that returns a status has it used as the process exit code;
    # a handler that returns None lets main() return normally.
    if status is not None:
        sys.exit(status)


if __name__ == "__main__":
    main()
|
|