Spaces:
Sleeping
Sleeping
| """Scripted attacker policy. | |
| The attacker is a small state machine that walks the per-scenario stage DAG. | |
| It is intentionally *not* a learned agent - we want a deterministic-ish | |
| adversary so reward signals stay attributable, and we want three legible | |
| "personalities" for the storytelling layer: | |
| * STEALTHY - longer dwell, quieter alerts, pauses after defender activity. | |
| * AGGRESSIVE - shorter dwell, louder alerts, never pauses. | |
| * OPPORTUNISTIC - nominal dwell/noise, will reroute around blocked stages. | |
| The attacker's step() runs once per environment tick and returns an | |
| :class:`AttackerEvent` describing what happened: stages it started, stages it | |
| completed (success or failure), stages that were blocked by defender action, | |
| plus any alerts that should surface to the defender (with their per-alert | |
| lag already applied). | |
| """ | |
| from __future__ import annotations | |
| import random | |
| from dataclasses import dataclass, field | |
| from enum import Enum | |
| from typing import Dict, List, Optional, Set, Tuple | |
| # Two-mode import: relative when ``cybersec`` is the parent package, | |
| # absolute when this module is loaded as a top-level module from CWD on | |
| # Hugging Face Spaces. | |
| try: | |
| from .models import AlertEvent, AlertSignal, AttackerPersonality | |
| from .scenarios import AttackStage, Scenario | |
| except ImportError: # pragma: no cover - HF Spaces / docker runtime path | |
| from models import AlertEvent, AlertSignal, AttackerPersonality # type: ignore[no-redef] | |
| from scenarios import AttackStage, Scenario # type: ignore[no-redef] | |
| # --------------------------------------------------------------------------- | |
| # Personality knobs | |
| # --------------------------------------------------------------------------- | |
@dataclass
class PersonalityProfile:
    """Numeric tweaks the attacker applies to every stage.

    Declared as a dataclass so the profile table can build instances with
    keyword arguments (one per field below).
    """

    # Scales a stage's dwell range before the actual dwell is sampled.
    dwell_multiplier: float
    # Scales a stage's detection strength when its alerts are rolled.
    detection_multiplier: float
    success_bias: float  # added to (then clamped) per-stage success_prob
    # Probability of not starting a new stage on a tick where the defender acted.
    pause_after_defender_action_prob: float
    # Whether this personality is meant to reroute around blocked stages.
    reroute_on_block: bool
# Lookup table: one fixed PersonalityProfile per AttackerPersonality member.
_PROFILES: Dict[AttackerPersonality, PersonalityProfile] = {
    # Longer dwell, quieter alerts, pauses half the time after defender activity.
    AttackerPersonality.STEALTHY: PersonalityProfile(
        dwell_multiplier=1.5,
        detection_multiplier=0.55,
        success_bias=-0.05,
        pause_after_defender_action_prob=0.5,
        reroute_on_block=False,
    ),
    # Shorter dwell, louder alerts, never pauses.
    AttackerPersonality.AGGRESSIVE: PersonalityProfile(
        dwell_multiplier=0.6,
        detection_multiplier=1.3,
        success_bias=0.05,
        pause_after_defender_action_prob=0.0,
        reroute_on_block=False,
    ),
    # Nominal dwell/noise; the only profile with reroute_on_block enabled.
    AttackerPersonality.OPPORTUNISTIC: PersonalityProfile(
        dwell_multiplier=1.0,
        detection_multiplier=1.0,
        success_bias=0.0,
        pause_after_defender_action_prob=0.15,
        reroute_on_block=True,
    ),
}
def get_personality_profile(personality: AttackerPersonality) -> PersonalityProfile:
    """Return the profile registered for ``personality`` in ``_PROFILES``.

    Raises:
        KeyError: if ``personality`` has no entry in the table.
    """
    profile = _PROFILES[personality]
    return profile
