Spaces:
Runtime error
Runtime error
| from collections import OrderedDict | |
| from copy import deepcopy | |
| import json | |
| import warnings | |
| import torch | |
| import numpy as np | |
def clean_state_dict(state_dict):
    """Return a copy of ``state_dict`` whose keys have no ``module.`` prefix.

    Args:
        state_dict (Mapping): mapping from parameter names to values.

    Returns:
        OrderedDict: the same values in the same order; every key that
        starts with ``'module.'`` has those seven characters removed, all
        other keys are copied unchanged.
    """
    prefix = 'module.'
    width = len(prefix)
    cleaned = OrderedDict()
    for name, value in state_dict.items():
        # Only the leading prefix is stripped; later occurrences are kept.
        key = name[width:] if name[:width] == prefix else name
        cleaned[key] = value
    return cleaned
def renorm(img: torch.FloatTensor, mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)) \
        -> torch.FloatTensor:
    """Undo a per-channel ``(x - mean) / std`` normalisation.

    Computes ``img * std + mean`` channel-wise.

    Args:
        img: tensor of shape ``(3, H, W)`` or ``(B, 3, H, W)``.
        mean: per-channel mean, one value per channel.
        std: per-channel standard deviation, one value per channel.

    Returns:
        Tensor with the same shape as ``img``.

    Raises:
        AssertionError: if ``img`` is not 3-D/4-D or its channel axis is
            not of size 3.
    """
    assert img.dim() == 3 or img.dim(
    ) == 4, 'img.dim() should be 3 or 4 but %d' % img.dim()
    # The channel axis is 0 for a single image and 1 for a batch.
    channel_dim = 0 if img.dim() == 3 else 1
    assert img.size(channel_dim) == 3, 'img.size(%d) shoule be 3 but "%d". (%s)' % (
        channel_dim, img.size(channel_dim), str(img.size()))

    # Build the statistics on the same device as ``img`` so that CUDA inputs
    # do not fail with a device mismatch. The dtype stays the default float
    # type, which is what ``torch.Tensor(list)`` produced before.
    stat_dtype = torch.get_default_dtype()
    mean_t = torch.as_tensor(mean, dtype=stat_dtype, device=img.device)
    std_t = torch.as_tensor(std, dtype=stat_dtype, device=img.device)

    # Shape (C, 1, 1) broadcasts over (C, H, W) as well as (B, C, H, W), so
    # one expression covers both layouts without permuting the image.
    return img * std_t.reshape(-1, 1, 1) + mean_t.reshape(-1, 1, 1)
class CocoClassMapper():
    """Translate between sparse original category ids and dense indices.

    The table holds 80 original ids drawn from 1..90 (ids 12, 26, 29, 30,
    45, 66, 68, 69, 71 and 83 are absent). Original ids are ranked in
    ascending order; the compact index is that rank counted from zero.

    Attributes:
        category_map_str (dict): ``str(original id) -> 1-based rank``.
        origin2compact_mapper (dict): ``original id -> 0-based index``.
        compact2origin_mapper (dict): ``0-based index -> original id``.
    """

    def __init__(self) -> None:
        # Original ids in ascending order, written as contiguous runs.
        origin_ids = (
            *range(1, 12),
            *range(13, 26),
            27,
            28,
            *range(31, 45),
            *range(46, 66),
            67,
            70,
            *range(72, 83),
            *range(84, 91),
        )
        self.category_map_str = {
            str(origin): rank
            for rank, origin in enumerate(origin_ids, start=1)
        }

        forward = {}
        backward = {}
        for origin_str, rank in self.category_map_str.items():
            origin = int(origin_str)
            compact = rank - 1  # ranks are 1-based, compact indices 0-based
            forward[origin] = compact
            backward[int(compact)] = origin
        self.origin2compact_mapper = forward
        self.compact2origin_mapper = backward

    def origin2compact(self, idx):
        """Return the 0-based compact index for original id ``idx``."""
        return self.origin2compact_mapper[int(idx)]

    def compact2origin(self, idx):
        """Return the original id for 0-based compact index ``idx``."""
        return self.compact2origin_mapper[int(idx)]
