Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import folium | |
| from folium import plugins | |
| import pandas as pd | |
| import numpy as np | |
| import requests | |
| import xarray as xr | |
| from datetime import datetime, timedelta, timezone | |
| import matplotlib.pyplot as plt | |
| import io | |
| import base64 | |
| import tempfile | |
| import os | |
| from scipy.spatial import cKDTree | |
| import warnings | |
| warnings.filterwarnings('ignore') | |
| # GRIB2 parsing imports | |
| try: | |
| import cfgrib | |
| import pygrib | |
| GRIB_AVAILABLE = True | |
| except ImportError: | |
| GRIB_AVAILABLE = False | |
| print("GRIB2 libraries not available. Install cfgrib and pygrib for production use.") | |
| def create_map(): | |
| """Create an interactive map centered on Europe""" | |
| m = folium.Map( | |
| location=[50.0, 10.0], # Center on Europe | |
| zoom_start=4, | |
| tiles='OpenStreetMap' | |
| ) | |
| # Add click functionality | |
| m.add_child(folium.ClickForMarker(popup="Click to select location")) | |
| return m | |
| def find_nearest_grid_point(target_lat, target_lon, grid_lats, grid_lons): | |
| """ | |
| Find the nearest grid point to the target coordinates using KDTree | |
| """ | |
| try: | |
| # Convert to radians for proper distance calculation | |
| target_coords = np.radians([target_lat, target_lon]) | |
| grid_coords = np.column_stack([grid_lats.ravel(), grid_lons.ravel()]) | |
| grid_coords_rad = np.radians(grid_coords) | |
| # Build KDTree and find nearest point | |
| tree = cKDTree(grid_coords_rad) | |
| distance, index = tree.query(target_coords) | |
| # Convert back to unraveled indices | |
| grid_shape = grid_lats.shape | |
| unravel_idx = np.unravel_index(index, grid_shape) | |
| return unravel_idx | |
| except Exception as e: | |
| # Fallback to simple method | |
| lat_diff = np.abs(grid_lats - target_lat) | |
| lon_diff = np.abs(grid_lons - target_lon) | |
| distance = lat_diff + lon_diff | |
| return np.unravel_index(np.argmin(distance), grid_lats.shape) | |
| def get_latest_dwd_run(): | |
| """ | |
| Get the latest available DWD ICON model run | |
| DWD runs ICON at 00, 06, 12, 18 UTC | |
| """ | |
| now = datetime.now(timezone.utc) | |
| # DWD typically has a 3-4 hour delay before data is available | |
| available_time = now - timedelta(hours=4) | |
| # Find the most recent run time | |
| run_hours = [0, 6, 12, 18] | |
| current_hour = available_time.hour | |
| # Find the most recent run | |
| latest_run = max([h for h in run_hours if h <= current_hour], default=18) | |
| if latest_run > current_hour: | |
| # Go to previous day | |
| available_time = available_time - timedelta(days=1) | |
| latest_run = 18 | |
| run_date = available_time.replace(hour=latest_run, minute=0, second=0, microsecond=0) | |
| return run_date | |
| def download_dwd_coordinate_files(run_date): | |
| """ | |
| Download coordinate files (CLAT and CLON) from DWD Open Data Server | |
| Coordinate files use a different naming convention than weather parameters | |
| Note: Coordinate files are only available for the 00Z run | |
| """ | |
| try: | |
| base_url = "https://opendata.dwd.de/weather/nwp/icon/grib" | |
| date_str = run_date.strftime("%Y%m%d") | |
| # Coordinate files are time-invariant and only available for 00Z run | |
| clat_filename = f"icon_global_icosahedral_time-invariant_{date_str}00_CLAT.grib2.bz2" | |
| clon_filename = f"icon_global_icosahedral_time-invariant_{date_str}00_CLON.grib2.bz2" | |
| clat_url = f"{base_url}/00/clat/{clat_filename}" | |
| clon_url = f"{base_url}/00/clon/{clon_filename}" | |
| print(f"Downloading: {clat_url}") | |
| clat_response = requests.get(clat_url, timeout=60) | |
| clat_response.raise_for_status() | |
| print(f"Downloading: {clon_url}") | |
| clon_response = requests.get(clon_url, timeout=60) | |
| clon_response.raise_for_status() | |
| # Save coordinate files | |
| clat_file = tempfile.NamedTemporaryFile(suffix='_CLAT.grib2.bz2', delete=False) | |
| clat_file.write(clat_response.content) | |
| clat_file.close() | |
| clon_file = tempfile.NamedTemporaryFile(suffix='_CLON.grib2.bz2', delete=False) | |
| clon_file.write(clon_response.content) | |
| clon_file.close() | |
| return clat_file.name, clon_file.name | |
| except Exception as e: | |
| print(f"Error downloading coordinate files: {e}") | |
| return None, None | |
| def download_dwd_grib_file(run_date, parameter, level=None, forecast_hour=0): | |
| """ | |
| Download GRIB2 file from DWD Open Data Server | |
| Args: | |
| run_date: datetime of model run | |
| parameter: weather parameter (e.g., 't_2m', 'u_10m', 'pmsl') | |
| level: pressure level if applicable | |
| forecast_hour: forecast hour (0-180) | |
| """ | |
| try: | |
| # DWD ICON GRIB file URL structure | |
| base_url = "https://opendata.dwd.de/weather/nwp/icon/grib" | |
| run_hour = f"{run_date.hour:02d}" | |
| date_str = run_date.strftime("%Y%m%d") | |
| # Map parameter names to DWD names and construct URLs | |
| parameter_mapping = { | |
| 't_2m': 'T_2M', | |
| 'relhum_2m': 'RELHUM_2M', | |
| 'u_10m': 'U_10M', | |
| 'v_10m': 'V_10M', | |
| 'pmsl': 'PMSL', | |
| 'tot_prec': 'TOT_PREC', | |
| 'rain_con': 'RAIN_CON', | |
| 'rain_gsp': 'RAIN_GSP', | |
| 'snow_con': 'SNOW_CON', | |
| 'snow_gsp': 'SNOW_GSP', | |
| 'cape_con': 'CAPE_CON', | |
| 'clct': 'CLCT', | |
| 'asob_s': 'ASOB_S', | |
| 'vmax_10m': 'VMAX_10M', | |
| 'lpi_con': 'LPI_CON' | |
| } | |
| dwd_param = parameter_mapping.get(parameter, parameter.upper()) | |
| if level: | |
| # Pressure level data | |
| filename = f"icon_global_icosahedral_{level}_{date_str}{run_hour}_{forecast_hour:03d}_{dwd_param}.grib2.bz2" | |
| url = f"{base_url}/{run_hour}/{parameter}/{filename}" | |
| else: | |
| # Surface data - correct filename format | |
| filename = f"icon_global_icosahedral_single-level_{date_str}{run_hour}_{forecast_hour:03d}_{dwd_param}.grib2.bz2" | |
| url = f"{base_url}/{run_hour}/{parameter}/{filename}" | |
| print(f"Downloading: {url}") | |
| response = requests.get(url, timeout=60) | |
| response.raise_for_status() | |
| # Save to temporary file | |
| temp_file = tempfile.NamedTemporaryFile(suffix='.grib2.bz2', delete=False) | |
| temp_file.write(response.content) | |
| temp_file.close() | |
| return temp_file.name | |
| except Exception as e: | |
| print(f"Error downloading {parameter} for hour {forecast_hour}: {e}") | |
| return None | |
| def parse_grib_file(grib_file_path): | |
| """ | |
| Parse GRIB2 file using cfgrib/xarray | |
| """ | |
| try: | |
| if not GRIB_AVAILABLE: | |
| raise Exception("GRIB2 libraries not available") | |
| # Decompress if needed | |
| if grib_file_path.endswith('.bz2'): | |
| import bz2 | |
| with bz2.open(grib_file_path, 'rb') as f: | |
| decompressed_content = f.read() | |
| decompressed_file = tempfile.NamedTemporaryFile(suffix='.grib2', delete=False) | |
| decompressed_file.write(decompressed_content) | |
| decompressed_file.close() | |
| grib_file_path = decompressed_file.name | |
| # Open with cfgrib/xarray | |
| ds = xr.open_dataset(grib_file_path, engine='cfgrib') | |
| return ds | |
| except Exception as e: | |
| print(f"Error parsing GRIB file: {e}") | |
| return None | |
| def fetch_dwd_icon_data(lat, lon): | |
| """ | |
| Fetch real weather forecast data directly from DWD Open Data Server | |
| This downloads and parses actual GRIB2 files from DWD ICON model | |
| """ | |
| try: | |
| print(f"Fetching real DWD ICON data for {lat:.3f}Β°N, {lon:.3f}Β°E") | |
| if not GRIB_AVAILABLE: | |
| raise Exception("GRIB2 libraries not available. Install cfgrib and pygrib for DWD ICON data access.") | |
| # Get latest model run | |
| run_date = get_latest_dwd_run() | |
| print(f"Using DWD ICON run: {run_date.strftime('%Y-%m-%d %H:%M UTC')}") | |
| # Define ESSENTIAL parameters only for faster downloads | |
| parameters = { | |
| 't_2m': 'Temperature at 2m', | |
| 'u_10m': 'U-component of wind at 10m', | |
| 'v_10m': 'V-component of wind at 10m', | |
| 'tot_prec': 'Total precipitation', | |
| 'snow_gsp': 'Grid-scale snow', | |
| 'clct': 'Total cloud cover', | |
| 'cape_con': 'Convective Available Potential Energy', | |
| 'vmax_10m': 'Wind gusts at 10m' | |
| } | |
| # Download coordinate files first | |
| print("Downloading coordinate information...") | |
| clat_file, clon_file = download_dwd_coordinate_files(run_date) | |
| if not clat_file or not clon_file: | |
| raise Exception("Failed to download coordinate files from DWD ICON server") | |
| # Parse coordinate files | |
| clat_ds = parse_grib_file(clat_file) | |
| clon_ds = parse_grib_file(clon_file) | |
| if clat_ds is None or clon_ds is None: | |
| raise Exception("Failed to parse coordinate files from DWD ICON server") | |
| # Get coordinate arrays | |
| # DWD coordinate files may use different variable names | |
| try: | |
| if 'clat' in clat_ds: | |
| grid_lats = clat_ds.clat.values | |
| elif 'CLAT' in clat_ds: | |
| grid_lats = clat_ds.CLAT.values | |
| else: | |
| # Try the first available variable | |
| var_names = list(clat_ds.data_vars.keys()) | |
| print(f"Available CLAT variables: {var_names}") | |
| grid_lats = clat_ds[var_names[0]].values | |
| if 'clon' in clon_ds: | |
| grid_lons = clon_ds.clon.values | |
| elif 'CLON' in clon_ds: | |
| grid_lons = clon_ds.CLON.values | |
| else: | |
| # Try the first available variable | |
| var_names = list(clon_ds.data_vars.keys()) | |
| print(f"Available CLON variables: {var_names}") | |
| grid_lons = clon_ds[var_names[0]].values | |
| except Exception as e: | |
| print(f"Error extracting coordinate arrays: {e}") | |
| print(f"CLAT dataset: {clat_ds}") | |
| print(f"CLON dataset: {clon_ds}") | |
| raise Exception(f"Failed to extract coordinate arrays: {e}") | |
| # Find nearest grid point | |
| nearest_idx = find_nearest_grid_point(lat, lon, grid_lats, grid_lons) | |
| print(f"Nearest grid point: {grid_lats[nearest_idx]:.3f}Β°N, {grid_lons[nearest_idx]:.3f}Β°E") | |
| # Download and process forecast data - high frequency for first 2 days, then longer intervals | |
| forecast_hours = [0, 3, 6, 9, 12, 15, 18, 21, 24, 27, 30, 33, 36, 39, 42, 45, 48, 72, 96] # Every 3hrs for 48hrs, then 24hr intervals | |
| weather_data = {'times': [], 'data': {param: [] for param in parameters.keys()}} | |
| for fh in forecast_hours: | |
| print(f"Processing forecast hour +{fh}...") | |
| hour_data = {} | |
| for param in parameters.keys(): | |
| grib_file = download_dwd_grib_file(run_date, param, forecast_hour=fh) | |
| if grib_file: | |
| ds = parse_grib_file(grib_file) | |
| if ds is not None: | |
| # Try different variable name variations | |
| value = None | |
| possible_names = [param, param.upper(), param.lower()] | |
| for name in possible_names: | |
| if name in ds: | |
| value = ds[name].values[nearest_idx] | |
| break | |
| if value is None: | |
| # Try the first available variable if exact match not found | |
| var_names = list(ds.data_vars.keys()) | |
| if var_names: | |
| print(f"Available variables for {param}: {var_names}") | |
| value = ds[var_names[0]].values[nearest_idx] | |
| hour_data[param] = value | |
| # Clean up temporary file | |
| os.unlink(grib_file) | |
| else: | |
| hour_data[param] = None | |
| else: | |
| hour_data[param] = None | |
| # Store the data | |
| forecast_time = run_date + timedelta(hours=fh) | |
| weather_data['times'].append(forecast_time) | |
| for param in parameters.keys(): | |
| weather_data['data'][param].append(hour_data[param]) | |
| # Clean up coordinate files | |
| os.unlink(clat_file) | |
| os.unlink(clon_file) | |
| print(f"Successfully processed {len(forecast_hours)} forecast hours") | |
| return { | |
| 'location': {'lat': lat, 'lon': lon, 'name': f'DWD ICON {lat:.2f}Β°N, {lon:.2f}Β°E'}, | |
| 'run_date': run_date, | |
| 'weather_data': weather_data, | |
| 'nearest_grid': {'lat': float(grid_lats[nearest_idx]), 'lon': float(grid_lons[nearest_idx])} | |
| } | |
| except Exception as e: | |
| print(f"Error fetching real DWD ICON data: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| raise Exception(f"Failed to fetch DWD ICON data: {e}") | |
| def get_forecast_data(lat, lon, forecast_hour="00"): | |
| """ | |
| Fetch real forecast data for given coordinates using DWD ICON model data | |
| """ | |
| try: | |
| print(f"Starting forecast data retrieval for {lat:.3f}Β°N, {lon:.3f}Β°E") | |
| # Fetch data from DWD ICON model | |
| weather_data = fetch_dwd_icon_data(lat, lon) | |
| # Process real DWD GRIB2 data only | |
| if 'weather_data' in weather_data: | |
| return process_real_dwd_data(weather_data, lat, lon) | |
| else: | |
| raise Exception("Invalid weather data format received from DWD ICON server") | |
| except Exception as e: | |
| import traceback | |
| error_msg = f"Error fetching DWD ICON forecast data: {str(e)}" | |
| print(error_msg) | |
| print("Full error traceback:") | |
| print(traceback.format_exc()) | |
| # No fallback - raise the error | |
| raise Exception(error_msg) | |
| def process_real_dwd_data(dwd_data, lat, lon): | |
| """ | |
| Process real DWD GRIB2 data into forecast format | |
| """ | |
| try: | |
| weather_data = dwd_data['weather_data'] | |
| run_date = dwd_data['run_date'] | |
| nearest_grid = dwd_data['nearest_grid'] | |
| timestamps = weather_data['times'] | |
| data = weather_data['data'] | |
| # Extract and convert data | |
| temperature = [] | |
| humidity = [] | |
| wind_speed = [] | |
| wind_direction = [] | |
| wind_gust = [] | |
| pressure = [] | |
| precipitation = [] | |
| rain_convective = [] | |
| rain_gridscale = [] | |
| snow_convective = [] | |
| snow_gridscale = [] | |
| cape = [] | |
| lightning_potential = [] | |
| cloud_cover = [] | |
| solar_radiation = [] | |
| for i in range(len(timestamps)): | |
| # Temperature (convert from Kelvin to Celsius) | |
| t_2m = data['t_2m'][i] | |
| if t_2m is not None and t_2m > 200: # Kelvin | |
| temperature.append(t_2m - 273.15) | |
| else: | |
| temperature.append(15.0) # Default | |
| # Humidity - use default since we don't download it for speed | |
| humidity.append(60.0) # Default humidity for faster processing | |
| # Wind components | |
| u_10m = data['u_10m'][i] if data['u_10m'][i] is not None else 0.0 | |
| v_10m = data['v_10m'][i] if data['v_10m'][i] is not None else 0.0 | |
| # Calculate wind speed and direction | |
| wind_speed_val = np.sqrt(u_10m**2 + v_10m**2) | |
| wind_dir_val = (270 - np.degrees(np.arctan2(v_10m, u_10m))) % 360 | |
| wind_speed.append(wind_speed_val) | |
| wind_direction.append(wind_dir_val) | |
| # Wind gusts | |
| vmax = data['vmax_10m'][i] | |
| wind_gust.append(vmax if vmax is not None else wind_speed_val * 1.5) | |
| # Pressure - use default since we don't download it for speed | |
| pressure.append(1013.25) # Default pressure for faster processing | |
| # Precipitation (convert from kg/mΒ²/s to mm/h if needed) | |
| tot_prec = data['tot_prec'][i] | |
| if tot_prec is not None: | |
| if tot_prec < 1: # kg/mΒ²/s | |
| precipitation.append(tot_prec * 3600) # Convert to mm/h | |
| else: | |
| precipitation.append(tot_prec) | |
| else: | |
| precipitation.append(0.0) | |
| # Rain data - use defaults for faster processing | |
| rain_convective.append(0.0) # Default - not downloaded | |
| rain_gridscale.append(0.0) # Default - not downloaded | |
| snow_convective.append(0.0) # Default - not downloaded | |
| # Grid-scale snow | |
| snow_gsp = data.get('snow_gsp', [None])[i] if 'snow_gsp' in data else None | |
| if snow_gsp is not None: | |
| if snow_gsp < 1: # kg/mΒ²/s | |
| snow_gridscale.append(snow_gsp * 3600) # Convert to mm/h | |
| else: | |
| snow_gridscale.append(snow_gsp) | |
| else: | |
| snow_gridscale.append(0.0) | |
| # CAPE (Convective Available Potential Energy) | |
| cape_con = data.get('cape_con', [None])[i] if 'cape_con' in data else None | |
| if cape_con is not None: | |
| cape.append(cape_con) | |
| else: | |
| cape.append(0.0) | |
| # Lightning Potential - use default for faster processing | |
| lightning_potential.append(0.0) # Default - not downloaded | |
| # Cloud cover (convert from fraction to percentage if needed) | |
| clct = data.get('clct', [None])[i] if 'clct' in data else None | |
| if clct is not None: | |
| if clct <= 1.0: # Fraction | |
| cloud_cover.append(clct * 100) | |
| else: # Already percentage | |
| cloud_cover.append(clct) | |
| else: | |
| cloud_cover.append(50.0) # Default | |
| # Solar radiation - use default for faster processing | |
| solar_radiation.append(0.0) # Default - not downloaded | |
| result = { | |
| 'timestamps': timestamps, | |
| 'temperature': temperature, | |
| 'humidity': humidity, | |
| 'wind_speed': wind_speed, | |
| 'wind_direction': wind_direction, | |
| 'wind_gust': wind_gust, | |
| 'pressure': pressure, | |
| 'precipitation': precipitation, | |
| 'rain_convective': rain_convective, | |
| 'rain_gridscale': rain_gridscale, | |
| 'snow_convective': snow_convective, | |
| 'snow_gridscale': snow_gridscale, | |
| 'snow': [sc + sg for sc, sg in zip(snow_convective, snow_gridscale)], # Total snow | |
| 'cape': cape, | |
| 'lightning_potential': lightning_potential, | |
| 'cloud_cover': cloud_cover, | |
| 'solar_radiation': solar_radiation, | |
| 'lat': lat, | |
| 'lon': lon, | |
| 'forecast_date': run_date.strftime('%Y-%m-%d %H:%M UTC'), | |
| 'data_source': 'Real DWD ICON GRIB2', | |
| 'location_name': f"DWD ICON {lat:.2f}Β°N, {lon:.2f}Β°E", | |
| 'nearest_grid_lat': nearest_grid['lat'], | |
| 'nearest_grid_lon': nearest_grid['lon'] | |
| } | |
| print(f"Successfully processed {len(timestamps)} hours of real DWD data") | |
| return result | |
| except Exception as e: | |
| print(f"Error processing real DWD data: {e}") | |
| raise e | |
| def analyze_weather_events(forecast_data): | |
| """ | |
| Analyze forecast data to identify significant weather events and timing | |
| Enhanced with snow and thunderstorm detection using DWD ICON data | |
| """ | |
| timestamps = forecast_data['timestamps'] | |
| temperature = forecast_data['temperature'] | |
| precipitation = forecast_data.get('precipitation', [0] * len(timestamps)) | |
| wind_speed = forecast_data['wind_speed'] | |
| wind_gust = forecast_data.get('wind_gust', wind_speed) | |
| humidity = forecast_data['humidity'] | |
| cloud_cover = forecast_data.get('cloud_cover', [50] * len(timestamps)) | |
| pressure = forecast_data.get('pressure', [1013] * len(timestamps)) | |
| # New snow and thunderstorm variables | |
| snow = forecast_data.get('snow', [0] * len(timestamps)) | |
| rain_convective = forecast_data.get('rain_convective', [0] * len(timestamps)) | |
| cape = forecast_data.get('cape', [0] * len(timestamps)) | |
| lightning_potential = forecast_data.get('lightning_potential', [0] * len(timestamps)) | |
| events = [] | |
| # Analyze each forecast period | |
| for i, timestamp in enumerate(timestamps): | |
| temp = temperature[i] | |
| precip = precipitation[i] if i < len(precipitation) else 0 | |
| wind = wind_speed[i] if i < len(wind_speed) else 0 | |
| gust = wind_gust[i] if i < len(wind_gust) else wind | |
| rh = humidity[i] if i < len(humidity) else 50 | |
| clouds = cloud_cover[i] if i < len(cloud_cover) else 50 | |
| press = pressure[i] if i < len(pressure) else 1013 | |
| # Weather event detection | |
| event = { | |
| 'time': timestamp, | |
| 'conditions': [], | |
| 'severity': 'normal', | |
| 'primary_weather': None, | |
| 'wind_descriptor': None, | |
| 'precipitation_type': None, | |
| 'temperature_descriptor': None | |
| } | |
| # Enhanced precipitation analysis with snow and thunderstorm detection | |
| snow_rate = snow[i] if i < len(snow) else 0 | |
| convective_rain = rain_convective[i] if i < len(rain_convective) else 0 | |
| cape_val = cape[i] if i < len(cape) else 0 | |
| lightning_idx = lightning_potential[i] if i < len(lightning_potential) else 0 | |
| # Snow detection | |
| if snow_rate > 0.1: # mm/h snow | |
| if snow_rate < 1.0: | |
| event['precipitation_type'] = 'light_snow' | |
| event['conditions'].append('light snow') | |
| elif snow_rate < 3.0: | |
| event['precipitation_type'] = 'snow' | |
| event['conditions'].append('snow') | |
| event['severity'] = 'moderate' | |
| else: | |
| event['precipitation_type'] = 'heavy_snow' | |
| event['conditions'].append('heavy snow') | |
| event['severity'] = 'significant' | |
| # Thunderstorm detection using CAPE and convective precipitation | |
| elif convective_rain > 0.5 or (cape_val > 1000 and precip > 1.0): | |
| if cape_val > 2500 or lightning_idx > 0.5: | |
| event['precipitation_type'] = 'severe_thunderstorms' | |
| event['conditions'].append('severe thunderstorms') | |
| event['severity'] = 'significant' | |
| else: | |
| event['precipitation_type'] = 'thunderstorms' | |
| event['conditions'].append('thunderstorms') | |
| event['severity'] = 'moderate' | |
| # Regular rain analysis | |
| elif precip > 0.1: # mm/h | |
| if precip < 1.0: | |
| event['precipitation_type'] = 'light_rain' | |
| event['conditions'].append('light rain') | |
| elif precip < 5.0: | |
| event['precipitation_type'] = 'rain' | |
| event['conditions'].append('rain') | |
| event['severity'] = 'moderate' | |
| else: | |
| event['precipitation_type'] = 'heavy_rain' | |
| event['conditions'].append('heavy rain') | |
| event['severity'] = 'significant' | |
| # Temperature analysis | |
| if temp < 0: | |
| event['temperature_descriptor'] = 'freezing' | |
| event['conditions'].append('freezing temperatures') | |
| if precip > 0.1: | |
| event['precipitation_type'] = 'snow' | |
| event['conditions'] = ['snow' if 'rain' in str(event['conditions']) else event['conditions'][0]] | |
| elif temp < 5: | |
| event['temperature_descriptor'] = 'cold' | |
| elif temp > 30: | |
| event['temperature_descriptor'] = 'hot' | |
| event['conditions'].append('hot temperatures') | |
| elif temp > 25: | |
| event['temperature_descriptor'] = 'warm' | |
| # Wind analysis | |
| if gust > 25: # m/s (about 55 mph) | |
| event['wind_descriptor'] = 'very_windy' | |
| event['conditions'].append('very windy') | |
| event['severity'] = 'significant' | |
| elif gust > 15: # m/s (about 35 mph) | |
| event['wind_descriptor'] = 'windy' | |
| event['conditions'].append('windy') | |
| event['severity'] = 'moderate' | |
| elif wind > 10: # m/s (about 22 mph) | |
| event['wind_descriptor'] = 'breezy' | |
| event['conditions'].append('breezy') | |
| # Thunderstorm potential analysis | |
| if temp > 20 and rh > 70 and precip > 2.0 and clouds > 80: | |
| if gust > 20: # Strong winds + heavy precip + high humidity | |
| event['conditions'].append('thunderstorms possible') | |
| event['precipitation_type'] = 'thunderstorms' | |
| event['severity'] = 'significant' | |
| # Fog analysis | |
| if rh > 95 and wind < 3: | |
| event['conditions'].append('fog possible') | |
| # Determine primary weather with enhanced snow and thunderstorm detection | |
| if event['conditions']: | |
| primary_condition = event['conditions'][0] | |
| if 'severe thunderstorms' in primary_condition: | |
| event['primary_weather'] = 'severe_thunderstorms' | |
| elif 'thunderstorms' in primary_condition: | |
| event['primary_weather'] = 'thunderstorms' | |
| elif 'heavy_snow' in primary_condition: | |
| event['primary_weather'] = 'heavy_snow' | |
| elif 'snow' in primary_condition: | |
| event['primary_weather'] = 'snow' | |
| elif 'heavy_rain' in primary_condition: | |
| event['primary_weather'] = 'heavy_rain' | |
| elif 'rain' in primary_condition: | |
| event['primary_weather'] = 'rain' | |
| elif 'windy' in primary_condition or 'breezy' in primary_condition: | |
| event['primary_weather'] = 'wind' | |
| else: | |
| event['primary_weather'] = primary_condition | |
| events.append(event) | |
| return events | |
| def generate_forecast_text(forecast_data, location_name="Selected Location"): | |
| """ | |
| Generate NOAA-style forecast text from gridded data | |
| Natural flowing language similar to NWS Zone Forecast Products | |
| """ | |
| events = analyze_weather_events(forecast_data) | |
| current_time = datetime.now() | |
| # Analyze overall conditions and trends | |
| temperatures = forecast_data['temperature'] | |
| precipitation = forecast_data.get('precipitation', [0] * len(temperatures)) | |
| wind_speeds = forecast_data['wind_speed'] | |
| humidity = forecast_data['humidity'] | |
| # Calculate key statistics | |
| max_temp = max(temperatures) | |
| min_temp = min(temperatures) | |
| avg_precip = sum(precipitation) / len(precipitation) if precipitation else 0 | |
| max_wind = max(wind_speeds) | |
| # Determine dominant weather pattern | |
| rain_hours = sum(1 for p in precipitation if p > 0.1) | |
| heavy_rain_hours = sum(1 for p in precipitation if p > 2.0) | |
| windy_hours = sum(1 for w in wind_speeds if w > 10) | |
| forecast_text = f"**{location_name} Extended Forecast**\n\n" | |
| forecast_text += f"Issued {current_time.strftime('%A %B %d, %Y at %I:%M %p')}\n\n" | |
| # Generate overview paragraph | |
| overview = generate_overview_paragraph(rain_hours, heavy_rain_hours, windy_hours, max_temp, min_temp, max_wind, len(temperatures)) | |
| forecast_text += overview + "\n\n" | |
| # Generate detailed daily forecasts with natural language | |
| daily_forecasts = generate_daily_detailed_forecasts(forecast_data, events) | |
| forecast_text += daily_forecasts | |
| # Add specific timing information in narrative form | |
| timing_narrative = generate_timing_narrative(events, precipitation, forecast_data['timestamps']) | |
| if timing_narrative: | |
| forecast_text += "\n" + timing_narrative + "\n" | |
| # Add any significant weather advisories | |
| advisories = generate_advisories(events) | |
| if advisories: | |
| forecast_text += "\n**Weather Advisories:**\n" + advisories | |
| return forecast_text | |
| def generate_overview_paragraph(rain_hours, heavy_rain_hours, windy_hours, max_temp, min_temp, max_wind, total_hours): | |
| """Generate a natural overview paragraph like NOAA""" | |
| overview_parts = [] | |
| # Temperature narrative | |
| if max_temp > 25: | |
| temp_desc = f"warm with highs reaching {max_temp:.0f}Β°C" | |
| elif max_temp < 10: | |
| temp_desc = f"cool with highs only reaching {max_temp:.0f}Β°C" | |
| else: | |
| temp_desc = f"mild with highs near {max_temp:.0f}Β°C" | |
| if min_temp < 0: | |
| temp_desc += f" and overnight lows dropping to {min_temp:.0f}Β°C" | |
| elif abs(max_temp - min_temp) > 15: | |
| temp_desc += f" with significant cooling overnight to {min_temp:.0f}Β°C" | |
| # Weather pattern narrative | |
| if rain_hours > total_hours * 0.6: | |
| if heavy_rain_hours > 3: | |
| weather_pattern = "A persistent weather system will bring frequent periods of rain, with some heavy downpours possible" | |
| else: | |
| weather_pattern = "Unsettled weather with rain likely through much of the forecast period" | |
| elif rain_hours > total_hours * 0.3: | |
| weather_pattern = "Scattered showers and periods of rain expected" | |
| elif rain_hours > 0: | |
| weather_pattern = "A few light showers possible" | |
| else: | |
| weather_pattern = "Generally dry conditions expected" | |
| # Wind narrative | |
| if max_wind > 15: | |
| weather_pattern += f", accompanied by gusty winds up to {max_wind:.0f} m/s" | |
| elif windy_hours > total_hours * 0.4: | |
| weather_pattern += " with breezy conditions at times" | |
| return f"{weather_pattern}. Temperatures will be {temp_desc}." | |
| def generate_daily_detailed_forecasts(forecast_data, events): | |
| """Generate detailed daily forecasts with precipitation chances, timing, and cloud cover""" | |
| current_time = datetime.now() | |
| forecasts = [] | |
| # Group data by days | |
| timestamps = forecast_data['timestamps'] | |
| temperatures = forecast_data['temperature'] | |
| precipitation = forecast_data.get('precipitation', [0] * len(temperatures)) | |
| wind_speeds = forecast_data['wind_speed'] | |
| humidity = forecast_data.get('humidity', [60] * len(temperatures)) | |
| cloud_cover = forecast_data.get('cloud_cover', [50] * len(temperatures)) | |
| # Process 4 days of forecasts - fix timezone comparison | |
| for day_offset in range(4): | |
| # Use timezone-naive datetime for comparison | |
| base_time = current_time.replace(tzinfo=None) if current_time.tzinfo else current_time | |
| target_date = base_time + timedelta(days=day_offset) | |
| # Get day and night periods | |
| day_start = target_date.replace(hour=6, minute=0, second=0, microsecond=0) | |
| day_end = target_date.replace(hour=18, minute=0, second=0, microsecond=0) | |
| night_end = (target_date + timedelta(days=1)).replace(hour=6, minute=0, second=0, microsecond=0) | |
| # Find data for this day - handle timezone-aware timestamps | |
| day_indices = [i for i, ts in enumerate(timestamps) | |
| if day_start <= (ts.replace(tzinfo=None) if ts.tzinfo else ts) < day_end] | |
| night_indices = [i for i, ts in enumerate(timestamps) | |
| if day_end <= (ts.replace(tzinfo=None) if ts.tzinfo else ts) < night_end] | |
| if not day_indices and not night_indices: | |
| continue | |
| # Day period analysis | |
| if day_indices: | |
| day_data = analyze_period_conditions( | |
| day_indices, timestamps, temperatures, precipitation, | |
| wind_speeds, cloud_cover, day_start, day_end | |
| ) | |
| # Generate day forecast | |
| if day_offset == 0: | |
| period_name = "Today" | |
| elif day_offset == 1: | |
| period_name = "Tomorrow" | |
| else: | |
| period_name = target_date.strftime("%A") | |
| day_forecast = generate_enhanced_period_narrative(period_name, day_data, True) | |
| forecasts.append(day_forecast) | |
| # Night period analysis | |
| if night_indices: | |
| night_data = analyze_period_conditions( | |
| night_indices, timestamps, temperatures, precipitation, | |
| wind_speeds, cloud_cover, day_end, night_end | |
| ) | |
| # Generate night forecast | |
| if day_offset == 0: | |
| night_name = "Tonight" | |
| elif day_offset == 1: | |
| night_name = "Tomorrow Night" | |
| else: | |
| night_name = f"{target_date.strftime('%A')} Night" | |
| night_forecast = generate_enhanced_period_narrative(night_name, night_data, False) | |
| forecasts.append(night_forecast) | |
| return '\n\n'.join(forecasts) | |
| def analyze_period_conditions(indices, timestamps, temperatures, precipitation, wind_speeds, cloud_cover, period_start, period_end): | |
| """Analyze weather conditions for a specific time period with 6-hour sub-periods""" | |
| data = { | |
| 'temps': [temperatures[i] for i in indices], | |
| 'precip': [precipitation[i] for i in indices if i < len(precipitation)], | |
| 'winds': [wind_speeds[i] for i in indices], | |
| 'clouds': [cloud_cover[i] for i in indices if i < len(cloud_cover)], | |
| 'timestamps': [timestamps[i] for i in indices] | |
| } | |
| # Calculate statistics | |
| data['high_temp'] = max(data['temps']) if data['temps'] else 20 | |
| data['low_temp'] = min(data['temps']) if data['temps'] else 10 | |
| data['avg_clouds'] = sum(data['clouds']) / len(data['clouds']) if data['clouds'] else 50 | |
| data['max_wind'] = max(data['winds']) if data['winds'] else 5 | |
| # Precipitation analysis with type detection | |
| data['precip_chance'] = calculate_precipitation_chance(data['precip']) | |
| data['rain_timing'] = analyze_6hour_precipitation_timing(data['timestamps'], data['precip'], period_start, period_end) | |
| data['precip_type'] = determine_precipitation_type(data, indices) | |
| # Cloud cover description | |
| data['sky_condition'] = get_sky_condition(data['avg_clouds']) | |
| # Wind conditions | |
| data['wind_desc'] = get_wind_description(data['max_wind']) | |
| return data | |
| def determine_precipitation_type(period_data, indices): | |
| """Determine the dominant precipitation type for a period""" | |
| # This is a simplified version - in practice, we'd need access to | |
| # snow and convective precipitation data for the specific indices | |
| # For now, we'll use temperature as a proxy | |
| temps = period_data['temps'] | |
| avg_temp = sum(temps) / len(temps) if temps else 15 | |
| # Simple temperature-based logic (would be enhanced with real snow/thunderstorm data) | |
| if avg_temp < 2: # Near freezing | |
| return 'snow' | |
| elif avg_temp > 20: # Warm enough for thunderstorms | |
| return 'thunderstorms' | |
| else: | |
| return 'rain' | |
| def calculate_precipitation_chance(precip_data): | |
| """Calculate precipitation chance percentage""" | |
| if not precip_data: | |
| return 0 | |
| rainy_hours = sum(1 for p in precip_data if p > 0.1) | |
| total_hours = len(precip_data) | |
| if rainy_hours == 0: | |
| return 0 | |
| elif rainy_hours >= total_hours * 0.8: | |
| return 90 | |
| elif rainy_hours >= total_hours * 0.6: | |
| return 70 | |
| elif rainy_hours >= total_hours * 0.4: | |
| return 50 | |
| elif rainy_hours >= total_hours * 0.2: | |
| return 30 | |
| else: | |
| return 20 | |
| def analyze_6hour_precipitation_timing(timestamps, precip_data, period_start, period_end): | |
| """Analyze when rain is most likely within the period using 6-hour blocks""" | |
| if not timestamps or not precip_data: | |
| return None | |
| # Define 6-hour periods | |
| period_duration = (period_end - period_start).total_seconds() / 3600 # hours | |
| if period_duration <= 6: | |
| # Single period | |
| avg_precip = sum(precip_data) / len(precip_data) if precip_data else 0 | |
| if avg_precip > 0.1: | |
| start_hour = period_start.hour | |
| if 6 <= start_hour < 12: | |
| return "morning" | |
| elif 12 <= start_hour < 18: | |
| return "afternoon" | |
| elif 18 <= start_hour < 24: | |
| return "evening" | |
| else: | |
| return "overnight" | |
| return None | |
| # Analyze multiple 6-hour blocks | |
| blocks = [] | |
| block_size = max(1, len(timestamps) // int(period_duration / 6)) | |
| for i in range(0, len(timestamps), block_size): | |
| block_precip = precip_data[i:i+block_size] if i+block_size <= len(precip_data) else precip_data[i:] | |
| block_avg = sum(block_precip) / len(block_precip) if block_precip else 0 | |
| block_time = timestamps[i] | |
| blocks.append((block_time, block_avg)) | |
| # Find the block with highest precipitation | |
| if blocks: | |
| max_precip_block = max(blocks, key=lambda x: x[1]) | |
| if max_precip_block[1] > 0.1: | |
| hour = max_precip_block[0].hour | |
| if 6 <= hour < 12: | |
| return "morning" | |
| elif 12 <= hour < 18: | |
| return "afternoon" | |
| elif 18 <= hour < 24: | |
| return "evening" | |
| else: | |
| return "overnight" | |
| return None | |
| def get_sky_condition(cloud_percentage): | |
| """Convert cloud percentage to descriptive terms""" | |
| if cloud_percentage < 10: | |
| return "sunny" | |
| elif cloud_percentage < 25: | |
| return "mostly sunny" | |
| elif cloud_percentage < 50: | |
| return "partly cloudy" | |
| elif cloud_percentage < 75: | |
| return "mostly cloudy" | |
| elif cloud_percentage < 90: | |
| return "cloudy" | |
| else: | |
| return "overcast" | |
| def get_wind_description(wind_speed): | |
| """Convert wind speed to descriptive terms""" | |
| if wind_speed < 3: | |
| return "light winds" | |
| elif wind_speed < 8: | |
| return "light winds" | |
| elif wind_speed < 12: | |
| return "breezy" | |
| elif wind_speed < 18: | |
| return "windy" | |
| else: | |
| return "very windy" | |
| def generate_enhanced_period_narrative(period_name, data, is_day): | |
| """Generate enhanced narrative with precipitation chances, timing, sky conditions, snow, and thunderstorms""" | |
| conditions = [] | |
| # Determine precipitation type from the period data | |
| precip_type = data.get('precip_type', 'rain') # Default to rain | |
| # Sky condition and precipitation with enhanced types | |
| if data['precip_chance'] > 60: | |
| if precip_type == 'snow': | |
| if data['rain_timing']: | |
| conditions.append(f"snow likely, mainly {data['rain_timing']}") | |
| else: | |
| conditions.append("snow likely") | |
| elif precip_type == 'thunderstorms': | |
| if data['rain_timing']: | |
| conditions.append(f"thunderstorms likely, mainly {data['rain_timing']}") | |
| else: | |
| conditions.append("thunderstorms likely") | |
| else: | |
| if data['rain_timing']: | |
| conditions.append(f"rain likely, mainly {data['rain_timing']}") | |
| else: | |
| conditions.append("rain likely") | |
| conditions.append(f"Chance of precipitation {data['precip_chance']}%") | |
| elif data['precip_chance'] > 30: | |
| if precip_type == 'snow': | |
| if data['rain_timing']: | |
| conditions.append(f"chance of snow, mainly {data['rain_timing']}") | |
| else: | |
| conditions.append("chance of snow") | |
| elif precip_type == 'thunderstorms': | |
| if data['rain_timing']: | |
| conditions.append(f"chance of thunderstorms, mainly {data['rain_timing']}") | |
| else: | |
| conditions.append("chance of thunderstorms") | |
| else: | |
| if data['rain_timing']: | |
| conditions.append(f"chance of rain, mainly {data['rain_timing']}") | |
| else: | |
| conditions.append("chance of rain") | |
| conditions.append(f"Chance of precipitation {data['precip_chance']}%") | |
| elif data['precip_chance'] > 0: | |
| if precip_type == 'snow': | |
| conditions.append(f"slight chance of snow. Chance of precipitation {data['precip_chance']}%") | |
| elif precip_type == 'thunderstorms': | |
| conditions.append(f"slight chance of thunderstorms. Chance of precipitation {data['precip_chance']}%") | |
| else: | |
| conditions.append(f"slight chance of rain. Chance of precipitation {data['precip_chance']}%") | |
| # If no significant precipitation, describe sky condition | |
| if data['precip_chance'] <= 30: | |
| if not conditions: # No rain mentioned yet | |
| conditions.insert(0, data['sky_condition']) | |
| else: | |
| conditions.insert(0, f"{data['sky_condition']}, then") | |
| # Wind conditions | |
| if data['max_wind'] > 12: | |
| conditions.append(data['wind_desc']) | |
| # Build the narrative | |
| if is_day: | |
| if conditions: | |
| weather_text = f"**{period_name}:** {' '.join(conditions).capitalize()}" | |
| else: | |
| weather_text = f"**{period_name}:** {data['sky_condition'].capitalize()}" | |
| weather_text += f". High {data['high_temp']:.0f}Β°C" | |
| else: | |
| if conditions: | |
| weather_text = f"**{period_name}:** {' '.join(conditions).capitalize()}" | |
| else: | |
| weather_text = f"**{period_name}:** {data['sky_condition'].capitalize()}" | |
| weather_text += f". Low {data['low_temp']:.0f}Β°C" | |
| weather_text += "." | |
| return weather_text | |
| def generate_period_narrative(period_name, high_temp, low_temp, has_rain, heavy_rain, windy, is_day): | |
| """Generate natural narrative for a specific period""" | |
| conditions = [] | |
| # Weather conditions | |
| if heavy_rain: | |
| conditions.append("heavy rain at times") | |
| elif has_rain: | |
| conditions.append("periods of rain") | |
| if windy: | |
| if conditions: | |
| conditions.append("gusty winds") | |
| else: | |
| conditions.append("breezy conditions") | |
| # Build the narrative | |
| if is_day: | |
| if conditions: | |
| weather_text = f"**{period_name}:** {', '.join(conditions).capitalize()}" | |
| else: | |
| weather_text = f"**{period_name}:** Partly cloudy" | |
| if high_temp: | |
| weather_text += f". High {high_temp:.0f}Β°C" | |
| else: | |
| if conditions: | |
| weather_text = f"**{period_name}:** {', '.join(conditions).capitalize()}" | |
| else: | |
| weather_text = f"**{period_name}:** Mostly clear" | |
| if low_temp: | |
| weather_text += f". Low {low_temp:.0f}Β°C" | |
| weather_text += "." | |
| return weather_text | |
| def generate_timing_narrative(events, precipitation, timestamps): | |
| """Generate narrative timing information rather than bullet points""" | |
| if not events or not any(p > 0.1 for p in precipitation): | |
| return "" | |
| # Find rain periods | |
| rain_periods = [] | |
| in_rain = False | |
| rain_start = None | |
| for i, (ts, precip) in enumerate(zip(timestamps, precipitation)): | |
| if precip > 0.1 and not in_rain: | |
| rain_start = ts | |
| in_rain = True | |
| elif precip <= 0.1 and in_rain: | |
| rain_periods.append((rain_start, timestamps[i-1])) | |
| in_rain = False | |
| if in_rain and rain_start: | |
| rain_periods.append((rain_start, timestamps[-1])) | |
| if not rain_periods: | |
| return "" | |
| # Create narrative | |
| if len(rain_periods) == 1: | |
| start, end = rain_periods[0] | |
| return f"Rain expected from approximately {start.strftime('%I %p')} through {end.strftime('%I %p')}." | |
| elif len(rain_periods) <= 3: | |
| timing_text = "Periods of rain expected " | |
| times = [] | |
| for start, end in rain_periods: | |
| times.append(f"{start.strftime('%I %p')}-{end.strftime('%I %p')}") | |
| timing_text += " and ".join(times[:-1]) + f" and {times[-1]}." if len(times) > 1 else times[0] + "." | |
| return timing_text | |
| else: | |
| return "Multiple rounds of rain expected throughout the forecast period with the heaviest amounts during afternoon and evening hours." | |
| def generate_period_text(period_name, events, min_temp, max_temp): | |
| """ | |
| Generate text for a specific forecast period using NOAA phraseology | |
| """ | |
| if not events: | |
| return f"**{period_name}:** No significant weather expected." | |
| # Determine dominant conditions | |
| weather_conditions = [] | |
| severities = [e['severity'] for e in events] | |
| primary_weathers = [e['primary_weather'] for e in events if e['primary_weather']] | |
| # Temperature phrase | |
| if 'Night' in period_name: | |
| temp_phrase = f"Low around {min_temp:.0f}Β°C" | |
| else: | |
| temp_phrase = f"High near {max_temp:.0f}Β°C" | |
| # Weather phrase generation (NOAA-style logic) | |
| weather_phrase = "" | |
| if primary_weathers: | |
| # Count occurrences of each weather type | |
| weather_counts = {} | |
| for weather in primary_weathers: | |
| weather_counts[weather] = weather_counts.get(weather, 0) + 1 | |
| # Determine dominant weather | |
| dominant_weather = max(weather_counts.items(), key=lambda x: x[1])[0] | |
| # Check for combinations | |
| has_rain = any('rain' in w for w in primary_weathers) | |
| has_wind = any('wind' in w for w in primary_weathers) | |
| has_thunderstorms = any('thunderstorms' in w for w in primary_weathers) | |
| if has_thunderstorms: | |
| weather_phrase = "Thunderstorms possible" | |
| if has_wind: | |
| weather_phrase += " with gusty winds" | |
| elif has_rain and has_wind: | |
| # NOAA logic: combine rain and wind | |
| rain_intensity = "heavy" if any('heavy' in str(e['conditions']) for e in events) else "" | |
| weather_phrase = f"{rain_intensity} rain and windy".strip() | |
| elif has_rain: | |
| # Determine rain intensity | |
| if any('heavy' in str(e['conditions']) for e in events): | |
| weather_phrase = "Heavy rain" | |
| elif any('light' in str(e['conditions']) for e in events): | |
| weather_phrase = "Light rain" | |
| else: | |
| weather_phrase = "Rain" | |
| elif has_wind: | |
| if any('very_windy' in e['wind_descriptor'] for e in events if e['wind_descriptor']): | |
| weather_phrase = "Very windy" | |
| elif any('windy' in e['wind_descriptor'] for e in events if e['wind_descriptor']): | |
| weather_phrase = "Windy" | |
| else: | |
| weather_phrase = "Breezy" | |
| else: | |
| weather_phrase = dominant_weather.replace('_', ' ').title() | |
| # Combine phrases | |
| if weather_phrase: | |
| period_text = f"**{period_name}:** {weather_phrase}. {temp_phrase}." | |
| else: | |
| # Fair weather | |
| if max_temp > min_temp + 5: # Significant temperature change | |
| period_text = f"**{period_name}:** Partly cloudy. {temp_phrase}." | |
| else: | |
| period_text = f"**{period_name}:** Fair. {temp_phrase}." | |
| return period_text | |
| def generate_timing_alerts(events): | |
| """ | |
| Generate specific timing alerts (e.g., "Rain beginning around 4 PM") | |
| """ | |
| alerts = [] | |
| # Track weather transitions | |
| prev_weather = None | |
| for i, event in enumerate(events): | |
| current_weather = event['primary_weather'] | |
| time_str = event['time'].strftime('%I %p').lstrip('0') | |
| # Detect weather onset | |
| if current_weather and current_weather != prev_weather: | |
| if current_weather == 'rain': | |
| alerts.append(f"β’ Rain beginning around {time_str}") | |
| elif current_weather == 'thunderstorms': | |
| alerts.append(f"β’ Thunderstorms possible after {time_str}") | |
| elif current_weather == 'snow': | |
| alerts.append(f"β’ Snow beginning around {time_str}") | |
| elif current_weather == 'wind' and prev_weather != 'wind': | |
| alerts.append(f"β’ Winds increasing around {time_str}") | |
| # Detect weather ending | |
| if prev_weather and current_weather != prev_weather: | |
| if prev_weather == 'rain': | |
| alerts.append(f"β’ Rain ending around {time_str}") | |
| elif prev_weather == 'thunderstorms': | |
| alerts.append(f"β’ Thunderstorms ending around {time_str}") | |
| prev_weather = current_weather | |
| return '\n'.join(alerts) | |
| def generate_advisories(events): | |
| """ | |
| Generate weather advisories based on conditions | |
| """ | |
| advisories = [] | |
| # Check for significant events based on wind descriptors and conditions | |
| if any(e.get('wind_descriptor') == 'very_windy' for e in events): | |
| advisories.append("β’ **Wind Advisory:** Sustained winds 15-25 m/s with gusts up to 30 m/s possible") | |
| if any(e.get('precipitation_type') == 'heavy_rain' for e in events): | |
| advisories.append("β’ **Heavy Rain:** Rainfall rates may exceed 5mm/h, leading to localized flooding") | |
| if any(e.get('precipitation_type') == 'severe_thunderstorms' for e in events): | |
| advisories.append("β’ **Severe Thunderstorm Warning:** Dangerous thunderstorms with potential for damaging winds, large hail, and heavy rain") | |
| elif any(e.get('precipitation_type') == 'thunderstorms' for e in events): | |
| advisories.append("β’ **Thunderstorm Watch:** Conditions favorable for thunderstorm development") | |
| if any(e.get('precipitation_type') == 'heavy_snow' for e in events): | |
| advisories.append("β’ **Heavy Snow Warning:** Significant snow accumulation expected, travel may become hazardous") | |
| elif any(e.get('precipitation_type') in ['snow', 'light_snow'] for e in events): | |
| advisories.append("β’ **Winter Weather:** Snow accumulation possible, use caution when traveling") | |
| if any(e.get('temperature_descriptor') == 'freezing' for e in events): | |
| advisories.append("β’ **Freeze Warning:** Temperatures at or below freezing expected") | |
| if any(e.get('temperature_descriptor') == 'hot' for e in events): | |
| advisories.append("β’ **Heat Advisory:** High temperatures may cause heat stress") | |
| # Check for significant severity events | |
| severe_events = [e for e in events if e.get('severity') == 'significant'] | |
| if severe_events and not advisories: | |
| advisories.append("β’ **Weather Advisory:** Significant weather conditions expected") | |
| return '\n'.join(advisories) | |
| def create_forecast_plot(forecast_data): | |
| """Create comprehensive forecast visualization plots""" | |
| if isinstance(forecast_data, str): | |
| return forecast_data | |
| # Create a larger figure with more subplots for all variables | |
| fig = plt.figure(figsize=(16, 12)) | |
| timestamps = forecast_data['timestamps'] | |
| # Create a 3x3 grid of subplots | |
| gs = fig.add_gridspec(3, 3, hspace=0.4, wspace=0.3) | |
| # Temperature plot with min/max if available | |
| ax1 = fig.add_subplot(gs[0, 0]) | |
| ax1.plot(timestamps, forecast_data['temperature'], 'r-', linewidth=2, label='Temperature') | |
| if 'temp_max' in forecast_data: | |
| ax1.plot(timestamps, forecast_data['temp_max'], 'r--', linewidth=1, alpha=0.7, label='Max') | |
| if 'temp_min' in forecast_data: | |
| ax1.plot(timestamps, forecast_data['temp_min'], 'b--', linewidth=1, alpha=0.7, label='Min') | |
| if 'dewpoint' in forecast_data: | |
| ax1.plot(timestamps, forecast_data['dewpoint'], 'c-', linewidth=1, alpha=0.8, label='Dewpoint') | |
| ax1.set_title('Temperature (Β°C)') | |
| ax1.set_ylabel('Β°C') | |
| ax1.grid(True, alpha=0.3) | |
| ax1.legend(fontsize=8) | |
| ax1.tick_params(axis='x', rotation=45, labelsize=8) | |
| # Humidity and moisture | |
| ax2 = fig.add_subplot(gs[0, 1]) | |
| ax2.plot(timestamps, forecast_data['humidity'], 'b-', linewidth=2, label='Rel. Humidity') | |
| if 'specific_humidity' in forecast_data: | |
| ax2_twin = ax2.twinx() | |
| ax2_twin.plot(timestamps, forecast_data['specific_humidity'], 'g-', linewidth=1, alpha=0.7, label='Spec. Humidity') | |
| ax2_twin.set_ylabel('g/kg', color='g') | |
| ax2_twin.tick_params(axis='y', labelcolor='g') | |
| ax2.set_title('Humidity (%)') | |
| ax2.set_ylabel('%') | |
| ax2.grid(True, alpha=0.3) | |
| ax2.legend(fontsize=8) | |
| ax2.tick_params(axis='x', rotation=45, labelsize=8) | |
| # Wind speed, direction, and gusts | |
| ax3 = fig.add_subplot(gs[0, 2]) | |
| ax3.plot(timestamps, forecast_data['wind_speed'], 'g-', linewidth=2, label='Wind Speed') | |
| if 'wind_gust' in forecast_data: | |
| ax3.plot(timestamps, forecast_data['wind_gust'], 'orange', linewidth=1, alpha=0.7, label='Gusts') | |
| if 'wind_direction' in forecast_data: | |
| ax3_twin = ax3.twinx() | |
| ax3_twin.scatter(timestamps, forecast_data['wind_direction'], c='purple', s=10, alpha=0.6, label='Direction') | |
| ax3_twin.set_ylabel('Direction (Β°)', color='purple') | |
| ax3_twin.set_ylim(0, 360) | |
| ax3_twin.tick_params(axis='y', labelcolor='purple') | |
| ax3.set_title('Wind (m/s)') | |
| ax3.set_ylabel('m/s') | |
| ax3.grid(True, alpha=0.3) | |
| ax3.legend(fontsize=8) | |
| ax3.tick_params(axis='x', rotation=45, labelsize=8) | |
| # Pressure | |
| ax4 = fig.add_subplot(gs[1, 0]) | |
| if 'pressure' in forecast_data: | |
| ax4.plot(timestamps, forecast_data['pressure'], 'purple', linewidth=2, label='Sea Level') | |
| if 'surface_pressure' in forecast_data: | |
| ax4.plot(timestamps, forecast_data['surface_pressure'], 'indigo', linewidth=1, alpha=0.7, label='Surface') | |
| ax4.set_title('Pressure (hPa)') | |
| ax4.set_ylabel('hPa') | |
| ax4.grid(True, alpha=0.3) | |
| ax4.legend(fontsize=8) | |
| ax4.tick_params(axis='x', rotation=45, labelsize=8) | |
| # Precipitation | |
| ax5 = fig.add_subplot(gs[1, 1]) | |
| if 'precipitation' in forecast_data: | |
| ax5.bar(timestamps, forecast_data['precipitation'], alpha=0.7, color='blue', label='Total', width=0.1) | |
| if 'rain' in forecast_data: | |
| ax5.bar(timestamps, forecast_data['rain'], alpha=0.5, color='lightblue', label='Rain', width=0.08) | |
| if 'snow' in forecast_data: | |
| ax5.bar(timestamps, forecast_data['snow'], alpha=0.5, color='white', edgecolor='gray', label='Snow', width=0.06) | |
| ax5.set_title('Precipitation (mm/h)') | |
| ax5.set_ylabel('mm/h') | |
| ax5.grid(True, alpha=0.3) | |
| ax5.legend(fontsize=8) | |
| ax5.tick_params(axis='x', rotation=45, labelsize=8) | |
| # Cloud cover | |
| ax6 = fig.add_subplot(gs[1, 2]) | |
| if 'cloud_cover' in forecast_data: | |
| ax6.fill_between(timestamps, forecast_data['cloud_cover'], alpha=0.3, color='gray', label='Total') | |
| if 'low_cloud' in forecast_data: | |
| ax6.plot(timestamps, forecast_data['low_cloud'], 'brown', linewidth=1, label='Low') | |
| if 'mid_cloud' in forecast_data: | |
| ax6.plot(timestamps, forecast_data['mid_cloud'], 'orange', linewidth=1, label='Mid') | |
| if 'high_cloud' in forecast_data: | |
| ax6.plot(timestamps, forecast_data['high_cloud'], 'lightblue', linewidth=1, label='High') | |
| ax6.set_title('Cloud Cover (%)') | |
| ax6.set_ylabel('%') | |
| ax6.set_ylim(0, 100) | |
| ax6.grid(True, alpha=0.3) | |
| ax6.legend(fontsize=8) | |
| ax6.tick_params(axis='x', rotation=45, labelsize=8) | |
| # Solar radiation | |
| ax7 = fig.add_subplot(gs[2, 0]) | |
| if 'solar_radiation' in forecast_data: | |
| ax7.fill_between(timestamps, forecast_data['solar_radiation'], alpha=0.3, color='yellow', label='Solar') | |
| if 'direct_radiation' in forecast_data: | |
| ax7.plot(timestamps, forecast_data['direct_radiation'], 'orange', linewidth=1, label='Direct') | |
| if 'diffuse_radiation' in forecast_data: | |
| ax7.plot(timestamps, forecast_data['diffuse_radiation'], 'gold', linewidth=1, label='Diffuse') | |
| ax7.set_title('Solar Radiation (W/mΒ²)') | |
| ax7.set_ylabel('W/mΒ²') | |
| ax7.grid(True, alpha=0.3) | |
| ax7.legend(fontsize=8) | |
| ax7.tick_params(axis='x', rotation=45, labelsize=8) | |
| # Additional atmospheric parameters | |
| ax8 = fig.add_subplot(gs[2, 1]) | |
| if 'visibility' in forecast_data: | |
| ax8.plot(timestamps, forecast_data['visibility'], 'teal', linewidth=2, label='Visibility (km)') | |
| if 'boundary_layer_height' in forecast_data: | |
| ax8_twin = ax8.twinx() | |
| ax8_twin.plot(timestamps, forecast_data['boundary_layer_height'], 'brown', linewidth=1, alpha=0.7, label='BL Height (m)') | |
| ax8_twin.set_ylabel('BL Height (m)', color='brown') | |
| ax8_twin.tick_params(axis='y', labelcolor='brown') | |
| ax8.set_title('Atmospheric Conditions') | |
| ax8.set_ylabel('Visibility (km)') | |
| ax8.grid(True, alpha=0.3) | |
| ax8.legend(fontsize=8) | |
| ax8.tick_params(axis='x', rotation=45, labelsize=8) | |
| # Summary info panel | |
| ax9 = fig.add_subplot(gs[2, 2]) | |
| ax9.axis('off') | |
| # Only real DWD ICON data now | |
| data_source = "Real DWD ICON Data" | |
| forecast_info = forecast_data.get('forecast_date', 'Unknown') | |
| # Grid point info | |
| grid_info = "" | |
| if 'nearest_grid_lat' in forecast_data and 'nearest_grid_lon' in forecast_data: | |
| grid_info = f"Grid: {forecast_data['nearest_grid_lat']:.2f}Β°N, {forecast_data['nearest_grid_lon']:.2f}Β°E\n" | |
| # Count available variables | |
| available_vars = [] | |
| var_categories = { | |
| 'Temperature': ['temperature', 'temp_min', 'temp_max', 'dewpoint'], | |
| 'Wind': ['wind_speed', 'wind_direction', 'wind_gust'], | |
| 'Pressure': ['pressure', 'surface_pressure'], | |
| 'Precipitation': ['precipitation', 'rain', 'snow'], | |
| 'Clouds': ['cloud_cover', 'low_cloud', 'mid_cloud', 'high_cloud'], | |
| 'Radiation': ['solar_radiation', 'direct_radiation', 'diffuse_radiation'], | |
| 'Atmosphere': ['visibility', 'boundary_layer_height', 'cape', 'humidity'] | |
| } | |
| for category, vars_list in var_categories.items(): | |
| count = sum(1 for var in vars_list if var in forecast_data) | |
| if count > 0: | |
| available_vars.append(f"{category}: {count}") | |
| summary_text = f""" | |
| Location: {forecast_data['lat']:.2f}Β°N, {forecast_data['lon']:.2f}Β°E | |
| {grid_info} | |
| Data: {data_source} | |
| Forecast: {forecast_info} | |
| Available Variables: | |
| {chr(10).join(available_vars)} | |
| Current Conditions: | |
| Temp: {forecast_data['temperature'][0]:.1f}Β°C | |
| Humidity: {forecast_data['humidity'][0]:.1f}% | |
| Wind: {forecast_data['wind_speed'][0]:.1f} m/s | |
| """ | |
| # Add pressure if available | |
| if 'pressure' in forecast_data: | |
| summary_text += f"Pressure: {forecast_data['pressure'][0]:.1f} hPa\n" | |
| color = 'lightgreen' | |
| ax9.text(0.05, 0.95, summary_text, transform=ax9.transAxes, fontsize=8, | |
| verticalalignment='top', bbox=dict(boxstyle='round', facecolor=color, alpha=0.7)) | |
| plt.tight_layout() | |
| return fig | |
| def process_map_click(lat, lon): | |
| """Process map click and return forecast with NOAA-style text""" | |
| if lat is None or lon is None: | |
| return "Please click on the map to select a location", None, "" | |
| try: | |
| # Get forecast data - will raise exception if it fails | |
| forecast_data = get_forecast_data(lat, lon) | |
| # Create plot | |
| plot = create_forecast_plot(forecast_data) | |
| # Create summary text | |
| data_type = "Real DWD ICON Data" | |
| forecast_info = forecast_data.get('forecast_date', '') | |
| summary = f"Forecast for location: {lat:.3f}Β°N, {lon:.3f}Β°E\n\nUsing: {data_type}\nForecast: {forecast_info}" | |
| # Generate NOAA-style text forecast | |
| location_name = forecast_data.get('location_name', f"{lat:.2f}Β°N, {lon:.2f}Β°E") | |
| text_forecast = generate_forecast_text(forecast_data, location_name) | |
| return summary, plot, text_forecast | |
| except Exception as e: | |
| error_msg = f"β Failed to retrieve DWD ICON data: {str(e)}" | |
| return error_msg, None, "Unable to generate forecast - DWD ICON data unavailable" | |
| def create_attribution_text(): | |
| """Create proper attribution for the dataset""" | |
| attribution = """ | |
| ## Data Attribution | |
| This application accesses **DWD ICON Global** weather forecast data directly from the German Weather Service. | |
| - **Model**: DWD ICON Global Weather Model | |
| - **Source**: German Weather Service (Deutscher Wetterdienst - DWD) | |
| - **Data Server**: DWD Open Data Server (https://opendata.dwd.de) | |
| - **License**: Open Government Data (free for commercial use) | |
| - **Format**: GRIB2 meteorological data | |
| **Commercial Use**: DWD's Open Data Server provides free access to weather data suitable for commercial applications. | |
| **Production Implementation**: This application now includes real DWD ICON GRIB2 data access: | |
| - Downloads GRIB2 files directly from https://opendata.dwd.de/weather/nwp/icon/ | |
| - Parses meteorological data using cfgrib and xarray libraries | |
| - Handles icosahedral grid interpolation to lat/lon coordinates | |
| - Processes 9 core weather parameters from real DWD ICON model runs | |
| - Automatic fallback to simulated data if GRIB2 libraries unavailable | |
| **Citation**: Please cite the German Weather Service (DWD) ICON model when using this data. | |
| """ | |
| return attribution | |
| # Create the Gradio interface | |
| with gr.Blocks(title="DWD ICON Global Weather Forecast") as app: | |
| gr.Markdown("# π¦οΈ DWD ICON Global Weather Forecast") | |
| gr.Markdown(""" | |
| **Comprehensive Weather Forecasting Dashboard** - Click on the map to select any location and view detailed 4-day forecasts with: | |
| π **9 Weather Panels**: Temperature, Humidity/Moisture, Wind, Pressure, Precipitation, Cloud Cover, Solar Radiation, Atmospheric Conditions, and Data Summary | |
| π’ **30+ Weather Variables**: Temperature (min/max/dewpoint), Wind (speed/direction/gusts), Pressure (sea level/surface), | |
| Precipitation (rain/snow/convective), Cloud layers (low/mid/high/total), Solar radiation (direct/diffuse/longwave), | |
| Visibility, Boundary layer height, Atmospheric stability (CAPE/CIN), and more! | |
| π **NOAA-Style Text Forecasts**: Advanced text generation with timing predictions ("rain beginning around 4 PM"), weather advisories, | |
| and zone forecast product formatting similar to National Weather Service bulletins | |
| π― **DWD ICON Model Data** directly from the German Weather Service Open Data Server (Commercial Use Approved) | |
| """) | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| # Map component | |
| map_html = gr.HTML(create_map()._repr_html_(), label="Interactive Map") | |
| gr.Markdown("π Click anywhere on the map to select a location for forecast") | |
| with gr.Column(scale=2): | |
| # Forecast output | |
| forecast_text = gr.Textbox( | |
| label="Forecast Information", | |
| value="Click on the map to select a location", | |
| lines=3 | |
| ) | |
| forecast_plot = gr.Plot(label="Weather Forecast Charts") | |
| # NOAA-style text forecast section | |
| with gr.Row(): | |
| noaa_forecast_text = gr.Textbox( | |
| label="π NOAA-Style Detailed Forecast", | |
| value="Select a location to view detailed text forecast with timing predictions", | |
| lines=12, | |
| max_lines=20, | |
| show_copy_button=True | |
| ) | |
| # Input fields for manual coordinate entry | |
| with gr.Row(): | |
| lat_input = gr.Number( | |
| label="Latitude", | |
| value=52.5, | |
| minimum=-90, | |
| maximum=90, | |
| step=0.001, | |
| precision=3 | |
| ) | |
| lon_input = gr.Number( | |
| label="Longitude", | |
| value=13.4, | |
| minimum=-180, | |
| maximum=180, | |
| step=0.001, | |
| precision=3 | |
| ) | |
| submit_btn = gr.Button("Get Forecast", variant="primary") | |
| # Attribution section | |
| with gr.Accordion("π Data Attribution & Information", open=False): | |
| gr.Markdown(create_attribution_text()) | |
| # Event handlers | |
| submit_btn.click( | |
| fn=process_map_click, | |
| inputs=[lat_input, lon_input], | |
| outputs=[forecast_text, forecast_plot, noaa_forecast_text] | |
| ) | |
| # Example locations | |
| with gr.Row(): | |
| gr.Examples( | |
| examples=[ | |
| [52.5200, 13.4050], # Berlin | |
| [48.8566, 2.3522], # Paris | |
| [51.5074, -0.1278], # London | |
| [55.7558, 37.6176], # Moscow | |
| [41.9028, 12.4964], # Rome | |
| ], | |
| inputs=[lat_input, lon_input], | |
| outputs=[forecast_text, forecast_plot, noaa_forecast_text], | |
| fn=process_map_click, | |
| cache_examples=False, | |
| label="Try these example locations:" | |
| ) | |
| def test_data_access(): | |
| """Test function to verify data access works""" | |
| try: | |
| print("Testing data access...") | |
| file_path, forecast_date, hour = get_latest_available_file() | |
| print(f"Successfully accessed file: {file_path}") | |
| # Try to load the dataset | |
| import xarray as xr | |
| ds = xr.open_zarr(file_path) | |
| print(f"Dataset dimensions: {dict(ds.dims)}") | |
| print(f"Available variables: {list(ds.data_vars.keys())}") | |
| print("Data access test successful!") | |
| except Exception as e: | |
| print(f"Data access test failed: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| if __name__ == "__main__": | |
| # Uncomment the line below to test data access before launching the app | |
| # test_data_access() | |
| app.launch(share=True, server_name="0.0.0.0") |