import gradio as gr
import os
import numpy as np
import trimesh
import torch
import time
import spaces
import cv2
import shutil
from datetime import datetime
import glob
from einops import rearrange

# Local imports
from geometry_utils import (
    Coord2zup,
    extract_and_align_ground_plane,
    pad_0001,
    T_to_C,
    im_distance_to_im_depth,
    im_depth_to_point_cloud,
)

# VGGT specific imports
from vggt.utils.load_fn import load_and_preprocess_images
from vggt.utils.pose_enc import pose_encoding_to_extri_intri
from vggt.utils.geometry import unproject_depth_map_to_point_map
from vggt.models.vggt import VGGT

import utonia

# Fix the random seed so feature/PCA visualizations are reproducible between runs.
utonia.utils.set_seed(53124)
utonia_model = utonia.load("utonia", repo_id="Pointcept/Utonia")

# VGGT weights are downloaded once at import time (cached by torch.hub).
VGGT_model = VGGT()
_URL = "https://huggingface.co/facebook/VGGT-1B/resolve/main/model.pt"
VGGT_model.load_state_dict(torch.hub.load_state_dict_from_url(_URL))


@spaces.GPU
def _gpu_run_vggt_inference(images_tensor):
    """
    GPU-only function: run VGGT model inference on preprocessed images.

    Minimizes GPU time by only doing model inference and pose-encoding
    conversion; all file I/O stays on the CPU side (see run_model).

    Args:
        images_tensor: preprocessed image batch produced by
            load_and_preprocess_images; moved to the GPU here.

    Returns:
        dict mapping prediction names to numpy arrays with the leading
        batch dimension squeezed, including "extrinsic" and "intrinsic".
    """
    global VGGT_model
    device = "cuda" if torch.cuda.is_available() else "cpu"
    # Move images to GPU
    images_tensor = images_tensor.to(device)
    model = VGGT_model.to(device)
    model.eval()
    print("Running inference...")
    with torch.no_grad():
        if device == "cuda":
            # bfloat16 autocast reduces memory/bandwidth during inference.
            # torch.autocast replaces the deprecated torch.cuda.amp.autocast.
            with torch.autocast(device_type="cuda", dtype=torch.bfloat16):
                predictions = model(images_tensor)
        else:
            predictions = model(images_tensor)
    # Convert pose encoding to extrinsic and intrinsic matrices (GPU operation)
    print("Converting pose encoding to extrinsic and intrinsic matrices...")
    extrinsic, intrinsic = pose_encoding_to_extri_intri(predictions["pose_enc"], images_tensor.shape[-2:])
    predictions["extrinsic"] = extrinsic
    predictions["intrinsic"] = intrinsic
    # Move all tensors to CPU numpy and drop the leading batch dimension.
    for key in predictions.keys():
        if isinstance(predictions[key], torch.Tensor):
            predictions[key] = predictions[key].cpu().numpy().squeeze(0)
    torch.cuda.empty_cache()
    return predictions


def run_model(target_dir) -> dict:
    """
    CPU-GPU hybrid: handle CPU-intensive file I/O and call the GPU
    function for inference.

    Args:
        target_dir: directory containing an "images" subfolder.

    Returns:
        Prediction dict from _gpu_run_vggt_inference, augmented with
        "world_points_from_depth".

    Raises:
        ValueError: if no images are found under target_dir/images.
    """
    print(f"Processing images from {target_dir}")
    # Load and preprocess images (CPU)
    image_names = glob.glob(os.path.join(target_dir, "images", "*"))
    image_names = sorted(image_names)
    print(f"Found {len(image_names)} images")
    if len(image_names) == 0:
        raise ValueError("No images found. Check your upload.")
    images = load_and_preprocess_images(image_names)
    print(f"Preprocessed images shape: {images.shape}")
    # Call GPU function for inference
    predictions = _gpu_run_vggt_inference(images)
    # Post-processing (CPU)
    print("Computing world points from depth map...")
    depth_map = predictions["depth"]  # (S, H, W, 1)
    world_points = unproject_depth_map_to_point_map(depth_map, predictions["extrinsic"], predictions["intrinsic"])
    predictions["world_points_from_depth"] = world_points
    return predictions


