Spaces:
Build error
Build error
| import streamlit as st | |
| import pandas as pd | |
| import numpy as np | |
| import torch | |
| import torch.nn as nn | |
| from sklearn.preprocessing import PowerTransformer | |
| import matplotlib.pyplot as plt | |
| import shap | |
| import os | |
| import json | |
| import pickle | |
| import sys | |
| import warnings | |
# Hide two known-noisy warnings: OpenMP notices and the
# "has feature names" notice raised around the PowerTransformer scalers.
warnings.filterwarnings(action="ignore", message=".*OpenMP.*")
warnings.filterwarnings(action="ignore", message=".*has feature names.*")

# Absolute directory that contains this script; all data/model paths are
# resolved relative to it.
current_dir = os.path.split(os.path.abspath(__file__))[0]

# Plots are written to <current_dir>/temp, so make sure it exists up front.
os.makedirs(os.path.join(current_dir, 'temp'), exist_ok=True)
# Model classes taken from 2wayembed.py
class FeatureEmbedding(nn.Module):
    """Small two-layer MLP that maps an input vector to an embedding.

    Args:
        input_dim: Size of the last dimension of the incoming tensor.
        embedding_dim: Width of the hidden layer and of the output.
    """

    def __init__(self, input_dim=1, embedding_dim=32):
        super().__init__()
        # The (Linear, ReLU, Linear) order determines the state-dict keys
        # "embedding.0.*" and "embedding.2.*", so it must not change.
        projection = [
            nn.Linear(input_dim, embedding_dim),
            nn.ReLU(),
            nn.Linear(embedding_dim, embedding_dim),
        ]
        self.embedding = nn.Sequential(*projection)

    def forward(self, x):
        """Return the embedding of ``x``."""
        embedded = self.embedding(x)
        return embedded
