# Source: R2SE_model/mmdet3d_plugin/datasets/deprecated/nuscenes_temporal.py
# (Hugging Face upload metadata removed from the original file header so the
# module parses as valid Python.)
import copy
from os import path as osp
from typing import List, Tuple
import mmcv
import numpy as np
import pyquaternion
from mmdet3d.datasets.nuscenes_dataset import (NuScenesDataset,
lidar_nusc_box_to_global)
from mmdet3d.datasets.pipelines import Compose
from mmdet.datasets import DATASETS
from nuscenes.utils.data_classes import Box as NuScenesBox
from pyquaternion import Quaternion
@DATASETS.register_module()
class NuScenesTrackDataset(NuScenesDataset):
    r"""NuScenes dataset that yields short multi-frame clips for 3D tracking.

    This class serves as the API for experiments on the NuScenes Dataset.
    Please refer to `NuScenes Dataset <https://www.nuscenes.org/download>`_
    for data downloading.

    Each sample is a sequence of ``num_frames_per_sample`` consecutive frames
    (one extra frame is appended in training mode), drawn with either a fixed
    or a random frame interval, run through the per-frame ``pipeline`` and
    then bundled together by ``pipeline_post``.

    Args:
        ann_file (str): Path of annotation file.
        pipeline (list[dict], optional): Per-frame data processing pipeline.
            Defaults to None.
        pipeline_post (list[dict], optional): Pipeline applied to the whole
            multi-frame sample after per-frame loading (e.g. formatting and
            collection). Defaults to None.
        data_root (str): Path of dataset root.
        classes (tuple[str], optional): Classes used in the dataset.
            Defaults to None.
        load_interval (int, optional): Interval of loading the dataset. It is
            used to uniformly sample the dataset. Defaults to 1.
        with_velocity (bool, optional): Whether include velocity prediction
            into the experiments. Defaults to True.
        modality (dict, optional): Modality to specify the sensor data used
            as input. Defaults to None.
        box_type_3d (str, optional): Type of 3D box of this dataset.
            Based on the `box_type_3d`, the dataset will encapsulate the box
            to its original format then converted them to `box_type_3d`.
            Defaults to 'LiDAR' in this dataset. Available options includes.

            - 'LiDAR': Box in LiDAR coordinates.
            - 'Depth': Box in depth coordinates, usually for indoor dataset.
            - 'Camera': Box in camera coordinates.
        filter_empty_gt (bool, optional): Whether to filter empty GT.
            Defaults to True.
        test_mode (bool, optional): Whether the dataset is in test mode.
            Defaults to False.
        eval_version (str, optional): Configuration version of evaluation.
            Defaults to 'detection_cvpr_2019'.
        sample_mode (str, optional): Frame sampling mode, either
            ``'fixed_interval'`` or ``'random_interval'``. Defaults to
            ``'fixed_interval'``.
        sample_interval (int, optional): Frame gap within a clip (upper bound
            of the gap for ``'random_interval'``). Defaults to 1.
        num_frames_per_sample (int, optional): Number of frames per clip.
            Defaults to 3.
        use_valid_flag (bool, optional): Whether to use `use_valid_flag` key
            in the info file as mask to filter gt_boxes and gt_names.
            Defaults to True.
    """

    # Only the 7 movable categories used by the nuScenes tracking benchmark.
    CLASSES = ["car", "truck", "bus", "trailer", "motorcycle", "bicycle", "pedestrian"]

    def __init__(
        self,
        ann_file,
        pipeline=None,
        pipeline_post=None,  # added post pipeline
        data_root=None,
        classes=None,
        load_interval=1,
        with_velocity=True,
        modality=None,
        box_type_3d="LiDAR",
        filter_empty_gt=True,
        test_mode=False,
        eval_version="detection_cvpr_2019",
        sample_mode="fixed_interval",  # added interval
        sample_interval=1,
        num_frames_per_sample=3,
        use_valid_flag=True,
        **kwargs,
    ):
        # Extra parameters controlling clip sampling.
        self.sample_mode = sample_mode
        self.sample_interval = sample_interval
        self.num_frames_per_sample = num_frames_per_sample
        if not test_mode:
            # Training uses one extra frame per clip.
            self.num_frames_per_sample += 1
        # Extra multi-frame post-processing pipeline. Always set the
        # attribute so prepare_train/test_data never hit AttributeError
        # when no post pipeline is configured.
        self.pipeline_post = (
            Compose(pipeline_post) if pipeline_post is not None else None
        )
        # Default init from nuscenes_dataset.py.
        super().__init__(
            data_root=data_root,
            ann_file=ann_file,
            pipeline=pipeline,
            classes=classes,
            modality=modality,
            box_type_3d=box_type_3d,
            filter_empty_gt=filter_empty_gt,
            test_mode=test_mode,
            with_velocity=with_velocity,
            eval_version=eval_version,
            load_interval=load_interval,
            use_valid_flag=use_valid_flag)

    def __len__(self):
        """Return the number of multi-frame samples.

        Returns:
            int: Number of clips that can start without running past the
                end of ``data_infos``.
        """
        # A clip of num_frames_per_sample frames spaced sample_interval
        # apart consumes (num_frames_per_sample - 1) * sample_interval
        # trailing indices that cannot serve as a clip start.
        self.num_samples = (
            len(self.data_infos)
            - (self.num_frames_per_sample - 1) * self.sample_interval
        )
        return self.num_samples

    def get_data_info(self, index):
        """Get data info according to the given index.

        Args:
            index (int): Index of the sample data to get.

        Returns:
            dict: Data information that will be passed to the data
                preprocessing pipelines. It includes the following keys:

                - sample_idx (str): Sample index.
                - pts_filename (str): Filename of point clouds.
                - sweeps (list[dict]): Infos of sweeps.
                - timestamp (float): Sample timestamp.
                - img_filename (str, optional): Image filename.
                - lidar2img (list[np.ndarray], optional): Transformations
                    from lidar to different cameras.
                - ann_info (dict): Annotation info.
        """
        info = self.data_infos[index]
        # standard protocol modified from SECOND.Pytorch
        input_dict = dict(
            sample_idx=info["token"],
            pts_filename=info["lidar_path"],
            sweeps=info["sweeps"],
            timestamp=info["timestamp"] / 1e6,
            radar=info["radars"],
        )
        l2e_r = info["lidar2ego_rotation"]
        l2e_t = info["lidar2ego_translation"]
        e2g_r = info["ego2global_rotation"]
        e2g_t = info["ego2global_translation"]
        l2e_r_mat = Quaternion(l2e_r).rotation_matrix
        e2g_r_mat = Quaternion(e2g_r).rotation_matrix
        l2g_r_mat = l2e_r_mat.T @ e2g_r_mat.T  # [3, 3]
        l2g_t = l2e_t @ e2g_r_mat.T + e2g_t  # [1, 3]
        # previously, for using R and t from info[''],
        # you should points @ info['lidar2ego_rotation'].T + info['lidar2ego_translation']
        # but in https://github.com/a1600012888/MUTR3D/blob/main/plugin/track/models/tracker.py#L209
        # I am directly calling points @ R + t, rather than points @ R.T + t.
        # so need some process metioned two lines above.
        input_dict.update(
            dict(l2g_r_mat=l2g_r_mat.astype(np.float32), l2g_t=l2g_t.astype(np.float32))
        )
        if self.modality["use_camera"]:
            image_paths = []
            lidar2img_rts = []
            intrinsics = []
            extrinsics = []
            for cam_type, cam_info in info["cams"].items():
                image_paths.append(cam_info["data_path"])
                # obtain lidar to image transformation matrix
                lidar2cam_r = np.linalg.inv(cam_info["sensor2lidar_rotation"])
                lidar2cam_t = cam_info["sensor2lidar_translation"] @ lidar2cam_r.T
                lidar2cam_rt = np.eye(4)
                lidar2cam_rt[:3, :3] = lidar2cam_r.T
                lidar2cam_rt[3, :3] = -lidar2cam_t
                intrinsic = cam_info["cam_intrinsic"]
                viewpad = np.eye(4)
                viewpad[: intrinsic.shape[0], : intrinsic.shape[1]] = intrinsic
                lidar2img_rt = viewpad @ lidar2cam_rt.T
                lidar2img_rts.append(lidar2img_rt)
                intrinsics.append(viewpad)
                extrinsics.append(lidar2cam_rt.T)
            input_dict.update(
                dict(
                    img_filename=image_paths,
                    lidar2img=lidar2img_rts,
                    intrinsic=intrinsics,
                    extrinsic=extrinsics,
                )
            )
        if not self.test_mode:
            annos = self.get_ann_info(index)
            # Attach per-object tracklet IDs, filtered by the same validity
            # mask that get_ann_info applies to the boxes.
            if self.use_valid_flag:
                mask = info["valid_flag"]
            else:
                mask = info["num_lidar_pts"] > 0
            # np.int was removed in NumPy 1.24; use an explicit integer dtype.
            instance_inds = np.array(info["instance_inds"], dtype=np.int64)[mask]
            annos["instance_inds"] = instance_inds
            input_dict["ann_info"] = annos
        return input_dict

    def _get_sample_range(self, start_idx):
        """Compute the ``range()`` arguments covering one clip.

        Args:
            start_idx (int): Index of the first frame of the clip.

        Returns:
            tuple[int, int, int]: (start, stop, step) covering
                ``num_frames_per_sample`` frames.
        """
        # take default sampling method for normal dataset.
        assert self.sample_mode in [
            "fixed_interval",
            "random_interval",
        ], "invalid sample mode: {}".format(self.sample_mode)
        if self.sample_mode == "fixed_interval":
            sample_interval = self.sample_interval
        elif self.sample_mode == "random_interval":
            # Uniformly pick an interval in [1, sample_interval].
            sample_interval = np.random.randint(1, self.sample_interval + 1)
        default_range = (
            start_idx,
            start_idx + (self.num_frames_per_sample - 1) * sample_interval + 1,
            sample_interval,
        )
        return default_range

    def prepare_train_data(self, index):
        """Prepare one multi-frame training sample.

        Args:
            index (int): Index of the first frame of the clip.

        Returns:
            dict | None: Per-key lists over the clip frames, processed by
                ``pipeline_post``; None when any frame has no usable GT.
        """
        # get the indexes of the starting and ending frames with an interval
        start, end, interval = self._get_sample_range(index)
        # loop through all frames requested
        ret = None
        for i in range(start, end, interval):
            # load data for one frame using the default single-frame
            # data loading pipeline
            data_i = super().prepare_train_data(i)
            # Drop the whole clip if any frame is filtered out (no GT).
            if data_i is None:
                return None
            # add instance indices for tracking GT
            data_i["instance_inds"] = data_i["ann_info"]["instance_inds"]
            # initialize results and add to dictionary
            if ret is None:
                ret = {key: [] for key in data_i.keys()}
            for key, value in data_i.items():
                ret[key].append(value)
        # add data bundle
        if self.pipeline_post is not None:
            ret = self.pipeline_post(ret)
        return ret

    def prepare_test_data(self, index):
        """Prepare one multi-frame sample for testing.

        Args:
            index (int): Index of the first frame of the clip.

        Returns:
            dict: Per-key lists over the clip frames, processed by
                ``pipeline_post``.
        """
        # get the indexes of the starting and ending frames with an interval
        start, end, interval = self._get_sample_range(index)
        # loop through all frames requested
        ret = None
        for i in range(start, end, interval):
            data_i = super().prepare_test_data(i)
            # initialize results and add to dictionary
            if ret is None:
                ret = {key: [] for key in data_i.keys()}
            for key, value in data_i.items():
                ret[key].append(value)
        # add data bundle
        if self.pipeline_post is not None:
            ret = self.pipeline_post(ret)
        return ret

    def _format_bbox(self, results, jsonfile_prefix=None):
        """Convert the results to the standard nuScenes tracking format.

        Args:
            results (list[dict]): Testing results of the dataset.
            jsonfile_prefix (str): The prefix of the output jsonfile.
                You can specify the output directory/filename by
                modifying the jsonfile_prefix. Default: None.

        Returns:
            str: Path of the output json file.
        """
        nusc_annos = {}
        mapped_class_names = self.CLASSES
        print("Start to convert detection format...")
        for sample_id, det in enumerate(mmcv.track_iter_progress(results)):
            annos = []
            sample_token = self.data_infos[sample_id]["token"]
            # A frame with no detections still needs an (empty) entry.
            if det is None:
                nusc_annos[sample_token] = annos
                continue
            boxes = output_to_nusc_box(det)
            boxes = lidar_nusc_box_to_global(
                self.data_infos[sample_id],
                boxes,
                mapped_class_names,
                self.eval_detection_configs,
                self.eval_version,
            )
            for i, box in enumerate(boxes):
                name = mapped_class_names[box.label]
                # Speed threshold of 0.2 m/s distinguishes moving objects
                # when choosing the attribute label.
                if np.sqrt(box.velocity[0] ** 2 + box.velocity[1] ** 2) > 0.2:
                    if name in [
                        "car",
                        "construction_vehicle",
                        "bus",
                        "truck",
                        "trailer",
                    ]:
                        attr = "vehicle.moving"
                    elif name in ["bicycle", "motorcycle"]:
                        attr = "cycle.with_rider"
                    else:
                        attr = NuScenesTrackDataset.DefaultAttribute[name]
                else:
                    if name in ["pedestrian"]:
                        attr = "pedestrian.standing"
                    elif name in ["bus"]:
                        attr = "vehicle.stopped"
                    else:
                        attr = NuScenesTrackDataset.DefaultAttribute[name]
                # NOTE(review): an earlier revision computed a height-shifted
                # center here but never used it; the submitted translation is
                # box.center as produced by lidar_nusc_box_to_global.
                nusc_anno = dict(
                    sample_token=sample_token,
                    translation=box.center.tolist(),
                    size=box.wlh.tolist(),
                    rotation=box.orientation.elements.tolist(),
                    velocity=box.velocity[:2].tolist(),
                    tracking_name=name,
                    attribute_name=attr,
                    tracking_score=box.score,
                    tracking_id=box.token,
                )
                annos.append(nusc_anno)
            nusc_annos[sample_token] = annos
        nusc_submissions = {
            "meta": self.modality,
            "results": nusc_annos,
        }
        mmcv.mkdir_or_exist(jsonfile_prefix)
        res_path = osp.join(jsonfile_prefix, "results_nusc.json")
        print("Results writes to", res_path)
        mmcv.dump(nusc_submissions, res_path)
        return res_path

    def _evaluate_single(
        self, result_path, logger=None, metric="bbox", result_name="pts_bbox"
    ):
        """Evaluation for a single model in nuScenes tracking protocol.

        Args:
            result_path (str): Path of the result file.
            logger (logging.Logger | str | None): Logger used for printing
                related information during evaluation. Default: None.
            metric (str): Metric name used for evaluation. Default: 'bbox'.
            result_name (str): Result name in the metric prefix.
                Default: 'pts_bbox'.

        Returns:
            dict: Dictionary of evaluation details.
        """
        from nuscenes.eval.common.config import config_factory as track_configs
        from nuscenes.eval.tracking.evaluate import TrackingEval

        output_dir = osp.join(*osp.split(result_path)[:-1])
        eval_set_map = {
            "v1.0-mini": "mini_val",
            "v1.0-trainval": "val",
        }
        cfg = track_configs("tracking_nips_2019")
        nusc_eval = TrackingEval(
            config=cfg,
            result_path=result_path,
            eval_set=eval_set_map[self.version],
            output_dir=output_dir,
            verbose=True,
            nusc_version=self.version,
            nusc_dataroot=self.data_root,
        )
        # main() also dumps metrics_summary.json into output_dir; we read
        # that file back rather than using the returned object.
        nusc_eval.main()
        # record metrics
        metrics = mmcv.load(osp.join(output_dir, "metrics_summary.json"))
        print(metrics)
        detail = dict()
        metric_prefix = f"{result_name}_NuScenes"
        keys = [
            "amota",
            "amotp",
            "recall",
            "motar",
            "gt",
            "mota",
            "motp",
            "mt",
            "ml",
            "faf",
            "tp",
            "fp",
            "fn",
            "ids",
            "frag",
            "tid",
            "lgd",
        ]
        for key in keys:
            detail["{}/{}".format(metric_prefix, key)] = metrics[key]
        return detail