def parse_frames(
    target_dir,
    conf_thres=3.0,
    prediction_mode="Pointmap Regression",
):
    """
    Perform reconstruction using the already-created target_dir/images.

    Args:
        target_dir: working directory with an "images" subfolder.
        conf_thres: percentile used to threshold per-point confidence;
            0.0 disables percentile filtering.
        prediction_mode: "Pointmap Branch" or "Depthmap Branch".
            NOTE(review): the default "Pointmap Regression" matches
            neither branch below, so with the default no confidence
            filtering or ground alignment runs — confirm intended.

    Returns:
        (points, colors, normals) numpy arrays for the reconstructed
        point cloud. NOTE(review): the early-error path returns a
        4-tuple, which would break the 3-way unpack in handle_uploads —
        callers currently guard against it upstream.
    """
    if not os.path.isdir(target_dir) or target_dir == "None":
        return None, "No valid target directory found. Please upload first.", None, None
    start_time = time.time()
    # Prepare frame_filter dropdown
    # NOTE(review): all_files is built but never used afterwards, and
    # "(unknown)" looks like a redacted {filename} placeholder — confirm
    # against the original source before relying on this label format.
    target_dir_images = os.path.join(target_dir, "images")
    all_files = sorted(os.listdir(target_dir_images)) if os.path.isdir(target_dir_images) else []
    all_files = [f"{i}: (unknown)" for i, filename in enumerate(all_files)]
    print("Running run_model...")
    with torch.no_grad():
        predictions = run_model(target_dir)
    # Save predictions
    prediction_save_path = os.path.join(target_dir, "predictions.npz")
    np.savez(prediction_save_path, **predictions)
    # Camera poses: pad extrinsics to 4x4, invert to camera-to-world,
    # and recover each camera center C.
    images = predictions["images"]
    Ts, Ks = predictions["extrinsic"], predictions["intrinsic"]
    Ts = pad_0001(Ts)
    Ts_inv = np.linalg.inv(Ts)
    Cs = np.array([T_to_C(T) for T in Ts])  # (n, 3)
    # [1, 8, 294, 518, 3]
    world_points = predictions["world_points"]
    # Compute view direction for each pixel: (n h w c) - (n, 3)
    view_dirs = world_points - rearrange(Cs, "n c -> n 1 1 c")
    view_dirs = rearrange(view_dirs, "n h w c -> (n h w) c")
    view_dirs = view_dirs / np.linalg.norm(view_dirs, axis=-1, keepdims=True)
    # Extract flat point and color arrays.
    # images layout is [n, 3, h, w] (channels first).
    points = rearrange(world_points, "n h w c -> (n h w) c")
    colors = rearrange(images, "n c h w -> (n h w) c")
    normals = np.zeros_like(points)
    if prediction_mode == "Pointmap Branch":
        world_points_conf = predictions["world_points_conf"]
        conf = world_points_conf.reshape(-1)
        # Re-orient to z-up and normalize the scene height to ~3 units.
        points, Ts_inv, _ = Coord2zup(points, Ts_inv)
        scale = 3 / (points[:, 2].max() - points[:, 2].min())
        points *= scale
        Ts_inv[:, :3, 3] *= scale
        # Use the (negated) viewing rays as surface normals.
        normals = -np.asarray(view_dirs)
        normals = normals / np.clip(np.linalg.norm(normals, axis=-1, keepdims=True), 1e-8, None)
        if conf_thres == 0.0:
            conf_threshold = 0.0
        else:
            conf_threshold = np.percentile(conf, conf_thres)
        conf_mask = (conf >= conf_threshold) & (conf > 1e-5)
        points = points[conf_mask]
        colors = colors[conf_mask]
        normals = normals[conf_mask]
        try:
            points, colors, normals, _, _, _ = extract_and_align_ground_plane(
                points=points,
                colors=colors,
                normals=normals,
            )
        except Exception as e:
            # Best effort: keep the unaligned cloud when no ground is found.
            print(f"cannot find ground, err:{e}")
    elif prediction_mode == "Depthmap Branch":
        # Rebuild the cloud from per-view depth maps instead of the
        # regressed point map.
        # (n, h, w, 3)
        im_colors = rearrange(images, "n c h w -> (n) h w c")
        im_dists = world_points - rearrange(Cs, "n c -> n 1 1 c")
        im_dists = np.linalg.norm(im_dists, axis=-1, keepdims=False)
        # Convert ray distance to z-depth per view.
        im_depths = []
        for im_dist, K in zip(im_dists, Ks):
            im_depth = im_distance_to_im_depth(im_dist, K)
            im_depths.append(im_depth)
        im_depths = np.stack(im_depths, axis=0)
        points = []
        for K, T, im_depth in zip(Ks, Ts, im_depths):
            point = im_depth_to_point_cloud(
                im_depth=im_depth,
                K=K,
                T=T,
                to_image=False,
                ignore_invalid=False,
            )
            points.append(point)
        points = np.vstack(points)
        colors = im_colors.reshape(-1, 3)
        world_points_conf = predictions["depth_conf"]
        conf = world_points_conf.reshape(-1)
        if conf_thres == 0.0:
            conf_threshold = 0.0
        else:
            conf_threshold = np.percentile(conf, conf_thres)
        conf_mask = (conf >= conf_threshold) & (conf > 1e-5)
        points = points[conf_mask]
        colors = colors[conf_mask]
        # Re-orient to z-up and normalize the scene height to ~3 units.
        points, Ts_inv, _ = Coord2zup(points, Ts_inv)
        scale_factor = 3. / (np.max(points[:, 2]) - np.min(points[:, 2]))
        points *= scale_factor
        Ts_inv[:, :3, 3] *= scale_factor
        normals = np.zeros_like(points)
        try:
            points, colors, normals, _, _, _ = extract_and_align_ground_plane(
                points=points,
                colors=colors,
                normals=normals,
            )
        except Exception as e:
            print(f"cannot find ground, err:{e}")
    original_points = np.asarray(points)
    original_colors = np.asarray(colors)
    original_normals = np.asarray(normals)
    # Cleanup
    del predictions
    end_time = time.time()
    print(f"Total time: {end_time - start_time:.2f} seconds")
    return original_points, original_colors, original_normals


