import os
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union

import numpy as np
import torch
import torch.nn as nn
import transformers
from nltk.tokenize import sent_tokenize
from tqdm import tqdm
from transformers import PretrainedConfig
| |
|
class AlignScoreCS(transformers.XLMRobertaModel):
    """
    AlignScoreCS: a Czech AlignScore-style factual-consistency model.

    An XLM-RoBERTa encoder shared by three classification heads
    (regression, binary, 3-way), trained per the AlignScore paper
    (3 epochs, lr 1e-5, AdamW eps 1e-6, batch size 32,
    warmup ratio 0.06, weight decay 0.1).

    Usage:
        alignScoreCS = AlignScoreCS.from_pretrained("krotima1/AlignScoreCS")
        alignScoreCS.score(context, claim)
            # returns the probability of the ALIGNED class,
            # using the 3-way head as in the paper.

    Head selection via the ``task_name`` parameter of the scoring methods:
        - "re"   : regression head
        - "bin"  : binary classification head
        - "3way" : 3-way classification head
    """
    # Sub-directory names used by save_pretrained/from_pretrained for the
    # three task-specific checkpoints; task_name is the part before "_".
    _regression_model = "re_model"
    _binary_class_model = "bin_model"
    _3way_class_model = "3way_model"

    def __init__(self, encoder, taskmodels_dict, model_name= "xlm-roberta-large", **kwargs):
        """
        :param encoder: the XLM-RoBERTa encoder shared by every task head
        :param taskmodels_dict: mapping task_name -> sequence-classification model
        :param model_name: HF hub id used to (re)create the tokenizer
        """
        # Base class is initialised with a fresh default config; the real
        # weights live in the task models that share `encoder`.
        super().__init__(transformers.XLMRobertaConfig(), **kwargs)
        self.encoder = encoder
        self.taskmodels_dict = nn.ModuleDict(taskmodels_dict)
        self.tokenizer = None       # created lazily on first scoring call
        self.model_name = model_name
        self.inferencer = None      # paper-style InferenceHandler, built lazily
