|
|
import mmcv |
|
|
import numpy as np |
|
|
from mmcv import is_tuple_of |
|
|
from mmcv.utils import build_from_cfg |
|
|
from mmdet3d.core import VoxelGenerator |
|
|
from mmdet3d.core.bbox import box_np_ops |
|
|
from mmdet3d.datasets.builder import OBJECTSAMPLERS |
|
|
from mmdet3d.datasets.pipelines.data_augment_utils import noise_per_object_v3_ |
|
|
from mmdet.datasets.builder import PIPELINES |
|
|
from mmdet.datasets.pipelines import RandomFlip |
|
|
|
|
|
|
|
|
@PIPELINES.register_module() |
|
|
class RandomFlip3D(RandomFlip): |
|
|
"""Flip the points & bbox. |
|
|
|
|
|
If the input dict contains the key "flip", then the flag will be used, |
|
|
otherwise it will be randomly decided by a ratio specified in the init |
|
|
method. |
|
|
|
|
|
Args: |
|
|
sync_2d (bool, optional): Whether to apply flip according to the 2D |
|
|
images. If True, it will apply the same flip as that to 2D images. |
|
|
If False, it will decide whether to flip randomly and independently |
|
|
to that of 2D images. Defaults to True. |
|
|
flip_ratio_bev_horizontal (float, optional): The flipping probability |
|
|
in horizontal direction. Defaults to 0.0. |
|
|
flip_ratio_bev_vertical (float, optional): The flipping probability |
|
|
in vertical direction. Defaults to 0.0. |
|
|
""" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
sync_2d=True, |
|
|
flip_ratio_bev_horizontal=0.0, |
|
|
flip_ratio_bev_vertical=0.0, |
|
|
**kwargs, |
|
|
): |
|
|
super(RandomFlip3D, self).__init__( |
|
|
flip_ratio=flip_ratio_bev_horizontal, **kwargs |
|
|
) |
|
|
self.sync_2d = sync_2d |
|
|
self.flip_ratio_bev_vertical = flip_ratio_bev_vertical |
|
|
if flip_ratio_bev_horizontal is not None: |
|
|
assert ( |
|
|
isinstance(flip_ratio_bev_horizontal, (int, float)) |
|
|
and 0 <= flip_ratio_bev_horizontal <= 1 |
|
|
) |
|
|
if flip_ratio_bev_vertical is not None: |
|
|
assert ( |
|
|
isinstance(flip_ratio_bev_vertical, (int, float)) |
|
|
and 0 <= flip_ratio_bev_vertical <= 1 |
|
|
) |
|
|
|
|
|
def random_flip_data_3d(self, input_dict, direction="horizontal"): |
|
|
"""Flip 3D data randomly. |
|
|
|
|
|
Args: |
|
|
input_dict (dict): Result dict from loading pipeline. |
|
|
direction (str): Flip direction. Default: horizontal. |
|
|
|
|
|
Returns: |
|
|
dict: Flipped results, 'points', 'bbox3d_fields' keys are \ |
|
|
updated in the result dict. |
|
|
""" |
|
|
assert direction in ["horizontal", "vertical"] |
|
|
if len(input_dict["bbox3d_fields"]) == 0: |
|
|
input_dict["bbox3d_fields"].append("empty_box3d") |
|
|
input_dict["empty_box3d"] = input_dict["box_type_3d"]( |
|
|
np.array([], dtype=np.float32) |
|
|
) |
|
|
assert len(input_dict["bbox3d_fields"]) == 1 |
|
|
for key in input_dict["bbox3d_fields"]: |
|
|
if "points" in input_dict: |
|
|
input_dict["points"] = input_dict[key].flip( |
|
|
direction, points=input_dict["points"] |
|
|
) |
|
|
else: |
|
|
input_dict[key].flip(direction) |
|
|
if "centers2d" in input_dict: |
|
|
assert ( |
|
|
self.sync_2d is True and direction == "horizontal" |
|
|
), "Only support sync_2d=True and horizontal flip with images" |
|
|
w = input_dict["img_shape"][1] |
|
|
input_dict["centers2d"][..., 0] = w - input_dict["centers2d"][..., 0] |
|
|
|
|
|
def __call__(self, input_dict): |
|
|
"""Call function to flip points, values in the ``bbox3d_fields`` and \ |
|
|
also flip 2D image and its annotations. |
|
|
|
|
|
Args: |
|
|
input_dict (dict): Result dict from loading pipeline. |
|
|
|
|
|
Returns: |
|
|
dict: Flipped results, 'flip', 'flip_direction', \ |
|
|
'pcd_horizontal_flip' and 'pcd_vertical_flip' keys are added \ |
|
|
into result dict. |
|
|
""" |
|
|
|
|
|
super(RandomFlip3D, self).__call__(input_dict) |
|
|
|
|
|
if self.sync_2d: |
|
|
input_dict["pcd_horizontal_flip"] = input_dict["flip"] |
|
|
input_dict["pcd_vertical_flip"] = False |
|
|
else: |
|
|
if "pcd_horizontal_flip" not in input_dict: |
|
|
                flip_horizontal = bool(np.random.rand() < self.flip_ratio)
|
|
input_dict["pcd_horizontal_flip"] = flip_horizontal |
|
|
if "pcd_vertical_flip" not in input_dict: |
|
|
                flip_vertical = bool(np.random.rand() < self.flip_ratio_bev_vertical)
|
|
input_dict["pcd_vertical_flip"] = flip_vertical |
|
|
|
|
|
if "transformation_3d_flow" not in input_dict: |
|
|
input_dict["transformation_3d_flow"] = [] |
|
|
|
|
|
if input_dict["pcd_horizontal_flip"]: |
|
|
self.random_flip_data_3d(input_dict, "horizontal") |
|
|
input_dict["transformation_3d_flow"].extend(["HF"]) |
|
|
if input_dict["pcd_vertical_flip"]: |
|
|
self.random_flip_data_3d(input_dict, "vertical") |
|
|
input_dict["transformation_3d_flow"].extend(["VF"]) |
|
|
return input_dict |
|
|
|
|
|
def __repr__(self): |
|
|
"""str: Return a string that describes the module.""" |
|
|
repr_str = self.__class__.__name__ |
|
|
repr_str += f"(sync_2d={self.sync_2d}," |
|
|
repr_str += f" flip_ratio_bev_vertical={self.flip_ratio_bev_vertical})" |
|
|
return repr_str |
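
# Usage sketch: like the other transforms in this module, RandomFlip3D is
# registered in PIPELINES and is normally built from a config dict. The
# values below are illustrative assumptions, not defaults from any released
# config:
#
#   dict(
#       type="RandomFlip3D",
#       sync_2d=False,
#       flip_ratio_bev_horizontal=0.5,
#       flip_ratio_bev_vertical=0.5)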
|
|
|
|
|
|
|
|
@PIPELINES.register_module() |
|
|
class ObjectSample(object): |
|
|
"""Sample GT objects to the data. |
|
|
|
|
|
Args: |
|
|
db_sampler (dict): Config dict of the database sampler. |
|
|
        sample_2d (bool): Whether to also paste the 2D image patches onto


            the images. This should be True when applying multi-modality


            cut-and-paste. Defaults to False.
|
|
""" |
|
|
|
|
|
def __init__(self, db_sampler, sample_2d=False): |
|
|
self.sampler_cfg = db_sampler |
|
|
self.sample_2d = sample_2d |
|
|
if "type" not in db_sampler.keys(): |
|
|
db_sampler["type"] = "DataBaseSampler" |
|
|
self.db_sampler = build_from_cfg(db_sampler, OBJECTSAMPLERS) |
|
|
|
|
|
@staticmethod |
|
|
def remove_points_in_boxes(points, boxes): |
|
|
"""Remove the points in the sampled bounding boxes. |
|
|
|
|
|
Args: |
|
|
points (:obj:`BasePoints`): Input point cloud array. |
|
|
boxes (np.ndarray): Sampled ground truth boxes. |
|
|
|
|
|
Returns: |
|
|
            :obj:`BasePoints`: Points with those in the boxes removed.
|
|
""" |
|
|
masks = box_np_ops.points_in_rbbox(points.coord.numpy(), boxes) |
|
|
points = points[np.logical_not(masks.any(-1))] |
|
|
return points |
|
|
|
|
|
def __call__(self, input_dict): |
|
|
"""Call function to sample ground truth objects to the data. |
|
|
|
|
|
Args: |
|
|
input_dict (dict): Result dict from loading pipeline. |
|
|
|
|
|
Returns: |
|
|
dict: Results after object sampling augmentation, \ |
|
|
'points', 'gt_bboxes_3d', 'gt_labels_3d' keys are updated \ |
|
|
in the result dict. |
|
|
""" |
|
|
gt_bboxes_3d = input_dict["gt_bboxes_3d"] |
|
|
gt_labels_3d = input_dict["gt_labels_3d"] |
|
|
|
|
|
|
|
|
points = input_dict["points"] |
|
|
if self.sample_2d: |
|
|
img = input_dict["img"] |
|
|
gt_bboxes_2d = input_dict["gt_bboxes"] |
|
|
|
|
|
sampled_dict = self.db_sampler.sample_all( |
|
|
gt_bboxes_3d.tensor.numpy(), |
|
|
gt_labels_3d, |
|
|
gt_bboxes_2d=gt_bboxes_2d, |
|
|
img=img, |
|
|
) |
|
|
else: |
|
|
sampled_dict = self.db_sampler.sample_all( |
|
|
gt_bboxes_3d.tensor.numpy(), gt_labels_3d, img=None |
|
|
) |
|
|
|
|
|
if sampled_dict is not None: |
|
|
sampled_gt_bboxes_3d = sampled_dict["gt_bboxes_3d"] |
|
|
sampled_points = sampled_dict["points"] |
|
|
sampled_gt_labels = sampled_dict["gt_labels_3d"] |
|
|
|
|
|
gt_labels_3d = np.concatenate([gt_labels_3d, sampled_gt_labels], axis=0) |
|
|
gt_bboxes_3d = gt_bboxes_3d.new_box( |
|
|
np.concatenate([gt_bboxes_3d.tensor.numpy(), sampled_gt_bboxes_3d]) |
|
|
) |
|
|
|
|
|
points = self.remove_points_in_boxes(points, sampled_gt_bboxes_3d) |
|
|
|
|
|
points = points.cat([sampled_points, points]) |
|
|
|
|
|
if self.sample_2d: |
|
|
sampled_gt_bboxes_2d = sampled_dict["gt_bboxes_2d"] |
|
|
gt_bboxes_2d = np.concatenate( |
|
|
[gt_bboxes_2d, sampled_gt_bboxes_2d] |
|
|
).astype(np.float32) |
|
|
|
|
|
input_dict["gt_bboxes"] = gt_bboxes_2d |
|
|
input_dict["img"] = sampled_dict["img"] |
|
|
|
|
|
input_dict["gt_bboxes_3d"] = gt_bboxes_3d |
|
|
input_dict["gt_labels_3d"] = gt_labels_3d.astype(np.long) |
|
|
input_dict["points"] = points |
|
|
|
|
|
return input_dict |
|
|
|
|
|
def __repr__(self): |
|
|
"""str: Return a string that describes the module.""" |
|
|
repr_str = self.__class__.__name__ |
|
|
repr_str += f" sample_2d={self.sample_2d}," |
|
|
repr_str += f" data_root={self.sampler_cfg.data_root}," |
|
|
repr_str += f" info_path={self.sampler_cfg.info_path}," |
|
|
repr_str += f" rate={self.sampler_cfg.rate}," |
|
|
repr_str += f" prepare={self.sampler_cfg.prepare}," |
|
|
repr_str += f" classes={self.sampler_cfg.classes}," |
|
|
repr_str += f" sample_groups={self.sampler_cfg.sample_groups}" |
|
|
return repr_str |
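
# Usage sketch: the db_sampler keys below mirror the attributes read in
# ``__repr__`` above; the concrete paths and values are illustrative
# assumptions:
#
#   db_sampler = dict(
#       data_root="data/kitti/",
#       info_path="data/kitti/kitti_dbinfos_train.pkl",
#       rate=1.0,
#       prepare=dict(filter_by_min_points=dict(Car=5)),
#       classes=["Car"],
#       sample_groups=dict(Car=15))
#   dict(type="ObjectSample", db_sampler=db_sampler)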
|
|
|
|
|
|
|
|
@PIPELINES.register_module() |
|
|
class ObjectNoise(object): |
|
|
"""Apply noise to each GT objects in the scene. |
|
|
|
|
|
Args: |
|
|
translation_std (list[float], optional): Standard deviation of the |
|
|
distribution where translation noise are sampled from. |
|
|
Defaults to [0.25, 0.25, 0.25]. |
|
|
global_rot_range (list[float], optional): Global rotation to the scene. |
|
|
Defaults to [0.0, 0.0]. |
|
|
rot_range (list[float], optional): Object rotation range. |
|
|
Defaults to [-0.15707963267, 0.15707963267]. |
|
|
num_try (int, optional): Number of times to try if the noise applied is |
|
|
invalid. Defaults to 100. |
|
|
""" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
translation_std=[0.25, 0.25, 0.25], |
|
|
global_rot_range=[0.0, 0.0], |
|
|
rot_range=[-0.15707963267, 0.15707963267], |
|
|
num_try=100, |
|
|
): |
|
|
self.translation_std = translation_std |
|
|
self.global_rot_range = global_rot_range |
|
|
self.rot_range = rot_range |
|
|
self.num_try = num_try |
|
|
|
|
|
def __call__(self, input_dict): |
|
|
"""Call function to apply noise to each ground truth in the scene. |
|
|
|
|
|
Args: |
|
|
input_dict (dict): Result dict from loading pipeline. |
|
|
|
|
|
Returns: |
|
|
dict: Results after adding noise to each object, \ |
|
|
'points', 'gt_bboxes_3d' keys are updated in the result dict. |
|
|
""" |
|
|
gt_bboxes_3d = input_dict["gt_bboxes_3d"] |
|
|
points = input_dict["points"] |
|
|
|
|
|
|
|
|
numpy_box = gt_bboxes_3d.tensor.numpy() |
|
|
numpy_points = points.tensor.numpy() |
|
|
|
|
|
noise_per_object_v3_( |
|
|
numpy_box, |
|
|
numpy_points, |
|
|
rotation_perturb=self.rot_range, |
|
|
center_noise_std=self.translation_std, |
|
|
global_random_rot_range=self.global_rot_range, |
|
|
num_try=self.num_try, |
|
|
) |
|
|
|
|
|
input_dict["gt_bboxes_3d"] = gt_bboxes_3d.new_box(numpy_box) |
|
|
input_dict["points"] = points.new_point(numpy_points) |
|
|
return input_dict |
|
|
|
|
|
def __repr__(self): |
|
|
"""str: Return a string that describes the module.""" |
|
|
repr_str = self.__class__.__name__ |
|
|
repr_str += f"(num_try={self.num_try}," |
|
|
repr_str += f" translation_std={self.translation_std}," |
|
|
repr_str += f" global_rot_range={self.global_rot_range}," |
|
|
repr_str += f" rot_range={self.rot_range})" |
|
|
return repr_str |
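
# Usage sketch (the values shown are this class's own defaults):
#
#   dict(
#       type="ObjectNoise",
#       num_try=100,
#       translation_std=[0.25, 0.25, 0.25],
#       global_rot_range=[0.0, 0.0],
#       rot_range=[-0.15707963267, 0.15707963267])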
|
|
|
|
|
|
|
|
@PIPELINES.register_module() |
|
|
class GlobalRotScaleTrans(object): |
|
|
"""Apply global rotation, scaling and translation to a 3D scene. |
|
|
|
|
|
Args: |
|
|
rot_range (list[float]): Range of rotation angle. |
|
|
Defaults to [-0.78539816, 0.78539816] (close to [-pi/4, pi/4]). |
|
|
scale_ratio_range (list[float]): Range of scale ratio. |
|
|
Defaults to [0.95, 1.05]. |
|
|
        translation_std (list[float]): The standard deviation of translation


            noise. This applies a random translation to the scene, sampled


            from a Gaussian distribution whose standard deviation is set by


            ``translation_std``. Defaults to [0, 0, 0].


        shift_height (bool): Whether to shift height


            (the fourth dimension of indoor points) when scaling.
|
|
Defaults to False. |
|
|
""" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
rot_range=[-0.78539816, 0.78539816], |
|
|
scale_ratio_range=[0.95, 1.05], |
|
|
translation_std=[0, 0, 0], |
|
|
shift_height=False, |
|
|
): |
|
|
self.rot_range = rot_range |
|
|
self.scale_ratio_range = scale_ratio_range |
|
|
self.translation_std = translation_std |
|
|
self.shift_height = shift_height |
|
|
|
|
|
def _trans_bbox_points(self, input_dict): |
|
|
"""Private function to translate bounding boxes and points. |
|
|
|
|
|
Args: |
|
|
input_dict (dict): Result dict from loading pipeline. |
|
|
|
|
|
Returns: |
|
|
dict: Results after translation, 'points', 'pcd_trans' \ |
|
|
and keys in input_dict['bbox3d_fields'] are updated \ |
|
|
in the result dict. |
|
|
""" |
|
|
if not isinstance(self.translation_std, (list, tuple, np.ndarray)): |
|
|
translation_std = [ |
|
|
self.translation_std, |
|
|
self.translation_std, |
|
|
self.translation_std, |
|
|
] |
|
|
else: |
|
|
translation_std = self.translation_std |
|
|
translation_std = np.array(translation_std, dtype=np.float32) |
|
|
trans_factor = np.random.normal(scale=translation_std, size=3).T |
|
|
|
|
|
input_dict["points"].translate(trans_factor) |
|
|
input_dict["pcd_trans"] = trans_factor |
|
|
for key in input_dict["bbox3d_fields"]: |
|
|
input_dict[key].translate(trans_factor) |
|
|
|
|
|
def _rot_bbox_points(self, input_dict): |
|
|
"""Private function to rotate bounding boxes and points. |
|
|
|
|
|
Args: |
|
|
input_dict (dict): Result dict from loading pipeline. |
|
|
|
|
|
Returns: |
|
|
dict: Results after rotation, 'points', 'pcd_rotation' \ |
|
|
and keys in input_dict['bbox3d_fields'] are updated \ |
|
|
in the result dict. |
|
|
""" |
|
|
rotation = self.rot_range |
|
|
if not isinstance(rotation, list): |
|
|
rotation = [-rotation, rotation] |
|
|
noise_rotation = np.random.uniform(rotation[0], rotation[1]) |
|
|
|
|
|
for key in input_dict["bbox3d_fields"]: |
|
|
if len(input_dict[key].tensor) != 0: |
|
|
points, rot_mat_T = input_dict[key].rotate( |
|
|
noise_rotation, input_dict["points"] |
|
|
) |
|
|
input_dict["points"] = points |
|
|
input_dict["pcd_rotation"] = rot_mat_T |
|
|
rot_mat_T_np = np.eye(4) |
|
|
rot_mat_T_np[:3, :3] = rot_mat_T.numpy() |
|
|
input_dict["lidar2img"] = input_dict["lidar2img"] @ rot_mat_T_np |
|
|
|
|
|
|
|
|
def _scale_bbox_points(self, input_dict): |
|
|
"""Private function to scale bounding boxes and points. |
|
|
|
|
|
Args: |
|
|
input_dict (dict): Result dict from loading pipeline. |
|
|
|
|
|
Returns: |
|
|
            dict: Results after scaling, 'points' and keys in \
|
|
input_dict['bbox3d_fields'] are updated in the result dict. |
|
|
""" |
|
|
scale = input_dict["pcd_scale_factor"] |
|
|
points = input_dict["points"] |
|
|
points.scale(scale) |
|
|
if self.shift_height: |
|
|
assert "height" in points.attribute_dims.keys() |
|
|
points.tensor[:, points.attribute_dims["height"]] *= scale |
|
|
input_dict["points"] = points |
|
|
|
|
|
for key in input_dict["bbox3d_fields"]: |
|
|
input_dict[key].scale(scale) |
|
|
|
|
|
scale_eye = np.eye(4) |
|
|
scale_eye[0, 0] *= scale |
|
|
scale_eye[1, 1] *= scale |
|
|
|
|
|
lidar2imgs = [] |
|
|
for idx in range(len(input_dict["intrinsic"])): |
|
|
intrinsic = input_dict["intrinsic"][idx] |
|
|
extrinsic = input_dict["extrinsic"][idx] |
|
|
lidar2img = (intrinsic * scale_eye) @ extrinsic |
|
|
lidar2imgs.append(lidar2img) |
|
|
input_dict["lidar2img"] = lidar2imgs |
|
|
|
|
|
def _random_scale(self, input_dict): |
|
|
"""Private function to randomly set the scale factor. |
|
|
|
|
|
Args: |
|
|
input_dict (dict): Result dict from loading pipeline. |
|
|
|
|
|
Returns: |
|
|
            dict: Results after scaling, 'pcd_scale_factor' is updated \
|
|
in the result dict. |
|
|
""" |
|
|
scale_factor = np.random.uniform( |
|
|
self.scale_ratio_range[0], self.scale_ratio_range[1] |
|
|
) |
|
|
input_dict["pcd_scale_factor"] = scale_factor |
|
|
|
|
|
def __call__(self, input_dict): |
|
|
"""Private function to rotate, scale and translate bounding boxes and \ |
|
|
points. |
|
|
|
|
|
Args: |
|
|
input_dict (dict): Result dict from loading pipeline. |
|
|
|
|
|
Returns: |
|
|
dict: Results after scaling, 'points', 'pcd_rotation', |
|
|
'pcd_scale_factor', 'pcd_trans' and keys in \ |
|
|
input_dict['bbox3d_fields'] are updated in the result dict. |
|
|
""" |
|
|
if "transformation_3d_flow" not in input_dict: |
|
|
input_dict["transformation_3d_flow"] = [] |
|
|
|
|
|
self._rot_bbox_points(input_dict) |
|
|
|
|
|
if "pcd_scale_factor" not in input_dict: |
|
|
self._random_scale(input_dict) |
|
|
self._scale_bbox_points(input_dict) |
|
|
|
|
|
self._trans_bbox_points(input_dict) |
|
|
|
|
|
input_dict["transformation_3d_flow"].extend(["R", "S", "T"]) |
|
|
return input_dict |
|
|
|
|
|
def __repr__(self): |
|
|
"""str: Return a string that describes the module.""" |
|
|
repr_str = self.__class__.__name__ |
|
|
repr_str += f"(rot_range={self.rot_range}," |
|
|
repr_str += f" scale_ratio_range={self.scale_ratio_range}," |
|
|
repr_str += f" translation_std={self.translation_std}," |
|
|
repr_str += f" shift_height={self.shift_height})" |
|
|
return repr_str |
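
# Usage sketch (the values shown are this class's own defaults):
#
#   dict(
#       type="GlobalRotScaleTrans",
#       rot_range=[-0.78539816, 0.78539816],
#       scale_ratio_range=[0.95, 1.05],
#       translation_std=[0, 0, 0])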
|
|
|
|
|
|
|
|
@PIPELINES.register_module() |
|
|
class PointShuffle(object): |
|
|
"""Shuffle input points.""" |
|
|
|
|
|
def __call__(self, input_dict): |
|
|
"""Call function to shuffle points. |
|
|
|
|
|
Args: |
|
|
input_dict (dict): Result dict from loading pipeline. |
|
|
|
|
|
Returns: |
|
|
            dict: Results after shuffling, 'points', 'pts_instance_mask' \


                and 'pts_semantic_mask' keys are updated in the result dict.
|
|
""" |
|
|
idx = input_dict["points"].shuffle() |
|
|
idx = idx.numpy() |
|
|
|
|
|
pts_instance_mask = input_dict.get("pts_instance_mask", None) |
|
|
pts_semantic_mask = input_dict.get("pts_semantic_mask", None) |
|
|
|
|
|
if pts_instance_mask is not None: |
|
|
input_dict["pts_instance_mask"] = pts_instance_mask[idx] |
|
|
|
|
|
if pts_semantic_mask is not None: |
|
|
input_dict["pts_semantic_mask"] = pts_semantic_mask[idx] |
|
|
|
|
|
return input_dict |
|
|
|
|
|
def __repr__(self): |
|
|
return self.__class__.__name__ |
|
|
|
|
|
|
|
|
@PIPELINES.register_module() |
|
|
class ObjectRangeFilter(object): |
|
|
"""Filter objects by the range. |
|
|
|
|
|
Args: |
|
|
point_cloud_range (list[float]): Point cloud range. |
|
|
""" |
|
|
|
|
|
def __init__(self, point_cloud_range): |
|
|
self.pcd_range = np.array(point_cloud_range, dtype=np.float32) |
|
|
self.bev_range = self.pcd_range[[0, 1, 3, 4]] |
|
|
|
|
|
def __call__(self, input_dict): |
|
|
"""Call function to filter objects by the range. |
|
|
|
|
|
Args: |
|
|
input_dict (dict): Result dict from loading pipeline. |
|
|
|
|
|
Returns: |
|
|
dict: Results after filtering, 'gt_bboxes_3d', 'gt_labels_3d' \ |
|
|
keys are updated in the result dict. |
|
|
""" |
|
|
gt_bboxes_3d = input_dict["gt_bboxes_3d"] |
|
|
gt_labels_3d = input_dict["gt_labels_3d"] |
|
|
mask = gt_bboxes_3d.in_range_bev(self.bev_range) |
|
|
gt_bboxes_3d = gt_bboxes_3d[mask] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
        gt_labels_3d = gt_labels_3d[mask.numpy().astype(np.bool_)]
|
|
|
|
|
|
|
|
gt_bboxes_3d.limit_yaw(offset=0.5, period=2 * np.pi) |
|
|
input_dict["gt_bboxes_3d"] = gt_bboxes_3d |
|
|
input_dict["gt_labels_3d"] = gt_labels_3d |
|
|
|
|
|
return input_dict |
|
|
|
|
|
def __repr__(self): |
|
|
"""str: Return a string that describes the module.""" |
|
|
repr_str = self.__class__.__name__ |
|
|
repr_str += f"(point_cloud_range={self.pcd_range.tolist()})" |
|
|
return repr_str |
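
# Usage sketch (the range below is an illustrative assumption; use the
# point cloud range of your dataset):
#
#   dict(type="ObjectRangeFilter",
#        point_cloud_range=[0, -40, -3, 70.4, 40, 1])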
|
|
|
|
|
|
|
|
@PIPELINES.register_module() |
|
|
class PointsRangeFilter(object): |
|
|
"""Filter points by the range. |
|
|
|
|
|
Args: |
|
|
point_cloud_range (list[float]): Point cloud range. |
|
|
""" |
|
|
|
|
|
def __init__(self, point_cloud_range): |
|
|
self.pcd_range = np.array(point_cloud_range, dtype=np.float32) |
|
|
|
|
|
def __call__(self, input_dict): |
|
|
"""Call function to filter points by the range. |
|
|
|
|
|
Args: |
|
|
input_dict (dict): Result dict from loading pipeline. |
|
|
|
|
|
Returns: |
|
|
dict: Results after filtering, 'points', 'pts_instance_mask' \ |
|
|
and 'pts_semantic_mask' keys are updated in the result dict. |
|
|
""" |
|
|
points = input_dict["points"] |
|
|
points_mask = points.in_range_3d(self.pcd_range) |
|
|
clean_points = points[points_mask] |
|
|
input_dict["points"] = clean_points |
|
|
points_mask = points_mask.numpy() |
|
|
|
|
|
pts_instance_mask = input_dict.get("pts_instance_mask", None) |
|
|
pts_semantic_mask = input_dict.get("pts_semantic_mask", None) |
|
|
|
|
|
if pts_instance_mask is not None: |
|
|
input_dict["pts_instance_mask"] = pts_instance_mask[points_mask] |
|
|
|
|
|
if pts_semantic_mask is not None: |
|
|
input_dict["pts_semantic_mask"] = pts_semantic_mask[points_mask] |
|
|
|
|
|
return input_dict |
|
|
|
|
|
def __repr__(self): |
|
|
"""str: Return a string that describes the module.""" |
|
|
repr_str = self.__class__.__name__ |
|
|
repr_str += f"(point_cloud_range={self.pcd_range.tolist()})" |
|
|
return repr_str |
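
# Usage sketch, typically paired with ObjectRangeFilter above (the range
# below is an illustrative assumption):
#
#   dict(type="PointsRangeFilter",
#        point_cloud_range=[0, -40, -3, 70.4, 40, 1])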
|
|
|
|
|
|
|
|
@PIPELINES.register_module() |
|
|
class ObjectNameFilter(object): |
|
|
"""Filter GT objects by their names. |
|
|
|
|
|
Args: |
|
|
classes (list[str]): List of class names to be kept for training. |
|
|
""" |
|
|
|
|
|
def __init__(self, classes): |
|
|
self.classes = classes |
|
|
self.labels = list(range(len(self.classes))) |
|
|
|
|
|
def __call__(self, input_dict): |
|
|
"""Call function to filter objects by their names. |
|
|
|
|
|
Args: |
|
|
input_dict (dict): Result dict from loading pipeline. |
|
|
|
|
|
Returns: |
|
|
dict: Results after filtering, 'gt_bboxes_3d', 'gt_labels_3d' \ |
|
|
keys are updated in the result dict. |
|
|
""" |
|
|
gt_labels_3d = input_dict["gt_labels_3d"] |
|
|
gt_bboxes_mask = np.array( |
|
|
[n in self.labels for n in gt_labels_3d], dtype=np.bool_ |
|
|
) |
|
|
input_dict["gt_bboxes_3d"] = input_dict["gt_bboxes_3d"][gt_bboxes_mask] |
|
|
input_dict["gt_labels_3d"] = input_dict["gt_labels_3d"][gt_bboxes_mask] |
|
|
|
|
|
return input_dict |
|
|
|
|
|
def __repr__(self): |
|
|
"""str: Return a string that describes the module.""" |
|
|
repr_str = self.__class__.__name__ |
|
|
repr_str += f"(classes={self.classes})" |
|
|
return repr_str |
|
|
|
|
|
|
|
|
@PIPELINES.register_module() |
|
|
class IndoorPointSample(object): |
|
|
"""Indoor point sample. |
|
|
|
|
|
Sampling data to a certain number. |
|
|
|
|
|
Args: |
|
|
|
|
num_points (int): Number of points to be sampled. |
|
|
""" |
|
|
|
|
|
def __init__(self, num_points): |
|
|
self.num_points = num_points |
|
|
|
|
|
def points_random_sampling( |
|
|
self, points, num_samples, replace=None, return_choices=False |
|
|
): |
|
|
"""Points random sampling. |
|
|
|
|
|
Sample points to a certain number. |
|
|
|
|
|
Args: |
|
|
points (np.ndarray | :obj:`BasePoints`): 3D Points. |
|
|
num_samples (int): Number of samples to be sampled. |
|
|
replace (bool): Whether the sample is with or without replacement. |
|
|
Defaults to None. |
|
|
return_choices (bool): Whether return choice. Defaults to False. |
|
|
|
|
|
Returns: |
|
|
tuple[np.ndarray] | np.ndarray: |
|
|
|
|
|
- points (np.ndarray | :obj:`BasePoints`): 3D Points. |
|
|
- choices (np.ndarray, optional): The generated random samples. |
|
|
""" |
|
|
if replace is None: |
|
|
replace = points.shape[0] < num_samples |
|
|
choices = np.random.choice(points.shape[0], num_samples, replace=replace) |
|
|
if return_choices: |
|
|
return points[choices], choices |
|
|
else: |
|
|
return points[choices] |
|
|
|
|
|
def __call__(self, results): |
|
|
"""Call function to sample points to in indoor scenes. |
|
|
|
|
|
Args: |
|
|
            results (dict): Result dict from loading pipeline.
|
|
|
|
|
Returns: |
|
|
dict: Results after sampling, 'points', 'pts_instance_mask' \ |
|
|
and 'pts_semantic_mask' keys are updated in the result dict. |
|
|
""" |
|
|
points = results["points"] |
|
|
points, choices = self.points_random_sampling( |
|
|
points, self.num_points, return_choices=True |
|
|
) |
|
|
results["points"] = points |
|
|
|
|
|
pts_instance_mask = results.get("pts_instance_mask", None) |
|
|
pts_semantic_mask = results.get("pts_semantic_mask", None) |
|
|
|
|
|
if pts_instance_mask is not None: |
|
|
pts_instance_mask = pts_instance_mask[choices] |
|
|
results["pts_instance_mask"] = pts_instance_mask |
|
|
|
|
|
if pts_semantic_mask is not None: |
|
|
pts_semantic_mask = pts_semantic_mask[choices] |
|
|
results["pts_semantic_mask"] = pts_semantic_mask |
|
|
|
|
|
return results |
|
|
|
|
|
def __repr__(self): |
|
|
"""str: Return a string that describes the module.""" |
|
|
repr_str = self.__class__.__name__ |
|
|
repr_str += f"(num_points={self.num_points})" |
|
|
return repr_str |
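
# Usage sketch (the point budget is an illustrative assumption):
#
#   dict(type="IndoorPointSample", num_points=40000)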
|
|
|
|
|
|
|
|
@PIPELINES.register_module() |
|
|
class IndoorPatchPointSample(object): |
|
|
r"""Indoor point sample within a patch. Modified from `PointNet++ <https:// |
|
|
github.com/charlesq34/pointnet2/blob/master/scannet/scannet_dataset.py>`_. |
|
|
|
|
|
Sampling data to a certain number for semantic segmentation. |
|
|
|
|
|
Args: |
|
|
num_points (int): Number of points to be sampled. |
|
|
block_size (float, optional): Size of a block to sample points from. |
|
|
Defaults to 1.5. |
|
|
sample_rate (float, optional): Stride used in sliding patch generation. |
|
|
Defaults to 1.0. |
|
|
ignore_index (int, optional): Label index that won't be used for the |
|
|
segmentation task. This is set in PointSegClassMapping as neg_cls. |
|
|
Defaults to None. |
|
|
use_normalized_coord (bool, optional): Whether to use normalized xyz as |
|
|
additional features. Defaults to False. |
|
|
num_try (int, optional): Number of times to try if the patch selected |
|
|
is invalid. Defaults to 10. |
|
|
""" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
num_points, |
|
|
block_size=1.5, |
|
|
sample_rate=1.0, |
|
|
ignore_index=None, |
|
|
use_normalized_coord=False, |
|
|
num_try=10, |
|
|
): |
|
|
self.num_points = num_points |
|
|
self.block_size = block_size |
|
|
self.sample_rate = sample_rate |
|
|
self.ignore_index = ignore_index |
|
|
self.use_normalized_coord = use_normalized_coord |
|
|
self.num_try = num_try |
|
|
|
|
|
def _input_generation( |
|
|
self, coords, patch_center, coord_max, attributes, attribute_dims, point_type |
|
|
): |
|
|
"""Generating model input. |
|
|
|
|
|
Generate input by subtracting patch center and adding additional \ |
|
|
        features. Currently supports colors and normalized xyz as features.
|
|
|
|
|
Args: |
|
|
coords (np.ndarray): Sampled 3D Points. |
|
|
patch_center (np.ndarray): Center coordinate of the selected patch. |
|
|
coord_max (np.ndarray): Max coordinate of all 3D Points. |
|
|
            attributes (np.ndarray): Features of the input points.
|
|
attribute_dims (dict): Dictionary to indicate the meaning of extra |
|
|
dimension. |
|
|
            point_type (type): Class of input points, inherited from BasePoints.
|
|
|
|
|
Returns: |
|
|
:obj:`BasePoints`: The generated input data. |
|
|
""" |
|
|
|
|
|
centered_coords = coords.copy() |
|
|
centered_coords[:, 0] -= patch_center[0] |
|
|
centered_coords[:, 1] -= patch_center[1] |
|
|
|
|
|
if self.use_normalized_coord: |
|
|
normalized_coord = coords / coord_max |
|
|
attributes = np.concatenate([attributes, normalized_coord], axis=1) |
|
|
if attribute_dims is None: |
|
|
attribute_dims = dict() |
|
|
attribute_dims.update( |
|
|
dict( |
|
|
normalized_coord=[ |
|
|
attributes.shape[1], |
|
|
attributes.shape[1] + 1, |
|
|
attributes.shape[1] + 2, |
|
|
] |
|
|
) |
|
|
) |
|
|
|
|
|
points = np.concatenate([centered_coords, attributes], axis=1) |
|
|
points = point_type( |
|
|
points, points_dim=points.shape[1], attribute_dims=attribute_dims |
|
|
) |
|
|
|
|
|
return points |
|
|
|
|
|
def _patch_points_sampling(self, points, sem_mask, replace=None): |
|
|
"""Patch points sampling. |
|
|
|
|
|
First sample a valid patch. |
|
|
Then sample points within that patch to a certain number. |
|
|
|
|
|
Args: |
|
|
points (:obj:`BasePoints`): 3D Points. |
|
|
sem_mask (np.ndarray): semantic segmentation mask for input points. |
|
|
replace (bool): Whether the sample is with or without replacement. |
|
|
Defaults to None. |
|
|
|
|
|
Returns: |
|
|
tuple[:obj:`BasePoints`, np.ndarray] | :obj:`BasePoints`: |
|
|
|
|
|
- points (:obj:`BasePoints`): 3D Points. |
|
|
- choices (np.ndarray): The generated random samples. |
|
|
""" |
|
|
coords = points.coord.numpy() |
|
|
attributes = points.tensor[:, 3:].numpy() |
|
|
attribute_dims = points.attribute_dims |
|
|
point_type = type(points) |
|
|
|
|
|
coord_max = np.amax(coords, axis=0) |
|
|
coord_min = np.amin(coords, axis=0) |
|
|
|
|
|
for i in range(self.num_try): |
|
|
|
|
|
cur_center = coords[np.random.choice(coords.shape[0])] |
|
|
|
|
|
|
|
|
cur_max = cur_center + np.array( |
|
|
[self.block_size / 2.0, self.block_size / 2.0, 0.0] |
|
|
) |
|
|
cur_min = cur_center - np.array( |
|
|
[self.block_size / 2.0, self.block_size / 2.0, 0.0] |
|
|
) |
|
|
cur_max[2] = coord_max[2] |
|
|
cur_min[2] = coord_min[2] |
|
|
cur_choice = ( |
|
|
np.sum( |
|
|
(coords >= (cur_min - 0.2)) * (coords <= (cur_max + 0.2)), axis=1 |
|
|
) |
|
|
== 3 |
|
|
) |
|
|
|
|
|
if not cur_choice.any(): |
|
|
continue |
|
|
|
|
|
cur_coords = coords[cur_choice, :] |
|
|
cur_sem_mask = sem_mask[cur_choice] |
|
|
|
|
|
|
|
|
|
|
|
mask = ( |
|
|
np.sum( |
|
|
(cur_coords >= (cur_min - 0.01)) * (cur_coords <= (cur_max + 0.01)), |
|
|
axis=1, |
|
|
) |
|
|
== 3 |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
vidx = np.ceil( |
|
|
(cur_coords[mask, :] - cur_min) |
|
|
/ (cur_max - cur_min) |
|
|
* np.array([31.0, 31.0, 62.0]) |
|
|
) |
|
|
vidx = np.unique(vidx[:, 0] * 31.0 * 62.0 + vidx[:, 1] * 62.0 + vidx[:, 2]) |
|
|
flag1 = len(vidx) / 31.0 / 31.0 / 62.0 >= 0.02 |
|
|
|
|
|
|
|
|
if self.ignore_index is None: |
|
|
flag2 = True |
|
|
else: |
|
|
flag2 = ( |
|
|
np.sum(cur_sem_mask != self.ignore_index) / len(cur_sem_mask) >= 0.7 |
|
|
) |
|
|
|
|
|
if flag1 and flag2: |
|
|
break |
|
|
|
|
|
|
|
|
if replace is None: |
|
|
replace = cur_sem_mask.shape[0] < self.num_points |
|
|
choices = np.random.choice( |
|
|
np.where(cur_choice)[0], self.num_points, replace=replace |
|
|
) |
|
|
|
|
|
|
|
|
points = self._input_generation( |
|
|
coords[choices], |
|
|
cur_center, |
|
|
coord_max, |
|
|
attributes[choices], |
|
|
attribute_dims, |
|
|
point_type, |
|
|
) |
|
|
|
|
|
return points, choices |
|
|
|
|
|
def __call__(self, results): |
|
|
"""Call function to sample points to in indoor scenes. |
|
|
|
|
|
Args: |
|
|
            results (dict): Result dict from loading pipeline.
|
|
|
|
|
Returns: |
|
|
dict: Results after sampling, 'points', 'pts_instance_mask' \ |
|
|
and 'pts_semantic_mask' keys are updated in the result dict. |
|
|
""" |
|
|
points = results["points"] |
|
|
|
|
|
assert ( |
|
|
"pts_semantic_mask" in results.keys() |
|
|
), "semantic mask should be provided in training and evaluation" |
|
|
pts_semantic_mask = results["pts_semantic_mask"] |
|
|
|
|
|
points, choices = self._patch_points_sampling(points, pts_semantic_mask) |
|
|
|
|
|
results["points"] = points |
|
|
results["pts_semantic_mask"] = pts_semantic_mask[choices] |
|
|
pts_instance_mask = results.get("pts_instance_mask", None) |
|
|
if pts_instance_mask is not None: |
|
|
results["pts_instance_mask"] = pts_instance_mask[choices] |
|
|
|
|
|
return results |
|
|
|
|
|
def __repr__(self): |
|
|
"""str: Return a string that describes the module.""" |
|
|
repr_str = self.__class__.__name__ |
|
|
repr_str += f"(num_points={self.num_points}," |
|
|
repr_str += f" block_size={self.block_size}," |
|
|
repr_str += f" sample_rate={self.sample_rate}," |
|
|
repr_str += f" ignore_index={self.ignore_index}," |
|
|
repr_str += f" use_normalized_coord={self.use_normalized_coord}," |
|
|
repr_str += f" num_try={self.num_try})" |
|
|
return repr_str |
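
# Usage sketch (the values are illustrative assumptions; ``ignore_index``
# should match the neg_cls set in PointSegClassMapping):
#
#   dict(
#       type="IndoorPatchPointSample",
#       num_points=4096,
#       block_size=1.5,
#       ignore_index=20,
#       use_normalized_coord=True)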
|
|
|
|
|
|
|
|
@PIPELINES.register_module() |
|
|
class BackgroundPointsFilter(object): |
|
|
"""Filter background points near the bounding box. |
|
|
|
|
|
Args: |
|
|
        bbox_enlarge_range (tuple[float] | float): Bbox enlarge range.
|
|
""" |
|
|
|
|
|
def __init__(self, bbox_enlarge_range): |
|
|
assert ( |
|
|
is_tuple_of(bbox_enlarge_range, float) and len(bbox_enlarge_range) == 3 |
|
|
) or isinstance( |
|
|
bbox_enlarge_range, float |
|
|
), f"Invalid arguments bbox_enlarge_range {bbox_enlarge_range}" |
|
|
|
|
|
if isinstance(bbox_enlarge_range, float): |
|
|
bbox_enlarge_range = [bbox_enlarge_range] * 3 |
|
|
self.bbox_enlarge_range = np.array(bbox_enlarge_range, dtype=np.float32)[ |
|
|
np.newaxis, : |
|
|
] |
|
|
|
|
|
def __call__(self, input_dict): |
|
|
"""Call function to filter points by the range. |
|
|
|
|
|
Args: |
|
|
input_dict (dict): Result dict from loading pipeline. |
|
|
|
|
|
Returns: |
|
|
dict: Results after filtering, 'points', 'pts_instance_mask' \ |
|
|
and 'pts_semantic_mask' keys are updated in the result dict. |
|
|
""" |
|
|
points = input_dict["points"] |
|
|
gt_bboxes_3d = input_dict["gt_bboxes_3d"] |
|
|
|
|
|
        # Clone before converting: ``tensor.numpy()`` shares memory with the
        # box tensor, so writing the gravity centers below would otherwise
        # modify ``gt_bboxes_3d`` in place.
        gt_bboxes_3d_np = gt_bboxes_3d.tensor.clone().numpy()


        gt_bboxes_3d_np[:, :3] = gt_bboxes_3d.gravity_center.numpy()
|
|
enlarged_gt_bboxes_3d = gt_bboxes_3d_np.copy() |
|
|
enlarged_gt_bboxes_3d[:, 3:6] += self.bbox_enlarge_range |
|
|
points_numpy = points.tensor.numpy() |
|
|
foreground_masks = box_np_ops.points_in_rbbox(points_numpy, gt_bboxes_3d_np) |
|
|
enlarge_foreground_masks = box_np_ops.points_in_rbbox( |
|
|
points_numpy, enlarged_gt_bboxes_3d |
|
|
) |
|
|
foreground_masks = foreground_masks.max(1) |
|
|
enlarge_foreground_masks = enlarge_foreground_masks.max(1) |
|
|
valid_masks = ~np.logical_and(~foreground_masks, enlarge_foreground_masks) |
|
|
|
|
|
input_dict["points"] = points[valid_masks] |
|
|
pts_instance_mask = input_dict.get("pts_instance_mask", None) |
|
|
if pts_instance_mask is not None: |
|
|
input_dict["pts_instance_mask"] = pts_instance_mask[valid_masks] |
|
|
|
|
|
pts_semantic_mask = input_dict.get("pts_semantic_mask", None) |
|
|
if pts_semantic_mask is not None: |
|
|
input_dict["pts_semantic_mask"] = pts_semantic_mask[valid_masks] |
|
|
return input_dict |
|
|
|
|
|
def __repr__(self): |
|
|
"""str: Return a string that describes the module.""" |
|
|
repr_str = self.__class__.__name__ |
|
|
repr_str += f"(bbox_enlarge_range={self.bbox_enlarge_range.tolist()})" |
|
|
return repr_str |
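
# Usage sketch (the enlarge range is an illustrative assumption; floats are
# required, per the assertion in ``__init__``):
#
#   dict(type="BackgroundPointsFilter",
#        bbox_enlarge_range=(0.5, 2.0, 0.5))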
|
|
|
|
|
|
|
|
@PIPELINES.register_module() |
|
|
class VoxelBasedPointSampler(object): |
|
|
"""Voxel based point sampler. |
|
|
|
|
|
Apply voxel sampling to multiple sweep points. |
|
|
|
|
|
Args: |
|
|
cur_sweep_cfg (dict): Config for sampling current points. |
|
|
prev_sweep_cfg (dict): Config for sampling previous points. |
|
|
        time_dim (int): Index that indicates the time dimension


            of the input points.
|
|
""" |
|
|
|
|
|
def __init__(self, cur_sweep_cfg, prev_sweep_cfg=None, time_dim=3): |
|
|
self.cur_voxel_generator = VoxelGenerator(**cur_sweep_cfg) |
|
|
self.cur_voxel_num = self.cur_voxel_generator._max_voxels |
|
|
self.time_dim = time_dim |
|
|
if prev_sweep_cfg is not None: |
|
|
assert prev_sweep_cfg["max_num_points"] == cur_sweep_cfg["max_num_points"] |
|
|
self.prev_voxel_generator = VoxelGenerator(**prev_sweep_cfg) |
|
|
self.prev_voxel_num = self.prev_voxel_generator._max_voxels |
|
|
else: |
|
|
self.prev_voxel_generator = None |
|
|
self.prev_voxel_num = 0 |
|
|
|
|
|
def _sample_points(self, points, sampler, point_dim): |
|
|
"""Sample points for each points subset. |
|
|
|
|
|
Args: |
|
|
points (np.ndarray): Points subset to be sampled. |
|
|
sampler (VoxelGenerator): Voxel based sampler for |
|
|
each points subset. |
|
|
            point_dim (int): The dimension of each point.
|
|
|
|
|
Returns: |
|
|
np.ndarray: Sampled points. |
|
|
""" |
|
|
voxels, coors, num_points_per_voxel = sampler.generate(points) |
|
|
if voxels.shape[0] < sampler._max_voxels: |
|
|
padding_points = np.zeros( |
|
|
[ |
|
|
sampler._max_voxels - voxels.shape[0], |
|
|
sampler._max_num_points, |
|
|
point_dim, |
|
|
], |
|
|
dtype=points.dtype, |
|
|
) |
|
|
padding_points[:] = voxels[0] |
|
|
sample_points = np.concatenate([voxels, padding_points], axis=0) |
|
|
else: |
|
|
sample_points = voxels |
|
|
|
|
|
return sample_points |
|
|
|
|
|
def __call__(self, results): |
|
|
"""Call function to sample points from multiple sweeps. |
|
|
|
|
|
Args: |
|
|
            results (dict): Result dict from loading pipeline.
|
|
|
|
|
Returns: |
|
|
dict: Results after sampling, 'points', 'pts_instance_mask' \ |
|
|
and 'pts_semantic_mask' keys are updated in the result dict. |
|
|
""" |
|
|
points = results["points"] |
|
|
original_dim = points.shape[1] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
map_fields2dim = [] |
|
|
start_dim = original_dim |
|
|
points_numpy = points.tensor.numpy() |
|
|
extra_channel = [points_numpy] |
|
|
for idx, key in enumerate(results["pts_mask_fields"]): |
|
|
map_fields2dim.append((key, idx + start_dim)) |
|
|
extra_channel.append(results[key][..., None]) |
|
|
|
|
|
start_dim += len(results["pts_mask_fields"]) |
|
|
for idx, key in enumerate(results["pts_seg_fields"]): |
|
|
map_fields2dim.append((key, idx + start_dim)) |
|
|
extra_channel.append(results[key][..., None]) |
|
|
|
|
|
points_numpy = np.concatenate(extra_channel, axis=-1) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cur_points_flag = points_numpy[:, self.time_dim] == 0 |
|
|
cur_sweep_points = points_numpy[cur_points_flag] |
|
|
prev_sweeps_points = points_numpy[~cur_points_flag] |
|
|
if prev_sweeps_points.shape[0] == 0: |
|
|
prev_sweeps_points = cur_sweep_points |
|
|
|
|
|
|
|
|
np.random.shuffle(cur_sweep_points) |
|
|
np.random.shuffle(prev_sweeps_points) |
|
|
|
|
|
cur_sweep_points = self._sample_points( |
|
|
cur_sweep_points, self.cur_voxel_generator, points_numpy.shape[1] |
|
|
) |
|
|
if self.prev_voxel_generator is not None: |
|
|
prev_sweeps_points = self._sample_points( |
|
|
prev_sweeps_points, self.prev_voxel_generator, points_numpy.shape[1] |
|
|
) |
|
|
|
|
|
points_numpy = np.concatenate([cur_sweep_points, prev_sweeps_points], 0) |
|
|
else: |
|
|
points_numpy = cur_sweep_points |
|
|
|
|
|
if self.cur_voxel_generator._max_num_points == 1: |
|
|
points_numpy = points_numpy.squeeze(1) |
|
|
results["points"] = points.new_point(points_numpy[..., :original_dim]) |
|
|
|
|
|
|
|
|
for key, dim_index in map_fields2dim: |
|
|
results[key] = points_numpy[..., dim_index] |
|
|
|
|
|
return results |
|
|
|
|
|
def __repr__(self): |
|
|
"""str: Return a string that describes the module.""" |
|
|
|
|
|
def _auto_indent(repr_str, indent): |
|
|
repr_str = repr_str.split("\n") |
|
|
repr_str = [" " * indent + t + "\n" for t in repr_str] |
|
|
repr_str = "".join(repr_str)[:-1] |
|
|
return repr_str |
|
|
|
|
|
repr_str = self.__class__.__name__ |
|
|
indent = 4 |
|
|
repr_str += "(\n" |
|
|
repr_str += " " * indent + f"num_cur_sweep={self.cur_voxel_num},\n" |
|
|
repr_str += " " * indent + f"num_prev_sweep={self.prev_voxel_num},\n" |
|
|
repr_str += " " * indent + f"time_dim={self.time_dim},\n" |
|
|
repr_str += " " * indent + "cur_voxel_generator=\n" |
|
|
repr_str += f"{_auto_indent(repr(self.cur_voxel_generator), 8)},\n" |
|
|
repr_str += " " * indent + "prev_voxel_generator=\n" |
|
|
repr_str += f"{_auto_indent(repr(self.prev_voxel_generator), 8)})" |
|
|
return repr_str |
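
# Usage sketch (the VoxelGenerator kwargs and values below are illustrative
# assumptions for a multi-sweep setup):
#
#   dict(
#       type="VoxelBasedPointSampler",
#       cur_sweep_cfg=dict(
#           voxel_size=[0.1, 0.1, 0.1],
#           point_cloud_range=[-50, -50, -5, 50, 50, 3],
#           max_num_points=1,
#           max_voxels=60000),
#       time_dim=3)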
|
|
|
|
|
|
|
|
@PIPELINES.register_module() |
|
|
class Pad3D(object): |
|
|
"""Pad the image & mask. |
|
|
There are two padding modes: (1) pad to a fixed size and (2) pad to the |
|
|
minimum size that is divisible by some number. |
|
|
Added keys are "pad_shape", "pad_fixed_size", "pad_size_divisor", |
|
|
Args: |
|
|
size (tuple, optional): Fixed padding size. |
|
|
size_divisor (int, optional): The divisor of padded size. |
|
|
pad_val (float, optional): Padding value, 0 by default. |
|
|
""" |
|
|
|
|
|
def __init__(self, size=None, size_divisor=None, pad_val=0): |
|
|
self.size = size |
|
|
self.size_divisor = size_divisor |
|
|
self.pad_val = pad_val |
|
|
|
|
|
assert size is not None or size_divisor is not None |
|
|
assert size is None or size_divisor is None |
|
|
|
|
|
def _pad_img(self, results): |
|
|
"""Pad images according to ``self.size``.""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
        if self.size is not None:


            # ``results["img"]`` holds a list of multi-view images here, so
            # pad each image to the fixed size, mirroring the
            # ``size_divisor`` branch below.
            padded_img = [


                mmcv.impad(img, shape=self.size, pad_val=self.pad_val)


                for img in results["img"]


            ]
|
|
elif self.size_divisor is not None: |
|
|
padded_img = [ |
|
|
mmcv.impad_to_multiple(img, self.size_divisor, pad_val=self.pad_val) |
|
|
for img in results["img"] |
|
|
] |
|
|
results["img"] = padded_img |
|
|
results["img_shape"] = [img.shape for img in padded_img] |
|
|
results["img_fixed_size"] = self.size |
|
|
results["img_size_divisor"] = self.size_divisor |
|
|
|
|
|
def _pad_masks(self, results): |
|
|
"""Pad masks according to ``results['pad_shape']``.""" |
|
|
pad_shape = results["pad_shape"][:2] |
|
|
for key in results.get("mask_fields", []): |
|
|
results[key] = results[key].pad(pad_shape, pad_val=self.pad_val) |
|
|
|
|
|
def _pad_seg(self, results): |
|
|
"""Pad semantic segmentation map according to |
|
|
``results['pad_shape']``.""" |
|
|
for key in results.get("seg_fields", []): |
|
|
results[key] = mmcv.impad(results[key], shape=results["pad_shape"][:2]) |
|
|
|
|
|
def __call__(self, results): |
|
|
"""Call function to pad images, masks, semantic segmentation maps. |
|
|
Args: |
|
|
results (dict): Result dict from loading pipeline. |
|
|
Returns: |
|
|
dict: Updated result dict. |
|
|
""" |
|
|
self._pad_img(results) |
|
|
self._pad_masks(results) |
|
|
self._pad_seg(results) |
|
|
return results |
|
|
|
|
|
def __repr__(self): |
|
|
repr_str = self.__class__.__name__ |
|
|
repr_str += f"(size={self.size}, " |
|
|
repr_str += f"size_divisor={self.size_divisor}, " |
|
|
repr_str += f"pad_val={self.pad_val})" |
|
|
return repr_str |
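
# Usage sketch (divisor padding is the common choice for multi-view input;
# the value 32 is an illustrative assumption):
#
#   dict(type="Pad3D", size_divisor=32)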
|
|
|
|
|
|
|
|
@PIPELINES.register_module() |
|
|
class Normalize3D(object): |
|
|
"""Normalize the image. |
|
|
Added key is "img_norm_cfg". |
|
|
Args: |
|
|
mean (sequence): Mean values of 3 channels. |
|
|
std (sequence): Std values of 3 channels. |
|
|
        to_rgb (bool): Whether to convert the image from BGR to RGB.


            Defaults to True.
|
|
""" |
|
|
|
|
|
def __init__(self, mean, std, to_rgb=True): |
|
|
self.mean = np.array(mean, dtype=np.float32) |
|
|
self.std = np.array(std, dtype=np.float32) |
|
|
self.to_rgb = to_rgb |
|
|
|
|
|
def __call__(self, results): |
|
|
"""Call function to normalize images. |
|
|
Args: |
|
|
results (dict): Result dict from loading pipeline. |
|
|
Returns: |
|
|
dict: Normalized results, 'img_norm_cfg' key is added into |
|
|
result dict. |
|
|
""" |
|
|
|
|
|
results["img"] = [ |
|
|
mmcv.imnormalize(img, self.mean, self.std, self.to_rgb) |
|
|
for img in results["img"] |
|
|
] |
|
|
results["img_norm_cfg"] = dict(mean=self.mean, std=self.std, to_rgb=self.to_rgb) |
|
|
return results |
|
|
|
|
|
def __repr__(self): |
|
|
repr_str = self.__class__.__name__ |
|
|
repr_str += f"(mean={self.mean}, std={self.std}, to_rgb={self.to_rgb})" |
|
|
return repr_str |
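
# Usage sketch (the mean/std values are illustrative assumptions; use the
# statistics matching your image backbone):
#
#   img_norm_cfg = dict(
#       mean=[103.530, 116.280, 123.675], std=[1.0, 1.0, 1.0], to_rgb=False)
#   dict(type="Normalize3D", **img_norm_cfg)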
|
|
|
|
|
|
|
|
@PIPELINES.register_module() |
|
|
class RandomLRFlip(object): |
|
|
"""LRFlip the images.""" |
|
|
|
|
|
def __init__(self, **kwargs): |
|
|
pass |
|
|
|
|
|
def __call__(self, input_dict): |
|
|
for key in input_dict.get("img_fields", ["img"]): |
|
|
imgs = input_dict[key] |
|
|
flip = [] |
|
|
for idx in range(len(imgs)): |
|
|
if np.random.uniform() > 0.5: |
|
|
imgs[idx] = np.flip(imgs[idx], 1) |
|
|
flip.append(1.0) |
|
|
else: |
|
|
flip.append(0.0) |
|
|
input_dict[key] = imgs |
|
|
input_dict[key + "_" + "flip"] = np.asarray(flip).astype("float32") |
|
|
return input_dict |
|
|
|
|
|
def __repr__(self): |
|
|
"""str: Return a string that describes the module.""" |
|
|
repr_str = self.__class__.__name__ |
|
|
return repr_str |
|
|
|
|
|
|
|
|
@PIPELINES.register_module() |
|
|
class RandomFlip3DCam(RandomFlip): |
|
|
"""Flip the points & bbox. |
|
|
|
|
|
If the input dict contains the key "flip", then the flag will be used, |
|
|
otherwise it will be randomly decided by a ratio specified in the init |
|
|
method. |
|
|
|
|
|
Args: |
|
|
sync_2d (bool, optional): Whether to apply flip according to the 2D |
|
|
images. If True, it will apply the same flip as that to 2D images. |
|
|
If False, it will decide whether to flip randomly and independently |
|
|
to that of 2D images. Defaults to True. |
|
|
flip_ratio_bev_horizontal (float, optional): The flipping probability |
|
|
in horizontal direction. Defaults to 0.0. |
|
|
flip_ratio_bev_vertical (float, optional): The flipping probability |
|
|
in vertical direction. Defaults to 0.0. |
|
|
""" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
sync_2d=True, |
|
|
flip_ratio_bev_horizontal=0.0, |
|
|
flip_ratio_bev_vertical=0.0, |
|
|
**kwargs, |
|
|
): |
|
|
super(RandomFlip3DCam, self).__init__( |
|
|
flip_ratio=flip_ratio_bev_horizontal, **kwargs |
|
|
) |
|
|
self.sync_2d = sync_2d |
|
|
self.flip_ratio_bev_vertical = flip_ratio_bev_vertical |
|
|
if flip_ratio_bev_horizontal is not None: |
|
|
assert ( |
|
|
isinstance(flip_ratio_bev_horizontal, (int, float)) |
|
|
and 0 <= flip_ratio_bev_horizontal <= 1 |
|
|
) |
|
|
if flip_ratio_bev_vertical is not None: |
|
|
assert ( |
|
|
isinstance(flip_ratio_bev_vertical, (int, float)) |
|
|
and 0 <= flip_ratio_bev_vertical <= 1 |
|
|
) |
|
|
|
|
|
def random_flip_data_3d(self, input_dict, direction="horizontal"): |
|
|
"""Flip 3D data randomly. |
|
|
|
|
|
Args: |
|
|
input_dict (dict): Result dict from loading pipeline. |
|
|
direction (str): Flip direction. Default: horizontal. |
|
|
|
|
|
Returns: |
|
|
dict: Flipped results, 'points', 'bbox3d_fields' keys are \ |
|
|
updated in the result dict. |
|
|
""" |
|
|
assert direction in ["horizontal", "vertical"] |
|
|
if len(input_dict["bbox3d_fields"]) == 0: |
|
|
input_dict["bbox3d_fields"].append("empty_box3d") |
|
|
input_dict["empty_box3d"] = input_dict["box_type_3d"]( |
|
|
np.array([], dtype=np.float32) |
|
|
) |
|
|
assert len(input_dict["bbox3d_fields"]) == 1 |
|
|
for key in input_dict["bbox3d_fields"]: |
|
|
if "points" in input_dict: |
|
|
input_dict["points"] = input_dict[key].flip( |
|
|
direction, points=input_dict["points"] |
|
|
) |
|
|
else: |
|
|
input_dict[key].flip(direction) |
|
|
if "centers2d" in input_dict: |
|
|
assert ( |
|
|
self.sync_2d is True and direction == "horizontal" |
|
|
), "Only support sync_2d=True and horizontal flip with images" |
|
|
w = input_dict["img_shape"][1] |
|
|
input_dict["centers2d"][..., 0] = w - input_dict["centers2d"][..., 0] |
|
|
|
|
|
def flip_extrinsic(self, input_dict, direction="horizontal"): |
|
|
lidar2img = input_dict["lidar2img"] |
|
|
flip_mat = np.eye(4) |
|
|
if direction == "vertical": |
|
|
flip_mat[0, 0] = -1 |
|
|
elif direction == "horizontal": |
|
|
flip_mat[1, 1] = -1 |
|
|
lidar2img = lidar2img @ flip_mat |
|
|
input_dict["lidar2img"] = lidar2img |
|
|
|
|
|
def __call__(self, input_dict): |
|
|
"""Call function to flip points, values in the ``bbox3d_fields`` and \ |
|
|
also flip 2D image and its annotations. |
|
|
|
|
|
Args: |
|
|
input_dict (dict): Result dict from loading pipeline. |
|
|
|
|
|
Returns: |
|
|
dict: Flipped results, 'flip', 'flip_direction', \ |
|
|
'pcd_horizontal_flip' and 'pcd_vertical_flip' keys are added \ |
|
|
into result dict. |
|
|
""" |
|
|
|
|
|
|
|
|
|
|
|
if self.sync_2d: |
|
|
input_dict["pcd_horizontal_flip"] = input_dict["flip"] |
|
|
input_dict["pcd_vertical_flip"] = False |
|
|
else: |
|
|
if "pcd_horizontal_flip" not in input_dict: |
|
|
                flip_horizontal = bool(np.random.rand() < self.flip_ratio)
|
|
input_dict["pcd_horizontal_flip"] = flip_horizontal |
|
|
if "pcd_vertical_flip" not in input_dict: |
|
|
                flip_vertical = bool(np.random.rand() < self.flip_ratio_bev_vertical)
|
|
input_dict["pcd_vertical_flip"] = flip_vertical |
|
|
|
|
|
if "transformation_3d_flow" not in input_dict: |
|
|
input_dict["transformation_3d_flow"] = [] |
|
|
|
|
|
if input_dict["pcd_horizontal_flip"]: |
|
|
self.random_flip_data_3d(input_dict, "horizontal") |
|
|
input_dict["transformation_3d_flow"].extend(["HF"]) |
|
|
self.flip_extrinsic(input_dict, "horizontal") |
|
|
if input_dict["pcd_vertical_flip"]: |
|
|
self.random_flip_data_3d(input_dict, "vertical") |
|
|
input_dict["transformation_3d_flow"].extend(["VF"]) |
|
|
self.flip_extrinsic(input_dict, "vertical") |
|
|
return input_dict |
|
|
|
|
|
def __repr__(self): |
|
|
"""str: Return a string that describes the module.""" |
|
|
repr_str = self.__class__.__name__ |
|
|
repr_str += f"(sync_2d={self.sync_2d}," |
|
|
repr_str += f" flip_ratio_bev_vertical={self.flip_ratio_bev_vertical})" |
|
|
return repr_str |
|
|
|
|
|
|
|
|
@PIPELINES.register_module() |
|
|
class GlobalRotScaleTransCam(object): |
|
|
"""Apply global rotation, scaling and translation to a 3D scene. |
|
|
|
|
|
Args: |
|
|
rot_range (list[float]): Range of rotation angle. |
|
|
Defaults to [-0.78539816, 0.78539816] (close to [-pi/4, pi/4]). |
|
|
scale_ratio_range (list[float]): Range of scale ratio. |
|
|
Defaults to [0.95, 1.05]. |
|
|
        translation_std (list[float]): The standard deviation of translation


            noise. This applies a random translation to the scene, sampled


            from a Gaussian distribution whose standard deviation is set by


            ``translation_std``. Defaults to [0, 0, 0].


        shift_height (bool): Whether to shift height


            (the fourth dimension of indoor points) when scaling.
|
|
Defaults to False. |
|
|
""" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
rot_range=[-0.78539816, 0.78539816], |
|
|
scale_ratio_range=[1.0, 1.0], |
|
|
translation_std=[0, 0, 0], |
|
|
shift_height=False, |
|
|
): |
|
|
self.rot_range = rot_range |
|
|
self.scale_ratio_range = scale_ratio_range |
|
|
self.translation_std = translation_std |
|
|
self.shift_height = shift_height |
|
|
|
|
|
def _trans_bbox_points(self, input_dict): |
|
|
"""Private function to translate bounding boxes and points. |
|
|
|
|
|
Args: |
|
|
input_dict (dict): Result dict from loading pipeline. |
|
|
|
|
|
Returns: |
|
|
dict: Results after translation, 'points', 'pcd_trans' \ |
|
|
and keys in input_dict['bbox3d_fields'] are updated \ |
|
|
in the result dict. |
|
|
""" |
|
|
if not isinstance(self.translation_std, (list, tuple, np.ndarray)): |
|
|
translation_std = [ |
|
|
self.translation_std, |
|
|
self.translation_std, |
|
|
self.translation_std, |
|
|
] |
|
|
else: |
|
|
translation_std = self.translation_std |
|
|
translation_std = np.array(translation_std, dtype=np.float32) |
|
|
trans_factor = np.random.normal(scale=translation_std, size=3).T |
|
|
|
|
|
input_dict["points"].translate(trans_factor) |
|
|
input_dict["pcd_trans"] = trans_factor |
|
|
for key in input_dict["bbox3d_fields"]: |
|
|
input_dict[key].translate(trans_factor) |
|
|
|
|
|
def _rot_bbox_points(self, input_dict): |
|
|
"""Private function to rotate bounding boxes and points. |
|
|
|
|
|
Args: |
|
|
input_dict (dict): Result dict from loading pipeline. |
|
|
|
|
|
Returns: |
|
|
dict: Results after rotation, 'points', 'pcd_rotation' \ |
|
|
and keys in input_dict['bbox3d_fields'] are updated \ |
|
|
in the result dict. |
|
|
""" |
|
|
rotation = self.rot_range |
|
|
if not isinstance(rotation, list): |
|
|
rotation = [-rotation, rotation] |
|
|
noise_rotation = np.random.uniform(rotation[0], rotation[1]) |
|
|
|
|
|
for key in input_dict["bbox3d_fields"]: |
|
|
if len(input_dict[key].tensor) != 0: |
|
|
points, rot_mat_T = input_dict[key].rotate( |
|
|
noise_rotation, input_dict["points"] |
|
|
) |
|
|
input_dict["points"] = points |
|
|
input_dict["pcd_rotation"] = rot_mat_T |
|
|
|
|
|
|
|
|
def _scale_bbox_points(self, input_dict): |
|
|
"""Private function to scale bounding boxes and points. |
|
|
|
|
|
Args: |
|
|
input_dict (dict): Result dict from loading pipeline. |
|
|
|
|
|
Returns: |
|
|
            dict: Results after scaling, 'points' and keys in \
|
|
input_dict['bbox3d_fields'] are updated in the result dict. |
|
|
""" |
|
|
scale = input_dict["pcd_scale_factor"] |
|
|
points = input_dict["points"] |
|
|
points.scale(scale) |
|
|
if self.shift_height: |
|
|
assert "height" in points.attribute_dims.keys() |
|
|
points.tensor[:, points.attribute_dims["height"]] *= scale |
|
|
input_dict["points"] = points |
|
|
|
|
|
for key in input_dict["bbox3d_fields"]: |
|
|
input_dict[key].scale(scale) |
|
|
|
|
|
def _random_scale(self, input_dict): |
|
|
"""Private function to randomly set the scale factor. |
|
|
|
|
|
Args: |
|
|
input_dict (dict): Result dict from loading pipeline. |
|
|
|
|
|
Returns: |
|
|
            dict: Results after scaling, 'pcd_scale_factor' is updated \
|
|
in the result dict. |
|
|
""" |
|
|
scale_factor = np.random.uniform( |
|
|
self.scale_ratio_range[0], self.scale_ratio_range[1] |
|
|
) |
|
|
input_dict["pcd_scale_factor"] = scale_factor |
|
|
|
|
|
def __call__(self, input_dict): |
|
|
"""Private function to rotate, scale and translate bounding boxes and \ |
|
|
points. |
|
|
|
|
|
Args: |
|
|
input_dict (dict): Result dict from loading pipeline. |
|
|
|
|
|
Returns: |
|
|
dict: Results after scaling, 'points', 'pcd_rotation', |
|
|
'pcd_scale_factor', 'pcd_trans' and keys in \ |
|
|
input_dict['bbox3d_fields'] are updated in the result dict. |
|
|
""" |
|
|
if "transformation_3d_flow" not in input_dict: |
|
|
input_dict["transformation_3d_flow"] = [] |
|
|
|
|
|
self._rot_bbox_points(input_dict) |
|
|
|
|
|
if "pcd_scale_factor" not in input_dict: |
|
|
self._random_scale(input_dict) |
|
|
self._scale_bbox_points(input_dict) |
|
|
|
|
|
self._trans_bbox_points(input_dict) |
|
|
|
|
|
input_dict["transformation_3d_flow"].extend(["R", "S", "T"]) |
|
|
return input_dict |
|
|
|
|
|
def __repr__(self): |
|
|
"""str: Return a string that describes the module.""" |
|
|
repr_str = self.__class__.__name__ |
|
|
repr_str += f"(rot_range={self.rot_range}," |
|
|
repr_str += f" scale_ratio_range={self.scale_ratio_range}," |
|
|
repr_str += f" translation_std={self.translation_std}," |
|
|
repr_str += f" shift_height={self.shift_height})" |
|
|
return repr_str |
|
|
|
|
|
|
|
|
@PIPELINES.register_module() |
|
|
class Clip3D(object): |
|
|
"""Clip the image |
|
|
There are two padding modes: (1) pad to a fixed size and (2) pad to the |
|
|
minimum size that is divisible by some number. |
|
|
Added keys are "pad_shape", "pad_fixed_size", "pad_size_divisor", |
|
|
Args: |
|
|
size (tuple, optional): Fixed padding size. |
|
|
size_divisor (int, optional): The divisor of padded size. |
|
|
pad_val (float, optional): Padding value, 0 by default. |
|
|
""" |
|
|
|
|
|
def __init__(self, size=None): |
|
|
self.size = size |
|
|
|
|
|
def __call__(self, results): |
|
|
"""Call function to pad images, masks, semantic segmentation maps. |
|
|
Args: |
|
|
results (dict): Result dict from loading pipeline. |
|
|
Returns: |
|
|
dict: Updated result dict. |
|
|
""" |
|
|
for key in results.get("img_fields", ["img"]): |
|
|
result_img = [ |
|
|
img[: self.size[0], : self.size[1], ...] for img in results[key] |
|
|
] |
|
|
results[key] = result_img |
|
|
results["img_shape"] = [img.shape for img in result_img] |
|
|
results["img_fixed_size"] = self.size |
|
|
return results |
|
|
|
|
|
def __repr__(self): |
|
|
repr_str = self.__class__.__name__ |
|
|
repr_str += f"(size={self.size}, " |
|
|
return repr_str |
|
|
|
|
|
|
|
|
@PIPELINES.register_module() |
|
|
class RandomScaleImage3D(object): |
|
|
"""Random scale the image |
|
|
There are two padding modes: (1) pad to a fixed size and (2) pad to the |
|
|
minimum size that is divisible by some number. |
|
|
Added keys are "pad_shape", "pad_fixed_size", "pad_size_divisor", |
|
|
Args: |
|
|
size (tuple, optional): Fixed padding size. |
|
|
size_divisor (int, optional): The divisor of padded size. |
|
|
pad_val (float, optional): Padding value, 0 by default. |
|
|
""" |
|
|
|
|
|
def __init__(self, scales=(0.85, 1.15)): |
|
|
self.scales = scales |
|
|
|
|
|
def __call__(self, results): |
|
|
"""Call function to pad images, masks, semantic segmentation maps. |
|
|
Args: |
|
|
results (dict): Result dict from loading pipeline. |
|
|
Returns: |
|
|
dict: Updated result dict. |
|
|
""" |
|
|
rand_scale = np.random.uniform(low=self.scales[0], high=self.scales[1]) |
|
|
img_shape = results["img_shape"][0] |
|
|
y_size = int((img_shape[0] * rand_scale) // 32) * 32 |
|
|
x_size = int((img_shape[1] * rand_scale) // 32) * 32 |
|
|
y_scale = y_size * 1.0 / img_shape[0] |
|
|
x_scale = x_size * 1.0 / img_shape[1] |
|
|
scale_factor = np.eye(4) |
|
|
scale_factor[0, 0] *= x_scale |
|
|
scale_factor[1, 1] *= y_scale |
|
|
for key in results.get("img_fields", ["img"]): |
|
|
result_img = [ |
|
|
mmcv.imresize(img, (x_size, y_size), return_scale=False) |
|
|
for img in results[key] |
|
|
] |
|
|
results[key] = result_img |
|
|
lidar2img = [scale_factor @ l2i for l2i in results["lidar2img"]] |
|
|
results["lidar2img"] = lidar2img |
|
|
|
|
|
results["img_shape"] = [img.shape for img in result_img] |
|
|
return results |
|
|
|
|
|
def __repr__(self): |
|
|
repr_str = self.__class__.__name__ |
|
|
repr_str += f"(size={self.size}, " |
|
|
return repr_str |
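
# Usage sketch (the scale range is an illustrative assumption):
#
#   dict(type="RandomScaleImage3D", scales=(0.9, 1.1))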
|
|
|