import copy
import json

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from sklearn.metrics import f1_score, confusion_matrix
from statsmodels.stats.inter_rater import cohens_kappa
from transformers import BertTokenizer

from .models.bert_labeler import bert_labeler
from .bert_tokenizer import tokenize
from .constants import *


def get_weighted_f1_weights(train_path_or_csv):
    """Compute weights used to obtain the weighted average of mention,
    negation and uncertain f1 scores.

    @param train_path_or_csv: A path to the csv file or a dataframe
    @return weight_dict (dictionary): maps conditions to a list of weights,
                                      the order in the lists is negation, uncertain, positive
    """
    if isinstance(train_path_or_csv, str):
        df = pd.read_csv(train_path_or_csv)
    else:
        # Work on a copy: the in-place replace/fillna below must not mutate
        # the caller's dataframe (the original implementation did, as a
        # hidden side effect).
        df = train_path_or_csv.copy()

    # Remap labels: 0 (negation) -> 2, -1 (uncertain) -> 3, blank (NaN) -> 0.
    df.replace(0, 2, inplace=True)
    df.replace(-1, 3, inplace=True)
    df.fillna(0, inplace=True)

    weight_dict = {}
    for cond in CONDITIONS:
        col = df[cond]
        # Order matters: negation (2), uncertain (3), positive (1).
        weights = [(col == 2).sum(), (col == 3).sum(), (col == 1).sum()]
        if np.sum(weights) > 0:
            weights = np.array(weights) / np.sum(weights)
        weight_dict[cond] = weights
    return weight_dict


def weighted_avg(scores, weights):
    """Compute weighted average of scores.

    @param scores (List): the task scores
    @param weights (List): corresponding normalized weights
    @return (float): the weighted average of task scores
    """
    return np.sum(np.array(scores) * np.array(weights))


def compute_train_weights(train_path):
    """Compute class weights for rebalancing rare classes.

    @param train_path (str): A path to the training csv file
    @returns weight_arr (torch.Tensor): Tensor of shape (train_set_size),
                                        containing the weight assigned to
                                        each training example
    """
    df = pd.read_csv(train_path)
    cond_weights = {}
    for cond in CONDITIONS:
        val_counts = df[cond].value_counts()
        weights = {}
        # 'No Finding' only has positive/blank labels; the other conditions
        # also have negation (0.0) and uncertain (-1.0) labels.
        if cond != 'No Finding':
            weights['0.0'] = len(df) / val_counts[0]
            weights['-1.0'] = len(df) / val_counts[-1]
        weights['1.0'] = len(df) / val_counts[1]
        # Blank labels are the NaN rows, i.e. those not counted by value_counts.
        weights['nan'] = len(df) / (len(df) - val_counts.sum())
        cond_weights[cond] = weights

    weight_arr = torch.zeros(len(df))
    for i in range(len(df)):       # loop over training set
        for cond in CONDITIONS:    # loop over all conditions
            # str() maps the float/NaN cell values onto the keys above
            # ('0.0', '-1.0', '1.0', 'nan').
            label = str(df[cond].iloc[i])
            weight_arr[i] += cond_weights[cond][label]  # add weight for given class' label
    return weight_arr


def generate_attention_masks(batch, source_lengths, device):
    """Generate masks for padded batches to avoid self-attention over pad tokens.

    @param batch (Tensor): tensor of token indices of shape (batch_size, max_len)
                           where max_len is length of longest sequence in the batch
    @param source_lengths (List[Int]): List of actual lengths for each of the
                           sequences in the batch
    @param device (torch.device): device on which data should be

    @returns masks (Tensor): Tensor of masks of shape (batch_size, max_len)
    """
    masks = torch.ones(batch.size(0), batch.size(1), dtype=torch.float)
    for idx, src_len in enumerate(source_lengths):
        masks[idx, src_len:] = 0  # zero out everything past the real tokens
    return masks.to(device)