def to_device(item, device):
    """Recursively move tensors nested in lists and dicts to ``device``.

    Args:
        item: a ``torch.Tensor``, or a ``list`` / ``dict`` whose leaves are
            tensors (containers may be nested).
        device: target passed to ``Tensor.to``.

    Returns:
        A new structure of the same shape with every tensor moved.

    Raises:
        NotImplementedError: if ``item`` (or any nested element) is not a
            tensor, list or dict.
    """
    if isinstance(item, torch.Tensor):
        return item.to(device)
    if isinstance(item, list):
        return [to_device(element, device) for element in item]
    if isinstance(item, dict):
        return {key: to_device(value, device) for key, value in item.items()}
    raise NotImplementedError('You use other containers! type: {}'.format(
        type(item)))
| # | |
def get_gaussian_mean(x, axis, other_axis, softmax=True):
    """Weighted mean coordinate of ``x`` along one spatial axis.

    ``x`` is summed over ``other_axis``; the resulting 1-D profile is turned
    into weights and used to average coordinates evenly spaced in [0, 1].

    Args:
        x (Tensor): input maps. The reduction over index 2 is hard-coded, so
            the input is expected to be 4-D (B x C x H x W).
        axis (int): the axis whose coordinate is averaged.
        other_axis (int): the axis that is summed out first.
        softmax (bool): if True the profile is normalised with a softmax,
            otherwise it is divided by its sum (plus 1e-6).

    Returns:
        Tensor: B x C weighted coordinate along ``axis``.
    """
    profile = torch.sum(x, dim=other_axis)
    if softmax:
        weights = torch.softmax(profile, dim=2)
    else:
        # The small constant keeps an all-zero profile from dividing by zero.
        weights = profile / (profile.sum(2, keepdim=True) + 1e-6)

    n_batch, n_channel = x.shape[0], x.shape[1]
    coords = torch.linspace(0, 1, x.shape[axis]).to(x.device)
    # Same coordinate row for every (batch, channel) pair.
    grid = coords.view(1, 1, -1).expand(n_batch, n_channel, -1)
    return torch.sum(grid * weights, dim=2)
def get_expected_points_from_map(hm, softmax=True):
    """Soft-argmax: expected (x, y) location of each map.

    Args:
        hm (Tensor): input maps, B x C x H x W.
        softmax (bool): forwarded to ``get_gaussian_mean``; selects softmax
            or sum normalisation of the per-axis profiles.

    Returns:
        Tensor: B x C x 2, last axis ordered (x, y), where x is taken along
        axis 3 and y along axis 2, on a coordinate grid spanning [0, 1].
    """
    # The unpacking doubles as a rank check: a non-4-D input raises here.
    _batch, _channels, _height, _width = hm.shape
    # (axis, other_axis) pairs, x first so the stacked result is (x, y).
    axis_pairs = ((3, 2), (2, 3))
    means = [
        get_gaussian_mean(hm, axis, other, softmax=softmax)  # each B,C
        for axis, other in axis_pairs
    ]
    return torch.stack(means, dim=2)
