File size: 12,188 Bytes
bb6a031
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
"""
verifier.py — Deterministic ground-truth labeler + plausibility checker.

This is the *single most important* file for RLVR correctness in OpenSOC.

Two functions are exposed:

  * `compute_ground_truth(params)` — returns `(TriageAction, IncidentSignals)`.
    The action is derived purely from the structured event content of the
    incident; the attacker's `target_label` and any free-text narrative are
    NEVER consulted here.  This is the authoritative answer the defender's
    reward is graded against; the signals record why it was reached.

  * `check_plausibility(params)` — returns `(ok, reason, triggering_log_id)`.
    Validates that the events form a coherent, non-self-contradictory
    incident (e.g. internal-only "exfil", LOLBin without a parent, beacons
    without a destination).  The attacker is only credited for fooling the
    defender on incidents that pass this check.

The label rules are intentionally a transparent rule-set rather than a
trained classifier — this is what makes the reward verifiable and
reproducible.  Any rule change must come with corresponding tests in
`tests/test_verifier.py`.
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import List, Optional, Tuple

from schema import (
    ACTION_COST,
    Event,
    EventType,
    IncidentParams,
    KNOWN_LOLBINS,
    SUSPICIOUS_LOLBIN_PARENTS,
    TriageAction,
    is_internal_ip,
)


# ---------------------------------------------------------------------------
# Signal extraction
# ---------------------------------------------------------------------------

@dataclass
class IncidentSignals:
    """Aggregate diagnostic signals computed from the event list.

    These are intermediate features the labeler uses; exposing them lets
    tests assert *why* the verifier reached a verdict.  Every field starts
    at a neutral default (0 / False / None) and is filled in by
    `_extract_signals` while it walks the events.
    """

    # --- Authentication ---------------------------------------------------
    # Number of login-failure events seen.
    auth_failures: int = 0
    # Number of distinct `user` values across the login-failure events.
    distinct_failed_users: int = 0
    # True when a login success is seen after at least 3 failures.
    successful_login_after_failures: bool = False
    # Number of privilege-grant events seen.
    privilege_grants: int = 0

    # --- Process / endpoint -----------------------------------------------
    # True when a known LOLBin was launched by a suspicious parent process.
    lolbin_with_suspicious_parent: bool = False
    # Number of LOLBin events seen (suspicious parent or not).
    lolbin_count: int = 0
    # True when any parent-mismatch event is present.
    proc_parent_mismatch: bool = False

    # --- Network ----------------------------------------------------------
    # True when at least one beacon targets a non-internal destination IP.
    beacon_to_external: bool = False
    # Number of beacon events seen (any destination).
    beacon_count: int = 0
    # True when an outbound transfer of >= 50,000,000 bytes goes to a
    # non-internal destination IP.
    large_outbound_to_external: bool = False
    # Largest `bytes_out` seen on any outbound event (0 if none parsed).
    largest_outbound_bytes: int = 0
    # Number of port-scan-hit events seen.
    port_scan_hits: int = 0

    # --- EDR / file / cloud / e-mail --------------------------------------
    # True when an EDR behavior match has severity "high" or "critical".
    edr_high_match: bool = False
    # True when any double-extension file-rename event is present.
    file_double_ext: bool = False
    # True when a cloud key was created from a non-internal source IP.
    cloud_key_created_from_new_ip: bool = False
    # True when a clicked link's URL matches one of the watched TLDs
    # (.ru, .cn, .top, .xyz, .click).
    email_link_clicked_external: bool = False
    # True when an opened attachment had `macro_enabled` set.
    email_attachment_opened_macro: bool = False

    # The single event the verifier considers most diagnostic; defenders
    # who cite this log_id earn a +0.1 rubric bonus.
    triggering_log_id: Optional[str] = None


# Hostname suffixes that mark a clicked e-mail link as suspicious.
_SUSPICIOUS_TLDS = (".ru", ".cn", ".top", ".xyz", ".click")

# Smallest `bytes_out` that counts as a "large" outbound transfer.
_LARGE_OUTBOUND_BYTES = 50_000_000


def _text_field(fields, key: str) -> str:
    """Return ``fields[key]`` as a string, or ``""`` when missing or falsy.

    A present-but-null value must count as "missing": a plain ``str(None)``
    would yield the truthy string ``"None"`` and be treated as a real value.
    """
    value = fields.get(key)
    return str(value) if value else ""


def _url_has_suspicious_tld(url: str) -> bool:
    """Return True when the *hostname* of ``url`` ends in a watched TLD.

    Only the host is inspected, so benign URLs that merely contain the
    characters (``www.cnn.com``, ``example.com/a.xyz``) do not match.
    Unparseable URLs yield False.
    """
    from urllib.parse import urlsplit

    text = url.strip()
    # Scheme-less input ("evil.ru/path", "evil.ru:8080") has no netloc for
    # urlsplit, so retry with a "//" prefix to force host parsing.
    for candidate in (text, "//" + text):
        try:
            host = urlsplit(candidate).hostname
        except ValueError:
            continue
        if host:
            # `.hostname` is already lower-cased; drop a trailing root dot.
            return host.rstrip(".").endswith(_SUSPICIOUS_TLDS)
    return False


def _extract_signals(events: List[Event]) -> IncidentSignals:
    """Walk the event list once and compute aggregate signals.

    Each event updates the matching counters/flags on the returned
    `IncidentSignals` and proposes itself as the "triggering" event with a
    per-rule diagnostic score.  The highest score wins; on ties the earliest
    event is kept.  If no event scored, the last event's ``log_id`` is used
    (``None`` for an empty list).

    Args:
        events: Incident events in order.  ``fields`` may be ``None``.

    Returns:
        The populated `IncidentSignals`.
    """
    sig = IncidentSignals()
    failed_users: set[str] = set()
    diagnostic_score: int = -1
    diagnostic_log_id: Optional[str] = None

    def bump(score: int, log_id: str) -> None:
        # Strict ">" keeps the first event that reached the top score.
        nonlocal diagnostic_score, diagnostic_log_id
        if score > diagnostic_score:
            diagnostic_score = score
            diagnostic_log_id = log_id

    for e in events:
        f = e.fields or {}
        if e.event_type is EventType.AUTH_LOGIN_FAILURE:
            sig.auth_failures += 1
            if "user" in f:
                failed_users.add(str(f["user"]))
            bump(1, e.log_id)

        elif e.event_type is EventType.AUTH_LOGIN_SUCCESS:
            # Only a success that follows a burst of >= 3 failures counts.
            if sig.auth_failures >= 3:
                sig.successful_login_after_failures = True
                bump(5, e.log_id)

        elif e.event_type is EventType.AUTH_PRIVILEGE_GRANT:
            sig.privilege_grants += 1
            bump(7, e.log_id)

        elif e.event_type is EventType.PROC_LOLBIN:
            sig.lolbin_count += 1
            parent = str(f.get("parent_process", "")).lower()
            binary = str(f.get("process", "")).lower()
            if binary in KNOWN_LOLBINS and parent in SUSPICIOUS_LOLBIN_PARENTS:
                sig.lolbin_with_suspicious_parent = True
                bump(8, e.log_id)
            else:
                bump(2, e.log_id)

        elif e.event_type is EventType.PROC_PARENT_MISMATCH:
            sig.proc_parent_mismatch = True
            bump(6, e.log_id)

        elif e.event_type is EventType.NET_BEACON:
            sig.beacon_count += 1
            dst = _text_field(f, "dst_ip")
            if dst and not is_internal_ip(dst):
                sig.beacon_to_external = True
                bump(7, e.log_id)
            else:
                bump(2, e.log_id)

        elif e.event_type is EventType.NET_OUTBOUND:
            dst = _text_field(f, "dst_ip")
            try:
                size = int(f.get("bytes_out", 0))
            except (TypeError, ValueError, OverflowError):
                # OverflowError: int(float("inf")); treat like any other
                # unparseable size instead of crashing the labeler.
                size = 0
            sig.largest_outbound_bytes = max(sig.largest_outbound_bytes, size)
            if dst and not is_internal_ip(dst) and size >= _LARGE_OUTBOUND_BYTES:
                sig.large_outbound_to_external = True
                bump(7, e.log_id)

        elif e.event_type is EventType.NET_PORT_SCAN_HIT:
            sig.port_scan_hits += 1
            bump(3, e.log_id)

        elif e.event_type is EventType.EDR_BEHAVIOR_MATCH:
            sev = str(f.get("severity", "medium")).lower()
            if sev in ("high", "critical"):
                sig.edr_high_match = True
                bump(8, e.log_id)
            else:
                bump(3, e.log_id)

        elif e.event_type is EventType.FILE_RENAME_DOUBLE_EXT:
            sig.file_double_ext = True
            bump(5, e.log_id)

        elif e.event_type is EventType.CLOUD_KEY_CREATED:
            ip = _text_field(f, "source_ip")
            if ip and not is_internal_ip(ip):
                sig.cloud_key_created_from_new_ip = True
                bump(7, e.log_id)
            else:
                bump(2, e.log_id)

        elif e.event_type is EventType.EMAIL_LINK_CLICKED:
            if _url_has_suspicious_tld(_text_field(f, "url")):
                sig.email_link_clicked_external = True
                bump(5, e.log_id)
            else:
                bump(2, e.log_id)

        elif e.event_type is EventType.EMAIL_ATTACHMENT_OPENED:
            if bool(f.get("macro_enabled", False)):
                sig.email_attachment_opened_macro = True
                bump(6, e.log_id)
            else:
                bump(2, e.log_id)

    sig.distinct_failed_users = len(failed_users)
    sig.triggering_log_id = diagnostic_log_id or (events[-1].log_id if events else None)
    return sig


# ---------------------------------------------------------------------------
# Ground-truth labeler
# ---------------------------------------------------------------------------

def compute_ground_truth(params: IncidentParams) -> Tuple[TriageAction, IncidentSignals]:
    """Derive the canonical triage action from event content.

    Only ``params.events`` is read.  The verdict is the first rule that
    matches, in this order:

    1. ESCALATE — a network-compromise signal AND an endpoint-compromise
       signal are both present.
    2. BLOCK_IP — network-compromise only: beacon to an external IP, large
       outbound transfer to an external IP, or a suspicious-TLD link click.
    3. QUARANTINE_HOST — endpoint-compromise only: LOLBin from a suspicious
       parent, parent mismatch, high/critical EDR match, privilege grant
       together with a login success after a failure burst, cloud key
       created from an external IP, or a macro-enabled attachment opened.
    4. MONITOR — weaker signals: any port-scan hit, login success after a
       failure burst, five or more login failures with at least one named
       user, a double-extension rename, or any beacon at all.
    5. DISMISS — none of the above.

    Returns:
        ``(action, signals)`` — the verdict plus the `IncidentSignals` it
        was derived from.
    """
    signals = _extract_signals(params.events)

    has_network_signal = any((
        signals.beacon_to_external,
        signals.large_outbound_to_external,
        signals.email_link_clicked_external,
    ))
    has_endpoint_signal = any((
        signals.lolbin_with_suspicious_parent,
        signals.proc_parent_mismatch,
        signals.edr_high_match,
        signals.privilege_grants > 0 and signals.successful_login_after_failures,
        signals.cloud_key_created_from_new_ip,
        signals.email_attachment_opened_macro,
    ))
    worth_monitoring = any((
        signals.port_scan_hits >= 1,
        signals.successful_login_after_failures,
        signals.auth_failures >= 5 and signals.distinct_failed_users >= 1,
        signals.file_double_ext,
        signals.beacon_count > 0,
    ))

    if has_network_signal:
        # Network evidence alone blocks the IP; paired with endpoint
        # evidence the incident is escalated instead.
        verdict = TriageAction.ESCALATE if has_endpoint_signal else TriageAction.BLOCK_IP
    elif has_endpoint_signal:
        verdict = TriageAction.QUARANTINE_HOST
    elif worth_monitoring:
        verdict = TriageAction.MONITOR
    else:
        verdict = TriageAction.DISMISS

    return verdict, signals


# ---------------------------------------------------------------------------
# Plausibility checker
# ---------------------------------------------------------------------------

def check_plausibility(params: IncidentParams) -> Tuple[bool, str, Optional[str]]:
    """Return ``(ok, reason, triggering_log_id)``.

    The plausibility checker is what stops the attacker from gibberish-hacking
    the reward: an incident must form a coherent story, not just contradict
    itself.  Checks run in this order and the first failure is returned:

      1. Per-event structure:
           * LOLBin event without a `process` or `parent_process` field.
           * Outbound connection whose `bytes_out` is non-numeric (including
             infinity) or negative, or that has no `dst_ip`.
           * Beacon without a `dst_ip`.
           * Login failure without a `user`.
      2. Label agreement: the `ACTION_COST` of `params.target_label` and of
         the event-derived label differ by 2 or more.
      3. Category agreement: `category == "data_exfiltration"` with some
         outbound bytes recorded but no large transfer to an external IP.

    Note: the verifier label is *always* computed from events, regardless of
    whether plausibility passes.  Plausibility only gates the *attacker's*
    reward; the defender is graded on every incident the env materializes.

    Returns:
        ``(True, "ok", log_id)`` when every check passes, otherwise
        ``(False, reason, log_id)``.  ``log_id`` is the most diagnostic
        event's id as chosen during signal extraction (may be ``None``).
    """
    # compute_ground_truth already returns the extracted signals, so one
    # pass over the events serves both the label and the diagnostics.
    label_from_events, sig = compute_ground_truth(params)
    triggering = sig.triggering_log_id

    for e in params.events:
        f = e.fields or {}

        if e.event_type is EventType.PROC_LOLBIN:
            if not f.get("process"):
                return False, f"{e.log_id}: lolbin_use without `process` field", triggering
            if not f.get("parent_process"):
                return False, f"{e.log_id}: lolbin_use without `parent_process` field", triggering

        elif e.event_type is EventType.NET_OUTBOUND:
            try:
                size = int(f.get("bytes_out", 0))
            except (TypeError, ValueError, OverflowError):
                # OverflowError covers float infinity, which int() rejects.
                return False, f"{e.log_id}: bytes_out not numeric", triggering
            if size < 0:
                return False, f"{e.log_id}: negative bytes_out", triggering
            # Truthiness check (as for beacons): str(None) would be the
            # non-empty string "None" and wrongly count as a destination.
            if not f.get("dst_ip"):
                return False, f"{e.log_id}: outbound without dst_ip", triggering

        elif e.event_type is EventType.NET_BEACON:
            if not f.get("dst_ip"):
                return False, f"{e.log_id}: beacon without dst_ip", triggering

        elif e.event_type is EventType.AUTH_LOGIN_FAILURE:
            if not f.get("user"):
                return False, f"{e.log_id}: login_failure without user", triggering

    # The attacker's claimed label may be at most one cost step away from
    # what the events actually imply.
    target_cost = ACTION_COST[params.target_label]
    actual_cost = ACTION_COST[label_from_events]
    if abs(target_cost - actual_cost) >= 2:
        return (
            False,
            (
                f"target_label={params.target_label.value} but events imply "
                f"{label_from_events.value} — params and content disagree"
            ),
            triggering,
        )

    # An "exfiltration" incident that moved bytes, but never a large amount
    # to an external destination, contradicts its own category.
    if (
        params.category.value == "data_exfiltration"
        and not sig.large_outbound_to_external
        and sig.largest_outbound_bytes > 0
    ):
        return (
            False,
            "category=data_exfiltration but no external destination > 50MB",
            triggering,
        )

    return True, "ok", triggering


# Public API of this module; `_extract_signals` is deliberately left out
# as a private helper.
__all__ = [
    "IncidentSignals",
    "compute_ground_truth",
    "check_plausibility",
]