import torch import numpy as np import pgeof from src.utils import rgb2hsv, rgb2lab, sizes_to_pointers, to_float_rgb, \ POINT_FEATURES, sanitize_keys, filter_by_z_distance_of_global_min, \ filter_by_local_z_min, filter_by_verticality, single_plane_model, \ neighbor_interpolation_model, mlp_model from src.transforms import Transform from src.data import NAG __all__ = [ 'PointFeatures', 'GroundElevation', 'RoomPosition', 'ColorAutoContrast', 'NAGColorAutoContrast', 'ColorDrop', 'NAGColorDrop', 'ColorNormalize', 'NAGColorNormalize'] class PointFeatures(Transform): """Compute pointwise features based on what is already available in the Data object. All local geometric features assume the input ``Data`` has a ``neighbors`` attribute, holding a ``(num_nodes, k)`` tensor of indices. All k neighbors will be used for local geometric features computation, unless some are missing (indicated by -1 indices). If the latter, only positive indices will be used. The supported feature keys are the following: - rgb: RGB color. Assumes Data.rgb holds either [0, 1] floats or [0, 255] integers - hsv: HSV color. Assumes Data.rgb holds either [0, 1] floats or [0, 255] integers - lab: LAB color. Assumes Data.rgb holds either [0, 1] floats or [0, 255] integers - density: local density. Assumes ``Data.neighbor_index`` and ``Data.neighbor_distance`` - linearity: local linearity. Assumes ``Data.neighbor_index`` - planarity: local planarity. Assumes ``Data.neighbor_index`` - scattering: local scattering. Assumes ``Data.neighbor_index`` - verticality: local verticality. Assumes ``Data.neighbor_index`` - normal: local normal. Assumes ``Data.neighbor_index`` - length: local length. Assumes ``Data.neighbor_index`` - surface: local surface. Assumes ``Data.neighbor_index`` - volume: local volume. Assumes ``Data.neighbor_index`` - curvature: local curvature. Assumes ``Data.neighbor_index`` :param keys: List(str) Features to be computed. Attributes will be saved under `` :param k_min: int Minimum number of neighbors to consider for geometric features computation. Points with less than k_min neighbors will receive 0-features. Assumes ``Data.neighbor_index``. :param k_step: int Step size to take when searching for the optimal neighborhood size following: http://lareg.ensg.eu/labos/matis/pdf/articles_revues/2015/isprs_wjhm_15.pdf If k_step < 1, the optimal neighborhood will be computed based on all the neighbors available for each point. :param k_min_search: int Minimum neighborhood size used when searching the optimal neighborhood size. It is advised to use a value of 10 or higher. :param overwrite: bool When False, attributes of the input Data which are in `keys` will not be updated with the here-computed features. An exception to this rule is 'rgb' for which we always enforce [0, 1] float encoding """ def __init__( self, keys=None, k_min=5, k_step=-1, k_min_search=25, overwrite=True): self.keys = sanitize_keys(keys, default=POINT_FEATURES) self.k_min = k_min self.k_step = k_step self.k_min_search = k_min_search self.overwrite = overwrite def _process(self, data): assert data.has_neighbors, \ "Data is expected to have a 'neighbor_index' attribute" assert data.num_nodes < np.iinfo(np.uint32).max, \ "Too many nodes for `uint32` indices" assert data.neighbor_index.max() < np.iinfo(np.uint32).max, \ "Too high 'neighbor_index' indices for `uint32` indices" # Build the set of keys that must be computed/updated. In # particular, if `overwrite=False`, we do not modify # already-existing keys in the input Data. Except for 'rgb', for # which we always enforce [0, 1] float encoding keys = set(self.keys) if self.overwrite \ else set(self.keys) - set(data.keys) # Add RGB to the features. If colors are stored in int, we # assume they are encoded in [0, 255] and normalize them. # Otherwise, we assume they have already been [0, 1] normalized # NB: we ignore 'overwrite' for this key if 'rgb' in self.keys and data.rgb is not None: data.rgb = to_float_rgb(data.rgb) # Add HSV to the features. If colors are stored in int, we # assume they are encoded in [0, 255] and normalize them. # Otherwise, we assume they have already been [0, 1] normalized. # Note: for all features to live in a similar range, we # normalize H in [0, 1] if 'hsv' in keys and data.rgb is not None: hsv = rgb2hsv(to_float_rgb(data.rgb)) hsv[:, 0] /= 360. data.hsv = hsv # Add LAB to the features. If colors are stored in int, we # assume they are encoded in [0, 255] and normalize them. # Otherwise, we assume they have already been [0, 1] normalized. # Note: for all features to live in a similar range, we # normalize L in [0, 1] and ab in [-1, 1] if 'lab' in keys and data.rgb is not None: data.lab = rgb2lab(to_float_rgb(data.rgb)) / 100 # Add local surfacic density to the features. The local density # is approximated as K / D² where K is the number of nearest # neighbors and D is the distance of the Kth neighbor. We # normalize by D² since points roughly lie on a 2D manifold. # Note that this takes into account partial neighborhoods where # -1 indicates absent neighbors if 'density' in keys: dmax = data.neighbor_distance.max(dim=1).values k = data.neighbor_index.ge(0).sum(dim=1) data.density = (k / dmax ** 2).view(-1, 1) # Add local geometric features needs_geof = any(( 'linearity' in keys, 'planarity' in keys, 'scattering' in keys, 'verticality' in keys, 'normal' in keys)) if needs_geof and data.pos is not None: # Prepare data for numpy boost interface. Note: we add each # point to its own neighborhood before computation device = data.pos.device xyz = data.pos.cpu().numpy() nn = torch.cat( (torch.arange(xyz.shape[0]).view(-1, 1), data.neighbor_index), dim=1) k = nn.shape[1] # Check for missing neighbors (indicated by -1 indices) n_missing = (nn < 0).sum(dim=1) if (n_missing > 0).any(): sizes = k - n_missing nn = nn[nn >= 0] nn_ptr = sizes_to_pointers(sizes.cpu()) else: nn = nn.flatten().cpu() nn_ptr = torch.arange(xyz.shape[0] + 1) * k nn = nn.numpy().astype('uint32') nn_ptr = nn_ptr.numpy().astype('uint32') # Make sure array are contiguous before moving to C++ xyz = np.ascontiguousarray(xyz) nn = np.ascontiguousarray(nn) nn_ptr = np.ascontiguousarray(nn_ptr) # C++ geometric features computation on CPU if self.k_step < 0: f = pgeof.compute_features( xyz, nn, nn_ptr, self.k_min, verbose=False) else: f = pgeof.compute_features_optimal( xyz, nn, nn_ptr, self.k_min, self.k_step, self.k_min_search, verbose=False) f = torch.from_numpy(f) # Keep only required features if 'linearity' in keys: data.linearity = f[:, 0].view(-1, 1).to(device) if 'planarity' in keys: data.planarity = f[:, 1].view(-1, 1).to(device) if 'scattering' in keys: data.scattering = f[:, 2].view(-1, 1).to(device) # Heuristic to increase importance of verticality in # partition if 'verticality' in keys: data.verticality = f[:, 3].view(-1, 1).to(device) data.verticality *= 2 if 'curvature' in keys: data.curvature = f[:, 10].view(-1, 1).to(device) if 'length' in keys: data.length = f[:, 7].view(-1, 1).to(device) if 'surface' in keys: data.surface = f[:, 8].view(-1, 1).to(device) if 'volume' in keys: data.volume = f[:, 9].view(-1, 1).to(device) # As a way to "stabilize" the normals' orientation, we # choose to express them as oriented in the z+ half-space if 'normal' in keys: data.normal = f[:, 4:7].view(-1, 3).to(device) data.normal[data.normal[:, 2] < 0] *= -1 return data class GroundElevation(Transform): """Compute pointwise elevation with respect to the ground. We do so in a two-step process where we first remove as many potentially non-ground points as possible, before fitting a surface model to the resulting trimmed cloud to estimate the ground surface. We offer several tools for filtering out non-ground points: - filtering out all points that are higher than a given threshold above the lowest point in the cloud - filtering out all points whose local verticality (see `PointFeatures`) is above a given threshold - projecting points into a horizontal XY grid and only keeping the lowest point of each XY bin We offer several tool for modeling the ground surface from the trimmed cloud: - 'ransac': single planar surface using RANSAC - 'knn': linear interpolation of the k nearest trimmed points - 'mlp': piecewise planar approximation with an MLP :param z_threshold: float Ground points will be first searched within `global_threshold` of the lowest point in the cloud. Adjust this if the lowest point is below the ground or if you have large above-ground planar structures :param verticality_threshold: float Ground points will be searched among those with lower verticality than `verticality_threshold`. This assumes verticality has been computed beforehand using `PointFeatures`. Note that, depending on the chosen value, this will also remove steep slopes :param xy_grid: float Bin all points into a regular XY grid of size `xy_grid` and isolate as candidate ground point for each cell the one with the lowest Z value :param scale: float Scaling by which the computed elevation should be divided, for the sake of normalization :param kwargs: dict Arguments that will be passed down to the surface modeling function """ def __init__( self, z_threshold=1.5, verticality_threshold=None, xy_grid=None, model='ransac', scale=3.0, **kwargs): if verticality_threshold is not None: assert 0 < verticality_threshold < 1 if xy_grid is not None: assert xy_grid > 0 assert model in ['ransac', 'knn', 'mlp'] self.z_threshold = z_threshold self.verticality_threshold = verticality_threshold self.xy_grid = xy_grid self.model = model self.scale = scale self.kwargs = kwargs def _process(self, data): # Recover the point positions pos = data.pos # Initialize a mask for the filtering out as many non-ground # points as possible, to facilitate the subsequent search of the # ground surface in the point cloud mask = torch.ones(data.num_points, device=pos.device, dtype=torch.bool) # TODO: test all combinations on multiple devices # TODO: integrate voxelization as filtering # See `filter_by_z_distance_of_global_min` for more details if self.z_threshold is not None: mask = mask & filter_by_z_distance_of_global_min( pos, self.z_threshold) # See `filter_by_verticality` for more details if self.verticality_threshold and (0 < self.verticality_threshold < 1): if not hasattr(data, 'verticality'): raise ValueError( f"The Data object does not have a 'verticality' attribute. " f"To compute verticality, please call PointFeatures on " f"your Data first") mask = mask & filter_by_verticality( data.verticality, self.verticality_threshold) # See `filter_by_local_z_min` for more details if self.xy_grid: mask = mask & filter_by_local_z_min(pos, self.xy_grid) # Trim the point cloud based on the computed filters. We hope # that there are mostly ground points in there, but can't be # 100% sure pos_trimmed = pos[mask] # Fit a model to the trimmed points if self.model == 'ransac': model = single_plane_model(pos_trimmed, **self.kwargs) elif self.model == 'knn': model = neighbor_interpolation_model(pos_trimmed, **self.kwargs) elif self.model == 'mlp': model = mlp_model(pos_trimmed, **self.kwargs) # Compute the elevation of each point wrt the estimated ground # surface elevation = model(pos).view(-1, 1) # Scale the elevation and save it in the Data object data.elevation = elevation.view(-1, 1) / self.scale return data class RoomPosition(Transform): """Compute the pointwise normalized room coordinates, as defined in the S3DIS paper section 3.2: https://openaccess.thecvf.com/content_cvpr_2016/papers/Armeni_3D_Semantic_Parsing_CVPR_2016_paper.pdf Results will be saved in the `pos_room` attribute. NB: this is rather suited for indoor setting, with regular dimensions and not so much for open outdoor clouds with unbounded sizes. :param elevation: bool Whether the `elevation` attribute should be used to position the ground to z=0. If True, this assumes `GroundElevation` has been called previously to produce the `elevation` attribute. If False, it is assumed the ground/floor of the input cloud is already positioned at z=0 """ def __init__(self, elevation=False): self.elevation = elevation def _process(self, data): # Recover the point positions pos = data.pos.clone() # Shift ground to z=0, if required. Otherwise the ground is # assumed to be already at z=0 if self.elevation: assert getattr(data, 'elevation', None) is not None pos[:, 2] -= data.elevation # Shift XY pos[:, :2] -= pos[:, :2].min(dim=0).values.view(1, -1) # Scale XYZ based on the maximum values. i.e. the highest point # will be considered as the ceiling pos /= pos.max(dim=0).values.view(1, -1) # Save in Data attribute `pos_room` data.pos_room = pos return data class ColorTransform(Transform): """Parent class for color-based point Transforms, to avoid redundant code. :param x_idx: int If specified, the colors will be searched in `data.x[:, x_idx:x_idx + 3]` instead of `data.rgb` """ KEYS = ['rgb', 'lab', 'hsv'] def __init__(self, x_idx=None): self.x_idx = x_idx def _process(self, data): if self.x_idx is None: for key in self.KEYS: mean_key = f'mean_{key}' if getattr(data, key, None) is not None: data[key] = self._apply_func(data[key]) if getattr(data, mean_key, None) is not None: data[mean_key] = self._apply_func(data[mean_key]) elif self.x_idx is not None and getattr(data, 'x', None) is not None: data.x[:, self.x_idx:self.x_idx + 3] = self._apply_func( data.x[:, self.x_idx:self.x_idx + 3]) return data def _apply_func(self, rgb): return self._func(rgb) def _func(self, rgb): raise NotImplementedError class ColorAutoContrast(ColorTransform): """Apply some random contrast to the point colors. credit: https://github.com/guochengqian/openpoints :param p: float Probability of the transform to be applied :param blend: float (optional) Blend factor, controlling the contrasting intensity :param x_idx: int If specified, the colors will be searched in `data.x[:, x_idx:x_idx + 3]` instead of `data.rgb` """ KEYS = ['rgb'] def __init__(self, p=0.2, blend=None, x_idx=None): super().__init__(x_idx=x_idx) self.p = p self.blend = blend def _func(self, rgb): device = rgb.device if torch.rand(1, device=device) < self.p: # Compute the contrasted colors lo = rgb.min(dim=0).values.view(1, -1) hi = rgb.max(dim=0).values.view(1, -1) contrast_feat = (rgb - lo) / (hi - lo) # Blend the maximum contrast with the current color blend = torch.rand(1, device=device) \ if self.blend is None else self.blend rgb = (1 - blend) * rgb + blend * contrast_feat return rgb class NAGColorAutoContrast(ColorAutoContrast): """Apply some random contrast to the point colors. credit: https://github.com/guochengqian/openpoints :param level: int or str Level at which to remove attributes. Can be an int or a str. If the latter, 'all' will apply on all levels, 'i+' will apply on level-i and above, 'i-' will apply on level-i and below :param p: float Probability of the transform to be applied :param blend: float (optional) Blend factor, controlling the contrasting intensity :param x_idx: int If specified, the colors will be searched in `data.x[:, x_idx:x_idx + 3]` instead of `data.rgb` """ _IN_TYPE = NAG _OUT_TYPE = NAG KEYS = ['rgb'] def __init__(self, *args, level='all', **kwargs): super().__init__(*args, **kwargs) self.level = level def _process(self, nag): level_p = [-1] * nag.num_levels if isinstance(self.level, int): level_p[self.level] = self.p elif self.level == 'all': level_p = [self.p] * nag.num_levels elif self.level[-1] == '+': i = int(self.level[:-1]) level_p[i:] = [self.p] * (nag.num_levels - i) elif self.level[-1] == '-': i = int(self.level[:-1]) level_p[:i] = [self.p] * i else: raise ValueError(f'Unsupported level={self.level}') transforms = [ ColorAutoContrast(p=p, blend=self.blend, x_idx=self.x_idx) for p in level_p] for i_level in range(nag.num_levels): nag._list[i_level] = transforms[i_level](nag._list[i_level]) return nag class ColorDrop(ColorTransform): """Randomly set point colors to 0. :param p: float Probability of the transform to be applied :param x_idx: int If specified, the colors will be searched in `data.x[:, x_idx:x_idx + 3]` instead of `data.rgb` """ def __init__(self, p=0.2, x_idx=None): super().__init__(x_idx=x_idx) self.p = p def _func(self, rgb): if torch.rand(1, device=rgb.device) < self.p: rgb *= 0 return rgb class NAGColorDrop(ColorDrop): """Randomly set point colors to 0. :param level: int or str Level at which to remove attributes. Can be an int or a str. If the latter, 'all' will apply on all levels, 'i+' will apply on level-i and above, 'i-' will apply on level-i and below :param p: float Probability of the transform to be applied :param x_idx: int If specified, the colors will be searched in `data.x[:, x_idx:x_idx + 3]` instead of `data.rgb` """ _IN_TYPE = NAG _OUT_TYPE = NAG def __init__(self, *args, level='all', **kwargs): super().__init__(*args, **kwargs) self.level = level def _process(self, nag): level_p = [-1] * nag.num_levels if isinstance(self.level, int): level_p[self.level] = self.p elif self.level == 'all': level_p = [self.p] * nag.num_levels elif self.level[-1] == '+': i = int(self.level[:-1]) level_p[i:] = [self.p] * (nag.num_levels - i) elif self.level[-1] == '-': i = int(self.level[:-1]) level_p[:i] = [self.p] * i else: raise ValueError(f'Unsupported level={self.level}') transforms = [ColorDrop(p=p, x_idx=self.x_idx) for p in level_p] for i_level in range(nag.num_levels): nag._list[i_level] = transforms[i_level](nag._list[i_level]) return nag class ColorNormalize(ColorTransform): """Normalize the colors using given means and standard deviations. credit: https://github.com/guochengqian/openpoints :param mean: list Channel-wise means :param std: list Channel-wise standard deviations :param x_idx: int If specified, the colors will be searched in `data.x[:, x_idx:x_idx + 3]` instead of `data.rgb` """ KEYS = ['rgb'] def __init__( self, mean=[0.5136457, 0.49523646, 0.44921124], std=[0.18308958, 0.18415008, 0.19252081], x_idx=None): super().__init__(x_idx=x_idx) assert all(x > 0 for x in std), "std values must be >0" self.mean = mean self.std = std def _func(self, rgb): mean = torch.as_tensor(self.mean, device=rgb.device).view(1, -1) std = torch.as_tensor(self.std, device=rgb.device).view(1, -1) rgb = (rgb - mean) / std return rgb class NAGColorNormalize(ColorNormalize): """Normalize the colors using given means and standard deviations. credit: https://github.com/guochengqian/openpoints :param level: int or str Level at which to remove attributes. Can be an int or a str. If the latter, 'all' will apply on all levels, 'i+' will apply on level-i and above, 'i-' will apply on level-i and below :param mean: list Channel-wise means :param std: list Channel-wise standard deviations :param x_idx: int If specified, the colors will be searched in `data.x[:, x_idx:x_idx + 3]` instead of `data.rgb` """ _IN_TYPE = NAG _OUT_TYPE = NAG KEYS = ['rgb'] def __init__(self, *args, level='all', **kwargs): super().__init__(*args, **kwargs) self.level = level def _process(self, nag): level_mean = [[0, 0, 0]] * nag.num_levels level_std = [[1, 1, 1]] * nag.num_levels if isinstance(self.level, int): level_mean[self.level] = self.mean level_std[self.level] = self.std elif self.level == 'all': level_mean = [self.mean] * nag.num_levels level_std = [self.std] * nag.num_levels elif self.level[-1] == '+': i = int(self.level[:-1]) level_mean[i:] = [self.mean] * (nag.num_levels - i) level_std[i:] = [self.std] * (nag.num_levels - i) elif self.level[-1] == '-': i = int(self.level[:-1]) level_mean[:i] = [self.mean] * i level_std[:i] = [self.std] * i else: raise ValueError(f'Unsupported level={self.level}') transforms = [ ColorNormalize(mean=mean, std=std, x_idx=self.x_idx) for mean, std in zip(level_mean, level_std)] for i_level in range(nag.num_levels): nag._list[i_level] = transforms[i_level](nag._list[i_level]) return nag