| """ |
| Image processing and analysis utilities. |
| """ |
|
|
| import io |
| import math |
| import numpy as np |
| import requests |
| from PIL import Image, ImageDraw |
| from typing import Optional, Tuple |
| from config import ValidationConfig, EARTH_RADIUS_M, WEB_MERCATOR_SIZE |
|
|
| |
# OpenCV is an optional dependency. Any exception raised while importing it is
# captured (not re-raised) so this module still loads; the outcome is exposed
# through CV2_AVAILABLE / CV2_ERROR.
CV2_ERROR = None
try:
    import cv2
except Exception as e:
    cv2 = None
    CV2_AVAILABLE = False
    CV2_ERROR = str(e)
else:
    CV2_AVAILABLE = True
|
|
| |
# scikit-image is an optional dependency. If the import fails for any reason,
# the submodule names are bound to None and SKIMAGE_AVAILABLE is set to False.
try:
    from skimage import measure, morphology, filters, color
except Exception:
    SKIMAGE_AVAILABLE = False
    measure = morphology = filters = color = None
else:
    SKIMAGE_AVAILABLE = True
|
|
|
|
def meters_per_pixel(latitude_deg: float, zoom: int) -> float:
    """Return the ground resolution in meters per pixel.

    The equatorial resolution is the Earth's circumference divided by
    ``WEB_MERCATOR_SIZE * 2**zoom``; it is then scaled by the cosine of the
    latitude.

    Args:
        latitude_deg: Latitude in degrees.
        zoom: Map zoom level.

    Returns:
        Meters covered by one pixel at the given latitude and zoom.
    """
    world_width_px = WEB_MERCATOR_SIZE * (2 ** zoom)
    equator_mpp = 2 * math.pi * EARTH_RADIUS_M / world_width_px
    return equator_mpp * math.cos(math.radians(latitude_deg))
|
|
|
|
def to_bytes(img: Image.Image, fmt="PNG") -> bytes:
    """Serialize a PIL image into an in-memory byte string.

    Args:
        img: Image to encode.
        fmt: Format name passed to ``Image.save`` (default ``"PNG"``).

    Returns:
        The encoded image data.
    """
    with io.BytesIO() as stream:
        img.save(stream, format=fmt)
        return stream.getvalue()
|
|
|
|
def zoom_for_target_mpp(lat_deg: float, target_mpp: float) -> int:
    """Return the lowest zoom (1-21) whose resolution meets ``target_mpp``.

    Args:
        lat_deg: Latitude in degrees.
        target_mpp: Desired maximum meters per pixel.

    Returns:
        The first zoom level in 1..21 whose meters-per-pixel is at most
        ``target_mpp``, or 21 if no level in that range qualifies.
    """
    candidates = (
        level
        for level in range(1, 22)
        if meters_per_pixel(lat_deg, level) <= target_mpp
    )
    return next(candidates, 21)
|
|
|
|
def calculate_adaptive_zoom(capacity_kwp: float, lat_deg: float = 0.0) -> int:
    """
    Pick a zoom level from the site capacity.

    Optimized for YOLO model detection performance.
    Zoom levels below 18 are too wide for reliable detection, so 18 is the
    floor; sites above 1000 kWp also use zoom 18 and rely on multi-tile
    expansion.

    Args:
        capacity_kwp: Site capacity in kWp
        lat_deg: Latitude (accepted for API compatibility; currently unused)

    Returns:
        int: Recommended zoom level (18-21)

    Zoom level tiers (upper bounds are inclusive):
        - 21: 0 < capacity <= 60 kWp     (~0.06 m/px, residential)
        - 20: 60 < capacity <= 150 kWp   (~0.12 m/px, small commercial)
        - 19: 150 < capacity <= 350 kWp  (~0.23 m/px, medium commercial)
        - 18: capacity > 350 kWp         (~0.46 m/px, multi-tile stitching)
        - 19: capacity <= 0 (unknown capacity, default)
    """
    # Unknown or invalid capacity: fall back to the default zoom.
    if capacity_kwp <= 0:
        return 19

    # (inclusive upper capacity bound, zoom) in ascending capacity order.
    tiers = ((60, 21), (150, 20), (350, 19))
    for upper_kwp, zoom in tiers:
        if capacity_kwp <= upper_kwp:
            return zoom

    # Everything larger (including > 1000 kWp) uses the widest supported zoom.
    return 18