| | |
| | def init_inferencer(self, device = "cuda"): |
| | self.tokenizer = transformers.AutoTokenizer.from_pretrained(self.model_name) if not self.tokenizer else self.tokenizer |
| | self.inferencer = self.InferenceHandler(self, self.tokenizer, device) |
| | |
| | |
| | |
| | """ |
| | Score: scores the context and claim with Aligned probabitlity of 3way classification head |
| | - using paper code inferencer from ALignScore |
| | |
| | """ |
| | def score(self, context, claim, **kwargs): |
| | if self.inferencer is None: |
| | self.init_inferencer() |
| | scores = self.inferencer.nlg_eval(context, claim) |
| | return scores |
| | |
| | """ |
| | Score: scores the context and claim with ALIGNED probability (wrt task_name ["re" | "bin" | "3way"]) |
| | |
| | Returns the probability of the ALIGNED CLASS between context text and claim text |
| | - chunks text by 350 tokens and splits claim into sentences |
| | - using 3way classification head |
| | """ |
| | def score_sentences(self, context :str, claim :str, task_name = "3way", batch_size = 2, return_all_outputs = False, **kwargs): |
| | self.tokenizer = transformers.AutoTokenizer.from_pretrained(self.model_name) if not self.tokenizer else self.tokenizer |
| | chunked_inputs = self.chunk_sent_input(context,claim, chunk_size=350,chunk_claim_size=150) |
| | nclaims, ncontexts = (chunked_inputs["n_claims"],chunked_inputs["n_contexts"]) |
| | with torch.no_grad(): |
| | chunked_inputs = {key : torch.tensor(item).to(self.device) for key, item in chunked_inputs.items() if not key.startswith("n_")} |
| | chunked_outputs = {} |
| | for i in range(0,len(chunked_inputs["input_ids"]),batch_size): |
| | tmp = self.forward(task_name = task_name,**{"input_ids":chunked_inputs["input_ids"][i:i+batch_size],"attention_mask" :chunked_inputs["attention_mask"][i:i+batch_size]}, **kwargs) |
| | for k, item in tmp.items(): |
| | chunked_outputs[k] = chunked_outputs.get(k, []) + [item] |
| | logits = torch.vstack(chunked_outputs["logits"]).cpu() |
| | outputs = {"score" : self.alignscore_input(logits,nclaims=nclaims,ncontexts=ncontexts, task_name=task_name)} |
| | outputs["outputs"] = chunked_outputs |
| | return torch.tensor([outputs["score"]]) if not return_all_outputs else outputs |
| | |
| | |
| | """ |
| | Score: scores the context and claim with ALIGNED probability (wrt task_name ["re" | "bin" | "3way"]) |
| | |
| | Returns the probability of the ALIGNED CLASS between context text and claim text |
| | - chunks text into 350 tolens and chunks claim into 150 tokens |
| | - using 3way classification head |
| | """ |
| | def score_chunks(self, context :str, claim :str, task_name = "3way", batch_size = 2, return_all_outputs = False, **kwargs): |
| | self.tokenizer = transformers.AutoTokenizer.from_pretrained(self.model_name) if not self.tokenizer else self.tokenizer |
| | chunked_inputs = self.chunk_inputs(context,claim, chunk_size=350) |
| | chunked_inputs = {key : torch.tensor(item).to(self.device) for key, item in chunked_inputs.items()} |
| | chunked_outputs = self.forward(task_name = task_name, **chunked_inputs, **kwargs) |
| | outputs = {"score" : self.alignscore_input_deprecated(chunked_outputs.logits.cpu(), task_name=task_name)} |
| | outputs["outputs"] = chunked_outputs |
| | return outputs["score"] if not return_all_outputs else outputs |
| | |
| | """ |
| | Classify: classify the context and claim to the class label given the task_name ["re" | "bin" | "3way"] |
| | |
| | Returns the class of {Neutral, contradict, aligned} between context text and claim text |
| | - using 3way classification head |
| | """ |
| | def classify(self, context :str, claim :str, task_name = "3way", return_all_outputs = False, **kwargs): |
| | self.tokenizer = transformers.AutoTokenizer.from_pretrained(self.model_name) if not self.tokenizer else self.tokenizer |
| | chunked_inputs = self.chunk_inputs(context,claim, chunk_size=350) |
| | chunked_inputs = {key : torch.tensor(item).to(self.device) for key, item in chunked_inputs.items()} |
| | chunked_outputs = self.forward(task_name = task_name, **chunked_inputs, **kwargs) |
| | outputs = {"class" : self.get_system_label(chunked_outputs.logits.cpu(), task_name=task_name)} |
| | outputs["outputs"] = chunked_outputs |
| | return outputs["class"] if not return_all_outputs else outputs |
| | |
| | |
| | def score_truncated(self, context :str, claim :str, task_name = "3way", return_all_outputs = False, **kwargs): |
| | self.tokenizer = transformers.AutoTokenizer.from_pretrained(self.model_name) if not self.tokenizer else self.tokenizer |
| | tokenized_inputs = self.tokenizer(list(zip([context], [claim])), padding = "max_length", truncation = True, max_length = 512, return_tensors="pt") |
| | tokenized_inputs = {key : torch.tensor(item).to(self.device) for key, item in tokenized_inputs.items()} |
| | with torch.no_grad(): |
| | model_outputs = self.forward(task_name=task_name, **tokenized_inputs, **kwargs) |
| | outputs = {"score" : self.alignscore_input(model_outputs["logits"].cpu(),nclaims=1, ncontexts=1, task_name=task_name)} |
| | outputs["outputs"] = model_outputs |
| | return torch.tensor([outputs["score"]]) if not return_all_outputs else outputs |
| | |
| | def forward(self, task_name = "3way", **kwargs): |
| | return self.taskmodels_dict[task_name](**kwargs) |
| | |
| | def __call__(self, task_name, **kwargs): |
| | return self.taskmodels_dict[task_name](**kwargs) |
| | |
| | """ |
| | Get the probability of the ALIGNED label from input |
| | """ |
| | def alignscore_input(self, chunked_logits, nclaims, ncontexts, task_name = "3way"): |
| | if task_name == "re": |
| | ouptuts = chunked_logits.detach() |
| | |
| | reshaped_tensor = ouptuts.view(nclaims, ncontexts) |
| |
|
| | |
| | max_values, _ = reshaped_tensor.max(dim=1) |
| |
|
| | |
| | mean_of_maxes = torch.mean(max_values, dim=0) |
| | return mean_of_maxes.tolist() |
| | else: |
| | nlabels = {"3way" : 3, "re" : 1, "2way" : 2}[task_name] |
| | ouptuts = chunked_logits.softmax(1).detach() |
| | |
| | reshaped_tensor = ouptuts.view(nclaims, ncontexts, nlabels) |
| |
|
| | |
| | max_values, _ = torch.max(reshaped_tensor[:, :, 1], dim=1) |
| |
|
| | |
| | mean_of_maxes = torch.mean(max_values, dim=0) |
| | return mean_of_maxes.tolist() |
| | |
| | |
| | def alignscore_input_deprecated(self, chunked_logits, task_name = "3way"): |
| | if task_name == "re": |
| | return chunked_logits.detach().amax(0).tolist() |
| | else: |
| | return chunked_logits.softmax(1).detach()[:, 1].amax(0).tolist() |
| | |
| | |
| | """ |
| | get the label from the input |
| | """ |
| | def get_system_label(self, chunked_logits, task_name): |
| | if task_name == "re": |
| | return (chunked_logits.sum(0) / chunked_logits.size()[0]).detach().tolist() |
| | else: |
| | avg_probs = chunked_logits.softmax(1).sum(0) / chunked_logits.size()[0] |
| | numpy_array = chunked_logits.softmax(1).argmax(1).detach().numpy() |
| | |
| | unique_values, counts = np.unique(numpy_array, return_counts=True) |
| | |
| | max_count = np.max(counts) |
| | |
| | most_frequent_values = unique_values[counts == max_count] |
| | return most_frequent_values[0] if most_frequent_values.size == 1 else avg_probs.detach().argmax().tolist() |
| |
|
| | """ |
| | Chunks input context and claim - context is chunked into 350 tokens |
| | - claim is chunked into sentences |
| | - using stride for overflowing tokens |
| | """ |
| | def chunk_sent_input(self, context, claim, max_length = 512, chunk_size = 350, chunk_claim_size = 150): |
| | assert chunk_size <= max_length, "Chunk size {} cannot be greater than max size {}".format(chunk_size, chunk_claim_size, max_length) |
| | chunk_claim_size = max_length - chunk_size if chunk_claim_size is None else chunk_claim_size |
| | assert chunk_size + chunk_claim_size <= max_length, "Chunk size {} and Chunk claim size {} cannot be together greater than max size {}".format(chunk_size, chunk_claim_size, max_length) |
| | return_chunked_inputs = {} |
| | context_chunks = self.chunk_text(context, chunk_size=chunk_size, overflowing_tokens_stride = 25, first_special_token=[0]) |
| | claim_chunks = self.chunk_sentences(claim, chunk_size=chunk_claim_size,overflowing_tokens_stride=int(chunk_claim_size/3), first_special_token=[2]) |
| | for claim_chunk in claim_chunks: |
| | for context_chunk in context_chunks: |
| | inputs,attention =self.fill_with_pad_tokens(context_chunk,claim_chunk ) |
| | return_chunked_inputs["input_ids"] = return_chunked_inputs.get("input_ids",[]) + [inputs] |
| | return_chunked_inputs["attention_mask"] = return_chunked_inputs.get("attention_mask",[]) + [attention] |
| | return_chunked_inputs["n_claims"] = len(claim_chunks) |
| | return_chunked_inputs["n_contexts"] = len(context_chunks) |
| | return return_chunked_inputs |
| | |
| | """ |
| | According to paper - chunk the text into smaller parts (350tokens + claim_tokens) when the tokenized inputs exceed the max_length |
| | returns chunked input |
| | """ |
| | def chunk_inputs(self, context, claim, max_length = 512, chunk_size = 512, first_fit_within_max_length = True): |
| | assert chunk_size <= max_length, "Chunk size {} cannot be greater than max size {}".format(chunk_size, max_length) |
| | |
| | tokenized_claim = self.tokenizer(claim, return_length=True) |
| | tokenized_claim["input_ids"][0] = 2 |
| | tokenized_context = self.tokenizer(context, return_length = True) |
| | assert tokenized_claim["length"][0] < max_length*4/5, "Create chunks of claim sentences. Claim is too long {} which is more than 4/5 from {}.".format(tokenized_claim["length"][0], max_length) |
| |
|
| | |
| | chunk_size = min(max_length, chunk_size + tokenized_claim["length"][0]) |
| | |
| | first_check_max_size = max_length if first_fit_within_max_length else chunk_size |
| | |
| | if tokenized_claim["length"][0] + tokenized_context["length"][0] <= first_check_max_size: |
| | input_ids, attention_mask = self.fill_with_pad_tokens(tokenized_context["input_ids"],tokenized_claim["input_ids"]) |
| | return {"input_ids" : [input_ids], "attention_mask" : [attention_mask]} |
| | else: |
| | return_chunked_inputs = {} |
| | current_chunk = {} |
| | for sentence in sent_tokenize(context, language="czech"): |
| | tok_sent = self.tokenizer(sentence, return_length=True) |
| | if len(current_chunk.get("input_ids",[0])) + tok_sent["length"][0] - 1 + tokenized_claim["length"][0] <= chunk_size: |
| | current_chunk["input_ids"] = current_chunk.get("input_ids",[0]) + tok_sent["input_ids"][1:-1] |
| | else: |
| | return_chunked_inputs = self._update_chunked_inputs(tokenized_claim, current_chunk, return_chunked_inputs, max_length, tok_sent) |
| | current_chunk["input_ids"] = [0] + tok_sent["input_ids"][1:-1] |
| | if current_chunk != {}: |
| | return_chunked_inputs = self._update_chunked_inputs(tokenized_claim, current_chunk, return_chunked_inputs, max_length) |
| | current_chunk = {} |
| | return return_chunked_inputs |
| | |
| | """ |
| | Chunks input context and claim - context is chunked into 350 tokens |
| | - claim is chunked into 150 tokens |
| | - using stride for overflowing tokens |
| | """ |
| | def chunk_input_deprecated(self, context, claim, max_length = 512, chunk_size = 350, chunk_claim_size = 150): |
| | assert chunk_size <= max_length, "Chunk size {} cannot be greater than max size {}".format(chunk_size, chunk_claim_size, max_length) |
| | chunk_claim_size = max_length - chunk_size if chunk_claim_size is None else chunk_claim_size |
| | assert chunk_size + chunk_claim_size <= max_length, "Chunk size {} and Chunk claim size {} cannot be together greater than max size {}".format(chunk_size, chunk_claim_size, max_length) |
| | return_chunked_inputs = {} |
| | context_chunks = self.chunk_text(context, chunk_size=chunk_size, overflowing_tokens_stride = 25, first_special_token=[0]) |
| | claim_chunks = self.chunk_text(claim, chunk_size=chunk_claim_size,overflowing_tokens_stride=int(chunk_claim_size/3), first_special_token=[2]) |
| | for claim_chunk in claim_chunks: |
| | for context_chunk in context_chunks: |
| | inputs,attention =self.fill_with_pad_tokens(context_chunk,claim_chunk ) |
| | return_chunked_inputs["input_ids"] = return_chunked_inputs.get("input_ids",[]) + [inputs] |
| | return_chunked_inputs["attention_mask"] = return_chunked_inputs.get("attention_mask",[]) + [attention] |
| | return_chunked_inputs["n_claims"] = len(claim_chunks) |
| | return_chunked_inputs["n_contexts"] = len(context_chunks) |
| | return return_chunked_inputs |
| | |
| | |
| | """ |
| | Chunk texts into blocks of chunk_size tokens |
| | |
| | """ |
| | def chunk_text(self, text, chunk_size = 350, overflowing_tokens_stride = 25, language="czech", first_special_token = [0]): |
| | sentences = sent_tokenize(text, language=language) |
| | tokenized = self.tokenizer(sentences if sentences != [] else [""], return_length=True) |
| | chunks = [] |
| | chunk, current_chunk_size = ([], 0) |
| | for i, length in enumerate(tokenized["length"]): |
| | |
| | |
| | |
| | if length > chunk_size: |
| | splits = [first_special_token + tokenized["input_ids"][i][max(1,cs):min(cs + chunk_size - 2, length - 1)] + [2] for cs in range(0, length , chunk_size-(2+overflowing_tokens_stride))] |
| | |
| | else: |
| | splits = [first_special_token + tokenized["input_ids"][i][1:]] |
| | |
| | |
| | for subsentence in splits: |
| | up_length = len(subsentence) - 2 |
| | |
| | |
| | if current_chunk_size == 0: |
| | current_chunk_size = up_length + 2 |
| | chunk = subsentence[:-1] |
| | |
| | elif current_chunk_size + up_length <= chunk_size: |
| | current_chunk_size += up_length |
| | chunk += subsentence[1:-1] |
| | |
| | else: |
| | chunks += [chunk + [2]] |
| | current_chunk_size = up_length + 2 |
| | chunk = subsentence[:-1] |
| | |
| | if chunk != []: |
| | chunks += [chunk + [2]] |
| | |
| | |
| | return chunks |
| | |
| | """ |
| | Chunks text into sentences using nlt.sent_tokenize |
| | """ |
| | def chunk_sentences(self, text, chunk_size, overflowing_tokens_stride = 0, language="czech", sentence_window = 2, first_special_token = [2]): |
| | sentences = sent_tokenize(text, language=language) |
| | tokenized = self.tokenizer(sentences if sentences != [] else [""], return_length=True) |
| | chunks = [] |
| | current_chunk = [] |
| | for i, length in enumerate(tokenized["length"]): |
| | |
| | |
| | if length > chunk_size: |
| | splits = [first_special_token + tokenized["input_ids"][i][max(1,cs):min(cs + chunk_size - 2, length - 1)] + [2] for cs in range(0, length , chunk_size-(2+overflowing_tokens_stride))] |
| | |
| | else: |
| | splits = [first_special_token + tokenized["input_ids"][i][1:]] |
| | |
| | |
| | for split in splits: |
| | chunks += [split] |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | return chunks |
| |
|
| | """ |
| | join context and claim tokens as input_ids and create attention_mask |
| | """ |
| | def fill_with_pad_tokens(self, first, second, max_length=512, pad_token = 1): |
| | return first + second + [pad_token]*max(max_length-len(first)-len(second),0), [1]*(len(first)+len(second)) + [0]*max(max_length-len(first)-len(second),0) |
| | |
| | |
    def _update_chunked_inputs(self, tokenized_claim, current_chunk, return_chunked_inputs, max_length, tok_sent = {"input_ids" : []}):
        # Close `current_chunk`, append the claim, and accumulate the result
        # into return_chunked_inputs. NOTE(review): mutable default for
        # tok_sent — read-only here, so harmless, but fragile.

        # Chunk + claim would overflow: truncate the chunk to make room.
        if len(current_chunk.get("input_ids",[0])) + tokenized_claim["length"][0] >= max_length:
            chunk = current_chunk["input_ids"].copy()[:max_length-tokenized_claim["length"][0]-1] + [2]
        # Empty chunk: fall back to the (truncated) current sentence.
        elif not current_chunk.get("input_ids",False):
            chunk = tok_sent["input_ids"][: max_length - tokenized_claim["length"][0] -1] + [2]
        # Chunk fits as-is; just close it with </s>.
        else:
            chunk = current_chunk["input_ids"].copy() + [2]
        claim_ids = tokenized_claim["input_ids"].copy()
        inputs, attention = self.fill_with_pad_tokens(chunk,claim_ids )
        return_chunked_inputs["input_ids"] = return_chunked_inputs.get("input_ids",[]) + [inputs]
        return_chunked_inputs["attention_mask"] = return_chunked_inputs.get("attention_mask",[]) + [attention]
        return return_chunked_inputs