def handle_uploads(input_file, input_video, conf_thres, frame_slider, prediction_mode):
    """
    Create a new 'target_dir' + 'images' subfolder, and place user-uploaded
    images or extracted frames from video into it.

    Returns:
        (target_dir, image_paths, original_glb_path, elapsed_seconds).

    NOTE(review): if both input_file and input_video are None the point
    arrays are never assigned; callers (update_gallery_on_upload) guard
    against that case.
    """
    start_time = time.time()
    # Create a unique folder name
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
    target_dir = f"demo_output/inputs_{timestamp}"
    target_dir_images = os.path.join(target_dir, "images")
    target_dir_pcds = os.path.join(target_dir, "pcds")
    # Clean up if somehow that folder already exists
    if os.path.exists(target_dir):
        shutil.rmtree(target_dir)
    os.makedirs(target_dir)
    os.makedirs(target_dir_images)
    os.makedirs(target_dir_pcds)
    image_paths = None  # defined up-front so the return below is always valid
    # Handle video
    if input_video is not None:
        print("processing video")
        if isinstance(input_video, dict) and "name" in input_video:
            video_path = input_video["name"]
        else:
            video_path = input_video
        vs = cv2.VideoCapture(video_path)
        fps = vs.get(cv2.CAP_PROP_FPS)
        # Sample one frame every frame_slider seconds. Guard against
        # frame_interval == 0 (e.g. fps reported as 0 by broken metadata),
        # which would raise ZeroDivisionError in the modulo below.
        frame_interval = max(1, int(fps * frame_slider))
        count = 0
        video_frame_num = 0
        image_paths = []
        while True:
            gotit, frame = vs.read()
            if not gotit:
                break
            count += 1
            if count % frame_interval == 0:
                image_path = os.path.join(target_dir_images, f"{video_frame_num:06}.png")
                cv2.imwrite(image_path, frame)
                image_paths.append(image_path)
                video_frame_num += 1
        vs.release()  # release the capture handle (was previously leaked)
        # Sort final images for gallery
        image_paths = sorted(image_paths)
        original_points, original_colors, original_normals = parse_frames(target_dir, conf_thres, prediction_mode)
    if input_file is not None:
        print("processing ply")
        loaded = load_point_from_file(input_file)
        if loaded is None:
            raise ValueError("Failed to load input point cloud file")
        original_points = loaded["coord"]
        original_colors = loaded["color"]
        original_normals = loaded["normal"]
        image_paths = None
    # Export the original cloud as a GLB for the 3D viewer and cache the
    # raw arrays for later PCA recoloring.
    scene_3d = trimesh.Scene()
    point_cloud_data = trimesh.PointCloud(vertices=original_points, colors=original_colors, vertex_normals=original_normals)
    scene_3d.add_geometry(point_cloud_data)
    original_temp = os.path.join(target_dir_pcds, "original.glb")
    scene_3d.export(file_obj=original_temp)
    np.save(os.path.join(target_dir_pcds, "points.npy"), original_points)
    np.save(os.path.join(target_dir_pcds, "colors.npy"), original_colors)
    np.save(os.path.join(target_dir_pcds, "normals.npy"), original_normals)
    end_time = time.time()
    print(f"Files copied to {target_dir}; took {end_time - start_time:.3f} seconds")
    return target_dir, image_paths, original_temp, end_time - start_time


