"""Gradio app that shows DWD ICON precipitation forecast layers on a Folium map."""

import html as htmlesc
import json
import logging
import re
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta, timezone
from typing import List, Optional
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode
from urllib.request import urlopen

import folium
import gradio as gr
from branca.element import MacroElement, Template
from folium.plugins import FloatImage

logger = logging.getLogger(__name__)

# Accumulation choice -> (WMS layer name, WMS style name).
# Mapping for Folium rendering only.
ACCUM_MAP = {
    "6h": ("dwd:Icon_reg025_fd_sl_TOTPREC06H", "icon_reg025_fd_sl_totprec06h_wmc_isoarea"),
    "24h": ("dwd:Icon_reg025_fd_sl_TOTPREC24H", "icon_reg025_fd_sl_totprec24h_wmc_isoarea"),
    "since_start": ("dwd:Icon_reg025_fd_sl_TOTPREC", "icon_reg025_fd_sl_totprec_wmc_isoarea"),
}

DWD_WMS = "https://maps.dwd.de/geoserver/ows?"

# Basemap: Blue Marble (level 8). For higher zooms we can switch to a MODIS layer later.
GIBS_TILES = "https://gibs.earthdata.nasa.gov/wmts/epsg3857/best/BlueMarble_ShadedRelief_Bathymetry/default/2013-12-01/GoogleMapsCompatible_Level8/{z}/{y}/{x}.jpg"

# XML namespace used by WMS 1.3.0 capabilities documents.
_WMS_NS = {"wms": "http://www.opengis.net/wms"}

# Minimal ISO-8601 duration subset: PnDTnHnMnS.
_ISO_DURATION_RE = re.compile(r"^P(?:(\d+)D)?T?(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?$")

# Height of the iframe that hosts the rendered Folium document.
_IFRAME_HEIGHT_PX = 600


def _parse_iso_duration_to_seconds(d):
    """Convert an ISO-8601 duration (PnDTnHnMnS subset) to whole seconds.

    Returns 0 when *d* does not match the supported subset.
    """
    m = _ISO_DURATION_RE.match((d or "").strip())
    if not m:
        return 0
    days, hours, minutes, seconds = (int(g or 0) for g in m.groups())
    return ((days * 24 + hours) * 60 + minutes) * 60 + seconds


def _parse_utc(stamp: str) -> datetime:
    """Parse an ISO-8601 timestamp (optionally ending in ``Z``) as aware UTC."""
    moment = datetime.fromisoformat(stamp.strip().replace("Z", "+00:00"))
    if moment.tzinfo is None:
        # Timestamps without an offset are treated as UTC instead of local time.
        moment = moment.replace(tzinfo=timezone.utc)
    return moment.astimezone(timezone.utc)


def _expand_interval(segment: str) -> List[str]:
    """Expand one ``start/end/step`` interval into a list of ``...Z`` timestamps.

    A malformed interval (wrong number of parts or unparsable endpoints) is
    logged and yields an empty list. A non-positive or unsupported step yields
    just the start timestamp as written.
    """
    parts = [p.strip() for p in segment.split("/")]
    if len(parts) != 3:
        logger.warning("Ignoring malformed time interval: %r", segment)
        return []
    start_s, end_s, step_s = parts
    try:
        start = _parse_utc(start_s)
        end = _parse_utc(end_s)
    except ValueError:
        logger.warning("Ignoring time interval with unparsable bounds: %r", segment)
        return []
    step = _parse_iso_duration_to_seconds(step_s)
    if step <= 0:
        return [start_s]
    delta = timedelta(seconds=step)
    out = []
    t = start
    while t <= end:
        out.append(t.replace(microsecond=0).isoformat().replace("+00:00", "Z"))
        t += delta
    return out


def _extract_times_from_layer(layer_el):
    """Return the time values advertised by a WMS ``Layer`` element.

    The ``time`` dimension text may be a comma-separated list whose items are
    either single timestamps (kept verbatim) or ``start/end/step`` intervals
    (expanded). Returns an empty list when no time dimension is present.
    """
    # Dimension may or may not be namespaced; try both, always filtering on name.
    dim = layer_el.find("wms:Dimension[@name='time']", _WMS_NS)
    if dim is None:
        dim = layer_el.find("Dimension[@name='time']")
    txt = (dim.text or "").strip() if dim is not None else ""
    if not txt:
        return []

    times = []
    for segment in txt.split(","):
        segment = segment.strip()
        if not segment:
            continue
        if "/" in segment:
            times.extend(_expand_interval(segment))
        else:
            times.append(segment)
    # Adjacent intervals can share an endpoint; drop repeats but keep order.
    return list(dict.fromkeys(times))


