# Suvh
# Update to v1.1-chatty-luna (2025-12-07)
# 070061f
# NLU module for sentence-transformers-based semantic similarity and intent extraction
import os

import numpy as np
import pandas as pd

from constraints import L_SUPPORT_QUESTIONS_IDS, INTENT_TO_XAI_METHOD

# Optional dependency: sentence-transformers is the preferred embedding backend.
try:
    from sentence_transformers import SentenceTransformer
    SENTENCE_TRANSFORMERS_AVAILABLE = True
except ImportError:
    SentenceTransformer = None
    SENTENCE_TRANSFORMERS_AVAILABLE = False

# Optional dependency: SimCSE is the legacy embedding backend.
try:
    from simcse import SimCSE
    SIMCSE_AVAILABLE = True
except ImportError:
    SimCSE = None
    SIMCSE_AVAILABLE = False
class NLU:
    """Natural-language understanding for routing user questions to XAI methods.

    Loads a reference question bank (Median_4.csv, columns 'Question' and
    'Label') and classifies free-form user input into an XAI intent
    ('shap', 'dice', 'anchor', 'general') using, in order of preference:
    fast keyword heuristics, sentence-transformers cosine similarity, a
    legacy SimCSE index, and finally simple keyword/fuzzy string matching.
    """

    def __init__(self, model_type="sentence_transformers", model_path=None):
        """Initialize the NLU engine and pre-compute question embeddings.

        Args:
            model_type: 'sentence_transformers' (default), 'simcse', or 'fallback'.
            model_path: unused; kept for backward compatibility with callers.

        Raises:
            ValueError: if model_type is not one of the supported values.
        """
        self.model_type = model_type
        # Reference question bank; drop_duplicates guards against repeated rows.
        self.df = pd.read_csv(
            os.path.join(os.path.dirname(__file__), '..', 'data_questions', 'Median_4.csv'),
            index_col=0,
        ).drop_duplicates()
        self.questions = list(self.df['Question'])
        # Prefer sentence-transformers; use GPU if available, otherwise CPU (Streamlit Cloud has no GPU)
        if model_type == "sentence_transformers":
            if not SENTENCE_TRANSFORMERS_AVAILABLE:
                print("⚠️ sentence-transformers not available, trying SimCSE...")
                self.model_type = "simcse"
            else:
                try:
                    import torch
                    device = "cuda" if torch.cuda.is_available() else "cpu"
                except Exception:
                    device = "cpu"
                # Lightweight, fast model for semantic similarity
                self.model = SentenceTransformer('all-MiniLM-L6-v2', device=device)
                print(f"✅ Loaded sentence-transformers model on {device}")
                # Pre-compute embeddings for all questions
                self.question_embeddings = self.model.encode(self.questions, convert_to_numpy=True, show_progress_bar=False)
                print(f"✅ Pre-computed embeddings for {len(self.questions)} questions")
        # Optional SimCSE fallback for legacy envs (also taken when ST was requested
        # but is unavailable and self.model_type was downgraded above).
        if self.model_type == "simcse" or (model_type == "sentence_transformers" and not SENTENCE_TRANSFORMERS_AVAILABLE):
            if not SIMCSE_AVAILABLE:
                print("⚠️ SimCSE not available, falling back to simple keyword matching")
                self.model_type = "fallback"
                self.model = None
            else:
                self.model = SimCSE("princeton-nlp/sup-simcse-roberta-large")
                self.model.build_index(self.questions)
                self.model_type = "simcse"
        elif model_type == "fallback":
            self.model = None
        elif self.model_type not in {"sentence_transformers", "simcse", "fallback"}:
            raise ValueError(f"Unsupported NLU model type: {model_type}. Supported: 'sentence_transformers', 'simcse', 'fallback'")

    def classify_intent(self, user_input, top_k=5):
        """Classify user_input into an XAI intent (dynamic, model-driven extraction).

        Resolution order: keyword heuristics -> sentence-transformers cosine
        similarity -> legacy SimCSE search -> keyword/fuzzy fallback.

        Returns:
            (result, confidence, suggestions): result is a dict with keys
            'intent', 'label', 'confidence', 'matched_question' — or the
            string 'unknown' when nothing matched (legacy contract kept for
            existing callers).
        """
        # Fast keyword heuristics ensure clear phrases immediately map to an XAI method
        try:
            text = (user_input or "").lower()
            # Heuristics for common phrasing
            rule_keywords = ["rule-based", "rule based", "rules", "conditions", "if then", "anchor"]
            shap_keywords = ["feature", "importance", "impact", "influence", "contribute", "shap", "why", "explain", "decision", "factors", "affected"]
            dice_keywords = ["what if", "counterfactual", "change", "modify", "different", "should i", "how to get"]
            if any(k in text for k in rule_keywords):
                return {
                    'intent': 'anchor',
                    'label': None,
                    'confidence': 0.95,
                    'matched_question': "Provide a simple rule-based explanation for this decision."
                }, 0.95, []
            # Explicitly detect single-word 'why' queries too
            if text.strip() == "why" or any(k in text for k in shap_keywords):
                return {
                    'intent': 'shap',
                    'label': None,
                    'confidence': 0.9,
                    'matched_question': "Which features were most important for this prediction?"
                }, 0.9, []
            if any(k in text for k in dice_keywords):
                return {
                    'intent': 'dice',
                    'label': None,
                    'confidence': 0.9,
                    'matched_question': "How should the instance be changed to get a different prediction?"
                }, 0.9, []
        except Exception:
            # Heuristics are best-effort; fall through to the model-based paths.
            pass
        # sentence-transformers path
        if self.model_type == "sentence_transformers" and hasattr(self, 'question_embeddings'):
            try:
                query_emb = self.model.encode([user_input], convert_to_numpy=True, show_progress_bar=False)[0]
                # Cosine similarity against the pre-computed question embeddings
                # (epsilon avoids division by zero for all-zero vectors).
                q_norm = np.linalg.norm(self.question_embeddings, axis=1) + 1e-12
                u_norm = np.linalg.norm(query_emb) + 1e-12
                sims = (self.question_embeddings @ query_emb) / (q_norm * u_norm)
                # Top-k indices by descending similarity
                top_idx = np.argsort(-sims)[:top_k]
                match_question = self.questions[top_idx[0]]
                score = float(sims[top_idx[0]])
                label = self.df.iloc[top_idx[0]]['Label']
                xai_method = self.map_label_to_xai_method(label)
                suggestions = [self.questions[i] for i in top_idx]
                return {
                    'intent': xai_method,
                    'label': label,
                    'confidence': score,
                    'matched_question': match_question
                }, score, suggestions
            except Exception as e:
                print(f"sentence-transformers classify failed: {e}")
        # Legacy SimCSE path
        if self.model_type == "simcse" and self.model is not None:
            # Always get top matches without initial threshold filtering
            match_results = self.model.search(user_input, threshold=0, top_k=top_k)
            if len(match_results) > 0:
                match_question, score = match_results[0]
                # Get the label for the matched question
                label = self.df.query('Question == @match_question')['Label'].iloc[0]
                # Map label to XAI method if supported
                xai_method = self.map_label_to_xai_method(label)
                # Normalize confidence score to 0-1 range for consistency
                # SimCSE scores can be very high, so we'll use relative confidence
                normalized_confidence = min(1.0, score / 1e20) if score > 1 else score
                # Always return the best match but indicate confidence level
                return {
                    'intent': xai_method,
                    'label': label,
                    'confidence': normalized_confidence,
                    'matched_question': match_question
                }, normalized_confidence, []
            # No matches found at all
            return 'unknown', 0.0, []
        # Fallback to simple keyword matching when no embedding model is available
        if self.model_type == "fallback" or self.model is None:
            return self._fallback_classify_intent(user_input, top_k)
        return 'unknown', 0.0, []

    def _fallback_classify_intent(self, user_input, top_k=5):
        """Keyword/fuzzy fallback used by classify_intent when no embedding model is loaded.

        FIX: this helper was referenced by classify_intent but never defined,
        so the fallback path always raised AttributeError. It mirrors
        classify_intent's return contract: (result, confidence, suggestions).
        """
        matched_question = self._fuzzy_match_fallback(user_input)
        if matched_question == "unknown":
            return 'unknown', 0.0, []
        # Look up the label when the matched question exists in the bank;
        # representative pattern questions may not be present there.
        label = None
        try:
            rows = self.df.query('Question == @matched_question')['Label']
            if len(rows) > 0:
                label = rows.iloc[0]
        except Exception:
            label = None
        xai_method = self.map_label_to_xai_method(label) if label is not None else 'general'
        # Heuristic matches carry only moderate confidence.
        confidence = 0.5
        return {
            'intent': xai_method,
            'label': label,
            'confidence': confidence,
            'matched_question': matched_question
        }, confidence, []

    def match(self, user_input, features=None, prediction=None, current_instance=None, labels=None):
        """Hybrid approach: Fuzzy first (primary), Intent classifier fallback.

        Returns the best-matching template question string, or "unknown".
        features/prediction/current_instance/labels are accepted for interface
        compatibility but not used here.
        """
        # PRIMARY: Try fuzzy matching first (fast and reliable)
        fuzzy_result = self._fuzzy_match_fallback(user_input)
        if fuzzy_result != "unknown":
            print(f"🔤 Fuzzy match (primary): {fuzzy_result}")
            return fuzzy_result
        # FALLBACK 1: Try intent classifier (65% accuracy)
        intent_result = self._classify_with_intent_classifier(user_input)
        if intent_result != "unknown":
            print(f"🧠 Intent classifier (fallback): {intent_result}")
            return intent_result
        # FALLBACK 2: Try embedding search if available (ST first, then SimCSE)
        if self.model_type == "sentence_transformers" and hasattr(self, 'question_embeddings'):
            try:
                query_emb = self.model.encode([user_input], convert_to_numpy=True, show_progress_bar=False)[0]
                q_norm = np.linalg.norm(self.question_embeddings, axis=1) + 1e-12
                u_norm = np.linalg.norm(query_emb) + 1e-12
                sims = (self.question_embeddings @ query_emb) / (q_norm * u_norm)
                best_idx = int(np.argmax(sims))
                match_question = self.questions[best_idx]
                print(f"🔍 ST match (last resort): {match_question}")
                return match_question
            except Exception as e:
                print(f"ST search failed: {e}")
        if hasattr(self, 'model') and self.model_type == "simcse" and self.model is not None:
            try:
                threshold = 0.6
                match_results = self.model.search(user_input, threshold=threshold)
                if len(match_results) > 0:
                    match_question, score = match_results[0]
                    print(f"🔍 SimCSE match (last resort): {match_question}")
                    return match_question
                else:
                    # Retry with no threshold before giving up
                    match_results = self.model.search(user_input, threshold=0, top_k=5)
                    if len(match_results) > 0:
                        match_question, score = match_results[0]
                        print(f"🔍 SimCSE fallback: {match_question}")
                        return match_question
            except Exception as e:
                print(f"SimCSE search failed: {e}")
        print(f"❓ No match found for: '{user_input}'")
        return "unknown"

    def _fuzzy_match_fallback(self, user_input):
        """Fallback fuzzy matching using simple string similarity.

        First checks keyword patterns per XAI method and returns a
        representative question; otherwise fuzzy-matches against the question
        bank with difflib. Returns "unknown" when nothing clears the threshold.
        """
        try:
            from difflib import SequenceMatcher
            user_lower = user_input.lower()
            best_match = None
            best_score = 0
            # Define key patterns for different XAI methods
            shap_patterns = [
                "feature", "important", "impact", "contribute", "influence", "matter", "weigh", "explain", "why"
            ]
            dice_patterns = [
                "change", "different", "modify", "counterfact", "should", "what if", "approved", "denied"
            ]
            anchor_patterns = [
                "rule", "condition", "guarantee", "necessary", "sufficient", "always", "simple"
            ]
            # Check for pattern matches (SHAP takes precedence over DiCE/Anchor)
            if any(pattern in user_lower for pattern in shap_patterns):
                # Return a representative SHAP question
                return "What features of this instance lead to the system's prediction?"
            elif any(pattern in user_lower for pattern in dice_patterns):
                # Return a representative DiCE question
                return "How should the instance be changed to get a different (better or worse) prediction?"
            elif any(pattern in user_lower for pattern in anchor_patterns):
                # Return a representative Anchor question
                return "What is the minimum requirement for the prediction to stay the same?"
            # If no patterns match, try fuzzy string matching with dataset questions
            for _, row in self.df.iterrows():
                question = row['Question']
                similarity = SequenceMatcher(None, user_lower, question.lower()).ratio()
                if similarity > best_score:
                    best_score = similarity
                    best_match = question
            # Return best match if similarity is reasonable
            if best_score > 0.4:  # 40% similarity threshold
                return best_match
        except Exception as e:
            print(f"Fuzzy matching failed: {e}")
        return "unknown"

    def get_question_suggestions(self, match_results):
        """Extract up to five question suggestions from (question, score) match results."""
        return [question for question, _ in match_results[:5]]

    def map_label_to_xai_method(self, label):
        """Map question label to appropriate XAI method (adopted from XAgent logic)."""
        from constraints import L_SHAP_QUESTION_IDS, L_DICE_QUESTION_IDS, L_ANCHOR_QUESTION_IDS
        if label in L_SHAP_QUESTION_IDS:
            return "shap"
        elif label in L_DICE_QUESTION_IDS:
            return "dice"
        elif label in L_ANCHOR_QUESTION_IDS:
            return "anchor"
        else:
            return "general"

    def replace_information(self, question, features=None, prediction=None, current_instance=None, labels=None):
        """Replace template variables in questions (adopted from XAgent).

        {X} -> feature name(s), {P} -> the prediction, {Q} -> the other labels.
        FIX: uses `is not None` for prediction so a valid falsy prediction
        (e.g. class 0) is still substituted.
        """
        if features and "{X}" in question:
            feature_str = f"{{{features[0]},{features[1]}, ...}}" if len(features) > 1 else f"{features[0]}"
            question = question.replace("{X}", feature_str)
        if prediction is not None and "{P}" in question:
            question = question.replace("{P}", str(prediction))
        if labels and prediction is not None and "{Q}" in question:
            other_labels = [label for label in labels if str(label) != str(prediction)]
            question = question.replace("{Q}", str(other_labels))
        return question

    def _classify_with_intent_classifier(self, user_input):
        """Use the trained intent classifier (65% accuracy) as fallback.

        Lazily loads the classifier on first use. Returns a representative
        question for the predicted intent, or "unknown" on low confidence,
        'other' intents, or any failure.
        """
        try:
            # Try to load intent classifier if not already loaded
            if not hasattr(self, 'intent_classifier') or self.intent_classifier is None:
                self._load_intent_classifier()
                if self.intent_classifier is None:
                    return "unknown"
            # Generate embedding for user input
            embedding = self.intent_simcse.encode([user_input])
            # Convert to tensor
            import torch
            import numpy as np
            embedding_tensor = torch.FloatTensor(embedding)
            # Get classifier prediction
            with torch.no_grad():
                outputs = self.intent_classifier(embedding_tensor)
                probabilities = outputs[0].numpy()
            # Get the class with highest probability
            predicted_class_idx = np.argmax(probabilities)
            confidence = probabilities[predicted_class_idx]
            # Use lower threshold since this is fallback
            if confidence >= 0.3:  # Lower threshold for fallback
                # Convert back to intent
                predicted_intent = self.intent_label_encoder.inverse_transform([predicted_class_idx])[0]
                # Map intent to representative question
                if predicted_intent == 'shap':
                    return "What features of this instance lead to the system's prediction?"
                elif predicted_intent == 'dice':
                    return "How should the instance be changed to get a different (better or worse) prediction?"
                elif predicted_intent == 'anchor':
                    return "What is the minimum requirement for the prediction to stay the same?"
                # Don't return anything for 'other' - let it fall through
        except Exception as e:
            print(f"Intent classifier failed: {e}")
        return "unknown"

    def _load_intent_classifier(self):
        """Load the trained intent classifier (65% accuracy model).

        On any failure, sets intent_classifier / intent_label_encoder /
        intent_simcse to None so callers can fall through cleanly.
        """
        try:
            import torch
            import torch.nn as nn
            import pickle
            from simcse import SimCSE

            # Define the classifier architecture (matching the training script)
            class IntentClassifier(nn.Module):
                def __init__(self, input_dim, hidden_dim, num_classes=4):
                    super(IntentClassifier, self).__init__()
                    self.network = nn.Sequential(
                        nn.Linear(input_dim, hidden_dim),
                        nn.ReLU(),
                        nn.Dropout(0.2),
                        nn.Linear(hidden_dim, hidden_dim // 2),
                        nn.ReLU(),
                        nn.Dropout(0.2),
                        nn.Linear(hidden_dim // 2, num_classes),
                        nn.Softmax(dim=1)
                    )

                def forward(self, x):
                    return self.network(x)

            # Load metadata
            with open('models/intent_classifier_metadata.pkl', 'rb') as f:
                metadata = pickle.load(f)
            # Load label encoder
            with open('models/intent_label_encoder.pkl', 'rb') as f:
                self.intent_label_encoder = pickle.load(f)
            # Initialize and load classifier
            self.intent_classifier = IntentClassifier(
                metadata['input_dim'],
                metadata['hidden_dim'],
                metadata['num_classes']
            )
            self.intent_classifier.load_state_dict(torch.load('models/intent_classifier_best.pth', map_location='cpu'))
            self.intent_classifier.eval()
            # Initialize SimCSE for embedding generation
            self.intent_simcse = SimCSE("princeton-nlp/sup-simcse-roberta-large")
            # FIX: only apply a float format spec when accuracy is numeric;
            # the old f"{...:.4f}" crashed on the 'unknown' string default.
            accuracy = metadata.get('best_accuracy')
            accuracy_str = f"{accuracy:.4f}" if isinstance(accuracy, (int, float)) else "unknown"
            print(f"✅ Loaded intent classifier (accuracy: {accuracy_str})")
        except Exception as e:
            print(f"⚠️ Could not load intent classifier: {e}")
            self.intent_classifier = None
            self.intent_label_encoder = None
            self.intent_simcse = None