"""BERT-based sequence classifiers and their training / evaluation helpers.

Three classifier variants share the same architecture (a small BERT encoder
followed by a two-layer feed-forward head) and differ only in where the
encoder weights come from and in the default number of output labels:

* ``PretrainedBert``  -- randomly initialised encoder, 14 labels by default.
* ``FinetunningBert`` -- encoder loaded from a ``PretrainedBert`` checkpoint,
  7 labels by default.
* ``ScratchBert``     -- randomly initialised encoder, 2 labels by default.
"""
import random
import time

import numpy as np
import torch
import torch.nn as nn
from transformers import BertConfig, BertModel, get_linear_schedule_with_warmup

try:
    from transformers import AdamW
except ImportError:
    # transformers deprecated its own AdamW in favour of the PyTorch one;
    # fall back to it when the transformers symbol is not available.
    from torch.optim import AdamW

# Run on the GPU when one is present, otherwise fall back to the CPU so the
# module stays usable on CUDA-less hosts.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Specify loss function
loss_fn = nn.CrossEntropyLoss()

# Width of the encoder output and of the hidden layer of the classifier head.
_BERT_HIDDEN_SIZE = 768
_HEAD_HIDDEN_SIZE = 50

# Header row of the progress table printed at the start of every epoch.
_TABLE_HEADER = f"{'Epoch':^7} | {'Batch':^7} | {'Train Loss':^12} | {'Val Loss':^10} | {'Val Acc':^9} | {'Elapsed':^9}"


def _small_bert_config():
    """Return the BertConfig shared by every encoder built in this module.

    ``vocab_size`` is not set here, so BertConfig's own default applies.
    """
    return BertConfig(
        max_position_embeddings=5000,
        hidden_size=_BERT_HIDDEN_SIZE,
        num_attention_heads=2,
        num_hidden_layers=2,
        type_vocab_size=1,
    )


def _classifier_head(d_in, hidden, d_out):
    """Build the Linear -> ReLU -> Linear head placed on top of the encoder."""
    return nn.Sequential(
        nn.Linear(d_in, hidden),
        nn.ReLU(),
        nn.Linear(hidden, d_out),
    )


class _BertClassifier(nn.Module):
    """Shared implementation: a BERT encoder plus a feed-forward head.

    The submodules are stored as ``self.bert`` and ``self.classifier`` so the
    ``state_dict`` keys are the same for every subclass.
    """

    def __init__(self, bert, num_labels, freeze_bert):
        """
        @param bert: the encoder module to wrap
        @param num_labels (int): size of the logits produced by the head
        @param freeze_bert (bool): when true, encoder parameters are excluded
            from gradient updates; only the head is trained
        """
        super().__init__()
        self.bert = bert
        self.classifier = _classifier_head(
            _BERT_HIDDEN_SIZE, _HEAD_HIDDEN_SIZE, num_labels
        )

        if freeze_bert:
            for param in self.bert.parameters():
                param.requires_grad = False

    def forward(self, input_ids, attention_mask):
        """Feed input to BERT and the classifier to compute logits.

        @param input_ids (torch.Tensor): token ids passed to the encoder
        @param attention_mask (torch.Tensor): attention mask passed to the
            encoder alongside ``input_ids``
        @return logits (torch.Tensor): output of the classifier head
        """
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)

        # outputs[0] is the encoder's first output; keep only the vector at
        # sequence position 0 (the `[CLS]` slot when the tokenizer prepends
        # one) and classify from it.
        cls_hidden_state = outputs[0][:, 0, :]
        return self.classifier(cls_hidden_state)


class PretrainedBert(_BertClassifier):
    """Bert Model for Classification Tasks with a freshly initialised encoder."""

    def __init__(self, freeze_bert=False, num_labels=14):
        """
        @param freeze_bert (bool): Set `False` to fine-tune the BERT model
        @param num_labels (int): number of output classes
        """
        super().__init__(BertModel(_small_bert_config()), num_labels, freeze_bert)


