import collections
import json
import os
from typing import Optional, Tuple

import datasets
import numpy as np
from tqdm.auto import tqdm


# The train data file is expected to follow the SQuAD v2.0 JSON format.
def load_dataset(dataset_path, split=0.1, shuffle=True):
    """Load a SQuAD-v2.0-style JSON file and split it into train/valid sets.

    Args:
        dataset_path: Path to a JSON file with the SQuAD v2.0 layout
            (top-level "data" -> topics -> "paragraphs" -> "qas").
        split: Fraction of samples assigned to the validation split.
        shuffle: If True, shuffle sample order before splitting.

    Returns:
        A ``datasets.DatasetDict`` with 'train' and 'valid' splits, each
        holding the columns: id, title, context, question, answers.
    """
    # SQuAD data is distributed as UTF-8; be explicit so the platform
    # default encoding cannot break the read.
    with open(dataset_path, 'r', encoding='utf-8') as f:
        data = json.load(f)["data"]

    dataset = {'id': [], 'title': [], 'context': [], 'question': [], 'answers': []}
    for topic in data:
        title = topic["title"]
        for p in topic["paragraphs"]:
            for qas in p['qas']:
                dataset['id'].append(qas['id'])
                dataset['title'].append(title)
                dataset['context'].append(p["context"])
                dataset['question'].append(qas["question"])
                dataset['answers'].append(qas["answers"])

    # There is no predefined train/validation split, so make one manually.
    n_sample = len(dataset['id'])
    if shuffle:
        perms = np.random.permutation(n_sample)
    else:
        perms = list(range(n_sample))

    # Number of training samples; loop-invariant, so computed once.
    n_train = n_sample - int(split * n_sample)
    train_ds = dict()
    valid_ds = dict()
    for name, assets in dataset.items():
        train_ds[name] = [assets[i] for i in perms[:n_train]]
        valid_ds[name] = [assets[i] for i in perms[n_train:]]

    raw_dataset = datasets.DatasetDict()
    raw_dataset['train'] = datasets.Dataset.from_dict(train_ds)
    raw_dataset['valid'] = datasets.Dataset.from_dict(valid_ds)
    return raw_dataset


def postprocess_qa_predictions(
    features,
    tokenizer,
    predictions: Tuple[np.ndarray, np.ndarray],
    n_best_size: int = 20,
    max_answer_length: int = 30,
):
    """Post-process QA model predictions into answer substrings of the context.

    Base post-processing for models that only return start and end logits,
    SQuAD-v2.0 style: unanswerable questions map to the empty string.

    Args:
        features: The processed dataset; each feature must provide
            'input_ids' and 'id'.
        tokenizer: Tokenizer used to decode answer token ids back to text.
        predictions: Tuple ``(start_logits, end_logits)``; the first
            dimension of each array must match the number of features.
        n_best_size: Number of n-best candidates to consider per feature.
        max_answer_length: Maximum length of a generated answer. Needed
            because start and end predictions are not conditioned on one
            another.

    Returns:
        ``collections.OrderedDict`` mapping feature id -> predicted answer
        text ('' meaning "no answer").

    Raises:
        ValueError: If ``predictions`` is not a 2-tuple or its length does
            not match ``features``.
    """
    if len(predictions) != 2:
        raise ValueError("`predictions` should be a tuple with two elements (start_logits, end_logits).")
    if len(predictions[0]) != len(features):
        raise ValueError(f"Got {len(predictions[0])} predictions and {len(features)} features.")

    all_start_logits, all_end_logits = predictions

    # The dictionaries we have to fill.
    all_predictions = collections.OrderedDict()

    # Let's loop over all the examples!
    for index, feature in enumerate(tqdm(features)):
        # We grab the predictions of the model for this feature.
        start_logits = all_start_logits[index]
        end_logits = all_end_logits[index]

        # The "no answer" candidate. Its ids (1, 0) have start > end, so it
        # is never decoded to text below; only its score is compared.
        # (The original computed this through a min-over-one-element dance
        # guarded by `is None` checks that could never fail -- simplified.)
        # NOTE(review): the null score reads start_logits[1] rather than the
        # conventional start_logits[0] (CLS position) -- confirm intended.
        null_prediction = {
            "ids": (1, 0),
            "score": start_logits[1] + end_logits[0],
        }
        null_score = null_prediction["score"]

        # Go through all possibilities for the `n_best_size` greatest start
        # and end logits.
        start_indexes = np.argsort(start_logits)[-1 : -n_best_size - 1 : -1].tolist()
        end_indexes = np.argsort(end_logits)[-1 : -n_best_size - 1 : -1].tolist()
        input_len = len(feature['input_ids'])

        prelim_predictions = []
        for start_index in start_indexes:
            for end_index in end_indexes:
                # Skip out-of-bounds candidates.
                # NOTE(review): spans landing on question tokens are NOT
                # filtered here (no offset mapping is available), despite
                # what the original comment claimed.
                if start_index >= input_len or end_index >= input_len:
                    continue
                # Skip negative-length spans and spans longer than
                # max_answer_length.
                if end_index < start_index or end_index - start_index + 1 > max_answer_length:
                    continue
                prelim_predictions.append(
                    {
                        "ids": (start_index, end_index),
                        "score": start_logits[start_index] + end_logits[end_index],
                    }
                )

        # Add the "no answer" prediction.
        prelim_predictions.append(null_prediction)

        # Only keep the best `n_best_size` predictions. (Stored in a local
        # instead of clobbering the `predictions` parameter as before.)
        nbest = sorted(prelim_predictions, key=lambda x: x["score"], reverse=True)[:n_best_size]

        # Add back the null prediction if its low score pushed it out.
        if not any(p["ids"] == (1, 0) for p in nbest):
            nbest.append(null_prediction)

        # The first decodable (start <= end) candidate in score order is the
        # best non-null prediction; the null entry (1, 0) is skipped here.
        best_non_null_pred = None
        for pred in nbest:
            l, r = pred.pop("ids")
            if l <= r:
                answer_ids = feature['input_ids'][l: r + 1]
                answer_tokens = tokenizer.convert_ids_to_tokens(answer_ids)
                pred["text"] = tokenizer.convert_tokens_to_string(answer_tokens)
                best_non_null_pred = pred
                break

        # SQuAD v2 thresholding: emit '' when the null score beats the best
        # span (or when no decodable span exists at all).
        if best_non_null_pred is None or best_non_null_pred["score"] < null_score:
            all_predictions[feature["id"]] = ""
        else:
            all_predictions[feature["id"]] = best_non_null_pred["text"]

    return all_predictions