amarck's picture
HeapTRM fuzzer: mutation-based fuzzing guided by heap corruption detection
5f0bcb2
#!/usr/bin/env python3
"""
fuzzer.py - HeapTRM-guided fuzzer for heap exploit discovery.
Uses the v2 harness as an oracle: mutates inputs, scores heap states,
evolves toward exploit-triggering inputs.
Two modes:
1. Standalone: fuzz a binary that reads from stdin
2. AFL post-processor: score AFL inputs for heap exploit potential
Usage:
# Standalone fuzzing
from heaptrm.integrations.fuzzer import HeapFuzzer
fuzzer = HeapFuzzer("./target_binary")
fuzzer.run(max_iterations=10000)
# CLI
python -m heaptrm.integrations.fuzzer ./target_binary --seeds seeds/ --output findings/
"""
import os
import sys
import json
import random
import subprocess
import tempfile
import shutil
import time
import hashlib
from pathlib import Path
from dataclasses import dataclass, field
from typing import List, Optional, Set
from collections import defaultdict
# Package root: the parent of the directory that contains this file.
# HeapFuzzer looks for its default harness .so candidates relative to it.
_PKG_ROOT = Path(__file__).parent.parent
@dataclass
class FuzzInput:
    """One corpus entry: the raw input bytes plus what executing them showed."""

    # Raw bytes fed to the target on stdin.
    data: bytes
    # Fitness assigned by the fuzzer; higher means more interesting.
    score: float = field(default=0.0)
    # Sum of the corruption counts reported across all heap states.
    corruptions: int = field(default=0)
    # Distinct corruption type labels observed for this input.
    corruption_types: set = field(default_factory=set)
    # Number of heap states read from the harness dump.
    n_states: int = field(default=0)
    # Mutation depth: parent's generation + 1 (0 for a seed).
    generation: int = field(default=0)
    # Short hash of the input this one was mutated from ("" for a seed).
    parent_hash: str = field(default="")
@dataclass
class FuzzStats:
    """Running counters for a fuzzing session."""

    # Mutation rounds completed in the current run.
    iterations: int = field(default=0)
    # Number of target executions performed.
    executions: int = field(default=0)
    # Executions in which the target was killed by a signal.
    crashes: int = field(default=0)
    # Executions that reported at least one heap corruption.
    corruptions_found: int = field(default=0)
    # Every distinct corruption type label seen so far.
    unique_corruption_types: set = field(default_factory=set)
    # Highest fitness score recorded so far.
    best_score: float = field(default=0.0)
    # Corpus length at the time it was last recorded.
    corpus_size: int = field(default=0)
    # time.time() value captured when the run started.
    start_time: float = field(default=0.0)
