Tsukihjy/testcase / methods /Hardtest /execute_tool.py
Tsukihjy's picture
download
raw
6.13 kB
import multiprocessing
import traceback
import resource
def function_execute_box_process(code_str, funcname="construct_inputs", param=None, time_limit=100):
    """Run a function defined in ``code_str`` in a child process with a time limit.

    The child executes ``code_str`` with ``exec`` in a fresh namespace, looks up
    the callable named ``funcname`` and invokes it, passing ``param`` as the
    single positional argument unless ``param`` is None.

    NOTE(security): ``exec`` runs ``code_str`` without restriction; the child
    process only bounds how long the code may run, not what it may do.

    Args:
        code_str: Python source expected to define ``funcname``.
        funcname: Name of the callable to invoke after executing ``code_str``.
        param: Optional single argument for the callable (omitted when None).
        time_limit: Seconds to wait for the child before it is terminated.

    Returns:
        A plain ``dict`` describing the outcome:

        * success: ``{'status': 'success', 'result': <return value>}``
        * failure inside the child: ``{'status': 'error', 'result': <text>,
          'details': <same text>}`` where the text is the traceback or the
          "No function named ..." message
        * timeout: ``'status'`` is ``'error'`` and ``'details'`` is
          ``'TimeoutError: Execution exceeded time limit'``
        * child died without reporting: ``'status'`` is ``'error'`` and
          ``'details'`` is ``'Child exited without returning result'``
    """

    def target_func(shared_dict):
        try:
            env = {}
            exec(code_str, env)
            func = env.get(funcname)
            if not callable(func):
                shared_dict.update({
                    'status': 'error',
                    'result': f'No function named {funcname} found',
                })
                return
            result = func() if param is None else func(param)
            # Written in one call so an unpicklable result cannot leave a
            # 'success' status behind without a matching 'result'.
            shared_dict.update({'status': 'success', 'result': result})
        except Exception:
            shared_dict.update({'status': 'error', 'result': traceback.format_exc()})

    # target_func is a closure and cannot be pickled, so the child must be
    # created by fork; pin that start method instead of relying on the
    # platform default.
    ctx = multiprocessing.get_context("fork")

    # The context manager shuts the manager's server process down on exit, so
    # the outcome is copied into a plain dict before leaving the block.
    with ctx.Manager() as manager:
        shared_dict = manager.dict()
        process = ctx.Process(target=target_func, args=(shared_dict,))
        process.start()
        process.join(time_limit)

        timed_out = process.is_alive()
        if timed_out:
            process.terminate()
            process.join(1)
            if process.is_alive():
                # The executed code may ignore SIGTERM; SIGKILL cannot be ignored.
                process.kill()
                process.join()

        outcome = dict(shared_dict)

    if timed_out:
        outcome['status'] = 'error'
        outcome['details'] = 'TimeoutError: Execution exceeded time limit'
    elif outcome.get('status') != 'success':
        outcome['status'] = 'error'
        # Report what actually went wrong rather than labelling it a timeout.
        outcome['details'] = outcome.get('result', 'Child exited without returning result')
    return outcome
import multiprocessing as mp
import traceback
def function_execute_box_subprocess(code_str, funcname="construct_inputs", time_limit=100):
    """Run ``funcname`` from ``code_str`` in a child process and return its outcome.

    The child executes ``code_str`` with ``exec`` in a fresh namespace, calls
    the callable named ``funcname`` with no arguments and sends a result dict
    back through a one-way pipe.

    NOTE(security): ``exec`` runs ``code_str`` without restriction; the child
    process only bounds how long the code may run, not what it may do.

    Args:
        code_str: Python source expected to define ``funcname``.
        funcname: Name of the zero-argument callable to invoke.
        time_limit: Seconds to wait for the child's result before giving up.

    Returns:
        ``{'status': 'success', 'result': <return value>}`` on success,
        otherwise ``{'status': 'error', 'details': <text>}`` where the text is
        a traceback, a "No function named ..." message, the timeout message,
        or ``'Child exited without returning result'``.
    """

    def worker(conn):
        try:
            env = {}
            exec(code_str, env)
            if funcname in env and callable(env[funcname]):
                res = env[funcname]()
                conn.send({'status': 'success', 'result': res})
            else:
                conn.send({'status': 'error', 'details': f'No function named {funcname} found'})
        except Exception:
            conn.send({'status': 'error', 'details': traceback.format_exc()})
        finally:
            conn.close()

    # worker is a closure and cannot be pickled, so the child must be forked.
    ctx = mp.get_context("fork")
    parent_conn, child_conn = ctx.Pipe(duplex=False)
    p = ctx.Process(target=worker, args=(child_conn,))
    p.start()
    child_conn.close()  # the parent does not need the child's end

    # Wait on the pipe rather than on process exit: a payload larger than the
    # pipe buffer keeps the child blocked in send() until the parent reads, so
    # joining first would turn every large result into a spurious timeout.
    timed_out = not parent_conn.poll(time_limit)
    if timed_out:
        outcome = {'status': 'error', 'details': 'TimeoutError: Execution exceeded time limit'}
    else:
        try:
            outcome = parent_conn.recv()
        except (EOFError, OSError):
            # The pipe became readable only because the child closed it.
            outcome = {'status': 'error', 'details': 'Child exited without returning result'}
    parent_conn.close()

    if not timed_out:
        p.join(1)  # short grace period for a normal exit
    if p.is_alive():
        p.terminate()  # force the child to stop
        p.join(1)
        if p.is_alive():
            # The executed code may ignore SIGTERM; SIGKILL cannot be ignored.
            p.kill()
            p.join()
    return outcome
