| |
| """ |
| benchmark.py - Benchmark HeapTRM against real CVE exploits in Docker. |
| |
| Builds vulnerable software versions in Docker containers, runs exploits |
| and benign workloads with our v2 harness, evaluates detection. |
| |
| CVEs tested: |
| 1. CVE-2021-3156 (Baron Samedit) - sudo heap overflow via sudoedit -s |
| 2. CVE-2021-4034 (PwnKit) - pkexec out-of-bounds write |
| 3. CVE-2023-6246 - glibc syslog heap overflow |
| |
| Each CVE has: |
| - Docker container with vulnerable version |
| - Exploit trigger command |
| - Benign workload command |
| """ |
|
|
| import subprocess |
| import os |
| import sys |
| import json |
| import tempfile |
| from pathlib import Path |
|
|
# Repository root: two directory levels above this script's resolved location.
ROOT = Path(__file__).resolve().parent.parent
# C source of the v2 heap-tracing harness that gets copied into each image.
HARNESS_SRC = ROOT.joinpath("heaptrm", "harness", "heapgrid_v2.c")
|
|
| |
# Benchmark definitions, keyed by CVE id. Each entry provides:
#   name        - human-readable label used in the report
#   dockerfile  - Dockerfile text; the build context also contains heapgrid_v2.c
#   exploit_cmd - argv run in the container; writes /dumps/exploit.jsonl
#   benign_cmd  - argv run in the container; writes /dumps/benign.jsonl
#   user        - optional user passed to `docker run -u`
#
# NOTE(review): sudoedit and pkexec are normally setuid-root. When they are
# started by a non-root user the dynamic loader runs in secure-execution mode
# and is documented to ignore LD_PRELOAD entries containing a slash, so the
# harness may never be loaded for the two "testuser" entries - confirm that
# dumps are actually produced for them.
CVES = {
    "CVE-2021-3156": {
        "name": "Baron Samedit (sudo heap overflow)",
        "dockerfile": """
FROM ubuntu:focal
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -y sudo=1.8.31-1ubuntu1 gcc && rm -rf /var/lib/apt/lists/*
RUN useradd -m testuser
COPY heapgrid_v2.c /tmp/
RUN gcc -shared -fPIC -O2 -o /tmp/heapgrid_v2.so /tmp/heapgrid_v2.c -ldl -pthread
""",
        "exploit_cmd": [
            "bash", "-c",
            "HEAPGRID_OUT=/dumps/exploit.jsonl LD_PRELOAD=/tmp/heapgrid_v2.so "
            # The 1000-byte argument is generated with bash itself: the image
            # above only installs sudo and gcc, so python3 cannot be relied on,
            # and a failed substitution would silently drop the payload.
            "sudoedit -s '\\' $(printf 'A%.0s' {1..1000}) 2>/dev/null; true"
        ],
        "benign_cmd": [
            "bash", "-c",
            "HEAPGRID_OUT=/dumps/benign.jsonl LD_PRELOAD=/tmp/heapgrid_v2.so "
            "sudo --help 2>/dev/null; true"
        ],
        "user": "testuser",
    },
    "CVE-2021-4034": {
        "name": "PwnKit (pkexec heap/stack)",
        "dockerfile": """
FROM ubuntu:focal
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -y policykit-1=0.105-26ubuntu1 gcc && rm -rf /var/lib/apt/lists/*
RUN useradd -m testuser
COPY heapgrid_v2.c /tmp/
RUN gcc -shared -fPIC -O2 -o /tmp/heapgrid_v2.so /tmp/heapgrid_v2.c -ldl -pthread
""",
        "exploit_cmd": [
            "bash", "-c",
            # Two traced runs append to the same dump: a plain --help call,
            # then pkexec started with a stripped, crafted environment.
            "HEAPGRID_OUT=/dumps/exploit.jsonl LD_PRELOAD=/tmp/heapgrid_v2.so "
            "pkexec --help 2>/dev/null; "
            "HEAPGRID_OUT=/dumps/exploit.jsonl LD_PRELOAD=/tmp/heapgrid_v2.so "
            "env -i 'SHELL=bash' 'PATH=GCONV_PATH=.' pkexec 2>/dev/null; true"
        ],
        "benign_cmd": [
            "bash", "-c",
            "HEAPGRID_OUT=/dumps/benign.jsonl LD_PRELOAD=/tmp/heapgrid_v2.so "
            "pkexec --help 2>/dev/null; true"
        ],
        "user": "testuser",
    },
    # NOTE(review): ubuntu:lunar (23.04) appears to be end-of-life; if
    # `apt-get update` fails here the package sources may need to point at
    # old-releases.ubuntu.com - verify before relying on this entry.
    # NOTE(review): the test program logs a long *message* with a short ident;
    # confirm this actually reaches the code path affected by CVE-2023-6246.
    "CVE-2023-6246": {
        "name": "glibc syslog heap overflow",
        "dockerfile": """
FROM ubuntu:lunar
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -y gcc && rm -rf /var/lib/apt/lists/*
COPY heapgrid_v2.c /tmp/
RUN gcc -shared -fPIC -O2 -o /tmp/heapgrid_v2.so /tmp/heapgrid_v2.c -ldl -pthread
# Build a small program that calls syslog with a long format string
RUN cat > /tmp/syslog_test.c << 'EOF'
#include <syslog.h>
#include <string.h>
#include <stdlib.h>
int main(int argc, char **argv) {
    int exploit = argc > 1 && strcmp(argv[1], "exploit") == 0;
    openlog("test", LOG_PID, LOG_USER);
    if (exploit) {
        char buf[4096];
        memset(buf, 'A', sizeof(buf)-1);
        buf[sizeof(buf)-1] = 0;
        syslog(LOG_INFO, "%s", buf);
    } else {
        syslog(LOG_INFO, "normal log message");
    }
    closelog();
    return 0;
}
EOF
RUN gcc -o /tmp/syslog_test /tmp/syslog_test.c -O0
""",
        "exploit_cmd": [
            "bash", "-c",
            "HEAPGRID_OUT=/dumps/exploit.jsonl LD_PRELOAD=/tmp/heapgrid_v2.so "
            "/tmp/syslog_test exploit 2>/dev/null; true"
        ],
        "benign_cmd": [
            "bash", "-c",
            "HEAPGRID_OUT=/dumps/benign.jsonl LD_PRELOAD=/tmp/heapgrid_v2.so "
            "/tmp/syslog_test benign 2>/dev/null; true"
        ],
    },
}
|
|
|
|
def build_image(cve_id, info, timeout=120):
    """Build the Docker image for one CVE entry.

    The Dockerfile text from ``info["dockerfile"]`` and a copy of the harness
    source (``HARNESS_SRC``, saved as ``heapgrid_v2.c``) are written into a
    temporary build context, which is then passed to ``docker build``.

    Args:
        cve_id: CVE identifier such as ``"CVE-2021-3156"``; lower-cased and
            stripped of dashes to form the image tag.
        info: CVE entry; only ``info["dockerfile"]`` is read here.
        timeout: Seconds to allow ``docker build`` before giving up.

    Returns:
        The image tag on success, or ``None`` when the build fails, times
        out, or cannot be started (a ``BUILD FAILED`` line is printed).
    """
    import shutil

    tag = f"heaptrm-{cve_id.lower().replace('-', '')}"

    with tempfile.TemporaryDirectory() as tmpdir:
        context = Path(tmpdir)
        (context / "Dockerfile").write_text(info["dockerfile"])

        try:
            # The Dockerfiles COPY heapgrid_v2.c, so it must sit in the context.
            shutil.copy(HARNESS_SRC, context / "heapgrid_v2.c")
            result = subprocess.run(
                ["docker", "build", "--network=host", "-t", tag, tmpdir],
                capture_output=True, text=True, timeout=timeout,
            )
        except subprocess.TimeoutExpired:
            # A slow build must not abort the remaining CVEs; the caller
            # already treats None as a failed build.
            print(f" BUILD FAILED: docker build timed out after {timeout}s")
            return None
        except OSError as exc:
            # docker binary missing / not executable, or harness source unreadable.
            print(f" BUILD FAILED: {exc}")
            return None

    if result.returncode != 0:
        # Only the tail of stderr is shown; that is where docker reports the error.
        print(f" BUILD FAILED: {result.stderr[-200:]}")
        return None
    return tag
