# OptiQ/src/grid/power_flow.py
# Repository page residue: "mohammademad2003's picture", "first commit", 7477316
"""
Power Flow — AC power flow simulation and result extraction.
Wraps pandapower's Newton-Raphson solver and provides clean result dicts
for the API and evaluation layers.
"""
from __future__ import annotations
import numpy as np
import networkx as nx
import pandapower as pp
from src.grid.loader import clone_network
def run_power_flow(net: pp.pandapowerNet, **kwargs) -> bool:
    """Solve the AC power flow for ``net`` and report whether it converged.

    Parameters
    ----------
    net : pp.pandapowerNet
        Network handed to ``pp.runpp``; pandapower writes the solution into
        the ``net.res_*`` tables of this same object.
    **kwargs
        Passed through unchanged to ``pp.runpp`` (for example ``init`` or
        ``numba``).

    Returns
    -------
    bool
        ``True`` when ``pp.runpp`` returns normally, ``False`` when it raises
        ``pp.LoadflowNotConverged``. Any other exception propagates.
    """
    converged = True
    try:
        pp.runpp(net, **kwargs)
    except pp.LoadflowNotConverged:
        # Non-convergence is an expected outcome for bad topologies, so it
        # is reported as a flag instead of an error.
        converged = False
    return converged
def extract_results(net: pp.pandapowerNet) -> dict:
    """Collect loss, voltage and per-line figures from a solved network.

    Parameters
    ----------
    net : pp.pandapowerNet
        Network whose ``res_*`` tables have already been filled by a
        power-flow run.

    Returns
    -------
    dict
        Keys:

        * ``converged`` – always ``True`` (this is only called on solved nets)
        * ``total_loss_kw``, ``total_loss_mw`` – summed ``pl_mw`` of the
          in-service lines
        * ``loss_pct`` – line losses as a percentage of total generation
          (``0.0`` when generation is not positive)
        * ``total_generation_mw`` – ``res_ext_grid.p_mw`` plus ``res_gen.p_mw``
        * ``total_load_mw`` – sum of the ``p_mw`` column of ``net.load``
        * ``min_voltage_pu``, ``max_voltage_pu``, ``mean_voltage_pu`` –
          statistics over buses that have a voltage result; ``NaN`` when no
          bus has one
        * ``voltage_violations`` – number of buses outside 0.95–1.05 p.u.
        * ``bus_voltages`` – one value per row of ``res_bus`` (may hold NaN)
        * ``line_loadings_pct``, ``line_losses_kw`` – one value per row of
          ``net.line``; ``0.0`` for out-of-service lines
    """
    total_gen_mw = float(net.res_ext_grid.p_mw.sum())
    if len(net.res_gen) > 0:
        total_gen_mw += float(net.res_gen.p_mw.sum())
    total_load_mw = float(net.load.p_mw.sum())

    # Only in-service lines contribute to the loss total.
    in_service = net.line.in_service
    res_line = net.res_line
    loss_mw = float(res_line.loc[in_service, "pl_mw"].sum())
    loss_kw = loss_mw * 1000
    loss_pct = (loss_mw / total_gen_mw * 100) if total_gen_mw > 0 else 0.0

    vm = np.asarray(net.res_bus.vm_pu.values, dtype=float)
    # Buses without a voltage result show up as NaN. A single NaN would turn
    # min/max/mean into NaN, so the statistics only use the solved buses.
    solved = vm[~np.isnan(vm)]
    if solved.size > 0:
        v_min = float(solved.min())
        v_max = float(solved.max())
        v_mean = float(solved.mean())
    else:
        v_min = v_max = v_mean = float("nan")
    # Comparisons against NaN are False, so unsolved buses are not counted.
    violations = int(np.sum((vm < 0.95) | (vm > 1.05)))

    # Per-line figures stay positionally aligned with net.line; lines that
    # are out of service are reported as zero.
    line_loadings = []
    line_losses = []
    for idx, active in zip(net.line.index, in_service):
        if active:
            line_loadings.append(round(float(res_line.at[idx, "loading_percent"]), 2))
            line_losses.append(round(float(res_line.at[idx, "pl_mw"]) * 1000, 2))
        else:
            line_loadings.append(0.0)
            line_losses.append(0.0)

    return {
        "converged": True,
        "total_loss_kw": round(loss_kw, 2),
        "total_loss_mw": round(loss_mw, 4),
        "loss_pct": round(loss_pct, 2),
        "total_generation_mw": round(total_gen_mw, 4),
        "total_load_mw": round(total_load_mw, 4),
        "min_voltage_pu": round(v_min, 4),
        "max_voltage_pu": round(v_max, 4),
        "mean_voltage_pu": round(v_mean, 4),
        "voltage_violations": violations,
        "bus_voltages": [round(float(v), 4) for v in vm],
        "line_loadings_pct": line_loadings,
        "line_losses_kw": line_losses,
    }
