JustinTX's picture
Add files using upload-large-folder tool
d7b3a74 verified
import logging
import random
import numpy as np
import torch
from megatron.core import mpu, tensor_parallel
from megatron.core.config import set_experimental_flag
from megatron.core.num_microbatches_calculator import init_num_microbatches_calculator
from megatron.training.global_vars import _build_tokenizer, set_args
logger = logging.getLogger(__name__)
def _set_random_seed(
    seed_: int,
    data_parallel_random_init: bool = False,
    te_rng_tracker: bool = False,
    inference_rng_tracker: bool = False,
    use_cudagraphable_rng: bool = False,
):
    """Seed the Python, NumPy, torch and model-parallel CUDA generators.

    The seed actually applied is ``seed_`` shifted by a rank-dependent offset:
    100 times the pipeline-model-parallel rank, plus (only when
    ``data_parallel_random_init`` is true) 10 times the data-parallel rank
    queried with ``with_context_parallel=False``.

    Args:
        seed_: Base seed shared by all ranks.
        data_parallel_random_init: When true, data-parallel ranks also get
            distinct seeds.
        te_rng_tracker: Forwarded unchanged to
            ``tensor_parallel.model_parallel_cuda_manual_seed``.
        inference_rng_tracker: Forwarded unchanged to
            ``tensor_parallel.model_parallel_cuda_manual_seed``.
        use_cudagraphable_rng: Forwarded unchanged to
            ``tensor_parallel.model_parallel_cuda_manual_seed``.
    """
    # Every pipeline stage gets its own seed.
    rank_offset = 100 * mpu.get_pipeline_model_parallel_rank()
    # Optionally give each data-parallel rank its own seed as well.
    if data_parallel_random_init:
        rank_offset += 10 * mpu.get_data_parallel_rank(with_context_parallel=False)
    rank_seed = seed_ + rank_offset

    # Host-side generators first, in the same order as before.
    for apply_seed in (random.seed, np.random.seed, torch.manual_seed):
        apply_seed(rank_seed)

    tensor_parallel.model_parallel_cuda_manual_seed(
        rank_seed,
        te_rng_tracker,
        inference_rng_tracker,
        use_cudagraphable_rng,
    )
def _initialize_distributed(args, get_embedding_ranks=None, get_position_embedding_ranks=None):
    """Create the Megatron-core model-parallel groups described by ``args``.

    This function only calls ``mpu.initialize_model_parallel``; it does not
    set up ``torch.distributed`` itself.
    NOTE(review): presumably the default process group is already initialized
    by the caller -- confirm.

    Args:
        args: Namespace providing the parallel sizes and communicator options
            read below.
        get_embedding_ranks: Passed through to
            ``mpu.initialize_model_parallel`` unchanged.
        get_position_embedding_ranks: Passed through to
            ``mpu.initialize_model_parallel`` unchanged.
    """
    # Tensor, pipeline and virtual-pipeline sizes are passed positionally.
    parallel_sizes = (
        args.tensor_model_parallel_size,
        args.pipeline_model_parallel_size,
        args.virtual_pipeline_model_parallel_size,
    )
    group_options = dict(
        pipeline_model_parallel_comm_backend=args.pipeline_model_parallel_comm_backend,
        context_parallel_size=args.context_parallel_size,
        hierarchical_context_parallel_sizes=args.hierarchical_context_parallel_sizes,
        expert_model_parallel_size=args.expert_model_parallel_size,
        num_distributed_optimizer_instances=args.num_distributed_optimizer_instances,
        expert_tensor_parallel_size=args.expert_tensor_parallel_size,
        distributed_timeout_minutes=args.distributed_timeout_minutes,
        nccl_communicator_config_path=args.nccl_communicator_config_path,
        # Rank ordering string: pp-before-dp only when explicitly requested.
        order="tp-cp-ep-pp-dp" if args.use_tp_pp_dp_mapping else "tp-cp-ep-dp-pp",
        get_embedding_ranks=get_embedding_ranks,
        get_position_embedding_ranks=get_position_embedding_ranks,
        create_gloo_process_groups=args.enable_gloo_process_groups,
    )
    mpu.initialize_model_parallel(*parallel_sizes, **group_options)
def init(args):
    """Run the Megatron global initialization sequence for this process.

    Steps, in order: register ``args`` as Megatron's global args, optionally
    enable the experimental flag, create the model-parallel groups, verify the
    NumPy major version, seed the RNGs, build the tokenizer, initialize the
    micro-batch calculator, optionally switch torch to deterministic
    algorithms, optionally initialize the TP communicators, and finally call
    a user-supplied hook if ``args.custom_megatron_init_path`` is set.

    Args:
        args: Parsed argument namespace providing the attributes read below.

    Raises:
        AssertionError: If the installed NumPy is not a 1.x release.
    """
    set_args(args)
    if args.enable_experimental:
        logger.info("Enable megatron experimental")
        set_experimental_flag(True)

    # Model-parallel process groups.
    _initialize_distributed(args)

    # https://github.com/NVIDIA/Megatron-LM/issues/1563
    # Raised explicitly instead of via `assert` so the check is not stripped
    # when Python runs with -O; the exception type is kept for compatibility.
    if not np.__version__.startswith("1."):
        raise AssertionError("Megatron does not support numpy 2.x")

    # Random seeds for reproducibility. Only rank 0 logs; every rank seeds.
    if args.rank == 0:
        logger.info(f"> setting random seeds to {args.seed} ...")
    _set_random_seed(
        args.seed,
        args.data_parallel_random_init,
        args.te_rng_tracker,
        args.inference_rng_tracker,
    )

    _build_tokenizer(args)

    # We won't use this. Initialize it to pass some validation in Megatron.
    init_num_microbatches_calculator(
        args.rank,
        args.rampup_batch_size,
        args.global_batch_size,
        args.micro_batch_size,
        args.data_parallel_size,
        args.decrease_batch_size_if_needed,
    )

    if args.deterministic_mode:
        if args.rank == 0:
            logger.info("> running in deterministic mode")
        # The determinism switches are applied on every rank, not just rank 0.
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False
        torch.use_deterministic_algorithms(True, warn_only=False)

    if args.tp_comm_overlap:
        # Imported lazily: only needed when TP communication overlap is on.
        from megatron.training.initialize import _initialize_tp_communicators

        _initialize_tp_communicators()

    # Optional user hook, resolved from the configured path and called with args.
    if getattr(args, "custom_megatron_init_path", None):
        from slime.utils.misc import load_function

        custom_init = load_function(args.custom_megatron_init_path)
        custom_init(args)
# TODO shall we use a simpler method to determine which rank to init wandb?
def is_megatron_main_rank():
    """Return whether this process is the designated "main" Megatron rank.

    True only when the data-parallel rank (context parallel included) is 0,
    the tensor-model-parallel rank is 0, and the pipeline-model-parallel rank
    is the last one (world size minus one).
    """
    if mpu.get_data_parallel_rank(with_context_parallel=True) != 0:
        return False
    if mpu.get_tensor_model_parallel_rank() != 0:
        return False
    last_pipeline_rank = mpu.get_pipeline_model_parallel_world_size() - 1
    return mpu.get_pipeline_model_parallel_rank() == last_pipeline_rank