Spaces:
Sleeping
Sleeping
| """ | |
| verifier.py — Deterministic ground-truth labeler + plausibility checker. | |
| This is the *single most important* file for RLVR correctness in OpenSOC. | |
| Two functions are exposed: | |
| * `compute_ground_truth(params)` — returns a `(TriageAction, IncidentSignals)` | |
| pair derived purely from the structured event content of the incident. The attacker's | |
| `target_label` and any free-text narrative are NEVER consulted here. | |
| This is the authoritative answer the defender's reward is graded against. | |
| * `check_plausibility(params)` — returns `(ok, reason, triggering_log_id)`. | |
| Validates that the events form a coherent, non-self-contradictory | |
| incident (e.g. internal-only "exfil", LOLBin without a parent, beacons | |
| without a destination). The attacker is only credited for fooling the | |
| defender on incidents that pass this check. | |
| The label rules are intentionally a transparent rule-set rather than a | |
| trained classifier — this is what makes the reward verifiable and | |
| reproducible. Any rule change must come with corresponding tests in | |
| `tests/test_verifier.py`. | |
| """ | |
| from __future__ import annotations | |
| from dataclasses import dataclass | |
| from typing import List, Optional, Tuple | |
| from schema import ( | |
| ACTION_COST, | |
| Event, | |
| EventType, | |
| IncidentParams, | |
| KNOWN_LOLBINS, | |
| SUSPICIOUS_LOLBIN_PARENTS, | |
| TriageAction, | |
| is_internal_ip, | |
| ) | |
| # --------------------------------------------------------------------------- | |
| # Signal extraction | |
| # --------------------------------------------------------------------------- | |
@dataclass
class IncidentSignals:
    """Aggregate diagnostic signals computed from the event list.

    These are intermediate features the labeler uses; exposing them lets
    tests assert *why* the verifier reached a verdict.

    The class is a dataclass so that instances get a keyword constructor,
    a readable ``repr`` and field-wise ``==`` — all of which tests need in
    order to compare or print a set of signals. Every field has a default,
    so ``IncidentSignals()`` yields the "nothing observed" baseline.
    """

    # --- authentication ---------------------------------------------------
    auth_failures: int = 0  # number of login-failure events seen
    distinct_failed_users: int = 0  # distinct `user` values among those failures
    successful_login_after_failures: bool = False  # success seen after >= 3 failures
    privilege_grants: int = 0  # number of privilege-grant events seen

    # --- process / endpoint -----------------------------------------------
    lolbin_with_suspicious_parent: bool = False  # known LOLBin spawned by a suspicious parent
    lolbin_count: int = 0  # number of LOLBin events, suspicious or not
    proc_parent_mismatch: bool = False  # any parent-mismatch event present
    edr_high_match: bool = False  # EDR match with severity "high" or "critical"
    file_double_ext: bool = False  # any double-extension rename event present

    # --- network -----------------------------------------------------------
    beacon_to_external: bool = False  # beacon whose dst_ip is not internal
    beacon_count: int = 0  # number of beacon events, any destination
    large_outbound_to_external: bool = False  # >= 50_000_000 bytes to a non-internal dst_ip
    largest_outbound_bytes: int = 0  # max bytes_out over all outbound events
    port_scan_hits: int = 0  # number of port-scan-hit events

    # --- cloud / email -----------------------------------------------------
    cloud_key_created_from_new_ip: bool = False  # key created from a non-internal source_ip
    email_link_clicked_external: bool = False  # clicked URL matched the suspicious-TLD list
    email_attachment_opened_macro: bool = False  # attachment opened with macro_enabled truthy

    # The single event the verifier considers most diagnostic; defenders
    # who cite this log_id earn a +0.1 rubric bonus.
    triggering_log_id: Optional[str] = None
# Hostname suffixes treated as a malicious-link indicator by the labeler.
_SUSPICIOUS_TLDS = (".ru", ".cn", ".top", ".xyz", ".click")


def _field_text(fields: dict, key: str) -> str:
    """Return ``fields[key]`` as a string, mapping a missing or ``None`` value to ``""``.

    ``str(fields.get(key, ""))`` would turn an explicit ``None`` into the
    non-empty string ``"None"``, which downstream checks would then treat as
    a real value (e.g. as a destination IP).
    """
    value = fields.get(key)
    return "" if value is None else str(value)


