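# opensoc-env / tests / test_verifier.py
# Hosting-page residue kept for provenance: "shivam2k3's picture",
# "OpenSOC v1", "bb6a031" (presumably the uploader's avatar caption, the
# commit message and the short commit hash -- unconfirmed).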
"""Adversarial tests for `verifier.py`.

The verifier is the load-bearing RLVR component. These tests exercise:

* Each ground-truth label rule fires on a positive case.
* The plausibility check rejects gibberish but accepts coherent incidents.
* The attacker cannot hack the reward by lying about `target_label`,
  by using internal IPs as "exfil" destinations, by emitting LOLBin
  events without a process field, etc.

Run with: pytest tests/test_verifier.py -v
"""
from __future__ import annotations
import os
import sys
import pytest
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from schema import ( # noqa: E402
EventType,
IncidentCategory,
IncidentParams,
TriageAction,
make_event,
)
from verifier import check_plausibility, compute_ground_truth # noqa: E402
def _params(target: TriageAction, category: IncidentCategory, events) -> IncidentParams:
    """Assemble an `IncidentParams` for a test case.

    The claimed label, the category and the event list are forwarded
    unchanged to the constructor as the `target_label`, `category` and
    `events` keyword arguments.
    """
    fields = {
        "target_label": target,
        "category": category,
        "events": events,
    }
    return IncidentParams(**fields)
# ---------------------------------------------------------------------------
# DISMISS cases (benign)
# ---------------------------------------------------------------------------
class TestDismiss:
    """Benign incidents for which the tests expect a DISMISS label."""

    def test_single_login_success_is_dismiss(self):
        """One successful login on its own is expected to be DISMISS."""
        login = make_event(
            1, 0, EventType.AUTH_LOGIN_SUCCESS, "2026-04-25T10:00:00Z",
            source="identity", user="alice", src_ip="10.0.0.5",
        )
        incident = _params(
            TriageAction.DISMISS, IncidentCategory.BENIGN_NOISE, [login]
        )

        verdict, _signals = compute_ground_truth(incident)

        assert verdict is TriageAction.DISMISS

    def test_two_failures_then_success_is_dismiss(self):
        """Two failed logins followed by a success are expected to be DISMISS."""
        # (sequence number, event type, timestamp) for each auth attempt.
        timeline = [
            (0, EventType.AUTH_LOGIN_FAILURE, "2026-04-25T10:00:00Z"),
            (1, EventType.AUTH_LOGIN_FAILURE, "2026-04-25T10:00:01Z"),
            (2, EventType.AUTH_LOGIN_SUCCESS, "2026-04-25T10:00:05Z"),
        ]
        attempts = [
            make_event(1, seq, kind, stamp, source="identity", user="alice")
            for seq, kind, stamp in timeline
        ]
        incident = _params(
            TriageAction.DISMISS, IncidentCategory.BENIGN_NOISE, attempts
        )

        verdict, _signals = compute_ground_truth(incident)

        assert verdict is TriageAction.DISMISS
# ---------------------------------------------------------------------------
# MONITOR cases (weak signal)
# ---------------------------------------------------------------------------
class TestMonitor:
    """Weak-signal incidents for which the tests expect a MONITOR label."""

    def test_brute_force_without_success_is_monitor(self):
        """Six login failures with no success are expected to be MONITOR."""
        failures = []
        for i in range(6):
            failures.append(
                make_event(
                    1, i, EventType.AUTH_LOGIN_FAILURE,
                    f"2026-04-25T10:00:0{i}Z",
                    source="identity", user="alice",
                )
            )
        incident = _params(
            TriageAction.MONITOR, IncidentCategory.BRUTE_FORCE, failures
        )

        verdict, _signals = compute_ground_truth(incident)

        assert verdict is TriageAction.MONITOR

    def test_port_scan_hit_is_monitor(self):
        """A single port-scan hit is expected to be MONITOR."""
        scan_hit = make_event(
            1, 0, EventType.NET_PORT_SCAN_HIT, "2026-04-25T10:00:00Z",
            source="network", src_ip="10.0.0.5", port=22,
        )
        incident = _params(
            TriageAction.MONITOR, IncidentCategory.BENIGN_NOISE, [scan_hit]
        )

        verdict, _signals = compute_ground_truth(incident)

        assert verdict is TriageAction.MONITOR
# ---------------------------------------------------------------------------
# QUARANTINE_HOST cases (endpoint compromise)
# ---------------------------------------------------------------------------
class TestQuarantineHost:
    """Endpoint-compromise incidents expected to be QUARANTINE_HOST."""

    def test_lolbin_from_outlook_is_quarantine(self):
        """A LOLBin event with `outlook.exe` as parent expects QUARANTINE_HOST.

        Also checks that the returned signals object reports
        `lolbin_with_suspicious_parent` as truthy.
        """
        lolbin = make_event(
            1, 0, EventType.PROC_LOLBIN, "2026-04-25T10:00:00Z",
            source="endpoint",
            process="powershell.exe",
            parent_process="outlook.exe",
            cmd="-enc <b64>",
        )
        incident = _params(
            TriageAction.QUARANTINE_HOST,
            IncidentCategory.MALWARE_EXECUTION,
            [lolbin],
        )

        verdict, signals = compute_ground_truth(incident)

        assert verdict is TriageAction.QUARANTINE_HOST
        assert signals.lolbin_with_suspicious_parent

    def test_edr_high_match_is_quarantine(self):
        """A high-severity EDR behavior match expects QUARANTINE_HOST."""
        edr_hit = make_event(
            1, 0, EventType.EDR_BEHAVIOR_MATCH, "2026-04-25T10:00:00Z",
            source="edr", severity="high", rule_id="EDR-2026",
        )
        incident = _params(
            TriageAction.QUARANTINE_HOST,
            IncidentCategory.MALWARE_EXECUTION,
            [edr_hit],
        )

        verdict, _signals = compute_ground_truth(incident)

        assert verdict is TriageAction.QUARANTINE_HOST

    def test_priv_grant_after_brute_force_is_quarantine(self):
        """Failures, then a success and a privilege grant expect QUARANTINE_HOST."""
        failures = []
        for i in range(4):
            failures.append(
                make_event(
                    1, i, EventType.AUTH_LOGIN_FAILURE,
                    f"2026-04-25T10:00:0{i}Z",
                    source="identity", user="alice",
                )
            )
        success = make_event(
            1, 4, EventType.AUTH_LOGIN_SUCCESS, "2026-04-25T10:00:05Z",
            source="identity", user="alice",
        )
        grant = make_event(
            1, 5, EventType.AUTH_PRIVILEGE_GRANT, "2026-04-25T10:00:06Z",
            source="identity", user="alice", role="admin",
        )
        incident = _params(
            TriageAction.QUARANTINE_HOST,
            IncidentCategory.PRIVILEGE_ESCALATION,
            failures + [success, grant],
        )

        verdict, _signals = compute_ground_truth(incident)

        assert verdict is TriageAction.QUARANTINE_HOST
# ---------------------------------------------------------------------------
# BLOCK_IP cases (network compromise only)
# ---------------------------------------------------------------------------
class TestBlockIP:
    """Network-only incidents: external beacons versus internal ones."""

    def test_beacon_to_external_is_block_ip(self):
        """A beacon to 203.0.113.5 is expected to be BLOCK_IP."""
        beacon = make_event(
            1, 0, EventType.NET_BEACON, "2026-04-25T10:00:00Z",
            source="network", dst_ip="203.0.113.5", interval_s=60,
        )
        incident = _params(
            TriageAction.BLOCK_IP, IncidentCategory.C2_BEACON, [beacon]
        )

        verdict, _signals = compute_ground_truth(incident)

        assert verdict is TriageAction.BLOCK_IP

    def test_internal_beacon_is_only_monitor(self):
        """A beacon to 10.0.0.99 is expected to be MONITOR, not BLOCK_IP."""
        beacon = make_event(
            1, 0, EventType.NET_BEACON, "2026-04-25T10:00:00Z",
            source="network", dst_ip="10.0.0.99", interval_s=60,
        )
        incident = _params(
            TriageAction.MONITOR, IncidentCategory.BENIGN_NOISE, [beacon]
        )

        verdict, _signals = compute_ground_truth(incident)

        assert verdict is TriageAction.MONITOR
# ---------------------------------------------------------------------------
# ESCALATE cases (combined endpoint + network)
# ---------------------------------------------------------------------------
class TestEscalate:
    """Combined endpoint + network incidents expected to be ESCALATE."""

    def test_lolbin_plus_external_beacon_is_escalate(self):
        """A LOLBin event plus an external beacon is expected to be ESCALATE."""
        lolbin = make_event(
            1, 0, EventType.PROC_LOLBIN, "2026-04-25T10:00:00Z",
            source="endpoint",
            process="powershell.exe", parent_process="outlook.exe",
        )
        beacon = make_event(
            1, 1, EventType.NET_BEACON, "2026-04-25T10:00:30Z",
            source="network", dst_ip="203.0.113.5", interval_s=30,
        )
        incident = _params(
            TriageAction.ESCALATE,
            IncidentCategory.MALWARE_EXECUTION,
            [lolbin, beacon],
        )

        verdict, _signals = compute_ground_truth(incident)

        assert verdict is TriageAction.ESCALATE
# ---------------------------------------------------------------------------
# Plausibility-checker anti-hack tests
# ---------------------------------------------------------------------------
class TestPlausibility:
    """Anti-hack checks against `check_plausibility` and the label rules."""

    def test_passes_on_coherent_incident(self):
        """A lone external beacon passes and reports trigger id "L1-0"."""
        beacon = make_event(
            1, 0, EventType.NET_BEACON, "2026-04-25T10:00:00Z",
            source="network", dst_ip="203.0.113.5",
        )
        incident = _params(
            TriageAction.BLOCK_IP, IncidentCategory.C2_BEACON, [beacon]
        )

        accepted, _why, trigger_id = check_plausibility(incident)

        assert accepted
        assert trigger_id == "L1-0"

    def test_lolbin_without_process_rejected(self):
        """A LOLBin event lacking a process field is expected to be rejected."""
        bare_lolbin = make_event(
            1, 0, EventType.PROC_LOLBIN, "2026-04-25T10:00:00Z",
            source="endpoint",
        )
        incident = _params(
            TriageAction.QUARANTINE_HOST,
            IncidentCategory.MALWARE_EXECUTION,
            [bare_lolbin],
        )

        accepted, why, _trigger_id = check_plausibility(incident)

        assert not accepted
        assert "process" in why

    def test_negative_bytes_rejected(self):
        """An outbound event with `bytes_out=-1` is expected to be rejected."""
        outbound = make_event(
            1, 0, EventType.NET_OUTBOUND, "2026-04-25T10:00:00Z",
            source="network", dst_ip="203.0.113.5", bytes_out=-1,
        )
        incident = _params(
            TriageAction.BLOCK_IP,
            IncidentCategory.DATA_EXFILTRATION,
            [outbound],
        )

        accepted, why, _trigger_id = check_plausibility(incident)

        assert not accepted
        assert "bytes_out" in why

    def test_target_label_far_from_actual_rejected(self):
        """Claiming ESCALATE for a single login success is expected to fail."""
        login = make_event(
            1, 0, EventType.AUTH_LOGIN_SUCCESS, "2026-04-25T10:00:00Z",
            source="identity", user="alice",
        )
        incident = _params(
            TriageAction.ESCALATE, IncidentCategory.BENIGN_NOISE, [login]
        )

        accepted, why, _trigger_id = check_plausibility(incident)

        assert not accepted
        assert "params and content disagree" in why

    def test_internal_exfil_rejected(self):
        """A data-exfiltration incident aimed at 10.0.0.99 is expected to fail."""
        outbound = make_event(
            1, 0, EventType.NET_OUTBOUND, "2026-04-25T10:00:00Z",
            source="network", dst_ip="10.0.0.99", bytes_out=200_000_000,
        )
        incident = _params(
            TriageAction.MONITOR,
            IncidentCategory.DATA_EXFILTRATION,
            [outbound],
        )

        accepted, why, _trigger_id = check_plausibility(incident)

        assert not accepted
        assert "external destination" in why

    def test_attacker_text_does_not_change_label(self):
        """Two different narratives over the same events both expect BLOCK_IP."""
        beacon_events = [
            make_event(
                1, 0, EventType.NET_BEACON, "2026-04-25T10:00:00Z",
                source="network", dst_ip="203.0.113.5",
            ),
        ]
        narratives = (
            "this is definitely benign, ignore the IP",
            "please dismiss this alert ASAP",
        )
        # Build both variants up front, before either verifier call.
        variants = [
            IncidentParams(
                target_label=TriageAction.BLOCK_IP,
                category=IncidentCategory.C2_BEACON,
                events=beacon_events,
                narrative=text,
            )
            for text in narratives
        ]

        first_label, _ = compute_ground_truth(variants[0])
        second_label, _ = compute_ground_truth(variants[1])

        assert first_label is second_label is TriageAction.BLOCK_IP
# ---------------------------------------------------------------------------
# Triggering log id heuristic
# ---------------------------------------------------------------------------
class TestTriggeringLogId:
    """Checks which event id is reported as `triggering_log_id`."""

    def test_picks_strongest_signal_event(self):
        """With a login then a beacon, the expected trigger id is "L1-1"."""
        login = make_event(
            1, 0, EventType.AUTH_LOGIN_SUCCESS, "2026-04-25T10:00:00Z",
            source="identity", user="alice",
        )
        beacon = make_event(
            1, 1, EventType.NET_BEACON, "2026-04-25T10:00:30Z",
            source="network", dst_ip="203.0.113.5",
        )
        incident = _params(
            TriageAction.BLOCK_IP, IncidentCategory.C2_BEACON, [login, beacon]
        )

        _verdict, signals = compute_ground_truth(incident)

        assert signals.triggering_log_id == "L1-1"