| import argparse |
| import json |
| import warnings |
| from collections import OrderedDict |
| from copy import deepcopy |
| from typing import Any, Dict, List |
|
|
| import numpy as np |
| import torch |
| from transformers import AutoTokenizer |
|
|
| from groundingdino.util.slconfig import SLConfig |
|
|
|
|
| def slprint(x, name="x"): |
| if isinstance(x, (torch.Tensor, np.ndarray)): |
| print(f"{name}.shape:", x.shape) |
| elif isinstance(x, (tuple, list)): |
| print("type x:", type(x)) |
| for i in range(min(10, len(x))): |
| slprint(x[i], f"{name}[{i}]") |
| elif isinstance(x, dict): |
| for k, v in x.items(): |
| slprint(v, f"{name}[{k}]") |
| else: |
| print(f"{name}.type:", type(x)) |
|
|
|
|
| def clean_state_dict(state_dict): |
| new_state_dict = OrderedDict() |
| for k, v in state_dict.items(): |
| if k[:7] == "module.": |
| k = k[7:] |
| new_state_dict[k] = v |
| return new_state_dict |
|
|
|
|
| def renorm( |
| img: torch.FloatTensor, mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225] |
| ) -> torch.FloatTensor: |
| |
| |
| assert img.dim() == 3 or img.dim() == 4, "img.dim() should be 3 or 4 but %d" % img.dim() |
| if img.dim() == 3: |
| assert img.size(0) == 3, 'img.size(0) shoule be 3 but "%d". (%s)' % ( |
| img.size(0), |
| str(img.size()), |
| ) |
| img_perm = img.permute(1, 2, 0) |
| mean = torch.Tensor(mean) |
| std = torch.Tensor(std) |
| img_res = img_perm * std + mean |
| return img_res.permute(2, 0, 1) |
| else: |
| assert img.size(1) == 3, 'img.size(1) shoule be 3 but "%d". (%s)' % ( |
| img.size(1), |
| str(img.size()), |
| ) |
| img_perm = img.permute(0, 2, 3, 1) |
| mean = torch.Tensor(mean) |
| std = torch.Tensor(std) |
| img_res = img_perm * std + mean |
| return img_res.permute(0, 3, 1, 2) |
|
|
|
|
| class CocoClassMapper: |
| def __init__(self) -> None: |
| self.category_map_str = { |
| "1": 1, |
| "2": 2, |
| "3": 3, |
| "4": 4, |
| "5": 5, |
| "6": 6, |
| "7": 7, |
| "8": 8, |
| "9": 9, |
| "10": 10, |
| "11": 11, |
| "13": 12, |
| "14": 13, |
| "15": 14, |
| "16": 15, |
| "17": 16, |
| "18": 17, |
| "19": 18, |
| "20": 19, |
| "21": 20, |
| "22": 21, |
| "23": 22, |
| "24": 23, |
| "25": 24, |
| "27": 25, |
| "28": 26, |
| "31": 27, |
| "32": 28, |
| "33": 29, |
| "34": 30, |
| "35": 31, |
| "36": 32, |
| "37": 33, |
| "38": 34, |
| "39": 35, |
| "40": 36, |
| "41": 37, |
| "42": 38, |
| "43": 39, |
| "44": 40, |
| "46": 41, |
| "47": 42, |
| "48": 43, |
| "49": 44, |
| "50": 45, |
| "51": 46, |
| "52": 47, |
| "53": 48, |
| "54": 49, |
| "55": 50, |
| "56": 51, |
| "57": 52, |
| "58": 53, |
| "59": 54, |
| "60": 55, |
| "61": 56, |
| "62": 57, |
| "63": 58, |
| "64": 59, |
| "65": 60, |
| "67": 61, |
| "70": 62, |
| "72": 63, |
| "73": 64, |
| "74": 65, |
| "75": 66, |
| "76": 67, |
| "77": 68, |
| "78": 69, |
| "79": 70, |
| "80": 71, |
| "81": 72, |
| "82": 73, |
| "84": 74, |
| "85": 75, |
| "86": 76, |
| "87": 77, |
| "88": 78, |
| "89": 79, |
| "90": 80, |
| } |
| self.origin2compact_mapper = {int(k): v - 1 for k, v in self.category_map_str.items()} |
| self.compact2origin_mapper = {int(v - 1): int(k) for k, v in self.category_map_str.items()} |
|
|
| def origin2compact(self, idx): |
| return self.origin2compact_mapper[int(idx)] |
|
|
| def compact2origin(self, idx): |
| return self.compact2origin_mapper[int(idx)] |
|
|
|
|
| def to_device(item, device): |
| if isinstance(item, torch.Tensor): |
| return item.to(device) |
| elif isinstance(item, list): |
| return [to_device(i, device) for i in item] |
| elif isinstance(item, dict): |
| return {k: to_device(v, device) for k, v in item.items()} |
| else: |
| raise NotImplementedError( |
| "Call Shilong if you use other containers! type: {}".format(type(item)) |
| ) |
|
|
|
|
| |
| def get_gaussian_mean(x, axis, other_axis, softmax=True): |
| """ |
| |
| Args: |
| x (float): Input images(BxCxHxW) |
| axis (int): The index for weighted mean |
| other_axis (int): The other index |
| |
| Returns: weighted index for axis, BxC |
| |
| """ |
| mat2line = torch.sum(x, axis=other_axis) |
| |
| if softmax: |
| u = torch.softmax(mat2line, axis=2) |
| else: |
| u = mat2line / (mat2line.sum(2, keepdim=True) + 1e-6) |
| size = x.shape[axis] |
| ind = torch.linspace(0, 1, size).to(x.device) |
| batch = x.shape[0] |
| channel = x.shape[1] |
| index = ind.repeat([batch, channel, 1]) |
| mean_position = torch.sum(index * u, dim=2) |
| return mean_position |
|
|
|
|
| def get_expected_points_from_map(hm, softmax=True): |
| """get_gaussian_map_from_points |
| B,C,H,W -> B,N,2 float(0, 1) float(0, 1) |
| softargmax function |
| |
| Args: |
| hm (float): Input images(BxCxHxW) |
| |
| Returns: |
| weighted index for axis, BxCx2. float between 0 and 1. |
| |
| """ |
| |
| B, C, H, W = hm.shape |
| y_mean = get_gaussian_mean(hm, 2, 3, softmax=softmax) |
| x_mean = get_gaussian_mean(hm, 3, 2, softmax=softmax) |
| |
| return torch.stack([x_mean, y_mean], dim=2) |
|
|
|
|
| |
| |
| class Embedder: |
| def __init__(self, **kwargs): |
| self.kwargs = kwargs |
| self.create_embedding_fn() |
|
|
| def create_embedding_fn(self): |
| embed_fns = [] |
| d = self.kwargs["input_dims"] |
| out_dim = 0 |
| if self.kwargs["include_input"]: |
| embed_fns.append(lambda x: x) |
| out_dim += d |
|
|
| max_freq = self.kwargs["max_freq_log2"] |
| N_freqs = self.kwargs["num_freqs"] |
|
|
| if self.kwargs["log_sampling"]: |
| freq_bands = 2.0 ** torch.linspace(0.0, max_freq, steps=N_freqs) |
| else: |
| freq_bands = torch.linspace(2.0**0.0, 2.0**max_freq, steps=N_freqs) |
|
|
| for freq in freq_bands: |
| for p_fn in self.kwargs["periodic_fns"]: |
| embed_fns.append(lambda x, p_fn=p_fn, freq=freq: p_fn(x * freq)) |
| out_dim += d |
|
|
| self.embed_fns = embed_fns |
| self.out_dim = out_dim |
|
|
| def embed(self, inputs): |
| return torch.cat([fn(inputs) for fn in self.embed_fns], -1) |
|
|
|
|
| def get_embedder(multires, i=0): |
| import torch.nn as nn |
|
|
| if i == -1: |
| return nn.Identity(), 3 |
|
|
| embed_kwargs = { |
| "include_input": True, |
| "input_dims": 3, |
| "max_freq_log2": multires - 1, |
| "num_freqs": multires, |
| "log_sampling": True, |
| "periodic_fns": [torch.sin, torch.cos], |
| } |
|
|
| embedder_obj = Embedder(**embed_kwargs) |
| embed = lambda x, eo=embedder_obj: eo.embed(x) |
| return embed, embedder_obj.out_dim |
|
|
|
|
| class APOPMeter: |
| def __init__(self) -> None: |
| self.tp = 0 |
| self.fp = 0 |
| self.tn = 0 |
| self.fn = 0 |
|
|
| def update(self, pred, gt): |
| """ |
| Input: |
| pred, gt: Tensor() |
| """ |
| assert pred.shape == gt.shape |
| self.tp += torch.logical_and(pred == 1, gt == 1).sum().item() |
| self.fp += torch.logical_and(pred == 1, gt == 0).sum().item() |
| self.tn += torch.logical_and(pred == 0, gt == 0).sum().item() |
| self.tn += torch.logical_and(pred == 1, gt == 0).sum().item() |
|
|
| def update_cm(self, tp, fp, tn, fn): |
| self.tp += tp |
| self.fp += fp |
| self.tn += tn |
| self.tn += fn |
|
|
|
|
| def inverse_sigmoid(x, eps=1e-5): |
| x = x.clamp(min=0, max=1) |
| x1 = x.clamp(min=eps) |
| x2 = (1 - x).clamp(min=eps) |
| return torch.log(x1 / x2) |
|
|
|
|
| def get_raw_dict(args): |
| """ |
| return the dicf contained in args. |
| |
| e.g: |
| >>> with open(path, 'w') as f: |
| json.dump(get_raw_dict(args), f, indent=2) |
| """ |
| if isinstance(args, argparse.Namespace): |
| return vars(args) |
| elif isinstance(args, dict): |
| return args |
| elif isinstance(args, SLConfig): |
| return args._cfg_dict |
| else: |
| raise NotImplementedError("Unknown type {}".format(type(args))) |
|
|
|
|
| def stat_tensors(tensor): |
| assert tensor.dim() == 1 |
| tensor_sm = tensor.softmax(0) |
| entropy = (tensor_sm * torch.log(tensor_sm + 1e-9)).sum() |
|
|
| return { |
| "max": tensor.max(), |
| "min": tensor.min(), |
| "mean": tensor.mean(), |
| "var": tensor.var(), |
| "std": tensor.var() ** 0.5, |
| "entropy": entropy, |
| } |
|
|
|
|
| class NiceRepr: |
| """Inherit from this class and define ``__nice__`` to "nicely" print your |
| objects. |
| |
| Defines ``__str__`` and ``__repr__`` in terms of ``__nice__`` function |
| Classes that inherit from :class:`NiceRepr` should redefine ``__nice__``. |
| If the inheriting class has a ``__len__``, method then the default |
| ``__nice__`` method will return its length. |
| |
| Example: |
| >>> class Foo(NiceRepr): |
| ... def __nice__(self): |
| ... return 'info' |
| >>> foo = Foo() |
| >>> assert str(foo) == '<Foo(info)>' |
| >>> assert repr(foo).startswith('<Foo(info) at ') |
| |
| Example: |
| >>> class Bar(NiceRepr): |
| ... pass |
| >>> bar = Bar() |
| >>> import pytest |
| >>> with pytest.warns(None) as record: |
| >>> assert 'object at' in str(bar) |
| >>> assert 'object at' in repr(bar) |
| |
| Example: |
| >>> class Baz(NiceRepr): |
| ... def __len__(self): |
| ... return 5 |
| >>> baz = Baz() |
| >>> assert str(baz) == '<Baz(5)>' |
| """ |
|
|
| def __nice__(self): |
| """str: a "nice" summary string describing this module""" |
| if hasattr(self, "__len__"): |
| |
| |
| return str(len(self)) |
| else: |
| |
| raise NotImplementedError(f"Define the __nice__ method for {self.__class__!r}") |
|
|
| def __repr__(self): |
| """str: the string of the module""" |
| try: |
| nice = self.__nice__() |
| classname = self.__class__.__name__ |
| return f"<{classname}({nice}) at {hex(id(self))}>" |
| except NotImplementedError as ex: |
| warnings.warn(str(ex), category=RuntimeWarning) |
| return object.__repr__(self) |
|
|
| def __str__(self): |
| """str: the string of the module""" |
| try: |
| classname = self.__class__.__name__ |
| nice = self.__nice__() |
| return f"<{classname}({nice})>" |
| except NotImplementedError as ex: |
| warnings.warn(str(ex), category=RuntimeWarning) |
| return object.__repr__(self) |
|
|
|
|
| def ensure_rng(rng=None): |
| """Coerces input into a random number generator. |
| |
| If the input is None, then a global random state is returned. |
| |
| If the input is a numeric value, then that is used as a seed to construct a |
| random state. Otherwise the input is returned as-is. |
| |
| Adapted from [1]_. |
| |
| Args: |
| rng (int | numpy.random.RandomState | None): |
| if None, then defaults to the global rng. Otherwise this can be an |
| integer or a RandomState class |
| Returns: |
| (numpy.random.RandomState) : rng - |
| a numpy random number generator |
| |
| References: |
| .. [1] https://gitlab.kitware.com/computer-vision/kwarray/blob/master/kwarray/util_random.py#L270 # noqa: E501 |
| """ |
|
|
| if rng is None: |
| rng = np.random.mtrand._rand |
| elif isinstance(rng, int): |
| rng = np.random.RandomState(rng) |
| else: |
| rng = rng |
| return rng |
|
|
|
|
| def random_boxes(num=1, scale=1, rng=None): |
| """Simple version of ``kwimage.Boxes.random`` |
| |
| Returns: |
| Tensor: shape (n, 4) in x1, y1, x2, y2 format. |
| |
| References: |
| https://gitlab.kitware.com/computer-vision/kwimage/blob/master/kwimage/structs/boxes.py#L1390 |
| |
| Example: |
| >>> num = 3 |
| >>> scale = 512 |
| >>> rng = 0 |
| >>> boxes = random_boxes(num, scale, rng) |
| >>> print(boxes) |
| tensor([[280.9925, 278.9802, 308.6148, 366.1769], |
| [216.9113, 330.6978, 224.0446, 456.5878], |
| [405.3632, 196.3221, 493.3953, 270.7942]]) |
| """ |
| rng = ensure_rng(rng) |
|
|
| tlbr = rng.rand(num, 4).astype(np.float32) |
|
|
| tl_x = np.minimum(tlbr[:, 0], tlbr[:, 2]) |
| tl_y = np.minimum(tlbr[:, 1], tlbr[:, 3]) |
| br_x = np.maximum(tlbr[:, 0], tlbr[:, 2]) |
| br_y = np.maximum(tlbr[:, 1], tlbr[:, 3]) |
|
|
| tlbr[:, 0] = tl_x * scale |
| tlbr[:, 1] = tl_y * scale |
| tlbr[:, 2] = br_x * scale |
| tlbr[:, 3] = br_y * scale |
|
|
| boxes = torch.from_numpy(tlbr) |
| return boxes |
|
|
|
|
| class ModelEma(torch.nn.Module): |
| def __init__(self, model, decay=0.9997, device=None): |
| super(ModelEma, self).__init__() |
| |
| self.module = deepcopy(model) |
| self.module.eval() |
|
|
| |
|
|
| self.decay = decay |
| self.device = device |
| if self.device is not None: |
| self.module.to(device=device) |
|
|
| def _update(self, model, update_fn): |
| with torch.no_grad(): |
| for ema_v, model_v in zip( |
| self.module.state_dict().values(), model.state_dict().values() |
| ): |
| if self.device is not None: |
| model_v = model_v.to(device=self.device) |
| ema_v.copy_(update_fn(ema_v, model_v)) |
|
|
| def update(self, model): |
| self._update(model, update_fn=lambda e, m: self.decay * e + (1.0 - self.decay) * m) |
|
|
| def set(self, model): |
| self._update(model, update_fn=lambda e, m: m) |
|
|
|
|
| class BestMetricSingle: |
| def __init__(self, init_res=0.0, better="large") -> None: |
| self.init_res = init_res |
| self.best_res = init_res |
| self.best_ep = -1 |
|
|
| self.better = better |
| assert better in ["large", "small"] |
|
|
| def isbetter(self, new_res, old_res): |
| if self.better == "large": |
| return new_res > old_res |
| if self.better == "small": |
| return new_res < old_res |
|
|
| def update(self, new_res, ep): |
| if self.isbetter(new_res, self.best_res): |
| self.best_res = new_res |
| self.best_ep = ep |
| return True |
| return False |
|
|
| def __str__(self) -> str: |
| return "best_res: {}\t best_ep: {}".format(self.best_res, self.best_ep) |
|
|
| def __repr__(self) -> str: |
| return self.__str__() |
|
|
| def summary(self) -> dict: |
| return { |
| "best_res": self.best_res, |
| "best_ep": self.best_ep, |
| } |
|
|
|
|
| class BestMetricHolder: |
| def __init__(self, init_res=0.0, better="large", use_ema=False) -> None: |
| self.best_all = BestMetricSingle(init_res, better) |
| self.use_ema = use_ema |
| if use_ema: |
| self.best_ema = BestMetricSingle(init_res, better) |
| self.best_regular = BestMetricSingle(init_res, better) |
|
|
| def update(self, new_res, epoch, is_ema=False): |
| """ |
| return if the results is the best. |
| """ |
| if not self.use_ema: |
| return self.best_all.update(new_res, epoch) |
| else: |
| if is_ema: |
| self.best_ema.update(new_res, epoch) |
| return self.best_all.update(new_res, epoch) |
| else: |
| self.best_regular.update(new_res, epoch) |
| return self.best_all.update(new_res, epoch) |
|
|
| def summary(self): |
| if not self.use_ema: |
| return self.best_all.summary() |
|
|
| res = {} |
| res.update({f"all_{k}": v for k, v in self.best_all.summary().items()}) |
| res.update({f"regular_{k}": v for k, v in self.best_regular.summary().items()}) |
| res.update({f"ema_{k}": v for k, v in self.best_ema.summary().items()}) |
| return res |
|
|
| def __repr__(self) -> str: |
| return json.dumps(self.summary(), indent=2) |
|
|
| def __str__(self) -> str: |
| return self.__repr__() |
|
|
|
|
| def targets_to(targets: List[Dict[str, Any]], device): |
| """Moves the target dicts to the given device.""" |
| excluded_keys = [ |
| "questionId", |
| "tokens_positive", |
| "strings_positive", |
| "tokens", |
| "dataset_name", |
| "sentence_id", |
| "original_img_id", |
| "nb_eval", |
| "task_id", |
| "original_id", |
| "token_span", |
| "caption", |
| "dataset_type", |
| ] |
| return [ |
| {k: v.to(device) if k not in excluded_keys else v for k, v in t.items()} for t in targets |
| ] |
|
|
|
|
| def get_phrases_from_posmap( |
| posmap: torch.BoolTensor, tokenized: Dict, tokenizer: AutoTokenizer, left_idx: int = 0, right_idx: int = 255 |
| ): |
| assert isinstance(posmap, torch.Tensor), "posmap must be torch.Tensor" |
| if posmap.dim() == 1: |
| posmap[0: left_idx + 1] = False |
| posmap[right_idx:] = False |
| non_zero_idx = posmap.nonzero(as_tuple=True)[0].tolist() |
| token_ids = [tokenized["input_ids"][i] for i in non_zero_idx] |
| return tokenizer.decode(token_ids) |
| else: |
| raise NotImplementedError("posmap must be 1-dim") |
|
|