Spaces:
Sleeping
Sleeping
| """ | |
| Satellite Change Detection Engine v3 | |
| High-accuracy detection with multi-channel analysis, SSIM, CVA, texture features, | |
| adaptive thresholding, vegetation/shadow suppression, SNR-weighted fusion, | |
| and improved object classification. | |
| """ | |
| import numpy as np | |
| import cv2 | |
| from PIL import Image | |
| from sklearn.cluster import KMeans | |
| from sklearn.preprocessing import StandardScaler | |
| from collections import Counter | |
| # --------------------------------------------------------------------------- | |
| # 1. Pre-processing | |
| # --------------------------------------------------------------------------- | |
def preprocess_image(image):
    """Return a denoised 3-channel RGB array whose longest side is at most 2000 px.

    2-D (grayscale) input and 4-channel (RGBA) input are converted to RGB.
    Any other layout that is not already H x W x 3 raises ``ValueError``.
    Oversized images are shrunk with area interpolation, and a bilateral
    filter is applied as the final step.
    """
    pixels = np.array(image)
    already_rgb = pixels.ndim == 3 and pixels.shape[2] == 3

    if pixels.ndim == 2:
        pixels = cv2.cvtColor(pixels, cv2.COLOR_GRAY2RGB)
    elif pixels.ndim == 3 and pixels.shape[2] == 4:
        pixels = cv2.cvtColor(pixels, cv2.COLOR_RGBA2RGB)
    elif not already_rgb:
        raise ValueError(f"Unsupported image shape: {pixels.shape}")

    longest_allowed = 2000
    rows, cols = pixels.shape[:2]
    longest = max(rows, cols)
    if longest > longest_allowed:
        factor = longest_allowed / longest
        target_size = (max(1, int(cols * factor)), max(1, int(rows * factor)))
        pixels = cv2.resize(pixels, target_size, interpolation=cv2.INTER_AREA)

    # Edge-preserving smoothing (diameter 9, sigmaColor 75, sigmaSpace 75).
    return cv2.bilateralFilter(pixels, 9, 75, 75)
| # --------------------------------------------------------------------------- | |
| # 2. Improved image registration (alignment) | |
| # --------------------------------------------------------------------------- | |
def register_images(img1, img2, max_features=2000):
    """Warp ``img2`` onto ``img1`` using ORB matches and a RANSAC homography.

    Returns ``(img1, aligned_img2, True)`` on success. The result of
    ``_register_images_ecc_fallback`` is returned instead when any of these
    hold: either image yields fewer than 10 descriptors, fewer than 10
    matches survive the 0.75 ratio test, no homography is found, the RANSAC
    inlier ratio is below 0.3, or ``|det(H)|`` lies outside [0.1, 10].
    """
    reference_gray = cv2.cvtColor(img1, cv2.COLOR_RGB2GRAY)
    moving_gray = cv2.cvtColor(img2, cv2.COLOR_RGB2GRAY)

    detector = cv2.ORB_create(nfeatures=max_features, scoreType=cv2.ORB_HARRIS_SCORE)
    ref_kps, ref_desc = detector.detectAndCompute(reference_gray, None)
    mov_kps, mov_desc = detector.detectAndCompute(moving_gray, None)

    enough_descriptors = (
        ref_desc is not None
        and mov_desc is not None
        and len(ref_desc) >= 10
        and len(mov_desc) >= 10
    )
    if not enough_descriptors:
        return _register_images_ecc_fallback(img1, img2)

    # 2-nearest-neighbour matching followed by Lowe's ratio test.
    matcher = cv2.BFMatcher(cv2.NORM_HAMMING)
    candidates = matcher.knnMatch(ref_desc, mov_desc, k=2)
    kept = [
        pair[0]
        for pair in candidates
        if len(pair) == 2 and pair[0].distance < 0.75 * pair[1].distance
    ]
    if len(kept) < 10:
        return _register_images_ecc_fallback(img1, img2)

    ref_pts = np.float32([ref_kps[m.queryIdx].pt for m in kept]).reshape(-1, 1, 2)
    mov_pts = np.float32([mov_kps[m.trainIdx].pt for m in kept]).reshape(-1, 1, 2)

    # Homography maps img2 coordinates onto img1 coordinates.
    homography, inlier_mask = cv2.findHomography(mov_pts, ref_pts, cv2.RANSAC, 3.0)
    if homography is None:
        return _register_images_ecc_fallback(img1, img2)

    if inlier_mask is None:
        inlier_ratio = 0
    else:
        inlier_ratio = np.sum(inlier_mask) / len(inlier_mask)
    if inlier_ratio < 0.3:
        return _register_images_ecc_fallback(img1, img2)

    # A determinant far from 1 signals a near-singular or wildly distorting warp.
    det_abs = abs(np.linalg.det(homography))
    if det_abs < 0.1 or det_abs > 10.0:
        return _register_images_ecc_fallback(img1, img2)

    rows, cols = img1.shape[:2]
    warped = cv2.warpPerspective(
        img2, homography, (cols, rows), borderMode=cv2.BORDER_REFLECT
    )
    return img1, warped, True
def _register_images_ecc_fallback(img1, img2):
    """Align ``img2`` to ``img1`` with an ECC-estimated affine transform.

    Returns ``(img1, aligned_img2, ok)`` where ``ok`` is True only when the
    ECC correlation coefficient is at least 0.45. If anything raises, the
    inputs are returned unchanged with ``ok`` set to False.
    """

    def _unit_gray(rgb):
        # Grayscale scaled to [0, 1] as float32, the form handed to ECC.
        return cv2.cvtColor(rgb, cv2.COLOR_RGB2GRAY).astype(np.float32) / 255.0

    try:
        template = _unit_gray(img1)
        moving = _unit_gray(img2)

        initial_warp = np.eye(2, 3, dtype=np.float32)
        stop_rule = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 200, 1e-6)
        correlation, affine = cv2.findTransformECC(
            template, moving, initial_warp, cv2.MOTION_AFFINE, stop_rule
        )

        rows, cols = img1.shape[:2]
        warped = cv2.warpAffine(
            img2,
            affine,
            (cols, rows),
            flags=cv2.INTER_LINEAR + cv2.WARP_INVERSE_MAP,
            borderMode=cv2.BORDER_REFLECT,
        )
        # The warped image is returned even when the correlation is too low;
        # only the success flag reflects the quality check.
        return img1, warped, bool(correlation >= 0.45)
    except Exception:
        return img1, img2, False
| # --------------------------------------------------------------------------- | |
| # 3. Improved radiometric normalization | |
| # --------------------------------------------------------------------------- | |
def normalize_radiometry(img1, img2):
    """Match ``img2``'s per-channel LAB mean/std to ``img1``, then CLAHE both.

    Each LAB channel of ``img2`` is shifted and scaled so its mean and
    standard deviation equal those of ``img1`` (channels whose std is not
    above 1e-6 are left as-is). CLAHE is then applied to the L channel of
    both images. Returns ``(img1_out, img2_out)`` converted back to RGB.
    """
    reference_lab = cv2.cvtColor(img1, cv2.COLOR_RGB2LAB).astype(np.float32)
    target_lab = cv2.cvtColor(img2, cv2.COLOR_RGB2LAB).astype(np.float32)

    matched = target_lab.copy()
    for idx in range(3):
        ref_plane = reference_lab[:, :, idx]
        tgt_plane = target_lab[:, :, idx]
        ref_mean, ref_std = np.mean(ref_plane), np.std(ref_plane)
        tgt_mean, tgt_std = np.mean(tgt_plane), np.std(tgt_plane)
        if tgt_std > 1e-6:
            matched[:, :, idx] = (tgt_plane - tgt_mean) * (ref_std / tgt_std) + ref_mean
    matched_u8 = np.clip(matched, 0, 255).astype(np.uint8)

    # Same equaliser on both L channels so later comparisons stay symmetric.
    equalizer = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    reference_u8 = cv2.cvtColor(img1, cv2.COLOR_RGB2LAB)
    reference_u8[:, :, 0] = equalizer.apply(reference_u8[:, :, 0])
    matched_u8[:, :, 0] = equalizer.apply(matched_u8[:, :, 0])

    return (
        cv2.cvtColor(reference_u8, cv2.COLOR_LAB2RGB),
        cv2.cvtColor(matched_u8, cv2.COLOR_LAB2RGB),
    )
| # --------------------------------------------------------------------------- | |
| # 4. Vegetation suppression | |
| # --------------------------------------------------------------------------- | |
def compute_vegetation_mask(img):
    """Return a blurred float32 vegetation score map in [0, 1].

    Two binary cues are combined: a green-vs-red normalised difference
    above 0.08 (weight 0.6) and an HSV test of hue in [35, 85] with
    saturation above 30 (weight 0.4). The weighted sum is smoothed with
    an 11x11 Gaussian.
    """
    red = img[:, :, 0].astype(np.float32)
    green = img[:, :, 1].astype(np.float32)
    greenness = (green - red) / (green + red + 1e-6)

    hsv = cv2.cvtColor(img, cv2.COLOR_RGB2HSV)
    hue, saturation = (hsv[:, :, k].astype(np.float32) for k in (0, 1))

    by_index = (greenness > 0.08).astype(np.float32)
    by_hue = ((hue >= 35) & (hue <= 85) & (saturation > 30)).astype(np.float32)

    score = np.clip(by_index * 0.6 + by_hue * 0.4, 0, 1)
    return cv2.GaussianBlur(score, (11, 11), 0)
def compute_combined_vegetation_suppression(img1, img2):
    """Build a per-pixel multiplier from the two vegetation masks.

    Pixels vegetated in both images are damped (factor down to 0.3);
    pixels vegetated in only one image are boosted (factor up to 1.3).
    The two factors are multiplied; 1.0 means neutral. Returns float32.
    """
    before = compute_vegetation_mask(img1)
    after = compute_vegetation_mask(img2)

    shared = np.minimum(before, after)      # vegetated in both images
    exclusive = np.abs(before - after)      # vegetated in only one image

    damping = 1.0 - shared * 0.7
    gain = 1.0 + exclusive * 0.3
    return (damping * gain).astype(np.float32)
| # --------------------------------------------------------------------------- | |
| # 5. Shadow / illumination-only change suppression | |
| # --------------------------------------------------------------------------- | |
def compute_shadow_suppression(img1, img2):
    """Return a float32 multiplier that damps lightness-only differences.

    A pixel is flagged when its LAB L difference exceeds 18 while the
    summed A and B differences stay below 12. The flag map is blurred
    (9x9 Gaussian) and converted to ``1 - 0.8 * flag``, so unflagged
    pixels keep 1.0 and fully flagged pixels drop to 0.2.
    """
    lab_before = cv2.cvtColor(img1, cv2.COLOR_RGB2LAB).astype(np.float32)
    lab_after = cv2.cvtColor(img2, cv2.COLOR_RGB2LAB).astype(np.float32)

    abs_delta = np.abs(lab_before - lab_after)
    lightness_shift = abs_delta[:, :, 0]
    chroma_shift = abs_delta[:, :, 1] + abs_delta[:, :, 2]

    flagged = ((lightness_shift > 18) & (chroma_shift < 12)).astype(np.float32)
    flagged = cv2.GaussianBlur(flagged, (9, 9), 0)

    return (1.0 - flagged * 0.8).astype(np.float32)
| # --------------------------------------------------------------------------- | |
| # 6. Change Vector Analysis (CVA) | |
| # --------------------------------------------------------------------------- | |
def compute_cva(img1, img2):
    """Change-vector magnitude in LAB with lightness-dominated changes damped.

    The per-pixel LAB difference is scaled per channel, its Euclidean
    magnitude is multiplied by ``clip(2.5 * chroma_share, 0.15, 1)``, and
    the result is divided by its 99.5th percentile and clipped to [0, 1]
    (the division is skipped when that percentile is not above 1e-8).
    Returns float32.
    """
    before = cv2.cvtColor(img1, cv2.COLOR_RGB2LAB).astype(np.float32)
    after = cv2.cvtColor(img2, cv2.COLOR_RGB2LAB).astype(np.float32)

    delta = after - before
    # NOTE(review): OpenCV's 8-bit LAB stores L rescaled to 0..255, so the
    # /100 divisor may weight lightness more than intended - confirm.
    d_light = delta[:, :, 0] / 100.0
    d_a = delta[:, :, 1] / 128.0
    d_b = delta[:, :, 2] / 128.0

    strength = np.sqrt(d_light ** 2 + d_a ** 2 + d_b ** 2)
    chroma_share = np.sqrt(d_a ** 2 + d_b ** 2) / (strength + 1e-8)

    # A low chroma share means the change is mostly brightness: damp it.
    strength = strength * np.clip(chroma_share * 2.5, 0.15, 1.0)

    ceiling = float(np.quantile(strength, 0.995))
    if ceiling > 1e-8:
        strength = np.clip(strength / ceiling, 0, 1)
    return strength.astype(np.float32)
