Tsukihjy/testcase / methods /lcb /execute_tool.py
Tsukihjy's picture
download
raw
10.5 kB
import multiprocessing
import traceback
import resource
import re
def replace_newline_in_fstring(code: str) -> str:
    """Escape raw line breaks found inside double-quoted f-string literals.

    Generated source sometimes contains an f-string whose newline escape was
    already expanded into a real line break, which leaves the literal
    unterminated. Every span that starts at an ``f"`` (or ``rf"``) prefix and
    runs to the next ``"`` is located with a regular expression, and each line
    break inside it is rewritten as the two-character backslash-n escape.

    The prefix must not be preceded by a word character. Without that guard an
    ordinary string that merely ends in the letter ``f`` (for example
    ``"self"``) would be mistaken for the start of an f-string and the line
    break after it would be destroyed.

    This is a text heuristic, not a tokenizer: single-quoted f-strings are not
    handled, and a ``"`` nested inside the f-string ends the match early.

    Args:
        code: Source text to repair.

    Returns:
        The source text with line breaks inside matched f-strings escaped.
    """
    def _escape_newlines(match):
        # group(1) is the literal prefix (f or rf), group(2) the text between
        # the quotes.
        escaped = match.group(2).replace("\n", "\\n")
        return f'{match.group(1)}"{escaped}"'

    # (?<!\w) rejects an "f" that is the tail of a longer word or string.
    pattern = r'(?<!\w)([rR]?f)"([^"]*)"'
    return re.sub(pattern, _escape_newlines, code)
import time
def _box_process_worker(code_str, funcname, shared_dict):
    """Child-process entry point: exec ``code_str`` and call ``funcname``.

    Defined at module level so it can be pickled under every multiprocessing
    start method. The outcome is written into ``shared_dict``.
    """
    try:
        env = {}
        # SECURITY: exec runs arbitrary code. It is confined to this child
        # process, but the child has the same privileges as the parent.
        exec(code_str, env)
        target = env.get(funcname)
        if callable(target):
            shared_dict['result'] = target()
            shared_dict['status'] = 'success'
        else:
            message = f'No function named {funcname} found'
            shared_dict['status'] = 'error'
            # 'result' is kept for callers that read the message from there.
            shared_dict['result'] = message
            shared_dict['details'] = message
    except Exception:
        shared_dict['status'] = 'error'
        shared_dict['details'] = traceback.format_exc()


def function_execute_box_process(code_str, funcname="construct_inputs", time_limit=10):
    """Run ``funcname`` from ``code_str`` in a child process with a time limit.

    If the source does not compile, it is first passed through
    ``replace_newline_in_fstring`` as a repair attempt. Only ``compile`` is
    used for that probe, so no user code runs in the calling process.

    Args:
        code_str: Python source expected to define ``funcname``.
        funcname: Name of the zero-argument callable to invoke.
        time_limit: Seconds to wait for the child before terminating it.

    Returns:
        A plain ``dict`` snapshot of the outcome. ``status`` is ``'success'``
        (with the return value under ``result``) or ``'error'`` (with the
        reason under ``details``: a timeout notice, a traceback, or a
        missing-function message).
    """
    try:
        compile(code_str, "<generated>", "exec")
    except (SyntaxError, ValueError):
        code_str = replace_newline_in_fstring(code_str)

    # The context manager shuts the manager's server process down on exit.
    with multiprocessing.Manager() as manager:
        shared_dict = manager.dict()
        process = multiprocessing.Process(
            target=_box_process_worker,
            args=(code_str, funcname, shared_dict),
        )
        process.start()
        process.join(time_limit)

        timed_out = process.is_alive()
        if timed_out:
            process.terminate()
            process.join(1)
            if process.is_alive():
                # SIGTERM was ignored; force the child down so it is reaped.
                process.kill()
                process.join()

        # Copy out of the proxy before the manager goes away.
        outcome = dict(shared_dict)

    if timed_out:
        outcome['status'] = 'error'
        outcome['details'] = 'TimeoutError: Execution exceeded time limit'
    elif outcome.get('status') != 'success':
        outcome['status'] = 'error'
        # Keep the child's own explanation when it provided one.
        outcome.setdefault('details', 'Execution failed without reporting an error')
    return outcome
