| """ |
| verifier.py — Deterministic ground-truth labeler + plausibility checker. |
| |
| This is the *single most important* file for RLVR correctness in OpenSOC. |
| |
| Two functions are exposed: |
| |
| * `compute_ground_truth(params)` — returns a `TriageAction` derived purely |
| from the structured event content of the incident. The attacker's |
| `target_label` and any free-text narrative are NEVER consulted here. |
| This is the authoritative answer the defender's reward is graded against. |
| |
| * `check_plausibility(params)` — returns `(ok, reason, triggering_log_id)`. |
| Validates that the events form a coherent, non-self-contradictory |
| incident (e.g. internal-only "exfil", LOLBin without a parent, beacons |
| without a destination). The attacker is only credited for fooling the |
| defender on incidents that pass this check. |
| |
| The label rules are intentionally a transparent rule-set rather than a |
| trained classifier — this is what makes the reward verifiable and |
| reproducible. Any rule change must come with corresponding tests in |
| `tests/test_verifier.py`. |
| """ |
|
|
| from __future__ import annotations |
|
|
| from dataclasses import dataclass |
| from typing import List, Optional, Tuple |
|
|
| from schema import ( |
| ACTION_COST, |
| Event, |
| EventType, |
| IncidentParams, |
| KNOWN_LOLBINS, |
| SUSPICIOUS_LOLBIN_PARENTS, |
| TriageAction, |
| is_internal_ip, |
| ) |
|
|
|
|
| |
| |
| |
|
|
@dataclass
class IncidentSignals:
    """Feature summary of one incident, derived only from its events.

    The labeler decides on these aggregates rather than on raw events, and
    tests can inspect them to assert *why* a particular verdict was reached.
    Every field defaults to its "nothing observed" value.
    """

    # --- authentication ---------------------------------------------------
    auth_failures: int = 0
    distinct_failed_users: int = 0
    successful_login_after_failures: bool = False
    privilege_grants: int = 0

    # --- process / endpoint -----------------------------------------------
    lolbin_with_suspicious_parent: bool = False
    lolbin_count: int = 0
    proc_parent_mismatch: bool = False

    # --- network ----------------------------------------------------------
    beacon_to_external: bool = False
    beacon_count: int = 0
    large_outbound_to_external: bool = False
    largest_outbound_bytes: int = 0
    port_scan_hits: int = 0

    # --- EDR / file / cloud / email -----------------------------------------
    edr_high_match: bool = False
    file_double_ext: bool = False
    cloud_key_created_from_new_ip: bool = False
    email_link_clicked_external: bool = False
    email_attachment_opened_macro: bool = False

    # log_id of the single most diagnostic event, or None when nothing was set.
    triggering_log_id: Optional[str] = None
|
|
|
|
# TLD suffixes treated as suspicious when they terminate a clicked link's hostname.
_SUSPICIOUS_TLDS = (".ru", ".cn", ".top", ".xyz", ".click")


def _url_has_suspicious_tld(url: str) -> bool:
    """Return True when the hostname of *url* ends in a suspicious TLD.

    Only the host part is inspected, so look-alike substrings elsewhere in the
    URL (``www.cnn.com``, ``www.topgear.com``, ``/files/readme.xyz``) do not
    count as matches. Values that cannot be parsed as a URL are treated as
    non-matching.
    """
    from urllib.parse import urlsplit  # local import: keeps this block self-contained

    candidate = url.strip()
    if not candidate:
        return False
    if "://" not in candidate:
        # urlsplit only fills in the netloc of scheme-less input ("evil.ru/x")
        # when the value starts with "//".
        candidate = "//" + candidate.lstrip("/")
    try:
        # .hostname drops userinfo/port and is lower-cased by urllib.
        host = urlsplit(candidate).hostname or ""
    except ValueError:
        return False
    return host.rstrip(".").endswith(_SUSPICIOUS_TLDS)


