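"""Tests for the experimental agent loop: single-turn generation, multi-turn tool
calling, tool calling with interaction, trajectory info, and the global request
load balancer."""
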
import json
import os
from typing import Any

import numpy as np
import pytest
import ray
from omegaconf import DictConfig
from transformers.utils import get_json_schema

from tests.experimental.agent_loop.agent_utils import init_agent_loop_manager
from verl.checkpoint_engine import CheckpointEngineManager
from verl.experimental.agent_loop.agent_loop import GlobalRequestLoadBalancer, get_trajectory_info
from verl.protocol import DataProto
from verl.tools.base_tool import BaseTool, OpenAIFunctionToolSchema
from verl.tools.schemas import ToolResponse
from verl.utils import hf_tokenizer
from verl.utils.config import omega_conf_to_dataclass
from verl.workers.config import CheckpointEngineConfig


@pytest.fixture
def init_config() -> DictConfig:
    from hydra import compose, initialize_config_dir

    with initialize_config_dir(config_dir=os.path.abspath("verl/trainer/config")):
        config = compose(
            config_name="ppo_trainer",
            overrides=[
                "actor_rollout_ref.actor.use_dynamic_bsz=true",
                "actor_rollout_ref.actor.fsdp_config.param_offload=True",
                "actor_rollout_ref.actor.fsdp_config.optimizer_offload=True",
                "reward.reward_manager.name=dapo",
                "+reward.reward_kwargs.overlong_buffer_cfg.enable=False",
                "+reward.reward_kwargs.overlong_buffer_cfg.len=3072",
                "+reward.reward_kwargs.max_resp_len=4096",
            ],
        )

    model_path = os.path.expanduser("~/models/Qwen/Qwen2.5-1.5B-Instruct")
    config.actor_rollout_ref.model.path = model_path
    config.actor_rollout_ref.rollout.name = os.environ["ROLLOUT_NAME"]
    config.actor_rollout_ref.rollout.mode = "async"
    config.actor_rollout_ref.rollout.enforce_eager = True
    config.actor_rollout_ref.rollout.prompt_length = 4096
    config.actor_rollout_ref.rollout.response_length = 4096
    config.actor_rollout_ref.rollout.n = 4
    config.actor_rollout_ref.rollout.agent.num_workers = 2
    config.actor_rollout_ref.rollout.skip_tokenizer_init = True

    return config


def test_single_turn(init_config):
    ray.init(
        runtime_env={
            "env_vars": {
                "TOKENIZERS_PARALLELISM": "true",
                "NCCL_DEBUG": "WARN",
                "VLLM_LOGGING_LEVEL": "INFO",
                "VLLM_USE_V1": "1",
            }
        }
    )

    agent_loop_manager = init_agent_loop_manager(init_config)
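
    # two independent role-playing prompts; the single_turn_agent answers each in a single assistant turn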
    raw_prompts = [
        [
            {
                "role": "user",
                "content": "Let's play a role playing game. Your name is Alice, your favorite color is blue.",
            }
        ],
        [{"role": "user", "content": "Let's play a role playing game. Your name is Bob, your favorite color is red."}],
    ]
    batch = DataProto(
        non_tensor_batch={
            "raw_prompt": np.array([np.array(prompt) for prompt in raw_prompts], dtype=object),
            "agent_name": np.array(["single_turn_agent"] * len(raw_prompts)),
            "data_source": np.array(["openai/gsm8k"] * len(raw_prompts)),
            "reward_model": np.array([{"style": "rule", "ground_truth": "1.0"}] * len(raw_prompts)),
        },
    )
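    # repeat each prompt rollout.n times, as the trainer does when sampling multiple rollouts per prompt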
    n = init_config.actor_rollout_ref.rollout.n
    batch = batch.repeat(n)
    result = agent_loop_manager.generate_sequences(prompts=batch)
    assert len(result) == len(raw_prompts) * n

    # check that input_ids/attention_mask/position_ids cover prompts + responses
    seq_len = result.batch["prompts"].size(1) + result.batch["responses"].size(1)
    assert result.batch["input_ids"].size(1) == seq_len
    assert result.batch["attention_mask"].size(1) == seq_len
    assert result.batch["position_ids"].size(1) == seq_len

    if init_config.actor_rollout_ref.rollout.calculate_log_probs:
        assert result.batch["rollout_log_probs"].size(1) == result.batch["responses"].size(1)

    # check rewards from the dapo reward manager
    reward_tensor = result.batch["rm_scores"]
    assert reward_tensor.shape == result.batch["responses"].shape
    reward_extra_keys = result.meta_info.get("reward_extra_keys", [])
    reward_extra_info = {key: result.non_tensor_batch[key] for key in reward_extra_keys}
    assert "acc" in reward_extra_info, f"reward_extra_info {reward_extra_info} should contain 'acc'"
    assert reward_extra_info["acc"].shape == (len(result),), f"invalid acc: {reward_extra_info['acc']}"

    # check turns: the single-turn agent always produces user + assistant = 2 turns
    num_turns = result.non_tensor_batch["__num_turns__"]
    assert np.all(num_turns == 2)

    print("Test passed!")
    ray.shutdown()
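

# The tools below implement verl's BaseTool interface: get_openai_tool_schema advertises
# the function to the model, and execute runs the call, returning a (ToolResponse, score,
# metrics) tuple (score and metrics are unused in these tests).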
class WeatherTool(BaseTool):
    def get_current_temperature(self, location: str, unit: str = "celsius"):
        """Get current temperature at a location.

        Args:
            location: The location to get the temperature for, in the format "City, State, Country".
            unit: The unit to return the temperature in. Defaults to "celsius". (choices: ["celsius", "fahrenheit"])

        Returns:
            the temperature, the location, and the unit in a dict
        """
        print(f"[DEBUG] get_current_temperature: {location}, {unit}")
        return {
            "temperature": 26.1,
            "location": location,
            "unit": unit,
        }

    def get_openai_tool_schema(self) -> OpenAIFunctionToolSchema:
        # get_json_schema derives an OpenAI-style tool schema from the method
        # signature and its Google-style docstring
        schema = get_json_schema(self.get_current_temperature)
        return OpenAIFunctionToolSchema(**schema)

    async def execute(self, instance_id: str, parameters: dict[str, Any], **kwargs) -> tuple[ToolResponse, float, dict]:
        try:
            result = self.get_current_temperature(**parameters)
            return ToolResponse(text=json.dumps(result)), 0, {}
        except Exception as e:
            return ToolResponse(text=str(e)), 0, {}


class WeatherToolWithData(BaseTool):
    def get_openai_tool_schema(self) -> OpenAIFunctionToolSchema:
        schema = get_json_schema(self.get_temperature_date)
        return OpenAIFunctionToolSchema(**schema)

    def get_temperature_date(self, location: str, date: str, unit: str = "celsius"):
        """Get temperature at a location and date.

        Args:
            location: The location to get the temperature for, in the format "City, State, Country".
            date: The date to get the temperature for, in the format "Year-Month-Day".
            unit: The unit to return the temperature in. Defaults to "celsius". (choices: ["celsius", "fahrenheit"])

        Returns:
            the temperature, the location, the date and the unit in a dict
        """
        print(f"[DEBUG] get_temperature_date: {location}, {date}, {unit}")
        return {
            "temperature": 25.9,
            "location": location,
            "date": date,
            "unit": unit,
        }

    async def execute(self, instance_id: str, parameters: dict[str, Any], **kwargs) -> tuple[ToolResponse, float, dict]:
        try:
            result = self.get_temperature_date(**parameters)
            return ToolResponse(text=json.dumps(result)), 0, {}
        except Exception as e:
            return ToolResponse(text=str(e)), 0, {}


def test_tool_agent(init_config):
    ray.init(
        runtime_env={
            "env_vars": {
                "TOKENIZERS_PARALLELISM": "true",
                "NCCL_DEBUG": "WARN",
                "VLLM_LOGGING_LEVEL": "INFO",
                "VLLM_USE_V1": "1",
            }
        },
        ignore_reinit_error=True,
    )

    # register the two weather tools by class path so the agent loop can load them
    tool_config = {
        "tools": [
            {
                "class_name": "tests.experimental.agent_loop.test_basic_agent_loop.WeatherTool",
                "config": {"type": "native"},
            },
            {
                "class_name": "tests.experimental.agent_loop.test_basic_agent_loop.WeatherToolWithData",
                "config": {"type": "native"},
            },
        ]
    }
    tool_config_path = "/tmp/tool_config.json"
    with open(tool_config_path, "w") as f:
        json.dump(tool_config, f)

    n = 2
    init_config.actor_rollout_ref.rollout.n = n
    init_config.actor_rollout_ref.rollout.multi_turn.tool_config_path = tool_config_path
    init_config.actor_rollout_ref.rollout.multi_turn.max_parallel_calls = 2
    init_config.actor_rollout_ref.rollout.calculate_log_probs = True
    agent_loop_manager = init_agent_loop_manager(init_config)

    # the first prompt needs no tool call; the other three should trigger weather tool calls
    raw_prompts = [
        [
            {"role": "user", "content": "How are you?"},
        ],
        [
            {"role": "user", "content": "What's the temperature in Los Angeles now?"},
        ],
        [
            {"role": "user", "content": "What's the temperature in New York now?"},
        ],
        [
            {
                "role": "system",
                "content": "You are Qwen, created by Alibaba Cloud. You are a helpful assistant.\n\n"
                "Current Date: 2024-09-30",
            },
            {"role": "user", "content": "What's the temperature in San Francisco now? How about tomorrow?"},
        ],
    ]
    batch = DataProto(
        non_tensor_batch={
            "raw_prompt": np.array([np.array(prompt) for prompt in raw_prompts], dtype=object),
            "agent_name": np.array(["tool_agent"] * len(raw_prompts)),
            "data_source": np.array(["openai/gsm8k"] * len(raw_prompts)),
            "reward_model": np.array([{"style": "rule", "ground_truth": "1.0"}] * len(raw_prompts)),
        },
    )
    batch = batch.repeat(n)
    result = agent_loop_manager.generate_sequences(prompts=batch)
    assert len(result) == len(raw_prompts) * n

    # check turns: batch.repeat interleaves rollouts, so i // n == 0 selects the first prompt
    num_turns = result.non_tensor_batch["__num_turns__"]
    print(f"num_turns: {num_turns}")
    for i in range(len(num_turns)):
        if i // n == 0:
            # "How are you?" needs no tool call: user + assistant = 2 turns
            assert num_turns[i] == 2
        else:
            # weather prompts: user + assistant + tool + assistant = 4 turns
            assert num_turns[i] == 4

    # check response_mask: tool responses appear in responses but are masked out of the loss
    tokenizer = hf_tokenizer(init_config.actor_rollout_ref.model.path)
    responses = result.batch["responses"]
    response_mask = result.batch["response_mask"]
    attention_mask = result.batch["attention_mask"]
    assert result.batch["rm_scores"].size(1) == responses.size(1)
    assert responses.size() == response_mask.size(), f"{responses.size()} != {response_mask.size()}"
    assert result.batch["rollout_log_probs"].size(1) == result.batch["responses"].size(1)

    response_length = response_mask.size(1)
    for i in range(len(responses)):
        # response with tool responses (observations)
        valid_tokens = responses[i][attention_mask[i][-response_length:].bool()]
        response_with_obs = tokenizer.decode(valid_tokens)

        # response without tool responses
        valid_tokens = responses[i][response_mask[i].bool()]
        response_without_obs = tokenizer.decode(valid_tokens)

        assert "<tool_response>" not in response_without_obs, (
            f"found <tool_response> in response: {response_without_obs}"
        )
        assert "</tool_response>" not in response_without_obs, (
            f"found </tool_response> in response: {response_without_obs}"
        )
        print("=========================")
        print(response_with_obs)
        print("---")
        print(response_without_obs)

    print("Test passed!")
    ray.shutdown()


def test_tool_agent_with_interaction(init_config):
    ray.init(
        runtime_env={
            "env_vars": {
                "TOKENIZERS_PARALLELISM": "true",
                "NCCL_DEBUG": "WARN",
                "VLLM_LOGGING_LEVEL": "INFO",
                "VLLM_USE_V1": "1",
            }
        }
    )

    # same weather tools as in test_tool_agent
    tool_config = {
        "tools": [
            {
                "class_name": "tests.experimental.agent_loop.test_basic_agent_loop.WeatherTool",
                "config": {"type": "native"},
            },
            {
                "class_name": "tests.experimental.agent_loop.test_basic_agent_loop.WeatherToolWithData",
                "config": {"type": "native"},
            },
        ]
    }
    tool_config_path = "/tmp/tool_config.json"
    with open(tool_config_path, "w") as f:
        json.dump(tool_config, f)
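
    # WeatherInteraction adds one extra turn per trajectory (see the num_turns asserts below)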
    interaction_config = {
        "interaction": [
            {"name": "weather", "class_name": "verl.interactions.weather_interaction.WeatherInteraction", "config": {}}
        ]
    }
    interaction_config_path = "/tmp/interaction_config.json"
    with open(interaction_config_path, "w") as f:
        json.dump(interaction_config, f)

    n = 2
    init_config.actor_rollout_ref.rollout.n = n
    init_config.actor_rollout_ref.rollout.multi_turn.tool_config_path = tool_config_path
    init_config.actor_rollout_ref.rollout.multi_turn.interaction_config_path = interaction_config_path
    init_config.actor_rollout_ref.rollout.multi_turn.max_parallel_calls = 2
    agent_loop_manager = init_agent_loop_manager(init_config)
    checkpoint_engine_config = omega_conf_to_dataclass(
        init_config.actor_rollout_ref.rollout.checkpoint_engine, CheckpointEngineConfig
    )
    # put the rollout replicas to sleep, then push the latest trainer weights to them
    checkpoint_manager = CheckpointEngineManager(
        config=checkpoint_engine_config,
        trainer=agent_loop_manager.worker_group,
        replicas=agent_loop_manager.rollout_replicas,
    )
    checkpoint_manager.sleep_replicas()
    checkpoint_manager.update_weights()

    # each sample selects the "weather" interaction via extra_info.interaction_kwargs
    raw_prompts = [
        [
            {"role": "user", "content": "How are you?"},
        ],
        [
            {"role": "user", "content": "What's the temperature in Los Angeles now?"},
        ],
        [
            {"role": "user", "content": "What's the temperature in New York now?"},
        ],
        [
            {
                "role": "system",
                "content": "You are Qwen, created by Alibaba Cloud. You are a helpful assistant.\n\n"
                "Current Date: 2024-09-30",
            },
            {"role": "user", "content": "What's the temperature in San Francisco now? How about tomorrow?"},
        ],
    ]
    batch = DataProto(
        non_tensor_batch={
            "raw_prompt": np.array([np.array(prompt) for prompt in raw_prompts], dtype=object),
            "agent_name": np.array(["tool_agent"] * len(raw_prompts)),
            "data_source": np.array(["openai/gsm8k"] * len(raw_prompts)),
            "reward_model": np.array([{"style": "rule", "ground_truth": "1.0"}] * len(raw_prompts)),
            "extra_info": np.array([{"interaction_kwargs": {"name": "weather"}} for _ in raw_prompts]),
        },
    )
    batch = batch.repeat(n)
    result = agent_loop_manager.generate_sequences(prompts=batch)
    assert len(result) == len(raw_prompts) * n

    # check turns: the interaction adds one turn on top of test_tool_agent's counts
    num_turns = result.non_tensor_batch["__num_turns__"]
    print(f"num_turns: {num_turns}")
    for i in range(len(num_turns)):
        if i // n == 0:
            # no tool call: user + assistant + interaction = 3 turns
            assert num_turns[i] == 3
        else:
            # tool call: user + assistant + tool + assistant + interaction = 5 turns
            assert num_turns[i] == 5

    # check response_mask: observation tokens must be masked out
    tokenizer = hf_tokenizer(init_config.actor_rollout_ref.model.path)
    responses = result.batch["responses"]
    response_mask = result.batch["response_mask"]
    attention_mask = result.batch["attention_mask"]
    assert responses.size() == response_mask.size(), f"{responses.size()} != {response_mask.size()}"
    response_length = response_mask.size(1)

    for i in range(len(responses)):
        # response with tool/interaction responses (observations)
        valid_tokens = responses[i][attention_mask[i][-response_length:].bool()]
        response_with_obs = tokenizer.decode(valid_tokens)

        # response without observations
        valid_tokens = responses[i][response_mask[i].bool()]
        response_without_obs = tokenizer.decode(valid_tokens)

        assert "\udb82\udc89" not in response_without_obs, f"found \udb82\udc89 in response: {response_without_obs}"
        assert "\udb82\udc8a" not in response_without_obs, f"found \udb82\udc8a in response: {response_without_obs}"
        print("=========================")
        print(response_with_obs)
        print("---")
        print(response_without_obs)

    print("Test passed!")
    ray.shutdown()


@pytest.mark.asyncio
async def test_get_trajectory_info():
    """Tests the get_trajectory_info method."""
    # 4 trajectories from 2 unique samples; rollout_n enumerates repeats within a sample_index
    step = 10
    index = [1, 1, 3, 3]
    expected_info = [
        {"step": step, "sample_index": 1, "rollout_n": 0, "validate": False},
        {"step": step, "sample_index": 1, "rollout_n": 1, "validate": False},
        {"step": step, "sample_index": 3, "rollout_n": 0, "validate": False},
        {"step": step, "sample_index": 3, "rollout_n": 1, "validate": False},
    ]

    trajectory_info = await get_trajectory_info(step, index, validate=False)

    assert trajectory_info == expected_info


@pytest.fixture(scope="module")
def ray_for_lb():
    ray.init(ignore_reinit_error=True)
    yield
    ray.shutdown()
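

# The tests below drive GlobalRequestLoadBalancer directly: acquire_server routes a
# request_id to the least-loaded server (sticky per request_id), and release_server
# returns that capacity.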
class TestLoadBalancerRouting:
    """Least-loaded server selection."""

    def test_distributes_across_servers(self, ray_for_lb):
        lb = GlobalRequestLoadBalancer.remote(server_actor_ids=["s0", "s1", "s2"])
        servers = [ray.get(lb.acquire_server.remote(request_id=f"r{i}")) for i in range(3)]
        assert sorted(servers) == ["s0", "s1", "s2"]

    def test_new_requests_route_to_least_loaded(self, ray_for_lb):
        lb = GlobalRequestLoadBalancer.remote(server_actor_ids=["s0", "s1", "s2"])
        # three inflight requests sharing an id stick to a single server
        ray.get(lb.acquire_server.remote(request_id="a"))
        ray.get(lb.acquire_server.remote(request_id="a"))
        ray.get(lb.acquire_server.remote(request_id="a"))
        # one inflight request on a second server
        ray.get(lb.acquire_server.remote(request_id="b"))
        # the fresh request should land on the remaining idle server
        s_new = ray.get(lb.acquire_server.remote(request_id="d"))
        assert s_new == "s2"

    def test_release_rebalances(self, ray_for_lb):
        lb = GlobalRequestLoadBalancer.remote(server_actor_ids=["s0", "s1"])
        s0 = ray.get(lb.acquire_server.remote(request_id="r0"))
        s1 = ray.get(lb.acquire_server.remote(request_id="r1"))
        assert s0 != s1
        ray.get(lb.release_server.remote(server_id=s0))
        ray.get(lb.release_server.remote(server_id=s1))
        s2 = ray.get(lb.acquire_server.remote(request_id="r2"))
        s3 = ray.get(lb.acquire_server.remote(request_id="r3"))
        assert s2 != s3

    def test_release_invalid_server_raises(self, ray_for_lb):
        lb = GlobalRequestLoadBalancer.remote(server_actor_ids=["s0", "s1"])
        with pytest.raises(ray.exceptions.RayTaskError, match="Invalid server_id"):
            ray.get(lb.release_server.remote(server_id="nonexistent"))

    def test_release_without_inflight_raises(self, ray_for_lb):
        lb = GlobalRequestLoadBalancer.remote(server_actor_ids=["s0", "s1"])
        with pytest.raises(ray.exceptions.RayTaskError, match="no inflight"):
            ray.get(lb.release_server.remote(server_id="s1"))


class TestLoadBalancerStickySession:
    """Request-level sticky session: the same request_id keeps routing to the same server."""

    def test_same_request_id_same_server(self, ray_for_lb):
        lb = GlobalRequestLoadBalancer.remote(server_actor_ids=["s0", "s1", "s2", "s3"])
        s0 = ray.get(lb.acquire_server.remote(request_id="conv-abc"))
        ray.get(lb.release_server.remote(server_id=s0))
        s1 = ray.get(lb.acquire_server.remote(request_id="conv-abc"))
        assert s0 == s1