import json
from collections import OrderedDict
from itertools import repeat
from pathlib import Path
from typing import Any, Callable, List, Optional, Tuple

import pandas as pd
import PIL.Image as Image
import torch


def read_json(fname):
    """Load a JSON file and return its content.

    JSON objects are decoded as ``OrderedDict`` instances so that key order
    from the file is preserved.

    Args:
        fname: Path (``str`` or ``Path``) of the JSON file to read.

    Returns:
        The decoded JSON content.
    """
    fname = Path(fname)
    # JSON text is UTF-8; do not depend on the platform's locale encoding.
    with fname.open('rt', encoding='utf-8') as handle:
        return json.load(handle, object_hook=OrderedDict)


def write_json(content, fname):
    """Serialize ``content`` as JSON into ``fname``.

    The output is indented by 4 spaces and keys are written in their
    existing order (no sorting).

    Args:
        content: JSON-serializable object.
        fname: Destination path (``str`` or ``Path``).
    """
    fname = Path(fname)
    with fname.open('wt', encoding='utf-8') as handle:
        json.dump(content, handle, indent=4, sort_keys=False)


def pil_loader(fname: str) -> Image.Image:
    """Open the image at ``fname`` with PIL and return the image object."""
    return Image.open(fname)


def prepare_device(n_gpu_use):
    """Set up the GPU device if available.

    Also computes the GPU device indices which are used for DataParallel.

    Args:
        n_gpu_use: Number of GPUs requested.

    Returns:
        A ``(device, list_ids)`` tuple: ``device`` is ``cuda:0`` when at
        least one GPU will be used and ``cpu`` otherwise; ``list_ids`` is
        ``list(range(n))`` where ``n`` is the request capped by the number
        of GPUs reported by ``torch.cuda.device_count()``.
    """
    n_gpu = torch.cuda.device_count()
    if n_gpu_use > 0 and n_gpu == 0:
        print("Warning: There's no GPU available on this machine, "
              "training will be performed on CPU.")
        n_gpu_use = 0
    if n_gpu_use > n_gpu:
        print(f"Warning: The number of GPU's configured to use is {n_gpu_use}, but only {n_gpu} are "
              "available on this machine.")
        n_gpu_use = n_gpu
    device = torch.device('cuda:0' if n_gpu_use > 0 else 'cpu')
    list_ids = list(range(n_gpu_use))
    return device, list_ids


class TransformMultiple:
    """Apply one transform jointly to a sequence of tensors.

    The tensors are concatenated with ``torch.cat``, passed through the
    transform once, and then split back using each input's size along the
    first dimension.
    """

    def __init__(self, transform: Optional[Callable] = None) -> None:
        """Store the transform; ``None`` makes the call a pass-through."""
        self.transform = transform

    def __call__(self, data: Any) -> Tuple:
        """Transform ``data`` jointly, or return it unchanged without a transform."""
        if self.transform is not None:
            cat_data = torch.cat(data)
            cat_data = self.transform(cat_data)
            # Split back into chunks matching each input's first-dim length.
            return torch.split(cat_data, [x.size()[0] for x in data])
        return data

    def _format_transform_repr(self, transform: Callable, head: str) -> List[str]:
        """Return the transform's repr lines, prefixed by ``head`` and aligned under it."""
        lines = transform.__repr__().splitlines()
        padding = " " * len(head)
        return [f"{head}{lines[0]}"] + [f"{padding}{line}" for line in lines[1:]]

    def __repr__(self) -> str:
        body = [self.__class__.__name__]
        if self.transform is not None:
            body += self._format_transform_repr(self.transform, "Transform: ")
        return "\n".join(body)


class MetricTracker:
    """Keep a running total, count and average for a fixed set of metric keys.

    The state lives in a DataFrame indexed by key with the columns
    ``total``, ``counts`` and ``average``.
    """

    def __init__(self, *keys, writer=None):
        """Create a tracker for ``keys``.

        Args:
            *keys: Names of the metrics to track.
            writer: Optional object exposing ``add_scalar(key, value)``.
        """
        self.writer = writer
        self._data = pd.DataFrame(index=keys, columns=['total', 'counts', 'average'])
        self.reset()

    def reset(self):
        """Set every total, count and average back to zero."""
        # Rebuild the frame instead of writing through ``.values``: with
        # pandas Copy-on-Write that array is read-only. Object dtype keeps
        # whatever value type callers pass to ``update``.
        self._data = pd.DataFrame(
            0, index=self._data.index, columns=self._data.columns, dtype=object
        )

    def update(self, key, value, n=1):
        """Add ``value`` weighted by ``n`` to ``key`` and refresh its average.

        Args:
            key: Metric name; must be one of the keys given at construction
                (an unknown key raises ``KeyError``).
            value: Metric value for this update.
            n: Weight of the value (e.g. number of samples it summarizes).
        """
        # ``.at`` writes directly into the frame; chained assignment such as
        # ``frame.total[key] += ...`` does not update the frame under
        # pandas Copy-on-Write.
        total = self._data.at[key, 'total'] + value * n
        counts = self._data.at[key, 'counts'] + n
        self._data.at[key, 'total'] = total
        self._data.at[key, 'counts'] = counts
        self._data.at[key, 'average'] = total / counts

    def add_scalers(self):
        """Send each key's current average to the writer, if one was given."""
        if self.writer is not None:
            for key in self._data.index:
                self.writer.add_scalar(key, self._data.at[key, 'average'])

    def avg(self, key):
        """Return the current average for ``key``."""
        return self._data.at[key, 'average']

    def result(self):
        """Return a ``{key: average}`` dict for all tracked keys."""
        return dict(self._data['average'])