"""Streamlit app that classifies an image as volcanic eruption / no eruption.

The classifier is a frozen ConvNeXtLarge backbone with a small dense head
(see ``create_convnext_model``); its weights are fetched from the Hugging Face
Hub on start-up.
"""

# NOTE: `io`, `os` and `try_to_load_from_cache` are not referenced below; they
# are retained because other tooling may rely on them being importable here.
import io
import os
from pathlib import Path
from typing import Dict, Optional, Tuple, Union

import numpy as np
import tensorflow as tf
from PIL import Image
from tensorflow.keras.applications import ConvNeXtLarge
import streamlit as st
from huggingface_hub import hf_hub_download, try_to_load_from_cache

# Location of the trained weights on the Hugging Face Hub.
MODEL_REPO_ID = "Darshan03/convnext_volcano_detector"
MODEL_FILENAME = "model.h5"

# Per-channel statistics applied after scaling pixels to [0, 1]; shaped
# (1, 1, 3) so they broadcast over a (height, width, 3) image array.
IMAGENET_MEAN = np.array([0.485, 0.456, 0.406], dtype=np.float32).reshape(1, 1, 3)
IMAGENET_STD = np.array([0.229, 0.224, 0.225], dtype=np.float32).reshape(1, 1, 3)

# Selectbox entries that do not correspond to a sample file on disk.
PLACEHOLDER_OPTION = "Select an image"
UPLOAD_OPTION = "Upload new image"


def download_model_from_hub() -> str:
    """Download the weights file from the Hugging Face Hub.

    Returns:
        Local filesystem path of the downloaded (or already cached) file.

    Raises:
        Exception: whatever ``hf_hub_download`` raised, re-raised after an
            error message has been written to the page.
    """
    try:
        st.info("Downloading model from Hugging Face Hub (if not already cached)...")
        # Deliberately no cache_dir/local_dir: the library's default cache
        # location is used to avoid permission issues with custom directories.
        local_model_path = hf_hub_download(
            repo_id=MODEL_REPO_ID,
            filename=MODEL_FILENAME,
        )
    except Exception as e:
        st.error(f"Error downloading model from Hugging Face Hub: {e}")
        st.info("Please check the repo ID and filename, and ensure the repository is public or Space has access.")
        # Propagate so the caller knows loading failed.
        raise
    else:
        st.success(f"Model file available locally at: {local_model_path}")
        return local_model_path


def create_convnext_model(
    input_shape: Tuple[int, int, int] = (512, 512, 3),
) -> tf.keras.Model:
    """Build the network architecture (without the fine-tuned weights).

    Args:
        input_shape: ``(height, width, channels)`` of the input images.

    Returns:
        A ``tf.keras.Sequential`` model: frozen ConvNeXtLarge backbone
        (ImageNet weights, no top), global average pooling, two
        dropout-regularised dense layers (512 and 256 units) and a single
        sigmoid output unit.
    """
    # weights='imagenet' makes Keras fetch the backbone weights into its own
    # cache if they are not already present.
    base_model = ConvNeXtLarge(
        include_top=False,
        weights='imagenet',
        input_shape=input_shape,
    )
    base_model.trainable = False

    return tf.keras.Sequential([
        base_model,
        tf.keras.layers.GlobalAveragePooling2D(),
        tf.keras.layers.Dropout(0.3),
        tf.keras.layers.Dense(512, activation='relu'),
        tf.keras.layers.Dropout(0.3),
        tf.keras.layers.Dense(256, activation='relu'),
        tf.keras.layers.Dropout(0.3),
        tf.keras.layers.Dense(1, activation='sigmoid'),
    ])


# Cached so the model is built and loaded once rather than on every rerun.
@st.cache_resource
def load_model(model_path: str) -> tf.keras.Model:
    """Build the architecture and load weights from ``model_path``.

    Args:
        model_path: Path to a Keras ``.h5`` weights file.

    Returns:
        The model with weights loaded.

    Raises:
        Exception: whatever model construction or ``load_weights`` raised,
            re-raised after an error message has been written to the page.
    """
    try:
        model = create_convnext_model()
        model.load_weights(model_path)
        return model
    except Exception as e:
        st.error(f"Error loading model weights: {e}")
        st.info("Ensure the downloaded file is a valid Keras .h5 weights file compatible with the ConvNeXtLarge architecture.")
        raise


def preprocess_image(
    image: Image.Image,
    target_size: Tuple[int, int] = (512, 512),
) -> np.ndarray:
    """Convert a PIL image into a normalised batch of one.

    Args:
        image: Input image in any PIL mode; converted to RGB if needed.
        target_size: ``(width, height)`` passed to ``Image.resize``.

    Returns:
        ``float32`` array with a leading batch axis of size 1, scaled to
        [0, 1] and then standardised with ``IMAGENET_MEAN``/``IMAGENET_STD``.

    Raises:
        Exception: any failure during conversion, re-raised after an error
            message has been written to the page.
    """
    try:
        if image.mode != 'RGB':
            image = image.convert('RGB')

        resized = image.resize(target_size, Image.Resampling.BICUBIC)

        img_array = np.array(resized, dtype=np.float32)
        img_array /= 255.

        # NOTE(review): Keras' ConvNeXt applications document a built-in
        # preprocessing layer that expects raw [0, 255] pixels. If the model
        # was trained without this extra standardisation, inputs here are
        # normalised twice — confirm against the training pipeline before
        # changing anything.
        img_array = (img_array - IMAGENET_MEAN) / IMAGENET_STD

        return np.expand_dims(img_array, axis=0)
    except Exception as e:
        st.error(f"Error preprocessing image: {e}")
        raise


