"""Fine-tune BERT for utterance intent classification and classify interactively.

Pipeline: load labelled utterances -> train/validate a BertForSequenceClassification
model -> save the best checkpoint plus a LABEL_i -> intent-name mapping ->
run an interactive loop that segments free text and ranks intents.
"""

import json
import os
import re
import warnings

import torch
import torch.amp as amp
from sklearn.metrics import f1_score
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm
from transformers import (
    BertConfig,
    BertForSequenceClassification,
    BertTokenizer,
    get_linear_schedule_with_warmup,
    logging,
)

warnings.filterwarnings("ignore")
logging.set_verbosity_error()

# Model weight files we accept as evidence of a saved checkpoint.
MODEL_WEIGHT_FILES = ('pytorch_model.bin', 'model.safetensors')


class IntentDataset(Dataset):
    """Tokenized (question, intent) pairs for BERT sequence classification.

    Args:
        questions: sequence of utterance strings.
        intents: sequence of intent names aligned with ``questions``; may be
            ``None`` when ``is_inference`` is True.
        tokenizer: a ``BertTokenizer`` used to encode utterances.
        max_len: padding/truncation length in tokens.
        intent_to_label: mapping from intent name to integer class id.
        is_inference: when True, items contain no ``labels`` tensor.
    """

    def __init__(self, questions, intents, tokenizer, max_len, intent_to_label,
                 is_inference=False):
        self.questions = questions
        self.intents = intents
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.intent_to_label = intent_to_label
        self.is_inference = is_inference

    def __len__(self):
        return len(self.questions)

    def __getitem__(self, item):
        question = str(self.questions[item])
        encoding = self.tokenizer.encode_plus(
            question,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            return_attention_mask=True,
            return_tensors='pt',
            truncation=True,
        )
        sample = {
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
        }
        if not self.is_inference:
            # FIX: only touch self.intents when labels are actually needed,
            # so intents=None is valid in inference mode (the original
            # indexed self.intents unconditionally and crashed on None).
            intent = self.intents[item]
            sample['labels'] = torch.tensor(self.intent_to_label[intent],
                                            dtype=torch.long)
        return sample


def _extract_pair(item):
    """Return an (utterance, intent) tuple from one raw JSON record, or None.

    Accepts dict records with utterance under 'utterance'/'text'/'question'
    and intent under 'intent'/'label'/'class', or 2-element list records.
    """
    if isinstance(item, dict):
        utterance = item.get('utterance') or item.get('text') or item.get('question')
        intent = item.get('intent') or item.get('label') or item.get('class')
        if utterance and intent:
            return (utterance, intent)
    elif isinstance(item, list) and len(item) == 2:
        return tuple(item)
    return None


def load_data(test_file, val_file):
    """Load labelled utterances and build train/test/validation splits.

    NOTE: despite the parameter name, ``test_file`` holds the full labelled
    corpus; it is shuffled and split 70/30 into train/test.  ``val_file`` is
    used as-is for validation.  (Parameter names kept for compatibility.)

    Returns:
        (train, test, val, intent_labels) where each split is a list of
        (utterance, intent) tuples and intent_labels is the de-duplicated
        list of intent names seen across all splits.
    """
    import random

    print(f"Reading test data from {test_file}")
    with open(test_file, 'r', encoding='utf-8') as f:
        test_data = json.load(f)

    print(f"Reading validation data from {val_file}")
    with open(val_file, 'r', encoding='utf-8') as f:
        val_data = json.load(f)

    all_data = [pair for pair in map(_extract_pair, test_data) if pair]

    # NOTE(review): shuffle is unseeded, so the split is nondeterministic
    # across runs — seed here if reproducible splits are required.
    random.shuffle(all_data)
    split_point = int(len(all_data) * 0.7)
    train_processed = all_data[:split_point]
    test_processed = all_data[split_point:]

    val_processed = [pair for pair in map(_extract_pair, val_data) if pair]

    intent_labels = list({intent for _, intent in
                          train_processed + test_processed + val_processed})

    print(f"Loaded {len(train_processed)} training examples, "
          f"{len(test_processed)} test examples, and "
          f"{len(val_processed)} validation examples")
    return train_processed, test_processed, val_processed, intent_labels


