| """ |
| Events Utils |
| |
| Modified from Detectron2 |
| |
| Author: Xiaoyang Wu (xiaoyang.wu.cs@gmail.com) |
| Please cite our work if the code is helpful to you. |
| """ |
|
|
| import datetime |
| import json |
| import logging |
| import os |
| import time |
| import torch |
| import numpy as np |
| import traceback |
| import sys |
|
|
| from typing import List, Optional, Tuple |
| from collections import defaultdict |
| from contextlib import contextmanager |
|
|
# Names exported by ``from <module> import *``.
__all__ = [
    "get_event_storage",
    "JSONWriter",
    "TensorboardXWriter",
    "CommonMetricPrinter",
    "EventStorage",
    "ExceptionWriter",
]

# Module-level stack of storages; the last element is the one handed out by
# get_event_storage(). EventStorage pushes itself in __enter__ and pops itself
# in __exit__.
_CURRENT_STORAGE_STACK: list = []
|
|
|
|
def get_event_storage():
    """
    Return the storage at the top of the module-level storage stack.

    Returns:
        The :class:`EventStorage` object that's currently being used.

    Raises:
        AssertionError: if no :class:`EventStorage` is currently enabled.
    """
    # Raised explicitly instead of via an `assert` statement so the check is
    # still performed under `python -O` (which strips asserts and would leave
    # an opaque IndexError on the empty stack). The exception type and message
    # are unchanged.
    if not _CURRENT_STORAGE_STACK:
        raise AssertionError(
            "get_event_storage() has to be called inside a 'with EventStorage(...)' context!"
        )
    return _CURRENT_STORAGE_STACK[-1]
|
|
|
|
class EventWriter:
    """
    Common interface for writers that pull events out of :class:`EventStorage`
    and process them.
    """

    def write(self):
        """Process pending events. Concrete writers must override this."""
        raise NotImplementedError()

    def close(self):
        """Release any resources held by the writer. The base version does nothing."""
        return None
|
|
|
|
class JSONWriter(EventWriter):
    """
    Write scalars to a json file.
    It saves scalars as one json per line (instead of a big json) for easy parsing.
    Examples parsing such a json file:
    ::
        $ cat metrics.json | jq -s '.[0:2]'
        [
          {
            "data_time": 0.008433341979980469,
            "iteration": 19,
            "loss": 1.9228371381759644,
            "loss_box_reg": 0.050025828182697296,
            "loss_classifier": 0.5316952466964722,
            "loss_mask": 0.7236229181289673,
            "loss_rpn_box": 0.0856662318110466,
            "loss_rpn_cls": 0.48198649287223816,
            "lr": 0.007173333333333333,
            "time": 0.25401854515075684
          },
          {
            "data_time": 0.007216215133666992,
            "iteration": 39,
            "loss": 1.282649278640747,
            "loss_box_reg": 0.06222952902317047,
            "loss_classifier": 0.30682939291000366,
            "loss_mask": 0.6970193982124329,
            "loss_rpn_box": 0.038663312792778015,
            "loss_rpn_cls": 0.1471673548221588,
            "lr": 0.007706666666666667,
            "time": 0.2490077018737793
          }
        ]
        $ cat metrics.json | jq '.loss_mask'
        0.7126231789588928
        0.689423680305481
        0.6776131987571716
        ...
    """

    def __init__(self, json_file, window_size=20):
        """
        Args:
            json_file (str): path to the json file. New data will be appended if the file exists.
            window_size (int): the window size of median smoothing for the scalars whose
                `smoothing_hint` are True.
        """
        # The handle stays open for the lifetime of the writer; close() releases it.
        self._file_handle = open(json_file, "a")
        self._window_size = window_size
        # Highest iteration already written; scalars at or below it are skipped.
        self._last_write = -1

    def write(self):
        """
        Append one json line per iteration that has not been written yet.

        Scalars are read from the current :class:`EventStorage` via
        ``latest_with_smoothing_hint`` and grouped by the iteration they were
        recorded at. Lines are emitted in ascending iteration order.
        """
        storage = get_event_storage()
        to_save = defaultdict(dict)

        latest = storage.latest_with_smoothing_hint(self._window_size)
        for name, (value, iteration) in latest.items():
            # Keep only scalars that have not been written yet.
            if iteration <= self._last_write:
                continue
            to_save[iteration][name] = value

        # Sort once and reuse the order for both the bookkeeping and the
        # output, so the file is monotonic in "iteration" even when scalars
        # were inserted into the storage out of order.
        pending_iterations = sorted(to_save)
        if pending_iterations:
            self._last_write = pending_iterations[-1]

        for iteration in pending_iterations:
            scalars_per_iter = to_save[iteration]
            scalars_per_iter["iteration"] = iteration
            self._file_handle.write(json.dumps(scalars_per_iter, sort_keys=True) + "\n")
        self._file_handle.flush()
        try:
            os.fsync(self._file_handle.fileno())
        except AttributeError:
            # Deliberate best-effort: presumably kept for file-like handles
            # that do not expose fileno(); flush() above has already run.
            pass

    def close(self):
        """Close the underlying json file."""
        self._file_handle.close()
