import gradio as gr import numpy as np import plotly.graph_objects as go from datetime import datetime, timedelta import warnings import gc import sys import math from typing import Optional, Dict, Any, List, Tuple import os import glob # Globals used for building overlays without refetching LAST_RADAR_GRID: Optional[Dict[str, Any]] = None LAST_ANIMATION_PATH: Optional[str] = None warnings.filterwarnings('ignore') # Import weather libraries for REAL data try: from herbie import Herbie import xarray as xr HERBIE_AVAILABLE = True print("HERBIE AVAILABLE - Will use real RAP data including radar") except ImportError as e: HERBIE_AVAILABLE = False print(f"HERBIE NOT AVAILABLE: {e}") # Try importing projection libraries for coordinate transformation try: import cartopy.crs as ccrs import pyproj PROJECTION_AVAILABLE = True except ImportError: PROJECTION_AVAILABLE = False print("Projection libraries not available - using raw coordinates") # Try importing KML/KMZ libraries try: import zipfile import xml.etree.ElementTree as ET KMZ_AVAILABLE = True except ImportError: KMZ_AVAILABLE = False print("KMZ export not available") def _try_nam_refc_data(param='REFC:entire atmosphere', fxx=6, return_src: bool = False): """Try to fetch REFC data from NAM model for North American coverage.""" try: # Try recent times for NAM model current_time = datetime.utcnow().replace(minute=0, second=0, microsecond=0) for hours_back in [2, 3, 6, 12, 18]: try: target_time = current_time - timedelta(hours=hours_back) date_str = target_time.strftime('%Y-%m-%d %H:00') print(f" Trying NAM data for: {date_str}, parameter: {param}") # Try NAM with different products for broader coverage for product in ['afwaca', 'conusnest.hiresf']: # Central America/Caribbean first, then CONUS try: H = Herbie(date_str, model='nam', product=product, fxx=fxx) ds = H.xarray(param) if ds is not None: print(f" SUCCESS: Got NAM {product} data for {date_str}") if return_src: return (ds, {'date_str': date_str, 'model': 'nam', 
'product': product}) return ds except Exception as e: print(f" NAM {product} failed: {e}") continue except Exception as e: print(f" NAM attempt failed for {date_str}: {e}") continue print(" All NAM attempts failed") # Try GFS as final fallback for global coverage including North America print(" Trying GFS model for global REFC coverage...") try: for hours_back in [0, 6, 12, 18]: target_time = current_time - timedelta(hours=hours_back) date_str = target_time.strftime('%Y-%m-%d %H:00') print(f" Trying GFS data for: {date_str}, parameter: {param}") # Try GFS with 0.25 degree resolution (highest available) try: H = Herbie(date_str, model='gfs', product='pgrb2.0p25', fxx=fxx) ds = H.xarray(param) if ds is not None: print(f" SUCCESS: Got GFS data for {date_str}") return (ds, {'date_str': date_str, 'model': 'gfs'}) if return_src else ds except Exception as e: print(f" GFS failed: {e}") continue except Exception as e: print(f" GFS fetch error: {e}") return None except Exception as e: print(f" NAM fetch error: {e}") return None def fetch_real_rap_data(param='TMP:2 m', fxx=6, return_src: bool = False): """Fetch actual RAP data from NOAA including forecasts. Note: RAP model may not include REFC (composite reflectivity) parameter. RAP is primarily focused on temperature, pressure, and wind fields. If return_src is True, returns a tuple (ds, info) where info contains metadata such as 'date_str' and possible 'file' path. 
""" if not HERBIE_AVAILABLE: return (None, None) if return_src else None try: # For REFC parameter, try NAM model for North American coverage if 'REFC' in param: print(f"INFO: Trying NAM model for North American REFC coverage...") nam_result = _try_nam_refc_data(param, fxx, return_src) if nam_result: return nam_result print(f"NAM failed, falling back to RAP model...") # Try recent times, working backwards current_time = datetime.utcnow().replace(minute=0, second=0, microsecond=0) for hours_back in [2, 3, 6, 12, 18]: try: target_time = current_time - timedelta(hours=hours_back) date_str = target_time.strftime('%Y-%m-%d %H:00') print(f"Trying RAP data for: {date_str}, parameter: {param}") # Create Herbie object - RAP uses 'sfc' product like HRRR H = Herbie(date_str, model='rap', product='sfc', fxx=fxx) # Debug: Check if RAP data is available for this time try: # Test basic availability first print(f" Testing RAP availability: {H}") if hasattr(H, 'grib'): print(f" GRIB source: {H.grib}") # Download specific parameter ds = H.xarray(param) except Exception as e: print(f" RAP xarray error: {e}") continue if ds is not None: print(f"SUCCESS: Got real RAP data for {date_str}") if return_src: # Try to discover the source grib path from encodings or Herbie src_path = None try: src_path = ds.encoding.get('source', None) except Exception: pass if not src_path: try: # Try variable encodings for vname in ds.data_vars: enc = getattr(ds[vname], 'encoding', {}) src_path = enc.get('source', None) if src_path: break except Exception: pass # Fallback: ask Herbie for local file path (best effort) if not src_path: for attr in ('get_localFilePath', 'get_local_file_path', 'local_file', 'fpath', 'filepath'): if hasattr(H, attr): try: val = getattr(H, attr) src_path = val() if callable(val) else val if src_path: break except Exception: continue info = { 'date_str': date_str, 'param': param, 'fxx': fxx, 'file': src_path } return ds, info else: return ds except Exception as e: print(f"Failed 
for {date_str}: {e}") continue print("All RAP attempts failed") return (None, None) if return_src else None except Exception as e: print(f"RAP fetch error: {e}") return (None, None) if return_src else None def get_rap_projection(): """Get the RAP Lambert Conformal Conic projection parameters. Official NOAA RAP CONUS domain specifications from GRIB2 metadata: - Grid: 1799 x 1059 mass points, 3km resolution (DxInMetres: 3000.0, DyInMetres: 3000.0) - Lambert Conformal GRIB2 Template 30 - LaDInDegrees: 38.5, Latin1InDegrees: 38.5, Latin2InDegrees: 38.5 - LoVInDegrees: 262.5 (orientation longitude, meridian aligned with Y-axis) - Earth model: Sphere radius 6371229 meters """ return { 'proj': 'lcc', 'lat_1': 38.5, # Latin1 - first standard parallel 'lat_2': 38.5, # Latin2 - second standard parallel (tangent cone) 'lat_0': 38.5, # LaD - latitude where grid lengths are specified 'lon_0': 262.5, # LoV - orientation longitude (meridian aligned with Y-axis) 'x_0': 0, # False easting 'y_0': 0, # False northing 'a': 6371229, # Earth sphere radius in meters 'b': 6371229, # Earth sphere radius (same for sphere) 'units': 'm' } def validate_rap_coordinates(ds): """Validate and potentially correct RAP/NAM coordinate arrays.""" if ds is None: return None try: # Check if we have proper 2D coordinate arrays if 'latitude' in ds.coords and 'longitude' in ds.coords: lat2d = ds.latitude.values lon2d_raw = ds.longitude.values # Apply longitude correction for 0-360° to -180 to 180° conversion lon2d = np.where(lon2d_raw > 180, lon2d_raw - 360, lon2d_raw) # Validate that coordinates are reasonable for North American domain if lat2d.ndim == 2 and lon2d.ndim == 2: lat_min, lat_max = np.nanmin(lat2d), np.nanmax(lat2d) lon_min, lon_max = np.nanmin(lon2d), np.nanmax(lon2d) # Check for RAP CONUS domain (HRRR-like coverage) rap_lat_valid = (20.8 <= lat_min <= 21.5) and (47.5 <= lat_max <= 48.2) rap_lon_valid = (-135.0 <= lon_min <= -133.0) and (-61.5 <= lon_max <= -60.0) # Check for NAM North 
American domain (broader coverage including Canada/Mexico) nam_lat_valid = (20.0 <= lat_min <= 22.0) and (52.0 <= lat_max <= 54.0) nam_lon_valid = (-140.0 <= lon_min <= -130.0) and (-65.0 <= lon_max <= -55.0) if rap_lat_valid and rap_lon_valid: print(f"✓ RAP CONUS coordinates validated: lat [{lat_min:.2f}, {lat_max:.2f}], lon [{lon_min:.2f}, {lon_max:.2f}]") return {'lat2d': lat2d, 'lon2d': lon2d, 'valid': True, 'model': 'RAP'} elif nam_lat_valid and nam_lon_valid: print(f"✓ NAM North American coordinates validated: lat [{lat_min:.2f}, {lat_max:.2f}], lon [{lon_min:.2f}, {lon_max:.2f}]") return {'lat2d': lat2d, 'lon2d': lon2d, 'valid': True, 'model': 'NAM'} else: # Still usable coordinates, just warn about potential issues print(f"Warning: Coordinates outside expected domains: lat [{lat_min:.2f}, {lat_max:.2f}], lon [{lon_min:.2f}, {lon_max:.2f}]") print(f"RAP expected: lat [20.8-21.5, 47.5-48.2], lon [-135.0 to -133.0, -61.5 to -60.0]") print(f"NAM expected: lat [20.0-22.0, 52.0-54.0], lon [-140.0 to -130.0, -65.0 to -55.0]") print("Warning: Using potentially non-standard coordinates") return {'lat2d': lat2d, 'lon2d': lon2d, 'valid': False, 'model': 'Unknown'} return None except Exception as e: print(f"Coordinate validation error: {e}") return None def process_rap_data(ds, max_points=400, param_type='temperature'): """Process RAP xarray dataset into plot-ready data""" if ds is None: return None try: # Get the main data variable var_names = list(ds.data_vars) if not var_names: return None var_name = var_names[0] data_var = ds[var_name] # Get coordinates if 'latitude' in ds.coords and 'longitude' in ds.coords: lats = ds.latitude.values lons = ds.longitude.values values = data_var.values elif 'lat' in ds.coords and 'lon' in ds.coords: lats = ds.lat.values lons = ds.lon.values values = data_var.values else: return None # For radar, get MAXIMUM resolution - much more data points if param_type == 'radar': max_points = 10000 # Much larger for full radar coverage 
min_threshold = 0.1 # Even lower threshold for light precipitation else: min_threshold = None # Less aggressive subsampling for radar to keep more detail if lats.size > max_points: if param_type == 'radar': # For radar, use smaller step to keep more data step = max(1, int(np.sqrt(lats.size / max_points) * 0.7)) else: step = max(1, int(np.sqrt(lats.size / max_points))) if len(lats.shape) == 2: lats = lats[::step, ::step] lons = lons[::step, ::step] values = values[::step, ::step] else: lats = lats[::step] lons = lons[::step] values = values[::step] # Flatten arrays lats_flat = lats.flatten() lons_flat = lons.flatten() values_flat = values.flatten() # Remove invalid values valid = ~(np.isnan(values_flat) | np.isnan(lats_flat) | np.isnan(lons_flat)) # For radar, use minimal filtering to show maximum coverage if param_type == 'radar' and min_threshold is not None: radar_threshold = values_flat > min_threshold valid = valid & radar_threshold if not np.any(valid): return None return { 'lats': lats_flat[valid], 'lons': lons_flat[valid], 'values': values_flat[valid], 'units': data_var.attrs.get('units', ''), 'long_name': data_var.attrs.get('long_name', var_name), 'param_type': param_type } except Exception as e: print(f"Data processing error: {e}") return None def get_radar_colorscale(): """Get proper radar reflectivity colorscale in dBZ""" return [ [0.0, 'rgba(0,0,0,0)'], # Transparent for no echo [0.1, '#00ECEC'], # Light blue - 5-10 dBZ [0.2, '#01A0F6'], # Blue - 10-15 dBZ [0.3, '#0000F6'], # Dark blue - 15-20 dBZ [0.4, '#00FF00'], # Green - 20-25 dBZ [0.5, '#00C800'], # Dark green - 25-30 dBZ [0.6, '#FFFF00'], # Yellow - 30-35 dBZ [0.7, '#E7C000'], # Orange-yellow - 35-40 dBZ [0.8, '#FF9000'], # Orange - 40-45 dBZ [0.9, '#FF0000'], # Red - 45-50 dBZ [1.0, '#D60000'] # Dark red - 50+ dBZ ] def apply_rap_coordinate_correction(lat2d, lon2d): """Apply necessary coordinate corrections for RAP/NAM data alignment.""" try: print(f"Input coordinate shapes: lat {lat2d.shape}, 
lon {lon2d.shape}") # Handle different grid structures - NAM products may have different formats if lat2d.shape != lon2d.shape: print(f"Warning: Coordinate arrays have different shapes - attempting to fix") # If one is 1D and the other is 2D, or different 2D shapes, create meshgrid if lat2d.ndim == 1 and lon2d.ndim == 1: lon2d, lat2d = np.meshgrid(lon2d, lat2d) print(f"Created meshgrid: lat {lat2d.shape}, lon {lon2d.shape}") elif lat2d.ndim == 1: # Lat is 1D, lon might be 2D - make compatible lat2d = np.broadcast_to(lat2d[:, np.newaxis], lon2d.shape) print(f"Broadcasted lat to match lon: {lat2d.shape}") elif lon2d.ndim == 1: # Lon is 1D, lat might be 2D - make compatible lon2d = np.broadcast_to(lon2d[np.newaxis, :], lat2d.shape) print(f"Broadcasted lon to match lat: {lon2d.shape}") else: print(f"Cannot reconcile coordinate shapes: lat {lat2d.shape}, lon {lon2d.shape}") return lat2d, lon2d, np.ones_like(lat2d, dtype=bool) # For radar visualization, ensure longitude is in -180 to 180 range # RAP/NAM often provide longitudes in 0-360° format, convert to -180 to 180° lon2d_corrected = np.where(lon2d > 180, lon2d - 360, lon2d) # Remove any obvious outliers or invalid coordinates after correction valid_mask = (~np.isnan(lat2d) & ~np.isnan(lon2d_corrected) & (lat2d >= -90) & (lat2d <= 90) & (lon2d_corrected >= -180) & (lon2d_corrected <= 180)) # Apply RAP-specific coordinate corrections based on official specifications # RAP uses Lambert Conformal Conic projection with specific parameters # The coordinates should already be properly projected, but we can verify bounds # Validate against known RAP domain boundaries lat_min, lat_max = np.nanmin(lat2d), np.nanmax(lat2d) lon_min, lon_max = np.nanmin(lon2d_corrected), np.nanmax(lon2d_corrected) # Check if coordinates fall within known model domains # RAP CONUS: lat 21.14° to 47.84°N, lon -134.10° to -60.90°W # NAM North America: broader coverage including Canada and Mexico rap_domain = (20.8 <= lat_min <= 21.5 and 47.5 <= 
lat_max <= 48.2 and -135.0 <= lon_min <= -133.0 and -61.5 <= lon_max <= -60.0) nam_domain = (20.0 <= lat_min <= 22.0 and 52.0 <= lat_max <= 54.0 and -140.0 <= lon_min <= -130.0 and -65.0 <= lon_max <= -55.0) if rap_domain: print(f"✓ Coordinates match RAP CONUS domain: lat [{lat_min:.2f}, {lat_max:.2f}], lon [{lon_min:.2f}, {lon_max:.2f}]") elif nam_domain: print(f"✓ Coordinates match NAM North American domain: lat [{lat_min:.2f}, {lat_max:.2f}], lon [{lon_min:.2f}, {lon_max:.2f}]") else: print(f"Info: Using non-standard coordinate domain: lat [{lat_min:.2f}, {lat_max:.2f}], lon [{lon_min:.2f}, {lon_max:.2f}]") return lat2d, lon2d_corrected, valid_mask except Exception as e: print(f"Coordinate correction error: {e}") return lat2d, lon2d, np.ones_like(lat2d, dtype=bool) def process_rap_grid(ds, target_cells=50000, param_type='radar', min_threshold=0.1): """Return RAP data as 2D grids (lat2d, lon2d, z2d) suitable for filled contours. - target_cells: approximate max number of grid cells to draw for performance - min_threshold: values below are masked as NaN (for radar transparency) """ if ds is None: return None try: var_names = list(ds.data_vars) if not var_names: return None var_name = var_names[0] data_var = ds[var_name] # Prefer explicit 2D latitude/longitude if available if 'latitude' in ds.coords and 'longitude' in ds.coords: lat2d_raw = ds.latitude.values lon2d_raw = ds.longitude.values # Apply coordinate validation and correction lat2d, lon2d, valid_mask = apply_rap_coordinate_correction(lat2d_raw, lon2d_raw) # Store validation info for debugging coord_validation = validate_rap_coordinates(ds) if coord_validation and not coord_validation['valid']: model_type = coord_validation.get('model', 'Unknown') print(f"Warning: Using potentially non-standard {model_type} coordinates") elif 'lat' in ds.coords and 'lon' in ds.coords: # Some datasets provide 1D lat/lon; try to construct 2D mesh lat = ds.lat.values lon = ds.lon.values if lat.ndim == 1 and lon.ndim == 1: 
lon2d, lat2d = np.meshgrid(lon, lat) else: lat2d = lat lon2d = lon else: return None z = data_var.values # Ensure z is 2D (squeeze time/levels if any) z = np.squeeze(z) if z.ndim != 2: # Cannot contour non-2D return None # Subsample to keep performance predictable ny, nx = z.shape total = nx * ny if total > target_cells: step = int(np.ceil(np.sqrt(total / target_cells))) step = max(1, step) z = z[::step, ::step] # Only subsample coordinates if they're 2D and match z dimensions if lat2d.ndim == 2 and lat2d.shape == (ny, nx): lat2d = lat2d[::step, ::step] elif lat2d.ndim == 1 and len(lat2d) == ny: lat2d = lat2d[::step] if lon2d.ndim == 2 and lon2d.shape == (ny, nx): lon2d = lon2d[::step, ::step] elif lon2d.ndim == 1 and len(lon2d) == nx: lon2d = lon2d[::step] # Mask values below threshold for radar if param_type == 'radar' and min_threshold is not None: z = np.where(z >= min_threshold, z, np.nan) return { 'lat2d': lat2d, 'lon2d': lon2d, 'z2d': z, 'units': data_var.attrs.get('units', ''), 'long_name': data_var.attrs.get('long_name', var_name), 'param_type': param_type } except Exception as e: print(f"Grid processing error: {e}") return None def _clamp(val, vmin, vmax): return max(vmin, min(val, vmax)) def grid_to_geojson(lat2d: np.ndarray, lon2d: np.ndarray, z2d: np.ndarray, max_polygons: Optional[int] = None, nan_as_transparent: bool = True) -> Optional[Dict[str, Any]]: """Convert a lat/lon curvilinear grid into a GeoJSON FeatureCollection of cell polygons. - Each cell is a quadrilateral around the center (i,j) using neighboring points. - Values that are NaN are skipped when nan_as_transparent is True. - max_polygons optionally caps the number of cells included (row/col stride). 
""" try: ny, nx = z2d.shape if ny < 2 or nx < 2: return None # Determine stride to cap polygons if needed istep = jstep = 1 total_cells = ny * nx if max_polygons and total_cells > max_polygons: factor = math.sqrt(total_cells / max_polygons) istep = max(1, int(round(factor))) jstep = istep features = [] # Helper for safe index def lat_(i, j): ii = _clamp(i, 0, ny - 1) jj = _clamp(j, 0, nx - 1) return float(lat2d[ii, jj]) def lon_(i, j): ii = _clamp(i, 0, ny - 1) jj = _clamp(j, 0, nx - 1) return float(lon2d[ii, jj]) # Build polygons for i in range(0, ny, istep): for j in range(0, nx, jstep): val = z2d[i, j] if nan_as_transparent and (val is None or np.isnan(val)): continue # Corners as average of 4 surrounding centers (clamped at edges) # Top-left around (i-0.5, j-0.5) lat_tl = (lat_(i, j) + lat_(i-1, j) + lat_(i, j-1) + lat_(i-1, j-1)) / 4.0 lon_tl = (lon_(i, j) + lon_(i-1, j) + lon_(i, j-1) + lon_(i-1, j-1)) / 4.0 # Top-right around (i-0.5, j+0.5) lat_tr = (lat_(i, j) + lat_(i-1, j) + lat_(i, j+1) + lat_(i-1, j+1)) / 4.0 lon_tr = (lon_(i, j) + lon_(i-1, j) + lon_(i, j+1) + lon_(i-1, j+1)) / 4.0 # Bottom-right around (i+0.5, j+0.5) lat_br = (lat_(i, j) + lat_(i+1, j) + lat_(i, j+1) + lat_(i+1, j+1)) / 4.0 lon_br = (lon_(i, j) + lon_(i+1, j) + lon_(i, j+1) + lon_(i+1, j+1)) / 4.0 # Bottom-left around (i+0.5, j-0.5) lat_bl = (lat_(i, j) + lat_(i+1, j) + lat_(i, j-1) + lat_(i+1, j-1)) / 4.0 lon_bl = (lon_(i, j) + lon_(i+1, j) + lon_(i, j-1) + lon_(i+1, j-1)) / 4.0 poly = [ [lon_tl, lat_tl], [lon_tr, lat_tr], [lon_br, lat_br], [lon_bl, lat_bl], [lon_tl, lat_tl] ] fid = f"{i}-{j}" feat = { "type": "Feature", "id": fid, "properties": {"id": fid, "value": None if np.isnan(val) else float(val)}, "geometry": {"type": "Polygon", "coordinates": [poly]} } features.append(feat) return {"type": "FeatureCollection", "features": features} except Exception as e: print(f"GeoJSON build error: {e}") return None def _parse_plotly_color(color_str: str) -> Tuple[float, float, float, 
float]: """Convert '#RRGGBB' or 'rgba(r,g,b,a)' to normalized RGBA tuple.""" color_str = color_str.strip() if color_str.startswith('#'): r = int(color_str[1:3], 16) / 255.0 g = int(color_str[3:5], 16) / 255.0 b = int(color_str[5:7], 16) / 255.0 a = 1.0 return (r, g, b, a) if color_str.startswith('rgba'): nums = color_str[color_str.find('(')+1:color_str.find(')')].split(',') r = int(nums[0]) / 255.0 g = int(nums[1]) / 255.0 b = int(nums[2]) / 255.0 a = float(nums[3]) return (r, g, b, a) raise ValueError(f"Unsupported color: {color_str}") def build_mpl_colormap(colorscale: List[List[float]], name: str = 'radar'): """Build a Matplotlib colormap from a Plotly colorscale definition.""" try: import matplotlib.colors as mcolors stops = [(float(p), _parse_plotly_color(c)) for p, c in colorscale] # mcolors.LinearSegmentedColormap.from_list accepts (x, color) pairs cmap = mcolors.LinearSegmentedColormap.from_list(name, stops) # Ensure NaNs are transparent cmap.set_bad((0, 0, 0, 0)) return cmap except Exception as e: print(f"Colormap build error: {e}") return None def add_radar_image_layer(fig: go.Figure, lat2d: np.ndarray, lon2d: np.ndarray, z2d: np.ndarray, detail_level: int, param_type: str) -> bool: """Render radar as a smooth raster image and overlay via mapbox image layer. Returns True on success. 
""" try: import io, base64 import matplotlib matplotlib.use('Agg', force=True) import matplotlib.pyplot as plt # Determine output image size based on detail level and grid size ny, nx = z2d.shape scale_map = {1: 1.2, 2: 1.6, 3: 2.0, 4: 3.0, 5: 4.0} scale = scale_map.get(int(detail_level) if detail_level is not None else 3, 2.0) max_pixels = 2_400_000 # cap to ~2.4 MP for performance width = int(nx * scale) height = int(ny * scale) # Fit within cap preserving aspect if width * height > max_pixels: ratio = math.sqrt(max_pixels / (width * height)) width = max(64, int(width * ratio)) height = max(64, int(height * ratio)) # Prepare data (mask NaNs for transparency) zmask = np.ma.masked_invalid(z2d) # Check if we need to flip the data to match geographic orientation ny, nx = lat2d.shape lat_top = float(lat2d[0, nx//2]) # Middle of top row lat_bottom = float(lat2d[-1, nx//2]) # Middle of bottom row # For proper geographic alignment, image top should correspond to highest latitude # With origin='upper', array[0] should have higher latitudes than array[-1] if lat_top < lat_bottom: # Data is ordered south-to-north, need to flip for north-to-south display zmask = np.flipud(zmask) print(f"⚠ Flipping radar data vertically: array has S-to-N order ({lat_top:.2f}° to {lat_bottom:.2f}°)") else: print(f"✓ Radar data orientation OK: array has N-to-S order ({lat_top:.2f}° to {lat_bottom:.2f}°)") cmap = build_mpl_colormap(get_radar_colorscale()) if cmap is None: return False dpi = 100 fig_img = plt.figure(figsize=(width / dpi, height / dpi), dpi=dpi) ax = fig_img.add_axes([0, 0, 1, 1]) # full-bleed ax.imshow(zmask, cmap=cmap, vmin=0, vmax=65, origin='upper', interpolation='bilinear') ax.axis('off') buf = io.BytesIO() fig_img.savefig(buf, format='png', dpi=dpi, transparent=True) plt.close(fig_img) img_b64 = base64.b64encode(buf.getvalue()).decode('ascii') data_url = f"data:image/png;base64,{img_b64}" # Corner coordinates for RAP Lambert Conformal grid # Account for potential coordinate 
distortion in curvilinear grid ny, nx = lat2d.shape # RAP uses a curvilinear Lambert Conformal grid - corners may not be at exact array indices # Find actual geographic bounds rather than assuming corner positions # Get geographic bounds for proper alignment lat_min, lat_max = float(np.nanmin(lat2d)), float(np.nanmax(lat2d)) lon_min, lon_max = float(np.nanmin(lon2d)), float(np.nanmax(lon2d)) print(f"RAP geographic bounds: lat [{lat_min:.3f}, {lat_max:.3f}], lon [{lon_min:.3f}, {lon_max:.3f}]") # For Lambert Conformal grids, use the actual geographic bounds as corners # rather than relying on specific array indices which may not represent true corners # This approach works better with curvilinear grids # Define corners based on geographic bounds (standard GeoJSON/Mapbox order: [lon, lat]) tl = [lon_min, lat_max] # Top-left: western edge, northern edge tr = [lon_max, lat_max] # Top-right: eastern edge, northern edge br = [lon_max, lat_min] # Bottom-right: eastern edge, southern edge bl = [lon_min, lat_min] # Bottom-left: western edge, southern edge # Validate orientation is consistent with image lat_top = float(lat2d[0, nx//2]) # Middle of top row in data array lat_bottom = float(lat2d[-1, nx//2]) # Middle of bottom row in data array if lat_top < lat_bottom: print(f"⚠ Data array has inverted latitude order: array[0]={lat_top:.2f}° < array[-1]={lat_bottom:.2f}°") else: print(f"✓ Data array latitude order: array[0]={lat_top:.2f}° > array[-1]={lat_bottom:.2f}°") # Log corner coordinates for model validation print(f"Grid corners: TL({tl[1]:.3f},{tl[0]:.3f}) TR({tr[1]:.3f},{tr[0]:.3f}) BR({br[1]:.3f},{br[0]:.3f}) BL({bl[1]:.3f},{bl[0]:.3f})") # Check if coordinates match known model domains # RAP CONUS: SW(21.14°N,122.72°W), NW(47.84°N,134.10°W), NE(47.84°N,60.90°W), SE(21.14°N,72.28°W) # NAM has broader North American coverage extending into Canada and Mexico lat_range = max(tl[1], tr[1]) - min(bl[1], br[1]) lon_range = max(tr[0], br[0]) - min(tl[0], bl[0]) if lat_range 
< 30: # Likely RAP CONUS domain print("✓ Grid appears to be RAP CONUS domain") elif lat_range > 30: # Likely NAM North American domain print("✓ Grid appears to be NAM North American domain") else: print("? Grid domain classification unclear") layers = list(fig.layout.mapbox.layers) if fig.layout.mapbox.layers is not None else [] layers.append(dict( sourcetype='image', source=data_url, coordinates=[tl, tr, br, bl], opacity=1.0, below='traces', name='Radar Raster' )) fig.update_layout(mapbox_layers=layers) # Add invisible scatter to provide colorbar for the image try: c_lat = float(np.nanmean(lat2d)) c_lon = float(np.nanmean(lon2d)) fig.add_trace(go.Scattermapbox( lat=[c_lat, c_lat], lon=[c_lon, c_lon], mode='markers', marker=dict( size=1, color=[0, 65], colorscale=get_radar_colorscale(), showscale=True, colorbar=dict( title="Radar Reflectivity (dBZ)", x=0.02 if param_type != 'radar' else 1.02, len=0.6 ), opacity=0 # invisible points ), hoverinfo='skip', name='Radar Scale' )) except Exception as e: print(f"Colorbar marker add failed: {e}") return True except Exception as e: print(f"Image layer error: {e}") return False def _locate_or_download_grib(forecast_hour: int): """Return local GRIB2 path for RAP REFC at fxx, downloading if needed.""" if not HERBIE_AVAILABLE: return None, "Herbie is not available" try: current_time = datetime.utcnow().replace(minute=0, second=0, microsecond=0) for hours_back in [0, 1, 2, 3, 6, 12, 18, 24]: try: target_time = current_time - timedelta(hours=hours_back) date_str = target_time.strftime('%Y-%m-%d %H:00') H = Herbie(date_str, model='rap', product='sfc', fxx=int(forecast_hour)) # Ensure local file local = None try: local = H.get_localFilePath() except Exception: local = None if not local: files = None try: files = H.download() except Exception: files = None if isinstance(files, (list, tuple)) and files: local = files[0] if not local and hasattr(H, 'fpath'): local = H.fpath if local and os.path.exists(str(local)): return str(local), 
None # As a fallback, search the expected directory for subset GRIB2 files # Herbie typically stores under ~/data/rap/YYYYMMDD try: day_dir = os.path.expanduser(os.path.join('~', 'data', 'rap', target_time.strftime('%Y%m%d'))) if os.path.isdir(day_dir): pattern1 = os.path.join(day_dir, f"*wrfsfcf{int(forecast_hour):02d}.grib2") pattern2 = os.path.join(day_dir, f"**/*f{int(forecast_hour):02d}*.grib2") candidates = sorted(glob.glob(pattern1)) + sorted(glob.glob(pattern2, recursive=True)) if candidates: return candidates[0], None except Exception as se: print(f"subset search failed: {se}") except Exception as e: print(f"locate/download attempt failed: {e}") continue # Global fallback: scan entire cache tree (could be slow but last resort) try: root = os.path.expanduser(os.path.join('~', 'data', 'rap')) if os.path.isdir(root): pat = os.path.join(root, f"**/*f{int(forecast_hour):02d}*.grib2") cand = glob.glob(pat, recursive=True) if cand: return sorted(cand)[0], None except Exception as e2: print(f"global scan failed: {e2}") return None, "Unable to locate/download GRIB file" except Exception as e: return None, f"Locate/download error: {e}" def export_radar_grib(forecast_hour: int, min_dbz: float): """Export the RAP radar (REFC) field to a GRIB2 file with values below min_dbz set to missing. Returns (path, message). If path is None, message contains error. """ try: if not HERBIE_AVAILABLE: return None, "Herbie is not available to fetch RAP data." # Fetch dataset and try to learn source path and date used ds, info = fetch_real_rap_data('REFC:entire atmosphere', int(forecast_hour), return_src=True) if ds is None: return None, "Unable to fetch RAP radar data for export." var_names = list(ds.data_vars) if not var_names: return None, "Dataset missing variables." vname = var_names[0] z = np.squeeze(ds[vname].values) if z.ndim != 2: return None, "Unexpected radar array shape." 
# Apply threshold thr = float(min_dbz) if min_dbz is not None else 1.0 z = np.where(z >= thr, z.astype(float), np.nan) # Determine or download source GRIB path src = None if isinstance(info, dict) and info.get('file') and os.path.exists(info['file']): src = info['file'] if not src: src, err = _locate_or_download_grib(int(forecast_hour)) if not src: return None, err or "Could not obtain source GRIB file" from eccodes import codes_grib_new_from_file, codes_get, codes_set, codes_set_values, codes_write, codes_release # Iterate file to find the composite reflectivity message handle = None with open(src, 'rb') as f: while True: try: gid = codes_grib_new_from_file(f) except Exception: gid = None if gid is None: break try: shortName = None try: shortName = codes_get(gid, 'shortName') except Exception: shortName = None name = None try: name = codes_get(gid, 'name') except Exception: name = None # Identify composite reflectivity ok = False if shortName and str(shortName).lower() in ('refc', 'refd', 'refl', 'ref'): # be lenient ok = True if (not ok) and name and 'reflect' in str(name).lower(): ok = True if ok and handle is None: handle = gid break else: codes_release(gid) except Exception: try: codes_release(gid) except Exception: pass if handle is None: return None, "Composite reflectivity message not found in GRIB file." 
# Ensure bitmap for missing values try: codes_set(handle, 'bitmapPresent', 1) except Exception: pass # Flatten in scan order (assuming row-major) vals = z.flatten().astype(float) codes_set_values(handle, vals) os.makedirs('exports', exist_ok=True) date_tag = info.get('date_str', 'unknown').replace(':', '').replace(' ', 'T') if isinstance(info, dict) else 'unknown' out_path = os.path.join('exports', f"rap_radar_reflectivity_{date_tag}_f{int(forecast_hour):02d}_mindbz{thr:.1f}.grib2") with open(out_path, 'wb') as fo: codes_write(handle, fo) try: codes_release(handle) except Exception: pass return out_path, None except Exception as e: return None, f"Export error: {e}" def download_raw_grib(forecast_hour: int): """Return a copy-path under ./exports for the raw RAP GRIB2 file used for REFC at the given forecast hour.""" try: if not HERBIE_AVAILABLE: return None, "Herbie is not available" # Try immediate locate/download via Herbie src_file, err = _locate_or_download_grib(int(forecast_hour)) if not src_file: return None, err try: import shutil os.makedirs('exports', exist_ok=True) base = os.path.basename(str(src_file)) dest = os.path.join('exports', f"raw_{base}") shutil.copy2(src_file, dest) return dest, None except Exception as e: return None, f"Copy error: {e}" # Fallback: attempt direct Herbie path current_time = datetime.utcnow().replace(minute=0, second=0, microsecond=0) for hours_back in [2, 3, 6, 12, 18]: try: target_time = current_time - timedelta(hours=hours_back) date_str = target_time.strftime('%Y-%m-%d %H:00') H = Herbie(date_str, model='rap', product='sfc', fxx=int(forecast_hour)) # This triggers download if not present local = H.get_localFilePath() if hasattr(H, 'get_localFilePath') else None if not local and hasattr(H, 'download'): files = H.download() if isinstance(files, (list, tuple)) and files: local = files[0] if not local and hasattr(H, 'fpath'): local = H.fpath # Fallback handled above except Exception: continue return None, "Unable to 
def create_radar_kml(lat2d, lon2d, z2d, forecast_hour, min_dbz):
    """Create KML content for RAP radar data.

    The document contains one ``Style`` per reflectivity band, a
    ``GroundOverlay`` carrying only a name and the grid's ``LatLonBox``
    (no image is attached), and one ``Placemark`` polygon per sampled grid
    cell whose mean reflectivity is at least ``min_dbz``.

    Args:
        lat2d: 2-D array of latitudes, indexed ``[row, col]``.
        lon2d: 2-D array of longitudes with the same shape as ``lat2d``.
        z2d: 2-D array of reflectivity values (NaN = no data).
        forecast_hour: Forecast lead time, used only in the document text.
        min_dbz: Cells whose mean value is below this are omitted.

    Returns:
        The KML document as a ``str``. On any failure a small, well-formed
        KML document describing the error is returned instead, so the caller
        always receives parseable XML.
    """
    kml_ns = "http://www.opengis.net/kml/2.2"

    # (lower dBZ, upper dBZ, RGB hex colour, label). Only the colour and the
    # list position are used below; the bounds and labels document the bands.
    styles = [
        (5, 10, '#00ECEC', 'Light precipitation'),
        (10, 15, '#01A0F6', 'Light-moderate precipitation'),
        (15, 20, '#0000F6', 'Moderate precipitation'),
        (20, 25, '#00FF00', 'Moderate-heavy precipitation'),
        (25, 30, '#00C800', 'Heavy precipitation'),
        (30, 35, '#FFFF00', 'Very heavy precipitation'),
        (35, 40, '#E7C000', 'Intense precipitation'),
        (40, 45, '#FF9000', 'Very intense precipitation'),
        (45, 50, '#FF0000', 'Extreme precipitation'),
        (50, 65, '#D60000', 'Severe precipitation'),
    ]

    try:
        kml = ET.Element('kml', xmlns=kml_ns)
        document = ET.SubElement(kml, 'Document')

        ET.SubElement(document, 'name').text = (
            f"RAP Radar Forecast +{forecast_hour}h (min {min_dbz} dBZ)"
        )
        ET.SubElement(document, 'description').text = (
            f"RAP Composite Reflectivity forecast for +{forecast_hour} hours, "
            f"minimum {min_dbz} dBZ threshold"
        )

        # One polygon style per reflectivity band, ids radar0..radar9.
        for idx, (_lo, _hi, color, _label) in enumerate(styles):
            style = ET.SubElement(document, 'Style', id=f"radar{idx}")
            poly_style = ET.SubElement(style, 'PolyStyle')
            rgb = color.lstrip('#')
            r, g, b = int(rgb[0:2], 16), int(rgb[2:4], 16), int(rgb[4:6], 16)
            # KML colours are AABBGGRR; CC gives ~80% opacity.
            ET.SubElement(poly_style, 'color').text = f"CC{b:02X}{g:02X}{r:02X}"

        # Overlay element recording the geographic extent of the grid.
        ground_overlay = ET.SubElement(document, 'GroundOverlay')
        ET.SubElement(ground_overlay, 'name').text = "RAP Radar Grid"
        lat_box = ET.SubElement(ground_overlay, 'LatLonBox')
        north = ET.SubElement(lat_box, 'north')
        south = ET.SubElement(lat_box, 'south')
        east = ET.SubElement(lat_box, 'east')
        west = ET.SubElement(lat_box, 'west')
        north.text = str(float(np.nanmax(lat2d)))
        south.text = str(float(np.nanmin(lat2d)))
        east.text = str(float(np.nanmax(lon2d)))
        west.text = str(float(np.nanmin(lon2d)))

        # Sample the grid so that roughly 50 cells span the shorter axis.
        ny, nx = z2d.shape
        step = max(1, min(ny, nx) // 50)

        for i in range(0, ny - step, step):
            i_next = i + step  # always in bounds because i < ny - step
            for j in range(0, nx - step, step):
                j_next = j + step  # always in bounds because j < nx - step

                avg_value = np.nanmean(z2d[i:i_next, j:j_next])
                if np.isnan(avg_value) or avg_value < min_dbz:
                    continue

                # Closed ring: four corners plus the first corner again.
                ring = [
                    (lon2d[r, c], lat2d[r, c])
                    for r, c in (
                        (i, j),
                        (i, j_next),
                        (i_next, j_next),
                        (i_next, j),
                        (i, j),
                    )
                ]
                # A polygon with a NaN/inf vertex is invalid KML; skip it.
                if not all(np.isfinite(lon) and np.isfinite(lat) for lon, lat in ring):
                    continue

                placemark = ET.SubElement(document, 'Placemark')
                ET.SubElement(placemark, 'name').text = f"{avg_value:.1f} dBZ"
                ET.SubElement(placemark, 'description').text = (
                    f"Radar reflectivity: {avg_value:.1f} dBZ"
                )

                # Bands are 5 dBZ wide starting at 5 dBZ; clamp to the table.
                style_id = min(len(styles) - 1, max(0, int((avg_value - 5) / 5)))
                ET.SubElement(placemark, 'styleUrl').text = f"#radar{style_id}"

                polygon = ET.SubElement(placemark, 'Polygon')
                outer_ring = ET.SubElement(polygon, 'outerBoundaryIs')
                linear_ring = ET.SubElement(outer_ring, 'LinearRing')
                ET.SubElement(linear_ring, 'coordinates').text = " ".join(
                    f"{lon},{lat},0" for lon, lat in ring
                )

        return ET.tostring(kml, 'unicode')
    except Exception as e:
        print(f"KML creation error: {e}")
        # Build the fallback with ElementTree so it is always well-formed and
        # the error text is XML-escaped.
        error_kml = ET.Element('kml', xmlns=kml_ns)
        error_doc = ET.SubElement(error_kml, 'Document')
        ET.SubElement(error_doc, 'name').text = "RAP Radar Export Error"
        ET.SubElement(error_doc, 'description').text = f"Error creating KML: {str(e)}"
        return ET.tostring(error_kml, 'unicode')
""" try: import os import imageio import matplotlib matplotlib.use('Agg', force=True) import matplotlib.pyplot as plt frames = [] times = [] for fxx in range(0, 19): ds = fetch_real_rap_data('REFC:entire atmosphere', fxx) if isinstance(ds, tuple): ds = ds[0] grid = process_rap_grid(ds, target_cells={1:20000,2:40000,3:60000,4:90000,5:120000}.get(int(detail_level), 120000), param_type='radar', min_threshold=float(min_dbz)) if grid is None: continue lat2d = grid['lat2d'] lon2d = grid['lon2d'] z2d = grid['z2d'] # Apply same orientation correction as other visualizations ny, nx = lat2d.shape lat_top = float(lat2d[0, nx//2]) lat_bottom = float(lat2d[-1, nx//2]) zmask = np.ma.masked_invalid(z2d) if lat_top < lat_bottom: # Data is ordered south-to-north, flip for proper display zmask = np.flipud(zmask) cmap = build_mpl_colormap(get_radar_colorscale()) if cmap is None: continue scale_map = {1: 1.0, 2: 1.2, 3: 1.6, 4: 2.0, 5: 2.5} scale = scale_map.get(int(detail_level), 2.5) width = int(nx * scale) height = int(ny * scale) dpi = 100 fig_anim = plt.figure(figsize=(width / dpi, height / dpi), dpi=dpi) ax = fig_anim.add_axes([0, 0, 1, 1]) ax.imshow(zmask, cmap=cmap, vmin=0, vmax=65, origin='upper', interpolation='bilinear') ax.axis('off') fig_anim.canvas.draw() # Convert canvas to array img = np.frombuffer(fig_anim.canvas.tostring_argb(), dtype=np.uint8) img = img.reshape(fig_anim.canvas.get_width_height()[::-1] + (4,)) # ARGB to RGBA img = img[:, :, [1, 2, 3, 0]] frames.append(img) times.append(fxx) plt.close(fig_anim) if not frames: return None, "No frames generated" os.makedirs('exports', exist_ok=True) out_path = 'exports/rap_radar_animation_f00_f18.gif' imageio.mimsave(out_path, frames, duration=0.25, loop=0) # 4 fps, loop forever return out_path, None except Exception as e: return None, f"Animation error: {e}" def generate_radar_animation_png_frames(detail_level: int = 5, min_dbz: float = 0.0, fps: float = 4.0): """Return (frames, message) where frames is a list of data 
URLs (PNG with alpha) for f00..f18.""" try: import io, base64 import matplotlib matplotlib.use('Agg', force=True) import matplotlib.pyplot as plt frames = [] for fxx in range(0, 19): ds = fetch_real_rap_data('REFC:entire atmosphere', fxx) if isinstance(ds, tuple): ds = ds[0] grid = process_rap_grid(ds, target_cells={1:20000,2:40000,3:60000,4:90000,5:120000}.get(int(detail_level), 120000), param_type='radar', min_threshold=float(min_dbz)) if grid is None: continue lat2d = grid['lat2d'] lon2d = grid['lon2d'] z2d = grid['z2d'] # Apply same orientation correction as other visualizations ny, nx = lat2d.shape lat_top = float(lat2d[0, nx//2]) lat_bottom = float(lat2d[-1, nx//2]) zmask = np.ma.masked_invalid(z2d) if lat_top < lat_bottom: # Data is ordered south-to-north, flip for proper display zmask = np.flipud(zmask) cmap = build_mpl_colormap(get_radar_colorscale()) if cmap is None: continue scale_map = {1: 1.0, 2: 1.2, 3: 1.6, 4: 2.0, 5: 2.5} scale = scale_map.get(int(detail_level), 2.0) width = int(nx * scale) height = int(ny * scale) dpi = 100 fig_anim = plt.figure(figsize=(width / dpi, height / dpi), dpi=dpi) fig_anim.patch.set_alpha(0.0) ax = fig_anim.add_axes([0, 0, 1, 1]) ax.patch.set_alpha(0.0) ax.imshow(zmask, cmap=cmap, vmin=0, vmax=65, origin='upper', interpolation='bilinear') ax.axis('off') buf = io.BytesIO() fig_anim.savefig(buf, format='png', dpi=dpi, transparent=True) plt.close(fig_anim) img_b64 = base64.b64encode(buf.getvalue()).decode('ascii') frames.append(f"data:image/png;base64,{img_b64}") if not frames: return None, "No frames generated" return frames, None except Exception as e: return None, f"Animation frames error: {e}" def build_leaflet_overlay_from_frames(frame_data_urls: List[str], grid: Optional[Dict[str, Any]], fps: float = 4.0): """Return HTML with Leaflet + JS that cycles through transparent PNG frames warped by a 4-corner homography (no external plugins), aligned to the RAP grid. """ try: if not frame_data_urls: return "
No animation frames.
" if not grid or 'lat2d' not in grid or 'lon2d' not in grid: return "
No grid available for overlay bounds.
" lat2d = grid['lat2d'] lon2d = grid['lon2d'] # Bounds for initial fit min_lat = float(np.nanmin(lat2d)) max_lat = float(np.nanmax(lat2d)) min_lon = float(np.nanmin(lon2d)) max_lon = float(np.nanmax(lon2d)) c_lat = float(np.nanmean(lat2d)) c_lon = float(np.nanmean(lon2d)) # Corner control points for RAP Lambert Conformal grid transformation # Use geographic bounds approach (same as Plotly) for consistent alignment ny, nx = lat2d.shape # RAP uses curvilinear Lambert Conformal grid - use geographic bounds for corners # This matches the approach used in the Plotly visualization for consistency lat_tl, lon_tl = max_lat, min_lon # Top-left: northern edge, western edge lat_tr, lon_tr = max_lat, max_lon # Top-right: northern edge, eastern edge lat_br, lon_br = min_lat, max_lon # Bottom-right: southern edge, eastern edge lat_bl, lon_bl = min_lat, min_lon # Bottom-left: southern edge, western edge print(f"Leaflet corners using geographic bounds: TL({lat_tl:.3f},{lon_tl:.3f}) TR({lat_tr:.3f},{lon_tr:.3f}) BR({lat_br:.3f},{lon_br:.3f}) BL({lat_bl:.3f},{lon_bl:.3f})") # Validate corner coordinates are within expected CONUS bounds corners = [(lat_tl, lon_tl), (lat_tr, lon_tr), (lat_br, lon_br), (lat_bl, lon_bl)] for i, (lat, lon) in enumerate(corners): if not (20 <= lat <= 50 and -140 <= lon <= -50): print(f"Warning: Corner {i} coordinates ({lat:.3f}, {lon:.3f}) outside expected CONUS bounds") # Prepare JS array of frame URLs js_frames = "[" + ",".join([f"'{u}'" for u in frame_data_urls]) + "]" interval_ms = max(50, int(1000.0 / max(0.5, float(fps)))) doc = f"""
""" doc_escaped = doc.replace("&", "&").replace("<", "<").replace(">", ">").replace("\"", """) iframe = f"" return iframe except Exception as e: return f"
Leaflet frames overlay error: {str(e)}
" def build_leaflet_overlay_html(gif_path: Optional[str], grid: Optional[Dict[str, Any]]): """Return HTML for a Leaflet map with the animated GIF overlaid as an image. If gif_path is provided, it is embedded as a base64 data URL for portability. """ try: if not gif_path or not os.path.exists(gif_path): return "
No animation generated.
" if not grid or 'lat2d' not in grid or 'lon2d' not in grid: return "
No grid available for overlay bounds.
" lat2d = grid['lat2d'] lon2d = grid['lon2d'] min_lat = float(np.nanmin(lat2d)) max_lat = float(np.nanmax(lat2d)) min_lon = float(np.nanmin(lon2d)) max_lon = float(np.nanmax(lon2d)) c_lat = float(np.nanmean(lat2d)) c_lon = float(np.nanmean(lon2d)) import base64 with open(gif_path, 'rb') as f: gif_b64 = base64.b64encode(f.read()).decode('ascii') data_url = f"data:image/gif;base64,{gif_b64}" # Prefer Folium (self-contained HTML) and fallback to raw Leaflet # Build a standalone HTML document and embed via iframe srcdoc to ensure scripts run doc = f"""
""" # Escape for srcdoc doc_escaped = doc.replace("&", "&").replace("<", "<").replace(">", ">").replace("\"", """) iframe = f"" return iframe except Exception as e: return f"
Leaflet overlay error: {str(e)}
" def create_leaflet_comparison_map(param_type, forecast_hour, show_radar=False, detail_level=3, min_dbz=1.0): """Create a Leaflet map with the same data as Plotly for side-by-side comparison.""" try: # Fetch the same data as the Plotly map param_map = { 'temperature': 'TMP:2 m', 'humidity': 'RH:2 m', 'wind_speed': 'WIND:10 m', 'pressure': 'MSLMA:mean sea level', 'radar': 'REFC:entire atmosphere' } # Get center coordinates for North American view # Center on NAM domain: lat [21.14, 52.62], lon [-134.10, -60.92] center_lat, center_lon = 36.9, -97.5 # Midpoint of NAM domain zoom_level = 3 # Lower zoom to show Canada and Mexico # Start building the HTML html_content = f""" Leaflet RAP Comparison
""" return html_content except Exception as e: return f"""