def compute_mention_f1(y_true, y_pred):
    """Compute the mention F1 score as in CheXpert paper.

    NOTE: mutates y_true/y_pred in place (callers pass deep copies).

    @param y_true (list): List of 14 tensors each of shape (dev_set_size)
    @param y_pred (list): Same as y_true but for model predictions
    @returns res (list): List of 14 scalars
    """
    # Collapse negation (2) and uncertain (3) into "mentioned" (1).
    for j in range(len(y_true)):
        y_true[j][y_true[j] == 2] = 1
        y_true[j][y_true[j] == 3] = 1
        y_pred[j][y_pred[j] == 2] = 1
        y_pred[j][y_pred[j] == 3] = 1

    res = []
    for j in range(len(y_true)):
        res.append(f1_score(y_true[j], y_pred[j], pos_label=1))
    return res


def compute_blank_f1(y_true, y_pred):
    """Compute the blank F1 score.

    NOTE: mutates y_true/y_pred in place (callers pass deep copies).

    @param y_true (list): List of 14 tensors each of shape (dev_set_size)
    @param y_pred (list): Same as y_true but for model predictions
    @returns res (list): List of 14 scalars
    """
    # Collapse negation (2) and uncertain (3) into "mentioned" (1) so that
    # blank (0) is scored against everything else.
    for j in range(len(y_true)):
        y_true[j][y_true[j] == 2] = 1
        y_true[j][y_true[j] == 3] = 1
        y_pred[j][y_pred[j] == 2] = 1
        y_pred[j][y_pred[j] == 3] = 1

    res = []
    for j in range(len(y_true)):
        res.append(f1_score(y_true[j], y_pred[j], pos_label=0))
    return res


def compute_negation_f1(y_true, y_pred):
    """Compute the negation F1 score as in CheXpert paper.

    NOTE: mutates y_true/y_pred in place (callers pass deep copies).

    @param y_true (list): List of 14 tensors each of shape (dev_set_size)
    @param y_pred (list): Same as y_true but for model predictions
    @returns res (list): List of 14 scalars
    """
    # Collapse uncertain (3) and positive (1) into 0 so that only
    # negation (2) counts as the positive class.
    for j in range(len(y_true)):
        y_true[j][y_true[j] == 3] = 0
        y_true[j][y_true[j] == 1] = 0
        y_pred[j][y_pred[j] == 3] = 0
        y_pred[j][y_pred[j] == 1] = 0

    res = []
    for j in range(len(y_true) - 1):
        res.append(f1_score(y_true[j], y_pred[j], pos_label=2))
    res.append(0)  # No Finding gets score of zero
    return res


def compute_positive_f1(y_true, y_pred):
    """Compute the positive F1 score.

    NOTE: mutates y_true/y_pred in place (callers pass deep copies).

    @param y_true (list): List of 14 tensors each of shape (dev_set_size)
    @param y_pred (list): Same as y_true but for model predictions
    @returns res (list): List of 14 scalars
    """
    # Collapse uncertain (3) and negation (2) into 0 so that only
    # positive (1) counts as the positive class.
    for j in range(len(y_true)):
        y_true[j][y_true[j] == 3] = 0
        y_true[j][y_true[j] == 2] = 0
        y_pred[j][y_pred[j] == 3] = 0
        y_pred[j][y_pred[j] == 2] = 0

    res = []
    for j in range(len(y_true)):
        res.append(f1_score(y_true[j], y_pred[j], pos_label=1))
    return res


def compute_uncertain_f1(y_true, y_pred):
    """Compute the uncertain F1 score as in CheXpert paper.

    NOTE: mutates y_true/y_pred in place (callers pass deep copies).

    @param y_true (list): List of 14 tensors each of shape (dev_set_size)
    @param y_pred (list): Same as y_true but for model predictions
    @returns res (list): List of 14 scalars
    """
    # Collapse negation (2) and positive (1) into 0 so that only
    # uncertain (3) counts as the positive class.
    for j in range(len(y_true)):
        y_true[j][y_true[j] == 2] = 0
        y_true[j][y_true[j] == 1] = 0
        y_pred[j][y_pred[j] == 2] = 0
        y_pred[j][y_pred[j] == 1] = 0

    res = []
    for j in range(len(y_true) - 1):
        res.append(f1_score(y_true[j], y_pred[j], pos_label=3))
    res.append(0)  # No Finding gets a score of zero
    return res


