File size: 8,691 Bytes
55e3496
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7477316
 
55e3496
 
7477316
 
 
 
 
 
55e3496
 
 
 
 
 
 
7477316
 
55e3496
 
 
 
7477316
55e3496
 
 
 
 
 
 
7477316
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
55e3496
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7477316
55e3496
 
7477316
55e3496
 
 
 
 
 
 
7477316
 
55e3496
 
 
 
 
 
 
7477316
 
55e3496
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
"""
Power Flow — AC power flow simulation and result extraction.
Wraps pandapower's Newton-Raphson solver and provides clean result dicts
for the API and evaluation layers.
"""
from __future__ import annotations

import numpy as np
import networkx as nx
import pandapower as pp

from src.grid.loader import clone_network


def run_power_flow(net: pp.pandapowerNet, **kwargs) -> bool:
    """Solve the AC power flow for ``net`` and report whether it converged.

    Parameters
    ----------
    net : pp.pandapowerNet
        Network handed to ``pp.runpp``.  The solver writes its results onto
        the same object (``net.res_*``); no copy is made here.
    **kwargs
        Passed through unchanged to ``pp.runpp`` (e.g. ``init``, ``numba``).

    Returns
    -------
    bool
        ``True`` when ``pp.runpp`` returns normally, ``False`` when it raises
        ``pp.LoadflowNotConverged``.  Any other exception propagates to the
        caller.
    """
    try:
        pp.runpp(net, **kwargs)
    except pp.LoadflowNotConverged:
        # Non-convergence is an expected outcome for infeasible topologies,
        # so it is reported as a flag rather than an error.
        converged = False
    else:
        converged = True
    return converged


def extract_results(net: pp.pandapowerNet) -> dict:
    """Summarise losses, voltages and per-line results of a solved network.

    The network must already carry power-flow result tables (``net.res_*``);
    this function does not run the solver and always reports
    ``"converged": True``.

    Parameters
    ----------
    net : pp.pandapowerNet
        Network whose ``res_ext_grid``, ``res_gen``, ``res_line`` and
        ``res_bus`` tables are populated.

    Returns
    -------
    dict  with keys:
        converged (always ``True``),
        total_loss_kw, total_loss_mw (sum of ``pl_mw`` over in-service lines),
        loss_pct (line losses as a percentage of total generation; 0.0 when
        total generation is not positive),
        total_generation_mw (``res_ext_grid`` plus ``res_gen`` active power),
        total_load_mw (sum of ``net.load.p_mw``),
        min_voltage_pu, max_voltage_pu, mean_voltage_pu (computed over buses
        that have a voltage result; NaN entries are skipped, and the value is
        NaN when no bus has a result),
        voltage_violations (count of buses outside 0.95–1.05),
        bus_voltages (list, one entry per bus; NaN entries are kept as-is),
        line_loadings_pct (list), line_losses_kw (list) — one entry per row
        of ``net.line``, with 0.0 for out-of-service lines.
    """
    total_gen_mw = float(net.res_ext_grid.p_mw.sum())
    if len(net.res_gen) > 0:
        total_gen_mw += float(net.res_gen.p_mw.sum())
    total_load_mw = float(net.load.p_mw.sum())

    # Only in-service lines contribute to the loss total.
    in_service = net.line.in_service
    loss_mw = float(net.res_line.loc[in_service, "pl_mw"].sum())
    loss_kw = loss_mw * 1000
    loss_pct = (loss_mw / total_gen_mw * 100) if total_gen_mw > 0 else 0.0

    vm = net.res_bus.vm_pu.values
    # Comparisons against NaN are False, so buses without a result are never
    # counted as violations.
    violations = int(np.sum((vm < 0.95) | (vm > 1.05)))

    # Keep the summary statistics consistent with the violation count: a bus
    # without a voltage result (NaN) must not turn min/max/mean into NaN.
    solved_vm = vm[~np.isnan(vm)]
    if solved_vm.size:
        vm_min = float(solved_vm.min())
        vm_max = float(solved_vm.max())
        vm_mean = float(solved_vm.mean())
    else:
        vm_min = vm_max = vm_mean = float("nan")

    # Per-line results, aligned with net.line; out-of-service lines report 0.0.
    loading_pct = net.res_line["loading_percent"]
    line_pl_mw = net.res_line["pl_mw"]
    line_loadings = []
    line_losses = []
    for idx, is_on in in_service.items():
        if is_on:
            line_loadings.append(round(float(loading_pct.at[idx]), 2))
            line_losses.append(round(float(line_pl_mw.at[idx]) * 1000, 2))
        else:
            line_loadings.append(0.0)
            line_losses.append(0.0)

    return {
        "converged": True,
        "total_loss_kw": round(loss_kw, 2),
        "total_loss_mw": round(loss_mw, 4),
        "loss_pct": round(loss_pct, 2),
        "total_generation_mw": round(total_gen_mw, 4),
        "total_load_mw": round(total_load_mw, 4),
        "min_voltage_pu": round(vm_min, 4),
        "max_voltage_pu": round(vm_max, 4),
        "mean_voltage_pu": round(vm_mean, 4),
        "voltage_violations": violations,
        "bus_voltages": [round(float(v), 4) for v in vm],
        "line_loadings_pct": line_loadings,
        "line_losses_kw": line_losses,
    }


