Spaces:
Sleeping
Sleeping
File size: 10,510 Bytes
7ed6735 0dd451c 59b7edd 0dd451c 7ed6735 15a3f43 0dd451c 15a3f43 0dd451c 187a3cf 0dd451c 187a3cf 0dd451c 187a3cf 0dd451c 187a3cf 59b7edd 0dd451c 5decb95 0dd451c 15a3f43 187a3cf 15a3f43 187a3cf 15a3f43 187a3cf 7ed6735 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 | import gradio as gr
import folium
from folium.plugins import FloatImage
from branca.element import Template, MacroElement
from urllib.request import urlopen
from urllib.error import URLError, HTTPError
import xml.etree.ElementTree as ET
# Root Gradio Blocks container for the app, bound to the name `demo`.
# NOTE(review): indentation was lost in this copy of the file; the constants,
# helper functions and UI statements that follow presumably form the body of
# this `with` block - confirm against the original file.
with gr.Blocks(fill_height=True, theme=gr.themes.Default()) as demo:
# Mapping for Folium rendering only
# Accumulation choice -> (WMS layer name, WMS style name). Callers unpack the
# tuple and fall back to the "6h" entry for unknown choices.
ACCUM_MAP = {
    "6h": ("dwd:Icon_reg025_fd_sl_TOTPREC06H", "icon_reg025_fd_sl_totprec06h_wmc_isoarea"),
    "24h": ("dwd:Icon_reg025_fd_sl_TOTPREC24H", "icon_reg025_fd_sl_totprec24h_wmc_isoarea"),
    "since_start": ("dwd:Icon_reg025_fd_sl_TOTPREC", "icon_reg025_fd_sl_totprec_wmc_isoarea"),
}
# Base URL of the DWD GeoServer OWS endpoint. It ends with "?" because query
# strings are appended to it directly.
DWD_WMS = "https://maps.dwd.de/geoserver/ows?"
# Basemap: Blue Marble (level 8). For higher zooms we can switch to a MODIS layer later.
# Tile URL template for the NASA GIBS Blue Marble basemap (note the {z}/{y}/{x} order).
GIBS_TILES = "https://gibs.earthdata.nasa.gov/wmts/epsg3857/best/BlueMarble_ShadedRelief_Bathymetry/default/2013-12-01/GoogleMapsCompatible_Level8/{z}/{y}/{x}.jpg"
def _parse_iso_duration_to_seconds(d):
    """Convert an ISO 8601 duration such as ``PT3H`` or ``P1DT6H`` to seconds.

    Only the ``PnDTnHnMnS`` subset with integer fields is supported. The
    ``T`` designator is required before any hour/minute/second field, so a
    calendar duration like ``P1M`` (one month) is rejected instead of being
    misread as one minute.

    Args:
        d: Duration string; surrounding whitespace is ignored.

    Returns:
        The duration in whole seconds, or 0 when ``d`` is not a string or is
        not in the supported subset.
    """
    import re

    if not isinstance(d, str):
        return 0
    m = re.fullmatch(
        r"P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?",
        d.strip(),
    )
    if not m:
        return 0
    # Fields that are absent from the string come back as None -> treat as 0.
    days, hours, minutes, seconds = (int(g or 0) for g in m.groups())
    return ((days * 24 + hours) * 60 + minutes) * 60 + seconds
