# Source: graf-model / src / streamlit_app.py
# hprmn's picture
# update
# f58a96b
import io
import os
import tempfile
import warnings
import zipfile
import folium
import geopandas as gpd
import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import requests
import streamlit as st
from plotly.subplots import make_subplots
from scipy.spatial.distance import cdist
from shapely.geometry import LineString, Point
from streamlit_folium import st_folium
warnings.filterwarnings("ignore")
# === ENVIRONMENT SETUP FOR HUGGING FACE SPACES ===
# Point every config/cache location at the system temp directory so that the
# paths are writable on Hugging Face Spaces.
temp_dir = tempfile.gettempdir()

# NOTE: these variables are assigned after the imports at the top of the file
# have already executed, so they only affect code that reads them afterwards.
os.environ["STREAMLIT_CONFIG_DIR"] = os.path.join(temp_dir, ".streamlit")
os.environ["HOME"] = temp_dir
os.environ["MPLCONFIGDIR"] = os.path.join(temp_dir, ".matplotlib")
os.environ["XDG_CONFIG_HOME"] = os.path.join(temp_dir, ".config")
os.environ["XDG_CACHE_HOME"] = os.path.join(temp_dir, ".cache")

# Create all necessary directories
directories = [
    os.environ["STREAMLIT_CONFIG_DIR"],
    os.environ["MPLCONFIGDIR"],
    os.environ["XDG_CONFIG_HOME"],
    os.environ["XDG_CACHE_HOME"],
    os.path.join(temp_dir, ".local"),
    os.path.join(temp_dir, ".cache", "matplotlib"),
    os.path.join(temp_dir, ".config", "matplotlib"),
]
for dir_path in directories:
    try:
        os.makedirs(dir_path, mode=0o777, exist_ok=True)
    except OSError:
        # Best effort only: a directory that cannot be created is skipped.
        # (PermissionError is a subclass of OSError, so it is covered here.)
        pass

# Suppress warnings
warnings.filterwarnings("ignore")

# Set matplotlib backend to Agg (non-interactive) for server environment
try:
    import matplotlib

    matplotlib.use("Agg")
except Exception as e:
    # Reported with print() rather than st.warning(): st.set_page_config()
    # further down has to be the first Streamlit command issued by the script.
    print(f"Matplotlib configuration warning: {e}")
# === STREAMLIT CONFIGURATION ===
# Page-level settings, kept in one mapping and passed as keyword arguments.
_PAGE_SETTINGS = {
    "page_title": "Analisis Jaringan Listrik DIY",
    "page_icon": "⚡",
    "layout": "wide",
    "initial_sidebar_state": "expanded",
}
st.set_page_config(**_PAGE_SETTINGS)

# === CSS STYLING ===
# Custom classes used by the HTML snippets rendered through st.markdown().
_CUSTOM_CSS = """
<style>
.main-header {
font-size: 2.5rem;
font-weight: bold;
color: #1e3a8a;
text-align: center;
margin-bottom: 2rem;
}
.sub-header {
font-size: 1.5rem;
font-weight: bold;
color: #3b82f6;
margin-top: 2rem;
margin-bottom: 1rem;
}
.metric-card {
background-color: #f8fafc;
padding: 1rem;
border-radius: 0.5rem;
border-left: 4px solid #3b82f6;
}
.stAlert > div {
padding: 0.5rem;
}
</style>
"""
st.markdown(_CUSTOM_CSS, unsafe_allow_html=True)
# === UTILITY FUNCTIONS ===
@st.cache_data
def safe_file_processing(uploaded_file):
    """Extract an uploaded ZIP archive and load the first shapefile inside it.

    Args:
        uploaded_file: Object exposing ``name`` and ``getvalue()`` (the raw
            bytes of the ZIP archive), as returned by ``st.file_uploader``.

    Returns:
        The result of ``gpd.read_file`` for the shapefile, or ``None`` when
        anything fails (the error is reported through ``st.error``).
    """
    try:
        # Everything happens inside a throw-away directory.
        with tempfile.TemporaryDirectory() as temp_dir:
            # Only the base name is used so the archive is always written
            # directly inside the temporary directory.
            temp_file_path = os.path.join(
                temp_dir, os.path.basename(uploaded_file.name)
            )
            with open(temp_file_path, "wb") as f:
                f.write(uploaded_file.getvalue())

            with zipfile.ZipFile(temp_file_path) as zip_file:
                zip_file.extractall(temp_dir)

            # Archives frequently wrap their content in a folder and may use
            # an upper-case extension, so search recursively and ignore case.
            shp_files = sorted(
                os.path.join(root, file_name)
                for root, _dirs, file_names in os.walk(temp_dir)
                for file_name in file_names
                if file_name.lower().endswith(".shp")
            )
            if not shp_files:
                raise ValueError("File shapefile tidak ditemukan dalam ZIP")

            # Must be read before the temporary directory is removed.
            return gpd.read_file(shp_files[0])
    except Exception as e:
        st.error(f"Error memproses file: {str(e)}")
        return None
@st.cache_data
def safe_url_download(data_url):
    """Download a ZIP archive from *data_url* and load its first shapefile.

    Args:
        data_url: URL of a ZIP archive containing a shapefile.

    Returns:
        The result of ``gpd.read_file`` for the shapefile, or ``None`` when the
        download or processing fails (the error is reported via ``st.error``).
    """
    try:
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        }
        with st.spinner("Mengunduh data dari server..."):
            response = requests.get(data_url, timeout=60, headers=headers)
            response.raise_for_status()

            with tempfile.TemporaryDirectory() as temp_dir:
                # The archive is unpacked straight from memory.
                with zipfile.ZipFile(io.BytesIO(response.content)) as zip_file:
                    zip_file.extractall(temp_dir)

                # Archives frequently wrap their content in a folder and may
                # use an upper-case extension: search recursively, ignore case.
                shp_files = sorted(
                    os.path.join(root, file_name)
                    for root, _dirs, file_names in os.walk(temp_dir)
                    for file_name in file_names
                    if file_name.lower().endswith(".shp")
                )
                if not shp_files:
                    raise ValueError(
                        "File shapefile tidak ditemukan dalam download"
                    )

                # Must be read before the temporary directory is removed.
                return gpd.read_file(shp_files[0])
    except requests.exceptions.Timeout:
        st.error("⏱️ Timeout: Server terlalu lama merespons")
        return None
    except requests.exceptions.ConnectionError:
        st.error("🌐 Error: Tidak dapat terhubung ke server")
        return None
    except Exception as e:
        st.error(f"Error mengunduh data: {str(e)}")
        return None
def _iter_line_parts(geom):
    """Yield ``(name_prefix, coords)`` for each LineString part of *geom*.

    ``name_prefix`` is ``"{i}_"`` for the i-th member of a MultiLineString and
    ``""`` for a plain LineString. Missing geometries and any other geometry
    type yield nothing.
    """
    if geom is None:
        return
    if geom.geom_type == "MultiLineString":
        for part_index, line in enumerate(geom.geoms):
            yield f"{part_index}_", list(line.coords)
    elif geom.geom_type == "LineString":
        yield "", list(geom.coords)


def create_network_graph(gdf):
    """Build an undirected graph from the line geometries of *gdf*.

    Every pair of consecutive vertices of every (Multi)LineString becomes one
    edge. Vertices whose x and y both differ by less than 100 units from an
    already registered vertex are merged into that vertex's node.

    Args:
        gdf: GeoDataFrame with line geometries; optional ``nama`` and ``id``
            columns are used to label the segments.

    Returns:
        Tuple ``(G, nodes, gdf_utm, line_segments)``:
        ``G`` is the ``nx.Graph``; ``nodes`` lists the coordinate of each node
        indexed by node id; ``gdf_utm`` is the data in EPSG:32749;
        ``line_segments`` is a list of dicts describing each segment.
        On any error an empty graph, ``[]``, the input ``gdf`` and ``[]`` are
        returned and the error is shown with ``st.error``.
    """
    try:
        G = nx.Graph()

        # Work in UTM Zone 49S (EPSG:32749) so that lengths are in metres.
        gdf_utm = gdf.copy()
        if gdf.crs != "EPSG:32749":
            gdf_utm = gdf.to_crs("EPSG:32749")

        coord_to_node = {}  # coordinate tuple -> node id
        node_grid = {}  # grid cell -> [(coordinate, node id), ...]
        edges = []
        line_segments = []

        # Tolerance (in metres) for merging coordinates that are very close.
        tolerance = 100.0

        def get_or_create_node(coord):
            """Return the id of the node for *coord*, creating it if needed."""
            # Cells are `tolerance` wide, so any node within tolerance on both
            # axes lies in the 3x3 block of cells around the query. Taking the
            # smallest id reproduces "first registered node wins".
            cell_x = coord[0] // tolerance
            cell_y = coord[1] // tolerance
            match = None
            for grid_x in (cell_x - 1, cell_x, cell_x + 1):
                for grid_y in (cell_y - 1, cell_y, cell_y + 1):
                    for existing_coord, node_id in node_grid.get(
                        (grid_x, grid_y), ()
                    ):
                        if (
                            abs(existing_coord[0] - coord[0]) < tolerance
                            and abs(existing_coord[1] - coord[1]) < tolerance
                            and (match is None or node_id < match)
                        ):
                            match = node_id
            if match is not None:
                return match

            # No node close enough: register a new one.
            node_id = len(coord_to_node)
            coord_to_node[coord] = node_id
            node_grid.setdefault((cell_x, cell_y), []).append((coord, node_id))
            return node_id

        for idx, row in gdf_utm.iterrows():
            line_name = row.get("nama", f"Line_{idx}")
            line_id = row.get("id", idx)

            for name_prefix, coords in _iter_line_parts(row.geometry):
                # One edge per pair of consecutive vertices.
                for j, (start_point, end_point) in enumerate(
                    zip(coords, coords[1:])
                ):
                    start_idx = get_or_create_node(start_point)
                    end_idx = get_or_create_node(end_point)

                    # Planar length of the original (unsnapped) segment.
                    segment_length = (
                        (end_point[0] - start_point[0]) ** 2
                        + (end_point[1] - start_point[1]) ** 2
                    ) ** 0.5
                    segment_name = f"{line_name}_segment_{name_prefix}{j}"

                    edge_data = {
                        "weight": segment_length,
                        "line_id": f"{line_id}_{name_prefix}{j}",
                        "nama": segment_name,
                        "length_m": segment_length,
                        "length_km": segment_length / 1000,
                    }
                    edges.append((start_idx, end_idx, edge_data))

                    line_segments.append(
                        {
                            "geometry": LineString([start_point, end_point]),
                            "start_node": start_idx,
                            "end_node": end_idx,
                            "nama": segment_name,
                            "length_m": segment_length,
                            "length_km": segment_length / 1000,
                        }
                    )

        # Add the nodes together with their position information.
        for coord, node_id in coord_to_node.items():
            node_data = {
                "pos": coord,
                "type": "junction",
                "x": coord[0],  # UTM Easting
                "y": coord[1],  # UTM Northing
                "lat": None,  # Will be calculated when needed
                "lon": None,  # Will be calculated when needed
            }
            G.add_node(node_id, **node_data)

        G.add_edges_from(edges)

        # Coordinates listed by node id, for callers that expect a list.
        nodes = [None] * len(coord_to_node)
        for coord, node_id in coord_to_node.items():
            nodes[node_id] = coord

        return G, nodes, gdf_utm, line_segments
    except Exception as e:
        st.error(f"Error creating network graph: {str(e)}")
        return nx.Graph(), [], gdf, []
