import streamlit as st
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from sklearn.preprocessing import PowerTransformer
import matplotlib.pyplot as plt
import shap
import os
import json
import pickle
import warnings

# Suppress OpenMP warnings
warnings.filterwarnings("ignore", message=".*OpenMP.*")
# Suppress PowerTransformer feature names warning
warnings.filterwarnings("ignore", message=".*has feature names.*")

# Get the absolute path of the current file
current_dir = os.path.dirname(os.path.abspath(__file__))

# Create temp directory for plots if it doesn't exist
os.makedirs(os.path.join(current_dir, 'temp'), exist_ok=True)


# Model classes mirrored from 2wayembed.py
class FeatureEmbedding(nn.Module):
    """Projects a single scalar feature into a dense embedding."""

    def __init__(self, input_dim=1, embedding_dim=32):
        super().__init__()
        self.embedding = nn.Sequential(
            nn.Linear(input_dim, embedding_dim),
            nn.ReLU(),
            nn.Linear(embedding_dim, embedding_dim)
        )

    def forward(self, x):
        return self.embedding(x)


class TabularTransformerWithEmbedding(nn.Module):
    """Transformer-style tabular model with two attention branches:
    one across features and one across samples in the batch."""

    def __init__(self, num_features=6, embedding_dim=32, output_dim=1, num_attention_heads=4):
        super().__init__()
        self.num_features = num_features
        self.embedding_dim = embedding_dim

        # Create a separate embedding for each feature
        self.feature_embeddings = nn.ModuleList([
            FeatureEmbedding(input_dim=1, embedding_dim=embedding_dim)
            for _ in range(num_features)
        ])

        # 1D feature attention (attention across features)
        self.feature_attention = nn.MultiheadAttention(embed_dim=embedding_dim, num_heads=num_attention_heads)
        self.feature_norm = nn.LayerNorm(embedding_dim)

        # 1D sample attention (attention across samples/rows in batch)
        self.sample_attention = nn.MultiheadAttention(embed_dim=embedding_dim, num_heads=num_attention_heads)
        self.sample_norm = nn.LayerNorm(embedding_dim)

        # Combine layer
        self.combine_layer = nn.Linear(embedding_dim * 2, embedding_dim)
        self.combine_activation = nn.ReLU()

        # Output layers
        self.output_layers = nn.Sequential(
            nn.Linear(embedding_dim, embedding_dim),
            nn.ReLU(),
            nn.Linear(embedding_dim, output_dim)
        )

    def forward(self, x):
        # x shape: (batch_size, num_features)
        # Project each feature to the embedding space
        embedded_features = []
        for i in range(self.num_features):
            feature = x[:, i:i + 1]                          # (batch_size, 1)
            projected = self.feature_embeddings[i](feature)  # (batch_size, embedding_dim)
            embedded_features.append(projected)

        # Stack features for attention: (num_features, batch_size, embedding_dim)
        embeddings = torch.stack(embedded_features)

        # 1. Feature attention: each feature attends to all other features,
        # applied over four layers with residual connections and layer norm
        feature_attended = embeddings
        for _ in range(4):
            attended_layer, _ = self.feature_attention(feature_attended, feature_attended, feature_attended)
            feature_attended = attended_layer + feature_attended
            feature_attended = self.feature_norm(feature_attended)
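
        # Layout note (a PyTorch fact, with illustrative numbers): with the
        # default batch_first=False, nn.MultiheadAttention treats dim 0 as the
        # sequence axis, so attention mixes information along whichever axis
        # comes first. For example, with batch_size=8 and the shapes used here:
        #   embeddings                   (6, 8, 32) -> attention mixes the 6 features
        #   embeddings.permute(1, 0, 2)  (8, 6, 32) -> attention would mix the 8 samples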

        # 2. Sample attention (intended to attend across samples in the batch).
        # Note: these two permutes cancel each other out, so the attention below
        # actually runs over the same (num_features, batch_size, embedding_dim)
        # layout as the feature attention. This matches the training-time code,
        # so it is preserved to keep the loaded checkpoint's behaviour.
        sample_input = embeddings.permute(1, 0, 2)    # (batch_size, num_features, embedding_dim)
        sample_input = sample_input.permute(1, 0, 2)  # back to (num_features, batch_size, embedding_dim)

        # Apply sample attention in multiple layers
        sample_attended = sample_input
        for _ in range(4):
            attended_layer, _ = self.sample_attention(sample_attended, sample_attended, sample_attended)
            sample_attended = attended_layer + sample_attended
            sample_attended = self.sample_norm(sample_attended)

        # Combine both attention mechanisms.
        # First, make the batch dimension first for both:
        # (batch_size, num_features, embedding_dim)
        feature_attended = feature_attended.permute(1, 0, 2)
        sample_attended = sample_attended.permute(1, 0, 2)

        # Mean-pool across features to get one vector per sample:
        # (batch_size, embedding_dim)
        feature_pooled = feature_attended.mean(dim=1)
        sample_pooled = sample_attended.mean(dim=1)

        # Concatenate the two attention results: (batch_size, embedding_dim * 2)
        combined = torch.cat([feature_pooled, sample_pooled], dim=1)

        # Project back to embedding_dim
        combined = self.combine_layer(combined)
        combined = self.combine_activation(combined)

        # Final output layers: (batch_size, output_dim)
        return self.output_layers(combined)


class ShapModel:
    """Thin wrapper so SHAP can call the PyTorch model on numpy arrays.

    Note: this wrapper operates in the scaled space; the app's explainer in
    explain_prediction() uses a closure (predict_fn) that also applies the
    scalers, so this class is kept only for compatibility.
    """

    def __init__(self, model):
        self.model = model

    def __call__(self, X):
        with torch.no_grad():
            X_tensor = torch.FloatTensor(X.values if isinstance(X, pd.DataFrame) else X)
            return self.model(X_tensor).numpy()


@st.cache_resource
def load_model_and_scalers():
    """Load the model, scalers, and training data."""
    # Set paths relative to the current file
    model_path = os.path.join(current_dir, "best_val_r2_model.pth")
    data_path = os.path.join(current_dir, "data.xlsx")
    scaler_x_path = os.path.join(current_dir, "scaler_X.pkl")
    scaler_y_path = os.path.join(current_dir, "scaler_y.pkl")

    # Load data: the first six columns are features, the seventh is the target
    df = pd.read_excel(data_path)
    X = df.iloc[:, 0:6]
    y = df.iloc[:, 6]
    feature_names = X.columns.tolist()

    # Initialize model
    model = TabularTransformerWithEmbedding(num_features=6, embedding_dim=32,
                                            output_dim=1, num_attention_heads=4)

    # Load the checkpoint on CPU so the app also works without a GPU
    state_dict = torch.load(model_path, map_location="cpu")

    # Drop feature_weights if present in the checkpoint but not in the model
    if 'feature_weights' in state_dict and not hasattr(model, 'feature_weights'):
        del state_dict['feature_weights']

    # strict=False tolerates any remaining missing keys
    model.load_state_dict(state_dict, strict=False)
    model.eval()

    # Load saved scalers, falling back to freshly fitted ones on failure
    try:
        with open(scaler_x_path, 'rb') as f:
            scaler_X = pickle.load(f)
        with open(scaler_y_path, 'rb') as f:
            scaler_y = pickle.load(f)
    except (FileNotFoundError, pickle.UnpicklingError) as e:
        st.warning(f"Issue with saved scalers: {e}. Creating new scalers.")
        scaler_X = PowerTransformer(method='yeo-johnson')
        scaler_y = PowerTransformer(method='yeo-johnson')

        # Fit scalers on the training data
        scaler_X.fit(X)
        scaler_y.fit(y.values.reshape(-1, 1))

        # Persist the new scalers
        with open(scaler_x_path, 'wb') as f:
            pickle.dump(scaler_X, f)
        with open(scaler_y_path, 'wb') as f:
            pickle.dump(scaler_y, f)

    # Save feature names for later use
    with open(os.path.join(current_dir, 'feature_names.json'), 'w') as f:
        json.dump(feature_names, f)

    return model, scaler_X, scaler_y, feature_names, X
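
# Optional sanity check, defined but never called by the app: a minimal sketch
# that runs a few training rows through the full scale -> model -> inverse
# pipeline. The function name, row count, and shape assertion are illustrative
# additions, not part of the original application.
def _smoke_test_pipeline(model, scaler_X, scaler_y, X, n_rows=3):
    """Round-trip a few training rows and return their predictions."""
    sample = X.iloc[:n_rows]
    with torch.no_grad():
        scaled = torch.FloatTensor(scaler_X.transform(sample.values))
        pred = scaler_y.inverse_transform(model(scaled).numpy())
    assert pred.shape == (n_rows, 1), f"Unexpected prediction shape: {pred.shape}"
    return pred.flatten()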

def explain_prediction(model, input_df, X_background, scaler_X, scaler_y, feature_names):
    """Generate a SHAP explanation for a prediction."""
    try:
        # Prediction function for SHAP: scale the inputs, run the model,
        # then inverse-transform back to the original target scale
        def predict_fn(X):
            # Convert DataFrames to numpy up front (avoids the feature-names
            # warning and keeps X_array defined for the fallback below)
            X_array = X.values if isinstance(X, pd.DataFrame) else X
            try:
                X_tensor = torch.FloatTensor(scaler_X.transform(X_array))
                with torch.no_grad():
                    scaled_pred = model(X_tensor).numpy()
                return scaler_y.inverse_transform(scaled_pred)
            except Exception as e:
                st.error(f"Error in prediction function: {e}")
                # Return zeros as a fallback so SHAP can continue
                return np.zeros((X_array.shape[0], 1))

        # Summarize the background data into 10 k-means centroids to keep
        # KernelExplainer tractable
        background = shap.kmeans(X_background.values, 10)
        explainer = shap.KernelExplainer(predict_fn, background)

        # SHAP values for the input (numpy array avoids feature-name warnings)
        input_array = input_df.values
        shap_values = explainer.shap_values(input_array)

        # Normalize the various SHAP output formats to a flat vector
        if isinstance(shap_values, list):
            shap_values = np.array(shap_values[0])
        if len(shap_values.shape) > 1:
            if shap_values.shape[0] == len(feature_names):
                shap_values = shap_values.T
            shap_values = shap_values.flatten()

        # Waterfall plot of per-feature contributions
        plt.figure(figsize=(10, 6))
        shap.plots.waterfall(
            shap.Explanation(
                values=shap_values,
                base_values=(explainer.expected_value
                             if np.isscalar(explainer.expected_value)
                             else explainer.expected_value[0]),
                data=input_df.iloc[0].values,
                feature_names=feature_names
            ),
            show=False
        )
        plt.title('Feature Contributions to Prediction')
        plt.tight_layout()

        # Save the plot to a temporary file
        temp_dir = os.path.join(current_dir, 'temp')
        os.makedirs(temp_dir, exist_ok=True)
        temp_file = os.path.join(temp_dir, 'shap_explanation.png')
        plt.savefig(temp_file, dpi=300, bbox_inches='tight')
        plt.close()

        return explainer.expected_value, shap_values, temp_file
    except Exception as e:
        st.error(f"Error generating explanation: {e}")
        return 0, np.zeros(len(feature_names)), None
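
# KernelExplainer SHAP values are additive: the base value plus the sum of the
# per-feature SHAP values should (approximately) reconstruct the model output
# for the explained row. A minimal checker, defined but not called by the app;
# the name and tolerance are illustrative assumptions.
def _check_shap_additivity(expected_value, shap_values, prediction, tol=1e-2):
    """Return True if base + sum(SHAP values) is within tol of the prediction."""
    base = (expected_value if np.isscalar(expected_value)
            else np.asarray(expected_value).ravel()[0])
    reconstructed = base + float(np.sum(shap_values))
    return abs(reconstructed - float(prediction)) <= tol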

def model_predict(model, input_df, scaler_X, scaler_y):
    """Make a prediction using the model."""
    try:
        # Scale the input (numpy array avoids the feature-names warning)
        X_scaled = scaler_X.transform(input_df.values)
        X_tensor = torch.FloatTensor(X_scaled)

        # Make prediction
        with torch.no_grad():
            scaled_pred = model(X_tensor).numpy()

        # Inverse-transform back to the original resistivity scale
        prediction = scaler_y.inverse_transform(scaled_pred)
        return prediction.flatten()
    except Exception as e:
        st.error(f"Error making prediction: {e}")
        # Return a default value in case of error
        return np.array([0.0])


# Page title and description
st.set_page_config(
    page_title="Soil Resistivity Predictor",
    page_icon="🧪",
    layout="wide"
)

st.title("Soil Resistivity Prediction Tool")
st.markdown("""
This application predicts soil resistivity based on various soil properties using a deep learning model.
Enter the soil properties below and click the 'Predict Resistivity' button to get a prediction.
""")

# Ensure temp directory exists
temp_dir = os.path.join(current_dir, 'temp')
os.makedirs(temp_dir, exist_ok=True)

# On the first run of a session, clear any stale plot files
if 'first_run' not in st.session_state:
    st.session_state.first_run = True
    for file in os.listdir(temp_dir):
        if file.endswith('.png'):
            try:
                os.remove(os.path.join(temp_dir, file))
            except OSError:
                pass

# Load model and scalers
try:
    model, scaler_X, scaler_y, feature_names, X = load_model_and_scalers()

    # Create input fields for features
    st.subheader("Input Features")

    # Two columns of input fields
    col1, col2 = st.columns(2)

    # Dictionary to store input values
    input_values = {}

    for i, feature in enumerate(feature_names):
        # Observed range of each feature in the training data
        min_val = float(X[feature].min())
        max_val = float(X[feature].max())

        # Split the fields between the two columns
        with col1 if i < len(feature_names) // 2 else col2:
            # Session state keeps values across reruns
            if f'input_{feature}' not in st.session_state:
                st.session_state[f'input_{feature}'] = float(X[feature].mean())

            input_values[feature] = st.number_input(
                f"{feature}",
                min_value=float(min_val * 0.9),  # allow slightly below the training min
                max_value=float(max_val * 1.1),  # allow slightly above the training max
                value=st.session_state[f'input_{feature}'],
                key=f'input_widget_{feature}',
                help=f"Range: {min_val:.2f} to {max_val:.2f}"
            )

            # Update session state with the current value
            st.session_state[f'input_{feature}'] = input_values[feature]

    if st.button("Predict Resistivity", type="primary"):
        try:
            # Create input DataFrame
            input_df = pd.DataFrame([input_values])

            # Make prediction
            with st.spinner("Calculating prediction..."):
                prediction = model_predict(model, input_df, scaler_X, scaler_y)

            # Display prediction
            st.subheader("Prediction Result")
            st.markdown(f"### Predicted Resistivity: {prediction[0]:.2f} Ω·m")

            # Calculate and display SHAP values
            with st.spinner("Generating explanation..."):
                st.subheader("Feature Importance Explanation")

                # Use the training data as the SHAP background distribution
                expected_value, shap_values, temp_file = explain_prediction(
                    model, input_df, X, scaler_X, scaler_y, feature_names
                )

                # Display the waterfall plot
                if temp_file and os.path.exists(temp_file):
                    try:
                        st.image(temp_file)
                    except Exception as img_error:
                        st.error(f"Error displaying SHAP explanation image: {img_error}")
                else:
                    st.warning("Could not generate SHAP explanation plot.")
        except Exception as pred_error:
            st.error(f"Error during prediction process: {pred_error}")
            st.exception(pred_error)

except Exception as e:
    st.error(f"""
    Error loading the model and data. Please make sure:
    1. The model file 'best_val_r2_model.pth' exists in the application directory
    2. The data file 'data.xlsx' exists in the application directory
    3. The scaler files 'scaler_X.pkl' and 'scaler_y.pkl' exist in the application directory
    4. All required packages are installed

    Error details: {e}
    """)
    # Show detailed error information
    st.exception(e)
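
# To launch the app locally (the filename below is an assumption; use whatever
# name this script is saved under):
#   streamlit run soil_resistivity_app.py
# The checkpoint 'best_val_r2_model.pth', 'data.xlsx', and the scaler pickles
# must sit in the same directory as this script.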