Spaces:
Running
Running
| from __future__ import annotations | |
| import math | |
| from collections import defaultdict | |
| from typing import Any, Sequence | |
| import networkx as nx | |
| import numpy as np | |
| def analyze_grid_topology( | |
| obs, | |
| line_or_to_subid: Sequence[int], | |
| line_ex_to_subid: Sequence[int], | |
| n_sub: int, | |
| ) -> dict[str, Any]: | |
| """Build topology intelligence from a raw Grid2Op observation.""" | |
| energy_graph = obs.get_energy_graph() | |
| line_status = [bool(x) for x in obs.line_status.tolist()] | |
| rho = [float(x) for x in obs.rho.tolist()] | |
| overflow = [int(x) for x in obs.timestep_overflow.tolist()] | |
| connected_line_ids = [line_id for line_id, status in enumerate(line_status) if status] | |
| bus_graph = nx.MultiGraph() | |
| for sub_id in range(n_sub): | |
| bus_graph.add_node(sub_id) | |
| for line_id in connected_line_ids: | |
| u = int(line_or_to_subid[line_id]) | |
| v = int(line_ex_to_subid[line_id]) | |
| bus_graph.add_edge( | |
| u, | |
| v, | |
| key=line_id, | |
| line_id=line_id, | |
| rho=rho[line_id], | |
| timestep_overflow=overflow[line_id], | |
| ) | |
| active_bus_graph = nx.Graph() | |
| active_bus_graph.add_nodes_from( | |
| node for node in bus_graph.nodes if bus_graph.degree(node) > 0 | |
| ) | |
| for u, v, data in bus_graph.edges(data=True): | |
| if active_bus_graph.has_edge(u, v): | |
| continue | |
| active_bus_graph.add_edge(u, v, rho=float(data["rho"])) | |
| pair_to_lines: dict[tuple[int, int], list[int]] = defaultdict(list) | |
| for line_id in connected_line_ids: | |
| u = int(line_or_to_subid[line_id]) | |
| v = int(line_ex_to_subid[line_id]) | |
| key = tuple(sorted((u, v))) | |
| pair_to_lines[key].append(line_id) | |
| parallel_groups = { | |
| str(line_id): sorted(other for other in line_ids if other != line_id) | |
| for line_ids in pair_to_lines.values() | |
| if len(line_ids) > 1 | |
| for line_id in line_ids | |
| } | |
| bridge_lines: list[int] = [] | |
| safe_to_disconnect: list[int] = [] | |
| for line_id in connected_line_ids: | |
| trial_graph = bus_graph.copy() | |
| u = int(line_or_to_subid[line_id]) | |
| v = int(line_ex_to_subid[line_id]) | |
| if not trial_graph.has_edge(u, v, key=line_id): | |
| continue | |
| trial_graph.remove_edge(u, v, key=line_id) | |
| active_nodes = [node for node in trial_graph.nodes if trial_graph.degree(node) > 0] | |
| if not active_nodes: | |
| bridge_lines.append(line_id) | |
| continue | |
| reduced_graph = nx.Graph(trial_graph.subgraph(active_nodes)) | |
| if nx.number_connected_components(reduced_graph) > 1: | |
| bridge_lines.append(line_id) | |
| else: | |
| safe_to_disconnect.append(line_id) | |
| components = [ | |
| sorted(component) | |
| for component in nx.connected_components(active_bus_graph) | |
| ] if active_bus_graph.number_of_nodes() else [] | |
| islanded_clusters = components[1:] if len(components) > 1 else [] | |
| centrality_graph = active_bus_graph.copy() | |
| centrality_scores = ( | |
| nx.betweenness_centrality(centrality_graph) | |
| if centrality_graph.number_of_nodes() > 0 | |
| else {} | |
| ) | |
| high_centrality_buses = [ | |
| int(node) | |
| for node, score in sorted( | |
| centrality_scores.items(), | |
| key=lambda item: item[1], | |
| reverse=True, | |
| ) | |
| if score > 0.0 | |
| ][:3] | |
| flow_matrix = _extract_flow_matrix(obs.flow_bus_matrix(active_flow=True)) | |
| exporter_buses, importer_buses = _rank_flow_buses(flow_matrix) | |
| stressed_lines = [ | |
| { | |
| "line_id": line_id, | |
| "rho": round(rho[line_id], 4), | |
| "overflow": overflow[line_id], | |
| "from_sub": int(line_or_to_subid[line_id]), | |
| "to_sub": int(line_ex_to_subid[line_id]), | |
| } | |
| for line_id in sorted(connected_line_ids, key=lambda idx: rho[idx], reverse=True)[:5] | |
| ] | |
| congestion_corridor = "none" | |
| if stressed_lines: | |
| corridor_lines = [entry["line_id"] for entry in stressed_lines[:3]] | |
| congestion_corridor = ( | |
| f"export buses {exporter_buses or ['unknown']} -> " | |
| f"import buses {importer_buses or ['unknown']} via lines {corridor_lines}" | |
| ) | |
| return { | |
| "num_buses": int(active_bus_graph.number_of_nodes()), | |
| "num_connected_lines": len(connected_line_ids), | |
| "bridge_lines": sorted(bridge_lines), | |
| "safe_to_disconnect": sorted(safe_to_disconnect), | |
| "n_minus_1_critical_lines": sorted(bridge_lines), | |
| "parallel_groups": parallel_groups, | |
| "high_centrality_buses": high_centrality_buses, | |
| "islanded_clusters": islanded_clusters, | |
| "congestion_corridor": congestion_corridor, | |
| "flow_clusters": { | |
| "export_buses": exporter_buses, | |
| "import_buses": importer_buses, | |
| }, | |
| "stressed_lines": stressed_lines, | |
| "graph_density": round(nx.density(active_bus_graph), 6) | |
| if active_bus_graph.number_of_nodes() > 1 | |
| else 0.0, | |
| "energy_graph_summary": { | |
| "nodes": energy_graph.number_of_nodes(), | |
| "edges": energy_graph.number_of_edges(), | |
| }, | |
| } | |
| def _extract_flow_matrix(raw_flow_output) -> np.ndarray: | |
| if isinstance(raw_flow_output, np.ndarray): | |
| return raw_flow_output | |
| if isinstance(raw_flow_output, tuple) and raw_flow_output: | |
| first = raw_flow_output[0] | |
| if isinstance(first, np.ndarray): | |
| return first | |
| raise TypeError(f"Unsupported flow_bus_matrix output type: {type(raw_flow_output)!r}") | |
| def _rank_flow_buses(flow_matrix: np.ndarray) -> tuple[list[int], list[int]]: | |
| matrix = np.asarray(flow_matrix, dtype=float) | |
| if matrix.ndim != 2 or matrix.shape[0] != matrix.shape[1]: | |
| raise ValueError(f"Expected square flow matrix, got shape {matrix.shape}") | |
| # In Grid2Op's flow_bus_matrix, the diagonal carries the nodal net active injection | |
| # while off-diagonal entries represent inter-bus transfers. Summing rows therefore | |
| # tends to zero by construction and hides the exporter/importer ranking. | |
| net_exports = np.diag(matrix) | |
| exporters = [ | |
| int(idx) | |
| for idx in np.argsort(-net_exports) | |
| if math.isfinite(net_exports[idx]) and net_exports[idx] > 1.0 | |
| ][:3] | |
| importers = [ | |
| int(idx) | |
| for idx in np.argsort(net_exports) | |
| if math.isfinite(net_exports[idx]) and net_exports[idx] < -1.0 | |
| ][:3] | |
| return exporters, importers | |