"""
verifier.py — Deterministic ground-truth labeler + plausibility checker.
This is the *single most important* file for RLVR correctness in OpenSOC.
Two functions are exposed:
* `compute_ground_truth(params)` — returns a `TriageAction` derived purely
from the structured event content of the incident. The attacker's
`target_label` and any free-text narrative are NEVER consulted here.
This is the authoritative answer the defender's reward is graded against.
* `check_plausibility(params)` — returns `(ok, reason, triggering_log_id)`.
Validates that the events form a coherent, non-self-contradictory
incident (e.g. internal-only "exfil", LOLBin without a parent, beacons
without a destination). The attacker is only credited for fooling the
defender on incidents that pass this check.
The label rules are intentionally a transparent rule-set rather than a
trained classifier — this is what makes the reward verifiable and
reproducible. Any rule change must come with corresponding tests in
`tests/test_verifier.py`.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import List, Optional, Tuple
from schema import (
ACTION_COST,
Event,
EventType,
IncidentParams,
KNOWN_LOLBINS,
SUSPICIOUS_LOLBIN_PARENTS,
TriageAction,
is_internal_ip,
)
# ---------------------------------------------------------------------------
# Signal extraction
# ---------------------------------------------------------------------------
@dataclass
class IncidentSignals:
    """Feature summary of one incident's event list.

    The labeler decides its verdict from these intermediate values. They are
    handed back next to the verdict so that tests can assert *why* a label
    was produced, not only which label it was.

    Every field starts at a neutral default (``0``, ``False`` or ``None``),
    so ``IncidentSignals()`` describes an incident with no signals at all.
    """

    # --- Authentication -----------------------------------------------------
    auth_failures: int = 0  # number of login-failure events seen
    distinct_failed_users: int = 0  # unique `user` values among those failures
    successful_login_after_failures: bool = False  # success following a failure burst
    privilege_grants: int = 0  # number of privilege-grant events seen

    # --- Process / endpoint -------------------------------------------------
    lolbin_with_suspicious_parent: bool = False  # known LOLBin under a suspicious parent
    lolbin_count: int = 0  # number of LOLBin events, suspicious or not
    proc_parent_mismatch: bool = False  # any parent-mismatch event present

    # --- Network ------------------------------------------------------------
    beacon_to_external: bool = False  # a beacon whose dst_ip is not internal
    beacon_count: int = 0  # number of beacon events, any destination
    large_outbound_to_external: bool = False  # big transfer to a non-internal dst_ip
    largest_outbound_bytes: int = 0  # max bytes_out over all outbound events
    port_scan_hits: int = 0  # number of port-scan-hit events

    # --- EDR / file / cloud / e-mail ------------------------------------------
    edr_high_match: bool = False  # EDR match with severity high or critical
    file_double_ext: bool = False  # any double-extension rename event present
    cloud_key_created_from_new_ip: bool = False  # cloud key created from a non-internal IP
    email_link_clicked_external: bool = False  # clicked link matched the suspicious-TLD list
    email_attachment_opened_macro: bool = False  # opened attachment had macros enabled

    # The single event the verifier considers most diagnostic; defenders
    # who cite this log_id earn a +0.1 rubric bonus.
    triggering_log_id: Optional[str] = None
# Hostname suffixes treated as suspicious for clicked e-mail links.
_SUSPICIOUS_LINK_TLDS = (".ru", ".cn", ".top", ".xyz", ".click")


def _url_has_suspicious_tld(url: str) -> bool:
    """Return True when the *host* of ``url`` ends in a suspicious TLD.

    A plain substring scan over the whole URL misfires on benign hosts and
    paths (``".cn" in "www.cnn.com"``, ``".ru" in "www.rust-lang.org"``), so
    the hostname is parsed out and only its suffix is compared.
    """
    from urllib.parse import urlsplit  # local import keeps this block self-contained

    candidate = url.strip()
    if not candidate:
        return False
    try:
        host = urlsplit(candidate).hostname
        if not host:
            # Scheme-less input such as "evil.ru/login" has no netloc; the
            # "//" prefix makes urlsplit treat the leading token as the host.
            host = urlsplit("//" + candidate).hostname
    except ValueError:
        # Unparseable URL (e.g. unbalanced brackets): keep the legacy
        # substring behaviour rather than silently dropping the signal.
        return any(tld in candidate for tld in _SUSPICIOUS_LINK_TLDS)
    # urlsplit lower-cases the hostname; a trailing root dot is ignored.
    return bool(host and host.rstrip(".").endswith(_SUSPICIOUS_LINK_TLDS))


def _extract_signals(events: List[Event]) -> IncidentSignals:
    """Walk the event list once and compute aggregate signals.

    Each event updates the counters/flags on the returned ``IncidentSignals``
    according to its ``event_type`` and ``fields``. Every handled event also
    proposes itself as the "most diagnostic" event with a fixed score per
    branch; the highest score wins and ties go to the earliest event.

    Args:
        events: Events of one incident, in order. ``fields`` may be ``None``
            and is then treated as an empty mapping.

    Returns:
        The populated ``IncidentSignals``. ``triggering_log_id`` is the
        ``log_id`` of the best-scoring event, falling back to the last
        event's ``log_id``, or ``None`` when ``events`` is empty.
    """
    sig = IncidentSignals()
    failed_users: set[str] = set()
    best_score = -1
    best_log_id: Optional[str] = None

    def bump(score: int, log_id: str) -> None:
        # Strict ">" keeps the earliest event among equally-scored ones.
        nonlocal best_score, best_log_id
        if score > best_score:
            best_score = score
            best_log_id = log_id

    for e in events:
        f = e.fields or {}

        if e.event_type is EventType.AUTH_LOGIN_FAILURE:
            sig.auth_failures += 1
            if "user" in f:
                failed_users.add(str(f["user"]))
            bump(1, e.log_id)

        elif e.event_type is EventType.AUTH_LOGIN_SUCCESS:
            # Only failures seen *before* this success are counted, so the
            # threshold alone expresses "success after a failure burst".
            if sig.auth_failures >= 3:
                sig.successful_login_after_failures = True
                bump(5, e.log_id)

        elif e.event_type is EventType.AUTH_PRIVILEGE_GRANT:
            sig.privilege_grants += 1
            bump(7, e.log_id)

        elif e.event_type is EventType.PROC_LOLBIN:
            sig.lolbin_count += 1
            parent = str(f.get("parent_process", "")).lower()
            binary = str(f.get("process", "")).lower()
            if binary in KNOWN_LOLBINS and parent in SUSPICIOUS_LOLBIN_PARENTS:
                sig.lolbin_with_suspicious_parent = True
                bump(8, e.log_id)
            else:
                bump(2, e.log_id)

        elif e.event_type is EventType.PROC_PARENT_MISMATCH:
            sig.proc_parent_mismatch = True
            bump(6, e.log_id)

        elif e.event_type is EventType.NET_BEACON:
            sig.beacon_count += 1
            dst = str(f.get("dst_ip", ""))
            if dst and not is_internal_ip(dst):
                sig.beacon_to_external = True
                bump(7, e.log_id)
            else:
                bump(2, e.log_id)

        elif e.event_type is EventType.NET_OUTBOUND:
            dst = str(f.get("dst_ip", ""))
            try:
                size = int(f.get("bytes_out", 0))
            except (TypeError, ValueError, OverflowError):
                # OverflowError: int(float("inf")). Treat any unusable value
                # as "no bytes" so signal extraction never crashes.
                size = 0
            sig.largest_outbound_bytes = max(sig.largest_outbound_bytes, size)
            # 50_000_000 bytes is the "large transfer" threshold.
            if dst and not is_internal_ip(dst) and size >= 50_000_000:
                sig.large_outbound_to_external = True
                bump(7, e.log_id)

        elif e.event_type is EventType.NET_PORT_SCAN_HIT:
            sig.port_scan_hits += 1
            bump(3, e.log_id)

        elif e.event_type is EventType.EDR_BEHAVIOR_MATCH:
            sev = str(f.get("severity", "medium")).lower()
            if sev in ("high", "critical"):
                sig.edr_high_match = True
                bump(8, e.log_id)
            else:
                bump(3, e.log_id)

        elif e.event_type is EventType.FILE_RENAME_DOUBLE_EXT:
            sig.file_double_ext = True
            bump(5, e.log_id)

        elif e.event_type is EventType.CLOUD_KEY_CREATED:
            ip = str(f.get("source_ip", ""))
            if ip and not is_internal_ip(ip):
                sig.cloud_key_created_from_new_ip = True
                bump(7, e.log_id)
            else:
                bump(2, e.log_id)

        elif e.event_type is EventType.EMAIL_LINK_CLICKED:
            if _url_has_suspicious_tld(str(f.get("url", ""))):
                sig.email_link_clicked_external = True
                bump(5, e.log_id)
            else:
                bump(2, e.log_id)

        elif e.event_type is EventType.EMAIL_ATTACHMENT_OPENED:
            # NOTE(review): bool() of a non-empty string such as "false" is
            # True; this assumes `macro_enabled` is a real boolean. Confirm
            # against the schema.
            if bool(f.get("macro_enabled", False)):
                sig.email_attachment_opened_macro = True
                bump(6, e.log_id)
            else:
                bump(2, e.log_id)

    sig.distinct_failed_users = len(failed_users)
    sig.triggering_log_id = best_log_id or (events[-1].log_id if events else None)
    return sig
# ---------------------------------------------------------------------------
# Ground-truth labeler
# ---------------------------------------------------------------------------
def compute_ground_truth(params: IncidentParams) -> Tuple[TriageAction, IncidentSignals]:
    """Map an incident's events to the canonical triage action.

    Only ``params.events`` is read here; the verdict is a pure function of
    the signals extracted from those events.

    Decision ladder (first match wins):

    1. ESCALATE when a network-compromise signal and an endpoint-compromise
       signal are both present.
    2. BLOCK_IP when only a network-compromise signal is present: beacon to
       an external address, large outbound transfer to an external address,
       or a clicked link flagged as suspicious.
    3. QUARANTINE_HOST when only an endpoint-compromise signal is present:
       LOLBin under a suspicious parent, parent mismatch, high/critical EDR
       match, a privilege grant combined with a successful login after a
       failure burst, a cloud key created from a non-internal IP, or a
       macro-enabled attachment being opened.
    4. MONITOR for weaker indicators: any port-scan hit, a successful login
       after a failure burst, five or more login failures with at least one
       named user, a double-extension rename, or any beacon at all.
    5. DISMISS otherwise.

    Returns:
        ``(action, signals)`` where ``signals`` is the ``IncidentSignals``
        the decision was based on.
    """
    signals = _extract_signals(params.events)

    network_hit = any(
        (
            signals.beacon_to_external,
            signals.large_outbound_to_external,
            signals.email_link_clicked_external,
        )
    )
    privileged_after_burst = (
        signals.privilege_grants > 0 and signals.successful_login_after_failures
    )
    endpoint_hit = any(
        (
            signals.lolbin_with_suspicious_parent,
            signals.proc_parent_mismatch,
            signals.edr_high_match,
            privileged_after_burst,
            signals.cloud_key_created_from_new_ip,
            signals.email_attachment_opened_macro,
        )
    )

    if network_hit:
        # Both families firing together is the pager-worthy case.
        action = TriageAction.ESCALATE if endpoint_hit else TriageAction.BLOCK_IP
        return action, signals
    if endpoint_hit:
        return TriageAction.QUARANTINE_HOST, signals

    brute_force_attempt = (
        signals.auth_failures >= 5 and signals.distinct_failed_users >= 1
    )
    worth_watching = any(
        (
            signals.port_scan_hits >= 1,
            signals.successful_login_after_failures,
            brute_force_attempt,
            signals.file_double_ext,
            signals.beacon_count > 0,
        )
    )
    verdict = TriageAction.MONITOR if worth_watching else TriageAction.DISMISS
    return verdict, signals
# ---------------------------------------------------------------------------
# Plausibility checker
# ---------------------------------------------------------------------------
def check_plausibility(params: IncidentParams) -> Tuple[bool, str, Optional[str]]:
    """Return ``(ok, reason, triggering_log_id)`` for an incident.

    The checker rejects incidents that are malformed or whose declared
    parameters contradict their events. Checks run in this order and the
    first failure is reported:

    1. Per-event field sanity:
       * LOLBin events need non-empty ``process`` and ``parent_process``.
       * Outbound events need an int-convertible, non-negative
         ``bytes_out`` and a non-empty ``dst_ip``.
       * Beacon events need a non-empty ``dst_ip``.
       * Login-failure events need a non-empty ``user``.
    2. Label agreement: the ``ACTION_COST`` of ``params.target_label`` and
       of the event-derived label must differ by less than 2.
    3. Category agreement: a ``data_exfiltration`` incident that contains
       outbound bytes must include a large transfer to an external address.

    Note: the verifier label is always computed from events regardless of
    the outcome here; this function only reports plausibility.

    Returns:
        ``(True, "ok", log_id)`` on success, otherwise
        ``(False, <human-readable reason>, log_id)``. ``log_id`` is the
        most diagnostic event's id as chosen during signal extraction.
    """
    # One call yields both the label and the signals, so the event list is
    # not walked a second time just to compute the label.
    label_from_events, sig = compute_ground_truth(params)
    triggering = sig.triggering_log_id

    for e in params.events:
        f = e.fields or {}

        if e.event_type is EventType.PROC_LOLBIN:
            if not f.get("process"):
                return False, f"{e.log_id}: lolbin_use without `process` field", triggering
            if not f.get("parent_process"):
                return (
                    False,
                    f"{e.log_id}: lolbin_use without `parent_process` field",
                    triggering,
                )

        elif e.event_type is EventType.NET_OUTBOUND:
            try:
                size = int(f.get("bytes_out", 0))
            except (TypeError, ValueError, OverflowError):
                # OverflowError covers int(float("inf")); such a value is
                # rejected as non-numeric instead of crashing the checker.
                return False, f"{e.log_id}: bytes_out not numeric", triggering
            if size < 0:
                return False, f"{e.log_id}: negative bytes_out", triggering
            dst = str(f.get("dst_ip", ""))
            if not dst:
                return False, f"{e.log_id}: outbound without dst_ip", triggering

        elif e.event_type is EventType.NET_BEACON:
            if not f.get("dst_ip"):
                return False, f"{e.log_id}: beacon without dst_ip", triggering

        elif e.event_type is EventType.AUTH_LOGIN_FAILURE:
            if not f.get("user"):
                return False, f"{e.log_id}: login_failure without user", triggering

    # A claimed label two or more cost steps away from the event-derived
    # label means the parameters and the content disagree.
    target_cost = ACTION_COST[params.target_label]
    actual_cost = ACTION_COST[label_from_events]
    if abs(target_cost - actual_cost) >= 2:
        return (
            False,
            (
                f"target_label={params.target_label.value} but events imply "
                f"{label_from_events.value} — params and content disagree"
            ),
            triggering,
        )

    # An "exfiltration" incident that moved bytes, but never in bulk to an
    # external destination, is self-contradictory.
    if (
        params.category.value == "data_exfiltration"
        and not sig.large_outbound_to_external
        and sig.largest_outbound_bytes > 0
    ):
        return (
            False,
            "category=data_exfiltration but no external destination > 50MB",
            triggering,
        )

    return True, "ok", triggering
# Names exported by `from verifier import *`.
__all__ = ["IncidentSignals", "compute_ground_truth", "check_plausibility"]
|