| import os |
| import mmcv |
| import numpy as np |
| from mmdet.datasets.builder import PIPELINES |
| from numpy.linalg import inv |
| from mmcv.runner import get_dist_info |
|
|
|
|
def compose_lidar2img(ego2global_translation_curr,
                      ego2global_rotation_curr,
                      lidar2ego_translation_curr,
                      lidar2ego_rotation_curr,
                      sensor2global_translation_past,
                      sensor2global_rotation_past,
                      cam_intrinsic_past):
    """Compose a 4x4 lidar->image projection matrix for a past camera.

    Chains current-frame lidar->ego->global with the past camera's
    global->sensor pose and the camera intrinsics, so that points expressed
    in the *current* lidar frame project into the *past* camera image.
    All matrix products follow the row-vector convention used throughout
    (``t @ R`` rather than ``R @ t``).

    Args:
        ego2global_translation_curr: (3,) translation, current ego -> global.
        ego2global_rotation_curr: (3, 3) rotation, current ego -> global.
        lidar2ego_translation_curr: (3,) translation, current lidar -> ego.
        lidar2ego_rotation_curr: (3, 3) rotation, current lidar -> ego.
        sensor2global_translation_past: (3,) translation, past camera -> global.
        sensor2global_rotation_past: (3, 3) rotation, past camera -> global.
        cam_intrinsic_past: (3, 3) camera intrinsic matrix of the past camera.

    Returns:
        np.ndarray: (4, 4) float32 lidar->image projection matrix.
    """
    # global -> current-lidar rotation (row-vector form). Hoisted because the
    # original expression was recomputed three times with two inversions each.
    global2lidar_rot = inv(ego2global_rotation_curr).T @ inv(lidar2ego_rotation_curr).T

    # Past-camera pose expressed relative to the current lidar frame.
    R = sensor2global_rotation_past @ global2lidar_rot
    T = sensor2global_translation_past @ global2lidar_rot
    T -= ego2global_translation_curr @ global2lidar_rot + lidar2ego_translation_curr @ inv(lidar2ego_rotation_curr).T

    # Invert the (lidar -> camera) pose.
    lidar2cam_r = inv(R.T)
    lidar2cam_t = T @ lidar2cam_r.T

    # Homogeneous 4x4 extrinsic, with translation on the bottom row
    # (row-vector convention; it is transposed again below).
    lidar2cam_rt = np.eye(4)
    lidar2cam_rt[:3, :3] = lidar2cam_r.T
    lidar2cam_rt[3, :3] = -lidar2cam_t

    # Pad the 3x3 intrinsics to 4x4 and apply after the extrinsic transform.
    viewpad = np.eye(4)
    viewpad[:cam_intrinsic_past.shape[0], :cam_intrinsic_past.shape[1]] = cam_intrinsic_past
    lidar2img = (viewpad @ lidar2cam_rt.T).astype(np.float32)

    return lidar2img
