import gradio as gr
import numpy as np
import plotly.graph_objects as go
from datetime import datetime, timedelta
import warnings
import gc
import sys
import math
from typing import Optional, Dict, Any, List, Tuple
import os
import glob
# Globals used for building overlays without refetching
LAST_RADAR_GRID: Optional[Dict[str, Any]] = None
LAST_ANIMATION_PATH: Optional[str] = None
warnings.filterwarnings('ignore')
# Import weather libraries for REAL data
try:
from herbie import Herbie
import xarray as xr
HERBIE_AVAILABLE = True
print("HERBIE AVAILABLE - Will use real RAP data including radar")
except ImportError as e:
HERBIE_AVAILABLE = False
print(f"HERBIE NOT AVAILABLE: {e}")
# Try importing projection libraries for coordinate transformation
try:
import cartopy.crs as ccrs
import pyproj
PROJECTION_AVAILABLE = True
except ImportError:
PROJECTION_AVAILABLE = False
print("Projection libraries not available - using raw coordinates")
# Try importing KML/KMZ libraries
try:
import zipfile
import xml.etree.ElementTree as ET
KMZ_AVAILABLE = True
except ImportError:
KMZ_AVAILABLE = False
print("KMZ export not available")
def _try_nam_refc_data(param='REFC:entire atmosphere', fxx=6, return_src: bool = False):
    """Try to fetch REFC data from NAM model for North American coverage.

    Steps back through recent model cycles (2-18 hours old), trying two NAM
    products, then falls back to GFS 0.25-degree output for global coverage.

    Args:
        param: Herbie GRIB search string (default: composite reflectivity).
        fxx: Forecast hour to request.
        return_src: When True, a successful fetch returns (ds, info) where
            info holds 'date_str', 'model', and (for NAM) 'product'.

    Returns:
        An xarray Dataset (or (ds, info) when return_src is True), or None
        when every NAM and GFS attempt fails.
    """
    try:
        # Try recent times for NAM model
        # NOTE(review): utcnow() is naive; assumes model cycles are keyed to UTC.
        current_time = datetime.utcnow().replace(minute=0, second=0, microsecond=0)
        for hours_back in [2, 3, 6, 12, 18]:
            try:
                target_time = current_time - timedelta(hours=hours_back)
                date_str = target_time.strftime('%Y-%m-%d %H:00')
                print(f" Trying NAM data for: {date_str}, parameter: {param}")
                # Try NAM with different products for broader coverage
                for product in ['afwaca', 'conusnest.hiresf']:  # Central America/Caribbean first, then CONUS
                    try:
                        H = Herbie(date_str, model='nam', product=product, fxx=fxx)
                        ds = H.xarray(param)
                        if ds is not None:
                            print(f" SUCCESS: Got NAM {product} data for {date_str}")
                            if return_src:
                                return (ds, {'date_str': date_str, 'model': 'nam', 'product': product})
                            return ds
                    except Exception as e:
                        print(f" NAM {product} failed: {e}")
                        continue
            except Exception as e:
                print(f" NAM attempt failed for {date_str}: {e}")
                continue
        print(" All NAM attempts failed")
        # Try GFS as final fallback for global coverage including North America
        print(" Trying GFS model for global REFC coverage...")
        try:
            for hours_back in [0, 6, 12, 18]:
                target_time = current_time - timedelta(hours=hours_back)
                date_str = target_time.strftime('%Y-%m-%d %H:00')
                print(f" Trying GFS data for: {date_str}, parameter: {param}")
                # Try GFS with 0.25 degree resolution (highest available)
                try:
                    H = Herbie(date_str, model='gfs', product='pgrb2.0p25', fxx=fxx)
                    ds = H.xarray(param)
                    if ds is not None:
                        print(f" SUCCESS: Got GFS data for {date_str}")
                        return (ds, {'date_str': date_str, 'model': 'gfs'}) if return_src else ds
                except Exception as e:
                    print(f" GFS failed: {e}")
                    continue
        except Exception as e:
            print(f" GFS fetch error: {e}")
        return None
    except Exception as e:
        print(f" NAM fetch error: {e}")
        return None
def fetch_real_rap_data(param='TMP:2 m', fxx=6, return_src: bool = False):
    """Fetch actual RAP data from NOAA including forecasts.

    Note: RAP model may not include REFC (composite reflectivity) parameter.
    RAP is primarily focused on temperature, pressure, and wind fields.
    If return_src is True, returns a tuple (ds, info) where info contains
    metadata such as 'date_str' and possible 'file' path.

    For REFC requests, NAM (then GFS) is tried first via _try_nam_refc_data;
    otherwise recent RAP cycles are tried, newest first.

    Args:
        param: Herbie GRIB search string (e.g. 'TMP:2 m', 'REFC:entire atmosphere').
        fxx: Forecast hour to request.
        return_src: Toggle the (ds, info) return form; failures then yield
            (None, None) instead of None.
    """
    if not HERBIE_AVAILABLE:
        return (None, None) if return_src else None
    try:
        # For REFC parameter, try NAM model for North American coverage
        if 'REFC' in param:
            print(f"INFO: Trying NAM model for North American REFC coverage...")
            nam_result = _try_nam_refc_data(param, fxx, return_src)
            if nam_result:
                return nam_result
            print(f"NAM failed, falling back to RAP model...")
        # Try recent times, working backwards
        current_time = datetime.utcnow().replace(minute=0, second=0, microsecond=0)
        for hours_back in [2, 3, 6, 12, 18]:
            try:
                target_time = current_time - timedelta(hours=hours_back)
                date_str = target_time.strftime('%Y-%m-%d %H:00')
                print(f"Trying RAP data for: {date_str}, parameter: {param}")
                # Create Herbie object - RAP uses 'sfc' product like HRRR
                H = Herbie(date_str, model='rap', product='sfc', fxx=fxx)
                # Debug: Check if RAP data is available for this time
                try:
                    # Test basic availability first
                    print(f" Testing RAP availability: {H}")
                    if hasattr(H, 'grib'):
                        print(f" GRIB source: {H.grib}")
                    # Download specific parameter
                    ds = H.xarray(param)
                except Exception as e:
                    print(f" RAP xarray error: {e}")
                    continue
                if ds is not None:
                    print(f"SUCCESS: Got real RAP data for {date_str}")
                    if return_src:
                        # Try to discover the source grib path from encodings or Herbie
                        src_path = None
                        try:
                            src_path = ds.encoding.get('source', None)
                        except Exception:
                            pass
                        if not src_path:
                            try:
                                # Try variable encodings
                                for vname in ds.data_vars:
                                    enc = getattr(ds[vname], 'encoding', {})
                                    src_path = enc.get('source', None)
                                    if src_path:
                                        break
                            except Exception:
                                pass
                        # Fallback: ask Herbie for local file path (best effort)
                        # NOTE(review): attribute names vary across Herbie versions,
                        # hence the probe list.
                        if not src_path:
                            for attr in ('get_localFilePath', 'get_local_file_path', 'local_file', 'fpath', 'filepath'):
                                if hasattr(H, attr):
                                    try:
                                        val = getattr(H, attr)
                                        src_path = val() if callable(val) else val
                                        if src_path:
                                            break
                                    except Exception:
                                        continue
                        info = {
                            'date_str': date_str,
                            'param': param,
                            'fxx': fxx,
                            'file': src_path
                        }
                        return ds, info
                    else:
                        return ds
            except Exception as e:
                print(f"Failed for {date_str}: {e}")
                continue
        print("All RAP attempts failed")
        return (None, None) if return_src else None
    except Exception as e:
        print(f"RAP fetch error: {e}")
        return (None, None) if return_src else None
