satdetect / app /detection_engine.py
coderuday21's picture
Production overhaul: pre-trained AdaptFormer model + detection quality improvements
3808a54
raw
history blame
78 kB
"""
Satellite Change Detection Engine v3
High-accuracy detection with multi-channel analysis, SSIM, CVA, texture features,
adaptive thresholding, vegetation/shadow suppression, SNR-weighted fusion,
and improved object classification.
"""
import numpy as np
import cv2
from PIL import Image
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from collections import Counter
# ---------------------------------------------------------------------------
# 1. Pre-processing
# ---------------------------------------------------------------------------
def preprocess_image(image):
"""Preprocess image: convert to RGB, limit size, bilateral denoise."""
img_array = np.array(image)
if img_array.ndim == 2:
img_array = cv2.cvtColor(img_array, cv2.COLOR_GRAY2RGB)
elif img_array.ndim == 3 and img_array.shape[2] == 4:
img_array = cv2.cvtColor(img_array, cv2.COLOR_RGBA2RGB)
elif img_array.ndim != 3 or img_array.shape[2] != 3:
raise ValueError(f"Unsupported image shape: {img_array.shape}")
max_size = 2000
height, width = img_array.shape[:2]
if max(height, width) > max_size:
scale = max_size / max(height, width)
new_w, new_h = max(1, int(width * scale)), max(1, int(height * scale))
img_array = cv2.resize(img_array, (new_w, new_h), interpolation=cv2.INTER_AREA)
# Bilateral filter: reduces sensor noise while preserving edges
img_array = cv2.bilateralFilter(img_array, 9, 75, 75)
return img_array
# ---------------------------------------------------------------------------
# 2. Improved image registration (alignment)
# ---------------------------------------------------------------------------
def register_images(img1, img2, max_features=2000):
"""Align img2 to img1 using ORB + ratio-test + RANSAC homography."""
gray1 = cv2.cvtColor(img1, cv2.COLOR_RGB2GRAY)
gray2 = cv2.cvtColor(img2, cv2.COLOR_RGB2GRAY)
orb = cv2.ORB_create(nfeatures=max_features, scoreType=cv2.ORB_HARRIS_SCORE)
kp1, des1 = orb.detectAndCompute(gray1, None)
kp2, des2 = orb.detectAndCompute(gray2, None)
if des1 is None or des2 is None or len(des1) < 10 or len(des2) < 10:
return _register_images_ecc_fallback(img1, img2)
# Use kNN matching with Lowe's ratio test for better matches
bf = cv2.BFMatcher(cv2.NORM_HAMMING)
raw_matches = bf.knnMatch(des1, des2, k=2)
good_matches = []
for pair in raw_matches:
if len(pair) == 2:
m, n = pair
if m.distance < 0.75 * n.distance:
good_matches.append(m)
if len(good_matches) < 10:
return _register_images_ecc_fallback(img1, img2)
src_pts = np.float32([kp1[m.queryIdx].pt for m in good_matches]).reshape(-1, 1, 2)
dst_pts = np.float32([kp2[m.trainIdx].pt for m in good_matches]).reshape(-1, 1, 2)
homography, mask = cv2.findHomography(dst_pts, src_pts, cv2.RANSAC, 3.0)
if homography is None:
return _register_images_ecc_fallback(img1, img2)
inlier_ratio = np.sum(mask) / len(mask) if mask is not None else 0
if inlier_ratio < 0.3:
return _register_images_ecc_fallback(img1, img2)
# Reject degenerate homographies (near-singular or extreme distortion)
det = np.linalg.det(homography)
if abs(det) < 0.1 or abs(det) > 10.0:
return _register_images_ecc_fallback(img1, img2)
h, w = img1.shape[:2]
img2_aligned = cv2.warpPerspective(img2, homography, (w, h), borderMode=cv2.BORDER_REFLECT)
return img1, img2_aligned, True
def _register_images_ecc_fallback(img1, img2):
"""
Fallback alignment with ECC affine registration.
More stable than ORB on low-texture agricultural areas.
"""
try:
gray1 = cv2.cvtColor(img1, cv2.COLOR_RGB2GRAY)
gray2 = cv2.cvtColor(img2, cv2.COLOR_RGB2GRAY)
gray1_f = gray1.astype(np.float32) / 255.0
gray2_f = gray2.astype(np.float32) / 255.0
warp = np.eye(2, 3, dtype=np.float32)
criteria = (
cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT,
200,
1e-6,
)
cc, warp = cv2.findTransformECC(
gray1_f, gray2_f, warp, cv2.MOTION_AFFINE, criteria
)
h, w = img1.shape[:2]
aligned = cv2.warpAffine(
img2,
warp,
(w, h),
flags=cv2.INTER_LINEAR + cv2.WARP_INVERSE_MAP,
borderMode=cv2.BORDER_REFLECT,
)
# Treat as successful only if ECC correlation is reasonable.
return img1, aligned, bool(cc >= 0.45)
except Exception:
return img1, img2, False
# ---------------------------------------------------------------------------
# 3. Improved radiometric normalization
# ---------------------------------------------------------------------------
def normalize_radiometry(img1, img2):
"""Histogram-matching normalization in LAB space. CLAHE applied symmetrically."""
lab1 = cv2.cvtColor(img1, cv2.COLOR_RGB2LAB).astype(np.float32)
lab2 = cv2.cvtColor(img2, cv2.COLOR_RGB2LAB).astype(np.float32)
result = lab2.copy()
for ch in range(3):
mean1, std1 = np.mean(lab1[:, :, ch]), np.std(lab1[:, :, ch])
mean2, std2 = np.mean(lab2[:, :, ch]), np.std(lab2[:, :, ch])
if std2 > 1e-6:
result[:, :, ch] = (lab2[:, :, ch] - mean2) * (std1 / std2) + mean1
result_uint8 = np.clip(result, 0, 255).astype(np.uint8)
# CLAHE on L channel of BOTH images so downstream comparison is symmetric
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
lab1_uint8 = cv2.cvtColor(img1, cv2.COLOR_RGB2LAB)
lab1_uint8[:, :, 0] = clahe.apply(lab1_uint8[:, :, 0])
result_uint8[:, :, 0] = clahe.apply(result_uint8[:, :, 0])
img1_out = cv2.cvtColor(lab1_uint8, cv2.COLOR_LAB2RGB)
img2_out = cv2.cvtColor(result_uint8, cv2.COLOR_LAB2RGB)
return img1_out, img2_out
# ---------------------------------------------------------------------------
# 4. Vegetation suppression
# ---------------------------------------------------------------------------
def compute_vegetation_mask(img):
"""
Identify vegetation pixels using pseudo-NDVI and HSV hue/saturation.
Returns a float map in [0, 1] where 1.0 = vegetation, 0.0 = non-vegetation.
"""
r = img[:, :, 0].astype(np.float32)
g = img[:, :, 1].astype(np.float32)
ndvi = (g - r) / (g + r + 1e-6)
hsv = cv2.cvtColor(img, cv2.COLOR_RGB2HSV)
hue = hsv[:, :, 0].astype(np.float32)
sat = hsv[:, :, 1].astype(np.float32)
ndvi_veg = (ndvi > 0.08).astype(np.float32)
hsv_veg = ((hue >= 35) & (hue <= 85) & (sat > 30)).astype(np.float32)
veg = np.clip(ndvi_veg * 0.6 + hsv_veg * 0.4, 0, 1)
veg = cv2.GaussianBlur(veg, (11, 11), 0)
return veg
def compute_combined_vegetation_suppression(img1, img2):
"""
Asymmetric vegetation handling:
- Where BOTH images are vegetated: suppress (likely seasonal noise)
- Where only ONE image is vegetated: boost (real vegetation change)
Returns a float map where 1.0 = neutral, <1 = suppress, >1 = boost.
"""
veg1 = compute_vegetation_mask(img1)
veg2 = compute_vegetation_mask(img2)
both_veg = np.minimum(veg1, veg2)
one_only = np.abs(veg1 - veg2)
seasonal_suppression = 1.0 - both_veg * 0.7
vegetation_boost = 1.0 + one_only * 0.3
return (seasonal_suppression * vegetation_boost).astype(np.float32)
# ---------------------------------------------------------------------------
# 5. Shadow / illumination-only change suppression
# ---------------------------------------------------------------------------
def compute_shadow_suppression(img1, img2):
"""
Detect pixels where only brightness (L) changed but chrominance (A, B)
stayed similar. These are shadow/illumination shifts, not real changes.
Returns a float map in [0, 1]: 1.0 = real change, ~0.2 = illumination-only.
"""
lab1 = cv2.cvtColor(img1, cv2.COLOR_RGB2LAB).astype(np.float32)
lab2 = cv2.cvtColor(img2, cv2.COLOR_RGB2LAB).astype(np.float32)
delta_l = np.abs(lab1[:, :, 0] - lab2[:, :, 0])
delta_a = np.abs(lab1[:, :, 1] - lab2[:, :, 1])
delta_b = np.abs(lab1[:, :, 2] - lab2[:, :, 2])
chroma_change = delta_a + delta_b
brightness_only = (delta_l > 18) & (chroma_change < 12)
shadow_map = brightness_only.astype(np.float32)
shadow_map = cv2.GaussianBlur(shadow_map, (9, 9), 0)
suppression = 1.0 - shadow_map * 0.8
return suppression.astype(np.float32)
# ---------------------------------------------------------------------------
# 6. Change Vector Analysis (CVA)
# ---------------------------------------------------------------------------
def compute_cva(img1, img2):
"""
Change Vector Analysis in LAB space.
Returns a normalized change magnitude map with illumination-only
changes suppressed via direction filtering.
"""
lab1 = cv2.cvtColor(img1, cv2.COLOR_RGB2LAB).astype(np.float32)
lab2 = cv2.cvtColor(img2, cv2.COLOR_RGB2LAB).astype(np.float32)
dl = (lab2[:, :, 0] - lab1[:, :, 0]) / 100.0
da = (lab2[:, :, 1] - lab1[:, :, 1]) / 128.0
db = (lab2[:, :, 2] - lab1[:, :, 2]) / 128.0
magnitude = np.sqrt(dl ** 2 + da ** 2 + db ** 2)
chroma_mag = np.sqrt(da ** 2 + db ** 2)
total_mag = magnitude + 1e-8
chroma_ratio = chroma_mag / total_mag
# Suppress illumination-only changes (low chroma ratio)
suppression = np.clip(chroma_ratio * 2.5, 0.15, 1.0)
magnitude = magnitude * suppression
p995 = float(np.quantile(magnitude, 0.995))
if p995 > 1e-8:
magnitude = np.clip(magnitude / p995, 0, 1)
return magnitude.astype(np.float32)
# ---------------------------------------------------------------------------
# 7. SSIM-based structural change map
# ---------------------------------------------------------------------------
def _ssim_at_scale(gray1, gray2, win_size=11):
"""Compute SSIM dissimilarity at a single scale."""
sigma = win_size / 6.0
C1 = (0.01 * 255) ** 2
C2 = (0.03 * 255) ** 2
mu1 = cv2.GaussianBlur(gray1, (win_size, win_size), sigma)
mu2 = cv2.GaussianBlur(gray2, (win_size, win_size), sigma)
mu1_sq = mu1 * mu1
mu2_sq = mu2 * mu2
mu1_mu2 = mu1 * mu2
sigma1_sq = np.maximum(cv2.GaussianBlur(gray1 * gray1, (win_size, win_size), sigma) - mu1_sq, 0)
sigma2_sq = np.maximum(cv2.GaussianBlur(gray2 * gray2, (win_size, win_size), sigma) - mu2_sq, 0)
sigma12 = cv2.GaussianBlur(gray1 * gray2, (win_size, win_size), sigma) - mu1_mu2
denom = (mu1_sq + mu2_sq + C1) * (sigma1_sq + sigma2_sq + C2)
ssim_map = ((2 * mu1_mu2 + C1) * (2 * sigma12 + C2)) / (denom + 1e-12)
dssim = np.clip((1.0 - ssim_map) / 2.0, 0, 1)
return dssim
def compute_ssim_change_map(img1, img2, win_size=11):
"""
Multi-scale SSIM dissimilarity: averages full-res and half-res scales
to capture both fine and coarse structural changes.
"""
gray1 = cv2.cvtColor(img1, cv2.COLOR_RGB2GRAY).astype(np.float64)
gray2 = cv2.cvtColor(img2, cv2.COLOR_RGB2GRAY).astype(np.float64)
dssim_full = _ssim_at_scale(gray1, gray2, win_size)
h, w = gray1.shape
g1_half = cv2.resize(gray1, (max(1, w // 2), max(1, h // 2)))
g2_half = cv2.resize(gray2, (max(1, w // 2), max(1, h // 2)))
half_win = max(3, win_size // 2) | 1
dssim_half = _ssim_at_scale(g1_half, g2_half, half_win)
dssim_half_up = cv2.resize(dssim_half, (w, h))
dssim = 0.6 * dssim_full + 0.4 * dssim_half_up
return dssim
# ---------------------------------------------------------------------------
# 8. Texture feature extraction (LBP)
# ---------------------------------------------------------------------------
def compute_lbp(gray, radius=1, n_points=8):
"""Compute simplified Local Binary Pattern texture descriptor."""
h, w = gray.shape
lbp = np.zeros_like(gray, dtype=np.float32)
for i in range(n_points):
angle = 2 * np.pi * i / n_points
dx = int(round(radius * np.cos(angle)))
dy = int(round(-radius * np.sin(angle)))
shifted = np.roll(np.roll(gray, dy, axis=0), dx, axis=1)
lbp += (shifted >= gray).astype(np.float32)
return lbp / n_points
def compute_texture_change(img1, img2):
"""Compute texture difference using LBP."""
gray1 = cv2.cvtColor(img1, cv2.COLOR_RGB2GRAY).astype(np.float32)
gray2 = cv2.cvtColor(img2, cv2.COLOR_RGB2GRAY).astype(np.float32)
lbp1 = compute_lbp(gray1)
lbp2 = compute_lbp(gray2)
texture_diff = np.abs(lbp1 - lbp2)
return texture_diff
# ---------------------------------------------------------------------------
# 9. Edge-aware change detection
# ---------------------------------------------------------------------------
def compute_edge_change(img1, img2):
"""Compute edge-based change map using Canny edges."""
gray1 = cv2.cvtColor(img1, cv2.COLOR_RGB2GRAY)
gray2 = cv2.cvtColor(img2, cv2.COLOR_RGB2GRAY)
# Adaptive Canny thresholds based on median intensity
med1 = np.median(gray1)
edges1 = cv2.Canny(gray1, int(max(0, 0.67 * med1)), int(min(255, 1.33 * med1)))
med2 = np.median(gray2)
edges2 = cv2.Canny(gray2, int(max(0, 0.67 * med2)), int(min(255, 1.33 * med2)))
# Dilate edges slightly so nearby edges match
kernel = np.ones((3, 3), np.uint8)
edges1_d = cv2.dilate(edges1, kernel, iterations=1)
edges2_d = cv2.dilate(edges2, kernel, iterations=1)
# New edges = present in one image but not the other
edge_change = cv2.absdiff(edges1_d, edges2_d).astype(np.float32) / 255.0
return edge_change
# ---------------------------------------------------------------------------
# 10. Improved detection methods
# ---------------------------------------------------------------------------
def _adaptive_binary_threshold(score_uint8, min_floor=25, sensitivity=0.5):
"""
Robust thresholding for noisy scenes.
Uses max(Otsu, noise-floor, fixed floor) where noise-floor is median + 3*MAD.
"""
otsu_val, _ = cv2.threshold(
score_uint8, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU
)
median = float(np.median(score_uint8))
mad = float(np.median(np.abs(score_uint8.astype(np.float32) - median)))
noise_floor = median + 3.0 * mad
# Higher sensitivity => lower threshold (detect more), lower sensitivity => stricter
sens = float(np.clip(sensitivity, 0.0, 1.0))
sens_shift = int((0.5 - sens) * 24) # approx -12..+12 around baseline
thr = int(max(min_floor, otsu_val, noise_floor) + sens_shift)
thr = max(0, min(255, thr))
_, mask = cv2.threshold(score_uint8, thr, 255, cv2.THRESH_BINARY)
return mask, thr, float(otsu_val), float(noise_floor)
def image_difference_method(img1, img2, threshold=0.25, blur_size=5, sensitivity=0.5):
"""Improved image difference with multi-channel analysis and adaptive threshold."""
if img1.shape != img2.shape:
img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0]))
# Multi-channel difference in LAB (perceptually uniform)
lab1 = cv2.cvtColor(img1, cv2.COLOR_RGB2LAB).astype(np.float32)
lab2 = cv2.cvtColor(img2, cv2.COLOR_RGB2LAB).astype(np.float32)
lab1_blur = cv2.GaussianBlur(lab1, (blur_size, blur_size), 0)
lab2_blur = cv2.GaussianBlur(lab2, (blur_size, blur_size), 0)
# Weighted Delta-E inspired difference
diff = lab1_blur - lab2_blur
delta_e = np.sqrt(
(diff[:, :, 0] / 100.0) ** 2 +
(diff[:, :, 1] / 128.0) ** 2 +
(diff[:, :, 2] / 128.0) ** 2
)
delta_e = delta_e / delta_e.max() if delta_e.max() > 0 else delta_e
delta_uint8 = (delta_e * 255).astype(np.uint8)
change_mask, used_thr, otsu_val, noise_floor = _adaptive_binary_threshold(
delta_uint8, min_floor=30, sensitivity=sensitivity
)
change_mask = _clean_mask(change_mask)
debug = {
"method": "Image Difference",
"threshold_used": int(used_thr),
"otsu": float(otsu_val),
"noise_floor": float(noise_floor),
"sensitivity": float(sensitivity),
}
return change_mask, debug
def feature_based_method(img1, img2, num_clusters=4, sensitivity=0.5):
"""Feature-based change detection using multi-space clustering."""
if img1.shape != img2.shape:
img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0]))
lab1 = cv2.cvtColor(img1, cv2.COLOR_RGB2LAB).astype(np.float32)
lab2 = cv2.cvtColor(img2, cv2.COLOR_RGB2LAB).astype(np.float32)
hsv1 = cv2.cvtColor(img1, cv2.COLOR_RGB2HSV).astype(np.float32)
hsv2 = cv2.cvtColor(img2, cv2.COLOR_RGB2HSV).astype(np.float32)
diff_lab = np.abs(lab1 - lab2)
diff_hsv = np.abs(hsv1 - hsv2)
h, w, _ = diff_lab.shape
features = np.concatenate([diff_lab, diff_hsv[:, :, 1:]], axis=2)
# Downsample for KMeans (full-res is too slow for >1M pixels)
MAX_PIXELS = 250_000
total = h * w
if total > MAX_PIXELS:
scale = np.sqrt(MAX_PIXELS / total)
sh, sw = max(1, int(h * scale)), max(1, int(w * scale))
features_small = cv2.resize(features, (sw, sh))
else:
features_small = features
sh, sw = h, w
features_flat = features_small.reshape(-1, features_small.shape[2])
scaler = StandardScaler()
features_scaled = scaler.fit_transform(features_flat)
kmeans = KMeans(n_clusters=num_clusters, random_state=42, n_init=10)
labels_small = kmeans.fit_predict(features_scaled)
cluster_means = [
np.mean(np.linalg.norm(features_flat[labels_small == i], axis=1))
if np.any(labels_small == i) else 0.0
for i in range(num_clusters)
]
change_cluster_idx = np.argmax(cluster_means)
# Map labels back to full resolution by predicting on all pixels
if total > MAX_PIXELS:
full_flat = features.reshape(-1, features.shape[2])
full_scaled = scaler.transform(full_flat)
labels = kmeans.predict(full_scaled)
else:
labels = labels_small
change_mask = (labels == change_cluster_idx).astype(np.uint8) * 255
change_mask = change_mask.reshape(h, w)
change_mask = _clean_mask(change_mask, sensitivity)
return change_mask
def _snr_weight(channel):
"""
Signal-to-noise ratio weight: signal = mean of top 5% values,
noise = std of bottom 50%. Channels with concentrated high responses
score higher than uniformly noisy ones.
"""
flat = channel.ravel()
p95 = float(np.quantile(flat, 0.95))
signal = float(np.mean(flat[flat >= p95])) if p95 > 1e-8 else 0.0
p50 = float(np.quantile(flat, 0.50))
noise = float(np.std(flat[flat <= p50])) + 1e-8
return signal / noise
def _ai_fusion_core(img1, img2, sensitivity=0.5):
"""
Single-pass AI fusion with 5 channels, SNR weighting, and
vegetation + shadow suppression. Returns (mask, debug).
"""
if img1.shape != img2.shape:
img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0]))
# ---- Channel 1: Multi-scale LAB color difference ----
lab1 = cv2.cvtColor(img1, cv2.COLOR_RGB2LAB).astype(np.float32)
lab2 = cv2.cvtColor(img2, cv2.COLOR_RGB2LAB).astype(np.float32)
scales = [1, 2, 4]
color_maps = []
for scale in scales:
if scale > 1:
s1 = cv2.resize(lab1, (lab1.shape[1] // scale, lab1.shape[0] // scale))
s2 = cv2.resize(lab2, (lab2.shape[1] // scale, lab2.shape[0] // scale))
else:
s1, s2 = lab1, lab2
diff = s1 - s2
delta_e = np.sqrt((diff[:, :, 0] / 100.0) ** 2 +
(diff[:, :, 1] / 128.0) ** 2 +
(diff[:, :, 2] / 128.0) ** 2)
if scale > 1:
delta_e = cv2.resize(delta_e, (lab1.shape[1], lab1.shape[0]))
color_maps.append(delta_e)
color_change = np.mean(color_maps, axis=0)
color_change = color_change / (color_change.max() + 1e-8)
# ---- Channel 2: SSIM structural dissimilarity ----
ssim_change = compute_ssim_change_map(img1, img2)
ssim_change = ssim_change / (ssim_change.max() + 1e-8)
# ---- Channel 3: Texture change (LBP) ----
texture_change = compute_texture_change(img1, img2)
texture_change = texture_change / (texture_change.max() + 1e-8)
# ---- Channel 4: Edge change ----
edge_change = compute_edge_change(img1, img2)
# ---- Channel 5: Change Vector Analysis ----
cva_change = compute_cva(img1, img2)
# ---- SNR-weighted fusion ----
channels = [color_change, ssim_change, texture_change, edge_change, cva_change]
weights = [_snr_weight(ch) for ch in channels]
total_w = sum(weights) + 1e-8
weights = [w / total_w for w in weights]
fused = np.zeros_like(color_change, dtype=np.float64)
for ch, w in zip(channels, weights):
fused += w * ch.astype(np.float64)
# ---- Apply vegetation + shadow suppression before thresholding ----
veg_suppression = compute_combined_vegetation_suppression(img1, img2)
shadow_suppression = compute_shadow_suppression(img1, img2)
fused = fused * veg_suppression.astype(np.float64) * shadow_suppression.astype(np.float64)
# Percentile normalization
p995 = float(np.quantile(fused, 0.995))
if p995 <= 1e-8:
p995 = float(fused.max() + 1e-8)
fused_norm = np.clip(fused / (p995 + 1e-8), 0.0, 1.0)
gamma = 0.85
fused_norm = np.power(fused_norm, gamma)
fused_smooth = cv2.GaussianBlur(fused_norm.astype(np.float32), (7, 7), 0)
sens = float(np.clip(sensitivity, 0.0, 1.0))
q = 0.945 - (sens - 0.5) * 0.04
q = float(np.clip(q, 0.88, 0.97))
thr_score = float(np.quantile(fused_smooth, q))
change_mask = (fused_smooth >= thr_score).astype(np.uint8) * 255
change_mask = _clean_mask(change_mask, sensitivity=sens)
change_mask = cv2.bilateralFilter(change_mask, 9, 75, 75)
_, change_mask = cv2.threshold(change_mask, 127, 255, cv2.THRESH_BINARY)
debug = {
"method": "AI-Core",
"threshold_used": int(thr_score * 255),
"threshold_percentile_q": q,
"threshold_score": thr_score,
"fused_p95": float(np.quantile(fused_smooth, 0.95)),
"fused_p99": float(np.quantile(fused_smooth, 0.99)),
"fused_mean": float(np.mean(fused_smooth)),
"sensitivity": float(sensitivity),
"channel_weights": {
"color": round(weights[0], 4),
"ssim": round(weights[1], 4),
"texture": round(weights[2], 4),
"edge": round(weights[3], 4),
"cva": round(weights[4], 4),
},
}
return change_mask, debug
def ai_deep_learning_method(img1, img2, sensitivity=0.5):
"""
Uses the pre-trained AdaptFormer model when available; falls back to the
rule-based multi-channel fusion otherwise.
"""
from .model_inference import is_model_available, predict_change_mask
if is_model_available():
threshold = 0.35 + (1.0 - sensitivity) * 0.3
try:
change_mask, score_map = predict_change_mask(
img1, img2, threshold=threshold)
change_mask = _clean_mask(change_mask, sensitivity=sensitivity)
debug = {
"method": "AI-Based Deep Learning (AdaptFormer)",
"model": "adaptformer-levir-cd",
"threshold_used": int(threshold * 255),
"sensitivity": float(sensitivity),
}
return change_mask, debug
except Exception as e:
import logging
logging.getLogger(__name__).warning(
"AdaptFormer inference failed, falling back to rule-based: %s", e)
change_mask, core_debug = _ai_fusion_core(img1, img2, sensitivity=sensitivity)
debug = {
"method": "AI-Based Deep Learning (rule-based fallback)",
"threshold_used": core_debug.get("threshold_used"),
"sensitivity": float(sensitivity),
"core": core_debug,
}
return change_mask, debug
def hybrid_method(img1, img2, sensitivity=0.5):
"""Hybrid: weighted fusion of all methods with confidence-based merging."""
if img1.shape != img2.shape:
img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0]))
diff_mask, diff_debug = image_difference_method(img1, img2, sensitivity=sensitivity)
feature_mask = feature_based_method(img1, img2)
ai_mask, ai_debug = ai_deep_learning_method(img1, img2, sensitivity=sensitivity)
# Weighted combination: AI method gets most weight
combined = (
0.2 * diff_mask.astype(np.float32) +
0.3 * feature_mask.astype(np.float32) +
0.5 * ai_mask.astype(np.float32)
)
# Combined mask values:
# - diff only: 0.2*255 ≈ 51
# - feature only: 0.3*255 ≈ 76
# - ai only: 0.5*255 ≈ 127
# Keep threshold low enough that ai-only regions can pass.
base_thr = 98
sens = float(np.clip(sensitivity, 0.0, 1.0))
hybrid_thr = int(np.clip(base_thr + int((0.5 - sens) * 36), 60, 150))
_, final_mask = cv2.threshold(combined.astype(np.uint8), hybrid_thr, 255, cv2.THRESH_BINARY)
final_mask = _clean_mask(final_mask)
debug = {
"method": "Hybrid Approach",
"threshold_used": int(hybrid_thr),
"sensitivity": float(sensitivity),
"sub_methods": {
"image_difference": diff_debug,
"ai_deep_learning": ai_debug,
},
}
return final_mask, debug
# ---------------------------------------------------------------------------
# 11. Robust post-processing
# ---------------------------------------------------------------------------
def _clean_mask(mask, sensitivity=0.5, border_margin=12):
"""
Robust morphological cleaning:
1. Zero-out border pixels (registration artifacts)
2. Median filter to kill salt-and-pepper noise
3. Opening to remove small specks
4. Closing to bridge tiny gaps
5. Fill holes inside regions
6. Erode-then-dilate to break thin noise bridges
7. Connected-component area + circularity filtering
"""
mask = mask.copy()
h, w = mask.shape[:2]
if border_margin > 0:
mask[:border_margin, :] = 0
mask[-border_margin:, :] = 0
mask[:, :border_margin] = 0
mask[:, -border_margin:] = 0
mask = cv2.medianBlur(mask, 5)
open_size = max(3, int(5 * (1 - sensitivity * 0.5)))
if open_size % 2 == 0:
open_size += 1
k_open = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (open_size, open_size))
mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, k_open)
close_size = max(3, int(7 * (1 - sensitivity)))
if close_size % 2 == 0:
close_size += 1
k_close = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (close_size, close_size))
mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, k_close)
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
filled = np.zeros_like(mask)
cv2.drawContours(filled, contours, -1, 255, thickness=cv2.FILLED)
k_break = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))
filled = cv2.erode(filled, k_break, iterations=1)
filled = cv2.dilate(filled, k_break, iterations=1)
# 7. Component-level filtering: remove tiny survivors and elongated noise
min_component_px = max(80, int(h * w * 0.00004))
num_labels, labels, stats, _ = cv2.connectedComponentsWithStats(filled, connectivity=8)
clean = np.zeros_like(filled)
for i in range(1, num_labels):
area = stats[i, cv2.CC_STAT_AREA]
if area < min_component_px:
continue
cw = stats[i, cv2.CC_STAT_WIDTH]
ch = stats[i, cv2.CC_STAT_HEIGHT]
bbox_area = max(cw * ch, 1)
perimeter_approx = 2 * (cw + ch)
# Circularity: thin elongated noise has very high perimeter^2/area
circularity = (perimeter_approx ** 2) / (bbox_area + 1e-8)
if circularity > 80 and area < min_component_px * 3:
continue
clean[labels == i] = 255
return clean
# ---------------------------------------------------------------------------
# 12. Severity classification and improved visualization
# ---------------------------------------------------------------------------
def _severity_from_region(region, total_pixels):
"""
Type-aware severity classification.
Building/structural changes use area + confidence.
Vegetation changes weight confidence (NDVI delta) more heavily.
"""
area = region.get("area", 0)
confidence = region.get("confidence", 0.0)
obj_type = region.get("object_type", "")
if total_pixels <= 0:
return "minor"
area_ratio = area / total_pixels
if obj_type in _VEGETATION_TYPES or "Vegetation" in (obj_type or ""):
score = area_ratio * 600 + confidence * 0.6
if score < 0.8:
return "minor"
if score < 3.0:
return "moderate"
return "major"
if obj_type in _STRUCTURAL_TYPES or obj_type in _BUILDING_TYPES:
score = area_ratio * 1200 + confidence * 0.4
if score < 1.2:
return "minor"
if score < 4.5:
return "moderate"
return "major"
score = area_ratio * 1000 + confidence * 0.3
if score < 1.0:
return "minor"
if score < 4.0:
return "moderate"
return "major"
# RGB colors for severity — high-contrast, colorblind-friendly palette
_SEVERITY_COLORS = {
"minor": (50, 205, 50), # Lime green
"moderate": (255, 165, 0), # Orange
"major": (255, 50, 50), # Bright red
}
# Maximum bounding boxes drawn on the image to avoid visual clutter
_MAX_VISIBLE_BOXES = 30
def visualize_changes(img1, img2, change_mask, regions=None, total_pixels=None):
"""
Clean visualization: subtle tinted overlay for changed pixels,
color-coded contour outlines (not filled boxes) for the top regions,
and compact numbered labels.
"""
if img1.shape != img2.shape:
img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0]))
if change_mask.shape[:2] != img2.shape[:2]:
change_mask = cv2.resize(change_mask, (img2.shape[1], img2.shape[0]))
overlay = img2.copy().astype(np.float32)
mask_bool = change_mask > 127
mask_float = mask_bool.astype(np.float32)
# Subtle warm tint on changed pixels (18% alpha) — enough to see, not enough to hide
tint = np.zeros_like(img2, dtype=np.float32)
tint[:, :, 0] = 255
tint[:, :, 1] = 80
alpha = 0.18
for c in range(3):
overlay[:, :, c] = (overlay[:, :, c] * (1 - mask_float * alpha)
+ tint[:, :, c] * mask_float * alpha)
overlay_uint8 = np.clip(overlay, 0, 255).astype(np.uint8)
total_px = total_pixels if total_pixels is not None else (img2.shape[0] * img2.shape[1])
if regions:
diag = np.sqrt(img2.shape[0]**2 + img2.shape[1]**2)
line_thickness = max(1, int(diag / 1100))
visible = regions[:_MAX_VISIBLE_BOXES]
for r in visible:
x, y, w, h = r["bbox"]
severity = r.get("severity") or _severity_from_region(r, total_px)
color = _SEVERITY_COLORS.get(severity, (255, 255, 255))
# Draw only the outline — no fill, keeps the image readable
cv2.rectangle(overlay_uint8, (x, y), (x + w, y + h), color, line_thickness)
# Compact label: region number in a small pill
rid = r.get("id", 0)
label = str(rid)
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = max(0.32, min(0.48, w / 200))
txt_thick = 1
(tw, th), _ = cv2.getTextSize(label, font, font_scale, txt_thick)
lx = x
ly = max(th + 3, y - 3)
# Dark background pill for contrast on any terrain
cv2.rectangle(overlay_uint8,
(lx, ly - th - 2), (lx + tw + 5, ly + 1),
(30, 30, 30), cv2.FILLED)
cv2.putText(overlay_uint8, label, (lx + 2, ly - 1),
font, font_scale, color, txt_thick, cv2.LINE_AA)
return overlay_uint8
# ---------------------------------------------------------------------------
# 13. Improved object classification
# ---------------------------------------------------------------------------
def extract_advanced_features(region):
"""Extract rich features for classification: color, texture, edge, shape."""
if region.size == 0 or region.shape[0] < 3 or region.shape[1] < 3:
return None
hsv = cv2.cvtColor(region, cv2.COLOR_RGB2HSV)
lab = cv2.cvtColor(region, cv2.COLOR_RGB2LAB)
gray = cv2.cvtColor(region, cv2.COLOR_RGB2GRAY).astype(np.float32)
# Color stats
mean_rgb = np.mean(region, axis=(0, 1))
std_rgb = np.std(region, axis=(0, 1))
mean_hsv = np.mean(hsv, axis=(0, 1))
mean_lab = np.mean(lab, axis=(0, 1))
total_rgb = np.sum(mean_rgb) + 1e-6
green_ratio = mean_rgb[1] / total_rgb
blue_ratio = mean_rgb[2] / total_rgb
red_ratio = mean_rgb[0] / total_rgb
# Vegetation indices
ndvi = (mean_rgb[1] - mean_rgb[0]) / (mean_rgb[1] + mean_rgb[0] + 1e-6)
# Texture
texture_std = float(np.std(gray))
lbp = compute_lbp(gray.astype(np.float32))
lbp_variance = float(np.var(lbp))
# Edges
grad_x = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
grad_y = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3)
edge_mag = np.sqrt(grad_x ** 2 + grad_y ** 2)
edge_density = float(np.mean(edge_mag))
# Edge orientation histogram (structural regularity)
angles = np.arctan2(grad_y, grad_x + 1e-8)
angle_hist, _ = np.histogram(angles, bins=8, range=(-np.pi, np.pi))
angle_hist = angle_hist / (angle_hist.sum() + 1e-8)
orientation_entropy = -np.sum(angle_hist[angle_hist > 0] * np.log2(angle_hist[angle_hist > 0] + 1e-10))
# GLCM-like contrast (simplified: variance of neighbors)
shifted_r = np.roll(gray, 1, axis=1)
shifted_d = np.roll(gray, 1, axis=0)
glcm_contrast = float(np.mean((gray - shifted_r) ** 2 + (gray - shifted_d) ** 2))
return {
"mean_rgb": mean_rgb, "std_rgb": std_rgb, "mean_hsv": mean_hsv, "mean_lab": mean_lab,
"ndvi": ndvi, "texture_std": texture_std, "lbp_variance": lbp_variance,
"edge_density": edge_density, "orientation_entropy": orientation_entropy,
"glcm_contrast": glcm_contrast,
"color_homogeneity": float(np.mean(std_rgb)),
"brightness": float(mean_lab[0]),
"green_ratio": green_ratio, "blue_ratio": blue_ratio, "red_ratio": red_ratio,
"saturation": float(mean_hsv[1]), "hue": float(mean_hsv[0]),
}
def _is_transient_object(area, w, h, features):
"""
Filter out transient objects (people, cars, animals, shadows, etc.)
that are NOT permanent ground/structural changes.
Returns True if the region is likely transient and should be excluded.
"""
aspect_ratio = max(w, h) / max(min(w, h), 1)
# Very small regions are likely noise, people, or small vehicles
if area < 300:
return True
# Tall narrow regions (aspect > 4) are likely people or poles
if aspect_ratio > 5.0 and area < 2000:
return True
# Very high edge density + small area = likely a person or vehicle
if features["edge_density"] > 80 and area < 1500:
return True
# Extremely high texture variance in small area = likely transient clutter
if features["texture_std"] > 60 and area < 1000:
return True
return False
def _count_line_segments(gray_crop):
"""Count straight line segments using LSD — buildings have many, vegetation has few."""
if gray_crop.size == 0 or gray_crop.shape[0] < 5 or gray_crop.shape[1] < 5:
return 0, 0.0
lsd = cv2.createLineSegmentDetector(0)
lines, _, _, _ = lsd.detect(gray_crop.astype(np.uint8))
if lines is None:
return 0, 0.0
n_lines = len(lines)
total_length = sum(
np.sqrt((l[0][2] - l[0][0])**2 + (l[0][3] - l[0][1])**2)
for l in lines
)
return n_lines, float(total_length)
def _count_corners(gray_crop):
"""Count strong corners — buildings have clustered grid-like corners."""
if gray_crop.size == 0 or gray_crop.shape[0] < 5 or gray_crop.shape[1] < 5:
return 0
corners = cv2.goodFeaturesToTrack(
gray_crop.astype(np.uint8), maxCorners=100,
qualityLevel=0.05, minDistance=5)
return 0 if corners is None else len(corners)
def _rectangular_hull_ratio(gray_crop, threshold=128):
"""Ratio of non-zero area to bounding rect — buildings fill their box."""
if gray_crop.size == 0:
return 0.0
_, binary = cv2.threshold(gray_crop.astype(np.uint8), threshold, 255, cv2.THRESH_BINARY)
contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if not contours:
return 0.0
biggest = max(contours, key=cv2.contourArea)
contour_area = cv2.contourArea(biggest)
_, _, rw, rh = cv2.boundingRect(biggest)
rect_area = max(rw * rh, 1)
return contour_area / rect_area
def _extract_differential_features(before_crop, after_crop):
"""Extract features from BOTH before and after crops plus their deltas."""
feat_b = extract_advanced_features(before_crop)
feat_a = extract_advanced_features(after_crop)
if feat_b is None or feat_a is None:
return None
gray_b = cv2.cvtColor(before_crop, cv2.COLOR_RGB2GRAY)
gray_a = cv2.cvtColor(after_crop, cv2.COLOR_RGB2GRAY)
lines_b, linelen_b = _count_line_segments(gray_b)
lines_a, linelen_a = _count_line_segments(gray_a)
corners_b = _count_corners(gray_b)
corners_a = _count_corners(gray_a)
hull_a = _rectangular_hull_ratio(gray_a)
lab_b = cv2.cvtColor(before_crop, cv2.COLOR_RGB2LAB).astype(np.float32)
lab_a = cv2.cvtColor(after_crop, cv2.COLOR_RGB2LAB).astype(np.float32)
lab_dist = float(np.mean(np.sqrt(np.sum((lab_a - lab_b) ** 2, axis=2))))
return {
"before": feat_b, "after": feat_a,
"delta_ndvi": feat_a["ndvi"] - feat_b["ndvi"],
"delta_green_ratio": feat_a["green_ratio"] - feat_b["green_ratio"],
"delta_edge_density": feat_a["edge_density"] - feat_b["edge_density"],
"delta_brightness": feat_a["brightness"] - feat_b["brightness"],
"delta_texture_std": feat_a["texture_std"] - feat_b["texture_std"],
"delta_saturation": feat_a["saturation"] - feat_b["saturation"],
"delta_orientation_entropy": feat_a["orientation_entropy"] - feat_b["orientation_entropy"],
"delta_lines": lines_a - lines_b,
"delta_line_length": linelen_a - linelen_b,
"delta_corners": corners_a - corners_b,
"lines_after": lines_a, "corners_after": corners_a,
"lines_before": lines_b, "corners_before": corners_b,
"hull_ratio_after": hull_a,
"lab_color_distance": lab_dist,
}
def classify_object_type(image_region, bbox, before_region=None):
"""
Classify the type of change in a region.
When before_region is provided, uses differential (before vs after) analysis
for dramatically better accuracy. Falls back to single-image analysis otherwise.
"""
x, y, w, h = bbox
pad = 5
y1 = max(0, y - pad)
y2 = min(image_region.shape[0], y + h + pad)
x1 = max(0, x - pad)
x2 = min(image_region.shape[1], x + w + pad)
after_crop = image_region[y1:y2, x1:x2]
if after_crop.size == 0 or after_crop.shape[0] < 3 or after_crop.shape[1] < 3:
return "Unclassified", 0.0
feat_a = extract_advanced_features(after_crop)
if feat_a is None:
return "Unclassified", 0.0
area = w * h
if _is_transient_object(area, w, h, feat_a):
return None, 0.0
aspect_ratio = max(w, h) / max(min(w, h), 1)
compactness = (4 * np.pi * area) / ((2 * (w + h)) ** 2 + 1e-6)
# --- Differential classification when before image is available ---
diff = None
if before_region is not None:
by1 = max(0, y - pad)
by2 = min(before_region.shape[0], y + h + pad)
bx1 = max(0, x - pad)
bx2 = min(before_region.shape[1], x + w + pad)
before_crop = before_region[by1:by2, bx1:bx2]
if before_crop.size > 0 and before_crop.shape[0] >= 3 and before_crop.shape[1] >= 3:
diff = _extract_differential_features(before_crop, after_crop)
scores = {}
# ---- Water Body Change ----
water = 0.0
if feat_a["blue_ratio"] > 0.36:
water += 0.22
if feat_a["texture_std"] < 28:
water += 0.18
if feat_a["edge_density"] < 35:
water += 0.14
if 90 <= feat_a["hue"] <= 135:
water += 0.18
if feat_a["lbp_variance"] < 0.05:
water += 0.14
if feat_a["glcm_contrast"] < 500:
water += 0.10
if area > 800:
water += 0.04
scores["Water Body Change"] = water
# ---- Vegetation Change ----
veg = 0.0
if diff:
# Differential: detect actual vegetation gain or loss
if abs(diff["delta_ndvi"]) > 0.08:
veg += 0.30
if abs(diff["delta_green_ratio"]) > 0.04:
veg += 0.20
if diff["lab_color_distance"] > 15 and (
diff["before"]["ndvi"] > 0.05 or diff["after"]["ndvi"] > 0.05):
veg += 0.15
if abs(diff["delta_saturation"]) > 15 and (
diff["before"]["green_ratio"] > 0.34 or diff["after"]["green_ratio"] > 0.34):
veg += 0.15
if diff["delta_lines"] < 3 and diff["delta_corners"] < 5:
veg += 0.08
if area > 500:
veg += 0.04
else:
if feat_a["ndvi"] > 0.05:
veg += 0.22
if feat_a["ndvi"] > 0.15:
veg += 0.10
if feat_a["green_ratio"] > 0.36:
veg += 0.18
if 35 <= feat_a["hue"] <= 85:
veg += 0.15
if feat_a["saturation"] > 40:
veg += 0.10
if feat_a["orientation_entropy"] > 2.5:
veg += 0.05
if area > 500:
veg += 0.04
scores["Vegetation Change"] = veg
# ---- New Construction/Building ----
bld = 0.0
if diff:
if diff["delta_edge_density"] > 15:
bld += 0.20
if diff["delta_orientation_entropy"] < -0.4:
bld += 0.15
if diff["delta_lines"] > 5:
bld += 0.15
if diff["delta_corners"] > 8:
bld += 0.12
if diff["after"]["ndvi"] < 0.05 and diff["before"]["ndvi"] > 0.03:
bld += 0.12
if diff["hull_ratio_after"] > 0.55:
bld += 0.10
if 1.0 <= aspect_ratio <= 4.0:
bld += 0.08
if area > 1000:
bld += 0.05
else:
if feat_a["orientation_entropy"] < 2.5:
bld += 0.18
if feat_a["color_homogeneity"] < 28:
bld += 0.15
if 1.0 <= aspect_ratio <= 4.0:
bld += 0.12
if 0.3 <= compactness <= 0.9:
bld += 0.10
if feat_a["edge_density"] > 30:
bld += 0.12
if feat_a["glcm_contrast"] > 400:
bld += 0.10
if feat_a["saturation"] < 90:
bld += 0.10
if 40 <= feat_a["brightness"] <= 90:
bld += 0.08
if area > 1000:
bld += 0.05
scores["New Construction/Building"] = bld
# ---- Demolition/Clearing ----
demo = 0.0
if diff:
if diff["delta_edge_density"] < -15:
demo += 0.22
if diff["delta_lines"] < -5:
demo += 0.18
if diff["delta_corners"] < -8:
demo += 0.15
if diff["delta_texture_std"] > 8:
demo += 0.12
if diff["delta_brightness"] > 10:
demo += 0.12
if diff["after"]["ndvi"] > 0.03 and diff["before"]["ndvi"] < 0.02:
demo += 0.08
if area > 800:
demo += 0.05
else:
if feat_a["texture_std"] > 30:
demo += 0.18
if feat_a["orientation_entropy"] > 2.8:
demo += 0.15
if feat_a["color_homogeneity"] > 25:
demo += 0.15
if feat_a["brightness"] > 60:
demo += 0.10
if feat_a["ndvi"] < 0.05:
demo += 0.12
if feat_a["saturation"] < 70:
demo += 0.10
if area > 800:
demo += 0.05
scores["Demolition/Clearing"] = demo
# ---- Road/Pavement Change ----
road = 0.0
if aspect_ratio > 2.5:
road += 0.22
if feat_a["color_homogeneity"] < 22:
road += 0.18
if feat_a["texture_std"] < 32:
road += 0.15
if feat_a["saturation"] < 65:
road += 0.12
if feat_a["orientation_entropy"] < 2.0:
road += 0.15
if 35 <= feat_a["brightness"] <= 75:
road += 0.10
if compactness < 0.3:
road += 0.05
if area > 600:
road += 0.03
scores["Road/Pavement Change"] = road
# ---- Bare Land/Soil Change ----
soil = 0.0
if feat_a["red_ratio"] > 0.34 and feat_a["green_ratio"] < 0.36:
soil += 0.20
if 8 <= feat_a["hue"] <= 38:
soil += 0.18
if feat_a["ndvi"] < 0.05:
soil += 0.18
if feat_a["texture_std"] < 35:
soil += 0.12
if feat_a["lbp_variance"] < 0.04:
soil += 0.12
if 40 <= feat_a["saturation"] <= 130:
soil += 0.10
if 45 <= feat_a["brightness"] <= 82:
soil += 0.10
scores["Bare Land/Soil Change"] = soil
best = max(scores, key=scores.get)
conf = scores[best]
if conf < 0.30:
return "Unclassified", conf
return best, min(conf, 1.0)
def classify_with_ensemble(image_region, bbox, before_region=None):
"""Ensemble: classify full region + sub-regions, vote with confidence weighting."""
x, y, w, h = bbox
sub_boxes = [(x, y, w, h)]
if w > 20 and h > 20:
hw, hh = w // 2, h // 2
sub_boxes += [
(x, y, hw, hh),
(x + hw, y, hw, hh),
(x, y + hh, hw, hh),
(x + hw, y + hh, hw, hh),
(x + w // 4, y + h // 4, hw, hh),
]
classifications = []
confidences = []
transient_count = 0
for sb in sub_boxes:
obj_type, conf = classify_object_type(image_region, sb,
before_region=before_region)
if obj_type is None:
transient_count += 1
continue
if obj_type != "Unclassified":
classifications.append(obj_type)
confidences.append(conf)
if transient_count > len(sub_boxes) // 2:
return None, 0.0
if not classifications:
return classify_object_type(image_region, (x, y, w, h),
before_region=before_region)
weighted = {}
counts = Counter(classifications)
for ot, c in zip(classifications, confidences):
weighted[ot] = weighted.get(ot, 0) + c
best_type = max(weighted, key=weighted.get)
avg_conf = weighted[best_type] / counts[best_type]
if counts[best_type] / len(classifications) >= 0.6:
avg_conf = min(1.0, avg_conf * 1.15)
return best_type, avg_conf
# ---------------------------------------------------------------------------
# 14. Vegetation sub-classification
# ---------------------------------------------------------------------------
_VEGETATION_TYPES = {"Vegetation Change"}
def _compute_region_greenness(crop):
"""Return (ndvi, green_ratio, mean_saturation) for an RGB crop."""
if crop.size == 0 or crop.shape[0] < 2 or crop.shape[1] < 2:
return 0.0, 0.0, 0.0
mean_rgb = np.mean(crop, axis=(0, 1)).astype(np.float64)
total = np.sum(mean_rgb) + 1e-6
green_ratio = mean_rgb[1] / total
ndvi = (mean_rgb[1] - mean_rgb[0]) / (mean_rgb[1] + mean_rgb[0] + 1e-6)
hsv = cv2.cvtColor(crop, cv2.COLOR_RGB2HSV)
mean_sat = float(np.mean(hsv[:, :, 1]))
return float(ndvi), float(green_ratio), mean_sat
def _compute_texture_regularity(gray_crop):
"""Measure how regular/grid-like the texture is (low entropy = regular crops)."""
if gray_crop.size == 0 or gray_crop.shape[0] < 3 or gray_crop.shape[1] < 3:
return 3.0
gx = cv2.Sobel(gray_crop.astype(np.float32), cv2.CV_64F, 1, 0, ksize=3)
gy = cv2.Sobel(gray_crop.astype(np.float32), cv2.CV_64F, 0, 1, ksize=3)
angles = np.arctan2(gy, gx + 1e-8)
hist, _ = np.histogram(angles, bins=8, range=(-np.pi, np.pi))
hist = hist / (hist.sum() + 1e-8)
entropy = -np.sum(hist[hist > 0] * np.log2(hist[hist > 0] + 1e-10))
return float(entropy)
def classify_vegetation_subtype(before_img, after_img, bbox):
"""
Compare before/after crops to determine vegetation change sub-type.
Returns (subtype_name, confidence).
"""
x, y, w, h = bbox
pad = 5
y1, y2 = max(0, y - pad), min(before_img.shape[0], y + h + pad)
x1, x2 = max(0, x - pad), min(before_img.shape[1], x + w + pad)
before_crop = before_img[y1:y2, x1:x2]
after_crop = after_img[y1:y2, x1:x2]
if before_crop.size == 0 or after_crop.size == 0:
return "Vegetation Change", 0.3
ndvi_b, green_b, sat_b = _compute_region_greenness(before_crop)
ndvi_a, green_a, sat_a = _compute_region_greenness(after_crop)
gray_b = cv2.cvtColor(before_crop, cv2.COLOR_RGB2GRAY)
gray_a = cv2.cvtColor(after_crop, cv2.COLOR_RGB2GRAY)
tex_entropy_b = _compute_texture_regularity(gray_b)
tex_entropy_a = _compute_texture_regularity(gray_a)
brightness_b = float(np.mean(gray_b))
brightness_a = float(np.mean(gray_a))
ndvi_delta = ndvi_a - ndvi_b
green_delta = green_a - green_b
sat_delta = sat_a - sat_b
scores = {
"Deforestation/Tree Removal": 0.0,
"New Vegetation/Growth": 0.0,
"Crop/Agricultural Change": 0.0,
"Vegetation Health Decline": 0.0,
"Seasonal Variation": 0.0,
}
# --- Deforestation: was green, now not green ---
if ndvi_b > 0.08 and ndvi_delta < -0.06:
scores["Deforestation/Tree Removal"] += 0.30
if green_b > 0.36 and green_delta < -0.03:
scores["Deforestation/Tree Removal"] += 0.20
if brightness_a > brightness_b + 10:
scores["Deforestation/Tree Removal"] += 0.15
if sat_delta < -15:
scores["Deforestation/Tree Removal"] += 0.15
if tex_entropy_a < tex_entropy_b - 0.3:
scores["Deforestation/Tree Removal"] += 0.10
# --- New Vegetation/Growth: was bare, now green ---
if ndvi_a > 0.08 and ndvi_delta > 0.06:
scores["New Vegetation/Growth"] += 0.30
if green_a > 0.36 and green_delta > 0.03:
scores["New Vegetation/Growth"] += 0.20
if sat_delta > 15:
scores["New Vegetation/Growth"] += 0.15
if brightness_a < brightness_b - 5:
scores["New Vegetation/Growth"] += 0.10
if tex_entropy_a > tex_entropy_b + 0.2:
scores["New Vegetation/Growth"] += 0.10
# --- Crop/Agricultural Change: regular texture patterns, moderate color shift ---
is_regular = tex_entropy_b < 2.5 or tex_entropy_a < 2.5
if is_regular:
scores["Crop/Agricultural Change"] += 0.25
if 0.03 < abs(ndvi_delta) < 0.12:
scores["Crop/Agricultural Change"] += 0.20
if sat_b > 35 and sat_a > 35:
scores["Crop/Agricultural Change"] += 0.15
if abs(green_delta) < 0.04 and abs(ndvi_delta) > 0.02:
scores["Crop/Agricultural Change"] += 0.15
area = w * h
if area > 3000:
scores["Crop/Agricultural Change"] += 0.10
# --- Vegetation Health Decline: still green but browning ---
if ndvi_b > 0.05 and ndvi_a > 0.02 and ndvi_delta < -0.03:
scores["Vegetation Health Decline"] += 0.25
if green_b > 0.34 and green_a > 0.30 and green_delta < -0.02:
scores["Vegetation Health Decline"] += 0.20
if -20 < sat_delta < -3:
scores["Vegetation Health Decline"] += 0.20
if abs(brightness_a - brightness_b) < 15:
scores["Vegetation Health Decline"] += 0.10
# --- Seasonal Variation: mild shift in color/texture, both sides green ---
if ndvi_b > 0.04 and ndvi_a > 0.04 and abs(ndvi_delta) < 0.05:
scores["Seasonal Variation"] += 0.25
if abs(green_delta) < 0.03:
scores["Seasonal Variation"] += 0.20
if abs(sat_delta) < 12:
scores["Seasonal Variation"] += 0.15
if abs(brightness_a - brightness_b) < 12:
scores["Seasonal Variation"] += 0.15
best = max(scores, key=scores.get)
conf = scores[best]
if conf < 0.25:
return "Vegetation Change", 0.3
return best, min(conf, 1.0)
# ---------------------------------------------------------------------------
# 15. Structural change sub-classification
# ---------------------------------------------------------------------------
_STRUCTURAL_TYPES = {"New Construction/Building", "Demolition/Clearing",
"Road/Pavement Change"}
def _region_has_structure(crop):
"""Heuristic: does this crop contain building-like structure (edges + regularity)?"""
if crop.size == 0 or crop.shape[0] < 3 or crop.shape[1] < 3:
return False, 0.0, 0.0
gray = cv2.cvtColor(crop, cv2.COLOR_RGB2GRAY).astype(np.float32)
gx = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
gy = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3)
edge_density = float(np.mean(np.sqrt(gx**2 + gy**2)))
angles = np.arctan2(gy, gx + 1e-8)
hist, _ = np.histogram(angles, bins=8, range=(-np.pi, np.pi))
hist = hist / (hist.sum() + 1e-8)
entropy = -np.sum(hist[hist > 0] * np.log2(hist[hist > 0] + 1e-10))
has_structure = edge_density > 25 and entropy < 2.8
return has_structure, edge_density, entropy
def classify_structural_subtype(before_img, after_img, bbox, main_type):
"""
Compare before/after crops to determine structural change sub-type.
Returns (subtype_name, confidence).
"""
x, y, w, h = bbox
pad = 5
y1, y2 = max(0, y - pad), min(before_img.shape[0], y + h + pad)
x1, x2 = max(0, x - pad), min(before_img.shape[1], x + w + pad)
before_crop = before_img[y1:y2, x1:x2]
after_crop = after_img[y1:y2, x1:x2]
if before_crop.size == 0 or after_crop.size == 0:
return main_type, 0.3
struct_b, edge_b, ent_b = _region_has_structure(before_crop)
struct_a, edge_a, ent_a = _region_has_structure(after_crop)
gray_b = cv2.cvtColor(before_crop, cv2.COLOR_RGB2GRAY)
gray_a = cv2.cvtColor(after_crop, cv2.COLOR_RGB2GRAY)
brightness_b = float(np.mean(gray_b))
brightness_a = float(np.mean(gray_a))
texture_b = float(np.std(gray_b))
texture_a = float(np.std(gray_a))
hsv_b = cv2.cvtColor(before_crop, cv2.COLOR_RGB2HSV)
hsv_a = cv2.cvtColor(after_crop, cv2.COLOR_RGB2HSV)
sat_b = float(np.mean(hsv_b[:, :, 1]))
sat_a = float(np.mean(hsv_a[:, :, 1]))
# Check greenness to detect cleared-to-green or green-to-built transitions
mean_rgb_b = np.mean(before_crop, axis=(0, 1))
mean_rgb_a = np.mean(after_crop, axis=(0, 1))
ndvi_b = (mean_rgb_b[1] - mean_rgb_b[0]) / (mean_rgb_b[1] + mean_rgb_b[0] + 1e-6)
ndvi_a = (mean_rgb_a[1] - mean_rgb_a[0]) / (mean_rgb_a[1] + mean_rgb_a[0] + 1e-6)
area = w * h
if main_type == "Road/Pavement Change":
return _classify_road_subtype(
struct_b, struct_a, edge_b, edge_a, brightness_b, brightness_a,
texture_b, texture_a, area, w, h
)
scores = {
"New Building": 0.0,
"Building Expansion": 0.0,
"Renovation/Modification": 0.0,
"Partial Demolition": 0.0,
"Full Demolition": 0.0,
"Infrastructure Change": 0.0,
}
# --- New Building: before had no structure, after does ---
if not struct_b and struct_a:
scores["New Building"] += 0.35
if edge_a > edge_b + 15:
scores["New Building"] += 0.15
if ent_a < ent_b - 0.3:
scores["New Building"] += 0.10
if ndvi_b > 0.05 and ndvi_a < 0.03:
scores["New Building"] += 0.10
if sat_a < sat_b - 10:
scores["New Building"] += 0.10
# --- Building Expansion: both have structure but after has more ---
if struct_b and struct_a:
scores["Building Expansion"] += 0.15
if struct_b and edge_a > edge_b * 1.2:
scores["Building Expansion"] += 0.20
if struct_b and texture_a > texture_b + 5:
scores["Building Expansion"] += 0.15
if abs(ent_a - ent_b) < 0.5 and edge_a > edge_b:
scores["Building Expansion"] += 0.15
# --- Renovation/Modification: both have structure, similar density but different appearance ---
if struct_b and struct_a:
scores["Renovation/Modification"] += 0.15
if abs(edge_a - edge_b) < 12:
scores["Renovation/Modification"] += 0.15
if abs(brightness_a - brightness_b) > 8:
scores["Renovation/Modification"] += 0.20
if abs(sat_a - sat_b) > 10:
scores["Renovation/Modification"] += 0.15
if abs(texture_a - texture_b) < 10:
scores["Renovation/Modification"] += 0.10
# --- Partial Demolition: before had structure, after has less ---
if struct_b and edge_a < edge_b * 0.7:
scores["Partial Demolition"] += 0.25
if struct_b and ent_a > ent_b + 0.3:
scores["Partial Demolition"] += 0.15
if texture_a > texture_b + 8:
scores["Partial Demolition"] += 0.15
if brightness_a > brightness_b + 10:
scores["Partial Demolition"] += 0.10
# --- Full Demolition: before had structure, after is bare/empty ---
if struct_b and not struct_a:
scores["Full Demolition"] += 0.35
if edge_b > 30 and edge_a < 20:
scores["Full Demolition"] += 0.15
if texture_b > 25 and texture_a < 20:
scores["Full Demolition"] += 0.15
if brightness_a > brightness_b + 15:
scores["Full Demolition"] += 0.10
# --- Infrastructure Change: elongated shape, high edge regularity ---
aspect = max(w, h) / max(min(w, h), 1)
if aspect > 3.0:
scores["Infrastructure Change"] += 0.25
if ent_a < 2.0 or ent_b < 2.0:
scores["Infrastructure Change"] += 0.15
if area > 2000 and aspect > 2.5:
scores["Infrastructure Change"] += 0.15
best = max(scores, key=scores.get)
conf = scores[best]
if conf < 0.25:
return main_type, 0.3
return best, min(conf, 1.0)
def _classify_road_subtype(struct_b, struct_a, edge_b, edge_a,
brightness_b, brightness_a, texture_b, texture_a,
area, w, h):
"""Sub-classify road/pavement changes."""
scores = {
"New Road/Pavement": 0.0,
"Road Widening": 0.0,
"Road Resurfacing": 0.0,
"Road Deterioration": 0.0,
}
if not struct_b and struct_a:
scores["New Road/Pavement"] += 0.30
if edge_a > edge_b + 10:
scores["New Road/Pavement"] += 0.20
if brightness_a < brightness_b:
scores["New Road/Pavement"] += 0.15
if struct_b and struct_a and edge_a > edge_b * 1.15:
scores["Road Widening"] += 0.30
if area > 2000:
scores["Road Widening"] += 0.15
if struct_b and struct_a and abs(edge_a - edge_b) < 10:
scores["Road Resurfacing"] += 0.20
if abs(brightness_a - brightness_b) > 12:
scores["Road Resurfacing"] += 0.25
if abs(texture_a - texture_b) < 8:
scores["Road Resurfacing"] += 0.15
if texture_a > texture_b + 10:
scores["Road Deterioration"] += 0.25
if edge_a < edge_b - 5:
scores["Road Deterioration"] += 0.20
if brightness_a > brightness_b + 8:
scores["Road Deterioration"] += 0.15
best = max(scores, key=scores.get)
conf = scores[best]
if conf < 0.25:
return "Road/Pavement Change", 0.3
return best, min(conf, 1.0)
# ---------------------------------------------------------------------------
# 16. 3D Building Analysis — height estimation + construction stage
# ---------------------------------------------------------------------------
_BUILDING_TYPES = {"New Construction/Building", "Demolition/Clearing"}
_STORY_HEIGHT_M = 3.0 # assumed metres per story
def _detect_shadow_region(before_gray, after_gray, bbox, expand=0.6):
"""
Find new shadow pixels adjacent to a building bbox.
Returns a binary mask of likely shadow pixels in the expanded bbox area.
"""
x, y, w, h = bbox
img_h, img_w = after_gray.shape[:2]
# Expand bbox to capture shadows cast beside the building
ex = int(w * expand)
ey = int(h * expand)
x1 = max(0, x - ex)
y1 = max(0, y - ey)
x2 = min(img_w, x + w + ex)
y2 = min(img_h, y + h + ey)
before_crop = before_gray[y1:y2, x1:x2].astype(np.float32)
after_crop = after_gray[y1:y2, x1:x2].astype(np.float32)
if before_crop.size == 0 or after_crop.size == 0:
return None, 0
# New shadow = pixels that got significantly darker in the after image
darkening = before_crop - after_crop
dark_thresh = max(25, np.std(darkening) * 1.5)
shadow_mask = (darkening > dark_thresh).astype(np.uint8) * 255
# Remove shadow pixels inside the building footprint itself
bx1, by1 = x - x1, y - y1
bx2, by2 = bx1 + w, by1 + h
bx1, by1 = max(0, bx1), max(0, by1)
bx2 = min(shadow_mask.shape[1], bx2)
by2 = min(shadow_mask.shape[0], by2)
shadow_mask[by1:by2, bx1:bx2] = 0
# Clean noise
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))
shadow_mask = cv2.morphologyEx(shadow_mask, cv2.MORPH_OPEN, kernel)
shadow_pixels = np.sum(shadow_mask > 0)
return shadow_mask, shadow_pixels
def estimate_building_height(before_img, after_img, bbox, features):
"""
Estimate building stories and height from shadow length and footprint geometry.
Returns (estimated_stories, estimated_height_m).
"""
before_gray = cv2.cvtColor(before_img, cv2.COLOR_RGB2GRAY)
after_gray = cv2.cvtColor(after_img, cv2.COLOR_RGB2GRAY)
x, y, w, h = bbox
shadow_mask, shadow_px = _detect_shadow_region(before_gray, after_gray, bbox)
short_side = max(min(w, h), 1)
footprint_area = w * h
# --- Shadow-based estimate ---
shadow_ratio = 0.0
if shadow_mask is not None and shadow_px > 20:
# Measure max extent of shadow perpendicular to building edge
coords = np.column_stack(np.where(shadow_mask > 0))
if len(coords) > 5:
# Shadow length = extent along the longer axis of shadow cluster
spread_y = coords[:, 0].max() - coords[:, 0].min()
spread_x = coords[:, 1].max() - coords[:, 1].min()
shadow_length = max(spread_y, spread_x)
shadow_ratio = shadow_length / short_side
# --- Footprint-based estimate ---
aspect = max(w, h) / max(short_side, 1)
# Compact footprints (aspect < 2.5) tend to be multi-story; elongated are single-story
footprint_factor = 1.0
if aspect > 3.0:
footprint_factor = 0.5 # likely single-story warehouse/industrial
elif aspect < 1.5 and footprint_area > 2000:
footprint_factor = 1.3 # compact large footprint = likely taller
# --- Texture regularity bonus ---
# Buildings with low orientation entropy (regular structure) tend to be taller
regularity_bonus = 0.0
if features and features.get("orientation_entropy", 3.0) < 2.2:
regularity_bonus = 0.5
# --- Combine signals ---
# Base: shadow ratio maps ~0.3-0.5 per story in typical nadir imagery
if shadow_ratio > 0.1:
raw_stories = shadow_ratio / 0.35
else:
# No clear shadow: use footprint area as rough proxy
if footprint_area > 5000:
raw_stories = 3.0
elif footprint_area > 2000:
raw_stories = 2.0
else:
raw_stories = 1.0
raw_stories = raw_stories * footprint_factor + regularity_bonus
stories = max(1, min(50, int(round(raw_stories))))
height_m = round(stories * _STORY_HEIGHT_M, 1)
return stories, height_m
def classify_construction_stage(features, bbox):
"""
Classify construction stage from visual features.
Returns (stage_name, confidence).
"""
if features is None:
return "Unknown", 0.0
w, h = bbox[2], bbox[3]
area = w * h
scores = {
"Foundation": 0.0,
"Structural": 0.0,
"Under Construction": 0.0,
"Complete": 0.0,
}
tex = features.get("texture_std", 30)
edge = features.get("edge_density", 40)
orient = features.get("orientation_entropy", 2.5)
homog = features.get("color_homogeneity", 25)
bright = features.get("brightness", 60)
sat = features.get("saturation", 50)
glcm = features.get("glcm_contrast", 500)
lbp_var = features.get("lbp_variance", 0.04)
# --- Foundation ---
# Flat, low-texture, soil/concrete colored, homogeneous
if tex < 22:
scores["Foundation"] += 0.25
if edge < 30:
scores["Foundation"] += 0.20
if homog < 20:
scores["Foundation"] += 0.20
if 40 <= bright <= 75:
scores["Foundation"] += 0.15
if sat < 60:
scores["Foundation"] += 0.10
if lbp_var < 0.03:
scores["Foundation"] += 0.10
# --- Structural/Framing ---
# High edges, geometric regularity, high contrast grid patterns
if edge > 50:
scores["Structural"] += 0.25
if orient < 2.2:
scores["Structural"] += 0.20
if glcm > 800:
scores["Structural"] += 0.20
if tex > 30:
scores["Structural"] += 0.15
if homog > 30:
scores["Structural"] += 0.10
if area > 1000:
scores["Structural"] += 0.10
# --- Under Construction ---
# Mixed materials, irregular texture, medium-high edge density
if 25 < tex < 50:
scores["Under Construction"] += 0.20
if 35 < edge < 65:
scores["Under Construction"] += 0.20
if orient > 2.6:
scores["Under Construction"] += 0.20
if homog > 25:
scores["Under Construction"] += 0.15
if 0.03 < lbp_var < 0.07:
scores["Under Construction"] += 0.15
if sat < 80:
scores["Under Construction"] += 0.10
# --- Complete ---
# Uniform roof, clean edges, low entropy, consistent color
if tex < 28:
scores["Complete"] += 0.20
if orient < 2.3:
scores["Complete"] += 0.25
if homog < 22:
scores["Complete"] += 0.20
if edge > 25:
scores["Complete"] += 0.10
if lbp_var < 0.04:
scores["Complete"] += 0.15
if bright > 50:
scores["Complete"] += 0.10
best = max(scores, key=scores.get)
conf = scores[best]
if conf < 0.25:
return "Unknown", conf
return best, min(conf, 1.0)
def analyze_building_3d(before_img, after_img, region, features):
"""
Run 3D analysis on a single building/construction region.
Enriches the region dict with stories, height, and construction stage.
"""
bbox = region["bbox"]
stories, height_m = estimate_building_height(before_img, after_img, bbox, features)
stage, stage_conf = classify_construction_stage(features, bbox)
region["estimated_stories"] = stories
region["estimated_height_m"] = height_m
region["construction_stage"] = stage
region["construction_stage_confidence"] = stage_conf
return region
# ---------------------------------------------------------------------------
# 17. Region analysis
# ---------------------------------------------------------------------------
def _tight_bbox(labels, label_id, stats_row):
"""
Compute a tighter bounding box using actual changed pixels.
Falls back to the connected-component bbox if the mask is dense enough.
"""
x = stats_row[cv2.CC_STAT_LEFT]
y = stats_row[cv2.CC_STAT_TOP]
w = stats_row[cv2.CC_STAT_WIDTH]
h = stats_row[cv2.CC_STAT_HEIGHT]
area = stats_row[cv2.CC_STAT_AREA]
fill_ratio = area / max(w * h, 1)
# If the component fills less than 20% of its bbox, compute a tighter fit
if fill_ratio < 0.20 and area > 100:
ys, xs = np.where(labels == label_id)
if len(xs) > 0:
x = int(np.min(xs))
y = int(np.min(ys))
w = int(np.max(xs) - x + 1)
h = int(np.max(ys) - y + 1)
fill_ratio = area / max(w * h, 1)
return x, y, w, h, fill_ratio
def _iou(boxA, boxB):
"""Intersection-over-union for two (x,y,w,h) boxes."""
ax1, ay1, aw, ah = boxA
bx1, by1, bw, bh = boxB
ax2, ay2 = ax1 + aw, ay1 + ah
bx2, by2 = bx1 + bw, by1 + bh
ix1, iy1 = max(ax1, bx1), max(ay1, by1)
ix2, iy2 = min(ax2, bx2), min(ay2, by2)
inter = max(0, ix2 - ix1) * max(0, iy2 - iy1)
union = aw * ah + bw * bh - inter
return inter / max(union, 1)
def _nms_regions(regions, iou_thresh=0.45):
"""Non-maximum suppression: keep the highest-area box when two overlap."""
if len(regions) < 2:
return regions
keep = []
used = set()
for i, r in enumerate(regions):
if i in used:
continue
keep.append(r)
for j in range(i + 1, len(regions)):
if j in used:
continue
if _iou(r["bbox"], regions[j]["bbox"]) > iou_thresh:
used.add(j)
return keep
def analyze_change_regions(change_mask, image, min_area=400, use_ensemble=True,
before_img=None, registration_ok=True):
"""
Find connected change regions with strict quality filters:
- Adaptive min_area scaled to image size
- Fill-ratio filter (>= 0.12) rejects sparse noise boxes
- Tighter bounding boxes computed from actual pixel coordinates
- NMS to remove overlapping/duplicate boxes
- Max 60 regions cap to avoid flooding the UI
"""
num_labels, labels, stats, centroids = cv2.connectedComponentsWithStats(
change_mask, connectivity=8)
change_regions = []
region_id = 0
img_h, img_w = change_mask.shape[:2]
img_area = img_h * img_w
# Adaptive minimum region size:
# - keeps sensitivity on smaller images
# - suppresses speckle noise on larger images
if min_area is None:
min_area = int(max(350, min(1400, img_area * 0.00012)))
for i in range(1, num_labels):
raw_area = stats[i, cv2.CC_STAT_AREA]
if raw_area < min_area:
continue
x, y, w, h, fill_ratio = _tight_bbox(labels, i, stats[i])
# Reject very sparse regions (bbox is mostly empty)
if fill_ratio < 0.12:
continue
# Keep large real changes; only suppress near-full-frame artifacts.
# When registration failed, allow larger regions to avoid missing true changes.
max_region_cover = 0.92 if not registration_ok else 0.75
if (w * h) > img_area * max_region_cover and fill_ratio < 0.35:
continue
cx, cy = centroids[i]
if use_ensemble and raw_area > 500:
object_type, confidence = classify_with_ensemble(
image, (x, y, w, h), before_region=before_img)
else:
object_type, confidence = classify_object_type(
image, (x, y, w, h), before_region=before_img)
if object_type is None:
# Do not silently drop large coherent regions; keep them as generic
# ground-change candidates so key changes are still surfaced.
if raw_area >= max(min_area * 2, 800) and fill_ratio >= 0.18:
object_type = "Unclassified Ground Change"
confidence = max(0.2, min(0.5, fill_ratio))
else:
continue
region_id += 1
region = {
"id": region_id,
"area": int(raw_area),
"bbox": (x, y, w, h),
"center": (int(cx), int(cy)),
"object_type": object_type,
"confidence": confidence,
"fill_ratio": round(fill_ratio, 3),
"sub_type": None,
"sub_type_confidence": None,
"estimated_stories": None,
"estimated_height_m": None,
"construction_stage": None,
}
if before_img is not None:
if object_type in _VEGETATION_TYPES:
sub, sub_conf = classify_vegetation_subtype(
before_img, image, (x, y, w, h))
region["sub_type"] = sub
region["sub_type_confidence"] = sub_conf
elif object_type in _STRUCTURAL_TYPES:
sub, sub_conf = classify_structural_subtype(
before_img, image, (x, y, w, h), object_type)
region["sub_type"] = sub
region["sub_type_confidence"] = sub_conf
if object_type in _BUILDING_TYPES:
pad = 5
ry1 = max(0, y - pad)
ry2 = min(image.shape[0], y + h + pad)
rx1 = max(0, x - pad)
rx2 = min(image.shape[1], x + w + pad)
crop = image[ry1:ry2, rx1:rx2]
feats = extract_advanced_features(crop) if crop.size > 0 else None
analyze_building_3d(before_img, image, region, feats)
change_regions.append(region)
# Sort by area descending, apply NMS, cap at 60
change_regions.sort(key=lambda r: r["area"], reverse=True)
change_regions = _nms_regions(change_regions, iou_thresh=0.45)
change_regions = change_regions[:60]
# Re-number after filtering
for idx, r in enumerate(change_regions, start=1):
r["id"] = idx
total_px = img_area
for r in change_regions:
r["severity"] = _severity_from_region(r, total_px)
return change_regions
# ---------------------------------------------------------------------------
# 18. Main pipeline
# ---------------------------------------------------------------------------
def run_detection(before_pil, after_pil, method="AI-Based Deep Learning",
enable_registration=True, enable_normalization=True,
detection_sensitivity=0.5, min_region_area=None):
"""Run full detection pipeline; returns change_mask, result_image, stats, regions."""
before_array = preprocess_image(before_pil)
after_array = preprocess_image(after_pil)
registration_ok = False
if enable_registration:
before_array, after_array, registration_ok = register_images(before_array, after_array)
if enable_normalization:
before_array, after_array = normalize_radiometry(before_array, after_array)
if method == "AI-Based Deep Learning":
change_mask, threshold_debug = ai_deep_learning_method(
before_array, after_array, sensitivity=detection_sensitivity
)
elif method == "Image Difference":
change_mask, threshold_debug = image_difference_method(
before_array, after_array, sensitivity=detection_sensitivity
)
elif method == "Feature-Based":
change_mask = feature_based_method(before_array, after_array)
threshold_debug = {
"method": "Feature-Based",
"threshold_used": None,
"note": "KMeans clustering path does not use binary threshold.",
"sensitivity": float(detection_sensitivity),
}
else:
change_mask, threshold_debug = hybrid_method(
before_array, after_array, sensitivity=detection_sensitivity
)
# --- Adaptive fallback for empty/sparse masks ---
# In some scenes, ORB/ECC registration + fused thresholding can produce an overly
# sparse binary mask (leading to 0 detected regions). If that happens, fall back
# to the more stable Image Difference mask.
total_pixels = int(change_mask.shape[0] * change_mask.shape[1])
changed_pixels_ratio = float(np.sum(change_mask > 127)) / float(total_pixels) if total_pixels else 0.0
used_fallback = False
if method in ("AI-Based Deep Learning", "Hybrid Approach") and changed_pixels_ratio < 0.0025:
diff_mask, diff_debug = image_difference_method(
before_array, after_array, sensitivity=detection_sensitivity
)
diff_ratio = float(np.sum(diff_mask > 127)) / float(total_pixels) if total_pixels else 0.0
# Only switch if the diff mask clearly contains more signal.
if diff_ratio > max(0.005, changed_pixels_ratio * 3.0):
change_mask = diff_mask
used_fallback = True
threshold_debug = {
"method": f"{method} (fallback->Image Difference)",
"fallback_used": True,
"ai_hybrid_changed_ratio": changed_pixels_ratio,
"diff_changed_ratio": diff_ratio,
"diff_debug": diff_debug,
"sensitivity": float(detection_sensitivity),
}
change_regions = analyze_change_regions(
change_mask,
after_array,
min_area=min_region_area,
before_img=before_array,
registration_ok=registration_ok,
)
total_pixels = int(change_mask.shape[0] * change_mask.shape[1])
result_image = visualize_changes(
before_array, after_array, change_mask,
regions=change_regions, total_pixels=total_pixels
)
changed_pixels = int(np.sum(change_mask > 127))
change_pct = (changed_pixels / total_pixels * 100.0) if total_pixels else 0.0
stats = {
"total_pixels": total_pixels,
"changed_pixels": changed_pixels,
"unchanged_pixels": total_pixels - changed_pixels,
"change_percentage": change_pct,
"image_width": change_mask.shape[1],
"image_height": change_mask.shape[0],
"threshold_debug": threshold_debug,
"params": {
"detection_sensitivity": float(detection_sensitivity),
"min_region_area": min_region_area,
"enable_registration": bool(enable_registration),
"enable_normalization": bool(enable_normalization),
"registration_ok": bool(registration_ok),
},
}
return change_mask, result_image, stats, change_regions