def _links_to_suspicious_tld(url: str) -> bool:
    """Return True when the *hostname* of ``url`` ends in a suspicious TLD.

    Matching is done on the parsed hostname rather than on the raw string:
    a plain substring test flags benign hosts such as ``www.rubrik.com``
    (contains ``.ru``) or ``www.cnn.com`` (contains ``.cn``), and also
    matches TLD-like text that merely appears in a path or query string.
    Hostnames are compared case-insensitively (``urlsplit`` lowercases
    them). URLs that cannot be parsed are treated as not matching.
    """
    from urllib.parse import urlsplit

    candidate = url.strip()
    # Scheme-less values ("evil.ru/login") need a leading "//" so that
    # urlsplit puts the host into netloc instead of path. A "://" that only
    # appears later (e.g. inside a query string) does not count as a scheme.
    scheme, sep, _ = candidate.partition("://")
    has_scheme = bool(sep) and (
        scheme.replace("+", "").replace("-", "").replace(".", "").isalnum()
    )
    if not has_scheme and not candidate.startswith("//"):
        candidate = "//" + candidate
    try:
        host = urlsplit(candidate).hostname or ""
    except ValueError:
        # urlsplit rejects malformed netlocs (e.g. unbalanced IPv6 brackets).
        return False
    # Tolerate a fully-qualified trailing dot ("evil.ru.").
    return host.rstrip(".").endswith(_SUSPICIOUS_TLDS)


def _extract_signals(events: List[Event]) -> IncidentSignals:
    """Walk the event list once and compute aggregate signals.

    Each event updates the counters/flags on the returned ``IncidentSignals``
    and is assigned a diagnostic score; the ``log_id`` of the highest-scoring
    event (first one wins on ties) becomes ``triggering_log_id``. If no event
    scored at all, the last event's ``log_id`` is used, or ``None`` for an
    empty list.

    Args:
        events: Incident events, in the order they should be interpreted.

    Returns:
        The populated ``IncidentSignals``.
    """
    sig = IncidentSignals()
    failed_users: set[str] = set()
    diagnostic_score: int = -1
    diagnostic_log_id: Optional[str] = None

    def bump(score: int, log_id: str) -> None:
        # Strict ">" keeps the earliest event among equally diagnostic ones.
        nonlocal diagnostic_score, diagnostic_log_id
        if score > diagnostic_score:
            diagnostic_score = score
            diagnostic_log_id = log_id

    for e in events:
        f = e.fields or {}
        if e.event_type is EventType.AUTH_LOGIN_FAILURE:
            sig.auth_failures += 1
            if "user" in f:
                failed_users.add(str(f["user"]))
            bump(1, e.log_id)
        elif e.event_type is EventType.AUTH_LOGIN_SUCCESS:
            # auth_failures only counts failures that precede this event in
            # the walk, so ">= 3" already implies "a failure came earlier".
            if sig.auth_failures >= 3:
                sig.successful_login_after_failures = True
                bump(5, e.log_id)
        elif e.event_type is EventType.AUTH_PRIVILEGE_GRANT:
            sig.privilege_grants += 1
            bump(7, e.log_id)
        elif e.event_type is EventType.PROC_LOLBIN:
            sig.lolbin_count += 1
            parent = _field_text(f, "parent_process").lower()
            binary = _field_text(f, "process").lower()
            if binary in KNOWN_LOLBINS and parent in SUSPICIOUS_LOLBIN_PARENTS:
                sig.lolbin_with_suspicious_parent = True
                bump(8, e.log_id)
            else:
                bump(2, e.log_id)
        elif e.event_type is EventType.PROC_PARENT_MISMATCH:
            sig.proc_parent_mismatch = True
            bump(6, e.log_id)
        elif e.event_type is EventType.NET_BEACON:
            sig.beacon_count += 1
            dst = _field_text(f, "dst_ip")
            if dst and not is_internal_ip(dst):
                sig.beacon_to_external = True
                bump(7, e.log_id)
            else:
                bump(2, e.log_id)
        elif e.event_type is EventType.NET_OUTBOUND:
            dst = _field_text(f, "dst_ip")
            try:
                size = int(f.get("bytes_out", 0))
            except (TypeError, ValueError, OverflowError):
                # OverflowError: int(float("inf")); treat like any other
                # unparseable size instead of crashing the labeler.
                size = 0
            sig.largest_outbound_bytes = max(sig.largest_outbound_bytes, size)
            if dst and not is_internal_ip(dst) and size >= 50_000_000:
                sig.large_outbound_to_external = True
                bump(7, e.log_id)
        elif e.event_type is EventType.NET_PORT_SCAN_HIT:
            sig.port_scan_hits += 1
            bump(3, e.log_id)
        elif e.event_type is EventType.EDR_BEHAVIOR_MATCH:
            sev = str(f.get("severity", "medium")).lower()
            if sev in ("high", "critical"):
                sig.edr_high_match = True
                bump(8, e.log_id)
            else:
                bump(3, e.log_id)
        elif e.event_type is EventType.FILE_RENAME_DOUBLE_EXT:
            sig.file_double_ext = True
            bump(5, e.log_id)
        elif e.event_type is EventType.CLOUD_KEY_CREATED:
            ip = _field_text(f, "source_ip")
            if ip and not is_internal_ip(ip):
                sig.cloud_key_created_from_new_ip = True
                bump(7, e.log_id)
            else:
                bump(2, e.log_id)
        elif e.event_type is EventType.EMAIL_LINK_CLICKED:
            if _links_to_suspicious_tld(_field_text(f, "url")):
                sig.email_link_clicked_external = True
                bump(5, e.log_id)
            else:
                bump(2, e.log_id)
        elif e.event_type is EventType.EMAIL_ATTACHMENT_OPENED:
            if bool(f.get("macro_enabled", False)):
                sig.email_attachment_opened_macro = True
                bump(6, e.log_id)
            else:
                bump(2, e.log_id)

    sig.distinct_failed_users = len(failed_users)
    sig.triggering_log_id = diagnostic_log_id or (events[-1].log_id if events else None)
    return sig
