# NLU module for sentence-transformers-based semantic similarity and intent extraction
import os

import numpy as np
import pandas as pd

from constraints import L_SUPPORT_QUESTIONS_IDS, INTENT_TO_XAI_METHOD

try:
    from sentence_transformers import SentenceTransformer
    SENTENCE_TRANSFORMERS_AVAILABLE = True
except ImportError:
    SentenceTransformer = None
    SENTENCE_TRANSFORMERS_AVAILABLE = False

try:
    from simcse import SimCSE
    SIMCSE_AVAILABLE = True
except ImportError:
    SimCSE = None
    SIMCSE_AVAILABLE = False


class NLU:
    """Natural-language understanding for an XAI chatbot.

    Maps free-form user questions to a supported XAI method ("shap", "dice",
    "anchor") by matching against a curated question dataset, using (in order
    of preference): sentence-transformers embeddings, SimCSE, or a simple
    keyword/fuzzy fallback.
    """

    def __init__(self, model_type="sentence_transformers", model_path=None):
        """Load the question dataset and the requested similarity backend.

        Args:
            model_type: One of "sentence_transformers", "simcse", "fallback".
                Unavailable backends degrade gracefully in that order.
            model_path: Unused; kept for backward compatibility with callers.

        Raises:
            ValueError: If ``model_type`` is not a supported backend name.
        """
        self.model_type = model_type
        self.df = pd.read_csv(
            os.path.join(os.path.dirname(__file__), '..', 'data_questions', 'Median_4.csv'),
            index_col=0,
        ).drop_duplicates()
        self.questions = list(self.df['Question'])

        # Prefer sentence-transformers; use GPU if available, otherwise CPU
        # (Streamlit Cloud has no GPU).
        if model_type == "sentence_transformers":
            if not SENTENCE_TRANSFORMERS_AVAILABLE:
                print("⚠️ sentence-transformers not available, trying SimCSE...")
                self.model_type = "simcse"
            else:
                try:
                    import torch
                    device = "cuda" if torch.cuda.is_available() else "cpu"
                except Exception:
                    device = "cpu"
                # Lightweight, fast model for semantic similarity.
                self.model = SentenceTransformer('all-MiniLM-L6-v2', device=device)
                print(f"✅ Loaded sentence-transformers model on {device}")
                # Pre-compute embeddings for all questions once, up front.
                self.question_embeddings = self.model.encode(
                    self.questions, convert_to_numpy=True, show_progress_bar=False
                )
                print(f"✅ Pre-computed embeddings for {len(self.questions)} questions")

        # Optional SimCSE fallback for legacy environments.
        if self.model_type == "simcse" or (
            model_type == "sentence_transformers" and not SENTENCE_TRANSFORMERS_AVAILABLE
        ):
            if not SIMCSE_AVAILABLE:
                print("⚠️ SimCSE not available, falling back to simple keyword matching")
                self.model_type = "fallback"
                self.model = None
            else:
                self.model = SimCSE("princeton-nlp/sup-simcse-roberta-large")
                self.model.build_index(self.questions)
                self.model_type = "simcse"
        elif model_type == "fallback":
            self.model = None
        elif self.model_type not in {"sentence_transformers", "simcse", "fallback"}:
            raise ValueError(
                f"Unsupported NLU model type: {model_type}. "
                f"Supported: 'sentence_transformers', 'simcse', 'fallback'"
            )

    def classify_intent(self, user_input, top_k=5):
        """Classify a user question into an XAI intent.

        Tries fast keyword heuristics first, then the configured embedding
        backend, then the keyword/fuzzy fallback.

        Returns:
            Tuple ``(result, confidence, suggestions)`` where ``result`` is a
            dict with keys 'intent', 'label', 'confidence', 'matched_question'
            (or the string 'unknown' when nothing matches).
        """
        # Fast keyword heuristics ensure clear phrases immediately map to an
        # XAI method without touching any model.
        try:
            text = (user_input or "").lower()
            rule_keywords = ["rule-based", "rule based", "rules", "conditions", "if then", "anchor"]
            shap_keywords = ["feature", "importance", "impact", "influence", "contribute",
                             "shap", "why", "explain", "decision", "factors", "affected"]
            dice_keywords = ["what if", "counterfactual", "change", "modify", "different",
                             "should i", "how to get"]
            if any(k in text for k in rule_keywords):
                return {
                    'intent': 'anchor',
                    'label': None,
                    'confidence': 0.95,
                    'matched_question': "Provide a simple rule-based explanation for this decision."
                }, 0.95, []
            # Explicitly detect single-word 'why' queries too.
            if text.strip() == "why" or any(k in text for k in shap_keywords):
                return {
                    'intent': 'shap',
                    'label': None,
                    'confidence': 0.9,
                    'matched_question': "Which features were most important for this prediction?"
                }, 0.9, []
            if any(k in text for k in dice_keywords):
                return {
                    'intent': 'dice',
                    'label': None,
                    'confidence': 0.9,
                    'matched_question': "How should the instance be changed to get a different prediction?"
                }, 0.9, []
        except Exception:
            # Heuristics are best-effort only; fall through to model matching.
            pass

        # sentence-transformers path: cosine similarity against pre-computed
        # question embeddings.
        if self.model_type == "sentence_transformers" and hasattr(self, 'question_embeddings'):
            try:
                query_emb = self.model.encode(
                    [user_input], convert_to_numpy=True, show_progress_bar=False
                )[0]
                # Cosine similarity; epsilon guards against zero-norm vectors.
                q_norm = np.linalg.norm(self.question_embeddings, axis=1) + 1e-12
                u_norm = np.linalg.norm(query_emb) + 1e-12
                sims = (self.question_embeddings @ query_emb) / (q_norm * u_norm)
                top_idx = np.argsort(-sims)[:top_k]
                match_question = self.questions[top_idx[0]]
                score = float(sims[top_idx[0]])
                label = self.df.iloc[top_idx[0]]['Label']
                xai_method = self.map_label_to_xai_method(label)
                suggestions = [self.questions[i] for i in top_idx]
                return {
                    'intent': xai_method,
                    'label': label,
                    'confidence': score,
                    'matched_question': match_question
                }, score, suggestions
            except Exception as e:
                print(f"sentence-transformers classify failed: {e}")

        # Legacy SimCSE path.
        if self.model_type == "simcse" and self.model is not None:
            # Always get top matches without initial threshold filtering.
            match_results = self.model.search(user_input, threshold=0, top_k=top_k)
            if len(match_results) > 0:
                match_question, score = match_results[0]
                label = self.df.query('Question == @match_question')['Label'].iloc[0]
                xai_method = self.map_label_to_xai_method(label)
                # NOTE(review): SimCSE scores are assumed to sometimes be very
                # large here (hence the 1e20 divisor) — original heuristic
                # preserved as-is; confirm against the SimCSE version in use.
                normalized_confidence = min(1.0, score / 1e20) if score > 1 else score
                return {
                    'intent': xai_method,
                    'label': label,
                    'confidence': normalized_confidence,
                    'matched_question': match_question
                }, normalized_confidence, []
            # BUGFIX: previously this branch fell through returning None,
            # which broke callers unpacking a 3-tuple.
            return 'unknown', 0.0, []

        # Fallback to simple keyword matching when no embedding model is
        # available. BUGFIX: _fallback_classify_intent did not exist before.
        if self.model_type == "fallback" or self.model is None:
            return self._fallback_classify_intent(user_input, top_k)

        # No matches found at all.
        return 'unknown', 0.0, []

    def _fallback_classify_intent(self, user_input, top_k=5):
        """Keyword/fuzzy-based intent classification for the fallback backend.

        Delegates to :meth:`_fuzzy_match_fallback` and wraps its matched
        question into the same 3-tuple shape as :meth:`classify_intent`.
        """
        matched_question = self._fuzzy_match_fallback(user_input)
        if matched_question == "unknown":
            return 'unknown', 0.0, []
        # Look up the label when the matched question exists in the dataset;
        # representative pattern questions may not be present verbatim.
        label_rows = self.df.query('Question == @matched_question')['Label']
        label = label_rows.iloc[0] if len(label_rows) > 0 else None
        xai_method = self.map_label_to_xai_method(label) if label is not None else 'general'
        # Fixed moderate confidence: fuzzy matching gives no calibrated score.
        confidence = 0.5
        return {
            'intent': xai_method,
            'label': label,
            'confidence': confidence,
            'matched_question': matched_question
        }, confidence, []

    def match(self, user_input, features=None, prediction=None, current_instance=None, labels=None):
        """Hybrid approach: Fuzzy first (primary), Intent classifier fallback.

        Returns the best-matching dataset question string, or "unknown".
        """
        # PRIMARY: Try fuzzy matching first (fast and reliable).
        fuzzy_result = self._fuzzy_match_fallback(user_input)
        if fuzzy_result != "unknown":
            print(f"🔤 Fuzzy match (primary): {fuzzy_result}")
            return fuzzy_result

        # FALLBACK 1: Try intent classifier (65% accuracy).
        intent_result = self._classify_with_intent_classifier(user_input)
        if intent_result != "unknown":
            print(f"🧠 Intent classifier (fallback): {intent_result}")
            return intent_result

        # FALLBACK 2: Try embedding search if available (ST first, then SimCSE).
        if self.model_type == "sentence_transformers" and hasattr(self, 'question_embeddings'):
            try:
                query_emb = self.model.encode(
                    [user_input], convert_to_numpy=True, show_progress_bar=False
                )[0]
                q_norm = np.linalg.norm(self.question_embeddings, axis=1) + 1e-12
                u_norm = np.linalg.norm(query_emb) + 1e-12
                sims = (self.question_embeddings @ query_emb) / (q_norm * u_norm)
                best_idx = int(np.argmax(sims))
                match_question = self.questions[best_idx]
                print(f"🔍 ST match (last resort): {match_question}")
                return match_question
            except Exception as e:
                print(f"ST search failed: {e}")

        if hasattr(self, 'model') and self.model_type == "simcse" and self.model is not None:
            try:
                threshold = 0.6
                match_results = self.model.search(user_input, threshold=threshold)
                if len(match_results) > 0:
                    match_question, score = match_results[0]
                    print(f"🔍 SimCSE match (last resort): {match_question}")
                    return match_question
                else:
                    # Try with no threshold.
                    match_results = self.model.search(user_input, threshold=0, top_k=5)
                    if len(match_results) > 0:
                        match_question, score = match_results[0]
                        print(f"🔍 SimCSE fallback: {match_question}")
                        return match_question
            except Exception as e:
                print(f"SimCSE search failed: {e}")

        print(f"❓ No match found for: '{user_input}'")
        return "unknown"

    def _fuzzy_match_fallback(self, user_input):
        """Fallback fuzzy matching using simple string similarity.

        First checks keyword patterns for each XAI method; if none match,
        compares the input against every dataset question with difflib.
        Returns a question string or "unknown".
        """
        try:
            from difflib import SequenceMatcher
            user_lower = user_input.lower()
            best_match = None
            best_score = 0

            # Define key patterns for different XAI methods.
            shap_patterns = [
                "feature", "important", "impact", "contribute", "influence",
                "matter", "weigh", "explain", "why"
            ]
            dice_patterns = [
                "change", "different", "modify", "counterfact", "should",
                "what if", "approved", "denied"
            ]
            anchor_patterns = [
                "rule", "condition", "guarantee", "necessary", "sufficient",
                "always", "simple"
            ]

            # Check for pattern matches; each returns a representative question.
            if any(pattern in user_lower for pattern in shap_patterns):
                return "What features of this instance lead to the system's prediction?"
            elif any(pattern in user_lower for pattern in dice_patterns):
                return "How should the instance be changed to get a different (better or worse) prediction?"
            elif any(pattern in user_lower for pattern in anchor_patterns):
                return "What is the minimum requirement for the prediction to stay the same?"

            # If no patterns match, try fuzzy string matching with dataset questions.
            for _, row in self.df.iterrows():
                question = row['Question']
                similarity = SequenceMatcher(None, user_lower, question.lower()).ratio()
                if similarity > best_score:
                    best_score = similarity
                    best_match = question

            # Return best match if similarity is reasonable.
            if best_score > 0.4:  # 40% similarity threshold
                return best_match
        except Exception as e:
            print(f"Fuzzy matching failed: {e}")
        return "unknown"

    def get_question_suggestions(self, match_results):
        """Extract up to 5 question suggestions from (question, score) pairs."""
        suggestions = []
        for question, _ in match_results:
            if len(suggestions) < 5:  # Limit to 5 suggestions
                suggestions.append(question)
        return suggestions

    def map_label_to_xai_method(self, label):
        """Map question label to appropriate XAI method (adopted from XAgent logic)."""
        from constraints import L_SHAP_QUESTION_IDS, L_DICE_QUESTION_IDS, L_ANCHOR_QUESTION_IDS
        if label in L_SHAP_QUESTION_IDS:
            return "shap"
        elif label in L_DICE_QUESTION_IDS:
            return "dice"
        elif label in L_ANCHOR_QUESTION_IDS:
            return "anchor"
        else:
            return "general"

    def replace_information(self, question, features=None, prediction=None,
                            current_instance=None, labels=None):
        """Replace template variables in questions (adopted from XAgent).

        {X} -> feature names, {P} -> predicted class, {Q} -> other classes.
        """
        if features and "{X}" in question:
            feature_str = (
                f"{{{features[0]},{features[1]}, ...}}" if len(features) > 1 else f"{features[0]}"
            )
            question = question.replace("{X}", feature_str)
        if prediction and "{P}" in question:
            question = question.replace("{P}", str(prediction))
        if labels and prediction and "{Q}" in question:
            other_labels = [label for label in labels if str(label) != str(prediction)]
            question = question.replace("{Q}", str(other_labels))
        return question

    def _classify_with_intent_classifier(self, user_input):
        """Use the trained intent classifier (65% accuracy) as fallback.

        Returns a representative question string for the predicted intent,
        or "unknown" when the classifier is unavailable or unconfident.
        """
        try:
            # Lazily load the intent classifier on first use.
            if not hasattr(self, 'intent_classifier') or self.intent_classifier is None:
                self._load_intent_classifier()
            if self.intent_classifier is None:
                return "unknown"

            # Generate embedding for user input and convert to a tensor.
            embedding = self.intent_simcse.encode([user_input])
            import torch
            embedding_tensor = torch.FloatTensor(embedding)

            # Get classifier prediction (model outputs softmax probabilities).
            with torch.no_grad():
                outputs = self.intent_classifier(embedding_tensor)
                probabilities = outputs[0].numpy()

            predicted_class_idx = np.argmax(probabilities)
            confidence = probabilities[predicted_class_idx]

            # Use lower threshold since this is fallback.
            if confidence >= 0.3:
                predicted_intent = self.intent_label_encoder.inverse_transform(
                    [predicted_class_idx]
                )[0]
                # Map intent to a representative dataset question.
                if predicted_intent == 'shap':
                    return "What features of this instance lead to the system's prediction?"
                elif predicted_intent == 'dice':
                    return "How should the instance be changed to get a different (better or worse) prediction?"
                elif predicted_intent == 'anchor':
                    return "What is the minimum requirement for the prediction to stay the same?"
                # Don't return anything for 'other' - let it fall through.
        except Exception as e:
            print(f"Intent classifier failed: {e}")
        return "unknown"

    def _load_intent_classifier(self):
        """Load the trained intent classifier (65% accuracy model).

        Sets ``intent_classifier``, ``intent_label_encoder`` and
        ``intent_simcse``; all are set to None when loading fails.
        """
        try:
            import torch
            import torch.nn as nn
            import pickle
            from simcse import SimCSE

            # Define the classifier architecture (matching the training script).
            class IntentClassifier(nn.Module):
                def __init__(self, input_dim, hidden_dim, num_classes=4):
                    super(IntentClassifier, self).__init__()
                    self.network = nn.Sequential(
                        nn.Linear(input_dim, hidden_dim),
                        nn.ReLU(),
                        nn.Dropout(0.2),
                        nn.Linear(hidden_dim, hidden_dim // 2),
                        nn.ReLU(),
                        nn.Dropout(0.2),
                        nn.Linear(hidden_dim // 2, num_classes),
                        nn.Softmax(dim=1)
                    )

                def forward(self, x):
                    return self.network(x)

            # Load metadata.
            with open('models/intent_classifier_metadata.pkl', 'rb') as f:
                metadata = pickle.load(f)

            # Load label encoder.
            with open('models/intent_label_encoder.pkl', 'rb') as f:
                self.intent_label_encoder = pickle.load(f)

            # Initialize and load classifier.
            self.intent_classifier = IntentClassifier(
                metadata['input_dim'],
                metadata['hidden_dim'],
                metadata['num_classes']
            )
            self.intent_classifier.load_state_dict(
                torch.load('models/intent_classifier_best.pth', map_location='cpu')
            )
            self.intent_classifier.eval()

            # Initialize SimCSE for embedding generation.
            self.intent_simcse = SimCSE("princeton-nlp/sup-simcse-roberta-large")

            # BUGFIX: the original applied ':.4f' to the fallback string
            # 'unknown', raising ValueError whenever the key was missing.
            accuracy = metadata.get('best_accuracy')
            accuracy_str = f"{accuracy:.4f}" if isinstance(accuracy, (int, float)) else "unknown"
            print(f"✅ Loaded intent classifier (accuracy: {accuracy_str})")
        except Exception as e:
            print(f"⚠️ Could not load intent classifier: {e}")
            self.intent_classifier = None
            self.intent_label_encoder = None
            self.intent_simcse = None