| |
| """ |
| fuzzer.py - HeapTRM-guided fuzzer for heap exploit discovery. |
| |
| Uses the v2 harness as an oracle: mutates inputs, scores heap states, |
| evolves toward exploit-triggering inputs. |
| |
| Two modes: |
| 1. Standalone: fuzz a binary that reads from stdin |
| 2. AFL post-processor: score AFL inputs for heap exploit potential |
| |
| Usage: |
| # Standalone fuzzing |
| from heaptrm.integrations.fuzzer import HeapFuzzer |
| fuzzer = HeapFuzzer("./target_binary") |
| fuzzer.run(max_iterations=10000) |
| |
| # CLI |
| python -m heaptrm.integrations.fuzzer ./target_binary --seeds seeds/ --output findings/ |
| """ |
|
|
| import os |
| import sys |
| import json |
| import random |
| import subprocess |
| import tempfile |
| import shutil |
| import time |
| import hashlib |
| from pathlib import Path |
| from dataclasses import dataclass, field |
| from typing import List, Optional, Set |
| from collections import defaultdict |
|
|
| |
# Directory two levels above this file; HeapFuzzer searches beneath it (and
# beneath its parent) for the LD_PRELOAD harness shared object.
_PKG_ROOT = Path(__file__).parent.parent
|
|
|
|
@dataclass
class FuzzInput:
    """One corpus entry: the raw input bytes plus what executing it revealed."""

    # Raw bytes fed to the target on stdin.
    data: bytes
    # Fitness assigned by HeapFuzzer._score (higher = more interesting).
    score: float = 0.0
    # Sum of the "corruption_count" values reported across all heap states.
    corruptions: int = 0
    # Distinct corruption "type" labels reported for this input.
    corruption_types: Set[str] = field(default_factory=set)
    # Number of heap-state records the harness dumped for this input.
    n_states: int = 0
    # Mutation depth: 0 for seeds, parent's generation + 1 for children.
    generation: int = 0
    # Truncated SHA-256 of the parent's data ("" for seeds).
    parent_hash: str = ""
|
|
|
|
@dataclass
class FuzzStats:
    """Running counters for a fuzzing session, updated by HeapFuzzer."""

    # Mutation rounds completed so far.
    iterations: int = 0
    # Target executions performed.
    executions: int = 0
    # Mutated inputs whose execution was flagged as a crash.
    crashes: int = 0
    # Mutated inputs that triggered at least one reported heap corruption.
    corruptions_found: int = 0
    # Union of every corruption "type" label seen so far.
    unique_corruption_types: Set[str] = field(default_factory=set)
    # Highest score among children admitted to the corpus.
    best_score: float = 0.0
    # Corpus length as of the last update.
    corpus_size: int = 0
    # time.time() value captured when the run started.
    start_time: float = 0.0