Leaflet Map Error

Error creating comparison map: {str(e)}

""" def validate_radar_alignment(lat2d, lon2d, z2d): """Validate radar alignment by checking against known geographic features.""" try: # Known reference points for validation (major cities in RAP domain) reference_points = { 'Kansas_City': (39.0997, -94.5786), 'Denver': (39.7392, -104.9903), 'Atlanta': (33.7490, -84.3880), 'Chicago': (41.8781, -87.6298), 'Phoenix': (33.4484, -112.0740), 'Dallas': (32.7767, -96.7970), 'New_York': (40.7128, -74.0060), 'Los_Angeles': (34.0522, -118.2437), 'Seattle': (47.6062, -122.3321) } validation_results = {} total_offset_sum = 0 valid_points = 0 for city, (ref_lat, ref_lon) in reference_points.items(): # Find closest grid point to reference location lat_diff = np.abs(lat2d - ref_lat) lon_diff = np.abs(lon2d - ref_lon) distance = np.sqrt(lat_diff**2 + lon_diff**2) min_idx = np.unravel_index(np.argmin(distance), distance.shape) closest_lat = lat2d[min_idx] closest_lon = lon2d[min_idx] # Calculate offset in km using more accurate Haversine-based calculation lat_offset_km = (closest_lat - ref_lat) * 111.32 # More accurate km per degree lat lon_offset_km = (closest_lon - ref_lon) * 111.32 * np.cos(np.radians(ref_lat)) total_offset_km = np.sqrt(lat_offset_km**2 + lon_offset_km**2) # Determine grid resolution and tolerance based on coordinates # RAP: ~13km grid, NAM: ~12km grid, broader tolerance for larger grids grid_size = lat2d.shape[0] * lat2d.shape[1] if grid_size > 100000: # Large grid like NAM (265x450 = 119,250) tolerance_km = 8.0 # More lenient for NAM North American grid else: # Smaller grid like RAP tolerance_km = 4.0 # Stricter for RAP CONUS grid acceptable = total_offset_km < tolerance_km validation_results[city] = { 'reference': (ref_lat, ref_lon), 'closest_grid': (closest_lat, closest_lon), 'offset_km': total_offset_km, 'acceptable': acceptable } if acceptable: total_offset_sum += total_offset_km valid_points += 1 # Calculate overall alignment quality based on grid type avg_offset = total_offset_sum / valid_points if 
valid_points > 0 else float('inf') grid_size = lat2d.shape[0] * lat2d.shape[1] if grid_size > 100000: # Large NAM grid alignment_quality = 'Excellent' if avg_offset < 3.0 else 'Good' if avg_offset < 6.0 else 'Acceptable' else: # Smaller RAP grid alignment_quality = 'Excellent' if avg_offset < 1.0 else 'Good' if avg_offset < 2.0 else 'Poor' validation_results['_summary'] = { 'average_offset_km': avg_offset, 'valid_points': valid_points, 'total_points': len(reference_points), 'alignment_quality': alignment_quality } print(f"Radar alignment validation: {alignment_quality} (avg offset: {avg_offset:.2f}km)") return validation_results except Exception as e: print(f"Alignment validation error: {e}") return {} def create_weather_map_with_radar(param_type, forecast_hour, show_radar=False, detail_level=3, min_dbz=1.0): """Create weather map with optional radar forecast overlay""" try: # Map parameter names to GRIB codes param_map = { 'temperature': 'TMP:2 m', 'humidity': 'RH:2 m', 'wind_speed': 'WIND:10 m', 'pressure': 'MSLMA:mean sea level', 'radar': 'REFC:entire atmosphere' # Composite reflectivity } fig = go.Figure() # Always try to get main weather parameter (unless it's radar-only) if param_type != 'radar': grib_param = param_map.get(param_type, 'TMP:2 m') print(f"Fetching {param_type} ({grib_param}) for +{forecast_hour}h") # Fetch real weather data ds = fetch_real_rap_data(grib_param, forecast_hour) processed = process_rap_data(ds, max_points=400, param_type=param_type) if processed is not None: # Real RAP weather data print(f"Plotting {len(processed['values'])} weather data points") # Choose colorscale based on parameter if param_type == 'temperature': colorscale = 'RdYlBu_r' elif param_type == 'humidity': colorscale = 'Blues' elif param_type == 'pressure': colorscale = 'RdBu_r' else: colorscale = 'Viridis' fig.add_trace(go.Scattermapbox( lat=processed['lats'], lon=processed['lons'], mode='markers', marker=dict( size=5, color=processed['values'], 
colorscale=colorscale, showscale=True, colorbar=dict( title=f"{processed.get('long_name', param_type)} ({processed.get('units', '')})", x=1.02, len=0.8 ), opacity=0.7 ), text=[f"{v:.1f} {processed.get('units', '')}" for v in processed['values']], hovertemplate='%{text}', name="Weather Data" )) # Add radar data if requested OR if radar is the main parameter if show_radar or param_type == 'radar': print(f"Fetching radar data (REFC) for +{forecast_hour}h") # Fetch radar reflectivity forecast radar_ds = fetch_real_rap_data('REFC:entire atmosphere', forecast_hour) # Map detail_level (1-5) to target cell counts for performance/detail tradeoff detail_to_cells = {1: 20000, 2: 40000, 3: 60000, 4: 90000, 5: 120000} target_cells = detail_to_cells.get(int(detail_level) if detail_level is not None else 3, 60000) # Use user-selected threshold to control speckle radar_grid = process_rap_grid(radar_ds, target_cells=target_cells, param_type='radar', min_threshold=float(min_dbz) if min_dbz is not None else 0.5) # Store latest grid globally for other components (animation overlays) global LAST_RADAR_GRID LAST_RADAR_GRID = radar_grid if radar_grid is not None: lat2d = radar_grid['lat2d'] lon2d = radar_grid['lon2d'] z2d = radar_grid['z2d'] print(f"Plotting radar grid: {z2d.shape[0]}x{z2d.shape[1]} cells") # Validate radar alignment validation = validate_radar_alignment(lat2d, lon2d, z2d) if validation: print("=== RADAR ALIGNMENT VALIDATION ===") for city, result in validation.items(): if city != '_summary': # Skip summary entry status = "✓ GOOD" if result['acceptable'] else "✗ POOR" print(f"{city}: {result['offset_km']:.1f}km offset {status}") print("=== END VALIDATION ===") radar_layer_added = False # First choice: Contourmapbox if available in this Plotly version if hasattr(go, 'Contourmapbox') and not radar_layer_added: try: fig.add_trace(go.Contourmapbox( lat=lat2d, lon=lon2d, z=z2d, colorscale=get_radar_colorscale(), contours=dict(coloring='heatmap', showlines=False), 
showscale=True, colorbar=dict( title="Radar Reflectivity (dBZ)", x=0.02 if param_type != 'radar' else 1.02, len=0.6 ), zmin=0, zmax=65, hovertemplate='Radar: %{z:.1f} dBZ', name="Radar Reflectivity" )) radar_layer_added = True except Exception as e: print(f"Contourmapbox failed, trying raster fallback: {e}") elif not hasattr(go, 'Contourmapbox'): print("Contourmapbox not available in this Plotly version; trying raster fallback") # Second choice: smooth raster image layer for fused appearance if not radar_layer_added: try: image_added = add_radar_image_layer(fig, lat2d, lon2d, z2d, detail_level, param_type) radar_layer_added = radar_layer_added or bool(image_added) except Exception as e: print(f"Image layer attempt failed: {e}") # Third choice: Choroplethmapbox (solid polygons per cell) if not radar_layer_added: try: # Limit polygon count based on detail level detail_to_polys = {1: 4000, 2: 8000, 3: 12000, 4: 20000, 5: 30000} max_polys = detail_to_polys.get(int(detail_level) if detail_level is not None else 3, 12000) geojson = grid_to_geojson(lat2d, lon2d, z2d, max_polygons=max_polys) if geojson and geojson.get('features'): ids = [f["properties"]["id"] for f in geojson["features"]] vals = [f["properties"]["value"] for f in geojson["features"]] fig.add_trace(go.Choroplethmapbox( geojson=geojson, locations=ids, z=vals, featureidkey="properties.id", colorscale=get_radar_colorscale(), zmin=0, zmax=65, colorbar=dict( title="Radar Reflectivity (dBZ)", x=0.02 if param_type != 'radar' else 1.02, len=0.6 ), marker_opacity=0.85, marker_line_width=0, hovertemplate='Radar: %{z:.1f} dBZ', name="Radar Reflectivity" )) radar_layer_added = True else: print("Choropleth fallback failed: empty geojson or no features") except Exception as e: print(f"Choropleth fallback failed: {e}") # Final fallback: density layer (zoom-dependent appearance) if not radar_layer_added: radar_processed = process_rap_data(radar_ds, max_points=10000, param_type='radar') if radar_processed is not None: 
detail_to_radius = {1: 18, 2: 14, 3: 12, 4: 10, 5: 8} radius = detail_to_radius.get(int(detail_level) if detail_level is not None else 3, 12) fig.add_trace(go.Densitymapbox( lat=radar_processed['lats'], lon=radar_processed['lons'], z=radar_processed['values'], radius=radius, colorscale=get_radar_colorscale(), showscale=True, colorbar=dict( title="Radar Reflectivity (dBZ)", x=0.02 if param_type != 'radar' else 1.02, len=0.6 ), opacity=0.85, zmin=0.1, zmax=65, hovertemplate='Radar: %{z:.1f} dBZ', name="Radar Forecast" )) else: print("No radar data available for this time") # Add note about radar unavailability if param_type == 'radar': fig.add_annotation( text="RAP radar forecast temporarily unavailable
Try a different forecast hour", x=0.5, y=0.5, xref="paper", yref="paper", showarrow=False, font=dict(size=14) ) # Set title based on what's displayed if param_type == 'radar': title = f"RAP Radar Reflectivity Forecast (+{forecast_hour}h)" elif show_radar: title = f"RAP {param_type.title()} + Radar Forecast (+{forecast_hour}h)" else: title = f"RAP {param_type.title()} Forecast (+{forecast_hour}h)" # If no data at all if len(fig.data) == 0: fig.add_annotation( text="RAP data temporarily unavailable
Try different parameters or forecast hours", x=0.5, y=0.5, xref="paper", yref="paper", showarrow=False, font=dict(size=16) ) title = "RAP Data - Loading" # Center map on North America to show full NAM domain coverage # NAM domain: lat [21.14, 52.62], lon [-134.10, -60.92] center_lat = (21.14 + 52.62) / 2 # ~36.9°N (mid-point of NAM domain) center_lon = (-134.10 + -60.92) / 2 # ~-97.5°W (mid-point of NAM domain) fig.update_layout( mapbox=dict( style="open-street-map", zoom=3, # Lower zoom to show more area including Canada and Mexico center=dict(lat=center_lat, lon=center_lon) ), height=500, title=title, margin=dict(l=0, r=80, t=50, b=0) ) return fig except Exception as e: print(f"Map creation error: {e}") # Force cleanup gc.collect() # Return error figure fig = go.Figure() fig.add_annotation( text=f"Error: {str(e)[:100]}", x=0.5, y=0.5, xref="paper", yref="paper", showarrow=False ) fig.update_layout(height=400, title="Error Loading Data") return fig def update_display(location, forecast_hour, parameter, show_radar_overlay, detail_level, min_dbz, animate_forecast): """Simple stable update function - single map only""" try: # Force garbage collection gc.collect() print(f"\n=== UPDATE: {location}, +{forecast_hour}h, {parameter}, radar:{show_radar_overlay} ===") # Create single weather map (no second map to avoid crashes) weather_map = create_weather_map_with_radar(parameter, forecast_hour, show_radar_overlay, detail_level, min_dbz) # Simple status current_time = datetime.utcnow() forecast_time = current_time + timedelta(hours=forecast_hour) # Get alignment status if available alignment_status = "" if LAST_RADAR_GRID and show_radar_overlay: try: validation = validate_radar_alignment( LAST_RADAR_GRID['lat2d'], LAST_RADAR_GRID['lon2d'], LAST_RADAR_GRID['z2d'] ) if validation: good_count = sum(1 for v in validation.values() if v['acceptable']) total_count = len(validation) avg_offset = np.mean([v['offset_km'] for v in validation.values()]) alignment_status = 
f"\n**Alignment:** {good_count}/{total_count} reference points within 10km (avg: {avg_offset:.1f}km)" except Exception: alignment_status = "\n**Alignment:** Validation unavailable" status = f""" ## North American Weather + Radar Forecasts **Location:** {location} **Current:** {current_time.strftime('%H:%M UTC')} **Forecast:** {forecast_time.strftime('%H:%M UTC')} (+{forecast_hour}h) **Parameter:** {parameter.title()} **Radar Overlay:** {"Enabled" if show_radar_overlay else "Disabled"} **Radar Detail:** {detail_level} (1=Fast, 5=Max) **Min dBZ:** {min_dbz:.1f} **Data Source:** {"REAL NOAA RAP with Enhanced Alignment" if HERBIE_AVAILABLE else "RAP Unavailable"}{alignment_status} **Radar Info:** 🌎 For North American coverage, app tries NAM model first for REFC (composite reflectivity), then falls back to RAP for other weather data. **Note:** Radar forecasts now use validated RAP Lambert Conformal projection coordinates for proper geographic alignment. """ # Optional animation and Leaflet overlay gif_path = None leaflet_html = "" if animate_forecast: try: gif_path, _ = generate_radar_animation_gif(detail_level=int(detail_level), min_dbz=float(min_dbz)) global LAST_ANIMATION_PATH LAST_ANIMATION_PATH = gif_path except Exception as e: print(f"Animation generation error (gif): {e}") gif_path = None # Build Leaflet overlay from transparent PNG frames for correct alpha blending try: frames, msg = generate_radar_animation_png_frames(detail_level=int(detail_level), min_dbz=float(min_dbz), fps=4.0) if frames: leaflet_html = build_leaflet_overlay_from_frames(frames, LAST_RADAR_GRID, fps=4.0) else: leaflet_html = f"
Animation frames error: {msg}
" except Exception as e: leaflet_html = f"
Leaflet overlay build failed: {str(e)}
" # Create comparison Leaflet map leaflet_comparison = "" try: leaflet_comparison_html = create_leaflet_comparison_map(parameter, forecast_hour, show_radar_overlay, detail_level, min_dbz) # Escape HTML for iframe display escaped_html = leaflet_comparison_html.replace("&", "&").replace("<", "<").replace(">", ">").replace("\"", """) leaflet_comparison = f"" except Exception as e: leaflet_comparison = f"
Leaflet comparison error: {str(e)}
" return status, weather_map, leaflet_comparison, gif_path, leaflet_html except Exception as e: print(f"Update error: {e}") # Aggressive cleanup on error gc.collect() error_fig = go.Figure() error_fig.add_annotation(text=f"Update failed: {str(e)}", x=0.5, y=0.5, xref="paper", yref="paper", showarrow=False) error_fig.update_layout(height=300) return f"## Error\n{str(e)}", error_fig, "
Error loading comparison
", None, "" # Stable interface - single map only with gr.Blocks(title="North American Weather + Radar") as app: gr.HTML("""

