| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| """Helper utils.""" |
|
|
import copy
import itertools
import json
import logging
import os
import warnings
from contextlib import contextmanager
from functools import wraps
from typing import Optional, Tuple, Union

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from fvcore.transforms.transform import Transform
|
|
|
|
def adjust_intrinsic(
    intrinsic: Union[np.ndarray, torch.Tensor],
    intrinsic_image_dim: Tuple,
    image_dim: Tuple
) -> Union[np.ndarray, torch.Tensor]:
    """
    Adjust intrinsic camera parameters for image dimension changes.

    The focal-length entries ``[..., 0, 0]`` and ``[..., 1, 1]`` are scaled by
    ``after / before``; the principal-point entries ``[..., 0, 2]`` and
    ``[..., 1, 2]`` are scaled by ``(after - 1) / (before - 1)``. All other
    entries are left untouched.

    Args:
        intrinsic: Camera intrinsic matrix (numpy array or torch tensor). The
            last two dimensions are the matrix; any number of leading batch
            dimensions (including none) is accepted for both types.
        intrinsic_image_dim: Original image dimensions (width, height)
        image_dim: Target image dimensions (width, height)

    Returns:
        Adjusted intrinsic matrix (same type as input). When the two
        dimension arguments compare equal the input object itself is
        returned; otherwise a modified copy is returned and the input is
        not mutated.

    Raises:
        TypeError: If ``intrinsic`` is neither a numpy array nor a torch tensor.
        ZeroDivisionError: If an original dimension is 0 or 1.
    """
    if intrinsic_image_dim == image_dim:
        return intrinsic

    width_before, height_before = intrinsic_image_dim[0], intrinsic_image_dim[1]
    width_after, height_after = image_dim[0], image_dim[1]

    width_scale = float(width_after) / float(width_before)
    height_scale = float(height_after) / float(height_before)
    width_offset_scale = float(width_after - 1) / float(width_before - 1)
    height_offset_scale = float(height_after - 1) / float(height_before - 1)

    # (row, column, factor) for every matrix entry that changes.
    updates = (
        (0, 0, width_scale),
        (1, 1, height_scale),
        (0, 2, width_offset_scale),
        (1, 2, height_offset_scale),
    )

    if isinstance(intrinsic, np.ndarray):
        adjusted = np.copy(intrinsic)
        for row, col, factor in updates:
            # Assign rather than `*=` so integer arrays are cast on
            # assignment instead of raising a ufunc casting error.
            adjusted[..., row, col] = adjusted[..., row, col] * factor
        return adjusted

    if isinstance(intrinsic, torch.Tensor):
        adjusted = intrinsic.clone()
        for row, col, factor in updates:
            # The ellipsis makes this work with or without batch dimensions.
            adjusted[..., row, col] *= factor
        return adjusted

    raise TypeError(f"Unsupported input type: {type(intrinsic)}.")
|
|
|
|
class ModelInputResize(Transform):
    """Pad the model input so its last two dimensions are multiples of a stride.

    No resampling is performed: the tensor is only padded. Padding is appended
    after the existing data along the last two dimensions, so coordinates are
    returned unchanged by :meth:`apply_coords`.
    """

    def __init__(self, size_divisibility: int = 0, pad_value: float = 0):
        """Initialize model input resize transform.

        Args:
            size_divisibility: Stride the last two dimensions are rounded up
                to. Values of 1 or less add no padding.
            pad_value: Fill value used for the padded region.
        """
        super().__init__()
        self.size_divisibility = size_divisibility
        self.pad_value = pad_value

    def apply_coords(self, coords):
        """Return the coordinates unchanged."""
        return coords

    def apply_image(self, array: torch.Tensor) -> torch.Tensor:
        """Pad the last two dimensions of ``array`` up to the configured stride.

        Args:
            array: Input tensor; its last two dimensions are the ones padded.

        Returns:
            The tensor produced by ``F.pad``, filled with ``pad_value`` in the
            padded region.
        """
        assert len(array) > 0
        size_h, size_w = array.shape[-2], array.shape[-1]

        pad_h = 0
        pad_w = 0
        stride = self.size_divisibility
        if stride > 1:
            # Round each size up to the next multiple of stride using plain
            # integer arithmetic; no device tensor is needed for this.
            pad_h = (size_h + (stride - 1)) // stride * stride - size_h
            pad_w = (size_w + (stride - 1)) // stride * stride - size_w

        # F.pad takes the last dimension's (before, after) pair first, then
        # the second-to-last dimension's pair.
        padding_size = [0, pad_w, 0, pad_h]
        return F.pad(array, padding_size, value=self.pad_value)

    def apply_segmentation(self, array: torch.Tensor) -> torch.Tensor:
        """Return the segmentation unchanged (no padding is applied here)."""
        return array
