heap-trm / agent /universal_grid.py
amarck's picture
Add heaptrm package: v2 harness, CLI, pwntools integration, CVE tests
22374d1
"""
universal_grid.py - Allocator-agnostic grid encoding.
Encodes RELATIONSHIPS between chunks rather than allocator internals.
This should generalize across ptmalloc2, jemalloc, tcmalloc, etc.
Grid: 32 rows x 16 cols
Rows 0-23: Chunks (sorted by address)
Rows 24-27: Action history (last 4 ops)
Rows 28-31: Summary statistics (only row 28 is currently written; rows 29-31 stay zero)
Column layout (per chunk row):
0: state (0=pad, 1=allocated, 2=freed)
1: size_bucket (size quantized to 0-63 buckets)
2: same_size_alloc (count of other allocated chunks with same size)
3: same_size_freed (count of other freed chunks with same size)
4: prev_chunk_free (1 if previous adjacent chunk is free)
5: next_chunk_free (1 if next adjacent chunk is free)
6: can_coalesce (1 if freeing this would merge with a neighbor)
7: fwd_ptr_target (0=null, 1-32=chunk index + 1, 33=external)
8: fwd_ptr_corrupted (1 if fd doesn't point to expected freelist member)
9: is_adjacent_to_freed (1 if directly next to a freed chunk)
10: position_in_heap (0=bottom, 63=top, relative position)
11: distance_to_same_size_free (0=none, 1-63 chunks away)
12: n_times_reallocated (alloc_order reuse indicator)
13: was_target_of_last_op (1/0)
14: chunk_isolation (count of allocated neighbors within 3 slots)
15: size_uniqueness (1 if this is the only chunk of this size, 0 otherwise)
"""
import numpy as np
from typing import List, Dict
from collections import Counter, deque
# Overall grid shape: GRID_ROWS x GRID_COLS integer cells.
GRID_ROWS = 32
GRID_COLS = 16
# Row budget: chunk rows come first, then action-history rows, then summary
# rows (24 + 4 + 4 == GRID_ROWS).
CHUNK_ROWS = 24
HISTORY_ROWS = 4
SUMMARY_ROWS = 4
# Number of distinct cell values; presumably the reason clamp() defaults to
# hi=63 (VOCAB - 1) -- TODO confirm against the consumer of the grid.
VOCAB = 64
def clamp(v, lo=0, hi=63):
    """Truncate *v* to an int and confine it to the closed range [lo, hi].

    The upper bound is applied first and the lower bound second, so if the
    bounds are inverted (``lo > hi``) the result is ``lo``.
    """
    value = int(v)
    # Same tie-breaking as min(hi, value): keep ``hi`` unless value is smaller.
    capped = value if value < hi else hi
    # Same tie-breaking as max(lo, capped): keep ``lo`` unless capped is larger.
    return capped if capped > lo else lo
def size_bucket(size: int) -> int:
    """Quantize a chunk size into a bucket in the range 0-63.

    The mapping is piecewise linear with progressively coarser steps:

    * ``size <= 0``            -> bucket 0
    * ``0 < size <= 0x80``     -> ``size >> 4``                 (0x10 steps, 0-8)
    * ``0x80 < size <= 0x400`` -> ``8 + ((size - 0x80) >> 5)``  (0x20 steps, 8-36)
    * ``size > 0x400``         -> ``32 + ((size - 0x400) >> 8)`` (0x100 steps,
      capped at 63)

    Args:
        size: Chunk size in bytes.

    Returns:
        The bucket index, always within 0..63.
    """
    max_bucket = 63
    if size <= 0:
        return 0
    if size <= 0x80:
        return size >> 4
    if size <= 0x400:
        # The shift must be parenthesised: ``+`` binds tighter than ``>>``, so
        # ``8 + x >> 5`` would evaluate ``(8 + x) >> 5`` and collapse sizes
        # just above 0x80 into bucket 0.
        return 8 + ((size - 0x80) >> 5)
    # NOTE(review): this range starts at 32 while the previous one ends at 36,
    # so buckets 32-36 are shared by both ranges -- confirm this is intended.
    # Only this branch can exceed the bucket limit, hence the explicit cap.
    return min(max_bucket, 32 + ((size - 0x400) >> 8))
