|
|
import numpy as np |
|
|
from numpy import random |
|
|
import mmcv |
|
|
from mmdet.datasets.builder import PIPELINES |
|
|
from mmcv.parallel import DataContainer as DC |
|
|
from mmdet3d.datasets.pipelines.transforms_3d import ObjectRangeFilter, ObjectNameFilter |
|
|
from mmdet3d.core.bbox import ( |
|
|
CameraInstance3DBoxes, |
|
|
DepthInstance3DBoxes, |
|
|
LiDARInstance3DBoxes, |
|
|
) |
|
|
|
|
|
|
|
|
@PIPELINES.register_module()
class PadMultiViewImage(object):
    """Pad every image of a multi-view sample.

    Exactly one of two padding modes must be selected:

    1. ``size``: pad each image to a fixed shape (``mmcv.impad``).
    2. ``size_divisor``: pad each image up to a multiple of the divisor
       (``mmcv.impad_to_multiple``).

    Keys written to the result dict: "ori_shape", "img", "img_shape",
    "pad_shape", "pad_fixed_size" and "pad_size_divisor".

    Args:
        size (tuple, optional): Fixed padding size.
        size_divisor (int, optional): The divisor of padded size.
        pad_val (float, optional): Padding value, 0 by default.
    """

    def __init__(self, size=None, size_divisor=None, pad_val=0):
        # Exactly one of the two modes has to be configured.
        assert (size is None) != (size_divisor is None)

        self.size = size
        self.size_divisor = size_divisor
        self.pad_val = pad_val

    def _pad_img(self, results):
        """Pad ``results["img"]`` in place according to the configured mode."""
        if self.size is not None:
            padded = []
            for view in results["img"]:
                padded.append(
                    mmcv.impad(view, shape=self.size, pad_val=self.pad_val)
                )
        elif self.size_divisor is not None:
            padded = []
            for view in results["img"]:
                padded.append(
                    mmcv.impad_to_multiple(
                        view, self.size_divisor, pad_val=self.pad_val
                    )
                )

        # Record the pre-padding shapes before the images are replaced.
        results["ori_shape"] = [view.shape for view in results["img"]]
        results["img"] = padded
        # Two independent lists so later edits to one key do not leak
        # into the other.
        results["img_shape"] = [view.shape for view in padded]
        results["pad_shape"] = [view.shape for view in padded]
        results["pad_fixed_size"] = self.size
        results["pad_size_divisor"] = self.size_divisor

    def __call__(self, results):
        """Pad the images stored in the result dict.

        Args:
            results (dict): Result dict from loading pipeline.

        Returns:
            dict: The same dict, updated with the padded images and the
            shape/padding bookkeeping keys.
        """
        self._pad_img(results)
        return results

    def __repr__(self):
        return (
            f"{self.__class__.__name__}(size={self.size}, "
            f"size_divisor={self.size_divisor}, "
            f"pad_val={self.pad_val})"
        )
|
|
|
|
|
|
|
|
@PIPELINES.register_module()
class NormalizeMultiviewImage(object):
    """Normalize every image of a multi-view sample.

    Adds the key "img_norm_cfg" to the result dict.

    Args:
        mean (sequence): Mean values of 3 channels.
        std (sequence): Std values of 3 channels.
        to_rgb (bool): Whether to convert the image from BGR to RGB
            (forwarded to ``mmcv.imnormalize``). Defaults to True.
    """

    def __init__(self, mean, std, to_rgb=True):
        self.to_rgb = to_rgb
        # Stored as float32 arrays, which is also what ends up in
        # ``img_norm_cfg``.
        self.mean = np.array(mean, dtype=np.float32)
        self.std = np.array(std, dtype=np.float32)

    def __call__(self, results):
        """Normalize all images in ``results["img"]``.

        Args:
            results (dict): Result dict from loading pipeline.

        Returns:
            dict: The same dict with normalized images and an added
            'img_norm_cfg' entry.
        """
        normalized = []
        for view in results["img"]:
            normalized.append(
                mmcv.imnormalize(view, self.mean, self.std, self.to_rgb)
            )
        results["img"] = normalized
        results["img_norm_cfg"] = {
            "mean": self.mean,
            "std": self.std,
            "to_rgb": self.to_rgb,
        }
        return results

    def __repr__(self):
        cls_name = self.__class__.__name__
        return f"{cls_name}(mean={self.mean}, std={self.std}, to_rgb={self.to_rgb})"
