heap-trm / runner /run_poc.py
amarck's picture
Add heaptrm package: v2 harness, CLI, pwntools integration, CVE tests
22374d1
#!/usr/bin/env python3
"""
run_poc.py - End-to-end validation pipeline for HeapTRM.
1. Compiles the LD_PRELOAD harness
2. Compiles how2heap examples for the local glibc version
3. Runs each instrumented, collecting heap state dumps
4. Generates grid datasets
5. Trains HeapTRM
6. Evaluates on held-out techniques
"""
import subprocess
import sys
import os
from pathlib import Path
# Repository layout. This script sits one directory below the project root,
# so the root is the grandparent of the resolved script path.
ROOT = Path(__file__).resolve().parents[1]
HOW2HEAP = ROOT / "how2heap"
HARNESS_DIR = ROOT / "harness"
_DATA_DIR = ROOT / "data"
DUMP_DIR = _DATA_DIR / "dumps"
PROCESSED_DIR = _DATA_DIR / "processed"
CHECKPOINT_DIR = _DATA_DIR / "checkpoints"

# glibc version whose how2heap examples are compiled. NOTE: this is a
# hard-coded value (not auto-detected) and must match the system's glibc.
GLIBC_VERSION = "2.39"

# Techniques whose dumps are used for training.
TRAIN_TECHNIQUES = [
    "fastbin_dup",
    "fastbin_dup_into_stack",
    "tcache_poisoning",
    "house_of_spirit",
    "unsafe_unlink",
    "tcache_house_of_spirit",
    "tcache_stashing_unlink_attack",
    "house_of_einherjar",
    "large_bin_attack",
    "poison_null_byte",
]

# Held-out techniques used only for evaluation.
TEST_TECHNIQUES = [
    "fastbin_dup_consolidate",
    "house_of_botcake",
    "overlapping_chunks",
]

# Every technique the pipeline tries to compile and run (train first, then test).
ALL_TECHNIQUES = [*TRAIN_TECHNIQUES, *TEST_TECHNIQUES]
def _as_text(data):
    """Coerce captured subprocess output (``str``, ``bytes`` or ``None``) to ``str``."""
    if data is None:
        return ""
    if isinstance(data, bytes):
        return data.decode(errors="replace")
    return data


def run(cmd, **kwargs):
    """Run a command, echo it, and report failure without raising.

    Args:
        cmd: Sequence holding the program and its arguments; each item is
            stringified for the echoed command line.
        **kwargs: Forwarded unchanged to ``subprocess.run`` (for example
            ``env`` or ``timeout``).

    Returns:
        A ``subprocess.CompletedProcess`` with text ``stdout``/``stderr``.
        Two failure modes that ``subprocess.run`` reports by raising are
        turned into a failed result instead, so one bad command cannot
        abort the whole pipeline:

        * timeout expired          -> ``returncode`` 124
        * command could not start  -> ``returncode`` 127 (any ``OSError``,
          e.g. executable missing or not executable)
    """
    print(f" $ {' '.join(str(c) for c in cmd)}")
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, **kwargs)
    except subprocess.TimeoutExpired as exc:
        # Output attached to the exception may be bytes or None even though
        # text mode was requested, so normalise it before building the result.
        note = f"[timed out after {exc.timeout}s]"
        partial_err = _as_text(exc.stderr)
        stderr = f"{note}\n{partial_err}" if partial_err else note
        result = subprocess.CompletedProcess(cmd, 124, _as_text(exc.stdout), stderr)
    except OSError as exc:
        result = subprocess.CompletedProcess(cmd, 127, "", str(exc))
    if result.returncode != 0:
        print(f" STDERR: {result.stderr[:500]}")
    return result
def step1_build_harness():
    """Compile the LD_PRELOAD heap instrumentation harness.

    Runs ``make clean`` followed by ``make`` in ``HARNESS_DIR`` and expects
    ``heapgrid_harness.so`` to appear there.

    Returns:
        Path to the built shared object.

    Exits the process with status 1 when the build command fails or the
    shared object is missing afterwards.
    """
    print("\n=== Step 1: Building harness ===")
    make = ["make", "-C", str(HARNESS_DIR)]
    # The outcome of `clean` is deliberately not checked; only the real
    # build decides success.
    run([*make, "clean"])
    build = run(make)
    harness_so = HARNESS_DIR / "heapgrid_harness.so"
    # Check the exit status too: if `clean` did not remove an older artifact,
    # the existence test alone would accept a stale library after a failed build.
    if build.returncode != 0 or not harness_so.exists():
        print("FATAL: Failed to build harness")
        sys.exit(1)
    print(f" Built: {harness_so}")
    return harness_so
def step2_compile_examples():
    """Build the how2heap examples that match ``GLIBC_VERSION``.

    Sources are read from ``HOW2HEAP / glibc_<version>`` and binaries are
    written to ``ROOT / data / bins``. Techniques whose source file is
    missing are skipped; compile failures are reported and left out.

    Returns:
        Dict mapping technique name to the path of its compiled binary.
    """
    print("\n=== Step 2: Compiling how2heap examples ===")
    sources = HOW2HEAP / f"glibc_{GLIBC_VERSION}"
    binaries = ROOT / "data" / "bins"
    binaries.mkdir(parents=True, exist_ok=True)

    # Same flags for every example; -lpthread is there because some examples need it.
    gcc_flags = ["-std=c99", "-g", "-O0", "-Wno-all", "-lpthread"]

    built = {}
    for name in ALL_TECHNIQUES:
        c_file = sources / f"{name}.c"
        if not c_file.exists():
            print(f" [SKIP] {c_file} not found")
            continue

        target = binaries / name
        proc = run(["gcc", "-o", str(target), str(c_file), *gcc_flags])
        if proc.returncode != 0 or not target.exists():
            print(f" [FAIL] {name}: {proc.stderr[:200]}")
            continue

        built[name] = target
        print(f" [OK] {name}")
    return built
