""" Power Flow — AC power flow simulation and result extraction. Wraps pandapower's Newton-Raphson solver and provides clean result dicts for the API and evaluation layers. """ from __future__ import annotations import numpy as np import networkx as nx import pandapower as pp from src.grid.loader import clone_network def run_power_flow(net: pp.pandapowerNet, **kwargs) -> bool: """Run AC power flow (Newton-Raphson) on the given network. Parameters ---------- net : pp.pandapowerNet The network to solve. Results are stored in-place on ``net.res_*``. **kwargs Extra keyword arguments forwarded to ``pp.runpp`` (e.g. ``init``, ``numba``). Returns ------- bool ``True`` if the power flow converged, ``False`` otherwise. """ try: pp.runpp(net, **kwargs) return True except pp.LoadflowNotConverged: return False def extract_results(net: pp.pandapowerNet) -> dict: """Extract key results from a solved network. Returns ------- dict with keys: total_loss_kw, total_loss_mw, loss_pct, min_voltage_pu, max_voltage_pu, mean_voltage_pu, voltage_violations (count of buses outside 0.95–1.05), bus_voltages (list), line_loadings (list), line_losses_kw (list) """ total_gen_mw = float(net.res_ext_grid.p_mw.sum()) if len(net.res_gen) > 0: total_gen_mw += float(net.res_gen.p_mw.sum()) total_load_mw = float(net.load.p_mw.sum()) # Only sum losses for in-service lines in_service_mask = net.line.in_service loss_mw = float(net.res_line.loc[in_service_mask, "pl_mw"].sum()) loss_kw = loss_mw * 1000 loss_pct = (loss_mw / total_gen_mw * 100) if total_gen_mw > 0 else 0.0 vm = net.res_bus.vm_pu.values violations = int(np.sum((vm < 0.95) | (vm > 1.05))) # Line results (only in-service lines) line_loadings = [] line_losses = [] for idx in net.line.index: if net.line.at[idx, "in_service"]: line_loadings.append(round(float(net.res_line.at[idx, "loading_percent"]), 2)) line_losses.append(round(float(net.res_line.at[idx, "pl_mw"]) * 1000, 2)) else: line_loadings.append(0.0) line_losses.append(0.0) return { "converged": True, "total_loss_kw": round(loss_kw, 2), "total_loss_mw": round(loss_mw, 4), "loss_pct": round(loss_pct, 2), "total_generation_mw": round(total_gen_mw, 4), "total_load_mw": round(total_load_mw, 4), "min_voltage_pu": round(float(vm.min()), 4), "max_voltage_pu": round(float(vm.max()), 4), "mean_voltage_pu": round(float(vm.mean()), 4), "voltage_violations": violations, "bus_voltages": [round(float(v), 4) for v in vm], "line_loadings_pct": line_loadings, "line_losses_kw": line_losses, } def get_baseline(net: pp.pandapowerNet) -> dict: """Run power flow on the default configuration and return results.""" net_copy = clone_network(net) converged = run_power_flow(net_copy) if not converged: return {"converged": False, "error": "Baseline power flow did not converge."} return extract_results(net_copy) def check_topology_valid(net: pp.pandapowerNet, open_lines: list[int], require_radial: bool = True) -> bool: """Check if a topology is valid (connected, and optionally radial). Builds a NetworkX graph from in-service lines and verifies: 1. All buses are reachable (connected graph) 2. Optionally: the graph is a tree (radial — no cycles) For distribution networks (e.g. IEEE 33-bus) ``require_radial=True`` enforces a tree topology. For meshed transmission networks (e.g. IEEE 118-bus) ``require_radial=False`` only checks connectivity. Parameters ---------- net : pp.pandapowerNet The base network (not modified). open_lines : list[int] Line indices that are OUT of service. require_radial : bool If True, also verify the network is a tree (no loops). Default True. Returns ------- bool ``True`` if the topology passes all checks. """ open_set = set(open_lines) G = nx.Graph() G.add_nodes_from(net.bus.index.tolist()) for idx, row in net.line.iterrows(): if idx not in open_set: G.add_edge(int(row["from_bus"]), int(row["to_bus"])) # Include transformers — they connect buses just like lines if hasattr(net, "trafo") and len(net.trafo) > 0: for _, row in net.trafo.iterrows(): G.add_edge(int(row["hv_bus"]), int(row["lv_bus"])) if not nx.is_connected(G): return False if require_radial and not nx.is_tree(G): return False return True def check_radial_connected(net: pp.pandapowerNet, open_lines: list[int]) -> bool: """Legacy wrapper — checks connected tree (radial) topology.""" return check_topology_valid(net, open_lines, require_radial=True) def is_distribution_grid(net: pp.pandapowerNet) -> bool: """Auto-detect whether a network is distribution (radial) or transmission (meshed). Distribution grids have exactly N-1 in-service lines for N buses (a tree). Transmission grids have more edges (meshed with loops). """ if "optiq_is_distribution" in net: return bool(net["optiq_is_distribution"]) n_buses = len(net.bus) n_in_service = int(net.line.in_service.sum()) return n_in_service == n_buses - 1 def try_repair_connectivity( net: pp.pandapowerNet, open_lines: list[int], ) -> tuple[list[int] | None, list[int]]: """Try to restore connectivity by closing lines from the OOS list. Prioritises closing *default* tie-lines (the lines that are already out-of-service when the network is first loaded) because those are the natural candidates for toggling in a reconfiguration. Returns ------- (repaired_oos, auto_closed) if a connected configuration was found. (None, []) if repair is impossible. """ default_oos = set(net.line.index[~net.line.in_service].tolist()) # Sort: try closing default (tie) lines first, then user-added feeders candidates = sorted(open_lines, key=lambda l: (l not in default_oos, l)) repaired = list(open_lines) auto_closed: list[int] = [] for line_to_close in candidates: if check_topology_valid(net, repaired, require_radial=False): break repaired.remove(line_to_close) auto_closed.append(line_to_close) if check_topology_valid(net, repaired, require_radial=False): return repaired, auto_closed return None, [] def apply_topology( net: pp.pandapowerNet, open_lines: list[int], ) -> pp.pandapowerNet: """Apply a reconfiguration topology by setting line in_service status. Parameters ---------- net : pp.pandapowerNet Base network (will be deep-copied). open_lines : list[int] Indices of lines that should be OUT OF SERVICE (open). All other lines are set to in-service (closed). Returns ------- pp.pandapowerNet A new network with the topology applied. """ net_copy = clone_network(net) # Close all lines first net_copy.line["in_service"] = True # Open the specified ones for idx in open_lines: if idx in net_copy.line.index: net_copy.line.at[idx, "in_service"] = False return net_copy def evaluate_topology(net: pp.pandapowerNet, open_lines: list[int], require_radial: bool = True) -> dict: """Apply a topology and evaluate it via AC power flow. First checks connectivity (and optionally radiality), then runs AC power flow. Parameters ---------- net : pp.pandapowerNet Base network. open_lines : list[int] Line indices to set out of service. require_radial : bool If True, enforce tree topology (for distribution grids). Default True. Returns ------- dict Full result dict from ``extract_results()``, plus ``open_lines``. If infeasible or power flow diverges: ``{"converged": False, ...}``. """ # Feasibility check if not check_topology_valid(net, open_lines, require_radial=require_radial): return {"converged": False, "open_lines": open_lines, "reason": "not_radial_connected"} net_new = apply_topology(net, open_lines) converged = run_power_flow(net_new) if not converged: return {"converged": False, "open_lines": open_lines, "reason": "power_flow_diverged"} result = extract_results(net_new) result["open_lines"] = open_lines return result