"""Multi-stage attack orchestrator following Cyber Kill Chain model. Each attacker has a scenario (one of 5 patterns) and progresses through phases 0→3. Adaptation is non-trivial: - Detected attackers may switch to stealth mode (mimic benign profiles) - Undetected attackers escalate normally - Fully blocked attackers are terminated - Attackers that reach exfiltration (phase 3) are marked as succeeded """ from __future__ import annotations from dataclasses import dataclass from typing import Dict, List, Set import numpy as np # Updated import path from server.utils.data_loader import TrafficGenerator SCENARIOS = [ "port_scan_exploit_c2", "credential_stuffing_lateral", "supply_chain_compromise", "low_and_slow_apt", "ddos_amplification", ] # How many sessions each scenario generates per phase SESSION_COUNTS: Dict[str, List[int]] = { "port_scan_exploit_c2": [4, 2, 1, 2], "credential_stuffing_lateral": [3, 3, 2, 2], "supply_chain_compromise": [1, 1, 1, 2], "low_and_slow_apt": [1, 1, 1, 1], "ddos_amplification": [6, 10, 15, 20], } # Probability that an attacker escalates per tick (if not detected) ESCALATION_PROB: Dict[str, float] = { "port_scan_exploit_c2": 0.30, "credential_stuffing_lateral": 0.25, "supply_chain_compromise": 0.15, "low_and_slow_apt": 0.10, "ddos_amplification": 0.40, } @dataclass class AttackerState: attacker_id: str scenario: str phase: int = 0 times_detected: int = 0 stealth_mode: bool = False alive: bool = True succeeded: bool = False ticks_alive: int = 0 sessions_blocked: int = 0 sessions_generated: int = 0 class ThreatEngine: """Manages the lifecycle of active attackers and generates attack sessions.""" def __init__(self, seed: int = 0) -> None: self.rng = np.random.default_rng(seed) self._attacker_counter = 0 self._active_attackers: Dict[str, AttackerState] = {} self._dead_attackers: List[AttackerState] = [] self._threat_intel: Dict = { "known_bad_ports": [21, 22, 23, 25, 445, 3389, 5900], "known_bad_ja3_ranges": [(200, 255), (230, 255)], "active_campaigns": [], "recent_detections": 0, } def reset(self) -> None: self._attacker_counter = 0 self._active_attackers = {} self._dead_attackers = [] self._threat_intel["active_campaigns"] = [] self._threat_intel["recent_detections"] = 0 def maybe_spawn_attacker(self, threat_probability: float) -> None: """Probabilistically spawn a new attacker.""" if self.rng.random() > threat_probability: return self._attacker_counter += 1 scenario = SCENARIOS[int(self.rng.integers(0, len(SCENARIOS)))] attacker_id = f"a-{self._attacker_counter:04d}" state = AttackerState(attacker_id=attacker_id, scenario=scenario) self._active_attackers[attacker_id] = state # Update threat intel campaigns = set(self._threat_intel["active_campaigns"]) campaigns.add(scenario) self._threat_intel["active_campaigns"] = sorted(campaigns) def generate_attack_sessions( self, tick: int, generator: TrafficGenerator, blocked_attackers: Set[str], ) -> List[Dict]: """Generate attack sessions for all active attackers, handling adaptation.""" sessions: List[Dict] = [] for attacker in list(self._active_attackers.values()): if not attacker.alive: continue attacker.ticks_alive += 1 # --- Handle detection / blocking --- if attacker.attacker_id in blocked_attackers: attacker.times_detected += 1 attacker.sessions_blocked += 1 self._threat_intel["recent_detections"] += 1 if attacker.times_detected >= 3: # Fully blocked — attacker gives up attacker.alive = False self._dead_attackers.append(attacker) continue elif attacker.times_detected >= 2: # Switch to stealth mode — generate fewer, more benign-looking sessions attacker.stealth_mode = True else: # First detection — try to advance past detected phase attacker.phase = min(attacker.phase + 1, 3) # --- Natural phase escalation --- elif self.rng.random() < ESCALATION_PROB.get(attacker.scenario, 0.2): attacker.phase = min(attacker.phase + 1, 3) # --- Check for success (exfiltration complete) --- if attacker.phase == 3 and attacker.ticks_alive > 8: if self.rng.random() < 0.15: attacker.succeeded = True attacker.alive = False self._dead_attackers.append(attacker) continue # --- Generate sessions based on current state --- counts = SESSION_COUNTS.get(attacker.scenario, [2, 2, 2, 2]) count = counts[min(attacker.phase, 3)] if attacker.stealth_mode: # In stealth mode: reduce count, use profiles that look more benign count = max(1, count // 2) generated = generator.generate_malicious_sessions( tick=tick, count=count, attack_phase=attacker.phase, scenario=attacker.scenario, attacker_id=attacker.attacker_id, ) attacker.sessions_generated += len(generated) sessions.extend(generated) return sessions def intelligence_feed(self) -> Dict: """Return threat intelligence available to the agent.""" active_scenarios = set() for a in self._active_attackers.values(): if a.alive: active_scenarios.add(a.scenario) self._threat_intel["active_campaigns"] = sorted(active_scenarios) return dict(self._threat_intel) def attacker_outcomes(self) -> Dict[str, str]: """Return status of all known attackers (for info/debugging).""" outcomes: Dict[str, str] = {} for a in self._active_attackers.values(): if a.alive: outcomes[a.attacker_id] = "active" elif a.succeeded: outcomes[a.attacker_id] = "succeeded" else: outcomes[a.attacker_id] = "stopped" for a in self._dead_attackers: if a.attacker_id not in outcomes: outcomes[a.attacker_id] = "succeeded" if a.succeeded else "stopped" return outcomes