def step3_collect_dumps(compiled: dict, harness_so: Path):
    """Run each compiled example under the harness and collect its dump.

    Args:
        compiled: Mapping of technique name to compiled binary path.
        harness_so: Shared object injected into each run via ``LD_PRELOAD``.

    Returns:
        List of technique names for which a non-empty dump file
        (``DUMP_DIR / <technique>.jsonl``) exists after the run.
    """
    print("\n=== Step 3: Collecting heap state dumps ===")
    DUMP_DIR.mkdir(parents=True, exist_ok=True)
    timeout_s = 10
    collected = []
    for tech, binary in compiled.items():
        dump_file = DUMP_DIR / f"{tech}.jsonl"
        # Drop any dump left by a previous invocation so that a run producing
        # no output is not reported as a success on the strength of old data.
        if dump_file.exists():
            dump_file.unlink()

        env = os.environ.copy()
        env["LD_PRELOAD"] = str(harness_so)
        env["HEAPGRID_OUT"] = str(dump_file)
        try:
            result = run([str(binary)], env=env, timeout=timeout_s)
        except subprocess.TimeoutExpired:
            # A hanging example must not abort the remaining techniques;
            # whatever it dumped before the timeout is still evaluated below.
            print(f" [TIMEOUT] {tech}: no exit within {timeout_s}s")
            result = None

        if dump_file.exists() and dump_file.stat().st_size > 0:
            # Count lines in binary mode inside a context manager: the handle
            # is closed promptly and odd bytes cannot raise a decode error.
            with open(dump_file, "rb") as fh:
                n_lines = sum(1 for _ in fh)
            print(f" [OK] {tech}: {n_lines} states captured")
            collected.append(tech)
        else:
            print(f" [FAIL] {tech}: no dump output")
            if result is not None and result.stderr:
                print(f" stderr: {result.stderr[:200]}")
    return collected
def step4_generate_dataset(collected: list):
    """Build the processed dataset from the collected dumps.

    Args:
        collected: Technique names for which a dump was captured. Only
            these are kept from ``TRAIN_TECHNIQUES`` / ``TEST_TECHNIQUES``,
            preserving the order of those lists.
    """
    print("\n=== Step 4: Generating grid dataset ===")

    def available(names):
        # Keep only the techniques that actually produced a dump.
        return [name for name in names if name in collected]

    train_set = available(TRAIN_TECHNIQUES)
    test_set = available(TEST_TECHNIQUES)
    print(f" Train techniques: {train_set}")
    print(f" Test techniques: {test_set}")

    # The generator lives in ROOT/dataset, which is not a package on the
    # default path, so it is put on sys.path right before the import.
    sys.path.insert(0, str(ROOT / "dataset"))
    from dataset_gen import build_dataset

    build_dataset(DUMP_DIR, PROCESSED_DIR, train_set, test_set)
def step5_train_and_evaluate():
    """Train HeapTRM on the processed data and print the resulting metrics.

    Returns:
        The metrics mapping returned by ``train_model``; the keys read here
        are ``accuracy``, ``precision``, ``recall``, ``f1``, ``tp``, ``fp``,
        ``fn`` and ``tn``.
    """
    print("\n=== Step 5: Training HeapTRM ===")

    # The model code lives in ROOT/model; make it importable first.
    sys.path.insert(0, str(ROOT / "model"))
    from trm_heap import train_model

    hyperparams = {
        "hidden_dim": 128,
        "n_outer": 3,
        "n_inner": 6,
        "epochs": 100,
        "batch_size": 16,
        "lr": 3e-4,
    }
    _model, metrics = train_model(
        data_dir=PROCESSED_DIR,
        output_dir=CHECKPOINT_DIR,
        **hyperparams,
    )

    banner = "=" * 60
    print("\n" + banner)
    print("FINAL RESULTS")
    print(banner)
    for label, key in (
        ("Accuracy", "accuracy"),
        ("Precision", "precision"),
        ("Recall", "recall"),
        ("F1", "f1"),
    ):
        print(f" {label}: {metrics[key]:.3f}")
    confusion = (
        f" TP={metrics['tp']} FP={metrics['fp']} "
        f"FN={metrics['fn']} TN={metrics['tn']}"
    )
    print(confusion)
    print(banner)
    return metrics
def main():
    """Run the five pipeline steps in order and exit with a verdict.

    Exits with status 1 as soon as no example compiles or no dump is
    collected. Otherwise the exit status reflects the final F1 score:
    0 when it is above 0.5, 1 when it is not.
    """
    print("HeapTRM Proof-of-Concept Validation Pipeline")
    print("=" * 60)

    harness = step1_build_harness()

    binaries = step2_compile_examples()
    if not binaries:
        print("\nFATAL: No examples compiled successfully")
        sys.exit(1)

    dumped = step3_collect_dumps(binaries, harness)
    if not dumped:
        print("\nFATAL: No dumps collected")
        sys.exit(1)

    step4_generate_dataset(dumped)
    scores = step5_train_and_evaluate()

    # The process exit code encodes whether the model learned anything.
    has_signal = scores['f1'] > 0.5
    verdict = (
        "\nVERDICT: TRM shows signal on heap state reasoning. Worth pursuing."
        if has_signal
        else "\nVERDICT: TRM did not learn meaningful patterns. Reconsider approach."
    )
    print(verdict)
    sys.exit(0 if has_signal else 1)
# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()