Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| HRRR Smoke Forecast Application | |
| Real-time smoke plume forecasting using NOAA HRRR-Smoke model data | |
| Based on successful patterns from ECMWF and HrrrVizCodex applications | |
| """ | |
| import gradio as gr | |
| import numpy as np | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| import plotly.graph_objects as go | |
| import plotly.express as px | |
| from plotly.subplots import make_subplots | |
| import xarray as xr | |
| import warnings | |
| import gc | |
| import sys | |
| import math | |
| from datetime import datetime, timedelta | |
| from typing import Optional, Dict, Any, List, Tuple | |
| import folium | |
| import requests | |
| import tempfile | |
| import os | |
| import time | |
| import zipfile | |
| import xml.etree.ElementTree as ET | |
| from xml.dom import minidom | |
| warnings.filterwarnings('ignore') | |
| # Import weather libraries for REAL HRRR-Smoke data | |
| try: | |
| from herbie import Herbie | |
| import cfgrib | |
| HERBIE_AVAILABLE = True | |
| print("HERBIE AVAILABLE - Will use real HRRR-Smoke data") | |
| except ImportError as e: | |
| HERBIE_AVAILABLE = False | |
| print(f"HERBIE NOT AVAILABLE: {e}") | |
| # Import optional libraries for HMS polygon generation | |
| try: | |
| from shapely.geometry import Polygon, MultiPolygon | |
| from shapely.ops import unary_union | |
| import geopandas as gpd | |
| from scipy.ndimage import label, binary_dilation | |
| from skimage.measure import find_contours | |
| HMS_LIBS_AVAILABLE = True | |
| print("HMS POLYGON LIBRARIES AVAILABLE") | |
| except ImportError as e: | |
| HMS_LIBS_AVAILABLE = False | |
| print(f"HMS POLYGON LIBRARIES NOT AVAILABLE: {e}") | |
| # Import folium for leaflet maps | |
| try: | |
| import folium | |
| from folium.plugins import HeatMap | |
| FOLIUM_AVAILABLE = True | |
| print("FOLIUM AVAILABLE") | |
| except ImportError as e: | |
| FOLIUM_AVAILABLE = False | |
| print(f"FOLIUM NOT AVAILABLE: {e}") | |
class FoliumSmokeRenderer:
    """Render smoke plumes on folium/Leaflet maps with grayscale styling.

    The class relies on the module-level ``folium`` / ``HeatMap`` imports and
    on the ``FOLIUM_AVAILABLE`` flag.  Both public map builders return ``None``
    (after printing a message) when folium is unavailable or when building the
    map raises.
    """

    # Esri World Imagery XYZ tile endpoint offered as an optional base layer.
    _SATELLITE_TILES = (
        'https://server.arcgisonline.com/ArcGIS/rest/services/'
        'World_Imagery/MapServer/tile/{z}/{y}/{x}'
    )

    # Static legend injected into the map's root HTML by _add_smoke_legend.
    _LEGEND_HTML = '''
<div style="position: fixed;
top: 10px; right: 10px; width: 150px; height: 120px;
background-color: white; border:2px solid grey; z-index:9999;
font-size:14px; padding: 10px; box-shadow: 2px 2px 6px rgba(0,0,0,0.3);
">
<h4 style="margin: 0 0 10px 0; color: #333;">🌬️ Smoke Density</h4>
<div style="margin: 5px 0;">
<span style="display: inline-block; width: 20px; height: 15px;
background-color: #E8E8E8; border: 1px solid #CCC; margin-right: 5px;"></span>
<span style="font-size: 12px;">Light (3–15 µg/m³)</span>
</div>
<div style="margin: 5px 0;">
<span style="display: inline-block; width: 20px; height: 15px;
background-color: #AAAAAA; border: 1px solid #888; margin-right: 5px;"></span>
<span style="font-size: 12px;">Medium (15-35 µg/m³)</span>
</div>
<div style="margin: 5px 0;">
<span style="display: inline-block; width: 20px; height: 15px;
background-color: #666666; border: 1px solid #444; margin-right: 5px;"></span>
<span style="font-size: 12px;">Heavy (35+ µg/m³)</span>
</div>
</div>
'''

    def __init__(self):
        # Leaflet path options per density category; darker/more opaque
        # styling is used for denser smoke.
        self.grayscale_styles = {
            'light': {
                'fillColor': '#E8E8E8',  # Light gray
                'color': '#CCCCCC',      # Border
                'weight': 1,
                'fillOpacity': 0.4,
                'opacity': 0.6,
            },
            'medium': {
                'fillColor': '#AAAAAA',  # Medium gray
                'color': '#888888',      # Border
                'weight': 2,
                'fillOpacity': 0.6,
                'opacity': 0.8,
            },
            'heavy': {
                'fillColor': '#666666',  # Dark gray
                'color': '#444444',      # Border
                'weight': 2,
                'fillOpacity': 0.8,
                'opacity': 1.0,
            },
        }

    def _add_satellite_layer(self, folium_map):
        """Add the selectable Esri satellite base layer to ``folium_map``."""
        folium.TileLayer(
            tiles=self._SATELLITE_TILES,
            attr='Esri',
            name='Satellite',
            overlay=False,
            control=True,
        ).add_to(folium_map)

    def create_folium_map(self, polygons, center_lat=39.5, center_lon=-98.5, zoom_start=5):
        """Create a folium map with grayscale smoke polygons.

        Args:
            polygons: Iterable of dicts with the keys ``geometry`` (an object
                exposing ``exterior.coords`` as ``(lon, lat)`` pairs),
                ``density_category``, ``description``, ``density_value`` and
                ``area_deg2``.  May be empty or ``None``.
            center_lat, center_lon: Initial map centre.
            zoom_start: Initial zoom level.

        Returns:
            The ``folium.Map``, or ``None`` if folium is unavailable or map
            creation fails.  A polygon that cannot be added is reported and
            skipped without aborting the map.
        """
        if not FOLIUM_AVAILABLE:
            print("Folium not available")
            return None
        try:
            # OpenStreetMap is the default base layer; satellite and a clean
            # CartoDB layer are selectable through the layer control.
            m = folium.Map(
                location=[center_lat, center_lon],
                zoom_start=zoom_start,
                tiles='OpenStreetMap',
            )
            self._add_satellite_layer(m)
            folium.TileLayer(
                tiles='CartoDB positron',
                name='Clean',
                overlay=False,
                control=True,
            ).add_to(m)

            for i, poly_data in enumerate(polygons or ()):
                try:
                    # Geometry stores (lon, lat); folium expects [lat, lon].
                    folium_coords = [
                        [lat, lon]
                        for lon, lat in poly_data['geometry'].exterior.coords
                    ]
                    # Unknown categories fall back to the medium style.
                    style = self.grayscale_styles.get(
                        poly_data['density_category'],
                        self.grayscale_styles['medium'],
                    )
                    popup_html = f"""
