nakas's picture
Fix animation injection: wrap Template in MacroElement per branca API to avoid AttributeError
59b7edd
import gradio as gr
import folium
from folium.plugins import FloatImage
from branca.element import Template, MacroElement
from urllib.request import urlopen
from urllib.error import URLError, HTTPError
import xml.etree.ElementTree as ET
# Application root: the Blocks context bound to ``demo`` (launched at the end of the file).
# NOTE(review): this copy of the file has lost its leading indentation; the
# statements that follow presumably form the body of this ``with`` block — confirm
# against the original before running.
with gr.Blocks(fill_height=True, theme=gr.themes.Default()) as demo:
# Accumulation choice -> (WMS layer name, WMS style name).
# The first element is matched against <Name> in the capabilities document and
# passed as ``layers=``; the second is passed as ``styles=`` and to the legend request.
ACCUM_MAP = {
"6h": ("dwd:Icon_reg025_fd_sl_TOTPREC06H", "icon_reg025_fd_sl_totprec06h_wmc_isoarea"),
"24h": ("dwd:Icon_reg025_fd_sl_TOTPREC24H", "icon_reg025_fd_sl_totprec24h_wmc_isoarea"),
"since_start": ("dwd:Icon_reg025_fd_sl_TOTPREC", "icon_reg025_fd_sl_totprec_wmc_isoarea"),
}
# Base WMS endpoint; it already ends in "?" so query strings are appended directly.
DWD_WMS = "https://maps.dwd.de/geoserver/ows?"
# Basemap: Blue Marble (level 8). For higher zooms we can switch to a MODIS layer later.
# The template uses {z}/{y}/{x} order (row before column), unlike the OSM URL used below.
GIBS_TILES = "https://gibs.earthdata.nasa.gov/wmts/epsg3857/best/BlueMarble_ShadedRelief_Bathymetry/default/2013-12-01/GoogleMapsCompatible_Level8/{z}/{y}/{x}.jpg"
def _parse_iso_duration_to_seconds(d):
    """Convert a fixed-length ISO 8601 duration to a whole number of seconds.

    Supported form: ``P[nD][T[nH][nM][nS]]`` with integer components, e.g.
    ``PT6H``, ``P1D`` or ``P1DT12H30M``.

    Args:
        d: Duration text. Surrounding whitespace is ignored.

    Returns:
        The duration in seconds, or ``0`` when *d* is not a string or is not
        in the supported form. Calendar durations such as ``P1M`` (one month)
        or ``P1Y`` have no fixed length and therefore also yield ``0``.
    """
    import re

    if not isinstance(d, str):
        return 0
    # The time components are only valid after the "T" designator. Making "T"
    # optional would read "P1M" (one month) as one minute.
    match = re.fullmatch(
        r"P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?",
        d.strip(),
    )
    if match is None:
        return 0
    days, hours, minutes, seconds = (int(part or 0) for part in match.groups())
    return ((days * 24 + hours) * 60 + minutes) * 60 + seconds
def _extract_times_from_layer(layer_el):
    """List the time steps advertised by a WMS ``<Layer>`` element.

    The text of the layer's ``time`` dimension is read as a comma-separated
    list whose items are either single timestamps or ``start/end/period``
    intervals. Single timestamps are returned as written; intervals are
    expanded into one UTC timestamp per step, formatted with a ``Z`` suffix.

    Args:
        layer_el: An ``xml.etree.ElementTree`` element for one layer.

    Returns:
        A list of time strings in document order. Empty when the layer has no
        ``time`` dimension or the dimension has no text. Intervals that cannot
        be parsed are skipped; an interval whose period is unsupported
        contributes only its start value.
    """
    from datetime import datetime, timedelta, timezone

    ns = {"wms": "http://www.opengis.net/wms"}
    dim = layer_el.find("wms:Dimension[@name='time']", ns)
    if dim is None:
        # Fallback for un-namespaced documents. Still require name='time' so a
        # different dimension (e.g. elevation) is never mistaken for it.
        dim = layer_el.find("Dimension[@name='time']")
    txt = ((dim.text if dim is not None else "") or "").strip()
    if not txt:
        return []

    def parse_dt(s):
        parsed = datetime.fromisoformat(s.replace("Z", "+00:00"))
        if parsed.tzinfo is None:
            # A timestamp without a zone is taken as UTC rather than being
            # reinterpreted in the server's local time zone by astimezone().
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed.astimezone(timezone.utc)

    out = []
    for item in (part.strip() for part in txt.split(",")):
        if not item:
            continue
        if "/" not in item:
            out.append(item)
            continue
        pieces = [piece.strip() for piece in item.split("/")]
        if len(pieces) != 3:
            # Not a start/end/period triple; nothing sensible to expand.
            continue
        start_s, end_s, step_s = pieces
        try:
            t = parse_dt(start_s)
            end = parse_dt(end_s)
        except ValueError:
            continue
        step = _parse_iso_duration_to_seconds(step_s)
        if step <= 0:
            out.append(start_s)
            continue
        delta = timedelta(seconds=step)
        while t <= end:
            out.append(t.replace(microsecond=0).isoformat().replace("+00:00", "Z"))
            t += delta
    return out
