Spaces:
Runtime error
Runtime error
| import streamlit as st | |
| import logging | |
| import warnings | |
# Usage-statistics collection cannot be switched off from inside the script:
# ``st.set_option`` only accepts the options Streamlit marks as scriptable and
# raises StreamlitAPIException for any other key. The call is kept as a
# best-effort attempt so that start-up never aborts on it. To really disable
# collection, set ``gatherUsageStats = false`` under ``[browser]`` in
# ``.streamlit/config.toml``.
from streamlit.errors import StreamlitAPIException

try:
    st.set_option('browser.gatherUsageStats', False)
except StreamlitAPIException as exc:
    logging.getLogger(__name__).debug(
        "browser.gatherUsageStats not set from script: %s", exc
    )
# Suppress Streamlit missing ScriptRunContext warnings
logging.getLogger("streamlit.runtime.scriptrunner_utils").setLevel(logging.ERROR)
# Suppress all warnings for a cleaner output
warnings.filterwarnings("ignore")
| # Lazy import heavy libraries inside functions to speed up startup | |
def fetch_station_data(station: str, time_limit: int):
    """Fetch station travel-time data via API and return as GeoDataFrame.

    Args:
        station: Origin station name, sent as the ``station`` query parameter.
        time_limit: Sent as the ``time_limit`` query parameter.

    Returns:
        A GeoDataFrame (EPSG:4326) with one point per record of the
        response's ``data`` list. The API fields ``name``/``time``/``lat``/
        ``lon`` are renamed to ``station_name``/``travel_time``/``latitude``/
        ``longitude``. An empty GeoDataFrame is returned when the request
        fails, the body is not usable JSON, or no records come back; failures
        are also reported through ``st.error``.
    """
    import requests
    import pandas as pd
    import geopandas as gpd

    try:
        # ``params`` lets requests percent-encode the (often non-ASCII)
        # station name; the timeout keeps a stalled API from hanging the app.
        res = requests.get(
            "https://api.nearby-map.com/search_stations",
            params={
                "station": station,
                "time_limit": time_limit,
                "max_transfer": 2,
            },
            timeout=30,
        )
    except requests.RequestException as exc:
        st.error(f"API request failed ({exc})")
        return gpd.GeoDataFrame()

    if res.status_code != 200:
        st.error(f"API request failed ({res.status_code})")
        return gpd.GeoDataFrame()

    try:
        payload = res.json()
    except ValueError:
        # requests raises a ValueError subclass when the body is not JSON.
        st.error("API request failed (invalid JSON response)")
        return gpd.GeoDataFrame()

    items = payload.get("data") if isinstance(payload, dict) else None
    if not items:
        # A column-less DataFrame would make the coordinate lookup below fail,
        # so hand back the empty frame the caller already checks for.
        return gpd.GeoDataFrame()

    df = pd.DataFrame(items).rename(columns={
        "name": "station_name",
        "time": "travel_time",
        "lat": "latitude",
        "lon": "longitude",
    })
    if not {"latitude", "longitude"}.issubset(df.columns):
        st.error("API request failed (response has no coordinates)")
        return gpd.GeoDataFrame()

    return gpd.GeoDataFrame(
        df,
        geometry=gpd.points_from_xy(df["longitude"], df["latitude"]),
        crs="EPSG:4326",
    )
