flood-risk-detection / app /model_utils.py
Kinzaaa
Initial deployment: Attention UNet + ZoeDepth + Grad-CAM + Risk Assessment
3fd6082
"""
model_utils.py β€” loads the .pkl metadata + .keras model and exposes inference helpers.
ZoeDepth is loaded from HuggingFace Hub (CPU) for flood depth estimation.
PKL structure (from flood_model_production.pkl):
model_name, model_keras_path, input_shape, output_shape, threshold,
preprocessing, gradcam_layer, metrics, risk_thresholds, custom_objects_keys
"""
import os
import pickle
import numpy as np
import cv2
import tensorflow as tf
from PIL import Image
import torch
# ── HuggingFace token (set via env var β€” never hardcode in production) ─────────
HF_TOKEN = os.environ.get("HF_TOKEN", "")
# ── Custom Keras objects ───────────────────────────────────────────────────────
def iou_metric(y_true, y_pred):
y_pred = tf.cast(y_pred > 0.5, tf.float32)
intersection = tf.reduce_sum(y_true * y_pred)
union = tf.reduce_sum(y_true) + tf.reduce_sum(y_pred) - intersection
return intersection / (union + 1e-7)
def dice_loss(y_true, y_pred, smooth=1e-7):
intersection = tf.reduce_sum(y_true * y_pred)
return 1.0 - (2.0 * intersection + smooth) / (
tf.reduce_sum(y_true) + tf.reduce_sum(y_pred) + smooth
)
def bce_dice_loss(y_true, y_pred):
bce = tf.keras.losses.binary_crossentropy(y_true, y_pred)
dice = dice_loss(y_true, y_pred)
return bce + dice
CUSTOM_OBJECTS = {
"iou_metric": iou_metric,
"dice_loss": dice_loss,
"bce_dice_loss": bce_dice_loss,
}
IMAGE_SIZE = (512, 512)
# ── Singletons ─────────────────────────────────────────────────────────────────
_seg_model = None # Attention UNet (TF/Keras)
_zoe_model = None # ZoeDepth (PyTorch, CPU)
_metadata = None # dict from pkl
# ── Metadata ───────────────────────────────────────────────────────────────────
def _load_metadata() -> dict:
global _metadata
if _metadata is not None:
return _metadata
pkl_path = os.environ.get("MODEL_PKL", "models/flood_model_production.pkl")
if os.path.exists(pkl_path):
with open(pkl_path, "rb") as f:
_metadata = pickle.load(f)
else:
_metadata = {}
return _metadata
# ── Attention UNet loader ──────────────────────────────────────────────────────
def load_seg_model():
global _seg_model
if _seg_model is not None:
return _seg_model
keras_path = os.environ.get("MODEL_KERAS", "models/flood_best_model_export.keras")
if not os.path.exists(keras_path):
raise FileNotFoundError(
f"Keras model not found at '{keras_path}'. "
"Set MODEL_KERAS env var to the correct path."
)
_seg_model = tf.keras.models.load_model(keras_path, custom_objects=CUSTOM_OBJECTS)
return _seg_model
# ── ZoeDepth loader (CPU, HF Hub) ─────────────────────────────────────────────
def load_zoe_model():
"""
Loads ZoeDepth (ZoeD_N) from HuggingFace Hub on CPU.
Uses the transformers pipeline for simplicity and reliability.
Model: Intel/zoedepth-nyu (indoor/outdoor depth estimation)
"""
global _zoe_model
if _zoe_model is not None:
return _zoe_model
from transformers import pipeline as hf_pipeline
token = HF_TOKEN or os.environ.get("HF_TOKEN", "")
print("[ZoeDepth] Loading from HuggingFace Hub (CPU)...")
_zoe_model = hf_pipeline(
task="depth-estimation",
model="Intel/zoedepth-nyu",
device="cpu", # force CPU
token=token if token else None,
)
print("[ZoeDepth] Loaded OK.")
return _zoe_model
# ── Preprocessing ──────────────────────────────────────────────────────────────
def preprocess_image(image: Image.Image) -> np.ndarray:
"""PIL Image β†’ normalised (1, 512, 512, 3) float32 array."""
img = image.convert("RGB").resize(IMAGE_SIZE)
arr = np.array(img, dtype=np.float32) / 255.0
return np.expand_dims(arr, axis=0) # (1, H, W, 3)
# ── OOD / scene validation ─────────────────────────────────────────────────────
def _is_flood_scene(image: Image.Image, pred: np.ndarray, threshold: float) -> tuple:
"""
Multi-signal check to detect out-of-distribution (non-flood) images.
Signals used:
1. Colour distribution β€” flood/water images have blue/grey/brown tones.
Non-flood scenes (cars, indoor, etc.) have very different hue distributions.
2. Spatial coherence β€” real flood masks are spatially connected large blobs.
OOD masks are fragmented noise scattered across the image.
3. Raw flood coverage β€” if >85% of image is "flooded" it's almost certainly OOD
(real floods rarely cover the entire frame uniformly).
Returns:
is_valid : bool β€” True if image looks like a flood scene
reason : str β€” explanation if invalid
"""
img_rgb = np.array(image.convert("RGB").resize((512, 512)), dtype=np.float32)
mask = (pred > threshold).astype(np.uint8)
# ── Signal 1: colour check ─────────────────────────────────────────────────
# Flood scenes have significant blue/grey channel presence.
# Compute mean of blue channel relative to red β€” water is blue-dominant.
r_mean = img_rgb[:, :, 0].mean()
g_mean = img_rgb[:, :, 1].mean()
b_mean = img_rgb[:, :, 2].mean()
brightness = (r_mean + g_mean + b_mean) / 3.0
# Very dark images (night/indoor) or very bright (overexposed) are suspicious
if brightness < 15 or brightness > 240:
return False, "Image too dark or too bright for flood analysis"
# ── Signal 2: spatial coherence of the predicted mask ─────────────────────
# Real flood masks = large connected regions. OOD = scattered small blobs.
flood_pct = float(mask.mean() * 100)
if flood_pct > 0.5: # only check if there's something to check
num_labels, labels, stats, _ = cv2.connectedComponentsWithStats(
mask, connectivity=8
)
if num_labels > 1:
# Largest component area (excluding background label 0)
component_areas = stats[1:, cv2.CC_STAT_AREA]
largest_area = int(component_areas.max())
total_flood_px = int(mask.sum())
# Coherence = fraction of flood pixels in the largest blob
coherence = largest_area / (total_flood_px + 1e-8)
# If flood pixels are highly fragmented (coherence < 0.3 AND many components)
# it's likely OOD noise
if coherence < 0.25 and num_labels > 50:
return False, f"Flood mask is fragmented ({num_labels} disconnected regions) β€” likely not a flood scene"
# ── Signal 3: implausibly high coverage ───────────────────────────────────
if flood_pct > 90:
return False, f"Flood coverage {flood_pct:.1f}% is implausibly high β€” image may not be a flood scene"
# ── Signal 4: colour-coverage cross-check ─────────────────────────────────
# High flood coverage (>40%) on an image that is NOT predominantly
# blue/grey/brown (water colours) is suspicious.
if flood_pct > 40:
# Convert to HSV to check hue distribution
img_uint8 = img_rgb.astype(np.uint8)
hsv = cv2.cvtColor(img_uint8, cv2.COLOR_RGB2HSV)
hue = hsv[:, :, 0] # 0-179 in OpenCV
sat = hsv[:, :, 1] # 0-255
# Water/flood hues: blue (100-130), grey (any hue, low sat), brown (10-20)
blue_mask = ((hue >= 100) & (hue <= 130) & (sat > 30))
grey_mask = (sat < 40)
brown_mask = ((hue >= 8) & (hue <= 22) & (sat > 40))
water_px = float((blue_mask | grey_mask | brown_mask).mean() * 100)
# If less than 15% of the image has water-like colours but >40% is "flooded"
# β†’ almost certainly OOD (car, building, vegetation, etc.)
if water_px < 15:
return False, (
f"High flood coverage ({flood_pct:.1f}%) but only {water_px:.1f}% "
"water-like colours detected β€” image does not appear to be a flood scene"
)
return True, ""
# ── Segmentation prediction ────────────────────────────────────────────────────
def predict_mask(image: Image.Image) -> tuple:
"""
Returns:
mask : (H, W) binary float32 array (0 or 1)
flood_pct : percentage of image covered by flood (0-100)
confidence : float 0-100 (display percentage)
low_conf : bool, True if OOD detected
ood_reason : str, explanation if OOD
"""
meta = _load_metadata()
threshold = meta.get("threshold", 0.5)
model = load_seg_model()
inp = preprocess_image(image)
pred = model.predict(inp, verbose=0)[0, :, :, 0] # (H, W)
# Raw confidence metric (how decisive the predictions are)
confidence_raw = float(np.mean(np.abs(pred - 0.5)) * 2)
# OOD detection using multi-signal heuristics
is_valid, ood_reason = _is_flood_scene(image, pred, threshold)
if not is_valid:
mask = np.zeros_like(pred, dtype=np.float32)
flood_pct = 0.0
low_conf = True
else:
mask = (pred > threshold).astype(np.float32)
flood_pct = float(mask.mean() * 100)
low_conf = False
confidence_pct = round(confidence_raw * 100, 1)
return mask, flood_pct, confidence_pct, low_conf, ood_reason
# ── Grad-CAM ───────────────────────────────────────────────────────────────────
def compute_gradcam(image: Image.Image) -> np.ndarray:
"""
Returns an RGB uint8 heatmap overlaid on the original image.
Uses conv2d_130 (from PKL) β€” the last conv before the output head.
"""
meta = _load_metadata()
model = load_seg_model()
inp = preprocess_image(image)
target_layer = meta.get("gradcam_layer", "conv2d_130")
layer_names = [l.name for l in model.layers]
if target_layer not in layer_names:
# fallback to last Conv2D
target_layer = None
for layer in reversed(model.layers):
if isinstance(layer, tf.keras.layers.Conv2D):
target_layer = layer.name
break
if target_layer is None:
return np.array(image.convert("RGB").resize(IMAGE_SIZE))
grad_model = tf.keras.models.Model(
inputs=model.inputs,
outputs=[model.get_layer(target_layer).output, model.output]
)
with tf.GradientTape() as tape:
inp_tensor = tf.cast(inp, tf.float32)
conv_out, predictions = grad_model(inp_tensor)
loss = tf.reduce_mean(predictions)
grads = tape.gradient(loss, conv_out)
pooled = tf.reduce_mean(grads, axis=(0, 1, 2))
cam = tf.reduce_sum(conv_out[0] * pooled, axis=-1).numpy()
cam = np.maximum(cam, 0)
cam = cam / (cam.max() + 1e-8)
# Smooth for the classic soft-blob look (like the reference image)
cam_resized = cv2.resize(cam, IMAGE_SIZE)
cam_smooth = cv2.GaussianBlur(cam_resized, (15, 15), 0)
# JET: blue(low) β†’ cyan β†’ green β†’ yellow β†’ red(high) β€” exactly the reference
heatmap_bgr = cv2.applyColorMap(np.uint8(255 * cam_smooth), cv2.COLORMAP_JET)
heatmap_rgb = cv2.cvtColor(heatmap_bgr, cv2.COLOR_BGR2RGB).astype(np.float32)
# Blend 55% heatmap over original β€” enough to see the image underneath
orig = np.array(image.convert("RGB").resize(IMAGE_SIZE), dtype=np.float32)
blended = (orig * 0.45 + heatmap_rgb * 0.55).clip(0, 255).astype(np.uint8)
return blended
# ── ZoeDepth flood depth estimation ───────────────────────────────────────────
def estimate_flood_depth(image: Image.Image, mask: np.ndarray) -> dict:
"""
Runs ZoeDepth on the image, then analyses depth only within flooded pixels.
Returns a dict with:
depth_map_vis : (H, W, 3) uint8 β€” colourised depth map
depth_overlay : (H, W, 3) uint8 β€” depth map masked to flood region
avg_depth_m : float β€” mean depth in flooded area (metres)
max_depth_m : float β€” max depth in flooded area (metres)
depth_category : str β€” shallow / moderate / deep / very deep
"""
zoe = load_zoe_model()
# ZoeDepth expects a PIL RGB image (any size β€” it handles resize internally)
rgb_img = image.convert("RGB")
# Run depth estimation
depth_result = zoe(rgb_img)
depth_pil = depth_result["depth"] # PIL Image (grayscale, float-like)
depth_arr = np.array(depth_pil, dtype=np.float32) # (H, W)
# Resize depth map to 512Γ—512 to match mask
depth_512 = cv2.resize(depth_arr, IMAGE_SIZE, interpolation=cv2.INTER_LINEAR)
# ── Normalise to [0, 1] relative scale ────────────────────────────────────
# ZoeDepth returns relative disparity values (not calibrated metres).
# We normalise across the whole image so values are comparable.
d_min, d_max = depth_512.min(), depth_512.max()
depth_norm = (depth_512 - d_min) / (d_max - d_min + 1e-8) # 0β†’1
# Map relative depth to an estimated flood depth in metres (0–3 m scale).
# Higher relative depth in flooded pixels β†’ deeper water estimate.
FLOOD_DEPTH_MAX_M = 3.0
depth_metres = depth_norm * FLOOD_DEPTH_MAX_M # (H, W), values 0–3 m
# Colourised full depth map (PLASMA colormap)
depth_vis = cv2.applyColorMap(np.uint8(255 * depth_norm), cv2.COLORMAP_PLASMA)
depth_vis = cv2.cvtColor(depth_vis, cv2.COLOR_BGR2RGB)
# Depth stats within flooded pixels only
flood_mask_bool = mask.astype(bool)
flooded_depths = depth_metres[flood_mask_bool]
if len(flooded_depths) == 0 or flood_mask_bool.sum() < 50:
# Fewer than 50 flooded pixels β†’ treat as no flood
avg_depth_m = 0.0
max_depth_m = 0.0
else:
avg_depth_m = float(flooded_depths.mean())
max_depth_m = float(flooded_depths.max())
# Depth category (ZoeDepth outputs relative depth in metres approx)
if avg_depth_m < 0.3:
depth_category = "Shallow (< 30 cm)"
elif avg_depth_m < 0.8:
depth_category = "Moderate (30 cm – 80 cm)"
elif avg_depth_m < 1.5:
depth_category = "Deep (80 cm – 1.5 m)"
else:
depth_category = "Very Deep (> 1.5 m β€” life-threatening)"
# Flood-region depth overlay: blend depth_vis with blue flood mask
flood_depth_overlay = depth_vis.copy()
# Highlight non-flooded areas in grey
grey = np.full_like(depth_vis, 128)
flood_depth_overlay[~flood_mask_bool] = grey[~flood_mask_bool]
return {
"depth_map_vis": depth_vis, # (H, W, 3) uint8
"depth_overlay": flood_depth_overlay, # (H, W, 3) uint8
"avg_depth_m": round(avg_depth_m, 3),
"max_depth_m": round(max_depth_m, 3),
"depth_category": depth_category,
}
# ── Risk assessment (uses thresholds from PKL + ZoeDepth depth) ───────────────
def assess_risk(flood_pct: float, avg_depth_m: float = 0.0) -> dict:
"""
Combined risk from flood coverage % AND average depth from ZoeDepth.
Thresholds from PKL:
low : flood_pct < 15 AND avg_depth_m < 0.80
medium : flood_pct < 35 AND avg_depth_m < 1.50
high : flood_pct < 60 AND avg_depth_m < 2.20
critical : above all
"""
meta = _load_metadata()
thresholds = meta.get("risk_thresholds", {
"low": {"max_flood_pct": 15, "max_avg_depth_m": 80},
"medium": {"max_flood_pct": 35, "max_avg_depth_m": 150},
"high": {"max_flood_pct": 60, "max_avg_depth_m": 220},
"critical": {"above_all": True},
})
# PKL stores depth in cm β€” convert to metres for comparison
low_pct = thresholds.get("low", {}).get("max_flood_pct", 15)
med_pct = thresholds.get("medium", {}).get("max_flood_pct", 35)
high_pct = thresholds.get("high", {}).get("max_flood_pct", 60)
# Depth thresholds in metres (realistic flood water levels)
low_dep = 0.3 # < 30 cm β†’ Low
med_dep = 0.8 # < 80 cm β†’ Moderate
high_dep = 1.5 # < 150 cm β†’ High (> 1.5 m β†’ Critical)
# Depth only contributes to risk when there is meaningful flood coverage.
# If flood_pct < 2%, ignore depth entirely (noise / false positives).
effective_depth = avg_depth_m if flood_pct >= 2.0 else 0.0
def _level_from_pct(p):
if p < low_pct: return 0
if p < med_pct: return 1
if p < high_pct: return 2
return 3
def _level_from_dep(d):
if d < low_dep: return 0
if d < med_dep: return 1
if d < high_dep: return 2
return 3
level_idx = max(_level_from_pct(flood_pct), _level_from_dep(effective_depth))
levels = ["Low", "Moderate", "High", "Critical"]
colours = ["#2ecc71", "#f39c12", "#e67e22", "#e74c3c"]
level = levels[level_idx]
colour = colours[level_idx]
# Score: flood coverage drives 70%, depth drives 30%
pct_score = min(flood_pct / 100 * 100, 100)
depth_score = min(effective_depth / 3.0 * 100, 100) # 3 m = max realistic scale
score = round(pct_score * 0.7 + depth_score * 0.3, 1)
recs_map = {
"Low": [
"Monitor water levels periodically.",
"Ensure drainage channels are clear.",
"No immediate evacuation required.",
],
"Moderate": [
"Alert local emergency services.",
"Move valuables to higher ground.",
"Prepare emergency kit.",
"Monitor weather forecasts closely.",
],
"High": [
"Initiate partial evacuation of vulnerable populations.",
"Deploy flood barriers where possible.",
"Activate emergency response teams.",
"Avoid flooded roads and areas.",
],
"Critical": [
"IMMEDIATE EVACUATION REQUIRED.",
"Contact emergency services (911 / local disaster hotline).",
"Do not attempt to cross flooded areas.",
"Seek shelter on highest available ground.",
"Follow official evacuation routes only.",
],
}
deployed_metrics = meta.get("metrics", {}).get("deployed", {})
return {
"risk_level": level,
"risk_score": score,
"colour": colour,
"flood_pct": round(flood_pct, 2),
"avg_depth_m": round(avg_depth_m, 3),
"recommendations": recs_map[level],
"model_metrics": deployed_metrics,
}
# ── Full pipeline ──────────────────────────────────────────────────────────────
def run_pipeline(image: Image.Image) -> dict:
"""
End-to-end:
1. Attention UNet β†’ flood mask (with confidence gate)
2. Grad-CAM β†’ explainability heatmap
3. ZoeDepth β†’ per-pixel depth map, flood depth stats
4. Risk assessment β†’ level, score, recommendations
"""
# Step 1 β€” segmentation + OOD detection
mask, flood_pct, confidence, low_conf, ood_reason = predict_mask(image)
# Step 2 β€” Grad-CAM
gradcam_img = compute_gradcam(image)
# Step 3 β€” ZoeDepth depth estimation
depth_info = estimate_flood_depth(image, mask)
# Step 4 β€” risk
risk = assess_risk(flood_pct, avg_depth_m=depth_info["avg_depth_m"])
# Add OOD info to risk dict for display
risk["confidence"] = confidence
risk["low_conf"] = low_conf
risk["warning"] = (
f"⚠️ Out-of-distribution image detected: {ood_reason}. "
"Results suppressed."
) if low_conf else ""
# Blue-tinted flood overlay on original
orig_arr = np.array(image.convert("RGB").resize(IMAGE_SIZE), dtype=np.uint8)
mask_3ch = np.stack([mask * 0, mask * 100, mask * 255], axis=-1).astype(np.uint8)
overlay = cv2.addWeighted(orig_arr, 0.7, mask_3ch, 0.3, 0)
return {
"mask": mask,
"overlay": overlay,
"gradcam": gradcam_img,
"depth_map": depth_info["depth_map_vis"],
"depth_overlay": depth_info["depth_overlay"],
"depth_info": depth_info,
"risk": risk,
}