File size: 6,578 Bytes
7955824
 
06e9ccf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7955824
06e9ccf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7955824
06e9ccf
 
7955824
06e9ccf
 
 
7955824
06e9ccf
 
 
 
7955824
06e9ccf
 
 
7955824
06e9ccf
 
 
7955824
06e9ccf
 
 
 
7955824
06e9ccf
 
 
7955824
06e9ccf
7955824
 
06e9ccf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7955824
06e9ccf
 
 
 
 
7955824
06e9ccf
7955824
 
06e9ccf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
import numpy as np
import matplotlib.pyplot as plt
import os
from sklearn.model_selection import train_test_split

# Generate synthetic terrain and rainfall data
def generate_data(num_samples=100, img_size=128):
    X = []  # Input data: terrain maps
    Y = []  # Output data: flood risk maps
    for _ in range(num_samples):
        # Generate synthetic terrain (random elevation patterns)
        terrain = np.random.rand(img_size, img_size)

        # Generate rainfall patterns
        rainfall = np.random.rand(img_size, img_size) * 0.5

        # Combine terrain and rainfall to simulate flood risk
        flood_risk = np.clip(terrain + rainfall, 0, 1)

        X.append(np.dstack([terrain, rainfall]))  # Stack terrain + rainfall as input channels
        Y.append(flood_risk)  # Flood risk map as output

    X = np.array(X)
    Y = np.array(Y)
    return X, Y

# Generate data
X, Y = generate_data(200)

# Split data into training and testing sets
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, random_state=42)


import tensorflow as tf
from tensorflow.keras import layers, Model

# Define the UNet model
def unet_model(input_shape=(128, 128, 2)):
    inputs = layers.Input(shape=input_shape)

    # Encoder
    c1 = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(inputs)
    c1 = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(c1)
    p1 = layers.MaxPooling2D((2, 2))(c1)

    c2 = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(p1)
    c2 = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(c2)
    p2 = layers.MaxPooling2D((2, 2))(c2)

    # Bottleneck
    b = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(p2)
    b = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(b)

    # Decoder
    u2 = layers.UpSampling2D((2, 2))(b)
    c3 = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(u2)
    c3 = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(c3)

    u1 = layers.UpSampling2D((2, 2))(c3)
    c4 = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(u1)
    c4 = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(c4)

    outputs = layers.Conv2D(1, (1, 1), activation='sigmoid')(c4)
    return Model(inputs, outputs)

# Compile the model
model = unet_model()
model.compile(optimizer='adam', loss='mse', metrics=['accuracy'])
model.summary()


# Train the model
history = model.fit(X_train, Y_train, epochs=10, batch_size=16, validation_data=(X_test, Y_test))

import numpy as np
import gradio as gr
import tensorflow as tf
from tensorflow.keras import layers, Model

# Define the UNet model
def unet_model(input_shape=(128, 128, 2)):
    inputs = layers.Input(shape=input_shape)

    # Encoder
    c1 = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(inputs)
    c1 = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(c1)
    p1 = layers.MaxPooling2D((2, 2))(c1)

    c2 = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(p1)
    c2 = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(c2)
    p2 = layers.MaxPooling2D((2, 2))(c2)

    # Bottleneck
    b = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(p2)
    b = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(b)

    # Decoder
    u2 = layers.UpSampling2D((2, 2))(b)
    c3 = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(u2)
    c3 = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(c3)

    u1 = layers.UpSampling2D((2, 2))(c3)
    c4 = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(u1)
    c4 = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(c4)

    outputs = layers.Conv2D(1, (1, 1), activation='sigmoid')(c4)
    return Model(inputs, outputs)

# Create and compile the model
model = unet_model()
model.compile(optimizer='adam', loss='mse', metrics=['accuracy'])

# Gradio function for prediction with proper error handling
def predict_flood(terrain, rainfall):
    try:
        # Ensure the inputs are numpy arrays
        if not isinstance(terrain, np.ndarray) or not isinstance(rainfall, np.ndarray):
            raise ValueError("Both terrain and rainfall must be NumPy arrays.")

        # Check if the input images are of correct shape
        if terrain.shape != (128, 128) or rainfall.shape != (128, 128):
            raise ValueError("Both terrain and rainfall images must be of shape (128, 128).")

        # Normalize the images to [0, 1]
        terrain = terrain.astype(np.float32) / 255.0
        rainfall = rainfall.astype(np.float32) / 255.0

        # Stack terrain and rainfall into a 2-channel input (128x128x2)
        input_data = np.dstack([terrain, rainfall])  # Shape: (128, 128, 2)
        input_data = input_data.reshape(1, 128, 128, 2)  # Shape: (1, 128, 128, 2)

        # Debug: Print input data shape and min/max values
        print(f"Input data shape: {input_data.shape}")
        print(f"Terrain min/max: {terrain.min()}/{terrain.max()}")
        print(f"Rainfall min/max: {rainfall.min()}/{rainfall.max()}")

        # Make prediction using the model
        prediction = model.predict(input_data)[0].squeeze()  # Get the first prediction

        # Debug: Print prediction shape and min/max values
        print(f"Prediction shape: {prediction.shape}")
        print(f"Prediction min/max: {prediction.min()}/{prediction.max()}")

        # Check if prediction is in expected range
        if prediction.shape != (128, 128):
            raise ValueError("Model output is not of expected shape (128, 128).")

        # Rescale prediction to range [0, 255] for image output
        prediction = (prediction * 255).astype(np.uint8)  # Convert prediction to uint8 for display

        return prediction

    except Exception as e:
        # Handle any exceptions and print the error
        print(f"Error during prediction: {e}")
        # Return a black image in case of error (debugging step)
        return np.zeros((128, 128), dtype=np.uint8)

# Launch Gradio app with error handling in place
iface = gr.Interface(
    fn=predict_flood,
    inputs=[
        gr.Image(type="numpy", label="Terrain Map", image_mode='L'),  # Input: Terrain Map (grayscale)
        gr.Image(type="numpy", label="Rainfall Map", image_mode='L')  # Input: Rainfall Map (grayscale)
    ],
    outputs=gr.Image(label="Predicted Flood Risk Map", type="numpy"),  # Output: Flood Risk Map
    title="Flood Risk Prediction",
    description="Upload terrain and rainfall maps to predict flood risk."
)

iface.launch()