|
|
|
|
|
|
|
|
@PIPELINES.register_module()
class PhotoMetricDistortionMultiViewImage:
    """Apply photometric distortion to each image of a multi-view sample.

    The steps below run sequentially and each one is applied with a
    probability of 0.5. Random contrast is applied either second or second
    to last, chosen by a coin flip per image.

    1. random brightness
    2. random contrast (when applied before the HSV steps)
    3. convert color from BGR to HSV
    4. random saturation
    5. random hue
    6. convert color from HSV to BGR
    7. random contrast (when applied after the HSV steps)
    8. randomly swap channels

    Args:
        brightness_delta (int): delta of brightness.
        contrast_range (tuple): range of contrast.
        saturation_range (tuple): range of saturation.
        hue_delta (int): delta of hue.
    """

    def __init__(
        self,
        brightness_delta=32,
        contrast_range=(0.5, 1.5),
        saturation_range=(0.5, 1.5),
        hue_delta=18,
    ):
        contrast_lo, contrast_hi = contrast_range
        saturation_lo, saturation_hi = saturation_range

        self.brightness_delta = brightness_delta
        self.contrast_lower = contrast_lo
        self.contrast_upper = contrast_hi
        self.saturation_lower = saturation_lo
        self.saturation_upper = saturation_hi
        self.hue_delta = hue_delta

    def __call__(self, results):
        """Distort every image in ``results["img"]``.

        The brightness and early-contrast steps modify the input arrays in
        place. The order of random draws is part of the behavior and must
        not be rearranged.

        Args:
            results (dict): Result dict from loading pipeline.

        Returns:
            dict: Result dict with images distorted.
        """
        distorted = []
        for img in results["img"]:
            assert img.dtype == np.float32, (
                "PhotoMetricDistortion needs the input image of dtype np.float32,"
                ' please set "to_float32=True" in "LoadImageFromFile" pipeline'
            )

            # Step 1: brightness shift (in place).
            if random.randint(2):
                img += random.uniform(
                    -self.brightness_delta, self.brightness_delta
                )

            # Coin flip deciding whether contrast runs before (True) or
            # after (False) the HSV steps.
            contrast_first = random.randint(2) == 1

            # Step 2: early contrast (in place).
            if contrast_first and random.randint(2):
                img *= random.uniform(self.contrast_lower, self.contrast_upper)

            # Step 3: work in HSV for saturation / hue.
            hsv = mmcv.bgr2hsv(img)

            # Step 4: saturation scaling.
            if random.randint(2):
                hsv[..., 1] *= random.uniform(
                    self.saturation_lower, self.saturation_upper
                )

            # Step 5: hue shift, wrapped back into [0, 360].
            if random.randint(2):
                hue = hsv[..., 0]  # view into ``hsv``; edits write through
                hue += random.uniform(-self.hue_delta, self.hue_delta)
                hue[hue > 360] -= 360
                hue[hue < 0] += 360

            # Step 6: back to BGR.
            img = mmcv.hsv2bgr(hsv)

            # Step 7: late contrast.
            if not contrast_first and random.randint(2):
                img *= random.uniform(self.contrast_lower, self.contrast_upper)

            # Step 8: random channel permutation.
            if random.randint(2):
                img = img[..., random.permutation(3)]

            distorted.append(img)

        results["img"] = distorted
        return results

    def __repr__(self):
        contrast = (self.contrast_lower, self.contrast_upper)
        saturation = (self.saturation_lower, self.saturation_upper)
        return (
            f"{self.__class__.__name__}(\n"
            f"brightness_delta={self.brightness_delta},\n"
            f"contrast_range={contrast},\n"
            f"saturation_range={saturation},\n"
            f"hue_delta={self.hue_delta})"
        )
