#!/usr/bin/env python3
"""
heaptrm CLI - Scan binaries for heap exploitation patterns.

Usage:
    heaptrm scan ./binary [args...]
    heaptrm scan --stdin payload.bin ./binary
    heaptrm analyze dump.jsonl
    heaptrm watch ./binary  # live monitoring
"""
import argparse
import sys
import os
import json
from pathlib import Path
# Add parent to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
from heaptrm.monitor import HeapMonitor
def cmd_scan(args):
    """Scan a binary for heap exploitation and print the verdict.

    Builds a ``HeapMonitor`` from ``args.model`` and calls its ``scan``
    method on ``args.binary``, forwarding ``args.args`` and ``args.timeout``.
    When ``args.stdin`` is set, that file is read in binary mode and its
    contents are passed as ``stdin_data``. The result is printed as a JSON
    document when ``args.json`` is true, otherwise as a short text summary.

    Args:
        args: Parsed command-line namespace providing ``model``, ``binary``,
            ``args``, ``stdin``, ``timeout`` and ``json``.

    Returns:
        0 when the verdict is ``"CLEAN"``, otherwise 1, so the value can be
        used directly as the process exit status.
    """
    monitor = HeapMonitor(model_path=args.model)

    stdin_data = None
    if args.stdin:
        with open(args.stdin, "rb") as f:
            stdin_data = f.read()

    result = monitor.scan(
        args.binary,
        args=args.args,
        stdin_data=stdin_data,
        timeout=args.timeout,
    )

    if args.json:
        report = {
            "verdict": result.verdict,
            "confidence": result.confidence,
            "states": result.n_states,
            "flagged": result.n_flagged,
            "corruptions": [
                {"type": c.type, "step": c.step, "detail": c.detail}
                for c in result.corruptions
            ],
        }
        print(json.dumps(report, indent=2))
    else:
        icons = {"EXPLOIT": "!!", "SUSPICIOUS": "??", "CLEAN": "OK"}
        # A verdict outside the known set must not crash the report with a
        # KeyError after the scan has already completed; fall back to the
        # "needs attention" marker instead.
        icon = icons.get(result.verdict, "??")
        print(f"[{icon}] {result.verdict} (confidence: {result.confidence:.1%})")
        print(f" States observed: {result.n_states}")
        print(f" States flagged: {result.n_flagged}")
        if result.corruptions:
            print(" Corruptions detected:")
            for c in result.corruptions:
                print(f" step {c.step}: {c.type} — {c.detail}")

    # Anything other than an explicit CLEAN verdict is reported as failure.
    return 0 if result.verdict == "CLEAN" else 1
def cmd_analyze(args):
    """Analyze an existing heap dump and print the verdict.

    Builds a ``HeapMonitor`` from ``args.model``, passes ``args.dump`` to its
    ``analyze_dump`` method and prints a short text summary followed by one
    line per detected corruption.

    Args:
        args: Parsed command-line namespace providing ``model`` and ``dump``.

    Returns:
        0 when the verdict is ``"CLEAN"``, otherwise 1 — the same convention
        ``cmd_scan`` uses, so a caller may use it as an exit status.
    """
    monitor = HeapMonitor(model_path=args.model)
    result = monitor.analyze_dump(args.dump)

    icons = {"EXPLOIT": "!!", "SUSPICIOUS": "??", "CLEAN": "OK"}
    # Do not crash with KeyError on a verdict outside the known set; the
    # analysis has already finished and its result should still be shown.
    icon = icons.get(result.verdict, "??")
    print(f"[{icon}] {result.verdict} (confidence: {result.confidence:.1%})")
    print(f" States: {result.n_states}, Flagged: {result.n_flagged}")
    for c in result.corruptions or ():
        print(f" step {c.step}: {c.type} — {c.detail}")

    return 0 if result.verdict == "CLEAN" else 1
def _print_new_corruptions(dump_path, offset):
    """Print corruptions from complete dump lines appended after ``offset``.

    Only newline-terminated lines are consumed: a trailing fragment means the
    writer is still in the middle of a record, so it is left for the next
    call instead of being fed to the JSON parser half-written.

    Args:
        dump_path: Path of the JSON-lines dump file being tailed.
        offset: Byte offset up to which the file has already been processed.

    Returns:
        The byte offset just past the last complete line that was processed
        (``offset`` unchanged when the file does not exist yet).
    """
    try:
        dump = open(dump_path, "rb")
    except FileNotFoundError:
        # The dump file has not been created yet.
        return offset

    with dump:
        dump.seek(offset)
        while True:
            raw = dump.readline()
            if not raw.endswith(b"\n"):
                # End of file, or a record that is still being written.
                break
            offset += len(raw)
            record = raw.strip()
            if not record:
                continue
            state = json.loads(record)
            for c in state.get("corruptions", []):
                print(f" [!!] step {state['step']}: "
                      f"{c['type']} — {c['detail']}")
    return offset


