"""Plotly figure builders for weather time series.

Each public ``make_*`` function takes already-prepared pandas data and returns
a ``plotly.graph_objects.Figure``.
"""

import re
from typing import Dict, List, Optional

import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Colour cycle for the exceedance-probability bars.
_PROB_PALETTE: List[str] = [
    "#1f77b4",
    "#ff7f0e",
    "#2ca02c",
    "#d62728",
    "#9467bd",
    "#8c564b",
    "#e377c2",
    "#7f7f7f",
    "#bcbd22",
    "#17becf",
]

# Variable-name patterns used to derive legend labels automatically.
_ASNOW_RE = re.compile(r"asnow(\d+)")
_APCP_RE = re.compile(r"apcp(\d+)")

# Known special labels: encoded asnow threshold -> inches.
# NOTE(review): 254 -> 0.1 and 762 -> 0.3 disagree with the fallback formula in
# _auto_prob_label (which would give 1 and 3 in). The values are kept exactly
# as they were -- confirm against the upstream variable naming.
_ASNOW_SPECIAL_IN = {127: 0.5, 254: 0.1, 381: 1.5, 508: 2.0, 635: 2.5, 762: 0.3, 1016: 4.0}

# Wind-rose binning: 16 sectors of 22.5 degrees centred on 0, 22.5, ..., 337.5.
_ROSE_SECTOR_DEG = 22.5
_ROSE_THETA = np.arange(0.0, 360.0, _ROSE_SECTOR_DEG)
# Speed classes are half-open intervals [lo, hi) in mph.
_ROSE_SPEED_EDGES = (0.0, 5.0, 10.0, 20.0, np.inf)
_ROSE_SPEED_LABELS = ("0-5", "5-10", "10-20", ">20")
_ROSE_SPEED_COLORS = ("#d0f0fd", "#86c5da", "#2ca02c", "#d62728")


def _top_right_legend() -> dict:
    """Return the horizontal, top-right legend layout used by the figures.

    A fresh dict is built on every call so no two figures share one object.
    """
    return dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1)


def _parse_time_index(df: pd.DataFrame, time_col: str = "time_utc") -> pd.DatetimeIndex:
    """Return the values of ``df[time_col]`` as a ``DatetimeIndex``.

    A column that already has a datetime dtype is wrapped unchanged. Anything
    else is parsed with ``utc=True``; unparseable entries become ``NaT``.

    Raises:
        KeyError: if ``time_col`` is not a column of ``df``.
    """
    column = df[time_col]
    if pd.api.types.is_datetime64_any_dtype(column):
        return pd.DatetimeIndex(column)
    # pd.to_datetime returns a Series for Series input; wrap it so both
    # branches honour the annotated return type.
    return pd.DatetimeIndex(pd.to_datetime(column, utc=True, errors="coerce"))


def make_temp_dew_wind_fig(df: pd.DataFrame) -> go.Figure:
    """Plot temperature and dewpoint (left axis) with wind and gust (right axis).

    Layout is akin to the NBM viewer style. ``temp_F`` and ``time_utc`` are
    required columns; ``dewpoint_F``, ``wind_mph`` and ``gust_mph`` are drawn
    only when present.

    Raises:
        KeyError: if ``temp_F`` or ``time_utc`` is missing from ``df``.
    """
    x = _parse_time_index(df)
    fig = go.Figure()

    # Temperature and dewpoint on the left axis.
    fig.add_trace(
        go.Scatter(
            x=x,
            y=df["temp_F"],
            name="Temp (F)",
            mode="lines+markers",
            line=dict(color="#d62728"),
            marker=dict(size=4),
        )
    )
    if "dewpoint_F" in df.columns:
        fig.add_trace(
            go.Scatter(
                x=x,
                y=df["dewpoint_F"],
                name="Dewpoint (F)",
                mode="lines+markers",
                line=dict(color="#1f77b4"),
                marker=dict(size=3),
            )
        )

    # Wind and gust on the right axis.
    if "wind_mph" in df.columns:
        fig.add_trace(
            go.Scatter(
                x=x,
                y=df["wind_mph"],
                name="Wind (mph)",
                yaxis="y2",
                mode="lines+markers",
                line=dict(color="#2ca02c"),
                marker=dict(size=3),
            )
        )
    if "gust_mph" in df.columns:
        fig.add_trace(
            go.Scatter(
                x=x,
                y=df["gust_mph"],
                name="Gust (mph)",
                yaxis="y2",
                mode="lines",
                line=dict(color="#98df8a", dash="dash"),
            )
        )

    fig.update_layout(
        margin=dict(l=40, r=40, t=30, b=40),
        legend=_top_right_legend(),
        xaxis=dict(title="Time (UTC)", showgrid=True),
        yaxis=dict(title="Temp (F)", zeroline=False, showgrid=True),
        yaxis2=dict(title="Wind (mph)", overlaying="y", side="right", showgrid=False),
    )
    return fig


