Tsukihjy/testcase / methods /CruxEval /parallel_executor.py
Tsukihjy's picture
download
raw
5.11 kB
import os
import json
from datetime import datetime
from multiprocessing import Pool, cpu_count
from tqdm import tqdm
import logging
from typing import List, Optional
import random
import re
def is_decimal(s):
    """Return True if *s* is a string written as a plain decimal number.

    A value qualifies only when it is a ``str`` made of an optional leading
    minus sign, one or more digits, a dot, and one or more digits (for
    example ``"3.14"`` or ``"-0.5"``).  Integers such as ``"3"``, exponent
    notation such as ``"1e-3"``, and every non-string input yield ``False``.

    Args:
        s: The candidate value.

    Returns:
        bool: Whether *s* has the decimal form described above.
    """
    # Non-string values that float() accepts (e.g. a real float or int) would
    # otherwise reach re.match() and raise TypeError instead of returning False.
    if not isinstance(s, str):
        return False
    try:
        float(s)
    except ValueError:
        return False
    # NOTE: with re.match, `$` also matches just before one trailing newline,
    # so "1.5\n" is accepted.
    return bool(re.match(r"^-?\d+\.\d+$", s))
def get_testcases(testcase_path):
    """Load every non-blank line of a JSON Lines file as a parsed object.

    Args:
        testcase_path: Path of the JSONL file to read.

    Returns:
        list: The parsed objects in file order, or an empty list when the
        path does not exist.
    """
    if os.path.exists(testcase_path):
        with open(testcase_path, 'r', encoding='utf-8') as handle:
            # Whitespace-only lines are skipped rather than parsed.
            return [json.loads(raw) for raw in handle if raw.strip()]
    return []
def check_difference(arr):
    """Return True if all entries are decimal strings that agree within 1e-6.

    Args:
        arr: Sequence of values to compare.

    Returns:
        bool: ``False`` if any entry fails ``is_decimal`` or if any two
        values differ by more than 1e-6; ``True`` otherwise, including for
        an empty sequence.
    """
    if not all(is_decimal(num) for num in arr):
        return False
    values = [float(num) for num in arr]
    if not values:
        return True
    # The largest gap between any two values is max - min, so a single pass
    # replaces comparing every pair (and re-parsing each string per pair).
    spread = max(values) - min(values)
    # Written as "not >" so a NaN spread (inf - inf) is treated like the
    # pairwise comparison treated it: not a violation.
    return not spread > 1e-6
def append_dict_to_jsonl(file_path, data_dict):
    """Append *data_dict* to *file_path* as a single JSON line.

    The file is opened in append mode with UTF-8 encoding, and non-ASCII
    characters are written as-is rather than escaped.

    Args:
        file_path: Destination JSONL file.
        data_dict: JSON-serializable object to write.
    """
    with open(file_path, mode='a', encoding='utf-8') as sink:
        serialized = json.dumps(data_dict, ensure_ascii=False)
        sink.write(f"{serialized}\n")
def setup_logging():
    """Configure root logging to a timestamped file under ``logs/`` and to the console.

    Creates the ``logs`` directory in the current working directory if it is
    missing, then calls ``logging.basicConfig`` with INFO level, a
    ``time - level - message`` format, a file handler and a stream handler.
    Note that ``basicConfig`` leaves the root logger untouched when it
    already has handlers.

    Returns:
        logging.Logger: The root logger.
    """
    os.makedirs("logs", exist_ok=True)
    log_file = f"logs/execution_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            # Explicit UTF-8: messages logged by this script contain non-ASCII
            # text, and the default file encoding depends on the locale.
            logging.FileHandler(log_file, encoding="utf-8"),
            logging.StreamHandler(),
        ],
    )
    return logging.getLogger()