| | |
| | @classmethod |
| | def get_encoder_attr_name(cls, model): |
| | """ |
| | The encoder transformer is named differently in each model "architecture". |
| | This method lets us get the name of the encoder attribute |
| | """ |
| | model_class_name = model.__class__.__name__ |
| | if model_class_name.startswith("XLMRoberta"): |
| | return "roberta" |
| | else: |
| | raise KeyError(f"Add support for new model {model_class_name}") |
| | |
| | |
    @classmethod
    def from_pretrained(
        cls,
        pretrained_model_name_or_path: Optional[Union[str, os.PathLike]],
        model_name : str = "xlm-roberta-large",
        *model_args,
        config: Optional[Union[PretrainedConfig, str, os.PathLike]] = None,
        cache_dir: Optional[Union[str, os.PathLike]] = None,
        ignore_mismatched_sizes: bool = False,
        force_download: bool = False,
        local_files_only: bool = False,
        token: Optional[Union[str, bool]] = None,
        revision: str = "main",
        use_safetensors: bool = None,
        **kwargs,
    ):
        """
        Load the three task heads (re/bin/3way) and wire them to one shared encoder.

        Two paths:
        - local directory containing re_model/, bin_model/, 3way_model/:
          configs are read from the per-task config.json files and weights from
          pytorch_model.bin via load_state_dict;
        - otherwise (e.g. a hub id): each head is loaded with
          XLMRobertaForSequenceClassification.from_pretrained(subfolder=...).
        NOTE(review): config/cache_dir/force_download/... are accepted for
        signature compatibility but only **kwargs reach the first path's loads.
        """

        # Path 1: all three checkpoint sub-directories exist locally.
        if all(os.path.exists(os.path.join(pretrained_model_name_or_path, model_dir)) for model_dir in [cls._3way_class_model, cls._regression_model, cls._binary_class_model]):

            # Silence HF warnings about freshly initialised heads being overwritten below.
            transformers.logging.set_verbosity_error()

            shared_encoder = None
            taskmodels_dict = {}
            for path_name in [cls._regression_model, cls._binary_class_model, cls._3way_class_model]:
                # "re_model" -> "re", etc.
                task_name = path_name.split("_")[0]

                task_config = transformers.XLMRobertaConfig.from_json_file("{}/{}/config.json".format(pretrained_model_name_or_path,path_name))

                model = transformers.XLMRobertaForSequenceClassification.from_pretrained(model_name, config=task_config,*model_args,**kwargs)

                # Overwrite the base weights with the trained task checkpoint.
                model.load_state_dict(torch.load("{}/{}/pytorch_model.bin".format(pretrained_model_name_or_path,path_name), map_location=torch.device('cuda' if torch.cuda.is_available() else 'cpu')))

                # First head donates its encoder; the rest share it.
                if shared_encoder is None:
                    shared_encoder = getattr(model, AlignScoreCS.get_encoder_attr_name(model))
                else:
                    setattr(model, AlignScoreCS.get_encoder_attr_name(model), shared_encoder)
                taskmodels_dict[task_name] = model

            alignScoreCS = AlignScoreCS(encoder=shared_encoder, taskmodels_dict=taskmodels_dict, model_name=model_name)

        # Path 2: load each head from a hub repo / single directory via subfolders.
        else:
            shared_encoder = None
            taskmodels_dict = {}
            for model_dir in [cls._regression_model, cls._binary_class_model, cls._3way_class_model]:
                task_name = model_dir.split("_")[0]
                config = transformers.XLMRobertaConfig.from_pretrained(f"{pretrained_model_name_or_path}", subfolder=model_dir)
                model = transformers.XLMRobertaForSequenceClassification.from_pretrained(f"{pretrained_model_name_or_path}",config=config, subfolder=model_dir)
                if shared_encoder is None:
                    shared_encoder = getattr(model, AlignScoreCS.get_encoder_attr_name(model))
                else:
                    setattr(model, AlignScoreCS.get_encoder_attr_name(model), shared_encoder)
                taskmodels_dict[task_name] = model
            alignScoreCS = AlignScoreCS(encoder=shared_encoder, taskmodels_dict=taskmodels_dict, model_name=model_name)

        return alignScoreCS