if __name__ == "__main__":
    # Manual demo of replace_newline_in_fstring. The sample below is a normal
    # (non-raw) triple-quoted string, so the backslash-n sequences inside its
    # f-string become real line breaks -- exactly the broken input the helper
    # is meant to repair.
    code = """
import numpy as np
def random_input_generator(a_min, a_max, n_min, n_max, k_min, k_max):
# Generate random size for the integer sequence
n = np.random.randint(n_min, n_max + 1)
# Generate random integer sequence
a_sequence = ' '.join(str(np.random.randint(a_min, a_max + 1)) for _ in range(n))
# Generate random size for the monotonic sequence
k = np.random.randint(k_min, k_max + 1)
# Generate random monotonic sequence
symbols = ['<', '>', '=']
s_sequence = ' '.join(np.random.choice(symbols) for _ in range(k))
# Return the formatted string for n, k, a_sequence, and s_sequence
return f"{n} {k}\n{a_sequence}\n{s_sequence}"
def construct_inputs():
inputs_list = []
# Small inputs (integer values between 1 and 100, sequence size between 1 and 10)
for i in range(10):
inputs_list.append(random_input_generator(1, 100, 1, 10, 1, 10))
# Medium inputs (integer values between 1 and 1000, sequence size between 10 and 100)
for i in range(10):
inputs_list.append(random_input_generator(1, 1000, 10, 100, 10, 100))
# Large inputs (integer values between 1 and 1000000, sequence size between 100 and 500000)
for i in range(10):
inputs_list.append(random_input_generator(1, 1000000, 100, 500000, 100, 500000))
return inputs_list
"""
    # The helper returns a new string; print it so the demo shows the repair
    # instead of discarding the result.
    print(replace_newline_in_fstring(code))
import multiprocessing
import traceback
import psutil
import os
import time
def _limit_mm_worker(code_str, funcname, shared_dict):
    """Child-process entry point: exec ``code_str`` and call ``funcname``.

    Defined at module level so it can be pickled under every multiprocessing
    start method. The outcome is written into ``shared_dict``.
    """
    try:
        env = {}
        # SECURITY: exec runs arbitrary code. It is confined to this child
        # process, but the child has the same privileges as the parent.
        exec(code_str, env)
        target = env.get(funcname)
        if callable(target):
            shared_dict['result'] = target()
            shared_dict['status'] = 'success'
        else:
            shared_dict['status'] = 'error'
            shared_dict['result'] = f'No function named {funcname} found'
    except Exception:
        shared_dict['status'] = 'error'
        shared_dict['result'] = traceback.format_exc()


def function_execute_box_process_limit_mm(code_str, funcname="construct_inputs", time_limit=50, memory_limit_mb=200):
    """Run ``funcname`` from ``code_str`` in a child process under time and memory limits.

    The parent polls the child: it terminates the child when the elapsed
    wall-clock time exceeds ``time_limit`` or when the child's resident set
    size (as reported by psutil) exceeds ``memory_limit_mb``.

    Args:
        code_str: Python source expected to define ``funcname``.
        funcname: Name of the zero-argument callable to invoke.
        time_limit: Wall-clock limit in seconds.
        memory_limit_mb: Resident-memory limit in megabytes.

    Returns:
        A plain ``dict`` snapshot of the outcome. ``status`` is ``'success'``
        (return value under ``result``) or ``'error'``. On error ``details``
        holds the reason, and ``result`` holds the child's traceback or
        missing-function message when the child produced one.
    """
    poll_interval = 0.1  # seconds between limit checks
    memory_limit_bytes = memory_limit_mb * 1024 * 1024
    failure = None

    # The context manager shuts the manager's server process down on exit.
    with multiprocessing.Manager() as manager:
        shared_dict = manager.dict()
        process = multiprocessing.Process(
            target=_limit_mm_worker,
            args=(code_str, funcname, shared_dict),
        )
        process.start()
        start_time = time.monotonic()

        try:
            monitor = psutil.Process(process.pid)
        except psutil.NoSuchProcess:
            monitor = None  # child already gone; nothing to measure

        # Watch elapsed time and memory use while the child is running.
        while process.is_alive():
            if time.monotonic() - start_time > time_limit:
                failure = 'TimeoutError: Execution exceeded time limit'
                break

            if monitor is not None:
                try:
                    memory_usage = monitor.memory_info().rss
                except psutil.NoSuchProcess:
                    # The child exited between is_alive() and the read.
                    memory_usage = 0
                if memory_usage > memory_limit_bytes:
                    failure = f'Memory limit exceeded: {memory_usage / (1024 * 1024)} MB'
                    break

            # join() with a timeout returns as soon as the child exits, so a
            # fast job is not delayed by a fixed sleep.
            process.join(poll_interval)

        if failure is not None:
            process.terminate()
            process.join(1)
            if process.is_alive():
                # SIGTERM was ignored; force the child down so it is reaped.
                process.kill()
                process.join()

        # Copy out of the proxy before the manager goes away.
        outcome = dict(shared_dict)

    if failure is not None:
        outcome['status'] = 'error'
        outcome['details'] = failure
    elif outcome.get('status') != 'success':
        outcome['status'] = 'error'
        outcome['details'] = 'Execution failed'
    return outcome
