| """ |
| grid.py - Encode heap dump states to 32x16 grids. |
| |
| Enhanced for v2 harness: includes corruption detection fields. |
| """ |
|
|
| import numpy as np |
| import json |
| from pathlib import Path |
|
|
| GRID_ROWS = 32 |
| GRID_COLS = 16 |
| VOCAB = 64 |
|
|
|
|
| def clamp(v, lo=0, hi=63): |
| return max(lo, min(hi, int(v))) |
|
|
|
|
| def state_to_grid(state: dict) -> np.ndarray: |
| """Convert a heap dump state (v2 format) to a 32x16 grid.""" |
| grid = np.zeros((GRID_ROWS, GRID_COLS), dtype=np.int64) |
| chunks = state.get("chunks", []) |
| corruption_count = state.get("corruption_count", 0) |
|
|
| for i, c in enumerate(chunks[:GRID_ROWS - 2]): |
| |
| grid[i, 0] = clamp(c.get("state", 0), 0, 2) |
| |
| grid[i, 1] = clamp(c.get("chunk_size", 0) >> 4) |
| |
| grid[i, 2] = c.get("flag_p", 0) |
| |
| grid[i, 3] = c.get("flag_m", 0) |
| |
| fd_idx = c.get("fd_idx", -1) |
| grid[i, 4] = 0 if fd_idx == -1 else (33 if fd_idx == -2 else clamp(fd_idx + 1, 1, 32)) |
| |
| bk_idx = c.get("bk_idx", -1) |
| grid[i, 5] = 0 if bk_idx == -1 else (33 if bk_idx == -2 else clamp(bk_idx + 1, 1, 32)) |
| |
| grid[i, 6] = clamp(c.get("alloc_order", 0)) |
| |
| grid[i, 7] = clamp(c.get("free_order", 0)) |
| |
| grid[i, 8] = c.get("is_target", 0) |
| |
| grid[i, 9] = c.get("is_double_freed", 0) |
| |
| grid[i, 10] = c.get("is_corrupted", 0) |
| |
| grid[i, 11] = clamp(c.get("req_size", 0) >> 3) |
| |
| data_hex = c.get("data_hex", "") |
| for j in range(min(4, len(data_hex) // 2)): |
| byte_val = int(data_hex[j*2:j*2+2], 16) |
| grid[i, 12 + j] = clamp(byte_val >> 2) |
|
|
| |
| sr = GRID_ROWS - 2 |
| grid[sr, 0] = 50 |
| grid[sr, 1] = clamp(sum(1 for c in chunks if c.get("state") == 1)) |
| grid[sr, 2] = clamp(sum(1 for c in chunks if c.get("state") == 2)) |
| grid[sr, 3] = clamp(len(chunks)) |
| grid[sr, 4] = clamp(corruption_count) |
| |
| corruptions = state.get("corruptions", []) |
| grid[sr, 5] = clamp(sum(1 for c in corruptions if c.get("type") == "metadata_corrupt")) |
| grid[sr, 6] = clamp(sum(1 for c in corruptions if c.get("type") == "uaf_write")) |
| grid[sr, 7] = clamp(sum(1 for c in corruptions if c.get("type") == "double_free")) |
| grid[sr, 8] = clamp(sum(1 for c in corruptions if c.get("type") == "overflow")) |
| |
| grid[sr, 9] = 1 if corruption_count > 0 else 0 |
|
|
| return grid |
|
|
|
|
| def load_dump(path): |
| """Load a JSONL dump file.""" |
| states = [] |
| with open(path) as f: |
| for line in f: |
| if line.strip(): |
| states.append(json.loads(line.strip())) |
| return states |
|
|