import streamlit as st import pandas as pd import numpy as np import folium from streamlit_folium import st_folium import requests import json import os st.set_page_config(page_title="심야 대중교통 최적화 대시보드", layout="wide") @st.cache_data def fetch_osm_data_v4(): bbox = "37.491,127.020,37.505,127.035" endpoints = [ "https://overpass-api.de/api/interpreter", "https://overpass.kumi.systems/api/interpreter", "https://overpass.osm.ch/api/interpreter" ] query = f""" [out:json][timeout:60]; ( relation["route"="bus"]["ref"~"N13|N15|N37|N75"]({bbox}); node["highway"="bus_stop"]({bbox}); node["amenity"~"pub|bar|nightclub|restaurant"]({bbox}); node["highway"="street_lamp"]({bbox}); node["man_made"="surveillance"]({bbox}); ); out body geom; """ headers = {'User-Agent': 'SmartTransitMVP/2.0'} data = None cache_file = "osm_backup.json" for url in endpoints: try: response = requests.post(url, data=query, headers=headers, timeout=65) response.raise_for_status() data = response.json() with open(cache_file, "w", encoding="utf-8") as f: json.dump(data, f) break except Exception as e: print(f"Failed to fetch from {url}: {e}") continue if data is None: if os.path.exists(cache_file): st.warning("⚠️ 실시간 서버(Overpass API) 접속 지연으로 인해 기존 백업 데이터를 로드했습니다.") with open(cache_file, "r", encoding="utf-8") as f: data = json.load(f) else: st.error("데이터 서버 접속이 원활하지 않습니다. 잠시 후 다시 시도해주세요.") data = {'elements': []} bus_stops, amenities, safety_infra, real_bus_routes = [], [], [], [] for element in data.get('elements', []): if element['type'] == 'node': lat = float(element['lat']) lon = float(element['lon']) tags = element.get('tags', {}) if 'highway' in tags and tags['highway'] == 'bus_stop': bus_stops.append({'lat': lat, 'lon': lon, 'stop_id': str(element['id']), 'name': str(tags.get('name', '정류장'))}) elif 'amenity' in tags: amenities.append({'lat': lat, 'lon': lon}) elif 'highway' in tags or 'man_made' in tags: safety_infra.append({'lat': lat, 'lon': lon}) elif element['type'] == 'relation': tags = element.get('tags', {}) ref = str(tags.get('ref', 'N버스')) name = str(tags.get('name', ref)) coords = [] for member in element.get('members', []): if member['type'] == 'way' and 'geometry' in member: for pt in member['geometry']: coords.append([float(pt['lat']), float(pt['lon'])]) if coords: real_bus_routes.append({'name': name, 'ref': ref, 'coords': coords}) return pd.DataFrame(bus_stops), pd.DataFrame(amenities), pd.DataFrame(safety_infra), real_bus_routes stops_df, amenities_df, safety_df, real_bus_routes = fetch_osm_data_v4() @st.cache_data def create_grid_features(_stops, _amenities, _safety): lats = np.linspace(37.492, 37.504, 20) lons = np.linspace(127.021, 127.034, 20) grid_data = [] stop_coords = _stops[['lat', 'lon']].values if not _stops.empty else np.array([]) amenity_coords = _amenities[['lat', 'lon']].values if not _amenities.empty else np.array([]) safety_coords = _safety[['lat', 'lon']].values if not _safety.empty else np.array([]) for lat in lats: for lon in lons: point = np.array([lat, lon]) demand = int(np.sum(np.sqrt(np.sum((amenity_coords - point)**2, axis=1)) < 0.002)) if len(amenity_coords) > 0 else 0 deficit = float(np.min(np.sqrt(np.sum((stop_coords - point)**2, axis=1)))) if len(stop_coords) > 0 else 0.0 safety_count = int(np.sum(np.sqrt(np.sum((safety_coords - point)**2, axis=1)) < 0.002)) if len(safety_coords) > 0 else 0 grid_data.append({'lat': float(lat), 'lon': float(lon), 'raw_demand': demand, 'raw_deficit': deficit, 'raw_safety_count': safety_count}) df = pd.DataFrame(grid_data) df['base_demand'] = df['raw_demand'] / df['raw_demand'].max() if df['raw_demand'].max() > 0 else 0.0 df['base_deficit'] = df['raw_deficit'] / df['raw_deficit'].max() if df['raw_deficit'].max() > 0 else 0.0 df['base_risk'] = 1.0 - (df['raw_safety_count'] / df['raw_safety_count'].max()) if df['raw_safety_count'].max() > 0 else 1.0 return df grids_df = create_grid_features(stops_df, amenities_df, safety_df) if not stops_df.empty and len(real_bus_routes) > 0: nbus_coords = np.array([pt for route in real_bus_routes for pt in route['coords']]) stops_df['min_dist_to_nbus'] = stops_df.apply(lambda r: float(np.min(np.sqrt((nbus_coords[:,0]-r['lat'])**2 + (nbus_coords[:,1]-r['lon'])**2))), axis=1) stops_df['is_nbus_stop'] = stops_df['min_dist_to_nbus'] < 0.001 else: stops_df['is_nbus_stop'] = False nbus_stops = stops_df[stops_df['is_nbus_stop']] blind_stops = stops_df[~stops_df['is_nbus_stop']] st.sidebar.header("⚙️ 시뮬레이션 설정") selected_time = st.sidebar.select_slider("시간대 선택", options=[f"{str(h).zfill(2)}:{str(m).zfill(2)}" for h in [22, 23, 0, 1, 2, 3, 4] for m in [0, 30]] + ["05:00"], value="23:30") time_idx = [f"{str(h).zfill(2)}:{str(m).zfill(2)}" for h in [22, 23, 0, 1, 2, 3, 4] for m in [0, 30]].index(selected_time) if selected_time != "05:00" else 14 st.sidebar.caption("수동으로 가중치를 조절하거나 시간대를 변경하여 노선을 시뮬레이션하세요.") alpha = st.sidebar.slider("호출 수요(상업/유흥) 가중치 (α)", 0.0, 1.0, 0.8) beta = st.sidebar.slider("정류장 결핍도 가중치 (β)", 0.0, 1.0, 0.2) gamma = st.sidebar.slider("안전 취약도(어두운 골목) 가중치 (γ)", 0.0, 1.0, 0.4) drt_budget = st.sidebar.number_input("투입 가능 DRT 차량 수", min_value=1, max_value=20, value=5) current_grids = grids_df.copy() # 시간대별 기반 수요/위험도 조절 (시간이 늦어질수록 유흥수요 감소, 주거지/안전 위험도 부각) if time_idx <= 3: time_factor = 1.2 elif time_idx <= 7: time_factor = 0.8 else: time_factor = 0.3 current_grids['demand'] = current_grids['base_demand'] * time_factor current_grids['deficit'] = current_grids['base_deficit'] current_grids['risk'] = current_grids['base_risk'] * (2.0 - time_factor) current_grids['risk_score'] = alpha * current_grids['demand'] + beta * current_grids['deficit'] + gamma * current_grids['risk'] threshold = float(current_grids['risk_score'].quantile(0.85)) drt_targets = current_grids.nlargest(50, 'risk_score') drt_assignments = [] for idx, grid_row in drt_targets.iterrows(): if not blind_stops.empty: distances = np.sqrt((blind_stops['lat'] - grid_row['lat'])**2 + (blind_stops['lon'] - grid_row['lon'])**2) drt_assignments.append(blind_stops.loc[distances.idxmin()]) unique_blind_stops = pd.DataFrame(drt_assignments).drop_duplicates('stop_id') loop_coords = [] transfer_coords = [] closest_hubs = pd.DataFrame() def ccw(p1, p2, p3): return (p2[0] - p1[0]) * (p3[1] - p1[1]) - (p2[1] - p1[1]) * (p3[0] - p1[0]) def get_convex_hull(points): if len(points) <= 3: return points points = sorted(points, key=lambda p: (float(p[0]), float(p[1]))) lower = [] for p in points: while len(lower) >= 2 and ccw(lower[-2], lower[-1], p) <= 0: lower.pop() lower.append(p) upper = [] for p in reversed(points): while len(upper) >= 2 and ccw(upper[-2], upper[-1], p) <= 0: upper.pop() upper.append(p) return lower[:-1] + upper[:-1] if not unique_blind_stops.empty and not nbus_stops.empty: centroid_lat = float(unique_blind_stops['lat'].mean()) centroid_lon = float(unique_blind_stops['lon'].mean()) dist_to_hub = np.sqrt((nbus_stops['lat'] - centroid_lat)**2 + (nbus_stops['lon'] - centroid_lon)**2) closest_hubs = nbus_stops.loc[dist_to_hub.nsmallest(3).index] transfer_coords = sorted(closest_hubs[['lat', 'lon']].values.tolist(), key=lambda x: x[1]) loop_stops = pd.concat([unique_blind_stops, closest_hubs]).drop_duplicates('stop_id') coords = loop_stops[['lat', 'lon']].values.tolist() hull_coords = get_convex_hull(coords) hull_lats = [float(pt[0]) for pt in hull_coords] hull_lons = [float(pt[1]) for pt in hull_coords] unique_blind_stops = unique_blind_stops[unique_blind_stops.apply(lambda r: any(abs(r['lat'] - hl) < 1e-6 and abs(r['lon'] - hlon) < 1e-6 for hl, hlon in zip(hull_lats, hull_lons)), axis=1)] hull_coords.append(hull_coords[0]) loop_coords = [[float(pt[0]), float(pt[1])] for pt in hull_coords] st.title("🚌 심야 대중교통 N버스-DRT 통합 대시보드") col1, col2 = st.columns([2, 1]) with col1: m = folium.Map(location=[37.498, 127.027], zoom_start=15, tiles="CartoDB dark_matter") for idx, row in current_grids.iterrows(): rs = float(row['risk_score']) if rs > 0.3: folium.CircleMarker( location=[float(row['lat']), float(row['lon'])], radius=4, color="red" if rs > threshold else "orange", weight=0, fill=True, fill_color="red" if rs > threshold else "orange", fill_opacity=float(rs), popup=str(f"Risk: {rs:.2f}") ).add_to(m) for idx, row in blind_stops.iterrows(): folium.CircleMarker( location=[float(row['lat']), float(row['lon'])], radius=2, color="gray", weight=0, fill=True, fill_color="gray", tooltip="일반 정류장" ).add_to(m) for idx, row in nbus_stops.iterrows(): folium.CircleMarker( location=[float(row['lat']), float(row['lon'])], radius=3, color="cyan", weight=1, fill=True, fill_color="cyan", tooltip=str(f"환승: {row['name']}") ).add_to(m) colors = ['cyan', 'lime', 'yellow'] for i, route in enumerate(real_bus_routes): if route['coords']: folium.PolyLine( locations=[[float(pt[0]), float(pt[1])] for pt in route['coords']], dash_array="15, 20", color=str(colors[i % len(colors)]), weight=4, opacity=0.6, tooltip=str(f"N버스: {route['name']}") ).add_to(m) if loop_coords: folium.PolyLine( locations=loop_coords, dash_array="10, 15", color='purple', weight=5, opacity=0.9, tooltip="DRT 루프" ).add_to(m) if len(transfer_coords) >= 2: folium.PolyLine( locations=[[float(pt[0]), float(pt[1])] for pt in transfer_coords], dash_array="1, 10", color='orange', weight=8, opacity=1.0, tooltip="환승 구역" ).add_to(m) for idx, row in unique_blind_stops.iterrows(): folium.Marker( location=[float(row['lat']), float(row['lon'])], popup=str(f"승하차: {row['name']}"), icon=folium.Icon(color="purple", icon="bus", prefix="fa") ).add_to(m) if not closest_hubs.empty: for idx, row in closest_hubs.iterrows(): folium.CircleMarker( location=[float(row['lat']), float(row['lon'])], radius=7, color="gold", weight=2, fill=True, fill_color="orange", fill_opacity=0.8, tooltip=str(f"환승: {row['name']}") ).add_to(m) # st_folium 호출 시 직렬화 에러를 방지하기 위해 returned_objects=[] 옵션 및 명시적 key 추가 st_folium(m, width=800, height=500, returned_objects=[], key="main_map") with col2: st.subheader("💡 다이내믹 루프 & 환승 시나리오") st.info(f"현재 선택된 시간: **{selected_time}**\n\n시간대에 따른 가중치 변화로 타겟 지역이 상업지구와 주거지구 사이를 자연스럽게 넘나듭니다.") if not closest_hubs.empty: hub_names = ", ".join(str(name) for name in closest_hubs['name'].tolist()) st.warning(f"**🔥 환승 공유 구역 (Transfer Zone)**\n\nN버스 노선의 **'{hub_names}'** 정류장들을 DRT 루프가 그대로 따라 주행하며 겹칩니다. (지도 상 **오렌지색 선**)\n\n승객은 이 구역 내 아무 곳에서나 내려 N버스로 편하게 갈아탈 수 있습니다.") st.divider() st.subheader("📈 배차간격(대기 시간) 개선 효과") col_b, col_a, col_diff = st.columns(3) base_n_interval = 40 drt_interval = max(5, 30 - drt_budget * 4) col_b.metric("사각지대 기존 배차간격", "운행 없음 (∞)", delta=None) col_a.metric("DRT 도입 후 사각지대", f"약 {drt_interval} 분", delta="-∞ 분 (신규 서비스 창출)", delta_color="normal") col_diff.metric("기존 N버스 배차간격 (간선)", f"약 {base_n_interval} 분", delta="유지", delta_color="off") st.markdown("> **결과**: 지도 상 빛나는 오렌지색 선구간(환승 구역)에서 대기시간 없이 부드럽게 N버스로 갈아탈 수 있으며, 시간대에 맞춰 최적의 경로로 굽이치는 똑똑한 루프를 형성합니다.")