#!/usr/bin/env python3
"""
augment_data.py - Generate more training data by:

1. Running how2heap examples across multiple glibc versions
2. Adding random noise allocations between steps (synthetic variants)

This multiplies the dataset by ~N glibc versions available.
"""
import subprocess
import os
from pathlib import Path

# Repository layout: this script lives one level below the project root.
ROOT = Path(__file__).resolve().parent.parent
HOW2HEAP = ROOT / "how2heap"
HARNESS = ROOT / "harness" / "heapgrid_harness.so"
BIN_DIR = ROOT / "data" / "bins_augmented"
DUMP_DIR = ROOT / "data" / "dumps"

# All glibc versions that have examples
GLIBC_VERSIONS = ["2.35", "2.36", "2.37", "2.38", "2.39"]

# Techniques we want more data for
TECHNIQUES = [
    "fastbin_dup",
    "fastbin_dup_into_stack",
    "fastbin_dup_consolidate",
    "tcache_poisoning",
    "house_of_spirit",
    "unsafe_unlink",
    "tcache_house_of_spirit",
    "tcache_stashing_unlink_attack",
    "house_of_einherjar",
    "large_bin_attack",
    "poison_null_byte",
    "house_of_lore",
    "house_of_water",
    "house_of_botcake",
    "overlapping_chunks",
    "fastbin_reverse_into_tcache",
    "house_of_mind_fastbin",
    "tcache_relative_write",
    "safe_link_double_protect",
    "house_of_tangerine",
    "decrypt_safe_linking",
    "mmap_overlapping_chunks",
    "tcache_metadata_poisoning",
    "sysmalloc_int_free",
]


def compile_and_run(technique: str, glibc_ver: str, suffix: str = ""):
    """Compile and instrument one technique from one glibc version.

    Args:
        technique: how2heap example name (C file stem, e.g. "fastbin_dup").
        glibc_ver: glibc version directory to pull the source from (e.g. "2.35").
        suffix: optional tag appended to the output binary/dump names so
            synthetic variants of the same technique do not collide.

    Returns:
        The number of recorded heap states (lines in the JSONL dump) on
        success, or False if the source is missing, compilation fails, or
        no dump was produced.
    """
    src = HOW2HEAP / f"glibc_{glibc_ver}" / f"{technique}.c"
    if not src.exists():
        return False

    BIN_DIR.mkdir(parents=True, exist_ok=True)
    bin_name = f"{technique}_g{glibc_ver.replace('.', '')}{suffix}"
    bin_path = BIN_DIR / bin_name
    dump_path = DUMP_DIR / f"{bin_name}.jsonl"

    # Compile (source from different glibc version, but runs on local glibc)
    result = subprocess.run(
        ["gcc", "-o", str(bin_path), str(src),
         "-std=c99", "-g", "-O0", "-Wno-all", "-lpthread"],
        capture_output=True, text=True
    )
    if result.returncode != 0:
        return False

    # Run with harness preloaded; the harness writes heap states to dump_path.
    env = os.environ.copy()
    env["LD_PRELOAD"] = str(HARNESS)
    env["HEAPGRID_OUT"] = str(dump_path)
    try:
        subprocess.run(
            [str(bin_path)], env=env, capture_output=True, timeout=10
        )
    except subprocess.TimeoutExpired:
        # Heap-corruption PoCs can hang; a partial dump may still be usable,
        # so fall through and count whatever was written instead of aborting
        # the whole augmentation run.
        pass

    if dump_path.exists() and dump_path.stat().st_size > 0:
        # Each JSONL line is one recorded heap state.
        with open(dump_path) as fh:
            return sum(1 for _ in fh)
    return False


def main():
    """Run every technique against every non-baseline glibc version."""
    DUMP_DIR.mkdir(parents=True, exist_ok=True)
    total_new = 0
    for ver in GLIBC_VERSIONS:
        if ver == "2.39":
            continue  # already have this from base run
        print(f"\n--- glibc {ver} ---")
        for tech in TECHNIQUES:
            result = compile_and_run(tech, ver)
            if result:
                print(f"  [OK] {tech} (glibc {ver}): {result} states")
                total_new += result
            else:
                print(f"  [--] {tech} (glibc {ver}): skipped")
    print(f"\nTotal new states: {total_new}")
    print("Re-run dataset_gen.py with all dump files to rebuild dataset.")


if __name__ == "__main__":
    main()