Spaces:
Runtime error
Runtime error
| import dataclasses | |
| import itertools | |
| import os | |
| import re | |
| import tempfile | |
| from collections import defaultdict, Counter | |
| from pathlib import Path | |
| import datasets | |
| import evaluate | |
| import numpy as np | |
| from tqdm import tqdm | |
| from .execution import execute_predictions | |
# Matches one report line of the form "TEST-<test id>...<outcome>" in captured
# stdout. MULTILINE makes ^ and $ anchor at every line, so findall() yields one
# (test id, outcome) pair per reporting line.
STDOUT_PARSE_REGEX = re.compile(r"^TEST-(.+)\.\.\.(.+)$", re.MULTILINE)
# BibTeX entry for the BabelCode paper; passed to MetricInfo as `citation`.
_CITATION = """\
@article{orlanski2023measuring,
  title={Measuring The Impact Of Programming Language Distribution},
  author={Orlanski, Gabriel and Xiao, Kefan and Garcia, Xavier and Hui, Jeffrey and Howland, Joshua and Malmaud, Jonathan and Austin, Jacob and Singh, Rishabh and Catasta, Michele},
  journal={arXiv preprint arXiv:2302.01973},
  year={2023}
}
"""
# One-paragraph summary of the metric; passed to MetricInfo as `description`.
_DESCRIPTION = (
    "This metric implements the evaluation harness for datasets translated "
    "with the BabelCode framework as described in the paper "
    '"Measuring The Impact Of Programming Language Distribution" '
    "(https://arxiv.org/abs/2302.01973).\n"
)
# Usage text for the metric; passed to MetricInfo as `inputs_description`.
# Backslashes in the example are doubled so the rendered text shows "\n\t"
# literally instead of breaking the example across lines.
_KWARGS_DESCRIPTION = """
Calculates how many predictions per question pass a set of tests for the given problem.

Args:
    predictions: The list of predictions for each question to execute.
    languages: The language to use for each question.
    question_dicts: The information for each question.
    k: number of code candidates to consider in the evaluation (Default: [1, 10, 100])
    num_workers: number of workers used to evaluate the candidate programs (Default: 4).
    language_timeout: Timeouts to use for each language. If it is not set, will default to the one in the question dict (Default: None).
Returns:
    pass_at_k: dict of metrics keyed by "<language>/<metric>", e.g. "Python/pass@1"
    results: list with the granular results of each executed prediction
Examples:
    >>> bc_eval = evaluate.load("bc_eval")
    >>> predictions = [["def add(a,b):\\n\\treturn a+b", "def add(a,b):\\n\\treturn a-b"]]
    >>> languages = ["Python"]
    >>> question_dicts = [{"test_code": "...", "entry_fn_name": "add", "entry_cls_name": "Solution", "test_case_ids": ["0", "1"], "test_list": "...", "extension": "py", "commands": [["python", "__FILENAME__"]], "timeouts": [10]}]
    >>> pass_at_k, results = bc_eval.compute(predictions=predictions, languages=languages, question_dicts=question_dicts, k=[1, 2])
    >>> print(pass_at_k["Python/pass@1"], pass_at_k["Python/pass@2"])
    0.5 1.0
"""
# Message of the ValueError raised by BabelCodeEval._compute when the
# HF_ALLOW_CODE_EVAL opt-in is missing. The trailing backslash keeps the
# string from ending in a newline.
_WARNING = """
################################################################################
!!!WARNING!!!
################################################################################
The "bc_eval" metric executes untrusted model-generated code in Python.
Although it is highly unlikely that model-generated code will do something
overtly malicious in response to this test suite, model-generated code may act
destructively due to a lack of model capability or alignment.
Users are strongly encouraged to sandbox this evaluation suite so that it
does not perform destructive actions on their host or network. For more
information on how OpenAI sandboxes its code, see the paper "Evaluating Large
Language Models Trained on Code" (https://arxiv.org/abs/2107.03374).
Once you have read this disclaimer and taken appropriate precautions,
set the environment variable HF_ALLOW_CODE_EVAL="1". Within Python you can do this
with:
import os
os.environ["HF_ALLOW_CODE_EVAL"] = "1"
################################################################################\
"""
# Question-dict fields from which BabelCodeEval._info builds the input
# features; each is typed as a string there unless listed as a sequence field.
_QUESTION_INFO_KEYS = {
    "entry_cls_name",
    "entry_fn_name",
    "extension",
    "test_case_ids",
    "test_code",
    "test_list",
}
def make_file_and_command(qid, idx, pred, question, working_dir, timeout_override=None):
    """Write one prediction to disk and build the commands that run it.

    The prediction is spliced into the question's test-code template and saved
    as ``pred.<extension>`` in a new ``<qid>_<idx>`` directory under
    ``working_dir``.

    Args:
        qid: Question identifier; used in the directory name and echoed back.
        idx: Prediction identifier; used in the directory name and echoed back.
        pred: Predicted source code substituted for ``PLACEHOLDER_CODE_BODY``.
        question: Question dict. Reads ``extension``, ``test_code``,
            ``entry_fn_name``, ``entry_cls_name``, ``commands`` and
            ``timeouts``.
        working_dir: ``pathlib.Path`` under which the prediction directory is
            created.
        timeout_override: When not ``None``, replaces every per-command
            timeout taken from ``question["timeouts"]``.

    Returns:
        A dict with ``qid``, ``idx``, ``commands`` (a list of
        ``{"timeout": ..., "command": [...]}`` dicts) and ``cwd`` (the
        prediction directory).

    Raises:
        FileExistsError: If the prediction directory already exists.
    """
    file_name = f"pred.{question['extension']}"

    # Build the full source first so a malformed question dict fails before
    # anything is created on disk. The body is substituted first, so
    # placeholder names appearing inside the prediction are replaced as well.
    code = question["test_code"].replace("PLACEHOLDER_CODE_BODY", pred)
    code = code.replace("PLACEHOLDER_FN_NAME", question["entry_fn_name"])
    code = code.replace("PLACEHOLDER_CLS_NAME", question["entry_cls_name"])

    pred_dir = working_dir.joinpath(f"{qid}_{idx}")
    # No exist_ok: two predictions must never share a directory.
    pred_dir.mkdir(parents=True)
    pred_dir.joinpath(file_name).write_text(code, encoding="utf-8")

    # Commands reference the bare file name because they are run with
    # `cwd` set to the prediction directory.
    commands = [
        {
            "timeout": t if timeout_override is None else timeout_override,
            "command": [file_name if c == "__FILENAME__" else c for c in cmd],
        }
        for cmd, t in zip(question["commands"], question["timeouts"])
    ]
    return {"qid": qid, "idx": idx, "commands": commands, "cwd": pred_dir}