def calculate_centrality_measures(G):
    """Compute degree, betweenness, closeness and eigenvector centrality.

    Args:
        G: Graph to analyse.

    Returns:
        Dict with the keys ``"degree"``, ``"betweenness"``, ``"closeness"`` and
        ``"eigenvector"``, each mapping node -> score. An empty graph yields
        empty mappings. If eigenvector centrality fails, every node gets 0.0
        for that measure; if anything else fails, a warning is shown and every
        measure is set to 0.0 for every node.
    """
    measure_names = ("degree", "betweenness", "closeness", "eigenvector")
    centrality_measures = {}
    try:
        if G.number_of_nodes() == 0:
            return {name: {} for name in measure_names}

        centrality_measures["degree"] = nx.degree_centrality(G)
        centrality_measures["betweenness"] = nx.betweenness_centrality(G)
        centrality_measures["closeness"] = nx.closeness_centrality(G)
        try:
            centrality_measures["eigenvector"] = nx.eigenvector_centrality(
                G, max_iter=1000
            )
        except Exception:
            # Eigenvector centrality may fail (e.g. no convergence); fall back
            # to a neutral score instead of discarding the other measures.
            centrality_measures["eigenvector"] = {node: 0.0 for node in G.nodes()}
    except Exception as e:
        st.warning(f"Error calculating centrality measures: {str(e)}")
        # Key the defaults by the real node ids, not by range(n), so lookups
        # by node keep working whatever the ids are.
        for measure in measure_names:
            centrality_measures[measure] = {node: 0.0 for node in G.nodes()}
    return centrality_measures
def find_minimum_spanning_tree(G):
    """Return the minimum spanning tree of *G* computed with Kruskal's algorithm.

    Edge weights come from the ``"weight"`` attribute. An empty input graph, or
    any failure (reported with ``st.warning``), yields an empty ``nx.Graph``.
    """
    try:
        graph_has_nodes = G.number_of_nodes() != 0
        if graph_has_nodes:
            return nx.minimum_spanning_tree(G, weight="weight", algorithm="kruskal")
        return nx.Graph()
    except Exception as err:
        st.warning(f"Error finding MST: {str(err)}")
        return nx.Graph()
def analyze_network_connectivity(G, line_segments=None):
    """Summarise the connectivity of *G* and, optionally, its segment lengths.

    Args:
        G: Graph to analyse.
        line_segments: Optional list of dicts with ``"length_m"`` and
            ``"length_km"`` entries; when given (and the graph is not empty),
            length statistics are added to the result.

    Returns:
        Dict with node/edge counts, connectivity flag, component count,
        density, diameter, average path length and degree statistics. Diameter
        and average path length are strings starting with ``"N/A"`` when they
        cannot be computed. On an unexpected error the problem is reported with
        ``st.error`` and a zeroed summary is returned.
    """
    analysis = {}
    try:
        num_nodes = G.number_of_nodes()
        # is_connected is only evaluated for non-empty graphs.
        is_connected = nx.is_connected(G) if num_nodes > 0 else False

        analysis["num_nodes"] = num_nodes
        analysis["num_edges"] = G.number_of_edges()
        analysis["is_connected"] = is_connected
        analysis["num_components"] = nx.number_connected_components(G)

        if num_nodes > 0:
            analysis["density"] = nx.density(G)

            if is_connected:
                try:
                    analysis["diameter"] = nx.diameter(G)
                    analysis["average_path_length"] = nx.average_shortest_path_length(G)
                except Exception:
                    analysis["diameter"] = "N/A (Error computing)"
                    analysis["average_path_length"] = "N/A (Error computing)"
            else:
                analysis["diameter"] = "N/A (Graf tidak terhubung)"
                analysis["average_path_length"] = "N/A (Graf tidak terhubung)"

            # Degree statistics
            degrees = [d for n, d in G.degree()]
            analysis["avg_degree"] = np.mean(degrees) if degrees else 0
            analysis["max_degree"] = max(degrees) if degrees else 0
            analysis["min_degree"] = min(degrees) if degrees else 0

            # Network length statistics (from line_segments). Inside this
            # branch the list is known to be non-empty.
            if line_segments:
                total_length_m = sum(seg["length_m"] for seg in line_segments)
                avg_segment_length = total_length_m / len(line_segments)
                segment_lengths_km = [seg["length_km"] for seg in line_segments]

                analysis["total_network_length_m"] = total_length_m
                analysis["total_network_length_km"] = total_length_m / 1000
                analysis["avg_segment_length_m"] = avg_segment_length
                analysis["avg_segment_length_km"] = avg_segment_length / 1000
                analysis["longest_segment_km"] = max(segment_lengths_km)
                analysis["shortest_segment_km"] = min(segment_lengths_km)
        else:
            # Default values for empty graph
            for key in ["density", "avg_degree", "max_degree", "min_degree"]:
                analysis[key] = 0
            analysis["diameter"] = "N/A"
            analysis["average_path_length"] = "N/A"
    except Exception as e:
        st.error(f"Error analyzing network connectivity: {str(e)}")
        # Return minimal analysis
        analysis = {
            "num_nodes": 0,
            "num_edges": 0,
            "is_connected": False,
            "num_components": 0,
            "density": 0,
            "diameter": "N/A",
            "average_path_length": "N/A",
            "avg_degree": 0,
            "max_degree": 0,
            "min_degree": 0,
        }
    return analysis
