"""Streamlit app that predicts waste friction angle and cohesion.

Two ``DualStreamNet`` checkpoints (one per target) are loaded together with
the Excel training table, which is used to re-fit the input/output scalers,
to bound the input widgets and as background data for SHAP explanations.
"""

import io
import math
import os

# Tolerate a duplicate OpenMP runtime and pin the native thread pools to one
# thread. Kept ahead of the numeric imports below, as in the original ordering.
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'
os.environ['OMP_NUM_THREADS'] = '1'
os.environ['OPENBLAS_NUM_THREADS'] = '1'
os.environ['MKL_NUM_THREADS'] = '1'
os.environ['VECLIB_MAXIMUM_THREADS'] = '1'
os.environ['NUMEXPR_NUM_THREADS'] = '1'

import streamlit as st
import torch
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import shap
from sklearn.preprocessing import MinMaxScaler
import plotly.graph_objects as go
from matplotlib.figure import Figure
import torch.nn.functional as F

# Files the app reads from its working directory.
DATA_FILE = "Data_syw_r.xlsx"
FRICTION_WEIGHTS_FILE = 'best_friction_model.pt'
COHESION_WEIGHTS_FILE = 'cohebest.pt'

# Positional row of the training table used to pre-fill the input widgets.
DEFAULT_ROW_INDEX = 1

# Set page config
st.set_page_config(
    page_title="Waste Properties Predictor",
    page_icon="🔄",
    layout="wide"
)

# Custom CSS to improve the app's appearance
st.markdown(""" """, unsafe_allow_html=True)


