| |
| """Generate trajectory JSON for the heap-trm web visualizer. |
| |
| Runs the HeapSimulator through a scripted exploit scenario (tcache |
| poisoning or fastbin duplication) and captures full state + grid + |
| primitives at each step. Per-step model predictions are not produced here. |
| |
| Usage: |
| python viz/generate_trajectory.py [-s SCENARIO] [--all] [-o OUTPUT] |
| """ |
|
|
| import argparse |
| import json |
| import sys |
| import os |
| import numpy as np |
|
|
| sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) |
|
|
| from simulator.heap_sim import HeapSimulator |
|
|
| |
| |
| |
|
|
# Scripted exploit walkthroughs, keyed by the name accepted by ``--scenario``.
#
# Each scenario provides a ``title``, a ``description`` and an ordered ``ops``
# list. Every op is a dict with:
#   type -- "malloc", "free" or "write_freed" (dispatched in run_scenario)
#   size -- request size passed to the simulator (malloc ops only)
#   slot -- simulator slot the op acts on
#   desc -- caption recorded as the step's "action" in the output JSON
SCENARIOS = {
    "tcache_poison": {
        "title": "Tcache Poisoning",
        "description": (
            "Allocate chunks, free them into the tcache, then corrupt a freed "
            "chunk's forward pointer via use-after-free to redirect allocation."
        ),
        "ops": [
            {"type": "malloc", "size": 0x40, "slot": 0, "desc": "Allocate chunk A (0x40) in slot 0"},
            {"type": "malloc", "size": 0x40, "slot": 1, "desc": "Allocate chunk B (0x40) in slot 1"},
            {"type": "malloc", "size": 0x40, "slot": 2, "desc": "Allocate guard chunk C (0x40) in slot 2 — prevents consolidation"},
            {"type": "free", "slot": 0, "desc": "Free chunk A → enters tcache[0x50]"},
            {"type": "free", "slot": 1, "desc": "Free chunk B → enters tcache[0x50], fd → A"},
            {"type": "write_freed", "slot": 1, "desc": "UAF write: corrupt B's fd pointer → attacker-controlled address"},
            {"type": "malloc", "size": 0x40, "slot": 3, "desc": "Malloc from tcache → returns chunk B (draining)"},
            {"type": "malloc", "size": 0x40, "slot": 4, "desc": "Malloc from tcache → returns POISONED address!"},
        ],
    },
    "fastbin_dup": {
        "title": "Fastbin Duplication",
        "description": (
            "Exploit double-free in the fastbin to get two allocations "
            "pointing to the same memory, enabling overlapping writes."
        ),
        # NOTE(review): the description promises a double free, but the op
        # list below frees slot 0 only once -- confirm the sequence is complete.
        "ops": [
            {"type": "malloc", "size": 0x20, "slot": 0, "desc": "Allocate chunk A (0x20) in slot 0"},
            {"type": "malloc", "size": 0x20, "slot": 1, "desc": "Allocate chunk B (0x20) in slot 1"},
            {"type": "malloc", "size": 0x20, "slot": 2, "desc": "Allocate guard chunk C (0x20) in slot 2"},
            # Seven padding chunks (slots 5-11) are freed first so that the
            # later frees of A and B are captioned as landing in the fastbin.
            {"type": "malloc", "size": 0x20, "slot": 5, "desc": "Allocate padding slot 5 (fill tcache later)"},
            {"type": "malloc", "size": 0x20, "slot": 6, "desc": "Allocate padding slot 6"},
            {"type": "malloc", "size": 0x20, "slot": 7, "desc": "Allocate padding slot 7"},
            {"type": "malloc", "size": 0x20, "slot": 8, "desc": "Allocate padding slot 8"},
            {"type": "malloc", "size": 0x20, "slot": 9, "desc": "Allocate padding slot 9"},
            {"type": "malloc", "size": 0x20, "slot": 10, "desc": "Allocate padding slot 10"},
            {"type": "malloc", "size": 0x20, "slot": 11, "desc": "Allocate padding slot 11"},
            {"type": "malloc", "size": 0x20, "slot": 12, "desc": "Allocate guard for padding"},
            {"type": "free", "slot": 5, "desc": "Free padding → tcache[0x30] (1/7)"},
            {"type": "free", "slot": 6, "desc": "Free padding → tcache[0x30] (2/7)"},
            {"type": "free", "slot": 7, "desc": "Free padding → tcache[0x30] (3/7)"},
            {"type": "free", "slot": 8, "desc": "Free padding → tcache[0x30] (4/7)"},
            {"type": "free", "slot": 9, "desc": "Free padding → tcache[0x30] (5/7)"},
            {"type": "free", "slot": 10, "desc": "Free padding → tcache[0x30] (6/7)"},
            {"type": "free", "slot": 11, "desc": "Free padding → tcache[0x30] (7/7)"},
            {"type": "free", "slot": 0, "desc": "Free A → fastbin[0x30] (tcache full!)"},
            {"type": "free", "slot": 1, "desc": "Free B → fastbin[0x30], fd → A"},
            {"type": "malloc", "size": 0x20, "slot": 13, "desc": "Malloc → returns from tcache drain"},
            {"type": "malloc", "size": 0x20, "slot": 14, "desc": "Malloc → tcache drain continues"},
        ],
    },
}
|
|
|
|
def _addr_hex(val):
    """Render an address as a lowercase, ``0x``-prefixed hex string.

    Any falsy input (``0`` or ``None``) is rendered as ``"0x0"``.
    """
    if not val:
        return "0x0"
    return "0x" + format(val, "x")