| # --------------------------------------------------------------------------- | |
| # 7. SSIM-based structural change map | |
| # --------------------------------------------------------------------------- | |
def _ssim_at_scale(gray1, gray2, win_size=11):
    """Return the per-pixel structural dissimilarity ``(1 - SSIM) / 2`` in [0, 1].

    Local statistics use a Gaussian window of ``win_size`` with sigma
    ``win_size / 6``; the stabilising constants assume a 0..255 range.
    """
    sigma = win_size / 6.0
    window = (win_size, win_size)

    def _local_mean(values):
        return cv2.GaussianBlur(values, window, sigma)

    c1 = (0.01 * 255) ** 2
    c2 = (0.03 * 255) ** 2

    mean_a = _local_mean(gray1)
    mean_b = _local_mean(gray2)
    mean_a_sq = mean_a * mean_a
    mean_b_sq = mean_b * mean_b
    mean_ab = mean_a * mean_b

    # Variances are floored at zero to absorb small negative rounding errors.
    var_a = np.maximum(_local_mean(gray1 * gray1) - mean_a_sq, 0)
    var_b = np.maximum(_local_mean(gray2 * gray2) - mean_b_sq, 0)
    covariance = _local_mean(gray1 * gray2) - mean_ab

    denominator = (mean_a_sq + mean_b_sq + c1) * (var_a + var_b + c2)
    numerator = (2 * mean_ab + c1) * (2 * covariance + c2)
    similarity = numerator / (denominator + 1e-12)

    return np.clip((1.0 - similarity) / 2.0, 0, 1)
def compute_ssim_change_map(img1, img2, win_size=11):
    """Blend SSIM dissimilarity from full and half resolution (0.6 / 0.4).

    The half-resolution pass uses an odd window of at least 3 derived from
    ``win_size // 2`` and is resized back to the full size before blending.
    """
    full_a = cv2.cvtColor(img1, cv2.COLOR_RGB2GRAY).astype(np.float64)
    full_b = cv2.cvtColor(img2, cv2.COLOR_RGB2GRAY).astype(np.float64)
    fine = _ssim_at_scale(full_a, full_b, win_size)

    rows, cols = full_a.shape
    half_size = (max(1, cols // 2), max(1, rows // 2))
    half_a = cv2.resize(full_a, half_size)
    half_b = cv2.resize(full_b, half_size)

    coarse_window = max(3, win_size // 2) | 1  # "| 1" forces an odd size
    coarse = _ssim_at_scale(half_a, half_b, coarse_window)
    coarse_full = cv2.resize(coarse, (cols, rows))

    return 0.6 * fine + 0.4 * coarse_full
| # --------------------------------------------------------------------------- | |
| # 8. Texture feature extraction (LBP) | |
| # --------------------------------------------------------------------------- | |
def compute_lbp(gray, radius=1, n_points=8):
    """Compute a simplified Local Binary Pattern texture descriptor.

    For each of ``n_points`` directions on a circle of ``radius`` (offsets
    rounded to whole pixels), the neighbour is compared with the centre
    pixel; the result is the fraction of neighbours that are ``>=`` the
    centre.

    Neighbours that fall outside the image are taken from the nearest edge
    pixel. (The previous ``np.roll`` implementation wrapped around, so
    border pixels were compared with pixels from the opposite side of the
    image.)

    Args:
        gray: 2-D array of intensities.
        radius: Sampling radius in pixels.
        n_points: Number of sampling directions.

    Returns:
        float32 array of the same shape as ``gray`` with values in [0, 1].
    """
    rows, cols = gray.shape
    # Every rounded offset satisfies |offset| <= ceil(|radius|).
    pad = int(np.ceil(abs(radius)))
    padded = np.pad(gray, pad, mode="edge")

    lbp = np.zeros(gray.shape, dtype=np.float32)
    for i in range(n_points):
        angle = 2 * np.pi * i / n_points
        dx = int(round(radius * np.cos(angle)))
        dy = int(round(-radius * np.sin(angle)))
        # Same neighbour as roll-by-(dy, dx): neighbour[y, x] = gray[y - dy, x - dx].
        row0 = pad - dy
        col0 = pad - dx
        neighbour = padded[row0:row0 + rows, col0:col0 + cols]
        lbp += (neighbour >= gray).astype(np.float32)
    return lbp / n_points
def compute_texture_change(img1, img2):
    """Return the absolute difference between the two images' LBP maps."""
    lbp_before, lbp_after = (
        compute_lbp(cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY).astype(np.float32))
        for frame in (img1, img2)
    )
    return np.abs(lbp_before - lbp_after)
| # --------------------------------------------------------------------------- | |
| # 9. Edge-aware change detection | |
| # --------------------------------------------------------------------------- | |
def compute_edge_change(img1, img2):
    """Return a float32 map (0 or 1) of edges present in only one image.

    Canny thresholds are set per image to 0.67x and 1.33x of its median
    gray level (clamped to 0..255). Both edge maps are dilated by a 3x3
    kernel so slightly offset edges cancel, then absolutely differenced.
    """
    grow = np.ones((3, 3), np.uint8)

    def _dilated_edges(rgb):
        gray = cv2.cvtColor(rgb, cv2.COLOR_RGB2GRAY)
        mid = np.median(gray)
        low = int(max(0, 0.67 * mid))
        high = int(min(255, 1.33 * mid))
        return cv2.dilate(cv2.Canny(gray, low, high), grow, iterations=1)

    edges_before = _dilated_edges(img1)
    edges_after = _dilated_edges(img2)
    return cv2.absdiff(edges_before, edges_after).astype(np.float32) / 255.0
| # --------------------------------------------------------------------------- | |
| # 10. Improved detection methods | |
| # --------------------------------------------------------------------------- | |
def _adaptive_binary_threshold(score_uint8, min_floor=25, sensitivity=0.5):
    """Binarise a uint8 score map with a noise-aware threshold.

    The base threshold is the largest of ``min_floor``, the Otsu level and
    ``median + 3 * MAD``. Sensitivity (clamped to [0, 1]) then shifts it by
    up to about 12 levels either way: higher sensitivity lowers it.

    Returns:
        ``(mask, threshold, otsu_level, noise_floor)``.
    """
    otsu_level, _ = cv2.threshold(
        score_uint8, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU
    )

    center = float(np.median(score_uint8))
    spread = float(np.median(np.abs(score_uint8.astype(np.float32) - center)))
    robust_floor = center + 3.0 * spread

    clamped = float(np.clip(sensitivity, 0.0, 1.0))
    offset = int((0.5 - clamped) * 24)

    cutoff = int(max(min_floor, otsu_level, robust_floor) + offset)
    cutoff = max(0, min(255, cutoff))

    _, binary = cv2.threshold(score_uint8, cutoff, 255, cv2.THRESH_BINARY)
    return binary, cutoff, float(otsu_level), float(robust_floor)
def image_difference_method(img1, img2, threshold=0.25, blur_size=5, sensitivity=0.5):
    """Detect changes from a blurred, channel-weighted LAB difference.

    Args:
        img1: Reference RGB image.
        img2: Comparison RGB image; resized to ``img1``'s size if it differs.
        threshold: Accepted for interface compatibility; not used, because
            the cut-off is chosen by ``_adaptive_binary_threshold``.
        blur_size: Gaussian kernel size. Coerced to a positive odd integer,
            since ``cv2.GaussianBlur`` rejects even or non-positive sizes.
        sensitivity: Passed to the adaptive threshold (higher detects more).

    Returns:
        ``(change_mask, debug)`` where ``change_mask`` is the cleaned
        binary mask and ``debug`` records the thresholds that were used.
    """
    if img1.shape != img2.shape:
        img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0]))

    # Previously an even blur_size raised inside GaussianBlur.
    kernel = max(1, int(blur_size))
    if kernel % 2 == 0:
        kernel += 1
    ksize = (kernel, kernel)

    lab1 = cv2.cvtColor(img1, cv2.COLOR_RGB2LAB).astype(np.float32)
    lab2 = cv2.cvtColor(img2, cv2.COLOR_RGB2LAB).astype(np.float32)
    diff = cv2.GaussianBlur(lab1, ksize, 0) - cv2.GaussianBlur(lab2, ksize, 0)

    # Delta-E-style magnitude with per-channel scaling.
    delta_e = np.sqrt(
        (diff[:, :, 0] / 100.0) ** 2 +
        (diff[:, :, 1] / 128.0) ** 2 +
        (diff[:, :, 2] / 128.0) ** 2
    )
    peak = delta_e.max()
    if peak > 0:
        delta_e = delta_e / peak
    delta_uint8 = (delta_e * 255).astype(np.uint8)

    change_mask, used_thr, otsu_val, noise_floor = _adaptive_binary_threshold(
        delta_uint8, min_floor=30, sensitivity=sensitivity
    )
    change_mask = _clean_mask(change_mask)

    debug = {
        "method": "Image Difference",
        "threshold_used": int(used_thr),
        "otsu": float(otsu_val),
        "noise_floor": float(noise_floor),
        "sensitivity": float(sensitivity),
    }
    return change_mask, debug
def feature_based_method(img1, img2, num_clusters=4, sensitivity=0.5):
    """Detect changes by clustering per-pixel LAB/HSV difference features.

    Features are the absolute LAB difference (3 channels) plus the
    absolute HSV saturation and value differences. They are standardised
    and clustered with KMeans (on a downsampled copy when the image has
    more than 250,000 pixels); the cluster with the largest mean raw
    feature norm is labelled as change.

    Args:
        img1: Reference RGB image.
        img2: Comparison RGB image; resized to ``img1``'s size if it differs.
        num_clusters: Number of KMeans clusters.
        sensitivity: Passed to ``_clean_mask``.

    Returns:
        Cleaned uint8 change mask (0 / 255) of ``img1``'s height and width.
        Unlike the other detection methods, no debug dict is returned.
    """
    if img1.shape != img2.shape:
        img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0]))

    lab1 = cv2.cvtColor(img1, cv2.COLOR_RGB2LAB).astype(np.float32)
    lab2 = cv2.cvtColor(img2, cv2.COLOR_RGB2LAB).astype(np.float32)
    hsv1 = cv2.cvtColor(img1, cv2.COLOR_RGB2HSV).astype(np.float32)
    hsv2 = cv2.cvtColor(img2, cv2.COLOR_RGB2HSV).astype(np.float32)

    diff_lab = np.abs(lab1 - lab2)
    diff_hsv = np.abs(hsv1 - hsv2)
    h, w, _ = diff_lab.shape
    # The hue channel (index 0) is left out of the feature vector.
    features = np.concatenate([diff_lab, diff_hsv[:, :, 1:]], axis=2)

    # With no difference anywhere there is nothing to cluster. Without this
    # guard the arg-max below would still pick a cluster and label every
    # pixel as changed.
    if not features.any():
        return np.zeros((h, w), dtype=np.uint8)

    # Fit KMeans on a downsampled copy when the image is large.
    MAX_PIXELS = 250_000
    total = h * w
    if total > MAX_PIXELS:
        scale = np.sqrt(MAX_PIXELS / total)
        sh, sw = max(1, int(h * scale)), max(1, int(w * scale))
        features_small = cv2.resize(features, (sw, sh))
    else:
        features_small = features

    features_flat = features_small.reshape(-1, features_small.shape[2])
    scaler = StandardScaler()
    features_scaled = scaler.fit_transform(features_flat)
    kmeans = KMeans(n_clusters=num_clusters, random_state=42, n_init=10)
    labels_small = kmeans.fit_predict(features_scaled)

    # Rank clusters by the mean norm of their unscaled feature vectors.
    cluster_means = [
        np.mean(np.linalg.norm(features_flat[labels_small == i], axis=1))
        if np.any(labels_small == i) else 0.0
        for i in range(num_clusters)
    ]
    change_cluster_idx = np.argmax(cluster_means)

    # Large images: assign every full-resolution pixel to a fitted cluster.
    if total > MAX_PIXELS:
        full_flat = features.reshape(-1, features.shape[2])
        labels = kmeans.predict(scaler.transform(full_flat))
    else:
        labels = labels_small

    change_mask = (labels == change_cluster_idx).astype(np.uint8) * 255
    change_mask = change_mask.reshape(h, w)
    return _clean_mask(change_mask, sensitivity)
