import math
import sys
import warnings
from collections import Counter
from fractions import Fraction

import nltk
import numpy as np
from nltk.translate.bleu_score import (
    SmoothingFunction,
    brevity_penalty,
    closest_ref_length,
    modified_precision,
)


def corpus_bleu(
    list_of_references,
    hypotheses,
    weights=(0.25, 0.25, 0.25, 0.25),
    smoothing_function=None,
    auto_reweigh=False,
    averaging_mode="geometric",
    no_length_penalty=False,
):
""" |
|
|
Calculate a single corpus-level BLEU score (aka. system-level BLEU) for all |
|
|
the hypotheses and their respective references. |
|
|
|
|
|
Instead of averaging the sentence level BLEU scores (i.e. marco-average |
|
|
precision), the original BLEU metric (Papineni et al. 2002) accounts for |
|
|
the micro-average precision (i.e. summing the numerators and denominators |
|
|
for each hypothesis-reference(s) pairs before the division). |
|
|
|
|
|
>>> hyp1 = ['It', 'is', 'a', 'guide', 'to', 'action', 'which', |
|
|
... 'ensures', 'that', 'the', 'military', 'always', |
|
|
... 'obeys', 'the', 'commands', 'of', 'the', 'party'] |
|
|
>>> ref1a = ['It', 'is', 'a', 'guide', 'to', 'action', 'that', |
|
|
... 'ensures', 'that', 'the', 'military', 'will', 'forever', |
|
|
... 'heed', 'Party', 'commands'] |
|
|
>>> ref1b = ['It', 'is', 'the', 'guiding', 'principle', 'which', |
|
|
... 'guarantees', 'the', 'military', 'forces', 'always', |
|
|
... 'being', 'under', 'the', 'command', 'of', 'the', 'Party'] |
|
|
>>> ref1c = ['It', 'is', 'the', 'practical', 'guide', 'for', 'the', |
|
|
... 'army', 'always', 'to', 'heed', 'the', 'directions', |
|
|
... 'of', 'the', 'party'] |
|
|
|
|
|
>>> hyp2 = ['he', 'read', 'the', 'book', 'because', 'he', 'was', |
|
|
... 'interested', 'in', 'world', 'history'] |
|
|
>>> ref2a = ['he', 'was', 'interested', 'in', 'world', 'history', |
|
|
... 'because', 'he', 'read', 'the', 'book'] |
|
|
|
|
|
>>> list_of_references = [[ref1a, ref1b, ref1c], [ref2a]] |
|
|
>>> hypotheses = [hyp1, hyp2] |
|
|
>>> corpus_bleu(list_of_references, hypotheses) # doctest: +ELLIPSIS |
|
|
0.5920... |
|
|
|
|
|
The example below show that corpus_bleu() is different from averaging |
|
|
sentence_bleu() for hypotheses |
|
|
|
|
|
>>> score1 = sentence_bleu([ref1a, ref1b, ref1c], hyp1) |
|
|
>>> score2 = sentence_bleu([ref2a], hyp2) |
|
|
>>> (score1 + score2) / 2 # doctest: +ELLIPSIS |
|
|
0.6223... |
|
|
|
|
|
:param list_of_references: a corpus of lists of reference |
|
|
sentences, w.r.t. hypotheses |
|
|
:type list_of_references: list(list(list(str))) |
|
|
:param hypotheses: a list of hypothesis sentences |
|
|
:type hypotheses: list(list(str)) |
|
|
:param weights: weights for unigrams, bigrams, trigrams and so on |
|
|
:type weights: list(float) |
|
|
:param smoothing_function: |
|
|
:type smoothing_function: SmoothingFunction |
|
|
:param auto_reweigh: Option to re-normalize the weights uniformly. |
|
|
:type auto_reweigh: bool |
|
|
:return: The corpus-level BLEU score. |
|
|
:rtype: float |
|
|
""" |
    p_numerators = Counter()  # key = ngram order, value = no. of ngram matches
    p_denominators = Counter()  # key = ngram order, value = total ngrams in hypotheses
    hyp_lengths, ref_lengths = 0, 0

    assert len(list_of_references) == len(hypotheses), (
        "The number of hypotheses and their reference(s) should be the same"
    )

    # Iterate through each hypothesis and its corresponding references.
    for references, hypothesis in zip(list_of_references, hypotheses):
        # For each order of ngram, calculate the numerator and
        # denominator for the corpus-level modified precision.
        for i, _ in enumerate(weights, start=1):
            p_i = modified_precision(references, hypothesis, i)
            p_numerators[i] += p_i.numerator
            p_denominators[i] += p_i.denominator

        # Accumulate the hypothesis length and the closest reference length
        # for the corpus-level brevity penalty.
        hyp_len = len(hypothesis)
        hyp_lengths += hyp_len
        ref_lengths += closest_ref_length(references, hyp_len)

    # Calculate the corpus-level brevity penalty.
    if no_length_penalty and averaging_mode == "geometric":
        bp = 1.0
    elif no_length_penalty and averaging_mode == "arithmetic":
        bp = 0.0
    else:
        assert not no_length_penalty
        assert (
            averaging_mode != "arithmetic"
        ), "Not sure how to apply the length penalty in arithmetic mode"
        bp = brevity_penalty(ref_lengths, hyp_lengths)
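    # For reference, NLTK's brevity_penalty(ref_len, hyp_len) returns 1 when
    # the hypotheses are longer than the closest references, 0 for empty
    # hypotheses, and exp(1 - ref_len / hyp_len) otherwise.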

    if auto_reweigh:
        if hyp_lengths < 4 and weights == (0.25, 0.25, 0.25, 0.25):
            weights = (1 / hyp_lengths,) * hyp_lengths
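            # (As in NLTK: a corpus with fewer than 4 tokens has no 4-grams,
            # so the default weights are spread uniformly over the n-gram
            # orders that can actually occur.)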

    # Collect the various precision values for the different ngram orders.
    p_n = [
        Fraction(p_numerators[i], p_denominators[i], _normalize=False)
        for i, _ in enumerate(weights, start=1)
    ]
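
    # `_normalize=False` keeps the raw numerator/denominator (including a zero
    # denominator) for the smoothing functions. Note that this private keyword
    # of fractions.Fraction was removed in Python 3.12; recent NLTK versions
    # ship their own Fraction wrapper for this reason.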

    # Return 0 if there are no matching n-grams. Checking p_numerators[1] == 0
    # suffices: if there are no matching unigrams, there cannot be any matching
    # higher-order n-grams either.
    if p_numerators[1] == 0:
        return 0

    # If no smoothing function is supplied, default to method0 (no smoothing).
    if not smoothing_function:
        smoothing_function = SmoothingFunction().method0
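    # method0 performs no smoothing; when a higher-order precision is zero,
    # NLTK emits a warning and substitutes sys.float_info.min so that the
    # log-space geometric mean yields an effectively-zero score instead of
    # raising on math.log(0).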

    # Smooth the modified precisions. Note: smoothing_function() may convert
    # values into floats; it tries to retain the Fraction objects as far as
    # the smoothing method allows.
    p_n = smoothing_function(
        p_n, references=references, hypothesis=hypothesis, hyp_len=hyp_lengths
    )

    if averaging_mode == "geometric":
        s = (w_i * math.log(p_i) for w_i, p_i in zip(weights, p_n))
        s = bp * math.exp(math.fsum(s))
    elif averaging_mode == "arithmetic":
        # No brevity penalty here: the assertions above only allow the
        # arithmetic mode together with no_length_penalty=True.
        s = (w_i * p_i for w_i, p_i in zip(weights, p_n))
        s = math.fsum(s)

    return s


def sentence_bleu(
    references,
    hypothesis,
    weights=(0.25, 0.25, 0.25, 0.25),
    smoothing_function=None,
    auto_reweigh=False,
    averaging_mode="geometric",
    no_length_penalty=False,
):
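    """
    Sentence-level BLEU: a thin wrapper that calls corpus_bleu() with a single
    hypothesis and its reference set.
    """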
    return corpus_bleu(
        [references],
        [hypothesis],
        weights,
        smoothing_function,
        auto_reweigh,
        averaging_mode,
        no_length_penalty,
    )


def get_target_sequences(manifest, ground_truth, to_take=1000):
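    """
    Pick the `to_take` shortest sequences from `ground_truth` and return the
    manifest row indices that correspond to them.

    Assumed input formats (inferred from the parsing below): `ground_truth` is
    a JSON file mapping sequence name -> (length, ...) with lengths >= 6.0;
    `manifest` has a header line, and rows whose first column is a path shaped
    like <dir>/<sequence_name>__<suffix>.
    """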
|
|
import json |
|
|
import pathlib |
|
|
|
|
|
with open(ground_truth, "r") as fin: |
|
|
original_continuations = json.loads(fin.read()) |
|
|
|
|
|
sequence2length = [(k, v[0]) for k, v in original_continuations.items()] |
|
|
assert all(float(v) >= 6.0 for (_, v) in sequence2length) |
|
|
|
|
|
sequence2length.sort(key=lambda x: x[1]) |
|
|
to_take_sequences = set(v[0] for v in sequence2length[:to_take]) |
|
|
to_take_ids = [] |
|
|
|
|
|
with open(manifest, "r") as f: |
|
|
f.readline() |
|
|
|
|
|
for i, line in enumerate(f.readlines()): |
|
|
seq_id = line.split()[0] |
|
|
seq_id = pathlib.Path(seq_id).name.split("__")[0] |
|
|
|
|
|
if seq_id in to_take_sequences: |
|
|
to_take_ids.append(i) |
|
|
|
|
|
print(f"Took {len(to_take_ids)} ids") |
|
|
return set(to_take_ids) |


def get_self_bleu(utterances, averaging_mode, weights):
    self_bleu = []

    for i in range(len(utterances)):
        # Leave-one-out: the current utterance is the hypothesis and all the
        # remaining utterances serve as its references.
        hypo = utterances[i]
        rest = utterances[:i] + utterances[i + 1 :]

        self_bleu.append(
            sentence_bleu(
                rest,
                hypo,
                weights,
                no_length_penalty=True,
                averaging_mode=averaging_mode,
            )
        )

    return self_bleu
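

# Example with toy data (illustrative only): self-BLEU2 scores each of three
# tokenized utterances against the other two as references, e.g.
#   get_self_bleu([["a", "b", "c"], ["a", "b", "d"], ["x", "y", "z"]],
#                 averaging_mode="arithmetic", weights=(0.5, 0.5))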


def get_self_bleu2_arithmetic(utterances):
    weights = (0.5, 0.5)
    return get_self_bleu(utterances, averaging_mode="arithmetic", weights=weights)


def get_self_bleu2_geometric(utterances):
    weights = (0.5, 0.5)
    return get_self_bleu(utterances, averaging_mode="geometric", weights=weights)


def get_auto_bleu2_arithmetic(utterances):
    weights = (0.5, 0.5)
    return [auto_bleu(u, mean_mode="arithmetic", weights=weights) for u in utterances]


def get_auto_bleu2_geometric(utterances):
    weights = (0.5, 0.5)
    return [auto_bleu(u, mean_mode="geometric", weights=weights) for u in utterances]


def get_auto_bleu3_geometric(utterances):
    weights = (1.0 / 3, 1.0 / 3, 1.0 / 3)
    return [auto_bleu(u, mean_mode="geometric", weights=weights) for u in utterances]


def get_auto_bleu3_arithmetic(utterances):
    weights = (1.0 / 3, 1.0 / 3, 1.0 / 3)
    return [auto_bleu(u, mean_mode="arithmetic", weights=weights) for u in utterances]


def get_self_bleu3_arithmetic(utterances):
    weights = (1.0 / 3, 1.0 / 3, 1.0 / 3)
    return get_self_bleu(utterances, averaging_mode="arithmetic", weights=weights)


def get_self_bleu3_geometric(utterances):
    weights = (1.0 / 3, 1.0 / 3, 1.0 / 3)
    return get_self_bleu(utterances, averaging_mode="geometric", weights=weights)


def auto_bleu(sentence, weights, mean_mode="arithmetic"):
    if len(sentence) <= 1:
        return 0

    N = len(weights)

    bleu_n = np.zeros([N])
    for n in range(N):
        targ_ngrams = list(nltk.ngrams(sentence, n + 1))
        for p in range(len(targ_ngrams)):
            # Leave the target n-gram itself out and check whether it occurs
            # anywhere else in the sentence.
            left = sentence[:p]
            right = sentence[(p + n + 1) :]
            rest_ngrams = list(nltk.ngrams(left, n + 1)) + list(
                nltk.ngrams(right, n + 1)
            )

            bleu_n[n] += targ_ngrams[p] in rest_ngrams
        bleu_n[n] /= len(targ_ngrams)

    weights = np.array(weights)
    if mean_mode == "arithmetic":
        return (bleu_n * weights).sum()
    elif mean_mode == "geometric":
        return (bleu_n**weights).prod()
    else:
        raise ValueError(f"Unknown aggregation mode {mean_mode}")
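

# Worked example for auto_bleu (illustrative): for ["a", "b", "a", "b"] with
# weights (0.5, 0.5), every unigram reoccurs elsewhere (bleu_1 = 1.0) and 2 of
# the 3 bigrams do (bleu_2 = 2/3), so the arithmetic mean is
# 0.5 * 1.0 + 0.5 * 2/3 ≈ 0.833 and the geometric mean is
# (1.0 ** 0.5) * ((2/3) ** 0.5) ≈ 0.816.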


def run_f(task_params):
    f, terms = task_params
    return f(terms)
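
# Assumed usage (not shown in this section): run_f lets a multiprocessing.Pool
# map over heterogeneous (metric_function, utterances) pairs with one picklable
# entry point, e.g. pool.imap(run_f, [(get_self_bleu2_geometric, utts), ...]).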