|
|
|
|
class TensorboardXWriter(EventWriter):
    """
    Writer that sends all scalars, plus any images and histograms queued on
    the storage, to a tensorboard event file.
    """

    def __init__(self, log_dir: str, window_size: int = 20, **kwargs):
        """
        Args:
            log_dir (str): the directory to save the output events
            window_size (int): the scalars will be median-smoothed by this window size
            kwargs: other arguments passed to `torch.utils.tensorboard.SummaryWriter(...)`
        """
        self._window_size = window_size
        # Imported here rather than at module level, so the tensorboard
        # import only happens when this writer is constructed.
        from torch.utils.tensorboard import SummaryWriter

        self._writer = SummaryWriter(log_dir, **kwargs)
        # Highest iteration already forwarded to tensorboard.
        self._last_write = -1

    def write(self):
        """
        Forward new scalars, images and histograms from the current storage.

        Only scalars recorded after the last written iteration are sent.
        Queued images and histograms are cleared from the storage once written.
        """
        storage = get_event_storage()

        latest = storage.latest_with_smoothing_hint(self._window_size)
        newest_step = self._last_write
        for tag, (value, step) in latest.items():
            if step <= self._last_write:
                continue
            self._writer.add_scalar(tag, value, step)
            if step > newest_step:
                newest_step = step
        self._last_write = newest_step

        pending_images = storage._vis_data
        if len(pending_images) > 0:
            for img_name, img, step_num in pending_images:
                self._writer.add_image(img_name, img, step_num)
            storage.clear_images()

        pending_histograms = storage._histograms
        if len(pending_histograms) > 0:
            for histogram_kwargs in pending_histograms:
                self._writer.add_histogram_raw(**histogram_kwargs)
            storage.clear_histograms()

    def close(self):
        """Close the summary writer if one was created."""
        # `_writer` is only set at the end of a successful __init__.
        if not hasattr(self, "_writer"):
            return
        self._writer.close()
