# === ENVIRONMENT SETUP FOR HUGGING FACE SPACES ===
# Only the standard library is imported up front: the environment variables
# below have to exist before streamlit and matplotlib are imported, so the
# third-party imports follow the setup instead of preceding it.
import io
import os
import tempfile
import warnings
import zipfile

# Redirect every config/cache location to the system temp directory, which is
# used here as the writable location on Hugging Face Spaces.
temp_dir = tempfile.gettempdir()
os.environ["STREAMLIT_CONFIG_DIR"] = os.path.join(temp_dir, ".streamlit")
os.environ["HOME"] = temp_dir
os.environ["MPLCONFIGDIR"] = os.path.join(temp_dir, ".matplotlib")
os.environ["XDG_CONFIG_HOME"] = os.path.join(temp_dir, ".config")
os.environ["XDG_CACHE_HOME"] = os.path.join(temp_dir, ".cache")

# Create all directories referenced above (plus the matplotlib sub-folders).
directories = [
    os.environ["STREAMLIT_CONFIG_DIR"],
    os.environ["MPLCONFIGDIR"],
    os.environ["XDG_CONFIG_HOME"],
    os.environ["XDG_CACHE_HOME"],
    os.path.join(temp_dir, ".local"),
    os.path.join(temp_dir, ".cache", "matplotlib"),
    os.path.join(temp_dir, ".config", "matplotlib"),
]
for dir_path in directories:
    try:
        os.makedirs(dir_path, mode=0o777, exist_ok=True)
    except OSError:
        # Deliberately best effort: a directory that cannot be created must
        # not stop the app from starting. (PermissionError is an OSError.)
        pass

# Select the non-interactive Agg backend before pyplot is imported. A failure
# is remembered and reported only after st.set_page_config() has run, so that
# set_page_config stays the first Streamlit command of the script.
_matplotlib_backend_error = None
try:
    import matplotlib

    matplotlib.use("Agg")
except Exception as e:
    _matplotlib_backend_error = e

import folium  # noqa: E402
import geopandas as gpd  # noqa: E402
import matplotlib.pyplot as plt  # noqa: E402
import networkx as nx  # noqa: E402
import numpy as np  # noqa: E402
import pandas as pd  # noqa: E402
import plotly.express as px  # noqa: E402
import plotly.graph_objects as go  # noqa: E402
import requests  # noqa: E402
import streamlit as st  # noqa: E402
from plotly.subplots import make_subplots  # noqa: E402
from scipy.spatial.distance import cdist  # noqa: E402
from shapely.geometry import LineString, Point  # noqa: E402
from streamlit_folium import st_folium  # noqa: E402

# Suppress warnings for the rest of the run.
warnings.filterwarnings("ignore")

# === STREAMLIT CONFIGURATION ===
# NOTE(review): the page icon looks like a mis-decoded glyph; it is left as
# found here and should be restored together with the file's other icons.
st.set_page_config(
    page_title="Analisis Jaringan Listrik DIY",
    page_icon="โก",
    layout="wide",
    initial_sidebar_state="expanded",
)
if _matplotlib_backend_error is not None:
    st.warning(f"Matplotlib configuration warning: {_matplotlib_backend_error}")

# === CSS STYLING ===
# NOTE(review): the style sheet body is empty in this file.
st.markdown(
    """
""",
    unsafe_allow_html=True,
)
# === UTILITY FUNCTIONS ===
def _find_shapefile(root_dir):
    """Return the path of the first ``.shp`` file under *root_dir*, or ``None``.

    The tree is walked top-down in sorted order, so a shapefile at the archive
    root wins over nested ones and the choice is deterministic. The extension
    match is case-insensitive; hidden files and the ``__MACOSX`` metadata
    folder that macOS adds to ZIP archives are ignored.
    """
    for current_dir, subdirs, filenames in os.walk(root_dir):
        # In-place assignment controls which sub-directories os.walk visits.
        subdirs[:] = sorted(d for d in subdirs if d != "__MACOSX")
        for filename in sorted(filenames):
            if filename.startswith("."):
                continue
            if filename.lower().endswith(".shp"):
                return os.path.join(current_dir, filename)
    return None


def _read_shapefile_from_zip(zip_bytes, missing_message):
    """Extract a ZIP archive held in memory and load the shapefile it contains.

    Args:
        zip_bytes: Raw bytes of the ZIP archive.
        missing_message: Message of the ``ValueError`` raised when the archive
            contains no ``.shp`` file.

    Returns:
        The GeoDataFrame read from the first shapefile found.

    Raises:
        ValueError: If no shapefile is found.
        zipfile.BadZipFile: If *zip_bytes* is not a valid ZIP archive.
    """
    with tempfile.TemporaryDirectory() as extract_dir:
        with zipfile.ZipFile(io.BytesIO(zip_bytes)) as archive:
            archive.extractall(extract_dir)
        shp_path = _find_shapefile(extract_dir)
        if shp_path is None:
            raise ValueError(missing_message)
        # Read while the temporary directory still exists.
        return gpd.read_file(shp_path)


