| """Tests for Task 8 — 10-dimensional Grader.""" |
|
|
| import os |
| import sys |
|
|
# Resolve the parent of the directory that contains this file and make sure it
# is the first entry on sys.path, unless it is already listed somewhere in it.
_PROJECT_ROOT = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir)
)
if _PROJECT_ROOT not in sys.path:
    sys.path.insert(0, _PROJECT_ROOT)
|
|
| from server.graders import grade_episode, grade_easy |
| from server.threat_graph import ( |
| ThreatGraph, |
| HostNode, |
| ProcessNode, |
| IOCNode, |
| VulnerabilityNode, |
| AlertNode, |
| ) |
| from models import SOCState |
|
|
|
|
def _state(**overrides):
    """Build a ``SOCState`` for a test and apply attribute overrides.

    The state is created as ``SOCState(episode_id="e", step_count=0)``; every
    keyword argument is then assigned onto it with ``setattr``, in the order
    the keywords were passed.

    Returns:
        The configured ``SOCState`` instance.
    """
    state = SOCState(episode_id="e", step_count=0)
    for attr_name in overrides:
        setattr(state, attr_name, overrides[attr_name])
    return state
|
|
|
|
def _task_def_simple():
    """Return a fresh, minimal task definition used by the grader tests.

    The definition has one process to kill (``evil.exe`` on ``WS-001`` for
    threat ``T1``), one IOC to block (``1.2.3.4``), one host requiring
    forensics (``WS-001``), no hosts listed under ``must_not_isolate``, and a
    single-entry attack chain for ``T1``.
    """
    kill_target = {"hostname": "WS-001", "process": "evil.exe", "threat_id": "T1"}
    containment = {}
    containment["must_kill"] = [kill_target]
    containment["must_block_iocs"] = ["1.2.3.4"]
    containment["must_forensics"] = ["WS-001"]
    containment["must_not_isolate"] = []
    task_def = {}
    task_def["containment_requirements"] = containment
    task_def["attack_chain"] = [{"threat_id": "T1"}]
    return task_def
|
|
|
|
def _empty_graph():
    """Return a newly constructed ``ThreatGraph`` with nothing added to it."""
    graph = ThreatGraph()
    return graph
|
|
|
|
def test_returns_correct_keys():
    """The grader result contains each of the five expected top-level keys."""
    result = grade_episode([], None, _empty_graph(), _task_def_simple(), _state())
    expected_keys = (
        "final_score",
        "breakdown",
        "penalties",
        "bonuses",
        "reward_functions",
    )
    assert all(key in result for key in expected_keys)
|
|
|
|
def test_breakdown_has_10_keys():
    """The ``breakdown`` entry of the grader result has exactly ten items."""
    result = grade_episode([], None, _empty_graph(), _task_def_simple(), _state())
    breakdown = result["breakdown"]
    assert len(breakdown) == 10
|
|
|
|
def test_reward_functions_has_10_keys():
    """The ``reward_functions`` entry of the grader result has exactly ten items."""
    result = grade_episode([], None, _empty_graph(), _task_def_simple(), _state())
    reward_functions = result["reward_functions"]
    assert len(reward_functions) == 10
