| import json | |
| import os | |
def read_jsonl(file_path):
    """Load a JSON Lines file into a list of decoded records.

    Args:
        file_path: Path of the file to read. Every non-blank line must hold
            exactly one JSON document.

    Returns:
        A list with one decoded object per non-blank line, in file order.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    # Decode explicitly as UTF-8 instead of the platform default, matching
    # how the other JSON files in this module are opened.
    with open(file_path, 'r', encoding='utf-8') as file:
        # Whitespace-only lines (e.g. a trailing empty line) are skipped
        # rather than aborting the whole load with a JSONDecodeError.
        return [json.loads(line) for line in file if line.strip()]
def read_pass_rate_file(file_path):
    """Sum the per-problem counters stored in a JSON Lines file.

    Each line is decoded as a JSON object carrying ``tcb_id``, ``gen_nums``
    and ``right_nums``. Lines that share a ``tcb_id`` have both counters
    added together.

    Args:
        file_path: Path of the JSON Lines file to read.

    Returns:
        A dict mapping every ``tcb_id`` seen to
        ``{'gen_nums': <sum>, 'right_nums': <sum>}``.
    """
    totals = {}
    with open(file_path, 'r') as handle:
        for raw_line in handle:
            record = json.loads(raw_line)
            key = record['tcb_id']
            bucket = totals.get(key)
            if bucket is None:
                # First occurrence of this id: start its counters.
                totals[key] = {
                    'gen_nums': record['gen_nums'],
                    "right_nums": record['right_nums'],
                }
                continue
            bucket['gen_nums'] += record['gen_nums']
            bucket['right_nums'] += record['right_nums']
    return totals
def replace_newline_in_fstring(code: str) -> str:
    """Escape raw line breaks found inside double-quoted f-string literals.

    Every span of the form ``f"..."`` (the text from ``f"`` up to the next
    double quote) is rewritten so that each literal newline character inside
    it becomes the two-character sequence backslash + ``n``. Text outside
    such spans is left untouched.

    Args:
        code: Source text to patch.

    Returns:
        The patched source text.
    """
    def replace_in_fstring(match):
        # Text between the opening f" and the closing ".
        string_content = match.group(1)
        # Turn each real newline into an escaped one.
        modified_content = string_content.replace("\n", "\\n")
        # Rebuild the literal around the patched content.
        return f'f"{modified_content}"'

    # f" followed by any run of non-quote characters (newlines included,
    # since a negated class matches them) up to the next ".
    pattern = r'f"([^"]*)"'
    # Apply the substitution and hand the result back; without this return
    # the function would yield None instead of the patched source.
    return re.sub(pattern, replace_in_fstring, code)
def count_success(func_list):
    """Execute each code string and count how many run without error.

    A snippet that fails is replaced, in place, by the result of
    ``replace_newline_in_fstring`` applied to it.

    Args:
        func_list: Mutable list of Python source strings. Entries that fail
            to execute are overwritten.

    Returns:
        A tuple ``(success_count, func_list)`` where ``func_list`` is the
        same list object that was passed in.
    """
    success_count = 0
    for i, code_str in enumerate(func_list):
        try:
            # SECURITY: exec runs the snippet with full interpreter
            # privileges; only feed it code you are prepared to run.
            #
            # Each snippet gets its own single namespace. A bare exec()
            # inside a function uses separate globals and locals, which
            # hides the snippet's top-level imports/defs from functions it
            # defines and exposes this module's globals to it. __name__ is
            # forwarded so ``if __name__ == ...`` guards behave as before.
            exec(code_str, {"__name__": __name__})
        except (Exception, SystemExit):
            # A snippet calling sys.exit() counts as a failure; Ctrl-C
            # (KeyboardInterrupt) is deliberately allowed to propagate.
            func_list[i] = replace_newline_in_fstring(code_str)
        else:
            success_count += 1
    return success_count, func_list
| import re | |
def extract_code(ans_str):
    """Return the body of the last ```python fenced block in *ans_str*.

    The body is everything between the line break that follows the opening
    fence and the next closing fence.

    Raises:
        IndexError: If *ans_str* contains no complete fenced block.
    """
    fence_re = re.compile(r'```python\n(.*?)```', re.DOTALL)
    bodies = fence_re.findall(ans_str)
    return bodies[-1]
def extract_content_code(ans_str):
    """Return the text of the last ``<ASSISTANT>...</ASSISTANT>`` section.

    Raises:
        IndexError: If *ans_str* contains no complete section.
    """
    section_re = re.compile(r'<ASSISTANT>(.*?)</ASSISTANT>', re.DOTALL)
    sections = section_re.findall(ans_str)
    return sections[-1]
def load_qwen3_result(repsonse_path):
    """Load a JSON Lines result file and attach a ``func_list`` to each record.

    Every record read via ``read_jsonl`` gains a ``func_list`` dict whose
    ``'random'`` entry is the record's ``random_generator`` value and whose
    ``'eage'`` entry is its ``adversarial_generator`` value.

    Args:
        repsonse_path: Path of the JSON Lines file to load.

    Returns:
        The list of records, each updated in place.
    """
    records = read_jsonl(repsonse_path)
    for record in records:
        random_src = record['random_generator']
        adversarial_src = record['adversarial_generator']
        record['func_list'] = {'random': random_src, 'eage': adversarial_src}
    return records
def get_response_function(repsonse_path, model_name, test_al):
    """Group extracted generator code per problem and compute test budgets.

    Reads the benchmark dataset, the per-problem tests-count file for
    ``model_name``/``test_al``, and the model responses, then returns the
    problems for which no tests have been generated yet.

    Args:
        repsonse_path: Format template for the responses JSON Lines file;
            it is formatted with ``(test_al, model_name)``.
        model_name: Model identifier. ``"qwen3-nothink"`` is mapped to
            ``"qwen3-235b-a22b"`` for the responses file only.
        test_al: Identifier used in the tests-count file name and passed to
            the path template.

    Returns:
        A list of dicts with keys ``tcb_id``, ``func_list`` (a dict with
        ``"random"`` and ``"eage"`` lists of code strings) and
        ``limit_nums`` (five per ``wrong_code`` entry of the problem, minus
        the recorded ``gen_nums`` when present).

    Raises:
        OSError: If one of the input files cannot be opened.
        KeyError: If a response has no ``tcb_id``, has a ``type`` other than
            ``"random"``/``"eage"``, or refers to a problem that is in the
            dataset but missing from the tests-count file.
    """
    # Context managers close the handles promptly.
    with open("/home/luoxianzhen/yang/data/Ours/TestcaseBench-v28.json", "r", encoding="utf-8") as ds_file:
        ds = json.load(ds_file)
    with open(f"/home/luoxianzhen/yang/tests_count/{model_name}-{test_al}-tests-count.json", "r", encoding="utf-8") as count_file:
        gen_nums = json.load(count_file)
    # The tests-count file above is looked up under the name as passed in;
    # only the responses file below uses the alias.
    if model_name == "qwen3-nothink":
        model_name = "qwen3-235b-a22b"

    # Number of wrong-code entries per problem.
    rank_dict = {item['tcb_id']: len(item['wrong_code']) for item in ds}

    test_func_list = read_jsonl(repsonse_path.format(test_al, model_name))
    test_functions = {}
    for response_item in test_func_list:
        try:
            response = response_item["response"]
            if "```python" in response:
                code_string = extract_code(response)
            elif '<ASSISTANT>' in response:
                code_string = extract_content_code(response)
            else:
                code_string = response
        except (IndexError, KeyError, TypeError):
            # Best effort: skip records with no response, a non-text
            # response, or an opening marker without a complete block.
            continue
        buckets = test_functions.setdefault(
            response_item['tcb_id'], {"random": [], "eage": []}
        )
        buckets[response_item['type']].append(code_string)

    func_data = []
    for k, v in test_functions.items():
        if k not in rank_dict:
            continue
        budget = rank_dict[k] * 5
        counts = gen_nums[k]
        if 'gen_nums' in counts:
            generated = counts['gen_nums']
            if generated > 0:
                # Tests were already generated for this problem.
                continue
            limit_nums = budget - generated
        else:
            limit_nums = budget
        func_data.append({
            "tcb_id": k,
            "func_list": v,
            'limit_nums': limit_nums,
        })
    return func_data
