| import argparse |
| import json |
| import os |
| import pickle |
| from concurrent.futures import ProcessPoolExecutor, as_completed |
| from copy import deepcopy |
| from typing import Any, Dict, List, Optional, Tuple |
|
|
| from rich.progress import track |
|
|
| from evalplus.data import write_jsonl |
| from tools.tsr.coverage_init import collect_coverage_info |
| from tools.tsr.mutation_init import collect_mutation_info |
| from tools.tsr.sample_init import collect_sample_info |
| from tools.tsr.utils import get_problems, get_task_ids, to_path |
|
|
|
|
def global_util_init(dataset: str):
    """Populate the module-level dataset metadata used throughout this file.

    Sets the globals ``problems`` (task_id -> problem dict), ``task_ids``
    (ordered task ids) and ``problem_count`` for the chosen dataset.
    """
    global problems, task_ids, problem_count
    problems = get_problems(dataset)
    task_ids = get_task_ids(dataset)
    problem_count = len(problems)
|
|
|
|
| |
| |
| |
|
|
|
|
def merge_set_cover(*args) -> Dict[str, List[str]]:
    """Union several per-task set-cover mappings into one.

    Each argument maps task_id -> list of plus-test names. The merged
    result keeps first-seen order and drops duplicates.

    Returns a dict covering every id in the module-global ``task_ids``.
    """
    merged_set_cover: Dict[str, List[str]] = {task_id: [] for task_id in task_ids}
    # Track membership in a set per task: the original list-membership test
    # made the merge quadratic in the number of plus tests.
    seen: Dict[str, set] = {task_id: set() for task_id in task_ids}
    for set_cover_dict in args:
        for task_id, plus_tests in set_cover_dict.items():
            for plus_test in plus_tests:
                if plus_test not in seen[task_id]:
                    seen[task_id].add(plus_test)
                    merged_set_cover[task_id].append(plus_test)
    return merged_set_cover
|
|
|
|
def greedy_cover(
    task_id: str, tests: Dict[str, List[Any]], exclude_model: str
) -> Tuple[str, List[str]]:
    """Greedily pick a minimal set of plus tests covering all solutions.

    ``tests`` maps a test name to the (model_path, i_code) pairs it
    distinguishes; pairs whose model path contains ``exclude_model`` are
    ignored. Repeatedly selects the test covering the most still-uncovered
    pairs (first-listed test wins ties) until every pair is covered.

    Returns ``(task_id, selected_test_names)``.
    """
    candidates = []
    uncovered = set()
    for test_name, test_cover in tests.items():
        covered = {
            (model_path, i_code)
            for model_path, i_code in test_cover
            if exclude_model not in model_path
        }
        candidates.append((test_name, covered))
        uncovered |= covered

    chosen: List[str] = []
    while uncovered:
        # max() returns the first maximal entry, matching the original
        # strict-greater tie-break on insertion order.
        best_name, best_cover = max(candidates, key=lambda entry: len(entry[1]))
        chosen.append(best_name)
        uncovered -= best_cover
        # Re-intersect the remaining candidates with what is still uncovered,
        # dropping any that no longer contribute.
        pruned = []
        for test_name, covered in candidates:
            still_useful = uncovered & covered
            if still_useful:
                pruned.append((test_name, still_useful))
        candidates = pruned
    return task_id, chosen
|
|
|
|
def parallel_greedy_cover(
    info_dict: Optional[Dict[str, Dict[str, List[Any]]]],
    exclude_model: str,
    type: str,
    **kwargs,
) -> Dict[str, List[str]]:
    """Run :func:`greedy_cover` for every task id across a process pool.

    For ``type == "sample"`` the per-task info is loaded from per-task
    pickle files under ``kwargs["sample_dir"]``; otherwise it comes from
    ``info_dict``. Returns task_id -> selected plus-test names.
    """
    plus_tests: Dict[str, List[str]] = {task_id: [] for task_id in task_ids}

    with ProcessPoolExecutor(max_workers=32) as executor:
        futures = []
        for task_id in task_ids:
            if type == "sample":
                # Sample info lives on disk, one pickle per task.
                pkl_path = os.path.join(
                    kwargs["sample_dir"], f"{to_path(task_id)}.pkl"
                )
                with open(pkl_path, "rb") as f:
                    task_info = pickle.load(f)
            else:
                task_info = info_dict[task_id]
            futures.append(
                executor.submit(greedy_cover, task_id, task_info, exclude_model)
            )
        for future in track(as_completed(futures), f"min set cover :: {type}"):
            done_task_id, min_cover = future.result()
            plus_tests[done_task_id] = min_cover

    return plus_tests
|
|
|
|
| |
| |
| |
|
|
|
|
def get_coverage_set_cover(
    coverage_dir: str, exclude_model: str, dataset: str
) -> Dict[str, List[str]]:
    """Minimal plus-test sets justified by the coverage cache."""
    info = collect_coverage_info(coverage_dir, dataset)
    return parallel_greedy_cover(info, exclude_model, "coverage")
|
|
|
|
def get_mutation_set_cover(
    mutation_dir: str, exclude_model: str, dataset: str
) -> Dict[str, List[str]]:
    """Minimal plus-test sets justified by mutation-testing results."""
    eval_results_path = os.path.join(mutation_dir, "eval_results.json")
    info = collect_mutation_info(eval_results_path, dataset)
    return parallel_greedy_cover(info, exclude_model, "mutation")
|
|
|
|
def get_sample_set_cover(
    sample_dir: str, sample_eval_dir: str, exclude_model: str, dataset: str
) -> Dict[str, List[str]]:
    """Minimal plus-test sets justified by model-sample evaluation results.

    ``collect_sample_info`` materializes per-task pickles in ``sample_dir``,
    which the parallel pass then reads (hence ``info_dict=None`` here).
    """
    collect_sample_info(sample_dir, sample_eval_dir, dataset)
    return parallel_greedy_cover(None, exclude_model, "sample", sample_dir=sample_dir)