<div style="font-family: Arial, sans-serif; width: 200px;">
<h4 style="margin: 0; color: #333;">🌬️ Smoke Plume</h4>
<hr style="margin: 5px 0;">
<p style="margin: 3px 0;"><b>Density:</b> {poly_data['description']}</p>
<p style="margin: 3px 0;"><b>Concentration:</b> {poly_data['density_value']:.1f} µg/m³</p>
<p style="margin: 3px 0;"><b>Area:</b> {poly_data['area_deg2']:.4f} deg²</p>
</div>
"""
                    folium.Polygon(
                        locations=folium_coords,
                        popup=folium.Popup(popup_html, max_width=300),
                        tooltip=f"{poly_data['description']}: {poly_data['density_value']:.1f} µg/m³",
                        **style,
                    ).add_to(m)
                except Exception as e:
                    print(f"Error adding polygon {i} to folium map: {e}")
                    continue

            folium.LayerControl().add_to(m)
            self._add_smoke_legend(m)
            return m
        except Exception as e:
            print(f"Folium map creation error: {e}")
            return None

    def _add_smoke_legend(self, folium_map):
        """Add the fixed-position smoke density legend to the map."""
        folium_map.get_root().html.add_child(folium.Element(self._LEGEND_HTML))

    @staticmethod
    def _build_heat_points(lat2d, lon2d, smoke_values, min_value=0.5,
                           full_scale=100.0, max_points_per_axis=50):
        """Sample a smoke grid into ``[lat, lon, intensity]`` heat-map points.

        Args:
            lat2d, lon2d: NumPy coordinate arrays.  Either 2-D arrays matching
                ``smoke_values`` or 1-D vectors indexed by row (latitude) and
                column (longitude) respectively.
            smoke_values: 2-D NumPy array of smoke values.
            min_value: Cells at or below this value (or NaN) are skipped.
            full_scale: Smoke value mapped to intensity 1.0; larger values are
                clipped to 1.0.
            max_points_per_axis: Approximate cap on samples along the shorter
                grid axis, to keep the map responsive.

        Returns:
            List of ``[lat, lon, intensity]`` lists of built-in ``float``.
        """
        ny, nx = smoke_values.shape
        step = max(1, min(ny, nx) // max_points_per_axis)
        points = []
        for i in range(0, ny, step):
            for j in range(0, nx, step):
                value = smoke_values[i, j]
                if np.isnan(value) or not value > min_value:
                    continue
                lat = lat2d[i, j] if lat2d.ndim == 2 else lat2d[i]
                lon = lon2d[i, j] if lon2d.ndim == 2 else lon2d[j]
                if np.isnan(lat) or np.isnan(lon):
                    continue
                # Cast to built-in floats: NumPy float32 scalars are not JSON
                # serializable, and the heat data is emitted as JSON when the
                # map is rendered.
                intensity = min(1.0, float(value) / full_scale)
                points.append([float(lat), float(lon), intensity])
        return points

    def create_gradient_smoke_map(self, lat2d, lon2d, smoke_values, center_lat=39.5, center_lon=-98.5, zoom_start=5):
        """Create a folium map with a gradient (heat-map) smoke visualization.

        Args:
            lat2d, lon2d: Coordinate arrays (2-D grids or 1-D vectors).
            smoke_values: 2-D array of smoke values; values above 0.5 are
                plotted with intensity ``min(1, value / 100)``.
            center_lat, center_lon: Initial map centre.
            zoom_start: Initial zoom level.

        Returns:
            The ``folium.Map``, or ``None`` if folium is unavailable or map
            creation fails.  The heat layer is omitted when no cell qualifies.
        """
        if not FOLIUM_AVAILABLE:
            print("Folium not available")
            return None
        try:
            m = folium.Map(
                location=[center_lat, center_lon],
                zoom_start=zoom_start,
                tiles='CartoDB positron',
            )
            self._add_satellite_layer(m)

            heat_data = self._build_heat_points(lat2d, lon2d, smoke_values)
            if heat_data:
                HeatMap(
                    heat_data,
                    min_opacity=0.2,
                    max_zoom=18,
                    radius=15,
                    blur=10,
                    gradient={
                        0.0: 'rgba(0,0,0,0)',          # Transparent
                        0.2: 'rgba(128,128,128,0.3)',  # Light gray
                        0.5: 'rgba(96,96,96,0.6)',     # Medium gray
                        1.0: 'rgba(64,64,64,0.9)',     # Dark gray
                    },
                ).add_to(m)

            folium.LayerControl().add_to(m)
            return m
        except Exception as e:
            print(f"Gradient smoke map creation error: {e}")
            return None
class HMSSmokePolygonGenerator:
    """Generate HMS-style smoke plume polygons from HRRR-Smoke data.

    Polygon extraction relies on the module-level ``HMS_LIBS_AVAILABLE`` flag
    and on ``find_contours`` / ``Polygon`` imported at file level; KML/KMZ
    export relies on the file-level ``ET``, ``minidom`` and ``zipfile``
    imports.
    """

    # Integer codes written into classification grids for each category.
    _CATEGORY_IDS = {'light': 1, 'medium': 2, 'heavy': 3}

    # Minimum connected-component size (in grid cells) kept per category id:
    # heavier categories may retain smaller features.
    _MIN_COMPONENT_PIXELS = {1: 80, 2: 50, 3: 25}

    def __init__(self):
        # HMS-style category thresholds expressed in µg/m³ (approximate, for
        # visualization).  These align with the map legend.
        self.density_categories = {
            'light': {'min': 3, 'max': 15, 'color': '#FFFF88', 'description': 'Light smoke (3–15 µg/m³)'},
            'medium': {'min': 15, 'max': 35, 'color': '#FFB366', 'description': 'Medium smoke (15–35 µg/m³)'},
            'heavy': {'min': 35, 'max': 9999, 'color': '#FF6666', 'description': 'Heavy smoke (35+ µg/m³)'}
        }
        # Visibility bands (metres) used by hms_style_smoke_detection.
        self.visibility_thresholds = {
            'light': {'max_vis': 15000, 'min_vis': 8000},   # 8-15km visibility = light smoke
            'medium': {'max_vis': 8000, 'min_vis': 4000},   # 4-8km visibility = medium smoke
            'heavy': {'max_vis': 4000, 'min_vis': 0}        # <4km visibility = heavy smoke
        }

    def combine_smoke_parameters(self, datasets):
        """Blend several smoke-related fields into one concentration grid.

        Parameters are applied in a fixed order; the first one present seeds
        the estimate and later ones adjust it:

        * ``MASSDEN`` - capped at 200.
        * ``PM25`` - capped at 100; element-wise maximum with the estimate.
        * ``COLMD`` - 5% of the column (max 50) when it seeds the estimate,
          otherwise an additive term of ``COLMD / 100`` (max 10).
        * ``VIS`` - converted with ``visibility_to_smoke_concentration``;
          blended in with weight 0.3 when an estimate already exists.
        * ``FRPAVG`` - adds ``2 * FRP`` (max 30) to an existing estimate;
          ignored when nothing else is available.

        The result is clipped to the range [0, 250].

        Args:
            datasets: Mapping of parameter name to array-like grid.

        Returns:
            ``None`` for an empty/``None`` mapping; the combined array; or a
            ``(50, 60)`` array of zeros when no recognised parameter seeded an
            estimate.
        """
        if not datasets:
            return None
        combined_smoke = None
        param_weights = {}

        if 'MASSDEN' in datasets:
            # Near-surface smoke mass density is the primary parameter; the
            # cap guards against values delivered in unexpected units.
            combined_smoke = np.minimum(np.array(datasets['MASSDEN']), 200.0)
            param_weights['MASSDEN'] = 1.0
            print("Using MASSDEN as primary smoke parameter (capped at 200 µg/m³)")

        if 'PM25' in datasets:
            pm25_data = np.minimum(np.array(datasets['PM25']), 100.0)
            if combined_smoke is None:
                combined_smoke = pm25_data
                param_weights['PM25'] = 1.0
                print("Using PM25 as primary smoke parameter (capped at 100 µg/m³)")
            else:
                # Element-wise maximum of the two surface measurements.
                combined_smoke = np.maximum(combined_smoke, pm25_data)
                param_weights['PM25'] = 0.5
                print("Taking maximum of MASSDEN and PM25 for conservative estimate")

        if 'COLMD' in datasets:
            colmd_data = np.array(datasets['COLMD'])
            if combined_smoke is None:
                # Column density scaled down to a surface estimate.
                combined_smoke = np.minimum(colmd_data * 0.05, 50.0)
                param_weights['COLMD'] = 1.0
                print("Using COLMD as primary with conservative surface conversion")
            else:
                # Small additive (not multiplicative) transport enhancement.
                transport_enhancement = np.minimum(10.0, colmd_data / 100.0)
                combined_smoke = combined_smoke + transport_enhancement
                param_weights['COLMD'] = 0.1
                print("Adding small transport enhancement from COLMD")

        if 'VIS' in datasets:
            smoke_from_vis = self.visibility_to_smoke_concentration(np.array(datasets['VIS']))
            if combined_smoke is None:
                combined_smoke = smoke_from_vis
                param_weights['VIS'] = 1.0
                print("Using visibility-derived smoke as primary parameter")
            else:
                vis_weight = 0.3
                combined_smoke = (1 - vis_weight) * combined_smoke + vis_weight * smoke_from_vis
                param_weights['VIS'] = vis_weight
                print(f"Blending with visibility-derived smoke (weight: {vis_weight})")

        if 'FRPAVG' in datasets and combined_smoke is not None:
            # Fire radiative power only ever enhances an existing estimate.
            fire_smoke_addition = np.minimum(np.array(datasets['FRPAVG']) * 2.0, 30.0)
            combined_smoke = combined_smoke + fire_smoke_addition
            param_weights['FRPAVG'] = 0.2
            print("Adding fire emission enhancement (max 30 µg/m³ per fire)")

        if combined_smoke is None:
            # NOTE(review): the (50, 60) placeholder shape is arbitrary and is
            # unrelated to the shape of any input grid.
            print("No parameters available for combination, returning empty array")
            return np.zeros((50, 60))

        combined_smoke = np.minimum(np.maximum(0, combined_smoke), 250.0)
        print(f"Combined smoke parameters: {list(param_weights.keys())}")
        print(f"Combined smoke range: {combined_smoke.min():.1f} - {combined_smoke.max():.1f}")
        return combined_smoke

    def hms_style_smoke_detection(self, datasets):
        """Derive a smoothed HMS-style category grid (0-3) from model fields.

        ``VIS`` is preferred and is binned with ``self.visibility_thresholds``;
        otherwise ``MASSDEN`` (or, failing that, ``PM25``) is smoothed and
        binned by concentration.  When scipy is importable the visibility
        category grid is Gaussian-smoothed, so the returned values are floats
        rather than exact integer codes.

        Args:
            datasets: Mapping of parameter name to array-like grid.

        Returns:
            Float array of (smoothed) category values, or ``None`` when
            ``datasets`` is empty or holds none of VIS/MASSDEN/PM25.
        """
        if not datasets:
            return None
        print("HMS-style smoke detection: Prioritizing visibility-based analysis")

        if 'VIS' in datasets:
            visibility_data = np.array(datasets['VIS'])
            print(f"Using visibility data: range {visibility_data.min():.0f} - {visibility_data.max():.0f} meters")
            bands = self.visibility_thresholds
            # Float grid so that smoothing is not truncated for integer input.
            smoke_categories = np.zeros_like(visibility_data, dtype=float)
            # Heavy has no lower bound applied: anything below its max counts.
            smoke_categories[visibility_data < bands['heavy']['max_vis']] = 3
            for name, code in (('medium', 2), ('light', 1)):
                in_band = (visibility_data >= bands[name]['min_vis']) & \
                          (visibility_data < bands[name]['max_vis'])
                smoke_categories[in_band] = code
            # Smooth the categorical grid to obtain plume-like shapes.
            try:
                from scipy import ndimage
                smoke_categories = ndimage.gaussian_filter(smoke_categories, sigma=1.0)
                print("Applied HMS-style Gaussian smoothing")
            except ImportError:
                print("Scipy not available - using unsmoothed HMS detection")
            print(f"HMS-style detection: {np.sum(smoke_categories > 0)} smoke grid points detected")
            return smoke_categories

        if 'MASSDEN' in datasets or 'PM25' in datasets:
            if 'MASSDEN' in datasets:
                smoke_data = np.array(datasets['MASSDEN'])
                print("Using MASSDEN for HMS-style detection")
            else:
                smoke_data = np.array(datasets['PM25'])
                print("Using PM25 for HMS-style detection")
            try:
                from scipy import ndimage
                smoothed_smoke = ndimage.gaussian_filter(smoke_data, sigma=2.0)
                print("Applied HMS-style Gaussian smoothing to concentration data")
            except ImportError:
                print("Scipy not available - using unsmoothed concentration data")
                smoothed_smoke = smoke_data
            # NOTE(review): the heavy cut-off here is 50 while
            # self.density_categories uses 35 - confirm which is intended.
            smoke_categories = np.zeros_like(smoothed_smoke, dtype=float)
            smoke_categories[smoothed_smoke > 50] = 3
            smoke_categories[(smoothed_smoke > 15) & (smoothed_smoke <= 50)] = 2
            smoke_categories[(smoothed_smoke > 3) & (smoothed_smoke <= 15)] = 1
            print(f"HMS-style fallback: {np.sum(smoke_categories > 0)} smoke grid points detected")
            return smoke_categories

        print("HMS-style detection: No suitable parameters found")
        return None

    def visibility_to_smoke_concentration(self, visibility_data):
        """Convert visibility (metres) to an approximate smoke concentration.

        Piecewise-linear mapping:

        * ``< 3000``        -> ``35 + (3000 - vis) / 100``
        * ``3000 - 6000``   -> ``16 + (6000 - vis) / 273``
        * ``6000 - 10000``  -> ``5 + (10000 - vis) / 364``
        * ``>= 10000``      -> ``max(0, 5 - (vis - 10000) / 2000)``

        NaN visibilities map to 0.

        NOTE(review): the mapping jumps from about 27 to 35 at 3000 m; the
        pieces are otherwise continuous.  Confirm the step is intended.

        Args:
            visibility_data: Array-like of visibilities in metres.

        Returns:
            Float array of non-negative concentrations, same shape as input.
        """
        visibility_data = np.asarray(visibility_data)
        # Explicit float dtype: with zeros_like(input) an integer visibility
        # grid would silently truncate the fractional concentrations.
        smoke_concentration = np.zeros_like(visibility_data, dtype=float)

        heavy_mask = visibility_data < 3000
        smoke_concentration[heavy_mask] = 35 + (3000 - visibility_data[heavy_mask]) / 100

        medium_mask = (visibility_data >= 3000) & (visibility_data < 6000)
        smoke_concentration[medium_mask] = 16 + (6000 - visibility_data[medium_mask]) / 273

        light_mask = (visibility_data >= 6000) & (visibility_data < 10000)
        smoke_concentration[light_mask] = 5 + (10000 - visibility_data[light_mask]) / 364

        clear_mask = visibility_data >= 10000
        smoke_concentration[clear_mask] = np.maximum(0, 5 - (visibility_data[clear_mask] - 10000) / 2000)

        return np.maximum(0, smoke_concentration)

    def classify_smoke_density(self, values):
        """Classify concentrations into category codes.

        Codes: 0 = below the light minimum, 1 = light, 2 = medium, 3 = heavy.
        Lower bounds are inclusive and upper bounds exclusive; heavy has no
        upper bound applied.

        Args:
            values: Array-like of concentrations (µg/m³).

        Returns:
            Integer array with the same shape as ``values``.
        """
        values = np.asarray(values)
        classifications = np.zeros_like(values, dtype=int)
        categories = self.density_categories
        for name in ('light', 'medium'):
            in_range = (values >= categories[name]['min']) & (values < categories[name]['max'])
            classifications[in_range] = self._CATEGORY_IDS[name]
        classifications[values >= categories['heavy']['min']] = self._CATEGORY_IDS['heavy']
        return classifications

    def _contour_to_lonlat(self, contour, lat2d, lon2d):
        """Map contour (row, col) points to ``(lon, lat)`` tuples.

        Fractional indices are truncated; points outside the grid or with NaN
        coordinates are dropped.
        """
        n_rows, n_cols = lat2d.shape
        coords = []
        for point in contour:
            row, col = int(point[0]), int(point[1])
            if not (0 <= row < n_rows and 0 <= col < n_cols):
                continue
            lat = lat2d[row, col]
            lon = lon2d[row, col]
            if np.isnan(lat) or np.isnan(lon):
                continue
            coords.append((float(lon), float(lat)))
        return coords

    def extract_smoke_polygons(self, lat2d, lon2d, smoke_values, min_area=0.01, is_hms_categorical=False):
        """Extract smoke plume polygons from gridded data.

        For each density category the grid is turned into a binary mask,
        cleaned with binary opening/closing/dilation, stripped of small
        connected components, contoured, and each contour is converted into a
        Shapely polygon in lon/lat space.

        Args:
            lat2d, lon2d: Coordinate grids matching ``smoke_values``.  1-D
                latitude/longitude vectors are also accepted and expanded to
                a grid.
            smoke_values: 2-D NumPy array of concentrations, or of category
                codes when ``is_hms_categorical`` is true.
            min_area: Minimum polygon area (square degrees) to keep.
            is_hms_categorical: Treat ``smoke_values`` as already classified
                (1 = light, 2 = medium, 3 = heavy); values are truncated to
                integers.

        Returns:
            List of dicts with keys ``geometry``, ``density_category``,
            ``density_value``, ``color``, ``description`` and ``area_deg2``.
            ``density_value`` is the mean of ``smoke_values`` over the whole
            category mask, not over the individual polygon.  An empty list is
            returned when the polygon libraries are missing or on error.
        """
        polygons = []
        if not HMS_LIBS_AVAILABLE:
            print("HMS polygon libraries not available")
            return []
        try:
            from scipy import ndimage

            lat2d = np.asarray(lat2d)
            lon2d = np.asarray(lon2d)
            if lat2d.ndim == 1 and lon2d.ndim == 1:
                # Regular grid given as vectors: rows follow latitude and
                # columns follow longitude.
                lon2d, lat2d = np.meshgrid(lon2d, lat2d)

            if is_hms_categorical:
                classifications = smoke_values.astype(int)
                print(f"Using HMS categorical data directly: {np.unique(classifications)}")
            else:
                # Light smoothing before categorisation avoids speckle.
                try:
                    smoothed = ndimage.gaussian_filter(smoke_values.astype(float), sigma=1.0)
                except Exception as e:
                    print(f"Smoothing failed, using raw smoke values: {e}")
                    smoothed = smoke_values
                classifications = self.classify_smoke_density(smoothed)

            for category_name, category_info in self.density_categories.items():
                category_id = self._CATEGORY_IDS.get(category_name, 3)
                category_mask = (classifications == category_id)
                if not np.any(category_mask):
                    continue

                # Same value for every polygon of this category, so compute
                # it once instead of once per contour.
                avg_smoke = np.mean(smoke_values[category_mask])

                # Remove isolated noise, close small gaps, then dilate
                # slightly to smooth edges.
                try:
                    smoothed_mask = ndimage.binary_opening(category_mask, iterations=1)
                    smoothed_mask = ndimage.binary_closing(smoothed_mask, iterations=1)
                    smoothed_mask = ndimage.binary_dilation(smoothed_mask, iterations=1)
                except Exception as e:
                    print(f"Mask smoothing failed for {category_name}: {e}")
                    smoothed_mask = category_mask.copy()

                # Drop connected components below the per-category size.
                try:
                    labeled, _ = ndimage.label(smoothed_mask)
                    sizes = np.bincount(labeled.ravel())
                    min_pixels = self._MIN_COMPONENT_PIXELS.get(category_id, 40)
                    remove_idx = np.nonzero(sizes < min_pixels)[0]
                    if len(remove_idx) > 0:
                        smoothed_mask[np.isin(labeled, remove_idx)] = False
                except Exception as e:
                    print(f"Small-component removal failed for {category_name}: {e}")

                try:
                    contours = find_contours(smoothed_mask.astype(float), 0.5)
                    # Limit the number of contours for speed.
                    max_contours = 20 if category_name == 'light' else 50
                    # Keep every 2nd point for heavy smoke, every 3rd otherwise.
                    step = 2 if category_name == 'heavy' else 3
                    for contour in contours[:max_contours]:
                        if len(contour) < 4:
                            continue
                        contour_coords = self._contour_to_lonlat(contour[::step], lat2d, lon2d)
                        if len(contour_coords) < 4:
                            continue
                        # Ensure the ring is closed.
                        if contour_coords[0] != contour_coords[-1]:
                            contour_coords.append(contour_coords[0])
                        try:
                            poly = Polygon(contour_coords)
                            if poly.is_valid and poly.area >= min_area:
                                polygons.append({
                                    'geometry': poly,
                                    'density_category': category_name,
                                    'density_value': avg_smoke,
                                    'color': category_info['color'],
                                    'description': category_info['description'],
                                    'area_deg2': poly.area
                                })
                        except Exception as e:
                            print(f"Polygon creation error: {e}")
                            continue
                except Exception as e:
                    print(f"Contour finding error for {category_name}: {e}")
                    continue
            return polygons
        except Exception as e:
            print(f"Polygon extraction error: {e}")
            return []

    def create_kml_string(self, polygons, forecast_time, model_run_time):
        """Build a pretty-printed KML document for the smoke polygons.

        Args:
            polygons: Dicts as produced by ``extract_smoke_polygons``; only
                the exterior ring of each geometry is exported.
            forecast_time: Text used in the document name and descriptions.
            model_run_time: Text used in the document description.

        Returns:
            The KML document as a string, or ``None`` on error.
        """
        try:
            kml = ET.Element('kml', xmlns='http://www.opengis.net/kml/2.2')
            document = ET.SubElement(kml, 'Document')
            ET.SubElement(document, 'name').text = f'HRRR-Smoke Forecast {forecast_time}'
            ET.SubElement(document, 'description').text = f'Smoke plume polygons from HRRR-Smoke model run {model_run_time}'

            # One shared style per density category, referenced by styleUrl.
            for category_name, category_info in self.density_categories.items():
                kml_color = self._hex_to_kml_color(category_info['color'])
                style = ET.SubElement(document, 'Style', id=f'smoke_{category_name}')
                poly_style = ET.SubElement(style, 'PolyStyle')
                ET.SubElement(poly_style, 'color').text = kml_color
                ET.SubElement(poly_style, 'fill').text = '1'
                ET.SubElement(poly_style, 'outline').text = '1'
                line_style = ET.SubElement(style, 'LineStyle')
                ET.SubElement(line_style, 'color').text = kml_color
                ET.SubElement(line_style, 'width').text = '2'

            for i, polygon_data in enumerate(polygons):
                placemark = ET.SubElement(document, 'Placemark')
                ET.SubElement(placemark, 'name').text = f"Smoke Plume {i+1}"
                ET.SubElement(placemark, 'description').text = "\n".join((
                    f"Density: {polygon_data['description']}",
                    f"Average Value: {polygon_data['density_value']:.1f} µg/m³",
                    f"Area: {polygon_data['area_deg2']:.4f} deg²",
                    f"Forecast Time: {forecast_time}",
                ))
                ET.SubElement(placemark, 'styleUrl').text = f"#smoke_{polygon_data['density_category']}"

                polygon_elem = ET.SubElement(placemark, 'Polygon')
                ET.SubElement(polygon_elem, 'extrude').text = '0'
                ET.SubElement(polygon_elem, 'altitudeMode').text = 'clampToGround'
                outer_boundary = ET.SubElement(polygon_elem, 'outerBoundaryIs')
                linear_ring = ET.SubElement(outer_boundary, 'LinearRing')
                # KML coordinate tuples are "lon,lat,alt" separated by spaces.
                ET.SubElement(linear_ring, 'coordinates').text = ' '.join(
                    f"{lon},{lat},0"
                    for lon, lat in polygon_data['geometry'].exterior.coords
                )

            # Round-trip through minidom purely for indentation.
            rough_string = ET.tostring(kml, encoding='unicode')
            return minidom.parseString(rough_string).toprettyxml(indent=" ")
        except Exception as e:
            print(f"KML creation error: {e}")
            return None

    def _hex_to_kml_color(self, hex_color):
        """Convert ``#RRGGBB`` to KML's ``AABBGGRR`` with alpha fixed at 80."""
        hex_color = hex_color.lstrip('#')
        r = hex_color[0:2]
        g = hex_color[2:4]
        b = hex_color[4:6]
        return f"80{b}{g}{r}"  # 80 = 50% opacity

    def create_kmz_file(self, polygons, forecast_time, model_run_time, output_path):
        """Write the polygons to a KMZ archive (zipped ``doc.kml``).

        Returns:
            ``output_path`` on success, ``None`` if KML generation or writing
            the archive fails.
        """
        try:
            kml_content = self.create_kml_string(polygons, forecast_time, model_run_time)
            if kml_content is None:
                return None
            with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as kmz:
                kmz.writestr('doc.kml', kml_content)
            return output_path
        except Exception as e:
            print(f"KMZ creation error: {e}")
            return None