def _extract_times_from_layer(layer_el):
    """Return the time stamps listed in a WMS ``<Layer>`` element's time dimension.

    The text of the ``Dimension`` child with ``name="time"`` is read as a
    comma-separated list whose items are either single time values (returned
    as written) or ``start/end/step`` intervals. Intervals are expanded into
    individual UTC stamps formatted like ``2024-01-01T00:00:00Z``.

    Args:
        layer_el: ``xml.etree.ElementTree.Element`` for one ``<Layer>``.

    Returns:
        A list of time strings in document order. It is empty when the layer
        has no time dimension or the dimension has no text. Intervals that
        do not have exactly three parts, or whose start/end cannot be parsed,
        are skipped. An interval whose step is missing or unsupported
        contributes only its start value.
    """
    from datetime import datetime, timedelta, timezone

    ns = {"wms": "http://www.opengis.net/wms"}
    dim = layer_el.find("wms:Dimension[@name='time']", ns)
    if dim is None:
        # Fallback for documents without the WMS namespace. Still require
        # name="time" so another dimension (e.g. elevation) is never
        # mistaken for the time extent.
        dim = layer_el.find("Dimension[@name='time']")
    txt = ((dim.text if dim is not None else "") or "").strip()
    if not txt:
        return []

    def parse_dt(s):
        # Expect Zulu time
        return datetime.fromisoformat(s.replace("Z", "+00:00")).astimezone(timezone.utc)

    out = []
    for item in txt.split(","):
        item = item.strip()
        if not item:
            continue
        if "/" not in item:
            # Plain time value: keep exactly as advertised.
            out.append(item)
            continue
        parts = [p.strip() for p in item.split("/")]
        if len(parts) != 3:
            # Not a start/end/step triple; nothing sensible to expand.
            continue
        start_s, end_s, step_s = parts
        try:
            t = parse_dt(start_s)
            end = parse_dt(end_s)
        except ValueError:
            continue
        step = _parse_iso_duration_to_seconds(step_s)
        if step <= 0:
            # Without a usable step the interval cannot be enumerated.
            out.append(start_s)
            continue
        delta = timedelta(seconds=step)
        while t <= end:
            out.append(t.replace(microsecond=0).isoformat().replace("+00:00", "Z"))
            t += delta
    return out
def get_times(accum_choice: str):
    """Build the "Time (UTC)" dropdown for the selected accumulation.

    Downloads the WMS GetCapabilities document, finds the layer mapped to
    ``accum_choice`` (unknown choices fall back to "6h") and lists its
    available times, preselecting the last one.

    Args:
        accum_choice: Key into ``ACCUM_MAP``.

    Returns:
        A ``gr.Dropdown`` holding the available times. On any failure
        (download error, unparsable XML, layer missing, no times) the
        dropdown is empty and its ``info`` text states the reason.
    """
    from http.client import HTTPException

    layer_name, _ = ACCUM_MAP.get(accum_choice, ACCUM_MAP["6h"])
    url = f"{DWD_WMS}service=WMS&request=GetCapabilities&version=1.3.0"

    def empty_dropdown(info):
        # Every failure path returns the same empty dropdown, differing only in `info`.
        return gr.Dropdown(choices=[], value=None, label="Time (UTC)", info=info)

    try:
        with urlopen(url, timeout=15) as resp:
            xml = resp.read()
    except (OSError, HTTPException) as e:
        # URLError and HTTPError are OSError subclasses. Catching OSError also
        # covers timeouts and connection resets raised while reading the body,
        # and HTTPException covers truncated responses.
        return empty_dropdown(f"Failed to fetch capabilities: {e}")
    try:
        root = ET.fromstring(xml)
    except ET.ParseError as e:
        return empty_dropdown(f"Parse error: {e}")
    ns = {"wms": "http://www.opengis.net/wms"}
    # Find all layer nodes; the first one whose <Name> matches wins.
    for lyr in root.findall(".//wms:Layer", ns):
        name_el = lyr.find("wms:Name", ns)
        if name_el is None or (name_el.text or "").strip() != layer_name:
            continue
        times = _extract_times_from_layer(lyr)
        if not times:
            return empty_dropdown("No times available")
        # choose latest
        return gr.Dropdown(choices=times, value=times[-1], label="Time (UTC)")
    return empty_dropdown("Layer not found")
def fetch_times_list(accum_choice: str):
    """Return the available time strings for the layer behind ``accum_choice``.

    Fetches and parses the WMS GetCapabilities document; unknown choices use
    the "6h" mapping. Network and XML errors are not handled here and
    propagate to the caller.

    Returns:
        The times extracted from the first layer whose name matches, or an
        empty list when no such layer exists.
    """
    wanted_layer, _style = ACCUM_MAP.get(accum_choice, ACCUM_MAP["6h"])
    capabilities_url = f"{DWD_WMS}service=WMS&request=GetCapabilities&version=1.3.0"
    with urlopen(capabilities_url, timeout=15) as response:
        payload = response.read()
    document = ET.fromstring(payload)
    wms_ns = {"wms": "http://www.opengis.net/wms"}
    for layer in document.iterfind(".//wms:Layer", wms_ns):
        name_node = layer.find("wms:Name", wms_ns)
        if name_node is None:
            continue
        if (name_node.text or "").strip() == wanted_layer:
            return _extract_times_from_layer(layer)
    return []
