File size: 18,790 Bytes
070061f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386

# NLU module for sentence-transformers-based semantic similarity and intent extraction
import pandas as pd
import os
import numpy as np
from constraints import L_SUPPORT_QUESTIONS_IDS, INTENT_TO_XAI_METHOD

# sentence-transformers is an optional dependency. When the import fails the
# class name is bound to None and the flag below records that it is missing,
# so NLU.__init__ can switch to another backend instead of crashing.
try:
    from sentence_transformers import SentenceTransformer
    SENTENCE_TRANSFORMERS_AVAILABLE = True
except ImportError:
    SentenceTransformer = None
    SENTENCE_TRANSFORMERS_AVAILABLE = False

# SimCSE is optional as well and is handled the same way: a None placeholder
# plus an availability flag that NLU.__init__ checks before using it.
try:
    from simcse import SimCSE
    SIMCSE_AVAILABLE = True
except ImportError:
    SimCSE = None
    SIMCSE_AVAILABLE = False

class NLU:
    def __init__(self, model_type="sentence_transformers", model_path=None):
        self.model_type = model_type
        self.df = pd.read_csv(os.path.join(os.path.dirname(__file__), '..', 'data_questions', 'Median_4.csv'), index_col=0).drop_duplicates()
        self.questions = list(self.df['Question'])

        # Prefer sentence-transformers; use GPU if available, otherwise CPU (Streamlit Cloud has no GPU)
        if model_type == "sentence_transformers":
            if not SENTENCE_TRANSFORMERS_AVAILABLE:
                print("⚠️ sentence-transformers not available, trying SimCSE...")
                self.model_type = "simcse"
            else:
                try:
                    import torch
                    device = "cuda" if torch.cuda.is_available() else "cpu"
                except Exception:
                    device = "cpu"
                # Lightweight, fast model for semantic similarity
                self.model = SentenceTransformer('all-MiniLM-L6-v2', device=device)
                print(f"βœ… Loaded sentence-transformers model on {device}")
                # Pre-compute embeddings for all questions
                self.question_embeddings = self.model.encode(self.questions, convert_to_numpy=True, show_progress_bar=False)
                print(f"βœ… Pre-computed embeddings for {len(self.questions)} questions")

        # Optional SimCSE fallback for legacy envs
        if self.model_type == "simcse" or (model_type == "sentence_transformers" and not SENTENCE_TRANSFORMERS_AVAILABLE):
            if not SIMCSE_AVAILABLE:
                print("⚠️ SimCSE not available, falling back to simple keyword matching")
                self.model_type = "fallback"
                self.model = None
            else:
                self.model = SimCSE("princeton-nlp/sup-simcse-roberta-large")
                self.model.build_index(self.questions)
                self.model_type = "simcse"

        elif model_type == "fallback":
            self.model = None
        elif self.model_type not in {"sentence_transformers", "simcse", "fallback"}:
            raise ValueError(f"Unsupported NLU model type: {model_type}. Supported: 'sentence_transformers', 'simcse', 'fallback'")

    def classify_intent(self, user_input, top_k=5):
        # Dynamic, model-driven intent extraction
        # Fast keyword heuristics ensure clear phrases immediately map to an XAI method
        try:
            text = (user_input or "").lower()
            # Heuristics for common phrasing
            rule_keywords = ["rule-based", "rule based", "rules", "conditions", "if then", "anchor"]
            shap_keywords = ["feature", "importance", "impact", "influence", "contribute", "shap", "why", "explain", "decision", "factors", "affected"]
            dice_keywords = ["what if", "counterfactual", "change", "modify", "different", "should i", "how to get"]
            if any(k in text for k in rule_keywords):
                return {
                    'intent': 'anchor',
                    'label': None,
                    'confidence': 0.95,
                    'matched_question': "Provide a simple rule-based explanation for this decision."
                }, 0.95, []
            # Explicitly detect single-word 'why' queries too
            if text.strip() == "why" or any(k in text for k in shap_keywords):
                return {
                    'intent': 'shap',
                    'label': None,
                    'confidence': 0.9,
                    'matched_question': "Which features were most important for this prediction?"
                }, 0.9, []
            if any(k in text for k in dice_keywords):
                return {
                    'intent': 'dice',
                    'label': None,
                    'confidence': 0.9,
                    'matched_question': "How should the instance be changed to get a different prediction?"
                }, 0.9, []
        except Exception:
            pass

        # sentence-transformers path
        if self.model_type == "sentence_transformers" and hasattr(self, 'question_embeddings'):
            try:
                query_emb = self.model.encode([user_input], convert_to_numpy=True, show_progress_bar=False)[0]
                # Cosine similarity
                q_norm = np.linalg.norm(self.question_embeddings, axis=1) + 1e-12
                u_norm = np.linalg.norm(query_emb) + 1e-12
                sims = (self.question_embeddings @ query_emb) / (q_norm * u_norm)
                # Top-k indices
                top_idx = np.argsort(-sims)[:top_k]
                match_question = self.questions[top_idx[0]]
                score = float(sims[top_idx[0]])
                label = self.df.iloc[top_idx[0]]['Label']
                xai_method = self.map_label_to_xai_method(label)
                suggestions = [self.questions[i] for i in top_idx]
                return {
                    'intent': xai_method,
                    'label': label,
                    'confidence': score,
                    'matched_question': match_question
                }, score, suggestions
            except Exception as e:
                print(f"sentence-transformers classify failed: {e}")

        # Legacy SimCSE path
        if self.model_type == "simcse" and self.model is not None:
            # Always get top matches without initial threshold filtering
            match_results = self.model.search(user_input, threshold=0, top_k=top_k)
            
            if len(match_results) > 0:
                match_question, score = match_results[0]
                
                # Get the label for the matched question
                label = self.df.query('Question == @match_question')['Label'].iloc[0]
                # Map label to XAI method if supported
                xai_method = self.map_label_to_xai_method(label)
                
                # Normalize confidence score to 0-1 range for consistency
                # SimCSE scores can be very high, so we'll use relative confidence
                normalized_confidence = min(1.0, score / 1e20) if score > 1 else score
                
                # Always return the best match but indicate confidence level
                return {
                    'intent': xai_method,
                    'label': label,
                    'confidence': normalized_confidence,
                    'matched_question': match_question
                }, normalized_confidence, []
        
        # Fallback to simple keyword matching when SimCSE is not available
        elif self.model_type == "fallback" or self.model is None:
            return self._fallback_classify_intent(user_input, top_k)
            
        # No matches found at all
            return 'unknown', 0.0, []
        else:
            return 'unknown', 0.0, []

    def match(self, user_input, features=None, prediction=None, current_instance=None, labels=None):
        """Hybrid approach: Fuzzy first (primary), Intent classifier fallback"""
        
        # PRIMARY: Try fuzzy matching first (fast and reliable)
        fuzzy_result = self._fuzzy_match_fallback(user_input)
        if fuzzy_result != "unknown":
            print(f"πŸ”€ Fuzzy match (primary): {fuzzy_result}")
            return fuzzy_result
        
        # FALLBACK 1: Try intent classifier (65% accuracy)
        intent_result = self._classify_with_intent_classifier(user_input)
        if intent_result != "unknown":
            print(f"🧠 Intent classifier (fallback): {intent_result}")
            return intent_result
        
        # FALLBACK 2: Try embedding search if available (ST first, then SimCSE)
        if self.model_type == "sentence_transformers" and hasattr(self, 'question_embeddings'):
            try:
                query_emb = self.model.encode([user_input], convert_to_numpy=True, show_progress_bar=False)[0]
                q_norm = np.linalg.norm(self.question_embeddings, axis=1) + 1e-12
                u_norm = np.linalg.norm(query_emb) + 1e-12
                sims = (self.question_embeddings @ query_emb) / (q_norm * u_norm)
                best_idx = int(np.argmax(sims))
                match_question = self.questions[best_idx]
                print(f"πŸ” ST match (last resort): {match_question}")
                return match_question
            except Exception as e:
                print(f"ST search failed: {e}")

        if hasattr(self, 'model') and self.model_type == "simcse" and self.model is not None:
            try:
                threshold = 0.6
                match_results = self.model.search(user_input, threshold=threshold)
                
                if len(match_results) > 0:
                    match_question, score = match_results[0]
                    print(f"πŸ” SimCSE match (last resort): {match_question}")
                    return match_question
                else:
                    # Try with no threshold
                    match_results = self.model.search(user_input, threshold=0, top_k=5)
                    if len(match_results) > 0:
                        match_question, score = match_results[0]
                        print(f"πŸ” SimCSE fallback: {match_question}")
                        return match_question
            except Exception as e:
                print(f"SimCSE search failed: {e}")
        
        print(f"❓ No match found for: '{user_input}'")
        return "unknown"
    
    def _fuzzy_match_fallback(self, user_input):
        """Fallback fuzzy matching using simple string similarity"""
        try:
            from difflib import SequenceMatcher
            
            user_lower = user_input.lower()
            best_match = None
            best_score = 0
            
            # Define key patterns for different XAI methods
            shap_patterns = [
                "feature", "important", "impact", "contribute", "influence", "matter", "weigh", "explain", "why"
            ]
            dice_patterns = [
                "change", "different", "modify", "counterfact", "should", "what if", "approved", "denied"
            ]
            anchor_patterns = [
                "rule", "condition", "guarantee", "necessary", "sufficient", "always", "simple"
            ]
            
            # Check for pattern matches
            if any(pattern in user_lower for pattern in shap_patterns):
                # Return a representative SHAP question
                return "What features of this instance lead to the system's prediction?"
            elif any(pattern in user_lower for pattern in dice_patterns):
                # Return a representative DiCE question  
                return "How should the instance be changed to get a different (better or worse) prediction?"
            elif any(pattern in user_lower for pattern in anchor_patterns):
                # Return a representative Anchor question
                return "What is the minimum requirement for the prediction to stay the same?"
            
            # If no patterns match, try fuzzy string matching with dataset questions
            for _, row in self.df.iterrows():
                question = row['Question']
                similarity = SequenceMatcher(None, user_lower, question.lower()).ratio()
                if similarity > best_score:
                    best_score = similarity
                    best_match = question
            
            # Return best match if similarity is reasonable
            if best_score > 0.4:  # 40% similarity threshold
                return best_match
                
        except Exception as e:
            print(f"Fuzzy matching failed: {e}")
        
        return "unknown"
    
    def get_question_suggestions(self, match_results):
        """Return the question text of at most the first five result pairs.

        ``match_results`` is an iterable of two-item entries whose first item
        is the question; the second item is ignored.
        """
        max_suggestions = 5
        # Unpack every pair first, then keep only the leading entries.
        every_question = [pair_question for pair_question, _score in match_results]
        return every_question[:max_suggestions]
    
    def map_label_to_xai_method(self, label):
        """Translate a question label into the name of an XAI method.

        Returns ``"shap"``, ``"dice"`` or ``"anchor"`` depending on which of
        the ID collections from ``constraints`` contains the label, and
        ``"general"`` when none of them does.
        """
        from constraints import L_SHAP_QUESTION_IDS, L_DICE_QUESTION_IDS, L_ANCHOR_QUESTION_IDS

        # Checked in this order; the first collection containing the label wins.
        id_groups = (
            ("shap", L_SHAP_QUESTION_IDS),
            ("dice", L_DICE_QUESTION_IDS),
            ("anchor", L_ANCHOR_QUESTION_IDS),
        )
        for method_name, question_ids in id_groups:
            if label in question_ids:
                return method_name
        return "general"
    
    def replace_information(self, question, features=None, prediction=None, current_instance=None, labels=None):
        """Fill the ``{X}``, ``{P}`` and ``{Q}`` placeholders of a question.

        Args:
            question: Template text, possibly containing placeholders.
            features: Sequence of feature names. ``{X}`` becomes the single
                name, or ``{first,second, ...}`` when there are several.
            prediction: Predicted label; ``{P}`` becomes ``str(prediction)``.
                Falsy labels such as ``0`` or ``False`` are valid values.
            current_instance: Accepted for interface compatibility; not used.
            labels: All possible labels. ``{Q}`` becomes the ``str()`` of the
                list of labels that differ from the prediction.

        Returns:
            The question with every placeholder replaced for which the
            required information was supplied.
        """
        # Explicit None/length checks instead of truthiness: a prediction of 0
        # is a real label, and array-like containers have no single truth value.
        has_features = features is not None and len(features) > 0
        has_labels = labels is not None and len(labels) > 0
        # An empty string still counts as "no prediction supplied".
        has_prediction = prediction is not None and not (isinstance(prediction, str) and prediction == "")

        if has_features and "{X}" in question:
            feature_str = f"{{{features[0]},{features[1]}, ...}}" if len(features) > 1 else f"{features[0]}"
            question = question.replace("{X}", feature_str)
        if has_prediction and "{P}" in question:
            question = question.replace("{P}", str(prediction))
        if has_labels and has_prediction and "{Q}" in question:
            other_labels = [label for label in labels if str(label) != str(prediction)]
            question = question.replace("{Q}", str(other_labels))
        return question

    def _classify_with_intent_classifier(self, user_input):
        """Use the trained intent classifier (65% accuracy) as fallback"""
        try:
            # Try to load intent classifier if not already loaded
            if not hasattr(self, 'intent_classifier') or self.intent_classifier is None:
                self._load_intent_classifier()
            
            if self.intent_classifier is None:
                return "unknown"
            
            # Generate embedding for user input
            embedding = self.intent_simcse.encode([user_input])
            
            # Convert to tensor
            import torch
            import numpy as np
            embedding_tensor = torch.FloatTensor(embedding)
            
            # Get classifier prediction
            with torch.no_grad():
                outputs = self.intent_classifier(embedding_tensor)
                probabilities = outputs[0].numpy()
                
                # Get the class with highest probability
                predicted_class_idx = np.argmax(probabilities)
                confidence = probabilities[predicted_class_idx]
                
                # Use lower threshold since this is fallback
                if confidence >= 0.3:  # Lower threshold for fallback
                    # Convert back to intent
                    predicted_intent = self.intent_label_encoder.inverse_transform([predicted_class_idx])[0]
                    
                    # Map intent to representative question
                    if predicted_intent == 'shap':
                        return "What features of this instance lead to the system's prediction?"
                    elif predicted_intent == 'dice':
                        return "How should the instance be changed to get a different (better or worse) prediction?"
                    elif predicted_intent == 'anchor':
                        return "What is the minimum requirement for the prediction to stay the same?"
                    # Don't return anything for 'other' - let it fall through
                
        except Exception as e:
            print(f"Intent classifier failed: {e}")
        
        return "unknown"
    
    def _load_intent_classifier(self):
        """Load the trained intent classifier (65% accuracy model)"""
        try:
            import torch
            import torch.nn as nn
            import pickle
            import numpy as np
            from simcse import SimCSE
            
            # Define the classifier architecture (matching the training script)
            class IntentClassifier(nn.Module):
                def __init__(self, input_dim, hidden_dim, num_classes=4):
                    super(IntentClassifier, self).__init__()
                    self.network = nn.Sequential(
                        nn.Linear(input_dim, hidden_dim),
                        nn.ReLU(),
                        nn.Dropout(0.2),
                        nn.Linear(hidden_dim, hidden_dim // 2),
                        nn.ReLU(),
                        nn.Dropout(0.2),
                        nn.Linear(hidden_dim // 2, num_classes),
                        nn.Softmax(dim=1)
                    )
                
                def forward(self, x):
                    return self.network(x)
            
            # Load metadata
            with open('models/intent_classifier_metadata.pkl', 'rb') as f:
                metadata = pickle.load(f)
            
            # Load label encoder
            with open('models/intent_label_encoder.pkl', 'rb') as f:
                self.intent_label_encoder = pickle.load(f)
            
            # Initialize and load classifier
            self.intent_classifier = IntentClassifier(
                metadata['input_dim'],
                metadata['hidden_dim'], 
                metadata['num_classes']
            )
            self.intent_classifier.load_state_dict(torch.load('models/intent_classifier_best.pth', map_location='cpu'))
            self.intent_classifier.eval()
            
            # Initialize SimCSE for embedding generation
            self.intent_simcse = SimCSE("princeton-nlp/sup-simcse-roberta-large")
            
            print(f"βœ… Loaded intent classifier (accuracy: {metadata.get('best_accuracy', 'unknown'):.4f})")
            
        except Exception as e:
            print(f"⚠️ Could not load intent classifier: {e}")
            self.intent_classifier = None
            self.intent_label_encoder = None
            self.intent_simcse = None