NBOM_Forcast / plot_utils.py
nakas's picture
feat: heuristic fallbacks for Snow PoE (lognormal from snowfall mean) and Precip Type probs (wet-bulb from T/Td); expand cloud-layer candidates; add per-step wind rose small-multiples grid
2ccb60b
from typing import Dict, List, Optional
import pandas as pd
import plotly.graph_objects as go
def _parse_time_index(df: pd.DataFrame, time_col: str = "time_utc") -> pd.DatetimeIndex:
    """Return the values of ``df[time_col]`` as a ``DatetimeIndex``.

    A column that already has a datetime dtype is wrapped as-is, so its
    timezone (or lack of one) is preserved. Any other column is parsed with
    ``pd.to_datetime(..., utc=True, errors="coerce")``: the result is
    UTC-aware and unparseable entries become ``NaT``.

    Raises:
        KeyError: if ``time_col`` is not a column of ``df``.
    """
    column = df[time_col]
    if pd.api.types.is_datetime64_any_dtype(column):
        return pd.DatetimeIndex(column)
    # ``pd.to_datetime`` returns a Series when given a Series; wrap it so both
    # branches honour the annotated return type.
    return pd.DatetimeIndex(pd.to_datetime(column, utc=True, errors="coerce"))
def make_temp_dew_wind_fig(df: pd.DataFrame) -> go.Figure:
    """Build the temperature/dewpoint and wind/gust line chart.

    Temperature (``temp_F``, required) and dewpoint (``dewpoint_F``) are drawn
    against the left axis; wind (``wind_mph``) and gust (``gust_mph``) against
    a second axis on the right. Every column other than ``temp_F`` is optional
    and is skipped when absent. The x values come from the ``time_utc`` column.
    """
    times = _parse_time_index(df)
    present = df.columns

    # Left axis: temperature is always plotted, dewpoint only when available.
    traces = [
        go.Scatter(
            x=times,
            y=df["temp_F"],
            name="Temp (F)",
            mode="lines+markers",
            line={"color": "#d62728"},
            marker={"size": 4},
        )
    ]
    if "dewpoint_F" in present:
        traces.append(
            go.Scatter(
                x=times,
                y=df["dewpoint_F"],
                name="Dewpoint (F)",
                mode="lines+markers",
                line={"color": "#1f77b4"},
                marker={"size": 3},
            )
        )

    # Right axis ("y2"): sustained wind with markers, gusts as a dashed line.
    if "wind_mph" in present:
        traces.append(
            go.Scatter(
                x=times,
                y=df["wind_mph"],
                name="Wind (mph)",
                yaxis="y2",
                mode="lines+markers",
                line={"color": "#2ca02c"},
                marker={"size": 3},
            )
        )
    if "gust_mph" in present:
        traces.append(
            go.Scatter(
                x=times,
                y=df["gust_mph"],
                name="Gust (mph)",
                yaxis="y2",
                mode="lines",
                line={"color": "#98df8a", "dash": "dash"},
            )
        )

    fig = go.Figure(data=traces)
    fig.update_layout(
        margin={"l": 40, "r": 40, "t": 30, "b": 40},
        legend={"orientation": "h", "yanchor": "bottom", "y": 1.02, "xanchor": "right", "x": 1},
        xaxis={"title": "Time (UTC)", "showgrid": True},
        yaxis={"title": "Temp (F)", "zeroline": False, "showgrid": True},
        yaxis2={"title": "Wind (mph)", "overlaying": "y", "side": "right", "showgrid": False},
    )
    return fig
