"""
Satellite Change Detection Engine v2

High-accuracy detection with multi-channel analysis, SSIM, texture features,
adaptive thresholding, and improved object classification.
"""

import io
from collections import Counter

import cv2
import numpy as np
from PIL import Image
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler


# ---------------------------------------------------------------------------
# 1. Pre-processing
# ---------------------------------------------------------------------------

def preprocess_image(image):
    """Convert *image* to a 3-channel RGB array and cap its longest side.

    Args:
        image: A PIL image or anything ``np.array`` can turn into an
            ``(H, W)`` or ``(H, W, C)`` array.

    Returns:
        An ``(H, W, 3)`` array whose longest side is at most 2000 pixels
        (down-scaled with ``cv2.INTER_AREA`` when necessary).
    """
    # PIL modes such as "P", "1", "LA" or "CMYK" do not map onto the array
    # branches below (palette indices are not grey levels, LA has 2 channels),
    # so let PIL expand them.  Integer/float modes ("I", "I;16", "F") are left
    # alone because PIL's RGB conversion would clip their value range.
    if (
        isinstance(image, Image.Image)
        and image.mode not in ("L", "RGB", "RGBA")
        and not image.mode.startswith(("I", "F"))
    ):
        image = image.convert("RGB")

    img_array = np.array(image)

    if img_array.ndim == 3 and img_array.shape[2] in (1, 2):
        # (H, W, 1) or (H, W, 2) input -- presumably grey or grey+alpha:
        # keep only the first plane and treat it as greyscale.
        img_array = np.ascontiguousarray(img_array[:, :, 0])

    if img_array.ndim == 2:
        img_array = cv2.cvtColor(img_array, cv2.COLOR_GRAY2RGB)
    elif img_array.shape[2] == 4:
        img_array = cv2.cvtColor(img_array, cv2.COLOR_RGBA2RGB)

    max_size = 2000
    height, width = img_array.shape[:2]
    longest_side = max(height, width)
    if longest_side > max_size:
        scale = max_size / longest_side
        # Clamp to 1 px so extreme aspect ratios never request a 0-sized image.
        new_size = (max(1, int(width * scale)), max(1, int(height * scale)))
        img_array = cv2.resize(img_array, new_size, interpolation=cv2.INTER_AREA)

    return img_array


# ---------------------------------------------------------------------------
# 2.
# Improved image registration (alignment)
# ---------------------------------------------------------------------------

def register_images(img1, img2, max_features=2000):
    """Warp ``img2`` onto ``img1`` using ORB matches and a RANSAC homography.

    Args:
        img1: Reference RGB image.
        img2: RGB image to be aligned to ``img1``.
        max_features: Maximum number of ORB keypoints per image.

    Returns:
        ``(img1, img2_aligned, True)`` when alignment succeeded, otherwise
        ``(img1, img2, False)`` with both inputs untouched.
    """
    not_aligned = (img1, img2, False)

    reference_gray = cv2.cvtColor(img1, cv2.COLOR_RGB2GRAY)
    moving_gray = cv2.cvtColor(img2, cv2.COLOR_RGB2GRAY)

    detector = cv2.ORB_create(
        nfeatures=max_features, scoreType=cv2.ORB_HARRIS_SCORE
    )
    ref_keypoints, ref_desc = detector.detectAndCompute(reference_gray, None)
    mov_keypoints, mov_desc = detector.detectAndCompute(moving_gray, None)

    if ref_desc is None or mov_desc is None:
        return not_aligned
    if min(len(ref_desc), len(mov_desc)) < 10:
        return not_aligned

    # kNN matching + Lowe's ratio test: keep a match only when it is clearly
    # better than the runner-up.
    matcher = cv2.BFMatcher(cv2.NORM_HAMMING)
    candidates = matcher.knnMatch(ref_desc, mov_desc, k=2)
    kept = [
        pair[0]
        for pair in candidates
        if len(pair) == 2 and pair[0].distance < 0.75 * pair[1].distance
    ]
    if len(kept) < 10:
        return not_aligned

    ref_pts = np.float32(
        [ref_keypoints[m.queryIdx].pt for m in kept]
    ).reshape(-1, 1, 2)
    mov_pts = np.float32(
        [mov_keypoints[m.trainIdx].pt for m in kept]
    ).reshape(-1, 1, 2)

    # Homography maps points of img2 into the frame of img1.
    homography, inlier_mask = cv2.findHomography(mov_pts, ref_pts, cv2.RANSAC, 3.0)
    if homography is None:
        return not_aligned

    # Reject the transform when fewer than 30% of the matches agree with it.
    if inlier_mask is None:
        inlier_ratio = 0
    else:
        inlier_ratio = np.sum(inlier_mask) / len(inlier_mask)
    if inlier_ratio < 0.3:
        return not_aligned

    height, width = img1.shape[:2]
    warped = cv2.warpPerspective(
        img2, homography, (width, height), borderMode=cv2.BORDER_REFLECT
    )
    return img1, warped, True


# ---------------------------------------------------------------------------
# 3.
# Improved radiometric normalization
# ---------------------------------------------------------------------------

