Spaces:
Sleeping
Sleeping
| """ | |
| NexusGrid — SCADA Spoof Engine (Layer 2). | |
| Deterministic sensor spoofing engine with three attack vectors. | |
| All randomness uses numpy.random.Generator(PCG64(episode_seed)). | |
| Seed-lock contract: identical seeds produce identical attack sequences. | |
| """ | |
| from __future__ import annotations | |
| from typing import Any, Dict, List, Optional | |
| import numpy as np | |
| class SpoofEngine: | |
| """ | |
| Selectively overwrites ground-truth telemetry with fabricated readings. | |
| Three attack vectors: | |
| 1. Man-in-the-Middle injection — substation reports fabricated MW. | |
| 2. Resonance oscillation — generator frequency modulated ±0.1Hz. | |
| 3. Phantom generation — renewable reports full output during zero-generation. | |
| Network packet logs show latency spikes 1-2 ticks before a node is spoofed. | |
| """ | |
| # Packet log constants | |
| NORMAL_LATENCY_MS = 5.0 | |
| ANOMALY_LATENCY_MS = 120.0 | |
| ANOMALY_THRESHOLD_MS = 50.0 | |
| def __init__(self, seed: int = 42): | |
| self._rng = np.random.Generator(np.random.PCG64(seed)) | |
| self._seed = seed | |
| self._tick = 0 | |
| # Active spoof configurations | |
| self._spoofs: List[Dict[str, Any]] = [] | |
| # Quarantined nodes (no longer spoofable) | |
| self._quarantined: set = set() | |
| # Packet log history (for anomaly detection) | |
| self._packet_history: List[Dict[str, Any]] = [] | |
| # Consecutive high-latency counts per source node | |
| self._consecutive_high_latency: Dict[str, int] = {} | |
| def reset(self, seed: int = 42) -> None: | |
| """Reset spoof engine to initial state.""" | |
| self._rng = np.random.Generator(np.random.PCG64(seed)) | |
| self._seed = seed | |
| self._tick = 0 | |
| self._spoofs = [] | |
| self._quarantined = set() | |
| self._packet_history = [] | |
| self._consecutive_high_latency = {} | |
| def configure_attack(self, attack_config: Dict[str, Any]) -> None: | |
| """ | |
| Configure an attack based on scenario specification. | |
| Args: | |
| attack_config: Dict with 'type', 'target_node', and attack-specific fields. | |
| """ | |
| if not attack_config or not attack_config.get("active"): | |
| return | |
| self._spoofs.append(dict(attack_config)) | |
| def advance_tick(self) -> None: | |
| """Advance the spoof engine by one tick.""" | |
| self._tick += 1 | |
| def apply_spoofs( | |
| self, | |
| ground_truth: Dict[str, Dict[str, Any]], | |
| tick: int, | |
| ) -> Dict[str, Dict[str, Any]]: | |
| """ | |
| Apply active spoofs to ground-truth telemetry. | |
| Returns a copy of telemetry with spoofed values injected. | |
| Quarantined nodes are NOT spoofed (replaced with estimator values). | |
| """ | |
| self._tick = tick | |
| spoofed = {} | |
| for node_id, truth in ground_truth.items(): | |
| spoofed[node_id] = dict(truth) | |
| for spoof in self._spoofs: | |
| if not spoof.get("active"): | |
| continue | |
| target = spoof.get("target_node") | |
| if not target or target in self._quarantined: | |
| continue | |
| if target not in spoofed: | |
| continue | |
| spoof_type = spoof.get("type") | |
| if spoof_type == "phantom_injection": | |
| # Add phantom MW to generation reading | |
| phantom_mw = spoof.get("phantom_mw", 100.0) | |
| spoofed[target]["generation_mw"] = ( | |
| ground_truth[target]["generation_mw"] + phantom_mw | |
| ) | |
| elif spoof_type == "resonance_oscillation": | |
| # Modulate frequency by ±0.1Hz oscillation | |
| oscillation = 0.1 * np.sin(2 * np.pi * spoof.get("oscillation_hz", 0.5) * tick) | |
| spoofed[target]["frequency_hz"] = ( | |
| ground_truth[target]["frequency_hz"] + float(oscillation) | |
| ) | |
| elif spoof_type == "mitm_injection": | |
| # Replace generation with fabricated value | |
| fake_mw = spoof.get("fake_mw", 100.0) | |
| spoofed[target]["generation_mw"] = fake_mw | |
| return spoofed | |
| def generate_packet_logs( | |
| self, | |
| all_node_ids: List[str], | |
| tick: int, | |
| ) -> List[Dict[str, Any]]: | |
| """ | |
| Generate simulated SCADA packet logs. | |
| Nodes about to be spoofed show latency spikes 1-2 ticks before. | |
| """ | |
| self._tick = tick | |
| logs = [] | |
| # Determine which nodes will be spoofed (for pre-attack latency spike) | |
| spoofed_targets = set() | |
| for spoof in self._spoofs: | |
| if spoof.get("active") and spoof.get("target_node"): | |
| spoofed_targets.add(spoof["target_node"]) | |
| for i, node_id in enumerate(all_node_ids): | |
| # Determine latency | |
| if node_id in spoofed_targets and node_id not in self._quarantined: | |
| # Under attack: elevated latency | |
| latency = float(np.clip( | |
| self._rng.normal(self.ANOMALY_LATENCY_MS, 20.0), | |
| self.ANOMALY_THRESHOLD_MS + 1, | |
| 500.0, | |
| )) | |
| else: | |
| # Normal operation | |
| latency = float(np.clip( | |
| self._rng.normal(self.NORMAL_LATENCY_MS, 2.0), | |
| 1.0, | |
| 30.0, | |
| )) | |
| # Track consecutive high-latency packets | |
| if latency > self.ANOMALY_THRESHOLD_MS: | |
| self._consecutive_high_latency[node_id] = ( | |
| self._consecutive_high_latency.get(node_id, 0) + 1 | |
| ) | |
| else: | |
| self._consecutive_high_latency[node_id] = 0 | |
| anomaly_flag = self._consecutive_high_latency.get(node_id, 0) >= 2 | |
| # Pick a destination (monitoring center or adjacent node) | |
| dest_idx = (i + 1) % len(all_node_ids) | |
| dest_node = all_node_ids[dest_idx] | |
| log_entry = { | |
| "timestamp": float(tick * 300 + i), # 300s per tick | |
| "source_node": node_id, | |
| "dest_node": dest_node, | |
| "latency_ms": round(latency, 1), | |
| "anomaly_flag": anomaly_flag, | |
| } | |
| logs.append(log_entry) | |
| self._packet_history.extend(logs) | |
| # Keep last 200 entries | |
| if len(self._packet_history) > 200: | |
| self._packet_history = self._packet_history[-200:] | |
| return logs | |
| def quarantine_node(self, node_id: str) -> None: | |
| """Mark a node as quarantined — its spoofed readings will be replaced.""" | |
| self._quarantined.add(node_id) | |
| def get_active_spoofs(self) -> List[str]: | |
| """Get list of node IDs currently being actively spoofed.""" | |
| return [ | |
| s["target_node"] | |
| for s in self._spoofs | |
| if s.get("active") and s.get("target_node") not in self._quarantined | |
| ] | |
| def get_quarantined(self) -> List[str]: | |
| """Get list of quarantined node IDs.""" | |
| return list(self._quarantined) | |
| def get_recent_packet_logs(self, count: int = 20) -> List[Dict[str, Any]]: | |
| """Get the most recent packet log entries.""" | |
| return self._packet_history[-count:] | |
| def is_resonance_active(self) -> bool: | |
| """Check if a resonance attack is currently active.""" | |
| for spoof in self._spoofs: | |
| if spoof.get("type") == "resonance_oscillation" and spoof.get("active"): | |
| if spoof.get("target_node") not in self._quarantined: | |
| return True | |
| return False | |
| def get_resonance_effect(self, tick: int) -> float: | |
| """ | |
| Get the current frequency deviation caused by resonance attack. | |
| Returns 0.0 if no resonance attack is active or it has been mitigated. | |
| """ | |
| for spoof in self._spoofs: | |
| if spoof.get("type") != "resonance_oscillation" or not spoof.get("active"): | |
| continue | |
| if spoof.get("target_node") in self._quarantined: | |
| continue | |
| # Resonance grows over time — modeled as increasing amplitude | |
| base_amplitude = 0.1 | |
| growth_factor = min(tick / 5.0, 2.0) # Doubles over 5 ticks | |
| return base_amplitude * growth_factor * float( | |
| np.sin(2 * np.pi * spoof.get("oscillation_hz", 0.5) * tick) | |
| ) | |
| return 0.0 | |