def get_baseline(net: pp.pandapowerNet) -> dict:
    """Solve the network as given and return its result dict.

    The power flow runs on the object returned by ``clone_network(net)``
    (presumably an independent copy, to be confirmed against the loader).

    Returns
    -------
    dict
        The output of ``extract_results`` when the solver converges,
        otherwise ``{"converged": False, "error": ...}``.
    """
    working_net = clone_network(net)
    if run_power_flow(working_net):
        return extract_results(working_net)
    return {"converged": False, "error": "Baseline power flow did not converge."}
def check_topology_valid(net: pp.pandapowerNet, open_lines: list[int], require_radial: bool = True) -> bool:
    """Check whether a switching state leaves the network connected (and radial).

    A NetworkX graph is built with every bus as a node, every line that is
    not listed in ``open_lines`` as an edge, and every in-service transformer
    as an edge. The ``in_service`` column of ``net.line`` is deliberately not
    consulted: ``open_lines`` alone defines which lines are open.

    The checks are:

    1. All buses are reachable from each other (connected graph).
    2. Optionally, the graph is a tree (radial, no loops).

    For distribution networks (e.g. IEEE 33-bus) ``require_radial=True``
    enforces a tree topology. For meshed transmission networks (e.g. IEEE
    118-bus) ``require_radial=False`` only checks connectivity.

    Parameters
    ----------
    net : pp.pandapowerNet
        The base network (not modified).
    open_lines : list[int]
        Line indices that are OUT of service.
    require_radial : bool
        If True, also verify the network is a tree (no loops). Default True.

    Returns
    -------
    bool
        ``True`` if the topology passes all requested checks. A network
        without any bus is reported as invalid.
    """
    open_set = set(open_lines)

    graph = nx.Graph()
    graph.add_nodes_from(net.bus.index.tolist())

    line = net.line
    graph.add_edges_from(
        (int(from_bus), int(to_bus))
        for idx, from_bus, to_bus in zip(line.index, line["from_bus"], line["to_bus"])
        if idx not in open_set
    )

    # Transformers connect buses just like lines, but only while they are in
    # service. Their status is not changed by a line reconfiguration, so an
    # out-of-service transformer must not count as a connection.
    if hasattr(net, "trafo") and len(net.trafo) > 0:
        trafo = net.trafo
        if "in_service" in trafo.columns:
            trafo = trafo[trafo["in_service"].astype(bool)]
        graph.add_edges_from(
            (int(hv_bus), int(lv_bus))
            for hv_bus, lv_bus in zip(trafo["hv_bus"], trafo["lv_bus"])
        )

    # NetworkX raises on the null graph; treat "no buses" as not valid.
    if graph.number_of_nodes() == 0:
        return False
    if not nx.is_connected(graph):
        return False
    if require_radial and not nx.is_tree(graph):
        return False
    return True
def check_radial_connected(net: pp.pandapowerNet, open_lines: list[int]) -> bool:
    """Legacy entry point: ``check_topology_valid`` with radiality enforced."""
    return check_topology_valid(net=net, open_lines=open_lines, require_radial=True)