🌎 North American Weather + Radar Forecasts

Real NOAA NAM/RAP data with Plotly vs Leaflet alignment comparison

""") gr.HTML("""

📍 Validate Radar Alignment Against Official NOAA Sources

Compare our radar overlay alignment with these official NOAA RAP visualizations:

💡 Tip: Use the same forecast time and look for matching radar patterns, storm positions, and geographic alignment with cities/coastlines.

""") with gr.Row(): with gr.Column(): location = gr.Textbox(value="Kansas City, MO", label="Location") with gr.Row(): forecast_hour = gr.Slider( minimum=0, maximum=18, value=6, step=1, label="Forecast Hours" ) detail_level = gr.Slider( minimum=1, maximum=5, value=5, step=1, label="Radar Detail", info="Higher = more detail (slower)" ) min_dbz = gr.Slider( minimum=0.0, maximum=20.0, value=0.0, step=0.5, label="Min dBZ", info="Hide speckle below this reflectivity" ) animate_forecast = gr.Checkbox(value=False, label="Animate 0–18h Forecast") parameter = gr.Dropdown( choices=[ ("Temperature", "temperature"), ("Humidity", "humidity"), ("Wind Speed", "wind_speed"), ("Pressure", "pressure"), ("Radar Only", "radar") ], value="temperature", label="Weather Parameter" ) show_radar_overlay = gr.Checkbox( value=False, label="Add Radar Overlay", info="Show RAP radar forecast on top of weather data" ) update_btn = gr.Button("Get North American Data + Radar", variant="primary") with gr.Row(): export_btn = gr.Button("Export Radar as GRIB2") download_raw_btn = gr.Button("Download Raw GRIB2 (RAP)") export_kmz_btn = gr.Button("Export Radar as KMZ") gr.HTML("""