|
|
|
|
class CommonMetricPrinter(EventWriter):
    """
    Print **common** metrics to the terminal, including
    iteration time, ETA, memory, all losses, and the learning rate.
    Losses are smoothed over `window_size` elements (20 by default) when the
    underlying meter supports windowed statistics.
    It's meant to print common metrics in common ways.
    To print something in more customized ways, please implement a similar printer by yourself.
    """

    def __init__(self, max_iter: Optional[int] = None, window_size: int = 20):
        """
        Args:
            max_iter: the maximum number of iterations to train.
                Used to compute ETA. If not given, ETA will not be printed.
            window_size (int): the losses will be median-smoothed by this window size
        """
        self.logger = logging.getLogger(__name__)
        self._max_iter = max_iter
        self._window_size = window_size
        # (iteration, perf_counter timestamp) of the previous ETA fallback
        # estimate, or None before the first one.
        self._last_write = None

    @staticmethod
    def _read_stat(meter, name, *args, fallback="avg"):
        """
        Read statistic `name` from a meter, whatever flavour of meter it is.

        :class:`HistoryBuffer` exposes ``median(n)``, ``avg(n)``,
        ``global_avg()`` and ``latest()`` as methods, whereas
        :class:`AverageMeter` (what :class:`EventStorage` in this file stores)
        only has plain ``avg`` / ``val`` attributes. A callable is called with
        `args`, a plain attribute is returned as-is, and a missing one falls
        back to the attribute named `fallback`.
        """
        stat = getattr(meter, name, None)
        if callable(stat):
            return stat(*args)
        if stat is not None:
            return stat
        return getattr(meter, fallback)

    def _get_eta(self, storage) -> Optional[str]:
        """
        Estimate the remaining training time.

        Returns:
            "" when `max_iter` was not given, a ``H:MM:SS`` style string when
            an estimate is available, or None when it is not (yet).
        """
        if self._max_iter is None:
            return ""
        iteration = storage.iter
        remaining_iters = self._max_iter - iteration - 1
        try:
            time_meter = storage.history("time")
        except KeyError:
            # No "time" metric was recorded: estimate the ETA from the
            # wall-clock time elapsed between two calls.
            eta_string = None
            if self._last_write is not None:
                last_iteration, last_timestamp = self._last_write
                elapsed_iters = iteration - last_iteration
                # Two calls within the same iteration would otherwise divide
                # by zero; skip the estimate for that call.
                if elapsed_iters > 0:
                    estimate_iter_time = (
                        time.perf_counter() - last_timestamp
                    ) / elapsed_iters
                    eta_seconds = estimate_iter_time * remaining_iters
                    eta_string = str(datetime.timedelta(seconds=int(eta_seconds)))
            self._last_write = (iteration, time.perf_counter())
            return eta_string

        eta_seconds = self._read_stat(time_meter, "median", 1000) * remaining_iters
        storage.put_scalar("eta_seconds", eta_seconds, smoothing_hint=False)
        return str(datetime.timedelta(seconds=int(eta_seconds)))

    def write(self):
        """Log one line summarising the metrics of the current storage."""
        storage = get_event_storage()
        iteration = storage.iter
        if iteration == self._max_iter:
            # Nothing is printed once the final iteration has been reached.
            return

        # Each of the following metrics is optional: a missing history raises
        # KeyError and the field is left out of the output line.
        try:
            data_time = self._read_stat(storage.history("data_time"), "avg", 20)
        except KeyError:
            data_time = None
        try:
            iter_time = self._read_stat(storage.history("time"), "global_avg")
        except KeyError:
            iter_time = None
        try:
            lr = "{:.5g}".format(
                self._read_stat(storage.history("lr"), "latest", fallback="val")
            )
        except KeyError:
            lr = "N/A"

        eta_string = self._get_eta(storage)

        if torch.cuda.is_available():
            max_mem_mb = torch.cuda.max_memory_allocated() / 1024.0 / 1024.0
        else:
            max_mem_mb = None

        losses = " ".join(
            "{}: {:.4g}".format(
                name, self._read_stat(meter, "median", self._window_size)
            )
            for name, meter in storage.histories().items()
            if "loss" in name
        )

        self.logger.info(
            " {eta}iter: {iter} {losses} {time}{data_time}lr: {lr} {memory}".format(
                eta=f"eta: {eta_string} " if eta_string else "",
                iter=iteration,
                losses=losses,
                time=(
                    "time: {:.4f} ".format(iter_time) if iter_time is not None else ""
                ),
                data_time=(
                    "data_time: {:.4f} ".format(data_time)
                    if data_time is not None
                    else ""
                ),
                lr=lr,
                memory=(
                    "max_mem: {:.0f}M".format(max_mem_mb)
                    if max_mem_mb is not None
                    else ""
                ),
            )
        )
