import pandas as pd
import numpy as np
import folium
from folium.plugins import AntPath
import os
import requests
# Overpass API mirrors, tried in order until one of them answers.
_OVERPASS_ENDPOINTS = (
    "https://overpass-api.de/api/interpreter",
    "https://overpass.kumi.systems/api/interpreter",
    "https://overpass.osm.ch/api/interpreter",
)


def _fetch_osm_data(bbox):
    """Return the Overpass JSON reply for *bbox*, trying each mirror in turn.

    Args:
        bbox: Bounding-box string interpolated into every filter of the query.

    Returns:
        The decoded JSON of the first endpoint that answers successfully, or
        ``{'elements': []}`` when every endpoint fails, so the caller can
        still produce a (data-less) map.
    """
    query = f"""
[out:json][timeout:60];
(
relation["route"="bus"]["ref"~"N13|N15|N37|N75"]({bbox});
node["highway"="bus_stop"]({bbox});
node["amenity"~"pub|bar|nightclub|restaurant"]({bbox});
node["highway"="street_lamp"]({bbox});
node["man_made"="surveillance"]({bbox});
);
out body geom;
"""
    headers = {'User-Agent': 'SmartTransitMVP/2.0'}
    for url in _OVERPASS_ENDPOINTS:
        try:
            response = requests.post(url, data=query, headers=headers, timeout=65)
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            # Transport/HTTP errors and undecodable bodies mean "try the next
            # mirror"; anything else is a programming error and should surface.
            print(f"Failed to fetch from {url}: {e}")
        else:
            return data
    print("Error: Could not fetch data from any Overpass API endpoint.")
    return {'elements': []}


def _parse_osm_elements(data):
    """Split an Overpass reply into stops, amenities, safety nodes and routes.

    Returns:
        A 4-tuple ``(bus_stops, amenities, safety_infra, routes)`` of lists of
        dicts. Nodes are classified by their tags (bus stop first, then any
        ``amenity``, then any other ``highway``/``man_made`` node); relations
        become ``{'name', 'coords'}`` entries built from their way geometry.
    """
    bus_stops, amenities, safety_infra, routes = [], [], [], []
    for element in data.get('elements', []):
        tags = element.get('tags', {})
        if element['type'] == 'node':
            lat, lon = element['lat'], element['lon']
            if tags.get('highway') == 'bus_stop':
                bus_stops.append({
                    'lat': lat,
                    'lon': lon,
                    'stop_id': element['id'],
                    'name': tags.get('name', '정류장'),
                })
            elif 'amenity' in tags:
                amenities.append({'lat': lat, 'lon': lon})
            elif 'highway' in tags or 'man_made' in tags:
                safety_infra.append({'lat': lat, 'lon': lon})
        elif element['type'] == 'relation':
            name = tags.get('name', tags.get('ref', 'N버스'))
            coords = []
            for member in element.get('members', []):
                if member['type'] == 'way' and 'geometry' in member:
                    coords.extend([pt['lat'], pt['lon']] for pt in member['geometry'])
            if coords:
                routes.append({'name': name, 'coords': coords})
    return bus_stops, amenities, safety_infra, routes


def _latlon_array(frame):
    """Return the frame's ``lat``/``lon`` columns as an array (empty if no rows)."""
    return frame[['lat', 'lon']].values if not frame.empty else np.array([])


def _distances_to(coords, point):
    """Return the Euclidean distance (in raw degrees) from each row of *coords* to *point*."""
    return np.sqrt(np.sum((coords - point) ** 2, axis=1))


def _build_risk_grid(stops_df, amenities_df, safety_df):
    """Score a 20x20 lat/lon grid and return it as a DataFrame.

    Each cell gets the number of amenities and safety nodes within 0.002
    degrees and the distance to the nearest bus stop; those raw values are
    max-normalised and combined into ``risk_score``.
    """
    stop_coords = _latlon_array(stops_df)
    amenity_coords = _latlon_array(amenities_df)
    safety_coords = _latlon_array(safety_df)

    lats = np.linspace(37.492, 37.504, 20)
    lons = np.linspace(127.021, 127.034, 20)
    grid_rows = []
    for lat in lats:
        for lon in lons:
            point = np.array([lat, lon])
            demand = np.sum(_distances_to(amenity_coords, point) < 0.002) if len(amenity_coords) > 0 else 0
            deficit = np.min(_distances_to(stop_coords, point)) if len(stop_coords) > 0 else 0
            safety_count = np.sum(_distances_to(safety_coords, point) < 0.002) if len(safety_coords) > 0 else 0
            grid_rows.append({
                'lat': lat,
                'lon': lon,
                'raw_demand': demand,
                'raw_deficit': deficit,
                'raw_safety_count': safety_count,
            })
    df = pd.DataFrame(grid_rows)

    # Normalise by the column maximum; a column with no signal keeps a constant.
    max_demand = df['raw_demand'].max()
    max_deficit = df['raw_deficit'].max()
    max_safety = df['raw_safety_count'].max()
    df['base_demand'] = df['raw_demand'] / max_demand if max_demand > 0 else 0
    df['base_deficit'] = df['raw_deficit'] / max_deficit if max_deficit > 0 else 0
    df['base_risk'] = 1 - (df['raw_safety_count'] / max_safety) if max_safety > 0 else 1.0

    # 23:30 Time settings for the static map
    alpha, beta, gamma = 0.8, 0.2, 0.0
    df['risk_score'] = alpha * df['base_demand'] + beta * df['base_deficit'] + gamma * df['base_risk']
    return df


