# satdetect / app /detection_engine.py
# coderuday21's picture
# Fix detection precision: gated fusion, strict registration, calibrated thresholds, preload model
# ba4abf7
"""
Satellite Change Detection Engine v4
High-accuracy detection with multi-channel analysis, SSIM, CVA, texture features,
adaptive thresholding, vegetation/shadow suppression, SNR-weighted fusion,
SIFT+FLANN registration, tile-based + multi-scale processing, Excess Green
vegetation index, confidence maps, and improved object classification.
"""
import logging
import numpy as np
import cv2
from PIL import Image
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from collections import Counter
_log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# 1. Pre-processing
# ---------------------------------------------------------------------------
def _ensure_rgb_uint8(img_array):
"""Convert any image array to 3-channel RGB uint8."""
if img_array.ndim == 2:
img_array = cv2.cvtColor(img_array, cv2.COLOR_GRAY2RGB)
elif img_array.ndim == 3 and img_array.shape[2] == 4:
img_array = cv2.cvtColor(img_array, cv2.COLOR_RGBA2RGB)
elif img_array.ndim != 3 or img_array.shape[2] != 3:
raise ValueError(f"Unsupported image shape: {img_array.shape}")
if img_array.dtype != np.uint8:
img_array = np.clip(img_array, 0, 255).astype(np.uint8)
return img_array
def _to_float32(img):
"""Normalize uint8 image to float32 [0,1]."""
return img.astype(np.float32) / 255.0
def preprocess_image(image, max_size=1600):
    """Prepare an input image for change detection.

    Steps: convert to RGB uint8, shrink so the longest side is at most
    ``max_size`` pixels, apply a 5x5 Gaussian blur, and, when the Laplacian
    variance of the blurred image is below 80, apply an extra bilateral
    filter.

    Args:
        image: anything ``np.array`` can turn into an image array.
        max_size: upper bound for the longest image side, in pixels.

    Returns:
        The processed RGB uint8 array.
    """
    rgb = _ensure_rgb_uint8(np.array(image))

    rows, cols = rgb.shape[:2]
    longest_side = max(rows, cols)
    if longest_side > max_size:
        factor = max_size / longest_side
        target_size = (max(1, int(cols * factor)), max(1, int(rows * factor)))
        rgb = cv2.resize(rgb, target_size, interpolation=cv2.INTER_AREA)

    rgb = cv2.GaussianBlur(rgb, (5, 5), 0)

    # Laplacian variance as a sharpness proxy for the blurred image.
    luminance = cv2.cvtColor(rgb, cv2.COLOR_RGB2GRAY)
    sharpness = float(cv2.Laplacian(luminance, cv2.CV_64F).var())
    if sharpness < 80.0:
        rgb = cv2.bilateralFilter(rgb, 5, 50, 50)
    return rgb
# ---------------------------------------------------------------------------
# 2. Improved image registration (alignment)
# ---------------------------------------------------------------------------
def _match_features_sift(gray1, gray2):
    """Estimate a homography with SIFT features and FLANN matching.

    Descriptors are matched with a k=2 nearest-neighbour search and filtered
    with Lowe's ratio test (0.7). The homography is fitted with RANSAC from
    the ``gray2`` keypoints to the ``gray1`` keypoints.

    Returns:
        ``(H, inlier_ratio)`` on success. ``(None, 0.0)`` when either image
        yields fewer than 10 descriptors, fewer than 8 matches survive the
        ratio test, the fit fails, the determinant of the upper-left 2x2 of
        ``H`` lies outside [0.1, 10], or any exception is raised.
    """
    try:
        sift = cv2.SIFT_create(nfeatures=4000)
        kp1, des1 = sift.detectAndCompute(gray1, None)
        kp2, des2 = sift.detectAndCompute(gray2, None)
        if des1 is None or des2 is None or len(des1) < 10 or len(des2) < 10:
            return None, 0.0

        FLANN_INDEX_KDTREE = 1
        index_params = dict(algorithm=FLANN_INDEX_KDTREE, trees=5)
        search_params = dict(checks=100)
        flann = cv2.FlannBasedMatcher(index_params, search_params)
        raw_matches = flann.knnMatch(des1, des2, k=2)

        # A query may come back with fewer than two neighbours. Check the
        # length before indexing instead of tuple-unpacking, which would
        # raise and discard the whole result through the except below.
        good = [
            pair[0]
            for pair in raw_matches
            if len(pair) == 2 and pair[0].distance < 0.7 * pair[1].distance
        ]
        if len(good) < 8:
            return None, 0.0

        src = np.float32([kp1[m.queryIdx].pt for m in good]).reshape(-1, 1, 2)
        dst = np.float32([kp2[m.trainIdx].pt for m in good]).reshape(-1, 1, 2)
        H, mask = cv2.findHomography(dst, src, cv2.RANSAC, 4.0, maxIters=3000)
        if H is None or mask is None:
            return None, 0.0

        # Reject transforms whose linear part scales area implausibly.
        det = np.linalg.det(H[:2, :2])
        if abs(det) < 0.1 or abs(det) > 10.0:
            return None, 0.0
        return H, float(np.sum(mask)) / len(mask)
    except Exception:
        # Best-effort stage: any failure is reported as "no homography".
        return None, 0.0
def _match_features_orb(gray1, gray2, max_features=3000):
    """Estimate a homography with ORB features (fallback matcher).

    Two attempts are made: ``max_features`` keypoints with ratio 0.75, then
    twice as many keypoints with ratio 0.80. The attempt with the highest
    RANSAC inlier ratio wins. The homography is fitted from the ``gray2``
    keypoints to the ``gray1`` keypoints.

    Returns:
        ``(H, inlier_ratio)`` for the best attempt, or ``(None, 0.0)`` when
        no attempt produced an acceptable homography.
    """
    best_H, best_ir = None, 0.0
    for nf, ratio_thr in [(max_features, 0.75), (max_features * 2, 0.80)]:
        orb = cv2.ORB_create(nfeatures=nf, scoreType=cv2.ORB_HARRIS_SCORE,
                             edgeThreshold=15, patchSize=31)
        kp1, des1 = orb.detectAndCompute(gray1, None)
        kp2, des2 = orb.detectAndCompute(gray2, None)
        if des1 is None or des2 is None or len(des1) < 10 or len(des2) < 10:
            continue

        bf = cv2.BFMatcher(cv2.NORM_HAMMING)
        raw_matches = bf.knnMatch(des1, des2, k=2)

        # A query may come back with fewer than two neighbours. Check the
        # length before indexing; tuple-unpacking would raise ValueError
        # here and propagate to the caller.
        good = [
            pair[0]
            for pair in raw_matches
            if len(pair) == 2
            and pair[0].distance < ratio_thr * pair[1].distance
        ]
        if len(good) < 8:
            continue

        src = np.float32([kp1[m.queryIdx].pt for m in good]).reshape(-1, 1, 2)
        dst = np.float32([kp2[m.trainIdx].pt for m in good]).reshape(-1, 1, 2)
        H, mask = cv2.findHomography(dst, src, cv2.RANSAC, 4.0, maxIters=2000)
        if H is None or mask is None:
            continue

        # Reject transforms whose linear part scales area implausibly.
        det = np.linalg.det(H[:2, :2])
        if abs(det) < 0.1 or abs(det) > 10.0:
            continue

        ir = float(np.sum(mask)) / len(mask)
        if ir > best_ir:
            best_H, best_ir = H, ir
    return best_H, best_ir
def _alignment_ncc(img1, img2):
    """Return the Pearson correlation of the two images' grayscale pixels.

    Yields 0.0 when the images have different pixel counts, fewer than 64
    pixels, or when the correlation is not finite.
    """
    flat_a = cv2.cvtColor(img1, cv2.COLOR_RGB2GRAY).astype(np.float32).ravel()
    flat_b = cv2.cvtColor(img2, cv2.COLOR_RGB2GRAY).astype(np.float32).ravel()
    if flat_a.size < 64 or flat_a.size != flat_b.size:
        return 0.0

    corr = np.corrcoef(flat_a, flat_b)[0, 1]
    if not np.isfinite(corr):
        return 0.0
    return float(corr)
def _phase_correlation_translate(img2, gray1, gray2):
    """Translate ``img2`` by the phase-correlation offset of the gray images.

    Offsets smaller than half a pixel on both axes leave ``img2`` untouched,
    and any exception also returns ``img2`` unchanged.
    """
    try:
        offset, _response = cv2.phaseCorrelate(
            gray1.astype(np.float32), gray2.astype(np.float32))
        dx = float(offset[0])
        dy = float(offset[1])
        if abs(dx) < 0.5 and abs(dy) < 0.5:
            return img2

        rows, cols = img2.shape[:2]
        # NOTE(review): the offset is applied as a forward translation
        # (+dx, +dy); confirm this matches the sign convention of
        # cv2.phaseCorrelate(gray1, gray2) for the installed OpenCV version.
        shift_matrix = np.float32([[1, 0, dx], [0, 1, dy]])
        return cv2.warpAffine(
            img2, shift_matrix, (cols, rows), borderMode=cv2.BORDER_REFLECT)
    except Exception:
        return img2
def register_images(img1, img2, max_features=3000):
    """Align ``img2`` to ``img1`` through several stages.

    Pipeline: resize ``img2`` to ``img1``'s size if needed, apply a
    phase-correlation translation, then try SIFT (with ORB as fallback when
    SIFT fails or its inlier ratio is below 0.25). A homography with inlier
    ratio >= 0.35 is applied and refined with ECC; it is accepted when the
    resulting NCC is >= 0.55 and rejected otherwise. Without a usable
    homography the multi-scale ECC fallback is used.

    Returns:
        ``(img1, img2_aligned, registration_ok, reg_meta)`` where
        ``reg_meta`` holds ``method``, ``inlier_ratio``, ``ncc`` and
        ``homography_used``.
    """
    h, w = img1.shape[:2]
    reg_meta = dict(
        method="none",
        inlier_ratio=0.0,
        ncc=0.0,
        homography_used=False,
    )

    if img2.shape[:2] != (h, w):
        img2 = cv2.resize(img2, (w, h), interpolation=cv2.INTER_LINEAR)

    gray1 = cv2.cvtColor(img1, cv2.COLOR_RGB2GRAY)
    img2 = _phase_correlation_translate(
        img2, gray1, cv2.cvtColor(img2, cv2.COLOR_RGB2GRAY))
    gray2 = cv2.cvtColor(img2, cv2.COLOR_RGB2GRAY)

    # Primary matcher, with ORB taking over when it yields a better ratio.
    H, ir = _match_features_sift(gray1, gray2)
    match_method = "sift"
    if H is None or ir < 0.25:
        H_orb, ir_orb = _match_features_orb(gray1, gray2, max_features)
        if ir_orb > ir:
            H, ir, match_method = H_orb, ir_orb, "orb"
    reg_meta["inlier_ratio"] = float(ir)

    if H is None or not (ir >= 0.35):
        # No trustworthy homography: fall back to intensity-based ECC.
        img1_ecc, img2_ecc, ok, ecc_meta = _register_images_ecc_multiscale(img1, img2)
        reg_meta.update(ecc_meta)
        return img1_ecc, img2_ecc, ok, reg_meta

    warped = cv2.warpPerspective(img2, H, (w, h), borderMode=cv2.BORDER_REFLECT)
    refined = _refine_ecc(img1, warped)
    ncc = _alignment_ncc(img1, refined)
    reg_meta.update({
        "method": match_method,
        "ncc": ncc,
        "homography_used": True,
    })
    if ncc >= 0.55:
        return img1, refined, True, reg_meta

    # The warp did not correlate well enough: keep the un-warped image.
    reg_meta["method"] = f"{match_method}_rejected"
    return img1, img2, False, reg_meta
def _refine_ecc(img1, img2_initial):
    """Fine-tune a coarsely aligned image with ECC.

    An affine model is tried first, then a pure translation. The first model
    whose ECC correlation reaches 0.6 is applied. If neither qualifies, or
    anything raises, ``img2_initial`` is returned unchanged.
    """
    try:
        reference = cv2.cvtColor(img1, cv2.COLOR_RGB2GRAY).astype(np.float32) / 255.0
        moving = cv2.cvtColor(img2_initial, cv2.COLOR_RGB2GRAY).astype(np.float32) / 255.0
        rows, cols = img1.shape[:2]
        stop_rule = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 100, 1e-5)

        for motion_model in (cv2.MOTION_AFFINE, cv2.MOTION_TRANSLATION):
            try:
                # Each model starts from the identity warp.
                initial_warp = np.eye(2, 3, dtype=np.float32)
                score, estimated_warp = cv2.findTransformECC(
                    reference, moving, initial_warp, motion_model, stop_rule)
                if score >= 0.6:
                    return cv2.warpAffine(
                        img2_initial, estimated_warp, (cols, rows),
                        flags=cv2.INTER_LINEAR + cv2.WARP_INVERSE_MAP,
                        borderMode=cv2.BORDER_REFLECT)
            except Exception:
                continue
    except Exception:
        pass
    return img2_initial
def _register_images_ecc_multiscale(img1, img2):
    """Coarse-to-fine affine ECC alignment used as the registration fallback.

    The affine warp is estimated on a three-level pyramid (1/4, 1/2 and full
    resolution). Levels smaller than 64 pixels on either side are skipped,
    and a level whose ECC call raises is skipped too.

    The running estimate is always stored in full-resolution pixel units.
    Its translation column is divided by the level's downscale factor before
    the ECC call and multiplied back afterwards, so every level starts from
    the previous estimate expressed in its own units, including when an
    intermediate level was skipped. (Previously the translation was rescaled
    twice between levels, so finer levels started from a fraction of the
    coarse estimate.)

    Returns:
        ``(img1, aligned_img2, ok, meta)``. ``ok`` is True when the
        correlation between the gray reference and the gray aligned image is
        >= 0.50. On an unexpected exception the inputs are returned with
        ``ok=False`` and ``meta["method"] == "ecc_failed"``.
    """
    try:
        gray1 = cv2.cvtColor(img1, cv2.COLOR_RGB2GRAY)
        gray2 = cv2.cvtColor(img2, cv2.COLOR_RGB2GRAY)
        h, w = img1.shape[:2]

        # Downscale factors, coarsest first.
        scales = (4, 2, 1)
        warp = np.eye(2, 3, dtype=np.float32)  # full-resolution units
        for scale in scales:
            sh, sw = h // scale, w // scale
            if sh < 64 or sw < 64:
                continue
            g1 = cv2.resize(gray1, (sw, sh)).astype(np.float32) / 255.0
            g2 = cv2.resize(gray2, (sw, sh)).astype(np.float32) / 255.0

            # Only the translation column depends on resolution.
            level_warp = warp.copy()
            level_warp[:, 2] /= scale

            iters = 300 if scale == scales[0] else 150
            criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, iters, 1e-6)
            try:
                _, level_warp = cv2.findTransformECC(
                    g1, g2, level_warp, cv2.MOTION_AFFINE, criteria)
            except Exception:
                # Keep the estimate from the previous level.
                continue

            warp = level_warp.copy()
            warp[:, 2] *= scale

        aligned = cv2.warpAffine(
            img2, warp, (w, h),
            flags=cv2.INTER_LINEAR + cv2.WARP_INVERSE_MAP,
            borderMode=cv2.BORDER_REFLECT)

        # Check alignment quality via normalized cross-correlation.
        g_aligned = cv2.cvtColor(aligned, cv2.COLOR_RGB2GRAY).astype(np.float32)
        g_ref = gray1.astype(np.float32)
        ncc = float(np.corrcoef(g_ref.ravel(), g_aligned.ravel())[0, 1])
        if not np.isfinite(ncc):
            ncc = 0.0
        meta = {"method": "ecc_multiscale", "inlier_ratio": 0.0, "ncc": ncc, "homography_used": False}
        return img1, aligned, bool(ncc >= 0.50), meta
    except Exception:
        return img1, img2, False, {
            "method": "ecc_failed", "inlier_ratio": 0.0, "ncc": 0.0, "homography_used": False,
        }