|
|
|
|
class HeapFuzzer:
    """Mutation-based fuzzer guided by heap-corruption detection.

    Each candidate input is piped to the target on stdin while the heap
    harness is injected through ``LD_PRELOAD``. The harness writes one JSON
    record per heap state to the file named by ``HEAPGRID_OUT``; those
    records are folded into a fitness score:

        score = corruptions * 100 + n_states * 0.1 + (50 if crashed else 0)

    Inputs that crash the target or trigger reported corruptions are written
    to ``<output_dir>/crashes`` and ``<output_dir>/corruptions``; inputs that
    set a new best score are written to ``<output_dir>/interesting``.
    """

    # Wall-clock limit, in seconds, for a single target execution.
    EXEC_TIMEOUT = 5
    # Mutated inputs are truncated to at most this many bytes.
    MAX_INPUT_SIZE = 4096
    # Byte values substituted by the "interesting_value" mutation.
    INTERESTING_BYTES = (
        0x00, 0x01, 0x7F, 0x80, 0xFF, 0x41, 0xFE, 0xFD, 0x20, 0x0A, 0x0D,
    )
    # Mutation strategies, chosen uniformly at random per mutation step.
    MUTATION_STRATEGIES = (
        "flip_byte", "flip_bit", "interesting_value",
        "insert", "delete", "splice", "repeat_block",
    )
    # Sub-directories of output_dir that findings are sorted into.
    _FINDING_DIRS = ("crashes", "corruptions", "interesting")

    def __init__(
        self,
        binary: str,
        args: Optional[list] = None,
        seeds: Optional[list] = None,
        output_dir: str = "heaptrm_findings",
        harness_path: Optional[str] = None,
    ):
        """Set up output directories, locate the harness and load seeds.

        Args:
            binary: Path of the target executable (reads input from stdin).
            args: Extra command-line arguments passed to the target.
            seeds: Seed items; each is raw ``bytes``, a file path, or a
                directory path whose regular files are all loaded.
            output_dir: Directory under which findings are written.
            harness_path: Explicit harness ``.so``; when omitted, known
                locations relative to the package are searched.

        Raises:
            FileNotFoundError: No harness was given and none could be found.
        """
        self.binary = binary
        self.args = list(args) if args else []
        self.output_dir = Path(output_dir)
        for category in self._FINDING_DIRS:
            (self.output_dir / category).mkdir(parents=True, exist_ok=True)

        self.harness = harness_path or self._find_harness()

        self.corpus: List["FuzzInput"] = []
        self.seen_hashes: Set[str] = set()
        self.stats = FuzzStats()

        for seed in seeds or []:
            for blob in self._load_seed(seed):
                self._add_to_corpus(FuzzInput(data=blob))

        # Without any usable seed, start from a few generic byte patterns.
        if not self.corpus:
            self._add_to_corpus(FuzzInput(data=b"A" * 64))
            self._add_to_corpus(FuzzInput(data=b"\x00" * 64))
            self._add_to_corpus(FuzzInput(data=bytes(range(256))))

    @staticmethod
    def _find_harness() -> str:
        """Return the absolute path of the first harness candidate that exists."""
        candidates = [
            _PKG_ROOT / "harness" / "heapgrid_v2.so",
            _PKG_ROOT.parent / "harness" / "heapgrid_harness.so",
        ]
        for candidate in candidates:
            if candidate.exists():
                return str(candidate.resolve())
        searched = ", ".join(str(c) for c in candidates)
        raise FileNotFoundError(f"Cannot find harness .so (searched: {searched})")

    @staticmethod
    def _load_seed(seed) -> List[bytes]:
        """Expand one seed item (bytes, file path or directory path) to blobs."""
        if isinstance(seed, (bytes, bytearray)):
            return [bytes(seed)]
        path = Path(seed)
        if path.is_file():
            return [path.read_bytes()]
        if path.is_dir():
            # Sorted so the initial corpus order does not depend on the
            # directory listing order.
            return [p.read_bytes() for p in sorted(path.iterdir()) if p.is_file()]
        print(f"warning: seed {seed!r} not found, skipping", file=sys.stderr)
        return []

    def _input_hash(self, data: bytes) -> str:
        """Return the first 16 hex digits of the SHA-256 of ``data``."""
        return hashlib.sha256(data).hexdigest()[:16]

    def _add_to_corpus(self, inp: "FuzzInput") -> bool:
        """Append ``inp`` unless identical data was seen before.

        Returns:
            True if the input was added, False if it was a duplicate.
        """
        digest = self._input_hash(inp.data)
        if digest in self.seen_hashes:
            return False
        self.seen_hashes.add(digest)
        self.corpus.append(inp)
        return True

    @staticmethod
    def _parse_dump(dump_path) -> tuple:
        """Summarise a harness JSONL dump.

        Blank lines, lines that are not valid JSON and records that are not
        JSON objects are skipped individually, so one damaged line (for
        example a line cut short when the target dies) does not discard the
        rest of the dump. A missing or unreadable file yields an empty
        summary.

        Returns:
            ``(n_states, total_corruptions, corruption_types)``.
        """
        n_states = 0
        total_corruptions = 0
        corruption_types = set()
        try:
            with open(dump_path, encoding="utf-8", errors="replace") as fh:
                for raw in fh:
                    line = raw.strip()
                    if not line:
                        continue
                    try:
                        state = json.loads(line)
                    except ValueError:
                        continue
                    if not isinstance(state, dict):
                        continue
                    n_states += 1
                    count = state.get("corruption_count", 0)
                    if not isinstance(count, (int, float)) or count <= 0:
                        continue
                    total_corruptions += count
                    entries = state.get("corruptions", [])
                    if not isinstance(entries, list):
                        continue
                    for entry in entries:
                        label = (
                            entry.get("type", "unknown")
                            if isinstance(entry, dict) else "unknown"
                        )
                        # str() keeps the set hashable and sortable even if
                        # the dump carries a non-string label.
                        corruption_types.add(str(label))
        except OSError:
            # No dump (the target never wrote one) or it cannot be read.
            pass
        return n_states, total_corruptions, corruption_types

    def _execute(self, data: bytes) -> dict:
        """Run the target once on ``data`` and summarise the heap dump.

        Returns:
            Dict with keys ``n_states``, ``corruptions``,
            ``corruption_types`` and ``crashed``. ``crashed`` is True when
            the process ended with a negative return code (killed by a
            signal); a timeout is not counted as a crash.
        """
        env = os.environ.copy()
        env["LD_PRELOAD"] = self.harness

        # A private temporary directory gives an unpredictable dump path and
        # is removed even if the run is interrupted.
        with tempfile.TemporaryDirectory(prefix="heaptrm_") as workdir:
            dump_path = os.path.join(workdir, "heap.jsonl")
            env["HEAPGRID_OUT"] = dump_path
            try:
                proc = subprocess.run(
                    [self.binary, *self.args],
                    input=data,
                    env=env,
                    capture_output=True,
                    timeout=self.EXEC_TIMEOUT,
                )
                crashed = proc.returncode < 0
            except subprocess.TimeoutExpired:
                crashed = False
            n_states, corruptions, kinds = self._parse_dump(dump_path)

        self.stats.executions += 1
        return {
            "n_states": n_states,
            "corruptions": corruptions,
            "corruption_types": kinds,
            "crashed": crashed,
        }

    def _score(self, result: dict) -> float:
        """Score an execution result; higher means more interesting."""
        score = 0.0
        score += result["corruptions"] * 100
        score += result["n_states"] * 0.1
        if result["crashed"]:
            score += 50
        return score

    def _mutate(self, data: bytes) -> bytes:
        """Return a randomly mutated copy of ``data``.

        Applies between 1 and ``len(data) // 8`` (at least 1) random
        mutations, then truncates the result to ``MAX_INPUT_SIZE`` bytes.
        Empty input yields a single random byte.
        """
        if not data:
            return bytes([random.randint(0, 255)])

        buf = bytearray(data)
        n_mutations = random.randint(1, max(1, len(buf) // 8))

        for _ in range(n_mutations):
            strategy = random.choice(self.MUTATION_STRATEGIES)
            size = len(buf)

            # A strategy whose size precondition fails is a no-op this round.
            if strategy == "flip_byte" and buf:
                buf[random.randint(0, size - 1)] = random.randint(0, 255)

            elif strategy == "flip_bit" and buf:
                pos = random.randint(0, size - 1)
                buf[pos] ^= 1 << random.randint(0, 7)

            elif strategy == "interesting_value" and buf:
                pos = random.randint(0, size - 1)
                buf[pos] = random.choice(self.INTERESTING_BYTES)

            elif strategy == "insert":
                pos = random.randint(0, size)
                val = random.randint(0, 255)
                count = random.randint(1, 16)
                buf[pos:pos] = bytes([val] * count)

            elif strategy == "delete" and size > 1:
                pos = random.randint(0, size - 1)
                count = random.randint(1, min(16, size - pos))
                del buf[pos:pos + count]

            elif strategy == "splice" and size > 4:
                src = random.randint(0, size - 4)
                dst = random.randint(0, size - 1)
                length = random.randint(1, min(16, size - src))
                buf[dst:dst + length] = buf[src:src + length]

            elif strategy == "repeat_block" and size > 2:
                pos = random.randint(0, size - 2)
                length = random.randint(1, min(8, size - pos))
                block = buf[pos:pos + length]
                insert_pos = random.randint(0, size)
                buf[insert_pos:insert_pos] = block * random.randint(2, 8)

        return bytes(buf[:self.MAX_INPUT_SIZE])

    def _select_parent(self) -> "FuzzInput":
        """Pick a parent: the best-scoring of up to 5 randomly sampled entries."""
        if not self.corpus:
            return FuzzInput(data=b"A" * 64)
        k = min(5, len(self.corpus))
        return max(random.sample(self.corpus, k), key=lambda entry: entry.score)

    def _save_finding(self, inp: "FuzzInput", category: str):
        """Write ``inp`` as ``<hash>.bin`` and its metadata as ``<hash>.json``.

        Both files go to ``output_dir/category``.
        """
        digest = self._input_hash(inp.data)
        target_dir = self.output_dir / category
        (target_dir / f"{digest}.bin").write_bytes(inp.data)
        (target_dir / f"{digest}.json").write_text(json.dumps({
            "hash": digest,
            "score": inp.score,
            "corruptions": inp.corruptions,
            # Sorted so the metadata file is reproducible.
            "corruption_types": sorted(inp.corruption_types, key=str),
            "n_states": inp.n_states,
            "generation": inp.generation,
            "parent_hash": inp.parent_hash,
            "size": len(inp.data),
        }, indent=2))

    def _print_progress(self, iteration: int):
        """Print a one-line status report for the given 1-based iteration."""
        elapsed = time.time() - self.stats.start_time
        exec_per_sec = self.stats.executions / max(elapsed, 0.1)
        print(f"  iter={iteration:6d} | exec={self.stats.executions} "
              f"({exec_per_sec:.0f}/s) | corpus={len(self.corpus)} | "
              f"crashes={self.stats.crashes} | "
              f"corruptions={self.stats.corruptions_found} | "
              f"best={self.stats.best_score:.0f} | "
              f"types={self.stats.unique_corruption_types or 'none'}")

    def _print_summary(self):
        """Print the end-of-run summary."""
        elapsed = time.time() - self.stats.start_time
        print()
        print("=== Fuzzing Complete ===")
        print(f"  Duration: {elapsed:.1f}s")
        print(f"  Executions: {self.stats.executions} ({self.stats.executions/max(elapsed,0.1):.0f}/s)")
        print(f"  Crashes: {self.stats.crashes}")
        print(f"  Corruption findings: {self.stats.corruptions_found}")
        print(f"  Corruption types: {self.stats.unique_corruption_types or 'none'}")
        print(f"  Corpus: {len(self.corpus)} inputs")
        print(f"  Findings in: {self.output_dir}")

    def run(self, max_iterations: int = 10000, print_every: int = 100):
        """Run the fuzzing loop.

        Args:
            max_iterations: Number of mutate/execute rounds to perform.
            print_every: Print a progress line every this many iterations;
                0 or a negative value disables progress output.

        Returns:
            The session's ``FuzzStats``.
        """
        self.stats.start_time = time.time()
        self.stats.corpus_size = len(self.corpus)

        print("HeapTRM Fuzzer")
        print(f"  Binary: {self.binary}")
        print(f"  Harness: {self.harness}")
        print(f"  Corpus: {len(self.corpus)} seeds")
        print(f"  Output: {self.output_dir}")
        print()

        # Score the initial corpus so parent selection has real fitness data.
        for inp in self.corpus:
            result = self._execute(inp.data)
            inp.score = self._score(result)
            inp.n_states = result["n_states"]
            inp.corruptions = result["corruptions"]
            inp.corruption_types = result["corruption_types"]

        for iteration in range(1, max_iterations + 1):
            self.stats.iterations = iteration

            parent = self._select_parent()
            mutated_data = self._mutate(parent.data)

            result = self._execute(mutated_data)
            score = self._score(result)

            child = FuzzInput(
                data=mutated_data,
                score=score,
                corruptions=result["corruptions"],
                corruption_types=result["corruption_types"],
                n_states=result["n_states"],
                generation=parent.generation + 1,
                parent_hash=self._input_hash(parent.data),
            )

            if result["crashed"]:
                self.stats.crashes += 1
                self._save_finding(child, "crashes")

            if result["corruptions"] > 0:
                self.stats.corruptions_found += 1
                self.stats.unique_corruption_types.update(result["corruption_types"])
                self._save_finding(child, "corruptions")

            # Only inputs that produced some signal and are new join the corpus.
            if score > 0 and self._add_to_corpus(child):
                self.stats.corpus_size = len(self.corpus)
                if score > self.stats.best_score:
                    self.stats.best_score = score
                    self._save_finding(child, "interesting")

            if print_every and print_every > 0 and iteration % print_every == 0:
                self._print_progress(iteration)

        self._print_summary()
        return self.stats
|
|
|
|
def main():
    """Command-line entry point: parse options, build a HeapFuzzer and run it."""
    import argparse

    cli = argparse.ArgumentParser(description="HeapTRM-guided heap fuzzer")
    cli.add_argument("binary", help="Target binary")
    cli.add_argument("args", nargs="*", help="Binary arguments")
    cli.add_argument("--seeds", help="Seed directory or file")
    cli.add_argument("--output", default="heaptrm_findings", help="Output directory")
    cli.add_argument("--iterations", type=int, default=10000)
    cli.add_argument("--harness", help="Path to heapgrid harness .so")
    opts = cli.parse_args()

    # A single --seeds value (file or directory) is wrapped in a list, the
    # form HeapFuzzer expects; no value means "use the built-in seeds".
    seed_sources = None
    if opts.seeds:
        seed_sources = [opts.seeds]

    session = HeapFuzzer(
        opts.binary,
        args=opts.args,
        seeds=seed_sources,
        output_dir=opts.output,
        harness_path=opts.harness,
    )
    session.run(max_iterations=opts.iterations)
|
|
|
|
# Run the CLI only when executed as a script or via ``python -m``, not on import.
if __name__ == "__main__":
    main()
|
|