| class HRRRSmokeDataManager: | |
def __init__(self):
    """Set up a scratch directory, an empty cache and the parameter catalog.

    ``smoke_parameters`` maps each HRRR parameter code to a dict with the
    keys ``name``, ``units``, ``description`` and ``colorscale``.
    """
    self.temp_dir = tempfile.mkdtemp()
    self.forecast_cache = {}

    field_names = ("name", "units", "description", "colorscale")
    # One row per parameter: (code, name, units, description, colorscale).
    catalog = (
        # Primary smoke parameters
        ("MASSDEN", "Smoke Mass Density", "µg/m³", "Total smoke concentration", "OrRd"),
        ("COLMD", "Column Smoke Density", "µg/m²", "Integrated column smoke", "Reds"),
        # Air Quality Index related
        ("PMTF", "PM2.5 Total", "µg/m³", "Fine particulate matter from smoke", "Plasma"),
        ("PM25", "PM2.5", "µg/m³", "PM2.5 concentration", "Plasma"),
        # Visibility and optical parameters
        ("VIS", "Visibility", "km", "Visibility through smoke", "Blues_r"),
        ("EXTCOF55", "Extinction Coefficient", "1/km", "Light extinction by smoke", "Greys"),
        # Fire and emissions
        ("FRPAVG", "Fire Radiative Power", "MW", "Fire intensity", "Hot"),
        ("EMSABV", "Emissions Above Surface", "µg/m²/s", "Smoke emissions rate", "YlOrRd"),
        # Meteorological support
        ("WIND", "Wind Speed", "m/s", "Wind speed affecting smoke transport", "Viridis"),
        ("WDIR", "Wind Direction", "degrees", "Wind direction", "hsv"),
        ("TMP", "Temperature", "°C", "Air temperature", "RdYlBu_r"),
        ("RH", "Relative Humidity", "%", "Humidity affecting smoke dispersion", "Blues"),
    )
    self.smoke_parameters = {
        code: dict(zip(field_names, details)) for code, *details in catalog
    }
def fetch_multiple_smoke_parameters(self, params=('MASSDEN', 'VIS', 'COLMD'), level='surface', fxx=6, fast_mode=False):
    """Fetch several HRRR smoke parameters and return their 2-D grids.

    Parameters are requested in a fixed priority order (MASSDEN, PM25, VIS,
    COLMD, FRPAVG, PMTF) followed by any other requested names in the order
    given.  Each one is fetched with ``self.fetch_hrrr_smoke_data``; a
    failure for one parameter is reported and does not stop the others.

    Args:
        params: Iterable of parameter codes to fetch.
        level: Passed through to ``fetch_hrrr_smoke_data``.
        fxx: Forecast hour, passed through.
        fast_mode: Passed through.

    Returns:
        Dict mapping each successfully fetched parameter to the values of the
        dataset's first data variable (first time step / level for 3-D / 4-D
        data), or ``None`` when nothing could be fetched.
    """
    datasets = {}
    successful_params = []
    # Priority order in which parameters are requested.
    param_priority = ['MASSDEN', 'PM25', 'VIS', 'COLMD', 'FRPAVG', 'PMTF']
    sorted_params = [p for p in param_priority if p in params] + \
                    [p for p in params if p not in param_priority]
    print(f"Fetching multiple smoke parameters: {sorted_params}")

    for param in sorted_params:
        try:
            ds = self.fetch_hrrr_smoke_data(param, level, fxx, return_info=False, fast_mode=fast_mode)
            if ds is None:
                print(f"Failed to fetch {param}")
                continue
            var_names = list(ds.data_vars)
            if not var_names:
                # A dataset without variables carries no usable grid.
                print(f"Failed to fetch {param}")
                continue
            data_values = ds[var_names[0]].values
            # Reduce to a 2-D grid: drop leading time (and level) axes.
            if data_values.ndim == 3:
                data_values = data_values[0]
            elif data_values.ndim == 4:
                data_values = data_values[0, 0]
            datasets[param] = data_values
            successful_params.append(param)
            print(f"Successfully fetched {param}: shape {data_values.shape}")
        except Exception as e:
            print(f"Error fetching {param}: {e}")
            continue

    print(f"Successfully fetched parameters: {successful_params}")
    return datasets if datasets else None
def fetch_hrrr_smoke_data(self, param='MASSDEN', level='surface', fxx=6, return_info=False, fast_mode=False):
    """Fetch one HRRR field with Herbie, trying recent model runs in turn.

    Model runs 2, 3 and 6 hours before the current UTC hour are tried, each
    with the ``sfc`` and then the ``nat`` product.  The first dataset
    obtained is returned.  If every attempt fails the call is delegated to
    ``self._fallback_fetch_hrrr_data``.

    Args:
        param: Parameter code.  It is translated to a GRIB search string via
            an internal mapping; unmapped codes use ``"<param>:surface"``.
        level: Only recorded in the returned info and forwarded to the
            fast/fallback fetchers; it does not influence the search string.
        fxx: Forecast hour.
        return_info: When true, return ``(dataset, info_dict)`` instead of
            just the dataset.
        fast_mode: When true, delegate immediately to
            ``self._fast_fetch_hrrr_data``.

    Returns:
        The dataset (or ``(dataset, info)``); ``None`` (or ``(None, None)``)
        when Herbie is unavailable or an unexpected error occurs.
    """
    # Local import: only `datetime` and `timedelta` are imported at file level.
    from datetime import timezone

    failure = (None, None) if return_info else None
    if not HERBIE_AVAILABLE:
        return failure
    if fast_mode:
        return self._fast_fetch_hrrr_data(param, level, fxx, return_info)

    # Loop-invariant lookup tables, built once per call.
    # NOTE(review): MASSDEN and PM25 are deliberately served from the
    # visibility field, so callers receive visibility values under those
    # names - confirm downstream code accounts for this.
    param_mapping = {
        'VIS': 'VIS:surface',                # Visibility - WORKS
        'COLMD': 'COLMD:entire atmosphere',  # Column smoke - WORKS
        'TMP': 'TMP:2 m',                    # Temperature
        'WIND': 'WIND:10 m',                 # Wind speed
        'RH': 'RH:2 m',                      # Relative humidity
        'WDIR': 'WDIR:10 m',                 # Wind direction
        'MASSDEN': 'VIS:surface',            # Fallback to visibility
        'PM25': 'VIS:surface',               # Fallback to visibility
    }
    search_param = param_mapping.get(param, f"{param}:surface")
    products_to_try = ('sfc', 'nat')

    try:
        # Timezone-aware replacement for the deprecated datetime.utcnow();
        # the formatted run time is unchanged.
        current_time = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0)
        for hours_back in (2, 3, 6):
            target_time = current_time - timedelta(hours=hours_back)
            date_str = target_time.strftime('%Y-%m-%d %H:00')
            print(f"Trying HRRR data for: {date_str}, parameter: {param}")
            for product in products_to_try:
                try:
                    H = Herbie(date_str, model='hrrr', product=product, fxx=fxx)
                    print(f" Trying product '{product}' with parameter '{search_param}'")
                    ds = H.xarray(search_param)
                    if ds is None:
                        continue
                    print(f"SUCCESS: Got HRRR data for {date_str} using product '{product}'")
                    if not return_info:
                        return ds
                    info = {
                        'date_str': date_str,
                        'param': param,
                        'search_param': search_param,
                        'product': product,
                        'level': level,
                        'fxx': fxx,
                        'model_run': H.date.strftime('%Y-%m-%d %H:00 UTC'),
                        'valid_time': (H.date + timedelta(hours=fxx)).strftime('%Y-%m-%d %H:00 UTC')
                    }
                    return ds, info
                except Exception as e:
                    print(f" Failed with product '{product}': {e}")
                    continue
            # Reaching this point means no product delivered for this run.
            print(f" All products failed for {date_str}")
        print("Optimized HRRR attempts failed, trying fallback...")
        return self._fallback_fetch_hrrr_data(param, level, fxx, return_info)
    except Exception as e:
        print(f"HRRR fetch error: {e}")
        return failure