def make_cloud_precip_fig(df: pd.DataFrame) -> go.Figure:
    """Plot cloud cover as a filled area (left axis) and precip bars (right axis).

    ``cloud_cover_pct`` and ``precip_in`` are each drawn only when present in
    ``df``; ``time_utc`` is required.
    """
    x = _parse_time_index(df)
    fig = go.Figure()

    # Cloud cover as an area on the left axis.
    if "cloud_cover_pct" in df.columns:
        fig.add_trace(
            go.Scatter(
                x=x,
                y=df["cloud_cover_pct"],
                name="Cloud Cover (%)",
                mode="lines",
                line=dict(color="#7f7f7f"),
                fill="tozeroy",
                opacity=0.3,
            )
        )

    # Precipitation bars on the right axis.
    if "precip_in" in df.columns:
        fig.add_trace(
            go.Bar(
                x=x,
                y=df["precip_in"],
                name="Precip (in)",
                marker_color="#17becf",
                opacity=0.7,
                yaxis="y2",
            )
        )

    fig.update_layout(
        barmode="overlay",
        margin=dict(l=40, r=20, t=30, b=40),
        xaxis=dict(title="Time (UTC)"),
        yaxis=dict(title="Cloud (%)", rangemode="tozero"),
        yaxis2=dict(title="Precip (in)", overlaying="y", side="right", rangemode="tozero"),
        legend=_top_right_legend(),
    )
    return fig


def _auto_prob_label(var_name: str) -> str:
    """Derive a legend label from a probability variable name.

    ``asnow<N>`` names become ``">= X in"`` and ``apcp<N>`` names become
    ``"P(precip > Y mm)"``; any other name is returned unchanged.
    """
    # Try to decode the asnow threshold to inches.
    snow = _ASNOW_RE.search(var_name)
    if snow:
        code = int(snow.group(1))
        if code in _ASNOW_SPECIAL_IN:
            inches = _ASNOW_SPECIAL_IN[code]
        else:
            # Fallback: interpret the code as metres with a 1e4 divisor -> inches.
            inches = round((code / 10000.0) / 0.0254, 2)
        return f">= {inches:g} in"

    # apcp threshold label.
    precip = _APCP_RE.search(var_name)
    if precip:
        mm = int(precip.group(1)) / 10.0  # rough fallback
        return f"P(precip > {mm:g} mm)"
    return var_name


def _mean_or_zero(series: pd.Series) -> float:
    """Return the mean of ``series``, or 0.0 when it is empty or all-NaN."""
    if len(series) == 0:
        return 0.0
    mean = series.mean()
    # A NaN sort key would make the ordering ill-defined.
    return 0.0 if pd.isna(mean) else float(mean)


