Spaces:
Build error
Build error
| import gradio as gr | |
| import open3d as o3d | |
| import numpy as np | |
| import time | |
| import multiprocessing | |
| import os | |
| import tempfile | |
| import trimesh | |
| alignment_cache = {} | |
| current_orientation = "+z" | |
| def compute_alignment(files): | |
| """Align point clouds and cache results.""" | |
| ck = tuple(files) | |
| if ck in alignment_cache: return alignment_cache[ck] | |
| if len(files) < 2: return (None, None) | |
| st = time.time() | |
| print(f"Aligning {len(files)} point clouds using {multiprocessing.cpu_count()} CPU cores") | |
| clouds, names = [], [] | |
| for p in files: | |
| c = o3d.io.read_point_cloud(p) | |
| if not c.has_points(): return (None, f"Failed to load point cloud from {os.path.basename(p)}") | |
| clouds.append(c), names.append(os.path.basename(p)) | |
| ref = clouds[0] | |
| trans = [np.eye(4)] | |
| for i, src in enumerate(clouds[1:], 1): | |
| vs = [0.5, 0.3, 0.1] | |
| t = np.eye(4) | |
| for v in vs: | |
| s_down, r_down = src.voxel_down_sample(v), ref.voxel_down_sample(v) | |
| rn = v * 2 | |
| s_down.estimate_normals(o3d.geometry.KDTreeSearchParamHybrid(radius=rn, max_nn=30)) | |
| r_down.estimate_normals(o3d.geometry.KDTreeSearchParamHybrid(radius=rn, max_nn=30)) | |
| d = v * 2 | |
| mi = 10 if i == 0 else 5 | |
| r = o3d.pipelines.registration.registration_icp( | |
| s_down, r_down, d, t, | |
| o3d.pipelines.registration.TransformationEstimationPointToPlane(), | |
| o3d.pipelines.registration.ICPConvergenceCriteria(max_iteration=mi) | |
| ) | |
| t = r.transformation | |
| trans.append(t) | |
| aligned = [] | |
| for c, t in zip(clouds, trans): | |
| cc = o3d.geometry.PointCloud(c) | |
| cc.transform(t) | |
| aligned.append(cc) | |
| ap = [np.asarray(x.points) for x in aligned] | |
| if not ap: return (None, "Failed to process point clouds") | |
| cat = np.vstack(ap) | |
| mn, mx = np.min(cat, 0), np.max(cat, 0) | |
| ctr = (mn + mx) / 2 | |
| scl = np.max(mx - mn) | |
| combined = o3d.geometry.PointCloud() | |
| for c in aligned: combined += c | |
| tm = time.time() - st | |
| msg = f"Successfully aligned {len(files)} point clouds in {tm:.2f} seconds." | |
| data = { | |
| 'aligned_clouds': aligned, 'transformations': trans, 'clouds': clouds, | |
| 'filenames': names, 'global_center': ctr, 'global_scale': scl, | |
| 'combined_cloud': combined | |
| } | |
| alignment_cache[ck] = (data, msg) | |
| return data, msg | |
| def visualize_alignment(files, show_colors=False, show_outlines=True, point_density=10000, | |
| orientation="+z", point_size=0.005, is_initial_view=False): | |
| if not files or len(files) < 2: return (None, "Please upload at least 2 point cloud files (.ply)") | |
| res, msg = compute_alignment(files) | |
| if not res: return (None, msg) | |
| clouds = res['aligned_clouds'] | |
| ctr, scl = res['global_center'], res['global_scale'] | |
| cols = [[1,0,0],[0,1,0],[0,0,1],[1,1,0],[1,0,1],[0,1,1],[1,0.5,0]] | |
| tdir = tempfile.mkdtemp() | |
| global current_orientation | |
| current_orientation = orientation | |
| normed, boxes = [], [] | |
| for i, c in enumerate(clouds): | |
| cp = o3d.geometry.PointCloud(c) | |
| if show_colors: cp.paint_uniform_color(cols[i % len(cols)]) | |
| pts = np.asarray(cp.points) | |
| pts = (pts - ctr)/(scl/2) | |
| n = o3d.geometry.PointCloud() | |
| n.points = o3d.utility.Vector3dVector(pts) | |
| if cp.has_colors(): n.colors = cp.colors | |
| normed.append(n) | |
| if show_outlines: | |
| b = n.get_axis_aligned_bounding_box() | |
| b.color = cols[i % len(cols)] | |
| boxes.append(b) | |
| comb = o3d.geometry.PointCloud() | |
| for c in normed: comb += c | |
| if point_density < len(comb.points): | |
| bb = comb.get_axis_aligned_bounding_box() | |
| vol = np.prod(bb.get_extent()) | |
| ppu = point_density / vol if vol > 0 else 1 | |
| vs = max(0.001, (1/ppu)**(1/3)) | |
| comb = comb.voxel_down_sample(vs) | |
| vs, fs, vc = [], [], [] | |
| pts = np.asarray(comb.points) | |
| if comb.has_colors(): pcols = np.asarray(comb.colors) | |
| else: pcols = np.tile([0.8,0.8,0.8], (len(pts),1)) | |
| sph = trimesh.creation.icosphere(subdivisions=1, radius=point_size) | |
| mxp = min(point_density, len(pts)) | |
| idxs = np.linspace(0, len(pts)-1, mxp, dtype=int) | |
| for i, idx in enumerate(idxs): | |
| p = pts[idx] | |
| c = pcols[idx] | |
| s = sph.copy() | |
| s.apply_translation(p) | |
| s.visual.vertex_colors = np.tile((c*255).astype(np.uint8), (len(s.vertices),1)) | |
| si = len(vs) | |
| vs.extend(s.vertices) | |
| fs.extend(s.faces + si) | |
| vc.extend(s.visual.vertex_colors) | |
| cm = trimesh.Trimesh(vertices=vs, faces=fs, vertex_colors=vc) | |
| if show_outlines: | |
| box_edges = [(0,1),(1,2),(2,3),(3,0),(4,5),(5,6),(6,7),(7,4), | |
| (0,4),(1,5),(2,6),(3,7)] | |
| for i,b in enumerate(boxes): | |
| mb, xb = b.min_bound, b.max_bound | |
| bv = [[mb[0],mb[1],mb[2]],[xb[0],mb[1],mb[2]],[xb[0],xb[1],mb[2]],[mb[0],xb[1],mb[2]], | |
| [mb[0],mb[1],xb[2]],[xb[0],mb[1],xb[2]],[xb[0],xb[1],xb[2]],[mb[0],xb[1],xb[2]]] | |
| bc = cols[i % len(cols)] | |
| r = point_size*0.8 | |
| for s0,s1 in box_edges: | |
| p0, p1 = bv[s0], bv[s1] | |
| d = np.array(p1) - np.array(p0) | |
| ln = np.linalg.norm(d) | |
| if ln<1e-6: continue | |
| d /= ln | |
| za = np.array([0,0,1]) | |
| if abs(np.dot(d,za))>0.999: ra = np.array([1,0,0]) | |
| else: | |
| ra = np.cross(za,d) | |
| ra /= np.linalg.norm(ra) | |
| ang = np.arccos(np.dot(za,d)) | |
| rot = trimesh.transformations.rotation_matrix(ang,ra) | |
| trn = trimesh.transformations.translation_matrix(p0) | |
| cyl = trimesh.creation.cylinder(radius=r,height=ln,sections=8) | |
| cyl.apply_translation([0,0,ln/2]) | |
| cyl.apply_transform(rot) | |
| cyl.apply_transform(trn) | |
| cyl.visual.face_colors = (np.array(bc)*255).astype(np.uint8) | |
| cm = trimesh.util.concatenate([cm,cyl]) | |
| sc = trimesh.Scene(cm) | |
| sc.add_geometry(trimesh.creation.axis(origin_size=0.01, axis_radius=0.0025)) | |
| if orientation == "+y": | |
| sc.apply_transform(trimesh.transformations.rotation_matrix(-np.pi/2,[1,0,0])) | |
| elif orientation == "-y": | |
| sc.apply_transform(trimesh.transformations.rotation_matrix(np.pi/2,[1,0,0])) | |
| elif orientation == "-z": | |
| sc.apply_transform(trimesh.transformations.rotation_matrix(np.pi,[1,0,0])) | |
| fo = os.path.join(tdir,"aligned_scene.glb") | |
| sc.export(fo) | |
| return (fo, msg) | |
| def process_upload(files, sc, so, pd, o, ps): | |
| d, m = compute_alignment(files) | |
| if d is None: return (None, m) | |
| return visualize_alignment(files, sc, so, pd, o, ps, True) | |
| def update_visualization_only(files, sc, so, pd, o, ps): | |
| if not files or len(files)<2: return None | |
| if tuple(files) not in alignment_cache: return None | |
| mo, _ = visualize_alignment(files, sc, so, pd, o, ps, False) | |
| return mo | |
| with gr.Blocks(theme=gr.themes.Base()) as app: | |
| gr.Markdown("# Point Cloud Alignment Tool - 3D ICP") | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| file_input = gr.File(file_count="multiple", file_types=[".ply"], label="Upload Point Cloud Files (.ply)", type="filepath") | |
| with gr.Row(): | |
| show_colors = gr.Checkbox(label="Show Colored Models", value=False) | |
| show_outlines = gr.Checkbox(label="Show Bounding Box Outlines", value=True) | |
| point_density = gr.Slider(1000, 500000, 10000, step=1000, label="Point Density (fewer points = faster rendering)") | |
| orientation = gr.Dropdown(["+z","-z","+y","-y"], value="+z", label="Model Orientation (up direction)") | |
| point_size = gr.Slider(0.001, 0.05, 0.005, step=0.001, label="Point Size") | |
| submit_btn = gr.Button("Align Point Clouds", variant="primary") | |
| with gr.Column(scale=2): | |
| output_model = gr.Model3D(label="Aligned Point Clouds", clear_color=[0.1,0.1,0.1,1.0]) | |
| output_text = gr.Textbox(label="Status") | |
| submit_btn.click(process_upload, | |
| [file_input, show_colors, show_outlines, point_density, orientation, point_size], | |
| [output_model, output_text]) | |
| show_colors.change(update_visualization_only, [file_input, show_colors, show_outlines, point_density, orientation, point_size], [output_model]) | |
| show_outlines.change(update_visualization_only, [file_input, show_colors, show_outlines, point_density, orientation, point_size], [output_model]) | |
| point_density.change(update_visualization_only, [file_input, show_colors, show_outlines, point_density, orientation, point_size], [output_model]) | |
| orientation.change(update_visualization_only, [file_input, show_colors, show_outlines, point_density, orientation, point_size], [output_model]) | |
| point_size.change(update_visualization_only, [file_input, show_colors, show_outlines, point_density, orientation, point_size], [output_model]) | |
| if __name__ == "__main__": | |
| print(f"Using {multiprocessing.cpu_count()} CPU cores for parallel processing") | |
| app.launch(share=True) | |