| class TabularTransformerWithEmbedding(nn.Module): | |
| def __init__(self, num_features=6, embedding_dim=32, output_dim=1, num_attention_heads=4): | |
| super().__init__() | |
| self.num_features = num_features | |
| self.embedding_dim = embedding_dim | |
| # Create separate embedding for each feature | |
| self.feature_embeddings = nn.ModuleList([ | |
| FeatureEmbedding(input_dim=1, embedding_dim=embedding_dim) | |
| for _ in range(num_features) | |
| ]) | |
| # 1D Feature Attention (attention across features) | |
| self.feature_attention = nn.MultiheadAttention(embed_dim=embedding_dim, num_heads=num_attention_heads) | |
| self.feature_norm = nn.LayerNorm(embedding_dim) | |
| # 1D Sample Attention (attention across samples/rows in batch) | |
| self.sample_attention = nn.MultiheadAttention(embed_dim=embedding_dim, num_heads=num_attention_heads) | |
| self.sample_norm = nn.LayerNorm(embedding_dim) | |
| # Combine layer | |
| self.combine_layer = nn.Linear(embedding_dim*2, embedding_dim) | |
| self.combine_activation = nn.ReLU() | |
| # Output layers | |
| self.output_layers = nn.Sequential( | |
| nn.Linear(embedding_dim, embedding_dim), | |
| nn.ReLU(), | |
| nn.Linear(embedding_dim, output_dim) | |
| ) | |
| def forward(self, x): | |
| # x shape: (batch_size, num_features) | |
| batch_size = x.shape[0] | |
| # Project each feature to embedding space | |
| embedded_features = [] | |
| for i in range(self.num_features): | |
| # Extract single feature and project to embedding dimension | |
| feature = x[:, i:i+1] # (batch_size, 1) | |
| projected = self.feature_embeddings[i](feature) # (batch_size, embedding_dim) | |
| embedded_features.append(projected) | |
| # Stack features for attention | |
| # Shape: (num_features, batch_size, embedding_dim) | |
| embeddings = torch.stack(embedded_features) | |
| # 1. Feature Attention (attending to features) | |
| # Each feature attends to all other features | |
| # Apply feature attention in multiple layers | |
| feature_attended = embeddings | |
| for _ in range(4): | |
| # Apply attention | |
| attended_layer, _ = self.feature_attention(feature_attended, feature_attended, feature_attended) | |
| # Add residual connection | |
| feature_attended = attended_layer + feature_attended | |
| # Apply layer normalization | |
| feature_attended = self.feature_norm(feature_attended) | |
| # 2. Sample Attention (attending to samples) | |
| # Permute to make batch dimension first for sample attention | |
| # Shape: (batch_size, num_features, embedding_dim) | |
| sample_input = embeddings.permute(1, 0, 2) | |
| # Permute back for attention: (num_features, batch_size, embedding_dim) | |
| sample_input = sample_input.permute(1, 0, 2) | |
| # Apply sample attention in multiple layers | |
| sample_attended = sample_input | |
| for _ in range(4): | |
| # Apply attention | |
| attended_layer, _ = self.sample_attention(sample_attended, sample_attended, sample_attended) | |
| # Add residual connection | |
| sample_attended = attended_layer + sample_attended | |
| # Apply layer normalization | |
| sample_attended = self.sample_norm(sample_attended) | |
| # Combine both attention mechanisms | |
| # First, make batch dimension first for both | |
| # Shape: (batch_size, num_features, embedding_dim) | |
| feature_attended = feature_attended.permute(1, 0, 2) | |
| sample_attended = sample_attended.permute(1, 0, 2) | |
| # Mean across features to get a single vector per sample | |
| # Shape: (batch_size, embedding_dim) | |
| feature_pooled = feature_attended.mean(dim=1) | |
| sample_pooled = sample_attended.mean(dim=1) | |
| # Concatenate the two attention results | |
| # Shape: (batch_size, embedding_dim*2) | |
| combined = torch.cat([feature_pooled, sample_pooled], dim=1) | |
| # Project back to embedding_dim | |
| combined = self.combine_layer(combined) | |
| combined = self.combine_activation(combined) | |
| # Final output layers | |
| output = self.output_layers(combined) # (batch_size, output_dim) | |
| return output | |
class ShapModel:
    """Callable adapter around a torch model: array-like in, NumPy out.

    Accepts either a pandas DataFrame or anything ``torch.FloatTensor`` can
    consume, and returns the model output as a NumPy array.
    """

    def __init__(self, model):
        self.model = model

    def __call__(self, X):
        """Run the wrapped model on ``X`` with gradient tracking disabled."""
        if isinstance(X, pd.DataFrame):
            raw = X.values
        else:
            raw = X
        with torch.no_grad():
            predictions = self.model(torch.FloatTensor(raw))
        return predictions.numpy()