| def _fast_fetch_hrrr_data(self, param='VIS', level='surface', fxx=6, return_info=False): | |
| """Ultra-fast HRRR data fetching - try only most reliable parameters""" | |
| try: | |
| current_time = datetime.utcnow().replace(minute=0, second=0, microsecond=0) | |
| target_time = current_time - timedelta(hours=2) # Only try most recent | |
| date_str = target_time.strftime('%Y-%m-%d %H:00') | |
| print(f"FAST MODE: Trying HRRR data for: {date_str}, parameter: VIS") | |
| # Only try visibility (most reliable) in fast mode | |
| H = Herbie(date_str, model='hrrr', product='sfc', fxx=fxx) | |
| ds = H.xarray('VIS:surface') | |
| if ds is not None: | |
| print(f"FAST MODE SUCCESS: Got HRRR data for {date_str}") | |
| if return_info: | |
| info = { | |
| 'date_str': date_str, | |
| 'param': 'VIS', | |
| 'search_param': 'VIS:surface', | |
| 'product': 'sfc', | |
| 'level': level, | |
| 'fxx': fxx, | |
| 'model_run': H.date.strftime('%Y-%m-%d %H:00 UTC'), | |
| 'valid_time': (H.date + timedelta(hours=fxx)).strftime('%Y-%m-%d %H:00 UTC') | |
| } | |
| return ds, info | |
| else: | |
| return ds | |
| print("FAST MODE: No data available") | |
| return (None, None) if return_info else None | |
| except Exception as e: | |
| print(f"FAST MODE error: {e}") | |
| return (None, None) if return_info else None | |
def _fallback_fetch_hrrr_data(self, param, level, fxx, return_info):
    """Build a synthetic visibility field to stand in when HRRR fetching fails.

    The field lives on a 100 x 150 latitude/longitude mesh (25..50 N,
    -130..-65 E). It starts at a clear-air value of 16000 everywhere and is
    lowered towards a per-fire core value around four fixed fire locations.

    Args:
        param: Copied into the info dict only.
        level: Copied into the info dict only.
        fxx: Forecast lead time in hours; used for the info dict's valid time.
        return_info: When true, return ``(dataset, info)``.

    Returns:
        An ``xarray.Dataset`` with variable ``vis`` on dims ``(y, x)`` and 2-D
        ``latitude``/``longitude`` coordinates, optionally with an info dict
        whose ``product`` is ``'synthetic'``.
    """
    print("Generating synthetic smoke data for demo...")

    lats = np.linspace(25, 50, 100)
    lons = np.linspace(-130, -65, 150)
    lon2d, lat2d = np.meshgrid(lons, lats)

    # Clear-air background; plumes can only lower this value.
    base_visibility = 16000.0
    smoke_data = np.full(lat2d.shape, base_visibility)

    # (latitude, longitude, visibility at the plume core)
    fire_regions = [
        (39.0, -120.0, 8000),   # California
        (47.0, -114.0, 12000),  # Montana
        (44.0, -103.0, 10000),  # North Dakota
        (33.0, -117.0, 6000),   # Southern California
    ]
    for fire_lat, fire_lon, core_visibility in fire_regions:
        dist = np.sqrt((lat2d - fire_lat)**2 * 1.5 + (lon2d - fire_lon)**2)
        intensity = np.exp(-dist**2 / 8.0)  # 1 at the fire, -> 0 far away
        # Blend from the core value at the fire back to the background far
        # away. The previous form (core + intensity * 8000) did the opposite:
        # it raised visibility at the fire and dragged the whole domain down
        # to the lowest core value.
        plume_visibility = base_visibility - intensity * (base_visibility - core_visibility)
        smoke_data = np.minimum(smoke_data, plume_visibility)

    current_time = datetime.utcnow()
    ds = xr.Dataset({
        'vis': (['y', 'x'], smoke_data)
    }, coords={
        'latitude': (['y', 'x'], lat2d),
        'longitude': (['y', 'x'], lon2d),
        'time': current_time
    })

    if not return_info:
        return ds

    # Valid time is run time plus lead time, as in the real-data fetch paths.
    valid_time = current_time + timedelta(hours=fxx)
    info = {
        'date_str': current_time.strftime('%Y-%m-%d %H:00'),
        'param': param,
        'search_param': 'SYNTHETIC',
        'product': 'synthetic',
        'level': level,
        'fxx': fxx,
        'model_run': current_time.strftime('%Y-%m-%d %H:00 UTC'),
        'valid_time': valid_time.strftime('%Y-%m-%d %H:00 UTC')
    }
    return ds, info
def generate_demo_smoke_data(self, param='MASSDEN', fxx=6):
    """Generate a synthetic single-parameter smoke dataset for demo use.

    The field is a sum of Gaussian hotspots with an eastward smear, scaled
    down with lead time and perturbed by seeded noise, on a 50 x 60
    latitude/longitude grid (25..50 N, -125..-70 E).

    Args:
        param: Parameter key. It selects the final scaling and is used as the
            dataset variable name.
        fxx: Forecast lead time in hours. It drives the decay factor and the
            noise seed, so a given hour always yields the same field.

    Returns:
        ``xarray.Dataset`` with one variable named ``param`` on dims
        ``(lat, lon)``. ``units``/``long_name`` attributes are taken from
        ``self.smoke_parameters`` when the key is present there.
    """
    print(f"Generating demo smoke data for {param}")

    lats = np.linspace(25, 50, 50)
    lons = np.linspace(-125, -70, 60)
    lon_grid, lat_grid = np.meshgrid(lons, lats)

    # Private generator: seeding it reproduces the numbers that
    # np.random.seed(42 + fxx) gave, without resetting the process-wide RNG
    # that unrelated code may be relying on. int() keeps float slider values
    # (e.g. 6.0) usable as a seed.
    rng = np.random.RandomState(42 + int(fxx))

    # (latitude, longitude, peak intensity) - western US pattern
    hotspots = [
        (40.0, -120.0, 50.0),  # Northern California
        (44.0, -115.0, 30.0),  # Idaho
        (46.0, -110.0, 25.0),  # Montana
        (37.0, -105.0, 20.0),  # Colorado
    ]

    values = np.zeros_like(lat_grid)
    for hot_lat, hot_lon, intensity in hotspots:
        # Distance-based falloff around the hotspot
        dist = np.sqrt((lat_grid - hot_lat)**2 + (lon_grid - hot_lon)**2)
        hotspot_values = intensity * np.exp(-dist**2 / 8.0)

        # Eastward smear: the longitude term only decays west of the hotspot
        # (it is exactly 1 for every cell to the east).
        wind_shift = (
            np.exp(-(lat_grid - hot_lat)**2 / 4.0)
            * np.exp(-np.maximum(0, hot_lon - lon_grid)**2 / 15.0)
        )
        hotspot_values += intensity * 0.3 * wind_shift
        values += hotspot_values

    # Decay with lead time, floored at 10 % of the initial strength.
    values *= max(0.1, 1.0 - fxx * 0.05)

    # Random variability, then clip: no negative smoke.
    values += rng.normal(0, 2, values.shape)
    values = np.maximum(0, values)

    # Parameter-specific scaling; MASSDEN and unknown keys keep the base field.
    if param in ('PM25', 'PMTF'):
        values *= 0.7
    elif param == 'VIS':
        # Visibility falls as smoke rises, floored at 1.
        values = np.maximum(1, 20 - values * 0.3)
    elif param == 'COLMD':
        values *= 5.0

    ds = xr.Dataset({
        param: (['lat', 'lon'], values)
    }, coords={
        'lat': lats,
        'lon': lons
    })

    meta = self.smoke_parameters.get(param, {})
    ds[param].attrs['units'] = meta.get('units', 'µg/m³')
    ds[param].attrs['long_name'] = meta.get('name', param)
    return ds
def generate_multiple_demo_parameters(self, params=None, fxx=6):
    """Generate several synthetic smoke fields that share one base pattern.

    Args:
        params: Iterable of parameter keys. Defaults to
            ``['MASSDEN', 'VIS', 'COLMD']`` when omitted.
        fxx: Forecast lead time in hours. It drives the decay factor and the
            noise seed, so a given hour always yields the same fields.

    Returns:
        Dict mapping each requested key to a ``(50, 60)`` NumPy array on the
        25..50 N / -125..-70 E grid. Every entry is its own array, so callers
        may modify one field without affecting another.
    """
    # None sentinel instead of a list literal in the signature: a mutable
    # default would be shared by every call.
    if params is None:
        params = ['MASSDEN', 'VIS', 'COLMD']
    print(f"Generating demo data for multiple parameters: {params}")

    lats = np.linspace(25, 50, 50)
    lons = np.linspace(-125, -70, 60)
    lon_grid, lat_grid = np.meshgrid(lons, lats)

    # Private generator: same stream as np.random.seed(42 + fxx) followed by
    # np.random.normal, but the process-wide RNG is left untouched.
    rng = np.random.RandomState(42 + int(fxx))

    # (latitude, longitude, peak intensity)
    hotspots = [
        (40.0, -120.0, 45.0),  # Northern California - heavy smoke
        (44.0, -115.0, 28.0),  # Idaho - medium smoke
        (46.0, -110.0, 18.0),  # Montana - light smoke
        (37.0, -105.0, 32.0),  # Colorado - medium-heavy smoke
        (42.0, -118.0, 12.0),  # Nevada - light smoke
    ]

    base_smoke = np.zeros_like(lat_grid)
    for hot_lat, hot_lon, intensity in hotspots:
        dist = np.sqrt((lat_grid - hot_lat)**2 + (lon_grid - hot_lon)**2)
        hotspot_values = intensity * np.exp(-dist**2 / 8.0)
        # Eastward smear: the longitude term only decays west of the hotspot.
        wind_shift = (
            np.exp(-(lat_grid - hot_lat)**2 / 4.0)
            * np.exp(-np.maximum(0, hot_lon - lon_grid)**2 / 15.0)
        )
        hotspot_values += intensity * 0.3 * wind_shift
        base_smoke += hotspot_values

    # Lead-time decay (floored at 10 %), noise, then clip at zero.
    base_smoke *= max(0.1, 1.0 - fxx * 0.05)
    base_smoke += rng.normal(0, 1, base_smoke.shape)
    base_smoke = np.maximum(0, base_smoke)

    # Plain multiples of the base field; keys not listed use a factor of 1.
    scale_factors = {'MASSDEN': 1.0, 'PM25': 0.7, 'COLMD': 5.0, 'PMTF': 0.8}

    datasets = {}
    for param in params:
        if param == 'VIS':
            # Visibility drops with smoke: 20000 minus 300 per unit, floor 500.
            field = np.maximum(500, 20000 - base_smoke * 300)
        elif param == 'FRPAVG':
            # Fire radiative power: tight Gaussians at the hotspots only.
            field = np.zeros_like(base_smoke)
            for hot_lat, hot_lon, intensity in hotspots:
                dist = np.sqrt((lat_grid - hot_lat)**2 + (lon_grid - hot_lon)**2)
                field += (intensity / 10.0) * np.exp(-dist**2 / 2.0)
        else:
            # Multiplying always allocates, so base_smoke is never aliased
            # into the result dict.
            field = base_smoke * scale_factors.get(param, 1.0)

        datasets[param] = field
        print(f"Generated {param}: range {field.min():.1f} - {field.max():.1f}")

    return datasets
def process_smoke_data(self, ds, max_points=1000, param_type='smoke'):
    """Flatten a dataset's first variable into point lists for scatter plots.

    Coordinates are read from ``latitude``/``longitude`` or, failing that,
    ``lat``/``lon``. They may be 2-D grids matching the data or 1-D axes; 1-D
    axes are expanded to a grid so that every data cell gets its own
    coordinate pair.

    Args:
        ds: Dataset-like object exposing ``data_vars`` and ``coords``
            mappings (e.g. ``xarray.Dataset``), or ``None``.
        max_points: Target point count before subsampling. For
            ``param_type == 'smoke'`` this is replaced by 2000.
        param_type: ``'smoke'`` additionally drops values <= 0.1.

    Returns:
        Dict with 1-D arrays ``lats``, ``lons``, ``values`` plus ``units``,
        ``long_name`` and ``param_type``; ``None`` when there is no usable
        data or processing fails.
    """
    if ds is None:
        return None

    try:
        var_names = list(ds.data_vars)
        if not var_names:
            return None
        var_name = var_names[0]
        data_var = ds.data_vars[var_name]

        coords = ds.coords
        if 'latitude' in coords and 'longitude' in coords:
            lats = np.asarray(coords['latitude'].values)
            lons = np.asarray(coords['longitude'].values)
        elif 'lat' in coords and 'lon' in coords:
            lats = np.asarray(coords['lat'].values)
            lons = np.asarray(coords['lon'].values)
        else:
            return None

        values = np.asarray(data_var.values)
        if values.ndim > 2:
            # Drop singleton leading dimensions (e.g. time / level).
            values = np.squeeze(values)

        # 1-D axes with a 2-D field: expand to a mesh. Without this the
        # flattened coordinate arrays are shorter than the flattened values
        # and the validity mask below cannot be built.
        if lats.ndim == 1 and lons.ndim == 1 and values.ndim == 2:
            lons, lats = np.meshgrid(lons, lats)

        if lats.shape != values.shape or lons.shape != values.shape:
            print("Smoke data processing error: coordinate and data shapes differ")
            return None

        # For smoke data, we want to show more detail
        if param_type == 'smoke':
            max_points = 2000
            min_threshold = 0.1  # filter very low concentrations
        else:
            min_threshold = None

        # Subsample with the same stride along every axis.
        if values.size > max_points:
            step = max(1, int(np.sqrt(values.size / max_points)))
            stride = (slice(None, None, step),) * values.ndim
            lats, lons, values = lats[stride], lons[stride], values[stride]

        lats_flat = lats.flatten()
        lons_flat = lons.flatten()
        values_flat = values.flatten()

        # Keep only points where value and position are all defined.
        valid = ~(np.isnan(values_flat) | np.isnan(lats_flat) | np.isnan(lons_flat))
        if min_threshold is not None:
            valid &= values_flat > min_threshold

        if not np.any(valid):
            return None

        return {
            'lats': lats_flat[valid],
            'lons': lons_flat[valid],
            'values': values_flat[valid],
            'units': data_var.attrs.get('units', ''),
            'long_name': data_var.attrs.get('long_name', var_name),
            'param_type': param_type
        }

    except Exception as e:
        print(f"Smoke data processing error: {e}")
        return None
