""" Insta360 3D Reconstruction - Hugging Face Space Version Optimized for longer videos with intelligent frame sampling Supports ZeroGPU for faster processing """ import gradio as gr import numpy as np import torch from PIL import Image from transformers import DPTForDepthEstimation, DPTImageProcessor import open3d as o3d import plotly.graph_objects as go import cv2 import tempfile from pathlib import Path import time import warnings from scipy import ndimage from scipy.ndimage import gaussian_filter import spaces # For ZeroGPU support warnings.filterwarnings('ignore') # Load model print("šŸ”„ Loading depth estimation model...") try: dpt_processor = DPTImageProcessor.from_pretrained("Intel/dpt-large") dpt_model = DPTForDepthEstimation.from_pretrained("Intel/dpt-large") # Don't move to CUDA here - ZeroGPU will handle it in decorated functions dpt_model.eval() print("āœ… Model loaded successfully! (ZeroGPU will handle GPU allocation)") except Exception as e: print(f"āŒ Error loading model: {e}") dpt_processor = None dpt_model = None # Enhanced depth processing functions def bilateral_filter_depth(depth_map, d=9, sigma_color=75, sigma_space=75): """Apply bilateral filter to preserve edges while smoothing depth""" depth_norm = ((depth_map - depth_map.min()) / (depth_map.max() - depth_map.min()) * 255).astype(np.uint8) filtered = cv2.bilateralFilter(depth_norm, d, sigma_color, sigma_space) filtered = filtered.astype(np.float32) / 255.0 filtered = filtered * (depth_map.max() - depth_map.min()) + depth_map.min() return filtered def multi_scale_depth_refinement(depth_map, scales=[1.0, 0.5]): """Process depth at multiple scales and fuse""" h, w = depth_map.shape refined_depths = [] weights = [] for scale in scales: if scale == 1.0: scaled_depth = depth_map else: new_h, new_w = int(h * scale), int(w * scale) scaled_depth = cv2.resize(depth_map, (new_w, new_h), interpolation=cv2.INTER_LINEAR) scaled_depth = cv2.resize(scaled_depth, (w, h), interpolation=cv2.INTER_LINEAR) filtered_depth = bilateral_filter_depth(scaled_depth) refined_depths.append(filtered_depth) weights.append(scale) weights = np.array(weights) weights = weights / weights.sum() final_depth = np.zeros_like(depth_map) for depth, weight in zip(refined_depths, weights): final_depth += depth * weight return final_depth def estimate_depth_confidence(depth_map): """Estimate confidence map based on depth consistency""" grad_x = cv2.Sobel(depth_map, cv2.CV_64F, 1, 0, ksize=3) grad_y = cv2.Sobel(depth_map, cv2.CV_64F, 0, 1, ksize=3) grad_mag = np.sqrt(grad_x**2 + grad_y**2) confidence = 1.0 / (1.0 + grad_mag / grad_mag.max()) confidence = gaussian_filter(confidence, sigma=2) return confidence def intelligent_frame_sampling(video_path, target_frames=6, max_frames=100): """ Intelligently sample frames from video based on motion and content For long videos, this prevents processing too many similar frames """ cap = cv2.VideoCapture(video_path) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) fps = cap.get(cv2.CAP_PROP_FPS) duration = total_frames / fps if fps > 0 else 0 # For very long videos, sample more intelligently if duration > 120: # 2 minutes # Sample every N seconds instead of uniformly sample_interval = max(int(fps * 15), 1) # Every 15 seconds frame_indices = list(range(0, total_frames, sample_interval)) else: # Uniform sampling frame_indices = np.linspace(0, total_frames - 1, min(target_frames, total_frames), dtype=int) # Limit to max_frames to prevent timeout if len(frame_indices) > max_frames: frame_indices = frame_indices[::len(frame_indices)//max_frames][:max_frames] cap.release() return frame_indices, total_frames, fps, duration def extract_frames_smart(video_path, target_frames=6): """Extract frames intelligently based on video length""" frame_indices, total_frames, fps, duration = intelligent_frame_sampling(video_path, target_frames) cap = cv2.VideoCapture(video_path) frames = [] for idx in frame_indices: cap.set(cv2.CAP_PROP_POS_FRAMES, idx) ret, frame = cap.read() if ret: frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) frames.append(frame_rgb) cap.release() info = { 'total_frames': total_frames, 'extracted_frames': len(frames), 'fps': fps, 'duration': duration, 'frame_indices': frame_indices } return frames, info def equirectangular_to_perspective(equirect_img, fov=90, theta=0, phi=0, height=512, width=512): """Convert equirectangular image to perspective view""" equ_h, equ_w = equirect_img.shape[:2] y, x = np.meshgrid(np.arange(height), np.arange(width), indexing='ij') x_norm = (2.0 * x / width - 1.0) y_norm = (2.0 * y / height - 1.0) fov_rad = np.radians(fov) focal = 0.5 * width / np.tan(0.5 * fov_rad) z_cam = focal x_cam = x_norm * width / 2.0 y_cam = y_norm * height / 2.0 norm = np.sqrt(x_cam**2 + y_cam**2 + z_cam**2) x_cam /= norm y_cam /= norm z_cam /= norm theta_rad = np.radians(theta) phi_rad = np.radians(phi) rot_y = np.array([ [np.cos(theta_rad), 0, np.sin(theta_rad)], [0, 1, 0], [-np.sin(theta_rad), 0, np.cos(theta_rad)] ]) rot_x = np.array([ [1, 0, 0], [0, np.cos(phi_rad), -np.sin(phi_rad)], [0, np.sin(phi_rad), np.cos(phi_rad)] ]) rot = rot_y @ rot_x rays = np.stack([x_cam, y_cam, z_cam], axis=-1) rays_rot = rays @ rot.T x_rot = rays_rot[..., 0] y_rot = rays_rot[..., 1] z_rot = rays_rot[..., 2] lon = np.arctan2(x_rot, z_rot) lat = np.arcsin(np.clip(y_rot, -1, 1)) equ_x = (lon / np.pi + 1) * 0.5 * (equ_w - 1) equ_y = (0.5 - lat / np.pi) * (equ_h - 1) equ_x = np.clip(equ_x, 0, equ_w - 1) equ_y = np.clip(equ_y, 0, equ_h - 1) perspective_img = np.zeros((height, width, equirect_img.shape[2]), dtype=equirect_img.dtype) for c in range(equirect_img.shape[2]): perspective_img[..., c] = ndimage.map_coordinates( equirect_img[..., c], [equ_y, equ_x], order=1, mode='wrap' ) return perspective_img @spaces.GPU # ZeroGPU decorator for GPU acceleration def estimate_depth_enhanced(image, processor, model): """Enhanced depth estimation with multi-scale processing""" inputs = processor(images=image, return_tensors="pt") # ZeroGPU automatically handles device placement with torch.no_grad(): outputs = model(**inputs) predicted_depth = outputs.predicted_depth prediction = torch.nn.functional.interpolate( predicted_depth.unsqueeze(1), size=image.shape[:2], mode="bicubic", align_corners=False, ) depth_map = prediction.squeeze().cpu().numpy() depth_map = multi_scale_depth_refinement(depth_map) confidence = estimate_depth_confidence(depth_map) return depth_map, confidence def depth_to_point_cloud_enhanced(depth, color, confidence, camera_params): """Enhanced point cloud generation with confidence weighting""" height, width = depth.shape fx, fy = camera_params['fx'], camera_params['fy'] cx, cy = camera_params['cx'], camera_params['cy'] R_matrix = camera_params.get('R', np.eye(3)) t_vector = camera_params.get('t', np.zeros(3)) u, v = np.meshgrid(np.arange(width), np.arange(height)) z = depth x = (u - cx) * z / fx y = (v - cy) * z / fy points_cam = np.stack([x, y, z], axis=-1) points_world = points_cam @ R_matrix.T + t_vector conf_threshold = np.percentile(confidence, 30) valid_mask = confidence > conf_threshold points = points_world[valid_mask] colors = color[valid_mask] return points, colors def create_realistic_mesh(points, colors, progress_callback): """Create high-quality mesh using Poisson reconstruction""" progress_callback("šŸŽØ Creating realistic mesh...") pcd = o3d.geometry.PointCloud() pcd.points = o3d.utility.Vector3dVector(points) pcd.colors = o3d.utility.Vector3dVector(colors / 255.0) progress_callback(" • Removing outliers...") pcd, _ = pcd.remove_statistical_outlier(nb_neighbors=20, std_ratio=2.0) progress_callback(" • Estimating normals...") pcd.estimate_normals( search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=0.1, max_nn=30) ) pcd.orient_normals_consistent_tangent_plane(k=15) progress_callback(" • Performing Poisson reconstruction...") mesh, densities = o3d.geometry.TriangleMesh.create_from_point_cloud_poisson( pcd, depth=9, width=0, scale=1.1, linear_fit=False ) progress_callback(" • Cleaning mesh...") densities = np.asarray(densities) density_threshold = np.percentile(densities, 10) vertices_to_remove = densities < density_threshold mesh.remove_vertices_by_mask(vertices_to_remove) mesh = mesh.filter_smooth_simple(number_of_iterations=5) mesh.compute_vertex_normals() # Transfer colors mesh_points = np.asarray(mesh.vertices) pcd_tree = o3d.geometry.KDTreeFlann(pcd) pcd_colors = np.asarray(pcd.colors) mesh_colors = np.zeros_like(mesh_points) for i, point in enumerate(mesh_points): [_, idx, _] = pcd_tree.search_knn_vector_3d(point, 1) mesh_colors[i] = pcd_colors[idx[0]] mesh.vertex_colors = o3d.utility.Vector3dVector(mesh_colors) return mesh def process_video(video_path, num_frames, num_views, quality, progress=gr.Progress()): """Main processing function optimized for Hugging Face""" if dpt_model is None: return None, None, None, "āŒ Model not loaded properly", None if video_path is None: return None, None, None, "āŒ Please upload a video first", None status = [] start_time = time.time() def update_status(msg): status.append(msg) progress(0.1, desc=msg) return "\n".join(status) try: status_text = update_status("="*60) status_text = update_status("šŸŽ¬ STARTING REALISTIC 3D RECONSTRUCTION") status_text = update_status("="*60) # Check video cap = cv2.VideoCapture(video_path) if not cap.isOpened(): return None, None, None, "āŒ Cannot open video file", None total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) fps = cap.get(cv2.CAP_PROP_FPS) duration = total_frames / fps if fps > 0 else 0 cap.release() status_text = update_status(f"\nšŸ“¹ Video Info:") status_text = update_status(f" • Duration: {duration:.1f}s ({total_frames} frames)") status_text = update_status(f" • FPS: {fps:.1f}") # Warn about long videos if duration > 300: status_text = update_status(f"\nāš ļø WARNING: Very long video ({duration:.0f}s)") status_text = update_status(f" • Processing will be slower") status_text = update_status(f" • Consider using a shorter clip") # Extract frames intelligently status_text = update_status(f"\nšŸ“¹ Extracting frames intelligently...") frames, video_info = extract_frames_smart(video_path, num_frames) if not frames: return None, None, None, "āŒ Failed to extract frames", None status_text = update_status(f"āœ… Extracted {len(frames)} frames") status_text = update_status(f" • Sampling strategy: {'Intelligent (long video)' if duration > 120 else 'Uniform'}") preview_img = Image.fromarray(frames[0]) # Quality settings quality_configs = { 'low': {'resolution': 384, 'fov': 90}, 'medium': {'resolution': 512, 'fov': 90}, 'high': {'resolution': 640, 'fov': 85} } config = quality_configs[quality] status_text = update_status(f"\nāš™ļø Settings: {len(frames)} frames Ɨ {num_views} views Ɨ {config['resolution']}px") # Process frames all_points = [] all_colors = [] total_views = len(frames) * num_views processed_views = 0 for frame_idx, frame in enumerate(frames): progress((frame_idx + 1) / len(frames), desc=f"Processing frame {frame_idx+1}/{len(frames)}") status_text = update_status(f"\nšŸ“ Frame {frame_idx + 1}/{len(frames)}:") # Generate view angles view_angles = [(360.0 / num_views * i, 0) for i in range(num_views)] frame_points = [] frame_colors = [] for view_idx, (theta, phi) in enumerate(view_angles): # Convert to perspective persp_img = equirectangular_to_perspective( frame, fov=config['fov'], theta=theta, phi=phi, height=config['resolution'], width=config['resolution'] ) # Depth estimation depth_map, confidence = estimate_depth_enhanced(persp_img, dpt_processor, dpt_model) # Camera params focal = config['resolution'] / (2 * np.tan(np.radians(config['fov']) / 2)) from scipy.spatial.transform import Rotation as R rot = R.from_euler('yz', [theta, phi], degrees=True) R_matrix = rot.as_matrix() camera_params = { 'fx': focal, 'fy': focal, 'cx': config['resolution'] / 2, 'cy': config['resolution'] / 2, 'R': R_matrix, 't': np.zeros(3) } # Generate points points, colors = depth_to_point_cloud_enhanced( depth_map, persp_img, confidence, camera_params ) frame_points.append(points) frame_colors.append(colors) processed_views += 1 if (view_idx + 1) % 2 == 0: status_text = update_status(f" • Processed {view_idx + 1}/{num_views} views") all_points.append(np.vstack(frame_points)) all_colors.append(np.vstack(frame_colors)) # Combine all status_text = update_status(f"\nšŸ”— Combining {len(frames)} frames...") final_points = np.vstack(all_points) final_colors = np.vstack(all_colors) status_text = update_status(f"āœ… Total points: {len(final_points):,}") # Filter status_text = update_status(f"\nšŸŽÆ Filtering and cleaning...") # Remove duplicates unique_indices = np.unique(final_points, axis=0, return_index=True)[1] final_points = final_points[unique_indices] final_colors = final_colors[unique_indices] # Statistical outlier removal pcd_temp = o3d.geometry.PointCloud() pcd_temp.points = o3d.utility.Vector3dVector(final_points) pcd_temp, inlier_indices = pcd_temp.remove_statistical_outlier(nb_neighbors=30, std_ratio=2.0) final_points = final_points[inlier_indices] final_colors = final_colors[inlier_indices] status_text = update_status(f"āœ… Filtered to {len(final_points):,} points") # Downsample if huge if len(final_points) > 500000: keep_ratio = 500000 / len(final_points) keep_indices = np.random.choice(len(final_points), size=int(len(final_points) * keep_ratio), replace=False) final_points = final_points[keep_indices] final_colors = final_colors[keep_indices] status_text = update_status(f" • Downsampled to {len(final_points):,} points") # Visualization status_text = update_status(f"\nšŸ“Š Creating 3D visualization...") vis_sample = min(50000, len(final_points)) vis_indices = np.random.choice(len(final_points), vis_sample, replace=False) vis_points = final_points[vis_indices] vis_colors = final_colors[vis_indices] fig = go.Figure(data=[go.Scatter3d( x=vis_points[:, 0], y=vis_points[:, 1], z=vis_points[:, 2], mode='markers', marker=dict( size=2, color=[f'rgb({int(c[0])},{int(c[1])},{int(c[2])})' for c in vis_colors], opacity=0.8 ) )]) fig.update_layout( title=f"3D Reconstruction ({len(final_points):,} points)", scene=dict(xaxis_title='X', yaxis_title='Y', zaxis_title='Z', aspectmode='data'), height=700 ) # Save point cloud status_text = update_status(f"\nšŸ’¾ Saving outputs...") pcd = o3d.geometry.PointCloud() pcd.points = o3d.utility.Vector3dVector(final_points) pcd.colors = o3d.utility.Vector3dVector(final_colors / 255.0) pcd.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=0.1, max_nn=30)) ply_path = Path(tempfile.mkdtemp()) / "reconstruction.ply" o3d.io.write_point_cloud(str(ply_path), pcd) ply_path = str(ply_path) # Convert Path to string for Gradio status_text = update_status(f"āœ… Point cloud saved") # Create mesh obj_path = None elapsed = time.time() - start_time if elapsed < 180: # Only if under 3 minutes so far try: def mesh_progress(msg): nonlocal status_text status_text = update_status(msg) mesh = create_realistic_mesh(final_points, final_colors, mesh_progress) obj_path = Path(tempfile.mkdtemp()) / "reconstruction.obj" o3d.io.write_triangle_mesh(str(obj_path), mesh) obj_path = str(obj_path) # Convert Path to string for Gradio status_text = update_status(f"āœ… Mesh created: {len(mesh.vertices):,} vertices") except Exception as e: status_text = update_status(f"āš ļø Mesh generation failed: {str(e)}") else: status_text = update_status("āš ļø Mesh skipped (time limit)") # Final stats elapsed = time.time() - start_time status_text = update_status(f"\n{'='*60}") status_text = update_status(f"šŸŽ‰ SUCCESS! Completed in {elapsed:.1f}s") status_text = update_status(f"šŸ“Š Final: {len(final_points):,} points") status_text = update_status(f"{'='*60}") return fig, ply_path, obj_path, status_text, preview_img except Exception as e: import traceback error_msg = f"āŒ ERROR: {str(e)}\n\n{traceback.format_exc()}" return None, None, None, error_msg, None # Create Gradio interface with gr.Blocks(title="Insta360 3D Reconstruction", theme=gr.themes.Soft()) as demo: gr.Markdown(""" # šŸŒ Insta360 3D Reconstruction ### Transform 360° videos into realistic 3D models **Optimized for videos of any length** - Uses intelligent frame sampling for longer videos """) gr.Markdown(""" ### āš ļø For 8-Minute Videos: - Processing will take 10-15 minutes - Uses intelligent frame sampling (every 15 seconds) - Recommended: Use lower quality settings first - Consider trimming to 1-2 minutes for faster results """) with gr.Row(): with gr.Column(): video_input = gr.Video(label="Upload 360° Video") with gr.Accordion("āš™ļø Settings", open=True): num_frames = gr.Slider( minimum=4, maximum=12, value=6, step=2, label="Target Frames (auto-adjusted for long videos)" ) num_views = gr.Slider( minimum=4, maximum=8, value=6, step=2, label="Views per Frame" ) quality = gr.Radio( choices=['low', 'medium', 'high'], value='medium', label="Quality (Start with 'medium' for 8-min videos)" ) process_btn = gr.Button("šŸš€ Start Reconstruction", variant="primary", size="lg") with gr.Column(): status_output = gr.Textbox(label="Processing Status", lines=20, max_lines=25) preview_output = gr.Image(label="Video Preview") with gr.Row(): visualization_output = gr.Plot(label="3D Visualization") with gr.Row(): ply_output = gr.File(label="šŸ“¦ Download Point Cloud (.ply)") obj_output = gr.File(label="šŸ“¦ Download Mesh (.obj)") process_btn.click( fn=process_video, inputs=[video_input, num_frames, num_views, quality], outputs=[visualization_output, ply_output, obj_output, status_output, preview_output] ) gr.Markdown(""" ### šŸ’” Tips for Best Results **For 8-minute videos:** - Start with Medium quality (faster) - Uses intelligent sampling (~ every 15 seconds) - Total processing: 10-15 minutes - Or trim to 1-2 minutes for 3-5 min processing **Quality Guide:** - **Low**: 2-4 min (quick preview) - **Medium**: 5-10 min (good balance) - **High**: 10-20 min (best quality) **Video Requirements:** - Format: MP4 (equirectangular 360°) - Aspect Ratio: 2:1 - Any length (optimized for long videos) """) if __name__ == "__main__": demo.launch()