MODA / src /streamlit_app.py
rvouyaa's picture
Update src/streamlit_app.py
d3c3011 verified
import os
from pathlib import Path
import streamlit as st
import pandas as pd
import networkx as nx
import folium
from streamlit_folium import st_folium
from geopy.distance import geodesic
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings("ignore")
# ── SUPPRESS PERMISSION & TELEMETRY ERRORS ──────────────────────────────────
os.environ["HOME"] = "/tmp"
os.environ["XDG_CONFIG_HOME"] = "/tmp"
os.environ["XDG_CACHE_HOME"] = "/tmp"
os.environ["STREAMLIT_HOME"] = "/tmp"
os.environ["STREAMLIT_GATHER_USAGE_STATS"] = "false"
# ── STREAMLIT PAGE CONFIG ──────────────────────────────────────────────────
st.set_page_config(
page_title="MODA Multimodal Routing",
layout="wide",
initial_sidebar_state="expanded"
)
# ── PATH SETUP ──────────────────────────────────────────────────────────────
BASE_DIR = Path(__file__).resolve().parent
DATA_DIR = BASE_DIR.parent / "data"
# ── DATA LOADING ────────────────────────────────────────────────────────────
@st.cache_data
def load_csv_from_local(filename):
"""Load a single CSV from data directory with caching"""
full_path = DATA_DIR / filename
if not full_path.exists():
st.error(f"File tidak ditemukan: {full_path}")
return pd.DataFrame()
df = pd.read_csv(full_path)
df.columns = [col.lower() for col in df.columns]
return df
@st.cache_data
def load_all_data():
"""Load all required datasets at once"""
files = {
'buildings': 'poi_places.csv',
'road_nodes': 'road_nodes.csv',
'road_edges': 'road_edges.csv'
}
data = {key: load_csv_from_local(fname) for key, fname in files.items()}
return data
# ── BUILD GRAPH ─────────────────────────────────────────────────────────────
@st.cache_data
def build_graph(gdf_nodes, gdf_edges, gdf_poi):
G = nx.Graph()
# add road nodes
for _, r in gdf_nodes.iterrows():
G.add_node(r.osmid, x=r.x, y=r.y)
# add road edges
for _, r in gdf_edges.iterrows():
if pd.notna(r.length):
G.add_edge(r.u, r.v, length=r.length)
# add POI nodes
for idx, r in gdf_poi.iterrows():
nid = f"poi_{idx}"
G.add_node(nid, x=r.longitude, y=r.latitude, name=r.name)
# connect POI to nearest road
roads = [(n, d['y'], d['x']) for n, d in G.nodes(data=True) if isinstance(n, (int, float))]
for idx, r in gdf_poi.iterrows():
nid = f"poi_{idx}"
lat, lon = r.latitude, r.longitude
nearest = min(roads, key=lambda x: geodesic((lat, lon), (x[1], x[2])).meters)[0]
dist = geodesic((lat, lon), (G.nodes[nearest]['y'], G.nodes[nearest]['x'])).meters
G.add_edge(nid, nearest, length=dist)
G.add_edge(nearest, nid, length=dist)
return G
# ── ROUTING HELPERS ──────────────────────────────────────────────────────────
def get_poi_id(name, gdf_poi):
match = gdf_poi[gdf_poi['name'].str.lower().str.strip() == name.lower().strip()]
return f"poi_{match.index[0]}" if not match.empty else None
def compute_route(G, start, end, speed_kmh):
path = nx.shortest_path(G, start, end, weight='length')
dist_m = nx.shortest_path_length(G, start, end, weight='length')
km = dist_m / 1000
time_min = km / speed_kmh * 60
return path, km, time_min
# ── MAIN APP ─────────────────────────────────────────────────────────────────
# Load datasets
data = load_all_data()
gdf_poi = data['buildings']
gdf_nodes = data['road_nodes']
gdf_edges = data['road_edges']
# Sidebar for user input
st.sidebar.header("πŸ“ Input Rute")
if 'name' not in gdf_poi.columns:
st.sidebar.error("Kolom 'name' tidak ditemukan di poi_places.csv")
st.stop()
poi_list = gdf_poi['name'].dropna().tolist()
start = st.sidebar.selectbox("Dari:", poi_list)
end = st.sidebar.selectbox("Ke:", poi_list, index=1)
time_input = st.sidebar.time_input("Waktu Berangkat", value=datetime.now().time())
# Display selection
st.write(f"**Start:** {start} β€’ **End:** {end} β€’ **Depart:** {time_input.strftime('%H:%M')}")
# Trigger routing
if st.sidebar.button("πŸ” Cari Rute"):
G = build_graph(gdf_nodes, gdf_edges, gdf_poi)
sid = get_poi_id(start, gdf_poi)
eid = get_poi_id(end, gdf_poi)
if not sid or not eid:
st.error("Lokasi tidak valid.")
else:
# Drive Mode
path_d, km_d, t_d = compute_route(G, sid, eid, speed_kmh=30)
cost_d = int(km_d * 3000)
st.subheader("πŸš— Drive Mode")
st.write(f"Jarak: **{km_d:.2f} km**, Waktu: **{t_d:.1f} menit**, Biaya: **{cost_d:,} IDR**")
m1 = folium.Map(location=[gdf_poi['latitude'].mean(), gdf_poi['longitude'].mean()], zoom_start=13)
coords1 = [(G.nodes[n]['y'], G.nodes[n]['x']) for n in path_d]
folium.PolyLine(coords1, color='blue', weight=4).add_to(m1)
st_folium(m1, width=700, height=400)
# Bus Mode
path_b, km_b, t_b = compute_route(G, sid, eid, speed_kmh=25)
arrival = (datetime.combine(datetime.today(), time_input) + timedelta(minutes=t_b)).time()
st.subheader("🚌 Bus Mode")
st.write(f"Jarak: **{km_b:.2f} km**, Waktu: **{t_b:.1f} menit**, Tiba: **{arrival.strftime('%H:%M')}**, Biaya: **3,500 IDR**")
m2 = folium.Map(location=[gdf_poi['latitude'].mean(), gdf_poi['longitude'].mean()], zoom_start=13)
coords2 = [(G.nodes[n]['y'], G.nodes[n]['x']) for n in path_b]
folium.PolyLine(coords2, color='green', weight=4).add_to(m2)
st_folium(m2, width=700, height=400)
else:
st.info("Pilih rute dan klik Cari Rute di sidebar.")