Spaces:
Sleeping
Sleeping
| """ | |
| model_utils.py β loads the .pkl metadata + .keras model and exposes inference helpers. | |
| ZoeDepth is loaded from HuggingFace Hub (CPU) for flood depth estimation. | |
| PKL structure (from flood_model_production.pkl): | |
| model_name, model_keras_path, input_shape, output_shape, threshold, | |
| preprocessing, gradcam_layer, metrics, risk_thresholds, custom_objects_keys | |
| """ | |
| import os | |
| import pickle | |
| import numpy as np | |
| import cv2 | |
| import tensorflow as tf | |
| from PIL import Image | |
| import torch | |
| # ββ HuggingFace token (set via env var β never hardcode in production) βββββββββ | |
| HF_TOKEN = os.environ.get("HF_TOKEN", "") | |
| # ββ Custom Keras objects βββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def iou_metric(y_true, y_pred): | |
| y_pred = tf.cast(y_pred > 0.5, tf.float32) | |
| intersection = tf.reduce_sum(y_true * y_pred) | |
| union = tf.reduce_sum(y_true) + tf.reduce_sum(y_pred) - intersection | |
| return intersection / (union + 1e-7) | |
| def dice_loss(y_true, y_pred, smooth=1e-7): | |
| intersection = tf.reduce_sum(y_true * y_pred) | |
| return 1.0 - (2.0 * intersection + smooth) / ( | |
| tf.reduce_sum(y_true) + tf.reduce_sum(y_pred) + smooth | |
| ) | |
| def bce_dice_loss(y_true, y_pred): | |
| bce = tf.keras.losses.binary_crossentropy(y_true, y_pred) | |
| dice = dice_loss(y_true, y_pred) | |
| return bce + dice | |
| CUSTOM_OBJECTS = { | |
| "iou_metric": iou_metric, | |
| "dice_loss": dice_loss, | |
| "bce_dice_loss": bce_dice_loss, | |
| } | |
| IMAGE_SIZE = (512, 512) | |
| # ββ Singletons βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| _seg_model = None # Attention UNet (TF/Keras) | |
| _zoe_model = None # ZoeDepth (PyTorch, CPU) | |
| _metadata = None # dict from pkl | |
| # ββ Metadata βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _load_metadata() -> dict: | |
| global _metadata | |
| if _metadata is not None: | |
| return _metadata | |
| pkl_path = os.environ.get("MODEL_PKL", "models/flood_model_production.pkl") | |
| if os.path.exists(pkl_path): | |
| with open(pkl_path, "rb") as f: | |
| _metadata = pickle.load(f) | |
| else: | |
| _metadata = {} | |
| return _metadata | |
| # ββ Attention UNet loader ββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def load_seg_model(): | |
| global _seg_model | |
| if _seg_model is not None: | |
| return _seg_model | |
| keras_path = os.environ.get("MODEL_KERAS", "models/flood_best_model_export.keras") | |
| if not os.path.exists(keras_path): | |
| raise FileNotFoundError( | |
| f"Keras model not found at '{keras_path}'. " | |
| "Set MODEL_KERAS env var to the correct path." | |
| ) | |
| _seg_model = tf.keras.models.load_model(keras_path, custom_objects=CUSTOM_OBJECTS) | |
| return _seg_model | |
| # ββ ZoeDepth loader (CPU, HF Hub) βββββββββββββββββββββββββββββββββββββββββββββ | |
| def load_zoe_model(): | |
| """ | |
| Loads ZoeDepth (ZoeD_N) from HuggingFace Hub on CPU. | |
| Uses the transformers pipeline for simplicity and reliability. | |
| Model: Intel/zoedepth-nyu (indoor/outdoor depth estimation) | |
| """ | |
| global _zoe_model | |
| if _zoe_model is not None: | |
| return _zoe_model | |
| from transformers import pipeline as hf_pipeline | |
| token = HF_TOKEN or os.environ.get("HF_TOKEN", "") | |
| print("[ZoeDepth] Loading from HuggingFace Hub (CPU)...") | |
| _zoe_model = hf_pipeline( | |
| task="depth-estimation", | |
| model="Intel/zoedepth-nyu", | |
| device="cpu", # force CPU | |
| token=token if token else None, | |
| ) | |
| print("[ZoeDepth] Loaded OK.") | |
| return _zoe_model | |
| # ββ Preprocessing ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def preprocess_image(image: Image.Image) -> np.ndarray: | |
| """PIL Image β normalised (1, 512, 512, 3) float32 array.""" | |
| img = image.convert("RGB").resize(IMAGE_SIZE) | |
| arr = np.array(img, dtype=np.float32) / 255.0 | |
| return np.expand_dims(arr, axis=0) # (1, H, W, 3) | |
| # ββ OOD / scene validation βββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _is_flood_scene(image: Image.Image, pred: np.ndarray, threshold: float) -> tuple: | |
| """ | |
| Multi-signal check to detect out-of-distribution (non-flood) images. | |
| Signals used: | |
| 1. Colour distribution β flood/water images have blue/grey/brown tones. | |
| Non-flood scenes (cars, indoor, etc.) have very different hue distributions. | |
| 2. Spatial coherence β real flood masks are spatially connected large blobs. | |
| OOD masks are fragmented noise scattered across the image. | |
| 3. Raw flood coverage β if >85% of image is "flooded" it's almost certainly OOD | |
| (real floods rarely cover the entire frame uniformly). | |
| Returns: | |
| is_valid : bool β True if image looks like a flood scene | |
| reason : str β explanation if invalid | |
| """ | |
| img_rgb = np.array(image.convert("RGB").resize((512, 512)), dtype=np.float32) | |
| mask = (pred > threshold).astype(np.uint8) | |
| # ββ Signal 1: colour check βββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Flood scenes have significant blue/grey channel presence. | |
| # Compute mean of blue channel relative to red β water is blue-dominant. | |
| r_mean = img_rgb[:, :, 0].mean() | |
| g_mean = img_rgb[:, :, 1].mean() | |
| b_mean = img_rgb[:, :, 2].mean() | |
| brightness = (r_mean + g_mean + b_mean) / 3.0 | |
| # Very dark images (night/indoor) or very bright (overexposed) are suspicious | |
| if brightness < 15 or brightness > 240: | |
| return False, "Image too dark or too bright for flood analysis" | |
| # ββ Signal 2: spatial coherence of the predicted mask βββββββββββββββββββββ | |
| # Real flood masks = large connected regions. OOD = scattered small blobs. | |
| flood_pct = float(mask.mean() * 100) | |
| if flood_pct > 0.5: # only check if there's something to check | |
| num_labels, labels, stats, _ = cv2.connectedComponentsWithStats( | |
| mask, connectivity=8 | |
| ) | |
| if num_labels > 1: | |
| # Largest component area (excluding background label 0) | |
| component_areas = stats[1:, cv2.CC_STAT_AREA] | |
| largest_area = int(component_areas.max()) | |
| total_flood_px = int(mask.sum()) | |
| # Coherence = fraction of flood pixels in the largest blob | |
| coherence = largest_area / (total_flood_px + 1e-8) | |
| # If flood pixels are highly fragmented (coherence < 0.3 AND many components) | |
| # it's likely OOD noise | |
| if coherence < 0.25 and num_labels > 50: | |
| return False, f"Flood mask is fragmented ({num_labels} disconnected regions) β likely not a flood scene" | |
| # ββ Signal 3: implausibly high coverage βββββββββββββββββββββββββββββββββββ | |
| if flood_pct > 90: | |
| return False, f"Flood coverage {flood_pct:.1f}% is implausibly high β image may not be a flood scene" | |
| # ββ Signal 4: colour-coverage cross-check βββββββββββββββββββββββββββββββββ | |
| # High flood coverage (>40%) on an image that is NOT predominantly | |
| # blue/grey/brown (water colours) is suspicious. | |
| if flood_pct > 40: | |
| # Convert to HSV to check hue distribution | |
| img_uint8 = img_rgb.astype(np.uint8) | |
| hsv = cv2.cvtColor(img_uint8, cv2.COLOR_RGB2HSV) | |
| hue = hsv[:, :, 0] # 0-179 in OpenCV | |
| sat = hsv[:, :, 1] # 0-255 | |
| # Water/flood hues: blue (100-130), grey (any hue, low sat), brown (10-20) | |
| blue_mask = ((hue >= 100) & (hue <= 130) & (sat > 30)) | |
| grey_mask = (sat < 40) | |
| brown_mask = ((hue >= 8) & (hue <= 22) & (sat > 40)) | |
| water_px = float((blue_mask | grey_mask | brown_mask).mean() * 100) | |
| # If less than 15% of the image has water-like colours but >40% is "flooded" | |
| # β almost certainly OOD (car, building, vegetation, etc.) | |
| if water_px < 15: | |
| return False, ( | |
| f"High flood coverage ({flood_pct:.1f}%) but only {water_px:.1f}% " | |
| "water-like colours detected β image does not appear to be a flood scene" | |
| ) | |
| return True, "" | |
| # ββ Segmentation prediction ββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def predict_mask(image: Image.Image) -> tuple: | |
| """ | |
| Returns: | |
| mask : (H, W) binary float32 array (0 or 1) | |
| flood_pct : percentage of image covered by flood (0-100) | |
| confidence : float 0-100 (display percentage) | |
| low_conf : bool, True if OOD detected | |
| ood_reason : str, explanation if OOD | |
| """ | |
| meta = _load_metadata() | |
| threshold = meta.get("threshold", 0.5) | |
| model = load_seg_model() | |
| inp = preprocess_image(image) | |
| pred = model.predict(inp, verbose=0)[0, :, :, 0] # (H, W) | |
| # Raw confidence metric (how decisive the predictions are) | |
| confidence_raw = float(np.mean(np.abs(pred - 0.5)) * 2) | |
| # OOD detection using multi-signal heuristics | |
| is_valid, ood_reason = _is_flood_scene(image, pred, threshold) | |
| if not is_valid: | |
| mask = np.zeros_like(pred, dtype=np.float32) | |
| flood_pct = 0.0 | |
| low_conf = True | |
| else: | |
| mask = (pred > threshold).astype(np.float32) | |
| flood_pct = float(mask.mean() * 100) | |
| low_conf = False | |
| confidence_pct = round(confidence_raw * 100, 1) | |
| return mask, flood_pct, confidence_pct, low_conf, ood_reason | |
| # ββ Grad-CAM βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def compute_gradcam(image: Image.Image) -> np.ndarray: | |
| """ | |
| Returns an RGB uint8 heatmap overlaid on the original image. | |
| Uses conv2d_130 (from PKL) β the last conv before the output head. | |
| """ | |
| meta = _load_metadata() | |
| model = load_seg_model() | |
| inp = preprocess_image(image) | |
| target_layer = meta.get("gradcam_layer", "conv2d_130") | |
| layer_names = [l.name for l in model.layers] | |
| if target_layer not in layer_names: | |
| # fallback to last Conv2D | |
| target_layer = None | |
| for layer in reversed(model.layers): | |
| if isinstance(layer, tf.keras.layers.Conv2D): | |
| target_layer = layer.name | |
| break | |
| if target_layer is None: | |
| return np.array(image.convert("RGB").resize(IMAGE_SIZE)) | |
| grad_model = tf.keras.models.Model( | |
| inputs=model.inputs, | |
| outputs=[model.get_layer(target_layer).output, model.output] | |
| ) | |
| with tf.GradientTape() as tape: | |
| inp_tensor = tf.cast(inp, tf.float32) | |
| conv_out, predictions = grad_model(inp_tensor) | |
| loss = tf.reduce_mean(predictions) | |
| grads = tape.gradient(loss, conv_out) | |
| pooled = tf.reduce_mean(grads, axis=(0, 1, 2)) | |
| cam = tf.reduce_sum(conv_out[0] * pooled, axis=-1).numpy() | |
| cam = np.maximum(cam, 0) | |
| cam = cam / (cam.max() + 1e-8) | |
| # Smooth for the classic soft-blob look (like the reference image) | |
| cam_resized = cv2.resize(cam, IMAGE_SIZE) | |
| cam_smooth = cv2.GaussianBlur(cam_resized, (15, 15), 0) | |
| # JET: blue(low) β cyan β green β yellow β red(high) β exactly the reference | |
| heatmap_bgr = cv2.applyColorMap(np.uint8(255 * cam_smooth), cv2.COLORMAP_JET) | |
| heatmap_rgb = cv2.cvtColor(heatmap_bgr, cv2.COLOR_BGR2RGB).astype(np.float32) | |
| # Blend 55% heatmap over original β enough to see the image underneath | |
| orig = np.array(image.convert("RGB").resize(IMAGE_SIZE), dtype=np.float32) | |
| blended = (orig * 0.45 + heatmap_rgb * 0.55).clip(0, 255).astype(np.uint8) | |
| return blended | |
| # ββ ZoeDepth flood depth estimation βββββββββββββββββββββββββββββββββββββββββββ | |
| def estimate_flood_depth(image: Image.Image, mask: np.ndarray) -> dict: | |
| """ | |
| Runs ZoeDepth on the image, then analyses depth only within flooded pixels. | |
| Returns a dict with: | |
| depth_map_vis : (H, W, 3) uint8 β colourised depth map | |
| depth_overlay : (H, W, 3) uint8 β depth map masked to flood region | |
| avg_depth_m : float β mean depth in flooded area (metres) | |
| max_depth_m : float β max depth in flooded area (metres) | |
| depth_category : str β shallow / moderate / deep / very deep | |
| """ | |
| zoe = load_zoe_model() | |
| # ZoeDepth expects a PIL RGB image (any size β it handles resize internally) | |
| rgb_img = image.convert("RGB") | |
| # Run depth estimation | |
| depth_result = zoe(rgb_img) | |
| depth_pil = depth_result["depth"] # PIL Image (grayscale, float-like) | |
| depth_arr = np.array(depth_pil, dtype=np.float32) # (H, W) | |
| # Resize depth map to 512Γ512 to match mask | |
| depth_512 = cv2.resize(depth_arr, IMAGE_SIZE, interpolation=cv2.INTER_LINEAR) | |
| # ββ Normalise to [0, 1] relative scale ββββββββββββββββββββββββββββββββββββ | |
| # ZoeDepth returns relative disparity values (not calibrated metres). | |
| # We normalise across the whole image so values are comparable. | |
| d_min, d_max = depth_512.min(), depth_512.max() | |
| depth_norm = (depth_512 - d_min) / (d_max - d_min + 1e-8) # 0β1 | |
| # Map relative depth to an estimated flood depth in metres (0β3 m scale). | |
| # Higher relative depth in flooded pixels β deeper water estimate. | |
| FLOOD_DEPTH_MAX_M = 3.0 | |
| depth_metres = depth_norm * FLOOD_DEPTH_MAX_M # (H, W), values 0β3 m | |
| # Colourised full depth map (PLASMA colormap) | |
| depth_vis = cv2.applyColorMap(np.uint8(255 * depth_norm), cv2.COLORMAP_PLASMA) | |
| depth_vis = cv2.cvtColor(depth_vis, cv2.COLOR_BGR2RGB) | |
| # Depth stats within flooded pixels only | |
| flood_mask_bool = mask.astype(bool) | |
| flooded_depths = depth_metres[flood_mask_bool] | |
| if len(flooded_depths) == 0 or flood_mask_bool.sum() < 50: | |
| # Fewer than 50 flooded pixels β treat as no flood | |
| avg_depth_m = 0.0 | |
| max_depth_m = 0.0 | |
| else: | |
| avg_depth_m = float(flooded_depths.mean()) | |
| max_depth_m = float(flooded_depths.max()) | |
| # Depth category (ZoeDepth outputs relative depth in metres approx) | |
| if avg_depth_m < 0.3: | |
| depth_category = "Shallow (< 30 cm)" | |
| elif avg_depth_m < 0.8: | |
| depth_category = "Moderate (30 cm β 80 cm)" | |
| elif avg_depth_m < 1.5: | |
| depth_category = "Deep (80 cm β 1.5 m)" | |
| else: | |
| depth_category = "Very Deep (> 1.5 m β life-threatening)" | |
| # Flood-region depth overlay: blend depth_vis with blue flood mask | |
| flood_depth_overlay = depth_vis.copy() | |
| # Highlight non-flooded areas in grey | |
| grey = np.full_like(depth_vis, 128) | |
| flood_depth_overlay[~flood_mask_bool] = grey[~flood_mask_bool] | |
| return { | |
| "depth_map_vis": depth_vis, # (H, W, 3) uint8 | |
| "depth_overlay": flood_depth_overlay, # (H, W, 3) uint8 | |
| "avg_depth_m": round(avg_depth_m, 3), | |
| "max_depth_m": round(max_depth_m, 3), | |
| "depth_category": depth_category, | |
| } | |
| # ββ Risk assessment (uses thresholds from PKL + ZoeDepth depth) βββββββββββββββ | |
| def assess_risk(flood_pct: float, avg_depth_m: float = 0.0) -> dict: | |
| """ | |
| Combined risk from flood coverage % AND average depth from ZoeDepth. | |
| Thresholds from PKL: | |
| low : flood_pct < 15 AND avg_depth_m < 0.80 | |
| medium : flood_pct < 35 AND avg_depth_m < 1.50 | |
| high : flood_pct < 60 AND avg_depth_m < 2.20 | |
| critical : above all | |
| """ | |
| meta = _load_metadata() | |
| thresholds = meta.get("risk_thresholds", { | |
| "low": {"max_flood_pct": 15, "max_avg_depth_m": 80}, | |
| "medium": {"max_flood_pct": 35, "max_avg_depth_m": 150}, | |
| "high": {"max_flood_pct": 60, "max_avg_depth_m": 220}, | |
| "critical": {"above_all": True}, | |
| }) | |
| # PKL stores depth in cm β convert to metres for comparison | |
| low_pct = thresholds.get("low", {}).get("max_flood_pct", 15) | |
| med_pct = thresholds.get("medium", {}).get("max_flood_pct", 35) | |
| high_pct = thresholds.get("high", {}).get("max_flood_pct", 60) | |
| # Depth thresholds in metres (realistic flood water levels) | |
| low_dep = 0.3 # < 30 cm β Low | |
| med_dep = 0.8 # < 80 cm β Moderate | |
| high_dep = 1.5 # < 150 cm β High (> 1.5 m β Critical) | |
| # Depth only contributes to risk when there is meaningful flood coverage. | |
| # If flood_pct < 2%, ignore depth entirely (noise / false positives). | |
| effective_depth = avg_depth_m if flood_pct >= 2.0 else 0.0 | |
| def _level_from_pct(p): | |
| if p < low_pct: return 0 | |
| if p < med_pct: return 1 | |
| if p < high_pct: return 2 | |
| return 3 | |
| def _level_from_dep(d): | |
| if d < low_dep: return 0 | |
| if d < med_dep: return 1 | |
| if d < high_dep: return 2 | |
| return 3 | |
| level_idx = max(_level_from_pct(flood_pct), _level_from_dep(effective_depth)) | |
| levels = ["Low", "Moderate", "High", "Critical"] | |
| colours = ["#2ecc71", "#f39c12", "#e67e22", "#e74c3c"] | |
| level = levels[level_idx] | |
| colour = colours[level_idx] | |
| # Score: flood coverage drives 70%, depth drives 30% | |
| pct_score = min(flood_pct / 100 * 100, 100) | |
| depth_score = min(effective_depth / 3.0 * 100, 100) # 3 m = max realistic scale | |
| score = round(pct_score * 0.7 + depth_score * 0.3, 1) | |
| recs_map = { | |
| "Low": [ | |
| "Monitor water levels periodically.", | |
| "Ensure drainage channels are clear.", | |
| "No immediate evacuation required.", | |
| ], | |
| "Moderate": [ | |
| "Alert local emergency services.", | |
| "Move valuables to higher ground.", | |
| "Prepare emergency kit.", | |
| "Monitor weather forecasts closely.", | |
| ], | |
| "High": [ | |
| "Initiate partial evacuation of vulnerable populations.", | |
| "Deploy flood barriers where possible.", | |
| "Activate emergency response teams.", | |
| "Avoid flooded roads and areas.", | |
| ], | |
| "Critical": [ | |
| "IMMEDIATE EVACUATION REQUIRED.", | |
| "Contact emergency services (911 / local disaster hotline).", | |
| "Do not attempt to cross flooded areas.", | |
| "Seek shelter on highest available ground.", | |
| "Follow official evacuation routes only.", | |
| ], | |
| } | |
| deployed_metrics = meta.get("metrics", {}).get("deployed", {}) | |
| return { | |
| "risk_level": level, | |
| "risk_score": score, | |
| "colour": colour, | |
| "flood_pct": round(flood_pct, 2), | |
| "avg_depth_m": round(avg_depth_m, 3), | |
| "recommendations": recs_map[level], | |
| "model_metrics": deployed_metrics, | |
| } | |
| # ββ Full pipeline ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def run_pipeline(image: Image.Image) -> dict: | |
| """ | |
| End-to-end: | |
| 1. Attention UNet β flood mask (with confidence gate) | |
| 2. Grad-CAM β explainability heatmap | |
| 3. ZoeDepth β per-pixel depth map, flood depth stats | |
| 4. Risk assessment β level, score, recommendations | |
| """ | |
| # Step 1 β segmentation + OOD detection | |
| mask, flood_pct, confidence, low_conf, ood_reason = predict_mask(image) | |
| # Step 2 β Grad-CAM | |
| gradcam_img = compute_gradcam(image) | |
| # Step 3 β ZoeDepth depth estimation | |
| depth_info = estimate_flood_depth(image, mask) | |
| # Step 4 β risk | |
| risk = assess_risk(flood_pct, avg_depth_m=depth_info["avg_depth_m"]) | |
| # Add OOD info to risk dict for display | |
| risk["confidence"] = confidence | |
| risk["low_conf"] = low_conf | |
| risk["warning"] = ( | |
| f"β οΈ Out-of-distribution image detected: {ood_reason}. " | |
| "Results suppressed." | |
| ) if low_conf else "" | |
| # Blue-tinted flood overlay on original | |
| orig_arr = np.array(image.convert("RGB").resize(IMAGE_SIZE), dtype=np.uint8) | |
| mask_3ch = np.stack([mask * 0, mask * 100, mask * 255], axis=-1).astype(np.uint8) | |
| overlay = cv2.addWeighted(orig_arr, 0.7, mask_3ch, 0.3, 0) | |
| return { | |
| "mask": mask, | |
| "overlay": overlay, | |
| "gradcam": gradcam_img, | |
| "depth_map": depth_info["depth_map_vis"], | |
| "depth_overlay": depth_info["depth_overlay"], | |
| "depth_info": depth_info, | |
| "risk": risk, | |
| } | |