def make_snow_prob_fig(
    time_index: pd.DatetimeIndex,
    probs: Dict[str, pd.Series],
    label_map: Optional[Dict[str, str]] = None,
) -> go.Figure:
    """Multi-series probability of exceeding snowfall thresholds per hour.

    Args:
        time_index: x values shared by every series.
        probs: mapping of variable_name -> probability values in percent
            (0-100) indexed by time.
        label_map: optional mapping var_name -> legend label. Names that are
            not in the map get an automatically derived label.

    Returns:
        A grouped bar chart with one bar series per entry of ``probs``,
        ordered by ascending mean probability.
    """
    fig = go.Figure()
    ordered_keys = sorted(probs, key=lambda name: _mean_or_zero(probs[name]))

    for position, key in enumerate(ordered_keys):
        if label_map and key in label_map:
            label = label_map[key]
        else:
            label = _auto_prob_label(key)
        fig.add_trace(
            go.Bar(
                x=time_index,
                y=probs[key].values,
                name=label,
                marker_color=_PROB_PALETTE[position % len(_PROB_PALETTE)],
                opacity=0.75,
            )
        )

    fig.update_layout(
        barmode="group",
        margin=dict(l=40, r=20, t=30, b=40),
        xaxis=dict(title="Time (UTC)"),
        yaxis=dict(title="Prob. Exceedance (%)", range=[0, 100]),
        legend=_top_right_legend(),
    )
    return fig


def make_snow_6h_accum_fig(
    x: pd.DatetimeIndex,
    snow6h: pd.Series,
    accum_line: pd.Series,
) -> go.Figure:
    """Plot 6 hr snow bars (left axis) with an accumulation line (right axis)."""
    fig = go.Figure()
    fig.add_trace(
        go.Bar(
            x=x,
            y=snow6h.values,
            name="6 hr Snow (in)",
            marker_color="#1f77b4",
            opacity=0.8,
        )
    )
    fig.add_trace(
        go.Scatter(
            x=x,
            y=accum_line.values,
            name="Accumulated (in)",
            mode="lines",
            line=dict(color="#2ca02c"),
            yaxis="y2",
        )
    )
    fig.update_layout(
        barmode="overlay",
        margin=dict(l=40, r=40, t=30, b=40),
        xaxis=dict(title="Time (UTC)"),
        yaxis=dict(title="6 hr Snow (in)", rangemode="tozero"),
        yaxis2=dict(title="Accum (in)", overlaying="y", side="right", rangemode="tozero"),
        legend=_top_right_legend(),
    )
    return fig


def make_window_snow_fig(x: pd.DatetimeIndex, snow_win: pd.Series, window_label: str) -> go.Figure:
    """Plot snowfall per window as bars; ``window_label`` prefixes the titles."""
    series_title = f"{window_label} Snow (in)"
    fig = go.Figure()
    fig.add_trace(
        go.Bar(
            x=x,
            y=snow_win.values,
            name=series_title,
            marker_color="#4e79a7",
            opacity=0.85,
        )
    )
    fig.update_layout(
        margin=dict(l=40, r=20, t=30, b=40),
        xaxis=dict(title="Time (UTC)"),
        yaxis=dict(title=series_title, rangemode="tozero"),
        legend=_top_right_legend(),
    )
    return fig


def make_cloud_layers_fig(
    x: pd.DatetimeIndex,
    layers: Dict[str, pd.Series],
    total: Optional[pd.Series] = None,
) -> go.Figure:
    """Plot each cloud layer as a filled area, plus an optional total line.

    Args:
        x: x values shared by every series.
        layers: mapping of legend name -> cloud cover series; drawn in the
            mapping's iteration order.
        total: optional series drawn as a line on top of the layers.
    """
    fig = go.Figure()
    palette = ["#c7c7c7", "#8c8c8c", "#525252", "#a0a0ff"]

    # Plot each layer as a filled area.
    for position, (name, series) in enumerate(layers.items()):
        fig.add_trace(
            go.Scatter(
                x=x,
                y=series.values,
                name=name,
                mode="lines",
                line=dict(color=palette[position % len(palette)]),
                fill="tozeroy",
                opacity=0.4,
            )
        )

    if total is not None:
        fig.add_trace(
            go.Scatter(
                x=x,
                y=total.values,
                name="Total Cloud (%)",
                mode="lines",
                line=dict(color="#444444", width=2),
            )
        )

    fig.update_layout(
        margin=dict(l=40, r=20, t=30, b=40),
        xaxis=dict(title="Time (UTC)"),
        yaxis=dict(title="Cloud Cover (%)", range=[0, 100]),
        legend=_top_right_legend(),
    )
    return fig


