# English
# Shanci's picture
# Upload folder using huggingface_hub
# 26225c5 verified
import time
import torch
from torch import nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
from sklearn.linear_model import RANSACRegressor
from torch_scatter import scatter_min
from src.utils.partition import xy_partition
from src.utils.point import is_xyz_tensor
from src.utils.neighbors import knn_2
__all__ = [
'filter_by_z_distance_of_global_min', 'filter_by_local_z_min',
'filter_by_verticality', 'single_plane_model',
'neighbor_interpolation_model', 'mlp_model']
def filter_by_z_distance_of_global_min(pos, threshold):
    """Select the points lying within `threshold` Z-distance of the
    lowest point of the cloud.

    This can be used to filter out points far from the ground, with
    some limitations:
      - if the point cloud contains below-ground points
      - if the ground is not even and involves stairs, slopes, ...

    :param pos: Tensor
        Input 3D point cloud
    :param threshold: float
        Z-distance threshold. Points within a Z-offset of `threshold`
        or lower of the lowest point (i.e. smallest Z) will be selected
    :return: BoolTensor mask over the input points
    """
    assert is_xyz_tensor(pos)
    z = pos[:, 2]
    z_offset = z - z.min()
    return z_offset < threshold
def filter_by_local_z_min(pos, grid):
    """Select, for each cell of a horizontal XY grid, the lowest point
    falling in that cell.

    This can be used to filter out points far from the ground, with
    some limitations:
      - if the point cloud contains below-ground points
      - if the ground has slopes, the size of the `grid` may produce
        downstream staircasing effects if the local Z-min points are
        used as Z reference for local ground altitude

    :param pos: Tensor
        Input 3D point cloud
    :param grid: float
        Size of the grid "XY-voxel"
    :return: BoolTensor mask over the input points
    """
    assert is_xyz_tensor(pos)

    # Assign every point to a cell of a horizontal XY grid
    cell_index = xy_partition(pos, grid, consecutive=True)

    # Recover the index of the lowest point in each cell
    _, argmin_per_cell = scatter_min(pos[:, 2], cell_index, dim=0)

    # Build the output mask: True only for the per-cell Z-minima
    mask = torch.zeros(pos.shape[0], dtype=torch.bool, device=pos.device)
    mask[argmin_per_cell] = True
    return mask
def filter_by_verticality(verticality, threshold):
    """Select the points whose verticality falls below `threshold`.

    For verticality computation, see the `PointFeatures`.

    This can be used to filter out non-ground points, with some
    limitations:
      - if the point cloud is very noisy, or if the verticality was
        computed on too-small, or too-large neighborhoods, the
        verticality may not be sufficiently discriminative
      - if the ground has slopes, the steepest areas may be filtered out
      - if other non-ground horizontal surfaces are present in the
        point cloud, these will also be preserved (e.g. table, ceiling,
        horizontal building roof, ...)

    :param verticality: Tensor
        1D tensor holding verticality values as computed by
        `PointFeatures`
    :param threshold: float
        Verticality threshold below which points are considered
        "horizontal" enough
    :return: BoolTensor mask over the input points
    """
    # Squeeze away any singleton feature dimension before comparing
    flat_verticality = verticality.squeeze()
    return flat_verticality < threshold
def single_plane_model(pos, random_state=0, residual_threshold=1e-3):
    """Model the ground as a single plane using RANSAC.

    Returns a callable taking an XYZ tensor as input and returning the
    pointwise elevation.

    :param pos: Tensor
        Input 3D point cloud
    :param random_state: int
        Seed for RANSAC
    :param residual_threshold: float
        Residual threshold for RANSAC
    :return: callable mapping an XYZ Tensor to pointwise elevations
    """
    assert is_xyz_tensor(pos)

    # Robustly fit a plane z = f(x, y) on the input cloud with RANSAC
    xy_np = pos[:, :2].cpu().numpy()
    z_np = pos[:, 2].cpu().numpy()
    ransac = RANSACRegressor(
        random_state=random_state,
        residual_threshold=residual_threshold)
    ransac.fit(xy_np, z_np)

    def predict_elevation(pos_query):
        # Elevation is the gap between each query Z and the plane
        assert is_xyz_tensor(pos_query)
        xy_query = pos_query[:, :2].cpu().numpy()
        z_plane = torch.from_numpy(ransac.predict(xy_query))
        return pos_query[:, 2] - z_plane.to(pos_query.device)

    return predict_elevation
