Spaces:
Runtime error
Runtime error
| import os | |
| import time | |
| from datetime import datetime | |
| import logging | |
| from pathlib import Path | |
| import requests | |
| import json | |
| # import numpy as np | |
| import pandas as pd | |
| import spacy | |
| from sentence_transformers import CrossEncoder | |
| import litellm | |
| # from litellm import completion | |
| from tqdm import tqdm | |
| from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig, AutoConfig, pipeline | |
| # from accelerate import PartialState | |
| # from accelerate.inference import prepare_pippy | |
| import torch | |
| # import cohere | |
| # from openai import OpenAI | |
| # # import google | |
| import google.generativeai as genai | |
| import src.backend.util as util | |
| import src.envs as envs | |
| # | |
| # # import pandas as pd | |
| # import scipy | |
| from scipy.spatial.distance import jensenshannon | |
| from scipy.stats import bootstrap | |
| import numpy as np | |
| import spacy_transformers | |
| import subprocess | |
| # Run the command to download the spaCy model | |
| # subprocess.run(["python", "-m", "spacy", "download", "en_core_web_lg"], check=True) | |
| # subprocess.run(["python", "-m", "spacy", "download", "en_core_web_sm"], check=True) | |
| # subprocess.run(["pip", "install", "spacy-transformers"], check=True) | |
| # subprocess.run(["pip", "install", "curated-transformers"], check=True) | |
| # Load spacy model for word tokenization | |
| # nlp = spacy.load("en_core_web_sm") | |
# Load the spaCy English pipeline used later to code E5/E6 responses.
try:
    nlp1 = spacy.load("en_core_web_sm")
except OSError:
    # Fix: previously the name `nlp1` was simply left undefined when the
    # model was missing, so the first later use raised a confusing
    # NameError. Define it as None so callers can detect the failure.
    nlp1 = None
    print("Can not load spacy model")

# litellm.set_verbose=False
litellm.set_verbose = True

# Set up basic configuration for logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')
class ModelLoadingException(Exception):
    """Raised when a model cannot be initialized.

    Attributes:
        model_id (str): The model identifier.
        revision (str): The model revision.
    """

    def __init__(self, model_id, revision, messages="Error initializing model"):
        # Keep the identifying details on the instance so callers can
        # inspect which model/revision failed.
        self.model_id = model_id
        self.revision = revision
        detail = f"{messages} id={model_id} revision={revision}"
        super().__init__(detail)
