| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | import os |
| |
|
| | os.environ["NCCL_DEBUG"] = "WARN" |
| |
|
| | from functools import partial |
| |
|
| | import numpy as np |
| | import pytest |
| | import ray |
| | import torch |
| | import torch.distributed as dist |
| | import torch.multiprocessing as mp |
| | from transformers import ( |
| | AutoConfig, |
| | AutoModelForCausalLM, |
| | AutoModelForTokenClassification, |
| | AutoTokenizer, |
| | Qwen3Config, |
| | Qwen3MoeConfig, |
| | ) |
| |
|
| | from verl import DataProto |
| | from verl.single_controller.ray import RayClassWithInitArgs, RayResourcePool, RayWorkerGroup |
| | from verl.trainer.config import CheckpointConfig |
| | from verl.utils import tensordict_utils as tu |
| | from verl.utils.model import compute_position_id_with_mask, create_random_mask |
| | from verl.utils.torch_functional import logprobs_from_logits_naive |
| | from verl.workers.config import ( |
| | ActorConfig, |
| | CriticConfig, |
| | FSDPEngineConfig, |
| | FSDPOptimizerConfig, |
| | HFModelConfig, |
| | McoreEngineConfig, |
| | McoreOptimizerConfig, |
| | ) |
| | from verl.workers.engine_workers import TrainingWorker, TrainingWorkerConfig |
| | from verl.workers.utils.losses import ppo_loss, sft_loss, value_loss |
| | from verl.workers.utils.padding import left_right_2_no_padding, no_padding_2_padding |
| |
|
| |
|
| | def get_test_language_model(device_count): |
| | if device_count == 1: |
| | model = "~/models/HuggingFaceTB/SmolLM2-135M-Instruct" |
| | else: |
| | model = "~/models/Qwen/Qwen2.5-0.5B" |
| | model = os.path.expanduser(model) |
| | return model |
| |
|
| |
|
| | def create_training_config(model_type, strategy, device_count, model): |
| | if device_count == 1: |
| | tp = pp = cp = fsdp_size = 1 |
| | else: |
| | tp = pp = cp = 2 |
| | fsdp_size = 4 |
| |
|
| | path = os.path.expanduser(model) |
| | model_config = HFModelConfig(path=path, use_remove_padding=True) |
| |
|
| | kwargs = dict( |
| | param_offload=True, |
| | optimizer_offload=True, |
| | grad_offload=True, |
| | use_dynamic_bsz=True, |
| | use_remove_padding=True, |
| | max_token_len_per_gpu=500, |
| | infer_max_token_len_per_gpu=1000, |
| | ) |
| |
|
| | if strategy == "megatron": |
| | engine_config = McoreEngineConfig( |
| | forward_only=False, |
| | use_mbridge=True, |
| | tensor_model_parallel_size=tp, |
| | pipeline_model_parallel_size=pp, |
| | context_parallel_size=cp, |
| | **kwargs, |
| | ) |
| | optimizer_config = McoreOptimizerConfig(lr_decay_steps=10) |
| | elif strategy in ["fsdp", "fsdp2"]: |
| | engine_config = FSDPEngineConfig( |
| | forward_only=False, fsdp_size=fsdp_size, strategy=strategy, ulysses_sequence_parallel_size=cp, **kwargs |
| | ) |
| | optimizer_config = FSDPOptimizerConfig() |
| | else: |
| | raise NotImplementedError(f"strategy {strategy} is not supported") |
| |
|
| | config = TrainingWorkerConfig( |
| | model_type=model_type, |
| | model_config=model_config, |
| | engine_config=engine_config, |
| | optimizer_config=optimizer_config, |
| | checkpoint_config=None, |
| | ) |
| | return config |
| |
|
| |
|
| | @pytest.mark.parametrize("strategy", ["fsdp", "fsdp2", "megatron"]) |
| | def test_actor_engine(strategy): |
| | ray.init() |
| | device_count = torch.cuda.device_count() |
| | config = create_training_config( |
| | model_type="language_model", |
| | strategy=strategy, |
| | device_count=device_count, |
| | model=get_test_language_model(device_count), |
| | ) |
| | ray_cls_with_init = RayClassWithInitArgs(cls=ray.remote(TrainingWorker), config=config) |
| | resource_pool = RayResourcePool(process_on_nodes=[device_count]) |
| | wg = RayWorkerGroup(resource_pool=resource_pool, ray_cls_with_init=ray_cls_with_init) |
| | |
| | wg.reset() |
| |
|
| | sft_loss_ = partial(sft_loss, config=config) |
| |
|
| | wg.set_loss_fn(sft_loss_) |
| |
|
| | batch_size = 8 |
| | seqlen = 32 |
| |
|
| | response_length = seqlen // 2 |
| |
|
| | torch.manual_seed(1) |
| | np.random.seed(1) |
| |
|
| | input_ids = torch.randint(0, config.model_config.hf_config.vocab_size, (batch_size, seqlen)) |
| | attention_mask = create_random_mask( |
| | input_ids=input_ids, max_ratio_of_valid_token=0.8, max_ratio_of_left_padding=0.2, min_ratio_of_valid_token=0.6 |
| | ) |
| | position_ids = compute_position_id_with_mask(attention_mask) |
| |
|
| | global_token_num = torch.sum(attention_mask, dim=-1).tolist() |
| |
|
| | print(input_ids.float().mean(), attention_mask.float().mean()) |
| |
|
| | responses = input_ids[:, response_length:] |
| | response_mask = attention_mask[:, response_length:] |
| |
|
| | assert torch.all(response_mask[:, 0] == 1) |
| |
|
| | data = DataProto.from_single_dict( |
| | { |
| | "input_ids": input_ids, |
| | "prompts": input_ids[:, :response_length], |
| | "attention_mask": attention_mask, |
| | "position_ids": position_ids, |
| | "responses": responses, |
| | "response_mask": response_mask, |
| | }, |
| | meta_info={"temperature": 1.0, "global_token_num": global_token_num, "compute_loss": False}, |
| | ) |
| |
|
| | data_td = data.to_tensordict() |
| | data_td = left_right_2_no_padding(data_td) |
| |
|
| | |
| | output = wg.infer_batch(data_td) |
| | output = output.get() |
| | logprobs_unpad = tu.get(output, "log_probs").cpu() |
| | logprobs = no_padding_2_padding(logprobs_unpad, data_td) |
| |
|
| | output = DataProto.from_single_dict({"old_log_probs": logprobs}) |
| |
|
| | |
| | path = config.model_config.path |
| | hf_model = AutoModelForCausalLM.from_pretrained(path, torch_dtype=torch.bfloat16) |
| | hf_output = hf_model(input_ids, attention_mask=attention_mask) |
| | hf_logprobs = logprobs_from_logits_naive( |
| | hf_output.logits[:, -response_length - 1 : -1, :].float(), input_ids[:, -response_length:] |
| | ) |
| | hf_logprobs_mean = torch.mean(hf_logprobs * response_mask) |
| | mcore_logprobs_mean = torch.mean(output.batch["old_log_probs"] * response_mask) |
| |
|
| | torch.testing.assert_close(hf_logprobs_mean, mcore_logprobs_mean, atol=1e-3, rtol=1e-2) |
| |
|
| | data = data.union(output) |
| |
|
| | |
| | |
| |
|
| | |
| | |
| | |
| |
|
| | |
| | data.batch["advantages"] = torch.rand_like(responses, dtype=torch.float32) |
| | data.batch["ref_log_prob"] = torch.rand_like(responses, dtype=torch.float32) |
| |
|
| | |
| | actor_config = ActorConfig(strategy=strategy, rollout_n=1, ppo_micro_batch_size_per_gpu=-1) |
| |
|
| | |
| | ppo_loss_ = partial(ppo_loss, config=actor_config) |
| | wg.set_loss_fn(ppo_loss_) |
| |
|
| | |
| | data_td = data.to_tensordict() |
| | data_td = left_right_2_no_padding(data_td) |
| |
|
| | |
| | tu.assign_non_tensor(data_td, global_batch_size=data_td.shape[0]) |
| | ppo_metrics = wg.train_batch(data_td) |
| | ppo_metrics = ppo_metrics.get() |
| | ppo_metrics = tu.get(ppo_metrics, "metrics") |
| | print(ppo_metrics) |
| |
|
| | |
| | tu.assign_non_tensor(data_td, disable_auto_offload=True) |
| | wg.to("device") |
| | ppo_metrics = wg.train_batch(data_td) |
| | ppo_metrics = ppo_metrics.get() |
| | ppo_metrics = tu.get(ppo_metrics, "metrics") |
| | print(ppo_metrics) |
| | wg.to("cpu") |
| |
|
| | ray.shutdown() |
| |
|
| |
|
| | def create_value_model(language_model_path, output_path): |
| | config = AutoConfig.from_pretrained(language_model_path) |
| | config.num_labels = 1 |
| | config.classifier_dropout = 0 |
| | config.tie_word_embeddings = False |
| | model = AutoModelForTokenClassification.from_config(config) |
| | tokenizer = AutoTokenizer.from_pretrained(os.path.expanduser(language_model_path)) |
| | assert model.config.num_labels == 1 |
| | path = os.path.expanduser(output_path) |
| | model.save_pretrained(path) |
| | tokenizer.save_pretrained(path) |
| | config.save_pretrained(path) |
| | return path |
| |
|
| |
|
| | @pytest.mark.parametrize("strategy", ["fsdp", "fsdp2"]) |
| | def test_critic_engine(strategy): |
| | device_count = torch.cuda.device_count() |
| | value_model_path = os.path.expanduser("~/models/test_model") |
| | language_model_path = get_test_language_model(device_count=device_count) |
| | create_value_model(language_model_path, value_model_path) |
| |
|
| | torch.manual_seed(1) |
| | np.random.seed(1) |
| |
|
| | ray.init() |
| |
|
| | config = create_training_config( |
| | model_type="value_model", strategy=strategy, device_count=device_count, model=value_model_path |
| | ) |
| | ray_cls_with_init = RayClassWithInitArgs(cls=ray.remote(TrainingWorker), config=config) |
| | resource_pool = RayResourcePool(process_on_nodes=[device_count]) |
| | wg = RayWorkerGroup(resource_pool=resource_pool, ray_cls_with_init=ray_cls_with_init) |
| | |
| | wg.reset() |
| |
|
| | batch_size = 8 |
| | seqlen = 32 |
| |
|
| | response_length = seqlen // 2 |
| | input_ids = torch.randint(0, config.model_config.hf_config.vocab_size, (batch_size, seqlen)) |
| | attention_mask = create_random_mask( |
| | input_ids=input_ids, max_ratio_of_valid_token=0.8, max_ratio_of_left_padding=0.2, min_ratio_of_valid_token=0.6 |
| | ) |
| | position_ids = compute_position_id_with_mask(attention_mask) |
| |
|
| | global_token_num = torch.sum(attention_mask, dim=-1).tolist() |
| |
|
| | print(input_ids.float().mean(), attention_mask.float().mean()) |
| |
|
| | responses = input_ids[:, response_length:] |
| | response_mask = attention_mask[:, response_length:] |
| |
|
| | assert torch.all(response_mask[:, 0] == 1) |
| |
|
| | data = DataProto.from_single_dict( |
| | { |
| | "input_ids": input_ids, |
| | "prompts": input_ids[:, :response_length], |
| | "attention_mask": attention_mask, |
| | "position_ids": position_ids, |
| | "responses": responses, |
| | "response_mask": response_mask, |
| | }, |
| | meta_info={"temperature": 1.0, "global_token_num": global_token_num, "compute_loss": False}, |
| | ) |
| |
|
| | data_td = data.to_tensordict() |
| | data_td = left_right_2_no_padding(data_td) |
| |
|
| | |
| | output = wg.infer_batch(data_td) |
| | output = output.get() |
| |
|
| | values_unpad = tu.get(output, "values").float().cpu() |
| | values = no_padding_2_padding(values_unpad, data_td) |
| |
|
| | output = DataProto.from_single_dict({"values": values}) |
| |
|
| | |
| | with torch.device("cuda"), torch.autocast(device_type="cuda", dtype=torch.bfloat16): |
| | hf_model = AutoModelForTokenClassification.from_pretrained( |
| | value_model_path, torch_dtype=torch.float32, attn_implementation="flash_attention_2" |
| | ) |
| | hf_output = hf_model(input_ids.cuda(), attention_mask=attention_mask.cuda()) |
| | hf_values = hf_output.logits[:, -response_length - 1 : -1, :].float().squeeze(-1).cpu() |
| |
|
| | hf_values_mean = torch.mean(hf_values * response_mask) |
| | engine_values = torch.mean(output.batch["values"] * response_mask) |
| |
|
| | torch.testing.assert_close(hf_values_mean, engine_values, atol=1e-2, rtol=1e-2) |
| |
|
| | data = data.union(output) |
| |
|
| | |
| | data.batch["returns"] = torch.rand_like(responses, dtype=torch.float32) |
| |
|
| | |
| | |
| | critic_config = CriticConfig( |
| | strategy=strategy, rollout_n=1, ppo_micro_batch_size_per_gpu=-1, model_config=config.model_config |
| | ) |
| | value_loss_ = partial(value_loss, config=critic_config) |
| | wg.set_loss_fn(value_loss_) |
| |
|
| | |
| | data_td = data.to_tensordict() |
| | data_td = left_right_2_no_padding(data_td) |
| |
|
| | |
| | tu.assign_non_tensor(data_td, global_batch_size=data_td.shape[0]) |
| | ppo_metrics = wg.train_batch(data_td) |
| | ppo_metrics = ppo_metrics.get() |
| | ppo_metrics = tu.get(ppo_metrics, "metrics") |
| | print(ppo_metrics) |
| |
|
| | ray.shutdown() |
| |
|
| |
|
| | def create_actor_model(tmp_path, config): |
| | model = AutoModelForCausalLM.from_config(config) |
| | path = os.path.join(tmp_path, "test_model") |
| | model.save_pretrained(path) |
| | config.save_pretrained(path) |
| | return path |
| |
|
| |
|
| | def _worker(rank: int, world_size: int, rendezvous_file: str, strategy: str, model_path: str): |
| | torch.cuda.set_device(rank) |
| | dist.init_process_group( |
| | backend="nccl", |
| | init_method=f"file://{rendezvous_file}", |
| | rank=rank, |
| | world_size=world_size, |
| | ) |
| |
|
| | ref_model_config = AutoConfig.from_pretrained(model_path) |
| | with torch.device("meta"): |
| | ref_model = AutoModelForCausalLM.from_config(ref_model_config) |
| |
|
| | from verl.workers.engine import BaseEngine, EngineRegistry |
| |
|
| | |
| | model_config = HFModelConfig(path=model_path, load_tokenizer=False) |
| |
|
| | if strategy == "megatron": |
| | engine_config = McoreEngineConfig( |
| | forward_only=False, |
| | use_mbridge=True, |
| | tensor_model_parallel_size=2, |
| | pipeline_model_parallel_size=2, |
| | context_parallel_size=1, |
| | ) |
| | optimizer_config = McoreOptimizerConfig(lr_decay_steps=10) |
| | elif strategy in ["fsdp", "fsdp2"]: |
| | engine_config = FSDPEngineConfig( |
| | forward_only=False, fsdp_size=4, strategy=strategy, ulysses_sequence_parallel_size=2 |
| | ) |
| | optimizer_config = FSDPOptimizerConfig() |
| | else: |
| | raise NotImplementedError(f"strategy {strategy} is not supported") |
| |
|
| | checkpoint_config = CheckpointConfig() |
| |
|
| | |
| | engine: BaseEngine = EngineRegistry.new( |
| | model_type="language_model", |
| | backend=engine_config.strategy, |
| | model_config=model_config, |
| | engine_config=engine_config, |
| | optimizer_config=optimizer_config, |
| | checkpoint_config=checkpoint_config, |
| | ) |
| |
|
| | engine.initialize() |
| |
|
| | |
| | per_tensor_params, _ = engine.get_per_tensor_param() |
| |
|
| | ref_state_dict = ref_model.state_dict() |
| |
|
| | |
| | for key, value in per_tensor_params: |
| | assert key in ref_state_dict, f"{key} not in ref_state_dict" |
| | assert value.shape == ref_state_dict[key].shape, ( |
| | f"{key} shape not equal, {value.shape} != {ref_state_dict[key].shape}" |
| | ) |
| | if rank == 0: |
| | print(key, value.shape) |
| |
|
| | dist.barrier() |
| | dist.destroy_process_group() |
| |
|
| |
|
| | @pytest.mark.parametrize("world_size", [8]) |
| | @pytest.mark.parametrize("config", [Qwen3Config(num_hidden_layers=2), Qwen3MoeConfig(num_hidden_layers=2)]) |
| | @pytest.mark.parametrize("strategy", ["megatron", "fsdp", "fsdp2"]) |
| | def test_per_tensor_generator(world_size, tmp_path, config, strategy): |
| | rendezvous_file = str(tmp_path / "rdzv_mask") |
| | os.makedirs(os.path.dirname(rendezvous_file), exist_ok=True) |
| | |
| | model_path = create_actor_model(tmp_path, config) |
| | |
| | mp.spawn( |
| | fn=_worker, |
| | args=(world_size, rendezvous_file, strategy, model_path), |
| | nprocs=world_size, |
| | join=True, |
| | ) |
| |
|