import numpy as np
import matplotlib.pyplot as plt
import os
from sklearn.model_selection import train_test_split
# Generate synthetic terrain and rainfall data
def generate_data(num_samples=100, img_size=128, seed=None):
    """Generate synthetic terrain/rainfall inputs and flood-risk targets.

    Args:
        num_samples: Number of (input, target) pairs to create.
        img_size: Height/width of each square map.
        seed: Optional RNG seed for reproducible data. The default (None)
            keeps the original unseeded, nondeterministic behavior.

    Returns:
        Tuple ``(X, Y)`` where
            X: float array of shape (num_samples, img_size, img_size, 2);
               channel 0 is terrain in [0, 1), channel 1 is rainfall in [0, 0.5).
            Y: float array of shape (num_samples, img_size, img_size);
               simulated flood risk = clip(terrain + rainfall, 0, 1).
    """
    rng = np.random.default_rng(seed)
    # Vectorized: draw every sample at once instead of a Python loop with
    # repeated list appends — same distributions as the original rand() calls.
    terrain = rng.random((num_samples, img_size, img_size))
    rainfall = rng.random((num_samples, img_size, img_size)) * 0.5
    # Stack the two fields as input channels; the simulated risk is their
    # clipped sum (sum < 1.5, so clip only guards the upper bound).
    X = np.stack([terrain, rainfall], axis=-1)
    Y = np.clip(terrain + rainfall, 0.0, 1.0)
    return X, Y
# Generate 200 synthetic (terrain+rainfall input, flood-risk target) samples.
X, Y = generate_data(200)
# Split data into training and testing sets (80/20). random_state pins the
# shuffle so train/test membership is reproducible across runs.
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, random_state=42)
import tensorflow as tf
from tensorflow.keras import layers, Model
# Define the UNet model
def unet_model(input_shape=(128, 128, 2)):
    """Build a small U-Net mapping an (H, W, 2) input to an (H, W, 1) sigmoid map.

    Args:
        input_shape: Spatial size and channel count of the input; H and W must
            be divisible by 4 (two 2x2 pooling stages).

    Returns:
        An uncompiled ``tf.keras.Model``.
    """
    inputs = layers.Input(shape=input_shape)

    # Encoder: two conv stages, each halving the spatial resolution.
    c1 = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(inputs)
    c1 = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(c1)
    p1 = layers.MaxPooling2D((2, 2))(c1)

    c2 = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(p1)
    c2 = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(c2)
    p2 = layers.MaxPooling2D((2, 2))(c2)

    # Bottleneck at 1/4 resolution.
    b = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(p2)
    b = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(b)

    # Decoder with skip connections: concatenate the matching encoder features
    # after each upsample. Skips are the defining feature of a U-Net and were
    # missing from the original (which was a plain encoder-decoder).
    u2 = layers.UpSampling2D((2, 2))(b)
    u2 = layers.Concatenate()([u2, c2])
    c3 = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(u2)
    c3 = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(c3)

    u1 = layers.UpSampling2D((2, 2))(c3)
    u1 = layers.Concatenate()([u1, c1])
    c4 = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(u1)
    c4 = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(c4)

    # 1x1 conv + sigmoid keeps the per-pixel risk output in [0, 1].
    outputs = layers.Conv2D(1, (1, 1), activation='sigmoid')(c4)
    return Model(inputs, outputs)
# Compile the model: MSE regression loss on per-pixel risk values. Track MAE
# rather than 'accuracy' — classification accuracy is meaningless for
# continuous regression targets and was previously reported misleadingly.
model = unet_model()
model.compile(optimizer='adam', loss='mse', metrics=['mae'])
model.summary()

# Train for 10 epochs, validating on the held-out split each epoch.
history = model.fit(X_train, Y_train, epochs=10, batch_size=16,
                    validation_data=(X_test, Y_test))