def _near_route_flags(stops_df, routes):
    """Return, per stop, whether it lies within 0.001 degrees of any route point.

    Returns the scalar ``False`` when there are no stops or no route geometry,
    which broadcasts to every row on assignment.
    """
    nbus_coords = np.array([pt for route in routes for pt in route['coords']])
    if stops_df.empty or len(nbus_coords) == 0:
        return False

    def is_near_route(stop):
        gaps = np.sqrt((nbus_coords[:, 0] - stop['lat']) ** 2 + (nbus_coords[:, 1] - stop['lon']) ** 2)
        return np.min(gaps) < 0.001

    return stops_df.apply(is_near_route, axis=1)


def _nearest_blind_stops(grid_df, blind_stops):
    """Return the distinct stops nearest to the 50 highest-risk grid cells.

    Returns an empty DataFrame when there are no candidate stops.
    """
    if blind_stops.empty:
        # No candidate stops: nothing to assign (checked once, not per cell).
        return pd.DataFrame()
    picks = []
    for _, cell in grid_df.nlargest(50, 'risk_score').iterrows():
        distances = np.sqrt((blind_stops['lat'] - cell['lat']) ** 2 + (blind_stops['lon'] - cell['lon']) ** 2)
        picks.append(blind_stops.loc[distances.idxmin()])
    return pd.DataFrame(picks).drop_duplicates('stop_id')


def _ccw(p1, p2, p3):
    """Return the z-component of the cross product (p2 - p1) x (p3 - p1)."""
    return (p2[0] - p1[0]) * (p3[1] - p1[1]) - (p2[1] - p1[1]) * (p3[0] - p1[0])


def _convex_hull(points):
    """Return the convex hull of 2-D *points* (Andrew's monotone chain).

    Inputs of three or fewer points are returned as a copy, unchanged.
    Collinear points are dropped from the hull (turn test uses ``<= 0``).
    """
    if len(points) <= 3:
        # Copy so that the caller can close the loop without mutating its input.
        return list(points)
    ordered = sorted(points, key=lambda p: (p[0], p[1]))
    lower = []
    for p in ordered:
        while len(lower) >= 2 and _ccw(lower[-2], lower[-1], p) <= 0:
            lower.pop()
        lower.append(p)
    upper = []
    for p in reversed(ordered):
        while len(upper) >= 2 and _ccw(upper[-2], upper[-1], p) <= 0:
            upper.pop()
        upper.append(p)
    # The last point of each chain is the first point of the other one.
    return lower[:-1] + upper[:-1]


def _plan_drt_loop(unique_blind_stops, nbus_stops):
    """Build the closed loop through the selected stops and the transfer hubs.

    Returns:
        ``(loop_coords, transfer_coords, closest_hubs, unique_blind_stops)``.
        When either input is empty the loop and transfer lists are empty,
        ``closest_hubs`` is an empty DataFrame and ``unique_blind_stops`` is
        returned unfiltered. Otherwise ``unique_blind_stops`` is reduced to
        the stops that are vertices of the hull.
    """
    if unique_blind_stops.empty or nbus_stops.empty:
        return [], [], pd.DataFrame(), unique_blind_stops

    # The three night-bus stops nearest the centroid of the selected stops.
    c_lat, c_lon = unique_blind_stops['lat'].mean(), unique_blind_stops['lon'].mean()
    dist_to_hub = np.sqrt((nbus_stops['lat'] - c_lat) ** 2 + (nbus_stops['lon'] - c_lon) ** 2)
    closest_hubs = nbus_stops.loc[dist_to_hub.nsmallest(3).index]
    transfer_coords = sorted(closest_hubs[['lat', 'lon']].values.tolist(), key=lambda x: x[1])

    loop_stops = pd.concat([unique_blind_stops, closest_hubs]).drop_duplicates('stop_id')
    hull_coords = _convex_hull(loop_stops[['lat', 'lon']].values.tolist())

    def is_hull_vertex(stop):
        return any(
            abs(stop['lat'] - h_lat) < 1e-6 and abs(stop['lon'] - h_lon) < 1e-6
            for h_lat, h_lon in hull_coords
        )

    on_hull = unique_blind_stops.apply(is_hull_vertex, axis=1)
    unique_blind_stops = unique_blind_stops[on_hull]

    # Repeat the first vertex so the drawn path is closed.
    hull_coords.append(hull_coords[0])
    return hull_coords, transfer_coords, closest_hubs, unique_blind_stops