| def _snr_weight(channel): | |
| """ | |
| Signal-to-noise ratio weight: signal = mean of top 5% values, | |
| noise = std of bottom 50%. Channels with concentrated high responses | |
| score higher than uniformly noisy ones. | |
| """ | |
| flat = channel.ravel() | |
| p95 = float(np.quantile(flat, 0.95)) | |
| signal = float(np.mean(flat[flat >= p95])) if p95 > 1e-8 else 0.0 | |
| p50 = float(np.quantile(flat, 0.50)) | |
| noise = float(np.std(flat[flat <= p50])) + 1e-8 | |
| return signal / noise | |
def _ai_fusion_core(img1, img2, sensitivity=0.5):
    """Fuse five change cues into one binary mask.

    Channels: multi-scale LAB colour difference, SSIM dissimilarity, LBP
    texture difference, Canny edge difference, and change-vector analysis.
    They are combined with weights proportional to ``_snr_weight``,
    multiplied by the vegetation and shadow suppression maps, normalised
    by the 99.5th percentile, gamma-adjusted, smoothed, and thresholded at
    a sensitivity-dependent quantile.

    Args:
        img1: Reference RGB image.
        img2: Comparison RGB image; resized to ``img1``'s size if it differs.
        sensitivity: Clamped to [0, 1]; higher lowers the quantile cut-off.

    Returns:
        ``(change_mask, debug)``: the cleaned uint8 mask (0 / 255) and a
        dict of thresholds, score statistics and channel weights.
    """
    if img1.shape != img2.shape:
        img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0]))

    # ---- Channel 1: multi-scale LAB colour difference ----
    lab1 = cv2.cvtColor(img1, cv2.COLOR_RGB2LAB).astype(np.float32)
    lab2 = cv2.cvtColor(img2, cv2.COLOR_RGB2LAB).astype(np.float32)
    full_h, full_w = lab1.shape[:2]

    color_maps = []
    for scale in (1, 2, 4):
        if scale > 1:
            # max(1, ...) keeps the target size valid for images smaller
            # than the scale factor (a zero dimension makes resize fail).
            small = (max(1, full_w // scale), max(1, full_h // scale))
            s1 = cv2.resize(lab1, small)
            s2 = cv2.resize(lab2, small)
        else:
            s1, s2 = lab1, lab2
        diff = s1 - s2
        delta_e = np.sqrt((diff[:, :, 0] / 100.0) ** 2 +
                          (diff[:, :, 1] / 128.0) ** 2 +
                          (diff[:, :, 2] / 128.0) ** 2)
        if scale > 1:
            delta_e = cv2.resize(delta_e, (full_w, full_h))
        color_maps.append(delta_e)
    color_change = np.mean(color_maps, axis=0)
    color_change = color_change / (color_change.max() + 1e-8)

    # ---- Channel 2: SSIM structural dissimilarity ----
    ssim_change = compute_ssim_change_map(img1, img2)
    ssim_change = ssim_change / (ssim_change.max() + 1e-8)

    # ---- Channel 3: texture change (LBP) ----
    texture_change = compute_texture_change(img1, img2)
    texture_change = texture_change / (texture_change.max() + 1e-8)

    # ---- Channel 4: edge change ----
    edge_change = compute_edge_change(img1, img2)

    # ---- Channel 5: change vector analysis ----
    cva_change = compute_cva(img1, img2)

    # ---- SNR-weighted fusion ----
    channels = [color_change, ssim_change, texture_change, edge_change, cva_change]
    weights = [_snr_weight(ch) for ch in channels]
    total_w = sum(weights) + 1e-8
    weights = [w / total_w for w in weights]

    fused = np.zeros_like(color_change, dtype=np.float64)
    for ch, w in zip(channels, weights):
        fused += w * ch.astype(np.float64)

    # ---- Vegetation + shadow suppression before thresholding ----
    veg_suppression = compute_combined_vegetation_suppression(img1, img2)
    shadow_suppression = compute_shadow_suppression(img1, img2)
    fused = fused * veg_suppression.astype(np.float64) * shadow_suppression.astype(np.float64)

    # Percentile normalisation; fall back to the maximum when the
    # percentile is effectively zero.
    p995 = float(np.quantile(fused, 0.995))
    if p995 <= 1e-8:
        p995 = float(fused.max() + 1e-8)
    fused_norm = np.clip(fused / (p995 + 1e-8), 0.0, 1.0)

    gamma = 0.85
    fused_norm = np.power(fused_norm, gamma)
    fused_smooth = cv2.GaussianBlur(fused_norm.astype(np.float32), (7, 7), 0)

    sens = float(np.clip(sensitivity, 0.0, 1.0))
    q = 0.945 - (sens - 0.5) * 0.04
    q = float(np.clip(q, 0.88, 0.97))
    thr_score = float(np.quantile(fused_smooth, q))

    # A quantile cut-off alone marks every pixel when the score is zero
    # everywhere (e.g. identical inputs), because 0 >= 0. Requiring a
    # strictly positive score keeps signal-free pixels out of the mask.
    selected = (fused_smooth >= thr_score) & (fused_smooth > 0)
    change_mask = selected.astype(np.uint8) * 255
    change_mask = _clean_mask(change_mask, sensitivity=sens)
    change_mask = cv2.bilateralFilter(change_mask, 9, 75, 75)
    _, change_mask = cv2.threshold(change_mask, 127, 255, cv2.THRESH_BINARY)

    debug = {
        "method": "AI-Core",
        "threshold_used": int(thr_score * 255),
        "threshold_percentile_q": q,
        "threshold_score": thr_score,
        "fused_p95": float(np.quantile(fused_smooth, 0.95)),
        "fused_p99": float(np.quantile(fused_smooth, 0.99)),
        "fused_mean": float(np.mean(fused_smooth)),
        "sensitivity": float(sensitivity),
        "channel_weights": {
            "color": round(weights[0], 4),
            "ssim": round(weights[1], 4),
            "texture": round(weights[2], 4),
            "edge": round(weights[3], 4),
            "cva": round(weights[4], 4),
        },
    }
    return change_mask, debug
def ai_deep_learning_method(img1, img2, sensitivity=0.5):
    """Run the learned change detector, or the rule-based fusion as fallback.

    When ``is_model_available()`` is true, ``predict_change_mask`` is called
    with a threshold of ``0.35 + (1 - sensitivity) * 0.3``. If the model is
    unavailable, or inference raises, ``_ai_fusion_core`` is used instead.

    Args:
        img1: Reference RGB image.
        img2: Comparison RGB image.
        sensitivity: Detection sensitivity. Clamped to [0, 1] before it is
            used, as the sibling detection methods do, so the model
            threshold stays within 0.35..0.65. The debug dict reports the
            value as passed in.

    Returns:
        ``(change_mask, debug)``.
    """
    from .model_inference import is_model_available, predict_change_mask

    sens = float(np.clip(sensitivity, 0.0, 1.0))

    if is_model_available():
        threshold = 0.35 + (1.0 - sens) * 0.3
        try:
            change_mask, _score_map = predict_change_mask(
                img1, img2, threshold=threshold)
            change_mask = _clean_mask(change_mask, sensitivity=sens)
            debug = {
                "method": "AI-Based Deep Learning (AdaptFormer)",
                "model": "adaptformer-levir-cd",
                "threshold_used": int(threshold * 255),
                "sensitivity": float(sensitivity),
            }
            return change_mask, debug
        except Exception as e:
            # Best-effort: log the failure and continue with the fallback.
            import logging
            logging.getLogger(__name__).warning(
                "AdaptFormer inference failed, falling back to rule-based: %s", e)

    change_mask, core_debug = _ai_fusion_core(img1, img2, sensitivity=sensitivity)
    debug = {
        "method": "AI-Based Deep Learning (rule-based fallback)",
        "threshold_used": core_debug.get("threshold_used"),
        "sensitivity": float(sensitivity),
        "core": core_debug,
    }
    return change_mask, debug
def hybrid_method(img1, img2, sensitivity=0.5):
    """Combine the three detection methods by weighted voting.

    The binary masks are blended with weights 0.2 (image difference),
    0.3 (feature clustering) and 0.5 (AI method). The blend is thresholded
    at ``98 + (0.5 - sensitivity) * 36`` (clamped to 60..150) and cleaned.

    Args:
        img1: Reference RGB image.
        img2: Comparison RGB image; resized to ``img1``'s size if it differs.
        sensitivity: Forwarded to all three sub-methods (it was previously
            not passed to ``feature_based_method``) and used, clamped to
            [0, 1], to shift the voting threshold.

    Returns:
        ``(final_mask, debug)``; ``debug`` nests the sub-method debug dicts.
    """
    if img1.shape != img2.shape:
        img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0]))

    diff_mask, diff_debug = image_difference_method(img1, img2, sensitivity=sensitivity)
    feature_mask = feature_based_method(img1, img2, sensitivity=sensitivity)
    ai_mask, ai_debug = ai_deep_learning_method(img1, img2, sensitivity=sensitivity)

    # Weighted vote; the AI method carries the most weight.
    combined = (
        0.2 * diff_mask.astype(np.float32) +
        0.3 * feature_mask.astype(np.float32) +
        0.5 * ai_mask.astype(np.float32)
    )

    # Single-method contributions: diff ~51, feature ~76, ai ~127.
    # The threshold stays low enough that an AI-only region can pass.
    base_thr = 98
    sens = float(np.clip(sensitivity, 0.0, 1.0))
    hybrid_thr = int(np.clip(base_thr + int((0.5 - sens) * 36), 60, 150))
    _, final_mask = cv2.threshold(
        combined.astype(np.uint8), hybrid_thr, 255, cv2.THRESH_BINARY
    )
    final_mask = _clean_mask(final_mask)

    debug = {
        "method": "Hybrid Approach",
        "threshold_used": int(hybrid_thr),
        "sensitivity": float(sensitivity),
        "sub_methods": {
            "image_difference": diff_debug,
            "ai_deep_learning": ai_debug,
        },
    }
    return final_mask, debug
| # --------------------------------------------------------------------------- | |
| # 11. Robust post-processing | |
| # --------------------------------------------------------------------------- | |
def _clean_mask(mask, sensitivity=0.5, border_margin=12):
    """Clean a binary change mask and return the surviving regions.

    Steps, in order:
      1. Zero a ``border_margin``-pixel frame around the mask.
      2. 5x5 median filter.
      3. Morphological opening, then closing, with elliptical kernels
         whose (odd) sizes shrink as ``sensitivity`` grows.
      4. Fill every external contour solidly.
      5. One erode followed by one dilate with a 3x3 ellipse.
      6. Drop connected components that are too small, or that are both
         small and have a very elongated bounding box.
    """
    work = mask.copy()
    rows, cols = work.shape[:2]

    if border_margin > 0:
        work[:border_margin, :] = 0
        work[-border_margin:, :] = 0
        work[:, :border_margin] = 0
        work[:, -border_margin:] = 0

    work = cv2.medianBlur(work, 5)

    def _odd_ellipse(size):
        # Structuring element of the given size, bumped up to the next odd value.
        if size % 2 == 0:
            size += 1
        return cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (size, size))

    opening_kernel = _odd_ellipse(max(3, int(5 * (1 - sensitivity * 0.5))))
    work = cv2.morphologyEx(work, cv2.MORPH_OPEN, opening_kernel)

    closing_kernel = _odd_ellipse(max(3, int(7 * (1 - sensitivity))))
    work = cv2.morphologyEx(work, cv2.MORPH_CLOSE, closing_kernel)

    # Drawing only the external contours, filled, removes interior holes.
    outlines, _ = cv2.findContours(work, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    solid = np.zeros_like(work)
    cv2.drawContours(solid, outlines, -1, 255, thickness=cv2.FILLED)

    tiny = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))
    solid = cv2.dilate(cv2.erode(solid, tiny, iterations=1), tiny, iterations=1)

    area_floor = max(80, int(rows * cols * 0.00004))
    count, label_img, stats, _ = cv2.connectedComponentsWithStats(solid, connectivity=8)

    kept = np.zeros_like(solid)
    for label in range(1, count):  # label 0 is the background
        area = stats[label, cv2.CC_STAT_AREA]
        if area < area_floor:
            continue

        box_w = stats[label, cv2.CC_STAT_WIDTH]
        box_h = stats[label, cv2.CC_STAT_HEIGHT]
        box_area = max(box_w * box_h, 1)
        box_perimeter = 2 * (box_w + box_h)
        # Bounding-box perimeter^2 / area grows with elongation.
        elongation = (box_perimeter ** 2) / (box_area + 1e-8)
        if elongation > 80 and area < area_floor * 3:
            continue

        kept[label_img == label] = 255
    return kept