def _write_preds(
    preds,
    languages,
    language_timeout,
    question_dicts,
    tmp_dir,
):
    """Write every prediction to disk and collect the commands to execute.

    Args:
        preds: One list of predicted programs per question.
        languages: The language of each question, parallel to ``preds``.
        language_timeout: Dict mapping a language to a timeout that overrides
            the question's own timeouts; languages without an entry keep the
            per-question values.
        question_dicts: The question dict for each entry of ``preds``.
        tmp_dir: ``pathlib.Path`` under which prediction directories are made.

    Returns:
        A tuple ``(question_id_to_dict, commands)``. The first maps each
        question's positional id to a copy of its dict with a ``"language"``
        key added; the second holds one ``make_file_and_command`` result per
        prediction.
    """
    commands = []
    question_id_to_dict = {}
    setup_iter = tqdm(
        zip(preds, languages, question_dicts), desc="Setup", total=len(preds)
    )
    for qid, (pred_list, lang, q_dict) in enumerate(setup_iter):
        # Work on a shallow copy so the caller's question dict is not mutated
        # by the added "language" key.
        question = dict(q_dict)
        question["language"] = lang
        question_id_to_dict[qid] = question

        timeout_override = language_timeout.get(lang)
        for p_idx, pred in enumerate(pred_list):
            commands.append(
                make_file_and_command(
                    qid=qid,
                    idx=str(p_idx),
                    pred=pred,
                    question=question,
                    timeout_override=timeout_override,
                    working_dir=tmp_dir,
                )
            )

    return question_id_to_dict, commands
