| """MMLU benchmark evaluation utilities. | |
| This module provides evaluation functions for MMLU (Massive Multitask Language Understanding) | |
| benchmark variants including MMLU, MMLU Pro, and MMLU Redux. It supports boxed answer | |
| extraction and multiple-choice answer formats. | |
| """ | |
import argparse
import glob
import json
import os
import re

import numpy as np
import pandas


def read_jsonl_data(datapath):
    """Read model outputs from a JSONL file.

    Args:
        datapath: Path to a JSONL file containing model outputs.

    Returns:
        list: List of output strings (one per record's 'output' field).
    """
    data_list = []
    with open(datapath, "r") as f:
        for line in f:
            data_item = json.loads(line.strip())
            data_list.append(data_item['output'])
    return data_list
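
# Each input line is a JSON object; only the 'output' field is consumed here.
# Illustrative record (field names other than 'output' are assumptions):
#   {"input": "Question text ... (A) ... (D) ...", "output": "... \\boxed{C}"}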


def get_option_char(s: str):
    """Extract a single-letter option from a LaTeX \\boxed{} construct.

    Handles multiple formats:
    - \\boxed{B}
    - \\boxed{\\text{D}}
    - \\boxed{\\text{(E)}}
    - \\boxed{{A}}
    - \\boxed{(A)}

    Args:
        s: String containing a potential boxed answer.

    Returns:
        str or None: Extracted letter, or None if no match is found.
    """
    pattern = r"""
        \\boxed\{                    # \boxed{
        \s*
        (?:                          # one of:
            \\text\{                 # \text{...}
            \s*\(?([A-Za-z])\)?\s*
            \}
        |                            # or
            \{([A-Za-z])\}           # {A}
        |                            # or
            \(\s*([A-Za-z])\s*\)     # (A)
        |                            # or
            ([A-Za-z])               # B
        )
        \s*
        \}                           # }
    """
    m = re.search(pattern, s, re.VERBOSE)
    if not m:
        return None
    return m.group(1) or m.group(2) or m.group(3) or m.group(4)
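
# Illustrative behavior (inputs are assumptions, shown doctest-style):
#   >>> get_option_char(r"Final answer: \boxed{\text{(E)}}")
#   'E'
#   >>> get_option_char("no boxed answer here") is None
#   True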


def evaluate_mmlu_boxed(input_datapath, test_datapath, print_per_category_acc=False):
    """Evaluate MMLU with the boxed answer format.

    Supports batched split files for large-scale evaluation.

    Args:
        input_datapath: Path to the model output JSONL file (or base path for splits).
        test_datapath: Path to the MMLU test CSV file.
        print_per_category_acc: Whether to print per-category accuracy (default: False).

    Returns:
        float: Accuracy score.
    """
    print("reading from %s" % test_datapath)
    df = pandas.read_csv(test_datapath)
    if os.path.exists(input_datapath):
        output_list = read_jsonl_data(input_datapath)
        test_list = [row.to_dict() for _, row in df.iterrows()]
    else:
        # Fall back to sibling split files of 439 records each; keep outputs
        # and test samples aligned by only taking slices for shards that exist.
        test_list_full = [row.to_dict() for _, row in df.iterrows()]
        test_list = []
        output_list = []
        for i in range(32):
            input_datapath_i = os.path.join(
                os.path.dirname(input_datapath),
                f"mmlu_{i * 439}to{(i + 1) * 439}_thinking.jsonl")
            if os.path.exists(input_datapath_i):
                output_list.extend(read_jsonl_data(input_datapath_i))
                test_list += test_list_full[i * 439:(i + 1) * 439]
    print("#Test_samples:", len(test_list), "#Output_samples:", len(output_list))
    assert len(output_list) == len(test_list)
    correct = 0
    count_not_found = 0
    by_subject_len = {}
    by_subject_correct = {}
    for output, gold in zip(output_list, test_list):
        subject = gold['Subject']
        if subject not in by_subject_len:
            by_subject_len[subject] = 0
            by_subject_correct[subject] = 0
        by_subject_len[subject] += 1
        output_ans = get_option_char(output)
        if output_ans is None:
            # Fallback 1: a parenthesized option letter, e.g. "(B)".
            match2 = re.search(r'\((A|B|C|D)\)', output)
            output_ans = match2.group(1) if match2 else None
        if output_ans is None:
            # Fallback 2: the raw output equals one of the option texts.
            for opt_letter in ['A', 'B', 'C', 'D']:
                if str(output).lower().strip() == str(gold[opt_letter]).lower().strip():
                    output_ans = opt_letter
                    break
        if output_ans is None:
            count_not_found += 1
        gold_ans = gold['Answer']
        if output_ans == gold_ans:
            correct += 1
            by_subject_correct[subject] += 1
    if print_per_category_acc:
        for subject in by_subject_len:
            print(f"{subject}: {by_subject_len[subject]}, "
                  f"{by_subject_correct[subject] / by_subject_len[subject]}")
    print("count_not_found:", count_not_found)
    acc = correct / len(output_list)
    print("score:", acc)
    return acc
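
# The split fallback assumes shard files named by record range in the same
# directory as input_datapath, e.g. (an assumed layout):
#   mmlu_0to439_thinking.jsonl, mmlu_439to878_thinking.jsonl, ...
# Up to 32 shards of 439 records cover 14,048 slots for MMLU's 14,042
# test questions.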


def evaluate_mmlu_redux_boxed(input_datapath, test_datapath, print_per_category_acc=False):
    """Evaluate MMLU-Redux with the boxed answer format.

    MMLU-Redux is a refined version of MMLU with improved question quality.

    Args:
        input_datapath: Path to the model output JSONL file (or base path for splits).
        test_datapath: Path to the MMLU-Redux test JSON file.
        print_per_category_acc: Whether to print per-category accuracy (default: False).

    Returns:
        float: Accuracy score.
    """
    if os.path.exists(input_datapath):
        output_list = read_jsonl_data(input_datapath)
        with open(test_datapath, "r") as f:
            test_list = json.load(f)
    else:
        # Fall back to sibling split files of 375 records each.
        with open(test_datapath, "r") as f:
            test_list_full = json.load(f)
        test_list = []
        output_list = []
        for i in range(8):
            input_datapath_i = os.path.join(
                os.path.dirname(input_datapath),
                f"mmlu_redux_{i * 375}to{(i + 1) * 375}_thinking.jsonl")
            if os.path.exists(input_datapath_i):
                output_list.extend(read_jsonl_data(input_datapath_i))
                test_list += test_list_full[i * 375:(i + 1) * 375]
    print("#Test_samples:", len(test_list), "#Output_samples:", len(output_list))
    assert len(output_list) == len(test_list)
    correct = 0
    count_not_found = 0
    by_subject_len = {}
    by_subject_correct = {}
    for output, gold in zip(output_list, test_list):
        subject = gold['category']
        if subject not in by_subject_len:
            by_subject_len[subject] = 0
            by_subject_correct[subject] = 0
        by_subject_len[subject] += 1
        output_ans = get_option_char(output)
        if output_ans is None:
            # Fallback 1: a parenthesized option letter, e.g. "(B)".
            match2 = re.search(r'\((A|B|C|D)\)', output)
            output_ans = match2.group(1) if match2 else None
        if output_ans is None:
            # Fallback 2: the raw output equals one of the option texts.
            for j, opt_letter in enumerate(['A', 'B', 'C', 'D']):
                if str(output).lower().strip() == str(gold['options'][j]).lower().strip():
                    output_ans = opt_letter
                    break
        if output_ans is None:
            count_not_found += 1
        gold_ans = gold['answer']
        if output_ans == gold_ans:
            correct += 1
            by_subject_correct[subject] += 1
    if print_per_category_acc:
        for subject in by_subject_len:
            print(f"{subject}: {by_subject_len[subject]}, "
                  f"{by_subject_correct[subject] / by_subject_len[subject]}")
    print("count_not_found:", count_not_found)
    acc = correct / len(output_list)
    print("score:", acc)
    return acc
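
# Eight shards of 375 records cover 3,000 slots, matching MMLU-Redux's
# 3,000 questions (100 per subject across 30 subjects).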


def evaluate_mmlu_pro_boxed(input_datapath, test_datapath, print_per_category_acc=False):
    """Evaluate MMLU-Pro with the boxed answer format.

    MMLU-Pro extends MMLU with more challenging questions and up to 16
    options (A-P).

    Args:
        input_datapath: Path to the model output JSONL file (or base path for splits).
        test_datapath: Path to the MMLU-Pro test JSON file.
        print_per_category_acc: Whether to print per-category accuracy (default: False).

    Returns:
        float: Accuracy score.
    """
    if os.path.exists(input_datapath):
        output_list = read_jsonl_data(input_datapath)
        with open(test_datapath, "r") as f:
            test_list = json.load(f)
    else:
        with open(test_datapath, "r") as f:
            test_list_full = json.load(f)
        test_list = []
        output_list = []
        # First try 32 shards of 376 records each.
        for i in range(32):
            input_datapath_i = os.path.join(
                os.path.dirname(input_datapath),
                f"mmlu_pro_{i * 376}to{(i + 1) * 376}_thinking.jsonl")
            if os.path.exists(input_datapath_i):
                output_list.extend(read_jsonl_data(input_datapath_i))
                test_list += test_list_full[i * 376:(i + 1) * 376]
        if len(output_list) == 0:
            # Otherwise try 64 shards of 188 records each.
            for i in range(64):
                input_datapath_i = os.path.join(
                    os.path.dirname(input_datapath),
                    f"mmlu_pro_{i * 188}to{(i + 1) * 188}_thinking.jsonl")
                if os.path.exists(input_datapath_i):
                    output_list.extend(read_jsonl_data(input_datapath_i))
                    test_list += test_list_full[i * 188:(i + 1) * 188]
                else:
                    print(f"mmlu_pro_{i * 188}to{(i + 1) * 188}_thinking.jsonl not found")
    print("#Test_samples:", len(test_list), "#Output_samples:", len(output_list))
    assert len(output_list) == len(test_list)
    correct = 0
    count_not_found = 0
    by_subject_len = {}
    by_subject_correct = {}
    for output, gold in zip(output_list, test_list):
        subject = gold['category']
        if subject not in by_subject_len:
            by_subject_len[subject] = 0
            by_subject_correct[subject] = 0
        by_subject_len[subject] += 1
        output_ans = get_option_char(output)
        if output_ans is None:
            # Fallback: take the last parenthesized option letter, A-P.
            matches2 = re.findall(r'\(([A-P])\)', output, re.IGNORECASE)
            output_ans = matches2[-1] if matches2 else None
        if output_ans is None:
            count_not_found += 1
        gold_ans = gold['answer']
        if output_ans and output_ans.lower() == gold_ans.lower():
            correct += 1
            by_subject_correct[subject] += 1
    if print_per_category_acc:
        for subject in by_subject_len:
            print(f"{subject}: {by_subject_len[subject]}, "
                  f"{by_subject_correct[subject] / by_subject_len[subject]}")
    acc = correct / len(output_list)
    print("count_not_found:", count_not_found)
    print("score:", acc)
    return acc
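
# Both shard layouts cover 12,032 slots (32 * 376 = 64 * 188), matching the
# size of the MMLU-Pro test split.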


def get_args():
    """Parse command-line arguments for the MMLU evaluation script.

    Returns:
        argparse.Namespace: Parsed arguments.
    """
    parser = argparse.ArgumentParser(description="MMLU Benchmark Evaluation")
    parser.add_argument("--modelfolder", type=str, required=True,
                        help="Path to model output folder")
    parser.add_argument("--testfolder", type=str, required=True,
                        help="Path to test data folder")
    parser.add_argument('--verbose', action='store_true', default=False,
                        help='Print per-category accuracy')
    return parser.parse_args()
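
# Example invocation (script name and folder layout are assumptions):
#   python evaluate_mmlu.py --modelfolder runs/my_model --testfolder data --verbose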


def main():
    """Run evaluation for the MMLU benchmark variants."""
    args = get_args()
    model_folder = args.modelfolder
    test_datafolder = args.testfolder
    print_per_category_acc = args.verbose

    print("Evaluating MMLU Pass@1 results...")
    input_datapaths = glob.glob(f"{model_folder}/outputs_*/mmlu_r1_0to15000.jsonl")
    test_datapath = os.path.join(test_datafolder, "mmlu/mmlu_test.csv")
    accs = []
    for input_datapath in input_datapaths:
        acc = evaluate_mmlu_boxed(input_datapath, test_datapath, print_per_category_acc)
        accs.append(acc)
    mmlu_acc = np.mean(accs)
    mmlu_std = np.std(accs) if len(accs) > 1 else 0
    print("avg acc for MMLU:", mmlu_acc, "std:", mmlu_std)

    input_datapath = os.path.join(f"{model_folder}/outputs_vllm073_topp0.95_seed111",
                                  "mmlu_pro_thinking.jsonl")
    test_datapath = os.path.join(test_datafolder, "mmlu_pro/test.json")
    mmlu_pro_accs = []
    acc = evaluate_mmlu_pro_boxed(input_datapath, test_datapath, print_per_category_acc)
    mmlu_pro_accs.append(acc)
    mmlu_pro_acc = np.mean(mmlu_pro_accs)
    mmlu_pro_std = np.std(mmlu_pro_accs) if len(mmlu_pro_accs) > 1 else 0
    print("avg acc for MMLU-Pro:", mmlu_pro_acc, "std:", mmlu_pro_std)


if __name__ == "__main__":
    main()