# ---------------------------------------------------------------------------
# 3. Improved radiometric normalization
# ---------------------------------------------------------------------------
def normalize_radiometry(img1, img2):
    """Bring the two images onto a comparable radiometric footing.

    Each LAB channel of ``img2`` is shifted and scaled to the mean and
    standard deviation of the same channel in ``img1`` (channels of ``img2``
    with near-zero spread are left as they are). CLAHE is then applied to
    the L channel of both images.

    Returns:
        ``(img1_normalized, img2_normalized)`` as RGB arrays.
    """
    ref_lab_u8 = cv2.cvtColor(img1, cv2.COLOR_RGB2LAB)
    ref_lab = ref_lab_u8.astype(np.float32)
    src_lab = cv2.cvtColor(img2, cv2.COLOR_RGB2LAB).astype(np.float32)

    matched = src_lab.copy()
    for ch in range(3):
        ref_plane = ref_lab[:, :, ch]
        src_plane = src_lab[:, :, ch]
        ref_mean, ref_std = np.mean(ref_plane), np.std(ref_plane)
        src_mean, src_std = np.mean(src_plane), np.std(src_plane)
        if src_std > 1e-6:
            matched[:, :, ch] = (src_plane - src_mean) * (ref_std / src_std) + ref_mean

    matched_u8 = np.clip(matched, 0, 255).astype(np.uint8)

    # The same CLAHE operator is applied to the lightness of both images.
    equalizer = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    ref_lab_u8[:, :, 0] = equalizer.apply(ref_lab_u8[:, :, 0])
    matched_u8[:, :, 0] = equalizer.apply(matched_u8[:, :, 0])

    out1 = cv2.cvtColor(ref_lab_u8, cv2.COLOR_LAB2RGB)
    out2 = cv2.cvtColor(matched_u8, cv2.COLOR_LAB2RGB)
    return out1, out2
# ---------------------------------------------------------------------------
# 4. Vegetation suppression
# ---------------------------------------------------------------------------
def compute_excess_green(img):
    """Excess Green index ``ExG = 2g - r - b`` on chromaticity coordinates.

    Each of the first three channels is divided by the per-pixel channel sum
    (plus a small epsilon) before the index is formed. Negative values are
    clipped to 0 and the result is returned as float32 in [0, 1].
    """
    red, green, blue = (img[:, :, idx].astype(np.float32) for idx in range(3))
    channel_sum = red + green + blue + 1e-6

    red_share = red / channel_sum
    green_share = green / channel_sum
    blue_share = blue / channel_sum

    index = 2.0 * green_share - red_share - blue_share
    return np.clip(index, 0, 1).astype(np.float32)
def compute_vegetation_mask(img):
    """Soft vegetation mask built from three weighted binary indicators.

    Indicators and weights:
      * pseudo-NDVI ``(G - R) / (G + R)`` above 0.08 -> 0.4
      * Excess Green above 0.05                       -> 0.3
      * HSV hue in [35, 85] with saturation above 30  -> 0.3

    The weighted vote is clipped to [0, 1] and smoothed with an 11x11
    Gaussian blur. Higher values mean more indicators agree on vegetation.
    """
    red = img[:, :, 0].astype(np.float32)
    green = img[:, :, 1].astype(np.float32)
    pseudo_ndvi = (green - red) / (green + red + 1e-6)
    excess_green = compute_excess_green(img)

    hsv = cv2.cvtColor(img, cv2.COLOR_RGB2HSV)
    hue = hsv[:, :, 0].astype(np.float32)
    saturation = hsv[:, :, 1].astype(np.float32)

    ndvi_vote = (pseudo_ndvi > 0.08).astype(np.float32)
    exg_vote = (excess_green > 0.05).astype(np.float32)
    hsv_vote = ((hue >= 35) & (hue <= 85) & (saturation > 30)).astype(np.float32)

    votes = ndvi_vote * 0.4 + exg_vote * 0.3 + hsv_vote * 0.3
    return cv2.GaussianBlur(np.clip(votes, 0, 1), (11, 11), 0)
def compute_combined_vegetation_suppression(img1, img2):
    """Per-pixel multiplier that treats vegetation asymmetrically.

    Pixels vegetated in both images are damped (factor down to 0.3 at full
    vegetation), while pixels vegetated in only one image are amplified
    (factor up to 1.3). The two factors are multiplied, so 1.0 is neutral.

    Returns:
        float32 multiplier map.
    """
    veg_before = compute_vegetation_mask(img1)
    veg_after = compute_vegetation_mask(img2)

    shared_vegetation = np.minimum(veg_before, veg_after)
    vegetation_delta = np.abs(veg_before - veg_after)

    damping = 1.0 - shared_vegetation * 0.7
    amplification = 1.0 + vegetation_delta * 0.3
    return (damping * amplification).astype(np.float32)
# ---------------------------------------------------------------------------
# 5. Shadow / illumination-only change suppression
# ---------------------------------------------------------------------------
def compute_shadow_suppression(img1, img2):
    """Multiplier that damps brightness-only (shadow / illumination) changes.

    A pixel counts as brightness-only when its L difference exceeds 18 while
    the summed A and B differences stay below 12. That binary map is
    smoothed with a 9x9 Gaussian and turned into a factor ``1 - 0.8 * map``,
    so flagged areas approach 0.2 and everything else stays at 1.0.

    Returns:
        float32 multiplier map.
    """
    lab_before = cv2.cvtColor(img1, cv2.COLOR_RGB2LAB).astype(np.float32)
    lab_after = cv2.cvtColor(img2, cv2.COLOR_RGB2LAB).astype(np.float32)
    channel_delta = np.abs(lab_before - lab_after)

    lightness_delta = channel_delta[:, :, 0]
    chroma_delta = channel_delta[:, :, 1] + channel_delta[:, :, 2]

    illumination_only = (lightness_delta > 18) & (chroma_delta < 12)
    smoothed = cv2.GaussianBlur(illumination_only.astype(np.float32), (9, 9), 0)
    return (1.0 - smoothed * 0.8).astype(np.float32)
# ---------------------------------------------------------------------------
# 6. Change Vector Analysis (CVA)
# ---------------------------------------------------------------------------
def compute_cva(img1, img2):
    """Change Vector Analysis magnitude in LAB space.

    The LAB difference is scaled per channel (L by 1/100, A and B by 1/128)
    and its Euclidean magnitude is weighted by a factor derived from the
    share of chroma in the change (clipped to [0.15, 1]), which damps
    lightness-dominated changes. The result is divided by its 99.5th
    percentile and clipped to [0, 1] when that percentile is non-trivial.

    Returns:
        float32 change magnitude map.
    """
    lab_before = cv2.cvtColor(img1, cv2.COLOR_RGB2LAB).astype(np.float32)
    lab_after = cv2.cvtColor(img2, cv2.COLOR_RGB2LAB).astype(np.float32)

    d_light = (lab_after[:, :, 0] - lab_before[:, :, 0]) / 100.0
    d_a = (lab_after[:, :, 1] - lab_before[:, :, 1]) / 128.0
    d_b = (lab_after[:, :, 2] - lab_before[:, :, 2]) / 128.0

    full_mag = np.sqrt(d_light ** 2 + d_a ** 2 + d_b ** 2)
    chroma_only = np.sqrt(d_a ** 2 + d_b ** 2)

    # Low chroma share => mostly an illumination change => damp it.
    chroma_share = chroma_only / (full_mag + 1e-8)
    damping = np.clip(chroma_share * 2.5, 0.15, 1.0)
    weighted = full_mag * damping

    upper = float(np.quantile(weighted, 0.995))
    if upper > 1e-8:
        weighted = np.clip(weighted / upper, 0, 1)
    return weighted.astype(np.float32)
# ---------------------------------------------------------------------------
# 7. SSIM-based structural change map
# ---------------------------------------------------------------------------
def _ssim_at_scale(gray1, gray2, win_size=11):
    """Structural dissimilarity ``(1 - SSIM) / 2`` at a single scale.

    Local statistics come from a Gaussian window of ``win_size`` pixels with
    ``sigma = win_size / 6``. The stabilising constants are written for an
    intensity range of 255. The output is clipped to [0, 1].
    """
    kernel = (win_size, win_size)
    sigma = win_size / 6.0
    C1 = (0.01 * 255) ** 2
    C2 = (0.03 * 255) ** 2

    def _local_mean(values):
        return cv2.GaussianBlur(values, kernel, sigma)

    mean_a = _local_mean(gray1)
    mean_b = _local_mean(gray2)
    mean_a_sq = mean_a * mean_a
    mean_b_sq = mean_b * mean_b
    mean_ab = mean_a * mean_b

    # Variances are floored at zero to absorb rounding error.
    var_a = np.maximum(_local_mean(gray1 * gray1) - mean_a_sq, 0)
    var_b = np.maximum(_local_mean(gray2 * gray2) - mean_b_sq, 0)
    covar = _local_mean(gray1 * gray2) - mean_ab

    denominator = (mean_a_sq + mean_b_sq + C1) * (var_a + var_b + C2)
    similarity = ((2 * mean_ab + C1) * (2 * covar + C2)) / (denominator + 1e-12)
    return np.clip((1.0 - similarity) / 2.0, 0, 1)
def compute_ssim_change_map(img1, img2, win_size=11):
    """Two-scale SSIM dissimilarity map.

    The dissimilarity is computed at full resolution and at half resolution
    (with an odd window of roughly half the size, at least 3). The half-res
    map is resized back and the two are blended 0.6 / 0.4.
    """
    full_a = cv2.cvtColor(img1, cv2.COLOR_RGB2GRAY).astype(np.float64)
    full_b = cv2.cvtColor(img2, cv2.COLOR_RGB2GRAY).astype(np.float64)
    fine = _ssim_at_scale(full_a, full_b, win_size)

    rows, cols = full_a.shape
    half_size = (max(1, cols // 2), max(1, rows // 2))
    half_a = cv2.resize(full_a, half_size)
    half_b = cv2.resize(full_b, half_size)

    # OR-ing with 1 forces the window to be odd.
    coarse_window = max(3, win_size // 2) | 1
    coarse = _ssim_at_scale(half_a, half_b, coarse_window)
    coarse_upsampled = cv2.resize(coarse, (cols, rows))

    return 0.6 * fine + 0.4 * coarse_upsampled
# ---------------------------------------------------------------------------
# 8. Texture feature extraction (LBP)
# ---------------------------------------------------------------------------
def compute_lbp(gray, radius=1, n_points=8):
    """Simplified Local Binary Pattern score.

    For ``n_points`` directions evenly spaced on a circle of ``radius``
    (offsets rounded to whole pixels), counts how many shifted copies of the
    image are >= the original pixel, and returns that count divided by
    ``n_points`` as float32. Shifts use ``np.roll``, so they wrap around at
    the image borders.
    """
    # Unpacking also enforces that the input is 2-D.
    _rows, _cols = gray.shape

    votes = np.zeros(gray.shape, dtype=np.float32)
    for step in range(n_points):
        theta = 2 * np.pi * step / n_points
        shift_x = int(round(radius * np.cos(theta)))
        shift_y = int(round(-radius * np.sin(theta)))
        neighbour = np.roll(gray, (shift_y, shift_x), axis=(0, 1))
        votes += (neighbour >= gray).astype(np.float32)
    return votes / n_points
def compute_texture_change(img1, img2):
    """Absolute difference between the LBP maps of the two gray images."""
    lbp_maps = [
        compute_lbp(cv2.cvtColor(rgb, cv2.COLOR_RGB2GRAY).astype(np.float32))
        for rgb in (img1, img2)
    ]
    return np.abs(lbp_maps[0] - lbp_maps[1])
# ---------------------------------------------------------------------------
# 9. Edge-aware change detection
# ---------------------------------------------------------------------------
def compute_edge_change(img1, img2):
    """Edge-based change map from Canny edges.

    Canny thresholds are derived from each image's median gray level
    (0.67x and 1.33x, limited to [0, 255]). Both edge maps are dilated with
    a 3x3 kernel so that nearly coincident edges overlap, and their absolute
    difference is returned as float32 in [0, 1].
    """
    structuring = np.ones((3, 3), np.uint8)

    def _dilated_edges(rgb):
        gray = cv2.cvtColor(rgb, cv2.COLOR_RGB2GRAY)
        level = np.median(gray)
        low = int(max(0, 0.67 * level))
        high = int(min(255, 1.33 * level))
        return cv2.dilate(cv2.Canny(gray, low, high), structuring, iterations=1)

    edges_before = _dilated_edges(img1)
    edges_after = _dilated_edges(img2)

    # Non-zero wherever an edge is present in only one of the images.
    return cv2.absdiff(edges_before, edges_after).astype(np.float32) / 255.0
# ---------------------------------------------------------------------------
# 10. Improved detection methods
# ---------------------------------------------------------------------------
def _adaptive_binary_threshold(score_uint8, min_floor=25, sensitivity=0.5):
    """Binarise a uint8 score map with a noise-aware threshold.

    The base threshold is the largest of the fixed floor, Otsu's value and a
    noise floor of ``median + 3 * MAD``. Sensitivity (clipped to [0, 1])
    shifts it by up to about 12 levels: higher sensitivity lowers the
    threshold, lower sensitivity raises it. The result is limited to
    [0, 255].

    Returns:
        ``(mask, threshold, otsu_value, noise_floor)``.
    """
    otsu_val, _unused = cv2.threshold(
        score_uint8, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

    centre = float(np.median(score_uint8))
    spread = float(np.median(np.abs(score_uint8.astype(np.float32) - centre)))
    noise_floor = centre + 3.0 * spread

    sens = float(np.clip(sensitivity, 0.0, 1.0))
    shift = int((0.5 - sens) * 24)

    base = max(min_floor, otsu_val, noise_floor)
    thr = min(255, int(base + shift))
    thr = max(0, thr)

    _retval, mask = cv2.threshold(score_uint8, thr, 255, cv2.THRESH_BINARY)
    return mask, thr, float(otsu_val), float(noise_floor)
def image_difference_method(img1, img2, threshold=0.25, blur_size=5, sensitivity=0.5):
    """Change mask from a blurred, channel-weighted LAB difference.

    Both images are converted to LAB, Gaussian-blurred, and compared with a
    Delta-E-style distance (L scaled by 1/100, A and B by 1/128). The
    distance is normalised by its maximum, quantised to uint8, binarised by
    ``_adaptive_binary_threshold`` (floor 30) and cleaned by ``_clean_mask``.

    Args:
        img1, img2: RGB images; ``img2`` is resized to ``img1`` if needed.
        threshold: accepted for interface compatibility; not used here, the
            binarisation threshold is chosen adaptively.
        blur_size: Gaussian kernel size. Coerced to an odd value >= 1,
            because ``cv2.GaussianBlur`` rejects even or non-positive sizes.
        sensitivity: passed to the adaptive threshold.

    Returns:
        ``(change_mask, debug)``.
    """
    if img1.shape != img2.shape:
        img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0]))

    # Force a valid (odd, positive) kernel size instead of letting OpenCV raise.
    ksize = max(1, int(blur_size)) | 1

    # Multi-channel difference in LAB.
    lab1 = cv2.cvtColor(img1, cv2.COLOR_RGB2LAB).astype(np.float32)
    lab2 = cv2.cvtColor(img2, cv2.COLOR_RGB2LAB).astype(np.float32)
    lab1_blur = cv2.GaussianBlur(lab1, (ksize, ksize), 0)
    lab2_blur = cv2.GaussianBlur(lab2, (ksize, ksize), 0)

    # Weighted Delta-E inspired difference.
    diff = lab1_blur - lab2_blur
    delta_e = np.sqrt(
        (diff[:, :, 0] / 100.0) ** 2 +
        (diff[:, :, 1] / 128.0) ** 2 +
        (diff[:, :, 2] / 128.0) ** 2
    )
    peak = delta_e.max()
    if peak > 0:
        delta_e = delta_e / peak
    delta_uint8 = (delta_e * 255).astype(np.uint8)

    change_mask, used_thr, otsu_val, noise_floor = _adaptive_binary_threshold(
        delta_uint8, min_floor=30, sensitivity=sensitivity
    )
    # NOTE(review): other methods in this file forward `sensitivity` to
    # _clean_mask; here the default is used. Confirm this is intentional.
    change_mask = _clean_mask(change_mask)

    debug = {
        "method": "Image Difference",
        "threshold_used": int(used_thr),
        "otsu": float(otsu_val),
        "noise_floor": float(noise_floor),
        "sensitivity": float(sensitivity),
    }
    return change_mask, debug
def feature_based_method(img1, img2, num_clusters=4, sensitivity=0.5):
    """Change mask from K-Means clustering of per-pixel difference features.

    Features are the absolute LAB differences plus the absolute S and V
    differences (five values per pixel). They are standardised and clustered
    on at most about 250k pixels; the cluster with the largest mean feature
    norm is taken as "changed". For larger images the fitted scaler and
    model label every full-resolution pixel. The mask is cleaned by
    ``_clean_mask``.

    Returns:
        The cleaned uint8 change mask only (no debug dict).
    """
    if img1.shape != img2.shape:
        img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0]))

    def _abs_difference(conversion):
        before = cv2.cvtColor(img1, conversion).astype(np.float32)
        after = cv2.cvtColor(img2, conversion).astype(np.float32)
        return np.abs(before - after)

    diff_lab = _abs_difference(cv2.COLOR_RGB2LAB)
    diff_hsv = _abs_difference(cv2.COLOR_RGB2HSV)
    h, w, _ = diff_lab.shape
    features = np.concatenate([diff_lab, diff_hsv[:, :, 1:]], axis=2)

    # Fit on a reduced grid when the image is large.
    MAX_PIXELS = 250_000
    total = h * w
    downsampled = total > MAX_PIXELS
    if downsampled:
        shrink = np.sqrt(MAX_PIXELS / total)
        small_h, small_w = max(1, int(h * shrink)), max(1, int(w * shrink))
        sample = cv2.resize(features, (small_w, small_h))
    else:
        sample = features
    sample_flat = sample.reshape(-1, sample.shape[2])

    scaler = StandardScaler()
    kmeans = KMeans(n_clusters=num_clusters, random_state=42, n_init=10)
    sample_labels = kmeans.fit_predict(scaler.fit_transform(sample_flat))

    # Rank clusters by the mean norm of their (unscaled) features.
    cluster_means = []
    for cluster_id in range(num_clusters):
        members = sample_labels == cluster_id
        if np.any(members):
            cluster_means.append(
                np.mean(np.linalg.norm(sample_flat[members], axis=1)))
        else:
            cluster_means.append(0.0)
    change_cluster_idx = np.argmax(cluster_means)

    if downsampled:
        every_pixel = features.reshape(-1, features.shape[2])
        labels = kmeans.predict(scaler.transform(every_pixel))
    else:
        labels = sample_labels

    change_mask = (labels == change_cluster_idx).astype(np.uint8) * 255
    return _clean_mask(change_mask.reshape(h, w), sensitivity)