def _render_map(df, threshold, blind_stops, routes, loop_coords, transfer_coords, unique_blind_stops, closest_hubs):
    """Draw every layer onto a new folium map and return it."""
    m = folium.Map(location=[37.498, 127.027], zoom_start=15, tiles="CartoDB dark_matter")

    # Risk cells: only scores above 0.3 are drawn; red above the threshold.
    for _, row in df.iterrows():
        if row['risk_score'] > 0.3:
            folium.CircleMarker(
                location=[row['lat'], row['lon']],
                radius=4,
                color=None,
                fill=True,
                fill_color="red" if row['risk_score'] > threshold else "orange",
                fill_opacity=row['risk_score'],
            ).add_to(m)

    for _, row in blind_stops.iterrows():
        folium.CircleMarker(
            location=[row['lat'], row['lon']],
            radius=2,
            color="gray",
            fill=True,
            fill_color="gray",
        ).add_to(m)

    colors = ['cyan', 'lime', 'yellow']
    for i, route in enumerate(routes):
        if route['coords']:
            AntPath(
                locations=route['coords'],
                dash_array=[15, 20],
                delay=800,
                color='white',
                pulse_color=colors[i % len(colors)],
                weight=4,
                opacity=0.6,
            ).add_to(m)

    if loop_coords:
        AntPath(
            locations=loop_coords,
            dash_array=[10, 15],
            delay=500,
            color='purple',
            pulse_color='magenta',
            weight=5,
            opacity=0.9,
        ).add_to(m)
    if len(transfer_coords) >= 2:
        AntPath(
            locations=transfer_coords,
            dash_array=[1, 10],
            delay=300,
            color='orange',
            pulse_color='gold',
            weight=8,
            opacity=1.0,
        ).add_to(m)

    for _, row in unique_blind_stops.iterrows():
        folium.Marker(
            location=[row['lat'], row['lon']],
            icon=folium.Icon(color="purple", icon="bus", prefix="fa"),
        ).add_to(m)
    if not closest_hubs.empty:
        for _, row in closest_hubs.iterrows():
            folium.CircleMarker(
                location=[row['lat'], row['lon']],
                radius=7,
                color="gold",
                fill=True,
                fill_color="orange",
                fill_opacity=0.8,
            ).add_to(m)
    return m


def export_static_map():
    """Fetch OSM data, score a risk grid, plan a DRT loop and save an HTML map.

    The map is written as ``demo_map.html`` in the directory of this file.
    If no Overpass endpoint can be reached the map is still written, just
    without any OSM-derived layers. Progress is reported with ``print``.

    Returns:
        None.
    """
    print("Fetching real OSM data...")
    data = _fetch_osm_data("37.491,127.020,37.505,127.035")
    bus_stops, amenities, safety_infra, real_bus_routes = _parse_osm_elements(data)
    stops_df = pd.DataFrame(bus_stops)
    amenities_df = pd.DataFrame(amenities)
    safety_df = pd.DataFrame(safety_infra)

    df = _build_risk_grid(stops_df, amenities_df, safety_df)
    threshold = df['risk_score'].quantile(0.85)

    # DRT Loop Logic
    stops_df['is_nbus_stop'] = _near_route_flags(stops_df, real_bus_routes)
    nbus_stops = stops_df[stops_df['is_nbus_stop']]
    blind_stops = stops_df[~stops_df['is_nbus_stop']]
    unique_blind_stops = _nearest_blind_stops(df, blind_stops)
    loop_coords, transfer_coords, closest_hubs, unique_blind_stops = _plan_drt_loop(
        unique_blind_stops, nbus_stops
    )

    print("Generating HTML map...")
    m = _render_map(
        df,
        threshold,
        blind_stops,
        real_bus_routes,
        loop_coords,
        transfer_coords,
        unique_blind_stops,
        closest_hubs,
    )
    output_path = os.path.join(os.path.dirname(__file__), 'demo_map.html')
    m.save(output_path)
    print(f"Map successfully saved to {output_path}")
# Generate the map only when this file is executed as a script.
if __name__ == "__main__":
    export_static_map()