File size: 11,792 Bytes
271c5a6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2d62fcb
271c5a6
 
 
 
 
 
 
 
 
2d62fcb
 
 
 
 
271c5a6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2d62fcb
271c5a6
2d62fcb
271c5a6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
"""
Image processing and analysis utilities.
"""

import io
import math
import numpy as np
import requests
from PIL import Image, ImageDraw
from typing import Optional, Tuple
from config import ValidationConfig, EARTH_RADIUS_M, WEB_MERCATOR_SIZE

# Try to import OpenCV
try:
    import cv2
    CV2_AVAILABLE = True
    CV2_ERROR = None
except Exception as e:
    CV2_AVAILABLE = False
    cv2 = None
    CV2_ERROR = str(e)

# Try to import skimage
try:
    from skimage import measure, morphology, filters, color
    SKIMAGE_AVAILABLE = True
except Exception:
    SKIMAGE_AVAILABLE = False
    measure = morphology = filters = color = None


def meters_per_pixel(latitude_deg: float, zoom: int) -> float:
    """Calculate meters per pixel for given latitude and zoom level"""
    lat_rad = math.radians(latitude_deg)
    meters_per_pixel_equator = 2 * math.pi * EARTH_RADIUS_M / (WEB_MERCATOR_SIZE * (2 ** zoom))
    return meters_per_pixel_equator * math.cos(lat_rad)


def to_bytes(img: Image.Image, fmt="PNG") -> bytes:
    """Convert PIL Image to bytes"""
    buf = io.BytesIO()
    img.save(buf, format=fmt)
    return buf.getvalue()


def zoom_for_target_mpp(lat_deg: float, target_mpp: float) -> int:
    """Calculate zoom level needed to achieve target meters per pixel"""
    for zoom in range(1, 22):
        mpp = meters_per_pixel(lat_deg, zoom)
        if mpp <= target_mpp:
            return zoom
    return 21  # Max zoom


def calculate_adaptive_zoom(capacity_kwp: float, lat_deg: float = 0.0) -> int:
    """
    Calculate adaptive zoom level based on site capacity.
    
    Optimized for YOLO model detection performance.
    Zoom levels below 18 are too wide for reliable detection.
    For large sites (>1000 kWp), use zoom 18 with multi-tile expansion.
    
    Args:
        capacity_kwp: Site capacity in kWp
        lat_deg: Latitude (optional, for more precise calculation)
    
    Returns:
        int: Recommended zoom level (18-21)
    
    Zoom level guidelines (based on model detection capability):
    - 21: < 60 kWp (~0.06 m/px, residential)
    - 20: 60-150 kWp (~0.12 m/px, small commercial)
    - 19: 151-350 kWp (~0.23 m/px, medium commercial) [default]
    - 18: 351-1000 kWp (~0.46 m/px, multi-tile stitching)
    - >1000 kWp: Use zoom 18 with edge detection for multi-tile expansion
    """
    if capacity_kwp <= 0:
        return 19  # Default zoom for unknown capacity
    
    # Define capacity thresholds and corresponding zoom levels
    # Note: Zoom below 18 is too wide for model to detect panels reliably
    if capacity_kwp <= 60:
        return 21
    elif capacity_kwp <= 150:
        return 20
    elif capacity_kwp <= 350:
        return 19
    elif capacity_kwp <= 1000:
        return 18
    else:
        # For large sites (>1000 kWp), use zoom 18 with multi-tile expansion
        # Edge detection will trigger adaptive expansion automatically
        return 18


# Zoom refinement constants
MIN_DETECTION_PIXELS = 1000  # Minimum detected pixels for valid detection
EDGE_CONTACT_THRESHOLD = 0.05  # 5% of edge must have mask to count as "touching"
MIN_ZOOM = 18  # Minimum zoom level (zooms below 18 don't detect panels reliably)
MAX_ZOOM = 21  # Maximum zoom level (highest detail)


