import copy
import torch
import torch.nn as nn
import pandas as pd
import numpy as np
import json

from .models.bert_labeler import bert_labeler
from .bert_tokenizer import tokenize
from sklearn.metrics import f1_score, confusion_matrix
from statsmodels.stats.inter_rater import cohens_kappa
from transformers import BertTokenizer
from .constants import *
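
# Label encoding used throughout this module (cf. CLASS_MAPPING in constants):
# 0 = blank/unmentioned, 1 = positive, 2 = negation, 3 = uncertain.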

def get_weighted_f1_weights(train_path_or_csv):
    """Compute weights used to obtain the weighted average of
       negation, uncertain and positive f1 scores.
    @param train_path_or_csv: A path to the csv file or a dataframe

    @return weight_dict (dictionary): maps conditions to a list of weights, the order
                                      in the lists is negation, uncertain, positive
    """
    if isinstance(train_path_or_csv, str):
        df = pd.read_csv(train_path_or_csv)
    else:
        df = train_path_or_csv
    # remap labels: negative (0) -> 2, uncertain (-1) -> 3, blank (nan) -> 0
    df.replace(0, 2, inplace=True)
    df.replace(-1, 3, inplace=True)
    df.fillna(0, inplace=True)

    weight_dict = {}
    for cond in CONDITIONS:
        weights = []
        col = df[cond]

        mask = col == 2  # negation
        weights.append(mask.sum())

        mask = col == 3  # uncertain
        weights.append(mask.sum())

        mask = col == 1  # positive
        weights.append(mask.sum())

        if np.sum(weights) > 0:
            weights = np.array(weights) / np.sum(weights)
        weight_dict[cond] = weights
    return weight_dict
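
# Illustrative example: for a condition with 10 negation, 5 uncertain and 35
# positive labels in the training set, the weights are [10, 5, 35] / 50,
# i.e. [0.2, 0.1, 0.7].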

def weighted_avg(scores, weights):
    """Compute weighted average of scores
    @param scores (List): the task scores
    @param weights (List): corresponding normalized weights

    @return (float): the weighted average of task scores
    """
    return np.sum(np.array(scores) * np.array(weights))
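
# Worked example (illustrative numbers): weighted_avg([0.8, 0.6, 0.9], [0.2, 0.3, 0.5])
# returns 0.8*0.2 + 0.6*0.3 + 0.9*0.5 = 0.79.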

def compute_train_weights(train_path):
    """Compute class weights for rebalancing rare classes
    @param train_path (str): A path to the training csv file

    @returns weight_arr (torch.Tensor): Tensor of shape (train_set_size), containing
                                        the weight assigned to each training example
    """
    df = pd.read_csv(train_path)
    cond_weights = {}
    for cond in CONDITIONS:
        col = df[cond]
        val_counts = col.value_counts()
        if cond != 'No Finding':
            # each class weight is the inverse of that class's relative frequency
            weights = {}
            weights['0.0'] = len(df) / val_counts[0]
            weights['-1.0'] = len(df) / val_counts[-1]
            weights['1.0'] = len(df) / val_counts[1]
            weights['nan'] = len(df) / (len(df) - val_counts.sum())
        else:
            # 'No Finding' is only ever labeled positive or blank
            weights = {}
            weights['1.0'] = len(df) / val_counts[1]
            weights['nan'] = len(df) / (len(df) - val_counts.sum())

        cond_weights[cond] = weights

    weight_arr = torch.zeros(len(df))
    for i in range(len(df)):
        for cond in CONDITIONS:
            # total example weight is the sum of its per-condition class weights
            label = str(df[cond].iloc[i])
            weight_arr[i] += cond_weights[cond][label]

    return weight_arr
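
# Minimal usage sketch (the Dataset `train_dset` and csv path are placeholders;
# feeding the weights to a WeightedRandomSampler is one plausible use, not
# something this module mandates):
#   weights = compute_train_weights('train.csv')
#   sampler = torch.utils.data.WeightedRandomSampler(weights, len(weights))
#   loader = torch.utils.data.DataLoader(train_dset, batch_size=16, sampler=sampler)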

def generate_attention_masks(batch, source_lengths, device):
    """Generate masks for padded batches to avoid self-attention over pad tokens
    @param batch (Tensor): tensor of token indices of shape (batch_size, max_len)
                           where max_len is the length of the longest sequence in the batch
    @param source_lengths (List[Int]): List of actual lengths for each of the
                                       sequences in the batch
    @param device (torch.device): device on which the masks should be placed

    @returns masks (Tensor): Tensor of masks of shape (batch_size, max_len)
    """
    masks = torch.ones(batch.size(0), batch.size(1), dtype=torch.float)
    for idx, src_len in enumerate(source_lengths):
        masks[idx, src_len:] = 0  # zero out the padded positions
    return masks.to(device)
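
# Illustrative example: for a batch of shape (2, 4) with source_lengths [4, 2],
# the returned mask is [[1., 1., 1., 1.],
#                       [1., 1., 0., 0.]].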

def compute_mention_f1(y_true, y_pred):
    """Compute the mention F1 score as in the CheXpert paper
    @param y_true (list): List of 14 tensors each of shape (dev_set_size)
    @param y_pred (list): Same as y_true but for model predictions

    @returns res (list): List of 14 scalars
    """
    # Collapse negation (2) and uncertain (3) into the positive mention class (1)
    for j in range(len(y_true)):
        y_true[j][y_true[j] == 2] = 1
        y_true[j][y_true[j] == 3] = 1
        y_pred[j][y_pred[j] == 2] = 1
        y_pred[j][y_pred[j] == 3] = 1

    res = []
    for j in range(len(y_true)):
        res.append(f1_score(y_true[j], y_pred[j], pos_label=1))

    return res
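
# Illustrative example: y_true[j] = tensor([0, 2, 3, 1]) and
# y_pred[j] = tensor([0, 1, 3, 0]) collapse to mention labels [0, 1, 1, 1]
# and [0, 1, 1, 0]; precision = 1.0, recall = 2/3, so mention F1 = 0.8.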

def compute_blank_f1(y_true, y_pred):
    """Compute the blank F1 score
    @param y_true (list): List of 14 tensors each of shape (dev_set_size)
    @param y_pred (list): Same as y_true but for model predictions

    @returns res (list): List of 14 scalars
    """
    # Collapse negation (2) and uncertain (3) into the mention class (1),
    # then score F1 on the blank class (0)
    for j in range(len(y_true)):
        y_true[j][y_true[j] == 2] = 1
        y_true[j][y_true[j] == 3] = 1
        y_pred[j][y_pred[j] == 2] = 1
        y_pred[j][y_pred[j] == 3] = 1

    res = []
    for j in range(len(y_true)):
        res.append(f1_score(y_true[j], y_pred[j], pos_label=0))

    return res

def compute_negation_f1(y_true, y_pred):
    """Compute the negation F1 score as in the CheXpert paper
    @param y_true (list): List of 14 tensors each of shape (dev_set_size)
    @param y_pred (list): Same as y_true but for model predictions

    @returns res (list): List of 14 scalars
    """
    # Map uncertain (3) and positive (1) to blank (0) so only negation (2) remains
    for j in range(len(y_true)):
        y_true[j][y_true[j] == 3] = 0
        y_true[j][y_true[j] == 1] = 0
        y_pred[j][y_pred[j] == 3] = 0
        y_pred[j][y_pred[j] == 1] = 0

    res = []
    # Skip the last condition (No Finding), which has no negation label
    for j in range(len(y_true) - 1):
        res.append(f1_score(y_true[j], y_pred[j], pos_label=2))

    res.append(0)  # placeholder score for No Finding
    return res

def compute_positive_f1(y_true, y_pred):
    """Compute the positive F1 score
    @param y_true (list): List of 14 tensors each of shape (dev_set_size)
    @param y_pred (list): Same as y_true but for model predictions

    @returns res (list): List of 14 scalars
    """
    # Map uncertain (3) and negation (2) to blank (0) so only positive (1) remains
    for j in range(len(y_true)):
        y_true[j][y_true[j] == 3] = 0
        y_true[j][y_true[j] == 2] = 0
        y_pred[j][y_pred[j] == 3] = 0
        y_pred[j][y_pred[j] == 2] = 0

    res = []
    for j in range(len(y_true)):
        res.append(f1_score(y_true[j], y_pred[j], pos_label=1))

    return res

def compute_uncertain_f1(y_true, y_pred):
    """Compute the uncertain F1 score as in the CheXpert paper
    @param y_true (list): List of 14 tensors each of shape (dev_set_size)
    @param y_pred (list): Same as y_true but for model predictions

    @returns res (list): List of 14 scalars
    """
    # Map negation (2) and positive (1) to blank (0) so only uncertain (3) remains
    for j in range(len(y_true)):
        y_true[j][y_true[j] == 2] = 0
        y_true[j][y_true[j] == 1] = 0
        y_pred[j][y_pred[j] == 2] = 0
        y_pred[j][y_pred[j] == 1] = 0

    res = []
    # Skip the last condition (No Finding), which has no uncertain label
    for j in range(len(y_true) - 1):
        res.append(f1_score(y_true[j], y_pred[j], pos_label=3))

    res.append(0)  # placeholder score for No Finding
    return res

def evaluate(model, dev_loader, device, f1_weights, return_pred=False):
    """Function to evaluate the current model weights
    @param model (nn.Module): the labeler module
    @param dev_loader (torch.utils.data.DataLoader): dataloader for dev set
    @param device (torch.device): device on which data should be placed
    @param f1_weights (dictionary): dictionary mapping conditions to f1
                                    task weights
    @param return_pred (bool): whether to return predictions or not

    @returns res_dict (dictionary): dictionary with keys 'blank', 'mention', 'negation',
                                    'uncertain', 'positive', 'weighted' and 'kappa', with
                                    values being lists of length 14 with each element in
                                    the lists as a scalar. If return_pred is true then a
                                    tuple is returned with the aforementioned dictionary
                                    as the first item, a list of predictions as the
                                    second item, and a list of ground truth as the
                                    third item
    """
    was_training = model.training
    model.eval()
    y_pred = [[] for _ in range(len(CONDITIONS))]
    y_true = [[] for _ in range(len(CONDITIONS))]

    with torch.no_grad():
        for i, data in enumerate(dev_loader, 0):
            batch = data['imp']  # (batch_size, max_len)
            batch = batch.to(device)
            label = data['label']
            label = label.permute(1, 0).to(device)  # (num_conditions, batch_size)
            src_len = data['len']
            batch_size = batch.shape[0]
            attn_mask = generate_attention_masks(batch, src_len, device)

            out = model(batch, attn_mask)

            for j in range(len(out)):
                out[j] = out[j].to('cpu')
                curr_y_pred = out[j].argmax(dim=1)
                y_pred[j].append(curr_y_pred)
                y_true[j].append(label[j].to('cpu'))

            if (i + 1) % 200 == 0:
                print('Evaluation batch no: ', i + 1)

    for j in range(len(y_true)):
        y_true[j] = torch.cat(y_true[j], dim=0)
        y_pred[j] = torch.cat(y_pred[j], dim=0)

    if was_training:
        model.train()

    mention_f1 = compute_mention_f1(copy.deepcopy(y_true), copy.deepcopy(y_pred))
    negation_f1 = compute_negation_f1(copy.deepcopy(y_true), copy.deepcopy(y_pred))
    uncertain_f1 = compute_uncertain_f1(copy.deepcopy(y_true), copy.deepcopy(y_pred))
    positive_f1 = compute_positive_f1(copy.deepcopy(y_true), copy.deepcopy(y_pred))
    blank_f1 = compute_blank_f1(copy.deepcopy(y_true), copy.deepcopy(y_pred))

    weighted = []
    kappas = []
    for j in range(len(y_pred)):
        cond = CONDITIONS[j]
        avg = weighted_avg([negation_f1[j], uncertain_f1[j], positive_f1[j]], f1_weights[cond])
        weighted.append(avg)

        mat = confusion_matrix(y_true[j], y_pred[j])
        kappas.append(cohens_kappa(mat, return_results=False))

    res_dict = {'mention': mention_f1,
                'blank': blank_f1,
                'negation': negation_f1,
                'uncertain': uncertain_f1,
                'positive': positive_f1,
                'weighted': weighted,
                'kappa': kappas}

    if return_pred:
        return res_dict, y_pred, y_true
    else:
        return res_dict
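
# Reading the output: res_dict['weighted'][j] averages condition j's negation,
# uncertain and positive F1 scores using the prevalence weights produced by
# get_weighted_f1_weights.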

def test(model, checkpoint_path, test_ld, f1_weights):
    """Evaluate model on test set.
    @param model (nn.Module): labeler module
    @param checkpoint_path (string): location of saved model checkpoint
    @param test_ld (dataloader): dataloader for test set
    @param f1_weights (dictionary): maps conditions to f1 task weights
    """
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    if torch.cuda.device_count() > 1:
        print("Using", torch.cuda.device_count(), "GPUs!")
        model = nn.DataParallel(model)
    model = model.to(device)

    checkpoint = torch.load(checkpoint_path)
    model.load_state_dict(checkpoint['model_state_dict'])

    print("Doing evaluation on test set\n")
    metrics = evaluate(model, test_ld, device, f1_weights)
    weighted = metrics['weighted']
    kappas = metrics['kappa']

    for j in range(len(CONDITIONS)):
        print('%s kappa: %.3f' % (CONDITIONS[j], kappas[j]))
    print('average: %.3f' % np.mean(kappas))

    print()
    for j in range(len(CONDITIONS)):
        print('%s weighted_f1: %.3f' % (CONDITIONS[j], weighted[j]))
    print('average of weighted_f1: %.3f' % np.mean(weighted))

    print()
    for j in range(len(CONDITIONS)):
        print('%s blank_f1: %.3f, negation_f1: %.3f, uncertain_f1: %.3f, positive_f1: %.3f' %
              (CONDITIONS[j], metrics['blank'][j], metrics['negation'][j],
               metrics['uncertain'][j], metrics['positive'][j]))

    men_macro_avg = np.mean(metrics['mention'])
    neg_macro_avg = np.mean(metrics['negation'][:-1])  # No Finding has no negation label
    unc_macro_avg = np.mean(metrics['uncertain'][:-2])  # No Finding and Support Devices have no uncertain labels
    pos_macro_avg = np.mean(metrics['positive'])
    blank_macro_avg = np.mean(metrics['blank'])

    print("blank macro avg: %.3f, negation macro avg: %.3f, uncertain macro avg: %.3f, positive macro avg: %.3f" %
          (blank_macro_avg, neg_macro_avg, unc_macro_avg, pos_macro_avg))
    print()
    for j in range(len(CONDITIONS)):
        print('%s mention_f1: %.3f' % (CONDITIONS[j], metrics['mention'][j]))
    print('mention macro avg: %.3f' % men_macro_avg)
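
# Illustrative call sequence (paths and loader are placeholders):
#   f1_weights = get_weighted_f1_weights('train.csv')
#   test(bert_labeler(), 'checkpoint/model.pt', test_loader, f1_weights)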

def label_report_list(checkpoint_path, report_list):
    """Evaluate model on a list of reports.
    @param checkpoint_path (string): location of saved model checkpoint
    @param report_list (list): list of report impressions (string)
    """
    imp = pd.Series(report_list)
    imp = imp.str.strip()
    imp = imp.replace('\n', ' ', regex=True)
    imp = imp.replace(r'[0-9]\.', '', regex=True)  # drop enumeration markers like "1."
    imp = imp.replace(r'\s+', ' ', regex=True)
    imp = imp.str.strip()

    model = bert_labeler()
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    if torch.cuda.device_count() > 1:
        print("Using", torch.cuda.device_count(), "GPUs!")
        model = nn.DataParallel(model)
    model = model.to(device)
    checkpoint = torch.load(checkpoint_path)
    model.load_state_dict(checkpoint['model_state_dict'])
    model.eval()

    y_pred = []
    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    new_imps = tokenize(imp, tokenizer)
    with torch.no_grad():
        for imp in new_imps:
            # each report is labeled on its own, so the attention mask is all ones
            imp = torch.LongTensor(imp)
            source = imp.view(1, len(imp))

            attention = torch.ones(len(imp))
            attention = attention.view(1, len(imp))
            out = model(source.to(device), attention.to(device))

            # map each head's argmax class index to its label string
            result = {}
            for j in range(len(out)):
                curr_y_pred = out[j].argmax(dim=1)
                result[CONDITIONS[j]] = CLASS_MAPPING[curr_y_pred.item()]
            y_pred.append(result)
    return y_pred
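
# Example usage (reports are illustrative):
#   reports = ["No acute cardiopulmonary abnormality.",
#              "Left basilar opacity, possibly atelectasis."]
#   labels = label_report_list('checkpoint/model.pt', reports)
#   # labels[0] is a dict mapping each of the 14 CONDITIONS to a class string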