def load_point_from_file(input_file):
    """
    Load a point cloud (or mesh vertices) from an uploaded file.

    Accepts a path string, a Gradio file object (with .name), or a dict
    with a "name" key. Colors are normalized to float [0, 1].

    Returns:
        {"coord", "color", "normal"} numpy arrays, or None on failure /
        unsupported geometry.
    """
    if input_file is None:
        return None
    # Resolve the on-disk path from the various upload representations.
    file_path = input_file
    if hasattr(input_file, "name"):
        file_path = input_file.name
    elif isinstance(input_file, dict) and "name" in input_file:
        file_path = input_file["name"]
    if not file_path:
        return None
    geometry = trimesh.load(file_path, process=False)
    if isinstance(geometry, trimesh.Scene):
        # Take the first geometry in the scene.
        geometries = list(geometry.geometry.values())
        if not geometries:
            return None
        geometry = geometries[0]
    if isinstance(geometry, trimesh.PointCloud):
        coord = np.asarray(geometry.vertices)
        color = np.asarray(geometry.colors[:, :3]) if geometry.colors is not None and len(geometry.colors) else np.zeros_like(coord)
        normal = np.zeros_like(coord)
        # Convert uint8 colors to float [0, 1].
        if color.dtype != np.float32 and color.dtype != np.float64:
            color = color.astype(np.float32) / 255.0
        return {"coord": coord, "color": color, "normal": normal}
    if isinstance(geometry, trimesh.Trimesh):
        coord = np.asarray(geometry.vertices)
        if geometry.visual is not None and hasattr(geometry.visual, "vertex_colors") and geometry.visual.vertex_colors is not None and len(geometry.visual.vertex_colors):
            color = np.asarray(geometry.visual.vertex_colors[:, :3]).astype(np.float32) / 255.0
        else:
            color = np.zeros_like(coord)
        normal = np.asarray(geometry.vertex_normals) if geometry.vertex_normals is not None and len(geometry.vertex_normals) else np.zeros_like(coord)
        return {"coord": coord, "color": color, "normal": normal}
    return None


def update_gallery_on_upload(input_file, input_video, conf_thres, frame_slider, prediction_mode):
    """
    Whenever user uploads or changes files, immediately handle them and
    show in the gallery.

    Returns:
        (original_view, target_dir, image_paths, status_message), or
        (None, None, None, None) when nothing was uploaded.
    """
    if not input_video and not input_file:
        return None, None, None, None
    target_dir, image_paths, original_view, reconstruction_time = handle_uploads(input_file, input_video, conf_thres, frame_slider, prediction_mode)
    if input_file is not None:
        return original_view, target_dir, [], f"Upload and preprocess complete with {reconstruction_time:.3f} sec. Click \"PCA Generate\" to begin PCA processing."
    if input_video is not None:
        return original_view, target_dir, image_paths, f"Upload and preprocess complete with {reconstruction_time:.3f} sec. Click \"PCA Generate\" to begin PCA processing."