def build_legend_url(layer_name: str, style_name: str) -> str:
    """Return the WMS GetLegendGraphic URL for a layer/style pair.

    The query requests a 160x300 PNG and is appended to ``DWD_WMS``.
    """
    from urllib.parse import urlencode

    # Pair order is kept so the query string is emitted in this exact order.
    legend_params = [
        ("service", "WMS"),
        ("version", "1.3.0"),
        ("request", "GetLegendGraphic"),
        ("format", "image/png"),
        ("width", 160),
        ("height", 300),
        ("layer", layer_name),
        ("style", style_name),
    ]
    return DWD_WMS + urlencode(legend_params)
def render_folium(accum_choice: str, time_value: str, opacity: float, basemap: str, animate: bool, start_mode: str, speed_ms: int) -> str:
    """Render the precipitation map and return it as an embeddable iframe.

    Builds a Folium map with the chosen basemap, adds the DWD ICON WMS
    overlay for ``accum_choice``, optionally injects a script that cycles
    the overlay through all available times, adds the legend image and a
    layer control, and wraps the rendered page in an ``<iframe srcdoc>``.

    Args:
        accum_choice: Key into ``ACCUM_MAP``; unknown keys use the "6h" entry.
        time_value: Time selected in the UI. Used to pick the start frame in
            "selected" mode, and as the initial time when no time list
            could be fetched.
        opacity: Opacity passed to the WMS overlay.
        basemap: "OpenStreetMap" selects OSM tiles; any other value selects
            the NASA Blue Marble tiles.
        animate: When true and more than one time is available, the
            animation script is injected.
        start_mode: "nearest" starts at the time closest to now, "selected"
            starts at ``time_value``; anything else starts at the first time.
        speed_ms: Delay between animation frames in milliseconds.

    Returns:
        An HTML string containing a single ``<iframe>`` whose ``srcdoc`` is
        the HTML-escaped rendered map document.
    """
    layer_name, style_name = ACCUM_MAP.get(accum_choice, ACCUM_MAP["6h"])
    # tiles=None: no default basemap; one is added explicitly below.
    m = folium.Map(location=[20, 0], zoom_start=2, tiles=None, control_scale=True)
    # Basemap selection
    if basemap == "OpenStreetMap":
        folium.TileLayer(
            tiles="https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png",
            name="OpenStreetMap",
            attr="© OpenStreetMap contributors",
            max_zoom=19,
            overlay=False,
            control=True,
        ).add_to(m)
    else:
        folium.TileLayer(
            tiles=GIBS_TILES,
            name="NASA Blue Marble",
            attr="Blue Marble — NASA GIBS",
            max_zoom=8,
            overlay=False,
            control=True,
        ).add_to(m)
    # DWD ICON WMS overlay
    wms_layer = folium.raster_layers.WmsTileLayer(
        url=DWD_WMS,
        name=f"DWD ICON {accum_choice}",
        layers=layer_name,
        styles=style_name,
        fmt="image/png",
        transparent=True,
        version="1.3.0",
        overlay=True,
        control=True,
        show=True,
        opacity=opacity,
    )
    wms_layer.add_to(m)
    # Determine start time and animation frames
    times = []
    try:
        times = fetch_times_list(accum_choice)
    except Exception:
        # Best effort: if the time list cannot be fetched, `times` stays empty
        # and the map is still rendered (using `time_value`, if any).
        pass
    start_idx = 0
    if times:
        from datetime import datetime, timezone
        if start_mode == "nearest":
            now = datetime.now(timezone.utc)
            def parse_dt(s):
                return datetime.fromisoformat(s.replace("Z", "+00:00")).astimezone(timezone.utc)
            # Index of the time with the smallest absolute distance to now.
            diffs = [abs((parse_dt(t) - now).total_seconds()) for t in times]
            start_idx = int(min(range(len(times)), key=lambda i: diffs[i]))
        elif start_mode == "selected" and time_value:
            try:
                start_idx = max(0, times.index(time_value))
            except ValueError:
                # Selected value is not in the fetched list: use the last time.
                start_idx = len(times) - 1
        else:
            start_idx = 0  # first time (approx. 0h)
    # Apply initial time
    init_time = (times[start_idx] if times else (time_value or None))
    if init_time is not None:
        # Update WMS params
        wms_layer.options.update({"time": init_time})
    # Inject simple animation script if requested
    if animate and len(times) > 1:
        import json
        # Identifier used in the generated JS to reference the WMS layer.
        layer_var = wms_layer.get_name()
        # f-string: doubled braces emit literal braces, so "{{% ... %}}"
        # becomes the "{% ... %}" macro tags of the template and "{{"/"}}"
        # become plain JS braces. The script sets the layer's `time`
        # parameter, then advances to the next time every `speed` ms,
        # wrapping around at the end of the list.
        js = f"""
{{% macro script(this, kwargs) %}}
(function(){{
var layer = {layer_var};
var times = {json.dumps(times)};
var idx = {start_idx};
var speed = {int(speed_ms)};
function setTime(){{
layer.setParams({{ time: times[idx] }});
}}
setTime();
if (Array.isArray(times) && times.length>1) {{
window.__wmsTimer && clearInterval(window.__wmsTimer);
window.__wmsTimer = setInterval(function(){{
idx = (idx + 1) % times.length;
setTime();
}}, speed);
}}
}})();
{{% endmacro %}}
"""
        macro = MacroElement()
        macro._template = Template(js)
        m.get_root().add_child(macro)
    # Legend
    legend_url = build_legend_url(layer_name, style_name)
    try:
        FloatImage(legend_url, bottom=5, left=5).add_to(m)
    except Exception:
        # The legend is optional; a failure here must not prevent rendering.
        pass
    folium.LayerControl(collapsed=False).add_to(m)
    # Embed as iframe via srcdoc so it can run its scripts independently.
    html_doc = m.get_root().render()
    import html as htmlesc
    # Escape (including quotes) so the document can sit inside the srcdoc attribute.
    srcdoc = htmlesc.escape(html_doc, quote=True)
    iframe = f'<iframe srcdoc="{srcdoc}" style="width:100%;height:700px;border:0;border-radius:8px;"></iframe>'
    return iframe
