|
|
import time |
|
|
import torch |
|
|
from torch import nn |
|
|
import torch.nn.functional as F |
|
|
import matplotlib.pyplot as plt |
|
|
from sklearn.linear_model import RANSACRegressor |
|
|
from torch_scatter import scatter_min |
|
|
from src.utils.partition import xy_partition |
|
|
from src.utils.point import is_xyz_tensor |
|
|
from src.utils.neighbors import knn_2 |
|
|
|
|
|
|
|
|
|
|
|
# Public API of this module: ground-point filters and ground-surface models
__all__ = [
    'filter_by_z_distance_of_global_min', 'filter_by_local_z_min',
    'filter_by_verticality', 'single_plane_model',
    'neighbor_interpolation_model', 'mlp_model']
|
|
|
|
|
|
|
|
def filter_by_z_distance_of_global_min(pos, threshold):
    """Select the points lying within `threshold` Z-distance of the
    lowest point of the cloud.

    This can be used to filter out points far from the ground, with some
    limitations:
    - if the point cloud contains below-ground points
    - if the ground is not even and involves stairs, slopes, ...

    :param pos: Tensor
        Input 3D point cloud
    :param threshold: float
        Z-distance threshold. Points whose Z-offset with respect to the
        lowest point (i.e. smallest Z) is strictly below `threshold`
        will be selected
    :return: boolean mask of selected points
    """
    assert is_xyz_tensor(pos)
    z = pos[:, 2]
    return (z - z.min()) < threshold
|
|
|
|
|
|
|
|
def filter_by_local_z_min(pos, grid):
    """Select, in each cell of a horizontal XY grid, the lowest point.

    This can be used to filter out points far from the ground, with some
    limitations:
    - if the point cloud contains below-ground points
    - if the ground has slopes, the size of the `grid` may produce
      downstream staircasing effects if the local Z-min points are used
      as Z reference for local ground altitude

    :param pos: Tensor
        Input 3D point cloud
    :param grid: float
        Size of the grid "XY-voxel"
    :return: boolean mask of local Z-min points
    """
    assert is_xyz_tensor(pos)

    # Assign each point to an XY grid cell, with consecutive cell indices
    cell_index = xy_partition(pos, grid, consecutive=True)

    # For each cell, recover the index of its lowest point
    _, argmin_per_cell = scatter_min(pos[:, 2], cell_index, dim=0)

    # Build the pointwise selection mask
    mask = torch.zeros(pos.shape[0], dtype=torch.bool, device=pos.device)
    mask[argmin_per_cell] = True

    return mask
|
|
|
|
|
|
|
|
def filter_by_verticality(verticality, threshold):
    """Select the points whose verticality is strictly below `threshold`.

    For verticality computation, see the `PointFeatures`.

    This can be used to filter out non-ground points, with some
    limitations:
    - if the point cloud is very noisy, or if the verticality was
      computed on too-small, or too-large neighborhoods, the verticality
      may not be sufficiently discriminative
    - if the ground has slopes, the steepest areas may be filtered out
    - if other non-ground horizontal surfaces are present in the point
      cloud, these will also be preserved (e.g. table, ceiling,
      horizontal building roof, ...)

    :param verticality: Tensor
        1D tensor holding verticality values as computed by
        `PointFeatures`
    :param threshold: float
        Verticality threshold below which points are considered
        "horizontal" enough
    :return: boolean mask of "horizontal" points
    """
    # Squeeze away any singleton dimension (e.g. a (N, 1) feature column)
    v = verticality.squeeze()
    return v < threshold
|
|
|
|
|
|
|
|
def single_plane_model(pos, random_state=0, residual_threshold=1e-3):
    """Model the ground as a single plane fitted with RANSAC.

    Returns a callable taking an XYZ tensor as input and returning the
    pointwise elevation (Z-gap between each point and the fitted plane).

    :param pos: Tensor
        Input 3D point cloud
    :param random_state: int
        Seed for RANSAC
    :param residual_threshold: float
        Residual threshold for RANSAC
    :return: callable mapping an XYZ tensor to pointwise elevation
    """
    assert is_xyz_tensor(pos)

    # RANSAC runs on CPU numpy arrays: regress Z from XY
    xy_np = pos[:, :2].cpu().numpy()
    z_np = pos[:, 2].cpu().numpy()

    ransac = RANSACRegressor(
        random_state=random_state,
        residual_threshold=residual_threshold)
    ransac.fit(xy_np, z_np)

    def predict_elevation(pos_query):
        """Return the elevation of `pos_query` w.r.t. the fitted plane."""
        assert is_xyz_tensor(pos_query)
        device = pos_query.device
        plane_z = ransac.predict(pos_query[:, :2].cpu().numpy())
        return pos_query[:, 2] - torch.from_numpy(plane_z).to(device)

    return predict_elevation
