import numpy as np
import torch
from torch.nn.utils.rnn import pad_sequence
from sentence_transformers import SentenceTransformer, CrossEncoder
from sentence_transformers.util import pytorch_cos_sim
from transformers import AutoTokenizer, AutoModelForMaskedLM, AutoModelForCausalLM
from nltk import word_tokenize
from collections import defaultdict
from pprint import pprint
from rouge_score import rouge_scorer


ROUGE_TYPES = ["rouge1", "rouge2", "rougeL"]
# Module-level scorer instance; the uppercase name avoids shadowing the
# imported rouge_scorer module.
ROUGE_SCORER = rouge_scorer.RougeScorer(
    ROUGE_TYPES,
    use_stemmer=True
)


def load_rewards(args):
    """Instantiate every reward class listed in args.rewards and wrap the
    result in a RewardAggregator."""
    rewards, names = [], []
    for name, settings in args.rewards.items():
        settings["device"] = args.device
        print("Loading reward:", name)
        pprint(settings)
        print()
        # Resolve the reward class by name from this module's namespace.
        reward_cls = globals()[name]
        reward_func = reward_cls(**settings)
        rewards.append(reward_func)
        names.append(name)
    return RewardAggregator(rewards, names)
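

# Illustrative sketch (not part of the original code): a minimal stand-in
# for the args object that load_rewards expects. SimpleNamespace and the
# chosen settings here are assumptions for demonstration only.
def _example_load_rewards():
    from types import SimpleNamespace
    args = SimpleNamespace(
        device="cpu",
        rewards={"GaussianCR": {"mean": 0.45, "std": 0.3, "weight": 1}},
    )
    return load_rewards(args)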


class RewardAggregator:
    """Combines several reward functions into one weighted-mean reward."""

    def __init__(self, reward_generators, reward_names):
        self.reward_generators = reward_generators
        self.reward_names = reward_names
        self.weights = [rg.weight for rg in reward_generators]
        self.n_rewards = len(reward_generators)

    def __call__(self, sources, summaries):
        # Score every summary with each individual reward function.
        name_to_scores = {}
        for rg, name in zip(self.reward_generators, self.reward_names):
            scores = rg(sources=sources, summaries=summaries)
            name_to_scores[name] = scores
        # Aggregate the per-reward scores into a weighted mean per summary.
        final_scores = []
        for i in range(len(summaries)):
            score = 0.
            total_weights = 0.
            for name, w in zip(self.reward_names, self.weights):
                score += name_to_scores[name][i] * w
                total_weights += w
            score /= total_weights
            final_scores.append(score)
        return final_scores, name_to_scores
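

# Illustrative sketch (not part of the original code): aggregating two
# model-free rewards so the example runs without downloading checkpoints.
# Assumes the nltk punkt tokenizer data is available for word_tokenize.
def _example_reward_aggregator():
    agg = RewardAggregator(
        [GaussianCR(mean=0.45, std=0.3, weight=2), NoDaysReward(weight=1)],
        ["GaussianCR", "NoDaysReward"],
    )
    return agg(
        sources=["the quick brown fox jumps over the lazy dog"],
        summaries=["the fox jumps"],
    )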


class Fluency:
    """Language-model fluency reward: the mean logit of the summary's own
    tokens under a masked or causal LM, normalized into a fixed range."""

    def __init__(
        self,
        model_id="distilroberta-base",
        weight=1,
        type="masked",
        device="cuda",
        norm="max",
        max_score=40.,
        min_score=-30.,
    ):
        tokenizer = AutoTokenizer.from_pretrained(model_id)
        if type == "masked":
            pad_token_id = tokenizer.pad_token_id
            model = AutoModelForMaskedLM.from_pretrained(model_id).to(device)
        else:
            pad_token_id = tokenizer.eos_token_id
            model = AutoModelForCausalLM.from_pretrained(model_id).to(device)

        self.model = model
        self.tokenizer = tokenizer
        self.weight = weight
        self.device = device
        self.max_score = max_score
        self.min_score = min_score
        self.pad_token_id = pad_token_id
        self.norm = norm
        assert self.norm in ("max", "minmax")

    def ids_to_tokens(self, ids):
        return self.tokenizer.convert_ids_to_tokens(ids)

    def __call__(self, sources=None, summaries=None, normalize_len=False):
        # Guard against empty strings, which some tokenizers cannot encode.
        summaries = [s if s != "" else " " for s in summaries]
        input_ids = [self.tokenizer.encode(text) for text in summaries]
        input_ids = [torch.tensor(ids) for ids in input_ids]
        input_ids = pad_sequence(
            input_ids,
            batch_first=True,
            padding_value=self.pad_token_id
        ).to(self.device)
        with torch.no_grad():
            output = self.model(input_ids=input_ids, labels=input_ids)
        logits = output["logits"]

        scores = []
        for i in range(logits.size(0)):
            i_scores = []
            for j in range(logits.size(1)):
                tok_idx = input_ids[i, j]
                if tok_idx == self.pad_token_id:
                    break
                # Raw logit assigned to the observed token at this position.
                score = logits[i, j, tok_idx].item()
                i_scores.append(score)
            mean_score = np.mean(i_scores)
            if self.norm == "max":
                # Normalize by a fixed upper bound.
                i_score = mean_score / self.max_score
            else:
                # Min-max normalization into [0, 1].
                i_score = (mean_score - self.min_score) / (self.max_score - self.min_score)
            scores.append(i_score)
        return scores
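

# Illustrative usage sketch (not part of the original code): "cpu" and the
# default masked-LM settings are assumptions chosen for portability. Scores
# are comparable only across summaries scored with the same model.
def _example_fluency():
    fluency = Fluency(model_id="distilroberta-base", device="cpu")
    return fluency(summaries=["The cat sat on the mat."])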


class BiEncoderSimilarity:
    """Cosine similarity between source and summary sentence embeddings."""

    def __init__(
        self,
        model_id="all-distilroberta-v1",
        device="cuda",
        weight=1
    ):
        self.model = SentenceTransformer(model_id).to(device)
        self.weight = weight

    def __call__(self, sources=None, summaries=None):
        src_embs = self.model.encode(sources)
        sum_embs = self.model.encode(summaries)
        scores = []
        for i in range(len(summaries)):
            # Cosine similarity between the i-th source/summary pair.
            score = pytorch_cos_sim(
                src_embs[i].reshape(1, -1),
                sum_embs[i].reshape(1, -1),
            )[0, 0].item()
            scores.append(score)
        return scores
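

# Illustrative sketch (not part of the original code): "cpu" is an
# assumption for portability. Scores lie in [-1, 1] and are typically
# close to 1 for faithful summaries.
def _example_bi_encoder():
    sim = BiEncoderSimilarity(device="cpu")
    return sim(
        sources=["The committee approved the annual budget on Friday."],
        summaries=["Committee approves budget."],
    )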


class CrossEncoderSimilarity:
    """Similarity predicted by a cross-encoder over (source, summary) pairs."""

    def __init__(
        self,
        model_id="cross-encoder/stsb-distilroberta-base",
        device="cuda",
        weight=1
    ):
        # A cross-encoder checkpoint with a trained scoring head is needed
        # here; a plain bi-encoder such as "all-distilroberta-v1" would be
        # loaded with a randomly initialized head.
        self.model = CrossEncoder(model_id, device=device)
        self.weight = weight

    def __call__(self, sources=None, summaries=None):
        scores = self.model.predict([
            (src, summ) for src, summ in zip(sources, summaries)
        ])
        return scores.tolist()
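

# Illustrative sketch (not part of the original code): with an STS
# cross-encoder such as "cross-encoder/stsb-distilroberta-base", predict
# returns one relatedness score per pair ("cpu" is an assumption).
def _example_cross_encoder():
    sim = CrossEncoderSimilarity(device="cpu")
    return sim(
        sources=["The committee approved the annual budget on Friday."],
        summaries=["Committee approves budget."],
    )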


class SelectedTokenSimilarity:
    """Token-level similarity: each summary token is compared with the
    best-matching identical token in the source."""

    def __init__(
        self,
        model_id="all-distilroberta-v1",
        device="cuda",
        weight=1
    ):
        self.model = SentenceTransformer(model_id).to(device)
        self.weight = weight
        self.tokenizer = self.model.tokenizer

    def ids_to_tokens(self, ids):
        return self.tokenizer.convert_ids_to_tokens(ids)

    def align_tokens(self, src, summary):
        # Map each summary token position to the positions of identical
        # tokens in the source (None if the token never occurs there).
        src_ids, sum_ids = self.tokenizer(
            [src, summary],
            truncation=True,
            max_length=self.model.max_seq_length,
        ).input_ids
        src_tokens = self.ids_to_tokens(src_ids)
        sum_tokens = self.ids_to_tokens(sum_ids)
        sum_to_src = defaultdict(list)
        for i, sum_tok in enumerate(sum_tokens):
            for j, src_tok in enumerate(src_tokens):
                if sum_tok == src_tok:
                    sum_to_src[i].append(j)
            if len(sum_to_src[i]) == 0:
                sum_to_src[i] = None
        return sum_to_src

    def compute_score(self, x_sum, x_src, sum_to_src):
        # S[i, j] = cosine similarity of summary token i and source token j.
        S = pytorch_cos_sim(x_sum, x_src).cpu().numpy()
        scores = []
        for i, J in sum_to_src.items():
            if J is None:
                i_score = 0.
            else:
                # Keep the best-matching source occurrence.
                i_scores = [S[i, j] for j in J]
                i_score = max(i_scores)
            scores.append(i_score)
        return np.mean(scores)

    def __call__(self, sources=None, summaries=None):
        src_embs = self.model.encode(sources, output_value="token_embeddings")
        sum_embs = self.model.encode(summaries, output_value="token_embeddings")
        scores = []
        for i in range(len(summaries)):
            x_src = src_embs[i]
            x_sum = sum_embs[i]
            sum_to_src = self.align_tokens(sources[i], summaries[i])
            score = self.compute_score(x_sum, x_src, sum_to_src)
            scores.append(score)
        return scores
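

# Illustrative sketch (not part of the original code): rewards summaries
# whose tokens reappear in the source with similar contextual embeddings
# ("cpu" is an assumption for portability).
def _example_selected_token_similarity():
    sim = SelectedTokenSimilarity(device="cpu")
    return sim(
        sources=["The committee approved the annual budget."],
        summaries=["committee approved budget"],
    )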


class NLIReward:
    """Rewards summaries that are entailed by their source, according to a
    natural language inference (NLI) cross-encoder."""

    def __init__(
        self,
        model_id="cross-encoder/nli-distilroberta-base",
        device="cuda",
        weight=1
    ):
        # device must be passed as a keyword argument; the second
        # positional parameter of CrossEncoder is num_labels.
        self.model = CrossEncoder(model_id, device=device)
        # Label order used by the cross-encoder/nli-* checkpoints.
        self.label_mapping = ['contradiction', 'entailment', 'neutral']
        self.weight = weight

    def __call__(self, sources=None, summaries=None):
        scores = self.model.predict([
            (src, summ) for src, summ in zip(sources, summaries)
        ])
        probs = torch.softmax(torch.tensor(scores), dim=1)
        # The reward is the entailment probability (index 1).
        rewards = [probs[i, 1].item() for i in range(len(summaries))]
        # Empty summaries receive zero reward.
        rewards = [
            (0 if summaries[i].strip() == "" else r)
            for i, r in enumerate(rewards)
        ]
        return rewards
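

# Illustrative sketch (not part of the original code): the premise is the
# source and the hypothesis is the summary ("cpu" is an assumption).
def _example_nli_reward():
    nli = NLIReward(device="cpu")
    return nli(
        sources=["A man is eating food at a diner."],
        summaries=["A man is eating."],
    )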


class GaussianLength:
    """Rewards summary lengths close to a target mean, using a Gaussian
    over token counts."""

    def __init__(self, mean=11, std=0.3, max_len=100, weight=1, device=None):
        self.weight = weight
        # Precompute the reward for every length from 0 to max_len,
        # normalized so the peak (at the mean) is 1.
        lens = np.arange(0, max_len + 1)
        scores = gaussian(lens, mean, std)
        scores /= scores.max()
        self.len_to_reward = dict((l, scores[l]) for l in lens)
        self.max_len = max_len

    def __call__(self, sources=None, summaries=None):
        lens = [len(word_tokenize(s)) for s in summaries]
        # Lengths beyond max_len fall outside the table and get 0.
        scores = [
            self.len_to_reward[l] if l <= self.max_len else 0.
            for l in lens
        ]
        return scores
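

# Illustrative sketch (not part of the original code): with mean=3 and
# std=1, a 3-token summary gets the peak reward of 1.0 while a 5-token
# one gets exp(-2) ≈ 0.135. Assumes nltk punkt data is available.
def _example_gaussian_length():
    reward = GaussianLength(mean=3, std=1)
    return reward(summaries=["three word summary",
                             "a much longer example summary"])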


class GaussianCR:
    """Rewards compression ratios (summary length / source length) close to
    a target mean, using a Gaussian centered on that mean."""

    def __init__(self, mean=0.45, std=0.3, weight=1, device=None):
        self.weight = weight
        # Precompute rewards for ratios in [0, 1] at a 0.01 resolution,
        # rounding keys the same way they are rounded at lookup time.
        ratios = np.arange(0, 1.01, 0.01)
        scores = gaussian(ratios, mean, std)
        scores /= scores.max()
        self.ratio_to_reward = dict(
            (round(r, 2), s) for r, s in zip(ratios, scores)
        )

    def __call__(self, sources=None, summaries=None):
        source_lens = [len(word_tokenize(s)) for s in sources]
        summary_lens = [len(word_tokenize(s)) for s in summaries]

        ratios = [round(x / y, 2) for x, y in zip(summary_lens, source_lens)]
        # Cap ratios at 1 so over-long summaries stay within the table.
        ratios = [min(1., x) for x in ratios]

        return [
            self.ratio_to_reward[round(ratio, 2)]
            for ratio in ratios
        ]
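

# Illustrative sketch (not part of the original code): a 9-token source
# compressed to 3 tokens has ratio 0.33, close to the default mean of
# 0.45, so the reward is near 1. Assumes nltk punkt data is available.
def _example_gaussian_cr():
    reward = GaussianCR(mean=0.45, std=0.3)
    return reward(
        sources=["the quick brown fox jumps over the lazy dog"],
        summaries=["the fox jumps"],
    )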


class NoDaysReward:
    """Penalizes summaries that mention weekdays or relative day
    expressions, which rarely make sense out of context."""

    def __init__(self, weight=1, device=None):
        self.day_words = [
            "monday", "tuesday", "wednesday",
            "thursday", "friday", "saturday", "sunday",
            "today", "tomorrow", "yesterday", "tonight"
        ]
        self.weight = weight

    def __call__(self, sources=None, summaries=None):
        scores = []
        for s in summaries:
            s = s.lower()
            # Simple substring check: any day word yields zero reward.
            if any(w in s for w in self.day_words):
                score = 0.
            else:
                score = 1.
            scores.append(score)
        return scores
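

# Illustrative sketch (not part of the original code):
# _example_no_days_reward() returns [1.0, 0.0].
def _example_no_days_reward():
    reward = NoDaysReward()
    return reward(summaries=["Budget approved.",
                             "Budget approved on Friday."])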


def gaussian(x, mu, sig):
    """Unnormalized Gaussian, equal to 1 at x == mu."""
    return np.exp(-np.power(x - mu, 2.) / (2 * np.power(sig, 2.)))
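

# Worked example: gaussian(0.45, 0.45, 0.3) == 1.0, and one standard
# deviation away, gaussian(0.75, 0.45, 0.3) == exp(-0.5) ≈ 0.607.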


class RougeReward:
    """ROUGE F1 against reference summaries. Unlike the other rewards, this
    one needs targets, which must be assigned before calling."""

    def __init__(self, rouge_type="rougeL", weight=1, device=None):
        self.rouge_type = rouge_type
        self.weight = weight
        self.targets = None

    def __call__(self, sources=None, summaries=None):
        assert self.targets is not None, "RougeReward.targets must be set"
        scores = []
        for pred, tgt in zip(summaries, self.targets):
            rouge_scores = ROUGE_SCORER.score(tgt, pred)
            score = rouge_scores[self.rouge_type].fmeasure
            scores.append(score)
        return scores
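

# Illustrative sketch (not part of the original code): targets are set
# externally, then summaries are scored against them with ROUGE-L F1.
def _example_rouge_reward():
    reward = RougeReward(rouge_type="rougeL")
    reward.targets = ["committee approves budget"]
    return reward(summaries=["the committee approved the budget"])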