""" app.py — PV Detector HuggingFace Space GPS → IGN imagery → clf+seg → pypvroof → stats + overlay image """ import base64 import json import math import os import time import tempfile import importlib.util from concurrent.futures import ThreadPoolExecutor import cv2 import folium import gradio as gr import requests import torch import numpy as np from PIL import Image from io import BytesIO import geojson as gj from area import area as geojson_area from shapely.geometry import shape, mapping, Polygon from shapely.ops import unary_union from huggingface_hub import hf_hub_download # ── Constants ───────────────────────────────────────────────────────────────── GSD = 0.2 CLF_PX = 299 # native patch size: fetched at 299 px / 0.2 m/px (59.8 m) SEG_PX = 400 # segmentation input size (patch upscaled, georef unchanged) CLF_THR = 0.15 # ── Models (loaded once) ─────────────────────────────────────────────────────── _clf = None _seg = None def load_models(): global _clf, _seg if _clf is None: path = hf_hub_download("gabrielkasmi/bdappv-models", "model.py") spec = importlib.util.spec_from_file_location("bdappv_model", path) mod = importlib.util.module_from_spec(spec) spec.loader.exec_module(mod) _clf = mod.load_classification_model("ign", device="cpu") _seg = mod.load_segmentation_model("ign", device="cpu") return _clf, _seg # ── Pipeline ────────────────────────────────────────────────────────────────── def compute_bbox(lat, lon, coverage_m): d_lat = coverage_m / 111_320 d_lon = coverage_m / (111_320 * math.cos(math.radians(lat))) return lat - d_lat/2, lat + d_lat/2, lon - d_lon/2, lon + d_lon/2 def fetch_ign(south, north, west, east, px): url = ( "https://data.geopf.fr/wms-r" "?SERVICE=WMS&VERSION=1.3.0&REQUEST=GetMap" "&FORMAT=image/png&LAYERS=ORTHOIMAGERY.ORTHOPHOTOS" "&CRS=EPSG:4326&STYLES=" f"&BBOX={south},{west},{north},{east}&WIDTH={px}&HEIGHT={px}" ) r = requests.get(url, timeout=30) r.raise_for_status() return Image.open(BytesIO(r.content)).convert("RGB") def 
preprocess(img, size): t = torch.tensor(np.array(img.resize((size, size), Image.BILINEAR))).permute(2, 0, 1).float() / 255.0 mean = torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1) std = torch.tensor([0.229, 0.224, 0.225]).view(3, 1, 1) return ((t - mean) / std).unsqueeze(0) def run_clf_batch(clf, imgs): """Batch-classify patches → list of probabilities.""" t = torch.cat([preprocess(im, CLF_PX) for im in imgs]) with torch.no_grad(): logits = clf(t) if hasattr(logits, "logits"): logits = logits.logits return torch.sigmoid(logits).flatten().tolist() def run_seg_batch(seg, imgs): """Batch-segment patches → list of binary masks (uint8, 0/255).""" t = torch.cat([preprocess(im, SEG_PX) for im in imgs]) with torch.no_grad(): out = seg(t)["out"] return [(torch.sigmoid(out[i, 0]) > 0.5).numpy().astype(np.uint8) * 255 for i in range(len(imgs))] def mask_to_features(mask, south, north, west, east): H, W = mask.shape contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) features = [] for c in contours: if cv2.contourArea(c) < 10: continue coords = [] for pt in c: px, py = pt[0] coords.append([west + (px / W) * (east - west), north - (py / H) * (north - south)]) coords.append(coords[0]) features.append(gj.Feature(geometry=gj.Polygon([coords]), properties={})) return features MERGE_EPS = 3e-6 # ~0.3 m in degrees — bridges the ≤1 px gap at patch borders def merge_features(features): """Union polygons split across patch borders into single installations.""" if not features: return features geoms = [shape(f["geometry"]).buffer(MERGE_EPS) for f in features] merged = unary_union(geoms).buffer(-MERGE_EPS) if merged.is_empty: return [] polys = list(merged.geoms) if merged.geom_type == "MultiPolygon" else [merged] return [gj.Feature(geometry=mapping(p), properties={}) for p in polys] # Vendored from pypvroof (constant tilt / bounding-box azimuth / constant # regression). 
# pypvroof itself pulls GDAL & rasterio at import time, which
# breaks fresh installs — the methods used here never touch them.
TILT_DEG = 20.0        # constant-tilt
KWP_PER_M2 = 1 / 6.5   # pypvroof default-coefficient (kWp per m²)