class IntentClassifier:
    """Wraps a fine-tuned BERT checkpoint for multi-intent text classification.

    Segments free text into clauses, classifies each segment, and ranks the
    candidate intents by confidence weighted with hand-tuned priorities.
    """

    # Clause markers used both to detect and to split subordinate clauses.
    _CLAUSE_PATTERN = re.compile(r'\b(because|since|as|would like|want to)\b',
                                 flags=re.IGNORECASE)
    # Phrases that signal an information-seeking utterance.
    _KEY_PHRASES = ('need', 'want', 'help', 'recommend', 'advice', 'suggest')

    def __init__(self, model_path, device):
        self.device = device
        # FIX: remember where the checkpoint lives so the intent mapping can
        # be loaded from the same directory (the original read
        # 'intent_mapping.json' from the CWD, where it is never written).
        self.model_path = model_path
        self.model = BertForSequenceClassification.from_pretrained(model_path).to(device)
        self.tokenizer = BertTokenizer.from_pretrained(model_path)
        self.model.eval()
        self._intent_mapping = None  # lazily loaded LABEL_i -> intent name map

        # Updated priority weights with more nuanced categorization
        self.intent_priorities = {
            # High Priority (Information Seeking)
            'RecommendationRequest': 1.0,
            'Request': 1.0,
            'ComparisonRequest': 0.95,
            'ClarificationRequest': 0.95,
            # Important Context
            'Fact': 0.9,
            'ActionReport': 0.85,
            'Preference': 0.85,
            # Supporting Information
            'Opinion': 0.7,
            'SystemRecommendation': 0.7,
            'Answer': 0.7,
            # Secondary Information
            'Sentiment': 0.5,
            'Feedback': 0.5,
            'ReferenceToPriorConversation': 0.5,
            # Low Priority
            'Greetings': 0.3,
            'Farewell': 0.3,
            'AgreementWithSystem': 0.3,
            'DisagreementWithSystem': 0.3,
            # Special Cases
            'IrrelevantUtterance': 0.1,
            'Unclear': 0.1,
        }

        # Intent relationships for context: intents that typically support
        # a given primary intent.
        self.intent_relationships = {
            'RecommendationRequest': ['Fact', 'Preference', 'ActionReport'],
            'Request': ['Fact', 'Preference', 'ActionReport'],
            'ComparisonRequest': ['Fact', 'Preference'],
            'ClarificationRequest': ['Fact', 'ReferenceToPriorConversation'],
        }

    def segment_text(self, text):
        """Split an utterance into intent-bearing segments.

        Conjunctions and clause-starting commas are promoted to sentence
        breaks, then subordinate clauses are split out on marker words.
        Always returns at least one segment.
        """
        # Promote 'and' and ", <Capital>" to sentence boundaries.
        text = re.sub(r' and ', '. ', text)
        text = re.sub(r', (?=[A-Z])', '. ', text)

        segments = [s.strip() for s in re.split('[.!?]', text) if s.strip()]

        refined_segments = []
        for segment in segments:
            # FIX: detect markers with the same word-boundary pattern used
            # for splitting; the original substring check matched 'as'
            # inside words such as 'class' or 'has'.
            if self._CLAUSE_PATTERN.search(segment):
                parts = self._CLAUSE_PATTERN.split(segment)
                refined_segments.extend(p.strip() for p in parts if p.strip())
            else:
                refined_segments.append(segment)

        return refined_segments or [text.strip()]

    def classify_segment(self, text):
        """Classify one segment; return (top-3 class indices, confidences)."""
        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=128,
            return_token_type_ids=False,
            padding='max_length',
            return_attention_mask=True,
            return_tensors='pt',
            truncation=True,
        )
        input_ids = encoding['input_ids'].to(self.device)
        attention_mask = encoding['attention_mask'].to(self.device)

        with torch.no_grad():
            outputs = self.model(input_ids, attention_mask=attention_mask)

        probabilities = torch.softmax(outputs.logits, dim=1)[0]  # single example
        confidence_values, pred_indices = torch.topk(probabilities, k=3)

        # Convert to plain Python lists/floats for downstream use.
        return pred_indices.cpu().tolist(), confidence_values.cpu().tolist()

    def get_intent_label(self, index):
        """Map a class index to its intent name via the saved mapping file."""
        if self._intent_mapping is None:
            # FIX: the mapping is written next to the model weights, not to
            # the working directory; load it from the model directory, once.
            mapping_file = os.path.join(self.model_path, 'intent_mapping.json')
            try:
                with open(mapping_file, 'r', encoding='utf-8') as f:
                    self._intent_mapping = json.load(f)
            except Exception as e:
                print(f"Error loading intent mapping: {e}")
                self._intent_mapping = {}
        return self._intent_mapping.get(f"LABEL_{index}", "Unknown")

    def classify_text(self, text):
        """Classify an utterance with segment- and context-aware weighting.

        Returns a list of result dicts (primary intent first, up to two
        supporting intents), or None when no segment could be classified.
        Each dict has keys: segment, intent, confidence, weighted_confidence,
        base_priority.
        """
        segments = self.segment_text(text)
        all_results = []

        # First pass: classify every segment and weight each candidate.
        for position, segment in enumerate(segments):
            try:
                pred_indices, confidence_values = self.classify_segment(segment)
            except Exception as e:
                print(f"Error processing segment '{segment}': {e}")
                continue

            # FIX: boost by position index, not value equality — the original
            # `segment == segments[0]` mis-boosted duplicate segments.
            position_boost = 1.0 if position == 0 else 0.9
            content_boost = 1.2 if any(
                phrase in segment.lower() for phrase in self._KEY_PHRASES
            ) else 1.0

            for pred_idx, conf in zip(pred_indices, confidence_values):
                intent = self.get_intent_label(pred_idx)
                base_priority = self.intent_priorities.get(intent, 0.5)
                weighted_confidence = (float(conf) * base_priority
                                       * position_boost * content_boost)
                all_results.append({
                    'segment': segment,
                    'intent': intent,
                    'confidence': float(conf),
                    'weighted_confidence': weighted_confidence,
                    'base_priority': base_priority,
                })

        if not all_results:
            return None

        all_results.sort(key=lambda x: x['weighted_confidence'], reverse=True)

        primary_intent = all_results[0]
        secondary_intents = []

        # Prefer intents known to support the primary one.
        if primary_intent['intent'] in self.intent_relationships:
            related_intents = self.intent_relationships[primary_intent['intent']]
            for result in all_results[1:]:
                if (result['intent'] in related_intents
                        and result['confidence'] > 0.4
                        and len(secondary_intents) < 2):
                    secondary_intents.append(result)

        # Fall back to any remaining high-weighted intents.
        if len(secondary_intents) < 2:
            for result in all_results[1:]:
                if (result['weighted_confidence'] > 0.4
                        and result not in secondary_intents
                        and len(secondary_intents) < 2):
                    secondary_intents.append(result)

        return [primary_intent] + secondary_intents