def make_cloud_precip_fig(df: pd.DataFrame) -> go.Figure:
    """Build the cloud-cover area / precipitation bar chart.

    ``cloud_cover_pct`` is drawn as a filled line on the left axis and
    ``precip_in`` as bars on a second, right-hand axis. Either column may be
    missing, in which case its trace is simply omitted. The x values come from
    the ``time_utc`` column.
    """
    times = _parse_time_index(df)
    traces = []

    # Left axis: cloud cover filled down to zero.
    if "cloud_cover_pct" in df.columns:
        cloud_trace = go.Scatter(
            x=times,
            y=df["cloud_cover_pct"],
            name="Cloud Cover (%)",
            mode="lines",
            line={"color": "#7f7f7f"},
            fill="tozeroy",
            opacity=0.3,
        )
        traces.append(cloud_trace)

    # Right axis ("y2"): precipitation bars.
    if "precip_in" in df.columns:
        precip_trace = go.Bar(
            x=times,
            y=df["precip_in"],
            name="Precip (in)",
            marker_color="#17becf",
            opacity=0.7,
            yaxis="y2",
        )
        traces.append(precip_trace)

    fig = go.Figure(data=traces)
    fig.update_layout(
        barmode="overlay",
        margin={"l": 40, "r": 20, "t": 30, "b": 40},
        xaxis={"title": "Time (UTC)"},
        yaxis={"title": "Cloud (%)", "rangemode": "tozero"},
        yaxis2={"title": "Precip (in)", "overlaying": "y", "side": "right", "rangemode": "tozero"},
        legend={"orientation": "h", "yanchor": "bottom", "y": 1.02, "xanchor": "right", "x": 1},
    )
    return fig
def make_snow_prob_fig(
    time_index: pd.DatetimeIndex,
    probs: Dict[str, pd.Series],
    label_map: Optional[Dict[str, str]] = None,
) -> go.Figure:
    """Grouped bar chart of exceedance probabilities, one bar series per variable.

    probs: mapping of variable_name -> probability values in percent (0-100)
        indexed by time. Series are drawn in ascending order of their mean
        (an empty series counts as 0).
    label_map: optional mapping var_name -> legend label. Variables not in the
        map (or all variables, when the map is empty/None) get a label derived
        from the variable name.
    """
    import re

    # asnow threshold codes whose label in inches is fixed outright.
    known_inches = {127: 0.5, 254: 0.1, 381: 1.5, 508: 2.0, 635: 2.5, 762: 0.3, 1016: 4.0}
    bar_colors = (
        "#1f77b4",
        "#ff7f0e",
        "#2ca02c",
        "#d62728",
        "#9467bd",
        "#8c564b",
        "#e377c2",
        "#7f7f7f",
        "#bcbd22",
        "#17becf",
    )

    def _label_from_name(var_name: str) -> str:
        """Derive a legend label from an ``asnow<N>`` / ``apcp<N>`` variable name."""
        snow = re.search(r"asnow(\d+)", var_name)
        if snow is not None:
            code = int(snow.group(1))
            if code in known_inches:
                inches = known_inches[code]
            else:
                # Fallback: interpret as meters with 1e4 divisor -> inches
                inches = round((code / 10000.0) / 0.0254, 2)
            return f">= {inches:g} in"
        precip = re.search(r"apcp(\d+)", var_name)
        if precip is not None:
            millimetres = int(precip.group(1)) / 10.0  # rough fallback
            return f"P(precip > {millimetres:g} mm)"
        return var_name

    def _mean_or_zero(var_name: str):
        series = probs[var_name]
        return series.mean() if len(series) else 0.0

    overrides = label_map if label_map else {}
    fig = go.Figure()
    for position, var_name in enumerate(sorted(probs, key=_mean_or_zero)):
        if var_name in overrides:
            legend_label = overrides[var_name]
        else:
            legend_label = _label_from_name(var_name)
        fig.add_trace(
            go.Bar(
                x=time_index,
                y=probs[var_name].values,
                name=legend_label,
                marker_color=bar_colors[position % len(bar_colors)],
                opacity=0.75,
            )
        )

    fig.update_layout(
        barmode="group",
        margin={"l": 40, "r": 20, "t": 30, "b": 40},
        xaxis={"title": "Time (UTC)"},
        yaxis={"title": "Prob. Exceedance (%)", "range": [0, 100]},
        legend={"orientation": "h", "yanchor": "bottom", "y": 1.02, "xanchor": "right", "x": 1},
    )
    return fig
