| import json |
| import os |
| import os.path as osp |
| import re |
| import subprocess |
| from collections import defaultdict |
| from typing import List, Optional |
|
|
| import numpy as np |
| from datasets import Dataset |
|
|
| from opencompass.openicl.icl_evaluator import BaseEvaluator |
| from opencompass.registry import ICL_EVALUATORS, LOAD_DATASET |
|
|
| from .base import BaseDataset |
|
|
|
|
def load_experiment(file: str) -> dict:
    """Load a single notebook experiment file with reference solutions.

    Parses the notebook's markdown cells into user questions and its code
    cells into assistant answers, and collects the reference outputs with a
    tag describing how each step should be scored ('vis', 'general', 'exec').

    Args:
        file (str): Path to the ``.ipynb`` experiment file.

    Returns:
        dict: ``experiment`` (the file path), ``questions`` (interleaved
        user/assistant chat messages) and ``references`` (outputs, tags,
        notebook metadata and the file path).
    """
    with open(file, 'r') as fp:
        notebook = json.load(fp)
    cells = notebook['cells']
    metadata = notebook['metadata']
    modules = metadata.get('modules', [])
    if modules:
        # One module hint is expected per annotated step type.
        assert len(modules) == len(metadata.get('step_types'))
        # Split "a & b" style hints into lists of stripped module names.
        modules = [[name.strip() for name in entry.split('&')]
                   for entry in modules]

    questions = []
    source_codes = []
    outputs = []
    tags = []
    for cell in cells:
        kind = cell['cell_type']
        if kind == 'markdown':
            text = ''.join(cell['source']).strip()
            if modules:
                step_modules = modules.pop(0)
                text += f"Please use {' and '.join(step_modules)} modules."
            questions.append(text.strip() + '\n')
        elif kind == 'code':
            source_codes.append(''.join(cell['source']))
            last_output = cell['outputs'][-1] if cell['outputs'] else None
            if last_output is not None and 'data' in last_output:
                data = last_output['data']
                if 'image/png' in data:
                    # Figure output: scored by visual similarity.
                    tags.append('vis')
                    outputs.append(data['image/png'])
                elif 'text/plain' in data:
                    tags.append('general')
                    outputs.append(''.join(data['text/plain']))
            else:
                # No captured output: only executability is checked.
                tags.append('exec')
                outputs.append(None)

    dialogue = []
    for question, source_code in zip(questions, source_codes):
        dialogue.append(dict(role='user', content=question))
        dialogue.append(dict(role='assistant', content=source_code))
    return dict(
        experiment=file,
        questions=dialogue,
        references=dict(outputs=outputs,
                        tags=tags,
                        metadata=metadata,
                        experiment=file),
    )