|
|
|
|
class EventStorage:
    """
    The user-facing class that provides metric storage functionalities.
    In the future we may add support for storing / logging other types of data if needed.
    """

    def __init__(self, start_iter=0):
        """
        Args:
            start_iter (int): the iteration number to start with
        """
        # One AverageMeter per scalar name, created on first access.
        self._history = defaultdict(AverageMeter)
        self._smoothing_hints = {}
        # name -> (latest value, iteration it was recorded at)
        self._latest_scalars = {}
        self._iter = start_iter
        self._current_prefix = ""
        # Pending images / histograms; read directly by TensorboardXWriter.
        self._vis_data = []
        self._histograms = []

    def put_scalar(self, name, value, n=1, smoothing_hint=False):
        """
        Add a scalar `value` to the meter associated with `name`.

        Args:
            name (str): scalar name; the current name-scope prefix is prepended.
            value: the value to record.
            n: forwarded as the second argument of the meter's ``update``
                (the weight of `value` for :class:`AverageMeter`).
            smoothing_hint (bool): a 'hint' on whether this scalar is noisy and should be
                smoothed when logged. The hint will be accessible through
                :meth:`EventStorage.smoothing_hints`. A writer may ignore the hint
                and apply custom smoothing rule. Defaults to False.

        Raises:
            AssertionError: if `name` was previously put with a different
                `smoothing_hint`.
        """
        name = self._current_prefix + name
        history = self._history[name]
        history.update(value, n)
        self._latest_scalars[name] = (value, self._iter)

        existing_hint = self._smoothing_hints.get(name)
        if existing_hint is not None:
            assert (
                existing_hint == smoothing_hint
            ), "Scalar {} was put with a different smoothing_hint!".format(name)
        else:
            self._smoothing_hints[name] = smoothing_hint

    def history(self, name):
        """
        Returns:
            AverageMeter: the history for name

        Raises:
            KeyError: if nothing was recorded under `name`.
        """
        # .get() is used so that looking up an unknown name does not create
        # an empty meter in the defaultdict.
        ret = self._history.get(name, None)
        if ret is None:
            raise KeyError("No history metric available for {}!".format(name))
        return ret

    def histories(self):
        """
        Returns:
            dict[name -> AverageMeter]: the meters for all scalars
        """
        return self._history

    def latest(self):
        """
        Returns:
            dict[str -> (float, int)]: mapping from the name of each scalar to the most
                recent value and the iteration number its added.
        """
        return self._latest_scalars

    def latest_with_smoothing_hint(self, window_size=20):
        """
        Similar to :meth:`latest`, but the returned values
        are either the un-smoothed original latest value,
        or a smoothed value, depending on whether the smoothing_hint is True.

        The smoothed value is ``median(window_size)`` when the meter provides
        such a method (e.g. :class:`HistoryBuffer`); otherwise the meter's
        running ``avg`` is used, since :class:`AverageMeter` keeps no window
        of past values.
        This provides a default behavior that other writers can use.
        """
        result = {}
        for k, (v, itr) in self._latest_scalars.items():
            if self._smoothing_hints[k]:
                meter = self._history[k]
                median = getattr(meter, "median", None)
                v = median(window_size) if callable(median) else meter.avg
            result[k] = (v, itr)
        return result

    def smoothing_hints(self):
        """
        Returns:
            dict[name -> bool]: the user-provided hint on whether the scalar
                is noisy and needs smoothing.
        """
        return self._smoothing_hints

    def step(self):
        """
        User should either: (1) Call this function to increment storage.iter when needed. Or
        (2) Set `storage.iter` to the correct iteration number before each iteration.
        The storage will then be able to associate the new data with an iteration number.
        """
        self._iter += 1

    @property
    def iter(self):
        """
        Returns:
            int: The current iteration number. When used together with a trainer,
                this is ensured to be the same as trainer.iter.
        """
        return self._iter

    @iter.setter
    def iter(self, val):
        self._iter = int(val)

    @property
    def iteration(self):
        """Read-only alias of :attr:`iter`."""
        return self._iter

    def __enter__(self):
        _CURRENT_STORAGE_STACK.append(self)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Storages must be exited in the reverse order they were entered.
        assert _CURRENT_STORAGE_STACK[-1] == self
        _CURRENT_STORAGE_STACK.pop()

    @contextmanager
    def name_scope(self, name):
        """
        Yields:
            A context within which all the events added to this storage
            will be prefixed by the name scope.
        """
        old_prefix = self._current_prefix
        self._current_prefix = name.rstrip("/") + "/"
        try:
            yield
        finally:
            # Restore the prefix even when the body raises, so that a failed
            # scope does not leak its prefix into later scalars.
            self._current_prefix = old_prefix

    def clear_images(self):
        """
        Delete all the stored images for visualization. This should be called
        after images are written to tensorboard.
        """
        self._vis_data = []

    def clear_histograms(self):
        """
        Delete all the stored histograms for visualization.
        This should be called after histograms are written to tensorboard.
        """
        self._histograms = []

    def reset_history(self, name):
        """
        Reset the meter recorded under `name`.

        Raises:
            KeyError: if nothing was recorded under `name`.
        """
        ret = self._history.get(name, None)
        if ret is None:
            raise KeyError("No history metric available for {}!".format(name))
        ret.reset()

    def reset_histories(self):
        """Reset every meter held by this storage."""
        for meter in self._history.values():
            meter.reset()