def load_model_and_scalers():
    """Load the model, scalers, and data.

    Reads ``data.xlsx``, ``best_val_r2_model.pth``, ``scaler_X.pkl`` and
    ``scaler_y.pkl`` from ``current_dir``. The first six spreadsheet columns
    are the features and the seventh is the target.

    If the pickled scalers cannot be read, new Yeo-Johnson
    ``PowerTransformer`` scalers are fitted on the spreadsheet data and
    written back to disk (best effort).

    Returns:
        Tuple ``(model, scaler_X, scaler_y, feature_names, X)`` where
        ``model`` is in eval mode and ``X`` is the feature DataFrame.

    Raises:
        Whatever ``pd.read_excel`` / ``torch.load`` raise when the data or
        checkpoint file is missing or unreadable.
    """
    # Set paths relative to the current file
    model_path = os.path.join(current_dir, "best_val_r2_model.pth")
    data_path = os.path.join(current_dir, "data.xlsx")
    scaler_x_path = os.path.join(current_dir, "scaler_X.pkl")
    scaler_y_path = os.path.join(current_dir, "scaler_y.pkl")

    # Load data
    df = pd.read_excel(data_path)
    X = df.iloc[:, 0:6]  # First 6 columns for features
    y = df.iloc[:, 6]    # 7th column for target (Y)
    feature_names = X.columns.tolist()

    # Initialize model
    model = TabularTransformerWithEmbedding(
        num_features=6, embedding_dim=32, output_dim=1, num_attention_heads=4)

    # Map tensors to CPU so a checkpoint saved on a GPU machine still loads
    # on a CPU-only host.
    state_dict = torch.load(model_path, map_location=torch.device("cpu"))

    # Remove feature_weights if present in the state dict but not in the model
    if 'feature_weights' in state_dict and not hasattr(model, 'feature_weights'):
        del state_dict['feature_weights']

    # strict=False tolerates key mismatches, but a mismatch means some layers
    # keep their random initial weights -- surface that instead of hiding it.
    load_result = model.load_state_dict(state_dict, strict=False)
    if load_result.missing_keys or load_result.unexpected_keys:
        st.warning(
            "Model checkpoint does not fully match the model definition. "
            f"Missing keys: {list(load_result.missing_keys)}; "
            f"unexpected keys: {list(load_result.unexpected_keys)}."
        )
    model.eval()

    # Load saved scalers with error handling.
    # NOTE: pickle.load can execute arbitrary code; these files are assumed to
    # be trusted artifacts shipped with the app -- never load user uploads here.
    try:
        with open(scaler_x_path, 'rb') as f:
            scaler_X = pickle.load(f)
        with open(scaler_y_path, 'rb') as f:
            scaler_y = pickle.load(f)
    except (FileNotFoundError, pickle.UnpicklingError, EOFError,
            AttributeError, ImportError, IndexError) as e:
        # Missing, truncated, or incompatible scaler pickles all end up here.
        st.warning(f"Issue with saved scalers: {str(e)}. Creating new scalers.")
        scaler_X = PowerTransformer(method='yeo-johnson')
        scaler_y = PowerTransformer(method='yeo-johnson')
        # Fit scalers
        scaler_X.fit(X)
        scaler_y.fit(y.values.reshape(-1, 1))
        # Saving is only a cache; a read-only directory must not break loading.
        try:
            with open(scaler_x_path, 'wb') as f:
                pickle.dump(scaler_X, f)
            with open(scaler_y_path, 'wb') as f:
                pickle.dump(scaler_y, f)
        except OSError as save_error:
            st.warning(f"Could not save new scalers: {save_error}")

    # Save feature names for later use (best effort, same reasoning as above)
    try:
        with open(os.path.join(current_dir, 'feature_names.json'), 'w') as f:
            json.dump(feature_names, f)
    except OSError as save_error:
        st.warning(f"Could not save feature names: {save_error}")

    return model, scaler_X, scaler_y, feature_names, X
