Spaces:
Running
on
Zero
Running
on
Zero
| import os | |
| import re | |
| import json | |
| import shortuuid | |
| import numpy as np | |
| import pandas as pd | |
| from config import * | |
| from collections import defaultdict | |
| from eval.utils import * | |
class BaseEvaluator:
    """Accumulates per-batch model inputs and generated answers, and owns the
    shared evaluation-results directory under DATASET_ROOT."""

    def __init__(self):
        super(BaseEvaluator, self).__init__()
        # Create the evaluation results folder. exist_ok=True is race-free:
        # the original exists()/makedirs() pair could raise FileExistsError
        # when several processes start simultaneously.
        self.save_dir = os.path.join(DATASET_ROOT, "eval_results")
        os.makedirs(self.save_dir, exist_ok=True)

    def reset(self):
        # Reset accumulated results before evaluating a new dataset.
        self.gen_answers = []
        self.inputs = []

    def process(self, inputs, outputs):
        # Append one batch of inputs and the corresponding generations.
        self.inputs.extend(inputs)
        self.gen_answers.extend(outputs)
| class Evaluator(BaseEvaluator): | |
    def __init__(self):
        """
        Eval Datasets
        - VQAv2
        - GQA
        - SQA-IMG
        - VizWiz
        - TextVQA
        - POPE
        - MME
        - MMBench
        - MMBench-CN
        - QBench
        - MM-Vet
        - MMMU
        - MathVista
        - AI2D
        - HallusionBench
        - ChartQA
        - SEED
        - LLaVA Wild
        - BLINK
        - MathVerse
        - MMStar
        """
        super().__init__()