|
|
|
|
| |
| |
| |
|
|
|
|
def compute_avg_test(set_cover_info: Dict[str, List[str]]) -> float:
    """Average number of tests per problem: base tests plus retained plus tests."""
    total_tests = 0
    for task_id in task_ids:
        total_tests += len(problems[task_id]["base_input"])
        total_tests += len(set_cover_info[task_id])
    return total_tests / problem_count
|
|
|
|
def gen_report(set_cover_info: Dict[str, List[str]], sample_eval_dir: str, model: str):
    """Build a ``{"ntests", "pass@1"}`` report for *model* on the reduced suite.

    A problem counts as passed when the model's temperature-0.0 sample passes
    all base tests and every plus test retained by ``set_cover_info``.
    Reads ``<sample_eval_dir>/<model>_temp_0.0/eval_results.json``.
    """
    tsr_dict = {"ntests": compute_avg_test(set_cover_info), "pass@1": 0}
    eval_path = os.path.join(sample_eval_dir, f"{model}_temp_0.0", "eval_results.json")
    with open(eval_path, "r") as f:
        mdict = json.load(f)
    correct_cnt = 0
    for task_id in task_ids:
        legacy_task_id = task_id
        # Older eval result files key tasks with "_" instead of "/".
        if legacy_task_id not in mdict["eval"]:
            legacy_task_id = legacy_task_id.replace("/", "_")
        result = mdict["eval"][legacy_task_id]
        if result["base"][0][0] != "success":
            continue
        plus_status = result["plus"][0][1]
        # Plus test names end in "_<index>" into the per-test status list.
        # (Was `== False`; statuses are JSON booleans, so truthiness suffices.)
        if all(
            plus_status[int(plus_id.split("_")[-1])]
            for plus_id in set_cover_info[task_id]
        ):
            correct_cnt += 1
    tsr_dict["pass@1"] = correct_cnt / problem_count
    return tsr_dict
|
|
|
|
def dump_humaneval_plus_mini(set_cover_info: Dict[str, List[str]], mini_path: str):
    """Write the reduced dataset to ``<mini_path>/HumanEvalPlus-Mini.jsonl``.

    Each problem keeps its base inputs but only the plus inputs selected by
    the set cover (test names end in "_<index>" into ``plus_input``).
    """
    new_problems = []
    for task_id in task_ids:
        source = problems[task_id]
        kept_plus_inputs = [
            source["plus_input"][int(plus_test.split("_")[-1])]
            for plus_test in set_cover_info[task_id]
        ]
        task = {
            "task_id": task_id,
            "prompt": source["prompt"],
            "contract": source["contract"],
            "canonical_solution": source["canonical_solution"],
            "entry_point": source["entry_point"],
            "base_input": source["base_input"],
            "plus_input": kept_plus_inputs,
            "atol": source["atol"],
        }
        # Deep-copy so the dumped records share nothing with `problems`.
        new_problems.append(deepcopy(task))
    write_jsonl(os.path.join(mini_path, "HumanEvalPlus-Mini.jsonl"), new_problems)
|
|
|
|
def main(flags):
    """Compute set covers from all three signal sources and emit the output.

    With a concrete ``--model``, writes a per-source + merged pass@1 report;
    with ``--model ALL``, dumps the merged mini dataset instead.
    """
    coverage_dir = os.path.join(flags.report_dir, "coverage_cache")
    mutation_dir = os.path.join(flags.report_dir, "mutation_cache")
    sample_dir = os.path.join(flags.report_dir, "sample_cache")
    os.makedirs(flags.report_dir, exist_ok=True)

    exclude_model: str = flags.model
    # Strip a trailing size suffix (e.g. "-6b") so every size variant of the
    # model family is excluded from cover computation.
    # NOTE(review): "".join also drops interior "-" separators for multi-part
    # names ("code-gen-2b" -> "codegen") — confirm vs. "-".join.
    if exclude_model.endswith("b"):
        exclude_model = "".join(exclude_model.split("-")[:-1])

    covers = {
        "coverage": get_coverage_set_cover(coverage_dir, exclude_model, flags.dataset),
        "mutation": get_mutation_set_cover(mutation_dir, exclude_model, flags.dataset),
        "sample": get_sample_set_cover(
            sample_dir, flags.sample_eval_dir, exclude_model, flags.dataset
        ),
    }
    covers["full"] = merge_set_cover(
        covers["coverage"], covers["mutation"], covers["sample"]
    )

    if flags.model == "ALL":
        dump_humaneval_plus_mini(covers["full"], flags.mini_path)
    else:
        final_report = {
            source: gen_report(cover, flags.sample_eval_dir, flags.model)
            for source, cover in covers.items()
        }
        report_path = os.path.join(flags.report_dir, f"report_{flags.model}.json")
        with open(report_path, "w") as f:
            json.dump(final_report, f, indent=4)
|
|
|
|
if __name__ == "__main__":
    # CLI entry point: parse flags, initialize dataset globals, then run.
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("--model", required=True, type=str, help="Model for testing")
    arg_parser.add_argument("--dataset", type=str, choices=["humaneval", "mbpp"])
    arg_parser.add_argument(
        "--report_dir", type=str, help="Path to JSON report and cache files"
    )
    arg_parser.add_argument(
        "--sample_eval_dir", type=str, help="Path to sample evaluation files"
    )
    arg_parser.add_argument("--mini_path", type=str, help="Path to Mini Dataset")
    flags = arg_parser.parse_args()

    global_util_init(flags.dataset)
    main(flags)
|
|