|
|
|
|
| |
# Tuning constants for detection-driven zoom refinement.
# A non-empty detection smaller than this many pixels triggers a zoom-in.
MIN_DETECTION_PIXELS = 1000
# Fraction of an edge's length that must be covered by detected pixels
# before that edge counts as "touched".
EDGE_CONTACT_THRESHOLD = 0.05
# Zoom refinement never steps outside [MIN_ZOOM, MAX_ZOOM].
MIN_ZOOM, MAX_ZOOM = 18, 21
|
|
|
|
def analyze_detection_for_zoom(mask: np.ndarray) -> dict:
    """
    Analyze a detection mask to decide whether the zoom should change.

    A pixel counts as detected when its value is > 0. An edge counts as
    touched when the number of detected pixels on that border row/column
    exceeds ``EDGE_CONTACT_THRESHOLD`` times the edge length (truncated to
    an int).

    Args:
        mask: 2-D detection mask (``mask.shape`` must unpack to
            ``(height, width)``).

    Returns:
        Dictionary with analysis results (plain Python types, so the result
        can be serialized or compared with ``is True`` safely):
            - detected_pixels (int): Number of detected pixels
            - edges_touched (list[str]): Touched edges, in the order
              'north', 'south', 'west', 'east'
            - num_edges_touched (int): Count of edges touched
            - needs_zoom_increase (bool): True if the detection is non-empty
              but smaller than ``MIN_DETECTION_PIXELS``
            - needs_zoom_decrease (bool): True if two or more edges are touched
    """
    height, width = mask.shape

    # Threshold the mask once and reuse it for the total and for every edge.
    detected = mask > 0
    detected_pixels = int(np.count_nonzero(detected))

    # Minimum detected-pixel counts for horizontal (top/bottom) and
    # vertical (left/right) edges.
    min_count_h = int(width * EDGE_CONTACT_THRESHOLD)
    min_count_v = int(height * EDGE_CONTACT_THRESHOLD)

    edge_checks = (
        ('north', detected[0, :], min_count_h),
        ('south', detected[height - 1, :], min_count_h),
        ('west', detected[:, 0], min_count_v),
        ('east', detected[:, width - 1], min_count_v),
    )
    edges_touched = [
        edge_name
        for edge_name, edge_pixels, min_count in edge_checks
        if np.count_nonzero(edge_pixels) > min_count
    ]
    num_edges_touched = len(edges_touched)

    return {
        'detected_pixels': detected_pixels,
        'edges_touched': edges_touched,
        'num_edges_touched': num_edges_touched,
        'needs_zoom_increase': 0 < detected_pixels < MIN_DETECTION_PIXELS,
        'needs_zoom_decrease': num_edges_touched >= 2,
    }
|
|
|
|
def refine_zoom_level(current_zoom: int, analysis: dict) -> Tuple[int, str]:
    """
    Choose the next zoom level from a detection analysis.

    Zooming out takes priority over zooming in, and the result is never
    moved below ``MIN_ZOOM`` or above ``MAX_ZOOM``.

    Args:
        current_zoom: Current zoom level
        analysis: Result from analyze_detection_for_zoom

    Returns:
        Tuple of (new_zoom_level, reason_string)
    """
    # Detection spills over the frame: widen the view by one level.
    if analysis['needs_zoom_decrease'] and current_zoom > MIN_ZOOM:
        edge_count = analysis['num_edges_touched']
        edge_names = ', '.join(analysis['edges_touched'])
        return (
            current_zoom - 1,
            f"Panels touch {edge_count} edges ({edge_names}), zooming out",
        )

    # Detection is too small: tighten the view by one level.
    if analysis['needs_zoom_increase'] and current_zoom < MAX_ZOOM:
        pixel_count = analysis['detected_pixels']
        return (
            current_zoom + 1,
            f"Only {pixel_count} pixels detected (< {MIN_DETECTION_PIXELS}), zooming in",
        )

    return current_zoom, "Zoom level optimal"
