import json
import multiprocessing
import os
import shutil
from ctypes import c_int, c_int64
from typing import Dict, List, Optional

from tqdm import tqdm

# Save the current working directory and a handful of builtins so that
# partial_undo_reliability_guard can restore them later.
cwd = os.getcwd()
cache_wd = cwd + "/cache"
tmp_chmod = os.chmod
tmp_fchmod = os.fchmod
tmp_chdir = os.chdir
tmp_rmdir = os.rmdir
# tmp_open = open
tmp_print = print
tmp_rm_tree = shutil.rmtree
tmp_unlink = os.unlink

def process_one_model(
    model_name: str,
    dataset_name: str,
    programs_and_test: Dict[str, Dict],
    test_set_name: str = "default",
    max_execution_time: float = 5.0,
    binary_grade: bool = False,
    fast_algo: bool = True,
) -> None:
| """Get the inferences in f"inferences output/{dataset_name}/{model_name}", calculate each entry's | |
| accuracy, then write to f"inferences output/{dataset_name}/processed/{test_set_name}/{model_name}" where: | |
| - each entry is appended with its accuracy | |
| - each entry is processed using processing_func | |
| We do this because 1) executing program can be costly, and we do not want to reexecute | |
| them each time. 2) For pairRM, we would like to present the whole program in an | |
| executable state, which might not be possible without post processing. | |
| Parameter: | |
| model_name: the model name, ex: "deepseek_coder_1.3b_greedy" | |
| dataset_name: the dataset name, ex: "mbpp" or "evol" | |
| tests: a list of a list of strings | |
| processing_func: the function which is used to process the output further before | |
| being evaluated. The default is to just return the whole string itself. | |
| max_execution_time: the maximum execution time given to each test case | |
| binary_grade: if set to True, will give a grade of 1 if all tests passed. However | |
| will give a grade of 0 if even one test failed. If set to false, the grade will | |
| be a floating number between 0 and 1. | |
| fast_algo: if set to True, then we will use a faster algorithm but less accurate (usually | |
| used for best of n). The slow version is much slower but more accurate (greedy) | |
| """ | |
    os.makedirs(
        f"acecoder output/{dataset_name}/processed/{test_set_name}/", exist_ok=True
    )
    programs = programs_and_test["program"]
    # NOTE: the "test" key is assumed to mirror the "program" key, keyed by question id.
    tests = programs_and_test["test"]
    if len(programs) == 0:
        return  # we already processed this
    output = []
    for i in tqdm(programs.keys(), leave=False):
        # run each outputted program as it is
        for program_idx, program in enumerate(programs[i]):
            question_tests = tests[i]
            if len(question_tests) == 0:
                acc = 0
                test_case_status = []
            else:
                if fast_algo:
                    test_case_status = get_successful_tests_fast(
                        program=program,
                        tests=question_tests,
                        max_execution_time=max_execution_time,
                    )
                else:
                    test_case_status = get_successful_tests_slow(
                        program=program,
                        tests=question_tests,
                        max_execution_time=max_execution_time,
                    )
                acc = sum(test_case_status) / len(test_case_status)
                if binary_grade and acc < 1:
                    acc = 0
            output.append(
                json.dumps(
                    {
                        "id": i,
                        "inference_id": program_idx,
                        "response": program,
                        "accuracy": acc,
                        "test_case_status": test_case_status,
                    }
                )
            )
    # Write one JSON record per line, to the location promised in the docstring.
    with open(
        f"acecoder output/{dataset_name}/processed/{test_set_name}/{model_name}", "w"
    ) as f:
        f.write("\n".join(output))
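

# Minimal usage sketch (hypothetical data, not from this file):
# process_one_model(
#     model_name="deepseek_coder_1.3b_greedy",
#     dataset_name="mbpp",
#     programs_and_test={
#         "program": {"0": ["a = 1"]},
#         "test": {"0": ["assert a == 1"]},
#     },
# )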

# -------------------------------------------------------------
# The fast but not accurate version
# -------------------------------------------------------------
return_var_2 = multiprocessing.Value(c_int64, 0)


def run_tests_against_program_helper_2(func: str, tests: List[str]) -> int:
    """Run all tests in one shared context and record, in ``return_var_2``, a bitmask
    whose i-th bit is set iff tests[i] passed (so at most 63 tests fit in a c_int64)."""
    execution_context = {}
    execution_context.update({"__builtins__": __builtins__})
    try:
        # try running the function declaration first
        exec(func, execution_context)
    except Exception:
        # if this fails then none of the tests can work either
        return_var_2.value = 0
        return 0
    for idx, test in enumerate(tests):
        try:
            # try the tests individually
            exec(test, execution_context)
            return_var_2.value += 2**idx
        except Exception:
            pass
    return return_var_2.value
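

# Worked example of the bitmask encoding (hypothetical values): if tests 0 and 2
# out of three pass, return_var_2.value accumulates 2**0 + 2**2 == 5, which
# get_successful_tests_fast below decodes as
#     [(5 >> 0) & 1, (5 >> 1) & 1, (5 >> 2) & 1] == [1, 0, 1]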
# Very unstable; it seems to work, but since there are no uses it will be deprecated
# for now.
def get_successful_tests_fast(
    program: str, tests: List[str], max_execution_time: float = 1.0
) -> List[int]:
    """Run a program against a list of tests; a test is considered passed if it
    executed successfully. Note that you SHOULD ONLY RUN THIS FUNCTION IN A VIRTUAL
    ENVIRONMENT as we do not guarantee the safety of the program provided.

    Parameters:
        program: a string representation of the Python program you want to run
        tests: a list of assert statements which are considered to be the test cases
        max_execution_time: the number of seconds the whole batch of tests can run
            before it is considered failed and terminated

    Return:
        a list of 0/1 indicating passed or not"""
    test_ct = len(tests)
    if test_ct == 0:
        return []
    if not should_execute(program=program, tests=tests):
        return [0] * len(tests)
    reliability_guard()
    return_var_2.value = 0
    p = multiprocessing.Process(
        target=run_tests_against_program_helper_2, args=(program, tests)
    )
    p.start()
    p.join(timeout=max_execution_time)
    if p.is_alive():
        p.kill()
    partial_undo_reliability_guard()
    num = int(return_var_2.value)
    return [(num >> i) & 1 for i in range(len(tests))]
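

# Design note: the fast version runs the whole test list in a single subprocess, so
# one hanging test consumes the entire time budget and every remaining test is
# reported as failed; the tests also share one execution_context, so state can leak
# between them. Both runners rely on the "fork" start method: under "spawn" the
# child would re-import this module and get fresh copies of return_var/return_var_2,
# so results would never propagate back to the parent.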

# -------------------------------------------------------------
# The slow but accurate version
# -------------------------------------------------------------
return_var = multiprocessing.Value(c_int, 0)


def run_single_test_against_program_helper(func: str, test: str) -> int:
    """Return 1 if the program and the single test finish running, 0 otherwise."""
    execution_context = {}
    execution_context.update({"__builtins__": __builtins__})
    try:
        exec(func, execution_context)
        exec(test, execution_context)
        return_var.value = 1
        return 1
    except Exception:
        return_var.value = 0
        return 0


def get_successful_tests_slow(
    program: str, tests: List[str], max_execution_time: float = 1.0
) -> List[int]:
    """Run a program against a list of tests; a test is considered passed if it
    executed successfully. Note that you SHOULD ONLY RUN THIS FUNCTION IN A VIRTUAL
    ENVIRONMENT as we do not guarantee the safety of the program provided.

    Parameters:
        program: a string representation of the Python program you want to run
        tests: a list of assert statements which are considered to be the test cases
        max_execution_time: the number of seconds each individual test can run before
            it is considered failed and terminated

    Return:
        a list of 0/1 indicating passed or not"""
    test_ct = len(tests)
    if test_ct == 0:
        return []
    if not should_execute(program=program, tests=tests):
        return [0] * len(tests)
    reliability_guard()
    result = []
    for test in tests:
        return_var.value = 0
        p = multiprocessing.Process(
            target=run_single_test_against_program_helper, args=(program, test)
        )
        p.start()
        p.join(timeout=max_execution_time)
        if p.is_alive():
            p.kill()
        result.append(return_var.value)
    partial_undo_reliability_guard()
    return result
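

# Design note: the slow version pays for its accuracy by spawning one process per
# test, so a program with N tests can take up to N * max_execution_time seconds in
# the worst case, but a hanging test only affects its own verdict.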

# -------------------------------------------------------------
# Utility
# -------------------------------------------------------------
def should_execute(program: str, tests: List[str]) -> bool:
    """Determine if we should try to execute this program at all for safety
    reasons."""
    dangerous_commands = [
        "threading",
        "multiprocess",
        "multiprocessing",
        "import os",
        "from os",
        "shutil",
        "import torch",
        "from torch",
        "import sklearn",
        "from sklearn",
    ]
    for comm in dangerous_commands:
        if comm in program:
            return False  # assume the program fails
    return True
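

# Example: should_execute(program="import os\nos.remove('x')", tests=[]) is False
# because "import os" is on the deny list, so the callers above grade every test
# as failed (a list of zeros) without executing anything.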

# -------------------------------------------------------------
# For safety handling
# -------------------------------------------------------------
def partial_undo_reliability_guard():
    """Undo the chmod, fchmod, chdir, unlink, rmdir, rmtree and print patches, and
    restore the original working directory."""
    import builtins

    os.chmod = tmp_chmod
    os.fchmod = tmp_fchmod
    os.chdir = tmp_chdir
    os.unlink = tmp_unlink
    os.rmdir = tmp_rmdir
    # builtins.open = tmp_open
    builtins.print = tmp_print
    # restore the working directory
    os.chdir(cwd)
    # shutil.rmtree(cache_wd)
    shutil.rmtree = tmp_rm_tree


def reliability_guard(maximum_memory_bytes: Optional[int] = None):
    """
    This function is copied from https://github.com/openai/human-eval/blob/master/human_eval/execution.py.
    It disables various destructive functions and prevents the generated code
    from interfering with the test (e.g. fork bomb, killing other processes,
    removing filesystem files, etc.)

    WARNING
    This function is NOT a security sandbox. Untrusted code, including
    model-generated code, should not be blindly executed outside of one. See the
    Codex paper for more information about OpenAI's code sandbox, and proceed
    with caution.
    """
    import faulthandler
    import platform

    if maximum_memory_bytes is not None:
        import resource

        resource.setrlimit(
            resource.RLIMIT_AS, (maximum_memory_bytes, maximum_memory_bytes)
        )
        resource.setrlimit(
            resource.RLIMIT_DATA, (maximum_memory_bytes, maximum_memory_bytes)
        )
        if not platform.uname().system == "Darwin":
            resource.setrlimit(
                resource.RLIMIT_STACK, (maximum_memory_bytes, maximum_memory_bytes)
            )

    faulthandler.disable()

    import builtins

    builtins.exit = None
    builtins.quit = None
    # builtins.open = None
    builtins.print = lambda *args, **kwargs: None
    import os

    # Work inside a scratch cache directory; partial_undo_reliability_guard
    # restores the original working directory afterwards.
    os.makedirs(cache_wd, exist_ok=True)
    os.chdir(cache_wd)
    # os.environ["OMP_NUM_THREADS"] = "1"
    # os.kill = None
    os.system = None
    os.putenv = None
    os.remove = None
    os.removedirs = None
    os.rmdir = None
    os.fchdir = None
    os.setuid = None
    # os.fork = None
    os.forkpty = None
    os.killpg = None
    os.rename = None
    os.renames = None
    os.truncate = None
    os.replace = None
    os.unlink = None
    os.fchmod = None
    os.fchown = None
    os.chmod = None
    os.chown = None
    os.chroot = None
    os.lchflags = None
    os.lchmod = None
    os.lchown = None
    os.getcwd = None
    os.chdir = None

    import shutil

    shutil.rmtree = None
    shutil.move = None
    shutil.chown = None

    import subprocess

    subprocess.Popen = None  # type: ignore

    # __builtins__['help'] = None

    import sys

    sys.modules["ipdb"] = None
    sys.modules["joblib"] = None
    sys.modules["resource"] = None
    sys.modules["psutil"] = None
    sys.modules["tkinter"] = None
| if __name__ == "__main__": | |
| # for testing purpose | |
| program = "a = 1" | |
| bad_test = "assert False" | |
| good_test = "assert count_good_splits('aabbbb') == 3" | |
| time_out_test = f""" | |
| def count_excellent_splits(s): | |
| # Helper function to check if a substring can be split into AABB | |
| def is_excellent_split(sub): | |
| if len(sub) % 2 != 0: | |
| return 0 # Only even length substrings can be split into AABB | |
| mid = len(sub) // 2 | |
| if sub[:mid] == sub[mid:]: | |
| return 1 # First half equals second half => valid AABB | |
| return 0 | |
| count = 0 | |
| # Iterate over all substrings | |
| for i in range(len(s)): | |
| for j in range(i + 1, len(s) + 1): | |
| count += is_excellent_split(s[i:j]) | |
| return count | |
| # Test the function with an example | |
| S = "abacaba" | |
| print(count_excellent_splits(S)) | |
| """ | |
| tests = [ | |
| # bad_test, | |
| # good_test, | |
| # bad_test, | |
| # good_test, | |
| # good_test, | |
| # time_out_test, | |
| good_test | |
| ] | |
| test_case_status = get_successful_tests_slow( | |
| program=program, | |
| tests=tests, | |
| ) | |
| print(test_case_status) | |
| # res = run_tests_against_program_helper_2(program, tests) | |
| test_case_status = get_successful_tests_fast( | |
| program=program, | |
| tests=[ | |
| bad_test, | |
| bad_test, | |
| time_out_test, | |
| time_out_test, | |
| time_out_test, | |
| time_out_test, | |
| ], | |
| ) | |
| print(test_case_status) | |