|
|
|
|
|
|
|
|
@PIPELINES.register_module()
class CustomCollect3D(object):
    """Collect data from the loader relevant to the specific task.

    This is usually the last stage of the data loader pipeline. Typically
    ``keys`` is set to some subset of "img", "proposals", "gt_bboxes",
    "gt_bboxes_ignore", "gt_labels", and/or "gt_masks".

    The "img_metas" item is always populated. Its content depends on
    ``meta_keys``; only the meta keys actually present in the result dict
    are copied. Commonly used meta keys include:

        - 'img_shape': shape of the image input to the network as a tuple
          (h, w, c). Note that images may be zero padded on the
          bottom/right if the batch tensor is larger than this shape.
        - 'scale_factor': a float indicating the preprocessing scale
        - 'flip': a boolean indicating if image flip transform was used
        - 'filename': path to the image file
        - 'ori_shape': original shape of the image as a tuple (h, w, c)
        - 'pad_shape': image shape after padding
        - 'lidar2img': transform from lidar to image
        - 'depth2img': transform from depth to image
        - 'cam2img': transform from camera to image
        - 'pcd_horizontal_flip': a boolean indicating if point cloud is
          flipped horizontally
        - 'pcd_vertical_flip': a boolean indicating if point cloud is
          flipped vertically
        - 'box_mode_3d': 3D box mode
        - 'box_type_3d': 3D box type
        - 'img_norm_cfg': a dict of normalization information:
            - mean: per channel mean subtraction
            - std: per channel std divisor
            - to_rgb: bool indicating if bgr was converted to rgb
        - 'pcd_trans': point cloud transformations
        - 'sample_idx': sample index
        - 'pcd_scale_factor': point cloud scale factor
        - 'pcd_rotation': rotation applied to point cloud
        - 'pts_filename': path to point cloud file.

    Args:
        keys (Sequence[str]): Keys of results to be collected in ``data``.
        meta_keys (Sequence[str], optional): Meta keys to be wrapped in a
            ``mmcv.DataContainer`` and collected in ``data["img_metas"]``.
            Default: ('filename', 'frame_idx', 'ori_shape', 'img_shape',
            'lidar2img', 'lidar2global_rotation', 'depth2img', 'cam2img',
            'pad_shape', 'scale_factor', 'flip', 'pcd_horizontal_flip',
            'pcd_vertical_flip', 'box_mode_3d', 'box_type_3d',
            'img_norm_cfg', 'pcd_trans', 'sample_idx', 'prev_idx',
            'next_idx', 'pcd_scale_factor', 'pcd_rotation', 'pts_filename',
            'transformation_3d_flow', 'scene_token', 'can_bus', 'log_name',
            'log_token')
    """

    def __init__(
        self,
        keys,
        meta_keys=(
            "filename",
            "frame_idx",
            "ori_shape",
            "img_shape",
            "lidar2img",
            "lidar2global_rotation",
            "depth2img",
            "cam2img",
            "pad_shape",
            "scale_factor",
            "flip",
            "pcd_horizontal_flip",
            "pcd_vertical_flip",
            "box_mode_3d",
            "box_type_3d",
            "img_norm_cfg",
            "pcd_trans",
            "sample_idx",
            "prev_idx",
            "next_idx",
            "pcd_scale_factor",
            "pcd_rotation",
            "pts_filename",
            "transformation_3d_flow",
            "scene_token",
            "can_bus",
            "log_name",
            "log_token",
        ),
    ):
        self.meta_keys = meta_keys
        self.keys = keys

    def __call__(self, results):
        """Collect the configured keys from ``results``.

        Meta keys missing from ``results`` are skipped silently; every
        entry of ``self.keys`` is required and raises ``KeyError`` when
        absent.

        Args:
            results (dict): Result dict contains the data to collect.

        Returns:
            dict: The result dict contains the following keys

                - keys in ``self.keys``
                - ``img_metas``
        """
        meta = {
            name: results[name] for name in self.meta_keys if name in results
        }

        # ``img_metas`` goes in first; a same-named entry in ``self.keys``
        # would overwrite it below.
        collected = {"img_metas": DC(meta, cpu_only=True)}
        for name in self.keys:
            collected[name] = results[name]
        return collected

    def __repr__(self):
        """str: Return a string that describes the module."""
        cls_name = self.__class__.__name__
        return cls_name + f"(keys={self.keys}, meta_keys={self.meta_keys})"
|
|
|
|
|
|
|
|
@PIPELINES.register_module()
class RandomScaleImageMultiViewImage(object):
    """Rescale every image of a multi-view sample by a common factor.

    The factor is picked from ``scales``; the constructor currently
    requires exactly one entry. The ``lidar2img`` matrices are rescaled
    accordingly so projections stay consistent with the resized images.

    Args:
        scales (Sequence[float], optional): Candidate scale factors. Must
            contain exactly one value. ``None`` (the default) is treated
            as an empty sequence and therefore rejected.
    """

    def __init__(self, scales=None):
        # ``None`` replaces the former mutable ``[]`` default; both are
        # rejected by the length check below.
        self.scales = [] if scales is None else scales
        assert len(self.scales) == 1

    def __call__(self, results):
        """Resize the images and update the projection matrices.

        Args:
            results (dict): Result dict from loading pipeline. Reads
                ``img`` and ``lidar2img``.

        Returns:
            dict: The same dict with ``img``, ``lidar2img``, ``img_shape``
            and ``ori_shape`` updated.
        """
        # Kept as a permutation draw so the global NumPy RNG is consumed
        # exactly as before.
        rand_ind = np.random.permutation(range(len(self.scales)))[0]
        rand_scale = self.scales[rand_ind]

        # 4x4 transform whose first two diagonal entries carry the scale.
        scale_factor = np.eye(4)
        scale_factor[0, 0] *= rand_scale
        scale_factor[1, 1] *= rand_scale

        resized = []
        for img in results["img"]:
            new_h = int(img.shape[0] * rand_scale)
            new_w = int(img.shape[1] * rand_scale)
            # ``mmcv.imresize`` takes the target size as (w, h).
            resized.append(
                mmcv.imresize(img, (new_w, new_h), return_scale=False)
            )
        results["img"] = resized

        results["lidar2img"] = [
            scale_factor @ l2i for l2i in results["lidar2img"]
        ]
        results["img_shape"] = [img.shape for img in resized]
        # NOTE: ``ori_shape`` is also set to the resized shape, as in the
        # original implementation.
        results["ori_shape"] = [img.shape for img in resized]

        return results

    def __repr__(self):
        # Balanced parentheses and the real argument name, so the repr
        # mirrors the constructor call.
        return f"{self.__class__.__name__}(scales={self.scales})"