def get_pca_color(feat, start=0, brightness=1.25, center=True):
    """
    Map an (N, C) feature tensor to RGB colors in [0, 1] via low-rank PCA.

    Args:
        feat: (N, C) float tensor of per-point features.
        start: which triple of principal components to visualize
            (components 3*start .. 3*start+2).
        brightness: gain applied after min-max normalization.
        center: whether torch.pca_lowrank centers the data.

    Returns:
        (N, 3) tensor of colors clamped to [0, 1].
    """
    # q = 3*(start+1) components are computed so the requested slice exists.
    u, s, v = torch.pca_lowrank(feat, center=center, q=3 * (start + 1), niter=5)
    projection = feat @ v
    # Select the 3 components for this "start" level.
    # (The original `slice * 0.6 + slice * 0.4` blended the same slice
    # with itself — a no-op — so it is simplified away here.)
    projection = projection[:, 3 * start:3 * (start + 1)]
    min_val = projection.min(dim=-2, keepdim=True)[0]
    max_val = projection.max(dim=-2, keepdim=True)[0]
    # Guard against a zero range before dividing.
    div = torch.clamp(max_val - min_val, min=1e-6)
    color = (projection - min_val) / div * brightness
    color = color.clamp(0.0, 1.0)
    return color


def clear_fields():
    """
    Clears the 3D viewer, the stored target_dir, and empties the gallery.
    """
    return None


def PCAing_log(is_example, log_output):
    """
    Display a quick log message while waiting.
    """
    if is_example:
        return log_output
    return "Loading for Doing PCA..."


def reset_log():
    """
    Reset a quick log message.
    """
    return "A new point cloud file or video is uploading and preprocessing..."


