# File: open-envs/tests/test_vuln_fixtures.py
# Page metadata captured with this file (not code): user.email, reward, b2cd136
import random
import unittest
from curriculum.adaptive_sampler import AdaptiveSampler
from data.code_scenarios import CORPUS
from data.fixtures import FIXTURES, NPM_FIXTURES, PYPI_FIXTURES, VULN_CLASSES, get_mock_vulns
from examples.catalog import CURATED_EXAMPLES, sample_curated_example
from env.models import GroundTruthEvidence
from env.verification import version_at_least, weighted_ranking_score
from graders.core import grade_task_1, grade_task_2, grade_task_3
class TestFixtureIntegrity(unittest.TestCase):
    """Checks on FIXTURES, PYPI_FIXTURES, NPM_FIXTURES, VULN_CLASSES and get_mock_vulns."""

    def test_fixture_catalog_is_populated(self):
        # Each of the three fixture collections must reach its minimum size.
        total_count = len(FIXTURES)
        self.assertGreaterEqual(total_count, 40)
        pypi_count = len(PYPI_FIXTURES)
        self.assertGreaterEqual(pypi_count, 18)
        npm_count = len(NPM_FIXTURES)
        self.assertGreaterEqual(npm_count, 18)

    def test_fixture_ids_are_unique(self):
        # De-duplicating the "cve_id" values must not shrink the collection.
        identifiers = [entry["cve_id"] for entry in FIXTURES]
        distinct_identifiers = set(identifiers)
        self.assertEqual(len(identifiers), len(distinct_identifiers))

    def test_fixture_class_coverage(self):
        # At least ten classes overall, and these three must be among them.
        self.assertGreaterEqual(len(VULN_CLASSES), 10)
        required_classes = {"rce", "ssrf", "redos"}
        known_classes = set(VULN_CLASSES)
        self.assertTrue(required_classes <= known_classes)

    def test_mock_lookup_matches_fixture(self):
        # The lookup must return a non-empty result whose first entry carries
        # the expected CVE id.
        matches = get_mock_vulns("jinja2", "3.1.3", "PyPI")
        self.assertTrue(matches)
        first_match = matches[0]
        self.assertEqual(first_match["cve_id"], "CVE-2024-56326")
class TestCuratedExamples(unittest.TestCase):
    """Checks on CURATED_EXAMPLES, sample_curated_example and the derived CORPUS."""

    def test_examples_meet_requested_volume(self):
        # The curated catalog must hold at least fifty entries.
        example_count = len(CURATED_EXAMPLES)
        self.assertGreaterEqual(example_count, 50)

    def test_examples_have_severity_mix(self):
        # All three severity labels must appear somewhere in the catalog.
        observed = {example.severity for example in CURATED_EXAMPLES}
        for label in ("CRITICAL", "HIGH", "MEDIUM"):
            self.assertIn(label, observed)

    def test_examples_have_high_risk_pool(self):
        # At least ten entries must have a cvss_score of 8.8 or above.
        high_risk_count = sum(
            1 for example in CURATED_EXAMPLES if example.cvss_score >= 8.8
        )
        self.assertGreaterEqual(high_risk_count, 10)

    def test_high_risk_sampling_prefers_critical_examples(self):
        # A fixed seed keeps the draw reproducible.
        rng = random.Random(7)
        picked = sample_curated_example(rng, high_risk_only=True)
        self.assertGreaterEqual(picked.cvss_score, 8.8)
        self.assertTrue(picked.vuln_lines)

    def test_code_scenarios_are_built_from_curated_examples(self):
        # Sizes must agree before the first entries are compared field by field.
        self.assertEqual(len(CORPUS), len(CURATED_EXAMPLES))
        first_scenario = CORPUS[0]
        first_example = CURATED_EXAMPLES[0]
        self.assertEqual(first_scenario.present_vulns[0], first_example.cve_id)
        self.assertTrue(first_scenario.vuln_lines)
        self.assertEqual(first_scenario.path, first_example.path)