def create_network_visualization(
    G,
    nodes,
    centrality_measures,
    show_labels=False,
    show_edge_details=False,
    label_size=10,
    label_color="white",
    edge_offset=0.02,
    show_edge_colors=True,
):
    """Build a Plotly figure of the network graph.

    Args:
        G: Graph whose nodes carry ``"x"`` and ``"y"`` attributes.
        nodes: Unused here; kept for interface compatibility.
        centrality_measures: Dict of measure name -> {node: score}. The
            ``"degree"`` measure drives node colour and size.
        show_labels: Draw the node id inside each marker.
        show_edge_details: Enable hover text on edges and list connections in
            the node hover text.
        label_size: Font size of node labels.
        label_color: Font colour of node labels.
        edge_offset: Distance (in normalised units) between parallel edges.
        show_edge_colors: Give each parallel edge its own colour.

    Returns:
        A ``go.Figure``. For an empty graph or on error the figure only holds
        an explanatory annotation (errors are also shown with ``st.error``).
    """
    try:
        if G.number_of_nodes() == 0:
            fig = go.Figure()
            fig.add_annotation(
                x=0.5,
                y=0.5,
                text="Tidak ada data untuk divisualisasikan",
                showarrow=False,
                font=dict(size=16),
            )
            return fig

        # Normalise the node coordinates to the unit square. The graph is
        # known to be non-empty here, so min()/max() always have data.
        xs = [G.nodes[node]["x"] for node in G.nodes()]
        ys = [G.nodes[node]["y"] for node in G.nodes()]
        min_x, max_x = min(xs), max(xs)
        min_y, max_y = min(ys), max(ys)
        # Avoid division by zero when all nodes share a coordinate.
        range_x = max_x - min_x if max_x != min_x else 1
        range_y = max_y - min_y if max_y != min_y else 1
        pos = {
            node: (
                (G.nodes[node]["x"] - min_x) / range_x,
                (G.nodes[node]["y"] - min_y) / range_y,
            )
            for node in G.nodes()
        }

        # Edge weight range, used to colour single edges by length.
        edge_weights = [data.get("weight", 0) for _, _, data in G.edges(data=True)]
        max_weight = max(edge_weights) if edge_weights else 1
        min_weight = min(edge_weights) if edge_weights else 0

        # Group edges by unordered node pair to detect parallel edges.
        edge_groups = {}
        for edge in G.edges(data=True):
            node_pair = tuple(sorted([edge[0], edge[1]]))
            edge_groups.setdefault(node_pair, []).append(edge)

        def calculate_edge_offset(
            x0, y0, x1, y1, offset_distance, edge_index, total_edges
        ):
            """Shift one of several parallel edges sideways."""
            if total_edges == 1:
                return x0, y0, x1, y1
            dx = x1 - x0
            dy = y1 - y0
            length = (dx**2 + dy**2) ** 0.5
            if length == 0:
                return x0, y0, x1, y1
            # Unit vector perpendicular to the edge.
            perp_x = -dy / length
            perp_y = dx / length
            # Spread the edges symmetrically around the straight line; for an
            # odd count the middle edge gets offset 0.
            offset = (edge_index - (total_edges - 1) / 2) * offset_distance
            return (
                x0 + perp_x * offset,
                y0 + perp_y * offset,
                x1 + perp_x * offset,
                y1 + perp_y * offset,
            )

        parallel_colors = ["red", "blue", "green", "orange", "purple", "brown"]
        edge_traces = []
        for node_pair, edges in edge_groups.items():
            x0, y0 = pos[edges[0][0]]
            x1, y1 = pos[edges[0][1]]
            total_edges = len(edges)

            for edge_index, edge in enumerate(edges):
                offset_x0, offset_y0, offset_x1, offset_y1 = calculate_edge_offset(
                    x0, y0, x1, y1, edge_offset, edge_index, total_edges
                )
                weight = edge[2].get("weight", 0)
                line_name = edge[2].get("nama", f"Edge_{edge[0]}_{edge[1]}")
                line_id = edge[2].get("line_id", f"ID_{edge[0]}_{edge[1]}")

                # Hover text is only attached when edge details are requested.
                # The two node ids are separated so that e.g. nodes 1 and 23
                # cannot be confused with nodes 12 and 3.
                if show_edge_details:
                    edge_info = (
                        f"Edge: {edge[0]} ↔ {edge[1]}<br>"
                        f"Nama: {line_name}<br>"
                        f"ID: {line_id}<br>"
                        f"Panjang: {weight:.2f}m ({weight/1000:.3f}km)<br>"
                        f"Saluran {edge_index + 1} dari {total_edges}"
                    )
                else:
                    edge_info = None

                if total_edges > 1 and show_edge_colors:
                    # Parallel edges: one distinct colour each, drawn thicker.
                    edge_color = parallel_colors[edge_index % len(parallel_colors)]
                    edge_width = 2.0
                elif total_edges > 1:
                    # Parallel edges sharing a single highlight colour.
                    edge_color = "rgba(255,100,100,0.8)"
                    edge_width = 2.0
                else:
                    # Single edge: colour encodes the relative length.
                    if max_weight > min_weight:
                        normalized_weight = (weight - min_weight) / (
                            max_weight - min_weight
                        )
                        red_component = int(255 * (1 - normalized_weight))
                        blue_component = int(255 * normalized_weight)
                        edge_color = (
                            f"rgba({red_component}, 100, {blue_component}, 0.7)"
                        )
                    else:
                        edge_color = "rgba(125,125,125,0.8)"
                    edge_width = 1.2

                edge_traces.append(
                    go.Scatter(
                        x=[offset_x0, offset_x1, None],
                        y=[offset_y0, offset_y1, None],
                        line=dict(width=edge_width, color=edge_color),
                        hoverinfo="text" if show_edge_details else "none",
                        hovertext=edge_info,
                        mode="lines",
                        name=(
                            f"Saluran {edge_index + 1}"
                            if total_edges > 1
                            else "Saluran Listrik"
                        ),
                        showlegend=False,
                    )
                )

        # Node data: position, hover text, colour and size per node, all
        # appended in the same order so the lists stay aligned.
        node_x = []
        node_y = []
        node_text = []
        node_color = []
        node_size = []
        node_ids = []
        degree_cent = centrality_measures.get("degree", {})
        base_size = 8
        size_multiplier = 20

        for node in G.nodes():
            x, y = pos[node]
            node_x.append(x)
            node_y.append(y)
            node_ids.append(str(node))

            adjacencies = list(G.neighbors(node))
            node_degree = G.degree(node)

            # Count edges per neighbour to cross-check the reported degree.
            total_edges_manual = 0
            connection_details = []
            for neighbor in adjacencies:
                edge_count = G.number_of_edges(node, neighbor)
                total_edges_manual += edge_count
                if edge_count > 1:
                    connection_details.append(
                        f"→ Node {neighbor} ({edge_count} saluran)"
                    )
                else:
                    connection_details.append(f"→ Node {neighbor}")

            node_info = f"🔵 Node: {node}<br>"
            node_info += f"📊 Degree (NetworkX): {node_degree}<br>"
            node_info += f"🔢 Total Edge Manual: {total_edges_manual}<br>"
            node_info += f"👥 Tetangga: {len(adjacencies)}<br>"
            if node_degree != total_edges_manual:
                node_info += "⚠️ INCONSISTENCY DETECTED!<br>"
            if show_edge_details and connection_details:
                node_info += "🔗 Detail Koneksi:<br>"
                # Only the first five connections are listed.
                node_info += "<br>".join(connection_details[:5])
                if len(connection_details) > 5:
                    node_info += (
                        f"<br>... dan {len(connection_details) - 5} lainnya"
                    )
                node_info += "<br><br>"
            node_info += "📈 Sentralitas:<br>"
            node_info += f"• Degree: {degree_cent.get(node, 0):.4f}<br>"
            node_info += f'• Betweenness: {centrality_measures.get("betweenness", {}).get(node, 0):.4f}<br>'
            node_info += f'• Closeness: {centrality_measures.get("closeness", {}).get(node, 0):.4f}<br>'
            node_info += f'• Eigenvector: {centrality_measures.get("eigenvector", {}).get(node, 0):.4f}'
            node_text.append(node_info)

            node_color.append(degree_cent.get(node, 0))
            node_size.append(base_size + degree_cent.get(node, 0) * size_multiplier)

        node_trace = go.Scatter(
            x=node_x,
            y=node_y,
            mode="markers+text" if show_labels else "markers",
            hoverinfo="text",
            text=node_ids if show_labels else [],
            textposition="middle center",
            textfont=dict(size=label_size, color=label_color, family="Arial Black"),
            hovertext=node_text,
            marker=dict(
                showscale=True,
                colorscale="Viridis",
                reversescale=True,
                color=node_color,
                size=node_size,
                colorbar=dict(
                    thickness=15,
                    len=0.7,
                    x=1.02,
                    title=dict(text="Degree Centrality", font=dict(size=12)),
                    tickfont=dict(size=10),
                ),
                line=dict(width=1, color="white"),
                opacity=0.9,
            ),
            name="Node/Junction",
        )

        # Edges first so that the node markers are drawn on top of them.
        fig = go.Figure(
            data=edge_traces + [node_trace],
            layout=go.Layout(
                title=dict(
                    text="Visualisasi Graf Jaringan Listrik DIY",
                    font=dict(size=16),
                    x=0.5,
                ),
                showlegend=False,
                hovermode="closest",
                margin=dict(b=40, l=40, r=60, t=80),
                annotations=[
                    dict(
                        text="Node berukuran dan berwarna berdasarkan Degree Centrality.<br>Saluran paralel ditampilkan dengan garis terpisah dan warna berbeda.<br>Node yang lebih besar dan gelap = lebih penting dalam jaringan",
                        showarrow=False,
                        xref="paper",
                        yref="paper",
                        x=0.02,
                        y=0.02,
                        xanchor="left",
                        yanchor="bottom",
                        font=dict(color="#666", size=10),
                        bgcolor="rgba(255,255,255,0.8)",
                        bordercolor="#ccc",
                        borderwidth=1,
                    )
                ],
                xaxis=dict(
                    showgrid=True,
                    zeroline=False,
                    showticklabels=False,
                    gridcolor="rgba(128,128,128,0.2)",
                ),
                yaxis=dict(
                    showgrid=True,
                    zeroline=False,
                    showticklabels=False,
                    gridcolor="rgba(128,128,128,0.2)",
                ),
                plot_bgcolor="rgba(240,240,240,0.1)",
                height=700,
            ),
        )
        return fig
    except Exception as e:
        st.error(f"Error creating network visualization: {str(e)}")
        fig = go.Figure()
        fig.add_annotation(
            x=0.5,
            y=0.5,
            text=f"Error dalam visualisasi: {str(e)}",
            showarrow=False,
            font=dict(size=14),
        )
        return fig
def create_centrality_comparison(centrality_measures):
    """Return a 2x2 grid of bar charts, one per centrality measure.

    The nodes shown are the keys of the ``"degree"`` measure. Without degree
    data the figure only contains an explanatory annotation; on error the
    problem is reported with ``st.error`` and an empty figure is returned.
    """
    try:
        degree_scores = (
            centrality_measures.get("degree") if centrality_measures else None
        )
        if not degree_scores:
            placeholder = go.Figure()
            placeholder.add_annotation(
                x=0.5,
                y=0.5,
                text="Tidak ada data sentralitas untuk dibandingkan",
                showarrow=False,
                font=dict(size=16),
            )
            return placeholder

        nodes = list(degree_scores)
        fig = make_subplots(
            rows=2,
            cols=2,
            subplot_titles=(
                "Degree Centrality",
                "Betweenness Centrality",
                "Closeness Centrality",
                "Eigenvector Centrality",
            ),
            vertical_spacing=0.1,
        )

        # (measure, row, col) for each panel, in reading order.
        panels = (
            ("degree", 1, 1),
            ("betweenness", 1, 2),
            ("closeness", 2, 1),
            ("eigenvector", 2, 2),
        )
        for measure, row, col in panels:
            scores = centrality_measures[measure]
            bar = go.Bar(
                x=nodes,
                y=[scores.get(node, 0) for node in nodes],
                name=measure.title(),
            )
            fig.add_trace(bar, row=row, col=col)

        fig.update_layout(
            height=600,
            showlegend=False,
            title=dict(text="Perbandingan Ukuran Sentralitas", font=dict(size=16)),
        )
        return fig
    except Exception as e:
        st.error(f"Error creating centrality comparison: {str(e)}")
        return go.Figure()
