# Uploaded by unknownuser6666.
# Upload folder using huggingface_hub.
# Commit 663494c (verified).
import mmcv
import numpy as np
from mmcv import is_tuple_of
from mmcv.utils import build_from_cfg
from mmdet3d.core import VoxelGenerator
from mmdet3d.core.bbox import box_np_ops
from mmdet3d.datasets.builder import OBJECTSAMPLERS
from mmdet3d.datasets.pipelines.data_augment_utils import noise_per_object_v3_
from mmdet.datasets.builder import PIPELINES
from mmdet.datasets.pipelines import RandomFlip
@PIPELINES.register_module()
class RandomFlip3D(RandomFlip):
    """Flip the point cloud and 3D boxes, optionally following the 2D flip.

    With ``sync_2d=True`` the point cloud copies the ``flip`` flag found in
    the result dict after the parent transform has run and is never flipped
    vertically. With ``sync_2d=False`` any ``pcd_horizontal_flip`` /
    ``pcd_vertical_flip`` flag already present in the dict is reused and a
    missing one is drawn at random from the configured ratios.

    Args:
        sync_2d (bool, optional): Whether the 3D flip follows the 2D image
            flip. Defaults to True.
        flip_ratio_bev_horizontal (float, optional): Probability of a
            horizontal flip; also forwarded to the parent class as
            ``flip_ratio``. Defaults to 0.0.
        flip_ratio_bev_vertical (float, optional): Probability of a
            vertical flip. Defaults to 0.0.
    """

    def __init__(
        self,
        sync_2d=True,
        flip_ratio_bev_horizontal=0.0,
        flip_ratio_bev_vertical=0.0,
        **kwargs,
    ):
        super().__init__(flip_ratio=flip_ratio_bev_horizontal, **kwargs)
        self.sync_2d = sync_2d
        self.flip_ratio_bev_vertical = flip_ratio_bev_vertical
        # Each ratio may be None; otherwise it must be a number in [0, 1].
        for ratio in (flip_ratio_bev_horizontal, flip_ratio_bev_vertical):
            if ratio is None:
                continue
            assert isinstance(ratio, (int, float)) and 0 <= ratio <= 1

    def random_flip_data_3d(self, input_dict, direction="horizontal"):
        """Flip the 3D data of ``input_dict`` in place.

        Args:
            input_dict (dict): Result dict from loading pipeline.
            direction (str): Flip direction, ``"horizontal"`` or
                ``"vertical"``. Default: horizontal.

        Returns:
            None: 'points', the single entry of 'bbox3d_fields' and, when
                present, 'centers2d' are updated inside ``input_dict``.
        """
        assert direction in ["horizontal", "vertical"]
        box_fields = input_dict["bbox3d_fields"]
        if len(box_fields) == 0:  # test mode
            # No boxes registered: add an empty placeholder so the flip of
            # the points can still go through the box API.
            box_fields.append("empty_box3d")
            placeholder = np.array([], dtype=np.float32)
            input_dict["empty_box3d"] = input_dict["box_type_3d"](placeholder)
        assert len(box_fields) == 1

        for field in box_fields:
            boxes = input_dict[field]
            if "points" in input_dict:
                input_dict["points"] = boxes.flip(
                    direction, points=input_dict["points"]
                )
            else:
                boxes.flip(direction)

        if "centers2d" in input_dict:
            assert (
                self.sync_2d is True and direction == "horizontal"
            ), "Only support sync_2d=True and horizontal flip with images"
            img_width = input_dict["img_shape"][1]
            centers = input_dict["centers2d"]
            # Mirror the x coordinate about the image width.
            centers[..., 0] = img_width - centers[..., 0]

    def __call__(self, input_dict):
        """Flip the 2D data via the parent class, then the 3D data.

        Args:
            input_dict (dict): Result dict from loading pipeline.

        Returns:
            dict: The same dict with 'pcd_horizontal_flip',
                'pcd_vertical_flip' and 'transformation_3d_flow' set, and
                the 3D data flipped according to those flags.
        """
        # flip 2D image and its annotations
        super().__call__(input_dict)

        if self.sync_2d:
            input_dict["pcd_horizontal_flip"] = input_dict["flip"]
            input_dict["pcd_vertical_flip"] = False
        else:
            if "pcd_horizontal_flip" not in input_dict:
                input_dict["pcd_horizontal_flip"] = bool(
                    np.random.rand() < self.flip_ratio
                )
            if "pcd_vertical_flip" not in input_dict:
                input_dict["pcd_vertical_flip"] = bool(
                    np.random.rand() < self.flip_ratio_bev_vertical
                )

        input_dict.setdefault("transformation_3d_flow", [])

        flip_plan = (
            ("pcd_horizontal_flip", "horizontal", "HF"),
            ("pcd_vertical_flip", "vertical", "VF"),
        )
        for flag_key, direction, tag in flip_plan:
            if input_dict[flag_key]:
                self.random_flip_data_3d(input_dict, direction)
                input_dict["transformation_3d_flow"].extend([tag])
        return input_dict

    def __repr__(self):
        """str: Return a string that describes the module."""
        return (
            f"{self.__class__.__name__}(sync_2d={self.sync_2d},"
            f" flip_ratio_bev_vertical={self.flip_ratio_bev_vertical})"
        )
@PIPELINES.register_module()
class ObjectSample(object):
    """Sample GT objects from a database and paste them into the scene.

    Args:
        db_sampler (dict): Config dict of the database sampler. If it has no
            ``type`` key, ``"DataBaseSampler"`` is written into it.
        sample_2d (bool): Whether to also paste 2D image patches into the
            image. This should be true when applying multi-modality
            cut-and-paste. Defaults to False.
    """

    def __init__(self, db_sampler, sample_2d=False):
        self.sampler_cfg = db_sampler
        self.sample_2d = sample_2d
        if "type" not in db_sampler.keys():
            db_sampler["type"] = "DataBaseSampler"
        self.db_sampler = build_from_cfg(db_sampler, OBJECTSAMPLERS)

    @staticmethod
    def remove_points_in_boxes(points, boxes):
        """Remove the points that fall inside the sampled bounding boxes.

        Args:
            points (:obj:`BasePoints`): Input point cloud.
            boxes (np.ndarray): Sampled ground truth boxes.

        Returns:
            :obj:`BasePoints`: Points with those inside any box removed.
        """
        masks = box_np_ops.points_in_rbbox(points.coord.numpy(), boxes)
        # A point is dropped as soon as it lies in at least one box.
        points = points[np.logical_not(masks.any(-1))]
        return points

    def __call__(self, input_dict):
        """Call function to sample ground truth objects to the data.

        Args:
            input_dict (dict): Result dict from loading pipeline.

        Returns:
            dict: Results after object sampling augmentation, \
                'points', 'gt_bboxes_3d', 'gt_labels_3d' keys are updated \
                in the result dict ('gt_bboxes' and 'img' as well when \
                ``sample_2d`` is set and objects were sampled).
        """
        gt_bboxes_3d = input_dict["gt_bboxes_3d"]
        gt_labels_3d = input_dict["gt_labels_3d"]
        points = input_dict["points"]

        if self.sample_2d:
            img = input_dict["img"]
            gt_bboxes_2d = input_dict["gt_bboxes"]
            # Assume for now 3D & 2D bboxes are the same
            sampled_dict = self.db_sampler.sample_all(
                gt_bboxes_3d.tensor.numpy(),
                gt_labels_3d,
                gt_bboxes_2d=gt_bboxes_2d,
                img=img,
            )
        else:
            sampled_dict = self.db_sampler.sample_all(
                gt_bboxes_3d.tensor.numpy(), gt_labels_3d, img=None
            )

        if sampled_dict is not None:
            sampled_gt_bboxes_3d = sampled_dict["gt_bboxes_3d"]
            sampled_points = sampled_dict["points"]
            sampled_gt_labels = sampled_dict["gt_labels_3d"]

            gt_labels_3d = np.concatenate(
                [gt_labels_3d, sampled_gt_labels], axis=0
            )
            gt_bboxes_3d = gt_bboxes_3d.new_box(
                np.concatenate(
                    [gt_bboxes_3d.tensor.numpy(), sampled_gt_bboxes_3d]
                )
            )

            # Clear out scene points occupying the pasted boxes before
            # adding the sampled object points.
            points = self.remove_points_in_boxes(points, sampled_gt_bboxes_3d)
            points = points.cat([sampled_points, points])

            if self.sample_2d:
                sampled_gt_bboxes_2d = sampled_dict["gt_bboxes_2d"]
                gt_bboxes_2d = np.concatenate(
                    [gt_bboxes_2d, sampled_gt_bboxes_2d]
                ).astype(np.float32)
                input_dict["gt_bboxes"] = gt_bboxes_2d
                input_dict["img"] = sampled_dict["img"]

        input_dict["gt_bboxes_3d"] = gt_bboxes_3d
        # ``np.long`` was removed in NumPy 1.24; use an explicit 64-bit
        # integer dtype instead.
        input_dict["gt_labels_3d"] = gt_labels_3d.astype(np.int64)
        input_dict["points"] = points
        return input_dict

    def __repr__(self):
        """str: Return a string that describes the module."""
        # NOTE(review): attribute access on ``sampler_cfg`` presumably relies
        # on it being an attribute-style config rather than a plain dict —
        # confirm against callers.
        repr_str = self.__class__.__name__
        repr_str += f" sample_2d={self.sample_2d},"
        repr_str += f" data_root={self.sampler_cfg.data_root},"
        repr_str += f" info_path={self.sampler_cfg.info_path},"
        repr_str += f" rate={self.sampler_cfg.rate},"
        repr_str += f" prepare={self.sampler_cfg.prepare},"
        repr_str += f" classes={self.sampler_cfg.classes},"
        repr_str += f" sample_groups={self.sampler_cfg.sample_groups}"
        return repr_str
