point-cloud-registration / scripts /benchmark_pairs.py
duytranus's picture
fix: app.py
cac424e
#!/usr/bin/env python3
"""
Benchmark 3DMatch pairs using Open3D
"""
import json
import sys
import csv
import time
import argparse
from pathlib import Path
from tqdm import tqdm
try:
import open3d as o3d
import numpy as np
except ImportError as e:
print(f"Error: Required packages not installed: {e}")
print("Install with: pip install open3d numpy")
sys.exit(1)
def load_point_cloud(path):
    """Load a point cloud file (e.g. PLY) with Open3D.

    Args:
        path: Location of the point cloud file, as ``str`` or ``pathlib.Path``.

    Returns:
        The point cloud object returned by ``o3d.io.read_point_cloud``.

    Raises:
        FileNotFoundError: If ``path`` does not refer to an existing file.
    """
    # Open3D's reader reports an unreadable file by returning an empty cloud
    # instead of raising, which downstream looks like "Too few points".
    # Fail loudly for the unambiguous case so the real cause is recorded.
    if not Path(path).is_file():
        raise FileNotFoundError(f"Point cloud file not found: {path}")
    return o3d.io.read_point_cloud(str(path))
def preprocess_point_cloud(pcd, voxel_size):
    """Clean and voxel-downsample a point cloud.

    A new cloud is built from the rows of ``pcd.points`` whose coordinates are
    all finite (normals are carried over for the same rows when ``pcd`` has
    them), duplicated points are removed, and the result is downsampled.

    Args:
        pcd: Input Open3D point cloud; only read here, never assigned to.
        voxel_size: Voxel edge length passed to ``voxel_down_sample``.

    Returns:
        Tuple ``(downsampled_cloud, cleaned_point_count,
        downsampled_point_count)``.
    """
    xyz = np.asarray(pcd.points)
    # A row is kept only when every one of its coordinates is finite.
    finite_rows = np.isfinite(xyz).all(axis=1)

    cleaned = o3d.geometry.PointCloud()
    cleaned.points = o3d.utility.Vector3dVector(xyz[finite_rows])
    if pcd.has_normals():
        kept_normals = np.asarray(pcd.normals)[finite_rows]
        cleaned.normals = o3d.utility.Vector3dVector(kept_normals)

    cleaned.remove_duplicated_points()

    downsampled = cleaned.voxel_down_sample(voxel_size)
    cleaned_count = len(cleaned.points)
    downsampled_count = len(downsampled.points)
    return downsampled, cleaned_count, downsampled_count
def compute_fpfh(pcd, voxel_size):
    """Estimate normals on ``pcd`` and compute its FPFH features.

    Args:
        pcd: Open3D point cloud. ``estimate_normals`` is called on it, so the
            cloud passed in is modified.
        voxel_size: Base length; normals use a search radius of
            ``2 * voxel_size`` and the features use ``5 * voxel_size``.

    Returns:
        The FPFH feature object, or ``None`` when the cloud has fewer than
        10 points.
    """
    point_count = len(pcd.points)
    if point_count < 10:
        return None

    normal_search = o3d.geometry.KDTreeSearchParamRadius(radius=voxel_size * 2.0)
    pcd.estimate_normals(normal_search)

    feature_search = o3d.geometry.KDTreeSearchParamRadius(radius=voxel_size * 5.0)
    return o3d.pipelines.registration.compute_fpfh_feature(pcd, feature_search)
