import os import mmcv import numpy as np from mmdet.datasets.builder import PIPELINES from numpy.linalg import inv from mmcv.runner import get_dist_info def compose_lidar2img(ego2global_translation_curr, ego2global_rotation_curr, lidar2ego_translation_curr, lidar2ego_rotation_curr, sensor2global_translation_past, sensor2global_rotation_past, cam_intrinsic_past): R = sensor2global_rotation_past @ (inv(ego2global_rotation_curr).T @ inv(lidar2ego_rotation_curr).T) T = sensor2global_translation_past @ (inv(ego2global_rotation_curr).T @ inv(lidar2ego_rotation_curr).T) T -= ego2global_translation_curr @ (inv(ego2global_rotation_curr).T @ inv(lidar2ego_rotation_curr).T) + lidar2ego_translation_curr @ inv(lidar2ego_rotation_curr).T lidar2cam_r = inv(R.T) lidar2cam_t = T @ lidar2cam_r.T lidar2cam_rt = np.eye(4) lidar2cam_rt[:3, :3] = lidar2cam_r.T lidar2cam_rt[3, :3] = -lidar2cam_t viewpad = np.eye(4) viewpad[:cam_intrinsic_past.shape[0], :cam_intrinsic_past.shape[1]] = cam_intrinsic_past lidar2img = (viewpad @ lidar2cam_rt.T).astype(np.float32) return lidar2img @PIPELINES.register_module() class LoadMultiViewImageFromMultiSweeps(object): def __init__(self, sweeps_num=5, color_type='color', test_mode=False): self.sweeps_num = sweeps_num self.color_type = color_type self.test_mode = test_mode self.train_interval = [4, 8] self.test_interval = 6 try: mmcv.use_backend('turbojpeg') except ImportError: mmcv.use_backend('cv2') def load_offline(self, results): cam_types = [ 'CAM_FRONT', 'CAM_FRONT_RIGHT', 'CAM_FRONT_LEFT', 'CAM_BACK', 'CAM_BACK_LEFT', 'CAM_BACK_RIGHT' ] if len(results['sweeps']['prev']) == 0: for _ in range(self.sweeps_num): for j in range(len(cam_types)): results['img'].append(results['img'][j]) results['img_timestamp'].append(results['img_timestamp'][j]) results['filename'].append(results['filename'][j]) results['lidar2img'].append(np.copy(results['lidar2img'][j])) else: if self.test_mode: interval = self.test_interval choices = [(k + 1) * interval - 1 for k in range(self.sweeps_num)] elif len(results['sweeps']['prev']) <= self.sweeps_num: pad_len = self.sweeps_num - len(results['sweeps']['prev']) choices = list(range(len(results['sweeps']['prev']))) + [len(results['sweeps']['prev']) - 1] * pad_len else: max_interval = len(results['sweeps']['prev']) // self.sweeps_num max_interval = min(max_interval, self.train_interval[1]) min_interval = min(max_interval, self.train_interval[0]) interval = np.random.randint(min_interval, max_interval + 1) choices = [(k + 1) * interval - 1 for k in range(self.sweeps_num)] for idx in sorted(list(choices)): sweep_idx = min(idx, len(results['sweeps']['prev']) - 1) sweep = results['sweeps']['prev'][sweep_idx] if len(sweep.keys()) < len(cam_types): sweep = results['sweeps']['prev'][sweep_idx - 1] for sensor in cam_types: results['img'].append(mmcv.imread(sweep[sensor]['data_path'], self.color_type)) results['img_timestamp'].append(sweep[sensor]['timestamp'] / 1e6) results['filename'].append(os.path.relpath(sweep[sensor]['data_path'])) results['lidar2img'].append(compose_lidar2img( results['ego2global_translation'], results['ego2global_rotation'], results['lidar2ego_translation'], results['lidar2ego_rotation'], sweep[sensor]['sensor2global_translation'], sweep[sensor]['sensor2global_rotation'], sweep[sensor]['cam_intrinsic'], )) return results def load_online(self, results): # only used when measuring FPS assert self.test_mode assert self.test_interval == 6 cam_types = [ 'CAM_FRONT', 'CAM_FRONT_RIGHT', 'CAM_FRONT_LEFT', 'CAM_BACK', 'CAM_BACK_LEFT', 'CAM_BACK_RIGHT' ] if len(results['sweeps']['prev']) == 0: for _ in range(self.sweeps_num): for j in range(len(cam_types)): results['img_timestamp'].append(results['img_timestamp'][j]) results['filename'].append(results['filename'][j]) results['lidar2img'].append(np.copy(results['lidar2img'][j])) else: interval = self.test_interval choices = [(k + 1) * interval - 1 for k in range(self.sweeps_num)] for idx in sorted(list(choices)): sweep_idx = min(idx, len(results['sweeps']['prev']) - 1) sweep = results['sweeps']['prev'][sweep_idx] if len(sweep.keys()) < len(cam_types): sweep = results['sweeps']['prev'][sweep_idx - 1] for sensor in cam_types: # skip loading history frames results['img_timestamp'].append(sweep[sensor]['timestamp'] / 1e6) results['filename'].append(os.path.relpath(sweep[sensor]['data_path'])) results['lidar2img'].append(compose_lidar2img( results['ego2global_translation'], results['ego2global_rotation'], results['lidar2ego_translation'], results['lidar2ego_rotation'], sweep[sensor]['sensor2global_translation'], sweep[sensor]['sensor2global_rotation'], sweep[sensor]['cam_intrinsic'], )) return results def __call__(self, results): if self.sweeps_num == 0: return results world_size = get_dist_info()[1] if world_size == 1 and self.test_mode: return self.load_online(results) else: return self.load_offline(results) @PIPELINES.register_module() class LoadMultiViewImageFromMultiSweepsFuture(object): def __init__(self, prev_sweeps_num=5, next_sweeps_num=5, color_type='color', test_mode=False): self.prev_sweeps_num = prev_sweeps_num self.next_sweeps_num = next_sweeps_num self.color_type = color_type self.test_mode = test_mode assert prev_sweeps_num == next_sweeps_num self.train_interval = [4, 8] self.test_interval = 6 try: mmcv.use_backend('turbojpeg') except ImportError: mmcv.use_backend('cv2') def __call__(self, results): if self.prev_sweeps_num == 0 and self.next_sweeps_num == 0: return results cam_types = [ 'CAM_FRONT', 'CAM_FRONT_RIGHT', 'CAM_FRONT_LEFT', 'CAM_BACK', 'CAM_BACK_LEFT', 'CAM_BACK_RIGHT' ] if self.test_mode: interval = self.test_interval else: interval = np.random.randint(self.train_interval[0], self.train_interval[1] + 1) # previous sweeps if len(results['sweeps']['prev']) == 0: for _ in range(self.prev_sweeps_num): for j in range(len(cam_types)): results['img'].append(results['img'][j]) results['img_timestamp'].append(results['img_timestamp'][j]) results['filename'].append(results['filename'][j]) results['lidar2img'].append(np.copy(results['lidar2img'][j])) else: choices = [(k + 1) * interval - 1 for k in range(self.prev_sweeps_num)] for idx in sorted(list(choices)): sweep_idx = min(idx, len(results['sweeps']['prev']) - 1) sweep = results['sweeps']['prev'][sweep_idx] if len(sweep.keys()) < len(cam_types): sweep = results['sweeps']['prev'][sweep_idx - 1] for sensor in cam_types: results['img'].append(mmcv.imread(sweep[sensor]['data_path'], self.color_type)) results['img_timestamp'].append(sweep[sensor]['timestamp'] / 1e6) results['filename'].append(sweep[sensor]['data_path']) results['lidar2img'].append(compose_lidar2img( results['ego2global_translation'], results['ego2global_rotation'], results['lidar2ego_translation'], results['lidar2ego_rotation'], sweep[sensor]['sensor2global_translation'], sweep[sensor]['sensor2global_rotation'], sweep[sensor]['cam_intrinsic'], )) # future sweeps if len(results['sweeps']['next']) == 0: for _ in range(self.next_sweeps_num): for j in range(len(cam_types)): results['img'].append(results['img'][j]) results['img_timestamp'].append(results['img_timestamp'][j]) results['filename'].append(results['filename'][j]) results['lidar2img'].append(np.copy(results['lidar2img'][j])) else: choices = [(k + 1) * interval - 1 for k in range(self.next_sweeps_num)] for idx in sorted(list(choices)): sweep_idx = min(idx, len(results['sweeps']['next']) - 1) sweep = results['sweeps']['next'][sweep_idx] if len(sweep.keys()) < len(cam_types): sweep = results['sweeps']['next'][sweep_idx - 1] for sensor in cam_types: results['img'].append(mmcv.imread(sweep[sensor]['data_path'], self.color_type)) results['img_timestamp'].append(sweep[sensor]['timestamp'] / 1e6) results['filename'].append(sweep[sensor]['data_path']) results['lidar2img'].append(compose_lidar2img( results['ego2global_translation'], results['ego2global_rotation'], results['lidar2ego_translation'], results['lidar2ego_rotation'], sweep[sensor]['sensor2global_translation'], sweep[sensor]['sensor2global_rotation'], sweep[sensor]['cam_intrinsic'], )) return results ''' This func loads previous and future frames in interleaved order, e.g. curr, prev1, next1, prev2, next2, prev3, next3... ''' @PIPELINES.register_module() class LoadMultiViewImageFromMultiSweepsFutureInterleave(object): def __init__(self, prev_sweeps_num=5, next_sweeps_num=5, color_type='color', test_mode=False): self.prev_sweeps_num = prev_sweeps_num self.next_sweeps_num = next_sweeps_num self.color_type = color_type self.test_mode = test_mode assert prev_sweeps_num == next_sweeps_num self.train_interval = [4, 8] self.test_interval = 6 try: mmcv.use_backend('turbojpeg') except ImportError: mmcv.use_backend('cv2') def __call__(self, results): if self.prev_sweeps_num == 0 and self.next_sweeps_num == 0: return results cam_types = [ 'CAM_FRONT', 'CAM_FRONT_RIGHT', 'CAM_FRONT_LEFT', 'CAM_BACK', 'CAM_BACK_LEFT', 'CAM_BACK_RIGHT' ] if self.test_mode: interval = self.test_interval else: interval = np.random.randint(self.train_interval[0], self.train_interval[1] + 1) results_prev = dict( img=[], img_timestamp=[], filename=[], lidar2img=[] ) results_next = dict( img=[], img_timestamp=[], filename=[], lidar2img=[] ) if len(results['sweeps']['prev']) == 0: for _ in range(self.prev_sweeps_num): for j in range(len(cam_types)): results_prev['img'].append(results['img'][j]) results_prev['img_timestamp'].append(results['img_timestamp'][j]) results_prev['filename'].append(results['filename'][j]) results_prev['lidar2img'].append(np.copy(results['lidar2img'][j])) else: choices = [(k + 1) * interval - 1 for k in range(self.prev_sweeps_num)] for idx in sorted(list(choices)): sweep_idx = min(idx, len(results['sweeps']['prev']) - 1) sweep = results['sweeps']['prev'][sweep_idx] if len(sweep.keys()) < len(cam_types): sweep = results['sweeps']['prev'][sweep_idx - 1] for sensor in cam_types: results_prev['img'].append(mmcv.imread(sweep[sensor]['data_path'], self.color_type)) results_prev['img_timestamp'].append(sweep[sensor]['timestamp'] / 1e6) results_prev['filename'].append(os.path.relpath(sweep[sensor]['data_path'])) results_prev['lidar2img'].append(compose_lidar2img( results['ego2global_translation'], results['ego2global_rotation'], results['lidar2ego_translation'], results['lidar2ego_rotation'], sweep[sensor]['sensor2global_translation'], sweep[sensor]['sensor2global_rotation'], sweep[sensor]['cam_intrinsic'], )) if len(results['sweeps']['next']) == 0: print(1, len(results_next['img']) ) for _ in range(self.next_sweeps_num): for j in range(len(cam_types)): results_next['img'].append(results['img'][j]) results_next['img_timestamp'].append(results['img_timestamp'][j]) results_next['filename'].append(results['filename'][j]) results_next['lidar2img'].append(np.copy(results['lidar2img'][j])) else: choices = [(k + 1) * interval - 1 for k in range(self.next_sweeps_num)] for idx in sorted(list(choices)): sweep_idx = min(idx, len(results['sweeps']['next']) - 1) sweep = results['sweeps']['next'][sweep_idx] if len(sweep.keys()) < len(cam_types): sweep = results['sweeps']['next'][sweep_idx - 1] for sensor in cam_types: results_next['img'].append(mmcv.imread(sweep[sensor]['data_path'], self.color_type)) results_next['img_timestamp'].append(sweep[sensor]['timestamp'] / 1e6) results_next['filename'].append(os.path.relpath(sweep[sensor]['data_path'])) results_next['lidar2img'].append(compose_lidar2img( results['ego2global_translation'], results['ego2global_rotation'], results['lidar2ego_translation'], results['lidar2ego_rotation'], sweep[sensor]['sensor2global_translation'], sweep[sensor]['sensor2global_rotation'], sweep[sensor]['cam_intrinsic'], )) assert len(results_prev['img']) % 6 == 0 assert len(results_next['img']) % 6 == 0 for i in range(len(results_prev['img']) // 6): for j in range(6): results['img'].append(results_prev['img'][i * 6 + j]) results['img_timestamp'].append(results_prev['img_timestamp'][i * 6 + j]) results['filename'].append(results_prev['filename'][i * 6 + j]) results['lidar2img'].append(results_prev['lidar2img'][i * 6 + j]) for j in range(6): results['img'].append(results_next['img'][i * 6 + j]) results['img_timestamp'].append(results_next['img_timestamp'][i * 6 + j]) results['filename'].append(results_next['filename'][i * 6 + j]) results['lidar2img'].append(results_next['lidar2img'][i * 6 + j]) return results