def explain_prediction(model, input_df, X_background, scaler_X, scaler_y, feature_names):
    """Generate a SHAP waterfall explanation for a single prediction.

    A ``shap.KernelExplainer`` is built around a function that scales raw
    inputs with ``scaler_X``, runs ``model`` and maps the output back with
    ``scaler_y``. The background set is a k-means summary of
    ``X_background``. Only the first row of ``input_df`` is explained, which
    matches the row shown in the plot.

    Args:
        model: Torch model mapping scaled features to scaled predictions.
        input_df: DataFrame whose first row is the sample to explain.
        X_background: DataFrame of reference samples for the explainer.
        scaler_X: Fitted transformer exposing ``transform``.
        scaler_y: Fitted transformer exposing ``inverse_transform``.
        feature_names: Column labels, one per feature.

    Returns:
        Tuple ``(expected_value, shap_values, plot_path)``. ``shap_values`` is
        a 1-D array with one entry per feature and ``plot_path`` is the saved
        PNG. On any failure an error is shown via ``st.error`` and
        ``(0, zeros, None)`` is returned.
    """
    try:
        def predict_fn(X):
            # Errors are deliberately not caught here: returning fabricated
            # zeros would yield a plausible-looking but meaningless plot. They
            # propagate to the outer handler instead.
            # Convert to numpy array if it's a DataFrame to avoid feature names warning
            X_array = X.values if isinstance(X, pd.DataFrame) else X
            X_tensor = torch.FloatTensor(scaler_X.transform(X_array))
            with torch.no_grad():
                scaled_pred = model(X_tensor).numpy()
            return scaler_y.inverse_transform(scaled_pred)

        # Summarise the background data; never request more clusters than rows.
        n_clusters = min(10, len(X_background))
        background = shap.kmeans(X_background.values, n_clusters)
        explainer = shap.KernelExplainer(predict_fn, background)

        # Explain the first row only, as a numpy array to avoid the feature
        # names warning.
        input_array = input_df.values[:1]
        shap_values = explainer.shap_values(input_array)

        # SHAP may return a list (one entry per output) or an array with extra
        # singleton axes; reduce either to a flat per-feature vector.
        if isinstance(shap_values, list):
            shap_values = shap_values[0]
        shap_values = np.asarray(shap_values).flatten()
        if shap_values.shape[0] != len(feature_names):
            raise ValueError(
                f"Expected {len(feature_names)} SHAP values, "
                f"got {shap_values.shape[0]}"
            )

        expected_value = explainer.expected_value
        base_value = expected_value if np.isscalar(expected_value) else expected_value[0]

        # Create waterfall plot; always release the figure(s), even on error,
        # so a long-running server does not accumulate open figures.
        fig = plt.figure(figsize=(10, 6))
        try:
            shap.plots.waterfall(
                shap.Explanation(
                    values=shap_values,
                    base_values=base_value,
                    data=input_df.iloc[0].values,
                    feature_names=feature_names
                ),
                show=False
            )
            plt.title('Feature Contributions to Prediction')
            plt.tight_layout()

            # Save the plot to a temporary file
            temp_dir = os.path.join(current_dir, 'temp')
            os.makedirs(temp_dir, exist_ok=True)
            temp_file = os.path.join(temp_dir, 'shap_explanation.png')
            plt.savefig(temp_file, dpi=300, bbox_inches='tight')
        finally:
            plt.close()     # current figure (the one that was drawn on)
            plt.close(fig)  # no-op if it was the current figure

        return expected_value, shap_values, temp_file
    except Exception as e:
        st.error(f"Error generating explanation: {str(e)}")
        return 0, np.zeros(len(feature_names)), None
def model_predict(model, input_df, scaler_X, scaler_y):
    """Predict in original units for every row of ``input_df``.

    The raw values are scaled with ``scaler_X``, passed through ``model``
    without gradient tracking, and mapped back with
    ``scaler_y.inverse_transform``.

    Returns:
        1-D NumPy array of predictions. If anything fails, the error is shown
        via ``st.error`` and ``np.array([0.0])`` is returned instead.
    """
    try:
        # Use the bare ndarray (not the DataFrame) to avoid the feature names
        # warning from the scaler.
        features = torch.FloatTensor(scaler_X.transform(input_df.values))

        with torch.no_grad():
            raw_output = model(features).numpy()

        # Back to the original target scale, flattened to one value per row.
        return scaler_y.inverse_transform(raw_output).flatten()
    except Exception as e:
        st.error(f"Error making prediction: {str(e)}")
        # Return a default value in case of error
        return np.array([0.0])
# Browser-tab title and icon, wide layout
st.set_page_config(
    layout="wide",
    page_icon="🧪",
    page_title="Soil Resistivity Predictor",
)

# Page heading followed by a short description of what the app does
st.title("Soil Resistivity Prediction Tool")
app_description = """
This application predicts soil resistivity based on various soil properties using a deep learning model.
Enter the soil properties below and click the 'Predict Resistivity' button to get a prediction.
"""
st.markdown(app_description)
# Ensure temp directory exists
temp_dir = os.path.join(current_dir, 'temp')
os.makedirs(temp_dir, exist_ok=True)

