|
|
|
|
|
|
|
|
|
|
|
|
|
|
| import atexit
|
| import functools
|
| import logging
|
| import sys
|
| import uuid
|
| from typing import Any, Dict, Optional, Union
|
|
|
| from hydra.utils import instantiate
|
|
|
| from iopath.common.file_io import g_pathmgr
|
| from numpy import ndarray
|
| from torch import Tensor
|
| from torch.utils.tensorboard import SummaryWriter
|
|
|
| from training.utils.train_utils import get_machine_local_and_dist_rank, makedir
|
|
|
# Value types accepted as a "scalar" by the logging methods in this module
# (used to annotate the `data`, `payload`, `hparams` and `meters` arguments).
Scalar = Union[Tensor, ndarray, int, float]
|
|
|
|
|
def make_tensorboard_logger(log_dir: str, **writer_kwargs: Any):
    """Build a ``TensorBoardLogger`` that writes into ``log_dir``.

    The directory is created through ``makedir`` before the logger is
    constructed, and ``SummaryWriter`` is always used as the writer factory.

    Args:
        log_dir (str): directory handed to the logger as its ``path``
        **writer_kwargs: extra keyword arguments forwarded unchanged to
            ``TensorBoardLogger``

    Returns:
        The newly constructed ``TensorBoardLogger``.
    """
    makedir(log_dir)
    logger_kwargs = dict(path=log_dir, summary_writer_method=SummaryWriter)
    return TensorBoardLogger(**logger_kwargs, **writer_kwargs)
|
|
|
|
|
class TensorBoardWriterWrapper:
    """
    A wrapper around a SummaryWriter object.

    A real writer is only created when the distributed rank returned by
    ``get_machine_local_and_dist_rank`` is 0. On every other rank ``writer``
    is ``None`` and ``flush`` / ``close`` are no-ops.
    """

    def __init__(
        self,
        path: str,
        *args: Any,
        filename_suffix: Optional[str] = None,
        summary_writer_method: Any = SummaryWriter,
        **kwargs: Any,
    ) -> None:
        """Create a new TensorBoard logger.

        On construction, the logger creates a new events file that logs
        will be written to. The writer is only created when the distributed
        rank reported by `get_machine_local_and_dist_rank` is 0.

        NOTE: If using the logger with distributed training:
        - This logger can call collective operations
        - Logs will be written on rank 0 only
        - Logger must be constructed synchronously *after* initializing distributed process group.

        Args:
            path (str): path to write logs to
            *args, **kwargs: Extra arguments to pass to `summary_writer_method`
            filename_suffix (str, optional): suffix for the events file name;
                a random UUID4 string is used when it is None or empty
            summary_writer_method: callable used to build the writer; it is
                called with `log_dir` and `filename_suffix` as keywords
        """
        self._writer: Optional[SummaryWriter] = None
        _, self._rank = get_machine_local_and_dist_rank()
        self._path: str = path
        if self._rank == 0:
            logging.info(
                f"TensorBoard SummaryWriter instantiated. Files will be stored in: {path}"
            )
            # Positional extras are bound first and `log_dir` is always passed
            # by keyword, so extras must not occupy the factory's `log_dir`
            # slot. NOTE(review): with the stock SummaryWriter the first
            # positional parameter appears to be `log_dir`, so extras should
            # be passed as keywords -- confirm against the PyTorch docs.
            self._writer = summary_writer_method(
                *args,
                log_dir=path,
                filename_suffix=filename_suffix or str(uuid.uuid4()),
                **kwargs,
            )
        else:
            logging.debug(
                f"Not logging meters on this host because env RANK: {self._rank} != 0"
            )
        # Make sure the writer is closed at interpreter exit; `close` is
        # idempotent, so an earlier explicit close is harmless.
        atexit.register(self.close)

    @property
    def writer(self) -> Optional[SummaryWriter]:
        """The underlying writer, or None on non-zero ranks / after `close`."""
        return self._writer

    @property
    def path(self) -> str:
        """The log directory this wrapper was constructed with."""
        return self._path

    def flush(self) -> None:
        """Writes pending logs to disk."""

        # Compare against None explicitly: a writer object that happens to be
        # falsy (e.g. one defining __len__) must still be flushed.
        if self._writer is None:
            return

        self._writer.flush()

    def close(self) -> None:
        """Close writer, flushing pending logs to disk.
        Logs cannot be written after `close` is called.
        """

        if self._writer is None:
            return

        self._writer.close()
        # Drop the reference so later flush/close calls become no-ops.
        self._writer = None
|
|
|
|
|
class TensorBoardLogger(TensorBoardWriterWrapper):
    """
    A simple logger for TensorBoard.

    Every method is a no-op when this instance holds no writer
    (`self._writer` is None).
    """

    def log_dict(self, payload: Dict[str, Scalar], step: int) -> None:
        """Add multiple scalar values to TensorBoard.

        Args:
            payload (dict): dictionary of tag name and scalar value
            step (int): step value to record
        """
        # Bail out before iterating so writer-less instances do no work.
        if self._writer is None:
            return
        for tag, value in payload.items():
            self.log(tag, value, step)

    def log(self, name: str, data: Scalar, step: int) -> None:
        """Add scalar data to TensorBoard.

        Args:
            name (string): tag name used to group scalars
            data (float/int/Tensor): scalar data to log
            step (int): step value to record
        """
        if self._writer is None:
            return
        self._writer.add_scalar(name, data, global_step=step, new_style=True)

    def log_hparams(
        self, hparams: Dict[str, Scalar], meters: Dict[str, Scalar]
    ) -> None:
        """Add hyperparameter data to TensorBoard.

        Args:
            hparams (dict): dictionary of hyperparameter names and corresponding values
            meters (dict): dictionary of name of meter and corresponding values
        """
        if self._writer is None:
            return
        self._writer.add_hparams(hparams, meters)
