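"""Checkpoint reshaping tests for Megatron-DeepSpeed: train with one TP/PP/DP
topology, convert the saved checkpoint to DeepSpeed's universal format, and
resume training under a different topology."""
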
import os
import re
import shutil

import pytest
from parameterized import parameterized

from testing_utils import (
    CaptureStdout,
    TestCasePlus,
    execute_subprocess_async,
    get_accelerator_count,
    require_deepspeed,
    require_torch_accelerator,
    require_torch_multi_accelerator,
    set_seed,
)

set_seed(42)


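# Customize parameterized's sub-test name generation so that both topologies
# appear in the name, e.g. args ("2_1_1", "1_1_2") -> "<test>_2_1_1_to_1_1_2".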
def parameterized_custom_name_func(func, param_num, param):
    # by default only the first param would show up in the generated name
    param_based_name = parameterized.to_safe_name("_to_".join(str(x) for x in param.args))
    return f"{func.__name__}_{param_based_name}"


# Each entry is a [source, target] pair of "TP_PP_DP" topologies
# (tensor-parallel, pipeline-parallel and data-parallel sizes).
params = [
    ["1_1_1", "1_1_1"],
    ["2_1_1", "1_1_1"],
    ["1_2_1", "1_1_1"],
    ["1_1_2", "1_1_1"],

    ["1_1_1", "2_1_1"],
    ["1_1_1", "1_2_1"],
    ["1_1_1", "1_1_2"],

    ["1_1_2", "1_1_2"],
    ["1_1_2", "2_1_1"],
    ["1_1_2", "1_2_1"],

    ["1_2_1", "1_2_1"],
    ["1_2_1", "2_1_1"],
    ["1_2_1", "1_1_2"],

    ["2_1_1", "2_1_1"],
    ["2_1_1", "1_2_1"],
    ["2_1_1", "1_1_2"],

    ["2_2_2", "1_1_1"],
    ["2_2_2", "2_2_2"],
    ["1_1_1", "2_2_2"],

    ["1_1_8", "2_2_2"],
]
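
# _test_checkpoint_reshaping_main (below) trains with the source topology,
# converts the saved checkpoint to universal format, then resumes with the
# target topology.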


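# Build the single-node deepspeed launcher prefix, e.g. get_launcher(4) ->
# ["deepspeed", "--num_nodes", "1", "--num_gpus", "4"].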
def get_launcher(num_accelerators):
    # --num_nodes is pinned to 1 in case these tests end up on a multi-node setup
    return f"deepspeed --num_nodes 1 --num_gpus {num_accelerators}".split()


@require_deepspeed
@require_torch_accelerator
class MegDSTestCheckpoints(TestCasePlus):
    """Checkpoint save / resume / reshape tests for Megatron-DeepSpeed."""

    def setUp(self):
        super().setUp()

        # remove a stale Megatron fused-kernels build lock left over from a
        # previous run, which would otherwise block kernel compilation
        meg_lock_file_path = self.repo_root_dir_str + "/megatron/fused_kernels/build/lock"
        if os.path.exists(meg_lock_file_path):
            os.unlink(meg_lock_file_path)

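    # grep-like helper: return all captured-output lines containing `pattern`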
    @staticmethod
    def find_lines_with_pattern_in_buffer(buffer, pattern):
        return [line for line in buffer.splitlines() if pattern in line]

    def get_config(self, output_dir, tp_size, pp_size, dp_size, n_iters=None,
                   exit_interval=None, save_interval=None, skip_train=False,
                   use_bloom=False):
        # prefer the HL_DATA_DIR_ROOT override, otherwise fall back to the
        # repo's bundled gpt2 test data
        data_dir = os.getenv("HL_DATA_DIR_ROOT", "")
        if not data_dir:
            data_dir = f"{self.data_dir}/gpt2"

        num_accelerators = pp_size * tp_size * dp_size
        print(f"Using {num_accelerators} accelerators")

        # defaults: a short 8-iteration run that exits halfway through and
        # saves a checkpoint every iteration
        n_iters = 8 if n_iters is None else n_iters
        exit_interval = n_iters // 2 if exit_interval is None else exit_interval
        save_interval = 1 if save_interval is None else save_interval
        seq_len = 8

        ds_args = f"""
            --deepspeed
            --deepspeed_config {self.test_file_dir_str}/ds_config_bf16.json
            --zero-stage 0
            --deepspeed-activation-checkpointing
        """.split()

        # hccl is the Habana Gaudi collectives backend (the CUDA equivalent
        # would be nccl)
        args = f"""
            --tensor-model-parallel-size {tp_size}
            --pipeline-model-parallel-size {pp_size}
            --distributed-backend hccl

            --log-interval 1
            --save-interval {save_interval}
            --eval-interval 10
            --eval-iters 1
            --exit-interval {exit_interval}

            --merge-file {data_dir}/merges.txt
            --vocab-file {data_dir}/vocab.json
            --data-path {data_dir}/c4_en_6_c4_spm_text_document

            --split 99,0,1
            --save {output_dir}/checkpoints
            --load {output_dir}/checkpoints

            --num-layers 2
            --hidden-size 8
            --num-attention-heads 2
            --seq-length {seq_len}
            --max-position-embeddings 8
            --micro-batch-size 1
            --global-batch-size 16
            --train-iters {n_iters}

            --recompute-granularity=full
            --recompute-method=uniform
            --partition-activations

            --optimizer adam
            --adam-beta1 0.9
            --adam-beta2 0.95
            --adam-eps 1e-8
            --lr 1e-4
            --lr-warmup-iters 1
            --lr-decay-iters 6
            --clip-grad 1.0
            --weight-decay 1e-1
            --bf16
            --no-gradient-accumulation-fusion
        """

        # tensorboard args: intentionally unused for now, kept handy for debugging
        _ = f"""
            --tensorboard-dir {output_dir}/tensorboard
            --tensorboard-queue-size 5
            --log-timers-to-tensorboard
            --log-batch-size-to-tensorboard
            --log-validation-ppl-to-tensorboard
        """

        args = args.split()

        if skip_train:
            args.append("--skip-train")

        if use_bloom:
            bloom_args = """
                --embed-layernorm
                --use-alibi-position-embeddings
                --use-fused-sdpa 0
            """.split()
            args.extend(bloom_args)

        return args, ds_args, num_accelerators

    def train_checkpoint(self, output_dir, tp_size=1, pp_size=1, dp_size=1,
                         n_iters=None, exit_interval=None, save_interval=None,
                         skip_train=False, use_bloom=False):
        src_dir = self.src_dir
        script = [f"{src_dir}/pretrain_gpt.py"]

        args, ds_args, num_accelerators = self.get_config(output_dir, tp_size, pp_size, dp_size,
                                                          n_iters=n_iters, exit_interval=exit_interval,
                                                          save_interval=save_interval,
                                                          skip_train=skip_train, use_bloom=use_bloom)
        launcher = get_launcher(num_accelerators)
        cmd = launcher + script + args + ds_args

        with CaptureStdout() as cs:
            execute_subprocess_async(cmd, env=self.get_env())

        # test that deepspeed is running
        self.assertIn("DeepSpeed info", cs.out)

        # test that training progressed
        self.assertIn("consumed samples", cs.out)

        # test that no previous checkpoint was found, i.e. training started from scratch
        self.assertIn(f"Unable to find latest file at {output_dir}/checkpoints/latest", cs.out)

        # test that a checkpoint was saved
        self.assertIn("successfully saved checkpoint at iteration", cs.out)
        return cs.out
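    # ds_to_universal.py is DeepSpeed's converter script: it merges the
    # TP/PP/DP-sharded files of a checkpoint into a topology-agnostic
    # "universal" layout that can be reloaded under a different parallelism
    # configuration.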
    def convert_checkpoint_to_universal(self, output_dir, step):
        DEEPSPEED_ROOT = os.getenv("DEEPSPEED_FORK_ROOT", "")
        assert DEEPSPEED_ROOT, "please set DEEPSPEED_FORK_ROOT to the DeepSpeed fork's path"
        cmd = f"""
            python {DEEPSPEED_ROOT}/deepspeed/checkpoint/ds_to_universal.py
            --input_folder {output_dir}/checkpoints/global_step{step}
            --output_folder {output_dir}/checkpoints/global_step{step}_universal
        """.split()

        with CaptureStdout() as cs:
            execute_subprocess_async(cmd, env=self.get_env())

        self.assertIn("Convert DeepSpeed Checkpoint to Universal Checkpoint", cs.out)

    def resume_from_checkpoint(self, output_dir, tp_size=1, pp_size=1, dp_size=1):
        src_dir = self.src_dir
        script = [f"{src_dir}/pretrain_gpt.py"]

        args, ds_args, num_accelerators = self.get_config(output_dir, tp_size, pp_size, dp_size)
        launcher = get_launcher(num_accelerators)
        cmd = launcher + script + args + ds_args

        with CaptureStdout() as cs:
            execute_subprocess_async(cmd, env=self.get_env())

        # test that the checkpoint was loaded
        self.assertIn(f"successfully loaded checkpoint from {output_dir}/checkpoints", cs.out)

        # test that training resumed
        self.assertIn("consumed samples", cs.out)

        # test that a new checkpoint was saved
        self.assertIn("successfully saved checkpoint at iteration", cs.out)
        return cs.out

    def resume_from_universal_checkpoint(self, output_dir, tp_size=1, pp_size=1, dp_size=1,
                                         n_iters=None, exit_interval=None, save_interval=None,
                                         skip_train=False, use_bloom=False):
        src_dir = self.src_dir
        script = [f"{src_dir}/pretrain_gpt.py"]

        args, ds_args, num_accelerators = self.get_config(output_dir, tp_size, pp_size, dp_size,
                                                          n_iters=n_iters, exit_interval=exit_interval,
                                                          save_interval=save_interval,
                                                          skip_train=skip_train, use_bloom=use_bloom)
        launcher = get_launcher(num_accelerators)
        # get_config already appends --skip-train when requested; only the
        # universal-checkpoint flag needs to be added here
        extra_args = ["--universal-checkpoint"]

        cmd = launcher + script + args + ds_args + extra_args

        with CaptureStdout() as cs:
            execute_subprocess_async(cmd, env=self.get_env())

        # test that the universal checkpoint was loaded
        self.assertIn(f"successfully loaded checkpoint from {output_dir}/checkpoints", cs.out)

        # training happens only when it was not skipped
        if not skip_train:
            self.assertIn("consumed samples", cs.out)

        # test that a checkpoint was saved
        self.assertIn("successfully saved checkpoint at iteration", cs.out)
        return cs.out
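    # Megatron-DeepSpeed locates the checkpoint to load via the "latest" (or
    # "latest_universal") tag file inside the checkpoints dir, so the copy has
    # to recreate that tag file in the destination.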
    @staticmethod
    def copy_checkpoint(src_ckp_root, dst_ckp_root, ckp_name, is_universal=False):
        src_root = os.path.join(src_ckp_root, 'checkpoints')
        dst_root = os.path.join(dst_ckp_root, 'checkpoints')
        os.makedirs(dst_root, exist_ok=True)
        src_folder = os.path.join(src_root, ckp_name)
        dst_folder = os.path.join(dst_root, ckp_name)
        shutil.copytree(src=src_folder, dst=dst_folder)
        latest_filename = 'latest_universal' if is_universal else 'latest'
        dst_latest = os.path.join(dst_root, latest_filename)
        with open(dst_latest, "w") as f:
            f.write(ckp_name)
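    # NOTE: the leading underscore keeps the two tests below out of pytest's
    # collection; drop it to re-enable them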
    @require_torch_multi_accelerator
    @parameterized.expand(params, name_func=parameterized_custom_name_func)
    def _test_checkpoint_reshaping_main(self, src, tgt):
        # reshape a checkpoint from the source topology to the target topology
        tp_size_src, pp_size_src, dp_size_src = map(int, src.split('_'))
        tp_size_tgt, pp_size_tgt, dp_size_tgt = map(int, tgt.split('_'))

        n_accelerators = get_accelerator_count()
        n_accelerators_src = tp_size_src * pp_size_src * dp_size_src
        n_accelerators_tgt = tp_size_tgt * pp_size_tgt * dp_size_tgt

        if n_accelerators_src > n_accelerators:
            pytest.skip(f"the test requires {n_accelerators_src} accelerators for the source topology but only {n_accelerators} are available")
        if n_accelerators_tgt > n_accelerators:
            pytest.skip(f"the test requires {n_accelerators_tgt} accelerators for the target topology but only {n_accelerators} are available")

        # a fixed local dir that is kept around (after=False) for debugging
        output_dir = self.get_auto_remove_tmp_dir("./xxx", after=False)

        # 1. train with the source topology
        self.train_checkpoint(output_dir, tp_size=tp_size_src, pp_size=pp_size_src, dp_size=dp_size_src)

        # 2. convert the saved checkpoint to a universal checkpoint
        self.convert_checkpoint_to_universal(output_dir=output_dir, step=1)

        # 3. check that training can resume with the target topology
        self.resume_from_universal_checkpoint(output_dir, tp_size=tp_size_tgt, pp_size=pp_size_tgt, dp_size=dp_size_tgt)

    @require_torch_multi_accelerator
    def _test_checkpoint_reshaping_empty_dir(self):
        # converting from an empty checkpoint dir must fail
        output_dir = self.get_auto_remove_tmp_dir()
        with self.assertRaises(RuntimeError):
            self.convert_checkpoint_to_universal(output_dir=output_dir, step=1)

    @require_torch_multi_accelerator
    @parameterized.expand([True, False])
    def test_checkpoint_reshaping_2x2x2_to_2x2x1_to_2x2x2(self, use_bloom):
        # round-trip: 2x2x2 -> universal -> 2x2x1 -> universal -> 2x2x2, then
        # check that the final test loss matches the original run
        tp_size_src, pp_size_src, dp_size_src = 2, 2, 2
        tp_size_tgt, pp_size_tgt, dp_size_tgt = 2, 2, 1

        n_accelerators = get_accelerator_count()
        n_accelerators_src = tp_size_src * pp_size_src * dp_size_src
        n_accelerators_tgt = tp_size_tgt * pp_size_tgt * dp_size_tgt
        n_required_accelerators = max(n_accelerators_src, n_accelerators_tgt)
        if n_required_accelerators > n_accelerators:
            pytest.skip(f"the test requires {n_required_accelerators} accelerators but only {n_accelerators} are available")

        root_dir = self.get_auto_remove_tmp_dir(after=True)
        output_2x2x2_dir = os.path.join(root_dir, 'topo_2x2x2')
        output_2x2x1_dir = os.path.join(root_dir, 'topo_2x2x1')
        output_2x2x2_final_dir = os.path.join(root_dir, 'topo_2x2x2_final')

        total_n_iters = 20
        checkpoint_iter = total_n_iters // 2

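        # saving at the halfway point leaves a mid-run checkpoint to reshape
        # and enough remaining iterations to make the final-loss comparison
        # meaningful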
        # 1. train with the 2x2x2 topology, saving a checkpoint halfway through
        out = self.train_checkpoint(output_2x2x2_dir,
                                    tp_size=tp_size_src,
                                    pp_size=pp_size_src,
                                    dp_size=dp_size_src,
                                    n_iters=total_n_iters,
                                    exit_interval=total_n_iters + 1,
                                    save_interval=checkpoint_iter,
                                    use_bloom=use_bloom)

        match = re.search(r'test set \| lm loss value: (\d+\.\d+E\+\d+)', out)
        assert match is not None, 'test set loss not found in the original 2x2x2 training output'
        orig_2x2x2_test_loss = float(match.group(1))

        # 2. convert the halfway checkpoint to a universal checkpoint
        self.convert_checkpoint_to_universal(output_dir=output_2x2x2_dir, step=checkpoint_iter)

        # 3. copy the universal checkpoint into a fresh dir for the 2x2x1 topology
        univ_ckp_name = f'global_step{checkpoint_iter}_universal'
        self.copy_checkpoint(src_ckp_root=output_2x2x2_dir,
                             dst_ckp_root=output_2x2x1_dir,
                             ckp_name=univ_ckp_name,
                             is_universal=True)

        # 4. load the universal checkpoint with the 2x2x1 topology and
        # immediately save it as a regular checkpoint (skip_train=True, so no
        # training happens in between)
        self.resume_from_universal_checkpoint(output_2x2x1_dir,
                                              tp_size=tp_size_tgt,
                                              pp_size=pp_size_tgt,
                                              dp_size=dp_size_tgt,
                                              n_iters=total_n_iters,
                                              exit_interval=checkpoint_iter,
                                              save_interval=total_n_iters,
                                              skip_train=True,
                                              use_bloom=use_bloom)

        # 5. copy the checkpoint saved by the 2x2x1 run into the final 2x2x2 dir
        ckp_name = f'global_step{checkpoint_iter}'
        self.copy_checkpoint(src_ckp_root=output_2x2x1_dir,
                             dst_ckp_root=output_2x2x2_final_dir,
                             ckp_name=ckp_name,
                             is_universal=False)

        # 6. convert it to a universal checkpoint again
        self.convert_checkpoint_to_universal(output_dir=output_2x2x2_final_dir, step=checkpoint_iter)

        # 7. resume with the original 2x2x2 topology and train to completion
        out = self.resume_from_universal_checkpoint(output_2x2x2_final_dir,
                                                    tp_size=tp_size_src,
                                                    pp_size=pp_size_src,
                                                    dp_size=dp_size_src,
                                                    n_iters=total_n_iters,
                                                    exit_interval=total_n_iters + 1,
                                                    save_interval=total_n_iters,
                                                    use_bloom=use_bloom)
        match = re.search(r'test set \| lm loss value: (\d+\.\d+E\+\d+)', out)
        assert match is not None, 'test set loss not found in the final 2x2x2 training output'
        final_2x2x2_test_loss = float(match.group(1))

        # the reshape round-trip must be lossless: the final test loss has to
        # match the original 2x2x2 run exactly
        assert orig_2x2x2_test_loss == final_2x2x2_test_loss