import logging
import os
import sys
from typing import Dict, List, Optional, Tuple, Union
import mmcv
import torch.distributed as dist
from mmcv.runner.hooks.hook import HOOKS, Hook
from mmcv.runner.checkpoint import save_checkpoint
# from mmdet3d_plugin.models.utils import run_time
@HOOKS.register_module()
class GradChecker(Hook):
    """Warn about parameters that received no gradient in the last iteration."""

    def after_train_iter(self, runner):
        for key, val in runner.model.named_parameters():
            if val.grad is None and val.requires_grad:
                print(f"WARNING: {key}'s parameters are not being used!")
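# Usage sketch: once this module is imported (so `@HOOKS.register_module()`
# has run), GradChecker can be enabled from an mmcv config file. The module
# path below is a placeholder, not the real location of this file:
#
#     custom_imports = dict(imports=['tools.hooks'], allow_failed_imports=False)
#     custom_hooks = [dict(type='GradChecker')]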
@HOOKS.register_module()
class SamplerSkipIterationHook(Hook):
    """Tell the data-loading sampler to skip iterations when resuming mid-epoch.

    In distributed training this is only useful in conjunction with
    :obj:`EpochBasedRunner`; :obj:`IterBasedRunner` achieves the same
    purpose with :obj:`IterLoader`.
    """

    def __init__(self, out_dir=None):
        """Init routine."""
        self.out_dir = out_dir

    def before_train_epoch(self, runner):
        if hasattr(runner.data_loader.sampler, 'skip_iter_at_epoch_x'):
            # In case the data loader uses a plain sampler (e.g.
            # `SequentialSampler` in PyTorch).
            runner.data_loader.sampler.skip_iter_at_epoch_x(runner._inner_iter)
        elif hasattr(runner.data_loader.batch_sampler.sampler, 'skip_iter_at_epoch_x'):
            # The batch sampler in PyTorch wraps the sampler as one of its
            # attributes.
            runner.data_loader.batch_sampler.sampler.skip_iter_at_epoch_x(runner._inner_iter)
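# A minimal sketch of the sampler protocol SamplerSkipIterationHook expects.
# `skip_iter_at_epoch_x` is a project-specific method, not part of PyTorch;
# the class below is an illustrative assumption (not used by the hooks in
# this file) that wraps `DistributedSampler` so a resumed epoch fast-forwards
# past the batches it has already consumed.
from torch.utils.data import DistributedSampler


class ResumableDistributedSampler(DistributedSampler):
    """Hypothetical example sampler implementing `skip_iter_at_epoch_x`."""

    def __init__(self, dataset, samples_per_batch=1, **kwargs):
        super().__init__(dataset, **kwargs)
        self.samples_per_batch = samples_per_batch
        self._skip_batches = 0

    def skip_iter_at_epoch_x(self, inner_iter):
        # The hook passes runner._inner_iter: the number of batches already
        # consumed in the interrupted epoch.
        self._skip_batches = inner_iter

    def __iter__(self):
        indices = list(super().__iter__())
        if self._skip_batches:
            # Convert skipped batches to skipped per-rank sample indices.
            indices = indices[self._skip_batches * self.samples_per_batch:]
            self._skip_batches = 0  # only skip once, on the resumed epoch
        return iter(indices)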
_logger = logging.getLogger("autoresume_hook")
sys.path.append(os.environ.get("SUBMIT_SCRIPTS", "."))
try:
    _logger.info("Importing AutoResume lib...")
    from userlib.auto_resume import AutoResume
    AutoResume.init()
    _logger.info("Found AutoResume SDK!")
except Exception:
    _logger.info("Did not find AutoResume SDK!")
    AutoResume = None
@HOOKS.register_module()
class AutoResumeHook(Hook):
    """AutoResume hook.

    A hook to interface with ADLR's AutoResume SDK.

    In order to use this hook, you must first import the AutoResume SDK
    in the main training script:

        sys.path.append(os.environ.get("SUBMIT_SCRIPTS", "."))
        try:
            _logger.info("Importing AutoResume lib...")
            from userlib.auto_resume import AutoResume
            AutoResume.init()
            _logger.info("Success!")
        except Exception:
            _logger.info("Failed!")
            AutoResume = None

    Also make sure you import the code for the auto-resume hook:

        import autoresume_hook

    In the main initialization routine, set the `resume` flag in the MMCV
    config depending on whether the job is being resumed:

        if AutoResume is not None:
            auto_resume_details = AutoResume.get_resume_details()
            if auto_resume_details is not None:
                print_log(f"AutoResume details: {auto_resume_details}")
                cfg.resume = True

    Finally, in your MMSEG config, add the following statement:

        # Hook for auto-suspend/resume on ADLR clusters.
        custom_hooks = [dict(type="AutoResumeHook", interval=2000)]

    Args:
        interval: Interval (in number of iterations) between checks as to
            whether to suspend.
    """
    def __init__(self, interval: int = 1000, out_dir=None):
        """Init routine."""
        self.interval = interval
        self.out_dir = out_dir

    def every_n_train_iters(self, runner, n):
        # Unlike mmcv's built-in `Hook.every_n_iters`, which fires when
        # `(runner.iter + 1) % n == 0`, this check fires at the start of each
        # interval (`runner.iter % n == 0`).
        return runner.iter % n == 0 if n > 0 else False
    def after_train_iter(self, runner: mmcv.runner.BaseRunner) -> None:
        """Training iteration post hook.

        Args:
            runner: The runner of the training process.
        """
        if self.every_n_train_iters(runner, self.interval):
            if dist.is_initialized():
                global_rank = dist.get_rank()
            else:
                global_rank = 0

            runner.logger.info("AutoResumeHook: Checking whether to suspend...")
            # Check whether to suspend the job.
            should_preempt = (
                AutoResume is not None and AutoResume.termination_requested()
            )
            if should_preempt:
                # Before suspending, save a checkpoint so the job can resume
                # exactly where it left off. `meta` records the position in
                # the training schedule; see
                # https://mmcv.readthedocs.io/en/master/_modules/mmcv/runner/epoch_based_runner.html
                meta = {
                    'epoch': runner.epoch,
                    'iter': runner.iter,
                    'inner_iter': runner.inner_iter,
                }
                # Save (and request the relaunch) on the master rank only, so
                # that multiple ranks do not write the same file concurrently.
                if global_rank == 0:
                    runner.logger.info(f"Saving checkpoint with meta {meta}")
                    runner.logger.info("Refreshing latest_iter.pth")
                    filename = (
                        f"iter_{runner.iter + 1:010d}"
                        f"_epoch_{runner.epoch:04d}"
                        f"_inneriter_{runner.inner_iter + 1:08d}.pth"
                    )
                    filepath = os.path.join(self.out_dir, filename)
                    save_checkpoint(
                        runner.model, filepath,
                        optimizer=runner.optimizer, meta=meta)
                    # Create a customized symbolic link to the latest
                    # checkpoint.
                    dst_file = os.path.join(self.out_dir, 'latest_iter.pth')
                    mmcv.symlink(filename, dst_file)
                    runner.logger.info("AutoResumeHook: Requesting resume...")
                    AutoResume.request_resume()
                # Stop the program on every rank.
                runner.logger.info("AutoResumeHook: Suspending the job...")
                sys.exit(0)
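# Sketch of the resume-side wiring the AutoResumeHook docstring describes:
# when the SDK reports that this launch is a resumption, point MMCV at the
# checkpoint the hook saved. `cfg` and its attributes are assumptions for
# illustration, and `self.out_dir` above must match `cfg.work_dir` here.
def maybe_resume_from_latest(cfg):
    """Hypothetical helper: resume from the checkpoint left by AutoResumeHook."""
    if AutoResume is not None and AutoResume.get_resume_details() is not None:
        cfg.resume_from = os.path.join(cfg.work_dir, 'latest_iter.pth')
    return cfg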