| import json |
| import os.path as osp |
| import re |
|
|
| from datasets import Dataset |
|
|
| from opencompass.openicl.icl_evaluator import BaseEvaluator |
| from opencompass.registry import (ICL_EVALUATORS, LOAD_DATASET, |
| TEXT_POSTPROCESSORS) |
|
|
| from .base import BaseDataset |
|
|
|
|
| @LOAD_DATASET.register_module() |
| class BBHDataset(BaseDataset): |
|
|
| @staticmethod |
| def load(path: str, name: str): |
| with open(osp.join(path, f'{name}.json'), 'r') as f: |
| data = json.load(f)['examples'] |
| dataset = Dataset.from_list(data) |
| return dataset |
|
|
|
|
| @TEXT_POSTPROCESSORS.register_module('bbh-mcq') |
| def bbh_mcq_postprocess(text: str) -> str: |
| ans = text |
| ans_line = ans.split('answer is ') |
| if len(ans_line) != 1: |
| ans = ans_line[1].strip() |
| match = re.search(r'\(([A-Z])\)*', ans) |
| if match: |
| return match.group(1) |
| match = re.search(r'([A-Z])', ans) |
| if match: |
| return match.group(1) |
| return ans |
|
|
|
|
| @TEXT_POSTPROCESSORS.register_module('bbh-freeform') |
| def bbh_freeform_postprocess(text: str) -> str: |
| ans = text |
| ans_line = ans.split('answer is ') |
| if len(ans_line) != 1: |
| ans = ans_line[1].strip() |
| ans = ans.split('\n')[0] |
| if ans.endswith('.'): |
| ans = ans[:-1] |
| return ans |
|
|
|
|
| @ICL_EVALUATORS.register_module() |
| class BBHEvaluator(BaseEvaluator): |
|
|
| def score(self, predictions, references): |
| if len(predictions) != len(references): |
| return { |
| 'error': 'predictions and references have different ' |
| 'length' |
| } |
|
|
| predictions = [bbh_freeform_postprocess(pred) for pred in predictions] |
|
|
| details = [] |
| cnt = 0 |
| for pred, ref in zip(predictions, references): |
| detail = {'pred': pred, 'answer': ref, 'correct': False} |
| if pred == ref: |
| cnt += 1 |
| detail['correct'] = True |
| details.append(detail) |
|
|
| score = cnt / len(predictions) * 100 |
|
|
| return {'score': score, 'details': details} |
|
|
|
|
| @ICL_EVALUATORS.register_module() |
| class BBHEvaluator_mcq(BaseEvaluator): |
|
|
| def score(self, predictions, references): |
| if len(predictions) != len(references): |
| return { |
| 'error': 'predictions and references have different ' |
| 'length' |
| } |
| details = [] |
| cnt = 0 |
| for pred, ref in zip(predictions, references): |
| detail = {'pred': pred, 'answer': ref, 'correct': False} |
| if pred == ref: |
| cnt += 1 |
| detail['correct'] = True |
| details.append(detail) |
|
|
| score = cnt / len(predictions) * 100 |
|
|
| return {'score': score, 'details': details} |
|
|