def _snr_weight(channel):
"""
Signal-to-noise ratio weight: signal = mean of top 5% values,
noise = std of bottom 50%. Channels with concentrated high responses
score higher than uniformly noisy ones.
"""
flat = channel.ravel()
p95 = float(np.quantile(flat, 0.95))
signal = float(np.mean(flat[flat >= p95])) if p95 > 1e-8 else 0.0
p50 = float(np.quantile(flat, 0.50))
noise = float(np.std(flat[flat <= p50])) + 1e-8
return signal / noise
def _compute_classical_score_map(img1, img2, registration_ok=True):
    """Fuse the classical change cues into one score map.

    Five channels are combined: multi-scale LAB colour difference (scales 1,
    2 and 4, averaged), SSIM dissimilarity, LBP texture difference, edge
    difference and CVA magnitude. When ``registration_ok`` is false the SSIM
    and edge channels are scaled by 0.45. Channels are weighted by
    ``_snr_weight`` (normalised to sum to about 1), then the fused map is
    multiplied by the vegetation and shadow multipliers, divided by its
    99.5th percentile, clipped to [0, 1], raised to the power 0.85 and
    blurred with a 5x5 Gaussian.

    Returns:
        ``(score_map_float32, weights)`` with weights ordered
        colour, SSIM, texture, edge, CVA.
    """
    if img1.shape != img2.shape:
        img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0]))

    lab1 = cv2.cvtColor(img1, cv2.COLOR_RGB2LAB).astype(np.float32)
    lab2 = cv2.cvtColor(img2, cv2.COLOR_RGB2LAB).astype(np.float32)
    full_size = (lab1.shape[1], lab1.shape[0])

    def _delta_e_at(scale):
        """Colour distance computed at 1/scale resolution, at full size."""
        if scale > 1:
            reduced = (lab1.shape[1] // scale, lab1.shape[0] // scale)
            first, second = cv2.resize(lab1, reduced), cv2.resize(lab2, reduced)
        else:
            first, second = lab1, lab2
        diff = first - second
        delta_e = np.sqrt((diff[:, :, 0] / 100.0) ** 2 +
                          (diff[:, :, 1] / 128.0) ** 2 +
                          (diff[:, :, 2] / 128.0) ** 2)
        if scale > 1:
            delta_e = cv2.resize(delta_e, full_size)
        return delta_e

    color_change = np.mean([_delta_e_at(s) for s in (1, 2, 4)], axis=0)
    color_change = color_change / (color_change.max() + 1e-8)

    ssim_change = compute_ssim_change_map(img1, img2)
    ssim_change = ssim_change / (ssim_change.max() + 1e-8)

    texture_change = compute_texture_change(img1, img2)
    texture_change = texture_change / (texture_change.max() + 1e-8)

    edge_change = compute_edge_change(img1, img2)
    cva_change = compute_cva(img1, img2)

    if not registration_ok:
        # Structure-sensitive cues are scaled down on poor alignment.
        ssim_change = ssim_change * 0.45
        edge_change = edge_change * 0.45

    channels = [color_change, ssim_change, texture_change, edge_change, cva_change]
    raw_weights = [_snr_weight(channel) for channel in channels]
    weight_sum = sum(raw_weights) + 1e-8
    weights = [raw / weight_sum for raw in raw_weights]

    fused = np.zeros_like(color_change, dtype=np.float64)
    for channel, weight in zip(channels, weights):
        fused += weight * channel.astype(np.float64)

    veg_factor = compute_combined_vegetation_suppression(img1, img2)
    shadow_factor = compute_shadow_suppression(img1, img2)
    fused = fused * veg_factor.astype(np.float64) * shadow_factor.astype(np.float64)

    upper = float(np.quantile(fused, 0.995))
    if upper <= 1e-8:
        upper = float(fused.max() + 1e-8)
    normalised = np.clip(fused / (upper + 1e-8), 0.0, 1.0)
    normalised = np.power(normalised, 0.85)

    smoothed = cv2.GaussianBlur(normalised.astype(np.float32), (5, 5), 0)
    return smoothed, weights
def fuse_dl_and_classical(dl_score, classical_score, img1, img2, sensitivity=0.5):
    """Confidence-gated fusion of a DL score map with the classical score map.

    The base score is ``0.65 * dl + 0.35 * classical``. Where both maps
    agree (DL >= 0.35 and classical >= 0.7 * T_cl) the larger of the two is
    used. Where the Excess Green index changed by more than 0.04 and the
    classical score is >= 0.8 * T_cl, the classical score can raise the
    result. ``T_cl`` is a sensitivity-dependent quantile (0.92-0.98) of the
    classical map. The fused score is thresholded at
    ``0.45 + 0.15 * (1 - sensitivity)`` and cleaned by ``_clean_mask``.

    Args:
        dl_score: 2-D DL score map, or None. A 2-D map whose shape differs
            from ``classical_score`` is resized to it (assuming both cover
            the same extent) instead of being discarded. Discarding capped
            the base score at 0.35, below the fusion threshold. Anything
            else (None, non-2-D, empty) is treated as all zeros.
        classical_score: 2-D classical score map.
        img1, img2: RGB images used for the Excess Green comparison.
        sensitivity: clipped to [0, 1].

    Returns:
        ``(change_mask, final_score_map, debug)``.
    """
    sens = float(np.clip(sensitivity, 0.0, 1.0))
    h, w = classical_score.shape

    if dl_score is not None and dl_score.shape != classical_score.shape:
        if dl_score.ndim == 2 and dl_score.size > 0:
            dl_score = cv2.resize(
                dl_score.astype(np.float32), (w, h),
                interpolation=cv2.INTER_LINEAR)
        else:
            dl_score = None
    if dl_score is None:
        dl_score = np.zeros((h, w), dtype=np.float32)

    q = float(np.clip(0.96 - (sens - 0.5) * 0.04, 0.92, 0.98))
    T_cl = float(np.quantile(classical_score, q))

    final_score = 0.65 * dl_score.astype(np.float32) + 0.35 * classical_score

    # Agreement gate: both detectors are at least moderately confident.
    med_dl, med_cl = 0.35, T_cl * 0.7
    both_agree = (dl_score >= med_dl) & (classical_score >= med_cl)
    final_score = np.where(both_agree, np.maximum(dl_score, classical_score), final_score)

    # Vegetation gate: a greenness change backed by a strong classical score.
    exg1 = compute_excess_green(img1)
    exg2 = compute_excess_green(img2)
    delta_exg = np.abs(exg2 - exg1)
    veg_boost = (delta_exg > 0.04) & (classical_score >= T_cl * 0.8)
    final_score = np.where(veg_boost, np.maximum(final_score, classical_score), final_score)

    fused_thr = 0.45 + (1.0 - sens) * 0.15
    change_mask = (final_score >= fused_thr).astype(np.uint8) * 255
    change_mask = _clean_mask(change_mask, sensitivity=sens)

    debug = {
        "fusion": "confidence_gated",
        "T_dl": 0.40 + (1.0 - sens) * 0.25,
        "T_cl_percentile_q": q,
        "T_cl_score": T_cl,
        "fused_threshold": fused_thr,
        "dl_changed_px": int(np.sum(dl_score >= med_dl)),
        "classical_changed_px": int(np.sum(classical_score >= T_cl)),
        "fused_changed_px": int(np.sum(change_mask > 127)),
    }
    return change_mask, final_score, debug
def _ai_fusion_core(img1, img2, sensitivity=0.5, registration_ok=True):
    """Classical-only path: score map plus a percentile threshold.

    The threshold is the ``q``-quantile of the classical score map, with
    ``q`` in [0.92, 0.98] depending on sensitivity. It is floored at a small
    positive value. Without the floor a flat map (e.g. two identical images
    give an all-zero score) has a threshold of 0, and ``score >= 0`` would
    flag every pixel as changed.

    Returns:
        ``(change_mask, classical_score_map, debug)``.
    """
    # Smallest score that may count as a change.
    min_score = 1e-6

    classical_score, weights = _compute_classical_score_map(
        img1, img2, registration_ok=registration_ok)

    sens = float(np.clip(sensitivity, 0.0, 1.0))
    q = float(np.clip(0.96 - (sens - 0.5) * 0.04, 0.92, 0.98))
    thr_score = max(float(np.quantile(classical_score, q)), min_score)

    change_mask = (classical_score >= thr_score).astype(np.uint8) * 255
    change_mask = _clean_mask(change_mask, sensitivity=sens)

    debug = {
        "method": "AI-Core",
        "threshold_used": int(thr_score * 255),
        "threshold_percentile_q": q,
        "threshold_score": thr_score,
        "sensitivity": float(sensitivity),
        "channel_weights": {
            "color": round(weights[0], 4),
            "ssim": round(weights[1], 4),
            "texture": round(weights[2], 4),
            "edge": round(weights[3], 4),
            "cva": round(weights[4], 4),
        },
    }
    return change_mask, classical_score, debug
def ai_deep_learning_method(img1, img2, sensitivity=0.5, registration_ok=True):
    """AdaptFormer score map fused with the classical score map.

    When the model is available and returns a score map, that map is fused
    with the classical score via ``fuse_dl_and_classical``. Otherwise
    (model unavailable, inference raised, or no score returned) the
    classical-only ``_ai_fusion_core`` is used. The classical score map is
    only computed on the fusion path, because the fallback computes its own.

    Returns:
        ``(change_mask, debug)``.
    """
    from .model_inference import is_model_available, predict_change_mask

    dl_score = None
    if is_model_available():
        try:
            _, dl_score = predict_change_mask(img1, img2, threshold=2.0)
        except Exception as e:
            _log.warning("AdaptFormer inference failed: %s", e)

    if dl_score is None:
        rule_mask, _, core_debug = _ai_fusion_core(
            img1, img2, sensitivity=sensitivity, registration_ok=registration_ok)
        debug = {
            "method": "AI-Based Deep Learning (classical fallback)",
            "sensitivity": float(sensitivity),
            "core": core_debug,
        }
        return rule_mask, debug

    classical_score, _ = _compute_classical_score_map(
        img1, img2, registration_ok=registration_ok)
    combined, _, fuse_debug = fuse_dl_and_classical(
        dl_score, classical_score, img1, img2, sensitivity=sensitivity)

    # Reported for diagnostics only; the fusion applies its own thresholds.
    T_dl = 0.40 + (1.0 - float(np.clip(sensitivity, 0, 1))) * 0.25
    debug = {
        "method": "AI-Based Deep Learning (AdaptFormer + gated fusion)",
        "model": "adaptformer-levir-cd",
        "threshold_used": int(T_dl * 255),
        "sensitivity": float(sensitivity),
        **fuse_debug,
    }
    return combined, debug
def hybrid_method(img1, img2, sensitivity=0.5, registration_ok=True):
    """Weighted vote over the three detectors' binary masks.

    The image-difference, feature-based and AI masks are blended with
    weights 0.2 / 0.3 / 0.5 and thresholded. The threshold is 110 at
    sensitivity 0.5 and moves by up to 18 levels either way, limited to
    [70, 160]. The result is cleaned by ``_clean_mask``.

    Returns:
        ``(final_mask, debug)``; ``debug["sub_methods"]`` carries the debug
        dicts of the image-difference and AI detectors.
    """
    if img1.shape != img2.shape:
        img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0]))

    diff_mask, diff_debug = image_difference_method(img1, img2, sensitivity=sensitivity)
    feature_mask = feature_based_method(img1, img2, sensitivity=sensitivity)
    ai_mask, ai_debug = ai_deep_learning_method(
        img1, img2, sensitivity=sensitivity, registration_ok=registration_ok)

    diff_part = 0.2 * diff_mask.astype(np.float32)
    feature_part = 0.3 * feature_mask.astype(np.float32)
    ai_part = 0.5 * ai_mask.astype(np.float32)
    blended = diff_part + feature_part + ai_part

    sens = float(np.clip(sensitivity, 0.0, 1.0))
    base_thr = 110
    shifted_thr = base_thr + int((0.5 - sens) * 36)
    hybrid_thr = int(np.clip(shifted_thr, 70, 160))

    _retval, final_mask = cv2.threshold(
        blended.astype(np.uint8), hybrid_thr, 255, cv2.THRESH_BINARY)
    final_mask = _clean_mask(final_mask, sensitivity=sensitivity)

    sub_methods = {
        "image_difference": diff_debug,
        "ai_deep_learning": ai_debug,
    }
    debug = {
        "method": "Hybrid Approach",
        "threshold_used": int(hybrid_thr),
        "sensitivity": float(sensitivity),
        "sub_methods": sub_methods,
    }
    return final_mask, debug