|
|
|
|
def fetch_static_map(center_lat: float, center_lon: float, zoom: int, size: tuple,
                     maptype: str = "satellite", api_key: Optional[str] = None) -> Image.Image:
    """Fetch a map image from the Google Static Maps API.

    Args:
        center_lat: Latitude of the image center.
        center_lon: Longitude of the image center.
        zoom: Map zoom level.
        size: ``(width, height)`` of the requested image in pixels.
        maptype: Map type to request (default ``"satellite"``).
        api_key: Google Maps API key; required.

    Returns:
        The downloaded image converted to RGB.

    Raises:
        ValueError: If ``api_key`` is missing or empty.
        requests.HTTPError: If the API responds with an error status.
    """
    if not api_key:
        raise ValueError("API key is required")

    # Pass the query as a mapping so requests URL-encodes every value;
    # hand-built query strings break on reserved characters such as '&' or '#'.
    query = {
        "center": f"{center_lat},{center_lon}",
        "zoom": zoom,
        "size": f"{size[0]}x{size[1]}",
        "maptype": maptype,
        "key": api_key,
    }

    response = requests.get(
        "https://maps.googleapis.com/maps/api/staticmap",
        params=query,
        timeout=30,
    )
    response.raise_for_status()
    return Image.open(io.BytesIO(response.content)).convert("RGB")
|
|
|
|
def fetch_static_google_satellite(lat: float, lon: float, cfg: ValidationConfig) -> Image.Image:
    """Fetch a square map image using the settings in ``cfg``.

    Args:
        lat: Latitude of the image center.
        lon: Longitude of the image center.
        cfg: Supplies ``image_size_px``, ``zoom`` and ``maptype``.

    Returns:
        The image returned by ``fetch_static_map``.

    Raises:
        ValueError: If ``get_api_key()`` returns no key.
    """
    from config import get_api_key

    key = get_api_key()
    if key:
        # The requested image is square: the same side length for both axes.
        dimensions = (cfg.image_size_px, cfg.image_size_px)
        return fetch_static_map(lat, lon, cfg.zoom, dimensions, cfg.maptype, key)

    raise ValueError("Google Maps API key not found. Please set GOOGLE_MAPS_API_KEY in your .env file.")
|
|
|
|
def calculate_image_parameters(capacity_kw: float, effective_px: int,
                               panel_density: float = 0.18) -> dict:
    """Derive the ground resolution needed to fit the expected capacity.

    The capacity is converted to a panel area via ``panel_density``; that area
    is then spread over a square of ``effective_px`` x ``effective_px`` pixels
    to obtain the required meters per pixel.

    Args:
        capacity_kw: Expected capacity in kW
        effective_px: Side length, in pixels, of the usable square image area
        panel_density: Power density in kWp/m² (default: 0.18)

    Returns:
        Dictionary with keys ``required_area_m2``, ``mpp_needed``,
        ``total_pixels`` and ``panel_density_kwp_m2``.
    """
    params = {'required_area_m2': capacity_kw / panel_density}
    pixel_budget = effective_px * effective_px
    # sqrt(area per pixel) gives the edge length of one pixel in meters.
    params['mpp_needed'] = math.sqrt(params['required_area_m2'] / pixel_budget)
    params['total_pixels'] = pixel_budget
    params['panel_density_kwp_m2'] = panel_density
    return params
|
|
|
|
def preprocess_image(img: Image.Image, target_size: int) -> Image.Image:
    """Resize an image to a ``target_size`` x ``target_size`` square.

    Note: the image is resized directly to the square dimensions with LANCZOS
    resampling; the aspect ratio of a non-square input is not preserved.

    Args:
        img: Source image.
        target_size: Side length of the output in pixels.

    Returns:
        The resized image.
    """
    square = (target_size, target_size)
    return img.resize(square, Image.LANCZOS)
|
|
|
|
def centroid_from_mask(mask: np.ndarray) -> Optional[Tuple[float, float]]:
    """Return the centroid of the positive pixels of a 2-D mask.

    A pixel belongs to the mask when its value is > 0. The same predicate is
    used both to decide whether the mask is empty and to select pixels, so
    masks containing negative values cannot yield ``nan`` coordinates or be
    mistaken for empty.

    Args:
        mask: 2-D mask array.

    Returns:
        ``(x, y)`` — mean column and mean row of the positive pixels — or
        ``None`` if no pixel is positive.
    """
    rows, cols = np.nonzero(np.asarray(mask) > 0)
    if cols.size == 0:
        return None
    return (float(cols.mean()), float(rows.mean()))
|
|
|
|
def area_from_mask(mask: np.ndarray, m_per_px: float) -> float:
    """Return the area covered by the positive pixels of a mask.

    Args:
        mask: Mask array; pixels with value > 0 are counted.
        m_per_px: Ground resolution in meters per pixel.

    Returns:
        Number of positive pixels multiplied by the area of one pixel
        (``m_per_px`` squared), i.e. square meters.
    """
    covered = (mask > 0).sum()
    return covered * (m_per_px ** 2)
