| import copy |
| from os import path as osp |
| from typing import List, Tuple |
|
|
| import mmcv |
| import numpy as np |
| import pyquaternion |
| from mmdet3d.datasets.nuscenes_dataset import (NuScenesDataset, |
| lidar_nusc_box_to_global) |
| from mmdet3d.datasets.pipelines import Compose |
| from mmdet.datasets import DATASETS |
| from nuscenes.utils.data_classes import Box as NuScenesBox |
| from pyquaternion import Quaternion |
|
|
|
|
@DATASETS.register_module()
class NuScenesTrackDataset(NuScenesDataset):
    r"""NuScenes dataset API for multi-object tracking experiments.

    Unlike the plain detection dataset, each sample produced here is a short
    *clip* of ``num_frames_per_sample`` keyframes (spaced by
    ``sample_interval``) so that temporal/tracking models can be trained and
    evaluated on sequences.

    Please refer to `NuScenes Dataset <https://www.nuscenes.org/download>`_
    for data downloading.

    Args:
        ann_file (str): Path of annotation file.
        pipeline (list[dict], optional): Per-frame pipeline used for data
            processing. Defaults to None.
        pipeline_post (list[dict], optional): Pipeline applied to the whole
            clip after every frame has gone through ``pipeline`` (e.g.
            formatting / collecting transforms). Defaults to None.
        data_root (str): Path of dataset root.
        classes (tuple[str], optional): Classes used in the dataset.
            Defaults to None.
        load_interval (int, optional): Interval of loading the dataset. It is
            used to uniformly sample the dataset. Defaults to 1.
        with_velocity (bool, optional): Whether include velocity prediction
            into the experiments. Defaults to True.
        modality (dict, optional): Modality to specify the sensor data used
            as input. Defaults to None.
        box_type_3d (str, optional): Type of 3D box of this dataset.
            Based on the `box_type_3d`, the dataset will encapsulate the box
            to its original format then converted them to `box_type_3d`.
            Defaults to 'LiDAR' in this dataset. Available options includes.
            - 'LiDAR': Box in LiDAR coordinates.
            - 'Depth': Box in depth coordinates, usually for indoor dataset.
            - 'Camera': Box in camera coordinates.
        filter_empty_gt (bool, optional): Whether to filter empty GT.
            Defaults to True.
        test_mode (bool, optional): Whether the dataset is in test mode.
            Defaults to False.
        eval_version (str, optional): Configuration version of evaluation.
            Defaults to 'detection_cvpr_2019'.
        sample_mode (str, optional): Either ``'fixed_interval'`` or
            ``'random_interval'``; controls how the frames of a clip are
            spaced. Defaults to 'fixed_interval'.
        sample_interval (int, optional): (Maximum) spacing between two
            consecutive frames of a clip. Defaults to 1.
        num_frames_per_sample (int, optional): Number of frames per clip.
            One extra frame is loaded in training mode. Defaults to 3.
        use_valid_flag (bool, optional): Whether to use `use_valid_flag` key
            in the info file as mask to filter gt_boxes and gt_names.
            Defaults to True.
    """

    CLASSES = ["car", "truck", "bus", "trailer", "motorcycle", "bicycle", "pedestrian"]

    def __init__(
        self,
        ann_file,
        pipeline=None,
        pipeline_post=None,
        data_root=None,
        classes=None,
        load_interval=1,
        with_velocity=True,
        modality=None,
        box_type_3d="LiDAR",
        filter_empty_gt=True,
        test_mode=False,
        eval_version="detection_cvpr_2019",
        sample_mode="fixed_interval",
        sample_interval=1,
        num_frames_per_sample=3,
        use_valid_flag=True,
        **kwargs,
    ):
        self.sample_mode = sample_mode
        self.sample_interval = sample_interval
        self.num_frames_per_sample = num_frames_per_sample
        # Training consumes one extra frame per clip (supervision for the
        # last predicted frame).
        if not test_mode:
            self.num_frames_per_sample += 1

        # Always define the attribute so prepare_train/test_data can test it
        # instead of raising AttributeError when no post pipeline is given.
        if pipeline_post is not None:
            self.pipeline_post = Compose(pipeline_post)
        else:
            self.pipeline_post = None

        super().__init__(
            data_root=data_root,
            ann_file=ann_file,
            pipeline=pipeline,
            classes=classes,
            modality=modality,
            box_type_3d=box_type_3d,
            filter_empty_gt=filter_empty_gt,
            test_mode=test_mode,
            with_velocity=with_velocity,
            eval_version=eval_version,
            load_interval=load_interval,
            use_valid_flag=use_valid_flag)

    def __len__(self):
        """Return the number of clips that can be sampled.

        Every clip needs ``(num_frames_per_sample - 1) * sample_interval``
        trailing frames, so the count of valid start indices is smaller than
        the raw number of frames.

        Returns:
            int: Number of valid clip start indices.
        """
        self.num_samples = (
            len(self.data_infos)
            - (self.num_frames_per_sample - 1) * self.sample_interval
        )
        return self.num_samples

    def get_data_info(self, index):
        """Get data info according to the given index.

        Args:
            index (int): Index of the sample data to get.

        Returns:
            dict: Data information that will be passed to the data
                preprocessing pipelines. It includes the following keys:

                - sample_idx (str): Sample index.
                - pts_filename (str): Filename of point clouds.
                - sweeps (list[dict]): Infos of sweeps.
                - timestamp (float): Sample timestamp.
                - radar (dict): Radar sweep infos.
                - l2g_r_mat (np.ndarray): Lidar-to-global rotation.
                - l2g_t (np.ndarray): Lidar-to-global translation.
                - img_filename (str, optional): Image filename.
                - lidar2img (list[np.ndarray], optional): Transformations
                    from lidar to different cameras.
                - ann_info (dict): Annotation info.
        """
        info = self.data_infos[index]

        input_dict = dict(
            sample_idx=info["token"],
            pts_filename=info["lidar_path"],
            sweeps=info["sweeps"],
            # Info files store timestamps in microseconds; expose seconds.
            timestamp=info["timestamp"] / 1e6,
            radar=info["radars"],
        )

        # Compose lidar->ego and ego->global into a single lidar->global
        # transform. NOTE(review): the transposes indicate row-vector
        # convention (p @ R + t) — confirm against consumers of l2g_r_mat.
        l2e_r = info["lidar2ego_rotation"]
        l2e_t = info["lidar2ego_translation"]
        e2g_r = info["ego2global_rotation"]
        e2g_t = info["ego2global_translation"]
        l2e_r_mat = Quaternion(l2e_r).rotation_matrix
        e2g_r_mat = Quaternion(e2g_r).rotation_matrix

        l2g_r_mat = l2e_r_mat.T @ e2g_r_mat.T
        l2g_t = l2e_t @ e2g_r_mat.T + e2g_t

        input_dict.update(
            dict(l2g_r_mat=l2g_r_mat.astype(np.float32), l2g_t=l2g_t.astype(np.float32))
        )

        if self.modality["use_camera"]:
            image_paths = []
            lidar2img_rts = []
            intrinsics = []
            extrinsics = []
            for cam_info in info["cams"].values():
                image_paths.append(cam_info["data_path"])
                # Build the 4x4 lidar->camera matrix (row-vector layout),
                # then pre-multiply by the padded intrinsic matrix to get
                # the lidar->image projection.
                lidar2cam_r = np.linalg.inv(cam_info["sensor2lidar_rotation"])
                lidar2cam_t = cam_info["sensor2lidar_translation"] @ lidar2cam_r.T
                lidar2cam_rt = np.eye(4)
                lidar2cam_rt[:3, :3] = lidar2cam_r.T
                lidar2cam_rt[3, :3] = -lidar2cam_t
                intrinsic = cam_info["cam_intrinsic"]
                viewpad = np.eye(4)
                viewpad[: intrinsic.shape[0], : intrinsic.shape[1]] = intrinsic
                lidar2img_rt = viewpad @ lidar2cam_rt.T
                lidar2img_rts.append(lidar2img_rt)
                intrinsics.append(viewpad)
                extrinsics.append(lidar2cam_rt.T)

            input_dict.update(
                dict(
                    img_filename=image_paths,
                    lidar2img=lidar2img_rts,
                    intrinsic=intrinsics,
                    extrinsic=extrinsics,
                )
            )

        if not self.test_mode:
            annos = self.get_ann_info(index)
            # Filter instance ids with the same mask get_ann_info applies to
            # the boxes so the two stay aligned.
            if self.use_valid_flag:
                mask = info["valid_flag"]
            else:
                mask = info["num_lidar_pts"] > 0
            # np.int was deprecated in NumPy 1.20 and removed in 1.24; the
            # builtin int maps to the platform default integer dtype.
            instance_inds = np.array(info["instance_inds"], dtype=int)[mask]
            annos["instance_inds"] = instance_inds
            input_dict["ann_info"] = annos

        return input_dict

    def _get_sample_range(self, start_idx):
        """Compute the ``range()`` arguments covering one clip.

        Args:
            start_idx (int): Index of the first frame of the clip.

        Returns:
            tuple[int, int, int]: ``(start, stop, step)`` for ``range``.
        """
        assert self.sample_mode in [
            "fixed_interval",
            "random_interval",
        ], "invalid sample mode: {}".format(self.sample_mode)
        if self.sample_mode == "fixed_interval":
            sample_interval = self.sample_interval
        elif self.sample_mode == "random_interval":
            # Randomize the frame spacing in [1, sample_interval] as a
            # temporal augmentation.
            sample_interval = np.random.randint(1, self.sample_interval + 1)
        default_range = (
            start_idx,
            start_idx + (self.num_frames_per_sample - 1) * sample_interval + 1,
            sample_interval,
        )
        return default_range

    def prepare_train_data(self, index):
        """Prepare a training clip starting at ``index``.

        Args:
            index (int): Index of the first frame of the clip.

        Returns:
            dict | None: Clip data where every value is a per-frame list, or
                None when any frame of the clip is invalid (so the sampler
                can retry with another index).
        """
        start, end, interval = self._get_sample_range(index)

        ret = None
        for i in range(start, end, interval):
            data_i = super().prepare_train_data(i)
            # Drop the whole clip if a single frame was filtered out.
            if data_i is None:
                return None
            data_i["instance_inds"] = data_i["ann_info"]["instance_inds"]

            if ret is None:
                ret = {key: [] for key in data_i.keys()}
            for key, value in data_i.items():
                ret[key].append(value)

        if self.pipeline_post is not None:
            ret = self.pipeline_post(ret)
        return ret

    def prepare_test_data(self, index):
        """Prepare a test clip starting at ``index``.

        Args:
            index (int): Index of the first frame of the clip.

        Returns:
            dict: Clip data where every value is a per-frame list.
        """
        start, end, interval = self._get_sample_range(index)

        ret = None
        for i in range(start, end, interval):
            data_i = super().prepare_test_data(i)

            if ret is None:
                ret = {key: [] for key in data_i.keys()}
            for key, value in data_i.items():
                ret[key].append(value)

        if self.pipeline_post is not None:
            ret = self.pipeline_post(ret)
        return ret

    def _format_bbox(self, results, jsonfile_prefix=None):
        """Convert the results to the standard nuScenes tracking format.

        Args:
            results (list[dict]): Testing results of the dataset, aligned
                with ``self.data_infos`` (one entry per sample).
            jsonfile_prefix (str): The prefix of the output jsonfile.
                You can specify the output directory/filename by
                modifying the jsonfile_prefix. Default: None.

        Returns:
            str: Path of the output json file.
        """
        nusc_annos = {}
        mapped_class_names = self.CLASSES

        print("Start to convert detection format...")
        for sample_id, det in enumerate(mmcv.track_iter_progress(results)):
            annos = []
            sample_token = self.data_infos[sample_id]["token"]
            if det is None:
                nusc_annos[sample_token] = annos
                continue
            boxes = output_to_nusc_box(det)
            boxes = lidar_nusc_box_to_global(
                self.data_infos[sample_id],
                boxes,
                mapped_class_names,
                self.eval_detection_configs,
                self.eval_version,
            )
            for i, box in enumerate(boxes):
                name = mapped_class_names[box.label]
                # Attribute selection follows the official nuScenes rule:
                # a box moving faster than 0.2 m/s is "moving"/"with_rider",
                # otherwise the class default (or standing/stopped) is used.
                if np.sqrt(box.velocity[0] ** 2 + box.velocity[1] ** 2) > 0.2:
                    if name in [
                        "car",
                        "construction_vehicle",
                        "bus",
                        "truck",
                        "trailer",
                    ]:
                        attr = "vehicle.moving"
                    elif name in ["bicycle", "motorcycle"]:
                        attr = "cycle.with_rider"
                    else:
                        attr = NuScenesTrackDataset.DefaultAttribute[name]
                else:
                    if name in ["pedestrian"]:
                        attr = "pedestrian.standing"
                    elif name in ["bus"]:
                        attr = "vehicle.stopped"
                    else:
                        attr = NuScenesTrackDataset.DefaultAttribute[name]

                nusc_anno = dict(
                    sample_token=sample_token,
                    translation=box.center.tolist(),
                    size=box.wlh.tolist(),
                    rotation=box.orientation.elements.tolist(),
                    velocity=box.velocity[:2].tolist(),
                    tracking_name=name,
                    attribute_name=attr,
                    tracking_score=box.score,
                    tracking_id=box.token,
                )
                annos.append(nusc_anno)
            nusc_annos[sample_token] = annos
        nusc_submissions = {
            "meta": self.modality,
            "results": nusc_annos,
        }

        mmcv.mkdir_or_exist(jsonfile_prefix)
        res_path = osp.join(jsonfile_prefix, "results_nusc.json")
        print("Results writes to", res_path)
        mmcv.dump(nusc_submissions, res_path)
        return res_path

    def _evaluate_single(
        self, result_path, logger=None, metric="bbox", result_name="pts_bbox"
    ):
        """Evaluation for a single model in the nuScenes tracking protocol.

        Args:
            result_path (str): Path of the result file.
            logger (logging.Logger | str | None): Logger used for printing
                related information during evaluation. Default: None.
            metric (str): Metric name used for evaluation. Default: 'bbox'.
            result_name (str): Result name in the metric prefix.
                Default: 'pts_bbox'.

        Returns:
            dict: Dictionary of evaluation details.
        """
        from nuscenes.eval.common.config import config_factory as track_configs
        from nuscenes.eval.tracking.evaluate import TrackingEval

        output_dir = osp.join(*osp.split(result_path)[:-1])

        eval_set_map = {
            "v1.0-mini": "mini_val",
            "v1.0-trainval": "val",
        }

        cfg = track_configs("tracking_nips_2019")
        nusc_eval = TrackingEval(
            config=cfg,
            result_path=result_path,
            eval_set=eval_set_map[self.version],
            output_dir=output_dir,
            verbose=True,
            nusc_version=self.version,
            nusc_dataroot=self.data_root,
        )
        # main() writes metrics_summary.json into output_dir; we read the
        # summary back from disk rather than using its return value.
        nusc_eval.main()

        metrics = mmcv.load(osp.join(output_dir, "metrics_summary.json"))
        print(metrics)
        detail = dict()
        metric_prefix = f"{result_name}_NuScenes"
        keys = [
            "amota",
            "amotp",
            "recall",
            "motar",
            "gt",
            "mota",
            "motp",
            "mt",
            "ml",
            "faf",
            "tp",
            "fp",
            "fn",
            "ids",
            "frag",
            "tid",
            "lgd",
        ]
        for key in keys:
            detail["{}/{}".format(metric_prefix, key)] = metrics[key]
        return detail