import numpy as np
import gradio as gr
import tensorflow as tf
from tensorflow.keras import layers, Model
# Define the UNet model
# NOTE(review): this re-definition duplicates the earlier unet_model — the file
# looks like two concatenated notebook scripts; consider keeping one copy.
def unet_model(input_shape=(128, 128, 2)):
    """Build a small U-Net mapping an (H, W, 2) input to an (H, W, 1) sigmoid map.

    Args:
        input_shape: Spatial size and channel count of the input; H and W must
            be divisible by 4 (two 2x2 pooling stages).

    Returns:
        An uncompiled ``tf.keras.Model``.
    """
    inputs = layers.Input(shape=input_shape)

    # Encoder: two conv stages, each halving the spatial resolution.
    c1 = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(inputs)
    c1 = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(c1)
    p1 = layers.MaxPooling2D((2, 2))(c1)

    c2 = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(p1)
    c2 = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(c2)
    p2 = layers.MaxPooling2D((2, 2))(c2)

    # Bottleneck at 1/4 resolution.
    b = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(p2)
    b = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(b)

    # Decoder with skip connections: concatenate the matching encoder features
    # after each upsample. Skips are the defining feature of a U-Net and were
    # missing from the original (which was a plain encoder-decoder).
    u2 = layers.UpSampling2D((2, 2))(b)
    u2 = layers.Concatenate()([u2, c2])
    c3 = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(u2)
    c3 = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(c3)

    u1 = layers.UpSampling2D((2, 2))(c3)
    u1 = layers.Concatenate()([u1, c1])
    c4 = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(u1)
    c4 = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(c4)

    # 1x1 conv + sigmoid keeps the per-pixel risk output in [0, 1].
    outputs = layers.Conv2D(1, (1, 1), activation='sigmoid')(c4)
    return Model(inputs, outputs)
# Create and compile the model for the Gradio app. Track MAE instead of
# 'accuracy', which is meaningless for continuous regression outputs.
# NOTE(review): the weights here are freshly initialized — no checkpoint is
# loaded, so predictions from this instance are untrained output. Load saved
# weights (e.g. model.load_weights(...)) before serving real predictions.
model = unet_model()
model.compile(optimizer='adam', loss='mse', metrics=['mae'])
# Gradio function for prediction with proper error handling
def predict_flood(terrain, rainfall):
    """Predict a flood-risk map from terrain and rainfall images.

    Args:
        terrain: (128, 128) grayscale NumPy array; a trailing singleton
            channel, i.e. (128, 128, 1), is also accepted.
        rainfall: Same shape requirements as ``terrain``.

    Returns:
        (128, 128) uint8 array scaled to 0-255. On any error a black
        (all-zero) image is returned so the Gradio UI always gets an output.
    """
    try:
        # Ensure the inputs are numpy arrays
        if not isinstance(terrain, np.ndarray) or not isinstance(rainfall, np.ndarray):
            raise ValueError("Both terrain and rainfall must be NumPy arrays.")

        # Tolerate a trailing singleton channel, e.g. (128, 128, 1), which
        # some image pipelines emit even for grayscale; genuinely wrong
        # shapes still fail the check below.
        terrain = np.squeeze(terrain)
        rainfall = np.squeeze(rainfall)
        if terrain.shape != (128, 128) or rainfall.shape != (128, 128):
            raise ValueError("Both terrain and rainfall images must be of shape (128, 128).")

        # Normalize 0-255 pixel values to [0, 1].
        # NOTE(review): assumes uint8 input; floats already in [0, 1] would be
        # shrunk by this division — confirm what Gradio actually delivers.
        terrain = terrain.astype(np.float32) / 255.0
        rainfall = rainfall.astype(np.float32) / 255.0

        # Batch of one, two channels: (1, 128, 128, 2).
        input_data = np.dstack([terrain, rainfall])[np.newaxis, ...]

        # Debug: Print input data shape and min/max values
        print(f"Input data shape: {input_data.shape}")
        print(f"Terrain min/max: {terrain.min()}/{terrain.max()}")
        print(f"Rainfall min/max: {rainfall.min()}/{rainfall.max()}")

        # Run the model; take the first (only) batch element and drop the
        # trailing channel axis.
        prediction = model.predict(input_data)[0].squeeze()

        # Debug: Print prediction shape and min/max values
        print(f"Prediction shape: {prediction.shape}")
        print(f"Prediction min/max: {prediction.min()}/{prediction.max()}")

        if prediction.shape != (128, 128):
            raise ValueError("Model output is not of expected shape (128, 128).")

        # Sigmoid output in [0, 1] -> displayable 8-bit image.
        return (prediction * 255).astype(np.uint8)
    except Exception as e:
        # Boundary-level broad except is deliberate: log the failure and keep
        # the Gradio worker alive by returning a black placeholder image.
        print(f"Error during prediction: {e}")
        return np.zeros((128, 128), dtype=np.uint8)
# Wire the prediction function into a simple Gradio UI: two grayscale image
# inputs (terrain and rainfall maps) and one predicted risk-map output.
terrain_input = gr.Image(type="numpy", label="Terrain Map", image_mode='L')
rainfall_input = gr.Image(type="numpy", label="Rainfall Map", image_mode='L')
risk_output = gr.Image(label="Predicted Flood Risk Map", type="numpy")

iface = gr.Interface(
    fn=predict_flood,
    inputs=[terrain_input, rainfall_input],
    outputs=risk_output,
    title="Flood Risk Prediction",
    description="Upload terrain and rainfall maps to predict flood risk.",
)
iface.launch()