|
|
|
|
@PIPELINES.register_module()
class LoadMultiViewImageFromMultiSweeps(object):
    """Extend the current sample's six camera views with views from past sweeps.

    For each selected past sweep, six more entries are appended to
    results['img'], results['img_timestamp'], results['filename'] and
    results['lidar2img']; the lidar2img matrices are re-composed so the past
    cameras project points expressed in the *current* lidar frame.
    """

    def __init__(self,
                 sweeps_num=5,
                 color_type='color',
                 test_mode=False):
        # sweeps_num: number of past frames (six views each) to append.
        # color_type: colour flag forwarded to mmcv.imread.
        # test_mode: use the fixed test stride instead of a random one.
        self.sweeps_num = sweeps_num
        self.color_type = color_type
        self.test_mode = test_mode

        # Sampling stride between sweeps: random in [4, 8] during training,
        # fixed at 6 for testing.
        self.train_interval = [4, 8]
        self.test_interval = 6

        # Prefer the faster TurboJPEG decoder when available, else OpenCV.
        # NOTE(review): whether use_backend raises ImportError here or lazily
        # at the first imread may depend on the mmcv version — verify.
        try:
            mmcv.use_backend('turbojpeg')
        except ImportError:
            mmcv.use_backend('cv2')

    def load_offline(self, results):
        """Append past-sweep images and metadata, decoding images from disk.

        Mutates `results` in place and returns it.
        """
        cam_types = [
            'CAM_FRONT', 'CAM_FRONT_RIGHT', 'CAM_FRONT_LEFT',
            'CAM_BACK', 'CAM_BACK_LEFT', 'CAM_BACK_RIGHT'
        ]

        if len(results['sweeps']['prev']) == 0:
            # No history available: pad by repeating the current frame's six
            # views (indices 0..5 of each list hold the current frame).
            for _ in range(self.sweeps_num):
                for j in range(len(cam_types)):
                    results['img'].append(results['img'][j])
                    results['img_timestamp'].append(results['img_timestamp'][j])
                    results['filename'].append(results['filename'][j])
                    results['lidar2img'].append(np.copy(results['lidar2img'][j]))
        else:
            if self.test_mode:
                # Deterministic sampling at the fixed test stride.
                interval = self.test_interval
                choices = [(k + 1) * interval - 1 for k in range(self.sweeps_num)]
            elif len(results['sweeps']['prev']) <= self.sweeps_num:
                # Fewer sweeps than requested: take them all, then repeat the
                # last (oldest) index to reach sweeps_num entries.
                pad_len = self.sweeps_num - len(results['sweeps']['prev'])
                choices = list(range(len(results['sweeps']['prev']))) + [len(results['sweeps']['prev']) - 1] * pad_len
            else:
                # Random stride, clamped so all sweeps_num picks fit in range.
                max_interval = len(results['sweeps']['prev']) // self.sweeps_num
                max_interval = min(max_interval, self.train_interval[1])
                min_interval = min(max_interval, self.train_interval[0])
                interval = np.random.randint(min_interval, max_interval + 1)
                choices = [(k + 1) * interval - 1 for k in range(self.sweeps_num)]

            for idx in sorted(list(choices)):
                # Clamp out-of-range picks to the last available sweep.
                sweep_idx = min(idx, len(results['sweeps']['prev']) - 1)
                sweep = results['sweeps']['prev'][sweep_idx]

                # Fall back to the neighbouring sweep when some cameras are
                # missing. NOTE(review): when sweep_idx == 0 this indexes -1,
                # i.e. the *last* sweep in the list — confirm that is intended.
                if len(sweep.keys()) < len(cam_types):
                    sweep = results['sweeps']['prev'][sweep_idx - 1]

                for sensor in cam_types:
                    results['img'].append(mmcv.imread(sweep[sensor]['data_path'], self.color_type))
                    # Timestamps are stored in microseconds; convert to seconds.
                    results['img_timestamp'].append(sweep[sensor]['timestamp'] / 1e6)
                    results['filename'].append(os.path.relpath(sweep[sensor]['data_path']))
                    # Re-compose lidar2img against the *current* frame's poses.
                    results['lidar2img'].append(compose_lidar2img(
                        results['ego2global_translation'],
                        results['ego2global_rotation'],
                        results['lidar2ego_translation'],
                        results['lidar2ego_rotation'],
                        sweep[sensor]['sensor2global_translation'],
                        sweep[sensor]['sensor2global_rotation'],
                        sweep[sensor]['cam_intrinsic'],
                    ))

        return results

    def load_online(self, results):
        """Append past-sweep metadata only — image decoding is skipped (no
        entries are added to results['img'] in the sweep branch).

        Mutates `results` in place and returns it.
        """
        # Only valid for test-time runs with the fixed stride of 6.
        assert self.test_mode
        assert self.test_interval == 6

        cam_types = [
            'CAM_FRONT', 'CAM_FRONT_RIGHT', 'CAM_FRONT_LEFT',
            'CAM_BACK', 'CAM_BACK_LEFT', 'CAM_BACK_RIGHT'
        ]

        if len(results['sweeps']['prev']) == 0:
            # No history: pad metadata by repeating the current frame's views.
            for _ in range(self.sweeps_num):
                for j in range(len(cam_types)):
                    results['img_timestamp'].append(results['img_timestamp'][j])
                    results['filename'].append(results['filename'][j])
                    results['lidar2img'].append(np.copy(results['lidar2img'][j]))
        else:
            interval = self.test_interval
            choices = [(k + 1) * interval - 1 for k in range(self.sweeps_num)]

            for idx in sorted(list(choices)):
                sweep_idx = min(idx, len(results['sweeps']['prev']) - 1)
                sweep = results['sweeps']['prev'][sweep_idx]

                # Same incomplete-camera fallback as load_offline (see the
                # NOTE(review) there about the -1 wraparound).
                if len(sweep.keys()) < len(cam_types):
                    sweep = results['sweeps']['prev'][sweep_idx - 1]

                for sensor in cam_types:
                    # Deliberately no mmcv.imread here — metadata only.
                    results['img_timestamp'].append(sweep[sensor]['timestamp'] / 1e6)
                    results['filename'].append(os.path.relpath(sweep[sensor]['data_path']))
                    results['lidar2img'].append(compose_lidar2img(
                        results['ego2global_translation'],
                        results['ego2global_rotation'],
                        results['lidar2ego_translation'],
                        results['lidar2ego_rotation'],
                        sweep[sensor]['sensor2global_translation'],
                        sweep[sensor]['sensor2global_rotation'],
                        sweep[sensor]['cam_intrinsic'],
                    ))

        return results

    def __call__(self, results):
        """Pipeline entry point; no-op when sweeps_num == 0."""
        if self.sweeps_num == 0:
            return results

        # Single-process test runs take the lighter online path; any
        # distributed run (world_size > 1) uses the full offline loader.
        world_size = get_dist_info()[1]
        if world_size == 1 and self.test_mode:
            return self.load_online(results)
        else:
            return self.load_offline(results)
