| """ |
| 360° Video Frame Extraction + 3D Reconstruction for Outdoor Scenes |
| Robust version with better error handling |
| """ |
|
|
| import gradio as gr |
| import numpy as np |
| import cv2 |
| from PIL import Image |
| import tempfile |
| import zipfile |
| import os |
| from datetime import datetime |
| import torch |
| from transformers import DPTForDepthEstimation, DPTImageProcessor |
| import open3d as o3d |
| import plotly.graph_objects as go |
| import warnings |
| import traceback |
| warnings.filterwarnings('ignore') |
|
|
| |
| |
| |
|
|
def _load_depth_model(checkpoint):
    """Return the ``(processor, model)`` pair for *checkpoint*, set to eval mode.

    The model is moved to the GPU when ``torch.cuda.is_available()`` is true.
    """
    processor = DPTImageProcessor.from_pretrained(checkpoint)
    model = DPTForDepthEstimation.from_pretrained(checkpoint)
    if torch.cuda.is_available():
        model = model.cuda()
        print("✓ Using GPU")
    model.eval()
    return processor, model


# The depth model is loaded once at import time. Any failure is reported on
# stdout and leaves MODEL_LOADED false with both globals set to None, which
# the 3D code paths check before running.
print("Loading depth estimation model...")
MODEL_LOADED = False
dpt_processor = None
dpt_model = None
try:
    dpt_processor, dpt_model = _load_depth_model("Intel/dpt-large")
    MODEL_LOADED = True
    print("✓ Model loaded!")
except Exception as e:
    print(f"⚠️ Model loading failed: {e}")
    dpt_processor = None
    dpt_model = None
|
|
| |
| |
| |
|
|
def extract_frames_from_360_video(video_path, frame_step=30, max_frames=150):
    """Save every ``frame_step``-th frame of a video as a JPEG in a new temp directory.

    Frames are written as ``frame_0000.jpg``, ``frame_0001.jpg``, ... at JPEG
    quality 95.

    Args:
        video_path: Path of the video file to open with OpenCV.
        frame_step: Keep one frame out of every ``frame_step`` decoded frames,
            starting with the first. Must be positive.
        max_frames: Stop once this many frames have been saved.

    Returns:
        A 5-tuple ``(frame_paths, frames_dir, fps, total_frames, status)``.
        ``status`` is ``"Success"`` when at least one frame was written.
        Otherwise it is an ``"Error..."`` message, ``frame_paths`` is empty and
        ``frames_dir`` is ``None``. Failures are reported through ``status``
        rather than raised.
    """
    import shutil  # local import: only needed to clean up after a failed run

    cap = None
    frames_dir = None
    succeeded = False
    try:
        # A zero step would otherwise surface as a cryptic modulo-by-zero error.
        if frame_step <= 0:
            return [], None, 0, 0, f"Error: frame_step must be positive (got {frame_step})"

        if not os.path.exists(video_path):
            return [], None, 0, 0, f"Error: Video file not found at {video_path}"

        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            return [], None, 0, 0, "Error: Could not open video file. Check format (MP4 recommended)"

        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if fps == 0 or total_frames == 0:
            return [], None, 0, 0, "Error: Invalid video file"

        frames_dir = tempfile.mkdtemp()
        extracted_frames = []
        frame_index = 0

        while cap.isOpened() and len(extracted_frames) < max_frames:
            ret, frame = cap.read()
            if not ret:
                break

            if frame_index % frame_step == 0:
                # Number files by saved count so the sequence has no gaps.
                frame_filename = os.path.join(
                    frames_dir, f"frame_{len(extracted_frames):04d}.jpg"
                )
                if cv2.imwrite(frame_filename, frame, [cv2.IMWRITE_JPEG_QUALITY, 95]):
                    extracted_frames.append(frame_filename)

            frame_index += 1

        if not extracted_frames:
            return [], None, fps, total_frames, "Error: No frames could be extracted"

        succeeded = True
        return extracted_frames, frames_dir, fps, total_frames, "Success"

    except Exception as e:
        return [], None, 0, 0, f"Error during extraction: {str(e)}"
    finally:
        # Release the capture on every path, including early returns and errors.
        if cap is not None:
            cap.release()
        # The caller only learns about frames_dir on success; drop it otherwise.
        if frames_dir is not None and not succeeded:
            shutil.rmtree(frames_dir, ignore_errors=True)