class DualStreamNet(torch.nn.Module):
    """Regressor used for both the friction and the cohesion model.

    Three streams each reduce the input to 8 values: a plain MLP, a
    "feature attention" stream and a "batch attention" stream. The 24
    concatenated values feed a single linear layer that emits one output.
    Attribute names must stay as they are: they are the keys of the
    state dicts loaded in ``load_model_and_data``.
    """

    def __init__(self, input_size):
        """Build all layers for inputs with ``input_size`` features."""
        super().__init__()

        # Stream 1: Original MLP
        self.mlp_fc1 = torch.nn.Linear(input_size, 64)
        self.mlp_fc2 = torch.nn.Linear(64, 1000)
        self.mlp_fc3 = torch.nn.Linear(1000, 200)
        self.mlp_fc4 = torch.nn.Linear(200, 8)

        # Stream 2: Feature Attention Mechanism
        self.feature_attention_dim = 16
        self.feature_projection = torch.nn.Linear(input_size, self.feature_attention_dim)
        self.feature_query = torch.nn.Linear(self.feature_attention_dim, self.feature_attention_dim)
        self.feature_key = torch.nn.Linear(self.feature_attention_dim, self.feature_attention_dim)
        self.feature_value = torch.nn.Linear(self.feature_attention_dim, self.feature_attention_dim)
        self.feature_norm = torch.nn.LayerNorm(self.feature_attention_dim)

        # Stream 3: Batch Attention Mechanism
        self.batch_attention_dim = 16
        self.batch_projection = torch.nn.Linear(input_size, self.batch_attention_dim)
        self.batch_query = torch.nn.Linear(self.batch_attention_dim, self.batch_attention_dim)
        self.batch_key = torch.nn.Linear(self.batch_attention_dim, self.batch_attention_dim)
        self.batch_value = torch.nn.Linear(self.batch_attention_dim, self.batch_attention_dim)
        self.batch_norm = torch.nn.LayerNorm(self.batch_attention_dim)

        # Feature Attention stream MLP
        self.feature_att_fc1 = torch.nn.Linear(self.feature_attention_dim, 32)
        self.feature_att_fc2 = torch.nn.Linear(32, 8)

        # Batch Attention stream MLP
        self.batch_att_fc1 = torch.nn.Linear(self.batch_attention_dim, 32)
        self.batch_att_fc2 = torch.nn.Linear(32, 8)

        # 8 from MLP + 8 from feature attention + 8 from batch attention
        self.final_fc = torch.nn.Linear(24, 1)

        # One dropout module shared by every stream.
        self.dropout = torch.nn.Dropout(0.2)

        # Initialize weights
        self.apply(self._init_weights)

    def _init_weights(self, module):
        """Xavier-initialise Linear weights and zero their biases."""
        if isinstance(module, torch.nn.Linear):
            torch.nn.init.xavier_uniform_(module.weight)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)

    def feature_attention(self, x):
        """Return the normalised self-attention context of the projected input.

        For the 2-D ``(batch, input_size)`` tensors passed by the callers in
        this file, ``scores`` has shape ``(batch, batch)``, i.e. the softmax
        runs over the samples of the batch.
        """
        projected = self.feature_projection(x)

        query = self.feature_query(projected)
        key = self.feature_key(projected)
        value = self.feature_value(projected)

        # Scaled dot-product attention.
        scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(self.feature_attention_dim)
        attention_weights = F.softmax(scores, dim=-1)
        context = torch.matmul(attention_weights, value)

        # Residual connection followed by LayerNorm.
        return self.feature_norm(context + projected)

    def batch_attention(self, x):
        """Return the attention context computed across the samples of ``x``.

        Batches of more than one sample go through the batch stream's own
        projection, attention and LayerNorm. A batch of one sample (or an
        empty batch) takes the shortcut described in the note below.
        """
        batch_size = x.size(0)

        # NOTE(review): a single-sample batch bypasses the batch stream's own
        # projection/attention/LayerNorm and returns the *feature* stream's
        # projection, un-normalised. This looks like a copy-paste slip (the
        # general path below also works for batch_size == 1), but it is kept
        # because changing it alters the outputs of checkpoints already in
        # use. Confirm against the training code before changing it.
        if batch_size <= 1:
            return self.feature_projection(x)

        projected = self.batch_projection(x)

        query = self.batch_query(projected)
        key = self.batch_key(projected)
        value = self.batch_value(projected)

        # Flatten everything after the batch axis so each sample is one row.
        query_reshaped = query.view(batch_size, -1)
        key_reshaped = key.view(batch_size, -1)

        # Similarity between every pair of samples: (batch_size, batch_size).
        scores = torch.mm(query_reshaped, key_reshaped.t()) / math.sqrt(key_reshaped.size(1))
        attention_weights = F.softmax(scores, dim=1)

        # Each sample becomes a weighted sum of all samples' values.
        batch_context = torch.mm(attention_weights, value.view(batch_size, -1))
        batch_context = batch_context.view(batch_size, -1)

        # Residual connection followed by LayerNorm.
        context = batch_context.view_as(projected) + projected
        return self.batch_norm(context)

    def forward(self, x):
        """Run the three streams on ``x`` and return the ``(batch, 1)`` output."""
        # Stream 1: Original MLP
        mlp_x = x
        for layer in (self.mlp_fc1, self.mlp_fc2, self.mlp_fc3, self.mlp_fc4):
            mlp_x = self.dropout(F.relu(layer(mlp_x)))

        # Stream 2: Feature Attention mechanism
        feature_att_x = self.feature_attention(x)
        for layer in (self.feature_att_fc1, self.feature_att_fc2):
            feature_att_x = self.dropout(F.relu(layer(feature_att_x)))

        # Stream 3: Batch Attention mechanism
        batch_att_x = self.batch_attention(x)
        for layer in (self.batch_att_fc1, self.batch_att_fc2):
            batch_att_x = self.dropout(F.relu(layer(batch_att_x)))

        # Concatenate outputs from all three streams, then predict.
        combined = torch.cat([mlp_x, feature_att_x, batch_att_x], dim=1)
        return self.final_fc(combined)