def is_distribution_grid(net: pp.pandapowerNet) -> bool:
    """Auto-detect whether a network is distribution (radial) or transmission (meshed).

    An explicit ``optiq_is_distribution`` entry on the network wins and is
    returned as a bool. Otherwise the network is classified as distribution
    when its in-service branches (lines plus transformers) number exactly
    ``N - 1`` for ``N`` buses, i.e. the edge count of a tree. Transformers
    are counted because ``check_topology_valid`` also treats them as edges;
    a radial feeder behind a substation transformer would otherwise be
    classified as meshed-or-broken.

    Returns
    -------
    bool
        ``True`` for a (presumed) radial distribution grid, ``False``
        otherwise.
    """
    if "optiq_is_distribution" in net:
        return bool(net["optiq_is_distribution"])

    n_buses = len(net["bus"])
    n_branches = int(net["line"]["in_service"].sum())

    if "trafo" in net and len(net["trafo"]) > 0:
        trafo = net["trafo"]
        if "in_service" in trafo.columns:
            n_branches += int(trafo["in_service"].astype(bool).sum())
        else:
            # No status column available: assume every transformer is active.
            n_branches += len(trafo)

    return n_branches == n_buses - 1
def try_repair_connectivity(
    net: pp.pandapowerNet,
    open_lines: list[int],
) -> tuple[list[int] | None, list[int]]:
    """Re-close requested-open lines, one by one, until the grid is connected.

    Lines are closed in a fixed priority order: first those that are already
    out of service in ``net`` (the default tie-lines), then the remaining
    ones, each group in ascending index order. Closing stops as soon as the
    connectivity check (``require_radial=False``) passes. Each candidate is
    closed in turn whether or not that particular closure reconnects
    anything.

    Returns
    -------
    (repaired_oos, auto_closed)
        The reduced out-of-service list and the lines that were closed, when
        a connected configuration is reached.
    (None, [])
        When the grid is still disconnected after every candidate is closed.
    """
    tie_lines = set(net.line.index[~net.line.in_service].tolist())

    def _is_connected(out_of_service: list[int]) -> bool:
        return check_topology_valid(net, out_of_service, require_radial=False)

    def _closing_priority(line_idx: int) -> tuple[bool, int]:
        # False sorts before True, so default tie-lines come first.
        return (line_idx not in tie_lines, line_idx)

    still_open = list(open_lines)
    closed: list[int] = []
    for line_idx in sorted(open_lines, key=_closing_priority):
        if _is_connected(still_open):
            break
        still_open.remove(line_idx)
        closed.append(line_idx)

    if not _is_connected(still_open):
        return None, []
    return still_open, closed
def apply_topology(
    net: pp.pandapowerNet,
    open_lines: list[int],
) -> pp.pandapowerNet:
    """Return a clone of ``net`` whose line statuses match ``open_lines``.

    Parameters
    ----------
    net : pp.pandapowerNet
        Base network; the changes are made on the object returned by
        ``clone_network(net)``.
    open_lines : list[int]
        Indices of lines to put OUT OF SERVICE (open). Every other line is
        put in service (closed). Indices that are not in the line table are
        ignored.

    Returns
    -------
    pp.pandapowerNet
        The cloned network with the topology applied.
    """
    reconfigured = clone_network(net)
    line_table = reconfigured.line

    # Start from "everything closed", then open the requested lines.
    line_table["in_service"] = True
    known = [idx for idx in open_lines if idx in line_table.index]
    if known:
        line_table.loc[known, "in_service"] = False

    return reconfigured
def evaluate_topology(net: pp.pandapowerNet, open_lines: list[int], require_radial: bool = True) -> dict:
    """Score a switching state: feasibility check first, then AC power flow.

    Parameters
    ----------
    net : pp.pandapowerNet
        Base network.
    open_lines : list[int]
        Line indices to set out of service.
    require_radial : bool
        If True, the topology must also be a tree (for distribution grids).
        Default True.

    Returns
    -------
    dict
        On success, the dict from ``extract_results()`` with ``open_lines``
        added. On failure, ``{"converged": False, "open_lines": ...,
        "reason": ...}`` where ``reason`` is ``"not_radial_connected"`` when
        the topology check fails (also when ``require_radial`` is False) or
        ``"power_flow_diverged"`` when the solver does not converge.
    """

    def _failed(reason: str) -> dict:
        return {"converged": False, "open_lines": open_lines, "reason": reason}

    # Skip the solver entirely for topologies that fail the graph check.
    if not check_topology_valid(net, open_lines, require_radial=require_radial):
        return _failed("not_radial_connected")

    candidate = apply_topology(net, open_lines)
    if not run_power_flow(candidate):
        return _failed("power_flow_diverged")

    outcome = extract_results(candidate)
    outcome["open_lines"] = open_lines
    return outcome