|
|
|
|
@PIPELINES.register_module()
class LoadMultiViewImageFromMultiSweepsFuture(object):
    """Extend the current sample's six camera views with views from both past
    and future sweeps.

    Appends `prev_sweeps_num` past frames followed by `next_sweeps_num`
    future frames (six camera views each) to results['img'],
    results['img_timestamp'], results['filename'] and results['lidar2img'].
    """

    def __init__(self,
                 prev_sweeps_num=5,
                 next_sweeps_num=5,
                 color_type='color',
                 test_mode=False):
        # prev_sweeps_num / next_sweeps_num: frames to append from each side.
        # color_type: colour flag forwarded to mmcv.imread.
        # test_mode: use the fixed test stride instead of a random one.
        self.prev_sweeps_num = prev_sweeps_num
        self.next_sweeps_num = next_sweeps_num
        self.color_type = color_type
        self.test_mode = test_mode

        # Symmetric temporal horizon is assumed by downstream consumers.
        assert prev_sweeps_num == next_sweeps_num

        # Sampling stride between sweeps: random in [4, 8] during training,
        # fixed at 6 for testing.
        self.train_interval = [4, 8]
        self.test_interval = 6

        # Prefer the faster TurboJPEG decoder when available, else OpenCV.
        try:
            mmcv.use_backend('turbojpeg')
        except ImportError:
            mmcv.use_backend('cv2')

    def _append_sweep_frames(self, results, sweeps, num_sweeps, interval):
        """Append `num_sweeps` frames sampled from `sweeps` to `results` in place.

        Shared by the past and future branches of __call__, which were
        previously duplicated verbatim.
        """
        cam_types = [
            'CAM_FRONT', 'CAM_FRONT_RIGHT', 'CAM_FRONT_LEFT',
            'CAM_BACK', 'CAM_BACK_LEFT', 'CAM_BACK_RIGHT'
        ]

        if len(sweeps) == 0:
            # No sweeps on this side: pad by repeating the current frame's
            # views (indices 0..5 of each list hold the current frame).
            for _ in range(num_sweeps):
                for j in range(len(cam_types)):
                    results['img'].append(results['img'][j])
                    results['img_timestamp'].append(results['img_timestamp'][j])
                    results['filename'].append(results['filename'][j])
                    results['lidar2img'].append(np.copy(results['lidar2img'][j]))
            return

        # Sample every `interval`-th sweep, clamping to the last available one.
        choices = [(k + 1) * interval - 1 for k in range(num_sweeps)]
        for idx in sorted(choices):
            sweep_idx = min(idx, len(sweeps) - 1)
            sweep = sweeps[sweep_idx]

            # Fall back to the neighbouring sweep when some cameras are
            # missing. NOTE(review): when sweep_idx == 0 this indexes -1,
            # i.e. the *last* sweep in the list — confirm that is intended.
            if len(sweep.keys()) < len(cam_types):
                sweep = sweeps[sweep_idx - 1]

            for sensor in cam_types:
                results['img'].append(mmcv.imread(sweep[sensor]['data_path'], self.color_type))
                # Timestamps are stored in microseconds; convert to seconds.
                results['img_timestamp'].append(sweep[sensor]['timestamp'] / 1e6)
                # NOTE(review): unlike the sibling loaders, this class stores
                # the raw data_path (no os.path.relpath) — confirm intended.
                results['filename'].append(sweep[sensor]['data_path'])
                # Re-compose lidar2img against the *current* frame's poses.
                results['lidar2img'].append(compose_lidar2img(
                    results['ego2global_translation'],
                    results['ego2global_rotation'],
                    results['lidar2ego_translation'],
                    results['lidar2ego_rotation'],
                    sweep[sensor]['sensor2global_translation'],
                    sweep[sensor]['sensor2global_rotation'],
                    sweep[sensor]['cam_intrinsic'],
                ))

    def __call__(self, results):
        """Pipeline entry point; no-op when both sweep counts are 0."""
        if self.prev_sweeps_num == 0 and self.next_sweeps_num == 0:
            return results

        # A single stride is drawn per sample and reused for both directions.
        if self.test_mode:
            interval = self.test_interval
        else:
            interval = np.random.randint(self.train_interval[0], self.train_interval[1] + 1)

        # Past frames first, then future frames — same order as before.
        self._append_sweep_frames(results, results['sweeps']['prev'], self.prev_sweeps_num, interval)
        self._append_sweep_frames(results, results['sweeps']['next'], self.next_sweeps_num, interval)

        return results
