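# OptiQ / api/routes/grid.py
# mohammademad2003's picture
# first commit
# 7477316
# (The four lines above are repository-page header text captured with the file;
# they are kept as comments so the module remains importable.)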
"""
Grid endpoint — Returns grid topology data for visualization.
"""
from __future__ import annotations
import math
from fastapi import APIRouter, HTTPException, Depends
from pydantic import BaseModel, Field
from src.grid.loader import load_network, get_bus_data, get_topology_data, get_network_summary
from src.grid.power_flow import (
check_radial_connected,
check_topology_valid,
try_repair_connectivity,
is_distribution_grid,
apply_topology,
run_power_flow,
extract_results,
)
from api.auth import optional_auth, FirebaseUser
# Router instance that the endpoint decorators in this module register on.
router = APIRouter()
def _scale_positions(raw_positions: dict, width: float, height: float, padding: float) -> dict:
    """Map raw ``(x, y)`` coordinates onto a padded ``width`` x ``height`` canvas.

    Each axis is normalised independently from its own min/max onto
    ``[padding, size - padding]``.

    Returns:
        ``{node: {"x": float, "y": float}}``; an empty dict for empty input.
    """
    if not raw_positions:
        return {}

    xs = [p[0] for p in raw_positions.values()]
    ys = [p[1] for p in raw_positions.values()]
    min_x, min_y = min(xs), min(ys)
    # A degenerate axis (all values equal) gets a tiny span so the division
    # below is defined; such points land on the padding edge.
    span_x = max(max(xs) - min_x, 1e-6)
    span_y = max(max(ys) - min_y, 1e-6)

    return {
        node: {
            "x": float(padding + (x - min_x) / span_x * (width - 2 * padding)),
            "y": float(padding + (y - min_y) / span_y * (height - 2 * padding)),
        }
        for node, (x, y) in raw_positions.items()
    }


def _spread_positions(raw_positions: dict, min_dist: float, steps: int) -> dict:
    """Push apart node pairs closer than ``min_dist`` (simple pairwise repulsion).

    Runs at most ``steps`` sweeps over all node pairs and stops early once a
    sweep moves nothing. Returns ``{node: (x, y)}``; empty input is returned
    unchanged.
    """
    from itertools import combinations

    if not raw_positions:
        return raw_positions

    positions = {node: [float(x), float(y)] for node, (x, y) in raw_positions.items()}
    for _ in range(steps):
        moved = False
        # combinations() yields pairs in the same order as a nested i < j loop.
        for a, b in combinations(positions, 2):
            dx = positions[a][0] - positions[b][0]
            dy = positions[a][1] - positions[b][1]
            dist_sq = dx * dx + dy * dy
            if dist_sq == 0:
                # Coincident points have no direction; nudge along +x.
                dx, dy = 1e-3, 0.0
                dist_sq = dx * dx + dy * dy
            dist = math.sqrt(dist_sq)
            if dist < min_dist:
                # Each endpoint moves by half of the missing distance.
                push = (min_dist - dist) * 0.5
                ux = dx / dist
                uy = dy / dist
                positions[a][0] += ux * push
                positions[a][1] += uy * push
                positions[b][0] -= ux * push
                positions[b][1] -= uy * push
                moved = True
        if not moved:
            break
    return {node: (pos[0], pos[1]) for node, pos in positions.items()}


def _layered_positions(adjacency: dict, root: int) -> dict:
    """Place nodes on a grid: x = BFS depth from ``root``, y = rank within depth.

    Nodes unreachable from ``root`` are put in layer 0 next to it. Within a
    layer, nodes are ordered by id and spread evenly over ``[0, 1]``.
    """
    from collections import deque

    layers = {root: 0}
    queue = deque([root])
    while queue:
        node = queue.popleft()
        for neighbor in adjacency[node]:
            if neighbor not in layers:
                layers[neighbor] = layers[node] + 1
                queue.append(neighbor)
    for node in adjacency:
        layers.setdefault(node, 0)

    layer_groups: dict[int, list[int]] = {}
    for node, layer in layers.items():
        layer_groups.setdefault(layer, []).append(node)

    raw_positions = {}
    for layer, members in layer_groups.items():
        last = len(members) - 1
        for rank, node in enumerate(sorted(members)):
            # A single-node layer sits at y = 0 (avoids dividing by zero).
            y = float(rank) / last if last else 0.0
            raw_positions[node] = (float(layer), y)
    return raw_positions


def compute_bus_positions(net) -> dict[int, dict]:
    """Compute 2D positions for buses using scaled graph layouts.

    - Smaller systems (<= 40 buses): layered radial layout from the slack bus,
      scaled to a 1600 x 900 canvas.
    - Larger systems: force-directed layout (Kamada-Kawai, with a seeded
      spring layout as fallback), scaled to a 3200 x 2000 canvas.

    The graph contains every bus in ``net.bus.index`` and one edge per row of
    ``net.line`` (all lines are drawn to preserve visual continuity).

    Args:
        net: Network object exposing ``bus``, ``line`` (with ``from_bus`` /
            ``to_bus`` columns) and ``ext_grid`` (with a ``bus`` column) tables.

    Returns:
        ``{bus_index: {"x": float, "y": float}}``. An empty dict when the
        network has no buses and no lines.
    """
    bus_ids = [int(idx) for idx in net.bus.index]
    edges = [(int(row["from_bus"]), int(row["to_bus"])) for _, row in net.line.iterrows()]

    # Buses first, then any line endpoints not listed as buses.
    adjacency: dict[int, set[int]] = {bus: set() for bus in bus_ids}
    for u, v in edges:
        adjacency.setdefault(u, set()).add(v)
        adjacency.setdefault(v, set()).add(u)
    if not adjacency:
        return {}

    n_buses = len(net.bus)
    if n_buses <= 40:
        slack_bus = int(net.ext_grid.bus.iloc[0]) if len(net.ext_grid) > 0 else 0
        if slack_bus not in adjacency:
            # No usable slack bus (missing ext_grid, or index 0 is not a bus):
            # root the layout at the first known bus instead of failing.
            slack_bus = next(iter(adjacency))
        raw_positions = _layered_positions(adjacency, slack_bus)
        return _scale_positions(raw_positions, width=1600, height=900, padding=60)

    # networkx is only needed for the force-directed path.
    import networkx as nx

    graph = nx.Graph()
    for bus in bus_ids:
        graph.add_node(bus)
    for u, v in edges:
        graph.add_edge(u, v)

    # Kamada-Kawai distributes nodes better than spring_layout; fall back to a
    # seeded spring layout on any failure (deliberate best effort).
    try:
        raw_positions = nx.kamada_kawai_layout(graph)
    except Exception:
        raw_positions = nx.spring_layout(
            graph,
            seed=42,
            k=3.0 / max(math.sqrt(n_buses), 1.0),
            iterations=300,
        )
    raw_positions = _spread_positions(raw_positions, min_dist=0.06, steps=10)
    return _scale_positions(raw_positions, width=3200, height=2000, padding=100)
def _bus_node(bus: dict, layout: dict) -> dict:
    """Build the visualization node payload for one bus record."""
    bus_id = bus["index"]
    return {
        "id": f"bus_{bus_id}",
        "data": {
            "label": f"Bus {bus_id}",
            "busId": bus_id,
            "vn_kv": bus["vn_kv"],
            "load_mw": bus["load_mw"],
            "load_mvar": bus["load_mvar"],
            "is_slack": bus["is_slack"],
        },
        # Buses without a computed position are placed at the origin.
        "position": layout.get(bus_id, {"x": 0, "y": 0}),
        "type": "busNode",
    }


def _line_branch(line: dict) -> dict:
    """Build the visualization edge payload for one line record."""
    closed = line["in_service"]
    return {
        "id": f"line_{line['index']}",
        "source": f"bus_{line['from_bus']}",
        "target": f"bus_{line['to_bus']}",
        "data": {
            "lineId": line["index"],
            "from_bus": line["from_bus"],
            "to_bus": line["to_bus"],
            "r_ohm_per_km": line["r_ohm_per_km"],
            "x_ohm_per_km": line["x_ohm_per_km"],
            "length_km": line["length_km"],
            "in_service": closed,
            "is_tie": line["is_tie"],
        },
        "type": "switchEdge",
        # Open switches are animated and drawn red; closed ones are green.
        "animated": not closed,
        "style": {
            "stroke": "#22c55e" if closed else "#ef4444",
            "strokeWidth": 2,
        },
    }


@router.get("/grid")
def get_grid(system: str = "case33bw", user: FirebaseUser = Depends(optional_auth)):
    """Get grid topology for visualization.
    Returns nodes (buses) and branches (lines) with positions and status.
    """
    # NOTE: FastAPI publishes the docstring above as the operation description,
    # so its wording is kept as-is; implementation notes live in comments.
    #
    # Response keys: "system", "summary", "nodes", "branches".
    # A ValueError from load_network is reported as HTTP 400.
    try:
        net = load_network(system)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))

    bus_records = get_bus_data(net)
    line_records = get_topology_data(net)
    summary = get_network_summary(net)
    layout = compute_bus_positions(net)

    return {
        "system": system,
        "summary": summary,
        "nodes": [_bus_node(bus, layout) for bus in bus_records],
        "branches": [_line_branch(line) for line in line_records],
    }
class SetOutOfServiceRequest(BaseModel):
    # Request body for POST /grid/set-out-of-service.

    # Name of the network to load; optional, defaults to "case33bw".
    system: str = Field(
        default="case33bw",
        description="IEEE test system name",
    )
    # Required: indices of the lines to open.
    out_of_service_lines: list[int] = Field(
        description="Line indices to set as OUT OF SERVICE",
    )
@router.post("/grid/set-out-of-service")
def set_out_of_service(req: SetOutOfServiceRequest, user: FirebaseUser = Depends(optional_auth)):
    """Set specific lines as out of service and return updated grid + power flow."""
    # Flow: load network -> validate indices -> ensure connectivity (with
    # automatic repair) -> warn about loops on distribution grids -> apply the
    # topology and run the power flow.
    # Unknown system names and unknown line indices are reported as HTTP 400;
    # disconnected or non-converging configurations return {"valid": False, ...}.
    try:
        net = load_network(req.system)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))

    # Reject indices that are not present in the network's line table.
    known_lines = set(net.line.index.tolist())
    unknown = [line_id for line_id in req.out_of_service_lines if line_id not in known_lines]
    if unknown:
        raise HTTPException(status_code=400, detail=f"Invalid line indices: {unknown}")

    # A spanning tree over all buses uses (buses - 1) lines; the remainder is
    # the number of lines that can be open while the grid stays connected.
    bus_count = len(net.bus)
    spare_lines = len(net.line) - (bus_count - 1)

    notes = []
    open_lines = list(req.out_of_service_lines)

    # Connectivity check with auto-repair.
    if not check_topology_valid(net, open_lines, require_radial=False):
        repaired, auto_closed = try_repair_connectivity(net, open_lines)
        if repaired is None:
            return {
                "valid": False,
                "error": "Configuration is not connected. All buses must remain reachable.",
            }
        open_lines = repaired
        closed_str = ", ".join(map(str, auto_closed))
        notes.append(
            f"To maintain connectivity, line(s) {closed_str} were automatically "
            f"kept in service (closed). The {bus_count}-bus system supports at "
            f"most {spare_lines} open lines while staying connected."
        )

    # Loops are only flagged (not rejected) and only for distribution grids.
    distribution = is_distribution_grid(net)
    if distribution and not check_topology_valid(net, open_lines, require_radial=True):
        notes.append(
            f"Configuration has loops (not radial). Distribution grids are normally "
            f"operated as trees. For {bus_count}-bus system you need exactly "
            f"{spare_lines} open lines to maintain radiality."
        )

    # Apply the topology and solve.
    solved_net = apply_topology(net, open_lines)
    converged = run_power_flow(solved_net)
    if not converged:
        return {
            "valid": False,
            "error": "Power flow did not converge for this configuration.",
        }

    flow = extract_results(solved_net)
    flow["open_lines"] = open_lines

    response = {
        "valid": True,
        "system": req.system,
        "open_lines": open_lines,
        "power_flow": flow,
    }
    if notes:
        response["warnings"] = notes
    return response
@router.get("/grid/{system}/switches")
def get_switch_states(system: str = "case33bw"):
    """Get current switch states for all lines."""
    # NOTE(review): `system` is a path parameter here, so the default value
    # presumably only matters for direct Python calls - confirm against
    # FastAPI's path-parameter rules.
    # A ValueError from load_network is reported as HTTP 400.
    try:
        net = load_network(system)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))

    # One entry per line; an in-service line is reported as a closed switch.
    switch_states = [
        {
            "line_id": record["index"],
            "from_bus": record["from_bus"],
            "to_bus": record["to_bus"],
            "status": "closed" if record["in_service"] else "open",
            "is_tie": record["is_tie"],
        }
        for record in get_topology_data(net)
    ]
    return {"system": system, "switches": switch_states}