heap-trm / viz /generate_trajectory.py
amarck's picture
Add heaptrm package: v2 harness, CLI, pwntools integration, CVE tests
22374d1
#!/usr/bin/env python3
"""Generate trajectory JSON for the heap-trm web visualizer.
Runs the HeapSimulator through a tcache-poisoning exploit sequence,
captures full state + grid + primitives at each step, and optionally
runs the trained model to get per-step predictions.
Usage:
python viz/generate_trajectory.py [--model PATH] [-o OUTPUT]
"""
import argparse
import json
import sys
import os
import numpy as np
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from simulator.heap_sim import HeapSimulator
# ---------------------------------------------------------------------------
# Exploit sequences to demonstrate
# ---------------------------------------------------------------------------
SCENARIOS = {
"tcache_poison": {
"title": "Tcache Poisoning",
"description": (
"Allocate chunks, free them into the tcache, then corrupt a freed "
"chunk's forward pointer via use-after-free to redirect allocation."
),
"ops": [
{"type": "malloc", "size": 0x40, "slot": 0, "desc": "Allocate chunk A (0x40) in slot 0"},
{"type": "malloc", "size": 0x40, "slot": 1, "desc": "Allocate chunk B (0x40) in slot 1"},
{"type": "malloc", "size": 0x40, "slot": 2, "desc": "Allocate guard chunk C (0x40) in slot 2 β€” prevents consolidation"},
{"type": "free", "slot": 0, "desc": "Free chunk A β†’ enters tcache[0x50]"},
{"type": "free", "slot": 1, "desc": "Free chunk B β†’ enters tcache[0x50], fd β†’ A"},
{"type": "write_freed", "slot": 1, "desc": "UAF write: corrupt B's fd pointer β†’ attacker-controlled address"},
{"type": "malloc", "size": 0x40, "slot": 3, "desc": "Malloc from tcache β†’ returns chunk B (draining)"},
{"type": "malloc", "size": 0x40, "slot": 4, "desc": "Malloc from tcache β†’ returns POISONED address!"},
],
},
"fastbin_dup": {
"title": "Fastbin Duplication",
"description": (
"Exploit double-free in the fastbin to get two allocations "
"pointing to the same memory, enabling overlapping writes."
),
"ops": [
{"type": "malloc", "size": 0x20, "slot": 0, "desc": "Allocate chunk A (0x20) in slot 0"},
{"type": "malloc", "size": 0x20, "slot": 1, "desc": "Allocate chunk B (0x20) in slot 1"},
{"type": "malloc", "size": 0x20, "slot": 2, "desc": "Allocate guard chunk C (0x20) in slot 2"},
# Fill tcache for 0x30 so frees go to fastbin
{"type": "malloc", "size": 0x20, "slot": 5, "desc": "Allocate padding slot 5 (fill tcache later)"},
{"type": "malloc", "size": 0x20, "slot": 6, "desc": "Allocate padding slot 6"},
{"type": "malloc", "size": 0x20, "slot": 7, "desc": "Allocate padding slot 7"},
{"type": "malloc", "size": 0x20, "slot": 8, "desc": "Allocate padding slot 8"},
{"type": "malloc", "size": 0x20, "slot": 9, "desc": "Allocate padding slot 9"},
{"type": "malloc", "size": 0x20, "slot": 10, "desc": "Allocate padding slot 10"},
{"type": "malloc", "size": 0x20, "slot": 11, "desc": "Allocate padding slot 11"},
{"type": "malloc", "size": 0x20, "slot": 12, "desc": "Allocate guard for padding"},
{"type": "free", "slot": 5, "desc": "Free padding β†’ tcache[0x30] (1/7)"},
{"type": "free", "slot": 6, "desc": "Free padding β†’ tcache[0x30] (2/7)"},
{"type": "free", "slot": 7, "desc": "Free padding β†’ tcache[0x30] (3/7)"},
{"type": "free", "slot": 8, "desc": "Free padding β†’ tcache[0x30] (4/7)"},
{"type": "free", "slot": 9, "desc": "Free padding β†’ tcache[0x30] (5/7)"},
{"type": "free", "slot": 10, "desc": "Free padding β†’ tcache[0x30] (6/7)"},
{"type": "free", "slot": 11, "desc": "Free padding β†’ tcache[0x30] (7/7)"},
{"type": "free", "slot": 0, "desc": "Free A β†’ fastbin[0x30] (tcache full!)"},
{"type": "free", "slot": 1, "desc": "Free B β†’ fastbin[0x30], fd β†’ A"},
{"type": "malloc", "size": 0x20, "slot": 13, "desc": "Malloc β†’ returns from tcache drain"},
{"type": "malloc", "size": 0x20, "slot": 14, "desc": "Malloc β†’ tcache drain continues"},
],
},
}
def _addr_hex(val):
"""Format address as short hex string."""
return f"0x{val:x}" if val else "0x0"
def capture_step(sim, step_idx, op_desc, op_info):
"""Capture full simulator state for one step."""
state = sim.get_state()
prims = sim.check_primitives()
grid = sim.state_to_grid().tolist()
# Build chunk list for visualization
chunks = []
for c in state["chunks"]:
chunks.append({
"addr": _addr_hex(c["addr"]),
"addr_int": c["addr"],
"size": c["size"],
"size_hex": f"0x{c['size']:x}",
"allocated": c["allocated"],
"fd": _addr_hex(c["fd"]),
"fd_int": c["fd"],
"bk": _addr_hex(c["bk"]),
"bk_int": c["bk"],
"bin": c["bin"],
"slot": c["slot"],
})
# Build bin structures for visualization
bins = {
"tcache": {},
"fastbin": {},
"unsorted": [],
}
for i, count in enumerate(state["tcache_counts"]):
if count > 0:
size_hex = f"0x{(i + 2) * 0x10:x}"
entries = [_addr_hex(a) for a in sim.tcache[i]]
bins["tcache"][size_hex] = {"count": count, "entries": entries}
for i, count in enumerate(state["fastbin_counts"]):
if count > 0:
size_hex = f"0x{(i + 2) * 0x10:x}"
entries = [_addr_hex(a) for a in sim.fastbins[i]]
bins["fastbin"][size_hex] = {"count": count, "entries": entries}
if state["unsorted_count"] > 0:
bins["unsorted"] = [_addr_hex(a) for a in sim.unsorted_bin]
return {
"step": step_idx,
"action": op_desc,
"op": op_info,
"chunks": chunks,
"bins": bins,
"primitives": prims,
"top_addr": _addr_hex(state["top_addr"]),
"top_size": f"0x{state['top_size']:x}",
"grid": grid,
"alloc_count": state["alloc_count"],
"free_count": state["free_count"],
}
def run_scenario(name):
scenario = SCENARIOS[name]
sim = HeapSimulator()
steps = []
# Capture initial state
steps.append(capture_step(sim, 0, "Initial heap state", {"type": "init"}))
for i, op in enumerate(scenario["ops"]):
op_type = op["type"]
if op_type == "malloc":
sim.malloc(op["size"], slot=op.get("slot"))
elif op_type == "free":
addr = sim.slots.get(op["slot"], 0)
sim.free(user_addr=addr, slot=op.get("slot"))
elif op_type == "write_freed":
addr = sim.slots.get(op["slot"], 0)
# Find a target address to poison fd with
# Use an arbitrary address to simulate the attack
target = 0x41414141
sim.write_to_freed(addr, target)
steps.append(capture_step(sim, i + 1, op["desc"], op))
return {
"title": scenario["title"],
"description": scenario["description"],
"steps": steps,
"total_steps": len(steps),
}
def main():
parser = argparse.ArgumentParser(description="Generate heap-trm visualization data")
parser.add_argument("-s", "--scenario", default="tcache_poison",
choices=list(SCENARIOS.keys()),
help="Exploit scenario to visualize")
parser.add_argument("-o", "--output", default=None,
help="Output JSON file (default: viz/data/<scenario>.json)")
parser.add_argument("--all", action="store_true",
help="Generate all scenarios")
args = parser.parse_args()
os.makedirs(os.path.join(os.path.dirname(__file__), "data"), exist_ok=True)
scenarios_to_run = list(SCENARIOS.keys()) if args.all else [args.scenario]
for name in scenarios_to_run:
result = run_scenario(name)
out_path = args.output or os.path.join(
os.path.dirname(__file__), "data", f"{name}.json"
)
with open(out_path, "w") as f:
json.dump(result, f, indent=2)
print(f"[+] Generated {out_path} ({result['total_steps']} steps)")
if __name__ == "__main__":
main()