# heap-trm / cve_tests / benchmark.py
# Commit d869ff3 (amarck): CVE benchmark: 4/5 real CVE patterns detected, 0 false positives
#!/usr/bin/env python3
"""
benchmark.py - Benchmark HeapTRM against real CVE exploits in Docker.
Builds vulnerable software versions in Docker containers, runs exploits
and benign workloads with our v2 harness, evaluates detection.
CVEs tested:
1. CVE-2021-3156 (Baron Samedit) - sudo heap overflow via sudoedit -s
2. CVE-2021-4034 (PwnKit) - pkexec out-of-bounds write
3. CVE-2023-6246 - glibc syslog heap overflow
Each CVE has:
- Docker container with vulnerable version
- Exploit trigger command
- Benign workload command
"""
import subprocess
import os
import sys
import json
import tempfile
from pathlib import Path
# Directory two levels above this file's resolved location.
ROOT = (
    Path(__file__)
    .resolve()
    .parent
    .parent
)
# C source of the v2 harness; build_image() copies it into each build context.
HARNESS_SRC = ROOT.joinpath("heaptrm", "harness", "heapgrid_v2.c")
# Benchmark definitions, keyed by CVE id. Each value is a dict with:
#   "name"        - human-readable description used in the report
#   "dockerfile"  - Dockerfile text; the build context also contains heapgrid_v2.c
#   "exploit_cmd" - argv run in the container to trigger the bug
#   "benign_cmd"  - argv run in the container as the non-malicious baseline
#   "user"        - (optional) container user passed to `docker run -u`
# Both commands preload /tmp/heapgrid_v2.so and point HEAPGRID_OUT at
# /dumps/<exploit|benign>.jsonl, which run_test() reads back.
#
# NOTE(review): sudo and pkexec are normally installed setuid-root, and the
# dynamic loader restricts LD_PRELOAD for setuid programs started by an
# unprivileged user. Confirm the harness is actually loaded when these
# commands run as "testuser".
CVES = {
    "CVE-2021-3156": {
        "name": "Baron Samedit (sudo heap overflow)",
        "dockerfile": """
FROM ubuntu:focal
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -y sudo=1.8.31-1ubuntu1 gcc && rm -rf /var/lib/apt/lists/*
RUN useradd -m testuser
COPY heapgrid_v2.c /tmp/
RUN gcc -shared -fPIC -O2 -o /tmp/heapgrid_v2.so /tmp/heapgrid_v2.c -ldl -pthread
""",
        # The 1000-byte 'A' payload is produced with the bash printf builtin:
        # the image above installs only sudo and gcc, so an interpreter such
        # as python3 cannot be relied on to exist inside the container.
        "exploit_cmd": [
            "bash", "-c",
            "HEAPGRID_OUT=/dumps/exploit.jsonl LD_PRELOAD=/tmp/heapgrid_v2.so "
            "sudoedit -s '\\' $(printf 'A%.0s' {1..1000}) 2>/dev/null; true"
        ],
        "benign_cmd": [
            "bash", "-c",
            "HEAPGRID_OUT=/dumps/benign.jsonl LD_PRELOAD=/tmp/heapgrid_v2.so "
            "sudo --help 2>/dev/null; true"
        ],
        "user": "testuser",
    },
    "CVE-2021-4034": {
        "name": "PwnKit (pkexec heap/stack)",
        "dockerfile": """
FROM ubuntu:focal
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -y policykit-1=0.105-26ubuntu1 gcc && rm -rf /var/lib/apt/lists/*
RUN useradd -m testuser
COPY heapgrid_v2.c /tmp/
RUN gcc -shared -fPIC -O2 -o /tmp/heapgrid_v2.so /tmp/heapgrid_v2.c -ldl -pthread
""",
        # Two pkexec invocations write to the same exploit.jsonl.
        # NOTE(review): `env -i` starts its child with an empty environment,
        # so the HEAPGRID_OUT/LD_PRELOAD assignments placed before `env`
        # presumably never reach the second pkexec, and PATH is replaced by
        # "GCONV_PATH=." before pkexec is looked up. Verify that the second
        # command runs pkexec at all and contributes dump data.
        "exploit_cmd": [
            "bash", "-c",
            "HEAPGRID_OUT=/dumps/exploit.jsonl LD_PRELOAD=/tmp/heapgrid_v2.so "
            "pkexec --help 2>/dev/null; "
            "HEAPGRID_OUT=/dumps/exploit.jsonl LD_PRELOAD=/tmp/heapgrid_v2.so "
            "env -i 'SHELL=bash' 'PATH=GCONV_PATH=.' pkexec 2>/dev/null; true"
        ],
        "benign_cmd": [
            "bash", "-c",
            "HEAPGRID_OUT=/dumps/benign.jsonl LD_PRELOAD=/tmp/heapgrid_v2.so "
            "pkexec --help 2>/dev/null; true"
        ],
        "user": "testuser",
    },
    # No "user" key here: the commands run as the image's default user.
    # NOTE(review): the `RUN cat > ... << 'EOF'` heredoc below needs a
    # Dockerfile frontend with heredoc support (BuildKit) - confirm the
    # builder in use provides it.
    "CVE-2023-6246": {
        "name": "glibc syslog heap overflow",
        "dockerfile": """
FROM ubuntu:lunar
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -y gcc && rm -rf /var/lib/apt/lists/*
COPY heapgrid_v2.c /tmp/
RUN gcc -shared -fPIC -O2 -o /tmp/heapgrid_v2.so /tmp/heapgrid_v2.c -ldl -pthread
# Build a small program that calls syslog with a long format string
RUN cat > /tmp/syslog_test.c << 'EOF'
#include <syslog.h>
#include <string.h>
#include <stdlib.h>
int main(int argc, char **argv) {
int exploit = argc > 1 && strcmp(argv[1], "exploit") == 0;
openlog("test", LOG_PID, LOG_USER);
if (exploit) {
char buf[4096];
memset(buf, 'A', sizeof(buf)-1);
buf[sizeof(buf)-1] = 0;
syslog(LOG_INFO, "%s", buf);
} else {
syslog(LOG_INFO, "normal log message");
}
closelog();
return 0;
}
EOF
RUN gcc -o /tmp/syslog_test /tmp/syslog_test.c -O0
""",
        "exploit_cmd": [
            "bash", "-c",
            "HEAPGRID_OUT=/dumps/exploit.jsonl LD_PRELOAD=/tmp/heapgrid_v2.so "
            "/tmp/syslog_test exploit 2>/dev/null; true"
        ],
        "benign_cmd": [
            "bash", "-c",
            "HEAPGRID_OUT=/dumps/benign.jsonl LD_PRELOAD=/tmp/heapgrid_v2.so "
            "/tmp/syslog_test benign 2>/dev/null; true"
        ],
    },
}
def build_image(cve_id, info, *, timeout=120):
    """Build the Docker image for one CVE entry.

    A temporary build context is created containing the entry's Dockerfile
    and a copy of the harness source (HARNESS_SRC) named ``heapgrid_v2.c``,
    then ``docker build`` is run on it.

    Args:
        cve_id: CVE identifier; lower-cased and stripped of dashes to form
            the image tag ``heaptrm-<id>``.
        info: CVE entry dict; only ``info["dockerfile"]`` is read here.
        timeout: Seconds to wait for ``docker build`` before giving up.

    Returns:
        The image tag on success, or ``None`` if the build failed, timed
        out, or could not be started. A one-line reason is printed in every
        failure case.
    """
    import shutil

    tag = f"heaptrm-{cve_id.lower().replace('-', '')}"
    try:
        with tempfile.TemporaryDirectory() as tmpdir:
            context = Path(tmpdir)
            (context / "Dockerfile").write_text(info["dockerfile"])
            # The Dockerfiles COPY heapgrid_v2.c, so it must sit in the context.
            shutil.copy(HARNESS_SRC, context / "heapgrid_v2.c")
            result = subprocess.run(
                ["docker", "build", "--network=host", "-t", tag, tmpdir],
                capture_output=True, text=True, timeout=timeout,
            )
    except subprocess.TimeoutExpired:
        # Report a slow build like any other build failure instead of
        # aborting the whole benchmark with a traceback.
        print(f" BUILD FAILED: docker build exceeded {timeout}s")
        return None
    except OSError as exc:
        # Covers a missing docker executable or an unreadable harness source.
        print(f" BUILD FAILED: {exc}")
        return None

    if result.returncode != 0:
        # Only the tail of stderr is shown to keep the report compact.
        print(f" BUILD FAILED: {result.stderr[-200:]}")
        return None
    return tag