class NuScenesTrackingBox(NuScenesBox):
    """A nuScenes box that carries a tracking identity in its ``token``."""

    def __init__(
        self,
        center: List[float],
        size: List[float],
        orientation: Quaternion,
        label: int = np.nan,
        score: float = np.nan,
        velocity: Tuple = (np.nan, np.nan, np.nan),
        name: str = None,
        token: str = None,
    ):
        """
        :param center: Center of box given as x, y, z.
        :param size: Size of box in width, length, height.
        :param orientation: Box orientation.
        :param label: Integer label, optional.
        :param score: Classification score, optional.
        :param velocity: Box velocity in x, y, z direction.
        :param name: Box name, optional. Can be used e.g. for denote category name.
        :param token: Unique string identifier from DB.
        """
        super().__init__(
            center, size, orientation, label, score, velocity, name, token
        )

    def rotate(self, quaternion: Quaternion) -> None:
        """Rotate the box (center, heading and velocity) by ``quaternion``."""
        # Hoist the rotation matrix so it is built only once.
        rot = quaternion.rotation_matrix
        self.center = np.dot(rot, self.center)
        self.orientation = quaternion * self.orientation
        self.velocity = np.dot(rot, self.velocity)

    def copy(self) -> "NuScenesTrackingBox":
        """Return a deep copy of this box."""
        return copy.deepcopy(self)