def get_baseline(net: pp.pandapowerNet) -> dict:
    """Solve a clone of ``net`` as-is and return its result summary.

    The power flow runs on the copy returned by ``clone_network``, not on
    ``net`` itself.  Returns the ``extract_results()`` dict on convergence,
    otherwise a dict with ``"converged": False`` and an ``"error"`` message.
    """
    working_net = clone_network(net)
    if run_power_flow(working_net):
        return extract_results(working_net)
    return {"converged": False, "error": "Baseline power flow did not converge."}


def check_topology_valid(net: pp.pandapowerNet, open_lines: list[int], require_radial: bool = True) -> bool:
    """Check if a topology is valid (connected, and optionally radial).

    Builds a NetworkX graph with one node per bus and one edge per closed
    branch, then verifies:
    1. All buses are reachable (connected graph)
    2. Optionally: the graph is a tree (radial — no cycles)

    A line counts as closed when its index is NOT in ``open_lines``; the
    current ``net.line.in_service`` column is ignored, which matches what
    ``apply_topology`` produces.  Transformers are added as edges only when
    they are in service, because an out-of-service transformer does not
    connect its two buses.

    For distribution networks (e.g. IEEE 33-bus) ``require_radial=True``
    enforces a tree topology.  For meshed transmission networks (e.g. IEEE
    118-bus) ``require_radial=False`` only checks connectivity.

    Parameters
    ----------
    net : pp.pandapowerNet
        The base network (not modified).
    open_lines : list[int]
        Line indices that are OUT of service.
    require_radial : bool
        If True, also verify the network is a tree (no loops). Default True.

    Returns
    -------
    bool
        ``True`` if the topology passes all checks.  ``False`` as well when
        the network has no buses at all.

    Notes
    -----
    The graph is a simple ``nx.Graph``, so parallel branches between the same
    two buses collapse into one edge and are not seen as a loop.  Switches
    and bus ``in_service`` flags are not considered.
    """
    open_set = set(open_lines)
    graph = nx.Graph()
    graph.add_nodes_from(net.bus.index.tolist())

    # Column-wise zip instead of iterrows(): same values, no per-row Series.
    lines = net.line
    graph.add_edges_from(
        (int(from_bus), int(to_bus))
        for idx, from_bus, to_bus in zip(lines.index, lines["from_bus"], lines["to_bus"])
        if idx not in open_set
    )

    # Transformers connect buses just like lines, but only while in service.
    trafo = getattr(net, "trafo", None)
    if trafo is not None and len(trafo) > 0:
        if "in_service" in trafo.columns:
            trafo = trafo[trafo["in_service"].astype(bool)]
        graph.add_edges_from(
            (int(hv_bus), int(lv_bus))
            for hv_bus, lv_bus in zip(trafo["hv_bus"], trafo["lv_bus"])
        )

    # nx.is_connected raises on a graph without nodes; treat that as invalid.
    if graph.number_of_nodes() == 0:
        return False
    if not nx.is_connected(graph):
        return False
    if require_radial and not nx.is_tree(graph):
        return False
    return True


def check_radial_connected(net: pp.pandapowerNet, open_lines: list[int]) -> bool:
    """Legacy entry point: ``check_topology_valid`` with ``require_radial=True``.

    Returns ``True`` only when the topology is both connected and a tree.
    """
    is_radial = check_topology_valid(net, open_lines, require_radial=True)
    return is_radial