def make_snow_6h_accum_fig(
    x: pd.DatetimeIndex,
    snow6h: pd.Series,
    accum_line: pd.Series,
) -> go.Figure:
    """Bar chart of ``snow6h`` with ``accum_line`` overlaid on a right-hand axis.

    Both series are plotted by position against ``x`` (their ``.values`` are
    used, so their own indexes are ignored).
    """
    period_bars = go.Bar(
        x=x,
        y=snow6h.values,
        name="6 hr Snow (in)",
        marker_color="#1f77b4",
        opacity=0.8,
    )
    # The running total lives on the secondary axis ("y2").
    running_total = go.Scatter(
        x=x,
        y=accum_line.values,
        name="Accumulated (in)",
        mode="lines",
        line={"color": "#2ca02c"},
        yaxis="y2",
    )

    fig = go.Figure(data=[period_bars, running_total])
    fig.update_layout(
        barmode="overlay",
        margin={"l": 40, "r": 40, "t": 30, "b": 40},
        xaxis={"title": "Time (UTC)"},
        yaxis={"title": "6 hr Snow (in)", "rangemode": "tozero"},
        yaxis2={"title": "Accum (in)", "overlaying": "y", "side": "right", "rangemode": "tozero"},
        legend={"orientation": "h", "yanchor": "bottom", "y": 1.02, "xanchor": "right", "x": 1},
    )
    return fig
def make_window_snow_fig(x: pd.DatetimeIndex, snow_win: pd.Series, window_label: str) -> go.Figure:
    """Single-series bar chart of ``snow_win`` plotted by position against ``x``.

    ``window_label`` is prefixed to both the legend entry and the y-axis title.
    """
    series_title = f"{window_label} Snow (in)"
    bars = go.Bar(
        x=x,
        y=snow_win.values,
        name=series_title,
        marker_color="#4e79a7",
        opacity=0.85,
    )

    fig = go.Figure(data=[bars])
    fig.update_layout(
        margin={"l": 40, "r": 20, "t": 30, "b": 40},
        xaxis={"title": "Time (UTC)"},
        yaxis={"title": series_title, "rangemode": "tozero"},
        legend={"orientation": "h", "yanchor": "bottom", "y": 1.02, "xanchor": "right", "x": 1},
    )
    return fig
def make_cloud_layers_fig(
    x: pd.DatetimeIndex,
    layers: Dict[str, pd.Series],
    total: pd.Series | None = None,
) -> go.Figure:
    """Plot each cloud layer as a filled area, optionally with a total line.

    ``layers`` maps a legend name to its series; layers are drawn in mapping
    order and colours cycle through a four-entry palette. When ``total`` is
    given it is added last as an unfilled line. All series are plotted by
    position against ``x``. The y-axis is fixed to 0-100.
    """
    layer_colors = ("#c7c7c7", "#8c8c8c", "#525252", "#a0a0ff")

    # One translucent area per layer, each filled down to zero.
    traces = [
        go.Scatter(
            x=x,
            y=layer.values,
            name=layer_name,
            mode="lines",
            line={"color": layer_colors[position % len(layer_colors)]},
            fill="tozeroy",
            opacity=0.4,
        )
        for position, (layer_name, layer) in enumerate(layers.items())
    ]

    if total is not None:
        traces.append(
            go.Scatter(
                x=x,
                y=total.values,
                name="Total Cloud (%)",
                mode="lines",
                line={"color": "#444444", "width": 2},
            )
        )

    fig = go.Figure(data=traces)
    fig.update_layout(
        margin={"l": 40, "r": 20, "t": 30, "b": 40},
        xaxis={"title": "Time (UTC)"},
        yaxis={"title": "Cloud Cover (%)", "range": [0, 100]},
        legend={"orientation": "h", "yanchor": "bottom", "y": 1.02, "xanchor": "right", "x": 1},
    )
    return fig
def make_precip_type_fig(x: pd.DatetimeIndex, probs: Dict[str, pd.Series]) -> go.Figure:
    """Line chart with one line per precipitation type in ``probs``.

    ``probs`` maps a legend name to its series, plotted by position against
    ``x``. The names "Rain", "Freezing Rain", "Snow" and "Sleet" get fixed
    colours; any other name is left to Plotly's default colour. The y-axis is
    fixed to 0-100.
    """
    type_colors = {
        "Rain": "#2ca02c",
        "Freezing Rain": "#e377c2",
        "Snow": "#1f77b4",
        "Sleet": "#9467bd",
    }

    traces = [
        go.Scatter(
            x=x,
            y=series.values,
            name=type_name,
            mode="lines",
            # ``None`` for unknown names lets Plotly pick the colour.
            line={"color": type_colors.get(type_name), "width": 2},
        )
        for type_name, series in probs.items()
    ]

    fig = go.Figure(data=traces)
    fig.update_layout(
        margin={"l": 40, "r": 20, "t": 30, "b": 40},
        xaxis={"title": "Time (UTC)"},
        yaxis={"title": "Precip Type Prob (%)", "range": [0, 100]},
        legend={"orientation": "h", "yanchor": "bottom", "y": 1.02, "xanchor": "right", "x": 1},
    )
    return fig