def output_to_nusc_box(detection):
    """Convert the output to the box class in the nuScenes.

    Args:
        detection (dict): Detection results.
            - boxes_3d (:obj:`BaseInstance3DBoxes`): Detection bbox.
            - scores_3d (torch.Tensor): Detection scores.
            - labels_3d (torch.Tensor): Predicted box labels.
        tracking (bool): if convert for tracking evaluation

    Returns:
        list[:obj:`NuScenesBox`]: List of NuScenesTrackingBoxes.
    """
    bboxes = detection["boxes_3d"]
    scores = detection["scores_3d"].numpy()
    # Prefer tracking scores over detection scores when present.
    if detection.get("track_scores") is not None:
        scores = detection["track_scores"].numpy()
    labels = detection["labels_3d"].numpy()
    track_ids = detection.get("track_ids")
    if track_ids is None:
        track_ids = [None] * len(bboxes)

    centers = bboxes.gravity_center.numpy()
    dims = bboxes.dims.numpy()
    # TODO: check whether this is necessary
    # with dir_offset & dir_limit in the head
    yaws = -bboxes.yaw.numpy() - np.pi / 2

    nusc_boxes = []
    for idx in range(len(bboxes)):
        heading = pyquaternion.Quaternion(axis=[0, 0, 1], radians=yaws[idx])
        # Columns 7:9 of the box tensor hold the planar velocity (vx, vy).
        vel = (*bboxes.tensor[idx, 7:9], 0.0)
        # velo_val = np.linalg.norm(box3d[i, 7:9])
        # velo_ori = box3d[i, 6]
        # velocity = (
        # velo_val * np.cos(velo_ori), velo_val * np.sin(velo_ori), 0.0)
        nusc_boxes.append(
            NuScenesTrackingBox(
                centers[idx],
                dims[idx],
                heading,
                label=labels[idx],
                score=scores[idx],
                velocity=vel,
                token=str(track_ids[idx]),
            )
        )
    return nusc_boxes