| # --------------------------------------------------------------------------- | |
| # Per-stage runtime state | |
| # --------------------------------------------------------------------------- | |
class StageStatus(str, Enum):
    """Lifecycle state of a single attack stage.

    Mixing in ``str`` makes every member compare equal to its string value.
    """

    # Not started yet; the default for a fresh stage runtime.
    IDLE = "idle"
    # Started and waiting for its dwell timer to expire.
    IN_PROGRESS = "in_progress"
    # Dwell expired and the success roll passed.
    SUCCEEDED = "succeeded"
    # Dwell expired and the success roll did not pass.
    FAILED = "failed"
    # Stopped by defender containment of the stage's target.
    BLOCKED = "blocked"
@dataclass
class StageRuntime:
    """Mutable runtime state for one stage.

    Declared as a dataclass so it can be built as ``StageRuntime(stage=...)``
    and so every instance gets its own ``pending_alerts`` list.
    """

    # Static scenario definition this runtime tracks.
    stage: AttackStage
    status: StageStatus = StageStatus.IDLE
    # Tick at which the stage was started; None until then.
    started_tick: Optional[int] = None
    # Dwell (in ticks) sampled when the stage was started.
    target_dwell: int = 0
    # started_tick + target_dwell; None until the stage is started.
    completes_at: Optional[int] = None
    # (visible_at_tick, alert) pairs that have not surfaced yet.
    pending_alerts: List[Tuple[int, AlertEvent]] = field(default_factory=list)
    # Tick at which an alert from this stage last surfaced; -1 if none has.
    last_alert_tick: int = -1
| # --------------------------------------------------------------------------- | |
| # Defender view contract (passed in by env.py) | |
| # --------------------------------------------------------------------------- | |
@dataclass
class DefenderView:
    """Slice of defender state the attacker reads when deciding what to do.

    Declared as a dataclass so callers can construct it from the five fields
    below by keyword.
    """

    # Assets taken offline; any stage targeting one counts as blocked.
    isolated_assets: Set[str]
    # Identities revoked; any stage targeting one counts as blocked.
    revoked_identities: Set[str]
    # Assets with egress blocked; only exfil stages targeting one are blocked.
    blocked_egress_assets: Set[str]
    # Patched assets; stages targeting one get a reduced success probability.
    patched_assets: Set[str]
    # True when the defender acted this tick; feeds the attacker's pause roll.
    defender_acted_this_tick: bool
| # --------------------------------------------------------------------------- | |
| # AttackerEvent: what happened this tick | |
| # --------------------------------------------------------------------------- | |
@dataclass
class AttackerEvent:
    """What the attacker did during one tick.

    Declared as a dataclass so ``AttackerEvent()`` yields fresh, independent
    lists for every field instead of shared class-level ``field(...)`` objects.
    """

    # Ids of stages started this tick.
    started: List[str] = field(default_factory=list)
    # Ids of stages whose dwell expired and whose success roll passed.
    succeeded: List[str] = field(default_factory=list)
    # Ids of stages whose dwell expired and whose success roll did not pass.
    failed: List[str] = field(default_factory=list)
    # Ids of in-progress stages stopped by defender containment.
    blocked_active: List[str] = field(default_factory=list)
    # Ids of idle stages flipped to blocked before the attacker reached them.
    blocked_preemptive: List[str] = field(default_factory=list)
    # Alerts whose visibility tick has been reached.
    surfaced_alerts: List[AlertEvent] = field(default_factory=list)
    # True when a stage flagged as exfil succeeded this tick.
    exfil_completed: bool = False

    # NOTE(review): the "(read-only)" wording suggests this may have been a
    # @property originally; kept as a plain method because no caller is visible
    # here - confirm against env.py.
    def blocked(self) -> List[str]:
        """Convenience: union of active + preemptive blocks (read-only).

        Returns a new list (active ids first), so mutating the result does not
        touch either underlying list.
        """
        return self.blocked_active + self.blocked_preemptive
