Spaces:
Running
Running
| import pandas as pd | |
| import numpy as np | |
| import folium | |
| from folium.plugins import AntPath | |
| import os | |
| import requests | |
| def export_static_map(): | |
| print("Fetching real OSM data...") | |
| bbox = "37.491,127.020,37.505,127.035" | |
| endpoints = [ | |
| "https://overpass-api.de/api/interpreter", | |
| "https://overpass.kumi.systems/api/interpreter", | |
| "https://overpass.osm.ch/api/interpreter" | |
| ] | |
| query = f""" | |
| [out:json][timeout:60]; | |
| ( | |
| relation["route"="bus"]["ref"~"N13|N15|N37|N75"]({bbox}); | |
| node["highway"="bus_stop"]({bbox}); | |
| node["amenity"~"pub|bar|nightclub|restaurant"]({bbox}); | |
| node["highway"="street_lamp"]({bbox}); | |
| node["man_made"="surveillance"]({bbox}); | |
| ); | |
| out body geom; | |
| """ | |
| headers = {'User-Agent': 'SmartTransitMVP/2.0'} | |
| data = None | |
| for url in endpoints: | |
| try: | |
| response = requests.post(url, data=query, headers=headers, timeout=65) | |
| response.raise_for_status() | |
| data = response.json() | |
| break | |
| except Exception as e: | |
| print(f"Failed to fetch from {url}: {e}") | |
| continue | |
| if data is None: | |
| print("Error: Could not fetch data from any Overpass API endpoint.") | |
| data = {'elements': []} | |
| bus_stops, amenities, safety_infra, real_bus_routes = [], [], [], [] | |
| for element in data.get('elements', []): | |
| if element['type'] == 'node': | |
| lat, lon = element['lat'], element['lon'] | |
| tags = element.get('tags', {}) | |
| if 'highway' in tags and tags['highway'] == 'bus_stop': | |
| bus_stops.append({'lat': lat, 'lon': lon, 'stop_id': element['id'], 'name': tags.get('name', '정류장')}) | |
| elif 'amenity' in tags: | |
| amenities.append({'lat': lat, 'lon': lon}) | |
| elif 'highway' in tags or 'man_made' in tags: | |
| safety_infra.append({'lat': lat, 'lon': lon}) | |
| elif element['type'] == 'relation': | |
| tags = element.get('tags', {}) | |
| name = tags.get('name', tags.get('ref', 'N버스')) | |
| coords = [] | |
| for member in element.get('members', []): | |
| if member['type'] == 'way' and 'geometry' in member: | |
| for pt in member['geometry']: coords.append([pt['lat'], pt['lon']]) | |
| if coords: real_bus_routes.append({'name': name, 'coords': coords}) | |
| stops_df, amenities_df, safety_df = pd.DataFrame(bus_stops), pd.DataFrame(amenities), pd.DataFrame(safety_infra) | |
| lats = np.linspace(37.492, 37.504, 20) | |
| lons = np.linspace(127.021, 127.034, 20) | |
| grid_data = [] | |
| stop_coords = stops_df[['lat', 'lon']].values if not stops_df.empty else np.array([]) | |
| amenity_coords = amenities_df[['lat', 'lon']].values if not amenities_df.empty else np.array([]) | |
| safety_coords = safety_df[['lat', 'lon']].values if not safety_df.empty else np.array([]) | |
| for lat in lats: | |
| for lon in lons: | |
| point = np.array([lat, lon]) | |
| demand = np.sum(np.sqrt(np.sum((amenity_coords - point)**2, axis=1)) < 0.002) if len(amenity_coords) > 0 else 0 | |
| deficit = np.min(np.sqrt(np.sum((stop_coords - point)**2, axis=1))) if len(stop_coords) > 0 else 0 | |
| safety_count = np.sum(np.sqrt(np.sum((safety_coords - point)**2, axis=1)) < 0.002) if len(safety_coords) > 0 else 0 | |
| grid_data.append({'lat': lat, 'lon': lon, 'raw_demand': demand, 'raw_deficit': deficit, 'raw_safety_count': safety_count}) | |
| df = pd.DataFrame(grid_data) | |
| df['base_demand'] = df['raw_demand'] / df['raw_demand'].max() if df['raw_demand'].max() > 0 else 0 | |
| df['base_deficit'] = df['raw_deficit'] / df['raw_deficit'].max() if df['raw_deficit'].max() > 0 else 0 | |
| df['base_risk'] = 1 - (df['raw_safety_count'] / df['raw_safety_count'].max()) if df['raw_safety_count'].max() > 0 else 1.0 | |
| # 23:30 Time settings for the static map | |
| alpha, beta, gamma = 0.8, 0.2, 0.0 | |
| df['risk_score'] = alpha * df['base_demand'] + beta * df['base_deficit'] + gamma * df['base_risk'] | |
| threshold = df['risk_score'].quantile(0.85) | |
| # DRT Loop Logic | |
| nbus_coords = np.array([pt for route in real_bus_routes for pt in route['coords']]) | |
| if not stops_df.empty and len(nbus_coords) > 0: | |
| stops_df['is_nbus_stop'] = stops_df.apply(lambda r: np.min(np.sqrt((nbus_coords[:,0]-r['lat'])**2 + (nbus_coords[:,1]-r['lon'])**2)) < 0.001, axis=1) | |
| else: stops_df['is_nbus_stop'] = False | |
| nbus_stops, blind_stops = stops_df[stops_df['is_nbus_stop']], stops_df[~stops_df['is_nbus_stop']] | |
| drt_targets = df.nlargest(50, 'risk_score') | |
| drt_assignments = [] | |
| for idx, grid_row in drt_targets.iterrows(): | |
| if not blind_stops.empty: | |
| distances = np.sqrt((blind_stops['lat'] - grid_row['lat'])**2 + (blind_stops['lon'] - grid_row['lon'])**2) | |
| drt_assignments.append(blind_stops.loc[distances.idxmin()]) | |
| unique_blind_stops = pd.DataFrame(drt_assignments).drop_duplicates('stop_id') | |
| loop_coords, transfer_coords = [], [] | |
| closest_hubs = pd.DataFrame() | |
| def ccw(p1, p2, p3): | |
| return (p2[0] - p1[0]) * (p3[1] - p1[1]) - (p2[1] - p1[1]) * (p3[0] - p1[0]) | |
| def get_convex_hull(points): | |
| if len(points) <= 3: return points | |
| points = sorted(points, key=lambda p: (p[0], p[1])) | |
| lower = [] | |
| for p in points: | |
| while len(lower) >= 2 and ccw(lower[-2], lower[-1], p) <= 0: lower.pop() | |
| lower.append(p) | |
| upper = [] | |
| for p in reversed(points): | |
| while len(upper) >= 2 and ccw(upper[-2], upper[-1], p) <= 0: upper.pop() | |
| upper.append(p) | |
| return lower[:-1] + upper[:-1] | |
| if not unique_blind_stops.empty and not nbus_stops.empty: | |
| c_lat, c_lon = unique_blind_stops['lat'].mean(), unique_blind_stops['lon'].mean() | |
| dist_to_hub = np.sqrt((nbus_stops['lat'] - c_lat)**2 + (nbus_stops['lon'] - c_lon)**2) | |
| closest_hubs = nbus_stops.loc[dist_to_hub.nsmallest(3).index] | |
| transfer_coords = sorted(closest_hubs[['lat', 'lon']].values.tolist(), key=lambda x: x[1]) | |
| loop_stops = pd.concat([unique_blind_stops, closest_hubs]).drop_duplicates('stop_id') | |
| coords = loop_stops[['lat', 'lon']].values.tolist() | |
| hull_coords = get_convex_hull(coords) | |
| hull_lats = [pt[0] for pt in hull_coords] | |
| hull_lons = [pt[1] for pt in hull_coords] | |
| unique_blind_stops = unique_blind_stops[unique_blind_stops.apply(lambda r: any(abs(r['lat'] - hl) < 1e-6 and abs(r['lon'] - hlon) < 1e-6 for hl, hlon in zip(hull_lats, hull_lons)), axis=1)] | |
| hull_coords.append(hull_coords[0]) | |
| loop_coords = hull_coords | |
| print("Generating HTML map...") | |
| m = folium.Map(location=[37.498, 127.027], zoom_start=15, tiles="CartoDB dark_matter") | |
| for idx, row in df.iterrows(): | |
| if row['risk_score'] > 0.3: | |
| folium.CircleMarker(location=[row['lat'], row['lon']], radius=4, color=None, fill=True, fill_color="red" if row['risk_score'] > threshold else "orange", fill_opacity=row['risk_score']).add_to(m) | |
| for idx, row in blind_stops.iterrows(): | |
| folium.CircleMarker(location=[row['lat'], row['lon']], radius=2, color="gray", fill=True, fill_color="gray").add_to(m) | |
| colors = ['cyan', 'lime', 'yellow'] | |
| for i, route in enumerate(real_bus_routes): | |
| if route['coords']: | |
| AntPath(locations=route['coords'], dash_array=[15, 20], delay=800, color='white', pulse_color=colors[i % len(colors)], weight=4, opacity=0.6).add_to(m) | |
| if loop_coords: | |
| AntPath(locations=loop_coords, dash_array=[10, 15], delay=500, color='purple', pulse_color='magenta', weight=5, opacity=0.9).add_to(m) | |
| if len(transfer_coords) >= 2: | |
| AntPath(locations=transfer_coords, dash_array=[1, 10], delay=300, color='orange', pulse_color='gold', weight=8, opacity=1.0).add_to(m) | |
| for idx, row in unique_blind_stops.iterrows(): | |
| folium.Marker(location=[row['lat'], row['lon']], icon=folium.Icon(color="purple", icon="bus", prefix="fa")).add_to(m) | |
| if not closest_hubs.empty: | |
| for idx, row in closest_hubs.iterrows(): | |
| folium.CircleMarker(location=[row['lat'], row['lon']], radius=7, color="gold", fill=True, fill_color="orange", fill_opacity=0.8).add_to(m) | |
| output_path = os.path.join(os.path.dirname(__file__), 'demo_map.html') | |
| m.save(output_path) | |
| print(f"Map successfully saved to {output_path}") | |
| if __name__ == "__main__": | |
| export_static_map() | |