| """ |
| universal_grid.py - Allocator-agnostic grid encoding. |
| |
| Encodes RELATIONSHIPS between chunks rather than allocator internals. |
| This should generalize across ptmalloc2, jemalloc, tcmalloc, etc. |
| |
| Grid: 32 rows x 16 cols |
| Rows 0-23: Chunks (sorted by address) |
| Rows 24-27: Action history (last 4 ops) |
| Rows 28-31: Summary statistics |
| |
| Column layout (per chunk row): |
| 0: state (0=pad, 1=allocated, 2=freed) |
| 1: size_bucket (size quantized to 0-63 buckets) |
| 2: same_size_alloc (count of other allocated chunks in the same size bucket) |
| 3: same_size_freed (count of other freed chunks in the same size bucket) |
| 4: prev_chunk_free (1 if previous adjacent chunk is free) |
| 5: next_chunk_free (1 if next adjacent chunk is free) |
| 6: can_coalesce (1 if freeing this would merge with a neighbor) |
| 7: fwd_ptr_target (0=null, 1-32=chunk index + 1, 33=external) |
| 8: fwd_ptr_corrupted (1 if fd doesn't point to expected freelist member) |
| 9: is_adjacent_to_freed (1 if directly next to a freed chunk) |
| 10: position_in_heap (0=bottom, 63=top, relative position) |
| 11: distance_to_same_size_free (0=none or this chunk is itself freed, 1-63=index distance to the nearest freed chunk in the same size bucket) |
| 12: n_times_reallocated (alloc_order reuse indicator) |
| 13: was_target_of_last_op (1/0) |
| 14: chunk_isolation (count of allocated neighbors within 3 slots) |
| 15: size_uniqueness (1 if this is the only chunk of this size, 0 otherwise) |
| """ |
|
|
| import numpy as np |
| from typing import List, Dict |
| from collections import Counter, deque |
|
|
# Grid geometry. The three row regions stack top to bottom:
# chunk rows, then action-history rows, then summary rows.
GRID_ROWS, GRID_COLS = 32, 16
CHUNK_ROWS = 24                   # rows 0-23: one row per heap chunk
HISTORY_ROWS = SUMMARY_ROWS = 4   # rows 24-27: recent ops; rows 28-31: statistics
# Presumably the number of distinct cell values (0-63); not referenced in this
# part of the file -- confirm against the model that consumes the grid.
VOCAB = 64
|
|
|
|
def clamp(v, lo=0, hi=63):
    """Truncate *v* to an int and limit it to the closed range [lo, hi].

    The upper bound is applied first and the lower bound second, so if the
    bounds are ever passed inverted (lo > hi) the result is ``lo``.
    """
    value = int(v)
    capped = value if value < hi else hi
    return capped if capped > lo else lo
|
|
|
|
def size_bucket(size: int) -> int:
    """Quantize a chunk size to a bucket in 0-63 using three linear tiers.

    Tiers (coarser as the size grows, hence "logarithmic-ish"):
      * size <= 0         -> 0
      * 1 .. 0x80         -> size // 16                (buckets 0-8)
      * 0x81 .. 0x400     -> 8 + (size - 0x80) // 32   (buckets 8-36)
      * above 0x400       -> 32 + (size - 0x400) // 256, capped at 63

    Args:
        size: Chunk size in bytes.

    Returns:
        The bucket index, always within 0-63.
    """
    if size <= 0:
        return 0
    if size <= 0x80:
        return clamp(size >> 4)
    if size <= 0x400:
        # The shift must be parenthesised: ``+`` binds tighter than ``>>``,
        # so ``8 + (size - 0x80) >> 5`` would shift the whole sum and send
        # sizes just above 0x80 back to bucket 0.
        return clamp(8 + ((size - 0x80) >> 5))
    # NOTE(review): this tier restarts at 32 while the previous tier ends at
    # 36, so sizes just above 0x400 share buckets 32-36 with sizes just below
    # it. Confirm whether the base here should be 36 to keep buckets monotonic.
    return clamp(32 + ((size - 0x400) >> 8))