def _extract_signals(events: List[Event]) -> IncidentSignals:
    """Walk the event list once and compute aggregate signals.

    Each event updates the counters/flags of an ``IncidentSignals`` instance
    according to its ``event_type`` and ``fields``. Every scored event also
    competes for ``triggering_log_id``: the first event reaching the highest
    diagnostic score wins; if no event was scored, the last event's ``log_id``
    is used (``None`` for an empty list).

    Args:
        events: Events of one incident, in order. ``fields`` may be ``None``.

    Returns:
        The populated ``IncidentSignals``.
    """
    sig = IncidentSignals()
    failed_users: set[str] = set()
    best_score = -1
    best_log_id: Optional[str] = None

    def bump(score: int, log_id: str) -> None:
        """Remember the first event that reaches a new highest score."""
        nonlocal best_score, best_log_id
        if score > best_score:
            best_score = score
            best_log_id = log_id

    for e in events:
        f = e.fields or {}
        kind = e.event_type

        if kind is EventType.AUTH_LOGIN_FAILURE:
            sig.auth_failures += 1
            if "user" in f:
                failed_users.add(str(f["user"]))
            bump(1, e.log_id)

        elif kind is EventType.AUTH_LOGIN_SUCCESS:
            # A success is only noteworthy after a burst of at least three
            # failures earlier in the list; a plain success is not scored.
            if sig.auth_failures >= 3:
                sig.successful_login_after_failures = True
                bump(5, e.log_id)

        elif kind is EventType.AUTH_PRIVILEGE_GRANT:
            sig.privilege_grants += 1
            bump(7, e.log_id)

        elif kind is EventType.PROC_LOLBIN:
            sig.lolbin_count += 1
            parent = str(f.get("parent_process", "")).lower()
            binary = str(f.get("process", "")).lower()
            if binary in KNOWN_LOLBINS and parent in SUSPICIOUS_LOLBIN_PARENTS:
                sig.lolbin_with_suspicious_parent = True
                bump(8, e.log_id)
            else:
                bump(2, e.log_id)

        elif kind is EventType.PROC_PARENT_MISMATCH:
            sig.proc_parent_mismatch = True
            bump(6, e.log_id)

        elif kind is EventType.NET_BEACON:
            sig.beacon_count += 1
            dst = str(f.get("dst_ip", ""))
            if dst and not is_internal_ip(dst):
                sig.beacon_to_external = True
                bump(7, e.log_id)
            else:
                bump(2, e.log_id)

        elif kind is EventType.NET_OUTBOUND:
            dst = str(f.get("dst_ip", ""))
            try:
                size = int(f.get("bytes_out", 0))
            except (TypeError, ValueError, OverflowError):
                # OverflowError: int() of an infinite float (e.g. JSON 1e999).
                # Unparsable sizes count as zero bytes here.
                size = 0
            sig.largest_outbound_bytes = max(sig.largest_outbound_bytes, size)
            # Only a >= 50 MB transfer to an external address is scored.
            if dst and not is_internal_ip(dst) and size >= 50_000_000:
                sig.large_outbound_to_external = True
                bump(7, e.log_id)

        elif kind is EventType.NET_PORT_SCAN_HIT:
            sig.port_scan_hits += 1
            bump(3, e.log_id)

        elif kind is EventType.EDR_BEHAVIOR_MATCH:
            sev = str(f.get("severity", "medium")).lower()
            if sev in ("high", "critical"):
                sig.edr_high_match = True
                bump(8, e.log_id)
            else:
                bump(3, e.log_id)

        elif kind is EventType.FILE_RENAME_DOUBLE_EXT:
            sig.file_double_ext = True
            bump(5, e.log_id)

        elif kind is EventType.CLOUD_KEY_CREATED:
            ip = str(f.get("source_ip", ""))
            if ip and not is_internal_ip(ip):
                sig.cloud_key_created_from_new_ip = True
                bump(7, e.log_id)
            else:
                bump(2, e.log_id)

        elif kind is EventType.EMAIL_LINK_CLICKED:
            # Match the TLD against the link's hostname, not the raw URL text,
            # so benign hosts/paths containing ".cn", ".top", ... are not flagged.
            if _url_has_suspicious_tld(str(f.get("url", ""))):
                sig.email_link_clicked_external = True
                bump(5, e.log_id)
            else:
                bump(2, e.log_id)

        elif kind is EventType.EMAIL_ATTACHMENT_OPENED:
            if bool(f.get("macro_enabled", False)):
                sig.email_attachment_opened_macro = True
                bump(6, e.log_id)
            else:
                bump(2, e.log_id)

    sig.distinct_failed_users = len(failed_users)
    sig.triggering_log_id = best_log_id or (events[-1].log_id if events else None)
    return sig
|
|
|
|
| |
| |
| |
|
|
def compute_ground_truth(params: IncidentParams) -> Tuple[TriageAction, IncidentSignals]:
    """Map an incident's events to the canonical triage action.

    Only ``params.events`` is read. The decision ladder, first match wins:

    1. ESCALATE        - a network-compromise signal AND an endpoint-compromise
                         signal are both present.
    2. BLOCK_IP        - network-compromise only: beacon to an external
                         address, large outbound transfer to an external
                         address, or a flagged email link click.
    3. QUARANTINE_HOST - endpoint-compromise only: LOLBin from a suspicious
                         parent, process-parent mismatch, high/critical EDR
                         match, privilege grant combined with a successful
                         login after a failure burst, cloud key created from
                         a non-internal IP, or a macro-enabled attachment
                         being opened.
    4. MONITOR         - weaker signals: any port-scan hit, a successful login
                         after a failure burst, five or more auth failures
                         with at least one named user, a double-extension
                         file rename, or any beacon at all.
    5. DISMISS         - none of the above.

    Returns:
        ``(action, signals)`` where ``signals`` is the ``IncidentSignals``
        the decision was based on.
    """
    signals = _extract_signals(params.events)

    privileged_after_burst = (
        signals.privilege_grants > 0 and signals.successful_login_after_failures
    )
    network_hit = any(
        (
            signals.beacon_to_external,
            signals.large_outbound_to_external,
            signals.email_link_clicked_external,
        )
    )
    endpoint_hit = any(
        (
            signals.lolbin_with_suspicious_parent,
            signals.proc_parent_mismatch,
            signals.edr_high_match,
            privileged_after_burst,
            signals.cloud_key_created_from_new_ip,
            signals.email_attachment_opened_macro,
        )
    )

    # Containment-grade outcomes.
    if network_hit:
        verdict = TriageAction.ESCALATE if endpoint_hit else TriageAction.BLOCK_IP
        return verdict, signals
    if endpoint_hit:
        return TriageAction.QUARANTINE_HOST, signals

    # Nothing containment-grade: decide between watching and dismissing.
    brute_force_attempt = signals.auth_failures >= 5 and signals.distinct_failed_users >= 1
    worth_watching = any(
        (
            signals.port_scan_hits >= 1,
            signals.successful_login_after_failures,
            brute_force_attempt,
            signals.file_double_ext,
            signals.beacon_count > 0,
        )
    )
    verdict = TriageAction.MONITOR if worth_watching else TriageAction.DISMISS
    return verdict, signals