def is_distribution_grid(net: pp.pandapowerNet) -> bool:
    """Classify a network as distribution (radial) or transmission (meshed).

    An explicit ``"optiq_is_distribution"`` entry on the network wins and is
    returned as a bool.  Without it, the network is treated as a distribution
    grid when the number of in-service lines equals the number of buses minus
    one (the edge count of a tree); any other count is treated as transmission.
    Only lines are counted — other branch types do not enter the heuristic.
    """
    override_key = "optiq_is_distribution"
    if override_key in net:
        return bool(net[override_key])

    closed_line_count = int(net.line.in_service.sum())
    tree_edge_count = len(net.bus) - 1
    return closed_line_count == tree_edge_count


def try_repair_connectivity(
    net: pp.pandapowerNet,
    open_lines: list[int],
) -> tuple[list[int] | None, list[int]]:
    """Close lines from ``open_lines`` one at a time until the grid is connected.

    Lines are closed in a fixed order: first those that are out of service in
    ``net`` itself (the default tie-lines), then the remaining requested
    lines, each group in ascending index order.  Closing stops as soon as
    ``check_topology_valid(..., require_radial=False)`` passes.  Every
    candidate reached before that point is closed, whether or not closing it
    changes connectivity.

    Returns
    -------
    (repaired_oos, auto_closed)  if a connected configuration was found;
                                 ``repaired_oos`` is ``open_lines`` minus the
                                 lines in ``auto_closed``.
    (None, [])                   if the grid stays disconnected even with
                                 every listed line closed.
    """
    currently_off = ~net.line.in_service
    tie_lines = set(net.line.index[currently_off].tolist())

    # Default tie-lines first, then user-opened feeders; sorted within each group.
    tie_candidates = sorted(line for line in open_lines if line in tie_lines)
    feeder_candidates = sorted(line for line in open_lines if line not in tie_lines)

    still_open = list(open_lines)
    closed_by_repair: list[int] = []

    for candidate in tie_candidates + feeder_candidates:
        if check_topology_valid(net, still_open, require_radial=False):
            return still_open, closed_by_repair
        still_open.remove(candidate)
        closed_by_repair.append(candidate)

    # All candidates are closed now; this is the last configuration to test.
    if check_topology_valid(net, still_open, require_radial=False):
        return still_open, closed_by_repair
    return None, []


def apply_topology(
    net: pp.pandapowerNet,
    open_lines: list[int],
) -> pp.pandapowerNet:
    """Return a clone of ``net`` whose line statuses reflect ``open_lines``.

    Parameters
    ----------
    net : pp.pandapowerNet
        Base network; it is cloned via ``clone_network`` and the clone is
        edited, not ``net``.
    open_lines : list[int]
        Indices of lines to switch OUT OF SERVICE (open).  Every other line
        is switched in-service (closed), regardless of its previous status.
        Indices that do not exist in the line table are ignored.

    Returns
    -------
    pp.pandapowerNet
        The edited clone.
    """
    configured = clone_network(net)
    # Start from "everything closed" so the result depends only on open_lines.
    configured.line["in_service"] = True
    known_open = [idx for idx in open_lines if idx in configured.line.index]
    if known_open:
        configured.line.loc[known_open, "in_service"] = False
    return configured


def evaluate_topology(net: pp.pandapowerNet, open_lines: list[int], require_radial: bool = True) -> dict:
    """Validate a switching configuration and score it with an AC power flow.

    The topology check runs first; the power flow is only attempted when the
    check passes.

    Parameters
    ----------
    net : pp.pandapowerNet
        Base network.
    open_lines : list[int]
        Line indices to set out of service.
    require_radial : bool
        If True, the topology must be a tree (for distribution grids);
        if False, only connectivity is required. Default True.

    Returns
    -------
    dict
        On success: the ``extract_results()`` dict plus ``open_lines``.
        On failure: ``{"converged": False, "open_lines": ..., "reason": ...}``
        where ``reason`` is ``"not_radial_connected"`` when the topology check
        fails (this string is used even with ``require_radial=False``) or
        ``"power_flow_diverged"`` when the solver does not converge.
    """
    topology_ok = check_topology_valid(net, open_lines, require_radial=require_radial)
    if not topology_ok:
        failure_reason = "not_radial_connected"
    else:
        candidate_net = apply_topology(net, open_lines)
        if run_power_flow(candidate_net):
            outcome = extract_results(candidate_net)
            outcome["open_lines"] = open_lines
            return outcome
        failure_reason = "power_flow_diverged"

    return {"converged": False, "open_lines": open_lines, "reason": failure_reason}