| import multiprocessing | |
| import traceback | |
| import resource | |
| import re | |
| def replace_newline_in_fstring(code: str) -> str: | |
| # 找到以 f" 开头的行,并且在行内替换 \n 为 \\n | |
| def replace_in_fstring(match): | |
| # 获取 f" 后面的部分,直到下一个 " 结束 | |
| string_content = match.group(1) | |
| # 替换其中的 \n 为 \\n | |
| modified_content = string_content.replace("\n", "\\n") | |
| # 返回修改后的 f"内容" | |
| return f'f"{modified_content}"' | |
| # 正则表达式:匹配 f" 开头的字符串,并且中间的内容捕获到括号内 | |
| pattern = r'f"([^"]*)"' | |
| # 使用 re.sub() 进行替换 | |
| return re.sub(pattern, replace_in_fstring, code) | |
| import time | |
| def function_execute_box_process(code_str, funcname="construct_inputs", time_limit=10): | |
| def target_func(shared_dict): | |
| try: | |
| env = {} | |
| exec(code_str, env) | |
| if funcname in env and callable(env[funcname]): | |
| result = env[funcname]() | |
| shared_dict['status'] = 'success' | |
| shared_dict['result'] = result | |
| else: | |
| shared_dict['status'] = 'error' | |
| shared_dict['result'] = 'No function named construct_inputs found' | |
| except Exception: | |
| shared_dict['status'] = 'error' | |
| # shared_dict['result'] = traceback.format_exc() | |
| manager = multiprocessing.Manager() | |
| shared_dict = manager.dict() | |
| start_time = time.time() | |
| try: | |
| exec(code_str) | |
| except: | |
| code_str = replace_newline_in_fstring(code_str) | |
| process = multiprocessing.Process(target=target_func, args=(shared_dict,)) | |
| process.start() | |
| process.join(time_limit) | |
| if process.is_alive(): | |
| process.terminate() | |
| shared_dict['status'] = 'error' | |
| shared_dict['details'] = 'TimeoutError: Execution exceeded time limit' | |
| return shared_dict | |
| if shared_dict.get('status') == 'success': | |
| return shared_dict | |
| else: | |
| shared_dict['status'] = 'error' | |
| shared_dict['details'] = 'TimeoutError: Execution exceeded time limit' | |
| return shared_dict | |
| if __name__ == "__main__": | |
| code = """ | |
| import numpy as np | |
| def random_input_generator(a_min, a_max, n_min, n_max, k_min, k_max): | |
| # Generate random size for the integer sequence | |
| n = np.random.randint(n_min, n_max + 1) | |
| # Generate random integer sequence | |
| a_sequence = ' '.join(str(np.random.randint(a_min, a_max + 1)) for _ in range(n)) | |
| # Generate random size for the monotonic sequence | |
| k = np.random.randint(k_min, k_max + 1) | |
| # Generate random monotonic sequence | |
| symbols = ['<', '>', '='] | |
| s_sequence = ' '.join(np.random.choice(symbols) for _ in range(k)) | |
| # Return the formatted string for n, k, a_sequence, and s_sequence | |
| return f"{n} {k}\n{a_sequence}\n{s_sequence}" | |
| def construct_inputs(): | |
| inputs_list = [] | |
| # Small inputs (integer values between 1 and 100, sequence size between 1 and 10) | |
| for i in range(10): | |
| inputs_list.append(random_input_generator(1, 100, 1, 10, 1, 10)) | |
| # Medium inputs (integer values between 1 and 1000, sequence size between 10 and 100) | |
| for i in range(10): | |
| inputs_list.append(random_input_generator(1, 1000, 10, 100, 10, 100)) | |
| # Large inputs (integer values between 1 and 1000000, sequence size between 100 and 500000) | |
| for i in range(10): | |
| inputs_list.append(random_input_generator(1, 1000000, 100, 500000, 100, 500000)) | |
| return inputs_list | |
| """ | |
| replace_newline_in_fstring(code) | |
| import multiprocessing | |
| import traceback | |
| import psutil | |
| import os | |
| import time | |
| def function_execute_box_process_limit_mm(code_str, funcname="construct_inputs", time_limit=50, memory_limit_mb=200): | |
| def target_func(shared_dict): | |
| try: | |
| env = {} | |
| exec(code_str, env) | |
| if funcname in env and callable(env[funcname]): | |
| result = env[funcname]() | |
| shared_dict['status'] = 'success' | |
| shared_dict['result'] = result | |
| else: | |
| shared_dict['status'] = 'error' | |
| shared_dict['result'] = 'No function named construct_inputs found' | |
| except Exception: | |
| shared_dict['status'] = 'error' | |
| shared_dict['result'] = traceback.format_exc() | |
| manager = multiprocessing.Manager() | |
| shared_dict = manager.dict() | |
| process = multiprocessing.Process(target=target_func, args=(shared_dict,)) | |
| process.start() | |
| # 获取进程的 PID | |
| pid = process.pid | |
| p = psutil.Process(pid) | |
| start_time = time.time() # 记录开始时间 | |
| # 动态监控进程内存使用情况和时间 | |
| while process.is_alive(): | |
| current_time = time.time() # 获取当前时间 | |
| elapsed_time = current_time - start_time # 计算已经过去的时间 | |
| if elapsed_time > time_limit: | |
| process.terminate() # 超过时间限制,终止进程 | |
| shared_dict['status'] = 'error' | |
| shared_dict['details'] = 'TimeoutError: Execution exceeded time limit' | |
| return shared_dict | |
| memory_info = p.memory_info() | |
| memory_usage = memory_info.rss # 获取实际内存使用量 | |
| if memory_usage > memory_limit_mb * 1024 * 1024: # 转换为字节 | |
| process.terminate() # 超过内存限制,终止进程 | |
| shared_dict['status'] = 'error' | |
| shared_dict['details'] = f'Memory limit exceeded: {memory_usage / (1024 * 1024)} MB' | |
| return shared_dict | |
| # 等待一小段时间,避免高频率的CPU占用 | |
| time.sleep(1) | |
| if shared_dict.get('status') == 'success': | |
| return shared_dict | |
| else: | |
| shared_dict['status'] = 'error' | |
| shared_dict['details'] = 'Execution failed' | |
| return shared_dict | |
| import multiprocessing as mp | |
| import traceback | |
| def function_execute_box_subprocess(code_str, funcname="construct_inputs", time_limit=100): | |
| def worker(conn): | |
| try: | |
| env = {} | |
| exec(code_str, env) | |
| if funcname in env and callable(env[funcname]): | |
| res = env[funcname]() | |
| conn.send({'status': 'success', 'result': res}) | |
| else: | |
| conn.send({'status': 'error', 'details': f'No function named {funcname} found'}) | |
| except Exception: | |
| conn.send({'status': 'error', 'details': traceback.format_exc()}) | |
| finally: | |
| conn.close() | |
| parent_conn, child_conn = mp.Pipe(duplex=False) | |
| p = mp.Process(target=worker, args=(child_conn,)) | |
| p.start() | |
| child_conn.close() # 父进程不需要子端 | |
| p.join(timeout=time_limit) | |
| if p.is_alive(): | |
| p.terminate() # ⬅️ 真·强制结束 | |
| p.join() | |
| return {'status': 'error', 'details': 'TimeoutError: Execution exceeded time limit'} | |
| if parent_conn.poll(): | |
| return parent_conn.recv() | |
| else: | |
| return {'status': 'error', 'details': 'Child exited without returning result'} | |
| import subprocess | |
| import tempfile | |
| import os | |
| import uuid | |
| import json | |
| import re | |
| def is_decimal(s): | |
| try: | |
| a = float(s) | |
| except: | |
| return False | |
| return bool(re.match(r"^-?\d+\.\d+$", s)) | |
| def run_cpp_code_linux(infos, test_mode = False, rank_p = 1): | |
| code = infos["code"] | |
| time_limit = infos["time_limit"] | |
| memory_limit = infos["memory_limit"] | |
| test_cases = infos["test_cases"] | |
| infos["error"] = [] | |
| infos["details"] = [] | |
| with tempfile.TemporaryDirectory() as tmpdirname: | |
| unique_id = uuid.uuid4() | |
| cpp_file = os.path.join(tmpdirname, f"{unique_id}.cpp") | |
| exe_file = os.path.join(tmpdirname, f"{unique_id}.out") | |
| # Write C++ code to file | |
| with open(cpp_file, "w") as f: | |
| f.write(code) | |
| # Compile the C++ code | |
| compile_result = subprocess.run( | |
| ["g++", cpp_file, "-o", exe_file, f"-std={infos['compileAndRunOptions']['std']}"], | |
| capture_output=True, | |
| text=True | |
| ) | |
| if compile_result.returncode != 0: | |
| infos["error"].append("CE") | |
| infos["details"].append(compile_result.stderr) | |
| return infos | |
| memory_kb = int(memory_limit) * 1024 * 3 | |
| time_limit_int = int(time_limit) // 1000 + 3 | |
| cmd = f"ulimit -t {time_limit_int} && ulimit -v {memory_kb} && {exe_file}" | |
| # cmd = f"{exe_file}" | |
| for idx, testcase in enumerate(test_cases): | |
| input_string = testcase | |
| ## TODO:暂时跳过了,需要清理空缺输出的 | |
| error = "" | |
| try: | |
| result = subprocess.run( | |
| cmd, | |
| input=input_string, | |
| text=True, | |
| capture_output=True, | |
| shell=True, | |
| timeout=time_limit_int | |
| ) | |
| # 检查返回码 | |
| if result.returncode != 0: | |
| if result.returncode == 137: # SIGKILL - 通常是内存超限 | |
| error = "MLE" | |
| details = f"{error}: Testcase:{idx}" | |
| elif result.returncode == 124: # timeout命令的超时返回码 | |
| error = "TLE" | |
| details = f"{error}: Testcase:{idx}" | |
| else: | |
| error = "RE" | |
| details = f"{error}: Testcase:{idx}" | |
| except subprocess.TimeoutExpired: | |
| error = "TLE" | |
| details = f"{error}: Testcase:{idx}" | |
| except Exception as e: | |
| error = "RE" | |
| details = f"{error}: Testcase:{idx}" | |
| if not error: | |
| actual_lines = [line.strip() for line in result.stdout.splitlines()] | |
| actual_lines = [line for line in actual_lines if line] | |
| actual_lines = (" ".join(actual_lines)).strip() | |
| if error: | |
| infos["error"].append(error) | |
| infos["details"].append(details) | |
| # if test_mode: | |
| # return infos | |
| if not error: | |
| error = "AC" | |
| details = f"{actual_lines}" | |
| infos["error"].append("AC") | |
| infos["details"].append(details) | |
| return infos |
Xet Storage Details
- Size:
- 10.5 kB
- Xet hash:
- 89472d6fb484d07c9d15a198d8fa8a1ac7cab88ca75da2e07db67141a1302deb
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.