| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| """ |
Commonly used utilities for working with Ray.
| """ |
|
|
| import asyncio |
| import concurrent.futures |
| import functools |
| import inspect |
| import os |
| from typing import Any, Optional |
|
|
| import ray |
|
|
|
|
def ray_noset_visible_devices(env_vars=os.environ):
    """Check whether any ``RAY_EXPERIMENTAL_NOSET_*`` device flag is set.

    The flag names suggest each one opts a particular accelerator family
    (CUDA, ROCR/HIP, Ascend, Habana, Neuron, TPU, oneAPI) out of Ray setting
    the matching ``*_VISIBLE_*`` variable for workers — see Ray's accelerator
    documentation to confirm the exact semantics.

    Args:
        env_vars: Mapping that is searched for the flags. Defaults to
            ``os.environ``.

    Returns:
        bool: True as soon as one flag maps to a truthy value (for string
        environments: any non-empty string, including ``"0"``); False if
        none of them do.
    """
    flag_names = (
        "RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES",
        "RAY_EXPERIMENTAL_NOSET_ROCR_VISIBLE_DEVICES",
        "RAY_EXPERIMENTAL_NOSET_HIP_VISIBLE_DEVICES",
        "RAY_EXPERIMENTAL_NOSET_ASCEND_RT_VISIBLE_DEVICES",
        "RAY_EXPERIMENTAL_NOSET_HABANA_VISIBLE_MODULES",
        "RAY_EXPERIMENTAL_NOSET_NEURON_RT_VISIBLE_CORES",
        "RAY_EXPERIMENTAL_NOSET_TPU_VISIBLE_CHIPS",
        "RAY_EXPERIMENTAL_NOSET_ONEAPI_DEVICE_SELECTOR",
    )
    # Flags are probed in order and the scan stops at the first truthy value.
    for name in flag_names:
        if env_vars.get(name):
            return True
    return False
|
|
|
|
def parallel_put(data_list: list[Any], max_workers: Optional[int] = None):
    """Put a list of objects into the Ray object store in parallel.

    Every element is handed to ``ray.put`` on a worker thread of a
    ``ThreadPoolExecutor``.

    Args:
        data_list (List[Any]): Python objects to be put into the Ray object
            store. An empty list yields an empty result without starting any
            threads.
        max_workers (int, optional): Maximum number of worker threads.
            Defaults to ``min(len(data_list), 16)``.

    Returns:
        List[ray.ObjectRef]: One object reference per element of
        ``data_list``, in the same order as the input.
    """
    # ThreadPoolExecutor rejects max_workers=0, so handle the empty case
    # explicitly instead of with an ``assert`` (stripped under ``python -O``).
    if not data_list:
        return []

    if max_workers is None:
        max_workers = min(len(data_list), 16)

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Executor.map yields results in submission order, so the output lines
        # up with data_list without any index bookkeeping.
        return list(executor.map(ray.put, data_list))
|
|
|
|
def get_event_loop():
    """Return an open asyncio event loop for the current thread.

    Resolution order:

    1. The loop currently running in this thread, if any.
    2. The thread's current loop as reported by ``asyncio.get_event_loop()``,
       provided it has not been closed.
    3. Otherwise a brand-new loop, which is installed as the thread's current
       loop via ``asyncio.set_event_loop`` before being returned.

    Returns:
        asyncio.AbstractEventLoop: A loop for which ``is_closed()`` is False.
    """
    try:
        # A running loop is what get_event_loop() would return anyway; asking
        # for it directly is unambiguous and never triggers deprecation paths.
        return asyncio.get_running_loop()
    except RuntimeError:
        pass

    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No current loop is available for this thread (e.g. a non-main thread).
        loop = None

    # A closed loop can still be registered as "current"; it cannot run
    # anything, so replace it instead of returning it to the caller.
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    return loop
|
|
|
|
def auto_await(func):
    """Decorator letting an ``async def`` be called from sync or async code.

    When the wrapped function returns a coroutine, the wrapper decides what
    to do with it:

    1. No event loop is running in the current thread: the coroutine is run
       to completion with ``asyncio.run()`` and its result is returned.
    2. A loop is running and the direct caller is itself asynchronous (an
       ``async def`` coroutine function or async generator): the coroutine is
       returned unchanged so the caller can ``await`` it.
    3. A loop is running but the direct caller is synchronous: the coroutine
       is run with ``asyncio.run()`` on a fresh loop in a one-off worker
       thread (driving the already-running loop from sync code would
       deadlock), and its result is returned. The calling thread — and hence
       the running loop — blocks until it finishes.

    Non-coroutine return values are passed through untouched.

    Note: case 2 inspects the caller's code object, not whether the call is
    actually awaited; an async caller that does not ``await`` receives an
    un-awaited coroutine.
    NOTE(review): in case 3 the coroutine runs on a different loop, so it
    presumably must not rely on futures/locks bound to the caller's loop.
    """
    # Code-object flags of frames that are allowed to ``await``.
    async_frame_flags = inspect.CO_COROUTINE | inspect.CO_ASYNC_GENERATOR

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        coro = func(*args, **kwargs)

        if not inspect.iscoroutine(coro):
            return coro

        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # Case 1: nothing is running in this thread, drive it here.
            return asyncio.run(coro)

        # Case 2: look one frame up; async def functions and async generators
        # can await the coroutine themselves.
        frame = inspect.currentframe()
        caller = frame.f_back if frame is not None else None
        caller_is_async = caller is not None and bool(caller.f_code.co_flags & async_frame_flags)
        # Drop frame references promptly (the inspect docs warn about cycles).
        del frame, caller
        if caller_is_async:
            return coro

        # Case 3: synchronous caller inside a running loop.
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(asyncio.run, coro).result()

    return wrapper
|
|