def create_centrality_matrix(centrality_measures):
    """Return a table with every centrality measure for every node.

    The rows are the nodes of the ``"degree"`` measure, sorted by degree
    centrality in descending order. Without degree data, or on error (reported
    with ``st.error``), an empty DataFrame is returned.
    """
    try:
        if not (centrality_measures and centrality_measures.get("degree")):
            return pd.DataFrame()

        node_ids = list(centrality_measures["degree"])

        # Output column -> key of the measure it is filled from.
        column_sources = (
            ("Degree Centrality", "degree"),
            ("Closeness Centrality", "closeness"),
            ("Betweenness Centrality", "betweenness"),
            ("Eigenvector Centrality", "eigenvector"),
        )
        table = {"Node": node_ids}
        for column, measure in column_sources:
            scores = centrality_measures[measure]
            table[column] = [scores.get(node, 0) for node in node_ids]

        frame = pd.DataFrame(table)
        # Highest degree centrality first, with a fresh 0..n-1 index.
        frame = frame.sort_values("Degree Centrality", ascending=False)
        return frame.reset_index(drop=True)
    except Exception as e:
        st.error(f"Error creating centrality matrix: {str(e)}")
        return pd.DataFrame()
def create_node_connection_details(G, top_n=20):
    """Build a table describing the connections of the highest-degree nodes.

    Args:
        G: Graph to inspect.
        top_n: Number of nodes (by descending degree) to include.

    Returns:
        DataFrame with one row per node: reported degree, manually counted
        edges, self-loop flag, neighbour count, a shortened connection list,
        the edge/neighbour ratio and a consistency status. An empty graph or
        an error (reported with ``st.error``) yields an empty DataFrame.
    """
    try:
        if G.number_of_nodes() == 0:
            return pd.DataFrame()

        # Nodes with the highest degree first.
        node_degrees = dict(G.degree())
        top_nodes = sorted(node_degrees.items(), key=lambda x: x[1], reverse=True)[
            :top_n
        ]

        connection_data = []
        for node, _ in top_nodes:
            neighbors = list(G.neighbors(node))
            actual_neighbors = [n for n in neighbors if n != node]  # no self-loop

            # Count the edges towards every real neighbour.
            connection_details = []
            total_edges = 0
            for neighbor in actual_neighbors:
                edge_count = G.number_of_edges(node, neighbor)
                total_edges += edge_count
                if edge_count > 1:
                    connection_details.append(f"Node {neighbor} ({edge_count}x)")
                else:
                    connection_details.append(f"Node {neighbor}")

            # A self-loop is listed as a connection of its own.
            has_self_loop = G.has_edge(node, node)
            if has_self_loop:
                self_edge_count = G.number_of_edges(node, node)
                total_edges += self_edge_count
                connection_details.append(
                    f"Node {node} (SELF-LOOP: {self_edge_count}x)"
                )

            # Show at most eight connections.
            if len(connection_details) > 8:
                display_connections = (
                    ", ".join(connection_details[:8])
                    + f", ... (+{len(connection_details)-8})"
                )
            else:
                display_connections = ", ".join(connection_details)

            # A self-loop is added once more for the comparison with the
            # degree reported by the graph.
            degree_nx = G.degree(node)
            self_loop_count = 1 if has_self_loop else 0
            is_consistent = degree_nx == total_edges + self_loop_count

            if is_consistent:
                status = "✅ OK" + (" (with self-loop)" if has_self_loop else "")
            elif has_self_loop:
                status = f"⚠️ SELF-LOOP (+{self_loop_count})"
            else:
                status = "⚠️ INCONSISTENT"

            connection_data.append(
                {
                    "Node": node,
                    "Degree (NetworkX)": degree_nx,
                    "Total Edges (Manual)": total_edges,
                    "Self-Loop": "Yes" if has_self_loop else "No",
                    "Jumlah Tetangga": len(actual_neighbors),
                    "Detail Koneksi": display_connections,
                    "Rasio Edge/Tetangga": (
                        f"{total_edges/len(neighbors):.2f}" if neighbors else "0"
                    ),
                    "Status": status,
                }
            )
        return pd.DataFrame(connection_data)
    except Exception as e:
        st.error(f"Error creating connection details: {str(e)}")
        return pd.DataFrame()
def create_map_visualization(gdf_original):
    """Draw the line geometries of *gdf_original* on a Folium map.

    Args:
        gdf_original: GeoDataFrame with (Multi)LineString geometries; optional
            ``nama`` and ``id`` columns are shown in the popups.

    Returns:
        A ``folium.Map`` centred on the data, or ``None`` when there is no data
        or an error occurs (errors are reported with ``st.error``).
    """
    import html  # local import: only needed for escaping popup text

    try:
        if gdf_original is None or gdf_original.empty:
            return None

        # Folium expects WGS84 latitude/longitude.
        gdf_wgs84 = gdf_original.to_crs("EPSG:4326")

        # Centre the map on the middle of the bounding box.
        bounds = gdf_wgs84.total_bounds
        center_lat = (bounds[1] + bounds[3]) / 2
        center_lon = (bounds[0] + bounds[2]) / 2
        m = folium.Map(
            location=[center_lat, center_lon], zoom_start=12, tiles="OpenStreetMap"
        )

        for idx, row in gdf_wgs84.iterrows():
            geom = row.geometry
            if geom is None:
                # A feature without geometry must not abort the whole map.
                continue

            # Attribute values come from the loaded file, so they are escaped
            # before being placed into popup HTML.
            line_name = html.escape(str(row.get("nama", f"Line_{idx}")))
            line_id = html.escape(str(row.get("id", idx)))

            # (geometry, popup title, colour) for every line to draw.
            if geom.geom_type == "MultiLineString":
                geom_label = "MultiLineString"
                parts = [
                    (line, f"{line_name} - Segment {i+1}", "red")
                    for i, line in enumerate(geom.geoms)
                ]
            elif geom.geom_type == "LineString":
                geom_label = "LineString"
                parts = [(geom, line_name, "blue")]
            else:
                continue

            for line, title, color in parts:
                # Index access instead of 2-tuple unpacking so that
                # coordinates carrying a Z value are accepted as well.
                coords = [[point[1], point[0]] for point in line.coords]

                # Length in degrees scaled by 111 km per degree: a rough
                # approximation, used for display only.
                line_length_m = line.length * 111000
                line_length_km = line_length_m / 1000

                popup_text = (
                    f"<b>{title}</b><br>"
                    f"ID: {line_id}<br>"
                    f"Panjang: {line_length_km:.3f} km<br>"
                    f"Tipe: {geom_label}"
                )
                folium.PolyLine(
                    locations=coords,
                    color=color,
                    weight=2,
                    opacity=0.8,
                    popup=folium.Popup(popup_text, max_width=300),
                ).add_to(m)

        # Fixed-position legend explaining the two line colours.
        legend_html = """
        <div style="position: fixed;
        bottom: 50px; left: 50px; width: 120px; height: 60px;
        background-color: white; border:2px solid grey; z-index:9999;
        font-size:14px; padding: 10px;">
        <p><b>Legenda</b></p>
        <p><i class="fa fa-minus" style="color:red"></i> MultiLineString</p>
        <p><i class="fa fa-minus" style="color:blue"></i> LineString</p>
        </div>
        """
        m.get_root().html.add_child(folium.Element(legend_html))
        return m
    except Exception as e:
        st.error(f"Error creating map visualization: {str(e)}")
        return None
