| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| import functools |
| import json |
| import os |
|
|
| import ray |
|
|
| |
| from verl.single_controller.base.decorator import * |
|
|
|
|
def maybe_remote(main):
    """Schedule ``main`` as a Ray remote task when driver resources are configured.

    The environment variables below are read once, when the decorator is
    applied (not on every call):

    - VERL_DRIVER_NUM_GPUS: number of GPUs for the driver task. Fractional
      values such as ``"0.5"`` are accepted.
    - VERL_DRIVER_RESOURCES: JSON object of custom resources for the driver
      task, e.g. ``{"verl_driver": 1.0}``.

    If neither variable requests anything (zero GPUs and no custom
    resources), the wrapper simply calls ``main`` in the current process.

    For job submission to a Ray cluster, set these variables in runtime.yaml:

    ```yaml
    working_dir: "."
    env_vars:
      VERL_DRIVER_NUM_GPUS: "1"
      VERL_DRIVER_RESOURCES: '{"verl_driver": 1.0}'
    ```

        ray job submit --runtime-env=runtime.yaml -- python3 test.py

    Args:
        main (Callable): main function to be scheduled.

    Returns:
        Callable: a wrapper carrying ``main``'s metadata (via
        ``functools.wraps``) that returns ``main``'s result, either computed
        locally or fetched from the Ray task with ``ray.get``.

    Raises:
        ValueError: if VERL_DRIVER_NUM_GPUS is not a number, or
            VERL_DRIVER_RESOURCES is not valid JSON.
        TypeError: if VERL_DRIVER_RESOURCES does not decode to a JSON object.
    """
    num_gpus = 0
    env_num_gpus = os.getenv("VERL_DRIVER_NUM_GPUS")
    if env_num_gpus:
        # Ray accepts fractional GPU requests (e.g. 0.5); keep whole numbers
        # as int so existing configs log and schedule exactly as before.
        parsed_gpus = float(env_num_gpus)
        num_gpus = int(parsed_gpus) if parsed_gpus.is_integer() else parsed_gpus

    resources = {}
    env_resources = os.getenv("VERL_DRIVER_RESOURCES")
    if env_resources:
        resources = json.loads(env_resources)
    # Explicit check rather than `assert`, which is stripped under `python -O`.
    if not isinstance(resources, dict):
        raise TypeError(f"resources must be dict, got {type(resources)}")
    print(f"verl driver num_gpus: {num_gpus}, resources={resources}")

    @functools.wraps(main)
    def _main(*args, **kwargs):
        # Nothing requested for the driver: run in the current process.
        if num_gpus == 0 and not resources:
            return main(*args, **kwargs)

        # Otherwise run main as a Ray task holding the requested resources
        # and wait for its result.
        remote_main = ray.remote(num_gpus=num_gpus, resources=resources)(main)
        return ray.get(remote_main.remote(*args, **kwargs))

    return _main
|
|