def neighbor_interpolation_model(pos, k=3, r_max=1):
    """Model the ground based on a trimmed point cloud carrying ground
    points only. At inference, a point is associated with its nearest
    neighbors in L2 XY distance in the reference ground cloud. The
    ground surface is estimated as a linear interpolation of the
    neighboring reference points. The elevation is then computed as the
    corresponding gap in Z-coordinates.

    Returns a callable taking an XYZ tensor as input and returning the
    pointwise elevation.

    :param pos: Tensor
        Input 3D point cloud
    :param k: int
        Number of neighbors to consider for interpolation
    :param r_max: float
        Maximum radius for the neighbor search
    :return:
    """
    assert is_xyz_tensor(pos)

    def predict_elevation(pos_query):
        # Neighbor search in XY space. The Z coordinate is padded with
        # a constant 0 on both clouds, so the 3D `knn_2` search
        # effectively measures XY distance only
        xy0 = F.pad(input=pos[:, :2], pad=(0, 1), mode='constant', value=0)
        xy0_query = F.pad(
            input=pos_query[:, :2], pad=(0, 1), mode='constant', value=0)
        neighbors, distances = knn_2(xy0, xy0_query, k, r_max=r_max)

        # In case some points received 0 neighbors, we search again for
        # those, with a radius so large that no point should be left
        # without a neighbor.
        # NOTE(review): `-1` is assumed to be `knn_2`'s marker for a
        # missing neighbor — confirm against `src.utils.neighbors.knn_2`
        has_no_neighbor = (neighbors == -1).all(dim=1)
        if has_no_neighbor.any():
            # The largest corner-to-corner distance between the two
            # clouds' bounding boxes upper-bounds any query-to-reference
            # distance, guaranteeing the fallback search finds neighbors
            high = xy0.max(dim=0).values
            low = xy0.min(dim=0).values
            high_query = xy0_query.max(dim=0).values
            low_query = xy0_query.min(dim=0).values
            r_max_ = max((high_query - low).norm(), (high - low_query).norm())
            neighbors_, distances_ = knn_2(
                xy0, xy0_query[has_no_neighbor], k, r_max=r_max_)
            neighbors[has_no_neighbor] = neighbors_
            distances[has_no_neighbor] = distances_

        # If only 1 neighbor is needed, no need for interpolation
        # NOTE(review): if `knn_2` returns `neighbors` of shape (N, 1)
        # for k=1, `pos[neighbors]` is (N, 1, 3) and `[:, 2]` indexes
        # the size-1 dimension — this branch looks like it relies on a
        # 1D index tensor for k=1; confirm `knn_2`'s return shape
        if k == 1:
            return pos_query[:, 2] - pos[neighbors][:, 2]

        # Note there might still be some missing neighbors here and
        # there, but no completely empty neighborhood. We treat these
        # missing neighbors by attributing a 0-weight.
        # Inverse-distance weighting; the 1e-3 offset avoids division
        # by zero for exact XY matches
        weights = 1 / (distances + 1e-3)
        weights[distances == -1] = 0
        weights = weights / weights.sum(dim=1).view(-1, 1)

        # Estimate the ground height as the weighted combination of the
        # neighbors' height
        z_query = (pos[:, 2][neighbors] * weights).sum(dim=1)
        return pos_query[:, 2] - z_query

    return predict_elevation