def get_rap_projection():
    """Return pyproj-style parameters for the RAP Lambert Conformal Conic grid.

    Values follow the official NOAA RAP CONUS GRIB2 metadata: 1799 x 1059 mass
    points at 3 km spacing (DxInMetres/DyInMetres 3000.0, GRIB2 template 30),
    a tangent cone with all standard parallels at 38.5 degrees, orientation
    longitude 262.5 degrees (meridian aligned with the Y-axis), and a
    spherical Earth of radius 6371229 metres.
    """
    earth_radius_m = 6371229  # GRIB2 earth model: sphere, metres
    return dict(
        proj='lcc',
        lat_1=38.5,        # Latin1 - first standard parallel
        lat_2=38.5,        # Latin2 - second standard parallel (tangent cone)
        lat_0=38.5,        # LaD - latitude where grid lengths are specified
        lon_0=262.5,       # LoV - orientation longitude
        x_0=0,             # false easting
        y_0=0,             # false northing
        a=earth_radius_m,
        b=earth_radius_m,  # same as 'a' for a sphere
        units='m',
    )
def validate_rap_coordinates(ds):
    """Validate and potentially correct RAP/NAM coordinate arrays.

    Returns a dict with 'lat2d', 'lon2d' (longitudes normalized to -180..180),
    'valid', and 'model', or None when the dataset lacks usable 2D coordinates.
    """
    if ds is None:
        return None
    try:
        if not ('latitude' in ds.coords and 'longitude' in ds.coords):
            return None
        lat2d = ds.latitude.values
        raw_lons = ds.longitude.values
        # Normalize 0-360 degree longitudes into the -180..180 convention.
        lon2d = np.where(raw_lons > 180, raw_lons - 360, raw_lons)
        if lat2d.ndim != 2 or lon2d.ndim != 2:
            return None
        lat_min, lat_max = np.nanmin(lat2d), np.nanmax(lat2d)
        lon_min, lon_max = np.nanmin(lon2d), np.nanmax(lon2d)
        # Known-domain envelopes for the two models we expect to see.
        in_rap_domain = ((20.8 <= lat_min <= 21.5) and (47.5 <= lat_max <= 48.2) and
                         (-135.0 <= lon_min <= -133.0) and (-61.5 <= lon_max <= -60.0))
        in_nam_domain = ((20.0 <= lat_min <= 22.0) and (52.0 <= lat_max <= 54.0) and
                         (-140.0 <= lon_min <= -130.0) and (-65.0 <= lon_max <= -55.0))
        if in_rap_domain:
            print(f"✓ RAP CONUS coordinates validated: lat [{lat_min:.2f}, {lat_max:.2f}], lon [{lon_min:.2f}, {lon_max:.2f}]")
            return {'lat2d': lat2d, 'lon2d': lon2d, 'valid': True, 'model': 'RAP'}
        if in_nam_domain:
            print(f"✓ NAM North American coordinates validated: lat [{lat_min:.2f}, {lat_max:.2f}], lon [{lon_min:.2f}, {lon_max:.2f}]")
            return {'lat2d': lat2d, 'lon2d': lon2d, 'valid': True, 'model': 'NAM'}
        # Coordinates are outside both envelopes: still usable, but flagged.
        print(f"Warning: Coordinates outside expected domains: lat [{lat_min:.2f}, {lat_max:.2f}], lon [{lon_min:.2f}, {lon_max:.2f}]")
        print(f"RAP expected: lat [20.8-21.5, 47.5-48.2], lon [-135.0 to -133.0, -61.5 to -60.0]")
        print(f"NAM expected: lat [20.0-22.0, 52.0-54.0], lon [-140.0 to -130.0, -65.0 to -55.0]")
        print("Warning: Using potentially non-standard coordinates")
        return {'lat2d': lat2d, 'lon2d': lon2d, 'valid': False, 'model': 'Unknown'}
    except Exception as e:
        print(f"Coordinate validation error: {e}")
        return None