| # --------------------------------------------------------------------------- | |
| # 12. Severity classification and improved visualization | |
| # --------------------------------------------------------------------------- | |
| def _severity_from_region(region, total_pixels): | |
| """ | |
| Type-aware severity classification. | |
| Building/structural changes use area + confidence. | |
| Vegetation changes weight confidence (NDVI delta) more heavily. | |
| """ | |
| area = region.get("area", 0) | |
| confidence = region.get("confidence", 0.0) | |
| obj_type = region.get("object_type", "") | |
| if total_pixels <= 0: | |
| return "minor" | |
| area_ratio = area / total_pixels | |
| if obj_type in _VEGETATION_TYPES or "Vegetation" in (obj_type or ""): | |
| score = area_ratio * 600 + confidence * 0.6 | |
| if score < 0.8: | |
| return "minor" | |
| if score < 3.0: | |
| return "moderate" | |
| return "major" | |
| if obj_type in _STRUCTURAL_TYPES or obj_type in _BUILDING_TYPES: | |
| score = area_ratio * 1200 + confidence * 0.4 | |
| if score < 1.2: | |
| return "minor" | |
| if score < 4.5: | |
| return "moderate" | |
| return "major" | |
| score = area_ratio * 1000 + confidence * 0.3 | |
| if score < 1.0: | |
| return "minor" | |
| if score < 4.0: | |
| return "moderate" | |
| return "major" | |
| # RGB colors for severity — high-contrast, colorblind-friendly palette | |
| _SEVERITY_COLORS = { | |
| "minor": (50, 205, 50), # Lime green | |
| "moderate": (255, 165, 0), # Orange | |
| "major": (255, 50, 50), # Bright red | |
| } | |
| # Maximum bounding boxes drawn on the image to avoid visual clutter | |
| _MAX_VISIBLE_BOXES = 30 | |
def visualize_changes(img1, img2, change_mask, regions=None, total_pixels=None):
    """Render ``img2`` with changed pixels tinted and top regions outlined.

    Pixels where ``change_mask > 127`` are blended 18% towards the tint
    (255, 80, 0). For up to ``_MAX_VISIBLE_BOXES`` entries of ``regions``,
    the bounding box is outlined in the severity colour and the region id
    is written on a dark label near the box's top-left corner. ``img2``
    and ``change_mask`` are resized first when their sizes do not match.

    Returns the annotated image as uint8.
    """
    if img1.shape != img2.shape:
        img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0]))
    if change_mask.shape[:2] != img2.shape[:2]:
        change_mask = cv2.resize(change_mask, (img2.shape[1], img2.shape[0]))

    canvas = img2.copy().astype(np.float32)
    changed = (change_mask > 127).astype(np.float32)

    tint = np.zeros_like(img2, dtype=np.float32)
    tint[:, :, 0] = 255
    tint[:, :, 1] = 80

    alpha = 0.18  # light enough that the terrain underneath stays visible
    for channel in range(3):
        canvas[:, :, channel] = (
            canvas[:, :, channel] * (1 - changed * alpha)
            + tint[:, :, channel] * changed * alpha
        )
    rendered = np.clip(canvas, 0, 255).astype(np.uint8)

    if total_pixels is not None:
        pixel_count = total_pixels
    else:
        pixel_count = img2.shape[0] * img2.shape[1]

    if not regions:
        return rendered

    diagonal = np.sqrt(img2.shape[0]**2 + img2.shape[1]**2)
    outline_px = max(1, int(diagonal / 1100))
    font = cv2.FONT_HERSHEY_SIMPLEX
    text_px = 1

    for entry in regions[:_MAX_VISIBLE_BOXES]:
        x, y, w, h = entry["bbox"]
        severity = entry.get("severity") or _severity_from_region(entry, pixel_count)
        color = _SEVERITY_COLORS.get(severity, (255, 255, 255))

        # Outline only, so the imagery inside the box stays readable.
        cv2.rectangle(rendered, (x, y), (x + w, y + h), color, outline_px)

        label = str(entry.get("id", 0))
        font_scale = max(0.32, min(0.48, w / 200))
        (text_w, text_h), _ = cv2.getTextSize(label, font, font_scale, text_px)
        label_x = x
        label_y = max(text_h + 3, y - 3)  # keep the label inside the top edge

        # Dark backing rectangle so the id is legible on any background.
        cv2.rectangle(rendered,
                      (label_x, label_y - text_h - 2),
                      (label_x + text_w + 5, label_y + 1),
                      (30, 30, 30), cv2.FILLED)
        cv2.putText(rendered, label, (label_x + 2, label_y - 1),
                    font, font_scale, color, text_px, cv2.LINE_AA)

    return rendered
| # --------------------------------------------------------------------------- | |
| # 13. Improved object classification | |
| # --------------------------------------------------------------------------- | |
def extract_advanced_features(region):
    """
    Compute color, texture and edge descriptors for an RGB crop.

    Returns None when the crop is empty or has fewer than 3 rows or columns;
    otherwise a dict of per-crop statistics (channel means and spreads, a
    green-vs-red index stored under "ndvi", LBP variance, Sobel edge density,
    gradient-orientation entropy and a neighbor-difference contrast).
    """
    if region.size == 0 or region.shape[0] < 3 or region.shape[1] < 3:
        return None

    gray = cv2.cvtColor(region, cv2.COLOR_RGB2GRAY).astype(np.float32)
    mean_hsv = np.mean(cv2.cvtColor(region, cv2.COLOR_RGB2HSV), axis=(0, 1))
    mean_lab = np.mean(cv2.cvtColor(region, cv2.COLOR_RGB2LAB), axis=(0, 1))

    # Per-channel color statistics and channel shares
    mean_rgb = np.mean(region, axis=(0, 1))
    std_rgb = np.std(region, axis=(0, 1))
    channel_sum = np.sum(mean_rgb) + 1e-6
    red_mean, green_mean, blue_mean = mean_rgb[0], mean_rgb[1], mean_rgb[2]

    # RGB-only vegetation proxy: normalized green-minus-red difference
    ndvi = (green_mean - red_mean) / (green_mean + red_mean + 1e-6)

    # Texture
    lbp_variance = float(np.var(compute_lbp(gray.astype(np.float32))))

    # Sobel gradients drive both edge density and orientation entropy
    grad_x = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
    grad_y = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3)
    edge_density = float(np.mean(np.sqrt(grad_x ** 2 + grad_y ** 2)))

    # Entropy of an 8-bin gradient-direction histogram (low = regular structure)
    directions = np.arctan2(grad_y, grad_x + 1e-8)
    bins, _ = np.histogram(directions, bins=8, range=(-np.pi, np.pi))
    bins = bins / (bins.sum() + 1e-8)
    occupied = bins[bins > 0]
    orientation_entropy = -np.sum(occupied * np.log2(occupied + 1e-10))

    # Simplified GLCM-style contrast: squared difference to the left and
    # upper neighbors (np.roll wraps around at the borders)
    left_neighbor = np.roll(gray, 1, axis=1)
    upper_neighbor = np.roll(gray, 1, axis=0)
    glcm_contrast = float(np.mean((gray - left_neighbor) ** 2
                                  + (gray - upper_neighbor) ** 2))

    return {
        "mean_rgb": mean_rgb,
        "std_rgb": std_rgb,
        "mean_hsv": mean_hsv,
        "mean_lab": mean_lab,
        "ndvi": ndvi,
        "texture_std": float(np.std(gray)),
        "lbp_variance": lbp_variance,
        "edge_density": edge_density,
        "orientation_entropy": orientation_entropy,
        "glcm_contrast": glcm_contrast,
        "color_homogeneity": float(np.mean(std_rgb)),
        "brightness": float(mean_lab[0]),
        "green_ratio": green_mean / channel_sum,
        "blue_ratio": blue_mean / channel_sum,
        "red_ratio": red_mean / channel_sum,
        "saturation": float(mean_hsv[1]),
        "hue": float(mean_hsv[0]),
    }
| def _is_transient_object(area, w, h, features): | |
| """ | |
| Filter out transient objects (people, cars, animals, shadows, etc.) | |
| that are NOT permanent ground/structural changes. | |
| Returns True if the region is likely transient and should be excluded. | |
| """ | |
| aspect_ratio = max(w, h) / max(min(w, h), 1) | |
| # Very small regions are likely noise, people, or small vehicles | |
| if area < 300: | |
| return True | |
| # Tall narrow regions (aspect > 4) are likely people or poles | |
| if aspect_ratio > 5.0 and area < 2000: | |
| return True | |
| # Very high edge density + small area = likely a person or vehicle | |
| if features["edge_density"] > 80 and area < 1500: | |
| return True | |
| # Extremely high texture variance in small area = likely transient clutter | |
| if features["texture_std"] > 60 and area < 1000: | |
| return True | |
| return False | |
| def _count_line_segments(gray_crop): | |
| """Count straight line segments using LSD — buildings have many, vegetation has few.""" | |
| if gray_crop.size == 0 or gray_crop.shape[0] < 5 or gray_crop.shape[1] < 5: | |
| return 0, 0.0 | |
| lsd = cv2.createLineSegmentDetector(0) | |
| lines, _, _, _ = lsd.detect(gray_crop.astype(np.uint8)) | |
| if lines is None: | |
| return 0, 0.0 | |
| n_lines = len(lines) | |
| total_length = sum( | |
| np.sqrt((l[0][2] - l[0][0])**2 + (l[0][3] - l[0][1])**2) | |
| for l in lines | |
| ) | |
| return n_lines, float(total_length) | |
| def _count_corners(gray_crop): | |
| """Count strong corners — buildings have clustered grid-like corners.""" | |
| if gray_crop.size == 0 or gray_crop.shape[0] < 5 or gray_crop.shape[1] < 5: | |
| return 0 | |
| corners = cv2.goodFeaturesToTrack( | |
| gray_crop.astype(np.uint8), maxCorners=100, | |
| qualityLevel=0.05, minDistance=5) | |
| return 0 if corners is None else len(corners) | |
| def _rectangular_hull_ratio(gray_crop, threshold=128): | |
| """Ratio of non-zero area to bounding rect — buildings fill their box.""" | |
| if gray_crop.size == 0: | |
| return 0.0 | |
| _, binary = cv2.threshold(gray_crop.astype(np.uint8), threshold, 255, cv2.THRESH_BINARY) | |
| contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) | |
| if not contours: | |
| return 0.0 | |
| biggest = max(contours, key=cv2.contourArea) | |
| contour_area = cv2.contourArea(biggest) | |
| _, _, rw, rh = cv2.boundingRect(biggest) | |
| rect_area = max(rw * rh, 1) | |
| return contour_area / rect_area | |
def _extract_differential_features(before_crop, after_crop):
    """
    Extract features from BOTH before and after crops plus their deltas.

    Returns a dict holding the two per-crop feature dicts ("before", "after"),
    after-minus-before deltas for the scalar features, line/corner counts for
    both crops, the hull fill ratio of the after crop and the mean per-pixel
    LAB distance. Returns None when either crop is too small for
    extract_advanced_features.

    The crops are expected to cover the same image window. If their heights or
    widths differ (e.g. the two source images had different sizes and the
    window was clamped differently), both are trimmed to their common top-left
    extent first, so the per-pixel LAB comparison cannot fail on mismatched
    shapes.
    """
    if before_crop.shape[:2] != after_crop.shape[:2]:
        # Both crops start at the same window origin, so the overlapping part
        # is the top-left rectangle of the smaller extent.
        common_h = min(before_crop.shape[0], after_crop.shape[0])
        common_w = min(before_crop.shape[1], after_crop.shape[1])
        before_crop = before_crop[:common_h, :common_w]
        after_crop = after_crop[:common_h, :common_w]

    feat_b = extract_advanced_features(before_crop)
    feat_a = extract_advanced_features(after_crop)
    if feat_b is None or feat_a is None:
        return None

    gray_b = cv2.cvtColor(before_crop, cv2.COLOR_RGB2GRAY)
    gray_a = cv2.cvtColor(after_crop, cv2.COLOR_RGB2GRAY)
    lines_b, linelen_b = _count_line_segments(gray_b)
    lines_a, linelen_a = _count_line_segments(gray_a)
    corners_b = _count_corners(gray_b)
    corners_a = _count_corners(gray_a)
    hull_a = _rectangular_hull_ratio(gray_a)

    # Mean Euclidean distance between corresponding pixels in LAB space
    lab_b = cv2.cvtColor(before_crop, cv2.COLOR_RGB2LAB).astype(np.float32)
    lab_a = cv2.cvtColor(after_crop, cv2.COLOR_RGB2LAB).astype(np.float32)
    lab_dist = float(np.mean(np.sqrt(np.sum((lab_a - lab_b) ** 2, axis=2))))

    return {
        "before": feat_b, "after": feat_a,
        "delta_ndvi": feat_a["ndvi"] - feat_b["ndvi"],
        "delta_green_ratio": feat_a["green_ratio"] - feat_b["green_ratio"],
        "delta_edge_density": feat_a["edge_density"] - feat_b["edge_density"],
        "delta_brightness": feat_a["brightness"] - feat_b["brightness"],
        "delta_texture_std": feat_a["texture_std"] - feat_b["texture_std"],
        "delta_saturation": feat_a["saturation"] - feat_b["saturation"],
        "delta_orientation_entropy": feat_a["orientation_entropy"] - feat_b["orientation_entropy"],
        "delta_lines": lines_a - lines_b,
        "delta_line_length": linelen_a - linelen_b,
        "delta_corners": corners_a - corners_b,
        "lines_after": lines_a, "corners_after": corners_a,
        "lines_before": lines_b, "corners_before": corners_b,
        "hull_ratio_after": hull_a,
        "lab_color_distance": lab_dist,
    }
