SparseBev / loaders /pipelines /loading.py
Haisong Liu
Release model: vit_eva02_1600x640_trainval_future (#46)
102ac67 unverified
import os
import mmcv
import numpy as np
from mmdet.datasets.builder import PIPELINES
from numpy.linalg import inv
from mmcv.runner import get_dist_info
def compose_lidar2img(ego2global_translation_curr,
ego2global_rotation_curr,
lidar2ego_translation_curr,
lidar2ego_rotation_curr,
sensor2global_translation_past,
sensor2global_rotation_past,
cam_intrinsic_past):
R = sensor2global_rotation_past @ (inv(ego2global_rotation_curr).T @ inv(lidar2ego_rotation_curr).T)
T = sensor2global_translation_past @ (inv(ego2global_rotation_curr).T @ inv(lidar2ego_rotation_curr).T)
T -= ego2global_translation_curr @ (inv(ego2global_rotation_curr).T @ inv(lidar2ego_rotation_curr).T) + lidar2ego_translation_curr @ inv(lidar2ego_rotation_curr).T
lidar2cam_r = inv(R.T)
lidar2cam_t = T @ lidar2cam_r.T
lidar2cam_rt = np.eye(4)
lidar2cam_rt[:3, :3] = lidar2cam_r.T
lidar2cam_rt[3, :3] = -lidar2cam_t
viewpad = np.eye(4)
viewpad[:cam_intrinsic_past.shape[0], :cam_intrinsic_past.shape[1]] = cam_intrinsic_past
lidar2img = (viewpad @ lidar2cam_rt.T).astype(np.float32)
return lidar2img
@PIPELINES.register_module()
class LoadMultiViewImageFromMultiSweeps(object):
def __init__(self,
sweeps_num=5,
color_type='color',
test_mode=False):
self.sweeps_num = sweeps_num
self.color_type = color_type
self.test_mode = test_mode
self.train_interval = [4, 8]
self.test_interval = 6
try:
mmcv.use_backend('turbojpeg')
except ImportError:
mmcv.use_backend('cv2')
def load_offline(self, results):
cam_types = [
'CAM_FRONT', 'CAM_FRONT_RIGHT', 'CAM_FRONT_LEFT',
'CAM_BACK', 'CAM_BACK_LEFT', 'CAM_BACK_RIGHT'
]
if len(results['sweeps']['prev']) == 0:
for _ in range(self.sweeps_num):
for j in range(len(cam_types)):
results['img'].append(results['img'][j])
results['img_timestamp'].append(results['img_timestamp'][j])
results['filename'].append(results['filename'][j])
results['lidar2img'].append(np.copy(results['lidar2img'][j]))
else:
if self.test_mode:
interval = self.test_interval
choices = [(k + 1) * interval - 1 for k in range(self.sweeps_num)]
elif len(results['sweeps']['prev']) <= self.sweeps_num:
pad_len = self.sweeps_num - len(results['sweeps']['prev'])
choices = list(range(len(results['sweeps']['prev']))) + [len(results['sweeps']['prev']) - 1] * pad_len
else:
max_interval = len(results['sweeps']['prev']) // self.sweeps_num
max_interval = min(max_interval, self.train_interval[1])
min_interval = min(max_interval, self.train_interval[0])
interval = np.random.randint(min_interval, max_interval + 1)
choices = [(k + 1) * interval - 1 for k in range(self.sweeps_num)]
for idx in sorted(list(choices)):
sweep_idx = min(idx, len(results['sweeps']['prev']) - 1)
sweep = results['sweeps']['prev'][sweep_idx]
if len(sweep.keys()) < len(cam_types):
sweep = results['sweeps']['prev'][sweep_idx - 1]
for sensor in cam_types:
results['img'].append(mmcv.imread(sweep[sensor]['data_path'], self.color_type))
results['img_timestamp'].append(sweep[sensor]['timestamp'] / 1e6)
results['filename'].append(os.path.relpath(sweep[sensor]['data_path']))
results['lidar2img'].append(compose_lidar2img(
results['ego2global_translation'],
results['ego2global_rotation'],
results['lidar2ego_translation'],
results['lidar2ego_rotation'],
sweep[sensor]['sensor2global_translation'],
sweep[sensor]['sensor2global_rotation'],
sweep[sensor]['cam_intrinsic'],
))
return results
def load_online(self, results):
# only used when measuring FPS
assert self.test_mode
assert self.test_interval == 6
cam_types = [
'CAM_FRONT', 'CAM_FRONT_RIGHT', 'CAM_FRONT_LEFT',
'CAM_BACK', 'CAM_BACK_LEFT', 'CAM_BACK_RIGHT'
]
if len(results['sweeps']['prev']) == 0:
for _ in range(self.sweeps_num):
for j in range(len(cam_types)):
results['img_timestamp'].append(results['img_timestamp'][j])
results['filename'].append(results['filename'][j])
results['lidar2img'].append(np.copy(results['lidar2img'][j]))
else:
interval = self.test_interval
choices = [(k + 1) * interval - 1 for k in range(self.sweeps_num)]
for idx in sorted(list(choices)):
sweep_idx = min(idx, len(results['sweeps']['prev']) - 1)
sweep = results['sweeps']['prev'][sweep_idx]
if len(sweep.keys()) < len(cam_types):
sweep = results['sweeps']['prev'][sweep_idx - 1]
for sensor in cam_types:
# skip loading history frames
results['img_timestamp'].append(sweep[sensor]['timestamp'] / 1e6)
results['filename'].append(os.path.relpath(sweep[sensor]['data_path']))
results['lidar2img'].append(compose_lidar2img(
results['ego2global_translation'],
results['ego2global_rotation'],
results['lidar2ego_translation'],
results['lidar2ego_rotation'],
sweep[sensor]['sensor2global_translation'],
sweep[sensor]['sensor2global_rotation'],
sweep[sensor]['cam_intrinsic'],
))
return results
def __call__(self, results):
if self.sweeps_num == 0:
return results
world_size = get_dist_info()[1]
if world_size == 1 and self.test_mode:
return self.load_online(results)
else:
return self.load_offline(results)
@PIPELINES.register_module()
class LoadMultiViewImageFromMultiSweepsFuture(object):
def __init__(self,
prev_sweeps_num=5,
next_sweeps_num=5,
color_type='color',
test_mode=False):
self.prev_sweeps_num = prev_sweeps_num
self.next_sweeps_num = next_sweeps_num
self.color_type = color_type
self.test_mode = test_mode
assert prev_sweeps_num == next_sweeps_num
self.train_interval = [4, 8]
self.test_interval = 6
try:
mmcv.use_backend('turbojpeg')
except ImportError:
mmcv.use_backend('cv2')
def __call__(self, results):
if self.prev_sweeps_num == 0 and self.next_sweeps_num == 0:
return results
cam_types = [
'CAM_FRONT', 'CAM_FRONT_RIGHT', 'CAM_FRONT_LEFT',
'CAM_BACK', 'CAM_BACK_LEFT', 'CAM_BACK_RIGHT'
]
if self.test_mode:
interval = self.test_interval
else:
interval = np.random.randint(self.train_interval[0], self.train_interval[1] + 1)
# previous sweeps
if len(results['sweeps']['prev']) == 0:
for _ in range(self.prev_sweeps_num):
for j in range(len(cam_types)):
results['img'].append(results['img'][j])
results['img_timestamp'].append(results['img_timestamp'][j])
results['filename'].append(results['filename'][j])
results['lidar2img'].append(np.copy(results['lidar2img'][j]))
else:
choices = [(k + 1) * interval - 1 for k in range(self.prev_sweeps_num)]
for idx in sorted(list(choices)):
sweep_idx = min(idx, len(results['sweeps']['prev']) - 1)
sweep = results['sweeps']['prev'][sweep_idx]
if len(sweep.keys()) < len(cam_types):
sweep = results['sweeps']['prev'][sweep_idx - 1]
for sensor in cam_types:
results['img'].append(mmcv.imread(sweep[sensor]['data_path'], self.color_type))
results['img_timestamp'].append(sweep[sensor]['timestamp'] / 1e6)
results['filename'].append(sweep[sensor]['data_path'])
results['lidar2img'].append(compose_lidar2img(
results['ego2global_translation'],
results['ego2global_rotation'],
results['lidar2ego_translation'],
results['lidar2ego_rotation'],
sweep[sensor]['sensor2global_translation'],
sweep[sensor]['sensor2global_rotation'],
sweep[sensor]['cam_intrinsic'],
))
# future sweeps
if len(results['sweeps']['next']) == 0:
for _ in range(self.next_sweeps_num):
for j in range(len(cam_types)):
results['img'].append(results['img'][j])
results['img_timestamp'].append(results['img_timestamp'][j])
results['filename'].append(results['filename'][j])
results['lidar2img'].append(np.copy(results['lidar2img'][j]))
else:
choices = [(k + 1) * interval - 1 for k in range(self.next_sweeps_num)]
for idx in sorted(list(choices)):
sweep_idx = min(idx, len(results['sweeps']['next']) - 1)
sweep = results['sweeps']['next'][sweep_idx]
if len(sweep.keys()) < len(cam_types):
sweep = results['sweeps']['next'][sweep_idx - 1]
for sensor in cam_types:
results['img'].append(mmcv.imread(sweep[sensor]['data_path'], self.color_type))
results['img_timestamp'].append(sweep[sensor]['timestamp'] / 1e6)
results['filename'].append(sweep[sensor]['data_path'])
results['lidar2img'].append(compose_lidar2img(
results['ego2global_translation'],
results['ego2global_rotation'],
results['lidar2ego_translation'],
results['lidar2ego_rotation'],
sweep[sensor]['sensor2global_translation'],
sweep[sensor]['sensor2global_rotation'],
sweep[sensor]['cam_intrinsic'],
))
return results
'''
This func loads previous and future frames in interleaved order,
e.g. curr, prev1, next1, prev2, next2, prev3, next3...
'''
@PIPELINES.register_module()
class LoadMultiViewImageFromMultiSweepsFutureInterleave(object):
def __init__(self,
prev_sweeps_num=5,
next_sweeps_num=5,
color_type='color',
test_mode=False):
self.prev_sweeps_num = prev_sweeps_num
self.next_sweeps_num = next_sweeps_num
self.color_type = color_type
self.test_mode = test_mode
assert prev_sweeps_num == next_sweeps_num
self.train_interval = [4, 8]
self.test_interval = 6
try:
mmcv.use_backend('turbojpeg')
except ImportError:
mmcv.use_backend('cv2')
def __call__(self, results):
if self.prev_sweeps_num == 0 and self.next_sweeps_num == 0:
return results
cam_types = [
'CAM_FRONT', 'CAM_FRONT_RIGHT', 'CAM_FRONT_LEFT',
'CAM_BACK', 'CAM_BACK_LEFT', 'CAM_BACK_RIGHT'
]
if self.test_mode:
interval = self.test_interval
else:
interval = np.random.randint(self.train_interval[0], self.train_interval[1] + 1)
results_prev = dict(
img=[],
img_timestamp=[],
filename=[],
lidar2img=[]
)
results_next = dict(
img=[],
img_timestamp=[],
filename=[],
lidar2img=[]
)
if len(results['sweeps']['prev']) == 0:
for _ in range(self.prev_sweeps_num):
for j in range(len(cam_types)):
results_prev['img'].append(results['img'][j])
results_prev['img_timestamp'].append(results['img_timestamp'][j])
results_prev['filename'].append(results['filename'][j])
results_prev['lidar2img'].append(np.copy(results['lidar2img'][j]))
else:
choices = [(k + 1) * interval - 1 for k in range(self.prev_sweeps_num)]
for idx in sorted(list(choices)):
sweep_idx = min(idx, len(results['sweeps']['prev']) - 1)
sweep = results['sweeps']['prev'][sweep_idx]
if len(sweep.keys()) < len(cam_types):
sweep = results['sweeps']['prev'][sweep_idx - 1]
for sensor in cam_types:
results_prev['img'].append(mmcv.imread(sweep[sensor]['data_path'], self.color_type))
results_prev['img_timestamp'].append(sweep[sensor]['timestamp'] / 1e6)
results_prev['filename'].append(os.path.relpath(sweep[sensor]['data_path']))
results_prev['lidar2img'].append(compose_lidar2img(
results['ego2global_translation'],
results['ego2global_rotation'],
results['lidar2ego_translation'],
results['lidar2ego_rotation'],
sweep[sensor]['sensor2global_translation'],
sweep[sensor]['sensor2global_rotation'],
sweep[sensor]['cam_intrinsic'],
))
if len(results['sweeps']['next']) == 0:
print(1, len(results_next['img']) )
for _ in range(self.next_sweeps_num):
for j in range(len(cam_types)):
results_next['img'].append(results['img'][j])
results_next['img_timestamp'].append(results['img_timestamp'][j])
results_next['filename'].append(results['filename'][j])
results_next['lidar2img'].append(np.copy(results['lidar2img'][j]))
else:
choices = [(k + 1) * interval - 1 for k in range(self.next_sweeps_num)]
for idx in sorted(list(choices)):
sweep_idx = min(idx, len(results['sweeps']['next']) - 1)
sweep = results['sweeps']['next'][sweep_idx]
if len(sweep.keys()) < len(cam_types):
sweep = results['sweeps']['next'][sweep_idx - 1]
for sensor in cam_types:
results_next['img'].append(mmcv.imread(sweep[sensor]['data_path'], self.color_type))
results_next['img_timestamp'].append(sweep[sensor]['timestamp'] / 1e6)
results_next['filename'].append(os.path.relpath(sweep[sensor]['data_path']))
results_next['lidar2img'].append(compose_lidar2img(
results['ego2global_translation'],
results['ego2global_rotation'],
results['lidar2ego_translation'],
results['lidar2ego_rotation'],
sweep[sensor]['sensor2global_translation'],
sweep[sensor]['sensor2global_rotation'],
sweep[sensor]['cam_intrinsic'],
))
assert len(results_prev['img']) % 6 == 0
assert len(results_next['img']) % 6 == 0
for i in range(len(results_prev['img']) // 6):
for j in range(6):
results['img'].append(results_prev['img'][i * 6 + j])
results['img_timestamp'].append(results_prev['img_timestamp'][i * 6 + j])
results['filename'].append(results_prev['filename'][i * 6 + j])
results['lidar2img'].append(results_prev['lidar2img'][i * 6 + j])
for j in range(6):
results['img'].append(results_next['img'][i * 6 + j])
results['img_timestamp'].append(results_next['img_timestamp'][i * 6 + j])
results['filename'].append(results_next['filename'][i * 6 + j])
results['lidar2img'].append(results_next['lidar2img'][i * 6 + j])
return results