| | import time |
| | import torch |
| | from torch import nn |
| | import torch.nn.functional as F |
| | import matplotlib.pyplot as plt |
| | from sklearn.linear_model import RANSACRegressor |
| | from torch_scatter import scatter_min |
| | from src.utils.partition import xy_partition |
| | from src.utils.point import is_xyz_tensor |
| | from src.utils.neighbors import knn_2 |
| |
|
| |
|
| |
|
# Public API of this module: ground-point filters (`filter_by_*`) and
# callable ground-surface models (`*_model`)
__all__ = [
    'filter_by_z_distance_of_global_min', 'filter_by_local_z_min',
    'filter_by_verticality', 'single_plane_model',
    'neighbor_interpolation_model', 'mlp_model']
| |
|
| |
|
def filter_by_z_distance_of_global_min(pos, threshold):
    """Select the points lying within `threshold` Z-distance of the
    lowest point of the input cloud.

    This can be used to filter out points far from the ground, with some
    limitations:
      - if the point cloud contains below-ground points
      - if the ground is not even and involves stairs, slopes, ...

    :param pos: Tensor
        Input 3D point cloud
    :param threshold: float
        Z-distance threshold. Points within a Z-offset of `threshold`
        or lower of the lowest point (i.e. smallest Z) will be selected
    :return: boolean mask over the input points
    """
    assert is_xyz_tensor(pos)
    z = pos[:, 2]
    return (z - z.min()) < threshold
| |
|
| |
|
def filter_by_local_z_min(pos, grid):
    """Flag, in each cell of a horizontal XY grid, the lowest point.

    This can be used to filter out points far from the ground, with some
    limitations:
      - if the point cloud contains below-ground points
      - if the ground has slopes, the size of the `grid` may produce
        downstream staircasing effects if the local Z-min points are used
        as Z reference for local ground altitude

    :param pos: Tensor
        Input 3D point cloud
    :param grid: float
        Size of the grid "XY-voxel"
    :return: boolean mask over the input points
    """
    assert is_xyz_tensor(pos)

    # Assign each point to a cell of a horizontal XY grid
    super_index = xy_partition(pos, grid, consecutive=True)

    # For each cell, find the index of its lowest point and flag it
    _, z_argmin = scatter_min(pos[:, 2], super_index, dim=0)
    mask = torch.zeros(pos.shape[0], dtype=torch.bool, device=pos.device)
    mask[z_argmin] = True

    return mask
| |
|
| |
|
def filter_by_verticality(verticality, threshold):
    """Select the points whose verticality falls below `threshold`.

    For verticality computation, see the `PointFeatures`.

    This can be used to filter out non-ground points, with some
    limitations:
      - if the point cloud is very noisy, or if the verticality
        was computed on too-small, or too-large neighborhoods, the
        verticality may not be sufficiently discriminative
      - if the ground has slopes, the steepest areas may be filtered out
      - if other non-ground horizontal surfaces are present in the point
        cloud, these will also be preserved (e.g. table, ceiling,
        horizontal building roof, ...)

    :param verticality: Tensor
        1D tensor holding verticality values as computed by
        `PointFeatures`
    :param threshold: float
        Verticality threshold below which points are considered
        "horizontal" enough
    :return: boolean mask over the input points
    """
    # Squeeze away any singleton dimension so a (N, 1) input behaves
    # like a (N,) input
    flat_verticality = verticality.squeeze()
    return flat_verticality < threshold
| |
|
| |
|
def single_plane_model(pos, random_state=0, residual_threshold=1e-3):
    """Model the ground as a single plane using RANSAC.

    Returns a callable taking an XYZ tensor as input and returning the
    pointwise elevation.

    :param pos: Tensor
        Input 3D point cloud
    :param random_state: int
        Seed for RANSAC
    :param residual_threshold: float
        Residual threshold for RANSAC
    :return: callable mapping an XYZ tensor to pointwise elevations
    """
    assert is_xyz_tensor(pos)

    # RANSAC runs on CPU numpy arrays: regress z from (x, y)
    xy_np = pos[:, :2].cpu().numpy()
    z_np = pos[:, 2].cpu().numpy()
    ransac = RANSACRegressor(
        random_state=random_state,
        residual_threshold=residual_threshold)
    ransac.fit(xy_np, z_np)

    def predict_elevation(pos_query):
        # Elevation is the query Z minus the fitted plane's Z at (x, y)
        assert is_xyz_tensor(pos_query)
        device = pos_query.device
        z_plane = ransac.predict(pos_query[:, :2].cpu().numpy())
        return pos_query[:, 2] - torch.from_numpy(z_plane).to(device)

    return predict_elevation