# ---------------------------------------------------------------------------
# 10b. Hybrid AI method (deep learning + classical with confidence map)
# ---------------------------------------------------------------------------
def _build_confidence_map_from_channels(img1, img2, dl_score=None):
    """Assemble the inputs for ``build_confidence_map`` and call it.

    Channels and base weights: CVA magnitude (0.30), max-normalised SSIM
    dissimilarity (0.25), max-normalised LBP texture difference (0.15) and
    edge difference (0.10). When ``dl_score`` is given it is appended with
    weight 0.40. The weights are always rescaled to sum to 1.

    Returns:
        Whatever ``build_confidence_map(channels, weights)`` returns.
    """
    from .cd_models.model_utils import build_confidence_map

    ssim_raw = compute_ssim_change_map(img1, img2)
    texture_raw = compute_texture_change(img1, img2)

    weighted_channels = [
        (compute_cva(img1, img2), 0.30),
        ((ssim_raw / (ssim_raw.max() + 1e-8)).astype(np.float32), 0.25),
        ((texture_raw / (texture_raw.max() + 1e-8)).astype(np.float32), 0.15),
        (compute_edge_change(img1, img2), 0.10),
    ]
    if dl_score is not None:
        weighted_channels.append((dl_score, 0.40))

    channels = [channel for channel, _ in weighted_channels]
    weights = [weight for _, weight in weighted_channels]

    # Re-normalize so weights sum to 1.
    total = sum(weights)
    weights = [weight / total for weight in weights]
    return build_confidence_map(channels, weights)
def hybrid_ai_method(img1, img2, sensitivity=0.5, registration_ok=True):
    """Hybrid AI: DL score (AdaptFormer, else Siamese U-Net) + gated fusion.

    AdaptFormer is tried first when available. If it is unavailable, raises,
    or returns no score map, the Siamese model is tried. A backend only
    counts as used when it actually produced a score map, so a backend that
    returns ``None`` no longer blocks the next one. Inference failures are
    logged instead of being silently swallowed. Without any DL score the
    classical-only ``_ai_fusion_core`` is used, and the classical score map
    is only computed on the fusion path.

    Returns:
        ``(change_mask, debug)``.
    """
    if img1.shape != img2.shape:
        img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0]))

    from .model_inference import is_model_available, predict_change_mask

    log = logging.getLogger(__name__)
    dl_score = None
    dl_method = "none"

    if is_model_available():
        try:
            _, score = predict_change_mask(img1, img2, threshold=2.0)
        except Exception as exc:
            log.warning("AdaptFormer inference failed: %s", exc)
        else:
            if score is not None:
                dl_score, dl_method = score, "adaptformer"

    if dl_score is None:
        try:
            from .cd_models.change_model import has_siamese_weights, predict_siamese
        except Exception as exc:
            # The Siamese backend may simply not be installed.
            log.debug("Siamese model unavailable: %s", exc)
        else:
            try:
                if has_siamese_weights():
                    _, score = predict_siamese(img1, img2, threshold=2.0)
                    if score is not None:
                        dl_score, dl_method = score, "siamese_unet"
            except Exception as exc:
                log.warning("Siamese inference failed: %s", exc)

    if dl_score is None:
        mask, _, core_debug = _ai_fusion_core(
            img1, img2, sensitivity=sensitivity, registration_ok=registration_ok)
        return mask, {"method": "Hybrid AI (classical fallback)", "core": core_debug}

    classical_score, _ = _compute_classical_score_map(
        img1, img2, registration_ok=registration_ok)
    final_mask, _, fuse_debug = fuse_dl_and_classical(
        dl_score, classical_score, img1, img2, sensitivity=sensitivity)
    debug = {
        "method": f"Hybrid AI ({dl_method} + gated fusion)",
        "dl_method": dl_method,
        "sensitivity": float(sensitivity),
        **fuse_debug,
    }
    return final_mask, debug
# Human-readable hint about mismatched zoom/crop between the two screenshots.
# NOTE(review): presumably surfaced to the user when registration looks
# unreliable — confirm against the callers, which are outside this section.
ALIGNMENT_WARNING_MSG = (
    "Images may differ in zoom/crop; use the same map location, zoom level, and crop "
    "for before and after screenshots."
)
# ---------------------------------------------------------------------------
# 11. Robust post-processing
# ---------------------------------------------------------------------------
def _clean_mask(mask, sensitivity=0.5, border_margin=12):
    """
    Morphologically clean a binary change mask.

    Pipeline, in order:
      1. blank a ``border_margin``-pixel frame around the mask
      2. 3x3 median filter
      3. opening with an odd elliptical kernel sized from ``sensitivity``
      4. closing with an odd elliptical kernel sized from ``sensitivity``
      5. fill holes by redrawing the external contours as solid shapes
      6. one erosion followed by one dilation with a 3x3 ellipse
      7. drop connected components that are too small, or that are both
         very elongated (judged from the bounding box) and not much larger
         than the minimum size

    Returns a new mask holding 255 for kept pixels and 0 elsewhere.
    """
    def _odd_ellipse(size):
        # Even sizes are bumped to the next odd value before building the kernel.
        if size % 2 == 0:
            size += 1
        return cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (size, size))

    work = mask.copy()
    rows, cols = work.shape[:2]

    if border_margin > 0:
        work[:border_margin, :] = 0
        work[-border_margin:, :] = 0
        work[:, :border_margin] = 0
        work[:, -border_margin:] = 0

    work = cv2.medianBlur(work, 3)
    work = cv2.morphologyEx(
        work, cv2.MORPH_OPEN,
        _odd_ellipse(max(3, int(4 * (1 - sensitivity * 0.5)))))
    work = cv2.morphologyEx(
        work, cv2.MORPH_CLOSE,
        _odd_ellipse(max(3, int(5 * (1 - sensitivity)))))

    # Redrawing only the external contours as filled shapes closes inner holes.
    outlines, _ = cv2.findContours(work, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    solid = np.zeros_like(work)
    cv2.drawContours(solid, outlines, -1, 255, thickness=cv2.FILLED)

    tiny = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))
    solid = cv2.erode(solid, tiny, iterations=1)
    solid = cv2.dilate(solid, tiny, iterations=1)

    area_floor = max(200, int(rows * cols * 0.00003))
    n_labels, label_map, stats, _ = cv2.connectedComponentsWithStats(solid, connectivity=8)

    # keep[i] says whether component i survives; label 0 (background) never does.
    keep = np.zeros(n_labels, dtype=bool)
    for idx in range(1, n_labels):
        area = stats[idx, cv2.CC_STAT_AREA]
        if area < area_floor:
            continue
        box_w = stats[idx, cv2.CC_STAT_WIDTH]
        box_h = stats[idx, cv2.CC_STAT_HEIGHT]
        box_area = max(box_w * box_h, 1)
        aspect = max(box_w, box_h) / (min(box_w, box_h) + 1e-8)
        box_perimeter = 2 * (box_w + box_h)
        # Perimeter^2 / area of the bounding box: grows as the box gets thinner.
        circularity = (box_perimeter ** 2) / (box_area + 1e-8)
        sliver = circularity > 80 and area < area_floor * 3
        streak = aspect > 12 and area < area_floor * 2
        if not (sliver or streak):
            keep[idx] = True

    cleaned = np.zeros_like(solid)
    cleaned[keep[label_map]] = 255
    return cleaned
# ---------------------------------------------------------------------------
# 12. Severity classification and improved visualization
# ---------------------------------------------------------------------------
def _severity_from_region(region, total_pixels):
"""
Type-aware severity classification.
Building/structural changes use area + confidence.
Vegetation changes weight confidence (NDVI delta) more heavily.
"""
area = region.get("area", 0)
confidence = region.get("confidence", 0.0)
obj_type = region.get("object_type", "")
if total_pixels <= 0:
return "minor"
area_ratio = area / total_pixels
if obj_type in _VEGETATION_TYPES or "Vegetation" in (obj_type or ""):
score = area_ratio * 600 + confidence * 0.6
if score < 0.8:
return "minor"
if score < 3.0:
return "moderate"
return "major"
if obj_type in _STRUCTURAL_TYPES or obj_type in _BUILDING_TYPES:
score = area_ratio * 1200 + confidence * 0.4
if score < 1.2:
return "minor"
if score < 4.5:
return "moderate"
return "major"
score = area_ratio * 1000 + confidence * 0.3
if score < 1.0:
return "minor"
if score < 4.0:
return "moderate"
return "major"
# RGB colors used for each severity level; visualize_changes uses them for the
# bounding-box outline and the label text, falling back to white for unknown keys.
_SEVERITY_COLORS = {
    "minor": (50, 205, 50),  # Lime green
    "moderate": (255, 165, 0),  # Orange
    "major": (255, 50, 50),  # Bright red
}
# Maximum bounding boxes drawn on the image to avoid visual clutter;
# only the first _MAX_VISIBLE_BOXES entries of the region list are drawn.
_MAX_VISIBLE_BOXES = 30
def visualize_changes(img1, img2, change_mask, regions=None, total_pixels=None):
    """
    Render the change overlay on top of the "after" image.

    Changed pixels (mask value above 127) receive an 18% warm tint. For up
    to ``_MAX_VISIBLE_BOXES`` regions a severity-coloured bounding rectangle
    is outlined (never filled) and the region id is printed on a small dark
    pill at the box's top-left corner. ``img2`` and ``change_mask`` are
    resized when their sizes do not match. Returns a uint8 image.
    """
    if img1.shape != img2.shape:
        img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0]))
    if change_mask.shape[:2] != img2.shape[:2]:
        change_mask = cv2.resize(change_mask, (img2.shape[1], img2.shape[0]))

    img_h, img_w = img2.shape[0], img2.shape[1]
    canvas_f = img2.copy().astype(np.float32)
    changed = (change_mask > 127).astype(np.float32)

    # Subtle warm tint (18% alpha): visible, but the terrain stays readable.
    tint_rgb = (255, 80, 0)
    opacity = 0.18
    for channel, tint_value in enumerate(tint_rgb):
        canvas_f[:, :, channel] = (canvas_f[:, :, channel] * (1 - changed * opacity)
                                   + tint_value * changed * opacity)
    canvas = np.clip(canvas_f, 0, 255).astype(np.uint8)

    pixel_count = (img_h * img_w) if total_pixels is None else total_pixels
    if not regions:
        return canvas

    diagonal = np.sqrt(img_h**2 + img_w**2)
    stroke = max(1, int(diagonal / 1100))
    font = cv2.FONT_HERSHEY_SIMPLEX
    text_stroke = 1
    for region in regions[:_MAX_VISIBLE_BOXES]:
        x, y, w, h = region["bbox"]
        severity = region.get("severity") or _severity_from_region(region, pixel_count)
        color = _SEVERITY_COLORS.get(severity, (255, 255, 255))

        # Outline only, so the imagery under the box is not hidden.
        cv2.rectangle(canvas, (x, y), (x + w, y + h), color, stroke)

        # Compact numeric label whose size follows the box width within limits.
        label = str(region.get("id", 0))
        font_scale = max(0.32, min(0.48, w / 200))
        (text_w, text_h), _ = cv2.getTextSize(label, font, font_scale, text_stroke)
        label_x = x
        label_y = max(text_h + 3, y - 3)

        # Dark backing pill keeps the label legible on any terrain.
        cv2.rectangle(canvas,
                      (label_x, label_y - text_h - 2),
                      (label_x + text_w + 5, label_y + 1),
                      (30, 30, 30), cv2.FILLED)
        cv2.putText(canvas, label, (label_x + 2, label_y - 1),
                    font, font_scale, color, text_stroke, cv2.LINE_AA)
    return canvas