import multiprocessing as mp
import traceback
def _subprocess_worker(conn, code_str, funcname):
    """Child-process entry point: exec ``code_str``, call ``funcname``, send the outcome.

    Defined at module level so it can be pickled under every multiprocessing
    start method. Exactly one result dict is sent over ``conn`` unless the
    process dies first.
    """
    try:
        env = {}
        # SECURITY: exec runs arbitrary code. It is confined to this child
        # process, but the child has the same privileges as the parent.
        exec(code_str, env)
        target = env.get(funcname)
        if callable(target):
            conn.send({'status': 'success', 'result': target()})
        else:
            conn.send({'status': 'error', 'details': f'No function named {funcname} found'})
    except Exception:
        conn.send({'status': 'error', 'details': traceback.format_exc()})
    finally:
        conn.close()


def function_execute_box_subprocess(code_str, funcname="construct_inputs", time_limit=100):
    """Run ``funcname`` from ``code_str`` in a child process and return its result.

    The parent waits on the pipe rather than on the process. A child sending a
    result larger than the pipe buffer blocks in ``send`` until the parent
    reads, so joining first would stall it and misreport a timeout.

    Args:
        code_str: Python source expected to define ``funcname``.
        funcname: Name of the zero-argument callable to invoke.
        time_limit: Seconds to wait for the child to start sending its result.

    Returns:
        ``{'status': 'success', 'result': ...}`` on success, otherwise
        ``{'status': 'error', 'details': ...}`` where ``details`` is a timeout
        notice, a traceback, a missing-function message, or a note that the
        child exited without reporting.
    """
    parent_conn, child_conn = mp.Pipe(duplex=False)
    p = mp.Process(target=_subprocess_worker, args=(child_conn, code_str, funcname))
    p.start()
    child_conn.close()  # the parent does not need the child's end

    timed_out = False
    try:
        if not parent_conn.poll(time_limit):
            timed_out = True
            return {'status': 'error', 'details': 'TimeoutError: Execution exceeded time limit'}
        try:
            return parent_conn.recv()
        except EOFError:
            # poll() also reports readiness when the child closed the pipe
            # without sending anything (crash, os._exit, kill).
            return {'status': 'error', 'details': 'Child exited without returning result'}
    finally:
        parent_conn.close()
        if not timed_out:
            # Give a child that already reported a moment to exit by itself.
            p.join(1)
        if p.is_alive():
            p.terminate()  # force the child to stop
            p.join(1)
            if p.is_alive():
                # SIGTERM was ignored; force the child down so it is reaped.
                p.kill()
                p.join()
import subprocess
import tempfile
import os
import uuid
import json
import re
def is_decimal(s):
    """Return True when ``s`` is a string written as a plain decimal number.

    A plain decimal is an optional minus sign, one or more digits, a dot, and
    one or more digits (for example ``"-12.50"``). Integers, exponent
    notation, and anything ``float`` cannot parse yield False.

    Args:
        s: Candidate value. Non-string values are never considered decimals.

    Returns:
        bool: Whether ``s`` matches the plain decimal form.
    """
    if not isinstance(s, str):
        # re.match would raise TypeError on numeric input that float() accepts.
        return False
    try:
        float(s)
    except ValueError:
        return False
    return bool(re.match(r"^-?\d+\.\d+$", s))