def make_precip_type_fig(x: pd.DatetimeIndex, probs: Dict[str, pd.Series]) -> go.Figure:
    """Plot one probability line per precipitation type.

    The names "Rain", "Freezing Rain", "Snow" and "Sleet" get fixed colours;
    any other name is left to Plotly's default colour.
    """
    type_colors = {
        "Rain": "#2ca02c",
        "Freezing Rain": "#e377c2",
        "Snow": "#1f77b4",
        "Sleet": "#9467bd",
    }
    fig = go.Figure()
    for name, series in probs.items():
        fig.add_trace(
            go.Scatter(
                x=x,
                y=series.values,
                name=name,
                mode="lines",
                line=dict(color=type_colors.get(name), width=2),
            )
        )
    fig.update_layout(
        margin=dict(l=40, r=20, t=30, b=40),
        xaxis=dict(title="Time (UTC)"),
        yaxis=dict(title="Precip Type Prob (%)", range=[0, 100]),
        legend=_top_right_legend(),
    )
    return fig


def make_snow_level_fig(
    x: pd.DatetimeIndex,
    snow_level_kft: pd.Series,
    precip_window_in: Optional[pd.Series] = None,
) -> go.Figure:
    """Plot the snow level line (left axis) with optional precip bars (right axis)."""
    fig = go.Figure()
    fig.add_trace(
        go.Scatter(
            x=x,
            y=snow_level_kft.values,
            name="Snow level (kft)",
            mode="lines",
            line=dict(color="#1f77b4", width=3),
        )
    )
    if precip_window_in is not None:
        fig.add_trace(
            go.Bar(
                x=x,
                y=precip_window_in.values,
                name="Precip (in)",
                marker_color="rgba(0,128,0,0.35)",
                yaxis="y2",
            )
        )
    fig.update_layout(
        margin=dict(l=40, r=40, t=30, b=40),
        xaxis=dict(title="Time (UTC)"),
        yaxis=dict(title="Snow level (kft)", rangemode="tozero"),
        yaxis2=dict(title="Precip (in)", overlaying="y", side="right", rangemode="tozero"),
        legend=_top_right_legend(),
    )
    return fig


def _sector_counts(directions_deg) -> np.ndarray:
    """Count directions (degrees) per wind-rose sector.

    Sector ``k`` is centred on ``k * 22.5`` degrees and covers the half-open
    range ``[centre - 11.25, centre + 11.25)``, wrapping at 360 so that
    348.75..360 is counted with north. Non-finite values are ignored.

    Returns:
        Integer array with one count per entry of ``_ROSE_THETA``.
    """
    values = np.asarray(directions_deg, dtype=float)
    values = values[np.isfinite(values)]
    n_sectors = _ROSE_THETA.size
    if values.size == 0:
        return np.zeros(n_sectors, dtype=int)
    shifted = (values % 360.0) + _ROSE_SECTOR_DEG / 2.0
    # The trailing modulo folds the 348.75..360 range back onto sector 0.
    sector = np.floor(shifted / _ROSE_SECTOR_DEG).astype(int) % n_sectors
    return np.bincount(sector, minlength=n_sectors)


def _speed_class(speed_mph: float) -> Optional[int]:
    """Return the index of the speed class containing ``speed_mph``.

    Returns ``None`` for NaN, negative or infinite speeds.
    """
    bounds = zip(_ROSE_SPEED_EDGES[:-1], _ROSE_SPEED_EDGES[1:])
    for index, (low, high) in enumerate(bounds):
        if low <= speed_mph < high:
            return index
    return None