|
|
|
|
class NuScenesTrackingBox(NuScenesBox):
    """A :class:`NuScenesBox` that carries a tracking identity.

    Behaves like the plain nuScenes box, but :meth:`rotate` also rotates the
    velocity vector so the box can be transformed between coordinate frames
    without invalidating it, and :meth:`copy` yields a deep copy.
    """

    def __init__(
        self,
        center: List[float],
        size: List[float],
        orientation: Quaternion,
        label: int = np.nan,
        score: float = np.nan,
        velocity: Tuple = (np.nan, np.nan, np.nan),
        name: str = None,
        token: str = None,
    ):
        """
        :param center: Center of box given as x, y, z.
        :param size: Size of box in width, length, height.
        :param orientation: Box orientation.
        :param label: Integer label, optional.
        :param score: Classification score, optional.
        :param velocity: Box velocity in x, y, z direction.
        :param name: Box name, optional. Can be used e.g. for denote category name.
        :param token: Unique string identifier from DB.
        """
        super().__init__(
            center, size, orientation, label, score, velocity, name, token
        )

    def rotate(self, quaternion: Quaternion) -> None:
        """Rotate center, orientation and velocity in place."""
        rot = quaternion.rotation_matrix
        self.center = np.dot(rot, self.center)
        self.velocity = np.dot(rot, self.velocity)
        self.orientation = quaternion * self.orientation

    def copy(self) -> "NuScenesTrackingBox":
        """Return an independent deep copy of this box."""
        return copy.deepcopy(self)
