Spaces:
Build error
Build error
| """ | |
| Insta360 3D Reconstruction - Hugging Face Space Version | |
| Optimized for longer videos with intelligent frame sampling | |
| Supports ZeroGPU for faster processing | |
| """ | |
| import gradio as gr | |
| import numpy as np | |
| import torch | |
| from PIL import Image | |
| from transformers import DPTForDepthEstimation, DPTImageProcessor | |
| import open3d as o3d | |
| import plotly.graph_objects as go | |
| import cv2 | |
| import tempfile | |
| from pathlib import Path | |
| import time | |
| import warnings | |
| from scipy import ndimage | |
| from scipy.ndimage import gaussian_filter | |
| import spaces # For ZeroGPU support | |
| warnings.filterwarnings('ignore') | |
| # Load model | |
| print("π Loading depth estimation model...") | |
| try: | |
| dpt_processor = DPTImageProcessor.from_pretrained("Intel/dpt-large") | |
| dpt_model = DPTForDepthEstimation.from_pretrained("Intel/dpt-large") | |
| # Don't move to CUDA here - ZeroGPU will handle it in decorated functions | |
| dpt_model.eval() | |
| print("β Model loaded successfully! (ZeroGPU will handle GPU allocation)") | |
| except Exception as e: | |
| print(f"β Error loading model: {e}") | |
| dpt_processor = None | |
| dpt_model = None | |
| # Enhanced depth processing functions | |
| def bilateral_filter_depth(depth_map, d=9, sigma_color=75, sigma_space=75): | |
| """Apply bilateral filter to preserve edges while smoothing depth""" | |
| depth_norm = ((depth_map - depth_map.min()) / (depth_map.max() - depth_map.min()) * 255).astype(np.uint8) | |
| filtered = cv2.bilateralFilter(depth_norm, d, sigma_color, sigma_space) | |
| filtered = filtered.astype(np.float32) / 255.0 | |
| filtered = filtered * (depth_map.max() - depth_map.min()) + depth_map.min() | |
| return filtered | |
| def multi_scale_depth_refinement(depth_map, scales=[1.0, 0.5]): | |
| """Process depth at multiple scales and fuse""" | |
| h, w = depth_map.shape | |
| refined_depths = [] | |
| weights = [] | |
| for scale in scales: | |
| if scale == 1.0: | |
| scaled_depth = depth_map | |
| else: | |
| new_h, new_w = int(h * scale), int(w * scale) | |
| scaled_depth = cv2.resize(depth_map, (new_w, new_h), interpolation=cv2.INTER_LINEAR) | |
| scaled_depth = cv2.resize(scaled_depth, (w, h), interpolation=cv2.INTER_LINEAR) | |
| filtered_depth = bilateral_filter_depth(scaled_depth) | |
| refined_depths.append(filtered_depth) | |
| weights.append(scale) | |
| weights = np.array(weights) | |
| weights = weights / weights.sum() | |
| final_depth = np.zeros_like(depth_map) | |
| for depth, weight in zip(refined_depths, weights): | |
| final_depth += depth * weight | |
| return final_depth | |
| def estimate_depth_confidence(depth_map): | |
| """Estimate confidence map based on depth consistency""" | |
| grad_x = cv2.Sobel(depth_map, cv2.CV_64F, 1, 0, ksize=3) | |
| grad_y = cv2.Sobel(depth_map, cv2.CV_64F, 0, 1, ksize=3) | |
| grad_mag = np.sqrt(grad_x**2 + grad_y**2) | |
| confidence = 1.0 / (1.0 + grad_mag / grad_mag.max()) | |
| confidence = gaussian_filter(confidence, sigma=2) | |
| return confidence | |
| def intelligent_frame_sampling(video_path, target_frames=6, max_frames=100): | |
| """ | |
| Intelligently sample frames from video based on motion and content | |
| For long videos, this prevents processing too many similar frames | |
| """ | |
| cap = cv2.VideoCapture(video_path) | |
| total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| fps = cap.get(cv2.CAP_PROP_FPS) | |
| duration = total_frames / fps if fps > 0 else 0 | |
| # For very long videos, sample more intelligently | |
| if duration > 120: # 2 minutes | |
| # Sample every N seconds instead of uniformly | |
| sample_interval = max(int(fps * 15), 1) # Every 15 seconds | |
| frame_indices = list(range(0, total_frames, sample_interval)) | |
| else: | |
| # Uniform sampling | |
| frame_indices = np.linspace(0, total_frames - 1, min(target_frames, total_frames), dtype=int) | |
| # Limit to max_frames to prevent timeout | |
| if len(frame_indices) > max_frames: | |
| frame_indices = frame_indices[::len(frame_indices)//max_frames][:max_frames] | |
| cap.release() | |
| return frame_indices, total_frames, fps, duration | |
| def extract_frames_smart(video_path, target_frames=6): | |
| """Extract frames intelligently based on video length""" | |
| frame_indices, total_frames, fps, duration = intelligent_frame_sampling(video_path, target_frames) | |
| cap = cv2.VideoCapture(video_path) | |
| frames = [] | |
| for idx in frame_indices: | |
| cap.set(cv2.CAP_PROP_POS_FRAMES, idx) | |
| ret, frame = cap.read() | |
| if ret: | |
| frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| frames.append(frame_rgb) | |
| cap.release() | |
| info = { | |
| 'total_frames': total_frames, | |
| 'extracted_frames': len(frames), | |
| 'fps': fps, | |
| 'duration': duration, | |
| 'frame_indices': frame_indices | |
| } | |
| return frames, info | |
| def equirectangular_to_perspective(equirect_img, fov=90, theta=0, phi=0, height=512, width=512): | |
| """Convert equirectangular image to perspective view""" | |
| equ_h, equ_w = equirect_img.shape[:2] | |
| y, x = np.meshgrid(np.arange(height), np.arange(width), indexing='ij') | |
| x_norm = (2.0 * x / width - 1.0) | |
| y_norm = (2.0 * y / height - 1.0) | |
| fov_rad = np.radians(fov) | |
| focal = 0.5 * width / np.tan(0.5 * fov_rad) | |
| z_cam = focal | |
| x_cam = x_norm * width / 2.0 | |
| y_cam = y_norm * height / 2.0 | |
| norm = np.sqrt(x_cam**2 + y_cam**2 + z_cam**2) | |
| x_cam /= norm | |
| y_cam /= norm | |
| z_cam /= norm | |
| theta_rad = np.radians(theta) | |
| phi_rad = np.radians(phi) | |
| rot_y = np.array([ | |
| [np.cos(theta_rad), 0, np.sin(theta_rad)], | |
| [0, 1, 0], | |
| [-np.sin(theta_rad), 0, np.cos(theta_rad)] | |
| ]) | |
| rot_x = np.array([ | |
| [1, 0, 0], | |
| [0, np.cos(phi_rad), -np.sin(phi_rad)], | |
| [0, np.sin(phi_rad), np.cos(phi_rad)] | |
| ]) | |
| rot = rot_y @ rot_x | |
| rays = np.stack([x_cam, y_cam, z_cam], axis=-1) | |
| rays_rot = rays @ rot.T | |
| x_rot = rays_rot[..., 0] | |
| y_rot = rays_rot[..., 1] | |
| z_rot = rays_rot[..., 2] | |
| lon = np.arctan2(x_rot, z_rot) | |
| lat = np.arcsin(np.clip(y_rot, -1, 1)) | |
| equ_x = (lon / np.pi + 1) * 0.5 * (equ_w - 1) | |
| equ_y = (0.5 - lat / np.pi) * (equ_h - 1) | |
| equ_x = np.clip(equ_x, 0, equ_w - 1) | |
| equ_y = np.clip(equ_y, 0, equ_h - 1) | |
| perspective_img = np.zeros((height, width, equirect_img.shape[2]), dtype=equirect_img.dtype) | |
| for c in range(equirect_img.shape[2]): | |
| perspective_img[..., c] = ndimage.map_coordinates( | |
| equirect_img[..., c], | |
| [equ_y, equ_x], | |
| order=1, | |
| mode='wrap' | |
| ) | |
| return perspective_img | |
| # ZeroGPU decorator for GPU acceleration | |
| def estimate_depth_enhanced(image, processor, model): | |
| """Enhanced depth estimation with multi-scale processing""" | |
| inputs = processor(images=image, return_tensors="pt") | |
| # ZeroGPU automatically handles device placement | |
| with torch.no_grad(): | |
| outputs = model(**inputs) | |
| predicted_depth = outputs.predicted_depth | |
| prediction = torch.nn.functional.interpolate( | |
| predicted_depth.unsqueeze(1), | |
| size=image.shape[:2], | |
| mode="bicubic", | |
| align_corners=False, | |
| ) | |
| depth_map = prediction.squeeze().cpu().numpy() | |
| depth_map = multi_scale_depth_refinement(depth_map) | |
| confidence = estimate_depth_confidence(depth_map) | |
| return depth_map, confidence | |
| def depth_to_point_cloud_enhanced(depth, color, confidence, camera_params): | |
| """Enhanced point cloud generation with confidence weighting""" | |
| height, width = depth.shape | |
| fx, fy = camera_params['fx'], camera_params['fy'] | |
| cx, cy = camera_params['cx'], camera_params['cy'] | |
| R_matrix = camera_params.get('R', np.eye(3)) | |
| t_vector = camera_params.get('t', np.zeros(3)) | |
| u, v = np.meshgrid(np.arange(width), np.arange(height)) | |
| z = depth | |
| x = (u - cx) * z / fx | |
| y = (v - cy) * z / fy | |
| points_cam = np.stack([x, y, z], axis=-1) | |
| points_world = points_cam @ R_matrix.T + t_vector | |
| conf_threshold = np.percentile(confidence, 30) | |
| valid_mask = confidence > conf_threshold | |
| points = points_world[valid_mask] | |
| colors = color[valid_mask] | |
| return points, colors | |
| def create_realistic_mesh(points, colors, progress_callback): | |
| """Create high-quality mesh using Poisson reconstruction""" | |
| progress_callback("π¨ Creating realistic mesh...") | |
| pcd = o3d.geometry.PointCloud() | |
| pcd.points = o3d.utility.Vector3dVector(points) | |
| pcd.colors = o3d.utility.Vector3dVector(colors / 255.0) | |
| progress_callback(" β’ Removing outliers...") | |
| pcd, _ = pcd.remove_statistical_outlier(nb_neighbors=20, std_ratio=2.0) | |
| progress_callback(" β’ Estimating normals...") | |
| pcd.estimate_normals( | |
| search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=0.1, max_nn=30) | |
| ) | |
| pcd.orient_normals_consistent_tangent_plane(k=15) | |
| progress_callback(" β’ Performing Poisson reconstruction...") | |
| mesh, densities = o3d.geometry.TriangleMesh.create_from_point_cloud_poisson( | |
| pcd, depth=9, width=0, scale=1.1, linear_fit=False | |
| ) | |
| progress_callback(" β’ Cleaning mesh...") | |
| densities = np.asarray(densities) | |
| density_threshold = np.percentile(densities, 10) | |
| vertices_to_remove = densities < density_threshold | |
| mesh.remove_vertices_by_mask(vertices_to_remove) | |
| mesh = mesh.filter_smooth_simple(number_of_iterations=5) | |
| mesh.compute_vertex_normals() | |
| # Transfer colors | |
| mesh_points = np.asarray(mesh.vertices) | |
| pcd_tree = o3d.geometry.KDTreeFlann(pcd) | |
| pcd_colors = np.asarray(pcd.colors) | |
| mesh_colors = np.zeros_like(mesh_points) | |
| for i, point in enumerate(mesh_points): | |
| [_, idx, _] = pcd_tree.search_knn_vector_3d(point, 1) | |
| mesh_colors[i] = pcd_colors[idx[0]] | |
| mesh.vertex_colors = o3d.utility.Vector3dVector(mesh_colors) | |
| return mesh | |
| def process_video(video_path, num_frames, num_views, quality, progress=gr.Progress()): | |
| """Main processing function optimized for Hugging Face""" | |
| if dpt_model is None: | |
| return None, None, None, "β Model not loaded properly", None | |
| if video_path is None: | |
| return None, None, None, "β Please upload a video first", None | |
| status = [] | |
| start_time = time.time() | |
| def update_status(msg): | |
| status.append(msg) | |
| progress(0.1, desc=msg) | |
| return "\n".join(status) | |
| try: | |
| status_text = update_status("="*60) | |
| status_text = update_status("π¬ STARTING REALISTIC 3D RECONSTRUCTION") | |
| status_text = update_status("="*60) | |
| # Check video | |
| cap = cv2.VideoCapture(video_path) | |
| if not cap.isOpened(): | |
| return None, None, None, "β Cannot open video file", None | |
| total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| fps = cap.get(cv2.CAP_PROP_FPS) | |
| duration = total_frames / fps if fps > 0 else 0 | |
| cap.release() | |
| status_text = update_status(f"\nπΉ Video Info:") | |
| status_text = update_status(f" β’ Duration: {duration:.1f}s ({total_frames} frames)") | |
| status_text = update_status(f" β’ FPS: {fps:.1f}") | |
| # Warn about long videos | |
| if duration > 300: | |
| status_text = update_status(f"\nβ οΈ WARNING: Very long video ({duration:.0f}s)") | |
| status_text = update_status(f" β’ Processing will be slower") | |
| status_text = update_status(f" β’ Consider using a shorter clip") | |
| # Extract frames intelligently | |
| status_text = update_status(f"\nπΉ Extracting frames intelligently...") | |
| frames, video_info = extract_frames_smart(video_path, num_frames) | |
| if not frames: | |
| return None, None, None, "β Failed to extract frames", None | |
| status_text = update_status(f"β Extracted {len(frames)} frames") | |
| status_text = update_status(f" β’ Sampling strategy: {'Intelligent (long video)' if duration > 120 else 'Uniform'}") | |
| preview_img = Image.fromarray(frames[0]) | |
| # Quality settings | |
| quality_configs = { | |
| 'low': {'resolution': 384, 'fov': 90}, | |
| 'medium': {'resolution': 512, 'fov': 90}, | |
| 'high': {'resolution': 640, 'fov': 85} | |
| } | |
| config = quality_configs[quality] | |
| status_text = update_status(f"\nβοΈ Settings: {len(frames)} frames Γ {num_views} views Γ {config['resolution']}px") | |
| # Process frames | |
| all_points = [] | |
| all_colors = [] | |
| total_views = len(frames) * num_views | |
| processed_views = 0 | |
| for frame_idx, frame in enumerate(frames): | |
| progress((frame_idx + 1) / len(frames), desc=f"Processing frame {frame_idx+1}/{len(frames)}") | |
| status_text = update_status(f"\nπ Frame {frame_idx + 1}/{len(frames)}:") | |
| # Generate view angles | |
| view_angles = [(360.0 / num_views * i, 0) for i in range(num_views)] | |
| frame_points = [] | |
| frame_colors = [] | |
| for view_idx, (theta, phi) in enumerate(view_angles): | |
| # Convert to perspective | |
| persp_img = equirectangular_to_perspective( | |
| frame, fov=config['fov'], theta=theta, phi=phi, | |
| height=config['resolution'], width=config['resolution'] | |
| ) | |
| # Depth estimation | |
| depth_map, confidence = estimate_depth_enhanced(persp_img, dpt_processor, dpt_model) | |
| # Camera params | |
| focal = config['resolution'] / (2 * np.tan(np.radians(config['fov']) / 2)) | |
| from scipy.spatial.transform import Rotation as R | |
| rot = R.from_euler('yz', [theta, phi], degrees=True) | |
| R_matrix = rot.as_matrix() | |
| camera_params = { | |
| 'fx': focal, 'fy': focal, | |
| 'cx': config['resolution'] / 2, | |
| 'cy': config['resolution'] / 2, | |
| 'R': R_matrix, | |
| 't': np.zeros(3) | |
| } | |
| # Generate points | |
| points, colors = depth_to_point_cloud_enhanced( | |
| depth_map, persp_img, confidence, camera_params | |
| ) | |
| frame_points.append(points) | |
| frame_colors.append(colors) | |
| processed_views += 1 | |
| if (view_idx + 1) % 2 == 0: | |
| status_text = update_status(f" β’ Processed {view_idx + 1}/{num_views} views") | |
| all_points.append(np.vstack(frame_points)) | |
| all_colors.append(np.vstack(frame_colors)) | |
| # Combine all | |
| status_text = update_status(f"\nπ Combining {len(frames)} frames...") | |
| final_points = np.vstack(all_points) | |
| final_colors = np.vstack(all_colors) | |
| status_text = update_status(f"β Total points: {len(final_points):,}") | |
| # Filter | |
| status_text = update_status(f"\nπ― Filtering and cleaning...") | |
| # Remove duplicates | |
| unique_indices = np.unique(final_points, axis=0, return_index=True)[1] | |
| final_points = final_points[unique_indices] | |
| final_colors = final_colors[unique_indices] | |
| # Statistical outlier removal | |
| pcd_temp = o3d.geometry.PointCloud() | |
| pcd_temp.points = o3d.utility.Vector3dVector(final_points) | |
| pcd_temp, inlier_indices = pcd_temp.remove_statistical_outlier(nb_neighbors=30, std_ratio=2.0) | |
| final_points = final_points[inlier_indices] | |
| final_colors = final_colors[inlier_indices] | |
| status_text = update_status(f"β Filtered to {len(final_points):,} points") | |
| # Downsample if huge | |
| if len(final_points) > 500000: | |
| keep_ratio = 500000 / len(final_points) | |
| keep_indices = np.random.choice(len(final_points), size=int(len(final_points) * keep_ratio), replace=False) | |
| final_points = final_points[keep_indices] | |
| final_colors = final_colors[keep_indices] | |
| status_text = update_status(f" β’ Downsampled to {len(final_points):,} points") | |
| # Visualization | |
| status_text = update_status(f"\nπ Creating 3D visualization...") | |
| vis_sample = min(50000, len(final_points)) | |
| vis_indices = np.random.choice(len(final_points), vis_sample, replace=False) | |
| vis_points = final_points[vis_indices] | |
| vis_colors = final_colors[vis_indices] | |
| fig = go.Figure(data=[go.Scatter3d( | |
| x=vis_points[:, 0], y=vis_points[:, 1], z=vis_points[:, 2], | |
| mode='markers', | |
| marker=dict( | |
| size=2, | |
| color=[f'rgb({int(c[0])},{int(c[1])},{int(c[2])})' for c in vis_colors], | |
| opacity=0.8 | |
| ) | |
| )]) | |
| fig.update_layout( | |
| title=f"3D Reconstruction ({len(final_points):,} points)", | |
| scene=dict(xaxis_title='X', yaxis_title='Y', zaxis_title='Z', aspectmode='data'), | |
| height=700 | |
| ) | |
| # Save point cloud | |
| status_text = update_status(f"\nπΎ Saving outputs...") | |
| pcd = o3d.geometry.PointCloud() | |
| pcd.points = o3d.utility.Vector3dVector(final_points) | |
| pcd.colors = o3d.utility.Vector3dVector(final_colors / 255.0) | |
| pcd.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=0.1, max_nn=30)) | |
| ply_path = Path(tempfile.mkdtemp()) / "reconstruction.ply" | |
| o3d.io.write_point_cloud(str(ply_path), pcd) | |
| ply_path = str(ply_path) # Convert Path to string for Gradio | |
| status_text = update_status(f"β Point cloud saved") | |
| # Create mesh | |
| obj_path = None | |
| elapsed = time.time() - start_time | |
| if elapsed < 180: # Only if under 3 minutes so far | |
| try: | |
| def mesh_progress(msg): | |
| nonlocal status_text | |
| status_text = update_status(msg) | |
| mesh = create_realistic_mesh(final_points, final_colors, mesh_progress) | |
| obj_path = Path(tempfile.mkdtemp()) / "reconstruction.obj" | |
| o3d.io.write_triangle_mesh(str(obj_path), mesh) | |
| obj_path = str(obj_path) # Convert Path to string for Gradio | |
| status_text = update_status(f"β Mesh created: {len(mesh.vertices):,} vertices") | |
| except Exception as e: | |
| status_text = update_status(f"β οΈ Mesh generation failed: {str(e)}") | |
| else: | |
| status_text = update_status("β οΈ Mesh skipped (time limit)") | |
| # Final stats | |
| elapsed = time.time() - start_time | |
| status_text = update_status(f"\n{'='*60}") | |
| status_text = update_status(f"π SUCCESS! Completed in {elapsed:.1f}s") | |
| status_text = update_status(f"π Final: {len(final_points):,} points") | |
| status_text = update_status(f"{'='*60}") | |
| return fig, ply_path, obj_path, status_text, preview_img | |
| except Exception as e: | |
| import traceback | |
| error_msg = f"β ERROR: {str(e)}\n\n{traceback.format_exc()}" | |
| return None, None, None, error_msg, None | |
| # Create Gradio interface | |
| with gr.Blocks(title="Insta360 3D Reconstruction", theme=gr.themes.Soft()) as demo: | |
| gr.Markdown(""" | |
| # π Insta360 3D Reconstruction | |
| ### Transform 360Β° videos into realistic 3D models | |
| **Optimized for videos of any length** - Uses intelligent frame sampling for longer videos | |
| """) | |
| gr.Markdown(""" | |
| ### β οΈ For 8-Minute Videos: | |
| - Processing will take 10-15 minutes | |
| - Uses intelligent frame sampling (every 15 seconds) | |
| - Recommended: Use lower quality settings first | |
| - Consider trimming to 1-2 minutes for faster results | |
| """) | |
| with gr.Row(): | |
| with gr.Column(): | |
| video_input = gr.Video(label="Upload 360Β° Video") | |
| with gr.Accordion("βοΈ Settings", open=True): | |
| num_frames = gr.Slider( | |
| minimum=4, maximum=12, value=6, step=2, | |
| label="Target Frames (auto-adjusted for long videos)" | |
| ) | |
| num_views = gr.Slider( | |
| minimum=4, maximum=8, value=6, step=2, | |
| label="Views per Frame" | |
| ) | |
| quality = gr.Radio( | |
| choices=['low', 'medium', 'high'], | |
| value='medium', | |
| label="Quality (Start with 'medium' for 8-min videos)" | |
| ) | |
| process_btn = gr.Button("π Start Reconstruction", variant="primary", size="lg") | |
| with gr.Column(): | |
| status_output = gr.Textbox(label="Processing Status", lines=20, max_lines=25) | |
| preview_output = gr.Image(label="Video Preview") | |
| with gr.Row(): | |
| visualization_output = gr.Plot(label="3D Visualization") | |
| with gr.Row(): | |
| ply_output = gr.File(label="π¦ Download Point Cloud (.ply)") | |
| obj_output = gr.File(label="π¦ Download Mesh (.obj)") | |
| process_btn.click( | |
| fn=process_video, | |
| inputs=[video_input, num_frames, num_views, quality], | |
| outputs=[visualization_output, ply_output, obj_output, status_output, preview_output] | |
| ) | |
| gr.Markdown(""" | |
| ### π‘ Tips for Best Results | |
| **For 8-minute videos:** | |
| - Start with Medium quality (faster) | |
| - Uses intelligent sampling (~ every 15 seconds) | |
| - Total processing: 10-15 minutes | |
| - Or trim to 1-2 minutes for 3-5 min processing | |
| **Quality Guide:** | |
| - **Low**: 2-4 min (quick preview) | |
| - **Medium**: 5-10 min (good balance) | |
| - **High**: 10-20 min (best quality) | |
| **Video Requirements:** | |
| - Format: MP4 (equirectangular 360Β°) | |
| - Aspect Ratio: 2:1 | |
| - Any length (optimized for long videos) | |
| """) | |
| if __name__ == "__main__": | |
| demo.launch() |