def process_smoke_grid(self, ds, target_cells=25000, param_type='smoke', min_threshold=0.1):
    """Turn a dataset's first variable into 2-D arrays for contour plotting.

    Coordinates are read from ``latitude``/``longitude`` or, failing that,
    ``lat``/``lon``. A pair of 1-D axes is expanded to a mesh regardless of
    which names it was stored under, so the subsampling below can always
    slice in two dimensions.

    Args:
        ds: Dataset-like object exposing ``data_vars`` and ``coords``
            mappings (e.g. ``xarray.Dataset``), or ``None``.
        target_cells: Approximate upper bound on grid cells; larger grids are
            strided down equally in both directions.
        param_type: When ``'smoke'``, cells below ``min_threshold`` are
            replaced by NaN.
        min_threshold: Masking threshold; ``None`` disables masking.

    Returns:
        Dict with ``lat2d``, ``lon2d``, ``z2d``, ``units``, ``long_name`` and
        ``param_type``; ``None`` when the data is missing, not 2-D after
        squeezing, mismatched with its coordinates, or processing fails.
    """
    if ds is None:
        return None

    try:
        var_names = list(ds.data_vars)
        if not var_names:
            return None
        var_name = var_names[0]
        data_var = ds.data_vars[var_name]

        coords = ds.coords
        for lat_key, lon_key in (('latitude', 'longitude'), ('lat', 'lon')):
            if lat_key in coords and lon_key in coords:
                lat = np.asarray(coords[lat_key].values)
                lon = np.asarray(coords[lon_key].values)
                break
        else:
            return None

        if lat.ndim == 1 and lon.ndim == 1:
            lon2d, lat2d = np.meshgrid(lon, lat)
        else:
            lat2d, lon2d = lat, lon

        z = np.squeeze(data_var.values)
        if z.ndim != 2:
            return None

        # Coordinates that do not line up with the field would only fail
        # later inside the plotting code; reject them here.
        if lat2d.shape != z.shape or lon2d.shape != z.shape:
            print("Smoke grid processing error: coordinate and data shapes differ")
            return None

        # Subsample for performance
        ny, nx = z.shape
        total = nx * ny
        if total > target_cells:
            step = max(1, int(np.ceil(np.sqrt(total / target_cells))))
            z = z[::step, ::step]
            lat2d = lat2d[::step, ::step]
            lon2d = lon2d[::step, ::step]

        # Mask low values for smoke
        if param_type == 'smoke' and min_threshold is not None:
            z = np.where(z >= min_threshold, z, np.nan)

        return {
            'lat2d': lat2d,
            'lon2d': lon2d,
            'z2d': z,
            'units': data_var.attrs.get('units', ''),
            'long_name': data_var.attrs.get('long_name', var_name),
            'param_type': param_type
        }

    except Exception as e:
        print(f"Smoke grid processing error: {e}")
        return None
class SmokeNarrativeGenerator:
    """Generate narrative smoke forecasts"""

    # (inclusive upper PM2.5 bound, category, colour, description), ascending.
    # Anything above the last bound is reported as "Hazardous".
    _AQI_BREAKPOINTS = (
        (12, "Good", "green", "Air quality is satisfactory"),
        (35.4, "Moderate", "yellow", "Acceptable for most people"),
        (55.4, "Unhealthy for Sensitive Groups", "orange", "Sensitive people may experience symptoms"),
        (150.4, "Unhealthy", "red", "Everyone may experience health effects"),
        (250.4, "Very Unhealthy", "purple", "Health warnings for everyone"),
    )

    # Timeline buckets as (start_hour, end_hour, label). Buckets are
    # half-open [start, end) except the last, which also includes its end
    # hour so that the 48 h step is not dropped from the timeline.
    _TIMELINE_PERIODS = (
        (0, 6, "Next 6 hours"),
        (6, 12, "6-12 hours"),
        (12, 24, "12-24 hours"),
        (24, 48, "24-48 hours"),
    )

    def __init__(self):
        pass

    def get_aqi_category(self, pm25_value):
        """Map a PM2.5 value to ``(category, colour, description)``."""
        for upper_bound, category, colour, description in self._AQI_BREAKPOINTS:
            if pm25_value <= upper_bound:
                return category, colour, description
        return "Hazardous", "maroon", "Emergency conditions - avoid all outdoor activity"

    def get_visibility_impact(self, visibility_km):
        """Return a one-line description for a visibility given in km."""
        if visibility_km >= 15:
            return "Excellent visibility"
        if visibility_km >= 10:
            return "Good visibility with minor haze"
        if visibility_km >= 5:
            return "Moderate visibility - noticeable smoke haze"
        if visibility_km >= 2:
            return "Poor visibility - heavy smoke conditions"
        return "Very poor visibility - dense smoke"

    def get_smoke_density_description(self, density):
        """Return a one-line description for a smoke density value."""
        if density < 5:
            return "Light smoke"
        if density < 25:
            return "Moderate smoke"
        if density < 75:
            return "Heavy smoke"
        return "Very heavy smoke"

    def assess_health_risk(self, pm25_max, visibility_min, duration_hours):
        """Assess health risks from smoke exposure.

        Returns:
            ``(risks, risk_level)`` where ``risks`` is a list of advisory
            strings and ``risk_level`` is one of ``"low"``, ``"moderate"``,
            ``"high"`` or ``"extreme"``.
        """
        risks = []
        risk_level = "low"

        # PM2.5 health assessment
        if pm25_max > 250:
            risks.append("HAZARDOUS air quality - avoid all outdoor activities")
            risk_level = "extreme"
        elif pm25_max > 150:
            risks.append("Very unhealthy air - limit outdoor exposure for everyone")
            risk_level = "high"
        elif pm25_max > 55:
            risks.append("Unhealthy air quality - sensitive individuals should stay indoors")
            risk_level = "moderate"
        elif pm25_max > 35:
            risks.append("Moderate air quality - sensitive groups should limit prolonged outdoor exertion")
            risk_level = "low"

        # Visibility safety: very poor visibility lifts a "low" rating.
        if visibility_min < 2:
            risks.append("Dangerous driving conditions due to poor visibility")
            if risk_level == "low":
                risk_level = "moderate"
        elif visibility_min < 5:
            risks.append("Reduced visibility may affect driving safety")

        # Duration consideration
        if duration_hours > 12 and pm25_max > 35:
            risks.append("Extended exposure period increases health risks")

        return risks, risk_level

    @staticmethod
    def _collect_values(forecast_data, param_names, hour_selected):
        """Return the values of the named parameters whose step passes ``hour_selected``."""
        return [
            point['value']
            for series in forecast_data
            if series['parameter'] in param_names
            for point in series['data']
            if hour_selected(point['step'])
        ]

    def generate_smoke_forecast(self, forecast_data, location="Selected Location"):
        """Generate comprehensive smoke forecast narrative.

        Args:
            forecast_data: List of dicts, each with a ``'parameter'`` key and
                a ``'data'`` list of ``{'step': hour, 'value': number}``
                points. PM25/PMTF feed the PM2.5 figures, VIS the visibility
                figures and MASSDEN the smoke intensity.
            location: Name shown in the heading.

        Returns:
            Markdown text, or a short notice when no usable data is supplied.
            Missing PM2.5 / smoke values are reported as 0 and missing
            visibility as 20.
        """
        if not forecast_data:
            return "Smoke forecast data is not available at this time."

        def within_forecast(hour):
            return hour <= 48  # focus on the 48-hour forecast

        pm25_values = self._collect_values(forecast_data, ('PM25', 'PMTF'), within_forecast)
        visibility_values = self._collect_values(forecast_data, ('VIS',), within_forecast)
        smoke_density_values = self._collect_values(forecast_data, ('MASSDEN',), within_forecast)

        if not (pm25_values or visibility_values or smoke_density_values):
            return "Insufficient smoke data to generate forecast."

        # Key statistics
        pm25_max = max(pm25_values) if pm25_values else 0
        pm25_avg = sum(pm25_values) / len(pm25_values) if pm25_values else 0
        visibility_min = min(visibility_values) if visibility_values else 20
        smoke_max = max(smoke_density_values) if smoke_density_values else 0

        # The report is assembled as a list of fragments and joined once.
        aqi_category, _, aqi_desc = self.get_aqi_category(pm25_max)
        parts = [
            f"# 🌬️ **48-Hour Smoke Forecast for {location}**\n\n",
            f"## 🚨 **Air Quality Alert: {aqi_category}**\n\n",
            f"**Peak PM2.5:** {pm25_max:.1f} µg/m³ - {aqi_desc}\n\n",
        ]

        # Health risk assessment
        health_risks, risk_level = self.assess_health_risk(pm25_max, visibility_min, 48)
        if health_risks:
            risk_emoji = {"extreme": "🚨", "high": "⚠️", "moderate": "⚡", "low": "ℹ️"}.get(risk_level, "ℹ️")
            parts.append(f"{risk_emoji} **Health Recommendations:**\n\n")
            parts.extend(f"• {risk}\n" for risk in health_risks)
            parts.append("\n")

        # Smoke conditions overview
        visibility_desc = self.get_visibility_impact(visibility_min)
        smoke_desc = self.get_smoke_density_description(smoke_max)
        parts.append("## 🌫️ **Smoke Conditions Summary**\n\n")
        parts.append(f"**Smoke Intensity:** {smoke_desc} (peak: {smoke_max:.1f} µg/m³)\n")
        parts.append(f"**Visibility Impact:** {visibility_desc} (minimum: {visibility_min:.1f} km)\n")
        parts.append(f"**Average PM2.5:** {pm25_avg:.1f} µg/m³ over forecast period\n\n")

        # Timeline, one line per bucket that has any PM2.5 or visibility data
        parts.append("## ⏰ **48-Hour Timeline**\n\n")
        last_index = len(self._TIMELINE_PERIODS) - 1
        for index, (start_hour, end_hour, period_name) in enumerate(self._TIMELINE_PERIODS):
            closed_end = index == last_index

            def in_period(hour, lo=start_hour, hi=end_hour, closed=closed_end):
                return lo <= hour < hi or (closed and hour == hi)

            period_pm25 = self._collect_values(forecast_data, ('PM25', 'PMTF'), in_period)
            period_visibility = self._collect_values(forecast_data, ('VIS',), in_period)
            if not (period_pm25 or period_visibility):
                continue

            avg_pm25 = sum(period_pm25) / len(period_pm25) if period_pm25 else 0
            avg_vis = sum(period_visibility) / len(period_visibility) if period_visibility else 20
            period_aqi = self.get_aqi_category(avg_pm25)[0]
            parts.append(f"**{period_name}:** {period_aqi} air quality ")
            parts.append(f"(PM2.5: {avg_pm25:.1f} µg/m³, Visibility: {avg_vis:.1f} km)\n")
        parts.append("\n")

        # Protective actions
        parts.append("## 🛡️ **Protective Actions**\n\n")
        if pm25_max > 150:
            parts.append("• **Stay indoors** with windows and doors closed\n")
            parts.append("• **Use air purifiers** or create a clean air room\n")
            parts.append("• **Avoid all outdoor activities** including exercise\n")
            parts.append("• **Wear N95 masks** if you must go outside\n")
        elif pm25_max > 55:
            parts.append("• **Limit outdoor activities**, especially strenuous exercise\n")
            parts.append("• **Sensitive individuals** should stay indoors\n")
            parts.append("• **Consider wearing masks** when outdoors\n")
            parts.append("• **Keep windows closed** and use air conditioning on recirculate\n")
        elif pm25_max > 35:
            parts.append("• **Sensitive groups** should limit prolonged outdoor exertion\n")
            parts.append("• **Monitor air quality** throughout the day\n")
            parts.append("• **Consider indoor activities** during peak smoke hours\n")
        else:
            parts.append("• **Normal outdoor activities** are generally acceptable\n")
            parts.append("• **Monitor conditions** as smoke can change rapidly\n")

        if visibility_min < 5:
            parts.append("• **Drive with caution** - use headlights and reduce speed\n")
            parts.append("• **Avoid unnecessary travel** during poorest visibility\n")
        parts.append("\n")

        # Data source and disclaimer
        parts.append("---\n\n")
        parts.append("*This forecast is based on NOAA HRRR-Smoke model data. ")
        parts.append("Smoke conditions can change rapidly due to fire behavior and weather. ")
        parts.append("Check for updates frequently and follow local emergency guidance.*\n\n")
        parts.append(f"**Generated:** {datetime.utcnow().strftime('%Y-%m-%d %H:%M UTC')}")

        return "".join(parts)
| class HRRRSmokeApp: | |
def __init__(self):
    """Instantiate the helper objects the app's methods delegate to."""
    # Fetches HRRR fields and generates synthetic demo data.
    self.smoke_manager = HRRRSmokeDataManager()
    # Builds narrative forecast text.
    self.narrative_generator = SmokeNarrativeGenerator()
    # Combines smoke parameters and extracts smoke polygons.
    self.polygon_generator = HMSSmokePolygonGenerator()
    self.folium_renderer = FoliumSmokeRenderer()  # Initialize folium renderer