|
|
|
|
|
class Logger:
    """
    A logger class that can interface with multiple loggers. It now supports tensorboard only for simplicity, but you can extend it with your own logger.

    When no TensorBoard logger is configured, every logging method is a no-op.
    """

    def __init__(self, logging_conf):
        """Build the configured backend loggers.

        Args:
            logging_conf: configuration object exposing a `tensorboard_writer`
                attribute. That value may be empty/None (no TensorBoard
                logging) or a mapping passed to hydra's `instantiate`. An
                optional `should_log` entry (default True) switches the
                logger on or off and is stripped before instantiation.
        """
        import copy

        self.tb_logger = None
        tb_config = logging_conf.tensorboard_writer
        if not tb_config:
            return

        # Pop `should_log` from a private copy: removing it from the caller's
        # config would make a second Logger built from the same config fall
        # back to the default (True) and silently re-enable logging.
        tb_config = copy.deepcopy(tb_config)
        if tb_config.pop("should_log", True):
            self.tb_logger = instantiate(tb_config)

    def log_dict(self, payload: Dict[str, Scalar], step: int) -> None:
        """Forward a dictionary of tag -> scalar to the TensorBoard logger, if any."""
        if self.tb_logger is not None:
            self.tb_logger.log_dict(payload, step)

    def log(self, name: str, data: Scalar, step: int) -> None:
        """Forward a single scalar to the TensorBoard logger, if any."""
        if self.tb_logger is not None:
            self.tb_logger.log(name, data, step)

    def log_hparams(
        self, hparams: Dict[str, Scalar], meters: Dict[str, Scalar]
    ) -> None:
        """Forward hyperparameters and meters to the TensorBoard logger, if any."""
        if self.tb_logger is not None:
            self.tb_logger.log_hparams(hparams, meters)
|
|
|
|
|
|
|
|
|
@functools.lru_cache(maxsize=None)
def _cached_log_stream(filename):
    """Return an append-mode stream for ``filename``, opened via ``g_pathmgr``.

    The result is memoised per ``filename`` by ``lru_cache``, so repeated
    calls with the same name return the same stream object. The stream's
    ``close`` is registered with ``atexit``.
    """
    # Value passed as the `buffering` argument of the open call.
    buffer_size = 10 * 1024
    stream = g_pathmgr.open(filename, mode="a", buffering=buffer_size)
    atexit.register(stream.close)
    return stream
|
|
|
|
|
def setup_logging(
    name,
    output_dir=None,
    rank=0,
    log_level_primary="INFO",
    log_level_secondary="ERROR",
):
    """
    Setup various logging streams: stdout and file handlers.
    For file handlers, we only setup for rank 0.

    Side effects: all handlers previously attached to the named logger and
    to the root logger are dropped, and ``logging.root`` is rebound to the
    named logger so that module-level calls such as ``logging.info`` go
    through it.

    Args:
        name: name of the logger to configure
        output_dir: if truthy, the directory is created and, on rank 0,
            ``<output_dir>/log.txt`` receives a copy of the log
        rank: rank of the calling process; only rank 0 logs at
            ``log_level_primary`` on the console and writes the log file
        log_level_primary: level for the logger itself, the rank-0 console
            handler and the file handler
        log_level_secondary: console level used on non-zero ranks
    """
    log_filename = None
    if output_dir:
        makedir(output_dir)
        if rank == 0:
            log_filename = f"{output_dir}/log.txt"

    logger = logging.getLogger(name)
    logger.setLevel(log_level_primary)

    log_format = "%(levelname)s %(asctime)s %(filename)s:%(lineno)4d: %(message)s"
    formatter = logging.Formatter(log_format)

    # Iterate over a snapshot: removeHandler mutates logger.handlers, and
    # looping over the live list skips every other handler, leaving stale
    # handlers (and duplicated output) behind on repeated setup.
    for stale_handler in list(logger.handlers):
        logger.removeHandler(stale_handler)
    logger.root.handlers = []

    # Console output: full verbosity on rank 0 only.
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)
    console_handler.setLevel(log_level_primary if rank == 0 else log_level_secondary)

    # log_filename is only ever set on rank 0, so no extra rank check is needed.
    if log_filename is not None:
        file_handler = logging.StreamHandler(_cached_log_stream(log_filename))
        file_handler.setLevel(log_level_primary)
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

    logging.root = logger
|
|
|
|
|
def shutdown_logging():
    """
    Announce the shutdown, then call ``close()`` on every handler currently
    attached to ``logging.root``. The handlers are not detached from it.
    """
    logging.info("Shutting down loggers...")
    for active_handler in logging.root.handlers:
        active_handler.close()
|
|
|