| |
| import os |
| |
| import torch |
| import torch.nn as nn |
| from transformers import BertForSequenceClassification, BertTokenizerFast |
| import torch.optim as optim |
| from torch.nn import CrossEntropyLoss |
| from torch.utils.data import DataLoader, TensorDataset |
| |
| from FallingPlanet.orbit.utils.Metrics import AdvancedMetrics |
| from FallingPlanet.orbit.utils.Metrics import TinyEmoBoard |
| import torchmetrics |
| from tqdm import tqdm |
| from FallingPlanet.orbit.utils.callbacks import EarlyStopping |
| from FallingPlanet.orbit.models import BertFineTuneTiny |
| from itertools import islice |
|
|
| class Classifier: |
| def __init__(self,model, device, num_labels, log_dir): |
| self.model = model.to(device) |
| self.device = device |
| self.loss_criterion = CrossEntropyLoss() |
| self.writer = TinyEmoBoard(log_dir=log_dir) |
| |
| |
| self.accuracy = torchmetrics.Accuracy(num_classes=num_labels, task='multiclass').to(device) |
| self.precision = torchmetrics.Precision(num_classes=num_labels, task='multiclass').to(device) |
| self.recall = torchmetrics.Recall(num_classes=num_labels, task='multiclass').to(device) |
| self.f1= torchmetrics.F1Score(num_classes=num_labels, task = 'multiclass').to(device) |
| self.mcc = torchmetrics.MatthewsCorrCoef(num_classes=num_labels,task = 'multiclass').to(device) |
| self.top2_acc = torchmetrics.Accuracy(top_k=2, num_classes=num_labels,task='multiclass').to(device) |
| |
| def compute_loss(self,logits, labels): |
| loss = self.loss_criterion(logits,labels) |
| return loss |
| |
| def train_step(self, dataloader, optimizer, epoch): |
| self.model.train() |
| total_loss = 0.0 |
| |
| total_accuracy = 0.0 |
| total_precision = 0.0 |
| total_recall = 0.0 |
| total_f1 = 0.0 |
| total_mcc = 0.0 |
|
|
| pbar = tqdm(dataloader, desc=f"Training Epoch {epoch}") |
|
|
| for batch in pbar: |
| input_ids, attention_masks, labels = [x.to(self.device) for x in batch] |
|
|
| optimizer.zero_grad() |
| outputs = self.model(input_ids, attention_masks) |
| loss = self.compute_loss(outputs, labels) |
| loss.backward() |
| optimizer.step() |
|
|
|
|
| total_loss += loss.item() |
|
|
| |
| total_accuracy += self.accuracy(outputs.argmax(dim=1), labels).item() |
| total_precision += self.precision(outputs.argmax(dim=1), labels).item() |
| total_recall += self.recall(outputs.argmax(dim=1), labels).item() |
| total_f1 += self.f1(outputs, labels).item() |
| total_mcc += self.mcc(outputs.argmax(dim=1), labels).item() |
|
|
| |
| pbar.set_postfix(loss=total_loss / (pbar.n + 1)) |
|
|
| |
| num_batches = len(dataloader) |
| avg_accuracy = total_accuracy / num_batches |
| avg_precision = total_precision / num_batches |
| avg_recall = total_recall / num_batches |
| avg_f1 = total_f1 / num_batches |
| avg_mcc = total_mcc / num_batches |
| avg_train_loss = total_loss / num_batches |
|
|
| |
| self.writer.log_scalar('Training/Average Loss', avg_train_loss, epoch) |
| self.writer.log_scalar('Training/Average Accuracy', avg_accuracy, epoch) |
| self.writer.log_scalar('Training/Average Precision', avg_precision, epoch) |
| self.writer.log_scalar('Training/Average Recall', avg_recall, epoch) |
| self.writer.log_scalar('Training/Average F1', avg_f1, epoch) |
| self.writer.log_scalar('Training/Average MCC', avg_mcc, epoch) |
|
|
| pbar.close() |
|
|
|
|
| def val_step(self, dataloader, epoch): |
| self.model.eval() |
| total_loss = 0.0 |
| |
| total_accuracy = 0.0 |
| total_precision = 0.0 |
| total_recall = 0.0 |
| total_f1 = 0.0 |
| total_mcc = 0.0 |
|
|
| with torch.no_grad(): |
| pbar = tqdm(dataloader, desc=f"Validation Epoch {epoch}") |
| for batch in pbar: |
| input_ids, attention_masks, labels = [x.to(self.device) for x in batch] |
| |
| outputs = self.model(input_ids, attention_masks) |
| loss = self.compute_loss(outputs, labels) |
|
|
| total_loss += loss.item() |
|
|
| |
| total_accuracy += self.accuracy(outputs.argmax(dim=1), labels).item() |
| total_precision += self.precision(outputs.argmax(dim=1), labels).item() |
| total_recall += self.recall(outputs.argmax(dim=1), labels).item() |
| total_f1 += self.f1(outputs, labels).item() |
| total_mcc += self.mcc(outputs.argmax(dim=1), labels).item() |
|
|
| |
| pbar.set_postfix(loss=total_loss / (pbar.n + 1)) |
|
|
| |
| num_batches = len(dataloader) |
| avg_val_loss = total_loss / num_batches |
| avg_accuracy = total_accuracy / num_batches |
| avg_precision = total_precision / num_batches |
| avg_recall = total_recall / num_batches |
| avg_f1 = total_f1 / num_batches |
| avg_mcc = total_mcc / num_batches |
|
|
| |
| self.writer.log_scalar('Validation/Average Loss', avg_val_loss, epoch) |
| self.writer.log_scalar('Validation/Average Accuracy', avg_accuracy, epoch) |
| self.writer.log_scalar('Validation/Average Precision', avg_precision, epoch) |
| self.writer.log_scalar('Validation/Average Recall', avg_recall, epoch) |
| self.writer.log_scalar('Validation/Average F1', avg_f1, epoch) |
| self.writer.log_scalar('Validation/Average MCC', avg_mcc, epoch) |
|
|
| pbar.close() |
| return avg_val_loss |
| |
| |
| def test_step(self, dataloader): |
| self.model.eval() |
| |
| aggregated_metrics = { |
| 'total_accuracy': 0.0, |
| 'total_precision': 0.0, |
| 'total_recall': 0.0, |
| 'total_f1': 0.0, |
| 'total_mcc': 0.0, |
| 'total_top_2_acc': 0.0 |
| } |
|
|
| with torch.no_grad(): |
| pbar = tqdm(dataloader, desc="Testing") |
| for batch in pbar: |
| input_ids, attention_masks, labels = [x.to(self.device) for x in batch] |
| outputs = self.model(input_ids, attention_masks) |
|
|
| |
| aggregated_metrics['total_accuracy'] += self.accuracy(outputs.argmax(dim=1), labels).item() |
| aggregated_metrics['total_precision'] += self.precision(outputs.argmax(dim=1), labels).item() |
| aggregated_metrics['total_recall'] += self.recall(outputs.argmax(dim=1), labels).item() |
| aggregated_metrics['total_f1'] += self.f1(outputs, labels).item() |
| aggregated_metrics['total_mcc'] += self.mcc(outputs.argmax(dim=1), labels).item() |
| aggregated_metrics['total_top_2_acc'] += self.top2_acc(outputs, labels).item() |
|
|
| |
| pbar.set_postfix({ |
| 'Accuracy': aggregated_metrics['total_accuracy'] / (pbar.n + 1), |
| 'MCC': aggregated_metrics['total_mcc'] / (pbar.n + 1) |
| }) |
|
|
| |
| num_batches = len(dataloader) |
| for key in aggregated_metrics: |
| aggregated_metrics[key] /= num_batches |
|
|
| return aggregated_metrics |
|
|
|
|
| |
| def main(mode = "full"): |
| device = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
| |
| |
| emotion_data_train = torch.load(r"E:\text_datasets\saved\train_emotion_no_batch_no_batch.pt") |
| emotion_data_val = torch.load(r"E:\text_datasets\saved\val_emotion_no_batch_no_batch.pt") |
| emotion_data_test = torch.load(r"E:\text_datasets\saved\test_emotion_no_batch_no_batch.pt") |
| |
| |
|
|
| |
| |
| |
| dataloader_train = DataLoader(emotion_data_train, batch_size=512, shuffle=True) |
| dataloader_val = DataLoader(emotion_data_val, batch_size=512) |
| dataloader_test = DataLoader(emotion_data_test, batch_size=512) |
| |
| NUM_EMOTION_LABELS = 9 |
| LOG_DIR = r"EmoBERTv2-tiny\logging" |
| |
|
|
| model = BertFineTuneTiny(num_tasks=1, num_labels=[9]) |
| optimizer = torch.optim.AdamW(model.parameters(),lr =1e-5, weight_decay=1e-6) |
| classifier = Classifier(model, device, NUM_EMOTION_LABELS, LOG_DIR) |
|
|
| if mode in ["train", "full"]: |
| |
| early_stopping = EarlyStopping(patience=50, min_delta=1e-8) |
| num_epochs = 75 |
| for epoch in range(num_epochs): |
| classifier.train_step(dataloader_train, optimizer, epoch) |
| val_loss = classifier.val_step(dataloader_val, epoch) |
|
|
| if early_stopping.step(val_loss, classifier.model): |
| print("Early stopping triggered. Restoring best model weights.") |
| classifier.model.load_state_dict(early_stopping.best_state) |
| break |
|
|
| if early_stopping.best_state is not None: |
| torch.save(early_stopping.best_state, 'EmoBERTv2-tiny.pth') |
|
|
| if mode in ["test", "full"]: |
| if os.path.exists('EmoBERTv2-tiny.pth'): |
| classifier.model.load_state_dict(torch.load('EmoBERTv2-tiny.pth')) |
| |
| test_results = classifier.test_step(dataloader_test) |
| print("Test Results:", test_results) |
|
|
|
|
| if __name__ == "__main__": |
| main(mode="full") |