Spaces:
Sleeping
Sleeping
File size: 5,256 Bytes
import random
import math
import json
import numpy as np
import os
from huggingface_hub import HfApi
from constants import HF_DATASET_REPO_NAME, HF_REPO_TYPE
# Model identifiers this module lists as supported.
# Trailing comments keep the alternative identifiers noted next to each entry
# (presumably earlier names for the same slot -- confirm before relying on them).
supported_models = [
    "llama-3.1-8b-instant",  # "llama3-8b-8192"
    # "llama-3.3-70b-versatile",  # "llama3-70b-8192"
    "gemma2-9b-it",  # "gemma-7b-it"
    "deepseek-r1-distill-llama-70b",  # "DeepSeek-R1-distill-llama-70b"
    "qwen/qwen3-32b",
]
def processdata(instance, noise_rate, passage_num, filename, correct_rate=0):
    """Build the shuffled passage list for one sample.

    The selection strategy depends on substrings of ``filename``:

    * ``'_int'``: ``instance['positive']`` is treated as a list of passage
      groups. One randomly chosen passage is taken from every group; if that
      is fewer than the positive quota, further passages are taken from the
      groups in round-robin order until the quota is met. Any slots left up
      to ``passage_num`` are filled with the leading negatives.
    * ``'_fact'``: passages are drawn from ``instance['positive_wrong']`` at
      randomly sampled indices, then up to ``ceil(passage_num * correct_rate)``
      passages from ``instance['positive']`` at *other* indices, then the
      leading negatives.
    * otherwise: the leading ``pos_num`` positives and ``neg_num`` negatives
      are used, after rebalancing the two counts when one pool is too small.

    Args:
        instance: Mapping with keys ``'query'``, ``'answer'``, ``'positive'``,
            ``'negative'`` (and ``'positive_wrong'`` for ``'_fact'`` files).
            It is not modified.
        noise_rate: Fraction of ``passage_num`` reserved for negatives
            (rounded up).
        passage_num: Target number of passages.
        filename: Dataset file name; only checked for ``'_int'`` / ``'_fact'``.
        correct_rate: Fraction of ``passage_num`` reserved for correct
            passages in the ``'_fact'`` branch (rounded up).

    Returns:
        A ``(query, answer, docs)`` tuple where ``docs`` is a new, randomly
        shuffled list.

    Raises:
        ValueError: In the ``'_fact'`` branch when the negative and correct
            quotas together exceed ``passage_num`` (``random.sample`` rejects
            the resulting negative count).
    """
    query = instance['query']
    ans = instance['answer']
    neg_num = math.ceil(passage_num * noise_rate)
    pos_num = passage_num - neg_num

    if '_int' in filename:
        # Shuffle copies of the groups so the caller's data is left untouched.
        groups = [list(group) for group in instance['positive']]
        for group in groups:
            random.shuffle(group)
        docs = [group[0] for group in groups]
        if len(docs) < pos_num:
            # default=0 keeps an instance without positive groups from raising.
            maxnum = max((len(group) for group in groups), default=0)
            # Round-robin over the groups, one extra passage per pass.
            for depth in range(1, maxnum):
                for group in groups:
                    if len(group) > depth:
                        docs.append(group[depth])
                        if len(docs) == pos_num:
                            break
                if len(docs) == pos_num:
                    break
        # Whatever the positives did not fill is topped up with negatives.
        neg_num = passage_num - len(docs)
        if neg_num > 0:
            docs += instance['negative'][:neg_num]
    elif '_fact' in filename:
        correct_num = math.ceil(passage_num * correct_rate)
        pos_num = passage_num - neg_num - correct_num
        # NOTE(review): indices are taken from 'positive' but used on
        # 'positive_wrong' -- assumes both lists are index-aligned; confirm.
        indexs = list(range(len(instance['positive'])))
        selected = random.sample(indexs, min(len(indexs), pos_num))
        docs = [instance['positive_wrong'][i] for i in selected]
        chosen = set(selected)
        remain = [i for i in indexs if i not in chosen]
        if correct_num > 0 and len(remain) > 0:
            picked = random.sample(remain, min(len(remain), correct_num))
            docs += [instance['positive'][i] for i in picked]
        if neg_num > 0:
            docs += instance['negative'][:neg_num]
    else:
        if noise_rate == 1:
            neg_num = passage_num
            pos_num = 0
        elif neg_num > len(instance['negative']):
            # Not enough negatives: give the spare slots to positives.
            neg_num = len(instance['negative'])
            pos_num = passage_num - neg_num
        elif pos_num > len(instance['positive']):
            # Not enough positives: give the spare slots to negatives.
            pos_num = len(instance['positive'])
            neg_num = passage_num - pos_num
        positive = instance['positive'][:pos_num]
        negative = instance['negative'][:neg_num]
        docs = positive + negative

    random.shuffle(docs)
    return query, ans, docs