# ---------------------------------------------------------------------------
# 13. Improved object classification
# ---------------------------------------------------------------------------
def extract_advanced_features(region):
    """
    Compute colour, vegetation, texture and edge statistics for an RGB crop.

    Returns ``None`` when the crop is empty or smaller than 3 pixels along
    either of its first two axes; otherwise a dict of features. The "ndvi"
    entry is an RGB proxy, (G - R) / (G + R) on channel means, not a true
    near-infrared NDVI.
    """
    if region.size == 0 or region.shape[0] < 3 or region.shape[1] < 3:
        return None

    hsv_means = np.mean(cv2.cvtColor(region, cv2.COLOR_RGB2HSV), axis=(0, 1))
    lab_means = np.mean(cv2.cvtColor(region, cv2.COLOR_RGB2LAB), axis=(0, 1))
    gray = cv2.cvtColor(region, cv2.COLOR_RGB2GRAY).astype(np.float32)

    # Per-channel colour statistics and each channel's share of the total.
    rgb_means = np.mean(region, axis=(0, 1))
    rgb_stds = np.std(region, axis=(0, 1))
    red_mean, green_mean, blue_mean = rgb_means[0], rgb_means[1], rgb_means[2]
    channel_sum = np.sum(rgb_means) + 1e-6

    # Vegetation indicators.
    pseudo_ndvi = (green_mean - red_mean) / (green_mean + red_mean + 1e-6)
    excess_green = float(np.mean(compute_excess_green(region)))

    # Texture.
    gray_spread = float(np.std(gray))
    lbp_codes = compute_lbp(gray.astype(np.float32))

    # Gradient magnitude and the entropy of an 8-bin gradient-direction histogram.
    sobel_x = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
    sobel_y = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3)
    gradient_mag = np.sqrt(sobel_x ** 2 + sobel_y ** 2)
    directions = np.arctan2(sobel_y, sobel_x + 1e-8)
    bin_counts, _ = np.histogram(directions, bins=8, range=(-np.pi, np.pi))
    bin_probs = bin_counts / (bin_counts.sum() + 1e-8)
    occupied = bin_probs[bin_probs > 0]
    direction_entropy = -np.sum(occupied * np.log2(occupied + 1e-10))

    # Simplified GLCM-style contrast: mean squared difference to the left and
    # upper neighbours (np.roll wraps around at the image border).
    left_diff = gray - np.roll(gray, 1, axis=1)
    up_diff = gray - np.roll(gray, 1, axis=0)
    neighbour_contrast = float(np.mean(left_diff ** 2 + up_diff ** 2))

    return {
        "mean_rgb": rgb_means, "std_rgb": rgb_stds,
        "mean_hsv": hsv_means, "mean_lab": lab_means,
        "ndvi": pseudo_ndvi, "exg": excess_green,
        "texture_std": gray_spread, "lbp_variance": float(np.var(lbp_codes)),
        "edge_density": float(np.mean(gradient_mag)),
        "orientation_entropy": direction_entropy,
        "glcm_contrast": neighbour_contrast,
        "color_homogeneity": float(np.mean(rgb_stds)),
        "brightness": float(lab_means[0]),
        "green_ratio": green_mean / channel_sum,
        "blue_ratio": blue_mean / channel_sum,
        "red_ratio": red_mean / channel_sum,
        "saturation": float(hsv_means[1]), "hue": float(hsv_means[0]),
    }
def _is_transient_object(area, w, h, features):
"""
Filter out transient objects (people, cars, animals, shadows, etc.)
that are NOT permanent ground/structural changes.
Returns True if the region is likely transient and should be excluded.
"""
aspect_ratio = max(w, h) / max(min(w, h), 1)
# Very small regions are likely noise, people, or small vehicles
if area < 300:
return True
# Tall narrow regions (aspect > 4) are likely people or poles
if aspect_ratio > 5.0 and area < 2000:
return True
# Very high edge density + small area = likely a person or vehicle
if features["edge_density"] > 80 and area < 1500:
return True
# Extremely high texture variance in small area = likely transient clutter
if features["texture_std"] > 60 and area < 1000:
return True
return False
def _count_line_segments(gray_crop):
"""Count straight line segments using LSD — buildings have many, vegetation has few."""
if gray_crop.size == 0 or gray_crop.shape[0] < 5 or gray_crop.shape[1] < 5:
return 0, 0.0
lsd = cv2.createLineSegmentDetector(0)
lines, _, _, _ = lsd.detect(gray_crop.astype(np.uint8))
if lines is None:
return 0, 0.0
n_lines = len(lines)
total_length = sum(
np.sqrt((l[0][2] - l[0][0])**2 + (l[0][3] - l[0][1])**2)
for l in lines
)
return n_lines, float(total_length)
def _count_corners(gray_crop):
"""Count strong corners — buildings have clustered grid-like corners."""
if gray_crop.size == 0 or gray_crop.shape[0] < 5 or gray_crop.shape[1] < 5:
return 0
corners = cv2.goodFeaturesToTrack(
gray_crop.astype(np.uint8), maxCorners=100,
qualityLevel=0.05, minDistance=5)
return 0 if corners is None else len(corners)
def _rectangular_hull_ratio(gray_crop, threshold=128):
"""Ratio of non-zero area to bounding rect — buildings fill their box."""
if gray_crop.size == 0:
return 0.0
_, binary = cv2.threshold(gray_crop.astype(np.uint8), threshold, 255, cv2.THRESH_BINARY)
contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if not contours:
return 0.0
biggest = max(contours, key=cv2.contourArea)
contour_area = cv2.contourArea(biggest)
_, _, rw, rh = cv2.boundingRect(biggest)
rect_area = max(rw * rh, 1)
return contour_area / rect_area
def _extract_differential_features(before_crop, after_crop):
    """
    Extract features from BOTH before and after crops plus their deltas.

    Args:
        before_crop: RGB crop from the "before" image.
        after_crop: RGB crop of the same region from the "after" image.

    Returns:
        ``None`` if either crop is too small for ``extract_advanced_features``;
        otherwise a dict holding the two per-crop feature dicts ("before",
        "after"), after-minus-before deltas, line / corner / hull-ratio
        measurements, the mean LAB colour distance and a mean SSIM value.

    The two crops may differ in size when the source images have different
    shapes. The pixel-wise measures (LAB distance, SSIM) then use a copy of
    the before crop resized to the after crop's size instead of failing on
    a broadcasting error; all other features use the crops as given.
    """
    feat_b = extract_advanced_features(before_crop)
    feat_a = extract_advanced_features(after_crop)
    if feat_b is None or feat_a is None:
        return None

    gray_b = cv2.cvtColor(before_crop, cv2.COLOR_RGB2GRAY)
    gray_a = cv2.cvtColor(after_crop, cv2.COLOR_RGB2GRAY)
    lines_b, linelen_b = _count_line_segments(gray_b)
    lines_a, linelen_a = _count_line_segments(gray_a)
    corners_b = _count_corners(gray_b)
    corners_a = _count_corners(gray_a)
    hull_b = _rectangular_hull_ratio(gray_b)
    hull_a = _rectangular_hull_ratio(gray_a)

    # Pixel-wise comparisons need identical geometry.
    if before_crop.shape[:2] != after_crop.shape[:2]:
        before_px = cv2.resize(
            before_crop, (after_crop.shape[1], after_crop.shape[0]))
        gray_b_px = cv2.cvtColor(before_px, cv2.COLOR_RGB2GRAY)
    else:
        before_px = before_crop
        gray_b_px = gray_b

    lab_b = cv2.cvtColor(before_px, cv2.COLOR_RGB2LAB).astype(np.float32)
    lab_a = cv2.cvtColor(after_crop, cv2.COLOR_RGB2LAB).astype(np.float32)
    lab_dist = float(np.mean(np.sqrt(np.sum((lab_a - lab_b) ** 2, axis=2))))

    # Fast SSIM approximation using cv2 (avoids scikit-image dependency).
    # 1.0 ("identical") is kept when the crop is too small or the
    # computation fails.
    ssim_val = 1.0
    try:
        if (gray_b_px.shape == gray_a.shape
                and gray_b_px.shape[0] >= 7 and gray_b_px.shape[1] >= 7):
            C1 = (0.01 * 255) ** 2
            C2 = (0.03 * 255) ** 2
            fb = gray_b_px.astype(np.float64)
            fa = gray_a.astype(np.float64)
            mu_b = cv2.GaussianBlur(fb, (11, 11), 1.5)
            mu_a = cv2.GaussianBlur(fa, (11, 11), 1.5)
            sig_b2 = cv2.GaussianBlur(fb * fb, (11, 11), 1.5) - mu_b * mu_b
            sig_a2 = cv2.GaussianBlur(fa * fa, (11, 11), 1.5) - mu_a * mu_a
            sig_ba = cv2.GaussianBlur(fb * fa, (11, 11), 1.5) - mu_b * mu_a
            numer = (2 * mu_b * mu_a + C1) * (2 * sig_ba + C2)
            denom = (mu_b ** 2 + mu_a ** 2 + C1) * (sig_b2 + sig_a2 + C2)
            ssim_map = numer / (denom + 1e-12)
            ssim_val = float(np.mean(ssim_map))
    except Exception:
        # Best-effort: SSIM is only one of many cues, so keep the default
        # but leave a trace instead of failing silently.
        _log.debug("SSIM approximation failed; using 1.0", exc_info=True)

    return {
        "before": feat_b, "after": feat_a,
        "delta_ndvi": feat_a["ndvi"] - feat_b["ndvi"],
        "delta_exg": feat_a["exg"] - feat_b["exg"],
        "delta_green_ratio": feat_a["green_ratio"] - feat_b["green_ratio"],
        "delta_edge_density": feat_a["edge_density"] - feat_b["edge_density"],
        "delta_brightness": feat_a["brightness"] - feat_b["brightness"],
        "delta_texture_std": feat_a["texture_std"] - feat_b["texture_std"],
        "delta_saturation": feat_a["saturation"] - feat_b["saturation"],
        "delta_orientation_entropy": feat_a["orientation_entropy"] - feat_b["orientation_entropy"],
        "delta_lines": lines_a - lines_b,
        "delta_line_length": linelen_a - linelen_b,
        "delta_corners": corners_a - corners_b,
        "lines_after": lines_a, "corners_after": corners_a,
        "lines_before": lines_b, "corners_before": corners_b,
        "hull_ratio_before": hull_b,
        "hull_ratio_after": hull_a,
        "lab_color_distance": lab_dist,
        "ssim": ssim_val,
    }