@st.cache_data
def safe_file_processing(uploaded_file):
    """Load a GeoDataFrame from an uploaded ZIP archive containing a shapefile.

    The archive is read straight from memory, so the client-supplied file name
    is never used to build a path on disk.

    Args:
        uploaded_file: Object exposing ``getvalue()`` that returns the ZIP bytes.

    Returns:
        The GeoDataFrame, or ``None`` after reporting the problem via
        ``st.error`` when the archive cannot be processed.
    """
    try:
        return _read_shapefile_from_zip(
            uploaded_file.getvalue(), "File shapefile tidak ditemukan dalam ZIP"
        )
    except Exception as e:
        st.error(f"Error memproses file: {str(e)}")
        return None


@st.cache_data
def _download_shapefile(data_url):
    """Download a zipped shapefile and return it as a GeoDataFrame.

    Errors propagate to the caller on purpose: st.cache_data stores return
    values only, so raising keeps failed attempts out of the cache.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
    }
    response = requests.get(data_url, timeout=60, headers=headers)
    response.raise_for_status()
    return _read_shapefile_from_zip(
        response.content, "File shapefile tidak ditemukan dalam download"
    )


def safe_url_download(data_url):
    """Download a zipped shapefile from *data_url* with timeout and error handling.

    Successful downloads are cached per URL. Failures are reported through
    ``st.error`` and are not cached, so a later rerun tries the download again
    instead of replaying a stored ``None``.

    Args:
        data_url: URL of a ZIP archive that contains a shapefile.

    Returns:
        The GeoDataFrame, or ``None`` when the download or parsing failed.
    """
    # NOTE(review): the icon prefixes of the two messages below look
    # mis-decoded; they are reproduced exactly as found.
    try:
        with st.spinner("Mengunduh data dari server..."):
            return _download_shapefile(data_url)
    except requests.exceptions.Timeout:
        st.error("โฑ๏ธ Timeout: Server terlalu lama merespons")
        return None
    except requests.exceptions.ConnectionError:
        st.error("๐ Error: Tidak dapat terhubung ke server")
        return None
    except Exception as e:
        st.error(f"Error mengunduh data: {str(e)}")
        return None


# Keep the cache-control handle that callers had while this function itself
# carried the st.cache_data decorator.
safe_url_download.clear = _download_shapefile.clear
def create_network_graph(gdf):
    """Build an undirected NetworkX graph from the line geometries in *gdf*.

    Each pair of consecutive vertices of a LineString (or of every part of a
    MultiLineString) becomes one edge weighted by its planar length. A vertex
    lying less than 100 m away on both axes from an already registered vertex
    is merged into that node. Rows without a geometry, with an empty geometry
    or with any other geometry type are skipped.

    Args:
        gdf: GeoDataFrame with the network lines; it is reprojected to
            EPSG:32749 when its CRS differs. Optional ``nama`` and ``id``
            columns are used to label the segments.

    Returns:
        A tuple ``(G, nodes, gdf_utm, line_segments)``:
        ``G`` is the graph (nodes carry ``pos``, ``type``, ``x``, ``y``,
        ``lat``, ``lon``); ``nodes`` lists the node coordinates indexed by node
        id; ``gdf_utm`` is the projected GeoDataFrame; ``line_segments`` is a
        list of dicts describing every two-point segment.
        On any error the problem is reported via ``st.error`` and
        ``(empty graph, [], gdf, [])`` is returned.
    """
    try:
        G = nx.Graph()
        gdf_utm = gdf.copy()
        if gdf.crs != "EPSG:32749":
            gdf_utm = gdf.to_crs("EPSG:32749")

        coord_to_node = {}  # coordinate tuple -> node id, in creation order
        edges = []
        line_segments = []
        tolerance = 100.0  # metres; applied separately to each axis

        def get_or_create_node(coord):
            """Return the id of the first node within tolerance, creating one if needed."""
            # NOTE: linear scan over all known nodes for every vertex.
            for existing_coord, node_id in coord_to_node.items():
                if (
                    abs(existing_coord[0] - coord[0]) < tolerance
                    and abs(existing_coord[1] - coord[1]) < tolerance
                ):
                    return node_id
            # Ids are handed out sequentially, so the next id equals the
            # number of nodes registered so far.
            node_id = len(coord_to_node)
            coord_to_node[coord] = node_id
            return node_id

        def add_segments(coords, id_prefix, name_prefix):
            """Register one edge and one segment record per consecutive vertex pair."""
            for j, (start_point, end_point) in enumerate(zip(coords, coords[1:])):
                start_idx = get_or_create_node(start_point)
                end_idx = get_or_create_node(end_point)
                segment_length = (
                    (end_point[0] - start_point[0]) ** 2
                    + (end_point[1] - start_point[1]) ** 2
                ) ** 0.5
                segment_name = f"{name_prefix}_{j}"
                edge_data = {
                    "weight": segment_length,
                    "line_id": f"{id_prefix}_{j}",
                    "nama": segment_name,
                    "length_m": segment_length,
                    "length_km": segment_length / 1000,
                }
                edges.append((start_idx, end_idx, edge_data))
                line_segments.append(
                    {
                        "geometry": LineString([start_point, end_point]),
                        "start_node": start_idx,
                        "end_node": end_idx,
                        "nama": segment_name,
                        "length_m": segment_length,
                        "length_km": segment_length / 1000,
                    }
                )

        for idx, row in gdf_utm.iterrows():
            geom = row.geometry
            # A missing geometry must not abort the whole graph construction.
            if geom is None or geom.is_empty:
                continue
            line_name = row.get("nama", f"Line_{idx}")
            line_id = row.get("id", idx)
            if geom.geom_type == "MultiLineString":
                # Each part is handled as an independent line.
                for i, line in enumerate(geom.geoms):
                    add_segments(
                        list(line.coords),
                        f"{line_id}_{i}",
                        f"{line_name}_segment_{i}",
                    )
            elif geom.geom_type == "LineString":
                add_segments(
                    list(geom.coords), f"{line_id}", f"{line_name}_segment"
                )

        # Add the nodes with their position attributes.
        for coord, node_id in coord_to_node.items():
            G.add_node(
                node_id,
                pos=coord,
                type="junction",
                x=coord[0],  # easting in the projected CRS
                y=coord[1],  # northing in the projected CRS
                lat=None,  # filled in later when needed
                lon=None,  # filled in later when needed
            )
        G.add_edges_from(edges)

        # Insertion order equals node-id order, so this list is indexed by id.
        nodes = list(coord_to_node)
        return G, nodes, gdf_utm, line_segments
    except Exception as e:
        st.error(f"Error creating network graph: {str(e)}")
        return nx.Graph(), [], gdf, []
def calculate_centrality_measures(G):
    """Compute degree, betweenness, closeness and eigenvector centrality.

    Args:
        G: NetworkX graph.

    Returns:
        Dict with the keys ``"degree"``, ``"betweenness"``, ``"closeness"`` and
        ``"eigenvector"``, each mapping node -> score. An empty graph yields
        empty mappings. If eigenvector centrality fails, every node gets 0.0
        for that measure; if anything else fails, a warning is shown via
        ``st.warning`` and every measure is 0.0 for every node.
    """
    measure_names = ("degree", "betweenness", "closeness", "eigenvector")
    centrality_measures = {}
    try:
        if G.number_of_nodes() == 0:
            return {name: {} for name in measure_names}
        centrality_measures["degree"] = nx.degree_centrality(G)
        centrality_measures["betweenness"] = nx.betweenness_centrality(G)
        centrality_measures["closeness"] = nx.closeness_centrality(G)
        try:
            centrality_measures["eigenvector"] = nx.eigenvector_centrality(
                G, max_iter=1000
            )
        except Exception:
            # The computation failed for this graph: fall back to zeros
            # without swallowing KeyboardInterrupt/SystemExit.
            centrality_measures["eigenvector"] = {node: 0.0 for node in G.nodes()}
    except Exception as e:
        st.warning(f"Error calculating centrality measures: {str(e)}")
        # Key the defaults by the real node labels so lookups by node succeed
        # even when the labels are not 0..n-1.
        for name in measure_names:
            centrality_measures[name] = {node: 0.0 for node in G.nodes()}
    return centrality_measures
def find_minimum_spanning_tree(G):
    """Return the minimum spanning tree of *G* computed with Kruskal's algorithm.

    Edge costs are taken from the ``weight`` attribute. An empty graph gives an
    empty graph back; if the computation fails, a warning is shown via
    ``st.warning`` and an empty graph is returned.
    """
    try:
        if not G.number_of_nodes():
            return nx.Graph()
        return nx.minimum_spanning_tree(G, weight="weight", algorithm="kruskal")
    except Exception as exc:
        st.warning(f"Error finding MST: {str(exc)}")
        return nx.Graph()
def analyze_network_connectivity(G, line_segments=None):
    """Summarise size, connectivity, degree and length statistics of the network.

    Args:
        G: NetworkX graph.
        line_segments: Optional list of dicts with ``length_m`` and
            ``length_km`` entries. Length statistics are only added for a
            non-empty graph and a non-empty list.

    Returns:
        Dict with ``num_nodes``, ``num_edges``, ``is_connected``,
        ``num_components``, ``density``, ``diameter``, ``average_path_length``,
        ``avg_degree``, ``max_degree`` and ``min_degree``, plus the
        ``*_length_*`` / ``*_segment_*`` keys when segments are given.
        ``diameter`` and ``average_path_length`` are explanatory strings when
        they cannot be computed. On an unexpected error the problem is shown
        via ``st.error`` and a zeroed summary is returned.
    """
    analysis = {}
    try:
        num_nodes = G.number_of_nodes()
        analysis["num_nodes"] = num_nodes
        analysis["num_edges"] = G.number_of_edges()

        if num_nodes == 0:
            # Empty graph: nothing to measure, report neutral defaults.
            analysis["is_connected"] = False
            analysis["num_components"] = 0
            for key in ["density", "avg_degree", "max_degree", "min_degree"]:
                analysis[key] = 0
            analysis["diameter"] = "N/A"
            analysis["average_path_length"] = "N/A"
            return analysis

        # Connectivity is evaluated once and reused below.
        is_connected = nx.is_connected(G)
        analysis["is_connected"] = is_connected
        analysis["num_components"] = nx.number_connected_components(G)
        analysis["density"] = nx.density(G)

        if is_connected:
            try:
                analysis["diameter"] = nx.diameter(G)
                analysis["average_path_length"] = nx.average_shortest_path_length(G)
            except Exception:
                # Either both values are reported or neither is.
                analysis["diameter"] = "N/A (Error computing)"
                analysis["average_path_length"] = "N/A (Error computing)"
        else:
            analysis["diameter"] = "N/A (Graf tidak terhubung)"
            analysis["average_path_length"] = "N/A (Graf tidak terhubung)"

        # Degree statistics
        degrees = [d for _, d in G.degree()]
        analysis["avg_degree"] = np.mean(degrees) if degrees else 0
        analysis["max_degree"] = max(degrees) if degrees else 0
        analysis["min_degree"] = min(degrees) if degrees else 0

        # Length statistics derived from the segment records
        if line_segments:
            total_length_m = sum(seg["length_m"] for seg in line_segments)
            avg_segment_length = total_length_m / len(line_segments)
            analysis["total_network_length_m"] = total_length_m
            analysis["total_network_length_km"] = total_length_m / 1000
            analysis["avg_segment_length_m"] = avg_segment_length
            analysis["avg_segment_length_km"] = avg_segment_length / 1000
            analysis["longest_segment_km"] = max(
                seg["length_km"] for seg in line_segments
            )
            analysis["shortest_segment_km"] = min(
                seg["length_km"] for seg in line_segments
            )
    except Exception as e:
        st.error(f"Error analyzing network connectivity: {str(e)}")
        # Fall back to a minimal, zeroed summary.
        analysis = {
            "num_nodes": 0,
            "num_edges": 0,
            "is_connected": False,
            "num_components": 0,
            "density": 0,
            "diameter": "N/A",
            "average_path_length": "N/A",
            "avg_degree": 0,
            "max_degree": 0,
            "min_degree": 0,
        }
    return analysis
def create_network_visualization(
    G,
    nodes,
    centrality_measures,
    show_labels=False,
    show_edge_details=False,
    label_size=10,
    label_color="white",
    edge_offset=0.02,
    show_edge_colors=True,
):
    """Build an interactive Plotly figure of the network graph.

    Node positions come from the ``x``/``y`` node attributes, rescaled to the
    unit square. Nodes are sized and coloured by degree centrality. Edges that
    share the same pair of end nodes are drawn as parallel, offset lines.

    Args:
        G: NetworkX graph whose nodes carry numeric ``x`` and ``y`` attributes.
        nodes: Not used by this function; kept so existing callers keep working.
        centrality_measures: Dict that may contain ``"degree"``,
            ``"betweenness"``, ``"closeness"`` and ``"eigenvector"`` mappings
            of node -> score; missing entries are shown as 0.
        show_labels: Draw the node id on top of every node marker.
        show_edge_details: Enable hover text on edges and list each node's
            connections in its hover text.
        label_size: Font size of the node labels.
        label_color: Font colour of the node labels.
        edge_offset: Distance between parallel edges, in normalised plot units.
        show_edge_colors: Give each parallel edge its own colour.

    Returns:
        A ``plotly.graph_objects.Figure``. For an empty graph, or when an error
        occurs (also reported via ``st.error``), the figure only contains an
        explanatory annotation.
    """
    # NOTE(review): several icon glyphs inside the hover strings look
    # mis-decoded. They are reproduced as found; only the literals that were
    # broken across lines were repaired, using "<br>" as the line break.
    try:
        if G.number_of_nodes() == 0:
            fig = go.Figure()
            fig.add_annotation(
                x=0.5,
                y=0.5,
                text="Tidak ada data untuk divisualisasikan",
                showarrow=False,
                font=dict(size=16),
            )
            return fig

        # Rescale the node coordinates to [0, 1] on both axes. The graph is
        # known to be non-empty here, so the min/max calls are safe.
        node_coords = [(G.nodes[node]["x"], G.nodes[node]["y"]) for node in G.nodes()]
        min_x = min(coord[0] for coord in node_coords)
        max_x = max(coord[0] for coord in node_coords)
        min_y = min(coord[1] for coord in node_coords)
        max_y = max(coord[1] for coord in node_coords)
        # Avoid division by zero when all nodes share one coordinate.
        range_x = max_x - min_x if max_x != min_x else 1
        range_y = max_y - min_y if max_y != min_y else 1
        pos = {
            node: (
                (G.nodes[node]["x"] - min_x) / range_x,
                (G.nodes[node]["y"] - min_y) / range_y,
            )
            for node in G.nodes()
        }

        # Edge weight range, used to colour single edges by length.
        edge_weights = [data.get("weight", 0) for _, _, data in G.edges(data=True)]
        max_weight = max(edge_weights) if edge_weights else 1
        min_weight = min(edge_weights) if edge_weights else 0

        # Group edges by their (unordered) end-node pair to detect parallels.
        edge_groups = {}
        for edge in G.edges(data=True):
            node_pair = tuple(sorted([edge[0], edge[1]]))
            edge_groups.setdefault(node_pair, []).append(edge)

        def calculate_edge_offset(
            x0, y0, x1, y1, offset_distance, edge_index, total_edges
        ):
            """Shift one of several parallel edges sideways, perpendicular to the edge."""
            if total_edges == 1:
                return x0, y0, x1, y1
            dx = x1 - x0
            dy = y1 - y0
            length = (dx**2 + dy**2) ** 0.5
            if length == 0:
                return x0, y0, x1, y1
            # Unit vector perpendicular to the edge
            perp_x = -dy / length
            perp_y = dx / length
            # Spread the edges symmetrically around the straight line; for an
            # odd count the middle edge gets offset 0.
            offset = (edge_index - (total_edges - 1) / 2) * offset_distance
            return (
                x0 + perp_x * offset,
                y0 + perp_y * offset,
                x1 + perp_x * offset,
                y1 + perp_y * offset,
            )

        parallel_colors = ["red", "blue", "green", "orange", "purple", "brown"]
        edge_traces = []
        for edges in edge_groups.values():
            x0, y0 = pos[edges[0][0]]
            x1, y1 = pos[edges[0][1]]
            total_edges = len(edges)
            for edge_index, edge in enumerate(edges):
                offset_x0, offset_y0, offset_x1, offset_y1 = calculate_edge_offset(
                    x0, y0, x1, y1, edge_offset, edge_index, total_edges
                )
                weight = edge[2].get("weight", 0)
                line_name = edge[2].get("nama", f"Edge_{edge[0]}_{edge[1]}")
                line_id = edge[2].get("line_id", f"ID_{edge[0]}_{edge[1]}")

                # Hover text
                if show_edge_details:
                    edge_info = (
                        f"Edge: {edge[0]} โ {edge[1]}<br>"
                        f"Nama: {line_name}<br>"
                        f"ID: {line_id}<br>"
                        f"Panjang: {weight:.2f}m ({weight/1000:.3f}km)<br>"
                        f"Saluran {edge_index + 1} dari {total_edges}"
                    )
                else:
                    edge_info = (
                        f"Edge: {edge[0]} โ {edge[1]}<br>Panjang: {weight:.2f}m"
                    )

                if total_edges > 1 and show_edge_colors:
                    # Parallel edges: one distinct colour each, drawn thicker.
                    edge_color = parallel_colors[edge_index % len(parallel_colors)]
                    edge_width = 2.0
                elif total_edges > 1:
                    # Parallel edges sharing one highlight colour.
                    edge_color = "rgba(255,100,100,0.8)"
                    edge_width = 2.0
                else:
                    # Single edge: colour runs from red (short) to blue (long).
                    if max_weight > min_weight:
                        normalized_weight = (weight - min_weight) / (
                            max_weight - min_weight
                        )
                        red_component = int(255 * (1 - normalized_weight))
                        blue_component = int(255 * normalized_weight)
                        edge_color = (
                            f"rgba({red_component}, 100, {blue_component}, 0.7)"
                        )
                    else:
                        edge_color = "rgba(125,125,125,0.8)"
                    edge_width = 1.2

                edge_traces.append(
                    go.Scatter(
                        x=[offset_x0, offset_x1, None],
                        y=[offset_y0, offset_y1, None],
                        line=dict(width=edge_width, color=edge_color),
                        hoverinfo="text" if show_edge_details else "none",
                        hovertext=edge_info if show_edge_details else None,
                        mode="lines",
                        name=(
                            f"Saluran {edge_index + 1}"
                            if total_edges > 1
                            else "Saluran Listrik"
                        ),
                        showlegend=False,
                    )
                )

        # Node data; all lists are filled in the same node order.
        node_x = []
        node_y = []
        node_text = []
        node_color = []
        node_size = []
        node_ids = []
        degree_cent = centrality_measures.get("degree", {})
        betweenness_cent = centrality_measures.get("betweenness", {})
        closeness_cent = centrality_measures.get("closeness", {})
        eigenvector_cent = centrality_measures.get("eigenvector", {})
        base_size = 8
        size_multiplier = 20

        for node in G.nodes():
            x, y = pos[node]
            node_x.append(x)
            node_y.append(y)
            node_ids.append(str(node))

            adjacencies = list(G.neighbors(node))
            node_degree = G.degree(node)

            # Recount the incident edges by hand as a cross-check.
            total_edges_manual = 0
            connection_details = []
            for neighbor in adjacencies:
                edge_count = G.number_of_edges(node, neighbor)
                # NetworkX counts a self-loop twice in the node degree, so the
                # manual total has to do the same to be comparable.
                total_edges_manual += (
                    2 * edge_count if neighbor == node else edge_count
                )
                if edge_count > 1:
                    connection_details.append(
                        f"โ Node {neighbor} ({edge_count} saluran)"
                    )
                else:
                    connection_details.append(f"โ Node {neighbor}")

            info_lines = [
                f"๐ต Node: {node}",
                f"๐ Degree (NetworkX): {node_degree}",
                f"๐ข Total Edge Manual: {total_edges_manual}",
                f"๐ฅ Tetangga: {len(adjacencies)}",
            ]
            if node_degree != total_edges_manual:
                info_lines.append("โ ๏ธ INCONSISTENCY DETECTED!")
            if show_edge_details and connection_details:
                info_lines.append("๐ Detail Koneksi:")
                # Only the first five connections are listed.
                info_lines.extend(connection_details[:5])
                if len(connection_details) > 5:
                    info_lines.append(
                        f"... dan {len(connection_details) - 5} lainnya"
                    )
            info_lines.extend(
                [
                    "๐ Sentralitas:",
                    f"โข Degree: {degree_cent.get(node, 0):.4f}",
                    f"โข Betweenness: {betweenness_cent.get(node, 0):.4f}",
                    f"โข Closeness: {closeness_cent.get(node, 0):.4f}",
                    f"โข Eigenvector: {eigenvector_cent.get(node, 0):.4f}",
                ]
            )
            node_text.append("<br>".join(info_lines))

            node_color.append(degree_cent.get(node, 0))
            node_size.append(base_size + degree_cent.get(node, 0) * size_multiplier)

        node_trace = go.Scatter(
            x=node_x,
            y=node_y,
            mode="markers+text" if show_labels else "markers",
            hoverinfo="text",
            text=node_ids if show_labels else [],
            textposition="middle center",
            textfont=dict(size=label_size, color=label_color, family="Arial Black"),
            hovertext=node_text,
            marker=dict(
                showscale=True,
                colorscale="Viridis",
                reversescale=True,
                color=node_color,
                size=node_size,
                colorbar=dict(
                    thickness=15,
                    len=0.7,
                    x=1.02,
                    title=dict(text="Degree Centrality", font=dict(size=12)),
                    tickfont=dict(size=10),
                ),
                line=dict(width=1, color="white"),
                opacity=0.9,
            ),
            name="Node/Junction",
        )

        # Edges first so that the node markers are drawn on top of them.
        fig = go.Figure(
            data=edge_traces + [node_trace],
            layout=go.Layout(
                title=dict(
                    text="Visualisasi Graf Jaringan Listrik DIY",
                    font=dict(size=16),
                    x=0.5,
                ),
                showlegend=False,
                hovermode="closest",
                margin=dict(b=40, l=40, r=60, t=80),
                annotations=[
                    dict(
                        text=(
                            "Node berukuran dan berwarna berdasarkan Degree Centrality.<br>"
                            "Saluran paralel ditampilkan dengan garis terpisah dan warna berbeda.<br>"
                            "Node yang lebih besar dan gelap = lebih penting dalam jaringan"
                        ),
                        showarrow=False,
                        xref="paper",
                        yref="paper",
                        x=0.02,
                        y=0.02,
                        xanchor="left",
                        yanchor="bottom",
                        font=dict(color="#666", size=10),
                        bgcolor="rgba(255,255,255,0.8)",
                        bordercolor="#ccc",
                        borderwidth=1,
                    )
                ],
                xaxis=dict(
                    showgrid=True,
                    zeroline=False,
                    showticklabels=False,
                    gridcolor="rgba(128,128,128,0.2)",
                ),
                yaxis=dict(
                    showgrid=True,
                    zeroline=False,
                    showticklabels=False,
                    gridcolor="rgba(128,128,128,0.2)",
                ),
                plot_bgcolor="rgba(240,240,240,0.1)",
                height=700,
            ),
        )
        return fig
    except Exception as e:
        st.error(f"Error creating network visualization: {str(e)}")
        fig = go.Figure()
        fig.add_annotation(
            x=0.5,
            y=0.5,
            text=f"Error dalam visualisasi: {str(e)}",
            showarrow=False,
            font=dict(size=14),
        )
        return fig
def create_centrality_comparison(centrality_measures):
    """Draw a 2x2 grid of bar charts, one per centrality measure.

    The node order on every x-axis follows the keys of the ``"degree"``
    mapping; a node missing from another measure is plotted as 0. Without
    degree data a figure holding only an explanatory annotation is returned.
    Errors are reported via ``st.error`` and produce an empty figure.
    """
    try:
        degree_scores = (
            centrality_measures.get("degree") if centrality_measures else None
        )
        if not degree_scores:
            placeholder = go.Figure()
            placeholder.add_annotation(
                x=0.5,
                y=0.5,
                text="Tidak ada data sentralitas untuk dibandingkan",
                showarrow=False,
                font=dict(size=16),
            )
            return placeholder

        node_order = list(degree_scores.keys())
        figure = make_subplots(
            rows=2,
            cols=2,
            subplot_titles=(
                "Degree Centrality",
                "Betweenness Centrality",
                "Closeness Centrality",
                "Eigenvector Centrality",
            ),
            vertical_spacing=0.1,
        )

        # Panels are filled row by row: (1, 1), (1, 2), (2, 1), (2, 2).
        panel_measures = ("degree", "betweenness", "closeness", "eigenvector")
        for panel_index, measure in enumerate(panel_measures):
            grid_row, grid_col = divmod(panel_index, 2)
            scores = centrality_measures[measure]
            bar_heights = [scores.get(node, 0) for node in node_order]
            figure.add_trace(
                go.Bar(x=node_order, y=bar_heights, name=measure.title()),
                row=grid_row + 1,
                col=grid_col + 1,
            )

        figure.update_layout(
            height=600,
            showlegend=False,
            title=dict(text="Perbandingan Ukuran Sentralitas", font=dict(size=16)),
        )
        return figure
    except Exception as exc:
        st.error(f"Error creating centrality comparison: {str(exc)}")
        return go.Figure()
def create_centrality_matrix(centrality_measures):
    """Tabulate all centrality scores, one row per node.

    Rows cover the nodes of the ``"degree"`` mapping and are sorted by degree
    centrality in descending order; a node missing from another measure gets
    0. Without degree data an empty DataFrame is returned. Errors are reported
    via ``st.error`` and also yield an empty DataFrame.
    """
    try:
        if not centrality_measures or not centrality_measures.get("degree"):
            return pd.DataFrame()

        node_ids = list(centrality_measures["degree"].keys())

        # Column label -> measure key, in the order the columns appear.
        column_sources = (
            ("Degree Centrality", "degree"),
            ("Closeness Centrality", "closeness"),
            ("Betweenness Centrality", "betweenness"),
            ("Eigenvector Centrality", "eigenvector"),
        )
        table = {"Node": node_ids}
        for column_label, measure_key in column_sources:
            scores = centrality_measures[measure_key]
            table[column_label] = [scores.get(node_id, 0) for node_id in node_ids]

        matrix = pd.DataFrame(table)
        # Highest degree centrality first, with a fresh 0..n-1 index.
        matrix = matrix.sort_values("Degree Centrality", ascending=False)
        return matrix.reset_index(drop=True)
    except Exception as exc:
        st.error(f"Error creating centrality matrix: {str(exc)}")
        return pd.DataFrame()
def create_node_connection_details(G, top_n=20):
    """Build a table describing the connections of the highest-degree nodes.

    Args:
        G: NetworkX graph.
        top_n: Number of nodes to include, taken in order of descending degree.

    Returns:
        DataFrame with one row per node and the columns ``Node``,
        ``Degree (NetworkX)``, ``Total Edges (Manual)``, ``Self-Loop``,
        ``Jumlah Tetangga``, ``Detail Koneksi``, ``Rasio Edge/Tetangga`` and
        ``Status``. An empty DataFrame is returned for an empty graph or, after
        reporting via ``st.error``, when an error occurs.
    """
    try:
        if G.number_of_nodes() == 0:
            return pd.DataFrame()

        # Nodes with the highest degree first
        node_degrees = dict(G.degree())
        top_nodes = sorted(node_degrees.items(), key=lambda x: x[1], reverse=True)[
            :top_n
        ]

        connection_data = []
        for node, _degree in top_nodes:
            neighbors = list(G.neighbors(node))
            # A self-loop makes the node its own neighbour; handle it apart.
            actual_neighbors = [n for n in neighbors if n != node]

            connection_details = []
            total_edges = 0
            for neighbor in actual_neighbors:
                edge_count = G.number_of_edges(node, neighbor)
                total_edges += edge_count
                if edge_count > 1:
                    connection_details.append(f"Node {neighbor} ({edge_count}x)")
                else:
                    connection_details.append(f"Node {neighbor}")

            has_self_loop = G.has_edge(node, node)
            if has_self_loop:
                self_edge_count = G.number_of_edges(node, node)
                total_edges += self_edge_count
                connection_details.append(
                    f"Node {node} (SELF-LOOP: {self_edge_count}x)"
                )

            # Show at most eight connections, then a "+N" remainder marker.
            if len(connection_details) > 8:
                display_connections = (
                    ", ".join(connection_details[:8])
                    + f", ... (+{len(connection_details)-8})"
                )
            else:
                display_connections = ", ".join(connection_details)

            # Compare the NetworkX degree with the manual count. The self-loop
            # is added once more because it contributes twice to the degree.
            degree_nx = G.degree(node)
            self_loop_count = 1 if has_self_loop else 0
            total_edges_with_self = total_edges + self_loop_count
            is_consistent = degree_nx == total_edges_with_self

            # NOTE(review): the check-mark literal was broken across two lines
            # by a mis-decoded byte and has been restored; the warning glyphs
            # below also look mis-decoded and are left as found.
            if is_consistent:
                status = "✅ OK" + (" (with self-loop)" if has_self_loop else "")
            elif has_self_loop:
                status = f"โ ๏ธ SELF-LOOP (+{self_loop_count})"
            else:
                status = "โ ๏ธ INCONSISTENT"

            connection_data.append(
                {
                    "Node": node,
                    "Degree (NetworkX)": degree_nx,
                    "Total Edges (Manual)": total_edges,
                    "Self-Loop": "Yes" if has_self_loop else "No",
                    "Jumlah Tetangga": len(actual_neighbors),
                    "Detail Koneksi": display_connections,
                    "Rasio Edge/Tetangga": (
                        f"{total_edges/len(neighbors):.2f}" if neighbors else "0"
                    ),
                    "Status": status,
                }
            )
        return pd.DataFrame(connection_data)
    except Exception as e:
        st.error(f"Error creating connection details: {str(e)}")
        return pd.DataFrame()
def create_map_visualization(gdf_original):
"""Buat visualisasi peta menggunakan Folium dengan error handling"""
try:
if gdf_original is None or gdf_original.empty:
return None
# Konversi ke WGS84 untuk visualisasi
gdf_wgs84 = gdf_original.to_crs("EPSG:4326")
# Hitung centroid untuk center map
bounds = gdf_wgs84.total_bounds
center_lat = (bounds[1] + bounds[3]) / 2
center_lon = (bounds[0] + bounds[2]) / 2
# Buat peta
m = folium.Map(
location=[center_lat, center_lon], zoom_start=12, tiles="OpenStreetMap"
)
# Tambahkan layer jaringan listrik
for idx, row in gdf_wgs84.iterrows():
geom = row.geometry
line_name = row.get("nama", f"Line_{idx}")
line_id = row.get("id", idx)
# Handle MultiLineString dan LineString
if geom.geom_type == "MultiLineString":
for i, line in enumerate(geom.geoms):
coords = [[lat, lon] for lon, lat in line.coords]
# Hitung panjang untuk popup
line_length_m = line.length * 111000 # Approximate conversion
line_length_km = line_length_m / 1000
popup_text = f"""
{line_name} - Segment {i+1}
ID: {line_id}
Panjang: {line_length_km:.3f} km
Tipe: MultiLineString
"""
folium.PolyLine(
locations=coords,
color="red",
weight=2,
opacity=0.8,
popup=folium.Popup(popup_text, max_width=300),
).add_to(m)
elif geom.geom_type == "LineString":
coords = [[lat, lon] for lon, lat in geom.coords]
# Hitung panjang untuk popup
line_length_m = geom.length * 111000 # Approximate conversion
line_length_km = line_length_m / 1000
popup_text = f"""
{line_name}
ID: {line_id}
Panjang: {line_length_km:.3f} km
Tipe: LineString
"""
folium.PolyLine(
locations=coords,
color="blue",
weight=2,
opacity=0.8,
popup=folium.Popup(popup_text, max_width=300),
).add_to(m)
# Tambahkan legend
legend_html = """
Legenda
MultiLineString
LineString