hrrr_smoke / app.py
nakas's picture
Fix smoke plume artifacts: realistic HMS thresholds, light smoothing/morphology for polygons, proper contour grid axes, and legend sync
b1d5591
#!/usr/bin/env python3
"""
HRRR Smoke Forecast Application
Real-time smoke plume forecasting using NOAA HRRR-Smoke model data
Based on successful patterns from ECMWF and HrrrVizCodex applications
"""
import gradio as gr
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import xarray as xr
import warnings
import gc
import sys
import math
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, List, Tuple
import folium
import requests
import tempfile
import os
import time
import zipfile
import xml.etree.ElementTree as ET
from xml.dom import minidom
warnings.filterwarnings('ignore')
# Import weather libraries for REAL HRRR-Smoke data.
# Each optional dependency is probed in its own try/except so the app can
# degrade gracefully: the *_AVAILABLE flags are checked before any use.
try:
    from herbie import Herbie
    import cfgrib
    HERBIE_AVAILABLE = True
    print("HERBIE AVAILABLE - Will use real HRRR-Smoke data")
except ImportError as e:
    HERBIE_AVAILABLE = False
    print(f"HERBIE NOT AVAILABLE: {e}")
# Import optional libraries for HMS polygon generation
# (shapely/geopandas/scipy/skimage are all required together for polygons).
try:
    from shapely.geometry import Polygon, MultiPolygon
    from shapely.ops import unary_union
    import geopandas as gpd
    from scipy.ndimage import label, binary_dilation
    from skimage.measure import find_contours
    HMS_LIBS_AVAILABLE = True
    print("HMS POLYGON LIBRARIES AVAILABLE")
except ImportError as e:
    HMS_LIBS_AVAILABLE = False
    print(f"HMS POLYGON LIBRARIES NOT AVAILABLE: {e}")
# Import folium for leaflet maps.
# NOTE(review): folium is also imported unconditionally near the top of the
# file; this guarded re-import exists only to set FOLIUM_AVAILABLE.
try:
    import folium
    from folium.plugins import HeatMap
    FOLIUM_AVAILABLE = True
    print("FOLIUM AVAILABLE")
except ImportError as e:
    FOLIUM_AVAILABLE = False
    print(f"FOLIUM NOT AVAILABLE: {e}")
