heap-trm / heaptrm /cli.py
amarck's picture
Add heaptrm package: v2 harness, CLI, pwntools integration, CVE tests
22374d1
#!/usr/bin/env python3
"""
heaptrm CLI - Scan binaries for heap exploitation patterns.

Usage:
    heaptrm scan ./binary [args...]
    heaptrm scan --stdin payload.bin ./binary
    heaptrm analyze dump.jsonl
    heaptrm watch ./binary [args...]    # live monitoring
"""
import argparse
import sys
import os
import json
from pathlib import Path
# Add parent to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
from heaptrm.monitor import HeapMonitor
def cmd_scan(args: argparse.Namespace) -> int:
    """Run a binary under the heap monitor and print the verdict.

    Args:
        args: Parsed CLI namespace. Reads ``model``, ``stdin``, ``binary``,
            ``args``, ``timeout`` and ``json``.

    Returns:
        0 when the verdict is ``"CLEAN"``, 1 for any other verdict, so the
        value can be used directly as the process exit status.
    """
    monitor = HeapMonitor(model_path=args.model)

    # Optional payload that is handed to the monitor as the target's stdin.
    stdin_data = None
    if args.stdin:
        with open(args.stdin, "rb") as f:
            stdin_data = f.read()

    result = monitor.scan(
        args.binary,
        args=args.args,
        stdin_data=stdin_data,
        timeout=args.timeout,
    )

    if args.json:
        # Machine-readable report.
        report = {
            "verdict": result.verdict,
            "confidence": result.confidence,
            "states": result.n_states,
            "flagged": result.n_flagged,
            "corruptions": [
                {"type": c.type, "step": c.step, "detail": c.detail}
                for c in result.corruptions
            ],
        }
        print(json.dumps(report, indent=2))
    else:
        # Human-readable report. An unrecognised verdict falls back to the
        # "??" marker instead of raising KeyError after the scan has already
        # completed; the verdict text itself is still printed verbatim.
        icons = {"EXPLOIT": "!!", "SUSPICIOUS": "??", "CLEAN": "OK"}
        icon = icons.get(result.verdict, "??")
        print(f"[{icon}] {result.verdict} (confidence: {result.confidence:.1%})")
        print(f" States observed: {result.n_states}")
        print(f" States flagged: {result.n_flagged}")
        if result.corruptions:
            print(" Corruptions detected:")
            for c in result.corruptions:
                # NOTE(review): type and detail are emitted back-to-back with
                # no separator - confirm detail carries its own delimiter.
                print(f" step {c.step}: {c.type}{c.detail}")

    return 0 if result.verdict == "CLEAN" else 1
def cmd_analyze(args: argparse.Namespace) -> None:
    """Analyze an existing heap dump file and print a short summary.

    Args:
        args: Parsed CLI namespace. Reads ``model`` and ``dump``.
    """
    monitor = HeapMonitor(model_path=args.model)
    result = monitor.analyze_dump(args.dump)

    # An unrecognised verdict falls back to the "??" marker instead of
    # raising KeyError after the analysis has already completed; the verdict
    # text itself is still printed verbatim.
    icons = {"EXPLOIT": "!!", "SUSPICIOUS": "??", "CLEAN": "OK"}
    icon = icons.get(result.verdict, "??")

    print(f"[{icon}] {result.verdict} (confidence: {result.confidence:.1%})")
    print(f" States: {result.n_states}, Flagged: {result.n_flagged}")
    for c in result.corruptions or ():
        print(f" step {c.step}: {c.type}{c.detail}")
def _read_new_states(dump_path, offset):
    """Parse complete JSON lines appended to *dump_path* since byte *offset*.

    Returns:
        ``(states, new_offset)``. A trailing line that is not yet terminated
        by a newline is left unread (the writer may still be in the middle of
        it), so ``new_offset`` always points at the start of a line.
    """
    try:
        with open(dump_path, "rb") as f:
            f.seek(offset)
            chunk = f.read()
    except FileNotFoundError:
        # The dump file has not been created yet.
        return [], offset

    # Only consume up to and including the last newline seen so far.
    complete_len = chunk.rfind(b"\n") + 1
    states = []
    for raw in chunk[:complete_len].split(b"\n"):
        if not raw.strip():
            continue
        try:
            states.append(json.loads(raw))
        except ValueError as exc:  # JSONDecodeError / UnicodeDecodeError
            print(f"heaptrm: skipping malformed dump line: {exc}",
                  file=sys.stderr)
    return states, offset + complete_len


