| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| import asyncio |
| import os |
|
|
| import pytest |
| import ray |
| from omegaconf import DictConfig |
| from openai import AsyncOpenAI, OpenAI |
|
|
| from tests.experimental.agent_loop.agent_utils import init_agent_loop_manager |
| from verl.checkpoint_engine import CheckpointEngineManager |
| from verl.utils import omega_conf_to_dataclass |
| from verl.workers.rollout.replica import get_rollout_replica_class |
|
|
|
|
| @pytest.fixture |
| def init_config() -> DictConfig: |
| from hydra import compose, initialize_config_dir |
|
|
| with initialize_config_dir(config_dir=os.path.abspath("verl/trainer/config")): |
| config = compose(config_name="ppo_trainer") |
|
|
| config.trainer.n_gpus_per_node = 4 |
| config.trainer.nnodes = 2 |
| config.actor_rollout_ref.actor.use_dynamic_bsz = True |
| config.actor_rollout_ref.model.path = os.path.expanduser("~/models/Qwen/Qwen2.5-1.5B-Instruct") |
| config.actor_rollout_ref.rollout.name = os.environ["ROLLOUT_NAME"] |
| config.actor_rollout_ref.rollout.mode = "async" |
| config.actor_rollout_ref.rollout.skip_tokenizer_init = False |
|
|
| return config |
|
|
|
|
| @pytest.mark.asyncio |
| @pytest.mark.parametrize("tp_size", [2, 4]) |
| async def test_standalone_rollout(init_config, tp_size): |
| """Test standalone rollout single node and multi nodes.""" |
| ray.init( |
| runtime_env={ |
| "env_vars": { |
| "TOKENIZERS_PARALLELISM": "true", |
| "NCCL_DEBUG": "WARN", |
| "VLLM_LOGGING_LEVEL": "INFO", |
| "VLLM_USE_V1": "1", |
| "NCCL_P2P_DISABLE": "1", |
| } |
| } |
| ) |
|
|
| init_config.actor_rollout_ref.rollout.tensor_model_parallel_size = tp_size |
| num_replicas = (init_config.trainer.n_gpus_per_node * init_config.trainer.nnodes) // tp_size |
| rollout_config = init_config.actor_rollout_ref.rollout |
| model_config = init_config.actor_rollout_ref.model |
|
|
| |
| rollout_server_class = get_rollout_replica_class(init_config.actor_rollout_ref.rollout.name) |
| rollout_servers = [ |
| rollout_server_class( |
| replica_rank=replica_rank, |
| config=rollout_config, |
| model_config=model_config, |
| gpus_per_node=init_config.trainer.n_gpus_per_node, |
| ) |
| for replica_rank in range(num_replicas) |
| ] |
| await asyncio.gather(*[server.init_standalone() for server in rollout_servers]) |
|
|
| server_handles = [server._server_handle for server in rollout_servers] |
| server_addresses = [server._server_address for server in rollout_servers] |
| assert len(server_handles) == num_replicas |
| assert len(server_addresses) == num_replicas |
|
|
| os.environ.pop("HTTPS_PROXY", None) |
| os.environ.pop("HTTP_PROXY", None) |
| os.environ.pop("NO_PROXY", None) |
|
|
| client = AsyncOpenAI( |
| api_key="123-abc", |
| base_url=f"http://{server_addresses[0]}/v1", |
| ) |
|
|
| completion = await client.chat.completions.create( |
| model=init_config.actor_rollout_ref.model.path, |
| messages=[{"role": "user", "content": "What can you do?"}], |
| ) |
| print(completion.choices[0].message.content) |
|
|
| ray.shutdown() |
|
|
|
|
| @pytest.mark.skip(reason="local test only") |
| def test_hybrid_rollout_with_ep(init_config): |
| """Test hybrid rollout with expert parallelism, DP=2, TP=4, EP=8.""" |
| ray.init( |
| runtime_env={ |
| "env_vars": { |
| "TOKENIZERS_PARALLELISM": "true", |
| "NCCL_DEBUG": "WARN", |
| "VLLM_LOGGING_LEVEL": "INFO", |
| "VLLM_USE_V1": "1", |
| } |
| } |
| ) |
|
|
| model_path = os.path.expanduser("~/models/Qwen/Qwen3-30B-A3B-Instruct-2507") |
| init_config.actor_rollout_ref.model.path = model_path |
|
|
| |
| init_config.actor_rollout_ref.rollout.tensor_model_parallel_size = 2 |
| init_config.actor_rollout_ref.rollout.data_parallel_size = 4 |
| init_config.actor_rollout_ref.rollout.expert_parallel_size = 8 |
|
|
| |
| |
| |
| |
| agent_loop_manager = init_agent_loop_manager(init_config) |
| checkpoint_manager = CheckpointEngineManager( |
| config=omega_conf_to_dataclass(init_config.actor_rollout_ref.rollout.checkpoint_engine), |
| trainer=agent_loop_manager.worker_group, |
| replicas=agent_loop_manager.rollout_replicas, |
| ) |
| checkpoint_manager.sleep_replicas() |
| checkpoint_manager.update_weights() |
|
|
| |
| server_address = agent_loop_manager.server_addresses[0] |
| client = OpenAI( |
| api_key="123-abc", |
| base_url=f"http://{server_address}/v1", |
| ) |
|
|
| smapling_params = { |
| "temperature": 1.0, |
| "top_p": 1.0, |
| "max_tokens": 512, |
| } |
|
|
| response = client.chat.completions.create( |
| model=model_path, |
| messages=[{"role": "user", "content": "What can you do?"}], |
| **smapling_params, |
| ) |
|
|
| completion = response.choices[0].message.content |
| print(f"response: {completion}") |
|
|
| print("Test passed!") |
| ray.shutdown() |
|
|