def normalize_radiometry(img1, img2):
    """Match the LAB statistics of ``img2`` to ``img1``, then equalize contrast.

    Each LAB channel of ``img2`` is shifted and scaled so that its mean and
    standard deviation equal those of ``img1`` (channels of ``img2`` with
    near-zero spread are left as they are).  CLAHE is then applied to the L
    channel of the adjusted ``img2`` only.

    Returns:
        ``(img1, img2_normalized)`` -- ``img1`` is returned unchanged.
    """
    reference_lab = cv2.cvtColor(img1, cv2.COLOR_RGB2LAB).astype(np.float32)
    target_lab = cv2.cvtColor(img2, cv2.COLOR_RGB2LAB).astype(np.float32)

    matched = target_lab.copy()
    for channel in range(3):
        ref_plane = reference_lab[:, :, channel]
        tgt_plane = target_lab[:, :, channel]
        ref_mean, ref_std = np.mean(ref_plane), np.std(ref_plane)
        tgt_mean, tgt_std = np.mean(tgt_plane), np.std(tgt_plane)
        # A flat channel cannot be rescaled; keep its original values.
        if tgt_std > 1e-6:
            gain = ref_std / tgt_std
            matched[:, :, channel] = (tgt_plane - tgt_mean) * gain + ref_mean

    # Local contrast equalization on the lightness channel.
    # NOTE(review): CLAHE is applied to img2 only, not to img1.
    matched_u8 = np.clip(matched, 0, 255).astype(np.uint8)
    equalizer = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    matched_u8[:, :, 0] = equalizer.apply(matched_u8[:, :, 0])

    img2_normalized = cv2.cvtColor(matched_u8, cv2.COLOR_LAB2RGB)
    return img1, img2_normalized


# ---------------------------------------------------------------------------
# 4.
# SSIM-based structural change map
# ---------------------------------------------------------------------------

def compute_ssim_change_map(img1, img2, win_size=7):
    """Compute a per-pixel structural dissimilarity map, ``(1 - SSIM) / 2``.

    Local means, variances and the covariance are estimated with a Gaussian
    window (sigma 1.5) over the greyscale versions of both images.

    Args:
        img1: First RGB image.
        img2: Second RGB image, same size as ``img1``.
        win_size: Side of the Gaussian window.  ``cv2.GaussianBlur`` needs an
            odd size, so a positive even value is rounded up to the next odd
            number instead of raising.

    Returns:
        Float64 array clipped to ``[0, 1]``: 0 = identical structure,
        1 = completely different.
    """
    if win_size > 0 and win_size % 2 == 0:
        win_size += 1
    window = (win_size, win_size)

    def smooth(values):
        return cv2.GaussianBlur(values, window, 1.5)

    gray1 = cv2.cvtColor(img1, cv2.COLOR_RGB2GRAY).astype(np.float64)
    gray2 = cv2.cvtColor(img2, cv2.COLOR_RGB2GRAY).astype(np.float64)

    # Standard SSIM stabilizing constants for an 8-bit dynamic range.
    C1 = (0.01 * 255) ** 2
    C2 = (0.03 * 255) ** 2

    mu1 = smooth(gray1)
    mu2 = smooth(gray2)
    mu1_sq = mu1 * mu1
    mu2_sq = mu2 * mu2
    mu1_mu2 = mu1 * mu2

    sigma1_sq = smooth(gray1 * gray1) - mu1_sq
    sigma2_sq = smooth(gray2 * gray2) - mu2_sq
    sigma12 = smooth(gray1 * gray2) - mu1_mu2

    numerator = (2 * mu1_mu2 + C1) * (2 * sigma12 + C2)
    denominator = (mu1_sq + mu2_sq + C1) * (sigma1_sq + sigma2_sq + C2)
    ssim_map = numerator / denominator

    # SSIM lies in [-1, 1]; map it to a dissimilarity in [0, 1].
    return np.clip((1.0 - ssim_map) / 2.0, 0, 1)


# ---------------------------------------------------------------------------
# 5.
# Texture feature extraction (LBP)
# ---------------------------------------------------------------------------

def compute_lbp(gray, radius=1, n_points=8):
    """Compute a simplified Local Binary Pattern texture descriptor.

    For every pixel, ``n_points`` neighbours on a circle of ``radius`` are
    compared with the centre; the result is the fraction of neighbours that
    are greater than or equal to it (not the classic bit-coded LBP value).

    Neighbours that fall outside the image are taken from the nearest border
    pixel (edge replication), so border pixels are never compared against
    pixels from the opposite side of the image.

    Args:
        gray: 2-D greyscale array.
        radius: Sampling radius in pixels.
        n_points: Number of neighbours sampled on the circle.

    Returns:
        Float32 array of the same shape as ``gray`` with values in ``[0, 1]``.
    """
    rows, cols = gray.shape
    lbp = np.zeros(gray.shape, dtype=np.float32)
    if gray.size == 0:
        return lbp

    # Every rounded offset has magnitude <= ceil(|radius|).
    pad = int(np.ceil(abs(radius)))
    padded = np.pad(gray, pad, mode="edge")

    for i in range(n_points):
        angle = 2 * np.pi * i / n_points
        dx = int(round(radius * np.cos(angle)))
        dy = int(round(-radius * np.sin(angle)))
        # neighbour[y, x] == gray[y - dy, x - dx], clamped to the image border
        top = pad - dy
        left = pad - dx
        neighbour = padded[top:top + rows, left:left + cols]
        lbp += (neighbour >= gray).astype(np.float32)

    return lbp / n_points