@PIPELINES.register_module()
class ObjectNoise(object):
    """Perturb every GT object in the scene with random noise.

    Args:
        translation_std (list[float], optional): Standard deviation of the
            distribution the translation noise is sampled from.
            Defaults to [0.25, 0.25, 0.25].
        global_rot_range (list[float], optional): Global rotation to the
            scene. Defaults to [0.0, 0.0].
        rot_range (list[float], optional): Object rotation range.
            Defaults to [-0.15707963267, 0.15707963267].
        num_try (int, optional): Number of times to try if the noise applied
            is invalid. Defaults to 100.
    """

    def __init__(
        self,
        translation_std=[0.25, 0.25, 0.25],
        global_rot_range=[0.0, 0.0],
        rot_range=[-0.15707963267, 0.15707963267],
        num_try=100,
    ):
        self.num_try = num_try
        self.rot_range = rot_range
        self.global_rot_range = global_rot_range
        self.translation_std = translation_std

    def __call__(self, input_dict):
        """Apply per-object noise to the boxes and the points.

        Args:
            input_dict (dict): Result dict from loading pipeline.

        Returns:
            dict: The same dict with 'points' and 'gt_bboxes_3d' rebuilt
                from the arrays handed to ``noise_per_object_v3_``.
        """
        boxes = input_dict["gt_bboxes_3d"]
        cloud = input_dict["points"]

        # TODO: check this inplace function
        box_array = boxes.tensor.numpy()
        point_array = cloud.tensor.numpy()
        noise_per_object_v3_(
            box_array,
            point_array,
            rotation_perturb=self.rot_range,
            center_noise_std=self.translation_std,
            global_random_rot_range=self.global_rot_range,
            num_try=self.num_try,
        )

        input_dict["gt_bboxes_3d"] = boxes.new_box(box_array)
        input_dict["points"] = cloud.new_point(point_array)
        return input_dict

    def __repr__(self):
        """str: Return a string that describes the module."""
        return (
            f"{self.__class__.__name__}(num_try={self.num_try},"
            f" translation_std={self.translation_std},"
            f" global_rot_range={self.global_rot_range},"
            f" rot_range={self.rot_range})"
        )
