File size: 6,079 Bytes
c3225ca
975b20f
 
 
 
 
 
 
 
 
 
 
 
 
b102c32
975b20f
 
 
 
c3225ca
 
 
 
 
 
 
 
 
 
b102c32
 
 
 
 
 
c3225ca
 
 
 
 
 
975b20f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b102c32
c3225ca
975b20f
 
 
c3225ca
 
975b20f
 
b102c32
975b20f
 
 
 
 
c3225ca
975b20f
b102c32
c3225ca
975b20f
 
c3225ca
b102c32
 
975b20f
b102c32
975b20f
 
 
 
 
 
 
7fc4060
 
c3225ca
b102c32
c3225ca
b102c32
c3225ca
b102c32
e4e9640
c3225ca
b102c32
c3225ca
 
 
b102c32
975b20f
 
 
 
 
 
 
 
 
c3225ca
 
 
 
 
 
 
 
 
b102c32
c3225ca
 
 
 
b102c32
 
7fc4060
b102c32
c3225ca
b102c32
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import streamlit as st
import logging
import warnings

# Disable usage statistics collection message
st.set_option('browser.gatherUsageStats', False)
# Suppress Streamlit missing ScriptRunContext warnings
logging.getLogger("streamlit.runtime.scriptrunner_utils").setLevel(logging.ERROR)
# Suppress all warnings for a cleaner output
warnings.filterwarnings("ignore")

# Lazy import heavy libraries inside functions to speed up startup

def fetch_station_data(station: str, time_limit: int):
    """Fetch station travel-time data via API and return as GeoDataFrame."""
    import requests
    import pandas as pd
    import geopandas as gpd

    api_url = (
        f"https://api.nearby-map.com/search_stations"
        f"?station={station}&time_limit={time_limit}&max_transfer=2"
    )
    res = requests.get(api_url)
    if res.status_code != 200:
        st.error(f"API request failed ({res.status_code})")
        return gpd.GeoDataFrame()
    items = res.json().get("data", [])
    df = pd.DataFrame(items)
    df.rename(columns={
        "name": "station_name",
        "time": "travel_time",
        "lat": "latitude",
        "lon": "longitude",
    }, inplace=True)
    return gpd.GeoDataFrame(
        df,
        geometry=gpd.points_from_xy(df.longitude, df.latitude),
        crs="EPSG:4326",
    )


def build_isochrone_polygons(stations_gdf, travel_speeds_kmh, time_limits):
    """Generate isochrone polygons using a single pre-fetched graph to limit map data retrieval."""
    # Lazy imports
    import pandas as pd
    import numpy as np
    from matplotlib.colors import LinearSegmentedColormap, to_hex
    import networkx as nx
    import osmnx as ox
    import geopandas as gpd
    from shapely.ops import unary_union, polygonize

    # Determine global max remaining time
    min_travel = stations_gdf['travel_time'].min()
    max_time_lim = max(time_limits)
    max_remaining = max_time_lim - min_travel
    if max_remaining <= 0:
        return gpd.GeoDataFrame(columns=['time','color','geometry'], crs='EPSG:4326')

    # Compute max distance (meters)
    max_distance = max_remaining * travel_speeds_kmh * 1000 / 60
    # Center at mean coordinates
    center_lat = stations_gdf.geometry.y.mean()
    center_lon = stations_gdf.geometry.x.mean()

    # Prefetch graph once within bounding radius
    G = ox.graph_from_point((center_lat, center_lon), dist=max_distance * 1.2, network_type='walk')
    # Project and set edge travel time attribute
    G = ox.project_graph(G)
    mpm = travel_speeds_kmh * 1000 / 60
    for u, v, k, data in G.edges(keys=True, data=True):
        data['time'] = data['length'] / mpm

    # Prepare colormap
    cmap = LinearSegmentedColormap.from_list('iso', ['green','yellow','orange','red'], N=len(time_limits))
    colors = [to_hex(cmap(i)) for i in np.linspace(0, 1, len(time_limits))]

    iso_list = []
    # Generate isochrones for each time limit
    for total_min in time_limits:
        polys = []
        for _, row in stations_gdf.iterrows():
            rem = total_min - row['travel_time']
            if rem <= 0:
                continue
            # Find nearest node in pre-fetched graph
            node = ox.nearest_nodes(G, row['longitude'], row['latitude'])
            # Extract subgraph reachable within 'rem' minutes
            sub = nx.ego_graph(G, node, radius=rem, distance='time')
            if not sub.edges:
                continue
            edges_gdf = ox.graph_to_gdfs(sub, nodes=False, edges=True)
            if edges_gdf.empty:
                continue
            merged = edges_gdf['geometry'].unary_union
            polys.extend(polygonize(merged))
        if polys:
            unified = unary_union(polys)
            gdf_iso = gpd.GeoDataFrame(
                {'time': [total_min], 'color': [colors[time_limits.index(total_min)]]},
                geometry=[unified],
                crs=G.graph['crs']
            ).to_crs(epsg=4326)
            iso_list.append(gdf_iso)

    if iso_list:
        return gpd.GeoDataFrame(pd.concat(iso_list, ignore_index=True), crs='EPSG:4326')
    return gpd.GeoDataFrame(columns=['time','color','geometry'], crs='EPSG:4326')


def main():
    """Streamlit application entrypoint."""
    st.title("Isochrone Map Demo")
    station_name = st.text_input("Station name", "溜池山王")
    max_time = st.slider("Max travel time (min)", 10, 60, 30, 10)
    step = st.selectbox("Time step (min)", [5, 10, 15], index=1)
    if st.button("Generate Map"):
        with st.spinner("Calculating…"):
            stations = fetch_station_data(station_name, max_time)
            if stations.empty:
                st.warning("No station data.")
                return
            time_values = list(range(step, max_time + 1, step))
            iso_gdf = build_isochrone_polygons(
                stations, travel_speeds_kmh=4.8, time_limits=time_values
            )
            # Lazy import folium and streamlit_folium
            import folium
            from streamlit_folium import st_folium

            center = [stations.geometry.y.mean(), stations.geometry.x.mean()]
            fmap = folium.Map(location=center, zoom_start=12)
            for _, row in iso_gdf.iterrows():
                folium.GeoJson(
                    row.geometry,
                    style_function=lambda feat, col=row["color"]: {
                        "fillColor": col,
                        "color": col,
                        "weight": 2,
                        "fillOpacity": 0.2,
                    },
                ).add_to(fmap)
            for _, row in stations.iterrows():
                folium.Marker(
                    [row["latitude"], row["longitude"]],
                    popup=f"{row['station_name']} ({int(row['travel_time'])}分)"
                ).add_to(fmap)
            st_folium(fmap, width=700, height=500)

# Allow script to be run directly with "python app.py"
if __name__ == "__main__":
    import sys
    from streamlit.web import cli as stcli

    sys.argv = ["streamlit", "run", sys.argv[0]]
    sys.exit(stcli.main())