Tsukihjy/testcase / methods /lcb /parallel_exe.py
Tsukihjy's picture
download
raw
2.97 kB
#/**
# 这里不安全,没有限制内存
# 只限制了时间,时间结束守护线程结束也会把目标线程结束
#**/
import concurrent.futures
import threading
import time
def run_func_from_code_with_timeout(code_str, funcname="construct_inputs", time_limit=100):
"""
Executes the specified function from the provided code string with a timeout.
If the execution exceeds the time limit, it tries to stop the thread.
"""
result = {}
exc = {}
# Event to signal the thread to stop
stop_event = threading.Event()
def target():
try:
local_vars = {}
exec(code_str, local_vars)
func = local_vars.get(funcname)
if not func:
raise ValueError(f"Function {funcname} not found in code")
# Check if stop event is set before execution
if stop_event.is_set():
return
result['value'] = func()
except Exception as e:
exc['error'] = e
# Start the thread
thread = threading.Thread(target=target)
thread.daemon = True
thread.start()
# Wait for the thread to complete with timeout
thread.join(timeout=time_limit)
# If the thread is still alive after the timeout, stop it
if thread.is_alive():
stop_event.set() # This will signal the thread to stop
raise TimeoutError(f"Function {funcname} execution timed out")
if 'error' in exc:
raise exc['error']
return result.get('value')
def run_func_code_parallel(random_generators, funcname="construct_inputs", time_limit=100):
results = []
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for code_str in random_generators:
futures.append(
executor.submit(run_func_from_code_with_timeout, code_str, funcname, time_limit)
)
for future in concurrent.futures.as_completed(futures):
try:
res = future.result()
results.append(res)
# except TimeoutError as e:
# # Handle timeout error specifically
# # print(f"Timeout occurred: {e}")
# continue
except Exception as e:
# Handle other exceptions
print(f"Error occurred: {e}")
continue
return results
# 示例用法
if __name__ == "__main__":
# Example usage:
random_generators = [
"""
import time
def construct_inputs():
time.sleep(5) # Simulate long-running task
return 'done'
""",
"""
import time
def construct_inputs():
time.sleep(5) # Simulate shorter task
a = 1
while True:
a += 1
return 'finished'
"""
]
try:
results = run_func_code_parallel(random_generators, time_limit=3) # Timeout of 3 seconds
print(results)
except Exception as e:
print(f"An error occurred: {e}")

Xet Storage Details

Size:
2.97 kB
·
Xet hash:
4252b4ec4e2905b90039f03e3963c9a3a57899e3ad19c11329393ab3606e31cc

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.