# WaveGen / nano_WaveGen / utils / depth_to_pointcloud.py
# Uploaded by FangSen9000 ("Upload nano_WaveGen", revision 8e263cf, verified)
#!/usr/bin/env python3
"""
深度图转点云工具
用于将MetricDepth深度图和RGB图像转换为点云
增强版本:支持MOVi数据集的特殊处理
"""
import numpy as np
from pathlib import Path
from typing import Tuple, Optional, List, Dict
import cv2
from PIL import Image
from scipy.spatial.transform import Rotation
class DepthToPointCloud:
def __init__(self,
min_depth: float = 0.1,
max_depth: float = 100.0,
max_points: int = 50000):
"""
初始化深度转点云工具
Args:
min_depth: 最小有效深度(米)
max_depth: 最大有效深度(米)
max_points: 最大点数(用于下采样)
"""
self.min_depth = min_depth
self.max_depth = max_depth
self.max_points = max_points
def load_depth_npz(self, npz_path: str) -> np.ndarray:
    """Load a MetricDepth depth map stored in an NPZ archive.

    Args:
        npz_path: Path to the ``.npz`` file.

    Returns:
        2-D float32 depth map in meters.

    Raises:
        ValueError: If the archive contains no arrays at all.
    """
    # FIX: np.load on an .npz returns an NpzFile that keeps the zip handle
    # open; use a context manager so the file is closed deterministically.
    with np.load(npz_path) as data:
        # MetricDepth usually stores depth under 'depth' or 'pred'.
        if 'depth' in data:
            depth = data['depth']
        elif 'pred' in data:
            depth = data['pred']
        else:
            # Fall back to the first array in the archive.
            keys = list(data.keys())
            if not keys:
                raise ValueError(f"无法从NPZ文件中找到深度数据: {npz_path}")
            # NOTE: arrays are read lazily, so index while the file is open.
            depth = data[keys[0]]
    # Squeeze a singleton batch/channel axis so the result is 2-D.
    if depth.ndim == 3 and depth.shape[0] == 1:
        depth = depth[0]
    elif depth.ndim == 3 and depth.shape[2] == 1:
        depth = depth[:, :, 0]
    return depth.astype(np.float32)
def load_rgb_image(self, image_path: str, target_size: Optional[Tuple[int, int]] = None) -> np.ndarray:
    """Load an image from disk as an RGB uint8 array.

    Args:
        image_path: Path of the image file.
        target_size: Optional target size as (H, W); when given, the image
            is resized with bilinear interpolation.

    Returns:
        (H, W, 3) uint8 array with values in [0, 255].
    """
    img = Image.open(image_path)
    # Normalize grayscale / RGBA / palette images to plain RGB.
    rgb = img if img.mode == 'RGB' else img.convert('RGB')
    if target_size is not None:
        # PIL's resize takes (width, height) while target_size is (H, W).
        height, width = target_size
        rgb = rgb.resize((width, height), Image.BILINEAR)
    return np.array(rgb, dtype=np.uint8)
