# Copyright 2024 Bytedance Ltd. and/or its affiliates # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json import os from typing import Any import numpy as np import pytest import ray from omegaconf import DictConfig from transformers.utils import get_json_schema from tests.experimental.agent_loop.agent_utils import init_agent_loop_manager from verl.checkpoint_engine import CheckpointEngineManager from verl.experimental.agent_loop.agent_loop import GlobalRequestLoadBalancer, get_trajectory_info from verl.protocol import DataProto from verl.tools.base_tool import BaseTool, OpenAIFunctionToolSchema from verl.tools.schemas import ToolResponse from verl.utils import hf_tokenizer from verl.utils.config import omega_conf_to_dataclass from verl.workers.config import CheckpointEngineConfig @pytest.fixture def init_config() -> DictConfig: from hydra import compose, initialize_config_dir with initialize_config_dir(config_dir=os.path.abspath("verl/trainer/config")): config = compose( config_name="ppo_trainer", overrides=[ "actor_rollout_ref.actor.use_dynamic_bsz=true", # test sleep/wake_up with fsdp offload "actor_rollout_ref.actor.fsdp_config.param_offload=True", "actor_rollout_ref.actor.fsdp_config.optimizer_offload=True", "reward.reward_manager.name=dapo", "+reward.reward_kwargs.overlong_buffer_cfg.enable=False", "+reward.reward_kwargs.overlong_buffer_cfg.len=3072", "+reward.reward_kwargs.max_resp_len=4096", ], ) model_path = os.path.expanduser("~/models/Qwen/Qwen2.5-1.5B-Instruct") config.actor_rollout_ref.model.path = model_path config.actor_rollout_ref.rollout.name = os.environ["ROLLOUT_NAME"] config.actor_rollout_ref.rollout.mode = "async" config.actor_rollout_ref.rollout.enforce_eager = True config.actor_rollout_ref.rollout.prompt_length = 4096 config.actor_rollout_ref.rollout.response_length = 4096 config.actor_rollout_ref.rollout.n = 4 config.actor_rollout_ref.rollout.agent.num_workers = 2 config.actor_rollout_ref.rollout.skip_tokenizer_init = True return config def test_single_turn(init_config): ray.init( runtime_env={ "env_vars": { "TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN", "VLLM_LOGGING_LEVEL": "INFO", "VLLM_USE_V1": "1", } } ) agent_loop_manager = init_agent_loop_manager(init_config) raw_prompts = [ [ { "role": "user", "content": "Let's play a role playing game. Your name is Alice, your favorite color is blue.", } ], [{"role": "user", "content": "Let's play a role playing game. Your name is Bob, your favorite color is red."}], ] batch = DataProto( non_tensor_batch={ "raw_prompt": np.array(raw_prompts), "agent_name": np.array(["single_turn_agent"] * len(raw_prompts)), "data_source": np.array(["openai/gsm8k"] * len(raw_prompts)), "reward_model": np.array([{"style": "rule", "ground_truth": "1.0"}] * len(raw_prompts)), }, ) n = init_config.actor_rollout_ref.rollout.n batch = batch.repeat(n) result = agent_loop_manager.generate_sequences(prompts=batch) assert len(result) == len(raw_prompts) * n # check result seq_len = result.batch["prompts"].size(1) + result.batch["responses"].size(1) assert result.batch["input_ids"].size(1) == seq_len assert result.batch["attention_mask"].size(1) == seq_len assert result.batch["position_ids"].size(1) == seq_len if init_config.actor_rollout_ref.rollout.calculate_log_probs: assert result.batch["rollout_log_probs"].size(1) == result.batch["responses"].size(1) # check compute score assert result.batch["rm_scores"].shape == result.batch["responses"].shape reward_tensor = result.batch["rm_scores"] reward_extra_keys = result.meta_info.get("reward_extra_keys", []) reward_extra_info = {key: result.non_tensor_batch[key] for key in reward_extra_keys} assert reward_tensor.shape == result.batch["responses"].shape assert "acc" in reward_extra_info, f"reward_extra_info {reward_extra_info} should contain 'acc'" assert reward_extra_info["acc"].shape == (len(result),), f"invalid acc: {reward_extra_info['acc']}" # check turns num_turns = result.non_tensor_batch["__num_turns__"] assert np.all(num_turns == 2) print("Test passed!") ray.shutdown() class WeatherTool(BaseTool): def get_current_temperature(self, location: str, unit: str = "celsius"): """Get current temperature at a location. Args: location: The location to get the temperature for, in the format "City, State, Country". unit: The unit to return the temperature in. Defaults to "celsius". (choices: ["celsius", "fahrenheit"]) Returns: the temperature, the location, and the unit in a dict """ print(f"[DEBUG] get_current_temperature: {location}, {unit}") return { "temperature": 26.1, "location": location, "unit": unit, } def get_openai_tool_schema(self) -> OpenAIFunctionToolSchema: schema = get_json_schema(self.get_current_temperature) return OpenAIFunctionToolSchema(**schema) async def execute(self, instance_id: str, parameters: dict[str, Any], **kwargs) -> tuple[ToolResponse, float, dict]: try: result = self.get_current_temperature(**parameters) return ToolResponse(text=json.dumps(result)), 0, {} except Exception as e: return ToolResponse(text=str(e)), 0, {} class WeatherToolWithData(BaseTool): def get_openai_tool_schema(self) -> OpenAIFunctionToolSchema: schema = get_json_schema(self.get_temperature_date) return OpenAIFunctionToolSchema(**schema) def get_temperature_date(self, location: str, date: str, unit: str = "celsius"): """Get temperature at a location and date. Args: location: The location to get the temperature for, in the format "City, State, Country". date: The date to get the temperature for, in the format "Year-Month-Day". unit: The unit to return the temperature in. Defaults to "celsius". (choices: ["celsius", "fahrenheit"]) Returns: the temperature, the location, the date and the unit in a dict """ print(f"[DEBUG] get_temperature_date: {location}, {date}, {unit}") return { "temperature": 25.9, "location": location, "date": date, "unit": unit, } async def execute(self, instance_id: str, parameters: dict[str, Any], **kwargs) -> tuple[ToolResponse, float, dict]: try: result = self.get_temperature_date(**parameters) return ToolResponse(text=json.dumps(result)), 0, {} except Exception as e: return ToolResponse(text=str(e)), 0, {} def test_tool_agent(init_config): ray.init( runtime_env={ "env_vars": { "TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN", "VLLM_LOGGING_LEVEL": "INFO", "VLLM_USE_V1": "1", } }, ignore_reinit_error=True, ) # =========================== 1. Init rollout manager =========================== tool_config = { "tools": [ { "class_name": "tests.experimental.agent_loop.test_basic_agent_loop.WeatherTool", "config": {"type": "native"}, }, { "class_name": "tests.experimental.agent_loop.test_basic_agent_loop.WeatherToolWithData", "config": {"type": "native"}, }, ] } tool_config_path = "/tmp/tool_config.json" with open(tool_config_path, "w") as f: json.dump(tool_config, f) n = 2 init_config.actor_rollout_ref.rollout.n = n init_config.actor_rollout_ref.rollout.multi_turn.tool_config_path = tool_config_path init_config.actor_rollout_ref.rollout.multi_turn.max_parallel_calls = 2 init_config.actor_rollout_ref.rollout.calculate_log_probs = True agent_loop_manager = init_agent_loop_manager(init_config) # =========================== 2. Generate sequences =========================== raw_prompts = [ [ {"role": "user", "content": "How are you?"}, ], [ {"role": "user", "content": "What's the temperature in Los Angeles now?"}, ], [ {"role": "user", "content": "What's the temperature in New York now?"}, ], [ { "role": "system", "content": "You are Qwen, created by Alibaba Cloud. You are a helpful assistant.\n\n" "Current Date: 2024-09-30", }, {"role": "user", "content": "What's the temperature in San Francisco now? How about tomorrow?"}, ], ] batch = DataProto( non_tensor_batch={ "raw_prompt": np.array([np.array(prompt) for prompt in raw_prompts], dtype=object), "agent_name": np.array(["tool_agent"] * len(raw_prompts)), "data_source": np.array(["openai/gsm8k"] * len(raw_prompts)), "reward_model": np.array([{"style": "rule", "ground_truth": "1.0"}] * len(raw_prompts)), }, ) batch = batch.repeat(n) result = agent_loop_manager.generate_sequences(prompts=batch) assert len(result) == len(raw_prompts) * n # Check turns num_turns = result.non_tensor_batch["__num_turns__"] print(f"num_turns: {num_turns}") for i in range(len(num_turns)): if i // n == 0: # [user, assistant] assert num_turns[i] == 2 else: # [user, assistant, tool, assistant] assert num_turns[i] == 4 # Check response_mask tokenizer = hf_tokenizer(init_config.actor_rollout_ref.model.path) responses = result.batch["responses"] response_mask = result.batch["response_mask"] attention_mask = result.batch["attention_mask"] assert result.batch["rm_scores"].size(1) == responses.size(1) assert responses.size() == response_mask.size(), f"{responses.size()} != {response_mask.size()}" assert result.batch["rollout_log_probs"].size(1) == result.batch["responses"].size(1) response_length = response_mask.size(1) for i in range(len(responses)): # response with tool response valid_tokens = responses[i][attention_mask[i][-response_length:].bool()] response_with_obs = tokenizer.decode(valid_tokens) # response without tool response valid_tokens = responses[i][response_mask[i].bool()] response_without_obs = tokenizer.decode(valid_tokens) assert "" not in response_without_obs, ( f"found in response: {response_without_obs}" ) assert "" not in response_without_obs, ( f"found in response: {response_without_obs}" ) print("=========================") print(response_with_obs) print("---") print(response_without_obs) print("Test passed!") ray.shutdown() def test_tool_agent_with_interaction(init_config): ray.init( runtime_env={ "env_vars": { "TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN", "VLLM_LOGGING_LEVEL": "INFO", "VLLM_USE_V1": "1", } } ) # =========================== 1. Init rollout manager =========================== tool_config = { "tools": [ { "class_name": "tests.experimental.agent_loop.test_basic_agent_loop.WeatherTool", "config": {"type": "native"}, }, { "class_name": "tests.experimental.agent_loop.test_basic_agent_loop.WeatherToolWithData", "config": {"type": "native"}, }, ] } tool_config_path = "/tmp/tool_config.json" with open(tool_config_path, "w") as f: json.dump(tool_config, f) interaction_config = { "interaction": [ {"name": "weather", "class_name": "verl.interactions.weather_interaction.WeatherInteraction", "config": {}} ] } interaction_config_path = "/tmp/interaction_config.json" with open(interaction_config_path, "w") as f: json.dump(interaction_config, f) n = 2 init_config.actor_rollout_ref.rollout.n = n init_config.actor_rollout_ref.rollout.multi_turn.tool_config_path = tool_config_path init_config.actor_rollout_ref.rollout.multi_turn.interaction_config_path = interaction_config_path init_config.actor_rollout_ref.rollout.multi_turn.max_parallel_calls = 2 agent_loop_manager = init_agent_loop_manager(init_config) checkpoint_engine_config = omega_conf_to_dataclass( init_config.actor_rollout_ref.rollout.checkpoint_engine, CheckpointEngineConfig ) checkpoint_manager = CheckpointEngineManager( config=checkpoint_engine_config, trainer=agent_loop_manager.worker_group, replicas=agent_loop_manager.rollout_replicas, ) checkpoint_manager.sleep_replicas() checkpoint_manager.update_weights() # =========================== 2. Generate sequences =========================== raw_prompts = [ [ {"role": "user", "content": "How are you?"}, ], [ {"role": "user", "content": "What's the temperature in Los Angeles now?"}, ], [ {"role": "user", "content": "What's the temperature in New York now?"}, ], [ { "role": "system", "content": "You are Qwen, created by Alibaba Cloud. You are a helpful assistant.\n\n" "Current Date: 2024-09-30", }, {"role": "user", "content": "What's the temperature in San Francisco now? How about tomorrow?"}, ], ] batch = DataProto( non_tensor_batch={ "raw_prompt": np.array([np.array(prompt) for prompt in raw_prompts], dtype=object), "agent_name": np.array(["tool_agent"] * len(raw_prompts)), "data_source": np.array(["openai/gsm8k"] * len(raw_prompts)), "reward_model": np.array([{"style": "rule", "ground_truth": "1.0"}] * len(raw_prompts)), "extra_info": np.array( [ {"interaction_kwargs": {"name": "weather"}}, {"interaction_kwargs": {"name": "weather"}}, {"interaction_kwargs": {"name": "weather"}}, {"interaction_kwargs": {"name": "weather"}}, ] ), }, ) batch = batch.repeat(n) result = agent_loop_manager.generate_sequences(prompts=batch) assert len(result) == len(raw_prompts) * n # Check turns num_turns = result.non_tensor_batch["__num_turns__"] print(f"num_turns: {num_turns}") for i in range(len(num_turns)): if i // n == 0: # [user, assistant, user] assert num_turns[i] == 3 else: # [user, assistant, tool, assistant, user] assert num_turns[i] == 5 # Check response_mask tokenizer = hf_tokenizer(init_config.actor_rollout_ref.model.path) responses = result.batch["responses"] response_mask = result.batch["response_mask"] attention_mask = result.batch["attention_mask"] assert responses.size() == response_mask.size(), f"{responses.size()} != {response_mask.size()}" response_length = response_mask.size(1) for i in range(len(responses)): # response with tool response valid_tokens = responses[i][attention_mask[i][-response_length:].bool()] response_with_obs = tokenizer.decode(valid_tokens) # response without tool response valid_tokens = responses[i][response_mask[i].bool()] response_without_obs = tokenizer.decode(valid_tokens) assert "\udb82\udc89" not in response_without_obs, f"found \udb82\udc89 in response: {response_without_obs}" assert "\udb82\udc8a" not in response_without_obs, f"found \udb82\udc8a in response: {response_without_obs}" print("=========================") print(response_with_obs) print("---") print(response_without_obs) print("Test passed!") ray.shutdown() @pytest.mark.asyncio async def test_get_trajectory_info(): """Tests the get_trajectory_info method.""" # Initialize the class to set up class-level attributes step = 10 index = [1, 1, 3, 3] expected_info = [ {"step": step, "sample_index": 1, "rollout_n": 0, "validate": False}, {"step": step, "sample_index": 1, "rollout_n": 1, "validate": False}, {"step": step, "sample_index": 3, "rollout_n": 0, "validate": False}, {"step": step, "sample_index": 3, "rollout_n": 1, "validate": False}, ] trajectory_info = await get_trajectory_info(step, index, validate=False) assert trajectory_info == expected_info # ────────────────────────────────────────────────────────────────────── # GlobalRequestLoadBalancer unit tests (lightweight, no GPU required) # ────────────────────────────────────────────────────────────────────── @pytest.fixture(scope="module") def ray_for_lb(): ray.init(ignore_reinit_error=True) yield ray.shutdown() class TestLoadBalancerRouting: """Least-loaded selection.""" def test_distributes_across_servers(self, ray_for_lb): lb = GlobalRequestLoadBalancer.remote(server_actor_ids=["s0", "s1", "s2"]) servers = [ray.get(lb.acquire_server.remote(request_id=f"r{i}")) for i in range(3)] assert sorted(servers) == ["s0", "s1", "s2"] def test_new_requests_route_to_least_loaded(self, ray_for_lb): lb = GlobalRequestLoadBalancer.remote(server_actor_ids=["s0", "s1", "s2"]) # Load s0 with 3 inflight requests ray.get(lb.acquire_server.remote(request_id="a")) # -> s0 ray.get(lb.acquire_server.remote(request_id="a")) # sticky -> s0 ray.get(lb.acquire_server.remote(request_id="a")) # sticky -> s0 # Load s1 with 1 inflight request ray.get(lb.acquire_server.remote(request_id="b")) # -> s1 # s2 has 0 inflight, so next new request must go to s2 s_new = ray.get(lb.acquire_server.remote(request_id="d")) assert s_new == "s2" def test_release_rebalances(self, ray_for_lb): lb = GlobalRequestLoadBalancer.remote(server_actor_ids=["s0", "s1"]) s0 = ray.get(lb.acquire_server.remote(request_id="r0")) s1 = ray.get(lb.acquire_server.remote(request_id="r1")) assert s0 != s1 ray.get(lb.release_server.remote(server_id=s0)) ray.get(lb.release_server.remote(server_id=s1)) s2 = ray.get(lb.acquire_server.remote(request_id="r2")) s3 = ray.get(lb.acquire_server.remote(request_id="r3")) assert s2 != s3 def test_release_invalid_server_raises(self, ray_for_lb): lb = GlobalRequestLoadBalancer.remote(server_actor_ids=["s0", "s1"]) with pytest.raises(ray.exceptions.RayTaskError, match="Invalid server_id") as excinfo: ray.get(lb.release_server.remote(server_id="nonexistent")) assert "Invalid server_id" in str(excinfo.value) def test_release_without_inflight_raises(self, ray_for_lb): lb = GlobalRequestLoadBalancer.remote(server_actor_ids=["s0", "s1"]) with pytest.raises(ray.exceptions.RayTaskError, match="no inflight") as excinfo: ray.get(lb.release_server.remote(server_id="s1")) assert "no inflight" in str(excinfo.value) class TestLoadBalancerStickySession: """Request-level sticky session.""" def test_same_request_id_same_server(self, ray_for_lb): lb = GlobalRequestLoadBalancer.remote(server_actor_ids=["s0", "s1", "s2", "s3"]) s0 = ray.get(lb.acquire_server.remote(request_id="conv-abc")) ray.get(lb.release_server.remote(server_id=s0)) s1 = ray.get(lb.acquire_server.remote(request_id="conv-abc")) assert s0 == s1