# Repository page header: opensoc-env / verifier.py · shivam2k3 · OpenSOC v1 · bb6a031
"""
verifier.py — Deterministic ground-truth labeler + plausibility checker.
This is the *single most important* file for RLVR correctness in OpenSOC.
Two functions are exposed:
* `compute_ground_truth(params)` — returns `(TriageAction, IncidentSignals)`; the action is derived purely
from the structured event content of the incident. The attacker's
`target_label` and any free-text narrative are NEVER consulted here.
This is the authoritative answer the defender's reward is graded against.
* `check_plausibility(params)` — returns `(ok, reason, triggering_log_id)`.
Validates that the events form a coherent, non-self-contradictory
incident (e.g. internal-only "exfil", LOLBin without a parent, beacons
without a destination). The attacker is only credited for fooling the
defender on incidents that pass this check.
The label rules are intentionally a transparent rule-set rather than a
trained classifier — this is what makes the reward verifiable and
reproducible. Any rule change must come with corresponding tests in
`tests/test_verifier.py`.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import List, Optional, Tuple
from schema import (
ACTION_COST,
Event,
EventType,
IncidentParams,
KNOWN_LOLBINS,
SUSPICIOUS_LOLBIN_PARENTS,
TriageAction,
is_internal_ip,
)
# ---------------------------------------------------------------------------
# Signal extraction
# ---------------------------------------------------------------------------
@dataclass
class IncidentSignals:
    """Aggregate diagnostic signals computed from the event list.

    These are intermediate features the labeler uses; exposing them lets
    tests assert *why* the verifier reached a verdict.

    Every field defaults to its "nothing observed" value, so a freshly
    constructed instance describes an incident with no signals at all.
    """

    # --- Authentication -----------------------------------------------------
    # Number of login-failure events seen.
    auth_failures: int = 0
    # Number of distinct `user` values among the login-failure events.
    distinct_failed_users: int = 0
    # True when a login success arrives after at least 3 recorded failures.
    successful_login_after_failures: bool = False
    # Number of privilege-grant events seen.
    privilege_grants: int = 0

    # --- Process / endpoint -------------------------------------------------
    # True when a LOLBin event names a known LOLBin launched by a suspicious
    # parent process (both checked against the schema allow-lists).
    lolbin_with_suspicious_parent: bool = False
    # Number of LOLBin events seen, suspicious or not.
    lolbin_count: int = 0
    # True when any parent-mismatch process event is present.
    proc_parent_mismatch: bool = False

    # --- Network ------------------------------------------------------------
    # True when at least one beacon targets a non-internal `dst_ip`.
    beacon_to_external: bool = False
    # Number of beacon events seen, regardless of destination.
    beacon_count: int = 0
    # True when an outbound transfer of >= 50,000,000 bytes goes to a
    # non-internal `dst_ip`.
    large_outbound_to_external: bool = False
    # Largest parsed `bytes_out` over all outbound events (any destination);
    # unparseable values count as 0.
    largest_outbound_bytes: int = 0
    # Number of port-scan-hit events seen.
    port_scan_hits: int = 0

    # --- EDR / file / cloud / email ------------------------------------------
    # True when an EDR behavior match has severity "high" or "critical".
    edr_high_match: bool = False
    # True when any double-extension file-rename event is present.
    file_double_ext: bool = False
    # True when a cloud key was created from a non-internal `source_ip`.
    cloud_key_created_from_new_ip: bool = False
    # True when a clicked email link matched one of the suspicious TLDs.
    email_link_clicked_external: bool = False
    # True when an opened email attachment had `macro_enabled` set (truthy).
    email_attachment_opened_macro: bool = False

    # The single event the verifier considers most diagnostic; defenders
    # who cite this log_id earn a +0.1 rubric bonus.
    triggering_log_id: Optional[str] = None
# Top-level domains treated as high-risk when a user clicks an emailed link.
_SUSPICIOUS_LINK_TLDS = (".ru", ".cn", ".top", ".xyz", ".click")


def _url_has_suspicious_tld(url: str) -> bool:
    """Return True when the *host* of ``url`` ends in a suspicious TLD.

    Only the host is inspected. A plain substring test on the whole URL
    would flag benign hosts such as ``www.cnn.com`` (contains ``.cn``) or
    paths/queries such as ``/report.xyz``.
    """
    # Function-scope import keeps this block usable without touching the
    # file-level import section.
    from urllib.parse import urlsplit

    candidate = url.strip()
    if not candidate:
        return False
    if "//" not in candidate:
        # Scheme-less values ("evil.ru/login") would otherwise be parsed
        # as a path and yield no host at all.
        candidate = "//" + candidate
    try:
        host = urlsplit(candidate).hostname or ""
    except ValueError:
        # Malformed authority component: there is no host to judge.
        return False
    # `hostname` is already lower-cased; drop a trailing root dot so that
    # "evil.ru." is treated the same as "evil.ru".
    return host.rstrip(".").endswith(_SUSPICIOUS_LINK_TLDS)


def _extract_signals(events: List[Event]) -> IncidentSignals:
    """Walk the event list once and compute aggregate signals.

    Each event updates the counters/flags on the returned
    ``IncidentSignals`` and is also given a diagnostic score. The event
    with the highest score becomes ``triggering_log_id``; on ties the
    earliest event wins because only a strictly higher score replaces the
    current best. If no event was scored, the last event's ``log_id`` is
    used (or ``None`` for an empty list).

    NOTE(review): the email-link rule now matches the URL's host suffix
    instead of a substring of the whole URL — add matching cases to
    ``tests/test_verifier.py``.
    """
    sig = IncidentSignals()
    failed_users: set[str] = set()
    diagnostic_score: int = -1
    diagnostic_log_id: Optional[str] = None

    def bump(score: int, log_id: str) -> None:
        """Remember ``log_id`` if ``score`` beats the best seen so far."""
        nonlocal diagnostic_score, diagnostic_log_id
        if score > diagnostic_score:
            diagnostic_score = score
            diagnostic_log_id = log_id

    for e in events:
        f = e.fields or {}

        if e.event_type is EventType.AUTH_LOGIN_FAILURE:
            sig.auth_failures += 1
            if "user" in f:
                failed_users.add(str(f["user"]))
            bump(1, e.log_id)

        elif e.event_type is EventType.AUTH_LOGIN_SUCCESS:
            # Failures are only counted as they are encountered, so a
            # count of >= 3 here already means they precede this success.
            if sig.auth_failures >= 3:
                sig.successful_login_after_failures = True
                bump(5, e.log_id)

        elif e.event_type is EventType.AUTH_PRIVILEGE_GRANT:
            sig.privilege_grants += 1
            bump(7, e.log_id)

        elif e.event_type is EventType.PROC_LOLBIN:
            sig.lolbin_count += 1
            parent = str(f.get("parent_process", "")).lower()
            binary = str(f.get("process", "")).lower()
            if binary in KNOWN_LOLBINS and parent in SUSPICIOUS_LOLBIN_PARENTS:
                sig.lolbin_with_suspicious_parent = True
                bump(8, e.log_id)
            else:
                bump(2, e.log_id)

        elif e.event_type is EventType.PROC_PARENT_MISMATCH:
            sig.proc_parent_mismatch = True
            bump(6, e.log_id)

        elif e.event_type is EventType.NET_BEACON:
            sig.beacon_count += 1
            dst = str(f.get("dst_ip", ""))
            if dst and not is_internal_ip(dst):
                sig.beacon_to_external = True
                bump(7, e.log_id)
            else:
                bump(2, e.log_id)

        elif e.event_type is EventType.NET_OUTBOUND:
            dst = str(f.get("dst_ip", ""))
            try:
                size = int(f.get("bytes_out", 0))
            except (TypeError, ValueError, OverflowError):
                # OverflowError: int() cannot convert float infinity.
                # Unparseable sizes contribute nothing to the signals.
                size = 0
            sig.largest_outbound_bytes = max(sig.largest_outbound_bytes, size)
            if dst and not is_internal_ip(dst) and size >= 50_000_000:
                sig.large_outbound_to_external = True
                bump(7, e.log_id)

        elif e.event_type is EventType.NET_PORT_SCAN_HIT:
            sig.port_scan_hits += 1
            bump(3, e.log_id)

        elif e.event_type is EventType.EDR_BEHAVIOR_MATCH:
            sev = str(f.get("severity", "medium")).lower()
            if sev in ("high", "critical"):
                sig.edr_high_match = True
                bump(8, e.log_id)
            else:
                bump(3, e.log_id)

        elif e.event_type is EventType.FILE_RENAME_DOUBLE_EXT:
            sig.file_double_ext = True
            bump(5, e.log_id)

        elif e.event_type is EventType.CLOUD_KEY_CREATED:
            ip = str(f.get("source_ip", ""))
            if ip and not is_internal_ip(ip):
                sig.cloud_key_created_from_new_ip = True
                bump(7, e.log_id)
            else:
                bump(2, e.log_id)

        elif e.event_type is EventType.EMAIL_LINK_CLICKED:
            if _url_has_suspicious_tld(str(f.get("url", ""))):
                sig.email_link_clicked_external = True
                bump(5, e.log_id)
            else:
                bump(2, e.log_id)

        elif e.event_type is EventType.EMAIL_ATTACHMENT_OPENED:
            if bool(f.get("macro_enabled", False)):
                sig.email_attachment_opened_macro = True
                bump(6, e.log_id)
            else:
                bump(2, e.log_id)

    sig.distinct_failed_users = len(failed_users)
    sig.triggering_log_id = diagnostic_log_id or (events[-1].log_id if events else None)
    return sig
# ---------------------------------------------------------------------------
# Ground-truth labeler
# ---------------------------------------------------------------------------
def compute_ground_truth(params: IncidentParams) -> Tuple[TriageAction, IncidentSignals]:
    """Map an incident's events to the canonical triage action.

    Returns the chosen ``TriageAction`` together with the
    ``IncidentSignals`` it was derived from. Only ``params.events`` is
    read. Decision order (first match wins):

    1. ESCALATE — a network-compromise signal AND an endpoint-compromise
       signal are both present.
    2. BLOCK_IP — network compromise only: beacon to an external IP,
       large outbound transfer to an external IP, or a click on a link
       with a suspicious TLD.
    3. QUARANTINE_HOST — endpoint compromise only: LOLBin from a
       suspicious parent, process parent mismatch, high/critical EDR
       match, privilege grant combined with a success-after-failures
       login, cloud key created from an external IP, or a macro-enabled
       attachment opened.
    4. MONITOR — weaker signals: any port-scan hit, a login success after
       a failure burst, at least 5 login failures naming at least one
       user, a double-extension rename, or any beacon at all.
    5. DISMISS — none of the above.

    NOTE(review): a link click to a non-suspicious TLD is not a MONITOR
    trigger in this implementation; such incidents fall through to DISMISS.
    """
    signals = _extract_signals(params.events)

    network_hit = any(
        (
            signals.beacon_to_external,
            signals.large_outbound_to_external,
            signals.email_link_clicked_external,
        )
    )

    # A privilege grant only counts when it accompanies a login that
    # succeeded after a burst of failures.
    grant_after_burst = (
        signals.privilege_grants > 0 and signals.successful_login_after_failures
    )
    endpoint_hit = any(
        (
            signals.lolbin_with_suspicious_parent,
            signals.proc_parent_mismatch,
            signals.edr_high_match,
            grant_after_burst,
            signals.cloud_key_created_from_new_ip,
            signals.email_attachment_opened_macro,
        )
    )

    if network_hit:
        verdict = TriageAction.ESCALATE if endpoint_hit else TriageAction.BLOCK_IP
        return verdict, signals
    if endpoint_hit:
        return TriageAction.QUARANTINE_HOST, signals

    failure_burst = signals.auth_failures >= 5 and signals.distinct_failed_users >= 1
    worth_watching = any(
        (
            signals.port_scan_hits >= 1,
            signals.successful_login_after_failures,
            failure_burst,
            signals.file_double_ext,
            signals.beacon_count > 0,
        )
    )
    verdict = TriageAction.MONITOR if worth_watching else TriageAction.DISMISS
    return verdict, signals
# ---------------------------------------------------------------------------
# Plausibility checker
# ---------------------------------------------------------------------------
def check_plausibility(params: IncidentParams) -> Tuple[bool, str, Optional[str]]:
    """Return ``(ok, reason, triggering_log_id)``.

    The plausibility checker is what stops the attacker from gibberish-hacking
    the reward: an incident must form a coherent story, not just contradict
    itself. Checks run in this order and the first failure is reported:

    1. Per-event field sanity:
       * LOLBin event without a `process` or `parent_process` field.
       * Outbound connection whose `bytes_out` is non-numeric or negative,
         or which has no `dst_ip`.
       * Beacon without a `dst_ip`.
       * Login failure without a `user`.
    2. Label agreement: the `ACTION_COST` of `params.target_label` and of
       the label computed from the events must differ by less than 2.
    3. Category agreement: a `data_exfiltration` incident that has outbound
       bytes must include a large transfer to an external destination.

    ``reason`` is ``"ok"`` on success. ``triggering_log_id`` is the most
    diagnostic event according to the extracted signals and is returned on
    both success and failure.

    Note: the verifier label is *always* computed from events, regardless of
    whether plausibility passes. Plausibility only gates the *attacker's*
    reward; the defender is graded on every incident the env materializes.
    """
    # compute_ground_truth already extracts the signals from the same event
    # list, so reuse them instead of walking the events a second time.
    label_from_events, sig = compute_ground_truth(params)
    triggering = sig.triggering_log_id

    for e in params.events:
        f = e.fields or {}

        if e.event_type is EventType.PROC_LOLBIN:
            if not f.get("process"):
                return False, f"{e.log_id}: lolbin_use without `process` field", triggering
            if not f.get("parent_process"):
                return False, f"{e.log_id}: lolbin_use without `parent_process` field", triggering

        elif e.event_type is EventType.NET_OUTBOUND:
            try:
                size = int(f.get("bytes_out", 0))
            except (TypeError, ValueError, OverflowError):
                # OverflowError: int() cannot convert float infinity, which
                # is not a meaningful byte count either.
                return False, f"{e.log_id}: bytes_out not numeric", triggering
            if size < 0:
                return False, f"{e.log_id}: negative bytes_out", triggering
            dst = str(f.get("dst_ip", ""))
            if not dst:
                return False, f"{e.log_id}: outbound without dst_ip", triggering

        elif e.event_type is EventType.NET_BEACON:
            if not f.get("dst_ip"):
                return False, f"{e.log_id}: beacon without dst_ip", triggering

        elif e.event_type is EventType.AUTH_LOGIN_FAILURE:
            if not f.get("user"):
                return False, f"{e.log_id}: login_failure without user", triggering

    # The attacker's declared label may be "near" the computed one, but a
    # cost gap of 2 or more means params and content tell different stories.
    target_cost = ACTION_COST[params.target_label]
    actual_cost = ACTION_COST[label_from_events]
    if abs(target_cost - actual_cost) >= 2:
        return (
            False,
            (
                f"target_label={params.target_label.value} but events imply "
                f"{label_from_events.value} — params and content disagree"
            ),
            triggering,
        )

    # Outbound bytes exist, yet none qualify as a large external transfer:
    # the "exfiltration" never left the network or was too small.
    if (
        params.category.value == "data_exfiltration"
        and not sig.large_outbound_to_external
        and sig.largest_outbound_bytes > 0
    ):
        return (
            False,
            "category=data_exfiltration but no external destination > 50MB",
            triggering,
        )

    return True, "ok", triggering
# Public API of this module; `_extract_signals` is intentionally private.
__all__ = ["IncidentSignals", "compute_ground_truth", "check_plausibility"]