solarseg / image_processing.py
0x1ay's picture
Sync all changes: demo dataset, quality tiers, confidence scoring, zoom fix, import fix
2d62fcb
"""
Image processing and analysis utilities.
"""
import io
import math
import numpy as np
import requests
from PIL import Image, ImageDraw
from typing import Optional, Tuple
from config import ValidationConfig, EARTH_RADIUS_M, WEB_MERCATOR_SIZE
# Try to import OpenCV
try:
import cv2
CV2_AVAILABLE = True
CV2_ERROR = None
except Exception as e:
CV2_AVAILABLE = False
cv2 = None
CV2_ERROR = str(e)
# Try to import skimage
try:
from skimage import measure, morphology, filters, color
SKIMAGE_AVAILABLE = True
except Exception:
SKIMAGE_AVAILABLE = False
measure = morphology = filters = color = None
def meters_per_pixel(latitude_deg: float, zoom: int) -> float:
    """Return the ground resolution (meters per pixel) at a latitude and zoom.

    Args:
        latitude_deg: Latitude in degrees.
        zoom: Web-Mercator zoom level.

    Returns:
        Meters covered by one pixel: the equatorial resolution at this zoom
        scaled by the cosine of the latitude.
    """
    # Length of the equator divided by the world width in pixels at this zoom.
    equator_length_m = 2 * math.pi * EARTH_RADIUS_M
    world_width_px = WEB_MERCATOR_SIZE * (2 ** zoom)
    return equator_length_m / world_width_px * math.cos(math.radians(latitude_deg))
def to_bytes(img: Image.Image, fmt="PNG") -> bytes:
    """Serialize a PIL image into an in-memory byte string.

    Args:
        img: Image to encode.
        fmt: Format name handed to ``Image.save`` (default ``"PNG"``).

    Returns:
        The encoded image as ``bytes``.
    """
    with io.BytesIO() as stream:
        img.save(stream, format=fmt)
        return stream.getvalue()
def zoom_for_target_mpp(lat_deg: float, target_mpp: float) -> int:
    """Find the lowest zoom whose resolution is at least as fine as the target.

    Args:
        lat_deg: Latitude in degrees.
        target_mpp: Desired meters per pixel.

    Returns:
        The first zoom in 1..21 whose meters-per-pixel is <= ``target_mpp``;
        21 (the maximum) when no level in that range is fine enough.
    """
    candidate_zooms = range(1, 22)
    return next(
        (z for z in candidate_zooms if meters_per_pixel(lat_deg, z) <= target_mpp),
        21,  # Max zoom
    )
def calculate_adaptive_zoom(capacity_kwp: float, lat_deg: float = 0.0) -> int:
    """
    Pick a zoom level from the expected site capacity.

    Smaller sites get a higher zoom (more detail); the result never drops
    below 18 because wider views are too coarse for reliable panel detection.
    Sites above 1000 kWp also use zoom 18 and rely on multi-tile expansion.

    Args:
        capacity_kwp: Site capacity in kWp
        lat_deg: Latitude (accepted for API compatibility; not used by the
            current tiering)

    Returns:
        int: Recommended zoom level (18-21)

    Capacity tiers (upper bounds are inclusive):
        - capacity <= 0:      19 (default for unknown capacity)
        - up to 60 kWp:       21 (~0.06 m/px, residential)
        - up to 150 kWp:      20 (~0.12 m/px, small commercial)
        - up to 350 kWp:      19 (~0.23 m/px, medium commercial)
        - above 350 kWp:      18 (~0.46 m/px, multi-tile stitching/expansion)
    """
    if capacity_kwp <= 0:
        return 19  # Unknown capacity: fall back to the default zoom

    # (inclusive upper capacity bound in kWp, zoom level), smallest tier first
    capacity_tiers = (
        (60, 21),
        (150, 20),
        (350, 19),
    )
    for upper_bound_kwp, zoom in capacity_tiers:
        if capacity_kwp <= upper_bound_kwp:
            return zoom

    # Everything larger, including sites above 1000 kWp, is handled at zoom 18.
    return 18
# Zoom refinement constants
MIN_DETECTION_PIXELS = 1000  # Minimum detected pixels for valid detection
EDGE_CONTACT_THRESHOLD = 0.05  # 5% of edge must have mask to count as "touching"
MIN_ZOOM = 18  # Minimum zoom level (zooms below 18 don't detect panels reliably)
MAX_ZOOM = 21  # Maximum zoom level (highest detail)