@PIPELINES.register_module()
class GlobalRotScaleTrans(object):
    """Apply global rotation, scaling and translation to a 3D scene.

    Args:
        rot_range (list[float] | float): Range of rotation angle. A scalar
            ``r`` is treated as ``[-r, r]``.
            Defaults to [-0.78539816, 0.78539816] (close to [-pi/4, pi/4]).
        scale_ratio_range (list[float]): Range of scale ratio.
            Defaults to [0.95, 1.05].
        translation_std (list[float] | float): The standard deviation of
            translation noise. This applies a random translation to a scene
            by a noise sampled from a gaussian distribution whose standard
            deviation is set by ``translation_std``. A scalar is used for
            all three axes. Defaults to [0, 0, 0].
        shift_height (bool): Whether to also scale the ``height`` attribute
            of the points when scaling. Defaults to False.
    """

    def __init__(
        self,
        rot_range=[-0.78539816, 0.78539816],
        scale_ratio_range=[0.95, 1.05],
        translation_std=[0, 0, 0],
        shift_height=False,
    ):
        self.rot_range = rot_range
        self.scale_ratio_range = scale_ratio_range
        self.translation_std = translation_std
        self.shift_height = shift_height

    def _trans_bbox_points(self, input_dict):
        """Private function to translate bounding boxes and points.

        Args:
            input_dict (dict): Result dict from loading pipeline.

        Returns:
            None: 'points', 'pcd_trans' and the keys listed in
                input_dict['bbox3d_fields'] are updated in place.
        """
        if not isinstance(self.translation_std, (list, tuple, np.ndarray)):
            translation_std = [self.translation_std] * 3
        else:
            translation_std = self.translation_std
        translation_std = np.array(translation_std, dtype=np.float32)
        trans_factor = np.random.normal(scale=translation_std, size=3).T

        input_dict["points"].translate(trans_factor)
        input_dict["pcd_trans"] = trans_factor
        for key in input_dict["bbox3d_fields"]:
            input_dict[key].translate(trans_factor)

    def _rot_bbox_points(self, input_dict):
        """Private function to rotate bounding boxes and points.

        Args:
            input_dict (dict): Result dict from loading pipeline.

        Returns:
            None: for every non-empty box field, 'points', 'pcd_rotation',
                'lidar2img' and the box field itself are updated in place.
        """
        rotation = self.rot_range
        # Accept the same sequence types as ``_trans_bbox_points``; only a
        # scalar is expanded into a symmetric range.
        if not isinstance(rotation, (list, tuple, np.ndarray)):
            rotation = [-rotation, rotation]
        noise_rotation = np.random.uniform(rotation[0], rotation[1])

        for key in input_dict["bbox3d_fields"]:
            # Empty box fields are skipped entirely, so with no boxes the
            # points are left unrotated.
            if len(input_dict[key].tensor) != 0:
                points, rot_mat_T = input_dict[key].rotate(
                    noise_rotation, input_dict["points"]
                )
                input_dict["points"] = points
                input_dict["pcd_rotation"] = rot_mat_T

                # Embed the 3x3 rotation in a 4x4 homogeneous matrix and
                # fold it into the lidar-to-image projection.
                rot_mat_T_np = np.eye(4)
                rot_mat_T_np[:3, :3] = rot_mat_T.numpy()
                input_dict["lidar2img"] = input_dict["lidar2img"] @ rot_mat_T_np
                # input_dict['points_instance'].rotate(noise_rotation)

    def _scale_bbox_points(self, input_dict):
        """Private function to scale bounding boxes and points.

        Args:
            input_dict (dict): Result dict from loading pipeline. Must
                already contain 'pcd_scale_factor'.

        Returns:
            None: 'points', 'lidar2img' and the keys listed in
                input_dict['bbox3d_fields'] are updated in place.
        """
        scale = input_dict["pcd_scale_factor"]
        points = input_dict["points"]
        points.scale(scale)
        if self.shift_height:
            assert "height" in points.attribute_dims.keys()
            points.tensor[:, points.attribute_dims["height"]] *= scale
        input_dict["points"] = points

        for key in input_dict["bbox3d_fields"]:
            input_dict[key].scale(scale)

        scale_eye = np.eye(4)
        scale_eye[0, 0] *= scale
        scale_eye[1, 1] *= scale

        # NOTE(review): 'lidar2img' is rebuilt here from 'intrinsic' and
        # 'extrinsic', replacing the value written by ``_rot_bbox_points``
        # earlier in ``__call__`` — confirm this is intended.
        # NOTE(review): ``intrinsic * scale_eye`` is an element-wise product
        # if ``intrinsic`` is an ndarray — confirm a matrix product was not
        # meant.
        lidar2imgs = []
        for idx in range(len(input_dict["intrinsic"])):
            intrinsic = input_dict["intrinsic"][idx]
            extrinsic = input_dict["extrinsic"][idx]
            lidar2img = (intrinsic * scale_eye) @ extrinsic
            lidar2imgs.append(lidar2img)
        input_dict["lidar2img"] = lidar2imgs

    def _random_scale(self, input_dict):
        """Private function to randomly set the scale factor.

        Args:
            input_dict (dict): Result dict from loading pipeline.

        Returns:
            None: 'pcd_scale_factor' is written into the result dict.
        """
        scale_factor = np.random.uniform(
            self.scale_ratio_range[0], self.scale_ratio_range[1]
        )
        input_dict["pcd_scale_factor"] = scale_factor

    def __call__(self, input_dict):
        """Call function to rotate, scale and translate bounding boxes and \
        points.

        Args:
            input_dict (dict): Result dict from loading pipeline.

        Returns:
            dict: Results after transformation, 'points', 'pcd_rotation',
                'pcd_scale_factor', 'pcd_trans', 'lidar2img' and keys in \
                input_dict['bbox3d_fields'] are updated in the result dict.
        """
        if "transformation_3d_flow" not in input_dict:
            input_dict["transformation_3d_flow"] = []

        self._rot_bbox_points(input_dict)

        # A scale factor supplied upstream is reused instead of redrawn.
        if "pcd_scale_factor" not in input_dict:
            self._random_scale(input_dict)
        self._scale_bbox_points(input_dict)

        self._trans_bbox_points(input_dict)

        input_dict["transformation_3d_flow"].extend(["R", "S", "T"])
        return input_dict

    def __repr__(self):
        """str: Return a string that describes the module."""
        repr_str = self.__class__.__name__
        repr_str += f"(rot_range={self.rot_range},"
        repr_str += f" scale_ratio_range={self.scale_ratio_range},"
        repr_str += f" translation_std={self.translation_std},"
        repr_str += f" shift_height={self.shift_height})"
        return repr_str
@PIPELINES.register_module()
class PointShuffle(object):
    """Shuffle the input points and keep per-point masks aligned."""

    def __call__(self, input_dict):
        """Shuffle 'points' and reorder the point masks the same way.

        Args:
            input_dict (dict): Result dict from loading pipeline.

        Returns:
            dict: The same dict; 'pts_instance_mask' and
                'pts_semantic_mask' are re-indexed when present.
        """
        # The value returned by ``shuffle()`` is used as the index array
        # for the masks.
        order = input_dict["points"].shuffle().numpy()

        for mask_key in ("pts_instance_mask", "pts_semantic_mask"):
            mask = input_dict.get(mask_key, None)
            if mask is not None:
                input_dict[mask_key] = mask[order]
        return input_dict

    def __repr__(self):
        return self.__class__.__name__
@PIPELINES.register_module()
class ObjectRangeFilter(object):
    """Filter objects by the range.

    Args:
        point_cloud_range (list[float]): Point cloud range. Elements
            0, 1, 3 and 4 are used as the bird's-eye-view range.
    """

    def __init__(self, point_cloud_range):
        self.pcd_range = np.array(point_cloud_range, dtype=np.float32)
        self.bev_range = self.pcd_range[[0, 1, 3, 4]]

    def __call__(self, input_dict):
        """Call function to filter objects by the range.

        Args:
            input_dict (dict): Result dict from loading pipeline.

        Returns:
            dict: Results after filtering, 'gt_bboxes_3d', 'gt_labels_3d' \
                keys are updated in the result dict.
        """
        gt_bboxes_3d = input_dict["gt_bboxes_3d"]
        gt_labels_3d = input_dict["gt_labels_3d"]

        mask = gt_bboxes_3d.in_range_bev(self.bev_range)
        gt_bboxes_3d = gt_bboxes_3d[mask]
        # mask is a torch tensor but gt_labels_3d is still numpy array
        # using mask to index gt_labels_3d will cause bug when
        # len(gt_labels_3d) == 1, where mask=1 will be interpreted
        # as gt_labels_3d[1] and cause out of index error.
        # ``np.bool`` was removed in NumPy 1.24, so cast with ``np.bool_``.
        gt_labels_3d = gt_labels_3d[mask.numpy().astype(np.bool_)]

        # limit rad to [-pi, pi]
        gt_bboxes_3d.limit_yaw(offset=0.5, period=2 * np.pi)

        input_dict["gt_bboxes_3d"] = gt_bboxes_3d
        input_dict["gt_labels_3d"] = gt_labels_3d
        return input_dict

    def __repr__(self):
        """str: Return a string that describes the module."""
        repr_str = self.__class__.__name__
        repr_str += f"(point_cloud_range={self.pcd_range.tolist()})"
        return repr_str
@PIPELINES.register_module()
class PointsRangeFilter(object):
    """Keep only the points that lie inside a 3D range.

    Args:
        point_cloud_range (list[float]): Point cloud range.
    """

    def __init__(self, point_cloud_range):
        self.pcd_range = np.array(point_cloud_range, dtype=np.float32)

    def __call__(self, input_dict):
        """Drop out-of-range points and the matching mask entries.

        Args:
            input_dict (dict): Result dict from loading pipeline.

        Returns:
            dict: The same dict; 'points' is filtered, and
                'pts_instance_mask' / 'pts_semantic_mask' are filtered with
                the same mask when present.
        """
        cloud = input_dict["points"]
        in_range = cloud.in_range_3d(self.pcd_range)
        input_dict["points"] = cloud[in_range]

        # The masks are indexed with the numpy version of the point mask.
        in_range_np = in_range.numpy()
        for mask_key in ("pts_instance_mask", "pts_semantic_mask"):
            mask = input_dict.get(mask_key, None)
            if mask is not None:
                input_dict[mask_key] = mask[in_range_np]
        return input_dict

    def __repr__(self):
        """str: Return a string that describes the module."""
        return (
            f"{self.__class__.__name__}"
            f"(point_cloud_range={self.pcd_range.tolist()})"
        )
