import torch, transformers
import numpy as np
from torch import nn
from torch.utils.data import DataLoader
import pytorch_lightning as pl
from transformers import AutoTokenizer, AutoModel
from sklearn.metrics import f1_score
from .util import mean_pooling, token_embeddings_filtering_padding, read_corpus, CEFRDataset, eval_multiclass


class LevelEstimaterBase(pl.LightningModule):
    """Base LightningModule for CEFR level estimation: loads the pretrained LM,
    builds the dataloaders, and implements the shared evaluation logic."""

    def __init__(self, corpus_path, test_corpus_path, pretrained_model, with_ib, attach_wlv, num_labels,
                 word_num_labels, alpha, batch_size, learning_rate, warmup, lm_layer):
        super().__init__()
        self.save_hyperparameters()
        self.CEFR_lvs = 6

        if attach_wlv and with_ib:
            raise ValueError('Information bottleneck and word labels cannot be used together!')

        self.corpus_path = corpus_path
        self.test_corpus_path = test_corpus_path
        self.pretrained_model = pretrained_model
        self.with_ib = with_ib
        self.attach_wlv = attach_wlv
        self.num_labels = num_labels
        self.word_num_labels = word_num_labels
        self.alpha = alpha
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.warmup = warmup
        self.lm_layer = lm_layer

        # Load the pre-trained language model and its tokenizer
        self.load_pretrained_lm()

    def load_pretrained_lm(self):
        if 'roberta' in self.pretrained_model:
            # RoBERTa's BPE tokenizer needs a prefix space when inputs are pre-split into words
            self.tokenizer = AutoTokenizer.from_pretrained(self.pretrained_model, add_prefix_space=True)
        else:
            self.tokenizer = AutoTokenizer.from_pretrained(self.pretrained_model)
        self.lm = AutoModel.from_pretrained(self.pretrained_model)

    def precompute_loss_weights(self, epsilon=1e-5):
        # Smoothed inverse-frequency class weights: with alpha = 1 every level gets
        # weight ~1 (no reweighting); with alpha = 0 the weights approach plain
        # inverse class frequency. Intermediate alpha interpolates between the two.
        train_levels_a, train_levels_b, _ = read_corpus(self.corpus_path + '_train.txt', self.num_labels)
        # Use annotator A's labels, plus annotator B's wherever the two disagree
        train_levels = np.concatenate((train_levels_a, train_levels_b[train_levels_b != train_levels_a]))
        train_sentlv_ratio = np.array([np.sum(train_levels == lv) for lv in range(self.CEFR_lvs)])
        train_sentlv_ratio = train_sentlv_ratio / np.sum(train_sentlv_ratio)
        train_sentlv_weights = np.power(train_sentlv_ratio, self.alpha) / np.sum(
            np.power(train_sentlv_ratio, self.alpha)) / (train_sentlv_ratio + epsilon)
        return torch.Tensor(train_sentlv_weights)

    def encode(self, batch):
        outputs = self.lm(batch['input_ids'], attention_mask=batch['attention_mask'], output_hidden_states=True)
        return outputs.hidden_states[self.lm_layer], None

    def forward(self, inputs):
        pass

    def training_step(self, batch, batch_idx):
        pass

    def validation_step(self, batch, batch_idx):
        pass

    def test_step(self, batch, batch_idx):
        pass

    def get_gold_labels(self, predictions, lower_labels, higher_labels):
        # Start from whichever annotator the model agrees with more often, then
        # adopt the other annotator's label wherever it matches the prediction.
        # Clone so the caller's tensors are not mutated in place.
        if torch.sum(predictions == lower_labels) >= torch.sum(predictions == higher_labels):
            gold_labels = lower_labels.clone()
            gold_labels[predictions == higher_labels] = higher_labels[predictions == higher_labels]
        else:
            gold_labels = higher_labels.clone()
            gold_labels[predictions == lower_labels] = lower_labels[predictions == lower_labels]
        return gold_labels

    def evaluation(self, outputs, test=False):
        pred_labels, gold_labels_low, gold_labels_high = [], [], []
        for output in outputs:
            gold_labels_low += output['gold_labels_low'].tolist()
            gold_labels_high += output['gold_labels_high'].tolist()
            pred_labels += output['pred_labels'].tolist()

        gold_labels_high = np.array(gold_labels_high)
        gold_labels_low = np.array(gold_labels_low)
        pred_labels = np.array(pred_labels)
        # Pick whichever of the lower/higher annotator labels the model matches better
        gold_labels = self.get_gold_labels(torch.from_numpy(pred_labels), torch.from_numpy(gold_labels_low),
                                           torch.from_numpy(gold_labels_high))
        gold_labels = gold_labels.numpy()

        eval_score = f1_score(gold_labels, pred_labels, average='macro')
        logs = {"score": eval_score}

        if test:
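            # Test-time side effects: write the per-level evaluation report and
            # dump the raw sentence-level predictions into the logger directory.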
            eval_multiclass(self.logger.log_dir + '/sentence', gold_labels, pred_labels)
            with open(self.logger.log_dir + '/test_predictions.txt', 'w') as fw:
                fw.write('Sentence_Lv\n')
                for sent_lv in pred_labels:
                    fw.write('{0}\n'.format(sent_lv))

        return logs

    def configure_optimizers(self):
        optimizer = transformers.AdamW(self.parameters(), lr=self.learning_rate)
        # Warm-up scheduler
        if self.warmup > 0:
            scheduler = transformers.get_constant_schedule_with_warmup(optimizer, num_warmup_steps=self.warmup)
            return {"optimizer": optimizer, "lr_scheduler": scheduler}
        else:
            return optimizer

    def prepare_data(self):
        self.train_levels_a, self.train_levels_b, self.train_sents = read_corpus(
            self.corpus_path + '_train.txt', self.num_labels)
        self.dev_levels_a, self.dev_levels_b, self.dev_sents = read_corpus(
            self.corpus_path + '_dev.txt', self.num_labels)
        self.test_levels_a, self.test_levels_b, self.test_sents = read_corpus(
            self.test_corpus_path + '_test.txt', self.num_labels)

    # Return the dataloader for each split
    def train_dataloader(self):
        data_type = torch.float if self.num_labels == 1 else torch.long
        y_sent_a = torch.tensor(self.train_levels_a, dtype=data_type).unsqueeze(1)
        y_sent_b = torch.tensor(self.train_levels_b, dtype=data_type).unsqueeze(1)
        inputs = self.my_tokenize(self.train_sents)
        return DataLoader(CEFRDataset(inputs, y_sent_a, y_sent_b), batch_size=self.batch_size, shuffle=True)

    def val_dataloader(self):
        data_type = torch.float if self.num_labels == 1 else torch.long
        y_sent_a = torch.tensor(self.dev_levels_a, dtype=data_type).unsqueeze(1)
        y_sent_b = torch.tensor(self.dev_levels_b, dtype=data_type).unsqueeze(1)
        inputs = self.my_tokenize(self.dev_sents)
        return DataLoader(CEFRDataset(inputs, y_sent_a, y_sent_b), batch_size=self.batch_size, shuffle=False)

    def test_dataloader(self):
        data_type = torch.float if self.num_labels == 1 else torch.long
        y_sent_a = torch.tensor(self.test_levels_a, dtype=data_type).unsqueeze(1)
        y_sent_b = torch.tensor(self.test_levels_b, dtype=data_type).unsqueeze(1)
        inputs = self.my_tokenize(self.test_sents)
        return DataLoader(CEFRDataset(inputs, y_sent_a, y_sent_b), batch_size=self.batch_size, shuffle=False)

    def my_tokenize(self, sents):
        # Sentences arrive pre-split into words; offset mappings are needed later
        # to align word-level labels with sub-word tokens.
        inputs = self.tokenizer(sents, return_tensors="pt", padding=True, is_split_into_words=True,
                                return_offsets_mapping=True)
        return inputs

    def retokenize_with_wordlvs(self, sents, wlvs):
        # self.word_lv_dic (mapping word-level label ids to surface strings) is
        # expected to be provided by a subclass.
        wlv_sequences = [[self.word_lv_dic[lv] for lv in wlv_list if 0 <= lv < self.word_num_labels]
                         for wlv_list in wlvs.clone().detach().numpy()]
        inputs = self.tokenizer(sents, text_pair=wlv_sequences, return_tensors="pt", padding=True,
                                is_split_into_words=True, return_offsets_mapping=True)
        return inputs

    def wordlabel_to_tokenlabel(self, all_token_ids, all_offsets_mapping, labels):
        # Propagate each word-level label to all of that word's sub-word tokens;
        # special tokens such as [CLS] and [PAD] receive the ignore label -1.
        token_labels = torch.zeros_like(all_token_ids)
        for sid in range(all_token_ids.shape[0]):
            wid = -1
            for i, offset in enumerate(all_offsets_mapping[sid]):
                if offset[1] == 0:
                    # Special token. The offset test is fast but misses self-added
                    # [SEP] tokens; checking all_token_ids[sid][i] against
                    # self.tokenizer.all_special_ids would catch those, but is much slower.
                    token_labels[sid, i] = -1
                    continue
                if offset[0] == 0:
                    # A new word starts here
                    wid += 1
                token_labels[sid, i] = labels[sid][wid]
        return token_labels
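

# A minimal smoke test, not part of the original module: it illustrates how
# wordlabel_to_tokenlabel spreads word-level labels over sub-word tokens and
# marks special tokens with -1. Bypassing __init__ via __new__ and the
# 'bert-base-cased' checkpoint are assumptions made purely for this sketch;
# any fast tokenizer that returns offset mappings would work.
if __name__ == '__main__':
    est = LevelEstimaterBase.__new__(LevelEstimaterBase)  # skip full Lightning setup
    est.tokenizer = AutoTokenizer.from_pretrained('bert-base-cased')
    sents = [['Unbelievable', 'progress']]      # one sentence, pre-split into words
    labels = torch.tensor([[3, 1]])             # one word-level label per word
    inputs = est.my_tokenize(sents)
    token_labels = est.wordlabel_to_tokenlabel(inputs['input_ids'], inputs['offset_mapping'], labels)
    # Expected: -1 for [CLS]/[SEP], 3 for every piece of 'Unbelievable', 1 for 'progress'
    print(token_labels)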