|
|
|
|
def run_test(tag, cmd, user=None):
    """Run a command in a container of image *tag* and summarize its dumps.

    A fresh host directory is bind-mounted at ``/dumps`` and the container is
    started without network access. Every ``*.jsonl`` file found in that
    directory afterwards is parsed as one JSON object per line.

    Args:
        tag: Docker image tag to run.
        cmd: Command (argv list) executed inside the container.
        user: Optional user passed to ``docker run -u``.

    Returns:
        Dict mapping each dump file's stem (e.g. ``"exploit"``) to
        ``{"states": int, "corruptions": int, "types": list}``. Empty when
        no dump file was written.
    """
    import shutil

    dump_dir = tempfile.mkdtemp()
    try:
        # mkdtemp() creates the directory accessible only to the calling user;
        # the container may run under a different uid (``-u``), which would
        # otherwise be unable to create its dump file.
        os.chmod(dump_dir, 0o777)

        docker_cmd = ["docker", "run", "--rm", "--network=none",
                      "-v", f"{dump_dir}:/dumps"]
        if user:
            docker_cmd.extend(["-u", user])
        docker_cmd.extend([tag] + cmd)

        try:
            subprocess.run(docker_cmd, capture_output=True, timeout=30)
        except subprocess.TimeoutExpired:
            # A hung workload should not abort the whole benchmark; evaluate
            # whatever was dumped before the deadline.
            # NOTE(review): this only stops the docker client - the container
            # itself may keep running; confirm whether it needs to be killed.
            print(" WARNING: container run timed out; using partial dump")

        return {
            dump_file.stem: _summarize_dump(dump_file)
            for dump_file in Path(dump_dir).glob("*.jsonl")
        }
    finally:
        # Always remove the dump directory, even if the run or parsing failed.
        shutil.rmtree(dump_dir, ignore_errors=True)