# === MAIN APPLICATION ===
def main():
st.markdown(
'<h1 class="main-header">⚡ Analisis Keterhubungan Jaringan Listrik DIY</h1>',
unsafe_allow_html=True,
)
st.markdown(
"""
**Aplikasi ini menganalisis jaringan listrik di Daerah Istimewa Yogyakarta menggunakan pendekatan Teori Graf**
Berdasarkan proposal penelitian: *"Analisis Keterhubungan Jaringan Listrik Menggunakan Pendekatan Teori Graf dalam Mendukung Ekonomi Digital di Daerah Istimewa Yogyakarta"*
"""
)
# Sidebar
st.sidebar.title("🔧 Konfigurasi Analisis")
# Add environment info for debugging
with st.sidebar.expander("🔍 Debug Info"):
st.write("Environment Variables:")
st.write(f"HOME: {os.environ.get('HOME', 'Not set')}")
st.write(f"MPLCONFIGDIR: {os.environ.get('MPLCONFIGDIR', 'Not set')}")
st.write(
f"STREAMLIT_CONFIG_DIR: {os.environ.get('STREAMLIT_CONFIG_DIR', 'Not set')}"
)
# Pilihan sumber data
data_source = st.sidebar.radio(
"Pilih Sumber Data:",
["📁 Upload File ZIP", "🌐 Download dari URL", "💾 File Lokal"],
)
gdf = None
if data_source == "📁 Upload File ZIP":
uploaded_file = st.sidebar.file_uploader(
"Upload file ZIP berisi data shapefile:",
type=["zip"],
help="Upload file ZIP yang berisi data jaringan listrik dalam format shapefile",
)
if uploaded_file is not None:
with st.spinner("Memproses file yang diupload..."):
gdf = safe_file_processing(uploaded_file)
if gdf is not None:
st.sidebar.success(
f"✅ File berhasil diproses: {len(gdf)} features"
)
elif data_source == "💾 File Lokal":
zip_path = st.sidebar.text_input(
"Path ke file ZIP lokal:",
placeholder="contoh: /path/to/data.zip",
help="Masukkan path lengkap ke file ZIP di sistem lokal",
)
if zip_path and st.sidebar.button("📂 Load File Lokal"):
if os.path.exists(zip_path):
with st.spinner("Memuat file lokal..."):
try:
with zipfile.ZipFile(zip_path, "r") as zip_file:
with tempfile.TemporaryDirectory() as temp_dir:
zip_file.extractall(temp_dir)
shp_files = [
f
for f in os.listdir(temp_dir)
if f.endswith(".shp")
]
if shp_files:
shp_path = os.path.join(temp_dir, shp_files[0])
gdf = gpd.read_file(shp_path)
st.sidebar.success(
f"✅ File lokal berhasil dimuat: {len(gdf)} features"
)
else:
st.sidebar.error(
"File shapefile tidak ditemukan dalam ZIP"
)
except Exception as e:
st.sidebar.error(f"Error memuat file lokal: {str(e)}")
else:
st.sidebar.error("File tidak ditemukan!")
else: # Download dari URL
data_url = st.sidebar.text_input(
"URL Data GeoServer:",
value="http://geoportal.jogjakota.go.id/geoserver/wms?service=WFS&version=1.0.0&request=GetFeature&typeName=Dispertaru:jaringan_listrik_347120201026134638&outputFormat=shape-zip",
)
if st.sidebar.button("🌐 Download dari URL"):
gdf = safe_url_download(data_url)
if gdf is not None:
st.sidebar.success(f"✅ Data berhasil diunduh: {len(gdf)} features")
# Konfigurasi visualisasi
st.sidebar.markdown("### 🎨 Pengaturan Visualisasi")
show_labels = st.sidebar.checkbox(
"Tampilkan Label Node",
value=True,
help="Menampilkan ID node pada visualisasi graf",
)
show_edge_details = st.sidebar.checkbox(
"Tampilkan Detail Edge",
value=False,
help="Menampilkan informasi detail tentang saluran listrik",
)
# Pengaturan tampilan label
with st.sidebar.expander("🏷️ Pengaturan Label Node"):
label_size = st.slider(
"Ukuran Label", 6, 16, 10, help="Ukuran font untuk label node"
)
label_color = st.selectbox(
"Warna Label",
["white", "black", "red", "blue", "green"],
index=0,
help="Warna teks label node",
)
# Pengaturan edge paralel
with st.sidebar.expander("🔗 Pengaturan Saluran Paralel"):
edge_offset = st.slider(
"Jarak Antar Saluran Paralel",
0.01,
0.05,
0.02,
0.005,
help="Mengatur jarak visual antar saluran paralel",
)
show_edge_colors = st.checkbox(
"Warna Berbeda untuk Saluran Paralel",
value=True,
help="Memberikan warna berbeda untuk setiap saluran paralel",
)
# Add performance settings
with st.sidebar.expander("⚙️ Pengaturan Performa"):
max_nodes_viz = st.slider("Max nodes untuk visualisasi", 50, 1000, 500)
use_cache = st.checkbox("Gunakan cache untuk analisis", value=True)
# Proses data jika sudah dimuat
if gdf is not None and not gdf.empty:
if (
st.sidebar.button("🔄 Analisis Data", type="primary")
or "gdf" not in st.session_state
):
with st.spinner("Memproses analisis graf..."):
try:
st.session_state["gdf"] = gdf
# Buat graf jaringan
G, nodes, gdf_utm, line_segments = create_network_graph(gdf)
# Limit nodes for visualization if too many
if G.number_of_nodes() > max_nodes_viz:
st.warning(
f"⚠️ Graf memiliki {G.number_of_nodes()} nodes. Visualisasi dibatasi pada {max_nodes_viz} nodes untuk performa."
)
# Create subgraph with highest degree nodes for visualization
degrees = dict(G.degree())
top_nodes = sorted(
degrees.keys(), key=lambda x: degrees[x], reverse=True
)[:max_nodes_viz]
G_viz = G.subgraph(top_nodes).copy()
st.session_state["G_viz"] = G_viz
else:
st.session_state["G_viz"] = G
st.session_state["G"] = G
st.session_state["nodes"] = nodes
st.session_state["gdf_utm"] = gdf_utm
st.session_state["line_segments"] = line_segments
# Hitung ukuran sentralitas
centrality_measures = calculate_centrality_measures(G)
st.session_state["centrality"] = centrality_measures
# Analisis konektivitas
connectivity_analysis = analyze_network_connectivity(
G, line_segments
)
st.session_state["connectivity"] = connectivity_analysis
# MST
mst = find_minimum_spanning_tree(G)
st.session_state["mst"] = mst
st.success("✅ Data berhasil diproses!")
except Exception as e:
st.error(f"❌ Error dalam analisis: {str(e)}")
st.info(
"Silakan coba dengan file data yang berbeda atau periksa format data."
)
# Tampilkan hasil jika data sudah dimuat
if "gdf" in st.session_state:
gdf = st.session_state["gdf"]
G = st.session_state["G"]
G_viz = st.session_state.get("G_viz", G)
centrality_measures = st.session_state["centrality"]
connectivity_analysis = st.session_state["connectivity"]
mst = st.session_state["mst"]
# Tab layout
tab1, tab2, tab3, tab4, tab5 = st.tabs(
[
"📊 Overview",
"🗺️ Peta Jaringan",
"📈 Analisis Graf",
"🎯 Sentralitas",
"🌳 MST Analysis",
]
)
with tab1:
st.markdown(
'<h2 class="sub-header">Overview Data dan Statistik Jaringan</h2>',
unsafe_allow_html=True,
)
# Informasi dasar dataset
st.markdown("### 📊 Informasi Dataset")
col1, col2, col3 = st.columns(3)
with col1:
st.info(f"**CRS:** {gdf.crs}")
with col2:
st.info(
f"**Geometri:** {gdf.geometry.geom_type.iloc[0] if not gdf.empty else 'N/A'}"
)
with col3:
unique_names = gdf["nama"].nunique() if "nama" in gdf.columns else 1
st.info(f"**Jenis Jaringan:** {unique_names}")
# Metrics utama
col1, col2, col3, col4 = st.columns(4)
with col1:
st.metric("Jumlah Fitur", len(gdf))
with col2:
st.metric("Jumlah Node", connectivity_analysis["num_nodes"])
with col3:
st.metric("Jumlah Edge", connectivity_analysis["num_edges"])
with col4:
st.metric("Komponen Terhubung", connectivity_analysis["num_components"])
# Metrics jaringan fisik
if "total_network_length_km" in connectivity_analysis:
st.markdown("### 📏 Statistik Panjang Jaringan")
col1, col2, col3, col4 = st.columns(4)
with col1:
st.metric(
"Total Panjang",
f"{connectivity_analysis['total_network_length_km']:.2f} km",
)
with col2:
st.metric(
"Rata-rata Segmen",
f"{connectivity_analysis['avg_segment_length_km']:.3f} km",
)
with col3:
st.metric(
"Segmen Terpanjang",
f"{connectivity_analysis['longest_segment_km']:.3f} km",
)
with col4:
st.metric(
"Segmen Terpendek",
f"{connectivity_analysis['shortest_segment_km']:.3f} km",
)
# Statistik detail
st.markdown("### 📋 Statistik Detail Jaringan")
col1, col2 = st.columns(2)
with col1:
st.markdown("**Konektivitas:**")
st.write(
f"- Jaringan Terhubung: {'✅ Ya' if connectivity_analysis['is_connected'] else '❌ Tidak'}"
)
st.write(f"- Densitas Graf: {connectivity_analysis['density']:.4f}")
st.write(f"- Diameter: {connectivity_analysis['diameter']}")
st.write(
f"- Rata-rata Panjang Jalur: {connectivity_analysis['average_path_length']}"
)
with col2:
st.markdown("**Statistik Degree:**")
st.write(
f"- Rata-rata Degree: {connectivity_analysis['avg_degree']:.2f}"
)
st.write(f"- Maximum Degree: {connectivity_analysis['max_degree']}")
st.write(f"- Minimum Degree: {connectivity_analysis['min_degree']}")
# Analisis komponen terpisah jika ada
if connectivity_analysis["num_components"] > 1:
st.markdown("### ⚠️ Analisis Komponen Terpisah")
components = list(nx.connected_components(G))
component_sizes = [len(comp) for comp in components]
col1, col2 = st.columns(2)
with col1:
st.write(f"- Komponen Terbesar: {max(component_sizes)} nodes")
st.write(f"- Komponen Terkecil: {min(component_sizes)} nodes")
with col2:
st.write(
f"- Rata-rata Ukuran: {np.mean(component_sizes):.1f} nodes"
)
st.write(f"- Komponen Singleton: {component_sizes.count(1)}")
# Rekomendasi untuk menghubungkan komponen
st.markdown("**💡 Rekomendasi:**")
st.write("- Periksa gap fisik antar segmen jaringan")
st.write("- Pertimbangkan menambah saluran penghubung")
st.write(
f"- {len([s for s in component_sizes if s == 1])} node terisolasi perlu perhatian"
)
# Tampilkan sample data
st.markdown("### 📄 Sample Data")
# Tampilkan kolom yang relevan
display_cols = (
["id", "nama"]
if all(col in gdf.columns for col in ["id", "nama"])
else gdf.columns.tolist()
)
st.dataframe(gdf[display_cols].head(10))
with tab2:
st.markdown(
'<h2 class="sub-header">Peta Jaringan Listrik</h2>',
unsafe_allow_html=True,
)
# Buat peta
try:
map_viz = create_map_visualization(gdf)
if map_viz:
st_folium(map_viz, width=700, height=500)
else:
st.error("Tidak dapat membuat visualisasi peta")
except Exception as e:
st.error(f"Error creating map: {str(e)}")
with tab3:
st.markdown(
'<h2 class="sub-header">Visualisasi Graf Jaringan</h2>',
unsafe_allow_html=True,
)
# Performance warning
if G.number_of_nodes() > max_nodes_viz:
st.info(
f"ℹ️ Menampilkan {max_nodes_viz} node dengan degree tertinggi dari total {G.number_of_nodes()} nodes"
)
# Kontrol visualisasi tambahan
col1, col2, col3 = st.columns(3)
with col1:
if show_edge_details:
st.info(
"ℹ️ Mode Detail Edge: Hover pada garis untuk melihat detail saluran"
)
with col2:
if show_labels:
st.info("ℹ️ Mode Label: ID node ditampilkan pada graf")
with col3:
# Fitur pencarian node
search_node = st.text_input(
"🔍 Cari Node:",
placeholder="Masukkan ID node (contoh: 13, 83, 154)",
help="Masukkan ID node untuk mencari informasi detail",
)
if search_node:
try:
node_id = int(search_node)
if node_id in G.nodes():
neighbors = list(G.neighbors(node_id))
degree_nx = G.degree(node_id)
# Hitung manual untuk debugging (exclude self-loop dari neighbors)
total_edges_manual = 0
edge_details = []
actual_neighbors = [
n for n in neighbors if n != node_id
] # Exclude self
for neighbor in actual_neighbors:
edge_count = G.number_of_edges(node_id, neighbor)
total_edges_manual += edge_count
edge_details.append(f"→ {neighbor} ({edge_count} edge)")
# Tambahkan self-loop secara terpisah jika ada
if G.has_edge(node_id, node_id):
self_edge_count = G.number_of_edges(node_id, node_id)
edge_details.append(
f"→ {node_id} (SELF-LOOP: {self_edge_count} edge)"
)
total_edges_manual += self_edge_count
st.success(f"✅ Node {node_id} ditemukan!")
st.write(f"• **Degree (NetworkX)**: {degree_nx}")
st.write(
f"• **Total Edges (Manual)**: {total_edges_manual}"
)
st.write(
f"• **Jumlah Tetangga Sebenarnya**: {len(actual_neighbors)}"
)
st.write(
f"• **Neighbors dari NetworkX**: {len(neighbors)} (mungkin termasuk self)"
)
# Debugging mendalam untuk edge
has_self_loop_search = G.has_edge(node_id, node_id)
self_loop_adjustment = 1 if has_self_loop_search else 0
expected_degree = total_edges_manual
# Debug: Lihat semua edge yang terhubung ke node ini
st.write("**🔍 Debug - Semua Edge yang Terhubung:**")
all_edges = []
# Metode 1: Dari G.edges()
for edge in G.edges(node_id, data=True):
all_edges.append(
f"Edge: {edge[0]}{edge[1]} (data: {edge[2]})"
)
# Metode 2: Cek degree calculation NetworkX
degree_dict = dict(G.degree([node_id]))
st.write(f"• NetworkX degree calculation: {degree_dict}")
# Metode 3: Manual count semua edges
manual_degree = 0
for neighbor in G.neighbors(node_id):
edge_count = G.number_of_edges(node_id, neighbor)
manual_degree += edge_count
st.write(f"• To {neighbor}: {edge_count} edge(s)")
st.write(f"• **Manual degree total**: {manual_degree}")
st.write(f"• **NetworkX degree**: {degree_nx}")
st.write(f"• **Difference**: {degree_nx - manual_degree}")
if all_edges:
st.write("**All edges from G.edges():**")
for edge in all_edges:
st.write(f" {edge}")
if degree_nx != manual_degree:
st.error("⚠️ **NETWORKX BUG DETECTED!**")
st.write("**Analysis:**")
st.write(f"- Manual count (CORRECT): {manual_degree}")
st.write(f"- NetworkX degree (WRONG): {degree_nx}")
st.write(f"- Difference: +{degree_nx - manual_degree}")
st.write(
f"- Self-loop present: {'Yes' if has_self_loop_search else 'No'}"
)
st.write("**Root Cause:**")
st.write(
"- NetworkX internal bug with self-loop counting"
)
st.write("- Graf construction issue")
st.write("- Use manual count as the correct value")
st.success(
f"✅ **CORRECTED**: Node {node_id} has {manual_degree} connections"
)
elif has_self_loop_search:
st.info(
"ℹ️ **Self-loop detected** (counted as +1 degree)"
)
else:
st.success("✅ **All calculations consistent!**")
st.write("**Detail Koneksi:**")
for detail in edge_details[:8]:
st.write(f" {detail}")
if len(edge_details) > 8:
st.write(f" ... dan {len(edge_details) - 8} lainnya")
else:
st.warning(f"❌ Node {node_id} tidak ditemukan dalam graf")
# Tampilkan beberapa node yang tersedia untuk referensi
available_nodes = sorted(list(G.nodes()))[:10]
st.info(f"💡 Contoh node yang tersedia: {available_nodes}")
except ValueError:
st.warning("⚠️ Masukkan angka yang valid")
# Info tambahan tentang node
st.markdown("### 📋 Informasi Node")
col1, col2, col3 = st.columns(3)
with col1:
total_nodes = G.number_of_nodes()
st.metric("Total Node", total_nodes)
with col2:
if total_nodes > 0:
min_node = min(G.nodes())
max_node = max(G.nodes())
st.metric("Range Node ID", f"{min_node} - {max_node}")
with col3:
# Tampilkan beberapa node dengan degree tertinggi
if G.number_of_nodes() > 0:
top_degree_nodes = sorted(
G.degree(), key=lambda x: x[1], reverse=True
)[:3]
top_nodes_str = ", ".join(
[str(node) for node, _ in top_degree_nodes]
)
st.metric("Top 3 Node (Degree)", top_nodes_str)
# Visualisasi graf
try:
network_fig = create_network_visualization(
G_viz,
st.session_state["nodes"],
centrality_measures,
show_labels,
show_edge_details,
label_size,
label_color,
edge_offset,
show_edge_colors,
)
st.plotly_chart(network_fig, use_container_width=True)
except Exception as e:
st.error(f"Error creating network visualization: {str(e)}")
# Detail Koneksi Node
st.markdown("### 🔗 Detail Koneksi Node")
try:
connection_df = create_node_connection_details(G_viz, top_n=20)
if not connection_df.empty:
st.markdown(
"""
**Penjelasan Kolom:**
- **Total Edges**: Jumlah total saluran yang terhubung ke node
- **Jumlah Tetangga**: Jumlah node lain yang terhubung langsung
- **Rasio Edge/Tetangga**: Rata-rata saluran per tetangga (>1 = ada saluran paralel)
"""
)
# Highlight nodes dengan multiple edges
def highlight_multiple_edges(df):
def color_ratio(val):
try:
ratio = float(val)
if ratio > 1.5:
return "background-color: #ffcccc; font-weight: bold" # Merah muda untuk rasio tinggi
elif ratio > 1.0:
return "background-color: #fff2cc" # Kuning untuk rasio sedang
else:
return ""
except:
return ""
return df.style.applymap(
color_ratio, subset=["Rasio Edge/Tetangga"]
)
st.dataframe(
highlight_multiple_edges(connection_df),
use_container_width=True,
height=400,
)
# Analisis tambahan
high_ratio_nodes = connection_df[
connection_df["Rasio Edge/Tetangga"].astype(float) > 1.0
]
if not high_ratio_nodes.empty:
st.markdown("### 🔍 Analisis Saluran Paralel")
st.info(
f"Ditemukan {len(high_ratio_nodes)} node dengan saluran paralel (rasio > 1.0)"
)
for _, row in high_ratio_nodes.head(5).iterrows():
st.write(
f"• **Node {row['Node']}**: {row['Total Edges']} saluran ke {row['Jumlah Tetangga']} tetangga (rasio: {row['Rasio Edge/Tetangga']})"
)
else:
st.warning("Tidak ada data koneksi untuk ditampilkan")
except Exception as e:
st.error(f"Error creating connection details: {str(e)}")
# Informasi graf
st.markdown("### 🔍 Interpretasi Graf")
st.markdown(
"""
- **Node (Simpul)**: Merepresentasikan titik-titik penting dalam jaringan (gardu, pembangkit, junction)
- **Edge (Sisi)**: Merepresentasikan saluran listrik yang menghubungkan antar titik
- **Warna Node**: Intensitas warna menunjukkan tingkat kepentingan berdasarkan Degree Centrality
- **Node dengan warna lebih gelap**: Memiliki lebih banyak koneksi (lebih kritis)
- **Saluran Paralel**: Node dengan rasio Edge/Tetangga > 1 memiliki multiple saluran ke tetangga yang sama
"""
)
with tab4:
st.markdown(
'<h2 class="sub-header">Analisis Sentralitas</h2>',
unsafe_allow_html=True,
)
# Perbandingan sentralitas
try:
centrality_fig = create_centrality_comparison(centrality_measures)
st.plotly_chart(centrality_fig, use_container_width=True)
except Exception as e:
st.error(f"Error creating centrality comparison: {str(e)}")
# Matriks Sentralitas
st.markdown("### 📊 Matriks Nilai Sentralitas")
try:
centrality_df = create_centrality_matrix(centrality_measures)
if not centrality_df.empty:
# Tampilkan statistik ringkas
col1, col2, col3, col4 = st.columns(4)
with col1:
st.metric("Total Node", len(centrality_df))
with col2:
st.metric(
"Max Degree Centrality",
f"{centrality_df['Degree Centrality'].max():.4f}",
)
with col3:
st.metric(
"Max Betweenness",
f"{centrality_df['Betweenness Centrality'].max():.4f}",
)
with col4:
st.metric(
"Max Closeness",
f"{centrality_df['Closeness Centrality'].max():.4f}",
)
# Opsi untuk menampilkan semua data atau hanya top N
display_option = st.radio(
"Pilih tampilan data:",
["Top 20 Node", "Top 50 Node", "Semua Node"],
horizontal=True,
)
if display_option == "Top 20 Node":
display_df = centrality_df.head(20)
elif display_option == "Top 50 Node":
display_df = centrality_df.head(50)
else:
display_df = centrality_df
# Tampilkan tabel dengan styling dan color coding
def highlight_values(df):
"""Apply color coding to centrality values"""
styled_df = df.style
# Color coding untuk setiap kolom centrality
centrality_cols = [
"Degree Centrality",
"Closeness Centrality",
"Betweenness Centrality",
"Eigenvector Centrality",
]
# Color mapping untuk setiap kolom dengan warna berbeda
color_maps = {
"Degree Centrality": "Reds", # Merah
"Closeness Centrality": "Blues", # Biru
"Betweenness Centrality": "Greens", # Hijau
"Eigenvector Centrality": "Purples", # Ungu
}
for col in centrality_cols:
if col in df.columns:
# Gradient color berbeda untuk setiap kolom
styled_df = styled_df.background_gradient(
subset=[col],
cmap=color_maps[col],
vmin=0,
vmax=df[col].max(),
)
# Format angka dengan 6 desimal
format_dict = {}
for col in centrality_cols:
if col in df.columns:
format_dict[col] = "{:.6f}"
styled_df = styled_df.format(format_dict)
# Styling tambahan
styled_df = styled_df.set_properties(
**{"font-weight": "bold", "text-align": "center"},
subset=["Node"],
)
# Highlight top 5 nodes dengan border tebal
top_5_indices = df.head(5).index
styled_df = styled_df.set_properties(
**{"border": "3px solid #ff6b6b", "font-weight": "bold"},
subset=pd.IndexSlice[top_5_indices, :],
)
return styled_df
# Tampilkan legend untuk color coding
st.markdown(
"""
**📋 Keterangan Visualisasi (Warna per Kolom):**
- 🟥 **Degree Centrality**: Gradasi Merah (putih → merah gelap)
- 🟦 **Closeness Centrality**: Gradasi Biru (putih → biru gelap)
- 🟩 **Betweenness Centrality**: Gradasi Hijau (putih → hijau gelap)
- 🟪 **Eigenvector Centrality**: Gradasi Ungu (putih → ungu gelap)
- 🔴 **Border Merah Tebal**: Top 5 node paling penting
*Semakin gelap warna = semakin tinggi nilai sentralitas*
"""
)
# Tampilkan tabel dengan styling
st.dataframe(
highlight_values(display_df),
use_container_width=True,
height=400,
)
# Informasi tambahan tentang interpretasi
with st.expander("ℹ️ Cara Membaca Matriks Sentralitas"):
st.markdown(
"""
**Interpretasi Nilai Sentralitas:**
1. **Degree Centrality (0-1)**:
- Mengukur jumlah koneksi langsung
- Nilai tinggi = node dengan banyak koneksi
2. **Closeness Centrality (0-1)**:
- Mengukur kedekatan ke semua node lain
- Nilai tinggi = node yang mudah dijangkau dari mana saja
3. **Betweenness Centrality (0-1)**:
- Mengukur seberapa sering node berada di jalur terpendek
- Nilai tinggi = node yang berperan sebagai jembatan penting
4. **Eigenvector Centrality (0-1)**:
- Mengukur pengaruh berdasarkan kualitas koneksi
- Nilai tinggi = node yang terhubung ke node-node penting lainnya
**Tips Analisis:**
- Node dengan nilai tinggi di semua kategori = **Super Critical**
- Node dengan Betweenness tinggi = **Bottleneck** potensial
- Node dengan Degree tinggi tapi Eigenvector rendah = **Hub** lokal
"""
)
# Tombol download CSV
csv = centrality_df.to_csv(index=False)
st.download_button(
label="📥 Download Matriks Sentralitas (CSV)",
data=csv,
file_name="centrality_matrix.csv",
mime="text/csv",
)
else:
st.warning("Tidak ada data sentralitas untuk ditampilkan")
except Exception as e:
st.error(f"Error creating centrality matrix: {str(e)}")
# Identifikasi node kritis
st.markdown("### 🎯 Identifikasi Node Kritis")
if centrality_measures.get("degree"):
# Top nodes berdasarkan degree centrality
degree_sorted = sorted(
centrality_measures["degree"].items(),
key=lambda x: x[1],
reverse=True,
)
top_nodes = degree_sorted[:5]
st.markdown("**Top 5 Node Berdasarkan Degree Centrality:**")
for i, (node, centrality) in enumerate(top_nodes, 1):
st.write(f"{i}. Node {node}: {centrality:.4f}")
# Rekomendasi berdasarkan analisis statistik
st.markdown("### 💡 Rekomendasi Berbasis Data")
if top_nodes and centrality_measures.get("degree"):
# Analisis statistik degree centrality
degree_values = list(centrality_measures["degree"].values())
mean_degree = np.mean(degree_values)
std_degree = np.std(degree_values)
q75 = np.percentile(degree_values, 75)
q90 = np.percentile(degree_values, 90)
q95 = np.percentile(degree_values, 95)
# Threshold berdasarkan statistik
critical_threshold = q90 # Top 10%
high_priority_threshold = q75 # Top 25%
# Hitung jumlah node per kategori
critical_nodes = [
node
for node, cent in centrality_measures["degree"].items()
if cent >= critical_threshold
]
high_priority_nodes = [
node
for node, cent in centrality_measures["degree"].items()
if cent >= high_priority_threshold and cent < critical_threshold
]
st.markdown("#### 📊 Analisis Statistik Degree Centrality")
col1, col2, col3, col4 = st.columns(4)
with col1:
st.metric("Mean", f"{mean_degree:.4f}")
with col2:
st.metric("Std Dev", f"{std_degree:.4f}")
with col3:
st.metric("75th Percentile", f"{q75:.4f}")
with col4:
st.metric("90th Percentile", f"{q90:.4f}")
st.markdown(
f"""
**Node Paling Kritis:** Node {top_nodes[0][0]} (Degree Centrality: {top_nodes[0][1]:.4f})
**Rekomendasi Kebijakan Berbasis Data:**
1. **🔴 Monitoring Kritis** (≥ {critical_threshold:.4f} - Top 10%):
- **{len(critical_nodes)} node** memerlukan monitoring 24/7
- Sistem backup dan redundansi wajib
- Maintenance preventif bulanan
2. **🟡 Monitoring Prioritas** ({high_priority_threshold:.4f} - {critical_threshold:.4f} - Top 25%):
- **{len(high_priority_nodes)} node** monitoring reguler
- Maintenance preventif triwulanan
- Rencana contingency tersedia
3. **🟢 Monitoring Standar** (< {high_priority_threshold:.4f}):
- Monitoring rutin sesuai jadwal normal
- Maintenance tahunan
**Basis Ilmiah:**
- Threshold berdasarkan distribusi statistik data aktual
- Top 10% (90th percentile) untuk monitoring kritis
- Top 25% (75th percentile) untuk monitoring prioritas
- Menggunakan analisis risiko berbasis data, bukan nilai arbitrary
"""
)
with tab5:
st.markdown(
'<h2 class="sub-header">Minimum Spanning Tree Analysis</h2>',
unsafe_allow_html=True,
)
if mst.number_of_nodes() > 0:
# Perhitungan dasar
total_weight_mst = sum(
[data["weight"] for _, _, data in mst.edges(data=True)]
)
original_weight = sum(
[data["weight"] for _, _, data in G.edges(data=True)]
)
savings = original_weight - total_weight_mst
efficiency = (
(savings / original_weight * 100) if original_weight > 0 else 0
)
# Tampilkan metrics
col1, col2 = st.columns(2)
with col1:
st.metric("Total Bobot MST", f"{total_weight_mst:.2f}m")
st.metric("Jumlah Edge MST", mst.number_of_edges())
with col2:
st.metric("Efisiensi", f"{efficiency:.2f}%")
st.metric("Penghematan", f"{savings:.2f}m")
# Proses Perhitungan Detail
st.markdown("### 🧮 Proses Perhitungan Efisiensi dan Penghematan")
with st.expander("📊 Detail Perhitungan Langkah demi Langkah"):
st.markdown("#### 1️⃣ **Perhitungan Total Bobot Jaringan Asli**")
st.code(
f"""
# Formula: Σ(weight_i) untuk semua edge dalam graf asli
original_edges = {G.number_of_edges()} edge
original_weight = Σ(weight_i) = {original_weight:.2f} meter
"""
)
st.markdown("#### 2️⃣ **Perhitungan Total Bobot MST**")
st.code(
f"""
# Formula: Σ(weight_i) untuk edge dalam MST
mst_edges = {mst.number_of_edges()} edge
mst_weight = Σ(weight_i) = {total_weight_mst:.2f} meter
"""
)
st.markdown("#### 3️⃣ **Perhitungan Penghematan Absolut**")
st.code(
f"""
# Formula: Penghematan = Total_Asli - Total_MST
savings = {original_weight:.2f} - {total_weight_mst:.2f}
savings = {savings:.2f} meter
"""
)
st.markdown("#### 4️⃣ **Perhitungan Efisiensi Relatif**")
st.code(
f"""
# Formula: Efisiensi = (Penghematan / Total_Asli) × 100%
efficiency = ({savings:.2f} / {original_weight:.2f}) × 100%
efficiency = {efficiency:.2f}%
"""
)
st.markdown("#### 5️⃣ **Interpretasi Hasil**")
if efficiency > 50:
interpretation = (
"🔴 **Sangat Tinggi** - Jaringan asli sangat tidak efisien"
)
recommendation = "Pertimbangkan restrukturisasi besar-besaran"
elif efficiency > 30:
interpretation = (
"🟡 **Tinggi** - Ada potensi optimasi signifikan"
)
recommendation = "Evaluasi edge redundan untuk penghematan"
elif efficiency > 10:
interpretation = "🟢 **Sedang** - Jaringan cukup efisien"
recommendation = "Optimasi minor pada area tertentu"
else:
interpretation = "✅ **Rendah** - Jaringan sudah sangat efisien"
recommendation = "Pertahankan struktur existing"
st.markdown(
f"""
**Tingkat Efisiensi:** {interpretation}
**Rekomendasi:** {recommendation}
**Penjelasan:**
- **Efisiensi {efficiency:.2f}%** berarti MST dapat menghemat {efficiency:.2f}% dari total panjang kabel
- **Penghematan {savings:.2f}m** setara dengan {savings/1000:.3f} km kabel
- **Edge yang dihilangkan:** {G.number_of_edges() - mst.number_of_edges()} edge (redundan)
"""
)
# Analisis Biaya (opsional)
st.markdown("### 💰 Analisis Biaya (Estimasi)")
col1, col2 = st.columns(2)
with col1:
cost_per_meter = st.number_input(
"Biaya per meter (Rp)",
min_value=0,
value=500000,
step=50000,
help="Estimasi biaya instalasi kabel per meter",
)
with col2:
if cost_per_meter > 0:
total_cost_original = original_weight * cost_per_meter
total_cost_mst = total_weight_mst * cost_per_meter
cost_savings = total_cost_original - total_cost_mst
st.metric(
"Biaya Jaringan Asli", f"Rp {total_cost_original:,.0f}"
)
st.metric("Biaya MST", f"Rp {total_cost_mst:,.0f}")
st.metric("Penghematan Biaya", f"Rp {cost_savings:,.0f}")
st.success(
f"💡 **Insight**: Dengan MST, dapat menghemat **Rp {cost_savings:,.0f}** ({efficiency:.1f}%) dari biaya konstruksi!"
)
# Visualisasi MST
try:
mst_centrality = calculate_centrality_measures(mst)
mst_fig = create_network_visualization(
mst, st.session_state["nodes"], mst_centrality, show_labels
)
mst_fig.update_layout(
title=dict(
text="Minimum Spanning Tree - Jaringan Optimal",
font=dict(size=16),
)
)
st.plotly_chart(mst_fig, use_container_width=True)
except Exception as e:
st.error(f"Error creating MST visualization: {str(e)}")
st.markdown("### 🔧 Interpretasi MST dan Analisis Redundansi")
# Analisis Edge Redundan
st.markdown("#### 🔍 Analisis Edge Redundan")
# Identifikasi edge redundan
mst_edges = set(mst.edges())
original_edges = set(G.edges())
redundant_edges = []
for edge in original_edges:
# Cek kedua arah karena edge tidak berarah
if edge not in mst_edges and (edge[1], edge[0]) not in mst_edges:
edge_data = G.get_edge_data(edge[0], edge[1])
if edge_data:
redundant_edges.append(
(edge[0], edge[1], edge_data["weight"])
)
# Hitung total bobot edge redundan
total_redundant_weight = sum(
[weight for _, _, weight in redundant_edges]
)
redundant_percentage = (
(total_redundant_weight / original_weight * 100)
if original_weight > 0
else 0
)
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Edge Redundan", f"{len(redundant_edges)}")
with col2:
st.metric("Bobot Redundan", f"{total_redundant_weight:.2f}m")
with col3:
st.metric("% Redundansi", f"{redundant_percentage:.2f}%")
# Detail perhitungan redundansi
with st.expander("🧮 Perhitungan Analisis Redundansi"):
st.markdown("#### 1️⃣ **Identifikasi Edge Redundan**")
st.code(
f"""
# Edge dalam jaringan asli: {len(original_edges)}
# Edge dalam MST: {len(mst_edges)}
# Edge redundan = Edge_asli - Edge_MST
redundant_edges = {len(redundant_edges)}
"""
)
st.markdown("#### 2️⃣ **Perhitungan Bobot Redundan**")
st.code(
f"""
# Formula: Σ(weight_i) untuk edge yang tidak ada dalam MST
total_redundant_weight = Σ(weight_redundant_i)
total_redundant_weight = {total_redundant_weight:.2f} meter
"""
)
st.markdown("#### 3️⃣ **Persentase Redundansi**")
st.code(
f"""
# Formula: (Bobot_Redundan / Bobot_Total_Asli) × 100%
redundancy_percentage = ({total_redundant_weight:.2f} / {original_weight:.2f}) × 100%
redundancy_percentage = {redundant_percentage:.2f}%
"""
)
st.markdown("#### 4️⃣ **Verifikasi Konsistensi**")
st.code(
f"""
# Verifikasi: MST_weight + Redundant_weight = Original_weight
{total_weight_mst:.2f} + {total_redundant_weight:.2f} = {total_weight_mst + total_redundant_weight:.2f}
Original weight: {original_weight:.2f}
Difference: {abs(original_weight - (total_weight_mst + total_redundant_weight)):.2f}m
"""
)
# Tampilkan beberapa edge redundan terbesar
if redundant_edges:
st.markdown("#### 5️⃣ **Top 10 Edge Redundan Terpanjang**")
redundant_sorted = sorted(
redundant_edges, key=lambda x: x[2], reverse=True
)[:10]
redundant_df = pd.DataFrame(
redundant_sorted,
columns=["Node A", "Node B", "Panjang (m)"],
)
redundant_df["Panjang (km)"] = (
redundant_df["Panjang (m)"] / 1000
)
st.dataframe(redundant_df, use_container_width=True)
# Interpretasi berdasarkan tingkat redundansi
st.markdown("#### 📊 Interpretasi Tingkat Redundansi")
if redundant_percentage > 40:
redundancy_level = "🔴 **Sangat Tinggi**"
redundancy_meaning = "Jaringan memiliki banyak jalur alternatif"
redundancy_action = "Pertimbangkan untuk mengurangi edge redundan pada fase konstruksi baru"
elif redundant_percentage > 25:
redundancy_level = "🟡 **Tinggi**"
redundancy_meaning = (
"Jaringan memiliki redundansi yang baik untuk keandalan"
)
redundancy_action = (
"Evaluasi cost-benefit antara redundansi dan efisiensi"
)
elif redundant_percentage > 10:
redundancy_level = "🟢 **Sedang**"
redundancy_meaning = "Tingkat redundansi optimal untuk keseimbangan efisiensi-keandalan"
redundancy_action = "Pertahankan tingkat redundansi saat ini"
else:
redundancy_level = "⚠️ **Rendah**"
redundancy_meaning = (
"Jaringan mendekati struktur minimal (seperti MST)"
)
redundancy_action = (
"Pertimbangkan menambah redundansi untuk meningkatkan keandalan"
)
st.markdown(
f"""
**Tingkat Redundansi:** {redundancy_level} ({redundant_percentage:.1f}%)
**Makna:** {redundancy_meaning}
**Rekomendasi:** {redundancy_action}
**Analisis Teknis:**
- **Edge redundan:** {len(redundant_edges)} dari {len(original_edges)} total edge
- **Bobot redundan:** {total_redundant_weight:.2f}m ({total_redundant_weight/1000:.3f} km)
- **Fungsi redundansi:** Menyediakan jalur alternatif jika terjadi gangguan
- **Trade-off:** Redundansi ↑ = Keandalan ↑, Efisiensi ↓
"""
)
# Kesimpulan MST
st.markdown("#### 🎯 Kesimpulan MST Analysis")
st.markdown(
f"""
**Ringkasan Analisis:**
1. **Efisiensi Jaringan:** {efficiency:.1f}% - MST dapat menghemat {efficiency:.1f}% dari total panjang kabel
2. **Redundansi Jaringan:** {redundant_percentage:.1f}% - {redundant_percentage:.1f}% dari jaringan bersifat redundan
3. **Optimasi Potensial:** {len(redundant_edges)} edge dapat dievaluasi untuk penghematan
4. **Keseimbangan:** Pertimbangkan trade-off antara efisiensi (MST) dan keandalan (redundansi)
**Aplikasi Praktis:**
- **Perencanaan Baru:** Gunakan MST sebagai baseline minimum
- **Optimasi Existing:** Evaluasi edge redundan untuk cost reduction
- **Maintenance:** Prioritaskan edge MST untuk pemeliharaan kritis
- **Expansion:** Tambahkan edge di luar MST untuk meningkatkan redundansi
"""
)
else:
st.warning(
"Tidak dapat membuat MST - graf mungkin tidak terhubung atau kosong"
)
else:
st.info("👆 Pilih sumber data dan klik tombol untuk memulai analisis")
# Tampilkan informasi tentang format data yang didukung
st.markdown("## 📁 Format Data yang Didukung")
col1, col2, col3 = st.columns(3)
with col1:
st.markdown(
"""
**📁 Upload File ZIP**
- Format: ZIP berisi shapefile
- Komponen: .shp, .shx, .dbf, .prj
- Ukuran maksimal: 200MB
- Paling mudah dan cepat
"""
)
with col2:
st.markdown(
"""
**💾 File Lokal**
- Format: ZIP di sistem lokal
- Path: Absolut atau relatif
- Cocok untuk development
- Akses file sistem langsung
"""
)
with col3:
st.markdown(
"""
**🌐 Download URL**
- Format: WFS GeoServer
- Output: shape-zip
- Real-time data
- Memerlukan koneksi internet
"""
)
# Tampilkan informasi metodologi
st.markdown("## 📚 Metodologi")
st.markdown(
"""
### Pendekatan Teori Graf untuk Analisis Jaringan Listrik:
1. **Pemodelan Graf**:
- Node = Gardu listrik, pembangkit, junction
- Edge = Saluran transmisi/distribusi listrik
- Weight = Panjang saluran atau impedansi
2. **Analisis Sentralitas**:
- **Degree Centrality**: Jumlah koneksi langsung
- **Betweenness Centrality**: Peran sebagai jembatan antar node
- **Closeness Centrality**: Kedekatan rata-rata ke semua node lain
- **Eigenvector Centrality**: Pengaruh berdasarkan kualitas koneksi
3. **Minimum Spanning Tree (MST)**:
- Menggunakan algoritma Kruskal
- Mencari jaringan dengan bobot minimum yang tetap terhubung
- Optimasi biaya konstruksi/operasional
4. **Analisis Konektivitas**:
- Identifikasi komponen terhubung
- Perhitungan diameter dan average path length
- Analisis keandalan jaringan
"""
)
# Troubleshooting section
st.markdown("## 🔧 Troubleshooting")
st.markdown(
"""
**Jika mengalami masalah:**
1. **File tidak terbaca**: Pastikan file ZIP berisi shapefile lengkap (.shp, .shx, .dbf, .prj)
2. **Error permission**: Aplikasi otomatis menangani permission di server
3. **Timeout download**: Coba gunakan upload file jika download dari URL gagal
4. **Visualisasi lambat**: Gunakan pengaturan performa untuk membatasi jumlah node
5. **Memory error**: Coba dengan dataset yang lebih kecil
"""
)
if __name__ == "__main__":
    # Script entry point: run the app and, when an exception escapes main(),
    # report it on the page through st.error followed by an st.info hint.
    try:
        main()
    except Exception as exc:
        error_text = str(exc)
        st.error("💥 Application Error: " + error_text)
        st.info(
            "Silakan refresh halaman atau hubungi administrator jika masalah berlanjut."
        )