|
|
|
|
class AverageMeter:
    """Computes and stores the average and current value"""

    def __init__(self):
        # A new meter starts in the same state that reset() produces.
        self.reset()

    def reset(self):
        """Clear the latest value, the running average, the weighted total and the count."""
        self.val = 0
        self.avg = 0
        self.total = 0
        self.count = 0

    def update(self, val, n=1):
        """
        Record `val` with weight `n` and refresh the running average.

        Args:
            val: the new value; stored as-is in ``self.val``.
            n: weight of `val`; ``val * n`` is added to the total and `n`
                to the count.
        """
        self.val = val
        self.total += val * n
        self.count += n
        # With a zero accumulated count (e.g. update(x, n=0) on a fresh
        # meter) there is nothing to average; report 0 like a reset meter
        # instead of raising ZeroDivisionError.
        self.avg = self.total / self.count if self.count else 0
|
|
|
|
class HistoryBuffer:
    """
    Keeps a bounded series of scalar values and exposes windowed statistics
    (median / mean) as well as the global average of everything ever added.
    """

    def __init__(self, max_length: int = 1000000) -> None:
        """
        Args:
            max_length: maximal number of values that can be stored in the
                buffer. When the capacity of the buffer is exhausted, old
                values will be removed.
        """
        self._max_length: int = max_length
        # (value, iteration) pairs, oldest first.
        self._data: List[Tuple[float, float]] = []
        # Number of values ever added, including the ones already evicted.
        self._count: int = 0
        self._global_avg: float = 0

    def update(self, value: float, iteration: Optional[float] = None) -> None:
        """
        Append a scalar produced at `iteration`; when `iteration` is omitted
        the number of values added so far is used instead. If the buffer is
        already at `max_length`, the oldest entry is dropped first.
        """
        stamp = self._count if iteration is None else iteration
        if len(self._data) == self._max_length:
            del self._data[0]
        self._data.append((value, stamp))

        # Incremental mean over every value ever added.
        self._count += 1
        self._global_avg += (value - self._global_avg) / self._count

    def latest(self) -> float:
        """
        Return the value that was added most recently.
        """
        newest_value, _ = self._data[-1]
        return newest_value

    def median(self, window_size: int) -> float:
        """
        Return the median over the newest `window_size` values in the buffer.
        """
        window = self._data[-window_size:]
        return np.median([value for value, _ in window])

    def avg(self, window_size: int) -> float:
        """
        Return the mean over the newest `window_size` values in the buffer.
        """
        window = self._data[-window_size:]
        return np.mean([value for value, _ in window])

    def global_avg(self) -> float:
        """
        Return the mean of every value ever added, including the ones that
        were already removed because of the limited buffer size.
        """
        return self._global_avg

    def values(self) -> List[Tuple[float, float]]:
        """
        Returns:
            list[(number, iteration)]: content of the current buffer.
        """
        return self._data
|
|
|
|
class ExceptionWriter:
    """
    Context manager that logs an exception escaping its body and then
    terminates the process with exit status 1.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def __enter__(self):
        # Return the manager itself so that `with ExceptionWriter() as w:`
        # binds a usable object instead of None.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Log the formatted traceback and exit when the body raised.

        Does nothing (and returns None) when the body completed normally.

        Raises:
            SystemExit: with code 1, after logging, if an exception occurred.
        """
        if exc_type:
            tb = traceback.format_exception(exc_type, exc_val, exc_tb)
            formatted_tb_str = "".join(tb)
            self.logger.error(formatted_tb_str)
            sys.exit(1)
|
|