| |
| """ |
| Benchmark 3DMatch pairs using Open3D |
| """ |
| import json |
| import sys |
| import csv |
| import time |
| import argparse |
| from pathlib import Path |
| from tqdm import tqdm |
|
|
| try: |
| import open3d as o3d |
| import numpy as np |
| except ImportError as e: |
| print(f"Error: Required packages not installed: {e}") |
| print("Install with: pip install open3d numpy") |
| sys.exit(1) |
|
|
| def load_point_cloud(path): |
| """Load PLY point cloud""" |
| pcd = o3d.io.read_point_cloud(str(path)) |
| return pcd |
|
|
| def preprocess_point_cloud(pcd, voxel_size): |
| """Preprocess point cloud""" |
| |
| pcd_clean = o3d.geometry.PointCloud() |
| pcd_clean.points = o3d.utility.Vector3dVector( |
| np.asarray(pcd.points)[~np.any(~np.isfinite(np.asarray(pcd.points)), axis=1)] |
| ) |
| if pcd.has_normals(): |
| pcd_clean.normals = o3d.utility.Vector3dVector( |
| np.asarray(pcd.normals)[~np.any(~np.isfinite(np.asarray(pcd.points)), axis=1)] |
| ) |
| |
| |
| pcd_clean.remove_duplicated_points() |
| |
| |
| pcd_down = pcd_clean.voxel_down_sample(voxel_size) |
| |
| return pcd_down, len(pcd_clean.points), len(pcd_down.points) |
|
|
| def compute_fpfh(pcd, voxel_size): |
| """Compute FPFH features""" |
| if len(pcd.points) < 10: |
| return None |
| |
| pcd.estimate_normals( |
| o3d.geometry.KDTreeSearchParamRadius(radius=voxel_size * 2.0) |
| ) |
| |
| fpfh = o3d.pipelines.registration.compute_fpfh_feature( |
| pcd, |
| o3d.geometry.KDTreeSearchParamRadius(radius=voxel_size * 5.0) |
| ) |
| |
| return fpfh |
|
|
| def global_registration(source, target, source_fpfh, target_fpfh, voxel_size): |
| """RANSAC global registration""" |
| distance_threshold = voxel_size * 1.5 |
| |
| result = o3d.pipelines.registration.registration_ransac_based_on_feature_matching( |
| source, target, source_fpfh, target_fpfh, |
| mutual_filter=False, |
| max_correspondence_distance=distance_threshold, |
| estimation_method=o3d.pipelines.registration.TransformationEstimationPointToPoint(False), |
| ransac_n=3, |
| checkers=[ |
| o3d.pipelines.registration.CorrespondenceCheckerBasedOnEdgeLength(0.9), |
| o3d.pipelines.registration.CorrespondenceCheckerBasedOnDistance(distance_threshold) |
| ], |
| criteria=o3d.pipelines.registration.RANSACConvergenceCriteria(50000, 0.999) |
| ) |
| |
| return result |
|
|
| def local_registration(source, target, init_transform, voxel_size): |
| """ICP local registration""" |
| distance_threshold = voxel_size * 0.4 |
| |
| result = o3d.pipelines.registration.registration_icp( |
| source, target, |
| max_correspondence_distance=distance_threshold, |
| init=init_transform, |
| criteria=o3d.pipelines.registration.ICPConvergenceCriteria(max_iteration=50), |
| estimation_method=o3d.pipelines.registration.TransformationEstimationPointToPlane() |
| ) |
| |
| return result |
|
|
| def benchmark_pair(pair, voxel_size): |
| """Benchmark a single pair""" |
| result = { |
| 'scene_id': pair.get('scene_id'), |
| 'source_id': pair.get('source_id'), |
| 'target_id': pair.get('target_id'), |
| 'source_path': pair.get('source_path'), |
| 'target_path': pair.get('target_path'), |
| 'source_points': 0, |
| 'target_points': 0, |
| 'source_down_points': 0, |
| 'target_down_points': 0, |
| 'ransac_fitness': 0.0, |
| 'ransac_rmse': 0.0, |
| 'icp_fitness': 0.0, |
| 'icp_rmse': 0.0, |
| 'runtime_seconds': 0.0, |
| 'status': 'failed', |
| 'error_message': '' |
| } |
| |
| try: |
| start_time = time.time() |
| |
| |
| source = load_point_cloud(pair['source_path']) |
| target = load_point_cloud(pair['target_path']) |
| |
| result['source_points'] = len(source.points) |
| result['target_points'] = len(target.points) |
| |
| if result['source_points'] < 100 or result['target_points'] < 100: |
| result['error_message'] = 'Too few points' |
| result['runtime_seconds'] = time.time() - start_time |
| return result |
| |
| |
| source_down, _, source_down_count = preprocess_point_cloud(source, voxel_size) |
| target_down, _, target_down_count = preprocess_point_cloud(target, voxel_size) |
| |
| result['source_down_points'] = source_down_count |
| result['target_down_points'] = target_down_count |
| |
| if source_down_count < 50 or target_down_count < 50: |
| result['error_message'] = 'Too few points after downsampling' |
| result['runtime_seconds'] = time.time() - start_time |
| return result |
| |
| |
| source_fpfh = compute_fpfh(source_down, voxel_size) |
| target_fpfh = compute_fpfh(target_down, voxel_size) |
| |
| if source_fpfh is None or target_fpfh is None: |
| result['error_message'] = 'Failed to compute FPFH' |
| result['runtime_seconds'] = time.time() - start_time |
| return result |
| |
| |
| ransac_result = global_registration(source_down, target_down, source_fpfh, target_fpfh, voxel_size) |
| |
| result['ransac_fitness'] = ransac_result.fitness |
| result['ransac_rmse'] = ransac_result.inlier_rmse |
| |
| |
| icp_result = local_registration(source_down, target_down, ransac_result.transformation, voxel_size) |
| |
| result['icp_fitness'] = icp_result.fitness |
| result['icp_rmse'] = icp_result.inlier_rmse |
| |
| result['status'] = 'success' |
| result['runtime_seconds'] = time.time() - start_time |
| |
| except Exception as e: |
| result['error_message'] = str(e) |
| result['runtime_seconds'] = time.time() - start_time |
| |
| return result |
|
|
| def main(): |
| parser = argparse.ArgumentParser(description='Benchmark 3DMatch pairs') |
| parser.add_argument('--pair_index', required=True, help='Path to pair_index.json') |
| parser.add_argument('--output_csv', required=True, help='Output CSV file') |
| parser.add_argument('--output_json', required=True, help='Output JSON file') |
| parser.add_argument('--voxel_size', type=float, default=0.05, help='Voxel size for downsampling') |
| parser.add_argument('--max_pairs_per_scene', type=int, default=50, help='Max pairs per scene') |
| |
| args = parser.parse_args() |
| |
| |
| with open(args.pair_index) as f: |
| all_pairs = json.load(f) |
| |
| print(f"Loaded {len(all_pairs)} pairs") |
| |
| |
| scene_count = {} |
| pairs = [] |
| for pair in all_pairs: |
| scene_id = pair['scene_id'] |
| if scene_count.get(scene_id, 0) < args.max_pairs_per_scene: |
| pairs.append(pair) |
| scene_count[scene_id] = scene_count.get(scene_id, 0) + 1 |
| |
| print(f"Benchmarking {len(pairs)} pairs (max {args.max_pairs_per_scene} per scene)") |
| |
| |
| results = [] |
| for pair in tqdm(pairs, desc='Benchmarking'): |
| result = benchmark_pair(pair, args.voxel_size) |
| results.append(result) |
| |
| |
| Path(args.output_csv).parent.mkdir(parents=True, exist_ok=True) |
| Path(args.output_json).parent.mkdir(parents=True, exist_ok=True) |
| |
| |
| with open(args.output_csv, 'w', newline='') as f: |
| writer = csv.DictWriter(f, fieldnames=results[0].keys() if results else []) |
| writer.writeheader() |
| writer.writerows(results) |
| |
| |
| with open(args.output_json, 'w') as f: |
| json.dump(results, f, indent=2) |
| |
| |
| success_count = sum(1 for r in results if r['status'] == 'success') |
| print(f"\nResults saved to {args.output_csv} and {args.output_json}") |
| print(f"Success: {success_count}/{len(results)}") |
|
|
| if __name__ == '__main__': |
| main() |
|
|