ryanhlewis's picture
Create app.py
268d37c verified
Raw
History Blame Contribute Delete
9.25 kB
import gradio as gr
import open3d as o3d
import numpy as np
import time
import multiprocessing
import os
import tempfile
import trimesh
alignment_cache = {}
current_orientation = "+z"
def compute_alignment(files):
"""Align point clouds and cache results."""
ck = tuple(files)
if ck in alignment_cache: return alignment_cache[ck]
if len(files) < 2: return (None, None)
st = time.time()
print(f"Aligning {len(files)} point clouds using {multiprocessing.cpu_count()} CPU cores")
clouds, names = [], []
for p in files:
c = o3d.io.read_point_cloud(p)
if not c.has_points(): return (None, f"Failed to load point cloud from {os.path.basename(p)}")
clouds.append(c), names.append(os.path.basename(p))
ref = clouds[0]
trans = [np.eye(4)]
for i, src in enumerate(clouds[1:], 1):
vs = [0.5, 0.3, 0.1]
t = np.eye(4)
for v in vs:
s_down, r_down = src.voxel_down_sample(v), ref.voxel_down_sample(v)
rn = v * 2
s_down.estimate_normals(o3d.geometry.KDTreeSearchParamHybrid(radius=rn, max_nn=30))
r_down.estimate_normals(o3d.geometry.KDTreeSearchParamHybrid(radius=rn, max_nn=30))
d = v * 2
mi = 10 if i == 0 else 5
r = o3d.pipelines.registration.registration_icp(
s_down, r_down, d, t,
o3d.pipelines.registration.TransformationEstimationPointToPlane(),
o3d.pipelines.registration.ICPConvergenceCriteria(max_iteration=mi)
)
t = r.transformation
trans.append(t)
aligned = []
for c, t in zip(clouds, trans):
cc = o3d.geometry.PointCloud(c)
cc.transform(t)
aligned.append(cc)
ap = [np.asarray(x.points) for x in aligned]
if not ap: return (None, "Failed to process point clouds")
cat = np.vstack(ap)
mn, mx = np.min(cat, 0), np.max(cat, 0)
ctr = (mn + mx) / 2
scl = np.max(mx - mn)
combined = o3d.geometry.PointCloud()
for c in aligned: combined += c
tm = time.time() - st
msg = f"Successfully aligned {len(files)} point clouds in {tm:.2f} seconds."
data = {
'aligned_clouds': aligned, 'transformations': trans, 'clouds': clouds,
'filenames': names, 'global_center': ctr, 'global_scale': scl,
'combined_cloud': combined
}
alignment_cache[ck] = (data, msg)
return data, msg
def visualize_alignment(files, show_colors=False, show_outlines=True, point_density=10000,
orientation="+z", point_size=0.005, is_initial_view=False):
if not files or len(files) < 2: return (None, "Please upload at least 2 point cloud files (.ply)")
res, msg = compute_alignment(files)
if not res: return (None, msg)
clouds = res['aligned_clouds']
ctr, scl = res['global_center'], res['global_scale']
cols = [[1,0,0],[0,1,0],[0,0,1],[1,1,0],[1,0,1],[0,1,1],[1,0.5,0]]
tdir = tempfile.mkdtemp()
global current_orientation
current_orientation = orientation
normed, boxes = [], []
for i, c in enumerate(clouds):
cp = o3d.geometry.PointCloud(c)
if show_colors: cp.paint_uniform_color(cols[i % len(cols)])
pts = np.asarray(cp.points)
pts = (pts - ctr)/(scl/2)
n = o3d.geometry.PointCloud()
n.points = o3d.utility.Vector3dVector(pts)
if cp.has_colors(): n.colors = cp.colors
normed.append(n)
if show_outlines:
b = n.get_axis_aligned_bounding_box()
b.color = cols[i % len(cols)]
boxes.append(b)
comb = o3d.geometry.PointCloud()
for c in normed: comb += c
if point_density < len(comb.points):
bb = comb.get_axis_aligned_bounding_box()
vol = np.prod(bb.get_extent())
ppu = point_density / vol if vol > 0 else 1
vs = max(0.001, (1/ppu)**(1/3))
comb = comb.voxel_down_sample(vs)
vs, fs, vc = [], [], []
pts = np.asarray(comb.points)
if comb.has_colors(): pcols = np.asarray(comb.colors)
else: pcols = np.tile([0.8,0.8,0.8], (len(pts),1))
sph = trimesh.creation.icosphere(subdivisions=1, radius=point_size)
mxp = min(point_density, len(pts))
idxs = np.linspace(0, len(pts)-1, mxp, dtype=int)
for i, idx in enumerate(idxs):
p = pts[idx]
c = pcols[idx]
s = sph.copy()
s.apply_translation(p)
s.visual.vertex_colors = np.tile((c*255).astype(np.uint8), (len(s.vertices),1))
si = len(vs)
vs.extend(s.vertices)
fs.extend(s.faces + si)
vc.extend(s.visual.vertex_colors)
cm = trimesh.Trimesh(vertices=vs, faces=fs, vertex_colors=vc)
if show_outlines:
box_edges = [(0,1),(1,2),(2,3),(3,0),(4,5),(5,6),(6,7),(7,4),
(0,4),(1,5),(2,6),(3,7)]
for i,b in enumerate(boxes):
mb, xb = b.min_bound, b.max_bound
bv = [[mb[0],mb[1],mb[2]],[xb[0],mb[1],mb[2]],[xb[0],xb[1],mb[2]],[mb[0],xb[1],mb[2]],
[mb[0],mb[1],xb[2]],[xb[0],mb[1],xb[2]],[xb[0],xb[1],xb[2]],[mb[0],xb[1],xb[2]]]
bc = cols[i % len(cols)]
r = point_size*0.8
for s0,s1 in box_edges:
p0, p1 = bv[s0], bv[s1]
d = np.array(p1) - np.array(p0)
ln = np.linalg.norm(d)
if ln<1e-6: continue
d /= ln
za = np.array([0,0,1])
if abs(np.dot(d,za))>0.999: ra = np.array([1,0,0])
else:
ra = np.cross(za,d)
ra /= np.linalg.norm(ra)
ang = np.arccos(np.dot(za,d))
rot = trimesh.transformations.rotation_matrix(ang,ra)
trn = trimesh.transformations.translation_matrix(p0)
cyl = trimesh.creation.cylinder(radius=r,height=ln,sections=8)
cyl.apply_translation([0,0,ln/2])
cyl.apply_transform(rot)
cyl.apply_transform(trn)
cyl.visual.face_colors = (np.array(bc)*255).astype(np.uint8)
cm = trimesh.util.concatenate([cm,cyl])
sc = trimesh.Scene(cm)
sc.add_geometry(trimesh.creation.axis(origin_size=0.01, axis_radius=0.0025))
if orientation == "+y":
sc.apply_transform(trimesh.transformations.rotation_matrix(-np.pi/2,[1,0,0]))
elif orientation == "-y":
sc.apply_transform(trimesh.transformations.rotation_matrix(np.pi/2,[1,0,0]))
elif orientation == "-z":
sc.apply_transform(trimesh.transformations.rotation_matrix(np.pi,[1,0,0]))
fo = os.path.join(tdir,"aligned_scene.glb")
sc.export(fo)
return (fo, msg)
def process_upload(files, sc, so, pd, o, ps):
d, m = compute_alignment(files)
if d is None: return (None, m)
return visualize_alignment(files, sc, so, pd, o, ps, True)
def update_visualization_only(files, sc, so, pd, o, ps):
if not files or len(files)<2: return None
if tuple(files) not in alignment_cache: return None
mo, _ = visualize_alignment(files, sc, so, pd, o, ps, False)
return mo
with gr.Blocks(theme=gr.themes.Base()) as app:
gr.Markdown("# Point Cloud Alignment Tool - 3D ICP")
with gr.Row():
with gr.Column(scale=1):
file_input = gr.File(file_count="multiple", file_types=[".ply"], label="Upload Point Cloud Files (.ply)", type="filepath")
with gr.Row():
show_colors = gr.Checkbox(label="Show Colored Models", value=False)
show_outlines = gr.Checkbox(label="Show Bounding Box Outlines", value=True)
point_density = gr.Slider(1000, 500000, 10000, step=1000, label="Point Density (fewer points = faster rendering)")
orientation = gr.Dropdown(["+z","-z","+y","-y"], value="+z", label="Model Orientation (up direction)")
point_size = gr.Slider(0.001, 0.05, 0.005, step=0.001, label="Point Size")
submit_btn = gr.Button("Align Point Clouds", variant="primary")
with gr.Column(scale=2):
output_model = gr.Model3D(label="Aligned Point Clouds", clear_color=[0.1,0.1,0.1,1.0])
output_text = gr.Textbox(label="Status")
submit_btn.click(process_upload,
[file_input, show_colors, show_outlines, point_density, orientation, point_size],
[output_model, output_text])
show_colors.change(update_visualization_only, [file_input, show_colors, show_outlines, point_density, orientation, point_size], [output_model])
show_outlines.change(update_visualization_only, [file_input, show_colors, show_outlines, point_density, orientation, point_size], [output_model])
point_density.change(update_visualization_only, [file_input, show_colors, show_outlines, point_density, orientation, point_size], [output_model])
orientation.change(update_visualization_only, [file_input, show_colors, show_outlines, point_density, orientation, point_size], [output_model])
point_size.change(update_visualization_only, [file_input, show_colors, show_outlines, point_density, orientation, point_size], [output_model])
if __name__ == "__main__":
print(f"Using {multiprocessing.cpu_count()} CPU cores for parallel processing")
app.launch(share=True)