"""
verifier.py — Deterministic ground-truth labeler + plausibility checker.

This is the *single most important* file for RLVR correctness in OpenSOC.
Two functions are exposed:

* `compute_ground_truth(params)` — returns a `(TriageAction, IncidentSignals)`
  pair derived purely from the structured event content of the incident. The
  attacker's `target_label` and any free-text narrative are NEVER consulted
  here. This is the authoritative answer the defender's reward is graded
  against.

* `check_plausibility(params)` — returns `(ok, reason, triggering_log_id)`.
  Validates that the events form a coherent, non-self-contradictory incident
  (e.g. internal-only "exfil", LOLBin without a parent, beacons without a
  destination). The attacker is only credited for fooling the defender on
  incidents that pass this check.

The label rules are intentionally a transparent rule-set rather than a
trained classifier — this is what makes the reward verifiable and
reproducible. Any rule change must come with corresponding tests in
`tests/test_verifier.py`.
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import Any, List, Mapping, Optional, Tuple
from urllib.parse import urlsplit

from schema import (
    ACTION_COST,
    Event,
    EventType,
    IncidentParams,
    KNOWN_LOLBINS,
    SUSPICIOUS_LOLBIN_PARENTS,
    TriageAction,
    is_internal_ip,
)

# Outbound transfers of at least this many bytes to a non-internal address
# are treated as exfil-grade.
_LARGE_OUTBOUND_BYTES = 50_000_000

# Hostname suffixes that mark a clicked e-mail link as suspicious.
_SUSPICIOUS_TLDS = (".ru", ".cn", ".top", ".xyz", ".click")


# ---------------------------------------------------------------------------
# Signal extraction
# ---------------------------------------------------------------------------


@dataclass
class IncidentSignals:
    """Aggregate diagnostic signals computed from the event list.

    These are intermediate features the labeler uses; exposing them lets
    tests assert *why* the verifier reached a verdict.
    """

    # Authentication.
    auth_failures: int = 0
    distinct_failed_users: int = 0
    # True once a login success is seen after at least 3 failures.
    successful_login_after_failures: bool = False
    privilege_grants: int = 0

    # Process activity.
    lolbin_with_suspicious_parent: bool = False
    lolbin_count: int = 0
    proc_parent_mismatch: bool = False

    # Network activity.
    beacon_to_external: bool = False
    beacon_count: int = 0
    large_outbound_to_external: bool = False
    # Largest parsed `bytes_out` over all outbound events, internal or not.
    largest_outbound_bytes: int = 0
    port_scan_hits: int = 0

    # Endpoint, cloud and e-mail.
    edr_high_match: bool = False
    file_double_ext: bool = False
    cloud_key_created_from_new_ip: bool = False
    email_link_clicked_external: bool = False
    email_attachment_opened_macro: bool = False

    # The single event the verifier considers most diagnostic; defenders
    # who cite this log_id earn a +0.1 rubric bonus.
    triggering_log_id: Optional[str] = None


def _text_field(fields: Mapping[str, Any], key: str) -> str:
    """Return ``fields[key]`` as a string; missing or ``None`` becomes ``""``.

    A plain ``str(fields.get(key, ""))`` would turn an explicit ``None`` into
    the non-empty string ``"None"``, which downstream checks would then treat
    as a real value.
    """
    value = fields.get(key)
    return "" if value is None else str(value)


def _parse_bytes_out(fields: Mapping[str, Any]) -> Optional[int]:
    """Return ``bytes_out`` as an int (0 when absent), or ``None`` if invalid.

    ``OverflowError`` is caught as well because ``int(float("inf"))`` raises
    it, and an infinite value can arrive from JSON input such as ``1e999``.
    """
    try:
        return int(fields.get("bytes_out", 0))
    except (TypeError, ValueError, OverflowError):
        return None


def _url_has_suspicious_tld(url: str) -> bool:
    """Return True if the URL's hostname ends with a suspicious TLD.

    Only the hostname is inspected, so ``https://www.cnn.com`` or a path
    like ``/setup.run`` no longer match ``.cn`` / ``.ru`` by accident.
    Hostnames are compared case-insensitively (``urlsplit`` lowercases them).
    """
    candidate = url.strip()
    if "://" not in candidate:
        # Without a scheme urlsplit() would put the host into the path;
        # a leading "//" makes it parse as a network location.
        candidate = "//" + candidate
    try:
        host = urlsplit(candidate).hostname
    except ValueError:
        # Malformed network location (e.g. unbalanced brackets): no usable host.
        return False
    if not host:
        return False
    # Tolerate a fully-qualified trailing dot ("evil.ru.").
    return host.rstrip(".").endswith(_SUSPICIOUS_TLDS)


def _extract_signals(events: List[Event]) -> IncidentSignals:
    """Walk the event list once and compute aggregate signals.

    Each event may also nominate itself as the most diagnostic one via
    ``bump``; the first event reaching the highest score wins. If no event
    is nominated, the last event's ``log_id`` is used (``None`` for an
    empty list).
    """
    sig = IncidentSignals()
    failed_users: set[str] = set()
    best_score = -1
    best_log_id: Optional[str] = None

    def bump(score: int, log_id: str) -> None:
        # Strict ">" keeps the earliest event on ties.
        nonlocal best_score, best_log_id
        if score > best_score:
            best_score = score
            best_log_id = log_id

    for e in events:
        f = e.fields or {}
        kind = e.event_type

        if kind is EventType.AUTH_LOGIN_FAILURE:
            sig.auth_failures += 1
            if "user" in f:
                failed_users.add(str(f["user"]))
            bump(1, e.log_id)

        elif kind is EventType.AUTH_LOGIN_SUCCESS:
            # Only a success preceded by a burst of failures is a signal.
            if sig.auth_failures >= 3:
                sig.successful_login_after_failures = True
                bump(5, e.log_id)

        elif kind is EventType.AUTH_PRIVILEGE_GRANT:
            sig.privilege_grants += 1
            bump(7, e.log_id)

        elif kind is EventType.PROC_LOLBIN:
            sig.lolbin_count += 1
            parent = str(f.get("parent_process", "")).lower()
            binary = str(f.get("process", "")).lower()
            if binary in KNOWN_LOLBINS and parent in SUSPICIOUS_LOLBIN_PARENTS:
                sig.lolbin_with_suspicious_parent = True
                bump(8, e.log_id)
            else:
                bump(2, e.log_id)

        elif kind is EventType.PROC_PARENT_MISMATCH:
            sig.proc_parent_mismatch = True
            bump(6, e.log_id)

        elif kind is EventType.NET_BEACON:
            sig.beacon_count += 1
            dst = _text_field(f, "dst_ip")
            if dst and not is_internal_ip(dst):
                sig.beacon_to_external = True
                bump(7, e.log_id)
            else:
                bump(2, e.log_id)

        elif kind is EventType.NET_OUTBOUND:
            dst = _text_field(f, "dst_ip")
            parsed = _parse_bytes_out(f)
            # Unparseable sizes count as 0 here; check_plausibility rejects them.
            size = 0 if parsed is None else parsed
            sig.largest_outbound_bytes = max(sig.largest_outbound_bytes, size)
            if dst and not is_internal_ip(dst) and size >= _LARGE_OUTBOUND_BYTES:
                sig.large_outbound_to_external = True
                bump(7, e.log_id)

        elif kind is EventType.NET_PORT_SCAN_HIT:
            sig.port_scan_hits += 1
            bump(3, e.log_id)

        elif kind is EventType.EDR_BEHAVIOR_MATCH:
            sev = str(f.get("severity", "medium")).lower()
            if sev in ("high", "critical"):
                sig.edr_high_match = True
                bump(8, e.log_id)
            else:
                bump(3, e.log_id)

        elif kind is EventType.FILE_RENAME_DOUBLE_EXT:
            sig.file_double_ext = True
            bump(5, e.log_id)

        elif kind is EventType.CLOUD_KEY_CREATED:
            ip = _text_field(f, "source_ip")
            if ip and not is_internal_ip(ip):
                sig.cloud_key_created_from_new_ip = True
                bump(7, e.log_id)
            else:
                bump(2, e.log_id)

        elif kind is EventType.EMAIL_LINK_CLICKED:
            if _url_has_suspicious_tld(str(f.get("url", ""))):
                sig.email_link_clicked_external = True
                bump(5, e.log_id)
            else:
                bump(2, e.log_id)

        elif kind is EventType.EMAIL_ATTACHMENT_OPENED:
            if bool(f.get("macro_enabled", False)):
                sig.email_attachment_opened_macro = True
                bump(6, e.log_id)
            else:
                bump(2, e.log_id)

    sig.distinct_failed_users = len(failed_users)
    sig.triggering_log_id = best_log_id or (events[-1].log_id if events else None)
    return sig


# ---------------------------------------------------------------------------
# Ground-truth labeler
# ---------------------------------------------------------------------------


def compute_ground_truth(params: IncidentParams) -> Tuple[TriageAction, IncidentSignals]:
    """Derive the canonical triage action from event content.

    Only ``params.events`` is read. Returns the action together with the
    signals it was derived from.

    Rules (highest-priority first; first match wins):

    1. ESCALATE if the incident shows BOTH a network-compromise signal and
       an endpoint-compromise signal (e.g. LOLBin-with-suspicious-parent
       AND beacon-to-external).
    2. BLOCK_IP if there is a network-compromise signal only: beacon to an
       external address, large outbound transfer to an external address,
       or a clicked link whose host has a suspicious TLD.
    3. QUARANTINE_HOST if there is an endpoint-compromise signal only:
       LOLBin from a suspicious parent, parent mismatch, high/critical EDR
       match, privilege grant together with a login success after an
       auth-failure burst, cloud key created from a non-internal IP, or a
       macro-enabled attachment opened.
    4. MONITOR for weaker signals: any port-scan hit, login success after
       an auth-failure burst, at least 5 auth failures naming at least one
       user, a double-extension rename, or any beacon.
    5. DISMISS otherwise.
    """
    sig = _extract_signals(params.events)

    network_compromise = (
        sig.beacon_to_external
        or sig.large_outbound_to_external
        or sig.email_link_clicked_external
    )
    endpoint_compromise = (
        sig.lolbin_with_suspicious_parent
        or sig.proc_parent_mismatch
        or sig.edr_high_match
        or (sig.privilege_grants > 0 and sig.successful_login_after_failures)
        or sig.cloud_key_created_from_new_ip
        or sig.email_attachment_opened_macro
    )

    if network_compromise and endpoint_compromise:
        return TriageAction.ESCALATE, sig
    if network_compromise:
        return TriageAction.BLOCK_IP, sig
    if endpoint_compromise:
        return TriageAction.QUARANTINE_HOST, sig

    monitor_signal = (
        sig.port_scan_hits >= 1
        or sig.successful_login_after_failures
        or (sig.auth_failures >= 5 and sig.distinct_failed_users >= 1)
        or sig.file_double_ext
        or sig.beacon_count > 0
    )
    if monitor_signal:
        return TriageAction.MONITOR, sig
    return TriageAction.DISMISS, sig


# ---------------------------------------------------------------------------
# Plausibility checker
# ---------------------------------------------------------------------------


def check_plausibility(params: IncidentParams) -> Tuple[bool, str, Optional[str]]:
    """Return ``(ok, reason, triggering_log_id)``.

    The plausibility checker is what stops the attacker from
    gibberish-hacking the reward: an incident must form a coherent story,
    not just contradict itself. Checks, in order (first failure wins):

    * Per event:
      - a LOLBin event must carry non-empty `process` and `parent_process`;
      - an outbound connection must have a numeric, non-negative
        `bytes_out` (absent counts as 0) and a `dst_ip`;
      - a beacon must have a `dst_ip`;
      - a login failure must have a `user`.
    * The `ACTION_COST` of `target_label` and of the event-derived label
      must differ by less than 2.
    * An incident with category `data_exfiltration` that contains outbound
      bytes must include a large transfer to an external destination.

    Note: the verifier label is *always* computed from events, regardless
    of whether plausibility passes. Plausibility only gates the
    *attacker's* reward; the defender is graded on every incident the env
    materializes.
    """
    # One pass yields both the label and the signals used below.
    label_from_events, sig = compute_ground_truth(params)
    triggering = sig.triggering_log_id

    for e in params.events:
        f = e.fields or {}
        kind = e.event_type

        if kind is EventType.PROC_LOLBIN:
            if not f.get("process"):
                return False, f"{e.log_id}: lolbin_use without `process` field", triggering
            if not f.get("parent_process"):
                return False, f"{e.log_id}: lolbin_use without `parent_process` field", triggering

        elif kind is EventType.NET_OUTBOUND:
            size = _parse_bytes_out(f)
            if size is None:
                return False, f"{e.log_id}: bytes_out not numeric", triggering
            if size < 0:
                return False, f"{e.log_id}: negative bytes_out", triggering
            if not _text_field(f, "dst_ip"):
                return False, f"{e.log_id}: outbound without dst_ip", triggering

        elif kind is EventType.NET_BEACON:
            if not f.get("dst_ip"):
                return False, f"{e.log_id}: beacon without dst_ip", triggering

        elif kind is EventType.AUTH_LOGIN_FAILURE:
            if not f.get("user"):
                return False, f"{e.log_id}: login_failure without user", triggering

    target_cost = ACTION_COST[params.target_label]
    actual_cost = ACTION_COST[label_from_events]
    if abs(target_cost - actual_cost) >= 2:
        return (
            False,
            (
                f"target_label={params.target_label.value} but events imply "
                f"{label_from_events.value} — params and content disagree"
            ),
            triggering,
        )

    if (
        params.category.value == "data_exfiltration"
        and not sig.large_outbound_to_external
        and sig.largest_outbound_bytes > 0
    ):
        return (
            False,
            "category=data_exfiltration but no external destination > 50MB",
            triggering,
        )

    return True, "ok", triggering


__all__ = [
    "IncidentSignals",
    "compute_ground_truth",
    "check_plausibility",
]