def predict_volcano(
    model: Optional[tf.keras.Model],
    image: Image.Image,
) -> Optional[Dict[str, Union[str, float]]]:
    """Classify ``image`` with ``model``.

    Args:
        model: Loaded Keras model, or ``None`` if loading failed.
        image: PIL image to classify.

    Returns:
        ``None`` when ``model`` is ``None``; otherwise a dict with keys
        ``"result"`` (label string), ``"confidence"`` (probability of the
        predicted class) and ``"probability"`` (raw sigmoid output), the two
        numbers as built-in ``float``.

    Raises:
        Exception: any preprocessing or inference failure, re-raised after an
            error message has been written to the page.
    """
    if model is None:
        st.error("Model is not loaded. Cannot make prediction.")
        return None

    try:
        processed_image = preprocess_image(image)
        prediction = model.predict(processed_image)
        # Single image, single sigmoid unit -> one scalar.
        probability = prediction[0][0]

        is_eruption = probability > 0.5
        result = "Volcanic Eruption" if is_eruption else "No Volcanic Eruption"
        # Confidence is the probability assigned to the predicted class.
        confidence = probability if is_eruption else 1 - probability

        return {
            "result": result,
            "confidence": float(confidence),
            "probability": float(probability),
        }
    except Exception as e:
        st.error(f"Error making prediction: {e}")
        raise


def get_sample_images() -> Dict[str, Optional[str]]:
    """Return the selectbox options mapped to their image sources.

    The sample files are looked up in a ``sample_images`` directory next to
    this script. Existence is not checked here; the caller verifies the path
    before opening it.

    Returns:
        Mapping from option label to ``None`` (placeholder entry), the
        sentinel string ``"upload"``, or an absolute file path string.
    """
    sample_dir = Path(__file__).parent.absolute() / 'sample_images'

    return {
        "Select an image": None,
        "Upload new image": "upload",
        "Sample Volcano 1": str(sample_dir / "volcano1.jpg"),
        "Sample Volcano 2": str(sample_dir / "volcano2.jpg"),
        "Sample No Volcano 1": str(sample_dir / "no_volcano1.jpg"),
        "Sample No Volcano 2": str(sample_dir / "no_volcano2.jpg"),
    }


def _load_model_or_none() -> Optional[tf.keras.Model]:
    """Download and load the model, returning ``None`` on any failure."""
    try:
        model_path = download_model_from_hub()
        return load_model(model_path)
    except Exception:
        # Best effort by design: the failing step has already written its
        # error to the page, and the rest of the UI should still render.
        return None


def _open_selected_image(
    selected_option: str,
    sample_images: Dict[str, Optional[str]],
) -> Optional[Image.Image]:
    """Return the image for the chosen option, or ``None`` if unavailable."""
    if selected_option == UPLOAD_OPTION:
        uploaded_file = st.file_uploader("Upload your image", type=["jpg", "jpeg", "png"])
        if uploaded_file is None:
            return None
        try:
            return Image.open(uploaded_file)
        except Exception as e:
            st.error(f"Error opening uploaded image: {e}")
            return None

    sample_image_path = sample_images[selected_option]
    if selected_option == PLACEHOLDER_OPTION or sample_image_path is None:
        return None

    try:
        if not Path(sample_image_path).exists():
            st.warning(f"Sample image file not found: {sample_image_path}")
            return None
        return Image.open(sample_image_path)
    except Exception as e:
        st.error(f"Error loading sample image '{selected_option}': {e}")
        return None


def _render_prediction(result: Dict[str, Union[str, float]]) -> None:
    """Write the prediction label, confidence and raw probability to the page."""
    st.markdown("### Results")

    label_col, confidence_col = st.columns(2)
    with label_col:
        st.metric("Prediction", result["result"])
    with confidence_col:
        st.metric("Confidence", f"{result['confidence']:.2%}")

    # Clamp so the progress bar always receives a value in [0, 1].
    st.progress(max(0.0, min(1.0, result["probability"])))
    st.write(f"Raw Probability: {result['probability']:.4f}")


def main() -> None:
    """Render the page: load the model, pick an image, show the prediction."""
    st.set_page_config(
        page_title="Volcano Detection",
        page_icon="🌋",
        layout="centered",
    )

    st.title("🌋 Volcano Detection")
    st.write("Select or upload an image to detect if it contains a volcanic eruption.")

    model = _load_model_or_none()

    sample_images = get_sample_images()
    selected_option = st.selectbox("Choose an image", list(sample_images))

    image = _open_selected_image(selected_option, sample_images)
    if image is None:
        return

    # NOTE(review): newer Streamlit releases reportedly deprecate
    # `use_column_width` — confirm against the pinned version before changing.
    st.image(image, caption="Selected Image", use_column_width=True)

    # Offer the button only when a model is available; otherwise a click
    # could never produce a result.
    if model is None:
        st.warning("Model failed to load. Cannot make prediction.")
        return

    if not st.button("Detect Volcano"):
        return

    with st.spinner("Analyzing image..."):
        try:
            result = predict_volcano(model, image)
        except Exception:
            # predict_volcano / preprocess_image already displayed the error.
            return
        if result:
            _render_prediction(result)


if __name__ == "__main__":
    main()