import gradio as gr
import numpy as np
import plotly.graph_objects as go
from datetime import datetime, timedelta
import warnings
import gc
import sys
import math
from typing import Optional, Dict, Any, List, Tuple
import json
import os
import glob
# Globals used for building overlays without refetching.
# Both start as None; nothing in the visible part of the file assigns them.
# NOTE(review): presumably holds the last processed radar grid dict - confirm against callers.
LAST_RADAR_GRID: Optional[Dict[str, Any]] = None
# NOTE(review): presumably the path of the last generated animation - confirm against callers.
LAST_ANIMATION_PATH: Optional[str] = None
# Suppress every warning category for the whole process.
warnings.filterwarnings('ignore')
# Import weather libraries for REAL data.
# HERBIE_AVAILABLE records whether the optional imports succeeded; the fetch and
# export helpers below check it before touching Herbie.
try:
    from herbie import Herbie
    import xarray as xr
    HERBIE_AVAILABLE = True
    print("HERBIE AVAILABLE - Will use real HRRR data including radar")
except ImportError as e:
    HERBIE_AVAILABLE = False
    print(f"HERBIE NOT AVAILABLE: {e}")
def fetch_real_hrrr_data(param='TMP:2 m', fxx=6, return_src: bool = False):
    """Fetch one HRRR field via Herbie, stepping back through recent model runs.

    Runs initialised 2, 3, 6, 12 and 18 hours before the current UTC hour are
    tried in that order; the first attempt whose ``H.xarray(param)`` call
    returns a dataset wins.

    Args:
        param: Herbie search string selecting the GRIB message.
        fxx: Forecast lead time in hours.
        return_src: When True, return ``(ds, info)`` where ``info`` carries
            ``'date_str'``, ``'param'``, ``'fxx'`` and ``'file'`` (a
            best-effort source path that may be None).

    Returns:
        The dataset, or ``(ds, info)`` when ``return_src`` is set. On failure
        (Herbie missing, or every attempt failed) ``None`` or ``(None, None)``.
    """
    nothing = (None, None) if return_src else None
    if not HERBIE_AVAILABLE:
        return nothing

    def _guess_source(dataset, herbie_obj):
        """Best-effort lookup of the GRIB file backing ``dataset``."""
        path = None
        # 1) dataset-level encoding
        try:
            path = dataset.encoding.get('source', None)
        except Exception:
            pass
        # 2) per-variable encodings
        if not path:
            try:
                for vname in dataset.data_vars:
                    enc = getattr(dataset[vname], 'encoding', {})
                    path = enc.get('source', None)
                    if path:
                        break
            except Exception:
                pass
        # 3) whatever path-like attribute the Herbie object exposes
        if not path:
            for attr in ('get_localFilePath', 'get_local_file_path', 'local_file', 'fpath', 'filepath'):
                if not hasattr(herbie_obj, attr):
                    continue
                try:
                    candidate = getattr(herbie_obj, attr)
                    path = candidate() if callable(candidate) else candidate
                    if path:
                        break
                except Exception:
                    continue
        return path

    try:
        this_hour = datetime.utcnow().replace(minute=0, second=0, microsecond=0)
        for lag in (2, 3, 6, 12, 18):
            try:
                run_time = this_hour - timedelta(hours=lag)
                date_str = run_time.strftime('%Y-%m-%d %H:00')
                print(f"Trying HRRR data for: {date_str}, parameter: {param}")
                H = Herbie(date_str, model='hrrr', product='sfc', fxx=fxx)
                ds = H.xarray(param)
                if ds is None:
                    continue
                print(f"SUCCESS: Got real HRRR data for {date_str}")
                if not return_src:
                    return ds
                details = {
                    'date_str': date_str,
                    'param': param,
                    'fxx': fxx,
                    'file': _guess_source(ds, H),
                }
                return ds, details
            except Exception as e:
                print(f"Failed for {date_str}: {e}")
                continue
        print("All HRRR attempts failed")
        return nothing
    except Exception as e:
        print(f"HRRR fetch error: {e}")
        return nothing
def process_hrrr_data(ds, max_points=400, param_type='temperature'):
    """Flatten an HRRR dataset into point arrays ready for a scatter plot.

    The first data variable is used. Coordinates are read from
    ``latitude``/``longitude`` or ``lat``/``lon``. Singleton axes on the data
    are squeezed away and 1-D coordinate axes are expanded with
    ``np.meshgrid`` so coordinates and values always share one shape before
    subsampling.

    Args:
        ds: Dataset-like object exposing ``data_vars``, ``coords`` and item
            access; ``None`` is accepted.
        max_points: Approximate cap on plotted points. Ignored for radar,
            which always uses 10000.
        param_type: Label copied into the result; ``'radar'`` additionally
            drops values not greater than 0.1.

    Returns:
        Dict with ``lats``, ``lons``, ``values`` (1-D, invalid entries
        removed), ``units``, ``long_name`` and ``param_type``; ``None`` when
        there is nothing usable or processing fails.
    """
    if ds is None:
        return None
    try:
        var_names = list(ds.data_vars)
        if not var_names:
            return None
        var_name = var_names[0]
        data_var = ds[var_name]

        if 'latitude' in ds.coords and 'longitude' in ds.coords:
            lats = np.asarray(ds.latitude.values)
            lons = np.asarray(ds.longitude.values)
        elif 'lat' in ds.coords and 'lon' in ds.coords:
            lats = np.asarray(ds.lat.values)
            lons = np.asarray(ds.lon.values)
        else:
            return None

        values = np.asarray(data_var.values)
        # Drop singleton axes (e.g. a length-1 time/level axis) so the data
        # lines up with the coordinate arrays.
        if values.shape != lats.shape:
            values = np.squeeze(values)
        # 1-D axes describe a rectilinear grid: expand them to full 2-D fields.
        if lats.ndim == 1 and lons.ndim == 1 and values.ndim == 2:
            lons, lats = np.meshgrid(lons, lats)
        if not (lats.shape == lons.shape == values.shape):
            print("Data processing error: coordinate and value shapes differ")
            return None

        # Radar keeps far more points and hides near-zero returns.
        if param_type == 'radar':
            max_points = 10000
            min_threshold = 0.1
        else:
            min_threshold = None

        if lats.size > max_points:
            ratio = np.sqrt(lats.size / max_points)
            if param_type == 'radar':
                # Smaller stride for radar to retain more detail.
                ratio = ratio * 0.7
            step = max(1, int(ratio))
            if lats.ndim == 2:
                lats = lats[::step, ::step]
                lons = lons[::step, ::step]
                values = values[::step, ::step]
            else:
                lats = lats[::step]
                lons = lons[::step]
                values = values[::step]

        lats_flat = lats.flatten()
        lons_flat = lons.flatten()
        values_flat = values.flatten()

        valid = ~(np.isnan(values_flat) | np.isnan(lats_flat) | np.isnan(lons_flat))
        if param_type == 'radar' and min_threshold is not None:
            valid = valid & (values_flat > min_threshold)
        if not np.any(valid):
            return None

        return {
            'lats': lats_flat[valid],
            'lons': lons_flat[valid],
            'values': values_flat[valid],
            'units': data_var.attrs.get('units', ''),
            'long_name': data_var.attrs.get('long_name', var_name),
            'param_type': param_type,
        }
    except Exception as e:
        print(f"Data processing error: {e}")
        return None
def get_radar_colorscale():
    """Return the Plotly colorscale used for radar reflectivity.

    The result is a list of ``[position, color]`` stops with positions from
    0.0 to 1.0 in steps of 0.1. Position 0.0 is fully transparent so "no
    echo" lets the basemap show through. The dBZ bands in the comments are
    nominal labels; the actual value range is set by the caller.
    """
    stops = (
        (0.0, 'rgba(0,0,0,0)'),  # transparent: no echo
        (0.1, '#00ECEC'),        # light blue, 5-10 dBZ
        (0.2, '#01A0F6'),        # blue, 10-15 dBZ
        (0.3, '#0000F6'),        # dark blue, 15-20 dBZ
        (0.4, '#00FF00'),        # green, 20-25 dBZ
        (0.5, '#00C800'),        # dark green, 25-30 dBZ
        (0.6, '#FFFF00'),        # yellow, 30-35 dBZ
        (0.7, '#E7C000'),        # orange-yellow, 35-40 dBZ
        (0.8, '#FF9000'),        # orange, 40-45 dBZ
        (0.9, '#FF0000'),        # red, 45-50 dBZ
        (1.0, '#D60000'),        # dark red, 50+ dBZ
    )
    return [[position, color] for position, color in stops]