class BabelCodeEval(evaluate.Metric):
    """Metric that executes predictions against BabelCode tests and reports pass rates."""

    def _info(self):
        """Return the metric's description, citation and input feature schema."""
        list_keys = ["timeouts", "commands", "test_case_ids"]
        # Every question field is a plain string unless it is one of the
        # sequence-valued fields, which are typed explicitly below.
        question_info_type = {
            k: datasets.Value(dtype="string")
            for k in _QUESTION_INFO_KEYS
            if k not in list_keys
        }
        question_info_type["test_case_ids"] = datasets.Sequence(datasets.Value("string"))
        question_info_type["commands"] = datasets.Sequence(
            datasets.Sequence(datasets.Value("string"))
        )
        question_info_type["timeouts"] = datasets.Sequence(datasets.Value("int32"))
        return evaluate.MetricInfo(
            # This is the description that will appear on the metrics page.
            description=_DESCRIPTION,
            citation=_CITATION,
            inputs_description=_KWARGS_DESCRIPTION,
            # This defines the format of each prediction and reference
            features=datasets.Features(
                {
                    "predictions": datasets.Sequence(datasets.Value("string")),
                    "languages": datasets.Value("string"),
                    "question_dicts": question_info_type,
                }
            ),
            homepage="https://github.com/google-research/babelcode",
            codebase_urls=["https://github.com/google-research/babelcode"],
            reference_urls=["https://github.com/google-research/babelcode"],
        )

    def _compute(
        self,
        predictions,
        languages,
        question_dicts,
        k=(1, 10, 100),
        num_workers=4,
        language_timeout=None,
        max_task_per_child=10,
        gc_freq=500,
    ):
        """Execute every prediction and aggregate the results per language.

        Args:
            predictions: One list of predicted programs per question.
            languages: The language of each question.
            question_dicts: The question information for each question.
            k: The ``k`` values for which pass@k is reported.
            num_workers: Forwarded to ``execute_predictions``.
            language_timeout: Optional dict of per-language timeout overrides;
                ``None`` means no overrides.
            max_task_per_child: Forwarded to ``execute_predictions``.
            gc_freq: Forwarded to ``execute_predictions`` as
                ``garbage_collection_freq``.

        Returns:
            A tuple ``(metrics, all_results)``: the per-language metrics dict
            and the list of per-prediction results.

        Raises:
            ValueError: If the environment variable ``HF_ALLOW_CODE_EVAL`` is
                not set to ``"1"``.
        """
        # Running model-generated code requires an explicit opt-in.
        if os.getenv("HF_ALLOW_CODE_EVAL") != "1":
            raise ValueError(_WARNING)

        language_timeout = language_timeout or {}

        # The prediction files only need to exist while the commands run.
        with tempfile.TemporaryDirectory() as tmp_dir:
            working_dir = Path(tmp_dir)
            question_map, pred_commands = _write_preds(
                preds=predictions,
                languages=languages,
                language_timeout=language_timeout,
                question_dicts=question_dicts,
                tmp_dir=working_dir,
            )
            results = execute_predictions(
                pred_commands,
                num_workers=num_workers,
                max_task_per_child=max_task_per_child,
                garbage_collection_freq=gc_freq,
            )

        all_results, q_passes, q_pct, o_count = _eval_predictions(results, question_map)

        # Both mappings are keyed by language and filled in lockstep.
        assert len(q_passes) == len(q_pct)
        metrics = {}
        for lang in q_passes:
            metrics.update(
                _calculate_metrics(lang, q_passes[lang], q_pct[lang], o_count[lang], k_vals=k)
            )
        return metrics, all_results