def classify_object_type(image_region, bbox, before_region=None):
    """
    Classify the type of change in a region.

    When before_region is provided, uses differential (before vs after) analysis
    for dramatically better accuracy. Falls back to single-image analysis otherwise.

    Each candidate category accumulates a heuristic score from independent
    threshold tests; the category with the highest score wins (ties go to
    the category scored first).

    Args:
        image_region: The "after" RGB image; it is cropped to ``bbox`` plus a
            5-pixel pad, clipped to the image bounds.
        bbox: ``(x, y, w, h)`` of the region inside ``image_region``.
        before_region: Optional "before" RGB image, cropped with the same
            bbox and pad but clipped to its own bounds.

    Returns:
        ``(label, confidence)``:
          * ``("Unclassified", 0.0)`` when the crop is too small to analyse
          * ``(None, 0.0)`` when the region looks like a transient object
          * ``("Unclassified", score)`` when the best score is below 0.22
          * otherwise the winning category and its score capped at 1.0
    """
    x, y, w, h = bbox
    pad = 5
    y1 = max(0, y - pad)
    y2 = min(image_region.shape[0], y + h + pad)
    x1 = max(0, x - pad)
    x2 = min(image_region.shape[1], x + w + pad)
    after_crop = image_region[y1:y2, x1:x2]
    if after_crop.size == 0 or after_crop.shape[0] < 3 or after_crop.shape[1] < 3:
        return "Unclassified", 0.0
    feat_a = extract_advanced_features(after_crop)
    if feat_a is None:
        return "Unclassified", 0.0
    # NOTE: "area" is the bounding-box area (w * h), not the changed-pixel count.
    area = w * h
    if _is_transient_object(area, w, h, feat_a):
        return None, 0.0
    aspect_ratio = max(w, h) / max(min(w, h), 1)
    # Compactness of the bounding box itself: 4*pi*area / perimeter^2.
    compactness = (4 * np.pi * area) / ((2 * (w + h)) ** 2 + 1e-6)
    # --- Differential classification when before image is available ---
    # diff stays None when there is no usable "before" crop; every category
    # below then uses its single-image ("else") branch.
    diff = None
    if before_region is not None:
        by1 = max(0, y - pad)
        by2 = min(before_region.shape[0], y + h + pad)
        bx1 = max(0, x - pad)
        bx2 = min(before_region.shape[1], x + w + pad)
        before_crop = before_region[by1:by2, bx1:bx2]
        if before_crop.size > 0 and before_crop.shape[0] >= 3 and before_crop.shape[1] >= 3:
            diff = _extract_differential_features(before_crop, after_crop)
    # Category name -> accumulated score. Insertion order matters: max() below
    # resolves ties in favour of the category added first.
    scores = {}
    # ---- Water Body Change ----
    # Scored from the "after" crop only: bluish, smooth, low-edge regions.
    water = 0.0
    if feat_a["blue_ratio"] > 0.36:
        water += 0.22
    if feat_a["texture_std"] < 28:
        water += 0.18
    if feat_a["edge_density"] < 35:
        water += 0.14
    if 90 <= feat_a["hue"] <= 135:
        water += 0.18
    if feat_a["lbp_variance"] < 0.05:
        water += 0.14
    if feat_a["glcm_contrast"] < 500:
        water += 0.10
    if area > 800:
        water += 0.04
    scores["Water Body Change"] = water
    # ---- Vegetation Change ----
    veg = 0.0
    if diff:
        # Differential: detect actual vegetation gain or loss
        if abs(diff["delta_ndvi"]) > 0.08:
            veg += 0.25
        # Excess Green Index delta — best single indicator of vegetation change
        if abs(diff.get("delta_exg", 0)) > 0.04:
            veg += 0.20
        elif abs(diff.get("delta_exg", 0)) > 0.02:
            veg += 0.10
        if abs(diff["delta_green_ratio"]) > 0.04:
            veg += 0.15
        # Large colour shift counts only if either side looked green.
        if diff["lab_color_distance"] > 15 and (
                diff["before"]["ndvi"] > 0.05 or diff["after"]["ndvi"] > 0.05):
            veg += 0.12
        if abs(diff["delta_saturation"]) > 15 and (
                diff["before"]["green_ratio"] > 0.34 or diff["after"]["green_ratio"] > 0.34):
            veg += 0.12
        # Few newly appearing lines/corners: the change is not structural.
        if diff["delta_lines"] < 3 and diff["delta_corners"] < 5:
            veg += 0.06
        if area > 500:
            veg += 0.04
    else:
        # Single-image fallback: how green does the "after" crop look?
        if feat_a["ndvi"] > 0.05:
            veg += 0.22
        if feat_a["ndvi"] > 0.15:
            veg += 0.10
        if feat_a["green_ratio"] > 0.36:
            veg += 0.18
        if 35 <= feat_a["hue"] <= 85:
            veg += 0.15
        if feat_a["saturation"] > 40:
            veg += 0.10
        if feat_a["orientation_entropy"] > 2.5:
            veg += 0.05
        if area > 500:
            veg += 0.04
    scores["Vegetation Change"] = veg
    # ---- New Construction/Building ----
    bld = 0.0
    if diff:
        # Edge density increase: strong for buildings, lower threshold to catch smaller ones
        ded = diff["delta_edge_density"]
        if ded > 20:
            bld += 0.22
        elif ded > 10:
            bld += 0.16
        elif ded > 5:
            bld += 0.08
        # More ordered structure (lower entropy = more regular geometry)
        doe = diff["delta_orientation_entropy"]
        if doe < -0.6:
            bld += 0.15
        elif doe < -0.2:
            bld += 0.10
        # New straight lines appearing
        dl = diff["delta_lines"]
        if dl > 8:
            bld += 0.16
        elif dl > 3:
            bld += 0.10
        elif dl > 1:
            bld += 0.05
        # New corners appearing
        dc = diff["delta_corners"]
        if dc > 10:
            bld += 0.14
        elif dc > 4:
            bld += 0.10
        elif dc > 1:
            bld += 0.05
        # Vegetation replaced by non-vegetation
        if diff["after"]["ndvi"] < 0.08 and diff["before"]["ndvi"] > 0.02:
            bld += 0.10
        # Brightness increase (concrete/roofing vs bare ground)
        if diff["delta_brightness"] > 8:
            bld += 0.06
        # Rectangular shape in after image
        if diff["hull_ratio_after"] > 0.50:
            bld += 0.10
        elif diff["hull_ratio_after"] > 0.35:
            bld += 0.05
        # After image has structural features even if delta is modest
        if diff["lines_after"] > 4 and diff["corners_after"] > 6:
            bld += 0.08
        # LAB color distance (significant visual change)
        if diff["lab_color_distance"] > 25:
            bld += 0.08
        elif diff["lab_color_distance"] > 15:
            bld += 0.04
        # SSIM: low = big structural change; very important for building detection
        ssim = diff.get("ssim", 1.0)
        if ssim < 0.4:
            bld += 0.14
        elif ssim < 0.6:
            bld += 0.10
        elif ssim < 0.75:
            bld += 0.05
        # Low NDVI + high edge density in after = likely built structure
        if diff["after"]["ndvi"] < 0.05 and diff["after"]["edge_density"] > 25:
            bld += 0.08
        # New rectangular shape appearing (hull increased)
        hull_delta = diff["hull_ratio_after"] - diff.get("hull_ratio_before", 0)
        if hull_delta > 0.2:
            bld += 0.06
        if 1.0 <= aspect_ratio <= 5.0:
            bld += 0.06
        if area > 600:
            bld += 0.04
    else:
        # Single-image fallback: regular, homogeneous, edge-rich, low-saturation crop.
        if feat_a["orientation_entropy"] < 2.5:
            bld += 0.18
        if feat_a["color_homogeneity"] < 28:
            bld += 0.15
        if 1.0 <= aspect_ratio <= 5.0:
            bld += 0.10
        if 0.2 <= compactness <= 0.95:
            bld += 0.10
        if feat_a["edge_density"] > 25:
            bld += 0.14
        if feat_a["glcm_contrast"] > 300:
            bld += 0.10
        if feat_a["saturation"] < 100:
            bld += 0.08
        if 30 <= feat_a["brightness"] <= 95:
            bld += 0.08
        if area > 600:
            bld += 0.05
    scores["New Construction/Building"] = bld
    # ---- Demolition/Clearing ----
    # Mirror image of the building score: edges, lines and corners disappearing.
    demo = 0.0
    if diff:
        ded_neg = diff["delta_edge_density"]
        if ded_neg < -20:
            demo += 0.22
        elif ded_neg < -10:
            demo += 0.16
        elif ded_neg < -5:
            demo += 0.08
        dl_neg = diff["delta_lines"]
        if dl_neg < -8:
            demo += 0.18
        elif dl_neg < -3:
            demo += 0.12
        elif dl_neg < -1:
            demo += 0.05
        dc_neg = diff["delta_corners"]
        if dc_neg < -10:
            demo += 0.15
        elif dc_neg < -4:
            demo += 0.10
        elif dc_neg < -1:
            demo += 0.05
        if diff["delta_texture_std"] > 8:
            demo += 0.10
        if diff["delta_brightness"] > 10:
            demo += 0.10
        # Structural features disappeared
        if diff["lines_before"] > 4 and diff["lines_after"] <= 1:
            demo += 0.10
        # Hull ratio dropped (rectangular structure removed)
        hull_drop = diff.get("hull_ratio_before", 0) - diff["hull_ratio_after"]
        if hull_drop > 0.2:
            demo += 0.08
        # SSIM confirms big structural change
        ssim = diff.get("ssim", 1.0)
        if ssim < 0.5:
            demo += 0.08
        # Greenness appearing where there was none before.
        if diff["after"]["ndvi"] > 0.03 and diff["before"]["ndvi"] < 0.02:
            demo += 0.06
        if area > 500:
            demo += 0.04
    else:
        # Single-image fallback: rough, irregular, bright, non-green crop.
        if feat_a["texture_std"] > 30:
            demo += 0.18
        if feat_a["orientation_entropy"] > 2.8:
            demo += 0.15
        if feat_a["color_homogeneity"] > 25:
            demo += 0.15
        if feat_a["brightness"] > 60:
            demo += 0.10
        if feat_a["ndvi"] < 0.05:
            demo += 0.12
        if feat_a["saturation"] < 70:
            demo += 0.10
        if area > 800:
            demo += 0.05
    scores["Demolition/Clearing"] = demo
    # ---- Road/Pavement Change ----
    # Scored from shape and the "after" crop only: elongated, uniform, grey.
    road = 0.0
    if aspect_ratio > 2.5:
        road += 0.22
    if feat_a["color_homogeneity"] < 22:
        road += 0.18
    if feat_a["texture_std"] < 32:
        road += 0.15
    if feat_a["saturation"] < 65:
        road += 0.12
    if feat_a["orientation_entropy"] < 2.0:
        road += 0.15
    if 35 <= feat_a["brightness"] <= 75:
        road += 0.10
    if compactness < 0.3:
        road += 0.05
    if area > 600:
        road += 0.03
    scores["Road/Pavement Change"] = road
    # ---- Temporary Structure (sheds, tents, makeshift) ----
    # Like a building, but with only moderate structural gain and a small footprint.
    tmp = 0.0
    if diff:
        ded_t = diff["delta_edge_density"]
        if 3 < ded_t < 20:
            tmp += 0.16
        if diff["delta_lines"] > 0 and diff["delta_lines"] <= 5:
            tmp += 0.12
        if diff["delta_corners"] > 0 and diff["delta_corners"] <= 6:
            tmp += 0.10
        if diff["hull_ratio_after"] < 0.50:
            tmp += 0.10
        if diff["after"]["ndvi"] < 0.08:
            tmp += 0.08
        ssim_t = diff.get("ssim", 1.0)
        if 0.3 < ssim_t < 0.7:
            tmp += 0.10
        if diff["lab_color_distance"] > 10:
            tmp += 0.08
        if 200 <= area <= 5000:
            tmp += 0.08
        if 1.0 <= aspect_ratio <= 3.5:
            tmp += 0.06
    else:
        if feat_a["edge_density"] > 15 and feat_a["edge_density"] < 50:
            tmp += 0.18
        if feat_a["orientation_entropy"] > 2.0:
            tmp += 0.12
        if feat_a["color_homogeneity"] > 20:
            tmp += 0.10
        if feat_a["ndvi"] < 0.08:
            tmp += 0.12
        if 200 <= area <= 5000:
            tmp += 0.10
        if 1.0 <= aspect_ratio <= 3.5:
            tmp += 0.08
        if feat_a["saturation"] < 100:
            tmp += 0.06
    scores["Temporary Structure"] = tmp
    # ---- Bare Land/Soil Change ----
    # Scored from the "after" crop only: reddish/brown, smooth, non-green.
    soil = 0.0
    if feat_a["red_ratio"] > 0.34 and feat_a["green_ratio"] < 0.36:
        soil += 0.20
    if 8 <= feat_a["hue"] <= 38:
        soil += 0.18
    if feat_a["ndvi"] < 0.05:
        soil += 0.18
    if feat_a["texture_std"] < 35:
        soil += 0.12
    if feat_a["lbp_variance"] < 0.04:
        soil += 0.12
    if 40 <= feat_a["saturation"] <= 130:
        soil += 0.10
    if 45 <= feat_a["brightness"] <= 82:
        soil += 0.10
    scores["Bare Land/Soil Change"] = soil
    # Pick the winner; weak winners are reported as "Unclassified" with their score.
    best = max(scores, key=scores.get)
    conf = scores[best]
    if conf < 0.22:
        return "Unclassified", conf
    return best, min(conf, 1.0)
def classify_with_ensemble(image_region, bbox, before_region=None):
    """
    Ensemble: classify full region + sub-regions, vote with confidence weighting.

    The whole bounding box is always classified. Boxes wider and taller than
    20 pixels are additionally classified on their four quadrants and on a
    centred half-size box.

    Args:
        image_region: "After" RGB image.
        bbox: ``(x, y, w, h)`` of the region.
        before_region: Optional "before" RGB image forwarded to
            ``classify_object_type``.

    Returns:
        ``(label, confidence)``. ``(None, 0.0)`` when more than half of the
        boxes are judged transient. When no box yields a concrete label, the
        whole-box verdict is returned as is. Otherwise the label with the
        highest summed confidence wins and its mean confidence is returned,
        boosted by 15% (capped at 1.0) when it holds at least 60% of the votes.
    """
    x, y, w, h = bbox
    sub_boxes = [(x, y, w, h)]
    if w > 20 and h > 20:
        hw, hh = w // 2, h // 2
        sub_boxes += [
            (x, y, hw, hh),
            (x + hw, y, hw, hh),
            (x, y + hh, hw, hh),
            (x + hw, y + hh, hw, hh),
            (x + w // 4, y + h // 4, hw, hh),
        ]

    verdicts = [
        classify_object_type(image_region, box, before_region=before_region)
        for box in sub_boxes
    ]

    transient_count = sum(1 for label, _ in verdicts if label is None)
    if transient_count > len(sub_boxes) // 2:
        return None, 0.0

    votes = [(label, conf) for label, conf in verdicts
             if label is not None and label != "Unclassified"]
    if not votes:
        # verdicts[0] is the whole-box verdict; reusing it avoids running the
        # expensive classification a second time on identical inputs.
        return verdicts[0]

    weighted = {}
    for label, conf in votes:
        weighted[label] = weighted.get(label, 0) + conf
    counts = Counter(label for label, _ in votes)

    best_type = max(weighted, key=weighted.get)
    avg_conf = weighted[best_type] / counts[best_type]
    if counts[best_type] / len(votes) >= 0.6:
        avg_conf = min(1.0, avg_conf * 1.15)
    return best_type, avg_conf
# ---------------------------------------------------------------------------
# 14. Vegetation sub-classification
# ---------------------------------------------------------------------------
# Main-category labels treated as vegetation; _severity_from_region checks
# membership in this set to pick the vegetation weighting.
_VEGETATION_TYPES = {"Vegetation Change"}
def _compute_region_greenness(crop):
"""Return (ndvi, green_ratio, mean_saturation) for an RGB crop."""
if crop.size == 0 or crop.shape[0] < 2 or crop.shape[1] < 2:
return 0.0, 0.0, 0.0
mean_rgb = np.mean(crop, axis=(0, 1)).astype(np.float64)
total = np.sum(mean_rgb) + 1e-6
green_ratio = mean_rgb[1] / total
ndvi = (mean_rgb[1] - mean_rgb[0]) / (mean_rgb[1] + mean_rgb[0] + 1e-6)
hsv = cv2.cvtColor(crop, cv2.COLOR_RGB2HSV)
mean_sat = float(np.mean(hsv[:, :, 1]))
return float(ndvi), float(green_ratio), mean_sat
def _compute_texture_regularity(gray_crop):
"""Measure how regular/grid-like the texture is (low entropy = regular crops)."""
if gray_crop.size == 0 or gray_crop.shape[0] < 3 or gray_crop.shape[1] < 3:
return 3.0
gx = cv2.Sobel(gray_crop.astype(np.float32), cv2.CV_64F, 1, 0, ksize=3)
gy = cv2.Sobel(gray_crop.astype(np.float32), cv2.CV_64F, 0, 1, ksize=3)
angles = np.arctan2(gy, gx + 1e-8)
hist, _ = np.histogram(angles, bins=8, range=(-np.pi, np.pi))
hist = hist / (hist.sum() + 1e-8)
entropy = -np.sum(hist[hist > 0] * np.log2(hist[hist > 0] + 1e-10))
return float(entropy)
def classify_vegetation_subtype(before_img, after_img, bbox):
    """
    Refine a vegetation change into a sub-type by comparing before/after crops.

    Both images are cropped to ``bbox`` plus a 5-pixel pad (clipped to the
    bounds of ``before_img``). Five sub-types are scored from greenness,
    saturation, brightness and gradient-direction entropy; the best one is
    returned as ``(subtype_name, confidence)`` with confidence capped at 1.0.
    ``("Vegetation Change", 0.3)`` is returned when a crop is empty or the
    best score is below 0.25.
    """
    x, y, w, h = bbox
    pad = 5
    top = max(0, y - pad)
    bottom = min(before_img.shape[0], y + h + pad)
    left = max(0, x - pad)
    right = min(before_img.shape[1], x + w + pad)
    before_crop = before_img[top:bottom, left:right]
    after_crop = after_img[top:bottom, left:right]
    if before_crop.size == 0 or after_crop.size == 0:
        return "Vegetation Change", 0.3

    ndvi_b, green_b, sat_b = _compute_region_greenness(before_crop)
    ndvi_a, green_a, sat_a = _compute_region_greenness(after_crop)
    gray_b = cv2.cvtColor(before_crop, cv2.COLOR_RGB2GRAY)
    gray_a = cv2.cvtColor(after_crop, cv2.COLOR_RGB2GRAY)
    entropy_b = _compute_texture_regularity(gray_b)
    entropy_a = _compute_texture_regularity(gray_a)
    bright_b = float(np.mean(gray_b))
    bright_a = float(np.mean(gray_a))

    d_ndvi = ndvi_a - ndvi_b
    d_green = green_a - green_b
    d_sat = sat_a - sat_b
    box_area = w * h

    def _tally(rules):
        # Add up the weights of the rules that fired, in listing order.
        total = 0.0
        for fired, weight in rules:
            if fired:
                total += weight
        return total

    scores = {
        # Was green, now not green.
        "Deforestation/Tree Removal": _tally([
            (ndvi_b > 0.08 and d_ndvi < -0.06, 0.30),
            (green_b > 0.36 and d_green < -0.03, 0.20),
            (bright_a > bright_b + 10, 0.15),
            (d_sat < -15, 0.15),
            (entropy_a < entropy_b - 0.3, 0.10),
        ]),
        # Was bare, now green.
        "New Vegetation/Growth": _tally([
            (ndvi_a > 0.08 and d_ndvi > 0.06, 0.30),
            (green_a > 0.36 and d_green > 0.03, 0.20),
            (d_sat > 15, 0.15),
            (bright_a < bright_b - 5, 0.10),
            (entropy_a > entropy_b + 0.2, 0.10),
        ]),
        # Regular texture on either side with a moderate colour shift.
        "Crop/Agricultural Change": _tally([
            (entropy_b < 2.5 or entropy_a < 2.5, 0.25),
            (0.03 < abs(d_ndvi) < 0.12, 0.20),
            (sat_b > 35 and sat_a > 35, 0.15),
            (abs(d_green) < 0.04 and abs(d_ndvi) > 0.02, 0.15),
            (box_area > 3000, 0.10),
        ]),
        # Still green, but browning.
        "Vegetation Health Decline": _tally([
            (ndvi_b > 0.05 and ndvi_a > 0.02 and d_ndvi < -0.03, 0.25),
            (green_b > 0.34 and green_a > 0.30 and d_green < -0.02, 0.20),
            (-20 < d_sat < -3, 0.20),
            (abs(bright_a - bright_b) < 15, 0.10),
        ]),
        # Both sides green with only a mild shift.
        "Seasonal Variation": _tally([
            (ndvi_b > 0.04 and ndvi_a > 0.04 and abs(d_ndvi) < 0.05, 0.25),
            (abs(d_green) < 0.03, 0.20),
            (abs(d_sat) < 12, 0.15),
            (abs(bright_a - bright_b) < 12, 0.15),
        ]),
    }

    winner = max(scores, key=scores.get)
    winner_score = scores[winner]
    if winner_score < 0.25:
        return "Vegetation Change", 0.3
    return winner, min(winner_score, 1.0)
# ---------------------------------------------------------------------------
# 15. Structural change sub-classification
# ---------------------------------------------------------------------------
# Main-category labels treated as structural; _severity_from_region checks
# membership in this set to pick the structural weighting.
_STRUCTURAL_TYPES = {"New Construction/Building", "Demolition/Clearing",
                     "Road/Pavement Change", "Temporary Structure"}
def _region_has_structure(crop):
"""Heuristic: does this crop contain building-like structure (edges + regularity)?"""
if crop.size == 0 or crop.shape[0] < 3 or crop.shape[1] < 3:
return False, 0.0, 0.0
gray = cv2.cvtColor(crop, cv2.COLOR_RGB2GRAY).astype(np.float32)
gx = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
gy = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3)
edge_density = float(np.mean(np.sqrt(gx**2 + gy**2)))
angles = np.arctan2(gy, gx + 1e-8)
hist, _ = np.histogram(angles, bins=8, range=(-np.pi, np.pi))
hist = hist / (hist.sum() + 1e-8)
entropy = -np.sum(hist[hist > 0] * np.log2(hist[hist > 0] + 1e-10))
has_structure = edge_density > 25 and entropy < 2.8
return has_structure, edge_density, entropy
def classify_structural_subtype(before_img, after_img, bbox, main_type):
    """
    Refine a structural change into a sub-type by comparing before/after crops.

    Both images are cropped to ``bbox`` plus a 5-pixel pad (clipped to the
    bounds of ``before_img``). Road/pavement changes are delegated to
    ``_classify_road_subtype``; every other main type is scored against six
    building/infrastructure sub-types. Returns ``(subtype_name, confidence)``
    with confidence capped at 1.0, or ``(main_type, 0.3)`` when a crop is
    empty or the best score is below 0.25.
    """
    x, y, w, h = bbox
    pad = 5
    top = max(0, y - pad)
    bottom = min(before_img.shape[0], y + h + pad)
    left = max(0, x - pad)
    right = min(before_img.shape[1], x + w + pad)
    before_crop = before_img[top:bottom, left:right]
    after_crop = after_img[top:bottom, left:right]
    if before_crop.size == 0 or after_crop.size == 0:
        return main_type, 0.3

    struct_b, edge_b, ent_b = _region_has_structure(before_crop)
    struct_a, edge_a, ent_a = _region_has_structure(after_crop)

    gray_b = cv2.cvtColor(before_crop, cv2.COLOR_RGB2GRAY)
    gray_a = cv2.cvtColor(after_crop, cv2.COLOR_RGB2GRAY)
    bright_b = float(np.mean(gray_b))
    bright_a = float(np.mean(gray_a))
    tex_b = float(np.std(gray_b))
    tex_a = float(np.std(gray_a))

    sat_b = float(np.mean(cv2.cvtColor(before_crop, cv2.COLOR_RGB2HSV)[:, :, 1]))
    sat_a = float(np.mean(cv2.cvtColor(after_crop, cv2.COLOR_RGB2HSV)[:, :, 1]))

    # RGB greenness proxy, used to spot green-to-built transitions.
    rgb_b = np.mean(before_crop, axis=(0, 1))
    rgb_a = np.mean(after_crop, axis=(0, 1))
    ndvi_b = (rgb_b[1] - rgb_b[0]) / (rgb_b[1] + rgb_b[0] + 1e-6)
    ndvi_a = (rgb_a[1] - rgb_a[0]) / (rgb_a[1] + rgb_a[0] + 1e-6)

    box_area = w * h
    if main_type == "Road/Pavement Change":
        return _classify_road_subtype(
            struct_b, struct_a, edge_b, edge_a, bright_b, bright_a,
            tex_b, tex_a, box_area, w, h
        )

    def _tally(rules):
        # Add up the weights of the rules that fired, in listing order.
        total = 0.0
        for fired, weight in rules:
            if fired:
                total += weight
        return total

    elongation = max(w, h) / max(min(w, h), 1)
    scores = {
        # Before had no structure, after does.
        "New Building": _tally([
            (not struct_b and struct_a, 0.35),
            (edge_a > edge_b + 15, 0.15),
            (ent_a < ent_b - 0.3, 0.10),
            (ndvi_b > 0.05 and ndvi_a < 0.03, 0.10),
            (sat_a < sat_b - 10, 0.10),
        ]),
        # Both have structure, but after has more.
        "Building Expansion": _tally([
            (struct_b and struct_a, 0.15),
            (struct_b and edge_a > edge_b * 1.2, 0.20),
            (struct_b and tex_a > tex_b + 5, 0.15),
            (abs(ent_a - ent_b) < 0.5 and edge_a > edge_b, 0.15),
        ]),
        # Both have structure of similar density but different appearance.
        "Renovation/Modification": _tally([
            (struct_b and struct_a, 0.15),
            (abs(edge_a - edge_b) < 12, 0.15),
            (abs(bright_a - bright_b) > 8, 0.20),
            (abs(sat_a - sat_b) > 10, 0.15),
            (abs(tex_a - tex_b) < 10, 0.10),
        ]),
        # Before had structure, after has less.
        "Partial Demolition": _tally([
            (struct_b and edge_a < edge_b * 0.7, 0.25),
            (struct_b and ent_a > ent_b + 0.3, 0.15),
            (tex_a > tex_b + 8, 0.15),
            (bright_a > bright_b + 10, 0.10),
        ]),
        # Before had structure, after is bare/empty.
        "Full Demolition": _tally([
            (struct_b and not struct_a, 0.35),
            (edge_b > 30 and edge_a < 20, 0.15),
            (tex_b > 25 and tex_a < 20, 0.15),
            (bright_a > bright_b + 15, 0.10),
        ]),
        # Elongated shape with highly regular edges.
        "Infrastructure Change": _tally([
            (elongation > 3.0, 0.25),
            (ent_a < 2.0 or ent_b < 2.0, 0.15),
            (box_area > 2000 and elongation > 2.5, 0.15),
        ]),
    }

    winner = max(scores, key=scores.get)
    winner_score = scores[winner]
    if winner_score < 0.25:
        return main_type, 0.3
    return winner, min(winner_score, 1.0)
