Nexus-Grid / server /spoof_engine.py
Abineshsdata's picture
OpenEnv Hackathon Final Submission
3e4c342
Raw
History Blame Contribute Delete
8.38 kB
"""
NexusGrid — SCADA Spoof Engine (Layer 2).
Deterministic sensor spoofing engine with three attack vectors.
All randomness uses numpy.random.Generator(PCG64(episode_seed)).
Seed-lock contract: identical seeds produce identical attack sequences.
"""
from __future__ import annotations
from typing import Any, Dict, List, Optional
import numpy as np
class SpoofEngine:
"""
Selectively overwrites ground-truth telemetry with fabricated readings.
Three attack vectors:
1. Man-in-the-Middle injection — substation reports fabricated MW.
2. Resonance oscillation — generator frequency modulated ±0.1Hz.
3. Phantom generation — renewable reports full output during zero-generation.
Network packet logs show latency spikes 1-2 ticks before a node is spoofed.
"""
# Packet log constants
NORMAL_LATENCY_MS = 5.0
ANOMALY_LATENCY_MS = 120.0
ANOMALY_THRESHOLD_MS = 50.0
def __init__(self, seed: int = 42):
self._rng = np.random.Generator(np.random.PCG64(seed))
self._seed = seed
self._tick = 0
# Active spoof configurations
self._spoofs: List[Dict[str, Any]] = []
# Quarantined nodes (no longer spoofable)
self._quarantined: set = set()
# Packet log history (for anomaly detection)
self._packet_history: List[Dict[str, Any]] = []
# Consecutive high-latency counts per source node
self._consecutive_high_latency: Dict[str, int] = {}
def reset(self, seed: int = 42) -> None:
"""Reset spoof engine to initial state."""
self._rng = np.random.Generator(np.random.PCG64(seed))
self._seed = seed
self._tick = 0
self._spoofs = []
self._quarantined = set()
self._packet_history = []
self._consecutive_high_latency = {}
def configure_attack(self, attack_config: Dict[str, Any]) -> None:
"""
Configure an attack based on scenario specification.
Args:
attack_config: Dict with 'type', 'target_node', and attack-specific fields.
"""
if not attack_config or not attack_config.get("active"):
return
self._spoofs.append(dict(attack_config))
def advance_tick(self) -> None:
"""Advance the spoof engine by one tick."""
self._tick += 1
def apply_spoofs(
self,
ground_truth: Dict[str, Dict[str, Any]],
tick: int,
) -> Dict[str, Dict[str, Any]]:
"""
Apply active spoofs to ground-truth telemetry.
Returns a copy of telemetry with spoofed values injected.
Quarantined nodes are NOT spoofed (replaced with estimator values).
"""
self._tick = tick
spoofed = {}
for node_id, truth in ground_truth.items():
spoofed[node_id] = dict(truth)
for spoof in self._spoofs:
if not spoof.get("active"):
continue
target = spoof.get("target_node")
if not target or target in self._quarantined:
continue
if target not in spoofed:
continue
spoof_type = spoof.get("type")
if spoof_type == "phantom_injection":
# Add phantom MW to generation reading
phantom_mw = spoof.get("phantom_mw", 100.0)
spoofed[target]["generation_mw"] = (
ground_truth[target]["generation_mw"] + phantom_mw
)
elif spoof_type == "resonance_oscillation":
# Modulate frequency by ±0.1Hz oscillation
oscillation = 0.1 * np.sin(2 * np.pi * spoof.get("oscillation_hz", 0.5) * tick)
spoofed[target]["frequency_hz"] = (
ground_truth[target]["frequency_hz"] + float(oscillation)
)
elif spoof_type == "mitm_injection":
# Replace generation with fabricated value
fake_mw = spoof.get("fake_mw", 100.0)
spoofed[target]["generation_mw"] = fake_mw
return spoofed
def generate_packet_logs(
self,
all_node_ids: List[str],
tick: int,
) -> List[Dict[str, Any]]:
"""
Generate simulated SCADA packet logs.
Nodes about to be spoofed show latency spikes 1-2 ticks before.
"""
self._tick = tick
logs = []
# Determine which nodes will be spoofed (for pre-attack latency spike)
spoofed_targets = set()
for spoof in self._spoofs:
if spoof.get("active") and spoof.get("target_node"):
spoofed_targets.add(spoof["target_node"])
for i, node_id in enumerate(all_node_ids):
# Determine latency
if node_id in spoofed_targets and node_id not in self._quarantined:
# Under attack: elevated latency
latency = float(np.clip(
self._rng.normal(self.ANOMALY_LATENCY_MS, 20.0),
self.ANOMALY_THRESHOLD_MS + 1,
500.0,
))
else:
# Normal operation
latency = float(np.clip(
self._rng.normal(self.NORMAL_LATENCY_MS, 2.0),
1.0,
30.0,
))
# Track consecutive high-latency packets
if latency > self.ANOMALY_THRESHOLD_MS:
self._consecutive_high_latency[node_id] = (
self._consecutive_high_latency.get(node_id, 0) + 1
)
else:
self._consecutive_high_latency[node_id] = 0
anomaly_flag = self._consecutive_high_latency.get(node_id, 0) >= 2
# Pick a destination (monitoring center or adjacent node)
dest_idx = (i + 1) % len(all_node_ids)
dest_node = all_node_ids[dest_idx]
log_entry = {
"timestamp": float(tick * 300 + i), # 300s per tick
"source_node": node_id,
"dest_node": dest_node,
"latency_ms": round(latency, 1),
"anomaly_flag": anomaly_flag,
}
logs.append(log_entry)
self._packet_history.extend(logs)
# Keep last 200 entries
if len(self._packet_history) > 200:
self._packet_history = self._packet_history[-200:]
return logs
def quarantine_node(self, node_id: str) -> None:
"""Mark a node as quarantined — its spoofed readings will be replaced."""
self._quarantined.add(node_id)
def get_active_spoofs(self) -> List[str]:
"""Get list of node IDs currently being actively spoofed."""
return [
s["target_node"]
for s in self._spoofs
if s.get("active") and s.get("target_node") not in self._quarantined
]
def get_quarantined(self) -> List[str]:
"""Get list of quarantined node IDs."""
return list(self._quarantined)
def get_recent_packet_logs(self, count: int = 20) -> List[Dict[str, Any]]:
"""Get the most recent packet log entries."""
return self._packet_history[-count:]
def is_resonance_active(self) -> bool:
"""Check if a resonance attack is currently active."""
for spoof in self._spoofs:
if spoof.get("type") == "resonance_oscillation" and spoof.get("active"):
if spoof.get("target_node") not in self._quarantined:
return True
return False
def get_resonance_effect(self, tick: int) -> float:
"""
Get the current frequency deviation caused by resonance attack.
Returns 0.0 if no resonance attack is active or it has been mitigated.
"""
for spoof in self._spoofs:
if spoof.get("type") != "resonance_oscillation" or not spoof.get("active"):
continue
if spoof.get("target_node") in self._quarantined:
continue
# Resonance grows over time — modeled as increasing amplitude
base_amplitude = 0.1
growth_factor = min(tick / 5.0, 2.0) # Doubles over 5 ticks
return base_amplitude * growth_factor * float(
np.sin(2 * np.pi * spoof.get("oscillation_hz", 0.5) * tick)
)
return 0.0