def _fetch_capabilities_root():
    """Download and parse the DWD WMS GetCapabilities document.

    Raises:
        OSError: on network failure (includes URLError, HTTPError, timeouts).
        xml.etree.ElementTree.ParseError: if the response is not valid XML.
    """
    url = f"{DWD_WMS}service=WMS&request=GetCapabilities&version=1.3.0"
    with urlopen(url, timeout=15) as resp:
        payload = resp.read()
    return ET.fromstring(payload)


def _find_layer(root, layer_name: str):
    """Return the first ``Layer`` element whose ``Name`` equals *layer_name*, or None."""
    for lyr in root.findall(".//wms:Layer", _WMS_NS):
        name_el = lyr.find("wms:Name", _WMS_NS)
        if name_el is not None and (name_el.text or "").strip() == layer_name:
            return lyr
    return None


def _time_dropdown(choices, value=None, info=None):
    """Build the "Time (UTC)" dropdown used as the output of ``get_times``."""
    return gr.Dropdown(choices=choices, value=value, label="Time (UTC)", info=info)


def get_times(accum_choice: str):
    """Return a "Time (UTC)" dropdown populated with the layer's available times.

    Unknown accumulation choices fall back to the "6h" layer. The latest time is
    preselected. On any failure an empty dropdown is returned with the reason in
    its ``info`` text.
    """
    layer_name, _ = ACCUM_MAP.get(accum_choice, ACCUM_MAP["6h"])
    try:
        root = _fetch_capabilities_root()
    except OSError as e:
        # OSError covers URLError/HTTPError as well as socket timeouts during read().
        return _time_dropdown([], info=f"Failed to fetch capabilities: {e}")
    except ET.ParseError as e:
        return _time_dropdown([], info=f"Parse error: {e}")

    layer = _find_layer(root, layer_name)
    if layer is None:
        return _time_dropdown([], info="Layer not found")
    times = _extract_times_from_layer(layer)
    if not times:
        return _time_dropdown([], info="No times available")
    # choose latest
    return _time_dropdown(times, value=times[-1])


def fetch_times_list(accum_choice: str):
    """Return the list of available time strings for the chosen accumulation.

    Returns an empty list when the layer is not found. Network and XML parse
    errors propagate to the caller.
    """
    layer_name, _ = ACCUM_MAP.get(accum_choice, ACCUM_MAP["6h"])
    layer = _find_layer(_fetch_capabilities_root(), layer_name)
    return _extract_times_from_layer(layer) if layer is not None else []


def build_legend_url(layer_name: str, style_name: str) -> str:
    """Build the WMS GetLegendGraphic URL for a layer/style pair."""
    qs = urlencode({
        "service": "WMS",
        "version": "1.3.0",
        "request": "GetLegendGraphic",
        "format": "image/png",
        "width": 160,
        "height": 300,
        "layer": layer_name,
        "style": style_name,
    })
    return f"{DWD_WMS}{qs}"


def _pick_start_index(times, start_mode: str, time_value: Optional[str]) -> int:
    """Choose the index in *times* at which display/animation starts.

    "nearest" picks the time closest to now, "selected" picks *time_value*
    (falling back to the last time if it is not in the list), anything else
    picks the first time.
    """
    if not times:
        return 0
    if start_mode == "nearest":
        now = datetime.now(timezone.utc)
        try:
            diffs = [abs((_parse_utc(t) - now).total_seconds()) for t in times]
        except ValueError:
            logger.warning("Could not parse forecast times; starting at first frame")
            return 0
        return min(range(len(times)), key=diffs.__getitem__)
    if start_mode == "selected" and time_value:
        try:
            return times.index(time_value)
        except ValueError:
            return len(times) - 1
    return 0  # first time (approx. 0h)


def _animation_macro(layer_var: str, times, start_idx: int, speed_ms) -> MacroElement:
    """Create a script macro that cycles the WMS layer's ``time`` parameter."""
    # Doubled braces are literal braces in the f-string; the result is a Jinja macro.
    js = f"""
{{% macro script(this, kwargs) %}}
(function(){{
    var layer = {layer_var};
    var times = {json.dumps(times)};
    var idx = {int(start_idx)};
    var speed = {int(speed_ms)};
    function setTime(){{ layer.setParams({{ time: times[idx] }}); }}
    setTime();
    if (Array.isArray(times) && times.length>1) {{
        window.__wmsTimer && clearInterval(window.__wmsTimer);
        window.__wmsTimer = setInterval(function(){{
            idx = (idx + 1) % times.length;
            setTime();
        }}, speed);
    }}
}})();
{{% endmacro %}}
"""
    macro = MacroElement()
    macro._template = Template(js)
    return macro