| import random | |
def load_data(test_inputs, solutions_count = None):
    """Pair generated test cases with dataset solutions to run them against.

    Args:
        test_inputs: Iterable of dicts, each with a ``tcb_id`` and a
            ``generate_testcases`` entry. When several entries share a
            ``tcb_id`` the first one is used.
        solutions_count: Maximum number of solutions to use per problem;
            defaults to 3. If a problem has fewer solutions, all of them
            are used; otherwise that many are drawn with ``random.sample``.

    Returns:
        A list of job dicts with keys ``code``, ``time_limit``,
        ``memory_limit``, ``compileAndRunOptions``, ``test_cases`` and
        ``problem_id``, one per selected solution, in dataset order.
        Problems without an entry in ``test_inputs`` are skipped.

    Raises:
        OSError: If the dataset file cannot be opened.
    """
    if solutions_count is None:
        solutions_count = 3
    # Context manager so the dataset handle is closed promptly.
    with open("/home/luoxianzhen/yang/data/Ours/TestcaseBench-v28.json", "r", encoding="utf-8") as ds_file:
        ds = json.load(ds_file)

    # Index the inputs once instead of rescanning them for every dataset
    # item; setdefault keeps the first entry seen for each id.
    tests_by_id = {}
    for test_list in test_inputs:
        tests_by_id.setdefault(test_list["tcb_id"], test_list)

    res = []
    for item in ds:
        entry = tests_by_id.get(item['tcb_id'])
        if entry is None:
            continue
        tests = entry['generate_testcases']
        solutions = item['solutions']
        if solutions_count > len(solutions):
            solution_list = solutions
        else:
            solution_list = random.sample(solutions, solutions_count)
        for c in solution_list:
            res.append({
                "code": c['code'],
                "time_limit": item["runtime_limit"],
                "memory_limit": item["memory_limit"],
                "compileAndRunOptions": c["compileAndRunOptions"],
                "test_cases": tests,
                "problem_id": item['tcb_id'],
            })
    return res