|
|
|
|
def load_experiment_template(file: str) -> dict:
    """Load single experiment file with solutions for template experiment.

    Like ``load_experiment`` but inspects every output entry of a code cell
    (display_data, stream/stdout, execute_result) instead of only the last
    one, and emits Chinese module hints for files whose path contains
    'chinese'.

    Args:
        file (str): Path to the ``.ipynb`` experiment file.

    Returns:
        dict: ``experiment`` (file path), ``questions`` (interleaved
        user/assistant messages) and ``references`` (outputs, tags,
        notebook metadata, file path).
    """
    with open(file, 'r') as f:
        notebook = json.load(f)
    example = notebook['cells']
    metadata = notebook['metadata']
    modules = metadata.get('modules', [])
    if modules:
        # One module hint is expected per annotated step type.
        assert len(modules) == len(metadata.get('step_types'))
        # Split "a & b" style hints into lists of stripped module names.
        modules = [[_m.strip() for _m in _modules.split('&')]
                   for _modules in modules]
    questions = []
    source_codes = []
    outputs = []
    tags = []
    for cell in example:
        if cell['cell_type'] == 'markdown':
            text = ''.join(cell['source']).strip()
            if modules:
                _modules = modules.pop(0)
                # Language of the hint follows the file path convention.
                if 'chinese' not in file:
                    text += f"Please use {' and '.join(_modules)} modules."
                else:
                    text += f"请用 {' 和 '.join(_modules)} 模块."
            text = text.strip() + '\n'
            questions.append(text)
        elif cell['cell_type'] == 'code':
            source_codes.append(''.join(cell['source']))
            # output_flag tracks whether a reference output was captured.
            output_flag = False
            if cell['outputs']:
                # First pass: figures take priority. The assert enforces
                # at most one display_data output per cell.
                for _output in cell['outputs']:
                    if _output['output_type'] == 'display_data':
                        assert not output_flag
                        if 'image/png' in _output['data']:
                            output_flag = True
                            tags.append('vis')
                            outputs.append(_output['data']['image/png'])
                # Second pass: walk outputs from the end and take the last
                # textual result (stdout stream or execute_result), unless
                # a figure was already captured.
                for _output in cell['outputs'][::-1]:
                    if output_flag:
                        break
                    if _output['output_type'] == 'stream' and _output[
                            'name'] == 'stdout':
                        assert not output_flag
                        output_flag = True
                        tags.append('general')
                        outputs.append(''.join(_output['text']))
                    elif _output['output_type'] == 'execute_result':
                        assert not output_flag
                        output_flag = True
                        tags.append('general')
                        outputs.append(''.join(
                            _output['data']['text/plain']))
            if not output_flag:
                # No usable output: only executability is checked.
                tags.append('exec')
                outputs.append(None)
    return dict(
        experiment=file,
        questions=sum(([
            dict(role='user', content=question),
            dict(role='assistant', content=source_code)
        ] for question, source_code in zip(questions, source_codes)), []),
        references=dict(outputs=outputs,
                        tags=tags,
                        metadata=metadata,
                        experiment=file),
    )
|
|
|
|
def check_internet():
    """A tricky way to check internet availability.

    Tries to download a tiny NLTK corpus with a 10-second socket timeout;
    a failed download is taken as "no internet (or blocked by proxy)".

    Raises:
        ConnectionError: If the download does not succeed.
    """
    import socket

    import nltk
    socket.setdefaulttimeout(10)
    try:
        ret = nltk.download('stopwords', quiet=True)
    finally:
        # Always restore the global default timeout, even if the
        # download itself raises (e.g. on a hard network error).
        socket.setdefaulttimeout(None)
    if not ret:
        # Fixed: the two adjacent literals previously concatenated to
        # "Pleasecheck" (missing separating space).
        raise ConnectionError('CIBench needs internet to get response. '
                              'Please check your internet and proxy.')
|
|
|
|
@LOAD_DATASET.register_module()
class CIBenchDataset(BaseDataset):
    """Code Interpreter dataset."""

    @staticmethod
    def load(path: str, internet_check: bool = False):
        """Load whole dataset.

        Args:
            path(str): Path of cibench dataset.
            internet_check(bool): Whether to check internet.
                Defaults to False.
        """
        if internet_check:
            check_internet()
        assert os.path.exists(path), f'Path {path} does not exist.'
        data_list = []
        # Sort directories and files in-place so traversal order — and
        # therefore dataset row order — is deterministic across runs.
        for root, subdirs, filenames in os.walk(path):
            subdirs.sort()
            filenames.sort()
            data_list.extend(
                load_experiment(os.path.join(root, name))
                for name in filenames if '.ipynb' in name)
        return Dataset.from_list(data_list)
|
|
|
|
@LOAD_DATASET.register_module()
class CIBenchTemplateDataset(BaseDataset):
    """Code Interpreter dataset for template dataset."""

    @staticmethod
    def load(path: str, internet_check: bool = False):
        """Load whole dataset.

        Args:
            path(str): Path of cibench dataset.
            internet_check(bool): Whether to check internet.
                Defaults to False.
        """
        if internet_check:
            check_internet()
        assert os.path.exists(path), f'Path {path} does not exist.'
        data_list = []
        # Sort directories and files in-place so traversal order — and
        # therefore dataset row order — is deterministic across runs.
        for root, subdirs, filenames in os.walk(path):
            subdirs.sort()
            filenames.sort()
            data_list.extend(
                load_experiment_template(os.path.join(root, name))
                for name in filenames if '.ipynb' in name)
        return Dataset.from_list(data_list)
