open-envs / data /code_scenarios.py
user.email
reward
b2cd136
"""
Code scenario corpus backed by curated real-world dependency incidents.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
import random
from examples.catalog import CURATED_EXAMPLES
@dataclass(frozen=True)
class CodeScenario:
idx: int
path: str
code: str
language: str
ecosystem: str
package: str
severity: str
cvss_score: float
fixed_version: str
summary: str
present_vulns: Tuple[str, ...]
decoy_imports: Tuple[str, ...]
difficulty: float
vuln_lines: Tuple[int, ...]
fix_hint: str
context: str
incident_source: str
def _fix_hint(package: str, fixed_version: str, summary: str) -> str:
return f"Upgrade {package}>={fixed_version}; {summary}"
def build_corpus(rng: Optional[random.Random] = None) -> List[CodeScenario]:
del rng
scenarios: List[CodeScenario] = []
for example in CURATED_EXAMPLES:
scenarios.append(
CodeScenario(
idx=example.idx,
path=example.path,
code=example.code,
language=example.language,
ecosystem=example.ecosystem,
package=example.package,
severity=example.severity,
cvss_score=float(example.cvss_score),
fixed_version=example.fixed_version,
summary=example.summary,
present_vulns=(example.cve_id,),
decoy_imports=(),
difficulty=example.difficulty,
vuln_lines=example.vuln_lines,
fix_hint=_fix_hint(example.package, example.fixed_version, example.summary),
context=f"{example.title}. {example.context}",
incident_source=example.incident_source,
)
)
return scenarios
def build_composite(
base: List[CodeScenario],
n_files: int = 3,
rng: Optional[random.Random] = None,
) -> Dict[str, Any]:
chooser = rng or random.Random(31415)
shuffled = list(base)
chooser.shuffle(shuffled)
picks: List[CodeScenario] = []
seen_cves = set()
for scenario in shuffled:
cve_id = scenario.present_vulns[0]
if cve_id in seen_cves:
continue
picks.append(scenario)
seen_cves.add(cve_id)
if len(picks) >= min(n_files, len(base)):
break
if len(picks) < min(n_files, len(base)):
for scenario in shuffled:
if scenario in picks:
continue
picks.append(scenario)
if len(picks) >= min(n_files, len(base)):
break
files: Dict[str, str] = {}
all_vulns: List[str] = []
total_diff = 0.0
selected: List[Tuple[str, CodeScenario]] = []
seen_paths = set()
for i, scenario in enumerate(picks):
file_name = scenario.path
if file_name in seen_paths:
stem, dot, suffix = file_name.rpartition(".")
if dot:
file_name = f"{stem}_{i}.{suffix}"
else:
file_name = f"{file_name}_{i}"
seen_paths.add(file_name)
files[file_name] = scenario.code
all_vulns.extend(scenario.present_vulns)
total_diff += scenario.difficulty
selected.append((file_name, scenario))
return {
"files": files,
"vulns": list(dict.fromkeys(all_vulns)),
"difficulty": round(total_diff / max(1, len(picks)), 2),
"n_files": len(picks),
"scenarios": selected,
}
CORPUS = build_corpus()
CORPUS_BY_DIFFICULTY = sorted(CORPUS, key=lambda scenario: scenario.difficulty)