Tsukihjy/testcase / methods /Predo /execute_tool.py
Tsukihjy's picture
download
raw
7.79 kB
import multiprocessing
import traceback
import resource
def function_execute_box_process(code_str, funcname="construct_inputs", time_limit=100):
    """Execute ``funcname`` from ``code_str`` in a child process with a time limit.

    ``code_str`` is exec'd in a fresh namespace inside a forked child; the
    zero-argument callable named ``funcname`` is then invoked and its return
    value is passed back through a Manager-backed dict.

    Args:
        code_str: Python source expected to define ``funcname``.
        funcname: Name of the zero-argument callable to invoke.
        time_limit: Seconds to wait for the child before terminating it.

    Returns:
        A plain ``dict``:
          * ``{'status': 'success', 'result': <return value>}`` on success;
          * ``{'status': 'error', 'result': <message>, 'details': <message>}``
            when the code raised or ``funcname`` is missing/not callable;
          * ``{'status': 'error', 'details': 'TimeoutError: ...'}`` on timeout;
          * ``{'status': 'error', 'details': 'Child exited without returning
            result'}`` when the child died without reporting anything.
    """
    timeout_details = 'TimeoutError: Execution exceeded time limit'

    def target_func(shared_dict):
        # NOTE(review): this caps the child's address space at 0 bytes, so any
        # allocation that needs new memory from the OS fails. An earlier
        # revision used a 100 MB cap (100 * 1024 * 1024); confirm which limit
        # is actually intended.
        resource.setrlimit(resource.RLIMIT_AS, (0, 0))
        try:
            env = {}
            # SECURITY: exec() runs arbitrary code. The child process bounds
            # run time only; it is not a sandbox for untrusted input.
            exec(code_str, env)
            if funcname in env and callable(env[funcname]):
                result = env[funcname]()
                # One update() call so status and result are published together.
                shared_dict.update({'status': 'success', 'result': result})
            else:
                shared_dict.update({
                    'status': 'error',
                    'result': f'No function named {funcname} found',
                })
        except Exception:
            shared_dict.update({'status': 'error', 'result': traceback.format_exc()})

    # The nested target cannot be pickled, so the child must be created with
    # the "fork" start method regardless of the platform default.
    ctx = multiprocessing.get_context("fork")

    # The context manager shuts the manager process down on exit, so the
    # result is copied into a plain dict before leaving the block.
    with ctx.Manager() as manager:
        shared_dict = manager.dict()
        process = ctx.Process(target=target_func, args=(shared_dict,))
        process.start()
        process.join(time_limit)

        timed_out = process.is_alive()
        if timed_out:
            process.terminate()
            process.join(1)
            if process.is_alive():
                # The child ignored SIGTERM; force it down so it is reaped.
                process.kill()
                process.join()

        outcome = shared_dict.copy()

    if timed_out:
        outcome['status'] = 'error'
        outcome['details'] = timeout_details
        return outcome

    if outcome.get('status') == 'success':
        return outcome

    # Report the child's own error message instead of relabelling every
    # failure as a timeout.
    outcome['status'] = 'error'
    outcome['details'] = outcome.get('result', 'Child exited without returning result')
    return outcome
