naohiro701's picture
Update app.py
975b20f verified
import streamlit as st
import logging
import warnings
# Disable usage statistics collection message
st.set_option('browser.gatherUsageStats', False)
# Suppress Streamlit missing ScriptRunContext warnings
logging.getLogger("streamlit.runtime.scriptrunner_utils").setLevel(logging.ERROR)
# Suppress all warnings for a cleaner output
warnings.filterwarnings("ignore")
# Lazy import heavy libraries inside functions to speed up startup
def fetch_station_data(station: str, time_limit: int):
"""Fetch station travel-time data via API and return as GeoDataFrame."""
import requests
import pandas as pd
import geopandas as gpd
api_url = (
f"https://api.nearby-map.com/search_stations"
f"?station={station}&time_limit={time_limit}&max_transfer=2"
)
res = requests.get(api_url)
if res.status_code != 200:
st.error(f"API request failed ({res.status_code})")
return gpd.GeoDataFrame()
items = res.json().get("data", [])
df = pd.DataFrame(items)
df.rename(columns={
"name": "station_name",
"time": "travel_time",
"lat": "latitude",
"lon": "longitude",
}, inplace=True)
return gpd.GeoDataFrame(
df,
geometry=gpd.points_from_xy(df.longitude, df.latitude),
crs="EPSG:4326",
)
def build_isochrone_polygons(stations_gdf, travel_speeds_kmh, time_limits):
"""Generate isochrone polygons using a single pre-fetched graph to limit map data retrieval."""
# Lazy imports
import pandas as pd
import numpy as np
from matplotlib.colors import LinearSegmentedColormap, to_hex
import networkx as nx
import osmnx as ox
import geopandas as gpd
from shapely.ops import unary_union, polygonize
# Determine global max remaining time
min_travel = stations_gdf['travel_time'].min()
max_time_lim = max(time_limits)
max_remaining = max_time_lim - min_travel
if max_remaining <= 0:
return gpd.GeoDataFrame(columns=['time','color','geometry'], crs='EPSG:4326')
# Compute max distance (meters)
max_distance = max_remaining * travel_speeds_kmh * 1000 / 60
# Center at mean coordinates
center_lat = stations_gdf.geometry.y.mean()
center_lon = stations_gdf.geometry.x.mean()
# Prefetch graph once within bounding radius
G = ox.graph_from_point((center_lat, center_lon), dist=max_distance * 1.2, network_type='walk')
# Project and set edge travel time attribute
G = ox.project_graph(G)
mpm = travel_speeds_kmh * 1000 / 60
for u, v, k, data in G.edges(keys=True, data=True):
data['time'] = data['length'] / mpm
# Prepare colormap
cmap = LinearSegmentedColormap.from_list('iso', ['green','yellow','orange','red'], N=len(time_limits))
colors = [to_hex(cmap(i)) for i in np.linspace(0, 1, len(time_limits))]
iso_list = []
# Generate isochrones for each time limit
for total_min in time_limits:
polys = []
for _, row in stations_gdf.iterrows():
rem = total_min - row['travel_time']
if rem <= 0:
continue
# Find nearest node in pre-fetched graph
node = ox.nearest_nodes(G, row['longitude'], row['latitude'])
# Extract subgraph reachable within 'rem' minutes
sub = nx.ego_graph(G, node, radius=rem, distance='time')
if not sub.edges:
continue
edges_gdf = ox.graph_to_gdfs(sub, nodes=False, edges=True)
if edges_gdf.empty:
continue
merged = edges_gdf['geometry'].unary_union
polys.extend(polygonize(merged))
if polys:
unified = unary_union(polys)
gdf_iso = gpd.GeoDataFrame(
{'time': [total_min], 'color': [colors[time_limits.index(total_min)]]},
geometry=[unified],
crs=G.graph['crs']
).to_crs(epsg=4326)
iso_list.append(gdf_iso)
if iso_list:
return gpd.GeoDataFrame(pd.concat(iso_list, ignore_index=True), crs='EPSG:4326')
return gpd.GeoDataFrame(columns=['time','color','geometry'], crs='EPSG:4326')
def main():
"""Streamlit application entrypoint."""
st.title("Isochrone Map Demo")
station_name = st.text_input("Station name", "溜池山王")
max_time = st.slider("Max travel time (min)", 10, 60, 30, 10)
step = st.selectbox("Time step (min)", [5, 10, 15], index=1)
if st.button("Generate Map"):
with st.spinner("Calculating…"):
stations = fetch_station_data(station_name, max_time)
if stations.empty:
st.warning("No station data.")
return
time_values = list(range(step, max_time + 1, step))
iso_gdf = build_isochrone_polygons(
stations, travel_speeds_kmh=4.8, time_limits=time_values
)
# Lazy import folium and streamlit_folium
import folium
from streamlit_folium import st_folium
center = [stations.geometry.y.mean(), stations.geometry.x.mean()]
fmap = folium.Map(location=center, zoom_start=12)
for _, row in iso_gdf.iterrows():
folium.GeoJson(
row.geometry,
style_function=lambda feat, col=row["color"]: {
"fillColor": col,
"color": col,
"weight": 2,
"fillOpacity": 0.2,
},
).add_to(fmap)
for _, row in stations.iterrows():
folium.Marker(
[row["latitude"], row["longitude"]],
popup=f"{row['station_name']} ({int(row['travel_time'])}分)"
).add_to(fmap)
st_folium(fmap, width=700, height=500)
# Allow script to be run directly with "python app.py"
if __name__ == "__main__":
import sys
from streamlit.web import cli as stcli
sys.argv = ["streamlit", "run", sys.argv[0]]
sys.exit(stcli.main())