|
|
|
|
|
|
|
|
@PIPELINES.register_module()
class ObjectRangeFilterTrack(object):
    """Filter GT boxes, labels and tracking annotations by BEV range.

    Args:
        point_cloud_range (list[float]): Point cloud range.
    """

    # Per-object tracking annotations that must stay aligned with the boxes.
    _TRACK_KEYS = (
        "gt_inds",
        "gt_fut_traj",
        "gt_fut_traj_mask",
        "gt_past_traj",
        "gt_past_traj_mask",
    )

    def __init__(self, point_cloud_range):
        self.pcd_range = np.array(point_cloud_range, dtype=np.float32)

    def __call__(self, input_dict):
        """Call function to filter objects by the range.

        Tracking annotations found in ``input_dict["ann_info"]`` are first
        copied to the top level of ``input_dict``; then boxes, labels and
        all tracking annotations are filtered with the same in-range mask.

        Args:
            input_dict (dict): Result dict from loading pipeline.

        Returns:
            dict: Results after filtering, 'gt_bboxes_3d', 'gt_labels_3d',
            'gt_inds', 'gt_fut_traj', 'gt_fut_traj_mask', 'gt_past_traj'
            and 'gt_past_traj_mask' keys are updated in the result dict.

        Raises:
            TypeError: If ``gt_bboxes_3d`` is not a LiDAR, Depth or Camera
                box container.
            KeyError: If a required key is missing from ``input_dict``.
        """
        gt_bboxes_3d = input_dict["gt_bboxes_3d"]

        # Select which entries of the range array bound the BEV plane.
        if isinstance(gt_bboxes_3d, (LiDARInstance3DBoxes, DepthInstance3DBoxes)):
            bev_range = self.pcd_range[[0, 1, 3, 4]]
        elif isinstance(gt_bboxes_3d, CameraInstance3DBoxes):
            bev_range = self.pcd_range[[0, 2, 3, 5]]
        else:
            # Previously this fell through and failed later with an
            # opaque UnboundLocalError on ``bev_range``.
            raise TypeError(
                "Unsupported type for gt_bboxes_3d: "
                f"{type(gt_bboxes_3d).__name__}"
            )

        # Promote tracking annotations from ann_info when available.
        ann_info = input_dict["ann_info"]
        for key in self._TRACK_KEYS:
            if key in ann_info:
                input_dict[key] = ann_info[key]

        # Read everything up front so a missing key fails before any
        # filtered value is written back.
        gt_labels_3d = input_dict["gt_labels_3d"]
        tracked = {key: input_dict[key] for key in self._TRACK_KEYS}

        mask = gt_bboxes_3d.in_range_bev(bev_range)
        gt_bboxes_3d = gt_bboxes_3d[mask]

        # The box container is indexed with the mask as returned; the
        # other annotations use a NumPy boolean copy. ``np.bool`` was
        # removed in NumPy 1.24, so the builtin ``bool`` is used.
        np_mask = mask.numpy().astype(bool)
        gt_labels_3d = gt_labels_3d[np_mask]
        tracked = {key: value[np_mask] for key, value in tracked.items()}

        # Limit box yaw with the same offset/period as before.
        gt_bboxes_3d.limit_yaw(offset=0.5, period=2 * np.pi)

        input_dict["gt_bboxes_3d"] = gt_bboxes_3d
        input_dict["gt_labels_3d"] = gt_labels_3d
        input_dict.update(tracked)
        return input_dict

    def __repr__(self):
        """str: Return a string that describes the module."""
        repr_str = self.__class__.__name__
        repr_str += f"(point_cloud_range={self.pcd_range.tolist()})"
        return repr_str