def process_hrrr_grid(ds, target_cells=50000, param_type='radar', min_threshold=0.1):
    """Return HRRR data as 2-D grids suitable for raster or contour drawing.

    The first data variable is used and squeezed to 2-D. Coordinates come from
    ``latitude``/``longitude`` or ``lat``/``lon``; if both axes are 1-D
    (under either naming) they are expanded with ``np.meshgrid``.

    Args:
        ds: Dataset-like object exposing ``data_vars``, ``coords`` and item
            access; ``None`` is accepted.
        target_cells: Approximate maximum number of grid cells kept; larger
            grids are subsampled with a uniform row/column stride.
        param_type: Label copied into the result; ``'radar'`` enables
            threshold masking.
        min_threshold: For radar, values below this become NaN (drawn
            transparent). ``None`` disables masking.

    Returns:
        Dict with ``lat2d``, ``lon2d``, ``z2d``, ``units``, ``long_name`` and
        ``param_type``; ``None`` if the data is missing, not 2-D, does not
        match its coordinates, or processing fails.
    """
    if ds is None:
        return None
    try:
        var_names = list(ds.data_vars)
        if not var_names:
            return None
        var_name = var_names[0]
        data_var = ds[var_name]

        if 'latitude' in ds.coords and 'longitude' in ds.coords:
            lat = np.asarray(ds.latitude.values)
            lon = np.asarray(ds.longitude.values)
        elif 'lat' in ds.coords and 'lon' in ds.coords:
            lat = np.asarray(ds.lat.values)
            lon = np.asarray(ds.lon.values)
        else:
            return None

        # Rectilinear grids ship 1-D axes; build the full mesh for them.
        if lat.ndim == 1 and lon.ndim == 1:
            lon2d, lat2d = np.meshgrid(lon, lat)
        else:
            lat2d, lon2d = lat, lon

        # Squeeze away singleton time/level axes; anything else cannot be drawn.
        z = np.squeeze(data_var.values)
        if z.ndim != 2:
            return None
        if lat2d.shape != z.shape or lon2d.shape != z.shape:
            print("Grid processing error: coordinate grid does not match data shape")
            return None

        # Subsample to keep rendering cost predictable.
        ny, nx = z.shape
        total = nx * ny
        if total > target_cells:
            step = max(1, int(np.ceil(np.sqrt(total / target_cells))))
            z = z[::step, ::step]
            lat2d = lat2d[::step, ::step]
            lon2d = lon2d[::step, ::step]

        if param_type == 'radar' and min_threshold is not None:
            z = np.where(z >= min_threshold, z, np.nan)

        return {
            'lat2d': lat2d,
            'lon2d': lon2d,
            'z2d': z,
            'units': data_var.attrs.get('units', ''),
            'long_name': data_var.attrs.get('long_name', var_name),
            'param_type': param_type,
        }
    except Exception as e:
        print(f"Grid processing error: {e}")
        return None
def _clamp(val, vmin, vmax):
return max(vmin, min(val, vmax))
def grid_to_geojson(lat2d: np.ndarray, lon2d: np.ndarray, z2d: np.ndarray,
                    max_polygons: Optional[int] = None,
                    nan_as_transparent: bool = True) -> Optional[Dict[str, Any]]:
    """Convert a curvilinear lat/lon grid into a GeoJSON FeatureCollection.

    Every retained grid point becomes a closed quadrilateral. Each corner is
    the mean of the four neighbouring cell centres, with indices clamped at
    the grid edges. When ``max_polygons`` forces a stride greater than one,
    the neighbours are taken one stride away so the enlarged polygons still
    tile the grid instead of leaving gaps between single-cell quads.

    Args:
        lat2d: 2-D latitudes of the cell centres.
        lon2d: 2-D longitudes of the cell centres.
        z2d: 2-D values, same shape as the coordinates.
        max_polygons: Optional cap on the polygon count, applied as a uniform
            row/column stride.
        nan_as_transparent: Skip NaN cells when True; otherwise emit them
            with a ``None`` value.

    Returns:
        A FeatureCollection dict whose features have id ``"<row>-<col>"`` and
        properties ``id`` and ``value``; ``None`` for grids smaller than 2x2
        or when building fails.
    """
    try:
        ny, nx = z2d.shape
        if ny < 2 or nx < 2:
            return None

        istep = jstep = 1
        total_cells = ny * nx
        if max_polygons and total_cells > max_polygons:
            istep = jstep = max(1, int(round(math.sqrt(total_cells / max_polygons))))

        def corner(i, j, di, dj):
            """Return ``[lon, lat]`` of the corner between (i, j) and (i+di, j+dj)."""
            r0 = max(0, min(i, ny - 1))
            r1 = max(0, min(i + di, ny - 1))
            c0 = max(0, min(j, nx - 1))
            c1 = max(0, min(j + dj, nx - 1))
            lat = (float(lat2d[r0, c0]) + float(lat2d[r1, c0])
                   + float(lat2d[r0, c1]) + float(lat2d[r1, c1])) / 4.0
            lon = (float(lon2d[r0, c0]) + float(lon2d[r1, c0])
                   + float(lon2d[r0, c1]) + float(lon2d[r1, c1])) / 4.0
            return [lon, lat]

        features = []
        for i in range(0, ny, istep):
            for j in range(0, nx, jstep):
                val = z2d[i, j]
                is_missing = val is None or np.isnan(val)
                if nan_as_transparent and is_missing:
                    continue
                top_left = corner(i, j, -istep, -jstep)
                ring = [
                    top_left,
                    corner(i, j, -istep, jstep),
                    corner(i, j, istep, jstep),
                    corner(i, j, istep, -jstep),
                    list(top_left),  # repeat the first vertex to close the ring
                ]
                fid = f"{i}-{j}"
                features.append({
                    "type": "Feature",
                    "id": fid,
                    "properties": {"id": fid, "value": None if is_missing else float(val)},
                    "geometry": {"type": "Polygon", "coordinates": [ring]},
                })
        return {"type": "FeatureCollection", "features": features}
    except Exception as e:
        print(f"GeoJSON build error: {e}")
        return None
def _parse_plotly_color(color_str: str) -> Tuple[float, float, float, float]:
"""Convert '#RRGGBB' or 'rgba(r,g,b,a)' to normalized RGBA tuple."""
color_str = color_str.strip()
if color_str.startswith('#'):
r = int(color_str[1:3], 16) / 255.0
g = int(color_str[3:5], 16) / 255.0
b = int(color_str[5:7], 16) / 255.0
a = 1.0
return (r, g, b, a)
if color_str.startswith('rgba'):
nums = color_str[color_str.find('(')+1:color_str.find(')')].split(',')
r = int(nums[0]) / 255.0
g = int(nums[1]) / 255.0
b = int(nums[2]) / 255.0
a = float(nums[3])
return (r, g, b, a)
raise ValueError(f"Unsupported color: {color_str}")
def build_mpl_colormap(colorscale: List[List[float]], name: str = 'radar'):
    """Translate a Plotly colorscale into a Matplotlib colormap.

    Args:
        colorscale: Sequence of ``[position, color_string]`` stops; each color
            is parsed with ``_parse_plotly_color``.
        name: Name given to the resulting colormap.

    Returns:
        A ``LinearSegmentedColormap`` whose "bad" (masked/NaN) color is fully
        transparent, or ``None`` if anything fails (the error is printed).
    """
    try:
        from matplotlib.colors import LinearSegmentedColormap

        anchors = []
        for position, color in colorscale:
            anchors.append((float(position), _parse_plotly_color(color)))
        # from_list accepts (x, color) pairs.
        colormap = LinearSegmentedColormap.from_list(name, anchors)
        # Masked or NaN cells must not paint over the basemap.
        colormap.set_bad((0, 0, 0, 0))
        return colormap
    except Exception as e:
        print(f"Colormap build error: {e}")
        return None
