"""Helpers for selecting documents for a query, scoring predictions against
reference answers, and uploading files to the Hugging Face Hub."""

import json
import math
import os
import random
from typing import Optional

import numpy as np
from huggingface_hub import HfApi

from constants import HF_DATASET_REPO_NAME, HF_REPO_TYPE

# Model identifiers that are currently enabled; disabled ones are kept
# commented out for reference.
supported_models = [
    "llama-3.1-8b-instant",
    # "llama3-8b-8192",
    # "llama-3.3-70b-versatile",
    # "llama3-70b-8192",
    "gemma2-9b-it",
    # "gemma-7b-it",
    "deepseek-r1-distill-llama-70b",
    # "DeepSeek‑R1‑distill‑llama‑70b",
    "qwen/qwen3-32b"
]


def processdata(instance, noise_rate, passage_num, filename, correct_rate=0):
    """Pick a shuffled list of documents for one dataset instance.

    Args:
        instance: Mapping with at least ``'query'``, ``'answer'``,
            ``'positive'`` and ``'negative'`` entries (plus
            ``'positive_wrong'`` when ``filename`` contains ``'_fact'``).
        noise_rate: Fraction of ``passage_num`` filled from
            ``instance['negative']`` (rounded up).
        passage_num: Target number of documents.
        filename: Dataset file name; the substrings ``'_int'`` and ``'_fact'``
            select the sampling strategy.
        correct_rate: Only used for ``'_fact'`` files; fraction of
            ``passage_num`` filled from ``instance['positive']`` (rounded up).

    Returns:
        A ``(query, answer, docs)`` tuple, where ``docs`` is randomly shuffled.
    """
    query = instance['query']
    ans = instance['answer']

    neg_num = math.ceil(passage_num * noise_rate)
    pos_num = passage_num - neg_num

    if '_int' in filename:
        # Each entry of instance['positive'] is itself a list of documents.
        # Shuffle copies so the caller's data is not mutated in place.
        groups = [list(group) for group in instance['positive']]
        for group in groups:
            random.shuffle(group)

        # Always take the first document of every group ...
        docs = [group[0] for group in groups]
        if len(docs) < pos_num:
            # ... then top up round-robin with deeper documents until the
            # positive quota is reached or the groups are exhausted.
            max_len = max((len(group) for group in groups), default=0)
            for depth in range(1, max_len):
                for group in groups:
                    if len(group) > depth:
                        docs.append(group[depth])
                        if len(docs) == pos_num:
                            break
                if len(docs) == pos_num:
                    break

        # Whatever room is left is filled with negatives.
        neg_num = passage_num - len(docs)
        if neg_num > 0:
            docs += instance['negative'][:neg_num]
    elif '_fact' in filename:
        correct_num = math.ceil(passage_num * correct_rate)
        pos_num = passage_num - neg_num - correct_num
        indexs = list(range(len(instance['positive'])))
        # Both shares are rounded up, so pos_num can become negative; clamp
        # it because random.sample rejects a negative sample size.
        sample_size = max(0, min(len(indexs), pos_num))
        selected = random.sample(indexs, sample_size)
        docs = [instance['positive_wrong'][i] for i in selected]

        # Correct documents are drawn only from indices not already used for
        # a 'positive_wrong' document.
        selected_set = set(selected)
        remain = [i for i in indexs if i not in selected_set]
        if correct_num > 0 and len(remain) > 0:
            chosen = random.sample(remain, min(len(remain), correct_num))
            docs += [instance['positive'][i] for i in chosen]
        if neg_num > 0:
            docs += instance['negative'][:neg_num]
    else:
        if noise_rate == 1:
            neg_num = passage_num
            pos_num = 0
        else:
            # If one pool is too small, give the shortfall to the other pool.
            if neg_num > len(instance['negative']):
                neg_num = len(instance['negative'])
                pos_num = passage_num - neg_num
            elif pos_num > len(instance['positive']):
                pos_num = len(instance['positive'])
                neg_num = passage_num - pos_num

        positive = instance['positive'][:pos_num]
        negative = instance['negative'][:neg_num]

        docs = positive + negative

    random.shuffle(docs)

    return query, ans, docs


