""" model_utils.py — loads the .pkl metadata + .keras model and exposes inference helpers. ZoeDepth is loaded from HuggingFace Hub (CPU) for flood depth estimation. PKL structure (from flood_model_production.pkl): model_name, model_keras_path, input_shape, output_shape, threshold, preprocessing, gradcam_layer, metrics, risk_thresholds, custom_objects_keys """ import os import pickle import numpy as np import cv2 import tensorflow as tf from PIL import Image import torch # ── HuggingFace token (set via env var — never hardcode in production) ───────── HF_TOKEN = os.environ.get("HF_TOKEN", "") # ── Custom Keras objects ─────────────────────────────────────────────────────── def iou_metric(y_true, y_pred): y_pred = tf.cast(y_pred > 0.5, tf.float32) intersection = tf.reduce_sum(y_true * y_pred) union = tf.reduce_sum(y_true) + tf.reduce_sum(y_pred) - intersection return intersection / (union + 1e-7) def dice_loss(y_true, y_pred, smooth=1e-7): intersection = tf.reduce_sum(y_true * y_pred) return 1.0 - (2.0 * intersection + smooth) / ( tf.reduce_sum(y_true) + tf.reduce_sum(y_pred) + smooth ) def bce_dice_loss(y_true, y_pred): bce = tf.keras.losses.binary_crossentropy(y_true, y_pred) dice = dice_loss(y_true, y_pred) return bce + dice CUSTOM_OBJECTS = { "iou_metric": iou_metric, "dice_loss": dice_loss, "bce_dice_loss": bce_dice_loss, } IMAGE_SIZE = (512, 512) # ── Singletons ───────────────────────────────────────────────────────────────── _seg_model = None # Attention UNet (TF/Keras) _zoe_model = None # ZoeDepth (PyTorch, CPU) _metadata = None # dict from pkl # ── Metadata ─────────────────────────────────────────────────────────────────── def _load_metadata() -> dict: global _metadata if _metadata is not None: return _metadata pkl_path = os.environ.get("MODEL_PKL", "models/flood_model_production.pkl") if os.path.exists(pkl_path): with open(pkl_path, "rb") as f: _metadata = pickle.load(f) else: _metadata = {} return _metadata # ── Attention UNet loader ────────────────────────────────────────────────────── def load_seg_model(): global _seg_model if _seg_model is not None: return _seg_model keras_path = os.environ.get("MODEL_KERAS", "models/flood_best_model_export.keras") if not os.path.exists(keras_path): raise FileNotFoundError( f"Keras model not found at '{keras_path}'. " "Set MODEL_KERAS env var to the correct path." ) _seg_model = tf.keras.models.load_model(keras_path, custom_objects=CUSTOM_OBJECTS) return _seg_model # ── ZoeDepth loader (CPU, HF Hub) ───────────────────────────────────────────── def load_zoe_model(): """ Loads ZoeDepth (ZoeD_N) from HuggingFace Hub on CPU. Uses the transformers pipeline for simplicity and reliability. Model: Intel/zoedepth-nyu (indoor/outdoor depth estimation) """ global _zoe_model if _zoe_model is not None: return _zoe_model from transformers import pipeline as hf_pipeline token = HF_TOKEN or os.environ.get("HF_TOKEN", "") print("[ZoeDepth] Loading from HuggingFace Hub (CPU)...") _zoe_model = hf_pipeline( task="depth-estimation", model="Intel/zoedepth-nyu", device="cpu", # force CPU token=token if token else None, ) print("[ZoeDepth] Loaded OK.") return _zoe_model # ── Preprocessing ────────────────────────────────────────────────────────────── def preprocess_image(image: Image.Image) -> np.ndarray: """PIL Image → normalised (1, 512, 512, 3) float32 array.""" img = image.convert("RGB").resize(IMAGE_SIZE) arr = np.array(img, dtype=np.float32) / 255.0 return np.expand_dims(arr, axis=0) # (1, H, W, 3) # ── OOD / scene validation ───────────────────────────────────────────────────── def _is_flood_scene(image: Image.Image, pred: np.ndarray, threshold: float) -> tuple: """ Multi-signal check to detect out-of-distribution (non-flood) images. Signals used: 1. Colour distribution — flood/water images have blue/grey/brown tones. Non-flood scenes (cars, indoor, etc.) have very different hue distributions. 2. Spatial coherence — real flood masks are spatially connected large blobs. OOD masks are fragmented noise scattered across the image. 3. Raw flood coverage — if >85% of image is "flooded" it's almost certainly OOD (real floods rarely cover the entire frame uniformly). Returns: is_valid : bool — True if image looks like a flood scene reason : str — explanation if invalid """ img_rgb = np.array(image.convert("RGB").resize((512, 512)), dtype=np.float32) mask = (pred > threshold).astype(np.uint8) # ── Signal 1: colour check ───────────────────────────────────────────────── # Flood scenes have significant blue/grey channel presence. # Compute mean of blue channel relative to red — water is blue-dominant. r_mean = img_rgb[:, :, 0].mean() g_mean = img_rgb[:, :, 1].mean() b_mean = img_rgb[:, :, 2].mean() brightness = (r_mean + g_mean + b_mean) / 3.0 # Very dark images (night/indoor) or very bright (overexposed) are suspicious if brightness < 15 or brightness > 240: return False, "Image too dark or too bright for flood analysis" # ── Signal 2: spatial coherence of the predicted mask ───────────────────── # Real flood masks = large connected regions. OOD = scattered small blobs. flood_pct = float(mask.mean() * 100) if flood_pct > 0.5: # only check if there's something to check num_labels, labels, stats, _ = cv2.connectedComponentsWithStats( mask, connectivity=8 ) if num_labels > 1: # Largest component area (excluding background label 0) component_areas = stats[1:, cv2.CC_STAT_AREA] largest_area = int(component_areas.max()) total_flood_px = int(mask.sum()) # Coherence = fraction of flood pixels in the largest blob coherence = largest_area / (total_flood_px + 1e-8) # If flood pixels are highly fragmented (coherence < 0.3 AND many components) # it's likely OOD noise if coherence < 0.25 and num_labels > 50: return False, f"Flood mask is fragmented ({num_labels} disconnected regions) — likely not a flood scene" # ── Signal 3: implausibly high coverage ─────────────────────────────────── if flood_pct > 90: return False, f"Flood coverage {flood_pct:.1f}% is implausibly high — image may not be a flood scene" # ── Signal 4: colour-coverage cross-check ───────────────────────────────── # High flood coverage (>40%) on an image that is NOT predominantly # blue/grey/brown (water colours) is suspicious. if flood_pct > 40: # Convert to HSV to check hue distribution img_uint8 = img_rgb.astype(np.uint8) hsv = cv2.cvtColor(img_uint8, cv2.COLOR_RGB2HSV) hue = hsv[:, :, 0] # 0-179 in OpenCV sat = hsv[:, :, 1] # 0-255 # Water/flood hues: blue (100-130), grey (any hue, low sat), brown (10-20) blue_mask = ((hue >= 100) & (hue <= 130) & (sat > 30)) grey_mask = (sat < 40) brown_mask = ((hue >= 8) & (hue <= 22) & (sat > 40)) water_px = float((blue_mask | grey_mask | brown_mask).mean() * 100) # If less than 15% of the image has water-like colours but >40% is "flooded" # → almost certainly OOD (car, building, vegetation, etc.) if water_px < 15: return False, ( f"High flood coverage ({flood_pct:.1f}%) but only {water_px:.1f}% " "water-like colours detected — image does not appear to be a flood scene" ) return True, "" # ── Segmentation prediction ──────────────────────────────────────────────────── def predict_mask(image: Image.Image) -> tuple: """ Returns: mask : (H, W) binary float32 array (0 or 1) flood_pct : percentage of image covered by flood (0-100) confidence : float 0-100 (display percentage) low_conf : bool, True if OOD detected ood_reason : str, explanation if OOD """ meta = _load_metadata() threshold = meta.get("threshold", 0.5) model = load_seg_model() inp = preprocess_image(image) pred = model.predict(inp, verbose=0)[0, :, :, 0] # (H, W) # Raw confidence metric (how decisive the predictions are) confidence_raw = float(np.mean(np.abs(pred - 0.5)) * 2) # OOD detection using multi-signal heuristics is_valid, ood_reason = _is_flood_scene(image, pred, threshold) if not is_valid: mask = np.zeros_like(pred, dtype=np.float32) flood_pct = 0.0 low_conf = True else: mask = (pred > threshold).astype(np.float32) flood_pct = float(mask.mean() * 100) low_conf = False confidence_pct = round(confidence_raw * 100, 1) return mask, flood_pct, confidence_pct, low_conf, ood_reason # ── Grad-CAM ─────────────────────────────────────────────────────────────────── def compute_gradcam(image: Image.Image) -> np.ndarray: """ Returns an RGB uint8 heatmap overlaid on the original image. Uses conv2d_130 (from PKL) — the last conv before the output head. """ meta = _load_metadata() model = load_seg_model() inp = preprocess_image(image) target_layer = meta.get("gradcam_layer", "conv2d_130") layer_names = [l.name for l in model.layers] if target_layer not in layer_names: # fallback to last Conv2D target_layer = None for layer in reversed(model.layers): if isinstance(layer, tf.keras.layers.Conv2D): target_layer = layer.name break if target_layer is None: return np.array(image.convert("RGB").resize(IMAGE_SIZE)) grad_model = tf.keras.models.Model( inputs=model.inputs, outputs=[model.get_layer(target_layer).output, model.output] ) with tf.GradientTape() as tape: inp_tensor = tf.cast(inp, tf.float32) conv_out, predictions = grad_model(inp_tensor) loss = tf.reduce_mean(predictions) grads = tape.gradient(loss, conv_out) pooled = tf.reduce_mean(grads, axis=(0, 1, 2)) cam = tf.reduce_sum(conv_out[0] * pooled, axis=-1).numpy() cam = np.maximum(cam, 0) cam = cam / (cam.max() + 1e-8) # Smooth for the classic soft-blob look (like the reference image) cam_resized = cv2.resize(cam, IMAGE_SIZE) cam_smooth = cv2.GaussianBlur(cam_resized, (15, 15), 0) # JET: blue(low) → cyan → green → yellow → red(high) — exactly the reference heatmap_bgr = cv2.applyColorMap(np.uint8(255 * cam_smooth), cv2.COLORMAP_JET) heatmap_rgb = cv2.cvtColor(heatmap_bgr, cv2.COLOR_BGR2RGB).astype(np.float32) # Blend 55% heatmap over original — enough to see the image underneath orig = np.array(image.convert("RGB").resize(IMAGE_SIZE), dtype=np.float32) blended = (orig * 0.45 + heatmap_rgb * 0.55).clip(0, 255).astype(np.uint8) return blended # ── ZoeDepth flood depth estimation ─────────────────────────────────────────── def estimate_flood_depth(image: Image.Image, mask: np.ndarray) -> dict: """ Runs ZoeDepth on the image, then analyses depth only within flooded pixels. Returns a dict with: depth_map_vis : (H, W, 3) uint8 — colourised depth map depth_overlay : (H, W, 3) uint8 — depth map masked to flood region avg_depth_m : float — mean depth in flooded area (metres) max_depth_m : float — max depth in flooded area (metres) depth_category : str — shallow / moderate / deep / very deep """ zoe = load_zoe_model() # ZoeDepth expects a PIL RGB image (any size — it handles resize internally) rgb_img = image.convert("RGB") # Run depth estimation depth_result = zoe(rgb_img) depth_pil = depth_result["depth"] # PIL Image (grayscale, float-like) depth_arr = np.array(depth_pil, dtype=np.float32) # (H, W) # Resize depth map to 512×512 to match mask depth_512 = cv2.resize(depth_arr, IMAGE_SIZE, interpolation=cv2.INTER_LINEAR) # ── Normalise to [0, 1] relative scale ──────────────────────────────────── # ZoeDepth returns relative disparity values (not calibrated metres). # We normalise across the whole image so values are comparable. d_min, d_max = depth_512.min(), depth_512.max() depth_norm = (depth_512 - d_min) / (d_max - d_min + 1e-8) # 0→1 # Map relative depth to an estimated flood depth in metres (0–3 m scale). # Higher relative depth in flooded pixels → deeper water estimate. FLOOD_DEPTH_MAX_M = 3.0 depth_metres = depth_norm * FLOOD_DEPTH_MAX_M # (H, W), values 0–3 m # Colourised full depth map (PLASMA colormap) depth_vis = cv2.applyColorMap(np.uint8(255 * depth_norm), cv2.COLORMAP_PLASMA) depth_vis = cv2.cvtColor(depth_vis, cv2.COLOR_BGR2RGB) # Depth stats within flooded pixels only flood_mask_bool = mask.astype(bool) flooded_depths = depth_metres[flood_mask_bool] if len(flooded_depths) == 0 or flood_mask_bool.sum() < 50: # Fewer than 50 flooded pixels → treat as no flood avg_depth_m = 0.0 max_depth_m = 0.0 else: avg_depth_m = float(flooded_depths.mean()) max_depth_m = float(flooded_depths.max()) # Depth category (ZoeDepth outputs relative depth in metres approx) if avg_depth_m < 0.3: depth_category = "Shallow (< 30 cm)" elif avg_depth_m < 0.8: depth_category = "Moderate (30 cm – 80 cm)" elif avg_depth_m < 1.5: depth_category = "Deep (80 cm – 1.5 m)" else: depth_category = "Very Deep (> 1.5 m — life-threatening)" # Flood-region depth overlay: blend depth_vis with blue flood mask flood_depth_overlay = depth_vis.copy() # Highlight non-flooded areas in grey grey = np.full_like(depth_vis, 128) flood_depth_overlay[~flood_mask_bool] = grey[~flood_mask_bool] return { "depth_map_vis": depth_vis, # (H, W, 3) uint8 "depth_overlay": flood_depth_overlay, # (H, W, 3) uint8 "avg_depth_m": round(avg_depth_m, 3), "max_depth_m": round(max_depth_m, 3), "depth_category": depth_category, } # ── Risk assessment (uses thresholds from PKL + ZoeDepth depth) ─────────────── def assess_risk(flood_pct: float, avg_depth_m: float = 0.0) -> dict: """ Combined risk from flood coverage % AND average depth from ZoeDepth. Thresholds from PKL: low : flood_pct < 15 AND avg_depth_m < 0.80 medium : flood_pct < 35 AND avg_depth_m < 1.50 high : flood_pct < 60 AND avg_depth_m < 2.20 critical : above all """ meta = _load_metadata() thresholds = meta.get("risk_thresholds", { "low": {"max_flood_pct": 15, "max_avg_depth_m": 80}, "medium": {"max_flood_pct": 35, "max_avg_depth_m": 150}, "high": {"max_flood_pct": 60, "max_avg_depth_m": 220}, "critical": {"above_all": True}, }) # PKL stores depth in cm — convert to metres for comparison low_pct = thresholds.get("low", {}).get("max_flood_pct", 15) med_pct = thresholds.get("medium", {}).get("max_flood_pct", 35) high_pct = thresholds.get("high", {}).get("max_flood_pct", 60) # Depth thresholds in metres (realistic flood water levels) low_dep = 0.3 # < 30 cm → Low med_dep = 0.8 # < 80 cm → Moderate high_dep = 1.5 # < 150 cm → High (> 1.5 m → Critical) # Depth only contributes to risk when there is meaningful flood coverage. # If flood_pct < 2%, ignore depth entirely (noise / false positives). effective_depth = avg_depth_m if flood_pct >= 2.0 else 0.0 def _level_from_pct(p): if p < low_pct: return 0 if p < med_pct: return 1 if p < high_pct: return 2 return 3 def _level_from_dep(d): if d < low_dep: return 0 if d < med_dep: return 1 if d < high_dep: return 2 return 3 level_idx = max(_level_from_pct(flood_pct), _level_from_dep(effective_depth)) levels = ["Low", "Moderate", "High", "Critical"] colours = ["#2ecc71", "#f39c12", "#e67e22", "#e74c3c"] level = levels[level_idx] colour = colours[level_idx] # Score: flood coverage drives 70%, depth drives 30% pct_score = min(flood_pct / 100 * 100, 100) depth_score = min(effective_depth / 3.0 * 100, 100) # 3 m = max realistic scale score = round(pct_score * 0.7 + depth_score * 0.3, 1) recs_map = { "Low": [ "Monitor water levels periodically.", "Ensure drainage channels are clear.", "No immediate evacuation required.", ], "Moderate": [ "Alert local emergency services.", "Move valuables to higher ground.", "Prepare emergency kit.", "Monitor weather forecasts closely.", ], "High": [ "Initiate partial evacuation of vulnerable populations.", "Deploy flood barriers where possible.", "Activate emergency response teams.", "Avoid flooded roads and areas.", ], "Critical": [ "IMMEDIATE EVACUATION REQUIRED.", "Contact emergency services (911 / local disaster hotline).", "Do not attempt to cross flooded areas.", "Seek shelter on highest available ground.", "Follow official evacuation routes only.", ], } deployed_metrics = meta.get("metrics", {}).get("deployed", {}) return { "risk_level": level, "risk_score": score, "colour": colour, "flood_pct": round(flood_pct, 2), "avg_depth_m": round(avg_depth_m, 3), "recommendations": recs_map[level], "model_metrics": deployed_metrics, } # ── Full pipeline ────────────────────────────────────────────────────────────── def run_pipeline(image: Image.Image) -> dict: """ End-to-end: 1. Attention UNet → flood mask (with confidence gate) 2. Grad-CAM → explainability heatmap 3. ZoeDepth → per-pixel depth map, flood depth stats 4. Risk assessment → level, score, recommendations """ # Step 1 — segmentation + OOD detection mask, flood_pct, confidence, low_conf, ood_reason = predict_mask(image) # Step 2 — Grad-CAM gradcam_img = compute_gradcam(image) # Step 3 — ZoeDepth depth estimation depth_info = estimate_flood_depth(image, mask) # Step 4 — risk risk = assess_risk(flood_pct, avg_depth_m=depth_info["avg_depth_m"]) # Add OOD info to risk dict for display risk["confidence"] = confidence risk["low_conf"] = low_conf risk["warning"] = ( f"⚠️ Out-of-distribution image detected: {ood_reason}. " "Results suppressed." ) if low_conf else "" # Blue-tinted flood overlay on original orig_arr = np.array(image.convert("RGB").resize(IMAGE_SIZE), dtype=np.uint8) mask_3ch = np.stack([mask * 0, mask * 100, mask * 255], axis=-1).astype(np.uint8) overlay = cv2.addWeighted(orig_arr, 0.7, mask_3ch, 0.3, 0) return { "mask": mask, "overlay": overlay, "gradcam": gradcam_img, "depth_map": depth_info["depth_map_vis"], "depth_overlay": depth_info["depth_overlay"], "depth_info": depth_info, "risk": risk, }