def compute_world_scale(self, depth_maps: List[np.ndarray]) -> float:
"""
计算世界尺度(用于归一化)
Args:
depth_maps: 深度图列表
Returns:
world_scale: 场景的特征尺度
"""
all_depths = []
for depth_map in depth_maps:
valid_depths = depth_map[(depth_map > self.min_depth) & (depth_map < self.max_depth)]
if len(valid_depths) > 0:
all_depths.extend(valid_depths.flatten())
if len(all_depths) == 0:
return 10.0 # 默认值
# 使用85%分位数作为场景尺度
all_depths = np.array(all_depths)
world_scale = np.percentile(all_depths, 85)
return float(world_scale)
def depth_to_pointcloud(self,
depth_map: np.ndarray,
rgb_image: np.ndarray,
world_scale: float,
camera_intrinsics: Optional[np.ndarray] = None) -> Tuple[np.ndarray, np.ndarray]:
"""
将深度图转换为点云
Args:
depth_map: 深度图 (H, W),米为单位
rgb_image: RGB图像 (H, W, 3)
world_scale: 世界尺度,用于归一化Z坐标
camera_intrinsics: 相机内参矩阵 (3, 3),如果为None则使用默认值
Returns:
points: 点云坐标 (N, 3),XY归一化到[-1, 1],Z归一化到[0, 1]
colors: 点云颜色 (N, 3),值域[0, 255]
"""
h, w = depth_map.shape
# 确保RGB图像大小匹配
if rgb_image.shape[:2] != (h, w):
rgb_image = cv2.resize(rgb_image, (w, h), interpolation=cv2.INTER_LINEAR)
# 创建像素坐标网格
u = np.arange(w)
v = np.arange(h)
u_grid, v_grid = np.meshgrid(u, v)
# 如果没有相机内参,使用默认值
if camera_intrinsics is None:
# 假设视场角为60度
fx = fy = w / (2 * np.tan(np.radians(30)))
cx = w / 2
cy = h / 2
else:
fx = camera_intrinsics[0, 0]
fy = camera_intrinsics[1, 1]
cx = camera_intrinsics[0, 2]
cy = camera_intrinsics[1, 2]
# 反投影到3D空间
z = depth_map
x = (u_grid - cx) * z / fx
y = (v_grid - cy) * z / fy
# 展平数组
x_flat = x.flatten()
y_flat = y.flatten()
z_flat = z.flatten()
# 获取颜色
r = rgb_image[:, :, 0].flatten()
g = rgb_image[:, :, 1].flatten()
b = rgb_image[:, :, 2].flatten()
# 过滤有效深度
valid_mask = (z_flat > self.min_depth) & (z_flat < self.max_depth)
if valid_mask.sum() < 100: # 太少有效点
# 返回空点云
return np.zeros((0, 3), dtype=np.float32), np.zeros((0, 3), dtype=np.uint8)
# 应用mask
x_valid = x_flat[valid_mask]
y_valid = y_flat[valid_mask]
z_valid = z_flat[valid_mask]
r_valid = r[valid_mask]
g_valid = g[valid_mask]
b_valid = b[valid_mask]
# 归一化坐标
# XY归一化到[-1, 1](基于深度加权的范围)
x_range = np.percentile(np.abs(x_valid), 95)
y_range = np.percentile(np.abs(y_valid), 95)
x_normalized = x_valid / (x_range + 1e-6)
y_normalized = y_valid / (y_range + 1e-6)
# Z归一化(除以世界尺度)
z_normalized = z_valid / world_scale
# 创建点云
points = np.stack([x_normalized, y_normalized, z_normalized], axis=1)
colors = np.stack([r_valid, g_valid, b_valid], axis=1)
# 随机下采样到最大点数
if len(points) > self.max_points:
indices = np.random.choice(len(points), self.max_points, replace=False)
points = points[indices]
colors = colors[indices]
return points.astype(np.float32), colors.astype(np.uint8)
def process_sequence(self,
                     depth_folder: Path,
                     rgb_folder: Path,
                     num_frames: Optional[int] = None) -> List[Tuple[np.ndarray, np.ndarray]]:
    """Convert a whole depth/RGB sequence into per-frame point clouds.

    Args:
        depth_folder: Directory containing per-frame ``.npz`` depth files.
        rgb_folder: Directory containing per-frame ``.jpg``/``.png`` images.
        num_frames: Number of frames to process; ``None`` means all.

    Returns:
        List of (points, colors) tuples, one per processed frame.

    Raises:
        ValueError: If either folder contains no matching files.
    """
    # Frames are paired purely by sorted filename order.
    depth_files = sorted(depth_folder.glob("*.npz"))
    rgb_files = sorted(list(rgb_folder.glob("*.jpg")) + list(rgb_folder.glob("*.png")))
    if not depth_files:
        raise ValueError(f"未找到深度文件: {depth_folder}")
    if not rgb_files:
        raise ValueError(f"未找到RGB文件: {rgb_folder}")
    # Process only as many frames as both modalities provide.
    available = min(len(depth_files), len(rgb_files))
    count = available if num_frames is None else min(num_frames, available)
    print(f"处理 {count} 帧...")
    # Pass 1: load every depth map so the scene scale covers the full clip.
    depth_maps = [self.load_depth_npz(str(depth_files[idx])) for idx in range(count)]
    world_scale = self.compute_world_scale(depth_maps)
    print(f"计算得到世界尺度: {world_scale:.2f} 米")
    # Pass 2: back-project each frame using the shared scale.
    point_clouds = []
    for idx in range(count):
        frame_rgb = self.load_rgb_image(str(rgb_files[idx]))
        points, colors = self.depth_to_pointcloud(depth_maps[idx], frame_rgb, world_scale)
        point_clouds.append((points, colors))
        print(f"帧 {idx+1}/{count}: {len(points)} 个点")
    return point_clouds