|
|
|
|
def output_to_nusc_box(detection):
    """Convert network output to nuScenes tracking boxes.

    Args:
        detection (dict): Detection results with keys:

            - boxes_3d (:obj:`BaseInstance3DBoxes`): Detection bbox.
            - scores_3d (torch.Tensor): Detection scores.
            - labels_3d (torch.Tensor): Predicted box labels.
            - track_scores (torch.Tensor, optional): Tracking scores; when
              present and not None they replace ``scores_3d``.
            - track_ids (optional): Per-box track identifiers; when absent
              every box gets the token ``'None'``.

    Returns:
        list[:obj:`NuScenesTrackingBox`]: List of tracking boxes.
    """
    box3d = detection["boxes_3d"]
    scores = detection["scores_3d"].numpy()
    # Tracking scores take precedence over plain detection scores.
    if detection.get("track_scores") is not None:
        scores = detection["track_scores"].numpy()
    labels = detection["labels_3d"].numpy()

    track_ids = detection.get("track_ids")
    if track_ids is None:
        track_ids = [None] * len(box3d)

    box_gravity_center = box3d.gravity_center.numpy()
    box_dims = box3d.dims.numpy()
    box_yaw = box3d.yaw.numpy()
    # Convert yaw from the mmdet3d LiDAR convention to the nuScenes global
    # convention (negate and offset by -pi/2).
    box_yaw = -box_yaw - np.pi / 2

    box_list = []
    for i in range(len(box3d)):
        quat = pyquaternion.Quaternion(axis=[0, 0, 1], radians=box_yaw[i])
        # Columns 7:9 of the box tensor hold the (vx, vy) velocity;
        # vz is assumed zero.
        velocity = (*box3d.tensor[i, 7:9], 0.0)
        box = NuScenesTrackingBox(
            box_gravity_center[i],
            box_dims[i],
            quat,
            label=labels[i],
            score=scores[i],
            velocity=velocity,
            token=str(track_ids[i]),
        )
        box_list.append(box)
    return box_list