def process_rap_data(ds, max_points=400, param_type='temperature'):
    """Process RAP xarray dataset into plot-ready data.

    Flattens the first data variable with its coordinates into 1D arrays,
    subsampling oversize grids and dropping NaNs (plus sub-threshold echoes
    for radar). Returns a dict of arrays and metadata, or None on failure.
    """
    if ds is None:
        return None
    try:
        variables = list(ds.data_vars)
        if not variables:
            return None
        field = ds[variables[0]]
        # Accept either coordinate naming convention.
        if 'latitude' in ds.coords and 'longitude' in ds.coords:
            lats = ds.latitude.values
            lons = ds.longitude.values
        elif 'lat' in ds.coords and 'lon' in ds.coords:
            lats = ds.lat.values
            lons = ds.lon.values
        else:
            return None
        values = field.values
        # Radar keeps far more points and a very low echo threshold so that
        # light precipitation still shows up.
        if param_type == 'radar':
            max_points = 10000
            min_threshold = 0.1
        else:
            min_threshold = None
        # Subsample oversize grids; radar uses a smaller stride (x0.7) to
        # retain more detail.
        if lats.size > max_points:
            density = np.sqrt(lats.size / max_points)
            stride = max(1, int(density * 0.7)) if param_type == 'radar' else max(1, int(density))
            if len(lats.shape) == 2:
                lats = lats[::stride, ::stride]
                lons = lons[::stride, ::stride]
                values = values[::stride, ::stride]
            else:
                lats = lats[::stride]
                lons = lons[::stride]
                values = values[::stride]
        flat_lats = lats.flatten()
        flat_lons = lons.flatten()
        flat_vals = values.flatten()
        # Keep only finite points; radar also drops echoes at/below threshold.
        keep = ~(np.isnan(flat_vals) | np.isnan(flat_lats) | np.isnan(flat_lons))
        if param_type == 'radar' and min_threshold is not None:
            keep = keep & (flat_vals > min_threshold)
        if not np.any(keep):
            return None
        return {
            'lats': flat_lats[keep],
            'lons': flat_lons[keep],
            'values': flat_vals[keep],
            'units': field.attrs.get('units', ''),
            'long_name': field.attrs.get('long_name', variables[0]),
            'param_type': param_type
        }
    except Exception as e:
        print(f"Data processing error: {e}")
        return None
def get_radar_colorscale():
    """Get proper radar reflectivity colorscale in dBZ.

    Positions span the normalized 0..1 range; with the plotting range pinned
    to 0-65 dBZ each 0.1 step is roughly a 5 dBZ band. The first stop is
    fully transparent so no-echo cells do not paint the map.
    """
    stops = (
        (0.0, 'rgba(0,0,0,0)'),  # transparent for no echo
        (0.1, '#00ECEC'),        # light blue   5-10 dBZ
        (0.2, '#01A0F6'),        # blue        10-15 dBZ
        (0.3, '#0000F6'),        # dark blue   15-20 dBZ
        (0.4, '#00FF00'),        # green       20-25 dBZ
        (0.5, '#00C800'),        # dark green  25-30 dBZ
        (0.6, '#FFFF00'),        # yellow      30-35 dBZ
        (0.7, '#E7C000'),        # orange-yellow 35-40 dBZ
        (0.8, '#FF9000'),        # orange      40-45 dBZ
        (0.9, '#FF0000'),        # red         45-50 dBZ
        (1.0, '#D60000'),        # dark red    50+ dBZ
    )
    return [[position, color] for position, color in stops]
def apply_rap_coordinate_correction(lat2d, lon2d):
    """Apply necessary coordinate corrections for RAP/NAM data alignment.

    Reconciles mismatched 1D/2D coordinate arrays, normalizes longitudes from
    0-360 into -180..180, and builds a validity mask for in-range points.
    Returns (lat2d, corrected_lon2d, valid_mask); on unrecoverable shape
    mismatch or error, the inputs come back with an all-True mask.
    """
    try:
        print(f"Input coordinate shapes: lat {lat2d.shape}, lon {lon2d.shape}")
        if lat2d.shape != lon2d.shape:
            # Mismatched arrays: try to expand 1D axes into a common 2D grid.
            print(f"Warning: Coordinate arrays have different shapes - attempting to fix")
            if lat2d.ndim == 1 and lon2d.ndim == 1:
                lon2d, lat2d = np.meshgrid(lon2d, lat2d)
                print(f"Created meshgrid: lat {lat2d.shape}, lon {lon2d.shape}")
            elif lat2d.ndim == 1:
                lat2d = np.broadcast_to(lat2d[:, np.newaxis], lon2d.shape)
                print(f"Broadcasted lat to match lon: {lat2d.shape}")
            elif lon2d.ndim == 1:
                lon2d = np.broadcast_to(lon2d[np.newaxis, :], lat2d.shape)
                print(f"Broadcasted lon to match lat: {lon2d.shape}")
            else:
                print(f"Cannot reconcile coordinate shapes: lat {lat2d.shape}, lon {lon2d.shape}")
                return lat2d, lon2d, np.ones_like(lat2d, dtype=bool)
        # Normalize 0-360 degree longitudes into the -180..180 convention.
        fixed_lons = np.where(lon2d > 180, lon2d - 360, lon2d)
        # Mask NaNs and anything outside the physical lat/lon range.
        valid_mask = (~np.isnan(lat2d) & ~np.isnan(fixed_lons) &
                      (lat2d >= -90) & (lat2d <= 90) &
                      (fixed_lons >= -180) & (fixed_lons <= 180))
        lat_min, lat_max = np.nanmin(lat2d), np.nanmax(lat2d)
        lon_min, lon_max = np.nanmin(fixed_lons), np.nanmax(fixed_lons)
        # Compare against the known RAP CONUS and NAM North American envelopes
        # (informational only; no further correction is applied).
        matches_rap = (20.8 <= lat_min <= 21.5 and 47.5 <= lat_max <= 48.2 and
                       -135.0 <= lon_min <= -133.0 and -61.5 <= lon_max <= -60.0)
        matches_nam = (20.0 <= lat_min <= 22.0 and 52.0 <= lat_max <= 54.0 and
                       -140.0 <= lon_min <= -130.0 and -65.0 <= lon_max <= -55.0)
        if matches_rap:
            print(f"✓ Coordinates match RAP CONUS domain: lat [{lat_min:.2f}, {lat_max:.2f}], lon [{lon_min:.2f}, {lon_max:.2f}]")
        elif matches_nam:
            print(f"✓ Coordinates match NAM North American domain: lat [{lat_min:.2f}, {lat_max:.2f}], lon [{lon_min:.2f}, {lon_max:.2f}]")
        else:
            print(f"Info: Using non-standard coordinate domain: lat [{lat_min:.2f}, {lat_max:.2f}], lon [{lon_min:.2f}, {lon_max:.2f}]")
        return lat2d, fixed_lons, valid_mask
    except Exception as e:
        print(f"Coordinate correction error: {e}")
        return lat2d, lon2d, np.ones_like(lat2d, dtype=bool)