def _classify_road_subtype(struct_b, struct_a, edge_b, edge_a,
brightness_b, brightness_a, texture_b, texture_a,
area, w, h):
"""Sub-classify road/pavement changes."""
scores = {
"New Road/Pavement": 0.0,
"Road Widening": 0.0,
"Road Resurfacing": 0.0,
"Road Deterioration": 0.0,
}
if not struct_b and struct_a:
scores["New Road/Pavement"] += 0.30
if edge_a > edge_b + 10:
scores["New Road/Pavement"] += 0.20
if brightness_a < brightness_b:
scores["New Road/Pavement"] += 0.15
if struct_b and struct_a and edge_a > edge_b * 1.15:
scores["Road Widening"] += 0.30
if area > 2000:
scores["Road Widening"] += 0.15
if struct_b and struct_a and abs(edge_a - edge_b) < 10:
scores["Road Resurfacing"] += 0.20
if abs(brightness_a - brightness_b) > 12:
scores["Road Resurfacing"] += 0.25
if abs(texture_a - texture_b) < 8:
scores["Road Resurfacing"] += 0.15
if texture_a > texture_b + 10:
scores["Road Deterioration"] += 0.25
if edge_a < edge_b - 5:
scores["Road Deterioration"] += 0.20
if brightness_a > brightness_b + 8:
scores["Road Deterioration"] += 0.15
best = max(scores, key=scores.get)
conf = scores[best]
if conf < 0.25:
return "Road/Pavement Change", 0.3
return best, min(conf, 1.0)
# ---------------------------------------------------------------------------
# 16. 3D Building Analysis — height estimation + construction stage
# ---------------------------------------------------------------------------
# Object types for which analyze_change_regions runs the 3D building analysis
# (height estimate + construction stage).
_BUILDING_TYPES = {"New Construction/Building", "Demolition/Clearing"}
# Multiplier used by estimate_building_height to turn a story count into metres.
_STORY_HEIGHT_M = 3.0 # assumed metres per story
def _detect_shadow_region(before_gray, after_gray, bbox, expand=0.6):
    """
    Locate pixels that became darker around (but not inside) a bbox.

    The bbox ``(x, y, w, h)`` is grown by ``expand`` times its width/height on
    each side and clipped to the bounds of ``after_gray``. Inside that window
    a pixel counts as new shadow when ``before - after`` exceeds
    ``max(25, 1.5 * std(before - after))``.

    Returns (shadow_mask, shadow_pixels): a uint8 0/255 mask covering only the
    expanded window, and the number of non-zero pixels in it. Returns
    (None, 0) when the window is empty.
    """
    x, y, w, h = bbox
    img_h, img_w = after_gray.shape[:2]

    # Grow the window so shadows cast beside the footprint are included.
    margin_x = int(w * expand)
    margin_y = int(h * expand)
    left = max(0, x - margin_x)
    top = max(0, y - margin_y)
    right = min(img_w, x + w + margin_x)
    bottom = min(img_h, y + h + margin_y)

    window = (slice(top, bottom), slice(left, right))
    before_crop = before_gray[window].astype(np.float32)
    after_crop = after_gray[window].astype(np.float32)
    if before_crop.size == 0 or after_crop.size == 0:
        return None, 0

    # Positive values mean the pixel is darker in the after image.
    darkening = before_crop - after_crop
    dark_thresh = max(25, np.std(darkening) * 1.5)
    shadow_mask = np.where(darkening > dark_thresh, 255, 0).astype(np.uint8)

    # Blank out the footprint itself; only surrounding darkening is of interest.
    foot_left = x - left
    foot_top = y - top
    foot_right = min(shadow_mask.shape[1], foot_left + w)
    foot_bottom = min(shadow_mask.shape[0], foot_top + h)
    foot_left = max(0, foot_left)
    foot_top = max(0, foot_top)
    shadow_mask[foot_top:foot_bottom, foot_left:foot_right] = 0

    # Morphological opening removes isolated speckles.
    speckle_kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))
    shadow_mask = cv2.morphologyEx(shadow_mask, cv2.MORPH_OPEN, speckle_kernel)

    shadow_pixels = (shadow_mask > 0).sum()
    return shadow_mask, shadow_pixels
def estimate_building_height(before_img, after_img, bbox, features):
    """
    Heuristically estimate story count and height for a building bbox.

    Combines three signals:
      * the extent of newly darkened pixels around the bbox relative to the
        bbox's short side (from ``_detect_shadow_region``),
      * the footprint's aspect ratio and area,
      * an ``orientation_entropy`` value below 2.2 in ``features`` (if given).

    Returns (estimated_stories, estimated_height_m); stories are clamped to
    the range 1..50 and height is ``stories * _STORY_HEIGHT_M``.
    """
    gray_before = cv2.cvtColor(before_img, cv2.COLOR_RGB2GRAY)
    gray_after = cv2.cvtColor(after_img, cv2.COLOR_RGB2GRAY)
    _, _, w, h = bbox
    shadow_mask, shadow_px = _detect_shadow_region(gray_before, gray_after, bbox)

    short_side = max(min(w, h), 1)
    footprint_area = w * h

    # --- Shadow signal: largest extent of the shadow pixels, in short sides ---
    shadow_ratio = 0.0
    if shadow_mask is not None and shadow_px > 20:
        rows, cols = np.where(shadow_mask > 0)
        if len(rows) > 5:
            extent_rows = rows.max() - rows.min()
            extent_cols = cols.max() - cols.min()
            shadow_ratio = max(extent_rows, extent_cols) / short_side

    # --- Footprint signal: elongated boxes are scaled down, compact large up ---
    aspect = max(w, h) / short_side
    if aspect > 3.0:
        footprint_factor = 0.5
    elif aspect < 1.5 and footprint_area > 2000:
        footprint_factor = 1.3
    else:
        footprint_factor = 1.0

    # --- Regularity signal: low orientation entropy adds half a story ---
    is_regular = features and features.get("orientation_entropy", 3.0) < 2.2
    regularity_bonus = 0.5 if is_regular else 0.0

    # --- Base story count: shadow when usable, otherwise footprint area ---
    if shadow_ratio > 0.1:
        base_stories = shadow_ratio / 0.35
    elif footprint_area > 5000:
        base_stories = 3.0
    elif footprint_area > 2000:
        base_stories = 2.0
    else:
        base_stories = 1.0

    weighted = base_stories * footprint_factor + regularity_bonus
    stories = max(1, min(50, int(round(weighted))))
    return stories, round(stories * _STORY_HEIGHT_M, 1)
def classify_construction_stage(features, bbox):
    """
    Score four construction stages from a feature dict and pick the best.

    ``features`` is read with ``.get`` and a default per key, so missing keys
    are tolerated; ``bbox`` contributes only its width * height.

    Returns (stage_name, confidence). ``features is None`` yields
    ("Unknown", 0.0); a best score below 0.25 yields ("Unknown", score).
    """
    if features is None:
        return "Unknown", 0.0

    area = bbox[2] * bbox[3]
    tex = features.get("texture_std", 30)
    edge = features.get("edge_density", 40)
    orient = features.get("orientation_entropy", 2.5)
    homog = features.get("color_homogeneity", 25)
    bright = features.get("brightness", 60)
    sat = features.get("saturation", 50)
    glcm = features.get("glcm_contrast", 500)
    lbp_var = features.get("lbp_variance", 0.04)

    # Stage -> ordered (cue, weight) pairs. Dict order also decides ties.
    rulebook = {
        # Flat, low-texture, homogeneous, mid-brightness surface
        "Foundation": (
            (tex < 22, 0.25),
            (edge < 30, 0.20),
            (homog < 20, 0.20),
            (40 <= bright <= 75, 0.15),
            (sat < 60, 0.10),
            (lbp_var < 0.03, 0.10),
        ),
        # Dense edges, regular orientation, high contrast
        "Structural": (
            (edge > 50, 0.25),
            (orient < 2.2, 0.20),
            (glcm > 800, 0.20),
            (tex > 30, 0.15),
            (homog > 30, 0.10),
            (area > 1000, 0.10),
        ),
        # Mixed texture, irregular orientation, mid edge density
        "Under Construction": (
            (25 < tex < 50, 0.20),
            (35 < edge < 65, 0.20),
            (orient > 2.6, 0.20),
            (homog > 25, 0.15),
            (0.03 < lbp_var < 0.07, 0.15),
            (sat < 80, 0.10),
        ),
        # Uniform surface with some edges and low orientation entropy
        "Complete": (
            (tex < 28, 0.20),
            (orient < 2.3, 0.25),
            (homog < 22, 0.20),
            (edge > 25, 0.10),
            (lbp_var < 0.04, 0.15),
            (bright > 50, 0.10),
        ),
    }

    scores = {}
    for stage, cues in rulebook.items():
        total = 0.0
        for cue_holds, weight in cues:
            if cue_holds:
                total += weight
        scores[stage] = total

    winner = max(scores, key=scores.get)
    strength = scores[winner]
    if strength >= 0.25:
        return winner, min(strength, 1.0)
    return "Unknown", strength
