| """ |
| generator.py — Deterministic, seeded incident generator. |
| |
| Used by: |
| |
| * `OpenSOCEnv` in `defender_only` mode, when the env needs to materialize a |
| self-contained incident for the defender (SFT warm-start, eval, smoke |
| tests, curriculum starter prompts). |
| * `train/sft_warmstart.py` to produce ~600 (incident, triage) pairs for |
| bootstrapping defender format learning. |
| * `train/make_holdout.py` to build the frozen 200-incident eval set. |
| |
| The generator emits `IncidentParams` instances; the env then materializes |
| them into `Incident` objects with a SIEM-style `Alert` summary. The |
| attacker is *not* required to use this generator — its only job is to give |
| the env a deterministic starting distribution for stages 1-4. |
| |
| Seeding contract |
| ---------------- |
| ``generate_incident(stage_id, seed=N)`` is referentially transparent: |
| calling it with the same arguments anywhere in the codebase returns the |
| exact same incident. This is what makes the held-out eval set |
| reproducible across machines. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import random |
| from datetime import datetime, timedelta, timezone |
| from typing import Callable, Dict, List, Tuple |
|
|
| from schema import ( |
| Alert, |
| EventType, |
| IncidentCategory, |
| IncidentParams, |
| TriageAction, |
| make_event, |
| ) |
|
|
|
|
| |
| |
| |
|
|
| def _ts_iter(start: datetime, n: int, step_s: int = 5) -> List[str]: |
| """Return n monotonic ISO-8601 UTC timestamps starting at `start`.""" |
| return [ |
| (start + timedelta(seconds=step_s * i)) |
| .replace(tzinfo=timezone.utc) |
| .strftime("%Y-%m-%dT%H:%M:%SZ") |
| for i in range(n) |
| ] |
|
|
|
|
| def _start_time(rng: random.Random) -> datetime: |
| """Pick a recent UTC start time anchored on the current calendar day.""" |
| base = datetime(2026, 4, 25, 0, 0, 0) |
| minutes = rng.randint(0, 60 * 23) |
| return base + timedelta(minutes=minutes) |
|
|
|
|
| |
| |
| |
|
|
| def _benign_login(rng: random.Random) -> Tuple[IncidentCategory, list]: |
| ts = _ts_iter(_start_time(rng), 1) |
| user = rng.choice(["alice", "bob", "carol", "dave", "erin"]) |
| return IncidentCategory.BENIGN_NOISE, [ |
| make_event( |
| 1, 0, EventType.AUTH_LOGIN_SUCCESS, ts[0], |
| source="identity", user=user, src_ip=f"10.0.0.{rng.randint(2, 200)}", |
| ) |
| ] |
|
|
|
|
| def _benign_internal_beacon(rng: random.Random) -> Tuple[IncidentCategory, list]: |
| ts = _ts_iter(_start_time(rng), 1) |
| return IncidentCategory.BENIGN_NOISE, [ |
| make_event( |
| 1, 0, EventType.NET_BEACON, ts[0], |
| source="network", dst_ip=f"10.0.{rng.randint(0, 255)}.{rng.randint(2, 250)}", |
| interval_s=rng.choice([30, 60, 90]), |
| ) |
| ] |
|
|
|
|
| def _monitor_brute_force(rng: random.Random) -> Tuple[IncidentCategory, list]: |
| n_failures = rng.randint(5, 9) |
| ts = _ts_iter(_start_time(rng), n_failures) |
| user = rng.choice(["root", "admin", "service_acct", "alice"]) |
| return IncidentCategory.BRUTE_FORCE, [ |
| make_event( |
| 1, i, EventType.AUTH_LOGIN_FAILURE, ts[i], |
| source="identity", user=user, |
| src_ip=f"203.0.113.{rng.randint(2, 250)}", |
| ) |
| for i in range(n_failures) |
| ] |
|
|
|
|
| def _monitor_port_scan(rng: random.Random) -> Tuple[IncidentCategory, list]: |
| ts = _ts_iter(_start_time(rng), 1) |
| return IncidentCategory.BENIGN_NOISE, [ |
| make_event( |
| 1, 0, EventType.NET_PORT_SCAN_HIT, ts[0], |
| source="network", |
| src_ip=f"203.0.113.{rng.randint(2, 250)}", |
| port=rng.choice([22, 23, 445, 3389, 5985]), |
| ), |
| ] |
|
|
|
|
| def _quarantine_lolbin(rng: random.Random) -> Tuple[IncidentCategory, list]: |
| ts = _ts_iter(_start_time(rng), 2) |
| parent = rng.choice(["outlook.exe", "winword.exe", "excel.exe", "chrome.exe"]) |
| return IncidentCategory.MALWARE_EXECUTION, [ |
| make_event( |
| 1, 0, EventType.PROC_LOLBIN, ts[0], |
| source="endpoint", |
| process="powershell.exe", |
| parent_process=parent, |
| cmd="-enc " + "A" * rng.randint(40, 80), |
| ), |
| make_event( |
| 1, 1, EventType.FILE_WRITE, ts[1], |
| source="endpoint", |
| path=f"C:\\Users\\Public\\{rng.choice(['payload', 'svc', 'tmp'])}.exe", |
| ), |
| ] |
|
|
|
|
| def _quarantine_edr_high(rng: random.Random) -> Tuple[IncidentCategory, list]: |
| ts = _ts_iter(_start_time(rng), 1) |
| return IncidentCategory.MALWARE_EXECUTION, [ |
| make_event( |
| 1, 0, EventType.EDR_BEHAVIOR_MATCH, ts[0], |
| source="edr", severity="high", rule_id=f"EDR-{rng.randint(1000, 9999)}", |
| host=f"host-{rng.randint(1, 50):03d}", |
| ) |
| ] |
|
|
|
|
| def _block_ip_external_beacon(rng: random.Random) -> Tuple[IncidentCategory, list]: |
| ts = _ts_iter(_start_time(rng), 1) |
| return IncidentCategory.C2_BEACON, [ |
| make_event( |
| 1, 0, EventType.NET_BEACON, ts[0], |
| source="network", |
| dst_ip=rng.choice(["203.0.113.5", "198.51.100.42", "185.220.101.7"]), |
| interval_s=rng.choice([30, 60, 90]), |
| ) |
| ] |
|
|
|
|
| def _block_ip_phish_url(rng: random.Random) -> Tuple[IncidentCategory, list]: |
| ts = _ts_iter(_start_time(rng), 1) |
| tld = rng.choice([".ru", ".cn", ".top", ".xyz", ".click"]) |
| return IncidentCategory.PHISHING, [ |
| make_event( |
| 1, 0, EventType.EMAIL_LINK_CLICKED, ts[0], |
| source="email", |
| url=f"https://login-update{tld}/secure", |
| user=rng.choice(["alice", "bob", "carol"]), |
| ) |
| ] |
|
|
|
|
| def _escalate_combined(rng: random.Random) -> Tuple[IncidentCategory, list]: |
| ts = _ts_iter(_start_time(rng), 3, step_s=15) |
| return IncidentCategory.MALWARE_EXECUTION, [ |
| make_event( |
| 1, 0, EventType.PROC_LOLBIN, ts[0], |
| source="endpoint", |
| process="powershell.exe", parent_process="outlook.exe", |
| cmd="-enc " + "B" * 60, |
| ), |
| make_event( |
| 1, 1, EventType.NET_BEACON, ts[1], |
| source="network", dst_ip="203.0.113.5", interval_s=30, |
| ), |
| make_event( |
| 1, 2, EventType.AUTH_PRIVILEGE_GRANT, ts[2], |
| source="identity", user=rng.choice(["alice", "bob"]), role="admin", |
| ), |
| ] |
|
|
|
|
| |
| TEMPLATES: Dict[TriageAction, List[Callable[[random.Random], Tuple[IncidentCategory, list]]]] = { |
| TriageAction.DISMISS: [_benign_login, _benign_internal_beacon], |
| TriageAction.MONITOR: [_monitor_brute_force, _monitor_port_scan], |
| TriageAction.QUARANTINE_HOST: [_quarantine_lolbin, _quarantine_edr_high], |
| TriageAction.BLOCK_IP: [_block_ip_external_beacon, _block_ip_phish_url], |
| TriageAction.ESCALATE: [_escalate_combined], |
| } |
|
|
|
|
| |
| |
| |
|
|
| |
| |
| |
| |
| STAGE_CONFIGS: Dict[str, dict] = { |
| "stage1_basic": { |
| "label_distribution": { |
| TriageAction.DISMISS: 0.30, |
| TriageAction.MONITOR: 0.20, |
| TriageAction.QUARANTINE_HOST: 0.20, |
| TriageAction.BLOCK_IP: 0.20, |
| TriageAction.ESCALATE: 0.10, |
| }, |
| "decoys": 0, |
| "jitter": 0.0, |
| }, |
| "stage2_multi": { |
| "label_distribution": { |
| TriageAction.DISMISS: 0.20, |
| TriageAction.MONITOR: 0.20, |
| TriageAction.QUARANTINE_HOST: 0.25, |
| TriageAction.BLOCK_IP: 0.20, |
| TriageAction.ESCALATE: 0.15, |
| }, |
| "decoys": 1, |
| "jitter": 0.2, |
| }, |
| "stage3_mixed": { |
| "label_distribution": { |
| TriageAction.DISMISS: 0.25, |
| TriageAction.MONITOR: 0.25, |
| TriageAction.QUARANTINE_HOST: 0.20, |
| TriageAction.BLOCK_IP: 0.15, |
| TriageAction.ESCALATE: 0.15, |
| }, |
| "decoys": 2, |
| "jitter": 0.4, |
| }, |
| "stage4_adversarial": { |
| "label_distribution": { |
| TriageAction.DISMISS: 0.30, |
| TriageAction.MONITOR: 0.25, |
| TriageAction.QUARANTINE_HOST: 0.15, |
| TriageAction.BLOCK_IP: 0.15, |
| TriageAction.ESCALATE: 0.15, |
| }, |
| "decoys": 3, |
| "jitter": 0.7, |
| }, |
| } |
|
|
|
|
| def _sample_label(rng: random.Random, dist: Dict[TriageAction, float]) -> TriageAction: |
| labels = list(dist.keys()) |
| weights = [dist[lab] for lab in labels] |
| return rng.choices(labels, weights=weights, k=1)[0] |
|
|
|
|
| def _make_decoy_events(rng: random.Random, n_decoys: int, start_idx: int) -> list: |
| """Generate `n_decoys` benign decoy events that don't change the label. |
| |
| Decoys are drawn from a pool that is provably benign by the verifier: |
| a successful login, an internal DNS query, an internal outbound flow. |
| """ |
| ts = _ts_iter(_start_time(rng), n_decoys, step_s=2) |
| decoys = [] |
| for i in range(n_decoys): |
| choice = rng.randint(0, 2) |
| n = start_idx + i |
| if choice == 0: |
| decoys.append(make_event( |
| 1, n, EventType.AUTH_LOGIN_SUCCESS, ts[i], |
| source="identity", |
| user=rng.choice(["alice", "bob", "carol", "dave"]), |
| src_ip=f"10.0.0.{rng.randint(2, 250)}", |
| )) |
| elif choice == 1: |
| decoys.append(make_event( |
| 1, n, EventType.NET_DNS_QUERY, ts[i], |
| source="network", |
| domain=rng.choice(["github.com", "google.com", "internal.corp"]), |
| )) |
| else: |
| decoys.append(make_event( |
| 1, n, EventType.NET_OUTBOUND, ts[i], |
| source="network", |
| dst_ip=f"10.0.{rng.randint(0, 255)}.{rng.randint(2, 250)}", |
| bytes_out=rng.randint(1_000, 100_000), |
| )) |
| return decoys |
|
|
|
|
| def _renumber_and_resort(events: list) -> list: |
| """Rewrite log_ids to L1-0..L1-N-1 and sort by timestamp.""" |
| events = sorted(events, key=lambda e: e.timestamp) |
| fixed = [] |
| for i, e in enumerate(events): |
| fixed.append( |
| type(e)( |
| log_id=f"L1-{i}", |
| timestamp=e.timestamp, |
| source=e.source, |
| event_type=e.event_type, |
| fields=e.fields, |
| ) |
| ) |
| return fixed |
|
|
|
|
| |
| |
| |
|
|
| def generate_incident(stage_id: str, seed: int) -> IncidentParams: |
| """Deterministically generate an `IncidentParams` for the given stage.""" |
| if stage_id not in STAGE_CONFIGS: |
| raise ValueError( |
| f"Unknown stage_id {stage_id!r}; choose from {list(STAGE_CONFIGS)}" |
| ) |
| cfg = STAGE_CONFIGS[stage_id] |
| rng = random.Random(seed) |
|
|
| label = _sample_label(rng, cfg["label_distribution"]) |
| template = rng.choice(TEMPLATES[label]) |
| category, core_events = template(rng) |
|
|
| decoy_events = _make_decoy_events(rng, cfg["decoys"], start_idx=len(core_events)) |
| events = _renumber_and_resort(core_events + decoy_events) |
|
|
| return IncidentParams( |
| target_label=label, |
| category=category, |
| events=events, |
| narrative="", |
| ) |
|
|
|
|
| def make_alert(params: IncidentParams, alert_id: str) -> Alert: |
| """Synthesize a SIEM alert summary from an incident's events.""" |
| sev_for_label = { |
| TriageAction.DISMISS: "low", |
| TriageAction.MONITOR: "medium", |
| TriageAction.QUARANTINE_HOST: "high", |
| TriageAction.BLOCK_IP: "high", |
| TriageAction.ESCALATE: "critical", |
| } |
| severity = sev_for_label.get(params.target_label, "medium") |
| first_event = params.events[0] |
| user = str(first_event.fields.get("user", "user-001")) |
| host = str(first_event.fields.get("host", "host-001")) |
| summary = f"{params.category.value}: {len(params.events)} event(s); first={first_event.event_type.value}" |
| return Alert( |
| alert_id=alert_id, |
| category=params.category, |
| severity=severity, |
| summary=summary, |
| host=host, |
| user=user, |
| ) |
|
|
|
|
| __all__ = ["generate_incident", "make_alert", "STAGE_CONFIGS", "TEMPLATES"] |
|
|