| # Positional encoding (section 5.1) | |
| # borrow from nerf | |
class Embedder:
    """Frequency (positional) encoding built from a list of callables.

    Expected keyword arguments:
        input_dims (int): size of the last axis of the inputs.
        include_input (bool): prepend the identity to the encoding.
        max_freq_log2 (number): log2 of the highest frequency.
        num_freqs (int): number of frequency bands.
        log_sampling (bool): if True bands are ``2 ** linspace(0, max)``,
            otherwise ``linspace(2 ** 0, 2 ** max)``.
        periodic_fns (list): callables applied to ``x * freq`` per band.

    Attributes:
        embed_fns (list): the callables, in output order.
        out_dim (int): size of the last axis produced by ``embed``.
    """

    def __init__(self, **kwargs):
        self.kwargs = kwargs
        self.create_embedding_fn()

    def create_embedding_fn(self):
        """Populate ``embed_fns`` and ``out_dim`` from ``self.kwargs``."""
        cfg = self.kwargs
        dims = cfg['input_dims']
        fns = []
        if cfg['include_input']:
            fns.append(lambda x: x)

        max_freq = cfg['max_freq_log2']
        n_freqs = cfg['num_freqs']
        if cfg['log_sampling']:
            bands = 2.**torch.linspace(0., max_freq, steps=n_freqs)
        else:
            bands = torch.linspace(2.**0., 2.**max_freq, steps=n_freqs)

        # Default arguments freeze the current band/function in each lambda;
        # without them every closure would see only the last loop values.
        fns.extend(
            (lambda x, p_fn=p_fn, freq=freq: p_fn(x * freq))
            for freq in bands
            for p_fn in cfg['periodic_fns']
        )

        self.embed_fns = fns
        # Every callable contributes ``dims`` output features.
        self.out_dim = dims * len(fns)

    def embed(self, inputs):
        """Concatenate the output of every callable along the last axis."""
        pieces = [fn(inputs) for fn in self.embed_fns]
        return torch.cat(pieces, -1)
def get_embedder(multires, i=0):
    """Build an encoding callable for 3-D inputs.

    Args:
        multires (int): number of frequency bands; the highest band is
            ``2 ** (multires - 1)``.
        i (int): ``-1`` disables the encoding and returns an identity
            module; any other value builds the sin/cos ``Embedder``.

    Returns:
        tuple: ``(embed, out_dim)`` where ``embed`` maps a tensor to its
        encoding and ``out_dim`` is the size of the encoded last axis.
    """
    import torch.nn as nn

    if i == -1:
        return nn.Identity(), 3

    embedder_obj = Embedder(
        include_input=True,
        input_dims=3,
        max_freq_log2=multires - 1,
        num_freqs=multires,
        log_sampling=True,
        periodic_fns=[torch.sin, torch.cos],
    )

    def embed(x, eo=embedder_obj):
        return eo.embed(x)

    return embed, embedder_obj.out_dim
class APOPMeter():
    """Running confusion-matrix counts for binary predictions.

    Attributes:
        tp, fp, tn, fn (int): accumulated true-positive, false-positive,
            true-negative and false-negative counts.
    """

    def __init__(self) -> None:
        self.tp = 0
        self.fp = 0
        self.tn = 0
        self.fn = 0

    def update(self, pred, gt):
        """Accumulate counts from a batch of predictions.

        Args:
            pred (Tensor): predictions; entries equal to 1 count as positive
                and entries equal to 0 as negative.
            gt (Tensor): ground truth with the same shape and encoding.

        Raises:
            AssertionError: if ``pred`` and ``gt`` differ in shape.
        """
        assert pred.shape == gt.shape
        pred_pos, pred_neg = pred == 1, pred == 0
        gt_pos, gt_neg = gt == 1, gt == 0
        self.tp += torch.logical_and(pred_pos, gt_pos).sum().item()
        self.fp += torch.logical_and(pred_pos, gt_neg).sum().item()
        self.tn += torch.logical_and(pred_neg, gt_neg).sum().item()
        # False negatives: predicted 0 while the ground truth is 1. These
        # were previously never counted (fp was re-added to tn instead).
        self.fn += torch.logical_and(pred_neg, gt_pos).sum().item()

    def update_cm(self, tp, fp, tn, fn):
        """Accumulate pre-computed confusion-matrix counts."""
        self.tp += tp
        self.fp += fp
        self.tn += tn
        self.fn += fn
def inverse_sigmoid(x, eps=1e-5):
    """Numerically safe logit, ``log(x / (1 - x))``.

    Args:
        x (Tensor): values treated as probabilities; they are first clamped
            to [0, 1].
        eps (float): lower bound applied to both ``x`` and ``1 - x`` so the
            logarithm and the division stay finite.

    Returns:
        Tensor: element-wise logit with the same shape as ``x``.
    """
    prob = x.clamp(min=0, max=1)
    complement = (1 - prob).clamp(min=eps)
    numerator = prob.clamp(min=eps)
    return torch.log(numerator / complement)