def checkanswer(prediction, ground_truth):
    """Mark which expected answers occur in the prediction.

    Matching is a case-insensitive substring test. ``ground_truth`` may be a
    single answer or a list of answers; an answer that is itself a list is a
    set of alternatives and counts as found when any one of them occurs.

    Returns:
        One int per answer: 1 if it was found in ``prediction``, else 0.
    """
    haystack = prediction.lower()
    targets = ground_truth if type(ground_truth) is list else [ground_truth]

    labels = []
    for target in targets:
        if type(target) is list:
            # Lower-case every alternative up front, then accept any hit.
            alternatives = [alt.lower() for alt in target]
            found = any(alt in haystack for alt in alternatives)
        else:
            found = target.lower() in haystack
        labels.append(1 if found else 0)
    return labels
def getevalue(results):
    """Report whether no position has 0 as its best label.

    ``results`` is converted to an array and reduced with an element-wise
    maximum along axis 0 (presumably one row of labels per attempt -- confirm
    against the caller).

    Returns:
        True if 0 does not appear among the per-position maxima, else False.
    """
    best_per_position = np.max(np.array(results), axis=0)
    return 0 not in best_per_position
def predict(query, ground_truth, docs, model, system, instruction, temperature, dataset):
    """Query the model for one sample and score its reply.

    The prompt is ``instruction`` formatted with ``QUERY`` and ``DOCS`` (the
    passages joined by newlines). With passages the model is called as
    ``generate(prompt, temperature, system)``; without passages ``DOCS`` is
    empty and ``system`` is not passed. When ``'zh'`` occurs in ``dataset``
    all spaces are stripped from the reply before scoring.

    Returns:
        A ``(labels, prediction, factlabel)`` tuple:
        ``labels`` is ``[-1]`` when the reply contains one of the
        "insufficient information" marker phrases, otherwise the per-answer
        0/1 list produced by ``checkanswer``;
        ``prediction`` is the (possibly space-stripped) reply;
        ``factlabel`` is 1 when the reply contains one of the
        "factual errors" marker phrases, else 0.
    """
    if len(docs) > 0:
        prompt = instruction.format(QUERY=query, DOCS='\n'.join(docs))
        prediction = model.generate(prompt, temperature, system)
    else:
        prompt = instruction.format(QUERY=query, DOCS='')
        prediction = model.generate(prompt, temperature)

    if 'zh' in dataset:
        prediction = prediction.replace(" ", "")

    declined = '信息不足' in prediction or 'insufficient information' in prediction
    labels = [-1] if declined else checkanswer(prediction, ground_truth)

    flagged = '事实性错误' in prediction or 'factual errors' in prediction
    factlabel = 1 if flagged else 0

    return labels, prediction, factlabel
def upload_file(filename: str, folder_path: str) -> "bool | None":
    """Upload a local file to the configured Hugging Face Hub repository.

    The file is read from ``filename`` exactly as given; ``folder_path`` is
    not used to locate it on disk, only to build the destination path
    ``{folder_path}/{filename}`` inside the repo. The access token is taken
    from the ``HF_TOKEN`` environment variable.

    Args:
        filename: Local path of the file to upload; also the trailing part of
            its path inside the repo.
        folder_path: Folder inside the repo under which the file is stored.

    Returns:
        True when the upload completes, None when any exception is raised
        (the error is printed rather than propagated).
    """
    try:
        api = HfApi()
        api.upload_file(
            path_or_fileobj=filename,
            path_in_repo=f"{folder_path}/{filename}",
            repo_id=HF_DATASET_REPO_NAME,
            repo_type=HF_REPO_TYPE,
            token=os.getenv("HF_TOKEN"),
        )
        print(f"Uploaded {filename} to {HF_DATASET_REPO_NAME}")
        return True
    except Exception as e:
        # Best-effort upload: report the failure and let the caller carry on.
        print(f"Error uploading {filename}: {e}")
        return None