def classify_object_type(image_region, bbox, before_region=None):
    """
    Score one bounding box against six change categories and return the winner.

    `image_region` is the "after" image and `bbox` is (x, y, w, h); the box is
    padded by 5 px and clamped to the image before features are extracted.
    When `before_region` is given and yields a usable crop, the vegetation,
    building and demolition scores use before/after differential features;
    otherwise they fall back to after-image features only.

    Returns (label, confidence):
      * ("Unclassified", 0.0) when the crop is too small to analyse,
      * (None, 0.0) when the region is judged transient,
      * ("Unclassified", score) when the best score is below 0.30,
      * otherwise the best label with its score capped at 1.0.
    """
    x, y, w, h = bbox
    pad = 5

    def padded_crop(img):
        # Padded bbox window, clamped to this image's own bounds
        top, bottom = max(0, y - pad), min(img.shape[0], y + h + pad)
        left, right = max(0, x - pad), min(img.shape[1], x + w + pad)
        return img[top:bottom, left:right]

    def tally(rules):
        # Add up the weights of satisfied rules, in listed order
        total = 0.0
        for satisfied, weight in rules:
            if satisfied:
                total += weight
        return total

    after_crop = padded_crop(image_region)
    if after_crop.size == 0 or after_crop.shape[0] < 3 or after_crop.shape[1] < 3:
        return "Unclassified", 0.0

    feat_a = extract_advanced_features(after_crop)
    if feat_a is None:
        return "Unclassified", 0.0

    area = w * h
    if _is_transient_object(area, w, h, feat_a):
        return None, 0.0

    aspect_ratio = max(w, h) / max(min(w, h), 1)
    compactness = (4 * np.pi * area) / ((2 * (w + h)) ** 2 + 1e-6)

    # Differential features, only when a usable "before" crop exists
    diff = None
    if before_region is not None:
        before_crop = padded_crop(before_region)
        if before_crop.size > 0 and before_crop.shape[0] >= 3 and before_crop.shape[1] >= 3:
            diff = _extract_differential_features(before_crop, after_crop)

    scores = {}

    # ---- Water Body Change ----
    scores["Water Body Change"] = tally([
        (feat_a["blue_ratio"] > 0.36, 0.22),
        (feat_a["texture_std"] < 28, 0.18),
        (feat_a["edge_density"] < 35, 0.14),
        (90 <= feat_a["hue"] <= 135, 0.18),
        (feat_a["lbp_variance"] < 0.05, 0.14),
        (feat_a["glcm_contrast"] < 500, 0.10),
        (area > 800, 0.04),
    ])

    # ---- Vegetation Change ----
    if diff:
        # Differential: look for an actual gain or loss of greenness
        was_or_is_green = diff["before"]["ndvi"] > 0.05 or diff["after"]["ndvi"] > 0.05
        green_dominant = (diff["before"]["green_ratio"] > 0.34
                          or diff["after"]["green_ratio"] > 0.34)
        veg_rules = [
            (abs(diff["delta_ndvi"]) > 0.08, 0.30),
            (abs(diff["delta_green_ratio"]) > 0.04, 0.20),
            (diff["lab_color_distance"] > 15 and was_or_is_green, 0.15),
            (abs(diff["delta_saturation"]) > 15 and green_dominant, 0.15),
            (diff["delta_lines"] < 3 and diff["delta_corners"] < 5, 0.08),
            (area > 500, 0.04),
        ]
    else:
        veg_rules = [
            (feat_a["ndvi"] > 0.05, 0.22),
            (feat_a["ndvi"] > 0.15, 0.10),
            (feat_a["green_ratio"] > 0.36, 0.18),
            (35 <= feat_a["hue"] <= 85, 0.15),
            (feat_a["saturation"] > 40, 0.10),
            (feat_a["orientation_entropy"] > 2.5, 0.05),
            (area > 500, 0.04),
        ]
    scores["Vegetation Change"] = tally(veg_rules)

    # ---- New Construction/Building ----
    if diff:
        bld_rules = [
            (diff["delta_edge_density"] > 15, 0.20),
            (diff["delta_orientation_entropy"] < -0.4, 0.15),
            (diff["delta_lines"] > 5, 0.15),
            (diff["delta_corners"] > 8, 0.12),
            (diff["after"]["ndvi"] < 0.05 and diff["before"]["ndvi"] > 0.03, 0.12),
            (diff["hull_ratio_after"] > 0.55, 0.10),
            (1.0 <= aspect_ratio <= 4.0, 0.08),
            (area > 1000, 0.05),
        ]
    else:
        bld_rules = [
            (feat_a["orientation_entropy"] < 2.5, 0.18),
            (feat_a["color_homogeneity"] < 28, 0.15),
            (1.0 <= aspect_ratio <= 4.0, 0.12),
            (0.3 <= compactness <= 0.9, 0.10),
            (feat_a["edge_density"] > 30, 0.12),
            (feat_a["glcm_contrast"] > 400, 0.10),
            (feat_a["saturation"] < 90, 0.10),
            (40 <= feat_a["brightness"] <= 90, 0.08),
            (area > 1000, 0.05),
        ]
    scores["New Construction/Building"] = tally(bld_rules)

    # ---- Demolition/Clearing ----
    if diff:
        demo_rules = [
            (diff["delta_edge_density"] < -15, 0.22),
            (diff["delta_lines"] < -5, 0.18),
            (diff["delta_corners"] < -8, 0.15),
            (diff["delta_texture_std"] > 8, 0.12),
            (diff["delta_brightness"] > 10, 0.12),
            (diff["after"]["ndvi"] > 0.03 and diff["before"]["ndvi"] < 0.02, 0.08),
            (area > 800, 0.05),
        ]
    else:
        demo_rules = [
            (feat_a["texture_std"] > 30, 0.18),
            (feat_a["orientation_entropy"] > 2.8, 0.15),
            (feat_a["color_homogeneity"] > 25, 0.15),
            (feat_a["brightness"] > 60, 0.10),
            (feat_a["ndvi"] < 0.05, 0.12),
            (feat_a["saturation"] < 70, 0.10),
            (area > 800, 0.05),
        ]
    scores["Demolition/Clearing"] = tally(demo_rules)

    # ---- Road/Pavement Change ----
    scores["Road/Pavement Change"] = tally([
        (aspect_ratio > 2.5, 0.22),
        (feat_a["color_homogeneity"] < 22, 0.18),
        (feat_a["texture_std"] < 32, 0.15),
        (feat_a["saturation"] < 65, 0.12),
        (feat_a["orientation_entropy"] < 2.0, 0.15),
        (35 <= feat_a["brightness"] <= 75, 0.10),
        (compactness < 0.3, 0.05),
        (area > 600, 0.03),
    ])

    # ---- Bare Land/Soil Change ----
    scores["Bare Land/Soil Change"] = tally([
        (feat_a["red_ratio"] > 0.34 and feat_a["green_ratio"] < 0.36, 0.20),
        (8 <= feat_a["hue"] <= 38, 0.18),
        (feat_a["ndvi"] < 0.05, 0.18),
        (feat_a["texture_std"] < 35, 0.12),
        (feat_a["lbp_variance"] < 0.04, 0.12),
        (40 <= feat_a["saturation"] <= 130, 0.10),
        (45 <= feat_a["brightness"] <= 82, 0.10),
    ])

    # Highest score wins; ties go to the category scored first
    winner = max(scores, key=scores.get)
    winner_score = scores[winner]
    if winner_score < 0.30:
        return "Unclassified", winner_score
    return winner, min(winner_score, 1.0)