|
|
| |
| |
| |
|
|
def estimate_depth(image, processor, model):
    """Predict a depth map for one image and rescale it to the range [0, 1].

    Args:
        image: Image array; ``image.shape[:2]`` is used as the output size.
        processor: Callable invoked as ``processor(images=image,
            return_tensors="pt")`` that returns a mapping of model inputs.
        model: Callable invoked as ``model(**inputs)`` whose result exposes a
            ``predicted_depth`` tensor.

    Returns:
        A NumPy array of min-max normalised depth values resized to the image
        size with bicubic interpolation, or ``None`` if any step fails (the
        error is printed). A constant prediction yields an all-zero map.
    """
    try:
        inputs = processor(images=image, return_tensors="pt")

        if torch.cuda.is_available():
            inputs = {k: v.cuda() for k, v in inputs.items()}

        with torch.no_grad():
            outputs = model(**inputs)
            predicted_depth = outputs.predicted_depth

        prediction = torch.nn.functional.interpolate(
            predicted_depth.unsqueeze(1),
            size=image.shape[:2],
            mode="bicubic",
            align_corners=False,
        )

        depth = prediction.squeeze().cpu().numpy()

        depth_min = depth.min()
        depth_range = depth.max() - depth_min
        if not np.isfinite(depth_range):
            print("Depth estimation error: model produced non-finite depth values")
            return None
        if depth_range == 0:
            # A flat map would divide by zero and fill the result with NaN.
            return np.zeros_like(depth)

        return (depth - depth_min) / depth_range
    except Exception as e:
        print(f"Depth estimation error: {e}")
        return None
|
|
def depth_to_point_cloud(image, depth):
    """Turn a depth map and its image into per-pixel points and colours.

    Each pixel becomes one point ``(column, row, depth)``, listed in row-major
    order. Colours are the image values divided by 255; a 2-D (grayscale)
    image has its single channel repeated three times.

    Args:
        image: Array with either 3 dimensions (reshaped to rows of 3 channels)
            or fewer (treated as grayscale).
        depth: 2-D array of depth values.

    Returns:
        ``(points, colors)``, each with one row of 3 values per pixel.
    """
    n_rows, n_cols = depth.shape

    # Float index grids: row_idx varies down the image, col_idx across it.
    row_idx, col_idx = np.indices((n_rows, n_cols), dtype=np.float64)
    points = np.column_stack((col_idx.ravel(), row_idx.ravel(), depth.ravel()))

    if image.ndim == 3:
        colors = image.reshape(-1, 3) / 255.0
    else:
        gray = image.ravel() / 255.0
        colors = np.column_stack((gray, gray, gray))

    return points, colors
