"""Code generation benchmark evaluation utilities.

This module provides evaluation functions for coding benchmarks including:

- LiveCodeBench (v5 and v6)

It includes code execution, test case verification, and correctness checking.
"""

import argparse
import copy
import glob
import json
import multiprocessing
import os
import re
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Optional

import numpy as np
from tqdm import tqdm

from tools.code_verifier_utils import run_test


def check_coding_correctness(problem_to_check: Optional[dict], timeout, debug=False):
    """Check correctness of code generation with a global timeout.

    The global timeout is to catch some extreme/rare cases not handled by the timeouts
    inside run_test.

    Args:
        problem_to_check: Dictionary containing code and test cases
        timeout: Timeout in seconds for each test case
        debug: Whether to enable debug mode (default: False)

    Returns:
        bool: True if all test cases pass, False otherwise
    """
    def _temp_run(problem_to_check, debug, result, metadata_list, timeout):
        try:
            res, metadata = run_test(problem_to_check, debug=debug, timeout=timeout)
            result.append(res)
            metadata_list.append(metadata)
        except Exception as e:
            # Mark every test case as failed if execution itself raises.
            result.append([-1 for _ in range(len(problem_to_check['input_output']))])
            metadata_list.append(e)

    manager = multiprocessing.Manager()
    result = manager.list()
    metadata_list = manager.list()

    # Global timeout: per-test timeout (plus slack) for every test case, plus a buffer.
    total_timeout = (timeout + 1) * len(problem_to_check['input_output']) + 10
    p = multiprocessing.Process(target=_temp_run, args=(problem_to_check, debug, result, metadata_list, timeout))
    p.start()
    p.join(timeout=total_timeout + 1)
    if p.is_alive():
        p.kill()

    judge_value = bool(result and np.all(np.array(result[0]) > 0))
    return judge_value
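

# Illustrative only: a minimal record in the shape check_coding_correctness()
# expects, assuming the stdin/stdout test-case format consumed by run_test (the
# exact schema is defined in tools.code_verifier_utils, not here; all values
# below are made up):
#
#     problem = {
#         'question_id': 'example_0',
#         'generation': 'print(int(input()) * 2)',
#         'starter_code': '',
#         'input_output': [{'input': '3\n', 'output': '6\n'}],
#     }
#     check_coding_correctness(problem, timeout=6)  # -> True if all tests pass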


def update_results(result, timeout=6):
    """Update results with correctness checking.

    Args:
        result: Dictionary containing generated code and test cases
        timeout: Timeout in seconds for code execution (default: 6)

    Returns:
        dict: Response entry with correctness status and reason
    """
    response_entry = {
        "content": result['generation'],
        "correctness": None,
        "reason": None,
    }

    problem_to_check = copy.deepcopy(result)
    curr_res = check_coding_correctness(problem_to_check, timeout=timeout)

    if curr_res:
        response_entry["correctness"] = True
        response_entry["reason"] = ""
    else:
        response_entry["correctness"] = False
        response_entry["reason"] = "Code is incorrect."

    return response_entry
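

# Illustrative only: update_results() returns a verdict entry such as
# (values made up):
#
#     {"content": "<generated code>", "correctness": False, "reason": "Code is incorrect."}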


def evaluate_livecodebench(input_datapath, test_datapath):
    """Evaluate LiveCodeBench code generation performance.

    Args:
        input_datapath: Path to model output JSONL file
        test_datapath: Path to LiveCodeBench test JSON file

    Returns:
        float: Accuracy score (proportion of correctly solved problems)
    """
    print("reading from %s" % input_datapath)
    id2generation = {}
    with open(input_datapath, "r") as f:
        for line in f:
            item = json.loads(line)
            id2generation[item['task_id']] = item['output']
    print("length of id2generation:", len(id2generation))

    print("reading from %s" % test_datapath)
    with open(test_datapath, "r") as f:
        test_list = json.load(f)
    print("length of test_list:", len(test_list))

    combined_results = {}
    for data_item in test_list:
        id_ = data_item['question_id']
        output = id2generation[id_]

        all_testcases = data_item['private_test_cases'] + json.loads(data_item['public_test_cases'])

        metadata = json.loads(data_item['metadata'])
        if "func_name" in metadata:
            func_name = metadata['func_name']
        else:
            func_name = ""

        combined_results[id_] = {
            'input_output': all_testcases,
            'starter_code': func_name,
            'question_id': id_,
            'generation': output,
        }

    total_questions = len(combined_results)
    print("length of combined_results:", total_questions)

    total_correct = 0
    total_finish = 0
    records = []

    with ProcessPoolExecutor(max_workers=32) as executor:
        future_to_task = {}
        for idx, (q_id, result) in enumerate(combined_results.items()):
            future_to_task[executor.submit(update_results, result)] = idx

        for future in tqdm(
            as_completed(future_to_task),
            total=len(future_to_task),
            desc="Processing Generations",
        ):
            response_entry = future.result()
            total_correct += response_entry["correctness"]
            total_finish += 1
            records.append(response_entry)

    acc = total_correct / total_questions
    print("accuracy:", acc)

    return acc
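

# Illustrative only: one line of the model-output JSONL read above, using the
# field names the readers in this module expect ('task_id', 'output', and
# 'reason' in check_finish); the values are made up:
#
#     {"task_id": "lcb_0001", "output": "```python\nprint('hi')\n```", "reason": "stop"}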


def get_args():
    """Parse command-line arguments for code evaluation script.

    Returns:
        argparse.Namespace: Parsed arguments
    """
    parser = argparse.ArgumentParser(description="Code Benchmark Evaluation")
    parser.add_argument("--modelfolder", type=str, required=True,
                        help="Path to model output folder")
    parser.add_argument("--testfolder", type=str, required=True,
                        help="Path to test data folder")
    args = parser.parse_args()
    return args
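

# Example invocation (illustrative; substitute the actual script name and paths):
#
#     python code_eval.py --modelfolder /path/to/model_outputs \
#         --testfolder /path/to/test_data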


def has_code(response):
    """Check if response contains Python code blocks.

    Args:
        response: Model output string

    Returns:
        list: List of code blocks found in the response
    """
    pattern = r"```python(?:[a-zA-Z0-9]*)\n(.*?)```"
    matches = re.findall(pattern, response, re.DOTALL)
    return matches
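

# Illustrative only: has_code() returns the bodies of fenced ```python blocks, e.g.
#
#     has_code("Answer:\n```python\nprint(1)\n```")  # -> ['print(1)\n']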


def check_finish(input_datapath):
    """Compute the fraction of responses that finished with a Python code block.

    Args:
        input_datapath: Path to model output JSONL file

    Returns:
        float: Mean finish rate across responses
    """
    finish_rates = []
    with open(input_datapath, "r") as f:
        for line in f:
            item = json.loads(line)
            if not item['reason']:
                # No stop reason: generation was cut off, so count it as
                # unfinished and skip the code-block check.
                finish_rates.append(0)
                continue
            output = item['output']
            finish_rates.append(1 if has_code(output) else 0)

    return np.mean(finish_rates)


def main():
    """Main evaluation function for code generation benchmarks."""
    args = get_args()

    model_folder = args.modelfolder
    test_datafolder = args.testfolder

    tmp_list = []
    finish_list = []
    input_datapaths = glob.glob(model_folder + "/outputs_*/lcb5.jsonl")
    for input_datapath in input_datapaths:
        test_datapath = os.path.join(test_datafolder, "livecodebench/test_aug2024tojan2025.json")
        print("=" * 80)
        with open(input_datapath) as f:
            lines = f.readlines()
        if len(lines) != 279:
            print(f"skipping {input_datapath} due to incorrect number of lines {len(lines)}")
            continue
        tmp_acc = evaluate_livecodebench(input_datapath, test_datapath)
        finish_rate = check_finish(input_datapath)
        tmp_list.append(tmp_acc)
        finish_list.append(finish_rate)

    acc = np.mean(tmp_list)
    finish = np.mean(finish_list)
    finish_std = np.std(finish_list) / (len(finish_list) ** 0.5)

    lcb5_acc = acc
    lcb5_finish = finish
    lcb5_std = np.std(tmp_list) / (len(tmp_list) ** 0.5)

    print("=" * 80)
    print("avg acc for livecodebench v5: %.4f (std of mean: %.4f) (runs: %d)" % (lcb5_acc, lcb5_std, len(tmp_list)))
    print("avg finish rate for livecodebench v5: %.4f (std of mean: %.4f) (runs: %d)" % (finish, finish_std, len(finish_list)))

    tmp_list = []
    finish_list = []
    input_datapaths = glob.glob(model_folder + "/outputs_*/lcb6.jsonl")
    for input_datapath in input_datapaths:
        test_datapath = os.path.join(test_datafolder, "livecodebench/test_feb2025toApr2025.json")
        print("=" * 80)
        with open(input_datapath) as f:
            lines = f.readlines()
        if len(lines) != 175:
            print(f"skipping {input_datapath} due to incorrect number of lines {len(lines)}")
            continue
        tmp_acc = evaluate_livecodebench(input_datapath, test_datapath)
        finish_rate = check_finish(input_datapath)
        tmp_list.append(tmp_acc)
        finish_list.append(finish_rate)

    acc = np.mean(tmp_list)
    finish = np.mean(finish_list)
    finish_std = np.std(finish_list) / (len(finish_list) ** 0.5)

    lcb6_acc = acc
    lcb6_finish = finish
    lcb6_std = np.std(tmp_list) / (len(tmp_list) ** 0.5)

    print("=" * 80)
    print("avg acc for livecodebench v6: %.4f (std of mean: %.4f) (runs: %d)" % (lcb6_acc, lcb6_std, len(tmp_list)))
    print("avg finish rate for livecodebench v6: %.4f (std of mean: %.4f) (runs: %d)" % (finish, finish_std, len(finish_list)))


if __name__ == "__main__":
    main()