#!/usr/bin/env python3
"""
Benchmark 3DMatch pairs using Open3D

For every pair listed in a pair-index JSON file the script loads the source
and target point clouds, cleans and voxel-downsamples them, computes FPFH
features, runs RANSAC feature-matching registration followed by
point-to-plane ICP, and records fitness / RMSE / runtime. Results are
written to both a CSV and a JSON file.
"""

import argparse
import csv
import json
import sys
import time
from collections import Counter
from pathlib import Path

# All third-party imports are guarded together so that any missing package
# produces the same actionable message instead of a raw traceback.
try:
    import open3d as o3d
    import numpy as np
    from tqdm import tqdm
except ImportError as e:
    print(f"Error: Required packages not installed: {e}", file=sys.stderr)
    print("Install with: pip install open3d numpy tqdm", file=sys.stderr)
    sys.exit(1)


# Column order of the CSV output. Must list exactly the keys of the result
# dict built in benchmark_pair(); kept at module level so the CSV header can
# be written even when there are no results.
RESULT_FIELDS = (
    'scene_id',
    'source_id',
    'target_id',
    'source_path',
    'target_path',
    'source_points',
    'target_points',
    'source_down_points',
    'target_down_points',
    'ransac_fitness',
    'ransac_rmse',
    'icp_fitness',
    'icp_rmse',
    'runtime_seconds',
    'status',
    'error_message',
)

# Minimum point counts below which a pair is reported as failed.
MIN_RAW_POINTS = 100
MIN_DOWNSAMPLED_POINTS = 50
MIN_FPFH_POINTS = 10


def load_point_cloud(path):
    """Read the point cloud stored at *path* and return it.

    *path* may be a string or a path-like object; it is converted with
    ``str()`` before being handed to Open3D.
    """
    return o3d.io.read_point_cloud(str(path))


def preprocess_point_cloud(pcd, voxel_size):
    """Clean and downsample a point cloud.

    A new cloud is built from the points of *pcd* whose coordinates are all
    finite (normals, if present, are filtered with the same mask; no other
    attributes are copied). Duplicated points are then removed and the
    result is voxel-downsampled with *voxel_size*. *pcd* itself is not
    modified.

    Returns:
        A tuple ``(downsampled_cloud, cleaned_point_count,
        downsampled_point_count)``.
    """
    points = np.asarray(pcd.points)
    # One mask, computed once, keeps points and normals aligned.
    finite_mask = np.isfinite(points).all(axis=1)

    pcd_clean = o3d.geometry.PointCloud()
    pcd_clean.points = o3d.utility.Vector3dVector(points[finite_mask])
    if pcd.has_normals():
        pcd_clean.normals = o3d.utility.Vector3dVector(
            np.asarray(pcd.normals)[finite_mask]
        )

    # Remove duplicates
    pcd_clean.remove_duplicated_points()

    # Downsample
    pcd_down = pcd_clean.voxel_down_sample(voxel_size)

    return pcd_down, len(pcd_clean.points), len(pcd_down.points)


def compute_fpfh(pcd, voxel_size):
    """Compute FPFH features for *pcd*.

    Normals are (re-)estimated on *pcd* in place using a search radius of
    ``2 * voxel_size``; the FPFH descriptor uses a radius of
    ``5 * voxel_size``.

    Returns:
        The FPFH feature object, or ``None`` when *pcd* has fewer than
        10 points.
    """
    if len(pcd.points) < MIN_FPFH_POINTS:
        return None

    pcd.estimate_normals(
        o3d.geometry.KDTreeSearchParamRadius(radius=voxel_size * 2.0)
    )

    fpfh = o3d.pipelines.registration.compute_fpfh_feature(
        pcd,
        o3d.geometry.KDTreeSearchParamRadius(radius=voxel_size * 5.0)
    )
    return fpfh


def global_registration(source, target, source_fpfh, target_fpfh, voxel_size):
    """Run RANSAC global registration on FPFH feature matches.

    The correspondence distance threshold is ``1.5 * voxel_size``. Candidate
    alignments are estimated point-to-point (without scaling) from 3 sampled
    correspondences and pruned by edge-length (0.9) and distance checkers.

    Returns:
        The Open3D registration result (``transformation``, ``fitness``,
        ``inlier_rmse``).
    """
    registration = o3d.pipelines.registration
    distance_threshold = voxel_size * 1.5

    result = registration.registration_ransac_based_on_feature_matching(
        source, target, source_fpfh, target_fpfh,
        mutual_filter=False,
        max_correspondence_distance=distance_threshold,
        estimation_method=registration.TransformationEstimationPointToPoint(False),
        ransac_n=3,
        checkers=[
            registration.CorrespondenceCheckerBasedOnEdgeLength(0.9),
            registration.CorrespondenceCheckerBasedOnDistance(distance_threshold)
        ],
        criteria=registration.RANSACConvergenceCriteria(50000, 0.999)
    )
    return result


def local_registration(source, target, init_transform, voxel_size):
    """Refine *init_transform* with point-to-plane ICP.

    The correspondence distance threshold is ``0.4 * voxel_size`` and ICP
    runs for at most 50 iterations.

    NOTE(review): point-to-plane estimation presumably needs normals on
    *target*; in this script they are estimated by compute_fpfh() before
    this function is called.

    Returns:
        The Open3D registration result (``transformation``, ``fitness``,
        ``inlier_rmse``).
    """
    registration = o3d.pipelines.registration
    distance_threshold = voxel_size * 0.4

    result = registration.registration_icp(
        source, target,
        max_correspondence_distance=distance_threshold,
        init=init_transform,
        criteria=registration.ICPConvergenceCriteria(max_iteration=50),
        estimation_method=registration.TransformationEstimationPointToPlane()
    )
    return result