def _azimuth_bounding_box(feature):
    """Orientation of the long side of the minimum rotated rectangle.

    pypvroof convention: 0 = south-facing, ±90 = east/west, ±180 = north.
    """
    poly = Polygon(feature["geometry"]["coordinates"][0])
    x, y = poly.minimum_rotated_rectangle.exterior.coords.xy
    ulx, uly = x[1], y[1]
    llx, lly = x[2], y[2]
    lrx, lry = x[3], y[3]
    side_long = math.hypot(lrx - llx, lry - lly)
    side_short = math.hypot(ulx - llx, uly - lly)
    angle_short = 90 - math.degrees(math.atan2(lry - lly, lrx - llx))
    angle_long = -90 + math.degrees(math.atan2(uly - lly, ulx - llx))
    phi = angle_long if side_long >= side_short else angle_short
    return (-phi + 180) if phi > 0 else (abs(phi) - 180)


def extract_chars(features):
    """Compute per-installation characteristics for each GeoJSON feature.

    Returns one dict per feature, in the same order, with keys ``lon``,
    ``lat`` (mean of the exterior-ring vertices), ``tilt``, ``azimuth``,
    ``capacity`` (kWp) and ``surface`` (m², footprint area divided by
    ``cos(TILT_DEG)``). A feature that cannot be processed yields ``{}`` so
    the result stays index-aligned with ``features``.
    """
    import logging

    results = []
    for f in features:
        try:
            coords = f["geometry"]["coordinates"][0]
            lon, lat = (float(v) for v in np.mean(coords, axis=0))
            surface = geojson_area(f["geometry"]) / math.cos(math.radians(TILT_DEG))
            results.append({
                "lon": lon,
                "lat": lat,
                "tilt": TILT_DEG,
                "azimuth": float(_azimuth_bounding_box(f)),
                "capacity": surface * KWP_PER_M2,
                "surface": surface,
            })
        except Exception:
            # Best-effort by design, but leave a trace instead of hiding it.
            logging.getLogger(__name__).exception(
                "Could not extract characteristics for a feature")
            results.append({})
    return results


# ── Detection entry point ─────────────────────────────────────────────────────
BATCH = 16
FETCH_WORKERS = 6