def train_model(train_data, val_data, intent_labels, device,
                output_dir='./fine_tuned_bert'):
    """Fine-tune BERT on (utterance, intent) pairs with mixed precision.

    Saves the checkpoint (model, tokenizer, intent mapping) with the best
    weighted validation F1 into ``output_dir`` and returns the final-epoch
    (model, tokenizer) pair.

    Args:
        train_data / val_data: lists of (utterance, intent) tuples.
        intent_labels: ordered list of intent names; index = class id.
        device: torch device to train on.
        output_dir: where best checkpoints are saved (new, backward-compatible
            parameter; defaults to the original hard-coded path).
    """
    train_questions, train_intents = zip(*train_data)
    val_questions, val_intents = zip(*val_data)
    intent_to_label = {intent: i for i, intent in enumerate(intent_labels)}

    config = BertConfig.from_pretrained('bert-base-uncased',
                                        num_labels=len(intent_labels))
    model = BertForSequenceClassification.from_pretrained('bert-base-uncased',
                                                          config=config)
    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    model = model.to(device)

    train_dataset = IntentDataset(train_questions, train_intents, tokenizer,
                                  max_len=128, intent_to_label=intent_to_label)
    val_dataset = IntentDataset(val_questions, val_intents, tokenizer,
                                max_len=128, intent_to_label=intent_to_label)
    train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=16)

    optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5)
    num_epochs = 25
    scheduler = get_linear_schedule_with_warmup(
        optimizer,
        num_warmup_steps=0,
        num_training_steps=len(train_loader) * num_epochs,
    )
    # FIX: be explicit about when gradient scaling is active; the default
    # GradScaler assumes CUDA and only warns itself into a no-op on CPU.
    scaler = amp.GradScaler(enabled=torch.cuda.is_available())
    autocast_device = 'cuda' if torch.cuda.is_available() else 'cpu'

    best_val_f1 = 0
    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        for batch in tqdm(train_loader, desc=f"Epoch {epoch + 1}/{num_epochs}"):
            optimizer.zero_grad()
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            with amp.autocast(device_type=autocast_device):
                outputs = model(input_ids, attention_mask=attention_mask,
                                labels=labels)
                loss = outputs.loss

            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            scheduler.step()
            total_loss += loss.item()

        # Validation pass: weighted F1 over the whole validation set.
        model.eval()
        val_preds = []
        val_true = []
        with torch.no_grad():
            for batch in val_loader:
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                labels = batch['labels'].to(device)
                outputs = model(input_ids, attention_mask=attention_mask)
                _, preds = torch.max(outputs.logits, dim=1)
                val_preds.extend(preds.cpu().tolist())
                val_true.extend(labels.cpu().tolist())

        val_f1 = f1_score(val_true, val_preds, average='weighted')
        print(f"Epoch {epoch + 1}/{num_epochs}, "
              f"Loss: {total_loss / len(train_loader):.4f}, Val F1: {val_f1:.4f}")

        if val_f1 > best_val_f1:
            best_val_f1 = val_f1
            model.save_pretrained(output_dir)
            tokenizer.save_pretrained(output_dir)
            print(f"New best model saved with validation F1: {best_val_f1:.4f}")

            # Save intent mapping next to the checkpoint so inference can
            # recover intent names from class indices.
            intent_mapping = {f"LABEL_{i}": intent
                              for i, intent in enumerate(intent_labels)}
            with open(os.path.join(output_dir, 'intent_mapping.json'), 'w',
                      encoding='utf-8') as f:
                json.dump(intent_mapping, f, indent=2)

    return model, tokenizer