def evaluate(model, dev_loader, device, f1_weights, return_pred=False):
    """Function to evaluate the current model weights.

    @param model (nn.Module): the labeler module
    @param dev_loader (torch.utils.data.DataLoader): dataloader for dev set
    @param device (torch.device): device on which data should be
    @param f1_weights (dictionary): dictionary mapping conditions to f1 task weights
    @param return_pred (bool): whether to return predictions or not

    @returns res_dict (dictionary): dictionary with keys 'blank', 'mention',
                                    'negation', 'uncertain', 'positive' and
                                    'weighted', with values being lists of
                                    length 14 with each element in the lists
                                    as a scalar. If return_pred is true then
                                    a tuple is returned with the
                                    aforementioned dictionary as the first
                                    item, a list of predictions as the
                                    second item, and a list of ground truth
                                    as the third item
    """
    was_training = model.training
    model.eval()
    y_pred = [[] for _ in range(len(CONDITIONS))]
    y_true = [[] for _ in range(len(CONDITIONS))]

    with torch.no_grad():
        for i, data in enumerate(dev_loader, 0):
            batch = data['imp'].to(device)                  # (batch_size, max_len)
            label = data['label'].permute(1, 0).to(device)  # (14, batch_size)
            src_len = data['len']
            attn_mask = generate_attention_masks(batch, src_len, device)

            out = model(batch, attn_mask)

            for j in range(len(out)):
                out[j] = out[j].to('cpu')  # move to cpu for sklearn
                curr_y_pred = out[j].argmax(dim=1)  # shape is (batch_size)
                y_pred[j].append(curr_y_pred)
                y_true[j].append(label[j].to('cpu'))

            if (i + 1) % 200 == 0:
                print('Evaluation batch no: ', i + 1)

    for j in range(len(y_true)):
        y_true[j] = torch.cat(y_true[j], dim=0)
        y_pred[j] = torch.cat(y_pred[j], dim=0)

    if was_training:  # restore the caller's train/eval mode
        model.train()

    # The compute_* helpers remap labels in place, so hand each one its own
    # deep copy of the collected tensors.
    mention_f1 = compute_mention_f1(copy.deepcopy(y_true), copy.deepcopy(y_pred))
    negation_f1 = compute_negation_f1(copy.deepcopy(y_true), copy.deepcopy(y_pred))
    uncertain_f1 = compute_uncertain_f1(copy.deepcopy(y_true), copy.deepcopy(y_pred))
    positive_f1 = compute_positive_f1(copy.deepcopy(y_true), copy.deepcopy(y_pred))
    blank_f1 = compute_blank_f1(copy.deepcopy(y_true), copy.deepcopy(y_pred))

    weighted = []
    kappas = []
    for j in range(len(y_pred)):
        cond = CONDITIONS[j]
        avg = weighted_avg([negation_f1[j], uncertain_f1[j], positive_f1[j]],
                           f1_weights[cond])
        weighted.append(avg)

        mat = confusion_matrix(y_true[j], y_pred[j])
        kappas.append(cohens_kappa(mat, return_results=False))

    res_dict = {'mention': mention_f1,
                'blank': blank_f1,
                'negation': negation_f1,
                'uncertain': uncertain_f1,
                'positive': positive_f1,
                'weighted': weighted,
                'kappa': kappas}

    if return_pred:
        return res_dict, y_pred, y_true
    else:
        return res_dict