def get_times(accum_choice: str):
    """Build the "Time (UTC)" dropdown for the chosen accumulation.

    Downloads the WMS capabilities document, locates the layer mapped to
    *accum_choice* (unknown choices fall back to ``"6h"``) and lists its time
    steps, preselecting the last one.

    Args:
        accum_choice: A key of ``ACCUM_MAP``.

    Returns:
        A ``gr.Dropdown``. On any failure the dropdown is empty and its
        ``info`` text states what went wrong; this function does not raise for
        network, XML or time-parsing errors.
    """
    layer_name, _ = ACCUM_MAP.get(accum_choice, ACCUM_MAP["6h"])
    url = f"{DWD_WMS}service=WMS&request=GetCapabilities&version=1.3.0"

    def empty_dropdown(info):
        return gr.Dropdown(choices=[], value=None, label="Time (UTC)", info=info)

    try:
        with urlopen(url, timeout=15) as resp:
            xml = resp.read()
    except OSError as e:
        # URLError and HTTPError are OSError subclasses. Catching the base
        # class also covers a socket timeout raised while reading the body,
        # which urllib does not wrap in URLError.
        return empty_dropdown(f"Failed to fetch capabilities: {e}")

    try:
        root = ET.fromstring(xml)
    except ET.ParseError as e:
        return empty_dropdown(f"Parse error: {e}")

    ns = {"wms": "http://www.opengis.net/wms"}
    for lyr in root.findall(".//wms:Layer", ns):
        name_el = lyr.find("wms:Name", ns)
        if name_el is None or (name_el.text or "").strip() != layer_name:
            continue
        try:
            times = _extract_times_from_layer(lyr)
        except ValueError as e:
            # A malformed time dimension should surface as a hint in the UI,
            # not as an unhandled error in the event handler.
            return empty_dropdown(f"Could not parse time dimension: {e}")
        if not times:
            return empty_dropdown("No times available")
        # Preselect the latest advertised time.
        return gr.Dropdown(choices=times, value=times[-1], label="Time (UTC)")
    return empty_dropdown("Layer not found")
def fetch_times_list(accum_choice: str):
    """Return the time steps of the WMS layer mapped to *accum_choice*.

    Unknown choices fall back to the ``"6h"`` entry of ``ACCUM_MAP``. The
    capabilities document is downloaded on every call. Network and XML errors
    are not handled here and propagate to the caller.

    Returns:
        The list produced by ``_extract_times_from_layer`` for the first layer
        whose ``<Name>`` matches, or ``[]`` when no layer matches.
    """
    wms_ns = {"wms": "http://www.opengis.net/wms"}
    target_layer = ACCUM_MAP.get(accum_choice, ACCUM_MAP["6h"])[0]
    capabilities_url = f"{DWD_WMS}service=WMS&request=GetCapabilities&version=1.3.0"

    with urlopen(capabilities_url, timeout=15) as response:
        payload = response.read()

    for layer_el in ET.fromstring(payload).findall(".//wms:Layer", wms_ns):
        name_node = layer_el.find("wms:Name", wms_ns)
        if name_node is None:
            continue
        if (name_node.text or "").strip() != target_layer:
            continue
        return _extract_times_from_layer(layer_el)
    return []