|
|
|
|
| ''' |
| This func loads previous and future frames in interleaved order, |
| e.g. curr, prev1, next1, prev2, next2, prev3, next3... |
| ''' |
| @PIPELINES.register_module() |
| class LoadMultiViewImageFromMultiSweepsFutureInterleave(object): |
| def __init__(self, |
| prev_sweeps_num=5, |
| next_sweeps_num=5, |
| color_type='color', |
| test_mode=False): |
| self.prev_sweeps_num = prev_sweeps_num |
| self.next_sweeps_num = next_sweeps_num |
| self.color_type = color_type |
| self.test_mode = test_mode |
|
|
| assert prev_sweeps_num == next_sweeps_num |
|
|
| self.train_interval = [4, 8] |
| self.test_interval = 6 |
|
|
| try: |
| mmcv.use_backend('turbojpeg') |
| except ImportError: |
| mmcv.use_backend('cv2') |
|
|
| def __call__(self, results): |
| if self.prev_sweeps_num == 0 and self.next_sweeps_num == 0: |
| return results |
|
|
| cam_types = [ |
| 'CAM_FRONT', 'CAM_FRONT_RIGHT', 'CAM_FRONT_LEFT', |
| 'CAM_BACK', 'CAM_BACK_LEFT', 'CAM_BACK_RIGHT' |
| ] |
|
|
| if self.test_mode: |
| interval = self.test_interval |
| else: |
| interval = np.random.randint(self.train_interval[0], self.train_interval[1] + 1) |
|
|
| results_prev = dict( |
| img=[], |
| img_timestamp=[], |
| filename=[], |
| lidar2img=[] |
| ) |
| results_next = dict( |
| img=[], |
| img_timestamp=[], |
| filename=[], |
| lidar2img=[] |
| ) |
|
|
| if len(results['sweeps']['prev']) == 0: |
| for _ in range(self.prev_sweeps_num): |
| for j in range(len(cam_types)): |
| results_prev['img'].append(results['img'][j]) |
| results_prev['img_timestamp'].append(results['img_timestamp'][j]) |
| results_prev['filename'].append(results['filename'][j]) |
| results_prev['lidar2img'].append(np.copy(results['lidar2img'][j])) |
| else: |
| choices = [(k + 1) * interval - 1 for k in range(self.prev_sweeps_num)] |
|
|
| for idx in sorted(list(choices)): |
| sweep_idx = min(idx, len(results['sweeps']['prev']) - 1) |
| sweep = results['sweeps']['prev'][sweep_idx] |
|
|
| if len(sweep.keys()) < len(cam_types): |
| sweep = results['sweeps']['prev'][sweep_idx - 1] |
|
|
| for sensor in cam_types: |
| results_prev['img'].append(mmcv.imread(sweep[sensor]['data_path'], self.color_type)) |
| results_prev['img_timestamp'].append(sweep[sensor]['timestamp'] / 1e6) |
| results_prev['filename'].append(os.path.relpath(sweep[sensor]['data_path'])) |
| results_prev['lidar2img'].append(compose_lidar2img( |
| results['ego2global_translation'], |
| results['ego2global_rotation'], |
| results['lidar2ego_translation'], |
| results['lidar2ego_rotation'], |
| sweep[sensor]['sensor2global_translation'], |
| sweep[sensor]['sensor2global_rotation'], |
| sweep[sensor]['cam_intrinsic'], |
| )) |
|
|
| if len(results['sweeps']['next']) == 0: |
| print(1, len(results_next['img']) ) |
| for _ in range(self.next_sweeps_num): |
| for j in range(len(cam_types)): |
| results_next['img'].append(results['img'][j]) |
| results_next['img_timestamp'].append(results['img_timestamp'][j]) |
| results_next['filename'].append(results['filename'][j]) |
| results_next['lidar2img'].append(np.copy(results['lidar2img'][j])) |
| else: |
| choices = [(k + 1) * interval - 1 for k in range(self.next_sweeps_num)] |
|
|
| for idx in sorted(list(choices)): |
| sweep_idx = min(idx, len(results['sweeps']['next']) - 1) |
| sweep = results['sweeps']['next'][sweep_idx] |
|
|
| if len(sweep.keys()) < len(cam_types): |
| sweep = results['sweeps']['next'][sweep_idx - 1] |
|
|
| for sensor in cam_types: |
| results_next['img'].append(mmcv.imread(sweep[sensor]['data_path'], self.color_type)) |
| results_next['img_timestamp'].append(sweep[sensor]['timestamp'] / 1e6) |
| results_next['filename'].append(os.path.relpath(sweep[sensor]['data_path'])) |
| results_next['lidar2img'].append(compose_lidar2img( |
| results['ego2global_translation'], |
| results['ego2global_rotation'], |
| results['lidar2ego_translation'], |
| results['lidar2ego_rotation'], |
| sweep[sensor]['sensor2global_translation'], |
| sweep[sensor]['sensor2global_rotation'], |
| sweep[sensor]['cam_intrinsic'], |
| )) |
|
|
| assert len(results_prev['img']) % 6 == 0 |
| assert len(results_next['img']) % 6 == 0 |
|
|
| for i in range(len(results_prev['img']) // 6): |
| for j in range(6): |
| results['img'].append(results_prev['img'][i * 6 + j]) |
| results['img_timestamp'].append(results_prev['img_timestamp'][i * 6 + j]) |
| results['filename'].append(results_prev['filename'][i * 6 + j]) |
| results['lidar2img'].append(results_prev['lidar2img'][i * 6 + j]) |
|
|
| for j in range(6): |
| results['img'].append(results_next['img'][i * 6 + j]) |
| results['img_timestamp'].append(results_next['img_timestamp'][i * 6 + j]) |
| results['filename'].append(results_next['filename'][i * 6 + j]) |
| results['lidar2img'].append(results_next['lidar2img'][i * 6 + j]) |
|
|
| return results |
|
|