| | |
| | |
| | def save_pretrained( |
| | self, |
| | save_directory: Union[str, os.PathLike], |
| | is_main_process: bool = True, |
| | state_dict: Optional[dict] = None, |
| | save_function: Callable = torch.save, |
| | push_to_hub: bool = False, |
| | max_shard_size: Union[int, str] = "10GB", |
| | safe_serialization: bool = False, |
| | variant: Optional[str] = None, |
| | token: Optional[Union[str, bool]] = None, |
| | save_peft_format: bool = True, |
| | **kwargs, |
| | ): |
| | for task_name, model_type in self.taskmodels_dict.items(): |
| | model_type.save_pretrained(save_directory = Path(save_directory,task_name+"_model"), |
| | is_main_process = is_main_process, |
| | state_dict = state_dict, |
| | save_function = save_function, |
| | push_to_hub = push_to_hub, |
| | max_shard_size = max_shard_size, |
| | safe_serialization = safe_serialization, |
| | variant = variant, |
| | token = token, |
| | save_peft_format = save_peft_format, |
| | **kwargs) |
| |
|
| | |
| | |
    class InferenceHandler:
        # Paper-style inference wrapper (adapted from the AlignScore reference
        # implementation): SummaC-style chunk/sentence aggregation over the model.
        def __init__(self, model, tokenizer, device = "cuda"):
            """
            :param model: the AlignScoreCS instance to run inference with
            :param tokenizer: tokenizer matching the model
            :param device: device string the model is moved to
            """
            self.model = model
            self.device = device
            self.tokenizer = tokenizer
            self.model.to(self.device)
            self.model.eval()
            self.batch_size = 32
            # "nli_sp" = sentence-pair aggregation over the 3way head (paper default).
            self.nlg_eval_mode = "nli_sp"
            self.verbose = False
            self.task_name = "3way"
            self.softmax = nn.Softmax(dim=-1)

        def nlg_eval(self, premise, hypo):
            # Accept single strings for convenience; normalise to lists.
            if isinstance(premise, str) and isinstance(hypo, str):
                premise = [premise]
                hypo = [hypo]
            return self.inference_example_batch(premise, hypo)
