import os
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union

import numpy as np
import torch
import torch.nn as nn
import transformers
from transformers import PretrainedConfig
# Required by score() (via InferenceHandler.inference_per_example) and by all
# chunking helpers; without it every scoring path raises NameError.
from nltk.tokenize import sent_tokenize
from tqdm import tqdm


class AlignScoreCS(transformers.XLMRobertaModel):
    """AlignScoreCS: multi-head XLM-RoBERTa model for factual-consistency scoring.

    The model was trained per the AlignScore paper for 3 days on 4 GPUs
    (3 epochs, lr 1e-5, AdamW eps 1e-6, batch size 32, warmup ratio 0.06,
    weight decay 0.1). It is an XLM-RoBERTa encoder shared by three
    classification heads: regression ("re"), binary ("bin") and 3-way ("3way").

    Usage::

        alignScoreCS = AlignScoreCS.from_pretrained("krotima1/AlignScoreCS")
        alignScoreCS.score(context, claim)

    ``score`` returns the probability of the ALIGNED class using the 3-way
    head, as in the paper. Other methods accept ``task_name``:

    - ``task_name="re"``   : regression head
    - ``task_name="bin"``  : binary classification head
    - ``task_name="3way"`` : 3-way classification head
    """

    # Sub-directory names holding the per-task model weights/configs.
    _regression_model = "re_model"
    _binary_class_model = "bin_model"
    _3way_class_model = "3way_model"

    def __init__(self, encoder, taskmodels_dict, model_name="xlm-roberta-large", **kwargs):
        """Wrap a shared encoder and a dict of task-specific heads.

        :param encoder: the shared XLM-RoBERTa encoder module
        :param taskmodels_dict: mapping task name -> sequence-classification model
        :param model_name: HF hub name used to (re)load the tokenizer
        """
        super().__init__(transformers.XLMRobertaConfig(), **kwargs)
        self.encoder = encoder
        self.taskmodels_dict = nn.ModuleDict(taskmodels_dict)
        self.tokenizer = None  # loaded lazily on first use
        self.model_name = model_name
        self.inferencer = None  # created lazily by init_inferencer()

    def init_inferencer(self, device="cuda"):
        """Lazily build the tokenizer and the paper-style inference handler."""
        self.tokenizer = transformers.AutoTokenizer.from_pretrained(self.model_name) if not self.tokenizer else self.tokenizer
        self.inferencer = self.InferenceHandler(self, self.tokenizer, device)

    def score(self, context, claim, **kwargs):
        """Score context/claim with the ALIGNED probability of the 3-way head.

        Uses the inference code ported from the original AlignScore repository.
        Accepts a single string pair or two equal-length lists of strings.
        Returns a 1-D tensor of scores.
        """
        if self.inferencer is None:
            self.init_inferencer()
        scores = self.inferencer.nlg_eval(context, claim)
        return scores

    def score_sentences(self, context: str, claim: str, task_name="3way", batch_size=2, return_all_outputs=False, **kwargs):
        """Score with the ALIGNED probability of the head selected by ``task_name``.

        The context is chunked into 350-token blocks and the claim is split
        into sentences; the final score is mean-over-claims of max-over-context
        chunks, as in the paper.

        :param task_name: "re" | "bin" | "3way"
        :param batch_size: mini-batch size for the forward passes
        :param return_all_outputs: if True return a dict with raw outputs too
        """
        self.tokenizer = transformers.AutoTokenizer.from_pretrained(self.model_name) if not self.tokenizer else self.tokenizer
        chunked_inputs = self.chunk_sent_input(context, claim, chunk_size=350, chunk_claim_size=150)
        nclaims, ncontexts = (chunked_inputs["n_claims"], chunked_inputs["n_contexts"])
        with torch.no_grad():
            # Drop the bookkeeping "n_*" entries; tensorize the rest.
            chunked_inputs = {key: torch.tensor(item).to(self.device)
                              for key, item in chunked_inputs.items() if not key.startswith("n_")}
            chunked_outputs = {}
            for i in range(0, len(chunked_inputs["input_ids"]), batch_size):
                tmp = self.forward(task_name=task_name,
                                   **{"input_ids": chunked_inputs["input_ids"][i:i + batch_size],
                                      "attention_mask": chunked_inputs["attention_mask"][i:i + batch_size]},
                                   **kwargs)
                for k, item in tmp.items():
                    chunked_outputs[k] = chunked_outputs.get(k, []) + [item]
            logits = torch.vstack(chunked_outputs["logits"]).cpu()
        outputs = {"score": self.alignscore_input(logits, nclaims=nclaims, ncontexts=ncontexts, task_name=task_name)}
        outputs["outputs"] = chunked_outputs
        return torch.tensor([outputs["score"]]) if not return_all_outputs else outputs

    def score_chunks(self, context: str, claim: str, task_name="3way", batch_size=2, return_all_outputs=False, **kwargs):
        """Score with the ALIGNED probability, chunking context into 350 tokens.

        Deprecated-style aggregation: max ALIGNED probability over chunks.
        """
        self.tokenizer = transformers.AutoTokenizer.from_pretrained(self.model_name) if not self.tokenizer else self.tokenizer
        chunked_inputs = self.chunk_inputs(context, claim, chunk_size=350)
        chunked_inputs = {key: torch.tensor(item).to(self.device) for key, item in chunked_inputs.items()}
        chunked_outputs = self.forward(task_name=task_name, **chunked_inputs, **kwargs)
        outputs = {"score": self.alignscore_input_deprecated(chunked_outputs.logits.cpu(), task_name=task_name)}
        outputs["outputs"] = chunked_outputs
        return outputs["score"] if not return_all_outputs else outputs

    def classify(self, context: str, claim: str, task_name="3way", return_all_outputs=False, **kwargs):
        """Classify context/claim into {neutral, contradict, aligned}.

        Majority vote over chunk predictions; ties fall back to the argmax of
        the averaged probabilities.
        """
        self.tokenizer = transformers.AutoTokenizer.from_pretrained(self.model_name) if not self.tokenizer else self.tokenizer
        chunked_inputs = self.chunk_inputs(context, claim, chunk_size=350)
        chunked_inputs = {key: torch.tensor(item).to(self.device) for key, item in chunked_inputs.items()}
        chunked_outputs = self.forward(task_name=task_name, **chunked_inputs, **kwargs)
        outputs = {"class": self.get_system_label(chunked_outputs.logits.cpu(), task_name=task_name)}
        outputs["outputs"] = chunked_outputs
        return outputs["class"] if not return_all_outputs else outputs

    def score_truncated(self, context: str, claim: str, task_name="3way", return_all_outputs=False, **kwargs):
        """Score a single pair truncated to 512 tokens (no chunking)."""
        self.tokenizer = transformers.AutoTokenizer.from_pretrained(self.model_name) if not self.tokenizer else self.tokenizer
        tokenized_inputs = self.tokenizer(list(zip([context], [claim])), padding="max_length",
                                          truncation=True, max_length=512, return_tensors="pt")
        # return_tensors="pt" already yields tensors; just move them.
        tokenized_inputs = {key: item.to(self.device) for key, item in tokenized_inputs.items()}
        with torch.no_grad():
            model_outputs = self.forward(task_name=task_name, **tokenized_inputs, **kwargs)
        outputs = {"score": self.alignscore_input(model_outputs["logits"].cpu(), nclaims=1, ncontexts=1, task_name=task_name)}
        outputs["outputs"] = model_outputs
        return torch.tensor([outputs["score"]]) if not return_all_outputs else outputs

    def forward(self, task_name="3way", **kwargs):
        """Dispatch the forward pass to the head selected by ``task_name``."""
        return self.taskmodels_dict[task_name](**kwargs)

    def __call__(self, task_name, **kwargs):
        return self.taskmodels_dict[task_name](**kwargs)

    def alignscore_input(self, chunked_logits, nclaims, ncontexts, task_name="3way"):
        """Aggregate chunk logits into one ALIGNED score.

        Reshapes the (nclaims*ncontexts, ...) logits to (nclaims, ncontexts),
        takes the max over context chunks and the mean over claim chunks.
        """
        if task_name == "re":
            outputs = chunked_logits.detach()
            reshaped_tensor = outputs.view(nclaims, ncontexts)
            max_values, _ = reshaped_tensor.max(dim=1)
            mean_of_maxes = torch.mean(max_values, dim=0)
            return mean_of_maxes.tolist()
        else:
            # NOTE: key must be "bin" — task names come from "<task>_model"
            # directory names ("re", "bin", "3way").
            nlabels = {"3way": 3, "re": 1, "bin": 2}[task_name]
            outputs = chunked_logits.softmax(1).detach()
            reshaped_tensor = outputs.view(nclaims, ncontexts, nlabels)
            # Index 1 is the ALIGNED class for both classification heads.
            max_values, _ = torch.max(reshaped_tensor[:, :, 1], dim=1)
            mean_of_maxes = torch.mean(max_values, dim=0)
            return mean_of_maxes.tolist()

    def alignscore_input_deprecated(self, chunked_logits, task_name="3way"):
        """Return the max ALIGNED probability (or max regression score) over chunks."""
        if task_name == "re":
            return chunked_logits.detach().amax(0).tolist()
        else:
            return chunked_logits.softmax(1).detach()[:, 1].amax(0).tolist()

    def get_system_label(self, chunked_logits, task_name):
        """Return the predicted label: majority vote over chunks, ties -> avg argmax."""
        if task_name == "re":
            return (chunked_logits.sum(0) / chunked_logits.size()[0]).detach().tolist()
        else:
            avg_probs = chunked_logits.softmax(1).sum(0) / chunked_logits.size()[0]
            predictions = chunked_logits.softmax(1).argmax(1).detach().numpy()
            unique_values, counts = np.unique(predictions, return_counts=True)
            max_count = np.max(counts)
            most_frequent_values = unique_values[counts == max_count]
            return most_frequent_values[0] if most_frequent_values.size == 1 else avg_probs.detach().argmax().tolist()

    def chunk_sent_input(self, context, claim, max_length=512, chunk_size=350, chunk_claim_size=150):
        """Chunk the context into token blocks and the claim into sentences.

        Produces one (context_chunk, claim_chunk) pair per combination, padded
        to ``max_length``, plus "n_claims"/"n_contexts" bookkeeping counts.
        """
        assert chunk_size <= max_length, \
            "Chunk size {} cannot be greater than max size {}".format(chunk_size, max_length)
        chunk_claim_size = max_length - chunk_size if chunk_claim_size is None else chunk_claim_size
        assert chunk_size + chunk_claim_size <= max_length, \
            "Chunk size {} and Chunk claim size {} cannot be together greater than max size {}".format(
                chunk_size, chunk_claim_size, max_length)
        return_chunked_inputs = {}
        # XLM-R special ids: 0 = <s>, 2 = </s> (claim chunks start with the separator).
        context_chunks = self.chunk_text(context, chunk_size=chunk_size,
                                         overflowing_tokens_stride=25, first_special_token=[0])
        claim_chunks = self.chunk_sentences(claim, chunk_size=chunk_claim_size,
                                            overflowing_tokens_stride=int(chunk_claim_size / 3),
                                            first_special_token=[2])
        for claim_chunk in claim_chunks:
            for context_chunk in context_chunks:
                inputs, attention = self.fill_with_pad_tokens(context_chunk, claim_chunk)
                return_chunked_inputs["input_ids"] = return_chunked_inputs.get("input_ids", []) + [inputs]
                return_chunked_inputs["attention_mask"] = return_chunked_inputs.get("attention_mask", []) + [attention]
        return_chunked_inputs["n_claims"] = len(claim_chunks)
        return_chunked_inputs["n_contexts"] = len(context_chunks)
        return return_chunked_inputs

    def chunk_inputs(self, context, claim, max_length=512, chunk_size=512, first_fit_within_max_length=True):
        """Paper-style chunking: split the context by sentences into blocks that,
        together with the claim, fit within ``chunk_size``/``max_length``.

        Returns {"input_ids": [...], "attention_mask": [...]}.
        """
        assert chunk_size <= max_length, \
            "Chunk size {} cannot be greater than max size {}".format(chunk_size, max_length)
        tokenized_claim = self.tokenizer(claim, return_length=True)
        # Replace the claim's BOS with </s> (id 2), matching pair tokenization
        # where the claim follows the context separator.
        tokenized_claim["input_ids"][0] = 2
        tokenized_context = self.tokenizer(context, return_length=True)
        assert tokenized_claim["length"][0] < max_length * 4 / 5, \
            "Create chunks of claim sentences. Claim is too long {} which is more than 4/5 from {}.".format(
                tokenized_claim["length"][0], max_length)
        # Enlarge the chunk budget to account for the claim tokens.
        chunk_size = min(max_length, chunk_size + tokenized_claim["length"][0])
        first_check_max_size = max_length if first_fit_within_max_length else chunk_size
        if tokenized_claim["length"][0] + tokenized_context["length"][0] <= first_check_max_size:
            # Everything fits into a single input — no chunking needed.
            input_ids, attention_mask = self.fill_with_pad_tokens(tokenized_context["input_ids"],
                                                                  tokenized_claim["input_ids"])
            return {"input_ids": [input_ids], "attention_mask": [attention_mask]}
        else:
            return_chunked_inputs = {}
            current_chunk = {}
            for sentence in sent_tokenize(context, language="czech"):
                tok_sent = self.tokenizer(sentence, return_length=True)
                if len(current_chunk.get("input_ids", [0])) + tok_sent["length"][0] - 1 + tokenized_claim["length"][0] <= chunk_size:
                    # Sentence still fits — append its tokens (without special tokens).
                    current_chunk["input_ids"] = current_chunk.get("input_ids", [0]) + tok_sent["input_ids"][1:-1]
                else:
                    # Flush the current chunk and start a new one with this sentence.
                    return_chunked_inputs = self._update_chunked_inputs(tokenized_claim, current_chunk,
                                                                        return_chunked_inputs, max_length, tok_sent)
                    current_chunk["input_ids"] = [0] + tok_sent["input_ids"][1:-1]
            if current_chunk != {}:
                # Flush the trailing chunk.
                return_chunked_inputs = self._update_chunked_inputs(tokenized_claim, current_chunk,
                                                                    return_chunked_inputs, max_length)
                current_chunk = {}
            return return_chunked_inputs

    def chunk_input_deprecated(self, context, claim, max_length=512, chunk_size=350, chunk_claim_size=150):
        """Chunk context into 350-token and claim into 150-token blocks (deprecated)."""
        assert chunk_size <= max_length, \
            "Chunk size {} cannot be greater than max size {}".format(chunk_size, max_length)
        chunk_claim_size = max_length - chunk_size if chunk_claim_size is None else chunk_claim_size
        assert chunk_size + chunk_claim_size <= max_length, \
            "Chunk size {} and Chunk claim size {} cannot be together greater than max size {}".format(
                chunk_size, chunk_claim_size, max_length)
        return_chunked_inputs = {}
        context_chunks = self.chunk_text(context, chunk_size=chunk_size,
                                         overflowing_tokens_stride=25, first_special_token=[0])
        claim_chunks = self.chunk_text(claim, chunk_size=chunk_claim_size,
                                       overflowing_tokens_stride=int(chunk_claim_size / 3),
                                       first_special_token=[2])
        for claim_chunk in claim_chunks:
            for context_chunk in context_chunks:
                inputs, attention = self.fill_with_pad_tokens(context_chunk, claim_chunk)
                return_chunked_inputs["input_ids"] = return_chunked_inputs.get("input_ids", []) + [inputs]
                return_chunked_inputs["attention_mask"] = return_chunked_inputs.get("attention_mask", []) + [attention]
        return_chunked_inputs["n_claims"] = len(claim_chunks)
        return_chunked_inputs["n_contexts"] = len(context_chunks)
        return return_chunked_inputs

    def chunk_text(self, text, chunk_size=350, overflowing_tokens_stride=25, language="czech", first_special_token=[0]):
        """Chunk text into blocks of at most ``chunk_size`` tokens.

        Sentences are packed greedily; a sentence longer than ``chunk_size`` is
        itself split with a stride of ``overflowing_tokens_stride`` overlapping
        tokens. Each chunk starts with ``first_special_token`` and ends with </s> (2).
        """
        sentences = sent_tokenize(text, language=language)
        tokenized = self.tokenizer(sentences if sentences != [] else [""], return_length=True)
        chunks = []
        chunk, current_chunk_size = ([], 0)
        for i, length in enumerate(tokenized["length"]):
            if length > chunk_size:
                # Single sentence longer than the chunk — split it with overlap.
                splits = [first_special_token
                          + tokenized["input_ids"][i][max(1, cs):min(cs + chunk_size - 2, length - 1)]
                          + [2]
                          for cs in range(0, length, chunk_size - (2 + overflowing_tokens_stride))]
            else:
                # Sentence fits — keep it whole (drop its BOS).
                splits = [first_special_token + tokenized["input_ids"][i][1:]]
            for subsentence in splits:
                up_length = len(subsentence) - 2  # payload without the two special tokens
                if current_chunk_size == 0:
                    # Start a fresh chunk (keep leading special token, defer </s>).
                    current_chunk_size = up_length + 2
                    chunk = subsentence[:-1]
                elif current_chunk_size + up_length <= chunk_size:
                    # Append payload tokens only.
                    current_chunk_size += up_length
                    chunk += subsentence[1:-1]
                else:
                    # Close the current chunk and start a new one.
                    chunks += [chunk + [2]]
                    current_chunk_size = up_length + 2
                    chunk = subsentence[:-1]
        if chunk != []:
            # Flush the last, still-open chunk.
            chunks += [chunk + [2]]
        return chunks

    def chunk_sentences(self, text, chunk_size, overflowing_tokens_stride=0, language="czech", sentence_window=2, first_special_token=[2]):
        """Split text into one chunk per sentence (via nltk.sent_tokenize).

        Sentences longer than ``chunk_size`` are split with overlapping tokens.
        ``sentence_window`` is kept for interface compatibility but unused.
        """
        sentences = sent_tokenize(text, language=language)
        tokenized = self.tokenizer(sentences if sentences != [] else [""], return_length=True)
        chunks = []
        for i, length in enumerate(tokenized["length"]):
            if length > chunk_size:
                splits = [first_special_token
                          + tokenized["input_ids"][i][max(1, cs):min(cs + chunk_size - 2, length - 1)]
                          + [2]
                          for cs in range(0, length, chunk_size - (2 + overflowing_tokens_stride))]
            else:
                splits = [first_special_token + tokenized["input_ids"][i][1:]]
            for split in splits:
                chunks += [split]
        return chunks

    def fill_with_pad_tokens(self, first, second, max_length=512, pad_token=1):
        """Concatenate two token lists, pad to ``max_length``; return (input_ids, attention_mask)."""
        pad_len = max(max_length - len(first) - len(second), 0)
        return (first + second + [pad_token] * pad_len,
                [1] * (len(first) + len(second)) + [0] * pad_len)

    def _update_chunked_inputs(self, tokenized_claim, current_chunk, return_chunked_inputs, max_length, tok_sent=None):
        """Append the finished context chunk + claim to the accumulated inputs.

        Truncates rare over-long chunks so chunk + claim fits in ``max_length``.
        """
        if tok_sent is None:  # avoid a shared mutable default argument
            tok_sent = {"input_ids": []}
        if len(current_chunk.get("input_ids", [0])) + tokenized_claim["length"][0] >= max_length:
            # Truncate when a single long sentence overflows (rare).
            chunk = current_chunk["input_ids"].copy()[:max_length - tokenized_claim["length"][0] - 1] + [2]
        elif not current_chunk.get("input_ids", False):
            chunk = tok_sent["input_ids"][:max_length - tokenized_claim["length"][0] - 1] + [2]
        else:
            chunk = current_chunk["input_ids"].copy() + [2]  # add end-of-sentence
        claim_ids = tokenized_claim["input_ids"].copy()
        inputs, attention = self.fill_with_pad_tokens(chunk, claim_ids)
        return_chunked_inputs["input_ids"] = return_chunked_inputs.get("input_ids", []) + [inputs]
        return_chunked_inputs["attention_mask"] = return_chunked_inputs.get("attention_mask", []) + [attention]
        return return_chunked_inputs

    @classmethod
    def get_encoder_attr_name(cls, model):
        """Return the attribute name of the encoder inside ``model``.

        The encoder transformer is named differently in each architecture.
        """
        model_class_name = model.__class__.__name__
        if model_class_name.startswith("XLMRoberta"):
            return "roberta"
        else:
            raise KeyError(f"Add support for new model {model_class_name}")

    @classmethod
    def from_pretrained(
        cls,
        pretrained_model_name_or_path: Optional[Union[str, os.PathLike]],
        model_name: str = "xlm-roberta-large",
        *model_args,
        config: Optional[Union[PretrainedConfig, str, os.PathLike]] = None,
        cache_dir: Optional[Union[str, os.PathLike]] = None,
        ignore_mismatched_sizes: bool = False,
        force_download: bool = False,
        local_files_only: bool = False,
        token: Optional[Union[str, bool]] = None,
        revision: str = "main",
        use_safetensors: Optional[bool] = None,
        **kwargs,
    ):
        """Load the three task heads (sharing one encoder) from a local directory
        or from the Hugging Face hub, and return an assembled AlignScoreCS.
        """
        # Local path: all three task sub-directories must exist.
        if all(os.path.exists(os.path.join(pretrained_model_name_or_path, model_dir))
               for model_dir in [cls._3way_class_model, cls._regression_model, cls._binary_class_model]):
            # Silence the warning about newly initialized weights.
            transformers.logging.set_verbosity_error()
            shared_encoder = None
            taskmodels_dict = {}
            for path_name in [cls._regression_model, cls._binary_class_model, cls._3way_class_model]:
                task_name = path_name.split("_")[0]
                # Task-specific config and model, then load the fine-tuned weights.
                task_config = transformers.XLMRobertaConfig.from_json_file(
                    "{}/{}/config.json".format(pretrained_model_name_or_path, path_name))
                model = transformers.XLMRobertaForSequenceClassification.from_pretrained(
                    model_name, config=task_config, *model_args, **kwargs)
                model.load_state_dict(torch.load(
                    "{}/{}/pytorch_model.bin".format(pretrained_model_name_or_path, path_name),
                    map_location=torch.device('cuda' if torch.cuda.is_available() else 'cpu')))
                # First head donates its encoder; the others share it.
                if shared_encoder is None:
                    shared_encoder = getattr(model, cls.get_encoder_attr_name(model))
                else:
                    setattr(model, cls.get_encoder_attr_name(model), shared_encoder)
                taskmodels_dict[task_name] = model
            alignScoreCS = cls(encoder=shared_encoder, taskmodels_dict=taskmodels_dict, model_name=model_name)
        else:
            # Otherwise try the Hugging Face hub with per-task subfolders.
            shared_encoder = None
            taskmodels_dict = {}
            for model_dir in [cls._regression_model, cls._binary_class_model, cls._3way_class_model]:
                task_name = model_dir.split("_")[0]
                task_config = transformers.XLMRobertaConfig.from_pretrained(
                    f"{pretrained_model_name_or_path}", subfolder=model_dir)
                model = transformers.XLMRobertaForSequenceClassification.from_pretrained(
                    f"{pretrained_model_name_or_path}", config=task_config, subfolder=model_dir)
                if shared_encoder is None:
                    shared_encoder = getattr(model, cls.get_encoder_attr_name(model))
                else:
                    setattr(model, cls.get_encoder_attr_name(model), shared_encoder)
                taskmodels_dict[task_name] = model
            alignScoreCS = cls(encoder=shared_encoder, taskmodels_dict=taskmodels_dict, model_name=model_name)
        return alignScoreCS

    def save_pretrained(
        self,
        save_directory: Union[str, os.PathLike],
        is_main_process: bool = True,
        state_dict: Optional[dict] = None,
        save_function: Callable = torch.save,
        push_to_hub: bool = False,
        max_shard_size: Union[int, str] = "10GB",
        safe_serialization: bool = False,
        variant: Optional[str] = None,
        token: Optional[Union[str, bool]] = None,
        save_peft_format: bool = True,
        **kwargs,
    ):
        """Save each task head into ``<save_directory>/<task>_model``."""
        for task_name, model_type in self.taskmodels_dict.items():
            model_type.save_pretrained(
                save_directory=Path(save_directory, task_name + "_model"),
                is_main_process=is_main_process,
                state_dict=state_dict,
                save_function=save_function,
                push_to_hub=push_to_hub,
                max_shard_size=max_shard_size,
                safe_serialization=safe_serialization,
                variant=variant,
                token=token,
                save_peft_format=save_peft_format,
                **kwargs)

    # This class is ported from the AlignScore github repository.
    # If you want to use a different nlg_eval_mode you have to fix errors on your own.
    class InferenceHandler:
        def __init__(self, model, tokenizer, device="cuda"):
            self.model = model
            self.device = device
            self.tokenizer = tokenizer
            self.model.to(self.device)
            self.model.eval()
            self.batch_size = 32
            self.nlg_eval_mode = "nli_sp"
            self.verbose = False
            self.task_name = "3way"
            self.softmax = nn.Softmax(dim=-1)
            # Must exist before inference() is called directly.
            self.disable_progress_bar_in_inference = False

        def nlg_eval(self, premise, hypo):
            """Evaluate a pair or equal-length lists of premises/hypotheses."""
            if isinstance(premise, str) and isinstance(hypo, str):
                premise = [premise]
                hypo = [hypo]
            return self.inference_example_batch(premise, hypo)

        def inference_example_batch(self, premise: list, hypo: list):
            """Inference over example lists; SummaC-style aggregation per example."""
            self.disable_progress_bar_in_inference = True
            assert len(premise) == len(hypo), "Premise must has the same length with Hypothesis!"
            out_score = []
            for one_pre, one_hypo in tqdm(zip(premise, hypo), desc="Evaluating",
                                          total=len(premise), disable=(not self.verbose)):
                out_score.append(self.inference_per_example(one_pre, one_hypo))
            return torch.tensor(out_score)

        def inference_per_example(self, premise: str, hypo: str):
            """Inference for one example: chunk premise, sentence-split hypothesis,
            score all combinations, aggregate max-over-chunks / mean-over-sentences.
            """
            def chunks(lst, n):
                """Yield successive n-sized chunks from lst, joined back to text."""
                for i in range(0, len(lst), n):
                    yield ' '.join(lst[i:i + n])

            premise_sents = sent_tokenize(premise)
            premise_sents = premise_sents or ['']
            # Roughly 350 words per premise chunk.
            n_chunk = len(premise.strip().split()) // 350 + 1
            n_chunk = max(len(premise_sents) // n_chunk, 1)
            premise_sents = [each for each in chunks(premise_sents, n_chunk)]
            hypo_sents = sent_tokenize(hypo)
            # Cross product of premise chunks x hypothesis sentences.
            premise_sent_mat = []
            hypo_sents_mat = []
            for i in range(len(premise_sents)):
                for j in range(len(hypo_sents)):
                    premise_sent_mat.append(premise_sents[i])
                    hypo_sents_mat.append(hypo_sents[j])
            if self.nlg_eval_mode is not None:
                if self.nlg_eval_mode == 'nli_sp':
                    output_score = self.inference(premise_sent_mat, hypo_sents_mat)[:, 1]  # ALIGNED column
                    output_score = output_score.view(len(premise_sents), len(hypo_sents)).max(dim=0).values.mean().item()
                    return output_score
            output_score = self.inference(premise_sent_mat, hypo_sents_mat)
            output_score = output_score.view(len(premise_sents), len(hypo_sents)).max(dim=0).values.mean().item()
            return output_score

        def inference(self, premise, hypo, task_name=None):
            """Batched inference over premise/hypo lists; standard aggregation."""
            task_name = self.task_name if task_name is None else task_name
            if isinstance(premise, str) and isinstance(hypo, str):
                premise = [premise]
                hypo = [hypo]
            batch = self.batch_tokenize(premise, hypo)
            output_score = []
            for mini_batch in tqdm(batch, desc="Evaluating",
                                   disable=not self.verbose or self.disable_progress_bar_in_inference):
                mini_batch = mini_batch.to(self.device)
                with torch.no_grad():
                    model_output = self.model.forward(task_name=task_name, **mini_batch)
                    model_output = model_output.logits
                    # Regression head outputs raw scores; classification heads get softmax.
                    if task_name == "re":
                        model_output = model_output.cpu()
                    else:
                        model_output = self.softmax(model_output).cpu()
                output_score.append(model_output[:, :])
            output_score = torch.cat(output_score)
            if self.nlg_eval_mode is not None:
                if self.nlg_eval_mode == 'nli':
                    output_score_nli = output_score[:, 1]
                    return output_score_nli
                elif self.nlg_eval_mode == 'bin':
                    return output_score
                elif self.nlg_eval_mode == 'reg':
                    return output_score
                else:
                    raise ValueError("unrecognized nlg eval mode")
            return output_score

        def batch_tokenize(self, premise, hypo):
            """Tokenize premise/hypo lists into mini-batches of self.batch_size."""
            assert isinstance(premise, list) and isinstance(hypo, list)
            assert len(premise) == len(hypo), "premise and hypo should be in the same length."
            batch = []
            for mini_batch_pre, mini_batch_hypo in zip(self.chunks(premise, self.batch_size),
                                                       self.chunks(hypo, self.batch_size)):
                try:
                    mini_batch = self.tokenizer(mini_batch_pre, mini_batch_hypo, truncation='only_first',
                                                padding='max_length',
                                                max_length=self.tokenizer.model_max_length,
                                                return_tensors='pt')
                except Exception:
                    # 'only_first' fails when the second segment alone exceeds the limit.
                    print('text_b too long...')
                    mini_batch = self.tokenizer(mini_batch_pre, mini_batch_hypo, truncation=True,
                                                padding='max_length',
                                                max_length=self.tokenizer.model_max_length,
                                                return_tensors='pt')
                batch.append(mini_batch)
            return batch

        def chunks(self, lst, n):
            """Yield successive n-sized chunks from lst."""
            for i in range(0, len(lst), n):
                yield lst[i:i + n]


if __name__ == "__main__":
    # Smoke-test demo on Czech examples.
    alignScore = AlignScoreCS.from_pretrained("krotima1/AlignScoreCS")
    alignScore.to("cuda" if torch.cuda.is_available() else "cpu")
    print("Tomáš miluje Zuzku!", "|", "Tomáš miluje Petru!", alignScore.score("Tomáš miluje Zuzku!", "Tomáš miluje Petru."))
    print("Tomáš miluje Zuzku!", "|", "Tomáš miluje Zuzku!", alignScore.score("Tomáš miluje Zuzku!", "Tomáš miluje Zuzku!"))
    print("Tomáš miluje Zuzku.", "|", "Zuzka miluje Tomáše.", alignScore.score("Tomáš miluje Zuzku!", "Zuzka miluje Tomáše."))
    print("Tomáš miluje Zuzku.", "|", "Zuzka nemiluje Tomáše.", alignScore.score("Tomáš miluje Zuzku!", "Zuzka nemiluje Tomáše."))
    print("Tomáš miluje Zuzku.", "|", "Tomáš nemiluje Zuzku.", alignScore.score("Tomáš miluje Zuzku!", "Tomáš nemiluje Zuzku."))
    print("Dva chlapi se perou.", "|", "Je tu bitka.", alignScore.score("Dva chlapi se perou.", "Je tu bitka."))
    print("Dva chlapi se perou.", "|", "Je tu láska.", alignScore.score("Dva chlapi se perou.", "Je tu láska."))
    print("Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta. \n Kdo nechal vystavět katedrálu?", "|", "Byl to Karel.", alignScore.score("Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta.\nKdo nechal vystavět katedrálu?", "Byl to Karel."))
    print("Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta. \n Kdo nechal vystavět katedrálu?", "|", "Byl to Vít.", alignScore.score("Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta.\nKdo nechal vystavět katedrálu?", "Byl to Vít."))
    print("Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta. \n Kdo nechal vystavět katedrálu?", "|", "Byla to katedrála.", alignScore.score("Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta.\nKdo nechal vystavět katedrálu?", "Byla to katedrála."))
    print("Kdo je Karel IV.? Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta.", "|", "Je Otec.", alignScore.score("Kdo je Karel IV.? Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta.", "Je Otec."))
    print("Kdo je Karel IV.? Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta.", "|", "Je Otec vlasti.", alignScore.score("Kdo je Karel IV.? Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta.", "Je Otec vlasti."))
    print("Kdo je Karel IV.? Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta.", "|", "Je katedrála svatého Víta.", alignScore.score("Kdo je Karel IV.? Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta.", "Je katedrála svatého Víta."))
    print("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "|", "Karkulka utekla vklovi.", alignScore.score("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "Karkulka utekla vklovi."))
    print("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "|", "Karkulka neutekla vklovi.", alignScore.score("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "Karkulka neutekla vklovi."))
    print("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "|", "Vlk snědl Karkulku.", alignScore.score("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "Vlk snědl karkulku."))
    print("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "|", "Vlk nesnědl Karkulku.", alignScore.score("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "Vlk nesnědl karkulku."))
    print("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "|", "Karkulka snědla vlka.", alignScore.score("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "Karkulka snědla vlka."))
    print("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "|", "Karkulka dala vlkovi jablko.", alignScore.score("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "Karkulka dala vlkovi jablko."))