|
|
import json
import os
import random
import re
import warnings

import torch
import torch.amp as amp
from sklearn.metrics import f1_score
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
from transformers import BertConfig, BertTokenizer, BertForSequenceClassification, get_linear_schedule_with_warmup
from transformers import logging
|
|
|
|
|
|
# Silence third-party warnings and transformers' informational logging so the
# training/classification console output stays readable.
warnings.filterwarnings("ignore")

logging.set_verbosity_error()
|
|
|
|
|
|
|
|
|
class IntentDataset(Dataset):
    """Torch Dataset yielding BERT-encoded questions (and labels when training).

    Each item is a dict with 'input_ids' and 'attention_mask' tensors of shape
    (max_len,); when not in inference mode it also carries a scalar 'labels'
    tensor with the integer class id.
    """

    def __init__(self, questions, intents, tokenizer, max_len, intent_to_label, is_inference=False):
        """
        Args:
            questions: sequence of utterance strings.
            intents: sequence of intent names aligned with `questions`; may be
                None when `is_inference` is True.
            tokenizer: HuggingFace tokenizer providing `encode_plus()`.
            max_len: fixed sequence length used for padding/truncation.
            intent_to_label: mapping from intent name to integer class id.
            is_inference: when True, items omit the 'labels' entry.
        """
        self.questions = questions
        self.intents = intents
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.intent_to_label = intent_to_label
        self.is_inference = is_inference

    def __len__(self):
        return len(self.questions)

    def __getitem__(self, item):
        question = str(self.questions[item])

        encoding = self.tokenizer.encode_plus(
            question,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            return_attention_mask=True,
            return_tensors='pt',
            truncation=True
        )

        sample = {
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
        }
        if not self.is_inference:
            # Only touch `intents` in labelled mode: the original indexed it
            # unconditionally, which crashed when intents=None at inference time.
            sample['labels'] = torch.tensor(self.intent_to_label[self.intents[item]], dtype=torch.long)
        return sample
|
|
|
|
|
|
|
|
|
def load_data(test_file, val_file):
    """Load labelled utterances and build a 70/30 train/test split.

    Despite its name, `test_file` supplies the pool that is shuffled and split
    into training (70%) and test (30%) sets; `val_file` supplies the held-out
    validation set. Both files are JSON lists whose items are either
    {"utterance"/"text"/"question": ..., "intent"/"label"/"class": ...} dicts
    or two-element [utterance, intent] lists; malformed items are skipped.

    Returns:
        (train, test, val, intent_labels): the first three are lists of
        (utterance, intent) tuples; intent_labels is the sorted list of all
        distinct intents (sorted so label indices are reproducible across runs).
    """

    def _parse_examples(raw_items):
        # Accept both dict records (with several possible key spellings) and
        # plain [utterance, intent] pairs; silently drop anything else.
        pairs = []
        for item in raw_items:
            if isinstance(item, dict):
                utterance = item.get('utterance') or item.get('text') or item.get('question')
                intent = item.get('intent') or item.get('label') or item.get('class')
                if utterance and intent:
                    pairs.append((utterance, intent))
            elif isinstance(item, list) and len(item) == 2:
                pairs.append(tuple(item))
        return pairs

    print(f"Reading test data from {test_file}")
    with open(test_file, 'r') as f:
        test_data = json.load(f)

    print(f"Reading validation data from {val_file}")
    with open(val_file, 'r') as f:
        val_data = json.load(f)

    all_data = _parse_examples(test_data)

    # NOTE: shuffle is unseeded, so the split differs between runs.
    random.shuffle(all_data)
    split_point = int(len(all_data) * 0.7)
    train_processed = all_data[:split_point]
    test_processed = all_data[split_point:]

    val_processed = _parse_examples(val_data)

    # sorted() (instead of list(set(...))) makes the label order -- and
    # therefore every label index -- deterministic across runs.
    intent_labels = sorted({intent for _, intent in train_processed + test_processed + val_processed})
    print(
        f"Loaded {len(train_processed)} training examples, {len(test_processed)} test examples, and {len(val_processed)} validation examples")
    return train_processed, test_processed, val_processed, intent_labels