def render_folium(accum_choice: str, time_value: str, opacity: float, basemap: str,
                  animate: bool, start_mode: str, speed_ms: int):
    """Render the Folium map and return it as an HTML ``<iframe>`` string.

    Args:
        accum_choice: Key of ``ACCUM_MAP``; unknown keys fall back to "6h".
        time_value: Time selected in the dropdown (may be empty/None).
        opacity: Opacity of the WMS overlay.
        basemap: "OpenStreetMap" or anything else for NASA Blue Marble.
        animate: Whether to cycle through all available times.
        start_mode: "first", "nearest" or "selected".
        speed_ms: Delay between animation frames in milliseconds.

    Returns:
        HTML for an iframe whose ``srcdoc`` holds the complete Folium document.
    """
    layer_name, style_name = ACCUM_MAP.get(accum_choice, ACCUM_MAP["6h"])
    m = folium.Map(location=[20, 0], zoom_start=2, tiles=None, control_scale=True)

    # Basemap selection
    if basemap == "OpenStreetMap":
        folium.TileLayer(
            tiles="https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png",
            name="OpenStreetMap",
            attr="© OpenStreetMap contributors",
            max_zoom=19,
            overlay=False,
            control=True,
        ).add_to(m)
    else:
        folium.TileLayer(
            tiles=GIBS_TILES,
            name="NASA Blue Marble",
            attr="Blue Marble — NASA GIBS",
            max_zoom=8,
            overlay=False,
            control=True,
        ).add_to(m)

    # DWD ICON WMS overlay
    wms_layer = folium.raster_layers.WmsTileLayer(
        url=DWD_WMS,
        name=f"DWD ICON {accum_choice}",
        layers=layer_name,
        styles=style_name,
        fmt="image/png",
        transparent=True,
        version="1.3.0",
        overlay=True,
        control=True,
        show=True,
        opacity=opacity,
    )
    wms_layer.add_to(m)

    # Determine start time and animation frames. Best effort: if the time list
    # cannot be fetched, the map is still rendered with the selected time only.
    try:
        times = fetch_times_list(accum_choice)
    except (OSError, ET.ParseError, ValueError) as exc:
        logger.warning("Could not fetch forecast times: %s", exc)
        times = []
    start_idx = _pick_start_index(times, start_mode, time_value)

    # Apply initial time
    init_time = times[start_idx] if times else (time_value or None)
    if init_time is not None:
        # Update WMS params
        wms_layer.options.update({"time": init_time})

    # Inject simple animation script if requested
    if animate and len(times) > 1:
        macro = _animation_macro(wms_layer.get_name(), times, start_idx, speed_ms)
        m.get_root().add_child(macro)

    # Legend (best effort: the map is still useful without it)
    legend_url = build_legend_url(layer_name, style_name)
    try:
        FloatImage(legend_url, bottom=5, left=5).add_to(m)
    except Exception:
        logger.warning("Could not add legend image", exc_info=True)

    folium.LayerControl(collapsed=False).add_to(m)

    # Embed as iframe via srcdoc so it can run its scripts independently.
    html_doc = m.get_root().render()
    srcdoc = htmlesc.escape(html_doc, quote=True)
    return (
        f'<iframe srcdoc="{srcdoc}" '
        f'style="width: 100%; height: {_IFRAME_HEIGHT_PX}px; border: none;"></iframe>'
    )


with gr.Blocks(fill_height=True, theme=gr.themes.Default()) as demo:
    # Single Folium view
    gr.Markdown("# Global Precipitation Forecast (DWD ICON) — Folium")
    with gr.Row():
        basemap = gr.Radio(["OpenStreetMap", "NASA Blue Marble"], value="OpenStreetMap", label="Basemap")
        accum = gr.Radio(["6h", "24h", "since_start"], value="6h", label="Accumulation")
        time_dd = gr.Dropdown(choices=[], label="Time (UTC)")
        opacity = gr.Slider(0.1, 1.0, value=0.5, step=0.05, label="Overlay opacity")
    with gr.Row():
        animate = gr.Checkbox(value=True, label="Animate all forecast times")
        start_mode = gr.Radio(["first", "nearest", "selected"], value="first", label="Start at")
        speed_ms = gr.Slider(200, 2000, value=800, step=100, label="Frame delay (ms)")
    render_btn = gr.Button("Render Map", variant="primary")
    folium_html = gr.HTML()

    # Populate times on load and when accumulation changes
    demo.load(fn=get_times, inputs=accum, outputs=time_dd)
    accum.change(fn=get_times, inputs=accum, outputs=time_dd)

    # Render map
    render_btn.click(
        fn=render_folium,
        inputs=[accum, time_dd, opacity, basemap, animate, start_mode, speed_ms],
        outputs=folium_html,
    )

if __name__ == "__main__":
    demo.launch()