def _test():
    """Build a training dataset from an inline config and drop into IPython."""
    file_client_args = dict(backend="disk")
    point_cloud_range = [-51.2, -51.2, -5.0, 51.2, 51.2, 3.0]
    voxel_size = [0.2, 0.2, 8]
    img_norm_cfg = dict(
        mean=[103.530, 116.280, 123.675], std=[1.0, 1.0, 1.0], to_rgb=False
    )
    dataset_type = "NuScenesTrackDataset"
    data_root = "data/nuscenes/"
    class_names = [
        "car",
        "truck",
        "bus",
        "trailer",
        "motorcycle",
        "bicycle",
        "pedestrian",
    ]
    input_modality = dict(
        use_lidar=True,
        use_camera=True,
        use_radar=False,
        use_map=False,
        use_external=False,
    )
    # Per-frame pipeline: point/image loading, GT filtering, normalization.
    train_pipeline = [
        dict(
            type="LoadPointsFromFile",
            coord_type="LIDAR",
            load_dim=5,
            use_dim=5,
            file_client_args=file_client_args,
        ),
        dict(type="LoadMultiViewImageFromFiles"),
        dict(
            type="LoadPointsFromMultiSweeps",
            sweeps_num=1,
            use_dim=[0, 1, 2, 3, 4],
            file_client_args=file_client_args,
            pad_empty_sweeps=True,
            remove_close=True,
        ),
        dict(type="LoadAnnotations3D", with_bbox_3d=True, with_label_3d=True),
        dict(type="ObjectRangeFilter", point_cloud_range=point_cloud_range),
        dict(type="ObjectNameFilter", classes=class_names),
        dict(type="Normalize3D", **img_norm_cfg),
        dict(type="Pad3D", size_divisor=32),
    ]
    # Post pipeline: bundle the per-frame results into one tracking sample.
    train_pipeline_post = [
        dict(type="FormatBundle3DTrack", class_names=class_names),
        dict(type="Collect3D", keys=["points", "gt_bboxes_3d", "gt_labels_3d", "img"]),
    ]
    train_cfg = dict(
        type=dataset_type,
        data_root=data_root,
        ann_file=data_root + "track_infos_train.pkl",  # this is user generated
        pipeline=train_pipeline,
        pipeline_post=train_pipeline_post,
        classes=class_names,
        modality=input_modality,
        test_mode=False,
        use_valid_flag=True,
        box_type_3d="LiDAR",
    )
    data = dict(
        samples_per_gpu=1,
        workers_per_gpu=4,
        train=train_cfg,
    )

    from mmdet3d.datasets import build_dataset
    from plugin.track.pipeline import FormatBundle3DTrack

    dataset = build_dataset(data["train"])

    from IPython import embed
    embed()
# Allow running this module directly as a quick interactive smoke test.
if __name__ == "__main__":
    _test()