import gradio as gr import open3d as o3d import numpy as np import time import multiprocessing import os import tempfile import trimesh alignment_cache = {} current_orientation = "+z" def compute_alignment(files): """Align point clouds and cache results.""" ck = tuple(files) if ck in alignment_cache: return alignment_cache[ck] if len(files) < 2: return (None, None) st = time.time() print(f"Aligning {len(files)} point clouds using {multiprocessing.cpu_count()} CPU cores") clouds, names = [], [] for p in files: c = o3d.io.read_point_cloud(p) if not c.has_points(): return (None, f"Failed to load point cloud from {os.path.basename(p)}") clouds.append(c), names.append(os.path.basename(p)) ref = clouds[0] trans = [np.eye(4)] for i, src in enumerate(clouds[1:], 1): vs = [0.5, 0.3, 0.1] t = np.eye(4) for v in vs: s_down, r_down = src.voxel_down_sample(v), ref.voxel_down_sample(v) rn = v * 2 s_down.estimate_normals(o3d.geometry.KDTreeSearchParamHybrid(radius=rn, max_nn=30)) r_down.estimate_normals(o3d.geometry.KDTreeSearchParamHybrid(radius=rn, max_nn=30)) d = v * 2 mi = 10 if i == 0 else 5 r = o3d.pipelines.registration.registration_icp( s_down, r_down, d, t, o3d.pipelines.registration.TransformationEstimationPointToPlane(), o3d.pipelines.registration.ICPConvergenceCriteria(max_iteration=mi) ) t = r.transformation trans.append(t) aligned = [] for c, t in zip(clouds, trans): cc = o3d.geometry.PointCloud(c) cc.transform(t) aligned.append(cc) ap = [np.asarray(x.points) for x in aligned] if not ap: return (None, "Failed to process point clouds") cat = np.vstack(ap) mn, mx = np.min(cat, 0), np.max(cat, 0) ctr = (mn + mx) / 2 scl = np.max(mx - mn) combined = o3d.geometry.PointCloud() for c in aligned: combined += c tm = time.time() - st msg = f"Successfully aligned {len(files)} point clouds in {tm:.2f} seconds." data = { 'aligned_clouds': aligned, 'transformations': trans, 'clouds': clouds, 'filenames': names, 'global_center': ctr, 'global_scale': scl, 'combined_cloud': combined } alignment_cache[ck] = (data, msg) return data, msg def visualize_alignment(files, show_colors=False, show_outlines=True, point_density=10000, orientation="+z", point_size=0.005, is_initial_view=False): if not files or len(files) < 2: return (None, "Please upload at least 2 point cloud files (.ply)") res, msg = compute_alignment(files) if not res: return (None, msg) clouds = res['aligned_clouds'] ctr, scl = res['global_center'], res['global_scale'] cols = [[1,0,0],[0,1,0],[0,0,1],[1,1,0],[1,0,1],[0,1,1],[1,0.5,0]] tdir = tempfile.mkdtemp() global current_orientation current_orientation = orientation normed, boxes = [], [] for i, c in enumerate(clouds): cp = o3d.geometry.PointCloud(c) if show_colors: cp.paint_uniform_color(cols[i % len(cols)]) pts = np.asarray(cp.points) pts = (pts - ctr)/(scl/2) n = o3d.geometry.PointCloud() n.points = o3d.utility.Vector3dVector(pts) if cp.has_colors(): n.colors = cp.colors normed.append(n) if show_outlines: b = n.get_axis_aligned_bounding_box() b.color = cols[i % len(cols)] boxes.append(b) comb = o3d.geometry.PointCloud() for c in normed: comb += c if point_density < len(comb.points): bb = comb.get_axis_aligned_bounding_box() vol = np.prod(bb.get_extent()) ppu = point_density / vol if vol > 0 else 1 vs = max(0.001, (1/ppu)**(1/3)) comb = comb.voxel_down_sample(vs) vs, fs, vc = [], [], [] pts = np.asarray(comb.points) if comb.has_colors(): pcols = np.asarray(comb.colors) else: pcols = np.tile([0.8,0.8,0.8], (len(pts),1)) sph = trimesh.creation.icosphere(subdivisions=1, radius=point_size) mxp = min(point_density, len(pts)) idxs = np.linspace(0, len(pts)-1, mxp, dtype=int) for i, idx in enumerate(idxs): p = pts[idx] c = pcols[idx] s = sph.copy() s.apply_translation(p) s.visual.vertex_colors = np.tile((c*255).astype(np.uint8), (len(s.vertices),1)) si = len(vs) vs.extend(s.vertices) fs.extend(s.faces + si) vc.extend(s.visual.vertex_colors) cm = trimesh.Trimesh(vertices=vs, faces=fs, vertex_colors=vc) if show_outlines: box_edges = [(0,1),(1,2),(2,3),(3,0),(4,5),(5,6),(6,7),(7,4), (0,4),(1,5),(2,6),(3,7)] for i,b in enumerate(boxes): mb, xb = b.min_bound, b.max_bound bv = [[mb[0],mb[1],mb[2]],[xb[0],mb[1],mb[2]],[xb[0],xb[1],mb[2]],[mb[0],xb[1],mb[2]], [mb[0],mb[1],xb[2]],[xb[0],mb[1],xb[2]],[xb[0],xb[1],xb[2]],[mb[0],xb[1],xb[2]]] bc = cols[i % len(cols)] r = point_size*0.8 for s0,s1 in box_edges: p0, p1 = bv[s0], bv[s1] d = np.array(p1) - np.array(p0) ln = np.linalg.norm(d) if ln<1e-6: continue d /= ln za = np.array([0,0,1]) if abs(np.dot(d,za))>0.999: ra = np.array([1,0,0]) else: ra = np.cross(za,d) ra /= np.linalg.norm(ra) ang = np.arccos(np.dot(za,d)) rot = trimesh.transformations.rotation_matrix(ang,ra) trn = trimesh.transformations.translation_matrix(p0) cyl = trimesh.creation.cylinder(radius=r,height=ln,sections=8) cyl.apply_translation([0,0,ln/2]) cyl.apply_transform(rot) cyl.apply_transform(trn) cyl.visual.face_colors = (np.array(bc)*255).astype(np.uint8) cm = trimesh.util.concatenate([cm,cyl]) sc = trimesh.Scene(cm) sc.add_geometry(trimesh.creation.axis(origin_size=0.01, axis_radius=0.0025)) if orientation == "+y": sc.apply_transform(trimesh.transformations.rotation_matrix(-np.pi/2,[1,0,0])) elif orientation == "-y": sc.apply_transform(trimesh.transformations.rotation_matrix(np.pi/2,[1,0,0])) elif orientation == "-z": sc.apply_transform(trimesh.transformations.rotation_matrix(np.pi,[1,0,0])) fo = os.path.join(tdir,"aligned_scene.glb") sc.export(fo) return (fo, msg) def process_upload(files, sc, so, pd, o, ps): d, m = compute_alignment(files) if d is None: return (None, m) return visualize_alignment(files, sc, so, pd, o, ps, True) def update_visualization_only(files, sc, so, pd, o, ps): if not files or len(files)<2: return None if tuple(files) not in alignment_cache: return None mo, _ = visualize_alignment(files, sc, so, pd, o, ps, False) return mo with gr.Blocks(theme=gr.themes.Base()) as app: gr.Markdown("# Point Cloud Alignment Tool - 3D ICP") with gr.Row(): with gr.Column(scale=1): file_input = gr.File(file_count="multiple", file_types=[".ply"], label="Upload Point Cloud Files (.ply)", type="filepath") with gr.Row(): show_colors = gr.Checkbox(label="Show Colored Models", value=False) show_outlines = gr.Checkbox(label="Show Bounding Box Outlines", value=True) point_density = gr.Slider(1000, 500000, 10000, step=1000, label="Point Density (fewer points = faster rendering)") orientation = gr.Dropdown(["+z","-z","+y","-y"], value="+z", label="Model Orientation (up direction)") point_size = gr.Slider(0.001, 0.05, 0.005, step=0.001, label="Point Size") submit_btn = gr.Button("Align Point Clouds", variant="primary") with gr.Column(scale=2): output_model = gr.Model3D(label="Aligned Point Clouds", clear_color=[0.1,0.1,0.1,1.0]) output_text = gr.Textbox(label="Status") submit_btn.click(process_upload, [file_input, show_colors, show_outlines, point_density, orientation, point_size], [output_model, output_text]) show_colors.change(update_visualization_only, [file_input, show_colors, show_outlines, point_density, orientation, point_size], [output_model]) show_outlines.change(update_visualization_only, [file_input, show_colors, show_outlines, point_density, orientation, point_size], [output_model]) point_density.change(update_visualization_only, [file_input, show_colors, show_outlines, point_density, orientation, point_size], [output_model]) orientation.change(update_visualization_only, [file_input, show_colors, show_outlines, point_density, orientation, point_size], [output_model]) point_size.change(update_visualization_only, [file_input, show_colors, show_outlines, point_density, orientation, point_size], [output_model]) if __name__ == "__main__": print(f"Using {multiprocessing.cpu_count()} CPU cores for parallel processing") app.launch(share=True)