|
|
|
|
|
|
|
|
|
class IntentClassifier:
    """Wraps a fine-tuned BERT sequence classifier with multi-intent heuristics.

    The classifier splits a user utterance into clause-like segments, scores
    each segment with the model, and re-weights the raw model confidences with
    hand-tuned intent priorities plus position/content boosts to select a
    primary intent and up to two related secondary intents.
    """

    def __init__(self, model_path, device):
        """Load model, tokenizer and the label->intent mapping from model_path."""
        self.device = device
        self.model_path = model_path
        self.model = BertForSequenceClassification.from_pretrained(model_path).to(device)
        self.tokenizer = BertTokenizer.from_pretrained(model_path)
        self.model.eval()

        # Load the label-index -> intent-name mapping once. The mapping is
        # written next to the model weights during training; the original
        # re-read 'intent_mapping.json' from the CWD on every prediction,
        # which breaks whenever the process runs outside the model directory.
        self._intent_mapping = self._load_intent_mapping()

        # Hand-tuned importance of each intent class, used to re-weight raw
        # model confidence. Intents missing here default to 0.5 (classify_text).
        self.intent_priorities = {
            # Direct requests for help/action rank highest.
            'RecommendationRequest': 1.0,
            'Request': 1.0,
            'ComparisonRequest': 0.95,
            'ClarificationRequest': 0.95,

            # Informative statements.
            'Fact': 0.9,
            'ActionReport': 0.85,
            'Preference': 0.85,

            # Opinions / answers.
            'Opinion': 0.7,
            'SystemRecommendation': 0.7,
            'Answer': 0.7,

            # Conversational feedback.
            'Sentiment': 0.5,
            'Feedback': 0.5,
            'ReferenceToPriorConversation': 0.5,

            # Social niceties.
            'Greetings': 0.3,
            'Farewell': 0.3,
            'AgreementWithSystem': 0.3,
            'DisagreementWithSystem': 0.3,

            # Noise.
            'IrrelevantUtterance': 0.1,
            'Unclear': 0.1
        }

        # For a given primary intent, which other intents commonly co-occur
        # and are therefore preferred as secondary intents.
        self.intent_relationships = {
            'RecommendationRequest': ['Fact', 'Preference', 'ActionReport'],
            'Request': ['Fact', 'Preference', 'ActionReport'],
            'ComparisonRequest': ['Fact', 'Preference'],
            'ClarificationRequest': ['Fact', 'ReferenceToPriorConversation']
        }

    def _load_intent_mapping(self):
        """Read intent_mapping.json from the model directory (CWD as fallback)."""
        last_error = None
        candidates = (
            os.path.join(self.model_path, 'intent_mapping.json'),
            'intent_mapping.json',  # backward-compatible fallback
        )
        for path in candidates:
            try:
                with open(path, 'r') as f:
                    return json.load(f)
            except (OSError, ValueError) as e:
                last_error = e
        print(f"Error loading intent mapping: {last_error}")
        return {}

    def segment_text(self, text):
        """Enhanced text segmentation.

        Splits on sentence punctuation, treats ' and ' and ', <Capital>' as
        sentence breaks, then further splits segments on reason/desire markers
        so each clause can be classified independently. Always returns at
        least one segment.
        """
        # Treat coordinating 'and' and comma-before-capital as boundaries.
        text = re.sub(r' and ', '. ', text)
        text = re.sub(r', (?=[A-Z])', '. ', text)

        segments = re.split('[.!?]', text)
        segments = [s.strip() for s in segments if s.strip()]

        refined_segments = []
        for segment in segments:
            # NOTE(review): the substring check means e.g. 'as' inside a word
            # triggers a split attempt, but the \b-anchored regex below only
            # splits on whole words, so such segments pass through unchanged.
            if any(marker in segment.lower() for marker in ['because', 'since', 'as', 'would like', 'want to']):
                parts = re.split(r'\b(because|since|as|would like|want to)\b', segment, flags=re.IGNORECASE)
                refined_segments.extend([p.strip() for p in parts if p.strip()])
            else:
                refined_segments.append(segment)

        # Never return an empty list; classify_text indexes segments[0].
        if not refined_segments:
            refined_segments = [text.strip()]

        return refined_segments

    def classify_segment(self, text):
        """Run the model on one segment; return (top-k indices, top-k confidences)."""
        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=128,
            return_token_type_ids=False,
            padding='max_length',
            return_attention_mask=True,
            return_tensors='pt',
            truncation=True
        )

        input_ids = encoding['input_ids'].to(self.device)
        attention_mask = encoding['attention_mask'].to(self.device)

        with torch.no_grad():
            outputs = self.model(input_ids, attention_mask=attention_mask)
            probabilities = torch.softmax(outputs.logits, dim=1)[0]
            # Clamp k: topk raises if the model has fewer than 3 labels.
            k = min(3, probabilities.size(0))
            confidence_values, pred_indices = torch.topk(probabilities, k=k)

        return (pred_indices.cpu().tolist(),
                confidence_values.cpu().tolist())

    def get_intent_label(self, index):
        """Translate a class index into its intent name ('Unknown' if unmapped)."""
        return self._intent_mapping.get(f"LABEL_{index}", "Unknown")

    def classify_text(self, text):
        """Enhanced text classification with context awareness.

        Returns a list [primary, *up to two secondary] of result dicts (keys:
        segment, intent, confidence, weighted_confidence, base_priority), or
        None when no segment could be classified.
        """
        segments = self.segment_text(text)
        all_results = []

        for segment in segments:
            try:
                pred_indices, confidence_values = self.classify_segment(segment)

                segment_results = []
                for pred_idx, conf in zip(pred_indices, confidence_values):
                    intent = self.get_intent_label(pred_idx)
                    base_priority = self.intent_priorities.get(intent, 0.5)

                    # The first segment usually carries the main intent.
                    position_boost = 1.0 if segment == segments[0] else 0.9

                    # Phrases signalling an explicit request get an extra boost.
                    content_boost = 1.2 if any(phrase in segment.lower() for phrase in
                                               ['need', 'want', 'help', 'recommend', 'advice', 'suggest']) else 1.0

                    weighted_confidence = float(conf) * base_priority * position_boost * content_boost

                    segment_results.append({
                        'segment': segment,
                        'intent': intent,
                        'confidence': float(conf),
                        'weighted_confidence': weighted_confidence,
                        'base_priority': base_priority
                    })

                all_results.extend(segment_results)
            except Exception as e:
                # Best-effort: skip segments that fail to classify.
                print(f"Error processing segment '{segment}': {e}")
                continue

        if not all_results:
            return None

        all_results.sort(key=lambda x: x['weighted_confidence'], reverse=True)

        primary_intent = all_results[0]
        secondary_intents = []

        # First pass: prefer secondaries known to co-occur with the primary.
        if primary_intent['intent'] in self.intent_relationships:
            related_intents = self.intent_relationships[primary_intent['intent']]
            for result in all_results[1:]:
                if (result['intent'] in related_intents and
                        result['confidence'] > 0.4 and
                        len(secondary_intents) < 2):
                    secondary_intents.append(result)

        # Second pass: fill remaining slots with any confident result.
        if len(secondary_intents) < 2:
            for result in all_results[1:]:
                if (result['weighted_confidence'] > 0.4 and
                        result not in secondary_intents and
                        len(secondary_intents) < 2):
                    secondary_intents.append(result)

        return [primary_intent] + secondary_intents