| # --------------------------------------------------------------------------- | |
| # Ground-truth labeler | |
| # --------------------------------------------------------------------------- | |
def compute_ground_truth(params: IncidentParams) -> Tuple[TriageAction, IncidentSignals]:
    """Derive the canonical triage action from event content.

    Only ``params.events`` is consulted. The signals extracted from the
    events are folded into two compromise families, and the verdict is
    chosen in priority order (first match wins):

    1. ESCALATE — both a network-compromise and an endpoint-compromise
       signal are present.
    2. BLOCK_IP — a network-compromise signal only (beacon to external,
       large outbound to external, or suspicious link click).
    3. QUARANTINE_HOST — an endpoint-compromise signal only (LOLBin from a
       suspicious parent, parent mismatch, high-severity EDR match,
       privilege grant combined with a login success after a failure
       burst, cloud key created from a non-internal IP, or macro-enabled
       attachment opened).
    4. MONITOR — a weaker signal: any port-scan hit, a login success after
       a failure burst, five or more login failures with at least one
       named user, a double-extension rename, or any beacon at all.
    5. DISMISS — none of the above.

    Returns:
        ``(action, signals)`` — the verdict together with the signals it
        was derived from.
    """
    signals = _extract_signals(params.events)

    network_hit = any(
        (
            signals.beacon_to_external,
            signals.large_outbound_to_external,
            signals.email_link_clicked_external,
        )
    )
    # A privilege grant only counts when it accompanies a brute-force success.
    privileged_after_bruteforce = (
        signals.privilege_grants > 0 and signals.successful_login_after_failures
    )
    endpoint_hit = any(
        (
            signals.lolbin_with_suspicious_parent,
            signals.proc_parent_mismatch,
            signals.edr_high_match,
            privileged_after_bruteforce,
            signals.cloud_key_created_from_new_ip,
            signals.email_attachment_opened_macro,
        )
    )

    if network_hit:
        verdict = TriageAction.ESCALATE if endpoint_hit else TriageAction.BLOCK_IP
        return verdict, signals
    if endpoint_hit:
        return TriageAction.QUARANTINE_HOST, signals

    worth_watching = any(
        (
            signals.port_scan_hits >= 1,
            signals.successful_login_after_failures,
            signals.auth_failures >= 5 and signals.distinct_failed_users >= 1,
            signals.file_double_ext,
            signals.beacon_count > 0,
        )
    )
    fallback = TriageAction.MONITOR if worth_watching else TriageAction.DISMISS
    return fallback, signals
