| """ |
| 3D point cloud augmentation |
| |
| Author: Xiaoyang Wu (xiaoyang.wu.cs@gmail.com) |
| Please cite our work if the code is helpful to you. |
| """ |
|
|
| import random |
| import numbers |
| import scipy |
| import scipy.ndimage |
| import scipy.interpolate |
| import scipy.stats |
| import numpy as np |
| import torch |
| import copy |
| from collections.abc import Sequence, Mapping |
|
|
| from pointcept.utils.registry import Registry |
|
|
| TRANSFORMS = Registry("transforms") |
|
|
|
|
| def index_operator(data_dict, index, duplicate=False): |
| |
| |
| if "index_valid_keys" not in data_dict: |
| data_dict["index_valid_keys"] = [ |
| "coord", |
| "color", |
| "normal", |
| "superpoint", |
| "spt", |
| "strength", |
| "segment", |
| "instance", |
| ] |
| if not duplicate: |
| for key in data_dict["index_valid_keys"]: |
| if key in data_dict: |
| data_dict[key] = data_dict[key][index] |
| return data_dict |
| else: |
| data_dict_ = dict() |
| for key in data_dict.keys(): |
| if key in data_dict["index_valid_keys"]: |
| data_dict_[key] = data_dict[key][index] |
| elif key == "index_valid_keys": |
| data_dict_[key] = copy.copy(data_dict[key]) |
| else: |
| data_dict_[key] = data_dict[key] |
| return data_dict_ |
|
|
|
|
| @TRANSFORMS.register_module() |
| class Collect(object): |
| def __init__(self, keys, offset_keys_dict=None, **kwargs): |
| """ |
| e.g. Collect(keys=[coord], feat_keys=[coord, color]) |
| """ |
| if offset_keys_dict is None: |
| offset_keys_dict = dict(offset="coord") |
| self.keys = keys |
| self.offset_keys = offset_keys_dict |
| self.kwargs = kwargs |
|
|
| def __call__(self, data_dict): |
| data = dict() |
| if isinstance(self.keys, str): |
| self.keys = [self.keys] |
| for key in self.keys: |
| data[key] = data_dict[key] |
| for key, value in self.offset_keys.items(): |
| data[key] = torch.tensor([data_dict[value].shape[0]]) |
| for name, keys in self.kwargs.items(): |
| name = name.replace("_keys", "") |
| assert isinstance(keys, Sequence) |
| data[name] = torch.cat([data_dict[key].float() for key in keys], dim=1) |
| return data |
|
|
|
|
| @TRANSFORMS.register_module() |
| class Copy(object): |
| def __init__(self, keys_dict=None): |
| if keys_dict is None: |
| keys_dict = dict(coord="origin_coord", segment="origin_segment") |
| self.keys_dict = keys_dict |
|
|
| def __call__(self, data_dict): |
| for key, value in self.keys_dict.items(): |
| if isinstance(data_dict[key], np.ndarray): |
| data_dict[value] = data_dict[key].copy() |
| elif isinstance(data_dict[key], torch.Tensor): |
| data_dict[value] = data_dict[key].clone().detach() |
| else: |
| data_dict[value] = copy.deepcopy(data_dict[key]) |
| return data_dict |
|
|
|
|
| @TRANSFORMS.register_module() |
| class Update(object): |
| def __init__(self, keys_dict=None): |
| if keys_dict is None: |
| keys_dict = dict() |
| self.keys_dict = keys_dict |
|
|
| def __call__(self, data_dict): |
| for key, value in self.keys_dict.items(): |
| data_dict[key] = value |
| return data_dict |
|
|
|
|
| @TRANSFORMS.register_module() |
| class ToTensor(object): |
| def __call__(self, data): |
| if isinstance(data, torch.Tensor): |
| return data |
| elif isinstance(data, str): |
| |
| return data |
| elif isinstance(data, int): |
| return torch.LongTensor([data]) |
| elif isinstance(data, float): |
| return torch.FloatTensor([data]) |
| elif isinstance(data, np.ndarray) and np.issubdtype(data.dtype, bool): |
| return torch.from_numpy(data) |
| elif isinstance(data, np.ndarray) and np.issubdtype(data.dtype, np.integer): |
| return torch.from_numpy(data).long() |
| elif isinstance(data, np.ndarray) and np.issubdtype(data.dtype, np.floating): |
| return torch.from_numpy(data).float() |
| elif isinstance(data, Mapping): |
| result = {sub_key: self(item) for sub_key, item in data.items()} |
| return result |
| elif isinstance(data, Sequence): |
| result = [self(item) for item in data] |
| return result |
| else: |
| raise TypeError(f"type {type(data)} cannot be converted to tensor.") |
|
|
|
|
| @TRANSFORMS.register_module() |
| class NormalizeColor(object): |
| def __call__(self, data_dict): |
| if "color" in data_dict.keys(): |
| data_dict["color"] = data_dict["color"] / 255 |
| return data_dict |
|
|
|
|
| @TRANSFORMS.register_module() |
| class NormalizeCoord(object): |
| def __call__(self, data_dict): |
| if "coord" in data_dict.keys(): |
| |
| centroid = np.mean(data_dict["coord"], axis=0) |
| data_dict["coord"] -= centroid |
| m = np.max(np.sqrt(np.sum(data_dict["coord"] ** 2, axis=1))) |
| data_dict["coord"] = data_dict["coord"] / m |
| return data_dict |
|
|
|
|
| @TRANSFORMS.register_module() |
| class PositiveShift(object): |
| def __call__(self, data_dict): |
| if "coord" in data_dict.keys(): |
| coord_min = np.min(data_dict["coord"], 0) |
| data_dict["coord"] -= coord_min |
| return data_dict |
|
|
|
|
| @TRANSFORMS.register_module() |
| class CenterShift(object): |
| def __init__(self, apply_z=True): |
| self.apply_z = apply_z |
|
|
| def __call__(self, data_dict): |
| if "coord" in data_dict.keys(): |
| x_min, y_min, z_min = data_dict["coord"].min(axis=0) |
| x_max, y_max, _ = data_dict["coord"].max(axis=0) |
| if self.apply_z: |
| shift = [(x_min + x_max) / 2, (y_min + y_max) / 2, z_min] |
| else: |
| shift = [(x_min + x_max) / 2, (y_min + y_max) / 2, 0] |
| data_dict["coord"] -= shift |
| return data_dict |
|
|
|
|
| @TRANSFORMS.register_module() |
| class RandomShift(object): |
| def __init__(self, shift=((-0.2, 0.2), (-0.2, 0.2), (0, 0))): |
| self.shift = shift |
|
|
| def __call__(self, data_dict): |
| if "coord" in data_dict.keys(): |
| shift_x = np.random.uniform(self.shift[0][0], self.shift[0][1]) |
| shift_y = np.random.uniform(self.shift[1][0], self.shift[1][1]) |
| shift_z = np.random.uniform(self.shift[2][0], self.shift[2][1]) |
| data_dict["coord"] += [shift_x, shift_y, shift_z] |
| return data_dict |
|
|
|
|
| @TRANSFORMS.register_module() |
| class PointClip(object): |
| def __init__(self, point_cloud_range=(-80, -80, -3, 80, 80, 1)): |
| self.point_cloud_range = point_cloud_range |
|
|
| def __call__(self, data_dict): |
| if "coord" in data_dict.keys(): |
| data_dict["coord"] = np.clip( |
| data_dict["coord"], |
| a_min=self.point_cloud_range[:3], |
| a_max=self.point_cloud_range[3:], |
| ) |
| return data_dict |
|
|
|
|
| @TRANSFORMS.register_module() |
| class RandomDropout(object): |
| def __init__(self, dropout_ratio=0.2, dropout_application_ratio=0.5): |
| """ |
| upright_axis: axis index among x,y,z, i.e. 2 for z |
| """ |
| self.dropout_ratio = dropout_ratio |
| self.dropout_application_ratio = dropout_application_ratio |
|
|
| def __call__(self, data_dict): |
| if random.random() < self.dropout_application_ratio: |
| n = len(data_dict["coord"]) |
| idx = np.random.choice(n, int(n * (1 - self.dropout_ratio)), replace=False) |
| if "sampled_index" in data_dict: |
| |
| idx = np.unique(np.append(idx, data_dict["sampled_index"])) |
| mask = np.zeros_like(data_dict["segment"]).astype(bool) |
| mask[data_dict["sampled_index"]] = True |
| data_dict["sampled_index"] = np.where(mask[idx])[0] |
| data_dict = index_operator(data_dict, idx) |
| return data_dict |
|
|
|
|
| @TRANSFORMS.register_module() |
| class RandomRotate(object): |
| def __init__(self, angle=None, center=None, axis="z", always_apply=False, p=0.5): |
| self.angle = [-1, 1] if angle is None else angle |
| self.axis = axis |
| self.always_apply = always_apply |
| self.p = p if not self.always_apply else 1 |
| self.center = center |
|
|
| def __call__(self, data_dict): |
| if random.random() > self.p: |
| return data_dict |
| angle = np.random.uniform(self.angle[0], self.angle[1]) * np.pi |
| rot_cos, rot_sin = np.cos(angle), np.sin(angle) |
| if self.axis == "x": |
| rot_t = np.array([[1, 0, 0], [0, rot_cos, -rot_sin], [0, rot_sin, rot_cos]]) |
| elif self.axis == "y": |
| rot_t = np.array([[rot_cos, 0, rot_sin], [0, 1, 0], [-rot_sin, 0, rot_cos]]) |
| elif self.axis == "z": |
| rot_t = np.array([[rot_cos, -rot_sin, 0], [rot_sin, rot_cos, 0], [0, 0, 1]]) |
| else: |
| raise NotImplementedError |
| if "coord" in data_dict.keys(): |
| if self.center is None: |
| x_min, y_min, z_min = data_dict["coord"].min(axis=0) |
| x_max, y_max, z_max = data_dict["coord"].max(axis=0) |
| center = [(x_min + x_max) / 2, (y_min + y_max) / 2, (z_min + z_max) / 2] |
| else: |
| center = self.center |
| data_dict["coord"] -= center |
| data_dict["coord"] = np.dot(data_dict["coord"], np.transpose(rot_t)) |
| data_dict["coord"] += center |
| if "normal" in data_dict.keys(): |
| data_dict["normal"] = np.dot(data_dict["normal"], np.transpose(rot_t)) |
| return data_dict |
|
|
|
|
| @TRANSFORMS.register_module() |
| class RandomRotateTargetAngle(object): |
| def __init__( |
| self, angle=(1 / 2, 1, 3 / 2), center=None, axis="z", always_apply=False, p=0.75 |
| ): |
| self.angle = angle |
| self.axis = axis |
| self.always_apply = always_apply |
| self.p = p if not self.always_apply else 1 |
| self.center = center |
|
|
| def __call__(self, data_dict): |
| if random.random() > self.p: |
| return data_dict |
| angle = np.random.choice(self.angle) * np.pi |
| rot_cos, rot_sin = np.cos(angle), np.sin(angle) |
| if self.axis == "x": |
| rot_t = np.array([[1, 0, 0], [0, rot_cos, -rot_sin], [0, rot_sin, rot_cos]]) |
| elif self.axis == "y": |
| rot_t = np.array([[rot_cos, 0, rot_sin], [0, 1, 0], [-rot_sin, 0, rot_cos]]) |
| elif self.axis == "z": |
| rot_t = np.array([[rot_cos, -rot_sin, 0], [rot_sin, rot_cos, 0], [0, 0, 1]]) |
| else: |
| raise NotImplementedError |
| if "coord" in data_dict.keys(): |
| if self.center is None: |
| x_min, y_min, z_min = data_dict["coord"].min(axis=0) |
| x_max, y_max, z_max = data_dict["coord"].max(axis=0) |
| center = [(x_min + x_max) / 2, (y_min + y_max) / 2, (z_min + z_max) / 2] |
| else: |
| center = self.center |
| data_dict["coord"] -= center |
| data_dict["coord"] = np.dot(data_dict["coord"], np.transpose(rot_t)) |
| data_dict["coord"] += center |
| if "normal" in data_dict.keys(): |
| data_dict["normal"] = np.dot(data_dict["normal"], np.transpose(rot_t)) |
| return data_dict |
|
|
|
|
| @TRANSFORMS.register_module() |
| class RandomScale(object): |
| def __init__(self, scale=None, anisotropic=False): |
| self.scale = scale if scale is not None else [0.95, 1.05] |
| self.anisotropic = anisotropic |
|
|
| def __call__(self, data_dict): |
| if "coord" in data_dict.keys(): |
| scale = np.random.uniform( |
| self.scale[0], self.scale[1], 3 if self.anisotropic else 1 |
| ) |
| data_dict["coord"] *= scale |
| return data_dict |
|
|
|
|
| @TRANSFORMS.register_module() |
| class RandomFlip(object): |
| def __init__(self, p=0.5): |
| self.p = p |
|
|
| def __call__(self, data_dict): |
| if np.random.rand() < self.p: |
| if "coord" in data_dict.keys(): |
| data_dict["coord"][:, 0] = -data_dict["coord"][:, 0] |
| if "normal" in data_dict.keys(): |
| data_dict["normal"][:, 0] = -data_dict["normal"][:, 0] |
| if np.random.rand() < self.p: |
| if "coord" in data_dict.keys(): |
| data_dict["coord"][:, 1] = -data_dict["coord"][:, 1] |
| if "normal" in data_dict.keys(): |
| data_dict["normal"][:, 1] = -data_dict["normal"][:, 1] |
| return data_dict |
|
|
|
|
| @TRANSFORMS.register_module() |
| class RandomJitter(object): |
| def __init__(self, sigma=0.01, clip=0.05): |
| assert clip > 0 |
| self.sigma = sigma |
| self.clip = clip |
|
|
| def __call__(self, data_dict): |
| if "coord" in data_dict.keys(): |
| jitter = np.clip( |
| self.sigma * np.random.randn(data_dict["coord"].shape[0], 3), |
| -self.clip, |
| self.clip, |
| ) |
| data_dict["coord"] += jitter |
| return data_dict |
|
|
|
|
| @TRANSFORMS.register_module() |
| class ClipGaussianJitter(object): |
| def __init__(self, scalar=0.02, store_jitter=False): |
| self.scalar = scalar |
| self.mean = np.mean(3) |
| self.cov = np.identity(3) |
| self.quantile = 1.96 |
| self.store_jitter = store_jitter |
|
|
| def __call__(self, data_dict): |
| if "coord" in data_dict.keys(): |
| jitter = np.random.multivariate_normal( |
| self.mean, self.cov, data_dict["coord"].shape[0] |
| ) |
| jitter = self.scalar * np.clip(jitter / 1.96, -1, 1) |
| data_dict["coord"] += jitter |
| if self.store_jitter: |
| data_dict["jitter"] = jitter |
| return data_dict |
|
|
|
|
| @TRANSFORMS.register_module() |
| class ChromaticAutoContrast(object): |
| def __init__(self, p=0.2, blend_factor=None): |
| self.p = p |
| self.blend_factor = blend_factor |
|
|
| def __call__(self, data_dict): |
| if "color" in data_dict.keys() and np.random.rand() < self.p: |
| lo = np.min(data_dict["color"], 0, keepdims=True) |
| hi = np.max(data_dict["color"], 0, keepdims=True) |
| scale = 255 / (hi - lo) |
| contrast_feat = (data_dict["color"][:, :3] - lo) * scale |
| blend_factor = ( |
| np.random.rand() if self.blend_factor is None else self.blend_factor |
| ) |
| data_dict["color"][:, :3] = (1 - blend_factor) * data_dict["color"][ |
| :, :3 |
| ] + blend_factor * contrast_feat |
| return data_dict |
|
|
|
|
| @TRANSFORMS.register_module() |
| class ChromaticTranslation(object): |
| def __init__(self, p=0.95, ratio=0.05): |
| self.p = p |
| self.ratio = ratio |
|
|
| def __call__(self, data_dict): |
| if "color" in data_dict.keys() and np.random.rand() < self.p: |
| tr = (np.random.rand(1, 3) - 0.5) * 255 * 2 * self.ratio |
| data_dict["color"][:, :3] = np.clip(tr + data_dict["color"][:, :3], 0, 255) |
| return data_dict |
|
|
|
|
| @TRANSFORMS.register_module() |
| class ChromaticJitter(object): |
| def __init__(self, p=0.95, std=0.005): |
| self.p = p |
| self.std = std |
|
|
| def __call__(self, data_dict): |
| if "color" in data_dict.keys() and np.random.rand() < self.p: |
| noise = np.random.randn(data_dict["color"].shape[0], 3) |
| noise *= self.std * 255 |
| data_dict["color"][:, :3] = np.clip( |
| noise + data_dict["color"][:, :3], 0, 255 |
| ) |
| return data_dict |
|
|
|
|
| @TRANSFORMS.register_module() |
| class RandomColorGrayScale(object): |
| def __init__(self, p): |
| self.p = p |
|
|
| @staticmethod |
| def rgb_to_grayscale(color, num_output_channels=1): |
| if color.shape[-1] < 3: |
| raise TypeError( |
| "Input color should have at least 3 dimensions, but found {}".format( |
| color.shape[-1] |
| ) |
| ) |
|
|
| if num_output_channels not in (1, 3): |
| raise ValueError("num_output_channels should be either 1 or 3") |
|
|
| r, g, b = color[..., 0], color[..., 1], color[..., 2] |
| gray = (0.2989 * r + 0.587 * g + 0.114 * b).astype(color.dtype) |
| gray = np.expand_dims(gray, axis=-1) |
|
|
| if num_output_channels == 3: |
| gray = np.broadcast_to(gray, color.shape) |
|
|
| return gray |
|
|
| def __call__(self, data_dict): |
| if np.random.rand() < self.p: |
| data_dict["color"] = self.rgb_to_grayscale(data_dict["color"], 3) |
| return data_dict |
|
|
|
|
| @TRANSFORMS.register_module() |
| class RandomColorJitter(object): |
| """ |
| Random Color Jitter for 3D point cloud (refer torchvision) |
| """ |
|
|
| def __init__(self, brightness=0, contrast=0, saturation=0, hue=0, p=0.95): |
| self.brightness = self._check_input(brightness, "brightness") |
| self.contrast = self._check_input(contrast, "contrast") |
| self.saturation = self._check_input(saturation, "saturation") |
| self.hue = self._check_input( |
| hue, "hue", center=0, bound=(-0.5, 0.5), clip_first_on_zero=False |
| ) |
| self.p = p |
|
|
| @staticmethod |
| def _check_input( |
| value, name, center=1, bound=(0, float("inf")), clip_first_on_zero=True |
| ): |
| if isinstance(value, numbers.Number): |
| if value < 0: |
| raise ValueError( |
| "If {} is a single number, it must be non negative.".format(name) |
| ) |
| value = [center - float(value), center + float(value)] |
| if clip_first_on_zero: |
| value[0] = max(value[0], 0.0) |
| elif isinstance(value, (tuple, list)) and len(value) == 2: |
| if not bound[0] <= value[0] <= value[1] <= bound[1]: |
| raise ValueError("{} values should be between {}".format(name, bound)) |
| else: |
| raise TypeError( |
| "{} should be a single number or a list/tuple with length 2.".format( |
| name |
| ) |
| ) |
|
|
| |
| |
| if value[0] == value[1] == center: |
| value = None |
| return value |
|
|
| @staticmethod |
| def blend(color1, color2, ratio): |
| ratio = float(ratio) |
| bound = 255.0 |
| return ( |
| (ratio * color1 + (1.0 - ratio) * color2) |
| .clip(0, bound) |
| .astype(color1.dtype) |
| ) |
|
|
| @staticmethod |
| def rgb2hsv(rgb): |
| r, g, b = rgb[..., 0], rgb[..., 1], rgb[..., 2] |
| maxc = np.max(rgb, axis=-1) |
| minc = np.min(rgb, axis=-1) |
| eqc = maxc == minc |
| cr = maxc - minc |
| s = cr / (np.ones_like(maxc) * eqc + maxc * (1 - eqc)) |
| cr_divisor = np.ones_like(maxc) * eqc + cr * (1 - eqc) |
| rc = (maxc - r) / cr_divisor |
| gc = (maxc - g) / cr_divisor |
| bc = (maxc - b) / cr_divisor |
|
|
| hr = (maxc == r) * (bc - gc) |
| hg = ((maxc == g) & (maxc != r)) * (2.0 + rc - bc) |
| hb = ((maxc != g) & (maxc != r)) * (4.0 + gc - rc) |
| h = hr + hg + hb |
| h = (h / 6.0 + 1.0) % 1.0 |
| return np.stack((h, s, maxc), axis=-1) |
|
|
| @staticmethod |
| def hsv2rgb(hsv): |
| h, s, v = hsv[..., 0], hsv[..., 1], hsv[..., 2] |
| i = np.floor(h * 6.0) |
| f = (h * 6.0) - i |
| i = i.astype(np.int32) |
|
|
| p = np.clip((v * (1.0 - s)), 0.0, 1.0) |
| q = np.clip((v * (1.0 - s * f)), 0.0, 1.0) |
| t = np.clip((v * (1.0 - s * (1.0 - f))), 0.0, 1.0) |
| i = i % 6 |
| mask = np.expand_dims(i, axis=-1) == np.arange(6) |
|
|
| a1 = np.stack((v, q, p, p, t, v), axis=-1) |
| a2 = np.stack((t, v, v, q, p, p), axis=-1) |
| a3 = np.stack((p, p, t, v, v, q), axis=-1) |
| a4 = np.stack((a1, a2, a3), axis=-1) |
|
|
| return np.einsum("...na, ...nab -> ...nb", mask.astype(hsv.dtype), a4) |
|
|
| def adjust_brightness(self, color, brightness_factor): |
| if brightness_factor < 0: |
| raise ValueError( |
| "brightness_factor ({}) is not non-negative.".format(brightness_factor) |
| ) |
|
|
| return self.blend(color, np.zeros_like(color), brightness_factor) |
|
|
| def adjust_contrast(self, color, contrast_factor): |
| if contrast_factor < 0: |
| raise ValueError( |
| "contrast_factor ({}) is not non-negative.".format(contrast_factor) |
| ) |
| mean = np.mean(RandomColorGrayScale.rgb_to_grayscale(color)) |
| return self.blend(color, mean, contrast_factor) |
|
|
| def adjust_saturation(self, color, saturation_factor): |
| if saturation_factor < 0: |
| raise ValueError( |
| "saturation_factor ({}) is not non-negative.".format(saturation_factor) |
| ) |
| gray = RandomColorGrayScale.rgb_to_grayscale(color) |
| return self.blend(color, gray, saturation_factor) |
|
|
| def adjust_hue(self, color, hue_factor): |
| if not (-0.5 <= hue_factor <= 0.5): |
| raise ValueError( |
| "hue_factor ({}) is not in [-0.5, 0.5].".format(hue_factor) |
| ) |
| orig_dtype = color.dtype |
| hsv = self.rgb2hsv(color / 255.0) |
| h, s, v = hsv[..., 0], hsv[..., 1], hsv[..., 2] |
| h = (h + hue_factor) % 1.0 |
| hsv = np.stack((h, s, v), axis=-1) |
| color_hue_adj = (self.hsv2rgb(hsv) * 255.0).astype(orig_dtype) |
| return color_hue_adj |
|
|
| @staticmethod |
| def get_params(brightness, contrast, saturation, hue): |
| fn_idx = torch.randperm(4) |
| b = ( |
| None |
| if brightness is None |
| else np.random.uniform(brightness[0], brightness[1]) |
| ) |
| c = None if contrast is None else np.random.uniform(contrast[0], contrast[1]) |
| s = ( |
| None |
| if saturation is None |
| else np.random.uniform(saturation[0], saturation[1]) |
| ) |
| h = None if hue is None else np.random.uniform(hue[0], hue[1]) |
| return fn_idx, b, c, s, h |
|
|
| def __call__(self, data_dict): |
| ( |
| fn_idx, |
| brightness_factor, |
| contrast_factor, |
| saturation_factor, |
| hue_factor, |
| ) = self.get_params(self.brightness, self.contrast, self.saturation, self.hue) |
|
|
| for fn_id in fn_idx: |
| if ( |
| fn_id == 0 |
| and brightness_factor is not None |
| and np.random.rand() < self.p |
| ): |
| data_dict["color"] = self.adjust_brightness( |
| data_dict["color"], brightness_factor |
| ) |
| elif ( |
| fn_id == 1 and contrast_factor is not None and np.random.rand() < self.p |
| ): |
| data_dict["color"] = self.adjust_contrast( |
| data_dict["color"], contrast_factor |
| ) |
| elif ( |
| fn_id == 2 |
| and saturation_factor is not None |
| and np.random.rand() < self.p |
| ): |
| data_dict["color"] = self.adjust_saturation( |
| data_dict["color"], saturation_factor |
| ) |
| elif fn_id == 3 and hue_factor is not None and np.random.rand() < self.p: |
| data_dict["color"] = self.adjust_hue(data_dict["color"], hue_factor) |
| return data_dict |
|
|
|
|
| @TRANSFORMS.register_module() |
| class HueSaturationTranslation(object): |
| @staticmethod |
| def rgb_to_hsv(rgb): |
| |
| |
| |
| rgb = rgb.astype("float") |
| hsv = np.zeros_like(rgb) |
| |
| hsv[..., 3:] = rgb[..., 3:] |
| r, g, b = rgb[..., 0], rgb[..., 1], rgb[..., 2] |
| maxc = np.max(rgb[..., :3], axis=-1) |
| minc = np.min(rgb[..., :3], axis=-1) |
| hsv[..., 2] = maxc |
| mask = maxc != minc |
| hsv[mask, 1] = (maxc - minc)[mask] / maxc[mask] |
| rc = np.zeros_like(r) |
| gc = np.zeros_like(g) |
| bc = np.zeros_like(b) |
| rc[mask] = (maxc - r)[mask] / (maxc - minc)[mask] |
| gc[mask] = (maxc - g)[mask] / (maxc - minc)[mask] |
| bc[mask] = (maxc - b)[mask] / (maxc - minc)[mask] |
| hsv[..., 0] = np.select( |
| [r == maxc, g == maxc], [bc - gc, 2.0 + rc - bc], default=4.0 + gc - rc |
| ) |
| hsv[..., 0] = (hsv[..., 0] / 6.0) % 1.0 |
| return hsv |
|
|
| @staticmethod |
| def hsv_to_rgb(hsv): |
| |
| |
| |
| |
| rgb = np.empty_like(hsv) |
| rgb[..., 3:] = hsv[..., 3:] |
| h, s, v = hsv[..., 0], hsv[..., 1], hsv[..., 2] |
| i = (h * 6.0).astype("uint8") |
| f = (h * 6.0) - i |
| p = v * (1.0 - s) |
| q = v * (1.0 - s * f) |
| t = v * (1.0 - s * (1.0 - f)) |
| i = i % 6 |
| conditions = [s == 0.0, i == 1, i == 2, i == 3, i == 4, i == 5] |
| rgb[..., 0] = np.select(conditions, [v, q, p, p, t, v], default=v) |
| rgb[..., 1] = np.select(conditions, [v, v, v, q, p, p], default=t) |
| rgb[..., 2] = np.select(conditions, [v, p, t, v, v, q], default=p) |
| return rgb.astype("uint8") |
|
|
| def __init__(self, hue_max=0.5, saturation_max=0.2): |
| self.hue_max = hue_max |
| self.saturation_max = saturation_max |
|
|
| def __call__(self, data_dict): |
| if "color" in data_dict.keys(): |
| |
| hsv = HueSaturationTranslation.rgb_to_hsv(data_dict["color"][:, :3]) |
| hue_val = (np.random.rand() - 0.5) * 2 * self.hue_max |
| sat_ratio = 1 + (np.random.rand() - 0.5) * 2 * self.saturation_max |
| hsv[..., 0] = np.remainder(hue_val + hsv[..., 0] + 1, 1) |
| hsv[..., 1] = np.clip(sat_ratio * hsv[..., 1], 0, 1) |
| data_dict["color"][:, :3] = np.clip( |
| HueSaturationTranslation.hsv_to_rgb(hsv), 0, 255 |
| ) |
| return data_dict |
|
|
|
|
| @TRANSFORMS.register_module() |
| class RandomColorDrop(object): |
| def __init__(self, p=0.2, color_augment=0.0): |
| self.p = p |
| self.color_augment = color_augment |
|
|
| def __call__(self, data_dict): |
| if "color" in data_dict.keys() and np.random.rand() < self.p: |
| data_dict["color"] *= self.color_augment |
| return data_dict |
|
|
| def __repr__(self): |
| return "RandomColorDrop(color_augment: {}, p: {})".format( |
| self.color_augment, self.p |
| ) |
|
|
|
|
| @TRANSFORMS.register_module() |
| class ElasticDistortion(object): |
| def __init__(self, distortion_params=None): |
| self.distortion_params = ( |
| [[0.2, 0.4], [0.8, 1.6]] if distortion_params is None else distortion_params |
| ) |
|
|
| @staticmethod |
| def elastic_distortion(coords, granularity, magnitude): |
| """ |
| Apply elastic distortion on sparse coordinate space. |
| pointcloud: numpy array of (number of points, at least 3 spatial dims) |
| granularity: size of the noise grid (in same scale[m/cm] as the voxel grid) |
| magnitude: noise multiplier |
| """ |
| blurx = np.ones((3, 1, 1, 1)).astype("float32") / 3 |
| blury = np.ones((1, 3, 1, 1)).astype("float32") / 3 |
| blurz = np.ones((1, 1, 3, 1)).astype("float32") / 3 |
| coords_min = coords.min(0) |
|
|
| |
| noise_dim = ((coords - coords_min).max(0) // granularity).astype(int) + 3 |
| noise = np.random.randn(*noise_dim, 3).astype(np.float32) |
|
|
| |
| for _ in range(2): |
| noise = scipy.ndimage.filters.convolve( |
| noise, blurx, mode="constant", cval=0 |
| ) |
| noise = scipy.ndimage.filters.convolve( |
| noise, blury, mode="constant", cval=0 |
| ) |
| noise = scipy.ndimage.filters.convolve( |
| noise, blurz, mode="constant", cval=0 |
| ) |
|
|
| |
| ax = [ |
| np.linspace(d_min, d_max, d) |
| for d_min, d_max, d in zip( |
| coords_min - granularity, |
| coords_min + granularity * (noise_dim - 2), |
| noise_dim, |
| ) |
| ] |
| interp = scipy.interpolate.RegularGridInterpolator( |
| ax, noise, bounds_error=False, fill_value=0 |
| ) |
| coords += interp(coords) * magnitude |
| return coords |
|
|
| def __call__(self, data_dict): |
| if "coord" in data_dict.keys() and self.distortion_params is not None: |
| if random.random() < 0.95: |
| for granularity, magnitude in self.distortion_params: |
| data_dict["coord"] = self.elastic_distortion( |
| data_dict["coord"], granularity, magnitude |
| ) |
| return data_dict |
|
|
|
|
| @TRANSFORMS.register_module() |
| class GridSample(object): |
| def __init__( |
| self, |
| grid_size=0.05, |
| hash_type="fnv", |
| mode="train", |
| return_inverse=False, |
| return_grid_coord=False, |
| return_min_coord=False, |
| return_displacement=False, |
| project_displacement=False, |
| ): |
| self.grid_size = grid_size |
| self.hash = self.fnv_hash_vec if hash_type == "fnv" else self.ravel_hash_vec |
| assert mode in ["train", "test"] |
| self.mode = mode |
| self.return_inverse = return_inverse |
| self.return_grid_coord = return_grid_coord |
| self.return_min_coord = return_min_coord |
| self.return_displacement = return_displacement |
| self.project_displacement = project_displacement |
|
|
| def __call__(self, data_dict): |
| assert "coord" in data_dict.keys() |
| scaled_coord = data_dict["coord"] / np.array(self.grid_size) |
| grid_coord = np.floor(scaled_coord).astype(int) |
| min_coord = grid_coord.min(0) |
| grid_coord -= min_coord |
| scaled_coord -= min_coord |
| min_coord = min_coord * np.array(self.grid_size) |
| key = self.hash(grid_coord) |
| idx_sort = np.argsort(key) |
| key_sort = key[idx_sort] |
| _, inverse, count = np.unique(key_sort, return_inverse=True, return_counts=True) |
| if self.mode == "train": |
| idx_select = ( |
| np.cumsum(np.insert(count, 0, 0)[0:-1]) |
| + np.random.randint(0, count.max(), count.size) % count |
| ) |
| idx_unique = idx_sort[idx_select] |
| if "sampled_index" in data_dict: |
| |
| idx_unique = np.unique( |
| np.append(idx_unique, data_dict["sampled_index"]) |
| ) |
| mask = np.zeros_like(data_dict["segment"]).astype(bool) |
| mask[data_dict["sampled_index"]] = True |
| data_dict["sampled_index"] = np.where(mask[idx_unique])[0] |
| data_dict = index_operator(data_dict, idx_unique) |
| if self.return_inverse: |
| data_dict["inverse"] = np.zeros_like(inverse) |
| data_dict["inverse"][idx_sort] = inverse |
| if self.return_grid_coord: |
| data_dict["grid_coord"] = grid_coord[idx_unique] |
| if "grid_coord" not in data_dict["index_valid_keys"]: |
| data_dict["index_valid_keys"].append("grid_coord") |
| if self.return_min_coord: |
| data_dict["min_coord"] = min_coord.reshape([1, 3]) |
| if self.return_displacement: |
| displacement = ( |
| scaled_coord - grid_coord - 0.5 |
| ) |
| if self.project_displacement: |
| displacement = np.sum( |
| displacement * data_dict["normal"], axis=-1, keepdims=True |
| ) |
| data_dict["displacement"] = displacement[idx_unique] |
| if "displacement" not in data_dict["index_valid_keys"]: |
| data_dict["index_valid_keys"].append("displacement") |
| return data_dict |
|
|
| elif self.mode == "test": |
| data_part_list = [] |
| for i in range(count.max()): |
| idx_select = np.cumsum(np.insert(count, 0, 0)[0:-1]) + i % count |
| idx_part = idx_sort[idx_select] |
| data_part = index_operator(data_dict, idx_part, duplicate=True) |
| data_part["index"] = idx_part |
| if self.return_inverse: |
| data_part["inverse"] = np.zeros_like(inverse) |
| data_part["inverse"][idx_sort] = inverse |
| if self.return_grid_coord: |
| data_part["grid_coord"] = grid_coord[idx_part] |
| if "grid_coord" not in data_part["index_valid_keys"]: |
| data_part["index_valid_keys"].append("grid_coord") |
| if self.return_min_coord: |
| data_part["min_coord"] = min_coord.reshape([1, 3]) |
| if self.return_displacement: |
| displacement = ( |
| scaled_coord - grid_coord - 0.5 |
| ) |
| if self.project_displacement: |
| displacement = np.sum( |
| displacement * data_dict["normal"], axis=-1, keepdims=True |
| ) |
| data_part["displacement"] = displacement[idx_part] |
| if "displacement" not in data_part["index_valid_keys"]: |
| data_part["index_valid_keys"].append("displacement") |
| data_part_list.append(data_part) |
| return data_part_list |
| else: |
| raise NotImplementedError |
|
|
| @staticmethod |
| def ravel_hash_vec(arr): |
| """ |
| Ravel the coordinates after subtracting the min coordinates. |
| """ |
| assert arr.ndim == 2 |
| arr = arr.copy() |
| arr -= arr.min(0) |
| arr = arr.astype(np.uint64, copy=False) |
| arr_max = arr.max(0).astype(np.uint64) + 1 |
|
|
| keys = np.zeros(arr.shape[0], dtype=np.uint64) |
| |
| for j in range(arr.shape[1] - 1): |
| keys += arr[:, j] |
| keys *= arr_max[j + 1] |
| keys += arr[:, -1] |
| return keys |
|
|
| @staticmethod |
| def fnv_hash_vec(arr): |
| """ |
| FNV64-1A |
| """ |
| assert arr.ndim == 2 |
| |
| arr = arr.copy() |
| arr = arr.astype(np.uint64, copy=False) |
| hashed_arr = np.uint64(14695981039346656037) * np.ones( |
| arr.shape[0], dtype=np.uint64 |
| ) |
| for j in range(arr.shape[1]): |
| hashed_arr *= np.uint64(1099511628211) |
| hashed_arr = np.bitwise_xor(hashed_arr, arr[:, j]) |
| return hashed_arr |
|
|
|
|
| @TRANSFORMS.register_module() |
| class SphereCrop(object): |
| def __init__(self, point_max=80000, sample_rate=None, mode="random"): |
| self.point_max = point_max |
| self.sample_rate = sample_rate |
| assert mode in ["random", "center", "all"] |
| self.mode = mode |
|
|
| def __call__(self, data_dict): |
| point_max = ( |
| int(self.sample_rate * data_dict["coord"].shape[0]) |
| if self.sample_rate is not None |
| else self.point_max |
| ) |
|
|
| assert "coord" in data_dict.keys() |
| if data_dict["coord"].shape[0] > point_max: |
| if self.mode == "random": |
| center = data_dict["coord"][ |
| np.random.randint(data_dict["coord"].shape[0]) |
| ] |
| elif self.mode == "center": |
| center = data_dict["coord"][data_dict["coord"].shape[0] // 2] |
| else: |
| raise NotImplementedError |
| idx_crop = np.argsort(np.sum(np.square(data_dict["coord"] - center), 1))[ |
| :point_max |
| ] |
| data_dict = index_operator(data_dict, idx_crop) |
| return data_dict |
|
|
|
|
| @TRANSFORMS.register_module() |
| class ClassAwareSphereCrop(object): |
| def __init__( |
| self, |
| point_max=80000, |
| sample_rate=None, |
| mode="random", |
| target_classes=(3, 4, 6), |
| class_aware_prob=0.5, |
| min_target_points=32, |
| class_sampling_weights=None, |
| ): |
| self.point_max = point_max |
| self.sample_rate = sample_rate |
| assert mode in ["random", "center"] |
| self.mode = mode |
| self.target_classes = tuple(int(x) for x in target_classes) |
| self.class_aware_prob = float(class_aware_prob) |
| self.min_target_points = int(min_target_points) |
| self.class_sampling_weights = class_sampling_weights |
|
|
| def _sample_center(self, data_dict): |
| coord = data_dict["coord"] |
| segment = data_dict.get("segment") |
| if ( |
| segment is None |
| or len(self.target_classes) == 0 |
| or np.random.rand() >= self.class_aware_prob |
| ): |
| return None |
| segment = segment.reshape(-1) |
| candidates = [] |
| candidate_weights = [] |
| for i, cls in enumerate(self.target_classes): |
| cls_idx = np.flatnonzero(segment == cls) |
| if cls_idx.size >= self.min_target_points: |
| candidates.append(cls_idx) |
| if self.class_sampling_weights is not None: |
| candidate_weights.append(float(self.class_sampling_weights[i])) |
| else: |
| candidate_weights.append(1.0) |
| if not candidates: |
| return None |
| weights = np.asarray(candidate_weights, dtype=np.float64) |
| weights = weights / np.clip(weights.sum(), 1e-12, None) |
| chosen = candidates[np.random.choice(len(candidates), p=weights)] |
| center_idx = chosen[np.random.randint(chosen.size)] |
| return coord[center_idx] |
|
|
| def __call__(self, data_dict): |
| point_max = ( |
| int(self.sample_rate * data_dict["coord"].shape[0]) |
| if self.sample_rate is not None |
| else self.point_max |
| ) |
|
|
| assert "coord" in data_dict.keys() |
| if data_dict["coord"].shape[0] > point_max: |
| center = self._sample_center(data_dict) |
| if center is None: |
| if self.mode == "random": |
| center = data_dict["coord"][ |
| np.random.randint(data_dict["coord"].shape[0]) |
| ] |
| elif self.mode == "center": |
| center = data_dict["coord"][data_dict["coord"].shape[0] // 2] |
| else: |
| raise NotImplementedError |
| idx_crop = np.argsort(np.sum(np.square(data_dict["coord"] - center), 1))[ |
| :point_max |
| ] |
| data_dict = index_operator(data_dict, idx_crop) |
| return data_dict |
|
|
|
|
| @TRANSFORMS.register_module() |
| class ShufflePoint(object): |
| def __call__(self, data_dict): |
| assert "coord" in data_dict.keys() |
| shuffle_index = np.arange(data_dict["coord"].shape[0]) |
| np.random.shuffle(shuffle_index) |
| data_dict = index_operator(data_dict, shuffle_index) |
| return data_dict |
|
|
|
|
| @TRANSFORMS.register_module() |
| class CropBoundary(object): |
| def __call__(self, data_dict): |
| assert "segment" in data_dict |
| segment = data_dict["segment"].flatten() |
| mask = (segment != 0) * (segment != 1) |
| data_dict = index_operator(data_dict, mask) |
| return data_dict |
|
|
|
|
| @TRANSFORMS.register_module() |
| class ContrastiveViewsGenerator(object): |
| def __init__( |
| self, |
| view_keys=("coord", "color", "normal", "origin_coord"), |
| view_trans_cfg=None, |
| ): |
| self.view_keys = view_keys |
| self.view_trans = Compose(view_trans_cfg) |
|
|
| def __call__(self, data_dict): |
| view1_dict = dict() |
| view2_dict = dict() |
| for key in self.view_keys: |
| view1_dict[key] = data_dict[key].copy() |
| view2_dict[key] = data_dict[key].copy() |
| view1_dict = self.view_trans(view1_dict) |
| view2_dict = self.view_trans(view2_dict) |
| for key, value in view1_dict.items(): |
| data_dict["view1_" + key] = value |
| for key, value in view2_dict.items(): |
| data_dict["view2_" + key] = value |
| return data_dict |
|
|
|
|
| @TRANSFORMS.register_module() |
| class MultiViewGenerator(object): |
| def __init__( |
| self, |
| global_view_num=2, |
| global_view_scale=(0.4, 1.0), |
| local_view_num=4, |
| local_view_scale=(0.1, 0.4), |
| global_shared_transform=None, |
| global_transform=None, |
| local_transform=None, |
| max_size=65536, |
| center_height_scale=(0, 1), |
| shared_global_view=False, |
| view_keys=("coord", "origin_coord", "color", "normal"), |
| ): |
| self.global_view_num = global_view_num |
| self.global_view_scale = global_view_scale |
| self.local_view_num = local_view_num |
| self.local_view_scale = local_view_scale |
| self.global_shared_transform = Compose(global_shared_transform) |
| self.global_transform = Compose(global_transform) |
| self.local_transform = Compose(local_transform) |
| self.max_size = max_size |
| self.center_height_scale = center_height_scale |
| self.shared_global_view = shared_global_view |
| self.view_keys = view_keys |
| assert "coord" in view_keys |
|
|
| def get_view(self, point, center, scale): |
| coord = point["coord"] |
| max_size = min(self.max_size, coord.shape[0]) |
| size = int(np.random.uniform(*scale) * max_size) |
| index = np.argsort(np.sum(np.square(coord - center), axis=-1))[:size] |
| view = dict(index=index) |
| for key in point.keys(): |
| if key in self.view_keys: |
| view[key] = point[key][index] |
|
|
| if "index_valid_keys" in point.keys(): |
| |
| view["index_valid_keys"] = point["index_valid_keys"] |
| return view |
|
|
| def __call__(self, data_dict): |
| coord = data_dict["coord"] |
| point = self.global_shared_transform(copy.deepcopy(data_dict)) |
| z_min = coord[:, 2].min() |
| z_max = coord[:, 2].max() |
| z_min_ = z_min + (z_max - z_min) * self.center_height_scale[0] |
| z_max_ = z_min + (z_max - z_min) * self.center_height_scale[1] |
| center_mask = np.logical_and(coord[:, 2] >= z_min_, coord[:, 2] <= z_max_) |
| |
| major_center = coord[np.random.choice(np.where(center_mask)[0])] |
| major_view = self.get_view(point, major_center, self.global_view_scale) |
| major_coord = major_view["coord"] |
| |
| if not self.shared_global_view: |
| global_views = [ |
| self.get_view( |
| point=point, |
| center=major_coord[np.random.randint(major_coord.shape[0])], |
| scale=self.global_view_scale, |
| ) |
| for _ in range(self.global_view_num - 1) |
| ] |
| else: |
| global_views = [ |
| {key: value.copy() for key, value in major_view.items()} |
| for _ in range(self.global_view_num - 1) |
| ] |
|
|
| global_views = [major_view] + global_views |
|
|
| |
| cover_mask = np.zeros_like(major_view["index"], dtype=bool) |
| local_views = [] |
| for i in range(self.local_view_num): |
| if sum(~cover_mask) == 0: |
| |
| cover_mask[:] = False |
| local_view = self.get_view( |
| point=data_dict, |
| center=major_coord[np.random.choice(np.where(~cover_mask)[0])], |
| scale=self.local_view_scale, |
| ) |
| local_views.append(local_view) |
| cover_mask[np.isin(major_view["index"], local_view["index"])] = True |
|
|
| |
| view_dict = {} |
| for global_view in global_views: |
| global_view.pop("index") |
| global_view = self.global_transform(global_view) |
| for key in self.view_keys: |
| if f"global_{key}" in view_dict.keys(): |
| view_dict[f"global_{key}"].append(global_view[key]) |
| else: |
| view_dict[f"global_{key}"] = [global_view[key]] |
| view_dict["global_offset"] = np.cumsum( |
| [data.shape[0] for data in view_dict["global_coord"]] |
| ) |
| for local_view in local_views: |
| local_view.pop("index") |
| local_view = self.local_transform(local_view) |
| for key in self.view_keys: |
| if f"local_{key}" in view_dict.keys(): |
| view_dict[f"local_{key}"].append(local_view[key]) |
| else: |
| view_dict[f"local_{key}"] = [local_view[key]] |
| view_dict["local_offset"] = np.cumsum( |
| [data.shape[0] for data in view_dict["local_coord"]] |
| ) |
| for key in view_dict.keys(): |
| if "offset" not in key: |
| view_dict[key] = np.concatenate(view_dict[key], axis=0) |
| data_dict.update(view_dict) |
| return data_dict |
|
|
|
|
| @TRANSFORMS.register_module() |
| class InstanceParser(object): |
| def __init__(self, segment_ignore_index=(-1, 0, 1), instance_ignore_index=-1): |
| self.segment_ignore_index = segment_ignore_index |
| self.instance_ignore_index = instance_ignore_index |
|
|
| def __call__(self, data_dict): |
| coord = data_dict["coord"] |
| segment = data_dict["segment"] |
| instance = data_dict["instance"] |
| mask = ~np.in1d(segment, self.segment_ignore_index) |
| |
| instance[~mask] = self.instance_ignore_index |
| |
| unique, inverse = np.unique(instance[mask], return_inverse=True) |
| instance_num = len(unique) |
| instance[mask] = inverse |
| |
| centroid = np.ones((coord.shape[0], 3)) * self.instance_ignore_index |
| bbox = np.ones((instance_num, 8)) * self.instance_ignore_index |
| vacancy = [ |
| index for index in self.segment_ignore_index if index >= 0 |
| ] |
|
|
| for instance_id in range(instance_num): |
| mask_ = instance == instance_id |
| coord_ = coord[mask_] |
| bbox_min = coord_.min(0) |
| bbox_max = coord_.max(0) |
| bbox_centroid = coord_.mean(0) |
| bbox_center = (bbox_max + bbox_min) / 2 |
| bbox_size = bbox_max - bbox_min |
| bbox_theta = np.zeros(1, dtype=coord_.dtype) |
| bbox_class = np.array([segment[mask_][0]], dtype=coord_.dtype) |
| |
| bbox_class -= np.greater(bbox_class, vacancy).sum() |
|
|
| centroid[mask_] = bbox_centroid |
| bbox[instance_id] = np.concatenate( |
| [bbox_center, bbox_size, bbox_theta, bbox_class] |
| ) |
| data_dict["instance"] = instance |
| data_dict["instance_centroid"] = centroid |
| data_dict["bbox"] = bbox |
| return data_dict |
|
|
|
|
| class Compose(object): |
| def __init__(self, cfg=None): |
| self.cfg = cfg if cfg is not None else [] |
| self.transforms = [] |
| for t_cfg in self.cfg: |
| self.transforms.append(TRANSFORMS.build(t_cfg)) |
|
|
| def __call__(self, data_dict): |
| for t in self.transforms: |
| data_dict = t(data_dict) |
| return data_dict |
|
|