def compute_texture_change(img1, img2):
    """Return the absolute per-pixel difference of the two images' LBP maps."""
    gray1 = cv2.cvtColor(img1, cv2.COLOR_RGB2GRAY).astype(np.float32)
    gray2 = cv2.cvtColor(img2, cv2.COLOR_RGB2GRAY).astype(np.float32)
    return np.abs(compute_lbp(gray1) - compute_lbp(gray2))


# ---------------------------------------------------------------------------
# 6. Edge-aware change detection
# ---------------------------------------------------------------------------

def compute_edge_change(img1, img2):
    """Compute an edge-based change map from dilated Canny edges.

    Returns:
        Float32 array in ``[0, 1]``: 1 where an edge is present (after a 3x3
        dilation) in exactly one of the two images.
    """
    dilation_kernel = np.ones((3, 3), np.uint8)

    def dilated_edges(image):
        gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
        # Canny thresholds adapt to the image: 0.67x / 1.33x the median.
        median = np.median(gray)
        low = int(max(0, 0.67 * median))
        high = int(min(255, 1.33 * median))
        edges = cv2.Canny(gray, low, high)
        # Slight dilation so edges that moved by a pixel still overlap.
        return cv2.dilate(edges, dilation_kernel, iterations=1)

    edges1 = dilated_edges(img1)
    edges2 = dilated_edges(img2)
    return cv2.absdiff(edges1, edges2).astype(np.float32) / 255.0


# ---------------------------------------------------------------------------
# 7.
# Improved detection methods
# ---------------------------------------------------------------------------

def _normalized_delta_e(lab_diff):
    """Return a Delta-E style magnitude of a LAB difference image.

    The L difference is divided by 100 and the a/b differences by 128 before
    taking the Euclidean norm.
    """
    return np.sqrt(
        (lab_diff[:, :, 0] / 100.0) ** 2
        + (lab_diff[:, :, 1] / 128.0) ** 2
        + (lab_diff[:, :, 2] / 128.0) ** 2
    )


def image_difference_method(img1, img2, threshold=0.25, blur_size=5):
    """Detect changes from the blurred LAB colour difference, Otsu-thresholded.

    Args:
        img1: "Before" RGB image.
        img2: "After" RGB image; resized to ``img1`` when the shapes differ.
        threshold: Currently unused -- the cut-off is chosen by Otsu's method.
            Kept so existing callers keep working.
        blur_size: Gaussian kernel side.  A positive even value is rounded up
            to the next odd number, as ``cv2.GaussianBlur`` requires.

    Returns:
        uint8 mask (0 / 255) after morphological cleaning.
    """
    if img1.shape != img2.shape:
        img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0]))
    if blur_size > 0 and blur_size % 2 == 0:
        blur_size += 1
    kernel = (blur_size, blur_size)

    lab1 = cv2.cvtColor(img1, cv2.COLOR_RGB2LAB).astype(np.float32)
    lab2 = cv2.cvtColor(img2, cv2.COLOR_RGB2LAB).astype(np.float32)
    lab1_blur = cv2.GaussianBlur(lab1, kernel, 0)
    lab2_blur = cv2.GaussianBlur(lab2, kernel, 0)

    delta_e = _normalized_delta_e(lab1_blur - lab2_blur)
    peak = delta_e.max()
    if peak > 0:
        delta_e = delta_e / peak

    # Adaptive threshold: Otsu on the 8-bit change map.
    delta_uint8 = (delta_e * 255).astype(np.uint8)
    _, change_mask = cv2.threshold(
        delta_uint8, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU
    )
    return _clean_mask(change_mask)