def analyze_building_3d(before_img, after_img, region, features):
    """
    Attach height and construction-stage estimates to a region dict.

    Reads ``region["bbox"]``, runs ``estimate_building_height`` and
    ``classify_construction_stage`` on it, and writes the four result keys
    into ``region`` in place. The same dict is returned.
    """
    footprint = region["bbox"]
    stories, height_m = estimate_building_height(
        before_img, after_img, footprint, features)
    stage, stage_conf = classify_construction_stage(features, footprint)
    region.update({
        "estimated_stories": stories,
        "estimated_height_m": height_m,
        "construction_stage": stage,
        "construction_stage_confidence": stage_conf,
    })
    return region
# ---------------------------------------------------------------------------
# 17. Region analysis
# ---------------------------------------------------------------------------
def _tight_bbox(labels, label_id, stats_row):
    """
    Return a bounding box and fill ratio for one connected component.

    Starts from the component's stats row (left, top, width, height, area).
    When the component covers less than 20% of that box and has more than 100
    pixels, the box is recomputed from the coordinates of the pixels in
    ``labels`` equal to ``label_id``.

    Returns (x, y, w, h, fill_ratio) where ``fill_ratio`` is
    ``area / max(w * h, 1)``. All box values are built-in ``int`` and the ratio
    is a built-in ``float`` on every path, so the result can be stored in
    dicts that are later serialised, and ``w * h`` is not evaluated in the
    fixed-width integer type of the stats array.
    """
    # Cast once up front: the stats row holds NumPy integer scalars.
    x = int(stats_row[cv2.CC_STAT_LEFT])
    y = int(stats_row[cv2.CC_STAT_TOP])
    w = int(stats_row[cv2.CC_STAT_WIDTH])
    h = int(stats_row[cv2.CC_STAT_HEIGHT])
    area = int(stats_row[cv2.CC_STAT_AREA])
    fill_ratio = area / max(w * h, 1)

    # If the component fills less than 20% of its bbox, compute a tighter fit
    # NOTE(review): if stats_row comes straight from the same
    # connectedComponentsWithStats call as labels, this scan presumably
    # reproduces the same box — confirm whether it is still needed.
    if fill_ratio < 0.20 and area > 100:
        ys, xs = np.where(labels == label_id)
        if len(xs) > 0:
            x = int(np.min(xs))
            y = int(np.min(ys))
            w = int(np.max(xs)) - x + 1
            h = int(np.max(ys)) - y + 1
            fill_ratio = area / max(w * h, 1)
    return x, y, w, h, fill_ratio
def _iou(boxA, boxB):
"""Intersection-over-union for two (x,y,w,h) boxes."""
ax1, ay1, aw, ah = boxA
bx1, by1, bw, bh = boxB
ax2, ay2 = ax1 + aw, ay1 + ah
bx2, by2 = bx1 + bw, by1 + bh
ix1, iy1 = max(ax1, bx1), max(ay1, by1)
ix2, iy2 = min(ax2, bx2), min(ay2, by2)
inter = max(0, ix2 - ix1) * max(0, iy2 - iy1)
union = aw * ah + bw * bh - inter
return inter / max(union, 1)
def _nms_regions(regions, iou_thresh=0.45):
    """
    Greedy non-maximum suppression over region dicts.

    Walks ``regions`` in the given order; each surviving region suppresses
    every later region whose ``"bbox"`` overlaps it with IoU above
    ``iou_thresh``. The earlier entry always wins, so the list must already
    be sorted by area (descending) for the larger box to be the one kept.

    Lists with fewer than two entries are returned unchanged (same object).
    """
    total = len(regions)
    if total < 2:
        return regions

    suppressed = set()
    survivors = []
    for idx, candidate in enumerate(regions):
        if idx in suppressed:
            continue
        survivors.append(candidate)
        # Only regions after the current one can still be suppressed by it.
        for later in range(idx + 1, total):
            if later in suppressed:
                continue
            if _iou(candidate["bbox"], regions[later]["bbox"]) > iou_thresh:
                suppressed.add(later)
    return survivors
def analyze_change_regions(change_mask, image, min_area=400, use_ensemble=True,
                           before_img=None, registration_ok=True):
    """
    Find connected change regions with strict quality filters:
    - min_area filter; when ``min_area`` is None it is derived from the mask
      size (0.008% of the pixels, clamped to 200..1000)
    - Fill-ratio filter (>= 0.12) rejects sparse noise boxes
    - Near-full-frame boxes with low fill are dropped (cover limit 0.75, or
      0.92 when ``registration_ok`` is False)
    - NMS to remove overlapping/duplicate boxes
    - Max 60 regions cap to avoid flooding the UI

    Args:
        change_mask: single-channel mask passed to
            ``cv2.connectedComponentsWithStats`` (8-connectivity).
        image: image handed to the classifiers and cropped for building
            features (run_detection passes the "after" image here).
        min_area: minimum component pixel count, or None for the adaptive value.
        use_ensemble: use ``classify_with_ensemble`` for components larger
            than 500 px; otherwise ``classify_object_type`` is used.
        before_img: optional "before" image; enables sub-type classification
            and the 3D building analysis.
        registration_ok: selects the near-full-frame cover limit (see above).

    Returns:
        List of region dicts sorted by area (descending), re-numbered from 1.
        Every dict carries the same keys: id, area, bbox, center, object_type,
        confidence, fill_ratio, sub_type, sub_type_confidence,
        estimated_stories, estimated_height_m, construction_stage,
        construction_stage_confidence and severity. ``bbox`` and ``center``
        hold built-in ints.
    """
    num_labels, labels, stats, centroids = cv2.connectedComponentsWithStats(
        change_mask, connectivity=8)
    change_regions = []
    region_id = 0
    img_h, img_w = change_mask.shape[:2]
    img_area = img_h * img_w

    # Adaptive minimum region size:
    # - keeps sensitivity on smaller images
    # - suppresses speckle noise on larger images
    if min_area is None:
        min_area = int(max(200, min(1000, img_area * 0.00008)))

    # Keep large real changes; only suppress near-full-frame artifacts.
    # When registration failed, allow larger regions to avoid missing true changes.
    # (Loop-invariant, so computed once.)
    max_region_cover = 0.92 if not registration_ok else 0.75

    # Label 0 is skipped (the loop starts at 1).
    for i in range(1, num_labels):
        raw_area = stats[i, cv2.CC_STAT_AREA]
        if raw_area < min_area:
            continue

        x, y, w, h, fill_ratio = _tight_bbox(labels, i, stats[i])
        # Normalise to built-in types so the region dict never carries NumPy
        # scalars (area and center below are cast for the same reason).
        x, y, w, h = int(x), int(y), int(w), int(h)
        fill_ratio = float(fill_ratio)

        # Reject very sparse regions (bbox is mostly empty)
        if fill_ratio < 0.12:
            continue
        if (w * h) > img_area * max_region_cover and fill_ratio < 0.35:
            continue

        cx, cy = centroids[i]
        if use_ensemble and raw_area > 500:
            object_type, confidence = classify_with_ensemble(
                image, (x, y, w, h), before_region=before_img)
        else:
            object_type, confidence = classify_object_type(
                image, (x, y, w, h), before_region=before_img)

        if object_type is None:
            # Do not silently drop large coherent regions; keep them as generic
            # ground-change candidates so key changes are still surfaced.
            if raw_area >= max(min_area * 2, 800) and fill_ratio >= 0.18:
                object_type = "Unclassified Ground Change"
                confidence = max(0.2, min(0.5, fill_ratio))
            else:
                continue

        region_id += 1
        region = {
            "id": region_id,
            "area": int(raw_area),
            "bbox": (x, y, w, h),
            "center": (int(cx), int(cy)),
            "object_type": object_type,
            "confidence": confidence,
            "fill_ratio": round(fill_ratio, 3),
            "sub_type": None,
            "sub_type_confidence": None,
            "estimated_stories": None,
            "estimated_height_m": None,
            "construction_stage": None,
            # Present on every region so the schema does not depend on
            # whether the 3D analysis ran.
            "construction_stage_confidence": None,
        }

        # Sub-typing and 3D analysis compare against the before image.
        if before_img is not None:
            if object_type in _VEGETATION_TYPES:
                sub, sub_conf = classify_vegetation_subtype(
                    before_img, image, (x, y, w, h))
                region["sub_type"] = sub
                region["sub_type_confidence"] = sub_conf
            elif object_type in _STRUCTURAL_TYPES:
                sub, sub_conf = classify_structural_subtype(
                    before_img, image, (x, y, w, h), object_type)
                region["sub_type"] = sub
                region["sub_type_confidence"] = sub_conf

            if object_type in _BUILDING_TYPES:
                # Features are extracted from the bbox padded by 5 px,
                # clipped to the image bounds.
                pad = 5
                ry1 = max(0, y - pad)
                ry2 = min(image.shape[0], y + h + pad)
                rx1 = max(0, x - pad)
                rx2 = min(image.shape[1], x + w + pad)
                crop = image[ry1:ry2, rx1:rx2]
                feats = extract_advanced_features(crop) if crop.size > 0 else None
                analyze_building_3d(before_img, image, region, feats)

        change_regions.append(region)

    # Sort by area descending, apply NMS, cap at 60
    change_regions.sort(key=lambda r: r["area"], reverse=True)
    change_regions = _nms_regions(change_regions, iou_thresh=0.45)
    change_regions = change_regions[:60]

    # Re-number after filtering
    for idx, r in enumerate(change_regions, start=1):
        r["id"] = idx

    total_px = img_area
    for r in change_regions:
        r["severity"] = _severity_from_region(r, total_px)
    return change_regions
# ---------------------------------------------------------------------------
# 18. Main pipeline
# ---------------------------------------------------------------------------
def run_detection(before_pil, after_pil, method="AI-Based Deep Learning",
                  enable_registration=True, enable_normalization=True,
                  detection_sensitivity=0.5, min_region_area=None):
    """
    Run the full change-detection pipeline on a before/after image pair.

    Steps: preprocess both images, optionally register and radiometrically
    normalise them, build a change mask with the selected ``method``, extract
    change regions, and render a visualisation. Any ``method`` string other
    than "AI-Based Deep Learning", "Image Difference", "Feature-Based" or
    "Hybrid AI" is handled by ``hybrid_method``.

    If an AI/hybrid method yields a completely empty mask (and no regions)
    while registration succeeded, the image-difference method is tried and
    its result is used when it produces at least one region.

    Returns (change_mask, result_image, stats, change_regions).
    """
    before_array = preprocess_image(before_pil)
    after_array = preprocess_image(after_pil)

    # Registration is treated as "not ok" unless it ran and reported success.
    registration_ok, reg_meta = False, {}
    if enable_registration:
        before_array, after_array, registration_ok, reg_meta = register_images(
            before_array, after_array)
    if enable_normalization:
        before_array, after_array = normalize_radiometry(before_array, after_array)

    registration_failed = enable_registration and not registration_ok
    alignment_warning = ALIGNMENT_WARNING_MSG if registration_failed else None

    # --- Build the change mask ---
    if method == "Feature-Based":
        change_mask = feature_based_method(
            before_array, after_array, sensitivity=detection_sensitivity)
        threshold_debug = {
            "method": "Feature-Based",
            "threshold_used": None,
            "note": "KMeans clustering path does not use binary threshold.",
            "sensitivity": float(detection_sensitivity),
        }
    elif method == "Image Difference":
        change_mask, threshold_debug = image_difference_method(
            before_array, after_array, sensitivity=detection_sensitivity)
    else:
        # The remaining detectors share one call shape and all take the
        # registration flag.
        if method == "AI-Based Deep Learning":
            detector = ai_deep_learning_method
        elif method == "Hybrid AI":
            detector = hybrid_ai_method
        else:
            detector = hybrid_method
        change_mask, threshold_debug = detector(
            before_array, after_array,
            sensitivity=detection_sensitivity,
            registration_ok=registration_ok,
        )

    total_pixels = int(change_mask.shape[0] * change_mask.shape[1])
    if total_pixels:
        changed_pixels_ratio = float(np.sum(change_mask > 127)) / float(total_pixels)
    else:
        changed_pixels_ratio = 0.0

    region_kwargs = {
        "min_area": min_region_area,
        "before_img": before_array,
        "registration_ok": registration_ok,
    }
    change_regions = analyze_change_regions(change_mask, after_array, **region_kwargs)

    # --- Fallback: an AI/hybrid run that found nothing at all ---
    fallback_methods = ("AI-Based Deep Learning", "Hybrid Approach", "Hybrid AI")
    if (
        method in fallback_methods
        and len(change_regions) == 0
        and registration_ok
        and changed_pixels_ratio == 0.0
    ):
        diff_mask, diff_debug = image_difference_method(
            before_array, after_array, sensitivity=detection_sensitivity)
        diff_regions = analyze_change_regions(diff_mask, after_array, **region_kwargs)
        if len(diff_regions) > 0:
            change_mask, change_regions = diff_mask, diff_regions
            threshold_debug = {
                "method": f"{method} (fallback->Image Difference)",
                "fallback_used": True,
                "diff_debug": diff_debug,
                "sensitivity": float(detection_sensitivity),
            }
            total_pixels = int(change_mask.shape[0] * change_mask.shape[1])

    result_image = visualize_changes(
        before_array, after_array, change_mask,
        regions=change_regions, total_pixels=total_pixels,
    )

    # --- Summary statistics (pixels above 127 count as changed) ---
    changed_pixels = int(np.sum(change_mask > 127))
    change_pct = (changed_pixels / total_pixels * 100.0) if total_pixels else 0.0
    run_params = {
        "detection_sensitivity": float(detection_sensitivity),
        "min_region_area": min_region_area,
        "enable_registration": bool(enable_registration),
        "enable_normalization": bool(enable_normalization),
        "registration_ok": bool(registration_ok),
        "registration": reg_meta,
    }
    stats = {
        "total_pixels": total_pixels,
        "changed_pixels": changed_pixels,
        "unchanged_pixels": total_pixels - changed_pixels,
        "change_percentage": change_pct,
        "image_width": change_mask.shape[1],
        "image_height": change_mask.shape[0],
        "threshold_debug": threshold_debug,
        "alignment_warning": alignment_warning,
        "params": run_params,
    }
    return change_mask, result_image, stats, change_regions