def make_snow_level_fig(
    x: pd.DatetimeIndex,
    snow_level_kft: pd.Series,
    precip_window_in: pd.Series | None = None,
) -> go.Figure:
    """Snow-level line, optionally with precipitation bars on a right-hand axis.

    ``snow_level_kft`` is always drawn on the left axis. When
    ``precip_window_in`` is supplied it is added as translucent bars on the
    secondary axis. Both series are plotted by position against ``x``.
    """
    traces = [
        go.Scatter(
            x=x,
            y=snow_level_kft.values,
            name="Snow level (kft)",
            mode="lines",
            line={"color": "#1f77b4", "width": 3},
        )
    ]
    if precip_window_in is not None:
        # Precipitation goes on the secondary axis ("y2").
        traces.append(
            go.Bar(
                x=x,
                y=precip_window_in.values,
                name="Precip (in)",
                marker_color="rgba(0,128,0,0.35)",
                yaxis="y2",
            )
        )

    fig = go.Figure(data=traces)
    fig.update_layout(
        margin={"l": 40, "r": 40, "t": 30, "b": 40},
        xaxis={"title": "Time (UTC)"},
        yaxis={"title": "Snow level (kft)", "rangemode": "tozero"},
        yaxis2={"title": "Precip (in)", "overlaying": "y", "side": "right", "rangemode": "tozero"},
        legend={"orientation": "h", "yanchor": "bottom", "y": 1.02, "xanchor": "right", "x": 1},
    )
    return fig
def make_wind_rose_fig(dir_deg: pd.Series, spd_mph: pd.Series) -> go.Figure:
    """Aggregate a wind rose with 16 direction bins and 4 speed classes.

    Speed bins: 0-5, 5-10, 10-20, >20 mph (lower edge inclusive, upper edge
    exclusive). Observations with a missing direction or speed are ignored,
    and speeds that fall in no class (e.g. negative values) are not counted.
    Directions are wrapped into [0, 360) before binning into 22.5-degree
    sectors centred on 0, 22.5, ..., 337.5.

    Returns an empty figure when no observation has both values present.
    """
    import numpy as np

    valid = dir_deg.notna() & spd_mph.notna()
    directions = dir_deg[valid].astype(float)
    speeds = spd_mph[valid].astype(float)
    if len(directions) == 0:
        return go.Figure()

    sector_deg = 22.5
    # Edges run from -11.25 to 371.25, which gives 17 raw bins: after wrapping
    # into [0, 360) the northern sector is split between the first raw bin
    # [0, 11.25) and the last raw bin [348.75, 360).
    dir_bins = np.arange(-sector_deg / 2, 360 + sector_deg, sector_deg)
    dir_labels = np.arange(0.0, 360.0, sector_deg)

    speed_edges = [0, 5, 10, 20, 1e6]
    speed_labels = ["0-5", "5-10", "10-20", ">20"]
    colors = ["#d0f0fd", "#86c5da", "#2ca02c", "#d62728"]

    wrapped = directions % 360.0
    traces = []
    for lower, upper, label, color in zip(speed_edges, speed_edges[1:], speed_labels, colors):
        in_class = (speeds >= lower) & (speeds < upper)
        raw_counts, _ = np.histogram(wrapped[in_class], bins=dir_bins)
        # Fold the wrap-around bin into the north sector so there is a single
        # bar at 0 degrees rather than two bars (0 and 360) at the same angle.
        counts = raw_counts[:-1].copy()
        counts[0] += raw_counts[-1]
        traces.append(
            go.Barpolar(
                r=counts,
                theta=dir_labels,
                name=label,
                marker_color=color,
                opacity=0.85,
            )
        )

    fig = go.Figure(traces)
    fig.update_layout(
        margin=dict(l=40, r=40, t=30, b=40),
        legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
        polar=dict(
            angularaxis=dict(direction="clockwise"),
            radialaxis=dict(visible=True, ticks=""),
        ),
    )
    return fig
