""" universal_grid.py - Allocator-agnostic grid encoding. Encodes RELATIONSHIPS between chunks rather than allocator internals. This should generalize across ptmalloc2, jemalloc, tcmalloc, etc. Grid: 32 rows x 16 cols Rows 0-23: Chunks (sorted by address) Rows 24-27: Action history (last 4 ops) Rows 28-31: Summary statistics Column layout (per chunk row): 0: state (0=pad, 1=allocated, 2=freed) 1: size_bucket (size quantized to 0-63 buckets) 2: same_size_alloc (count of other allocated chunks with same size) 3: same_size_freed (count of other freed chunks with same size) 4: prev_chunk_free (1 if previous adjacent chunk is free) 5: next_chunk_free (1 if next adjacent chunk is free) 6: can_coalesce (1 if freeing this would merge with a neighbor) 7: fwd_ptr_target (0=null, 1-24=chunk index, 33=external) 8: fwd_ptr_corrupted (1 if fd doesn't point to expected freelist member) 9: is_adjacent_to_freed (1 if directly next to a freed chunk) 10: position_in_heap (0=bottom, 63=top, relative position) 11: distance_to_same_size_free (0=none, 1-63 chunks away) 12: n_times_reallocated (alloc_order reuse indicator) 13: was_target_of_last_op (1/0) 14: chunk_isolation (count of allocated neighbors within 3 slots) 15: size_uniqueness (1 if this is the only chunk of this size, 0 otherwise) """ import numpy as np from typing import List, Dict from collections import Counter, deque GRID_ROWS = 32 GRID_COLS = 16 CHUNK_ROWS = 24 HISTORY_ROWS = 4 SUMMARY_ROWS = 4 VOCAB = 64 def clamp(v, lo=0, hi=63): return max(lo, min(hi, int(v))) def size_bucket(size: int) -> int: """Quantize chunk size to bucket 0-63. Logarithmic-ish.""" if size <= 0: return 0 if size <= 0x80: return clamp(size >> 4) # 0x10 steps -> 0-8 if size <= 0x400: return clamp(8 + (size - 0x80) >> 5) # coarser return clamp(32 + (size - 0x400) >> 8) class UniversalGridEncoder: """Allocator-agnostic grid encoder with action history.""" def __init__(self): self.action_history = deque(maxlen=HISTORY_ROWS) self.total_allocs = 0 self.total_frees = 0 self.total_writes = 0 self.step = 0 def record_action(self, op_type: int, size: int = 0): self.action_history.appendleft({"op": op_type, "size": size, "step": self.step}) if op_type == 0: self.total_allocs += 1 elif op_type == 1: self.total_frees += 1 elif op_type == 2: self.total_writes += 1 self.step += 1 def encode(self, state: dict) -> np.ndarray: grid = np.zeros((GRID_ROWS, GRID_COLS), dtype=np.int64) chunks = state.get("chunks", []) if not chunks: self._encode_history_and_summary(grid, chunks) return grid # Precompute: group by size sizes = [c.get("chunk_size", 0) for c in chunks] states_list = [c.get("state", 0) for c in chunks] size_alloc_count = Counter() size_freed_count = Counter() for sz, st in zip(sizes, states_list): sb = size_bucket(sz) if st == 1: size_alloc_count[sb] += 1 elif st == 2: size_freed_count[sb] += 1 # Precompute: for each size, index of nearest freed chunk freed_indices_by_size = {} for i, (sz, st) in enumerate(zip(sizes, states_list)): sb = size_bucket(sz) if st == 2: freed_indices_by_size.setdefault(sb, []).append(i) n_chunks = len(chunks) for i, c in enumerate(chunks[:CHUNK_ROWS]): sz = c.get("chunk_size", 0) sb = size_bucket(sz) st = c.get("state", 0) # Col 0: state grid[i, 0] = clamp(st, 0, 2) # Col 1: size bucket grid[i, 1] = sb # Col 2: same-size allocated count (excluding self) sa = size_alloc_count.get(sb, 0) if st == 1: sa -= 1 grid[i, 2] = clamp(sa) # Col 3: same-size freed count grid[i, 3] = clamp(size_freed_count.get(sb, 0) - (1 if st == 2 else 0)) # Col 4: prev chunk free if i > 0 and states_list[i - 1] == 2: grid[i, 4] = 1 # Col 5: next chunk free if i + 1 < n_chunks and states_list[i + 1] == 2: grid[i, 5] = 1 # Col 6: can_coalesce (would merge if freed) if st == 1: prev_free = (i > 0 and states_list[i - 1] == 2) next_free = (i + 1 < n_chunks and states_list[i + 1] == 2) grid[i, 6] = 1 if (prev_free or next_free) else 0 # Col 7: forward pointer target fd_idx = c.get("fd_idx", -1) if fd_idx == -1: grid[i, 7] = 0 elif fd_idx == -2: grid[i, 7] = 33 # external else: grid[i, 7] = clamp(fd_idx + 1, 1, 32) # Col 8: forward pointer corrupted # Heuristic: if freed chunk's fd points to external or to a non-same-size chunk if st == 2 and c.get("fd", 0) != 0: fd_idx_val = c.get("fd_idx", -1) if fd_idx_val == -2: grid[i, 8] = 1 # points outside heap elif fd_idx_val >= 0 and fd_idx_val < len(chunks): target_sz = size_bucket(chunks[fd_idx_val].get("chunk_size", 0)) if target_sz != sb: grid[i, 8] = 1 # points to wrong size class # Col 9: adjacent to freed chunk adj_free = (i > 0 and states_list[i - 1] == 2) or \ (i + 1 < n_chunks and states_list[i + 1] == 2) grid[i, 9] = 1 if adj_free else 0 # Col 10: position in heap (relative) grid[i, 10] = clamp(int(i / max(n_chunks - 1, 1) * 63)) # Col 11: distance to nearest same-size freed chunk freed_idxs = freed_indices_by_size.get(sb, []) if freed_idxs and not (st == 2): min_dist = min(abs(i - fi) for fi in freed_idxs) grid[i, 11] = clamp(min_dist) else: grid[i, 11] = 0 # Col 12: alloc order grid[i, 12] = clamp(c.get("alloc_order", 0)) # Col 13: was target of last operation grid[i, 13] = c.get("is_target", 0) # Col 14: chunk isolation (allocated neighbors within 3 positions) neighbors = 0 for di in range(-3, 4): ni = i + di if di != 0 and 0 <= ni < n_chunks and states_list[ni] == 1: neighbors += 1 grid[i, 14] = clamp(neighbors) # Col 15: size uniqueness total_same = size_alloc_count.get(sb, 0) + size_freed_count.get(sb, 0) grid[i, 15] = 1 if total_same <= 1 else 0 self._encode_history_and_summary(grid, chunks) return grid def _encode_history_and_summary(self, grid, chunks): # History rows (24-27) for i, action in enumerate(self.action_history): row = CHUNK_ROWS + i if row >= CHUNK_ROWS + HISTORY_ROWS: break grid[row, 0] = 60 # marker grid[row, 1] = clamp(action["op"] + 1, 1, 5) grid[row, 2] = size_bucket(action.get("size", 0)) grid[row, 3] = clamp(self.step - action["step"]) grid[row, 4] = clamp(self.total_allocs) grid[row, 5] = clamp(self.total_frees) grid[row, 6] = clamp(self.total_writes) # Summary row (28) sr = CHUNK_ROWS + HISTORY_ROWS n_alloc = sum(1 for c in chunks if c.get("state") == 1) n_freed = sum(1 for c in chunks if c.get("state") == 2) n_total = len(chunks) n_poisoned = sum(1 for c in chunks if c.get("state") == 2 and c.get("fd", 0) != 0) # Count distinct size classes size_set = set() for c in chunks: size_set.add(size_bucket(c.get("chunk_size", 0))) grid[sr, 0] = 50 # marker grid[sr, 1] = clamp(n_alloc) grid[sr, 2] = clamp(n_freed) grid[sr, 3] = clamp(n_total) grid[sr, 4] = clamp(self.total_allocs) grid[sr, 5] = clamp(self.total_frees) grid[sr, 6] = clamp(self.total_writes) grid[sr, 7] = clamp(self.step) grid[sr, 8] = clamp(n_poisoned) # Phase indicator if n_freed == 0: phase = 1 # alloc elif self.total_writes == 0 and n_freed > 0: phase = 2 # ready to corrupt elif self.total_writes > 0: phase = 3 # post-corruption else: phase = 0 grid[sr, 9] = phase # Freed-to-alloc ratio if n_total > 0: grid[sr, 10] = clamp(int(n_freed / n_total * 63)) grid[sr, 11] = clamp(len(size_set)) # Coalesce opportunity count coalesce_count = 0 states_list = [c.get("state", 0) for c in chunks] for i in range(len(chunks)): if states_list[i] == 1: prev_free = (i > 0 and states_list[i-1] == 2) next_free = (i+1 < len(chunks) and states_list[i+1] == 2) if prev_free or next_free: coalesce_count += 1 grid[sr, 12] = clamp(coalesce_count) # Has any freelist corruption grid[sr, 13] = 1 if n_poisoned > 0 else 0