import multiprocessing as mp
import traceback
def function_execute_box_subprocess(code_str, funcname="construct_inputs", time_limit=100):
    """Execute ``funcname`` from ``code_str`` in a child process, returning via a pipe.

    ``code_str`` is exec'd in a fresh namespace inside a forked child; the
    zero-argument callable named ``funcname`` is invoked and the outcome is
    sent back over a one-way pipe.

    Args:
        code_str: Python source expected to define ``funcname``.
        funcname: Name of the zero-argument callable to invoke.
        time_limit: Seconds allowed for the child to produce a result and
            exit; ``None`` waits indefinitely.

    Returns:
        ``{'status': 'success', 'result': <return value>}`` on success, or
        ``{'status': 'error', 'details': <message>}`` when the code raised,
        ``funcname`` is missing, the time limit was exceeded, or the child
        exited without sending anything.
    """
    import time  # local import: the module-level import block is left untouched

    timeout_outcome = {'status': 'error', 'details': 'TimeoutError: Execution exceeded time limit'}

    def worker(conn):
        try:
            env = {}
            # SECURITY: exec() runs arbitrary code. The child process bounds
            # run time only; it is not a sandbox for untrusted input.
            exec(code_str, env)
            if funcname in env and callable(env[funcname]):
                res = env[funcname]()
                conn.send({'status': 'success', 'result': res})
            else:
                conn.send({'status': 'error', 'details': f'No function named {funcname} found'})
        except Exception:
            conn.send({'status': 'error', 'details': traceback.format_exc()})
        finally:
            conn.close()

    # The nested worker cannot be pickled, so the child must be created with
    # the "fork" start method regardless of the platform default.
    ctx = mp.get_context("fork")
    parent_conn, child_conn = ctx.Pipe(duplex=False)
    p = ctx.Process(target=worker, args=(child_conn,))
    p.start()
    child_conn.close()  # the parent does not need the write end

    deadline = None if time_limit is None else time.monotonic() + time_limit

    def remaining():
        """Seconds left before the deadline (None means wait forever)."""
        if deadline is None:
            return None
        return max(0.0, deadline - time.monotonic())

    try:
        # Read BEFORE joining: a child sending more than the OS pipe buffer
        # blocks in send() until the parent reads, so joining first would
        # stall until the time limit and misreport a timeout.
        if not parent_conn.poll(remaining()):
            return timeout_outcome
        try:
            outcome = parent_conn.recv()
        except EOFError:
            # The pipe was closed without any message being sent.
            outcome = {'status': 'error', 'details': 'Child exited without returning result'}

        # The child must also exit within the limit, as before.
        p.join(timeout=remaining())
        if p.is_alive():
            return timeout_outcome
        return outcome
    finally:
        if p.is_alive():
            p.terminate()  # forcibly stop the child
            p.join(1)
            if p.is_alive():
                # The child ignored SIGTERM; force it down so it is reaped.
                p.kill()
                p.join()
        parent_conn.close()
import subprocess
import tempfile
import os
import uuid
import json
import re
def is_decimal(s):
    """Return True if ``s`` is a plain fixed-point decimal string.

    Accepted form: an optional leading ``-``, one or more digits, a dot, and
    one or more digits (e.g. ``"3.14"``, ``"-0.5"``). Integers, exponent
    notation, and anything that is not a ``str`` yield False.
    """
    # The regex below only works on str; reject other types up front instead
    # of letting re.match raise TypeError (e.g. for a float argument).
    if not isinstance(s, str):
        return False
    try:
        float(s)
    except ValueError:
        return False
    return bool(re.match(r"^-?\d+\.\d+$", s))