def create_smoke_map(self, param_type, forecast_hour, detail_level=3, min_threshold=0.1,
                     show_polygons=False, demo_mode=False, map_type='plotly'):
    """Create smoke forecast map.

    Args:
        param_type: Parameter key, or ``'COMBINED'`` to merge several
            parameters through the polygon generator.
        forecast_hour: Forecast lead time in hours.
        detail_level: 1-5; mapped to a target grid cell count
            (unknown levels use 35000).
        min_threshold: Values below this are masked out of the contour grid.
        show_polygons: Overlay smoke polygons (only for MASSDEN, PM25, PMTF
            and HMS, and only when the polygon libraries are available).
        demo_mode: Skip real data and use synthetic fields.
        map_type: ``'plotly'`` (default) or ``'folium'``. The body always
            consulted this name but it was never defined, so the lookup
            raised NameError; it is now a keyword with a default, which keeps
            existing callers working.

    Returns:
        A Plotly figure. With ``map_type='folium'`` a folium map is returned
        instead when polygons were rendered, or an empty folium map when the
        contour plot could not be produced. Any unexpected error yields a
        Plotly figure carrying the error text.
    """
    combined_params = ['MASSDEN', 'PM25', 'VIS', 'COLMD', 'FRPAVG']

    def synthetic_info(product):
        # Run/valid timestamps for data that arrives without Herbie metadata.
        now = datetime.utcnow()
        return {
            'model_run': now.strftime('%Y-%m-%d %H:00 UTC'),
            'valid_time': (now + timedelta(hours=forecast_hour)).strftime('%Y-%m-%d %H:00 UTC'),
            'product': product,
        }

    def demo_combined_dataset():
        # Combined field built from synthetic parameters on the demo grid.
        fields = self.smoke_manager.generate_multiple_demo_parameters(
            list(combined_params), forecast_hour
        )
        combined = self.polygon_generator.combine_smoke_parameters(fields)
        return xr.Dataset(
            {'combined_smoke': (['lat', 'lon'], combined)},
            coords={'lat': np.linspace(25, 50, 50), 'lon': np.linspace(-125, -70, 60)},
        )

    try:
        # Get parameter info
        param_info = self.smoke_manager.smoke_parameters.get(param_type, {})
        param_name = param_info.get('name', param_type)
        colorscale = param_info.get('colorscale', 'OrRd')

        print(f"Fetching HRRR-Smoke {param_type} for +{forecast_hour}h")

        # Tracks whether the plotted field is synthetic so the title says so.
        uses_demo_data = False

        if param_type == 'COMBINED':
            print("Using NOAA Combined Smoke methodology")
            ds = None
            if demo_mode:
                print("Demo mode: generating multiple synthetic parameters")
            else:
                datasets = self.smoke_manager.fetch_multiple_smoke_parameters(
                    list(combined_params), 'surface', forecast_hour, fast_mode=True
                )
                if datasets:
                    combined_smoke = self.polygon_generator.combine_smoke_parameters(datasets)
                    # Evenly spaced axes spanning fixed lat/lon bounds, sized
                    # to the combined array.
                    print(f"Main forecast: Creating coordinates for HRRR grid: {combined_smoke.shape}")
                    lats = np.linspace(20.192, 52.863, combined_smoke.shape[0])
                    lons = np.linspace(-134.096, -60.917, combined_smoke.shape[1])
                    ds = xr.Dataset({
                        'combined_smoke': (['lat', 'lon'], combined_smoke)
                    }, coords={'lat': lats, 'lon': lons})
                else:
                    print("Failed to fetch multiple parameters, falling back to demo")

            if ds is None:
                ds = demo_combined_dataset()
                uses_demo_data = True
            info = synthetic_info('combined')
        else:
            # Single parameter mode
            if demo_mode:
                print("Demo mode enabled - using synthetic data")
                ds, info = None, None
            else:
                ds, info = self.smoke_manager.fetch_hrrr_smoke_data(
                    param_type, 'surface', forecast_hour, return_info=True, fast_mode=True
                )

            # If real data failed, use demo data
            if ds is None:
                print("Real HRRR data unavailable, using demo smoke data")
                ds = self.smoke_manager.generate_demo_smoke_data(param_type, forecast_hour)
                info = synthetic_info('demo')
                uses_demo_data = True

        data_source = "Demo Data" if uses_demo_data else "HRRR Model"
        title = (
            f"{data_source}: {param_name} Forecast (+{forecast_hour}h)"
            f"<br>{info['model_run']} → {info['valid_time']}"
        )

        fig = go.Figure()

        if ds is not None:
            # Process as grid for contour plotting
            detail_to_cells = {1: 15000, 2: 25000, 3: 35000, 4: 50000, 5: 75000}
            target_cells = detail_to_cells.get(int(detail_level), 35000)
            grid_data = self.smoke_manager.process_smoke_grid(
                ds, target_cells=target_cells, param_type='smoke', min_threshold=min_threshold
            )

            if grid_data is not None:
                lat2d = grid_data['lat2d']
                lon2d = grid_data['lon2d']
                z2d = grid_data['z2d']
                units = grid_data['units']
                print(f"Plotting smoke grid: {z2d.shape[0]}x{z2d.shape[1]} cells")

                # Generate HMS-style polygons if requested
                polygons = []
                if show_polygons and param_type in ['MASSDEN', 'PM25', 'PMTF', 'HMS'] and HMS_LIBS_AVAILABLE:
                    try:
                        polygons = self.polygon_generator.extract_smoke_polygons(
                            lat2d, lon2d, z2d,
                            min_area=0.02,  # larger min area for speed
                            is_hms_categorical=(param_type == 'HMS')
                        )
                        print(f"Generated {len(polygons)} HMS-style smoke polygons")
                    except Exception as e:
                        print(f"Polygon generation error: {e}")
                        polygons = []

                # One filled outline trace per polygon; a bad polygon is skipped.
                for i, poly_data in enumerate(polygons):
                    try:
                        coords = list(poly_data['geometry'].exterior.coords)
                        # '#rrggbb' -> rgba() with 30 % alpha for the fill.
                        hex_color = poly_data['color']
                        red, green, blue = (int(hex_color[k:k + 2], 16) for k in (1, 3, 5))
                        fig.add_trace(go.Scatter(
                            x=[c[0] for c in coords],
                            y=[c[1] for c in coords],
                            mode='lines',
                            line=dict(color=hex_color, width=3),
                            fill='toself',
                            fillcolor=f"rgba({red}, {green}, {blue}, 0.3)",
                            name=f"{poly_data['description']} ({poly_data['density_value']:.1f} µg/m³)",
                            hovertemplate=f"<b>{poly_data['description']}</b><br>" +
                            f"Density: {poly_data['density_value']:.1f} µg/m³<br>" +
                            f"Area: {poly_data['area_deg2']:.4f} deg²<extra></extra>"
                        ))
                    except Exception as e:
                        print(f"Error adding polygon {i}: {e}")
                        continue

                # Try contour plotting
                try:
                    opacity = 0.6 if polygons else 1.0  # let polygons show through
                    # 2-D z with 1-D axes taken from the first row / column.
                    if lat2d.ndim == 2 and lon2d.ndim == 2:
                        x_axis = lon2d[0, :]
                        y_axis = lat2d[:, 0]
                    else:
                        x_axis = lon2d
                        y_axis = lat2d

                    fig.add_trace(go.Contour(
                        x=x_axis,
                        y=y_axis,
                        z=z2d,
                        colorscale=colorscale,
                        contours=dict(coloring='heatmap'),
                        showscale=True,
                        opacity=opacity,
                        colorbar=dict(
                            title=f"{param_name} ({units})",
                            x=1.02,
                            len=0.8
                        ),
                        hovertemplate=f'{param_name}: %{{z:.1f}} {units}<extra></extra>',
                        name=f"HRRR-Smoke {param_name}"
                    ))
                    fig.update_layout(
                        xaxis_title="Longitude",
                        yaxis_title="Latitude",
                        title=title,
                        height=600
                    )

                    # Folium output needs polygons; otherwise (or if folium
                    # rendering yields nothing) hand back the Plotly figure.
                    if map_type == 'folium' and polygons:
                        folium_map = self.folium_renderer.create_folium_map(polygons)
                        if folium_map:
                            return folium_map
                    return fig
                except Exception as e:
                    print(f"Contour plot failed, trying scatter: {e}")

            # Fallback to scatter plot
            processed = self.smoke_manager.process_smoke_data(ds, max_points=2000, param_type='smoke')
            if processed is not None:
                # Units come from the processed points themselves, so this
                # branch also works when no grid could be produced.
                units = processed['units']
                fig.add_trace(go.Scattermapbox(
                    lat=processed['lats'],
                    lon=processed['lons'],
                    mode='markers',
                    marker=dict(
                        size=6,
                        color=processed['values'],
                        colorscale=colorscale,
                        showscale=True,
                        colorbar=dict(
                            title=f"{param_name} ({units})",
                            x=1.02,
                            len=0.8
                        ),
                        opacity=0.8
                    ),
                    text=[f"{v:.1f} {units}" for v in processed['values']],
                    hovertemplate='<b>%{text}</b><extra></extra>',
                    name=f"HRRR-Smoke {param_name}"
                ))
                fig.update_layout(
                    mapbox=dict(
                        style="open-street-map",
                        zoom=5,
                        center=dict(lat=39.5, lon=-98.5)
                    ),
                    height=600,
                    title=title,
                    margin=dict(l=0, r=100, t=80, b=0)
                )
            else:
                fig.add_annotation(
                    text=f"No {param_name} data above threshold ({min_threshold})",
                    x=0.5, y=0.5,
                    xref="paper", yref="paper",
                    showarrow=False,
                    font=dict(size=14)
                )
        else:
            fig.add_annotation(
                text="HRRR-Smoke data temporarily unavailable<br>Try different parameters or forecast hours",
                x=0.5, y=0.5,
                xref="paper", yref="paper",
                showarrow=False,
                font=dict(size=16)
            )

        # Reaching this point means no contour map was produced: folium mode
        # gets an empty base map, Plotly mode gets whatever is on the figure.
        if map_type == 'folium':
            return folium.Map(location=[39.5, -98.5], zoom_start=5)
        return fig

    except Exception as e:
        print(f"Smoke map creation error: {e}")
        # Return error figure
        fig = go.Figure()
        fig.add_annotation(
            text=f"Error: {str(e)[:100]}",
            x=0.5, y=0.5,
            xref="paper", yref="paper",
            showarrow=False
        )
        fig.update_layout(height=400, title="Error Loading Smoke Data")
        return fig
def get_smoke_forecast(self, latitude, longitude):
    """Get a multi-parameter smoke forecast time series for a location.

    For every key smoke parameter registered in
    ``self.smoke_manager.smoke_parameters`` and every forecast step, the
    surface field is fetched and sampled at the grid cell nearest to the
    requested point.  If the field exposes no usable ``latitude``/``longitude``
    (or ``lat``/``lon``) coordinates matching its 2-D shape, the domain-wide
    mean is used as a coarse fallback.

    Args:
        latitude: Latitude of the point of interest, in degrees.
        longitude: Longitude of the point of interest, in degrees.  Grid
            longitudes in either the -180..180 or the 0..360 convention are
            handled, because differences are wrapped before comparison.

    Returns:
        A list with one dict per parameter that produced data.  Each dict
        holds ``parameter``, ``name``, ``units`` and ``data`` (a list of
        ``{'step', 'value', 'datetime'}`` dicts).  Returns an empty list if
        forecast generation fails as a whole.
    """
    forecast_steps = [0, 3, 6, 9, 12, 18, 24, 30, 36, 42, 48]
    # Key smoke parameters for forecast
    key_params = ['MASSDEN', 'PM25', 'VIS', 'COLMD']

    def _sample_point(data_var):
        """Return the field value nearest the requested point (mean as fallback)."""
        values = np.squeeze(np.asarray(data_var.values, dtype=float))
        try:
            coords = data_var.coords
            lat_grid = lon_grid = None
            for lat_name, lon_name in (('latitude', 'longitude'), ('lat', 'lon')):
                if lat_name in coords and lon_name in coords:
                    lat_grid = np.asarray(coords[lat_name].values, dtype=float)
                    lon_grid = np.asarray(coords[lon_name].values, dtype=float)
                    break
            if lat_grid is not None and values.ndim == 2:
                if lat_grid.ndim == 1 and lon_grid.ndim == 1:
                    # Expand 1-D axes to the (lat, lon) layout of the field.
                    lon_grid, lat_grid = np.meshgrid(lon_grid, lat_grid)
                if lat_grid.shape == values.shape and lon_grid.shape == values.shape:
                    # Wrap the longitude difference into [-180, 180) so that
                    # 0..360 grids compare correctly with -180..180 input.
                    d_lon = (lon_grid - longitude + 180.0) % 360.0 - 180.0
                    # Shrink east-west distance by cos(lat) so that a degree
                    # of longitude is not over-weighted away from the equator.
                    d_lon = d_lon * np.cos(np.radians(latitude))
                    dist_sq = (lat_grid - latitude) ** 2 + d_lon ** 2
                    return float(values.flat[int(np.nanargmin(dist_sq))])
        except (AttributeError, KeyError, TypeError, ValueError):
            # Coordinates are missing or unusable; fall through to the
            # domain-mean approximation instead of dropping the step.
            pass
        return float(np.nanmean(values))

    try:
        forecast_data = []
        # One base time for the whole series so that the offsets between
        # entries are exactly the forecast steps.
        base_time = datetime.utcnow()
        for param in key_params:
            if param not in self.smoke_manager.smoke_parameters:
                continue
            param_data = []
            for step in forecast_steps:
                try:
                    ds = self.smoke_manager.fetch_hrrr_smoke_data(param, 'surface', step)
                    if ds is None:
                        continue
                    var_names = list(ds.data_vars)
                    if not var_names:
                        continue
                    value = _sample_point(ds[var_names[0]])
                    if not np.isnan(value):
                        param_data.append({
                            'step': step,
                            'value': value,
                            'datetime': base_time + timedelta(hours=step)
                        })
                except Exception as e:
                    # Best effort: one failed step must not abort the series.
                    print(f"Error getting {param} at step {step}: {e}")
                    continue
            if param_data:
                param_info = self.smoke_manager.smoke_parameters[param]
                forecast_data.append({
                    'parameter': param,
                    'name': param_info['name'],
                    'units': param_info['units'],
                    'data': param_data
                })
        return forecast_data
    except Exception as e:
        print(f"Forecast generation error: {e}")
        return []
def generate_kmz_download(self, param_type, forecast_hour):
    """Build a KMZ file of smoke polygons for download.

    Real HRRR data is requested first; when none is returned, synthetic demo
    data and matching run/valid timestamps are substituted.  Polygons are
    only produced for polygon-capable parameters and only when the optional
    HMS polygon libraries were imported.

    Args:
        param_type: Smoke parameter key (for example ``'MASSDEN'``).
        forecast_hour: Forecast lead time in hours.

    Returns:
        Path to the written KMZ file, or ``None`` when no file was produced
        (unsupported parameter, no grid, no polygons, or any error).
    """
    try:
        manager = self.smoke_manager

        # Get smoke data, substituting demo data when nothing came back
        ds, info = manager.fetch_hrrr_smoke_data(param_type, 'surface', forecast_hour, return_info=True)
        if ds is None:
            ds = manager.generate_demo_smoke_data(param_type, forecast_hour)
            run_time = datetime.utcnow()
            valid_time = run_time + timedelta(hours=forecast_hour)
            info = {
                'model_run': run_time.strftime('%Y-%m-%d %H:00 UTC'),
                'valid_time': valid_time.strftime('%Y-%m-%d %H:00 UTC'),
                'product': 'demo'
            }

        polygon_capable = ['MASSDEN', 'PM25', 'PMTF', 'COMBINED', 'HMS']
        if ds is None or param_type not in polygon_capable or not HMS_LIBS_AVAILABLE:
            return None

        # Reduced resolution and a higher threshold keep the cell count low
        grid_data = manager.process_smoke_grid(
            ds, target_cells=8000, param_type='smoke', min_threshold=1.0
        )
        if grid_data is None:
            return None

        # HMS data is categorical; a larger minimum area trims tiny polygons
        polygons = self.polygon_generator.extract_smoke_polygons(
            grid_data['lat2d'], grid_data['lon2d'], grid_data['z2d'],
            min_area=0.02,
            is_hms_categorical=(param_type == 'HMS')
        )
        if not polygons:
            return None

        # Create KMZ file
        output_path = os.path.join(manager.temp_dir, f'hrrr_smoke_{param_type}_f{forecast_hour:02d}.kmz')
        kmz_path = self.polygon_generator.create_kmz_file(
            polygons, info['valid_time'], info['model_run'], output_path
        )
        if kmz_path and os.path.exists(kmz_path):
            return kmz_path
        return None
    except Exception as e:
        print(f"KMZ generation error: {e}")
        return None