def test(model, checkpoint_path, test_ld, f1_weights):
    """Evaluate model on test set.

    @param model (nn.Module): labeler module
    @param checkpoint_path (string): location of saved model checkpoint
    @param test_ld (dataloader): dataloader for test set
    @param f1_weights (dictionary): maps conditions to f1 task weights
    """
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    if torch.cuda.device_count() > 1:
        print("Using", torch.cuda.device_count(), "GPUs!")
        model = nn.DataParallel(model)  # to utilize multiple GPU's

    model = model.to(device)
    # map_location lets a GPU-saved checkpoint load on a CPU-only machine.
    checkpoint = torch.load(checkpoint_path, map_location=device)
    model.load_state_dict(checkpoint['model_state_dict'])

    print("Doing evaluation on test set\n")
    metrics = evaluate(model, test_ld, device, f1_weights)
    weighted = metrics['weighted']
    kappas = metrics['kappa']

    for j in range(len(CONDITIONS)):
        print('%s kappa: %.3f' % (CONDITIONS[j], kappas[j]))
    print('average: %.3f' % np.mean(kappas))

    print()
    for j in range(len(CONDITIONS)):
        print('%s weighted_f1: %.3f' % (CONDITIONS[j], weighted[j]))
    print('average of weighted_f1: %.3f' % (np.mean(weighted)))

    print()
    for j in range(len(CONDITIONS)):
        print('%s blank_f1:  %.3f, negation_f1: %.3f, uncertain_f1: %.3f, positive_f1: %.3f' %
              (CONDITIONS[j], metrics['blank'][j], metrics['negation'][j],
               metrics['uncertain'][j], metrics['positive'][j]))

    men_macro_avg = np.mean(metrics['mention'])
    neg_macro_avg = np.mean(metrics['negation'][:-1])   # No Finding has no negations
    unc_macro_avg = np.mean(metrics['uncertain'][:-2])  # No Finding, Support Devices have no uncertain labels in test set
    pos_macro_avg = np.mean(metrics['positive'])
    blank_macro_avg = np.mean(metrics['blank'])

    print("blank macro avg: %.3f, negation macro avg: %.3f, uncertain macro avg: %.3f, positive macro avg: %.3f" %
          (blank_macro_avg, neg_macro_avg, unc_macro_avg, pos_macro_avg))
    print()
    for j in range(len(CONDITIONS)):
        print('%s mention_f1: %.3f' % (CONDITIONS[j], metrics['mention'][j]))
    print('mention macro avg: %.3f' % men_macro_avg)


def label_report_list(checkpoint_path, report_list):
    """Evaluate model on list of reports.

    @param checkpoint_path (string): location of saved model checkpoint
    @param report_list (list): list of report impressions (string)
    @returns y_pred (list): one dict per report, mapping each condition in
                            CONDITIONS to its predicted class string
    """
    imp = pd.Series(report_list)
    # Normalize the impressions: strip whitespace, drop newlines,
    # remove "1." style list numbering, and collapse repeated spaces.
    imp = imp.str.strip()
    imp = imp.replace('\n', ' ', regex=True)
    imp = imp.replace('[0-9]\.', '', regex=True)
    imp = imp.replace('\s+', ' ', regex=True)
    imp = imp.str.strip()

    model = bert_labeler()
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    if torch.cuda.device_count() > 1:
        print("Using", torch.cuda.device_count(), "GPUs!")
        model = nn.DataParallel(model)  # to utilize multiple GPU's

    model = model.to(device)
    # map_location lets a GPU-saved checkpoint load on a CPU-only machine.
    checkpoint = torch.load(checkpoint_path, map_location=device)
    model.load_state_dict(checkpoint['model_state_dict'])
    model.eval()

    y_pred = []
    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    new_imps = tokenize(imp, tokenizer)

    with torch.no_grad():
        for token_ids in new_imps:
            # run forward prop on a single-report batch
            token_ids = torch.LongTensor(token_ids)
            source = token_ids.view(1, len(token_ids))

            attention = torch.ones(len(token_ids))
            attention = attention.view(1, len(token_ids))
            out = model(source.to(device), attention.to(device))

            # get predictions
            result = {}
            for j in range(len(out)):
                curr_y_pred = out[j].argmax(dim=1)  # shape is (1)
                result[CONDITIONS[j]] = CLASS_MAPPING[curr_y_pred.item()]
            y_pred.append(result)
    return y_pred