#!/usr/bin/env python3
"""
augment_data.py - Generate more training data by:
1. Running how2heap examples across multiple glibc versions
2. Adding random noise allocations between steps (synthetic variants)
This multiplies the dataset by ~N glibc versions available.
"""
import subprocess
import os
from pathlib import Path
# Two directory levels above this file; every other path hangs off it.
ROOT = Path(__file__).resolve().parent.parent

# Inputs: example sources and the shared object that gets preloaded.
HOW2HEAP = ROOT.joinpath("how2heap")
HARNESS = ROOT.joinpath("harness", "heapgrid_harness.so")

# Outputs: compiled binaries and the per-binary dump files.
BIN_DIR = ROOT.joinpath("data", "bins_augmented")
DUMP_DIR = ROOT.joinpath("data", "dumps")

# All glibc versions that have examples: 2.35 through 2.39 inclusive.
GLIBC_VERSIONS = [f"2.{minor}" for minor in range(35, 40)]
# Techniques we want more data for.
# One example name per line; splitting on whitespace produces the list in
# exactly the order written here.
TECHNIQUES = """
    fastbin_dup
    fastbin_dup_into_stack
    fastbin_dup_consolidate
    tcache_poisoning
    house_of_spirit
    unsafe_unlink
    tcache_house_of_spirit
    tcache_stashing_unlink_attack
    house_of_einherjar
    large_bin_attack
    poison_null_byte
    house_of_lore
    house_of_water
    house_of_botcake
    overlapping_chunks
    fastbin_reverse_into_tcache
    house_of_mind_fastbin
    tcache_relative_write
    safe_link_double_protect
    house_of_tangerine
    decrypt_safe_linking
    mmap_overlapping_chunks
    tcache_metadata_poisoning
    sysmalloc_int_free
""".split()
def compile_and_run(technique: str, glibc_ver: str, suffix: str = ""):
    """Compile one technique from one glibc version and run it instrumented.

    The source is read from ``HOW2HEAP/glibc_<glibc_ver>/<technique>.c``,
    built with gcc into ``BIN_DIR`` and then executed with ``HARNESS`` set as
    ``LD_PRELOAD`` and the dump path passed in ``HEAPGRID_OUT``.

    Args:
        technique: Base name of the example's C file, without ``.c``.
        glibc_ver: Version directory to take the source from, e.g. ``"2.35"``.
        suffix: Optional string appended to the binary and dump file names.

    Returns:
        The number of lines in the dump file when it exists and is non-empty;
        ``False`` when the source is missing, compilation fails, or no dump
        was produced.
    """
    src = HOW2HEAP / f"glibc_{glibc_ver}" / f"{technique}.c"
    if not src.exists():
        return False

    BIN_DIR.mkdir(parents=True, exist_ok=True)
    # Create the dump directory here as well, so the function also works
    # when it is called without main() having run first.
    DUMP_DIR.mkdir(parents=True, exist_ok=True)

    bin_name = f"{technique}_g{glibc_ver.replace('.', '')}{suffix}"
    bin_path = BIN_DIR / bin_name
    dump_path = DUMP_DIR / f"{bin_name}.jsonl"

    # Compile (source from different glibc version, but runs on local glibc)
    result = subprocess.run(
        ["gcc", "-o", str(bin_path), str(src),
         "-std=c99", "-g", "-O0", "-Wno-all", "-lpthread"],
        capture_output=True, text=True
    )
    if result.returncode != 0:
        return False

    # Run with harness
    env = os.environ.copy()
    env["LD_PRELOAD"] = str(HARNESS)
    env["HEAPGRID_OUT"] = str(dump_path)
    try:
        # The exit status is deliberately ignored: only the dump matters.
        subprocess.run(
            [str(bin_path)],
            env=env, capture_output=True, timeout=10
        )
    except subprocess.TimeoutExpired:
        # subprocess.run() has already killed the child at this point.
        # Treat a hang like any other abnormal exit instead of aborting the
        # whole batch, and still count whatever was dumped before the kill.
        pass

    # NOTE(review): a dump left over from an earlier run is counted here if
    # this run wrote nothing - confirm whether the harness truncates or
    # appends to HEAPGRID_OUT.
    if dump_path.exists() and dump_path.stat().st_size > 0:
        with open(dump_path) as dump_file:
            return sum(1 for _ in dump_file)
    return False
def main():
    """Build and run every listed technique for each additional glibc version.

    Prints one status line per technique/version pair and finishes with the
    total number of states reported by compile_and_run().
    """
    DUMP_DIR.mkdir(parents=True, exist_ok=True)

    # 2.39 is left out: already have this from base run
    pending_versions = [v for v in GLIBC_VERSIONS if v != "2.39"]

    state_count = 0
    for glibc_ver in pending_versions:
        print(f"\n--- glibc {glibc_ver} ---")
        for name in TECHNIQUES:
            states = compile_and_run(name, glibc_ver)
            if not states:
                print(f" [--] {name} (glibc {glibc_ver}): skipped")
                continue
            print(f" [OK] {name} (glibc {glibc_ver}): {states} states")
            state_count += states

    print(f"\nTotal new states: {state_count}")
    print("Re-run dataset_gen.py with all dump files to rebuild dataset.")


if __name__ == "__main__":
    main()
|