def run_cpp_code_linux(infos, test_mode = False, rank_p = 1):
    """Compile the C++ program in ``infos`` and judge it on every test case.

    Reads ``infos["code"]``, ``infos["time_limit"]``, ``infos["memory_limit"]``,
    ``infos["test_cases"]`` (each with ``"input"`` and ``"output"``) and
    ``infos["compileAndRunOptions"]["std"]``. The program is compiled with
    ``g++`` in a temporary directory and run once per test case through a
    shell that applies ``ulimit -t`` and ``ulimit -v``.

    Args:
        infos: Task description dict; mutated in place.
        test_mode: Accepted for interface compatibility; not used here.
        rank_p: Accepted for interface compatibility; not used here.

    Returns:
        The same ``infos`` dict with two parallel lists filled in:
        ``infos["error"]`` holds one verdict per test case (``"AC"``,
        ``"WA"``, ``"TLE"``, ``"MLE"``, ``"RE"``) and ``infos["details"]``
        holds ``"<verdict>: Testcase:<idx>"``. On a compile failure the lists
        are ``["CE"]`` and ``[<compiler stderr>]`` and no test case is run.
    """
    import signal  # local import: the module-level import block is left untouched

    code = infos["code"]
    time_limit = infos["time_limit"]
    memory_limit = infos["memory_limit"]
    test_cases = infos["test_cases"]
    infos["error"] = []
    infos["details"] = []

    # A program killed by signal N shows up as 128 + N when the shell reports
    # it, or as -N when subprocess sees the signal directly.
    killed_codes = {128 + signal.SIGKILL, -signal.SIGKILL}
    # `ulimit -t` enforces the CPU limit with SIGXCPU. 124 (the status used by
    # the `timeout` utility) is kept so existing verdicts do not change.
    cpu_limit_codes = {128 + signal.SIGXCPU, -signal.SIGXCPU, 124}

    with tempfile.TemporaryDirectory() as tmpdirname:
        unique_id = uuid.uuid4()
        cpp_file = os.path.join(tmpdirname, f"{unique_id}.cpp")
        exe_file = os.path.join(tmpdirname, f"{unique_id}.out")

        # Write C++ code to file
        with open(cpp_file, "w") as f:
            f.write(code)

        # Compile the C++ code
        compile_result = subprocess.run(
            ["g++", cpp_file, "-o", exe_file, f"-std={infos['compileAndRunOptions']['std']}"],
            capture_output=True,
            text=True
        )
        if compile_result.returncode != 0:
            infos["error"].append("CE")
            infos["details"].append(compile_result.stderr)
            return infos

        # presumably memory_limit is in MB and time_limit in milliseconds
        # (x1024 -> KB for `ulimit -v`, //1000 -> seconds) — TODO confirm.
        memory_kb = int(memory_limit) * 1024
        time_limit_int = int(time_limit) // 1000 + 1
        cmd = f"ulimit -t {time_limit_int} && ulimit -v {memory_kb} && {exe_file}"

        for idx, testcase in enumerate(test_cases):
            input_string = testcase["input"]
            output_string = testcase["output"]
            error = ""

            try:
                result = subprocess.run(
                    cmd,
                    input=input_string,
                    text=True,
                    capture_output=True,
                    shell=True,
                    timeout=time_limit_int
                )
            except subprocess.TimeoutExpired:
                error = "TLE"
            except Exception:
                error = "RE"
            else:
                # Classify the exit status.
                if result.returncode in killed_codes:
                    error = "MLE"  # SIGKILL - usually the memory limit
                elif result.returncode in cpu_limit_codes:
                    error = "TLE"
                elif result.returncode != 0:
                    error = "RE"

            if not error:
                # Expected output may be stored as a bare number.
                if isinstance(output_string, (int, float)):
                    output_string = str(output_string)

                # Strip every line, drop blank lines, and merge into one line.
                expected_lines = [line.strip() for line in output_string.splitlines()]
                actual_lines = [line.strip() for line in result.stdout.splitlines()]
                expected_text = " ".join(line for line in expected_lines if line).strip()
                actual_text = " ".join(line for line in actual_lines if line).strip()

                if is_decimal(actual_text) and is_decimal(expected_text):
                    # A single decimal answer is compared with 1e-6 tolerance.
                    if abs(float(actual_text) - float(expected_text)) > 1e-6:
                        error = "WA"
                elif actual_text != expected_text:
                    error = "WA"

            verdict = error or "AC"
            infos["error"].append(verdict)
            infos["details"].append(f"{verdict}: Testcase:{idx}")

    return infos
import subprocess
import tempfile
import os
def run_python_code_linux(code_str, test_input, time_limit=2):
    """Run ``code_str`` as a standalone ``python3`` script fed ``test_input`` on stdin.

    The source is written to a temporary ``.py`` file, which is removed again
    once the run has finished.

    Args:
        code_str: Python source code to execute.
        test_input: Text passed to the script's standard input.
        time_limit: Timeout in seconds for the run.

    Returns:
        The ``subprocess.CompletedProcess`` of the run (captured text
        stdout/stderr), or a message string starting with ``"TimeoutError"``
        or ``"ExecutionError"`` when the run timed out or could not be
        carried out.
    """
    with tempfile.NamedTemporaryFile(mode='w+', suffix='.py', delete=False) as script:
        script.write(code_str)
        script.flush()
        script_path = script.name

    command = ['python3', script_path]
    try:
        # Launch the script with the test input supplied on stdin.
        return subprocess.run(
            command,
            input=test_input,
            capture_output=True,
            text=True,
            timeout=time_limit,
        )
    except subprocess.TimeoutExpired:
        return "TimeoutError: Execution exceeded time limit"
    except Exception as exc:
        return f"ExecutionError: {exc}"
    finally:
        os.remove(script_path)  # clean up the temporary file

Xet Storage Details

Size:
7.79 kB
·
Xet hash:
c31566614188bf34f32da5e4a7a3fd5163b2cd7655821c661531cd210016a018

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.