"""Vulnerability detection benchmark adapters (BigVul, DiverseVul, PrimeVul, Devign).""" from __future__ import annotations from typing import Any from adapters import DatasetAdapter # Injected at runtime by _set_helpers() _highlight_code = None _code_offset = None _extract_test_classes = None # --------------------------------------------------------------------------- # BigVul adapter (HuggingFace: bstee615/bigvul) # --------------------------------------------------------------------------- class BigVulAdapter(DatasetAdapter): slug = "bigvul" display_name = "BigVul" has_ground_truth = False has_tasks = False def __init__(self, hf_dataset): self._ds = hf_dataset def problem_count(self) -> int: return len(self._ds) def get_problem_summary(self, idx: int) -> dict[str, Any]: row = self._ds[idx] return { "idx": idx, "task_id": row.get("CVE_ID", str(idx)), "entry_point": row.get("CVE_ID", f"bigvul_{idx}"), "num_inputs": 0, "source": row.get("CWE_ID", "unknown"), } def get_problem_detail(self, idx: int) -> dict[str, Any]: row = self._ds[idx] vuln_code = row.get("func_before", "") fixed_code = row.get("func_after", "") lang = row.get("lang", "c") lang_key = {"C": "c", "Java": "java", "PHP": "php"}.get(lang, "c") return { "idx": idx, "task_id": row.get("CVE_ID", str(idx)), "entry_point": row.get("CVE_ID", f"bigvul_{idx}"), "code": fixed_code, "highlighted_code": _highlight_code(fixed_code, language=lang_key), "inputs": [], "outputs": [], "test": None, "tasks": [], "source": row.get("CWE_ID", "unknown"), "has_ground_truth": False, "has_tasks": False, "description": row.get("commit_message", ""), "vulnerable_code": vuln_code, "vulnerable_highlighted_code": _highlight_code(vuln_code, language=lang_key), "patched_code": fixed_code, "patched_highlighted_code": _highlight_code(fixed_code, language=lang_key), "cwe_id": row.get("CWE_ID", ""), "cve_id": row.get("CVE_ID", ""), "project": row.get("project", ""), "language": lang, "is_vulnerable": bool(row.get("vul", 0)), } # 
# ---------------------------------------------------------------------------
# DiverseVul adapter (HuggingFace: claudios/DiverseVul)
# ---------------------------------------------------------------------------