class ResponseGenerator:
    """A class to generate responses using a causal language model.

    Attributes:
        model (str): huggingface/{model_id}
        api_base (str): https://api-inference.huggingface.co/models/{model_id}
        responses_df (DataFrame): DataFrame to store generated responses.
        revision (str): Model revision.
        avg_length (float): Average length of responses.
        answer_rate (float): Rate of non-empty responses.
    """

    def __init__(self, model_id, revision):
        """
        Initializes the ResponseGenerator with a model.

        Args:
            model_id (str): Identifier for the model.
            revision (str): Revision of the model.
        """
        self.model_id = model_id
        self.model = f"huggingface/{model_id}"
        self.api_base = f"https://api-inference.huggingface.co/models/{model_id}"
        self.responses_df = pd.DataFrame()
        self.revision = revision
        self.avg_length = None
        self.answer_rate = None
        self.exceptions = None
        self.local_model = None
        # Fix: send_request() reads self.tokenizer on the local-model path,
        # but the attribute was never initialized here. It is presumably
        # assigned elsewhere when a local model is loaded; default to None
        # so the attribute always exists.
        self.tokenizer = None

    def generate_response(self, dataset, df_prompt, save_path=None):
        """Generate responses for a given DataFrame of source docs.

        Args:
            dataset: Path to an Excel workbook with one sheet per experiment;
                sheets without a 'Prompt0' column are skipped.
            df_prompt: Unused here; kept for interface compatibility.
            save_path: Optional CSV path. If the file exists, cached
                responses are loaded instead of querying the model;
                otherwise generated responses are written there.

        Returns:
            responses_df (DataFrame): Generated responses by the model.
        """
        exceptions = []
        if (save_path is not None) and os.path.exists(save_path):
            # A cached results file already exists - reuse it instead of
            # re-querying the model. (Original note: 已存在文件,可以读取已经存在的测试文本)
            self.responses_df = pd.read_csv(save_path)
            print(f'Loaded generated responses from {save_path}')
        else:
            # No cache - call the configured model for every prompt.
            # (Original note: 测试文件不存在,则需要调用指定的模型来进行测试)
            xls = pd.ExcelFile(dataset)
            sheet_names = xls.sheet_names
            print(f"Total: {len(sheet_names)}")
            print(sheet_names)
            Experiment_ID, Questions_ID, Item_ID, Condition = [], [], [], []
            User_prompt, Response, Factor_2, Stimuli_1 = [], [], [], []
            exit_outer_loop = False  # set when the model keeps failing ("bad model")
            for i, sheet_name in enumerate(sheet_names, start=1):
                if exit_outer_loop:
                    break
                print(i, sheet_name)
                df_sheet = pd.read_excel(xls, sheet_name=sheet_name)
                if 'Prompt0' in df_sheet.columns:
                    prompt_column = df_sheet['Prompt0']
                else:
                    # Sheet carries no prompts - skip it.
                    continue
                # Per-experiment auxiliary column used as the "Factor 2" value.
                if i == 3:
                    word1_list = df_sheet['Stimuli-2']
                    word2_list = df_sheet['Stimuli-3']
                    V2_column = [word1_list[jj] + '_' + word2_list[jj]
                                 for jj in range(len(word1_list))]
                elif i == 9:
                    V2_column = df_sheet['V2']  # SL, LS
                elif i == 4 or i == 6:
                    V2_column = df_sheet['Stimuli-2']
                else:
                    V2_column = [""] * len(prompt_column)
                q_column = df_sheet["ID"]
                Item_column = df_sheet["Item"]
                Condition_column = df_sheet["Condition"]
                Stimuli_1_column = df_sheet["Stimuli-1"]
                if 'Stimuli-2' in df_sheet.columns:
                    Stimuli_2_column = df_sheet["Stimuli-2"]
                for j, prompt_value in enumerate(tqdm(prompt_column, desc=f"Processing {sheet_name}"), start=0):
                    if exit_outer_loop:
                        break
                    ID = 'E' + str(i)
                    system_prompt = envs.SYSTEM_PROMPT
                    _user_prompt = prompt_value
                    print(_user_prompt)
                    # Draw 100 samples per prompt: the downstream evaluation
                    # compares response *distributions* against human data.
                    for ii in range(100):
                        _response = None
                        while True:
                            try:
                                print(self.model_id.lower(), '-', ID, '-', j, '-', ii)
                                _response = self.send_request(system_prompt, _user_prompt)
                                break
                            except Exception as e:
                                if 'Rate limit reached' in str(e):
                                    wait_time = 3660
                                    current_time = datetime.now().strftime('%H:%M:%S')
                                    print(f"Rate limit hit at {current_time}. Waiting for 1 hour before retrying...")
                                    time.sleep(wait_time)
                                elif 'is currently loading' in str(e):
                                    wait_time = 200
                                    print(f"Model is loading, wait for {wait_time}")
                                    time.sleep(wait_time)
                                elif '429 Resource has been exhausted' in str(e):  # for gemini models
                                    wait_time = 60
                                    print(f"Quota has reached, wait for {wait_time}")
                                    time.sleep(wait_time)
                                else:
                                    # Unknown error: retry a bounded number of
                                    # times, then give up on this model entirely.
                                    max_retries = 30
                                    retries = 0
                                    wait_time = 120
                                    while retries < max_retries:
                                        print(f"Error at index {i}: {e}")
                                        time.sleep(wait_time)
                                        try:
                                            _response = self.send_request(system_prompt, _user_prompt)
                                            break
                                        except Exception as ee:
                                            exceptions.append(ee)
                                            retries += 1
                                            print(f"Retry {retries}/{max_retries} failed at index {i}: {ee}")
                                            if retries >= max_retries:
                                                exit_outer_loop = True
                                                break
                                    # Fix: leave the send loop once the bounded
                                    # retries either succeeded or gave up;
                                    # previously a successful retry re-sent the
                                    # request and a failure could loop forever.
                                    if exit_outer_loop or _response is not None:
                                        break
                        if exit_outer_loop or _response is None:
                            # Fix: previously a fatally failed prompt could fall
                            # through with `_response` undefined and crash below.
                            break
                        if i == 5:
                            # For E5, the responses might be in the following formats:
                            # "Sure\n\nThe first sentence of the response\n\nThe second sentence of the response"
                            # "The first sentence of the response\n\nThe second sentence of the response"
                            # "XXX: The first sentence of the response\n\nXXX: The second sentence of the response"
                            # "Sure\n\nXXX: The first sentence of the response\n\nXXX: The second sentence of the response"
                            # "Sure\n\nThe first sentence of the response\n\nThe second sentence of the response\n\n"
                            def extract_responses(text, trigger_words=None):
                                """Return the first two content sentences of `text`, skipping filler openers."""
                                if trigger_words is None:
                                    trigger_words = ["sure", "okay", "yes"]
                                try:
                                    # Split the text into sentences and drop blanks.
                                    sentences = text.split('\n')
                                    sentences = [sentence.strip() for sentence in sentences if sentence.strip()]
                                    # Strip "Speaker:"-style prefixes.
                                    sentences = [sentence.split(':', 1)[-1].strip() if ':' in sentence else sentence
                                                 for sentence in sentences]
                                    sentences = [sentence.strip() for sentence in sentences if sentence.strip()]
                                    # Skip a leading filler word ("Sure", ...) when
                                    # enough sentences remain afterwards.
                                    if any(sentences[0].lower().startswith(word) for word in trigger_words) and len(
                                            sentences) > 2:
                                        _response1 = sentences[1].strip() if len(sentences) > 1 else None
                                        _response2 = sentences[2].strip() if len(sentences) > 2 else None
                                    else:
                                        _response1 = sentences[0].strip() if len(sentences) > 0 else None
                                        _response2 = sentences[1].strip() if len(sentences) > 1 else None
                                except Exception as e:
                                    print(f"Error occurred: {e}")
                                    _response1, _response2 = None, None
                                print(_response1), print(_response2)
                                return _response1, _response2

                            _response1, _response2 = extract_responses(_response)
                            # Second sentence is recorded under E5 ...
                            Experiment_ID.append(ID)
                            Questions_ID.append(q_column[j])
                            User_prompt.append(_user_prompt)
                            Response.append(_response2)
                            Factor_2.append(_response)
                            Stimuli_1.append(Stimuli_2_column[j])
                            Item_ID.append(Item_column[j])
                            Condition.append(Condition_column[j])
                            # ... and the first sentence in the response is saved as E51.
                            Experiment_ID.append(ID + '1')
                            Questions_ID.append(str(q_column[j]) + '1')
                            User_prompt.append(_user_prompt)
                            Response.append(_response1)
                            Factor_2.append(_response)
                            Stimuli_1.append(Stimuli_1_column[j])
                            Item_ID.append(Item_column[j])
                            Condition.append(Condition_column[j])
                        else:
                            Experiment_ID.append(ID)
                            Questions_ID.append(q_column[j])
                            User_prompt.append(_user_prompt)
                            Response.append(_response)
                            if i == 6:
                                Factor_2.append(Condition_column[j])
                                Stimuli_1.append(V2_column[j])
                            else:
                                Factor_2.append(V2_column[j])
                                Stimuli_1.append(Stimuli_1_column[j])
                            Item_ID.append(Item_column[j])
                            Condition.append(Condition_column[j])
                        # Sleep to prevent hitting rate limits too frequently.
                        time.sleep(1)
            self.responses_df = pd.DataFrame(
                list(zip(Experiment_ID, Questions_ID, Item_ID, Condition,
                         User_prompt, Response, Factor_2, Stimuli_1)),
                columns=["Experiment", "Question_ID", "Item", "Condition",
                         "User_prompt", "Response", "Factor 2", "Stimuli 1"])
            if save_path is not None:
                print(f'Save responses to {save_path}')
                fpath = Path(save_path)
                fpath.parent.mkdir(parents=True, exist_ok=True)
                self.responses_df.to_csv(fpath)
        self.exceptions = exceptions
        return self.responses_df

    def send_request(self, system_prompt: str, user_prompt: str):
        """Send one chat request to whichever backend serves `self.model_id`.

        Dispatch order: Together AI (currently disabled) -> local HF model ->
        OpenAI ('gpt' ids) -> Gemini ('gemini' ids) -> HF Inference API
        (everything else).

        Returns:
            str: The model's reply text.
        """
        # Using Together AI API. NOTE: the flag below is never switched on
        # (the assignment was commented out upstream), so this path is
        # currently disabled on purpose.
        using_together_api = False
        together_ai_api_models = ['mixtral', 'dbrx', 'wizardlm']
        for together_ai_api_model in together_ai_api_models:
            if together_ai_api_model in self.model_id.lower():
                # using_together_api = True
                break
        if using_together_api:
            suffix = "chat/completions"
            url = f"https://api.together.xyz/v1/{suffix}"
            payload = {
                "model": self.model_id,
                'max_new_tokens': 100,
            }
            payload['messages'] = [{"role": "system", "content": system_prompt},
                                   {"role": "user", "content": user_prompt}]
            headers = {
                "accept": "application/json",
                "content-type": "application/json",
                "Authorization": f"Bearer {os.environ['TOGETHER_API_KEY']}"
            }
            response = requests.post(url, json=payload, headers=headers)
            try:
                result = json.loads(response.text)
                result = result["choices"][0]
                if 'message' in result:
                    result = result["message"]["content"].strip()
                else:
                    result = result["text"]
                # Keep only the first non-empty paragraph of the reply.
                result_candidates = [candidate for candidate in result.split('\n\n') if len(candidate) > 0]
                result = result_candidates[0]
                print(result)
            except Exception:  # narrowed from a bare `except:`
                print(response)
                result = ''
                print(result)
            return result
        if self.local_model:  # cannot call API; use the local model
            messages = [
                {"role": "system", "content": system_prompt},  # gemma-1.1 does not accept system role
                {"role": "user", "content": user_prompt}
            ]
            try:  # some models support pipeline
                pipe = pipeline(
                    "text-generation",
                    model=self.local_model,
                    tokenizer=self.tokenizer,
                )
                generation_args = {
                    "max_new_tokens": 100,
                    "return_full_text": False,
                    # "temperature": 0.0,
                    "do_sample": False,
                }
                output = pipe(messages, **generation_args)
                result = output[0]['generated_text']
                print(result)
            except Exception:  # fall back to manual generate()
                prompt = self.tokenizer.apply_chat_template(messages, add_generation_prompt=True, tokenize=False)
                print(prompt)
                input_ids = self.tokenizer(prompt, return_tensors="pt").to('cuda')
                with torch.no_grad():
                    outputs = self.local_model.generate(**input_ids, max_new_tokens=100, do_sample=True, pad_token_id=self.tokenizer.eos_token_id)
                result = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
                # Fix: strip the echoed prompt from the decoded output. The old
                # code called result.replace(prompt[0], ''), which removed every
                # occurrence of the prompt's FIRST CHARACTER and corrupted the reply.
                result = result.replace(prompt, '')
                print(result)
            return result
        # Using OpenAI API
        elif 'gpt' in self.model_id.lower():
            response = litellm.completion(
                model=self.model_id.replace('openai/', ''),
                messages=[{"role": "system", "content": system_prompt},
                          {"role": "user", "content": user_prompt}],
                # temperature=0.0,
                max_tokens=100,
                api_key=os.getenv('OpenAI_key')
            )
            result = response['choices'][0]['message']['content']
            return result
        elif 'gemini' in self.model_id.lower():
            # Fix: this branch was dead code - it used to sit AFTER the generic
            # `self.local_model is None` branch, which matched first for every
            # remote model. Gemini ids are now dispatched here.
            genai.configure(api_key=os.getenv('GOOGLE_AI_API_KEY'))
            generation_config = {
                # "temperature": 0,
                # "top_p": 0.95,  # cannot change
                # "top_k": 0,
                "max_output_tokens": 100,
            }
            # Disable all content filters: stimuli must not be silently blocked.
            safety_settings = [
                {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"},
                {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"},
                {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"},
                {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"},
            ]
            model = genai.GenerativeModel(
                model_name="gemini-1.5-pro-latest" if "gemini-1.5-pro" in self.model_id.lower() else
                self.model_id.lower().split('google/')[-1],
                generation_config=generation_config,
                system_instruction=system_prompt,
                safety_settings=safety_settings)
            convo = model.start_chat(history=[])
            convo.send_message(user_prompt)
            result = convo.last.text
            print(result)
            return result
        elif self.local_model is None:
            # Remote HF Inference API: rotate through numbered API tokens
            # (envs.TOKEN1..TOKEN20) until one succeeds, trying at most 10.
            import random

            def get_random_token():
                i = random.randint(1, 20)
                token = getattr(envs, f"TOKEN{i}")
                return token, i

            tokens_tried = set()
            while len(tokens_tried) < 10:
                token, i = get_random_token()
                if token in tokens_tried:
                    continue
                tokens_tried.add(token)
                print(f"Trying with token: TOKEN{i}")
                try:
                    from huggingface_hub import InferenceClient
                    client = InferenceClient(self.model_id, api_key=token, headers={"X-use-cache": "false"})
                    messages = [{"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}]
                    result = None
                    while result is None:
                        outputs = client.chat_completion(messages, max_tokens=100)
                        result = outputs['choices'][0]['message']['content']
                        if result is None:
                            time.sleep(1)  # small delay before retrying
                    return result
                except Exception:
                    print(f"Error with token: {token}, trying another token...")
                    continue
            raise Exception("All tokens failed.")
| class EvaluationModel: | |
| """A class to evaluate generated responses. | |
| Attributes: | |
| model (CrossEncoder): The evaluation model. | |
| scores (list): List of scores for the responses. | |
| humanlike_score (float): Human-likeness score | |
| """ | |
    def __init__(self):
        """
        Initializes the EvaluationModel.
        """
        # Per-response coding results; populated by the code_results_* methods.
        self.scores = []
        # Overall human-likeness score; stays None until computed elsewhere.
        self.humanlike_score = None
    def code_results_llm_cleaned(self, responses_df):
        '''code results from LLM's response'''
        # Maps each raw model response to a categorical code, experiment by
        # experiment (E1..E10), using string heuristics plus spaCy parses.
        # Returns a DataFrame with one "Coding" label per input row.
        output = []
        '''database for Exp4'''
        # Lookup table: "target_meaning" word pair -> code.
        item4 = pd.read_csv(envs.ITEM_4_DATA)
        wordpair2code = {}
        for j in range(len(item4['Coding'])):
            wordpair2code[item4['Pair'][j]] = item4['Coding'][j]
        '''verb for Exp5'''
        item5 = pd.read_csv(envs.ITEM_5_DATA)
        # item corresponding to verb, same item id corresponding to verb pair
        item2verb2 = {}
        item2verb1 = {}
        Stimuli1, Stimuli2 = {}, {}
        for j in range(len(item5['Item'])):
            item2verb1[item5['Item'][j]] = item5['Verb1'][j]
            item2verb2[item5['Item'][j]] = item5['Verb2'][j]
            Stimuli1[item5['ID'][j]] = item5['Stimuli-1'][j]
            Stimuli2[item5['ID'][j]] = item5['Stimuli-2'][j]
        # Pronoun lists used by the gender-coding experiments (E2, E9).
        male_keyword = ["he", "his", "himself"]
        female_keyword = ["she", "her", "herself"]
        for i in range(len(responses_df["Experiment"])):
            print(i, "/", len(responses_df["Experiment"]))
            # Missing responses are uncodable.
            if pd.isna(responses_df["Response"][i]):
                output.append("Other")
                continue
            # Normalize: lowercase, drop quotes and periods.
            # NOTE(review): the replace(" ", " ") below looks like a no-op;
            # presumably one of the two spaces was originally a non-ASCII
            # space lost in transit - confirm against the source repo.
            rs = responses_df["Response"][i].strip().lower()
            rs = rs.replace('"', '').replace(" ", " ").replace('.', '')
            lines = rs.split("\n")
            # Drop label-only lines and "Speaker:"-style prefixes.
            # NOTE(review): the duplicated endswith(":") test was presumably a
            # fullwidth colon originally - confirm.
            filtered_lines = [line for line in lines if line and not (line.endswith(":") or line.endswith(":"))]
            filtered_lines = [r.split(':', 1)[-1].strip() if ':' in r else r for
                              r in filtered_lines]
            rs = "\n".join(filtered_lines)
            rs = rs.strip()
            '''Exp1'''
            # E1: exact "round" / "spiky" choice.
            if responses_df["Experiment"][i] == "E1":
                if rs == "round":
                    output.append("Round")
                elif rs == "spiky":
                    output.append("Spiky")
                else:
                    output.append("Other")
            '''Exp2'''
            # E2: first gendered pronoun wins; both absent -> Other.
            elif responses_df["Experiment"][i] == "E2":
                rs = rs.split(' ')
                male, female = 0, 0
                for word in rs:
                    if word in female_keyword and male == 0:
                        female = 1
                        output.append("Female")
                        break
                    if word in male_keyword and female == 0:
                        male = 1
                        output.append("Male")
                        break
                if male == 0 and female == 0:
                    output.append("Other")
            '''Exp3'''
            # E3: which of the two candidate words (long vs short) was chosen.
            elif responses_df["Experiment"][i] == "E3":
                pair = responses_df["Factor 2"][i]
                word1, word2 = pair.replace(".", "").split('_')
                # Item 12 is excluded from coding.
                if responses_df["Item"][i] == 12:
                    output.append("Other")
                else:
                    words = rs.split()  # split the response into words
                    # Both words present -> ambiguous.
                    if any(word == word1 for word in words) and any(word == word2 for word in words):
                        output.append("Other")
                    else:
                        if any(word.lower() == word1.lower() for word in words):
                            if len(word1) > len(word2):
                                output.append("Long")
                            else:
                                output.append("Short")
                        elif any(word.lower() == word2.lower() for word in words):
                            if len(word1) > len(word2):
                                output.append("Short")
                            else:
                                output.append("Long")
                        else:
                            # Neither word matched token-wise: try matching the
                            # whole response as a multi-word phrase.
                            if len(words) > 1:
                                # join the words using " "
                                word = " ".join(words)
                                if word.lower() == word1.lower():
                                    if len(word1) > len(word2):
                                        output.append("Long")
                                    else:
                                        output.append("Short")
                                elif word.lower() == word2.lower():
                                    if len(word1) > len(word2):
                                        output.append("Short")
                                    else:
                                        output.append("Long")
                                else:
                                    output.append("Other")
                            else:
                                output.append("Other")
            '''Exp4'''
            # E4: extract the 5th ';'-separated field (the "meaning" word) and
            # look up the (target, meaning) pair in the coding table.
            elif responses_df["Experiment"][i] == "E4":
                lines = rs.split("\n")
                filtered_lines = []
                if len(lines) > 1:
                    # Strip "label:" prefixes from every line except the first.
                    for r in lines[1:]:
                        if ':' in r:
                            filtered_lines.append(r.split(':', 1)[-1].strip())
                        else:
                            filtered_lines.append(r)
                    filtered_lines.insert(0, lines[0])
                else:
                    filtered_lines = lines
                # Also strip "-"/":" prefixes from ';'-separated fields.
                filtered_lines = [r.split('-', 1)[-1].strip() if '-' in r else r for r in filtered_lines]
                rs = "\n".join(filtered_lines)
                filtered_lines = [r.split(':', 1)[-1].strip() if ':' in r else r for r in rs.split(";")]
                filtered_lines = [r.split('-', 1)[-1].strip() if '-' in r else r for r in filtered_lines]
                rs = ";".join(filtered_lines).strip()
                try:
                    meaning_word = rs.split(";")[4].replace(" ", '')
                except IndexError:
                    # Fewer than 5 ';' fields: fall back to newline-separated.
                    try:
                        meaning_word = rs.split("\n")[4].replace(" ", '')
                    except IndexError:
                        output.append("Other")
                        continue
                    except Exception as e:
                        print(f"Unexpected error: {e}")
                        output.append("Other")
                        continue
                target = responses_df["Factor 2"][i].strip().lower()
                pair = target + "_" + meaning_word
                if pair in wordpair2code.keys():
                    output.append(wordpair2code[pair])
                else:
                    output.append("Other")
            '''Exp5'''
            # E5/E51: prepend the stimulus, parse with spaCy, and classify the
            # target verb's construction as prepositional (PO) or double
            # object (DO) dative.
            elif responses_df["Experiment"][i] == "E5" or responses_df["Experiment"][i] == "E51":
                item_id = responses_df["Item"][i]
                question_id = responses_df["Question_ID"][i]
                sti1, sti2 = "", ""
                if responses_df["Experiment"][i] == "E51":
                    # E51 ids carry a trailing '1'; strip it to find the stimulus.
                    sti1 = Stimuli1[question_id[0:-1]].lower().replace("...", "")
                    verb = item2verb1[item_id].lower()
                    sentence = sti1 + " " + rs.replace(sti1, "")
                if responses_df["Experiment"][i] == "E5":
                    sti2 = Stimuli2[question_id].lower().replace("...", "")
                    verb = item2verb2[item_id].lower()
                    sentence = sti2 + " " + rs.replace(sti2, "")
                doc = nlp1(sentence.replace(" ", " "))
                # Locate the target verb token by lemma.
                verb_token = None
                for token in doc:
                    if token.lemma_ == verb:
                        verb_token = token
                        break
                pobj, dative = None, None
                if verb_token is not None:
                    for child in verb_token.children:
                        # Prepositional dative: an ADP child ("to").
                        if (child.dep_ == 'dative' and child.pos_ == "ADP") or (
                                child.text == "to" and child.dep_ == 'prep' and child.pos_ == "ADP"):
                            pobj = child.text
                        # Double-object dative: a bare dative child.
                        if child.dep_ == 'dative':
                            dative = child.text
                if pobj:
                    output.append("PO")
                elif dative:
                    output.append("DO")
                else:
                    output.append("Other")
            '''Exp6'''
            # E6: does the response echo the stimulus subject (VP) or its
            # object / a prepositional object (NP)?
            elif responses_df["Experiment"][i] == "E6":
                sentence = responses_df["Stimuli 1"][i].strip().lower()
                doc = nlp1(sentence)
                subject = "None"
                obj = "None"
                pobj_list = []  # To collect all prepositional objects
                for token in doc:
                    if token.dep_ == "nsubj":
                        subject = token.text
                    elif token.dep_ == "dobj":
                        obj = token.text
                    elif token.dep_ == "pobj":
                        pobj_list.append(token.text)  # Collect prepositional objects
                rs_list = rs.lower().split()
                # Both subject and object mentioned -> ambiguous.
                if subject in rs_list and (obj in rs_list or any(pobj == r for pobj in pobj_list for r in rs_list)):
                    output.append("Other")
                elif subject in rs_list:
                    output.append("VP")
                elif obj in rs_list or any(pobj == r for pobj in pobj_list for r in rs_list):
                    output.append("NP")
                else:
                    output.append("Other")
            '''Exp7'''
            # E7: yes/no answer; both or neither -> Other.
            elif responses_df["Experiment"][i] == "E7":
                rs = rs.replace(".", "").replace(",", "").lower()
                if "yes" in rs and "no" in rs:
                    output.append("Other")
                elif "no" in rs:
                    output.append("0")
                elif "yes" in rs:
                    output.append("1")
                else:
                    output.append("Other")
            '''Exp8'''
            # E8: did the model flag the question as ill-formed?
            elif responses_df["Experiment"][i] == "E8":
                if "something is wrong with the question" in rs:
                    output.append("1")
                else:
                    output.append("0")
            '''Exp9'''
            # E9: gender of the first pronoun after "because", interpreted
            # against the MF/FM condition to yield Subject/Object.
            elif responses_df["Experiment"][i] == "E9":
                male, female = 0, 0
                if "because" in rs:
                    rs = rs.replace("because because", "because").split("because")[1]
                else:
                    rs = rs
                condition = responses_df["Factor 2"][i].strip()
                rs = rs.split(" ")
                for w in rs:
                    if w in male_keyword and female != 1:
                        male = 1
                        break
                    if w in female_keyword and male != 1:
                        female = 1
                        break
                if male == 0 and female == 0:
                    output.append('Other')
                else:
                    if male == 1 and female == 0:
                        if condition == "MF":
                            output.append("Subject")
                        elif condition == "FM":
                            output.append("Object")
                        else:
                            output.append("Other")
                    elif female == 1 and male == 0:
                        if condition == "MF":
                            output.append("Object")
                        elif condition == "FM":
                            output.append("Subject")
                        else:
                            output.append("Other")
            '''Exp10'''
            # E10: literal "yes" -> 1, anything else -> 0.
            elif responses_df["Experiment"][i] == "E10":
                rs = rs.replace(".", "")
                if rs == "yes":
                    output.append("1")
                else:
                    output.append("0")
            else:
                # Unknown experiment id: mark as not applicable.
                output.append("NA")
        '''LLM'''
        print(len(output))
        import re

        def clean_text(text):
            # Strip all non-ASCII characters from string cells.
            if isinstance(text, str):
                return re.sub(r'[^\x00-\x7F]+', '', text)
            return text

        responses_df["Experiment"] = responses_df["Experiment"].apply(clean_text)
        responses_df["Question_ID"] = responses_df["Question_ID"].apply(clean_text)
        responses_df["Item"] = responses_df["Item"].apply(clean_text)
        responses_df["Response"] = responses_df["Response"].apply(clean_text)
        # Coerce all codes to strings for a homogeneous column.
        output = [str(item) for item in output]
        self.data = pd.DataFrame(list(
            zip(responses_df["Experiment"], responses_df["Question_ID"], responses_df["Item"], responses_df["Response"], output)),
            columns=["Experiment", "Question_ID", "Item", "Response", "Coding"])
        return self.data
| def code_results_llm(self, responses_df): | |
| '''code results from LLM's response''' | |
| output = [] | |
| '''database for Exp4''' | |
| item4 = pd.read_csv(envs.ITEM_4_DATA) | |
| wordpair2code = {} | |
| for j in range(len(item4['Coding'])): | |
| wordpair2code[item4['Pair'][j]] = item4['Coding'][j] | |
| '''verb for Exp5''' | |
| item5 = pd.read_csv(envs.ITEM_5_DATA) | |
| # item corresponding to verb, same item id corresponding to verb pair | |
| item2verb2 = {} | |
| item2verb1 = {} | |
| Stimuli1, Stimuli2 = {}, {} | |
| for j in range(len(item5['Item'])): | |
| item2verb1[item5['Item'][j]] = item5['Verb1'][j] | |
| item2verb2[item5['Item'][j]] = item5['Verb2'][j] | |
| Stimuli1[item5['ID'][j]] = item5['Stimuli-1'][j] | |
| Stimuli2[item5['ID'][j]] = item5['Stimuli-2'][j] | |
| male_keyword = ["he", "his", "himself"] | |
| female_keyword = ["she", "her", "herself"] | |
| #print(len(responses_df["Experiment"])) | |
| for i in range(len(responses_df["Experiment"])): | |
| print(i, "/", len(responses_df["Experiment"])) | |
| # vote_1_1, vote_1_2, vote_1_3 = 0, 0, 0 | |
| # print() | |
| if pd.isna(responses_df["Response"][i]): | |
| output.append("Other") | |
| continue | |
| rs = responses_df["Response"][i].strip().lower() | |
| print(rs) | |
| rs = rs.replace('"', '').replace(" ", " ").replace('.', '') | |
| #lines = rs.split("\n") | |
| #filtered_lines = [line for line in lines if line and not (line.endswith(":") or line.endswith(":"))] | |
| # filtered_lines = [r.split(':', 1)[-1].strip() if ':' in r else r for | |
| # r in filtered_lines] | |
| # rs = "\n".join(filtered_lines) | |
| # rs = rs.strip() | |
| '''Exp1''' | |
| if responses_df["Experiment"][i] == "E1": | |
| rs_lower = rs.lower() | |
| if "round" in rs_lower and "spiky" in rs_lower: | |
| output.append("Other") | |
| elif "round" in rs_lower: | |
| output.append("Round") | |
| elif "spiky" in rs_lower: | |
| output.append("Spiky") | |
| else: | |
| output.append("Other") | |
| '''Exp2''' | |
| elif responses_df["Experiment"][i] == "E2": | |
| # rs = responses_df["Response"][i].strip() | |
| rs = rs.split(' ') | |
| #print("E2", rs) | |
| male, female = 0, 0 | |
| for word in rs: | |
| if word in female_keyword and male == 0: | |
| female = 1 | |
| output.append("Female") | |
| break | |
| if word in male_keyword and female == 0: | |
| male = 1 | |
| output.append("Male") | |
| break | |
| if male == 0 and female == 0: | |
| output.append("Other") | |
| '''Exp3''' | |
| elif responses_df["Experiment"][i] == "E3": | |
| # rs = responses_df["Response"][i].strip() | |
| #print("E3", rs) | |
| pair = responses_df["Factor 2"][i] | |
| word1, word2 = pair.replace(".", "").split('_') | |
| if responses_df["Item"][i] == 12: | |
| output.append("Other") | |
| else: | |
| words = rs.split() # split the response into words | |
| if any(word == word1 for word in words) and any(word == word2 for word in words): | |
| output.append("Other") | |
| else: | |
| if any(word.lower() == word1.lower() for word in words): | |
| if len(word1) > len(word2): | |
| output.append("Long") | |
| else: | |
| output.append("Short") | |
| elif any(word.lower() == word2.lower() for word in words): | |
| if len(word1) > len(word2): | |
| output.append("Short") | |
| else: | |
| output.append("Long") | |
| else: | |
| if len(words) > 1: | |
| # joint the words using " " | |
| word = " ".join(words) | |
| if word.lower() == word1.lower(): | |
| if len(word1) > len(word2): | |
| output.append("Long") | |
| else: | |
| output.append("Short") | |
| elif word.lower() == word2.lower(): | |
| if len(word1) > len(word2): | |
| output.append("Short") | |
| else: | |
| output.append("Long") | |
| else: | |
| output.append("Other") | |
| else: | |
| output.append("Other") | |
| '''Exp4''' | |
| elif responses_df["Experiment"][i] == "E4": | |
| lines = rs.split("\n") | |
| filtered_lines = [] | |
| if len(lines) > 1: | |
| for r in lines[1:]: | |
| if ':' in r: | |
| filtered_lines.append(r.split(':', 1)[-1].strip()) | |
| else: | |
| filtered_lines.append(r) | |
| filtered_lines.insert(0, lines[0]) | |
| else: | |
| filtered_lines = lines | |
| # print(filtered_lines) | |
| #filtered_lines = [r.split('-', 1)[-1].strip() if '-' in r else r for r in filtered_lines] | |
| #rs = "\n".join(filtered_lines) | |
| #filtered_lines = [r.split(':', 1)[-1].strip() if ':' in r else r for r in rs.split(";")] | |
| #filtered_lines = [r.split('-', 1)[-1].strip() if '-' in r else r for r in filtered_lines] | |
| rs = ";".join(filtered_lines).strip() | |
| try: | |
| meaning_word = rs.split(";")[4].replace(" ", '') | |
| except IndexError: | |
| try: | |
| meaning_word = rs.split("\n")[4].replace(" ", '') | |
| except IndexError: | |
| output.append("Other") | |
| continue | |
| except Exception as e: | |
| print(f"Unexpected error: {e}") | |
| output.append("Other") | |
| continue | |
| target = responses_df["Factor 2"][i].strip().lower() | |
| pair = target + "_" + meaning_word | |
| #print("E4:", pair) | |
| if pair in wordpair2code.keys(): | |
| output.append(wordpair2code[pair]) | |
| else: | |
| output.append("Other") | |
| '''Exp5''' | |
| elif responses_df["Experiment"][i] == "E5" or responses_df["Experiment"][i] == "E51": | |
| # sentence = responses_df["Response"][i].strip() | |
| item_id = responses_df["Item"][i] | |
| question_id = responses_df["Question_ID"][i] | |
| if responses_df["Experiment"][i] == "E51": | |
| sti1 = Stimuli1[question_id[0:-1]].lower().replace("...", "") | |
| #sti2 = Stimuli2[question_id[0:-1]].lower().replace("...", "") | |
| verb = item2verb1[item_id].lower() | |
| sentence = sti1 + " " + rs.replace(sti1, "") | |
| #print("E5", verb, sentence) | |
| if responses_df["Experiment"][i] == "E5": | |
| #sti1 = Stimuli1[question_id].lower().replace("...", "") | |
| # print(sti1) | |
| sti2 = Stimuli2[question_id].lower().replace("...", "") | |
| verb = item2verb2[item_id].lower() | |
| sentence = sti2 + " " + rs.replace(sti2, "") | |
| #print("E5", verb, sentence) | |
| doc = nlp1(sentence.replace(" ", " ")) | |
| # print(doc) | |
| # print() | |
| verb_token = None | |
| for token in doc: | |
| # print(token.lemma_) | |
| if token.lemma_ == verb: | |
| verb_token = token | |
| break | |
| # exit() | |
| pobj, dative = None, None | |
| # print(verb_token.children) | |
| # exit() | |
| if verb_token is not None: | |
| for child in verb_token.children: | |
| # print(child) | |
| if (child.dep_ == 'dative' and child.pos_ == "ADP") or ( | |
| child.text == "to" and child.dep_ == 'prep' and child.pos_ == "ADP"): | |
| pobj = child.text | |
| if child.dep_ == 'dative': | |
| dative = child.text | |
| # print("E5", pobj, dative) | |
| # exit() | |
| if pobj: | |
| output.append("PO") | |
| elif dative: | |
| output.append("DO") | |
| else: | |
| # print("Other", sentence, pobj, dative) | |
| # exit() | |
| output.append("Other") | |
| '''Exp6''' | |
| elif responses_df["Experiment"][i] == "E6": | |
| sentence = responses_df["Stimuli 1"][i].strip().lower() | |
| #print("E6", sentence) | |
| doc = nlp1(sentence) | |
| subject = "None" | |
| obj = "None" | |
| pobj_list = [] # To collect all prepositional objects | |
| for token in doc: | |
| if token.dep_ == "nsubj": | |
| subject = token.text | |
| elif token.dep_ == "dobj": | |
| obj = token.text | |
| elif token.dep_ == "pobj": | |
| pobj_list.append(token.text) # Collect prepositional objects | |
| rs_list = rs.lower().split() | |
| if subject in rs_list and (obj in rs_list or any(pobj == r for pobj in pobj_list for r in rs_list)): | |
| output.append("Other") | |
| elif subject in rs_list: | |
| output.append("VP") | |
| elif obj in rs_list or any(pobj == r for pobj in pobj_list for r in rs_list): | |
| output.append("NP") | |
| else: | |
| output.append("Other") | |
| '''Exp7''' | |
| elif responses_df["Experiment"][i] == "E7": | |
| # rs = responses_df["Response"][i].strip().lower() | |
| rs = rs.replace(".", "").replace(",", "").lower() | |
| #print("E7", rs) | |
| if "yes" in rs and "no" in rs: | |
| output.append("Other") | |
| elif "no" in rs: | |
| output.append("0") | |
| elif "yes" in rs: | |
| output.append("1") | |
| else: | |
| output.append("Other") | |
| '''Exp8''' | |
| elif responses_df["Experiment"][i] == "E8": | |
| # rs = responses_df["Response"][i].strip() | |
| #print("E8", rs) | |
| if "something is wrong with the question" in rs: | |
| output.append("1") | |
| else: | |
| output.append("0") | |
| '''Exp9''' | |
| elif responses_df["Experiment"][i] == "E9": | |
| male, female = 0, 0 | |
| # rs = responses_df["Response"][i].strip() | |
| if "because" in rs: | |
| rs = rs.replace("because because", "because").split("because")[1] | |
| else: | |
| rs = rs | |
| condition = responses_df["Factor 2"][i].strip() | |
| rs = rs.split(" ") | |
| for w in rs: | |
| if w in male_keyword and female != 1: | |
| male = 1 | |
| break | |
| if w in female_keyword and male != 1: | |
| female = 1 | |
| break | |
| #print("E9", "condition", condition, "male", male, "female", female) | |
| if male == 0 and female == 0: | |
| output.append('Other') | |
| else: | |
| if male == 1 and female == 0: | |
| if condition == "MF": | |
| output.append("Subject") | |
| elif condition == "FM": | |
| output.append("Object") | |
| else: | |
| output.append("Other") | |
| elif female == 1 and male == 0: | |
| if condition == "MF": | |
| output.append("Object") | |
| elif condition == "FM": | |
| output.append("Subject") | |
| else: | |
| output.append("Other") | |
| '''Exp10''' | |
| elif responses_df["Experiment"][i] == "E10": | |
| # rs = responses_df["Response"][i].strip() | |
| rs = rs.replace(".", "") | |
| if rs == "yes": | |
| output.append("1") | |
| else: | |
| output.append("0") | |
| else: | |
| #print("can;t find the Exp:", responses_df["Experiment"][i]) | |
| output.append("NA") | |
| # print(output) | |
| # exit() | |
| '''LLM''' | |
| print(len(output)) | |
| import re | |
def clean_text(text):
    """Strip non-ASCII characters from *text*; non-string values pass through unchanged."""
    if not isinstance(text, str):
        return text
    # Remove every run of characters outside the 7-bit ASCII range.
    return re.sub(r'[^\x00-\x7F]+', '', text)
| responses_df["Experiment"] = responses_df["Experiment"].apply(clean_text) | |
| responses_df["Question_ID"] = responses_df["Question_ID"].apply(clean_text) | |
| responses_df["Item"] = responses_df["Item"].apply(clean_text) | |
| responses_df["Response"] = responses_df["Response"].apply(clean_text) | |
| output = [str(item) for item in output] | |
| self.data = pd.DataFrame(list( | |
| zip(responses_df["Experiment"], responses_df["Question_ID"], responses_df["Item"], responses_df["Response"],output)), | |
| columns=["Experiment", "Question_ID", "Item", "Response","Coding"]) | |
| return self.data | |
def calculate_js_divergence(self, file_path_1: str, file_path_2: str) -> dict:
    """
    Calculate the Jensen-Shannon divergence for response distributions between two datasets.

    - Extracts E5 and E51 pairs, creates new data based on comparison,
      removes the original E5 and E51, and then calculates the JS divergence between the datasets.

    Parameters:
        file_path_1 (str): Path to the first dataset file (CSV format; human data, ISO-8859-1 encoded).
        file_path_2 (str): Path to the second dataset file (CSV format; LLM coding results).

    Returns:
        dict: {'overall': {...}, 'per_experiment': {...}} where each entry holds an
        'average_js_divergence' — reported as 1 - JS, i.e. a similarity score where
        higher means more human-like — and a bootstrap 'confidence_interval'.
    """
    # Load the datasets (human file ships in a legacy encoding).
    human_df = pd.read_csv(file_path_1, encoding='ISO-8859-1')
    llm_df = pd.read_csv(file_path_2)

    def create_e5_entries(df):
        # Pair each E51 (primed) row with its priming row — the row whose numeric id
        # (first column) is one less — and emit an entry whose Coding is 1 when the
        # two rows received the same code, 0 otherwise.
        new_entries = []
        # NOTE(review): range(len(df) - 1) never visits the final row, so an E51
        # entry in the last position is silently skipped — confirm intentional.
        for i in range(len(df) - 1):
            if 'E51' in df.iloc[i]['Experiment']:
                # Assumes column 0 is a numeric row id; priming row has id - 1.
                priming_id = df.iloc[i][0]-1
                # NOTE(review): .index[0] yields an index *label* that is then used
                # positionally with .iloc below — safe only for a default RangeIndex.
                priming_row_id = df[df.iloc[:, 0] == priming_id].index[0]
                new_question_id = df.iloc[priming_row_id]['Question_ID']
                # 1 = primed response matched the priming response's code.
                label = 1 if df.iloc[i]['Coding'] == df.iloc[priming_row_id]['Coding'] else 0
                new_entries.append({
                    'Question_ID': new_question_id,
                    'Response': f'{df.iloc[i]["Coding"]}-{df.iloc[priming_row_id]["Coding"]}',
                    'Coding': label
                })
        return pd.DataFrame(new_entries)

    # Create new E5 entries for both datasets
    human_e5 = create_e5_entries(human_df)
    llm_e5 = create_e5_entries(llm_df)

    # Remove E5 and E51 entries from both datasets
    # (substring match on 'E5' catches both variants).
    human_df = human_df[~human_df['Question_ID'].str.contains('E5')]
    llm_df = llm_df[~llm_df['Question_ID'].str.contains('E5')]

    # Append new E5 entries to the cleaned dataframes
    human_df = pd.concat([human_df, human_e5], ignore_index=True)
    llm_df = pd.concat([llm_df, llm_e5], ignore_index=True)

    ### Calculate Average JS Divergence ###
    # Extract the relevant columns for JS divergence calculation
    human_responses = human_df[['Question_ID', 'Coding']]
    llm_responses = llm_df[['Question_ID', 'Coding']]
    # Remove 'Other' responses (deliberately disabled)
    #human_responses = human_responses[human_responses['Coding'] != 'Other']
    #llm_responses = llm_responses[llm_responses['Coding'] != 'Other']

    # Get unique Question_IDs present in both datasets
    common_question_ids = set(human_responses['Question_ID']).intersection(set(llm_responses['Question_ID']))

    # Initialize a dictionary to store JS divergence lists, keyed by experiment id
    js_divergence = {}

    # Calculate JS divergence for each common Question_ID
    for q_id in common_question_ids:
        # Get response distributions for the current Question_ID in both datasets
        human_dist = human_responses[human_responses['Question_ID'] == q_id]['Coding'].value_counts(
            normalize=True)
        llm_dist = llm_responses[llm_responses['Question_ID'] == q_id]['Coding'].value_counts(normalize=True)

        # Reindex the distributions to have the same index, filling missing values with 0
        all_responses = set(human_dist.index).union(set(llm_dist.index))
        human_dist = human_dist.reindex(all_responses, fill_value=0)
        llm_dist = llm_dist.reindex(all_responses, fill_value=0)

        # Calculate JS divergence (base 2 keeps the value in [0, 1])
        js_div = jensenshannon(human_dist, llm_dist, base=2)
        # Assumes Question_ID looks like "<prefix>_<experiment>_..." — TODO confirm.
        experiment_id = q_id.split('_')[1]
        if experiment_id not in js_divergence:
            js_divergence[experiment_id] = []
        js_divergence[experiment_id].append(js_div)

    # Calculate the average JS divergence per experiment and the confidence interval.
    # Scores are reported as 1 - JS so that higher means more human-like.
    results = {}
    experiment_averages = []
    for exp, divs in js_divergence.items():
        avg_js_divergence = 1 - np.nanmean(divs)
        # NOTE(review): scipy.stats.bootstrap requires more than one sample per
        # experiment and is randomized (no seed supplied), so intervals vary
        # between runs — confirm acceptable.
        ci_lower, ci_upper = bootstrap((divs,), np.nanmean, confidence_level=0.95,
                                       n_resamples=1000).confidence_interval
        results[exp] = {
            'average_js_divergence': avg_js_divergence,
            'confidence_interval': (1 - ci_upper, 1 - ci_lower)  # Adjust for 1 - score
        }
        experiment_averages.append(avg_js_divergence)

    # Calculate the weighted average JS divergence across all experiments
    weighted_js_divergence = np.mean(experiment_averages)  # Simple average over experiments

    # Calculate the confidence interval for the overall JS divergence using bootstrap
    overall_ci_lower, overall_ci_upper = bootstrap(
        (experiment_averages,),
        np.nanmean,
        confidence_level=0.95,
        n_resamples=1000
    ).confidence_interval

    # Combine all results into one dictionary
    all_results = {
        'overall': {
            'average_js_divergence': weighted_js_divergence,
            'confidence_interval': (overall_ci_lower, overall_ci_upper)
        },
        'per_experiment': results
    }
    return all_results
def evaluate_humanlike(self, responses_df: pd.DataFrame, human_data_path: object, result_save_path: str) -> object:
    """Compute the humanlike score for a set of model responses.

    Steps:
      1. Code the raw LLM responses into categorical labels.
      2. Persist the coded results locally and mirror them to the results repo.
      3. Compare the coded LLM data against the pre-coded human data via
         Jensen-Shannon divergence.

    Returns the result dictionary produced by ``calculate_js_divergence``.
    """
    # Coded results live next to the raw generation file, with a _coding suffix.
    save_path = result_save_path.replace('.csv', '_coding.csv')

    # Code the LLM responses into categorical labels.
    self.llm_df = self.code_results_llm(responses_df)

    if save_path is not None:  # always true for a str path; kept as a guard
        print(f'Save LLM coding results to {save_path}')
        fpath = Path(save_path)
        fpath.parent.mkdir(parents=True, exist_ok=True)
        self.llm_df.to_csv(fpath)
        # Mirror the coded CSV into the results dataset repo.
        envs.API.upload_file(
            path_or_fileobj=save_path,
            path_in_repo=f"{save_path.replace('generation_results/','')}",
            repo_id=envs.RESULTS_REPO,
            repo_type="dataset",
        )

    # Similarity between human and LLM coding distributions.
    return self.calculate_js_divergence(human_data_path, save_path)