import streamlit as st import logging import warnings # Disable usage statistics collection message st.set_option('browser.gatherUsageStats', False) # Suppress Streamlit missing ScriptRunContext warnings logging.getLogger("streamlit.runtime.scriptrunner_utils").setLevel(logging.ERROR) # Suppress all warnings for a cleaner output warnings.filterwarnings("ignore") # Lazy import heavy libraries inside functions to speed up startup def fetch_station_data(station: str, time_limit: int): """Fetch station travel-time data via API and return as GeoDataFrame.""" import requests import pandas as pd import geopandas as gpd api_url = ( f"https://api.nearby-map.com/search_stations" f"?station={station}&time_limit={time_limit}&max_transfer=2" ) res = requests.get(api_url) if res.status_code != 200: st.error(f"API request failed ({res.status_code})") return gpd.GeoDataFrame() items = res.json().get("data", []) df = pd.DataFrame(items) df.rename(columns={ "name": "station_name", "time": "travel_time", "lat": "latitude", "lon": "longitude", }, inplace=True) return gpd.GeoDataFrame( df, geometry=gpd.points_from_xy(df.longitude, df.latitude), crs="EPSG:4326", ) def build_isochrone_polygons(stations_gdf, travel_speeds_kmh, time_limits): """Generate isochrone polygons using a single pre-fetched graph to limit map data retrieval.""" # Lazy imports import pandas as pd import numpy as np from matplotlib.colors import LinearSegmentedColormap, to_hex import networkx as nx import osmnx as ox import geopandas as gpd from shapely.ops import unary_union, polygonize # Determine global max remaining time min_travel = stations_gdf['travel_time'].min() max_time_lim = max(time_limits) max_remaining = max_time_lim - min_travel if max_remaining <= 0: return gpd.GeoDataFrame(columns=['time','color','geometry'], crs='EPSG:4326') # Compute max distance (meters) max_distance = max_remaining * travel_speeds_kmh * 1000 / 60 # Center at mean coordinates center_lat = stations_gdf.geometry.y.mean() center_lon = stations_gdf.geometry.x.mean() # Prefetch graph once within bounding radius G = ox.graph_from_point((center_lat, center_lon), dist=max_distance * 1.2, network_type='walk') # Project and set edge travel time attribute G = ox.project_graph(G) mpm = travel_speeds_kmh * 1000 / 60 for u, v, k, data in G.edges(keys=True, data=True): data['time'] = data['length'] / mpm # Prepare colormap cmap = LinearSegmentedColormap.from_list('iso', ['green','yellow','orange','red'], N=len(time_limits)) colors = [to_hex(cmap(i)) for i in np.linspace(0, 1, len(time_limits))] iso_list = [] # Generate isochrones for each time limit for total_min in time_limits: polys = [] for _, row in stations_gdf.iterrows(): rem = total_min - row['travel_time'] if rem <= 0: continue # Find nearest node in pre-fetched graph node = ox.nearest_nodes(G, row['longitude'], row['latitude']) # Extract subgraph reachable within 'rem' minutes sub = nx.ego_graph(G, node, radius=rem, distance='time') if not sub.edges: continue edges_gdf = ox.graph_to_gdfs(sub, nodes=False, edges=True) if edges_gdf.empty: continue merged = edges_gdf['geometry'].unary_union polys.extend(polygonize(merged)) if polys: unified = unary_union(polys) gdf_iso = gpd.GeoDataFrame( {'time': [total_min], 'color': [colors[time_limits.index(total_min)]]}, geometry=[unified], crs=G.graph['crs'] ).to_crs(epsg=4326) iso_list.append(gdf_iso) if iso_list: return gpd.GeoDataFrame(pd.concat(iso_list, ignore_index=True), crs='EPSG:4326') return gpd.GeoDataFrame(columns=['time','color','geometry'], crs='EPSG:4326') def main(): """Streamlit application entrypoint.""" st.title("Isochrone Map Demo") station_name = st.text_input("Station name", "溜池山王") max_time = st.slider("Max travel time (min)", 10, 60, 30, 10) step = st.selectbox("Time step (min)", [5, 10, 15], index=1) if st.button("Generate Map"): with st.spinner("Calculating…"): stations = fetch_station_data(station_name, max_time) if stations.empty: st.warning("No station data.") return time_values = list(range(step, max_time + 1, step)) iso_gdf = build_isochrone_polygons( stations, travel_speeds_kmh=4.8, time_limits=time_values ) # Lazy import folium and streamlit_folium import folium from streamlit_folium import st_folium center = [stations.geometry.y.mean(), stations.geometry.x.mean()] fmap = folium.Map(location=center, zoom_start=12) for _, row in iso_gdf.iterrows(): folium.GeoJson( row.geometry, style_function=lambda feat, col=row["color"]: { "fillColor": col, "color": col, "weight": 2, "fillOpacity": 0.2, }, ).add_to(fmap) for _, row in stations.iterrows(): folium.Marker( [row["latitude"], row["longitude"]], popup=f"{row['station_name']} ({int(row['travel_time'])}分)" ).add_to(fmap) st_folium(fmap, width=700, height=500) # Allow script to be run directly with "python app.py" if __name__ == "__main__": import sys from streamlit.web import cli as stcli sys.argv = ["streamlit", "run", sys.argv[0]] sys.exit(stcli.main())