|
|
|
|
def capture_step(sim, step_idx, op_desc, op_info):
    """Snapshot the simulator into a JSON-friendly dict for one step.

    Args:
        sim: Simulator to read. It must provide ``get_state()``,
            ``check_primitives()`` and ``state_to_grid()``, plus the
            ``tcache``, ``fastbins`` and ``unsorted_bin`` containers.
        step_idx: Index stored under ``"step"``.
        op_desc: Caption stored under ``"action"``.
        op_info: Operation dict stored unchanged under ``"op"``.

    Returns:
        A dict holding the step metadata, one record per chunk, the bin
        contents, the primitives, the top-chunk address/size, the grid and
        the alloc/free counters.
    """
    snapshot = sim.get_state()
    primitives = sim.check_primitives()
    grid_rows = sim.state_to_grid().tolist()

    # One record per chunk; addresses are kept both as hex text and raw value.
    chunk_records = [
        {
            "addr": _addr_hex(chunk["addr"]),
            "addr_int": chunk["addr"],
            "size": chunk["size"],
            "size_hex": f"0x{chunk['size']:x}",
            "allocated": chunk["allocated"],
            "fd": _addr_hex(chunk["fd"]),
            "fd_int": chunk["fd"],
            "bk": _addr_hex(chunk["bk"]),
            "bk_int": chunk["bk"],
            "bin": chunk["bin"],
            "slot": chunk["slot"],
        }
        for chunk in snapshot["chunks"]
    ]

    # Only non-empty bins are reported. Bin index ``idx`` is labelled with
    # the size ``(idx + 2) * 0x10``, i.e. index 0 maps to "0x20".
    tcache_view = {
        f"0x{(idx + 2) * 0x10:x}": {
            "count": n_entries,
            "entries": [_addr_hex(entry) for entry in sim.tcache[idx]],
        }
        for idx, n_entries in enumerate(snapshot["tcache_counts"])
        if n_entries > 0
    }
    fastbin_view = {
        f"0x{(idx + 2) * 0x10:x}": {
            "count": n_entries,
            "entries": [_addr_hex(entry) for entry in sim.fastbins[idx]],
        }
        for idx, n_entries in enumerate(snapshot["fastbin_counts"])
        if n_entries > 0
    }
    # The unsorted bin stays an empty list unless the state reports entries.
    unsorted_view = (
        [_addr_hex(entry) for entry in sim.unsorted_bin]
        if snapshot["unsorted_count"] > 0
        else []
    )

    return {
        "step": step_idx,
        "action": op_desc,
        "op": op_info,
        "chunks": chunk_records,
        "bins": {
            "tcache": tcache_view,
            "fastbin": fastbin_view,
            "unsorted": unsorted_view,
        },
        "primitives": primitives,
        "top_addr": _addr_hex(snapshot["top_addr"]),
        "top_size": f"0x{snapshot['top_size']:x}",
        "grid": grid_rows,
        "alloc_count": snapshot["alloc_count"],
        "free_count": snapshot["free_count"],
    }
