amarck committed on
Commit
e5b2c4a
·
1 Parent(s): d869ff3

Add HeapObserver: structured heap state for LLM exploit generation

Browse files

Rich observations with chunk adjacency, freelist contents, corruption
events, reachable exploit primitives, and natural-language summaries.
Designed as the feedback signal for LLM-in-the-loop exploitation.

Files changed (1) hide show
  1. heaptrm/observe.py +403 -0
heaptrm/observe.py ADDED
@@ -0,0 +1,403 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ observe.py - Rich heap observability for LLM consumption.
3
+
4
+ The core insight: LLMs generating exploits need structured, actionable
5
+ feedback about heap state — not raw dumps, not ML scores. They need
6
+ answers to:
7
+ - "Did my allocation land where I expected?"
8
+ - "Is chunk A adjacent to chunk B?"
9
+ - "What's in the tcache for size 0x40?"
10
+ - "Did my overflow corrupt the right field?"
11
+ - "What exploit primitives are currently reachable?"
12
+
13
+ This module transforms raw harness JSONL into structured observations
14
+ that an LLM can reason about.
15
+ """
16
+
17
+ import json
18
+ import os
19
+ from dataclasses import dataclass, field
20
+ from typing import List, Dict, Optional, Tuple
21
+ from collections import defaultdict
22
+
23
+
24
@dataclass
class ChunkInfo:
    """Snapshot of a single heap chunk as reported by the harness dump."""
    index: int                # harness-assigned chunk index
    address: str              # chunk address as a hex string, e.g. "0x55..."
    size: int                 # chunk size (harness "chunk_size" field)
    state: str                # "allocated", "freed"
    flags: Dict[str, int]     # {"P": prev-inuse bit, "M": mmapped bit}
    fd: Optional[str]         # forward pointer (freed chunks), hex string or None
    bk: Optional[str]         # backward pointer, hex string or None
    fd_target: Optional[int]  # index of chunk fd points to; None if fd points outside tracked chunks
    data_preview: str         # first 16 bytes hex (32 hex chars)
    is_corrupted: bool        # corruption flag reported by the harness
    alloc_order: int          # ordinal position in the allocation sequence
    free_order: int           # ordinal position in the free sequence
38
+
39
+
40
@dataclass
class BinInfo:
    """One reconstructed freelist (size-class) entry."""
    bin_type: str       # "tcache", "fastbin", "unsorted", "smallbin"
    size_class: int     # chunk size this bin groups
    entries: List[int]  # chunk indices, head first
45
+
46
+
47
@dataclass
class Corruption:
    """A single corruption event reported by the harness at one step."""
    step: int         # step at which the corruption was observed
    type: str         # corruption category string from the harness (field name shadows builtin; kept for interface stability)
    chunk_index: int  # index of the affected chunk, -1 if unknown
    detail: str       # free-form description from the harness
53
+
54
+
55
@dataclass
class Primitive:
    """An exploit primitive judged reachable from the current heap state."""
    name: str                   # short identifier, e.g. "tcache_poison"
    description: str            # natural-language explanation for the LLM
    ready: bool                 # True when the primitive is usable right now
    requirements: List[str]     # remaining preconditions (empty when ready)
    chunks_involved: List[int]  # indices of the chunks this primitive uses
62
+
63
+
64
@dataclass
class HeapObservation:
    """Complete structured observation of heap state for LLM consumption.

    Produced by HeapObserver.observe(); bundles layout, freelists,
    corruption events, reachable primitives and a text summary for one step.
    """
    step: int       # harness step this snapshot corresponds to
    operation: str  # operation that produced this state (e.g. "malloc")

    # Layout
    chunks: List[ChunkInfo]
    n_allocated: int  # number of chunks currently in "allocated" state
    n_freed: int      # number of chunks currently in "freed" state

    # Bins (reconstructed freelists, grouped by size class)
    bins: List[BinInfo]

    # Adjacency map: chunk_idx -> (prev_idx, next_idx), ordered by address
    adjacency: Dict[int, Tuple[Optional[int], Optional[int]]]

    # Corruptions detected at this step
    corruptions: List[Corruption]
    cumulative_corruptions: int  # running total across the observer's lifetime

    # Reachable exploit primitives
    primitives: List[Primitive]

    # Size class summary
    size_classes: Dict[int, Dict[str, int]]  # size -> {"alloc": N, "freed": N}

    # Human/LLM-readable summary
    summary: str
93
+
94
+
95
class HeapObserver:
    """Transforms raw harness dumps into structured observations.

    Stateful across a session: keeps a running corruption count and the
    full history of observations so callers can diff successive steps.
    """

    def __init__(self):
        # Running total of corruption events over every observed step.
        self.cumulative_corruptions = 0
        self.history: List[HeapObservation] = []

    @staticmethod
    def _chunks_by_index(chunks) -> Dict[int, "ChunkInfo"]:
        """First-occurrence index -> chunk map.

        setdefault preserves the original first-match semantics of the
        linear `next(...)` scans this replaces, in case the harness ever
        emits duplicate indices.
        """
        by_index: Dict[int, "ChunkInfo"] = {}
        for c in chunks:
            by_index.setdefault(c.index, c)
        return by_index

    def observe(self, state: dict) -> HeapObservation:
        """Convert a raw harness state to a structured observation.

        *state* keys (all optional): "chunks" (list of per-chunk dicts),
        "step" (int), "operation" (str), "corruptions" (list of dicts).
        Side effects: appends to self.history and bumps
        self.cumulative_corruptions.
        """
        raw_chunks = state.get("chunks", [])
        step = state.get("step", 0)
        operation = state.get("operation", "unknown")

        # Build chunk list.  (A previous revision also built an unused
        # address->index map here; removed.)
        chunks = []
        for c in raw_chunks:
            ci = ChunkInfo(
                index=c.get("idx", len(chunks)),
                address=c.get("addr", "0x0"),
                size=c.get("chunk_size", 0),
                state="allocated" if c.get("state") == 1 else "freed",
                flags={"P": c.get("flag_p", 0), "M": c.get("flag_m", 0)},
                # fd/bk of 0 are treated as "no pointer".
                fd=hex(c["fd"]) if c.get("fd", 0) != 0 else None,
                bk=hex(c["bk"]) if c.get("bk", 0) != 0 else None,
                fd_target=c.get("fd_idx") if c.get("fd_idx", -1) >= 0 else None,
                data_preview=c.get("data_hex", "")[:32],
                is_corrupted=c.get("is_corrupted", False),
                alloc_order=c.get("alloc_order", 0),
                free_order=c.get("free_order", 0),
            )
            chunks.append(ci)

        n_alloc = sum(1 for c in chunks if c.state == "allocated")
        n_freed = sum(1 for c in chunks if c.state == "freed")

        # Adjacency map: neighbors in *address* order, not index order.
        adjacency = {}
        sorted_chunks = sorted(
            chunks,
            key=lambda c: int(c.address, 16) if c.address.startswith("0x") else 0,
        )
        for i, c in enumerate(sorted_chunks):
            prev_idx = sorted_chunks[i - 1].index if i > 0 else None
            next_idx = sorted_chunks[i + 1].index if i < len(sorted_chunks) - 1 else None
            adjacency[c.index] = (prev_idx, next_idx)

        # Freelist reconstruction: group freed chunks by size class.
        # The tcache/unsorted split at 0x410 is a heuristic label, not a
        # read of the allocator's real bins — TODO confirm against target libc.
        bins = []
        size_freed = defaultdict(list)
        for c in chunks:
            if c.state == "freed":
                size_freed[c.size].append(c.index)
        for size, indices in sorted(size_freed.items()):
            bins.append(BinInfo(
                bin_type="tcache" if size <= 0x410 else "unsorted",
                size_class=size,
                entries=indices,
            ))

        # Corruption events reported by the harness for this step.
        corruptions = []
        for c in state.get("corruptions", []):
            corruptions.append(Corruption(
                step=step,
                type=c.get("type", "unknown"),
                chunk_index=c.get("chunk_idx", -1),
                detail=c.get("detail", ""),
            ))
        self.cumulative_corruptions += len(corruptions)

        # Per-size-class allocated/freed counts.
        size_classes = defaultdict(lambda: {"alloc": 0, "freed": 0})
        for c in chunks:
            if c.state == "allocated":
                size_classes[c.size]["alloc"] += 1
            else:
                size_classes[c.size]["freed"] += 1

        primitives = self._detect_primitives(chunks, adjacency, bins, corruptions)
        summary = self._summarize(step, operation, chunks, bins, corruptions, primitives)

        obs = HeapObservation(
            step=step,
            operation=operation,
            chunks=chunks,
            n_allocated=n_alloc,
            n_freed=n_freed,
            bins=bins,
            adjacency=adjacency,
            corruptions=corruptions,
            cumulative_corruptions=self.cumulative_corruptions,
            primitives=primitives,
            size_classes=dict(size_classes),
            summary=summary,
        )
        self.history.append(obs)
        return obs

    def _detect_primitives(self, chunks, adjacency, bins, corruptions) -> List[Primitive]:
        """Detect which exploit primitives are currently reachable.

        Returns one Primitive per detected opportunity; duplicates of the
        same name may appear when several chunk pairs qualify.
        """
        primitives = []
        # O(1) neighbor lookup instead of a linear scan per neighbor.
        by_index = self._chunks_by_index(chunks)

        # Tcache poison: freed chunk whose fd points outside tracked chunks.
        for c in chunks:
            if c.state == "freed" and c.fd and c.fd_target is None and c.fd != "0x0":
                primitives.append(Primitive(
                    name="tcache_poison",
                    description=f"Chunk {c.index} (freed, size {hex(c.size)}) has fd={c.fd} pointing outside heap. "
                                f"Next malloc({c.size - 0x10}) returns attacker-controlled address.",
                    ready=True,
                    requirements=[],
                    chunks_involved=[c.index],
                ))

        # Overlapping chunks: two allocated chunks whose ranges intersect.
        alloc_chunks = [(c, int(c.address, 16)) for c in chunks
                        if c.state == "allocated" and c.address.startswith("0x")]
        for i, (c1, a1) in enumerate(alloc_chunks):
            for c2, a2 in alloc_chunks[i + 1:]:
                # Standard half-open interval intersection test.
                if a1 < a2 + c2.size and a2 < a1 + c1.size:
                    primitives.append(Primitive(
                        name="overlapping_chunks",
                        description=f"Chunks {c1.index} and {c2.index} overlap in memory. "
                                    f"Writing to one corrupts the other.",
                        ready=True,
                        requirements=[],
                        chunks_involved=[c1.index, c2.index],
                    ))

        # Double free: the same address appears more than once in freed state.
        seen_freed = set()
        for c in chunks:
            if c.state == "freed":
                if c.address in seen_freed:
                    primitives.append(Primitive(
                        name="double_free",
                        description=f"Address {c.address} freed multiple times. "
                                    f"Tcache/fastbin contains a cycle.",
                        ready=True,
                        requirements=[],
                        chunks_involved=[c.index],
                    ))
                seen_freed.add(c.address)

        # UAF opportunity: freed chunk adjacent to an allocated chunk.
        for c in chunks:
            if c.state == "freed" and c.index in adjacency:
                prev_idx, next_idx = adjacency[c.index]
                for neighbor_idx in (prev_idx, next_idx):
                    if neighbor_idx is not None:
                        neighbor = by_index.get(neighbor_idx)
                        if neighbor and neighbor.state == "allocated":
                            primitives.append(Primitive(
                                name="uaf_adjacent",
                                description=f"Freed chunk {c.index} (size {hex(c.size)}) is adjacent to "
                                            f"allocated chunk {neighbor.index} (size {hex(neighbor.size)}). "
                                            f"UAF write to {c.index} could corrupt {neighbor.index}'s data.",
                                ready=True,
                                requirements=["Write to freed chunk via dangling pointer"],
                                chunks_involved=[c.index, neighbor.index],
                            ))

        # Tcache ready: same-size chunks available for poisoning setup.
        for bin_info in bins:
            if bin_info.bin_type == "tcache" and len(bin_info.entries) >= 1:
                primitives.append(Primitive(
                    name="tcache_available",
                    description=f"Tcache bin for size {hex(bin_info.size_class)} has "
                                f"{len(bin_info.entries)} entries. Poison fd to redirect allocation.",
                    # The guard above already ensures at least one entry.
                    ready=True,
                    requirements=["Ability to write to freed chunk's fd pointer"],
                    chunks_involved=bin_info.entries,
                ))

        # Coalesce opportunity: two address-adjacent freed chunks.
        for c in chunks:
            if c.state == "freed" and c.index in adjacency:
                _, next_idx = adjacency[c.index]
                if next_idx is not None:
                    neighbor = by_index.get(next_idx)
                    if neighbor and neighbor.state == "freed":
                        primitives.append(Primitive(
                            name="coalesce_opportunity",
                            description=f"Freed chunks {c.index} and {neighbor.index} are adjacent. "
                                        f"May coalesce into larger chunk on next free/malloc.",
                            ready=True,
                            requirements=[],
                            chunks_involved=[c.index, neighbor.index],
                        ))

        # Metadata corruption events surface as primitives too, so the LLM
        # sees them alongside the other opportunities.
        for corr in corruptions:
            primitives.append(Primitive(
                name=f"corruption_{corr.type}",
                description=f"CORRUPTION DETECTED: {corr.detail}",
                ready=True,
                requirements=[],
                chunks_involved=[corr.chunk_index],
            ))

        return primitives

    def _summarize(self, step, operation, chunks, bins, corruptions, primitives) -> str:
        """Generate a concise natural-language summary for LLM consumption."""
        n_alloc = sum(1 for c in chunks if c.state == "allocated")
        n_freed = sum(1 for c in chunks if c.state == "freed")

        lines = []
        lines.append(f"Step {step}: {operation} | {n_alloc} allocated, {n_freed} freed, {len(chunks)} total")

        if bins:
            bin_strs = [f"size {hex(b.size_class)}: {len(b.entries)} entries" for b in bins]
            lines.append(f"Freelists: {', '.join(bin_strs)}")

        if corruptions:
            for c in corruptions:
                lines.append(f"!! CORRUPTION: {c.type} at chunk {c.chunk_index}: {c.detail}")

        # Corruption pseudo-primitives are reported separately above.
        ready_prims = [p for p in primitives if p.ready and "corruption" not in p.name]
        if ready_prims:
            prim_names = list(set(p.name for p in ready_prims))
            lines.append(f"Primitives available: {', '.join(prim_names)}")

        return "\n".join(lines)

    def to_llm_context(self, obs: HeapObservation) -> str:
        """Format observation as context for an LLM exploit generator."""
        parts = []
        parts.append(f"=== Heap State (step {obs.step}, after {obs.operation}) ===")
        parts.append(f"Chunks: {obs.n_allocated} allocated, {obs.n_freed} freed")
        parts.append("")

        # Chunk table
        parts.append("Chunks:")
        for c in obs.chunks:
            adj = obs.adjacency.get(c.index, (None, None))
            # BUGFIX: the old `any(adj)` test treated neighbor index 0 as
            # "no neighbor" (0 is falsy) and dropped the adjacency string.
            has_neighbor = any(a is not None for a in adj)
            adj_str = f"prev={adj[0]} next={adj[1]}" if has_neighbor else ""
            fd_str = f"fd={c.fd}" if c.fd else ""
            corr_str = " [CORRUPTED]" if c.is_corrupted else ""
            parts.append(f"  [{c.index}] {c.address} size={hex(c.size)} {c.state} "
                         f"{fd_str} {adj_str}{corr_str}")

        # Bins
        if obs.bins:
            parts.append("")
            parts.append("Freelists:")
            for b in obs.bins:
                entries = " -> ".join(str(e) for e in b.entries)
                parts.append(f"  {b.bin_type} size={hex(b.size_class)}: [{entries}]")

        # Corruptions
        if obs.corruptions:
            parts.append("")
            parts.append("!! CORRUPTIONS:")
            for c in obs.corruptions:
                parts.append(f"  {c.type}: {c.detail}")

        # Primitives
        ready = [p for p in obs.primitives if p.ready]
        if ready:
            parts.append("")
            parts.append("Available primitives:")
            for p in ready:
                parts.append(f"  - {p.name}: {p.description}")

        return "\n".join(parts)

    def diff(self, prev: HeapObservation, curr: HeapObservation) -> str:
        """Generate a diff between two observations — what changed."""
        changes = []

        # Index maps avoid the O(n^2) `next(...)` scans per shared index.
        prev_by_idx = self._chunks_by_index(prev.chunks)
        curr_by_idx = self._chunks_by_index(curr.chunks)
        prev_indices = set(prev_by_idx)
        curr_indices = set(curr_by_idx)

        # New chunks
        for idx in curr_indices - prev_indices:
            c = curr_by_idx[idx]
            changes.append(f"+ Chunk {idx} allocated: size={hex(c.size)} at {c.address}")

        # Removed chunks
        for idx in prev_indices - curr_indices:
            changes.append(f"- Chunk {idx} removed")

        # State/fd changes on chunks present in both observations
        for idx in prev_indices & curr_indices:
            prev_c = prev_by_idx[idx]
            curr_c = curr_by_idx[idx]
            if prev_c.state != curr_c.state:
                changes.append(f"~ Chunk {idx}: {prev_c.state} -> {curr_c.state}")
            if prev_c.fd != curr_c.fd:
                changes.append(f"~ Chunk {idx} fd: {prev_c.fd} -> {curr_c.fd}")

        # New corruptions
        for c in curr.corruptions:
            changes.append(f"!! {c.type}: {c.detail}")

        # New primitives
        prev_prims = {p.name for p in prev.primitives}
        for p in curr.primitives:
            if p.name not in prev_prims and p.ready:
                changes.append(f">> New primitive: {p.name} — {p.description}")

        if not changes:
            return "No significant changes."
        return "\n".join(changes)