def make_wind_rose_grid(
    times: pd.DatetimeIndex,
    dir_deg: pd.Series,
    spd_mph: pd.Series,
    step_hours: float,
    max_panels: int = 24,
) -> go.Figure:
    """Render small-multiples wind roses, one polar panel per time step.

    Each panel shows the single observation at that position: one bar in the
    22.5-degree sector containing the direction, coloured by speed class
    (0-5, 5-10, 10-20, >20 mph). A panel is left empty when its direction or
    speed is missing/non-finite or the speed falls in no class (e.g. negative).

    Args:
        times: timestamps used for the panel titles (``"%b %d %HZ"``); a
            missing timestamp yields a blank title.
        dir_deg: wind direction in degrees, read by position.
        spd_mph: wind speed, read by position.
        step_hours: accepted for interface compatibility; not used here.
        max_panels: cap on the number of panels, for readability.

    Returns:
        A figure with up to 6 panels per row. The panel count is the smallest
        of ``len(times)``, ``len(dir_deg)``, ``len(spd_mph)`` and
        ``max_panels``; an empty figure is returned when that is zero or less.
    """
    import numpy as np
    from plotly.subplots import make_subplots

    panels = min(len(times), len(dir_deg), len(spd_mph), max_panels)
    if panels <= 0:
        return go.Figure()

    cols = min(6, panels)
    rows = int(np.ceil(panels / cols))
    titles = ["" if pd.isna(t) else t.strftime("%b %d %HZ") for t in times[:panels]]
    fig = make_subplots(
        rows=rows,
        cols=cols,
        specs=[[{"type": "polar"}] * cols for _ in range(rows)],
        subplot_titles=titles,
    )

    # 16 sectors centred on 0, 22.5, ..., 337.5 degrees.
    sector_deg = 22.5
    n_sectors = 16
    dir_labels = np.arange(0.0, 360.0, sector_deg)
    speed_edges = [0, 5, 10, 20, 1e6]
    speed_labels = ["0-5", "5-10", "10-20", ">20"]
    colors = ["#d0f0fd", "#86c5da", "#2ca02c", "#d62728"]

    def _finite_or_nan(value) -> float:
        """Convert ``value`` to a finite float, or NaN when that is impossible."""
        try:
            number = float(value)
        except (TypeError, ValueError):
            return float("nan")
        return number if np.isfinite(number) else float("nan")

    for idx in range(panels):
        direction = _finite_or_nan(dir_deg.iloc[idx])
        speed = _finite_or_nan(spd_mph.iloc[idx])

        counts_by_class = [np.zeros(n_sectors, dtype=int) for _ in speed_labels]
        if np.isfinite(direction) and np.isfinite(speed):
            # Only plot when the speed really belongs to a class; a missing or
            # out-of-range speed must not be shown as "0-5".
            class_idx = next(
                (
                    i
                    for i, (lower, upper) in enumerate(zip(speed_edges, speed_edges[1:]))
                    if lower <= speed < upper
                ),
                None,
            )
            if class_idx is not None:
                # Shift by half a sector so sector 0 spans [-11.25, 11.25);
                # the modulo folds [348.75, 360] back onto north.
                sector_idx = int(np.floor((direction % 360.0 + sector_deg / 2) / sector_deg)) % n_sectors
                counts_by_class[class_idx][sector_idx] = 1

        row = idx // cols + 1
        col = idx % cols + 1
        for label, color, counts in zip(speed_labels, colors, counts_by_class):
            fig.add_trace(
                go.Barpolar(
                    r=counts,
                    theta=dir_labels,
                    name=label,
                    marker_color=color,
                    opacity=0.9,
                    # One legend entry per speed class, taken from the first panel.
                    showlegend=(idx == 0),
                ),
                row=row,
                col=col,
            )

    # Compass orientation (north at top, angles increasing clockwise) on every
    # panel, matching the single wind rose.
    fig.update_polars(angularaxis=dict(direction="clockwise", rotation=90))
    fig.update_layout(margin=dict(l=30, r=30, t=40, b=30))
    return fig