def detect(lat_str, lon_str, coverage, progress=gr.Progress()):
    """Run the full detection pipeline around a point.

    Args:
        lat_str, lon_str: centre coordinates as strings (decimal degrees).
        coverage: requested side length of the zone, in metres.
        progress: Gradio progress tracker.

    Returns:
        A 4-tuple matching the Gradio outputs: stats HTML, an update for the
        download file component, the FeatureCollection as a JSON string, and
        an update for the "new detection" button. On invalid input or on any
        pipeline failure the first element is an error banner and the JSON
        string is empty.
    """
    try:
        lat, lon = float(lat_str), float(lon_str)
        if not (math.isfinite(lat) and math.isfinite(lon)):
            raise ValueError("non-finite coordinates")
    except (ValueError, TypeError):
        return (error_stats("No zone selected — click the map or search a location."),
                gr.update(visible=False), "", gr.update())

    # Pave the zone with n×n patches of CLF_PX*GSD meters (59.8 m) so each
    # WMS request is natively at model resolution. Covered area ≥ coverage.
    patch_m = CLF_PX * GSD
    n = max(1, math.ceil(coverage / patch_m))
    south, north, west, east = compute_bbox(lat, lon, n * patch_m)
    d_lat = (north - south) / n
    d_lon = (east - west) / n
    boxes = [(north - (r + 1) * d_lat, north - r * d_lat,
              west + c * d_lon, west + (c + 1) * d_lon)
             for r in range(n) for c in range(n)]

    features = []
    try:
        clf, seg = load_models()
        # One pool for the whole run instead of one per batch.
        with ThreadPoolExecutor(max_workers=FETCH_WORKERS) as ex:
            for i in range(0, len(boxes), BATCH):
                progress(i / len(boxes), desc=f"Patches {i}/{len(boxes)}")
                chunk = boxes[i:i + BATCH]
                imgs = list(ex.map(lambda b: fetch_ign(*b, CLF_PX), chunk))
                probs = run_clf_batch(clf, imgs)
                positives = [(im, b) for im, b, p in zip(imgs, chunk, probs)
                             if p > CLF_THR]
                del imgs  # flush negatives immediately
                if positives:
                    masks = run_seg_batch(seg, [im for im, _ in positives])
                    for mask, (_, b) in zip(masks, positives):
                        features += mask_to_features(mask, *b)
    except Exception as e:
        return (error_stats(f"Detection failed: {e}"),
                gr.update(visible=False), "", gr.update())

    progress(1.0, desc="Extracting characteristics")
    features = merge_features(features)
    chars = extract_chars(features)
    for feat, c in zip(features, chars):
        feat["properties"].update(c)

    fc = gj.FeatureCollection(features)
    fc_str = json.dumps(fc, indent=2)
    # A unique file per call: a fixed name in the temp dir would let
    # concurrent sessions overwrite each other's download.
    with tempfile.NamedTemporaryFile("w", prefix="detections_", suffix=".geojson",
                                     delete=False, encoding="utf-8") as f:
        f.write(fc_str)
        geojson_path = f.name

    return (build_stats_html(chars),
            gr.update(value=geojson_path, visible=True),
            fc_str,
            gr.update(visible=True))


# ── Geocoding (IGN Géoplateforme) ─────────────────────────────────────────────
def geocode(q):
    """Resolve a free-text query to ``(lat, lon, center_json)``.

    Uses the Géoplateforme geocoding search endpoint and keeps the first
    result. On empty query, no result, or any failure, returns three no-op
    ``gr.update()`` values (a Gradio warning is shown for the last two cases).
    """
    q = (q or "").strip()
    if not q:
        return gr.update(), gr.update(), gr.update()
    try:
        r = requests.get("https://data.geopf.fr/geocodage/search",
                         params={"q": q, "limit": 1}, timeout=10)
        r.raise_for_status()
        feats = r.json().get("features", [])
        if not feats:
            gr.Warning(f"No result for '{q}'.")
            return gr.update(), gr.update(), gr.update()
        lon, lat = feats[0]["geometry"]["coordinates"]
        # "ts" makes the value differ on every search so .change() fires
        # even when the same place is searched twice.
        center = json.dumps({"lat": lat, "lon": lon, "ts": time.time()})
        return f"{lat:.6f}", f"{lon:.6f}", center
    except Exception as e:
        gr.Warning(f"Search failed: {e}")
        return gr.update(), gr.update(), gr.update()