|
|
|
|
|
|
|
|
|
def train_model(train_data, val_data, intent_labels, device):
    """Fine-tune bert-base-uncased on (question, intent) pairs.

    Trains for a fixed number of epochs with mixed precision, checkpoints the
    best model (by weighted validation F1) to ./fine_tuned_bert, writes the
    label-id -> intent-name mapping next to it, and returns (model, tokenizer).
    The returned model is the best checkpoint, not simply the last epoch's
    in-memory weights.
    """
    train_questions, train_intents = zip(*train_data)
    val_questions, val_intents = zip(*val_data)
    intent_to_label = {intent: i for i, intent in enumerate(intent_labels)}

    config = BertConfig.from_pretrained('bert-base-uncased',
                                        num_labels=len(intent_labels))
    model = BertForSequenceClassification.from_pretrained('bert-base-uncased', config=config)
    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

    model = model.to(device)

    train_dataset = IntentDataset(train_questions, train_intents, tokenizer, max_len=128,
                                  intent_to_label=intent_to_label)
    val_dataset = IntentDataset(val_questions, val_intents, tokenizer, max_len=128,
                                intent_to_label=intent_to_label)

    train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=16)

    optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5)
    num_epochs = 25
    scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0,
                                                num_training_steps=len(train_loader) * num_epochs)

    use_cuda = torch.cuda.is_available()
    autocast_device = 'cuda' if use_cuda else 'cpu'
    # Gradient scaling is only meaningful for CUDA fp16; disable it on CPU so
    # GradScaler neither warns nor silently no-ops in an inconsistent state.
    scaler = amp.GradScaler(enabled=use_cuda)
    best_val_f1 = 0

    for epoch in range(num_epochs):
        # --- training pass ---
        model.train()
        total_loss = 0
        for batch in tqdm(train_loader, desc=f"Epoch {epoch + 1}/{num_epochs}"):
            optimizer.zero_grad()
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            with amp.autocast(device_type=autocast_device):
                outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
                loss = outputs.loss

            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            scheduler.step()

            total_loss += loss.item()

        # --- validation pass ---
        model.eval()
        val_preds = []
        val_true = []

        with torch.no_grad():
            for batch in val_loader:
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                labels = batch['labels'].to(device)

                outputs = model(input_ids, attention_mask=attention_mask)
                _, preds = torch.max(outputs.logits, dim=1)

                val_preds.extend(preds.cpu().tolist())
                val_true.extend(labels.cpu().tolist())

        val_f1 = f1_score(val_true, val_preds, average='weighted')
        print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {total_loss / len(train_loader):.4f}, Val F1: {val_f1:.4f}")

        # Checkpoint whenever validation F1 improves.
        if val_f1 > best_val_f1:
            best_val_f1 = val_f1
            model.save_pretrained('./fine_tuned_bert')
            tokenizer.save_pretrained('./fine_tuned_bert')
            print(f"New best model saved with validation F1: {best_val_f1:.4f}")

    # Persist the label-id -> intent-name mapping alongside the checkpoint.
    intent_mapping = {f"LABEL_{i}": intent for i, intent in enumerate(intent_labels)}
    with open('./fine_tuned_bert/intent_mapping.json', 'w') as f:
        json.dump(intent_mapping, f, indent=2)

    # Return the best checkpoint rather than the final-epoch weights: the
    # original returned the in-memory model, which defeated the best-model
    # checkpointing above (callers then re-saved the worse model over it).
    if best_val_f1 > 0:
        model = BertForSequenceClassification.from_pretrained('./fine_tuned_bert').to(device)

    return model, tokenizer