def build_legend_url(layer_name: str, style_name: str) -> str:
    """Return the WMS ``GetLegendGraphic`` URL for a layer/style pair.

    The query string requests a 160x300 PNG and is appended to ``DWD_WMS``.
    Parameter values are URL-encoded.
    """
    from urllib.parse import urlencode

    # A sequence of pairs keeps the parameter order explicit.
    query_params = [
        ("service", "WMS"),
        ("version", "1.3.0"),
        ("request", "GetLegendGraphic"),
        ("format", "image/png"),
        ("width", 160),
        ("height", 300),
        ("layer", layer_name),
        ("style", style_name),
    ]
    return DWD_WMS + urlencode(query_params)
def _select_start_index(times, start_mode, time_value):
    """Pick the index in *times* of the frame the map shows first.

    ``"nearest"`` picks the entry closest to the current UTC time,
    ``"selected"`` the entry equal to *time_value* (the last entry when it is
    not in the list), and any other mode the first entry.
    """
    if not times:
        return 0
    if start_mode == "nearest":
        from datetime import datetime, timezone

        now = datetime.now(timezone.utc)
        best_idx, best_diff = 0, None
        for idx, stamp in enumerate(times):
            try:
                parsed = datetime.fromisoformat(stamp.replace("Z", "+00:00"))
            except ValueError:
                # One unparseable entry must not abort the whole render.
                continue
            if parsed.tzinfo is None:
                # Zone-less timestamps are compared as UTC, not local time.
                parsed = parsed.replace(tzinfo=timezone.utc)
            diff = abs((parsed - now).total_seconds())
            if best_diff is None or diff < best_diff:
                best_idx, best_diff = idx, diff
        return best_idx
    if start_mode == "selected" and time_value:
        try:
            return times.index(time_value)
        except ValueError:
            return len(times) - 1
    return 0


def _build_animation_macro(layer_var, times, start_idx, speed_ms):
    """Return a MacroElement whose script cycles the WMS layer's ``time`` parameter."""
    import json

    # The time strings come from a remote capabilities document. Escape "<" so
    # a value cannot close the surrounding <script> element, and "{" so it
    # cannot be read as Jinja syntax when the text below is compiled as a
    # template. Both characters only occur inside JSON string literals here,
    # where the \uXXXX form is equivalent.
    times_js = json.dumps(times).replace("<", "\\u003c").replace("{", "\\u007b")
    js = f"""
{{% macro script(this, kwargs) %}}
(function(){{
  var layer = {layer_var};
  var times = {times_js};
  var idx = {start_idx};
  var speed = {int(speed_ms)};
  function setTime(){{
    layer.setParams({{ time: times[idx] }});
  }}
  setTime();
  if (Array.isArray(times) && times.length>1) {{
    window.__wmsTimer && clearInterval(window.__wmsTimer);
    window.__wmsTimer = setInterval(function(){{
      idx = (idx + 1) % times.length;
      setTime();
    }}, speed);
  }}
}})();
{{% endmacro %}}
"""
    macro = MacroElement()
    macro._template = Template(js)
    return macro


