RoofSegmentation2 / google_solar.py
Deagin's picture
Rewrite: C-RADIOv4-H + RANSAC fusion pipeline
5c52fb9
"""Google Solar API data acquisition.
Handles geocoding, GeoTIFF download (RGB, building mask, DSM),
and parsing with CRS normalization to EPSG:4326.
"""
import io
import numpy as np
import requests
import rasterio
from rasterio.crs import CRS
from PIL import Image
def geocode_address(
    address: str, api_key: str, *, timeout: float = 10.0
) -> tuple[float, float, str]:
    """Convert an address to latitude/longitude using the Google Geocoding API.

    Args:
        address: Free-form address string to geocode.
        api_key: Google API key, sent as the ``key`` query parameter.
        timeout: Seconds to wait on the HTTP request. Without a timeout
            ``requests`` waits indefinitely on a stalled connection.

    Returns:
        ``(lat, lng, formatted_address)`` taken from the first result.

    Raises:
        ValueError: If the response status is not ``"OK"`` or the response
            contains no results.
    """
    url = "https://maps.googleapis.com/maps/api/geocode/json"
    params = {"address": address, "key": api_key}
    response = requests.get(url, params=params, timeout=timeout)
    data = response.json()

    status = data.get("status")
    results = data.get("results") or []
    if status != "OK" or not results:
        # Surface the API's own explanation (when it sends one) so that
        # failures such as REQUEST_DENIED are diagnosable from the message.
        detail = data.get("error_message")
        suffix = f" - {detail}" if detail else ""
        raise ValueError(f"Geocoding failed: {status}{suffix}")

    first = results[0]
    location = first["geometry"]["location"]
    return location["lat"], location["lng"], first["formatted_address"]
def _download_layer(url: str, api_key: str, timeout: float):
    """GET one layer URL, adding the API key as a ``key`` query parameter."""
    # Passing the key via ``params`` lets requests pick the right separator
    # ("?" or "&") instead of assuming the URL already has a query string.
    return requests.get(url, params={"key": api_key}, timeout=timeout)


def _download_optional_layer(
    url: str | None, api_key: str, timeout: float
) -> bytes | None:
    """Return the layer's bytes, or None if the URL is absent or the GET is not 200."""
    if not url:
        return None
    response = _download_layer(url, api_key, timeout)
    return response.content if response.status_code == 200 else None


def fetch_geotiff(
    lat: float,
    lng: float,
    api_key: str,
    radius_meters: int = 50,
    *,
    timeout: float = 60.0,
) -> tuple[bytes, bytes | None, bytes | None, dict]:
    """Fetch RGB GeoTIFF, building mask, and DSM from the Google Solar API.

    Requests the data layers at ``pixelSizeMeters=0.1`` with
    ``requiredQuality=HIGH``; if that request does not return HTTP 200 it is
    retried once with ``requiredQuality=MEDIUM``.

    Args:
        lat: Latitude of the query location.
        lng: Longitude of the query location.
        api_key: Google API key, sent as the ``key`` query parameter.
        radius_meters: Value sent as ``radiusMeters``.
        timeout: Seconds to wait on each individual HTTP request.

    Returns:
        ``(rgb_bytes, mask_bytes, dsm_bytes, layers_info)``. ``mask_bytes``
        and ``dsm_bytes`` are ``None`` when the corresponding URL is missing
        from the response or its download does not return HTTP 200.
        ``layers_info`` is the decoded JSON of the data-layers response.

    Raises:
        ValueError: If the data-layers request fails at both quality levels,
            if the response has no ``rgbUrl``, or if the RGB download does
            not return HTTP 200.
    """
    layers_url = "https://solar.googleapis.com/v1/dataLayers:get"
    params = {
        "location.latitude": lat,
        "location.longitude": lng,
        "radiusMeters": radius_meters,
        "view": "FULL_LAYERS",
        "requiredQuality": "HIGH",
        "pixelSizeMeters": 0.1,
        "key": api_key,
    }

    response = requests.get(layers_url, params=params, timeout=timeout)
    if response.status_code != 200:
        # HIGH quality was not served; retry once at MEDIUM.
        params["requiredQuality"] = "MEDIUM"
        response = requests.get(layers_url, params=params, timeout=timeout)
    if response.status_code != 200:
        raise ValueError(
            f"Data Layers API error: {response.status_code} - {response.text}"
        )
    layers = response.json()

    # RGB imagery is mandatory: fail loudly if it cannot be obtained.
    rgb_url = layers.get("rgbUrl")
    if not rgb_url:
        raise ValueError("No RGB imagery available for this location")
    rgb_response = _download_layer(rgb_url, api_key, timeout)
    if rgb_response.status_code != 200:
        raise ValueError(f"Failed to download RGB GeoTIFF: {rgb_response.status_code}")

    # Building mask and DSM are best-effort: a missing layer yields None.
    mask_bytes = _download_optional_layer(layers.get("maskUrl"), api_key, timeout)
    dsm_bytes = _download_optional_layer(layers.get("dsmUrl"), api_key, timeout)

    return rgb_response.content, mask_bytes, dsm_bytes, layers
def _reproject_bounds(crs, bounds):
"""Reproject bounds to EPSG:4326 if needed."""
if crs and crs != CRS.from_epsg(4326):
from rasterio.warp import transform_bounds
bounds = transform_bounds(crs, CRS.from_epsg(4326), *bounds)
return bounds
def parse_geotiff(geotiff_bytes: bytes) -> tuple[Image.Image, tuple]:
    """Decode an RGB GeoTIFF held in memory.

    Bands 1-3 are stacked as the last axis of the image array. When the
    raster has fewer than three bands, band 1 is repeated three times
    instead. The array is cast to ``uint8`` before being wrapped in a PIL
    image.

    Returns:
        ``(image, bounds)`` where ``bounds`` is the raster's bounds after
        being passed through ``_reproject_bounds`` (EPSG:4326).
    """
    with rasterio.open(io.BytesIO(geotiff_bytes)) as dataset:
        if dataset.count < 3:
            # Not enough bands for RGB: replicate band 1 across channels.
            gray = dataset.read(1)
            channels = [gray, gray, gray]
        else:
            channels = [dataset.read(index) for index in (1, 2, 3)]
        pixels = np.stack(channels, axis=-1)
        wgs84_bounds = _reproject_bounds(dataset.crs, dataset.bounds)

    rgb_image = Image.fromarray(pixels.astype(np.uint8))
    return rgb_image, wgs84_bounds
def parse_building_mask(mask_bytes: bytes | None) -> tuple[np.ndarray | None, tuple | None]:
"""Parse building mask GeoTIFF. Binary: 1=building, 0=not."""
if not mask_bytes:
return None, None
with rasterio.open(io.BytesIO(mask_bytes)) as src:
mask = src.read(1)
bounds = _reproject_bounds(src.crs, src.bounds)
return mask, bounds
def parse_dsm(dsm_bytes: bytes | None) -> tuple[np.ndarray | None, tuple | None]:
"""Parse DSM GeoTIFF. Returns float height array in meters."""
if not dsm_bytes:
return None, None
with rasterio.open(io.BytesIO(dsm_bytes)) as src:
dsm = src.read(1)
bounds = _reproject_bounds(src.crs, src.bounds)
return dsm, bounds