@spaces.GPU
def _gpu_utonia_forward_pca(point, utonia_model_, pca_slider, bright_slider):
    """
    GPU-only function: run Utonia model forward pass and PCA in one place.
    Uses inference_mode overall with a scoped disable for the forward call.

    Args:
        point: dict of tensors (coord/color/normal/...) to be moved to GPU.
        utonia_model_: the Utonia model instance.
        pca_slider: PCA component-triple index (see get_pca_color).
        bright_slider: brightness gain for PCA colors.

    Returns:
        (processed_colors, point_feat, point_inverse, utonia_time, pca_time)
        with numpy arrays and wall-clock timings in seconds.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"
    # Move tensors and model to GPU
    for key in point.keys():
        if isinstance(point[key], torch.Tensor):
            point[key] = point[key].to(device, non_blocking=True)
    utonia_model_ = utonia_model_.to(device)
    utonia_model_.eval()
    with torch.inference_mode():
        utonia_start_time = time.time()
        # Disable inference_mode for model forward to avoid version counter issues
        with torch.inference_mode(False):
            point = utonia_model_(point)
        utonia_end_time = time.time()
        # Upcast point feature through hierarchical pooling: concatenate
        # child features back onto their parents for 4 levels...
        for _ in range(4):
            assert "pooling_parent" in point.keys()
            assert "pooling_inverse" in point.keys()
            parent = point.pop("pooling_parent")
            inverse = point.pop("pooling_inverse")
            parent.feat = torch.cat([parent.feat, point.feat[inverse]], dim=-1)
            point = parent
        # ...then propagate (without concatenation) up any remaining levels.
        while "pooling_parent" in point.keys():
            assert "pooling_inverse" in point.keys()
            parent = point.pop("pooling_parent")
            inverse = point.pop("pooling_inverse")
            parent.feat = point.feat[inverse]
            point = parent
        pca_start_time = time.time()
        pca_color = get_pca_color(point.feat, start=pca_slider, brightness=bright_slider, center=True)
        pca_end_time = time.time()
        # Inverse back to original scale
        original_pca_color = pca_color[point.inverse]
        processed_colors = original_pca_color.cpu().detach().numpy()
        point_feat = point.feat.cpu().detach().numpy()
        point_inverse = point.inverse.cpu().detach().numpy()
    utonia_time = utonia_end_time - utonia_start_time
    pca_time = pca_end_time - pca_start_time
    return processed_colors, point_feat, point_inverse, utonia_time, pca_time


def gradio_demo(target_dir, pca_slider, bright_slider, if_color=True, if_normal=True, scale_value=1.0, apply_z_positive=True, normalize_coord=False):
    """
    CPU-GPU hybrid entry point: load the cached point cloud, run the Utonia
    forward + PCA on GPU, and export the recolored cloud as a GLB.

    Returns:
        (glb_path, status_message), or (None, error_message) when no point
        cloud has been uploaded yet.
    """
    global utonia_model
    target_dir_pcds = os.path.join(target_dir, "pcds")
    if not os.path.isfile(os.path.join(target_dir_pcds, "points.npy")):
        return None, "No point cloud available. Please upload data first."
    # CPU: Load point cloud data from disk
    original_points = np.load(os.path.join(target_dir_pcds, "points.npy"))
    if if_color:
        original_colors = np.load(os.path.join(target_dir_pcds, "colors.npy"))
    else:
        original_colors = np.zeros_like(original_points)
    if if_normal:
        original_normals = np.load(os.path.join(target_dir_pcds, "normals.npy"))
    else:
        original_normals = np.zeros_like(original_points)
    processed_temp = os.path.join(target_dir_pcds, "processed.glb")
    point = {"coord": original_points, "color": original_colors, "normal": original_normals}
    # Keep the untransformed coordinates for the final export.
    original_coord = point["coord"].copy()
    # CPU: Apply transform pipeline
    transform = utonia.transform.default(scale=scale_value, apply_z_positive=apply_z_positive, normalize_coord=normalize_coord)
    point = transform(point)
    # GPU: Run Utonia forward + PCA together (inference_mode inside GPU function)
    processed_colors, point_feat, point_inverse_cpu, utonia_time, pca_time = _gpu_utonia_forward_pca(
        point, utonia_model, pca_slider, bright_slider
    )
    # CPU: Save features so the PCA sliders can recolor without re-running the model.
    np.save(os.path.join(target_dir_pcds, "feat.npy"), point_feat)
    np.save(os.path.join(target_dir_pcds, "inverse.npy"), point_inverse_cpu)
    # CPU: Build and save the 3D mesh
    processed_points = original_coord
    feat_3d = trimesh.Scene()
    feat_data = trimesh.PointCloud(vertices=processed_points, colors=processed_colors, vertex_normals=original_normals)
    feat_3d.add_geometry(feat_data)
    feat_3d.export(processed_temp)
    return processed_temp, f"Feature visualization process finished with {utonia_time:.3f} seconds using utonia inference and {pca_time:.3f} seconds using PCA. Updating visualization."


@spaces.GPU
def _gpu_pca_slider_compute(feat_array, inverse_array, pca_slider, bright_slider):
    """
    GPU-only function: compute PCA colors for slider updates.
    Minimal GPU allocation for only the essential computation.

    Args:
        feat_array: cached per-point features (numpy).
        inverse_array: index map from pooled features back to input points.

    Returns:
        (processed_colors, pca_time_seconds).
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"
    # Move data to GPU inside GPU function
    feat_tensor = torch.tensor(feat_array, device=device)
    inverse_tensor = torch.tensor(inverse_array, device=device)
    pca_start_time = time.time()
    pca_colors = get_pca_color(feat_tensor, start=pca_slider, brightness=bright_slider, center=True)
    processed_colors = pca_colors[inverse_tensor].cpu().detach().numpy()
    pca_end_time = time.time()
    return processed_colors, (pca_end_time - pca_start_time)


def utonia_slider_update(target_dir, pca_slider, bright_slider, is_example, log_output):
    """
    CPU-GPU hybrid: handle file I/O on CPU, GPU for PCA computation only.

    Recolors the cached point cloud with new PCA parameters; requires that
    gradio_demo already saved feat.npy/inverse.npy.

    Returns:
        (glb_path_or_None, log_message).
    """
    if is_example == "True":
        # Examples ship precomputed views; nothing to recompute.
        return None, log_output
    else:
        target_dir_pcds = os.path.join(target_dir, "pcds")
        if os.path.isfile(os.path.join(target_dir_pcds, "feat.npy")):
            # CPU: Load data from disk
            feat = np.load(os.path.join(target_dir_pcds, "feat.npy"))
            inverse = np.load(os.path.join(target_dir_pcds, "inverse.npy"))
            # GPU: Compute PCA colors only (numpy arrays passed to GPU function)
            processed_colors, pca_time = _gpu_pca_slider_compute(feat, inverse, pca_slider, bright_slider)
            # CPU: Load additional data and build mesh
            processed_points = np.load(os.path.join(target_dir_pcds, "points.npy"))
            processed_normals = np.load(os.path.join(target_dir_pcds, "normals.npy"))
            processed_temp = os.path.join(target_dir_pcds, "processed.glb")
            feat_3d = trimesh.Scene()
            feat_data = trimesh.PointCloud(vertices=processed_points, colors=processed_colors, vertex_normals=processed_normals)
            feat_3d.add_geometry(feat_data)
            feat_3d.export(processed_temp)
            log_output = f"Feature visualization process finished with {pca_time:.3f} seconds using PCA. Updating visualization."
        else:
            processed_temp = None
            log_output = "No representations saved, please click PCA generate first."
        return processed_temp, log_output