🎯 Enhanced Radar Features:

New: Side-by-side Plotly vs Leaflet comparison maps to validate alignment.
Improvement: Radar data uses validated RAP coordinates with Lambert Conformal projection.
Export: KMZ format for Google Earth and professional GIS applications.

""") with gr.Column(): status_text = gr.Markdown("Click button to fetch RAP weather + radar data") # Side-by-side map comparison with gr.Row(): with gr.Column(): gr.Markdown("### Plotly Map") weather_map = gr.Plot() with gr.Column(): gr.Markdown("### Leaflet Comparison") leaflet_comparison = gr.HTML(label="Leaflet Map Comparison") # Additional outputs below with gr.Row(): with gr.Column(): animation_view = gr.Image(label="Radar Animation (0–18h)") with gr.Column(): leaflet_overlay = gr.HTML(label="Leaflet Animation Overlay") # Export files with gr.Row(): export_file = gr.File(label="GRIB2 Export", visible=True) raw_grib_file = gr.File(label="Raw RAP GRIB2", visible=True) kmz_export_file = gr.File(label="KMZ Export", visible=True) # Event handlers update_btn.click( fn=update_display, inputs=[location, forecast_hour, parameter, show_radar_overlay, detail_level, min_dbz, animate_forecast], outputs=[status_text, weather_map, leaflet_comparison, animation_view, leaflet_overlay] ) # Auto-update when toggling radar show_radar_overlay.change( fn=update_display, inputs=[location, forecast_hour, parameter, show_radar_overlay, detail_level, min_dbz, animate_forecast], outputs=[status_text, weather_map, leaflet_comparison, animation_view, leaflet_overlay] ) # Export GRIB button def _export_handler(forecast_hour, min_dbz): path, msg = export_radar_grib(forecast_hour, min_dbz) if path: return path else: # Create a tiny text file with error to make it downloadable import os os.makedirs('exports', exist_ok=True) err_path = f"exports/export_error.txt" with open(err_path, 'w') as f: f.write(msg or 'Export failed') return err_path export_btn.click( fn=_export_handler, inputs=[forecast_hour, min_dbz], outputs=[export_file] ) def _download_raw_handler(forecast_hour): path, msg = download_raw_grib(int(forecast_hour)) if path: return path else: import os os.makedirs('exports', exist_ok=True) err_path = f"exports/raw_grib_error.txt" with open(err_path, 'w') as f: f.write(msg or 'Download 
failed') return err_path download_raw_btn.click( fn=_download_raw_handler, inputs=[forecast_hour], outputs=[raw_grib_file] ) # Export KMZ button def _export_kmz_handler(forecast_hour, min_dbz): path, msg = export_rap_to_kmz(forecast_hour, min_dbz) if path: return path else: # Create a tiny text file with error to make it downloadable import os os.makedirs('exports', exist_ok=True) err_path = f"exports/kmz_export_error.txt" with open(err_path, 'w') as f: f.write(msg or 'KMZ export failed') return err_path export_kmz_btn.click( fn=_export_kmz_handler, inputs=[forecast_hour, min_dbz], outputs=[kmz_export_file] ) if __name__ == "__main__": # Disable SSR to allow custom JS (Leaflet/Folium) to run in gr.HTML blocks app.launch(server_name="0.0.0.0", server_port=7860, ssr_mode=False)