|
|
|
|
|
|
|
|
def neighbor_interpolation_model(pos, k=3, r_max=1):
    """Model the ground based on a trimmed point cloud carrying ground
    points only. At inference, a point is associated with its nearest
    neighbors in L2 XY distance in the reference ground cloud. The
    ground surface is estimated as a linear interpolation of the
    neighboring reference points. The elevation is then computed as the
    corresponding gap in Z-coordinates.

    Returns a callable taking an XYZ tensor as input and returning the
    pointwise elevation.

    :param pos: Tensor
        Input 3D point cloud
    :param k: int
        Number of neighbors to consider for interpolation
    :param r_max: float
        Maximum radius for the neighbor search
    :return: callable mapping an XYZ tensor to pointwise elevation
    """
    assert is_xyz_tensor(pos)

    def predict_elevation(pos_query):
        """Return the elevation of `pos_query` w.r.t. the interpolated
        ground surface.
        """
        # Flatten both clouds onto the Z=0 plane (pad XY with a zero
        # third coordinate) so the neighbor search measures XY distance
        # only
        ref = F.pad(input=pos[:, :2], pad=(0, 1), mode='constant', value=0)
        query = F.pad(
            input=pos_query[:, :2], pad=(0, 1), mode='constant', value=0)
        neighbors, distances = knn_2(ref, query, k, r_max=r_max)

        # Some query points may have found no neighbor within r_max
        # (knn_2 flags those with -1). Re-run the search for them with a
        # radius large enough to cover both clouds entirely
        no_neighbor = (neighbors == -1).all(dim=1)
        if no_neighbor.any():
            ref_hi = ref.max(dim=0).values
            ref_lo = ref.min(dim=0).values
            query_hi = query.max(dim=0).values
            query_lo = query.min(dim=0).values
            r_big = max(
                (query_hi - ref_lo).norm(), (ref_hi - query_lo).norm())

            nn_fallback, d_fallback = knn_2(
                ref, query[no_neighbor], k, r_max=r_big)

            neighbors[no_neighbor] = nn_fallback
            distances[no_neighbor] = d_fallback

        # With a single neighbor, no interpolation is needed: the
        # elevation is the Z-gap to the nearest reference point
        if k == 1:
            return pos_query[:, 2] - pos[neighbors][:, 2]

        # Inverse-distance interpolation weights. Missing neighbors
        # (distance == -1) get zero weight; weights are normalized to
        # sum to 1 per query point
        w = 1 / (distances + 1e-3)
        w[distances == -1] = 0
        w = w / w.sum(dim=1).view(-1, 1)

        # Interpolated ground Z under each query point
        z_ground = (pos[:, 2][neighbors] * w).sum(dim=1)

        return pos_query[:, 2] - z_ground

    return predict_elevation