@PIPELINES.register_module()
class ObjectNameFilter(object):
    """Keep only the GT objects whose label belongs to the given classes.

    Args:
        classes (list[str]): List of class names to be kept for training.
            The kept label ids are ``0 .. len(classes) - 1``.
    """

    def __init__(self, classes):
        self.classes = classes
        self.labels = list(range(len(self.classes)))

    def __call__(self, input_dict):
        """Filter 'gt_bboxes_3d' and 'gt_labels_3d' by label id.

        Args:
            input_dict (dict): Result dict from loading pipeline.

        Returns:
            dict: The same dict with both GT entries restricted to the
                objects whose label is in ``self.labels``.
        """
        labels = input_dict["gt_labels_3d"]
        keep = np.array(
            [label in self.labels for label in labels], dtype=np.bool_
        )
        for gt_key in ("gt_bboxes_3d", "gt_labels_3d"):
            input_dict[gt_key] = input_dict[gt_key][keep]
        return input_dict

    def __repr__(self):
        """str: Return a string that describes the module."""
        return f"{self.__class__.__name__}(classes={self.classes})"
@PIPELINES.register_module()
class IndoorPointSample(object):
    """Randomly sample an indoor point cloud to a fixed number of points.

    Args:
        num_points (int): Number of points to be sampled.
    """

    def __init__(self, num_points):
        self.num_points = num_points

    def points_random_sampling(
        self, points, num_samples, replace=None, return_choices=False
    ):
        """Draw ``num_samples`` random points.

        Args:
            points (np.ndarray | :obj:`BasePoints`): 3D Points.
            num_samples (int): Number of samples to be sampled.
            replace (bool): Whether to sample with replacement. When None,
                replacement is used only if there are fewer points than
                ``num_samples``. Defaults to None.
            return_choices (bool): Whether to also return the drawn
                indices. Defaults to False.

        Returns:
            tuple[np.ndarray] | np.ndarray:

                - points (np.ndarray | :obj:`BasePoints`): 3D Points.
                - choices (np.ndarray, optional): The generated random samples.
        """
        total = points.shape[0]
        if replace is None:
            replace = total < num_samples
        picked = np.random.choice(total, num_samples, replace=replace)
        sampled = points[picked]
        return (sampled, picked) if return_choices else sampled

    def __call__(self, results):
        """Sample the points and keep the per-point masks aligned.

        Args:
            results (dict): Result dict from loading pipeline.

        Returns:
            dict: The same dict; 'points' is sampled, and
                'pts_instance_mask' / 'pts_semantic_mask' are indexed with
                the same choices when present.
        """
        sampled, picked = self.points_random_sampling(
            results["points"], self.num_points, return_choices=True
        )
        results["points"] = sampled

        for mask_key in ("pts_instance_mask", "pts_semantic_mask"):
            mask = results.get(mask_key, None)
            if mask is not None:
                results[mask_key] = mask[picked]
        return results

    def __repr__(self):
        """str: Return a string that describes the module."""
        return f"{self.__class__.__name__}(num_points={self.num_points})"
@PIPELINES.register_module()
class IndoorPatchPointSample(object):
    r"""Indoor point sample within a patch. Modified from `PointNet++ <https://
    github.com/charlesq34/pointnet2/blob/master/scannet/scannet_dataset.py>`_.

    Sampling data to a certain number for semantic segmentation.

    Args:
        num_points (int): Number of points to be sampled.
        block_size (float, optional): Size of a block to sample points from.
            Defaults to 1.5.
        sample_rate (float, optional): Stride used in sliding patch
            generation. Stored but not read by this class.
            Defaults to 1.0.
        ignore_index (int, optional): Label index that won't be used for the
            segmentation task. This is set in PointSegClassMapping as neg_cls.
            Defaults to None.
        use_normalized_coord (bool, optional): Whether to use normalized xyz as
            additional features. Defaults to False.
        num_try (int, optional): Number of times to try if the patch selected
            is invalid. Defaults to 10.
    """

    def __init__(
        self,
        num_points,
        block_size=1.5,
        sample_rate=1.0,
        ignore_index=None,
        use_normalized_coord=False,
        num_try=10,
    ):
        self.num_points = num_points
        self.block_size = block_size
        self.sample_rate = sample_rate
        self.ignore_index = ignore_index
        self.use_normalized_coord = use_normalized_coord
        self.num_try = num_try

    def _input_generation(
        self, coords, patch_center, coord_max, attributes, attribute_dims, point_type
    ):
        """Generating model input.

        Generate input by subtracting patch center and adding additional \
            features. Currently support colors and normalized xyz as features.

        Args:
            coords (np.ndarray): Sampled 3D Points.
            patch_center (np.ndarray): Center coordinate of the selected patch.
            coord_max (np.ndarray): Max coordinate of all 3D Points.
            attributes (np.ndarray): features of input points.
            attribute_dims (dict): Dictionary to indicate the meaning of extra
                dimension. It is not modified; a copy is extended when
                normalized coordinates are added.
            point_type (type): class of input points inherited from BasePoints.

        Returns:
            :obj:`BasePoints`: The generated input data.
        """
        # subtract patch center, the z dimension is not centered
        centered_coords = coords.copy()
        centered_coords[:, 0] -= patch_center[0]
        centered_coords[:, 1] -= patch_center[1]

        if self.use_normalized_coord:
            normalized_coord = coords / coord_max
            attributes = np.concatenate([attributes, normalized_coord], axis=1)
            # Copy before updating so the dict owned by the input points is
            # not mutated as a side effect.
            attribute_dims = {} if attribute_dims is None else dict(attribute_dims)
            # After the concatenation above ``attributes.shape[1]`` already
            # counts the three new columns, which equals the offset of the
            # first normalized coordinate once xyz is prepended below.
            attribute_dims.update(
                dict(
                    normalized_coord=[
                        attributes.shape[1],
                        attributes.shape[1] + 1,
                        attributes.shape[1] + 2,
                    ]
                )
            )

        points = np.concatenate([centered_coords, attributes], axis=1)
        points = point_type(
            points, points_dim=points.shape[1], attribute_dims=attribute_dims
        )
        return points

    def _patch_points_sampling(self, points, sem_mask, replace=None):
        """Patch points sampling.

        First sample a valid patch.
        Then sample points within that patch to a certain number.

        Args:
            points (:obj:`BasePoints`): 3D Points.
            sem_mask (np.ndarray): semantic segmentation mask for input points.
            replace (bool): Whether the sample is with or without replacement.
                When None, replacement is used only if the patch holds fewer
                points than ``num_points``. Defaults to None.

        Returns:
            tuple[:obj:`BasePoints`, np.ndarray]:

                - points (:obj:`BasePoints`): 3D Points.
                - choices (np.ndarray): The generated random samples.
        """
        coords = points.coord.numpy()
        attributes = points.tensor[:, 3:].numpy()
        attribute_dims = points.attribute_dims
        point_type = type(points)

        coord_max = np.amax(coords, axis=0)
        coord_min = np.amin(coords, axis=0)
        half_extent = np.array(
            [self.block_size / 2.0, self.block_size / 2.0, 0.0]
        )

        # If no attempt satisfies both criteria, the patch from the last
        # attempt is used.
        for i in range(self.num_try):
            # random sample a point as patch center
            cur_center = coords[np.random.choice(coords.shape[0])]

            # boundary of a patch; it spans the full height of the scene
            cur_max = cur_center + half_extent
            cur_min = cur_center - half_extent
            cur_max[2] = coord_max[2]
            cur_min[2] = coord_min[2]

            # points inside the patch, with a 0.2 margin on every side
            cur_choice = np.all(
                (coords >= (cur_min - 0.2)) & (coords <= (cur_max + 0.2)),
                axis=1,
            )

            if not cur_choice.any():  # no points in this patch
                continue

            cur_coords = coords[cur_choice, :]
            cur_sem_mask = sem_mask[cur_choice]

            # two criterion for patch sampling, adopted from PointNet++
            # points within selected patch should be scattered separately
            mask = np.all(
                (cur_coords >= (cur_min - 0.01)) & (cur_coords <= (cur_max + 0.01)),
                axis=1,
            )
            # not sure if 31, 31, 62 are just some big values used to transform
            # coords from 3d array to 1d and then check their uniqueness
            # this is used in all the ScanNet code following PointNet++
            vidx = np.ceil(
                (cur_coords[mask, :] - cur_min)
                / (cur_max - cur_min)
                * np.array([31.0, 31.0, 62.0])
            )
            vidx = np.unique(vidx[:, 0] * 31.0 * 62.0 + vidx[:, 1] * 62.0 + vidx[:, 2])
            flag1 = len(vidx) / 31.0 / 31.0 / 62.0 >= 0.02

            # selected patch should contain enough annotated points
            if self.ignore_index is None:
                flag2 = True
            else:
                flag2 = (
                    np.sum(cur_sem_mask != self.ignore_index) / len(cur_sem_mask) >= 0.7
                )

            if flag1 and flag2:
                break

        # random sample idx
        if replace is None:
            replace = cur_sem_mask.shape[0] < self.num_points
        choices = np.random.choice(
            np.where(cur_choice)[0], self.num_points, replace=replace
        )

        # construct model input
        points = self._input_generation(
            coords[choices],
            cur_center,
            coord_max,
            attributes[choices],
            attribute_dims,
            point_type,
        )

        return points, choices

    def __call__(self, results):
        """Call function to sample points to in indoor scenes.

        Args:
            results (dict): Result dict from loading pipeline. Must contain
                'pts_semantic_mask'.

        Returns:
            dict: Results after sampling, 'points', 'pts_instance_mask' \
                and 'pts_semantic_mask' keys are updated in the result dict.
        """
        points = results["points"]

        assert (
            "pts_semantic_mask" in results.keys()
        ), "semantic mask should be provided in training and evaluation"

        pts_semantic_mask = results["pts_semantic_mask"]

        points, choices = self._patch_points_sampling(points, pts_semantic_mask)

        results["points"] = points
        results["pts_semantic_mask"] = pts_semantic_mask[choices]
        pts_instance_mask = results.get("pts_instance_mask", None)
        if pts_instance_mask is not None:
            results["pts_instance_mask"] = pts_instance_mask[choices]

        return results

    def __repr__(self):
        """str: Return a string that describes the module."""
        repr_str = self.__class__.__name__
        repr_str += f"(num_points={self.num_points},"
        repr_str += f" block_size={self.block_size},"
        repr_str += f" sample_rate={self.sample_rate},"
        repr_str += f" ignore_index={self.ignore_index},"
        repr_str += f" use_normalized_coord={self.use_normalized_coord},"
        repr_str += f" num_try={self.num_try})"
        return repr_str