def quick_kmz_generation(self, forecast_hour, parameter="MASSDEN", demo_mode=False):
    """Ultra-fast KMZ generation for ALL AMERICA smoke plumes.

    Three data modes are supported:

    * ``'HMS'`` - a composite grid from ``hms_style_smoke_detection`` over
      VIS, MASSDEN and PM25.
    * ``'COMBINED'`` - a composite grid from ``combine_smoke_parameters``
      over several smoke parameters.
    * anything else - a single parameter fetched directly.

    In every mode synthetic demo data is used when ``demo_mode`` is set, when
    no real data could be fetched, or when the composite calculation on real
    data returns ``None``.  Composite grids are placed on a regular lat/lon
    grid spanning the HRRR CONUS bounding box, sized from the grid's own
    shape.

    Args:
        forecast_hour: Forecast lead time in hours.
        parameter: Smoke parameter key, ``'HMS'`` or ``'COMBINED'``.
        demo_mode: Skip real data and use synthetic data only.

    Returns:
        ``(status_message, kmz_path)``; ``kmz_path`` is ``None`` when no file
        was produced or an error occurred.
    """
    try:
        print(f"Quick KMZ generation for {parameter} at +{forecast_hour}h - FULL AMERICA DOMAIN")

        def _synthetic_info(product):
            """Run/valid timestamps for data that carries no model metadata."""
            current_time = datetime.utcnow()
            forecast_time = current_time + timedelta(hours=forecast_hour)
            return {
                'model_run': current_time.strftime('%Y-%m-%d %H:00 UTC'),
                'valid_time': forecast_time.strftime('%Y-%m-%d %H:00 UTC'),
                'product': product
            }

        def _grid_to_dataset(var_name, grid):
            """Wrap a 2-D grid in a Dataset spanning the HRRR CONUS box."""
            # Axis lengths come from the grid itself so that real-resolution
            # and demo grids are both handled without hard-coded sizes.
            lats = np.linspace(20.192, 52.863, grid.shape[0])  # From Mexico border to Canada
            lons = np.linspace(-134.096, -60.917, grid.shape[1])  # From Pacific to Atlantic
            return xr.Dataset(
                {var_name: (['lat', 'lon'], grid)},
                coords={'lat': lats, 'lon': lons}
            )

        def _composite_dataset(spec):
            """Build the composite dataset for HMS/COMBINED, with demo fallback."""
            grid = None
            if demo_mode:
                print(spec['demo_msg'])
            else:
                print(spec['fetch_msg'])
                datasets = self.smoke_manager.fetch_multiple_smoke_parameters(
                    spec['real_params'], 'surface', forecast_hour, fast_mode=True
                )
                if datasets:
                    print(f"Quick KMZ: Successfully fetched {len(datasets)} parameters: {list(datasets.keys())}")
                    grid = spec['combine'](datasets)
                    if grid is None:
                        print(spec['failed_msg'])
                else:
                    print("Quick KMZ: No real parameters fetched, using demo data")

            from_real_data = grid is not None
            if not from_real_data:
                demo_sets = self.smoke_manager.generate_multiple_demo_parameters(
                    spec['demo_params'], forecast_hour
                )
                grid = spec['combine'](demo_sets)
                if grid is None:
                    # Nothing usable even from demo data; the caller reports
                    # "no suitable data" instead of crashing.
                    return None
                if not demo_mode:
                    print(spec['fallback_msg'])

            if from_real_data:
                print(f"Creating coordinates for {spec['grid_label']} HRRR grid: {grid.shape}")
            ds = _grid_to_dataset(spec['var_name'], grid)
            if from_real_data:
                print(f"Quick KMZ: Created {spec['created_label']} dataset with shape {grid.shape}")
            return ds

        if parameter == 'HMS':
            print("Quick KMZ: Using HMS-style smoke detection methodology")
            ds = _composite_dataset({
                'var_name': 'hms_smoke',
                'combine': self.polygon_generator.hms_style_smoke_detection,
                'real_params': ['VIS', 'MASSDEN', 'PM25'],
                'demo_params': ['VIS', 'MASSDEN', 'PM25'],
                'demo_msg': "Quick KMZ Demo mode: generating HMS-style synthetic data",
                'fetch_msg': "Quick KMZ: Fetching HRRR parameters for HMS-style smoke detection",
                'failed_msg': "Quick KMZ: HMS-style detection failed, using demo data",
                'fallback_msg': "Quick KMZ: Using demo data for HMS mode",
                'grid_label': "HMS-style",
                'created_label': "HMS-style",
            })
            info = _synthetic_info('hms')
        elif parameter == 'COMBINED':
            print("Quick KMZ: Using NOAA Combined Smoke methodology")
            ds = _composite_dataset({
                'var_name': 'combined_smoke',
                'combine': self.polygon_generator.combine_smoke_parameters,
                'real_params': ['VIS', 'COLMD', 'MASSDEN', 'PM25', 'FRPAVG'],
                'demo_params': ['MASSDEN', 'PM25', 'VIS', 'COLMD', 'FRPAVG'],
                'demo_msg': "Quick KMZ Demo mode: generating multiple synthetic parameters",
                'fetch_msg': "Quick KMZ: Fetching multiple HRRR parameters for COMBINED mode",
                'failed_msg': "Quick KMZ: Combined smoke calculation failed, using demo data",
                'fallback_msg': "Quick KMZ: Using demo data for COMBINED mode",
                'grid_label': "FULL AMERICA",
                'created_label': "combined",
            })
            info = _synthetic_info('combined')
        else:
            # Single parameter mode
            if demo_mode:
                print("Quick KMZ Demo mode enabled - using synthetic data")
                ds = None
                info = None
            else:
                ds, info = self.smoke_manager.fetch_hrrr_smoke_data(
                    parameter, 'surface', forecast_hour, return_info=True, fast_mode=True
                )
            if ds is None:
                ds = self.smoke_manager.generate_demo_smoke_data(parameter, forecast_hour)
                info = _synthetic_info('demo')

        no_data_result = ("KMZ generation failed - no suitable data found", None)
        polygon_capable = ['MASSDEN', 'PM25', 'PMTF', 'COMBINED', 'HMS']
        if ds is None or parameter not in polygon_capable or not HMS_LIBS_AVAILABLE:
            return no_data_result

        # Zero threshold so that every smoke cell in the domain is kept
        grid_data = self.smoke_manager.process_smoke_grid(
            ds, target_cells=6000, param_type='smoke', min_threshold=0.0
        )
        if grid_data is None:
            return no_data_result

        # HMS grids are categorical; a small minimum area keeps
        # continental-scale coverage
        polygons = self.polygon_generator.extract_smoke_polygons(
            grid_data['lat2d'], grid_data['lon2d'], grid_data['z2d'],
            min_area=0.005,
            is_hms_categorical=(parameter == 'HMS')
        )
        if not polygons:
            return no_data_result

        # Create KMZ file
        output_path = os.path.join(self.smoke_manager.temp_dir, f'quick_smoke_{parameter}_f{forecast_hour:02d}.kmz')
        kmz_path = self.polygon_generator.create_kmz_file(
            polygons, info['valid_time'], info['model_run'], output_path
        )
        if not (kmz_path and os.path.exists(kmz_path)):
            return no_data_result

        if parameter == 'HMS':
            status = f"🛰️ **HMS-Style Smoke Forecast Generated!**\n\n**Polygons:** {len(polygons)}\n**Method:** Visibility-based detection\n**Forecast:** +{forecast_hour}h\n**Coverage:** Full CONUS (20°-53°N, 134°-61°W)\n\nMimics NOAA HMS analyst procedures using HRRR forecast data."
        else:
            status = f"⚡ **ALL AMERICA Smoke KMZ Generated!**\n\n**Polygons:** {len(polygons)}\n**Parameter:** {parameter}\n**Forecast:** +{forecast_hour}h\n**Coverage:** Full CONUS (20°-53°N, 134°-61°W)\n\nOptimized for continental-scale smoke detection."
        return status, kmz_path
    except Exception as e:
        print(f"Quick KMZ generation error: {e}")
        return f"Quick KMZ generation error: {e}", None
def update_smoke_display(self, location, forecast_hour, parameter, detail_level, min_threshold, show_polygons=False, map_type='plotly', demo_mode=False):
    """Update smoke forecast display.

    Builds the smoke map, a Markdown status block, a narrative forecast for
    the location (when it parses as ``'latitude, longitude'``) and, when
    polygons are requested for a polygon-capable parameter, a KMZ file plus
    a Folium map rendered to HTML.

    Args:
        location: Free-text location; a narrative forecast is produced only
            when it parses as two comma-separated numbers.
        forecast_hour: Forecast lead time in hours.
        parameter: Smoke parameter key.
        detail_level: Detail level forwarded to ``create_smoke_map``.
        min_threshold: Minimum value used to filter the data.
        show_polygons: Also generate polygons, the KMZ file and Folium map.
        map_type: Requested map style.
        demo_mode: Use synthetic data.

    Returns:
        ``(status, smoke_map, narrative, kmz_path, folium_html)``.  On
        failure the same 5-tuple shape is returned with an error figure and
        error texts.
    """
    coordinate_hint = "Enter coordinates as 'latitude, longitude' for detailed forecast"

    def _parse_lat_lon(text):
        """Return ``(lat, lon)`` parsed from 'lat, lon' text, or ``None``."""
        if ',' not in text:
            return None
        parts = text.split(',')
        try:
            return float(parts[0].strip()), float(parts[1].strip())
        except ValueError:
            # Not numeric (for example "Denver, CO"): treat as not-coordinates.
            return None

    try:
        gc.collect()
        print(f"\n=== SMOKE UPDATE: {location}, +{forecast_hour}h, {parameter} ===")
        # Create smoke map
        # NOTE(review): map_type is accepted here but not forwarded, and
        # demo_mode is passed as the sixth positional argument - confirm this
        # matches create_smoke_map's parameter order.
        smoke_map = self.create_smoke_map(parameter, forecast_hour, detail_level, min_threshold, show_polygons, demo_mode)
        # Generate status
        current_time = datetime.utcnow()
        forecast_time = current_time + timedelta(hours=forecast_hour)
        param_info = self.smoke_manager.smoke_parameters.get(parameter, {})
        param_name = param_info.get('name', parameter)
        status = f"""
## 🌬️ HRRR-Smoke Forecast
**Location:** {location}
**Current:** {current_time.strftime('%H:%M UTC')}
**Forecast:** {forecast_time.strftime('%H:%M UTC')} (+{forecast_hour}h)
**Parameter:** {param_name}
**Detail Level:** {detail_level} (1=Fast, 5=Max Detail)
**Min Threshold:** {min_threshold}
**Data Source:** {"NOAA HRRR-Smoke Model" if HERBIE_AVAILABLE else "HRRR-Smoke Unavailable"}
**About HRRR-Smoke:** Real-time smoke forecasting using the NOAA High-Resolution Rapid Refresh
model with smoke tracking. Provides detailed predictions of smoke transport, dispersion, and
air quality impacts from wildfires and prescribed burns.
"""
        # Try to generate narrative forecast for the location.  Only a
        # parsing failure yields the "enter coordinates" hint; failures in
        # the forecast or narrative generation are reported as errors.
        try:
            coords = _parse_lat_lon(location)
            if coords is None:
                narrative = coordinate_hint
            else:
                forecast_data = self.get_smoke_forecast(coords[0], coords[1])
                narrative = self.narrative_generator.generate_smoke_forecast(forecast_data, location)
        except Exception as e:
            narrative = f"Narrative forecast error: {e}"

        # Generate KMZ download and Folium map if polygons are requested
        kmz_path = None
        folium_html = "<p>No map data available</p>"
        if show_polygons and parameter in ['MASSDEN', 'PM25', 'PMTF', 'HMS'] and HMS_LIBS_AVAILABLE:
            kmz_path = self.generate_kmz_download(parameter, forecast_hour)
            # Generate Folium map with polygons
            try:
                folium_html = self._polygon_map_html(parameter, forecast_hour, min_threshold, demo_mode) \
                    if hasattr(self, '_polygon_map_html') else None
                if folium_html is None:
                    # Get the same data used for KMZ
                    ds, _ = self.smoke_manager.fetch_hrrr_smoke_data(parameter, 'surface', forecast_hour, return_info=True, fast_mode=demo_mode)
                    if ds is None:
                        ds = self.smoke_manager.generate_demo_smoke_data(parameter, forecast_hour)
                    grid_data = None
                    polygons = None
                    if ds is not None:
                        # Process data to get polygons
                        grid_data = self.smoke_manager.process_smoke_grid(
                            ds, target_cells=8000, param_type='smoke', min_threshold=min_threshold
                        )
                    if grid_data is not None:
                        # HMS data is categorical
                        polygons = self.polygon_generator.extract_smoke_polygons(
                            grid_data['lat2d'], grid_data['lon2d'], grid_data['z2d'],
                            min_area=0.02,
                            is_hms_categorical=(parameter == 'HMS')
                        )

                    if ds is None:
                        folium_html = "<p>No smoke data available</p>"
                    elif grid_data is None:
                        folium_html = "<p>Unable to process smoke data</p>"
                    elif not polygons:
                        folium_html = "<p>No smoke polygons detected for current threshold</p>"
                    else:
                        # Create Folium map centred on the processed grid
                        center_lat = np.mean(grid_data['lat2d'])
                        center_lon = np.mean(grid_data['lon2d'])
                        folium_map = self.folium_renderer.create_folium_map(
                            polygons, center_lat, center_lon, zoom_start=6
                        )
                        if folium_map:
                            folium_html = folium_map._repr_html_()
                        else:
                            folium_html = "<p>Folium map generation failed</p>"
            except Exception as e:
                print(f"Folium map generation error: {e}")
                folium_html = f"<p>Folium map error: {e}</p>"
        return status, smoke_map, narrative, kmz_path, folium_html
    except Exception as e:
        print(f"Smoke update error: {e}")
        gc.collect()
        error_fig = go.Figure()
        error_fig.add_annotation(text=f"Update failed: {str(e)}", x=0.5, y=0.5, xref="paper", yref="paper", showarrow=False)
        error_fig.update_layout(height=300)
        return f"## Error\n{str(e)}", error_fig, "Narrative forecast unavailable due to error", None, "<p>Error loading map</p>"