def add_radar_image_layer(fig: go.Figure, lat2d: np.ndarray, lon2d: np.ndarray, z2d: np.ndarray,
                          detail_level: int, param_type: str) -> bool:
    """Overlay radar on a Plotly mapbox figure as a smooth raster image.

    The PNG comes from ``render_radar_png_data_url`` so sizing, colormap and
    transparency are defined in one place. The image is pinned to the four
    grid corners as a mapbox image layer, and an invisible two-point scatter
    trace is added only to provide a colorbar.

    Args:
        fig: Figure to modify in place.
        lat2d: 2-D latitudes of the grid.
        lon2d: 2-D longitudes of the grid.
        z2d: 2-D reflectivity values; NaNs are rendered transparent.
        detail_level: 1-5 quality setting forwarded to the renderer.
        param_type: Main map parameter; places the colorbar at x=1.02 for
            ``'radar'`` and x=0.02 otherwise.

    Returns:
        True when the image layer was added, False otherwise.
    """
    try:
        data_url = render_radar_png_data_url(z2d, detail_level)
        if not data_url:
            return False

        # Mapbox wants [lon, lat] for the image's top-left, top-right,
        # bottom-right and bottom-left corners, in that order. Image row 0 is
        # grid row 0 because the renderer uses origin='upper'.
        corners = [
            [float(lon2d[r, c]), float(lat2d[r, c])]
            for r, c in ((0, 0), (0, -1), (-1, -1), (-1, 0))
        ]

        existing = fig.layout.mapbox.layers
        layers = list(existing) if existing is not None else []
        layers.append(dict(
            sourcetype='image',
            source=data_url,
            coordinates=corners,
            opacity=1.0,
            below='traces',
            name='Radar Raster'
        ))
        fig.update_layout(mapbox_layers=layers)

        # Image layers have no colorbar: add an invisible marker trace
        # spanning 0-65 purely to show the scale. Failure is non-fatal.
        try:
            c_lat = float(np.nanmean(lat2d))
            c_lon = float(np.nanmean(lon2d))
            fig.add_trace(go.Scattermapbox(
                lat=[c_lat, c_lat],
                lon=[c_lon, c_lon],
                mode='markers',
                marker=dict(
                    size=1,
                    color=[0, 65],
                    colorscale=get_radar_colorscale(),
                    showscale=True,
                    colorbar=dict(
                        title="Radar Reflectivity (dBZ)",
                        x=0.02 if param_type != 'radar' else 1.02,
                        len=0.6
                    ),
                    opacity=0
                ),
                hoverinfo='skip',
                name='Radar Scale'
            ))
        except Exception as e:
            print(f"Colorbar marker add failed: {e}")
        return True
    except Exception as e:
        print(f"Image layer error: {e}")
        return False
def render_radar_png_data_url(z2d: np.ndarray, detail_level: int = 5) -> Optional[str]:
    """Render one radar frame to a transparent PNG and return it as a data URL.

    The output is ``scale`` times the grid size in pixels (scale 1.2-4.0 by
    detail level, 2.0 for unknown levels, level 3 when ``detail_level`` is
    None), shrunk proportionally to stay under about 2.4 megapixels. Values
    are colored over 0-65 with the radar colorscale; NaNs are transparent.

    Args:
        z2d: 2-D reflectivity values.
        detail_level: 1-5 quality setting, or None.

    Returns:
        A ``data:image/png;base64,...`` string, or ``None`` if the colormap
        cannot be built or rendering fails (the error is printed).
    """
    try:
        import base64
        import io
        import matplotlib
        matplotlib.use('Agg', force=True)  # headless backend
        import matplotlib.pyplot as plt

        cmap = build_mpl_colormap(get_radar_colorscale())
        if cmap is None:
            return None
        zmask = np.ma.masked_invalid(z2d)

        ny, nx = z2d.shape
        scale_map = {1: 1.2, 2: 1.6, 3: 2.0, 4: 3.0, 5: 4.0}
        scale = scale_map.get(int(detail_level) if detail_level is not None else 3, 2.0)
        max_pixels = 2_400_000
        width = int(nx * scale)
        height = int(ny * scale)
        if width * height > max_pixels:
            # Shrink both sides by the same ratio to preserve the aspect.
            ratio = math.sqrt(max_pixels / (width * height))
            width = max(64, int(width * ratio))
            height = max(64, int(height * ratio))

        dpi = 100
        fig_img = plt.figure(figsize=(width / dpi, height / dpi), dpi=dpi)
        try:
            fig_img.patch.set_alpha(0.0)
            ax = fig_img.add_axes([0, 0, 1, 1])  # full-bleed, no margins
            ax.patch.set_alpha(0.0)
            ax.imshow(zmask, cmap=cmap, vmin=0, vmax=65, origin='upper', interpolation='bilinear')
            ax.axis('off')
            buf = io.BytesIO()
            fig_img.savefig(buf, format='png', dpi=dpi, transparent=True)
        finally:
            # pyplot keeps figures alive until closed; release it even when
            # drawing or saving fails.
            plt.close(fig_img)

        img_b64 = base64.b64encode(buf.getvalue()).decode('ascii')
        return f"data:image/png;base64,{img_b64}"
    except Exception as e:
        print(f"render_radar_png_data_url error: {e}")
        return None