def run_cpp_code_linux(infos, test_mode = False, rank_p = 1):
    """Compile the C++ program in ``infos`` with g++ and run it on every test case.

    ``infos`` is updated in place and also returned. ``infos["error"]`` and
    ``infos["details"]`` are reset to empty lists and then receive one entry
    per test case, or a single "CE" entry holding the compiler's stderr when
    compilation fails.

    Per-test verdicts:
        "AC"  - the program exited with status 0; ``details`` holds its stdout
                with blank lines dropped and the remaining lines stripped and
                joined by single spaces. The output is not compared against
                any expected answer here.
        "MLE" - exit status 137.
        "TLE" - exit status 124, or the wall-clock timeout expired.
        "RE"  - any other non-zero exit status or any other exception.

    Args:
        infos: dict with keys "code", "time_limit", "memory_limit",
            "test_cases" and ``infos["compileAndRunOptions"]["std"]``.
            NOTE(review): the arithmetic below suggests time_limit is in
            milliseconds and memory_limit in megabytes -- confirm with callers.
        test_mode: Not used by this function.
        rank_p: Not used by this function.

    Returns:
        The same ``infos`` dict.
    """
    source_code = infos["code"]
    time_limit = infos["time_limit"]
    memory_limit = infos["memory_limit"]
    test_cases = infos["test_cases"]
    infos["error"] = []
    infos["details"] = []

    # Non-zero exit statuses with a dedicated verdict; all others are "RE".
    # NOTE(review): 137 is 128 + SIGKILL and 124 is the status used by the
    # `timeout` utility, which this command line does not invoke -- confirm
    # these are the statuses the shell actually reports under ulimit.
    verdict_by_status = {137: "MLE", 124: "TLE"}

    with tempfile.TemporaryDirectory() as workdir:
        stem = uuid.uuid4()
        cpp_file = os.path.join(workdir, f"{stem}.cpp")
        exe_file = os.path.join(workdir, f"{stem}.out")

        # Write the C++ source to disk.
        with open(cpp_file, "w") as handle:
            handle.write(source_code)

        # Compile it.
        std_flag = f"-std={infos['compileAndRunOptions']['std']}"
        compile_result = subprocess.run(
            ["g++", cpp_file, "-o", exe_file, std_flag],
            capture_output=True,
            text=True,
        )
        if compile_result.returncode != 0:
            infos["error"].append("CE")
            infos["details"].append(compile_result.stderr)
            return infos

        # Limits are deliberately looser than the nominal ones: three times
        # the memory figure, and three extra seconds on top of the time limit.
        memory_kb = int(memory_limit) * 1024 * 3
        time_limit_int = int(time_limit) // 1000 + 3
        cmd = f"ulimit -t {time_limit_int} && ulimit -v {memory_kb} && {exe_file}"

        for idx, testcase in enumerate(test_cases):
            # TODO (from the original author): skipped for now; test cases
            # with missing output still need to be cleaned up.
            verdict = ""
            try:
                result = subprocess.run(
                    cmd,
                    input=testcase,
                    text=True,
                    capture_output=True,
                    shell=True,
                    timeout=time_limit_int,
                )
                # Classify by exit status.
                if result.returncode != 0:
                    verdict = verdict_by_status.get(result.returncode, "RE")
            except subprocess.TimeoutExpired:
                verdict = "TLE"
            except Exception:
                verdict = "RE"

            if verdict:
                infos["error"].append(verdict)
                infos["details"].append(f"{verdict}: Testcase:{idx}")
                continue

            # Successful run: flatten stdout into one space-separated line.
            stripped = (line.strip() for line in result.stdout.splitlines())
            flattened = " ".join(piece for piece in stripped if piece).strip()
            infos["error"].append("AC")
            infos["details"].append(flattened)

    return infos

Xet Storage Details

Size:
10.5 kB
·
Xet hash:
89472d6fb484d07c9d15a198d8fa8a1ac7cab88ca75da2e07db67141a1302deb

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.