|
|
|
|
def shading_fraction(rgb: np.ndarray, mask: np.ndarray,
                     dark_threshold: float = 50) -> float:
    """Estimate the fraction of masked pixels that are dark (shaded).

    A pixel is inside the mask when its mask value is > 0; the same predicate
    decides whether the mask is empty. A masked pixel counts as shaded when
    the mean of its channel values is below ``dark_threshold``.

    Args:
        rgb: Image array indexed like ``mask`` with a trailing channel axis
            (the per-pixel mean is taken over axis 1 of the selected pixels).
        mask: Mask array selecting the pixels to evaluate.
        dark_threshold: Mean channel value below which a pixel is considered
            shaded (default: 50).

    Returns:
        Shaded fraction in [0, 1]; 0.0 when the mask selects no pixels.
    """
    selected = mask > 0
    # Bail out before indexing so an empty mask never touches ``rgb``.
    if not selected.any():
        return 0.0

    masked_pixels = rgb[selected]
    if len(masked_pixels) == 0:
        return 0.0

    brightness = np.mean(masked_pixels, axis=1)
    shaded_pixels = np.count_nonzero(brightness < dark_threshold)
    return float(shaded_pixels / len(brightness))
|
|
|
|
def draw_overlay(img: Image.Image, mask: np.ndarray, pin_px: Tuple[int, int],
                 centroid_px: Optional[Tuple[float, float]]) -> Image.Image:
    """Render the detection mask, the pin and the centroid on a copy of ``img``.

    Args:
        img: Base image.
        mask: Detection mask; pixels with value > 0 are tinted
            semi-transparent green. Skipped when the mask sums to zero.
        pin_px: ``(x, y)`` pixel position of the pin (red marker).
        centroid_px: ``(x, y)`` pixel position of the centroid (blue marker),
            or ``None`` to omit the centroid and the connector line.

    Returns:
        A new RGB image with the overlay drawn on it.
    """
    def _circle_box(x, y, radius):
        # Bounding box of a circle, as the corner pairs ImageDraw.ellipse takes.
        return [(x - radius, y - radius), (x + radius, y + radius)]

    canvas = img.convert('RGBA')

    if mask.sum() > 0:
        # 0/255 alpha stencil: fully opaque where the mask is positive.
        stencil = Image.fromarray(((mask > 0) * 255).astype(np.uint8), mode='L')
        tint = Image.new('RGBA', img.size, (0, 255, 0, 120))

        # Paste the tint through the stencil onto a transparent layer, then
        # blend that layer over the image.
        tinted_layer = Image.new('RGBA', img.size, (0, 0, 0, 0))
        tinted_layer.paste(tint, mask=stencil)
        canvas = Image.alpha_composite(canvas, tinted_layer)

    # Markers are drawn on an RGB copy.
    result = canvas.convert('RGB')
    pen = ImageDraw.Draw(result)

    pin_x, pin_y = pin_px[0], pin_px[1]
    pen.ellipse(_circle_box(pin_x, pin_y, 8), fill='red', outline='darkred', width=3)

    if centroid_px:
        cx, cy = int(centroid_px[0]), int(centroid_px[1])
        pen.ellipse(_circle_box(cx, cy, 6), fill='blue', outline='white', width=2)
        # Connector from the pin to the centroid (drawn last, on top of both).
        pen.line([(pin_x, pin_y), (cx, cy)], fill='yellow', width=4)

    return result
|
|
|
|
def sanitize_name(name: str) -> str:
    """Make a value safe to embed in a filename.

    The value is converted with ``str()`` and each of the characters
    ``< > : " / \\ | ? *`` is replaced by an underscore.

    Args:
        name: Value to sanitize.

    Returns:
        The sanitized string.
    """
    forbidden = '<>:"/\\|?*'
    replacements = str.maketrans(dict.fromkeys(forbidden, '_'))
    return str(name).translate(replacements)
|
|
|
|
def get_image_processing_status():
    """Report which optional image libraries were imported successfully.

    Returns:
        Dictionary with ``cv2_available``, ``cv2_error`` (the import error
        message, or ``None``) and ``skimage_available``.
    """
    return dict(
        cv2_available=CV2_AVAILABLE,
        cv2_error=CV2_ERROR,
        skimage_available=SKIMAGE_AVAILABLE,
    )
|
|