def feature_based_method(img1, img2, num_clusters=4, sensitivity=0.5):
    """Detect changes by clustering per-pixel LAB + HSV difference features.

    Every pixel is described by 5 absolute differences (L, a, b, S, V).  The
    standardized features are clustered with K-means and the cluster whose
    raw features have the largest mean norm is taken as "changed".

    Args:
        img1: "Before" RGB image.
        img2: "After" RGB image; resized to ``img1`` when the shapes differ.
        num_clusters: Number of K-means clusters.
        sensitivity: Forwarded to ``_clean_mask``.

    Returns:
        uint8 mask (0 / 255) after morphological cleaning.
    """
    if img1.shape != img2.shape:
        img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0]))

    lab1 = cv2.cvtColor(img1, cv2.COLOR_RGB2LAB).astype(np.float32)
    lab2 = cv2.cvtColor(img2, cv2.COLOR_RGB2LAB).astype(np.float32)
    hsv1 = cv2.cvtColor(img1, cv2.COLOR_RGB2HSV).astype(np.float32)
    hsv2 = cv2.cvtColor(img2, cv2.COLOR_RGB2HSV).astype(np.float32)

    diff_lab = np.abs(lab1 - lab2)
    diff_hsv = np.abs(hsv1 - hsv2)
    h, w, _ = diff_lab.shape

    # Hue (channel 0 of HSV) is dropped; only S and V differences are used.
    features = np.concatenate([diff_lab, diff_hsv[:, :, 1:]], axis=2)
    features_flat = features.reshape(-1, features.shape[2])
    features_scaled = StandardScaler().fit_transform(features_flat)

    kmeans = KMeans(n_clusters=num_clusters, random_state=42, n_init=10)
    labels = kmeans.fit_predict(features_scaled)

    # The cluster with the highest mean raw difference is the change cluster.
    cluster_means = [
        np.mean(np.linalg.norm(features_flat[labels == i], axis=1))
        for i in range(num_clusters)
    ]
    change_cluster_idx = np.argmax(cluster_means)

    change_mask = (labels == change_cluster_idx).astype(np.uint8) * 255
    change_mask = change_mask.reshape(h, w)
    return _clean_mask(change_mask, sensitivity)


def ai_deep_learning_method(img1, img2):
    """Detect changes by fusing four hand-crafted change signals.

    Despite the name, no neural network is involved.  The signals are:

    - multi-scale colour difference in LAB (scales 1, 1/2, 1/4),
    - structural dissimilarity (SSIM),
    - texture change (LBP),
    - edge change (Canny).

    Each signal is weighted by the Shannon entropy of its 256-bin histogram,
    the weighted sum is Otsu-thresholded, cleaned morphologically and finally
    smoothed with a bilateral filter and re-binarized.

    Args:
        img1: "Before" RGB image.
        img2: "After" RGB image; resized to ``img1`` when the shapes differ.

    Returns:
        uint8 mask (0 / 255).
    """
    if img1.shape != img2.shape:
        img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0]))

    # ---- Channel 1: Multi-scale LAB color difference ----
    lab1 = cv2.cvtColor(img1, cv2.COLOR_RGB2LAB).astype(np.float32)
    lab2 = cv2.cvtColor(img2, cv2.COLOR_RGB2LAB).astype(np.float32)
    full_size = (lab1.shape[1], lab1.shape[0])

    color_maps = []
    for scale in (1, 2, 4):
        if scale > 1:
            # Never request a 0-pixel side for very small images.
            small_size = (
                max(1, lab1.shape[1] // scale),
                max(1, lab1.shape[0] // scale),
            )
            s1 = cv2.resize(lab1, small_size)
            s2 = cv2.resize(lab2, small_size)
        else:
            s1, s2 = lab1, lab2

        delta_e = _normalized_delta_e(s1 - s2)
        if scale > 1:
            delta_e = cv2.resize(delta_e, full_size)
        color_maps.append(delta_e)

    color_change = np.mean(color_maps, axis=0)
    color_change = color_change / (color_change.max() + 1e-8)

    # ---- Channel 2: SSIM structural dissimilarity ----
    ssim_change = compute_ssim_change_map(img1, img2)
    ssim_change = ssim_change / (ssim_change.max() + 1e-8)

    # ---- Channel 3: Texture change (LBP) ----
    texture_change = compute_texture_change(img1, img2)
    texture_change = texture_change / (texture_change.max() + 1e-8)

    # ---- Channel 4: Edge change (already in [0, 1]) ----
    edge_change = compute_edge_change(img1, img2)

    # ---- Adaptive fusion: weight each channel by its histogram entropy ----
    channels = [color_change, ssim_change, texture_change, edge_change]
    weights = []
    for ch in channels:
        ch_uint8 = (ch * 255).astype(np.uint8)
        hist = cv2.calcHist([ch_uint8], [0], None, [256], [0, 256]).flatten()
        hist = hist / (hist.sum() + 1e-8)
        occupied = hist[hist > 0]
        weights.append(-np.sum(occupied * np.log2(occupied + 1e-10)))

    total_w = sum(weights) + 1e-8
    weights = [w / total_w for w in weights]

    fused = np.zeros_like(color_change, dtype=np.float64)
    for ch, w in zip(channels, weights):
        fused += w * ch.astype(np.float64)
    fused = fused / (fused.max() + 1e-8)
    fused_uint8 = (fused * 255).astype(np.uint8)

    # Adaptive threshold (Otsu), then morphological clean-up.
    _, change_mask = cv2.threshold(
        fused_uint8, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU
    )
    change_mask = _clean_mask(change_mask)

    # Edge-preserving smoothing on the mask, then back to a hard 0/255 mask.
    change_mask = cv2.bilateralFilter(change_mask, 9, 75, 75)
    _, change_mask = cv2.threshold(change_mask, 127, 255, cv2.THRESH_BINARY)
    return change_mask


def hybrid_method(img1, img2):
    """Combine the three detection methods by weighted voting.

    Weights are 0.2 (image difference), 0.3 (feature-based) and 0.5 (fusion
    method).  The weighted sum is truncated to uint8 and kept where it is
    strictly greater than 127.  Because ``0.5 * 255 = 127.5`` truncates to
    127, a pixel must be flagged by the fusion method AND by at least one of
    the other two methods to survive.

    Returns:
        uint8 mask (0 / 255) after morphological cleaning.
    """
    if img1.shape != img2.shape:
        img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0]))

    diff_mask = image_difference_method(img1, img2)
    feature_mask = feature_based_method(img1, img2)
    ai_mask = ai_deep_learning_method(img1, img2)

    combined = (
        0.2 * diff_mask.astype(np.float32)
        + 0.3 * feature_mask.astype(np.float32)
        + 0.5 * ai_mask.astype(np.float32)
    )
    _, final_mask = cv2.threshold(
        combined.astype(np.uint8), 127, 255, cv2.THRESH_BINARY
    )
    return _clean_mask(final_mask)