@PIPELINES.register_module()
class BackgroundPointsFilter(object):
    """Filter background points near the bounding box.

    Points that lie inside a box enlarged by ``bbox_enlarge_range`` but
    outside the original box are removed.

    Args:
        bbox_enlarge_range (tuple[float], float): Bbox enlarge range. A
            single float is used for all three box dimensions.
    """

    def __init__(self, bbox_enlarge_range):
        assert (
            is_tuple_of(bbox_enlarge_range, float) and len(bbox_enlarge_range) == 3
        ) or isinstance(
            bbox_enlarge_range, float
        ), f"Invalid arguments bbox_enlarge_range {bbox_enlarge_range}"

        if isinstance(bbox_enlarge_range, float):
            bbox_enlarge_range = [bbox_enlarge_range] * 3
        # Stored with a leading axis of length 1 so it broadcasts over boxes.
        self.bbox_enlarge_range = np.array(bbox_enlarge_range, dtype=np.float32)[
            np.newaxis, :
        ]

    def __call__(self, input_dict):
        """Call function to filter points by the range.

        Args:
            input_dict (dict): Result dict from loading pipeline.

        Returns:
            dict: Results after filtering, 'points', 'pts_instance_mask' \
                and 'pts_semantic_mask' keys are updated in the result dict.
                'gt_bboxes_3d' is left untouched.
        """
        points = input_dict["points"]
        gt_bboxes_3d = input_dict["gt_bboxes_3d"]

        # ``.numpy()`` on a torch tensor shares memory with the tensor, so
        # copy first: writing the gravity centers into the shared array would
        # silently modify the ground-truth boxes.
        gt_bboxes_3d_np = gt_bboxes_3d.tensor.numpy().copy()
        gt_bboxes_3d_np[:, :3] = gt_bboxes_3d.gravity_center.numpy()

        enlarged_gt_bboxes_3d = gt_bboxes_3d_np.copy()
        enlarged_gt_bboxes_3d[:, 3:6] += self.bbox_enlarge_range

        points_numpy = points.tensor.numpy()
        foreground_masks = box_np_ops.points_in_rbbox(points_numpy, gt_bboxes_3d_np)
        enlarge_foreground_masks = box_np_ops.points_in_rbbox(
            points_numpy, enlarged_gt_bboxes_3d
        )
        # ``any`` matches ``max`` on boolean masks and, unlike ``max``, does
        # not raise on a zero-length box axis.
        foreground_masks = foreground_masks.any(1)
        enlarge_foreground_masks = enlarge_foreground_masks.any(1)

        # Drop only the points in the enlarged margin that are not in the
        # original box.
        valid_masks = ~np.logical_and(~foreground_masks, enlarge_foreground_masks)

        input_dict["points"] = points[valid_masks]
        pts_instance_mask = input_dict.get("pts_instance_mask", None)
        if pts_instance_mask is not None:
            input_dict["pts_instance_mask"] = pts_instance_mask[valid_masks]

        pts_semantic_mask = input_dict.get("pts_semantic_mask", None)
        if pts_semantic_mask is not None:
            input_dict["pts_semantic_mask"] = pts_semantic_mask[valid_masks]
        return input_dict

    def __repr__(self):
        """str: Return a string that describes the module."""
        repr_str = self.__class__.__name__
        repr_str += f"(bbox_enlarge_range={self.bbox_enlarge_range.tolist()})"
        return repr_str