# Single Folium view
# NOTE(review): indentation was lost in this copy of the file. The grouping of
# components under each gr.Row() below is reconstructed from statement order
# and should be confirmed against the original file.
gr.Markdown("# Global Precipitation Forecast (DWD ICON) — Folium")
# First row: basemap, accumulation, time and overlay-opacity controls.
with gr.Row():
    basemap = gr.Radio(["OpenStreetMap", "NASA Blue Marble"], value="OpenStreetMap", label="Basemap")
    accum = gr.Radio(["6h", "24h", "since_start"], value="6h", label="Accumulation")
    # Starts empty; filled by get_times on load and on accumulation change.
    time_dd = gr.Dropdown(choices=[], label="Time (UTC)")
    opacity = gr.Slider(0.1, 1.0, value=0.5, step=0.05, label="Overlay opacity")
# Second row: animation controls.
with gr.Row():
    animate = gr.Checkbox(value=True, label="Animate all forecast times")
    start_mode = gr.Radio(["first", "nearest", "selected"], value="first", label="Start at")
    speed_ms = gr.Slider(200, 2000, value=800, step=100, label="Frame delay (ms)")
render_btn = gr.Button("Render Map", variant="primary")
# Receives the iframe HTML string returned by render_folium.
folium_html = gr.HTML()
# Populate times on load and when accumulation changes
demo.load(fn=get_times, inputs=accum, outputs=time_dd)
accum.change(fn=get_times, inputs=accum, outputs=time_dd)
# Render map
# The order of `inputs` matches render_folium's positional parameters.
render_btn.click(
    fn=render_folium,
    inputs=[accum, time_dd, opacity, basemap, animate, start_mode, speed_ms],
    outputs=folium_html,
)
# Launch the Gradio app only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()
|