| import argparse | |
| from util.config import Config | |
def get_raw_dict(args):
    """Return the plain dict held by ``args``.

    Accepts an ``argparse.Namespace`` (its attribute dict is returned), a
    ``dict`` (returned unchanged) or a ``Config`` (its ``_cfg_dict`` is
    returned). In every case the returned object is the live one, not a copy.

    e.g:
    >>> with open(path, 'w') as f:
            json.dump(get_raw_dict(args), f, indent=2)

    Raises:
        NotImplementedError: for any other type.
    """
    if isinstance(args, argparse.Namespace):
        return vars(args)
    if isinstance(args, dict):
        return args
    if isinstance(args, Config):
        return args._cfg_dict
    raise NotImplementedError('Unknown type {}'.format(type(args)))
def stat_tensors(tensor):
    """Summary statistics of a 1-D tensor.

    Args:
        tensor (Tensor): one-dimensional input.

    Returns:
        dict: 0-dim tensors under the keys ``'max'``, ``'min'``, ``'mean'``,
        ``'var'``, ``'std'`` (square root of ``'var'``) and ``'entropy'``.

    Note:
        ``'entropy'`` is ``sum(p * log(p + 1e-9))`` with ``p`` the softmax of
        the input, i.e. it carries the opposite sign of the usual Shannon
        entropy and is therefore <= 0.

    Raises:
        AssertionError: if ``tensor`` is not 1-D.
    """
    assert tensor.dim() == 1
    probs = tensor.softmax(0)
    # The 1e-9 offset keeps log() finite when a probability underflows to 0.
    log_probs = torch.log(probs + 1e-9)
    variance = tensor.var()
    stats = {}
    stats['max'] = tensor.max()
    stats['min'] = tensor.min()
    stats['mean'] = tensor.mean()
    stats['var'] = variance
    stats['std'] = variance**0.5
    stats['entropy'] = (probs * log_probs).sum()
    return stats
class NiceRepr:
    """Mixin that builds ``__str__`` and ``__repr__`` from ``__nice__``.

    Subclasses override ``__nice__`` to return a short summary string. A
    subclass that defines ``__len__`` but not ``__nice__`` gets its length as
    the summary. When neither is available, a ``RuntimeWarning`` is issued
    and the default ``object`` representation is used.

    Example:
        >>> class Foo(NiceRepr):
        ...    def __nice__(self):
        ...        return 'info'
        >>> foo = Foo()
        >>> assert str(foo) == '<Foo(info)>'
        >>> assert repr(foo).startswith('<Foo(info) at ')

    Example:
        >>> class Bar(NiceRepr):
        ...    pass
        >>> bar = Bar()
        >>> import warnings
        >>> with warnings.catch_warnings(record=True):
        ...     assert 'object at' in str(bar)
        ...     assert 'object at' in repr(bar)

    Example:
        >>> class Baz(NiceRepr):
        ...    def __len__(self):
        ...        return 5
        >>> baz = Baz()
        >>> assert str(baz) == '<Baz(5)>'
    """

    def __nice__(self):
        """str: a "nice" summary string describing this module"""
        if not hasattr(self, '__len__'):
            # Without a length there is no sensible default: the subclass
            # has to provide its own __nice__.
            raise NotImplementedError(
                f'Define the __nice__ method for {self.__class__!r}')
        # Objects with a length commonly report it as their summary.
        return str(len(self))

    def __repr__(self):
        """str: the string of the module"""
        try:
            summary = self.__nice__()
            name = self.__class__.__name__
            address = hex(id(self))
            return '<{}({}) at {}>'.format(name, summary, address)
        except NotImplementedError as ex:
            warnings.warn(str(ex), category=RuntimeWarning)
            return object.__repr__(self)

    def __str__(self):
        """str: the string of the module"""
        try:
            name = self.__class__.__name__
            summary = self.__nice__()
            return '<{}({})>'.format(name, summary)
        except NotImplementedError as ex:
            warnings.warn(str(ex), category=RuntimeWarning)
            return object.__repr__(self)