| |
|
| |
|
def neighbor_interpolation_model(pos, k=3, r_max=1):
    """Model the ground based on a trimmed point cloud carrying ground
    points only. At inference, a point is associated with its nearest
    neighbors in L2 XY distance in the reference ground cloud. The
    ground surface is estimated as a linear interpolation of the
    neighboring reference points. The elevation is then computed as the
    corresponding gap in Z-coordinates.

    Returns a callable taking an XYZ tensor as input and returning the
    pointwise elevation.

    :param pos: Tensor
        Input 3D point cloud
    :param k: int
        Number of neighbors to consider for interpolation
    :param r_max: float
        Maximum radius for the neighbor search
    :return:
    """
    assert is_xyz_tensor(pos)

    def predict_elevation(pos_query):
        # Pad the XY coordinates with a zero Z-column so the 3D
        # neighbor search `knn_2` effectively runs in the XY plane
        xy0 = F.pad(input=pos[:, :2], pad=(0, 1), mode='constant', value=0)
        xy0_query = F.pad(
            input=pos_query[:, :2], pad=(0, 1), mode='constant', value=0)
        # NOTE(review): assumes `knn_2` returns (neighbors, distances)
        # with -1 as the "no neighbor within r_max" sentinel — confirm
        # against `src.utils.neighbors.knn_2`
        neighbors, distances = knn_2(xy0, xy0_query, k, r_max=r_max)

        # Fallback for query points with no reference neighbor within
        # `r_max`: re-run the search with a radius large enough to cover
        # the joint extent of both clouds, so every query gets neighbors
        has_no_neighbor = (neighbors == -1).all(dim=1)
        if has_no_neighbor.any():
            high = xy0.max(dim=0).values
            low = xy0.min(dim=0).values
            high_query = xy0_query.max(dim=0).values
            low_query = xy0_query.min(dim=0).values
            r_max_ = max((high_query - low).norm(), (high - low_query).norm())

            neighbors_, distances_ = knn_2(
                xy0, xy0_query[has_no_neighbor], k, r_max=r_max_)

            neighbors[has_no_neighbor] = neighbors_
            distances[has_no_neighbor] = distances_

        # Single-neighbor case: elevation is the plain Z-gap to the
        # nearest reference point, no interpolation needed
        if k == 1:
            return pos_query[:, 2] - pos[neighbors][:, 2]

        # Inverse-distance weighting over the k neighbors; the 1e-3
        # additive term avoids division by zero for coincident points.
        # Missing neighbors (distance sentinel -1) get zero weight so
        # they do not contribute to the interpolation
        weights = 1 / (distances + 1e-3)
        weights[distances == -1] = 0
        weights = weights / weights.sum(dim=1).view(-1, 1)

        # Interpolated ground altitude under each query point
        z_query = (pos[:, 2][neighbors] * weights).sum(dim=1)

        return pos_query[:, 2] - z_query

    return predict_elevation
