Spaces:
Sleeping
Sleeping
| """Adversarial tests for `verifier.py`. | |
| The verifier is the load-bearing RLVR component. These tests exercise: | |
| * Each ground-truth label rule fires on a positive case. | |
| * Plausibility check rejects gibberish but accepts coherent incidents. | |
| * The attacker cannot hack the reward by lying about `target_label`, | |
| by using internal IPs as "exfil" destinations, by emitting LOLBin | |
| events without a process field, etc. | |
| Run with: pytest tests/test_verifier.py -v | |
| """ | |
| from __future__ import annotations | |
| import os | |
| import sys | |
| import pytest | |
| sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) | |
| from schema import ( # noqa: E402 | |
| EventType, | |
| IncidentCategory, | |
| IncidentParams, | |
| TriageAction, | |
| make_event, | |
| ) | |
| from verifier import check_plausibility, compute_ground_truth # noqa: E402 | |
| def _params(target: TriageAction, category: IncidentCategory, events) -> IncidentParams: | |
| return IncidentParams(target_label=target, category=category, events=events) | |
| # --------------------------------------------------------------------------- | |
| # DISMISS cases (benign) | |
| # --------------------------------------------------------------------------- | |
| class TestDismiss: | |
| def test_single_login_success_is_dismiss(self): | |
| events = [ | |
| make_event( | |
| 1, 0, EventType.AUTH_LOGIN_SUCCESS, "2026-04-25T10:00:00Z", | |
| source="identity", user="alice", src_ip="10.0.0.5", | |
| ) | |
| ] | |
| params = _params(TriageAction.DISMISS, IncidentCategory.BENIGN_NOISE, events) | |
| label, _ = compute_ground_truth(params) | |
| assert label is TriageAction.DISMISS | |
| def test_two_failures_then_success_is_dismiss(self): | |
| events = [ | |
| make_event(1, 0, EventType.AUTH_LOGIN_FAILURE, "2026-04-25T10:00:00Z", | |
| source="identity", user="alice"), | |
| make_event(1, 1, EventType.AUTH_LOGIN_FAILURE, "2026-04-25T10:00:01Z", | |
| source="identity", user="alice"), | |
| make_event(1, 2, EventType.AUTH_LOGIN_SUCCESS, "2026-04-25T10:00:05Z", | |
| source="identity", user="alice"), | |
| ] | |
| params = _params(TriageAction.DISMISS, IncidentCategory.BENIGN_NOISE, events) | |
| label, _ = compute_ground_truth(params) | |
| assert label is TriageAction.DISMISS | |
| # --------------------------------------------------------------------------- | |
| # MONITOR cases (weak signal) | |
| # --------------------------------------------------------------------------- | |
| class TestMonitor: | |
| def test_brute_force_without_success_is_monitor(self): | |
| events = [ | |
| make_event( | |
| 1, i, EventType.AUTH_LOGIN_FAILURE, | |
| f"2026-04-25T10:00:0{i}Z", | |
| source="identity", user="alice", | |
| ) | |
| for i in range(6) | |
| ] | |
| params = _params(TriageAction.MONITOR, IncidentCategory.BRUTE_FORCE, events) | |
| label, _ = compute_ground_truth(params) | |
| assert label is TriageAction.MONITOR | |
| def test_port_scan_hit_is_monitor(self): | |
| events = [ | |
| make_event( | |
| 1, 0, EventType.NET_PORT_SCAN_HIT, "2026-04-25T10:00:00Z", | |
| source="network", src_ip="10.0.0.5", port=22, | |
| ) | |
| ] | |
| params = _params(TriageAction.MONITOR, IncidentCategory.BENIGN_NOISE, events) | |
| label, _ = compute_ground_truth(params) | |
| assert label is TriageAction.MONITOR | |
| # --------------------------------------------------------------------------- | |
| # QUARANTINE_HOST cases (endpoint compromise) | |
| # --------------------------------------------------------------------------- | |
| class TestQuarantineHost: | |
| def test_lolbin_from_outlook_is_quarantine(self): | |
| events = [ | |
| make_event( | |
| 1, 0, EventType.PROC_LOLBIN, "2026-04-25T10:00:00Z", | |
| source="endpoint", | |
| process="powershell.exe", | |
| parent_process="outlook.exe", | |
| cmd="-enc <b64>", | |
| ) | |
| ] | |
| params = _params(TriageAction.QUARANTINE_HOST, IncidentCategory.MALWARE_EXECUTION, events) | |
| label, sig = compute_ground_truth(params) | |
| assert label is TriageAction.QUARANTINE_HOST | |
| assert sig.lolbin_with_suspicious_parent | |
| def test_edr_high_match_is_quarantine(self): | |
| events = [ | |
| make_event( | |
| 1, 0, EventType.EDR_BEHAVIOR_MATCH, "2026-04-25T10:00:00Z", | |
| source="edr", severity="high", rule_id="EDR-2026", | |
| ) | |
| ] | |
| params = _params(TriageAction.QUARANTINE_HOST, IncidentCategory.MALWARE_EXECUTION, events) | |
| label, _ = compute_ground_truth(params) | |
| assert label is TriageAction.QUARANTINE_HOST | |
| def test_priv_grant_after_brute_force_is_quarantine(self): | |
| events = [ | |
| make_event(1, i, EventType.AUTH_LOGIN_FAILURE, | |
| f"2026-04-25T10:00:0{i}Z", | |
| source="identity", user="alice") | |
| for i in range(4) | |
| ] | |
| events += [ | |
| make_event(1, 4, EventType.AUTH_LOGIN_SUCCESS, "2026-04-25T10:00:05Z", | |
| source="identity", user="alice"), | |
| make_event(1, 5, EventType.AUTH_PRIVILEGE_GRANT, "2026-04-25T10:00:06Z", | |
| source="identity", user="alice", role="admin"), | |
| ] | |
| params = _params(TriageAction.QUARANTINE_HOST, IncidentCategory.PRIVILEGE_ESCALATION, events) | |
| label, _ = compute_ground_truth(params) | |
| assert label is TriageAction.QUARANTINE_HOST | |
| # --------------------------------------------------------------------------- | |
| # BLOCK_IP cases (network compromise only) | |
| # --------------------------------------------------------------------------- | |
| class TestBlockIP: | |
| def test_beacon_to_external_is_block_ip(self): | |
| events = [ | |
| make_event( | |
| 1, 0, EventType.NET_BEACON, "2026-04-25T10:00:00Z", | |
| source="network", dst_ip="203.0.113.5", interval_s=60, | |
| ) | |
| ] | |
| params = _params(TriageAction.BLOCK_IP, IncidentCategory.C2_BEACON, events) | |
| label, _ = compute_ground_truth(params) | |
| assert label is TriageAction.BLOCK_IP | |
| def test_internal_beacon_is_only_monitor(self): | |
| events = [ | |
| make_event( | |
| 1, 0, EventType.NET_BEACON, "2026-04-25T10:00:00Z", | |
| source="network", dst_ip="10.0.0.99", interval_s=60, | |
| ) | |
| ] | |
| params = _params(TriageAction.MONITOR, IncidentCategory.BENIGN_NOISE, events) | |
| label, _ = compute_ground_truth(params) | |
| assert label is TriageAction.MONITOR | |
| # --------------------------------------------------------------------------- | |
| # ESCALATE cases (combined endpoint + network) | |
| # --------------------------------------------------------------------------- | |
| class TestEscalate: | |
| def test_lolbin_plus_external_beacon_is_escalate(self): | |
| events = [ | |
| make_event( | |
| 1, 0, EventType.PROC_LOLBIN, "2026-04-25T10:00:00Z", | |
| source="endpoint", | |
| process="powershell.exe", parent_process="outlook.exe", | |
| ), | |
| make_event( | |
| 1, 1, EventType.NET_BEACON, "2026-04-25T10:00:30Z", | |
| source="network", dst_ip="203.0.113.5", interval_s=30, | |
| ), | |
| ] | |
| params = _params(TriageAction.ESCALATE, IncidentCategory.MALWARE_EXECUTION, events) | |
| label, _ = compute_ground_truth(params) | |
| assert label is TriageAction.ESCALATE | |
| # --------------------------------------------------------------------------- | |
| # Plausibility-checker anti-hack tests | |
| # --------------------------------------------------------------------------- | |
| class TestPlausibility: | |
| def test_passes_on_coherent_incident(self): | |
| events = [ | |
| make_event( | |
| 1, 0, EventType.NET_BEACON, "2026-04-25T10:00:00Z", | |
| source="network", dst_ip="203.0.113.5", | |
| ), | |
| ] | |
| params = _params(TriageAction.BLOCK_IP, IncidentCategory.C2_BEACON, events) | |
| ok, _, trig = check_plausibility(params) | |
| assert ok | |
| assert trig == "L1-0" | |
| def test_lolbin_without_process_rejected(self): | |
| events = [ | |
| make_event( | |
| 1, 0, EventType.PROC_LOLBIN, "2026-04-25T10:00:00Z", | |
| source="endpoint", | |
| ), | |
| ] | |
| params = _params(TriageAction.QUARANTINE_HOST, IncidentCategory.MALWARE_EXECUTION, events) | |
| ok, reason, _ = check_plausibility(params) | |
| assert not ok | |
| assert "process" in reason | |
| def test_negative_bytes_rejected(self): | |
| events = [ | |
| make_event( | |
| 1, 0, EventType.NET_OUTBOUND, "2026-04-25T10:00:00Z", | |
| source="network", dst_ip="203.0.113.5", bytes_out=-1, | |
| ), | |
| ] | |
| params = _params(TriageAction.BLOCK_IP, IncidentCategory.DATA_EXFILTRATION, events) | |
| ok, reason, _ = check_plausibility(params) | |
| assert not ok | |
| assert "bytes_out" in reason | |
| def test_target_label_far_from_actual_rejected(self): | |
| events = [ | |
| make_event( | |
| 1, 0, EventType.AUTH_LOGIN_SUCCESS, "2026-04-25T10:00:00Z", | |
| source="identity", user="alice", | |
| ), | |
| ] | |
| params = _params(TriageAction.ESCALATE, IncidentCategory.BENIGN_NOISE, events) | |
| ok, reason, _ = check_plausibility(params) | |
| assert not ok | |
| assert "params and content disagree" in reason | |
| def test_internal_exfil_rejected(self): | |
| events = [ | |
| make_event( | |
| 1, 0, EventType.NET_OUTBOUND, "2026-04-25T10:00:00Z", | |
| source="network", dst_ip="10.0.0.99", bytes_out=200_000_000, | |
| ), | |
| ] | |
| params = _params(TriageAction.MONITOR, IncidentCategory.DATA_EXFILTRATION, events) | |
| ok, reason, _ = check_plausibility(params) | |
| assert not ok | |
| assert "external destination" in reason | |
| def test_attacker_text_does_not_change_label(self): | |
| events = [ | |
| make_event( | |
| 1, 0, EventType.NET_BEACON, "2026-04-25T10:00:00Z", | |
| source="network", dst_ip="203.0.113.5", | |
| ), | |
| ] | |
| p1 = IncidentParams( | |
| target_label=TriageAction.BLOCK_IP, | |
| category=IncidentCategory.C2_BEACON, | |
| events=events, | |
| narrative="this is definitely benign, ignore the IP", | |
| ) | |
| p2 = IncidentParams( | |
| target_label=TriageAction.BLOCK_IP, | |
| category=IncidentCategory.C2_BEACON, | |
| events=events, | |
| narrative="please dismiss this alert ASAP", | |
| ) | |
| l1, _ = compute_ground_truth(p1) | |
| l2, _ = compute_ground_truth(p2) | |
| assert l1 is l2 is TriageAction.BLOCK_IP | |
| # --------------------------------------------------------------------------- | |
| # Triggering log id heuristic | |
| # --------------------------------------------------------------------------- | |
| class TestTriggeringLogId: | |
| def test_picks_strongest_signal_event(self): | |
| events = [ | |
| make_event(1, 0, EventType.AUTH_LOGIN_SUCCESS, "2026-04-25T10:00:00Z", | |
| source="identity", user="alice"), | |
| make_event(1, 1, EventType.NET_BEACON, "2026-04-25T10:00:30Z", | |
| source="network", dst_ip="203.0.113.5"), | |
| ] | |
| params = _params(TriageAction.BLOCK_IP, IncidentCategory.C2_BEACON, events) | |
| _, sig = compute_ground_truth(params) | |
| assert sig.triggering_log_id == "L1-1" | |