Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import folium | |
| from folium.plugins import FloatImage | |
| from branca.element import Template, MacroElement | |
| from urllib.request import urlopen | |
| from urllib.error import URLError, HTTPError | |
| import xml.etree.ElementTree as ET | |
| with gr.Blocks(fill_height=True, theme=gr.themes.Default()) as demo: | |
| # Mapping for Folium rendering only | |
| ACCUM_MAP = { | |
| "6h": ("dwd:Icon_reg025_fd_sl_TOTPREC06H", "icon_reg025_fd_sl_totprec06h_wmc_isoarea"), | |
| "24h": ("dwd:Icon_reg025_fd_sl_TOTPREC24H", "icon_reg025_fd_sl_totprec24h_wmc_isoarea"), | |
| "since_start": ("dwd:Icon_reg025_fd_sl_TOTPREC", "icon_reg025_fd_sl_totprec_wmc_isoarea"), | |
| } | |
| DWD_WMS = "https://maps.dwd.de/geoserver/ows?" | |
| # Basemap: Blue Marble (level 8). For higher zooms we can switch to a MODIS layer later. | |
| GIBS_TILES = "https://gibs.earthdata.nasa.gov/wmts/epsg3857/best/BlueMarble_ShadedRelief_Bathymetry/default/2013-12-01/GoogleMapsCompatible_Level8/{z}/{y}/{x}.jpg" | |
| def _parse_iso_duration_to_seconds(d): | |
| # Supports PnDTnHnMnS minimal subset | |
| import re | |
| m = re.match(r"^P(?:(\d+)D)?T?(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?$", d) | |
| if not m: | |
| return 0 | |
| days = int(m.group(1) or 0) | |
| hours = int(m.group(2) or 0) | |
| minutes = int(m.group(3) or 0) | |
| seconds = int(m.group(4) or 0) | |
| return (((days * 24 + hours) * 60 + minutes) * 60 + seconds) | |
| def _extract_times_from_layer(layer_el): | |
| ns = {"wms": "http://www.opengis.net/wms"} | |
| # Dimension may be default ns; try both | |
| dim = layer_el.find("wms:Dimension[@name='time']", ns) | |
| if dim is None: | |
| # fallback without namespace | |
| dim = layer_el.find("Dimension") | |
| if dim is None or (dim.text or "").strip() == "": | |
| return [] | |
| txt = (dim.text or "").strip() | |
| if "/" in txt: | |
| start_s, end_s, step_s = txt.split("/") | |
| from datetime import datetime, timezone, timedelta | |
| def parse_dt(s): | |
| # Expect Zulu time | |
| return datetime.fromisoformat(s.replace("Z", "+00:00")).astimezone(timezone.utc) | |
| start = parse_dt(start_s) | |
| end = parse_dt(end_s) | |
| step = _parse_iso_duration_to_seconds(step_s) | |
| if step <= 0: | |
| return [start_s] | |
| out = [] | |
| t = start | |
| while t <= end: | |
| out.append(t.replace(microsecond=0).isoformat().replace("+00:00", "Z")) | |
| t = t + timedelta(seconds=step) | |
| return out | |
| # Comma-separated | |
| return [s.strip() for s in txt.split(",") if s.strip()] | |
| def get_times(accum_choice: str): | |
| layer_name, _ = ACCUM_MAP.get(accum_choice, ACCUM_MAP["6h"]) | |
| url = f"{DWD_WMS}service=WMS&request=GetCapabilities&version=1.3.0" | |
| try: | |
| with urlopen(url, timeout=15) as resp: | |
| xml = resp.read() | |
| except (URLError, HTTPError) as e: | |
| return gr.Dropdown(choices=[], value=None, label="Time (UTC)", info=f"Failed to fetch capabilities: {e}") | |
| try: | |
| root = ET.fromstring(xml) | |
| except ET.ParseError as e: | |
| return gr.Dropdown(choices=[], value=None, label="Time (UTC)", info=f"Parse error: {e}") | |
| ns = {"wms": "http://www.opengis.net/wms"} | |
| # Find all layer nodes | |
| for lyr in root.findall(".//wms:Layer", ns): | |
| name_el = lyr.find("wms:Name", ns) | |
| if name_el is not None and (name_el.text or "").strip() == layer_name: | |
| times = _extract_times_from_layer(lyr) | |
| if not times: | |
| return gr.Dropdown(choices=[], value=None, label="Time (UTC)", info="No times available") | |
| # choose latest | |
| return gr.Dropdown(choices=times, value=times[-1], label="Time (UTC)") | |
| return gr.Dropdown(choices=[], value=None, label="Time (UTC)", info="Layer not found") | |
| def fetch_times_list(accum_choice: str): | |
| layer_name, _ = ACCUM_MAP.get(accum_choice, ACCUM_MAP["6h"]) | |
| url = f"{DWD_WMS}service=WMS&request=GetCapabilities&version=1.3.0" | |
| with urlopen(url, timeout=15) as resp: | |
| xml = resp.read() | |
| root = ET.fromstring(xml) | |
| ns = {"wms": "http://www.opengis.net/wms"} | |
| for lyr in root.findall(".//wms:Layer", ns): | |
| name_el = lyr.find("wms:Name", ns) | |
| if name_el is not None and (name_el.text or "").strip() == layer_name: | |
| return _extract_times_from_layer(lyr) | |
| return [] | |
| def build_legend_url(layer_name: str, style_name: str) -> str: | |
| from urllib.parse import urlencode | |
| qs = urlencode({ | |
| "service": "WMS", | |
| "version": "1.3.0", | |
| "request": "GetLegendGraphic", | |
| "format": "image/png", | |
| "width": 160, | |
| "height": 300, | |
| "layer": layer_name, | |
| "style": style_name, | |
| }) | |
| return f"{DWD_WMS}{qs}" | |
| def render_folium(accum_choice: str, time_value: str, opacity: float, basemap: str, animate: bool, start_mode: str, speed_ms: int): | |
| layer_name, style_name = ACCUM_MAP.get(accum_choice, ACCUM_MAP["6h"]) | |
| m = folium.Map(location=[20, 0], zoom_start=2, tiles=None, control_scale=True) | |
| # Basemap selection | |
| if basemap == "OpenStreetMap": | |
| folium.TileLayer( | |
| tiles="https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png", | |
| name="OpenStreetMap", | |
| attr="© OpenStreetMap contributors", | |
| max_zoom=19, | |
| overlay=False, | |
| control=True, | |
| ).add_to(m) | |
| else: | |
| folium.TileLayer( | |
| tiles=GIBS_TILES, | |
| name="NASA Blue Marble", | |
| attr="Blue Marble — NASA GIBS", | |
| max_zoom=8, | |
| overlay=False, | |
| control=True, | |
| ).add_to(m) | |
| # DWD ICON WMS overlay | |
| wms_layer = folium.raster_layers.WmsTileLayer( | |
| url=DWD_WMS, | |
| name=f"DWD ICON {accum_choice}", | |
| layers=layer_name, | |
| styles=style_name, | |
| fmt="image/png", | |
| transparent=True, | |
| version="1.3.0", | |
| overlay=True, | |
| control=True, | |
| show=True, | |
| opacity=opacity, | |
| ) | |
| wms_layer.add_to(m) | |
| # Determine start time and animation frames | |
| times = [] | |
| try: | |
| times = fetch_times_list(accum_choice) | |
| except Exception: | |
| pass | |
| start_idx = 0 | |
| if times: | |
| from datetime import datetime, timezone | |
| if start_mode == "nearest": | |
| now = datetime.now(timezone.utc) | |
| def parse_dt(s): | |
| return datetime.fromisoformat(s.replace("Z", "+00:00")).astimezone(timezone.utc) | |
| diffs = [abs((parse_dt(t) - now).total_seconds()) for t in times] | |
| start_idx = int(min(range(len(times)), key=lambda i: diffs[i])) | |
| elif start_mode == "selected" and time_value: | |
| try: | |
| start_idx = max(0, times.index(time_value)) | |
| except ValueError: | |
| start_idx = len(times) - 1 | |
| else: | |
| start_idx = 0 # first time (approx. 0h) | |
| # Apply initial time | |
| init_time = (times[start_idx] if times else (time_value or None)) | |
| if init_time is not None: | |
| # Update WMS params | |
| wms_layer.options.update({"time": init_time}) | |
| # Inject simple animation script if requested | |
| if animate and len(times) > 1: | |
| import json | |
| layer_var = wms_layer.get_name() | |
| js = f""" | |
| {{% macro script(this, kwargs) %}} | |
| (function(){{ | |
| var layer = {layer_var}; | |
| var times = {json.dumps(times)}; | |
| var idx = {start_idx}; | |
| var speed = {int(speed_ms)}; | |
| function setTime(){{ | |
| layer.setParams({{ time: times[idx] }}); | |
| }} | |
| setTime(); | |
| if (Array.isArray(times) && times.length>1) {{ | |
| window.__wmsTimer && clearInterval(window.__wmsTimer); | |
| window.__wmsTimer = setInterval(function(){{ | |
| idx = (idx + 1) % times.length; | |
| setTime(); | |
| }}, speed); | |
| }} | |
| }})(); | |
| {{% endmacro %}} | |
| """ | |
| macro = MacroElement() | |
| macro._template = Template(js) | |
| m.get_root().add_child(macro) | |
| # Legend | |
| legend_url = build_legend_url(layer_name, style_name) | |
| try: | |
| FloatImage(legend_url, bottom=5, left=5).add_to(m) | |
| except Exception: | |
| pass | |
| folium.LayerControl(collapsed=False).add_to(m) | |
| # Embed as iframe via srcdoc so it can run its scripts independently. | |
| html_doc = m.get_root().render() | |
| import html as htmlesc | |
| srcdoc = htmlesc.escape(html_doc, quote=True) | |
| iframe = f'<iframe srcdoc="{srcdoc}" style="width:100%;height:700px;border:0;border-radius:8px;"></iframe>' | |
| return iframe | |
| # Single Folium view | |
| gr.Markdown("# Global Precipitation Forecast (DWD ICON) — Folium") | |
| with gr.Row(): | |
| basemap = gr.Radio(["OpenStreetMap", "NASA Blue Marble"], value="OpenStreetMap", label="Basemap") | |
| accum = gr.Radio(["6h", "24h", "since_start"], value="6h", label="Accumulation") | |
| time_dd = gr.Dropdown(choices=[], label="Time (UTC)") | |
| opacity = gr.Slider(0.1, 1.0, value=0.5, step=0.05, label="Overlay opacity") | |
| with gr.Row(): | |
| animate = gr.Checkbox(value=True, label="Animate all forecast times") | |
| start_mode = gr.Radio(["first", "nearest", "selected"], value="first", label="Start at") | |
| speed_ms = gr.Slider(200, 2000, value=800, step=100, label="Frame delay (ms)") | |
| render_btn = gr.Button("Render Map", variant="primary") | |
| folium_html = gr.HTML() | |
| # Populate times on load and when accumulation changes | |
| demo.load(fn=get_times, inputs=accum, outputs=time_dd) | |
| accum.change(fn=get_times, inputs=accum, outputs=time_dd) | |
| # Render map | |
| render_btn.click( | |
| fn=render_folium, | |
| inputs=[accum, time_dd, opacity, basemap, animate, start_mode, speed_ms], | |
| outputs=folium_html, | |
| ) | |
| if __name__ == "__main__": | |
| demo.launch() | |