def cmd_watch(args):
    """Run a binary under the heap harness and report corruptions live.

    Starts ``args.binary`` (plus ``args.args``) with ``LD_PRELOAD`` set to
    the monitor's harness path and ``HEAPGRID_OUT`` set to a dump file path,
    then polls that file every 0.1 s and prints each corruption found in the
    newly appended JSON lines. When the process exits, or the user presses
    Ctrl-C, the whole dump is passed to ``HeapMonitor.analyze_dump`` and a
    final one-line summary is printed.

    Args:
        args: Parsed command-line namespace providing ``model``, ``binary``
            and ``args``.
    """
    import subprocess
    import tempfile
    import time

    monitor = HeapMonitor(model_path=args.model)

    # tempfile.mktemp() only invents a name, leaving a window in which
    # another process can create that path first. A private temporary
    # directory avoids the race and is removed on every exit path,
    # including exceptions. The dump file itself is still created by the
    # watched process, so "file exists" keeps meaning "output was written".
    with tempfile.TemporaryDirectory(prefix="heaptrm-") as dump_dir:
        dump_path = os.path.join(dump_dir, "dump.jsonl")

        env = os.environ.copy()
        env["LD_PRELOAD"] = monitor._harness_path
        env["HEAPGRID_OUT"] = dump_path

        cmd = [args.binary] + (args.args or [])
        proc = subprocess.Popen(cmd, env=env)
        print(f"Watching {args.binary} (PID {proc.pid})...")

        last_pos = 0
        try:
            while proc.poll() is None:
                time.sleep(0.1)
                last_pos = _print_new_corruptions(dump_path, last_pos)
        except KeyboardInterrupt:
            # Ctrl-C stops the watch; the final summary is still printed.
            pass
        finally:
            # Never leave the child running or unreaped, whichever way the
            # polling loop ended.
            if proc.poll() is None:
                proc.terminate()
                try:
                    proc.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    proc.kill()
                    proc.wait()

        # Report anything written between the last poll and process exit.
        _print_new_corruptions(dump_path, last_pos)

        # Final analysis
        if os.path.exists(dump_path):
            result = monitor.analyze_dump(dump_path)
            print(f"\nFinal: [{result.verdict}] {result.n_states} states, "
                  f"{len(result.corruptions)} corruptions")
def main(argv=None):
    """Parse the command line and dispatch to the selected sub-command.

    Args:
        argv: Optional list of argument strings to parse instead of
            ``sys.argv[1:]``. Defaults to ``None``, which makes argparse
            read the real command line, so existing ``main()`` callers are
            unaffected.

    Raises:
        SystemExit: On argparse errors or ``--help``, and after a
            sub-command has run (its return value is the exit status).
            When no sub-command is given the help text is printed and the
            function returns normally.
    """
    parser = argparse.ArgumentParser(
        prog="heaptrm",
        description="Heap exploit detection using Tiny Recursive Models"
    )
    parser.add_argument("--model", help="Path to trained .pt model")
    sub = parser.add_subparsers(dest="command")

    # scan
    p_scan = sub.add_parser("scan", help="Scan a binary")
    p_scan.add_argument("binary")
    p_scan.add_argument("args", nargs="*")
    p_scan.add_argument("--stdin", help="File to pipe to stdin")
    p_scan.add_argument("--timeout", type=int, default=30)
    p_scan.add_argument("--json", action="store_true")

    # analyze
    p_analyze = sub.add_parser("analyze", help="Analyze a heap dump")
    p_analyze.add_argument("dump")

    # watch
    p_watch = sub.add_parser("watch", help="Live-watch a binary")
    p_watch.add_argument("binary")
    p_watch.add_argument("args", nargs="*")

    args = parser.parse_args(argv)

    # sys.exit(None) exits with status 0, so a handler that returns nothing
    # still reports success, while one that returns a status has it
    # propagated to the shell.
    if args.command == "scan":
        sys.exit(cmd_scan(args))
    elif args.command == "analyze":
        sys.exit(cmd_analyze(args))
    elif args.command == "watch":
        sys.exit(cmd_watch(args))
    else:
        parser.print_help()
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()