""" Image processing and analysis utilities. """ import io import math import numpy as np import requests from PIL import Image, ImageDraw from typing import Optional, Tuple from config import ValidationConfig, EARTH_RADIUS_M, WEB_MERCATOR_SIZE # Try to import OpenCV try: import cv2 CV2_AVAILABLE = True CV2_ERROR = None except Exception as e: CV2_AVAILABLE = False cv2 = None CV2_ERROR = str(e) # Try to import skimage try: from skimage import measure, morphology, filters, color SKIMAGE_AVAILABLE = True except Exception: SKIMAGE_AVAILABLE = False measure = morphology = filters = color = None def meters_per_pixel(latitude_deg: float, zoom: int) -> float: """Calculate meters per pixel for given latitude and zoom level""" lat_rad = math.radians(latitude_deg) meters_per_pixel_equator = 2 * math.pi * EARTH_RADIUS_M / (WEB_MERCATOR_SIZE * (2 ** zoom)) return meters_per_pixel_equator * math.cos(lat_rad) def to_bytes(img: Image.Image, fmt="PNG") -> bytes: """Convert PIL Image to bytes""" buf = io.BytesIO() img.save(buf, format=fmt) return buf.getvalue() def zoom_for_target_mpp(lat_deg: float, target_mpp: float) -> int: """Calculate zoom level needed to achieve target meters per pixel""" for zoom in range(1, 22): mpp = meters_per_pixel(lat_deg, zoom) if mpp <= target_mpp: return zoom return 21 # Max zoom def calculate_adaptive_zoom(capacity_kwp: float, lat_deg: float = 0.0) -> int: """ Calculate adaptive zoom level based on site capacity. Optimized for YOLO model detection performance. Zoom levels below 18 are too wide for reliable detection. For large sites (>1000 kWp), use zoom 18 with multi-tile expansion. Args: capacity_kwp: Site capacity in kWp lat_deg: Latitude (optional, for more precise calculation) Returns: int: Recommended zoom level (18-21) Zoom level guidelines (based on model detection capability): - 21: < 60 kWp (~0.06 m/px, residential) - 20: 60-150 kWp (~0.12 m/px, small commercial) - 19: 151-350 kWp (~0.23 m/px, medium commercial) [default] - 18: 351-1000 kWp (~0.46 m/px, multi-tile stitching) - >1000 kWp: Use zoom 18 with edge detection for multi-tile expansion """ if capacity_kwp <= 0: return 19 # Default zoom for unknown capacity # Define capacity thresholds and corresponding zoom levels # Note: Zoom below 18 is too wide for model to detect panels reliably if capacity_kwp <= 60: return 21 elif capacity_kwp <= 150: return 20 elif capacity_kwp <= 350: return 19 elif capacity_kwp <= 1000: return 18 else: # For large sites (>1000 kWp), use zoom 18 with multi-tile expansion # Edge detection will trigger adaptive expansion automatically return 18 # Zoom refinement constants MIN_DETECTION_PIXELS = 1000 # Minimum detected pixels for valid detection EDGE_CONTACT_THRESHOLD = 0.05 # 5% of edge must have mask to count as "touching" MIN_ZOOM = 18 # Minimum zoom level (zooms below 18 don't detect panels reliably) MAX_ZOOM = 21 # Maximum zoom level (highest detail) def analyze_detection_for_zoom(mask: np.ndarray) -> dict: """ Analyze detection mask to determine if zoom adjustment is needed. Args: mask: Binary detection mask Returns: Dictionary with analysis results: - detected_pixels: Number of detected pixels - edges_touched: List of edges where mask touches boundary - num_edges_touched: Count of edges touched - needs_zoom_increase: True if zoom should increase (more detail) - needs_zoom_decrease: True if zoom should decrease (larger area) """ height, width = mask.shape detected_pixels = np.sum(mask > 0) # Calculate threshold for edge contact threshold_pixels_h = int(width * EDGE_CONTACT_THRESHOLD) threshold_pixels_v = int(height * EDGE_CONTACT_THRESHOLD) # Check each edge top_edge = mask[0, :] bottom_edge = mask[height - 1, :] left_edge = mask[:, 0] right_edge = mask[:, width - 1] edges_touched = [] if np.sum(top_edge > 0) > threshold_pixels_h: edges_touched.append('north') if np.sum(bottom_edge > 0) > threshold_pixels_h: edges_touched.append('south') if np.sum(left_edge > 0) > threshold_pixels_v: edges_touched.append('west') if np.sum(right_edge > 0) > threshold_pixels_v: edges_touched.append('east') num_edges_touched = len(edges_touched) # Determine zoom adjustment needs needs_zoom_increase = detected_pixels < MIN_DETECTION_PIXELS and detected_pixels > 0 needs_zoom_decrease = num_edges_touched >= 2 # Panels touching 2+ edges = too zoomed in return { 'detected_pixels': detected_pixels, 'edges_touched': edges_touched, 'num_edges_touched': num_edges_touched, 'needs_zoom_increase': needs_zoom_increase, 'needs_zoom_decrease': needs_zoom_decrease } def refine_zoom_level(current_zoom: int, analysis: dict) -> Tuple[int, str]: """ Determine the refined zoom level based on detection analysis. Args: current_zoom: Current zoom level analysis: Result from analyze_detection_for_zoom Returns: Tuple of (new_zoom_level, reason_string) """ if analysis['needs_zoom_decrease'] and current_zoom > MIN_ZOOM: # Panels exceed boundaries - zoom out new_zoom = current_zoom - 1 reason = f"Panels touch {analysis['num_edges_touched']} edges ({', '.join(analysis['edges_touched'])}), zooming out" return new_zoom, reason elif analysis['needs_zoom_increase'] and current_zoom < MAX_ZOOM: # Too few pixels detected - zoom in for more detail new_zoom = current_zoom + 1 reason = f"Only {analysis['detected_pixels']} pixels detected (< {MIN_DETECTION_PIXELS}), zooming in" return new_zoom, reason else: # No adjustment needed or at limits return current_zoom, "Zoom level optimal" def fetch_static_map(center_lat: float, center_lon: float, zoom: int, size: tuple, maptype: str = "satellite", api_key: str = None) -> Image.Image: """Fetch satellite image from Google Static Maps API""" if not api_key: raise ValueError("API key is required") size_str = f"{size[0]}x{size[1]}" url = ( "https://maps.googleapis.com/maps/api/staticmap" f"?center={center_lat},{center_lon}&zoom={zoom}&size={size_str}" f"&maptype={maptype}&key={api_key}" ) response = requests.get(url, timeout=30) response.raise_for_status() return Image.open(io.BytesIO(response.content)).convert("RGB") def fetch_static_google_satellite(lat: float, lon: float, cfg: ValidationConfig) -> Image.Image: """Fetch satellite image using ValidationConfig""" from config import get_api_key api_key = get_api_key() if not api_key: raise ValueError("Google Maps API key not found. Please set GOOGLE_MAPS_API_KEY in your .env file.") size = (cfg.image_size_px, cfg.image_size_px) return fetch_static_map(lat, lon, cfg.zoom, size, cfg.maptype, api_key) def calculate_image_parameters(capacity_kw: float, effective_px: int, panel_density: float = 0.18) -> dict: """Calculate optimal image parameters based on expected capacity Args: capacity_kw: Expected capacity in kW effective_px: Effective pixels in the image panel_density: Power density in kWp/m² (default: 0.18) """ # Calculate required area required_area_m2 = capacity_kw / panel_density # Calculate meters per pixel needed total_pixels = effective_px * effective_px mpp_needed = math.sqrt(required_area_m2 / total_pixels) return { 'required_area_m2': required_area_m2, 'mpp_needed': mpp_needed, 'total_pixels': total_pixels, 'panel_density_kwp_m2': panel_density } def preprocess_image(img: Image.Image, target_size: int) -> Image.Image: """Preprocess image to target size while maintaining aspect ratio""" # Resize image to target size img_resized = img.resize((target_size, target_size), Image.LANCZOS) return img_resized def centroid_from_mask(mask: np.ndarray) -> Optional[Tuple[float, float]]: """Calculate centroid from binary mask""" if mask.sum() == 0: return None y, x = np.where(mask > 0) return (float(x.mean()), float(y.mean())) def area_from_mask(mask: np.ndarray, m_per_px: float) -> float: """Calculate area in square meters from binary mask""" pixel_count = np.sum(mask > 0) return pixel_count * (m_per_px ** 2) def shading_fraction(rgb: np.ndarray, mask: np.ndarray) -> float: """Estimate shading fraction within the masked area""" if mask.sum() == 0: return 0.0 # Get pixels within the solar panel mask masked_pixels = rgb[mask > 0] if len(masked_pixels) == 0: return 0.0 # Calculate brightness brightness = np.mean(masked_pixels, axis=1) # Consider pixels with brightness < 50 as heavily shaded shaded_pixels = np.sum(brightness < 50) total_pixels = len(brightness) return shaded_pixels / total_pixels if total_pixels > 0 else 0.0 def draw_overlay(img: Image.Image, mask: np.ndarray, pin_px: Tuple[int, int], centroid_px: Optional[Tuple[float, float]]) -> Image.Image: """Draw overlay with mask, pin, and centroid on image""" # Convert image to RGBA for better blending overlay = img.convert('RGBA') # Create a more visible mask overlay if mask.sum() > 0: # Only if there's a mask # Normalize mask to 0-255 range mask_normalized = ((mask > 0) * 255).astype(np.uint8) # Create green overlay with higher opacity mask_pil = Image.fromarray(mask_normalized, mode='L') green_overlay = Image.new('RGBA', img.size, (0, 255, 0, 120)) # Increased alpha to 120 # Create mask for blending mask_rgba = Image.new('RGBA', img.size, (0, 0, 0, 0)) mask_rgba.paste(green_overlay, mask=mask_pil) # Blend with original image overlay = Image.alpha_composite(overlay, mask_rgba) # Convert back to RGB for drawing overlay_rgb = overlay.convert('RGB') draw = ImageDraw.Draw(overlay_rgb) # Draw pin (larger red circle for better visibility) pin_radius = 8 draw.ellipse([ (pin_px[0] - pin_radius, pin_px[1] - pin_radius), (pin_px[0] + pin_radius, pin_px[1] + pin_radius) ], fill='red', outline='darkred', width=3) # Draw centroid (larger blue circle) if available if centroid_px: centroid_radius = 6 cx, cy = int(centroid_px[0]), int(centroid_px[1]) draw.ellipse([ (cx - centroid_radius, cy - centroid_radius), (cx + centroid_radius, cy + centroid_radius) ], fill='blue', outline='white', width=2) # Draw line from pin to centroid (thicker and more visible) draw.line([(pin_px[0], pin_px[1]), (cx, cy)], fill='yellow', width=4) return overlay_rgb def sanitize_name(name: str) -> str: """Sanitize string for use in filenames""" import re return re.sub(r'[<>:"/\\|?*]', '_', str(name)) def get_image_processing_status(): """Return status of image processing libraries""" return { 'cv2_available': CV2_AVAILABLE, 'cv2_error': CV2_ERROR, 'skimage_available': SKIMAGE_AVAILABLE }