def build_leaflet_static_overlay_html(grid: Optional[Dict[str, Any]], detail_level: int = 5) -> str:
    """Build Leaflet HTML showing one radar PNG warped onto the HRRR grid.

    The grid is cut into TY x TX tiles (6-18 rows, 8-24 columns). Each tile is
    a fixed-size ``div`` showing its slice of the PNG as a CSS background, and
    a per-tile projective ``matrix3d`` transform pins its four corners to the
    matching grid points. Transforms are recomputed after zoom/resize.

    NOTE: the markup in this function was rebuilt from the surviving variable
    names and docstring; the original template text was not recoverable.

    Args:
        grid: Dict with ``lat2d``, ``lon2d`` and ``z2d`` arrays, or None.
        detail_level: 1-5 quality setting forwarded to
            ``render_radar_png_data_url``.

    Returns:
        An ``<iframe srcdoc=...>`` snippet, or a short ``<div>`` message when
        the grid is missing, rendering fails, or an error occurs.
    """
    def _escape(text: str) -> str:
        # Order matters: '&' first so later entities are not double-escaped.
        return (text.replace("&", "&amp;").replace("<", "&lt;")
                .replace(">", "&gt;").replace("\"", "&quot;"))

    def _notice(text: str) -> str:
        return f"<div style='padding:12px;'>{_escape(text)}</div>"

    template = """<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8"/>
<link rel="stylesheet" href="https://unpkg.com/leaflet@1.9.4/dist/leaflet.css"/>
<script src="https://unpkg.com/leaflet@1.9.4/dist/leaflet.js"></script>
<style>
html, body, #map { height: 100%; margin: 0; }
.radar-tile { position: absolute; left: 0; top: 0; width: 256px; height: 256px;
  transform-origin: 0 0; background-repeat: no-repeat; pointer-events: none;
  background-image: url("__IMAGE__"); }
</style>
</head>
<body>
<div id="map"></div>
<script>
var map = L.map('map').setView([__CLAT__, __CLON__], 4);
L.tileLayer('https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png', {
  maxZoom: 12, attribution: '&copy; OpenStreetMap contributors'
}).addTo(map);
map.fitBounds([[__MINLAT__, __MINLON__], [__MAXLAT__, __MAXLON__]]);
var tiles = __TILES__, TX = __TX__, TY = __TY__, S = 256;
var pane = map.createPane('radar');
pane.style.zIndex = 450;
pane.style.pointerEvents = 'none';
var nodes = tiles.map(function (t) {
  var el = document.createElement('div');
  el.className = 'radar-tile leaflet-zoom-hide';
  el.style.backgroundSize = (TX * 100) + '% ' + (TY * 100) + '%';
  el.style.backgroundPosition =
    (TX > 1 ? 100 * t.tj / (TX - 1) : 0) + '% ' + (TY > 1 ? 100 * t.ti / (TY - 1) : 0) + '%';
  pane.appendChild(el);
  return el;
});
function warp(el, p0, p1, p2, p3) {
  var sx = p0.x - p1.x + p2.x - p3.x, sy = p0.y - p1.y + p2.y - p3.y;
  var dx1 = p1.x - p2.x, dy1 = p1.y - p2.y, dx2 = p3.x - p2.x, dy2 = p3.y - p2.y;
  var det = dx1 * dy2 - dx2 * dy1, g = 0, h = 0;
  if (det !== 0) {
    g = (sx * dy2 - dx2 * sy) / det;
    h = (dx1 * sy - sx * dy1) / det;
  }
  var a = p1.x - p0.x + g * p1.x, b = p3.x - p0.x + h * p3.x;
  var d = p1.y - p0.y + g * p1.y, e = p3.y - p0.y + h * p3.y;
  el.style.transform = 'matrix3d(' + [a / S, d / S, 0, g / S, b / S, e / S, 0, h / S,
    0, 0, 1, 0, p0.x, p0.y, 0, 1].join(',') + ')';
}
function redraw() {
  for (var k = 0; k < tiles.length; k++) {
    var t = tiles[k];
    warp(nodes[k], map.latLngToLayerPoint(t.tl), map.latLngToLayerPoint(t.tr),
         map.latLngToLayerPoint(t.br), map.latLngToLayerPoint(t.bl));
  }
}
map.on('zoomend viewreset resize', redraw);
redraw();
</script>
</body>
</html>"""

    try:
        if not grid:
            return _notice("No radar grid available.")
        lat2d = grid['lat2d']
        lon2d = grid['lon2d']
        z2d = grid['z2d']
        data_url = render_radar_png_data_url(z2d, detail_level)
        if not data_url:
            return _notice("Failed to render radar image.")

        # Tile counts scale with the grid but stay within fixed limits.
        ny, nx = lat2d.shape
        TY = max(6, min(18, ny // 10))
        TX = max(8, min(24, nx // 10))
        yi = np.linspace(0, ny - 1, TY + 1).astype(int)
        xi = np.linspace(0, nx - 1, TX + 1).astype(int)
        tiles = []
        for i in range(TY):
            for j in range(TX):
                i0, i1 = yi[i], yi[i + 1]
                j0, j1 = xi[j], xi[j + 1]
                tiles.append({
                    'tl': [float(lat2d[i0, j0]), float(lon2d[i0, j0])],
                    'tr': [float(lat2d[i0, j1]), float(lon2d[i0, j1])],
                    'br': [float(lat2d[i1, j1]), float(lon2d[i1, j1])],
                    'bl': [float(lat2d[i1, j0]), float(lon2d[i1, j0])],
                    'ti': int(i), 'tj': int(j)
                })

        substitutions = {
            '__TILES__': json.dumps(tiles),
            '__TX__': json.dumps(int(TX)),
            '__TY__': json.dumps(int(TY)),
            '__CLAT__': json.dumps(float(np.nanmean(lat2d))),
            '__CLON__': json.dumps(float(np.nanmean(lon2d))),
            '__MINLAT__': json.dumps(float(np.nanmin(lat2d))),
            '__MAXLAT__': json.dumps(float(np.nanmax(lat2d))),
            '__MINLON__': json.dumps(float(np.nanmin(lon2d))),
            '__MAXLON__': json.dumps(float(np.nanmax(lon2d))),
            '__IMAGE__': data_url,
        }
        doc = template
        for token, value in substitutions.items():
            doc = doc.replace(token, value)

        # srcdoc is an attribute value, so the whole document is escaped.
        doc_escaped = _escape(doc)
        return f'<iframe srcdoc="{doc_escaped}" style="width:100%;height:600px;border:0;"></iframe>'
    except Exception as e:
        return _notice(f"Leaflet static overlay error: {str(e)}")
def _locate_or_download_grib(forecast_hour: int):
    """Return a local GRIB2 path for the HRRR surface file at ``forecast_hour``.

    Model runs 0, 1, 2, 3, 6, 12, 18 and 24 hours before the current UTC hour
    are tried in order. For each run the expected local path is checked; if
    the file is not on disk a download is attempted, then the day directory
    under ``~/data/hrrr`` is searched. As a last resort the whole
    ``~/data/hrrr`` tree is scanned for any file matching the forecast hour.

    Args:
        forecast_hour: Forecast lead time in hours.

    Returns:
        ``(path, None)`` on success, ``(None, message)`` on failure.
    """
    if not HERBIE_AVAILABLE:
        return None, "Herbie is not available"
    try:
        fxx = int(forecast_hour)
        current_time = datetime.utcnow().replace(minute=0, second=0, microsecond=0)
        for hours_back in [0, 1, 2, 3, 6, 12, 18, 24]:
            try:
                target_time = current_time - timedelta(hours=hours_back)
                date_str = target_time.strftime('%Y-%m-%d %H:00')
                H = Herbie(date_str, model='hrrr', product='sfc', fxx=fxx)

                local = None
                try:
                    local = H.get_localFilePath()
                except Exception:
                    local = None

                # The expected path is returned whether or not the file
                # exists, so decide on the file's presence, not the path's
                # truthiness.
                if not local or not os.path.exists(str(local)):
                    downloaded = None
                    try:
                        downloaded = H.download()
                    except Exception:
                        downloaded = None
                    # Accept either a single path or a list/tuple of paths.
                    if isinstance(downloaded, (list, tuple)):
                        downloaded = downloaded[0] if downloaded else None
                    if downloaded:
                        local = downloaded

                if not local and hasattr(H, 'fpath'):
                    local = H.fpath
                if local and os.path.exists(str(local)):
                    return str(local), None

                # Fallback: look for matching files in the run's day folder.
                try:
                    day_dir = os.path.expanduser(
                        os.path.join('~', 'data', 'hrrr', target_time.strftime('%Y%m%d')))
                    if os.path.isdir(day_dir):
                        pattern1 = os.path.join(day_dir, f"*wrfsfcf{fxx:02d}.grib2")
                        pattern2 = os.path.join(day_dir, f"**/*f{fxx:02d}*.grib2")
                        candidates = sorted(glob.glob(pattern1)) + sorted(glob.glob(pattern2, recursive=True))
                        if candidates:
                            return candidates[0], None
                except Exception as se:
                    print(f"subset search failed: {se}")
            except Exception as e:
                print(f"locate/download attempt failed: {e}")
                continue

        # Last resort: scan the whole cache tree (may be slow, and may return
        # a file from a different model run).
        try:
            root = os.path.expanduser(os.path.join('~', 'data', 'hrrr'))
            if os.path.isdir(root):
                pat = os.path.join(root, f"**/*f{fxx:02d}*.grib2")
                cand = glob.glob(pat, recursive=True)
                if cand:
                    return sorted(cand)[0], None
        except Exception as e2:
            print(f"global scan failed: {e2}")
        return None, "Unable to locate/download GRIB file"
    except Exception as e:
        return None, f"Locate/download error: {e}"
def export_radar_grib(forecast_hour: int, min_dbz: float):
    """Export HRRR composite reflectivity to GRIB2 with weak echoes set to missing.

    The field is fetched with ``fetch_real_hrrr_data``, values below
    ``min_dbz`` (1.0 when None) are masked, and the result is written into a
    copy of the reflectivity message from the source GRIB file. Masked cells
    are stored as the message's ``missingValue`` with the bitmap enabled,
    instead of passing raw NaNs to the encoder.

    Args:
        forecast_hour: Forecast lead time in hours.
        min_dbz: Threshold below which values are written as missing.

    Returns:
        ``(path, None)`` on success, ``(None, message)`` on failure.
    """
    try:
        if not HERBIE_AVAILABLE:
            return None, "Herbie is not available to fetch HRRR data."
        fxx = int(forecast_hour)

        ds, info = fetch_real_hrrr_data('REFC:entire atmosphere', fxx, return_src=True)
        if ds is None:
            return None, "Unable to fetch HRRR radar data for export."
        var_names = list(ds.data_vars)
        if not var_names:
            return None, "Dataset missing variables."
        z = np.squeeze(ds[var_names[0]].values)
        if z.ndim != 2:
            return None, "Unexpected radar array shape."

        thr = float(min_dbz) if min_dbz is not None else 1.0
        z = np.where(z >= thr, z.astype(float), np.nan)

        # Prefer the file the dataset was read from; otherwise locate/download.
        src = None
        if isinstance(info, dict) and info.get('file') and os.path.exists(info['file']):
            src = info['file']
        if not src:
            src, err = _locate_or_download_grib(fxx)
            if not src:
                return None, err or "Could not obtain source GRIB file"

        from eccodes import codes_get, codes_set, codes_set_values, codes_write, codes_release

        handle = _find_reflectivity_message(src)
        if handle is None:
            return None, "Composite reflectivity message not found in GRIB file."
        try:
            # Flatten in scan order (assuming row-major).
            vals = z.flatten().astype(float)
            if int(codes_get(handle, 'numberOfPoints')) != vals.size:
                return None, "Radar array does not match the GRIB grid size."

            # Missing cells must equal 'missingValue' with the bitmap on.
            missing = 9999.0
            codes_set(handle, 'missingValue', missing)
            codes_set(handle, 'bitmapPresent', 1)
            vals = np.where(np.isnan(vals), missing, vals)
            codes_set_values(handle, vals)

            os.makedirs('exports', exist_ok=True)
            if isinstance(info, dict):
                date_tag = info.get('date_str', 'unknown').replace(':', '').replace(' ', 'T')
            else:
                date_tag = 'unknown'
            out_path = os.path.join(
                'exports',
                f"hrrr_radar_reflectivity_{date_tag}_f{fxx:02d}_mindbz{thr:.1f}.grib2")
            with open(out_path, 'wb') as fo:
                codes_write(handle, fo)
            return out_path, None
        finally:
            # Always free the native handle, including on early return/error.
            try:
                codes_release(handle)
            except Exception:
                pass
    except Exception as e:
        return None, f"Export error: {e}"


def _find_reflectivity_message(path):
    """Return an ecCodes handle for the first reflectivity message in ``path``.

    A message matches when its ``shortName`` is one of refc/refd/refl/ref
    (case-insensitive) or its ``name`` contains "reflect". Non-matching
    handles are released. Returns ``None`` if nothing matches; the caller owns
    the returned handle and must release it.
    """
    from eccodes import codes_grib_new_from_file, codes_get, codes_release

    def _key(gid, key):
        try:
            return codes_get(gid, key)
        except Exception:
            return None

    with open(path, 'rb') as f:
        while True:
            try:
                gid = codes_grib_new_from_file(f)
            except Exception:
                gid = None
            if gid is None:
                return None
            short_name = _key(gid, 'shortName')
            name = _key(gid, 'name')
            matched = bool(short_name) and str(short_name).lower() in ('refc', 'refd', 'refl', 'ref')
            if not matched and name and 'reflect' in str(name).lower():
                matched = True
            if matched:
                return gid
            try:
                codes_release(gid)
            except Exception:
                pass
def export_kmz_radar(forecast_hour: int, detail_level: int = 5, min_dbz: float = 0.0):
    """Export HRRR radar reflectivity as a KMZ (KML + PNG) GroundOverlay.

    The grid is first oriented north-up/west-left so the rendered PNG and the
    ``gx:LatLonQuad`` agree. KML requires the quad corners counter-clockwise
    starting at the image's lower-left; listing them in another order (or
    drawing the image upside-down relative to them) mirrors the overlay.

    Args:
        forecast_hour: Forecast lead time in hours.
        detail_level: 1-5 quality setting controlling grid density and image
            scale.
        min_dbz: Values below this are rendered transparent.

    Returns:
        ``(kmz_path, None)`` on success, ``(None, message)`` on failure.
    """
    try:
        if not HERBIE_AVAILABLE:
            return None, "Herbie is not available"
        fxx = int(forecast_hour)
        level = int(detail_level)

        ds = fetch_real_hrrr_data('REFC:entire atmosphere', fxx)
        if isinstance(ds, tuple):
            ds = ds[0]
        target_cells = {1: 20000, 2: 40000, 3: 60000, 4: 90000, 5: 120000}.get(level, 120000)
        grid = process_hrrr_grid(ds, target_cells=target_cells, param_type='radar',
                                 min_threshold=float(min_dbz))
        if not grid:
            return None, "Radar grid not available"
        lat2d = np.asarray(grid['lat2d'])
        lon2d = np.asarray(grid['lon2d'])
        z2d = np.asarray(grid['z2d'])

        # Put row 0 at the north edge and column 0 at the west edge so the
        # image (drawn with origin='upper') is geographically upright.
        if lat2d[0, 0] < lat2d[-1, 0]:
            lat2d, lon2d, z2d = lat2d[::-1], lon2d[::-1], z2d[::-1]
        if lon2d[0, 0] > lon2d[0, -1]:
            lat2d, lon2d, z2d = lat2d[:, ::-1], lon2d[:, ::-1], z2d[:, ::-1]

        # Render PNG bytes.
        try:
            import io
            import matplotlib
            matplotlib.use('Agg', force=True)
            import matplotlib.pyplot as plt

            cmap = build_mpl_colormap(get_radar_colorscale())
            if cmap is None:
                return None, "PNG render error: radar colormap unavailable"
            zmask = np.ma.masked_invalid(z2d)
            ny, nx = z2d.shape
            scale = {1: 1.2, 2: 1.6, 3: 2.0, 4: 3.0, 5: 4.0}.get(level, 2.0)
            dpi = 100
            fig_img = plt.figure(figsize=(int(nx * scale) / dpi, int(ny * scale) / dpi), dpi=dpi)
            try:
                fig_img.patch.set_alpha(0.0)
                ax = fig_img.add_axes([0, 0, 1, 1])
                ax.patch.set_alpha(0.0)
                ax.imshow(zmask, cmap=cmap, vmin=0, vmax=65, origin='upper', interpolation='bilinear')
                ax.axis('off')
                buf = io.BytesIO()
                fig_img.savefig(buf, format='png', dpi=dpi, transparent=True)
            finally:
                plt.close(fig_img)
            png_bytes = buf.getvalue()
        except Exception as e:
            return None, f"PNG render error: {e}"

        # gx:LatLonQuad order: lower-left, lower-right, upper-right, upper-left.
        quad = ((-1, 0), (-1, -1), (0, -1), (0, 0))
        coordinates = " ".join(
            f"{float(lon2d[r, c]):.6f},{float(lat2d[r, c]):.6f},0" for r, c in quad
        )
        kml = f"""<?xml version="1.0" encoding="UTF-8"?>
<kml xmlns="http://www.opengis.net/kml/2.2" xmlns:gx="http://www.google.com/kml/ext/2.2">
<GroundOverlay>
<name>HRRR Radar Reflectivity f{fxx:02d}</name>
<color>ffffffff</color>
<Icon><href>overlay.png</href></Icon>
<gx:LatLonQuad>
<coordinates>{coordinates}</coordinates>
</gx:LatLonQuad>
</GroundOverlay>
</kml>
"""
        import zipfile
        os.makedirs('exports', exist_ok=True)
        kmz_path = os.path.join('exports', f"hrrr_radar_f{fxx:02d}.kmz")
        with zipfile.ZipFile(kmz_path, 'w', zipfile.ZIP_DEFLATED) as zf:
            zf.writestr('doc.kml', kml)
            zf.writestr('overlay.png', png_bytes)
        return kmz_path, None
    except Exception as e:
        return None, f"KMZ export error: {e}"
def download_raw_grib(forecast_hour: int):
    """Copy the raw HRRR GRIB2 file for ``forecast_hour`` into ``./exports``.

    The source file is found (or downloaded) by ``_locate_or_download_grib``
    and copied as ``exports/raw_<original name>``.

    Args:
        forecast_hour: Forecast lead time in hours.

    Returns:
        ``(dest_path, None)`` on success, ``(None, message)`` on failure.
    """
    try:
        if not HERBIE_AVAILABLE:
            return None, "Herbie is not available"
        src_file, err = _locate_or_download_grib(int(forecast_hour))
        if not src_file:
            return None, err
        # The former second retry loop after this point could never run
        # (both branches above it returned) and has been removed.
        try:
            import shutil
            os.makedirs('exports', exist_ok=True)
            dest = os.path.join('exports', f"raw_{os.path.basename(str(src_file))}")
            shutil.copy2(src_file, dest)
            return dest, None
        except Exception as e:
            return None, f"Copy error: {e}"
    except Exception as e:
        return None, f"Raw download error: {e}"
def generate_radar_animation_gif(detail_level: int = 5, min_dbz: float = 0.0):
    """Render forecast hours f00-f18 of radar reflectivity into a looping GIF.

    Each hour is fetched and gridded; hours with no grid are skipped. Frames
    are drawn off-screen with Matplotlib and written to
    ``exports/hrrr_radar_animation_f00_f18.gif``.

    Args:
        detail_level: 1-5 quality setting controlling grid density and frame
            size.
        min_dbz: Values below this are masked out of every frame.

    Returns:
        ``(path, None)`` on success, ``(None, message)`` on failure.
    """
    try:
        import imageio
        import matplotlib
        matplotlib.use('Agg', force=True)
        import matplotlib.pyplot as plt

        level = int(detail_level)
        target_cells = {1: 20000, 2: 40000, 3: 60000, 4: 90000, 5: 120000}.get(level, 120000)
        scale = {1: 1.0, 2: 1.2, 3: 1.6, 4: 2.0, 5: 2.5}.get(level, 2.5)
        dpi = 100

        # The colormap does not depend on the frame; build it once. Without
        # it no frame can be drawn, so skip the 19 fetches entirely.
        cmap = build_mpl_colormap(get_radar_colorscale())
        if cmap is None:
            return None, "No frames generated"

        frames = []
        for fxx in range(0, 19):
            ds = fetch_real_hrrr_data('REFC:entire atmosphere', fxx)
            if isinstance(ds, tuple):
                ds = ds[0]
            grid = process_hrrr_grid(ds, target_cells=target_cells, param_type='radar',
                                     min_threshold=float(min_dbz))
            if grid is None:
                continue
            z2d = grid['z2d']
            zmask = np.ma.masked_invalid(z2d)
            ny, nx = z2d.shape
            width = int(nx * scale)
            height = int(ny * scale)
            fig_anim = plt.figure(figsize=(width / dpi, height / dpi), dpi=dpi)
            try:
                ax = fig_anim.add_axes([0, 0, 1, 1])
                ax.imshow(zmask, cmap=cmap, vmin=0, vmax=65, origin='upper', interpolation='bilinear')
                ax.axis('off')
                fig_anim.canvas.draw()
                # The canvas yields ARGB bytes; reshape to (h, w, 4) and
                # reorder the channels to RGBA.
                img = np.frombuffer(fig_anim.canvas.tostring_argb(), dtype=np.uint8)
                img = img.reshape(fig_anim.canvas.get_width_height()[::-1] + (4,))
                frames.append(img[:, :, [1, 2, 3, 0]])
            finally:
                # Release the figure even if drawing fails.
                plt.close(fig_anim)

        if not frames:
            return None, "No frames generated"
        os.makedirs('exports', exist_ok=True)
        out_path = 'exports/hrrr_radar_animation_f00_f18.gif'
        imageio.mimsave(out_path, frames, duration=0.25, loop=0)  # 4 fps, loop forever
        return out_path, None
    except Exception as e:
        return None, f"Animation error: {e}"
def generate_radar_animation_png_frames(detail_level: int = 5, min_dbz: float = 0.0, fps: float = 4.0):
    """Render forecast hours f00-f18 as transparent PNG data URLs.

    Each hour is fetched and gridded; hours with no grid are skipped.

    Args:
        detail_level: 1-5 quality setting controlling grid density and frame
            size.
        min_dbz: Values below this are masked (transparent) in every frame.
        fps: Accepted for interface compatibility; not used when rendering.

    Returns:
        ``(frames, None)`` where ``frames`` is a list of
        ``data:image/png;base64,...`` strings, or ``(None, message)`` on
        failure.
    """
    try:
        import base64
        import io
        import matplotlib
        matplotlib.use('Agg', force=True)
        import matplotlib.pyplot as plt

        level = int(detail_level)
        target_cells = {1: 20000, 2: 40000, 3: 60000, 4: 90000, 5: 120000}.get(level, 120000)
        scale = {1: 1.0, 2: 1.2, 3: 1.6, 4: 2.0, 5: 2.5}.get(level, 2.0)
        dpi = 100

        # The colormap is frame-independent; build it once. Without it no
        # frame can be drawn, so skip the 19 fetches entirely.
        cmap = build_mpl_colormap(get_radar_colorscale())
        if cmap is None:
            return None, "No frames generated"

        frames = []
        for fxx in range(0, 19):
            ds = fetch_real_hrrr_data('REFC:entire atmosphere', fxx)
            if isinstance(ds, tuple):
                ds = ds[0]
            grid = process_hrrr_grid(ds, target_cells=target_cells, param_type='radar',
                                     min_threshold=float(min_dbz))
            if grid is None:
                continue
            z2d = grid['z2d']
            zmask = np.ma.masked_invalid(z2d)
            ny, nx = z2d.shape
            width = int(nx * scale)
            height = int(ny * scale)
            fig_anim = plt.figure(figsize=(width / dpi, height / dpi), dpi=dpi)
            try:
                fig_anim.patch.set_alpha(0.0)
                ax = fig_anim.add_axes([0, 0, 1, 1])
                ax.patch.set_alpha(0.0)
                ax.imshow(zmask, cmap=cmap, vmin=0, vmax=65, origin='upper', interpolation='bilinear')
                ax.axis('off')
                buf = io.BytesIO()
                fig_anim.savefig(buf, format='png', dpi=dpi, transparent=True)
            finally:
                # Release the figure even if drawing or saving fails.
                plt.close(fig_anim)
            img_b64 = base64.b64encode(buf.getvalue()).decode('ascii')
            frames.append(f"data:image/png;base64,{img_b64}")

        if not frames:
            return None, "No frames generated"
        return frames, None
    except Exception as e:
        return None, f"Animation frames error: {e}"
def build_leaflet_overlay_from_frames(frame_data_urls: List[str], grid: Optional[Dict[str, Any]], fps: float = 4.0):
    """Build Leaflet HTML that cycles PNG frames warped onto the HRRR grid.

    The grid is cut into TY x TX tiles (6-18 rows, 8-24 columns). Each tile is
    a fixed-size ``div`` showing its slice of the current frame as a CSS
    background, pinned to the matching grid points with a projective
    ``matrix3d`` transform (no Leaflet plugins). A single stylesheet rule is
    rewritten on a timer to swap the frame for all tiles at once.

    NOTE: the markup in this function was rebuilt from the surviving variable
    names and docstring; the original template text was not recoverable.

    Args:
        frame_data_urls: PNG data URLs, one per animation frame.
        grid: Dict with ``lat2d`` and ``lon2d`` arrays giving the geometry.
        fps: Playback rate; clamped to at least 0.5, and the frame interval
            to at least 50 ms.

    Returns:
        An ``<iframe srcdoc=...>`` snippet, or a short ``<div>`` message when
        inputs are missing or an error occurs.
    """
    def _escape(text: str) -> str:
        # Order matters: '&' first so later entities are not double-escaped.
        return (text.replace("&", "&amp;").replace("<", "&lt;")
                .replace(">", "&gt;").replace("\"", "&quot;"))

    def _notice(text: str) -> str:
        return f"<div style='padding:12px;'>{_escape(text)}</div>"

    template = """<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8"/>
<link rel="stylesheet" href="https://unpkg.com/leaflet@1.9.4/dist/leaflet.css"/>
<script src="https://unpkg.com/leaflet@1.9.4/dist/leaflet.js"></script>
<style>
html, body, #map { height: 100%; margin: 0; }
.radar-tile { position: absolute; left: 0; top: 0; width: 256px; height: 256px;
  transform-origin: 0 0; background-repeat: no-repeat; pointer-events: none; }
</style>
<style id="frame-style"></style>
</head>
<body>
<div id="map"></div>
<script>
var map = L.map('map').setView([__CLAT__, __CLON__], 4);
L.tileLayer('https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png', {
  maxZoom: 12, attribution: '&copy; OpenStreetMap contributors'
}).addTo(map);
map.fitBounds([[__MINLAT__, __MINLON__], [__MAXLAT__, __MAXLON__]]);
var tiles = __TILES__, TX = __TX__, TY = __TY__, S = 256;
var pane = map.createPane('radar');
pane.style.zIndex = 450;
pane.style.pointerEvents = 'none';
var nodes = tiles.map(function (t) {
  var el = document.createElement('div');
  el.className = 'radar-tile leaflet-zoom-hide';
  el.style.backgroundSize = (TX * 100) + '% ' + (TY * 100) + '%';
  el.style.backgroundPosition =
    (TX > 1 ? 100 * t.tj / (TX - 1) : 0) + '% ' + (TY > 1 ? 100 * t.ti / (TY - 1) : 0) + '%';
  pane.appendChild(el);
  return el;
});
function warp(el, p0, p1, p2, p3) {
  var sx = p0.x - p1.x + p2.x - p3.x, sy = p0.y - p1.y + p2.y - p3.y;
  var dx1 = p1.x - p2.x, dy1 = p1.y - p2.y, dx2 = p3.x - p2.x, dy2 = p3.y - p2.y;
  var det = dx1 * dy2 - dx2 * dy1, g = 0, h = 0;
  if (det !== 0) {
    g = (sx * dy2 - dx2 * sy) / det;
    h = (dx1 * sy - sx * dy1) / det;
  }
  var a = p1.x - p0.x + g * p1.x, b = p3.x - p0.x + h * p3.x;
  var d = p1.y - p0.y + g * p1.y, e = p3.y - p0.y + h * p3.y;
  el.style.transform = 'matrix3d(' + [a / S, d / S, 0, g / S, b / S, e / S, 0, h / S,
    0, 0, 1, 0, p0.x, p0.y, 0, 1].join(',') + ')';
}
function redraw() {
  for (var k = 0; k < tiles.length; k++) {
    var t = tiles[k];
    warp(nodes[k], map.latLngToLayerPoint(t.tl), map.latLngToLayerPoint(t.tr),
         map.latLngToLayerPoint(t.br), map.latLngToLayerPoint(t.bl));
  }
}
map.on('zoomend viewreset resize', redraw);
redraw();
var frames = __FRAMES__, frameCss = document.getElementById('frame-style'), current = 0;
frames.forEach(function (u) { var im = new Image(); im.src = u; });
function showFrame() {
  frameCss.textContent = '.radar-tile { background-image: url("' + frames[current] + '"); }';
  current = (current + 1) % frames.length;
}
showFrame();
if (frames.length > 1) { setInterval(showFrame, __INTERVAL__); }
</script>
</body>
</html>"""

    try:
        if not frame_data_urls:
            return _notice("No animation frames.")
        if not grid or 'lat2d' not in grid or 'lon2d' not in grid:
            return _notice("No grid available for overlay bounds.")
        lat2d = grid['lat2d']
        lon2d = grid['lon2d']

        # Subdivide the grid into tiles for piecewise projective warping.
        ny, nx = lat2d.shape
        TY = max(6, min(18, ny // 10))
        TX = max(8, min(24, nx // 10))
        yi = np.linspace(0, ny - 1, TY + 1).astype(int)
        xi = np.linspace(0, nx - 1, TX + 1).astype(int)
        tiles = []
        for i in range(TY):
            for j in range(TX):
                i0, i1 = yi[i], yi[i + 1]
                j0, j1 = xi[j], xi[j + 1]
                tiles.append({
                    'tl': [float(lat2d[i0, j0]), float(lon2d[i0, j0])],
                    'tr': [float(lat2d[i0, j1]), float(lon2d[i0, j1])],
                    'br': [float(lat2d[i1, j1]), float(lon2d[i1, j1])],
                    'bl': [float(lat2d[i1, j0]), float(lon2d[i1, j0])],
                    'ti': int(i), 'tj': int(j)
                })

        interval_ms = max(50, int(1000.0 / max(0.5, float(fps))))
        substitutions = {
            '__TILES__': json.dumps(tiles),
            '__TX__': json.dumps(int(TX)),
            '__TY__': json.dumps(int(TY)),
            '__CLAT__': json.dumps(float(np.nanmean(lat2d))),
            '__CLON__': json.dumps(float(np.nanmean(lon2d))),
            '__MINLAT__': json.dumps(float(np.nanmin(lat2d))),
            '__MAXLAT__': json.dumps(float(np.nanmax(lat2d))),
            '__MINLON__': json.dumps(float(np.nanmin(lon2d))),
            '__MAXLON__': json.dumps(float(np.nanmax(lon2d))),
            '__INTERVAL__': str(interval_ms),
            '__FRAMES__': json.dumps(list(frame_data_urls)),
        }
        doc = template
        for token, value in substitutions.items():
            doc = doc.replace(token, value)

        # srcdoc is an attribute value, so the whole document is escaped.
        doc_escaped = _escape(doc)
        iframe = f'<iframe srcdoc="{doc_escaped}" style="width:100%;height:600px;border:0;"></iframe>'
        return iframe
    except Exception as e:
        return _notice(f"Leaflet frames overlay error: {str(e)}")
def build_leaflet_overlay_html(gif_path: Optional[str], grid: Optional[Dict[str, Any]]):
    """Return HTML for a Leaflet map with the animated GIF overlaid as an image.

    The GIF is embedded as a base64 data URL and stretched over the grid's
    latitude/longitude bounding box with ``L.imageOverlay``. The standalone
    document is wrapped in an ``<iframe srcdoc=...>`` so its scripts run when
    the snippet is inserted into another page.

    NOTE: the markup in this function was rebuilt from the surviving variable
    names and docstring; the original template text was not recoverable.

    Args:
        gif_path: Path to the animation file; None or a missing file yields a
            message instead of a map.
        grid: Dict with ``lat2d`` and ``lon2d`` arrays defining the bounds.

    Returns:
        The iframe snippet, or a short ``<div>`` message when inputs are
        missing or an error occurs.
    """
    def _escape(text: str) -> str:
        # Order matters: '&' first so later entities are not double-escaped.
        return (text.replace("&", "&amp;").replace("<", "&lt;")
                .replace(">", "&gt;").replace("\"", "&quot;"))

    def _notice(text: str) -> str:
        return f"<div style='padding:12px;'>{_escape(text)}</div>"

    try:
        if not gif_path or not os.path.exists(gif_path):
            return _notice("No animation generated.")
        if not grid or 'lat2d' not in grid or 'lon2d' not in grid:
            return _notice("No grid available for overlay bounds.")
        lat2d = grid['lat2d']
        lon2d = grid['lon2d']
        min_lat = float(np.nanmin(lat2d))
        max_lat = float(np.nanmax(lat2d))
        min_lon = float(np.nanmin(lon2d))
        max_lon = float(np.nanmax(lon2d))
        c_lat = float(np.nanmean(lat2d))
        c_lon = float(np.nanmean(lon2d))

        import base64
        with open(gif_path, 'rb') as f:
            gif_b64 = base64.b64encode(f.read()).decode('ascii')
        data_url = f"data:image/gif;base64,{gif_b64}"

        # Standalone document; braces belonging to CSS/JS are doubled because
        # this is an f-string.
        doc = f"""<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8"/>
<link rel="stylesheet" href="https://unpkg.com/leaflet@1.9.4/dist/leaflet.css"/>
<script src="https://unpkg.com/leaflet@1.9.4/dist/leaflet.js"></script>
<style>html, body, #map {{ height: 100%; margin: 0; }}</style>
</head>
<body>
<div id="map"></div>
<script>
var map = L.map('map').setView([{c_lat}, {c_lon}], 4);
L.tileLayer('https://{{s}}.tile.openstreetmap.org/{{z}}/{{x}}/{{y}}.png', {{
  maxZoom: 12, attribution: '&copy; OpenStreetMap contributors'
}}).addTo(map);
var bounds = [[{min_lat}, {min_lon}], [{max_lat}, {max_lon}]];
L.imageOverlay("{data_url}", bounds, {{opacity: 0.8}}).addTo(map);
map.fitBounds(bounds);
</script>
</body>
</html>"""
        # Escape for srcdoc
        doc_escaped = _escape(doc)
        iframe = f'<iframe srcdoc="{doc_escaped}" style="width:100%;height:600px;border:0;"></iframe>'
        return iframe
    except Exception as e:
        return _notice(f"Leaflet overlay error: {str(e)}")
def create_weather_map_with_radar(param_type, forecast_hour, show_radar=False, detail_level=3, min_dbz=1.0):
"""Create weather map with optional radar forecast overlay"""
# Parameters:
#   param_type:    one of the keys of param_map below ('temperature',
#                  'humidity', 'wind_speed', 'pressure', 'radar'). Any other
#                  value is fetched as 'TMP:2 m'.
#   forecast_hour: forecast lead time, passed to fetch_real_hrrr_data and
#                  shown in the title as "+{forecast_hour}h".
#   show_radar:    when true, a radar reflectivity layer is drawn in addition
#                  to the main parameter.
#   detail_level:  1-5 lookup key for the cell / polygon / radius budgets
#                  below; None is treated as 3, unmapped values use the
#                  level-3 figures.
#   min_dbz:       threshold forwarded to process_hrrr_grid as min_threshold;
#                  None is replaced by 0.5.
# Returns:
#   A plotly Figure on every path. Any exception is caught at the bottom and
#   turned into an "Error Loading Data" figure.
# Side effect:
#   Rebinds the module-level LAST_RADAR_GRID whenever the radar branch runs.
try:
# Map parameter names to GRIB codes
param_map = {
'temperature': 'TMP:2 m',
'humidity': 'RH:2 m',
'wind_speed': 'WIND:10 m',
'pressure': 'MSLMA:mean sea level',
'radar': 'REFC:entire atmosphere' # Composite reflectivity
}
fig = go.Figure()
# Always try to get main weather parameter (unless it's radar-only)
if param_type != 'radar':
# Unknown parameter names silently fall back to 2 m temperature.
grib_param = param_map.get(param_type, 'TMP:2 m')
print(f"Fetching {param_type} ({grib_param}) for +{forecast_hour}h")
# Fetch real weather data
ds = fetch_real_hrrr_data(grib_param, forecast_hour)
processed = process_hrrr_data(ds, max_points=400, param_type=param_type)
# processed is used as a dict with 'lats', 'lons', 'values' and optional
# 'long_name' / 'units'; None means no weather layer is drawn.
if processed is not None:
# Real HRRR weather data
print(f"Plotting {len(processed['values'])} weather data points")
# Choose colorscale based on parameter
if param_type == 'temperature':
colorscale = 'RdYlBu_r'
elif param_type == 'humidity':
colorscale = 'Blues'
elif param_type == 'pressure':
colorscale = 'RdBu_r'
else:
colorscale = 'Viridis'
# One coloured marker per sampled point; the colorbar sits to the right
# of the map (x=1.02).
fig.add_trace(go.Scattermapbox(
lat=processed['lats'],
lon=processed['lons'],
mode='markers',
marker=dict(
size=5,
color=processed['values'],
colorscale=colorscale,
showscale=True,
colorbar=dict(
title=f"{processed.get('long_name', param_type)} ({processed.get('units', '')})",
x=1.02,
len=0.8
),
opacity=0.7
),
text=[f"{v:.1f} {processed.get('units', '')}" for v in processed['values']],
hovertemplate='%{text}',
name="Weather Data"
))
# Add radar data if requested OR if radar is the main parameter
if show_radar or param_type == 'radar':
print(f"Fetching radar data (REFC) for +{forecast_hour}h")
# Fetch radar reflectivity forecast
radar_ds = fetch_real_hrrr_data('REFC:entire atmosphere', forecast_hour)
# Map detail_level (1-5) to target cell counts for performance/detail tradeoff
detail_to_cells = {1: 20000, 2: 40000, 3: 60000, 4: 90000, 5: 120000}
target_cells = detail_to_cells.get(int(detail_level) if detail_level is not None else 3, 60000)
# Use user-selected threshold to control speckle
radar_grid = process_hrrr_grid(radar_ds, target_cells=target_cells, param_type='radar', min_threshold=float(min_dbz) if min_dbz is not None else 0.5)
# Store latest grid globally for other components (animation overlays)
# The assignment precedes the None check below, so a failed fetch also
# overwrites the previously stored grid with None.
global LAST_RADAR_GRID
LAST_RADAR_GRID = radar_grid
if radar_grid is not None:
lat2d = radar_grid['lat2d']
lon2d = radar_grid['lon2d']
z2d = radar_grid['z2d']
print(f"Plotting radar grid: {z2d.shape[0]}x{z2d.shape[1]} cells")
# Rendering strategies are tried in order; each later one only runs while
# this flag is still False.
radar_layer_added = False
# First choice: Contourmapbox if available in this Plotly version
if hasattr(go, 'Contourmapbox') and not radar_layer_added:
try:
# The radar colorbar moves to the left edge (x=0.02) when another
# parameter is the main layer, leaving the right edge to that layer.
fig.add_trace(go.Contourmapbox(
lat=lat2d,
lon=lon2d,
z=z2d,
colorscale=get_radar_colorscale(),
contours=dict(coloring='heatmap', showlines=False),
showscale=True,
colorbar=dict(
title="Radar Reflectivity (dBZ)",
x=0.02 if param_type != 'radar' else 1.02,
len=0.6
),
zmin=0,
zmax=65,
hovertemplate='Radar: %{z:.1f} dBZ',
name="Radar Reflectivity"
))
radar_layer_added = True
except Exception as e:
# Failure is only logged; the next strategy gets its turn.
print(f"Contourmapbox failed, trying raster fallback: {e}")
elif not hasattr(go, 'Contourmapbox'):
print("Contourmapbox not available in this Plotly version; trying raster fallback")
# Second choice: smooth raster image layer for fused appearance
if not radar_layer_added:
try:
# add_radar_image_layer's return value is treated as a success flag.
image_added = add_radar_image_layer(fig, lat2d, lon2d, z2d, detail_level, param_type)
radar_layer_added = radar_layer_added or bool(image_added)
except Exception as e:
print(f"Image layer attempt failed: {e}")
# Third choice: Choroplethmapbox (solid polygons per cell)
if not radar_layer_added:
try:
# Limit polygon count based on detail level
detail_to_polys = {1: 4000, 2: 8000, 3: 12000, 4: 20000, 5: 30000}
max_polys = detail_to_polys.get(int(detail_level) if detail_level is not None else 3, 12000)
geojson = grid_to_geojson(lat2d, lon2d, z2d, max_polygons=max_polys)
if geojson and geojson.get('features'):
# Each feature carries its own id and value in "properties"; the ids
# link the z values to polygons through featureidkey.
ids = [f["properties"]["id"] for f in geojson["features"]]
vals = [f["properties"]["value"] for f in geojson["features"]]
fig.add_trace(go.Choroplethmapbox(
geojson=geojson,
locations=ids,
z=vals,
featureidkey="properties.id",
colorscale=get_radar_colorscale(),
zmin=0,
zmax=65,
colorbar=dict(
title="Radar Reflectivity (dBZ)",
x=0.02 if param_type != 'radar' else 1.02,
len=0.6
),
marker_opacity=0.85,
marker_line_width=0,
hovertemplate='Radar: %{z:.1f} dBZ',
name="Radar Reflectivity"
))
radar_layer_added = True
else:
print("Choropleth fallback failed: empty geojson or no features")
except Exception as e:
print(f"Choropleth fallback failed: {e}")
# Final fallback: density layer (zoom-dependent appearance)
if not radar_layer_added:
# Re-process the same dataset as scattered points instead of a grid.
radar_processed = process_hrrr_data(radar_ds, max_points=10000, param_type='radar')
if radar_processed is not None:
# Higher detail levels use a smaller blur radius.
detail_to_radius = {1: 18, 2: 14, 3: 12, 4: 10, 5: 8}
radius = detail_to_radius.get(int(detail_level) if detail_level is not None else 3, 12)
# Unlike the layers above (zmin=0), this one starts its colour range at 0.1.
fig.add_trace(go.Densitymapbox(
lat=radar_processed['lats'],
lon=radar_processed['lons'],
z=radar_processed['values'],
radius=radius,
colorscale=get_radar_colorscale(),
showscale=True,
colorbar=dict(
title="Radar Reflectivity (dBZ)",
x=0.02 if param_type != 'radar' else 1.02,
len=0.6
),
opacity=0.85,
zmin=0.1,
zmax=65,
hovertemplate='Radar: %{z:.1f} dBZ',
name="Radar Forecast"
))
# NOTE(review): presumably the counterpart of `if radar_grid is not None`
# above, i.e. taken when no radar grid could be built — confirm the nesting.
else:
print("No radar data available for this time")
# Add note about radar unavailability
if param_type == 'radar':
fig.add_annotation(
text="HRRR radar forecast temporarily unavailable Try a different forecast hour",
x=0.5, y=0.5,
xref="paper", yref="paper",
showarrow=False,
font=dict(size=14)
)
# Set title based on what's displayed
if param_type == 'radar':
title = f"HRRR Radar Reflectivity Forecast (+{forecast_hour}h)"
elif show_radar:
title = f"HRRR {param_type.title()} + Radar Forecast (+{forecast_hour}h)"
else:
title = f"HRRR {param_type.title()} Forecast (+{forecast_hour}h)"
# If no data at all
# No trace was added by either branch: show a centred placeholder message
# and override the title chosen above.
if len(fig.data) == 0:
fig.add_annotation(
text="HRRR data temporarily unavailable Try different parameters or forecast hours",
x=0.5, y=0.5,
xref="paper", yref="paper",
showarrow=False,
font=dict(size=16)
)
title = "HRRR Data - Loading"
# Fixed initial view (zoom 4, centred on lat 39.5 / lon -98.5); the extra
# right margin leaves room for the colorbar placed at x=1.02.
fig.update_layout(
mapbox=dict(
style="open-street-map",
zoom=4,
center=dict(lat=39.5, lon=-98.5)
),
height=500,
title=title,
margin=dict(l=0, r=80, t=50, b=0)
)
return fig
except Exception as e:
# Top-level boundary: log, then hand back a figure instead of raising.
print(f"Map creation error: {e}")
# Force cleanup
gc.collect()
# Return error figure
fig = go.Figure()
# Only the first 100 characters of the message are shown on the figure.
fig.add_annotation(
text=f"Error: {str(e)[:100]}",
x=0.5, y=0.5,
xref="paper", yref="paper",
showarrow=False
)
fig.update_layout(height=400, title="Error Loading Data")
return fig
def update_display(location, forecast_hour, parameter, show_radar_overlay, detail_level, min_dbz, animate_forecast):
"""Simple stable update function - single map only"""
try:
# Force garbage collection
gc.collect()
print(f"\n=== UPDATE: {location}, +{forecast_hour}h, {parameter}, radar:{show_radar_overlay} ===")
# Create single weather map (no second map to avoid crashes)
weather_map = create_weather_map_with_radar(parameter, forecast_hour, show_radar_overlay, detail_level, min_dbz)
# Simple status
current_time = datetime.utcnow()
forecast_time = current_time + timedelta(hours=forecast_hour)
status = f"""
## HRRR Weather + Radar Forecasts
**Location:** {location}
**Current:** {current_time.strftime('%H:%M UTC')}
**Forecast:** {forecast_time.strftime('%H:%M UTC')} (+{forecast_hour}h)
**Parameter:** {parameter.title()}
**Radar Overlay:** {"Enabled" if show_radar_overlay else "Disabled"}
**Radar Detail:** {detail_level} (1=Fast, 5=Max)
**Min dBZ:** {min_dbz:.1f}
**Data Source:** {"REAL NOAA HRRR with simulated radar" if HERBIE_AVAILABLE else "HRRR Unavailable"}
**Radar Info:** HRRR includes simulated radar reflectivity forecasts (REFC) showing predicted precipitation intensity in dBZ scale (0.5-65+ dBZ range).
**Note:** Radar forecasts show where precipitation is predicted to develop, move, and intensify over the next 18 hours.
"""
# Optional animation and Leaflet overlay
gif_path = None
leaflet_html = ""
leaflet_static = ""
if animate_forecast:
try:
gif_path, _ = generate_radar_animation_gif(detail_level=int(detail_level), min_dbz=float(min_dbz))
global LAST_ANIMATION_PATH
LAST_ANIMATION_PATH = gif_path
except Exception as e:
print(f"Animation generation error (gif): {e}")
gif_path = None
# Build Leaflet overlay from transparent PNG frames for correct alpha blending
try:
frames, msg = generate_radar_animation_png_frames(detail_level=int(detail_level), min_dbz=float(min_dbz), fps=4.0)
if frames:
leaflet_html = build_leaflet_overlay_from_frames(frames, LAST_RADAR_GRID, fps=4.0)
else:
leaflet_html = f"