def _eval_single_pred(result, test_ids, num_expected_commands):
    """Classify one executed prediction from the output of its last command.

    Args:
        result: Execution record whose ``"results"`` entry is the list of
            per-command results; each exposes ``timed_out``, ``return_code``
            and ``stdout``.
        test_ids: IDs of the test cases expected to be reported on stdout.
        num_expected_commands: Number of commands that should have run.

    Returns:
        A tuple ``(outcome, num_passed, test_case_results)``. ``outcome`` is
        one of ``"PASSED"``, ``"FAILED"``, ``"HAD_RUNTIME_ERROR"``,
        ``"HAD_ERROR"``, ``"TIMED_OUT"`` or ``"UNKNOWN_ERROR"``;
        ``test_case_results`` maps each test id to its reported outcome, or
        ``"MISSING"`` when none was reported.
    """
    test_case_results = {test_id: "MISSING" for test_id in test_ids}

    command_results = result["results"]
    # An empty result list has no last command to inspect, even when zero
    # commands were expected; treat it like any other incomplete run.
    if not command_results or len(command_results) != num_expected_commands:
        return "HAD_ERROR", 0, test_case_results

    last_result = command_results[-1]
    if last_result.timed_out:
        return "TIMED_OUT", 0, test_case_results
    if last_result.return_code != 0 or not last_result.stdout:
        return "HAD_ERROR", 0, test_case_results

    for idx, test_result in STDOUT_PARSE_REGEX.findall(last_result.stdout):
        # Ignore report lines for ids that are not part of this question.
        if idx not in test_case_results:
            continue
        # The same test reported twice means the output cannot be trusted.
        if test_case_results[idx] != "MISSING":
            return "UNKNOWN_ERROR", 0, test_case_results
        test_case_results[idx] = test_result.strip()

    outcomes = list(test_case_results.values())
    num_passed = outcomes.count("PASSED")
    # Anything other than PASSED/FAILED (including MISSING) counts as a
    # runtime error and takes precedence over ordinary test failures.
    if any(o not in ("PASSED", "FAILED") for o in outcomes):
        return "HAD_RUNTIME_ERROR", num_passed, test_case_results
    if "FAILED" in outcomes:
        return "FAILED", num_passed, test_case_results
    return "PASSED", num_passed, test_case_results
def _eval_predictions(pred_results, question_map):
    """Score every executed prediction and group the scores by language.

    Each record in ``pred_results`` is updated in place: its ``"results"``
    entries are converted to plain dicts and ``"test_cases"`` and
    ``"outcome"`` are added.

    Args:
        pred_results: Iterable of execution records, each with ``"qid"`` and
            ``"results"``.
        question_map: Maps a question id to its question dict; reads
            ``test_case_ids``, ``commands`` and ``language``.

    Returns:
        A tuple ``(records, passed, pct_passed, outcome_counts)``: the updated
        records, then ``language -> qid -> list`` of all-tests-passed flags,
        the same shape holding the fraction of tests passed, and
        ``language -> Counter`` of outcomes.
    """

    def _per_question_lists():
        # language -> question id -> one entry per prediction
        return defaultdict(lambda: defaultdict(list))

    passed_by_lang = _per_question_lists()
    pct_by_lang = _per_question_lists()
    outcome_counts = defaultdict(Counter)
    scored = []

    for record in pred_results:
        qid = record["qid"]
        question = question_map[qid]
        # Must run before the results are converted to dicts below, since the
        # scorer reads them as objects.
        outcome, num_passed, test_case_results = _eval_single_pred(
            record,
            test_ids=question["test_case_ids"],
            num_expected_commands=len(question["commands"]),
        )

        record["results"] = [dataclasses.asdict(r) for r in record["results"]]
        record["test_cases"] = test_case_results
        record["outcome"] = outcome

        language = question["language"]
        num_tests = len(test_case_results)
        passed_by_lang[language][qid].append(num_passed == num_tests)
        pct_by_lang[language][qid].append(num_passed / num_tests)
        outcome_counts[language][outcome] += 1
        scored.append(record)

    return scored, passed_by_lang, pct_by_lang, outcome_counts