class FoliumSmokeRenderer:
    """Render smoke plumes on folium/leaflet maps with grayscale styling.

    Two rendering modes are provided:
      * create_folium_map — discrete shapely polygons drawn as leaflet
        polygons, styled by density category.
      * create_gradient_smoke_map — a continuous HeatMap overlay sampled
        from the gridded smoke field.
    All public methods return None when folium is unavailable or when
    rendering raises.
    """
    def __init__(self):
        # Grayscale smoke styling keyed by density category.  The keys
        # ('light'/'medium'/'heavy') must match the 'density_category'
        # values attached to polygons elsewhere in this file.
        self.grayscale_styles = {
            'light': {
                'fillColor': '#E8E8E8',  # Light gray
                'color': '#CCCCCC',  # Border
                'weight': 1,
                'fillOpacity': 0.4,
                'opacity': 0.6
            },
            'medium': {
                'fillColor': '#AAAAAA',  # Medium gray
                'color': '#888888',  # Border
                'weight': 2,
                'fillOpacity': 0.6,
                'opacity': 0.8
            },
            'heavy': {
                'fillColor': '#666666',  # Dark gray
                'color': '#444444',  # Border
                'weight': 2,
                'fillOpacity': 0.8,
                'opacity': 1.0
            }
        }
    def create_folium_map(self, polygons, center_lat=39.5, center_lon=-98.5, zoom_start=5):
        """Create folium map with grayscale smoke polygons.

        Args:
            polygons: list of dicts, each with keys 'geometry' (shapely
                Polygon), 'density_category', 'density_value',
                'description' and 'area_deg2'.
            center_lat, center_lon: initial map center (defaults roughly
                center the continental US).
            zoom_start: initial leaflet zoom level.

        Returns:
            A folium.Map with base layers, polygons, layer control and a
            legend — or None if folium is missing or map creation fails.
        """
        if not FOLIUM_AVAILABLE:
            print("Folium not available")
            return None
        try:
            # Create base map with satellite imagery
            m = folium.Map(
                location=[center_lat, center_lon],
                zoom_start=zoom_start,
                tiles='OpenStreetMap'  # Start with OSM, add satellite option
            )
            # Add satellite imagery option (Esri World Imagery tiles)
            folium.TileLayer(
                tiles='https://server.arcgisonline.com/ArcGIS/rest/services/World_Imagery/MapServer/tile/{z}/{y}/{x}',
                attr='Esri',
                name='Satellite',
                overlay=False,
                control=True
            ).add_to(m)
            # Add CartoDB Positron for clean background
            folium.TileLayer(
                tiles='CartoDB positron',
                name='Clean',
                overlay=False,
                control=True
            ).add_to(m)
            # Add smoke polygons with grayscale styling
            if polygons:
                for i, poly_data in enumerate(polygons):
                    try:
                        # Get polygon exterior ring coordinates (shapely order: (lon, lat))
                        coords = list(poly_data['geometry'].exterior.coords)
                        # Convert to lat/lon pairs for folium (note: folium expects [lat, lon])
                        folium_coords = [[lat, lon] for lon, lat in coords]
                        # Get styling for density category; unknown categories fall back to 'medium'
                        style = self.grayscale_styles.get(
                            poly_data['density_category'],
                            self.grayscale_styles['medium']
                        )
                        # Create popup with smoke information
                        popup_html = f"""
                        <div style="font-family: Arial, sans-serif; width: 200px;">
                            <h4 style="margin: 0; color: #333;">🌬️ Smoke Plume</h4>
                            <hr style="margin: 5px 0;">
                            <p style="margin: 3px 0;"><b>Density:</b> {poly_data['description']}</p>
                            <p style="margin: 3px 0;"><b>Concentration:</b> {poly_data['density_value']:.1f} µg/m³</p>
                            <p style="margin: 3px 0;"><b>Area:</b> {poly_data['area_deg2']:.4f} deg²</p>
                        </div>
                        """
                        # Add polygon to map
                        folium.Polygon(
                            locations=folium_coords,
                            popup=folium.Popup(popup_html, max_width=300),
                            tooltip=f"{poly_data['description']}: {poly_data['density_value']:.1f} µg/m³",
                            **style
                        ).add_to(m)
                    except Exception as e:
                        # A single malformed polygon must not abort the whole map
                        print(f"Error adding polygon {i} to folium map: {e}")
                        continue
            # Add layer control
            folium.LayerControl().add_to(m)
            # Add legend
            self._add_smoke_legend(m)
            return m
        except Exception as e:
            print(f"Folium map creation error: {e}")
            return None
    def _add_smoke_legend(self, folium_map):
        """Add a fixed-position HTML smoke-density legend to the map.

        The legend ranges shown here mirror the density_categories
        thresholds used when classifying smoke (3/15/35 µg/m³).
        """
        legend_html = '''
        <div style="position: fixed;
        top: 10px; right: 10px; width: 150px; height: 120px;
        background-color: white; border:2px solid grey; z-index:9999;
        font-size:14px; padding: 10px; box-shadow: 2px 2px 6px rgba(0,0,0,0.3);
        ">
        <h4 style="margin: 0 0 10px 0; color: #333;">🌬️ Smoke Density</h4>
        <div style="margin: 5px 0;">
        <span style="display: inline-block; width: 20px; height: 15px;
        background-color: #E8E8E8; border: 1px solid #CCC; margin-right: 5px;"></span>
        <span style="font-size: 12px;">Light (3–15 µg/m³)</span>
        </div>
        <div style="margin: 5px 0;">
        <span style="display: inline-block; width: 20px; height: 15px;
        background-color: #AAAAAA; border: 1px solid #888; margin-right: 5px;"></span>
        <span style="font-size: 12px;">Medium (15-35 µg/m³)</span>
        </div>
        <div style="margin: 5px 0;">
        <span style="display: inline-block; width: 20px; height: 15px;
        background-color: #666666; border: 1px solid #444; margin-right: 5px;"></span>
        <span style="font-size: 12px;">Heavy (35+ µg/m³)</span>
        </div>
        </div>
        '''
        folium_map.get_root().html.add_child(folium.Element(legend_html))
    def create_gradient_smoke_map(self, lat2d, lon2d, smoke_values, center_lat=39.5, center_lon=-98.5, zoom_start=5):
        """Create folium map with gradient-based smoke visualization.

        Args:
            lat2d, lon2d: latitude/longitude arrays; may be 2-D grids or
                1-D axis vectors (the ndim check below handles both).
            smoke_values: 2-D array of smoke values aligned with the grid.
            center_lat, center_lon, zoom_start: initial map view.

        Returns:
            folium.Map with a grayscale HeatMap overlay, or None on failure.
        """
        if not FOLIUM_AVAILABLE:
            print("Folium not available")
            return None
        try:
            # Create base map
            m = folium.Map(
                location=[center_lat, center_lon],
                zoom_start=zoom_start,
                tiles='CartoDB positron'
            )
            # Add satellite imagery option
            folium.TileLayer(
                tiles='https://server.arcgisonline.com/ArcGIS/rest/services/World_Imagery/MapServer/tile/{z}/{y}/{x}',
                attr='Esri',
                name='Satellite',
                overlay=False,
                control=True
            ).add_to(m)
            # Create heat map data from smoke values
            heat_data = []
            # Sample the grid to create heat points
            ny, nx = smoke_values.shape
            step = max(1, min(ny, nx) // 50)  # Limit to ~50x50 points for performance
            for i in range(0, ny, step):
                for j in range(0, nx, step):
                    # Skip NaN cells and near-zero smoke (< 0.5) to keep the overlay sparse
                    if not np.isnan(smoke_values[i, j]) and smoke_values[i, j] > 0.5:
                        lat = lat2d[i, j] if lat2d.ndim == 2 else lat2d[i]
                        lon = lon2d[i, j] if lon2d.ndim == 2 else lon2d[j]
                        if not (np.isnan(lat) or np.isnan(lon)):
                            # Normalize smoke value for heat intensity (saturates at 100)
                            intensity = min(1.0, smoke_values[i, j] / 100.0)
                            heat_data.append([lat, lon, intensity])
            # Add heat map if we have data
            if heat_data and FOLIUM_AVAILABLE:
                HeatMap(
                    heat_data,
                    min_opacity=0.2,
                    max_zoom=18,
                    radius=15,
                    blur=10,
                    # Grayscale gradient: transparent -> light -> dark gray
                    gradient={
                        0.0: 'rgba(0,0,0,0)',  # Transparent
                        0.2: 'rgba(128,128,128,0.3)',  # Light gray
                        0.5: 'rgba(96,96,96,0.6)',  # Medium gray
                        1.0: 'rgba(64,64,64,0.9)'  # Dark gray
                    }
                ).add_to(m)
            # Add layer control
            folium.LayerControl().add_to(m)
            return m
        except Exception as e:
            print(f"Gradient smoke map creation error: {e}")
            return None
class HMSSmokePolygonGenerator:
    """Generate HMS-style smoke plume polygons from HRRR-Smoke data.

    Follows NOAA's Hazard Mapping System (HMS) three-category scheme
    (light/medium/heavy).  Concentration thresholds are approximate
    µg/m³ values intended for visualization, not health guidance.
    Polygon extraction depends on the optional shapely/scipy/skimage
    stack (HMS_LIBS_AVAILABLE).
    """
    def __init__(self):
        # HMS-style category thresholds expressed in µg/m³ (approximate, for visualization)
        # These align with the legend and produce HMS-like plumes
        self.density_categories = {
            'light': {'min': 3, 'max': 15, 'color': '#FFFF88', 'description': 'Light smoke (3–15 µg/m³)'},
            'medium': {'min': 15, 'max': 35, 'color': '#FFB366', 'description': 'Medium smoke (15–35 µg/m³)'},
            'heavy': {'min': 35, 'max': 9999, 'color': '#FF6666', 'description': 'Heavy smoke (35+ µg/m³)'}
        }
        # HMS visibility-based smoke detection thresholds (primary detection method)
        # Based on operational HMS procedures using GOES visible imagery.
        # NOTE(review): visibility_to_smoke_concentration below uses different
        # breakpoints (3/6/10 km) than these (4/8/15 km) — confirm which set
        # is authoritative.
        self.visibility_thresholds = {
            'light': {'max_vis': 15000, 'min_vis': 8000},  # 8-15km visibility = light smoke
            'medium': {'max_vis': 8000, 'min_vis': 4000},  # 4-8km visibility = medium smoke
            'heavy': {'max_vis': 4000, 'min_vis': 0}  # <4km visibility = heavy smoke
        }
    def combine_smoke_parameters(self, datasets):
        """
        Combine multiple HRRR smoke parameters using NOAA best practices
        Based on HRRR-Smoke operational methodology and HMS standards

        Args:
            datasets: dict mapping parameter code ('MASSDEN', 'PM25',
                'COLMD', 'VIS', 'FRPAVG') to a 2-D array; all arrays are
                assumed to share one grid — TODO confirm at call sites.

        Returns:
            2-D array of combined smoke concentration (µg/m³, capped at
            250), None for an empty dict, or a 50x60 zero array when the
            dict contains none of the recognized parameters.
        """
        if not datasets:
            return None
        combined_smoke = None
        param_weights = {}
        # NOAA-based parameter priority and weighting - using realistic scales
        if 'MASSDEN' in datasets:
            # Near-surface smoke mass density (primary parameter) - scale down if too high
            massden_data = np.array(datasets['MASSDEN'])
            # If MASSDEN values are very high, they might be in different units - cap at reasonable values
            combined_smoke = np.minimum(massden_data, 200.0)  # Cap at 200 µg/m³ (very heavy smoke)
            param_weights['MASSDEN'] = 1.0
            print(f"Using MASSDEN as primary smoke parameter (capped at 200 µg/m³)")
        if 'PM25' in datasets:
            # PM2.5 concentration - highly correlated with smoke health impacts
            pm25_data = np.array(datasets['PM25'])
            pm25_data = np.minimum(pm25_data, 100.0)  # Cap PM2.5 at 100 µg/m³
            if combined_smoke is None:
                combined_smoke = pm25_data
                param_weights['PM25'] = 1.0
                print(f"Using PM25 as primary smoke parameter (capped at 100 µg/m³)")
            else:
                # Take the maximum of the two primary smoke measurements (more conservative)
                combined_smoke = np.maximum(combined_smoke, pm25_data)
                param_weights['PM25'] = 0.5
                print(f"Taking maximum of MASSDEN and PM25 for conservative estimate")
        if 'COLMD' in datasets:
            # Column mass density - useful for transport patterns but don't over-weight
            colmd_data = np.array(datasets['COLMD'])
            if combined_smoke is None:
                # Convert column density to surface estimate (conservative conversion)
                combined_smoke = np.minimum(colmd_data * 0.05, 50.0)  # 5% of column, max 50 µg/m³
                param_weights['COLMD'] = 1.0
                print(f"Using COLMD as primary with conservative surface conversion")
            else:
                # Add small enhancement where column density is high (additive, not multiplicative)
                transport_enhancement = np.minimum(10.0, colmd_data / 100.0)  # Max 10 µg/m³ enhancement
                combined_smoke = combined_smoke + transport_enhancement
                param_weights['COLMD'] = 0.1
                print(f"Adding small transport enhancement from COLMD")
        if 'VIS' in datasets:
            # Visibility - convert to smoke estimate using NOAA methodology
            visibility_data = np.array(datasets['VIS'])
            smoke_from_vis = self.visibility_to_smoke_concentration(visibility_data)
            if combined_smoke is None:
                combined_smoke = smoke_from_vis
                param_weights['VIS'] = 1.0
                print(f"Using visibility-derived smoke as primary parameter")
            else:
                # Use visibility as cross-validation - take average where both exist
                vis_weight = 0.3
                combined_smoke = (1 - vis_weight) * combined_smoke + vis_weight * smoke_from_vis
                param_weights['VIS'] = vis_weight
                print(f"Blending with visibility-derived smoke (weight: {vis_weight})")
        if 'FRPAVG' in datasets:
            # Fire Radiative Power - add localized fire emissions (additive enhancement)
            frp_data = np.array(datasets['FRPAVG'])
            # Convert FRP to additional smoke emissions (realistic fire-to-smoke conversion)
            fire_smoke_addition = np.minimum(frp_data * 2.0, 30.0)  # Max 30 µg/m³ from fires
            if combined_smoke is not None:
                combined_smoke = combined_smoke + fire_smoke_addition
                param_weights['FRPAVG'] = 0.2
                print(f"Adding fire emission enhancement (max 30 µg/m³ per fire)")
        if combined_smoke is not None:
            # Final realistic caps to ensure values stay within operational ranges
            combined_smoke = np.maximum(0, combined_smoke)  # No negative values
            combined_smoke = np.minimum(combined_smoke, 250.0)  # Cap at extreme heavy smoke
            print(f"Combined smoke parameters: {list(param_weights.keys())}")
            print(f"Combined smoke range: {combined_smoke.min():.1f} - {combined_smoke.max():.1f}")
            return combined_smoke
        else:
            print("No parameters available for combination, returning empty array")
            # Fallback grid shape matches the demo-data generator's 50x60 grid
            return np.zeros((50, 60))
    def hms_style_smoke_detection(self, datasets):
        """
        HMS-style smoke detection based on visibility and satellite-observable smoke
        Mimics how HMS analysts detect smoke from GOES visible imagery

        Returns a 2-D array of category values (0=none, 1=light, 2=medium,
        3=heavy), or None when no usable parameter is present.
        NOTE(review): Gaussian smoothing below produces fractional category
        values; downstream code truncating with astype(int) will shift
        category boundaries — confirm this is intended.
        """
        if not datasets:
            return None
        print("HMS-style smoke detection: Prioritizing visibility-based analysis")
        # HMS analysts primarily use visibility for smoke detection
        if 'VIS' in datasets:
            visibility_data = np.array(datasets['VIS'])
            print(f"Using visibility data: range {visibility_data.min():.0f} - {visibility_data.max():.0f} meters")
            # Convert visibility to HMS-style smoke categories
            smoke_categories = np.zeros_like(visibility_data)
            # HMS visibility-based categorization (like satellite analysts would see)
            heavy_mask = visibility_data < 4000  # <4km = heavy smoke
            medium_mask = (visibility_data >= 4000) & (visibility_data < 8000)  # 4-8km = medium
            light_mask = (visibility_data >= 8000) & (visibility_data < 15000)  # 8-15km = light
            smoke_categories[heavy_mask] = 3  # Heavy smoke
            smoke_categories[medium_mask] = 2  # Medium smoke
            smoke_categories[light_mask] = 1  # Light smoke
            # Apply smoothing to create realistic plume shapes (like HMS polygons)
            try:
                from scipy import ndimage
                smoke_categories = ndimage.gaussian_filter(smoke_categories, sigma=1.0)
                print("Applied HMS-style Gaussian smoothing")
            except ImportError:
                print("Scipy not available - using unsmoothed HMS detection")
            print(f"HMS-style detection: {np.sum(smoke_categories > 0)} smoke grid points detected")
            return smoke_categories
        # Fallback: Use MASSDEN/PM25 but apply HMS-style thresholds
        elif 'MASSDEN' in datasets or 'PM25' in datasets:
            if 'MASSDEN' in datasets:
                smoke_data = np.array(datasets['MASSDEN'])
                print("Using MASSDEN for HMS-style detection")
            else:
                smoke_data = np.array(datasets['PM25'])
                print("Using PM25 for HMS-style detection")
            # Apply HMS-style smoothing and realistic thresholds
            try:
                from scipy import ndimage
                smoothed_smoke = ndimage.gaussian_filter(smoke_data, sigma=2.0)
                print("Applied HMS-style Gaussian smoothing to concentration data")
            except ImportError:
                print("Scipy not available - using unsmoothed concentration data")
                smoothed_smoke = smoke_data
            # Use lower, more realistic thresholds for visible smoke plumes
            smoke_categories = np.zeros_like(smoothed_smoke)
            smoke_categories[smoothed_smoke > 50] = 3  # Heavy (very visible smoke)
            smoke_categories[(smoothed_smoke > 15) & (smoothed_smoke <= 50)] = 2  # Medium
            smoke_categories[(smoothed_smoke > 3) & (smoothed_smoke <= 15)] = 1  # Light
            print(f"HMS-style fallback: {np.sum(smoke_categories > 0)} smoke grid points detected")
            return smoke_categories
        print("HMS-style detection: No suitable parameters found")
        return None
    def visibility_to_smoke_concentration(self, visibility_data):
        """
        Convert visibility data to smoke concentration estimates
        Based on NOAA operational relationships between visibility and PM2.5

        Args:
            visibility_data: array of visibility values in meters.

        Returns:
            Array of estimated PM2.5 concentrations (µg/m³, non-negative).
        NOTE(review): the heavy-smoke branch starts at 35 µg/m³ although
        the comment says "> 27"; the breakpoints here (3/6/10 km) also
        differ from self.visibility_thresholds (4/8/15 km) — verify.
        """
        # Initialize smoke concentration array
        smoke_concentration = np.zeros_like(visibility_data)
        # NOAA visibility-smoke relationships (empirical)
        # These are based on operational experience and research
        # Heavy smoke: visibility < 3 km → PM2.5 > 27 µg/m³
        heavy_mask = visibility_data < 3000
        smoke_concentration[heavy_mask] = 35 + (3000 - visibility_data[heavy_mask]) / 100
        # Medium smoke: 3-6 km visibility → PM2.5 16-27 µg/m³
        medium_mask = (visibility_data >= 3000) & (visibility_data < 6000)
        smoke_concentration[medium_mask] = 16 + (6000 - visibility_data[medium_mask]) / 273
        # Light smoke: 6-10 km visibility → PM2.5 5-16 µg/m³
        light_mask = (visibility_data >= 6000) & (visibility_data < 10000)
        smoke_concentration[light_mask] = 5 + (10000 - visibility_data[light_mask]) / 364
        # Clear conditions: visibility > 10 km → minimal smoke
        clear_mask = visibility_data >= 10000
        smoke_concentration[clear_mask] = np.maximum(0, 5 - (visibility_data[clear_mask] - 10000) / 2000)
        return np.maximum(0, smoke_concentration)  # Ensure non-negative
    def classify_smoke_density(self, values):
        """Classify smoke values into HMS density categories.

        Args:
            values: array of smoke concentrations (µg/m³).

        Returns:
            int array with 0=below light threshold, 1=light, 2=medium,
            3=heavy, using the bounds in self.density_categories.
        """
        classifications = np.zeros_like(values, dtype=int)
        # HMS-style categories (post-2022, no direct PM2.5 equivalents)
        # Light smoke (category 1)
        light_mask = (values >= self.density_categories['light']['min']) & \
            (values < self.density_categories['light']['max'])
        classifications[light_mask] = 1
        # Medium smoke (category 2)
        medium_mask = (values >= self.density_categories['medium']['min']) & \
            (values < self.density_categories['medium']['max'])
        classifications[medium_mask] = 2
        # Heavy smoke (category 3)
        heavy_mask = values >= self.density_categories['heavy']['min']
        classifications[heavy_mask] = 3
        return classifications
    def extract_smoke_polygons(self, lat2d, lon2d, smoke_values, min_area=0.01, is_hms_categorical=False):
        """Extract smoke plume polygons from gridded data with light smoothing for HMS-like shapes.

        Pipeline per category: classify -> morphological clean-up ->
        drop tiny components -> contour -> simplify -> map pixel indices
        to lat/lon -> build shapely polygons.

        Args:
            lat2d, lon2d: 2-D latitude/longitude grids aligned with smoke_values.
            smoke_values: 2-D concentration field (µg/m³), or category field
                when is_hms_categorical is True.
            min_area: minimum polygon area in squared degrees to keep.
            is_hms_categorical: True when smoke_values already holds
                category ids (1=light, 2=medium, 3=heavy).

        Returns:
            List of dicts ('geometry', 'density_category', 'density_value',
            'color', 'description', 'area_deg2'); empty list on failure.
        """
        polygons = []
        if not HMS_LIBS_AVAILABLE:
            print("HMS polygon libraries not available")
            return []
        try:
            # Handle HMS categorical data vs concentration data
            if is_hms_categorical:
                # HMS data is already classified (1=light, 2=medium, 3=heavy)
                classifications = smoke_values.astype(int)
                print(f"Using HMS categorical data directly: {np.unique(classifications)}")
            else:
                # Lightly smooth the continuous field before categorization to avoid speckle
                try:
                    from scipy import ndimage
                    smoothed = ndimage.gaussian_filter(smoke_values.astype(float), sigma=1.0)
                except Exception:
                    smoothed = smoke_values
                # Classify smoke densities from concentration values (µg/m³)
                classifications = self.classify_smoke_density(smoothed)
            # Process each density category
            for category_name, category_info in self.density_categories.items():
                if category_name == 'light':
                    category_id = 1
                elif category_name == 'medium':
                    category_id = 2
                else:  # heavy
                    category_id = 3
                # Create binary mask for this category
                category_mask = (classifications == category_id)
                if not np.any(category_mask):
                    continue
                # Apply minimal morphology to reduce tiny blocky artifacts
                try:
                    from scipy import ndimage
                    # Remove isolated noise, close small gaps
                    smoothed_mask = ndimage.binary_opening(category_mask, iterations=1)
                    smoothed_mask = ndimage.binary_closing(smoothed_mask, iterations=1)
                    # Light dilation to smooth edges
                    smoothed_mask = ndimage.binary_dilation(smoothed_mask, iterations=1)
                except Exception:
                    smoothed_mask = category_mask
                # Remove very small components (size threshold by category)
                try:
                    from scipy import ndimage
                    labeled, num = ndimage.label(smoothed_mask)
                    sizes = np.bincount(labeled.ravel())
                    # Heavier categories can retain smaller features; light should be larger
                    min_pixels = {1: 80, 2: 50, 3: 25}.get(category_id, 40)
                    remove = sizes < min_pixels
                    remove_idx = np.nonzero(remove)[0]
                    if len(remove_idx) > 0:
                        smoothed_mask[np.isin(labeled, remove_idx)] = False
                except Exception:
                    pass
                # Find contours with simpler algorithm (0.5 level on the binary mask)
                try:
                    contours = find_contours(smoothed_mask.astype(float), 0.5)
                    # Limit number of contours for speed
                    max_contours = 20 if category_name == 'light' else 50
                    contours = contours[:max_contours]
                    for contour in contours:
                        if len(contour) < 4:  # Need at least 4 points for polygon
                            continue
                        # Simplify contour for speed while retaining shape
                        step = 2 if category_name == 'heavy' else 3
                        simplified_contour = contour[::step]
                        # Convert pixel coordinates to lat/lon
                        contour_coords = []
                        for point in simplified_contour:
                            try:
                                row, col = int(point[0]), int(point[1])
                                if 0 <= row < lat2d.shape[0] and 0 <= col < lat2d.shape[1]:
                                    lat = lat2d[row, col]
                                    lon = lon2d[row, col]
                                    if not (np.isnan(lat) or np.isnan(lon)):
                                        contour_coords.append((lon, lat))
                            except (IndexError, ValueError):
                                continue
                        # Ensure polygon is closed
                        if len(contour_coords) >= 4:
                            if contour_coords[0] != contour_coords[-1]:
                                contour_coords.append(contour_coords[0])
                            try:
                                # Create Shapely polygon
                                poly = Polygon(contour_coords)
                                # Check minimum area threshold
                                if poly.is_valid and poly.area >= min_area:
                                    # Get average smoke value in polygon (simplified calculation).
                                    # NOTE(review): this averages over the WHOLE category mask,
                                    # so every polygon in a category reports the same value.
                                    avg_smoke = np.mean(smoke_values[category_mask])
                                    polygons.append({
                                        'geometry': poly,
                                        'density_category': category_name,
                                        'density_value': avg_smoke,
                                        'color': category_info['color'],
                                        'description': category_info['description'],
                                        'area_deg2': poly.area
                                    })
                            except Exception as e:
                                print(f"Polygon creation error: {e}")
                                continue
                except Exception as e:
                    print(f"Contour finding error for {category_name}: {e}")
                    continue
            return polygons
        except Exception as e:
            print(f"Polygon extraction error: {e}")
            return []
    def create_kml_string(self, polygons, forecast_time, model_run_time):
        """Create KML string for smoke polygons.

        Args:
            polygons: polygon dicts as built by extract_smoke_polygons.
            forecast_time: valid-time label embedded in names/descriptions.
            model_run_time: model-cycle label for the document description.

        Returns:
            Pretty-printed KML document string, or None on failure.
        """
        try:
            # Create KML root
            kml = ET.Element('kml', xmlns='http://www.opengis.net/kml/2.2')
            document = ET.SubElement(kml, 'Document')
            # Add document info
            ET.SubElement(document, 'name').text = f'HRRR-Smoke Forecast {forecast_time}'
            ET.SubElement(document, 'description').text = f'Smoke plume polygons from HRRR-Smoke model run {model_run_time}'
            # Add styles for each density category
            for category_name, category_info in self.density_categories.items():
                style = ET.SubElement(document, 'Style', id=f'smoke_{category_name}')
                poly_style = ET.SubElement(style, 'PolyStyle')
                ET.SubElement(poly_style, 'color').text = self._hex_to_kml_color(category_info['color'])
                ET.SubElement(poly_style, 'fill').text = '1'
                ET.SubElement(poly_style, 'outline').text = '1'
                line_style = ET.SubElement(style, 'LineStyle')
                ET.SubElement(line_style, 'color').text = self._hex_to_kml_color(category_info['color'])
                ET.SubElement(line_style, 'width').text = '2'
            # Add polygons
            for i, polygon_data in enumerate(polygons):
                placemark = ET.SubElement(document, 'Placemark')
                ET.SubElement(placemark, 'name').text = f"Smoke Plume {i+1}"
                description = f"""
                Density: {polygon_data['description']}
                Average Value: {polygon_data['density_value']:.1f} µg/m³
                Area: {polygon_data['area_deg2']:.4f} deg²
                Forecast Time: {forecast_time}
                """
                ET.SubElement(placemark, 'description').text = description.strip()
                # Add style reference
                ET.SubElement(placemark, 'styleUrl').text = f"#smoke_{polygon_data['density_category']}"
                # Add polygon geometry
                polygon_elem = ET.SubElement(placemark, 'Polygon')
                ET.SubElement(polygon_elem, 'extrude').text = '0'
                ET.SubElement(polygon_elem, 'altitudeMode').text = 'clampToGround'
                outer_boundary = ET.SubElement(polygon_elem, 'outerBoundaryIs')
                linear_ring = ET.SubElement(outer_boundary, 'LinearRing')
                # Add coordinates (KML order: lon,lat,alt)
                coords = []
                for lon, lat in polygon_data['geometry'].exterior.coords:
                    coords.append(f"{lon},{lat},0")
                ET.SubElement(linear_ring, 'coordinates').text = ' '.join(coords)
            # Convert to string with proper formatting
            rough_string = ET.tostring(kml, encoding='unicode')
            reparsed = minidom.parseString(rough_string)
            return reparsed.toprettyxml(indent="  ")
        except Exception as e:
            print(f"KML creation error: {e}")
            return None
    def _hex_to_kml_color(self, hex_color):
        """Convert hex color to KML ABGR format.

        KML colors are AABBGGRR hex strings; alpha is fixed at 0x80 (50%).
        """
        # Remove # if present
        hex_color = hex_color.lstrip('#')
        # Convert RGB to BGR with alpha
        r = hex_color[0:2]
        g = hex_color[2:4]
        b = hex_color[4:6]
        # KML uses AABBGGRR format
        return f"80{b}{g}{r}"  # 80 = 50% opacity
    def create_kmz_file(self, polygons, forecast_time, model_run_time, output_path):
        """Create KMZ file (zipped KML) with smoke polygons.

        Returns output_path on success, None when KML generation or
        zipping fails.
        """
        try:
            kml_content = self.create_kml_string(polygons, forecast_time, model_run_time)
            if kml_content is None:
                return None
            # A KMZ is simply a zip archive whose main entry is doc.kml
            with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as kmz:
                kmz.writestr('doc.kml', kml_content)
            return output_path
        except Exception as e:
            print(f"KMZ creation error: {e}")
            return None
class HRRRSmokeDataManager:
    def __init__(self):
        # Scratch directory for downloaded data; created once per instance
        # (never removed here — assumes OS temp cleanup; TODO confirm).
        self.temp_dir = tempfile.mkdtemp()
        # Cache for fetched forecasts (population/use not shown in this chunk)
        self.forecast_cache = {}
        # HRRR-Smoke specific parameters: display metadata (name, units,
        # description, plotly colorscale) keyed by GRIB parameter code.
        self.smoke_parameters = {
            # Primary smoke parameters
            "MASSDEN": {"name": "Smoke Mass Density", "units": "µg/m³", "description": "Total smoke concentration", "colorscale": "OrRd"},
            "COLMD": {"name": "Column Smoke Density", "units": "µg/m²", "description": "Integrated column smoke", "colorscale": "Reds"},
            # Air Quality Index related
            "PMTF": {"name": "PM2.5 Total", "units": "µg/m³", "description": "Fine particulate matter from smoke", "colorscale": "Plasma"},
            "PM25": {"name": "PM2.5", "units": "µg/m³", "description": "PM2.5 concentration", "colorscale": "Plasma"},
            # Visibility and optical parameters
            "VIS": {"name": "Visibility", "units": "km", "description": "Visibility through smoke", "colorscale": "Blues_r"},
            "EXTCOF55": {"name": "Extinction Coefficient", "units": "1/km", "description": "Light extinction by smoke", "colorscale": "Greys"},
            # Fire and emissions
            "FRPAVG": {"name": "Fire Radiative Power", "units": "MW", "description": "Fire intensity", "colorscale": "Hot"},
            "EMSABV": {"name": "Emissions Above Surface", "units": "µg/m²/s", "description": "Smoke emissions rate", "colorscale": "YlOrRd"},
            # Meteorological support
            "WIND": {"name": "Wind Speed", "units": "m/s", "description": "Wind speed affecting smoke transport", "colorscale": "Viridis"},
            "WDIR": {"name": "Wind Direction", "units": "degrees", "description": "Wind direction", "colorscale": "hsv"},
            "TMP": {"name": "Temperature", "units": "°C", "description": "Air temperature", "colorscale": "RdYlBu_r"},
            "RH": {"name": "Relative Humidity", "units": "%", "description": "Humidity affecting smoke dispersion", "colorscale": "Blues"},
        }
def fetch_multiple_smoke_parameters(self, params=['MASSDEN', 'VIS', 'COLMD'], level='surface', fxx=6, fast_mode=False):
"""
Fetch multiple HRRR smoke parameters for comprehensive analysis
Based on NOAA best practices for smoke forecasting
"""
datasets = {}
successful_params = []
# Priority order based on NOAA operational use
param_priority = ['MASSDEN', 'PM25', 'VIS', 'COLMD', 'FRPAVG', 'PMTF']
# Sort requested parameters by priority
sorted_params = [p for p in param_priority if p in params] + [p for p in params if p not in param_priority]
print(f"Fetching multiple smoke parameters: {sorted_params}")
for param in sorted_params:
try:
ds = self.fetch_hrrr_smoke_data(param, level, fxx, return_info=False, fast_mode=fast_mode)
if ds is not None:
# Extract the data values
var_names = list(ds.data_vars)
if var_names:
var_name = var_names[0]
data_values = ds[var_name].values
# Handle different data shapes
if len(data_values.shape) == 3: # Time, lat, lon
data_values = data_values[0] # Take first time step
elif len(data_values.shape) == 4: # Time, level, lat, lon
data_values = data_values[0, 0] # Take first time and level
datasets[param] = data_values
successful_params.append(param)
print(f"Successfully fetched {param}: shape {data_values.shape}")
else:
print(f"Failed to fetch {param}")
except Exception as e:
print(f"Error fetching {param}: {e}")
continue
print(f"Successfully fetched parameters: {successful_params}")
return datasets if datasets else None
    def fetch_hrrr_smoke_data(self, param='MASSDEN', level='surface', fxx=6, return_info=False, fast_mode=False):
        """Fetch HRRR-Smoke data using Herbie with optimized fetching strategy.

        Tries recent model cycles (2, 3, 6 hours back) across the 'sfc'
        and 'nat' products, then falls back to synthetic demo data.

        Args:
            param: parameter code; NOTE(review): 'MASSDEN' and 'PM25' are
                silently remapped to 'VIS:surface' below — callers receive
                visibility data for those codes.
            level: requested level (only recorded in the info dict).
            fxx: forecast lead hour.
            return_info: when True, return (ds, info_dict) instead of ds.
            fast_mode: delegate to the single-attempt fast path.

        Returns:
            xarray.Dataset (or (ds, info) / (None, None) depending on
            return_info) — possibly synthetic via the fallback path.
        """
        if not HERBIE_AVAILABLE:
            return (None, None) if return_info else None
        # Fast mode - try only most likely parameters and times
        if fast_mode:
            return self._fast_fetch_hrrr_data(param, level, fxx, return_info)
        try:
            # NOTE(review): datetime.utcnow() is naive and deprecated in 3.12+;
            # consider datetime.now(timezone.utc) across this class.
            current_time = datetime.utcnow().replace(minute=0, second=0, microsecond=0)
            # Optimized: Try fewer time periods first (most recent)
            for hours_back in [2, 3, 6]:  # Reduced from [2, 3, 6, 12, 18]
                try:
                    target_time = current_time - timedelta(hours=hours_back)
                    date_str = target_time.strftime('%Y-%m-%d %H:00')
                    print(f"Trying HRRR data for: {date_str}, parameter: {param}")
                    # Optimized parameter mapping - focus on working parameters
                    param_mapping = {
                        'VIS': 'VIS:surface',  # Visibility - WORKS
                        'COLMD': 'COLMD:entire atmosphere',  # Column smoke - WORKS
                        'TMP': 'TMP:2 m',  # Temperature
                        'WIND': 'WIND:10 m',  # Wind speed
                        'RH': 'RH:2 m',  # Relative humidity
                        'WDIR': 'WDIR:10 m',  # Wind direction
                        # Skip problematic parameters for speed
                        'MASSDEN': 'VIS:surface',  # Fallback to visibility
                        'PM25': 'VIS:surface',  # Fallback to visibility
                    }
                    # Optimized: Try only surface products first
                    products_to_try = ['sfc', 'nat']  # Reduced from ['sfc', 'nat', 'subh']
                    for product in products_to_try:
                        try:
                            # Create Herbie object for this cycle/product
                            H = Herbie(date_str, model='hrrr', product=product, fxx=fxx)
                            # Use mapped parameter or original (default to :surface level)
                            search_param = param_mapping.get(param, f"{param}:surface")
                            print(f" Trying product '{product}' with parameter '{search_param}'")
                            # Download specific parameter subset via GRIB search string
                            ds = H.xarray(search_param)
                            if ds is not None:
                                print(f"SUCCESS: Got HRRR data for {date_str} using product '{product}'")
                                if return_info:
                                    info = {
                                        'date_str': date_str,
                                        'param': param,
                                        'search_param': search_param,
                                        'product': product,
                                        'level': level,
                                        'fxx': fxx,
                                        'model_run': H.date.strftime('%Y-%m-%d %H:00 UTC'),
                                        'valid_time': (H.date + timedelta(hours=fxx)).strftime('%Y-%m-%d %H:00 UTC')
                                    }
                                    return ds, info
                                else:
                                    return ds
                        except Exception as e:
                            print(f" Failed with product '{product}': {e}")
                            continue
                    # If we get here, all products failed for this time
                    print(f" All products failed for {date_str}")
                except Exception as e:
                    print(f"Failed for {date_str}: {e}")
                    continue
            print("Optimized HRRR attempts failed, trying fallback...")
            return self._fallback_fetch_hrrr_data(param, level, fxx, return_info)
        except Exception as e:
            print(f"HRRR fetch error: {e}")
            return (None, None) if return_info else None
def _fast_fetch_hrrr_data(self, param='VIS', level='surface', fxx=6, return_info=False):
"""Ultra-fast HRRR data fetching - try only most reliable parameters"""
try:
current_time = datetime.utcnow().replace(minute=0, second=0, microsecond=0)
target_time = current_time - timedelta(hours=2) # Only try most recent
date_str = target_time.strftime('%Y-%m-%d %H:00')
print(f"FAST MODE: Trying HRRR data for: {date_str}, parameter: VIS")
# Only try visibility (most reliable) in fast mode
H = Herbie(date_str, model='hrrr', product='sfc', fxx=fxx)
ds = H.xarray('VIS:surface')
if ds is not None:
print(f"FAST MODE SUCCESS: Got HRRR data for {date_str}")
if return_info:
info = {
'date_str': date_str,
'param': 'VIS',
'search_param': 'VIS:surface',
'product': 'sfc',
'level': level,
'fxx': fxx,
'model_run': H.date.strftime('%Y-%m-%d %H:00 UTC'),
'valid_time': (H.date + timedelta(hours=fxx)).strftime('%Y-%m-%d %H:00 UTC')
}
return ds, info
else:
return ds
print("FAST MODE: No data available")
return (None, None) if return_info else None
except Exception as e:
print(f"FAST MODE error: {e}")
return (None, None) if return_info else None
def _fallback_fetch_hrrr_data(self, param, level, fxx, return_info):
"""Fallback to synthetic data when HRRR fails"""
print("Generating synthetic smoke data for demo...")
# Create synthetic visibility/smoke data
lats = np.linspace(25, 50, 100)
lons = np.linspace(-130, -65, 150)
lon2d, lat2d = np.meshgrid(lons, lats)
# Generate synthetic smoke pattern based on known fire regions
smoke_data = np.ones_like(lat2d) * 16000 # Base visibility 16km
# Add synthetic smoke plumes (reduced visibility)
fire_regions = [
(39.0, -120.0, 8000), # California
(47.0, -114.0, 12000), # Montana
(44.0, -103.0, 10000), # North Dakota
(33.0, -117.0, 6000), # Southern California
]
for fire_lat, fire_lon, visibility in fire_regions:
dist = np.sqrt((lat2d - fire_lat)**2 * 1.5 + (lon2d - fire_lon)**2)
intensity = np.exp(-dist**2 / 8.0)
smoke_data = np.minimum(smoke_data, visibility + intensity * 8000)
# Create xarray dataset
import xarray as xr
current_time = datetime.utcnow()
ds = xr.Dataset({
'vis': (['y', 'x'], smoke_data)
}, coords={
'latitude': (['y', 'x'], lat2d),
'longitude': (['y', 'x'], lon2d),
'time': current_time
})
if return_info:
info = {
'date_str': current_time.strftime('%Y-%m-%d %H:00'),
'param': param,
'search_param': 'SYNTHETIC',
'product': 'synthetic',
'level': level,
'fxx': fxx,
'model_run': current_time.strftime('%Y-%m-%d %H:00 UTC'),
'valid_time': current_time.strftime('%Y-%m-%d %H:00 UTC')
}
return ds, info
else:
return ds
return (None, None) if return_info else None
def generate_demo_smoke_data(self, param='MASSDEN', fxx=6):
    """Generate synthetic smoke data for demonstration when real data unavailable.

    Builds a seeded (reproducible per lead time) CONUS field of western-US
    fire hotspots with an eastward dispersion lobe and lead-time decay,
    then scales it to the requested parameter before wrapping it in an
    xarray Dataset on 1-D lat/lon axes.
    """
    print(f"Generating demo smoke data for {param}")
    # Coarse synthetic CONUS grid: 50 x 60 cells on 1-D axes.
    lat_axis = np.linspace(25, 50, 50)
    lon_axis = np.linspace(-125, -70, 60)
    lon_grid, lat_grid = np.meshgrid(lon_axis, lat_axis)
    # Seed depends on the lead hour: consistent but time-varying patterns.
    np.random.seed(42 + fxx)
    field = np.zeros_like(lat_grid)
    # Western-US hotspots as (lat, lon, peak intensity).
    hotspots = [
        (40.0, -120.0, 50.0),  # Northern California
        (44.0, -115.0, 30.0),  # Idaho
        (46.0, -110.0, 25.0),  # Montana
        (37.0, -105.0, 20.0),  # Colorado
    ]
    for hs_lat, hs_lon, strength in hotspots:
        # Gaussian plume centred on the hotspot.
        radial = np.sqrt((lat_grid - hs_lat) ** 2 + (lon_grid - hs_lon) ** 2)
        plume = strength * np.exp(-radial ** 2 / 8.0)
        # Eastward wind-dispersion lobe.
        drift = np.exp(-(lat_grid - hs_lat) ** 2 / 4.0) * np.exp(-np.maximum(0, hs_lon - lon_grid) ** 2 / 15.0)
        field += plume + strength * 0.3 * drift
    # Smoke decays with forecast lead time, floored at 10% of peak.
    field *= max(0.1, 1.0 - fxx * 0.05)
    # Random texture, then clip negatives (no negative smoke).
    field = np.maximum(0, field + np.random.normal(0, 2, field.shape))
    # Parameter-specific scaling.
    if param == 'MASSDEN':
        field *= 1.0  # µg/m³
    elif param in ['PM25', 'PMTF']:
        field *= 0.7  # PM2.5 typically lower
    elif param == 'VIS':
        # Visibility (km) decreases with smoke; floor at 1 km.
        field = np.maximum(1, 20 - field * 0.3)
    elif param == 'COLMD':
        field *= 5.0  # Column density higher
    ds = xr.Dataset({
        param: (['lat', 'lon'], field)
    }, coords={
        'lat': lat_axis,
        'lon': lon_axis
    })
    meta = self.smoke_parameters.get(param, {})
    ds[param].attrs['units'] = meta.get('units', 'µg/m³')
    ds[param].attrs['long_name'] = meta.get('name', param)
    return ds
def generate_multiple_demo_parameters(self, params=['MASSDEN', 'VIS', 'COLMD'], fxx=6):
    """Generate multiple synthetic smoke parameters for comprehensive demo.

    All requested parameters are derived from one shared, seeded base
    smoke field so they stay physically consistent with each other.
    Returns a dict of parameter name -> 2-D numpy array (50 x 60).
    """
    print(f"Generating demo data for multiple parameters: {params}")
    datasets = {}
    # Shared synthetic CONUS grid.
    lat_axis = np.linspace(25, 50, 50)
    lon_axis = np.linspace(-125, -70, 60)
    lon_grid, lat_grid = np.meshgrid(lon_axis, lat_axis)
    np.random.seed(42 + fxx)  # reproducible, lead-time-dependent
    base_smoke = np.zeros_like(lat_grid)
    # NOAA-style fire hotspots with realistic intensities: (lat, lon, peak).
    hotspots = [
        (40.0, -120.0, 45.0),  # Northern California - heavy smoke
        (44.0, -115.0, 28.0),  # Idaho - medium smoke
        (46.0, -110.0, 18.0),  # Montana - light smoke
        (37.0, -105.0, 32.0),  # Colorado - medium-heavy smoke
        (42.0, -118.0, 12.0),  # Nevada - light smoke
    ]
    for hs_lat, hs_lon, strength in hotspots:
        radial = np.sqrt((lat_grid - hs_lat) ** 2 + (lon_grid - hs_lon) ** 2)
        plume = strength * np.exp(-radial ** 2 / 8.0)
        # Eastward wind-dispersion lobe.
        drift = np.exp(-(lat_grid - hs_lat) ** 2 / 4.0) * np.exp(-np.maximum(0, hs_lon - lon_grid) ** 2 / 15.0)
        base_smoke += plume + strength * 0.3 * drift
    base_smoke *= max(0.1, 1.0 - fxx * 0.05)  # decay with lead time
    base_smoke = np.maximum(0, base_smoke + np.random.normal(0, 1, base_smoke.shape))
    # Derive each requested parameter from the shared base field.
    for param in params:
        if param == 'MASSDEN':
            # Near-surface smoke mass density (µg/m³).
            derived = base_smoke * 1.0
        elif param == 'PM25':
            # PM2.5 concentration (typically 70% of MASSDEN).
            derived = base_smoke * 0.7
        elif param == 'COLMD':
            # Column mass density (higher than surface).
            derived = base_smoke * 5.0
        elif param == 'VIS':
            # Visibility in metres: start at 20 km, reduce with smoke,
            # floored at 500 m (NOAA operational relationship).
            derived = np.maximum(500, 20000 - base_smoke * 300)
        elif param == 'FRPAVG':
            # Fire Radiative Power (MW): tightly localized at hotspots.
            derived = np.zeros_like(base_smoke)
            for hs_lat, hs_lon, strength in hotspots:
                radial = np.sqrt((lat_grid - hs_lat) ** 2 + (lon_grid - hs_lon) ** 2)
                derived += (strength / 10.0) * np.exp(-radial ** 2 / 2.0)
        elif param == 'PMTF':
            # PM2.5 Total (similar to PM25 but slightly higher).
            derived = base_smoke * 0.8
        else:
            # Default: use the base smoke pattern as-is.
            derived = base_smoke
        datasets[param] = derived
        print(f"Generated {param}: range {datasets[param].min():.1f} - {datasets[param].max():.1f}")
    return datasets
def process_smoke_data(self, ds, max_points=1000, param_type='smoke'):
    """Process HRRR-Smoke xarray dataset into visualization-ready point data.

    Extracts the first data variable, aligns it with lat/lon coordinates,
    subsamples to roughly ``max_points`` points, drops NaNs, and (for
    smoke) filters out near-zero concentrations.

    Args:
        ds: xarray Dataset (or None), with either 2-D 'latitude'/'longitude'
            or 1-D/2-D 'lat'/'lon' coordinates.
        max_points: approximate point budget (overridden to 2000 for smoke).
        param_type: 'smoke' enables the low-concentration filter.

    Returns:
        Dict with flattened 'lats', 'lons', 'values' arrays plus 'units',
        'long_name', 'param_type' — or None when no usable data remains.
    """
    if ds is None:
        return None
    try:
        # Get the main data variable.
        var_names = list(ds.data_vars)
        if not var_names:
            return None
        var_name = var_names[0]
        data_var = ds[var_name]
        # Get coordinates.
        if 'latitude' in ds.coords and 'longitude' in ds.coords:
            lats = ds.latitude.values
            lons = ds.longitude.values
        elif 'lat' in ds.coords and 'lon' in ds.coords:
            lats = ds.lat.values
            lons = ds.lon.values
        else:
            return None
        values = np.squeeze(data_var.values)
        # BUG FIX: demo datasets carry 1-D lat/lon axes with a 2-D field;
        # previously the flatten below produced mismatched array sizes
        # (broadcast error -> silently returned None). Expand the axes to
        # 2-D grids so lats/lons/values stay element-aligned.
        if lats.ndim == 1 and lons.ndim == 1 and values.ndim == 2:
            lons, lats = np.meshgrid(lons, lats)
        # For smoke data, we want to show more detail.
        if param_type == 'smoke':
            max_points = 2000  # More points for smoke detail
            min_threshold = 0.1  # Filter very low concentrations
        else:
            min_threshold = None
        # Subsample if needed.
        if lats.size > max_points:
            step = max(1, int(np.sqrt(lats.size / max_points)))
            if len(lats.shape) == 2:
                lats = lats[::step, ::step]
                lons = lons[::step, ::step]
                values = values[::step, ::step]
            else:
                lats = lats[::step]
                lons = lons[::step]
                values = values[::step]
        # Flatten arrays.
        lats_flat = lats.flatten()
        lons_flat = lons.flatten()
        values_flat = values.flatten()
        # Remove invalid values.
        valid = ~(np.isnan(values_flat) | np.isnan(lats_flat) | np.isnan(lons_flat))
        # For smoke, filter low concentrations.
        if param_type == 'smoke' and min_threshold is not None:
            valid = valid & (values_flat > min_threshold)
        if not np.any(valid):
            return None
        return {
            'lats': lats_flat[valid],
            'lons': lons_flat[valid],
            'values': values_flat[valid],
            'units': data_var.attrs.get('units', ''),
            'long_name': data_var.attrs.get('long_name', var_name),
            'param_type': param_type
        }
    except Exception as e:
        print(f"Smoke data processing error: {e}")
        return None
def process_smoke_grid(self, ds, target_cells=25000, param_type='smoke', min_threshold=0.1):
    """Process HRRR-Smoke data as a 2-D grid for contour plotting.

    Returns a dict with 2-D 'lat2d'/'lon2d'/'z2d' arrays (subsampled to
    roughly ``target_cells`` cells, low smoke values masked to NaN) plus
    'units', 'long_name', 'param_type' — or None when unusable.
    """
    if ds is None:
        return None
    try:
        names = list(ds.data_vars)
        if not names:
            return None
        field = ds[names[0]]
        # Resolve coordinate grids, meshgridding 1-D axes when needed.
        if 'latitude' in ds.coords and 'longitude' in ds.coords:
            lat2d = ds.latitude.values
            lon2d = ds.longitude.values
        elif 'lat' in ds.coords and 'lon' in ds.coords:
            lat_axis = ds.lat.values
            lon_axis = ds.lon.values
            if lat_axis.ndim == 1 and lon_axis.ndim == 1:
                lon2d, lat2d = np.meshgrid(lon_axis, lat_axis)
            else:
                lat2d, lon2d = lat_axis, lon_axis
        else:
            return None
        z = np.squeeze(field.values)
        if z.ndim != 2:
            return None
        # Stride-subsample so the plot stays responsive.
        ny, nx = z.shape
        if ny * nx > target_cells:
            stride = max(1, int(np.ceil(np.sqrt(ny * nx / target_cells))))
            z = z[::stride, ::stride]
            lat2d = lat2d[::stride, ::stride]
            lon2d = lon2d[::stride, ::stride]
        # Mask sub-threshold smoke to NaN so contours skip clean air.
        if param_type == 'smoke' and min_threshold is not None:
            z = np.where(z >= min_threshold, z, np.nan)
        return {
            'lat2d': lat2d,
            'lon2d': lon2d,
            'z2d': z,
            'units': field.attrs.get('units', ''),
            'long_name': field.attrs.get('long_name', names[0]),
            'param_type': param_type
        }
    except Exception as e:
        print(f"Smoke grid processing error: {e}")
        return None
class SmokeNarrativeGenerator:
    """Generate narrative smoke forecasts.

    Turns raw HRRR-Smoke time series (PM2.5, visibility, smoke mass
    density) into a markdown-formatted 48-hour forecast: AQI category,
    health recommendations, a period-by-period timeline, and protective
    actions.
    """
    def __init__(self):
        # Stateless generator; nothing to initialize.
        pass
    def get_aqi_category(self, pm25_value):
        """Convert PM2.5 to AQI category.

        Returns a (category, color, description) tuple. Breakpoints
        follow the EPA PM2.5 AQI scale (µg/m³).
        """
        if pm25_value <= 12:
            return "Good", "green", "Air quality is satisfactory"
        elif pm25_value <= 35.4:
            return "Moderate", "yellow", "Acceptable for most people"
        elif pm25_value <= 55.4:
            return "Unhealthy for Sensitive Groups", "orange", "Sensitive people may experience symptoms"
        elif pm25_value <= 150.4:
            return "Unhealthy", "red", "Everyone may experience health effects"
        elif pm25_value <= 250.4:
            return "Very Unhealthy", "purple", "Health warnings for everyone"
        else:
            return "Hazardous", "maroon", "Emergency conditions - avoid all outdoor activity"
    def get_visibility_impact(self, visibility_km):
        """Describe visibility impacts.

        Expects visibility in kilometres and returns a short phrase.
        """
        if visibility_km >= 15:
            return "Excellent visibility"
        elif visibility_km >= 10:
            return "Good visibility with minor haze"
        elif visibility_km >= 5:
            return "Moderate visibility - noticeable smoke haze"
        elif visibility_km >= 2:
            return "Poor visibility - heavy smoke conditions"
        else:
            return "Very poor visibility - dense smoke"
    def get_smoke_density_description(self, density):
        """Describe smoke density levels (density in µg/m³)."""
        if density < 5:
            return "Light smoke"
        elif density < 25:
            return "Moderate smoke"
        elif density < 75:
            return "Heavy smoke"
        else:
            return "Very heavy smoke"
    def assess_health_risk(self, pm25_max, visibility_min, duration_hours):
        """Assess health risks from smoke exposure.

        Combines peak PM2.5, minimum visibility (km), and exposure
        duration (hours) into a list of advisory strings and an overall
        risk level ("low" / "moderate" / "high" / "extreme").
        """
        risks = []
        risk_level = "low"
        # PM2.5 health assessment (ordered from worst to best so the
        # first matching tier wins).
        if pm25_max > 250:
            risks.append("HAZARDOUS air quality - avoid all outdoor activities")
            risk_level = "extreme"
        elif pm25_max > 150:
            risks.append("Very unhealthy air - limit outdoor exposure for everyone")
            risk_level = "high"
        elif pm25_max > 55:
            risks.append("Unhealthy air quality - sensitive individuals should stay indoors")
            risk_level = "moderate"
        elif pm25_max > 35:
            risks.append("Moderate air quality - sensitive groups should limit prolonged outdoor exertion")
            risk_level = "low"
        # Visibility safety; only escalates a "low" PM-based level.
        if visibility_min < 2:
            risks.append("Dangerous driving conditions due to poor visibility")
            if risk_level == "low":
                risk_level = "moderate"
        elif visibility_min < 5:
            risks.append("Reduced visibility may affect driving safety")
        # Duration consideration: prolonged exposure compounds risk.
        if duration_hours > 12 and pm25_max > 35:
            risks.append("Extended exposure period increases health risks")
        return risks, risk_level
    def generate_smoke_forecast(self, forecast_data, location="Selected Location"):
        """Generate comprehensive smoke forecast narrative.

        Args:
            forecast_data: list of dicts, each with 'parameter' and
                'data' (list of {'step': hour, 'value': float} points),
                as produced by HRRRSmokeApp.get_smoke_forecast.
            location: display name inserted into the headline.

        Returns:
            A markdown string, or a short apology when data is missing.
        """
        if not forecast_data:
            return "Smoke forecast data is not available at this time."
        # Extract key metrics from the first 48 forecast hours.
        pm25_values = []
        visibility_values = []
        smoke_density_values = []
        for param_info in forecast_data:
            param_name = param_info['parameter']
            for data_point in param_info['data']:
                hour = data_point['step']
                if hour <= 48:  # Focus on 48-hour forecast
                    if param_name in ['PM25', 'PMTF']:
                        pm25_values.append(data_point['value'])
                    elif param_name == 'VIS':
                        # NOTE(review): thresholds below treat visibility
                        # as km — confirm upstream VIS units (HRRR VIS is
                        # typically metres).
                        visibility_values.append(data_point['value'])
                    elif param_name == 'MASSDEN':
                        smoke_density_values.append(data_point['value'])
        if not any([pm25_values, visibility_values, smoke_density_values]):
            return "Insufficient smoke data to generate forecast."
        # Calculate key statistics (defaults when a series is absent).
        pm25_max = max(pm25_values) if pm25_values else 0
        pm25_avg = sum(pm25_values) / len(pm25_values) if pm25_values else 0
        visibility_min = min(visibility_values) if visibility_values else 20
        smoke_max = max(smoke_density_values) if smoke_density_values else 0
        # Generate narrative.
        forecast_text = f"# 🌬️ **48-Hour Smoke Forecast for {location}**\n\n"
        # Overall air quality assessment.
        aqi_category, aqi_color, aqi_desc = self.get_aqi_category(pm25_max)
        forecast_text += f"## 🚨 **Air Quality Alert: {aqi_category}**\n\n"
        forecast_text += f"**Peak PM2.5:** {pm25_max:.1f} µg/m³ - {aqi_desc}\n\n"
        # Health risk assessment.
        health_risks, risk_level = self.assess_health_risk(pm25_max, visibility_min, 48)
        if health_risks:
            risk_emoji = {"extreme": "🚨", "high": "⚠️", "moderate": "⚡", "low": "ℹ️"}.get(risk_level, "ℹ️")
            forecast_text += f"{risk_emoji} **Health Recommendations:**\n\n"
            for risk in health_risks:
                forecast_text += f"• {risk}\n"
            forecast_text += "\n"
        # Smoke conditions overview.
        forecast_text += "## 🌫️ **Smoke Conditions Summary**\n\n"
        visibility_desc = self.get_visibility_impact(visibility_min)
        smoke_desc = self.get_smoke_density_description(smoke_max)
        forecast_text += f"**Smoke Intensity:** {smoke_desc} (peak: {smoke_max:.1f} µg/m³)\n"
        forecast_text += f"**Visibility Impact:** {visibility_desc} (minimum: {visibility_min:.1f} km)\n"
        forecast_text += f"**Average PM2.5:** {pm25_avg:.1f} µg/m³ over forecast period\n\n"
        # Detailed timeline (simplified for key periods).
        forecast_text += "## ⏰ **48-Hour Timeline**\n\n"
        # Group data by time periods: (start_hour, end_hour, label).
        periods = [
            (0, 6, "Next 6 hours"),
            (6, 12, "6-12 hours"),
            (12, 24, "12-24 hours"),
            (24, 48, "24-48 hours")
        ]
        for start_hour, end_hour, period_name in periods:
            period_pm25 = []
            period_visibility = []
            for param_info in forecast_data:
                param_name = param_info['parameter']
                for data_point in param_info['data']:
                    hour = data_point['step']
                    # Half-open interval [start, end) so hours aren't
                    # double-counted across periods.
                    if start_hour <= hour < end_hour:
                        if param_name in ['PM25', 'PMTF']:
                            period_pm25.append(data_point['value'])
                        elif param_name == 'VIS':
                            period_visibility.append(data_point['value'])
            if period_pm25 or period_visibility:
                avg_pm25 = sum(period_pm25) / len(period_pm25) if period_pm25 else 0
                avg_vis = sum(period_visibility) / len(period_visibility) if period_visibility else 20
                period_aqi, _, period_desc = self.get_aqi_category(avg_pm25)
                forecast_text += f"**{period_name}:** {period_aqi} air quality "
                forecast_text += f"(PM2.5: {avg_pm25:.1f} µg/m³, Visibility: {avg_vis:.1f} km)\n"
        forecast_text += "\n"
        # Protective actions, tiered by peak PM2.5.
        forecast_text += "## 🛡️ **Protective Actions**\n\n"
        if pm25_max > 150:
            forecast_text += "• **Stay indoors** with windows and doors closed\n"
            forecast_text += "• **Use air purifiers** or create a clean air room\n"
            forecast_text += "• **Avoid all outdoor activities** including exercise\n"
            forecast_text += "• **Wear N95 masks** if you must go outside\n"
        elif pm25_max > 55:
            forecast_text += "• **Limit outdoor activities**, especially strenuous exercise\n"
            forecast_text += "• **Sensitive individuals** should stay indoors\n"
            forecast_text += "• **Consider wearing masks** when outdoors\n"
            forecast_text += "• **Keep windows closed** and use air conditioning on recirculate\n"
        elif pm25_max > 35:
            forecast_text += "• **Sensitive groups** should limit prolonged outdoor exertion\n"
            forecast_text += "• **Monitor air quality** throughout the day\n"
            forecast_text += "• **Consider indoor activities** during peak smoke hours\n"
        else:
            forecast_text += "• **Normal outdoor activities** are generally acceptable\n"
            forecast_text += "• **Monitor conditions** as smoke can change rapidly\n"
        if visibility_min < 5:
            forecast_text += "• **Drive with caution** - use headlights and reduce speed\n"
            forecast_text += "• **Avoid unnecessary travel** during poorest visibility\n"
        forecast_text += "\n"
        # Data source and disclaimer.
        forecast_text += "---\n\n"
        forecast_text += "*This forecast is based on NOAA HRRR-Smoke model data. "
        forecast_text += "Smoke conditions can change rapidly due to fire behavior and weather. "
        forecast_text += "Check for updates frequently and follow local emergency guidance.*\n\n"
        forecast_text += f"**Generated:** {datetime.utcnow().strftime('%Y-%m-%d %H:%M UTC')}"
        return forecast_text
class HRRRSmokeApp:
def __init__(self):
    """Wire together the app's collaborators: HRRR data access,
    narrative text generation, HMS-style polygon extraction, and
    folium map rendering."""
    self.smoke_manager = HRRRSmokeDataManager()
    self.narrative_generator = SmokeNarrativeGenerator()
    self.polygon_generator = HMSSmokePolygonGenerator()
    self.folium_renderer = FoliumSmokeRenderer()  # Initialize folium renderer
def create_smoke_map(self, param_type, forecast_hour, detail_level=3, min_threshold=0.1, show_polygons=False, demo_mode=False, map_type='plotly'):
    """Create smoke forecast map.

    Args:
        param_type: HRRR-Smoke parameter key ('MASSDEN', 'PM25', 'VIS',
            'COMBINED', 'HMS', ...).
        forecast_hour: forecast lead time in hours.
        detail_level: 1 (fast) .. 5 (max detail); controls grid cell budget.
        min_threshold: mask values below this out of the plot.
        show_polygons: overlay HMS-style smoke polygons when available.
        demo_mode: force synthetic data instead of fetching real HRRR.
        map_type: 'plotly' (default) or 'folium'. BUG FIX: this name was
            previously read (folium branches) but never defined, raising a
            NameError that the surrounding try blocks silently swallowed —
            degrading contour output to the scatter/error fallback. It is
            now an explicit keyword parameter, appended LAST so existing
            positional callers (which pass demo_mode positionally) keep
            their meaning.

    Returns:
        A plotly Figure, or a folium.Map when map_type == 'folium' and
        folium rendering succeeds.
    """
    try:
        # Get parameter info.
        param_info = self.smoke_manager.smoke_parameters.get(param_type, {})
        param_name = param_info.get('name', param_type)
        colorscale = param_info.get('colorscale', 'OrRd')
        print(f"Fetching HRRR-Smoke {param_type} for +{forecast_hour}h")
        # Handle Combined Smoke mode.
        if param_type == 'COMBINED':
            print("Using NOAA Combined Smoke methodology")
            if demo_mode:
                print("Demo mode: generating multiple synthetic parameters")
                # Generate multiple synthetic parameters.
                datasets = self.smoke_manager.generate_multiple_demo_parameters(
                    ['MASSDEN', 'PM25', 'VIS', 'COLMD', 'FRPAVG'], forecast_hour
                )
                combined_smoke = self.polygon_generator.combine_smoke_parameters(datasets)
                # Create synthetic xarray dataset.
                lats = np.linspace(25, 50, 50)
                lons = np.linspace(-125, -70, 60)
                ds = xr.Dataset({
                    'combined_smoke': (['lat', 'lon'], combined_smoke)
                }, coords={'lat': lats, 'lon': lons})
            else:
                # Fetch multiple real parameters.
                datasets = self.smoke_manager.fetch_multiple_smoke_parameters(
                    ['MASSDEN', 'PM25', 'VIS', 'COLMD', 'FRPAVG'], 'surface', forecast_hour, fast_mode=True
                )
                if datasets:
                    combined_smoke = self.polygon_generator.combine_smoke_parameters(datasets)
                    # Create combined xarray dataset with proper HRRR grid coordinates.
                    # NOTE(review): linspace axes approximate the HRRR
                    # Lambert-conformal grid; edge positions may be off.
                    print(f"Main forecast: Creating coordinates for HRRR grid: {combined_smoke.shape}")
                    lats = np.linspace(20.192, 52.863, combined_smoke.shape[0])  # HRRR CONUS lat range
                    lons = np.linspace(-134.096, -60.917, combined_smoke.shape[1])  # HRRR CONUS lon range
                    ds = xr.Dataset({
                        'combined_smoke': (['lat', 'lon'], combined_smoke)
                    }, coords={'lat': lats, 'lon': lons})
                else:
                    print("Failed to fetch multiple parameters, falling back to demo")
                    datasets = self.smoke_manager.generate_multiple_demo_parameters(
                        ['MASSDEN', 'PM25', 'VIS', 'COLMD', 'FRPAVG'], forecast_hour
                    )
                    combined_smoke = self.polygon_generator.combine_smoke_parameters(datasets)
                    lats = np.linspace(25, 50, 50)
                    lons = np.linspace(-125, -70, 60)
                    ds = xr.Dataset({
                        'combined_smoke': (['lat', 'lon'], combined_smoke)
                    }, coords={'lat': lats, 'lon': lons})
            current_time = datetime.utcnow()
            forecast_time = current_time + timedelta(hours=forecast_hour)
            info = {
                'model_run': current_time.strftime('%Y-%m-%d %H:00 UTC'),
                'valid_time': forecast_time.strftime('%Y-%m-%d %H:00 UTC'),
                'product': 'combined'
            }
        else:
            # Single parameter mode.
            if demo_mode:
                print("Demo mode enabled - using synthetic data")
                ds = None  # Force synthetic data
                info = None
            else:
                ds, info = self.smoke_manager.fetch_hrrr_smoke_data(param_type, 'surface', forecast_hour, return_info=True, fast_mode=True)
            # If real data failed, use demo data.
            if ds is None:
                print("Real HRRR data unavailable, using demo smoke data")
                ds = self.smoke_manager.generate_demo_smoke_data(param_type, forecast_hour)
                current_time = datetime.utcnow()
                forecast_time = current_time + timedelta(hours=forecast_hour)
                info = {
                    'model_run': current_time.strftime('%Y-%m-%d %H:00 UTC'),
                    'valid_time': forecast_time.strftime('%Y-%m-%d %H:00 UTC'),
                    'product': 'demo'
                }
        fig = go.Figure()
        if ds is not None:
            # Process as grid for contour plotting; detail level -> cell budget.
            detail_to_cells = {1: 15000, 2: 25000, 3: 35000, 4: 50000, 5: 75000}
            target_cells = detail_to_cells.get(int(detail_level), 35000)
            grid_data = self.smoke_manager.process_smoke_grid(
                ds, target_cells=target_cells, param_type='smoke', min_threshold=min_threshold
            )
            if grid_data is not None:
                lat2d = grid_data['lat2d']
                lon2d = grid_data['lon2d']
                z2d = grid_data['z2d']
                units = grid_data['units']
                print(f"Plotting smoke grid: {z2d.shape[0]}x{z2d.shape[1]} cells")
                # Generate HMS-style polygons if requested (optimized for speed).
                polygons = []
                if show_polygons and param_type in ['MASSDEN', 'PM25', 'PMTF', 'HMS'] and HMS_LIBS_AVAILABLE:
                    try:
                        # Check if this is HMS categorical data.
                        is_hms = (param_type == 'HMS')
                        polygons = self.polygon_generator.extract_smoke_polygons(
                            lat2d, lon2d, z2d,
                            min_area=0.02,  # Larger min area for speed
                            is_hms_categorical=is_hms
                        )
                        print(f"Generated {len(polygons)} HMS-style smoke polygons")
                    except Exception as e:
                        print(f"Polygon generation error: {e}")
                        polygons = []
                # Add HMS-style polygon overlays if requested.
                if polygons:
                    for i, poly_data in enumerate(polygons):
                        try:
                            # Get polygon exterior ring coordinates.
                            coords = list(poly_data['geometry'].exterior.coords)
                            poly_lons = [c[0] for c in coords]
                            poly_lats = [c[1] for c in coords]
                            # Add polygon as filled scatter trace.
                            fig.add_trace(go.Scatter(
                                x=poly_lons,
                                y=poly_lats,
                                mode='lines',
                                line=dict(
                                    color=poly_data['color'],
                                    width=3
                                ),
                                fill='toself',
                                fillcolor=f"rgba({int(poly_data['color'][1:3], 16)}, {int(poly_data['color'][3:5], 16)}, {int(poly_data['color'][5:7], 16)}, 0.3)",
                                name=f"{poly_data['description']} ({poly_data['density_value']:.1f} µg/m³)",
                                hovertemplate=f"<b>{poly_data['description']}</b><br>" +
                                              f"Density: {poly_data['density_value']:.1f} µg/m³<br>" +
                                              f"Area: {poly_data['area_deg2']:.4f} deg²<extra></extra>"
                            ))
                        except Exception as e:
                            print(f"Error adding polygon {i}: {e}")
                            continue
                # Try contour plotting.
                try:
                    opacity = 0.6 if polygons else 1.0  # Reduce opacity if polygons are shown
                    # Use 2D z with 1D axes for rectilinear grids (avoids jagged artifacts).
                    if lat2d.ndim == 2 and lon2d.ndim == 2:
                        x_axis = lon2d[0, :]
                        y_axis = lat2d[:, 0]
                    else:
                        x_axis = lon2d
                        y_axis = lat2d
                    fig.add_trace(go.Contour(
                        x=x_axis,
                        y=y_axis,
                        z=z2d,
                        colorscale=colorscale,
                        contours=dict(coloring='heatmap'),
                        showscale=True,
                        opacity=opacity,
                        colorbar=dict(
                            title=f"{param_name} ({units})",
                            x=1.02,
                            len=0.8
                        ),
                        hovertemplate=f'{param_name}: %{{z:.1f}} {units}<extra></extra>',
                        name=f"HRRR-Smoke {param_name}"
                    ))
                    # Update layout for geographic projection.
                    data_source = "Demo Data" if info.get('product') == 'demo' else "HRRR Model"
                    fig.update_layout(
                        xaxis_title="Longitude",
                        yaxis_title="Latitude",
                        title=f"{data_source}: {param_name} Forecast (+{forecast_hour}h)<br>{info['model_run']}{info['valid_time']}",
                        height=600
                    )
                    # Return appropriate map type.
                    if map_type == 'folium' and polygons:
                        # Create folium map with grayscale smoke polygons.
                        folium_map = self.folium_renderer.create_folium_map(polygons)
                        if folium_map:
                            return folium_map
                        else:
                            # Fallback to plotly if folium fails.
                            return fig
                    else:
                        return fig
                except Exception as e:
                    print(f"Contour plot failed, trying scatter: {e}")
                    # Fallback to scatter plot.
                    processed = self.smoke_manager.process_smoke_data(ds, max_points=2000, param_type='smoke')
                    if processed is not None:
                        fig.add_trace(go.Scattermapbox(
                            lat=processed['lats'],
                            lon=processed['lons'],
                            mode='markers',
                            marker=dict(
                                size=6,
                                color=processed['values'],
                                colorscale=colorscale,
                                showscale=True,
                                colorbar=dict(
                                    title=f"{param_name} ({units})",
                                    x=1.02,
                                    len=0.8
                                ),
                                opacity=0.8
                            ),
                            text=[f"{v:.1f} {units}" for v in processed['values']],
                            hovertemplate='<b>%{text}</b><extra></extra>',
                            name=f"HRRR-Smoke {param_name}"
                        ))
                        data_source = "Demo Data" if info.get('product') == 'demo' else "HRRR Model"
                        fig.update_layout(
                            mapbox=dict(
                                style="open-street-map",
                                zoom=5,
                                center=dict(lat=39.5, lon=-98.5)
                            ),
                            height=600,
                            title=f"{data_source}: {param_name} Forecast (+{forecast_hour}h)<br>{info['model_run']}{info['valid_time']}",
                            margin=dict(l=0, r=100, t=80, b=0)
                        )
                    else:
                        fig.add_annotation(
                            text=f"No {param_name} data above threshold ({min_threshold})",
                            x=0.5, y=0.5,
                            xref="paper", yref="paper",
                            showarrow=False,
                            font=dict(size=14)
                        )
        else:
            fig.add_annotation(
                text="HRRR-Smoke data temporarily unavailable<br>Try different parameters or forecast hours",
                x=0.5, y=0.5,
                xref="paper", yref="paper",
                showarrow=False,
                font=dict(size=16)
            )
        # Return appropriate map type.
        if map_type == 'folium':
            # Return empty folium map for errors in folium mode.
            return folium.Map(location=[39.5, -98.5], zoom_start=5)
        else:
            return fig
    except Exception as e:
        print(f"Smoke map creation error: {e}")
        # Return error figure.
        fig = go.Figure()
        fig.add_annotation(
            text=f"Error: {str(e)[:100]}",
            x=0.5, y=0.5,
            xref="paper", yref="paper",
            showarrow=False
        )
        fig.update_layout(height=400, title="Error Loading Smoke Data")
        return fig
def get_smoke_forecast(self, latitude, longitude):
    """Get comprehensive smoke forecast for a location.

    Fetches each key smoke parameter at a fixed set of forecast steps
    and reduces every field to a single representative value (the domain
    mean — a rough stand-in for true point interpolation). Returns a
    list of per-parameter dicts with name, units, and a time series.
    """
    try:
        forecast_data = []
        forecast_steps = [0, 3, 6, 9, 12, 18, 24, 30, 36, 42, 48]
        # Key smoke parameters for the forecast.
        key_params = ['MASSDEN', 'PM25', 'VIS', 'COLMD']
        for param in key_params:
            if param not in self.smoke_manager.smoke_parameters:
                continue
            series = []
            for step in forecast_steps:
                try:
                    ds = self.smoke_manager.fetch_hrrr_smoke_data(param, 'surface', step)
                    if ds is None:
                        continue
                    names = list(ds.data_vars)
                    if not names:
                        continue
                    # Domain mean as a point-value approximation.
                    mean_value = float(np.nanmean(ds[names[0]].values))
                    if np.isnan(mean_value):
                        continue
                    series.append({
                        'step': step,
                        'value': mean_value,
                        'datetime': datetime.utcnow() + timedelta(hours=step)
                    })
                except Exception as e:
                    print(f"Error getting {param} at step {step}: {e}")
                    continue
            if series:
                meta = self.smoke_manager.smoke_parameters[param]
                forecast_data.append({
                    'parameter': param,
                    'name': meta['name'],
                    'units': meta['units'],
                    'data': series
                })
        return forecast_data
    except Exception as e:
        print(f"Forecast generation error: {e}")
        return []
def generate_kmz_download(self, param_type, forecast_hour):
    """Generate KMZ file for download - optimized for speed.

    Fetches (or synthesizes) the requested field, extracts HMS-style
    polygons on a coarsened grid, and writes them to a KMZ in the
    manager's temp directory. Returns the file path, or None on failure.
    """
    try:
        # Get smoke data; fall back to the synthetic demo field.
        ds, info = self.smoke_manager.fetch_hrrr_smoke_data(param_type, 'surface', forecast_hour, return_info=True)
        if ds is None:
            ds = self.smoke_manager.generate_demo_smoke_data(param_type, forecast_hour)
            now = datetime.utcnow()
            info = {
                'model_run': now.strftime('%Y-%m-%d %H:00 UTC'),
                'valid_time': (now + timedelta(hours=forecast_hour)).strftime('%Y-%m-%d %H:00 UTC'),
                'product': 'demo'
            }
        polygon_params = ['MASSDEN', 'PM25', 'PMTF', 'COMBINED', 'HMS']
        if ds is not None and param_type in polygon_params and HMS_LIBS_AVAILABLE:
            # Coarse grid + higher threshold keeps polygon extraction fast.
            grid_data = self.smoke_manager.process_smoke_grid(
                ds, target_cells=8000, param_type='smoke', min_threshold=1.0
            )
            if grid_data is not None:
                # Larger minimum area for speed; HMS data is categorical.
                polygons = self.polygon_generator.extract_smoke_polygons(
                    grid_data['lat2d'], grid_data['lon2d'], grid_data['z2d'],
                    min_area=0.02,
                    is_hms_categorical=(param_type == 'HMS')
                )
                if polygons:
                    # Write the KMZ next to other temp artifacts.
                    output_path = os.path.join(self.smoke_manager.temp_dir, f'hrrr_smoke_{param_type}_f{forecast_hour:02d}.kmz')
                    kmz_path = self.polygon_generator.create_kmz_file(
                        polygons, info['valid_time'], info['model_run'], output_path
                    )
                    if kmz_path and os.path.exists(kmz_path):
                        return kmz_path
        return None
    except Exception as e:
        print(f"KMZ generation error: {e}")
        return None
def quick_kmz_generation(self, forecast_hour, parameter="MASSDEN", demo_mode=False):
    """Ultra-fast KMZ generation for ALL AMERICA smoke plumes.

    Builds a smoke field for the requested parameter ('HMS' = visibility-
    based HMS-style detection, 'COMBINED' = NOAA multi-parameter blend,
    otherwise a single HRRR field), extracts smoke polygons on a coarse
    grid, and writes them to a KMZ.

    Returns:
        (status_markdown, kmz_path) on success; (message, None) on failure.
    """
    try:
        print(f"Quick KMZ generation for {parameter} at +{forecast_hour}h - FULL AMERICA DOMAIN")
        # Get smoke data with fast mode - handle COMBINED and HMS parameters.
        if parameter == 'HMS':
            print("Quick KMZ: Using HMS-style smoke detection methodology")
            if demo_mode:
                print("Quick KMZ Demo mode: generating HMS-style synthetic data")
                datasets = self.smoke_manager.generate_multiple_demo_parameters(
                    ['VIS', 'MASSDEN', 'PM25'], forecast_hour
                )
                hms_smoke = self.polygon_generator.hms_style_smoke_detection(datasets)
                # Create synthetic xarray dataset covering ALL AMERICA.
                lats = np.linspace(20.192, 52.863, 50)  # Full HRRR CONUS domain
                lons = np.linspace(-134.096, -60.917, 60)  # Pacific to Atlantic
                ds = xr.Dataset({
                    'hms_smoke': (['lat', 'lon'], hms_smoke)
                }, coords={'lat': lats, 'lon': lons})
            else:
                # Fetch HMS-relevant parameters for real HMS-style detection.
                print("Quick KMZ: Fetching HRRR parameters for HMS-style smoke detection")
                datasets = self.smoke_manager.fetch_multiple_smoke_parameters(
                    ['VIS', 'MASSDEN', 'PM25'], 'surface', forecast_hour, fast_mode=True
                )
                if datasets:
                    print(f"Quick KMZ: Successfully fetched {len(datasets)} parameters: {list(datasets.keys())}")
                    hms_smoke = self.polygon_generator.hms_style_smoke_detection(datasets)
                    if hms_smoke is not None:
                        # Create coordinates for FULL AMERICA HRRR CONUS domain.
                        # NOTE(review): linspace axes approximate the HRRR
                        # Lambert-conformal grid; edge positions may be off.
                        print(f"Creating coordinates for HMS-style HRRR grid: {hms_smoke.shape}")
                        lats = np.linspace(20.192, 52.863, hms_smoke.shape[0])  # From Mexico border to Canada
                        lons = np.linspace(-134.096, -60.917, hms_smoke.shape[1])  # From Pacific to Atlantic
                        ds = xr.Dataset({
                            'hms_smoke': (['lat', 'lon'], hms_smoke)
                        }, coords={'lat': lats, 'lon': lons})
                        print(f"Quick KMZ: Created HMS-style dataset with shape {hms_smoke.shape}")
                    else:
                        # Detection produced nothing usable; fall back to demo.
                        print("Quick KMZ: HMS-style detection failed, using demo data")
                        datasets = self.smoke_manager.generate_multiple_demo_parameters(
                            ['VIS', 'MASSDEN', 'PM25'], forecast_hour
                        )
                        hms_smoke = self.polygon_generator.hms_style_smoke_detection(datasets)
                        lats = np.linspace(20.192, 52.863, 50)  # Full HRRR CONUS domain
                        lons = np.linspace(-134.096, -60.917, 60)  # Pacific to Atlantic
                        ds = xr.Dataset({
                            'hms_smoke': (['lat', 'lon'], hms_smoke)
                        }, coords={'lat': lats, 'lon': lons})
                        print("Quick KMZ: Using demo data for HMS mode")
                else:
                    # No real fields at all; fall back to demo.
                    print("Quick KMZ: No real parameters fetched, using demo data")
                    datasets = self.smoke_manager.generate_multiple_demo_parameters(
                        ['VIS', 'MASSDEN', 'PM25'], forecast_hour
                    )
                    hms_smoke = self.polygon_generator.hms_style_smoke_detection(datasets)
                    lats = np.linspace(20.192, 52.863, 50)  # Full HRRR CONUS domain
                    lons = np.linspace(-134.096, -60.917, 60)  # Pacific to Atlantic
                    ds = xr.Dataset({
                        'hms_smoke': (['lat', 'lon'], hms_smoke)
                    }, coords={'lat': lats, 'lon': lons})
                    print("Quick KMZ: Using demo data for HMS mode")
            current_time = datetime.utcnow()
            forecast_time = current_time + timedelta(hours=forecast_hour)
            info = {
                'model_run': current_time.strftime('%Y-%m-%d %H:00 UTC'),
                'valid_time': forecast_time.strftime('%Y-%m-%d %H:00 UTC'),
                'product': 'hms'
            }
        elif parameter == 'COMBINED':
            print("Quick KMZ: Using NOAA Combined Smoke methodology")
            if demo_mode:
                print("Quick KMZ Demo mode: generating multiple synthetic parameters")
                datasets = self.smoke_manager.generate_multiple_demo_parameters(
                    ['MASSDEN', 'PM25', 'VIS', 'COLMD', 'FRPAVG'], forecast_hour
                )
                combined_smoke = self.polygon_generator.combine_smoke_parameters(datasets)
                # Create synthetic xarray dataset covering ALL AMERICA.
                lats = np.linspace(20.192, 52.863, 50)  # Full HRRR CONUS domain
                lons = np.linspace(-134.096, -60.917, 60)  # Pacific to Atlantic
                ds = xr.Dataset({
                    'combined_smoke': (['lat', 'lon'], combined_smoke)
                }, coords={'lat': lats, 'lon': lons})
            else:
                # Fetch multiple real parameters for Quick KMZ.
                print("Quick KMZ: Fetching multiple HRRR parameters for COMBINED mode")
                datasets = self.smoke_manager.fetch_multiple_smoke_parameters(
                    ['VIS', 'COLMD', 'MASSDEN', 'PM25', 'FRPAVG'], 'surface', forecast_hour, fast_mode=True
                )
                if datasets:
                    print(f"Quick KMZ: Successfully fetched {len(datasets)} parameters: {list(datasets.keys())}")
                    combined_smoke = self.polygon_generator.combine_smoke_parameters(datasets)
                    # Create coordinates for FULL AMERICA HRRR CONUS domain.
                    print(f"Creating coordinates for FULL AMERICA HRRR grid: {combined_smoke.shape}")
                    # HRRR CONUS covers all of continental United States + southern Canada + northern Mexico.
                    lats = np.linspace(20.192, 52.863, combined_smoke.shape[0])  # From Mexico border to Canada
                    lons = np.linspace(-134.096, -60.917, combined_smoke.shape[1])  # From Pacific to Atlantic
                    ds = xr.Dataset({
                        'combined_smoke': (['lat', 'lon'], combined_smoke)
                    }, coords={'lat': lats, 'lon': lons})
                    print(f"Quick KMZ: Created combined dataset with shape {combined_smoke.shape}")
                else:
                    # No real fields; fall back to demo.
                    print("Quick KMZ: No real parameters fetched, using demo data")
                    datasets = self.smoke_manager.generate_multiple_demo_parameters(
                        ['MASSDEN', 'PM25', 'VIS', 'COLMD', 'FRPAVG'], forecast_hour
                    )
                    combined_smoke = self.polygon_generator.combine_smoke_parameters(datasets)
                    lats = np.linspace(20.192, 52.863, 50)  # Full HRRR CONUS domain
                    lons = np.linspace(-134.096, -60.917, 60)  # Pacific to Atlantic
                    ds = xr.Dataset({
                        'combined_smoke': (['lat', 'lon'], combined_smoke)
                    }, coords={'lat': lats, 'lon': lons})
                    print("Quick KMZ: Using demo data for COMBINED mode")
            current_time = datetime.utcnow()
            forecast_time = current_time + timedelta(hours=forecast_hour)
            info = {
                'model_run': current_time.strftime('%Y-%m-%d %H:00 UTC'),
                'valid_time': forecast_time.strftime('%Y-%m-%d %H:00 UTC'),
                'product': 'combined'
            }
        else:
            # Single parameter mode (existing logic) - Skip HMS since it's already handled above.
            if demo_mode:
                print("Quick KMZ Demo mode enabled - using synthetic data")
                ds = None
                info = None
            else:
                ds, info = self.smoke_manager.fetch_hrrr_smoke_data(parameter, 'surface', forecast_hour, return_info=True, fast_mode=True)
            if ds is None:
                ds = self.smoke_manager.generate_demo_smoke_data(parameter, forecast_hour)
                current_time = datetime.utcnow()
                forecast_time = current_time + timedelta(hours=forecast_hour)
                info = {
                    'model_run': current_time.strftime('%Y-%m-%d %H:00 UTC'),
                    'valid_time': forecast_time.strftime('%Y-%m-%d %H:00 UTC'),
                    'product': 'demo'
                }
        if ds is not None and parameter in ['MASSDEN', 'PM25', 'PMTF', 'COMBINED', 'HMS'] and HMS_LIBS_AVAILABLE:
            # Ultra-fast processing: optimized for ALL AMERICA coverage.
            grid_data = self.smoke_manager.process_smoke_grid(
                ds, target_cells=6000, param_type='smoke', min_threshold=0.0  # Zero threshold + more cells for full coverage
            )
            if grid_data is not None:
                # Quick polygon generation optimized for continental-scale smoke detection.
                # Check if this is HMS categorical data.
                is_hms = (parameter == 'HMS')
                polygons = self.polygon_generator.extract_smoke_polygons(
                    grid_data['lat2d'], grid_data['lon2d'], grid_data['z2d'],
                    min_area=0.005,  # Even smaller area for continental coverage
                    is_hms_categorical=is_hms
                )
                if polygons:
                    # Create KMZ file in the manager's temp directory.
                    output_path = os.path.join(self.smoke_manager.temp_dir, f'quick_smoke_{parameter}_f{forecast_hour:02d}.kmz')
                    kmz_path = self.polygon_generator.create_kmz_file(
                        polygons, info['valid_time'], info['model_run'], output_path
                    )
                    if kmz_path and os.path.exists(kmz_path):
                        if parameter == 'HMS':
                            status = f"🛰️ **HMS-Style Smoke Forecast Generated!**\n\n**Polygons:** {len(polygons)}\n**Method:** Visibility-based detection\n**Forecast:** +{forecast_hour}h\n**Coverage:** Full CONUS (20°-53°N, 134°-61°W)\n\nMimics NOAA HMS analyst procedures using HRRR forecast data."
                        else:
                            status = f"⚡ **ALL AMERICA Smoke KMZ Generated!**\n\n**Polygons:** {len(polygons)}\n**Parameter:** {parameter}\n**Forecast:** +{forecast_hour}h\n**Coverage:** Full CONUS (20°-53°N, 134°-61°W)\n\nOptimized for continental-scale smoke detection."
                        return status, kmz_path
        return "KMZ generation failed - no suitable data found", None
    except Exception as e:
        print(f"Quick KMZ generation error: {e}")
        return f"Quick KMZ generation error: {e}", None
def update_smoke_display(self, location, forecast_hour, parameter, detail_level, min_threshold, show_polygons=False, map_type='plotly', demo_mode=False):
    """Build all outputs for the main smoke-forecast view.

    Parameters
    ----------
    location : str
        Free-form location string; a parseable "lat, lon" pair enables the
        narrative forecast.
    forecast_hour : int
        Hours ahead to forecast.
    parameter : str
        Smoke parameter key (e.g. 'HMS', 'MASSDEN', 'PM25').
    detail_level : int
        1 (fast) to 5 (max detail) rendering resolution.
    min_threshold : float
        Values below this threshold are filtered from the display.
    show_polygons : bool
        When True, also generate HMS-style polygons, a KMZ file, and a
        Folium/leaflet map.
    map_type : str
        Requested map style ('plotly' or 'folium'). NOTE(review): currently
        unused in this method — both map outputs are always produced; kept
        for interface compatibility with the Gradio event wiring.
    demo_mode : bool
        When True, prefer fast synthetic data over live HRRR downloads.

    Returns
    -------
    tuple
        (status_markdown, plotly_figure, narrative_markdown,
         kmz_path_or_None, folium_html)
    """
    try:
        gc.collect()
        print(f"\n=== SMOKE UPDATE: {location}, +{forecast_hour}h, {parameter} ===")
        # Interactive Plotly map is always rendered.
        smoke_map = self.create_smoke_map(parameter, forecast_hour, detail_level, min_threshold, show_polygons, demo_mode)
        # Status panel header.
        current_time = datetime.utcnow()
        forecast_time = current_time + timedelta(hours=forecast_hour)
        param_info = self.smoke_manager.smoke_parameters.get(parameter, {})
        param_name = param_info.get('name', parameter)
        status = f"""
## 🌬️ HRRR-Smoke Forecast
**Location:** {location}
**Current:** {current_time.strftime('%H:%M UTC')}
**Forecast:** {forecast_time.strftime('%H:%M UTC')} (+{forecast_hour}h)
**Parameter:** {param_name}
**Detail Level:** {detail_level} (1=Fast, 5=Max Detail)
**Min Threshold:** {min_threshold}
**Data Source:** {"NOAA HRRR-Smoke Model" if HERBIE_AVAILABLE else "HRRR-Smoke Unavailable"}
**About HRRR-Smoke:** Real-time smoke forecasting using the NOAA High-Resolution Rapid Refresh
model with smoke tracking. Provides detailed predictions of smoke transport, dispersion, and
air quality impacts from wildfires and prescribed burns.
"""
        # Narrative forecast is only available for a parseable "lat, lon"
        # location; every other case falls back to the same hint message,
        # so set the fallback once instead of in three branches.
        narrative = "Enter coordinates as 'latitude, longitude' for detailed forecast"
        try:
            if ',' in location:
                parts = location.split(',')
                if len(parts) >= 2:
                    try:
                        lat = float(parts[0].strip())
                        lon = float(parts[1].strip())
                        forecast_data = self.get_smoke_forecast(lat, lon)
                        narrative = self.narrative_generator.generate_smoke_forecast(forecast_data, location)
                    except Exception:
                        # BUGFIX: was a bare `except:`, which also swallowed
                        # SystemExit/KeyboardInterrupt. Narrowed to Exception;
                        # the deliberate best-effort fallback is preserved.
                        narrative = "Enter coordinates as 'latitude, longitude' for detailed forecast"
        except Exception as e:
            narrative = f"Narrative forecast error: {e}"
        # Generate KMZ download and Folium map only when polygons are
        # requested, the parameter supports them, and the geo libs loaded.
        kmz_path = None
        folium_html = "<p>No map data available</p>"
        if show_polygons and parameter in ['MASSDEN', 'PM25', 'PMTF', 'HMS'] and HMS_LIBS_AVAILABLE:
            kmz_path = self.generate_kmz_download(parameter, forecast_hour)
            try:
                # Fetch the same data used for the KMZ; fall back to demo data.
                ds, _ = self.smoke_manager.fetch_hrrr_smoke_data(parameter, 'surface', forecast_hour, return_info=True, fast_mode=demo_mode)
                if ds is None:
                    ds = self.smoke_manager.generate_demo_smoke_data(parameter, forecast_hour)
                if ds is not None:
                    grid_data = self.smoke_manager.process_smoke_grid(
                        ds, target_cells=8000, param_type='smoke', min_threshold=min_threshold
                    )
                    if grid_data is not None:
                        # HMS data is categorical rather than continuous.
                        is_hms = (parameter == 'HMS')
                        polygons = self.polygon_generator.extract_smoke_polygons(
                            grid_data['lat2d'], grid_data['lon2d'], grid_data['z2d'],
                            min_area=0.02,
                            is_hms_categorical=is_hms
                        )
                        if polygons:
                            # Center the leaflet map on the data extent.
                            center_lat = np.mean(grid_data['lat2d'])
                            center_lon = np.mean(grid_data['lon2d'])
                            folium_map = self.folium_renderer.create_folium_map(
                                polygons, center_lat, center_lon, zoom_start=6
                            )
                            if folium_map:
                                folium_html = folium_map._repr_html_()
                            else:
                                folium_html = "<p>Folium map generation failed</p>"
                        else:
                            folium_html = "<p>No smoke polygons detected for current threshold</p>"
                    else:
                        folium_html = "<p>Unable to process smoke data</p>"
                else:
                    folium_html = "<p>No smoke data available</p>"
            except Exception as e:
                print(f"Folium map generation error: {e}")
                folium_html = f"<p>Folium map error: {e}</p>"
        return status, smoke_map, narrative, kmz_path, folium_html
    except Exception as e:
        # Top-level boundary: report the failure in every output slot so the
        # UI stays responsive.
        print(f"Smoke update error: {e}")
        gc.collect()
        error_fig = go.Figure()
        error_fig.add_annotation(text=f"Update failed: {str(e)}", x=0.5, y=0.5, xref="paper", yref="paper", showarrow=False)
        error_fig.update_layout(height=300)
        return f"## Error\n{str(e)}", error_fig, "Narrative forecast unavailable due to error", None, "<p>Error loading map</p>"
# Initialize the application
smoke_app = HRRRSmokeApp()
# Gradio interface
# Layout: one Blocks page with a banner, a two-column row (controls | maps),
# a KMZ download row, and a narrative-forecast row. The component variables
# bound here (location, forecast_hour, ..., narrative_forecast) are wired to
# callbacks further down the file.
with gr.Blocks(title="HRRR Smoke Forecast", theme=gr.themes.Soft()) as app:
    # Page banner (static HTML).
    gr.HTML("""
<div style="text-align: center; background: linear-gradient(135deg, #ff7e5f, #feb47b);
color: white; padding: 2rem; border-radius: 15px; margin-bottom: 1rem; box-shadow: 0 4px 6px rgba(0,0,0,0.1);">
<h1>🌬️ HRRR-Smoke Forecast System</h1>
<p style="font-size: 1.1em; margin-top: 0.5rem;">Real-time wildfire smoke forecasting using NOAA HRRR-Smoke model data</p>
<p style="font-size: 0.9em; opacity: 0.9;">Professional-grade smoke transport and air quality predictions</p>
</div>
""")
    with gr.Row():
        # Left column: forecast configuration controls.
        with gr.Column(scale=1):
            gr.Markdown("### 📍 Forecast Configuration")
            location = gr.Textbox(
                value="39.7392, -104.9903", # Denver coordinates
                label="Location (lat, lon or city)",
                placeholder="Enter coordinates as '39.7392, -104.9903' or city name"
            )
            with gr.Row():
                forecast_hour = gr.Slider(
                    minimum=0, maximum=48, value=12, step=3,
                    label="Forecast Hours",
                    info="Hours ahead to forecast"
                )
                detail_level = gr.Slider(
                    minimum=1, maximum=5, value=3, step=1,
                    label="Detail Level",
                    info="Higher = more detail (slower loading)"
                )
            # Dropdown values are the parameter keys understood by
            # update_smoke_display / quick_kmz_generation.
            parameter = gr.Dropdown(
                choices=[
                    ("🛰️ HMS-Style Smoke Detection", "HMS"),
                    ("🔥 Combined Smoke (NOAA Best Practices)", "COMBINED"),
                    ("Smoke Mass Density", "MASSDEN"),
                    ("PM2.5 Concentration", "PM25"),
                    ("Column Smoke Density", "COLMD"),
                    ("Visibility", "VIS"),
                    ("Fire Radiative Power", "FRPAVG"),
                    ("Temperature", "TMP"),
                    ("Wind Speed", "WIND"),
                    ("Relative Humidity", "RH")
                ],
                value="HMS",
                label="Smoke Parameter",
                info="Combined mode uses multiple HRRR parameters with NOAA methodology"
            )
            min_threshold = gr.Slider(
                minimum=0.0, maximum=10.0, value=0.5, step=0.1,
                label="Minimum Threshold",
                info="Filter values below this threshold"
            )
            show_polygons = gr.Checkbox(
                value=True, # Default to True for faster KMZ generation
                label="Show HMS-style Polygons + KMZ Download",
                info="Generate NOAA HMS-style smoke plume polygons and enable KMZ download"
            )
            demo_mode = gr.Checkbox(
                value=False,
                label="🚀 Demo Mode (Ultra-Fast)",
                info="Use synthetic data for instant results when HRRR data is slow/unavailable"
            )
            map_type = gr.Radio(
                choices=[
                    ("Interactive (Plotly)", "plotly"),
                    ("Leaflet/Grayscale (Folium)", "folium")
                ],
                value="plotly",
                label="Map Type",
                info="Choose visualization style"
            )
            with gr.Row():
                update_btn = gr.Button("🔄 Get Smoke Forecast", variant="primary", size="lg")
                quick_kmz_btn = gr.Button("⚡ Quick KMZ", variant="secondary", size="lg")
            # Static feature list (informational HTML).
            gr.HTML("""
<div style="background: linear-gradient(135deg, #667eea, #764ba2); color: white; padding: 1.5rem; border-radius: 10px; margin-top: 1rem;">
<h4>🎯 HRRR-Smoke Features:</h4>
<ul style="font-size: 0.9em; margin: 0.5rem 0; list-style-type: none; padding-left: 0;">
<li>🔥 <strong>Real wildfire tracking:</strong> Live fire detection and emissions</li>
<li>🌪️ <strong>Smoke transport:</strong> 3D atmospheric dispersion modeling</li>
<li>🏥 <strong>Air quality impacts:</strong> PM2.5 and health assessments</li>
<li>👁️ <strong>Visibility forecasts:</strong> Aviation and travel safety</li>
<li>⏰ <strong>48-hour range:</strong> Extended smoke forecasts</li>
<li>🗺️ <strong>High resolution:</strong> 3km grid spacing</li>
<li>📐 <strong>HMS-style polygons:</strong> NOAA-compatible plume boundaries</li>
<li>📦 <strong>KMZ export:</strong> Google Earth compatible files</li>
<li>🌍 <strong>Leaflet maps:</strong> Grayscale smoke visualization</li>
<li>🎨 <strong>Multiple views:</strong> Interactive and satellite-ready styles</li>
</ul>
<p style="font-size: 0.8em; margin-top: 1rem; opacity: 0.9;">
<strong>Model:</strong> NOAA HRRR-Smoke provides operational smoke forecasts
used by air quality agencies and emergency management.
</p>
</div>
""")
        # Right column: status text plus tabbed map views and KMZ download.
        with gr.Column(scale=2):
            status_text = gr.Markdown("## Click 'Get Smoke Forecast' to load HRRR-Smoke data")
            with gr.Tab("Plotly Map"):
                smoke_map = gr.Plot(label="Interactive Smoke Forecast Map")
            with gr.Tab("Leaflet Map (KMZ)"):
                folium_map = gr.HTML(label="Leaflet Map with KMZ Display", value="<p>Click 'Get Smoke Forecast' to load map</p>")
            with gr.Row():
                kmz_download = gr.File(
                    label="Download KMZ File (Google Earth Compatible)",
                    visible=True, # Make visible by default to show quick generation works
                    interactive=False
                )
    # Full-width narrative forecast below the main row.
    with gr.Row():
        with gr.Column():
            narrative_forecast = gr.Markdown(
                label="Detailed Smoke Forecast",
                value="## Detailed smoke forecast will appear here after selecting a location and updating forecast."
            )
def update_display_wrapper(*args):
"""Wrapper to handle KMZ file display and Folium map"""
result = smoke_app.update_smoke_display(*args)
if len(result) == 5:
# New format: status, plot, narrative, kmz_path, folium_html
status, plot, narrative, kmz_path, folium_html = result
if kmz_path is not None:
return status, plot, narrative, gr.update(value=kmz_path, visible=True), folium_html
else:
return status, plot, narrative, gr.update(visible=False), folium_html
elif len(result) == 4:
# Old format: status, plot, narrative, kmz_path
status, plot, narrative, kmz_path = result
if kmz_path is not None:
return status, plot, narrative, gr.update(value=kmz_path, visible=True), "<p>Folium map not available</p>"
else:
return status, plot, narrative, gr.update(visible=False), "<p>Folium map not available</p>"
else:
# Fallback
return result[0], result[1], result[2], gr.update(visible=False), "<p>Error loading map</p>"
    def quick_kmz_wrapper(forecast_hour, parameter, demo_mode):
        """Wrapper for quick KMZ generation with Folium map display.

        Runs smoke_app.quick_kmz_generation() for the status/KMZ pair, then
        rebuilds the same product independently to render a Folium map:
        HMS and COMBINED products are assembled from several HRRR fields,
        single parameters are fetched directly, and synthetic demo data is
        used when demo_mode is set or live HRRR data is unavailable.

        Returns (status_markdown, gr.update for the KMZ File component,
        folium_html).
        """
        status, kmz_path = smoke_app.quick_kmz_generation(forecast_hour, parameter, demo_mode)
        # Generate Folium map for the same data
        folium_html = "<p>No map data available</p>"
        try:
            # Handle HMS and COMBINED parameters in Folium wrapper
            if parameter == 'HMS':
                if demo_mode:
                    datasets = smoke_app.smoke_manager.generate_multiple_demo_parameters(
                        ['VIS', 'MASSDEN', 'PM25'], forecast_hour
                    )
                    hms_smoke = smoke_app.polygon_generator.hms_style_smoke_detection(datasets)
                    lats = np.linspace(20.192, 52.863, 50) # Full HRRR CONUS domain
                    lons = np.linspace(-134.096, -60.917, 60) # Pacific to Atlantic
                    ds = xr.Dataset({
                        'hms_smoke': (['lat', 'lon'], hms_smoke)
                    }, coords={'lat': lats, 'lon': lons})
                else:
                    datasets = smoke_app.smoke_manager.fetch_multiple_smoke_parameters(
                        ['VIS', 'MASSDEN', 'PM25'], 'surface', forecast_hour, fast_mode=True
                    )
                    if datasets:
                        hms_smoke = smoke_app.polygon_generator.hms_style_smoke_detection(datasets)
                        if hms_smoke is not None:
                            # Use HRRR grid coordinates for HMS data
                            lats = np.linspace(20.192, 52.863, hms_smoke.shape[0])
                            lons = np.linspace(-134.096, -60.917, hms_smoke.shape[1])
                            ds = xr.Dataset({
                                'hms_smoke': (['lat', 'lon'], hms_smoke)
                            }, coords={'lat': lats, 'lon': lons})
                        else:
                            ds = None
                    else:
                        # No live datasets — fall back to synthetic demo fields.
                        datasets = smoke_app.smoke_manager.generate_multiple_demo_parameters(
                            ['VIS', 'MASSDEN', 'PM25'], forecast_hour
                        )
                        hms_smoke = smoke_app.polygon_generator.hms_style_smoke_detection(datasets)
                        lats = np.linspace(20.192, 52.863, 50)
                        lons = np.linspace(-134.096, -60.917, 60)
                        ds = xr.Dataset({
                            'hms_smoke': (['lat', 'lon'], hms_smoke)
                        }, coords={'lat': lats, 'lon': lons})
            elif parameter == 'COMBINED':
                if demo_mode:
                    datasets = smoke_app.smoke_manager.generate_multiple_demo_parameters(
                        ['MASSDEN', 'PM25', 'VIS', 'COLMD', 'FRPAVG'], forecast_hour
                    )
                    combined_smoke = smoke_app.polygon_generator.combine_smoke_parameters(datasets)
                    lats = np.linspace(25, 50, 50)
                    lons = np.linspace(-125, -70, 60)
                    ds = xr.Dataset({
                        'combined_smoke': (['lat', 'lon'], combined_smoke)
                    }, coords={'lat': lats, 'lon': lons})
                else:
                    datasets = smoke_app.smoke_manager.fetch_multiple_smoke_parameters(
                        ['MASSDEN', 'PM25', 'VIS', 'COLMD', 'FRPAVG'], 'surface', forecast_hour, fast_mode=True
                    )
                    if datasets:
                        combined_smoke = smoke_app.polygon_generator.combine_smoke_parameters(datasets)
                        # Use HRRR grid coordinates for Folium wrapper too
                        lats = np.linspace(20.192, 52.863, combined_smoke.shape[0])
                        lons = np.linspace(-134.096, -60.917, combined_smoke.shape[1])
                        ds = xr.Dataset({
                            'combined_smoke': (['lat', 'lon'], combined_smoke)
                        }, coords={'lat': lats, 'lon': lons})
                    else:
                        datasets = smoke_app.smoke_manager.generate_multiple_demo_parameters(
                            ['MASSDEN', 'PM25', 'VIS', 'COLMD', 'FRPAVG'], forecast_hour
                        )
                        combined_smoke = smoke_app.polygon_generator.combine_smoke_parameters(datasets)
                        # Use demo coordinates for synthetic data (50x60 grid)
                        lats = np.linspace(25, 50, combined_smoke.shape[0])
                        lons = np.linspace(-125, -70, combined_smoke.shape[1])
                        ds = xr.Dataset({
                            'combined_smoke': (['lat', 'lon'], combined_smoke)
                        }, coords={'lat': lats, 'lon': lons})
            else:
                # Single parameter mode
                if demo_mode:
                    ds = None
                else:
                    ds, _ = smoke_app.smoke_manager.fetch_hrrr_smoke_data(parameter, 'surface', forecast_hour, return_info=True, fast_mode=True)
                if ds is None:
                    ds = smoke_app.smoke_manager.generate_demo_smoke_data(parameter, forecast_hour)
            # Polygon extraction + Folium rendering for supported parameters.
            if ds is not None and parameter in ['MASSDEN', 'PM25', 'PMTF', 'COMBINED', 'HMS'] and HMS_LIBS_AVAILABLE:
                # Ultra-fast processing for Quick KMZ with zero threshold to show all smoke
                grid_data = smoke_app.smoke_manager.process_smoke_grid(
                    ds, target_cells=4000, param_type='smoke', min_threshold=0.0
                )
                if grid_data is not None:
                    # Check if this is HMS categorical data
                    is_hms = (parameter == 'HMS')
                    polygons = smoke_app.polygon_generator.extract_smoke_polygons(
                        grid_data['lat2d'], grid_data['lon2d'], grid_data['z2d'],
                        min_area=0.01,
                        is_hms_categorical=is_hms
                    )
                    if polygons:
                        # Create Folium map centered on the data extent.
                        center_lat = np.mean(grid_data['lat2d'])
                        center_lon = np.mean(grid_data['lon2d'])
                        folium_map = smoke_app.folium_renderer.create_folium_map(
                            polygons, center_lat, center_lon, zoom_start=6
                        )
                        if folium_map:
                            folium_html = folium_map._repr_html_()
                        else:
                            folium_html = "<p>Quick KMZ: Folium map generation failed</p>"
                    else:
                        folium_html = "<p>Quick KMZ: No smoke polygons detected</p>"
                else:
                    folium_html = "<p>Quick KMZ: Unable to process smoke data</p>"
            else:
                folium_html = "<p>Quick KMZ: Parameter not supported for polygon generation</p>"
        except Exception as e:
            print(f"Quick KMZ Folium map generation error: {e}")
            folium_html = f"<p>Quick KMZ map error: {e}</p>"
        # Show the KMZ download only when a file was produced.
        if kmz_path is not None:
            return status, gr.update(value=kmz_path, visible=True), folium_html
        else:
            return status, gr.update(visible=False), folium_html
# Event handlers
update_btn.click(
fn=update_display_wrapper,
inputs=[location, forecast_hour, parameter, detail_level, min_threshold, show_polygons, map_type, demo_mode],
outputs=[status_text, smoke_map, narrative_forecast, kmz_download, folium_map]
)
# Quick KMZ generation
quick_kmz_btn.click(
fn=quick_kmz_wrapper,
inputs=[forecast_hour, parameter, demo_mode],
outputs=[status_text, kmz_download, folium_map]
)
# Auto-update when parameter changes
parameter.change(
fn=update_display_wrapper,
inputs=[location, forecast_hour, parameter, detail_level, min_threshold, show_polygons, map_type],
outputs=[status_text, smoke_map, narrative_forecast, kmz_download]
)
# Update when polygon option changes
show_polygons.change(
fn=update_display_wrapper,
inputs=[location, forecast_hour, parameter, detail_level, min_threshold, show_polygons, map_type],
outputs=[status_text, smoke_map, narrative_forecast, kmz_download]
)
# Update when map type changes
map_type.change(
fn=update_display_wrapper,
inputs=[location, forecast_hour, parameter, detail_level, min_threshold, show_polygons, map_type],
outputs=[status_text, smoke_map, narrative_forecast, kmz_download]
)
gr.HTML("""
<div style="text-align: center; padding: 1rem; margin-top: 2rem; border-top: 1px solid #eee; color: #666;">
<p><strong>Data Source:</strong> NOAA HRRR-Smoke Model | <strong>Update Frequency:</strong> Hourly | <strong>Resolution:</strong> 3km</p>
<p style="font-size: 0.8em;">
HRRR-Smoke forecasts are experimental and should be used in conjunction with official air quality and emergency guidance.
</p>
</div>
""")
if __name__ == "__main__":
app.launch(server_name="0.0.0.0", server_port=7860)