# ---------------------------------------------------------------------------
# 8.
# Robust post-processing
# ---------------------------------------------------------------------------

def _clean_mask(mask, sensitivity=0.5):
    """Clean a binary mask: close gaps, remove specks, fill interior holes.

    Args:
        mask: uint8 mask (0 / 255).
        sensitivity: Higher values shrink the closing kernel, whose side is
            ``max(3, int(7 * (1 - sensitivity)))`` rounded up to odd.

    Returns:
        uint8 mask in which every external contour is drawn filled.
    """
    close_size = max(3, int(7 * (1 - sensitivity)))
    if close_size % 2 == 0:
        close_size += 1
    kernel_close = cv2.getStructuringElement(
        cv2.MORPH_ELLIPSE, (close_size, close_size)
    )
    mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel_close)

    # Opening with a 3x3 ellipse removes isolated noise.
    kernel_open = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))
    mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel_open)

    # Drawing only the external contours, filled, closes interior holes.
    contours, _ = cv2.findContours(
        mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
    )
    filled = np.zeros_like(mask)
    cv2.drawContours(filled, contours, -1, 255, thickness=cv2.FILLED)
    return filled


# ---------------------------------------------------------------------------
# 9. Improved visualization
# ---------------------------------------------------------------------------

def visualize_changes(img1, img2, change_mask, regions=None):
    """Overlay the change mask on the "after" image in semi-transparent red.

    Args:
        img1: "Before" image; only its shape is used.
        img2: "After" RGB image; resized to ``img1`` when the shapes differ.
        change_mask: uint8 mask; values above 127 count as changed.
        regions: Optional list of region dicts with a ``"bbox"`` entry
            ``(x, y, w, h)``; each bounding box is outlined in white.

    Returns:
        uint8 RGB visualization.
    """
    if img1.shape != img2.shape:
        img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0]))
    if change_mask.shape[:2] != img2.shape[:2]:
        change_mask = cv2.resize(change_mask, (img2.shape[1], img2.shape[0]))

    overlay = img2.astype(np.float32)
    changed = (change_mask > 127).astype(np.float32)

    # 50% blend towards pure red wherever a change was detected.
    alpha = 0.50
    pure_red = (255.0, 0.0, 0.0)
    for c, value in enumerate(pure_red):
        overlay[:, :, c] = (
            overlay[:, :, c] * (1 - changed * alpha) + value * changed * alpha
        )

    # Thin white bounding-box outline around each region for clarity.
    if regions:
        outline_mask = np.zeros(change_mask.shape[:2], dtype=np.uint8)
        for region in regions:
            x, y, w, h = (int(v) for v in region["bbox"])
            cv2.rectangle(outline_mask, (x, y), (x + w, y + h), 255, 1)
        overlay[outline_mask > 0] = [255, 255, 255]

    return np.clip(overlay, 0, 255).astype(np.uint8)


# ---------------------------------------------------------------------------
# 10. Improved object classification
# ---------------------------------------------------------------------------