def classify_with_ensemble(image_region, bbox, before_region=None):
    """
    Ensemble: classify full region + sub-regions, vote with confidence weighting.

    The full bbox is always classified. When both sides exceed 20 px, the four
    quadrants and a centered half-size box are classified as well. Sub-boxes
    judged transient (label None) are counted; if they outnumber half of all
    boxes the whole region is reported as transient.

    Returns (label, confidence):
      * (None, 0.0) when most sub-boxes are transient,
      * the full-region result when no sub-box got a concrete label,
      * otherwise the label with the largest summed confidence, together with
        its mean confidence (boosted by 15%, capped at 1.0, when that label
        holds at least 60% of the concrete votes).
    """
    x, y, w, h = bbox
    sub_boxes = [(x, y, w, h)]
    if w > 20 and h > 20:
        hw, hh = w // 2, h // 2
        sub_boxes += [
            (x, y, hw, hh),
            (x + hw, y, hw, hh),
            (x, y + hh, hw, hh),
            (x + hw, y + hh, hw, hh),
            (x + w // 4, y + h // 4, hw, hh),
        ]

    results = [
        classify_object_type(image_region, sb, before_region=before_region)
        for sb in sub_boxes
    ]

    transient_count = sum(1 for obj_type, _ in results if obj_type is None)
    if transient_count > len(sub_boxes) // 2:
        return None, 0.0

    votes = [
        (obj_type, conf)
        for obj_type, conf in results
        if obj_type is not None and obj_type != "Unclassified"
    ]
    if not votes:
        # sub_boxes[0] is the full bbox, so its result is exactly the fallback
        # answer; reuse it instead of running the classification a second time.
        return results[0]

    weighted = {}
    for obj_type, conf in votes:
        weighted[obj_type] = weighted.get(obj_type, 0) + conf
    counts = Counter(obj_type for obj_type, _ in votes)

    best_type = max(weighted, key=weighted.get)
    avg_conf = weighted[best_type] / counts[best_type]
    # Reward strong agreement among the concrete votes
    if counts[best_type] / len(votes) >= 0.6:
        avg_conf = min(1.0, avg_conf * 1.15)
    return best_type, avg_conf
| # --------------------------------------------------------------------------- | |
| # 14. Vegetation sub-classification | |
| # --------------------------------------------------------------------------- | |
# Set holding the "Vegetation Change" label emitted by classify_object_type.
# NOTE(review): presumably used by the caller to decide when to run
# classify_vegetation_subtype; the usage is outside this section, so confirm.
_VEGETATION_TYPES = {"Vegetation Change"}
| def _compute_region_greenness(crop): | |
| """Return (ndvi, green_ratio, mean_saturation) for an RGB crop.""" | |
| if crop.size == 0 or crop.shape[0] < 2 or crop.shape[1] < 2: | |
| return 0.0, 0.0, 0.0 | |
| mean_rgb = np.mean(crop, axis=(0, 1)).astype(np.float64) | |
| total = np.sum(mean_rgb) + 1e-6 | |
| green_ratio = mean_rgb[1] / total | |
| ndvi = (mean_rgb[1] - mean_rgb[0]) / (mean_rgb[1] + mean_rgb[0] + 1e-6) | |
| hsv = cv2.cvtColor(crop, cv2.COLOR_RGB2HSV) | |
| mean_sat = float(np.mean(hsv[:, :, 1])) | |
| return float(ndvi), float(green_ratio), mean_sat | |
| def _compute_texture_regularity(gray_crop): | |
| """Measure how regular/grid-like the texture is (low entropy = regular crops).""" | |
| if gray_crop.size == 0 or gray_crop.shape[0] < 3 or gray_crop.shape[1] < 3: | |
| return 3.0 | |
| gx = cv2.Sobel(gray_crop.astype(np.float32), cv2.CV_64F, 1, 0, ksize=3) | |
| gy = cv2.Sobel(gray_crop.astype(np.float32), cv2.CV_64F, 0, 1, ksize=3) | |
| angles = np.arctan2(gy, gx + 1e-8) | |
| hist, _ = np.histogram(angles, bins=8, range=(-np.pi, np.pi)) | |
| hist = hist / (hist.sum() + 1e-8) | |
| entropy = -np.sum(hist[hist > 0] * np.log2(hist[hist > 0] + 1e-10)) | |
| return float(entropy) | |
def classify_vegetation_subtype(before_img, after_img, bbox):
    """
    Refine a vegetation change into a sub-type by comparing before/after crops.

    The bbox (x, y, w, h) is padded by 5 px and clamped to before_img's bounds;
    the same window is cut from both images. Greenness, gradient-direction
    entropy and gray-level brightness of the two crops feed five rule-based
    scores.

    Returns (subtype_name, confidence). Falls back to
    ("Vegetation Change", 0.3) when either crop is empty or the best score is
    below 0.25; otherwise the confidence is the best score capped at 1.0.
    """
    x, y, w, h = bbox
    pad = 5
    top, bottom = max(0, y - pad), min(before_img.shape[0], y + h + pad)
    left, right = max(0, x - pad), min(before_img.shape[1], x + w + pad)
    before_crop = before_img[top:bottom, left:right]
    after_crop = after_img[top:bottom, left:right]
    if before_crop.size == 0 or after_crop.size == 0:
        return "Vegetation Change", 0.3

    def tally(rules):
        # Add up the weights of satisfied rules, in listed order
        total = 0.0
        for satisfied, weight in rules:
            if satisfied:
                total += weight
        return total

    ndvi_b, green_b, sat_b = _compute_region_greenness(before_crop)
    ndvi_a, green_a, sat_a = _compute_region_greenness(after_crop)

    gray_b = cv2.cvtColor(before_crop, cv2.COLOR_RGB2GRAY)
    gray_a = cv2.cvtColor(after_crop, cv2.COLOR_RGB2GRAY)
    entropy_b = _compute_texture_regularity(gray_b)
    entropy_a = _compute_texture_regularity(gray_a)
    bright_b = float(np.mean(gray_b))
    bright_a = float(np.mean(gray_a))

    d_ndvi = ndvi_a - ndvi_b
    d_green = green_a - green_b
    d_sat = sat_a - sat_b
    d_bright_abs = abs(bright_a - bright_b)
    area = w * h

    scores = {
        # Was green, now not green
        "Deforestation/Tree Removal": tally([
            (ndvi_b > 0.08 and d_ndvi < -0.06, 0.30),
            (green_b > 0.36 and d_green < -0.03, 0.20),
            (bright_a > bright_b + 10, 0.15),
            (d_sat < -15, 0.15),
            (entropy_a < entropy_b - 0.3, 0.10),
        ]),
        # Was bare, now green
        "New Vegetation/Growth": tally([
            (ndvi_a > 0.08 and d_ndvi > 0.06, 0.30),
            (green_a > 0.36 and d_green > 0.03, 0.20),
            (d_sat > 15, 0.15),
            (bright_a < bright_b - 5, 0.10),
            (entropy_a > entropy_b + 0.2, 0.10),
        ]),
        # Regular texture patterns with a moderate color shift
        "Crop/Agricultural Change": tally([
            (entropy_b < 2.5 or entropy_a < 2.5, 0.25),
            (0.03 < abs(d_ndvi) < 0.12, 0.20),
            (sat_b > 35 and sat_a > 35, 0.15),
            (abs(d_green) < 0.04 and abs(d_ndvi) > 0.02, 0.15),
            (area > 3000, 0.10),
        ]),
        # Still green but browning
        "Vegetation Health Decline": tally([
            (ndvi_b > 0.05 and ndvi_a > 0.02 and d_ndvi < -0.03, 0.25),
            (green_b > 0.34 and green_a > 0.30 and d_green < -0.02, 0.20),
            (-20 < d_sat < -3, 0.20),
            (d_bright_abs < 15, 0.10),
        ]),
        # Mild shift in color/texture with both sides green
        "Seasonal Variation": tally([
            (ndvi_b > 0.04 and ndvi_a > 0.04 and abs(d_ndvi) < 0.05, 0.25),
            (abs(d_green) < 0.03, 0.20),
            (abs(d_sat) < 12, 0.15),
            (d_bright_abs < 12, 0.15),
        ]),
    }

    # Highest score wins; ties go to the sub-type listed first
    winner = max(scores, key=scores.get)
    winner_score = scores[winner]
    if winner_score < 0.25:
        return "Vegetation Change", 0.3
    return winner, min(winner_score, 1.0)
| # --------------------------------------------------------------------------- | |
| # 15. Structural change sub-classification | |
| # --------------------------------------------------------------------------- | |
# Set of structure-related labels emitted by classify_object_type.
# classify_structural_subtype special-cases "Road/Pavement Change" among them.
# NOTE(review): presumably used by the caller to decide when to run
# classify_structural_subtype; the usage is outside this section, so confirm.
_STRUCTURAL_TYPES = {"New Construction/Building", "Demolition/Clearing",
                     "Road/Pavement Change"}
| def _region_has_structure(crop): | |
| """Heuristic: does this crop contain building-like structure (edges + regularity)?""" | |
| if crop.size == 0 or crop.shape[0] < 3 or crop.shape[1] < 3: | |
| return False, 0.0, 0.0 | |
| gray = cv2.cvtColor(crop, cv2.COLOR_RGB2GRAY).astype(np.float32) | |
| gx = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3) | |
| gy = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3) | |
| edge_density = float(np.mean(np.sqrt(gx**2 + gy**2))) | |
| angles = np.arctan2(gy, gx + 1e-8) | |
| hist, _ = np.histogram(angles, bins=8, range=(-np.pi, np.pi)) | |
| hist = hist / (hist.sum() + 1e-8) | |
| entropy = -np.sum(hist[hist > 0] * np.log2(hist[hist > 0] + 1e-10)) | |
| has_structure = edge_density > 25 and entropy < 2.8 | |
| return has_structure, edge_density, entropy | |
def classify_structural_subtype(before_img, after_img, bbox, main_type):
    """
    Refine a structural change into a sub-type by comparing before/after crops.

    The bbox (x, y, w, h) is padded by 5 px and clamped to before_img's bounds;
    the same window is cut from both images. When main_type is
    "Road/Pavement Change" the decision is delegated to _classify_road_subtype;
    otherwise six building-related sub-types are scored.

    Returns (subtype_name, confidence). Falls back to (main_type, 0.3) when
    either crop is empty or the best score is below 0.25; otherwise the
    confidence is the best score capped at 1.0.
    """
    x, y, w, h = bbox
    pad = 5
    top, bottom = max(0, y - pad), min(before_img.shape[0], y + h + pad)
    left, right = max(0, x - pad), min(before_img.shape[1], x + w + pad)
    before_crop = before_img[top:bottom, left:right]
    after_crop = after_img[top:bottom, left:right]
    if before_crop.size == 0 or after_crop.size == 0:
        return main_type, 0.3

    def tally(rules):
        # Add up the weights of satisfied rules, in listed order
        total = 0.0
        for satisfied, weight in rules:
            if satisfied:
                total += weight
        return total

    struct_b, edge_b, ent_b = _region_has_structure(before_crop)
    struct_a, edge_a, ent_a = _region_has_structure(after_crop)

    gray_b = cv2.cvtColor(before_crop, cv2.COLOR_RGB2GRAY)
    gray_a = cv2.cvtColor(after_crop, cv2.COLOR_RGB2GRAY)
    bright_b, bright_a = float(np.mean(gray_b)), float(np.mean(gray_a))
    tex_b, tex_a = float(np.std(gray_b)), float(np.std(gray_a))

    sat_b = float(np.mean(cv2.cvtColor(before_crop, cv2.COLOR_RGB2HSV)[:, :, 1]))
    sat_a = float(np.mean(cv2.cvtColor(after_crop, cv2.COLOR_RGB2HSV)[:, :, 1]))

    # Green-vs-red index: flags green-to-built (or cleared-to-green) transitions
    rgb_b = np.mean(before_crop, axis=(0, 1))
    rgb_a = np.mean(after_crop, axis=(0, 1))
    ndvi_b = (rgb_b[1] - rgb_b[0]) / (rgb_b[1] + rgb_b[0] + 1e-6)
    ndvi_a = (rgb_a[1] - rgb_a[0]) / (rgb_a[1] + rgb_a[0] + 1e-6)

    area = w * h

    if main_type == "Road/Pavement Change":
        return _classify_road_subtype(
            struct_b, struct_a, edge_b, edge_a, bright_b, bright_a,
            tex_b, tex_a, area, w, h
        )

    aspect = max(w, h) / max(min(w, h), 1)

    scores = {
        # Before had no structure, after does
        "New Building": tally([
            (not struct_b and struct_a, 0.35),
            (edge_a > edge_b + 15, 0.15),
            (ent_a < ent_b - 0.3, 0.10),
            (ndvi_b > 0.05 and ndvi_a < 0.03, 0.10),
            (sat_a < sat_b - 10, 0.10),
        ]),
        # Both have structure but after has more
        "Building Expansion": tally([
            (struct_b and struct_a, 0.15),
            (struct_b and edge_a > edge_b * 1.2, 0.20),
            (struct_b and tex_a > tex_b + 5, 0.15),
            (abs(ent_a - ent_b) < 0.5 and edge_a > edge_b, 0.15),
        ]),
        # Both have structure, similar density but different appearance
        "Renovation/Modification": tally([
            (struct_b and struct_a, 0.15),
            (abs(edge_a - edge_b) < 12, 0.15),
            (abs(bright_a - bright_b) > 8, 0.20),
            (abs(sat_a - sat_b) > 10, 0.15),
            (abs(tex_a - tex_b) < 10, 0.10),
        ]),
        # Before had structure, after has less
        "Partial Demolition": tally([
            (struct_b and edge_a < edge_b * 0.7, 0.25),
            (struct_b and ent_a > ent_b + 0.3, 0.15),
            (tex_a > tex_b + 8, 0.15),
            (bright_a > bright_b + 10, 0.10),
        ]),
        # Before had structure, after is bare/empty
        "Full Demolition": tally([
            (struct_b and not struct_a, 0.35),
            (edge_b > 30 and edge_a < 20, 0.15),
            (tex_b > 25 and tex_a < 20, 0.15),
            (bright_a > bright_b + 15, 0.10),
        ]),
        # Elongated shape with highly regular edges
        "Infrastructure Change": tally([
            (aspect > 3.0, 0.25),
            (ent_a < 2.0 or ent_b < 2.0, 0.15),
            (area > 2000 and aspect > 2.5, 0.15),
        ]),
    }

    # Highest score wins; ties go to the sub-type listed first
    winner = max(scores, key=scores.get)
    winner_score = scores[winner]
    if winner_score < 0.25:
        return main_type, 0.3
    return winner, min(winner_score, 1.0)