|
|
|
|
|
|
|
|
@PIPELINES.register_module()
class ObjectNameFilterTrack(object):
    """Filter GT objects (and their tracking annotations) by class.

    Args:
        classes (list[str]): List of class names to be kept for training.
    """

    def __init__(self, classes):
        self.classes = classes
        # Label ids kept by the filter: 0 .. len(classes) - 1.
        self.labels = list(range(len(classes)))

    def __call__(self, input_dict):
        """Keep only the objects whose label id is in ``self.labels``.

        Args:
            input_dict (dict): Result dict from loading pipeline.

        Returns:
            dict: Results after filtering, 'gt_bboxes_3d', 'gt_labels_3d',
            'gt_inds', 'gt_fut_traj', 'gt_fut_traj_mask', 'gt_past_traj'
            and 'gt_past_traj_mask' keys are updated in the result dict.
        """
        keep = np.array(
            [label in self.labels for label in input_dict["gt_labels_3d"]],
            dtype=np.bool_,
        )

        # Every per-object field is indexed with the same mask, in this
        # fixed order.
        per_object_keys = (
            "gt_bboxes_3d",
            "gt_labels_3d",
            "gt_inds",
            "gt_fut_traj",
            "gt_fut_traj_mask",
            "gt_past_traj",
            "gt_past_traj_mask",
        )
        for key in per_object_keys:
            input_dict[key] = input_dict[key][keep]
        return input_dict

    def __repr__(self):
        """str: Return a string that describes the module."""
        return f"{self.__class__.__name__}(classes={self.classes})"
|
|
|
|
|
|
|
|
@PIPELINES.register_module()
class CustomObjectRangeFilter(ObjectRangeFilter):
    """Range filter for GT boxes and labels, built on ``ObjectRangeFilter``.

    Uses ``self.pcd_range`` as provided by the parent class.
    """

    def __call__(self, results):
        """Call function to filter objects by the range.

        Args:
            results (dict): Result dict from loading pipeline.

        Returns:
            dict: Results after filtering, 'gt_bboxes_3d', 'gt_labels_3d'
            keys are updated in the result dict.

        Raises:
            TypeError: If ``gt_bboxes_3d`` is not a LiDAR, Depth or Camera
                box container.
        """
        gt_bboxes_3d = results["gt_bboxes_3d"]

        # Select which entries of the range array bound the BEV plane.
        if isinstance(gt_bboxes_3d, (LiDARInstance3DBoxes, DepthInstance3DBoxes)):
            bev_range = self.pcd_range[[0, 1, 3, 4]]
        elif isinstance(gt_bboxes_3d, CameraInstance3DBoxes):
            bev_range = self.pcd_range[[0, 2, 3, 5]]
        else:
            # Previously this fell through and failed later with an
            # opaque UnboundLocalError on ``bev_range``.
            raise TypeError(
                "Unsupported type for gt_bboxes_3d: "
                f"{type(gt_bboxes_3d).__name__}"
            )

        gt_labels_3d = results["gt_labels_3d"]
        mask = gt_bboxes_3d.in_range_bev(bev_range)
        gt_bboxes_3d = gt_bboxes_3d[mask]

        # The labels are indexed with a NumPy boolean copy of the mask.
        # ``np.bool`` was removed in NumPy 1.24, so the builtin ``bool``
        # is used.
        gt_labels_3d = gt_labels_3d[mask.numpy().astype(bool)]

        # Limit box yaw with the same offset/period as before.
        gt_bboxes_3d.limit_yaw(offset=0.5, period=2 * np.pi)

        results["gt_bboxes_3d"] = gt_bboxes_3d
        results["gt_labels_3d"] = gt_labels_3d
        return results
|
|
|
|
|
|
|
|
@PIPELINES.register_module()
class CustomObjectNameFilter(ObjectNameFilter):
    """Class filter for GT boxes and labels, built on ``ObjectNameFilter``.

    Uses ``self.labels`` as provided by the parent class.
    """

    def __call__(self, results):
        """Keep only the objects whose label id is in ``self.labels``.

        Args:
            results (dict): Result dict from loading pipeline.

        Returns:
            dict: Results after filtering, 'gt_bboxes_3d', 'gt_labels_3d'
            keys are updated in the result dict.
        """
        keep = np.array(
            [label in self.labels for label in results["gt_labels_3d"]],
            dtype=np.bool_,
        )
        # Boxes first, then labels, both indexed with the same mask.
        for key in ("gt_bboxes_3d", "gt_labels_3d"):
            results[key] = results[key][keep]
        return results
|
|
|