"""
augment_data.py - Generate more training data by:
1. Running how2heap examples across multiple glibc versions
2. Adding random noise allocations between steps (synthetic variants)

This multiplies the dataset by ~N glibc versions available.
"""

import os
import subprocess
from pathlib import Path

# Project layout: this file lives one directory below the repository root.
ROOT = Path(__file__).resolve().parent.parent
# how2heap checkout containing per-version source trees (glibc_<ver>/).
HOW2HEAP = ROOT / "how2heap"
# LD_PRELOAD shim that records heap state as JSONL while a PoC runs.
HARNESS = ROOT / "harness" / "heapgrid_harness.so"
# Compiled PoC binaries and their heap-state dumps land here.
BIN_DIR = ROOT / "data" / "bins_augmented"
DUMP_DIR = ROOT / "data" / "dumps"

# glibc versions for which a how2heap source directory is expected.
GLIBC_VERSIONS = ["2.35", "2.36", "2.37", "2.38", "2.39"]
# how2heap techniques to build; each entry maps to <glibc_dir>/<name>.c.
# Missing sources for a given glibc version are skipped at run time.
TECHNIQUES = [
    "fastbin_dup",
    "fastbin_dup_into_stack",
    "fastbin_dup_consolidate",
    "tcache_poisoning",
    "house_of_spirit",
    "unsafe_unlink",
    "tcache_house_of_spirit",
    "tcache_stashing_unlink_attack",
    "house_of_einherjar",
    "large_bin_attack",
    "poison_null_byte",
    "house_of_lore",
    "house_of_water",
    "house_of_botcake",
    "overlapping_chunks",
    "fastbin_reverse_into_tcache",
    "house_of_mind_fastbin",
    "tcache_relative_write",
    "safe_link_double_protect",
    "house_of_tangerine",
    "decrypt_safe_linking",
    "mmap_overlapping_chunks",
    "tcache_metadata_poisoning",
    "sysmalloc_int_free",
]
| def compile_and_run(technique: str, glibc_ver: str, suffix: str = ""): |
| """Compile and instrument one technique from one glibc version.""" |
| src = HOW2HEAP / f"glibc_{glibc_ver}" / f"{technique}.c" |
| if not src.exists(): |
| return False |
|
|
| BIN_DIR.mkdir(parents=True, exist_ok=True) |
| bin_name = f"{technique}_g{glibc_ver.replace('.', '')}{suffix}" |
| bin_path = BIN_DIR / bin_name |
| dump_path = DUMP_DIR / f"{bin_name}.jsonl" |
|
|
| |
| result = subprocess.run( |
| ["gcc", "-o", str(bin_path), str(src), |
| "-std=c99", "-g", "-O0", "-Wno-all", "-lpthread"], |
| capture_output=True, text=True |
| ) |
| if result.returncode != 0: |
| return False |
|
|
| |
| env = os.environ.copy() |
| env["LD_PRELOAD"] = str(HARNESS) |
| env["HEAPGRID_OUT"] = str(dump_path) |
|
|
| subprocess.run( |
| [str(bin_path)], |
| env=env, capture_output=True, timeout=10 |
| ) |
|
|
| if dump_path.exists() and dump_path.stat().st_size > 0: |
| lines = sum(1 for _ in open(dump_path)) |
| return lines |
| return False |
|
|
|
|
def main():
    """Sweep every technique across every configured glibc version and
    report how many new heap states were recorded."""
    DUMP_DIR.mkdir(parents=True, exist_ok=True)
    total_new = 0

    for ver in GLIBC_VERSIONS:
        # NOTE(review): 2.39 is listed in GLIBC_VERSIONS yet skipped here —
        # presumably its dumps come from the base (non-augmented) pipeline.
        # Confirm before removing either the list entry or this guard.
        if ver == "2.39":
            continue
        print(f"\n--- glibc {ver} ---")
        for tech in TECHNIQUES:
            result = compile_and_run(tech, ver)
            if result:
                # result is the dump line count (number of heap states).
                print(f" [OK] {tech} (glibc {ver}): {result} states")
                total_new += result
            else:
                print(f" [--] {tech} (glibc {ver}): skipped")

    print(f"\nTotal new states: {total_new}")
    print("Re-run dataset_gen.py with all dump files to rebuild dataset.")


if __name__ == "__main__":
    main()