import gradio as gr
import matplotlib.pyplot as plt
import io
import base64
import shap
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import requests
import re
import json
import pickle # Added for loading .pkl and .json files
import os # Added for path checking
from sklearn.preprocessing import LabelEncoder, StandardScaler, OneHotEncoder # Explicitly import LabelEncoder
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
# --- Device Configuration ---
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# --- Model Architecture (MultiTaskMLP) ---
class MultiTaskMLP(nn.Module):
def __init__(self, input_dim, num_classes, dropout_rate=0.3):
super(MultiTaskMLP, self).__init__()
self.shared_layers = nn.Sequential(
nn.Linear(input_dim, 256),
nn.BatchNorm1d(256),
nn.ReLU(),
nn.Dropout(dropout_rate),
nn.Linear(256, 128),
nn.BatchNorm1d(128),
nn.ReLU(),
nn.Dropout(dropout_rate)
)
self.regression_head = nn.Sequential(
nn.Linear(128, 64),
nn.BatchNorm1d(64),
nn.ReLU(),
nn.Dropout(dropout_rate),
nn.Linear(64, 1)
)
self.classification_head = nn.Sequential(
nn.Linear(128, 64),
nn.BatchNorm1d(64),
nn.ReLU(),
nn.Dropout(dropout_rate),
nn.Linear(64, num_classes)
)
def forward(self, x):
shared_features = self.shared_layers(x)
ddg_output = self.regression_head(shared_features)
effect_output = self.classification_head(shared_features)
return ddg_output, effect_output
# --- SHAP Wrapper Classes ---
class RegressionHeadWrapper(nn.Module):
def __init__(self, original_model):
super().__init__()
self.original_model = original_model
def forward(self, x):
ddg_output, _ = self.original_model(x)
return ddg_output
class ClassificationHeadWrapper(nn.Module):
def __init__(self, original_model):
super().__init__()
self.original_model = original_model
def forward(self, x):
_, effect_output = self.original_model(x)
return effect_output
# --- Helper Function: plot_to_base64 ---
def plot_to_base64(fig):
buffer = io.BytesIO()
fig.savefig(buffer, format='png', bbox_inches='tight')
buffer.seek(0)
image_base64 = base64.b64encode(buffer.read()).decode('utf-8')
plt.close(fig)
return f""
# --- Helper Function: AA_PROPERTIES and get_aa_property_comparison ---
# AA_PROPERTIES is now loaded from global_vars.json in the main script
def get_aa_property_comparison(original_aa, mutant_aa):
# global AA_PROPERTIES is assumed to be loaded after global_vars.json
if original_aa not in AA_PROPERTIES or mutant_aa not in AA_PROPERTIES:
return "Invalid amino acid code(s) for comparison."
results = {}
for prop, _ in AA_PROPERTIES['A'].items():
orig_val = AA_PROPERTIES[original_aa].get(prop, 0)
mut_val = AA_PROPERTIES[mutant_aa].get(prop, 0)
change = mut_val - orig_val
results[prop] = {
'original': orig_val,
'mutant': mut_val,
'change': f'{change:.2f}'
}
return results
# --- Helper Function: get_top_features_by_shap ---
def get_top_features_by_shap(explainer_obj, background_data_tensor, feature_names, top_n=10, task_type='regression'):
if task_type == 'regression':
shap_values_background = explainer_obj.shap_values(background_data_tensor)
if isinstance(shap_values_background, list):
shap_values_background = np.array(shap_values_background).squeeze()
elif shap_values_background.ndim == 3:
shap_values_background = shap_values_background.squeeze(axis=-1)
elif task_type == 'classification':
shap_values_background_raw = explainer_obj.shap_values(background_data_tensor)
if isinstance(shap_values_background_raw, list):
abs_shap_values = np.mean(np.abs(np.array(shap_values_background_raw)), axis=0)
elif shap_values_background_raw.ndim == 3:
abs_shap_values = np.mean(np.abs(shap_values_background_raw), axis=-1)
else:
abs_shap_values = np.abs(shap_values_background_raw)
shap_values_background = abs_shap_values
else:
raise ValueError("task_type must be 'regression' or 'classification'")
mean_abs_shap = np.mean(np.abs(shap_values_background), axis=0)
feature_importance = pd.DataFrame({
'Feature': feature_names,
'Mean_Abs_SHAP': mean_abs_shap
})
top_features = feature_importance.sort_values(by='Mean_Abs_SHAP', ascending=False).head(top_n)
return top_features.to_dict(orient='records')
# --- Helper Function: plot_local_feature_importance ---
def plot_local_feature_importance(shap_values, feature_names, title="Local Feature Importance", top_n=10):
if shap_values.ndim > 1:
shap_values = shap_values.flatten()
feature_importance = pd.DataFrame({
'Feature': feature_names,
'SHAP_Value': shap_values
})
feature_importance['Abs_SHAP'] = np.abs(feature_importance['SHAP_Value'])
feature_importance = feature_importance.sort_values(by='Abs_SHAP', ascending=False).head(top_n)
feature_importance = feature_importance.sort_values(by='SHAP_Value', ascending=True)
fig, ax = plt.subplots(figsize=(10, 6))
colors = ['red' if x < 0 else 'blue' for x in feature_importance['SHAP_Value']]
ax.barh(feature_importance['Feature'], feature_importance['SHAP_Value'], color=colors)
ax.set_xlabel('SHAP Value (Impact on Model Output)')
ax.set_ylabel('Feature')
ax.set_title(title)
fig.tight_layout()
return plot_to_base64(fig)
# --- Helper Function: get_mutation_context_summary ---
def get_mutation_context_summary(input_data_df):
summary_html = "
Secondary Structure (sst): {sst}
" if isinstance(rsa_val, (int, float)): summary_html += f"Relative Solvent Accessibility (rsa): {rsa_val:.2f} (0=Buried, 1=Exposed)
" else: summary_html += f"Relative Solvent Accessibility (rsa): {rsa_val}
" if isinstance(res_depth_val, (int, float)): summary_html += f"Residue Depth: {res_depth_val:.2f} (Higher value = More buried)
" else: summary_html += f"Residue Depth: {res_depth_val}
" if sst == 'Strand': summary_html += "Insight: Mutations in strand regions might significantly disrupt beta-sheet structures, affecting protein folding and stability.
" elif sst == 'AlphaHelix': summary_html += "Insight: Alpha-helical mutations can alter helix packing, flexibility, or interactions, potentially impacting stability or function.
" elif sst == 'None': summary_html += "Insight: Mutations in coil/loop regions might introduce flexibility or alter surface interactions.
" if isinstance(rsa_val, (int, float)): if rsa_val < 0.2: summary_html += "Insight: This residue is highly **buried**, suggesting it plays a critical role in the protein's core stability or internal packing. Changes here are often destabilizing.
" elif rsa_val > 0.8: summary_html += "Insight: This residue is highly **exposed**, indicating it might be involved in surface interactions, ligand binding, or protein-protein interfaces. Mutations here could affect function or solvent interactions.
" else: summary_html += "Insight: This residue has intermediate solvent accessibility, potentially involved in both structural integrity and surface interactions.
" if isinstance(res_depth_val, (int, float)): if res_depth_val > 6.0: summary_html += "Insight: The residue is very deep within the protein, reinforcing its role in core structural stability.
" elif res_depth_val < 3.0: summary_html += "Insight: The residue is near the surface, potentially affecting surface interactions or flexibility.
" return summary_html # --- Artifact Loading Logic --- def load_object_from_path(filepath, alternative_filepath=None): if os.path.exists(filepath): with open(filepath, 'rb') as f: return pickle.load(f) elif alternative_filepath and os.path.exists(alternative_filepath): with open(alternative_filepath, 'rb') as f: return pickle.load(f) else: raise FileNotFoundError(f"Object file not found at {filepath} or {alternative_filepath}") def load_json_from_path(filepath, alternative_filepath=None): if os.path.exists(filepath): with open(filepath, 'r') as f: return json.load(f) elif alternative_filepath and os.path.exists(alternative_filepath): with open(alternative_filepath, 'r') as f: return json.load(f) else: raise FileNotFoundError(f"JSON file not found at {filepath} or {alternative_filepath}") # Define file paths preprocessor_file = 'preprocessor.pkl' global_vars_file = 'global_vars.json' feature_names_file = 'feature_names.json' shap_background_data_file = 'shap_background_data.pt' model_save_path = 'final_multi_task_protein_stability_model_non_plm.pth' # Load preprocessor try: preprocessor = load_object_from_path(preprocessor_file, f'src/{preprocessor_file}') except FileNotFoundError as e: raise SystemExit(f"Error loading preprocessor: {e}. Please ensure 'preprocessor.pkl' is available.") # Load global variables try: global_vars = load_json_from_path(global_vars_file, f'src/{global_vars_file}') except FileNotFoundError as e: raise SystemExit(f"Error loading global variables: {e}. Please ensure 'global_vars.json' is available.") # Unpack global variables label_encoder = LabelEncoder() # Create an instance of LabelEncoder label_encoder.classes_ = np.array(global_vars['label_encoder_classes']) # Assign classes from loaded data num_classes = global_vars['num_classes'] best_hyperparams = global_vars['best_hyperparams'] expected_value_reg = global_vars['expected_value_reg'] expected_value_cls = np.array(global_vars['expected_value_cls']) # Already converted to list, convert back to np array AA_PROPERTIES = global_vars['AA_PROPERTIES'] # Access global AA_PROPERTIES # Load feature names try: feature_names = load_json_from_path(feature_names_file, f'src/{feature_names_file}') except FileNotFoundError as e: raise SystemExit(f"Error loading feature names: {e}. Please ensure 'feature_names.json' is available.") # Load SHAP background data try: background_data = torch.load(shap_background_data_file, map_location=device) except FileNotFoundError as e: try: background_data = torch.load(f'src/{shap_background_data_file}', map_location=device) except FileNotFoundError: raise SystemExit(f"Error loading SHAP background data: {e}. Please ensure 'shap_background_data.pt' is available.") # --- Load Trained Model --- # Determine input_dim from feature_names for model initialization input_dim = len(feature_names) loaded_model = MultiTaskMLP(input_dim, num_classes, dropout_rate=best_hyperparams['dropout']).to(device) # Check model path: root, then src/ alt_model_save_path = f'src/{model_save_path}' if os.path.exists(model_save_path): final_model_path_to_load = model_save_path elif os.path.exists(alt_model_save_path): final_model_path_to_load = alt_model_save_path else: raise SystemExit(f"Error: Model file not found at {model_save_path} or {alt_model_save_path}") loaded_model.load_state_dict(torch.load(final_model_path_to_load, map_location=device)) loaded_model.eval() # Ensure eval mode is set immediately after loading # --- SHAP Explainers Setup --- model_reg_wrapper = RegressionHeadWrapper(loaded_model).to(device) model_cls_wrapper = ClassificationHeadWrapper(loaded_model).to(device) explainer_reg = shap.GradientExplainer(model_reg_wrapper, background_data) explainer_cls = shap.GradientExplainer(model_cls_wrapper, background_data) # --- Main Prediction Function for Gradio --- def predict_stability_change( weight, blosum62, pos, year, aro, ca_depth, mut_count, neg, sul, relative_bfactor, ph, neu, phi, psi, rsa, res_depth, temperature, acc, don, pam250, length, dtm, sst, measure, protein_name, source, original_aa, mutant_aa, mutated_chain, mutation_type, method_val ): # Explicitly cast numerical inputs to float and handle None values # Gradio's gr.Number will return float or int, no need for manual casting if types are consistent. # However, if using gr.Slider with None as default, it might return None. Ensure defaults or cast. # For robustness, we still ensure float type here. input_data_df_raw = pd.DataFrame({ 'weight': [float(weight) if weight is not None else 0.0], 'blosum62': [float(blosum62) if blosum62 is not None else 0.0], 'pos': [float(pos) if pos is not None else 0.0], 'year': [float(year) if year is not None else 0.0], 'aro': [float(aro) if aro is not None else 0.0], 'ca_depth': [float(ca_depth) if ca_depth is not None else 0.0], 'mut_count': [float(mut_count) if mut_count is not None else 0.0], 'neg': [float(neg) if neg is not None else 0.0], 'sul': [float(sul) if sul is not None else 0.0], 'relative_bfactor': [float(relative_bfactor) if relative_bfactor is not None else 0.0], 'ph': [float(ph) if ph is not None else 0.0], 'neu': [float(neu) if neu is not None else 0.0], 'phi': [float(phi) if phi is not None else 0.0], 'psi': [float(psi) if psi is not None else 0.0], 'rsa': [float(rsa) if rsa is not None else 0.0], 'res_depth': [float(res_depth) if res_depth is not None else 0.0], 'temperature': [float(temperature) if temperature is not None else 0.0], 'acc': [float(acc) if acc is not None else 0.0], 'don': [float(don) if don is not None else 0.0], 'pam250': [float(pam250) if pam250 is not None else 0.0], 'length': [float(length) if length is not None else 0.0], 'dtm': [float(dtm) if dtm is not None else 0.0], 'sst': [sst], 'measure': [measure], 'protein': [protein_name], 'source': [source], 'original_aa': [original_aa], 'mutant_aa': [mutant_aa], 'mutated_chain': [mutated_chain], 'mutation_type': [mutation_type], 'method': [method_val] }) # Preprocess the input data processed_input = preprocessor.transform(input_data_df_raw) if hasattr(processed_input, 'toarray'): processed_input = processed_input.toarray() input_tensor = torch.tensor(processed_input, dtype=torch.float32).to(device) # Make predictions loaded_model.eval() with torch.no_grad(): ddg_pred, effect_logits = loaded_model(input_tensor) predicted_ddg = ddg_pred.item() predicted_effect_class_idx = torch.argmax(effect_logits, dim=1).item() predicted_effect_label = label_encoder.inverse_transform([predicted_effect_class_idx])[0] # 4. Generate AA Property Comparison aa_comparison_results = get_aa_property_comparison(original_aa, mutant_aa) aa_props_html = "{aa_comparison_results}
" else: aa_props_html += "| Original | Mutant | Change | Property |
|---|---|---|---|
| {vals['original']} | {vals['mutant']} | {vals['change']} | {prop} |
Regression (ΔΔG):
Classification (Effect):