| import multiprocessing | |
| import traceback | |
| import resource | |
| import inspect | |
| def all_params_have_defaults(func): | |
| sig = inspect.signature(func) | |
| for name, param in sig.parameters.items(): | |
| # 排除掉 *args 和 **kwargs,这类本身不需要默认值 | |
| if param.kind in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD): | |
| continue | |
| # 如果参数没有默认值,则返回 False | |
| if param.default is inspect.Parameter.empty: | |
| return False | |
| return True | |
| def function_execute_box_process(code_str, funcname="construct_inputs", time_limit=100): | |
| def target_func(shared_dict): | |
| try: | |
| env = {} | |
| exec(code_str, env) | |
| if funcname in env and callable(env[funcname]): | |
| result = env[funcname](batch_size=1) | |
| shared_dict['status'] = 'success' | |
| shared_dict['result'] = result[0] | |
| else: | |
| shared_dict['status'] = 'error' | |
| shared_dict['result'] = 'No function named construct_inputs found' | |
| except Exception: | |
| shared_dict['status'] = 'error' | |
| shared_dict['result'] = traceback.format_exc() | |
| manager = multiprocessing.Manager() | |
| shared_dict = manager.dict() | |
| process = multiprocessing.Process(target=target_func, args=(shared_dict,)) | |
| process.start() | |
| process.join(time_limit) | |
| if process.is_alive(): | |
| process.terminate() | |
| shared_dict['status'] = 'error' | |
| shared_dict['details'] = 'TimeoutError: Execution exceeded time limit' | |
| return shared_dict | |
| if shared_dict.get('status') == 'success': | |
| return shared_dict | |
| else: | |
| shared_dict['status'] = 'error' | |
| shared_dict['details'] = 'TimeoutError: Execution exceeded time limit' | |
| return shared_dict | |
| import multiprocessing as mp | |
| import traceback | |
| def function_execute_box_subprocess(code_str, funcname="construct_inputs", time_limit=100): | |
| def worker(conn): | |
| try: | |
| env = {} | |
| exec(code_str, env) | |
| if funcname in env and callable(env[funcname]): | |
| res = env[funcname]() | |
| conn.send({'status': 'success', 'result': res}) | |
| else: | |
| conn.send({'status': 'error', 'details': f'No function named {funcname} found'}) | |
| except Exception: | |
| conn.send({'status': 'error', 'details': traceback.format_exc()}) | |
| finally: | |
| conn.close() | |
| parent_conn, child_conn = mp.Pipe(duplex=False) | |
| p = mp.Process(target=worker, args=(child_conn,)) | |
| p.start() | |
| child_conn.close() # 父进程不需要子端 | |
| p.join(timeout=time_limit) | |
| if p.is_alive(): | |
| p.terminate() # ⬅️ 真·强制结束 | |
| p.join() | |
| return {'status': 'error', 'details': 'TimeoutError: Execution exceeded time limit'} | |
| if parent_conn.poll(): | |
| return parent_conn.recv() | |
| else: | |
| return {'status': 'error', 'details': 'Child exited without returning result'} | |
| import subprocess | |
| import tempfile | |
| import os | |
| import uuid | |
| import json | |
| import re | |
| def is_decimal(s): | |
| try: | |
| a = float(s) | |
| except: | |
| return False | |
| return bool(re.match(r"^-?\d+\.\d+$", s)) | |
| def run_cpp_code_linux(infos, test_mode = False, rank_p = 1): | |
| code = infos["code"] | |
| time_limit = infos["time_limit"] | |
| memory_limit = infos["memory_limit"] | |
| test_cases = infos["test_cases"] | |
| infos["error"] = [] | |
| infos["details"] = [] | |
| with tempfile.TemporaryDirectory() as tmpdirname: | |
| unique_id = uuid.uuid4() | |
| cpp_file = os.path.join(tmpdirname, f"{unique_id}.cpp") | |
| exe_file = os.path.join(tmpdirname, f"{unique_id}.out") | |
| # Write C++ code to file | |
| with open(cpp_file, "w") as f: | |
| f.write(code) | |
| # Compile the C++ code | |
| compile_result = subprocess.run( | |
| ["g++", cpp_file, "-o", exe_file, f"-std={infos['compileAndRunOptions']['std']}"], | |
| capture_output=True, | |
| text=True | |
| ) | |
| if compile_result.returncode != 0: | |
| infos["error"].append("CE") | |
| infos["details"].append(compile_result.stderr) | |
| return infos | |
| memory_kb = int(memory_limit) * 1024 * 5 | |
| time_limit_int = int(time_limit) // 1000 + 3 | |
| cmd = f"ulimit -t {time_limit_int} && ulimit -v {memory_kb} && {exe_file}" | |
| # cmd = f"{exe_file}" | |
| for idx, testcase in enumerate(test_cases): | |
| input_string = testcase["input"] | |
| output_string = testcase["output"] | |
| error = "" | |
| try: | |
| result = subprocess.run( | |
| cmd, | |
| input=input_string, | |
| text=True, | |
| capture_output=True, | |
| shell=True, | |
| timeout=time_limit_int | |
| ) | |
| # 检查返回码 | |
| if result.returncode != 0: | |
| if result.returncode == 137: # SIGKILL - 通常是内存超限 | |
| error = "MLE" | |
| details = f"{error}: Testcase:{idx}" | |
| elif result.returncode == 124: # timeout命令的超时返回码 | |
| error = "TLE" | |
| details = f"{error}: Testcase:{idx}" | |
| else: | |
| error = "RE" | |
| details = f"{error}: Testcase:{idx}" | |
| except subprocess.TimeoutExpired: | |
| error = "TLE" | |
| details = f"{error}: Testcase:{idx}" | |
| except Exception as e: | |
| error = "RE" | |
| details = f"{error}: Testcase:{idx}" | |
| if not error: | |
| if isinstance(output_string, float): | |
| output_string = str(output_string) | |
| expected_lines = [line.strip() for line in output_string.splitlines()] | |
| actual_lines = [line.strip() for line in result.stdout.splitlines()] | |
| # 移除空行 | |
| expected_lines = [line for line in expected_lines if line] | |
| actual_lines = [line for line in actual_lines if line] | |
| # ## 合并为一行 | |
| actual_lines = (" ".join(actual_lines)).strip() | |
| expected_lines = (" ".join(expected_lines)).strip() | |
| # ## 小数保留 6 位 | |
| if is_decimal(actual_lines) and is_decimal(expected_lines): | |
| if abs(float(actual_lines) - float(expected_lines)) > 1e-6: | |
| error = "WA" | |
| details = f"{error}: Testcase:{idx}" | |
| else: | |
| if actual_lines != expected_lines: | |
| error = "WA" | |
| details = f"{error}: Testcase:{idx}" | |
| if error: | |
| infos["error"].append(error) | |
| infos["details"].append(details) | |
| if not error: | |
| error = "AC" | |
| details = f"{error}: Testcase:{idx}" | |
| infos["error"].append("AC") | |
| infos["details"].append(details) | |
| return infos | |
| import subprocess | |
| import tempfile | |
| import os | |
| def run_python_code_linux(code_str, test_input, time_limit=2): | |
| with tempfile.NamedTemporaryFile(mode='w+', suffix='.py', delete=False) as tmp_file: | |
| tmp_file.write(code_str) | |
| tmp_file.flush() | |
| tmp_filename = tmp_file.name | |
| try: | |
| # 使用 subprocess 运行 Python 程序,并提供 stdin 输入 | |
| result = subprocess.run( | |
| ['python3', tmp_filename], | |
| input=test_input, | |
| text=True, | |
| capture_output=True, | |
| timeout=time_limit | |
| ) | |
| return result | |
| except subprocess.TimeoutExpired: | |
| return "TimeoutError: Execution exceeded time limit" | |
| except Exception as e: | |
| return f"ExecutionError: {str(e)}" | |
| finally: | |
| os.remove(tmp_filename) # 清理临时文件 |
Xet Storage Details
- Size:
- 8.13 kB
- Xet hash:
- b77f81e38d11e6ac5fb42b144306d0eebc5b8fa36f6114dfc09eeeaa405877e2
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.