def benchmark_pair(pair, voxel_size):
    """Benchmark a single source/target pair.

    Args:
        pair: Mapping with at least ``source_path`` and ``target_path``;
            ``scene_id``, ``source_id`` and ``target_id`` are copied into
            the result when present.
        voxel_size: Voxel size used for downsampling and to scale all
            registration radii / thresholds.

    Returns:
        A dict with the keys listed in ``RESULT_FIELDS``. ``status`` is
        ``'success'`` when the whole pipeline ran, otherwise ``'failed'``
        with the reason in ``error_message``. This function never raises
        for per-pair problems; any exception is captured in the result so
        one bad pair does not abort the whole benchmark.
    """
    result = {
        'scene_id': pair.get('scene_id'),
        'source_id': pair.get('source_id'),
        'target_id': pair.get('target_id'),
        'source_path': pair.get('source_path'),
        'target_path': pair.get('target_path'),
        'source_points': 0,
        'target_points': 0,
        'source_down_points': 0,
        'target_down_points': 0,
        'ransac_fitness': 0.0,
        'ransac_rmse': 0.0,
        'icp_fitness': 0.0,
        'icp_rmse': 0.0,
        'runtime_seconds': 0.0,
        'status': 'failed',
        'error_message': ''
    }

    # perf_counter() is monotonic, so the measured duration cannot be
    # distorted by system clock adjustments.
    start_time = time.perf_counter()

    try:
        # Load point clouds
        source = load_point_cloud(pair['source_path'])
        target = load_point_cloud(pair['target_path'])

        result['source_points'] = len(source.points)
        result['target_points'] = len(target.points)

        if (result['source_points'] < MIN_RAW_POINTS
                or result['target_points'] < MIN_RAW_POINTS):
            result['error_message'] = 'Too few points'
            return result

        # Preprocess
        source_down, _, source_down_count = preprocess_point_cloud(source, voxel_size)
        target_down, _, target_down_count = preprocess_point_cloud(target, voxel_size)

        result['source_down_points'] = source_down_count
        result['target_down_points'] = target_down_count

        if (source_down_count < MIN_DOWNSAMPLED_POINTS
                or target_down_count < MIN_DOWNSAMPLED_POINTS):
            result['error_message'] = 'Too few points after downsampling'
            return result

        # Compute features
        source_fpfh = compute_fpfh(source_down, voxel_size)
        target_fpfh = compute_fpfh(target_down, voxel_size)

        if source_fpfh is None or target_fpfh is None:
            result['error_message'] = 'Failed to compute FPFH'
            return result

        # RANSAC
        ransac_result = global_registration(
            source_down, target_down, source_fpfh, target_fpfh, voxel_size
        )
        result['ransac_fitness'] = ransac_result.fitness
        result['ransac_rmse'] = ransac_result.inlier_rmse

        # ICP
        icp_result = local_registration(
            source_down, target_down, ransac_result.transformation, voxel_size
        )
        result['icp_fitness'] = icp_result.fitness
        result['icp_rmse'] = icp_result.inlier_rmse

        result['status'] = 'success'

    except Exception as e:
        # Per-pair boundary: record the failure and keep the benchmark going.
        # Fall back to the exception type so the message is never empty.
        result['error_message'] = str(e) or type(e).__name__

    finally:
        # Runs on every exit path, including the early returns above.
        result['runtime_seconds'] = time.perf_counter() - start_time

    return result


def main():
    """Parse CLI arguments, benchmark the selected pairs, and save results."""
    parser = argparse.ArgumentParser(description='Benchmark 3DMatch pairs')
    parser.add_argument('--pair_index', required=True, help='Path to pair_index.json')
    parser.add_argument('--output_csv', required=True, help='Output CSV file')
    parser.add_argument('--output_json', required=True, help='Output JSON file')
    parser.add_argument('--voxel_size', type=float, default=0.05, help='Voxel size for downsampling')
    parser.add_argument('--max_pairs_per_scene', type=int, default=50, help='Max pairs per scene')

    args = parser.parse_args()

    # Load pair index
    with open(args.pair_index, encoding='utf-8') as f:
        all_pairs = json.load(f)

    print(f"Loaded {len(all_pairs)} pairs")

    # Limit pairs per scene, keeping the first N of each scene in file order.
    # .get() mirrors benchmark_pair(), which also tolerates a missing
    # scene_id; such pairs are grouped together under None.
    scene_count = Counter()
    pairs = []
    for pair in all_pairs:
        scene_id = pair.get('scene_id')
        if scene_count[scene_id] < args.max_pairs_per_scene:
            pairs.append(pair)
            scene_count[scene_id] += 1

    print(f"Benchmarking {len(pairs)} pairs (max {args.max_pairs_per_scene} per scene)")

    # Benchmark
    results = [
        benchmark_pair(pair, args.voxel_size)
        for pair in tqdm(pairs, desc='Benchmarking')
    ]

    # Save results
    Path(args.output_csv).parent.mkdir(parents=True, exist_ok=True)
    Path(args.output_json).parent.mkdir(parents=True, exist_ok=True)

    # CSV: the header comes from RESULT_FIELDS so it is present even when
    # no pairs were benchmarked.
    with open(args.output_csv, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=RESULT_FIELDS)
        writer.writeheader()
        writer.writerows(results)

    # JSON
    with open(args.output_json, 'w', encoding='utf-8') as f:
        json.dump(results, f, indent=2)

    # Summary
    success_count = sum(1 for r in results if r['status'] == 'success')
    print(f"\nResults saved to {args.output_csv} and {args.output_json}")
    print(f"Success: {success_count}/{len(results)}")


if __name__ == '__main__':
    main()