def analyze_detection_for_zoom(mask: np.ndarray) -> dict:
    """
    Analyze detection mask to determine if zoom adjustment is needed.
    
    Args:
        mask: Binary detection mask
        
    Returns:
        Dictionary with analysis results:
        - detected_pixels: Number of detected pixels
        - edges_touched: List of edges where mask touches boundary
        - num_edges_touched: Count of edges touched
        - needs_zoom_increase: True if zoom should increase (more detail)
        - needs_zoom_decrease: True if zoom should decrease (larger area)
    """
    height, width = mask.shape
    detected_pixels = np.sum(mask > 0)
    
    # Calculate threshold for edge contact
    threshold_pixels_h = int(width * EDGE_CONTACT_THRESHOLD)
    threshold_pixels_v = int(height * EDGE_CONTACT_THRESHOLD)
    
    # Check each edge
    top_edge = mask[0, :]
    bottom_edge = mask[height - 1, :]
    left_edge = mask[:, 0]
    right_edge = mask[:, width - 1]
    
    edges_touched = []
    if np.sum(top_edge > 0) > threshold_pixels_h:
        edges_touched.append('north')
    if np.sum(bottom_edge > 0) > threshold_pixels_h:
        edges_touched.append('south')
    if np.sum(left_edge > 0) > threshold_pixels_v:
        edges_touched.append('west')
    if np.sum(right_edge > 0) > threshold_pixels_v:
        edges_touched.append('east')
    
    num_edges_touched = len(edges_touched)
    
    # Determine zoom adjustment needs
    needs_zoom_increase = detected_pixels < MIN_DETECTION_PIXELS and detected_pixels > 0
    needs_zoom_decrease = num_edges_touched >= 2  # Panels touching 2+ edges = too zoomed in
    
    return {
        'detected_pixels': detected_pixels,
        'edges_touched': edges_touched,
        'num_edges_touched': num_edges_touched,
        'needs_zoom_increase': needs_zoom_increase,
        'needs_zoom_decrease': needs_zoom_decrease
    }


def refine_zoom_level(current_zoom: int, analysis: dict) -> Tuple[int, str]:
    """
    Determine the refined zoom level based on detection analysis.
    
    Args:
        current_zoom: Current zoom level
        analysis: Result from analyze_detection_for_zoom
        
    Returns:
        Tuple of (new_zoom_level, reason_string)
    """
    if analysis['needs_zoom_decrease'] and current_zoom > MIN_ZOOM:
        # Panels exceed boundaries - zoom out
        new_zoom = current_zoom - 1
        reason = f"Panels touch {analysis['num_edges_touched']} edges ({', '.join(analysis['edges_touched'])}), zooming out"
        return new_zoom, reason
    
    elif analysis['needs_zoom_increase'] and current_zoom < MAX_ZOOM:
        # Too few pixels detected - zoom in for more detail
        new_zoom = current_zoom + 1
        reason = f"Only {analysis['detected_pixels']} pixels detected (< {MIN_DETECTION_PIXELS}), zooming in"
        return new_zoom, reason
    
    else:
        # No adjustment needed or at limits
        return current_zoom, "Zoom level optimal"


def fetch_static_map(center_lat: float, center_lon: float, zoom: int, size: tuple, 
                    maptype: str = "satellite", api_key: str = None) -> Image.Image:
    """Fetch satellite image from Google Static Maps API"""
    if not api_key:
        raise ValueError("API key is required")
    
    size_str = f"{size[0]}x{size[1]}"
    url = (
        "https://maps.googleapis.com/maps/api/staticmap"
        f"?center={center_lat},{center_lon}&zoom={zoom}&size={size_str}"
        f"&maptype={maptype}&key={api_key}"
    )
    
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    return Image.open(io.BytesIO(response.content)).convert("RGB")


def fetch_static_google_satellite(lat: float, lon: float, cfg: ValidationConfig) -> Image.Image:
    """Fetch satellite image using ValidationConfig"""
    from config import get_api_key
    api_key = get_api_key()
    if not api_key:
        raise ValueError("Google Maps API key not found. Please set GOOGLE_MAPS_API_KEY in your .env file.")
    
    size = (cfg.image_size_px, cfg.image_size_px)
    return fetch_static_map(lat, lon, cfg.zoom, size, cfg.maptype, api_key)