def extract_advanced_features(region):
    """Extract colour, texture, edge and orientation features of an RGB patch.

    Args:
        region: ``(H, W, 3)`` RGB patch.

    Returns:
        Dict of features, or ``None`` when the patch is empty or smaller than
        3 pixels on either side.
    """
    if region.size == 0 or region.shape[0] < 3 or region.shape[1] < 3:
        return None

    hsv = cv2.cvtColor(region, cv2.COLOR_RGB2HSV)
    lab = cv2.cvtColor(region, cv2.COLOR_RGB2LAB)
    gray = cv2.cvtColor(region, cv2.COLOR_RGB2GRAY).astype(np.float32)

    # Colour statistics
    mean_rgb = np.mean(region, axis=(0, 1))
    std_rgb = np.std(region, axis=(0, 1))
    mean_hsv = np.mean(hsv, axis=(0, 1))
    mean_lab = np.mean(lab, axis=(0, 1))

    total_rgb = np.sum(mean_rgb) + 1e-6
    red_ratio = mean_rgb[0] / total_rgb
    green_ratio = mean_rgb[1] / total_rgb
    blue_ratio = mean_rgb[2] / total_rgb

    # Vegetation proxy: normalized green-red difference.  It is stored under
    # the key "ndvi" but uses only RGB (no near-infrared band).
    ndvi = (mean_rgb[1] - mean_rgb[0]) / (mean_rgb[1] + mean_rgb[0] + 1e-6)

    # Texture
    texture_std = float(np.std(gray))
    lbp_variance = float(np.var(compute_lbp(gray.astype(np.float32))))

    # Edges
    grad_x = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
    grad_y = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3)
    edge_density = float(np.mean(np.sqrt(grad_x ** 2 + grad_y ** 2)))

    # Entropy of the 8-bin gradient-orientation histogram: low values mean a
    # few dominant directions (regular structure).
    angles = np.arctan2(grad_y, grad_x + 1e-8)
    angle_hist, _ = np.histogram(angles, bins=8, range=(-np.pi, np.pi))
    angle_hist = angle_hist / (angle_hist.sum() + 1e-8)
    occupied = angle_hist[angle_hist > 0]
    orientation_entropy = -np.sum(occupied * np.log2(occupied + 1e-10))

    # GLCM-like contrast (simplified): mean squared difference to the left
    # and upper neighbours (np.roll wraps around at the patch border).
    shifted_r = np.roll(gray, 1, axis=1)
    shifted_d = np.roll(gray, 1, axis=0)
    glcm_contrast = float(
        np.mean((gray - shifted_r) ** 2 + (gray - shifted_d) ** 2)
    )

    return {
        "mean_rgb": mean_rgb,
        "std_rgb": std_rgb,
        "mean_hsv": mean_hsv,
        "mean_lab": mean_lab,
        "ndvi": ndvi,
        "texture_std": texture_std,
        "lbp_variance": lbp_variance,
        "edge_density": edge_density,
        "orientation_entropy": orientation_entropy,
        "glcm_contrast": glcm_contrast,
        "color_homogeneity": float(np.mean(std_rgb)),
        "brightness": float(mean_lab[0]),
        "green_ratio": green_ratio,
        "blue_ratio": blue_ratio,
        "red_ratio": red_ratio,
        "saturation": float(mean_hsv[1]),
        "hue": float(mean_hsv[0]),
    }


def _is_transient_object(area, w, h, features):
    """Tell whether a region looks transient rather than a ground change.

    Transient objects (people, cars, animals, shadows, etc.) are NOT permanent
    ground/structural changes.

    Returns:
        True if the region is likely transient and should be excluded.
    """
    aspect_ratio = max(w, h) / max(min(w, h), 1)

    # Very small regions are likely noise, people, or small vehicles
    if area < 300:
        return True
    # Elongated regions (aspect > 5) of modest size are likely people or poles
    if aspect_ratio > 5.0 and area < 2000:
        return True
    # Very high edge density + small area = likely a person or vehicle
    if features["edge_density"] > 80 and area < 1500:
        return True
    # Extremely high texture variance in small area = likely transient clutter
    if features["texture_std"] > 60 and area < 1000:
        return True
    return False


# Ground-level change categories only
GROUND_CHANGE_TYPES = [
    "New Construction/Building",
    "Demolition/Clearing",
    "Vegetation Change",
    "Water Body Change",
    "Road/Pavement Change",
    "Bare Land/Soil Change",
]


def _rule_score(rules):
    """Sum the weights of all ``(condition, weight)`` rules that hold."""
    return sum((weight for passed, weight in rules if passed), 0.0)


