Spaces:
Sleeping
Sleeping
| """ | |
| Satellite Change Detection Engine v2 | |
| High-accuracy detection with multi-channel analysis, SSIM, texture features, | |
| adaptive thresholding, and improved object classification. | |
| """ | |
| import io | |
| import numpy as np | |
| import cv2 | |
| from PIL import Image | |
| from sklearn.cluster import KMeans | |
| from sklearn.preprocessing import StandardScaler | |
| from collections import Counter | |
| # --------------------------------------------------------------------------- | |
| # 1. Pre-processing | |
| # --------------------------------------------------------------------------- | |
| def preprocess_image(image): | |
| """Preprocess image: convert to RGB, limit size.""" | |
| img_array = np.array(image) | |
| if img_array.ndim == 2: | |
| img_array = cv2.cvtColor(img_array, cv2.COLOR_GRAY2RGB) | |
| if img_array.shape[2] == 4: | |
| img_array = cv2.cvtColor(img_array, cv2.COLOR_RGBA2RGB) | |
| max_size = 2000 | |
| height, width = img_array.shape[:2] | |
| if max(height, width) > max_size: | |
| scale = max_size / max(height, width) | |
| img_array = cv2.resize(img_array, (int(width * scale), int(height * scale)), interpolation=cv2.INTER_AREA) | |
| return img_array | |
| # --------------------------------------------------------------------------- | |
| # 2. Improved image registration (alignment) | |
| # --------------------------------------------------------------------------- | |
| def register_images(img1, img2, max_features=2000): | |
| """Align img2 to img1 using ORB + ratio-test + RANSAC homography.""" | |
| gray1 = cv2.cvtColor(img1, cv2.COLOR_RGB2GRAY) | |
| gray2 = cv2.cvtColor(img2, cv2.COLOR_RGB2GRAY) | |
| orb = cv2.ORB_create(nfeatures=max_features, scoreType=cv2.ORB_HARRIS_SCORE) | |
| kp1, des1 = orb.detectAndCompute(gray1, None) | |
| kp2, des2 = orb.detectAndCompute(gray2, None) | |
| if des1 is None or des2 is None or len(des1) < 10 or len(des2) < 10: | |
| return img1, img2, False | |
| # Use kNN matching with Lowe's ratio test for better matches | |
| bf = cv2.BFMatcher(cv2.NORM_HAMMING) | |
| raw_matches = bf.knnMatch(des1, des2, k=2) | |
| good_matches = [] | |
| for pair in raw_matches: | |
| if len(pair) == 2: | |
| m, n = pair | |
| if m.distance < 0.75 * n.distance: | |
| good_matches.append(m) | |
| if len(good_matches) < 10: | |
| return img1, img2, False | |
| src_pts = np.float32([kp1[m.queryIdx].pt for m in good_matches]).reshape(-1, 1, 2) | |
| dst_pts = np.float32([kp2[m.trainIdx].pt for m in good_matches]).reshape(-1, 1, 2) | |
| homography, mask = cv2.findHomography(dst_pts, src_pts, cv2.RANSAC, 3.0) | |
| if homography is None: | |
| return img1, img2, False | |
| # Only accept if enough inliers | |
| inlier_ratio = np.sum(mask) / len(mask) if mask is not None else 0 | |
| if inlier_ratio < 0.3: | |
| return img1, img2, False | |
| h, w = img1.shape[:2] | |
| img2_aligned = cv2.warpPerspective(img2, homography, (w, h), borderMode=cv2.BORDER_REFLECT) | |
| return img1, img2_aligned, True | |
| # --------------------------------------------------------------------------- | |
| # 3. Improved radiometric normalization | |
| # --------------------------------------------------------------------------- | |
| def normalize_radiometry(img1, img2): | |
| """Histogram-matching normalization in LAB space for all channels.""" | |
| lab1 = cv2.cvtColor(img1, cv2.COLOR_RGB2LAB).astype(np.float32) | |
| lab2 = cv2.cvtColor(img2, cv2.COLOR_RGB2LAB).astype(np.float32) | |
| result = lab2.copy() | |
| for ch in range(3): | |
| mean1, std1 = np.mean(lab1[:, :, ch]), np.std(lab1[:, :, ch]) | |
| mean2, std2 = np.mean(lab2[:, :, ch]), np.std(lab2[:, :, ch]) | |
| if std2 > 1e-6: | |
| result[:, :, ch] = (lab2[:, :, ch] - mean2) * (std1 / std2) + mean1 | |
| # Also apply CLAHE on L channel for contrast equalization | |
| result_uint8 = np.clip(result, 0, 255).astype(np.uint8) | |
| clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8)) | |
| result_uint8[:, :, 0] = clahe.apply(result_uint8[:, :, 0]) | |
| img2_normalized = cv2.cvtColor(result_uint8, cv2.COLOR_LAB2RGB) | |
| return img1, img2_normalized | |
| # --------------------------------------------------------------------------- | |
| # 4. SSIM-based structural change map | |
| # --------------------------------------------------------------------------- | |
| def compute_ssim_change_map(img1, img2, win_size=7): | |
| """Compute per-pixel structural dissimilarity (1 - SSIM).""" | |
| gray1 = cv2.cvtColor(img1, cv2.COLOR_RGB2GRAY).astype(np.float64) | |
| gray2 = cv2.cvtColor(img2, cv2.COLOR_RGB2GRAY).astype(np.float64) | |
| C1 = (0.01 * 255) ** 2 | |
| C2 = (0.03 * 255) ** 2 | |
| mu1 = cv2.GaussianBlur(gray1, (win_size, win_size), 1.5) | |
| mu2 = cv2.GaussianBlur(gray2, (win_size, win_size), 1.5) | |
| mu1_sq = mu1 * mu1 | |
| mu2_sq = mu2 * mu2 | |
| mu1_mu2 = mu1 * mu2 | |
| sigma1_sq = cv2.GaussianBlur(gray1 * gray1, (win_size, win_size), 1.5) - mu1_sq | |
| sigma2_sq = cv2.GaussianBlur(gray2 * gray2, (win_size, win_size), 1.5) - mu2_sq | |
| sigma12 = cv2.GaussianBlur(gray1 * gray2, (win_size, win_size), 1.5) - mu1_mu2 | |
| ssim_map = ((2 * mu1_mu2 + C1) * (2 * sigma12 + C2)) / \ | |
| ((mu1_sq + mu2_sq + C1) * (sigma1_sq + sigma2_sq + C2)) | |
| # Structural dissimilarity: 0 = identical, 1 = completely different | |
| dssim = np.clip((1.0 - ssim_map) / 2.0, 0, 1) | |
| return dssim | |
| # --------------------------------------------------------------------------- | |
| # 5. Texture feature extraction (LBP) | |
| # --------------------------------------------------------------------------- | |
| def compute_lbp(gray, radius=1, n_points=8): | |
| """Compute simplified Local Binary Pattern texture descriptor.""" | |
| h, w = gray.shape | |
| lbp = np.zeros_like(gray, dtype=np.float32) | |
| for i in range(n_points): | |
| angle = 2 * np.pi * i / n_points | |
| dx = int(round(radius * np.cos(angle))) | |
| dy = int(round(-radius * np.sin(angle))) | |
| shifted = np.roll(np.roll(gray, dy, axis=0), dx, axis=1) | |
| lbp += (shifted >= gray).astype(np.float32) | |
| return lbp / n_points | |
| def compute_texture_change(img1, img2): | |
| """Compute texture difference using LBP.""" | |
| gray1 = cv2.cvtColor(img1, cv2.COLOR_RGB2GRAY).astype(np.float32) | |
| gray2 = cv2.cvtColor(img2, cv2.COLOR_RGB2GRAY).astype(np.float32) | |
| lbp1 = compute_lbp(gray1) | |
| lbp2 = compute_lbp(gray2) | |
| texture_diff = np.abs(lbp1 - lbp2) | |
| return texture_diff | |
| # --------------------------------------------------------------------------- | |
| # 6. Edge-aware change detection | |
| # --------------------------------------------------------------------------- | |
| def compute_edge_change(img1, img2): | |
| """Compute edge-based change map using Canny edges.""" | |
| gray1 = cv2.cvtColor(img1, cv2.COLOR_RGB2GRAY) | |
| gray2 = cv2.cvtColor(img2, cv2.COLOR_RGB2GRAY) | |
| # Adaptive Canny thresholds based on median intensity | |
| med1 = np.median(gray1) | |
| edges1 = cv2.Canny(gray1, int(max(0, 0.67 * med1)), int(min(255, 1.33 * med1))) | |
| med2 = np.median(gray2) | |
| edges2 = cv2.Canny(gray2, int(max(0, 0.67 * med2)), int(min(255, 1.33 * med2))) | |
| # Dilate edges slightly so nearby edges match | |
| kernel = np.ones((3, 3), np.uint8) | |
| edges1_d = cv2.dilate(edges1, kernel, iterations=1) | |
| edges2_d = cv2.dilate(edges2, kernel, iterations=1) | |
| # New edges = present in one image but not the other | |
| edge_change = cv2.absdiff(edges1_d, edges2_d).astype(np.float32) / 255.0 | |
| return edge_change | |
| # --------------------------------------------------------------------------- | |
| # 7. Improved detection methods | |
| # --------------------------------------------------------------------------- | |
| def image_difference_method(img1, img2, threshold=0.25, blur_size=5): | |
| """Improved image difference with multi-channel analysis and adaptive threshold.""" | |
| if img1.shape != img2.shape: | |
| img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0])) | |
| # Multi-channel difference in LAB (perceptually uniform) | |
| lab1 = cv2.cvtColor(img1, cv2.COLOR_RGB2LAB).astype(np.float32) | |
| lab2 = cv2.cvtColor(img2, cv2.COLOR_RGB2LAB).astype(np.float32) | |
| lab1_blur = cv2.GaussianBlur(lab1, (blur_size, blur_size), 0) | |
| lab2_blur = cv2.GaussianBlur(lab2, (blur_size, blur_size), 0) | |
| # Weighted Delta-E inspired difference | |
| diff = lab1_blur - lab2_blur | |
| delta_e = np.sqrt( | |
| (diff[:, :, 0] / 100.0) ** 2 + | |
| (diff[:, :, 1] / 128.0) ** 2 + | |
| (diff[:, :, 2] / 128.0) ** 2 | |
| ) | |
| delta_e = delta_e / delta_e.max() if delta_e.max() > 0 else delta_e | |
| # Adaptive threshold using Otsu on the change map | |
| delta_uint8 = (delta_e * 255).astype(np.uint8) | |
| _, change_mask = cv2.threshold(delta_uint8, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) | |
| change_mask = _clean_mask(change_mask) | |
| return change_mask | |
| def feature_based_method(img1, img2, num_clusters=4, sensitivity=0.5): | |
| """Feature-based change detection using multi-space clustering.""" | |
| if img1.shape != img2.shape: | |
| img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0])) | |
| # Combine LAB and HSV differences for richer features | |
| lab1 = cv2.cvtColor(img1, cv2.COLOR_RGB2LAB).astype(np.float32) | |
| lab2 = cv2.cvtColor(img2, cv2.COLOR_RGB2LAB).astype(np.float32) | |
| hsv1 = cv2.cvtColor(img1, cv2.COLOR_RGB2HSV).astype(np.float32) | |
| hsv2 = cv2.cvtColor(img2, cv2.COLOR_RGB2HSV).astype(np.float32) | |
| diff_lab = np.abs(lab1 - lab2) | |
| diff_hsv = np.abs(hsv1 - hsv2) | |
| h, w, _ = diff_lab.shape | |
| features = np.concatenate([diff_lab, diff_hsv[:, :, 1:]], axis=2) # 5 channels | |
| features_flat = features.reshape(-1, features.shape[2]) | |
| scaler = StandardScaler() | |
| features_scaled = scaler.fit_transform(features_flat) | |
| kmeans = KMeans(n_clusters=num_clusters, random_state=42, n_init=10) | |
| labels = kmeans.fit_predict(features_scaled) | |
| # Find the cluster with highest mean difference (= change) | |
| cluster_means = [np.mean(np.linalg.norm(features_flat[labels == i], axis=1)) for i in range(num_clusters)] | |
| change_cluster_idx = np.argmax(cluster_means) | |
| change_mask = (labels == change_cluster_idx).astype(np.uint8) * 255 | |
| change_mask = change_mask.reshape(h, w) | |
| change_mask = _clean_mask(change_mask, sensitivity) | |
| return change_mask | |
| def ai_deep_learning_method(img1, img2): | |
| """ | |
| Advanced multi-signal fusion: | |
| - Multi-scale color difference (LAB) | |
| - Structural dissimilarity (SSIM) | |
| - Texture change (LBP) | |
| - Edge change (Canny) | |
| All fused with learned weights and adaptive thresholding. | |
| """ | |
| if img1.shape != img2.shape: | |
| img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0])) | |
| # ---- Channel 1: Multi-scale LAB color difference ---- | |
| lab1 = cv2.cvtColor(img1, cv2.COLOR_RGB2LAB).astype(np.float32) | |
| lab2 = cv2.cvtColor(img2, cv2.COLOR_RGB2LAB).astype(np.float32) | |
| scales = [1, 2, 4] | |
| color_maps = [] | |
| for scale in scales: | |
| if scale > 1: | |
| s1 = cv2.resize(lab1, (lab1.shape[1] // scale, lab1.shape[0] // scale)) | |
| s2 = cv2.resize(lab2, (lab2.shape[1] // scale, lab2.shape[0] // scale)) | |
| else: | |
| s1, s2 = lab1, lab2 | |
| diff = s1 - s2 | |
| # Delta-E (CIE76) normalized | |
| delta_e = np.sqrt((diff[:, :, 0] / 100.0) ** 2 + | |
| (diff[:, :, 1] / 128.0) ** 2 + | |
| (diff[:, :, 2] / 128.0) ** 2) | |
| if scale > 1: | |
| delta_e = cv2.resize(delta_e, (lab1.shape[1], lab1.shape[0])) | |
| color_maps.append(delta_e) | |
| color_change = np.mean(color_maps, axis=0) | |
| color_change = color_change / (color_change.max() + 1e-8) | |
| # ---- Channel 2: SSIM structural dissimilarity ---- | |
| ssim_change = compute_ssim_change_map(img1, img2) | |
| ssim_change = ssim_change / (ssim_change.max() + 1e-8) | |
| # ---- Channel 3: Texture change (LBP) ---- | |
| texture_change = compute_texture_change(img1, img2) | |
| texture_change = texture_change / (texture_change.max() + 1e-8) | |
| # ---- Channel 4: Edge change ---- | |
| edge_change = compute_edge_change(img1, img2) | |
| # ---- Adaptive fusion ---- | |
| # Weight channels by their discriminative power (entropy-based) | |
| channels = [color_change, ssim_change, texture_change, edge_change] | |
| weights = [] | |
| for ch in channels: | |
| ch_uint8 = (ch * 255).astype(np.uint8) | |
| hist = cv2.calcHist([ch_uint8], [0], None, [256], [0, 256]).flatten() | |
| hist = hist / (hist.sum() + 1e-8) | |
| entropy = -np.sum(hist[hist > 0] * np.log2(hist[hist > 0] + 1e-10)) | |
| weights.append(entropy) | |
| # Normalize weights | |
| total_w = sum(weights) + 1e-8 | |
| weights = [w / total_w for w in weights] | |
| # Fuse | |
| fused = np.zeros_like(color_change, dtype=np.float64) | |
| for ch, w in zip(channels, weights): | |
| fused += w * ch.astype(np.float64) | |
| fused = fused / (fused.max() + 1e-8) | |
| fused_uint8 = (fused * 255).astype(np.uint8) | |
| # Adaptive threshold: Otsu + refinement | |
| _, change_mask = cv2.threshold(fused_uint8, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) | |
| # Post-process | |
| change_mask = _clean_mask(change_mask) | |
| # Edge-preserving smoothing on the mask | |
| change_mask = cv2.bilateralFilter(change_mask, 9, 75, 75) | |
| _, change_mask = cv2.threshold(change_mask, 127, 255, cv2.THRESH_BINARY) | |
| return change_mask | |
| def hybrid_method(img1, img2): | |
| """Hybrid: weighted fusion of all methods with confidence-based merging.""" | |
| if img1.shape != img2.shape: | |
| img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0])) | |
| diff_mask = image_difference_method(img1, img2) | |
| feature_mask = feature_based_method(img1, img2) | |
| ai_mask = ai_deep_learning_method(img1, img2) | |
| # Weighted combination: AI method gets most weight | |
| combined = ( | |
| 0.2 * diff_mask.astype(np.float32) + | |
| 0.3 * feature_mask.astype(np.float32) + | |
| 0.5 * ai_mask.astype(np.float32) | |
| ) | |
| _, final_mask = cv2.threshold(combined.astype(np.uint8), 127, 255, cv2.THRESH_BINARY) | |
| final_mask = _clean_mask(final_mask) | |
| return final_mask | |
| # --------------------------------------------------------------------------- | |
| # 8. Robust post-processing | |
| # --------------------------------------------------------------------------- | |
| def _clean_mask(mask, sensitivity=0.5): | |
| """Adaptive morphological cleaning: close gaps, remove noise, fill holes.""" | |
| # Close small gaps | |
| close_size = max(3, int(7 * (1 - sensitivity))) | |
| if close_size % 2 == 0: | |
| close_size += 1 | |
| kernel_close = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (close_size, close_size)) | |
| mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel_close) | |
| # Remove small noise | |
| open_size = 3 | |
| kernel_open = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (open_size, open_size)) | |
| mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel_open) | |
| # Fill small holes inside detected regions | |
| contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) | |
| filled = np.zeros_like(mask) | |
| cv2.drawContours(filled, contours, -1, 255, thickness=cv2.FILLED) | |
| return filled | |
| # --------------------------------------------------------------------------- | |
| # 9. Improved visualization | |
| # --------------------------------------------------------------------------- | |
| def visualize_changes(img1, img2, change_mask, regions=None): | |
| """Overlay change mask on 'after' image in RED.""" | |
| if img1.shape != img2.shape: | |
| img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0])) | |
| if change_mask.shape[:2] != img2.shape[:2]: | |
| change_mask = cv2.resize(change_mask, (img2.shape[1], img2.shape[0])) | |
| overlay = img2.copy().astype(np.float32) | |
| mask_bool = change_mask > 127 | |
| mask_float = mask_bool.astype(np.float32) | |
| # Red overlay for all detected changes | |
| red_layer = np.zeros_like(img2, dtype=np.float32) | |
| red_layer[:, :, 0] = 255 # pure red | |
| alpha = 0.50 | |
| for c in range(3): | |
| overlay[:, :, c] = overlay[:, :, c] * (1 - mask_float * alpha) + red_layer[:, :, c] * mask_float * alpha | |
| # Draw thin white outlines around each region for clarity | |
| if regions: | |
| contour_mask = np.zeros(change_mask.shape[:2], dtype=np.uint8) | |
| for r in regions: | |
| x, y, w, h = r["bbox"] | |
| cv2.rectangle(contour_mask, (x, y), (x + w, y + h), 255, 1) | |
| outline = contour_mask > 0 | |
| overlay[outline] = [255, 255, 255] | |
| return np.clip(overlay, 0, 255).astype(np.uint8) | |
| # --------------------------------------------------------------------------- | |
| # 10. Improved object classification | |
| # --------------------------------------------------------------------------- | |
| def extract_advanced_features(region): | |
| """Extract rich features for classification: color, texture, edge, shape.""" | |
| if region.size == 0 or region.shape[0] < 3 or region.shape[1] < 3: | |
| return None | |
| hsv = cv2.cvtColor(region, cv2.COLOR_RGB2HSV) | |
| lab = cv2.cvtColor(region, cv2.COLOR_RGB2LAB) | |
| gray = cv2.cvtColor(region, cv2.COLOR_RGB2GRAY).astype(np.float32) | |
| # Color stats | |
| mean_rgb = np.mean(region, axis=(0, 1)) | |
| std_rgb = np.std(region, axis=(0, 1)) | |
| mean_hsv = np.mean(hsv, axis=(0, 1)) | |
| mean_lab = np.mean(lab, axis=(0, 1)) | |
| total_rgb = np.sum(mean_rgb) + 1e-6 | |
| green_ratio = mean_rgb[1] / total_rgb | |
| blue_ratio = mean_rgb[2] / total_rgb | |
| red_ratio = mean_rgb[0] / total_rgb | |
| # Vegetation indices | |
| ndvi = (mean_rgb[1] - mean_rgb[0]) / (mean_rgb[1] + mean_rgb[0] + 1e-6) | |
| # Texture | |
| texture_std = float(np.std(gray)) | |
| lbp = compute_lbp(gray.astype(np.float32)) | |
| lbp_variance = float(np.var(lbp)) | |
| # Edges | |
| grad_x = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3) | |
| grad_y = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3) | |
| edge_mag = np.sqrt(grad_x ** 2 + grad_y ** 2) | |
| edge_density = float(np.mean(edge_mag)) | |
| # Edge orientation histogram (structural regularity) | |
| angles = np.arctan2(grad_y, grad_x + 1e-8) | |
| angle_hist, _ = np.histogram(angles, bins=8, range=(-np.pi, np.pi)) | |
| angle_hist = angle_hist / (angle_hist.sum() + 1e-8) | |
| orientation_entropy = -np.sum(angle_hist[angle_hist > 0] * np.log2(angle_hist[angle_hist > 0] + 1e-10)) | |
| # GLCM-like contrast (simplified: variance of neighbors) | |
| shifted_r = np.roll(gray, 1, axis=1) | |
| shifted_d = np.roll(gray, 1, axis=0) | |
| glcm_contrast = float(np.mean((gray - shifted_r) ** 2 + (gray - shifted_d) ** 2)) | |
| return { | |
| "mean_rgb": mean_rgb, "std_rgb": std_rgb, "mean_hsv": mean_hsv, "mean_lab": mean_lab, | |
| "ndvi": ndvi, "texture_std": texture_std, "lbp_variance": lbp_variance, | |
| "edge_density": edge_density, "orientation_entropy": orientation_entropy, | |
| "glcm_contrast": glcm_contrast, | |
| "color_homogeneity": float(np.mean(std_rgb)), | |
| "brightness": float(mean_lab[0]), | |
| "green_ratio": green_ratio, "blue_ratio": blue_ratio, "red_ratio": red_ratio, | |
| "saturation": float(mean_hsv[1]), "hue": float(mean_hsv[0]), | |
| } | |
| def _is_transient_object(area, w, h, features): | |
| """ | |
| Filter out transient objects (people, cars, animals, shadows, etc.) | |
| that are NOT permanent ground/structural changes. | |
| Returns True if the region is likely transient and should be excluded. | |
| """ | |
| aspect_ratio = max(w, h) / max(min(w, h), 1) | |
| # Very small regions are likely noise, people, or small vehicles | |
| if area < 300: | |
| return True | |
| # Tall narrow regions (aspect > 4) are likely people or poles | |
| if aspect_ratio > 5.0 and area < 2000: | |
| return True | |
| # Very high edge density + small area = likely a person or vehicle | |
| if features["edge_density"] > 80 and area < 1500: | |
| return True | |
| # Extremely high texture variance in small area = likely transient clutter | |
| if features["texture_std"] > 60 and area < 1000: | |
| return True | |
| return False | |
| # Ground-level change categories only | |
| GROUND_CHANGE_TYPES = [ | |
| "New Construction/Building", | |
| "Demolition/Clearing", | |
| "Vegetation Change", | |
| "Water Body Change", | |
| "Road/Pavement Change", | |
| "Bare Land/Soil Change", | |
| ] | |
| def classify_object_type(image_region, bbox): | |
| """ | |
| Classify GROUND-LEVEL structural changes only. | |
| Categories: construction, demolition, vegetation, water, road, bare land. | |
| Transient objects (people, cars, animals) are filtered out. | |
| """ | |
| x, y, w, h = bbox | |
| pad = 5 | |
| y1 = max(0, y - pad) | |
| y2 = min(image_region.shape[0], y + h + pad) | |
| x1 = max(0, x - pad) | |
| x2 = min(image_region.shape[1], x + w + pad) | |
| region = image_region[y1:y2, x1:x2] | |
| if region.size == 0 or region.shape[0] < 3 or region.shape[1] < 3: | |
| return "Unclassified", 0.0 | |
| features = extract_advanced_features(region) | |
| if features is None: | |
| return "Unclassified", 0.0 | |
| area = w * h | |
| # Filter out transient objects (people, cars, animals) | |
| if _is_transient_object(area, w, h, features): | |
| return None, 0.0 # signal to exclude this region | |
| aspect_ratio = max(w, h) / max(min(w, h), 1) | |
| compactness = (4 * np.pi * area) / ((2 * (w + h)) ** 2 + 1e-6) | |
| scores = {} | |
| # ---- Water Body Change ---- | |
| water = 0.0 | |
| if features["blue_ratio"] > 0.36: | |
| water += 0.22 | |
| if features["texture_std"] < 28: | |
| water += 0.18 | |
| if features["edge_density"] < 35: | |
| water += 0.14 | |
| if 90 <= features["hue"] <= 135: | |
| water += 0.18 | |
| if features["lbp_variance"] < 0.05: | |
| water += 0.14 | |
| if features["glcm_contrast"] < 500: | |
| water += 0.10 | |
| if area > 800: | |
| water += 0.04 | |
| scores["Water Body Change"] = water | |
| # ---- Vegetation Change (deforestation, new growth, crop change) ---- | |
| veg = 0.0 | |
| if features["ndvi"] > 0.05: | |
| veg += 0.22 | |
| if features["ndvi"] > 0.15: | |
| veg += 0.10 | |
| if features["green_ratio"] > 0.36: | |
| veg += 0.18 | |
| if 35 <= features["hue"] <= 85: | |
| veg += 0.15 | |
| if features["texture_std"] > 18: | |
| veg += 0.08 | |
| if features["lbp_variance"] > 0.03: | |
| veg += 0.08 | |
| if features["saturation"] > 40: | |
| veg += 0.10 | |
| if features["orientation_entropy"] > 2.5: | |
| veg += 0.05 | |
| if area > 500: | |
| veg += 0.04 | |
| scores["Vegetation Change"] = veg | |
| # ---- New Construction/Building ---- | |
| bld = 0.0 | |
| if features["orientation_entropy"] < 2.5: | |
| bld += 0.18 | |
| if features["color_homogeneity"] < 28: | |
| bld += 0.15 | |
| if 1.0 <= aspect_ratio <= 4.0: | |
| bld += 0.12 | |
| if 0.3 <= compactness <= 0.9: | |
| bld += 0.10 | |
| if features["edge_density"] > 30: | |
| bld += 0.12 | |
| if features["glcm_contrast"] > 400: | |
| bld += 0.10 | |
| if features["saturation"] < 90: | |
| bld += 0.10 | |
| if 40 <= features["brightness"] <= 90: | |
| bld += 0.08 | |
| if area > 1000: | |
| bld += 0.05 | |
| scores["New Construction/Building"] = bld | |
| # ---- Demolition/Clearing ---- | |
| demo = 0.0 | |
| if features["texture_std"] > 30: | |
| demo += 0.18 | |
| if features["orientation_entropy"] > 2.8: | |
| demo += 0.15 | |
| if features["color_homogeneity"] > 25: | |
| demo += 0.15 | |
| if features["brightness"] > 60: | |
| demo += 0.10 | |
| if features["ndvi"] < 0.05: | |
| demo += 0.12 | |
| if features["saturation"] < 70: | |
| demo += 0.10 | |
| if area > 800: | |
| demo += 0.05 | |
| scores["Demolition/Clearing"] = demo | |
| # ---- Road/Pavement Change ---- | |
| road = 0.0 | |
| if aspect_ratio > 2.5: | |
| road += 0.22 | |
| if features["color_homogeneity"] < 22: | |
| road += 0.18 | |
| if features["texture_std"] < 32: | |
| road += 0.15 | |
| if features["saturation"] < 65: | |
| road += 0.12 | |
| if features["orientation_entropy"] < 2.0: | |
| road += 0.15 | |
| if 35 <= features["brightness"] <= 75: | |
| road += 0.10 | |
| if compactness < 0.3: | |
| road += 0.05 | |
| if area > 600: | |
| road += 0.03 | |
| scores["Road/Pavement Change"] = road | |
| # ---- Bare Land/Soil Change ---- | |
| soil = 0.0 | |
| if features["red_ratio"] > 0.34 and features["green_ratio"] < 0.36: | |
| soil += 0.20 | |
| if 8 <= features["hue"] <= 38: | |
| soil += 0.18 | |
| if features["ndvi"] < 0.05: | |
| soil += 0.18 | |
| if features["texture_std"] < 35: | |
| soil += 0.12 | |
| if features["lbp_variance"] < 0.04: | |
| soil += 0.12 | |
| if 40 <= features["saturation"] <= 130: | |
| soil += 0.10 | |
| if 45 <= features["brightness"] <= 82: | |
| soil += 0.10 | |
| scores["Bare Land/Soil Change"] = soil | |
| # Normalize scores | |
| max_score = max(scores.values()) if scores else 0 | |
| if max_score > 0: | |
| for k in scores: | |
| scores[k] /= max_score | |
| best = max(scores, key=scores.get) | |
| conf = scores[best] | |
| if conf < 0.30: | |
| return "Unclassified", conf | |
| return best, min(conf, 1.0) | |
| def classify_with_ensemble(image_region, bbox, num_sub=4): | |
| """Ensemble: classify full region + sub-regions, vote with confidence weighting.""" | |
| x, y, w, h = bbox | |
| sub_boxes = [(x, y, w, h)] # full region | |
| if w > 20 and h > 20: | |
| hw, hh = w // 2, h // 2 | |
| sub_boxes += [ | |
| (x, y, hw, hh), | |
| (x + hw, y, hw, hh), | |
| (x, y + hh, hw, hh), | |
| (x + hw, y + hh, hw, hh), | |
| (x + w // 4, y + h // 4, hw, hh), | |
| ] | |
| classifications = [] | |
| confidences = [] | |
| for sb in sub_boxes: | |
| obj_type, conf = classify_object_type(image_region, sb) | |
| if obj_type is None: | |
| return None, 0.0 # transient → exclude | |
| if obj_type != "Unclassified": | |
| classifications.append(obj_type) | |
| confidences.append(conf) | |
| if not classifications: | |
| return classify_object_type(image_region, (x, y, w, h)) | |
| # Weighted voting | |
| weighted = {} | |
| counts = Counter(classifications) | |
| for ot, c in zip(classifications, confidences): | |
| weighted[ot] = weighted.get(ot, 0) + c | |
| best_type = max(weighted, key=weighted.get) | |
| avg_conf = weighted[best_type] / counts[best_type] | |
| if counts[best_type] / len(classifications) >= 0.6: | |
| avg_conf = min(1.0, avg_conf * 1.15) | |
| return best_type, avg_conf | |
| # --------------------------------------------------------------------------- | |
| # 11. Region analysis | |
| # --------------------------------------------------------------------------- | |
| def analyze_change_regions(change_mask, image, min_area=200, use_ensemble=True): | |
| """ | |
| Find connected change regions, classify as ground-level changes only. | |
| Transient objects (people, cars, animals) are filtered out. | |
| """ | |
| num_labels, labels, stats, centroids = cv2.connectedComponentsWithStats(change_mask, connectivity=8) | |
| change_regions = [] | |
| region_id = 0 | |
| for i in range(1, num_labels): | |
| area = stats[i, cv2.CC_STAT_AREA] | |
| if area < min_area: | |
| continue | |
| x = stats[i, cv2.CC_STAT_LEFT] | |
| y = stats[i, cv2.CC_STAT_TOP] | |
| w = stats[i, cv2.CC_STAT_WIDTH] | |
| h = stats[i, cv2.CC_STAT_HEIGHT] | |
| cx, cy = centroids[i] | |
| if use_ensemble and area > 500: | |
| object_type, confidence = classify_with_ensemble(image, (x, y, w, h)) | |
| else: | |
| object_type, confidence = classify_object_type(image, (x, y, w, h)) | |
| # None means transient / irrelevant → skip | |
| if object_type is None: | |
| continue | |
| region_id += 1 | |
| change_regions.append({ | |
| "id": region_id, | |
| "area": area, | |
| "bbox": (x, y, w, h), | |
| "center": (int(cx), int(cy)), | |
| "object_type": object_type, | |
| "confidence": confidence, | |
| }) | |
| change_regions.sort(key=lambda r: r["area"], reverse=True) | |
| return change_regions | |
| # --------------------------------------------------------------------------- | |
| # 12. Main pipeline | |
| # --------------------------------------------------------------------------- | |
| def run_detection(before_pil, after_pil, method="AI-Based Deep Learning", | |
| enable_registration=True, enable_normalization=True): | |
| """Run full detection pipeline; returns change_mask, result_image, stats, regions.""" | |
| before_array = preprocess_image(before_pil) | |
| after_array = preprocess_image(after_pil) | |
| if enable_registration: | |
| before_array, after_array, _ = register_images(before_array, after_array) | |
| if enable_normalization: | |
| before_array, after_array = normalize_radiometry(before_array, after_array) | |
| if method == "AI-Based Deep Learning": | |
| change_mask = ai_deep_learning_method(before_array, after_array) | |
| elif method == "Image Difference": | |
| change_mask = image_difference_method(before_array, after_array) | |
| elif method == "Feature-Based": | |
| change_mask = feature_based_method(before_array, after_array) | |
| else: | |
| change_mask = hybrid_method(before_array, after_array) | |
| # Classify regions | |
| change_regions = analyze_change_regions(change_mask, after_array, min_area=80) | |
| # Color-coded visualization using region classifications | |
| result_image = visualize_changes(before_array, after_array, change_mask, regions=change_regions) | |
| total_pixels = int(change_mask.shape[0] * change_mask.shape[1]) | |
| changed_pixels = int(np.sum(change_mask > 127)) | |
| change_pct = (changed_pixels / total_pixels * 100.0) if total_pixels else 0.0 | |
| stats = { | |
| "total_pixels": total_pixels, | |
| "changed_pixels": changed_pixels, | |
| "unchanged_pixels": total_pixels - changed_pixels, | |
| "change_percentage": change_pct, | |
| "image_width": change_mask.shape[1], | |
| "image_height": change_mask.shape[0], | |
| } | |
| return change_mask, result_image, stats, change_regions | |