| |
| """ |
| run_poc.py - End-to-end validation pipeline for HeapTRM. |
| |
| 1. Compiles the LD_PRELOAD harness |
| 2. Compiles how2heap examples for the local glibc version |
| 3. Runs each instrumented, collecting heap state dumps |
| 4. Generates grid datasets |
| 5. Trains HeapTRM |
| 6. Evaluates on held-out techniques |
| """ |
|
|
| import subprocess |
| import sys |
| import os |
| from pathlib import Path |
|
|
| |
# Repository layout. The project root is taken as two directories above this
# file; all data artifacts live under <root>/data.
ROOT = Path(__file__).resolve().parent.parent
HOW2HEAP = ROOT / "how2heap"                    # how2heap checkout (PoC sources)
HARNESS_DIR = ROOT / "harness"                  # LD_PRELOAD instrumentation harness
DUMP_DIR = ROOT / "data" / "dumps"              # raw heap-state dumps (.jsonl)
PROCESSED_DIR = ROOT / "data" / "processed"     # grid datasets for training
CHECKPOINT_DIR = ROOT / "data" / "checkpoints"  # trained model checkpoints

# glibc version the examples are built for; must match the
# how2heap/glibc_<version> source directory name.
GLIBC_VERSION = "2.39"
|
|
| |
# Techniques used for training. Each name must match a how2heap source file
# (<name>.c) under the glibc_<version> directory.
TRAIN_TECHNIQUES = [
    "fastbin_dup",
    "fastbin_dup_into_stack",
    "tcache_poisoning",
    "house_of_spirit",
    "unsafe_unlink",
    "tcache_house_of_spirit",
    "tcache_stashing_unlink_attack",
    "house_of_einherjar",
    "large_bin_attack",
    "poison_null_byte",
]

# Held-out techniques, never trained on — used to evaluate generalization.
TEST_TECHNIQUES = [
    "fastbin_dup_consolidate",
    "house_of_botcake",
    "overlapping_chunks",
]

# Full set compiled (step 2) and instrumented (step 3).
ALL_TECHNIQUES = TRAIN_TECHNIQUES + TEST_TECHNIQUES
|
|
|
|
def run(cmd, **kwargs):
    """Run a command, echo it, and print stderr on failure.

    Extra keyword arguments (e.g. ``env=``, ``timeout=``) are forwarded to
    ``subprocess.run``. Returns the ``CompletedProcess`` so callers can
    inspect ``returncode``/``stdout``/``stderr``.

    A ``subprocess.TimeoutExpired`` is converted into a failed
    ``CompletedProcess`` instead of propagating — exploit PoCs may hang,
    and one hung binary must not abort the whole pipeline.
    """
    print(f" $ {' '.join(str(c) for c in cmd)}")
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, **kwargs)
    except subprocess.TimeoutExpired as exc:
        # returncode 124 mirrors the coreutils timeout(1) convention.
        result = subprocess.CompletedProcess(
            cmd, returncode=124, stdout="", stderr=f"timed out: {exc}"
        )
    if result.returncode != 0:
        print(f" STDERR: {result.stderr[:500]}")
    return result
|
|
|
|
def step1_build_harness():
    """Compile the LD_PRELOAD heap instrumentation harness.

    Runs ``make clean`` (best-effort) followed by ``make`` in HARNESS_DIR.
    Exits the process with status 1 if the build fails or the shared
    object is not produced. Returns the path to the built .so.
    """
    print("\n=== Step 1: Building harness ===")
    run(["make", "-C", str(HARNESS_DIR), "clean"])  # best-effort; result ignored
    result = run(["make", "-C", str(HARNESS_DIR)])
    harness_so = HARNESS_DIR / "heapgrid_harness.so"
    # Check both the make exit status and the artifact: a stale .so from a
    # previous build must not mask a failed compile.
    if result.returncode != 0 or not harness_so.exists():
        print("FATAL: Failed to build harness")
        sys.exit(1)
    print(f" Built: {harness_so}")
    return harness_so
|
|
|
|
def step2_compile_examples():
    """Compile the how2heap example sources for the pinned glibc version.

    Returns a mapping of technique name -> compiled binary path, containing
    only the examples that were found and built successfully.
    """
    print("\n=== Step 2: Compiling how2heap examples ===")
    src_dir = HOW2HEAP / f"glibc_{GLIBC_VERSION}"
    bin_dir = ROOT / "data" / "bins"
    bin_dir.mkdir(parents=True, exist_ok=True)

    compiled = {}
    for name in ALL_TECHNIQUES:
        source = src_dir / f"{name}.c"
        if not source.exists():
            print(f" [SKIP] {source} not found")
            continue

        target = bin_dir / name
        cc_cmd = [
            "gcc", "-o", str(target), str(source),
            "-std=c99", "-g", "-O0",
            "-Wno-all",
            "-lpthread",
        ]
        result = run(cc_cmd)

        if result.returncode != 0 or not target.exists():
            print(f" [FAIL] {name}: {result.stderr[:200]}")
        else:
            compiled[name] = target
            print(f" [OK] {name}")

    return compiled
