|
|
import collections |
|
|
import numpy as np |
|
|
import datasets |
|
|
import json |
|
|
|
|
|
import os |
|
|
from typing import Optional, Tuple |
|
|
from tqdm.auto import tqdm |
|
|
|
|
|
|
|
|
|
|
|
def load_dataset(dataset_path, split = 0.1, shuffle = True):
    """Load a SQuAD-format JSON file and split it into train/valid sets.

    Args:
        dataset_path: Path to a JSON file with the SQuAD layout:
            ``{"data": [{"title", "paragraphs": [{"context", "qas": [...]}]}]}``.
        split: Fraction of the samples placed in the validation split
            (defaults to 0.1, i.e. a 90/10 split).
        shuffle: If True, samples are randomly permuted before splitting.

    Returns:
        A ``datasets.DatasetDict`` with 'train' and 'valid' splits, each
        holding the columns: id, title, context, question, answers.
    """
    with open(dataset_path, 'r', encoding='utf-8') as f:
        data = json.load(f)["data"]

    columns = {'id': [],
               'title': [],
               'context': [],
               'question': [],
               'answers': []}

    # Flatten the nested SQuAD structure (topic -> paragraph -> qas)
    # into parallel per-column lists.
    for topic in data:
        title = topic["title"]
        for paragraph in topic["paragraphs"]:
            for qas in paragraph['qas']:
                columns['id'].append(qas['id'])
                columns['title'].append(title)
                columns['context'].append(paragraph["context"])
                columns['question'].append(qas["question"])
                columns['answers'].append(qas["answers"])

    n_samples = len(columns['id'])

    if shuffle:
        perms = np.random.permutation(n_samples)
    else:
        perms = list(range(n_samples))

    # Split point is loop-invariant: compute it once (the original
    # recomputed it for every column).
    n_train = n_samples - int(split * n_samples)

    train_ds = dict()
    valid_ds = dict()
    for name, values in columns.items():
        train_ds[name] = [values[i] for i in perms[:n_train]]
        valid_ds[name] = [values[i] for i in perms[n_train:]]

    raw_dataset = datasets.DatasetDict()
    raw_dataset['train'] = datasets.Dataset.from_dict(train_ds)
    raw_dataset['valid'] = datasets.Dataset.from_dict(valid_ds)

    return raw_dataset
|
|
|
|
|
def postprocess_qa_predictions(
        features,
        tokenizer,
        predictions: Tuple[np.ndarray, np.ndarray],
        n_best_size: int = 20,
        max_answer_length: int = 30
):
    """
    Post-processes the predictions of a question-answering model to convert them to answers that are substrings of the
    original contexts. This is the base postprocessing functions for models that only return start and end logits.

    Args:
        features: The processed dataset (see the main script for more information).
        tokenizer: The tokenizer to decode ids of the answer back to text
        predictions (:obj:`Tuple[np.ndarray, np.ndarray]`):
            The predictions of the model: two arrays containing the start logits and the end logits respectively. Its
            first dimension must match the number of elements of :obj:`features`.
        n_best_size (:obj:`int`, `optional`, defaults to 20):
            The total number of n-best predictions to generate when looking for an answer.
        max_answer_length (:obj:`int`, `optional`, defaults to 30):
            The maximum length of an answer that can be generated. This is needed because the start and end predictions
            are not conditioned on one another.

    Returns:
        An :obj:`collections.OrderedDict` mapping each feature's ``id`` to its
        predicted answer text ("" when the null prediction wins).

    Raises:
        ValueError: If ``predictions`` is not a 2-tuple, or its first dimension
            does not match ``len(features)``.
    """
    if len(predictions) != 2:
        raise ValueError("`predictions` should be a tuple with two elements (start_logits, end_logits).")
    if len(predictions[0]) != len(features):
        raise ValueError(f"Got {len(predictions[0])} predictions and {len(features)} features.")

    all_start_logits, all_end_logits = predictions

    all_predictions = collections.OrderedDict()

    for index, feature in enumerate(tqdm(features)):
        start_logits = all_start_logits[index]
        end_logits = all_end_logits[index]
        # Loop-invariant: hoisted out of the candidate double-loop below.
        n_tokens = len(feature['input_ids'])

        # Score of the "no answer" prediction. ids (1, 0) is an inverted
        # (start > end) span, so it is never decoded to text further down.
        # NOTE(review): the null score reads start_logits[1], not [0] --
        # presumably deliberate (paired with the (1, 0) sentinel); confirm.
        # The original guarded this with `min_null_prediction is None`, but
        # the variable was reset to None every iteration, making the guard
        # vacuous -- it is assigned unconditionally here.
        min_null_prediction = {
            "ids": (1, 0),
            "score": start_logits[1] + end_logits[0]
        }
        null_score = min_null_prediction["score"]

        # Indices of the n_best_size highest logits, best first.
        start_indexes = np.argsort(start_logits)[-1 : -n_best_size - 1 : -1].tolist()
        end_indexes = np.argsort(end_logits)[-1 : -n_best_size - 1 : -1].tolist()

        prelim_predictions = []
        for start_index in start_indexes:
            for end_index in end_indexes:
                # Skip indices that fall outside the input sequence.
                if start_index >= n_tokens or end_index >= n_tokens:
                    continue
                # Skip inverted spans and spans longer than max_answer_length.
                if end_index < start_index or end_index - start_index + 1 > max_answer_length:
                    continue
                prelim_predictions.append(
                    {
                        "ids": (start_index, end_index),
                        "score": start_logits[start_index] + end_logits[end_index]
                    }
                )

        prelim_predictions.append(min_null_prediction)

        # Keep only the n_best_size best-scoring candidates.
        # (Renamed from `predictions`, which shadowed the parameter.)
        nbest = sorted(prelim_predictions,
                       key=lambda x: x["score"],
                       reverse=True)[:n_best_size]

        # Re-add the null prediction if it failed to make the cut, so the
        # null-vs-answer comparison below always has it available.
        if not any(p["ids"] == (1, 0) for p in nbest):
            nbest.append(min_null_prediction)

        # Decode the best valid (start <= end) span back to text.
        best_non_null_pred = None
        for pred in nbest:
            l, r = pred.pop("ids")
            if l <= r:
                pred_input_ids = feature['input_ids'][l: r + 1]
                pred_tokens = tokenizer.convert_ids_to_tokens(pred_input_ids)
                pred["text"] = tokenizer.convert_tokens_to_string(pred_tokens)
                best_non_null_pred = pred
                break

        # Emit "" when no valid span exists or the null score wins.
        if best_non_null_pred is None or best_non_null_pred["score"] < null_score:
            all_predictions[feature["id"]] = ""
        else:
            all_predictions[feature["id"]] = best_non_null_pred["text"]

    return all_predictions
|
|
|
|
|
|