from transformers import AutoTokenizer,AutoModelForSequenceClassification
import torch.nn.functional as F
import numpy as np
import pandas as pd
from nltk.tokenize import sent_tokenize
import torch
import requests
import torch
import xgboost as xgb
class HallucinationPipeline:
    """Detect hallucinated sentences in summaries with NLI models and,
    optionally, ask an LLM to rewrite the flagged summaries.

    Label convention of the base NLI model (see ``self.labels``):
    index 0 = CONTRADICTION, 1 = NEUTRAL, 2 = ENTAILMENT.
    """
    def __init__(self,nli_model_name,device,llm_model=None):
        # NLI Models Setting: base (sentence-level) model used by Detection().
        self.tokenizer=AutoTokenizer.from_pretrained(nli_model_name)
        self.nli_model=AutoModelForSequenceClassification.from_pretrained(nli_model_name).to(device)
        # Index order matters downstream: prob[:,0] is read as contradiction
        # and prob[:,2] as entailment in Detection().
        self.labels=["CONTRADICTION","NEUTRAL","ENTAILMENT"]
        self.device=device
        # Overwritten per call in process(); records whether the arbiter ran.
        self.arbiter=False
        # Large NLI models whose scores feed the arbiter feature vector.
        self.roberta_large=BatchNLI("roberta-large-mnli",device=device)
        # self.roberta_large_ynie=BatchNLI("ynie/roberta-large-snli_mnli_fever_anli_R1_R2_R3-nli",device=device)
        self.roberta_large_ynie=BatchNLI("MoritzLaurer/DeBERTa-v3-large-mnli-fever-anli-ling-wanli",device=device)
        # self.roberta_large_ynie=BatchNLI("facebook/bart-large-mnli",device=device)
        # self.roberta_large_ynie=BatchNLI("microsoft/deberta-v2-xxlarge-mnli",device=device)
        # Correction models: optional object exposing .correct(premise, prompt)
        # (used by Correction) and .create(article) (used by fetchData).
        self.llm=llm_model
        # Arbiter Model: XGBoost classifier that fuses the NLI scores.
        # self.arbiter=torch.load("./Arbiter-dataset/best_model.pt")
        # self.arbiterModel=NeuralNetwork()
        self.arbiter_model=xgb.XGBClassifier()
        # NOTE(review): hard-coded relative path — loading fails unless the
        # process is started from the project root.
        self.arbiter_model.load_model("./Models/ArbiterModel/arbiter_xgboost_model.json")
        # Storing the results of the most recent process() call.
        self.detection_predicted_labels=[]
        self.detection_predicted_labels_after_correction=[]
        self.corrected_summary=[]
    # -------------------------------------------------------------
    # | FUNCTION 1 : Detecting the hallucinated parts using NLI. |
    # -------------------------------------------------------------
    def Detection(self,premise,hypothesis):
        """Score a batch of (premise, hypothesis) pairs with the base NLI model.

        premise / hypothesis: parallel lists of strings (one pair per row).
        Returns three parallel lists: predicted label indices (argmax),
        entailment probabilities (column 2) and contradiction probabilities
        (column 0).
        """
        inputs=self.tokenizer(premise,hypothesis,return_tensors="pt",truncation=True,padding=True)
        inputs = {key: val.to(self.device) for key, val in inputs.items()}
        with torch.no_grad():
            logit=self.nli_model(**inputs).logits
            prob=F.softmax(logit,dim=-1)
            prediction=torch.argmax(prob,dim=-1)
            entailment_score=prob[:,2]
            contra_score=prob[:,0]
        # print("Logits : ",logit)
        # print("Probability : \n",prob)
        # print("Label : ",[self.labels[i] for i in prediction])
        # print("\n")
        # return prediction.tolist()
        return prediction.cpu().tolist(),entailment_score.cpu().tolist(), contra_score.cpu().tolist()
    # ----------------------------------------------------------------------
    # | FUNCTION 2 : This involves using the LLM to correct the summaries. |
    # ----------------------------------------------------------------------
    def Correction(self,premise,prompt):
        """Delegate correction to the injected LLM.

        Raises AttributeError if the pipeline was built without llm_model.
        """
        correction=self.llm.correct(premise,prompt)
        return correction
    # ---------------------------------------------------------------------------
    # | FUNCTION 3 : This involves adding the Tag to the summary ... |
    # ---------------------------------------------------------------------------
    def addTags(self,sentences_list,sent_predicted_labels,size):
        """Rebuild the summary string, marking sentences labelled 0
        (contradiction) for the correction prompt.

        Mutates ``sentences_list`` in place. ``size`` is expected to equal
        len(sentences_list).
        NOTE(review): the f-string only wraps flagged sentences in spaces —
        any actual tag markup (e.g. "<hallucinated>") appears to be missing
        here; confirm what the LLM prompt expects.
        """
        for i in range(size):
            if sent_predicted_labels[i]==0:
                sentences_list[i]=f" {sentences_list[i]} "
        prompt_ready_summary=" ".join(sentences_list)
        return prompt_ready_summary
    # ------------------------------------------------------------------------------------------------------
    # | FUNCTION 4 : This function helps to chunk the article, because max token for base nli model is 256.|
    # ------------------------------------------------------------------------------------------------------
    def chunk_article(self,article,chunk_size=240,stride=128):
        """Split ``article`` into overlapping token windows and decode each
        window back to text.

        With stride < chunk_size consecutive chunks overlap by
        (chunk_size - stride) tokens, so no sentence is cut off at a single
        boundary. NOTE(review): tokenizer.encode presumably adds special
        tokens, which then get decoded into every chunk's text — confirm this
        is harmless for the downstream NLI models.
        """
        input_ids=self.tokenizer.encode(article)
        chunks=[]
        for i in range(0,len(input_ids),stride):
            new_token=input_ids[i:i+chunk_size]
            decoded_article=self.tokenizer.decode(new_token)
            chunks.append(decoded_article)
        return chunks
    # -------------------------------------------------------------------------------------------
    # | FUNCTION 5 : Decider neural network which combines the scores and give the final result |
    # -------------------------------------------------------------------------------------------
    # def arbiterNeuralNetwork(self,data):
    #     data=torch.Tensor(data).to(self.device)
    #     self.arbiterModel=self.arbiterModel.to(self.device)
    #     self.arbiterModel.load_state_dict(self.arbiter["model_state_dict"])
    #     results=self.arbiterModel(data)
    #     return (torch.argmax(results,dim=-1).tolist(),torch.mean(results[::,0]).item(),torch.mean(results[::,2]).item())
    def arbiterModel(self,data):
        """Run the XGBoost arbiter on the per-sentence feature matrix.

        The arbiter is binary: its class 1 is remapped to NLI label 2
        (entailment) and class 0 to label 0 (contradiction), so the output is
        compatible with the base-NLI label scheme used in process().
        Returns (labels, P(class 0), P(class 1)), one row per sentence.
        """
        result=self.arbiter_model.predict_proba(data)
        sent_pred_label=np.argmax(result,axis=1)
        sent_pred_label=np.where(sent_pred_label==1,2,0)
        return sent_pred_label,result[::,0],result[::,1]
    # --------------------------------------------------
    # | FUNCTION 6 : Integrated all the above methods. |
    # --------------------------------------------------
    # Input : DataFrame(Premise, Hypothesis, labels[Optional])
    def process(self,df,correct_the_summary=False,arbiter=False):
        """Detect (and optionally correct) hallucinations for every
        (premise, hypothesis) row of ``df``.

        correct_the_summary: when True, flagged summaries are rewritten via
        the injected LLM; clean summaries get a None placeholder.
        arbiter: when True, per-sentence scores from all three NLI models are
        fused by the XGBoost arbiter instead of using the base model alone.
        Returns a dict of per-summary predictions, scores and (optionally)
        corrected summaries.
        """
        # Empty previous outputs.
        self.arbiter=arbiter
        self.detection_predicted_labels=[]
        self.detection_predicted_labels_after_correction=[]
        self.corrected_summary=[]
        sentence_predicted_labels=[]
        summary_factual_score=[]
        summary_contradiction_score=[]
        # NOTE(review): never appended to below (the append is commented out),
        # so output["features"] is always an empty list.
        all_features=[]
        count=1
        # Starting the Pipeline of Detection and Correction.
        # NOTE(review): each row is unpacked into exactly (premise, hypo); a
        # DataFrame carrying the optional labels column mentioned above would
        # raise ValueError here — confirm expected input shape.
        for premise,hypo in np.array(df):
            # | Step 1 |: Splitting and detecting each sentence.
            splitting_sentences= sent_tokenize(hypo)
            size_of_sentence=len(splitting_sentences)
            # Long articles are scored chunk-by-chunk, then aggregated.
            chunks=self.chunk_article(premise,chunk_size=384,stride=256)
            chunk_sent_pred=[]
            chunk_sent_score=[]
            chunk_contra_score=[]
            chunk_roberta_large_entail=[]
            chunk_roberta_large_neutral=[]
            chunk_roberta_large_contra=[]
            chunk_roberta_large_ynie_contra=[]
            chunk_roberta_large_ynie_neutral=[]
            chunk_roberta_large_ynie_entail=[]
            for article_part in chunks:
                # Pair every summary sentence with the current article chunk.
                premises=[article_part]*size_of_sentence
                # NLI Model output.
                sent_predicted_labels,sent_predicted_scores, sent_contra_scores= self.Detection(premises, splitting_sentences)
                chunk_sent_pred.append(sent_predicted_labels)
                chunk_sent_score.append(sent_predicted_scores)
                chunk_contra_score.append(sent_contra_scores)
                # Other metrics score (arbiter features only).
                if arbiter:
                    # rouge,entity_overlap,sbert,tfidf=self.metrics.calculate_all(premises,splitting_sentences)
                    contra,neutral,entail=self.roberta_large.predict(premises,splitting_sentences)
                    chunk_roberta_large_contra.append(contra)
                    chunk_roberta_large_neutral.append(neutral)
                    chunk_roberta_large_entail.append(entail)
                    # NOTE(review): this unpack is deliberately reversed vs.
                    # BatchNLI.predict's (contra, neutral, entail) return —
                    # presumably because this DeBERTa checkpoint's label order
                    # is entailment/neutral/contradiction, the opposite of
                    # roberta-large-mnli. Confirm against the model card.
                    entail,neutral,contra=self.roberta_large_ynie.predict(premises,splitting_sentences)
                    chunk_roberta_large_ynie_contra.append(contra)
                    chunk_roberta_large_ynie_neutral.append(neutral)
                    chunk_roberta_large_ynie_entail.append(entail)
            # Stacking scores of all chunks. [Columns: Chunks, Row:summary_sentences]
            chunk_sent_pred=np.stack(chunk_sent_pred,axis=1)
            chunk_sent_score=np.stack(chunk_sent_score,axis=1)
            chunk_contra_score=np.stack(chunk_contra_score,axis=1)
            # Aggregate across chunks: max keeps the most favourable label /
            # entailment evidence from any chunk; min keeps the weakest
            # contradiction signal.
            sent_predicted_labels=np.max(chunk_sent_pred,axis=1)
            factual_score=np.max(chunk_sent_score,axis=1)
            contra_score=np.min(chunk_contra_score,axis=1)
            # Taking the score which gives more information.
            if arbiter:
                chunk_roberta_large_entail = np.stack(chunk_roberta_large_entail, axis=1)
                chunk_roberta_large_neutral = np.stack(chunk_roberta_large_neutral, axis=1)
                chunk_roberta_large_contra = np.stack(chunk_roberta_large_contra, axis=1)
                # DeBERTa
                chunk_roberta_large_ynie_entail = np.stack(chunk_roberta_large_ynie_entail, axis=1)
                chunk_roberta_large_ynie_neutral = np.stack(chunk_roberta_large_ynie_neutral, axis=1)
                chunk_roberta_large_ynie_contra = np.stack(chunk_roberta_large_ynie_contra, axis=1)
                # Per-class chunk aggregation: min contradiction, median
                # neutral, max entailment.
                roberta_large_contra=np.min(chunk_roberta_large_contra,axis=1)
                roberta_large_neutral=np.median(chunk_roberta_large_neutral,axis=1)
                roberta_large_entail=np.max(chunk_roberta_large_entail,axis=1)
                roberta_large_ynie_contra=np.min(chunk_roberta_large_ynie_contra,axis=1)
                roberta_large_ynie_neutral=np.median(chunk_roberta_large_ynie_neutral,axis=1)
                roberta_large_ynie_entail=np.max(chunk_roberta_large_ynie_entail,axis=1)
                # COMBINING FEATURES.
                # NOTE: column order must match the feature order the XGBoost
                # arbiter was trained with — do not reorder.
                features=np.stack([factual_score,contra_score,
                                   roberta_large_contra,roberta_large_neutral,roberta_large_entail,
                                   roberta_large_ynie_contra,roberta_large_ynie_neutral,roberta_large_ynie_entail
                                   ],
                                  axis=1)
                # all_features.append(features)
                # Arbiter output overrides the base-model labels/scores.
                sent_predicted_labels,contra_score,factual_score=self.arbiterModel(features)
                # sent_predicted_labels,contra_score,factual_score=self.arbiterNeuralNetwork(features)
            # summary_factual_score.append(np.mean(factual_score))
            # summary_contradiction_score.append(np.mean(contra_score))
            # Summary-level scores are the means of the per-sentence scores.
            summary_factual_score.append(np.mean(factual_score))
            summary_contradiction_score.append(np.mean(contra_score))
            # Label whether summary is factually correct or not: a single
            # contradicted sentence (label 0) marks the whole summary.
            # SummaryPrediction=2 if factual_score>contra_score else 0
            SummaryPrediction=0 if 0 in sent_predicted_labels else 2
            sentence_predicted_labels.append(sent_predicted_labels)
            self.detection_predicted_labels.append(int(SummaryPrediction))
            # | Step 2 |: Correction of the Summary using LLMs, if parameter correct_the_summary=True.
            if correct_the_summary:
                if SummaryPrediction<=1:
                    prompt=self.addTags(splitting_sentences,sent_predicted_labels,size_of_sentence)
                    corrected_result=self.Correction(premise,prompt)
                    self.corrected_summary.append(corrected_result)
                else:
                    # Keep positions aligned with the input rows.
                    self.corrected_summary.append(None)
            print("Detected : ",count)
            count+=1
        output={
            "predictions":self.detection_predicted_labels,
            "corrected_summary":self.corrected_summary,
            "sent_predicted":sentence_predicted_labels,
            "factual_score":summary_factual_score,
            "contradiction_score":summary_contradiction_score,
            "features":all_features
            }
        return output
    # ---------------------------------------------------------
    # | Optional FUNCTION : Integrated all the above methods. |
    # ---------------------------------------------------------
    def fetchData(self,url,depth=[],articleFieldName=None,summaryFieldName=None,limit=None):
        """Fetch records from a JSON endpoint, score them with process(), and
        return a DataFrame with columns Article/Summary/Prediction/Factuality.

        depth: sequence of keys to walk down into the JSON payload before the
        record list. limit: stop after this many article-bearing records.
        Records missing a summary get one generated by the injected LLM.
        Returns None on any failure (broad except below).
        NOTE(review): mutable default ``depth=[]`` — it is only iterated here,
        but a None default would be the safer idiom.
        """
        try:
            data=requests.get(url)
            data=data.json()
            article=[]
            summary=[]
            count=limit
            # Walk nested keys down to the record container.
            for field in depth:
                data=data[field]
            # Normalise a single record to a one-element list.
            if type(data)!=list:
                data=[data]
            for index,i in enumerate(data):
                print("Index : ",index)
                # Records with a missing/empty article field are skipped and
                # do not count against the limit.
                if i.get(articleFieldName):
                    article.append(str(i[articleFieldName]))
                    if summaryFieldName and i[summaryFieldName]:
                        summary.append(str(i[summaryFieldName]))
                    else:
                        # Create Summary and append via the LLM.
                        result=self.llm.create(str(i[articleFieldName]))
                        summary.append(result)
                    if count:
                        count-=1
                        if count<1:
                            break
            # Detection and getting Factual Score.
            data=np.column_stack([article,summary])
            result=self.process(data,correct_the_summary=False)
            data=np.column_stack([data,result["predictions"],result["factual_score"]])
            df=pd.DataFrame(data,columns=["Article","Summary","Prediction","Factuality"])
            return df
        except Exception as e:
            # NOTE(review): broad catch also hides scoring errors, not just
            # network failures; caller only sees None either way.
            print("Failed to fetch data:", e)
            return None
