rapviz / app.py
nakas's picture
fix(coordinates): handle NAM afwaca grid structure differences
ff18fc8
import gradio as gr
import numpy as np
import plotly.graph_objects as go
from datetime import datetime, timedelta
import warnings
import gc
import sys
import math
from typing import Optional, Dict, Any, List, Tuple
import os
import glob
# Globals used for building overlays without refetching
LAST_RADAR_GRID: Optional[Dict[str, Any]] = None
LAST_ANIMATION_PATH: Optional[str] = None
warnings.filterwarnings('ignore')
# Import weather libraries for REAL data
try:
from herbie import Herbie
import xarray as xr
HERBIE_AVAILABLE = True
print("HERBIE AVAILABLE - Will use real RAP data including radar")
except ImportError as e:
HERBIE_AVAILABLE = False
print(f"HERBIE NOT AVAILABLE: {e}")
# Try importing projection libraries for coordinate transformation
try:
import cartopy.crs as ccrs
import pyproj
PROJECTION_AVAILABLE = True
except ImportError:
PROJECTION_AVAILABLE = False
print("Projection libraries not available - using raw coordinates")
# Try importing KML/KMZ libraries
try:
import zipfile
import xml.etree.ElementTree as ET
KMZ_AVAILABLE = True
except ImportError:
KMZ_AVAILABLE = False
print("KMZ export not available")
def _try_nam_refc_data(param='REFC:entire atmosphere', fxx=6, return_src: bool = False):
"""Try to fetch REFC data from NAM model for North American coverage."""
try:
# Try recent times for NAM model
current_time = datetime.utcnow().replace(minute=0, second=0, microsecond=0)
for hours_back in [2, 3, 6, 12, 18]:
try:
target_time = current_time - timedelta(hours=hours_back)
date_str = target_time.strftime('%Y-%m-%d %H:00')
print(f" Trying NAM data for: {date_str}, parameter: {param}")
# Try NAM with different products for broader coverage
for product in ['afwaca', 'conusnest.hiresf']: # Central America/Caribbean first, then CONUS
try:
H = Herbie(date_str, model='nam', product=product, fxx=fxx)
ds = H.xarray(param)
if ds is not None:
print(f" SUCCESS: Got NAM {product} data for {date_str}")
if return_src:
return (ds, {'date_str': date_str, 'model': 'nam', 'product': product})
return ds
except Exception as e:
print(f" NAM {product} failed: {e}")
continue
except Exception as e:
print(f" NAM attempt failed for {date_str}: {e}")
continue
print(" All NAM attempts failed")
# Try GFS as final fallback for global coverage including North America
print(" Trying GFS model for global REFC coverage...")
try:
for hours_back in [0, 6, 12, 18]:
target_time = current_time - timedelta(hours=hours_back)
date_str = target_time.strftime('%Y-%m-%d %H:00')
print(f" Trying GFS data for: {date_str}, parameter: {param}")
# Try GFS with 0.25 degree resolution (highest available)
try:
H = Herbie(date_str, model='gfs', product='pgrb2.0p25', fxx=fxx)
ds = H.xarray(param)
if ds is not None:
print(f" SUCCESS: Got GFS data for {date_str}")
return (ds, {'date_str': date_str, 'model': 'gfs'}) if return_src else ds
except Exception as e:
print(f" GFS failed: {e}")
continue
except Exception as e:
print(f" GFS fetch error: {e}")
return None
except Exception as e:
print(f" NAM fetch error: {e}")
return None
def fetch_real_rap_data(param='TMP:2 m', fxx=6, return_src: bool = False):
"""Fetch actual RAP data from NOAA including forecasts.
Note: RAP model may not include REFC (composite reflectivity) parameter.
RAP is primarily focused on temperature, pressure, and wind fields.
If return_src is True, returns a tuple (ds, info) where info contains
metadata such as 'date_str' and possible 'file' path.
"""
if not HERBIE_AVAILABLE:
return (None, None) if return_src else None
try:
# For REFC parameter, try NAM model for North American coverage
if 'REFC' in param:
print(f"INFO: Trying NAM model for North American REFC coverage...")
nam_result = _try_nam_refc_data(param, fxx, return_src)
if nam_result:
return nam_result
print(f"NAM failed, falling back to RAP model...")
# Try recent times, working backwards
current_time = datetime.utcnow().replace(minute=0, second=0, microsecond=0)
for hours_back in [2, 3, 6, 12, 18]:
try:
target_time = current_time - timedelta(hours=hours_back)
date_str = target_time.strftime('%Y-%m-%d %H:00')
print(f"Trying RAP data for: {date_str}, parameter: {param}")
# Create Herbie object - RAP uses 'sfc' product like HRRR
H = Herbie(date_str, model='rap', product='sfc', fxx=fxx)
# Debug: Check if RAP data is available for this time
try:
# Test basic availability first
print(f" Testing RAP availability: {H}")
if hasattr(H, 'grib'):
print(f" GRIB source: {H.grib}")
# Download specific parameter
ds = H.xarray(param)
except Exception as e:
print(f" RAP xarray error: {e}")
continue
if ds is not None:
print(f"SUCCESS: Got real RAP data for {date_str}")
if return_src:
# Try to discover the source grib path from encodings or Herbie
src_path = None
try:
src_path = ds.encoding.get('source', None)
except Exception:
pass
if not src_path:
try:
# Try variable encodings
for vname in ds.data_vars:
enc = getattr(ds[vname], 'encoding', {})
src_path = enc.get('source', None)
if src_path:
break
except Exception:
pass
# Fallback: ask Herbie for local file path (best effort)
if not src_path:
for attr in ('get_localFilePath', 'get_local_file_path', 'local_file', 'fpath', 'filepath'):
if hasattr(H, attr):
try:
val = getattr(H, attr)
src_path = val() if callable(val) else val
if src_path:
break
except Exception:
continue
info = {
'date_str': date_str,
'param': param,
'fxx': fxx,
'file': src_path
}
return ds, info
else:
return ds
except Exception as e:
print(f"Failed for {date_str}: {e}")
continue
print("All RAP attempts failed")
return (None, None) if return_src else None
except Exception as e:
print(f"RAP fetch error: {e}")
return (None, None) if return_src else None
def get_rap_projection():
"""Get the RAP Lambert Conformal Conic projection parameters.
Official NOAA RAP CONUS domain specifications from GRIB2 metadata:
- Grid: 1799 x 1059 mass points, 3km resolution (DxInMetres: 3000.0, DyInMetres: 3000.0)
- Lambert Conformal GRIB2 Template 30
- LaDInDegrees: 38.5, Latin1InDegrees: 38.5, Latin2InDegrees: 38.5
- LoVInDegrees: 262.5 (orientation longitude, meridian aligned with Y-axis)
- Earth model: Sphere radius 6371229 meters
"""
return {
'proj': 'lcc',
'lat_1': 38.5, # Latin1 - first standard parallel
'lat_2': 38.5, # Latin2 - second standard parallel (tangent cone)
'lat_0': 38.5, # LaD - latitude where grid lengths are specified
'lon_0': 262.5, # LoV - orientation longitude (meridian aligned with Y-axis)
'x_0': 0, # False easting
'y_0': 0, # False northing
'a': 6371229, # Earth sphere radius in meters
'b': 6371229, # Earth sphere radius (same for sphere)
'units': 'm'
}
def validate_rap_coordinates(ds):
"""Validate and potentially correct RAP/NAM coordinate arrays."""
if ds is None:
return None
try:
# Check if we have proper 2D coordinate arrays
if 'latitude' in ds.coords and 'longitude' in ds.coords:
lat2d = ds.latitude.values
lon2d_raw = ds.longitude.values
# Apply longitude correction for 0-360° to -180 to 180° conversion
lon2d = np.where(lon2d_raw > 180, lon2d_raw - 360, lon2d_raw)
# Validate that coordinates are reasonable for North American domain
if lat2d.ndim == 2 and lon2d.ndim == 2:
lat_min, lat_max = np.nanmin(lat2d), np.nanmax(lat2d)
lon_min, lon_max = np.nanmin(lon2d), np.nanmax(lon2d)
# Check for RAP CONUS domain (HRRR-like coverage)
rap_lat_valid = (20.8 <= lat_min <= 21.5) and (47.5 <= lat_max <= 48.2)
rap_lon_valid = (-135.0 <= lon_min <= -133.0) and (-61.5 <= lon_max <= -60.0)
# Check for NAM North American domain (broader coverage including Canada/Mexico)
nam_lat_valid = (20.0 <= lat_min <= 22.0) and (52.0 <= lat_max <= 54.0)
nam_lon_valid = (-140.0 <= lon_min <= -130.0) and (-65.0 <= lon_max <= -55.0)
if rap_lat_valid and rap_lon_valid:
print(f"✓ RAP CONUS coordinates validated: lat [{lat_min:.2f}, {lat_max:.2f}], lon [{lon_min:.2f}, {lon_max:.2f}]")
return {'lat2d': lat2d, 'lon2d': lon2d, 'valid': True, 'model': 'RAP'}
elif nam_lat_valid and nam_lon_valid:
print(f"✓ NAM North American coordinates validated: lat [{lat_min:.2f}, {lat_max:.2f}], lon [{lon_min:.2f}, {lon_max:.2f}]")
return {'lat2d': lat2d, 'lon2d': lon2d, 'valid': True, 'model': 'NAM'}
else:
# Still usable coordinates, just warn about potential issues
print(f"Warning: Coordinates outside expected domains: lat [{lat_min:.2f}, {lat_max:.2f}], lon [{lon_min:.2f}, {lon_max:.2f}]")
print(f"RAP expected: lat [20.8-21.5, 47.5-48.2], lon [-135.0 to -133.0, -61.5 to -60.0]")
print(f"NAM expected: lat [20.0-22.0, 52.0-54.0], lon [-140.0 to -130.0, -65.0 to -55.0]")
print("Warning: Using potentially non-standard coordinates")
return {'lat2d': lat2d, 'lon2d': lon2d, 'valid': False, 'model': 'Unknown'}
return None
except Exception as e:
print(f"Coordinate validation error: {e}")
return None
def process_rap_data(ds, max_points=400, param_type='temperature'):
"""Process RAP xarray dataset into plot-ready data"""
if ds is None:
return None
try:
# Get the main data variable
var_names = list(ds.data_vars)
if not var_names:
return None
var_name = var_names[0]
data_var = ds[var_name]
# Get coordinates
if 'latitude' in ds.coords and 'longitude' in ds.coords:
lats = ds.latitude.values
lons = ds.longitude.values
values = data_var.values
elif 'lat' in ds.coords and 'lon' in ds.coords:
lats = ds.lat.values
lons = ds.lon.values
values = data_var.values
else:
return None
# For radar, get MAXIMUM resolution - much more data points
if param_type == 'radar':
max_points = 10000 # Much larger for full radar coverage
min_threshold = 0.1 # Even lower threshold for light precipitation
else:
min_threshold = None
# Less aggressive subsampling for radar to keep more detail
if lats.size > max_points:
if param_type == 'radar':
# For radar, use smaller step to keep more data
step = max(1, int(np.sqrt(lats.size / max_points) * 0.7))
else:
step = max(1, int(np.sqrt(lats.size / max_points)))
if len(lats.shape) == 2:
lats = lats[::step, ::step]
lons = lons[::step, ::step]
values = values[::step, ::step]
else:
lats = lats[::step]
lons = lons[::step]
values = values[::step]
# Flatten arrays
lats_flat = lats.flatten()
lons_flat = lons.flatten()
values_flat = values.flatten()
# Remove invalid values
valid = ~(np.isnan(values_flat) | np.isnan(lats_flat) | np.isnan(lons_flat))
# For radar, use minimal filtering to show maximum coverage
if param_type == 'radar' and min_threshold is not None:
radar_threshold = values_flat > min_threshold
valid = valid & radar_threshold
if not np.any(valid):
return None
return {
'lats': lats_flat[valid],
'lons': lons_flat[valid],
'values': values_flat[valid],
'units': data_var.attrs.get('units', ''),
'long_name': data_var.attrs.get('long_name', var_name),
'param_type': param_type
}
except Exception as e:
print(f"Data processing error: {e}")
return None
def get_radar_colorscale():
"""Get proper radar reflectivity colorscale in dBZ"""
return [
[0.0, 'rgba(0,0,0,0)'], # Transparent for no echo
[0.1, '#00ECEC'], # Light blue - 5-10 dBZ
[0.2, '#01A0F6'], # Blue - 10-15 dBZ
[0.3, '#0000F6'], # Dark blue - 15-20 dBZ
[0.4, '#00FF00'], # Green - 20-25 dBZ
[0.5, '#00C800'], # Dark green - 25-30 dBZ
[0.6, '#FFFF00'], # Yellow - 30-35 dBZ
[0.7, '#E7C000'], # Orange-yellow - 35-40 dBZ
[0.8, '#FF9000'], # Orange - 40-45 dBZ
[0.9, '#FF0000'], # Red - 45-50 dBZ
[1.0, '#D60000'] # Dark red - 50+ dBZ
]
def apply_rap_coordinate_correction(lat2d, lon2d):
"""Apply necessary coordinate corrections for RAP/NAM data alignment."""
try:
print(f"Input coordinate shapes: lat {lat2d.shape}, lon {lon2d.shape}")
# Handle different grid structures - NAM products may have different formats
if lat2d.shape != lon2d.shape:
print(f"Warning: Coordinate arrays have different shapes - attempting to fix")
# If one is 1D and the other is 2D, or different 2D shapes, create meshgrid
if lat2d.ndim == 1 and lon2d.ndim == 1:
lon2d, lat2d = np.meshgrid(lon2d, lat2d)
print(f"Created meshgrid: lat {lat2d.shape}, lon {lon2d.shape}")
elif lat2d.ndim == 1:
# Lat is 1D, lon might be 2D - make compatible
lat2d = np.broadcast_to(lat2d[:, np.newaxis], lon2d.shape)
print(f"Broadcasted lat to match lon: {lat2d.shape}")
elif lon2d.ndim == 1:
# Lon is 1D, lat might be 2D - make compatible
lon2d = np.broadcast_to(lon2d[np.newaxis, :], lat2d.shape)
print(f"Broadcasted lon to match lat: {lon2d.shape}")
else:
print(f"Cannot reconcile coordinate shapes: lat {lat2d.shape}, lon {lon2d.shape}")
return lat2d, lon2d, np.ones_like(lat2d, dtype=bool)
# For radar visualization, ensure longitude is in -180 to 180 range
# RAP/NAM often provide longitudes in 0-360° format, convert to -180 to 180°
lon2d_corrected = np.where(lon2d > 180, lon2d - 360, lon2d)
# Remove any obvious outliers or invalid coordinates after correction
valid_mask = (~np.isnan(lat2d) & ~np.isnan(lon2d_corrected) &
(lat2d >= -90) & (lat2d <= 90) &
(lon2d_corrected >= -180) & (lon2d_corrected <= 180))
# Apply RAP-specific coordinate corrections based on official specifications
# RAP uses Lambert Conformal Conic projection with specific parameters
# The coordinates should already be properly projected, but we can verify bounds
# Validate against known RAP domain boundaries
lat_min, lat_max = np.nanmin(lat2d), np.nanmax(lat2d)
lon_min, lon_max = np.nanmin(lon2d_corrected), np.nanmax(lon2d_corrected)
# Check if coordinates fall within known model domains
# RAP CONUS: lat 21.14° to 47.84°N, lon -134.10° to -60.90°W
# NAM North America: broader coverage including Canada and Mexico
rap_domain = (20.8 <= lat_min <= 21.5 and 47.5 <= lat_max <= 48.2 and
-135.0 <= lon_min <= -133.0 and -61.5 <= lon_max <= -60.0)
nam_domain = (20.0 <= lat_min <= 22.0 and 52.0 <= lat_max <= 54.0 and
-140.0 <= lon_min <= -130.0 and -65.0 <= lon_max <= -55.0)
if rap_domain:
print(f"✓ Coordinates match RAP CONUS domain: lat [{lat_min:.2f}, {lat_max:.2f}], lon [{lon_min:.2f}, {lon_max:.2f}]")
elif nam_domain:
print(f"✓ Coordinates match NAM North American domain: lat [{lat_min:.2f}, {lat_max:.2f}], lon [{lon_min:.2f}, {lon_max:.2f}]")
else:
print(f"Info: Using non-standard coordinate domain: lat [{lat_min:.2f}, {lat_max:.2f}], lon [{lon_min:.2f}, {lon_max:.2f}]")
return lat2d, lon2d_corrected, valid_mask
except Exception as e:
print(f"Coordinate correction error: {e}")
return lat2d, lon2d, np.ones_like(lat2d, dtype=bool)
def process_rap_grid(ds, target_cells=50000, param_type='radar', min_threshold=0.1):
"""Return RAP data as 2D grids (lat2d, lon2d, z2d) suitable for filled contours.
- target_cells: approximate max number of grid cells to draw for performance
- min_threshold: values below are masked as NaN (for radar transparency)
"""
if ds is None:
return None
try:
var_names = list(ds.data_vars)
if not var_names:
return None
var_name = var_names[0]
data_var = ds[var_name]
# Prefer explicit 2D latitude/longitude if available
if 'latitude' in ds.coords and 'longitude' in ds.coords:
lat2d_raw = ds.latitude.values
lon2d_raw = ds.longitude.values
# Apply coordinate validation and correction
lat2d, lon2d, valid_mask = apply_rap_coordinate_correction(lat2d_raw, lon2d_raw)
# Store validation info for debugging
coord_validation = validate_rap_coordinates(ds)
if coord_validation and not coord_validation['valid']:
model_type = coord_validation.get('model', 'Unknown')
print(f"Warning: Using potentially non-standard {model_type} coordinates")
elif 'lat' in ds.coords and 'lon' in ds.coords:
# Some datasets provide 1D lat/lon; try to construct 2D mesh
lat = ds.lat.values
lon = ds.lon.values
if lat.ndim == 1 and lon.ndim == 1:
lon2d, lat2d = np.meshgrid(lon, lat)
else:
lat2d = lat
lon2d = lon
else:
return None
z = data_var.values
# Ensure z is 2D (squeeze time/levels if any)
z = np.squeeze(z)
if z.ndim != 2:
# Cannot contour non-2D
return None
# Subsample to keep performance predictable
ny, nx = z.shape
total = nx * ny
if total > target_cells:
step = int(np.ceil(np.sqrt(total / target_cells)))
step = max(1, step)
z = z[::step, ::step]
# Only subsample coordinates if they're 2D and match z dimensions
if lat2d.ndim == 2 and lat2d.shape == (ny, nx):
lat2d = lat2d[::step, ::step]
elif lat2d.ndim == 1 and len(lat2d) == ny:
lat2d = lat2d[::step]
if lon2d.ndim == 2 and lon2d.shape == (ny, nx):
lon2d = lon2d[::step, ::step]
elif lon2d.ndim == 1 and len(lon2d) == nx:
lon2d = lon2d[::step]
# Mask values below threshold for radar
if param_type == 'radar' and min_threshold is not None:
z = np.where(z >= min_threshold, z, np.nan)
return {
'lat2d': lat2d,
'lon2d': lon2d,
'z2d': z,
'units': data_var.attrs.get('units', ''),
'long_name': data_var.attrs.get('long_name', var_name),
'param_type': param_type
}
except Exception as e:
print(f"Grid processing error: {e}")
return None
def _clamp(val, vmin, vmax):
return max(vmin, min(val, vmax))
def grid_to_geojson(lat2d: np.ndarray, lon2d: np.ndarray, z2d: np.ndarray,
max_polygons: Optional[int] = None,
nan_as_transparent: bool = True) -> Optional[Dict[str, Any]]:
"""Convert a lat/lon curvilinear grid into a GeoJSON FeatureCollection of cell polygons.
- Each cell is a quadrilateral around the center (i,j) using neighboring points.
- Values that are NaN are skipped when nan_as_transparent is True.
- max_polygons optionally caps the number of cells included (row/col stride).
"""
try:
ny, nx = z2d.shape
if ny < 2 or nx < 2:
return None
# Determine stride to cap polygons if needed
istep = jstep = 1
total_cells = ny * nx
if max_polygons and total_cells > max_polygons:
factor = math.sqrt(total_cells / max_polygons)
istep = max(1, int(round(factor)))
jstep = istep
features = []
# Helper for safe index
def lat_(i, j):
ii = _clamp(i, 0, ny - 1)
jj = _clamp(j, 0, nx - 1)
return float(lat2d[ii, jj])
def lon_(i, j):
ii = _clamp(i, 0, ny - 1)
jj = _clamp(j, 0, nx - 1)
return float(lon2d[ii, jj])
# Build polygons
for i in range(0, ny, istep):
for j in range(0, nx, jstep):
val = z2d[i, j]
if nan_as_transparent and (val is None or np.isnan(val)):
continue
# Corners as average of 4 surrounding centers (clamped at edges)
# Top-left around (i-0.5, j-0.5)
lat_tl = (lat_(i, j) + lat_(i-1, j) + lat_(i, j-1) + lat_(i-1, j-1)) / 4.0
lon_tl = (lon_(i, j) + lon_(i-1, j) + lon_(i, j-1) + lon_(i-1, j-1)) / 4.0
# Top-right around (i-0.5, j+0.5)
lat_tr = (lat_(i, j) + lat_(i-1, j) + lat_(i, j+1) + lat_(i-1, j+1)) / 4.0
lon_tr = (lon_(i, j) + lon_(i-1, j) + lon_(i, j+1) + lon_(i-1, j+1)) / 4.0
# Bottom-right around (i+0.5, j+0.5)
lat_br = (lat_(i, j) + lat_(i+1, j) + lat_(i, j+1) + lat_(i+1, j+1)) / 4.0
lon_br = (lon_(i, j) + lon_(i+1, j) + lon_(i, j+1) + lon_(i+1, j+1)) / 4.0
# Bottom-left around (i+0.5, j-0.5)
lat_bl = (lat_(i, j) + lat_(i+1, j) + lat_(i, j-1) + lat_(i+1, j-1)) / 4.0
lon_bl = (lon_(i, j) + lon_(i+1, j) + lon_(i, j-1) + lon_(i+1, j-1)) / 4.0
poly = [
[lon_tl, lat_tl],
[lon_tr, lat_tr],
[lon_br, lat_br],
[lon_bl, lat_bl],
[lon_tl, lat_tl]
]
fid = f"{i}-{j}"
feat = {
"type": "Feature",
"id": fid,
"properties": {"id": fid, "value": None if np.isnan(val) else float(val)},
"geometry": {"type": "Polygon", "coordinates": [poly]}
}
features.append(feat)
return {"type": "FeatureCollection", "features": features}
except Exception as e:
print(f"GeoJSON build error: {e}")
return None
def _parse_plotly_color(color_str: str) -> Tuple[float, float, float, float]:
"""Convert '#RRGGBB' or 'rgba(r,g,b,a)' to normalized RGBA tuple."""
color_str = color_str.strip()
if color_str.startswith('#'):
r = int(color_str[1:3], 16) / 255.0
g = int(color_str[3:5], 16) / 255.0
b = int(color_str[5:7], 16) / 255.0
a = 1.0
return (r, g, b, a)
if color_str.startswith('rgba'):
nums = color_str[color_str.find('(')+1:color_str.find(')')].split(',')
r = int(nums[0]) / 255.0
g = int(nums[1]) / 255.0
b = int(nums[2]) / 255.0
a = float(nums[3])
return (r, g, b, a)
raise ValueError(f"Unsupported color: {color_str}")
def build_mpl_colormap(colorscale: List[List[float]], name: str = 'radar'):
"""Build a Matplotlib colormap from a Plotly colorscale definition."""
try:
import matplotlib.colors as mcolors
stops = [(float(p), _parse_plotly_color(c)) for p, c in colorscale]
# mcolors.LinearSegmentedColormap.from_list accepts (x, color) pairs
cmap = mcolors.LinearSegmentedColormap.from_list(name, stops)
# Ensure NaNs are transparent
cmap.set_bad((0, 0, 0, 0))
return cmap
except Exception as e:
print(f"Colormap build error: {e}")
return None
def add_radar_image_layer(fig: go.Figure, lat2d: np.ndarray, lon2d: np.ndarray, z2d: np.ndarray,
detail_level: int, param_type: str) -> bool:
"""Render radar as a smooth raster image and overlay via mapbox image layer.
Returns True on success.
"""
try:
import io, base64
import matplotlib
matplotlib.use('Agg', force=True)
import matplotlib.pyplot as plt
# Determine output image size based on detail level and grid size
ny, nx = z2d.shape
scale_map = {1: 1.2, 2: 1.6, 3: 2.0, 4: 3.0, 5: 4.0}
scale = scale_map.get(int(detail_level) if detail_level is not None else 3, 2.0)
max_pixels = 2_400_000 # cap to ~2.4 MP for performance
width = int(nx * scale)
height = int(ny * scale)
# Fit within cap preserving aspect
if width * height > max_pixels:
ratio = math.sqrt(max_pixels / (width * height))
width = max(64, int(width * ratio))
height = max(64, int(height * ratio))
# Prepare data (mask NaNs for transparency)
zmask = np.ma.masked_invalid(z2d)
# Check if we need to flip the data to match geographic orientation
ny, nx = lat2d.shape
lat_top = float(lat2d[0, nx//2]) # Middle of top row
lat_bottom = float(lat2d[-1, nx//2]) # Middle of bottom row
# For proper geographic alignment, image top should correspond to highest latitude
# With origin='upper', array[0] should have higher latitudes than array[-1]
if lat_top < lat_bottom:
# Data is ordered south-to-north, need to flip for north-to-south display
zmask = np.flipud(zmask)
print(f"⚠ Flipping radar data vertically: array has S-to-N order ({lat_top:.2f}° to {lat_bottom:.2f}°)")
else:
print(f"✓ Radar data orientation OK: array has N-to-S order ({lat_top:.2f}° to {lat_bottom:.2f}°)")
cmap = build_mpl_colormap(get_radar_colorscale())
if cmap is None:
return False
dpi = 100
fig_img = plt.figure(figsize=(width / dpi, height / dpi), dpi=dpi)
ax = fig_img.add_axes([0, 0, 1, 1]) # full-bleed
ax.imshow(zmask, cmap=cmap, vmin=0, vmax=65, origin='upper', interpolation='bilinear')
ax.axis('off')
buf = io.BytesIO()
fig_img.savefig(buf, format='png', dpi=dpi, transparent=True)
plt.close(fig_img)
img_b64 = base64.b64encode(buf.getvalue()).decode('ascii')
data_url = f"data:image/png;base64,{img_b64}"
# Corner coordinates for RAP Lambert Conformal grid
# Account for potential coordinate distortion in curvilinear grid
ny, nx = lat2d.shape
# RAP uses a curvilinear Lambert Conformal grid - corners may not be at exact array indices
# Find actual geographic bounds rather than assuming corner positions
# Get geographic bounds for proper alignment
lat_min, lat_max = float(np.nanmin(lat2d)), float(np.nanmax(lat2d))
lon_min, lon_max = float(np.nanmin(lon2d)), float(np.nanmax(lon2d))
print(f"RAP geographic bounds: lat [{lat_min:.3f}, {lat_max:.3f}], lon [{lon_min:.3f}, {lon_max:.3f}]")
# For Lambert Conformal grids, use the actual geographic bounds as corners
# rather than relying on specific array indices which may not represent true corners
# This approach works better with curvilinear grids
# Define corners based on geographic bounds (standard GeoJSON/Mapbox order: [lon, lat])
tl = [lon_min, lat_max] # Top-left: western edge, northern edge
tr = [lon_max, lat_max] # Top-right: eastern edge, northern edge
br = [lon_max, lat_min] # Bottom-right: eastern edge, southern edge
bl = [lon_min, lat_min] # Bottom-left: western edge, southern edge
# Validate orientation is consistent with image
lat_top = float(lat2d[0, nx//2]) # Middle of top row in data array
lat_bottom = float(lat2d[-1, nx//2]) # Middle of bottom row in data array
if lat_top < lat_bottom:
print(f"⚠ Data array has inverted latitude order: array[0]={lat_top:.2f}° < array[-1]={lat_bottom:.2f}°")
else:
print(f"✓ Data array latitude order: array[0]={lat_top:.2f}° > array[-1]={lat_bottom:.2f}°")
# Log corner coordinates for model validation
print(f"Grid corners: TL({tl[1]:.3f},{tl[0]:.3f}) TR({tr[1]:.3f},{tr[0]:.3f}) BR({br[1]:.3f},{br[0]:.3f}) BL({bl[1]:.3f},{bl[0]:.3f})")
# Check if coordinates match known model domains
# RAP CONUS: SW(21.14°N,122.72°W), NW(47.84°N,134.10°W), NE(47.84°N,60.90°W), SE(21.14°N,72.28°W)
# NAM has broader North American coverage extending into Canada and Mexico
lat_range = max(tl[1], tr[1]) - min(bl[1], br[1])
lon_range = max(tr[0], br[0]) - min(tl[0], bl[0])
if lat_range < 30: # Likely RAP CONUS domain
print("✓ Grid appears to be RAP CONUS domain")
elif lat_range > 30: # Likely NAM North American domain
print("✓ Grid appears to be NAM North American domain")
else:
print("? Grid domain classification unclear")
layers = list(fig.layout.mapbox.layers) if fig.layout.mapbox.layers is not None else []
layers.append(dict(
sourcetype='image',
source=data_url,
coordinates=[tl, tr, br, bl],
opacity=1.0,
below='traces',
name='Radar Raster'
))
fig.update_layout(mapbox_layers=layers)
# Add invisible scatter to provide colorbar for the image
try:
c_lat = float(np.nanmean(lat2d))
c_lon = float(np.nanmean(lon2d))
fig.add_trace(go.Scattermapbox(
lat=[c_lat, c_lat],
lon=[c_lon, c_lon],
mode='markers',
marker=dict(
size=1,
color=[0, 65],
colorscale=get_radar_colorscale(),
showscale=True,
colorbar=dict(
title="Radar Reflectivity (dBZ)",
x=0.02 if param_type != 'radar' else 1.02,
len=0.6
),
opacity=0 # invisible points
),
hoverinfo='skip',
name='Radar Scale'
))
except Exception as e:
print(f"Colorbar marker add failed: {e}")
return True
except Exception as e:
print(f"Image layer error: {e}")
return False
def _locate_or_download_grib(forecast_hour: int):
"""Return local GRIB2 path for RAP REFC at fxx, downloading if needed."""
if not HERBIE_AVAILABLE:
return None, "Herbie is not available"
try:
current_time = datetime.utcnow().replace(minute=0, second=0, microsecond=0)
for hours_back in [0, 1, 2, 3, 6, 12, 18, 24]:
try:
target_time = current_time - timedelta(hours=hours_back)
date_str = target_time.strftime('%Y-%m-%d %H:00')
H = Herbie(date_str, model='rap', product='sfc', fxx=int(forecast_hour))
# Ensure local file
local = None
try:
local = H.get_localFilePath()
except Exception:
local = None
if not local:
files = None
try:
files = H.download()
except Exception:
files = None
if isinstance(files, (list, tuple)) and files:
local = files[0]
if not local and hasattr(H, 'fpath'):
local = H.fpath
if local and os.path.exists(str(local)):
return str(local), None
# As a fallback, search the expected directory for subset GRIB2 files
# Herbie typically stores under ~/data/rap/YYYYMMDD
try:
day_dir = os.path.expanduser(os.path.join('~', 'data', 'rap', target_time.strftime('%Y%m%d')))
if os.path.isdir(day_dir):
pattern1 = os.path.join(day_dir, f"*wrfsfcf{int(forecast_hour):02d}.grib2")
pattern2 = os.path.join(day_dir, f"**/*f{int(forecast_hour):02d}*.grib2")
candidates = sorted(glob.glob(pattern1)) + sorted(glob.glob(pattern2, recursive=True))
if candidates:
return candidates[0], None
except Exception as se:
print(f"subset search failed: {se}")
except Exception as e:
print(f"locate/download attempt failed: {e}")
continue
# Global fallback: scan entire cache tree (could be slow but last resort)
try:
root = os.path.expanduser(os.path.join('~', 'data', 'rap'))
if os.path.isdir(root):
pat = os.path.join(root, f"**/*f{int(forecast_hour):02d}*.grib2")
cand = glob.glob(pat, recursive=True)
if cand:
return sorted(cand)[0], None
except Exception as e2:
print(f"global scan failed: {e2}")
return None, "Unable to locate/download GRIB file"
except Exception as e:
return None, f"Locate/download error: {e}"
def export_radar_grib(forecast_hour: int, min_dbz: float):
"""Export the RAP radar (REFC) field to a GRIB2 file with values below min_dbz set to missing.
Returns (path, message). If path is None, message contains error.
"""
try:
if not HERBIE_AVAILABLE:
return None, "Herbie is not available to fetch RAP data."
# Fetch dataset and try to learn source path and date used
ds, info = fetch_real_rap_data('REFC:entire atmosphere', int(forecast_hour), return_src=True)
if ds is None:
return None, "Unable to fetch RAP radar data for export."
var_names = list(ds.data_vars)
if not var_names:
return None, "Dataset missing variables."
vname = var_names[0]
z = np.squeeze(ds[vname].values)
if z.ndim != 2:
return None, "Unexpected radar array shape."
# Apply threshold
thr = float(min_dbz) if min_dbz is not None else 1.0
z = np.where(z >= thr, z.astype(float), np.nan)
# Determine or download source GRIB path
src = None
if isinstance(info, dict) and info.get('file') and os.path.exists(info['file']):
src = info['file']
if not src:
src, err = _locate_or_download_grib(int(forecast_hour))
if not src:
return None, err or "Could not obtain source GRIB file"
from eccodes import codes_grib_new_from_file, codes_get, codes_set, codes_set_values, codes_write, codes_release
# Iterate file to find the composite reflectivity message
handle = None
with open(src, 'rb') as f:
while True:
try:
gid = codes_grib_new_from_file(f)
except Exception:
gid = None
if gid is None:
break
try:
shortName = None
try:
shortName = codes_get(gid, 'shortName')
except Exception:
shortName = None
name = None
try:
name = codes_get(gid, 'name')
except Exception:
name = None
# Identify composite reflectivity
ok = False
if shortName and str(shortName).lower() in ('refc', 'refd', 'refl', 'ref'): # be lenient
ok = True
if (not ok) and name and 'reflect' in str(name).lower():
ok = True
if ok and handle is None:
handle = gid
break
else:
codes_release(gid)
except Exception:
try:
codes_release(gid)
except Exception:
pass
if handle is None:
return None, "Composite reflectivity message not found in GRIB file."
# Ensure bitmap for missing values
try:
codes_set(handle, 'bitmapPresent', 1)
except Exception:
pass
# Flatten in scan order (assuming row-major)
vals = z.flatten().astype(float)
codes_set_values(handle, vals)
os.makedirs('exports', exist_ok=True)
date_tag = info.get('date_str', 'unknown').replace(':', '').replace(' ', 'T') if isinstance(info, dict) else 'unknown'
out_path = os.path.join('exports', f"rap_radar_reflectivity_{date_tag}_f{int(forecast_hour):02d}_mindbz{thr:.1f}.grib2")
with open(out_path, 'wb') as fo:
codes_write(handle, fo)
try:
codes_release(handle)
except Exception:
pass
return out_path, None
except Exception as e:
return None, f"Export error: {e}"
def download_raw_grib(forecast_hour: int):
"""Return a copy-path under ./exports for the raw RAP GRIB2 file used for REFC at the given forecast hour."""
try:
if not HERBIE_AVAILABLE:
return None, "Herbie is not available"
# Try immediate locate/download via Herbie
src_file, err = _locate_or_download_grib(int(forecast_hour))
if not src_file:
return None, err
try:
import shutil
os.makedirs('exports', exist_ok=True)
base = os.path.basename(str(src_file))
dest = os.path.join('exports', f"raw_{base}")
shutil.copy2(src_file, dest)
return dest, None
except Exception as e:
return None, f"Copy error: {e}"
# Fallback: attempt direct Herbie path
current_time = datetime.utcnow().replace(minute=0, second=0, microsecond=0)
for hours_back in [2, 3, 6, 12, 18]:
try:
target_time = current_time - timedelta(hours=hours_back)
date_str = target_time.strftime('%Y-%m-%d %H:00')
H = Herbie(date_str, model='rap', product='sfc', fxx=int(forecast_hour))
# This triggers download if not present
local = H.get_localFilePath() if hasattr(H, 'get_localFilePath') else None
if not local and hasattr(H, 'download'):
files = H.download()
if isinstance(files, (list, tuple)) and files:
local = files[0]
if not local and hasattr(H, 'fpath'):
local = H.fpath
# Fallback handled above
except Exception:
continue
return None, "Unable to locate/download raw GRIB file"
except Exception as e:
return None, f"Raw download error: {e}"
def export_rap_to_kmz(forecast_hour: int, min_dbz: float = 0.0):
"""Export RAP radar data to KMZ format for use in mapping applications.
Returns (path, message). If path is None, message contains error.
"""
try:
if not KMZ_AVAILABLE:
return None, "KMZ export libraries not available"
# Fetch RAP radar data
ds = fetch_real_rap_data('REFC:entire atmosphere', int(forecast_hour))
if ds is None:
return None, "Unable to fetch RAP radar data for KMZ export"
# Process the grid data
radar_grid = process_rap_grid(ds, target_cells=50000, param_type='radar', min_threshold=float(min_dbz))
if radar_grid is None:
return None, "Unable to process RAP radar grid for KMZ export"
lat2d = radar_grid['lat2d']
lon2d = radar_grid['lon2d']
z2d = radar_grid['z2d']
# Create KML content
kml_content = create_radar_kml(lat2d, lon2d, z2d, forecast_hour, min_dbz)
# Create KMZ file (zipped KML)
os.makedirs('exports', exist_ok=True)
kmz_path = f"exports/rap_radar_f{int(forecast_hour):02d}_mindbz{min_dbz:.1f}.kmz"
with zipfile.ZipFile(kmz_path, 'w', zipfile.ZIP_DEFLATED) as kmz:
kmz.writestr('doc.kml', kml_content)
return kmz_path, None
except Exception as e:
return None, f"KMZ export error: {e}"
def create_radar_kml(lat2d, lon2d, z2d, forecast_hour, min_dbz):
"""Create KML content for RAP radar data."""
try:
# Create KML structure
kml = ET.Element('kml', xmlns="http://www.opengis.net/kml/2.2")
document = ET.SubElement(kml, 'Document')
# Add document info
name = ET.SubElement(document, 'name')
name.text = f"RAP Radar Forecast +{forecast_hour}h (min {min_dbz} dBZ)"
description = ET.SubElement(document, 'description')
description.text = f"RAP Composite Reflectivity forecast for +{forecast_hour} hours, minimum {min_dbz} dBZ threshold"
# Add styles for different reflectivity ranges
styles = [
(5, 10, '#00ECEC', 'Light precipitation'),
(10, 15, '#01A0F6', 'Light-moderate precipitation'),
(15, 20, '#0000F6', 'Moderate precipitation'),
(20, 25, '#00FF00', 'Moderate-heavy precipitation'),
(25, 30, '#00C800', 'Heavy precipitation'),
(30, 35, '#FFFF00', 'Very heavy precipitation'),
(35, 40, '#E7C000', 'Intense precipitation'),
(40, 45, '#FF9000', 'Very intense precipitation'),
(45, 50, '#FF0000', 'Extreme precipitation'),
(50, 65, '#D60000', 'Severe precipitation')
]
for i, (min_val, max_val, color, desc) in enumerate(styles):
style = ET.SubElement(document, 'Style', id=f"radar{i}")
poly_style = ET.SubElement(style, 'PolyStyle')
color_elem = ET.SubElement(poly_style, 'color')
# Convert hex to KML ABGR format (80% opacity)
hex_color = color.lstrip('#')
r, g, b = int(hex_color[0:2], 16), int(hex_color[2:4], 16), int(hex_color[4:6], 16)
color_elem.text = f"CC{b:02X}{g:02X}{r:02X}" # ABGR format with CC for ~80% opacity
# Add ground overlay for the radar image
ground_overlay = ET.SubElement(document, 'GroundOverlay')
overlay_name = ET.SubElement(ground_overlay, 'name')
overlay_name.text = f"RAP Radar Grid"
# Create boundaries
lat_box = ET.SubElement(ground_overlay, 'LatLonBox')
north = ET.SubElement(lat_box, 'north')
south = ET.SubElement(lat_box, 'south')
east = ET.SubElement(lat_box, 'east')
west = ET.SubElement(lat_box, 'west')
north.text = str(float(np.nanmax(lat2d)))
south.text = str(float(np.nanmin(lat2d)))
east.text = str(float(np.nanmax(lon2d)))
west.text = str(float(np.nanmin(lon2d)))
# Add sample polygons for areas with significant reflectivity
ny, nx = z2d.shape
step = max(1, min(ny, nx) // 50) # Sample grid for polygon creation
for i in range(0, ny - step, step):
for j in range(0, nx - step, step):
# Get average value for this grid cell
cell_values = z2d[i:i+step, j:j+step]
avg_value = np.nanmean(cell_values)
if np.isnan(avg_value) or avg_value < min_dbz:
continue
# Create polygon for this cell
placemark = ET.SubElement(document, 'Placemark')
pm_name = ET.SubElement(placemark, 'name')
pm_name.text = f"{avg_value:.1f} dBZ"
pm_desc = ET.SubElement(placemark, 'description')
pm_desc.text = f"Radar reflectivity: {avg_value:.1f} dBZ"
# Assign style based on value
style_id = min(len(styles) - 1, max(0, int((avg_value - 5) / 5)))
style_url = ET.SubElement(placemark, 'styleUrl')
style_url.text = f"#radar{style_id}"
# Create polygon coordinates
polygon = ET.SubElement(placemark, 'Polygon')
outer_ring = ET.SubElement(polygon, 'outerBoundaryIs')
linear_ring = ET.SubElement(outer_ring, 'LinearRing')
coordinates = ET.SubElement(linear_ring, 'coordinates')
# Get corner coordinates for this cell
coords = []
coords.append(f"{lon2d[i, j]},{lat2d[i, j]},0")
coords.append(f"{lon2d[i, min(j+step, nx-1)]},{lat2d[i, min(j+step, nx-1)]},0")
coords.append(f"{lon2d[min(i+step, ny-1), min(j+step, nx-1)]},{lat2d[min(i+step, ny-1), min(j+step, nx-1)]},0")
coords.append(f"{lon2d[min(i+step, ny-1), j]},{lat2d[min(i+step, ny-1), j]},0")
coords.append(f"{lon2d[i, j]},{lat2d[i, j]},0") # Close polygon
coordinates.text = " ".join(coords)
# Convert to string
rough_string = ET.tostring(kml, 'unicode')
return rough_string
except Exception as e:
print(f"KML creation error: {e}")
return f"""<?xml version="1.0" encoding="UTF-8"?>
<kml xmlns="http://www.opengis.net/kml/2.2">
<Document>
<name>RAP Radar Export Error</name>
<description>Error creating KML: {str(e)}</description>
</Document>
</kml>"""
def generate_radar_animation_gif(detail_level: int = 5, min_dbz: float = 0.0):
"""Generate a GIF animating radar reflectivity from f00..f18 and return (path, message).
The GIF is set to loop indefinitely.
"""
try:
import os
import imageio
import matplotlib
matplotlib.use('Agg', force=True)
import matplotlib.pyplot as plt
frames = []
times = []
for fxx in range(0, 19):
ds = fetch_real_rap_data('REFC:entire atmosphere', fxx)
if isinstance(ds, tuple):
ds = ds[0]
grid = process_rap_grid(ds, target_cells={1:20000,2:40000,3:60000,4:90000,5:120000}.get(int(detail_level), 120000), param_type='radar', min_threshold=float(min_dbz))
if grid is None:
continue
lat2d = grid['lat2d']
lon2d = grid['lon2d']
z2d = grid['z2d']
# Apply same orientation correction as other visualizations
ny, nx = lat2d.shape
lat_top = float(lat2d[0, nx//2])
lat_bottom = float(lat2d[-1, nx//2])
zmask = np.ma.masked_invalid(z2d)
if lat_top < lat_bottom:
# Data is ordered south-to-north, flip for proper display
zmask = np.flipud(zmask)
cmap = build_mpl_colormap(get_radar_colorscale())
if cmap is None:
continue
scale_map = {1: 1.0, 2: 1.2, 3: 1.6, 4: 2.0, 5: 2.5}
scale = scale_map.get(int(detail_level), 2.5)
width = int(nx * scale)
height = int(ny * scale)
dpi = 100
fig_anim = plt.figure(figsize=(width / dpi, height / dpi), dpi=dpi)
ax = fig_anim.add_axes([0, 0, 1, 1])
ax.imshow(zmask, cmap=cmap, vmin=0, vmax=65, origin='upper', interpolation='bilinear')
ax.axis('off')
fig_anim.canvas.draw()
# Convert canvas to array
img = np.frombuffer(fig_anim.canvas.tostring_argb(), dtype=np.uint8)
img = img.reshape(fig_anim.canvas.get_width_height()[::-1] + (4,))
# ARGB to RGBA
img = img[:, :, [1, 2, 3, 0]]
frames.append(img)
times.append(fxx)
plt.close(fig_anim)
if not frames:
return None, "No frames generated"
os.makedirs('exports', exist_ok=True)
out_path = 'exports/rap_radar_animation_f00_f18.gif'
imageio.mimsave(out_path, frames, duration=0.25, loop=0) # 4 fps, loop forever
return out_path, None
except Exception as e:
return None, f"Animation error: {e}"
def generate_radar_animation_png_frames(detail_level: int = 5, min_dbz: float = 0.0, fps: float = 4.0):
"""Return (frames, message) where frames is a list of data URLs (PNG with alpha) for f00..f18."""
try:
import io, base64
import matplotlib
matplotlib.use('Agg', force=True)
import matplotlib.pyplot as plt
frames = []
for fxx in range(0, 19):
ds = fetch_real_rap_data('REFC:entire atmosphere', fxx)
if isinstance(ds, tuple):
ds = ds[0]
grid = process_rap_grid(ds, target_cells={1:20000,2:40000,3:60000,4:90000,5:120000}.get(int(detail_level), 120000), param_type='radar', min_threshold=float(min_dbz))
if grid is None:
continue
lat2d = grid['lat2d']
lon2d = grid['lon2d']
z2d = grid['z2d']
# Apply same orientation correction as other visualizations
ny, nx = lat2d.shape
lat_top = float(lat2d[0, nx//2])
lat_bottom = float(lat2d[-1, nx//2])
zmask = np.ma.masked_invalid(z2d)
if lat_top < lat_bottom:
# Data is ordered south-to-north, flip for proper display
zmask = np.flipud(zmask)
cmap = build_mpl_colormap(get_radar_colorscale())
if cmap is None:
continue
scale_map = {1: 1.0, 2: 1.2, 3: 1.6, 4: 2.0, 5: 2.5}
scale = scale_map.get(int(detail_level), 2.0)
width = int(nx * scale)
height = int(ny * scale)
dpi = 100
fig_anim = plt.figure(figsize=(width / dpi, height / dpi), dpi=dpi)
fig_anim.patch.set_alpha(0.0)
ax = fig_anim.add_axes([0, 0, 1, 1])
ax.patch.set_alpha(0.0)
ax.imshow(zmask, cmap=cmap, vmin=0, vmax=65, origin='upper', interpolation='bilinear')
ax.axis('off')
buf = io.BytesIO()
fig_anim.savefig(buf, format='png', dpi=dpi, transparent=True)
plt.close(fig_anim)
img_b64 = base64.b64encode(buf.getvalue()).decode('ascii')
frames.append(f"data:image/png;base64,{img_b64}")
if not frames:
return None, "No frames generated"
return frames, None
except Exception as e:
return None, f"Animation frames error: {e}"
def build_leaflet_overlay_from_frames(frame_data_urls: List[str], grid: Optional[Dict[str, Any]], fps: float = 4.0):
"""Return HTML with Leaflet + JS that cycles through transparent PNG frames warped
by a 4-corner homography (no external plugins), aligned to the RAP grid.
"""
try:
if not frame_data_urls:
return "<div style='padding:8px;color:#666'>No animation frames.</div>"
if not grid or 'lat2d' not in grid or 'lon2d' not in grid:
return "<div style='padding:8px;color:#666'>No grid available for overlay bounds.</div>"
lat2d = grid['lat2d']
lon2d = grid['lon2d']
# Bounds for initial fit
min_lat = float(np.nanmin(lat2d))
max_lat = float(np.nanmax(lat2d))
min_lon = float(np.nanmin(lon2d))
max_lon = float(np.nanmax(lon2d))
c_lat = float(np.nanmean(lat2d))
c_lon = float(np.nanmean(lon2d))
# Corner control points for RAP Lambert Conformal grid transformation
# Use geographic bounds approach (same as Plotly) for consistent alignment
ny, nx = lat2d.shape
# RAP uses curvilinear Lambert Conformal grid - use geographic bounds for corners
# This matches the approach used in the Plotly visualization for consistency
lat_tl, lon_tl = max_lat, min_lon # Top-left: northern edge, western edge
lat_tr, lon_tr = max_lat, max_lon # Top-right: northern edge, eastern edge
lat_br, lon_br = min_lat, max_lon # Bottom-right: southern edge, eastern edge
lat_bl, lon_bl = min_lat, min_lon # Bottom-left: southern edge, western edge
print(f"Leaflet corners using geographic bounds: TL({lat_tl:.3f},{lon_tl:.3f}) TR({lat_tr:.3f},{lon_tr:.3f}) BR({lat_br:.3f},{lon_br:.3f}) BL({lat_bl:.3f},{lon_bl:.3f})")
# Validate corner coordinates are within expected CONUS bounds
corners = [(lat_tl, lon_tl), (lat_tr, lon_tr), (lat_br, lon_br), (lat_bl, lon_bl)]
for i, (lat, lon) in enumerate(corners):
if not (20 <= lat <= 50 and -140 <= lon <= -50):
print(f"Warning: Corner {i} coordinates ({lat:.3f}, {lon:.3f}) outside expected CONUS bounds")
# Prepare JS array of frame URLs
js_frames = "[" + ",".join([f"'{u}'" for u in frame_data_urls]) + "]"
interval_ms = max(50, int(1000.0 / max(0.5, float(fps))))
doc = f"""
<!doctype html>
<html>
<head>
<meta charset=\"utf-8\" />
<meta name=\"viewport\" content=\"width=device-width, initial-scale=1\" />
<link rel=\"stylesheet\" href=\"https://unpkg.com/leaflet@1.9.4/dist/leaflet.css\"/>
<style>
html,body,#leaflet-map{{height:100%;margin:0;padding:0}}
.proj-image{{position:absolute; left:0; top:0; transform-origin:0 0; will-change:transform; pointer-events:none;}}
</style>
</head>
<body>
<div id=\"leaflet-map\"></div>
<script src=\"https://unpkg.com/leaflet@1.9.4/dist/leaflet.js\"></script>
<script>
(function() {{
var map = L.map('leaflet-map', {{center: [{c_lat:.5f}, {c_lon:.5f}], zoom: 5, zoomControl: true}});
L.tileLayer('https://{{s}}.tile.openstreetmap.org/{{z}}/{{x}}/{{y}}.png', {{
maxZoom: 18,
attribution: '&copy; OpenStreetMap contributors'
}}).addTo(map);
var bounds = [[{min_lat:.6f}, {min_lon:.6f}], [{max_lat:.6f}, {max_lon:.6f}]];
map.fitBounds(bounds);
var frames = {js_frames};
var idx = 0;
// Create image element in overlay pane and apply 4-corner projective transform using CSS matrix3d
var overlayPane = map.getPanes().overlayPane;
var img = new Image();
img.className = 'proj-image';
img.style.opacity = 0.95;
img.src = frames[0];
overlayPane.appendChild(img);
function computeAndApplyTransform() {{
if (!img.naturalWidth || !img.naturalHeight) return;
var w = img.naturalWidth, h = img.naturalHeight;
// Destination pixel coords
var p0 = map.latLngToLayerPoint([{lat_tl:.6f}, {lon_tl:.6f}]); // TL
var p1 = map.latLngToLayerPoint([{lat_tr:.6f}, {lon_tr:.6f}]); // TR
var p2 = map.latLngToLayerPoint([{lat_br:.6f}, {lon_br:.6f}]); // BR
var p3 = map.latLngToLayerPoint([{lat_bl:.6f}, {lon_bl:.6f}]); // BL
var x0=p0.x, y0=p0.y, x1=p1.x, y1=p1.y, x2=p2.x, y2=p2.y, x3=p3.x, y3=p3.y;
var dx1 = x1 - x2, dy1 = y1 - y2;
var dx2 = x3 - x2, dy2 = y3 - y2;
var dx3 = x0 - x1 + x2 - x3, dy3 = y0 - y1 + y2 - y3;
var a, b, c, d, e, f, g, h2;
if (dx3 === 0 && dy3 === 0) {{
g = 0; h2 = 0;
a = x1 - x0; b = x3 - x0; c = x0;
d = y1 - y0; e = y3 - y0; f = y0;
}} else {{
var denom = dx1*dy2 - dx2*dy1;
g = (dx3*dy2 - dx2*dy3)/denom;
h2 = (dx1*dy3 - dx3*dy1)/denom;
a = x1 - x0 + g*x1;
b = x3 - x0 + h2*x3;
c = x0;
d = y1 - y0 + g*y1;
e = y3 - y0 + h2*y3;
f = y0;
}}
// Normalize for source image size (map from [0..w],[0..h])
a /= w; b /= h; d /= w; e /= h; g /= w; h2 /= h;
var css = 'matrix3d('+
a + ',' + d + ',0,' + g + ','+
b + ',' + e + ',0,' + h2 + ','+
'0,0,1,0,'+
c + ',' + f + ',0,1)';
img.style.transform = css;
}}
function tick() {{
idx = (idx + 1) % frames.length;
var url = frames[idx];
var tmp = new Image();
tmp.onload = function(){{ img.src = url; computeAndApplyTransform(); }};
tmp.src = url;
}}
img.onload = computeAndApplyTransform;
setInterval(tick, {interval_ms});
map.on('zoom viewreset move', computeAndApplyTransform);
}})();
</script>
</body>
</html>
"""
doc_escaped = doc.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;").replace("\"", "&quot;")
iframe = f"<iframe srcdoc=\"{doc_escaped}\" style=\"width:100%;height:520px;border:none;border-radius:8px\"></iframe>"
return iframe
except Exception as e:
return f"<div style='padding:8px;color:#900'>Leaflet frames overlay error: {str(e)}</div>"
def build_leaflet_overlay_html(gif_path: Optional[str], grid: Optional[Dict[str, Any]]):
"""Return HTML for a Leaflet map with the animated GIF overlaid as an image.
If gif_path is provided, it is embedded as a base64 data URL for portability.
"""
try:
if not gif_path or not os.path.exists(gif_path):
return "<div style='padding:8px;color:#666'>No animation generated.</div>"
if not grid or 'lat2d' not in grid or 'lon2d' not in grid:
return "<div style='padding:8px;color:#666'>No grid available for overlay bounds.</div>"
lat2d = grid['lat2d']
lon2d = grid['lon2d']
min_lat = float(np.nanmin(lat2d))
max_lat = float(np.nanmax(lat2d))
min_lon = float(np.nanmin(lon2d))
max_lon = float(np.nanmax(lon2d))
c_lat = float(np.nanmean(lat2d))
c_lon = float(np.nanmean(lon2d))
import base64
with open(gif_path, 'rb') as f:
gif_b64 = base64.b64encode(f.read()).decode('ascii')
data_url = f"data:image/gif;base64,{gif_b64}"
# Prefer Folium (self-contained HTML) and fallback to raw Leaflet
# Build a standalone HTML document and embed via iframe srcdoc to ensure scripts run
doc = f"""
<!doctype html>
<html>
<head>
<meta charset=\"utf-8\" />
<meta name=\"viewport\" content=\"width=device-width, initial-scale=1\" />
<link rel=\"stylesheet\" href=\"https://unpkg.com/leaflet@1.9.4/dist/leaflet.css\"/>
<style>html,body,#leaflet-map{{height:100%;margin:0;padding:0}}</style>
</head>
<body>
<div id=\"leaflet-map\"></div>
<script src=\"https://unpkg.com/leaflet@1.9.4/dist/leaflet.js\"></script>
<script>
(function() {{
var map = L.map('leaflet-map', {{center: [{c_lat:.5f}, {c_lon:.5f}], zoom: 5, zoomControl: true}});
L.tileLayer('https://{{s}}.tile.openstreetmap.org/{{z}}/{{x}}/{{y}}.png', {{
maxZoom: 18,
attribution: '&copy; OpenStreetMap contributors'
}}).addTo(map);
var bounds = [[{min_lat:.6f}, {min_lon:.6f}], [{max_lat:.6f}, {max_lon:.6f}]];
var overlay = L.imageOverlay('{data_url}', bounds, {{opacity: 0.95, interactive: false}}).addTo(map);
map.fitBounds(bounds);
}})();
</script>
</body>
</html>
"""
# Escape for srcdoc
doc_escaped = doc.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;").replace("\"", "&quot;")
iframe = f"<iframe srcdoc=\"{doc_escaped}\" style=\"width:100%;height:520px;border:none;border-radius:8px\"></iframe>"
return iframe
except Exception as e:
return f"<div style='padding:8px;color:#900'>Leaflet overlay error: {str(e)}</div>"
def create_leaflet_comparison_map(param_type, forecast_hour, show_radar=False, detail_level=3, min_dbz=1.0):
"""Create a Leaflet map with the same data as Plotly for side-by-side comparison."""
try:
# Fetch the same data as the Plotly map
param_map = {
'temperature': 'TMP:2 m',
'humidity': 'RH:2 m',
'wind_speed': 'WIND:10 m',
'pressure': 'MSLMA:mean sea level',
'radar': 'REFC:entire atmosphere'
}
# Get center coordinates for North American view
# Center on NAM domain: lat [21.14, 52.62], lon [-134.10, -60.92]
center_lat, center_lon = 36.9, -97.5 # Midpoint of NAM domain
zoom_level = 3 # Lower zoom to show Canada and Mexico
# Start building the HTML
html_content = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Leaflet RAP Comparison</title>
<link rel="stylesheet" href="https://unpkg.com/leaflet@1.9.4/dist/leaflet.css" />
<style>
#leaflet-comparison {{ height: 500px; width: 100%; }}
.legend {{
background: white;
padding: 10px;
border-radius: 5px;
box-shadow: 0 0 15px rgba(0,0,0,0.2);
line-height: 18px;
color: #555;
}}
.legend i {{
width: 18px;
height: 18px;
float: left;
margin-right: 8px;
opacity: 0.7;
}}
</style>
</head>
<body>
<div id="leaflet-comparison"></div>
<script src="https://unpkg.com/leaflet@1.9.4/dist/leaflet.js"></script>
<script>
var map = L.map('leaflet-comparison').setView([{center_lat}, {center_lon}], {zoom_level});
// Add base map
L.tileLayer('https://{{s}}.tile.openstreetmap.org/{{z}}/{{x}}/{{y}}.png', {{
maxZoom: 18,
attribution: '&copy; <a href="https://www.openstreetmap.org/copyright">OpenStreetMap</a> contributors'
}}).addTo(map);
// Add title
var title = L.control({{position: 'topright'}});
title.onAdd = function (map) {{
var div = L.DomUtil.create('div', 'legend');
div.innerHTML = '<h4>Leaflet Comparison</h4><p>RAP {param_type.title()} +{forecast_hour}h</p>';
return div;
}};
title.addTo(map);
"""
# Add radar data if requested
if show_radar or param_type == 'radar':
# Fetch radar data
radar_ds = fetch_real_rap_data('REFC:entire atmosphere', forecast_hour)
if radar_ds is not None:
radar_grid = process_rap_grid(radar_ds, target_cells=20000, param_type='radar', min_threshold=float(min_dbz))
if radar_grid is not None:
lat2d = radar_grid['lat2d']
lon2d = radar_grid['lon2d']
z2d = radar_grid['z2d']
# Create markers for radar data (simplified visualization)
html_content += """
// Add radar data points
var radarData = [
"""
# Sample radar data points for visualization
ny, nx = z2d.shape
step = max(1, min(ny, nx) // 100) # Sample every nth point
radar_points = []
for i in range(0, ny, step):
for j in range(0, nx, step):
value = z2d[i, j]
if not np.isnan(value) and value >= min_dbz:
lat = float(lat2d[i, j])
lon = float(lon2d[i, j])
if -90 <= lat <= 90 and -180 <= lon <= 180:
# Color based on reflectivity value
if value < 10:
color = '#00ECEC'
elif value < 20:
color = '#0000F6'
elif value < 30:
color = '#00FF00'
elif value < 40:
color = '#FFFF00'
elif value < 50:
color = '#FF9000'
else:
color = '#FF0000'
radar_points.append(f"[{lat}, {lon}, {value:.1f}, '{color}']")
html_content += ",\n".join(radar_points[:1000]) # Limit to 1000 points
html_content += """
];
// Add radar points to map
radarData.forEach(function(point) {
var lat = point[0];
var lon = point[1];
var value = point[2];
var color = point[3];
L.circleMarker([lat, lon], {
radius: 3,
fillColor: color,
color: color,
weight: 0,
opacity: 0.8,
fillOpacity: 0.8
}).addTo(map).bindPopup('Radar: ' + value + ' dBZ');
});
// Add radar legend
var radarLegend = L.control({position: 'bottomright'});
radarLegend.onAdd = function (map) {
var div = L.DomUtil.create('div', 'legend');
div.innerHTML = '<h4>Radar (dBZ)</h4>' +
'<i style="background:#00ECEC"></i> 5-10<br>' +
'<i style="background:#0000F6"></i> 10-20<br>' +
'<i style="background:#00FF00"></i> 20-30<br>' +
'<i style="background:#FFFF00"></i> 30-40<br>' +
'<i style="background:#FF9000"></i> 40-50<br>' +
'<i style="background:#FF0000"></i> 50+<br>';
return div;
};
radarLegend.addTo(map);
"""
# Add weather data if not radar-only
if param_type != 'radar':
grib_param = param_map.get(param_type, 'TMP:2 m')
weather_ds = fetch_real_rap_data(grib_param, forecast_hour)
weather_data = process_rap_data(weather_ds, max_points=200, param_type=param_type)
if weather_data is not None:
html_content += """
// Add weather data points
var weatherData = [
"""
weather_points = []
for i in range(min(len(weather_data['lats']), 200)): # Limit to 200 points
lat = float(weather_data['lats'][i])
lon = float(weather_data['lons'][i])
value = float(weather_data['values'][i])
if -90 <= lat <= 90 and -180 <= lon <= 180:
weather_points.append(f"[{lat}, {lon}, {value:.2f}]")
html_content += ",\n".join(weather_points)
# Choose color scheme based on parameter
if param_type == 'temperature':
color_scheme = """
var color;
if (value < 0) color = '#0000ff';
else if (value < 10) color = '#0080ff';
else if (value < 20) color = '#00ffff';
else if (value < 30) color = '#80ff00';
else if (value < 40) color = '#ffff00';
else color = '#ff0000';
"""
legend_html = "'<h4>Temperature (°C)</h4>' + '<i style=\"background:#0000ff\"></i> < 0<br>' + '<i style=\"background:#0080ff\"></i> 0-10<br>' + '<i style=\"background:#00ffff\"></i> 10-20<br>' + '<i style=\"background:#80ff00\"></i> 20-30<br>' + '<i style=\"background:#ffff00\"></i> 30-40<br>' + '<i style=\"background:#ff0000\"></i> 40+<br>'"
else:
color_scheme = "var color = '#2E86AB';"
legend_html = f"'<h4>{param_type.title()}</h4>'"
html_content += f"""
];
// Add weather points to map
weatherData.forEach(function(point) {{
var lat = point[0];
var lon = point[1];
var value = point[2];
{color_scheme}
L.circleMarker([lat, lon], {{
radius: 4,
fillColor: color,
color: color,
weight: 1,
opacity: 0.8,
fillOpacity: 0.7
}}).addTo(map).bindPopup('{param_type.title()}: ' + value + ' {weather_data.get("units", "")}');
}});
// Add weather legend
var weatherLegend = L.control({{position: 'bottomleft'}});
weatherLegend.onAdd = function (map) {{
var div = L.DomUtil.create('div', 'legend');
div.innerHTML = {legend_html};
return div;
}};
weatherLegend.addTo(map);
"""
html_content += """
</script>
</body>
</html>
"""
return html_content
except Exception as e:
return f"""
<!DOCTYPE html>
<html>
<body>
<div style="padding: 20px; color: red;">
<h3>Leaflet Map Error</h3>
<p>Error creating comparison map: {str(e)}</p>
</div>
</body>
</html>
"""
def validate_radar_alignment(lat2d, lon2d, z2d):
"""Validate radar alignment by checking against known geographic features."""
try:
# Known reference points for validation (major cities in RAP domain)
reference_points = {
'Kansas_City': (39.0997, -94.5786),
'Denver': (39.7392, -104.9903),
'Atlanta': (33.7490, -84.3880),
'Chicago': (41.8781, -87.6298),
'Phoenix': (33.4484, -112.0740),
'Dallas': (32.7767, -96.7970),
'New_York': (40.7128, -74.0060),
'Los_Angeles': (34.0522, -118.2437),
'Seattle': (47.6062, -122.3321)
}
validation_results = {}
total_offset_sum = 0
valid_points = 0
for city, (ref_lat, ref_lon) in reference_points.items():
# Find closest grid point to reference location
lat_diff = np.abs(lat2d - ref_lat)
lon_diff = np.abs(lon2d - ref_lon)
distance = np.sqrt(lat_diff**2 + lon_diff**2)
min_idx = np.unravel_index(np.argmin(distance), distance.shape)
closest_lat = lat2d[min_idx]
closest_lon = lon2d[min_idx]
# Calculate offset in km using more accurate Haversine-based calculation
lat_offset_km = (closest_lat - ref_lat) * 111.32 # More accurate km per degree lat
lon_offset_km = (closest_lon - ref_lon) * 111.32 * np.cos(np.radians(ref_lat))
total_offset_km = np.sqrt(lat_offset_km**2 + lon_offset_km**2)
# Determine grid resolution and tolerance based on coordinates
# RAP: ~13km grid, NAM: ~12km grid, broader tolerance for larger grids
grid_size = lat2d.shape[0] * lat2d.shape[1]
if grid_size > 100000: # Large grid like NAM (265x450 = 119,250)
tolerance_km = 8.0 # More lenient for NAM North American grid
else: # Smaller grid like RAP
tolerance_km = 4.0 # Stricter for RAP CONUS grid
acceptable = total_offset_km < tolerance_km
validation_results[city] = {
'reference': (ref_lat, ref_lon),
'closest_grid': (closest_lat, closest_lon),
'offset_km': total_offset_km,
'acceptable': acceptable
}
if acceptable:
total_offset_sum += total_offset_km
valid_points += 1
# Calculate overall alignment quality based on grid type
avg_offset = total_offset_sum / valid_points if valid_points > 0 else float('inf')
grid_size = lat2d.shape[0] * lat2d.shape[1]
if grid_size > 100000: # Large NAM grid
alignment_quality = 'Excellent' if avg_offset < 3.0 else 'Good' if avg_offset < 6.0 else 'Acceptable'
else: # Smaller RAP grid
alignment_quality = 'Excellent' if avg_offset < 1.0 else 'Good' if avg_offset < 2.0 else 'Poor'
validation_results['_summary'] = {
'average_offset_km': avg_offset,
'valid_points': valid_points,
'total_points': len(reference_points),
'alignment_quality': alignment_quality
}
print(f"Radar alignment validation: {alignment_quality} (avg offset: {avg_offset:.2f}km)")
return validation_results
except Exception as e:
print(f"Alignment validation error: {e}")
return {}
def create_weather_map_with_radar(param_type, forecast_hour, show_radar=False, detail_level=3, min_dbz=1.0):
"""Create weather map with optional radar forecast overlay"""
try:
# Map parameter names to GRIB codes
param_map = {
'temperature': 'TMP:2 m',
'humidity': 'RH:2 m',
'wind_speed': 'WIND:10 m',
'pressure': 'MSLMA:mean sea level',
'radar': 'REFC:entire atmosphere' # Composite reflectivity
}
fig = go.Figure()
# Always try to get main weather parameter (unless it's radar-only)
if param_type != 'radar':
grib_param = param_map.get(param_type, 'TMP:2 m')
print(f"Fetching {param_type} ({grib_param}) for +{forecast_hour}h")
# Fetch real weather data
ds = fetch_real_rap_data(grib_param, forecast_hour)
processed = process_rap_data(ds, max_points=400, param_type=param_type)
if processed is not None:
# Real RAP weather data
print(f"Plotting {len(processed['values'])} weather data points")
# Choose colorscale based on parameter
if param_type == 'temperature':
colorscale = 'RdYlBu_r'
elif param_type == 'humidity':
colorscale = 'Blues'
elif param_type == 'pressure':
colorscale = 'RdBu_r'
else:
colorscale = 'Viridis'
fig.add_trace(go.Scattermapbox(
lat=processed['lats'],
lon=processed['lons'],
mode='markers',
marker=dict(
size=5,
color=processed['values'],
colorscale=colorscale,
showscale=True,
colorbar=dict(
title=f"{processed.get('long_name', param_type)} ({processed.get('units', '')})",
x=1.02,
len=0.8
),
opacity=0.7
),
text=[f"{v:.1f} {processed.get('units', '')}" for v in processed['values']],
hovertemplate='<b>%{text}</b><extra></extra>',
name="Weather Data"
))
# Add radar data if requested OR if radar is the main parameter
if show_radar or param_type == 'radar':
print(f"Fetching radar data (REFC) for +{forecast_hour}h")
# Fetch radar reflectivity forecast
radar_ds = fetch_real_rap_data('REFC:entire atmosphere', forecast_hour)
# Map detail_level (1-5) to target cell counts for performance/detail tradeoff
detail_to_cells = {1: 20000, 2: 40000, 3: 60000, 4: 90000, 5: 120000}
target_cells = detail_to_cells.get(int(detail_level) if detail_level is not None else 3, 60000)
# Use user-selected threshold to control speckle
radar_grid = process_rap_grid(radar_ds, target_cells=target_cells, param_type='radar', min_threshold=float(min_dbz) if min_dbz is not None else 0.5)
# Store latest grid globally for other components (animation overlays)
global LAST_RADAR_GRID
LAST_RADAR_GRID = radar_grid
if radar_grid is not None:
lat2d = radar_grid['lat2d']
lon2d = radar_grid['lon2d']
z2d = radar_grid['z2d']
print(f"Plotting radar grid: {z2d.shape[0]}x{z2d.shape[1]} cells")
# Validate radar alignment
validation = validate_radar_alignment(lat2d, lon2d, z2d)
if validation:
print("=== RADAR ALIGNMENT VALIDATION ===")
for city, result in validation.items():
if city != '_summary': # Skip summary entry
status = "✓ GOOD" if result['acceptable'] else "✗ POOR"
print(f"{city}: {result['offset_km']:.1f}km offset {status}")
print("=== END VALIDATION ===")
radar_layer_added = False
# First choice: Contourmapbox if available in this Plotly version
if hasattr(go, 'Contourmapbox') and not radar_layer_added:
try:
fig.add_trace(go.Contourmapbox(
lat=lat2d,
lon=lon2d,
z=z2d,
colorscale=get_radar_colorscale(),
contours=dict(coloring='heatmap', showlines=False),
showscale=True,
colorbar=dict(
title="Radar Reflectivity (dBZ)",
x=0.02 if param_type != 'radar' else 1.02,
len=0.6
),
zmin=0,
zmax=65,
hovertemplate='Radar: %{z:.1f} dBZ<extra></extra>',
name="Radar Reflectivity"
))
radar_layer_added = True
except Exception as e:
print(f"Contourmapbox failed, trying raster fallback: {e}")
elif not hasattr(go, 'Contourmapbox'):
print("Contourmapbox not available in this Plotly version; trying raster fallback")
# Second choice: smooth raster image layer for fused appearance
if not radar_layer_added:
try:
image_added = add_radar_image_layer(fig, lat2d, lon2d, z2d, detail_level, param_type)
radar_layer_added = radar_layer_added or bool(image_added)
except Exception as e:
print(f"Image layer attempt failed: {e}")
# Third choice: Choroplethmapbox (solid polygons per cell)
if not radar_layer_added:
try:
# Limit polygon count based on detail level
detail_to_polys = {1: 4000, 2: 8000, 3: 12000, 4: 20000, 5: 30000}
max_polys = detail_to_polys.get(int(detail_level) if detail_level is not None else 3, 12000)
geojson = grid_to_geojson(lat2d, lon2d, z2d, max_polygons=max_polys)
if geojson and geojson.get('features'):
ids = [f["properties"]["id"] for f in geojson["features"]]
vals = [f["properties"]["value"] for f in geojson["features"]]
fig.add_trace(go.Choroplethmapbox(
geojson=geojson,
locations=ids,
z=vals,
featureidkey="properties.id",
colorscale=get_radar_colorscale(),
zmin=0,
zmax=65,
colorbar=dict(
title="Radar Reflectivity (dBZ)",
x=0.02 if param_type != 'radar' else 1.02,
len=0.6
),
marker_opacity=0.85,
marker_line_width=0,
hovertemplate='Radar: %{z:.1f} dBZ<extra></extra>',
name="Radar Reflectivity"
))
radar_layer_added = True
else:
print("Choropleth fallback failed: empty geojson or no features")
except Exception as e:
print(f"Choropleth fallback failed: {e}")
# Final fallback: density layer (zoom-dependent appearance)
if not radar_layer_added:
radar_processed = process_rap_data(radar_ds, max_points=10000, param_type='radar')
if radar_processed is not None:
detail_to_radius = {1: 18, 2: 14, 3: 12, 4: 10, 5: 8}
radius = detail_to_radius.get(int(detail_level) if detail_level is not None else 3, 12)
fig.add_trace(go.Densitymapbox(
lat=radar_processed['lats'],
lon=radar_processed['lons'],
z=radar_processed['values'],
radius=radius,
colorscale=get_radar_colorscale(),
showscale=True,
colorbar=dict(
title="Radar Reflectivity (dBZ)",
x=0.02 if param_type != 'radar' else 1.02,
len=0.6
),
opacity=0.85,
zmin=0.1,
zmax=65,
hovertemplate='Radar: %{z:.1f} dBZ<extra></extra>',
name="Radar Forecast"
))
else:
print("No radar data available for this time")
# Add note about radar unavailability
if param_type == 'radar':
fig.add_annotation(
text="RAP radar forecast temporarily unavailable<br>Try a different forecast hour",
x=0.5, y=0.5,
xref="paper", yref="paper",
showarrow=False,
font=dict(size=14)
)
# Set title based on what's displayed
if param_type == 'radar':
title = f"RAP Radar Reflectivity Forecast (+{forecast_hour}h)"
elif show_radar:
title = f"RAP {param_type.title()} + Radar Forecast (+{forecast_hour}h)"
else:
title = f"RAP {param_type.title()} Forecast (+{forecast_hour}h)"
# If no data at all
if len(fig.data) == 0:
fig.add_annotation(
text="RAP data temporarily unavailable<br>Try different parameters or forecast hours",
x=0.5, y=0.5,
xref="paper", yref="paper",
showarrow=False,
font=dict(size=16)
)
title = "RAP Data - Loading"
# Center map on North America to show full NAM domain coverage
# NAM domain: lat [21.14, 52.62], lon [-134.10, -60.92]
center_lat = (21.14 + 52.62) / 2 # ~36.9°N (mid-point of NAM domain)
center_lon = (-134.10 + -60.92) / 2 # ~-97.5°W (mid-point of NAM domain)
fig.update_layout(
mapbox=dict(
style="open-street-map",
zoom=3, # Lower zoom to show more area including Canada and Mexico
center=dict(lat=center_lat, lon=center_lon)
),
height=500,
title=title,
margin=dict(l=0, r=80, t=50, b=0)
)
return fig
except Exception as e:
print(f"Map creation error: {e}")
# Force cleanup
gc.collect()
# Return error figure
fig = go.Figure()
fig.add_annotation(
text=f"Error: {str(e)[:100]}",
x=0.5, y=0.5,
xref="paper", yref="paper",
showarrow=False
)
fig.update_layout(height=400, title="Error Loading Data")
return fig
def update_display(location, forecast_hour, parameter, show_radar_overlay, detail_level, min_dbz, animate_forecast):
"""Simple stable update function - single map only"""
try:
# Force garbage collection
gc.collect()
print(f"\n=== UPDATE: {location}, +{forecast_hour}h, {parameter}, radar:{show_radar_overlay} ===")
# Create single weather map (no second map to avoid crashes)
weather_map = create_weather_map_with_radar(parameter, forecast_hour, show_radar_overlay, detail_level, min_dbz)
# Simple status
current_time = datetime.utcnow()
forecast_time = current_time + timedelta(hours=forecast_hour)
# Get alignment status if available
alignment_status = ""
if LAST_RADAR_GRID and show_radar_overlay:
try:
validation = validate_radar_alignment(
LAST_RADAR_GRID['lat2d'],
LAST_RADAR_GRID['lon2d'],
LAST_RADAR_GRID['z2d']
)
if validation:
good_count = sum(1 for v in validation.values() if v['acceptable'])
total_count = len(validation)
avg_offset = np.mean([v['offset_km'] for v in validation.values()])
alignment_status = f"\n**Alignment:** {good_count}/{total_count} reference points within 10km (avg: {avg_offset:.1f}km)"
except Exception:
alignment_status = "\n**Alignment:** Validation unavailable"
status = f"""
## North American Weather + Radar Forecasts
**Location:** {location}
**Current:** {current_time.strftime('%H:%M UTC')}
**Forecast:** {forecast_time.strftime('%H:%M UTC')} (+{forecast_hour}h)
**Parameter:** {parameter.title()}
**Radar Overlay:** {"Enabled" if show_radar_overlay else "Disabled"}
**Radar Detail:** {detail_level} (1=Fast, 5=Max)
**Min dBZ:** {min_dbz:.1f}
**Data Source:** {"REAL NOAA RAP with Enhanced Alignment" if HERBIE_AVAILABLE else "RAP Unavailable"}{alignment_status}
**Radar Info:** 🌎 For North American coverage, app tries NAM model first for REFC (composite reflectivity), then falls back to RAP for other weather data.
**Note:** Radar forecasts now use validated RAP Lambert Conformal projection coordinates for proper geographic alignment.
"""
# Optional animation and Leaflet overlay
gif_path = None
leaflet_html = ""
if animate_forecast:
try:
gif_path, _ = generate_radar_animation_gif(detail_level=int(detail_level), min_dbz=float(min_dbz))
global LAST_ANIMATION_PATH
LAST_ANIMATION_PATH = gif_path
except Exception as e:
print(f"Animation generation error (gif): {e}")
gif_path = None
# Build Leaflet overlay from transparent PNG frames for correct alpha blending
try:
frames, msg = generate_radar_animation_png_frames(detail_level=int(detail_level), min_dbz=float(min_dbz), fps=4.0)
if frames:
leaflet_html = build_leaflet_overlay_from_frames(frames, LAST_RADAR_GRID, fps=4.0)
else:
leaflet_html = f"<div style='padding:8px;color:#900'>Animation frames error: {msg}</div>"
except Exception as e:
leaflet_html = f"<div style='padding:8px;color:#900'>Leaflet overlay build failed: {str(e)}</div>"
# Create comparison Leaflet map
leaflet_comparison = ""
try:
leaflet_comparison_html = create_leaflet_comparison_map(parameter, forecast_hour, show_radar_overlay, detail_level, min_dbz)
# Escape HTML for iframe display
escaped_html = leaflet_comparison_html.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;").replace("\"", "&quot;")
leaflet_comparison = f"<iframe srcdoc=\"{escaped_html}\" style=\"width:100%;height:520px;border:none;border-radius:8px\"></iframe>"
except Exception as e:
leaflet_comparison = f"<div style='padding:8px;color:#900'>Leaflet comparison error: {str(e)}</div>"
return status, weather_map, leaflet_comparison, gif_path, leaflet_html
except Exception as e:
print(f"Update error: {e}")
# Aggressive cleanup on error
gc.collect()
error_fig = go.Figure()
error_fig.add_annotation(text=f"Update failed: {str(e)}", x=0.5, y=0.5, xref="paper", yref="paper", showarrow=False)
error_fig.update_layout(height=300)
return f"## Error\n{str(e)}", error_fig, "<div>Error loading comparison</div>", None, ""
# Stable interface - single map only
with gr.Blocks(title="North American Weather + Radar") as app:
gr.HTML("""
<div style="text-align: center; background: linear-gradient(45deg, #3498db, #9b59b6);
color: white; padding: 1.5rem; border-radius: 10px; margin-bottom: 1rem;">
<h1>🌎 North American Weather + Radar Forecasts</h1>
<p>Real NOAA NAM/RAP data with Plotly vs Leaflet alignment comparison</p>
</div>
""")
gr.HTML("""
<div style="background: #f8f9fa; padding: 1rem; border-radius: 8px; margin-bottom: 1rem;">
<h3>📍 Validate Radar Alignment Against Official NOAA Sources</h3>
<p>Compare our radar overlay alignment with these official NOAA RAP visualizations:</p>
<ul style="margin: 0.5rem 0;">
<li><strong>RAP Model Browser:</strong> <a href="https://rapidrefresh.noaa.gov/" target="_blank">NOAA RAP Graphics</a></li>
<li><strong>RAP CONUS Hourly:</strong> <a href="https://rapidrefresh.noaa.gov/RAP/Welcome.cgi" target="_blank">NOAA RAP Graphics</a></li>
<li><strong>National Radar:</strong> <a href="https://radar.weather.gov/" target="_blank">NWS Radar (NEXRAD)</a></li>
<li><strong>Graphical Forecast:</strong> <a href="https://graphical.weather.gov/sectors/conus.php" target="_blank">NOAA CONUS Graphics</a></li>
</ul>
<p><em>💡 Tip: Use the same forecast time and look for matching radar patterns, storm positions, and geographic alignment with cities/coastlines.</em></p>
</div>
""")
with gr.Row():
with gr.Column():
location = gr.Textbox(value="Kansas City, MO", label="Location")
with gr.Row():
forecast_hour = gr.Slider(
minimum=0, maximum=18, value=6, step=1,
label="Forecast Hours"
)
detail_level = gr.Slider(
minimum=1, maximum=5, value=5, step=1,
label="Radar Detail",
info="Higher = more detail (slower)"
)
min_dbz = gr.Slider(
minimum=0.0, maximum=20.0, value=0.0, step=0.5,
label="Min dBZ",
info="Hide speckle below this reflectivity"
)
animate_forecast = gr.Checkbox(value=False, label="Animate 0–18h Forecast")
parameter = gr.Dropdown(
choices=[
("Temperature", "temperature"),
("Humidity", "humidity"),
("Wind Speed", "wind_speed"),
("Pressure", "pressure"),
("Radar Only", "radar")
],
value="temperature",
label="Weather Parameter"
)
show_radar_overlay = gr.Checkbox(
value=False,
label="Add Radar Overlay",
info="Show RAP radar forecast on top of weather data"
)
update_btn = gr.Button("Get North American Data + Radar", variant="primary")
with gr.Row():
export_btn = gr.Button("Export Radar as GRIB2")
download_raw_btn = gr.Button("Download Raw GRIB2 (RAP)")
export_kmz_btn = gr.Button("Export Radar as KMZ")
gr.HTML("""
<div style="background: #e8f5e8; padding: 1rem; border-radius: 8px; margin-top: 1rem;">
<h4>🎯 Enhanced Radar Features:</h4>
<ul style="font-size: 0.9em; margin: 0.5rem 0;">
<li><strong>REFC:</strong> Composite reflectivity forecast</li>
<li><strong>dBZ Scale:</strong> 0.5-65+ precipitation intensity</li>
<li><strong>Projection:</strong> Validated Lambert Conformal Conic</li>
<li><strong>Alignment:</strong> Coordinates validated against reference points</li>
<li><strong>Geographic accuracy:</strong> Proper RAP grid transformation</li>
<li><strong>Real forecasts:</strong> RAP model predictions</li>
</ul>
<p style="font-size: 0.8em; margin-top: 0.5rem;">
<strong>New:</strong> Side-by-side Plotly vs Leaflet comparison maps to validate alignment.<br>
<strong>Improvement:</strong> Radar data uses validated RAP coordinates with Lambert Conformal projection.<br>
<strong>Export:</strong> KMZ format for Google Earth and professional GIS applications.
</p>
</div>
""")
with gr.Column():
status_text = gr.Markdown("Click button to fetch RAP weather + radar data")
# Side-by-side map comparison
with gr.Row():
with gr.Column():
gr.Markdown("### Plotly Map")
weather_map = gr.Plot()
with gr.Column():
gr.Markdown("### Leaflet Comparison")
leaflet_comparison = gr.HTML(label="Leaflet Map Comparison")
# Additional outputs below
with gr.Row():
with gr.Column():
animation_view = gr.Image(label="Radar Animation (0–18h)")
with gr.Column():
leaflet_overlay = gr.HTML(label="Leaflet Animation Overlay")
# Export files
with gr.Row():
export_file = gr.File(label="GRIB2 Export", visible=True)
raw_grib_file = gr.File(label="Raw RAP GRIB2", visible=True)
kmz_export_file = gr.File(label="KMZ Export", visible=True)
# Event handlers
update_btn.click(
fn=update_display,
inputs=[location, forecast_hour, parameter, show_radar_overlay, detail_level, min_dbz, animate_forecast],
outputs=[status_text, weather_map, leaflet_comparison, animation_view, leaflet_overlay]
)
# Auto-update when toggling radar
show_radar_overlay.change(
fn=update_display,
inputs=[location, forecast_hour, parameter, show_radar_overlay, detail_level, min_dbz, animate_forecast],
outputs=[status_text, weather_map, leaflet_comparison, animation_view, leaflet_overlay]
)
# Export GRIB button
def _export_handler(forecast_hour, min_dbz):
path, msg = export_radar_grib(forecast_hour, min_dbz)
if path:
return path
else:
# Create a tiny text file with error to make it downloadable
import os
os.makedirs('exports', exist_ok=True)
err_path = f"exports/export_error.txt"
with open(err_path, 'w') as f:
f.write(msg or 'Export failed')
return err_path
export_btn.click(
fn=_export_handler,
inputs=[forecast_hour, min_dbz],
outputs=[export_file]
)
def _download_raw_handler(forecast_hour):
path, msg = download_raw_grib(int(forecast_hour))
if path:
return path
else:
import os
os.makedirs('exports', exist_ok=True)
err_path = f"exports/raw_grib_error.txt"
with open(err_path, 'w') as f:
f.write(msg or 'Download failed')
return err_path
download_raw_btn.click(
fn=_download_raw_handler,
inputs=[forecast_hour],
outputs=[raw_grib_file]
)
# Export KMZ button
def _export_kmz_handler(forecast_hour, min_dbz):
path, msg = export_rap_to_kmz(forecast_hour, min_dbz)
if path:
return path
else:
# Create a tiny text file with error to make it downloadable
import os
os.makedirs('exports', exist_ok=True)
err_path = f"exports/kmz_export_error.txt"
with open(err_path, 'w') as f:
f.write(msg or 'KMZ export failed')
return err_path
export_kmz_btn.click(
fn=_export_kmz_handler,
inputs=[forecast_hour, min_dbz],
outputs=[kmz_export_file]
)
if __name__ == "__main__":
# Disable SSR to allow custom JS (Leaflet/Folium) to run in gr.HTML blocks
app.launch(server_name="0.0.0.0", server_port=7860, ssr_mode=False)