| import matplotlib |
| import numpy as np |
| import torch |
| from PIL import Image |
| from PIL.Image import Resampling |
| from torchvision.transforms import InterpolationMode |
| from torchvision.transforms.functional import resize |
| import cv2 |
| import re |
|
|
| def load_pfm(file): |
| color = None |
| width = None |
| height = None |
| scale = None |
| data_type = None |
| header = file.readline().decode('UTF-8').rstrip() |
|
|
| if header == 'PF': |
| color = True |
| elif header == 'Pf': |
| color = False |
| else: |
| raise Exception('Not a PFM file.') |
| dim_match = re.match(r'^(\d+)\s(\d+)\s$', file.readline().decode('UTF-8')) |
| if dim_match: |
| width, height = map(int, dim_match.groups()) |
| else: |
| raise Exception('Malformed PFM header.') |
| |
| scale = float((file.readline()).decode('UTF-8').rstrip()) |
| if scale < 0: |
| data_type = '<f' |
| else: |
| data_type = '>f' |
| data_string = file.read() |
| data = np.fromstring(data_string, data_type) |
| shape = (height, width, 3) if color else (height, width) |
| data = np.reshape(data, shape) |
| data = cv2.flip(data, 0) |
| return data |
|
|
|
|
| |
| def depth_scale_shift_normalization(depth, low_percent=2, high_percent=98): |
|
|
| bsz = depth.shape[0] |
| depth_ = depth[:,0,:,:].reshape(bsz,-1).cpu().numpy() |
| min_value = torch.from_numpy(np.percentile(a=depth_,q=low_percent,axis=1)).to(depth)[...,None,None,None] |
| max_value = torch.from_numpy(np.percentile(a=depth_,q=high_percent,axis=1)).to(depth)[...,None,None,None] |
|
|
| normalized_depth = ((depth - min_value)/(max_value-min_value+1e-5) - 0.5) * 2 |
| normalized_depth = torch.clip(normalized_depth, -1., 1.) |
|
|
| return normalized_depth |
|
|
| |
| def norm_to_rgb(norm): |
| |
| |
| |
| norm_rgb = ((norm + 1.0) / 2.0 * 255.0).astype(np.uint8) |
| |
| norm_rgb = np.clip(norm_rgb, a_min=0, a_max=255) |
| norm_rgb = norm_rgb.astype(np.uint8) |
| return norm_rgb |
|
|
|
|
| def colorize_depth_maps( |
| depth_map, min_depth=None, max_depth=None, cmap="Spectral", valid_mask=None |
| ): |
| """ |
| Colorize depth maps. |
| """ |
| assert len(depth_map.shape) >= 2, "Invalid dimension" |
|
|
| if isinstance(depth_map, torch.Tensor): |
| depth = depth_map.detach().clone().squeeze().cpu().numpy() |
| elif isinstance(depth_map, np.ndarray): |
| depth = depth_map.copy().squeeze() |
| |
| if depth.ndim < 3: |
| depth = depth[np.newaxis, :, :] |
|
|
| |
| cm = matplotlib.colormaps[cmap] |
|
|
| |
| |
| |
| |
| |
| |
| |
|
|
| if min_depth != max_depth: |
| depth = ((depth - min_depth) / (max_depth - min_depth)).clip(0, 1) |
| else: |
| |
| depth = depth * 0. |
|
|
| img_colored_np = cm(depth, bytes=False)[:, :, :, 0:3] |
| img_colored_np = np.rollaxis(img_colored_np, 3, 1) |
|
|
| if valid_mask is not None: |
| if isinstance(depth_map, torch.Tensor): |
| valid_mask = valid_mask.detach().numpy() |
| valid_mask = valid_mask.squeeze() |
| if valid_mask.ndim < 3: |
| valid_mask = valid_mask[np.newaxis, np.newaxis, :, :] |
| else: |
| valid_mask = valid_mask[:, np.newaxis, :, :] |
| valid_mask = np.repeat(valid_mask, 3, axis=1) |
| img_colored_np[~valid_mask] = 0 |
|
|
| if isinstance(depth_map, torch.Tensor): |
| img_colored = torch.from_numpy(img_colored_np).float() |
| elif isinstance(depth_map, np.ndarray): |
| img_colored = img_colored_np |
|
|
| return img_colored |
|
|
|
|
| def chw2hwc(chw): |
| assert 3 == len(chw.shape) |
| if isinstance(chw, torch.Tensor): |
| hwc = torch.permute(chw, (1, 2, 0)) |
| elif isinstance(chw, np.ndarray): |
| hwc = np.moveaxis(chw, 0, -1) |
| return hwc |
|
|
|
|
| def resize_max_res_torch( |
| img: torch.Tensor, |
| max_edge_resolution: int, |
| resample_method: InterpolationMode = InterpolationMode.BILINEAR, |
| ) -> torch.Tensor: |
| """ |
| Resize image to limit maximum edge length while keeping aspect ratio. |
| |
| Args: |
| img (`torch.Tensor`): |
| Image tensor to be resized. |
| max_edge_resolution (`int`): |
| Maximum edge length (pixel). |
| resample_method (`PIL.Image.Resampling`): |
| Resampling method used to resize images. |
| |
| Returns: |
| `torch.Tensor`: Resized image. |
| """ |
| assert 3 == img.dim() |
| _, original_height, original_width = img.shape |
| downscale_factor = min( |
| max_edge_resolution / original_width, max_edge_resolution / original_height |
| ) |
|
|
| new_width = int(original_width * downscale_factor) |
| new_height = int(original_height * downscale_factor) |
|
|
| round_num = 16 |
| new_width = round(new_width / round_num) * round_num |
| new_height = round(new_height / round_num) * round_num |
|
|
| resized_img = resize(img, (new_height, new_width), resample_method, antialias=True) |
| return resized_img |
|
|
|
|
| def resize_max_res(img: Image.Image, max_edge_resolution: int, resample_method=Resampling.BILINEAR) -> Image.Image: |
| """ |
| Resize image to limit maximum edge length while keeping aspect ratio |
| |
| Args: |
| img (Image.Image): Image to be resized |
| max_edge_resolution (int): Maximum edge length (px). |
| |
| Returns: |
| Image.Image: Resized image. |
| """ |
| |
| if isinstance(img, torch.Tensor): |
| return resize_max_res_torch(img, max_edge_resolution, resample_method) |
| |
| original_width, original_height = img.size |
| downscale_factor = min( |
| max_edge_resolution / original_width, max_edge_resolution / original_height |
| ) |
|
|
| new_width = int(original_width * downscale_factor) |
| new_height = int(original_height * downscale_factor) |
|
|
| resized_img = img.resize((new_width, new_height), resample=resample_method) |
| return resized_img |
|
|
|
|
| def get_pil_resample_method(method_str: str) -> Resampling: |
| resample_method_dict = { |
| "bilinear": Resampling.BILINEAR, |
| "bicubic": Resampling.BICUBIC, |
| "nearest": Resampling.NEAREST, |
| } |
| resample_method = resample_method_dict.get(method_str, None) |
| if resample_method is None: |
| raise ValueError(f"Unknown resampling method: {resample_method}") |
| else: |
| return resample_method |
|
|
|
|
| def get_tv_resample_method(method_str: str) -> InterpolationMode: |
| resample_method_dict = { |
| "bilinear": InterpolationMode.BILINEAR, |
| "bicubic": InterpolationMode.BICUBIC, |
| |
| } |
| resample_method = resample_method_dict.get(method_str, None) |
| if resample_method is None: |
| raise ValueError(f"Unknown resampling method: {resample_method}") |
| else: |
| return resample_method |
|
|
|
|
| def create_point_cloud(depth_map, camera_matrix, extrinsic_matrix): |
|
|
| """Create point cloud from depth map and camera parameters.""" |
| |
| |
| height, width = depth_map.shape |
|
|
| |
| x = np.linspace(0, width - 1, width) |
| y = np.linspace(0, height - 1, height) |
| x, y = np.meshgrid(x, y) |
|
|
| |
| normalized_x = (x - camera_matrix[0, 2]) / camera_matrix[0, 0] |
| normalized_y = (y - camera_matrix[1, 2]) / camera_matrix[1, 1] |
| normalized_z = np.ones_like(x) |
|
|
| |
| depth_map_reshaped = np.repeat(depth_map[:, :, np.newaxis], 3, axis=2) |
| homogeneous_camera_coords = depth_map_reshaped * np.dstack((normalized_x, |
| normalized_y, |
| normalized_z)) |
|
|
| |
| ones = np.ones((height, width, 1)) |
| homogeneous_camera_coords = np.dstack((homogeneous_camera_coords, ones)) |
|
|
| |
| homogeneous_world_coords = np.dot(homogeneous_camera_coords, |
| extrinsic_matrix.T) |
|
|
| |
| point_cloud = (homogeneous_world_coords[:, :, :3] / |
| homogeneous_world_coords[:, :, 3:]) |
|
|
| point_cloud = point_cloud.reshape(-1, 3) |
|
|
| return point_cloud |
|
|
|
|
| def write_ply_mask(points,colors,path_ply,mask=None): |
| if mask is not None: |
| num = np.sum(mask) |
| else: |
| num = points.shape[0] |
| ply_header = ''' |
| ply format ascii 1.0 |
| element vertex {} |
| property float x |
| property float y |
| property float z |
| property uchar red |
| property uchar green |
| property uchar blue |
| end_header |
| '''.format(num) |
| |
| |
| |
| with open(path_ply, 'w') as f: |
| f.write(ply_header) |
| for i in range(points.shape[0]): |
| if mask.reshape(-1)[i]: |
| f.write('{} {} {} {} {} {}\n'.format(points[i,0], points[i,1], points[i,2], |
| int(colors[i, 2]*255), int(colors[i, 1]*255), int(colors[i, 0]*255))) |
|
|
|
|
| def write_ply(points,colors,path_ply,mask=None): |
| if mask is not None: |
| num = np.sum(mask) |
| else: |
| num = points.shape[0] |
| ply_header = '''ply |
| format ascii 1.0 |
| element vertex {} |
| property float x |
| property float y |
| property float z |
| property uchar red |
| property uchar green |
| property uchar blue |
| end_header |
| '''.format(num) |
|
|
| with open(path_ply, 'w') as f: |
| f.write(ply_header) |
| for i in range(points.shape[0]): |
| f.write('{} {} {} {} {} {}\n'.format(points[i,0], points[i,1], points[i,2], |
| int(colors[i, 2]*255), int(colors[i, 1]*255), int(colors[i, 0]*255))) |
|
|
|
|
| def Disparity_Normalization(disparity): |
| min_value = torch.min(disparity) |
| max_value = torch.max(disparity) |
| normalized_disparity = ((disparity -min_value)/(max_value-min_value+1e-5) - 0.5) * 2 |
| return normalized_disparity |
|
|
|
|
| def Disparity_Normalization_mask(disparity, min_value, max_value): |
| normalized_disparity = ((disparity -min_value)/(max_value-min_value+1e-5) - 0.5) * 2 |
| return normalized_disparity |
|
|
|
|
| def resize_max_res_tensor(input_tensor,is_disp=False,recom_resolution=768): |
|
|
| original_H, original_W = input_tensor.shape[2:] |
| |
| downscale_factor = min(recom_resolution/original_H, |
| recom_resolution/original_W) |
| |
| resized_input_tensor = F.interpolate(input_tensor, |
| scale_factor=downscale_factor,mode='bilinear', |
| align_corners=False) |
| if is_disp: |
| return resized_input_tensor * downscale_factor, downscale_factor |
| else: |
| return resized_input_tensor |