@PIPELINES.register_module()
class VoxelBasedPointSampler(object):
"""Voxel based point sampler.
Apply voxel sampling to multiple sweep points.
Args:
cur_sweep_cfg (dict): Config for sampling current points.
prev_sweep_cfg (dict): Config for sampling previous points.
time_dim (int): Index that indicate the time dimention
for input points.
"""
def __init__(self, cur_sweep_cfg, prev_sweep_cfg=None, time_dim=3):
self.cur_voxel_generator = VoxelGenerator(**cur_sweep_cfg)
self.cur_voxel_num = self.cur_voxel_generator._max_voxels
self.time_dim = time_dim
if prev_sweep_cfg is not None:
assert prev_sweep_cfg["max_num_points"] == cur_sweep_cfg["max_num_points"]
self.prev_voxel_generator = VoxelGenerator(**prev_sweep_cfg)
self.prev_voxel_num = self.prev_voxel_generator._max_voxels
else:
self.prev_voxel_generator = None
self.prev_voxel_num = 0
def _sample_points(self, points, sampler, point_dim):
"""Sample points for each points subset.
Args:
points (np.ndarray): Points subset to be sampled.
sampler (VoxelGenerator): Voxel based sampler for
each points subset.
point_dim (int): The dimention of each points
Returns:
np.ndarray: Sampled points.
"""
voxels, coors, num_points_per_voxel = sampler.generate(points)
if voxels.shape[0] < sampler._max_voxels:
padding_points = np.zeros(
[
sampler._max_voxels - voxels.shape[0],
sampler._max_num_points,
point_dim,
],
dtype=points.dtype,
)
padding_points[:] = voxels[0]
sample_points = np.concatenate([voxels, padding_points], axis=0)
else:
sample_points = voxels
return sample_points
def __call__(self, results):
"""Call function to sample points from multiple sweeps.
Args:
input_dict (dict): Result dict from loading pipeline.
Returns:
dict: Results after sampling, 'points', 'pts_instance_mask' \
and 'pts_semantic_mask' keys are updated in the result dict.
"""
points = results["points"]
original_dim = points.shape[1]
# TODO: process instance and semantic mask while _max_num_points
# is larger than 1
# Extend points with seg and mask fields
map_fields2dim = []
start_dim = original_dim
points_numpy = points.tensor.numpy()
extra_channel = [points_numpy]
for idx, key in enumerate(results["pts_mask_fields"]):
map_fields2dim.append((key, idx + start_dim))
extra_channel.append(results[key][..., None])
start_dim += len(results["pts_mask_fields"])
for idx, key in enumerate(results["pts_seg_fields"]):
map_fields2dim.append((key, idx + start_dim))
extra_channel.append(results[key][..., None])
points_numpy = np.concatenate(extra_channel, axis=-1)
# Split points into two part, current sweep points and
# previous sweeps points.
# TODO: support different sampling methods for next sweeps points
# and previous sweeps points.
cur_points_flag = points_numpy[:, self.time_dim] == 0
cur_sweep_points = points_numpy[cur_points_flag]
prev_sweeps_points = points_numpy[~cur_points_flag]
if prev_sweeps_points.shape[0] == 0:
prev_sweeps_points = cur_sweep_points
# Shuffle points before sampling
np.random.shuffle(cur_sweep_points)
np.random.shuffle(prev_sweeps_points)
cur_sweep_points = self._sample_points(
cur_sweep_points, self.cur_voxel_generator, points_numpy.shape[1]
)
if self.prev_voxel_generator is not None:
prev_sweeps_points = self._sample_points(
prev_sweeps_points, self.prev_voxel_generator, points_numpy.shape[1]
)
points_numpy = np.concatenate([cur_sweep_points, prev_sweeps_points], 0)
else:
points_numpy = cur_sweep_points
if self.cur_voxel_generator._max_num_points == 1:
points_numpy = points_numpy.squeeze(1)
results["points"] = points.new_point(points_numpy[..., :original_dim])
# Restore the correspoinding seg and mask fields
for key, dim_index in map_fields2dim:
results[key] = points_numpy[..., dim_index]
return results
def __repr__(self):
    """str: Return a string that describes the module."""

    def _shift(text, width):
        # Prefix every line of ``text`` with ``width`` spaces.
        margin = " " * width
        return "\n".join(margin + line for line in text.split("\n"))

    margin = " " * 4
    # Each generator's repr is placed on its own lines, indented one level
    # deeper than the attribute name it belongs to.
    pieces = [
        self.__class__.__name__,
        "(\n",
        margin + f"num_cur_sweep={self.cur_voxel_num},\n",
        margin + f"num_prev_sweep={self.prev_voxel_num},\n",
        margin + f"time_dim={self.time_dim},\n",
        margin + "cur_voxel_generator=\n",
        f"{_shift(repr(self.cur_voxel_generator), 8)},\n",
        margin + "prev_voxel_generator=\n",
        f"{_shift(repr(self.prev_voxel_generator), 8)})",
    ]
    return "".join(pieces)
@PIPELINES.register_module()
class Pad3D(object):
    """Pad every image in ``results["img"]``.

    There are two padding modes: (1) pad to a fixed size and (2) pad to the
    minimum size that is divisible by some number.

    The "img" and "img_shape" keys are updated; "img_fixed_size" and
    "img_size_divisor" are added. Note that "pad_shape" is NOT written by
    this transform; mask / segmentation padding reads it from ``results``,
    so it has to be provided by an earlier pipeline step when such fields
    are present.

    Args:
        size (tuple, optional): Fixed padding size, forwarded as ``shape``
            to ``mmcv.impad``.
        size_divisor (int, optional): The divisor of padded size.
        pad_val (float, optional): Padding value, 0 by default.
    """

    def __init__(self, size=None, size_divisor=None, pad_val=0):
        self.size = size
        self.size_divisor = size_divisor
        self.pad_val = pad_val
        # only one of size and size_divisor should be valid
        assert size is not None or size_divisor is not None
        assert size is None or size_divisor is None

    def _pad_img(self, results):
        """Pad images according to ``self.size`` or ``self.size_divisor``.

        ``results["img"]`` is treated as a sequence of images in both modes;
        each image is padded on its own.
        """
        if self.size is not None:
            # Pad image by image: ``mmcv.impad`` works on a single array,
            # not on the list of views stored under "img".
            padded_img = [
                mmcv.impad(img, shape=self.size, pad_val=self.pad_val)
                for img in results["img"]
            ]
        else:
            padded_img = [
                mmcv.impad_to_multiple(img, self.size_divisor, pad_val=self.pad_val)
                for img in results["img"]
            ]
        results["img"] = padded_img
        results["img_shape"] = [img.shape for img in padded_img]
        results["img_fixed_size"] = self.size
        results["img_size_divisor"] = self.size_divisor

    def _pad_masks(self, results):
        """Pad masks according to ``results['pad_shape']``."""
        mask_fields = results.get("mask_fields", [])
        if not mask_fields:
            # Nothing to pad; do not require "pad_shape" to be present.
            return
        pad_shape = results["pad_shape"][:2]
        for key in mask_fields:
            results[key] = results[key].pad(pad_shape, pad_val=self.pad_val)

    def _pad_seg(self, results):
        """Pad semantic segmentation map according to
        ``results['pad_shape']``."""
        for key in results.get("seg_fields", []):
            results[key] = mmcv.impad(results[key], shape=results["pad_shape"][:2])

    def __call__(self, results):
        """Call function to pad images, masks, semantic segmentation maps.

        Args:
            results (dict): Result dict from loading pipeline.

        Returns:
            dict: Updated result dict.
        """
        self._pad_img(results)
        self._pad_masks(results)
        self._pad_seg(results)
        return results

    def __repr__(self):
        repr_str = self.__class__.__name__
        repr_str += f"(size={self.size}, "
        repr_str += f"size_divisor={self.size_divisor}, "
        repr_str += f"pad_val={self.pad_val})"
        return repr_str
@PIPELINES.register_module()
class Normalize3D(object):
    """Normalize every image stored under ``results["img"]``.

    Added key is "img_norm_cfg".

    Args:
        mean (sequence): Mean values of 3 channels.
        std (sequence): Std values of 3 channels.
        to_rgb (bool): Whether to convert the image from BGR to RGB,
            default is true.
    """

    def __init__(self, mean, std, to_rgb=True):
        self.to_rgb = to_rgb
        # Statistics are stored as float32 arrays.
        self.mean = np.array(mean, dtype=np.float32)
        self.std = np.array(std, dtype=np.float32)

    def __call__(self, results):
        """Call function to normalize images.

        Args:
            results (dict): Result dict from loading pipeline.

        Returns:
            dict: Normalized results, 'img_norm_cfg' key is added into
                result dict.
        """
        normalized = []
        for image in results["img"]:
            normalized.append(
                mmcv.imnormalize(image, self.mean, self.std, self.to_rgb)
            )
        results["img"] = normalized
        results["img_norm_cfg"] = {
            "mean": self.mean,
            "std": self.std,
            "to_rgb": self.to_rgb,
        }
        return results

    def __repr__(self):
        name = self.__class__.__name__
        return f"{name}(mean={self.mean}, std={self.std}, to_rgb={self.to_rgb})"