def render_folium(
    accum_choice: str,
    time_value: str,
    opacity: float,
    basemap: str,
    animate: bool,
    start_mode: str,
    speed_ms: int,
):
    """Render the precipitation map and return it as an ``<iframe>`` HTML string.

    Args:
        accum_choice: Key of ``ACCUM_MAP``; unknown values fall back to ``"6h"``.
        time_value: Time chosen in the dropdown. Used when *start_mode* is
            ``"selected"``, or as the only time when no list could be fetched.
        opacity: Opacity of the WMS overlay.
        basemap: ``"OpenStreetMap"`` selects the OSM tiles; any other value
            selects the NASA Blue Marble tiles.
        animate: When true and more than one time step is known, a script that
            cycles through all time steps is injected.
        start_mode: ``"first"``, ``"nearest"`` or ``"selected"``.
        speed_ms: Delay between animation frames in milliseconds.

    Returns:
        An ``<iframe>`` element whose ``srcdoc`` holds the complete rendered
        map document, HTML-escaped.
    """
    import html as htmlesc
    import logging

    logger = logging.getLogger(__name__)
    layer_name, style_name = ACCUM_MAP.get(accum_choice, ACCUM_MAP["6h"])
    m = folium.Map(location=[20, 0], zoom_start=2, tiles=None, control_scale=True)

    # Basemap selection
    if basemap == "OpenStreetMap":
        folium.TileLayer(
            tiles="https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png",
            name="OpenStreetMap",
            attr="© OpenStreetMap contributors",
            max_zoom=19,
            overlay=False,
            control=True,
        ).add_to(m)
    else:
        folium.TileLayer(
            tiles=GIBS_TILES,
            name="NASA Blue Marble",
            attr="Blue Marble — NASA GIBS",
            max_zoom=8,
            overlay=False,
            control=True,
        ).add_to(m)

    # DWD ICON WMS overlay
    wms_layer = folium.raster_layers.WmsTileLayer(
        url=DWD_WMS,
        name=f"DWD ICON {accum_choice}",
        layers=layer_name,
        styles=style_name,
        fmt="image/png",
        transparent=True,
        version="1.3.0",
        overlay=True,
        control=True,
        show=True,
        opacity=opacity,
    )
    wms_layer.add_to(m)

    # The time list is best-effort: without it the map still renders, just
    # without animation. Record the reason instead of discarding it.
    try:
        times = fetch_times_list(accum_choice)
    except Exception as exc:
        logger.warning("Could not fetch WMS time steps for %s: %s", layer_name, exc)
        times = []

    start_idx = _select_start_index(times, start_mode, time_value)

    # Apply the initial time to the layer options before the map is rendered.
    init_time = times[start_idx] if times else (time_value or None)
    if init_time is not None:
        wms_layer.options.update({"time": init_time})

    # Inject the animation script if requested.
    if animate and len(times) > 1:
        macro = _build_animation_macro(wms_layer.get_name(), times, start_idx, speed_ms)
        m.get_root().add_child(macro)

    # Legend: decorative, so a failure here must not prevent the map from rendering.
    legend_url = build_legend_url(layer_name, style_name)
    try:
        FloatImage(legend_url, bottom=5, left=5).add_to(m)
    except Exception as exc:
        logger.warning("Could not add legend image: %s", exc)

    folium.LayerControl(collapsed=False).add_to(m)

    # Embed as iframe via srcdoc so it can run its scripts independently.
    html_doc = m.get_root().render()
    srcdoc = htmlesc.escape(html_doc, quote=True)
    return f'<iframe srcdoc="{srcdoc}" style="width:100%;height:700px;border:0;border-radius:8px;"></iframe>'
# Single Folium view: input controls, a render button and the HTML output.
# NOTE(review): indentation is missing in this copy, so which components sit
# inside which gr.Row() (and inside the enclosing Blocks context) cannot be read
# from here — restore the nesting from the original before running.
gr.Markdown("# Global Precipitation Forecast (DWD ICON) — Folium")
with gr.Row():
basemap = gr.Radio(["OpenStreetMap", "NASA Blue Marble"], value="OpenStreetMap", label="Basemap")
# Choices are the keys of ACCUM_MAP.
accum = gr.Radio(["6h", "24h", "since_start"], value="6h", label="Accumulation")
# Starts empty; filled by get_times on load and whenever the accumulation changes.
time_dd = gr.Dropdown(choices=[], label="Time (UTC)")
opacity = gr.Slider(0.1, 1.0, value=0.5, step=0.05, label="Overlay opacity")
with gr.Row():
animate = gr.Checkbox(value=True, label="Animate all forecast times")
# Values are the start_mode strings that render_folium compares against.
start_mode = gr.Radio(["first", "nearest", "selected"], value="first", label="Start at")
speed_ms = gr.Slider(200, 2000, value=800, step=100, label="Frame delay (ms)")
render_btn = gr.Button("Render Map", variant="primary")
# Receives the <iframe> string returned by render_folium.
folium_html = gr.HTML()
# Populate times on load and when accumulation changes
demo.load(fn=get_times, inputs=accum, outputs=time_dd)
accum.change(fn=get_times, inputs=accum, outputs=time_dd)
# Render map: the inputs are listed in the same order as render_folium's parameters.
render_btn.click(
fn=render_folium,
inputs=[accum, time_dd, opacity, basemap, animate, start_mode, speed_ms],
outputs=folium_html,
)
# Start the app only when this file is executed as a script.
if __name__ == "__main__":
demo.launch()