""" src/heatmap.py -------------- Folium heatmap builder for Jaam Ctrl. Exports required by app.py: heatmap_to_html(gps_df, title, zoom) -> str (HTML) combined_heatmap_to_html(mode_dfs, zoom) -> str (HTML) per_junction_density(gps_df) -> dict[str, float] flow_balance_score(gps_df) -> float delay_reduction_pct(gps_df_a, gps_df_b) -> float JUNCTION_NAMES -> dict[str, str] GPS DataFrame schema (produced by run_simulation.py): lat float WGS-84 latitude lon float WGS-84 longitude speed_kmph float vehicle speed weight float 1 - speed/max_speed (high = congested) junction str nearest junction id ("J0" / "J1" / "J2") """ from __future__ import annotations import numpy as np import pandas as pd import folium from folium.plugins import HeatMap # --------------------------------------------------------------------------- # Real-world anchor coordinates — Connaught Place, New Delhi # Verified against OpenStreetMap / Google Maps # --------------------------------------------------------------------------- JUNCTION_COORDS: dict[str, tuple[float, float]] = { "J0": (28.6315, 77.2167), # Tolstoy Marg / Janpath core "J1": (28.6328, 77.2195), # Barakhamba Rd × KG Marg "J2": (28.6287, 77.2140), # Patel Chowk } JUNCTION_NAMES: dict[str, str] = { "J0": "Tolstoy Marg", "J1": "CC Inner Ring (KG Marg)", "J2": "Patel Chowk", } # Corridor centre for map initialisation _CP_CENTER = [28.6310, 77.2168] # Max speed used for weight normalisation _MAX_SPEED = 50.0 # Radius of influence for density assignment (degrees, ≈ 300 m) _JUNCTION_RADIUS = 0.003 # --------------------------------------------------------------------------- # Colour gradients per mode # --------------------------------------------------------------------------- _GRADIENTS = { "fixed": { 0.2: "#1a0033", 0.5: "#7C4DFF", 0.8: "#FF2FD6", 1.0: "#FF4444", }, "adaptive": { 0.2: "#001a33", 0.5: "#0077FF", 0.8: "#00E5FF", 1.0: "#00F5D4", }, "rl": { 0.2: "#1a0033", 0.5: "#FF2FD6", 0.8: "#FF9900", 1.0: "#FFFF00", }, "default": { 0.2: "#001a33", 0.5: "#7C4DFF", 0.8: "#00E5FF", 1.0: "#FF2FD6", }, } # --------------------------------------------------------------------------- # Internal helpers # --------------------------------------------------------------------------- def _base_map(zoom: int = 15) -> folium.Map: """Return a dark CartoDB map centred on Connaught Place.""" return folium.Map( location=_CP_CENTER, zoom_start=zoom, tiles="CartoDB dark_matter", prefer_canvas=True, ) def _add_junction_markers(m: folium.Map) -> None: """Draw labelled circle markers at each junction's real coordinates.""" for jid, (lat, lon) in JUNCTION_COORDS.items(): name = JUNCTION_NAMES[jid] folium.CircleMarker( location=[lat, lon], radius=10, color="#00E5FF", fill=True, fill_color="#7C4DFF", fill_opacity=0.85, weight=2, tooltip=folium.Tooltip(f"{jid} | {name}", sticky=True), popup=folium.Popup( f"{jid}
{name}", max_width=200, ), ).add_to(m) def _heat_layer( gps_df: pd.DataFrame, name: str, gradient: dict, ) -> folium.FeatureGroup: """Build a HeatMap layer from a GPS DataFrame wrapped in a FeatureGroup.""" fg = folium.FeatureGroup(name=f"{name} (heat)", show=True) if not gps_df.empty: data = gps_df[["lat", "lon", "weight"]].dropna().values.tolist() HeatMap( data, name="", # Empty name since FeatureGroup has the name min_opacity=0.4, # Increased from 0.35 for better visibility max_zoom=18, radius=25, # Increased from 20 for larger heatmap blobs blur=18, # Reduced from 25 for clearer definition gradient=gradient, ).add_to(fg) return fg def _add_markers_layer(m: folium.Map, gps_df: pd.DataFrame, name: str, color_func) -> None: """Add markers layer as fallback if heatmap doesn't render properly.""" if gps_df.empty: return # Create a FeatureGroup for the markers fg = folium.FeatureGroup(name=f"{name} (markers)", show=True) # Sample data if too many points sample_df = gps_df if len(gps_df) > 200: sample_df = gps_df.sample(200, random_state=42) for idx, row in sample_df.iterrows(): weight = row.get("weight", 0.5) color = color_func(weight) folium.CircleMarker( location=[row["lat"], row["lon"]], radius=2 + 4 * weight, # Radius ranges from 2 to 6 color=color, fill=True, fill_color=color, fill_opacity=0.65, weight=0.5, popup=folium.Popup(f"Weight: {weight:.2f}", max_width=100), ).add_to(fg) fg.add_to(m) # --------------------------------------------------------------------------- # Public API # --------------------------------------------------------------------------- def heatmap_to_html( gps_df: pd.DataFrame, title: str = "Traffic Heatmap", zoom: int = 15, mode: str | None = None, ) -> str: """ Build a single-mode heatmap and return it as an HTML string. Parameters ---------- gps_df : DataFrame with columns lat, lon, weight (and optionally speed_kmph) title : shown in the layer control zoom : initial map zoom level mode : gradient mode ("fixed", "adaptive", "rl") — auto-inferred from title if None """ # Auto-detect mode from title if not provided if mode is None: title_lower = title.lower() if "fixed" in title_lower: mode = "fixed" elif "adaptive" in title_lower: mode = "adaptive" elif "rl" in title_lower or "ppo" in title_lower: mode = "rl" else: mode = "default" m = _base_map(zoom) gradient = _GRADIENTS.get(mode, _GRADIENTS["default"]) _heat_layer(gps_df, title, gradient).add_to(m) _add_junction_markers(m) folium.LayerControl(position="topright", collapsed=False).add_to(m) return m._repr_html_() def heatmap_to_map( gps_df: pd.DataFrame, title: str = "Traffic Heatmap", zoom: int = 15, mode: str | None = None, ) -> folium.Map: """ Build a single-mode heatmap and return the folium Map object. Use this with streamlit_folium.st_folium() for proper Streamlit integration. """ if mode is None: title_lower = title.lower() if "fixed" in title_lower: mode = "fixed" elif "adaptive" in title_lower: mode = "adaptive" elif "rl" in title_lower or "ppo" in title_lower: mode = "rl" else: mode = "default" m = _base_map(zoom) gradient = _GRADIENTS.get(mode, _GRADIENTS["default"]) # Add heatmap layer if not gps_df.empty: _heat_layer(gps_df, title, gradient).add_to(m) _add_junction_markers(m) folium.LayerControl(position="topright", collapsed=False).add_to(m) return m def combined_heatmap_to_html( mode_dfs: dict[str, pd.DataFrame], zoom: int = 15, ) -> str: """ Build a multi-layer heatmap (one layer per simulation mode). Parameters ---------- mode_dfs : {"fixed": df, "adaptive": df, "rl": df} (any subset is fine) zoom : initial map zoom level """ m = _base_map(zoom) layer_labels = { "fixed": "Fixed-Time (baseline)", "adaptive": "Rule-Based Adaptive", "rl": "PPO RL Agent", } for mode_key, gps_df in mode_dfs.items(): if gps_df is None or gps_df.empty: continue gradient = _GRADIENTS.get(mode_key, _GRADIENTS["default"]) label = layer_labels.get(mode_key, mode_key.capitalize()) _heat_layer(gps_df, label, gradient).add_to(m) _add_junction_markers(m) folium.LayerControl(position="topright", collapsed=False).add_to(m) return m._repr_html_() def combined_heatmap_to_map( mode_dfs: dict[str, pd.DataFrame], zoom: int = 15, ) -> folium.Map: """ Build a multi-layer heatmap (overlay multiple modes) and return folium Map object. Use this with streamlit_folium.st_folium() for proper Streamlit integration. Args: mode_dfs: Dictionary mapping mode names to gps_dfs e.g., {"fixed": gps_df1, "rl": gps_df2} """ m = _base_map(zoom) layer_labels = { "fixed": "Fixed-Time (baseline)", "adaptive": "Rule-Based Adaptive", "rl": "PPO RL Agent", } for mode_key, gps_df in mode_dfs.items(): if gps_df is None or gps_df.empty: continue gradient = _GRADIENTS.get(mode_key, _GRADIENTS["default"]) label = layer_labels.get(mode_key, mode_key.capitalize()) # Add heatmap layer _heat_layer(gps_df, label, gradient).add_to(m) _add_junction_markers(m) folium.LayerControl(position="topright", collapsed=False).add_to(m) return m def per_junction_density(gps_df: pd.DataFrame) -> dict[str, float]: """ Return average congestion weight for points near each junction. Returns ------- {"J0": float, "J1": float, "J2": float} A value of 0.0 means no probe points near that junction. """ result: dict[str, float] = {jid: 0.0 for jid in JUNCTION_COORDS} if gps_df.empty: return result # If the DataFrame already has a junction column use it directly if "junction" in gps_df.columns: for jid in JUNCTION_COORDS: sub = gps_df[gps_df["junction"] == jid] if not sub.empty: result[jid] = float(sub["weight"].mean()) return result # Otherwise assign by proximity lats = gps_df["lat"].values lons = gps_df["lon"].values wts = gps_df["weight"].values for jid, (jlat, jlon) in JUNCTION_COORDS.items(): dist = np.sqrt((lats - jlat) ** 2 + (lons - jlon) ** 2) mask = dist < _JUNCTION_RADIUS if mask.any(): result[jid] = float(wts[mask].mean()) return result def flow_balance_score(gps_df: pd.DataFrame) -> float: """ Measure how evenly congestion is distributed across junctions. Returns std(density) / (mean(density) + 1e-6). Lower = more even = better. """ dens = per_junction_density(gps_df) vals = np.array(list(dens.values()), dtype=float) if vals.mean() < 1e-9: return 0.0 return float(vals.std() / (vals.mean() + 1e-6)) def delay_reduction_pct( gps_df_a: pd.DataFrame, gps_df_b: pd.DataFrame, ) -> float: """ Estimate congestion reduction from mode A to mode B. Uses mean(weight) as a proxy for delay. Returns percentage reduction (positive = B is better). """ if gps_df_a.empty or gps_df_b.empty: return 0.0 w_a = gps_df_a["weight"].mean() w_b = gps_df_b["weight"].mean() if w_a < 1e-9: return 0.0 return float((w_a - w_b) / w_a * 100.0)