@PIPELINES.register_module()
class RandomLRFlip(object):
    """Flip each image along axis 1, independently, with probability 0.5.

    For every image field ``key`` (``input_dict["img_fields"]``, or just
    "img" when that entry is missing) a float32 array ``key + "_flip"`` is
    stored, holding 1.0 for flipped images and 0.0 otherwise.
    """

    def __init__(self, **kwargs):
        # Keyword arguments are accepted but not used.
        pass

    def __call__(self, input_dict):
        for field in input_dict.get("img_fields", ["img"]):
            images = input_dict[field]
            flags = []
            for index, image in enumerate(images):
                # One random draw per image, in sequence order.
                flipped = np.random.uniform() > 0.5
                if flipped:
                    images[index] = np.flip(image, 1)
                flags.append(1.0 if flipped else 0.0)
            input_dict[field] = images
            input_dict[field + "_flip"] = np.asarray(flags).astype("float32")
        return input_dict

    def __repr__(self):
        """str: Return a string that describes the module."""
        return self.__class__.__name__
@PIPELINES.register_module()
class RandomFlip3DCam(RandomFlip):
    """Flip the points & bbox and update ``lidar2img`` accordingly.

    The 2D image flip of the parent class is not invoked by ``__call__``;
    only the 3D data and the ``lidar2img`` matrices are changed.

    Args:
        sync_2d (bool, optional): If True, the horizontal flip flag is taken
            from ``input_dict["flip"]`` and no vertical flip is applied.
            If False, flips are decided randomly unless the flags are
            already present in the input dict. Defaults to True.
        flip_ratio_bev_horizontal (float, optional): The flipping probability
            in horizontal direction. Defaults to 0.0.
        flip_ratio_bev_vertical (float, optional): The flipping probability
            in vertical direction. Defaults to 0.0.
    """

    def __init__(
        self,
        sync_2d=True,
        flip_ratio_bev_horizontal=0.0,
        flip_ratio_bev_vertical=0.0,
        **kwargs,
    ):
        super().__init__(flip_ratio=flip_ratio_bev_horizontal, **kwargs)
        self.sync_2d = sync_2d
        self.flip_ratio_bev_vertical = flip_ratio_bev_vertical
        # Each ratio, when given, must be a number in [0, 1].
        for ratio in (flip_ratio_bev_horizontal, flip_ratio_bev_vertical):
            if ratio is not None:
                assert isinstance(ratio, (int, float)) and 0 <= ratio <= 1

    def random_flip_data_3d(self, input_dict, direction="horizontal"):
        """Flip the 3D data in ``input_dict`` in place.

        Args:
            input_dict (dict): Result dict from loading pipeline.
            direction (str): Flip direction. Default: horizontal.

        The 'points' entry (if present), the single field listed in
        'bbox3d_fields' and 'centers2d' (if present) are updated in the
        input dict. Nothing is returned.
        """
        assert direction in ["horizontal", "vertical"]
        box_fields = input_dict["bbox3d_fields"]
        if len(box_fields) == 0:  # test mode
            # Register an empty box container so the flip below still runs.
            box_fields.append("empty_box3d")
            input_dict["empty_box3d"] = input_dict["box_type_3d"](
                np.array([], dtype=np.float32)
            )
        assert len(box_fields) == 1

        for field in box_fields:
            boxes = input_dict[field]
            if "points" not in input_dict:
                boxes.flip(direction)
            else:
                input_dict["points"] = boxes.flip(
                    direction, points=input_dict["points"]
                )

        if "centers2d" in input_dict:
            assert (
                self.sync_2d is True and direction == "horizontal"
            ), "Only support sync_2d=True and horizontal flip with images"
            width = input_dict["img_shape"][1]
            input_dict["centers2d"][..., 0] = width - input_dict["centers2d"][..., 0]

    def flip_extrinsic(self, input_dict, direction="horizontal"):
        """Right-multiply ``input_dict["lidar2img"]`` by an axis-mirroring
        4x4 matrix.

        The matrix negates axis 1 for "horizontal" and axis 0 for
        "vertical"; any other direction leaves it as the identity.
        """
        projection = input_dict["lidar2img"]  # (6, 4, 4)
        mirror = np.eye(4)
        if direction == "horizontal":
            mirror[1, 1] = -1
        elif direction == "vertical":
            mirror[0, 0] = -1
        input_dict["lidar2img"] = projection @ mirror

    def __call__(self, input_dict):
        """Call function to flip points, values in the ``bbox3d_fields`` and
        the ``lidar2img`` matrices.

        Args:
            input_dict (dict): Result dict from loading pipeline.

        Returns:
            dict: Flipped results, 'pcd_horizontal_flip',
                'pcd_vertical_flip' and 'transformation_3d_flow' keys are
                added into result dict when missing.
        """
        # The 2D image and its annotations are intentionally left untouched.
        if self.sync_2d:
            input_dict["pcd_horizontal_flip"] = input_dict["flip"]
            input_dict["pcd_vertical_flip"] = False
        else:
            if "pcd_horizontal_flip" not in input_dict:
                input_dict["pcd_horizontal_flip"] = bool(
                    np.random.rand() < self.flip_ratio
                )
            if "pcd_vertical_flip" not in input_dict:
                input_dict["pcd_vertical_flip"] = bool(
                    np.random.rand() < self.flip_ratio_bev_vertical
                )

        input_dict.setdefault("transformation_3d_flow", [])

        steps = (
            ("pcd_horizontal_flip", "horizontal", "HF"),
            ("pcd_vertical_flip", "vertical", "VF"),
        )
        for flag_key, direction, tag in steps:
            if input_dict[flag_key]:
                self.random_flip_data_3d(input_dict, direction)
                input_dict["transformation_3d_flow"].append(tag)
                self.flip_extrinsic(input_dict, direction)
        return input_dict

    def __repr__(self):
        """str: Return a string that describes the module."""
        name = self.__class__.__name__
        return (
            f"{name}(sync_2d={self.sync_2d},"
            f" flip_ratio_bev_vertical={self.flip_ratio_bev_vertical})"
        )