| def _classify_road_subtype(struct_b, struct_a, edge_b, edge_a, | |
| brightness_b, brightness_a, texture_b, texture_a, | |
| area, w, h): | |
| """Sub-classify road/pavement changes.""" | |
| scores = { | |
| "New Road/Pavement": 0.0, | |
| "Road Widening": 0.0, | |
| "Road Resurfacing": 0.0, | |
| "Road Deterioration": 0.0, | |
| } | |
| if not struct_b and struct_a: | |
| scores["New Road/Pavement"] += 0.30 | |
| if edge_a > edge_b + 10: | |
| scores["New Road/Pavement"] += 0.20 | |
| if brightness_a < brightness_b: | |
| scores["New Road/Pavement"] += 0.15 | |
| if struct_b and struct_a and edge_a > edge_b * 1.15: | |
| scores["Road Widening"] += 0.30 | |
| if area > 2000: | |
| scores["Road Widening"] += 0.15 | |
| if struct_b and struct_a and abs(edge_a - edge_b) < 10: | |
| scores["Road Resurfacing"] += 0.20 | |
| if abs(brightness_a - brightness_b) > 12: | |
| scores["Road Resurfacing"] += 0.25 | |
| if abs(texture_a - texture_b) < 8: | |
| scores["Road Resurfacing"] += 0.15 | |
| if texture_a > texture_b + 10: | |
| scores["Road Deterioration"] += 0.25 | |
| if edge_a < edge_b - 5: | |
| scores["Road Deterioration"] += 0.20 | |
| if brightness_a > brightness_b + 8: | |
| scores["Road Deterioration"] += 0.15 | |
| best = max(scores, key=scores.get) | |
| conf = scores[best] | |
| if conf < 0.25: | |
| return "Road/Pavement Change", 0.3 | |
| return best, min(conf, 1.0) | |
| # --------------------------------------------------------------------------- | |
| # 16. 3D Building Analysis — height estimation + construction stage | |
| # --------------------------------------------------------------------------- | |
# Set of building-related labels emitted by classify_object_type.
# NOTE(review): presumably selects the regions that get the 3D building
# analysis; the usage is outside this section, so confirm.
_BUILDING_TYPES = {"New Construction/Building", "Demolition/Clearing"}
# estimate_building_height multiplies its story estimate by this value.
_STORY_HEIGHT_M = 3.0  # assumed metres per story
| def _detect_shadow_region(before_gray, after_gray, bbox, expand=0.6): | |
| """ | |
| Find new shadow pixels adjacent to a building bbox. | |
| Returns a binary mask of likely shadow pixels in the expanded bbox area. | |
| """ | |
| x, y, w, h = bbox | |
| img_h, img_w = after_gray.shape[:2] | |
| # Expand bbox to capture shadows cast beside the building | |
| ex = int(w * expand) | |
| ey = int(h * expand) | |
| x1 = max(0, x - ex) | |
| y1 = max(0, y - ey) | |
| x2 = min(img_w, x + w + ex) | |
| y2 = min(img_h, y + h + ey) | |
| before_crop = before_gray[y1:y2, x1:x2].astype(np.float32) | |
| after_crop = after_gray[y1:y2, x1:x2].astype(np.float32) | |
| if before_crop.size == 0 or after_crop.size == 0: | |
| return None, 0 | |
| # New shadow = pixels that got significantly darker in the after image | |
| darkening = before_crop - after_crop | |
| dark_thresh = max(25, np.std(darkening) * 1.5) | |
| shadow_mask = (darkening > dark_thresh).astype(np.uint8) * 255 | |
| # Remove shadow pixels inside the building footprint itself | |
| bx1, by1 = x - x1, y - y1 | |
| bx2, by2 = bx1 + w, by1 + h | |
| bx1, by1 = max(0, bx1), max(0, by1) | |
| bx2 = min(shadow_mask.shape[1], bx2) | |
| by2 = min(shadow_mask.shape[0], by2) | |
| shadow_mask[by1:by2, bx1:bx2] = 0 | |
| # Clean noise | |
| kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3)) | |
| shadow_mask = cv2.morphologyEx(shadow_mask, cv2.MORPH_OPEN, kernel) | |
| shadow_pixels = np.sum(shadow_mask > 0) | |
| return shadow_mask, shadow_pixels | |
def estimate_building_height(before_img, after_img, bbox, features):
    """
    Rough story/height estimate for a building region.

    Combines three signals: the extent of newly darkened (shadow) pixels
    around the bbox relative to the bbox's short side, a footprint-shape
    factor, and a bonus for low orientation entropy in `features` (which may
    be None or empty).

    Returns (estimated_stories, estimated_height_m), with stories clamped to
    1..50 and height = stories * _STORY_HEIGHT_M rounded to one decimal.
    """
    _, _, w, h = bbox
    shadow_mask, shadow_px = _detect_shadow_region(
        cv2.cvtColor(before_img, cv2.COLOR_RGB2GRAY),
        cv2.cvtColor(after_img, cv2.COLOR_RGB2GRAY),
        bbox,
    )

    short_side = max(min(w, h), 1)
    footprint_area = w * h

    # Shadow signal: longest axis-aligned extent of shadow pixels / short side
    shadow_ratio = 0.0
    if shadow_mask is not None and shadow_px > 20:
        shadow_rows, shadow_cols = np.where(shadow_mask > 0)
        if len(shadow_rows) > 5:
            extent_rows = shadow_rows.max() - shadow_rows.min()
            extent_cols = shadow_cols.max() - shadow_cols.min()
            shadow_ratio = max(extent_rows, extent_cols) / short_side

    # Footprint signal: elongated boxes (aspect > 3) are treated as likely
    # single-story; compact (aspect < 1.5) large footprints as likely taller
    aspect = max(w, h) / short_side
    if aspect > 3.0:
        footprint_factor = 0.5
    elif aspect < 1.5 and footprint_area > 2000:
        footprint_factor = 1.3
    else:
        footprint_factor = 1.0

    # Regular structure (low orientation entropy) earns a half-story bonus
    if features and features.get("orientation_entropy", 3.0) < 2.2:
        regularity_bonus = 0.5
    else:
        regularity_bonus = 0.0

    # Base estimate: ~0.35 shadow ratio per story, else area as rough proxy
    if shadow_ratio > 0.1:
        base_stories = shadow_ratio / 0.35
    elif footprint_area > 5000:
        base_stories = 3.0
    elif footprint_area > 2000:
        base_stories = 2.0
    else:
        base_stories = 1.0

    adjusted = base_stories * footprint_factor + regularity_bonus
    stories = max(1, min(50, int(round(adjusted))))
    return stories, round(stories * _STORY_HEIGHT_M, 1)
def classify_construction_stage(features, bbox):
    """
    Guess the construction stage of a region from its visual features.

    Every candidate stage ("Foundation", "Structural", "Under Construction",
    "Complete") collects a fixed weight for each heuristic rule the features
    satisfy. The stage with the largest total wins; on a tie the stage listed
    first wins.

    Args:
        features: dict of feature values, or None. Keys that are absent fall
            back to the defaults given below.
        bbox: (x, y, w, h) box; only w and h are read (for the area rule).

    Returns:
        (stage_name, confidence):
        - ("Unknown", 0.0) when features is None,
        - ("Unknown", best_score) when the best score is below 0.25,
        - otherwise the winning stage and its score capped at 1.0.
    """
    if features is None:
        return "Unknown", 0.0

    box_area = bbox[2] * bbox[3]

    texture = features.get("texture_std", 30)
    edges = features.get("edge_density", 40)
    entropy = features.get("orientation_entropy", 2.5)
    homogeneity = features.get("color_homogeneity", 25)
    brightness = features.get("brightness", 60)
    saturation = features.get("saturation", 50)
    contrast = features.get("glcm_contrast", 500)
    lbp = features.get("lbp_variance", 0.04)

    # (stage, ((rule satisfied?, weight), ...)) -- stage order doubles as the
    # tie-break order, and rule order fixes the float accumulation order.
    rule_table = (
        # Flat, low-texture, soil/concrete colored, homogeneous
        ("Foundation", (
            (texture < 22, 0.25),
            (edges < 30, 0.20),
            (homogeneity < 20, 0.20),
            (40 <= brightness <= 75, 0.15),
            (saturation < 60, 0.10),
            (lbp < 0.03, 0.10),
        )),
        # High edges, geometric regularity, high contrast grid patterns
        ("Structural", (
            (edges > 50, 0.25),
            (entropy < 2.2, 0.20),
            (contrast > 800, 0.20),
            (texture > 30, 0.15),
            (homogeneity > 30, 0.10),
            (box_area > 1000, 0.10),
        )),
        # Mixed materials, irregular texture, medium-high edge density
        ("Under Construction", (
            (25 < texture < 50, 0.20),
            (35 < edges < 65, 0.20),
            (entropy > 2.6, 0.20),
            (homogeneity > 25, 0.15),
            (0.03 < lbp < 0.07, 0.15),
            (saturation < 80, 0.10),
        )),
        # Uniform roof, clean edges, low entropy, consistent color
        ("Complete", (
            (texture < 28, 0.20),
            (entropy < 2.3, 0.25),
            (homogeneity < 22, 0.20),
            (edges > 25, 0.10),
            (lbp < 0.04, 0.15),
            (brightness > 50, 0.10),
        )),
    )

    totals = {}
    for stage_name, checks in rule_table:
        running = 0.0
        for satisfied, weight in checks:
            if satisfied:
                running += weight
        totals[stage_name] = running

    winner = max(totals, key=totals.get)
    top_score = totals[winner]
    if top_score < 0.25:
        return "Unknown", top_score
    return winner, min(top_score, 1.0)
def analyze_building_3d(before_img, after_img, region, features):
    """
    Attach height and construction-stage estimates to one region dict.

    Reads region["bbox"], runs estimate_building_height and
    classify_construction_stage on it, and writes the results into the keys
    "estimated_stories", "estimated_height_m", "construction_stage" and
    "construction_stage_confidence".

    The region dict is modified in place and also returned.
    """
    box = region["bbox"]
    n_stories, height_in_m = estimate_building_height(
        before_img, after_img, box, features)
    stage_name, stage_score = classify_construction_stage(features, box)

    region.update(
        estimated_stories=n_stories,
        estimated_height_m=height_in_m,
        construction_stage=stage_name,
        construction_stage_confidence=stage_score,
    )
    return region
| # --------------------------------------------------------------------------- | |
| # 17. Region analysis | |
| # --------------------------------------------------------------------------- | |
def _tight_bbox(labels, label_id, stats_row):
    """
    Return the bounding box and fill ratio of one connected component.

    The box starts as the one recorded in stats_row. For sparse components
    (fill ratio below 0.20 and more than 100 pixels) it is re-derived from the
    actual pixels carrying label_id.

    Args:
        labels: 2-D label image.
        label_id: label value of the component of interest.
        stats_row: stats row of that component, indexed by the cv2.CC_STAT_*
            constants. Assumed to come from the same
            connectedComponentsWithStats call as labels, so the component
            lies entirely inside the stats box.

    Returns:
        (x, y, w, h, fill_ratio) where x, y, w, h are plain Python ints and
        fill_ratio is a Python float equal to area / (w * h).
    """
    # Cast once up front so both code paths return native Python numbers
    # (the stats row holds NumPy scalars).
    x = int(stats_row[cv2.CC_STAT_LEFT])
    y = int(stats_row[cv2.CC_STAT_TOP])
    w = int(stats_row[cv2.CC_STAT_WIDTH])
    h = int(stats_row[cv2.CC_STAT_HEIGHT])
    area = int(stats_row[cv2.CC_STAT_AREA])
    fill_ratio = area / max(w * h, 1)

    # If the component fills less than 20% of its bbox, compute a tighter fit
    if fill_ratio < 0.20 and area > 100:
        # Only the component's own window needs scanning, not the whole image.
        window = labels[y:y + h, x:x + w]
        ys, xs = np.where(window == label_id)
        if xs.size > 0:
            min_x, min_y = int(xs.min()), int(ys.min())
            w = int(xs.max()) - min_x + 1
            h = int(ys.max()) - min_y + 1
            # Window coordinates are relative to the old top-left corner.
            x += min_x
            y += min_y
            fill_ratio = area / max(w * h, 1)
    return x, y, w, h, fill_ratio