def _report_corruptions(states):
    """Print one alert line per corruption entry found in *states*."""
    for state in states:
        for c in state.get("corruptions", []):
            print(f" [!!] step {state['step']}: "
                  f"{c['type']}{c['detail']}")


def cmd_watch(args: argparse.Namespace) -> None:
    """Run a binary with the harness preloaded and report corruptions live.

    The target is started with ``LD_PRELOAD`` set to the monitor's harness
    path and ``HEAPGRID_OUT`` set to a dump file. While the process runs, the
    dump file is polled every 0.1 s and each state whose ``"corruptions"``
    list is non-empty is printed. When the process exits (or Ctrl-C is
    pressed) the whole dump is passed to ``monitor.analyze_dump`` and a final
    summary is printed.

    Args:
        args: Parsed CLI namespace. Reads ``model``, ``binary`` and ``args``.
    """
    import subprocess
    import tempfile
    import time

    monitor = HeapMonitor(model_path=args.model)

    # A private temporary directory replaces the deprecated, race-prone
    # tempfile.mktemp(): the dump path cannot be pre-created by another user,
    # the file still does not exist until the target writes it, and the
    # directory is removed even if the final analysis raises.
    with tempfile.TemporaryDirectory(prefix="heaptrm-") as workdir:
        dump_path = os.path.join(workdir, "heap.jsonl")

        env = os.environ.copy()
        # NOTE(review): this replaces any LD_PRELOAD inherited from the
        # caller's environment - confirm that is intended.
        env["LD_PRELOAD"] = monitor._harness_path
        env["HEAPGRID_OUT"] = dump_path

        cmd = [args.binary] + (args.args or [])
        proc = subprocess.Popen(cmd, env=env)
        print(f"Watching {args.binary} (PID {proc.pid})...")

        offset = 0
        try:
            while proc.poll() is None:
                time.sleep(0.1)
                states, offset = _read_new_states(dump_path, offset)
                _report_corruptions(states)
        except KeyboardInterrupt:
            # Stop the target and reap it so it is no longer writing to the
            # dump when the final analysis runs.
            proc.terminate()
            try:
                proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()

        # Report anything written between the last poll and process exit.
        states, offset = _read_new_states(dump_path, offset)
        _report_corruptions(states)

        # Final analysis
        if os.path.exists(dump_path):
            result = monitor.analyze_dump(dump_path)
            print(f"\nFinal: [{result.verdict}] {result.n_states} states, "
                  f"{len(result.corruptions)} corruptions")
def main(argv=None):
    """Parse the command line and dispatch to the selected sub-command.

    Args:
        argv: Argument list to parse, without the program name. ``None``
            (the default) makes argparse read ``sys.argv[1:]``, which is the
            behaviour of the original zero-argument ``main()``.

    Raises:
        SystemExit: on argument errors (raised by argparse), and always after
            ``scan`` so that its return value becomes the exit status.
    """
    parser = argparse.ArgumentParser(
        prog="heaptrm",
        description="Heap exploit detection using Tiny Recursive Models"
    )
    parser.add_argument("--model", help="Path to trained .pt model")
    sub = parser.add_subparsers(dest="command")

    # scan
    p_scan = sub.add_parser("scan", help="Scan a binary")
    p_scan.add_argument("binary")
    p_scan.add_argument("args", nargs="*")
    p_scan.add_argument("--stdin", help="File to pipe to stdin")
    p_scan.add_argument("--timeout", type=int, default=30)
    p_scan.add_argument("--json", action="store_true")

    # analyze
    p_analyze = sub.add_parser("analyze", help="Analyze a heap dump")
    p_analyze.add_argument("dump")

    # watch
    p_watch = sub.add_parser("watch", help="Live-watch a binary")
    p_watch.add_argument("binary")
    p_watch.add_argument("args", nargs="*")

    args = parser.parse_args(argv)

    if args.command == "scan":
        sys.exit(cmd_scan(args))
    elif args.command == "analyze":
        cmd_analyze(args)
    elif args.command == "watch":
        cmd_watch(args)
    else:
        # No sub-command given: show usage instead of failing.
        parser.print_help()
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()