File size: 6,044 Bytes
1faccd4 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 | # Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import random
import numpy as np
import torch
from tensordict import TensorDict
from verl.utils import tensordict_utils as tu
from verl.utils.dataset.dataset_utils import DatasetPadMode
from verl.utils.device import is_npu_available
from verl.utils.py_functional import append_to_dict
from verl.utils.seqlen_balancing import rearrange_micro_batches, restore_dynamic_batch
def enable_full_determinism(seed: int):
"""
Helper function for reproducibility in distributed training.
See https://pytorch.org/docs/stable/notes/randomness.html for details.
"""
os.environ["PYTHONHASHSEED"] = str(seed)
os.environ["CUBLAS_WORKSPACE_CONFIG"] = ":16:8"
os.environ["NCCL_DETERMINISTIC"] = "1"
os.environ["FLASH_ATTENTION_DETERMINISTIC"] = "1"
if is_npu_available:
# The environment variable required to enable deterministic mode on Ascend NPUs.
os.environ["NCCL_DETERMINISTIC"] = "true"
os.environ["CLOSE_MATMUL_K_SHIFT"] = "1"
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.use_deterministic_algorithms(True, warn_only=True)
# Enable CUDNN deterministic mode
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.enabled = False
if is_npu_available:
torch.npu.manual_seed(seed)
torch.npu.manual_seed_all(seed)
def prepare_micro_batches(
data: TensorDict,
dp_group=None,
num_batches_divided_by=None,
same_micro_num_in_dp=True,
min_num_micro_batch=None,
use_dynamic_bsz_balance=True,
):
"""
Prepare micro batches from data.
"""
use_dynamic_bsz = tu.get_non_tensor_data(data=data, key="use_dynamic_bsz", default=True)
sp_size = tu.get_non_tensor_data(data=data, key="sp_size", default=1)
force_group_size = tu.get_non_tensor_data(data=data, key="force_group_size", default=1)
if use_dynamic_bsz:
assert "max_token_len_per_gpu" in data.keys(), "max_token_len_per_gpu must be set when use_dynamic_bsz is True"
max_token_len_per_gpu = data["max_token_len_per_gpu"]
max_token_len = max_token_len_per_gpu * sp_size
micro_batches, batch_idx_list = rearrange_micro_batches(
data,
max_token_len=max_token_len,
dp_group=dp_group,
num_batches_divided_by=num_batches_divided_by,
same_micro_num_in_dp=same_micro_num_in_dp,
min_num_micro_batch=min_num_micro_batch,
use_dynamic_bsz_balance=use_dynamic_bsz_balance,
force_group_size=force_group_size,
)
else:
total_data_size = len(data)
micro_batch_size_per_gpu = data["micro_batch_size_per_gpu"]
assert total_data_size % (force_group_size * micro_batch_size_per_gpu) == 0, (
"data size must be divisible by force_group_size * micro_batch_size_per_gpu"
)
micro_batches = tu.chunk_tensordict(data, total_data_size // (micro_batch_size_per_gpu * force_group_size))
batch_idx_list = None
return micro_batches, batch_idx_list
def postprocess_batch_func(output_lst, indices, data: TensorDict):
"""postprocess the output of a forward_backward_batch.
output_lst is a list of dict containing outputs for each micro-batch
reorder entropy and outputs. Return None for other pp ranks
only on last rank. It should be on every tp rank
each losses_reduced contains 1. model_output, 2. loss, 3. metrics.
"""
use_dynamic_bsz = tu.get_non_tensor_data(data=data, key="use_dynamic_bsz", default=True)
pad_mode = tu.get_non_tensor_data(data=data, key="pad_mode", default=DatasetPadMode.NO_PADDING)
assert pad_mode == DatasetPadMode.NO_PADDING, "postprocess_batch_func only support NO_PADDING pad_mode"
# losses_reduced is a list of dict containing outputs for each micro-batch
# reorder entropy and outputs. Return None for other pp ranks
# only on last rank. It should be on every tp rank
# losses_reduced contains 1. model_output, 2. loss, 3. metrics.
# We perform reverse
model_output = {}
losses = []
aggregated_metrics = {}
# model output
for o in output_lst:
if "model_output" in o:
for key, val in o["model_output"].items():
if key not in model_output:
model_output[key] = []
model_output[key].append(val)
# concat results from micro batches
for key, val in model_output.items():
if pad_mode == DatasetPadMode.NO_PADDING:
tensors = [tensor for nt in model_output[key] for tensor in nt.unbind()]
model_output[key] = torch.nested.as_nested_tensor(tensors, layout=torch.jagged)
else:
raise NotImplementedError(f"pad_mode {pad_mode} not implemented")
# reverse with dynamic bsz
if use_dynamic_bsz:
model_output[key] = restore_dynamic_batch(model_output[key], indices)
# loss
for o in output_lst:
if "loss" in o:
losses.append(o["loss"])
# metrics
for o in output_lst:
if "metrics" in o:
metrics = o["metrics"]
append_to_dict(aggregated_metrics, metrics)
output = {
"model_output": model_output,
"loss": losses,
"metrics": aggregated_metrics,
}
return output
|