import os
import time
import xml.etree.ElementTree as ET
from collections import Counter, defaultdict
from pathlib import Path
from typing import Optional

import cv2
import numpy as np
import torch
import torch.nn.functional as F
import yaml
from scipy.spatial.transform import Rotation
from torch.utils.data import Dataset
from torchvision.transforms import ColorJitter

from datasets.kitti_360.annotation import KITTI360Bbox3D
from datasets.kitti_360.labels import labels

from augmentation import get_color_aug_fn
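
# Label lookup tables: map raw KITTI-360 label ids to a contiguous set of training ids.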
name2label = {label.name: label for label in labels}
id2ProposedId = {label.id: label.trainId for label in labels}
PropsedId2TrainId = dict(enumerate(list(set(id2ProposedId.values()))))
PropsedId2TrainId = {v: k for k, v in PropsedId2TrainId.items()}
id2TrainId = {k: PropsedId2TrainId[v] for k, v in id2ProposedId.items()}
class FisheyeToPinholeSampler:
    def __init__(self, K_target, target_image_size, calibs, rotation=None):
        self._compute_transform(K_target, target_image_size, calibs, rotation)

    def _compute_transform(self, K_target, target_image_size, calibs, rotation=None):
        x = torch.linspace(-1, 1, target_image_size[1]).view(1, -1).expand(target_image_size)
        y = torch.linspace(-1, 1, target_image_size[0]).view(-1, 1).expand(target_image_size)
        z = torch.ones_like(x)
        xyz = torch.stack((x, y, z), dim=-1).view(-1, 3)

        # Unproject
        xyz = (torch.inverse(torch.tensor(K_target)) @ xyz.T).T
        if rotation is not None:
            xyz = (torch.tensor(rotation) @ xyz.T).T

        # Backproject into fisheye
        xyz = xyz / torch.norm(xyz, dim=-1, keepdim=True)
        x = xyz[:, 0]
        y = xyz[:, 1]
        z = xyz[:, 2]

        xi_src = calibs["mirror_parameters"]["xi"]
        x = x / (z + xi_src)
        y = y / (z + xi_src)

        k1 = calibs["distortion_parameters"]["k1"]
        k2 = calibs["distortion_parameters"]["k2"]

        r = x * x + y * y
        factor = 1 + k1 * r + k2 * r * r
        x = x * factor
        y = y * factor

        gamma0 = calibs["projection_parameters"]["gamma1"]
        gamma1 = calibs["projection_parameters"]["gamma2"]
        u0 = calibs["projection_parameters"]["u0"]
        v0 = calibs["projection_parameters"]["v0"]

        x = x * gamma0 + u0
        y = y * gamma1 + v0

        xy = torch.stack((x, y), dim=-1).view(1, *target_image_size, 2)
        self.sample_pts = xy

    def resample(self, img):
        img = img.unsqueeze(0)
        resampled_img = F.grid_sample(img, self.sample_pts, align_corners=True).squeeze(0)
        return resampled_img
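
# Usage sketch (illustrative only, not part of the dataset pipeline): given the calibration
# dict produced by SSCBenchDataset._load_calibs below, a raw fisheye frame could be warped
# to a virtual pinhole view roughly like this (the paths and the `raw_fisheye_rgb` array
# are placeholders):
#
#   calibs = SSCBenchDataset._load_calibs(Path("<KITTI-360-ROOT>"))
#   sampler = FisheyeToPinholeSampler(calibs["K_fisheye"], (192, 640),
#                                     calibs["fisheye"]["calib_02"],
#                                     calibs["fisheye"]["R_02"])
#   img = torch.from_numpy(raw_fisheye_rgb).permute(2, 0, 1).float()  # (3, H, W), values in [0, 1]
#   pinhole_img = sampler.resample(img)
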
class SSCBenchDataset(Dataset):
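    """Dataset for SSCBench-style KITTI-360 data.

    Loads rectified perspective frames (and optionally fisheye frames that have already
    been resampled to a pinhole model), voxel occupancy ground truth, and vehicle poses.
    Camera intrinsics are normalized to the [-1, 1] image range used by grid_sample.
    """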
    def __init__(
        self,
        data_path: str,
        voxel_gt_path: str,
        sequences: Optional[tuple],
        target_image_size=(192, 640),
        return_stereo=False,
        return_depth=False,
        data_segmentation_path=None,
        frame_count=2,
        keyframe_offset=0,
        dilation=1,
        eigen_depth=True,
        color_aug=False,
        load_kitti_360_segmentation_gt=False,
        load_all=False,
        load_fisheye=False,
        fisheye_offset=0,
    ):
        self.data_path = Path(data_path)
        self.voxel_gt_path = Path(voxel_gt_path)
        self.data_segmentation_path = data_segmentation_path
        self.pose_path = os.path.join("<PATH-KITTI-360-DATA-POSES>")
        self.target_image_size = target_image_size
        self.return_stereo = return_stereo
        self.return_depth = return_depth
        self.frame_count = frame_count
        self.dilation = dilation
        self.keyframe_offset = keyframe_offset
        self.eigen_depth = eigen_depth
        self.color_aug = color_aug
        self.load_kitti_360_segmentation_gt = load_kitti_360_segmentation_gt
        self.load_all = load_all
        self.load_fisheye = load_fisheye
        self.fisheye_offset = fisheye_offset
        # Assumption: the fisheye frames referenced below come from a pre-resampled folder
        # (data_192x640_0x-15), so no on-the-fly fisheye-to-pinhole resampling is needed.
        # process_img only consults this flag when a resampler is passed.
        self.is_preprocessed = True

        if sequences is None:
            self._sequences = self._get_sequences(self.data_path)
        else:
            self._sequences = [f"2013_05_28_drive_00{s:02d}_sync" for s in sequences]

        self._calibs = self._load_calibs(self.data_path)
        self._left_offset = ((self.frame_count - 1) // 2 + self.keyframe_offset) * self.dilation

        self._img_ids, self._poses = self._load_poses(self.pose_path, self._sequences)

        self._perspective_folder = "data_rect"
        self._segmentation_perspective_folder = "data_192x640"
        self._segmentation_fisheye_folder = "data_192x640_0x-15"

        if self.load_all:
            self._datapoints = self._load_all_datapoints(self.data_path, self._sequences)
        else:
            self._datapoints = self._load_datapoints(self.voxel_gt_path, self._sequences)

        self._skip = 0
        self.length = len(self._datapoints)
    @staticmethod
    def _get_sequences(data_path):
        all_sequences = []

        seqs_path = Path(data_path) / "data_2d_raw"
        for seq in seqs_path.iterdir():
            if not seq.is_dir():
                continue
            all_sequences.append(seq.name)

        return all_sequences
    @staticmethod
    def _load_calibs(data_path, fisheye_rotation=(0, 0)):
        data_path = Path(data_path)

        calib_folder = data_path / "calibration"
        cam_to_pose_file = calib_folder / "calib_cam_to_pose.txt"
        cam_to_velo_file = calib_folder / "calib_cam_to_velo.txt"
        intrinsics_file = calib_folder / "perspective.txt"
        fisheye_02_file = calib_folder / "image_02.yaml"
        fisheye_03_file = calib_folder / "image_03.yaml"

        cam_to_pose_data = {}
        with open(cam_to_pose_file, "r") as f:
            for line in f.readlines():
                key, value = line.split(":", 1)
                try:
                    cam_to_pose_data[key] = np.array([float(x) for x in value.split()], dtype=np.float32)
                except ValueError:
                    pass

        cam_to_velo_data = None
        with open(cam_to_velo_file, "r") as f:
            line = f.readline()
            try:
                cam_to_velo_data = np.array([float(x) for x in line.split()], dtype=np.float32)
            except ValueError:
                pass

        intrinsics_data = {}
        with open(intrinsics_file, "r") as f:
            for line in f.readlines():
                key, value = line.split(":", 1)
                try:
                    intrinsics_data[key] = np.array([float(x) for x in value.split()], dtype=np.float32)
                except ValueError:
                    pass

        with open(fisheye_02_file, "r") as f:
            f.readline()  # Skip the first line, which declares the YAML version
            fisheye_02_data = yaml.safe_load(f)

        with open(fisheye_03_file, "r") as f:
            f.readline()  # Skip the first line, which declares the YAML version
            fisheye_03_data = yaml.safe_load(f)

        im_size_rect = (int(intrinsics_data["S_rect_00"][1]), int(intrinsics_data["S_rect_00"][0]))
        im_size_fish = (fisheye_02_data["image_height"], fisheye_02_data["image_width"])

        # Projection matrices
        # We use these projection matrices also when resampling the fisheye cameras.
        # This makes downstream processing easier, but it could be done differently.
        P_rect_00 = np.reshape(intrinsics_data["P_rect_00"], (3, 4))
        P_rect_01 = np.reshape(intrinsics_data["P_rect_01"], (3, 4))

        # Rotation matrices from raw to rectified -> need to be inverted later
        R_rect_00 = np.eye(4, dtype=np.float32)
        R_rect_01 = np.eye(4, dtype=np.float32)
        R_rect_00[:3, :3] = np.reshape(intrinsics_data["R_rect_00"], (3, 3))
        R_rect_01[:3, :3] = np.reshape(intrinsics_data["R_rect_01"], (3, 3))

        # Rotation matrices from resampled fisheye to raw fisheye
        fisheye_rotation = np.array(fisheye_rotation).reshape((1, 2))
        R_02 = np.eye(4, dtype=np.float32)
        R_03 = np.eye(4, dtype=np.float32)
        R_02[:3, :3] = Rotation.from_euler("xy", fisheye_rotation[:, [1, 0]], degrees=True).as_matrix().astype(np.float32)
        R_03[:3, :3] = Rotation.from_euler("xy", fisheye_rotation[:, [1, 0]] * np.array([[1, -1]]), degrees=True).as_matrix().astype(np.float32)

        # Load cam to pose transforms
        T_00_to_pose = np.eye(4, dtype=np.float32)
        T_01_to_pose = np.eye(4, dtype=np.float32)
        T_02_to_pose = np.eye(4, dtype=np.float32)
        T_03_to_pose = np.eye(4, dtype=np.float32)
        T_00_to_velo = np.eye(4, dtype=np.float32)

        T_00_to_pose[:3, :] = np.reshape(cam_to_pose_data["image_00"], (3, 4))
        T_01_to_pose[:3, :] = np.reshape(cam_to_pose_data["image_01"], (3, 4))
        T_02_to_pose[:3, :] = np.reshape(cam_to_pose_data["image_02"], (3, 4))
        T_03_to_pose[:3, :] = np.reshape(cam_to_pose_data["image_03"], (3, 4))
        T_00_to_velo[:3, :] = np.reshape(cam_to_velo_data, (3, 4))

        # Compute cam to pose transforms for rectified perspective cameras
        T_rect_00_to_pose = T_00_to_pose @ np.linalg.inv(R_rect_00)
        T_rect_01_to_pose = T_01_to_pose @ np.linalg.inv(R_rect_01)

        # Compute cam to pose transforms for fisheye cameras
        T_02_to_pose = T_02_to_pose @ R_02
        T_03_to_pose = T_03_to_pose @ R_03

        # Compute velo to camera and velo to pose transforms
        T_velo_to_rect_00 = R_rect_00 @ np.linalg.inv(T_00_to_velo)
        T_velo_to_pose = T_rect_00_to_pose @ T_velo_to_rect_00
        T_velo_to_rect_01 = np.linalg.inv(T_rect_01_to_pose) @ T_velo_to_pose

        # The calibration matrix is the same for both perspective cameras
        K = P_rect_00[:3, :3]

        # Normalize calibration
        f_x = K[0, 0] / im_size_rect[1]
        f_y = K[1, 1] / im_size_rect[0]
        c_x = K[0, 2] / im_size_rect[1]
        c_y = K[1, 2] / im_size_rect[0]

        # Change to image coordinates [-1, 1]
        K[0, 0] = f_x * 2.
        K[1, 1] = f_y * 2.
        K[0, 2] = c_x * 2. - 1
        K[1, 2] = c_y * 2. - 1

        # Convert fisheye calibration to [-1, 1] image dimensions
        fisheye_02_data["projection_parameters"]["gamma1"] = (fisheye_02_data["projection_parameters"]["gamma1"] / im_size_fish[1]) * 2.
        fisheye_02_data["projection_parameters"]["gamma2"] = (fisheye_02_data["projection_parameters"]["gamma2"] / im_size_fish[0]) * 2.
        fisheye_02_data["projection_parameters"]["u0"] = (fisheye_02_data["projection_parameters"]["u0"] / im_size_fish[1]) * 2. - 1.
        fisheye_02_data["projection_parameters"]["v0"] = (fisheye_02_data["projection_parameters"]["v0"] / im_size_fish[0]) * 2. - 1.

        fisheye_03_data["projection_parameters"]["gamma1"] = (fisheye_03_data["projection_parameters"]["gamma1"] / im_size_fish[1]) * 2.
        fisheye_03_data["projection_parameters"]["gamma2"] = (fisheye_03_data["projection_parameters"]["gamma2"] / im_size_fish[0]) * 2.
        fisheye_03_data["projection_parameters"]["u0"] = (fisheye_03_data["projection_parameters"]["u0"] / im_size_fish[1]) * 2. - 1.
        fisheye_03_data["projection_parameters"]["v0"] = (fisheye_03_data["projection_parameters"]["v0"] / im_size_fish[0]) * 2. - 1.

        # Use the same camera calibration as the perspective cameras for resampling
        # K_fisheye = np.eye(3, dtype=np.float32)
        # K_fisheye[0, 0] = 2
        # K_fisheye[1, 1] = 2
        K_fisheye = K

        calibs = {
            "K_perspective": K,
            "K_fisheye": K_fisheye,
            "T_cam_to_pose": {
                "00": T_rect_00_to_pose,
                "01": T_rect_01_to_pose,
                "02": T_02_to_pose,
                "03": T_03_to_pose,
            },
            "T_velo_to_cam": {
                "00": T_velo_to_rect_00,
                "01": T_velo_to_rect_01,
            },
            "T_velo_to_pose": T_velo_to_pose,
            "fisheye": {
                "calib_02": fisheye_02_data,
                "calib_03": fisheye_03_data,
                "R_02": R_02[:3, :3],
                "R_03": R_03[:3, :3],
            },
            "im_size": im_size_rect,
        }

        return calibs
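
    # Note on intrinsics: K_perspective and the fisheye projection parameters above are
    # normalized to the [-1, 1] image range expected by torch.nn.functional.grid_sample.
    # If pixel-space intrinsics are needed for an image of size (H, W), they can be
    # recovered by inverting that normalization, roughly:
    #
    #   K_px = K.copy()
    #   K_px[0, 0] = K[0, 0] * W / 2.
    #   K_px[1, 1] = K[1, 1] * H / 2.
    #   K_px[0, 2] = (K[0, 2] + 1.) * W / 2.
    #   K_px[1, 2] = (K[1, 2] + 1.) * H / 2.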
    @staticmethod
    def _load_poses(pose_path, sequences):
        ids = {}
        poses = {}

        for seq in sequences:
            pose_file = Path(pose_path) / seq / "poses.txt"

            try:
                pose_data = np.loadtxt(pose_file)
            except FileNotFoundError:
                print(f"Ground truth poses are not available for sequence {seq}, {pose_file}.")
                # Skip sequences without ground-truth poses instead of failing below
                continue

            ids_seq = pose_data[:, 0].astype(int)
            poses_seq = pose_data[:, 1:].astype(np.float32).reshape((-1, 3, 4))
            poses_seq = np.concatenate((poses_seq, np.zeros_like(poses_seq[:, :1, :])), axis=1)
            poses_seq[:, 3, 3] = 1

            ids[seq] = ids_seq
            poses[seq] = poses_seq

        return ids, poses
    @staticmethod
    def _get_resamplers(calibs, K_target, target_image_size):
        resampler_02 = FisheyeToPinholeSampler(K_target, target_image_size, calibs["fisheye"]["calib_02"], calibs["fisheye"]["R_02"])
        resampler_03 = FisheyeToPinholeSampler(K_target, target_image_size, calibs["fisheye"]["calib_03"], calibs["fisheye"]["R_03"])
        return resampler_02, resampler_03
    @staticmethod
    def _load_datapoints(voxel_gt_path, sequences):
        datapoints = []
        for seq in sorted(sequences):
            ids = [int(file.name[:6]) for file in sorted((voxel_gt_path / seq).glob("*_1_1.npy"))]
            datapoints_seq = [(seq, id, False) for id in ids]
            datapoints.extend(datapoints_seq)
        return datapoints
    @staticmethod
    def _load_all_datapoints(data_path, sequences):
        datapoints = []
        for seq in sorted(sequences):
            ids = [int(file.name[:6]) for file in sorted((data_path / "data_2d_raw" / seq / "image_00" / "data_rect").glob("*.png"))]
            datapoints_seq = [(seq, id, False) for id in ids]
            datapoints.extend(datapoints_seq)
        return datapoints
    def load_images(self, seq, img_ids):
        imgs_p_left = []

        for id in img_ids:
            # id = self._img_ids[seq][id]
            img_perspective = cv2.cvtColor(cv2.imread(os.path.join(self.data_path, "data_2d_raw", seq, "image_00", self._perspective_folder, f"{id:06d}.png")), cv2.COLOR_BGR2RGB).astype(np.float32) / 255
            imgs_p_left += [img_perspective]

        return imgs_p_left
    def load_fisheye_images(self, seq, img_ids):
        imgs_f_left, imgs_f_right = [], []

        for id in img_ids:
            # img_fisheye = cv2.cvtColor(cv2.imread(os.path.join(self.data_path, "data_2d_raw", seq, "image_02", self._segmentation_fisheye_folder, f"{id:010d}.png")), cv2.COLOR_BGR2RGB).astype(np.float32) / 255
            # img_fisheye = cv2.cvtColor(cv2.imread(os.path.join(self.data_path, "data_2d_raw", seq, "image_03", self._segmentation_fisheye_folder, f"{id:010d}.png")), cv2.COLOR_BGR2RGB).astype(np.float32) / 255
            id = self._img_ids[seq][id]
            img_fisheye_left = cv2.cvtColor(cv2.imread(os.path.join("<PATH-KITTI-360-DATA-POSES>", "data_2d_raw", seq, "image_02", self._segmentation_fisheye_folder, f"{id:010d}.png")), cv2.COLOR_BGR2RGB).astype(np.float32) / 255
            img_fisheye_right = cv2.cvtColor(cv2.imread(os.path.join("<PATH-KITTI-360-DATA-POSES>", "data_2d_raw", seq, "image_03", self._segmentation_fisheye_folder, f"{id:010d}.png")), cv2.COLOR_BGR2RGB).astype(np.float32) / 255
            imgs_f_left += [img_fisheye_left]
            imgs_f_right += [img_fisheye_right]

        return imgs_f_left, imgs_f_right
    def load_voxel_gt(self, sequence, img_ids):
        voxel_gt = []
        for id in img_ids:
            target_1_path = os.path.join(self.voxel_gt_path, sequence, f"{id:06d}" + "_1_1.npy")
            if not self.load_all or os.path.isfile(target_1_path):
                voxel_gt.append(np.load(target_1_path))
            else:
                voxel_gt.append(None)
        return voxel_gt
    def process_img(self, img: np.ndarray, color_aug_fn=None, resampler: FisheyeToPinholeSampler = None):
        if resampler is not None and not self.is_preprocessed:
            img = torch.tensor(img).permute(2, 0, 1)
            img = resampler.resample(img)
        else:
            if self.target_image_size:
                img = cv2.resize(img, (self.target_image_size[1], self.target_image_size[0]), interpolation=cv2.INTER_LINEAR)
            img = np.transpose(img, (2, 0, 1))
            img = torch.tensor(img)

        if color_aug_fn is not None:
            img = color_aug_fn(img)

        img = img * 2 - 1
        return img
    def load_depth(self, seq, img_id, is_right):
        points = np.fromfile(os.path.join(self.data_path, "data_3d_raw", seq, "velodyne_points", "data", f"{img_id:010d}.bin"), dtype=np.float32).reshape(-1, 4)
        points[:, 3] = 1.0

        T_velo_to_cam = self._calibs["T_velo_to_cam"]["00" if not is_right else "01"]
        K = self._calibs["K_perspective"]

        # Project the points into the camera
        velo_pts_im = np.dot(K @ T_velo_to_cam[:3, :], points.T).T
        velo_pts_im[:, :2] = velo_pts_im[:, :2] / velo_pts_im[:, 2][..., None]

        # The projection is normalized to [-1, 1] -> transform to [0, height-1] x [0, width-1]
        velo_pts_im[:, 0] = np.round((velo_pts_im[:, 0] * .5 + .5) * self.target_image_size[1])
        velo_pts_im[:, 1] = np.round((velo_pts_im[:, 1] * .5 + .5) * self.target_image_size[0])

        # Check if in bounds
        val_inds = (velo_pts_im[:, 0] >= 0) & (velo_pts_im[:, 1] >= 0)
        val_inds = val_inds & (velo_pts_im[:, 0] < self.target_image_size[1]) & (velo_pts_im[:, 1] < self.target_image_size[0])
        velo_pts_im = velo_pts_im[val_inds, :]

        # Project to image
        depth = np.zeros(self.target_image_size)
        depth[velo_pts_im[:, 1].astype(np.int32), velo_pts_im[:, 0].astype(np.int32)] = velo_pts_im[:, 2]

        # Find the duplicate points and choose the closest depth
        inds = velo_pts_im[:, 1] * (self.target_image_size[1] - 1) + velo_pts_im[:, 0] - 1
        dupe_inds = [item for item, count in Counter(inds).items() if count > 1]
        for dd in dupe_inds:
            pts = np.where(inds == dd)[0]
            x_loc = int(velo_pts_im[pts[0], 0])
            y_loc = int(velo_pts_im[pts[0], 1])
            depth[y_loc, x_loc] = velo_pts_im[pts, 2].min()

        depth[depth < 0] = 0

        return depth[None, :, :]
    def __getitem__(self, index: int):
        _start_time = time.time()

        if index >= self.length:
            raise IndexError()

        if self._skip != 0:
            index += self._skip

        sequence, id, is_right = self._datapoints[index]

        load_left = (not is_right) or self.return_stereo
        load_right = is_right or self.return_stereo

        ids = [id]
        ids_fish = [id + self.fisheye_offset]

        if self.color_aug:
            color_aug_fn = get_color_aug_fn(ColorJitter.get_params(brightness=(0.8, 1.2), contrast=(0.8, 1.2), saturation=(0.8, 1.2), hue=(-0.1, 0.1)))
        else:
            color_aug_fn = None

        _start_time_loading = time.time()
        imgs_p_left = self.load_images(sequence, ids)
        imgs_f_left, imgs_f_right = self.load_fisheye_images(sequence, ids_fish)
        voxel_gt = self.load_voxel_gt(sequence, ids)
        _loading_time = np.array(time.time() - _start_time_loading)

        _start_time_processing = time.time()
        imgs_p_left = [self.process_img(img, color_aug_fn=color_aug_fn) for img in imgs_p_left]
        imgs_f_left = [self.process_img(img, color_aug_fn=color_aug_fn) for img in imgs_f_left]
        imgs_f_right = [self.process_img(img, color_aug_fn=color_aug_fn) for img in imgs_f_right]
        _processing_time = np.array(time.time() - _start_time_processing)

        # These poses are not camera to world !!
        poses_p_left = [self._poses[sequence][i, :, :] @ self._calibs["T_cam_to_pose"]["00"] for i in ids] if load_left else []
        poses_f_left = [self._poses[sequence][i, :, :] @ self._calibs["T_cam_to_pose"]["02"] for i in ids_fish]
        poses_f_right = [self._poses[sequence][i, :, :] @ self._calibs["T_cam_to_pose"]["03"] for i in ids_fish]

        projs_p_left = [self._calibs["K_perspective"] for _ in ids] if load_left else []
        projs_f_left = [self._calibs["K_fisheye"] for _ in ids_fish]
        projs_f_right = [self._calibs["K_fisheye"] for _ in ids_fish]

        imgs = imgs_p_left
        projs = projs_p_left
        poses = poses_p_left

        if self.load_fisheye:
            imgs += imgs_f_left + imgs_f_right
            projs += projs_f_left + projs_f_right
            poses += poses_f_left + poses_f_right

        _proc_time = np.array(time.time() - _start_time)
        # print(_loading_time, _processing_time, _proc_time)

        data = {
            "imgs": imgs,
            "projs": projs,
            "voxel_gt": voxel_gt,
            "poses": poses,
            "t__get_item__": np.array([_proc_time]),
            "index": np.array([index]),
        }

        return data
    def __len__(self) -> int:
        return self.length
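
# Minimal usage sketch (illustrative; the dataset root, voxel GT path, and sequence ids are
# placeholders, and self.pose_path above must point at the KITTI-360 pose directory):
#
#   if __name__ == "__main__":
#       dataset = SSCBenchDataset(
#           data_path="<KITTI-360-ROOT>",
#           voxel_gt_path="<SSCBENCH-VOXEL-GT>",
#           sequences=(0,),
#           target_image_size=(192, 640),
#           load_fisheye=True,
#       )
#       sample = dataset[0]
#       print(len(dataset), [img.shape for img in sample["imgs"]])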