Spaces:
Runtime error
Runtime error
File size: 6,079 Bytes
c3225ca 975b20f b102c32 975b20f c3225ca b102c32 c3225ca 975b20f b102c32 c3225ca 975b20f c3225ca 975b20f b102c32 975b20f c3225ca 975b20f b102c32 c3225ca 975b20f c3225ca b102c32 975b20f b102c32 975b20f 7fc4060 c3225ca b102c32 c3225ca b102c32 c3225ca b102c32 e4e9640 c3225ca b102c32 c3225ca b102c32 975b20f c3225ca b102c32 c3225ca b102c32 7fc4060 b102c32 c3225ca b102c32 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
import streamlit as st
import logging
import warnings
# Disable usage statistics collection message
st.set_option('browser.gatherUsageStats', False)
# Suppress Streamlit missing ScriptRunContext warnings
logging.getLogger("streamlit.runtime.scriptrunner_utils").setLevel(logging.ERROR)
# Suppress all warnings for a cleaner output
warnings.filterwarnings("ignore")
# Lazy import heavy libraries inside functions to speed up startup
def fetch_station_data(station: str, time_limit: int):
"""Fetch station travel-time data via API and return as GeoDataFrame."""
import requests
import pandas as pd
import geopandas as gpd
api_url = (
f"https://api.nearby-map.com/search_stations"
f"?station={station}&time_limit={time_limit}&max_transfer=2"
)
res = requests.get(api_url)
if res.status_code != 200:
st.error(f"API request failed ({res.status_code})")
return gpd.GeoDataFrame()
items = res.json().get("data", [])
df = pd.DataFrame(items)
df.rename(columns={
"name": "station_name",
"time": "travel_time",
"lat": "latitude",
"lon": "longitude",
}, inplace=True)
return gpd.GeoDataFrame(
df,
geometry=gpd.points_from_xy(df.longitude, df.latitude),
crs="EPSG:4326",
)
def build_isochrone_polygons(stations_gdf, travel_speeds_kmh, time_limits):
"""Generate isochrone polygons using a single pre-fetched graph to limit map data retrieval."""
# Lazy imports
import pandas as pd
import numpy as np
from matplotlib.colors import LinearSegmentedColormap, to_hex
import networkx as nx
import osmnx as ox
import geopandas as gpd
from shapely.ops import unary_union, polygonize
# Determine global max remaining time
min_travel = stations_gdf['travel_time'].min()
max_time_lim = max(time_limits)
max_remaining = max_time_lim - min_travel
if max_remaining <= 0:
return gpd.GeoDataFrame(columns=['time','color','geometry'], crs='EPSG:4326')
# Compute max distance (meters)
max_distance = max_remaining * travel_speeds_kmh * 1000 / 60
# Center at mean coordinates
center_lat = stations_gdf.geometry.y.mean()
center_lon = stations_gdf.geometry.x.mean()
# Prefetch graph once within bounding radius
G = ox.graph_from_point((center_lat, center_lon), dist=max_distance * 1.2, network_type='walk')
# Project and set edge travel time attribute
G = ox.project_graph(G)
mpm = travel_speeds_kmh * 1000 / 60
for u, v, k, data in G.edges(keys=True, data=True):
data['time'] = data['length'] / mpm
# Prepare colormap
cmap = LinearSegmentedColormap.from_list('iso', ['green','yellow','orange','red'], N=len(time_limits))
colors = [to_hex(cmap(i)) for i in np.linspace(0, 1, len(time_limits))]
iso_list = []
# Generate isochrones for each time limit
for total_min in time_limits:
polys = []
for _, row in stations_gdf.iterrows():
rem = total_min - row['travel_time']
if rem <= 0:
continue
# Find nearest node in pre-fetched graph
node = ox.nearest_nodes(G, row['longitude'], row['latitude'])
# Extract subgraph reachable within 'rem' minutes
sub = nx.ego_graph(G, node, radius=rem, distance='time')
if not sub.edges:
continue
edges_gdf = ox.graph_to_gdfs(sub, nodes=False, edges=True)
if edges_gdf.empty:
continue
merged = edges_gdf['geometry'].unary_union
polys.extend(polygonize(merged))
if polys:
unified = unary_union(polys)
gdf_iso = gpd.GeoDataFrame(
{'time': [total_min], 'color': [colors[time_limits.index(total_min)]]},
geometry=[unified],
crs=G.graph['crs']
).to_crs(epsg=4326)
iso_list.append(gdf_iso)
if iso_list:
return gpd.GeoDataFrame(pd.concat(iso_list, ignore_index=True), crs='EPSG:4326')
return gpd.GeoDataFrame(columns=['time','color','geometry'], crs='EPSG:4326')
def main():
"""Streamlit application entrypoint."""
st.title("Isochrone Map Demo")
station_name = st.text_input("Station name", "溜池山王")
max_time = st.slider("Max travel time (min)", 10, 60, 30, 10)
step = st.selectbox("Time step (min)", [5, 10, 15], index=1)
if st.button("Generate Map"):
with st.spinner("Calculating…"):
stations = fetch_station_data(station_name, max_time)
if stations.empty:
st.warning("No station data.")
return
time_values = list(range(step, max_time + 1, step))
iso_gdf = build_isochrone_polygons(
stations, travel_speeds_kmh=4.8, time_limits=time_values
)
# Lazy import folium and streamlit_folium
import folium
from streamlit_folium import st_folium
center = [stations.geometry.y.mean(), stations.geometry.x.mean()]
fmap = folium.Map(location=center, zoom_start=12)
for _, row in iso_gdf.iterrows():
folium.GeoJson(
row.geometry,
style_function=lambda feat, col=row["color"]: {
"fillColor": col,
"color": col,
"weight": 2,
"fillOpacity": 0.2,
},
).add_to(fmap)
for _, row in stations.iterrows():
folium.Marker(
[row["latitude"], row["longitude"]],
popup=f"{row['station_name']} ({int(row['travel_time'])}分)"
).add_to(fmap)
st_folium(fmap, width=700, height=500)
# Allow script to be run directly with "python app.py"
if __name__ == "__main__":
import sys
from streamlit.web import cli as stcli
sys.argv = ["streamlit", "run", sys.argv[0]]
sys.exit(stcli.main())
|