| |
| import copy |
| import datetime |
| import re |
| from collections import OrderedDict |
| from itertools import chain |
| from typing import List, Optional, Tuple |
|
|
| import numpy as np |
| import torch |
|
|
| from mmengine.device import (get_max_cuda_memory, get_max_musa_memory, |
| is_cuda_available, is_musa_available) |
| from mmengine.registry import LOG_PROCESSORS |
|
|
|
|
| @LOG_PROCESSORS.register_module() |
class LogProcessor:
    """A log processor used to format log information collected from
    ``runner.message_hub.log_scalars``.

    ``LogProcessor`` instance is built by runner and will format
    ``runner.message_hub.log_scalars`` to ``tag`` and ``log_str``, which can
    be directly used by ``LoggerHook`` and ``MMLogger``. Besides, the argument
    ``custom_cfg`` of constructor can control the statistics method of logs.

    Args:
        window_size (int): default smooth interval. Defaults to 10.
        by_epoch (bool): Whether to format logs with epoch style. Defaults to
            True.
        custom_cfg (list[dict], optional): Contains multiple log config dict,
            in which key means the data source name of log and value means the
            statistic method and corresponding arguments used to count the
            data source. Defaults to None.

            - If custom_cfg is None, all logs will be formatted via default
              methods, such as smoothing loss by default window_size. If
              custom_cfg is defined as a list of config dict, for example:
              [dict(data_src='loss', method='mean', log_name='global_loss',
              window_size='global')]. It means the log item ``loss`` will be
              counted as global mean and additionally logged as ``global_loss``
              (defined by ``log_name``). If ``log_name`` is not defined in
              config dict, the original logged key will be overwritten.

            - The original log item cannot be overwritten twice. Here is
              an error example:
              [dict(data_src='loss', method='mean', window_size='global'),
              dict(data_src='loss', method='mean', window_size='epoch')].
              Both log config dict in custom_cfg do not have ``log_name`` key,
              which means the loss item will be overwritten twice.

            - For those statistic methods with the ``window_size`` argument,
              if ``by_epoch`` is set to False, ``window_size`` should not be
              `epoch` to statistics log value by epoch.
        num_digits (int): The number of significant digit shown in the
            logging message. Defaults to 4.
        log_with_hierarchy (bool): Whether to log with hierarchy. If it is
            True, the information is written to visualizer backend such as
            :obj:`LocalVisBackend` and :obj:`TensorboardBackend`
            with hierarchy. For example, ``loss`` will be saved as
            ``train/loss``, and accuracy will be saved as ``val/accuracy``.
            Defaults to False.
            `New in version 0.7.0.`
        mean_pattern (str): This is a regular expression used to match the log
            that need to be included in the smoothing statistics.
            `New in version 0.7.3.`

    Examples:
        >>> # `log_name` is defined, `loss_large_window` will be an additional
        >>> # record.
        >>> log_processor = dict(
        >>>     window_size=10,
        >>>     by_epoch=True,
        >>>     custom_cfg=[dict(data_src='loss',
        >>>                       log_name='loss_large_window',
        >>>                       method_name='mean',
        >>>                       window_size=100)])
        >>> # `log_name` is not defined. `loss` will be overwritten.
        >>> log_processor = dict(
        >>>     window_size=10,
        >>>     by_epoch=True,
        >>>     custom_cfg=[dict(data_src='loss',
        >>>                       method_name='mean',
        >>>                       window_size=100)])
        >>> # Record loss with different statistics methods.
        >>> log_processor = dict(
        >>>     window_size=10,
        >>>     by_epoch=True,
        >>>     custom_cfg=[dict(data_src='loss',
        >>>                       log_name='loss_large_window',
        >>>                       method_name='mean',
        >>>                       window_size=100),
        >>>                 dict(data_src='loss',
        >>>                       method_name='mean',
        >>>                       window_size=100)])
        >>> # Overwrite loss item twice will raise an error.
        >>> log_processor = dict(
        >>>     window_size=10,
        >>>     by_epoch=True,
        >>>     custom_cfg=[dict(data_src='loss',
        >>>                       method_name='mean',
        >>>                       window_size=100),
        >>>                 dict(data_src='loss',
        >>>                       method_name='max',
        >>>                       window_size=100)])
        AssertionError
    """

    def __init__(self,
                 window_size=10,
                 by_epoch=True,
                 custom_cfg: Optional[List[dict]] = None,
                 num_digits: int = 4,
                 log_with_hierarchy: bool = False,
                 mean_pattern=r'.*(loss|time|data_time|grad_norm).*'):
        self.window_size = window_size
        self.by_epoch = by_epoch
        # Normalize ``None`` to an empty list so downstream code can iterate
        # without a guard.
        self.custom_cfg = custom_cfg if custom_cfg else []
        self.num_digits = num_digits
        self.log_with_hierarchy = log_with_hierarchy
        # Pre-compile once; used on every scalar key in ``_collect_scalars``.
        self.mean_pattern = re.compile(mean_pattern)
        # Validate ``custom_cfg`` eagerly so misconfiguration fails at
        # construction time rather than mid-training.
        self._check_custom_cfg()

    def get_log_after_iter(self, runner, batch_idx: int,
                           mode: str) -> Tuple[dict, str]:
        """Format log string after training, validation or testing iteration.

        Args:
            runner (Runner): The runner of training phase.
            batch_idx (int): The index of the current batch in the current
                loop.
            mode (str): Current mode of runner, train, test or val.

        Return:
            Tuple[dict, str]: Formatted log dict/string which will be
            recorded by :obj:`runner.message_hub` and :obj:`runner.visualizer`.
        """
        assert mode in ['train', 'test', 'val']
        # Overwrite ``window_size`` defined in ``custom_cfg`` to int value.
        parsed_cfg = self._parse_windows_size(runner, batch_idx,
                                              self.custom_cfg)
        # ``log_tag`` is used to compose the terminal log string.
        log_tag = self._collect_scalars(parsed_cfg, runner, mode)
        # If ``log_with_hierarchy`` is False, ``tag`` equals ``log_tag``.
        # Otherwise each key in ``tag`` keeps its ``train``/``val``/``test``
        # prefix for the visualizer backends.
        if not self.log_with_hierarchy:
            tag = copy.deepcopy(log_tag)
        else:
            tag = self._collect_scalars(parsed_cfg, runner, mode, True)

        # Record learning rates in scientific notation and remove them from
        # ``log_tag`` so they are not duplicated in the log tail.
        lr_str_list = []
        for key, value in tag.items():
            if key.endswith('lr'):
                key = self._remove_prefix(key, f'{mode}/')
                log_tag.pop(key)
                lr_str_list.append(f'{key}: '
                                   f'{value:.{self.num_digits}e}')
        lr_str = ' '.join(lr_str_list)
        # Format log header.
        # by_epoch == True
        #   train/val: Epoch(mode) [epoch][iter/dataloader_len]
        #   test:      Epoch(mode) [iter/dataloader_len]
        # by_epoch == False
        #   train:    Iter(mode) [iter/max_iters]
        #   val/test: Iter(mode) [iter/dataloader_len]
        if self.by_epoch:
            # Right-align the inner-iteration counter so consecutive lines
            # stay visually aligned, e.g.
            #   Epoch(train)  [  9][010/270]
            #   Epoch(train)  [ 10][100/270]
            dataloader_len = self._get_dataloader_size(runner, mode)
            cur_iter = self._get_iter(runner, batch_idx)
            cur_iter_str = str(cur_iter).rjust(len(str(dataloader_len)))
            if mode in ['train', 'val']:
                cur_epoch = self._get_epoch(runner, mode)
                if not (isinstance(runner._train_loop, dict)
                        or runner._train_loop is None):
                    # Training loop is built: pad the epoch counter to the
                    # width of ``max_epochs`` (plus brackets/space) so the
                    # header column is stable across the whole run.
                    max_epochs = runner.max_epochs
                    cur_epoch_str = f'[{cur_epoch}]'.rjust(
                        len(str(max_epochs)) + 3, ' ')
                else:
                    # Standalone validation: no ``max_epochs`` to align with.
                    cur_epoch_str = f'[{cur_epoch}]'
                tag['epoch'] = cur_epoch
                log_str = (f'Epoch({mode}){cur_epoch_str}'
                           f'[{cur_iter_str}/{dataloader_len}] ')
            else:
                log_str = (f'Epoch({mode}) '
                           f'[{cur_iter_str}/{dataloader_len}] ')
        else:
            if mode == 'train':
                cur_iter = self._get_iter(runner, batch_idx)
                cur_iter_str = str(cur_iter).rjust(len(str(runner.max_iters)))
                log_str = (f'Iter({mode}) '
                           f'[{cur_iter_str}/{runner.max_iters}] ')
            else:
                dataloader_len = self._get_dataloader_size(runner, mode)
                cur_iter_str = str(batch_idx + 1).rjust(
                    len(str(dataloader_len)))
                log_str = (f'Iter({mode}) [{cur_iter_str}/{dataloader_len}] ')
        # Record the global iteration; 0 when no train loop has been built.
        if isinstance(runner._train_loop, dict) or runner._train_loop is None:
            tag['iter'] = 0
        else:
            tag['iter'] = runner.iter + 1
        # Concatenate the learning-rate string with the log header.
        log_str += f'{lr_str} '
        # If an IterTimerHook is registered, ``time``/``data_time``/``eta``
        # are available and rendered right after the header.
        if (all(item in log_tag for item in ['time', 'data_time'])
                and 'eta' in runner.message_hub.runtime_info):
            eta = runner.message_hub.get_info('eta')
            eta_str = str(datetime.timedelta(seconds=int(eta)))
            log_str += f'eta: {eta_str} '
            log_str += (f'time: {log_tag["time"]:.{self.num_digits}f} '
                        f'data_time: '
                        f'{log_tag["data_time"]:.{self.num_digits}f} ')
            # Already rendered; pop so the tail loop below skips them.
            log_tag.pop('time')
            log_tag.pop('data_time')

        # If a cuda/musa device is available, report peak memory usage.
        if is_cuda_available() or is_musa_available():
            max_memory = self._get_max_memory(runner)
            log_str += f'memory: {max_memory} '
            tag['memory'] = max_memory

        # Render the remaining scalar entries.
        if mode in ('train', 'val'):
            log_items = []
            for name, val in log_tag.items():
                # NOTE(review): ``log_tag`` keys carry no ``val/`` prefix
                # (collected with ``reserve_prefix=False``), so this filter
                # appears to skip every entry in val mode — confirm this
                # matches the intended behavior upstream.
                if mode == 'val' and not name.startswith('val/loss'):
                    continue
                if isinstance(val, float):
                    val = f'{val:.{self.num_digits}f}'
                log_items.append(f'{name}: {val}')
            log_str += ' '.join(log_items)
        return tag, log_str

    def get_log_after_epoch(self,
                            runner,
                            batch_idx: int,
                            mode: str,
                            with_non_scalar: bool = False) -> Tuple[dict, str]:
        """Format log string after validation or testing epoch.

        Args:
            runner (Runner): The runner of validation/testing phase.
            batch_idx (int): The index of the current batch in the current
                loop.
            mode (str): Current mode of runner.
            with_non_scalar (bool): Whether to include non-scalar infos in the
                returned tag. Defaults to False.

        Return:
            Tuple[dict, str]: Formatted log dict/string which will be
            recorded by :obj:`runner.message_hub` and :obj:`runner.visualizer`.
        """
        assert mode in [
            'test', 'val'
        ], ('`_get_metric_log_str` only accept val or test mode, but got '
            f'{mode}')
        dataloader_len = self._get_dataloader_size(runner, mode)
        # Format log header.
        # by_epoch == True
        #   val:  Epoch(val) [epoch][len/len]
        #   test: Epoch(test) [len/len]
        # by_epoch == False
        #   val/test: Iter(mode) [len/len]
        if self.by_epoch:
            if mode == 'val':
                cur_epoch = self._get_epoch(runner, mode)
                log_str = (f'Epoch({mode}) [{cur_epoch}][{dataloader_len}/'
                           f'{dataloader_len}] ')
            else:
                log_str = (
                    f'Epoch({mode}) [{dataloader_len}/{dataloader_len}] ')

        else:
            log_str = (f'Iter({mode}) [{dataloader_len}/{dataloader_len}] ')
        # Work on a copy so user-provided ``custom_cfg`` is never mutated.
        custom_cfg_copy = copy.deepcopy(self.custom_cfg)
        # Collect the user-configured data sources, stripped of prefix.
        custom_keys = [
            self._remove_prefix(cfg['data_src'], f'{mode}/')
            for cfg in custom_cfg_copy
        ]
        # Unless the user configured them, average ``time``/``data_time``
        # over the whole epoch for the after-epoch summary.
        if 'time' not in custom_keys:
            custom_cfg_copy.append(
                dict(data_src='time', window_size='epoch', method_name='mean'))
        if 'data_time' not in custom_keys:
            custom_cfg_copy.append(
                dict(
                    data_src='data_time',
                    window_size='epoch',
                    method_name='mean'))
        parsed_cfg = self._parse_windows_size(runner, batch_idx,
                                              custom_cfg_copy)
        # ``ori_tag`` holds all scalars; non-scalars (e.g. metric tables)
        # are collected separately from runtime info.
        ori_tag = self._collect_scalars(parsed_cfg, runner, mode,
                                        self.log_with_hierarchy)
        non_scalar_tag = self._collect_non_scalars(runner, mode)
        # Move ``time``/``data_time`` to the end of the log line.
        tag = OrderedDict()
        time_tag = OrderedDict()
        for key, value in ori_tag.items():
            if key in (f'{mode}/time', f'{mode}/data_time', 'time',
                       'data_time'):
                time_tag[key] = value
            else:
                tag[key] = value
        # Render scalars, then non-scalars, then the timing entries.
        log_items = []
        log_str += ' '
        for name, val in chain(tag.items(), non_scalar_tag.items(),
                               time_tag.items()):
            if isinstance(val, float):
                val = f'{val:.{self.num_digits}f}'
            if isinstance(val, (torch.Tensor, np.ndarray)):
                # Surround tensors/arrays with newlines so their multi-line
                # repr does not break the log line.
                val = f'\n{val}\n'
            log_items.append(f'{name}: {val}')
        log_str += ' '.join(log_items)

        if with_non_scalar:
            tag.update(non_scalar_tag)
            tag.update(time_tag)
        return tag, log_str

    def _collect_scalars(self,
                         custom_cfg: List[dict],
                         runner,
                         mode: str,
                         reserve_prefix: bool = False) -> dict:
        """Collect log information to compose a dict according to mode.

        Args:
            custom_cfg (List[dict]): A copy of ``self.custom_cfg`` with int
                ``window_size``.
            runner (Runner): The runner of the training/testing/validation
                process.
            mode (str): Current mode of runner.
            reserve_prefix (bool): Whether to reserve the prefix of the key.

        Returns:
            dict: Statistical values of logs.
        """
        # Copy first: ``log_cfg.pop`` below mutates each config dict.
        custom_cfg = copy.deepcopy(custom_cfg)
        tag = OrderedDict()
        # History scalars of all phases, keyed as ``{mode}/{name}``.
        history_scalars = runner.message_hub.log_scalars
        # Scalars belonging to the requested mode only.
        mode_history_scalars = OrderedDict()
        # Extract the scalars of this mode, optionally stripping the
        # ``{mode}/`` prefix from each key.
        for prefix_key, log_buffer in history_scalars.items():
            if prefix_key.startswith(mode):
                if not reserve_prefix:
                    key = self._remove_prefix(prefix_key, f'{mode}/')
                else:
                    key = prefix_key
                mode_history_scalars[key] = log_buffer
        for key in mode_history_scalars:
            # Keys matching ``mean_pattern`` (loss/time/...) are smoothed
            # over ``window_size``; everything else reports its latest value.
            if re.search(self.mean_pattern, key) is not None:
                tag[key] = mode_history_scalars[key].mean(self.window_size)
            else:
                # Default statistic method is the current value.
                tag[key] = mode_history_scalars[key].current()
        # Apply user-configured statistics on top of the defaults.
        for log_cfg in custom_cfg:
            data_src = log_cfg.pop('data_src')
            log_name = log_cfg.pop('log_name', data_src)
            if reserve_prefix:
                data_src = f'{mode}/{data_src}'
                log_name = f'{mode}/{log_name}'
            # A configured data source may be absent in this mode
            # (e.g. a train-only loss during validation); skip silently.
            if data_src in mode_history_scalars:
                tag[log_name] = mode_history_scalars[data_src].statistics(
                    **log_cfg)
        return tag

    def _collect_non_scalars(self, runner, mode: str) -> dict:
        """Collect log information to compose a dict according to mode.

        Args:
            runner (Runner): The runner of the training/testing/validation
                process.
            mode (str): Current mode of runner.

        Returns:
            dict: non-scalar infos of the specified mode.
        """
        # Runtime info of all phases, keyed as ``{mode}/{name}``.
        infos = runner.message_hub.runtime_info
        # Runtime info of the requested mode only.
        mode_infos = OrderedDict()
        # Extract and optionally strip the ``{mode}/`` prefix.
        for prefix_key, value in infos.items():
            if prefix_key.startswith(mode):
                if self.log_with_hierarchy:
                    key = prefix_key
                else:
                    key = self._remove_prefix(prefix_key, f'{mode}/')
                mode_infos[key] = value
        return mode_infos

    def _remove_prefix(self, string: str, prefix: str) -> str:
        """Remove the prefix ``train``, ``val`` and ``test`` of the key."""
        # Return the string unchanged when the prefix does not match.
        if string.startswith(prefix):
            return string[len(prefix):]
        else:
            return string

    def _check_custom_cfg(self) -> None:
        """Check the legality of ``self.custom_cfg``."""

        def _check_window_size():
            # ``window_size='epoch'`` is meaningless for iteration-based
            # training, so forbid it when ``by_epoch`` is False.
            for log_cfg in self.custom_cfg:
                if not self.by_epoch:
                    # ``window_size`` is optional (see ``_parse_windows_size``),
                    # so use ``get`` to avoid a spurious KeyError for configs
                    # that omit it.
                    assert log_cfg.get('window_size') != 'epoch', \
                        'window_size cannot be epoch if LoggerHook.by_epoch' \
                        ' is False.'

        def _check_repeated_log_name():
            # The resulting log name of each config (``log_name`` if given,
            # otherwise ``data_src``) must be unique, or one statistic would
            # silently overwrite another.
            check_set = set()
            for log_cfg in self.custom_cfg:
                assert 'data_src' in log_cfg
                data_src = log_cfg['data_src']
                log_name = log_cfg.get('log_name', data_src)
                assert log_name not in check_set, (
                    f'Found duplicate {log_name} for {data_src}. Please check'
                    'your `custom_cfg` for `log_processor`. You should '
                    f'neither define duplicate `{log_name}` for {data_src} '
                    f'nor do not define any {log_name} for multiple '
                    f'{data_src}, See more information in the docstring of '
                    'LogProcessor')

                check_set.add(log_name)

        _check_repeated_log_name()
        _check_window_size()

    def _parse_windows_size(self,
                            runner,
                            batch_idx: int,
                            custom_cfg: Optional[list] = None) -> list:
        """Parse window_size defined in custom_cfg to int value.

        Args:
            runner (Runner): The runner of the training/testing/validation
                process.
            batch_idx (int): The iteration index of current dataloader.
            custom_cfg (list): A copy of ``self.custom_cfg``. Defaults to None
                to keep backward compatibility.

        Returns:
            list: A deep copy of ``custom_cfg`` in which every string
            ``window_size`` ('epoch'/'global') has been resolved to an int.

        Raises:
            TypeError: If a ``window_size`` is neither None, an int,
                'epoch' nor 'global'.
        """
        # Always work on a copy so the caller's config is never mutated.
        if custom_cfg is None:
            custom_cfg = copy.deepcopy(self.custom_cfg)
        else:
            custom_cfg = copy.deepcopy(custom_cfg)
        for log_cfg in custom_cfg:
            window_size = log_cfg.get('window_size', None)
            if window_size is None or isinstance(window_size, int):
                continue
            elif window_size == 'epoch':
                # Number of iterations elapsed in the current epoch.
                log_cfg['window_size'] = batch_idx + 1
            elif window_size == 'global':
                # Number of iterations elapsed since training started.
                log_cfg['window_size'] = runner.iter + 1
            else:
                raise TypeError(
                    'window_size should be int, epoch or global, but got '
                    f'invalid {window_size}')
        return custom_cfg

    def _get_max_memory(self, runner) -> int:
        """Returns the maximum GPU memory occupied by tensors in megabytes (MB)
        for a given device.

        Args:
            runner (Runner): The runner of the training/testing/validation
                process.

        Returns:
            The maximum GPU memory occupied by tensors in megabytes for a given
            device.
        """
        # Wrapped models (e.g. DDP) expose ``output_device``; fall back to
        # the default device otherwise.
        device = getattr(runner.model, 'output_device', None)
        # Prefer the musa backend when it is available.
        if is_musa_available():
            return get_max_musa_memory(device)
        return get_max_cuda_memory(device)

    def _get_iter(self, runner, batch_idx: int) -> int:
        """Get current iteration index.

        Args:
            runner (Runner): The runner of the training/testing/validation
                process.
            batch_idx (int): The iteration index of current
                dataloader. Defaults to None.

        Returns:
            int: The current global iter or inner iter.
        """
        # Epoch-based logs count within the epoch; iteration-based logs
        # count globally. Both are 1-indexed for display.
        if self.by_epoch:
            current_iter = batch_idx + 1
        else:
            current_iter = runner.iter + 1
        return current_iter

    def _get_epoch(self, runner, mode: str) -> int:
        """Get current epoch according to mode.

        Args:
            runner (Runner): The runner of the training/testing/validation
                process.
            mode (str): Current mode of runner.

        Returns:
            int: The current epoch.

        Raises:
            ValueError: If ``mode`` is neither 'train' nor 'val'.
        """
        if mode == 'train':
            epoch = runner.epoch + 1
        elif mode == 'val':
            if (isinstance(runner._train_loop, dict)
                    or runner._train_loop is None):
                # No train loop has been built: standalone validation.
                epoch = 0
            else:
                # Normal validation during training: ``runner.epoch`` has
                # already been incremented by the train loop, so use it as-is.
                epoch = runner.epoch
        else:
            raise ValueError(
                f"runner mode should be 'train' or 'val', but got {mode}")
        return epoch

    def _get_cur_loop(self, runner, mode: str):
        """Get current loop according to mode.

        Args:
            runner (Runner): The runner of the training/validation/testing
                process.
            mode (str): Current mode of runner.

        Returns:
            BaseLoop: Current loop of runner.
        """
        # Dispatch on mode; anything that is not train/val is test.
        if mode == 'train':
            return runner.train_loop
        elif mode == 'val':
            return runner.val_loop
        else:
            return runner.test_loop

    def _get_dataloader_size(self, runner, mode) -> int:
        """Get dataloader size of current loop.

        Args:
            runner (Runner): The runner of the training/validation/testing
            mode (str): Current mode of runner.

        Returns:
            int: The dataloader size of current loop.
        """
        return len(self._get_cur_loop(runner=runner, mode=mode).dataloader)
|
|