""" NexusGrid — Physical Grid Engine (Layer 1). 20-node transmission network with DC power flow (Kirchhoff B-matrix). This is the truth oracle — physics cannot lie. All randomness uses numpy.random.Generator(PCG64(seed)). """ from __future__ import annotations import math from typing import Any, Dict, List, Optional, Tuple import numpy as np try: import networkx as nx except ImportError: nx = None # type: ignore class GridEngine: """ Simplified DC power flow engine for the NexusGrid environment. Maintains a graph of substations (nodes) and transmission lines (edges). Computes ground-truth frequency from supply/demand balance. Provides state estimation (Kirchhoff check) as the anti-spoof mechanism. """ # Physics constants NOMINAL_FREQUENCY_HZ = 60.0 FREQUENCY_DROOP_COEFF = 0.05 # Hz per 100MW imbalance INERTIA_CONSTANT = 5.0 # seconds — how fast frequency responds def __init__(self, seed: int = 42): self._rng = np.random.Generator(np.random.PCG64(seed)) self._seed = seed self.tick = 0 # Node storage: {node_id: {type, capacity_mw, peak_load_mw, generation_mw, # consumption_mw, region, critical, voltage_kv, # phase_angle_deg, ...}} self.nodes: Dict[str, Dict[str, Any]] = {} # Edge storage: {edge_id: {source, target, capacity_mw, current_load_mw, status, reactance}} self.edges: Dict[str, Dict[str, Any]] = {} # Current ground-truth frequency self.frequency_hz: float = self.NOMINAL_FREQUENCY_HZ # Telemetry history (list of per-tick snapshots, max 10) self.telemetry_history: List[List[Dict[str, Any]]] = [] # Weather state per zone self.weather: Dict[str, Dict[str, float]] = {} # Counter-signal state self._counter_signals: List[Dict[str, Any]] = [] # Track dispatches for proactive detection self._dispatch_ticks: List[int] = [] self._first_frequency_drop_tick: Optional[int] = None self._forced_edge_loads: Dict[str, float] = {} def reset(self, seed: int = 42) -> None: """Reset engine to initial state with given seed.""" self._rng = np.random.Generator(np.random.PCG64(seed)) self._seed = seed self.tick = 0 self.frequency_hz = self.NOMINAL_FREQUENCY_HZ self.telemetry_history = [] self._counter_signals = [] self._dispatch_ticks = [] self._first_frequency_drop_tick = None self._forced_edge_loads = {} # ------------------------------------------------------------------ # Topology builders # ------------------------------------------------------------------ def add_node( self, node_id: str, node_type: str, capacity_mw: float, region: str, critical: bool = False, peak_load_mw: float = 0.0, generation_mw: float = 0.0, consumption_mw: float = 0.0, black_start_capable: bool = False, ) -> None: """Add a substation node to the grid.""" self.nodes[node_id] = { "id": node_id, "node_type": node_type, "capacity_mw": capacity_mw, "peak_load_mw": peak_load_mw, "region": region, "critical": critical, "generation_mw": generation_mw, "base_generation_mw": generation_mw, "consumption_mw": consumption_mw, "base_consumption_mw": consumption_mw, "voltage_kv": 345.0, "phase_angle_deg": 0.0, "energized": True, "quarantined": False, "black_start_capable": black_start_capable, } def add_edge( self, edge_id: str, source: str, target: str, capacity_mw: float, reactance: float = 0.01, ) -> None: """Add a transmission line edge to the grid.""" self.edges[edge_id] = { "id": edge_id, "source": source, "target": target, "capacity_mw": capacity_mw, "current_load_mw": 0.0, "status": "LIVE", "reactance": reactance, } def _build_live_adjacency(self) -> Dict[str, List[str]]: """Build an undirected adjacency map for currently live lines.""" adjacency: Dict[str, List[str]] = {node_id: [] for node_id in self.nodes} for edge in self.edges.values(): if edge["status"] != "LIVE": continue adjacency[edge["source"]].append(edge["target"]) adjacency[edge["target"]].append(edge["source"]) return adjacency def _get_powered_components(self) -> Tuple[List[List[str]], List[List[str]]]: """ Return (all_components, powered_components) over live topology. A component is considered powered if it contains a generator currently producing positive power. """ adjacency = self._build_live_adjacency() visited: set[str] = set() components: List[List[str]] = [] powered_components: List[List[str]] = [] for start_node in self.nodes: if start_node in visited: continue stack = [start_node] component: List[str] = [] while stack: node_id = stack.pop() if node_id in visited: continue visited.add(node_id) component.append(node_id) for neighbor in adjacency.get(node_id, []): if neighbor not in visited: stack.append(neighbor) if not component: continue components.append(component) if any( self.nodes[node_id]["node_type"] in ("hydro", "solar", "gas", "battery") and self.nodes[node_id]["generation_mw"] > 0 for node_id in component ): powered_components.append(component) return components, powered_components def _recompute_energization(self) -> List[str]: """ Propagate energized state through live lines from powered islands. Returns: List of node IDs that transitioned from de-energized to energized. """ previous_state = { node_id: bool(node["energized"]) for node_id, node in self.nodes.items() } components, powered_components = self._get_powered_components() powered_nodes = { node_id for component in powered_components for node_id in component } for component in components: component_powered = any(node_id in powered_nodes for node_id in component) for node_id in component: node = self.nodes[node_id] can_self_boot = node.get("black_start_capable", False) and previous_state[node_id] node["energized"] = component_powered or can_self_boot node["voltage_kv"] = 345.0 if component_powered else 0.0 if can_self_boot and not component_powered: node["voltage_kv"] = 345.0 if not node["energized"]: node["phase_angle_deg"] = 0.0 if ( node["energized"] and not previous_state[node_id] and node["node_type"] == "load" and node["consumption_mw"] <= 0 and node.get("base_consumption_mw", 0) > 0 ): node["consumption_mw"] = node["base_consumption_mw"] newly_energized = [ node_id for node_id, was_energized in previous_state.items() if not was_energized and self.nodes[node_id]["energized"] ] return newly_energized # ------------------------------------------------------------------ # Core physics # ------------------------------------------------------------------ def compute_power_flow(self) -> None: """ Simplified DC power flow: distribute generation to match loads through live transmission lines. Updates edge loads and node states. """ self._recompute_energization() # Calculate total generation and consumption total_gen = sum( n["generation_mw"] for n in self.nodes.values() if n["node_type"] in ("hydro", "solar", "gas", "battery") and n["energized"] ) total_load = sum( n["consumption_mw"] for n in self.nodes.values() if n["node_type"] == "load" and n["energized"] ) # Reset all edge loads for edge in self.edges.values(): if edge["status"] == "LIVE": edge["current_load_mw"] = 0.0 # Distribute power through live edges using simplified DC flow live_edges = [e for e in self.edges.values() if e["status"] == "LIVE"] if not live_edges: return # Build adjacency and distribute loads proportionally # For each load node, find paths from generators and distribute generator_nodes = [ n for n in self.nodes.values() if n["node_type"] in ("hydro", "solar", "gas", "battery") and n["generation_mw"] > 0 and n["energized"] ] load_nodes = [ n for n in self.nodes.values() if n["node_type"] == "load" and n["consumption_mw"] > 0 and n["energized"] ] if not generator_nodes or not load_nodes: return # Build adjacency map for live edges adj: Dict[str, List[Tuple[str, str]]] = {} # node -> [(neighbor, edge_id)] for e in live_edges: adj.setdefault(e["source"], []).append((e["target"], e["id"])) adj.setdefault(e["target"], []).append((e["source"], e["id"])) # Simple proportional flow: each edge carries its share total_capacity = sum(e["capacity_mw"] for e in live_edges) if total_capacity == 0: return flow_needed = min(total_gen, total_load) for edge in live_edges: # Distribute flow proportional to edge capacity edge["current_load_mw"] = (edge["capacity_mw"] / total_capacity) * flow_needed for edge_id, forced_load in self._forced_edge_loads.items(): edge = self.edges.get(edge_id) if edge is not None and edge["status"] == "LIVE": edge["current_load_mw"] = forced_load # Update phase angles based on flow for i, node in enumerate(self.nodes.values()): if node["energized"]: node["phase_angle_deg"] = (i * 15.0) % 360.0 - 180.0 def compute_frequency(self) -> float: """ Compute grid frequency from supply/demand balance. frequency = 60.0 + (generation - load) * droop_coefficient / 100 Subject to inertia smoothing. """ total_gen = sum( n["generation_mw"] for n in self.nodes.values() if n["node_type"] in ("hydro", "solar", "gas", "battery") and n["energized"] ) total_load = sum( n["consumption_mw"] for n in self.nodes.values() if n["node_type"] == "load" and n["energized"] ) # Apply counter-signals for cs in self._counter_signals: if cs["remaining_ticks"] > 0: total_gen += cs.get("power_injection_mw", 0) imbalance_mw = total_gen - total_load target_freq = self.NOMINAL_FREQUENCY_HZ + (imbalance_mw * self.FREQUENCY_DROOP_COEFF / 100.0) # Inertia smoothing — frequency moves toward target gradually alpha = 1.0 / self.INERTIA_CONSTANT self.frequency_hz = self.frequency_hz + alpha * (target_freq - self.frequency_hz) # Clamp to physical bounds self.frequency_hz = max(58.0, min(62.0, self.frequency_hz)) # Track first frequency drop if self.frequency_hz < 59.7 and self._first_frequency_drop_tick is None: self._first_frequency_drop_tick = self.tick return self.frequency_hz def advance_tick(self, weather_evolution: bool = True) -> float: """ Advance simulation by one tick (~5 simulated minutes). Returns the new grid frequency. """ self.tick += 1 # Evolve weather (affects solar/wind generation) if weather_evolution: self._evolve_weather() # Update renewable generation based on weather self._update_renewable_generation() # Evolve load with small random fluctuation self._evolve_load() # Update counter-signals for cs in self._counter_signals: if cs["remaining_ticks"] > 0: cs["remaining_ticks"] -= 1 # Compute power flow self.compute_power_flow() # Compute frequency freq = self.compute_frequency() # Record telemetry snapshot self._record_telemetry() return freq # ------------------------------------------------------------------ # Actions # ------------------------------------------------------------------ def dispatch_generation(self, node_id: str, mw: float) -> Dict[str, Any]: """Ramp a generator or battery up/down by mw.""" if node_id not in self.nodes: return {"success": False, "error": f"Unknown node: {node_id}"} node = self.nodes[node_id] if node["node_type"] not in ("hydro", "solar", "gas", "battery"): return {"success": False, "error": f"Node {node_id} is not a generator"} if not node["energized"]: return {"success": False, "error": f"Node {node_id} is not energized"} new_gen = node["generation_mw"] + mw new_gen = max(0.0, min(node["capacity_mw"], new_gen)) node["generation_mw"] = new_gen self._dispatch_ticks.append(self.tick) # Recompute flow and frequency immediately so observations reflect the control action. self.compute_power_flow() self.compute_frequency() return {"success": True, "new_generation_mw": new_gen} def toggle_circuit_breaker(self, edge_id: str, status: str) -> Dict[str, Any]: """Open or close a transmission line circuit breaker.""" if edge_id not in self.edges: return {"success": False, "error": f"Unknown edge: {edge_id}"} edge = self.edges[edge_id] if status not in ("OPEN", "CLOSED"): return {"success": False, "error": f"Invalid status: {status}. Use OPEN or CLOSED."} old_status = edge["status"] pre_energized = { node_id for node_id, node in self.nodes.items() if node["energized"] } pre_components = self.get_stable_islands() if status == "OPEN": edge["status"] = "TRIPPED" edge["current_load_mw"] = 0.0 else: edge["status"] = "LIVE" # Recompute flow and frequency after topology changes. self.compute_power_flow() self.compute_frequency() post_components = self.get_stable_islands() newly_energized = [ node_id for node_id, node in self.nodes.items() if node["energized"] and node_id not in pre_energized ] return { "success": True, "old_status": old_status, "new_status": edge["status"], "newly_energized_nodes": newly_energized, "island_count_before": len(pre_components), "island_count_after": len(post_components), } def run_state_estimation(self, subgraph: List[str], spoofed_telemetry: Dict[str, Dict]) -> Dict[str, Any]: """ Apply Kirchhoff's laws to check telemetry consistency. Compares reported values against physics — the truth oracle. Returns {consistent, violation_node, estimated_true_mw}. """ for node_id in subgraph: if node_id not in self.nodes: continue true_node = self.nodes[node_id] reported = spoofed_telemetry.get(node_id, {}) true_gen = true_node["generation_mw"] reported_gen = reported.get("generation_mw", true_gen) # Check if reported generation deviates from truth significantly if abs(reported_gen - true_gen) > 5.0: # 5MW tolerance return { "consistent": False, "violation_node": node_id, "estimated_true_mw": true_gen, "reported_mw": reported_gen, } # Check voltage consistency true_v = true_node["voltage_kv"] reported_v = reported.get("voltage_kv", true_v) if abs(reported_v - true_v) > 10.0: return { "consistent": False, "violation_node": node_id, "estimated_true_mw": true_gen, "reported_mw": reported_gen, } return { "consistent": True, "violation_node": None, "estimated_true_mw": 0.0, } def quarantine_node(self, node_id: str) -> Dict[str, Any]: """Quarantine a node's SCADA sensor — replace with estimator values.""" if node_id not in self.nodes: return {"success": False, "error": f"Unknown node: {node_id}"} if self.nodes[node_id]["quarantined"]: return { "success": False, "error": f"Node {node_id} is already quarantined", "already_quarantined": True, } self.nodes[node_id]["quarantined"] = True return {"success": True, "node_id": node_id, "already_quarantined": False} def inject_counter_signal( self, node_id: str, hz_offset: float, duration: int ) -> Dict[str, Any]: """ Inject a counter-signal via adjacent battery at given frequency offset. The counter-signal creates destructive interference when hz_offset matches the attack frequency (within ±0.05Hz tolerance). """ if node_id not in self.nodes: return {"success": False, "error": f"Unknown node: {node_id}"} node = self.nodes[node_id] if node["node_type"] != "battery": return {"success": False, "error": f"Node {node_id} is not a battery"} if not node["energized"]: return {"success": False, "error": f"Node {node_id} is not energized"} # Calculate effectiveness based on offset accuracy # Perfect offset = -0.5Hz (to counter +0.5Hz attack) target_offset = -0.5 accuracy = 1.0 - min(abs(hz_offset - target_offset) / 0.5, 1.0) power_injection = node["capacity_mw"] * 0.5 * accuracy # Half capacity at full accuracy self._counter_signals.append({ "node_id": node_id, "hz_offset": hz_offset, "duration": duration, "remaining_ticks": duration, "accuracy": accuracy, "power_injection_mw": power_injection, }) self.compute_frequency() return { "success": True, "accuracy": accuracy, "power_injection_mw": power_injection, } # ------------------------------------------------------------------ # Topology and telemetry getters # ------------------------------------------------------------------ def get_topology(self) -> Dict[str, Any]: """Get the topology graph as a serializable dict.""" nodes = [] for n in self.nodes.values(): nodes.append({ "id": n["id"], "region": n["region"], "node_type": n["node_type"], "capacity_mw": n["capacity_mw"], "peak_load_mw": n["peak_load_mw"], "critical": n["critical"], "energized": n["energized"], }) edges = [] for e in self.edges.values(): edges.append({ "id": e["id"], "source": e["source"], "target": e["target"], "capacity_mw": e["capacity_mw"], "current_load_mw": e["current_load_mw"], "status": e["status"], }) return {"nodes": nodes, "edges": edges} def get_ground_truth_telemetry(self) -> Dict[str, Dict[str, Any]]: """Get the true (unspoofed) telemetry for all nodes.""" result = {} for n in self.nodes.values(): result[n["id"]] = { "node_id": n["id"], "voltage_kv": n["voltage_kv"], "frequency_hz": self.frequency_hz if n["energized"] else 0.0, "generation_mw": n["generation_mw"], "consumption_mw": n["consumption_mw"], } return result def get_telemetry_history(self) -> List[List[Dict[str, Any]]]: """Get the last 10 ticks of telemetry history.""" return self.telemetry_history[-10:] def get_weather(self) -> List[Dict[str, Any]]: """Get weather forecast as list of zone dicts.""" result = [] for zone_name, data in self.weather.items(): result.append({ "zone": zone_name, "solar_irradiance": round(data.get("solar_irradiance", 0.5), 3), "wind_speed_ms": round(data.get("wind_speed_ms", 5.0), 1), "cloud_cover": round(data.get("cloud_cover", 0.3), 3), }) return result def get_weather_summary(self) -> str: """Get a natural language weather summary.""" parts = [] for zone_name, data in self.weather.items(): solar = data.get("solar_irradiance", 0.5) wind = data.get("wind_speed_ms", 5.0) cloud = data.get("cloud_cover", 0.3) if solar > 0.7: sun_desc = "sunny" elif solar > 0.3: sun_desc = "partly cloudy" else: sun_desc = "overcast" if wind > 15: wind_desc = "strong winds" elif wind > 8: wind_desc = "moderate winds" else: wind_desc = "calm" parts.append(f"Zone {zone_name}: {sun_desc}, {wind_desc}, {cloud*100:.0f}% cloud cover") return "; ".join(parts) if parts else "Weather data unavailable" def get_total_generation(self) -> float: """Get total MW currently being generated.""" return sum( n["generation_mw"] for n in self.nodes.values() if n["node_type"] in ("hydro", "solar", "gas", "battery") and n["energized"] ) def get_total_load(self) -> float: """Get total MW demand.""" return sum( n["consumption_mw"] for n in self.nodes.values() if n["node_type"] == "load" and n["energized"] ) def get_total_possible_mwh(self) -> float: """Get total possible MW·h that could be served.""" return sum( n["peak_load_mw"] for n in self.nodes.values() if n["node_type"] == "load" ) def get_mwh_served(self) -> float: """Get actual MW·h served to loads.""" return sum( n["consumption_mw"] for n in self.nodes.values() if n["node_type"] == "load" and n["energized"] ) def get_critical_nodes_shed(self) -> int: """Count critical nodes that have been de-energized (load shed).""" return sum( 1 for n in self.nodes.values() if n["critical"] and not n["energized"] ) def get_overloaded_edges(self) -> List[str]: """Get edges loaded at or above 95% capacity.""" return [ e["id"] for e in self.edges.values() if e["status"] == "LIVE" and e["current_load_mw"] >= 0.95 * e["capacity_mw"] ] def set_forced_edge_load(self, edge_id: str, load_mw: float) -> None: """Force an edge load after power-flow computation for scenario scripting.""" if edge_id not in self.edges: raise ValueError(f"Unknown edge: {edge_id}") self._forced_edge_loads[edge_id] = load_mw edge = self.edges[edge_id] if edge["status"] == "LIVE": edge["current_load_mw"] = load_mw def clear_forced_edge_load(self, edge_id: str) -> None: """Remove a previously forced edge load override.""" self._forced_edge_loads.pop(edge_id, None) def is_dispatch_proactive(self) -> bool: """Check if any dispatch happened before the first frequency drop.""" if not self._dispatch_ticks: return False if self._first_frequency_drop_tick is None: return True # No drop yet, any dispatch is proactive return min(self._dispatch_ticks) < self._first_frequency_drop_tick def get_stable_islands(self) -> List[List[str]]: """ Find connected components (power islands) of energized nodes. Returns list of node-id lists. """ # Build adjacency for live edges between energized nodes adj: Dict[str, set] = {n_id: set() for n_id in self.nodes if self.nodes[n_id]["energized"]} for e in self.edges.values(): if e["status"] == "LIVE": s, t = e["source"], e["target"] if s in adj and t in adj: adj[s].add(t) adj[t].add(s) visited: set = set() islands: List[List[str]] = [] for start in adj: if start in visited: continue component: List[str] = [] stack = [start] while stack: node = stack.pop() if node in visited: continue visited.add(node) component.append(node) for nb in adj.get(node, []): if nb not in visited: stack.append(nb) if component: islands.append(sorted(component)) return islands def check_phase_angle_compatible(self, island_a: List[str], island_b: List[str]) -> bool: """Check if two islands can be merged (|∆phase| ≤ 5°).""" if not island_a or not island_b: return False # Average phase angle of each island avg_a = np.mean([self.nodes[n]["phase_angle_deg"] for n in island_a if n in self.nodes]) avg_b = np.mean([self.nodes[n]["phase_angle_deg"] for n in island_b if n in self.nodes]) diff = abs(avg_a - avg_b) if diff > 180: diff = 360 - diff return diff <= 5.0 # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _evolve_weather(self) -> None: """Evolve weather with small random perturbations.""" for zone in self.weather: w = self.weather[zone] # Solar irradiance changes slowly w["solar_irradiance"] = float(np.clip( w["solar_irradiance"] + self._rng.normal(0, 0.02), 0.0, 1.0 )) # Wind speed fluctuates w["wind_speed_ms"] = float(np.clip( w["wind_speed_ms"] + self._rng.normal(0, 0.5), 0.0, 30.0 )) # Cloud cover w["cloud_cover"] = float(np.clip( w["cloud_cover"] + self._rng.normal(0, 0.03), 0.0, 1.0 )) def _update_renewable_generation(self) -> None: """Update solar and wind generation based on current weather.""" for node in self.nodes.values(): if not node["energized"]: continue region = node["region"] if region not in self.weather: continue w = self.weather[region] if node["node_type"] == "solar": # Solar output = capacity * irradiance * (1 - cloud_cover) solar_factor = w["solar_irradiance"] * (1.0 - w["cloud_cover"] * 0.7) node["generation_mw"] = node["capacity_mw"] * solar_factor # Wind can affect frequency stability (not modeled as wind turbines here) def _evolve_load(self) -> None: """Small random load fluctuations each tick.""" for node in self.nodes.values(): if node["node_type"] == "load" and node["energized"]: fluctuation = float(self._rng.normal(0, node["peak_load_mw"] * 0.01)) node["consumption_mw"] = max( 0.0, min(node["peak_load_mw"], node["consumption_mw"] + fluctuation) ) def _record_telemetry(self) -> None: """Record current telemetry snapshot to history.""" snapshot = [] for n in self.nodes.values(): snapshot.append({ "node_id": n["id"], "voltage_kv": n["voltage_kv"], "frequency_hz": self.frequency_hz if n["energized"] else 0.0, "generation_mw": n["generation_mw"], "consumption_mw": n["consumption_mw"], }) self.telemetry_history.append(snapshot) # Keep only last 10 if len(self.telemetry_history) > 10: self.telemetry_history = self.telemetry_history[-10:]