def _summarize_dump(dump_file):
    """Summarise one JSONL dump: state count, corruption total, corruption types."""
    state_count = 0
    total_corruptions = 0
    corruption_types = set()
    # errors="replace": a crashing target may leave non-UTF-8 bytes behind.
    with open(dump_file, encoding="utf-8", errors="replace") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line:
                continue
            try:
                state = json.loads(line)
            except json.JSONDecodeError:
                # A process killed mid-write can leave a truncated record;
                # skip it rather than lose the whole dump.
                continue
            if not isinstance(state, dict):
                continue
            state_count += 1
            cc = state.get("corruption_count", 0)
            if cc > 0:
                total_corruptions += cc
                for c in state.get("corruptions", []):
                    corruption_types.add(c.get("type", "unknown"))
    return {
        "states": state_count,
        "corruptions": total_corruptions,
        # Sorted so repeated runs report the types in a stable order.
        "types": sorted(corruption_types, key=str),
    }


def run_test(tag, cmd, user=None):
    """Run a command in a container of image ``tag`` and summarise its dumps.

    A fresh temporary directory is bind-mounted at ``/dumps`` in a
    ``docker run --rm --network=none`` container. After the command ends,
    every ``*.jsonl`` file found in that directory is parsed line by line.

    Args:
        tag: Docker image tag to run.
        cmd: Command argv (list) appended after the image tag.
        user: Optional value for ``docker run -u``.

    Returns:
        Dict mapping each dump file's stem (e.g. ``"exploit"``) to
        ``{"states": int, "corruptions": int, "types": list}``. Empty if no
        dump file was produced.
    """
    import shutil

    dump_dir = tempfile.mkdtemp()
    try:
        # mkdtemp() makes the directory accessible to the creating user only;
        # open it up so a container user chosen with -u can create dumps.
        os.chmod(dump_dir, 0o777)

        docker_cmd = ["docker", "run", "--rm", "--network=none",
                      "-v", f"{dump_dir}:/dumps"]
        if user:
            docker_cmd.extend(["-u", user])
        docker_cmd.extend([tag, *cmd])

        try:
            subprocess.run(docker_cmd, capture_output=True, timeout=30)
        except subprocess.TimeoutExpired:
            # A hung target may still have written useful records.
            print(" WARNING: container timed out after 30s; using partial dumps")

        return {
            dump_file.stem: _summarize_dump(dump_file)
            for dump_file in Path(dump_dir).glob("*.jsonl")
        }
    finally:
        # Always remove the dump directory, including on errors.
        shutil.rmtree(dump_dir)