def process_rap_grid(ds, target_cells=50000, param_type='radar', min_threshold=0.1):
    """Return RAP data as 2D grids (lat2d, lon2d, z2d) suitable for filled contours.

    - target_cells: approximate max number of grid cells to draw for performance
    - min_threshold: values below are masked as NaN (for radar transparency)

    Returns a dict with 'lat2d', 'lon2d', 'z2d', 'units', 'long_name', and
    'param_type', or None when the dataset has no usable 2D field.
    """
    if ds is None:
        return None
    try:
        variables = list(ds.data_vars)
        if not variables:
            return None
        field = ds[variables[0]]
        # Resolve 2D coordinates from whichever convention the dataset uses.
        if 'latitude' in ds.coords and 'longitude' in ds.coords:
            lat2d, lon2d, _ = apply_rap_coordinate_correction(
                ds.latitude.values, ds.longitude.values)
            # Informational validation only; non-standard domains are still used.
            checked = validate_rap_coordinates(ds)
            if checked and not checked['valid']:
                model_type = checked.get('model', 'Unknown')
                print(f"Warning: Using potentially non-standard {model_type} coordinates")
        elif 'lat' in ds.coords and 'lon' in ds.coords:
            lat_axis = ds.lat.values
            lon_axis = ds.lon.values
            if lat_axis.ndim == 1 and lon_axis.ndim == 1:
                lon2d, lat2d = np.meshgrid(lon_axis, lat_axis)
            else:
                lat2d = lat_axis
                lon2d = lon_axis
        else:
            return None
        # Collapse any singleton time/level axes; contours need exactly 2D.
        z = np.squeeze(field.values)
        if z.ndim != 2:
            return None
        # Subsample to keep performance predictable.
        ny, nx = z.shape
        if ny * nx > target_cells:
            stride = max(1, int(np.ceil(np.sqrt(ny * nx / target_cells))))
            z = z[::stride, ::stride]
            # Only subsample coordinates whose shape matches the original field.
            if lat2d.ndim == 2 and lat2d.shape == (ny, nx):
                lat2d = lat2d[::stride, ::stride]
            elif lat2d.ndim == 1 and len(lat2d) == ny:
                lat2d = lat2d[::stride]
            if lon2d.ndim == 2 and lon2d.shape == (ny, nx):
                lon2d = lon2d[::stride, ::stride]
            elif lon2d.ndim == 1 and len(lon2d) == nx:
                lon2d = lon2d[::stride]
        # Below-threshold radar echoes become NaN so they render transparent.
        if param_type == 'radar' and min_threshold is not None:
            z = np.where(z >= min_threshold, z, np.nan)
        return {
            'lat2d': lat2d,
            'lon2d': lon2d,
            'z2d': z,
            'units': field.attrs.get('units', ''),
            'long_name': field.attrs.get('long_name', variables[0]),
            'param_type': param_type
        }
    except Exception as e:
        print(f"Grid processing error: {e}")
        return None
def _clamp(val, vmin, vmax):
return max(vmin, min(val, vmax))
def grid_to_geojson(lat2d: np.ndarray, lon2d: np.ndarray, z2d: np.ndarray,
                    max_polygons: Optional[int] = None,
                    nan_as_transparent: bool = True) -> Optional[Dict[str, Any]]:
    """Convert a lat/lon curvilinear grid into a GeoJSON FeatureCollection of cell polygons.

    - Each cell is a quadrilateral around the center (i,j) using neighboring points.
    - Values that are NaN are skipped when nan_as_transparent is True.
    - max_polygons optionally caps the number of cells included (row/col stride).

    Returns None when the grid is smaller than 2x2 or on any error.
    """
    try:
        ny, nx = z2d.shape
        if ny < 2 or nx < 2:
            # Need at least one neighbor in each direction to form cell corners.
            return None
        # Determine stride to cap polygons if needed
        istep = jstep = 1
        total_cells = ny * nx
        if max_polygons and total_cells > max_polygons:
            factor = math.sqrt(total_cells / max_polygons)
            istep = max(1, int(round(factor)))
            jstep = istep
        features = []
        # Helper for safe index: edge cells clamp out-of-range neighbors back
        # onto the boundary, so corner averages degrade gracefully at edges.
        def lat_(i, j):
            ii = _clamp(i, 0, ny - 1)
            jj = _clamp(j, 0, nx - 1)
            return float(lat2d[ii, jj])
        def lon_(i, j):
            ii = _clamp(i, 0, ny - 1)
            jj = _clamp(j, 0, nx - 1)
            return float(lon2d[ii, jj])
        # Build polygons: each corner is the mean of the 4 surrounding cell
        # centers, approximating the dual mesh of the curvilinear grid.
        for i in range(0, ny, istep):
            for j in range(0, nx, jstep):
                val = z2d[i, j]
                if nan_as_transparent and (val is None or np.isnan(val)):
                    continue
                # Corners as average of 4 surrounding centers (clamped at edges)
                # Top-left around (i-0.5, j-0.5)
                lat_tl = (lat_(i, j) + lat_(i-1, j) + lat_(i, j-1) + lat_(i-1, j-1)) / 4.0
                lon_tl = (lon_(i, j) + lon_(i-1, j) + lon_(i, j-1) + lon_(i-1, j-1)) / 4.0
                # Top-right around (i-0.5, j+0.5)
                lat_tr = (lat_(i, j) + lat_(i-1, j) + lat_(i, j+1) + lat_(i-1, j+1)) / 4.0
                lon_tr = (lon_(i, j) + lon_(i-1, j) + lon_(i, j+1) + lon_(i-1, j+1)) / 4.0
                # Bottom-right around (i+0.5, j+0.5)
                lat_br = (lat_(i, j) + lat_(i+1, j) + lat_(i, j+1) + lat_(i+1, j+1)) / 4.0
                lon_br = (lon_(i, j) + lon_(i+1, j) + lon_(i, j+1) + lon_(i+1, j+1)) / 4.0
                # Bottom-left around (i+0.5, j-0.5)
                lat_bl = (lat_(i, j) + lat_(i+1, j) + lat_(i, j-1) + lat_(i+1, j-1)) / 4.0
                lon_bl = (lon_(i, j) + lon_(i+1, j) + lon_(i, j-1) + lon_(i+1, j-1)) / 4.0
                # GeoJSON linear rings must repeat the first vertex to close.
                poly = [
                    [lon_tl, lat_tl],
                    [lon_tr, lat_tr],
                    [lon_br, lat_br],
                    [lon_bl, lat_bl],
                    [lon_tl, lat_tl]
                ]
                fid = f"{i}-{j}"
                feat = {
                    "type": "Feature",
                    "id": fid,
                    "properties": {"id": fid, "value": None if np.isnan(val) else float(val)},
                    "geometry": {"type": "Polygon", "coordinates": [poly]}
                }
                features.append(feat)
        return {"type": "FeatureCollection", "features": features}
    except Exception as e:
        print(f"GeoJSON build error: {e}")
        return None