def _score_ground_change(features, area, w, h):
    """Score a region against every ground-change category.

    Each category is a checklist of weighted rules; the raw score of the best
    category is used directly as the confidence.

    Returns:
        ``(label, confidence)``; the label is ``"Unclassified"`` when the best
        raw score is below 0.30.
    """
    f = features
    aspect_ratio = max(w, h) / max(min(w, h), 1)
    compactness = (4 * np.pi * area) / ((2 * (w + h)) ** 2 + 1e-6)

    scores = {}

    scores["Water Body Change"] = _rule_score([
        (f["blue_ratio"] > 0.36, 0.22),
        (f["texture_std"] < 28, 0.18),
        (f["edge_density"] < 35, 0.14),
        (90 <= f["hue"] <= 135, 0.18),
        (f["lbp_variance"] < 0.05, 0.14),
        (f["glcm_contrast"] < 500, 0.10),
        (area > 800, 0.04),
    ])

    # Vegetation change: deforestation, new growth, crop change
    scores["Vegetation Change"] = _rule_score([
        (f["ndvi"] > 0.05, 0.22),
        (f["ndvi"] > 0.15, 0.10),
        (f["green_ratio"] > 0.36, 0.18),
        (35 <= f["hue"] <= 85, 0.15),
        (f["texture_std"] > 18, 0.08),
        (f["lbp_variance"] > 0.03, 0.08),
        (f["saturation"] > 40, 0.10),
        (f["orientation_entropy"] > 2.5, 0.05),
        (area > 500, 0.04),
    ])

    scores["New Construction/Building"] = _rule_score([
        (f["orientation_entropy"] < 2.5, 0.18),
        (f["color_homogeneity"] < 28, 0.15),
        (1.0 <= aspect_ratio <= 4.0, 0.12),
        (0.3 <= compactness <= 0.9, 0.10),
        (f["edge_density"] > 30, 0.12),
        (f["glcm_contrast"] > 400, 0.10),
        (f["saturation"] < 90, 0.10),
        (40 <= f["brightness"] <= 90, 0.08),
        (area > 1000, 0.05),
    ])

    scores["Demolition/Clearing"] = _rule_score([
        (f["texture_std"] > 30, 0.18),
        (f["orientation_entropy"] > 2.8, 0.15),
        (f["color_homogeneity"] > 25, 0.15),
        (f["brightness"] > 60, 0.10),
        (f["ndvi"] < 0.05, 0.12),
        (f["saturation"] < 70, 0.10),
        (area > 800, 0.05),
    ])

    scores["Road/Pavement Change"] = _rule_score([
        (aspect_ratio > 2.5, 0.22),
        (f["color_homogeneity"] < 22, 0.18),
        (f["texture_std"] < 32, 0.15),
        (f["saturation"] < 65, 0.12),
        (f["orientation_entropy"] < 2.0, 0.15),
        (35 <= f["brightness"] <= 75, 0.10),
        (compactness < 0.3, 0.05),
        (area > 600, 0.03),
    ])

    scores["Bare Land/Soil Change"] = _rule_score([
        (f["red_ratio"] > 0.34 and f["green_ratio"] < 0.36, 0.20),
        (8 <= f["hue"] <= 38, 0.18),
        (f["ndvi"] < 0.05, 0.18),
        (f["texture_std"] < 35, 0.12),
        (f["lbp_variance"] < 0.04, 0.12),
        (40 <= f["saturation"] <= 130, 0.10),
        (45 <= f["brightness"] <= 82, 0.10),
    ])

    # The raw score IS the confidence.  Dividing by the maximum first (as was
    # done previously) forced the winner to 1.0 and disabled the cut-off.
    best = max(scores, key=scores.get)
    conf = scores[best]
    if conf < 0.30:
        return "Unclassified", conf
    return best, min(conf, 1.0)


def _classify_region(image_region, bbox, filter_transient):
    """Classify one bounding box, optionally applying the transient filter."""
    x, y, w, h = bbox
    pad = 5
    y1 = max(0, y - pad)
    y2 = min(image_region.shape[0], y + h + pad)
    x1 = max(0, x - pad)
    x2 = min(image_region.shape[1], x + w + pad)
    region = image_region[y1:y2, x1:x2]

    if region.size == 0 or region.shape[0] < 3 or region.shape[1] < 3:
        return "Unclassified", 0.0

    features = extract_advanced_features(region)
    if features is None:
        return "Unclassified", 0.0

    area = w * h
    if filter_transient and _is_transient_object(area, w, h, features):
        return None, 0.0  # signal to exclude this region

    return _score_ground_change(features, area, w, h)


def classify_object_type(image_region, bbox):
    """Classify GROUND-LEVEL structural changes only.

    Categories: construction, demolition, vegetation, water, road, bare land.
    Transient objects (people, cars, animals) are filtered out.

    Args:
        image_region: Full RGB image the bounding box refers to.
        bbox: ``(x, y, w, h)``; the patch is padded by 5 px on every side and
            clipped to the image.

    Returns:
        ``(label, confidence)``.  ``label`` is one of the ground-change
        categories, ``"Unclassified"`` (patch too small or best score below
        0.30), or ``None`` when the region looks transient and should be
        excluded.  ``confidence`` is the raw rule score of the best category,
        capped at 1.0.
    """
    return _classify_region(image_region, bbox, filter_transient=True)