def analyze_detection_for_zoom(mask: np.ndarray) -> dict:
    """
    Analyze detection mask to determine if zoom adjustment is needed.

    Pixels with a value > 0 count as detected. An image edge is "touched"
    when the number of detected pixels on that border row/column exceeds
    EDGE_CONTACT_THRESHOLD of the border length.

    Args:
        mask: 2-D detection mask (height, width)

    Returns:
        Dictionary with analysis results (plain Python types, so the result
        can be serialized directly):
            - detected_pixels: Number of detected pixels (int)
            - edges_touched: List of touched edges, drawn in order from
              'north', 'south', 'west', 'east'
            - num_edges_touched: Count of edges touched (int)
            - needs_zoom_increase: True if zoom should increase (some, but
              fewer than MIN_DETECTION_PIXELS, pixels detected)
            - needs_zoom_decrease: True if zoom should decrease (mask touches
              two or more edges)

    Raises:
        ValueError: If the mask is not 2-dimensional.
    """
    mask = np.asarray(mask)
    if mask.ndim != 2:
        raise ValueError(
            f"mask must be 2-D (height, width), got shape {mask.shape}"
        )

    foreground = mask > 0
    height, width = foreground.shape
    # Cast to int so the result holds native Python types, not np.int64.
    detected_pixels = int(np.count_nonzero(foreground))

    edges_touched = []
    # A zero-sized mask has no border rows/columns to index.
    if height > 0 and width > 0:
        # Minimum number of detected border pixels (exclusive) per direction
        min_hits_horizontal = int(width * EDGE_CONTACT_THRESHOLD)
        min_hits_vertical = int(height * EDGE_CONTACT_THRESHOLD)
        border_checks = (
            ('north', foreground[0, :], min_hits_horizontal),
            ('south', foreground[-1, :], min_hits_horizontal),
            ('west', foreground[:, 0], min_hits_vertical),
            ('east', foreground[:, -1], min_hits_vertical),
        )
        edges_touched = [
            edge_name
            for edge_name, border, min_hits in border_checks
            if np.count_nonzero(border) > min_hits
        ]

    num_edges_touched = len(edges_touched)

    # Too few (but some) pixels -> more detail needed; panels touching two or
    # more edges -> the view is too tight.
    needs_zoom_increase = 0 < detected_pixels < MIN_DETECTION_PIXELS
    needs_zoom_decrease = num_edges_touched >= 2

    return {
        'detected_pixels': detected_pixels,
        'edges_touched': edges_touched,
        'num_edges_touched': num_edges_touched,
        'needs_zoom_increase': needs_zoom_increase,
        'needs_zoom_decrease': needs_zoom_decrease
    }
def refine_zoom_level(current_zoom: int, analysis: dict) -> Tuple[int, str]:
    """
    Turn a detection analysis into the next zoom level to try.

    Zooming out takes priority over zooming in, and the result always stays
    within MIN_ZOOM..MAX_ZOOM.

    Args:
        current_zoom: Current zoom level
        analysis: Result from analyze_detection_for_zoom

    Returns:
        Tuple of (new_zoom_level, reason_string)
    """
    # Panels spill over the frame and there is still room to zoom out.
    if analysis['needs_zoom_decrease'] and current_zoom > MIN_ZOOM:
        edge_count = analysis['num_edges_touched']
        edge_names = ', '.join(analysis['edges_touched'])
        return (
            current_zoom - 1,
            f"Panels touch {edge_count} edges ({edge_names}), zooming out",
        )

    # Detection is too small and there is still room to zoom in.
    if analysis['needs_zoom_increase'] and current_zoom < MAX_ZOOM:
        pixel_count = analysis['detected_pixels']
        return (
            current_zoom + 1,
            f"Only {pixel_count} pixels detected (< {MIN_DETECTION_PIXELS}), zooming in",
        )

    # Nothing to change, or already at a zoom limit.
    return current_zoom, "Zoom level optimal"