|
|
|
|
def step3_collect_dumps(compiled: dict, harness_so: Path):
    """Run each compiled example under the harness and collect heap dumps.

    Each binary runs with the harness LD_PRELOADed and HEAPGRID_OUT pointing
    at its per-technique dump file. Returns the list of technique names that
    produced a non-empty dump.
    """
    print("\n=== Step 3: Collecting heap state dumps ===")
    DUMP_DIR.mkdir(parents=True, exist_ok=True)

    collected = []
    for tech, binary in compiled.items():
        dump_file = DUMP_DIR / f"{tech}.jsonl"

        env = os.environ.copy()
        env["LD_PRELOAD"] = str(harness_so)
        env["HEAPGRID_OUT"] = str(dump_file)

        try:
            result = run([str(binary)], env=env, timeout=10)
            stderr = result.stderr
        except subprocess.TimeoutExpired:
            # Some PoCs hang after corrupting the heap; a partial dump may
            # still be usable, so fall through to the size check below.
            stderr = "timed out after 10s"

        if dump_file.exists() and dump_file.stat().st_size > 0:
            # Context manager so the handle is closed even if counting fails.
            with open(dump_file) as fh:
                n_lines = sum(1 for _ in fh)
            print(f" [OK] {tech}: {n_lines} states captured")
            collected.append(tech)
        else:
            print(f" [FAIL] {tech}: no dump output")
            if stderr:
                print(f" stderr: {stderr[:200]}")

    return collected
|
|
|
|
def step4_generate_dataset(collected: list):
    """Turn the collected dump files into numpy grid arrays on disk."""
    print("\n=== Step 4: Generating grid dataset ===")

    # Restrict the splits to techniques whose dumps were actually captured.
    train = [name for name in TRAIN_TECHNIQUES if name in collected]
    test = [name for name in TEST_TECHNIQUES if name in collected]

    print(f" Train techniques: {train}")
    print(f" Test techniques: {test}")

    # dataset_gen is not installed as a package; extend the import path at
    # call time so the script works from a plain checkout.
    sys.path.insert(0, str(ROOT / "dataset"))
    from dataset_gen import build_dataset

    build_dataset(DUMP_DIR, PROCESSED_DIR, train, test)
|
|
|
|
def step5_train_and_evaluate():
    """Train HeapTRM on the processed dataset and print final metrics.

    Returns the metrics dict produced by ``train_model`` (accuracy,
    precision, recall, f1, and the confusion-matrix counts).
    """
    print("\n=== Step 5: Training HeapTRM ===")

    # trm_heap is not installed as a package; extend the import path at
    # call time so the script works from a plain checkout.
    sys.path.insert(0, str(ROOT / "model"))
    from trm_heap import train_model

    model, metrics = train_model(
        data_dir=PROCESSED_DIR,
        output_dir=CHECKPOINT_DIR,
        hidden_dim=128,
        n_outer=3,
        n_inner=6,
        epochs=100,
        batch_size=16,
        lr=3e-4,
    )

    banner = "=" * 60
    print("\n" + banner)
    print("FINAL RESULTS")
    print(banner)
    for label, key in (("Accuracy", "accuracy"), ("Precision", "precision"),
                       ("Recall", "recall"), ("F1", "f1")):
        print(f" {label}: {metrics[key]:.3f}")
    print(f" TP={metrics['tp']} FP={metrics['fp']} "
          f"FN={metrics['fn']} TN={metrics['tn']}")
    print(banner)

    return metrics
|
|
|
|
def main():
    """Drive the full PoC pipeline; exit status encodes the verdict."""
    print("HeapTRM Proof-of-Concept Validation Pipeline")
    print("=" * 60)

    harness_so = step1_build_harness()

    compiled = step2_compile_examples()
    if not compiled:
        print("\nFATAL: No examples compiled successfully")
        sys.exit(1)

    collected = step3_collect_dumps(compiled, harness_so)
    if not collected:
        print("\nFATAL: No dumps collected")
        sys.exit(1)

    step4_generate_dataset(collected)
    metrics = step5_train_and_evaluate()

    # Go/no-go decision: a nonzero exit means the approach did not show signal.
    if metrics['f1'] > 0.5:
        print("\nVERDICT: TRM shows signal on heap state reasoning. Worth pursuing.")
        sys.exit(0)
    print("\nVERDICT: TRM did not learn meaningful patterns. Reconsider approach.")
    sys.exit(1)
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|