""" grid.py - Encode heap dump states to 32x16 grids. Enhanced for v2 harness: includes corruption detection fields. """ import numpy as np import json from pathlib import Path GRID_ROWS = 32 GRID_COLS = 16 VOCAB = 64 def clamp(v, lo=0, hi=63): return max(lo, min(hi, int(v))) def state_to_grid(state: dict) -> np.ndarray: """Convert a heap dump state (v2 format) to a 32x16 grid.""" grid = np.zeros((GRID_ROWS, GRID_COLS), dtype=np.int64) chunks = state.get("chunks", []) corruption_count = state.get("corruption_count", 0) for i, c in enumerate(chunks[:GRID_ROWS - 2]): # Col 0: state grid[i, 0] = clamp(c.get("state", 0), 0, 2) # Col 1: size class grid[i, 1] = clamp(c.get("chunk_size", 0) >> 4) # Col 2: prev_inuse grid[i, 2] = c.get("flag_p", 0) # Col 3: is_mmapped grid[i, 3] = c.get("flag_m", 0) # Col 4: fd target fd_idx = c.get("fd_idx", -1) grid[i, 4] = 0 if fd_idx == -1 else (33 if fd_idx == -2 else clamp(fd_idx + 1, 1, 32)) # Col 5: bk target bk_idx = c.get("bk_idx", -1) grid[i, 5] = 0 if bk_idx == -1 else (33 if bk_idx == -2 else clamp(bk_idx + 1, 1, 32)) # Col 6: alloc order grid[i, 6] = clamp(c.get("alloc_order", 0)) # Col 7: free order grid[i, 7] = clamp(c.get("free_order", 0)) # Col 8: is target of last op grid[i, 8] = c.get("is_target", 0) # Col 9: is double freed grid[i, 9] = c.get("is_double_freed", 0) # Col 10: is corrupted (NEW in v2) grid[i, 10] = c.get("is_corrupted", 0) # Col 11: req size class grid[i, 11] = clamp(c.get("req_size", 0) >> 3) # Col 12: data bytes (first 4, quantized) data_hex = c.get("data_hex", "") for j in range(min(4, len(data_hex) // 2)): byte_val = int(data_hex[j*2:j*2+2], 16) grid[i, 12 + j] = clamp(byte_val >> 2) # 6-bit quantize # Summary row (row 30): global corruption info sr = GRID_ROWS - 2 grid[sr, 0] = 50 # marker grid[sr, 1] = clamp(sum(1 for c in chunks if c.get("state") == 1)) # n_alloc grid[sr, 2] = clamp(sum(1 for c in chunks if c.get("state") == 2)) # n_freed grid[sr, 3] = clamp(len(chunks)) grid[sr, 4] = clamp(corruption_count) # total corruptions detected # Count specific corruption types corruptions = state.get("corruptions", []) grid[sr, 5] = clamp(sum(1 for c in corruptions if c.get("type") == "metadata_corrupt")) grid[sr, 6] = clamp(sum(1 for c in corruptions if c.get("type") == "uaf_write")) grid[sr, 7] = clamp(sum(1 for c in corruptions if c.get("type") == "double_free")) grid[sr, 8] = clamp(sum(1 for c in corruptions if c.get("type") == "overflow")) # Has ANY corruption grid[sr, 9] = 1 if corruption_count > 0 else 0 return grid def load_dump(path): """Load a JSONL dump file.""" states = [] with open(path) as f: for line in f: if line.strip(): states.append(json.loads(line.strip())) return states