class DiverseVulAdapter(DatasetAdapter):
    """Adapter over the DiverseVul dataset (one labelled function per row).

    Each row is read through ``row.get(...)``, so rows are expected to be
    mapping-like.  The fields consulted are ``func``, ``target``, ``cwe``,
    ``commit_id``, ``project`` and ``message``.  A row counts as vulnerable
    when ``target == 1``.
    """

    slug = "diversevul"
    display_name = "DiverseVul"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        """Wrap *hf_dataset*; it must support ``len()`` and integer indexing."""
        self._ds = hf_dataset

    def problem_count(self) -> int:
        """Return the number of rows in the wrapped dataset."""
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        """Return the short listing entry for row *idx*.

        ``source`` is ``"<label>/<first CWE>"`` where the label is
        ``"Vulnerable"`` or ``"Patched"`` and the CWE falls back to
        ``"unknown"`` when the row lists none.
        """
        row = self._ds[idx]
        # `or []` also covers a key that is present but holds None.
        cwe_list = row.get("cwe") or []
        cwe_label = cwe_list[0] if cwe_list else "unknown"
        label = "Vulnerable" if row.get("target", 0) == 1 else "Patched"
        return {
            "idx": idx,
            # str() before slicing so a non-string commit id cannot raise.
            "task_id": str(row.get("commit_id", idx))[:12],
            "entry_point": row.get("project", f"diversevul_{idx}"),
            "num_inputs": 0,
            "source": f"{label}/{cwe_label}",
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        """Return the full detail record for row *idx*.

        The row holds a single function, so it fills either the
        ``vulnerable_*`` or the ``patched_*`` fields depending on its label;
        the other pair is left as empty strings.
        """
        row = self._ds[idx]
        code = row.get("func", "")
        # dict.get only applies its default to a *missing* key; `or []`
        # additionally guards against an explicit None value.
        cwe_list = list(row.get("cwe") or [])
        is_vuln = row.get("target", 0) == 1

        # Highlight once; the same rendering is reused for whichever of the
        # vulnerable/patched slots applies to this row.
        highlighted = _highlight_code(code, language="c")

        return {
            "idx": idx,
            "task_id": str(row.get("commit_id", idx))[:12],
            "entry_point": row.get("project", f"diversevul_{idx}"),
            "code": code,
            "highlighted_code": highlighted,
            "inputs": [],
            "outputs": [],
            "test": None,
            "tasks": [],
            "source": "Vulnerable" if is_vuln else "Patched",
            "has_ground_truth": False,
            "has_tasks": False,
            "description": row.get("message", ""),
            "vulnerable_code": code if is_vuln else "",
            "vulnerable_highlighted_code": highlighted if is_vuln else "",
            "patched_code": "" if is_vuln else code,
            "patched_highlighted_code": "" if is_vuln else highlighted,
            # Joining an empty list already yields "".
            "cwe_id": ", ".join(cwe_list),
            "project": row.get("project", ""),
            "language": "C/C++",
            "is_vulnerable": is_vuln,
        }


# ---------------------------------------------------------------------------
# PrimeVul adapter (HuggingFace: starsofchance/PrimeVul)
# ---------------------------------------------------------------------------


class PrimeVulAdapter(DatasetAdapter):
    """Adapter over the PrimeVul dataset (one labelled function per row).

    Each row is read through ``row.get(...)``, so rows are expected to be
    mapping-like.  The fields consulted are ``func``, ``target``, ``cwe``,
    ``commit_id``, ``project`` and ``commit_message``.  A row counts as
    vulnerable when ``target == 1``.
    """

    slug = "primevul"
    display_name = "PrimeVul"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        """Wrap *hf_dataset*; it must support ``len()`` and integer indexing."""
        self._ds = hf_dataset

    def problem_count(self) -> int:
        """Return the number of rows in the wrapped dataset."""
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        """Return the short listing entry for row *idx*.

        ``source`` is ``"Vulnerable"`` or ``"Patched"`` according to the
        row's ``target`` label.
        """
        row = self._ds[idx]
        label = "Vulnerable" if row.get("target", 0) == 1 else "Patched"
        return {
            "idx": idx,
            # str() before slicing so a non-string commit id cannot raise.
            "task_id": str(row.get("commit_id", idx))[:12],
            "entry_point": row.get("project", f"primevul_{idx}"),
            "num_inputs": 0,
            "source": label,
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        """Return the full detail record for row *idx*.

        The row holds a single function, so it fills either the
        ``vulnerable_*`` or the ``patched_*`` fields depending on its label;
        the other pair is left as empty strings.
        """
        row = self._ds[idx]
        code = row.get("func", "")
        is_vuln = row.get("target", 0) == 1
        # dict.get only applies its default to a *missing* key; `or []`
        # additionally guards against an explicit None value.
        cwe_list = list(row.get("cwe") or [])

        # Highlight once; the same rendering is reused for whichever of the
        # vulnerable/patched slots applies to this row.
        highlighted = _highlight_code(code, language="c")

        return {
            "idx": idx,
            "task_id": str(row.get("commit_id", idx))[:12],
            "entry_point": row.get("project", f"primevul_{idx}"),
            "code": code,
            "highlighted_code": highlighted,
            "inputs": [],
            "outputs": [],
            "test": None,
            "tasks": [],
            "source": "Vulnerable" if is_vuln else "Patched",
            "has_ground_truth": False,
            "has_tasks": False,
            "description": row.get("commit_message", ""),
            "vulnerable_code": code if is_vuln else "",
            "vulnerable_highlighted_code": highlighted if is_vuln else "",
            "patched_code": "" if is_vuln else code,
            "patched_highlighted_code": "" if is_vuln else highlighted,
            # Joining an empty list already yields "".
            "cwe_id": ", ".join(cwe_list),
            "project": row.get("project", ""),
            "language": "C/C++",
            "is_vulnerable": is_vuln,
        }


# ---------------------------------------------------------------------------
# Devign adapter (HuggingFace: google/code_x_glue_cc_defect_detection)
# ---------------------------------------------------------------------------


class DevignAdapter(DatasetAdapter):
    """Adapter over the Devign defect-detection dataset.

    Each row is read through ``row.get(...)``, so rows are expected to be
    mapping-like.  The fields consulted are ``func``, ``target``,
    ``commit_id``, ``project`` and ``commit_message``.  A row counts as
    vulnerable when ``target == 1``; every other row is labelled "Clean".
    """

    slug = "devign"
    display_name = "Devign"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        """Wrap *hf_dataset*; it must support ``len()`` and integer indexing."""
        self._ds = hf_dataset

    def problem_count(self) -> int:
        """Return the number of rows in the wrapped dataset."""
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        """Return the short listing entry for row *idx*."""
        row = self._ds[idx]
        flagged = row.get("target", 0) == 1
        return {
            "idx": idx,
            # The commit id is stringified before truncation to 12 chars.
            "task_id": str(row.get("commit_id", idx))[:12],
            "entry_point": row.get("project", f"devign_{idx}"),
            "num_inputs": 0,
            "source": "Vulnerable" if flagged else "Clean",
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        """Return the full detail record for row *idx*.

        Unlike the other adapters in this module, no separate
        vulnerable/patched code fields are emitted; only ``code`` and its
        highlighted form.
        """
        row = self._ds[idx]
        code = row.get("func", "")
        is_vuln = row.get("target", 0) == 1
        return {
            "idx": idx,
            "task_id": str(row.get("commit_id", idx))[:12],
            "entry_point": row.get("project", f"devign_{idx}"),
            "code": code,
            "highlighted_code": _highlight_code(code, language="c"),
            "inputs": [],
            "outputs": [],
            "test": None,
            "tasks": [],
            "source": "Vulnerable" if is_vuln else "Clean",
            "has_ground_truth": False,
            "has_tasks": False,
            "description": row.get("commit_message", ""),
            "is_vulnerable": is_vuln,
            "project": row.get("project", ""),
            "language": "C",
        }