|
|
|
|
def run_scenario(name):
    """Replay one scripted scenario and record a snapshot after every op.

    Args:
        name: Key into ``SCENARIOS``.

    Returns:
        A dict with the scenario ``title`` and ``description``, the list of
        ``steps`` (step 0 is the heap before any op, step ``i + 1`` follows
        op ``i``) and ``total_steps``.

    Raises:
        KeyError: If ``name`` is not a key of ``SCENARIOS``, or an op lacks
            a key its type requires.
    """
    # Value handed to ``sim.write_to_freed`` when a "write_freed" op does not
    # carry its own "target" key.
    default_target = 0x41414141

    scenario = SCENARIOS[name]
    sim = HeapSimulator()

    # Step 0 records the heap before any operation has run.
    steps = [capture_step(sim, 0, "Initial heap state", {"type": "init"})]

    for step_idx, op in enumerate(scenario["ops"], start=1):
        op_type = op["type"]
        if op_type == "malloc":
            sim.malloc(op["size"], slot=op.get("slot"))
        elif op_type == "free":
            slot = op["slot"]
            # A slot the simulator does not know resolves to address 0.
            sim.free(user_addr=sim.slots.get(slot, 0), slot=slot)
        elif op_type == "write_freed":
            addr = sim.slots.get(op["slot"], 0)
            sim.write_to_freed(addr, op.get("target", default_target))
        # Any other op type performs no heap operation; its step is still
        # recorded below with the unchanged state.

        steps.append(capture_step(sim, step_idx, op["desc"], op))

    return {
        "title": scenario["title"],
        "description": scenario["description"],
        "steps": steps,
        "total_steps": len(steps),
    }
|
|
|
|
def main():
    """Parse the command line and write one JSON file per requested scenario.

    Without ``-o`` each scenario is written to ``data/<scenario>.json`` next
    to this script. ``-o`` names the output file for a single scenario; it is
    rejected together with ``--all`` because every scenario would be written
    to, and overwrite, the same file. On a usage error argparse prints a
    message and exits with status 2.
    """
    parser = argparse.ArgumentParser(description="Generate heap-trm visualization data")
    parser.add_argument("-s", "--scenario", default="tcache_poison",
                        choices=list(SCENARIOS),
                        help="Exploit scenario to visualize")
    parser.add_argument("-o", "--output", default=None,
                        help="Output JSON file (default: viz/data/<scenario>.json)")
    parser.add_argument("--all", action="store_true",
                        help="Generate all scenarios")
    args = parser.parse_args()

    if args.all and args.output:
        parser.error(
            "-o/--output cannot be combined with --all: "
            "every scenario would overwrite the same file"
        )

    data_dir = os.path.join(os.path.dirname(__file__), "data")
    scenarios_to_run = list(SCENARIOS) if args.all else [args.scenario]

    for name in scenarios_to_run:
        result = run_scenario(name)
        out_path = args.output or os.path.join(data_dir, f"{name}.json")
        # Create the destination directory on demand. abspath() keeps
        # dirname() non-empty for a bare file name such as "out.json".
        os.makedirs(os.path.dirname(os.path.abspath(out_path)), exist_ok=True)
        with open(out_path, "w", encoding="utf-8") as f:
            json.dump(result, f, indent=2)
        print(f"[+] Generated {out_path} ({result['total_steps']} steps)")


if __name__ == "__main__":
    main()
|
|