| | |
        def inference_example_batch(self, premise: list, hypo: list):
            """
            inference a example,
            premise: list
            hypo: list
            using self.inference to batch the process

            SummaC Style aggregation
            """
            # Inner tqdm bars are disabled; this flag is read later by inference().
            self.disable_progress_bar_in_inference = True
            assert len(premise) == len(hypo), "Premise must has the same length with Hypothesis!"

            out_score = []
            for one_pre, one_hypo in tqdm(zip(premise, hypo), desc="Evaluating", total=len(premise), disable=(not self.verbose)):
                out_score.append(self.inference_per_example(one_pre, one_hypo))

            return torch.tensor(out_score)
| | |
        def inference_per_example(self, premise:str, hypo: str):
            """
            inference a example,
            premise: string
            hypo: string
            using self.inference to batch the process
            """
            def chunks(lst, n):
                """Yield successive n-sized chunks from lst."""
                for i in range(0, len(lst), n):
                    yield ' '.join(lst[i:i + n])

            # NOTE(review): sent_tokenize is called without language= here, so it
            # defaults to English models, unlike the "czech" used elsewhere — confirm.
            premise_sents = sent_tokenize(premise)
            premise_sents = premise_sents or ['']

            # Group premise sentences into roughly 350-word chunks.
            n_chunk = len(premise.strip().split()) // 350 + 1
            n_chunk = max(len(premise_sents) // n_chunk, 1)
            premise_sents = [each for each in chunks(premise_sents, n_chunk)]

            hypo_sents = sent_tokenize(hypo)

            # Cartesian product: every premise chunk paired with every hypo sentence.
            premise_sent_mat = []
            hypo_sents_mat = []
            for i in range(len(premise_sents)):
                for j in range(len(hypo_sents)):
                    premise_sent_mat.append(premise_sents[i])
                    hypo_sents_mat.append(hypo_sents[j])

            if self.nlg_eval_mode is not None:
                if self.nlg_eval_mode == 'nli_sp':
                    # ALIGNED column (index 1); max over premise chunks, mean over hypo sentences.
                    output_score = self.inference(premise_sent_mat, hypo_sents_mat)[:,1]
                    output_score = output_score.view(len(premise_sents), len(hypo_sents)).max(dim=0).values.mean().item()

                # NOTE(review): if nlg_eval_mode is set but not 'nli_sp', output_score
                # is unbound here and this return raises NameError — confirm intended.
                return output_score

            output_score = self.inference(premise_sent_mat, hypo_sents_mat)
            output_score = output_score.view(len(premise_sents), len(hypo_sents)).max(dim=0).values.mean().item()

            return output_score
| |
|
        def inference(self, premise, hypo, task_name = None):
            """
            inference a list of premise and hypo

            Standard aggregation
            """
            task_name = self.task_name if task_name is None else task_name
            if isinstance(premise, str) and isinstance(hypo, str):
                premise = [premise]
                hypo = [hypo]

            batch = self.batch_tokenize(premise, hypo)
            output_score = []

            # NOTE(review): disable_progress_bar_in_inference is only set by
            # inference_example_batch — calling inference() first would raise
            # AttributeError; confirm intended call order.
            for mini_batch in tqdm(batch, desc="Evaluating", disable=not self.verbose or self.disable_progress_bar_in_inference):
                mini_batch = mini_batch.to(self.device)
                with torch.no_grad():
                    model_output = self.model.forward(task_name=task_name, **mini_batch)
                    model_output = model_output.logits
                    # Regression head returns raw values; classification heads get softmax.
                    if task_name == "re":
                        model_output = model_output.cpu()
                    else:
                        model_output = self.softmax(model_output).cpu()
                output_score.append(model_output[:,:])

            output_score = torch.cat(output_score)

            if self.nlg_eval_mode is not None:
                if self.nlg_eval_mode == 'nli':
                    output_score_nli = output_score[:,1]
                    return output_score_nli
                elif self.nlg_eval_mode == 'bin':
                    return output_score
                elif self.nlg_eval_mode == 'reg':
                    return output_score
                else:
                    # NOTE(review): this ValueError is created but NOT raised. That is
                    # accidentally load-bearing: the default mode 'nli_sp' lands here
                    # and relies on falling through to the final return below, where
                    # the caller slices [:,1] itself. Adding `raise` would break it.
                    ValueError("unrecognized nlg eval mode")

            return output_score
| | |
| | def batch_tokenize(self, premise, hypo): |
| | """ |
| | input premise and hypos are lists |
| | """ |
| | assert isinstance(premise, list) and isinstance(hypo, list) |
| | assert len(premise) == len(hypo), "premise and hypo should be in the same length." |
| |
|
| | batch = [] |
| | for mini_batch_pre, mini_batch_hypo in zip(self.chunks(premise, self.batch_size), self.chunks(hypo, self.batch_size)): |
| | try: |
| | mini_batch = self.tokenizer(mini_batch_pre, mini_batch_hypo, truncation='only_first', padding='max_length', max_length=self.tokenizer.model_max_length, return_tensors='pt') |
| | except: |
| | print('text_b too long...') |
| | mini_batch = self.tokenizer(mini_batch_pre, mini_batch_hypo, truncation=True, padding='max_length', max_length=self.tokenizer.model_max_length, return_tensors='pt') |
| | batch.append(mini_batch) |
| |
|
| | return batch |
| | |
| | def chunks(self, lst, n): |
| | """Yield successive n-sized chunks from lst.""" |
| | for i in range(0, len(lst), n): |
| | yield lst[i:i + n] |
| | |
| | |
| |
|
| | if __name__ == "__main__": |
| | alignScore = AlignScoreCS.from_pretrained("krotima1/AlignScoreCS") |
| | alignScore.to("cuda" if torch.cuda.is_available() else "cpu") |
| | |
| | print("Tomáš miluje Zuzku!", "|", "Tomáš miluje Petru!",alignScore.score("Tomáš miluje Zuzku!", "Tomáš miluje Petru.")) |
| | print("Tomáš miluje Zuzku!", "|", "Tomáš miluje Zuzku!",alignScore.score("Tomáš miluje Zuzku!", "Tomáš miluje Zuzku!")) |
| | print("Tomáš miluje Zuzku.", "|", "Zuzka miluje Tomáše.",alignScore.score("Tomáš miluje Zuzku!", "Zuzka miluje Tomáše.")) |
| | print("Tomáš miluje Zuzku.", "|", "Zuzka nemiluje Tomáše.",alignScore.score("Tomáš miluje Zuzku!", "Zuzka nemiluje Tomáše.")) |
| | print("Tomáš miluje Zuzku.", "|", "Tomáš nemiluje Zuzku.",alignScore.score("Tomáš miluje Zuzku!", "Tomáš nemiluje Zuzku.")) |
| | print("Dva chlapi se perou.", "|", "Je tu bitka.",alignScore.score("Dva chlapi se perou.", "Je tu bitka.")) |
| | print("Dva chlapi se perou.", "|", "Je tu láska.",alignScore.score("Dva chlapi se perou.", "Je tu láska.")) |
| | print("Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta. \n Kdo nechal vystavět katedrálu?", "|", "Byl to Karel.",alignScore.score("Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta.\nKdo nechal vystavět katedrálu?", "Byl to Karel.")) |
| | print("Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta. \n Kdo nechal vystavět katedrálu?", "|", "Byl to Vít.",alignScore.score("Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta.\nKdo nechal vystavět katedrálu?", "Byl to Vít.")) |
| | print("Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta. \n Kdo nechal vystavět katedrálu?", "|", "Byla to katedrála.",alignScore.score("Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta.\nKdo nechal vystavět katedrálu?", "Byla to katedrála.")) |
| | print("Kdo je Karel IV.? Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta.", "|", "Je Otec.",alignScore.score("Kdo je Karel IV.? Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta.", "Je Otec.")) |
| | print("Kdo je Karel IV.? Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta.", "|", "Je Otec vlasti.",alignScore.score("Kdo je Karel IV.? Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta.", "Je Otec vlasti.")) |
| | print("Kdo je Karel IV.? Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta.", "|", "Je katedrála svatého Víta.",alignScore.score("Kdo je Karel IV.? Karel IV. je Otec vlasti. Nechal postavit katedrálu svatého Víta.", "Je katedrála svatého Víta.")) |
| | print("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "|", "Karkulka utekla vklovi.",alignScore.score("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "Karkulka utekla vklovi.")) |
| | print("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "|", "Karkulka neutekla vklovi.",alignScore.score("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "Karkulka neutekla vklovi.")) |
| | print("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "|", "Vlk snědl Karkulku.",alignScore.score("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "Vlk snědl karkulku.")) |
| | print("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "|", "Vlk nesnědl Karkulku.",alignScore.score("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "Vlk nesnědl karkulku.")) |
| | print("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "|", "Karkulka snědla vlka.",alignScore.score("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "Karkulka snědla vlka.")) |
| | print("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "|", "Karkulka dala vlkovi jablko.",alignScore.score("Karkulka šla do lesa. V lese potkala vlka. Vlk ji zkoušel sníst, ale Karkulka se nedala a Vlkovi utekla!", "Karkulka dala vlkovi jablko.")) |
| |
|