Nexus-Grid / server /grid_engine.py
Abineshsdata's picture
Add manifest.json endpoint, update dashboard and app
74965f9
Raw
History Blame Contribute Delete
29.8 kB
"""
NexusGrid — Physical Grid Engine (Layer 1).
20-node transmission network with DC power flow (Kirchhoff B-matrix).
This is the truth oracle — physics cannot lie.
All randomness uses numpy.random.Generator(PCG64(seed)).
"""
from __future__ import annotations
import math
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
try:
import networkx as nx
except ImportError:
nx = None # type: ignore
class GridEngine:
"""
Simplified DC power flow engine for the NexusGrid environment.
Maintains a graph of substations (nodes) and transmission lines (edges).
Computes ground-truth frequency from supply/demand balance.
Provides state estimation (Kirchhoff check) as the anti-spoof mechanism.
"""
# Physics constants
NOMINAL_FREQUENCY_HZ = 60.0
FREQUENCY_DROOP_COEFF = 0.05 # Hz per 100MW imbalance
INERTIA_CONSTANT = 5.0 # seconds — how fast frequency responds
def __init__(self, seed: int = 42):
self._rng = np.random.Generator(np.random.PCG64(seed))
self._seed = seed
self.tick = 0
# Node storage: {node_id: {type, capacity_mw, peak_load_mw, generation_mw,
# consumption_mw, region, critical, voltage_kv,
# phase_angle_deg, ...}}
self.nodes: Dict[str, Dict[str, Any]] = {}
# Edge storage: {edge_id: {source, target, capacity_mw, current_load_mw, status, reactance}}
self.edges: Dict[str, Dict[str, Any]] = {}
# Current ground-truth frequency
self.frequency_hz: float = self.NOMINAL_FREQUENCY_HZ
# Telemetry history (list of per-tick snapshots, max 10)
self.telemetry_history: List[List[Dict[str, Any]]] = []
# Weather state per zone
self.weather: Dict[str, Dict[str, float]] = {}
# Counter-signal state
self._counter_signals: List[Dict[str, Any]] = []
# Track dispatches for proactive detection
self._dispatch_ticks: List[int] = []
self._first_frequency_drop_tick: Optional[int] = None
self._forced_edge_loads: Dict[str, float] = {}
def reset(self, seed: int = 42) -> None:
"""Reset engine to initial state with given seed."""
self._rng = np.random.Generator(np.random.PCG64(seed))
self._seed = seed
self.tick = 0
self.frequency_hz = self.NOMINAL_FREQUENCY_HZ
self.telemetry_history = []
self._counter_signals = []
self._dispatch_ticks = []
self._first_frequency_drop_tick = None
self._forced_edge_loads = {}
# ------------------------------------------------------------------
# Topology builders
# ------------------------------------------------------------------
def add_node(
self,
node_id: str,
node_type: str,
capacity_mw: float,
region: str,
critical: bool = False,
peak_load_mw: float = 0.0,
generation_mw: float = 0.0,
consumption_mw: float = 0.0,
black_start_capable: bool = False,
) -> None:
"""Add a substation node to the grid."""
self.nodes[node_id] = {
"id": node_id,
"node_type": node_type,
"capacity_mw": capacity_mw,
"peak_load_mw": peak_load_mw,
"region": region,
"critical": critical,
"generation_mw": generation_mw,
"base_generation_mw": generation_mw,
"consumption_mw": consumption_mw,
"base_consumption_mw": consumption_mw,
"voltage_kv": 345.0,
"phase_angle_deg": 0.0,
"energized": True,
"quarantined": False,
"black_start_capable": black_start_capable,
}
def add_edge(
self,
edge_id: str,
source: str,
target: str,
capacity_mw: float,
reactance: float = 0.01,
) -> None:
"""Add a transmission line edge to the grid."""
self.edges[edge_id] = {
"id": edge_id,
"source": source,
"target": target,
"capacity_mw": capacity_mw,
"current_load_mw": 0.0,
"status": "LIVE",
"reactance": reactance,
}
def _build_live_adjacency(self) -> Dict[str, List[str]]:
"""Build an undirected adjacency map for currently live lines."""
adjacency: Dict[str, List[str]] = {node_id: [] for node_id in self.nodes}
for edge in self.edges.values():
if edge["status"] != "LIVE":
continue
adjacency[edge["source"]].append(edge["target"])
adjacency[edge["target"]].append(edge["source"])
return adjacency
def _get_powered_components(self) -> Tuple[List[List[str]], List[List[str]]]:
"""
Return (all_components, powered_components) over live topology.
A component is considered powered if it contains a generator currently
producing positive power.
"""
adjacency = self._build_live_adjacency()
visited: set[str] = set()
components: List[List[str]] = []
powered_components: List[List[str]] = []
for start_node in self.nodes:
if start_node in visited:
continue
stack = [start_node]
component: List[str] = []
while stack:
node_id = stack.pop()
if node_id in visited:
continue
visited.add(node_id)
component.append(node_id)
for neighbor in adjacency.get(node_id, []):
if neighbor not in visited:
stack.append(neighbor)
if not component:
continue
components.append(component)
if any(
self.nodes[node_id]["node_type"] in ("hydro", "solar", "gas", "battery")
and self.nodes[node_id]["generation_mw"] > 0
for node_id in component
):
powered_components.append(component)
return components, powered_components
def _recompute_energization(self) -> List[str]:
"""
Propagate energized state through live lines from powered islands.
Returns:
List of node IDs that transitioned from de-energized to energized.
"""
previous_state = {
node_id: bool(node["energized"])
for node_id, node in self.nodes.items()
}
components, powered_components = self._get_powered_components()
powered_nodes = {
node_id
for component in powered_components
for node_id in component
}
for component in components:
component_powered = any(node_id in powered_nodes for node_id in component)
for node_id in component:
node = self.nodes[node_id]
can_self_boot = node.get("black_start_capable", False) and previous_state[node_id]
node["energized"] = component_powered or can_self_boot
node["voltage_kv"] = 345.0 if component_powered else 0.0
if can_self_boot and not component_powered:
node["voltage_kv"] = 345.0
if not node["energized"]:
node["phase_angle_deg"] = 0.0
if (
node["energized"]
and not previous_state[node_id]
and node["node_type"] == "load"
and node["consumption_mw"] <= 0
and node.get("base_consumption_mw", 0) > 0
):
node["consumption_mw"] = node["base_consumption_mw"]
newly_energized = [
node_id
for node_id, was_energized in previous_state.items()
if not was_energized and self.nodes[node_id]["energized"]
]
return newly_energized
# ------------------------------------------------------------------
# Core physics
# ------------------------------------------------------------------
def compute_power_flow(self) -> None:
"""
Simplified DC power flow: distribute generation to match loads
through live transmission lines. Updates edge loads and node states.
"""
self._recompute_energization()
# Calculate total generation and consumption
total_gen = sum(
n["generation_mw"] for n in self.nodes.values()
if n["node_type"] in ("hydro", "solar", "gas", "battery") and n["energized"]
)
total_load = sum(
n["consumption_mw"] for n in self.nodes.values()
if n["node_type"] == "load" and n["energized"]
)
# Reset all edge loads
for edge in self.edges.values():
if edge["status"] == "LIVE":
edge["current_load_mw"] = 0.0
# Distribute power through live edges using simplified DC flow
live_edges = [e for e in self.edges.values() if e["status"] == "LIVE"]
if not live_edges:
return
# Build adjacency and distribute loads proportionally
# For each load node, find paths from generators and distribute
generator_nodes = [
n for n in self.nodes.values()
if n["node_type"] in ("hydro", "solar", "gas", "battery")
and n["generation_mw"] > 0 and n["energized"]
]
load_nodes = [
n for n in self.nodes.values()
if n["node_type"] == "load"
and n["consumption_mw"] > 0 and n["energized"]
]
if not generator_nodes or not load_nodes:
return
# Build adjacency map for live edges
adj: Dict[str, List[Tuple[str, str]]] = {} # node -> [(neighbor, edge_id)]
for e in live_edges:
adj.setdefault(e["source"], []).append((e["target"], e["id"]))
adj.setdefault(e["target"], []).append((e["source"], e["id"]))
# Simple proportional flow: each edge carries its share
total_capacity = sum(e["capacity_mw"] for e in live_edges)
if total_capacity == 0:
return
flow_needed = min(total_gen, total_load)
for edge in live_edges:
# Distribute flow proportional to edge capacity
edge["current_load_mw"] = (edge["capacity_mw"] / total_capacity) * flow_needed
for edge_id, forced_load in self._forced_edge_loads.items():
edge = self.edges.get(edge_id)
if edge is not None and edge["status"] == "LIVE":
edge["current_load_mw"] = forced_load
# Update phase angles based on flow
for i, node in enumerate(self.nodes.values()):
if node["energized"]:
node["phase_angle_deg"] = (i * 15.0) % 360.0 - 180.0
def compute_frequency(self) -> float:
"""
Compute grid frequency from supply/demand balance.
frequency = 60.0 + (generation - load) * droop_coefficient / 100
Subject to inertia smoothing.
"""
total_gen = sum(
n["generation_mw"] for n in self.nodes.values()
if n["node_type"] in ("hydro", "solar", "gas", "battery") and n["energized"]
)
total_load = sum(
n["consumption_mw"] for n in self.nodes.values()
if n["node_type"] == "load" and n["energized"]
)
# Apply counter-signals
for cs in self._counter_signals:
if cs["remaining_ticks"] > 0:
total_gen += cs.get("power_injection_mw", 0)
imbalance_mw = total_gen - total_load
target_freq = self.NOMINAL_FREQUENCY_HZ + (imbalance_mw * self.FREQUENCY_DROOP_COEFF / 100.0)
# Inertia smoothing — frequency moves toward target gradually
alpha = 1.0 / self.INERTIA_CONSTANT
self.frequency_hz = self.frequency_hz + alpha * (target_freq - self.frequency_hz)
# Clamp to physical bounds
self.frequency_hz = max(58.0, min(62.0, self.frequency_hz))
# Track first frequency drop
if self.frequency_hz < 59.7 and self._first_frequency_drop_tick is None:
self._first_frequency_drop_tick = self.tick
return self.frequency_hz
def advance_tick(self, weather_evolution: bool = True) -> float:
"""
Advance simulation by one tick (~5 simulated minutes).
Returns the new grid frequency.
"""
self.tick += 1
# Evolve weather (affects solar/wind generation)
if weather_evolution:
self._evolve_weather()
# Update renewable generation based on weather
self._update_renewable_generation()
# Evolve load with small random fluctuation
self._evolve_load()
# Update counter-signals
for cs in self._counter_signals:
if cs["remaining_ticks"] > 0:
cs["remaining_ticks"] -= 1
# Compute power flow
self.compute_power_flow()
# Compute frequency
freq = self.compute_frequency()
# Record telemetry snapshot
self._record_telemetry()
return freq
# ------------------------------------------------------------------
# Actions
# ------------------------------------------------------------------
def dispatch_generation(self, node_id: str, mw: float) -> Dict[str, Any]:
"""Ramp a generator or battery up/down by mw."""
if node_id not in self.nodes:
return {"success": False, "error": f"Unknown node: {node_id}"}
node = self.nodes[node_id]
if node["node_type"] not in ("hydro", "solar", "gas", "battery"):
return {"success": False, "error": f"Node {node_id} is not a generator"}
if not node["energized"]:
return {"success": False, "error": f"Node {node_id} is not energized"}
new_gen = node["generation_mw"] + mw
new_gen = max(0.0, min(node["capacity_mw"], new_gen))
node["generation_mw"] = new_gen
self._dispatch_ticks.append(self.tick)
# Recompute flow and frequency immediately so observations reflect the control action.
self.compute_power_flow()
self.compute_frequency()
return {"success": True, "new_generation_mw": new_gen}
def toggle_circuit_breaker(self, edge_id: str, status: str) -> Dict[str, Any]:
"""Open or close a transmission line circuit breaker."""
if edge_id not in self.edges:
return {"success": False, "error": f"Unknown edge: {edge_id}"}
edge = self.edges[edge_id]
if status not in ("OPEN", "CLOSED"):
return {"success": False, "error": f"Invalid status: {status}. Use OPEN or CLOSED."}
old_status = edge["status"]
pre_energized = {
node_id for node_id, node in self.nodes.items()
if node["energized"]
}
pre_components = self.get_stable_islands()
if status == "OPEN":
edge["status"] = "TRIPPED"
edge["current_load_mw"] = 0.0
else:
edge["status"] = "LIVE"
# Recompute flow and frequency after topology changes.
self.compute_power_flow()
self.compute_frequency()
post_components = self.get_stable_islands()
newly_energized = [
node_id for node_id, node in self.nodes.items()
if node["energized"] and node_id not in pre_energized
]
return {
"success": True,
"old_status": old_status,
"new_status": edge["status"],
"newly_energized_nodes": newly_energized,
"island_count_before": len(pre_components),
"island_count_after": len(post_components),
}
def run_state_estimation(self, subgraph: List[str], spoofed_telemetry: Dict[str, Dict]) -> Dict[str, Any]:
"""
Apply Kirchhoff's laws to check telemetry consistency.
Compares reported values against physics — the truth oracle.
Returns {consistent, violation_node, estimated_true_mw}.
"""
for node_id in subgraph:
if node_id not in self.nodes:
continue
true_node = self.nodes[node_id]
reported = spoofed_telemetry.get(node_id, {})
true_gen = true_node["generation_mw"]
reported_gen = reported.get("generation_mw", true_gen)
# Check if reported generation deviates from truth significantly
if abs(reported_gen - true_gen) > 5.0: # 5MW tolerance
return {
"consistent": False,
"violation_node": node_id,
"estimated_true_mw": true_gen,
"reported_mw": reported_gen,
}
# Check voltage consistency
true_v = true_node["voltage_kv"]
reported_v = reported.get("voltage_kv", true_v)
if abs(reported_v - true_v) > 10.0:
return {
"consistent": False,
"violation_node": node_id,
"estimated_true_mw": true_gen,
"reported_mw": reported_gen,
}
return {
"consistent": True,
"violation_node": None,
"estimated_true_mw": 0.0,
}
def quarantine_node(self, node_id: str) -> Dict[str, Any]:
"""Quarantine a node's SCADA sensor — replace with estimator values."""
if node_id not in self.nodes:
return {"success": False, "error": f"Unknown node: {node_id}"}
if self.nodes[node_id]["quarantined"]:
return {
"success": False,
"error": f"Node {node_id} is already quarantined",
"already_quarantined": True,
}
self.nodes[node_id]["quarantined"] = True
return {"success": True, "node_id": node_id, "already_quarantined": False}
def inject_counter_signal(
self, node_id: str, hz_offset: float, duration: int
) -> Dict[str, Any]:
"""
Inject a counter-signal via adjacent battery at given frequency offset.
The counter-signal creates destructive interference when hz_offset
matches the attack frequency (within ±0.05Hz tolerance).
"""
if node_id not in self.nodes:
return {"success": False, "error": f"Unknown node: {node_id}"}
node = self.nodes[node_id]
if node["node_type"] != "battery":
return {"success": False, "error": f"Node {node_id} is not a battery"}
if not node["energized"]:
return {"success": False, "error": f"Node {node_id} is not energized"}
# Calculate effectiveness based on offset accuracy
# Perfect offset = -0.5Hz (to counter +0.5Hz attack)
target_offset = -0.5
accuracy = 1.0 - min(abs(hz_offset - target_offset) / 0.5, 1.0)
power_injection = node["capacity_mw"] * 0.5 * accuracy # Half capacity at full accuracy
self._counter_signals.append({
"node_id": node_id,
"hz_offset": hz_offset,
"duration": duration,
"remaining_ticks": duration,
"accuracy": accuracy,
"power_injection_mw": power_injection,
})
self.compute_frequency()
return {
"success": True,
"accuracy": accuracy,
"power_injection_mw": power_injection,
}
# ------------------------------------------------------------------
# Topology and telemetry getters
# ------------------------------------------------------------------
def get_topology(self) -> Dict[str, Any]:
"""Get the topology graph as a serializable dict."""
nodes = []
for n in self.nodes.values():
nodes.append({
"id": n["id"],
"region": n["region"],
"node_type": n["node_type"],
"capacity_mw": n["capacity_mw"],
"peak_load_mw": n["peak_load_mw"],
"critical": n["critical"],
"energized": n["energized"],
})
edges = []
for e in self.edges.values():
edges.append({
"id": e["id"],
"source": e["source"],
"target": e["target"],
"capacity_mw": e["capacity_mw"],
"current_load_mw": e["current_load_mw"],
"status": e["status"],
})
return {"nodes": nodes, "edges": edges}
def get_ground_truth_telemetry(self) -> Dict[str, Dict[str, Any]]:
"""Get the true (unspoofed) telemetry for all nodes."""
result = {}
for n in self.nodes.values():
result[n["id"]] = {
"node_id": n["id"],
"voltage_kv": n["voltage_kv"],
"frequency_hz": self.frequency_hz if n["energized"] else 0.0,
"generation_mw": n["generation_mw"],
"consumption_mw": n["consumption_mw"],
}
return result
def get_telemetry_history(self) -> List[List[Dict[str, Any]]]:
"""Get the last 10 ticks of telemetry history."""
return self.telemetry_history[-10:]
def get_weather(self) -> List[Dict[str, Any]]:
"""Get weather forecast as list of zone dicts."""
result = []
for zone_name, data in self.weather.items():
result.append({
"zone": zone_name,
"solar_irradiance": round(data.get("solar_irradiance", 0.5), 3),
"wind_speed_ms": round(data.get("wind_speed_ms", 5.0), 1),
"cloud_cover": round(data.get("cloud_cover", 0.3), 3),
})
return result
def get_weather_summary(self) -> str:
"""Get a natural language weather summary."""
parts = []
for zone_name, data in self.weather.items():
solar = data.get("solar_irradiance", 0.5)
wind = data.get("wind_speed_ms", 5.0)
cloud = data.get("cloud_cover", 0.3)
if solar > 0.7:
sun_desc = "sunny"
elif solar > 0.3:
sun_desc = "partly cloudy"
else:
sun_desc = "overcast"
if wind > 15:
wind_desc = "strong winds"
elif wind > 8:
wind_desc = "moderate winds"
else:
wind_desc = "calm"
parts.append(f"Zone {zone_name}: {sun_desc}, {wind_desc}, {cloud*100:.0f}% cloud cover")
return "; ".join(parts) if parts else "Weather data unavailable"
def get_total_generation(self) -> float:
"""Get total MW currently being generated."""
return sum(
n["generation_mw"] for n in self.nodes.values()
if n["node_type"] in ("hydro", "solar", "gas", "battery") and n["energized"]
)
def get_total_load(self) -> float:
"""Get total MW demand."""
return sum(
n["consumption_mw"] for n in self.nodes.values()
if n["node_type"] == "load" and n["energized"]
)
def get_total_possible_mwh(self) -> float:
"""Get total possible MW·h that could be served."""
return sum(
n["peak_load_mw"] for n in self.nodes.values()
if n["node_type"] == "load"
)
def get_mwh_served(self) -> float:
"""Get actual MW·h served to loads."""
return sum(
n["consumption_mw"] for n in self.nodes.values()
if n["node_type"] == "load" and n["energized"]
)
def get_critical_nodes_shed(self) -> int:
"""Count critical nodes that have been de-energized (load shed)."""
return sum(
1 for n in self.nodes.values()
if n["critical"] and not n["energized"]
)
def get_overloaded_edges(self) -> List[str]:
"""Get edges loaded at or above 95% capacity."""
return [
e["id"] for e in self.edges.values()
if e["status"] == "LIVE" and e["current_load_mw"] >= 0.95 * e["capacity_mw"]
]
def set_forced_edge_load(self, edge_id: str, load_mw: float) -> None:
"""Force an edge load after power-flow computation for scenario scripting."""
if edge_id not in self.edges:
raise ValueError(f"Unknown edge: {edge_id}")
self._forced_edge_loads[edge_id] = load_mw
edge = self.edges[edge_id]
if edge["status"] == "LIVE":
edge["current_load_mw"] = load_mw
def clear_forced_edge_load(self, edge_id: str) -> None:
"""Remove a previously forced edge load override."""
self._forced_edge_loads.pop(edge_id, None)
def is_dispatch_proactive(self) -> bool:
"""Check if any dispatch happened before the first frequency drop."""
if not self._dispatch_ticks:
return False
if self._first_frequency_drop_tick is None:
return True # No drop yet, any dispatch is proactive
return min(self._dispatch_ticks) < self._first_frequency_drop_tick
def get_stable_islands(self) -> List[List[str]]:
"""
Find connected components (power islands) of energized nodes.
Returns list of node-id lists.
"""
# Build adjacency for live edges between energized nodes
adj: Dict[str, set] = {n_id: set() for n_id in self.nodes if self.nodes[n_id]["energized"]}
for e in self.edges.values():
if e["status"] == "LIVE":
s, t = e["source"], e["target"]
if s in adj and t in adj:
adj[s].add(t)
adj[t].add(s)
visited: set = set()
islands: List[List[str]] = []
for start in adj:
if start in visited:
continue
component: List[str] = []
stack = [start]
while stack:
node = stack.pop()
if node in visited:
continue
visited.add(node)
component.append(node)
for nb in adj.get(node, []):
if nb not in visited:
stack.append(nb)
if component:
islands.append(sorted(component))
return islands
def check_phase_angle_compatible(self, island_a: List[str], island_b: List[str]) -> bool:
"""Check if two islands can be merged (|∆phase| ≤ 5°)."""
if not island_a or not island_b:
return False
# Average phase angle of each island
avg_a = np.mean([self.nodes[n]["phase_angle_deg"] for n in island_a if n in self.nodes])
avg_b = np.mean([self.nodes[n]["phase_angle_deg"] for n in island_b if n in self.nodes])
diff = abs(avg_a - avg_b)
if diff > 180:
diff = 360 - diff
return diff <= 5.0
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _evolve_weather(self) -> None:
"""Evolve weather with small random perturbations."""
for zone in self.weather:
w = self.weather[zone]
# Solar irradiance changes slowly
w["solar_irradiance"] = float(np.clip(
w["solar_irradiance"] + self._rng.normal(0, 0.02), 0.0, 1.0
))
# Wind speed fluctuates
w["wind_speed_ms"] = float(np.clip(
w["wind_speed_ms"] + self._rng.normal(0, 0.5), 0.0, 30.0
))
# Cloud cover
w["cloud_cover"] = float(np.clip(
w["cloud_cover"] + self._rng.normal(0, 0.03), 0.0, 1.0
))
def _update_renewable_generation(self) -> None:
"""Update solar and wind generation based on current weather."""
for node in self.nodes.values():
if not node["energized"]:
continue
region = node["region"]
if region not in self.weather:
continue
w = self.weather[region]
if node["node_type"] == "solar":
# Solar output = capacity * irradiance * (1 - cloud_cover)
solar_factor = w["solar_irradiance"] * (1.0 - w["cloud_cover"] * 0.7)
node["generation_mw"] = node["capacity_mw"] * solar_factor
# Wind can affect frequency stability (not modeled as wind turbines here)
def _evolve_load(self) -> None:
"""Small random load fluctuations each tick."""
for node in self.nodes.values():
if node["node_type"] == "load" and node["energized"]:
fluctuation = float(self._rng.normal(0, node["peak_load_mw"] * 0.01))
node["consumption_mw"] = max(
0.0,
min(node["peak_load_mw"], node["consumption_mw"] + fluctuation)
)
def _record_telemetry(self) -> None:
"""Record current telemetry snapshot to history."""
snapshot = []
for n in self.nodes.values():
snapshot.append({
"node_id": n["id"],
"voltage_kv": n["voltage_kv"],
"frequency_hz": self.frequency_hz if n["energized"] else 0.0,
"generation_mw": n["generation_mw"],
"consumption_mw": n["consumption_mw"],
})
self.telemetry_history.append(snapshot)
# Keep only last 10
if len(self.telemetry_history) > 10:
self.telemetry_history = self.telemetry_history[-10:]