Spaces:
Sleeping
Sleeping
| """ | |
| Utility functions for point cloud processing and visualization | |
| """ | |
| import numpy as np | |
| import open3d as o3d | |
| from pathlib import Path | |
| import cv2 | |
| def visualize_point_cloud(ply_file): | |
| """ | |
| Visualize a single point cloud using Open3D | |
| Usage: | |
| visualize_point_cloud("pointcloud_000000.ply") | |
| """ | |
| pcd = o3d.io.read_point_cloud(ply_file) | |
| # Statistical outlier removal | |
| pcd, _ = pcd.remove_statistical_outlier(nb_neighbors=20, std_ratio=2.0) | |
| # Estimate normals | |
| pcd.estimate_normals( | |
| search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=0.1, max_nn=30) | |
| ) | |
| o3d.visualization.draw_geometries( | |
| [pcd], | |
| window_name="Point Cloud Viewer", | |
| width=1024, | |
| height=768, | |
| point_show_normal=False | |
| ) | |
| def merge_point_clouds(ply_files, output_file="merged_pointcloud.ply", subsample_rate=0.3): | |
| """ | |
| Merge multiple point clouds into one | |
| Args: | |
| ply_files: List of PLY file paths | |
| output_file: Output merged PLY file | |
| subsample_rate: Keep only this fraction of points (0.3 = 30%) | |
| Usage: | |
| merge_point_clouds(["pc1.ply", "pc2.ply"], "merged.ply") | |
| """ | |
| merged_pcd = o3d.geometry.PointCloud() | |
| for ply_file in ply_files: | |
| pcd = o3d.io.read_point_cloud(str(ply_file)) | |
| # Subsample to reduce size | |
| if subsample_rate < 1.0: | |
| pcd = pcd.random_down_sample(subsample_rate) | |
| merged_pcd += pcd | |
| # Remove duplicates and outliers | |
| merged_pcd = merged_pcd.remove_duplicated_points() | |
| merged_pcd, _ = merged_pcd.remove_statistical_outlier(nb_neighbors=20, std_ratio=2.0) | |
| # Save | |
| o3d.io.write_point_cloud(output_file, merged_pcd) | |
| print(f"Merged point cloud saved: {output_file}") | |
| print(f"Total points: {len(merged_pcd.points)}") | |
| return output_file | |
| def create_mesh_from_pointcloud(ply_file, output_file="mesh.ply", depth=9): | |
| """ | |
| Create mesh from point cloud using Poisson surface reconstruction | |
| Args: | |
| ply_file: Input PLY file | |
| output_file: Output mesh file | |
| depth: Poisson depth (higher = more detail, slower) | |
| Usage: | |
| create_mesh_from_pointcloud("pointcloud.ply", "mesh.ply") | |
| """ | |
| pcd = o3d.io.read_point_cloud(ply_file) | |
| # Estimate normals if not present | |
| if not pcd.has_normals(): | |
| pcd.estimate_normals( | |
| search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=0.1, max_nn=30) | |
| ) | |
| pcd.orient_normals_consistent_tangent_plane(k=15) | |
| # Poisson reconstruction | |
| print("Running Poisson reconstruction...") | |
| mesh, densities = o3d.geometry.TriangleMesh.create_from_point_cloud_poisson( | |
| pcd, depth=depth | |
| ) | |
| # Remove low density vertices | |
| vertices_to_remove = densities < np.quantile(densities, 0.01) | |
| mesh.remove_vertices_by_mask(vertices_to_remove) | |
| # Save | |
| o3d.io.write_triangle_mesh(output_file, mesh) | |
| print(f"Mesh saved: {output_file}") | |
| print(f"Vertices: {len(mesh.vertices)}, Triangles: {len(mesh.triangles)}") | |
| return output_file | |
| def animate_reconstruction(ply_directory, output_video="reconstruction.mp4", fps=10): | |
| """ | |
| Create a video animating through the point clouds | |
| Args: | |
| ply_directory: Directory containing PLY files | |
| output_video: Output video file | |
| fps: Frames per second | |
| Usage: | |
| animate_reconstruction("./point_clouds", "animation.mp4") | |
| """ | |
| ply_files = sorted(Path(ply_directory).glob("*.ply")) | |
| if not ply_files: | |
| print("No PLY files found!") | |
| return | |
| # Setup visualizer | |
| vis = o3d.visualization.Visualizer() | |
| vis.create_window(visible=False, width=1920, height=1080) | |
| # Get first point cloud for camera setup | |
| pcd = o3d.io.read_point_cloud(str(ply_files[0])) | |
| vis.add_geometry(pcd) | |
| # Setup camera | |
| ctr = vis.get_view_control() | |
| ctr.set_zoom(0.8) | |
| # Render options | |
| opt = vis.get_render_option() | |
| opt.point_size = 2.0 | |
| opt.background_color = np.asarray([0, 0, 0]) | |
| # Capture frames | |
| frames = [] | |
| for i, ply_file in enumerate(ply_files): | |
| print(f"Rendering frame {i+1}/{len(ply_files)}") | |
| # Update point cloud | |
| pcd = o3d.io.read_point_cloud(str(ply_file)) | |
| vis.clear_geometries() | |
| vis.add_geometry(pcd) | |
| # Rotate camera slightly | |
| ctr.rotate(10.0, 0.0) | |
| # Capture | |
| vis.poll_events() | |
| vis.update_renderer() | |
| img = np.asarray(vis.capture_screen_float_buffer(do_render=True)) | |
| img = (img * 255).astype(np.uint8) | |
| frames.append(cv2.cvtColor(img, cv2.COLOR_RGB2BGR)) | |
| vis.destroy_window() | |
| # Write video | |
| height, width = frames[0].shape[:2] | |
| fourcc = cv2.VideoWriter_fourcc(*'mp4v') | |
| out = cv2.VideoWriter(output_video, fourcc, fps, (width, height)) | |
| for frame in frames: | |
| out.write(frame) | |
| out.release() | |
| print(f"Video saved: {output_video}") | |
| def depth_map_to_pointcloud(rgb_image_path, depth_npy_path, output_ply, subsample=0.5): | |
| """ | |
| Convert single RGB image and depth map to point cloud | |
| Args: | |
| rgb_image_path: Path to RGB image | |
| depth_npy_path: Path to depth numpy array | |
| output_ply: Output PLY file | |
| subsample: Subsampling rate (0.5 = keep 50% of points) | |
| Usage: | |
| depth_map_to_pointcloud("frame.jpg", "depth.npy", "output.ply") | |
| """ | |
| # Load data | |
| rgb = cv2.imread(rgb_image_path) | |
| rgb = cv2.cvtColor(rgb, cv2.COLOR_BGR2RGB) | |
| depth = np.load(depth_npy_path) | |
| # Normalize depth to reasonable scale | |
| depth = depth.astype(np.float32) | |
| depth = (depth - depth.min()) / (depth.max() - depth.min()) | |
| depth = depth * 10.0 # Scale to 0-10 units | |
| h, w = depth.shape | |
| # Camera intrinsics (adjust for your camera) | |
| fx = fy = w * 0.7 | |
| cx, cy = w / 2, h / 2 | |
| # Create 3D points | |
| u, v = np.meshgrid(np.arange(w), np.arange(h)) | |
| z = depth | |
| x = (u - cx) * z / fx | |
| y = (v - cy) * z / fy | |
| points = np.stack([x, y, z], axis=-1).reshape(-1, 3) | |
| colors = rgb.reshape(-1, 3) / 255.0 | |
| # Subsample | |
| if subsample < 1.0: | |
| n_points = int(len(points) * subsample) | |
| indices = np.random.choice(len(points), n_points, replace=False) | |
| points = points[indices] | |
| colors = colors[indices] | |
| # Create point cloud | |
| pcd = o3d.geometry.PointCloud() | |
| pcd.points = o3d.utility.Vector3dVector(points) | |
| pcd.colors = o3d.utility.Vector3dVector(colors) | |
| # Clean up | |
| pcd, _ = pcd.remove_statistical_outlier(nb_neighbors=20, std_ratio=2.0) | |
| # Save | |
| o3d.io.write_point_cloud(output_ply, pcd) | |
| print(f"Point cloud saved: {output_ply} ({len(pcd.points)} points)") | |
| return output_ply | |
| if __name__ == "__main__": | |
| import sys | |
| if len(sys.argv) > 1: | |
| command = sys.argv[1] | |
| if command == "visualize" and len(sys.argv) > 2: | |
| visualize_point_cloud(sys.argv[2]) | |
| elif command == "merge" and len(sys.argv) > 3: | |
| ply_files = sys.argv[2:-1] | |
| output = sys.argv[-1] | |
| merge_point_clouds(ply_files, output) | |
| elif command == "mesh" and len(sys.argv) > 2: | |
| output = sys.argv[3] if len(sys.argv) > 3 else "mesh.ply" | |
| create_mesh_from_pointcloud(sys.argv[2], output) | |
| elif command == "animate" and len(sys.argv) > 2: | |
| output = sys.argv[3] if len(sys.argv) > 3 else "reconstruction.mp4" | |
| animate_reconstruction(sys.argv[2], output) | |
| else: | |
| print("Usage:") | |
| print(" python utils.py visualize <ply_file>") | |
| print(" python utils.py merge <ply1> <ply2> ... <output>") | |
| print(" python utils.py mesh <input_ply> [output_mesh]") | |
| print(" python utils.py animate <ply_directory> [output_video]") | |
| else: | |
| print("Utility functions loaded. Import in Python:") | |
| print(" from utils import visualize_point_cloud, merge_point_clouds") |