def _summarize_dump(dump_file):
    """Count states and corruptions recorded in one JSONL dump file."""
    state_count = 0
    total_corruptions = 0
    corruption_types = set()

    # errors="replace": a crashing process may leave non-UTF-8 bytes behind.
    with open(dump_file, encoding="utf-8", errors="replace") as f:
        for raw in f:
            line = raw.strip()
            if not line:
                continue
            try:
                state = json.loads(line)
            except json.JSONDecodeError:
                # A process killed mid-write leaves a truncated record; skip
                # it rather than discarding the states already read.
                continue
            if not isinstance(state, dict):
                continue

            state_count += 1
            cc = state.get("corruption_count", 0)
            if cc > 0:
                total_corruptions += cc
                corruption_types.update(
                    c.get("type", "unknown")
                    for c in state.get("corruptions", [])
                )

    return {
        "states": state_count,
        "corruptions": total_corruptions,
        # Sorted so the reported order does not depend on set iteration order.
        "types": sorted(corruption_types, key=str),
    }
|
|
|
|
def main():
    """Run the benchmark over every entry in ``CVES`` and print a report.

    For each CVE the image is built, the exploit and benign commands are run
    through ``run_test``, and a verdict is derived from whether corruptions
    were reported for the exploit run and for the benign run. A summary with
    the number of detected CVEs is printed at the end.
    """
    rule = "=" * 70
    # Verdict keyed by (exploit run reported corruptions, benign run reported none).
    verdict_for = {
        (True, True): "PERFECT",
        (True, False): "DETECTED (with FP)",
        (False, True): "MISSED",
        (False, False): "FAILED",
    }
    counted_as_detected = ("PERFECT", "DETECTED (with FP)")

    for heading in (rule, "HeapTRM CVE Benchmark", rule):
        print(heading)

    results = {}
    for cve_id, info in CVES.items():
        print(f"\n--- {cve_id}: {info['name']} ---")

        tag = build_image(cve_id, info)
        if not tag:
            results[cve_id] = {"status": "BUILD_FAILED"}
            continue

        print(" Running exploit...")
        exploit_dumps = run_test(tag, info["exploit_cmd"], info.get("user"))

        print(" Running benign...")
        benign_dumps = run_test(tag, info["benign_cmd"], info.get("user"))

        # A run that produced no dump file is treated as zero states/corruptions.
        exploit_data = exploit_dumps.get(
            "exploit", {"states": 0, "corruptions": 0, "types": []})
        benign_data = benign_dumps.get(
            "benign", {"states": 0, "corruptions": 0, "types": []})

        exploit_detected = exploit_data["corruptions"] > 0
        benign_clean = benign_data["corruptions"] == 0
        verdict = verdict_for[(bool(exploit_detected), bool(benign_clean))]

        results[cve_id] = {
            "name": info["name"],
            "verdict": verdict,
            "exploit": exploit_data,
            "benign": benign_data,
        }

        print(f" Exploit: {exploit_data['states']} states, "
              f"{exploit_data['corruptions']} corruptions {exploit_data['types']}")
        print(f" Benign: {benign_data['states']} states, "
              f"{benign_data['corruptions']} corruptions {benign_data['types']}")
        print(f" Verdict: {verdict}")

    print("\n" + rule)
    print("BENCHMARK SUMMARY")
    print(rule)
    for cve_id, r in results.items():
        if "verdict" not in r:
            # Entries without a verdict only carry a status (e.g. a failed build).
            print(f" {cve_id}: {r.get('status', 'UNKNOWN')}")
            continue
        print(f" {cve_id}: {r.get('name', '?'):45s} [{r['verdict']}]")

    detected = len(
        [r for r in results.values() if r.get("verdict") in counted_as_detected]
    )
    total = len(results)
    print(f"\n Detected: {detected}/{total}")


if __name__ == "__main__":
    main()
|
|