def global_registration(source, target, source_fpfh, target_fpfh, voxel_size):
    """Coarsely align ``source`` to ``target`` with feature-based RANSAC.

    Args:
        source: Source Open3D point cloud.
        target: Target Open3D point cloud.
        source_fpfh: FPFH features of ``source``.
        target_fpfh: FPFH features of ``target``.
        voxel_size: Base length; the correspondence distance threshold is
            ``1.5 * voxel_size``.

    Returns:
        The result object of
        ``registration_ransac_based_on_feature_matching``.
    """
    reg = o3d.pipelines.registration
    max_distance = voxel_size * 1.5

    # NOTE(review): the positional False is presumably Open3D's
    # `with_scaling` flag (rigid transform only) — confirm against the API.
    estimator = reg.TransformationEstimationPointToPoint(False)
    pruning_checkers = [
        reg.CorrespondenceCheckerBasedOnEdgeLength(0.9),
        reg.CorrespondenceCheckerBasedOnDistance(max_distance),
    ]
    stop_criteria = reg.RANSACConvergenceCriteria(50000, 0.999)

    return reg.registration_ransac_based_on_feature_matching(
        source,
        target,
        source_fpfh,
        target_fpfh,
        mutual_filter=False,
        max_correspondence_distance=max_distance,
        estimation_method=estimator,
        ransac_n=3,
        checkers=pruning_checkers,
        criteria=stop_criteria,
    )
def local_registration(source, target, init_transform, voxel_size):
    """Refine an initial alignment with point-to-plane ICP.

    Args:
        source: Source Open3D point cloud.
        target: Target Open3D point cloud.
        init_transform: Initial transformation handed to ICP as ``init``.
        voxel_size: Base length; the correspondence distance threshold is
            ``0.4 * voxel_size``.

    Returns:
        The result object of ``registration_icp``.
    """
    reg = o3d.pipelines.registration
    max_distance = voxel_size * 0.4

    # ICP is capped at 50 iterations.
    stop_criteria = reg.ICPConvergenceCriteria(max_iteration=50)
    # NOTE(review): point-to-plane estimation presumably needs normals on
    # `target`; this function does not compute them — confirm callers do.
    estimator = reg.TransformationEstimationPointToPlane()

    return reg.registration_icp(
        source,
        target,
        max_correspondence_distance=max_distance,
        init=init_transform,
        criteria=stop_criteria,
        estimation_method=estimator,
    )
def benchmark_pair(pair, voxel_size):
    """Run load -> preprocess -> FPFH -> RANSAC -> ICP on one pair.

    Args:
        pair: Mapping describing the pair. ``source_path`` and ``target_path``
            are required; ``scene_id``, ``source_id`` and ``target_id`` are
            copied into the result when present.
        voxel_size: Voxel size forwarded to every pipeline stage.

    Returns:
        A dict with the pair identifiers, point counts, RANSAC/ICP fitness
        and RMSE, ``runtime_seconds``, ``status`` (``'success'`` or
        ``'failed'``) and ``error_message``. Failures are reported through
        the dict; exceptions raised by the pipeline are not propagated.
    """
    # Key order matters: the CSV writer derives its columns from it.
    result = {
        'scene_id': pair.get('scene_id'),
        'source_id': pair.get('source_id'),
        'target_id': pair.get('target_id'),
        'source_path': pair.get('source_path'),
        'target_path': pair.get('target_path'),
        'source_points': 0,
        'target_points': 0,
        'source_down_points': 0,
        'target_down_points': 0,
        'ransac_fitness': 0.0,
        'ransac_rmse': 0.0,
        'icp_fitness': 0.0,
        'icp_rmse': 0.0,
        'runtime_seconds': 0.0,
        'status': 'failed',
        'error_message': ''
    }

    # perf_counter is monotonic, so the duration cannot be skewed by system
    # clock adjustments the way time.time() differences can.
    started = time.perf_counter()
    try:
        # Report a missing path explicitly instead of as a bare KeyError text.
        missing_fields = [
            key for key in ('source_path', 'target_path') if not pair.get(key)
        ]
        if missing_fields:
            result['error_message'] = (
                'Missing required field(s): ' + ', '.join(missing_fields)
            )
            return result

        # Load point clouds
        source = load_point_cloud(pair['source_path'])
        target = load_point_cloud(pair['target_path'])
        result['source_points'] = len(source.points)
        result['target_points'] = len(target.points)
        if result['source_points'] < 100 or result['target_points'] < 100:
            result['error_message'] = 'Too few points'
            return result

        # Preprocess
        source_down, _, source_down_count = preprocess_point_cloud(source, voxel_size)
        target_down, _, target_down_count = preprocess_point_cloud(target, voxel_size)
        result['source_down_points'] = source_down_count
        result['target_down_points'] = target_down_count
        if source_down_count < 50 or target_down_count < 50:
            result['error_message'] = 'Too few points after downsampling'
            return result

        # Compute features
        source_fpfh = compute_fpfh(source_down, voxel_size)
        target_fpfh = compute_fpfh(target_down, voxel_size)
        if source_fpfh is None or target_fpfh is None:
            result['error_message'] = 'Failed to compute FPFH'
            return result

        # RANSAC
        ransac_result = global_registration(
            source_down, target_down, source_fpfh, target_fpfh, voxel_size
        )
        result['ransac_fitness'] = ransac_result.fitness
        result['ransac_rmse'] = ransac_result.inlier_rmse

        # ICP, seeded with the RANSAC transformation
        icp_result = local_registration(
            source_down, target_down, ransac_result.transformation, voxel_size
        )
        result['icp_fitness'] = icp_result.fitness
        result['icp_rmse'] = icp_result.inlier_rmse
        result['status'] = 'success'
    except Exception as exc:
        # Per-pair boundary: one bad pair must not abort the whole benchmark.
        # Include the exception type because str(exc) alone can be empty or
        # cryptic (e.g. a KeyError renders as just the quoted key).
        result['error_message'] = f'{type(exc).__name__}: {exc}'
    finally:
        # Single place that records the elapsed time for every exit path,
        # including the early returns above.
        result['runtime_seconds'] = time.perf_counter() - started
    return result