def fetch_static_map(center_lat: float, center_lon: float, zoom: int, size: tuple,
                     maptype: str = "satellite", api_key: Optional[str] = None) -> Image.Image:
    """Fetch a map image from the Google Static Maps API.

    Args:
        center_lat: Latitude of the image center.
        center_lon: Longitude of the image center.
        zoom: Zoom level to request.
        size: (width, height) of the requested image in pixels.
        maptype: Map type passed to the API (default "satellite").
        api_key: Google Maps API key; required.

    Returns:
        The downloaded image converted to RGB.

    Raises:
        ValueError: If no API key is supplied.
        requests.HTTPError: If the API answers with an error status.
    """
    if not api_key:
        raise ValueError("API key is required")

    # Let requests build the query string so every value is URL-encoded;
    # hand-concatenated values containing '&' or '#' would corrupt the request.
    query = {
        "center": f"{center_lat},{center_lon}",
        "zoom": zoom,
        "size": f"{size[0]}x{size[1]}",
        "maptype": maptype,
        "key": api_key,
    }
    response = requests.get(
        "https://maps.googleapis.com/maps/api/staticmap",
        params=query,
        timeout=30,
    )
    response.raise_for_status()
    return Image.open(io.BytesIO(response.content)).convert("RGB")
def fetch_static_google_satellite(lat: float, lon: float, cfg: ValidationConfig) -> Image.Image:
    """Fetch a square map image for a location using settings from ``cfg``.

    Args:
        lat: Latitude of the image center.
        lon: Longitude of the image center.
        cfg: Supplies ``image_size_px``, ``zoom`` and ``maptype``.

    Returns:
        The image returned by ``fetch_static_map``.

    Raises:
        ValueError: If ``get_api_key()`` yields no key.
    """
    # Resolved at call time rather than at module import.
    from config import get_api_key

    key = get_api_key()
    if key:
        side_px = cfg.image_size_px
        return fetch_static_map(lat, lon, cfg.zoom, (side_px, side_px), cfg.maptype, key)
    raise ValueError("Google Maps API key not found. Please set GOOGLE_MAPS_API_KEY in your .env file.")
def calculate_image_parameters(capacity_kw: float, effective_px: int,
                               panel_density: float = 0.18) -> dict:
    """Derive the ground resolution needed to fit a site of a given capacity.

    Args:
        capacity_kw: Expected capacity in kW
        effective_px: Side length, in pixels, of the usable (square) image area
        panel_density: Power density in kWp/m² (default: 0.18)

    Returns:
        Dictionary with:
            - required_area_m2: capacity divided by the power density
            - mpp_needed: meters per pixel at which that area fills
              ``effective_px`` x ``effective_px`` pixels
            - total_pixels: ``effective_px`` squared
            - panel_density_kwp_m2: the density that was used
    """
    footprint_m2 = capacity_kw / panel_density
    pixel_budget = effective_px * effective_px
    return {
        'required_area_m2': footprint_m2,
        # Area per pixel is footprint / pixels; its square root is the pixel edge length.
        'mpp_needed': math.sqrt(footprint_m2 / pixel_budget),
        'total_pixels': pixel_budget,
        'panel_density_kwp_m2': panel_density,
    }
def preprocess_image(img: Image.Image, target_size: int) -> Image.Image:
    """Resize an image to a ``target_size`` x ``target_size`` square.

    The image is stretched to the square with Lanczos resampling; the
    original aspect ratio is not preserved.

    Args:
        img: Image to resize.
        target_size: Width and height of the output in pixels.

    Returns:
        The resized image.
    """
    square = (target_size, target_size)
    return img.resize(square, Image.LANCZOS)
def centroid_from_mask(mask: np.ndarray) -> Optional[Tuple[float, float]]:
    """Calculate the centroid of the positive pixels of a 2-D mask.

    Args:
        mask: 2-D mask; pixels with a value > 0 are treated as foreground.

    Returns:
        ``(x, y)`` — mean column index and mean row index of the foreground
        pixels — or ``None`` when the mask has no foreground pixel.
    """
    # Decide emptiness with the same `> 0` test that selects the pixels.
    # Summing raw values instead misreports masks holding negative entries
    # (NaN centroid, or a false "empty").
    rows, cols = np.nonzero(np.asarray(mask) > 0)
    if cols.size == 0:
        return None
    return (float(cols.mean()), float(rows.mean()))