|
|
|
|
def test_all_rubric_met_scores_high():
    """An episode that satisfies the simple task definition scores at least 0.7.

    The graph holds a contained host, a blocked and enriched IOC, a killed
    process and an actively exploited vulnerability; the state and plan
    mirror the same containment work.
    """
    # Graph nodes are added in the order host -> IOC -> process -> vulnerability.
    graph = ThreatGraph()
    host = HostNode(
        hostname="WS-001",
        subnet="corporate",
        business_criticality="medium",
        status="contained",
    )
    graph.add_host(host)
    ioc = IOCNode(
        ioc_value="1.2.3.4",
        ioc_type="ip",
        confidence=0.9,
        enriched=True,
        blocked=True,
    )
    graph.add_ioc(ioc)
    process = ProcessNode(
        process_id="WS-001:1",
        hostname="WS-001",
        process_name="evil.exe",
        killed=True,
    )
    graph.add_process(process)
    vulnerability = VulnerabilityNode(
        cve_id="CVE-1",
        hostname="WS-001",
        cvss_score=9.0,
        exploitability="active",
        patch_available=True,
        exploited_by_threat="T1",
    )
    graph.add_vulnerability(vulnerability)

    state = _state(
        killed_processes=[{"hostname": "WS-001", "process": "evil.exe"}],
        blocked_iocs=["1.2.3.4"],
        scanned_hosts=["WS-001"],
        enriched_iocs=["1.2.3.4"],
        correlated_alert_pairs=[("A1", "A2")],
        triggered_playbooks=["ransomware_containment"],
    )

    action_log = [
        {"action_type": "correlate_alerts", "target": "A"},
        {"action_type": "kill_process", "target": "WS-001"},
    ]

    plan_entry = {
        "threat_id": "T1",
        "actions_taken": ["kill"],
        "root_cause": "CVE-1",
        "confidence": 0.9,
    }
    plan = {"entries": [plan_entry], "primary_threat_id": "T1"}

    result = grade_episode(action_log, plan, graph, _task_def_simple(), state)
    assert result["final_score"] >= 0.7
|
|
|
|
def test_no_actions_scores_low():
    """With no actions, no plan and an empty graph the final score is at most 0.3."""
    result = grade_episode([], None, _empty_graph(), _task_def_simple(), _state())
    final_score = result["final_score"]
    assert final_score <= 0.3
|
|
|
|
def test_blind_blocking_penalty():
    """A state with a blocked IOC and no enriched IOCs yields a ``blind_blocking`` penalty."""
    state = _state(blocked_iocs=["1.2.3.4"], enriched_iocs=[])
    result = grade_episode([], None, _empty_graph(), _task_def_simple(), state)
    penalty_types = []
    for penalty in result["penalties"]:
        penalty_types.append(penalty["type"])
    assert "blind_blocking" in penalty_types
|
|
|
|
def test_business_impact_penalises_over_isolation():
    """With 3 of 10 hosts isolated, the ``business_impact`` score is below 0.5."""
    graph = ThreatGraph()
    # Hosts H0..H2 are isolated; H3..H9 stay healthy.
    statuses = ["isolated"] * 3 + ["healthy"] * 7
    for index, host_status in enumerate(statuses):
        host = HostNode(
            hostname=f"H{index}",
            subnet="corporate",
            business_criticality="medium",
            status=host_status,
        )
        graph.add_host(host)
    result = grade_episode([], None, graph, _task_def_simple(), _state())
    assert result["breakdown"]["business_impact"] < 0.5
|
|
|
|
def test_step_efficiency_bonus_for_playbook():
    """With one triggered playbook and no actions, ``step_efficiency`` exceeds 0.5."""
    playbook_state = _state(triggered_playbooks=["ransomware_containment"])
    result = grade_episode(
        [], None, _empty_graph(), _task_def_simple(), playbook_state
    )
    step_efficiency = result["breakdown"]["step_efficiency"]
    assert step_efficiency > 0.5
|
|
|
|
def test_plan_coverage_zero_without_plan():
    """Passing ``None`` as the plan gives a ``plan_coverage`` of exactly 0.0."""
    result = grade_episode([], None, _empty_graph(), _task_def_simple(), _state())
    plan_coverage = result["breakdown"]["plan_coverage"]
    assert plan_coverage == 0.0
|
|
|
|
def test_final_score_clamped_0_to_1():
    """The final score for an empty episode lies within the closed range [0.0, 1.0]."""
    result = grade_episode([], None, _empty_graph(), _task_def_simple(), _state())
    final_score = result["final_score"]
    assert 0.0 <= final_score <= 1.0
|
|
|
|
def test_wrappers_still_return_float():
    """``grade_easy`` returns a ``float`` when given the same arguments as ``grade_episode``."""
    score = grade_easy([], None, _empty_graph(), _task_def_simple(), _state())
    assert isinstance(score, float)
|
|