|
|
|
|
|
|
|
|
def mlp_model(
        pos,
        layers=(32, 16, 8),
        batch_ratio=1,
        lr=0.01,
        lr_decay=1,
        weight_decay=0.01,
        criterion='l2',
        steps=1000,
        check_every_n_steps=50,
        device='cuda',
        verbose=False):
    """Fit an MLP to a point cloud. Assuming the point cloud mostly
    contains ground points, this function will train an MLP to model the
    ground surface as a piecewise-planar function.

    Returns a callable taking an XYZ tensor as input and returning the
    pointwise elevation.

    :param pos: Tensor
        Input 3D point cloud
    :param layers: int or List[int] or Tuple[int]
        Hidden layers for the MLP. Too many weights may let the model
        overfit to non-ground patterns. Not enough weights will underfit
        the ground and miss some patterns. Having more neurons in the
        first layer allows faster convergence
    :param batch_ratio: float
        Ratio of points to sample from the cloud at each training
        iteration. Allows adding some stochasticity to the training.
        In practice, `batch_ratio=1` gives better results if the entire
        cloud fits in memory
    :param lr: float
        Initial learning rate
    :param lr_decay: float
        Multiplicative factor applied to the learning rate after each
        iteration
    :param weight_decay: float
        Weight decay for regularization
    :param criterion: str
        Loss, either 'l1' or 'l2'
    :param steps: int
        Number of training steps. This largely affects overall compute
        time
    :param check_every_n_steps: int
        If `verbose=True` the loss will be logged every n iteration for
        final visualization
    :param device: str or torch.device
        Device on which to do the training and inference
    :param verbose: bool
        If True, a plot of the training loss and some stats will be
        printed at the end of the training
    :return: callable mapping an XYZ tensor to pointwise elevation
    """
    # Local imports, presumably to avoid circular dependencies at module
    # load time
    from src.nn import MLP
    from src.nn.norm import BatchNorm

    assert is_xyz_tensor(pos)

    # Only synchronize when CUDA is actually usable: calling
    # torch.cuda.synchronize() unconditionally crashes on CUDA-less
    # machines, even when `device='cpu'` was requested
    if torch.cuda.is_available():
        torch.cuda.synchronize()
    start = time.time()

    # Normalize the cloud to zero mean and unit std for stable training
    pos = pos.to(device)
    num_points = pos.shape[0]
    means = pos.mean(dim=0)
    # Clamp the std to avoid division by zero (and downstream NaNs) on
    # degenerate clouds with a constant coordinate
    stds = pos.std(dim=0).clamp(min=1e-8)
    pos = (pos - means) / stds

    # Build the model and optimization objects. NB: `layers` is copied
    # to a fresh list, so an int, tuple or list input all work and the
    # caller's object is never mutated
    batch_size = min(num_points, round(num_points * batch_ratio))
    layers = [layers] if isinstance(layers, int) else list(layers)
    model = MLP(
        [2] + layers + [1],
        activation=nn.ReLU(),
        last_activation=False,
        norm=BatchNorm,
        last_norm=False,
        drop=None).to(device).train()
    optimizer = torch.optim.AdamW(
        model.parameters(),
        lr=lr,
        weight_decay=weight_decay)
    # Uniform weights for the optional stochastic batch sampling
    weights = torch.ones(num_points, device=device)
    scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, lr_decay)

    # Logged steps and loss values for the optional final plot. Defined
    # unconditionally so they are always bound, regardless of `verbose`
    t = []
    l = []

    for step in range(steps):

        # Optionally subsample the cloud for a stochastic training step
        if 0 < batch_ratio < 1:
            idx = torch.multinomial(weights, batch_size, replacement=False)
            pos_ = pos[idx]
        else:
            pos_ = pos

        # Regress Z from XY
        xy = pos_[:, :2]
        z = pos_[:, 2]
        z_hat = model(xy)

        if criterion == 'l2':
            loss = ((z - z_hat.squeeze()) ** 2).mean()
        elif criterion == 'l1':
            loss = (z - z_hat.squeeze()).abs().mean()
        else:
            raise NotImplementedError(
                f"Unknown criterion '{criterion}', expected 'l1' or 'l2'")

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        scheduler.step()

        # Periodically log the loss for final visualization
        if not verbose or step % check_every_n_steps != check_every_n_steps - 1:
            continue

        t.append(step)
        l.append(loss.detach().cpu().item())

    if verbose:
        if torch.cuda.is_available():
            torch.cuda.synchronize()
        print(f"Training time: {time.time() - start:0.1f} sec")
        # Guard against steps < check_every_n_steps, in which case no
        # loss was ever logged and l[-1] would raise an IndexError
        if l:
            print(f"Loss: {l[-1]:0.3f}")
        plt.plot(t, l)
        plt.show()

    model = model.eval()

    def predict_elevation(pos):
        """Return the elevation of `pos` w.r.t. the learned ground
        surface, on the same device as the input.
        """
        input_device = pos.device
        pos = pos.to(device)

        # Normalize the query XY with the training statistics, predict
        # the normalized ground Z, then de-normalize it
        xy = (pos[:, :2] - means[:2]) / stds[:2]
        z = model(xy).squeeze().detach()
        z = z * stds[2] + means[2]

        elevation = pos[:, 2] - z

        return elevation.to(input_device)

    return predict_elevation
|
|
|