@st.cache_resource
def load_model_and_data():
    """Load the training table, fit the scalers and load both checkpoints.

    Candidate inputs are the columns at positions 1-16 and 21-22 of the
    Excel table; the friction target is column 28 and the cohesion target is
    column 25. For each target only the candidates whose absolute
    correlation with it exceeds 0.1 are kept, and that selection fixes the
    input size of the corresponding network.

    Returns:
        Tuple ``(friction_model, friction_columns, scaler_X_friction,
        scaler_y_friction, cohesion_model, cohesion_columns,
        scaler_X_cohesion, scaler_y_cohesion, device, X_friction,
        X_cohesion)``.
    """
    # Set device and random seeds
    np.random.seed(32)
    torch.manual_seed(42)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Load data
    data = pd.read_excel(DATA_FILE)
    X = data.iloc[:, list(range(1, 17)) + list(range(21, 23))]

    # Friction data
    y_friction = data.iloc[:, 28].values
    correlation_with_friction = abs(X.corrwith(pd.Series(y_friction)))
    selected_features_friction = correlation_with_friction[correlation_with_friction > 0.1].index
    X_friction = X[selected_features_friction]

    # Cohesion data
    y_cohesion = data.iloc[:, 25].values
    correlation_with_cohesion = abs(X.corrwith(pd.Series(y_cohesion)))
    selected_features_cohesion = correlation_with_cohesion[correlation_with_cohesion > 0.1].index
    X_cohesion = X[selected_features_cohesion]

    # Initialize and fit scalers for friction
    scaler_X_friction = MinMaxScaler()
    scaler_y_friction = MinMaxScaler()
    scaler_X_friction.fit(X_friction)
    scaler_y_friction.fit(y_friction.reshape(-1, 1))

    # Initialize and fit scalers for cohesion
    scaler_X_cohesion = MinMaxScaler()
    scaler_y_cohesion = MinMaxScaler()
    scaler_X_cohesion.fit(X_cohesion)
    scaler_y_cohesion.fit(y_cohesion.reshape(-1, 1))

    # Load models. map_location lets a checkpoint saved on a GPU be loaded
    # on a CPU-only host (and vice versa).
    friction_model = DualStreamNet(input_size=len(selected_features_friction)).to(device)
    friction_model.load_state_dict(torch.load(FRICTION_WEIGHTS_FILE, map_location=device))
    friction_model.eval()

    cohesion_model = DualStreamNet(input_size=len(selected_features_cohesion)).to(device)
    cohesion_model.load_state_dict(torch.load(COHESION_WEIGHTS_FILE, map_location=device))
    cohesion_model.eval()

    return (friction_model, X_friction.columns, scaler_X_friction, scaler_y_friction,
            cohesion_model, X_cohesion.columns, scaler_X_cohesion, scaler_y_cohesion,
            device, X_friction, X_cohesion)


def _predict_scalar(input_values, model, scaler_X, scaler_y, device):
    """Scale ``input_values``, run ``model`` and return the first unscaled prediction."""
    input_scaled = scaler_X.transform(input_values)
    input_tensor = torch.FloatTensor(input_scaled).to(device)

    with torch.no_grad():
        prediction_scaled = model(input_tensor)

    prediction = scaler_y.inverse_transform(prediction_scaled.cpu().numpy().reshape(-1, 1))
    return prediction[0][0]


def predict_friction(input_values, model, scaler_X, scaler_y, device):
    """Return the friction prediction, in target units, for the first input row."""
    return _predict_scalar(input_values, model, scaler_X, scaler_y, device)


def predict_cohesion(input_values, model, scaler_X, scaler_y, device):
    """Return the cohesion prediction, in target units, for the first input row."""
    return _predict_scalar(input_values, model, scaler_X, scaler_y, device)


def calculate_shap_values(input_values, model, X, scaler_X, scaler_y, device):
    """Explain one prediction with SHAP's KernelExplainer.

    Args:
        input_values: single-row DataFrame to explain.
        model: network producing scaled predictions.
        X: DataFrame summarised into 10 k-means centres as SHAP background.
        scaler_X: fitted scaler applied to raw inputs before the model.
        scaler_y: fitted scaler used to bring predictions back to target units.
        device: torch device the model lives on.

    Returns:
        ``(shap_values_for_the_row, expected_value)``. If anything raises,
        the error is shown with ``st.error`` and ``(zeros, 0.0)`` is returned.
    """

    def model_predict(samples):
        # NOTE(review): all rows SHAP passes in are evaluated in one forward
        # pass, and DualStreamNet's attention mixes the rows of a batch, so
        # each row's output depends on the other rows of the same call.
        samples_scaled = scaler_X.transform(samples)
        samples_tensor = torch.FloatTensor(samples_scaled).to(device)
        model.eval()
        with torch.no_grad():
            scaled_predictions = model(samples_tensor).cpu().numpy().flatten()
        # Unscale the predictions
        return scaler_y.inverse_transform(scaled_predictions.reshape(-1, 1)).flatten()

    try:
        # Set random seed for reproducibility
        np.random.seed(42)

        # Use k-means for background data
        background = shap.kmeans(X.values, 10)
        explainer = shap.KernelExplainer(model_predict, background)

        # Calculate SHAP values with more samples for stability
        shap_values = explainer.shap_values(input_values.values, nsamples=200)
        if isinstance(shap_values, list):
            shap_values = np.array(shap_values[0])

        # model_predict already returns unscaled values, so the expected
        # value only needs unwrapping when it comes back as an array.
        expected_value = explainer.expected_value
        if isinstance(expected_value, np.ndarray):
            expected_value = expected_value[0]

        return shap_values[0], expected_value
    except Exception as e:
        st.error(f"Error calculating SHAP values: {str(e)}")
        return np.zeros(len(input_values.columns)), 0.0