def test_code(repsonse_path, model_name, test_al):
    """Extract generator code per problem and report how much of it runs.

    Responses are read from ``repsonse_path.format(test_al, model_name)``
    and grouped by ``tcb_id`` and ``type``. Every ``"random"`` snippet is
    then executed. Two numbers are printed: the total count of snippets
    that raised, and the count of problems with fewer than five snippets
    that ran cleanly.

    Args:
        repsonse_path: Format template for the responses JSON Lines file.
        model_name: Model identifier passed to the path template.
        test_al: Identifier passed to the path template.

    Returns:
        A dict mapping each ``tcb_id`` to
        ``{"random": [code, ...], "eage": [code, ...]}``.

    Raises:
        KeyError: If a response has no ``tcb_id`` or has a ``type`` other
            than ``"random"``/``"eage"``.
    """
    test_func_list = read_jsonl(repsonse_path.format(test_al, model_name))
    test_functions = {}
    for response_item in test_func_list:
        try:
            response = response_item["response"]
            if "```python" in response:
                code_string = extract_code(response)
            elif '<ASSISTANT>' in response:
                code_string = extract_content_code(response)
            else:
                code_string = response
        except (IndexError, KeyError, TypeError):
            # Best effort: skip records with no response, a non-text
            # response, or an opening marker without a complete block.
            continue
        buckets = test_functions.setdefault(
            response_item['tcb_id'], {"random": [], "eage": []}
        )
        buckets[response_item['type']].append(code_string)

    fial_func = 0  # snippets that raised, summed over all problems
    zero_ = 0      # problems with fewer than five clean snippets
    for funcs in test_functions.values():
        random_list = funcs['random']
        count = 0
        for code in random_list:
            try:
                # SECURITY: exec runs the snippet with full interpreter
                # privileges; only feed it code you are prepared to run.
                #
                # One fresh namespace per snippet: a bare exec() inside a
                # function uses separate globals and locals, which hides
                # the snippet's top-level imports/defs from functions it
                # defines. __name__ is forwarded so main-guards behave as
                # before.
                exec(code, {"__name__": __name__})
            except (Exception, SystemExit):
                # sys.exit() in a snippet counts as a failure; Ctrl-C
                # (KeyboardInterrupt) is allowed to propagate.
                count += 1
                fial_func += 1
        # Equivalent to: successes (= len - count) < 5.
        if count > len(random_list) - 5:
            zero_ += 1
    print(fial_func, zero_)
    return test_functions
def write_to_file(code_str, filename):
    """Write *code_str* to *filename* as UTF-8 and report the outcome.

    A success message is printed after the write. Any exception raised
    while writing or reporting is caught and printed instead of being
    propagated. Nothing is returned.

    Args:
        code_str: Text to store.
        filename: Destination path; an existing file is overwritten.
    """
    try:
        with open(filename, 'w', encoding='utf-8') as out:
            out.write(code_str)
        print(f"成功将内容写入 {filename}")
    except Exception as e:
        print(f"写入文件时发生错误: {e}")
if __name__ == "__main__":
    # Dump the generator sources of one problem into individual .py files.
    model_name = "claude-sonnet-4-20250514"
    func_dict = test_code(f"/home/luoxianzhen/yang/data/response-orginal/orginal_response_lcb_{model_name}.jsonl", model_name, "lcb")
    question_name = "火星人"
    random_funcs = func_dict[question_name]['random']
    edge_funcs = func_dict[question_name]['eage']
    for i, func_str in enumerate(random_funcs):
        write_to_file(func_str, f"/home/luoxianzhen/yang/methods/lcb/function_code_sample/random_func_{i}.py")
    # Edge files continue the numbering where the random files stopped.
    for i, func_str in enumerate(edge_funcs, start=len(random_funcs)):
        write_to_file(func_str, f"/home/luoxianzhen/yang/methods/lcb/function_code_sample/edge_func_{i}.py")
Xet Storage Details
- Size:
- 8.05 kB
- Xet hash:
- f81301cd851e2b460d6282ff707ba63eaab6f2b15312e619d3991a5f166cc359
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.