class TestGraders(unittest.TestCase):
    """Checks on the grade_task_* functions, version_at_least and weighted_ranking_score."""

    def test_grade_task_1_perfect_score(self):
        # Identical id lists with a zero third argument must score 1.0.
        self.assertAlmostEqual(
            grade_task_1(["CVE-1", "CVE-2"], ["CVE-1", "CVE-2"], 0, 2),
            1.0,
        )

    def test_grade_task_1_penalizes_false_positives(self):
        # Only the third argument differs between the two calls (0 vs 2);
        # the larger value must produce the strictly lower score.
        baseline = grade_task_1(["CVE-1"], ["CVE-1"], 0, 1)
        penalized = grade_task_1(["CVE-1"], ["CVE-1"], 2, 1)
        self.assertGreater(baseline, penalized)

    def test_grade_task_2_bounds(self):
        # All-zero input scores exactly 0.0.
        all_zero = grade_task_2(0, 0, 0)
        self.assertEqual(all_zero, 0.0)
        # (4, 4, 0) scores 1.0 ...
        full_marks = grade_task_2(4, 4, 0)
        self.assertAlmostEqual(full_marks, 1.0)
        # ... and raising the third argument to 3 must drop below 1.0.
        reduced = grade_task_2(4, 4, 3)
        self.assertLess(reduced, 1.0)

    def test_grade_task_3_requires_critical_resolution(self):
        # The two calls differ only in the third argument: 1 must yield 0.0,
        # 0 must yield a score above 0.9.
        blocked = grade_task_3(5, 5, 1, 0, 3, 3)
        self.assertEqual(blocked, 0.0)
        cleared = grade_task_3(5, 5, 0, 0, 3, 3)
        self.assertGreater(cleared, 0.9)

    def test_version_floor_comparison_handles_patch_levels(self):
        # Equal and higher patch levels satisfy the floor; a lower minor does not.
        floor = "4.44.0"
        self.assertTrue(version_at_least("4.44.0", floor))
        self.assertTrue(version_at_least("4.44.1", floor))
        self.assertFalse(version_at_least("4.43.9", floor))

    def test_weighted_ranking_rewards_high_risk_ordering(self):
        def risk_key(example):
            return (example.cvss_score, example.severity)

        # Order the first four curated examples by (cvss_score, severity)
        # descending and keep the top two.
        candidates = list(CURATED_EXAMPLES[:4])
        candidates.sort(key=risk_key, reverse=True)
        ranked = candidates[:2]

        # Build one evidence record per kept example, keyed by its CVE id.
        evidence = {}
        for example in ranked:
            evidence[example.cve_id] = GroundTruthEvidence(
                cve_id=example.cve_id,
                package=example.package,
                severity=example.severity,
                cvss_score=example.cvss_score,
                fixed_version=example.fixed_version,
                summary=example.summary,
                file_path=example.path,
                language=example.language,
                line_numbers=list(example.vuln_lines),
                code_excerpt="",
                context=example.context,
                incident_source=example.incident_source,
            )

        # The sorted order must score at least as high as the swapped order.
        in_order = [ranked[0].cve_id, ranked[1].cve_id]
        correct = weighted_ranking_score(in_order, evidence)
        reversed_order = [ranked[1].cve_id, ranked[0].cve_id]
        swapped = weighted_ranking_score(reversed_order, evidence)
        self.assertGreaterEqual(correct, swapped)
class TestAdaptiveSampling(unittest.TestCase):
    """Checks on AdaptiveSampler scenario selection and skill updates."""

    def test_sampler_uses_expected_buckets(self):
        # Each task id maps to the inclusive (lower, upper) difficulty range
        # that its sampled scenario must fall in.
        sampler = AdaptiveSampler()
        limits = {1: (0.0, 0.45), 2: (0.25, 0.65), 3: (0.45, 1.0)}
        for task_id, (lower, upper) in limits.items():
            # subTest tags a failure with the offending task id and lets the
            # remaining task ids still be checked instead of aborting the loop.
            with self.subTest(task_id=task_id):
                slot = sampler.sample_scenario(task_id)
                difficulty = CORPUS[slot.idx].difficulty
                self.assertGreaterEqual(difficulty, lower)
                self.assertLessEqual(difficulty, upper)

    def test_sampler_updates_skill_after_reward(self):
        # Reporting a reward of 1.0 must strictly increase the sampler's skill.
        sampler = AdaptiveSampler()
        original = sampler.skill
        # NOTE(review): presumably update_skill relies on a preceding
        # sample_scenario call; confirm against AdaptiveSampler.
        sampler.sample_scenario(1)
        sampler.update_skill(1.0)
        self.assertGreater(sampler.skill, original)
# Run the suite when this module is executed directly rather than imported.
if __name__ == "__main__":
    unittest.main()