| import os | |
| import json | |
| from datetime import datetime | |
| from multiprocessing import Pool, cpu_count | |
| from tqdm import tqdm | |
| import logging | |
| from typing import List, Optional | |
| import random | |
| import re | |
| def is_decimal(s): | |
| try: | |
| a = float(s) | |
| except: | |
| return False | |
| return bool(re.match(r"^-?\d+\.\d+$", s)) | |
| def get_testcases(testcase_path): | |
| data = [] | |
| if not os.path.exists(testcase_path): | |
| return [] | |
| with open(testcase_path, 'r', encoding='utf-8') as f: | |
| for line in f: | |
| if line.strip(): | |
| data.append(json.loads(line)) | |
| return data | |
| def check_difference(arr): | |
| for num in arr: | |
| if not is_decimal(num): | |
| return False | |
| # 遍历每一对元素 | |
| for i in range(len(arr)): | |
| for j in range(i + 1, len(arr)): | |
| # 计算差异并判断是否满足条件 | |
| if abs(float(arr[i]) - float(arr[j])) > 1e-6: | |
| return False # 如果有任何一对差异大于 1e-6,则返回 False | |
| return True # 所有对的差异都不大于 1e-6,返回 True | |
| def append_dict_to_jsonl(file_path, data_dict): | |
| with open(file_path, 'a', encoding='utf-8') as f: | |
| f.write(json.dumps(data_dict, ensure_ascii=False) + '\n') | |
| def setup_logging(): | |
| os.makedirs("logs", exist_ok=True) | |
| log_file = f"logs/execution_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log" | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(levelname)s - %(message)s', | |
| handlers=[ | |
| logging.FileHandler(log_file), | |
| logging.StreamHandler() | |
| ] | |
| ) | |
| return logging.getLogger() | |
| from execute_tool import run_python_code_linux | |
| from collections import Counter | |
| logger = logging.getLogger(__name__) | |
| def process_function(data_item): | |
| tcb_id = data_item['tcb_id'] | |
| # 获取生成的tests & code | |
| tests_list = data_item['func_list'].get('tests', []) | |
| code_list = data_item['func_list'].get('code', []) | |
| generate_testcases: List = [] | |
| # 所有的code执行投票得到结果 | |
| for test_input in tests_list[0: 200]: | |
| temp_outputs = [] | |
| for code_string in code_list: | |
| res = run_python_code_linux(code_string, test_input, time_limit=3) | |
| if isinstance(res, str) or res.returncode != 0: | |
| continue | |
| actual_lines = [line.strip() for line in res.stdout.splitlines()] | |
| actual_lines = [line for line in actual_lines if line] | |
| result_string = "\n".join(actual_lines) | |
| temp_outputs.append(result_string) | |
| if len(temp_outputs) <= 0: | |
| continue | |
| counter = Counter(temp_outputs) | |
| most_common, _ = counter.most_common(1)[0] | |
| current_test = { | |
| 'input': test_input, | |
| 'output': most_common | |
| } | |
| generate_testcases.append(current_test) | |
| return { | |
| "tcb_id": tcb_id, | |
| "generate_testcases": generate_testcases | |
| } | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| def run_all(data, max_workers: int | None = None): | |
| if max_workers is None: | |
| # 线程池调度 + 任务内部子进程执行:一般设置为 CPU 数量或稍小 | |
| cpu = max(1, (os.cpu_count() or 8) - 2) | |
| max_workers = cpu | |
| logger.info(f"使用 {max_workers} 个并发 worker 进行处理(线程池调度,任务内子进程可强杀)") | |
| results = [None] * len(data) | |
| def _task(idx_item): | |
| idx, item = idx_item | |
| try: | |
| return idx, process_function(item) | |
| except Exception as e: | |
| # 兜底,任何未捕获异常都返回空结果,避免整体崩溃 | |
| logger.exception("处理单条数据时发生异常") | |
| return idx, {"tcb_id": item.get("tcb_id"), "generate_testcases": []} | |
| with ThreadPoolExecutor(max_workers=max_workers) as ex: | |
| futures = [ex.submit(_task, (i, data[i])) for i in range(len(data))] | |
| for f in tqdm(as_completed(futures), total=len(futures), desc="执行进度"): | |
| idx, res = f.result() | |
| results[idx] = res | |
| logger.info("所有代码执行完成 - 开始保存筛选后的测试样例") | |
| return results | |
| from execute_tool import run_cpp_code_linux | |
| def process_code_with_logging(data_item): | |
| """包装函数,用于添加日志""" | |
| problem_id = data_item["problem_id"] | |
| try: | |
| result = run_cpp_code_linux(data_item) | |
| status = result.get("error", "Unknown") | |
| logger.info(f"执行完成 - 问题ID: {problem_id}, 状态: {status}") | |
| return result | |
| except Exception as e: | |
| logger.error(f"执行异常 - 问题ID: {problem_id}, 错误: {str(e)}") | |
| data_item["error"] = ["EXE"] | |
| data_item["details"] = str(e) | |
| return data_item | |
| from load_response import get_response_function, load_data | |
| if __name__ == "__main__": | |
| datasets_name = "tcb" | |
| testcase_alg = "predo" | |
| model_name = "Qwen2.5-7B-Instruct" | |
| if not os.path.exists(f"/home/luoxianzhen/yang/save_tests_{model_name}"): | |
| os.mkdir(f"/home/luoxianzhen/yang/save_tests_{model_name}") | |
| if not os.path.exists(f"/home/luoxianzhen/yang/save_tests_{model_name}/{testcase_alg}/"): | |
| os.mkdir(f"/home/luoxianzhen/yang/save_tests_{model_name}/{testcase_alg}/") | |
| test_dir = f"/home/luoxianzhen/yang/save_tests_{model_name}/{testcase_alg}/" + "tests-{}.jsonl" | |
| pass_rate_save_file = f"/home/luoxianzhen/yang/save_tests_{model_name}/{testcase_alg}/test_pass_rate.jsonl" | |
| logger = setup_logging() | |
| logger.info("开始执行代码评估...") | |
| data = get_response_function(repsonse_path="/home/luoxianzhen/yang/data/response-orginal/orginal_response_{}_{}.jsonl", test_al=testcase_alg, model_name=model_name) | |
| logger.info(f"加载了 {len(data)} 个代码项目") | |
| cpu = 1 | |
| # cpu = 2 | |
| logger.info(f"使用 {cpu} 个CPU核心进行并行处理 === Stage 1 == 生成函数") | |
| import time | |
| results_input = run_all(data, cpu) | |
| logger.info(f"使用 {cpu} 个CPU核心进行并行处理 === Stage 2 == 执行得到输出") | |
| testcase_data = load_data(results_input) | |
| with Pool(cpu) as pool: | |
| results = list(tqdm( | |
| pool.imap_unordered(process_code_with_logging, testcase_data), | |
| total=len(testcase_data), | |
| desc="执行进度" | |
| )) | |
| logger.info("所有代码执行完成 - 开始保存筛选后的测试样例") | |
| result_dict = {} | |
| for item in results: | |
| if item['problem_id'] not in result_dict.keys(): | |
| result_dict[item['problem_id']] = [item, ] | |
| else: | |
| result_dict[item['problem_id']].append(item) | |
| for k, v in result_dict.items(): | |
| testcases = [test_list for test_list in results_input if test_list["tcb_id"] == k][0]['generate_testcases'] | |
| save_path = test_dir.format(k) | |
| status_array_length = len(v[0]["error"]) | |
| saved_nums = 0 | |
| for i in range(status_array_length): | |
| all_AC = True | |
| output_list = [] | |
| for item in v: | |
| if i >= len(item["error"]): | |
| logger.info(f"{k} error list not the same") | |
| continue | |
| if item["error"][i] == "EXE": | |
| all_AC = False | |
| break | |
| output_list.append(item['details'][i]) | |
| if not item["error"][i] == "AC": | |
| all_AC = False | |
| if all_AC and (len(list(set(output_list))) == 1 or check_difference(output_list)): | |
| if output_list[0] == "": | |
| continue | |
| curr_tests = { | |
| 'input': testcases[i], | |
| 'output': output_list[0] | |
| } | |
| append_dict_to_jsonl(save_path, curr_tests) | |
| saved_nums += 1 | |
| append_dict_to_jsonl(pass_rate_save_file, { | |
| "tcb_id": k, | |
| "gen_nums": len(testcases), | |
| "right_nums": saved_nums | |
| }) | |
Xet Storage Details
- Size:
- 8.02 kB
- Xet hash:
- 7380e81107dba2a846edb37af9d541160aa7bd35ca6315bb779c0dd7e3c5b27b
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.