def _calculate_metrics(lang, q_passed, q_pcts, o_count, k_vals):
    """Aggregate the per-question results of one language into metrics.

    Args:
        lang: Language name; used as the prefix of every metric key.
        q_passed: Maps question id to a list of booleans, one per prediction,
            telling whether that prediction passed every test.
        q_pcts: Maps question id to a list with the fraction of tests each
            prediction passed.
        o_count: Maps outcome name to the number of predictions with it.
        k_vals: The ``k`` values for which pass@k is wanted.

    Returns:
        Dict with ``"<lang>/pass@<k>"`` for every ``k`` that every question
        has at least ``k`` predictions for, ``"<lang>/mean_pct_pass"``,
        ``"<lang>/median_pct_pass"`` and ``"<lang>/pct_<outcome>"`` (the
        fraction of all predictions that ended with that outcome).
    """
    assert len(q_passed) == len(q_pcts)
    num_questions = len(q_passed)
    num_samples = np.zeros(num_questions)
    num_correct = np.zeros(num_questions)
    pcts_passed = np.zeros(num_questions)
    for i, (qid, passed_flags) in enumerate(q_passed.items()):
        num_samples[i] = len(passed_flags)
        num_correct[i] = sum(passed_flags)
        pcts_passed[i] = np.mean(q_pcts[qid])

    # pass@k is only defined when every question has at least k samples; with
    # fewer, the estimator returns 1.0 regardless of correctness, so such k
    # values are skipped rather than reported as a perfect score.
    out = {
        f"{lang}/pass@{k}": estimate_pass_at_k(num_samples, num_correct, k).mean()
        for k in k_vals
        if (num_samples >= k).all()
    }
    out[f"{lang}/mean_pct_pass"] = np.mean(pcts_passed)
    out[f"{lang}/median_pct_pass"] = np.median(pcts_passed)

    # Outcomes are counted once per prediction, so they are normalised by the
    # number of predictions (not questions) to stay within [0, 1].
    total_predictions = int(num_samples.sum())
    for outcome, val in o_count.items():
        out[f"{lang}/pct_{outcome}"] = val / total_predictions
    return out
def estimate_pass_at_k(num_samples, num_correct, k):
    """Estimate pass@k for each problem.

    Args:
        num_samples: Either a single ``int`` applied to every problem, or a
            sequence with one sample count per problem.
        num_correct: Sequence with the number of correct samples per problem.
        k: The ``k`` in pass@k.

    Returns:
        A NumPy array with one pass@k estimate per problem.
    """

    def _single_problem(total: int, correct: int, budget: int) -> float:
        """Return 1 - comb(total - correct, budget) / comb(total, budget)."""
        if total - correct >= budget:
            # Product form of the binomial ratio.
            return 1.0 - np.prod(
                1.0 - budget / np.arange(total - correct + 1, total + 1)
            )
        # Fewer than `budget` incorrect samples exist, so any draw of that
        # size contains a correct one.
        return 1.0

    if not isinstance(num_samples, int):
        assert len(num_samples) == len(num_correct)
        sample_counts = iter(num_samples)
    else:
        sample_counts = itertools.repeat(num_samples, len(num_correct))

    estimates = []
    for n, c in zip(sample_counts, num_correct):
        estimates.append(_single_problem(int(n), int(c), k))
    return np.array(estimates)