#!/usr/bin/env python3
"""Depth-map to point-cloud utilities.

Converts MetricDepth depth maps plus RGB images into point clouds.
Extended version with special handling for the MOVi dataset.
"""
import numpy as np
from pathlib import Path
from typing import Tuple, Optional, List, Dict

from scipy.spatial.transform import Rotation


class DepthToPointCloud:
    """Converts metric depth maps (optionally with RGB color) into point clouds."""

    def __init__(self, min_depth: float = 0.1, max_depth: float = 100.0,
                 max_points: int = 50000):
        """
        Initialize the converter.

        Args:
            min_depth: minimum valid depth in meters; smaller values are discarded.
            max_depth: maximum valid depth in meters; larger values are discarded.
            max_points: upper bound on points per cloud (random downsampling).
        """
        self.min_depth = min_depth
        self.max_depth = max_depth
        self.max_points = max_points

    def load_depth_npz(self, npz_path: str) -> np.ndarray:
        """
        Load a MetricDepth NPZ file.

        Args:
            npz_path: path to the .npz file.

        Returns:
            depth_map: 2D float32 depth map in meters.

        Raises:
            ValueError: if the archive contains no arrays at all.
        """
        data = np.load(npz_path)
        # MetricDepth usually stores the depth under the 'depth' or 'pred' key.
        if 'depth' in data:
            depth = data['depth']
        elif 'pred' in data:
            depth = data['pred']
        else:
            # Fall back to the first array found in the archive.
            keys = list(data.keys())
            if len(keys) > 0:
                depth = data[keys[0]]
            else:
                raise ValueError(f"无法从NPZ文件中找到深度数据: {npz_path}")
        # Squeeze a leading or trailing singleton channel to obtain a 2D map.
        if depth.ndim == 3 and depth.shape[0] == 1:
            depth = depth[0]
        elif depth.ndim == 3 and depth.shape[2] == 1:
            depth = depth[:, :, 0]
        return depth.astype(np.float32)

    def load_rgb_image(self, image_path: str,
                       target_size: Optional[Tuple[int, int]] = None) -> np.ndarray:
        """
        Load an RGB image.

        Args:
            image_path: image file path.
            target_size: optional (H, W) to resize to.

        Returns:
            rgb: uint8 array of shape (H, W, 3), values in [0, 255].
        """
        # Lazy import: Pillow is only needed by this method, so the module
        # stays importable without it.
        from PIL import Image

        img = Image.open(image_path)
        if img.mode != 'RGB':
            img = img.convert('RGB')
        if target_size is not None:
            # PIL.Image.resize expects (width, height).
            img = img.resize((target_size[1], target_size[0]), Image.BILINEAR)
        return np.array(img, dtype=np.uint8)

    def compute_world_scale(self, depth_maps: List[np.ndarray]) -> float:
        """
        Compute a characteristic scene scale for depth normalization.

        Args:
            depth_maps: list of depth maps in meters.

        Returns:
            world_scale: the 85th percentile of all valid depths, or 10.0
                when no valid depth exists.
        """
        valid_chunks = []
        for depth_map in depth_maps:
            valid = depth_map[(depth_map > self.min_depth) & (depth_map < self.max_depth)]
            if valid.size > 0:
                valid_chunks.append(valid.ravel())
        if not valid_chunks:
            return 10.0  # default scale when nothing is valid
        # The 85th percentile is robust against far-plane outliers.
        return float(np.percentile(np.concatenate(valid_chunks), 85))

    def depth_to_pointcloud(self, depth_map: np.ndarray, rgb_image: np.ndarray,
                            world_scale: float,
                            camera_intrinsics: Optional[np.ndarray] = None) -> Tuple[np.ndarray, np.ndarray]:
        """
        Convert a depth map into a colored point cloud.

        Args:
            depth_map: (H, W) depth map in meters.
            rgb_image: (H, W, 3) RGB image (resized to match the depth map
                if needed).
            world_scale: scale used to normalize the Z coordinate.
            camera_intrinsics: optional 3x3 intrinsics matrix; a 60-degree
                FOV pinhole model is assumed when omitted.

        Returns:
            points: (N, 3) float32 coordinates; XY normalized to roughly
                [-1, 1] (95th-percentile range), Z divided by world_scale.
            colors: (N, 3) uint8 colors in [0, 255].
        """
        h, w = depth_map.shape

        # Resize the RGB image to the depth resolution when they differ.
        if rgb_image.shape[:2] != (h, w):
            # Lazy import: OpenCV is only needed for this resize.
            import cv2
            rgb_image = cv2.resize(rgb_image, (w, h), interpolation=cv2.INTER_LINEAR)

        # Pixel coordinate grid.
        u_grid, v_grid = np.meshgrid(np.arange(w), np.arange(h))

        # Default intrinsics: assume a 60-degree horizontal field of view.
        if camera_intrinsics is None:
            fx = fy = w / (2 * np.tan(np.radians(30)))
            cx = w / 2
            cy = h / 2
        else:
            fx = camera_intrinsics[0, 0]
            fy = camera_intrinsics[1, 1]
            cx = camera_intrinsics[0, 2]
            cy = camera_intrinsics[1, 2]

        # Back-project pixels into 3D camera space.
        z = depth_map
        x = (u_grid - cx) * z / fx
        y = (v_grid - cy) * z / fy

        x_flat = x.flatten()
        y_flat = y.flatten()
        z_flat = z.flatten()

        r = rgb_image[:, :, 0].flatten()
        g = rgb_image[:, :, 1].flatten()
        b = rgb_image[:, :, 2].flatten()

        # Keep only points whose depth lies in the valid range.
        valid_mask = (z_flat > self.min_depth) & (z_flat < self.max_depth)
        if valid_mask.sum() < 100:
            # Too few valid points — return an empty cloud.
            return np.zeros((0, 3), dtype=np.float32), np.zeros((0, 3), dtype=np.uint8)

        x_valid = x_flat[valid_mask]
        y_valid = y_flat[valid_mask]
        z_valid = z_flat[valid_mask]
        r_valid = r[valid_mask]
        g_valid = g[valid_mask]
        b_valid = b[valid_mask]

        # Normalize XY by their 95th-percentile absolute range (robust to
        # outliers); epsilon guards against an all-zero axis.
        x_range = np.percentile(np.abs(x_valid), 95)
        y_range = np.percentile(np.abs(y_valid), 95)
        x_normalized = x_valid / (x_range + 1e-6)
        y_normalized = y_valid / (y_range + 1e-6)

        # Normalize Z by the scene scale.
        z_normalized = z_valid / world_scale

        points = np.stack([x_normalized, y_normalized, z_normalized], axis=1)
        colors = np.stack([r_valid, g_valid, b_valid], axis=1)

        # Randomly downsample to the configured maximum point count.
        if len(points) > self.max_points:
            indices = np.random.choice(len(points), self.max_points, replace=False)
            points = points[indices]
            colors = colors[indices]

        return points.astype(np.float32), colors.astype(np.uint8)

    def process_sequence(self, depth_folder: Path, rgb_folder: Path,
                         num_frames: Optional[int] = None) -> List[Tuple[np.ndarray, np.ndarray]]:
        """
        Process a whole sequence of depth/RGB frame pairs.

        Args:
            depth_folder: folder containing depth .npz files.
            rgb_folder: folder containing RGB .jpg/.png images.
            num_frames: number of frames to process; None means all.

        Returns:
            point_clouds: list of (points, colors) tuples, one per frame.

        Raises:
            ValueError: if either folder contains no matching files.
        """
        depth_files = sorted(depth_folder.glob("*.npz"))
        rgb_files = sorted(list(rgb_folder.glob("*.jpg")) + list(rgb_folder.glob("*.png")))

        if len(depth_files) == 0:
            raise ValueError(f"未找到深度文件: {depth_folder}")
        if len(rgb_files) == 0:
            raise ValueError(f"未找到RGB文件: {rgb_folder}")

        # Frames are paired by sorted order; clamp to the shorter list.
        num_frames_available = min(len(depth_files), len(rgb_files))
        if num_frames is None:
            num_frames = num_frames_available
        else:
            num_frames = min(num_frames, num_frames_available)

        print(f"处理 {num_frames} 帧...")

        # Load all depth maps first so the scene scale covers the sequence.
        depth_maps = [self.load_depth_npz(str(depth_files[i])) for i in range(num_frames)]

        world_scale = self.compute_world_scale(depth_maps)
        print(f"计算得到世界尺度: {world_scale:.2f} 米")

        # Convert every frame.
        point_clouds = []
        for i in range(num_frames):
            rgb_image = self.load_rgb_image(str(rgb_files[i]))
            points, colors = self.depth_to_pointcloud(depth_maps[i], rgb_image, world_scale)
            point_clouds.append((points, colors))
            print(f"帧 {i+1}/{num_frames}: {len(points)} 个点")

        return point_clouds

    def depth_to_normalized_pointcloud_movi(self, depth: np.ndarray,
                                            segmentation: Optional[np.ndarray] = None,
                                            camera_K: Optional[np.ndarray] = None,
                                            camera_position: Optional[np.ndarray] = None,
                                            camera_quaternion: Optional[np.ndarray] = None,
                                            resolution: int = 128,
                                            convert_to_zdepth: bool = True,
                                            scene_center_override: Optional[np.ndarray] = None,
                                            scene_scale_override: Optional[float] = None) -> Tuple:
        """
        MOVi-specific: convert a depth map into a point cloud normalized to
        [-10, 10], matching the training-code implementation.

        Args:
            depth: (H, W, 1) or (H, W) depth array (euclidean distance).
            segmentation: optional (H, W) instance segmentation mask.
            camera_K: optional 3x3 intrinsics; when omitted, a 60-degree FOV
                pinhole camera over `resolution` pixels is assumed.
            camera_position: camera position in world coordinates.
            camera_quaternion: camera quaternion in (x, y, z, w) order.
            resolution: image resolution (assumed square); only used for the
                default intrinsics fallback.
            convert_to_zdepth: convert euclidean depth to planar z-depth.
            scene_center_override: optional precomputed scene center.
            scene_scale_override: optional precomputed scale factor; both
                overrides must be given together to take effect.

        Returns:
            tuple: (instance_pointclouds, points_3d_normalized, segmentation,
                scene_center, scene_extent)
        """
        # Accept a plain (H, W) map by adding a trailing channel.
        if depth.ndim == 2:
            depth = depth[:, :, None]
        H, W = depth.shape[:2]

        # BUG FIX: camera_K is Optional but was dereferenced unconditionally.
        # Fall back to a default pinhole model (60-degree FOV), consistent
        # with depth_to_pointcloud; this also gives `resolution` a purpose.
        if camera_K is None:
            f = resolution / (2.0 * np.tan(np.radians(30)))
            camera_K = np.array([[f, 0.0, resolution / 2.0],
                                 [0.0, f, resolution / 2.0],
                                 [0.0, 0.0, 1.0]])

        fx = camera_K[0, 0]
        fy = camera_K[1, 1]
        cx = camera_K[0, 2]
        cy = camera_K[1, 2]

        # Pixel grid in normalized camera coordinates.
        xx, yy = np.meshgrid(np.arange(W), np.arange(H))
        x_norm = (xx - cx) / fx
        y_norm = (yy - cy) / fy

        if convert_to_zdepth:
            # MOVi stores euclidean distance; convert to planar z-depth:
            # dist^2 = x^2 + y^2 + z^2 with x = x_norm*z, y = y_norm*z,
            # hence dist^2 = (x_norm^2 + y_norm^2 + 1) * z^2.
            z = depth[:, :, 0] / np.sqrt(x_norm**2 + y_norm**2 + 1)
        else:
            # Depth is already planar z-depth.
            z = depth[:, :, 0]

        # 3D points in camera coordinates.
        x = x_norm * z
        y = y_norm * z
        points_3d_camera = np.stack([x, y, z], axis=-1)

        # Camera -> world transform when a pose is provided.
        if camera_position is not None and camera_quaternion is not None:
            # MOVi quaternions are (x, y, z, w), matching scipy's convention.
            cam_rot_matrix = Rotation.from_quat(camera_quaternion).as_matrix()
            points_3d_flat = points_3d_camera.reshape(-1, 3)
            # World = R * Camera + T.
            points_3d_world = points_3d_flat @ cam_rot_matrix.T + camera_position
            points_3d = points_3d_world.reshape(points_3d_camera.shape)
        else:
            points_3d = points_3d_camera

        # Normalize the scene to [-10, 10], using only valid-depth points
        # to determine the bounds.
        valid_mask = z > 0
        valid_points = points_3d[valid_mask]

        if scene_center_override is not None and scene_scale_override is not None:
            scene_center = np.array(scene_center_override, dtype=np.float32)
            # Invert scale_factor = 20 / extent to report a consistent extent.
            scene_extent = 20.0 / float(scene_scale_override) if scene_scale_override != 0 else 1.0
            points_3d_normalized = (points_3d - scene_center) * float(scene_scale_override)
        elif len(valid_points) > 0:
            scene_min = np.min(valid_points, axis=0)
            scene_max = np.max(valid_points, axis=0)
            scene_center = (scene_min + scene_max) / 2
            scene_extent = np.max(scene_max - scene_min)
            if scene_extent > 0:
                # 20 because the target range is -10 to 10.
                scale_factor = 20.0 / scene_extent
                points_3d_normalized = (points_3d - scene_center) * scale_factor
            else:
                points_3d_normalized = points_3d - scene_center
        else:
            points_3d_normalized = points_3d
            scene_center = np.zeros(3)
            scene_extent = 1.0

        # Collect per-instance point clouds (background id 0 is excluded);
        # a missing segmentation is treated as all-background.
        if segmentation is None:
            segmentation = np.zeros(depth.shape[:2], dtype=np.int32)
        instance_ids = np.unique(segmentation)
        instance_ids = instance_ids[instance_ids > 0]

        instance_pointclouds = {}
        for inst_id in instance_ids:
            mask = segmentation == inst_id
            # Points are already normalized together with the scene.
            instance_points = points_3d_normalized[mask]
            if len(instance_points) < 50:  # skip instances with too few points
                continue
            instance_pointclouds[int(inst_id)] = instance_points

        # Also return the full scene cloud and segmentation for visualization.
        return instance_pointclouds, points_3d_normalized, segmentation, scene_center, scene_extent

    def process_movi_frame(self, depth_path: str,
                           segmentation_path: Optional[str] = None,
                           rgb_path: Optional[str] = None,
                           camera_K: Optional[np.ndarray] = None,
                           camera_position: Optional[np.ndarray] = None,
                           camera_quaternion: Optional[np.ndarray] = None) -> Dict:
        """
        Process a single MOVi frame.

        Args:
            depth_path: path to the depth file (.npy).
            segmentation_path: optional path to the segmentation file (.npy).
            rgb_path: optional path to the RGB image.
            camera_K: optional camera intrinsics.
            camera_position: optional camera position.
            camera_quaternion: optional camera rotation quaternion.

        Returns:
            dict with keys 'instance_pointclouds', 'points_3d_normalized',
            'segmentation', 'scene_center', 'scene_extent', 'rgb_image'.
        """
        depth = np.load(depth_path)

        # Load the segmentation if present.
        segmentation = None
        if segmentation_path and Path(segmentation_path).exists():
            segmentation = np.load(segmentation_path)

        # Load the RGB image if present.
        rgb = None
        if rgb_path and Path(rgb_path).exists():
            rgb = self.load_rgb_image(rgb_path)

        (instance_pointclouds, points_3d_normalized, segmentation_out,
         scene_center, scene_extent) = self.depth_to_normalized_pointcloud_movi(
            depth=depth,
            segmentation=segmentation,
            camera_K=camera_K,
            camera_position=camera_position,
            camera_quaternion=camera_quaternion,
            convert_to_zdepth=True,
        )

        # BUG FIX: the converter returns a tuple, which does not support item
        # assignment (`result['rgb_image'] = rgb` raised TypeError). Package
        # the results into the dict promised by the signature instead.
        return {
            'instance_pointclouds': instance_pointclouds,
            'points_3d_normalized': points_3d_normalized,
            'segmentation': segmentation_out,
            'scene_center': scene_center,
            'scene_extent': scene_extent,
            'rgb_image': rgb,
        }