class UniversalGridEncoder:
    """Allocator-agnostic grid encoder with action history.

    The encoder keeps a bounded history of the most recent actions plus
    running counters of allocations, frees and writes, and turns a heap-state
    dictionary into a ``GRID_ROWS x GRID_COLS`` integer grid:

    * rows ``0 .. CHUNK_ROWS - 1``: one row per chunk (relational features),
    * the next ``HISTORY_ROWS`` rows: recorded actions, newest first,
    * the row after that: summary statistics.
    """

    def __init__(self):
        # Newest action sits at index 0; older entries fall off the right end.
        self.action_history = deque(maxlen=HISTORY_ROWS)
        self.total_allocs = 0
        self.total_frees = 0
        self.total_writes = 0
        self.step = 0

    def record_action(self, op_type: int, size: int = 0):
        """Record one action and update the running counters.

        Args:
            op_type: Operation code. 0 bumps the alloc counter, 1 the free
                counter and 2 the write counter; any other value is stored in
                the history but not counted.
            size: Size associated with the action (bucketed when encoded).
        """
        self.action_history.appendleft(
            {"op": op_type, "size": size, "step": self.step}
        )
        if op_type == 0:
            self.total_allocs += 1
        elif op_type == 1:
            self.total_frees += 1
        elif op_type == 2:
            self.total_writes += 1
        self.step += 1

    def encode(self, state: dict) -> np.ndarray:
        """Encode a heap state into the integer grid.

        Args:
            state: Mapping with an optional ``"chunks"`` entry holding a list
                of chunk dicts. The keys read per chunk are ``chunk_size``,
                ``state`` (1 = allocated, 2 = freed), ``fd``, ``fd_idx``
                (-1 = null, -2 = outside the heap, otherwise an index into
                the chunk list), ``alloc_order`` and ``is_target``. The list
                is used in the order given; the encoder does not sort it.

        Returns:
            An ``int64`` array of shape ``(GRID_ROWS, GRID_COLS)``.
        """
        grid = np.zeros((GRID_ROWS, GRID_COLS), dtype=np.int64)
        # Treat a missing or None "chunks" entry the same as an empty heap.
        chunks = state.get("chunks") or []
        if chunks:
            self._encode_chunk_rows(grid, chunks)
        self._encode_history_and_summary(grid, chunks)
        return grid

    @staticmethod
    def _has_freed_neighbor(states, i):
        """Return True if the chunk directly before or after *i* is freed."""
        before = i > 0 and states[i - 1] == 2
        after = i + 1 < len(states) and states[i + 1] == 2
        return before or after

    def _encode_chunk_rows(self, grid, chunks):
        """Fill the per-chunk rows of *grid* for the first CHUNK_ROWS chunks."""
        n_chunks = len(chunks)
        states = [c.get("state", 0) for c in chunks]
        # Bucket every size once; all statistics cover the full chunk list,
        # even though only the first CHUNK_ROWS chunks get a row.
        buckets = [size_bucket(c.get("chunk_size", 0)) for c in chunks]

        alloc_per_bucket = Counter(
            b for b, s in zip(buckets, states) if s == 1
        )
        freed_per_bucket = Counter(
            b for b, s in zip(buckets, states) if s == 2
        )
        freed_rows_by_bucket = {}
        for idx, (b, s) in enumerate(zip(buckets, states)):
            if s == 2:
                freed_rows_by_bucket.setdefault(b, []).append(idx)

        last_index = max(n_chunks - 1, 1)

        for i, c in enumerate(chunks[:CHUNK_ROWS]):
            st = states[i]
            sb = buckets[i]
            prev_free = i > 0 and states[i - 1] == 2
            next_free = i + 1 < n_chunks and states[i + 1] == 2
            near_freed = prev_free or next_free

            # Col 0: state, Col 1: size bucket
            grid[i, 0] = clamp(st, 0, 2)
            grid[i, 1] = sb

            # Cols 2-3: other allocated / freed chunks in the same bucket
            # (the chunk itself is excluded from its own count).
            grid[i, 2] = clamp(
                alloc_per_bucket.get(sb, 0) - (1 if st == 1 else 0)
            )
            grid[i, 3] = clamp(
                freed_per_bucket.get(sb, 0) - (1 if st == 2 else 0)
            )

            # Cols 4-5: previous / next chunk is freed
            grid[i, 4] = 1 if prev_free else 0
            grid[i, 5] = 1 if next_free else 0

            # Col 6: freeing this allocated chunk would merge with a neighbor
            grid[i, 6] = 1 if (st == 1 and near_freed) else 0

            # Col 7: forward pointer target
            fd_idx = c.get("fd_idx", -1)
            if fd_idx == -1:
                grid[i, 7] = 0
            elif fd_idx == -2:
                grid[i, 7] = 33  # external
            else:
                grid[i, 7] = clamp(fd_idx + 1, 1, 32)

            # Col 8: forward pointer corrupted. Heuristic: a freed chunk whose
            # fd leaves the heap or targets a chunk of another size bucket.
            if st == 2 and c.get("fd", 0) != 0:
                if fd_idx == -2:
                    grid[i, 8] = 1
                elif 0 <= fd_idx < n_chunks and buckets[fd_idx] != sb:
                    grid[i, 8] = 1

            # Col 9: adjacent to a freed chunk
            grid[i, 9] = 1 if near_freed else 0

            # Col 10: relative position in the heap. Integer arithmetic keeps
            # exact results from being rounded down by float truncation.
            grid[i, 10] = clamp(i * 63 // last_index)

            # Col 11: distance to the nearest same-bucket freed chunk
            # (0 when there is none or when this chunk is itself freed).
            freed_rows = freed_rows_by_bucket.get(sb, [])
            if freed_rows and st != 2:
                grid[i, 11] = clamp(min(abs(i - fi) for fi in freed_rows))

            # Col 12: alloc order
            grid[i, 12] = clamp(c.get("alloc_order", 0))

            # Col 13: target of the last operation, normalised to 0/1 so an
            # unexpected value cannot leave the cell's documented range.
            grid[i, 13] = 1 if c.get("is_target", 0) else 0

            # Col 14: allocated chunks within 3 positions on either side
            window = range(max(0, i - 3), min(n_chunks, i + 4))
            grid[i, 14] = clamp(
                sum(1 for j in window if j != i and states[j] == 1)
            )

            # Col 15: this chunk is the only one (allocated or freed) in its
            # size bucket
            same_bucket_total = (
                alloc_per_bucket.get(sb, 0) + freed_per_bucket.get(sb, 0)
            )
            grid[i, 15] = 1 if same_bucket_total <= 1 else 0

    def _encode_history_and_summary(self, grid, chunks):
        """Fill the action-history rows and the summary row of *grid*.

        Args:
            grid: The grid to write into (modified in place).
            chunks: Full list of chunk dicts; may be empty or None.
        """
        chunks = chunks or []

        # History rows: one per recorded action, newest first.
        for offset, action in enumerate(self.action_history):
            if offset >= HISTORY_ROWS:
                break
            row = CHUNK_ROWS + offset
            grid[row, 0] = 60  # marker
            grid[row, 1] = clamp(action["op"] + 1, 1, 5)
            grid[row, 2] = size_bucket(action.get("size", 0))
            # Age in steps; the newest action has age 1 because the step
            # counter is advanced right after recording.
            grid[row, 3] = clamp(self.step - action["step"])
            grid[row, 4] = clamp(self.total_allocs)
            grid[row, 5] = clamp(self.total_frees)
            grid[row, 6] = clamp(self.total_writes)

        # Summary row: first row after the history rows.
        sr = CHUNK_ROWS + HISTORY_ROWS
        states = [c.get("state", 0) for c in chunks]
        n_total = len(chunks)
        n_alloc = sum(1 for s in states if s == 1)
        n_freed = sum(1 for s in states if s == 2)
        # Freed chunks whose fd field is non-zero.
        n_poisoned = sum(
            1 for c in chunks
            if c.get("state") == 2 and c.get("fd", 0) != 0
        )
        distinct_buckets = {size_bucket(c.get("chunk_size", 0)) for c in chunks}

        grid[sr, 0] = 50  # marker
        grid[sr, 1] = clamp(n_alloc)
        grid[sr, 2] = clamp(n_freed)
        grid[sr, 3] = clamp(n_total)
        grid[sr, 4] = clamp(self.total_allocs)
        grid[sr, 5] = clamp(self.total_frees)
        grid[sr, 6] = clamp(self.total_writes)
        grid[sr, 7] = clamp(self.step)
        grid[sr, 8] = clamp(n_poisoned)

        # Phase indicator
        if n_freed == 0:
            phase = 1  # alloc
        elif self.total_writes == 0:
            phase = 2  # ready to corrupt
        elif self.total_writes > 0:
            phase = 3  # post-corruption
        else:
            # Only reachable if the write counter was set negative externally.
            phase = 0
        grid[sr, 9] = phase

        # Freed share of all chunks, scaled to 0-63 with exact integer math.
        if n_total > 0:
            grid[sr, 10] = clamp(n_freed * 63 // n_total)
        grid[sr, 11] = clamp(len(distinct_buckets))

        # Coalesce opportunities: allocated chunks with a freed neighbor.
        coalesce_count = sum(
            1 for i, s in enumerate(states)
            if s == 1 and self._has_freed_neighbor(states, i)
        )
        grid[sr, 12] = clamp(coalesce_count)

        # Has any freelist corruption
        grid[sr, 13] = 1 if n_poisoned > 0 else 0