# RGB26Demo / utils.py
# Ajaykanth Maddi
# Code Changes - Counterfactual Implementation
# ade9487
import json
import math
import os
import random
from typing import Optional

import numpy as np
from huggingface_hub import HfApi

from constants import HF_DATASET_REPO_NAME, HF_REPO_TYPE
# Model identifiers this module declares as supported.  The trailing comments
# keep the alternative identifiers noted next to each entry; the fully
# commented-out line is an entry that is currently disabled.
supported_models = [
    "llama-3.1-8b-instant",  # "llama3-8b-8192"
    # "llama-3.3-70b-versatile",  # "llama3-70b-8192"
    "gemma2-9b-it",  # "gemma-7b-it"
    "deepseek-r1-distill-llama-70b",  # "DeepSeek-R1-distill-llama-70b"
    "qwen/qwen3-32b",
]
def _integration_docs(instance, passage_num, pos_num):
    """Pick documents for an '_int' file, where 'positive' is a list of groups."""
    # Shuffle copies: the caller's instance must not be reordered in place.
    groups = [list(group) for group in instance['positive']]
    for group in groups:
        random.shuffle(group)

    # Every non-empty group contributes its first passage, even when that
    # already exceeds pos_num.
    docs = [group[0] for group in groups if group]

    # Top up round-robin (second passage of each group, then third, ...)
    # until pos_num positives have been collected or the groups run dry.
    deepest = max((len(group) for group in groups), default=0)
    depth = 1
    while len(docs) < pos_num and depth < deepest:
        for group in groups:
            if len(group) > depth:
                docs.append(group[depth])
                if len(docs) == pos_num:
                    break
        depth += 1

    # Whatever room is left in the budget is filled with negative passages.
    neg_num = passage_num - len(docs)
    if neg_num > 0:
        docs += instance['negative'][:neg_num]
    return docs


def _counterfactual_docs(instance, passage_num, neg_num, correct_rate):
    """Pick documents for a '_fact' file (wrong, correct and negative passages)."""
    correct_num = math.ceil(passage_num * correct_rate)
    # noise_rate + correct_rate may exceed 1; a negative count would make
    # random.sample raise, so no counterfactual passage is drawn in that case.
    # The correct and negative counts are left untouched, so the result can
    # then hold more than passage_num documents.
    wrong_num = max(0, passage_num - neg_num - correct_num)

    indexes = list(range(len(instance['positive'])))
    chosen = random.sample(indexes, min(len(indexes), wrong_num))
    docs = [instance['positive_wrong'][i] for i in chosen]

    # Correct passages are drawn only from indexes not used for a wrong one.
    chosen_set = set(chosen)
    remaining = [i for i in indexes if i not in chosen_set]
    if correct_num > 0 and remaining:
        picks = random.sample(remaining, min(len(remaining), correct_num))
        docs += [instance['positive'][i] for i in picks]

    if neg_num > 0:
        docs += instance['negative'][:neg_num]
    return docs


def _plain_docs(instance, noise_rate, passage_num, neg_num, pos_num):
    """Pick documents for any other file: leading positives plus leading negatives."""
    if noise_rate == 1:
        neg_num, pos_num = passage_num, 0
    elif neg_num > len(instance['negative']):
        # Not enough negatives: give the unused slots to positives.
        neg_num = len(instance['negative'])
        pos_num = passage_num - neg_num
    elif pos_num > len(instance['positive']):
        # Not enough positives: give the unused slots to negatives.
        pos_num = len(instance['positive'])
        neg_num = passage_num - pos_num
    return instance['positive'][:pos_num] + instance['negative'][:neg_num]


def processdata(instance, noise_rate, passage_num, filename, correct_rate=0):
    """Build the shuffled document list for one benchmark instance.

    The selection strategy depends on substrings of ``filename``:

    * ``'_int'``  - ``instance['positive']`` is a list of passage groups; one
      passage per group is always taken, more are added round-robin up to the
      positive budget, and negatives fill the rest.
    * ``'_fact'`` - passages are drawn from ``instance['positive_wrong']``,
      ``instance['positive']`` (share given by ``correct_rate``) and
      ``instance['negative']``.
    * otherwise   - the leading positives and negatives are taken, with the
      split adjusted when one of the two lists is too short.

    Args:
        instance: Mapping with at least 'query', 'answer', 'positive' and
            'negative' ('positive_wrong' too for '_fact' files).
        noise_rate: Fraction of ``passage_num`` reserved for negatives
            (rounded up).
        passage_num: Total number of passages requested.
        filename: Dataset file name, used only to pick the strategy.
        correct_rate: Fraction of ``passage_num`` reserved for correct
            passages in '_fact' files (rounded up).

    Returns:
        Tuple ``(query, answer, docs)`` where ``docs`` is shuffled with the
        module-level ``random`` generator. ``instance`` is not modified.
    """
    query = instance['query']
    ans = instance['answer']

    neg_num = math.ceil(passage_num * noise_rate)
    pos_num = passage_num - neg_num

    if '_int' in filename:
        docs = _integration_docs(instance, passage_num, pos_num)
    elif '_fact' in filename:
        docs = _counterfactual_docs(instance, passage_num, neg_num, correct_rate)
    else:
        docs = _plain_docs(instance, noise_rate, passage_num, neg_num, pos_num)

    random.shuffle(docs)
    return query, ans, docs
