import argparse
import json

import numpy as np
import scipy.optimize
import sklearn.metrics
from nltk import word_tokenize
from nltk.translate.meteor_score import single_meteor_score
|
|
|
|
def pairwise_meteor(candidate, reference):
    # Single-reference METEOR between two strings. NLTK's implementation
    # expects pre-tokenized input and requires the "wordnet" corpus.
    return single_meteor_score(word_tokenize(reference), word_tokenize(candidate))
|
|
|
|
def compute_all_pairwise_scores(src_data, tgt_data, metric):
    # Build the |src| x |tgt| utility matrix consumed by the Hungarian
    # assignment below: entry (i, j) scores prediction i against reference j.
    scores = np.empty((len(src_data), len(tgt_data)))

    for i, src in enumerate(src_data):
        for j, tgt in enumerate(tgt_data):
            scores[i, j] = metric(src, tgt)

    return scores
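

# Sketch of the resulting matrix for 2 predicted vs. 3 reference strings
# (numbers are invented for illustration):
#
#                 ref 0  ref 1  ref 2
#   prediction 0  [0.12,  0.80,  0.05]
#   prediction 1  [0.33,  0.10,  0.41]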
|
|
|
|
def print_with_space(left, right, left_space=40):
    # Left-justify the label to a fixed column width so the values line up.
    print(left.ljust(left_space) + right)
|
|
|
|
| class AVeriTeCEvaluator: |
|
|
| verdicts = [ |
| "Supported", |
| "Refuted", |
| "Not Enough Evidence", |
| "Conflicting Evidence/Cherrypicking", |
| ] |
    pairwise_metric = None
    max_questions = 10
    metric = None
    # Evidence-quality (METEOR) thresholds at which veracity accuracy is
    # reported.
    averitec_reporting_levels = [0.1, 0.2, 0.25, 0.3, 0.4, 0.5]
|
|
    def __init__(self, metric="meteor"):
        self.metric = metric
        if metric == "meteor":
            self.pairwise_metric = pairwise_meteor
        else:
            # Only METEOR is implemented here; fail loudly rather than
            # crashing later with pairwise_metric still None.
            raise ValueError(f"Unsupported metric: {metric}")
|
|
| def evaluate_averitec_veracity_by_type(self, srcs, tgts, threshold=0.25): |
| types = {} |
| for src, tgt in zip(srcs, tgts): |
| score = self.compute_pairwise_evidence_score(src, tgt) |
|
|
            if score <= threshold:
                score = 0
            else:
                # Mirror evaluate_averitec_score: above the evidence-quality
                # threshold, score the example by label correctness.
                score = src["pred_label"] == tgt["label"]
|
|
| for t in tgt["claim_types"]: |
| if t not in types: |
| types[t] = [] |
|
|
| types[t].append(score) |
|
|
| return {t: np.mean(v) for t, v in types.items()} |
|
|
| def evaluate_averitec_score(self, srcs, tgts): |
| scores = [] |
| for src, tgt in zip(srcs, tgts): |
| score = self.compute_pairwise_evidence_score(src, tgt) |
|
|
            # At each reporting level, the example counts as correct only if
            # the evidence utility clears the threshold AND the label matches.
            this_example_scores = [0.0 for _ in self.averitec_reporting_levels]
            for i, level in enumerate(self.averitec_reporting_levels):
                if score > level:
                    this_example_scores[i] = src["pred_label"] == tgt["label"]
|
|
| scores.append(this_example_scores) |
|
|
| return np.mean(np.array(scores), axis=0) |
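
    # Worked toy example (hypothetical numbers): with reporting levels
    # [0.1, 0.2, 0.25, 0.3, 0.4, 0.5], an example with evidence score 0.27
    # and a correct label contributes [1, 1, 1, 0, 0, 0]; the returned vector
    # is the per-level mean over all examples.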
|
|
| def evaluate_veracity(self, src, tgt): |
| src_labels = [x["pred_label"] for x in src] |
| tgt_labels = [x["label"] for x in tgt] |
|
|
| acc = np.mean([s == t for s, t in zip(src_labels, tgt_labels)]) |
|
|
| f1 = { |
| self.verdicts[i]: x |
| for i, x in enumerate( |
| sklearn.metrics.f1_score( |
| tgt_labels, src_labels, labels=self.verdicts, average=None |
| ) |
| ) |
| } |
| f1["macro"] = sklearn.metrics.f1_score( |
| tgt_labels, src_labels, labels=self.verdicts, average="macro" |
| ) |
| f1["acc"] = acc |
| return f1 |
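
    # The returned dict maps each verdict to its per-class F1 and adds
    # "macro" and "acc", e.g. (hypothetical numbers):
    #   {"Supported": 0.71, "Refuted": 0.65, "Not Enough Evidence": 0.20,
    #    "Conflicting Evidence/Cherrypicking": 0.12, "macro": 0.42, "acc": 0.58}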
|
|
| def evaluate_questions_only(self, srcs, tgts): |
| all_utils = [] |
| for src, tgt in zip(srcs, tgts): |
| if "evidence" not in src: |
| |
| src_questions = self.extract_full_comparison_strings( |
| src, is_target=False |
| )[: self.max_questions] |
| else: |
| src_questions = [ |
| qa["question"] for qa in src["evidence"][: self.max_questions] |
| ] |
| tgt_questions = [qa["question"] for qa in tgt["questions"]] |
|
|
| pairwise_scores = compute_all_pairwise_scores( |
| src_questions, tgt_questions, self.pairwise_metric |
| ) |
|
|
            # Hungarian matching: the one-to-one alignment of predicted and
            # reference questions that maximises total METEOR utility.
            assignment = scipy.optimize.linear_sum_assignment(
                pairwise_scores, maximize=True
            )
|
|
| assignment_utility = pairwise_scores[assignment[0], assignment[1]].sum() |
|
|
| |
            # Normalise by the number of reference questions, so the utility
            # acts as a recall over the gold questions.
            reweight_term = 1 / float(len(tgt_questions))
            assignment_utility *= reweight_term
|
|
| all_utils.append(assignment_utility) |
|
|
| return np.mean(all_utils) |
|
|
| def get_n_best_qau(self, srcs, tgts, n=3): |
| all_utils = [] |
| for src, tgt in zip(srcs, tgts): |
| assignment_utility = self.compute_pairwise_evidence_score(src, tgt) |
|
|
| all_utils.append(assignment_utility) |
|
|
        # Indices of the n examples with the highest evidence utility.
        idxs = np.argsort(all_utils)[::-1][:n]
|
|
| examples = [ |
| ( |
| ( |
| srcs[i]["questions"] |
| if "questions" in srcs[i] |
| else srcs[i]["string_evidence"] |
| ), |
| tgts[i]["questions"], |
| all_utils[i], |
| ) |
| for i in idxs |
| ] |
|
|
| return examples |
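
    # Returns up to n (predicted questions or string evidence, gold questions,
    # utility) triples sorted by utility, which is handy for qualitative
    # inspection of the best-matched examples.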
|
|
| def compute_pairwise_evidence_score(self, src, tgt): |
| """Different key is used for reference_data and prediction. |
| For the prediction, the format is |
| {"evidence": [ |
| { |
| "question": "What does the increased federal medical assistance percentage mean for you?", |
| "answer": "Appendix A: Applicability of the Increased Federal Medical Assistance Percentage ", |
| "url": "https://www.medicaid.gov/federal-policy-guidance/downloads/smd21003.pdf" |
| }], |
| "pred_label": "Supported"} |
| |
| And for the data with fold label: |
| {"questions": [ |
| { |
| "question": "Where was the claim first published", |
| "answers": [ |
| { |
| "answer": "It was first published on Sccopertino", |
| "answer_type": "Abstractive", |
| "source_url": "https://web.archive.org/web/20201129141238/https://scoopertino.com/exposed-the-imac-disaster-that-almost-was/", |
| "source_medium": "Web text", |
| "cached_source_url": "https://web.archive.org/web/20201129141238/https://scoopertino.com/exposed-the-imac-disaster-that-almost-was/" |
| } |
| ] |
| }] |
| "label": "Refuted"} |
| """ |
|
|
| src_strings = self.extract_full_comparison_strings(src, is_target=False)[ |
| : self.max_questions |
| ] |
| tgt_strings = self.extract_full_comparison_strings(tgt) |
| pairwise_scores = compute_all_pairwise_scores( |
| src_strings, tgt_strings, self.pairwise_metric |
| ) |
| assignment = scipy.optimize.linear_sum_assignment( |
| pairwise_scores, maximize=True |
| ) |
| assignment_utility = pairwise_scores[assignment[0], assignment[1]].sum() |
|
|
| |
        # Normalise by the number of reference strings (recall-style penalty).
        reweight_term = 1 / float(len(tgt_strings))
        assignment_utility *= reweight_term
| return assignment_utility |
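
    # Minimal sketch of the two input formats (values are hypothetical):
    #
    #   scorer = AVeriTeCEvaluator()
    #   pred = {"evidence": [{"question": "Who said it?", "answer": "Alice."}],
    #           "pred_label": "Supported"}
    #   gold = {"questions": [{"question": "Who said it?",
    #                          "answers": [{"answer": "Alice."}]}],
    #           "label": "Supported"}
    #   scorer.compute_pairwise_evidence_score(pred, gold)  # close to 1.0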
|
|
| def evaluate_questions_and_answers(self, srcs, tgts): |
| all_utils = [] |
| for src, tgt in zip(srcs, tgts): |
| src_strings = self.extract_full_comparison_strings(src, is_target=False)[ |
| : self.max_questions |
| ] |
| tgt_strings = self.extract_full_comparison_strings(tgt) |
|
|
| pairwise_scores = compute_all_pairwise_scores( |
| src_strings, tgt_strings, self.pairwise_metric |
| ) |
|
|
            # Hungarian matching over the combined question+answer strings.
            assignment = scipy.optimize.linear_sum_assignment(
                pairwise_scores, maximize=True
            )
|
|
| assignment_utility = pairwise_scores[assignment[0], assignment[1]].sum() |
|
|
| |
            # Normalise by the number of reference strings.
            reweight_term = 1 / float(len(tgt_strings))
            assignment_utility *= reweight_term
|
|
| all_utils.append(assignment_utility) |
|
|
| return np.mean(all_utils) |
|
|
| def extract_full_comparison_strings(self, example, is_target=True): |
| example_strings = [] |
|
|
| if is_target: |
| if "questions" in example: |
| for evidence in example["questions"]: |
| |
                    # Wrap a lone answer dict in a list so both target
                    # formats are handled uniformly.
                    if not isinstance(evidence["answers"], list):
                        evidence["answers"] = [evidence["answers"]]
|
|
| for answer in evidence["answers"]: |
| example_strings.append( |
| evidence["question"] + " " + answer["answer"] |
| ) |
| if ( |
| "answer_type" in answer |
| and answer["answer_type"] == "Boolean" |
| ): |
| example_strings[-1] += ". " + answer["boolean_explanation"] |
| if len(evidence["answers"]) == 0: |
| example_strings.append( |
| evidence["question"] + " No answer could be found." |
| ) |
| else: |
| if "evidence" in example: |
| for evidence in example["evidence"]: |
| example_strings.append( |
| evidence["question"] + " " + evidence["answer"] |
| ) |
|
|
| if "string_evidence" in example: |
| for full_string_evidence in example["string_evidence"]: |
| example_strings.append(full_string_evidence) |
| return example_strings |
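
    # For the gold example in the docstring above, this yields strings like
    # "Where was the claim first published It was first published on
    # Sccopertino"; Boolean answers additionally append their
    # boolean_explanation.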
|
|
|
|
| if __name__ == "__main__": |
| parser = argparse.ArgumentParser(description="Evaluate the veracity prediction.") |
    parser.add_argument(
        "-i",
        "--prediction_file",
        default="data_store/dev_veracity.json",
        help="JSON file with claim, evidence, and veracity prediction.",
    )
    parser.add_argument(
        "--label_file",
        default="data/dev.json",
        help="JSON file with gold labels.",
    )
| args = parser.parse_args() |
|
|
| with open(args.prediction_file) as f: |
| predictions = json.load(f) |
|
|
| with open(args.label_file) as f: |
| references = json.load(f) |
|
|
| scorer = AVeriTeCEvaluator() |
| q_score = scorer.evaluate_questions_only(predictions, references) |
| print_with_space("Question-only score (HU-" + scorer.metric + "):", str(q_score)) |
| p_score = scorer.evaluate_questions_and_answers(predictions, references) |
| print_with_space("Question-answer score (HU-" + scorer.metric + "):", str(p_score)) |
| print("====================") |
|
|
| v_score = scorer.evaluate_veracity(predictions, references) |
| print("Veracity F1 scores:") |
| for k, v in v_score.items(): |
| print_with_space(" * " + k + ":", str(v)) |
|
|
| print("--------------------") |
| print("AVeriTeC scores:") |
|
|
| v_score = scorer.evaluate_averitec_score(predictions, references) |
|
|
| for i, level in enumerate(scorer.averitec_reporting_levels): |
| print_with_space( |
| " * Veracity scores (" + scorer.metric + " @ " + str(level) + "):", |
| str(v_score[i]), |
| ) |
| print("--------------------") |
| type_scores = scorer.evaluate_averitec_veracity_by_type( |
| predictions, references, threshold=0.2 |
| ) |
| for t, v in type_scores.items(): |
| print_with_space(" * Veracity scores (" + t + "):", str(v)) |
| print("--------------------") |
| type_scores = scorer.evaluate_averitec_veracity_by_type( |
| predictions, references, threshold=0.3 |
| ) |
| for t, v in type_scores.items(): |
| print_with_space(" * Veracity scores (" + t + "):", str(v)) |
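
# Example invocation (the script filename is hypothetical):
#   python averitec_eval.py -i data_store/dev_veracity.json \
#       --label_file data/dev.json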
|
|