def classify_with_ensemble(image_region, bbox, num_sub=4):
    """Classify a region by confidence-weighted voting over sub-regions.

    The full bounding box votes first; when it is larger than 20x20 px its
    four quadrants and a centred half-size box vote as well.  The transient
    filter is applied to the full box only: the sub-boxes are smaller by
    construction and would otherwise trip the minimum-area rule and discard
    the whole region.

    Args:
        image_region: Full RGB image the bounding box refers to.
        bbox: ``(x, y, w, h)``.
        num_sub: Unused; kept for backward compatibility.

    Returns:
        ``(label, confidence)`` with the same label conventions as
        ``classify_object_type`` (``None`` = transient, exclude).
    """
    x, y, w, h = bbox

    full_type, full_conf = classify_object_type(image_region, (x, y, w, h))
    if full_type is None:
        return None, 0.0  # transient -> exclude

    votes = []
    if full_type != "Unclassified":
        votes.append((full_type, full_conf))

    if w > 20 and h > 20:
        hw, hh = w // 2, h // 2
        sub_boxes = [
            (x, y, hw, hh),
            (x + hw, y, hw, hh),
            (x, y + hh, hw, hh),
            (x + hw, y + hh, hw, hh),
            (x + w // 4, y + h // 4, hw, hh),
        ]
        for sub_box in sub_boxes:
            sub_type, sub_conf = _classify_region(
                image_region, sub_box, filter_transient=False
            )
            if sub_type != "Unclassified":
                votes.append((sub_type, sub_conf))

    if not votes:
        return full_type, full_conf

    # Weighted voting: the label with the largest summed confidence wins.
    counts = Counter(label for label, _ in votes)
    weighted = {}
    for label, conf in votes:
        weighted[label] = weighted.get(label, 0) + conf

    best_type = max(weighted, key=weighted.get)
    avg_conf = weighted[best_type] / counts[best_type]
    # Small bonus when at least 60% of the voters agree.
    if counts[best_type] / len(votes) >= 0.6:
        avg_conf = min(1.0, avg_conf * 1.15)
    return best_type, avg_conf


# ---------------------------------------------------------------------------
# 11. Region analysis
# ---------------------------------------------------------------------------

def analyze_change_regions(change_mask, image, min_area=200, use_ensemble=True):
    """Find connected change regions and classify them as ground-level changes.

    Transient objects (people, cars, animals) are filtered out.

    Args:
        change_mask: uint8 mask; connected components use 8-connectivity.
        image: RGB image the mask refers to (same height and width).
        min_area: Components with fewer pixels are ignored.
        use_ensemble: Use ``classify_with_ensemble`` for components larger
            than 500 px.

    Returns:
        List of dicts with keys ``id``, ``area``, ``bbox``, ``center``,
        ``object_type`` and ``confidence`` (plain Python numbers), sorted by
        area in descending order.
    """
    num_labels, _, stats, centroids = cv2.connectedComponentsWithStats(
        change_mask, connectivity=8
    )

    change_regions = []
    for label in range(1, num_labels):  # label 0 is the background
        area = int(stats[label, cv2.CC_STAT_AREA])
        if area < min_area:
            continue

        # Plain ints keep the result JSON-serializable and cv2-friendly.
        bbox = (
            int(stats[label, cv2.CC_STAT_LEFT]),
            int(stats[label, cv2.CC_STAT_TOP]),
            int(stats[label, cv2.CC_STAT_WIDTH]),
            int(stats[label, cv2.CC_STAT_HEIGHT]),
        )
        cx, cy = centroids[label]

        if use_ensemble and area > 500:
            object_type, confidence = classify_with_ensemble(image, bbox)
        else:
            object_type, confidence = classify_object_type(image, bbox)

        # None means transient / irrelevant -> skip
        if object_type is None:
            continue

        change_regions.append({
            "id": len(change_regions) + 1,
            "area": area,
            "bbox": bbox,
            "center": (int(cx), int(cy)),
            "object_type": object_type,
            "confidence": float(confidence),
        })

    change_regions.sort(key=lambda r: r["area"], reverse=True)
    return change_regions


# ---------------------------------------------------------------------------
# 12. Main pipeline
# ---------------------------------------------------------------------------

def run_detection(before_pil, after_pil, method="AI-Based Deep Learning",
                  enable_registration=True, enable_normalization=True):
    """Run the full detection pipeline.

    Args:
        before_pil: "Before" image.
        after_pil: "After" image.
        method: ``"AI-Based Deep Learning"``, ``"Image Difference"`` or
            ``"Feature-Based"``; any other value selects the hybrid method.
        enable_registration: Align the "after" image to the "before" image.
        enable_normalization: Match the radiometry of the "after" image.

    Returns:
        ``(change_mask, result_image, stats, change_regions)``.
    """
    before_array = preprocess_image(before_pil)
    after_array = preprocess_image(after_pil)

    if enable_registration:
        before_array, after_array, _ = register_images(before_array, after_array)

    if enable_normalization:
        before_array, after_array = normalize_radiometry(before_array, after_array)

    # When registration is disabled or fails the two images may still differ
    # in size.  The mask is computed in the "before" frame, so bring the
    # "after" image into that frame too; otherwise region bounding boxes
    # would index the wrong pixels during classification.
    if after_array.shape != before_array.shape:
        after_array = cv2.resize(
            after_array, (before_array.shape[1], before_array.shape[0])
        )

    if method == "AI-Based Deep Learning":
        change_mask = ai_deep_learning_method(before_array, after_array)
    elif method == "Image Difference":
        change_mask = image_difference_method(before_array, after_array)
    elif method == "Feature-Based":
        change_mask = feature_based_method(before_array, after_array)
    else:
        change_mask = hybrid_method(before_array, after_array)

    # Classify regions
    change_regions = analyze_change_regions(change_mask, after_array, min_area=80)

    # Red overlay plus white bounding boxes of the classified regions
    result_image = visualize_changes(
        before_array, after_array, change_mask, regions=change_regions
    )

    total_pixels = int(change_mask.shape[0] * change_mask.shape[1])
    changed_pixels = int(np.sum(change_mask > 127))
    change_pct = (changed_pixels / total_pixels * 100.0) if total_pixels else 0.0

    stats = {
        "total_pixels": total_pixels,
        "changed_pixels": changed_pixels,
        "unchanged_pixels": total_pixels - changed_pixels,
        "change_percentage": change_pct,
        "image_width": change_mask.shape[1],
        "image_height": change_mask.shape[0],
    }
    return change_mask, result_image, stats, change_regions