def checkanswer(prediction, ground_truth):
    """Check which expected answers occur in a model prediction.

    Matching is a case-insensitive substring test.

    Args:
        prediction: Text produced by the model.
        ground_truth: A single answer or a list of answers.  An element that
            is itself a list is a set of acceptable alternatives and counts
            as found when any one of them occurs.  Non-string answers are
            converted with ``str()`` before matching.

    Returns:
        A list with one int per expected answer: 1 if found, 0 otherwise.
    """
    haystack = prediction.lower()
    if not isinstance(ground_truth, list):
        ground_truth = [ground_truth]

    labels = []
    for answer in ground_truth:
        if isinstance(answer, list):
            # Alternatives: one hit is enough; an empty list never matches.
            found = any(str(alt).lower() in haystack for alt in answer)
        else:
            found = str(answer).lower() in haystack
        labels.append(int(found))
    return labels
def getevalue(results):
    """Tell whether every label column is non-zero in at least one row.

    Args:
        results: A sequence of equally long label rows (for example the lists
            returned by ``checkanswer``).  A single flat row is accepted too
            and treated as a one-row table.

    Returns:
        True when the column-wise maximum contains no 0, False otherwise.

    Raises:
        ValueError: If ``results`` contains no rows.
    """
    table = np.array(results)
    if table.ndim < 2:
        if table.size == 0:
            raise ValueError("getevalue() needs at least one row of labels")
        # A flat row would reduce to a scalar below, which does not support
        # the membership test; promote it to a one-row table instead.
        table = np.atleast_2d(table)

    column_best = np.max(table, axis=0)
    return 0 not in column_best
def predict(query, ground_truth, docs, model, system, instruction, temperature, dataset):
    """Query the model for one instance and score its reply.

    Args:
        query: Question text substituted for ``{QUERY}`` in ``instruction``.
        ground_truth: Expected answer(s), forwarded to ``checkanswer``.
        docs: Passages joined with newlines and substituted for ``{DOCS}``.
            When empty or None, ``{DOCS}`` is left empty and ``system`` is
            not passed to the model.
        model: Object exposing ``generate(text, temperature[, system])``.
        system: System prompt, used only when ``docs`` is non-empty.
        instruction: Template with ``{QUERY}`` and ``{DOCS}`` placeholders.
        temperature: Sampling temperature forwarded to ``model.generate``.
        dataset: Dataset name; when it contains 'zh', spaces are stripped
            from the reply before scoring.

    Returns:
        Tuple ``(labels, prediction, factlabel)``:

        * ``labels`` - ``[-1]`` when the reply contains an
          insufficient-information marker, otherwise the per-answer list from
          ``checkanswer`` (1 = answer found, 0 = not found).
        * ``prediction`` - the (possibly space-stripped) model reply.
        * ``factlabel`` - 1 when the reply contains a factual-error marker,
          else 0.
    """
    if docs:
        text = instruction.format(QUERY=query, DOCS='\n'.join(docs))
        prediction = model.generate(text, temperature, system)
    else:
        text = instruction.format(QUERY=query, DOCS='')
        prediction = model.generate(text, temperature)

    if 'zh' in dataset:
        prediction = prediction.replace(" ", "")

    # Marker checks are case-sensitive, unlike the answer matching.
    if '信息不足' in prediction or 'insufficient information' in prediction:
        labels = [-1]
    else:
        labels = checkanswer(prediction, ground_truth)

    if '事实性错误' in prediction or 'factual errors' in prediction:
        factlabel = 1
    else:
        factlabel = 0

    return labels, prediction, factlabel
def upload_file(filename: str, folder_path: str) -> Optional[bool]:
    """Upload a local file to the configured Hugging Face repository.

    The file is read from ``filename`` as given and stored in the repository
    at ``<folder_path>/<filename>``.  The access token is taken from the
    ``HF_TOKEN`` environment variable.

    Args:
        filename: Path of the local file to upload; it is also used verbatim
            as the trailing part of the destination path.
        folder_path: Folder inside the repository that receives the file.

    Returns:
        True when the upload succeeded, None when any exception occurred
        (the error is printed, not raised).
    """
    try:
        api = HfApi()
        api.upload_file(
            path_or_fileobj=filename,
            path_in_repo=f"{folder_path}/{filename}",
            repo_id=HF_DATASET_REPO_NAME,
            repo_type=HF_REPO_TYPE,
            token=os.getenv("HF_TOKEN"),
        )
    except Exception as e:
        # Deliberately best-effort: a failed upload is reported and signalled
        # through the return value instead of propagating to the caller.
        print(f"Error uploading {filename}: {e}")
        return None
    else:
        print(f"Uploaded {filename} to {HF_DATASET_REPO_NAME}")
        return True