# The 'first_run' key marks that cleanup already happened: stale plot images
# are removed only while the key is not yet present in st.session_state.
if 'first_run' not in st.session_state:
    st.session_state.first_run = True
    for file_name in os.listdir(temp_dir):
        if file_name.endswith('.png'):
            try:
                os.remove(os.path.join(temp_dir, file_name))
            except OSError:
                # Best-effort cleanup: a file that is already gone or locked
                # is not worth failing over. Only OSError is ignored so that
                # interrupts and other BaseException signals still propagate.
                pass
# Load model and scalers, then build the input form and prediction flow.
# Any failure inside this block is reported through the handler at the bottom.
try:
    model, scaler_X, scaler_y, feature_names, X = load_model_and_scalers()

    # Create input fields for features
    st.subheader("Input Features")

    # Create two columns for input fields
    col1, col2 = st.columns(2)

    # Dictionary to store input values
    input_values = {}

    # The first half of the features goes to the left column, the rest to
    # the right column.
    first_column_count = len(feature_names) // 2

    for i, feature in enumerate(feature_names):
        # Observed range of this feature in the loaded data
        min_val = float(X[feature].min())
        max_val = float(X[feature].max())

        # Widen the allowed range by 10% of each bound's magnitude. For
        # negative bounds the factor is flipped; a plain *0.9 / *1.1 would
        # move the bound inward and exclude valid values.
        lower_bound = float(min_val * (0.9 if min_val >= 0 else 1.1))
        upper_bound = float(max_val * (1.1 if max_val >= 0 else 0.9))

        with col1 if i < first_column_count else col2:
            # Use session state to maintain values between reruns;
            # the feature mean is the initial default.
            if f'input_{feature}' not in st.session_state:
                st.session_state[f'input_{feature}'] = float(X[feature].mean())

            input_values[feature] = st.number_input(
                f"{feature}",
                min_value=lower_bound,  # Allow slightly below min
                max_value=upper_bound,  # Allow slightly above max
                value=st.session_state[f'input_{feature}'],
                key=f'input_widget_{feature}',
                help=f"Range: {min_val:.2f} to {max_val:.2f}"
            )

            # Update session state with current value
            st.session_state[f'input_{feature}'] = input_values[feature]

    # Add predict button
    if st.button("Predict Resistivity", type="primary"):
        try:
            # Create input DataFrame (single row, columns in feature order)
            input_df = pd.DataFrame([input_values])

            # Make prediction
            with st.spinner("Calculating prediction..."):
                prediction = model_predict(model, input_df, scaler_X, scaler_y)

            # Display prediction
            st.subheader("Prediction Result")
            st.markdown(f"### Predicted Resistivity: {prediction[0]:.2f} Ω·m")

            # Calculate and display SHAP values
            with st.spinner("Generating explanation..."):
                st.subheader("Feature Importance Explanation")

                # Get SHAP values using the training data as background
                expected_value, shap_values, temp_file = explain_prediction(
                    model, input_df, X, scaler_X, scaler_y, feature_names
                )

                # Display the waterfall plot
                if temp_file and os.path.exists(temp_file):
                    try:
                        st.image(temp_file)
                    except Exception as img_error:
                        st.error(f"Error displaying SHAP explanation image: {str(img_error)}")
                else:
                    st.warning("Could not generate SHAP explanation plot.")
        except Exception as pred_error:
            st.error(f"Error during prediction process: {str(pred_error)}")
            st.exception(pred_error)
except Exception as e:
    st.error(f"""
Error loading the model and data. Please make sure:
1. The model file 'best_val_r2_model.pth' exists in the application directory
2. The data file 'data.xlsx' exists in the application directory
3. The scaler files 'scaler_X.pkl' and 'scaler_y.pkl' exist in the application directory
4. All required packages are installed
Error details: {str(e)}
""")
    # Show detailed error information
    st.exception(e)