# ── Map ───────────────────────────────────────────────────────────────────────
def make_map_html():
    """Return iframe HTML with folium map encoded as data URL
    (no file I/O, no srcdoc encoding issues).

    NOTE(review): the HTML literals in this function (the page wrapper and
    the returned iframe tag) are empty in this copy of the file — the markup
    appears to have been stripped. Restore them from the original source.
    """
    m = folium.Map(location=[46.5, 2.3], zoom_start=6, tiles=None, max_zoom=19)

    # IGN ortho — Géoplateforme WMTS (wxs.ign.fr is decommissioned).
    # NB: plain string, NOT an f-string ({z}/{y}/{x} are substituted by
    # Leaflet), and attr is mandatory, otherwise folium raises ValueError.
    folium.TileLayer(
        tiles=("https://data.geopf.fr/wmts?SERVICE=WMTS&REQUEST=GetTile&VERSION=1.0.0"
               "&LAYER=ORTHOIMAGERY.ORTHOPHOTOS&STYLE=normal&TILEMATRIXSET=PM"
               "&FORMAT=image/jpeg&TILEMATRIX={z}&TILEROW={y}&TILECOL={x}"),
        attr="© IGN / Géoplateforme",
        name="IGN Ortho",
        max_zoom=19,
        max_native_zoom=19,
    ).add_to(m)
    folium.TileLayer("OpenStreetMap", name="OSM", show=False).add_to(m)
    folium.LayerControl(position="bottomright").add_to(m)

    # NOTE(review): the popup line breaks below are written as <br>; the
    # original separator was lost in this copy (raw newlines inside the JS
    # string literals, which is a syntax error) — confirm.
    click_js = """
(function attach() {
  var _map = window["__MAP__"];
  if (!_map) { setTimeout(attach, 50); return; }
  _map.zoomControl.setPosition('bottomright');
  var _rect = null, _cov = 500, _lat = null, _lon = null, _mode = 'select';
  function drawRect() {
    if (_lat === null) return;
    if (_rect) { _map.removeLayer(_rect); }
    var dLat = _cov/111320, dLon = _cov/(111320*Math.cos(_lat*Math.PI/180));
    _rect = L.rectangle(
      [[_lat-dLat/2,_lon-dLon/2],[_lat+dLat/2,_lon+dLon/2]],
      {color:'#3b82f6',weight:2,fillColor:'#3b82f6',fillOpacity:0.08,dashArray:'6,4'}
    ).addTo(_map);
  }
  _map.on('click', function(e) {
    if (_mode !== 'select') { return; }
    _lat = e.latlng.lat; _lon = e.latlng.lng;
    var msg = {type:'pvClick', lat:_lat, lon:_lon};
    var w = window;
    do { w = w.parent; w.postMessage(msg, '*'); } while (w !== w.parent);
    drawRect();
  });
  var _detLayer = null;
  window.addEventListener('message', function(e) {
    if (!e.data) return;
    if (e.data.type === 'pvCoverage') { _cov = Number(e.data.coverage) || _cov; drawRect(); }
    if (e.data.type === 'pvCenter') {
      if (e.data.cov) { _cov = Number(e.data.cov); }
      _map.setView([e.data.lat, e.data.lon], 16);
      if (_mode === 'select') { _lat = e.data.lat; _lon = e.data.lon; drawRect(); }
    }
    if (e.data.type === 'pvReset') {
      if (_detLayer) { _map.removeLayer(_detLayer); _detLayer = null; }
      _mode = 'select';
    }
    if (e.data.type === 'pvGeojson') {
      console.log('[pv-map] geojson received');
      _mode = 'view';
      if (_rect) { _map.removeLayer(_rect); _rect = null; _lat = null; }
      if (_detLayer) { _map.removeLayer(_detLayer); }
      _detLayer = L.geoJSON(e.data.fc, {
        style: {color:'#dc2626', weight:2, fillColor:'#dc2626', fillOpacity:0.35},
        onEachFeature: function(f, layer) {
          var p = f.properties || {};
          if (p.surface) layer.bindPopup(
            'Surface: ' + p.surface.toFixed(0) + ' m²<br>' +
            'Capacity: ' + p.capacity.toFixed(1) + ' kWp<br>' +
            'Azimuth: ' + Math.round(p.azimuth) + '°');
        }
      }).addTo(_map);
      var b = _detLayer.getBounds();
      if (b.isValid()) { _map.fitBounds(b, {maxZoom: 18}); }
    }
  });
})();
""".replace("__MAP__", m.get_name())
    m.get_root().script.add_child(folium.Element(click_js))

    raw_html = m._repr_html_()
    # NOTE(review): wrapper markup around raw_html is missing in this copy.
    page = ('' '' + raw_html + '')
    b64 = base64.b64encode(page.encode("utf-8")).decode("ascii")
    # NOTE(review): the iframe tag that presumably embedded `b64` is missing
    # in this copy, so this currently returns an empty string.
    return (
        f''
    )


# ── Stats ─────────────────────────────────────────────────────────────────────
def error_stats(msg):
    """Return the error banner for ``msg``.

    The message is HTML-escaped because callers interpolate exception text
    into it. NOTE(review): the surrounding markup is missing in this copy.
    """
    import html

    return f'\n⚠️ {html.escape(str(msg))}\n'