| def evaluate(self, model, dataset, accel): | |
| # gathering all gpu to one device | |
| self.inputs = accel.gather_for_metrics(self.inputs) | |
| self.gen_answers = accel.gather_for_metrics(self.gen_answers) | |
| if accel.is_main_process: | |
| # check for duplicates | |
| self.inputs, self.gen_answers = remove_duplicate(dataset, self.inputs, self.gen_answers) | |
| # Select evaluation for dataset | |
| if dataset == "vqav2": | |
| return self.evaluate_vqa(model, accel) | |
| elif dataset == "gqa": | |
| return self.evaluate_gqa(model, accel) | |
| elif dataset == "sqa": | |
| return self.evaluate_sqa(model, accel) | |
| elif dataset == "vizwiz": | |
| return self.evaluate_vizwiz(model, accel) | |
| elif dataset == "textvqa": | |
| return self.evaluate_textvqa(model, accel) | |
| elif dataset == "pope": | |
| return self.evaluate_pope(model, accel) | |
| elif dataset == "mme": | |
| return self.evaluate_mme(model, accel) | |
| elif dataset == "mmbench": | |
| return self.evaluate_mmbench(model, accel) | |
| elif dataset == "mmbench_dev": | |
| return self.evaluate_mmbench_dev(model, accel) | |
| elif dataset == "mmbench_cn": | |
| return self.evaluate_mmbench_cn(model, accel) | |
| elif dataset == "mmbench_cn_dev": | |
| return self.evaluate_mmbench_cn_dev(model, accel) | |
| elif dataset == "qbench": | |
| return self.evaluate_qbench(model, accel) | |
| elif dataset == "mm-vet": | |
| return self.evaluate_mmvet(model, accel) | |
| elif dataset == "mmmu": | |
| return self.evaluate_mmmu(model, accel) | |
| elif dataset == "mathvista": | |
| return self.evaluate_mathvista(model, accel) | |
| elif dataset == "ai2d": | |
| return self.evaluate_ai2d(model, accel) | |
| elif dataset == "hallusionbench": | |
| return self.evaluate_hallusionbench(model, accel) | |
| elif dataset == "chartqa": | |
| return self.evaluate_chartqa(model, accel) | |
| elif dataset == "seed": | |
| return self.evaluate_seed(model, accel) | |
| elif dataset == "llava": | |
| return self.evaluate_llava(model, accel) | |
| elif dataset == "blink": | |
| return self.evaluate_blink(model, accel) | |
| elif dataset == "mathverse": | |
| return self.evaluate_mathverse(model, accel) | |
| elif dataset == "mmstar": | |
| return self.evaluate_mmstar(model, accel) | |
| else: | |
| raise ValueError( | |
| f'{dataset} is not an available dataset.') | |
| else: | |
| return None | |
| def evaluate_vqa(self, model, accel): | |
| # VQAv2 Evaluation for EvalAI server | |
| pred_answers = [{'question_id': inputs['id'], 'answer': answer} for inputs, answer in zip(self.inputs, self.gen_answers)] | |
| pred_pth = os.path.join(self.save_dir, f"{model}_vqav2_results.json") | |
| json.dump(pred_answers, open(pred_pth, "w")) | |
| accel.print(f"Finished evaluating VQAv2. Evaluate the result file saved to {pred_pth} on EvalAI server.") | |
| return | |
| def evaluate_gqa(self, model, accel): | |
| # GQA Evaluation | |
| pred_answers = {inputs['id']: answer for inputs, answer in zip(self.inputs, self.gen_answers)} | |
| # pred_answers = [{'question_id': inputs['id'], 'answer': answer} for inputs, answer in zip(self.inputs, self.gen_answers)] | |
| pred_pth = os.path.join(self.save_dir, f"{model}_gqa_results.json") | |
| json.dump(pred_answers, open(pred_pth, "w")) | |
| accel.print("GQA Results:") | |
| results = eval_gqa(pred_answers, json.load(open(os.path.join(DATASET_ROOT, GQA)))) | |
| return results['accuracy'] | |
| def evaluate_sqa(self, model, accel): | |
| # SQA Evaluation | |
| pred_answers = [{'question_id': inputs['id'], 'answer': convert_to_choice(answer, inputs['candidates']), 'gt': inputs['gt']} for inputs, answer in zip(self.inputs, self.gen_answers)] | |
| pred_pth = os.path.join(self.save_dir, f"{model}_sqa_results.json") | |
| json.dump(pred_answers, open(pred_pth, "w")) | |
| # Compute accuracy | |
| results = [(answer['answer'] == answer['gt']) for answer in pred_answers] | |
| accel.print (f"SQA Accuracy: {np.mean(results)*100} %") | |
| return np.mean(results)*100 | |
| def evaluate_vizwiz(self, model, accel): | |
| # VizWiz Evaluation | |
| evaluator = EvalAIAnswerProcessor() | |
| pred_answers = [{'image': inputs['id'], 'answer': evaluator(answer)} for inputs, answer in zip(self.inputs, self.gen_answers)] | |
| pred_pth = os.path.join(self.save_dir, f"{model}_vizwiz_results.json") | |
| json.dump(pred_answers, open(pred_pth, "w")) | |
| accel.print(f"Finished evaluating VizWiz. Evaluate the result file saved to {pred_pth} on EvalAI server.") | |
| return | |
| def evaluate_textvqa(self, model, accel): | |
| # TextVQA Evaluation | |
| pred_answers = [{'question_id': inputs['id'], 'pred_answer': answer, 'question': inputs['question'], 'gt_answers': inputs['gt']} for inputs, answer in zip(self.inputs, self.gen_answers)] | |
| pred_pth = os.path.join(self.save_dir, f"{model}_textvqa_results.json") | |
| json.dump(pred_answers, open(pred_pth, "w")) | |
| evaluator = TextVQAAccuracyEvaluator() | |
| results = evaluator.eval_pred_list(pred_answers)*100 | |
| accel.print (f"TextVQA Accuracy: {results} %") | |
| return results | |
| def evaluate_pope(self, model, accel): | |
| # POPE Evaluation | |
| pred_answers = [{'question_id': inputs['id'], 'answer': answer, 'question': inputs['question'], 'category': inputs['category']} for inputs, answer in zip(self.inputs, self.gen_answers)] | |
| pred_pth = os.path.join(self.save_dir, f"{model}_pope_results.json") | |
| json.dump(pred_answers, open(pred_pth, "w")) | |
| pope_results = {} | |
| pope_results['adversarial'] = None | |
| pope_results['popular'] = None | |
| pope_results['random'] = None | |
| categories = ['adversarial', 'popular', 'random'] | |
| files = [POPE_ADVERSARIAL, POPE_POPULAR, POPE_RANDOM] | |
| for category, file in zip(categories, files): | |
| cur_answers = [x for x in pred_answers if x['category'] == category] | |
| cur_answers = sorted(cur_answers, key=lambda x:x["question_id"]) | |
| pope_results[category] = eval_pope(cur_answers, os.path.join(DATASET_ROOT, file)) | |
| accel.print (f"POPE Adversarial Accuracy: {pope_results['adversarial']} %") | |
| accel.print (f"POPE Popular Accuracy: {pope_results['popular']} %") | |
| accel.print (f"POPE Random Accuracy: {pope_results['random']} %") | |
| return pope_results | |
| def evaluate_mme(self, model, accel): | |
| # MME Evaluation | |
| pred_answers = [{'question_id': inputs['id'], 'answer': answer, "question": inputs['question'], 'category': inputs['category']} for inputs, answer in zip(self.inputs, self.gen_answers)] | |
| pred_pth = os.path.join(self.save_dir, f"{model}_mme_results.json") | |
| json.dump(pred_answers, open(pred_pth, "w")) | |
| ground_truth = get_gt(data_path=os.path.join(DATASET_ROOT, MME_DIR)) | |
| result_dir = os.path.join(self.save_dir, 'mme') | |
| os.makedirs(result_dir, exist_ok=True) | |
| results = defaultdict(list) | |
| for answer in pred_answers: | |
| file = answer['question_id'].split('/')[-1].split('.')[0] + '.txt' | |
| results[answer['category']].append((file, answer['question'], answer['answer'])) | |
| for category, cate_tups in results.items(): | |
| with open(os.path.join(result_dir, f'{category}.txt'), 'w') as fp: | |
| questions = set() # check for duplicates | |
| for file, prompt, answer in cate_tups: | |
| if 'Answer the question using a single word or phrase.' in prompt: | |
| prompt = prompt.replace('Answer the question using a single word or phrase.', '').strip() | |
| if 'Please answer yes or no.' not in prompt: | |
| prompt = prompt + ' Please answer yes or no.' | |
| if (category, file, prompt) not in ground_truth: | |
| prompt = prompt.replace(' Please answer yes or no.', ' Please answer yes or no.') | |
| gt_ans = ground_truth[category, file, prompt] | |
| dup = file, prompt, gt_ans | |
| tup = file, prompt, gt_ans, answer | |
| if dup in questions: | |
| continue | |
| questions.add(dup) | |
| fp.write('\t'.join(tup) + '\n') | |
| evaluator = MMEEvaluator() | |
| scores = evaluator.process_result(result_dir) | |
| accel.print("MME Scores:") | |
| accel.print(scores) | |
| for eval_type, eval_scores in scores.items(): | |
| accel.print("===========", eval_type, "===========") | |
| accel.print("total score:", eval_scores['total'], "\n") | |
| for task_name, score in eval_scores.items(): | |
| accel.print("\t", task_name, " score:", score) | |
| accel.print("\n") | |
| return scores | |
| def evaluate_mmbench(self, model, accel): | |
| # MMBench Evaluation | |
| df = pd.read_table(os.path.join(DATASET_ROOT, MMBENCH)) | |
| cur_df = df.copy() | |
| cur_df = cur_df.drop(columns=['hint', 'category', 'source', 'image', 'comment', 'l2-category']) | |
| cur_df.insert(6, 'prediction', None) | |
| for inputs, answer in zip(self.inputs, self.gen_answers): | |
| cur_df.loc[df['index'] == inputs['id'], 'prediction'] = answer | |
| pred_pth = os.path.join(self.save_dir, f"{model}_mmbench_results.xlsx") | |
| cur_df.to_excel(pred_pth, index=False, engine='openpyxl') | |
| accel.print(f"Finished evaluating MMBench. Change {pred_pth} name to submission.xlsx and evaluate the result file saved to {pred_pth} on OpenCompass server.") | |
| return | |
| def evaluate_mmbench_dev(self, model, accel): | |
| # MMBench Dev Evaluation | |
| df = pd.read_table(os.path.join(DATASET_ROOT, MMBENCH_DEV)) | |
| cur_df = df.copy() | |
| cur_df = cur_df.drop(columns=['hint', 'category', 'source', 'image', 'comment', 'l2-category']) | |
| cur_df.insert(6, 'prediction', None) | |
| for inputs, answer in zip(self.inputs, self.gen_answers): | |
| cur_df.loc[df['index'] == inputs['id'], 'prediction'] = answer[0] | |
| pred_pth = os.path.join(self.save_dir, f"{model}_mmbench_dev_results.xlsx") | |
| cur_df.to_excel(pred_pth, index=False, engine='openpyxl') | |
| accuracy = (cur_df['prediction'] == cur_df['answer']).mean() | |
| accel.print(f'MMBench_dev Accuracy: {accuracy:.2%}') | |
| return | |
| def evaluate_mmbench_cn(self, model, accel): | |
| # MMBench_CN Evaluation | |
| df = pd.read_table(os.path.join(DATASET_ROOT, MMBENCH_CN)) | |
| cur_df = df.copy() | |
| cur_df = cur_df.drop(columns=['hint', 'category', 'source', 'image', 'comment', 'l2-category']) | |
| cur_df.insert(6, 'prediction', None) | |
| for inputs, answer in zip(self.inputs, self.gen_answers): | |
| cur_df.loc[df['index'] == inputs['id'], 'prediction'] = answer | |
| pred_pth = os.path.join(self.save_dir, f"{model}_mmbench_cn_results.xlsx") | |
| cur_df.to_excel(pred_pth, index=False, engine='openpyxl') | |
| accel.print(f"Finished evaluating MMBench_CN. Change {pred_pth} name to submission.xlsx and evaluate the result file saved to {pred_pth} on OpenCompass server.") | |
| return | |
| def evaluate_mmbench_cn_dev(self, model, accel): | |
| # MMBench_CN Dev Evaluation | |
| df = pd.read_table(os.path.join(DATASET_ROOT, MMBENCH_CN_DEV)) | |
| cur_df = df.copy() | |
| cur_df = cur_df.drop(columns=['hint', 'category', 'source', 'image', 'comment', 'l2-category']) | |
| cur_df.insert(6, 'prediction', None) | |
| for inputs, answer in zip(self.inputs, self.gen_answers): | |
| cur_df.loc[df['index'] == inputs['id'], 'prediction'] = answer[0] | |
| pred_pth = os.path.join(self.save_dir, f"{model}_mmbench_cn_dev_results.xlsx") | |
| cur_df.to_excel(pred_pth, index=False, engine='openpyxl') | |
| accuracy = (cur_df['prediction'] == cur_df['answer']).mean() | |
| accel.print(f'MMBench_CN_dev Accuracy: {accuracy:.2%}') | |
| return | |
| def evaluate_qbench(self, model, accel): | |
| # QBench Evaluation | |
| pred_answers = [{'id': inputs['id'], 'answer': convert_to_choice(answer, inputs['candidates']), 'gt': inputs['gt'], 'candidates': inputs['candidates']} for inputs, answer in zip(self.inputs, self.gen_answers)] | |
| pred_pth = os.path.join(self.save_dir, f'{model}_qbench_results.jsonl') | |
| with open(pred_pth, "w") as pf: | |
| pf.write(json.dumps(pred_answers) + "\n") | |
| results = [(pred['candidates'][pred['answer']] == pred['gt']) for pred in pred_answers] | |
| accel.print (f"QBench Accuracy: {np.mean(results)*100} %") | |
| return np.mean(results)*100 | |
| def evaluate_mmvet(self, model, accel): | |
| # MM-Vet Evaluation | |
| cur_result = {f"{inputs['id']}": answer for inputs, answer in zip(self.inputs, self.gen_answers)} | |
| pred_pth = os.path.join(self.save_dir, f'{model}_mmvet_results.json') | |
| with open(pred_pth, 'w') as f: | |
| json.dump(cur_result, f, indent=2) | |
| accel.print(f"Finished evaluating MM-Vet. Evaluate the result file saved to {pred_pth}.") | |
| return | |
| def evaluate_mmmu(self, model, accel): | |
| # MMMU Evaluation | |
| predictions = {inputs['id']: answer for inputs, answer in zip(self.inputs, self.gen_answers)} | |
| answers = {inputs['id']: {'ground_truth': inputs['gt'], 'question_type': inputs['question_type']} for inputs, answer in zip(self.inputs, self.gen_answers)} | |
| pred_pth = os.path.join(self.save_dir, f'{model}_mmmu_results.json') | |
| with open(pred_pth, "w") as f: | |
| json.dump(predictions, f, indent=2) | |
| ans_pth = os.path.join(self.save_dir, 'mmmu_answers.json') | |
| with open(ans_pth, "w") as pf: | |
| json.dump(answers, pf, indent=2) | |
| # group by category | |
| output_dict_w_cat = {} | |
| for data_id, parsed_pred in predictions.items(): | |
| category = "_".join(data_id.split("_")[1:-1]) | |
| if category not in output_dict_w_cat: | |
| output_dict_w_cat.update({category: {}}) | |
| output_dict_w_cat[category].update({data_id: parsed_pred}) | |
| # group by category | |
| answer_dict_w_cat = {} | |
| for data_id, parsed_pred in answers.items(): | |
| category = "_".join(data_id.split("_")[1:-1]) | |
| if category not in answer_dict_w_cat: | |
| answer_dict_w_cat.update({category: {}}) | |
| answer_dict_w_cat[category].update({data_id: parsed_pred}) | |
| evaluation_result = {} | |
| for category in CAT_SHORT2LONG.values(): | |
| accel.print("Evaluating: {}".format(category)) | |
| # get cat_outputs and cat_answers | |
| try: | |
| cat_outputs = output_dict_w_cat[category] | |
| cat_answers = answer_dict_w_cat[category] | |
| except KeyError: | |
| accel.print("Skipping {} for not found".format(category)) | |
| continue | |
| exampels_to_eval = [] | |
| for data_id, parsed_pred in cat_outputs.items(): | |
| question_type = cat_answers[data_id]['question_type'] | |
| if question_type != 'multiple-choice': | |
| parsed_pred = parse_open_response(parsed_pred) # mainly for type consistency (make it number, etc.) | |
| else: | |
| parsed_pred = parsed_pred | |
| exampels_to_eval.append({ | |
| "id": data_id, | |
| "question_type": question_type, | |
| "answer": cat_answers[data_id]['ground_truth'], | |
| "parsed_pred": parsed_pred | |
| }) | |
| judge_dict, metric_dict = evaluate(exampels_to_eval) | |
| metric_dict.update({"num_example": len(exampels_to_eval)}) | |
| evaluation_result[category] = metric_dict | |
| printable_results = {} | |
| # add domain Subject | |
| for domain, in_domain_cats in DOMAIN_CAT2SUB_CAT.items(): | |
| in_domain_cat_results = {} | |
| for cat_name in in_domain_cats: # use the order in DOMAIN_CAT2SUB_CAT | |
| if cat_name in evaluation_result.keys(): | |
| in_domain_cat_results[cat_name] = evaluation_result[cat_name] | |
| else: | |
| pass | |
| in_domain_ins_acc = calculate_ins_level_acc(in_domain_cat_results) | |
| in_domain_data_num = sum([cat_results['num_example'] for cat_results in in_domain_cat_results.values()]) | |
| printable_results['Overall-' + domain] = {"num": int(in_domain_data_num), | |
| "acc": round(in_domain_ins_acc, 3) | |
| } | |
| # add sub category | |
| for cat_name, cat_results in in_domain_cat_results.items(): | |
| printable_results[cat_name] = {"num": int(cat_results['num_example']), | |
| "acc": round(cat_results['acc'], 3) | |
| } | |
| # table.append(["-----------------------------", "-----", "----"]) | |
| all_ins_acc = calculate_ins_level_acc(evaluation_result) | |
| printable_results['Overall'] = {"num": sum([cat_results['num_example'] for cat_results in evaluation_result.values()]), | |
| "acc": round(all_ins_acc, 3) | |
| } | |
| accel.print(printable_results) | |
| return | |
| def evaluate_mathvista(self, model, accel): | |
| # MathVista Evaluation | |
| pred_answers = [{'pid': inputs['id'], 'image': inputs['id'], 'response': answer, | |
| 'question_type': inputs['question_type'], 'answer_type': inputs['answer_type'], 'metadata': inputs['metadata'], | |
| 'choices': inputs['choices'], 'query': inputs['question'], 'precision': inputs['precision'],} for inputs, answer in zip(self.inputs, self.gen_answers)] | |
| predictions = {pred['pid']: pred for pred in pred_answers} | |
| pred_pth = os.path.join(self.save_dir, f"{model}_mathvista_results.json") | |
| json.dump(predictions, open(pred_pth, "w")) | |
| accel.print(f"Finished evaluating MathVista. Evaluate the result file saved to {pred_pth}.") | |
| return | |
| def evaluate_ai2d(self, model, accel): | |
| # AI2D Evaluation | |
| pred_answers = [{'question_id': inputs['id'], 'answer': answer, 'gt': inputs['gt']} for inputs, answer in zip(self.inputs, self.gen_answers)] | |
| pred_pth = os.path.join(self.save_dir, f"{model}_ai2d_results.json") | |
| json.dump(pred_answers, open(pred_pth, "w")) | |
| # Compute accuracy | |
| pattern = re.compile(r'[A-Z]') | |
| results = [(char_to_int(pattern.findall(answer)[0]) == inputs['gt']) for inputs, answer in zip(self.inputs, self.gen_answers)] | |
| accel.print(f"AI2D Accuracy: {np.mean(results)*100} %") | |
| return np.mean(results)*100 | |
| def evaluate_hallusionbench(self, model, accel): | |
| # HallusionBench Evaluation | |
| pred_answers = [{'answer': '1' if answer.lower().find('yes') != -1 else '0', 'question': inputs['question'], 'gt': inputs['gt']} for inputs, answer in zip(self.inputs, self.gen_answers)] | |
| pred_pth = os.path.join(self.save_dir, f"{model}_hallusionbench_results.json") | |
| json.dump(pred_answers, open(pred_pth, "w")) | |
| # Compute accuracy | |
| results = [(answer['answer'] == answer['gt']) for answer in pred_answers] | |
| accel.print(f"HallusionBench Accuracy: {np.mean(results)*100} %") | |
| return np.mean(results)*100 | |
| def evaluate_chartqa(self, model, accel): | |
| # ChartQA Evaluation | |
| # post processing | |
| processed_answers = [] | |
| for x in self.gen_answers: | |
| if any(i.isdigit() for i in x): | |
| processed_answers.append(x.split(" ")[0]) | |
| else: | |
| processed_answers.append(x) | |
| pred_answers = [{'answer': answer, 'question': inputs['question'], 'annotation': inputs['gt']} for inputs, answer in zip(self.inputs, processed_answers)] | |
| pred_pth = os.path.join(self.save_dir, f"{model}_chartqa_results.json") | |
| json.dump(pred_answers, open(pred_pth, "w")) | |
| # Compute accuracy | |
| acc = evaluate_relaxed_accuracy(pred_answers) | |
| accel.print(f"ChartQA Accuracy: {acc*100}%") | |
| return acc | |
| def evaluate_seed(self, model, accel): | |
| # SEED Evaluation | |
| pred_answers = [{'answer': answer, 'question': inputs['question'], 'question_id': inputs['id'], 'gt': inputs['gt'], 'question_type': inputs['question_type']} for inputs, answer in zip(self.inputs, self.gen_answers)] | |
| pred_pth = os.path.join(self.save_dir, f"{model}_seed_results.json") | |
| json.dump(pred_answers, open(pred_pth, "w")) | |
| # Compute accuracy | |
| results = [(answer['answer'] == answer['gt']) for answer in pred_answers] | |
| accel.print (f"SEED Accuracy: {np.mean(results)*100} %") | |
| # Per question type accuracy | |
| for k, v in SEED_TYPES.items(): | |
| sub_results = [] | |
| for pred in pred_answers: | |
| if pred['question_type'] == k: | |
| sub_results.append(pred['answer'] == pred['gt']) | |
| accel.print (f"{v}: {np.mean(sub_results)*100} %") | |
| return np.mean(results)*100 | |
| def evaluate_llava(self, model, accel): | |
| # LLaVA-in-the-Wild Evaluation | |
| pred_answers = [{'question_id': inputs['id'], 'prompt': inputs['question'], 'text': answer, "answer_id": shortuuid.uuid()} for inputs, answer in zip(self.inputs, self.gen_answers)] | |
| sorted_answers = sorted(pred_answers, key=lambda x: x['question_id']) | |
| pred_pth = os.path.join(self.save_dir, f'{model}_llava_results.jsonl') | |
| ans_file = open(pred_pth, "w") | |
| for pred in sorted_answers: | |
| ans_file.write(json.dumps(pred) + "\n") | |
| ans_file.flush() | |
| ans_file.close() | |
| accel.print(f"Finished evaluating LLaVA-in-the-wild. Evaluate the result file saved to {pred_pth}.") | |
| return | |
    def evaluate_blink(self, model, accel):
        # BLINK Evaluation — not implemented yet; returns None so the
        # evaluate() dispatcher completes without error.
        # TODO: implement BLINK scoring.
        return