def ensure_rng(rng=None):
    """Coerces input into a random number generator.

    If the input is None, then a global random state is returned.

    If the input is an integer (a Python ``int`` or a NumPy integer scalar
    such as ``np.int64``), then that is used as a seed to construct a random
    state. Otherwise the input is returned as-is.

    Adapted from [1]_.

    Args:
        rng (int | numpy.integer | numpy.random.RandomState | None):
            if None, then defaults to the global rng. Otherwise this can be
            an integer seed or a RandomState instance.

    Returns:
        (numpy.random.RandomState) : rng -
            a numpy random number generator

    References:
        .. [1] https://gitlab.kitware.com/computer-vision/kwarray/blob/master/kwarray/util_random.py#L270  # noqa: E501
    """
    if rng is None:
        # NumPy's module-level RandomState, shared with np.random.* calls.
        return np.random.mtrand._rand
    # NumPy integer scalars are not instances of ``int``; without the extra
    # check a seed read from an array would be returned unchanged and fail
    # later when used as a generator.
    if isinstance(rng, (int, np.integer)):
        return np.random.RandomState(rng)
    return rng
def random_boxes(num=1, scale=1, rng=None):
    """Simple version of ``kwimage.Boxes.random``

    Draws ``num`` random corner pairs, orders each pair so that the first
    corner is the top-left one, and multiplies every coordinate by ``scale``.

    Args:
        num (int): number of boxes.
        scale (number): factor applied to the unit-square coordinates.
        rng: seed or generator, resolved through ``ensure_rng``.

    Returns:
        Tensor: shape (n, 4) in x1, y1, x2, y2 format.

    References:
        https://gitlab.kitware.com/computer-vision/kwimage/blob/master/kwimage/structs/boxes.py#L1390

    Example:
        >>> num = 3
        >>> scale = 512
        >>> rng = 0
        >>> boxes = random_boxes(num, scale, rng)
        >>> print(boxes)
        tensor([[280.9925, 278.9802, 308.6148, 366.1769],
                [216.9113, 330.6978, 224.0446, 456.5878],
                [405.3632, 196.3221, 493.3953, 270.7942]])
    """
    generator = ensure_rng(rng)
    corners = generator.rand(num, 4).astype(np.float32)

    # Columns 0/1 and 2/3 are two arbitrary corners; sort them per axis.
    first, second = corners[:, :2], corners[:, 2:]
    ordered = np.concatenate(
        [np.minimum(first, second), np.maximum(first, second)], axis=1)

    # Scale column by column into a float32 buffer, so the result is cast
    # back to float32 whatever the type of ``scale``.
    scaled = np.empty_like(corners)
    for col in range(4):
        scaled[:, col] = ordered[:, col] * scale

    return torch.from_numpy(scaled)
class ModelEma(torch.nn.Module):
    """Exponential moving average of a model's ``state_dict``.

    Args:
        model (torch.nn.Module): model to track; a deep copy is stored in
            ``self.module`` and put in eval mode.
        decay (float): weight kept from the running average on each update.
        device: if given, the averaged copy lives on this device and source
            values are moved there before blending.
    """

    def __init__(self, model, decay=0.9997, device=None):
        super().__init__()
        # Independent copy used to accumulate the moving average of weights.
        self.module = deepcopy(model)
        self.module.eval()
        self.decay = decay
        # When set, the EMA is performed on a different device from model.
        self.device = device
        if device is not None:
            self.module.to(device=device)

    def _update(self, model, update_fn):
        """Overwrite each averaged entry with ``update_fn(ema, model)``.

        Entries are paired by position in the two ``state_dict`` orderings.
        """
        with torch.no_grad():
            averaged = self.module.state_dict().values()
            current = model.state_dict().values()
            for ema_v, model_v in zip(averaged, current):
                if self.device is not None:
                    model_v = model_v.to(device=self.device)
                ema_v.copy_(update_fn(ema_v, model_v))

    def update(self, model):
        """Blend the current values of ``model`` into the running average."""

        def blend(e, m):
            return self.decay * e + (1. - self.decay) * m

        self._update(model, update_fn=blend)

    def set(self, model):
        """Replace the running average with the current values of ``model``."""

        def take_model(e, m):
            return m

        self._update(model, update_fn=take_model)