# Module-level logger used by process_code_with_logging; the __main__ block
# rebinds this name to the logger returned by setup_logging().
logger = logging.getLogger(__name__)
from execute_tool import run_cpp_code_linux
def process_code_with_logging(data_item):
    """Run one item through ``run_cpp_code_linux`` and log the outcome.

    Args:
        data_item: Mapping with at least a ``"problem_id"`` key; it is passed
            unchanged to ``run_cpp_code_linux``.

    Returns:
        The value returned by ``run_cpp_code_linux`` on success.  If that
        call (or logging its status) raises, the same ``data_item`` is
        returned with ``"error"`` set to ``["EXE"]`` and ``"details"`` set to
        the exception text.
    """
    pid = data_item["problem_id"]
    try:
        outcome = run_cpp_code_linux(data_item)
        state = outcome.get("error", "Unknown")
        logger.info(f"执行完成 - 问题ID: {pid}, 状态: {state}")
    except Exception as exc:
        logger.error(f"执行异常 - 问题ID: {pid}, 错误: {str(exc)}")
        # Mark the item as failed so downstream filtering can recognise it.
        data_item["error"] = ["EXE"]
        data_item["details"] = str(exc)
        return data_item
    return outcome
from load_response import get_response_function, load_data, load_qwen3_result
def _group_by_problem(results):
    """Group result dicts by their "problem_id", keeping arrival order per group."""
    grouped = {}
    for item in results:
        grouped.setdefault(item["problem_id"], []).append(item)
    return grouped


def _accepted_outputs(items, index):
    """Return the outputs recorded at *index*, or None if any run was not "AC" there.

    Runs whose status list has no entry at *index* are skipped.
    """
    outputs = []
    for item in items:
        statuses = item["error"]
        if index >= len(statuses):
            continue
        if statuses[index] != "AC":
            return None
        outputs.append(item["details"][index])
    return outputs


def _outputs_agree(outputs):
    """Return True if *outputs* is non-empty, non-blank and all runs agree.

    Agreement means either identical outputs or decimal outputs that
    check_difference accepts as equal within its tolerance.
    """
    if not outputs or outputs[0] == "":
        return False
    return len(set(outputs)) == 1 or check_difference(outputs)


def _consistent_testcase_indices(items):
    """Return the test-case indices on which every run was accepted and agreed.

    The number of slots is the longest status list among *items*.  Results
    are collected with imap_unordered, so relying on the first item's length
    made the outcome depend on completion order (a one-element ["EXE"]
    failure arriving first hid every later test case).
    """
    slot_count = max((len(item["error"]) for item in items), default=0)
    selected = []
    for index in range(slot_count):
        outputs = _accepted_outputs(items, index)
        if outputs is not None and _outputs_agree(outputs):
            selected.append(index)
    return selected


if __name__ == "__main__":
    datasets_name = "tcb"
    testcase_alg = "crux"
    model_name = "Qwen2.5-32B-Instruct"

    # One call creates both directory levels and tolerates existing ones.
    output_dir = f"/home/luoxianzhen/yang/save_tests_{model_name}/{testcase_alg}/"
    os.makedirs(output_dir, exist_ok=True)
    test_dir = output_dir + "tests-{}.jsonl"
    pass_rate_save_file = output_dir + "test_pass_rate.jsonl"

    logger = setup_logging()
    logger.info("开始执行代码评估...")
    cpu = 50

    logger.info(" === Stage 1 == 整理输出")
    # NOTE(review): the keyword is spelled "repsonse_path" at this call site;
    # presumably it matches the signature in load_response - verify before renaming.
    data = get_response_function(
        repsonse_path=f"/home/luoxianzhen/yang/data/response-orginal/orginal_response_{testcase_alg}_{model_name}.jsonl",
        model_name=model_name,
    )

    logger.info(f"使用 {cpu} 个CPU核心进行并行处理 === Stage 2 == 执行得到输出")
    testcase_data = load_data(data)
    with Pool(cpu) as pool:
        results = list(tqdm(
            pool.imap_unordered(process_code_with_logging, testcase_data),
            total=len(testcase_data),
            desc="执行进度",
        ))

    logger.info("所有代码执行完成 - 开始保存筛选后的测试样例")
    for problem_id, items in _group_by_problem(results).items():
        testcases = data[problem_id]
        save_path = test_dir.format(problem_id)
        kept = _consistent_testcase_indices(items)
        # Both output files are opened in append mode, so re-running the
        # script adds to whatever is already there.
        for index in kept:
            append_dict_to_jsonl(save_path, testcases[index])
        append_dict_to_jsonl(pass_rate_save_file, {
            "tcb_id": problem_id,
            "gen_nums": len(testcases),
            "right_nums": len(kept),
        })

Xet Storage Details

Size:
5.11 kB
·
Xet hash:
37f60d5bc10ab64a7c2b6d8badd492b65986a588284045f7d2e7eb1bc60d1156

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.