def main(argv=None):
    """Benchmark the pairs listed in a pair index and save CSV/JSON reports.

    Args:
        argv: Optional list of command-line arguments. When ``None``
            (the default) argparse reads ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(description='Benchmark 3DMatch pairs')
    parser.add_argument('--pair_index', required=True, help='Path to pair_index.json')
    parser.add_argument('--output_csv', required=True, help='Output CSV file')
    parser.add_argument('--output_json', required=True, help='Output JSON file')
    parser.add_argument('--voxel_size', type=float, default=0.05, help='Voxel size for downsampling')
    parser.add_argument('--max_pairs_per_scene', type=int, default=50, help='Max pairs per scene')
    args = parser.parse_args(argv)

    # Load pair index. Explicit UTF-8 keeps the run independent of the
    # platform's default encoding.
    with open(args.pair_index, encoding='utf-8') as f:
        all_pairs = json.load(f)
    print(f"Loaded {len(all_pairs)} pairs")

    # Limit pairs per scene, keeping the first N of each scene in index order.
    scene_count = {}
    pairs = []
    for pair in all_pairs:
        # .get() matches benchmark_pair, which also tolerates a missing
        # scene_id; such pairs are grouped together under None.
        scene_id = pair.get('scene_id')
        taken = scene_count.get(scene_id, 0)
        if taken < args.max_pairs_per_scene:
            pairs.append(pair)
            scene_count[scene_id] = taken + 1
    print(f"Benchmarking {len(pairs)} pairs (max {args.max_pairs_per_scene} per scene)")

    # Benchmark
    results = [
        benchmark_pair(pair, args.voxel_size)
        for pair in tqdm(pairs, desc='Benchmarking')
    ]

    # Save results
    Path(args.output_csv).parent.mkdir(parents=True, exist_ok=True)
    Path(args.output_json).parent.mkdir(parents=True, exist_ok=True)

    # CSV: columns come from the first result, so they are empty when no
    # pair was benchmarked.
    fieldnames = list(results[0]) if results else []
    with open(args.output_csv, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(results)

    # JSON
    with open(args.output_json, 'w', encoding='utf-8') as f:
        json.dump(results, f, indent=2)

    # Summary
    success_count = sum(1 for r in results if r['status'] == 'success')
    print(f"\nResults saved to {args.output_csv} and {args.output_json}")
    print(f"Success: {success_count}/{len(results)}")
# Run the benchmark only when this file is executed as a script.
if __name__ == '__main__':
    main()