def checkanswer(prediction, ground_truth):
    """Check which reference answers occur in ``prediction``.

    Matching is a case-insensitive substring test.

    Args:
        prediction: Model output text.
        ground_truth: A single answer string or a list of answers. A list
            element may itself be a list of alternatives, in which case any
            one alternative counts as a match.

    Returns:
        A list with one int per answer: 1 if it was found, 0 otherwise.
    """
    prediction = prediction.lower()
    if not isinstance(ground_truth, list):
        ground_truth = [ground_truth]

    labels = []
    for instance in ground_truth:
        if isinstance(instance, list):
            found = any(alt.lower() in prediction for alt in instance)
        else:
            found = instance.lower() in prediction
        labels.append(int(found))
    return labels


def getevalue(results):
    """Return True when no column of ``results`` has a maximum equal to 0.

    ``results`` is converted to a NumPy array and reduced with ``max`` along
    axis 0; the outcome is False as soon as any reduced value equals 0.
    """
    results = np.array(results)
    results = np.max(results, axis=0)
    return 0 not in results


def predict(query, ground_truth, docs, model, system, instruction, temperature, dataset):
    """Query ``model`` and score its answer.

    Args:
        query: Question text, substituted for ``QUERY`` in ``instruction``.
        ground_truth: Reference answer(s), passed to ``checkanswer``.
        docs: List of document strings; joined with newlines and substituted
            for ``DOCS`` in ``instruction``.
        model: Object exposing ``generate(text, temperature[, system])``.
        system: System prompt; only passed to the model when ``docs`` is
            non-empty.
        instruction: Format string with ``QUERY`` and ``DOCS`` fields.
        temperature: Sampling temperature forwarded to ``model.generate``.
        dataset: Dataset name; if it contains ``'zh'`` all spaces are removed
            from the prediction before scoring.

    Returns:
        ``(labels, prediction, factlabel)`` where ``labels`` is ``[-1]`` if
        the prediction states that information is insufficient, otherwise the
        per-answer 1/0 list from ``checkanswer``; ``factlabel`` is 1 if the
        prediction mentions factual errors, else 0.
    """
    if not docs:
        text = instruction.format(QUERY=query, DOCS='')
        prediction = model.generate(text, temperature)
    else:
        docs = '\n'.join(docs)
        text = instruction.format(QUERY=query, DOCS=docs)
        prediction = model.generate(text, temperature, system)

    if 'zh' in dataset:
        prediction = prediction.replace(" ", "")

    if '信息不足' in prediction or 'insufficient information' in prediction:
        labels = [-1]
    else:
        labels = checkanswer(prediction, ground_truth)

    factlabel = 0
    if '事实性错误' in prediction or 'factual errors' in prediction:
        factlabel = 1

    return labels, prediction, factlabel


def upload_file(filename: str, folder_path: str) -> Optional[bool]:
    """Upload a local file to the Hugging Face Hub repository.

    The file is read from ``filename`` as given and stored in the repository
    at ``{folder_path}/{filename}``. The access token is read from the
    ``HF_TOKEN`` environment variable.

    Args:
        filename: Path of the local file to upload.
        folder_path: Destination folder inside the repository.

    Returns:
        True on success, None if the upload raised an exception (the error is
        printed rather than propagated).
    """
    # NOTE(review): folder_path is only used for the destination path; the
    # local file is not looked up inside folder_path - confirm this is intended.
    try:
        api = HfApi()
        api.upload_file(
            path_or_fileobj=filename,
            path_in_repo=f"{folder_path}/{filename}",
            repo_id=HF_DATASET_REPO_NAME,
            repo_type=HF_REPO_TYPE,
            token=os.getenv("HF_TOKEN")
        )
        print(f"Uploaded {filename} to {HF_DATASET_REPO_NAME}")
        return True
    except Exception as e:
        # Best-effort upload: report the failure and signal it with None.
        print(f"Error uploading {filename}: {e}")
        return None