def valid_evaluate(model, val_dataloader):
    """Measure the model's loss and accuracy on a validation set.

    Each batch must unpack into ``(input_ids, attention_mask, labels)``.

    @return (val_loss, val_accuracy): the means of the per-batch loss and of
        the per-batch accuracy (in percent). Batches are weighted equally
        regardless of their size.
    """
    # Evaluation mode disables dropout inside the encoder.
    model.eval()

    batch_losses = []
    batch_accuracies = []

    for batch in val_dataloader:
        b_input_ids, b_attn_mask, b_labels = tuple(t.to(device) for t in batch)

        with torch.no_grad():
            logits = model(b_input_ids, b_attn_mask)
            loss = loss_fn(logits, b_labels)
        batch_losses.append(loss.item())

        preds = torch.argmax(logits, dim=1).flatten()
        accuracy = (preds == b_labels).cpu().numpy().mean() * 100
        batch_accuracies.append(accuracy)

    return np.mean(batch_losses), np.mean(batch_accuracies)


class FinetunningBert(_BertClassifier):
    """Bert Model for Classification Tasks that reuses a pretrained encoder.

    The encoder is taken from a ``PretrainedBert`` checkpoint; the classifier
    head is new and randomly initialised.
    """

    def __init__(self, freeze_bert=False, num_labels=7,
                 pretrained_path='/home/user/app/virBERT.pt'):
        """
        @param freeze_bert (bool): Set `False` to fine-tune the BERT model
        @param num_labels (int): number of output classes of the new head
        @param pretrained_path (str): ``state_dict`` file of a
            ``PretrainedBert`` built with its default arguments
        """
        pretrained = PretrainedBert(freeze_bert=False)
        # map_location lets a checkpoint saved on a GPU load on a CPU host.
        state = torch.load(pretrained_path, map_location=device)
        pretrained.load_state_dict(state)

        # Only the encoder is moved to `device` here; callers move the whole
        # model (head included) themselves.
        super().__init__(pretrained.bert.to(device), num_labels, freeze_bert)


def _optimizer_and_scheduler(model, train_dataloader, epochs):
    """Create the AdamW optimizer and the linear-decay schedule for `model`."""
    # weight_decay is pinned to 0.0 so both AdamW implementations (see the
    # import fallback at the top of the file) apply no decay.
    optimizer = AdamW(model.parameters(), lr=5e-5, eps=1e-8, weight_decay=0.0)

    # One scheduler step is taken per batch, so the schedule spans every
    # batch of every epoch.
    total_steps = len(train_dataloader) * epochs
    scheduler = get_linear_schedule_with_warmup(
        optimizer,
        num_warmup_steps=0,
        num_training_steps=total_steps,
    )
    return optimizer, scheduler


def initialize_finetunningBert(train_dataloader, epochs=4):
    """Initialize the fine-tuning classifier, its optimizer and LR scheduler.

    @return (model, optimizer, scheduler) with the model already on `device`
    """
    bert_classifier = FinetunningBert(freeze_bert=False)
    bert_classifier.to(device)
    optimizer, scheduler = _optimizer_and_scheduler(
        bert_classifier, train_dataloader, epochs
    )
    return bert_classifier, optimizer, scheduler