@st.cache_resource
def create_background_data(X, n_samples=50):
    """Create and cache background data for SHAP calculations.

    Returns the values of up to ``n_samples`` rows of ``X`` drawn without
    replacement under a fixed seed. ``main`` does not use this helper;
    ``calculate_shap_values`` builds its own k-means background.
    """
    np.random.seed(42)
    # Ensure n_samples is not larger than dataset
    n_samples = min(n_samples, len(X))
    background_indices = np.random.choice(len(X), size=n_samples, replace=False)
    return X.iloc[background_indices].values


def create_waterfall_plot(shap_values, feature_names, base_value, input_data, title):
    """Render a SHAP waterfall plot and return it as an in-memory PNG.

    Returns:
        ``io.BytesIO`` positioned at offset 0 and holding the PNG bytes.
    """
    explanation = shap.Explanation(
        values=shap_values,
        base_values=base_value,
        data=input_data,
        feature_names=list(feature_names)
    )

    fig = plt.figure(figsize=(12, 8))
    try:
        shap.plots.waterfall(explanation, show=False)
        plt.title(f'{title} - Local SHAP Value Contributions')
        plt.tight_layout()

        # Save plot to a buffer
        buf = io.BytesIO()
        plt.savefig(buf, format='png', bbox_inches='tight', dpi=300)
    finally:
        # Close the figure even when plotting fails so figures do not pile up
        # across reruns.
        plt.close(fig)

    buf.seek(0)
    return buf


def _row_values(frame, features, row):
    """Return ``{feature: float}`` read from positional ``row`` of ``frame``.

    Features missing from ``frame`` and non-finite values are skipped. An
    empty dict is returned when ``frame`` has no such row.
    """
    if len(frame) <= row:
        return {}

    values = {}
    for feature in features:
        if feature not in frame.columns:
            continue
        value = float(frame[feature].iloc[row])
        if math.isfinite(value):
            values[feature] = value
    return values


def _feature_bounds(feature, frames):
    """Return ``(min, max)`` of ``feature`` over every frame that contains it."""
    columns = [frame[feature] for frame in frames if feature in frame.columns]
    lower = min(float(column.min()) for column in columns)
    upper = max(float(column.max()) for column in columns)
    return lower, upper


