solarseg / validation.py
0x1ay's picture
feat: regional priors from thesis + confidence breakdown in UI
45299d1
"""
Core validation logic for solar panel analysis.
"""
import math
import numpy as np
import pandas as pd
from PIL import Image
from typing import Optional, Dict, Any, List, Tuple
from config import ValidationConfig, ValidationOutputs, get_api_key
from image_processing import (
fetch_static_google_satellite, meters_per_pixel, centroid_from_mask,
area_from_mask, shading_fraction, draw_overlay,
analyze_detection_for_zoom, refine_zoom_level, MIN_ZOOM, MAX_ZOOM
)
from models import dual_model_detection, yolo_solar_detection, deeplab_solar_detection, simple_solar_mask
# Import efficiency-aware power density
try:
from efficiency_aware_density import get_power_density_for_site, DEFAULT_POWER_DENSITY
EFFICIENCY_MODEL_AVAILABLE = True
except ImportError:
EFFICIENCY_MODEL_AVAILABLE = False
DEFAULT_POWER_DENSITY = 0.18
# Adaptive expansion threshold - trigger when estimated < 85% of actual
EXPANSION_CAPACITY_THRESHOLD = 0.85
# Zoom refinement settings
MAX_ZOOM_ITERATIONS = 3 # Maximum zoom adjustment iterations
def process_with_zoom_refinement(lat: float, lon: float, initial_zoom: int,
yolo_model, cfg: ValidationConfig,
max_iterations: int = MAX_ZOOM_ITERATIONS) -> Dict[str, Any]:
"""
Process a site with iterative zoom refinement.
This function automatically adjusts zoom level based on detection results:
- Increases zoom if detected area < 1000 pixels (need more detail)
- Decreases zoom if panels touch >= 2 edges (installation too large)
Args:
lat: Latitude
lon: Longitude
initial_zoom: Starting zoom level
yolo_model: Loaded YOLO model
cfg: Validation configuration
max_iterations: Maximum refinement iterations
Returns:
Dictionary with:
- final_zoom: The refined zoom level
- raw_img: Final satellite image
- mask: Final detection mask
- rgb: Processed RGB array
- zoom_history: List of (zoom, reason) tuples
- iterations: Number of iterations performed
"""
from image_processing import fetch_static_map
from config import get_api_key
api_key = get_api_key()
current_zoom = initial_zoom
zoom_history = [(current_zoom, "Initial zoom level")]
final_result = None
for iteration in range(max_iterations):
# Create config with current zoom
iter_cfg = ValidationConfig(
zoom=current_zoom,
yolo_confidence=cfg.yolo_confidence,
panel_power_density_kwp_m2=cfg.panel_power_density_kwp_m2
)
# Fetch image at current zoom
try:
raw_img = fetch_static_map(
lat, lon, current_zoom,
(iter_cfg.image_size_px, iter_cfg.image_size_px),
iter_cfg.maptype, api_key
)
except Exception as e:
return {
'success': False,
'error': f"Failed to fetch image at zoom {current_zoom}: {str(e)}",
'final_zoom': current_zoom,
'zoom_history': zoom_history,
'iterations': iteration + 1
}
# Process image
proc_img = raw_img.resize((iter_cfg.model_input_size, iter_cfg.model_input_size), Image.LANCZOS)
rgb = np.array(proc_img)
# Run detection
mask = yolo_solar_detection(rgb, yolo_model, iter_cfg)
# Analyze detection for zoom adjustment
analysis = analyze_detection_for_zoom(mask)
new_zoom, reason = refine_zoom_level(current_zoom, analysis)
# Store current result
final_result = {
'success': True,
'final_zoom': current_zoom,
'raw_img': raw_img,
'mask': mask,
'rgb': rgb,
'proc_img': proc_img,
'zoom_history': zoom_history,
'iterations': iteration + 1,
'analysis': analysis,
'cfg': iter_cfg
}
# Check if zoom needs adjustment
if new_zoom == current_zoom:
# Zoom is optimal, stop iterating
zoom_history.append((current_zoom, reason))
break
else:
# Update zoom and continue
zoom_history.append((new_zoom, reason))
current_zoom = new_zoom
return final_result
def process_single_site(lat: float, lon: float, cfg: ValidationConfig, yolo_model,
deeplab_model, detection_mode: str = "YOLO Only",
site_info: Optional[Dict] = None, use_advanced_ml: bool = False,
tilt_angle: float = None, azimuth_angle: float = None,
enable_zoom_refinement: bool = True) -> Dict[str, Any]:
"""
Process a single site for solar panel validation.
Args:
lat, lon: Site coordinates
cfg: Validation configuration
yolo_model: Loaded YOLO model
deeplab_model: Loaded DeepLab model
detection_mode: Detection mode ("YOLO Only", "DeepLab Only", "Both Models (Dual)")
site_info: Optional site information from database
enable_zoom_refinement: Whether to use iterative zoom refinement
Returns:
Dictionary containing all processing results
"""
# Get location-specific power density using efficiency model
if EFFICIENCY_MODEL_AVAILABLE:
power_density = get_power_density_for_site(lat, lon)
else:
power_density = cfg.panel_power_density_kwp_m2
# Update config with location-specific power density
cfg = ValidationConfig(
zoom=cfg.zoom,
yolo_confidence=cfg.yolo_confidence,
panel_power_density_kwp_m2=power_density
)
try:
# Check if we should use zoom refinement
# Use refinement when: YOLO mode, no known capacity, and refinement enabled
actual_capacity = float(site_info.get('Capacity', 0)) if site_info else 0
use_zoom_refinement = (
enable_zoom_refinement and
detection_mode == "YOLO Only" and
yolo_model is not None and
actual_capacity <= 0 # No known capacity to guide zoom
)
zoom_refinement_result = None
if use_zoom_refinement:
# Use iterative zoom refinement
zoom_refinement_result = process_with_zoom_refinement(
lat, lon, cfg.zoom, yolo_model, cfg
)
if not zoom_refinement_result.get('success', False):
return zoom_refinement_result
# Use refined results
raw_img = zoom_refinement_result['raw_img']
proc_img = zoom_refinement_result['proc_img']
rgb = zoom_refinement_result['rgb']
mask = zoom_refinement_result['mask']
cfg = zoom_refinement_result['cfg'] # Updated config with refined zoom
# Calculate image parameters with refined zoom
mpp = meters_per_pixel(lat, cfg.zoom)
ground_span_m = cfg.image_size_px * mpp
pin_px = (cfg.image_size_px // 2, cfg.image_size_px // 2)
detection_results = None
else:
# 1. Fetch satellite image (standard path)
raw_img = fetch_static_google_satellite(lat, lon, cfg)
proc_img = raw_img.resize((cfg.model_input_size, cfg.model_input_size), Image.LANCZOS)
rgb = np.array(proc_img)
# Calculate image parameters
mpp = meters_per_pixel(lat, cfg.zoom)
ground_span_m = cfg.image_size_px * mpp
pin_px = (cfg.image_size_px // 2, cfg.image_size_px // 2)
# 2. Run detection based on mode
detection_results = None
if detection_mode == "Both Models (Dual)":
detection_results = dual_model_detection(rgb, yolo_model, deeplab_model, cfg)
mask = detection_results['combined_mask']
elif detection_mode == "YOLO Only" and yolo_model is not None:
mask = yolo_solar_detection(rgb, yolo_model, cfg)
elif detection_mode == "DeepLab Only" and deeplab_model is not None:
mask = deeplab_solar_detection(rgb, deeplab_model, cfg)
else:
# Fallback to simple detection
mask = simple_solar_mask(rgb)
# 2b. Extract detection confidence from same inference
from models import get_yolo_mean_confidence
if detection_mode == "YOLO Only" and yolo_model is not None:
yolo_mean_conf = get_yolo_mean_confidence(rgb, yolo_model, cfg)
else:
yolo_mean_conf = 0.0
# 3. Calculate metrics
centroid = centroid_from_mask(mask)
area_m2 = area_from_mask(mask, mpp)
capacity_kwp = area_m2 * cfg.panel_power_density_kwp_m2
capacity_kva = capacity_kwp / cfg.dc_ac_ratio
# Advanced ML Capacity Estimation (if enabled)
advanced_ml_result = None
if use_advanced_ml and tilt_angle is not None and azimuth_angle is not None:
try:
from models import get_advanced_capacity_estimation
advanced_ml_result = get_advanced_capacity_estimation(
proc_img, tilt=tilt_angle, azimuth=azimuth_angle
)
if advanced_ml_result.get('success'):
# Use advanced ML capacity if successful
capacity_kwp = advanced_ml_result.get('capacity_kw', capacity_kwp)
capacity_kva = capacity_kwp / cfg.dc_ac_ratio
except Exception as e:
advanced_ml_result = {
'success': False,
'error': f"Advanced ML estimation failed: {str(e)}"
}
# Calculate centroid offset
if centroid is not None:
cx, cy = centroid
dx_px = (cx - pin_px[0])
dy_px = (cy - pin_px[1])
offset_px = math.hypot(dx_px, dy_px)
offset_m = offset_px * mpp
else:
offset_m = float("inf")
centroid_ok = (offset_m <= cfg.centroid_tolerance_m)
# Calculate location warning based on 20% of sqrt(detected area)
# This is a dynamic threshold based on site size
location_threshold_m = 0.20 * math.sqrt(area_m2) if area_m2 > 0 else 0
location_warning = offset_m > location_threshold_m if area_m2 > 0 else False
shade_frac = shading_fraction(rgb, mask)
# 4. Assess detection quality
quality_result = validate_detection_quality(mask, cfg)
quality_score = quality_result['quality_score']
quality_issues = quality_result['quality_issues']
# Map quality score to category
if quality_score >= 0.8:
quality_assessment = "excellent"
elif quality_score >= 0.6:
quality_assessment = "good"
elif quality_score >= 0.3:
quality_assessment = "acceptable"
else:
quality_assessment = "poor"
# 5. Create overlay image
overlay = draw_overlay(proc_img, mask, pin_px, centroid)
# 6. Generate outputs
outputs = ValidationOutputs(
pin=(lat, lon),
zoom=cfg.zoom,
meters_per_pixel=mpp,
image_ground_span_m=ground_span_m,
detected_area_m2=area_m2,
capacity_kwp=capacity_kwp,
capacity_kva_est=capacity_kva,
centroid_offset_m=offset_m,
centroid_within_tolerance=centroid_ok,
shading_fraction=shade_frac,
notes="Processing completed successfully"
)
# Add site info if available
if site_info:
outputs.site_id = str(site_info.get('Id', ''))
outputs.developer = str(site_info.get('Developer', ''))
outputs.site_name = str(site_info.get('SiteName', ''))
outputs.cod = str(site_info.get('COD', ''))
outputs.country = str(site_info.get('Country', ''))
outputs.technology_type = str(site_info.get('Technology Type', ''))
outputs.actual_capacity = float(site_info.get('Capacity', 0))
# 7. Check if adaptive expansion is needed
expansion_result = None
actual_capacity = outputs.actual_capacity if outputs.actual_capacity else 0
if actual_capacity > 0 and detection_mode == "YOLO Only" and yolo_model is not None:
capacity_ratio = capacity_kwp / actual_capacity
if capacity_ratio < EXPANSION_CAPACITY_THRESHOLD:
# Trigger adaptive expansion
try:
from adaptive_expansion import run_adaptive_expansion, should_trigger_expansion
api_key = get_api_key()
if api_key and should_trigger_expansion(capacity_kwp, actual_capacity, EXPANSION_CAPACITY_THRESHOLD):
# Build initial result dict for expansion
initial_result = {
'success': True,
'raw_img': raw_img,
'mask': mask
}
expansion_result = run_adaptive_expansion(
lat, lon, actual_capacity,
cfg, yolo_model, api_key,
initial_result
)
# If expansion was successful, update outputs with expanded results
if expansion_result.success and expansion_result.estimated_capacity_kwp > capacity_kwp:
# Update capacity and area with expanded values
outputs.detected_area_m2 = expansion_result.total_area_m2
outputs.capacity_kwp = expansion_result.estimated_capacity_kwp
outputs.capacity_kva_est = expansion_result.estimated_capacity_kwp / cfg.dc_ac_ratio
outputs.notes = f"Expanded detection: {expansion_result.num_tiles} tiles, {expansion_result.iterations} iterations"
# Update overlay with stitched version
if expansion_result.stitched_overlay:
overlay = expansion_result.stitched_overlay
# Update mask with combined mask
if expansion_result.combined_mask is not None:
mask = expansion_result.combined_mask
# Recalculate quality
quality_result = validate_detection_quality(mask, cfg)
quality_score = quality_result['quality_score']
quality_issues = quality_result['quality_issues']
if quality_score >= 0.8:
quality_assessment = "excellent"
elif quality_score >= 0.6:
quality_assessment = "good"
elif quality_score >= 0.3:
quality_assessment = "acceptable"
else:
quality_assessment = "poor"
# Update capacity values
capacity_kwp = expansion_result.estimated_capacity_kwp
area_m2 = expansion_result.total_area_m2
except Exception as e:
# Log but don't fail - expansion is optional enhancement
print(f"Adaptive expansion failed: {e}")
expansion_result = None
return {
'success': True,
'raw_img': raw_img,
'rgb': rgb,
'overlay': overlay,
'outputs': outputs,
'quality_score': quality_score,
'quality_assessment': quality_assessment,
'quality_issues': quality_issues,
'cfg': cfg,
'detection_results': detection_results,
'detection_mode': detection_mode,
'site_info': site_info,
'mpp': mpp,
'ground_span_m': ground_span_m,
'mask': mask,
'centroid': centroid,
'shade_frac': shade_frac,
'lat': lat,
'lon': lon,
'advanced_ml_result': advanced_ml_result,
'use_advanced_ml': use_advanced_ml,
'tilt_angle': tilt_angle,
'azimuth_angle': azimuth_angle,
'expansion_result': expansion_result,
'expansion_triggered': expansion_result is not None,
'zoom_refinement_result': zoom_refinement_result,
'zoom_refined': zoom_refinement_result is not None,
'power_density_kwp_m2': cfg.panel_power_density_kwp_m2,
'location_warning': location_warning,
'location_threshold_m': location_threshold_m,
'centroid_offset_m': offset_m,
'yolo_mean_conf': yolo_mean_conf
}
except Exception as e:
return {
'success': False,
'error': str(e),
'lat': lat,
'lon': lon
}
def generate_txt_report(outputs: ValidationOutputs, cfg: ValidationConfig, lat: float, lon: float) -> str:
"""Generate text report for validation results"""
report_lines = [
"=== SOLAR SITE VALIDATION REPORT ===",
f"Generated: {pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S')}",
"",
"SITE INFORMATION:",
f" Coordinates: {lat:.6f}, {lon:.6f}",
f" Site ID: {outputs.site_id}",
f" Site Name: {outputs.site_name}",
f" Developer: {outputs.developer}",
f" Country: {outputs.country}",
f" Technology: {outputs.technology_type}",
f" COD: {outputs.cod}",
"",
"IMAGE PARAMETERS:",
f" Provider: {cfg.provider}",
f" Zoom Level: {cfg.zoom}",
f" Image Size: {cfg.image_size_px}x{cfg.image_size_px} pixels",
f" Ground Resolution: {outputs.meters_per_pixel:.3f} m/pixel",
f" Ground Span: {outputs.image_ground_span_m:.0f} m",
"",
"DETECTION RESULTS:",
f" Detected Area: {outputs.detected_area_m2:,.0f} m²",
f" Estimated Capacity: {outputs.capacity_kwp:,.1f} kWp",
f" Estimated AC Capacity: {outputs.capacity_kva_est:,.1f} kVA",
f" Panel Density Used: {cfg.panel_power_density_kwp_m2:.3f} kWp/m²",
f" DC/AC Ratio: {cfg.dc_ac_ratio:.2f}",
"",
"VALIDATION METRICS:",
f" Area Detection: {'PASS' if outputs.detected_area_m2 > 0 else 'FAIL'}",
f" Processing Status: COMPLETE",
"",
"CAPACITY COMPARISON:",
]
if outputs.actual_capacity > 0:
diff_kwp = outputs.capacity_kwp - outputs.actual_capacity
diff_pct = (diff_kwp / outputs.actual_capacity) * 100
report_lines.extend([
f" Actual Capacity: {outputs.actual_capacity:,.1f} kWp",
f" Detected Capacity: {outputs.capacity_kwp:,.1f} kWp",
f" Difference: {diff_kwp:+.1f} kWp ({diff_pct:+.1f}%)",
])
else:
report_lines.append(" No actual capacity data available for comparison")
report_lines.extend([
"",
"NOTES:",
f" {outputs.notes}",
"",
"=== END REPORT ==="
])
return "\n".join(report_lines)
def validate_detection_quality(mask: np.ndarray, cfg: ValidationConfig) -> Dict[str, Any]:
"""Validate the quality of detection results"""
# Basic statistics
total_pixels = mask.size
detected_pixels = np.sum(mask > 0)
detection_coverage = detected_pixels / total_pixels if total_pixels > 0 else 0
# Shape analysis
if detected_pixels > 0:
# Connected components analysis
try:
from skimage import measure, morphology
labeled = measure.label(mask > 0)
regions = measure.regionprops(labeled)
num_components = len(regions)
largest_component_area = max([r.area for r in regions]) if regions else 0
avg_component_area = np.mean([r.area for r in regions]) if regions else 0
# Calculate compactness (closer to 1 is more compact/square-like)
compactness_scores = []
for region in regions:
if region.area > 10: # Only consider significant regions
perimeter = region.perimeter
area = region.area
compactness = 4 * np.pi * area / (perimeter ** 2) if perimeter > 0 else 0
compactness_scores.append(compactness)
avg_compactness = np.mean(compactness_scores) if compactness_scores else 0
except Exception:
# Fallback if skimage not available
num_components = 1 if detected_pixels > 0 else 0
largest_component_area = detected_pixels
avg_component_area = detected_pixels
avg_compactness = 0.5
else:
num_components = 0
largest_component_area = 0
avg_component_area = 0
avg_compactness = 0
# Quality assessment
quality_score = 0
quality_issues = []
# Coverage check
if detection_coverage < 0.001: # Less than 0.1% coverage
quality_issues.append("Very low detection coverage")
elif detection_coverage > 0.5: # More than 50% coverage
quality_issues.append("Unusually high detection coverage")
else:
quality_score += 0.3
# Component count check
if num_components == 0:
quality_issues.append("No detection found")
elif num_components > 20:
quality_issues.append("Too many small components (may be noise)")
else:
quality_score += 0.3
# Compactness check
if avg_compactness > 0.3:
quality_score += 0.4
elif avg_compactness < 0.1:
quality_issues.append("Detected shapes are not compact (may be artifacts)")
return {
'quality_score': quality_score,
'quality_issues': quality_issues,
'detection_coverage': detection_coverage,
'num_components': num_components,
'largest_component_area': largest_component_area,
'avg_component_area': avg_component_area,
'avg_compactness': avg_compactness,
'total_pixels': total_pixels,
'detected_pixels': detected_pixels
}
def batch_process_sites(sites_data: list, cfg: ValidationConfig, yolo_model,
deeplab_model, detection_mode: str = "YOLO Only") -> list:
"""Process multiple sites in batch"""
results = []
for i, site in enumerate(sites_data):
try:
lat = float(site.get('Latitude', 0))
lon = float(site.get('Longitude', 0))
if lat == 0 and lon == 0:
continue
result = process_single_site(lat, lon, cfg, yolo_model, deeplab_model,
detection_mode, site)
results.append(result)
except Exception as e:
results.append({
'success': False,
'error': str(e),
'site_data': site,
'index': i
})
return results