def _verdict(exploit_detected, benign_clean):
    """Map the two detection outcomes onto the report's verdict label."""
    if exploit_detected:
        return "PERFECT" if benign_clean else "DETECTED (with FP)"
    return "MISSED" if benign_clean else "FAILED"


def main():
    """Run the benchmark for every entry in CVES and print a report.

    For each CVE the image is built, the exploit and benign commands are
    run, and a verdict is derived from the corruption counts:

    * PERFECT            - exploit flagged, benign run clean
    * DETECTED (with FP) - exploit flagged, benign run also flagged
    * MISSED             - exploit not flagged, benign run clean
    * FAILED             - exploit not flagged, benign run flagged

    Entries whose image cannot be built are reported as BUILD_FAILED and
    count towards the total but not towards the detected figure.
    """
    print("=" * 70)
    print("HeapTRM CVE Benchmark")
    print("=" * 70)

    results = {}
    for cve_id, info in CVES.items():
        print(f"\n--- {cve_id}: {info['name']} ---")
        tag = build_image(cve_id, info)
        if not tag:
            results[cve_id] = {"status": "BUILD_FAILED"}
            continue

        print(" Running exploit...")
        exploit_result = run_test(tag, info["exploit_cmd"], info.get("user"))
        print(" Running benign...")
        benign_result = run_test(tag, info["benign_cmd"], info.get("user"))

        # A missing dump file is represented as an all-zero summary.
        exploit_data = exploit_result.get(
            "exploit", {"states": 0, "corruptions": 0, "types": []})
        benign_data = benign_result.get(
            "benign", {"states": 0, "corruptions": 0, "types": []})

        # Zero recorded states means nothing was observed at all, which
        # would otherwise be indistinguishable from a genuinely clean heap.
        for label, data in (("exploit", exploit_data), ("benign", benign_data)):
            if data["states"] == 0:
                print(f" WARNING: no heap states recorded for the {label} run; "
                      "its result reflects missing data, not a clean heap")

        verdict = _verdict(
            exploit_detected=exploit_data["corruptions"] > 0,
            benign_clean=benign_data["corruptions"] == 0,
        )
        results[cve_id] = {
            "name": info["name"],
            "verdict": verdict,
            "exploit": exploit_data,
            "benign": benign_data,
        }
        print(f" Exploit: {exploit_data['states']} states, "
              f"{exploit_data['corruptions']} corruptions {exploit_data['types']}")
        print(f" Benign: {benign_data['states']} states, "
              f"{benign_data['corruptions']} corruptions {benign_data['types']}")
        print(f" Verdict: {verdict}")

    # Summary
    print("\n" + "=" * 70)
    print("BENCHMARK SUMMARY")
    print("=" * 70)
    for cve_id, r in results.items():
        if "verdict" in r:
            print(f" {cve_id}: {r.get('name', '?'):45s} [{r['verdict']}]")
        else:
            print(f" {cve_id}: {r.get('status', 'UNKNOWN')}")

    detected = sum(
        1 for r in results.values()
        if r.get("verdict") in ("PERFECT", "DETECTED (with FP)")
    )
    total = len(results)
    print(f"\n Detected: {detected}/{total}")
# Run the benchmark only when this file is executed as a script.
if __name__ == "__main__":
    main()