|
|
|
|
| @contextmanager |
| def _ignore_torch_cuda_oom(): |
| """ |
| A context which ignores CUDA OOM exception from pytorch. |
| """ |
| try: |
| yield |
| except RuntimeError as e: |
| |
| if "CUDA out of memory. " in str(e): |
| pass |
| else: |
| raise |
|
|
def retry_if_cuda_oom(func):
    """
    Wrap ``func`` so that it is retried when PyTorch reports a CUDA OOM error.

    The wrapper makes up to three attempts:

    1. Call ``func`` as-is.
    2. Call ``torch.cuda.empty_cache()`` and call ``func`` again.
    3. Move the inputs to CPU and call ``func`` one last time. The function is
       expected to dispatch to a CPU implementation in this case, and the
       results may be CPU tensors; moving them back to CUDA is the caller's
       responsibility.

    Args:
        func: a stateless callable that takes tensor-like objects as arguments

    Returns:
        a callable which retries `func` if OOM is encountered.

    Examples:
    ::
        output = retry_if_cuda_oom(some_torch_function)(input1, input2)
        # output may be on CPU even if inputs are on GPU

    Note:
        1. Only top-level arguments are inspected for the CPU fallback: an
           argument is moved when it has a `.device` of type ``"cuda"`` and a
           `.to` attribute. Nested structures of tensors are not supported.

        2. Because ``func`` may run more than once, it has to be stateless.
    """

    def _cpu_if_cuda(obj):
        """Return ``obj`` moved to CPU if it looks like a CUDA tensor."""
        try:
            on_cuda = obj.device.type == "cuda"
        except AttributeError:
            # No `.device` (or no `.device.type`): leave the object alone.
            return obj
        if on_cuda and hasattr(obj, "to"):
            return obj.to(device="cpu")
        return obj

    @wraps(func)
    def wrapped(*args, **kwargs):
        """Call ``func``, retrying on CUDA OOM."""
        # First a plain attempt, then one more after releasing cached memory.
        for clear_cache_first in (False, True):
            if clear_cache_first:
                torch.cuda.empty_cache()
            with _ignore_torch_cuda_oom():
                return func(*args, **kwargs)

        # Last resort: run on CPU copies. Errors from this call are not suppressed.
        logging.info(f"Attempting to copy inputs of {str(func)} to CPU due to CUDA OOM")
        cpu_kwargs = {key: _cpu_if_cuda(value) for key, value in kwargs.items()}
        cpu_args = [_cpu_if_cuda(value) for value in args]
        return func(*cpu_args, **cpu_kwargs)

    return wrapped
|
|
|
|
def prepare_kept_mapping(model, cfg, dataset, frustum_mask=None, intrinsic=None):
    """
    Run the model's back projection and return its ``(kept, mapping)`` result.

    For every dataset other than ``"front3d"`` the intrinsic is first rescaled
    from ``cfg.dataset.target_size`` to ``cfg.dataset.reduced_target_size``
    with ``adjust_intrinsic``.

    Args:
        model: Object exposing a ``back_projection(shape, intrinsic, frustum_mask)`` method
        cfg: Configuration object; ``cfg.dataset.target_size`` and
            ``cfg.dataset.reduced_target_size`` are read
        dataset: Dataset name ('front3d' or others)
        frustum_mask: Optional frustum mask, forwarded to the model unchanged
        intrinsic: Intrinsic matrix, forwarded to the model (after rescaling
            when applicable)

    Returns:
        tuple: (kept, mapping) as produced by ``model.back_projection``
    """
    dataset_cfg = cfg.dataset

    if dataset != "front3d":
        source_size = tuple(dataset_cfg.target_size)
        reduced_size = tuple(dataset_cfg.reduced_target_size)
        intrinsic = adjust_intrinsic(intrinsic, source_size, reduced_size)

    # Reversed reduced size plus a fixed third dimension of 256
    # (presumably the depth extent of the volume -- TODO confirm).
    volume_shape = tuple(dataset_cfg.reduced_target_size[::-1]) + (256,)

    kept, mapping = model.back_projection(volume_shape, intrinsic, frustum_mask)
    return kept, mapping
