Spaces:
Sleeping
Sleeping
| """Tests for Task 8 — 10-dimensional Grader.""" | |
| import os | |
| import sys | |
| _PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)) | |
| if _PROJECT_ROOT not in sys.path: | |
| sys.path.insert(0, _PROJECT_ROOT) | |
| from server.graders import grade_episode, grade_easy | |
| from server.threat_graph import ( | |
| ThreatGraph, | |
| HostNode, | |
| ProcessNode, | |
| IOCNode, | |
| VulnerabilityNode, | |
| AlertNode, | |
| ) | |
| from models import SOCState | |
| def _state(**overrides): | |
| s = SOCState(episode_id="e", step_count=0) | |
| for k, v in overrides.items(): | |
| setattr(s, k, v) | |
| return s | |
| def _task_def_simple(): | |
| return { | |
| "containment_requirements": { | |
| "must_kill": [{"hostname": "WS-001", "process": "evil.exe", "threat_id": "T1"}], | |
| "must_block_iocs": ["1.2.3.4"], | |
| "must_forensics": ["WS-001"], | |
| "must_not_isolate": [], | |
| }, | |
| "attack_chain": [{"threat_id": "T1"}], | |
| } | |
| def _empty_graph(): | |
| return ThreatGraph() | |
| def test_returns_correct_keys(): | |
| res = grade_episode([], None, _empty_graph(), _task_def_simple(), _state()) | |
| for k in ("final_score", "breakdown", "penalties", "bonuses", "reward_functions"): | |
| assert k in res | |
| def test_breakdown_has_10_keys(): | |
| res = grade_episode([], None, _empty_graph(), _task_def_simple(), _state()) | |
| assert len(res["breakdown"]) == 10 | |
| def test_reward_functions_has_10_keys(): | |
| res = grade_episode([], None, _empty_graph(), _task_def_simple(), _state()) | |
| assert len(res["reward_functions"]) == 10 | |
| def test_all_rubric_met_scores_high(): | |
| g = ThreatGraph() | |
| g.add_host(HostNode(hostname="WS-001", subnet="corporate", | |
| business_criticality="medium", status="contained")) | |
| g.add_ioc(IOCNode(ioc_value="1.2.3.4", ioc_type="ip", confidence=0.9, enriched=True, blocked=True)) | |
| g.add_process(ProcessNode(process_id="WS-001:1", hostname="WS-001", | |
| process_name="evil.exe", killed=True)) | |
| g.add_vulnerability(VulnerabilityNode( | |
| cve_id="CVE-1", hostname="WS-001", cvss_score=9.0, | |
| exploitability="active", patch_available=True, | |
| exploited_by_threat="T1", | |
| )) | |
| state = _state( | |
| killed_processes=[{"hostname": "WS-001", "process": "evil.exe"}], | |
| blocked_iocs=["1.2.3.4"], | |
| scanned_hosts=["WS-001"], | |
| enriched_iocs=["1.2.3.4"], | |
| correlated_alert_pairs=[("A1", "A2")], | |
| triggered_playbooks=["ransomware_containment"], | |
| ) | |
| actions = [ | |
| {"action_type": "correlate_alerts", "target": "A"}, | |
| {"action_type": "kill_process", "target": "WS-001"}, | |
| ] | |
| plan = {"entries": [{"threat_id": "T1", "actions_taken": ["kill"], "root_cause": "CVE-1", "confidence": 0.9}], | |
| "primary_threat_id": "T1"} | |
| res = grade_episode(actions, plan, g, _task_def_simple(), state) | |
| assert res["final_score"] >= 0.7 | |
| def test_no_actions_scores_low(): | |
| res = grade_episode([], None, _empty_graph(), _task_def_simple(), _state()) | |
| assert res["final_score"] <= 0.3 | |
| def test_blind_blocking_penalty(): | |
| state = _state(blocked_iocs=["1.2.3.4"], enriched_iocs=[]) | |
| res = grade_episode([], None, _empty_graph(), _task_def_simple(), state) | |
| pen_types = [p["type"] for p in res["penalties"]] | |
| assert "blind_blocking" in pen_types | |
| def test_business_impact_penalises_over_isolation(): | |
| g = ThreatGraph() | |
| # 10 hosts, 3 isolated -> 30% | |
| for i in range(10): | |
| status = "isolated" if i < 3 else "healthy" | |
| g.add_host(HostNode(hostname=f"H{i}", subnet="corporate", | |
| business_criticality="medium", status=status)) | |
| res = grade_episode([], None, g, _task_def_simple(), _state()) | |
| assert res["breakdown"]["business_impact"] < 0.5 | |
| def test_step_efficiency_bonus_for_playbook(): | |
| state = _state(triggered_playbooks=["ransomware_containment"]) | |
| res = grade_episode([], None, _empty_graph(), _task_def_simple(), state) | |
| assert res["breakdown"]["step_efficiency"] > 0.5 | |
| def test_plan_coverage_zero_without_plan(): | |
| res = grade_episode([], None, _empty_graph(), _task_def_simple(), _state()) | |
| assert res["breakdown"]["plan_coverage"] == 0.0 | |
| def test_final_score_clamped_0_to_1(): | |
| res = grade_episode([], None, _empty_graph(), _task_def_simple(), _state()) | |
| assert 0.0 <= res["final_score"] <= 1.0 | |
| def test_wrappers_still_return_float(): | |
| val = grade_easy([], None, _empty_graph(), _task_def_simple(), _state()) | |
| assert isinstance(val, float) | |