class BatchNLI:
    """Minimal batch wrapper around a 3-way NLI sequence-classification model.

    predict() returns softmax probabilities split per class, in the column
    order of the underlying checkpoint: (column 0, column 1, column 2).
    """
    def __init__(self, model_name, device="cpu"):
        self.device = device
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForSequenceClassification.from_pretrained(model_name,num_labels=3,use_safetensors=True)
        self.model.to(self.device)
        # Inference only — disable dropout etc.
        self.model.eval()
    def predict(self, premises, hypotheses):
        """Score parallel lists of premises/hypotheses in one forward pass.

        Returns three parallel lists of floats: probabilities for class
        columns 0, 1 and 2 respectively.
        """
        # Tokenize the whole batch in a single call, then move it to the
        # model's device.
        encoded = self.tokenizer(premises, hypotheses, return_tensors="pt", padding=True, truncation=True)
        encoded = {name: tensor.to(self.device) for name, tensor in encoded.items()}
        # Single no-grad forward pass followed by a softmax over classes.
        with torch.no_grad():
            scores = F.softmax(self.model(**encoded).logits, dim=-1)
        # Split the probability matrix column-wise into plain Python lists.
        per_class = [scores[:, column].cpu().tolist() for column in (0, 1, 2)]
        return per_class[0], per_class[1], per_class[2]