"""Code generation benchmark evaluation utilities.

This module provides evaluation functions for coding benchmarks including:
- LiveCodeBench (v5 and v6)

It includes code execution, test case verification, and correctness checking.
"""

import argparse
import copy
import glob
import json
import multiprocessing
import os
import re
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Optional

import numpy as np
from tqdm import tqdm

from tools.code_verifier_utils import run_test


def check_coding_correctness(problem_to_check: Optional[dict], timeout, debug=False):
    """Check correctness of code generation with a global timeout.
    
    The global timeout is to catch some extreme/rare cases not handled by the timeouts
    inside run_test.
    
    Args:
        problem_to_check: Dictionary containing code and test cases
        timeout: Timeout in seconds for each test case
        debug: Whether to enable debug mode (default: False)
    
    Returns:
        bool: True if all test cases pass, False otherwise
    """
    """Check correctness of code generation with a global timeout.
    The global timeout is to catch some extreme/rare cases not handled by the timeouts
    inside `run_test`"""

    def _temp_run(problem_to_check, debug, result, metadata_list, timeout):
        try:
            res, metadata = run_test(problem_to_check, debug=debug, timeout=timeout)
            result.append(res)
            metadata_list.append(metadata)
        except Exception as e:
            result.append([-1 for i in range(len(problem_to_check['input_output']))])
            metadata_list.append(e)

    manager = multiprocessing.Manager()
    result = manager.list()
    metadata_list = manager.list()

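    # Overall budget: the per-test timeout plus 1s of slack for every test case,
    # plus a fixed 10s of overhead for subprocess startup and result collection.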
    total_timeout = (timeout + 1) * len(problem_to_check['input_output']) + 10
    p = multiprocessing.Process(target=_temp_run, args=(problem_to_check, debug, result, metadata_list, timeout))
    p.start()
    p.join(timeout=total_timeout + 1)
    if p.is_alive():
        p.kill()

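    # Judge as correct only if a result came back and every test case returned a
    # positive status.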
    judge_value = bool(result and np.all(np.array(result[0]) > 0))
    return judge_value


def update_results(result, timeout=6):
    """Update results with correctness checking.
    
    Args:
        result: Dictionary containing generated code and test cases
        timeout: Timeout in seconds for code execution (default: 6)
    
    Returns:
        dict: Response entry with correctness status and reason
    """
    response_entry = {
        "content": result['generation'],
        "correctness": None,
        "reason": None,
    }

    problem_to_check = copy.deepcopy(result)
    curr_res = check_coding_correctness(problem_to_check, timeout=timeout)

    if curr_res:
        response_entry["correctness"] = True
        response_entry["reason"] = ""
    else:
        response_entry["correctness"] = False
        response_entry["reason"] = "Code is incorrect."

    return response_entry


def evaluate_livecodebench(input_datapath, test_datapath):
    """Evaluate LiveCodeBench code generation performance.
    
    Args:
        input_datapath: Path to model output JSONL file
        test_datapath: Path to LiveCodeBench test JSON file
    
    Returns:
        float: Accuracy score (proportion of correctly solved problems)
    """
    print("reading from %s" % input_datapath)
    id2generation = {}
    with open(input_datapath, "r") as f:
        for line in f:
            item = json.loads(line)
            id2generation[item['task_id']] = item['output']
    print("length of id2generation:", len(id2generation))

    print("reading from %s" % test_datapath)
    with open(test_datapath, "r") as f:
        test_list = json.load(f)
    print("length of test_list:", len(test_list))

    combined_results = {}
    for data_item in test_list:
        id_ = data_item['question_id']
        output = id2generation[id_]

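        # Public test cases are stored as a JSON string in this dataset and need
        # decoding; private test cases are used as-is. Both sets are checked.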
        all_testcases = data_item['private_test_cases'] + json.loads(data_item['public_test_cases'])
        
        metadata = json.loads(data_item['metadata'])
        if "func_name" in metadata:
            func_name = metadata['func_name']
        else:
            func_name = ""
        
        combined_results[id_] = {
            'input_output': all_testcases,
            'starter_code': func_name,
            'question_id': id_,
            'generation': output
        }

    total_questions = len(combined_results)
    print("length of combined_results:", total_questions)

    total_correct = 0
    total_finish = 0
    records = []

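    # Run the correctness check for each problem in parallel worker processes.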
    with ProcessPoolExecutor(max_workers=32) as executor:

        future_to_task = {}
        for idx, (q_id, result) in enumerate(combined_results.items()):
            future_to_task[
                executor.submit(
                    update_results, result
                )
            ] = idx

        for future in tqdm(
            as_completed(future_to_task),
            total=len(future_to_task),
            desc="Processing Generations",
        ):
            idx = future_to_task[future]
            response_entry = future.result()
            total_correct += response_entry["correctness"]
            total_finish += 1
            records.append(response_entry)
    
    acc = total_correct / total_questions
    print("accuracy:", acc)

    return acc


def get_args():
    """Parse command-line arguments for code evaluation script.
    
    Returns:
        argparse.Namespace: Parsed arguments
    """
    parser = argparse.ArgumentParser(description="Code Benchmark Evaluation")
    parser.add_argument("--modelfolder", type=str, required=True, 
                       help="Path to model output folder")
    parser.add_argument("--testfolder", type=str, required=True,
                       help="Path to test data folder")
    args = parser.parse_args()
    return args


def has_code(response):
    """Check if response contains Python code blocks.
    
    Args:
        response: Model output string
    
    Returns:
        list: List of code blocks found in the response
    """
    pattern = r"```python(?:[a-zA-Z0-9]*)\n(.*?)```"
    matches = re.findall(pattern, response, re.DOTALL)
    return matches


def check_finish(input_datapath):
    """Compute the rate of responses that finish with a Python code block.

    Args:
        input_datapath: Path to model output JSONL file

    Returns:
        float: Mean finish rate; responses with an empty finish reason count as 0
    """
    finish_rates = []
    with open(input_datapath, "r") as f:
        for line in f:
            item = json.loads(line)
            if not item['reason']:
                finish_rates.append(0)
                continue
            output = item['output']
            finish_rates.append(1 if has_code(output) else 0)

    return np.mean(finish_rates)


def main():
    """Main evaluation function for code generation benchmarks."""
    args = get_args()

    model_folder = args.modelfolder
    test_datafolder = args.testfolder

    # Evaluate LiveCodeBench v5
    tmp_list = []
    finish_list = []
    input_datapaths = glob.glob(model_folder+"/outputs_*/lcb5.jsonl")
    for input_datapath in input_datapaths:
        test_datapath = os.path.join(test_datafolder, "livecodebench/test_aug2024tojan2025.json")
        print("="*80)
        lines = open(input_datapath).readlines()
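        # 279 is the expected number of problems in the v5 (Aug 2024 - Jan 2025)
        # split; incomplete output files are skipped.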
        if len(lines) != 279:
            print(f"skipping {input_datapath} due to incorrect number of lines {len(lines)}")
            continue
        tmp_acc = evaluate_livecodebench(input_datapath, test_datapath)
        finish_rate = check_finish(input_datapath)
        tmp_list.append(tmp_acc)
        finish_list.append(finish_rate)

    acc = np.mean(tmp_list)
    finish = np.mean(finish_list)
    finish_std = np.std(finish_list)/(len(finish_list)**0.5)

    lcb5_acc = acc
    lcb5_finish = finish
    lcb5_std = np.std(tmp_list)/(len(tmp_list)**0.5)

    print("="*80)
    print("avg acc for livecodebench v5: %.4f (std of mean: %.4f) (runs: %d)" % (lcb5_acc, lcb5_std, len(tmp_list)))
    print("avg finish rate for livecodebench v5: %.4f (std of mean: %.4f) (runs: %d)" % (finish, finish_std, len(finish_list)))

    # Evaluate LiveCodeBench v6
    tmp_list = []
    finish_list = []
    input_datapaths = glob.glob(model_folder+"/outputs_*/lcb6.jsonl")
    for input_datapath in input_datapaths:
        test_datapath = os.path.join(test_datafolder, "livecodebench/test_feb2025toApr2025.json")
        print("="*80)
        lines = open(input_datapath).readlines()
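        # 175 is the expected number of problems in the v6 (Feb - Apr 2025) split.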
        if len(lines) != 175:
            print(f"skipping {input_datapath} due to incorrect number of lines {len(lines)}")
            continue
        tmp_acc = evaluate_livecodebench(input_datapath, test_datapath)
        finish_rate = check_finish(input_datapath)
        tmp_list.append(tmp_acc)
        finish_list.append(finish_rate)

    acc = np.mean(tmp_list)
    finish = np.mean(finish_list)
    finish_std = np.std(finish_list) / (len(finish_list) ** 0.5)

    lcb6_acc = acc
    lcb6_finish = finish
    lcb6_std = np.std(tmp_list)/(len(tmp_list)**0.5)

    print("="*80)
    print("avg acc for livecodebench v6: %.4f (std of mean: %.4f) (runs: %d)" % (lcb6_acc, lcb6_std, len(tmp_list)))
    print("avg finish rate for livecodebench v6: %.4f (std of mean: %.4f) (runs: %d)" % (finish, finish_std, len(finish_list)))


if __name__ == "__main__":
    main()