import os
import random

import numpy as np
import torch
from tensordict import TensorDict

from verl.utils import tensordict_utils as tu
from verl.utils.dataset.dataset_utils import DatasetPadMode
from verl.utils.device import is_npu_available
from verl.utils.py_functional import append_to_dict
from verl.utils.seqlen_balancing import rearrange_micro_batches, restore_dynamic_batch


def enable_full_determinism(seed: int):
    """Seed all RNGs and force deterministic kernels for reproducible distributed training.

    See https://pytorch.org/docs/stable/notes/randomness.html for details.
    """
    os.environ["PYTHONHASHSEED"] = str(seed)
    # cuBLAS requires a fixed workspace config (":16:8" or ":4096:8") for deterministic reductions.
    os.environ["CUBLAS_WORKSPACE_CONFIG"] = ":16:8"
    os.environ["NCCL_DETERMINISTIC"] = "1"
    os.environ["FLASH_ATTENTION_DETERMINISTIC"] = "1"
    if is_npu_available:
        # Ascend NPUs use HCCL for collectives; enable its deterministic mode.
        os.environ["HCCL_DETERMINISTIC"] = "true"
        os.environ["CLOSE_MATMUL_K_SHIFT"] = "1"

    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.use_deterministic_algorithms(True, warn_only=True)

    # Disabling cuDNN entirely avoids its non-deterministic kernels, at some performance cost.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.enabled = False
    if is_npu_available:
        torch.npu.manual_seed(seed)
        torch.npu.manual_seed_all(seed)
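
# Illustrative usage (a sketch; the seed value is a placeholder): call once at
# process startup, before any CUDA context is created, so the environment
# variables above take effect.
#
#   enable_full_determinism(seed=42)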


def prepare_micro_batches(
    data: TensorDict,
    dp_group=None,
    num_batches_divided_by=None,
    same_micro_num_in_dp=True,
    min_num_micro_batch=None,
    use_dynamic_bsz_balance=True,
):
    """Split ``data`` into micro-batches.

    When ``use_dynamic_bsz`` is set in ``data``, sequences are rebalanced into
    micro-batches by token count; otherwise ``data`` is chunked into fixed-size
    micro-batches of ``micro_batch_size_per_gpu``. Returns the micro-batches and,
    for dynamic batching, the index list needed to restore the original order.
    """
    use_dynamic_bsz = tu.get_non_tensor_data(data=data, key="use_dynamic_bsz", default=True)
    sp_size = tu.get_non_tensor_data(data=data, key="sp_size", default=1)
    force_group_size = tu.get_non_tensor_data(data=data, key="force_group_size", default=1)
    if use_dynamic_bsz:
        assert "max_token_len_per_gpu" in data.keys(), "max_token_len_per_gpu must be set when use_dynamic_bsz is True"
        max_token_len_per_gpu = data["max_token_len_per_gpu"]
        # Each rank in a sequence-parallel group holds only a slice of every sequence,
        # so the effective per-rank token budget scales with sp_size.
        max_token_len = max_token_len_per_gpu * sp_size
        micro_batches, batch_idx_list = rearrange_micro_batches(
            data,
            max_token_len=max_token_len,
            dp_group=dp_group,
            num_batches_divided_by=num_batches_divided_by,
            same_micro_num_in_dp=same_micro_num_in_dp,
            min_num_micro_batch=min_num_micro_batch,
            use_dynamic_bsz_balance=use_dynamic_bsz_balance,
            force_group_size=force_group_size,
        )
    else:
        total_data_size = len(data)
        micro_batch_size_per_gpu = data["micro_batch_size_per_gpu"]
        assert total_data_size % (force_group_size * micro_batch_size_per_gpu) == 0, (
            "data size must be divisible by force_group_size * micro_batch_size_per_gpu"
        )
        micro_batches = tu.chunk_tensordict(data, total_data_size // (micro_batch_size_per_gpu * force_group_size))
        batch_idx_list = None
    return micro_batches, batch_idx_list
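
# Illustrative call (a sketch; ``data`` is a TensorDict carrying the non-tensor keys
# read above). With dynamic batching, ``batch_idx_list`` records the permutation
# needed to restore the original order; with fixed-size chunking it is None.
#
#   micro_batches, batch_idx_list = prepare_micro_batches(data, dp_group=dp_group)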


def postprocess_batch_func(output_lst, indices, data: TensorDict):
    """Post-process the outputs of a forward_backward_batch.

    ``output_lst`` is a list of dicts, one per micro-batch, each containing
    ``model_output``, ``loss``, and ``metrics``. Model outputs (e.g. entropy) are
    concatenated across micro-batches and, when dynamic batch size is used,
    reordered back to the original sample order. Model outputs are only produced
    on the last pipeline-parallel rank (None on the others) and are available on
    every tensor-parallel rank.
    """
    use_dynamic_bsz = tu.get_non_tensor_data(data=data, key="use_dynamic_bsz", default=True)
    pad_mode = tu.get_non_tensor_data(data=data, key="pad_mode", default=DatasetPadMode.NO_PADDING)
    assert pad_mode == DatasetPadMode.NO_PADDING, "postprocess_batch_func only supports NO_PADDING pad_mode"

    model_output = {}
    losses = []
    aggregated_metrics = {}

    # Gather per-micro-batch model outputs, keyed by output name.
    for o in output_lst:
        if "model_output" in o:
            for key, val in o["model_output"].items():
                if key not in model_output:
                    model_output[key] = []
                model_output[key].append(val)

    # Flatten each output's per-micro-batch nested tensors into a single jagged
    # tensor, then undo the dynamic-batching permutation if one was applied.
    for key in model_output:
        if pad_mode == DatasetPadMode.NO_PADDING:
            tensors = [tensor for nt in model_output[key] for tensor in nt.unbind()]
            model_output[key] = torch.nested.as_nested_tensor(tensors, layout=torch.jagged)
        else:
            raise NotImplementedError(f"pad_mode {pad_mode} not implemented")

        if use_dynamic_bsz:
            model_output[key] = restore_dynamic_batch(model_output[key], indices)

    # Collect per-micro-batch losses.
    for o in output_lst:
        if "loss" in o:
            losses.append(o["loss"])

    # Merge per-micro-batch metrics into lists keyed by metric name.
    for o in output_lst:
        if "metrics" in o:
            append_to_dict(aggregated_metrics, o["metrics"])

    output = {
        "model_output": model_output,
        "loss": losses,
        "metrics": aggregated_metrics,
    }

    return output
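
# Illustrative end-to-end flow (a sketch; ``forward_backward`` is a hypothetical
# per-micro-batch step, not part of this module):
#
#   micro_batches, indices = prepare_micro_batches(data)
#   output_lst = [forward_backward(mb) for mb in micro_batches]
#   result = postprocess_batch_func(output_lst, indices, data)
#   result["model_output"]  # jagged tensors, restored to the original sample order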