def compass_svg(azimuths):
    """Render the azimuth histogram (8 compass sectors of 45°).

    NOTE(review): the SVG markup is missing in this copy, so only labels and
    counts remain in the output. Also, sector 0 is labelled 'N' here while
    `_azimuth_bounding_box` documents 0 as south-facing — confirm against
    the original markup whether an offset was applied when drawing.
    """
    labels = ['N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW']
    bins = [0] * 8
    for az in azimuths:
        # Shift by half a sector so each bin is centred on its label.
        bins[int(((az % 360) + 22.5) / 45) % 8] += 1
    max_b = max(bins) or 1
    bars = ""
    for i, (lbl, cnt) in enumerate(zip(labels, bins)):
        h = int((cnt / max_b) * 36)  # bar height; used by the missing markup
        bars += (f''
                 f''
                 f'{lbl}'
                 + (f'{cnt}' if cnt else '')
                 + '')
    return f'{bars}'


def build_stats_html(chars):
    """Build the results panel from the per-installation characteristics.

    ``chars`` is the list returned by `extract_chars`; entries may be empty
    dicts, which contribute 0 to the totals.
    NOTE(review): the HTML markup of both templates is missing in this copy.
    """
    if not chars:
        return """
🔍
No PV installation detected in this zone.
"""
    n = len(chars)
    total_surface = sum(c.get("surface", 0) for c in chars)
    total_capacity = sum(c.get("capacity", 0) for c in chars)  # already kWp
    azimuths = [c["azimuth"] for c in chars if c.get("azimuth") is not None]
    return f"""

📊 Results

{n}
installations
{total_surface:.0f}
m² total
{total_capacity:.1f}
kWp est.
Azimuth distribution
{compass_svg(azimuths)}
Capacity estimated with constant regression (pypvroof, no DEM). Large-scale → DeepPVMapper.
"""


# ── Head script — postMessage listener (runs in <head>, not innerHTML) ────────
# NOTE(review): the script body is missing in this copy. The map posts
# {type:'pvClick', lat, lon} to its parents and the detect/reload handlers
# read window._pvLat / window._pvLon, so a listener presumably belongs here.
HEAD_SCRIPT = """ """

# ── Gradio layout ─────────────────────────────────────────────────────────────
INITIAL_STATS = """
🖱️
Click the map to select a zone,
then press Detect.
"""

CSS = """
.gradio-container { max-width: 100% !important; padding: 8px !important; }
footer { display: none !important; }
#map-wrap { position: relative; }
/* the intro modal is position:fixed — collapse its host block so it takes no flow space */
#intro-host { position: absolute; height: 0 !important; min-height: 0 !important; padding: 0 !important; margin: 0 !important; overflow: visible; border: none !important; }
#header-card, #stats-card, #control-card, #search-card { position: absolute; z-index: 1000; border-radius: 14px; padding: 14px !important; box-shadow: 0 8px 28px rgba(0,0,0,0.35); backdrop-filter: blur(10px); -webkit-backdrop-filter: blur(10px); }
#header-card { top: 20px; left: 50%; transform: translateX(-50%); width: 380px; background: rgba(17, 24, 39, 0.85) !important; border: 1px solid rgba(255,255,255,0.10) !important; }
#search-card { top: 20px; left: 20px; width: 300px; background: rgba(17, 24, 39, 0.85) !important; border: 1px solid rgba(255,255,255,0.10) !important; }
#search-card .block { background: transparent !important; }
#stats-card { top: 20px; right: 20px; width: 320px; background: rgba(255, 255, 255, 0.95) !important; border: 1px solid rgba(0,0,0,0.06) !important; }
#control-card { bottom: 64px; left: 20px; width: 320px; padding: 10px !important; gap: 6px !important; row-gap: 6px !important; background: rgba(17, 24, 39, 0.85) !important; border: 1px solid rgba(255,255,255,0.10) !important; }
#control-card .block { padding: 4px 8px !important; }
#control-card .form { gap: 4px !important; }
#control-card label > span { font-size: 12px !important; }
#control-card input[type="number"], #control-card input[type="text"] { padding-top: 4px !important; padding-bottom: 4px !important; }
#control-card button { padding: 8px !important; }
#header-card .html-container, #stats-card .html-container { padding: 0; }
#stats-card .block, #control-card .block, #header-card .block { background: transparent !important; }
/* stats card is light — force dark text even in dark mode */
#stats-card * { color: #111827; }
/* progress overlays & toasts — dark glass, match the cards */
.wrap.default { background: rgba(17, 24, 39, 0.88) !important; border-radius: 14px; }
.wrap.default *, .wrap.default .progress-text, .wrap.default .meta-text { color: #e5e7eb !important; }
.toast-wrap { top: 50% !important; bottom: auto !important; right: 16px !important; transform: translateY(-50%); }
.toast-body { background: rgba(17, 24, 39, 0.92) !important; border: 1px solid rgba(255, 255, 255, 0.10) !important; }
.toast-body * { color: #e5e7eb !important; }
"""

