amarck's picture
Add heaptrm package: v2 harness, CLI, pwntools integration, CVE tests
22374d1
"""
grid.py - Encode heap dump states to 32x16 grids.
Enhanced for v2 harness: includes corruption detection fields.
"""
import numpy as np
import json
from pathlib import Path
# Grid geometry: state_to_grid() allocates a GRID_ROWS x GRID_COLS int64 array.
GRID_ROWS = 32
GRID_COLS = 16
# Presumably the number of distinct cell values (clamp() defaults to 0..63);
# not referenced in the visible code -- TODO confirm against the consumer.
VOCAB = 64
def clamp(v, lo=0, hi=63):
    """Convert *v* with ``int()`` and confine the result to ``[lo, hi]``.

    The upper bound is applied first and the lower bound second, so if the
    bounds are crossed (``lo > hi``) the result is ``lo``.
    """
    as_int = int(v)
    # Cap from above, then lift from below.
    capped = as_int if as_int < hi else hi
    return capped if capped > lo else lo
def _encode_link(idx):
    """Encode an fd/bk chunk index as a grid cell value.

    -1 (also the default when the key is absent) maps to 0 and -2 maps to 33;
    any other index is shifted by one and confined to 1..32.
    NOTE(review): -1 presumably means "no pointer" and -2 "points outside the
    tracked chunks" -- confirm against the harness that writes the dump.
    """
    if idx == -1:
        return 0
    if idx == -2:
        return 33
    return clamp(idx + 1, 1, 32)


def _count_matching(records, key, value):
    """Count the dicts in *records* whose ``record.get(key)`` equals *value*."""
    return sum(1 for record in records if record.get(key) == value)


def state_to_grid(state: dict) -> np.ndarray:
    """Convert a heap dump state (v2 format) to a 32x16 grid.

    Rows ``0 .. GRID_ROWS - 3`` hold one chunk each, in dump order; chunks
    beyond that are not given a row but are still counted in the summary.
    Row ``GRID_ROWS - 2`` is the summary row. The last row is never written
    and stays zero.

    Per-chunk columns:
        0  state, clamped to 0..2
        1  size class (``chunk_size >> 4``)
        2  prev_inuse flag (``flag_p``)
        3  is_mmapped flag (``flag_m``)
        4  fd target (see ``_encode_link``)
        5  bk target (see ``_encode_link``)
        6  alloc order
        7  free order
        8  is target of last op
        9  is double freed
        10 is corrupted
        11 requested-size class (``req_size >> 3``)
        12-15 first four data bytes, each quantized to 6 bits (``byte >> 2``)

    Summary-row columns:
        0  marker value 50
        1  number of chunks with ``state == 1`` (allocated)
        2  number of chunks with ``state == 2`` (freed)
        3  total number of chunks
        4  ``corruption_count``
        5-8 counts of corruption records of type ``metadata_corrupt``,
            ``uaf_write``, ``double_free`` and ``overflow``
        9  1 if ``corruption_count > 0`` else 0

    Args:
        state: One parsed dump record. Missing (or null) ``chunks``,
            ``corruptions``, ``corruption_count`` and ``data_hex`` entries are
            treated as empty / zero.

    Returns:
        An ``int64`` array of shape ``(GRID_ROWS, GRID_COLS)``.

    Raises:
        ValueError: If a chunk's ``data_hex`` contains non-hexadecimal text.
    """
    grid = np.zeros((GRID_ROWS, GRID_COLS), dtype=np.int64)
    # `or` also covers an explicit JSON null, not just a missing key.
    chunks = state.get("chunks") or []
    corruptions = state.get("corruptions") or []
    corruption_count = state.get("corruption_count") or 0

    summary_row = GRID_ROWS - 2

    for row, chunk in enumerate(chunks[:summary_row]):
        grid[row, 0] = clamp(chunk.get("state", 0), 0, 2)
        grid[row, 1] = clamp(chunk.get("chunk_size", 0) >> 4)
        # Flag columns go through clamp() like every other column so that an
        # unexpected value in the dump cannot put an out-of-range cell in the
        # grid; values already in 0..63 are stored unchanged.
        grid[row, 2] = clamp(chunk.get("flag_p", 0))
        grid[row, 3] = clamp(chunk.get("flag_m", 0))
        grid[row, 4] = _encode_link(chunk.get("fd_idx", -1))
        grid[row, 5] = _encode_link(chunk.get("bk_idx", -1))
        grid[row, 6] = clamp(chunk.get("alloc_order", 0))
        grid[row, 7] = clamp(chunk.get("free_order", 0))
        grid[row, 8] = clamp(chunk.get("is_target", 0))
        grid[row, 9] = clamp(chunk.get("is_double_freed", 0))
        grid[row, 10] = clamp(chunk.get("is_corrupted", 0))
        grid[row, 11] = clamp(chunk.get("req_size", 0) >> 3)

        # Columns 12-15: up to four leading data bytes; a trailing odd hex
        # digit is ignored because only complete two-digit pairs are read.
        data_hex = chunk.get("data_hex") or ""
        for j in range(min(4, len(data_hex) // 2)):
            byte_val = int(data_hex[2 * j:2 * j + 2], 16)
            grid[row, 12 + j] = clamp(byte_val >> 2)  # 6-bit quantize

    # Summary row: totals are taken over ALL chunks, including any that did
    # not fit in the per-chunk rows above.
    grid[summary_row, 0] = 50  # marker
    grid[summary_row, 1] = clamp(_count_matching(chunks, "state", 1))  # n_alloc
    grid[summary_row, 2] = clamp(_count_matching(chunks, "state", 2))  # n_freed
    grid[summary_row, 3] = clamp(len(chunks))
    grid[summary_row, 4] = clamp(corruption_count)

    corruption_types = ("metadata_corrupt", "uaf_write", "double_free", "overflow")
    for col, kind in enumerate(corruption_types, start=5):
        grid[summary_row, col] = clamp(_count_matching(corruptions, "type", kind))

    # Has ANY corruption
    grid[summary_row, 9] = 1 if corruption_count > 0 else 0
    return grid
def load_dump(path):
    """Load a JSONL dump file.

    Each non-blank line is parsed as one JSON document; blank and
    whitespace-only lines are skipped.

    Args:
        path: Location of the dump file (anything ``open()`` accepts).

    Returns:
        A list of the parsed records, in file order.

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If the file is not valid UTF-8.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    states = []
    # JSON is UTF-8 by specification; pin the encoding instead of inheriting
    # the platform's locale default, which mis-decodes non-ASCII text on
    # systems whose default is not UTF-8.
    with open(path, encoding="utf-8") as f:
        for raw_line in f:
            record = raw_line.strip()
            if record:
                states.append(json.loads(record))
    return states