# Copyright (C) 2024 Habana Labs, Ltd. an Intel Company.
# Copyright 2020 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""End-to-end tests for reshaping Megatron-DeepSpeed checkpoints via the
universal-checkpoint mechanism: train with one TP/PP/DP topology, convert the
checkpoint to universal format, then resume training with a different topology
and verify the run (and final test loss) is preserved."""

import os
import re
import shutil

import pytest
from parameterized import parameterized

from testing_utils import (
    CaptureStdout,
    TestCasePlus,
    execute_subprocess_async,
    get_accelerator_count,
    require_deepspeed,
    require_torch_accelerator,
    require_torch_multi_accelerator,
    set_seed,
)

set_seed(42)

# Matches e.g. "test set | lm loss value: 1.234567E+01" and captures the value.
# NOTE(fix): the original pattern was a non-raw string (invalid escape
# sequences like "\d" — deprecated, a future SyntaxError) and contained the
# typo "E+\++" where the literal scientific-notation marker "E+" was intended.
TEST_LOSS_RE = re.compile(r"test set \| lm loss value: (\d+\.\d+E\+\d+)")


def parameterized_custom_name_func(func, param_num, param):
    """Build a subtest name that contains *both* params.

    By default ``parameterized`` shows only the first param in the subtest
    name; we want e.g. ``test_x_1_1_2_to_2_1_1``.
    """
    param_based_name = parameterized.to_safe_name("_to_".join(str(x) for x in param.args))
    return f"{func.__name__}_{param_based_name}"


# Pairs of (source, target) topologies, each encoded as "TP_PP_DP".
params = [
    # TP_PP_DP
    ["1_1_1", "1_1_1"],
    ["2_1_1", "1_1_1"],
    ["1_2_1", "1_1_1"],
    ["1_1_2", "1_1_1"],
    ["1_1_1", "2_1_1"],
    ["1_1_1", "1_2_1"],
    ["1_1_1", "1_1_2"],
    ["1_1_2", "1_1_2"],
    ["1_1_2", "2_1_1"],
    ["1_1_2", "1_2_1"],
    ["1_2_1", "1_2_1"],
    ["1_2_1", "2_1_1"],
    ["1_2_1", "1_1_2"],
    ["2_1_1", "2_1_1"],
    ["2_1_1", "1_2_1"],
    ["2_1_1", "1_1_2"],
    ["2_2_2", "1_1_1"],
    ["2_2_2", "2_2_2"],
    ["1_1_1", "2_2_2"],
    ["1_1_8", "2_2_2"],
]


def get_launcher(num_accelerators):
    """Return the ``deepspeed`` launcher command as an argv list.

    ``--num_nodes 1`` is set explicitly just in case these tests end up run on
    a multi-node setup — the tests won't be able to handle that.
    """
    return f"deepspeed --num_nodes 1 --num_gpus {num_accelerators}".split()


@require_deepspeed
@require_torch_accelerator
class MegDSTestCheckpoints(TestCasePlus):
    """Checkpoint save / universal-convert / reshaped-resume test suite."""

    def setUp(self):
        super().setUp()

        # At times megatron fails to build kernels and doesn't remove the lock
        # file, which makes subsequent runs hang — so make sure there is no
        # lock when starting the testing.
        meg_lock_file_path = self.repo_root_dir_str + "/megatron/fused_kernels/build/lock"
        if os.path.exists(meg_lock_file_path):
            os.unlink(meg_lock_file_path)

    @staticmethod
    def find_lines_with_pattern_in_buffer(buffer, pattern):
        """Return all lines of ``buffer`` that contain the substring ``pattern``."""
        return [line for line in buffer.splitlines() if pattern in line]

    def get_config(self, output_dir, tp_size, pp_size, dp_size, n_iters=None,
                   exit_interval=None, save_interval=None, skip_train=False,
                   use_bloom=False):
        """Build (args, ds_args, num_accelerators) for a pretrain_gpt.py run.

        ``n_iters``/``exit_interval``/``save_interval`` default to a short
        8-iteration run that exits halfway and saves every iteration.
        ``skip_train`` adds ``--skip-train``; ``use_bloom`` adds BLOOM-style
        architecture flags.
        """
        data_dir = os.getenv("HL_DATA_DIR_ROOT", "")
        if data_dir == "":
            data_dir = f"{self.data_dir}/gpt2"

        num_accelerators = pp_size * tp_size * dp_size
        print(f"Using {num_accelerators} Accelerators")

        n_iters = 8 if n_iters is None else n_iters
        exit_interval = n_iters // 2 if exit_interval is None else exit_interval
        save_interval = 1 if save_interval is None else save_interval
        seq_len = 8

        # common/shared configs
        ds_args = f"""
            --deepspeed
            --deepspeed_config {self.test_file_dir_str}/ds_config_bf16.json
            --zero-stage 0
            --deepspeed-activation-checkpointing
        """.split()

        args = f"""
            --tensor-model-parallel-size {tp_size}
            --pipeline-model-parallel-size {pp_size}
            --distributed-backend hccl
            --log-interval 1
            --save-interval {save_interval}
            --eval-interval 10
            --eval-iters 1
            --exit-interval {exit_interval}
            --merge-file {data_dir}/merges.txt
            --vocab-file {data_dir}/vocab.json
            --data-path {data_dir}/c4_en_6_c4_spm_text_document
            --split 99,0,1
            --save {output_dir}/checkpoints
            --load {output_dir}/checkpoints
            --num-layers 2
            --hidden-size 8
            --num-attention-heads 2
            --seq-length {seq_len}
            --max-position-embeddings 8
            --micro-batch-size 1
            --global-batch-size 16
            --train-iters {n_iters}
            --recompute-granularity=full
            --recompute-method=uniform
            --partition-activations
            --optimizer adam
            --adam-beta1 0.9
            --adam-beta2 0.95
            --adam-eps 1e-8
            --lr 1e-4
            --lr-warmup-iters 1
            --lr-decay-iters 6
            --clip-grad 1.0
            --weight-decay 1e-1
            --bf16
            --no-gradient-accumulation-fusion
        """.split()

        # removed below args to speedup test
        # --tensorboard-dir {output_dir}/tensorboard
        # --tensorboard-queue-size 5
        # --log-timers-to-tensorboard
        # --log-batch-size-to-tensorboard
        # --log-validation-ppl-to-tensorboard

        # NOTE(fix): previously the flag was appended to the raw string before
        # splitting, which relied on trailing whitespace in the f-string;
        # appending to the list is equivalent and robust.
        if skip_train:
            args.append("--skip-train")

        if use_bloom:
            bloom_args = """
                --embed-layernorm
                --use-alibi-position-embeddings
                --use-fused-sdpa 0
            """.split()
            args.extend(bloom_args)

        return args, ds_args, num_accelerators

    def train_checkpoint(self, output_dir, tp_size=1, pp_size=1, dp_size=1,
                         n_iters=None, exit_interval=None, save_interval=None,
                         skip_train=False, use_bloom=False):
        """Train from scratch (no checkpoint) and return the captured stdout."""
        src_dir = self.src_dir
        script = [f"{src_dir}/pretrain_gpt.py"]

        args, ds_args, num_accelerators = self.get_config(
            output_dir, tp_size, pp_size, dp_size,
            n_iters=n_iters, exit_interval=exit_interval,
            save_interval=save_interval, skip_train=skip_train,
            use_bloom=use_bloom)
        launcher = get_launcher(num_accelerators)
        cmd = launcher + script + args + ds_args
        # keep for quick debug
        # print(" ".join([f"\nPYTHONPATH={self.src_dir_str}"] + cmd)); die

        # 1. test training from scratch (no checkpoint)
        with CaptureStdout() as cs:
            execute_subprocess_async(cmd, env=self.get_env())

        # test deepspeed is running
        self.assertIn("DeepSpeed info", cs.out)

        # test reports
        self.assertIn("consumed samples", cs.out)

        # test there should be no checkpoint this round
        self.assertIn(f"Unable to find latest file at {output_dir}/checkpoints/latest", cs.out)

        # test checkpoint saving
        self.assertIn("successfully saved checkpoint at iteration", cs.out)

        return cs.out

    def convert_checkpoint_to_universal(self, output_dir, step):
        """Run ds_to_universal.py on the global_step{step} checkpoint.

        Requires the DEEPSPEED_FORK_ROOT env var to point at a deepspeed
        checkout containing ``deepspeed/checkpoint/ds_to_universal.py``.
        """
        DEEPSPEED_ROOT = os.getenv("DEEPSPEED_FORK_ROOT", "")
        if DEEPSPEED_ROOT == "":
            # NOTE(fix): was `assert False, ...` which is silently stripped
            # under `python -O`; raise the same exception type explicitly.
            raise AssertionError("please set DEEPSPEED_FORK_ROOT to deepspeed path")

        cmd = f"""
            python {DEEPSPEED_ROOT}/deepspeed/checkpoint/ds_to_universal.py
            --input_folder {output_dir}/checkpoints/global_step{step}
            --output_folder {output_dir}/checkpoints/global_step{step}_universal
        """.split()

        # keep for quick debug
        # print(" ".join([f"\nPYTHONPATH={self.src_dir_str}"] + cmd)); die

        with CaptureStdout() as cs:
            execute_subprocess_async(cmd, env=self.get_env())

        self.assertIn("Convert DeepSpeed Checkpoint to Universal Checkpoint", cs.out)

    def resume_from_checkpoint(self, output_dir, tp_size=1, pp_size=1, dp_size=1):
        """Resume training from a regular (non-universal) checkpoint."""
        src_dir = self.src_dir
        script = [f"{src_dir}/pretrain_gpt.py"]

        args, ds_args, num_accelerators = self.get_config(output_dir, tp_size, pp_size, dp_size)
        launcher = get_launcher(num_accelerators)
        cmd = launcher + script + args + ds_args
        # keep for quick debug
        # print(" ".join([f"\nPYTHONPATH={self.src_dir_str}"] + cmd)); die

        with CaptureStdout() as cs:
            execute_subprocess_async(cmd, env=self.get_env())

        # test checkpoint loading
        self.assertIn(f"successfully loaded checkpoint from {output_dir}/checkpoints", cs.out)

        # test reports
        self.assertIn("consumed samples", cs.out)

        # test checkpoint saving
        self.assertIn("successfully saved checkpoint at iteration", cs.out)

        return cs.out

    def resume_from_universal_checkpoint(self, output_dir, tp_size=1, pp_size=1,
                                         dp_size=1, n_iters=None,
                                         exit_interval=None, save_interval=None,
                                         skip_train=False, use_bloom=False):
        """Resume training from a universal checkpoint and return stdout."""
        src_dir = self.src_dir
        script = [f"{src_dir}/pretrain_gpt.py"]

        args, ds_args, num_accelerators = self.get_config(
            output_dir, tp_size, pp_size, dp_size,
            n_iters=n_iters, exit_interval=exit_interval,
            save_interval=save_interval, skip_train=skip_train,
            use_bloom=use_bloom)
        launcher = get_launcher(num_accelerators)
        # NOTE(fix): --skip-train is already injected by get_config when
        # skip_train=True; previously it was appended a second time here,
        # duplicating the flag on the command line.
        cmd = launcher + script + args + ds_args + ["--universal-checkpoint"]
        # keep for quick debug
        # print(" ".join([f"\nPYTHONPATH={self.src_dir_str}"] + cmd)); die

        with CaptureStdout() as cs:
            execute_subprocess_async(cmd, env=self.get_env())

        # test checkpoint loading
        self.assertIn(f"successfully loaded checkpoint from {output_dir}/checkpoints", cs.out)

        # test reports
        if not skip_train:
            self.assertIn("consumed samples", cs.out)

        # test checkpoint saving
        self.assertIn("successfully saved checkpoint at iteration", cs.out)

        return cs.out

    def _extract_test_loss(self, out, phase):
        """Return the test-set lm loss parsed from ``out``; fail the test if absent."""
        match = TEST_LOSS_RE.search(out)
        if match is None:
            self.fail(f"Not found test set loss in {phase} training")
        return float(match.group(1))

    @staticmethod
    def copy_checkpoint(src_ckp_root, dst_ckp_root, ckp_name, is_universal=False):
        """Copy one checkpoint folder to another root and write its latest-pointer file."""
        src_root = os.path.join(src_ckp_root, 'checkpoints')
        dst_root = os.path.join(dst_ckp_root, 'checkpoints')
        os.makedirs(dst_root, exist_ok=True)
        src_folder = os.path.join(src_root, ckp_name)
        dst_folder = os.path.join(dst_root, ckp_name)
        shutil.copytree(src=src_folder, dst=dst_folder)
        latest_filename = 'latest_universal' if is_universal else 'latest'
        dst_latest = os.path.join(dst_root, latest_filename)
        with open(dst_latest, "w") as f:
            f.write(ckp_name)

    @require_torch_multi_accelerator
    @parameterized.expand(params, name_func=parameterized_custom_name_func)
    def _test_checkpoint_reshaping_main(self, src, tgt):
        # this test needs at least 2 accelerators - if there are more
        # accelerators it will do more extensive testing

        tp_size_src, pp_size_src, dp_size_src = list(map(int, src.split('_')))
        tp_size_tgt, pp_size_tgt, dp_size_tgt = list(map(int, tgt.split('_')))

        n_accelerators = get_accelerator_count()
        n_accelerators_src = tp_size_src * pp_size_src * dp_size_src
        n_accelerators_tgt = tp_size_tgt * pp_size_tgt * dp_size_tgt

        if n_accelerators_src > n_accelerators:
            pytest.skip(f"the test requires {n_accelerators_src} accelerators for source topology but have only {n_accelerators}")
        if n_accelerators_tgt > n_accelerators:
            pytest.skip(f"the test requires {n_accelerators_tgt} accelerators for target topology but have only {n_accelerators}")

        output_dir = self.get_auto_remove_tmp_dir("./xxx", after=False)

        # 1. train with initial topology defined in the first arg of params
        self.train_checkpoint(output_dir, tp_size=tp_size_src,
                              pp_size=pp_size_src, dp_size=dp_size_src)

        # 2. convert checkpoint to universal checkpoint (topology )
        self.convert_checkpoint_to_universal(output_dir=output_dir, step=1)

        # 3. check we can resume training from a reshaped checkpoint to the
        # target topology - the last arg of params
        self.resume_from_universal_checkpoint(output_dir, tp_size=tp_size_tgt,
                                              pp_size=pp_size_tgt, dp_size=dp_size_tgt)

    @require_torch_multi_accelerator
    def _test_checkpoint_reshaping_empty_dir(self):
        output_dir = self.get_auto_remove_tmp_dir()
        # conversion of an empty dir is expected to blow up in the subprocess
        with self.assertRaises(RuntimeError):
            self.convert_checkpoint_to_universal(output_dir=output_dir, step=1)

    @require_torch_multi_accelerator
    @parameterized.expand([True, False])
    def test_checkpoint_reshaping_2x2x2_to_2x2x1_to_2x2x2(self, use_bloom):
        # this test needs at least 8 accelerators
        tp_size_src, pp_size_src, dp_size_src = 2, 2, 2
        tp_size_tgt, pp_size_tgt, dp_size_tgt = 2, 2, 1

        n_accelerators = get_accelerator_count()
        n_accelerators_src = tp_size_src * pp_size_src * dp_size_src
        n_accelerators_tgt = tp_size_tgt * pp_size_tgt * dp_size_tgt
        n_required_accelerators = max(n_accelerators_src, n_accelerators_tgt)
        if n_required_accelerators > n_accelerators:
            pytest.skip(f"the test requires {n_required_accelerators} accelerators but have only {n_accelerators}")

        root_dir = self.get_auto_remove_tmp_dir(after=True)
        output_2x2x2_dir = os.path.join(root_dir, 'topo_2x2x2')
        output_2x2x1_dir = os.path.join(root_dir, 'topo_2x2x1')
        output_2x2x2_final_dir = os.path.join(root_dir, 'topo_2x2x2_final')

        total_n_iters = 20
        checkpoint_iter = total_n_iters // 2

        # 1. train with initial 2x2x2 topology
        out = self.train_checkpoint(output_2x2x2_dir, tp_size=tp_size_src,
                                    pp_size=pp_size_src, dp_size=dp_size_src,
                                    n_iters=total_n_iters,
                                    exit_interval=total_n_iters + 1,
                                    save_interval=checkpoint_iter,
                                    use_bloom=use_bloom)
        orig_2x2x2_test_loss = self._extract_test_loss(out, 'original 2x2x2')

        # 2. convert 2x2x2 checkpoint to universal checkpoint
        self.convert_checkpoint_to_universal(output_dir=output_2x2x2_dir,
                                             step=checkpoint_iter)

        # 3. copy 2x2x2 universal checkpoint (step 10) to 2x2x1
        univ_ckp_name = f'global_step{checkpoint_iter}_universal'
        self.copy_checkpoint(src_ckp_root=output_2x2x2_dir,
                             dst_ckp_root=output_2x2x1_dir,
                             ckp_name=univ_ckp_name,
                             is_universal=True)

        # 3. use trainer to convert from universal to 2x2x1:
        # 3.1. load universal checkpoint
        # 3.1. skip actual training
        # 3.1. save checkpoint for 2x2x1 topology
        self.resume_from_universal_checkpoint(output_2x2x1_dir,
                                              tp_size=tp_size_tgt,
                                              pp_size=pp_size_tgt,
                                              dp_size=dp_size_tgt,
                                              n_iters=total_n_iters,
                                              exit_interval=checkpoint_iter,
                                              save_interval=total_n_iters,
                                              skip_train=True,
                                              use_bloom=use_bloom)

        # 4. copy 2x2x1 checkpoint (step 10) to 2x2x2_final
        ckp_name = f'global_step{checkpoint_iter}'
        self.copy_checkpoint(src_ckp_root=output_2x2x1_dir,
                             dst_ckp_root=output_2x2x2_final_dir,
                             ckp_name=ckp_name,
                             is_universal=False)

        # 5. convert 2x2x1 step 10 checkpoint to universal checkpoint
        self.convert_checkpoint_to_universal(output_dir=output_2x2x2_final_dir,
                                             step=checkpoint_iter)

        # 6. Load from universal created from 2x2x1 and resume training till end
        out = self.resume_from_universal_checkpoint(output_2x2x2_final_dir,
                                                    tp_size=tp_size_src,
                                                    pp_size=pp_size_src,
                                                    dp_size=dp_size_src,
                                                    n_iters=total_n_iters,
                                                    exit_interval=total_n_iters + 1,
                                                    save_interval=total_n_iters,
                                                    use_bloom=use_bloom)
        final_2x2x2_test_loss = self._extract_test_loss(out, 'final 2x2x2')

        # 7. Verify same test loss for original training and final training
        assert orig_2x2x2_test_loss == final_2x2x2_test_loss