def depth_to_normalized_pointcloud_movi(self,
                                        depth: np.ndarray,
                                        segmentation: Optional[np.ndarray] = None,
                                        camera_K: Optional[np.ndarray] = None,
                                        camera_position: Optional[np.ndarray] = None,
                                        camera_quaternion: Optional[np.ndarray] = None,
                                        resolution: int = 128,
                                        convert_to_zdepth: bool = True,
                                        scene_center_override: Optional[np.ndarray] = None,
                                        scene_scale_override: Optional[float] = None) -> Tuple:
    """MOVi-specific conversion of a depth map into a scene-normalized
    point cloud in roughly [-10, 10], matching the training code.

    Args:
        depth: (H, W, 1) or (H, W) depth array (euclidean distance by default).
        segmentation: (H, W) instance mask; treated as all-background when None.
        camera_K: 3x3 intrinsics. FIX: was declared Optional but dereferenced
            unconditionally (TypeError on None); a 60-degree FOV pinhole model
            derived from the depth resolution is now used when None.
        camera_position: Camera position in world coordinates (optional).
        camera_quaternion: Camera quaternion in (x, y, z, w) order (optional).
        resolution: Unused; kept for interface compatibility (H/W are taken
            from ``depth`` itself).
        convert_to_zdepth: Convert euclidean distance to planar z-depth.
        scene_center_override: Use this center instead of computing bounds.
        scene_scale_override: Use this scale factor instead of computing one.

    Returns:
        tuple: (instance_pointclouds, points_3d_normalized, segmentation,
        scene_center, scene_extent)
    """
    depth = np.asarray(depth)
    if depth.ndim == 2:
        # Generalization: accept plain (H, W) maps by adding a channel axis.
        depth = depth[:, :, None]
    H, W = depth.shape[:2]
    # Intrinsics: supplied, or a default 60-degree FOV pinhole model
    # (consistent with depth_to_pointcloud above).
    if camera_K is None:
        fx = fy = W / (2 * np.tan(np.radians(30)))
        cx = W / 2
        cy = H / 2
    else:
        fx = camera_K[0, 0]
        fy = camera_K[1, 1]
        cx = camera_K[0, 2]
        cy = camera_K[1, 2]
    # Normalized camera-plane coordinates for every pixel.
    xx, yy = np.meshgrid(np.arange(W), np.arange(H))
    x_norm = (xx - cx) / fx
    y_norm = (yy - cy) / fy
    if convert_to_zdepth:
        # MOVi stores euclidean distance d; with x = x_norm*z, y = y_norm*z:
        # d^2 = (x_norm^2 + y_norm^2 + 1) * z^2  =>  z = d / sqrt(...).
        z = depth[:, :, 0] / np.sqrt(x_norm**2 + y_norm**2 + 1)
    else:
        z = depth[:, :, 0]  # already planar z-depth
    # Back-project to camera-space 3D points.
    x = x_norm * z
    y = y_norm * z
    points_3d_camera = np.stack([x, y, z], axis=-1)
    # Optional camera-to-world transform: World = R * Camera + T.
    # MOVi quaternions use [x, y, z, w] order, matching scipy's from_quat.
    if camera_position is not None and camera_quaternion is not None:
        cam_rot_matrix = Rotation.from_quat(camera_quaternion).as_matrix()
        flat = points_3d_camera.reshape(-1, 3)
        points_3d = (flat @ cam_rot_matrix.T + camera_position).reshape(points_3d_camera.shape)
    else:
        points_3d = points_3d_camera
    # Normalize the whole scene into [-10, 10] using only valid (z > 0) points.
    valid_points = points_3d[z > 0]
    if scene_center_override is not None and scene_scale_override is not None:
        scene_center = np.array(scene_center_override, dtype=np.float32)
        # Invert the scale back to an extent so callers see a consistent tuple.
        scene_extent = 20.0 / float(scene_scale_override) if scene_scale_override != 0 else 1.0
        points_3d_normalized = (points_3d - scene_center) * float(scene_scale_override)
    elif len(valid_points) > 0:
        scene_min = np.min(valid_points, axis=0)
        scene_max = np.max(valid_points, axis=0)
        scene_center = (scene_min + scene_max) / 2
        scene_extent = np.max(scene_max - scene_min)
        if scene_extent > 0:
            # 20 because the target range is -10 .. 10 on the largest axis.
            scale_factor = 20.0 / scene_extent
            points_3d_normalized = (points_3d - scene_center) * scale_factor
        else:
            points_3d_normalized = points_3d - scene_center  # degenerate scene
    else:
        points_3d_normalized = points_3d
        scene_center = np.zeros(3)
        scene_extent = 1.0
    # Per-instance clouds (background id 0 excluded); all-background when absent.
    if segmentation is None:
        segmentation = np.zeros(depth.shape[:2], dtype=np.int32)
    instance_ids = np.unique(segmentation)
    instance_ids = instance_ids[instance_ids > 0]
    instance_pointclouds = {}
    for inst_id in instance_ids:
        instance_points = points_3d_normalized[segmentation == inst_id]
        if len(instance_points) < 50:
            continue  # skip tiny fragments
        instance_pointclouds[int(inst_id)] = instance_points
    # Full scene cloud and segmentation are returned for visualization too.
    return instance_pointclouds, points_3d_normalized, segmentation, scene_center, scene_extent