def _run_training(model, optimizer, scheduler, train_dataloader,
                  val_dataloader, epochs, evaluation, checkpoint_path):
    """Shared training loop behind `train` and `finetunningBert_training`.

    Prints a progress table, writes ``model.state_dict()`` to
    `checkpoint_path` after every epoch and, when `evaluation` is true, runs
    `valid_evaluate` on `val_dataloader` at the end of each epoch.
    """
    # Fail before training starts instead of after the first full epoch.
    if evaluation and val_dataloader is None:
        raise ValueError("evaluation=True requires a val_dataloader")

    print("Start training...\n")
    num_batches = len(train_dataloader)

    for epoch_i in range(epochs):
        # =======================================
        #               Training
        # =======================================
        print(_TABLE_HEADER)
        print("-"*70)

        t0_epoch, t0_batch = time.time(), time.time()
        total_loss, batch_loss, batch_counts = 0, 0, 0

        model.train()

        for step, batch in enumerate(train_dataloader):
            batch_counts += 1
            b_input_ids, b_attn_mask, b_labels = tuple(t.to(device) for t in batch)

            model.zero_grad()

            logits = model(b_input_ids, b_attn_mask)
            loss = loss_fn(logits, b_labels)
            loss_value = loss.item()
            batch_loss += loss_value
            total_loss += loss_value

            loss.backward()

            # Clip the norm of the gradients to 1.0 to prevent "exploding gradients"
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)

            optimizer.step()
            scheduler.step()

            # Report every 20 batches and on the last batch of the epoch.
            if (step % 20 == 0 and step != 0) or (step == num_batches - 1):
                time_elapsed = time.time() - t0_batch
                print(f"{epoch_i + 1:^7} | {step:^7} | {batch_loss / batch_counts:^12.6f} | {'-':^10} | {'-':^9} | {time_elapsed:^9.2f}")

                batch_loss, batch_counts = 0, 0
                t0_batch = time.time()

        # An empty loader has no loss to average; report NaN rather than
        # raising ZeroDivisionError.
        avg_train_loss = total_loss / num_batches if num_batches else float('nan')

        torch.save(model.state_dict(), checkpoint_path)
        print("-"*70)

        # =======================================
        #               Evaluation
        # =======================================
        if evaluation:
            val_loss, val_accuracy = valid_evaluate(model, val_dataloader)

            time_elapsed = time.time() - t0_epoch
            print(f"{epoch_i + 1:^7} | {'-':^7} | {avg_train_loss:^12.6f} | {val_loss:^10.6f} | {val_accuracy:^9.2f} | {time_elapsed:^9.2f}")
            print("-"*70)
        print("\n")

    print("Training complete!")


def finetunningBert_training(model, optimizer, scheduler, train_dataloader,
                             val_dataloader=None, epochs=4, evaluation=False):
    """Train the fine-tuning classifier.

    The model's ``state_dict`` is saved to ``VirDNA4Baltimoremodel.pt`` in the
    current working directory after every epoch.

    @param evaluation (bool): when true, `val_dataloader` is required and is
        evaluated after each epoch
    """
    _run_training(model, optimizer, scheduler, train_dataloader,
                  val_dataloader, epochs, evaluation,
                  'VirDNA4Baltimoremodel.pt')


def bertPredictions(torch, model, val_dataloader):
    """Return the predicted class indices for every batch of `val_dataloader`.

    @param torch: the ``torch`` module, passed in by the caller (the name
        shadows the module-level import inside this function)
    @param model: classifier called as ``model(input_ids, attention_mask)``
    @param val_dataloader: iterable of batches whose first two items are the
        input ids and the attention mask; any further items (e.g. labels) are
        ignored. Batches are NOT moved to another device here.
    @return list of 1-D CPU tensors, one per batch
    """
    model.eval()
    # NOTE(review): looks like a leftover progress marker; kept so the
    # function's console output is unchanged.
    print("working3")

    pred = []
    for batch in val_dataloader:
        b_input_ids, b_attn_mask, *_ = batch

        with torch.no_grad():
            logits = model(b_input_ids, b_attn_mask)

        preds = torch.argmax(logits, dim=1).flatten()
        pred.append(preds.cpu())

    return pred


class ScratchBert(_BertClassifier):
    """Bert Model for Classification Tasks trained from scratch."""

    def __init__(self, freeze_bert=False, num_labels=2):
        """
        @param freeze_bert (bool): Set `False` to fine-tune the BERT model
        @param num_labels (int): number of output classes
        """
        super().__init__(BertModel(_small_bert_config()), num_labels, freeze_bert)


def initialize_model(train_dataloader, epochs=4):
    """Initialize the from-scratch classifier, its optimizer and LR scheduler.

    @return (model, optimizer, scheduler) with the model already on `device`
    """
    bert_classifier = ScratchBert(freeze_bert=False)
    bert_classifier.to(device)
    optimizer, scheduler = _optimizer_and_scheduler(
        bert_classifier, train_dataloader, epochs
    )
    return bert_classifier, optimizer, scheduler


def train(model, optimizer, scheduler, train_dataloader, val_dataloader=None,
          epochs=4, evaluation=False):
    """Train the BertClassifier model.

    The model's ``state_dict`` is saved to ``VirDNAmodel.pt`` in the current
    working directory after every epoch.

    @param evaluation (bool): when true, `val_dataloader` is required and is
        evaluated after each epoch
    """
    _run_training(model, optimizer, scheduler, train_dataloader,
                  val_dataloader, epochs, evaluation,
                  'VirDNAmodel.pt')