|
|
import os |
|
|
import numpy as np |
|
|
import tensorflow as tf |
|
|
from PIL import Image |
|
|
from pathlib import Path |
|
|
from tensorflow.keras.applications import ConvNeXtLarge |
|
|
import streamlit as st |
|
|
import io |
|
|
from huggingface_hub import hf_hub_download, try_to_load_from_cache |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def download_model_from_hub():
    """Fetch the model file from the Hugging Face Hub via the default cache.

    Progress and the outcome are reported through Streamlit status messages.

    Returns:
        The local path returned by ``hf_hub_download`` for the model file.

    Raises:
        Exception: Any error raised during the download is shown in the UI
            (together with a troubleshooting hint) and then re-raised as-is.
    """
    hub_repo = "Darshan03/convnext_volcano_detector"
    weights_file = "model.h5"

    try:
        st.info("Downloading model from Hugging Face Hub (if not already cached)...")
        cached_path = hf_hub_download(repo_id=hub_repo, filename=weights_file)
        st.success(f"Model file available locally at: {cached_path}")
    except Exception as e:
        st.error(f"Error downloading model from Hugging Face Hub: {str(e)}")
        st.info("Please check the repo ID and filename, and ensure the repository is public or Space has access.")
        # Re-raise so the caller can decide how to continue without a model.
        raise

    return cached_path
|
|
|
|
|
def create_convnext_model(input_shape=(512, 512, 3)):
    """Build the ConvNeXt-based binary classifier architecture.

    A frozen ``ConvNeXtLarge`` backbone (ImageNet weights, no top) is followed
    by global average pooling and three dropout + dense stages ending in a
    single sigmoid unit.

    Args:
        input_shape: Input shape passed to ``ConvNeXtLarge``.

    Returns:
        The assembled ``tf.keras.Sequential`` model.
    """
    backbone = ConvNeXtLarge(
        include_top=False,
        weights='imagenet',
        input_shape=input_shape,
    )
    # The backbone is frozen; only the head layers remain trainable.
    backbone.trainable = False

    keras_layers = tf.keras.layers
    stack = [backbone, keras_layers.GlobalAveragePooling2D()]

    # Each head stage is Dropout(0.3) followed by a Dense layer. Layers are
    # created in the same order as they appear in the model.
    for units, activation in ((512, 'relu'), (256, 'relu'), (1, 'sigmoid')):
        stack.append(keras_layers.Dropout(0.3))
        stack.append(keras_layers.Dense(units, activation=activation))

    return tf.keras.Sequential(stack)
|
|
|
|
|
|
|
|
@st.cache_resource
def load_model(model_path):
    """Build the ConvNeXt model and load its weights from ``model_path``.

    The function is wrapped in ``st.cache_resource``.

    Args:
        model_path: Path of the weights file handed to ``load_weights``.

    Returns:
        The model with the weights loaded.

    Raises:
        Exception: Any error raised while building the model or loading the
            weights is shown in the UI and then re-raised as-is.
    """
    try:
        net = create_convnext_model()
        net.load_weights(model_path)
    except Exception as e:
        st.error(f"Error loading model weights: {str(e)}")
        st.info("Ensure the downloaded file is a valid Keras .h5 weights file compatible with the ConvNeXtLarge architecture.")
        raise
    else:
        return net
|
|
|
|
|
def preprocess_image(image, target_size=(512, 512)):
    """Turn a PIL image into a normalised, batched array for inference.

    The image is converted to RGB if needed, resized with bicubic resampling,
    scaled to the [0, 1] range, normalised per channel with fixed mean/std
    constants, and given a leading batch axis of length 1.

    Args:
        image: Image object exposing ``mode``, ``convert`` and ``resize``.
        target_size: Size passed to ``image.resize``.

    Returns:
        A float32 NumPy array with a leading batch axis.

    Raises:
        Exception: Any error during preprocessing is shown in the UI and then
            re-raised as-is.
    """
    try:
        rgb = image if image.mode == 'RGB' else image.convert('RGB')
        resized = rgb.resize(target_size, Image.Resampling.BICUBIC)

        # Scale 8-bit pixel values into [0, 1].
        pixels = np.array(resized, dtype=np.float32) / 255.

        # NOTE(review): these are the commonly used ImageNet channel statistics.
        # Confirm this manual normalisation matches what was used at training
        # time (Keras ConvNeXt models may also normalise inputs internally).
        channel_mean = np.array([0.485, 0.456, 0.406], dtype=np.float32)
        channel_std = np.array([0.229, 0.224, 0.225], dtype=np.float32)

        # The length-3 vectors broadcast across the trailing channel axis.
        normalised = (pixels - channel_mean) / channel_std

        # Prepend the batch axis.
        return normalised[np.newaxis, ...]
    except Exception as e:
        st.error(f"Error preprocessing image: {str(e)}")
        raise
