MoonlightBus / export_map.py
upsonyeon's picture
Upload export_map.py
a53019e verified
Raw
History Blame Contribute Delete
8.66 kB
import pandas as pd
import numpy as np
import folium
from folium.plugins import AntPath
import os
import requests
def export_static_map():
print("Fetching real OSM data...")
bbox = "37.491,127.020,37.505,127.035"
endpoints = [
"https://overpass-api.de/api/interpreter",
"https://overpass.kumi.systems/api/interpreter",
"https://overpass.osm.ch/api/interpreter"
]
query = f"""
[out:json][timeout:60];
(
relation["route"="bus"]["ref"~"N13|N15|N37|N75"]({bbox});
node["highway"="bus_stop"]({bbox});
node["amenity"~"pub|bar|nightclub|restaurant"]({bbox});
node["highway"="street_lamp"]({bbox});
node["man_made"="surveillance"]({bbox});
);
out body geom;
"""
headers = {'User-Agent': 'SmartTransitMVP/2.0'}
data = None
for url in endpoints:
try:
response = requests.post(url, data=query, headers=headers, timeout=65)
response.raise_for_status()
data = response.json()
break
except Exception as e:
print(f"Failed to fetch from {url}: {e}")
continue
if data is None:
print("Error: Could not fetch data from any Overpass API endpoint.")
data = {'elements': []}
bus_stops, amenities, safety_infra, real_bus_routes = [], [], [], []
for element in data.get('elements', []):
if element['type'] == 'node':
lat, lon = element['lat'], element['lon']
tags = element.get('tags', {})
if 'highway' in tags and tags['highway'] == 'bus_stop':
bus_stops.append({'lat': lat, 'lon': lon, 'stop_id': element['id'], 'name': tags.get('name', '정류장')})
elif 'amenity' in tags:
amenities.append({'lat': lat, 'lon': lon})
elif 'highway' in tags or 'man_made' in tags:
safety_infra.append({'lat': lat, 'lon': lon})
elif element['type'] == 'relation':
tags = element.get('tags', {})
name = tags.get('name', tags.get('ref', 'N버스'))
coords = []
for member in element.get('members', []):
if member['type'] == 'way' and 'geometry' in member:
for pt in member['geometry']: coords.append([pt['lat'], pt['lon']])
if coords: real_bus_routes.append({'name': name, 'coords': coords})
stops_df, amenities_df, safety_df = pd.DataFrame(bus_stops), pd.DataFrame(amenities), pd.DataFrame(safety_infra)
lats = np.linspace(37.492, 37.504, 20)
lons = np.linspace(127.021, 127.034, 20)
grid_data = []
stop_coords = stops_df[['lat', 'lon']].values if not stops_df.empty else np.array([])
amenity_coords = amenities_df[['lat', 'lon']].values if not amenities_df.empty else np.array([])
safety_coords = safety_df[['lat', 'lon']].values if not safety_df.empty else np.array([])
for lat in lats:
for lon in lons:
point = np.array([lat, lon])
demand = np.sum(np.sqrt(np.sum((amenity_coords - point)**2, axis=1)) < 0.002) if len(amenity_coords) > 0 else 0
deficit = np.min(np.sqrt(np.sum((stop_coords - point)**2, axis=1))) if len(stop_coords) > 0 else 0
safety_count = np.sum(np.sqrt(np.sum((safety_coords - point)**2, axis=1)) < 0.002) if len(safety_coords) > 0 else 0
grid_data.append({'lat': lat, 'lon': lon, 'raw_demand': demand, 'raw_deficit': deficit, 'raw_safety_count': safety_count})
df = pd.DataFrame(grid_data)
df['base_demand'] = df['raw_demand'] / df['raw_demand'].max() if df['raw_demand'].max() > 0 else 0
df['base_deficit'] = df['raw_deficit'] / df['raw_deficit'].max() if df['raw_deficit'].max() > 0 else 0
df['base_risk'] = 1 - (df['raw_safety_count'] / df['raw_safety_count'].max()) if df['raw_safety_count'].max() > 0 else 1.0
# 23:30 Time settings for the static map
alpha, beta, gamma = 0.8, 0.2, 0.0
df['risk_score'] = alpha * df['base_demand'] + beta * df['base_deficit'] + gamma * df['base_risk']
threshold = df['risk_score'].quantile(0.85)
# DRT Loop Logic
nbus_coords = np.array([pt for route in real_bus_routes for pt in route['coords']])
if not stops_df.empty and len(nbus_coords) > 0:
stops_df['is_nbus_stop'] = stops_df.apply(lambda r: np.min(np.sqrt((nbus_coords[:,0]-r['lat'])**2 + (nbus_coords[:,1]-r['lon'])**2)) < 0.001, axis=1)
else: stops_df['is_nbus_stop'] = False
nbus_stops, blind_stops = stops_df[stops_df['is_nbus_stop']], stops_df[~stops_df['is_nbus_stop']]
drt_targets = df.nlargest(50, 'risk_score')
drt_assignments = []
for idx, grid_row in drt_targets.iterrows():
if not blind_stops.empty:
distances = np.sqrt((blind_stops['lat'] - grid_row['lat'])**2 + (blind_stops['lon'] - grid_row['lon'])**2)
drt_assignments.append(blind_stops.loc[distances.idxmin()])
unique_blind_stops = pd.DataFrame(drt_assignments).drop_duplicates('stop_id')
loop_coords, transfer_coords = [], []
closest_hubs = pd.DataFrame()
def ccw(p1, p2, p3):
return (p2[0] - p1[0]) * (p3[1] - p1[1]) - (p2[1] - p1[1]) * (p3[0] - p1[0])
def get_convex_hull(points):
if len(points) <= 3: return points
points = sorted(points, key=lambda p: (p[0], p[1]))
lower = []
for p in points:
while len(lower) >= 2 and ccw(lower[-2], lower[-1], p) <= 0: lower.pop()
lower.append(p)
upper = []
for p in reversed(points):
while len(upper) >= 2 and ccw(upper[-2], upper[-1], p) <= 0: upper.pop()
upper.append(p)
return lower[:-1] + upper[:-1]
if not unique_blind_stops.empty and not nbus_stops.empty:
c_lat, c_lon = unique_blind_stops['lat'].mean(), unique_blind_stops['lon'].mean()
dist_to_hub = np.sqrt((nbus_stops['lat'] - c_lat)**2 + (nbus_stops['lon'] - c_lon)**2)
closest_hubs = nbus_stops.loc[dist_to_hub.nsmallest(3).index]
transfer_coords = sorted(closest_hubs[['lat', 'lon']].values.tolist(), key=lambda x: x[1])
loop_stops = pd.concat([unique_blind_stops, closest_hubs]).drop_duplicates('stop_id')
coords = loop_stops[['lat', 'lon']].values.tolist()
hull_coords = get_convex_hull(coords)
hull_lats = [pt[0] for pt in hull_coords]
hull_lons = [pt[1] for pt in hull_coords]
unique_blind_stops = unique_blind_stops[unique_blind_stops.apply(lambda r: any(abs(r['lat'] - hl) < 1e-6 and abs(r['lon'] - hlon) < 1e-6 for hl, hlon in zip(hull_lats, hull_lons)), axis=1)]
hull_coords.append(hull_coords[0])
loop_coords = hull_coords
print("Generating HTML map...")
m = folium.Map(location=[37.498, 127.027], zoom_start=15, tiles="CartoDB dark_matter")
for idx, row in df.iterrows():
if row['risk_score'] > 0.3:
folium.CircleMarker(location=[row['lat'], row['lon']], radius=4, color=None, fill=True, fill_color="red" if row['risk_score'] > threshold else "orange", fill_opacity=row['risk_score']).add_to(m)
for idx, row in blind_stops.iterrows():
folium.CircleMarker(location=[row['lat'], row['lon']], radius=2, color="gray", fill=True, fill_color="gray").add_to(m)
colors = ['cyan', 'lime', 'yellow']
for i, route in enumerate(real_bus_routes):
if route['coords']:
AntPath(locations=route['coords'], dash_array=[15, 20], delay=800, color='white', pulse_color=colors[i % len(colors)], weight=4, opacity=0.6).add_to(m)
if loop_coords:
AntPath(locations=loop_coords, dash_array=[10, 15], delay=500, color='purple', pulse_color='magenta', weight=5, opacity=0.9).add_to(m)
if len(transfer_coords) >= 2:
AntPath(locations=transfer_coords, dash_array=[1, 10], delay=300, color='orange', pulse_color='gold', weight=8, opacity=1.0).add_to(m)
for idx, row in unique_blind_stops.iterrows():
folium.Marker(location=[row['lat'], row['lon']], icon=folium.Icon(color="purple", icon="bus", prefix="fa")).add_to(m)
if not closest_hubs.empty:
for idx, row in closest_hubs.iterrows():
folium.CircleMarker(location=[row['lat'], row['lon']], radius=7, color="gold", fill=True, fill_color="orange", fill_opacity=0.8).add_to(m)
output_path = os.path.join(os.path.dirname(__file__), 'demo_map.html')
m.save(output_path)
print(f"Map successfully saved to {output_path}")
if __name__ == "__main__":
export_static_map()