|
|
|
|
|
""" |
|
|
深度图转点云工具 |
|
|
用于将MetricDepth深度图和RGB图像转换为点云 |
|
|
增强版本:支持MOVi数据集的特殊处理 |
|
|
""" |
|
|
|
|
|
import numpy as np |
|
|
from pathlib import Path |
|
|
from typing import Tuple, Optional, List, Dict |
|
|
import cv2 |
|
|
from PIL import Image |
|
|
from scipy.spatial.transform import Rotation |
|
|
|
|
|
class DepthToPointCloud:
    """Convert metric depth maps plus RGB images into colored point clouds.

    Enhanced version with special handling for the MOVi dataset.
    """

    def __init__(self,
                 min_depth: float = 0.1,
                 max_depth: float = 100.0,
                 max_points: int = 50000):
        """Set up the converter.

        Args:
            min_depth: smallest depth (meters) treated as valid.
            max_depth: largest depth (meters) treated as valid.
            max_points: cap on returned points (random downsampling).
        """
        self.min_depth, self.max_depth, self.max_points = (
            min_depth, max_depth, max_points)
|
|
|
|
|
def load_depth_npz(self, npz_path: str) -> np.ndarray: |
|
|
""" |
|
|
加载MetricDepth的NPZ文件 |
|
|
|
|
|
Args: |
|
|
npz_path: NPZ文件路径 |
|
|
|
|
|
Returns: |
|
|
depth_map: 深度图(米为单位) |
|
|
""" |
|
|
data = np.load(npz_path) |
|
|
|
|
|
|
|
|
if 'depth' in data: |
|
|
depth = data['depth'] |
|
|
elif 'pred' in data: |
|
|
depth = data['pred'] |
|
|
else: |
|
|
|
|
|
keys = list(data.keys()) |
|
|
if len(keys) > 0: |
|
|
depth = data[keys[0]] |
|
|
else: |
|
|
raise ValueError(f"无法从NPZ文件中找到深度数据: {npz_path}") |
|
|
|
|
|
|
|
|
if depth.ndim == 3 and depth.shape[0] == 1: |
|
|
depth = depth[0] |
|
|
elif depth.ndim == 3 and depth.shape[2] == 1: |
|
|
depth = depth[:, :, 0] |
|
|
|
|
|
return depth.astype(np.float32) |
|
|
|
|
|
def load_rgb_image(self, image_path: str, target_size: Optional[Tuple[int, int]] = None) -> np.ndarray: |
|
|
""" |
|
|
加载RGB图像 |
|
|
|
|
|
Args: |
|
|
image_path: 图像路径 |
|
|
target_size: 目标大小 (H, W),如果指定则调整大小 |
|
|
|
|
|
Returns: |
|
|
rgb: RGB图像数组 (H, W, 3),值域[0, 255] |
|
|
""" |
|
|
img = Image.open(image_path) |
|
|
|
|
|
|
|
|
if img.mode != 'RGB': |
|
|
img = img.convert('RGB') |
|
|
|
|
|
|
|
|
if target_size is not None: |
|
|
img = img.resize((target_size[1], target_size[0]), Image.BILINEAR) |
|
|
|
|
|
return np.array(img, dtype=np.uint8) |
|
|
|
|
|
def compute_world_scale(self, depth_maps: List[np.ndarray]) -> float: |
|
|
""" |
|
|
计算世界尺度(用于归一化) |
|
|
|
|
|
Args: |
|
|
depth_maps: 深度图列表 |
|
|
|
|
|
Returns: |
|
|
world_scale: 场景的特征尺度 |
|
|
""" |
|
|
all_depths = [] |
|
|
|
|
|
for depth_map in depth_maps: |
|
|
valid_depths = depth_map[(depth_map > self.min_depth) & (depth_map < self.max_depth)] |
|
|
if len(valid_depths) > 0: |
|
|
all_depths.extend(valid_depths.flatten()) |
|
|
|
|
|
if len(all_depths) == 0: |
|
|
return 10.0 |
|
|
|
|
|
|
|
|
all_depths = np.array(all_depths) |
|
|
world_scale = np.percentile(all_depths, 85) |
|
|
|
|
|
return float(world_scale) |
|
|
|
|
|
def depth_to_pointcloud(self, |
|
|
depth_map: np.ndarray, |
|
|
rgb_image: np.ndarray, |
|
|
world_scale: float, |
|
|
camera_intrinsics: Optional[np.ndarray] = None) -> Tuple[np.ndarray, np.ndarray]: |
|
|
""" |
|
|
将深度图转换为点云 |
|
|
|
|
|
Args: |
|
|
depth_map: 深度图 (H, W),米为单位 |
|
|
rgb_image: RGB图像 (H, W, 3) |
|
|
world_scale: 世界尺度,用于归一化Z坐标 |
|
|
camera_intrinsics: 相机内参矩阵 (3, 3),如果为None则使用默认值 |
|
|
|
|
|
Returns: |
|
|
points: 点云坐标 (N, 3),XY归一化到[-1, 1],Z归一化到[0, 1] |
|
|
colors: 点云颜色 (N, 3),值域[0, 255] |
|
|
""" |
|
|
h, w = depth_map.shape |
|
|
|
|
|
|
|
|
if rgb_image.shape[:2] != (h, w): |
|
|
rgb_image = cv2.resize(rgb_image, (w, h), interpolation=cv2.INTER_LINEAR) |
|
|
|
|
|
|
|
|
u = np.arange(w) |
|
|
v = np.arange(h) |
|
|
u_grid, v_grid = np.meshgrid(u, v) |
|
|
|
|
|
|
|
|
if camera_intrinsics is None: |
|
|
|
|
|
fx = fy = w / (2 * np.tan(np.radians(30))) |
|
|
cx = w / 2 |
|
|
cy = h / 2 |
|
|
else: |
|
|
fx = camera_intrinsics[0, 0] |
|
|
fy = camera_intrinsics[1, 1] |
|
|
cx = camera_intrinsics[0, 2] |
|
|
cy = camera_intrinsics[1, 2] |
|
|
|
|
|
|
|
|
z = depth_map |
|
|
x = (u_grid - cx) * z / fx |
|
|
y = (v_grid - cy) * z / fy |
|
|
|
|
|
|
|
|
x_flat = x.flatten() |
|
|
y_flat = y.flatten() |
|
|
z_flat = z.flatten() |
|
|
|
|
|
|
|
|
r = rgb_image[:, :, 0].flatten() |
|
|
g = rgb_image[:, :, 1].flatten() |
|
|
b = rgb_image[:, :, 2].flatten() |
|
|
|
|
|
|
|
|
valid_mask = (z_flat > self.min_depth) & (z_flat < self.max_depth) |
|
|
|
|
|
if valid_mask.sum() < 100: |
|
|
|
|
|
return np.zeros((0, 3), dtype=np.float32), np.zeros((0, 3), dtype=np.uint8) |
|
|
|
|
|
|
|
|
x_valid = x_flat[valid_mask] |
|
|
y_valid = y_flat[valid_mask] |
|
|
z_valid = z_flat[valid_mask] |
|
|
r_valid = r[valid_mask] |
|
|
g_valid = g[valid_mask] |
|
|
b_valid = b[valid_mask] |
|
|
|
|
|
|
|
|
|
|
|
x_range = np.percentile(np.abs(x_valid), 95) |
|
|
y_range = np.percentile(np.abs(y_valid), 95) |
|
|
|
|
|
x_normalized = x_valid / (x_range + 1e-6) |
|
|
y_normalized = y_valid / (y_range + 1e-6) |
|
|
|
|
|
|
|
|
z_normalized = z_valid / world_scale |
|
|
|
|
|
|
|
|
points = np.stack([x_normalized, y_normalized, z_normalized], axis=1) |
|
|
colors = np.stack([r_valid, g_valid, b_valid], axis=1) |
|
|
|
|
|
|
|
|
if len(points) > self.max_points: |
|
|
indices = np.random.choice(len(points), self.max_points, replace=False) |
|
|
points = points[indices] |
|
|
colors = colors[indices] |
|
|
|
|
|
return points.astype(np.float32), colors.astype(np.uint8) |
|
|
|
|
|
def process_sequence(self, |
|
|
depth_folder: Path, |
|
|
rgb_folder: Path, |
|
|
num_frames: Optional[int] = None) -> List[Tuple[np.ndarray, np.ndarray]]: |
|
|
""" |
|
|
处理整个序列 |
|
|
|
|
|
Args: |
|
|
depth_folder: 深度NPZ文件夹路径 |
|
|
rgb_folder: RGB图像文件夹路径 |
|
|
num_frames: 处理的帧数,None表示处理所有帧 |
|
|
|
|
|
Returns: |
|
|
point_clouds: [(points, colors), ...] 列表 |
|
|
""" |
|
|
|
|
|
depth_files = sorted(list(depth_folder.glob("*.npz"))) |
|
|
rgb_files = sorted(list(rgb_folder.glob("*.jpg")) + list(rgb_folder.glob("*.png"))) |
|
|
|
|
|
if len(depth_files) == 0: |
|
|
raise ValueError(f"未找到深度文件: {depth_folder}") |
|
|
if len(rgb_files) == 0: |
|
|
raise ValueError(f"未找到RGB文件: {rgb_folder}") |
|
|
|
|
|
|
|
|
num_frames_available = min(len(depth_files), len(rgb_files)) |
|
|
if num_frames is None: |
|
|
num_frames = num_frames_available |
|
|
else: |
|
|
num_frames = min(num_frames, num_frames_available) |
|
|
|
|
|
print(f"处理 {num_frames} 帧...") |
|
|
|
|
|
|
|
|
depth_maps = [] |
|
|
for i in range(num_frames): |
|
|
depth_map = self.load_depth_npz(str(depth_files[i])) |
|
|
depth_maps.append(depth_map) |
|
|
|
|
|
|
|
|
world_scale = self.compute_world_scale(depth_maps) |
|
|
print(f"计算得到世界尺度: {world_scale:.2f} 米") |
|
|
|
|
|
|
|
|
point_clouds = [] |
|
|
for i in range(num_frames): |
|
|
|
|
|
rgb_image = self.load_rgb_image(str(rgb_files[i])) |
|
|
|
|
|
|
|
|
points, colors = self.depth_to_pointcloud( |
|
|
depth_maps[i], |
|
|
rgb_image, |
|
|
world_scale |
|
|
) |
|
|
|
|
|
point_clouds.append((points, colors)) |
|
|
print(f"帧 {i+1}/{num_frames}: {len(points)} 个点") |
|
|
|
|
|
return point_clouds |
|
|
|
|
|
def depth_to_normalized_pointcloud_movi(self, |
|
|
depth: np.ndarray, |
|
|
segmentation: Optional[np.ndarray] = None, |
|
|
camera_K: Optional[np.ndarray] = None, |
|
|
camera_position: Optional[np.ndarray] = None, |
|
|
camera_quaternion: Optional[np.ndarray] = None, |
|
|
resolution: int = 128, |
|
|
convert_to_zdepth: bool = True, |
|
|
scene_center_override: Optional[np.ndarray] = None, |
|
|
scene_scale_override: Optional[float] = None) -> Tuple: |
|
|
""" |
|
|
MOVi数据集专用:将深度图转换为归一化点云 [-10, 10] |
|
|
与训练代码保持完全一致的实现 |
|
|
|
|
|
Args: |
|
|
depth: (H, W, 1) 深度数组(欧几里得距离) |
|
|
segmentation: (H, W) 实例分割掩码 |
|
|
camera_K: 3x3 相机内参矩阵 |
|
|
camera_position: 相机在世界坐标系中的位置 |
|
|
camera_quaternion: 相机四元数 (x,y,z,w) 格式 |
|
|
resolution: 图像分辨率(假设为正方形) |
|
|
convert_to_zdepth: 是否将欧几里得深度转换为Z深度 |
|
|
|
|
|
Returns: |
|
|
tuple: (instance_pointclouds, points_3d_normalized, segmentation, scene_center, scene_extent) |
|
|
""" |
|
|
H, W = depth.shape[:2] |
|
|
|
|
|
|
|
|
fx = camera_K[0, 0] |
|
|
fy = camera_K[1, 1] |
|
|
cx = camera_K[0, 2] |
|
|
cy = camera_K[1, 2] |
|
|
|
|
|
|
|
|
xx, yy = np.meshgrid(np.arange(W), np.arange(H)) |
|
|
|
|
|
|
|
|
x_norm = (xx - cx) / fx |
|
|
y_norm = (yy - cy) / fy |
|
|
|
|
|
if convert_to_zdepth: |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
z = depth[:, :, 0] / np.sqrt(x_norm**2 + y_norm**2 + 1) |
|
|
else: |
|
|
|
|
|
z = depth[:, :, 0] |
|
|
|
|
|
|
|
|
x = x_norm * z |
|
|
y = y_norm * z |
|
|
|
|
|
|
|
|
points_3d_camera = np.stack([x, y, z], axis=-1) |
|
|
|
|
|
|
|
|
if camera_position is not None and camera_quaternion is not None: |
|
|
|
|
|
|
|
|
cam_rot = Rotation.from_quat(camera_quaternion) |
|
|
cam_rot_matrix = cam_rot.as_matrix() |
|
|
|
|
|
|
|
|
points_3d_flat = points_3d_camera.reshape(-1, 3) |
|
|
points_3d_world = points_3d_flat @ cam_rot_matrix.T + camera_position |
|
|
points_3d = points_3d_world.reshape(points_3d_camera.shape) |
|
|
else: |
|
|
points_3d = points_3d_camera |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
valid_mask = z > 0 |
|
|
valid_points = points_3d[valid_mask] |
|
|
|
|
|
if scene_center_override is not None and scene_scale_override is not None: |
|
|
scene_center = np.array(scene_center_override, dtype=np.float32) |
|
|
scene_extent = 20.0 / float(scene_scale_override) if scene_scale_override != 0 else 1.0 |
|
|
points_3d_normalized = (points_3d - scene_center) * float(scene_scale_override) |
|
|
elif len(valid_points) > 0: |
|
|
|
|
|
scene_min = np.min(valid_points, axis=0) |
|
|
scene_max = np.max(valid_points, axis=0) |
|
|
scene_center = (scene_min + scene_max) / 2 |
|
|
scene_extent = np.max(scene_max - scene_min) |
|
|
|
|
|
|
|
|
if scene_extent > 0: |
|
|
scale_factor = 20.0 / scene_extent |
|
|
points_3d_normalized = (points_3d - scene_center) * scale_factor |
|
|
else: |
|
|
points_3d_normalized = points_3d - scene_center |
|
|
else: |
|
|
points_3d_normalized = points_3d |
|
|
scene_center = np.zeros(3) |
|
|
scene_extent = 1.0 |
|
|
|
|
|
|
|
|
if segmentation is None: |
|
|
segmentation = np.zeros(depth.shape[:2], dtype=np.int32) |
|
|
instance_ids = np.unique(segmentation) |
|
|
instance_ids = instance_ids[instance_ids > 0] |
|
|
|
|
|
instance_pointclouds = {} |
|
|
|
|
|
for inst_id in instance_ids: |
|
|
|
|
|
mask = segmentation == inst_id |
|
|
|
|
|
|
|
|
instance_points = points_3d_normalized[mask] |
|
|
|
|
|
if len(instance_points) < 50: |
|
|
continue |
|
|
|
|
|
instance_pointclouds[int(inst_id)] = instance_points |
|
|
|
|
|
|
|
|
return instance_pointclouds, points_3d_normalized, segmentation, scene_center, scene_extent |
|
|
|
|
|
def process_movi_frame(self, |
|
|
depth_path: str, |
|
|
segmentation_path: Optional[str] = None, |
|
|
rgb_path: Optional[str] = None, |
|
|
camera_K: Optional[np.ndarray] = None, |
|
|
camera_position: Optional[np.ndarray] = None, |
|
|
camera_quaternion: Optional[np.ndarray] = None) -> Dict: |
|
|
""" |
|
|
处理MOVi数据集的单帧 |
|
|
|
|
|
Args: |
|
|
depth_path: 深度文件路径(.npy格式) |
|
|
segmentation_path: 分割文件路径(.npy格式) |
|
|
rgb_path: RGB图像路径 |
|
|
camera_K: 相机内参 |
|
|
camera_position: 相机位置 |
|
|
camera_quaternion: 相机旋转 |
|
|
|
|
|
Returns: |
|
|
dict: 包含点云和相关信息的字典 |
|
|
""" |
|
|
|
|
|
depth = np.load(depth_path) |
|
|
|
|
|
|
|
|
segmentation = None |
|
|
if segmentation_path and Path(segmentation_path).exists(): |
|
|
segmentation = np.load(segmentation_path) |
|
|
|
|
|
|
|
|
rgb = None |
|
|
if rgb_path and Path(rgb_path).exists(): |
|
|
rgb = self.load_rgb_image(rgb_path) |
|
|
|
|
|
|
|
|
result = self.depth_to_normalized_pointcloud_movi( |
|
|
depth=depth, |
|
|
segmentation=segmentation, |
|
|
camera_K=camera_K, |
|
|
camera_position=camera_position, |
|
|
camera_quaternion=camera_quaternion, |
|
|
convert_to_zdepth=True |
|
|
) |
|
|
|
|
|
|
|
|
result['rgb_image'] = rgb |
|
|
|
|
|
return result |
|
|
|