import subprocess
import tempfile
import os
import uuid
import json
import re
def is_decimal(s):
    """Return True if ``s`` is a string written as ``digits.digits``, optionally negative.

    Integers ("42"), exponent notation ("1e5"), a missing integer part (".5")
    and surrounding spaces are all rejected. Because ``$`` also matches just
    before a final newline, a single trailing "\\n" is tolerated.

    Args:
        s: Value to check; anything that is not a ``str`` yields False.

    Returns:
        bool: whether ``s`` has the plain decimal form described above.
    """
    # re.match would raise TypeError on non-string input (None, float, bytes).
    if not isinstance(s, str):
        return False
    try:
        float(s)
    except ValueError:
        return False
    return bool(re.match(r"^-?\d+\.\d+$", s))
def run_cpp_code_linux(infos, test_mode=False, rank_p=1, temp_dir_prefix=""):
    """Compile the C++ program in ``infos`` with g++ and run it on each test case.

    The source is written to a temporary directory, compiled, and then run once
    per test case through the shell with ``ulimit`` CPU-time and virtual-memory
    limits plus a wall-clock timeout.

    Args:
        infos: Mapping read for ``"code"`` (C++ source), ``"time_limit"``
            (integer-divided by 1000 and incremented by one to get the limit
            in seconds), ``"memory_limit"`` (multiplied by 1024 for
            ``ulimit -v``), ``"test_cases"`` (iterable of mappings with an
            ``"input"`` string) and ``["compileAndRunOptions"]["std"]`` (value
            for ``-std=``). Its ``"error"`` and ``"details"`` entries are reset
            and filled in place.
        test_mode: Currently unused.
        rank_p: Currently unused.
        temp_dir_prefix: Currently unused.

    Returns:
        The same ``infos`` object. ``infos["error"]`` holds one verdict per
        test case ("AC", "TLE", "MLE" or "RE"), or the single entry "CE" when
        compilation fails. ``infos["details"]`` holds the matching text: the
        compiler's stderr for "CE", the program's whitespace-normalised stdout
        for "AC", and ``"<verdict>: Testcase:<index>"`` otherwise.
    """
    import shlex  # function-scope import: only needed to quote the executable path

    code = infos["code"]
    time_limit = infos["time_limit"]
    memory_limit = infos["memory_limit"]
    test_cases = infos["test_cases"]
    infos["error"] = []
    infos["details"] = []

    with tempfile.TemporaryDirectory(prefix="ht_test") as tmpdirname:
        unique_id = uuid.uuid4()
        cpp_file = os.path.join(tmpdirname, f"{unique_id}.cpp")
        exe_file = os.path.join(tmpdirname, f"{unique_id}.out")

        # Write the C++ source; explicit UTF-8 so non-ASCII source text does
        # not depend on the process locale.
        with open(cpp_file, "w", encoding="utf-8") as f:
            f.write(code)

        # Compile the C++ code.
        compile_result = subprocess.run(
            ["g++", cpp_file, "-o", exe_file, f"-std={infos['compileAndRunOptions']['std']}"],
            capture_output=True,
            text=True,
        )
        if compile_result.returncode != 0:
            infos["error"].append("CE")
            infos["details"].append(compile_result.stderr)
            return infos

        memory_kb = int(memory_limit) * 1024
        time_limit_int = int(time_limit) // 1000 + 1
        # The path is quoted so a temp directory containing spaces or shell
        # metacharacters cannot break the command line.
        cmd = f"ulimit -t {time_limit_int} && ulimit -v {memory_kb} && {shlex.quote(exe_file)}"

        for idx, testcase in enumerate(test_cases):
            input_string = testcase['input']
            # TODO: skipped for now; test cases with missing outputs still need cleaning up.
            error = ""
            try:
                result = subprocess.run(
                    cmd,
                    input=input_string,
                    text=True,
                    capture_output=True,
                    shell=True,
                    timeout=time_limit_int,
                )
            except subprocess.TimeoutExpired:
                error = "TLE"
            except Exception:
                error = "RE"
            else:
                returncode = result.returncode
                # subprocess reports death by signal N as -N, while a shell
                # that waited on the program reports 128+N. Which one is seen
                # depends on whether the shell execs the last command, so both
                # are folded into the 128+N form.
                if returncode < 0:
                    returncode = 128 - returncode
                if returncode == 137:
                    # SIGKILL - treated as memory limit exceeded.
                    # NOTE(review): a CPU-limit kill from `ulimit -t` may also
                    # arrive as SIGKILL - confirm this label is intended.
                    error = "MLE"
                elif returncode == 124:
                    # Exit status used by the `timeout` command.
                    error = "TLE"
                elif returncode != 0:
                    error = "RE"

            if error:
                infos["error"].append(error)
                infos["details"].append(f"{error}: Testcase:{idx}")
            else:
                # Collapse the output to its non-blank lines, stripped and
                # joined by single spaces.
                stripped = (line.strip() for line in result.stdout.splitlines())
                infos["error"].append("AC")
                infos["details"].append(" ".join(line for line in stripped if line))
    return infos

Xet Storage Details

Size:
6.13 kB
·
Xet hash:
cc559364d80d4e9f15503f46837f2e4efa4e7c2d1665bd1721bc1e2bd88ebea0

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.