Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import numpy as np | |
| import cv2 | |
| from pathlib import Path | |
| from typing import Tuple, Optional | |
| import os | |
| from models.cloud_detector import CloudDetector | |
| from models.change_detector import ChangeDetector | |
| from utils.preprocessing import preprocess_image, mask_clouds | |
| from utils.visualization import create_overlay, visualize_predictions | |
| from utils.evaluation import calculate_metrics | |
| from utils.metrics import compare_with_without_masking, calculate_change_statistics | |
| # Initialize models | |
| device = "cuda" if os.environ.get("CUDA_VISIBLE_DEVICES") else "cpu" | |
| cloud_detector = CloudDetector(device=device) | |
| change_detector = ChangeDetector(device=device) | |
| def load_example_images(): | |
| """Load example images from examples directory.""" | |
| examples_dir = Path("examples") | |
| examples = [] | |
| before_files = sorted( | |
| list((examples_dir / "before").glob("*.png")) + | |
| list((examples_dir / "before").glob("*.jpg")) | |
| ) | |
| after_files = sorted( | |
| list((examples_dir / "after").glob("*.png")) + | |
| list((examples_dir / "after").glob("*.jpg")) | |
| ) | |
| for before_file, after_file in zip(before_files, after_files): | |
| before = cv2.imread(str(before_file)) | |
| after = cv2.imread(str(after_file)) | |
| if before is not None and after is not None: | |
| before = cv2.cvtColor(before, cv2.COLOR_BGR2RGB) | |
| after = cv2.cvtColor(after, cv2.COLOR_BGR2RGB) | |
| examples.append([before, after]) | |
| return examples | |
| def detect_clouds_in_image( | |
| image: np.ndarray, | |
| cloud_threshold: float = 0.5 | |
| ) -> Tuple[np.ndarray, str]: | |
| """ | |
| Detect clouds in a single image. | |
| Args: | |
| image: Input image (H, W, 3) | |
| cloud_threshold: Confidence threshold | |
| Returns: | |
| Tuple of (overlay_image, stats_text) | |
| """ | |
| if image is None: | |
| return None, "Please upload an image." | |
| # Preprocess (normalise to float [0,1]) | |
| preprocessed = preprocess_image(image, normalize=True) | |
| # Detect clouds β returns 2D mask and 2D confidence map | |
| cloud_mask, cloud_confidence = cloud_detector.detect_clouds( | |
| preprocessed, | |
| threshold=cloud_threshold | |
| ) | |
| # Create visualization overlay on original image | |
| overlay = create_overlay(image, cloud_mask, alpha=0.5, color=(0, 0, 255)) | |
| # Statistics β all values are now properly 2D arrays | |
| total_pixels = int(cloud_mask.size) | |
| cloud_pixels = int(np.sum(cloud_mask)) | |
| cloud_pct = 100.0 * cloud_pixels / total_pixels if total_pixels > 0 else 0.0 | |
| mean_conf = float(cloud_confidence.mean()) | |
| max_conf = float(cloud_confidence.max()) | |
| min_conf = float(cloud_confidence.min()) | |
| stats_text = ( | |
| f"Cloud Detection Results:\n" | |
| f"βββββββββββββββββββββ\n" | |
| f"Cloud Pixels: {cloud_pixels}\n" | |
| f"Total Pixels: {total_pixels}\n" | |
| f"Cloud Percentage: {cloud_pct:.2f}%\n" | |
| f"Mean Confidence: {mean_conf:.4f}\n" | |
| f"Max Confidence: {max_conf:.4f}\n" | |
| f"Min Confidence: {min_conf:.4f}" | |
| ) | |
| return overlay, stats_text | |
| def detect_changes( | |
| before_image: np.ndarray, | |
| after_image: np.ndarray, | |
| apply_cloud_masking: bool = True, | |
| cloud_threshold: float = 0.5, | |
| change_threshold: float = 0.5 | |
| ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, str, str]: | |
| """ | |
| Detect changes between two temporal images. | |
| Returns: | |
| Tuple of (before_overlay, after_overlay, change_mask_vis, | |
| metrics_text, stats_text) | |
| """ | |
| if before_image is None or after_image is None: | |
| empty = np.zeros((224, 224, 3), dtype=np.uint8) | |
| return empty, empty, empty, "Please upload both images.", "" | |
| # Resize both to the same size before processing | |
| TARGET = (512, 512) | |
| before_image = cv2.resize(before_image, TARGET, interpolation=cv2.INTER_LINEAR) | |
| after_image = cv2.resize(after_image, TARGET, interpolation=cv2.INTER_LINEAR) | |
| # Preprocess to float [0,1] | |
| before_preprocessed = preprocess_image(before_image, normalize=True) | |
| after_preprocessed = preprocess_image(after_image, normalize=True) | |
| cloud_mask = None | |
| if apply_cloud_masking: | |
| cloud_mask_before, _ = cloud_detector.detect_clouds( | |
| before_preprocessed, threshold=cloud_threshold | |
| ) | |
| cloud_mask_after, _ = cloud_detector.detect_clouds( | |
| after_preprocessed, threshold=cloud_threshold | |
| ) | |
| # Combined cloud mask (union of both) | |
| cloud_mask = np.logical_or(cloud_mask_before, cloud_mask_after).astype(np.uint8) | |
| before_masked = mask_clouds(before_preprocessed, cloud_mask, fill_value=0.0) | |
| after_masked = mask_clouds(after_preprocessed, cloud_mask, fill_value=0.0) | |
| else: | |
| before_masked = before_preprocessed | |
| after_masked = after_preprocessed | |
| # Detect changes β now returns proper 2D arrays | |
| change_mask, change_confidence = change_detector.detect_changes( | |
| before_masked, | |
| after_masked, | |
| threshold=change_threshold | |
| ) | |
| # Overlays on original images | |
| before_overlay = create_overlay(before_image, change_mask, alpha=0.5, color=(255, 0, 0)) | |
| after_overlay = create_overlay(after_image, change_mask, alpha=0.5, color=(255, 0, 0)) | |
| if cloud_mask is not None: | |
| cloud_overlay_before = create_overlay(before_image, cloud_mask, alpha=0.4, color=(0, 0, 255)) | |
| cloud_overlay_after = create_overlay(after_image, cloud_mask, alpha=0.4, color=(0, 0, 255)) | |
| before_overlay = (before_overlay * 0.5 + cloud_overlay_before * 0.5).astype(np.uint8) | |
| after_overlay = (after_overlay * 0.5 + cloud_overlay_after * 0.5).astype(np.uint8) | |
| # Change mask visualisation (white = changed) | |
| change_mask_vis = (change_mask * 255).astype(np.uint8) | |
| change_mask_vis = cv2.cvtColor(change_mask_vis, cv2.COLOR_GRAY2RGB) | |
| # Statistics from 2D arrays β all values are valid now | |
| stats = calculate_change_statistics(change_mask, change_confidence) | |
| metrics_text = ( | |
| f"Change Detection Metrics:\n" | |
| f"βββββββββββββββββββββββββ\n" | |
| f"Mean Confidence: {float(change_confidence.mean()):.4f}\n" | |
| f"Max Confidence: {float(change_confidence.max()):.4f}\n" | |
| f"Min Confidence: {float(change_confidence.min()):.4f}\n" | |
| f"Algorithm: Siamese ViT\n" | |
| f"Cloud Masking: {'Yes' if apply_cloud_masking else 'No'}" | |
| ) | |
| # Safe access to change_confidence_mean | |
| if stats["changed_pixels"] > 0: | |
| change_conf_line = ( | |
| f"Change Region Confidence: {stats['change_confidence_mean']:.4f}" | |
| ) | |
| else: | |
| change_conf_line = "No changes detected above threshold" | |
| stats_text = ( | |
| f"Change Statistics:\n" | |
| f"ββββββββββββββββββ\n" | |
| f"Total Pixels: {stats['total_pixels']}\n" | |
| f"Changed Pixels: {stats['changed_pixels']}\n" | |
| f"Unchanged Pixels: {stats['unchanged_pixels']}\n" | |
| f"Change Percentage: {stats['change_percentage']:.2f}%\n" | |
| f"Mean Confidence: {stats['mean_confidence']:.4f}\n" | |
| f"Min Confidence: {stats['min_confidence']:.4f}\n" | |
| f"Max Confidence: {stats['max_confidence']:.4f}\n" | |
| f"{change_conf_line}" | |
| ) | |
| return before_overlay, after_overlay, change_mask_vis, metrics_text, stats_text | |
| def create_comparison_interface(): | |
| """Create Gradio interface for change detection comparison.""" | |
| with gr.Blocks(title="Satellite Change Detector") as demo: | |
| gr.Markdown( | |
| """ | |
| # Satellite Change Detection System | |
| Detect changes in Sentinel-2 satellite imagery using Vision Transformer models. | |
| Compare results with and without cloud masking. | |
| """ | |
| ) | |
| with gr.Tabs(): | |
| # ββ Cloud Detection Tab ββββββββββββββββββββββββββββββββββββββββββ | |
| with gr.Tab("Cloud Detection"): | |
| gr.Markdown("### Detect and visualize clouds in satellite imagery") | |
| with gr.Row(): | |
| with gr.Column(): | |
| cloud_input = gr.Image(label="Input Image", type="numpy") | |
| cloud_threshold = gr.Slider( | |
| 0, 1, value=0.5, step=0.01, | |
| label="Cloud Detection Threshold" | |
| ) | |
| cloud_detect_btn = gr.Button("Detect Clouds") | |
| with gr.Column(): | |
| cloud_overlay_output = gr.Image(label="Cloud Detection Result") | |
| cloud_stats_output = gr.Textbox(label="Statistics", lines=8) | |
| cloud_detect_btn.click( | |
| detect_clouds_in_image, | |
| inputs=[cloud_input, cloud_threshold], | |
| outputs=[cloud_overlay_output, cloud_stats_output] | |
| ) | |
| # ββ Change Detection Tab βββββββββββββββββββββββββββββββββββββββββ | |
| with gr.Tab("Change Detection"): | |
| gr.Markdown("### Detect changes between two temporal satellite images") | |
| with gr.Row(): | |
| with gr.Column(): | |
| before_img = gr.Image(label="Before Image", type="numpy") | |
| after_img = gr.Image(label="After Image", type="numpy") | |
| with gr.Column(): | |
| gr.Markdown("### Settings") | |
| apply_masking = gr.Checkbox( | |
| value=True, | |
| label="Apply Cloud Masking" | |
| ) | |
| cloud_thresh = gr.Slider( | |
| 0, 1, value=0.5, step=0.01, | |
| label="Cloud Threshold" | |
| ) | |
| change_thresh = gr.Slider( | |
| 0, 1, value=0.5, step=0.01, | |
| label="Change Threshold" | |
| ) | |
| detect_btn = gr.Button("Detect Changes", size="lg") | |
| with gr.Row(): | |
| before_overlay_output = gr.Image(label="Before with Changes") | |
| after_overlay_output = gr.Image(label="After with Changes") | |
| with gr.Row(): | |
| change_mask_output = gr.Image(label="Change Mask") | |
| metrics_output = gr.Textbox(label="Metrics", lines=8) | |
| stats_output = gr.Textbox(label="Change Statistics", lines=10) | |
| detect_btn.click( | |
| detect_changes, | |
| inputs=[before_img, after_img, apply_masking, cloud_thresh, change_thresh], | |
| outputs=[ | |
| before_overlay_output, | |
| after_overlay_output, | |
| change_mask_output, | |
| metrics_output, | |
| stats_output | |
| ] | |
| ) | |
| # ββ Examples Tab βββββββββββββββββββββββββββββββββββββββββββββββββ | |
| with gr.Tab("Examples"): | |
| gr.Markdown("### Pre-loaded example images") | |
| examples = load_example_images() | |
| if examples: | |
| for idx, (before, after) in enumerate(examples[:3]): | |
| with gr.Row(): | |
| gr.Image(value=before, label=f"Example {idx+1}: Before") | |
| gr.Image(value=after, label=f"Example {idx+1}: After") | |
| else: | |
| gr.Markdown( | |
| "No example images found in `examples/` directory.\n" | |
| "Run `python setup_oscd.py` to download OSCD samples." | |
| ) | |
| gr.Markdown( | |
| """ | |
| ## About | |
| This application uses Vision Transformer (ViT) models for: | |
| - **Cloud Detection**: Identifies and masks cloud cover in satellite imagery | |
| - **Change Detection**: Detects land cover changes between multi-temporal observations | |
| Models are fine-tuned on Sentinel-2 satellite data. | |
| """ | |
| ) | |
| return demo | |
| if __name__ == "__main__": | |
| demo = create_comparison_interface() | |
| demo.launch(share=True) | |