THEME = gr.themes.Default(primary_hue="blue", neutral_hue="slate")

# NOTE(review): markup of the intro modal is missing in this copy.
INTRO_HTML = """

🗺️ DeepPVMapper

Map rooftop solar installations anywhere in France, from aerial imagery.

1. Search a city or click the map to select a zone (up to 1 km²).

2. Press Detect — deep learning models detect and segment PV panels on IGN orthophotos (CPU inference, ~1 min for 1 km²).

3. Click the detected polygons to see surface, capacity and azimuth — and download everything as GeoJSON.

Characteristics estimated with pypvroof (no DEM). To map a large area, we recommend running DeepPVMapper directly from the GitHub repository. Best viewed on a desktop browser.

"""

# NOTE(review): the nesting below is reconstructed — the cards are placed
# inside #map-wrap because the CSS positions them absolutely relative to it.
with gr.Blocks(css=CSS, theme=THEME, title="DeepPVMapper — demo",
               head=HEAD_SCRIPT) as demo:
    gr.HTML(INTRO_HTML, elem_id="intro-host")

    with gr.Column(elem_id="map-wrap"):
        map_display = gr.HTML(value=make_map_html())

        with gr.Column(elem_id="header-card"):
            # NOTE(review): header markup and link targets are missing here.
            gr.HTML("""

🗺️ DeepPVMapper

Interactive demo — rooftop PV detection & characterization on IGN aerial imagery (France).

⭐ GitHub 📄 Paper 📦 Dataset 🤖 Models
""")

        with gr.Column(elem_id="stats-card"):
            stats_display = gr.HTML(value=INITIAL_STATS)
            download_file = gr.File(label="⬇️ Detections (GeoJSON)", visible=False)
            new_btn = gr.Button("🔄 New detection", visible=False, size="sm")

        with gr.Column(elem_id="search-card"):
            search_input = gr.Textbox(show_label=False,
                                      placeholder="🔎 Search location")

        with gr.Column(elem_id="control-card"):
            # hidden — filled by map clicks / search, nobody types coordinates
            lat_input = gr.Textbox(visible=False, elem_id="lat-input")
            lon_input = gr.Textbox(visible=False, elem_id="lon-input")
            coverage = gr.Slider(100, 1000, value=500, step=50,
                                 label="Coverage (m)")
            detect_btn = gr.Button("🔍 Detect PV installations", variant="primary")

    # Hidden carriers used to push backend values to client-side JS.
    geojson_state = gr.Textbox(visible=False)
    center_state = gr.Textbox(visible=False)

    # Broadcast the detection GeoJSON into the nested map iframe.
    # Hooked on .change() of the hidden textbox: fires when the backend
    # pushes the value (js on a chained .then(fn=None) is unreliable).
    SHOW_GEOJSON_JS = """
(g) => {
  if (!g) { return; }
  var data;
  try { data = JSON.parse(g); } catch (e) { console.log('[pv] bad geojson', e); return; }
  console.log('[pv] broadcasting geojson,', (data.features || []).length, 'features');
  function bcast(w) {
    try { w.postMessage({type:'pvGeojson', fc: data}, '*'); } catch (e) {}
    try { for (var i = 0; i < w.frames.length; i++) bcast(w.frames[i]); } catch (e) {}
  }
  bcast(window);
}
"""
    geojson_state.change(fn=None, inputs=[geojson_state], outputs=None,
                         js=SHOW_GEOJSON_JS)

