| |
from os import path as osp

import mmcv
import numpy as np
import pyquaternion
import torch

import mmdet3d.datasets.nuscenes_dataset as nuscenes_utils
from mmdet3d.core.bbox import LiDARInstance3DBoxes
from mmdet3d.datasets.builder import DATASETS
from mmdet3d.datasets.nuscenes_dataset import NuScenesDataset
from mmdet3d_plugin.eval.detection.config import config_factory
|
|
|
|
@DATASETS.register_module()
class CarlaChallengeDataset(NuScenesDataset):
    """Carla Dataset.

    This class serves as the API for experiments on the Carla Dataset.
    Annotations are stored in a nuScenes-style info pkl (``infos`` +
    ``metadata``), and detection results are exported in the nuScenes
    submission format.

    Args:
        ann_file (str): Path of the annotation pkl file.
        pipeline (list[dict], optional): Pipeline used for data processing.
            Defaults to None.
        data_root (str, optional): Path of dataset root. Defaults to None.
        classes (tuple[str], optional): Classes used in the dataset.
            Defaults to None.
        load_interval (int, optional): Sub-sampling interval applied to the
            sorted infos. Defaults to 1 (keep every sample).
        with_velocity (bool, optional): Whether boxes carry velocity.
            Defaults to True.
        modality (dict, optional): Modality to specify the sensor data used
            as input. Defaults to None (lidar only).
        box_type_3d (str, optional): Type of 3D box of this dataset.
            Based on the `box_type_3d`, the dataset will encapsulate the box
            to its original format then converted them to `box_type_3d`.
            Defaults to 'LiDAR' in this dataset. Available options includes

            - 'LiDAR': box in LiDAR coordinates
            - 'Depth': box in depth coordinates, usually for indoor dataset
            - 'Camera': box in camera coordinates
        filter_empty_gt (bool, optional): Whether to filter empty GT.
            Defaults to True.
        test_mode (bool, optional): Whether the dataset is in test mode.
            Defaults to False.
        eval_version (str, optional): Name of the detection evaluation
            protocol. Defaults to 'detection_cvpr_2019'.
        use_valid_flag (bool, optional): Whether to use the valid flag from
            the info file when filtering GT. Defaults to False.
    """

    CLASSES = ("Car", "Motorcycle", "Cyclist", "Pedestrian")

    def __init__(
        self,
        ann_file,
        pipeline=None,
        data_root=None,
        classes=None,
        load_interval=1,
        with_velocity=True,
        modality=None,
        box_type_3d="LiDAR",
        filter_empty_gt=True,
        test_mode=False,
        eval_version="detection_cvpr_2019",
        use_valid_flag=False,
    ):
        # Set these before super().__init__: the base constructor triggers
        # load_annotations()/get_ann_info(), which read them.
        self.load_interval = load_interval
        self.use_valid_flag = use_valid_flag
        super().__init__(
            data_root=data_root,
            ann_file=ann_file,
            pipeline=pipeline,
            classes=classes,
            modality=modality,
            box_type_3d=box_type_3d,
            filter_empty_gt=filter_empty_gt,
            test_mode=test_mode,
        )

        self.with_velocity = with_velocity
        # BUGFIX: `eval_version` was accepted but never stored, so
        # _format_bbox() always saw the base-class default regardless of
        # what the caller passed. Store it explicitly.
        self.eval_version = eval_version
        # Plugin-specific evaluation config (replaces the nuScenes one set
        # by the base class).
        self.eval_detection_configs = config_factory()

        if self.modality is None:
            self.modality = dict(
                use_camera=False,
                use_lidar=True,
                use_radar=False,
                use_map=False,
                use_external=False,
            )

    def load_annotations(self, ann_file):
        """Load annotations from ann_file.

        Args:
            ann_file (str): Path of the annotation file.

        Returns:
            list[dict]: List of annotations sorted by timestamps.
        """
        data = mmcv.load(ann_file, file_format="pkl")
        data_infos = list(sorted(data["infos"], key=lambda e: e["timestamp"]))
        data_infos = data_infos[:: self.load_interval]
        self.metadata = data["metadata"]
        # Hard-coded so downstream nuScenes tooling accepts the metadata.
        self.version = "v1.0-trainval"
        return data_infos

    def get_data_info(self, index):
        """Get data info according to the given index.

        Args:
            index (int): Index of the sample data to get.

        Returns:
            dict: Data information that will be passed to the data
                preprocessing pipelines. It includes the following keys:

                - sample_idx (str): Sample index.
                - pts_filename (str): Filename of point clouds.
                - sweeps (list[dict]): Infos of sweeps.
                - timestamp (float): Sample timestamp.
                - img (list[np.ndarray], optional): Loaded images.
                - img_filename (str, optional): Image filename.
                - lidar2img (list[np.ndarray], optional): Transformations
                    from lidar to different cameras.
                - ann_info (dict): Annotation info.
        """
        info = self.data_infos[index]

        input_dict = dict(
            sample_idx=info["token"],
            pts_filename=info["lidar_path"],
            sweeps=info["sweeps"],
            timestamp=int(info["timestamp"]),
        )

        if self.modality["use_camera"]:
            image_paths = []
            lidar2img_rts = []
            images = []

            for cam_type in ["left", "front", "right", "tele"]:
                cam_info = info["cams"][cam_type]
                image_paths.append(cam_info["data_path"])
                # Images are eagerly loaded here so the raw arrays travel
                # with their projection matrices through the pipeline.
                image = mmcv.imread(cam_info["data_path"], "unchanged")
                images.append(image)

                # Build the lidar -> image projection from the stored
                # sensor-to-lidar extrinsics (quaternion + translation).
                sensor2lidar_rotation: np.ndarray = cam_info[
                    "sensor2lidar_rotation"
                ]
                sensor2lidar_rotation_mat: np.ndarray = pyquaternion.Quaternion(
                    sensor2lidar_rotation
                ).rotation_matrix

                lidar2cam_r = np.linalg.inv(sensor2lidar_rotation_mat)
                lidar2cam_t = cam_info["sensor2lidar_translation"] @ lidar2cam_r.T
                lidar2cam_rt = np.eye(4)
                lidar2cam_rt[:3, :3] = lidar2cam_r.T
                lidar2cam_rt[3, :3] = -lidar2cam_t
                # Axis permutation between the lidar frame and the camera
                # optical frame (x-right, y-down, z-forward).
                swap_mat = np.asarray(
                    [[0, 1, 0, 0], [0, 0, -1, 0], [1, 0, 0, 0], [0, 0, 0, 1]]
                )
                intrinsic = cam_info["cam_intrinsic"]
                viewpad = np.eye(4)
                viewpad[: intrinsic.shape[0], : intrinsic.shape[1]] = intrinsic
                lidar2img_rt = viewpad @ swap_mat @ lidar2cam_rt.T
                lidar2img_rts.append(lidar2img_rt)

            input_dict.update(
                dict(
                    img=images,
                    # (H, W, C) of the first image plus the camera count;
                    # all cameras are assumed to share one resolution.
                    img_shape=images[0].shape + (len(images),),
                    img_filename=image_paths,
                    lidar2img=lidar2img_rts,
                )
            )

        if not self.test_mode:
            annos = self.get_ann_info(index)

            # Auxiliary supervision signals for the driving task, encoded
            # as int32 arrays of shape (1,).
            annos["light_hazard"] = np.array([info["traffic_light"]["hazard"]]).astype(
                "int32"
            )
            annos["light_inrange"] = np.array(
                [info["traffic_light"]["in_range"]]
            ).astype("int32")
            annos["sign_inrange"] = np.array([info["stop_sign"]["in_range"]]).astype(
                "int32"
            )
            input_dict["ann_info"] = annos

        return input_dict

    def add_traffic(self, input_dict, info):
        """Append traffic-light and stop-sign boxes to the annotations.

        Boxes, names and valid flags from ``info['traffic_light']`` and
        ``info['stop_sign']`` are concatenated, filtered by the valid
        flags, converted to this dataset's box type and merged into
        ``input_dict['ann_info']`` in place.

        Args:
            input_dict (dict): Data info that already contains ``ann_info``
                (as produced by :meth:`get_data_info`).
            info (dict): Raw info entry holding the traffic annotations.
        """
        gt_boxes = np.concatenate(
            (info["traffic_light"]["gt_boxes"], info["stop_sign"]["gt_boxes"]))

        gt_names = np.concatenate(
            (info["traffic_light"]["gt_names"], info["stop_sign"]["gt_names"]))

        mask = np.concatenate((
            info["traffic_light"]["valid_flag"], info["stop_sign"]["valid_flag"]))

        gt_bboxes_3d = gt_boxes[mask]
        gt_names_3d = gt_names[mask]
        gt_labels_3d = []
        for cat in gt_names_3d:
            if cat in self.CLASSES:
                gt_labels_3d.append(self.CLASSES.index(cat))
            else:
                # Unknown category: mark as ignore (-1).
                gt_labels_3d.append(-1)
        gt_labels_3d = np.array(gt_labels_3d)
        # Pad two zero columns (vx, vy) so box_dim matches the
        # with-velocity layout used by the regular GT boxes.
        gt_bboxes_3d = np.concatenate(
            (gt_bboxes_3d, np.zeros((gt_bboxes_3d.shape[0], 2))), -1)
        gt_bboxes_3d = LiDARInstance3DBoxes(
            gt_bboxes_3d,
            box_dim=gt_bboxes_3d.shape[-1],
            origin=(0.5, 0.5, 0.5)).convert_to(self.box_mode_3d)
        annos = input_dict["ann_info"]
        annos["gt_bboxes_3d"].tensor = torch.cat(
            (annos["gt_bboxes_3d"].tensor, gt_bboxes_3d.tensor), 0)
        # BUGFIX: np.int was removed in NumPy 1.24; use np.int64 instead.
        annos["gt_labels_3d"] = np.concatenate(
            (annos["gt_labels_3d"],
             gt_labels_3d.astype(annos["gt_labels_3d"].dtype)), 0).astype(np.int64)
        annos["gt_names"] = np.concatenate(
            (annos["gt_names"], gt_names_3d), 0)

    def _format_bbox(self, results, jsonfile_prefix=None):
        """Convert detection results to the nuScenes submission format.

        Modified from the base class to drop per-box attribute names (Carla
        has no attribute taxonomy; a placeholder string is written).

        Args:
            results (list[dict]): Per-sample detection results.
            jsonfile_prefix (str, optional): Directory the result json is
                written to (created if missing).

        Returns:
            str: Path of the written ``results_nusc.json``.
        """
        nusc_annos = {}
        mapped_class_names = self.CLASSES

        print("Start to convert detection format...")
        for sample_id, det in enumerate(mmcv.track_iter_progress(results)):
            annos = []
            boxes = nuscenes_utils.output_to_nusc_box(det, self.with_velocity)
            sample_token = self.data_infos[sample_id]["token"]
            boxes = nuscenes_utils.lidar_nusc_box_to_global(
                self.data_infos[sample_id],
                boxes,
                mapped_class_names,
                self.eval_detection_configs,
                self.eval_version,
            )
            for i, box in enumerate(boxes):
                name = mapped_class_names[box.label]
                nusc_anno = dict(
                    sample_token=sample_token,
                    translation=box.center.tolist(),
                    size=box.wlh.tolist(),
                    rotation=box.orientation.elements.tolist(),
                    velocity=box.velocity[:2].tolist(),
                    detection_name=name,
                    detection_score=box.score,
                    # Placeholder: the Carla taxonomy defines no attributes.
                    attribute_name="no attribute",
                )
                annos.append(nusc_anno)
            nusc_annos[sample_token] = annos
        nusc_submissions = {
            "meta": self.modality,
            "results": nusc_annos,
        }

        mmcv.mkdir_or_exist(jsonfile_prefix)
        res_path = osp.join(jsonfile_prefix, "results_nusc.json")
        print("Results writes to", res_path)
        mmcv.dump(nusc_submissions, res_path)
        return res_path
|
|