|
|
|
|
class CIBenchEvaluator(BaseEvaluator):
    """Evaluator for CI dataset.

    Scores a model's code-interpreter interactions against reference
    notebook outputs. Each step carries a tag ('exec', 'general', 'num',
    'text', 'vis') that selects the scoring function via ``TAG_MAPPING``.

    Args:
        text_evaluator (optional, dict): The text evaluator for text result
            comparison. Defaults to None, which uses Rouge as the default.
            Please notice that an extra key `metric_name` should be set
            to get the exact metric result, such as `rouge1`.
        output_dir (optional, str): The directory to save experiment
            files in a markdown or notebook format.
        with_ipynb (bool): Generate ipynb correspondingly.
            Defaults to False.
        user_data_dir (str): The directory to load local files.
            Defaults to 'ENV', which means use environment variable
            `USER_DATA_DIR` to get the data dir.
    """

    def __init__(self,
                 text_evaluator: Optional[dict] = None,
                 output_dir: Optional[str] = None,
                 with_ipynb: bool = False,
                 user_data_dir: str = 'ENV') -> None:
        if text_evaluator is None:
            # Default text metric: ROUGE-1 (score scaled to [0, 1] later).
            from opencompass.openicl.icl_evaluator import RougeEvaluator
            self.text_evaluator = ICL_EVALUATORS.build(
                dict(type=RougeEvaluator))
            self.text_eval_metric = 'rouge1'
        else:
            # NOTE(review): `pop` mutates the caller's config dict in place.
            self.text_eval_metric = text_evaluator.pop('metric_name')
            self.text_evaluator = ICL_EVALUATORS.build(text_evaluator)

        self.output_dir = output_dir
        self.user_data_dir = self.check_user_data_dir(user_data_dir)
        self.with_ipynb = with_ipynb
        # Maps a step tag to its (result key, scoring function) pair.
        self.TAG_MAPPING = {
            'exec': ('executable', self.valid_step),
            'general': ('general_correct', self.correct_step),
            'num': ('numeric_correct', self.correct_step),
            'text': ('text_score', self.text_step),
            'vis': ('vis_sim', self.vis_similarity_step),
        }

    def check_user_data_dir(self, user_data_dir):
        """Resolve and validate the local data directory.

        'ENV' resolves through the ``USER_DATA_DIR`` environment variable
        (falling back to ``./data/cibench_dataset/datasources``). A
        non-empty path is expected to end in a ``data`` folder; otherwise
        a ``data`` subfolder must exist beneath it.
        """
        if user_data_dir == 'ENV':
            default_path = osp.abspath('./data/cibench_dataset/datasources')
            user_data_dir = os.environ.get('USER_DATA_DIR', default_path)
        user_data_dir = user_data_dir.rstrip('/')
        basename = osp.basename(user_data_dir)
        if basename and basename != 'data':
            # Append the conventional `data` subfolder when it is missing.
            user_data_dir = osp.join(user_data_dir, 'data')
            assert osp.exists(user_data_dir), \
                f'a subfolder named `data` should exist under {user_data_dir}.'
        elif basename:
            assert osp.exists(user_data_dir), \
                f'{user_data_dir} does not exist.'
        return user_data_dir

    @staticmethod
    def valid_step(step):
        """Whether the step is executable and valid.

        Only the most recent IPythonInterpreter action in the step is
        inspected; its error message decides the verdict.
        """
        for action in step[::-1]:
            if action['type'] == 'IPythonInterpreter':
                if action['errmsg']:
                    return False
                else:
                    return True
        # No interpreter action found at all: count as not executable.
        return False

    @staticmethod
    def correct_step(step, target):
        """Whether the step output is correct.

        Extracts the textual result from the latest interpreter action's
        formatted response and compares it with *target* (exact match or
        target-substring match, after stripping).
        """
        for action in step[::-1]:
            if action['type'] == 'IPythonInterpreter':
                if action['result']:
                    try:
                        pred = action['result']['text']
                        # The interpreter response embeds results as fenced
                        # blocks labelled `execute_result:` / `stdout:`.
                        match_exec = re.search(
                            'execute_result:\n\n```\n(.*?)\n```', pred,
                            re.DOTALL)
                        match_stdout = re.search('stdout:\n\n```\n(.*?)\n```',
                                                 pred, re.DOTALL)
                        # execute_result takes precedence over stdout.
                        # NOTE(review): the first two branches are identical;
                        # kept as-is to preserve behavior exactly.
                        if match_exec and match_stdout:
                            match = match_exec
                        elif match_exec:
                            match = match_exec
                        elif match_stdout:
                            match = match_stdout
                        else:
                            match = None
                        if match:
                            out = match.group(1)
                            score = (out.strip() == target.strip()
                                     or target.strip() in out.strip())
                            return score
                    except Exception:
                        # Any parsing failure counts as incorrect.
                        return False
        # No parsable interpreter output: incorrect.
        return False

    def text_step(self, step, target):
        """Whether the step output is correct.

        Scores the latest interpreter action's fenced output against
        *target* with the configured text evaluator, scaled to [0, 1].
        """
        for action in step[::-1]:
            if action['type'] == 'IPythonInterpreter':
                if action['result']:
                    try:
                        pred = action['result']['text']
                        match = re.search('```\n(.*?)\n```', pred, re.DOTALL)
                        if match:
                            out = match.group(1)
                            # Evaluator returns a percentage-style score.
                            score = self.text_evaluator.score([out], [target])
                            return score[self.text_eval_metric] / 100
                    except Exception:
                        return False
        # No parsable interpreter output: score 0 (False).
        return False

    @staticmethod
    def vis_similarity_step(step, target):
        """Whether the step output image has the same structure similarity with
        the given images.

        Returns the SSIM between the predicted figure and the base64-encoded
        reference image in *target*, or 0 on any failure.
        """
        # Deferred imports: only needed when visualization steps occur.
        import base64

        import skimage

        for action in step[::-1]:
            if action['type'] == 'IPythonInterpreter':
                if action['result']:
                    try:
                        pred = action['result']['text']
                        # Figures are reported as markdown image links
                        # like `![fig-0](path)` in the interpreter output.
                        match = re.search(r'!\[fig-[0-9]*\]\((.*?)\)', pred,
                                          re.DOTALL)
                        if match:
                            img_pred = match.group(1)
                            # NOTE(review): decoded bytes are handed to
                            # imread via the imageio plugin — presumably it
                            # accepts raw bytes; confirm with skimage docs.
                            img2 = base64.b64decode(target)
                            img2 = skimage.io.imread(img2, plugin='imageio')
                            img1 = skimage.io.imread(img_pred, plugin='imageio')
                            # Resize prediction to the reference's spatial
                            # shape; resize outputs floats in [0, 1].
                            img1 = skimage.transform.resize(img1, img2.shape[:2])
                            img1 = 255 * img1
                            # Back to uint8 so both images share a dtype.
                            img1 = img1.astype(np.uint8)
                            ssim = skimage.metrics.structural_similarity(
                                img1, img2, channel_axis=-1)
                            return ssim
                    except Exception:
                        return 0
        # No figure found in the step: zero similarity.
        return 0

    def save_results(self, origin_prompt, steps):
        """Save the prediction result in a markdown and notebook format.

        Writes one ``experiment{idx}.md`` per example into the current
        working directory; optionally converts each to an executed ipynb
        via jupytext + nbconvert when ``self.with_ipynb`` is set.
        """

        def check_jupytext():
            """Check requirements existence."""
            from shutil import which

            assert which('jupytext'), (
                "Please install jupytext use 'pip install jupytext' to ensure"
                'the conversion processes.')

        check_jupytext()
        p_list = []
        from opencompass.lagent.actions.ipython_interpreter import extract_code
        for idx, (example_origin_prompt,
                  example_steps) in enumerate(zip(origin_prompt, steps)):
            markdown_lines = []
            for prompt, step in zip(example_origin_prompt, example_steps):
                # Prefer the latest interpreter action of the step;
                # otherwise fall back to the final action.
                for action in step[::-1]:
                    if action['type'] == 'IPythonInterpreter':
                        valid_action = action
                        break
                    # fall back to the final action
                    valid_action = step[-1]
                markdown_lines.append(prompt)
                markdown_lines.append('\n')
                code_text = valid_action['args']['text']
                code_text = extract_code(code_text)
                code_text = '```python\n' + code_text + '\n```'
                markdown_lines.append(code_text)
                markdown_lines.append('\n')

            md_file = f'experiment{idx}.md'
            with open(md_file, 'w') as f:
                f.writelines(markdown_lines)

            # Convert markdown to an executed notebook with error
            # tolerance; conversions run concurrently and are awaited below.
            if self.with_ipynb:
                p = subprocess.Popen(
                    'jupytext --to ipynb --pipe-fmt ipynb '
                    "--pipe 'jupyter nbconvert --to ipynb --execute "
                    f"--allow-errors --stdin --stdout' {md_file}",
                    shell=True)
                p_list.append(p)
        # Wait for all conversion subprocesses to finish.
        for p in p_list:
            p.wait()

    def set_data_dir(self, work_dir):
        """Set work directory and link data files for save notebook results."""
        if self.user_data_dir:
            basename = osp.basename(self.user_data_dir)

            # Symlink user data into the output dir so generated notebooks
            # can resolve their relative data paths.
            if not osp.exists(osp.join(self.output_dir, basename)):
                os.symlink(self.user_data_dir,
                           osp.join(self.output_dir, basename))
        os.chdir(work_dir)

    def unset_data_dir(self, work_dir):
        """Change work directory and keep the symlink."""
        os.chdir(work_dir)

    def single_exp(self, gold, steps):
        """Score one experiment: returns per-metric lists of step scores.

        Args:
            gold (dict): Reference with 'tags', 'outputs' and 'metadata'.
            steps (list): Predicted interpreter steps for this experiment.
        """
        tags = gold['tags']
        outputs = gold['outputs']
        metadata = gold['metadata']
        # Explicit per-step types in metadata override the derived tags.
        hard_tags = metadata.get('step_types', [])
        if hard_tags:
            tags = hard_tags

        # Create empty result lists, keyed by the metric names from
        # TAG_MAPPING (e.g. 'executable', 'general_correct', ...).
        result = dict()
        if hard_tags:
            check_tags = ['exec', 'num', 'text', 'vis']
        else:
            check_tags = ['exec', 'general', 'vis']
        for tag in check_tags:
            key = self.TAG_MAPPING[tag][0]
            result[key] = []

        for tag, step, output in zip(tags, steps, outputs):
            # Executability is always checked, regardless of tag.
            result['executable'].append(self.valid_step(step))
            if tag != 'exec':
                key, func = self.TAG_MAPPING[tag]
                result[key].append(func(step, output))

        return result

    def get_output_dir(self):
        """Get output dir from eval task.

        Notice: output dir should be in format xxx/data.
        All the needed files should be placed under this directory.
        """
        # `_out_dir` is injected by the eval task; only use it when no
        # explicit output_dir was configured.
        if hasattr(self, '_out_dir') and self.output_dir is None:
            self.output_dir = self._out_dir

    def score(self, predictions: List, references: List, steps: List,
              origin_prompt: List):
        """Calculate accuracy.

        Aggregates per-step scores over all experiments into percentage
        metrics; a metric with no scored steps is reported as -1.
        """
        if len(steps) != len(references):
            # NOTE(review): 'refrs' typo is in the runtime message;
            # left unchanged here to preserve behavior byte-for-byte.
            return {'error': 'steps and refrs have different length'}
        cwd = os.getcwd()
        self.get_output_dir()
        if self.output_dir:
            if not osp.exists(self.output_dir):
                os.makedirs(self.output_dir)
            # Temporarily chdir into output_dir to dump markdown/ipynb
            # results, then return to the original working directory.
            self.set_data_dir(self.output_dir)
            self.save_results(origin_prompt, steps)
            self.unset_data_dir(cwd)

        total_results = defaultdict(float)
        total_scores = defaultdict(float)
        total_nums = defaultdict(int)
        for gold, single_steps in zip(references, steps):
            result = self.single_exp(gold, single_steps)

            for k, v in result.items():
                total_scores[k] += sum(v)
                total_nums[k] += len(v)

        for k, v in total_scores.items():
            if total_nums[k] > 0:
                # Report each metric as a percentage average.
                total_results[k] = total_scores[k] / total_nums[k] * 100
            else:
                # Sentinel for "metric had no applicable steps".
                total_results[k] = -1

        return total_results
|
|