"""
app.py — PV Detector HuggingFace Space
GPS → IGN imagery → clf+seg → pypvroof → stats + overlay image
"""
import base64
import json
import math
import os
import time
import tempfile
import importlib.util
from concurrent.futures import ThreadPoolExecutor
import cv2
import folium
import gradio as gr
import requests
import torch
import numpy as np
from PIL import Image
from io import BytesIO
import geojson as gj
from area import area as geojson_area
from shapely.geometry import shape, mapping, Polygon
from shapely.ops import unary_union
from huggingface_hub import hf_hub_download
# ── Constants ─────────────────────────────────────────────────────────────────
GSD = 0.2
CLF_PX = 299 # native patch size: fetched at 299 px / 0.2 m/px (59.8 m)
SEG_PX = 400 # segmentation input size (patch upscaled, georef unchanged)
CLF_THR = 0.15
# ── Models (loaded once) ───────────────────────────────────────────────────────
_clf = None
_seg = None
def load_models():
global _clf, _seg
if _clf is None:
path = hf_hub_download("gabrielkasmi/bdappv-models", "model.py")
spec = importlib.util.spec_from_file_location("bdappv_model", path)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
_clf = mod.load_classification_model("ign", device="cpu")
_seg = mod.load_segmentation_model("ign", device="cpu")
return _clf, _seg
# ── Pipeline ──────────────────────────────────────────────────────────────────
def compute_bbox(lat, lon, coverage_m):
d_lat = coverage_m / 111_320
d_lon = coverage_m / (111_320 * math.cos(math.radians(lat)))
return lat - d_lat/2, lat + d_lat/2, lon - d_lon/2, lon + d_lon/2
def fetch_ign(south, north, west, east, px):
url = (
"https://data.geopf.fr/wms-r"
"?SERVICE=WMS&VERSION=1.3.0&REQUEST=GetMap"
"&FORMAT=image/png&LAYERS=ORTHOIMAGERY.ORTHOPHOTOS"
"&CRS=EPSG:4326&STYLES="
f"&BBOX={south},{west},{north},{east}&WIDTH={px}&HEIGHT={px}"
)
r = requests.get(url, timeout=30)
r.raise_for_status()
return Image.open(BytesIO(r.content)).convert("RGB")
def preprocess(img, size):
t = torch.tensor(np.array(img.resize((size, size), Image.BILINEAR))).permute(2, 0, 1).float() / 255.0
mean = torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1)
std = torch.tensor([0.229, 0.224, 0.225]).view(3, 1, 1)
return ((t - mean) / std).unsqueeze(0)
def run_clf_batch(clf, imgs):
"""Batch-classify patches → list of probabilities."""
t = torch.cat([preprocess(im, CLF_PX) for im in imgs])
with torch.no_grad():
logits = clf(t)
if hasattr(logits, "logits"): logits = logits.logits
return torch.sigmoid(logits).flatten().tolist()
def run_seg_batch(seg, imgs):
"""Batch-segment patches → list of binary masks (uint8, 0/255)."""
t = torch.cat([preprocess(im, SEG_PX) for im in imgs])
with torch.no_grad():
out = seg(t)["out"]
return [(torch.sigmoid(out[i, 0]) > 0.5).numpy().astype(np.uint8) * 255
for i in range(len(imgs))]
def mask_to_features(mask, south, north, west, east):
H, W = mask.shape
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
features = []
for c in contours:
if cv2.contourArea(c) < 10: continue
coords = []
for pt in c:
px, py = pt[0]
coords.append([west + (px / W) * (east - west),
north - (py / H) * (north - south)])
coords.append(coords[0])
features.append(gj.Feature(geometry=gj.Polygon([coords]), properties={}))
return features
MERGE_EPS = 3e-6 # ~0.3 m in degrees — bridges the ≤1 px gap at patch borders
def merge_features(features):
"""Union polygons split across patch borders into single installations."""
if not features:
return features
geoms = [shape(f["geometry"]).buffer(MERGE_EPS) for f in features]
merged = unary_union(geoms).buffer(-MERGE_EPS)
if merged.is_empty:
return []
polys = list(merged.geoms) if merged.geom_type == "MultiPolygon" else [merged]
return [gj.Feature(geometry=mapping(p), properties={}) for p in polys]
# Vendored from pypvroof (constant tilt / bounding-box azimuth / constant
# regression). pypvroof itself pulls GDAL & rasterio at import time, which
# breaks fresh installs — the methods used here never touch them.
TILT_DEG = 20.0 # constant-tilt
KWP_PER_M2 = 1 / 6.5 # pypvroof default-coefficient (kWp per m²)
def _azimuth_bounding_box(feature):
"""Orientation of the long side of the minimum rotated rectangle.
pypvroof convention: 0 = south-facing, ±90 = east/west, ±180 = north."""
poly = Polygon(feature["geometry"]["coordinates"][0])
x, y = poly.minimum_rotated_rectangle.exterior.coords.xy
ulx, uly = x[1], y[1]
llx, lly = x[2], y[2]
lrx, lry = x[3], y[3]
side_long = math.hypot(lrx - llx, lry - lly)
side_short = math.hypot(ulx - llx, uly - lly)
angle_short = 90 - math.degrees(math.atan2(lry - lly, lrx - llx))
angle_long = -90 + math.degrees(math.atan2(uly - lly, ulx - llx))
phi = angle_long if side_long >= side_short else angle_short
return (-phi + 180) if phi > 0 else (abs(phi) - 180)
def extract_chars(features):
results = []
for f in features:
try:
coords = f["geometry"]["coordinates"][0]
lon, lat = (float(v) for v in np.mean(coords, axis=0))
surface = geojson_area(f["geometry"]) / math.cos(math.radians(TILT_DEG))
results.append({"lon": lon, "lat": lat, "tilt": TILT_DEG,
"azimuth": float(_azimuth_bounding_box(f)),
"capacity": surface * KWP_PER_M2, "surface": surface})
except Exception:
results.append({})
return results
# ── Detection entry point ─────────────────────────────────────────────────────
BATCH = 16
FETCH_WORKERS = 6
def detect(lat_str, lon_str, coverage, progress=gr.Progress()):
try:
lat, lon = float(lat_str), float(lon_str)
except (ValueError, TypeError):
return (error_stats("No zone selected — click the map or search a location."),
gr.update(visible=False), "", gr.update())
# Pave the zone with n×n patches of CLF_PX*GSD meters (59.8 m) so each
# WMS request is natively at model resolution. Covered area ≥ coverage.
patch_m = CLF_PX * GSD
n = max(1, math.ceil(coverage / patch_m))
south, north, west, east = compute_bbox(lat, lon, n * patch_m)
d_lat = (north - south) / n
d_lon = (east - west) / n
boxes = [(north - (r + 1) * d_lat, north - r * d_lat,
west + c * d_lon, west + (c + 1) * d_lon)
for r in range(n) for c in range(n)]
clf, seg = load_models()
features = []
try:
for i in range(0, len(boxes), BATCH):
progress(i / len(boxes), desc=f"Patches {i}/{len(boxes)}")
chunk = boxes[i:i + BATCH]
with ThreadPoolExecutor(max_workers=FETCH_WORKERS) as ex:
imgs = list(ex.map(lambda b: fetch_ign(*b, CLF_PX), chunk))
probs = run_clf_batch(clf, imgs)
positives = [(im, b) for im, b, p in zip(imgs, chunk, probs) if p > CLF_THR]
del imgs # flush negatives immediately
if positives:
masks = run_seg_batch(seg, [im for im, _ in positives])
for mask, (_, b) in zip(masks, positives):
features += mask_to_features(mask, *b)
except Exception as e:
return error_stats(f"Detection failed: {e}"), gr.update(visible=False), "", gr.update()
progress(1.0, desc="Extracting characteristics")
features = merge_features(features)
chars = extract_chars(features)
for feat, c in zip(features, chars):
feat["properties"].update(c)
fc = gj.FeatureCollection(features)
geojson_path = os.path.join(tempfile.gettempdir(), "detections.geojson")
fc_str = json.dumps(fc, indent=2)
with open(geojson_path, "w") as f:
f.write(fc_str)
return (build_stats_html(chars), gr.update(value=geojson_path, visible=True),
fc_str, gr.update(visible=True))
# ── Geocoding (IGN Géoplateforme) ─────────────────────────────────────────────
def geocode(q):
q = (q or "").strip()
if not q:
return gr.update(), gr.update(), gr.update()
try:
r = requests.get("https://data.geopf.fr/geocodage/search",
params={"q": q, "limit": 1}, timeout=10)
r.raise_for_status()
feats = r.json().get("features", [])
if not feats:
gr.Warning(f"No result for '{q}'.")
return gr.update(), gr.update(), gr.update()
lon, lat = feats[0]["geometry"]["coordinates"]
center = json.dumps({"lat": lat, "lon": lon, "ts": time.time()})
return f"{lat:.6f}", f"{lon:.6f}", center
except Exception as e:
gr.Warning(f"Search failed: {e}")
return gr.update(), gr.update(), gr.update()
# ── Map ───────────────────────────────────────────────────────────────────────
def make_map_html():
"""Return iframe HTML with folium map encoded as data URL (no file I/O, no srcdoc encoding issues)."""
m = folium.Map(location=[46.5, 2.3], zoom_start=6, tiles=None, max_zoom=19)
# IGN ortho — Géoplateforme WMTS (wxs.ign.fr est décommissionné).
# NB: string normale, PAS un f-string ({z}/{y}/{x} sont remplacés par Leaflet),
# et attr est obligatoire sinon folium lève ValueError.
folium.TileLayer(
tiles=("https://data.geopf.fr/wmts?SERVICE=WMTS&REQUEST=GetTile&VERSION=1.0.0"
"&LAYER=ORTHOIMAGERY.ORTHOPHOTOS&STYLE=normal&TILEMATRIXSET=PM"
"&FORMAT=image/jpeg&TILEMATRIX={z}&TILEROW={y}&TILECOL={x}"),
attr="© IGN / Géoplateforme",
name="IGN Ortho",
max_zoom=19,
max_native_zoom=19,
).add_to(m)
folium.TileLayer("OpenStreetMap", name="OSM", show=False).add_to(m)
folium.LayerControl(position="bottomright").add_to(m)
click_js = """
(function attach() {
var _map = window["__MAP__"];
if (!_map) { setTimeout(attach, 50); return; }
_map.zoomControl.setPosition('bottomright');
var _rect = null, _cov = 500, _lat = null, _lon = null, _mode = 'select';
function drawRect() {
if (_lat === null) return;
if (_rect) { _map.removeLayer(_rect); }
var dLat = _cov/111320, dLon = _cov/(111320*Math.cos(_lat*Math.PI/180));
_rect = L.rectangle(
[[_lat-dLat/2,_lon-dLon/2],[_lat+dLat/2,_lon+dLon/2]],
{color:'#3b82f6',weight:2,fillColor:'#3b82f6',fillOpacity:0.08,dashArray:'6,4'}
).addTo(_map);
}
_map.on('click', function(e) {
if (_mode !== 'select') { return; }
_lat = e.latlng.lat; _lon = e.latlng.lng;
var msg = {type:'pvClick', lat:_lat, lon:_lon};
var w = window;
do { w = w.parent; w.postMessage(msg, '*'); } while (w !== w.parent);
drawRect();
});
var _detLayer = null;
window.addEventListener('message', function(e) {
if (!e.data) return;
if (e.data.type === 'pvCoverage') {
_cov = Number(e.data.coverage) || _cov;
drawRect();
}
if (e.data.type === 'pvCenter') {
if (e.data.cov) { _cov = Number(e.data.cov); }
_map.setView([e.data.lat, e.data.lon], 16);
if (_mode === 'select') { _lat = e.data.lat; _lon = e.data.lon; drawRect(); }
}
if (e.data.type === 'pvReset') {
if (_detLayer) { _map.removeLayer(_detLayer); _detLayer = null; }
_mode = 'select';
}
if (e.data.type === 'pvGeojson') {
console.log('[pv-map] geojson received');
_mode = 'view';
if (_rect) { _map.removeLayer(_rect); _rect = null; _lat = null; }
if (_detLayer) { _map.removeLayer(_detLayer); }
_detLayer = L.geoJSON(e.data.fc, {
style: {color:'#dc2626', weight:2, fillColor:'#dc2626', fillOpacity:0.35},
onEachFeature: function(f, layer) {
var p = f.properties || {};
if (p.surface) layer.bindPopup(
'Surface: ' + p.surface.toFixed(0) + ' m²
' +
'Capacity: ' + p.capacity.toFixed(1) + ' kWp
' +
'Azimuth: ' + Math.round(p.azimuth) + '°');
}
}).addTo(_map);
var b = _detLayer.getBounds();
if (b.isValid()) { _map.fitBounds(b, {maxZoom: 18}); }
}
});
})();
""".replace("__MAP__", m.get_name())
m.get_root().script.add_child(folium.Element(click_js))
raw_html = m._repr_html_()
page = ('
Map rooftop solar installations anywhere in France, from aerial imagery.
1. Search a city or click the map to select a zone (up to 1 km²).
2. Press Detect — deep learning models detect and segment PV panels on IGN orthophotos (CPU inference, ~1 min for 1 km²).
3. Click the detected polygons to see surface, capacity and azimuth — and download everything as GeoJSON.
Characteristics estimated with pypvroof (no DEM). To map a large area, we recommend running DeepPVMapper directly from the GitHub repository. Best viewed on a desktop browser.