|
|
|
|
| |
| |
| |
|
|
def check_plausibility(params: IncidentParams) -> Tuple[bool, str, Optional[str]]:
    """Return ``(ok, reason, triggering_log_id)``.

    The plausibility checker is what stops the attacker from gibberish-hacking
    the reward: an incident must form a coherent story. Checks run in this
    order and the first failure is returned:

    1. Per-event structure:
       * ``PROC_LOLBIN`` needs non-empty ``process`` and ``parent_process``.
       * ``NET_OUTBOUND`` needs a finite numeric, non-negative ``bytes_out``
         (a missing value counts as 0) and a non-empty ``dst_ip``.
       * ``NET_BEACON`` needs a non-empty ``dst_ip``.
       * ``AUTH_LOGIN_FAILURE`` needs a non-empty ``user``.
    2. Label agreement: the ``ACTION_COST`` of ``params.target_label`` and of
       the event-derived label must differ by less than 2.
    3. Category agreement: ``category == "data_exfiltration"`` is rejected
       when outbound bytes were seen but no large transfer went to an
       external destination.

    Note: the verifier label is *always* computed from events, regardless of
    whether plausibility passes. Plausibility only gates the *attacker's*
    reward.

    Returns:
        ``ok`` is True with reason ``"ok"`` when every check passes;
        otherwise False with a human-readable reason. ``triggering_log_id``
        is the most diagnostic event's log id in either case.
    """
    # compute_ground_truth already returns the extracted signals next to the
    # label, so a single call replaces a second pass over the event list.
    label_from_events, sig = compute_ground_truth(params)
    triggering = sig.triggering_log_id

    for e in params.events:
        f = e.fields or {}
        kind = e.event_type

        if kind is EventType.PROC_LOLBIN:
            if not f.get("process"):
                return False, f"{e.log_id}: lolbin_use without `process` field", triggering
            if not f.get("parent_process"):
                return False, f"{e.log_id}: lolbin_use without `parent_process` field", triggering

        elif kind is EventType.NET_OUTBOUND:
            try:
                size = int(f.get("bytes_out", 0))
            except (TypeError, ValueError, OverflowError):
                # OverflowError: int() of an infinite float (e.g. JSON 1e999).
                return False, f"{e.log_id}: bytes_out not numeric", triggering
            if size < 0:
                return False, f"{e.log_id}: negative bytes_out", triggering
            if not str(f.get("dst_ip", "")):
                return False, f"{e.log_id}: outbound without dst_ip", triggering

        elif kind is EventType.NET_BEACON:
            if not f.get("dst_ip"):
                return False, f"{e.log_id}: beacon without dst_ip", triggering

        elif kind is EventType.AUTH_LOGIN_FAILURE:
            if not f.get("user"):
                return False, f"{e.log_id}: login_failure without user", triggering

    # The attacker's claimed label may be off by at most one cost step from
    # what the events actually imply.
    target_cost = ACTION_COST[params.target_label]
    actual_cost = ACTION_COST[label_from_events]
    if abs(target_cost - actual_cost) >= 2:
        return (
            False,
            (
                f"target_label={params.target_label.value} but events imply "
                f"{label_from_events.value} — params and content disagree"
            ),
            triggering,
        )

    # "Exfiltration" whose only outbound traffic is small or internal is
    # self-contradictory. Incidents with no outbound bytes at all pass here.
    if (
        params.category.value == "data_exfiltration"
        and not sig.large_outbound_to_external
        and sig.largest_outbound_bytes > 0
    ):
        return (
            False,
            "category=data_exfiltration but no external destination > 50MB",
            triggering,
        )

    return True, "ok", triggering
|
|
|
|
# Names exported by `from verifier import *`; helpers with a leading
# underscore are intentionally left out.
__all__ = ["IncidentSignals", "compute_ground_truth", "check_plausibility"]
|
|