Tsukihjy/testcase / methods /Predo /load_response.py
Tsukihjy's picture
download
raw
8.52 kB
import json
import os
def read_jsonl(file_path):
    """Load a JSON Lines file into a list of decoded objects.

    Args:
        file_path: Path of the ``.jsonl`` file; each non-blank line must hold
            one complete JSON document.

    Returns:
        list: The decoded value of every non-blank line, in file order.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    records = []
    # Read as UTF-8 explicitly so decoding does not depend on the machine's
    # locale; the other JSON files in this module are opened the same way.
    with open(file_path, "r", encoding="utf-8") as handle:
        for raw_line in handle:
            # Blank lines (typically a trailing newline at end of file) carry
            # no record and would otherwise make json.loads raise.
            if not raw_line.strip():
                continue
            records.append(json.loads(raw_line))
    return records
def extract_code_between_tags(text, start_tag="<BEGIN>", end_tag="<END>"):
    """
    Pull out whatever sits between the first start/end tag pair in ``text``.

    Args:
        text (str): Text that may contain a tagged block.
        start_tag (str): Literal opening marker (default ``"<BEGIN>"``).
        end_tag (str): Literal closing marker (default ``"<END>"``).

    Returns:
        str: The enclosed text with surrounding whitespace removed, or ``""``
        when no start/end pair is present.
    """
    # Tags are escaped so they are matched literally, not as regex syntax.
    opener = re.escape(start_tag)
    closer = re.escape(end_tag)
    tagged_block = re.compile(opener + "\\s*(.*?)\\s*" + closer, re.DOTALL)

    found = tagged_block.search(text)
    if found is None:
        return ""
    return found.group(1).strip()
import re
def extract_code(ans_str):
    """Return the body of the last ```python fenced block found in ``ans_str``.

    Raises:
        IndexError: If ``ans_str`` contains no ```python fenced block.
    """
    fenced_bodies = [
        hit.group(1)
        for hit in re.finditer(r'```python\n(.*?)```', ans_str, flags=re.DOTALL)
    ]
    # Deliberately index (rather than guard) so a missing block raises IndexError.
    return fenced_bodies[-1]
def extract_content_code(ans_str):
    """Return the text inside the last ``<ASSISTANT>...</ASSISTANT>`` pair.

    Raises:
        IndexError: If ``ans_str`` contains no such pair.
    """
    assistant_span = re.compile(r'<ASSISTANT>(.*?)</ASSISTANT>', re.DOTALL)
    spans = assistant_span.findall(ans_str)
    # An empty result makes this index -1 and raises IndexError, as callers expect.
    return spans[len(spans) - 1]
def extract_json(ans_str):
    """Return the body of the last ```json fenced block found in ``ans_str``.

    The returned text is not parsed; it is the raw content between the
    opening ```json line and the closing fence.

    Raises:
        IndexError: If ``ans_str`` contains no ```json fenced block.
    """
    json_fence = r'```json\n(.*?)```'
    fenced_payloads = re.findall(json_fence, ans_str, flags=re.S)
    last_payload = fenced_payloads[-1]
    return last_payload
def _parse_testcases(raw):
    """Turn one raw model reply into a list of test cases, or None if unparseable.

    The reply must be a JSON object, optionally wrapped in a ```json fence;
    the object's values are returned as the test cases.
    """
    try:
        payload = extract_json(raw) if '```json' in raw else raw
        return list(json.loads(payload).values())
    except (TypeError, ValueError, IndexError, AttributeError, RecursionError):
        # Non-text reply, malformed JSON, a ```json marker without a complete
        # fence, or a JSON value that is not an object: all count as failures.
        return None


def _extract_tagged_code(raw):
    """Return the <BEGIN>/<END> payload of ``raw``, or None if ``raw`` is not text."""
    try:
        return extract_code_between_tags(raw)
    except TypeError:
        return None


def _list_shape_stats(tests_response):
    """Count how many collected test cases are JSON lists.

    Returns a tuple ``(list_all, list_count, list_case, all_case)``:
    problems whose tests are all lists, problems with at least one list test,
    total list-shaped tests, and total tests.
    """
    list_all = list_count = list_case = all_case = 0
    for bucket in tests_response.values():
        tests = bucket['tests']
        list_shaped = sum(1 for test in tests if isinstance(test, list))
        if tests and list_shaped >= len(tests):
            list_all += 1
        if tests and list_shaped > 0:
            list_count += 1
        list_case += list_shaped
        all_case += len(tests)
    return list_all, list_count, list_case, all_case


def get_response_function(repsonse_path, model_name, test_al):
    """Load model responses and group generated tests and code by problem id.

    Args:
        repsonse_path: ``str.format`` template with two positional slots; it is
            filled with ``(test_al, model_name)`` to get the JSONL file to read.
        model_name: Model identifier. If it contains ``"qwen3"`` each record is
            read as ``{'tcb_id', 'testcase_json', 'code_gen'}``; otherwise each
            record is read as ``{'tcb_id', 'type', 'response'}``.
        test_al: Name substituted into the first slot of ``repsonse_path``.

    Returns:
        For ``"qwen3"`` models: a dict mapping ``tcb_id`` to
        ``{"tests": [...], "code": [...]}``.
        For every other model: a list of
        ``{"tcb_id": ..., "func_list": {"tests": [...], "code": [...]}}``.

    Summary statistics are printed to stdout as a side effect.

    NOTE(review): the two branches return different container types (dict vs
    list). That is kept as-is for backward compatibility; confirm with callers
    whether the qwen3 branch was meant to return the list form as well.
    """
    test_func_list = read_jsonl(repsonse_path.format(test_al, model_name))

    if "qwen3" in model_name:
        tests_response = {}
        failed = 0
        gen_fail = 0
        for response_item in test_func_list:
            if 'testcase_json' not in response_item:
                gen_fail += 1
                continue
            # The bucket is created before parsing, so a problem whose only
            # record fails to parse still appears with empty lists.
            bucket = tests_response.setdefault(
                response_item['tcb_id'], {"tests": [], "code": []}
            )
            testcase_list = _parse_testcases(response_item['testcase_json'])
            if testcase_list is None:
                # A record with unparseable tests is dropped entirely,
                # including its generated code.
                failed += 1
                continue
            bucket['tests'] += testcase_list
            for raw_code in response_item['code_gen']:
                code_string = _extract_tagged_code(raw_code)
                if code_string is not None:
                    bucket['code'].append(code_string)
        print(gen_fail, failed)

        list_all, list_count, list_case, all_case = _list_shape_stats(tests_response)
        code_none = 0
        code_with_python = 0
        code_less_2 = 0
        all_code = 0
        for bucket in tests_response.values():
            codes = bucket['code']
            if not codes:
                code_none += 1
            empty_codes = 0
            for code in codes:
                if not code:
                    empty_codes += 1
                elif "```python" in code:
                    code_with_python += 1
            # code_none aggregates problems without any code plus every
            # individual empty extraction (same accounting as before).
            code_none += empty_codes
            all_code += len(codes)
            if codes and empty_codes >= len(codes) - 2:
                code_less_2 += 1

        # Guard the ratio: with no parsed test case at all the division
        # would raise ZeroDivisionError and hide the rest of the report.
        list_ratio = list_case / all_case if all_case else 0.0
        print(f"list_all: {list_all} list_count: {list_count} {list_ratio} code_none {code_none}")
        print(f"code_less_2 {code_less_2} | code_with_python {code_with_python} | code_none {code_none} | all_code {all_code}")
        return tests_response

    tests_response = {}
    for response_item in test_func_list:
        bucket = tests_response.setdefault(
            response_item['tcb_id'], {"tests": [], "code": []}
        )
        # A missing 'response' key yields None, which both helpers report as
        # a failure, so the record is skipped just as it was before.
        raw_response = response_item.get('response')
        if response_item['type'] == "input":
            testcase_list = _parse_testcases(raw_response)
            if testcase_list is not None:
                bucket['tests'] += testcase_list
        else:
            code_string = _extract_tagged_code(raw_response)
            if code_string is not None:
                bucket['code'].append(code_string)

    list_all, list_count, list_case, all_case = _list_shape_stats(tests_response)
    list_ratio = list_case / all_case if all_case else 0.0
    print(f"list_all: {list_all} list_count: {list_count} {list_ratio}")
    return [
        {"tcb_id": tcb_id, "func_list": bucket}
        for tcb_id, bucket in tests_response.items()
    ]
_DEFAULT_DATASET_PATH = "/home/i-luoxianzhen/data/TestCase-Gen/data/Ours/TestcaseBench-v22.json"


def load_data(test_inputs, dataset_path=_DEFAULT_DATASET_PATH):
    """Pair benchmark solutions with the generated test cases for their problem.

    Args:
        test_inputs: Iterable of dicts, each with a ``'tcb_id'`` key; the
            entry used for a problem must also have ``'generate_testcases'``.
            When several entries share a ``tcb_id`` the first one wins.
        dataset_path: JSON file holding a list of problems, each with
            ``'tcb_id'``, ``'solutions'``, ``'runtime_limit'`` and
            ``'memory_limit'``. Defaults to the path previously hard-coded.

    Returns:
        list[dict]: One job per (problem, solution) pair, using at most the
        first five solutions of each problem that has generated tests.
        Problems without a matching ``test_inputs`` entry are skipped.
    """
    # Close the dataset file deterministically instead of leaking the handle.
    with open(dataset_path, "r", encoding="utf-8") as handle:
        ds = json.load(handle)

    # Index once by problem id; setdefault keeps the first entry per id,
    # matching the previous "first match" lookup without rescanning the list.
    tests_by_id = {}
    for entry in test_inputs:
        tests_by_id.setdefault(entry["tcb_id"], entry)

    res = []
    for item in ds:
        matched = tests_by_id.get(item['tcb_id'])
        if matched is None:
            continue
        tests = matched['generate_testcases']
        for solution in item['solutions'][0:5]:
            res.append({
                "code": solution['code'],
                "time_limit": item["runtime_limit"],
                "memory_limit": item["memory_limit"],
                "compileAndRunOptions": solution["compileAndRunOptions"],
                "test_cases": tests,
                "problem_id": item['tcb_id'],
            })
    return res
if __name__ == "__main__":
    # Manual run: load the 'predo' responses of one model and print the
    # summary statistics emitted by get_response_function.
    #
    # Earlier invocation, kept for reference:
    #   data = get_response_function(repsonse_path="/home/i-luoxianzhen/data/TestCase-Gen/data/response-orginal/orginal_response_crux_claude-sonnet-4-20250514-thinking.jsonl")
    #   res = load_data(data)
    data = get_response_function(
        repsonse_path="/home/luoxianzhen/yang/data/response-orginal/orginal_response_{}_{}.jsonl",
        test_al='predo',
        model_name="qwen3-235b-a22b",
    )
    # Disabled diagnostic: count problems whose first test is a list and
    # problems that have no tests at all.
    #   count_list = 0
    #   count_empty = 0
    #   print(len(data))
    #   for item in data:
    #       input_list = item['func_list']['tests']
    #       if len(input_list) == 0:
    #           count_empty += 1
    #           continue
    #       if isinstance(input_list[0], list):
    #           count_list += 1
    #   print(count_list, count_empty)

Xet Storage Details

Size:
8.52 kB
·
Xet hash:
19038e016a59e3fcac2259a854d8b486279fa235c27f70381936599ffc900803

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.