BASE_URL = "https://huggingface.co/datasets/pointcept-bot/utonia_huggingface_demo/resolve/main/"


def get_url(path):
    """Build an absolute URL for a demo asset hosted on the HF dataset repo."""
    return f"{BASE_URL}{path}"


# Example rows match example_file_updated's leading parameters:
# (preview image, point cloud, pca_slider, bright_slider, is_example,
#  scale_slider, normalize_coord).
examples_object = [
    [
        get_url("object/0005df571e71437991594d0affec9c2b.png"),
        get_url("object/0005df571e71437991594d0affec9c2b.ply"),
        0, 1.2, "True", 1.0, True
    ],
    [
        get_url("object/0023687e90394c3e97ab19b0160cafb3.png"),
        get_url("object/0023687e90394c3e97ab19b0160cafb3.ply"),
        0, 1.2, "True", 1.0, True
    ],
    [
        get_url("object/0015eb3cf53b4339b2d0532cf912ab26.png"),
        get_url("object/0015eb3cf53b4339b2d0532cf912ab26.ply"),
        0, 1.2, "True", 1.0, True
    ],
    [
        get_url("object/001a5201eddf4f3b98591598584673f5.png"),
        get_url("object/001a5201eddf4f3b98591598584673f5.ply"),
        0, 1.2, "True", 1.0, True
    ],
]

examples_manipulation = [
    [
        get_url("manipulation/000021_AUTOLab_5d05c5aa_2023-11-17-23h-40m-52s-35_46.png"),
        get_url("manipulation/000021_AUTOLab_5d05c5aa_2023-11-17-23h-40m-52s-35_46.ply"),
        0, 1.2, "True", 4.0, False
    ],
    [
        get_url("manipulation/000018_AUTOLab_44bb9c36_2023-11-23-20h-05m-45s-55_66.png"),
        get_url("manipulation/000018_AUTOLab_44bb9c36_2023-11-23-20h-05m-45s-55_66.ply"),
        1, 1.0, "True", 4.0, False
    ],
    [
        get_url("manipulation/000037_IPRL_7790ec0a_2023-07-01-09h-37m-21s-15_26.png"),
        get_url("manipulation/000037_IPRL_7790ec0a_2023-07-01-09h-37m-21s-15_26.ply"),
        0, 1.2, "True", 4.0, False
    ],
    [
        get_url("manipulation/000061_TRI_938130c4_2023-08-10-14h-40m-11s-70_81.png"),
        get_url("manipulation/000061_TRI_938130c4_2023-08-10-14h-40m-11s-70_81.ply"),
        2, 1.2, "True", 4.0, False
    ],
]