def _parse_plotly_color(color_str: str) -> Tuple[float, float, float, float]:
"""Convert '#RRGGBB' or 'rgba(r,g,b,a)' to normalized RGBA tuple."""
color_str = color_str.strip()
if color_str.startswith('#'):
r = int(color_str[1:3], 16) / 255.0
g = int(color_str[3:5], 16) / 255.0
b = int(color_str[5:7], 16) / 255.0
a = 1.0
return (r, g, b, a)
if color_str.startswith('rgba'):
nums = color_str[color_str.find('(')+1:color_str.find(')')].split(',')
r = int(nums[0]) / 255.0
g = int(nums[1]) / 255.0
b = int(nums[2]) / 255.0
a = float(nums[3])
return (r, g, b, a)
raise ValueError(f"Unsupported color: {color_str}")
def build_mpl_colormap(colorscale: List[List[float]], name: str = 'radar'):
    """Build a Matplotlib colormap from a Plotly colorscale definition.

    Each (position, color) stop becomes an anchor for
    LinearSegmentedColormap.from_list; NaN (masked) cells are made fully
    transparent. Returns None when matplotlib is unavailable or on error.
    """
    try:
        import matplotlib.colors as mcolors
        # (x, color) anchor pairs accepted by from_list.
        anchors = []
        for position, color in colorscale:
            anchors.append((float(position), _parse_plotly_color(color)))
        cmap = mcolors.LinearSegmentedColormap.from_list(name, anchors)
        # Ensure NaNs are transparent
        cmap.set_bad((0, 0, 0, 0))
        return cmap
    except Exception as e:
        print(f"Colormap build error: {e}")
        return None
