# DWD ICON Weather Data - Production Implementation Guide

## Overview

This guide covers implementing a production weather forecasting system using real-time DWD ICON global model data from the German Weather Service.

## Table of Contents

- [Data Source Information](#data-source-information)
- [Update Schedule](#update-schedule)
- [Architecture Overview](#architecture-overview)
- [Production Implementation](#production-implementation)
- [API Endpoints](#api-endpoints)
- [Monitoring & Reliability](#monitoring--reliability)
- [Performance Optimization](#performance-optimization)
- [Legal & Attribution](#legal--attribution)
- [Deployment Checklist](#deployment-checklist)
- [Support & Maintenance](#support--maintenance)

## Data Source Information

### Source Details

- **Provider**: German Weather Service (Deutscher Wetterdienst - DWD)
- **Model**: ICON Global Weather Model
- **Data Server**: https://opendata.dwd.de/weather/nwp/icon/grib/
- **License**: Open Government Data (commercial use permitted)
- **Format**: GRIB2 compressed with bzip2
- **Grid**: Icosahedral unstructured grid (global coverage)
- **Resolution**: ~13 km globally

### Available Parameters

**Essential parameters (recommended for production):**

- `t_2m`: Temperature at 2 m (Kelvin → Celsius)
- `u_10m`: U-component of wind at 10 m (m/s)
- `v_10m`: V-component of wind at 10 m (m/s)
- `tot_prec`: Total precipitation (kg/m², accumulated since model start → mm per interval by differencing steps)
- `snow_gsp`: Grid-scale snow (kg/m², accumulated → mm per interval by differencing steps)
- `clct`: Total cloud cover (fraction → percentage)
- `cape_con`: Convective Available Potential Energy (J/kg)
- `vmax_10m`: Wind gusts at 10 m (m/s)

**Additional parameters available:**

- `relhum_2m`: Relative humidity at 2 m
- `pmsl`: Pressure at mean sea level
- `rain_con`: Convective rain
- `rain_gsp`: Grid-scale rain
- `snow_con`: Convective snow
- `asob_s`: Net shortwave radiation
- Pressure-level data (850, 700, 500, 300 hPa)

## Update Schedule

### Model Run Times (UTC)

- **00:00 UTC** - available ~03:30 UTC
- **06:00 UTC** - available ~09:30 UTC
- **12:00 UTC** - available ~15:30 UTC
- **18:00 UTC** - available ~21:30 UTC

### Data Availability Delay

- **Typical delay**: 3-4 hours after model run time
- **Coordinate files**: Only available from the 00Z run (time-invariant)
- **Forecast range**: 0-180 hours (7.5 days)

### Recommended Update Strategy

```cron
# Download every 6 hours, about one hour after typical availability
30 4,10,16,22 * * * /path/to/download_dwd_data.py
```

## Architecture Overview

### Optimal Production Architecture

```
┌───────────────────────────────────────────────────────────────────┐
│                         PRODUCTION SYSTEM                         │
├───────────────────────────────────────────────────────────────────┤
│  ┌─────────────────┐    ┌─────────────────┐    ┌───────────────┐  │
│  │   Background    │    │  Data Storage   │    │  API Server   │  │
│  │   Downloader    │───▶│  & Processing   │───▶│   (Instant    │  │
│  │  (Every 6 hrs)  │    │                 │    │   Response)   │  │
│  └─────────────────┘    └─────────────────┘    └───────────────┘  │
│           │                      │                      │         │
│           ▼                      ▼                      ▼         │
│  • Download GRIBs       • Parse & store        • Extract         │
│  • Validate data        • Index by location    • Generate        │
│  • Handle failures      • Cache coordinates    • Serve JSON      │
└───────────────────────────────────────────────────────────────────┘
```

### File URL Structure

```
# Coordinate files (time-invariant, only from the 00Z run)
https://opendata.dwd.de/weather/nwp/icon/grib/00/clat/icon_global_icosahedral_time-invariant_YYYYMMDD00_CLAT.grib2.bz2
https://opendata.dwd.de/weather/nwp/icon/grib/00/clon/icon_global_icosahedral_time-invariant_YYYYMMDD00_CLON.grib2.bz2

# Weather data files
https://opendata.dwd.de/weather/nwp/icon/grib/{RUN_HOUR}/{PARAMETER}/icon_global_icosahedral_single-level_{YYYYMMDD}{RUN_HOUR}_{FORECAST_HOUR:03d}_{PARAMETER}.grib2.bz2
```
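The pattern above can be assembled programmatically. A minimal sketch; the `build_icon_url` helper is our illustration, not part of any DWD tooling:

```python
from datetime import datetime

BASE_URL = "https://opendata.dwd.de/weather/nwp/icon/grib"

def build_icon_url(run_date: datetime, run_hour: int,
                   parameter: str, forecast_hour: int) -> str:
    """Build the download URL for a single-level ICON global file.

    `parameter` is the lowercase directory name (e.g. 't_2m');
    the filename uses its uppercase form (e.g. 'T_2M').
    """
    date_str = run_date.strftime("%Y%m%d")
    filename = (
        f"icon_global_icosahedral_single-level_"
        f"{date_str}{run_hour:02d}_{forecast_hour:03d}_{parameter.upper()}.grib2.bz2"
    )
    return f"{BASE_URL}/{run_hour:02d}/{parameter}/{filename}"

# Example: temperature at 2 m, 12Z run, +6 h forecast
# -> .../12/t_2m/icon_global_icosahedral_single-level_YYYYMMDD12_006_T_2M.grib2.bz2
```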
### Example URLs

```
# Temperature at 2m, 12Z run, +006 forecast hour
https://opendata.dwd.de/weather/nwp/icon/grib/12/t_2m/icon_global_icosahedral_single-level_2025092412_006_T_2M.grib2.bz2

# Wind gusts, 00Z run, +024 forecast hour
https://opendata.dwd.de/weather/nwp/icon/grib/00/vmax_10m/icon_global_icosahedral_single-level_2025092400_024_VMAX_10M.grib2.bz2
```

## Production Implementation

### 1. Background Data Downloader

```python
#!/usr/bin/env python3
"""
DWD ICON Data Downloader - Production Service
Downloads global weather data every 6 hours
"""

import bz2
import logging
import shutil
from datetime import datetime, timedelta, timezone
from pathlib import Path

import requests

# Configuration
DATA_DIR = Path("/var/lib/weather-data")
LOG_FILE = "/var/log/dwd-downloader.log"
MAX_RETRIES = 3
TIMEOUT = 300  # 5 minutes per file

# Essential parameters for production
PARAMETERS = {
    't_2m': 'T_2M',
    'u_10m': 'U_10M',
    'v_10m': 'V_10M',
    'tot_prec': 'TOT_PREC',
    'snow_gsp': 'SNOW_GSP',
    'clct': 'CLCT',
    'cape_con': 'CAPE_CON',
    'vmax_10m': 'VMAX_10M'
}

# Optimized forecast hours: every 3 hrs out to 48 hrs, then 24 hr intervals
FORECAST_HOURS = [0, 3, 6, 9, 12, 15, 18, 21, 24, 27, 30,
                  33, 36, 39, 42, 45, 48, 72, 96]


def get_latest_dwd_run():
    """Get the latest available DWD ICON model run."""
    now = datetime.now(timezone.utc)
    available_time = now - timedelta(hours=4)  # account for the ~4-hour delay
    run_hours = [0, 6, 12, 18]
    # 0 is always a candidate, so this never raises; the 4-hour offset
    # already rolls the date back when needed
    latest_run = max(h for h in run_hours if h <= available_time.hour)
    return available_time.replace(hour=latest_run, minute=0,
                                  second=0, microsecond=0)


def download_coordinate_files(run_date, data_dir):
    """Download coordinate files (only published with the 00Z run)."""
    base_url = "https://opendata.dwd.de/weather/nwp/icon/grib"
    date_str = run_date.strftime("%Y%m%d")
    coord_dir = data_dir / "coordinates" / date_str
    coord_dir.mkdir(parents=True, exist_ok=True)

    files = {
        'clat': f"icon_global_icosahedral_time-invariant_{date_str}00_CLAT.grib2.bz2",
        'clon': f"icon_global_icosahedral_time-invariant_{date_str}00_CLON.grib2.bz2"
    }
    for coord_type, filename in files.items():
        url = f"{base_url}/00/{coord_type}/{filename}"
        # Files are stored decompressed, so drop the .bz2 suffix locally
        output_path = coord_dir / filename.removesuffix('.bz2')
        if output_path.exists():
            logging.info(f"Coordinate file exists: {output_path}")
            continue
        logging.info(f"Downloading coordinate file: {url}")
        download_file(url, output_path)
    return coord_dir


def download_weather_data(run_date, data_dir):
    """Download weather parameter files."""
    base_url = "https://opendata.dwd.de/weather/nwp/icon/grib"
    date_str = run_date.strftime("%Y%m%d")
    run_hour = f"{run_date.hour:02d}"
    weather_dir = data_dir / "weather" / f"{date_str}_{run_hour}"
    weather_dir.mkdir(parents=True, exist_ok=True)

    total_files = len(PARAMETERS) * len(FORECAST_HOURS)
    downloaded = 0
    for param_key, param_dwd in PARAMETERS.items():
        param_dir = weather_dir / param_key
        param_dir.mkdir(exist_ok=True)
        for forecast_hour in FORECAST_HOURS:
            filename = (f"icon_global_icosahedral_single-level_"
                        f"{date_str}{run_hour}_{forecast_hour:03d}_{param_dwd}.grib2.bz2")
            url = f"{base_url}/{run_hour}/{param_key}/{filename}"
            output_path = param_dir / filename.removesuffix('.bz2')
            if output_path.exists():
                logging.info(f"File exists: {output_path}")
                downloaded += 1
                continue
            logging.info(f"Downloading [{downloaded + 1}/{total_files}]: "
                         f"{param_key} +{forecast_hour:03d}h")
            if download_file(url, output_path):
                downloaded += 1
            else:
                logging.error(f"Failed to download: {url}")

    logging.info(f"Downloaded {downloaded}/{total_files} files")
    return weather_dir


def download_file(url, output_path):
    """Download a bzip2-compressed file with retries, decompressing on the fly."""
    for attempt in range(MAX_RETRIES):
        try:
            response = requests.get(url, timeout=TIMEOUT, stream=True)
            response.raise_for_status()

            # Stream download and decompress chunk by chunk to handle large files
            decompressor = bz2.BZ2Decompressor()
            with open(output_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(decompressor.decompress(chunk))

            file_size = output_path.stat().st_size
            logging.info(f"Downloaded: {output_path.name} "
                         f"({file_size / 1024 / 1024:.1f} MB)")
            return True
        except Exception as e:
            logging.warning(f"Download attempt {attempt + 1} failed: {e}")
            if output_path.exists():
                output_path.unlink()
            if attempt == MAX_RETRIES - 1:
                logging.error(f"Failed to download after {MAX_RETRIES} attempts: {url}")
                return False
    return False


def cleanup_old_data(data_dir, keep_days=3):
    """Remove data older than keep_days."""
    cutoff_date = datetime.now() - timedelta(days=keep_days)
    for data_type in ['coordinates', 'weather']:
        type_dir = data_dir / data_type
        if not type_dir.exists():
            continue
        for item in type_dir.iterdir():
            if not item.is_dir():
                continue
            try:
                # Parse the date from the directory name
                # (coordinates: YYYYMMDD, weather: YYYYMMDD_HH)
                item_date = datetime.strptime(item.name[:8], '%Y%m%d')
            except ValueError:
                continue  # Skip items that don't match the date pattern
            if item_date < cutoff_date:
                logging.info(f"Removing old data: {item}")
                shutil.rmtree(item)


def main():
    """Main download process."""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(LOG_FILE),
            logging.StreamHandler()
        ]
    )
    try:
        DATA_DIR.mkdir(parents=True, exist_ok=True)
        run_date = get_latest_dwd_run()
        logging.info(f"Downloading DWD ICON data for run: "
                     f"{run_date.strftime('%Y-%m-%d %H:%M UTC')}")

        download_coordinate_files(run_date, DATA_DIR)
        download_weather_data(run_date, DATA_DIR)
        cleanup_old_data(DATA_DIR)

        logging.info("Download process completed successfully")
    except Exception as e:
        logging.error(f"Download process failed: {e}")
        raise


if __name__ == "__main__":
    main()
```
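The architecture diagram lists "Validate data" as a downloader responsibility, but the script above only checks that downloads complete. A minimal post-download sanity check, assuming `xarray` and `cfgrib` are installed; the `validate_grib` helper is our sketch, not an existing API:

```python
import logging

import numpy as np
import xarray as xr


def validate_grib(path) -> bool:
    """Open a decompressed GRIB2 file and sanity-check its contents."""
    try:
        ds = xr.open_dataset(path, engine="cfgrib")
    except Exception as e:
        logging.error(f"Cannot read {path}: {e}")
        return False
    if not ds.data_vars:
        logging.error(f"No data variables in {path}")
        return False
    values = ds[next(iter(ds.data_vars))].values
    # A fully-missing field usually indicates a truncated or corrupt download
    if np.isnan(values).all():
        logging.error(f"All values missing in {path}")
        return False
    return True
```

Calling this from `download_file` (and retrying on failure) closes the gap between "downloaded" and "usable".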
### 2. Data Processing Service

```python
#!/usr/bin/env python3
"""
DWD ICON Data Processor - Production Service
Processes GRIB files into queryable format
"""

import logging
import pickle
from pathlib import Path

import numpy as np
import xarray as xr
from scipy.spatial import cKDTree

# Must match the downloader configuration (only the keys are needed here)
PARAMETERS = ['t_2m', 'u_10m', 'v_10m', 'tot_prec',
              'snow_gsp', 'clct', 'cape_con', 'vmax_10m']
FORECAST_HOURS = [0, 3, 6, 9, 12, 15, 18, 21, 24, 27, 30,
                  33, 36, 39, 42, 45, 48, 72, 96]


def process_coordinates(coord_dir):
    """Process coordinate files and build a spatial index."""
    clat_file = next(coord_dir.glob("*_CLAT.grib2"))
    clon_file = next(coord_dir.glob("*_CLON.grib2"))

    # Load coordinate data
    clat_ds = xr.open_dataset(clat_file, engine='cfgrib')
    clon_ds = xr.open_dataset(clon_file, engine='cfgrib')

    # Extract coordinates (handle different variable names)
    if 'clat' in clat_ds:
        lats = clat_ds.clat.values
    else:
        lats = clat_ds[list(clat_ds.data_vars.keys())[0]].values
    if 'clon' in clon_ds:
        lons = clon_ds.clon.values
    else:
        lons = clon_ds[list(clon_ds.data_vars.keys())[0]].values

    # Build the KD-tree on 3D unit vectors so nearest-neighbour queries use
    # chord distance on the sphere (correct across the antimeridian and poles)
    lat_r = np.radians(lats.ravel())
    lon_r = np.radians(lons.ravel())
    xyz = np.column_stack([np.cos(lat_r) * np.cos(lon_r),
                           np.cos(lat_r) * np.sin(lon_r),
                           np.sin(lat_r)])
    tree = cKDTree(xyz)

    return {
        'lats': lats,
        'lons': lons,
        'tree': tree,
        'coords': np.column_stack([lats.ravel(), lons.ravel()])
    }


def save_spatial_index(spatial_index, path):
    """Persist the spatial index for the API server to load at startup."""
    with open(path, 'wb') as f:
        pickle.dump(spatial_index, f)


def find_nearest_point(lat, lon, spatial_index):
    """Find the nearest grid point using the spatial index."""
    lat_r, lon_r = np.radians([lat, lon])
    target = [np.cos(lat_r) * np.cos(lon_r),
              np.cos(lat_r) * np.sin(lon_r),
              np.sin(lat_r)]
    distance, index = spatial_index['tree'].query(target)
    return np.unravel_index(index, spatial_index['lats'].shape)


def extract_forecast_data(weather_dir, spatial_index, lat, lon):
    """Extract forecast data for a specific location."""
    nearest_idx = find_nearest_point(lat, lon, spatial_index)
    forecast_data = {
        'location': {'lat': lat, 'lon': lon},
        'grid_point': {
            'lat': float(spatial_index['lats'][nearest_idx]),
            'lon': float(spatial_index['lons'][nearest_idx])
        },
        # Valid times: run time + each of these hour offsets
        'forecast_hours': FORECAST_HOURS
    }

    # Process each parameter
    for param_key in PARAMETERS:
        param_dir = weather_dir / param_key
        if not param_dir.exists():
            continue
        param_data = []
        for forecast_hour in FORECAST_HOURS:
            grib_files = list(param_dir.glob(f"*_{forecast_hour:03d}_*.grib2"))
            if not grib_files:
                param_data.append(None)
                continue
            try:
                ds = xr.open_dataset(grib_files[0], engine='cfgrib')
                var_name = list(ds.data_vars.keys())[0]
                value = ds[var_name].values[nearest_idx]
                param_data.append(float(value))
            except Exception as e:
                logging.warning(f"Error processing {param_key} "
                                f"+{forecast_hour:03d}h: {e}")
                param_data.append(None)
        forecast_data[param_key] = param_data

    return forecast_data
```
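`extract_forecast_data` returns raw model values; the conversions flagged in the parameter table still have to be applied downstream. A sketch of the usual ones; the speed/direction formulas are standard meteorology, and the de-accumulation step assumes `tot_prec`/`snow_gsp` are accumulated since model start, as noted above:

```python
import math


def kelvin_to_celsius(t_k: float) -> float:
    """t_2m arrives in Kelvin."""
    return t_k - 273.15


def wind_speed(u: float, v: float) -> float:
    """Wind speed in m/s from the u_10m / v_10m components."""
    return math.hypot(u, v)


def wind_direction(u: float, v: float) -> float:
    """Meteorological wind direction in degrees (0 = wind from north)."""
    return math.degrees(math.atan2(-u, -v)) % 360


def deaccumulate(values):
    """Turn run-accumulated totals (kg/m² ~ mm) into per-interval amounts
    by differencing consecutive forecast steps; None marks missing steps."""
    return [None if a is None or b is None else max(b - a, 0.0)
            for a, b in zip(values, values[1:])]
```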
### 3. Fast API Server

```python
#!/usr/bin/env python3
"""
DWD Weather API - Production Server
Serves instant forecasts from processed data
"""

import logging
import pickle
from pathlib import Path

import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

# Processing helpers from the service above
# (assumed to be importable as the module `dwd_processor`)
from dwd_processor import extract_forecast_data, find_nearest_point

app = FastAPI(
    title="DWD ICON Weather API",
    description="Real-time weather forecasts from German Weather Service",
    version="1.0.0"
)

# Cached data shared across requests
spatial_index = None
latest_run_date = None


class ForecastRequest(BaseModel):
    latitude: float
    longitude: float


class ForecastResponse(BaseModel):
    location: dict
    grid_point: dict
    forecast_run: str
    forecast_data: dict


@app.on_event("startup")
async def startup_event():
    """Load the latest data on startup."""
    global spatial_index, latest_run_date
    try:
        # Load the spatial index written by the processing service
        index_file = Path("/var/lib/weather-data/spatial_index.pkl")
        if index_file.exists():
            with open(index_file, 'rb') as f:
                spatial_index = pickle.load(f)
            logging.info("Loaded spatial index")

        # Determine the latest run
        weather_dir = Path("/var/lib/weather-data/weather")
        if weather_dir.exists():
            run_dirs = sorted([d for d in weather_dir.iterdir() if d.is_dir()])
            if run_dirs:
                latest_run_date = run_dirs[-1].name
                logging.info(f"Latest data run: {latest_run_date}")
    except Exception as e:
        logging.error(f"Startup failed: {e}")


@app.get("/")
async def root():
    return {"message": "DWD ICON Weather API", "status": "operational"}


@app.get("/health")
async def health_check():
    """Health check endpoint."""
    if spatial_index is None:
        raise HTTPException(status_code=503, detail="Spatial index not loaded")
    if latest_run_date is None:
        raise HTTPException(status_code=503, detail="No weather data available")
    return {
        "status": "healthy",
        "latest_run": latest_run_date,
        "data_points": len(spatial_index['coords'])
    }


@app.post("/forecast", response_model=ForecastResponse)
async def get_forecast(request: ForecastRequest):
    """Get the weather forecast for a specific location."""
    if spatial_index is None or latest_run_date is None:
        raise HTTPException(status_code=503, detail="Service not ready")
    try:
        weather_dir = Path(f"/var/lib/weather-data/weather/{latest_run_date}")
        forecast_data = extract_forecast_data(
            weather_dir, spatial_index, request.latitude, request.longitude
        )
        return ForecastResponse(
            location=forecast_data['location'],
            grid_point=forecast_data['grid_point'],
            forecast_run=latest_run_date,
            forecast_data={k: v for k, v in forecast_data.items()
                           if k not in ['location', 'grid_point']}
        )
    except Exception as e:
        logging.error(f"Forecast generation failed: {e}")
        raise HTTPException(status_code=500, detail="Forecast generation failed")


@app.get("/locations/nearest")
async def get_nearest_grid_point(lat: float, lon: float):
    """Get information about the nearest grid point."""
    if spatial_index is None:
        raise HTTPException(status_code=503, detail="Service not ready")
    try:
        nearest_idx = find_nearest_point(lat, lon, spatial_index)
        return {
            "requested": {"lat": lat, "lon": lon},
            "nearest_grid": {
                "lat": float(spatial_index['lats'][nearest_idx]),
                "lon": float(spatial_index['lons'][nearest_idx]),
                # Convert numpy integers so the result is JSON-serialisable
                "index": [int(i) for i in nearest_idx]
            }
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
```
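With the server running locally, a forecast can be requested like this (a sketch assuming the service listens on port 8000 as configured above):

```python
import requests

resp = requests.post(
    "http://localhost:8000/forecast",
    json={"latitude": 52.5200, "longitude": 13.4050},  # Berlin
    timeout=10,
)
resp.raise_for_status()
forecast = resp.json()
print(forecast["forecast_run"], forecast["grid_point"])
print(forecast["forecast_data"]["t_2m"][:4])  # first raw values, in Kelvin
```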
## API Endpoints

### Base URL

```
https://your-domain.com/api/weather/
```

### Endpoints

#### GET /health

Health check and service status:

```json
{
  "status": "healthy",
  "latest_run": "20250924_12",
  "data_points": 2949120
}
```

#### POST /forecast

Get the weather forecast for a location:

```json
// Request
{
  "latitude": 52.5200,
  "longitude": 13.4050
}

// Response
{
  "location": {"lat": 52.52, "lon": 13.405},
  "grid_point": {"lat": 52.520, "lon": 13.336},
  "forecast_run": "20250924_12",
  "forecast_data": {
    "forecast_hours": [0, 3, 6, ...],
    "t_2m": [287.15, 286.8, 285.5, ...],
    "u_10m": [2.1, 2.3, 1.8, ...],
    "v_10m": [-1.2, -0.8, -1.5, ...],
    "tot_prec": [0.0, 0.1, 0.3, ...],
    "snow_gsp": [0.0, 0.0, 0.0, ...],
    "clct": [0.65, 0.72, 0.58, ...],
    "cape_con": [0, 150, 320, ...],
    "vmax_10m": [3.2, 3.8, 4.1, ...]
  }
}
```
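Each parameter array is ordered by the `forecast_hours` list in the response, so index `i` is valid at run time + `forecast_hours[i]`. A client-side sketch; parsing `forecast_run` with `%Y%m%d_%H` assumes the run-directory naming used throughout this guide:

```python
from datetime import datetime, timedelta, timezone


def timestamp_series(forecast_run, forecast_hours, values):
    """Pair raw forecast values with their valid times (UTC)."""
    run_time = datetime.strptime(forecast_run, "%Y%m%d_%H").replace(
        tzinfo=timezone.utc)
    return [(run_time + timedelta(hours=h), v)
            for h, v in zip(forecast_hours, values)]

# data = response["forecast_data"]
# timestamp_series(response["forecast_run"], data["forecast_hours"], data["t_2m"])
```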
"status": "healthy", "latest_run": "20250924_12", "data_points": 2949120 } ``` #### POST /forecast Get weather forecast for location ```json // Request { "latitude": 52.5200, "longitude": 13.4050 } // Response { "location": {"lat": 52.52, "lon": 13.405}, "grid_point": {"lat": 52.520, "lon": 13.336}, "forecast_run": "20250924_12", "forecast_data": { "t_2m": [287.15, 286.8, 285.5, ...], "u_10m": [2.1, 2.3, 1.8, ...], "v_10m": [-1.2, -0.8, -1.5, ...], "tot_prec": [0.0, 0.1, 0.3, ...], "snow_gsp": [0.0, 0.0, 0.0, ...], "clct": [0.65, 0.72, 0.58, ...], "cape_con": [0, 150, 320, ...], "vmax_10m": [3.2, 3.8, 4.1, ...] } } ``` #### GET /locations/nearest?lat=52.52&lon=13.405 Get nearest grid point information ```json { "requested": {"lat": 52.52, "lon": 13.405}, "nearest_grid": { "lat": 52.520, "lon": 13.336, "index": [1247, 856] } } ``` ## Monitoring & Reliability ### Key Metrics to Monitor - **Download success rate**: >95% - **API response time**: <100ms - **Data freshness**: <6 hours old - **Storage usage**: Monitor disk space - **Memory usage**: Monitor spatial index memory ### Alerting Thresholds ```yaml # Example monitoring config alerts: - name: "DWD Download Failed" condition: "download_success_rate < 0.95" severity: "critical" - name: "API Slow Response" condition: "api_response_time_p95 > 200ms" severity: "warning" - name: "Stale Data" condition: "data_age > 8h" severity: "critical" - name: "Disk Space Low" condition: "disk_usage > 80%" severity: "warning" ``` ### Log Files - **Downloader**: `/var/log/dwd-downloader.log` - **Processor**: `/var/log/dwd-processor.log` - **API Server**: `/var/log/dwd-api.log` ### Systemd Services ```ini # /etc/systemd/system/dwd-downloader.service [Unit] Description=DWD ICON Data Downloader After=network.target [Service] Type=oneshot ExecStart=/usr/local/bin/dwd-downloader User=weather Group=weather # /etc/systemd/system/dwd-downloader.timer [Unit] Description=Run DWD downloader every 6 hours Requires=dwd-downloader.service [Timer] OnCalendar=*-*-* 04,10,16,22:30:00 Persistent=true [Install] WantedBy=timers.target # /etc/systemd/system/dwd-api.service [Unit] Description=DWD Weather API Server After=network.target [Service] Type=simple ExecStart=/usr/local/bin/dwd-api Restart=always User=weather Group=weather [Install] WantedBy=multi-user.target ``` ## Performance Optimization ### Storage Optimization ```bash # Compressed storage (optional) # Store processed data in compressed format STORAGE_FORMAT="zarr" # or "parquet", "hdf5" # Partition by date for faster queries DATA_STRUCTURE=" /var/lib/weather-data/ ├── coordinates/ │ └── 20250924/ │ ├── CLAT.grib2.bz2 │ └── CLON.grib2.bz2 ├── weather/ │ └── 20250924_12/ │ ├── t_2m/ │ ├── u_10m/ │ └── ... └── processed/ └── 20250924_12/ ├── spatial_index.pkl └── weather_data.zarr " ``` ### Memory Optimization ```python # Load only required regions for specific queries def load_regional_data(bounds): """Load data only for specific geographic bounds""" # Implementation for regional data loading pass # Use memory mapping for large datasets def memory_map_data(file_path): """Memory map data files for efficient access""" return np.memmap(file_path, mode='r') ``` ### Caching Strategy ```python # Redis/Memcached for frequently requested locations CACHE_CONFIG = { 'redis_url': 'redis://localhost:6379', 'cache_ttl': 3600, # 1 hour 'max_cached_locations': 10000 } # Pre-compute forecasts for major cities PRECOMPUTE_LOCATIONS = [ (52.5200, 13.4050), # Berlin (48.8566, 2.3522), # Paris (51.5074, -0.1278), # London # ... 
## Legal & Attribution

### License Requirements

- **Data Source**: DWD Open Government Data
- **Attribution**: "Weather data provided by German Weather Service (DWD)"
- **Commercial Use**: ✅ Permitted
- **Redistribution**: ✅ Allowed with attribution

### Required Attribution Text

```
Weather data provided by:
German Weather Service (Deutscher Wetterdienst - DWD)
ICON Global Weather Model
https://opendata.dwd.de/

This product uses data from the DWD ICON model. DWD bears no responsibility
for the correctness, accuracy or completeness of the data provided.
```

### Terms of Use

- No warranty on data accuracy
- Users are responsible for verification
- Commercial use permitted
- Attribution must be maintained
- Data cannot be claimed as proprietary

## Deployment Checklist

### Pre-Production

- [ ] Set up monitoring and alerting
- [ ] Configure log rotation
- [ ] Set up automated backups
- [ ] Test failover scenarios
- [ ] Load test API endpoints
- [ ] Validate data quality
- [ ] Set up SSL certificates

### Production Deployment

- [ ] Deploy downloader service
- [ ] Deploy API server
- [ ] Configure reverse proxy (nginx)
- [ ] Set up monitoring dashboards
- [ ] Configure automated scaling
- [ ] Test end-to-end workflow
- [ ] Document operational procedures

### Post-Deployment

- [ ] Monitor for 48 hours
- [ ] Verify data accuracy
- [ ] Check performance metrics
- [ ] Test backup/restore
- [ ] Update documentation
- [ ] Train operations team

## Support & Maintenance

### Regular Maintenance Tasks

- **Daily**: Monitor system health, check logs
- **Weekly**: Verify data quality, check storage usage
- **Monthly**: Review performance metrics, update documentation
- **Quarterly**: Security updates, capacity planning

### Troubleshooting Common Issues

#### Download Failures

```bash
# Check DWD server status
curl -I https://opendata.dwd.de/weather/nwp/icon/grib/

# Verify network connectivity
nslookup opendata.dwd.de

# Check disk space
df -h /var/lib/weather-data/

# Review download logs
tail -f /var/log/dwd-downloader.log
```

#### API Performance Issues

```bash
# Check API server status
curl http://localhost:8000/health

# Monitor response times (note: /forecast expects a POST with a JSON body)
curl -w "@curl-format.txt" -X POST http://localhost:8000/forecast \
  -H "Content-Type: application/json" \
  -d '{"latitude": 52.52, "longitude": 13.405}'

# Check memory usage
ps aux | grep dwd-api
```

## Contact & Support

- **Issues**: Create a GitHub issue with system details
- **Documentation**: Keep this guide updated with changes
- **Monitoring**: Set up alerts for critical failures

---

**Version**: 1.0.0
**Last Updated**: 2025-09-24
**Maintainer**: Weather API Team