|
|
def create_3d_model(frames, max_frames_for_3d=5):
    """Build a coloured point cloud preview from the first few extracted frames.

    Each frame is resized to 512x256, run through the depth model, and turned
    into a point cloud. Clouds are placed side by side by shifting frame ``i``
    by ``i * 600`` along X. The merged cloud is randomly subsampled to at most
    100,000 points, plotted with Plotly and saved as a PLY file.

    Args:
        frames: Sequence of image file paths.
        max_frames_for_3d: Number of leading frames to use. Converted with
            ``int()`` because UI sliders may deliver floats.

    Returns:
        ``(figure, ply_path, status_message)``. ``figure`` and ``ply_path``
        are ``None`` when the depth model is unavailable, no frame could be
        processed, or an error occurred. ``ply_path`` alone is ``None`` when
        only writing the PLY file failed.
    """
    if not MODEL_LOADED or dpt_model is None or dpt_processor is None:
        return None, None, "❌ Depth model not loaded. Use Quick Mode instead."

    try:
        all_points = []
        all_colors = []

        # Slice bounds must be integers; a float slider value would raise.
        frames_to_process = frames[:int(max_frames_for_3d)]

        for idx, frame_path in enumerate(frames_to_process):
            print(f"Processing frame {idx+1}/{len(frames_to_process)}...")

            if not os.path.exists(frame_path):
                continue

            img = cv2.imread(frame_path)
            if img is None:
                continue

            img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            img_small = cv2.resize(img_rgb, (512, 256))

            depth = estimate_depth(img_small, dpt_processor, dpt_model)
            if depth is None:
                continue

            points, colors = depth_to_point_cloud(img_small, depth)
            # Offset exceeds the 512 px frame width, so clouds do not overlap.
            points[:, 0] += idx * 600

            all_points.append(points)
            all_colors.append(colors)

        if not all_points:
            return None, None, "❌ No frames could be processed"

        final_points = np.vstack(all_points)
        final_colors = np.vstack(all_colors)

        # Cap the point count to keep the plot and the PLY file manageable.
        if len(final_points) > 100000:
            indices = np.random.choice(len(final_points), 100000, replace=False)
            final_points = final_points[indices]
            final_colors = final_colors[indices]

        fig = go.Figure(data=[go.Scatter3d(
            x=final_points[:, 0],
            y=final_points[:, 1],
            z=final_points[:, 2],
            mode='markers',
            marker=dict(size=1, color=final_colors, opacity=0.8)
        )])

        fig.update_layout(
            title="3D Reconstruction",
            scene=dict(xaxis_title="X", yaxis_title="Y", zaxis_title="Depth"),
            width=800,
            height=600
        )

        # mkstemp gives a unique name, so two requests arriving in the same
        # second cannot overwrite each other's PLY file.
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        fd, ply_path = tempfile.mkstemp(prefix=f"3d_model_{timestamp}_", suffix=".ply")
        os.close(fd)

        pcd = o3d.geometry.PointCloud()
        pcd.points = o3d.utility.Vector3dVector(final_points)
        pcd.colors = o3d.utility.Vector3dVector(final_colors)

        # write_point_cloud signals failure through its return value.
        if not o3d.io.write_point_cloud(ply_path, pcd):
            try:
                os.remove(ply_path)
            except OSError:
                pass
            return fig, None, f"⚠️ Created {len(final_points):,} points, but the PLY file could not be written"

        return fig, ply_path, f"✅ Created {len(final_points):,} points"

    except Exception as e:
        return None, None, f"❌ 3D creation error: {str(e)}"