|
|
|
|
def _test():
    """Build a ``NuScenesTrackDataset`` from an inline config and drop into
    an interactive shell for manual inspection."""
    file_client_args = {"backend": "disk"}
    point_cloud_range = [-51.2, -51.2, -5.0, 51.2, 51.2, 3.0]
    voxel_size = [0.2, 0.2, 8]

    img_norm_cfg = {
        "mean": [103.530, 116.280, 123.675],
        "std": [1.0, 1.0, 1.0],
        "to_rgb": False,
    }
    dataset_type = "NuScenesTrackDataset"
    data_root = "data/nuscenes/"
    class_names = [
        "car",
        "truck",
        "bus",
        "trailer",
        "motorcycle",
        "bicycle",
        "pedestrian",
    ]
    input_modality = {
        "use_lidar": True,
        "use_camera": True,
        "use_radar": False,
        "use_map": False,
        "use_external": False,
    }
    train_pipeline = [
        {
            "type": "LoadPointsFromFile",
            "coord_type": "LIDAR",
            "load_dim": 5,
            "use_dim": 5,
            "file_client_args": file_client_args,
        },
        {"type": "LoadMultiViewImageFromFiles"},
        {
            "type": "LoadPointsFromMultiSweeps",
            "sweeps_num": 1,
            "use_dim": [0, 1, 2, 3, 4],
            "file_client_args": file_client_args,
            "pad_empty_sweeps": True,
            "remove_close": True,
        },
        {"type": "LoadAnnotations3D", "with_bbox_3d": True, "with_label_3d": True},
        {"type": "ObjectRangeFilter", "point_cloud_range": point_cloud_range},
        {"type": "ObjectNameFilter", "classes": class_names},
        {"type": "Normalize3D", **img_norm_cfg},
        {"type": "Pad3D", "size_divisor": 32},
    ]

    train_pipeline_post = [
        {"type": "FormatBundle3DTrack", "class_names": class_names},
        {"type": "Collect3D", "keys": ["points", "gt_bboxes_3d", "gt_labels_3d", "img"]},
    ]

    data = {
        "samples_per_gpu": 1,
        "workers_per_gpu": 4,
        "train": {
            "type": dataset_type,
            "data_root": data_root,
            "ann_file": data_root + "track_infos_train.pkl",
            "pipeline": train_pipeline,
            "pipeline_post": train_pipeline_post,
            "classes": class_names,
            "modality": input_modality,
            "test_mode": False,
            "use_valid_flag": True,
            "box_type_3d": "LiDAR",
        },
    }

    from mmdet3d.datasets import build_dataset

    # Imported for its registration side effect: makes FormatBundle3DTrack
    # available to the pipeline registry before build_dataset runs.
    from plugin.track.pipeline import FormatBundle3DTrack

    dataset = build_dataset(data["train"])

    from IPython import embed

    embed()


if __name__ == "__main__":
    _test()
|
|