class BestMetricSingle():
    """Remember the best value of one metric and the epoch it occurred in.

    Args:
        init_res: starting value that a result must beat.
        better (str): ``'large'`` if higher is better, ``'small'`` if lower
            is better.
    """

    def __init__(self, init_res=0.0, better='large') -> None:
        self.init_res = init_res
        self.best_res = init_res
        self.best_ep = -1  # -1 means no result has beaten init_res yet
        self.better = better
        assert better in ['large', 'small']

    def isbetter(self, new_res, old_res):
        """Return whether ``new_res`` strictly improves on ``old_res``."""
        mode = self.better
        if mode == 'large':
            return new_res > old_res
        if mode == 'small':
            return new_res < old_res

    def update(self, new_res, ep):
        """Record ``new_res`` if it is a new best; return True if it was."""
        if not self.isbetter(new_res, self.best_res):
            return False
        self.best_res = new_res
        self.best_ep = ep
        return True

    def __str__(self) -> str:
        return 'best_res: {}\t best_ep: {}'.format(self.best_res, self.best_ep)

    def __repr__(self) -> str:
        return self.__str__()

    def summary(self) -> dict:
        """Return the best value and its epoch as a dict."""
        return dict(best_res=self.best_res, best_ep=self.best_ep)
class BestMetricHolder():
    """Track the best metric overall and, optionally, per model variant.

    Args:
        init_res: starting value for every tracker.
        better (str): ``'large'`` or ``'small'``, forwarded to each tracker.
        use_ema (bool): if True, separate trackers are also kept for EMA
            results (``best_ema``) and regular results (``best_regular``).
    """

    def __init__(self, init_res=0.0, better='large', use_ema=False) -> None:
        self.best_all = BestMetricSingle(init_res, better)
        self.use_ema = use_ema
        if use_ema:
            self.best_ema = BestMetricSingle(init_res, better)
            self.best_regular = BestMetricSingle(init_res, better)

    def update(self, new_res, epoch, is_ema=False):
        """return if the results is the best."""
        if self.use_ema:
            # Feed the variant-specific tracker first; the return value
            # always reflects the overall tracker.
            tracker = self.best_ema if is_ema else self.best_regular
            tracker.update(new_res, epoch)
        return self.best_all.update(new_res, epoch)

    def summary(self):
        """Return the tracked bests as a flat dict.

        Without EMA this is the overall tracker's summary; with EMA the
        keys are prefixed with ``all_``, ``regular_`` and ``ema_``.
        """
        if not self.use_ema:
            return self.best_all.summary()

        res = {}
        for k, v in self.best_all.summary().items():
            res[f'all_{k}'] = v
        for k, v in self.best_regular.summary().items():
            res[f'regular_{k}'] = v
        for k, v in self.best_ema.summary().items():
            res[f'ema_{k}'] = v
        return res

    def __repr__(self) -> str:
        return json.dumps(self.summary(), indent=2)

    def __str__(self) -> str:
        return self.__repr__()
def merge_configs(cfg1, cfg2):
    """Merge ``cfg2`` into a copy of ``cfg1``.

    Keys present in both take the value from ``cfg2``. Entries of ``cfg2``
    whose value is ``None`` are ignored; every other value, including falsy
    ones such as ``0``, ``False`` or ``''``, overrides.

    Args:
        cfg1 (dict | None): base configuration; ``None`` means empty. It is
            shallow-copied and never modified.
        cfg2 (dict | None): overriding configuration; ``None`` means empty.

    Returns:
        dict: the merged configuration.
    """
    merged = {} if cfg1 is None else cfg1.copy()
    if cfg2 is None:
        return merged
    for key, value in cfg2.items():
        # Only None means "not set"; a plain truthiness test would also
        # drop legitimate overrides such as 0 or False.
        if value is not None:
            merged[key] = value
    return merged