examples_indoor = [
    [
        get_url("indoor/scene0024_00.png"),
        get_url("indoor/scene0024_00.ply"),
        0, 1.0, "True", 0.5, False
    ],
    [
        get_url("indoor/scene0603_00.png"),
        get_url("indoor/scene0603_00.ply"),
        0, 1.0, "True", 0.5, False
    ],
    [
        get_url("indoor/027cd6ea0f.png"),
        get_url("indoor/027cd6ea0f.ply"),
        0, 1.0, "True", 0.5, False
    ],
    [
        get_url("indoor/2c7c10379b.png"),
        get_url("indoor/2c7c10379b.ply"),
        3, 1.0, "True", 0.5, False
    ],
]
# Outdoor driving-scene examples; row layout matches example_file_updated's
# leading parameters: (preview image, point cloud, pca_slider, bright_slider,
# is_example, scale_slider, normalize_coord).
examples_outdoor = [
    [
        get_url("outdoor/segment-10455472356147194054_1560_000_1580_000_with_camera_labels.png"),
        get_url("outdoor/segment-10455472356147194054_1560_000_1580_000_with_camera_labels.ply"),
        1, 1.2, "True", 0.2, False
    ],
    [
        get_url("outdoor/segment-10963653239323173269_1924_000_1944_000_with_camera_labels.png"),
        get_url("outdoor/segment-10963653239323173269_1924_000_1944_000_with_camera_labels.ply"),
        0, 1.2, "True", 0.2, False
    ],
    [
        get_url("outdoor/segment-11718898130355901268_2300_000_2320_000_with_camera_labels.png"),
        get_url("outdoor/segment-11718898130355901268_2300_000_2320_000_with_camera_labels.ply"),
        0, 1.2, "True", 0.2, False
    ],
    [
        get_url("outdoor/segment-11925224148023145510_1040_000_1060_000_with_camera_labels.png"),
        get_url("outdoor/segment-11925224148023145510_1040_000_1060_000_with_camera_labels.ply"),
        0, 1.2, "True", 0.2, False
    ],
]

# Video examples; row layout matches example_video_updated's leading
# parameters: (inputs, conf_thres, frame_slider, prediction_mode, pca_slider,
# bright_slider, is_example, scale_slider, normalize_coord).
examples_video = [
    [
        get_url("video/re10k_1.mp4"),
        10.0, 1, "Depthmap Branch", 2, 1.2, "True", 0.5, False
    ],
    [
        get_url("video/re10k_2.mp4"),
        20.0, 1, "Pointmap Branch", 2, 1.2, "True", 0.5, False
    ],
    [
        get_url("video/re10k_3.mp4"),
        10.0, 1, "Pointmap Branch", 1, 1.2, "True", 0.5, False
    ],
    [
        get_url("video/re10k_4.mp4"),
        10.0, 1, "Pointmap Branch", 1, 1., "True", 0.5, False
    ],
]


def example_file_updated(
    preview_imgs,
    inputs,
    pca_slider,
    bright_slider,
    is_example,
    scale_slider,
    normalize_coord,
    url_input,
):
    """Callback stub for file-example selection; intentionally a no-op here."""
    pass


def example_video_updated(
    inputs,
    conf_thres,
    frame_slider,
    prediction_mode,
    pca_slider,
    bright_slider,
    is_example,
    scale_slider,
    normalize_coord,
    url_input,
):
    """Callback stub for video-example selection; intentionally a no-op here."""
    pass


# UI definition. The custom CSS styles the gradient log banners and radio
# layout used throughout the demo. (The Blocks body continues beyond this chunk.)
with gr.Blocks(
    css="""
    .custom-log * {
    font-style: italic;
    font-size: 22px !important;
    background-image: linear-gradient(120deg, #0ea5e9 0%, #6ee7b7 60%, #34d399 100%);
    -webkit-background-clip: text;
    background-clip: text;
    font-weight: bold !important;
    color: transparent !important;
    text-align: center !important;
    width: 800px;
    height: 100px;
    }

    .example-log * {
    font-style: italic;
    font-size: 16px !important;
    background-image: linear-gradient(120deg, #0ea5e9 0%, #6ee7b7 60%, #34d399 100%);
    -webkit-background-clip: text;
    background-clip: text;
    color: transparent !important;
    }

    .common-markdown * {
    font-size: 22px !important;
    -webkit-background-clip: text;
    background-clip: text;
    font-weight: bold !important;
    color: #0ea5e9 !important;
    text-align: center !important;
    }

    #big-box {
    border: 3px solid #00bcd4;
    padding: 20px;
    background-color: transparent;
    border-radius: 15px;
    }

    #my_radio .wrap {
    display: flex;
    flex-wrap: nowrap;
    justify-content: center;
    align-items: center;
    }

    #my_radio .wrap label {
    display: flex;
    width: 50%;
    justify-content: center;
    align-items: center;
    margin: 0;
    padding: 10px 0;
    box-sizing: border-box;
    }
    """,
) as demo:
    gr.HTML(
        """