|
|
|
|
|
|
|
|
|
def interactive_classification():
    """Run a REPL that classifies user-typed questions with the fine-tuned model.

    Loads the classifier from ./fine_tuned_bert and loops on stdin until the
    user types 'quit'/'exit'/'q'. Per-question failures are reported and the
    loop continues; a model-load failure aborts with an explanatory message.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

    quit_commands = {'quit', 'exit', 'q'}
    # (threshold, label) pairs checked highest-first; below all -> "Low".
    confidence_bands = ((0.9, "Very High"), (0.7, "High"), (0.5, "Moderate"))

    try:
        classifier = IntentClassifier('./fine_tuned_bert', device)
        print("\nModel loaded successfully!")
        print("\nEnter your questions (type 'quit' to exit):")

        while True:
            user_text = input("\nEnter your question: ").strip()

            if user_text.lower() in quit_commands:
                print("Exiting...")
                break

            if not user_text:
                print("Please enter a valid question.")
                continue

            try:
                predictions = classifier.classify_text(user_text)

                if not predictions:
                    print("Could not determine intent with sufficient confidence.")
                    continue

                print("\nResults:")
                for rank, pred in enumerate(predictions, start=1):
                    print(f"\nIntent {rank}:")
                    print(f"Detected Intent: {pred['intent']}")
                    print(f"Confidence: {pred['confidence']:.2%}")
                    print(f"Segment: {pred['segment']}")

                    level = "Low"
                    for threshold, label in confidence_bands:
                        if pred['confidence'] >= threshold:
                            level = label
                            break
                    print(f"Confidence Level: {level}")

            except Exception as err:
                print(f"Error processing question: {str(err)}")
                print("Please try another question.")

    except Exception as err:
        print(f"Error loading model: {str(err)}")
        print("Please ensure the model has been trained and saved correctly.")
|
|
|
|
|
|
|
|
|
def main():
    """Entry point: reuse an existing fine-tuned checkpoint if one is complete,
    otherwise train a new model, then hand off to the interactive REPL."""
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

    model_dir = '/app/code/fine_tuned_bert'
    # Auxiliary files every usable checkpoint directory must contain.
    required_files = {
        'config.json',
        'intent_mapping.json',
        'tokenizer_config.json',
        'vocab.txt'
    }
    # Either serialization format of the model weights is acceptable.
    model_files = {'pytorch_model.bin', 'model.safetensors'}

    if os.path.exists(model_dir):
        existing_files = set(os.listdir(model_dir))
        has_model_file = any(f in existing_files for f in model_files)
        has_required = required_files.issubset(existing_files)

        if has_model_file and has_required:
            print("Found valid existing model. Loading...")
            # interactive_classification() constructs its own classifier, so we
            # do not instantiate one here (the original loaded the model twice
            # and discarded the first instance).
            # NOTE(review): interactive_classification() loads from the relative
            # path './fine_tuned_bert'; run from /app/code so both paths resolve
            # to the same directory — confirm against deployment layout.
            print("\nStarting interactive classification...")
            interactive_classification()
            return

    print("Model not found or incomplete. Starting training...")
    os.makedirs(model_dir, exist_ok=True)

    train_data, test_data, val_data, intent_labels = load_data(
        'training-22-intent.json',
        'validation-22-intent.json'
    )
    # Persist the label-index -> intent-name mapping next to the checkpoint so
    # the classifier can translate predicted indices back into intent names.
    intent_mapping = {f"LABEL_{i}": intent for i, intent in enumerate(intent_labels)}
    with open(os.path.join(model_dir, 'intent_mapping.json'), 'w') as f:
        json.dump(intent_mapping, f, indent=2)

    model, tokenizer = train_model(train_data, val_data, intent_labels, device)
    model.save_pretrained(model_dir, safe_serialization=False)
    tokenizer.save_pretrained(model_dir)

    print("Training complete. Starting classification...")
    interactive_classification()
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: train the model if needed, then start the REPL.
    main()