| def _iou(boxA, boxB): | |
| """Intersection-over-union for two (x,y,w,h) boxes.""" | |
| ax1, ay1, aw, ah = boxA | |
| bx1, by1, bw, bh = boxB | |
| ax2, ay2 = ax1 + aw, ay1 + ah | |
| bx2, by2 = bx1 + bw, by1 + bh | |
| ix1, iy1 = max(ax1, bx1), max(ay1, by1) | |
| ix2, iy2 = min(ax2, bx2), min(ay2, by2) | |
| inter = max(0, ix2 - ix1) * max(0, iy2 - iy1) | |
| union = aw * ah + bw * bh - inter | |
| return inter / max(union, 1) | |
| def _nms_regions(regions, iou_thresh=0.45): | |
| """Non-maximum suppression: keep the highest-area box when two overlap.""" | |
| if len(regions) < 2: | |
| return regions | |
| keep = [] | |
| used = set() | |
| for i, r in enumerate(regions): | |
| if i in used: | |
| continue | |
| keep.append(r) | |
| for j in range(i + 1, len(regions)): | |
| if j in used: | |
| continue | |
| if _iou(r["bbox"], regions[j]["bbox"]) > iou_thresh: | |
| used.add(j) | |
| return keep | |
def analyze_change_regions(change_mask, image, min_area=400, use_ensemble=True,
                           before_img=None, registration_ok=True):
    """
    Find connected change regions with strict quality filters:
    - Adaptive min_area scaled to image size (only when min_area is None)
    - Fill-ratio filter (>= 0.12) rejects sparse noise boxes
    - Bounding boxes obtained from _tight_bbox
    - NMS to remove overlapping/duplicate boxes
    - Max 60 regions cap to avoid flooding the UI

    Args:
        change_mask: binary change mask passed to
            cv2.connectedComponentsWithStats (8-connectivity).
        image: the "after" image used for classification and feature crops.
        min_area: minimum component area in pixels; None selects an adaptive
            value between 350 and 1400 based on the mask area.
        use_ensemble: use classify_with_ensemble for components larger than
            500 px; otherwise (and for smaller ones) classify_object_type.
        before_img: optional "before" image. Sub-type and building 3-D
            enrichment only run when it is provided.
        registration_ok: when False, the near-full-frame rejection limit is
            relaxed from 75% to 92% of the image area.

    Returns:
        List of region dicts sorted by area (largest first), de-duplicated by
        NMS, capped at 60, re-numbered from 1, each with a "severity" entry.
        Every dict carries the same set of keys; optional fields are None
        when they do not apply.
    """
    num_labels, labels, stats, centroids = cv2.connectedComponentsWithStats(
        change_mask, connectivity=8)

    img_h, img_w = change_mask.shape[:2]
    img_area = img_h * img_w

    # Adaptive minimum region size:
    # - keeps sensitivity on smaller images
    # - suppresses speckle noise on larger images
    if min_area is None:
        min_area = int(max(350, min(1400, img_area * 0.00012)))

    # Keep large real changes; only suppress near-full-frame artifacts.
    # When registration failed, allow larger regions to avoid missing true changes.
    # (Does not depend on the component, so it is computed once.)
    max_region_cover = 0.92 if not registration_ok else 0.75

    change_regions = []
    region_id = 0
    for i in range(1, num_labels):  # label 0 is skipped
        raw_area = stats[i, cv2.CC_STAT_AREA]
        if raw_area < min_area:
            continue

        x, y, w, h, fill_ratio = _tight_bbox(labels, i, stats[i])
        # Store native ints, matching the int() casts already applied to
        # "area" and "center" below, so the region dict holds no NumPy scalars
        # in its geometry fields.
        x, y, w, h = int(x), int(y), int(w), int(h)
        bbox = (x, y, w, h)

        # Reject very sparse regions (bbox is mostly empty)
        if fill_ratio < 0.12:
            continue
        if (w * h) > img_area * max_region_cover and fill_ratio < 0.35:
            continue

        cx, cy = centroids[i]

        if use_ensemble and raw_area > 500:
            object_type, confidence = classify_with_ensemble(
                image, bbox, before_region=before_img)
        else:
            object_type, confidence = classify_object_type(
                image, bbox, before_region=before_img)

        if object_type is None:
            # Do not silently drop large coherent regions; keep them as generic
            # ground-change candidates so key changes are still surfaced.
            if raw_area >= max(min_area * 2, 800) and fill_ratio >= 0.18:
                object_type = "Unclassified Ground Change"
                confidence = max(0.2, min(0.5, fill_ratio))
            else:
                continue

        region_id += 1
        region = {
            "id": region_id,
            "area": int(raw_area),
            "bbox": bbox,
            "center": (int(cx), int(cy)),
            "object_type": object_type,
            "confidence": confidence,
            "fill_ratio": round(fill_ratio, 3),
            "sub_type": None,
            "sub_type_confidence": None,
            "estimated_stories": None,
            "estimated_height_m": None,
            "construction_stage": None,
            # Pre-initialised like the other optional fields so every region
            # has the same keys whether or not building analysis runs.
            "construction_stage_confidence": None,
        }

        if before_img is not None:
            if object_type in _VEGETATION_TYPES:
                sub, sub_conf = classify_vegetation_subtype(
                    before_img, image, bbox)
                region["sub_type"] = sub
                region["sub_type_confidence"] = sub_conf
            elif object_type in _STRUCTURAL_TYPES:
                sub, sub_conf = classify_structural_subtype(
                    before_img, image, bbox, object_type)
                region["sub_type"] = sub
                region["sub_type_confidence"] = sub_conf

            if object_type in _BUILDING_TYPES:
                # Features come from the bbox grown by a 5 px margin,
                # clipped to the image bounds.
                pad = 5
                ry1 = max(0, y - pad)
                ry2 = min(image.shape[0], y + h + pad)
                rx1 = max(0, x - pad)
                rx2 = min(image.shape[1], x + w + pad)
                crop = image[ry1:ry2, rx1:rx2]
                feats = extract_advanced_features(crop) if crop.size > 0 else None
                analyze_building_3d(before_img, image, region, feats)

        change_regions.append(region)

    # Sort by area descending, apply NMS, cap at 60
    change_regions.sort(key=lambda r: r["area"], reverse=True)
    change_regions = _nms_regions(change_regions, iou_thresh=0.45)
    change_regions = change_regions[:60]

    # Re-number after filtering
    for idx, r in enumerate(change_regions, start=1):
        r["id"] = idx

    total_px = img_area
    for r in change_regions:
        r["severity"] = _severity_from_region(r, total_px)

    return change_regions
| # --------------------------------------------------------------------------- | |
| # 18. Main pipeline | |
| # --------------------------------------------------------------------------- | |
def run_detection(before_pil, after_pil, method="AI-Based Deep Learning",
                  enable_registration=True, enable_normalization=True,
                  detection_sensitivity=0.5, min_region_area=None):
    """
    Run full detection pipeline; returns change_mask, result_image, stats, regions.

    Steps: preprocess both images, optionally register and radiometrically
    normalise them, build a change mask with the selected method, fall back to
    the Image Difference mask when the AI/Hybrid mask is nearly empty, then
    extract regions, render the visualisation and assemble statistics.

    Args:
        before_pil, after_pil: input images handed to preprocess_image.
        method: "AI-Based Deep Learning", "Image Difference" or
            "Feature-Based"; any other value runs hybrid_method.
        enable_registration: run register_images on the pair. When False,
            registration_ok stays False.
        enable_normalization: run normalize_radiometry on the pair.
        detection_sensitivity: forwarded to the thresholding methods.
        min_region_area: forwarded to analyze_change_regions as min_area
            (None selects its adaptive minimum).

    Returns:
        (change_mask, result_image, stats, change_regions)
    """
    def _changed_count(mask):
        # Pixels strictly above 127 count as "changed".
        return int(np.count_nonzero(mask > 127))

    before_array = preprocess_image(before_pil)
    after_array = preprocess_image(after_pil)

    registration_ok = False
    if enable_registration:
        before_array, after_array, registration_ok = register_images(before_array, after_array)
    if enable_normalization:
        before_array, after_array = normalize_radiometry(before_array, after_array)

    if method == "AI-Based Deep Learning":
        change_mask, threshold_debug = ai_deep_learning_method(
            before_array, after_array, sensitivity=detection_sensitivity
        )
    elif method == "Image Difference":
        change_mask, threshold_debug = image_difference_method(
            before_array, after_array, sensitivity=detection_sensitivity
        )
    elif method == "Feature-Based":
        change_mask = feature_based_method(before_array, after_array)
        threshold_debug = {
            "method": "Feature-Based",
            "threshold_used": None,
            "note": "KMeans clustering path does not use binary threshold.",
            "sensitivity": float(detection_sensitivity),
        }
    else:
        change_mask, threshold_debug = hybrid_method(
            before_array, after_array, sensitivity=detection_sensitivity
        )

    # --- Adaptive fallback for empty/sparse masks ---
    # In some scenes, ORB/ECC registration + fused thresholding can produce an overly
    # sparse binary mask (leading to 0 detected regions). If that happens, fall back
    # to the more stable Image Difference mask.
    total_pixels = int(change_mask.shape[0] * change_mask.shape[1])
    changed_pixels_ratio = (
        float(_changed_count(change_mask)) / float(total_pixels) if total_pixels else 0.0
    )
    if method in ("AI-Based Deep Learning", "Hybrid Approach") and changed_pixels_ratio < 0.0025:
        diff_mask, diff_debug = image_difference_method(
            before_array, after_array, sensitivity=detection_sensitivity
        )
        diff_ratio = (
            float(_changed_count(diff_mask)) / float(total_pixels) if total_pixels else 0.0
        )
        # Only switch if the diff mask clearly contains more signal.
        if diff_ratio > max(0.005, changed_pixels_ratio * 3.0):
            change_mask = diff_mask
            threshold_debug = {
                "method": f"{method} (fallback->Image Difference)",
                "fallback_used": True,
                "ai_hybrid_changed_ratio": changed_pixels_ratio,
                "diff_changed_ratio": diff_ratio,
                "diff_debug": diff_debug,
                "sensitivity": float(detection_sensitivity),
            }

    change_regions = analyze_change_regions(
        change_mask,
        after_array,
        min_area=min_region_area,
        before_img=before_array,
        registration_ok=registration_ok,
    )

    # Recomputed because change_mask may have been swapped for the fallback mask.
    total_pixels = int(change_mask.shape[0] * change_mask.shape[1])
    result_image = visualize_changes(
        before_array, after_array, change_mask,
        regions=change_regions, total_pixels=total_pixels
    )

    # Counted after region analysis and rendering, as before, so the statistic
    # reflects the mask exactly as it is returned.
    changed_pixels = _changed_count(change_mask)
    change_pct = (changed_pixels / total_pixels * 100.0) if total_pixels else 0.0

    stats = {
        "total_pixels": total_pixels,
        "changed_pixels": changed_pixels,
        "unchanged_pixels": total_pixels - changed_pixels,
        "change_percentage": change_pct,
        "image_width": change_mask.shape[1],
        "image_height": change_mask.shape[0],
        "threshold_debug": threshold_debug,
        "params": {
            "detection_sensitivity": float(detection_sensitivity),
            "min_region_area": min_region_area,
            "enable_registration": bool(enable_registration),
            "enable_normalization": bool(enable_normalization),
            "registration_ok": bool(registration_ok),
        },
    }
    return change_mask, result_image, stats, change_regions