| # --------------------------------------------------------------------------- | |
| # Plausibility checker | |
| # --------------------------------------------------------------------------- | |
def check_plausibility(params: IncidentParams) -> Tuple[bool, str, Optional[str]]:
    """Return ``(ok, reason, triggering_log_id)``.

    The plausibility checker is what stops the attacker from gibberish-hacking
    the reward: an incident must form a coherent story, not just contradict
    itself. Checks run in this order and the first failure is reported:

    * Per-event field sanity:
        - LOLBin events need non-empty ``process`` and ``parent_process``.
        - Outbound events need a numeric, non-negative ``bytes_out`` and a
          non-empty ``dst_ip``.
        - Beacon events need a non-empty ``dst_ip``.
        - Login-failure events need a non-empty ``user``.
    * Label agreement: the ``ACTION_COST`` of ``params.target_label`` and of
      the label computed from the events must differ by less than 2.
    * Category agreement: a ``data_exfiltration`` incident that contains
      outbound traffic must contain at least one transfer that counts as
      large-outbound-to-external.

    Note: the verifier label is *always* computed from events, regardless of
    whether plausibility passes. Plausibility only gates the *attacker's*
    reward; the defender is graded on every incident the env materializes.

    Returns:
        ``(True, "ok", log_id)`` when every check passes, otherwise
        ``(False, reason, log_id)``. ``log_id`` is the triggering log id
        taken from the extracted signals in both cases.
    """
    # compute_ground_truth already extracts the signals and returns them, so
    # reuse that result instead of walking the event list a second time.
    label_from_events, sig = compute_ground_truth(params)
    triggering = sig.triggering_log_id

    for e in params.events:
        f = e.fields or {}
        if e.event_type is EventType.PROC_LOLBIN:
            if not f.get("process"):
                return False, f"{e.log_id}: lolbin_use without `process` field", triggering
            if not f.get("parent_process"):
                return False, f"{e.log_id}: lolbin_use without `parent_process` field", triggering
        elif e.event_type is EventType.NET_OUTBOUND:
            try:
                size = int(f.get("bytes_out", 0))
            except (TypeError, ValueError, OverflowError):
                # OverflowError covers int(float("inf")), which is just as
                # non-numeric for our purposes as a bad string.
                return False, f"{e.log_id}: bytes_out not numeric", triggering
            if size < 0:
                return False, f"{e.log_id}: negative bytes_out", triggering
            # Truthiness test (same as the beacon check below): wrapping the
            # value in str() would turn an explicit None into "None" and let
            # a destination-less event through.
            if not f.get("dst_ip"):
                return False, f"{e.log_id}: outbound without dst_ip", triggering
        elif e.event_type is EventType.NET_BEACON:
            if not f.get("dst_ip"):
                return False, f"{e.log_id}: beacon without dst_ip", triggering
        elif e.event_type is EventType.AUTH_LOGIN_FAILURE:
            if not f.get("user"):
                return False, f"{e.log_id}: login_failure without user", triggering

    # The attacker's declared label may be off by at most one cost step from
    # what the events actually imply.
    target_cost = ACTION_COST[params.target_label]
    actual_cost = ACTION_COST[label_from_events]
    if abs(target_cost - actual_cost) >= 2:
        return (
            False,
            (
                f"target_label={params.target_label.value} but events imply "
                f"{label_from_events.value} — params and content disagree"
            ),
            triggering,
        )

    # An "exfiltration" whose outbound traffic never reaches an external
    # destination at the large-transfer threshold is self-contradictory.
    if (
        params.category.value == "data_exfiltration"
        and not sig.large_outbound_to_external
        and sig.largest_outbound_bytes > 0
    ):
        return (
            False,
            "category=data_exfiltration but no external destination > 50MB",
            triggering,
        )

    return True, "ok", triggering
# Public API of this module; `_extract_signals` is intentionally not exported.
__all__ = ["IncidentSignals", "compute_ground_truth", "check_plausibility"]