|
|
|
|
class UniversalGridEncoder:
    """Allocator-agnostic grid encoder with action history.

    The encoder turns a heap snapshot into a GRID_ROWS x GRID_COLS integer
    grid. Chunk rows describe relationships between chunks (neighbours,
    same-size-bucket peers, forward pointers), history rows describe the most
    recently recorded operations, and the first summary row holds aggregate
    statistics. Adjacency is judged by list position, so the chunk list is
    assumed to be ordered by address -- the encoder does not sort it.

    Operation codes understood by ``record_action``: 0 bumps the alloc
    counter, 1 the free counter, 2 the write counter. Any other code is
    recorded in the history but counted nowhere.
    """

    def __init__(self):
        """Start with an empty action history and zeroed counters."""
        # Newest action lives at index 0; maxlen silently drops the oldest.
        self.action_history = deque(maxlen=HISTORY_ROWS)
        self.total_allocs = 0
        self.total_frees = 0
        self.total_writes = 0
        self.step = 0

    def record_action(self, op_type: int, size: int = 0):
        """Record one operation and advance the step counter.

        Args:
            op_type: Operation code (0, 1 or 2 update the alloc, free or
                write counter respectively).
            size: Size associated with the operation; bucketed at encode time.
        """
        self.action_history.appendleft(
            {"op": op_type, "size": size, "step": self.step}
        )
        if op_type == 0:
            self.total_allocs += 1
        elif op_type == 1:
            self.total_frees += 1
        elif op_type == 2:
            self.total_writes += 1
        self.step += 1

    def encode(self, state: dict) -> np.ndarray:
        """Encode a heap snapshot into the integer grid.

        Args:
            state: Mapping with a ``"chunks"`` entry: a sequence of dicts.
                Keys read per chunk (all optional): ``chunk_size``, ``state``
                (1=allocated, 2=freed), ``fd``, ``fd_idx`` (-1=null,
                -2=external, otherwise an index into the chunk list),
                ``alloc_order`` and ``is_target``. A missing or ``None``
                ``"chunks"`` entry is treated as an empty heap.

        Returns:
            An int64 array of shape (GRID_ROWS, GRID_COLS). Only the first
            CHUNK_ROWS chunks get a row, but all chunks contribute to the
            neighbour, same-bucket and summary statistics.
        """
        grid = np.zeros((GRID_ROWS, GRID_COLS), dtype=np.int64)
        # ``or []`` also covers an explicit None, which would otherwise crash
        # when the summary code iterates over the chunks.
        chunks = state.get("chunks") or []
        if not chunks:
            self._encode_history_and_summary(grid, chunks)
            return grid

        n_chunks = len(chunks)
        states_list = [c.get("state", 0) for c in chunks]
        buckets = [size_bucket(c.get("chunk_size", 0)) for c in chunks]

        # Per-bucket population, split by allocation state.
        size_alloc_count = Counter(
            sb for sb, st in zip(buckets, states_list) if st == 1
        )
        size_freed_count = Counter(
            sb for sb, st in zip(buckets, states_list) if st == 2
        )

        # Positions of the freed chunks in each bucket (for column 11).
        freed_indices_by_size = {}
        for idx, (sb, st) in enumerate(zip(buckets, states_list)):
            if st == 2:
                freed_indices_by_size.setdefault(sb, []).append(idx)

        for i, c in enumerate(chunks[:CHUNK_ROWS]):
            sb = buckets[i]
            st = states_list[i]
            prev_free = i > 0 and states_list[i - 1] == 2
            next_free = i + 1 < n_chunks and states_list[i + 1] == 2
            adj_free = prev_free or next_free

            # 0: state, 1: size bucket
            grid[i, 0] = clamp(st, 0, 2)
            grid[i, 1] = sb

            # 2/3: peers in the same bucket, excluding this chunk itself
            grid[i, 2] = clamp(size_alloc_count.get(sb, 0) - (1 if st == 1 else 0))
            grid[i, 3] = clamp(size_freed_count.get(sb, 0) - (1 if st == 2 else 0))

            # 4/5: freed neighbour directly before / after
            grid[i, 4] = 1 if prev_free else 0
            grid[i, 5] = 1 if next_free else 0

            # 6: only an allocated chunk can coalesce by being freed
            grid[i, 6] = 1 if (st == 1 and adj_free) else 0

            # 7: forward-pointer target (0=null, 33=external, else index + 1)
            fd_idx = c.get("fd_idx", -1)
            if fd_idx == -1:
                grid[i, 7] = 0
            elif fd_idx == -2:
                grid[i, 7] = 33
            else:
                grid[i, 7] = clamp(fd_idx + 1, 1, 32)

            # 8: a freed chunk whose non-zero fd leaves the heap or lands on
            #    a chunk from a different size bucket counts as corrupted
            if st == 2 and c.get("fd", 0) != 0:
                if fd_idx == -2:
                    grid[i, 8] = 1
                elif 0 <= fd_idx < n_chunks and buckets[fd_idx] != sb:
                    grid[i, 8] = 1

            # 9: adjacent to any freed chunk
            grid[i, 9] = 1 if adj_free else 0

            # 10: relative position in the chunk list, scaled to 0-63
            grid[i, 10] = clamp(int(i / max(n_chunks - 1, 1) * 63))

            # 11: index distance to the nearest freed chunk in the same
            #     bucket; left at 0 for chunks that are themselves freed
            freed_idxs = freed_indices_by_size.get(sb, [])
            if freed_idxs and st != 2:
                grid[i, 11] = clamp(min(abs(i - fi) for fi in freed_idxs))
            else:
                grid[i, 11] = 0

            # 12: allocation order as reported by the snapshot
            grid[i, 12] = clamp(c.get("alloc_order", 0))

            # 13: target flag, normalised to 0/1 so the cell cannot escape
            #     the documented value range
            grid[i, 13] = 1 if c.get("is_target", 0) else 0

            # 14: allocated chunks within three slots on either side
            window = range(max(i - 3, 0), min(i + 4, n_chunks))
            grid[i, 14] = clamp(
                sum(1 for ni in window if ni != i and states_list[ni] == 1)
            )

            # 15: 1 when no other allocated/freed chunk shares the bucket
            total_same = size_alloc_count.get(sb, 0) + size_freed_count.get(sb, 0)
            grid[i, 15] = 1 if total_same <= 1 else 0

        self._encode_history_and_summary(grid, chunks)
        return grid

    def _encode_history_and_summary(self, grid, chunks):
        """Fill the history rows and the first summary row of *grid* in place.

        Args:
            grid: The (GRID_ROWS, GRID_COLS) array being built.
            chunks: Sequence of chunk dicts (may be empty).
        """
        # History rows, newest action first. The bounded range keeps the
        # writes inside the history region whatever the deque holds.
        history_rows = range(CHUNK_ROWS, CHUNK_ROWS + HISTORY_ROWS)
        for row, action in zip(history_rows, self.action_history):
            grid[row, 0] = 60  # marker value for a populated history row
            grid[row, 1] = clamp(action["op"] + 1, 1, 5)
            grid[row, 2] = size_bucket(action.get("size", 0))
            grid[row, 3] = clamp(self.step - action["step"])  # age in steps
            # Running totals at encode time (identical on every history row).
            grid[row, 4] = clamp(self.total_allocs)
            grid[row, 5] = clamp(self.total_frees)
            grid[row, 6] = clamp(self.total_writes)

        # Summary statistics; only the first summary row is written.
        sr = CHUNK_ROWS + HISTORY_ROWS
        states = [c.get("state", 0) for c in chunks]
        n_total = len(chunks)
        n_alloc = sum(1 for st in states if st == 1)
        n_freed = sum(1 for st in states if st == 2)
        # Freed chunks that carry a non-zero forward pointer.
        n_poisoned = sum(
            1 for c in chunks if c.get("state") == 2 and c.get("fd", 0) != 0
        )
        distinct_buckets = {size_bucket(c.get("chunk_size", 0)) for c in chunks}

        grid[sr, 0] = 50  # marker value for the summary row
        grid[sr, 1] = clamp(n_alloc)
        grid[sr, 2] = clamp(n_freed)
        grid[sr, 3] = clamp(n_total)
        grid[sr, 4] = clamp(self.total_allocs)
        grid[sr, 5] = clamp(self.total_frees)
        grid[sr, 6] = clamp(self.total_writes)
        grid[sr, 7] = clamp(self.step)
        grid[sr, 8] = clamp(n_poisoned)

        # Phase: 1 = nothing freed yet, 2 = frees but no writes recorded,
        # 3 = at least one write recorded. The write counter only ever grows
        # from zero, so no further case can occur.
        if n_freed == 0:
            phase = 1
        elif self.total_writes == 0:
            phase = 2
        else:
            phase = 3
        grid[sr, 9] = phase

        # Fraction of chunks that are freed, scaled to 0-63.
        if n_total > 0:
            grid[sr, 10] = clamp(int(n_freed / n_total * 63))

        grid[sr, 11] = clamp(len(distinct_buckets))

        # Allocated chunks with a freed chunk directly before or after them.
        coalesce_count = sum(
            1
            for i, st in enumerate(states)
            if st == 1
            and (
                (i > 0 and states[i - 1] == 2)
                or (i + 1 < n_total and states[i + 1] == 2)
            )
        )
        grid[sr, 12] = clamp(coalesce_count)

        grid[sr, 13] = 1 if n_poisoned > 0 else 0
|
|