| def evaluate_mathverse(self, model, accel): | |
| # Mathverse Evaluation | |
| pred_answers = [{'sample_index' : inputs['id'], 'problem_index' : inputs['problem_index'], 'problem_version' : inputs['problem_version'], | |
| 'question' : inputs['origin_question'], 'answer' : inputs['gt'], | |
| 'question_type': inputs['question_type'], 'question_type': inputs['question_type'], | |
| 'metadata': inputs['metadata'], 'query_wo': inputs['question'], 'query_cot' : inputs['query_cot'], 'model_answer' : answer} for inputs, answer in zip(self.inputs, self.gen_answers)] | |
| # answers = [item for item in pred_answers if item['problem_version'] != 'Text_Only'] | |
| # text_only_answers = [item for item in pred_answers if item['problem_version'] == 'Text_Only'] | |
| pred_pth = os.path.join(self.save_dir, f'{model}_mathverse_results.json') | |
| json.dump(pred_answers, open(pred_pth, "w")) | |
| pred_pth = os.path.join(self.save_dir, f'{model}_mathverse_scores.json') | |
| eval_mathverse(self.save_dir, pred_answers,f'{model}_mathverse_extracts.json', f'{model}_mathverse_scores.json') | |
| accel.print(f"Finished evaluating MathVerse. Evaluate the result file saved to {pred_pth}.") | |
| # TODO | |
| return | |
| def evaluate_mmstar(self, model, accel): | |
| pred_answers = [{'question': inputs['question'], | |
| 'answer': inputs['answer'], | |
| 'category': inputs['category'], | |
| 'l2_category': inputs['l2_category'], | |
| # 'bench': inputs['bench'], | |
| 'prediction' : answer} for inputs, answer in zip(self.inputs, self.gen_answers)] | |
| pred_pth = os.path.join(self.save_dir, f'{model}_mmstar_results.json') | |
| json.dump(pred_answers, open(pred_pth, "w")) | |
| df = pd.DataFrame(pred_answers) | |
| eval_mmstar(df, self.save_dir, f'{model}_mmstar_scores.json') | |
| pred_pth = os.path.join(self.save_dir, f'{model}_mmstar_scores.json') | |
| accel.print(f"Finished evaluating MMStar. Evaluate the result file saved to {pred_pth}.") | |