def make_wind_rose_fig(dir_deg: pd.Series, spd_mph: pd.Series) -> go.Figure:
    """Aggregate a wind rose with 16 direction bins and 4 speed classes.

    Speed bins: 0-5, 5-10, 10-20, >20 mph

    Rows where either the direction or the speed is missing are dropped, and
    negative speeds fall into no class. An empty figure is returned when no
    valid rows remain.
    """
    usable = dir_deg.notna() & spd_mph.notna()
    directions = dir_deg[usable].astype(float)
    speeds = spd_mph[usable].astype(float)
    if directions.empty:
        return go.Figure()

    traces = []
    classes = zip(
        _ROSE_SPEED_EDGES[:-1],
        _ROSE_SPEED_EDGES[1:],
        _ROSE_SPEED_LABELS,
        _ROSE_SPEED_COLORS,
    )
    for low, high, label, color in classes:
        in_class = (speeds >= low) & (speeds < high)
        traces.append(
            go.Barpolar(
                r=_sector_counts(directions[in_class]),
                theta=_ROSE_THETA,
                name=label,
                marker_color=color,
                opacity=0.85,
            )
        )

    fig = go.Figure(traces)
    fig.update_layout(
        margin=dict(l=40, r=40, t=30, b=40),
        legend=_top_right_legend(),
        polar=dict(
            angularaxis=dict(direction="clockwise"),
            radialaxis=dict(visible=True, ticks=""),
        ),
    )
    return fig


def make_wind_rose_grid(
    times: pd.DatetimeIndex,
    dir_deg: pd.Series,
    spd_mph: pd.Series,
    step_hours: float,
    max_panels: int = 24,
) -> go.Figure:
    """Render small-multiples wind roses per native time step.

    Caps at `max_panels` panels for readability.

    Args:
        times: one timestamp per panel; used for the panel titles.
        dir_deg: wind direction in degrees, positionally matching ``times``.
        spd_mph: wind speed in mph, positionally matching ``times``.
        step_hours: currently unused; kept for interface compatibility.
        max_panels: upper bound on the number of panels drawn.

    Returns:
        A grid of up to six polar panels per row. A panel whose direction or
        speed is missing is drawn empty. An empty figure is returned when
        there is nothing to draw.
    """
    # Never index past the shortest input.
    panels = min(len(times), len(dir_deg), len(spd_mph), max_panels)
    if panels <= 0:
        return go.Figure()

    cols = min(6, panels)
    rows = (panels + cols - 1) // cols
    fig = make_subplots(
        rows=rows,
        cols=cols,
        specs=[[{"type": "polar"}] * cols for _ in range(rows)],
        subplot_titles=[t.strftime("%b %d %HZ") for t in times[:panels]],
    )

    # Coerce once so None / non-numeric entries become NaN instead of raising.
    directions = pd.to_numeric(dir_deg.iloc[:panels], errors="coerce").to_numpy(
        dtype=float, na_value=np.nan
    )
    speeds = pd.to_numeric(spd_mph.iloc[:panels], errors="coerce").to_numpy(
        dtype=float, na_value=np.nan
    )
    empty = np.zeros(_ROSE_THETA.size, dtype=int)

    for idx in range(panels):
        # A single observation -> one count in its sector (all zeros if missing).
        counts = _sector_counts([directions[idx]])
        # None when the speed is missing, so no class claims the observation.
        active_class = _speed_class(speeds[idx])
        row = idx // cols + 1
        col = idx % cols + 1
        for class_index, (label, color) in enumerate(
            zip(_ROSE_SPEED_LABELS, _ROSE_SPEED_COLORS)
        ):
            fig.add_trace(
                go.Barpolar(
                    r=counts if class_index == active_class else empty,
                    theta=_ROSE_THETA,
                    name=label,
                    marker_color=color,
                    opacity=0.9,
                    # One legend entry per class, controlling every panel.
                    showlegend=(idx == 0),
                    legendgroup=label,
                ),
                row=row,
                col=col,
            )

    # Clockwise angular axis in every panel, matching make_wind_rose_fig.
    fig.update_polars(angularaxis=dict(direction="clockwise"))
    fig.update_layout(margin=dict(l=30, r=30, t=40, b=30))
    return fig