AlignScoreCS / AlignScoreCS.py
krotima1
Add AlignScore.py class of transformer model - easy to use
6885f5c
raw
history blame
37.9 kB
import transformers
from transformers import PretrainedConfig
import os
from pathlib import Path
import numpy as np
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union
import torch.nn as nn
import torch
# This include should be add when using different AlignScoreFunction methods instead of score()
# from nltk.tokenize import sent_tokenize
from tqdm import tqdm
class AlignScoreCS(transformers.XLMRobertaModel):
"""
ALIGNSCORE class
Description:
Model ALIGNSCORECS has been trained according the paper for 3 days on 4GPUs AMD NVIDIA.
(3 epochs, 1e-5 learning rate, 1e-6 AdamWeps, batchsize 32, WarmupRatio 0.06, 0.1 WeighDecay)
- XLMROBERTA-base model with 3 classification HEAD {regression,binary,3way} using shared encoder
USAGE: AlignScore.py
- from_pretrained - loads the model, usage as transformers.model
- .score(context, claim) - function
- returns probs of the ALIGNED class using 3way class head as in the paper.
alignScoreCS = AlignScoreCS.from_pretrained("/mnt/data/factcheck/AlignScore-data/AAmodel/MTLModel/mo
alignScoreCS.score(context,claim)
If you want to try different classification head use parameter:
- task_name = "re" : regression head
- task_name = "bin" : binary classification head
- task_name = "3way" : 3way classification head
"""
_regression_model = "re_model"
_binary_class_model = "bin_model"
_3way_class_model = "3way_model"
def __init__(self, encoder, taskmodels_dict, model_name= "xlm-roberta-large", **kwargs):
super().__init__(transformers.XLMRobertaConfig(), **kwargs)
self.encoder = encoder
self.taskmodels_dict = nn.ModuleDict(taskmodels_dict)
self.tokenizer = None
self.model_name = model_name
self.inferencer = None
def init_inferencer(self, device = "cuda"):
self.tokenizer = transformers.AutoTokenizer.from_pretrained(self.model_name) if not self.tokenizer else self.tokenizer
self.inferencer = self.InferenceHandler(self, self.tokenizer, device)
"""
Score: scores the context and claim with Aligned probabitlity of 3way classification head
- using paper code inferencer from ALignScore
"""
def score(self, context, claim, **kwargs):
if self.inferencer is None:
self.init_inferencer()
scores = self.inferencer.nlg_eval(context, claim)
return scores
"""
Score: scores the context and claim with ALIGNED probability (wrt task_name ["re" | "bin" | "3way"])
Returns the probability of the ALIGNED CLASS between context text and claim text
- chunks text by 350 tokens and splits claim into sentences
- using 3way classification head
"""
def score_sentences(self, context :str, claim :str, task_name = "3way", batch_size = 2, return_all_outputs = False, **kwargs):
self.tokenizer = transformers.AutoTokenizer.from_pretrained(self.model_name) if not self.tokenizer else self.tokenizer
chunked_inputs = self.chunk_sent_input(context,claim, chunk_size=350,chunk_claim_size=150)
nclaims, ncontexts = (chunked_inputs["n_claims"],chunked_inputs["n_contexts"])
with torch.no_grad():
chunked_inputs = {key : torch.tensor(item).to(self.device) for key, item in chunked_inputs.items() if not key.startswith("n_")}
chunked_outputs = {}
for i in range(0,len(chunked_inputs["input_ids"]),batch_size):
tmp = self.forward(task_name = task_name,**{"input_ids":chunked_inputs["input_ids"][i:i+batch_size],"attention_mask" :chunked_inputs["attention_mask"][i:i+batch_size]}, **kwargs)
for k, item in tmp.items():
chunked_outputs[k] = chunked_outputs.get(k, []) + [item]
logits = torch.vstack(chunked_outputs["logits"]).cpu()
outputs = {"score" : self.alignscore_input(logits,nclaims=nclaims,ncontexts=ncontexts, task_name=task_name)}
outputs["outputs"] = chunked_outputs
return torch.tensor([outputs["score"]]) if not return_all_outputs else outputs
"""
Score: scores the context and claim with ALIGNED probability (wrt task_name ["re" | "bin" | "3way"])
Returns the probability of the ALIGNED CLASS between context text and claim text
- chunks text into 350 tolens and chunks claim into 150 tokens
- using 3way classification head
"""
def score_chunks(self, context :str, claim :str, task_name = "3way", batch_size = 2, return_all_outputs = False, **kwargs):
self.tokenizer = transformers.AutoTokenizer.from_pretrained(self.model_name) if not self.tokenizer else self.tokenizer
chunked_inputs = self.chunk_inputs(context,claim, chunk_size=350)
chunked_inputs = {key : torch.tensor(item).to(self.device) for key, item in chunked_inputs.items()}
chunked_outputs = self.forward(task_name = task_name, **chunked_inputs, **kwargs)
outputs = {"score" : self.alignscore_input_deprecated(chunked_outputs.logits.cpu(), task_name=task_name)}
outputs["outputs"] = chunked_outputs
return outputs["score"] if not return_all_outputs else outputs
"""
Classify: classify the context and claim to the class label given the task_name ["re" | "bin" | "3way"]
Returns the class of {Neutral, contradict, aligned} between context text and claim text
- using 3way classification head
"""
def classify(self, context :str, claim :str, task_name = "3way", return_all_outputs = False, **kwargs):
self.tokenizer = transformers.AutoTokenizer.from_pretrained(self.model_name) if not self.tokenizer else self.tokenizer
chunked_inputs = self.chunk_inputs(context,claim, chunk_size=350)
chunked_inputs = {key : torch.tensor(item).to(self.device) for key, item in chunked_inputs.items()}
chunked_outputs = self.forward(task_name = task_name, **chunked_inputs, **kwargs)
outputs = {"class" : self.get_system_label(chunked_outputs.logits.cpu(), task_name=task_name)}
outputs["outputs"] = chunked_outputs
return outputs["class"] if not return_all_outputs else outputs
def score_truncated(self, context :str, claim :str, task_name = "3way", return_all_outputs = False, **kwargs):
self.tokenizer = transformers.AutoTokenizer.from_pretrained(self.model_name) if not self.tokenizer else self.tokenizer
tokenized_inputs = self.tokenizer(list(zip([context], [claim])), padding = "max_length", truncation = True, max_length = 512, return_tensors="pt")
tokenized_inputs = {key : torch.tensor(item).to(self.device) for key, item in tokenized_inputs.items()}
with torch.no_grad():
model_outputs = self.forward(task_name=task_name, **tokenized_inputs, **kwargs)
outputs = {"score" : self.alignscore_input(model_outputs["logits"].cpu(),nclaims=1, ncontexts=1, task_name=task_name)}
outputs["outputs"] = model_outputs
return torch.tensor([outputs["score"]]) if not return_all_outputs else outputs
def forward(self, task_name = "3way", **kwargs):
return self.taskmodels_dict[task_name](**kwargs)
def __call__(self, task_name, **kwargs):
return self.taskmodels_dict[task_name](**kwargs)
"""
Get the probability of the ALIGNED label from input
"""
def alignscore_input(self, chunked_logits, nclaims, ncontexts, task_name = "3way"):
if task_name == "re":
ouptuts = chunked_logits.detach()
# Reshape the tensor to separate each block of n rows
reshaped_tensor = ouptuts.view(nclaims, ncontexts)
# Extract the maximum values from the first column (index 0) within each block of n rows
max_values, _ = reshaped_tensor.max(dim=1)
# Calculate the mean of the max values for each block of n rows
mean_of_maxes = torch.mean(max_values, dim=0)
return mean_of_maxes.tolist()
else:
nlabels = {"3way" : 3, "re" : 1, "2way" : 2}[task_name]
ouptuts = chunked_logits.softmax(1).detach()
# Reshape the tensor to separate each block of n rows
reshaped_tensor = ouptuts.view(nclaims, ncontexts, nlabels)
# Extract the maximum values from the first column (index 0) within each block of n rows
max_values, _ = torch.max(reshaped_tensor[:, :, 1], dim=1)
# Calculate the mean of the max values for each block of n rows
mean_of_maxes = torch.mean(max_values, dim=0)
return mean_of_maxes.tolist()
def alignscore_input_deprecated(self, chunked_logits, task_name = "3way"):
if task_name == "re":
return chunked_logits.detach().amax(0).tolist()
else:
return chunked_logits.softmax(1).detach()[:, 1].amax(0).tolist() # return max probability over the ALIGNED class
"""
get the label from the input
"""
def get_system_label(self, chunked_logits, task_name):
if task_name == "re":
return (chunked_logits.sum(0) / chunked_logits.size()[0]).detach().tolist()
else:
avg_probs = chunked_logits.softmax(1).sum(0) / chunked_logits.size()[0]
numpy_array = chunked_logits.softmax(1).argmax(1).detach().numpy()
# Calculate the frequencies of each value
unique_values, counts = np.unique(numpy_array, return_counts=True)
# Find the maximum count
max_count = np.max(counts)
# Find all values with the maximum count
most_frequent_values = unique_values[counts == max_count]
return most_frequent_values[0] if most_frequent_values.size == 1 else avg_probs.detach().argmax().tolist()
"""
Chunks input context and claim - context is chunked into 350 tokens
- claim is chunked into sentences
- using stride for overflowing tokens
"""
def chunk_sent_input(self, context, claim, max_length = 512, chunk_size = 350, chunk_claim_size = 150):
assert chunk_size <= max_length, "Chunk size {} cannot be greater than max size {}".format(chunk_size, chunk_claim_size, max_length)
chunk_claim_size = max_length - chunk_size if chunk_claim_size is None else chunk_claim_size
assert chunk_size + chunk_claim_size <= max_length, "Chunk size {} and Chunk claim size {} cannot be together greater than max size {}".format(chunk_size, chunk_claim_size, max_length)
return_chunked_inputs = {}
context_chunks = self.chunk_text(context, chunk_size=chunk_size, overflowing_tokens_stride = 25, first_special_token=[0])
claim_chunks = self.chunk_sentences(claim, chunk_size=chunk_claim_size,overflowing_tokens_stride=int(chunk_claim_size/3), first_special_token=[2])
for claim_chunk in claim_chunks:
for context_chunk in context_chunks:
inputs,attention =self.fill_with_pad_tokens(context_chunk,claim_chunk )
return_chunked_inputs["input_ids"] = return_chunked_inputs.get("input_ids",[]) + [inputs]
return_chunked_inputs["attention_mask"] = return_chunked_inputs.get("attention_mask",[]) + [attention]
return_chunked_inputs["n_claims"] = len(claim_chunks)
return_chunked_inputs["n_contexts"] = len(context_chunks)
return return_chunked_inputs
"""
According to paper - chunk the text into smaller parts (350tokens + claim_tokens) when the tokenized inputs exceed the max_length
returns chunked input
"""
def chunk_inputs(self, context, claim, max_length = 512, chunk_size = 512, first_fit_within_max_length = True):
assert chunk_size <= max_length, "Chunk size {} cannot be greater than max size {}".format(chunk_size, max_length)
tokenized_claim = self.tokenizer(claim, return_length=True)
tokenized_claim["input_ids"][0] = 2 # </s> token according to pair tokenization where the separator of the context and claim is </s></s>
tokenized_context = self.tokenizer(context, return_length = True)
assert tokenized_claim["length"][0] < max_length*4/5, "Create chunks of claim sentences. Claim is too long {} which is more than 4/5 from {}.".format(tokenized_claim["length"][0], max_length)
# set chunk size to incorporate the claim size as well
chunk_size = min(max_length, chunk_size + tokenized_claim["length"][0])
first_check_max_size = max_length if first_fit_within_max_length else chunk_size
if tokenized_claim["length"][0] + tokenized_context["length"][0] <= first_check_max_size: #if it fits within max_length
input_ids, attention_mask = self.fill_with_pad_tokens(tokenized_context["input_ids"],tokenized_claim["input_ids"])
return {"input_ids" : [input_ids], "attention_mask" : [attention_mask]}
else: # make chunks
return_chunked_inputs = {}
current_chunk = {}
for sentence in sent_tokenize(context, language="czech"):
tok_sent = self.tokenizer(sentence, return_length=True)
if len(current_chunk.get("input_ids",[0])) + tok_sent["length"][0] - 1 + tokenized_claim["length"][0] <= chunk_size:
current_chunk["input_ids"] = current_chunk.get("input_ids",[0]) + tok_sent["input_ids"][1:-1]
else:
return_chunked_inputs = self._update_chunked_inputs(tokenized_claim, current_chunk, return_chunked_inputs, max_length, tok_sent)
current_chunk["input_ids"] = [0] + tok_sent["input_ids"][1:-1]
if current_chunk != {}: # add the rest
return_chunked_inputs = self._update_chunked_inputs(tokenized_claim, current_chunk, return_chunked_inputs, max_length)
current_chunk = {}
return return_chunked_inputs
"""
Chunks input context and claim - context is chunked into 350 tokens
- claim is chunked into 150 tokens
- using stride for overflowing tokens
"""
def chunk_input_deprecated(self, context, claim, max_length = 512, chunk_size = 350, chunk_claim_size = 150):
assert chunk_size <= max_length, "Chunk size {} cannot be greater than max size {}".format(chunk_size, chunk_claim_size, max_length)
chunk_claim_size = max_length - chunk_size if chunk_claim_size is None else chunk_claim_size
assert chunk_size + chunk_claim_size <= max_length, "Chunk size {} and Chunk claim size {} cannot be together greater than max size {}".format(chunk_size, chunk_claim_size, max_length)
return_chunked_inputs = {}
context_chunks = self.chunk_text(context, chunk_size=chunk_size, overflowing_tokens_stride = 25, first_special_token=[0])
claim_chunks = self.chunk_text(claim, chunk_size=chunk_claim_size,overflowing_tokens_stride=int(chunk_claim_size/3), first_special_token=[2])
for claim_chunk in claim_chunks:
for context_chunk in context_chunks:
inputs,attention =self.fill_with_pad_tokens(context_chunk,claim_chunk )
return_chunked_inputs["input_ids"] = return_chunked_inputs.get("input_ids",[]) + [inputs]
return_chunked_inputs["attention_mask"] = return_chunked_inputs.get("attention_mask",[]) + [attention]
return_chunked_inputs["n_claims"] = len(claim_chunks)
return_chunked_inputs["n_contexts"] = len(context_chunks)
return return_chunked_inputs
"""
Chunk texts into blocks of chunk_size tokens
"""
def chunk_text(self, text, chunk_size = 350, overflowing_tokens_stride = 25, language="czech", first_special_token = [0]):
sentences = sent_tokenize(text, language=language)
tokenized = self.tokenizer(sentences if sentences != [] else [""], return_length=True)
chunks = []
chunk, current_chunk_size = ([], 0)
for i, length in enumerate(tokenized["length"]):
# WRAP THE TOKENIZED SENTNECE INTO LIST TO HANDLE OVERFLOWING TOKENS EASILY
# Case when length of one sentence is longer than the chunk size - split the sentence into chunks of chunk size
if length > chunk_size:
splits = [first_special_token + tokenized["input_ids"][i][max(1,cs):min(cs + chunk_size - 2, length - 1)] + [2] for cs in range(0, length , chunk_size-(2+overflowing_tokens_stride))]
# Case when lenght of sequence is equal or smaller than the chunk size - only continue
else:
splits = [first_special_token + tokenized["input_ids"][i][1:]]
# Go through sentence or splits of sentence
for subsentence in splits:
up_length = len(subsentence) - 2
# Case when the current chunk = 0
if current_chunk_size == 0:
current_chunk_size = up_length + 2 # First include <s> and </s> tokens
chunk = subsentence[:-1]
# Case when the current chunk + length of new subsentence <= chunk_size - only add
elif current_chunk_size + up_length <= chunk_size:
current_chunk_size += up_length
chunk += subsentence[1:-1]
# Case when the current chunk + length of new subsentence > chunk_size - create chunk
else:
chunks += [chunk + [2]]
current_chunk_size = up_length + 2 # First include <s> and </s> tokens
chunk = subsentence[:-1]
#Case when the loop ended but the current chunk isnt saved in the chunks
if chunk != []:
chunks += [chunk + [2]]
# lengths = [len(ch) for ch in chunks]
# print("Lenght in tokens of ",len(lengths)," chunks (AVG=",np.mean(lengths),",MAX=",np.max(lengths),",MIN=", np.min(lengths),")")
return chunks
"""
Chunks text into sentences using nlt.sent_tokenize
"""
def chunk_sentences(self, text, chunk_size, overflowing_tokens_stride = 0, language="czech", sentence_window = 2, first_special_token = [2]):
sentences = sent_tokenize(text, language=language)
tokenized = self.tokenizer(sentences if sentences != [] else [""], return_length=True)
chunks = []
current_chunk = []
for i, length in enumerate(tokenized["length"]):
# WRAP THE TOKENIZED SENTNECE INTO LIST TO HANDLE OVERFLOWING TOKENS EASILY
# Case when length of one sentence is longer than the chunk size - split the sentence into chunks of chunk size
if length > chunk_size:
splits = [first_special_token + tokenized["input_ids"][i][max(1,cs):min(cs + chunk_size - 2, length - 1)] + [2] for cs in range(0, length , chunk_size-(2+overflowing_tokens_stride))]
# Case when lenght of sequence is equal or smaller than the chunk size - only continue
else:
splits = [first_special_token + tokenized["input_ids"][i][1:]]
#Go through sentence or parts of sentence and create chunks
for split in splits:
chunks += [split]
# if len(current_chunk) == sentence_window:
# chunks += [first_special_token + [item for row in current_chunk for item in row] + [2]]
# current_chunk = current_chunk[1:] + [split[1:-1]]
# else:
# current_chunk += [split[1:-1]]
# if chunks == []:
# chunks += [first_special_token + [item for row in current_chunk for item in row] + [2]]
return chunks
"""
join context and claim tokens as input_ids and create attention_mask
"""
def fill_with_pad_tokens(self, first, second, max_length=512, pad_token = 1):
return first + second + [pad_token]*max(max_length-len(first)-len(second),0), [1]*(len(first)+len(second)) + [0]*max(max_length-len(first)-len(second),0)
def _update_chunked_inputs(self, tokenized_claim, current_chunk, return_chunked_inputs, max_length, tok_sent = {"input_ids" : []}):
# truncate if there is a long sentence (rare occurrences)
if len(current_chunk.get("input_ids",[0])) + tokenized_claim["length"][0] >= max_length:
chunk = current_chunk["input_ids"].copy()[:max_length-tokenized_claim["length"][0]-1] + [2]
elif not current_chunk.get("input_ids",False):
chunk = tok_sent["input_ids"][: max_length - tokenized_claim["length"][0] -1] + [2]
else:
chunk = current_chunk["input_ids"].copy() + [2] # add </s> end of sentence
claim_ids = tokenized_claim["input_ids"].copy()
inputs, attention = self.fill_with_pad_tokens(chunk,claim_ids )
return_chunked_inputs["input_ids"] = return_chunked_inputs.get("input_ids",[]) + [inputs]
return_chunked_inputs["attention_mask"] = return_chunked_inputs.get("attention_mask",[]) + [attention]
return return_chunked_inputs
@classmethod
def get_encoder_attr_name(cls, model):
"""
The encoder transformer is named differently in each model "architecture".
This method lets us get the name of the encoder attribute
"""
model_class_name = model.__class__.__name__
if model_class_name.startswith("XLMRoberta"):
return "roberta"
else:
raise KeyError(f"Add support for new model {model_class_name}")
@classmethod
def from_pretrained(
cls,
pretrained_model_name_or_path: Optional[Union[str, os.PathLike]],
model_name : str = "xlm-roberta-large",
*model_args,
config: Optional[Union[PretrainedConfig, str, os.PathLike]] = None,
cache_dir: Optional[Union[str, os.PathLike]] = None,
ignore_mismatched_sizes: bool = False,
force_download: bool = False,
local_files_only: bool = False,
token: Optional[Union[str, bool]] = None,
revision: str = "main",
use_safetensors: bool = None,
**kwargs,
):
# Check if the required model directories exist then load it from file
if all(os.path.exists(os.path.join(pretrained_model_name_or_path, model_dir)) for model_dir in [cls._3way_class_model, cls._regression_model, cls._binary_class_model]):
# assert all(
# for model_dir in [cls._3way_class_model, cls._regression_model, cls._binary_class_model]
# ), "Error: Required model directories not found!"
# Disable the warning about newly initialized weights
transformers.logging.set_verbosity_error()
shared_encoder = None
taskmodels_dict = {}
for path_name in [cls._regression_model, cls._binary_class_model, cls._3way_class_model]:
task_name = path_name.split("_")[0]
# Load the configuration for the task-specific model
task_config = transformers.XLMRobertaConfig.from_json_file("{}/{}/config.json".format(pretrained_model_name_or_path,path_name))
# Create the task-specific model
model = transformers.XLMRobertaForSequenceClassification.from_pretrained(model_name, config=task_config,*model_args,**kwargs)
# Load the weights for the task-specific model
model.load_state_dict(torch.load("{}/{}/pytorch_model.bin".format(pretrained_model_name_or_path,path_name), map_location=torch.device('cuda' if torch.cuda.is_available() else 'cpu')))
# Set the shared encoder to the model's encoder
if shared_encoder is None:
shared_encoder = getattr(model, AlignScoreCS.get_encoder_attr_name(model))
else:
setattr(model, AlignScoreCS.get_encoder_attr_name(model), shared_encoder)
taskmodels_dict[task_name] = model
# Create the AlignScoreCS with the shared encoder and loaded task-specific models
alignScoreCS = AlignScoreCS(encoder=shared_encoder, taskmodels_dict=taskmodels_dict, model_name=model_name)
#Try load the model from huggingface hub
else:
shared_encoder = None
taskmodels_dict = {}
for model_dir in [cls._regression_model, cls._binary_class_model, cls._3way_class_model]:
task_name = model_dir.split("_")[0]
config = transformers.XLMRobertaConfig.from_pretrained(f"{pretrained_model_name_or_path}", subfolder=model_dir)
model = transformers.XLMRobertaForSequenceClassification.from_pretrained(f"{pretrained_model_name_or_path}",config=config, subfolder=model_dir)
if shared_encoder is None:
shared_encoder = getattr(model, AlignScoreCS.get_encoder_attr_name(model))
else:
setattr(model, AlignScoreCS.get_encoder_attr_name(model), shared_encoder)
taskmodels_dict[task_name] = model
alignScoreCS = AlignScoreCS(encoder=shared_encoder, taskmodels_dict=taskmodels_dict, model_name=model_name)
return alignScoreCS
def save_pretrained(
self,
save_directory: Union[str, os.PathLike],
is_main_process: bool = True,
state_dict: Optional[dict] = None,
save_function: Callable = torch.save,
push_to_hub: bool = False,
max_shard_size: Union[int, str] = "10GB",
safe_serialization: bool = False,
variant: Optional[str] = None,
token: Optional[Union[str, bool]] = None,
save_peft_format: bool = True,
**kwargs,
):
for task_name, model_type in self.taskmodels_dict.items():
model_type.save_pretrained(save_directory = Path(save_directory,task_name+"_model"),
is_main_process = is_main_process,
state_dict = state_dict,
save_function = save_function,
push_to_hub = push_to_hub,
max_shard_size = max_shard_size,
safe_serialization = safe_serialization,
variant = variant,
token = token,
save_peft_format = save_peft_format,
**kwargs)
# This piece of code is copied from AlignScore github repository
# if you want to use different nlg_eval_mode you have to fix errors on your own
class InferenceHandler:
def __init__(self, model, tokenizer, device = "cuda"):
self.model = model
self.device = device
self.tokenizer = tokenizer
self.model.to(self.device)
self.model.eval()
self.batch_size = 32
self.nlg_eval_mode = "nli_sp"
self.verbose = False
self.task_name = "3way"
self.softmax = nn.Softmax(dim=-1)
def nlg_eval(self, premise, hypo):
if isinstance(premise, str) and isinstance(hypo, str):
premise = [premise]
hypo = [hypo]
return self.inference_example_batch(premise, hypo)
def inference_example_batch(self, premise: list, hypo: list):
"""
inference a example,
premise: list
hypo: list
using self.inference to batch the process
SummaC Style aggregation
"""
self.disable_progress_bar_in_inference = True
assert len(premise) == len(hypo), "Premise must has the same length with Hypothesis!"
out_score = []
for one_pre, one_hypo in tqdm(zip(premise, hypo), desc="Evaluating", total=len(premise), disable=(not self.verbose)):
out_score.append(self.inference_per_example(one_pre, one_hypo))
return torch.tensor(out_score)
def inference_per_example(self, premise:str, hypo: str):
"""
inference a example,
premise: string
hypo: string
using self.inference to batch the process
"""
def chunks(lst, n):
"""Yield successive n-sized chunks from lst."""
for i in range(0, len(lst), n):
yield ' '.join(lst[i:i + n])
premise_sents = sent_tokenize(premise)
premise_sents = premise_sents or ['']
n_chunk = len(premise.strip().split()) // 350 + 1
n_chunk = max(len(premise_sents) // n_chunk, 1)
premise_sents = [each for each in chunks(premise_sents, n_chunk)]
hypo_sents = sent_tokenize(hypo)
premise_sent_mat = []
hypo_sents_mat = []
for i in range(len(premise_sents)):
for j in range(len(hypo_sents)):
premise_sent_mat.append(premise_sents[i])
hypo_sents_mat.append(hypo_sents[j])
if self.nlg_eval_mode is not None:
if self.nlg_eval_mode == 'nli_sp':
output_score = self.inference(premise_sent_mat, hypo_sents_mat)[:,1] ### use NLI head OR ALIGN head
output_score = output_score.view(len(premise_sents), len(hypo_sents)).max(dim=0).values.mean().item() ### sum or mean depends on the task/aspect
return output_score
output_score = self.inference(premise_sent_mat, hypo_sents_mat) ### use NLI head OR ALIGN head
output_score = output_score.view(len(premise_sents), len(hypo_sents)).max(dim=0).values.mean().item() ### sum or mean depends on the task/aspect
return output_score
def inference(self, premise, hypo, task_name = None):
"""
inference a list of premise and hypo
Standard aggregation
"""
task_name = self.task_name if task_name is None else task_name
if isinstance(premise, str) and isinstance(hypo, str):
premise = [premise]
hypo = [hypo]
batch = self.batch_tokenize(premise, hypo)
output_score = []
for mini_batch in tqdm(batch, desc="Evaluating", disable=not self.verbose or self.disable_progress_bar_in_inference):
mini_batch = mini_batch.to(self.device)
with torch.no_grad():
model_output = self.model.forward(task_name=task_name, **mini_batch)
model_output = model_output.logits
if task_name == "re":
model_output = model_output.cpu()
else:
model_output = self.softmax(model_output).cpu()
output_score.append(model_output[:,:])
output_score = torch.cat(output_score)
if self.nlg_eval_mode is not None:
if self.nlg_eval_mode == 'nli':
output_score_nli = output_score[:,1]
return output_score_nli
elif self.nlg_eval_mode == 'bin':
return output_score
elif self.nlg_eval_mode == 'reg':
return output_score
else:
ValueError("unrecognized nlg eval mode")
return output_score
def batch_tokenize(self, premise, hypo):
"""
input premise and hypos are lists
"""
assert isinstance(premise, list) and isinstance(hypo, list)
assert len(premise) == len(hypo), "premise and hypo should be in the same length."
batch = []
for mini_batch_pre, mini_batch_hypo in zip(self.chunks(premise, self.batch_size), self.chunks(hypo, self.batch_size)):
try:
mini_batch = self.tokenizer(mini_batch_pre, mini_batch_hypo, truncation='only_first', padding='max_length', max_length=self.tokenizer.model_max_length, return_tensors='pt')
except:
print('text_b too long...')
mini_batch = self.tokenizer(mini_batch_pre, mini_batch_hypo, truncation=True, padding='max_length', max_length=self.tokenizer.model_max_length, return_tensors='pt')
batch.append(mini_batch)
return batch
def chunks(self, lst, n):
"""Yield successive n-sized chunks from lst."""
for i in range(0, len(lst), n):
yield lst[i:i + n]
if __name__ == "__main__":
alignScore = AlignScoreCS.from_pretrained("krotima1/AlignScoreCS")
alignScore.to("cuda" if torch.cuda.is_available() else "cpu")
print("Tomáš miluje Zuzku!", "|", "Tomáš miluje Petru!",alignScore.score("Tomáš miluje Zuzku!", "Tomáš miluje Petru."))
print("Tomáš miluje Zuzku!", "|", "Tomáš miluje Zuzku!",alignScore.score("Tomáš miluje Zuzku!", "Tomáš miluje Zuzku!"))
print("Tomáš miluje Zuzku.", "|", "Zuzka miluje Tomáše.",alignScore.score("Tomáš miluje Zuzku!", "Zuzka miluje Tomáše."))
print("Tomáš miluje Zuzku.", "|", "Zuzka nemiluje Tomáše.",alignScore.score("Tomáš miluje Zuzku!", "Zuzka nemiluje Tomáše."))
print("Tomáš miluje Zuzku.", "|", "Tomáš nemiluje Zuzku.",alignScore.score("Tomáš miluje Zuzku!", "Tomáš nemiluje Zuzku."))
print("Dva chlapi se perou.", "|", "Je tu bitka.",alignScore.score("Dva chlapi se perou.", "Je tu bitka."))
print("Dva chlapi se perou.", "|", "Je tu láska.",alignScore.score("Dva chlapi se perou.", "Je tu láska."))
print("Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta. \n Kdo nechal vystavět katedrálu?", "|", "Byl to Karel.",alignScore.score("Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta.\nKdo nechal vystavět katedrálu?", "Byl to Karel."))
print("Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta. \n Kdo nechal vystavět katedrálu?", "|", "Byl to Vít.",alignScore.score("Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta.\nKdo nechal vystavět katedrálu?", "Byl to Vít."))
print("Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta. \n Kdo nechal vystavět katedrálu?", "|", "Byla to katedrála.",alignScore.score("Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta.\nKdo nechal vystavět katedrálu?", "Byla to katedrála."))
print("Kdo je Karel IV.? Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta.", "|", "Je Otec.",alignScore.score("Kdo je Karel IV.? Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta.", "Je Otec."))
print("Kdo je Karel IV.? Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta.", "|", "Je Otec vlasti.",alignScore.score("Kdo je Karel IV.? Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta.", "Je Otec vlasti."))
print("Kdo je Karel IV.? Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta.", "|", "Je katedrála svatého Víta.",alignScore.score("Kdo je Karel IV.? Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta.", "Je katedrála svatého Víta."))
print("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "|", "Karkulka utekla vklovi.",alignScore.score("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "Karkulka utekla vklovi."))
print("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "|", "Karkulka neutekla vklovi.",alignScore.score("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "Karkulka neutekla vklovi."))
print("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "|", "Vlk snědl Karkulku.",alignScore.score("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "Vlk snědl karkulku."))
print("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "|", "Vlk nesnědl Karkulku.",alignScore.score("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "Vlk nesnědl karkulku."))
print("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "|", "Karkulka snědla vlka.",alignScore.score("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "Karkulka snědla vlka."))
print("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "|", "Karkulka dala vlkovi jablko.",alignScore.score("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "Karkulka dala vlkovi jablko."))