def calculate_image_parameters(capacity_kw: float, effective_px: int, 
                               panel_density: float = 0.18) -> dict:
    """Calculate optimal image parameters based on expected capacity
    
    Args:
        capacity_kw: Expected capacity in kW
        effective_px: Effective pixels in the image
        panel_density: Power density in kWp/m² (default: 0.18)
    """
    # Calculate required area
    required_area_m2 = capacity_kw / panel_density
    
    # Calculate meters per pixel needed
    total_pixels = effective_px * effective_px
    mpp_needed = math.sqrt(required_area_m2 / total_pixels)
    
    return {
        'required_area_m2': required_area_m2,
        'mpp_needed': mpp_needed,
        'total_pixels': total_pixels,
        'panel_density_kwp_m2': panel_density
    }


def preprocess_image(img: Image.Image, target_size: int) -> Image.Image:
    """Preprocess image to target size while maintaining aspect ratio"""
    # Resize image to target size
    img_resized = img.resize((target_size, target_size), Image.LANCZOS)
    return img_resized


def centroid_from_mask(mask: np.ndarray) -> Optional[Tuple[float, float]]:
    """Calculate centroid from binary mask"""
    if mask.sum() == 0:
        return None
    y, x = np.where(mask > 0)
    return (float(x.mean()), float(y.mean()))


def area_from_mask(mask: np.ndarray, m_per_px: float) -> float:
    """Calculate area in square meters from binary mask"""
    pixel_count = np.sum(mask > 0)
    return pixel_count * (m_per_px ** 2)


def shading_fraction(rgb: np.ndarray, mask: np.ndarray) -> float:
    """Estimate shading fraction within the masked area"""
    if mask.sum() == 0:
        return 0.0
    
    # Get pixels within the solar panel mask
    masked_pixels = rgb[mask > 0]
    if len(masked_pixels) == 0:
        return 0.0
    
    # Calculate brightness
    brightness = np.mean(masked_pixels, axis=1)
    
    # Consider pixels with brightness < 50 as heavily shaded
    shaded_pixels = np.sum(brightness < 50)
    total_pixels = len(brightness)
    
    return shaded_pixels / total_pixels if total_pixels > 0 else 0.0


def draw_overlay(img: Image.Image, mask: np.ndarray, pin_px: Tuple[int, int], 
                centroid_px: Optional[Tuple[float, float]]) -> Image.Image:
    """Draw overlay with mask, pin, and centroid on image"""
    # Convert image to RGBA for better blending
    overlay = img.convert('RGBA')
    
    # Create a more visible mask overlay
    if mask.sum() > 0:  # Only if there's a mask
        # Normalize mask to 0-255 range
        mask_normalized = ((mask > 0) * 255).astype(np.uint8)
        
        # Create green overlay with higher opacity
        mask_pil = Image.fromarray(mask_normalized, mode='L')
        green_overlay = Image.new('RGBA', img.size, (0, 255, 0, 120))  # Increased alpha to 120
        
        # Create mask for blending
        mask_rgba = Image.new('RGBA', img.size, (0, 0, 0, 0))
        mask_rgba.paste(green_overlay, mask=mask_pil)
        
        # Blend with original image
        overlay = Image.alpha_composite(overlay, mask_rgba)
    
    # Convert back to RGB for drawing
    overlay_rgb = overlay.convert('RGB')
    draw = ImageDraw.Draw(overlay_rgb)
    
    # Draw pin (larger red circle for better visibility)
    pin_radius = 8
    draw.ellipse([
        (pin_px[0] - pin_radius, pin_px[1] - pin_radius),
        (pin_px[0] + pin_radius, pin_px[1] + pin_radius)
    ], fill='red', outline='darkred', width=3)
    
    # Draw centroid (larger blue circle) if available
    if centroid_px:
        centroid_radius = 6
        cx, cy = int(centroid_px[0]), int(centroid_px[1])
        draw.ellipse([
            (cx - centroid_radius, cy - centroid_radius),
            (cx + centroid_radius, cy + centroid_radius)
        ], fill='blue', outline='white', width=2)
        
        # Draw line from pin to centroid (thicker and more visible)
        draw.line([(pin_px[0], pin_px[1]), (cx, cy)], fill='yellow', width=4)
    
    return overlay_rgb


def sanitize_name(name: str) -> str:
    """Sanitize string for use in filenames"""
    import re
    return re.sub(r'[<>:"/\\|?*]', '_', str(name))


def get_image_processing_status():
    """Return status of image processing libraries"""
    return {
        'cv2_available': CV2_AVAILABLE,
        'cv2_error': CV2_ERROR,
        'skimage_available': SKIMAGE_AVAILABLE
    }