def process_movi_frame(self,
depth_path: str,
segmentation_path: Optional[str] = None,
rgb_path: Optional[str] = None,
camera_K: Optional[np.ndarray] = None,
camera_position: Optional[np.ndarray] = None,
camera_quaternion: Optional[np.ndarray] = None) -> Dict:
"""
处理MOVi数据集的单帧
Args:
depth_path: 深度文件路径(.npy格式)
segmentation_path: 分割文件路径(.npy格式)
rgb_path: RGB图像路径
camera_K: 相机内参
camera_position: 相机位置
camera_quaternion: 相机旋转
Returns:
dict: 包含点云和相关信息的字典
"""
# 加载深度
depth = np.load(depth_path)
# 加载分割(如果有)
segmentation = None
if segmentation_path and Path(segmentation_path).exists():
segmentation = np.load(segmentation_path)
# 加载RGB(如果有)
rgb = None
if rgb_path and Path(rgb_path).exists():
rgb = self.load_rgb_image(rgb_path)
# 转换为点云
result = self.depth_to_normalized_pointcloud_movi(
depth=depth,
segmentation=segmentation,
camera_K=camera_K,
camera_position=camera_position,
camera_quaternion=camera_quaternion,
convert_to_zdepth=True
)
# 添加RGB信息
result['rgb_image'] = rgb
return result