| |
|
| |
|
def mlp_model(
        pos,
        layers=[32, 16, 8],
        batch_ratio=1,
        lr=0.01,
        lr_decay=1,
        weight_decay=0.01,
        criterion='l2',
        steps=1000,
        check_every_n_steps=50,
        device='cuda',
        verbose=False):
    """Fit an MLP to a point cloud. Assuming the point cloud mostly
    contains ground points, this function will train an MLP to model the
    ground surface as a piecewise-planar function.

    Returns a callable taking an XYZ tensor as input and returning the
    pointwise elevation.

    :param pos: Tensor
        Input 3D point cloud
    :param layers: int or List[int]
        Hidden layers for the MLP. Too many weights may let the model
        overfit to non-ground patterns. Not enough weights will underfit
        the ground and miss some patterns. Having more neurons in the
        first layer allows faster convergence
    :param batch_ratio: float
        Ratio of points to sample from the cloud at each training
        iteration. Allows adding some stochasticity to the training.
        In practice, `batch_ratio=1` gives better results if the entire
        cloud fits in memory
    :param lr: float
        Initial learning rate
    :param lr_decay: float
        Multiplicative factor applied to the learning rate after each
        iteration
    :param weight_decay: float
        Weight decay for regularization
    :param criterion: str
        Loss, either 'l1' or 'l2'
    :param steps: int
        Number of training steps. This largely affects overall compute
        time
    :param check_every_n_steps: int
        If `verbose=True` the loss will be logged every n iteration for
        final visualization
    :param device: str or torch.device
        Device on which to do the training and inference
    :param verbose: bool
        If True, a plot of the training loss and some stats will be
        printed at the end of the training
    :return: callable mapping an XYZ tensor to pointwise elevations
    """
    # Local imports, presumably to avoid a circular import at module
    # load time
    from src.nn import MLP
    from src.nn.norm import BatchNorm

    assert is_xyz_tensor(pos)

    def _sync():
        # Synchronize only when timing actual CUDA work: calling
        # torch.cuda.synchronize() unconditionally crashes on CUDA-less
        # machines and is pointless when training on CPU
        if torch.cuda.is_available() and torch.device(device).type == 'cuda':
            torch.cuda.synchronize()

    _sync()
    start = time.time()

    # Normalize the cloud to zero mean and unit std for stable training
    pos = pos.to(device)
    num_points = pos.shape[0]
    means = pos.mean(dim=0)
    stds = pos.std(dim=0)
    pos = (pos - means) / stds

    # Build the MLP f: (x, y) -> z, its optimizer and LR scheduler.
    # `list(layers)` also accepts a tuple of hidden sizes
    batch_size = min(num_points, round(num_points * batch_ratio))
    layers = [layers] if isinstance(layers, int) else list(layers)
    model = MLP(
        [2] + layers + [1],
        activation=nn.ReLU(),
        last_activation=False,
        norm=BatchNorm,
        last_norm=False,
        drop=None).to(device).train()
    optimizer = torch.optim.AdamW(
        model.parameters(),
        lr=lr,
        weight_decay=weight_decay)
    # Uniform sampling weights for the optional stochastic batching
    weights = torch.ones(num_points, device=device)
    scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, lr_decay)

    # Loss log for the optional final visualization
    step_log = []
    loss_log = []

    for step in range(steps):

        # Optionally subsample the cloud to add training stochasticity
        if 0 < batch_ratio < 1:
            idx = torch.multinomial(weights, batch_size, replacement=False)
            pos_ = pos[idx]
        else:
            pos_ = pos

        # Forward pass: predict Z from XY
        xy = pos_[:, :2]
        z = pos_[:, 2]
        z_hat = model(xy)

        if criterion == 'l2':
            loss = ((z - z_hat.squeeze()) ** 2).mean()
        elif criterion == 'l1':
            loss = (z - z_hat.squeeze()).abs().mean()
        else:
            raise NotImplementedError(
                f"Unknown criterion '{criterion}', expected 'l1' or 'l2'")

        # Backward pass and parameter/LR update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        scheduler.step()

        # Log the loss every `check_every_n_steps` steps
        if verbose and (step + 1) % check_every_n_steps == 0:
            step_log.append(step)
            loss_log.append(loss.item())

    if verbose:
        _sync()
        print(f"Training time: {time.time() - start:0.1f} sec")
        # Guard: no checkpoint is reached when steps < check_every_n_steps
        if loss_log:
            print(f"Loss: {loss_log[-1]:0.3f}")
            plt.plot(step_log, loss_log)
            plt.show()

    # Freeze the model (e.g. BatchNorm statistics) for inference
    model = model.eval()

    def predict_elevation(pos):
        input_device = pos.device
        pos = pos.to(device)

        # Normalize the query XY with the training statistics, predict
        # the normalized ground Z, then de-normalize it. no_grad avoids
        # building a useless autograd graph at inference
        with torch.no_grad():
            xy = (pos[:, :2] - means[:2]) / stds[:2]
            z = model(xy).squeeze()
        z = z * stds[2] + means[2]

        # Elevation is the Z-gap between the point and the modeled ground
        elevation = pos[:, 2] - z

        return elevation.to(input_device)

    return predict_elevation
| |
|