# Initialize the application
smoke_app = HRRRSmokeApp()

# Gradio interface.
# NOTE(review): the nesting of layout containers below is reconstructed from
# the statement order; confirm it against the intended page layout.
with gr.Blocks(title="HRRR Smoke Forecast", theme=gr.themes.Soft()) as app:
    gr.HTML("""
<div style="text-align: center; background: linear-gradient(135deg, #ff7e5f, #feb47b);
color: white; padding: 2rem; border-radius: 15px; margin-bottom: 1rem; box-shadow: 0 4px 6px rgba(0,0,0,0.1);">
<h1>🌬️ HRRR-Smoke Forecast System</h1>
<p style="font-size: 1.1em; margin-top: 0.5rem;">Real-time wildfire smoke forecasting using NOAA HRRR-Smoke model data</p>
<p style="font-size: 0.9em; opacity: 0.9;">Professional-grade smoke transport and air quality predictions</p>
</div>
""")
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### 📍 Forecast Configuration")
            location = gr.Textbox(
                value="39.7392, -104.9903",  # Denver coordinates
                label="Location (lat, lon or city)",
                placeholder="Enter coordinates as '39.7392, -104.9903' or city name"
            )
            with gr.Row():
                forecast_hour = gr.Slider(
                    minimum=0, maximum=48, value=12, step=3,
                    label="Forecast Hours",
                    info="Hours ahead to forecast"
                )
                detail_level = gr.Slider(
                    minimum=1, maximum=5, value=3, step=1,
                    label="Detail Level",
                    info="Higher = more detail (slower loading)"
                )
            parameter = gr.Dropdown(
                choices=[
                    ("🛰️ HMS-Style Smoke Detection", "HMS"),
                    ("🔥 Combined Smoke (NOAA Best Practices)", "COMBINED"),
                    ("Smoke Mass Density", "MASSDEN"),
                    ("PM2.5 Concentration", "PM25"),
                    ("Column Smoke Density", "COLMD"),
                    ("Visibility", "VIS"),
                    ("Fire Radiative Power", "FRPAVG"),
                    ("Temperature", "TMP"),
                    ("Wind Speed", "WIND"),
                    ("Relative Humidity", "RH")
                ],
                value="HMS",
                label="Smoke Parameter",
                info="Combined mode uses multiple HRRR parameters with NOAA methodology"
            )
            min_threshold = gr.Slider(
                minimum=0.0, maximum=10.0, value=0.5, step=0.1,
                label="Minimum Threshold",
                info="Filter values below this threshold"
            )
            show_polygons = gr.Checkbox(
                value=True,  # Default to True for faster KMZ generation
                label="Show HMS-style Polygons + KMZ Download",
                info="Generate NOAA HMS-style smoke plume polygons and enable KMZ download"
            )
            demo_mode = gr.Checkbox(
                value=False,
                label="🚀 Demo Mode (Ultra-Fast)",
                info="Use synthetic data for instant results when HRRR data is slow/unavailable"
            )
            map_type = gr.Radio(
                choices=[
                    ("Interactive (Plotly)", "plotly"),
                    ("Leaflet/Grayscale (Folium)", "folium")
                ],
                value="plotly",
                label="Map Type",
                info="Choose visualization style"
            )
            with gr.Row():
                update_btn = gr.Button("🔄 Get Smoke Forecast", variant="primary", size="lg")
                quick_kmz_btn = gr.Button("⚡ Quick KMZ", variant="secondary", size="lg")
            gr.HTML("""
<div style="background: linear-gradient(135deg, #667eea, #764ba2); color: white; padding: 1.5rem; border-radius: 10px; margin-top: 1rem;">
<h4>🎯 HRRR-Smoke Features:</h4>
<ul style="font-size: 0.9em; margin: 0.5rem 0; list-style-type: none; padding-left: 0;">
<li>🔥 <strong>Real wildfire tracking:</strong> Live fire detection and emissions</li>
<li>🌪️ <strong>Smoke transport:</strong> 3D atmospheric dispersion modeling</li>
<li>🏥 <strong>Air quality impacts:</strong> PM2.5 and health assessments</li>
<li>👁️ <strong>Visibility forecasts:</strong> Aviation and travel safety</li>
<li>⏰ <strong>48-hour range:</strong> Extended smoke forecasts</li>
<li>🗺️ <strong>High resolution:</strong> 3km grid spacing</li>
<li>📐 <strong>HMS-style polygons:</strong> NOAA-compatible plume boundaries</li>
<li>📦 <strong>KMZ export:</strong> Google Earth compatible files</li>
<li>🌍 <strong>Leaflet maps:</strong> Grayscale smoke visualization</li>
<li>🎨 <strong>Multiple views:</strong> Interactive and satellite-ready styles</li>
</ul>
<p style="font-size: 0.8em; margin-top: 1rem; opacity: 0.9;">
<strong>Model:</strong> NOAA HRRR-Smoke provides operational smoke forecasts
used by air quality agencies and emergency management.
</p>
</div>
""")
        with gr.Column(scale=2):
            status_text = gr.Markdown("## Click 'Get Smoke Forecast' to load HRRR-Smoke data")
            with gr.Tab("Plotly Map"):
                smoke_map = gr.Plot(label="Interactive Smoke Forecast Map")
            with gr.Tab("Leaflet Map (KMZ)"):
                folium_map = gr.HTML(label="Leaflet Map with KMZ Display", value="<p>Click 'Get Smoke Forecast' to load map</p>")
            with gr.Row():
                kmz_download = gr.File(
                    label="Download KMZ File (Google Earth Compatible)",
                    visible=True,  # Make visible by default to show quick generation works
                    interactive=False
                )
    with gr.Row():
        with gr.Column():
            narrative_forecast = gr.Markdown(
                label="Detailed Smoke Forecast",
                value="## Detailed smoke forecast will appear here after selecting a location and updating forecast."
            )

    def _kmz_file_update(kmz_path):
        """Show the download widget only when a KMZ file actually exists."""
        if kmz_path is not None:
            return gr.update(value=kmz_path, visible=True)
        return gr.update(visible=False)

    def update_display_wrapper(*args):
        """Wrapper to handle KMZ file display and Folium map.

        Accepts both result shapes of ``update_smoke_display`` and always
        returns five values: status, plot, narrative, file update, map HTML.
        """
        result = smoke_app.update_smoke_display(*args)
        if len(result) == 5:
            # New format: status, plot, narrative, kmz_path, folium_html
            status, plot, narrative, kmz_path, folium_html = result
        elif len(result) == 4:
            # Old format: status, plot, narrative, kmz_path
            status, plot, narrative, kmz_path = result
            folium_html = "<p>Folium map not available</p>"
        else:
            # Fallback
            status, plot, narrative = result[0], result[1], result[2]
            kmz_path = None
            folium_html = "<p>Error loading map</p>"
        return status, plot, narrative, _kmz_file_update(kmz_path), folium_html

    def _quick_preview_dataset(forecast_hour, parameter, demo_mode):
        """Build the dataset that the Quick KMZ Leaflet preview is drawn from.

        Composite modes (HMS / COMBINED) are placed on the same HRRR CONUS
        bounding box that the KMZ uses, so preview and KMZ line up.  Demo
        data is used when requested, when nothing could be fetched, or when
        the composite calculation on real data returns ``None``.
        """
        manager = smoke_app.smoke_manager
        generator = smoke_app.polygon_generator

        if parameter == 'HMS':
            var_name = 'hms_smoke'
            combine = generator.hms_style_smoke_detection
            param_list = ['VIS', 'MASSDEN', 'PM25']
        elif parameter == 'COMBINED':
            var_name = 'combined_smoke'
            combine = generator.combine_smoke_parameters
            param_list = ['MASSDEN', 'PM25', 'VIS', 'COLMD', 'FRPAVG']
        else:
            # Single parameter mode
            ds = None
            if not demo_mode:
                ds, _ = manager.fetch_hrrr_smoke_data(parameter, 'surface', forecast_hour, return_info=True, fast_mode=True)
            if ds is None:
                ds = manager.generate_demo_smoke_data(parameter, forecast_hour)
            return ds

        grid = None
        if not demo_mode:
            datasets = manager.fetch_multiple_smoke_parameters(
                param_list, 'surface', forecast_hour, fast_mode=True
            )
            if datasets:
                grid = combine(datasets)
        if grid is None:
            grid = combine(manager.generate_multiple_demo_parameters(param_list, forecast_hour))
        if grid is None:
            return None

        # Full HRRR CONUS domain, sized from the grid itself
        lats = np.linspace(20.192, 52.863, grid.shape[0])
        lons = np.linspace(-134.096, -60.917, grid.shape[1])
        return xr.Dataset(
            {var_name: (['lat', 'lon'], grid)},
            coords={'lat': lats, 'lon': lons}
        )

    def quick_kmz_wrapper(forecast_hour, parameter, demo_mode):
        """Wrapper for quick KMZ generation with Folium map display."""
        status, kmz_path = smoke_app.quick_kmz_generation(forecast_hour, parameter, demo_mode)
        # Generate Folium map for the same data
        folium_html = "<p>No map data available</p>"
        try:
            ds = _quick_preview_dataset(forecast_hour, parameter, demo_mode)
            polygon_capable = ['MASSDEN', 'PM25', 'PMTF', 'COMBINED', 'HMS']
            if ds is None or parameter not in polygon_capable or not HMS_LIBS_AVAILABLE:
                folium_html = "<p>Quick KMZ: Parameter not supported for polygon generation</p>"
            else:
                # Zero threshold to show all smoke
                grid_data = smoke_app.smoke_manager.process_smoke_grid(
                    ds, target_cells=4000, param_type='smoke', min_threshold=0.0
                )
                if grid_data is None:
                    folium_html = "<p>Quick KMZ: Unable to process smoke data</p>"
                else:
                    # HMS data is categorical
                    polygons = smoke_app.polygon_generator.extract_smoke_polygons(
                        grid_data['lat2d'], grid_data['lon2d'], grid_data['z2d'],
                        min_area=0.01,
                        is_hms_categorical=(parameter == 'HMS')
                    )
                    if not polygons:
                        folium_html = "<p>Quick KMZ: No smoke polygons detected</p>"
                    else:
                        # Create Folium map centred on the processed grid
                        center_lat = np.mean(grid_data['lat2d'])
                        center_lon = np.mean(grid_data['lon2d'])
                        folium_preview = smoke_app.folium_renderer.create_folium_map(
                            polygons, center_lat, center_lon, zoom_start=6
                        )
                        if folium_preview:
                            folium_html = folium_preview._repr_html_()
                        else:
                            folium_html = "<p>Quick KMZ: Folium map generation failed</p>"
        except Exception as e:
            print(f"Quick KMZ Folium map generation error: {e}")
            folium_html = f"<p>Quick KMZ map error: {e}</p>"
        return status, _kmz_file_update(kmz_path), folium_html

    # Event handlers.  Every trigger that refreshes the display shares one
    # input list and one output list, so demo_mode is always honoured and the
    # Leaflet tab is refreshed together with the other outputs.
    display_inputs = [location, forecast_hour, parameter, detail_level, min_threshold, show_polygons, map_type, demo_mode]
    display_outputs = [status_text, smoke_map, narrative_forecast, kmz_download, folium_map]

    update_btn.click(
        fn=update_display_wrapper,
        inputs=display_inputs,
        outputs=display_outputs
    )
    # Quick KMZ generation
    quick_kmz_btn.click(
        fn=quick_kmz_wrapper,
        inputs=[forecast_hour, parameter, demo_mode],
        outputs=[status_text, kmz_download, folium_map]
    )
    # Auto-update when the parameter, polygon option or map type changes
    for auto_trigger in (parameter, show_polygons, map_type):
        auto_trigger.change(
            fn=update_display_wrapper,
            inputs=display_inputs,
            outputs=display_outputs
        )
    gr.HTML("""
<div style="text-align: center; padding: 1rem; margin-top: 2rem; border-top: 1px solid #eee; color: #666;">
<p><strong>Data Source:</strong> NOAA HRRR-Smoke Model | <strong>Update Frequency:</strong> Hourly | <strong>Resolution:</strong> 3km</p>
<p style="font-size: 0.8em;">
HRRR-Smoke forecasts are experimental and should be used in conjunction with official air quality and emergency guidance.
</p>
</div>
""")

if __name__ == "__main__":
    app.launch(server_name="0.0.0.0", server_port=7860)