|
|
| |
| |
| |
|
|
def create_download_package(frames_dir, video_info):
    """Bundle the extracted JPEG frames and a README into a ZIP file.

    The archive contains ``README.txt`` followed by every ``*.jpg`` file of
    ``frames_dir`` under ``frames/``, in sorted name order.

    Args:
        frames_dir: Directory holding the extracted ``.jpg`` frames.
        video_info: Mapping with the keys ``'fps'``, ``'extracted_frames'``
            and ``'frame_step'`` (other keys are ignored).

    Returns:
        Path of the ZIP file created in the system temp directory, or ``None``
        if anything fails (the error is printed).
    """
    zip_path = None
    try:
        now = datetime.now()

        # mkstemp yields a unique name, so requests arriving within the same
        # second do not write to the same archive.
        fd, zip_path = tempfile.mkstemp(
            prefix=f"360_frames_{now.strftime('%Y%m%d_%H%M%S')}_", suffix=".zip"
        )
        os.close(fd)

        fps = video_info['fps']
        # Guard the division: an fps of 0 would otherwise abort the package.
        if fps:
            interval_text = f"~{video_info['frame_step'] / fps:.2f}s"
        else:
            interval_text = "unknown"

        readme_content = "\n".join([
            "360° OUTDOOR PHOTOGRAMMETRY PACKAGE",
            f"Generated: {now.strftime('%Y-%m-%d %H:%M:%S')}",
            "",
            "VIDEO INFO:",
            f"- FPS: {fps:.2f}",
            f"- Extracted Frames: {video_info['extracted_frames']}",
            f"- Interval: {interval_text}",
            "",
            "METASHAPE WORKFLOW:",
            "1. Import Photos",
            '2. Set Camera Type to "Spherical"',
            "3. Align Photos (High accuracy, Sequential)",
            "4. Build Dense Cloud",
            "5. Build Mesh",
            "6. Build Texture",
            "",
            "SOFTWARE: Agisoft Metashape ($179)",
            "Good luck! 🌍📸",
            "",
        ])

        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            # Write the README straight into the archive: no shared temp file
            # to race on, and the text (with "°" and emoji) is stored as UTF-8
            # regardless of the platform's default encoding.
            zipf.writestr("README.txt", readme_content)

            for frame_file in sorted(os.listdir(frames_dir)):
                if not frame_file.endswith('.jpg'):
                    continue
                frame_path = os.path.join(frames_dir, frame_file)
                if os.path.isfile(frame_path):
                    zipf.write(frame_path, f"frames/{frame_file}")

        return zip_path
    except Exception as e:
        print(f"ZIP creation error: {e}")
        # Do not leave a partial archive behind.
        if zip_path is not None:
            try:
                os.remove(zip_path)
            except OSError:
                pass
        return None