def interactive_classification(model_dir='./fine_tuned_bert'):
    """Load a saved classifier and answer questions typed at the console.

    Args:
        model_dir: directory holding the fine-tuned checkpoint (new,
            backward-compatible parameter; defaults to the original path).
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

    try:
        classifier = IntentClassifier(model_dir, device)
        print("\nModel loaded successfully!")
        print("\nEnter your questions (type 'quit' to exit):")

        while True:
            question = input("\nEnter your question: ").strip()
            if question.lower() in ['quit', 'exit', 'q']:
                print("Exiting...")
                break
            if not question:
                print("Please enter a valid question.")
                continue

            try:
                results = classifier.classify_text(question)
                if results:
                    print("\nResults:")
                    for i, result in enumerate(results, 1):
                        print(f"\nIntent {i}:")
                        print(f"Detected Intent: {result['intent']}")
                        print(f"Confidence: {result['confidence']:.2%}")
                        print(f"Segment: {result['segment']}")
                        conf = result['confidence']
                        if conf >= 0.9:
                            print("Confidence Level: Very High")
                        elif conf >= 0.7:
                            print("Confidence Level: High")
                        elif conf >= 0.5:
                            print("Confidence Level: Moderate")
                        else:
                            print("Confidence Level: Low")
                else:
                    print("Could not determine intent with sufficient confidence.")
            except Exception as e:
                print(f"Error processing question: {str(e)}")
                print("Please try another question.")

    except Exception as e:
        print(f"Error loading model: {str(e)}")
        print("Please ensure the model has been trained and saved correctly.")


def main():
    """Load an existing model if present, otherwise train one, then classify."""
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

    model_dir = '/app/code/fine_tuned_bert'
    required_files = {
        'config.json',
        'intent_mapping.json',
        'tokenizer_config.json',
        'vocab.txt',
    }

    # Check for an existing valid model.
    if os.path.exists(model_dir):
        existing_files = set(os.listdir(model_dir))
        # At least one weight file format plus all the support files.
        has_model_file = any(f in existing_files for f in MODEL_WEIGHT_FILES)
        has_required = required_files.issubset(existing_files)
        if has_model_file and has_required:
            print("Found valid existing model. Loading...")
            # FIX: load from model_dir — the original loaded the hard-coded
            # relative './fine_tuned_bert', which can differ from model_dir.
            print("\nStarting interactive classification...")
            interactive_classification(model_dir)
            return

    # If we get here, train a new model.
    print("Model not found or incomplete. Starting training...")
    os.makedirs(model_dir, exist_ok=True)

    # Load data and persist the label mapping up front.
    train_data, test_data, val_data, intent_labels = load_data(
        'training-22-intent.json',
        'validation-22-intent.json',
    )
    intent_mapping = {f"LABEL_{i}": intent
                      for i, intent in enumerate(intent_labels)}
    with open(os.path.join(model_dir, 'intent_mapping.json'), 'w',
              encoding='utf-8') as f:
        json.dump(intent_mapping, f, indent=2)

    # Train; best-validation checkpoints are written directly to model_dir.
    model, tokenizer = train_model(train_data, val_data, intent_labels, device,
                                   output_dir=model_dir)

    # FIX: do not clobber the best-validation checkpoint with the last-epoch
    # weights (the original always re-saved here). Only persist the final
    # model when training never produced a checkpoint at all.
    existing_files = set(os.listdir(model_dir))
    if not any(f in existing_files for f in MODEL_WEIGHT_FILES):
        model.save_pretrained(model_dir, safe_serialization=False)  # PyTorch format
        tokenizer.save_pretrained(model_dir)

    print("Training complete. Starting classification...")
    interactive_classification(model_dir)


if __name__ == "__main__":
    main()