def add_radar_image_layer(fig: go.Figure, lat2d: np.ndarray, lon2d: np.ndarray, z2d: np.ndarray,
                          detail_level: int, param_type: str) -> bool:
    """Render radar as a smooth raster image and overlay via mapbox image layer.

    The 2D reflectivity field is rasterized with Matplotlib (NaNs transparent),
    base64-encoded into a PNG data URL, and attached to the figure as a mapbox
    image layer stretched over the grid's geographic bounding box. An invisible
    Scattermapbox trace supplies the dBZ colorbar.

    Returns True on success.
    """
    try:
        import io, base64
        import matplotlib
        matplotlib.use('Agg', force=True)  # headless backend; no display needed
        import matplotlib.pyplot as plt
        # Determine output image size based on detail level and grid size
        ny, nx = z2d.shape
        scale_map = {1: 1.2, 2: 1.6, 3: 2.0, 4: 3.0, 5: 4.0}
        scale = scale_map.get(int(detail_level) if detail_level is not None else 3, 2.0)
        max_pixels = 2_400_000  # cap to ~2.4 MP for performance
        width = int(nx * scale)
        height = int(ny * scale)
        # Fit within cap preserving aspect
        if width * height > max_pixels:
            ratio = math.sqrt(max_pixels / (width * height))
            width = max(64, int(width * ratio))
            height = max(64, int(height * ratio))
        # Prepare data (mask NaNs for transparency)
        zmask = np.ma.masked_invalid(z2d)
        # Check if we need to flip the data to match geographic orientation
        ny, nx = lat2d.shape
        lat_top = float(lat2d[0, nx//2])  # Middle of top row
        lat_bottom = float(lat2d[-1, nx//2])  # Middle of bottom row
        # For proper geographic alignment, image top should correspond to highest latitude
        # With origin='upper', array[0] should have higher latitudes than array[-1]
        if lat_top < lat_bottom:
            # Data is ordered south-to-north, need to flip for north-to-south display
            zmask = np.flipud(zmask)
            print(f"⚠ Flipping radar data vertically: array has S-to-N order ({lat_top:.2f}° to {lat_bottom:.2f}°)")
        else:
            print(f"✓ Radar data orientation OK: array has N-to-S order ({lat_top:.2f}° to {lat_bottom:.2f}°)")
        cmap = build_mpl_colormap(get_radar_colorscale())
        if cmap is None:
            return False
        dpi = 100
        fig_img = plt.figure(figsize=(width / dpi, height / dpi), dpi=dpi)
        ax = fig_img.add_axes([0, 0, 1, 1])  # full-bleed
        # vmin/vmax pin the 0-65 dBZ range so colors match the colorbar below.
        ax.imshow(zmask, cmap=cmap, vmin=0, vmax=65, origin='upper', interpolation='bilinear')
        ax.axis('off')
        buf = io.BytesIO()
        fig_img.savefig(buf, format='png', dpi=dpi, transparent=True)
        plt.close(fig_img)
        img_b64 = base64.b64encode(buf.getvalue()).decode('ascii')
        data_url = f"data:image/png;base64,{img_b64}"
        # Corner coordinates for RAP Lambert Conformal grid
        # Account for potential coordinate distortion in curvilinear grid
        ny, nx = lat2d.shape
        # RAP uses a curvilinear Lambert Conformal grid - corners may not be at exact array indices
        # Find actual geographic bounds rather than assuming corner positions
        # Get geographic bounds for proper alignment
        lat_min, lat_max = float(np.nanmin(lat2d)), float(np.nanmax(lat2d))
        lon_min, lon_max = float(np.nanmin(lon2d)), float(np.nanmax(lon2d))
        print(f"RAP geographic bounds: lat [{lat_min:.3f}, {lat_max:.3f}], lon [{lon_min:.3f}, {lon_max:.3f}]")
        # For Lambert Conformal grids, use the actual geographic bounds as corners
        # rather than relying on specific array indices which may not represent true corners
        # This approach works better with curvilinear grids
        # NOTE(review): stretching an axis-aligned bounding box over a curved
        # Lambert Conformal grid introduces some positional distortion near the
        # domain edges — acceptable for overview display, verify if precision matters.
        # Define corners based on geographic bounds (standard GeoJSON/Mapbox order: [lon, lat])
        tl = [lon_min, lat_max]  # Top-left: western edge, northern edge
        tr = [lon_max, lat_max]  # Top-right: eastern edge, northern edge
        br = [lon_max, lat_min]  # Bottom-right: eastern edge, southern edge
        bl = [lon_min, lat_min]  # Bottom-left: western edge, southern edge
        # Validate orientation is consistent with image
        lat_top = float(lat2d[0, nx//2])  # Middle of top row in data array
        lat_bottom = float(lat2d[-1, nx//2])  # Middle of bottom row in data array
        if lat_top < lat_bottom:
            print(f"⚠ Data array has inverted latitude order: array[0]={lat_top:.2f}° < array[-1]={lat_bottom:.2f}°")
        else:
            print(f"✓ Data array latitude order: array[0]={lat_top:.2f}° > array[-1]={lat_bottom:.2f}°")
        # Log corner coordinates for model validation
        print(f"Grid corners: TL({tl[1]:.3f},{tl[0]:.3f}) TR({tr[1]:.3f},{tr[0]:.3f}) BR({br[1]:.3f},{br[0]:.3f}) BL({bl[1]:.3f},{bl[0]:.3f})")
        # Check if coordinates match known model domains
        # RAP CONUS: SW(21.14°N,122.72°W), NW(47.84°N,134.10°W), NE(47.84°N,60.90°W), SE(21.14°N,72.28°W)
        # NAM has broader North American coverage extending into Canada and Mexico
        # NOTE(review): heuristic only — a latitude span of exactly 30° hits the
        # "unclear" branch below.
        lat_range = max(tl[1], tr[1]) - min(bl[1], br[1])
        lon_range = max(tr[0], br[0]) - min(tl[0], bl[0])
        if lat_range < 30:  # Likely RAP CONUS domain
            print("✓ Grid appears to be RAP CONUS domain")
        elif lat_range > 30:  # Likely NAM North American domain
            print("✓ Grid appears to be NAM North American domain")
        else:
            print("? Grid domain classification unclear")
        # Append to any existing mapbox layers rather than replacing them.
        layers = list(fig.layout.mapbox.layers) if fig.layout.mapbox.layers is not None else []
        layers.append(dict(
            sourcetype='image',
            source=data_url,
            coordinates=[tl, tr, br, bl],
            opacity=1.0,
            below='traces',
            name='Radar Raster'
        ))
        fig.update_layout(mapbox_layers=layers)
        # Add invisible scatter to provide colorbar for the image
        try:
            c_lat = float(np.nanmean(lat2d))
            c_lon = float(np.nanmean(lon2d))
            fig.add_trace(go.Scattermapbox(
                lat=[c_lat, c_lat],
                lon=[c_lon, c_lon],
                mode='markers',
                marker=dict(
                    size=1,
                    color=[0, 65],
                    colorscale=get_radar_colorscale(),
                    showscale=True,
                    colorbar=dict(
                        title="Radar Reflectivity (dBZ)",
                        x=0.02 if param_type != 'radar' else 1.02,
                        len=0.6
                    ),
                    opacity=0  # invisible points
                ),
                hoverinfo='skip',
                name='Radar Scale'
            ))
        except Exception as e:
            print(f"Colorbar marker add failed: {e}")
        return True
    except Exception as e:
        print(f"Image layer error: {e}")
        return False
def _locate_or_download_grib(forecast_hour: int):
    """Return local GRIB2 path for RAP REFC at fxx, downloading if needed.

    Walks back through recent RAP cycles (0-24 hours old); for each, asks
    Herbie for the local file (downloading if necessary), then falls back to
    scanning Herbie's cache directory (~/data/rap) for matching subset files.
    A final global scan of the whole cache tree is the last resort.

    Returns:
        (path, None) on success, or (None, error_message) on failure.
    """
    if not HERBIE_AVAILABLE:
        return None, "Herbie is not available"
    try:
        current_time = datetime.utcnow().replace(minute=0, second=0, microsecond=0)
        for hours_back in [0, 1, 2, 3, 6, 12, 18, 24]:
            try:
                target_time = current_time - timedelta(hours=hours_back)
                date_str = target_time.strftime('%Y-%m-%d %H:00')
                H = Herbie(date_str, model='rap', product='sfc', fxx=int(forecast_hour))
                # Ensure local file
                local = None
                try:
                    local = H.get_localFilePath()
                except Exception:
                    local = None
                if not local:
                    files = None
                    try:
                        files = H.download()
                    except Exception:
                        files = None
                    if isinstance(files, (list, tuple)) and files:
                        local = files[0]
                # NOTE(review): fpath availability varies across Herbie versions.
                if not local and hasattr(H, 'fpath'):
                    local = H.fpath
                if local and os.path.exists(str(local)):
                    return str(local), None
                # As a fallback, search the expected directory for subset GRIB2 files
                # Herbie typically stores under ~/data/rap/YYYYMMDD
                try:
                    day_dir = os.path.expanduser(os.path.join('~', 'data', 'rap', target_time.strftime('%Y%m%d')))
                    if os.path.isdir(day_dir):
                        # Exact product name first, then any file for this fxx.
                        pattern1 = os.path.join(day_dir, f"*wrfsfcf{int(forecast_hour):02d}.grib2")
                        pattern2 = os.path.join(day_dir, f"**/*f{int(forecast_hour):02d}*.grib2")
                        candidates = sorted(glob.glob(pattern1)) + sorted(glob.glob(pattern2, recursive=True))
                        if candidates:
                            return candidates[0], None
                except Exception as se:
                    print(f"subset search failed: {se}")
            except Exception as e:
                print(f"locate/download attempt failed: {e}")
                continue
        # Global fallback: scan entire cache tree (could be slow but last resort)
        try:
            root = os.path.expanduser(os.path.join('~', 'data', 'rap'))
            if os.path.isdir(root):
                pat = os.path.join(root, f"**/*f{int(forecast_hour):02d}*.grib2")
                cand = glob.glob(pat, recursive=True)
                if cand:
                    return sorted(cand)[0], None
        except Exception as e2:
            print(f"global scan failed: {e2}")
        return None, "Unable to locate/download GRIB file"
    except Exception as e:
        return None, f"Locate/download error: {e}"
def export_radar_grib(forecast_hour: int, min_dbz: float):
    """Export the RAP radar (REFC) field to a GRIB2 file with values below min_dbz set to missing.

    Fetches REFC via fetch_real_rap_data, locates the source GRIB2 file, finds
    the composite-reflectivity message with eccodes, overwrites its values with
    the thresholded field, and writes the result under ./exports.

    Returns (path, message). If path is None, message contains error.
    """
    try:
        if not HERBIE_AVAILABLE:
            return None, "Herbie is not available to fetch RAP data."
        # Fetch dataset and try to learn source path and date used
        ds, info = fetch_real_rap_data('REFC:entire atmosphere', int(forecast_hour), return_src=True)
        if ds is None:
            return None, "Unable to fetch RAP radar data for export."
        var_names = list(ds.data_vars)
        if not var_names:
            return None, "Dataset missing variables."
        vname = var_names[0]
        z = np.squeeze(ds[vname].values)
        if z.ndim != 2:
            return None, "Unexpected radar array shape."
        # Apply threshold: below-threshold values become NaN (missing).
        thr = float(min_dbz) if min_dbz is not None else 1.0
        z = np.where(z >= thr, z.astype(float), np.nan)
        # Determine or download source GRIB path
        src = None
        if isinstance(info, dict) and info.get('file') and os.path.exists(info['file']):
            src = info['file']
        if not src:
            src, err = _locate_or_download_grib(int(forecast_hour))
            if not src:
                return None, err or "Could not obtain source GRIB file"
        from eccodes import codes_grib_new_from_file, codes_get, codes_set, codes_set_values, codes_write, codes_release
        # Iterate file to find the composite reflectivity message
        handle = None
        with open(src, 'rb') as f:
            while True:
                try:
                    gid = codes_grib_new_from_file(f)
                except Exception:
                    gid = None
                if gid is None:
                    # End of file (or read failure): stop scanning.
                    break
                try:
                    shortName = None
                    try:
                        shortName = codes_get(gid, 'shortName')
                    except Exception:
                        shortName = None
                    name = None
                    try:
                        name = codes_get(gid, 'name')
                    except Exception:
                        name = None
                    # Identify composite reflectivity
                    ok = False
                    if shortName and str(shortName).lower() in ('refc', 'refd', 'refl', 'ref'):  # be lenient
                        ok = True
                    if (not ok) and name and 'reflect' in str(name).lower():
                        ok = True
                    if ok and handle is None:
                        # Keep this message handle; released after writing below.
                        handle = gid
                        break
                    else:
                        codes_release(gid)
                except Exception:
                    try:
                        codes_release(gid)
                    except Exception:
                        pass
        if handle is None:
            return None, "Composite reflectivity message not found in GRIB file."
        # Ensure bitmap for missing values
        # NOTE(review): relies on eccodes treating NaN as missing once
        # bitmapPresent=1 — confirm against the eccodes version in use.
        try:
            codes_set(handle, 'bitmapPresent', 1)
        except Exception:
            pass
        # Flatten in scan order (assuming row-major)
        vals = z.flatten().astype(float)
        codes_set_values(handle, vals)
        os.makedirs('exports', exist_ok=True)
        date_tag = info.get('date_str', 'unknown').replace(':', '').replace(' ', 'T') if isinstance(info, dict) else 'unknown'
        out_path = os.path.join('exports', f"rap_radar_reflectivity_{date_tag}_f{int(forecast_hour):02d}_mindbz{thr:.1f}.grib2")
        with open(out_path, 'wb') as fo:
            codes_write(handle, fo)
        try:
            codes_release(handle)
        except Exception:
            pass
        return out_path, None
    except Exception as e:
        return None, f"Export error: {e}"
def download_raw_grib(forecast_hour: int):
    """Return a copy-path under ./exports for the raw RAP GRIB2 file used for REFC at the given forecast hour.

    Locates (or downloads) the source GRIB2 file via _locate_or_download_grib,
    then copies it into ./exports with a 'raw_' prefix so the original cache
    file is left untouched.

    Args:
        forecast_hour: RAP forecast hour (fxx) whose source file is wanted.

    Returns:
        (path, None) on success, or (None, error_message) on failure.
    """
    try:
        if not HERBIE_AVAILABLE:
            return None, "Herbie is not available"
        # Try immediate locate/download via Herbie.
        # (A second manual Herbie fallback loop previously followed the copy
        # below; it was unreachable dead code and has been removed.)
        src_file, err = _locate_or_download_grib(int(forecast_hour))
        if not src_file:
            return None, err
        try:
            import shutil
            os.makedirs('exports', exist_ok=True)
            base = os.path.basename(str(src_file))
            dest = os.path.join('exports', f"raw_{base}")
            # copy2 preserves timestamps/metadata of the cached file.
            shutil.copy2(src_file, dest)
            return dest, None
        except Exception as e:
            return None, f"Copy error: {e}"
    except Exception as e:
        return None, f"Raw download error: {e}"
def export_rap_to_kmz(forecast_hour: int, min_dbz: float = 0.0):
    """Export RAP radar data to KMZ format for use in mapping applications.

    Parameters:
        forecast_hour: RAP forecast lead time in hours.
        min_dbz: minimum reflectivity threshold (dBZ) to include.

    Returns (path, message). If path is None, message contains error.
    """
    try:
        if not KMZ_AVAILABLE:
            return None, "KMZ export libraries not available"
        # Fetch RAP composite reflectivity for the requested lead time.
        ds = fetch_real_rap_data('REFC:entire atmosphere', int(forecast_hour))
        if ds is None:
            return None, "Unable to fetch RAP radar data for KMZ export"
        # Decimate/threshold the grid to a manageable overlay size.
        radar_grid = process_rap_grid(ds, target_cells=50000, param_type='radar', min_threshold=float(min_dbz))
        if radar_grid is None:
            return None, "Unable to process RAP radar grid for KMZ export"
        lat2d = radar_grid['lat2d']
        lon2d = radar_grid['lon2d']
        z2d = radar_grid['z2d']
        # Build the KML document describing the radar overlay/polygons.
        kml_content = create_radar_kml(lat2d, lon2d, z2d, forecast_hour, min_dbz)
        # A KMZ is simply a zip archive whose root contains doc.kml.
        os.makedirs('exports', exist_ok=True)
        # Use os.path.join for consistency with the other export paths.
        kmz_path = os.path.join('exports', f"rap_radar_f{int(forecast_hour):02d}_mindbz{min_dbz:.1f}.kmz")
        with zipfile.ZipFile(kmz_path, 'w', zipfile.ZIP_DEFLATED) as kmz:
            kmz.writestr('doc.kml', kml_content)
        return kmz_path, None
    except Exception as e:
        return None, f"KMZ export error: {e}"
def create_radar_kml(lat2d, lon2d, z2d, forecast_hour, min_dbz):
"""Create KML content for RAP radar data."""
try:
# Create KML structure
kml = ET.Element('kml', xmlns="http://www.opengis.net/kml/2.2")
document = ET.SubElement(kml, 'Document')
# Add document info
name = ET.SubElement(document, 'name')
name.text = f"RAP Radar Forecast +{forecast_hour}h (min {min_dbz} dBZ)"
description = ET.SubElement(document, 'description')
description.text = f"RAP Composite Reflectivity forecast for +{forecast_hour} hours, minimum {min_dbz} dBZ threshold"
# Add styles for different reflectivity ranges
styles = [
(5, 10, '#00ECEC', 'Light precipitation'),
(10, 15, '#01A0F6', 'Light-moderate precipitation'),
(15, 20, '#0000F6', 'Moderate precipitation'),
(20, 25, '#00FF00', 'Moderate-heavy precipitation'),
(25, 30, '#00C800', 'Heavy precipitation'),
(30, 35, '#FFFF00', 'Very heavy precipitation'),
(35, 40, '#E7C000', 'Intense precipitation'),
(40, 45, '#FF9000', 'Very intense precipitation'),
(45, 50, '#FF0000', 'Extreme precipitation'),
(50, 65, '#D60000', 'Severe precipitation')
]
for i, (min_val, max_val, color, desc) in enumerate(styles):
style = ET.SubElement(document, 'Style', id=f"radar{i}")
poly_style = ET.SubElement(style, 'PolyStyle')
color_elem = ET.SubElement(poly_style, 'color')
# Convert hex to KML ABGR format (80% opacity)
hex_color = color.lstrip('#')
r, g, b = int(hex_color[0:2], 16), int(hex_color[2:4], 16), int(hex_color[4:6], 16)
color_elem.text = f"CC{b:02X}{g:02X}{r:02X}" # ABGR format with CC for ~80% opacity
# Add ground overlay for the radar image
ground_overlay = ET.SubElement(document, 'GroundOverlay')
overlay_name = ET.SubElement(ground_overlay, 'name')
overlay_name.text = f"RAP Radar Grid"
# Create boundaries
lat_box = ET.SubElement(ground_overlay, 'LatLonBox')
north = ET.SubElement(lat_box, 'north')
south = ET.SubElement(lat_box, 'south')
east = ET.SubElement(lat_box, 'east')
west = ET.SubElement(lat_box, 'west')
north.text = str(float(np.nanmax(lat2d)))
south.text = str(float(np.nanmin(lat2d)))
east.text = str(float(np.nanmax(lon2d)))
west.text = str(float(np.nanmin(lon2d)))
# Add sample polygons for areas with significant reflectivity
ny, nx = z2d.shape
step = max(1, min(ny, nx) // 50) # Sample grid for polygon creation
for i in range(0, ny - step, step):
for j in range(0, nx - step, step):
# Get average value for this grid cell
cell_values = z2d[i:i+step, j:j+step]
avg_value = np.nanmean(cell_values)
if np.isnan(avg_value) or avg_value < min_dbz:
continue
# Create polygon for this cell
placemark = ET.SubElement(document, 'Placemark')
pm_name = ET.SubElement(placemark, 'name')
pm_name.text = f"{avg_value:.1f} dBZ"
pm_desc = ET.SubElement(placemark, 'description')
pm_desc.text = f"Radar reflectivity: {avg_value:.1f} dBZ"
# Assign style based on value
style_id = min(len(styles) - 1, max(0, int((avg_value - 5) / 5)))
style_url = ET.SubElement(placemark, 'styleUrl')
style_url.text = f"#radar{style_id}"
# Create polygon coordinates
polygon = ET.SubElement(placemark, 'Polygon')
outer_ring = ET.SubElement(polygon, 'outerBoundaryIs')
linear_ring = ET.SubElement(outer_ring, 'LinearRing')
coordinates = ET.SubElement(linear_ring, 'coordinates')
# Get corner coordinates for this cell
coords = []
coords.append(f"{lon2d[i, j]},{lat2d[i, j]},0")
coords.append(f"{lon2d[i, min(j+step, nx-1)]},{lat2d[i, min(j+step, nx-1)]},0")
coords.append(f"{lon2d[min(i+step, ny-1), min(j+step, nx-1)]},{lat2d[min(i+step, ny-1), min(j+step, nx-1)]},0")
coords.append(f"{lon2d[min(i+step, ny-1), j]},{lat2d[min(i+step, ny-1), j]},0")
coords.append(f"{lon2d[i, j]},{lat2d[i, j]},0") # Close polygon
coordinates.text = " ".join(coords)
# Convert to string
rough_string = ET.tostring(kml, 'unicode')
return rough_string
except Exception as e:
print(f"KML creation error: {e}")
return f"""
Error creating comparison map: {str(e)}
Real NOAA NAM/RAP data with Plotly vs Leaflet alignment comparison
Compare our radar overlay alignment with these official NOAA RAP visualizations:
💡 Tip: Use the same forecast time and look for matching radar patterns, storm positions, and geographic alignment with cities/coastlines.
New: Side-by-side Plotly vs Leaflet comparison maps to validate alignment.
Improvement: Radar data uses validated RAP coordinates with Lambert Conformal projection.
Export: KMZ format for Google Earth and professional GIS applications.