| # --------------------------------------------------------------------------- | |
| # ScriptedAttacker | |
| # --------------------------------------------------------------------------- | |
class ScriptedAttacker:
    """Step-driven attacker walking a scenario's stage DAG.

    Update order each tick (called by ``env.step`` after the defender action
    has already been applied):

    1. Drain any pending alerts that have reached their visibility tick.
    2. Block in-progress stages whose target is contained, then resolve
       in-progress stages whose dwell timer fires this tick.
    3. Possibly pause (rolled only when the defender acted this tick).
    4. Pick at most one new stage to start, respecting prereqs and
       defender containment; lag and severity of its alerts are rolled at
       start time.
    5. Flip idle stages whose target is already contained to BLOCKED.

    The attacker only ever has one stage in_progress at a time. This keeps
    the long-horizon planning challenge legible: the defender is always
    racing one specific dwell timer.
    """

    def __init__(
        self,
        scenario: Scenario,
        personality: AttackerPersonality,
        rng: random.Random,
    ):
        """Bind the attacker to a scenario, a personality and an RNG.

        Args:
            scenario: Provides ``stages``; one runtime is created per stage,
                keyed by ``stage_id`` in scenario order.
            personality: Selects the numeric profile applied to every stage.
            rng: Source of all randomness (dwell, alerts, success, pauses).
        """
        self.scenario = scenario
        self.personality = personality
        self.profile = get_personality_profile(personality)
        self.rng = rng
        self.runtimes: Dict[str, StageRuntime] = {
            stage.stage_id: StageRuntime(stage=stage) for stage in scenario.stages
        }
        self._compromised_assets: Set[str] = set()
        self._compromised_identities: Set[str] = set()

    # ------------------------------------------------------------------ helpers
    def _all_prereqs_met(self, stage: AttackStage) -> bool:
        """Return True when every prerequisite stage has SUCCEEDED."""
        return all(
            self.runtimes[p].status is StageStatus.SUCCEEDED
            for p in stage.prereq_stages
        )

    def _stage_blocked_by_defender(
        self, stage: AttackStage, defender: DefenderView
    ) -> bool:
        """Return True when defender containment covers this stage's target.

        A stage is blocked when its target asset is isolated, its target
        identity is revoked, or it is an exfil stage whose target asset has
        egress blocked.
        """
        if stage.target_asset and stage.target_asset in defender.isolated_assets:
            return True
        if stage.target_identity and stage.target_identity in defender.revoked_identities:
            return True
        if stage.is_exfil and stage.target_asset in defender.blocked_egress_assets:
            return True
        return False

    def _pickable_stages(self, defender: DefenderView) -> List[StageRuntime]:
        """Return idle, unblocked runtimes whose prereqs are met, in dict order."""
        return [
            rt
            for rt in self.runtimes.values()
            if rt.status is StageStatus.IDLE
            and self._all_prereqs_met(rt.stage)
            and not self._stage_blocked_by_defender(rt.stage, defender)
        ]

    def _has_active_stage(self) -> bool:
        """Return True when any stage is currently IN_PROGRESS."""
        return any(rt.status is StageStatus.IN_PROGRESS for rt in self.runtimes.values())

    def _build_alerts_for_stage(self, stage: AttackStage, start_tick: int) -> List[Tuple[int, AlertEvent]]:
        """Roll the alert plan a stage will produce while it dwells.

        We emit between 0 and 2 alerts per stage. Alert visibility is
        ``start_tick + lag``; lag is sampled inside ``alert_lag_range``.

        Returns:
            ``(visible_at_tick, alert)`` pairs, one per rolled alert.
        """
        adjusted_strength = max(
            0.0, min(1.0, stage.detection_strength * self.profile.detection_multiplier)
        )
        # Expected alert count grows with detection strength, capped at 2.
        # Both rolls are always drawn so the RNG stream stays aligned.
        n_alerts = 0
        for _ in range(2):
            if self.rng.random() < adjusted_strength:
                n_alerts += 1
        signal = self._signal_for_tactic(stage.mitre_tactic)
        plans: List[Tuple[int, AlertEvent]] = []
        for _ in range(n_alerts):
            # Draw order matters for reproducibility: lag first, then severity.
            lag = self.rng.randint(*stage.alert_lag_range)
            severity = max(0.05, min(1.0, adjusted_strength + self.rng.uniform(-0.15, 0.2)))
            visible_at = start_tick + lag
            event = AlertEvent(
                tick=visible_at,
                signal=signal,
                asset=stage.target_asset or None,
                identity=stage.target_identity or None,
                severity=round(severity, 3),
                description=f"{stage.mitre_tactic} :: {stage.mitre_technique}",
            )
            plans.append((visible_at, event))
        return plans

    @staticmethod
    def _signal_for_tactic(tactic: str) -> AlertSignal:
        """Map a tactic string onto a coarse alert signal by substring match.

        A static method: it takes no ``self`` yet is invoked through the
        instance, which only works with the ``@staticmethod`` decorator.
        Unrecognised tactics fall back to ``AUTH_ANOMALY``.
        """
        # Keep this mapping coarse-grained so the policy gets a stable signal.
        if "Initial Access" in tactic or "Credential" in tactic:
            return AlertSignal.AUTH_ANOMALY
        if "Lateral Movement" in tactic or "Execution" in tactic:
            return AlertSignal.LATERAL_MOVEMENT
        if "Collection" in tactic or "Discovery" in tactic:
            return AlertSignal.DATA_STAGING
        if "Exfiltration" in tactic:
            return AlertSignal.EGRESS_ANOMALY
        return AlertSignal.AUTH_ANOMALY

    def _start_stage(self, rt: StageRuntime, tick: int) -> None:
        """Mark ``rt`` IN_PROGRESS at ``tick``, sampling its dwell and alerts."""
        lo, hi = rt.stage.dwell_range
        # Scale the dwell range by personality; keep it >= 1 tick and ordered.
        scaled_lo = max(1, int(round(lo * self.profile.dwell_multiplier)))
        scaled_hi = max(scaled_lo, int(round(hi * self.profile.dwell_multiplier)))
        rt.target_dwell = self.rng.randint(scaled_lo, scaled_hi)
        rt.started_tick = tick
        rt.completes_at = tick + rt.target_dwell
        rt.status = StageStatus.IN_PROGRESS
        rt.pending_alerts = self._build_alerts_for_stage(rt.stage, tick)

    def _resolve_stage(self, rt: StageRuntime, defender: DefenderView) -> str:
        """Resolve a stage whose dwell timer just expired.

        Returns one of: 'succeeded', 'failed', 'blocked'.

        On success the stage's target asset / identity is recorded as
        compromised when the stage is flagged to compromise it.
        """
        if self._stage_blocked_by_defender(rt.stage, defender):
            rt.status = StageStatus.BLOCKED
            return "blocked"
        # Personality bias, clamped so a stage is never certain either way.
        success_prob = max(
            0.05,
            min(0.99, rt.stage.success_prob + self.profile.success_bias),
        )
        # Patching the target asset cuts success probability noticeably.
        if rt.stage.target_asset and rt.stage.target_asset in defender.patched_assets:
            success_prob = max(0.05, success_prob - 0.3)
        if self.rng.random() < success_prob:
            rt.status = StageStatus.SUCCEEDED
            if rt.stage.compromises_asset and rt.stage.target_asset:
                self._compromised_assets.add(rt.stage.target_asset)
            if rt.stage.compromises_identity and rt.stage.target_identity:
                self._compromised_identities.add(rt.stage.target_identity)
            return "succeeded"
        rt.status = StageStatus.FAILED
        return "failed"

    # ------------------------------------------------------------------ main step
    def step(self, tick: int, defender: DefenderView) -> AttackerEvent:
        """Advance the attacker by one tick and return what happened."""
        ev = AttackerEvent()

        # 1. Surface alerts that have reached their visibility tick. Even
        # for blocked stages, alerts in flight before the block still fire
        # (defenders shouldn't be retroactively rewarded for late alerts).
        for rt in self.runtimes.values():
            if not rt.pending_alerts:
                continue
            still_pending: List[Tuple[int, AlertEvent]] = []
            for visible_at, alert in rt.pending_alerts:
                if visible_at <= tick:
                    ev.surfaced_alerts.append(alert)
                    rt.last_alert_tick = tick
                else:
                    still_pending.append((visible_at, alert))
            rt.pending_alerts = still_pending

        # 2. Resolve any in-progress stage that has now reached its dwell.
        for rt in self.runtimes.values():
            if rt.status is not StageStatus.IN_PROGRESS:
                continue
            # Mark blocked early so the defender gets credit even if the
            # dwell hasn't fully elapsed.
            if self._stage_blocked_by_defender(rt.stage, defender):
                rt.status = StageStatus.BLOCKED
                ev.blocked_active.append(rt.stage.stage_id)
                continue
            if rt.completes_at is not None and tick >= rt.completes_at:
                outcome = self._resolve_stage(rt, defender)
                if outcome == "succeeded":
                    ev.succeeded.append(rt.stage.stage_id)
                    if rt.stage.is_exfil:
                        ev.exfil_completed = True
                elif outcome == "blocked":
                    ev.blocked_active.append(rt.stage.stage_id)
                else:
                    ev.failed.append(rt.stage.stage_id)

        # 3. Personality-driven gating: maybe pause this tick. The pause roll
        # is only drawn when nothing is active and the defender acted.
        skip_start = False
        if self._has_active_stage():
            # Already busy; nothing else to start.
            skip_start = True
        elif (
            defender.defender_acted_this_tick
            and self.rng.random() < self.profile.pause_after_defender_action_prob
        ):
            skip_start = True

        # 4. Pick the next stage: the earliest-defined candidate (DAG order is
        # preserved by Python dict insertion).
        if not skip_start:
            candidates = self._pickable_stages(defender)
            if candidates:
                # NOTE(review): the previous code branched on
                # ``profile.reroute_on_block`` but both branches chose
                # ``candidates[0]``; contained stages are already filtered out
                # of ``candidates``, so every personality picks the same way.
                choice = candidates[0]
                self._start_stage(choice, tick)
                ev.started.append(choice.stage.stage_id)

        # 5. Flip any IDLE stage whose target the defender has already
        # contained into BLOCKED (preemptive containment). This is what
        # rewards good "yank the crown jewels offline" defender play even
        # if the attacker never reaches that part of the chain. Preemptive
        # blocks are tracked separately because they pay much less reward.
        for rt in self.runtimes.values():
            if rt.status is not StageStatus.IDLE:
                continue
            if self._stage_blocked_by_defender(rt.stage, defender):
                rt.status = StageStatus.BLOCKED
                ev.blocked_preemptive.append(rt.stage.stage_id)
        return ev

    # ------------------------------------------------------------------ public read-only
    def compromised_assets(self) -> Set[str]:
        """Return a copy of the set of assets compromised so far."""
        return set(self._compromised_assets)

    def compromised_identities(self) -> Set[str]:
        """Return a copy of the set of identities compromised so far."""
        return set(self._compromised_identities)

    def succeeded_stage_ids(self) -> List[str]:
        """Return ids of all SUCCEEDED stages, in runtime (dict) order."""
        return [rt.stage.stage_id for rt in self.runtimes.values() if rt.status is StageStatus.SUCCEEDED]

    def in_progress_stage(self) -> Optional[StageRuntime]:
        """Return the first IN_PROGRESS runtime, or None when nothing is active."""
        for rt in self.runtimes.values():
            if rt.status is StageStatus.IN_PROGRESS:
                return rt
        return None

    def is_done(self) -> bool:
        """All stages are terminal (no idle, no in-progress, no resumable).

        An IDLE stage only keeps the attacker alive when all of its prereqs
        have already succeeded; idle stages behind an unmet prereq do not.
        """
        for rt in self.runtimes.values():
            if rt.status is StageStatus.IN_PROGRESS:
                return False
            if rt.status is StageStatus.IDLE and self._all_prereqs_met(rt.stage):
                # Startable stage remains, so the attacker isn't done.
                return False
        return True
# Public API of this module; everything else is an implementation detail.
__all__ = [
    "PersonalityProfile",
    "get_personality_profile",
    "StageStatus",
    "StageRuntime",
    "DefenderView",
    "AttackerEvent",
    "ScriptedAttacker",
]