class HeapFuzzer:
    """
    Mutation-based fuzzer guided by heap corruption detection.

    Every input is run against the target with the harness preloaded via
    LD_PRELOAD; the harness is expected to write one JSON heap state per
    line to the file named by the HEAPGRID_OUT environment variable.

    Fitness of an execution:
        corruptions * 100 + n_states * 0.1 + (50 if killed by a signal)

    Inputs that crash the target or trigger heap corruptions are saved as
    findings under ``output_dir``; any previously unseen input with a
    positive score joins the corpus.
    """

    # Mutated inputs are truncated to this many bytes.
    MAX_INPUT_SIZE = 4096

    # Per-execution timeout in seconds; __init__ overrides it per instance.
    timeout = 5.0

    # Replacement values used by the "interesting_value" mutation.
    _INTERESTING_BYTES = (0, 1, 0x7f, 0x80, 0xff, 0x41, 0x00,
                          0xfe, 0xfd, 0x20, 0x0a, 0x0d)

    def __init__(
        self,
        binary: str,
        args: Optional[list] = None,
        seeds: Optional[list] = None,
        output_dir: str = "heaptrm_findings",
        harness_path: Optional[str] = None,
        timeout: float = 5.0,
    ):
        """
        Args:
            binary: Path of the target executable; it receives each input
                on stdin.
            args: Extra command-line arguments passed to the target.
            seeds: Initial inputs. Each item is either raw bytes, a file
                path, or a directory whose regular files are all loaded.
            output_dir: Directory that receives the ``crashes``,
                ``corruptions`` and ``interesting`` findings.
            harness_path: Harness shared object to preload. When omitted,
                the default candidate locations are searched.
            timeout: Seconds allowed per execution of the target.

        Raises:
            FileNotFoundError: no harness was given and none of the default
                candidates exists.
        """
        self.binary = binary
        self.args = list(args) if args else []
        self.timeout = timeout

        # Resolve the harness first so a failed lookup does not leave
        # freshly created output directories behind.
        self.harness = harness_path if harness_path else self._find_harness()

        self.output_dir = Path(output_dir)
        for category in ("crashes", "corruptions", "interesting"):
            (self.output_dir / category).mkdir(parents=True, exist_ok=True)

        # Corpus
        self.corpus: List[FuzzInput] = []
        self.seen_hashes: Set[str] = set()
        self.stats = FuzzStats()

        # Load seeds
        for seed in seeds or []:
            self._load_seed(seed)

        # Default seeds if nothing usable was supplied
        if not self.corpus:
            for default in (b"A" * 64, b"\x00" * 64, bytes(range(256))):
                self._add_to_corpus(FuzzInput(data=default))

    @staticmethod
    def _find_harness() -> str:
        """Return the absolute path of the first default harness that exists."""
        candidates = (
            _PKG_ROOT / "harness" / "heapgrid_v2.so",
            _PKG_ROOT.parent / "harness" / "heapgrid_harness.so",
        )
        for candidate in candidates:
            if candidate.exists():
                return str(candidate.resolve())
        raise FileNotFoundError("Cannot find harness .so")

    def _load_seed(self, seed) -> None:
        """Add one seed (raw bytes, a file, or a directory of files) to the corpus."""
        if isinstance(seed, (bytes, bytearray)):
            self._add_to_corpus(FuzzInput(data=bytes(seed)))
            return
        path = Path(seed)
        if path.is_file():
            self._add_to_corpus(FuzzInput(data=path.read_bytes()))
        elif path.is_dir():
            # Sorted so the corpus order does not depend on directory order.
            for entry in sorted(path.iterdir()):
                if entry.is_file():
                    self._add_to_corpus(FuzzInput(data=entry.read_bytes()))
        else:
            # A mistyped seed path would otherwise silently fall back to
            # the built-in default seeds.
            print(f"warning: seed path not found, skipping: {seed}",
                  file=sys.stderr)

    def _input_hash(self, data: bytes) -> str:
        """Return the first 16 hex digits of the SHA-256 of ``data``."""
        return hashlib.sha256(data).hexdigest()[:16]

    def _add_to_corpus(self, inp: "FuzzInput") -> bool:
        """Append ``inp`` unless identical data was seen before.

        Returns:
            True if the input was added, False if it was a duplicate.
        """
        digest = self._input_hash(inp.data)
        if digest in self.seen_hashes:
            return False
        self.seen_hashes.add(digest)
        self.corpus.append(inp)
        return True

    def _execute(self, data: bytes) -> dict:
        """Run the target on ``data`` and summarise the harness dump.

        Returns:
            A dict with keys ``n_states`` (heap states read from the dump),
            ``corruptions`` (sum of their ``corruption_count`` values),
            ``corruption_types`` (set of type labels) and ``crashed``
            (True when the process exited with a negative return code,
            i.e. was killed by a signal; a timeout counts as not crashed).
        """
        # A private directory gives the dump an unpredictable, unshared
        # location without the create-by-name race of tempfile.mktemp().
        workdir = tempfile.mkdtemp(prefix="heaptrm_")
        dump_path = os.path.join(workdir, "heap.jsonl")
        env = os.environ.copy()
        env["LD_PRELOAD"] = self.harness
        env["HEAPGRID_OUT"] = dump_path
        try:
            try:
                proc = subprocess.run(
                    [self.binary, *self.args], input=data, env=env,
                    capture_output=True, timeout=self.timeout,
                )
                crashed = proc.returncode < 0  # signal
            except subprocess.TimeoutExpired:
                crashed = False
            n_states, total_corruptions, corruption_types = \
                self._parse_dump(dump_path)
        finally:
            shutil.rmtree(workdir, ignore_errors=True)

        self.stats.executions += 1
        return {
            "n_states": n_states,
            "corruptions": total_corruptions,
            "corruption_types": corruption_types,
            "crashed": crashed,
        }

    @staticmethod
    def _parse_dump(dump_path: str):
        """Read a JSON-lines heap dump.

        Blank lines, lines that are not valid JSON (for example a final
        line cut short when the target died mid-write) and JSON values that
        are not objects are skipped; the remaining lines are still used.
        A missing or unreadable file yields an empty result.

        Returns:
            ``(n_states, total_corruptions, corruption_types)``.
        """
        n_states = 0
        total_corruptions = 0
        corruption_types = set()
        try:
            with open(dump_path, encoding="utf-8", errors="replace") as fh:
                for raw in fh:
                    line = raw.strip()
                    if not line:
                        continue
                    try:
                        state = json.loads(line)
                    except ValueError:
                        continue
                    if not isinstance(state, dict):
                        continue
                    n_states += 1
                    count = state.get("corruption_count", 0)
                    if not isinstance(count, (int, float)) or count <= 0:
                        continue
                    total_corruptions += count
                    for entry in state.get("corruptions") or []:
                        if isinstance(entry, dict):
                            label = entry.get("type", "unknown")
                            if not isinstance(label, str):
                                label = str(label)
                            corruption_types.add(label)
        except OSError:
            # No dump was produced (or it cannot be read): report nothing.
            pass
        return n_states, total_corruptions, corruption_types

    def _score(self, result: dict) -> float:
        """Score an execution result. Higher = more interesting."""
        score = 0.0
        score += result["corruptions"] * 100  # corruptions are gold
        score += result["n_states"] * 0.1  # more heap ops = more surface
        if result["crashed"]:
            score += 50  # crashes are interesting
        return score

    def _mutate(self, data: bytes) -> bytes:
        """Return a randomly mutated copy of ``data``.

        Between 1 and ``len(data) // 8`` (at least 1) mutations are applied,
        each drawn uniformly from the strategy list. A strategy whose
        precondition does not hold for the current buffer is a no-op. The
        result is truncated to ``MAX_INPUT_SIZE`` bytes. Empty input yields
        a single random byte.
        """
        if len(data) == 0:
            return bytes([random.randint(0, 255)])
        buf = bytearray(data)
        n_mutations = random.randint(1, max(1, len(buf) // 8))
        for _ in range(n_mutations):
            strategy = random.choice([
                "flip_byte", "flip_bit", "interesting_value",
                "insert", "delete", "splice", "repeat_block",
            ])
            if strategy == "flip_byte" and buf:
                # Overwrites one byte with a random value.
                pos = random.randint(0, len(buf) - 1)
                buf[pos] = random.randint(0, 255)
            elif strategy == "flip_bit" and buf:
                pos = random.randint(0, len(buf) - 1)
                bit = random.randint(0, 7)
                buf[pos] ^= (1 << bit)
            elif strategy == "interesting_value" and buf:
                pos = random.randint(0, len(buf) - 1)
                buf[pos] = random.choice(self._INTERESTING_BYTES)
            elif strategy == "insert":
                pos = random.randint(0, len(buf))
                val = random.randint(0, 255)
                count = random.randint(1, 16)
                buf[pos:pos] = bytes([val] * count)
            elif strategy == "delete" and len(buf) > 1:
                pos = random.randint(0, len(buf) - 1)
                count = random.randint(1, min(16, len(buf) - pos))
                del buf[pos:pos + count]
            elif strategy == "splice" and len(buf) > 4:
                # Copy a short run from one offset over another.
                src = random.randint(0, len(buf) - 4)
                dst = random.randint(0, len(buf) - 1)
                length = random.randint(1, min(16, len(buf) - src))
                buf[dst:dst + length] = buf[src:src + length]
            elif strategy == "repeat_block" and len(buf) > 2:
                pos = random.randint(0, len(buf) - 2)
                length = random.randint(1, min(8, len(buf) - pos))
                block = buf[pos:pos + length]
                insert_pos = random.randint(0, len(buf))
                buf[insert_pos:insert_pos] = block * random.randint(2, 8)
        # Clamp size
        if len(buf) > self.MAX_INPUT_SIZE:
            buf = buf[:self.MAX_INPUT_SIZE]
        return bytes(buf)

    def _select_parent(self) -> "FuzzInput":
        """Select a parent input, biased toward higher scores."""
        if not self.corpus:
            return FuzzInput(data=b"A" * 64)
        # Tournament selection: best of up to five random corpus entries.
        k = min(5, len(self.corpus))
        candidates = random.sample(self.corpus, k)
        return max(candidates, key=lambda x: x.score)

    def _save_finding(self, inp: "FuzzInput", category: str):
        """Write ``inp`` and a JSON metadata sidecar to ``output_dir/category``.

        Files are named after the input hash, so saving the same data again
        overwrites the earlier files.
        """
        digest = self._input_hash(inp.data)
        target_dir = self.output_dir / category
        target_dir.mkdir(parents=True, exist_ok=True)
        (target_dir / f"{digest}.bin").write_bytes(inp.data)
        (target_dir / f"{digest}.json").write_text(json.dumps({
            "hash": digest,
            "score": inp.score,
            "corruptions": inp.corruptions,
            # Sorted so the metadata file is stable across runs.
            "corruption_types": sorted(str(t) for t in inp.corruption_types),
            "n_states": inp.n_states,
            "generation": inp.generation,
            "size": len(inp.data),
        }, indent=2))

    def _record_findings(self, inp: "FuzzInput", result: dict) -> None:
        """Update crash/corruption counters and save ``inp`` where it applies."""
        if result["crashed"]:
            self.stats.crashes += 1
            self._save_finding(inp, "crashes")
        if result["corruptions"] > 0:
            self.stats.corruptions_found += 1
            self.stats.unique_corruption_types.update(result["corruption_types"])
            self._save_finding(inp, "corruptions")

    def run(self, max_iterations: int = 10000, print_every: int = 100):
        """Run the fuzzer.

        Args:
            max_iterations: Number of mutate/execute rounds to perform after
                the corpus has been scored.
            print_every: Print a status line every this many iterations;
                zero or a negative value disables status lines.

        Returns:
            The ``FuzzStats`` object held in ``self.stats``.
        """
        self.stats.start_time = time.time()
        # Rates are computed from executions made during this call only,
        # because start_time is reset here while the counter is cumulative.
        execs_at_start = self.stats.executions
        print("HeapTRM Fuzzer")
        print(f" Binary: {self.binary}")
        print(f" Harness: {self.harness}")
        print(f" Corpus: {len(self.corpus)} seeds")
        print(f" Output: {self.output_dir}")
        print()

        # Initial scoring of seeds. Seeds that already crash or corrupt the
        # heap are findings too, and they set the score baseline.
        for inp in self.corpus:
            result = self._execute(inp.data)
            inp.score = self._score(result)
            inp.n_states = result["n_states"]
            inp.corruptions = result["corruptions"]
            inp.corruption_types = result["corruption_types"]
            self._record_findings(inp, result)
            if inp.score > self.stats.best_score:
                self.stats.best_score = inp.score
        self.stats.corpus_size = len(self.corpus)

        for iteration in range(max_iterations):
            self.stats.iterations = iteration + 1

            # Select and mutate
            parent = self._select_parent()
            mutated_data = self._mutate(parent.data)

            # Execute
            result = self._execute(mutated_data)
            score = self._score(result)
            child = FuzzInput(
                data=mutated_data,
                score=score,
                corruptions=result["corruptions"],
                corruption_types=result["corruption_types"],
                n_states=result["n_states"],
                generation=parent.generation + 1,
                parent_hash=self._input_hash(parent.data),
            )

            # Track findings
            self._record_findings(child, result)

            # Add to corpus if interesting
            if score > 0 and self._add_to_corpus(child):
                self.stats.corpus_size = len(self.corpus)
                if score > self.stats.best_score:
                    self.stats.best_score = score
                    self._save_finding(child, "interesting")

            # Status
            if print_every > 0 and (iteration + 1) % print_every == 0:
                elapsed = time.time() - self.stats.start_time
                exec_per_sec = (
                    (self.stats.executions - execs_at_start) / max(elapsed, 0.1)
                )
                print(f" iter={iteration+1:6d} | exec={self.stats.executions} "
                      f"({exec_per_sec:.0f}/s) | corpus={len(self.corpus)} | "
                      f"crashes={self.stats.crashes} | "
                      f"corruptions={self.stats.corruptions_found} | "
                      f"best={self.stats.best_score:.0f} | "
                      f"types={self.stats.unique_corruption_types or 'none'}")

        # Final summary
        elapsed = time.time() - self.stats.start_time
        rate = (self.stats.executions - execs_at_start) / max(elapsed, 0.1)
        print()
        print("=== Fuzzing Complete ===")
        print(f" Duration: {elapsed:.1f}s")
        print(f" Executions: {self.stats.executions} ({rate:.0f}/s)")
        print(f" Crashes: {self.stats.crashes}")
        print(f" Corruption findings: {self.stats.corruptions_found}")
        print(f" Corruption types: {self.stats.unique_corruption_types or 'none'}")
        print(f" Corpus: {len(self.corpus)} inputs")
        print(f" Findings in: {self.output_dir}")
        return self.stats
def main():
    """Command-line entry point: parse the options, build a HeapFuzzer, run it."""
    import argparse

    cli = argparse.ArgumentParser(description="HeapTRM-guided heap fuzzer")
    # (flags, keyword settings) in the order they are registered.
    option_specs = (
        (("binary",), {"help": "Target binary"}),
        (("args",), {"nargs": "*", "help": "Binary arguments"}),
        (("--seeds",), {"help": "Seed directory or file"}),
        (("--output",), {"default": "heaptrm_findings",
                         "help": "Output directory"}),
        (("--iterations",), {"type": int, "default": 10000}),
        (("--harness",), {"help": "Path to heapgrid harness .so"}),
    )
    for flags, settings in option_specs:
        cli.add_argument(*flags, **settings)
    opts = cli.parse_args()

    # A single --seeds value is wrapped in a list; no value means no seeds.
    seed_sources = None
    if opts.seeds:
        seed_sources = [opts.seeds]

    HeapFuzzer(
        opts.binary,
        args=opts.args,
        seeds=seed_sources,
        output_dir=opts.output,
        harness_path=opts.harness,
    ).run(max_iterations=opts.iterations)


if __name__ == "__main__":
    main()