|
|
|
|
def get_kept_mapping(model, cfg, batch, device):
    """
    Compute ``(kept, mapping)`` for one batch via ``prepare_kept_mapping``.

    The dataset name is taken from ``cfg.dataset.name``; the frustum mask and
    intrinsic are read from the batch and moved to ``device`` first.

    Args:
        model: The model instance with back_projection method
        cfg: Configuration object
        batch: Mapping with ``"frustum_mask"`` and ``"intrinsic"`` entries
        device: Device to place tensors on

    Returns:
        tuple: (kept, mapping) tensors
    """
    mask_on_device = batch["frustum_mask"].to(device)
    # The intrinsic is converted to float before being moved to the device.
    intrinsic_on_device = batch["intrinsic"].float().to(device)

    kept, mapping = prepare_kept_mapping(
        model,
        cfg,
        cfg.dataset.name,
        frustum_mask=mask_on_device,
        intrinsic=intrinsic_on_device,
    )
    return kept, mapping
|
|
|
|
def get_norm(norm, out_channels):
    """
    Build a normalization layer from a name or a factory callable.

    Args:
        norm (str or callable): either one of "BN", "SyncBN", "GN", "LN";
            or a callable that takes a channel number and returns
            the normalization layer as a nn.Module. ``None`` and the empty
            string mean "no normalization".
        out_channels (int): channel count passed to the layer factory.

    Returns:
        nn.Module or None: the normalization layer

    Raises:
        KeyError: if ``norm`` is a non-empty string that is not a supported name.
    """
    if norm is None:
        return None
    if isinstance(norm, str):
        if not norm:
            return None
        factories = {
            "BN": nn.BatchNorm2d,
            "SyncBN": nn.SyncBatchNorm,
            # GroupNorm always uses 32 groups here.
            "GN": lambda channels: nn.GroupNorm(32, channels),
            # LayerNorm is resolved lazily, only when "LN" is requested.
            "LN": lambda channels: LayerNorm(channels),
        }
        try:
            norm = factories[norm]
        except KeyError:
            raise KeyError(
                f"Unknown norm {norm!r}; expected one of {sorted(factories)}"
            ) from None
    return norm(out_channels)
|
|
|
|
| class Conv2d(nn.Conv2d): |
| """ |
| A wrapper around :class:`torch.nn.Conv2d` to support empty inputs and more features. |
| """ |
|
|
| def __init__(self, *args, **kwargs): |
| """ |
| Extra keyword arguments supported in addition to those in `torch.nn.Conv2d`: |
| |
| Args: |
| norm (nn.Module, optional): a normalization layer |
| activation (callable(Tensor) -> Tensor): a callable activation function |
| |
| It assumes that norm layer is used before activation. |
| """ |
| norm = kwargs.pop("norm", None) |
| activation = kwargs.pop("activation", None) |
| super().__init__(*args, **kwargs) |
|
|
| self.norm = norm |
| self.activation = activation |
|
|
| def forward(self, x): |
| """Forward pass.""" |
| |
| |
| |
| |
| |
| |
| if not torch.jit.is_scripting(): |
| |
| is_dynamo_compiling = is_compiling() |
| if not is_dynamo_compiling: |
| with warnings.catch_warnings(record=True): |
| if x.numel() == 0 and self.training: |
| |
| assert not isinstance( |
| self.norm, torch.nn.SyncBatchNorm |
| ), "SyncBatchNorm does not support empty inputs!" |
|
|
| x = F.conv2d( |
| x, self.weight, self.bias, self.stride, self.padding, self.dilation, self.groups |
| ) |
| if self.norm is not None: |
| x = self.norm(x) |
| if self.activation is not None: |
| x = self.activation(x) |
| return x |
|
|