@PIPELINES.register_module()
class GlobalRotScaleTransCam(object):
    """Apply global rotation, scaling and translation to a 3D scene.

    Args:
        rot_range (list[float] | tuple[float] | float): Range of rotation
            angle. A scalar ``r`` is interpreted as ``[-r, r]``.
            Defaults to [-0.78539816, 0.78539816] (close to [-pi/4, pi/4]).
        scale_ratio_range (list[float]): Range of scale ratio.
            Defaults to [1.0, 1.0].
        translation_std (list[float] | float): The standard deviation of
            translation noise. This applies a random translation to a scene
            by a noise, which is sampled from a gaussian distribution whose
            standard deviation is set by ``translation_std``. A scalar is
            used for all three axes. Defaults to [0, 0, 0].
        shift_height (bool): Whether to shift height
            (the "height" attribute of the points) when scaling.
            Defaults to False.
    """

    def __init__(
        self,
        rot_range=[-0.78539816, 0.78539816],
        scale_ratio_range=[1.0, 1.0],
        translation_std=[0, 0, 0],
        shift_height=False,
    ):
        # The list defaults are only stored and read, never mutated.
        self.rot_range = rot_range
        self.scale_ratio_range = scale_ratio_range
        self.translation_std = translation_std
        self.shift_height = shift_height

    def _trans_bbox_points(self, input_dict):
        """Private function to translate bounding boxes and points.

        Args:
            input_dict (dict): Result dict from loading pipeline.

        'points', 'pcd_trans' and keys in input_dict['bbox3d_fields'] are
        updated in place in the input dict.
        """
        if not isinstance(self.translation_std, (list, tuple, np.ndarray)):
            translation_std = [
                self.translation_std,
                self.translation_std,
                self.translation_std,
            ]
        else:
            translation_std = self.translation_std
        translation_std = np.array(translation_std, dtype=np.float32)
        trans_factor = np.random.normal(scale=translation_std, size=3).T

        input_dict["points"].translate(trans_factor)
        input_dict["pcd_trans"] = trans_factor
        for key in input_dict["bbox3d_fields"]:
            input_dict[key].translate(trans_factor)

    def _rot_bbox_points(self, input_dict):
        """Private function to rotate bounding boxes and points.

        Args:
            input_dict (dict): Result dict from loading pipeline.

        'points', 'pcd_rotation' and keys in input_dict['bbox3d_fields']
        are updated in place in the input dict.
        """
        rotation = self.rot_range
        # Accept the same sequence types as ``_trans_bbox_points``; only a
        # scalar is expanded to a symmetric range.
        if not isinstance(rotation, (list, tuple, np.ndarray)):
            rotation = [-rotation, rotation]
        noise_rotation = np.random.uniform(rotation[0], rotation[1])

        # NOTE(review): points are rotated through the box container, so
        # when every box field is empty (or none is listed) neither the
        # points nor 'pcd_rotation' are touched - confirm this is intended.
        for key in input_dict["bbox3d_fields"]:
            if len(input_dict[key].tensor) != 0:
                points, rot_mat_T = input_dict[key].rotate(
                    noise_rotation, input_dict["points"]
                )
                input_dict["points"] = points
                input_dict["pcd_rotation"] = rot_mat_T

    def _scale_bbox_points(self, input_dict):
        """Private function to scale bounding boxes and points.

        Args:
            input_dict (dict): Result dict from loading pipeline.

        'points' and keys in input_dict['bbox3d_fields'] are updated in
        place, using the factor stored under 'pcd_scale_factor'.
        """
        scale = input_dict["pcd_scale_factor"]
        points = input_dict["points"]
        points.scale(scale)
        if self.shift_height:
            assert "height" in points.attribute_dims.keys()
            points.tensor[:, points.attribute_dims["height"]] *= scale
        input_dict["points"] = points

        for key in input_dict["bbox3d_fields"]:
            input_dict[key].scale(scale)

    def _random_scale(self, input_dict):
        """Private function to randomly set the scale factor.

        Args:
            input_dict (dict): Result dict from loading pipeline.

        'pcd_scale_factor' is written into the input dict.
        """
        scale_factor = np.random.uniform(
            self.scale_ratio_range[0], self.scale_ratio_range[1]
        )
        input_dict["pcd_scale_factor"] = scale_factor

    def __call__(self, input_dict):
        """Call function to rotate, scale and translate bounding boxes and
        points.

        Args:
            input_dict (dict): Result dict from loading pipeline.

        Returns:
            dict: Results after transformation, 'points', 'pcd_rotation',
                'pcd_scale_factor', 'pcd_trans' and keys in
                input_dict['bbox3d_fields'] are updated in the result dict.
        """
        if "transformation_3d_flow" not in input_dict:
            input_dict["transformation_3d_flow"] = []

        self._rot_bbox_points(input_dict)

        # A scale factor already present in the dict is reused as is.
        if "pcd_scale_factor" not in input_dict:
            self._random_scale(input_dict)
        self._scale_bbox_points(input_dict)

        self._trans_bbox_points(input_dict)

        input_dict["transformation_3d_flow"].extend(["R", "S", "T"])
        return input_dict

    def __repr__(self):
        """str: Return a string that describes the module."""
        repr_str = self.__class__.__name__
        repr_str += f"(rot_range={self.rot_range},"
        repr_str += f" scale_ratio_range={self.scale_ratio_range},"
        repr_str += f" translation_std={self.translation_std},"
        repr_str += f" shift_height={self.shift_height})"
        return repr_str
@PIPELINES.register_module()
class Clip3D(object):
    """Clip every image to at most ``size`` along its first two axes.

    Each image ``img`` of every field in ``results["img_fields"]`` (just
    "img" when that entry is missing) is replaced by
    ``img[:size[0], :size[1], ...]``.

    The "img_shape" key is updated and "img_fixed_size" is added.

    Args:
        size (tuple, optional): Maximum extent kept along the first and
            second image axis. If None, images are left unchanged.
    """

    def __init__(self, size=None):
        self.size = size

    def __call__(self, results):
        """Call function to clip images.

        Args:
            results (dict): Result dict from loading pipeline.

        Returns:
            dict: Updated result dict.
        """
        if self.size is not None:
            rows, cols = self.size[0], self.size[1]
            result_img = None
            for key in results.get("img_fields", ["img"]):
                result_img = [img[:rows, :cols, ...] for img in results[key]]
                results[key] = result_img
            # Only refresh the recorded shapes when something was clipped.
            if result_img is not None:
                results["img_shape"] = [img.shape for img in result_img]
        results["img_fixed_size"] = self.size
        return results

    def __repr__(self):
        repr_str = self.__class__.__name__
        repr_str += f"(size={self.size})"
        return repr_str
@PIPELINES.register_module()
class RandomScaleImage3D(object):
    """Randomly rescale the images and adjust ``lidar2img`` to match.

    A ratio is drawn uniformly from ``scales``. The target size is the
    first entry of ``results["img_shape"]`` multiplied by that ratio, with
    each side rounded down to a multiple of 32. All images of every field
    in ``results["img_fields"]`` (just "img" when that entry is missing)
    are resized to it, and each matrix in ``results["lidar2img"]`` is
    left-multiplied by ``diag(x_scale, y_scale, 1, 1)``.

    The "img_shape" and "lidar2img" keys are updated.

    Args:
        scales (tuple[float]): Lower and upper bound of the random scale
            ratio. Defaults to (0.85, 1.15).
    """

    def __init__(self, scales=(0.85, 1.15)):
        self.scales = scales

    def __call__(self, results):
        """Call function to rescale images and projection matrices.

        Args:
            results (dict): Result dict from loading pipeline.

        Returns:
            dict: Updated result dict.
        """
        rand_scale = np.random.uniform(low=self.scales[0], high=self.scales[1])
        img_shape = results["img_shape"][0]
        # Snap the target size down to a multiple of 32.
        y_size = int((img_shape[0] * rand_scale) // 32) * 32
        x_size = int((img_shape[1] * rand_scale) // 32) * 32
        # Effective ratios after snapping; these are what the projection
        # matrices must be scaled by.
        y_scale = y_size / img_shape[0]
        x_scale = x_size / img_shape[1]
        scale_factor = np.eye(4)
        scale_factor[0, 0] = x_scale
        scale_factor[1, 1] = y_scale

        result_img = None
        for key in results.get("img_fields", ["img"]):
            result_img = [
                mmcv.imresize(img, (x_size, y_size), return_scale=False)
                for img in results[key]
            ]
            results[key] = result_img
        if result_img is None:
            # No image field was processed, so there is nothing to update.
            return results

        # Scale the projection matrices exactly once per call, regardless
        # of how many image fields were resized.
        results["lidar2img"] = [scale_factor @ l2i for l2i in results["lidar2img"]]
        results["img_shape"] = [img.shape for img in result_img]
        return results

    def __repr__(self):
        repr_str = self.__class__.__name__
        repr_str += f"(scales={self.scales})"
        return repr_str