def main():
    """Render the page: inputs, predictions and SHAP waterfall plots.

    Any exception raised while building the page is reported with
    ``st.error`` instead of propagating.
    """
    st.title("🔄 Waste Properties Predictor")
    st.write("This app predicts both friction angle and cohesion based on waste composition and characteristics.")

    try:
        # Load models and data
        (friction_model, friction_features, scaler_X_friction, scaler_y_friction,
         cohesion_model, cohesion_features, scaler_X_cohesion, scaler_y_cohesion,
         device, X_friction, X_cohesion) = load_model_and_data()

        training_frames = (X_friction, X_cohesion)

        # Combine all unique features
        all_features = sorted(set(friction_features) | set(cohesion_features))

        st.header("Input Parameters")

        # Add file upload option
        uploaded_file = st.file_uploader("Upload Excel file with input values", type=['xlsx', 'xls'])

        # Pre-fill from the cached training frames. They are column subsets
        # of the training table, so this yields the same values as re-reading
        # the Excel file on every rerun.
        input_values = {}
        for frame in training_frames:
            input_values.update(_row_values(frame, all_features, DEFAULT_ROW_INDEX))

        # Override with the first row of the uploaded file if provided
        if uploaded_file is not None:
            try:
                uploaded_df = pd.read_excel(uploaded_file)
                input_values.update(_row_values(uploaded_df, all_features, 0))
            except Exception as e:
                st.error(f"Error reading file: {str(e)}")

        st.write("Enter the waste composition and characteristics below to predict both friction angle and cohesion.")

        # Create two columns for input
        col1, col2 = st.columns(2)
        first_column_count = len(all_features) // 2
        clamped_features = []

        # Create input fields for each feature
        for i, feature in enumerate(all_features):
            with col1 if i < first_column_count else col2:
                min_val, max_val = _feature_bounds(feature, training_frames)

                # st.number_input rejects a value outside [min_value,
                # max_value], so pull the pre-filled value into range.
                requested = input_values.get(feature, 0.0)
                default_value = min(max(requested, min_val), max_val)
                if feature in input_values and default_value != requested:
                    clamped_features.append(feature)

                input_values[feature] = st.number_input(
                    f"{feature}",
                    min_value=min_val,
                    max_value=max_val,
                    value=default_value,
                    format="%.5f",
                    help=f"Range: {min_val:.5f} to {max_val:.5f}"
                )

        if clamped_features:
            st.warning(
                "Values outside the supported range were adjusted for: "
                + ", ".join(str(name) for name in clamped_features)
            )

        # Create DataFrames for both predictions
        friction_input_df = pd.DataFrame(
            [[input_values.get(feature, 0) for feature in friction_features]],
            columns=friction_features
        )
        cohesion_input_df = pd.DataFrame(
            [[input_values.get(feature, 0) for feature in cohesion_features]],
            columns=cohesion_features
        )

        if st.button("Predict Properties"):
            with st.spinner("Calculating predictions and SHAP values..."):
                # Make predictions
                friction_prediction = predict_friction(
                    friction_input_df, friction_model,
                    scaler_X_friction, scaler_y_friction, device
                )
                cohesion_prediction = predict_cohesion(
                    cohesion_input_df, cohesion_model,
                    scaler_X_cohesion, scaler_y_cohesion, device
                )

                # Set random seed before SHAP calculations
                np.random.seed(42)
                torch.manual_seed(42)
                if torch.cuda.is_available():
                    torch.cuda.manual_seed(42)

                # Calculate SHAP values against a k-means summary of the
                # training inputs.
                friction_shap_values, friction_base_value = calculate_shap_values(
                    friction_input_df, friction_model, X_friction,
                    scaler_X_friction, scaler_y_friction, device
                )
                cohesion_shap_values, cohesion_base_value = calculate_shap_values(
                    cohesion_input_df, cohesion_model, X_cohesion,
                    scaler_X_cohesion, scaler_y_cohesion, device
                )

                # Display results
                st.header("Prediction Results")
                col1, col2 = st.columns(2)
                with col1:
                    st.metric("Friction Angle", f"{friction_prediction:.5f}°")
                with col2:
                    st.metric("Cohesion", f"{cohesion_prediction:.5f} kPa")

                # Create and display waterfall plots
                col1, col2 = st.columns(2)
                with col1:
                    st.subheader("Friction Angle SHAP Analysis")
                    friction_waterfall_plot = create_waterfall_plot(
                        shap_values=friction_shap_values,
                        feature_names=friction_features,
                        base_value=friction_base_value,
                        input_data=friction_input_df.values[0],
                        title="Friction Angle"
                    )
                    st.image(friction_waterfall_plot)

                with col2:
                    st.subheader("Cohesion SHAP Analysis")
                    cohesion_waterfall_plot = create_waterfall_plot(
                        shap_values=cohesion_shap_values,
                        feature_names=cohesion_features,
                        base_value=cohesion_base_value,
                        input_data=cohesion_input_df.values[0],
                        title="Cohesion"
                    )
                    st.image(cohesion_waterfall_plot)

    except Exception as e:
        st.error(f"An error occurred: {str(e)}")
        st.info("Please try refreshing the page. If the error persists, contact support.")


if __name__ == "__main__":
    main()