# Search → fill lat/lon, pan the map and preview the zone.
CENTER_JS = """ (g) => { if (!g) { return; } var c; try { c = JSON.parse(g); } catch (e) { return; } window._pvLat = String(c.lat); window._pvLon = String(c.lon); function bcast(w) { try { w.postMessage({type:'pvCenter', lat: c.lat, lon: c.lon, cov: c.cov || null}, '*'); } catch (e) {} try { for (var i = 0; i < w.frames.length; i++) bcast(w.frames[i]); } catch (e) {} } // retry a few times: on page load the map iframe may not be ready yet var tries = 0; (function send() { bcast(window); if (++tries < 5) { setTimeout(send, 700); } })(); } """ center_state.change(fn=None, inputs=[center_state], outputs=None, js=CENTER_JS) search_input.submit( fn=geocode, inputs=[search_input], outputs=[lat_input, lon_input, center_state], ) # js runs client-side first; its return value replaces the inputs sent to # the backend — this is how the map click (stored in window._pvLat/_pvLon) # reaches detect() without DOM-input hacks. DETECT_JS = """ (lat, lon, cov) => [window._pvLat || lat, window._pvLon || lon, cov] """ detect_btn.click( fn=detect, inputs=[lat_input, lon_input, coverage], outputs=[stats_display, download_file, geojson_state, new_btn], js=DETECT_JS, ) # "New detection": full page reload, persisting the location in the URL. RELOAD_JS = """ (lat, lon, cov) => { var u = new URL(window.location.href); var la = window._pvLat || lat, lo = window._pvLon || lon; if (la && lo) { u.searchParams.set('lat', la); u.searchParams.set('lon', lo); u.searchParams.set('cov', cov); } window.location.href = u.toString(); } """ new_btn.click(fn=None, inputs=[lat_input, lon_input, coverage], outputs=None, js=RELOAD_JS) # On load: restore location from URL query params (set by the reload above). 
def init_from_url(request: gr.Request):
    """Restore the selected zone from the ``lat`` / ``lon`` / ``cov`` query params.

    Returns values for ``(lat_input, lon_input, coverage, center_state)``.
    When ``lat``/``lon`` are absent or not finite numbers, four no-op
    ``gr.update()`` values are returned. An unusable ``cov`` is ignored
    without discarding a valid location.
    """
    noop = (gr.update(), gr.update(), gr.update(), gr.update())
    q = dict(request.query_params) if request else {}
    lat, lon, cov = q.get("lat"), q.get("lon"), q.get("cov")
    if not (lat and lon):
        return noop

    try:
        lat_f, lon_f = float(lat), float(lon)
    except ValueError:
        return noop
    # NaN/inf would be serialised as invalid JSON and break the client parser.
    if not (math.isfinite(lat_f) and math.isfinite(lon_f)):
        return noop

    cov_f = None
    if cov:
        try:
            cov_f = float(cov)
        except ValueError:
            cov_f = None
        if cov_f is not None and not math.isfinite(cov_f):
            cov_f = None

    # "ts" makes the value unique so the .change() listener always fires.
    center = json.dumps({"lat": lat_f, "lon": lon_f, "cov": cov_f,
                         "ts": time.time()})
    return lat, lon, (cov_f if cov_f is not None else gr.update()), center


# Push slider value into the (nested, cross-origin) map iframe.
# `frames`, `length` and `postMessage` are on the cross-origin allowlist,
# so a recursive broadcast reaches the srcdoc frame two levels down.
BCAST_JS = """
(cov) => {
  function bcast(w) {
    try { w.postMessage({type:'pvCoverage', coverage: cov}, '*'); } catch (e) {}
    try { for (var i = 0; i < w.frames.length; i++) bcast(w.frames[i]); } catch (e) {}
  }
  bcast(window);
}
"""

# Re-enter the Blocks context so the listeners below are attached to `demo`.
with demo:
    demo.load(fn=init_from_url, inputs=None,
              outputs=[lat_input, lon_input, coverage, center_state])
    coverage.input(fn=None, inputs=[coverage], outputs=None, js=BCAST_JS)


if __name__ == "__main__":
    # Warm the model cache before serving so the first request is not slowed
    # by the download.
    load_models()
    demo.launch()