|
|
|
|
|
|
|
|
def predict_volcano(model, image):
    """Run the model on ``image`` and summarise the prediction.

    Args:
        model: Loaded model exposing ``predict``; ``None`` if loading failed.
        image: Image passed to ``preprocess_image``.

    Returns:
        ``None`` when ``model`` is ``None`` (an error is shown in the UI).
        Otherwise a dict with:
            ``"result"``: ``"Volcanic Eruption"`` when the probability is
                above 0.5, else ``"No Volcanic Eruption"``.
            ``"confidence"``: the probability of the reported class, as float.
            ``"probability"``: the raw model output, as float.

    Raises:
        Exception: Any error during preprocessing or inference is shown in
            the UI and then re-raised as-is.
    """
    if model is None:
        st.error("Model is not loaded. Cannot make prediction.")
        return None

    try:
        batch = preprocess_image(image)
        # First (only) sample of the batch, first (only) output unit.
        probability = model.predict(batch)[0][0]

        if probability > 0.5:
            label, confidence = "Volcanic Eruption", probability
        else:
            # Confidence is expressed for the class that was actually reported.
            label, confidence = "No Volcanic Eruption", 1 - probability

        return {
            "result": label,
            "confidence": float(confidence),
            "probability": float(probability),
        }
    except Exception as e:
        st.error(f"Error making prediction: {str(e)}")
        raise
|
|
|
|
|
def get_sample_images():
    """Return the selectbox options mapped to their image sources.

    Sample image paths point into a ``sample_images`` directory located next
    to this script. The files are not checked for existence here.

    Returns:
        An ordered dict of label -> value, where the value is ``None`` for the
        placeholder entry, ``"upload"`` for the upload entry, and an absolute
        path string for each bundled sample image.
    """
    sample_dir = Path(__file__).parent.absolute() / 'sample_images'

    bundled = (
        ("Sample Volcano 1", "volcano1.jpg"),
        ("Sample Volcano 2", "volcano2.jpg"),
        ("Sample No Volcano 1", "no_volcano1.jpg"),
        ("Sample No Volcano 2", "no_volcano2.jpg"),
    )

    # The placeholder and upload entries come first so they head the selectbox.
    options = {
        "Select an image": None,
        "Upload new image": "upload",
    }
    for label, file_name in bundled:
        options[label] = str(sample_dir / file_name)
    return options
|
|
|
|
|
|
|
|
def _load_model_or_none():
    """Download and load the model, returning ``None`` if either step fails."""
    try:
        model_path = download_model_from_hub()
    except Exception:
        # download_model_from_hub() has already reported the failure in the UI;
        # the app keeps running without a model.
        return None

    if not model_path:
        return None

    try:
        return load_model(model_path)
    except Exception:
        # load_model() has already reported the failure in the UI.
        return None


def _select_image(sample_images):
    """Render the image chooser and return the chosen PIL image, or ``None``."""
    selected_option = st.selectbox("Choose an image", list(sample_images.keys()))

    if selected_option == "Upload new image":
        uploaded_file = st.file_uploader("Upload your image", type=["jpg", "jpeg", "png"])
        if uploaded_file is None:
            return None
        try:
            return Image.open(uploaded_file)
        except Exception as e:
            st.error(f"Error opening uploaded image: {e}")
            return None

    if selected_option == "Select an image":
        return None

    # .get() keeps an unexpected selection from raising outside the handler.
    sample_image_path = sample_images.get(selected_option)
    if sample_image_path is None:
        return None

    try:
        if not Path(sample_image_path).exists():
            st.warning(f"Sample image file not found: {sample_image_path}")
            return None
        return Image.open(sample_image_path)
    except Exception as e:
        st.error(f"Error loading sample image '{selected_option}': {e}")
        return None


def _show_prediction(result):
    """Render the prediction summary returned by ``predict_volcano``."""
    st.markdown("### Results")

    col1, col2 = st.columns(2)
    with col1:
        st.metric("Prediction", result["result"])
    with col2:
        st.metric("Confidence", f"{result['confidence']:.2%}")

    # st.progress is given a value clamped to [0.0, 1.0].
    st.progress(max(0.0, min(1.0, result["probability"])))
    st.write(f"Raw Probability: {result['probability']:.4f}")


def main():
    """Run the Streamlit volcano-detection page.

    Configures the page, obtains the model (continuing without one if the
    download or load fails), lets the user pick or upload an image, and on
    request shows the model's prediction for that image.
    """
    # NOTE(review): the icon/title literal was a mis-encoded character; it is
    # restored here as the volcano emoji - confirm this is the intended glyph.
    st.set_page_config(
        page_title="Volcano Detection",
        page_icon="🌋",
        layout="centered",
    )

    st.title("🌋 Volcano Detection")
    st.write("Select or upload an image to detect if it contains a volcanic eruption.")

    model = _load_model_or_none()
    image = _select_image(get_sample_images())
    if image is None:
        return

    st.image(image, caption="Selected Image", use_column_width=True)

    if st.button("Detect Volcano") and model is not None:
        with st.spinner("Analyzing image..."):
            try:
                result = predict_volcano(model, image)
            except Exception:
                # predict_volcano() has already reported the failure in the UI.
                result = None

            # Rendering sits outside the try so that a display error is not
            # silently discarded.
            if result:
                _show_prediction(result)
    elif model is None:
        st.warning("Model failed to load. Cannot make prediction.")
|
|
|
|
|
# Entry point: run the app when this file is executed as the main module.
if __name__ == "__main__":
    main()