def mlp_model(
        pos,
        layers=(32, 16, 8),
        batch_ratio=1,
        lr=0.01,
        lr_decay=1,
        weight_decay=0.01,
        criterion='l2',
        steps=1000,
        check_every_n_steps=50,
        device='cuda',
        verbose=False):
    """Fit an MLP to a point cloud. Assuming the point cloud mostly
    contains ground points, this function will train an MLP to model the
    ground surface as a piecewise-planar function.

    :param pos: Tensor
        Input 3D point cloud
    :param layers: int or List[int]
        Hidden layers for the MLP. Too many weights may let the model
        overfit to non-ground patterns. Not enough weights will underfit
        the ground and miss some patterns. Having more neurons in the
        first layer allows faster convergence
    :param batch_ratio: float
        Ratio of points to sample from the cloud at each training
        iteration. Allows adding some stochasticity to the training.
        In practice, `batch_ratio=1` gives better results if the entire
        cloud fits in memory
    :param lr: float
        Initial learning rate
    :param lr_decay: float
        Multiplicative factor applied to the learning rate after each
        iteration
    :param weight_decay: float
        Weight decay for regularization
    :param criterion: str
        Loss, either 'l1' or 'l2'
    :param steps: int
        Number of training steps. This largely affects overall compute
        time
    :param check_every_n_steps: int
        If `verbose=True` the loss will be logged every n iteration for
        final visualization
    :param device: str or torch.device
        Device on which to do the training and inference
    :param verbose: bool
        If True, a plot of the training loss and some stats will be
        printed at the end of the training
    :return: callable mapping an XYZ Tensor to pointwise elevations
    """
    # Local imports to avoid import loop errors
    from src.nn import MLP
    from src.nn.norm import BatchNorm

    assert is_xyz_tensor(pos)

    # Only synchronize when CUDA exists: an unconditional
    # torch.cuda.synchronize() raises on CPU-only machines, even when
    # `device='cpu'` was requested
    if torch.cuda.is_available():
        torch.cuda.synchronize()
    start = time.time()

    # Normalize the XYZ coordinates to live in a manageable range
    pos = pos.to(device)
    num_points = pos.shape[0]
    means = pos.mean(dim=0)
    stds = pos.std(dim=0)
    pos = (pos - means) / stds

    # Prepare the training. `layers` may be an int, a list or a tuple;
    # copy it to a fresh list before concatenating with the I/O sizes
    batch_size = min(num_points, round(num_points * batch_ratio))
    layers = [layers] if isinstance(layers, int) else list(layers)
    model = MLP(
        [2] + layers + [1],
        activation=nn.ReLU(),
        last_activation=False,
        norm=BatchNorm,
        last_norm=False,
        drop=None).to(device).train()
    optimizer = torch.optim.AdamW(
        model.parameters(),
        lr=lr,
        weight_decay=weight_decay)
    # Uniform sampling weights for the optional per-step subsampling
    weights = torch.ones(num_points, device=device)
    scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, lr_decay)

    # For drawing the loss plot for debugging purposes
    loss_history = []
    step_history = []

    # Training loop
    for step in range(steps):
        # Optionally, randomly drop some data points for augmentation
        if 0 < batch_ratio < 1:
            idx = torch.multinomial(weights, batch_size, replacement=False)
            pos_ = pos[idx]
        else:
            pos_ = pos

        # Forward pass: predict Z from the (normalized) XY coordinates
        xy = pos_[:, :2]
        z = pos_[:, 2]
        z_hat = model(xy)

        # Loss computation
        if criterion == 'l2':
            loss = ((z - z_hat.squeeze()) ** 2).mean()
        elif criterion == 'l1':
            loss = (z - z_hat.squeeze()).abs().mean()
        else:
            raise NotImplementedError(
                f"Unknown criterion '{criterion}', expected 'l1' or 'l2'")

        # Gradient computation and backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        scheduler.step()

        # Keep track of the loss every `check_every_n_steps` steps
        if verbose and step % check_every_n_steps == check_every_n_steps - 1:
            step_history.append(step)
            loss_history.append(loss.detach().cpu().item())

    if verbose:
        if torch.cuda.is_available():
            torch.cuda.synchronize()
        print(f"Training time: {time.time() - start:0.1f} sec")
        # Guard against `steps < check_every_n_steps`, in which case no
        # loss was ever logged and there is nothing to plot
        if loss_history:
            print(f"Loss: {loss_history[-1]:0.3f}")
            plt.plot(step_history, loss_history)
            plt.show()

    # Training is finished, set the model to inference mode
    model = model.eval()

    def predict_elevation(pos):
        # Normalize the query XY with the training statistics, predict
        # the ground Z, de-normalize, and return the Z gap. The result
        # is moved back to the query's original device
        input_device = pos.device
        pos = pos.to(device)
        xy = (pos[:, :2] - means[:2]) / stds[:2]
        z = model(xy).squeeze().detach()
        z = z * stds[2] + means[2]
        elevation = pos[:, 2] - z
        return elevation.to(input_device)

    return predict_elevation