def build_isochrone_polygons(stations_gdf, travel_speeds_kmh, time_limits):
    """Generate isochrone polygons using a single pre-fetched graph to limit map data retrieval.

    For every time limit, each station contributes the street blocks enclosed
    by the walking network reachable in ``limit - travel_time`` minutes; the
    per-station results are merged into one geometry per limit.

    Args:
        stations_gdf: GeoDataFrame of station points with a ``travel_time``
            column (minutes). Coordinates are treated as lon/lat; EPSG:4326
            is assumed when no CRS is set.
        travel_speeds_kmh: Walking speed in km/h.
        time_limits: Iterable of total time budgets in minutes.

    Returns:
        A GeoDataFrame in EPSG:4326 with columns ``time``, ``color`` and
        ``geometry`` (one row per time limit that produced any polygon), or an
        empty frame with those columns when nothing is reachable.
    """
    # Lazy imports
    import numpy as np
    from matplotlib.colors import LinearSegmentedColormap, to_hex
    import networkx as nx
    import osmnx as ox
    import geopandas as gpd
    from shapely.ops import unary_union, polygonize

    def _empty_result():
        return gpd.GeoDataFrame(columns=['time', 'color', 'geometry'], crs='EPSG:4326')

    # Accept any iterable and avoid max() on an empty sequence.
    time_limits = list(time_limits)
    if not time_limits or stations_gdf.empty:
        return _empty_result()

    # Largest walking budget any station can have; "not > 0" also rejects NaN.
    max_remaining = max(time_limits) - stations_gdf['travel_time'].min()
    if not max_remaining > 0:
        return _empty_result()

    metres_per_min = travel_speeds_kmh * 1000 / 60
    max_distance = max_remaining * metres_per_min

    if stations_gdf.crs is None:
        stations_gdf = stations_gdf.set_crs('EPSG:4326')

    # Prefetch graph once, centred on the mean station position.
    # NOTE(review): the radius is only the maximum walking distance (+20 %),
    # so stations farther than that from the mean lie outside the graph and
    # snap to its nearest edge node. Widening the area would fix that, but it
    # also multiplies the amount of map data downloaded; confirm the intent.
    center_lat = stations_gdf.geometry.y.mean()
    center_lon = stations_gdf.geometry.x.mean()
    G = ox.graph_from_point(
        (center_lat, center_lon), dist=max_distance * 1.2, network_type='walk'
    )

    # Project to a metric CRS and attach walking minutes to every edge.
    G = ox.project_graph(G)
    for _, _, data in G.edges(data=True):
        data['time'] = data['length'] / metres_per_min

    # The graph is now in projected metres, so the stations must be
    # reprojected before the nearest-node search; lon/lat degrees would match
    # the wrong nodes. The lookup does not depend on the time limit, so it is
    # done once for all stations.
    stations_proj = stations_gdf.to_crs(G.graph['crs'])
    nearest_nodes = ox.nearest_nodes(
        G,
        stations_proj.geometry.x.to_numpy(),
        stations_proj.geometry.y.to_numpy(),
    )
    travel_times = stations_gdf['travel_time'].to_numpy()

    # One colour per time limit, green (shortest) to red (longest).
    cmap = LinearSegmentedColormap.from_list(
        'iso', ['green', 'yellow', 'orange', 'red'], N=len(time_limits)
    )
    colors = [to_hex(cmap(i)) for i in np.linspace(0, 1, len(time_limits))]

    records = []
    # Pair limits with colours positionally (correct even for duplicates).
    for total_min, color in zip(time_limits, colors):
        polys = []
        for node, travel_time in zip(nearest_nodes, travel_times):
            rem = total_min - travel_time
            if rem <= 0:
                continue
            # Sub-network reachable within the remaining walking minutes.
            sub = nx.ego_graph(G, node, radius=rem, distance='time')
            if not sub.edges:
                continue
            edges_gdf = ox.graph_to_gdfs(sub, nodes=False, edges=True)
            if edges_gdf.empty:
                continue
            # Merge the edge lines, then keep the faces they enclose.
            merged = unary_union(list(edges_gdf['geometry']))
            polys.extend(polygonize(merged))
        if polys:
            records.append(
                {'time': total_min, 'color': color, 'geometry': unary_union(polys)}
            )

    if not records:
        return _empty_result()
    return gpd.GeoDataFrame(
        records, geometry='geometry', crs=G.graph['crs']
    ).to_crs(epsg=4326)
def main():
    """Streamlit application entrypoint.

    Renders the input widgets and, once "Generate Map" is pressed, fetches the
    stations reachable from the chosen origin, builds walking isochrones
    around them and shows both on a folium map.
    """
    st.title("Isochrone Map Demo")
    station_name = st.text_input("Station name", "溜池山王")
    max_time = st.slider("Max travel time (min)", 10, 60, 30, 10)
    step = st.selectbox("Time step (min)", [5, 10, 15], index=1)
    if not st.button("Generate Map"):
        return

    with st.spinner("Calculating…"):
        stations = fetch_station_data(station_name, max_time)
        if stations.empty:
            st.warning("No station data.")
            return

        # A step larger than max_time (e.g. 15 vs 10) yields an empty range,
        # and a non-dividing step stops short of the chosen maximum; always
        # include max_time itself as the outermost ring.
        time_values = list(range(step, max_time + 1, step))
        if not time_values or time_values[-1] != max_time:
            time_values.append(max_time)

        iso_gdf = build_isochrone_polygons(
            stations, travel_speeds_kmh=4.8, time_limits=time_values
        )

        # Lazy import folium and streamlit_folium
        import folium
        from streamlit_folium import st_folium

        center = [stations.geometry.y.mean(), stations.geometry.x.mean()]
        fmap = folium.Map(location=center, zoom_start=12)

        for _, row in iso_gdf.iterrows():
            folium.GeoJson(
                row.geometry,
                # Bind the colour as a default argument so each layer keeps
                # its own value instead of the loop's last one.
                style_function=lambda feat, col=row["color"]: {
                    "fillColor": col,
                    "color": col,
                    "weight": 2,
                    "fillOpacity": 0.2,
                },
            ).add_to(fmap)

        for _, row in stations.iterrows():
            folium.Marker(
                [row["latitude"], row["longitude"]],
                popup=f"{row['station_name']} ({int(row['travel_time'])}分)"
            ).add_to(fmap)

    # returned_objects=[] stops map interaction from triggering a script
    # rerun; a rerun resets the button to False and would remove the map.
    st_folium(fmap, width=700, height=500, returned_objects=[])
# Allow script to be run directly with "python app.py".
# Streamlit also executes the script with __name__ == "__main__", so the guard
# alone cannot tell "python app.py" from "streamlit run app.py"; check whether
# a Streamlit runtime already exists instead.
if __name__ == "__main__":
    from streamlit import runtime

    if runtime.exists():
        # Already inside a Streamlit server: render the app.
        main()
    else:
        # Plain interpreter: re-launch this file through the Streamlit CLI.
        import sys
        from streamlit.web import cli as stcli

        sys.argv = ["streamlit", "run", sys.argv[0]]
        sys.exit(stcli.main())