|
|
| |
| |
| |
|
|
def process_video_frames_only(video_file, frame_interval_seconds, max_frames):
    """Quick mode: extract frames from the uploaded video and zip them.

    Args:
        video_file: Path of the uploaded video, or ``None`` if nothing was
            uploaded.
        frame_interval_seconds: Desired time between extracted frames; it is
            converted to a frame step using the video's FPS (minimum 1).
        max_frames: Upper bound on the number of frames to extract.

    Returns:
        ``(preview_image, status_text, zip_path)``. ``preview_image`` is a PIL
        image of the first extracted frame. On failure the image and/or the
        ZIP path are ``None`` and ``status_text`` explains why. Unexpected
        exceptions are caught and reported, with traceback, in ``status_text``.
    """
    import shutil  # local import: used only to remove the temporary frames

    frames_dir = None
    try:
        print(f"Starting frame extraction. Video: {video_file}")

        if video_file is None:
            return None, "⚠️ Please upload a video file", None

        if not os.path.exists(video_file):
            return None, f"❌ Video file not found: {video_file}", None

        file_size = os.path.getsize(video_file) / (1024 * 1024)
        print(f"Video file size: {file_size:.2f} MB")

        if file_size > 1000:
            return None, f"❌ Video too large ({file_size:.0f}MB). Max 1GB. Please compress the video.", None

        status = f"📹 Processing video ({file_size:.1f}MB)...\n\n"

        # Probe the video; release the capture even on the early return.
        cap = cv2.VideoCapture(video_file)
        try:
            if not cap.isOpened():
                return None, status + "❌ Could not open video. Try MP4 format.", None
            fps = cap.get(cv2.CAP_PROP_FPS)
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        finally:
            cap.release()

        # "not fps > 0" also rejects negative and NaN frame rates.
        if not fps > 0:
            return None, status + "❌ Invalid video file", None

        duration = total_frames / fps
        status += f"✓ Video: {duration:.1f}s, {fps:.1f} FPS, {total_frames} frames\n\n"

        max_frames = int(max_frames)  # sliders may deliver floats
        frame_step = max(1, int(fps * frame_interval_seconds))
        # Frames 0, step, 2*step, ... are kept, i.e. ceil(total / step) frames.
        estimated_frames = min(max_frames, -(-total_frames // frame_step))

        status += f"⚙️ Extracting ~{estimated_frames} frames...\n"
        status += f"   (every {frame_step} frames = ~{frame_interval_seconds}s interval)\n\n"

        extracted_frames, frames_dir, video_fps, _, extract_status = extract_frames_from_360_video(
            video_file, frame_step=frame_step, max_frames=max_frames
        )

        if extract_status != "Success":
            return None, status + f"❌ {extract_status}", None

        status += f"✓ Extracted {len(extracted_frames)} frames\n\n"

        # cv2.imread returns None (it does not raise) for an unreadable file.
        first_frame = cv2.imread(extracted_frames[0])
        if first_frame is None:
            return None, status + "❌ Could not read the first extracted frame", None
        preview_img = Image.fromarray(cv2.cvtColor(first_frame, cv2.COLOR_BGR2RGB))

        status += "📦 Creating download package...\n"

        video_info = {
            'fps': video_fps,
            'total_frames': total_frames,
            'extracted_frames': len(extracted_frames),
            'frame_step': frame_step
        }

        zip_path = create_download_package(frames_dir, video_info)

        if zip_path is None:
            return preview_img, status + "❌ Could not create ZIP", None

        zip_size = os.path.getsize(zip_path) / (1024 * 1024)

        result = "\n".join([
            "✅ SUCCESS!",
            "",
            "📊 Summary:",
            f"• Extracted: {len(extracted_frames)} frames",
            f"• Interval: ~{frame_interval_seconds}s",
            f"• ZIP size: {zip_size:.1f}MB",
            "",
            "📦 Download ZIP below",
            "🎯 Import to Metashape for 3D model",
            "",
            "Next: Use Agisoft Metashape ($179) to create professional 3D model",
            "",
        ])

        return preview_img, status + result, zip_path

    except Exception as e:
        error_trace = traceback.format_exc()
        return None, f"❌ ERROR:\n{str(e)}\n\n{error_trace}", None
    finally:
        # The frames now live in the ZIP and the preview is in memory, so the
        # per-request frame directory can be removed.
        if frames_dir is not None:
            shutil.rmtree(frames_dir, ignore_errors=True)
|
|
def process_video_with_3d(video_file, frame_interval_seconds, max_frames, max_frames_3d):
    """Full mode: extract frames, build a 3D preview, and zip the frames.

    Args:
        video_file: Path of the uploaded video, or ``None`` if nothing was
            uploaded.
        frame_interval_seconds: Desired time between extracted frames; it is
            converted to a frame step using the video's FPS (minimum 1).
        max_frames: Upper bound on the number of frames to extract.
        max_frames_3d: Number of leading frames passed to the 3D step.

    Returns:
        ``(preview_image, status_text, zip_path, figure, ply_path)``. Entries
        that could not be produced are ``None`` and ``status_text`` says which
        step failed. Unexpected exceptions are caught and reported, with
        traceback, in ``status_text``.
    """
    import shutil  # local import: used only to remove the temporary frames

    frames_dir = None
    try:
        print(f"Starting full 3D processing. Video: {video_file}")

        if video_file is None:
            return None, "⚠️ Please upload a video file", None, None, None

        if not MODEL_LOADED:
            return None, "❌ 3D model not loaded. Use Quick Mode instead.", None, None, None

        if not os.path.exists(video_file):
            return None, f"❌ Video file not found: {video_file}", None, None, None

        file_size = os.path.getsize(video_file) / (1024 * 1024)

        if file_size > 500:
            return None, f"❌ Video too large for 3D mode ({file_size:.0f}MB). Max 500MB. Use Quick Mode or compress video.", None, None, None

        status = f"📹 Full 3D Processing ({file_size:.1f}MB)...\n\n"

        # Probe the video with the same validation Quick mode applies.
        cap = cv2.VideoCapture(video_file)
        try:
            if not cap.isOpened():
                return None, status + "❌ Could not open video. Try MP4 format.", None, None, None
            fps = cap.get(cv2.CAP_PROP_FPS)
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        finally:
            cap.release()

        # "not fps > 0" also rejects negative and NaN frame rates.
        if not fps > 0:
            return None, status + "❌ Invalid video file", None, None, None

        max_frames = int(max_frames)  # sliders may deliver floats
        max_frames_3d = int(max_frames_3d)
        frame_step = max(1, int(fps * frame_interval_seconds))

        status += "⚙️ Step 1/3: Extracting frames...\n"

        extracted_frames, frames_dir, video_fps, _, extract_status = extract_frames_from_360_video(
            video_file, frame_step=frame_step, max_frames=max_frames
        )

        if extract_status != "Success":
            return None, status + f"❌ {extract_status}", None, None, None

        status += f"✓ Extracted {len(extracted_frames)} frames\n\n"

        # cv2.imread returns None (it does not raise) for an unreadable file.
        first_frame = cv2.imread(extracted_frames[0])
        if first_frame is None:
            return None, status + "❌ Could not read the first extracted frame", None, None, None
        preview_img = Image.fromarray(cv2.cvtColor(first_frame, cv2.COLOR_BGR2RGB))

        status += f"⚙️ Step 2/3: Creating 3D model (using {min(max_frames_3d, len(extracted_frames))} frames)...\n"
        status += "This may take 5-10 minutes...\n\n"

        fig, ply_path, model_status = create_3d_model(extracted_frames, max_frames_3d)

        status += f"{model_status}\n\n"

        status += "⚙️ Step 3/3: Creating frame package...\n"

        video_info = {
            'fps': video_fps,
            'total_frames': total_frames,
            'extracted_frames': len(extracted_frames),
            'frame_step': frame_step
        }

        zip_path = create_download_package(frames_dir, video_info)

        # Only claim success when both the 3D preview and the ZIP exist.
        all_ok = fig is not None and zip_path is not None
        result = "\n".join([
            "✅ COMPLETE!" if all_ok else "⚠️ FINISHED WITH ERRORS",
            "",
            "📊 Results:",
            f"• Frames: {len(extracted_frames)}",
            f"• 3D points: {model_status}",
            "",
            "📦 Downloads:",
            "• ZIP: Frames for Metashape" if zip_path is not None else "• ZIP: ❌ Could not create ZIP",
            "• PLY: 3D point cloud" if ply_path is not None else "• PLY: ❌ Not available",
            "",
            "Note: This is a basic preview. Use Metashape for professional quality!",
            "",
        ])

        return preview_img, status + result, zip_path, fig, ply_path

    except Exception as e:
        error_trace = traceback.format_exc()
        return None, f"❌ ERROR:\n{str(e)}\n\n{error_trace}", None, None, None
    finally:
        # Frames have been consumed by the 3D step and the ZIP by now.
        if frames_dir is not None:
            shutil.rmtree(frames_dir, ignore_errors=True)
|
|
| |
| |
| |
|
|
# Gradio UI: one tab per processing mode, followed by general usage tips.
with gr.Blocks(title="360° Outdoor Photogrammetry + 3D") as demo:

    gr.Markdown("# 🌍 360° Video: Frame Extraction + 3D Reconstruction")
    gr.Markdown("**Two modes:** Quick frames (30s) OR Full 3D (5-10min)")
    gr.Markdown("⚠️ **Max file size:** Quick Mode: 1GB | Full 3D: 500MB | **8-minute videos OK!**")

    with gr.Tabs():
        # --- Tab 1: frame extraction only -----------------------------------
        with gr.Tab("🚀 Quick - Frames Only (RECOMMENDED)"):
            gr.Markdown("""
            ### Fast & Free!
            - Extract frames in 30-60 seconds
            - Works on FREE tier
            - Best for professional Metashape workflow
            """)

            with gr.Row():
                with gr.Column():
                    video1 = gr.Video(label="Upload 360° Video (MP4 recommended, max 1GB - 8 min videos OK!)")
                    interval1 = gr.Slider(
                        minimum=0.5,
                        maximum=5.0,
                        value=2.0,
                        step=0.5,
                        label="Frame Interval (seconds) - 2s good for 8min videos",
                    )
                    max_frames1 = gr.Slider(
                        minimum=20,
                        maximum=500,
                        value=150,
                        step=10,
                        label="Max Frames - 150-200 good for 8min",
                    )
                    btn1 = gr.Button("🎬 Extract Frames", variant="primary", size="lg")

                with gr.Column():
                    status1 = gr.Textbox(label="Status", lines=15)
                    preview1 = gr.Image(label="Preview (First Frame)")

            download1 = gr.File(label="📦 Download Frames (ZIP)")

            # Output order matches the handler's (preview, status, zip) tuple.
            btn1.click(
                process_video_frames_only,
                [video1, interval1, max_frames1],
                [preview1, status1, download1],
            )

        # --- Tab 2: frame extraction plus depth-based 3D preview -------------
        with gr.Tab("🎨 Full - Frames + 3D (SLOW, NEEDS GPU)"):
            gr.Markdown("""
            ### Creates 3D Preview
            - Takes 5-10 minutes
            - Requires GPU upgrade ($0.60/hour)
            - Basic quality (Metashape is better!)
            """)

            with gr.Row():
                with gr.Column():
                    video2 = gr.Video(label="Upload 360° Video (MP4, max 500MB - compress long videos)")
                    interval2 = gr.Slider(
                        minimum=0.5,
                        maximum=5.0,
                        value=2.0,
                        step=0.5,
                        label="Frame Interval (seconds)",
                    )
                    max_frames2 = gr.Slider(minimum=20, maximum=100, value=30, step=10, label="Max Frames")
                    max_3d = gr.Slider(
                        minimum=2,
                        maximum=8,
                        value=4,
                        step=1,
                        label="Frames for 3D (fewer = faster)",
                    )
                    btn2 = gr.Button("🎨 Extract + Create 3D", variant="primary")

                with gr.Column():
                    status2 = gr.Textbox(label="Status", lines=15)
                    preview2 = gr.Image(label="Preview")

            with gr.Row():
                viz = gr.Plot(label="3D Visualization")

            with gr.Row():
                download2 = gr.File(label="📦 Frames (ZIP)")
                ply_download = gr.File(label="📦 3D Model (PLY)")

            # Output order matches the handler's
            # (preview, status, zip, figure, ply) tuple.
            btn2.click(
                process_video_with_3d,
                [video2, interval2, max_frames2, max_3d],
                [preview2, status2, download2, viz, ply_download],
            )

    gr.Markdown("""
    ---
    ### 💡 Tips for 8-Minute Videos:
    - **Quick Mode** - Handles up to 1GB (8 min at 5K: ~400-600MB)
    - **Frame interval: 2-3 seconds** - Gets 160-240 frames from 8 min
    - **Use MP4 format** - Best compatibility
    - **If over 1GB** - Compress with HandBrake (target 5-8 Mbps)
    - **For best 3D quality** - Use Metashape with extracted frames

    ### 📐 Expected Frames from 8-Min Video:
    - 1s interval: ~480 frames (very dense, slow processing)
    - 2s interval: ~240 frames (recommended for outdoor)
    - 3s interval: ~160 frames (good for large landscapes)

    Made for outdoor photogrammetry! 🏔️
    """)
|
|
if __name__ == "__main__":
    # Handlers here can run for minutes (the UI itself advertises 5-10 min for
    # 3D mode). Enabling the queue explicitly keeps such requests from hitting
    # the plain HTTP request timeout on Gradio versions where queuing is not
    # on by default; where it already is, this call changes nothing.
    demo.queue().launch()