def area_from_mask(mask: np.ndarray, m_per_px: float) -> float:
    """Convert the foreground of a mask into an area in square meters.

    Args:
        mask: Mask whose values > 0 mark foreground pixels.
        m_per_px: Ground resolution in meters per pixel.

    Returns:
        Number of foreground pixels times the area of one pixel.
    """
    foreground_count = (mask > 0).sum()
    square_meters_per_pixel = m_per_px ** 2
    return foreground_count * square_meters_per_pixel
def shading_fraction(rgb: np.ndarray, mask: np.ndarray) -> float:
    """Estimate how much of the masked region is heavily shaded.

    A masked pixel counts as shaded when the mean of its channel values is
    below 50.

    Args:
        rgb: Image array indexed by ``mask``; assumed to be (H, W, channels)
            — TODO confirm against callers.
        mask: Mask whose values > 0 select the pixels to evaluate.

    Returns:
        Fraction of selected pixels that are shaded, or 0.0 when nothing is
        selected.
    """
    if mask.sum() == 0:
        return 0.0

    # One row per selected pixel, one column per channel.
    selected = rgb[mask > 0]
    if len(selected) == 0:
        return 0.0

    # Per-pixel brightness is the unweighted channel mean.
    luminance = selected.mean(axis=1)
    dark_count = (luminance < 50).sum()
    return dark_count / len(luminance)
def draw_overlay(img: Image.Image, mask: np.ndarray, pin_px: Tuple[int, int],
                 centroid_px: Optional[Tuple[float, float]]) -> Image.Image:
    """Render the detection mask, the pin and the centroid onto a copy of ``img``.

    Args:
        img: Base image.
        mask: Detection mask; values > 0 are tinted green. Presumably the
            same pixel dimensions as ``img`` — verify against callers.
        pin_px: (x, y) pixel position of the pin, drawn as a red disc.
        centroid_px: (x, y) pixel position of the centroid, drawn as a blue
            disc joined to the pin by a yellow line; skipped when falsy.

    Returns:
        A new RGB image with the overlay drawn on it.
    """
    def _disc_bbox(x, y, radius):
        # Bounding box of a circle centered on (x, y)
        return [(x - radius, y - radius), (x + radius, y + radius)]

    # Work in RGBA so the tint can be alpha-blended
    canvas = img.convert('RGBA')

    if mask.sum() > 0:  # Skip the tint entirely for an empty mask
        # Binary stencil: 255 where the mask is set, 0 elsewhere
        stencil = Image.fromarray(((mask > 0) * 255).astype(np.uint8), mode='L')
        # Semi-transparent green (alpha 120 of 255)
        tint = Image.new('RGBA', img.size, (0, 255, 0, 120))
        # Transparent layer that receives the tint only under the stencil
        tinted_layer = Image.new('RGBA', img.size, (0, 0, 0, 0))
        tinted_layer.paste(tint, mask=stencil)
        canvas = Image.alpha_composite(canvas, tinted_layer)

    # Markers are drawn on an RGB copy
    result = canvas.convert('RGB')
    pen = ImageDraw.Draw(result)

    # Pin marker
    pin_x, pin_y = pin_px[0], pin_px[1]
    pen.ellipse(_disc_bbox(pin_x, pin_y, 8), fill='red', outline='darkred', width=3)

    # Centroid marker plus connector to the pin
    if centroid_px:
        cx, cy = int(centroid_px[0]), int(centroid_px[1])
        pen.ellipse(_disc_bbox(cx, cy, 6), fill='blue', outline='white', width=2)
        pen.line([(pin_x, pin_y), (cx, cy)], fill='yellow', width=4)

    return result
def sanitize_name(name: str) -> str:
    """Make a value safe to embed in a filename.

    The value is converted with ``str()`` and each of the characters
    ``< > : " / \\ | ? *`` is replaced by an underscore.
    """
    forbidden = '<>:"/\\|?*'
    replacements = str.maketrans(dict.fromkeys(forbidden, '_'))
    return str(name).translate(replacements)
def get_image_processing_status():
    """Report which optional image libraries were importable.

    Returns:
        Dictionary with ``cv2_available``, ``cv2_error`` (the message captured
        when importing OpenCV failed, otherwise ``None``) and
        ``skimage_available``.
    """
    status = dict(cv2_available=CV2_AVAILABLE, cv2_error=CV2_ERROR)
    status['skimage_available'] = SKIMAGE_AVAILABLE
    return status