| | """Vulnerability detection benchmark adapters (BigVul, DiverseVul, PrimeVul, Devign).""" |
| |
|
| | from __future__ import annotations |
| |
|
| | from typing import Any |
| |
|
| | from adapters import DatasetAdapter |
| |
|
| | |
# Module-level hooks, all initialised to None.
# ``_highlight_code`` is called by the adapters in this module as
# ``_highlight_code(code, language=...)``, so it has to be rebound to a
# callable before any ``get_problem_detail`` call; while it is still None that
# call raises TypeError.
# NOTE(review): presumably these are injected from outside this module at
# start-up -- confirm where that happens.
_highlight_code = None
# The two names below are not referenced in the visible part of this module.
# TODO confirm whether they are still needed here.
_code_offset = None
_extract_test_classes = None
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
class BigVulAdapter(DatasetAdapter):
    """Adapter that presents BigVul rows through the ``DatasetAdapter`` interface.

    Rows are read with ``.get`` using the columns ``CVE_ID``, ``CWE_ID``,
    ``func_before``, ``func_after``, ``lang``, ``commit_message``,
    ``project`` and ``vul``.
    """

    slug = "bigvul"
    display_name = "BigVul"
    has_ground_truth = False
    has_tasks = False

    # Dataset ``lang`` value -> language key passed to ``_highlight_code``.
    # Any value that is not listed is highlighted as C.
    _LANG_KEYS = {"C": "c", "Java": "java", "PHP": "php"}

    def __init__(self, hf_dataset):
        """Store the dataset.

        Args:
            hf_dataset: Object supporting ``len()`` and integer indexing whose
                rows expose ``.get`` (presumably a Hugging Face dataset --
                confirm with the caller).
        """
        self._ds = hf_dataset

    def problem_count(self) -> int:
        """Return the number of rows in the wrapped dataset."""
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        """Return the short listing entry for row ``idx``.

        The CVE id is used for both ``task_id`` and ``entry_point`` (falling
        back to ``str(idx)`` and ``"bigvul_<idx>"`` when the column is
        missing); ``source`` carries the CWE id or ``"unknown"``.
        """
        row = self._ds[idx]
        return {
            "idx": idx,
            "task_id": row.get("CVE_ID", str(idx)),
            "entry_point": row.get("CVE_ID", f"bigvul_{idx}"),
            "num_inputs": 0,
            "source": row.get("CWE_ID", "unknown"),
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        """Return the full detail record for row ``idx``.

        ``code`` / ``highlighted_code`` show the patched function
        (``func_after``); the vulnerable function (``func_before``) and the
        patched one are additionally exposed under their own keys.

        Raises:
            TypeError: If the module-level ``_highlight_code`` hook is still
                ``None`` when this method is called.
        """
        row = self._ds[idx]
        vuln_code = row.get("func_before", "")
        fixed_code = row.get("func_after", "")
        lang = row.get("lang", "c")
        lang_key = self._LANG_KEYS.get(lang, "c")

        # Render each snippet exactly once. The patched rendering is needed
        # under two keys, so it is reused instead of being highlighted twice.
        fixed_highlighted = _highlight_code(fixed_code, language=lang_key)
        vuln_highlighted = _highlight_code(vuln_code, language=lang_key)

        return {
            "idx": idx,
            "task_id": row.get("CVE_ID", str(idx)),
            "entry_point": row.get("CVE_ID", f"bigvul_{idx}"),
            "code": fixed_code,
            "highlighted_code": fixed_highlighted,
            "inputs": [],
            "outputs": [],
            "test": None,
            "tasks": [],
            "source": row.get("CWE_ID", "unknown"),
            "has_ground_truth": False,
            "has_tasks": False,
            "description": row.get("commit_message", ""),
            "vulnerable_code": vuln_code,
            "vulnerable_highlighted_code": vuln_highlighted,
            "patched_code": fixed_code,
            "patched_highlighted_code": fixed_highlighted,
            "cwe_id": row.get("CWE_ID", ""),
            "cve_id": row.get("CVE_ID", ""),
            "project": row.get("project", ""),
            "language": lang,
            "is_vulnerable": bool(row.get("vul", 0)),
        }
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
class DiverseVulAdapter(DatasetAdapter):
    """Adapter that presents DiverseVul rows through the ``DatasetAdapter`` interface.

    Rows are read with ``.get`` using the columns ``commit_id``, ``project``,
    ``cwe``, ``target``, ``func`` and ``message``. A row counts as vulnerable
    when ``target == 1``; every other row is labelled "Patched".
    """

    slug = "diversevul"
    display_name = "DiverseVul"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        """Store the dataset.

        Args:
            hf_dataset: Object supporting ``len()`` and integer indexing whose
                rows expose ``.get`` (presumably a Hugging Face dataset --
                confirm with the caller).
        """
        self._ds = hf_dataset

    def problem_count(self) -> int:
        """Return the number of rows in the wrapped dataset."""
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        """Return the short listing entry for row ``idx``.

        ``source`` has the form ``"<Vulnerable|Patched>/<first CWE>"``, with
        ``"unknown"`` standing in when the row has no CWE entries.
        """
        row = self._ds[idx]
        # ``or []`` also covers a column that is present but null.
        cwe_list = row.get("cwe") or []
        cwe_label = cwe_list[0] if cwe_list else "unknown"
        label = "Vulnerable" if row.get("target", 0) == 1 else "Patched"
        return {
            "idx": idx,
            # str() before slicing so a non-string commit id cannot raise.
            "task_id": str(row.get("commit_id", idx))[:12],
            "entry_point": row.get("project", f"diversevul_{idx}"),
            "num_inputs": 0,
            "source": f"{label}/{cwe_label}",
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        """Return the full detail record for row ``idx``.

        The row holds a single function, so it is reported under the
        ``vulnerable_*`` keys when ``target == 1`` and under the ``patched_*``
        keys otherwise; the unused pair is set to empty strings.

        Raises:
            TypeError: If the module-level ``_highlight_code`` hook is still
                ``None`` when this method is called.
        """
        row = self._ds[idx]
        code = row.get("func", "")
        # Same null-tolerant read as in get_problem_summary.
        cwe_list = list(row.get("cwe") or [])
        is_vuln = row.get("target", 0) == 1
        label = "Vulnerable" if is_vuln else "Patched"

        # One rendering serves both the generic key and whichever of the
        # vulnerable/patched keys applies to this row.
        highlighted = _highlight_code(code, language="c")

        return {
            "idx": idx,
            "task_id": str(row.get("commit_id", idx))[:12],
            "entry_point": row.get("project", f"diversevul_{idx}"),
            "code": code,
            "highlighted_code": highlighted,
            "inputs": [],
            "outputs": [],
            "test": None,
            "tasks": [],
            "source": label,
            "has_ground_truth": False,
            "has_tasks": False,
            "description": row.get("message", ""),
            "vulnerable_code": code if is_vuln else "",
            "vulnerable_highlighted_code": highlighted if is_vuln else "",
            "patched_code": "" if is_vuln else code,
            "patched_highlighted_code": "" if is_vuln else highlighted,
            # Joining an empty list already yields "".
            "cwe_id": ", ".join(cwe_list),
            "project": row.get("project", ""),
            "language": "C/C++",
            "is_vulnerable": is_vuln,
        }
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
class PrimeVulAdapter(DatasetAdapter):
    """Adapter that presents PrimeVul rows through the ``DatasetAdapter`` interface.

    Rows are read with ``.get`` using the columns ``commit_id``, ``project``,
    ``cwe``, ``target``, ``func`` and ``commit_message``. A row counts as
    vulnerable when ``target == 1``; every other row is labelled "Patched".
    """

    slug = "primevul"
    display_name = "PrimeVul"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        """Store the dataset.

        Args:
            hf_dataset: Object supporting ``len()`` and integer indexing whose
                rows expose ``.get`` (presumably a Hugging Face dataset --
                confirm with the caller).
        """
        self._ds = hf_dataset

    def problem_count(self) -> int:
        """Return the number of rows in the wrapped dataset."""
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        """Return the short listing entry for row ``idx``.

        ``source`` is ``"Vulnerable"`` or ``"Patched"`` depending on
        ``target``.
        """
        row = self._ds[idx]
        label = "Vulnerable" if row.get("target", 0) == 1 else "Patched"
        return {
            "idx": idx,
            # str() before slicing so a non-string commit id cannot raise.
            "task_id": str(row.get("commit_id", idx))[:12],
            "entry_point": row.get("project", f"primevul_{idx}"),
            "num_inputs": 0,
            "source": label,
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        """Return the full detail record for row ``idx``.

        The row holds a single function, so it is reported under the
        ``vulnerable_*`` keys when ``target == 1`` and under the ``patched_*``
        keys otherwise; the unused pair is set to empty strings.

        Raises:
            TypeError: If the module-level ``_highlight_code`` hook is still
                ``None`` when this method is called.
        """
        row = self._ds[idx]
        code = row.get("func", "")
        is_vuln = row.get("target", 0) == 1
        # ``or []`` also covers a column that is present but null.
        cwe_list = list(row.get("cwe") or [])
        label = "Vulnerable" if is_vuln else "Patched"

        # One rendering serves both the generic key and whichever of the
        # vulnerable/patched keys applies to this row.
        highlighted = _highlight_code(code, language="c")

        return {
            "idx": idx,
            "task_id": str(row.get("commit_id", idx))[:12],
            "entry_point": row.get("project", f"primevul_{idx}"),
            "code": code,
            "highlighted_code": highlighted,
            "inputs": [],
            "outputs": [],
            "test": None,
            "tasks": [],
            "source": label,
            "has_ground_truth": False,
            "has_tasks": False,
            "description": row.get("commit_message", ""),
            "vulnerable_code": code if is_vuln else "",
            "vulnerable_highlighted_code": highlighted if is_vuln else "",
            "patched_code": "" if is_vuln else code,
            "patched_highlighted_code": "" if is_vuln else highlighted,
            # Joining an empty list already yields "".
            "cwe_id": ", ".join(cwe_list),
            "project": row.get("project", ""),
            "language": "C/C++",
            "is_vulnerable": is_vuln,
        }
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
class DevignAdapter(DatasetAdapter):
    """Adapter that presents Devign rows through the ``DatasetAdapter`` interface.

    Rows are read with ``.get`` using the columns ``commit_id``, ``project``,
    ``target``, ``func`` and ``commit_message``. A row counts as vulnerable
    when ``target == 1``; every other row is labelled "Clean".
    """

    slug = "devign"
    display_name = "Devign"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        """Store the dataset.

        Args:
            hf_dataset: Object supporting ``len()`` and integer indexing whose
                rows expose ``.get`` (presumably a Hugging Face dataset --
                confirm with the caller).
        """
        self._ds = hf_dataset

    def problem_count(self) -> int:
        """Return the number of rows in the wrapped dataset."""
        total = len(self._ds)
        return total

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        """Return the short listing entry for row ``idx``.

        ``task_id`` is the first 12 characters of the stringified commit id
        (or of ``idx`` when the column is missing); ``source`` is
        ``"Vulnerable"`` or ``"Clean"``.
        """
        sample = self._ds[idx]
        if sample.get("target", 0) == 1:
            verdict = "Vulnerable"
        else:
            verdict = "Clean"

        short_commit = str(sample.get("commit_id", idx))[:12]
        project_name = sample.get("project", f"devign_{idx}")

        summary: dict[str, Any] = {}
        summary["idx"] = idx
        summary["task_id"] = short_commit
        summary["entry_point"] = project_name
        summary["num_inputs"] = 0
        summary["source"] = verdict
        return summary

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        """Return the full detail record for row ``idx``.

        The function source (``func``) is returned both raw and passed
        through ``_highlight_code`` with ``language="c"``.

        Raises:
            TypeError: If the module-level ``_highlight_code`` hook is still
                ``None`` when this method is called.
        """
        sample = self._ds[idx]
        source_text = sample.get("func", "")
        flagged = sample.get("target", 0) == 1

        if flagged:
            verdict = "Vulnerable"
        else:
            verdict = "Clean"

        # Keys are inserted in the same order as the listing/detail views
        # have always received them.
        detail: dict[str, Any] = {}
        detail["idx"] = idx
        detail["task_id"] = str(sample.get("commit_id", idx))[:12]
        detail["entry_point"] = sample.get("project", f"devign_{idx}")
        detail["code"] = source_text
        detail["highlighted_code"] = _highlight_code(source_text, language="c")
        detail["inputs"] = []
        detail["outputs"] = []
        detail["test"] = None
        detail["tasks"] = []
        detail["source"] = verdict
        detail["has_ground_truth"] = False
        detail["has_tasks"] = False
        detail["description"] = sample.get("commit_message", "")
        detail["is_vulnerable"] = flagged
        detail["project"] = sample.get("project", "")
        detail["language"] = "C"
        return detail
| |
|