Spaces:
Build error
Build error
| import gradio as gr | |
| import cv2 | |
| import numpy as np | |
| import os | |
| import subprocess | |
| import json | |
| import tempfile | |
| import shutil | |
| from pathlib import Path | |
| import torch | |
| class NeRFReconstructor: | |
| def __init__(self): | |
| self.temp_dir = None | |
| self.frames_dir = None | |
| self.output_dir = None | |
| self.colmap_dir = None | |
| def setup_directories(self): | |
| """Create temporary directories for processing""" | |
| self.temp_dir = tempfile.mkdtemp(prefix="nerf_") | |
| self.frames_dir = os.path.join(self.temp_dir, "images") | |
| self.colmap_dir = os.path.join(self.temp_dir, "colmap") | |
| self.output_dir = os.path.join(self.temp_dir, "output") | |
| os.makedirs(self.frames_dir, exist_ok=True) | |
| os.makedirs(self.colmap_dir, exist_ok=True) | |
| os.makedirs(self.output_dir, exist_ok=True) | |
| return self.temp_dir | |
| def extract_frames(self, video_path, fps=2, max_frames=100, progress=gr.Progress()): | |
| """Extract frames from video""" | |
| progress(0, desc="πΉ Opening video file...") | |
| if not os.path.exists(video_path): | |
| raise ValueError("Video file not found") | |
| cap = cv2.VideoCapture(video_path) | |
| if not cap.isOpened(): | |
| raise ValueError("Could not open video file") | |
| video_fps = cap.get(cv2.CAP_PROP_FPS) | |
| total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| duration = total_frames / video_fps if video_fps > 0 else 0 | |
| # Calculate frame interval | |
| frame_interval = max(1, int(video_fps / fps)) | |
| frames_extracted = 0 | |
| frame_count = 0 | |
| frame_paths = [] | |
| progress(0.05, desc=f"π¬ Extracting frames (0/{max_frames})...") | |
| while frames_extracted < max_frames: | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| if frame_count % frame_interval == 0: | |
| # Save frame with high quality | |
| frame_path = os.path.join(self.frames_dir, f"frame_{frames_extracted:05d}.jpg") | |
| cv2.imwrite(frame_path, frame, [cv2.IMWRITE_JPEG_QUALITY, 95]) | |
| frame_paths.append(frame_path) | |
| frames_extracted += 1 | |
| # Update progress (5-25%) | |
| progress_val = 0.05 + (0.20 * frames_extracted / max_frames) | |
| progress(progress_val, desc=f"π¬ Extracting frames ({frames_extracted}/{max_frames})...") | |
| frame_count += 1 | |
| cap.release() | |
| return frame_paths, { | |
| "frames_extracted": frames_extracted, | |
| "video_fps": video_fps, | |
| "video_duration": duration, | |
| "resolution": f"{width}x{height}", | |
| "total_video_frames": total_frames | |
| } | |
| def run_colmap(self, progress=gr.Progress()): | |
| """Run COLMAP for Structure from Motion""" | |
| database_path = os.path.join(self.colmap_dir, "database.db") | |
| sparse_dir = os.path.join(self.colmap_dir, "sparse") | |
| os.makedirs(sparse_dir, exist_ok=True) | |
| try: | |
| # Check if COLMAP is available | |
| result = subprocess.run(["which", "colmap"], capture_output=True, text=True) | |
| if result.returncode != 0: | |
| return {"status": "colmap_not_found", "message": "COLMAP not installed"} | |
| # Feature extraction (25-40%) | |
| progress(0.25, desc="π COLMAP: Extracting image features...") | |
| result = subprocess.run([ | |
| "colmap", "feature_extractor", | |
| "--database_path", database_path, | |
| "--image_path", self.frames_dir, | |
| "--ImageReader.single_camera", "1", | |
| "--ImageReader.camera_model", "OPENCV", | |
| "--SiftExtraction.use_gpu", "1" | |
| ], capture_output=True, text=True, timeout=600) | |
| if result.returncode != 0: | |
| return {"status": "error", "message": f"Feature extraction failed: {result.stderr}"} | |
| # Feature matching (40-55%) | |
| progress(0.40, desc="π COLMAP: Matching features...") | |
| result = subprocess.run([ | |
| "colmap", "exhaustive_matcher", | |
| "--database_path", database_path, | |
| "--SiftMatching.use_gpu", "1" | |
| ], capture_output=True, text=True, timeout=600) | |
| if result.returncode != 0: | |
| return {"status": "error", "message": f"Feature matching failed: {result.stderr}"} | |
| # Sparse reconstruction (55-70%) | |
| progress(0.55, desc="π COLMAP: Building sparse 3D reconstruction...") | |
| result = subprocess.run([ | |
| "colmap", "mapper", | |
| "--database_path", database_path, | |
| "--image_path", self.frames_dir, | |
| "--output_path", sparse_dir | |
| ], capture_output=True, text=True, timeout=900) | |
| if result.returncode != 0: | |
| return {"status": "error", "message": f"Mapping failed: {result.stderr}"} | |
| # Check if reconstruction was successful | |
| model_dir = os.path.join(sparse_dir, "0") | |
| if not os.path.exists(model_dir): | |
| return {"status": "error", "message": "No reconstruction created"} | |
| return {"status": "success", "sparse_dir": sparse_dir} | |
| except subprocess.TimeoutExpired: | |
| return {"status": "timeout", "message": "COLMAP processing timeout"} | |
| except Exception as e: | |
| return {"status": "error", "message": str(e)} | |
| def convert_colmap_to_nerf(self, progress=gr.Progress()): | |
| """Convert COLMAP output to NeRF format""" | |
| progress(0.70, desc="π Converting COLMAP to NeRF format...") | |
| try: | |
| # Use nerfstudio's data processing | |
| result = subprocess.run([ | |
| "ns-process-data", "images", | |
| "--data", self.frames_dir, | |
| "--output-dir", self.output_dir, | |
| "--skip-colmap" | |
| ], capture_output=True, text=True, timeout=300) | |
| if result.returncode != 0: | |
| return {"status": "error", "message": f"Conversion failed: {result.stderr}"} | |
| # Copy COLMAP results | |
| colmap_sparse = os.path.join(self.colmap_dir, "sparse", "0") | |
| output_colmap = os.path.join(self.output_dir, "colmap", "sparse", "0") | |
| os.makedirs(os.path.dirname(output_colmap), exist_ok=True) | |
| if os.path.exists(colmap_sparse): | |
| shutil.copytree(colmap_sparse, output_colmap, dirs_exist_ok=True) | |
| return {"status": "success"} | |
| except Exception as e: | |
| return {"status": "error", "message": str(e)} | |
| def train_nerf(self, num_iterations=1000, progress=gr.Progress()): | |
| """Train NeRF model using Nerfstudio""" | |
| progress(0.75, desc="π§ Training NeRF model...") | |
| try: | |
| # Check for nerfstudio | |
| result = subprocess.run(["which", "ns-train"], capture_output=True, text=True) | |
| if result.returncode != 0: | |
| return {"status": "nerfstudio_not_found", "message": "Nerfstudio not installed"} | |
| # Check GPU availability | |
| gpu_available = torch.cuda.is_available() | |
| gpu_info = f"GPU: {torch.cuda.get_device_name(0)}" if gpu_available else "CPU mode" | |
| progress(0.75, desc=f"π§ Training NeRF ({gpu_info})...") | |
| # Train with nerfacto method (optimized for quality and speed) | |
| config_path = os.path.join(self.output_dir, "config.yml") | |
| result = subprocess.run([ | |
| "ns-train", "nerfacto", | |
| "--data", self.output_dir, | |
| "--output-dir", os.path.join(self.output_dir, "models"), | |
| "--max-num-iterations", str(num_iterations), | |
| "--pipeline.model.predict-normals", "False", | |
| "--viewer.quit-on-train-completion", "True", | |
| "--vis", "viewer+tensorboard" | |
| ], capture_output=True, text=True, timeout=3600) | |
| # Update progress incrementally during training | |
| for i in range(10): | |
| progress(0.75 + (i * 0.015), desc=f"π§ Training NeRF... ({(i+1)*10}% complete)") | |
| if result.returncode != 0: | |
| return {"status": "error", "message": f"Training failed: {result.stderr[:500]}"} | |
| return {"status": "success", "gpu_used": gpu_available} | |
| except subprocess.TimeoutExpired: | |
| return {"status": "timeout", "message": "Training timeout (>1 hour)"} | |
| except Exception as e: | |
| return {"status": "error", "message": str(e)} | |
| def export_model(self, progress=gr.Progress()): | |
| """Export point cloud and model""" | |
| progress(0.90, desc="πΎ Exporting 3D model...") | |
| try: | |
| # Find the latest model | |
| models_dir = os.path.join(self.output_dir, "models") | |
| if not os.path.exists(models_dir): | |
| return {"status": "error", "message": "No trained model found"} | |
| # Get the most recent model directory | |
| model_dirs = [d for d in os.listdir(models_dir) if os.path.isdir(os.path.join(models_dir, d))] | |
| if not model_dirs: | |
| return {"status": "error", "message": "No model directories found"} | |
| latest_model = sorted(model_dirs)[-1] | |
| model_path = os.path.join(models_dir, latest_model) | |
| config_path = os.path.join(model_path, "config.yml") | |
| if not os.path.exists(config_path): | |
| return {"status": "error", "message": "Model config not found"} | |
| # Export point cloud | |
| output_ply = os.path.join(self.output_dir, "point_cloud.ply") | |
| result = subprocess.run([ | |
| "ns-export", "pointcloud", | |
| "--load-config", config_path, | |
| "--output-dir", self.output_dir, | |
| "--num-points", "1000000", | |
| "--remove-outliers", "True", | |
| "--use-bounding-box", "True" | |
| ], capture_output=True, text=True, timeout=600) | |
| if result.returncode != 0: | |
| # Try COLMAP export as fallback | |
| colmap_sparse = os.path.join(self.colmap_dir, "sparse", "0") | |
| if os.path.exists(colmap_sparse): | |
| result = subprocess.run([ | |
| "colmap", "model_converter", | |
| "--input_path", colmap_sparse, | |
| "--output_path", output_ply, | |
| "--output_type", "PLY" | |
| ], capture_output=True, text=True, timeout=300) | |
| # Check if PLY was created | |
| if os.path.exists(output_ply): | |
| file_size = os.path.getsize(output_ply) / (1024 * 1024) # MB | |
| return { | |
| "status": "success", | |
| "ply_path": output_ply, | |
| "file_size_mb": round(file_size, 2) | |
| } | |
| return {"status": "error", "message": "Export failed"} | |
| except Exception as e: | |
| return {"status": "error", "message": str(e)} | |
| def cleanup(self): | |
| """Clean up temporary files""" | |
| if self.temp_dir and os.path.exists(self.temp_dir): | |
| try: | |
| shutil.rmtree(self.temp_dir) | |
| except: | |
| pass | |
| def create_preview_grid(frame_paths, max_frames=9): | |
| """Create a grid preview of extracted frames""" | |
| preview_frames = frame_paths[:min(max_frames, len(frame_paths))] | |
| images = [] | |
| for fp in preview_frames: | |
| img = cv2.imread(fp) | |
| if img is not None: | |
| img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) | |
| img = cv2.resize(img, (256, 256)) | |
| images.append(img) | |
| if not images: | |
| return None | |
| # Create 3x3 grid | |
| rows = [] | |
| for i in range(0, len(images), 3): | |
| row_imgs = images[i:i+3] | |
| # Pad if needed | |
| while len(row_imgs) < 3: | |
| row_imgs.append(np.zeros((256, 256, 3), dtype=np.uint8)) | |
| rows.append(np.hstack(row_imgs)) | |
| # Pad rows if needed | |
| while len(rows) < 3: | |
| rows.append(np.zeros((256, 768, 3), dtype=np.uint8)) | |
| grid = np.vstack(rows) | |
| return grid | |
| def process_video_full_pipeline( | |
| video_path, | |
| fps, | |
| max_frames, | |
| train_iterations, | |
| progress=gr.Progress() | |
| ): | |
| """Full NeRF reconstruction pipeline""" | |
| if video_path is None: | |
| return ( | |
| "β Please upload a video file first.", | |
| None, | |
| None, | |
| json.dumps({"error": "No video uploaded"}, indent=2) | |
| ) | |
| reconstructor = NeRFReconstructor() | |
| results = {"steps": []} | |
| try: | |
| # Step 1: Setup | |
| progress(0, desc="βοΈ Setting up workspace...") | |
| reconstructor.setup_directories() | |
| results["workspace"] = reconstructor.temp_dir | |
| # Step 2: Extract frames (0-25%) | |
| progress(0.05, desc="π¬ Extracting frames...") | |
| frame_paths, video_info = reconstructor.extract_frames( | |
| video_path, fps, max_frames, progress | |
| ) | |
| if len(frame_paths) < 3: | |
| return ( | |
| "β Error: Not enough frames extracted (minimum 3 required).", | |
| None, | |
| None, | |
| json.dumps({"error": "Insufficient frames", "frames": len(frame_paths)}, indent=2) | |
| ) | |
| results["steps"].append({"name": "Frame Extraction", "status": "β Complete"}) | |
| results["video_info"] = video_info | |
| # Create preview | |
| preview_grid = create_preview_grid(frame_paths) | |
| # Step 3: COLMAP (25-70%) | |
| progress(0.25, desc="π Running COLMAP reconstruction...") | |
| colmap_result = reconstructor.run_colmap(progress) | |
| if colmap_result["status"] != "success": | |
| return ( | |
| f"β COLMAP failed: {colmap_result.get('message', 'Unknown error')}", | |
| preview_grid, | |
| None, | |
| json.dumps(results, indent=2) | |
| ) | |
| results["steps"].append({"name": "COLMAP SfM", "status": "β Complete"}) | |
| # Step 4: Convert to NeRF format (70-75%) | |
| convert_result = reconstructor.convert_colmap_to_nerf(progress) | |
| if convert_result["status"] != "success": | |
| return ( | |
| f"β Conversion failed: {convert_result.get('message', 'Unknown error')}", | |
| preview_grid, | |
| None, | |
| json.dumps(results, indent=2) | |
| ) | |
| results["steps"].append({"name": "Data Conversion", "status": "β Complete"}) | |
| # Step 5: Train NeRF (75-90%) | |
| progress(0.75, desc="π§ Training NeRF model (this may take several minutes)...") | |
| train_result = reconstructor.train_nerf(train_iterations, progress) | |
| if train_result["status"] != "success": | |
| return ( | |
| f"β Training failed: {train_result.get('message', 'Unknown error')}", | |
| preview_grid, | |
| None, | |
| json.dumps(results, indent=2) | |
| ) | |
| results["steps"].append({"name": "NeRF Training", "status": "β Complete"}) | |
| results["gpu_used"] = train_result.get("gpu_used", False) | |
| # Step 6: Export (90-100%) | |
| export_result = reconstructor.export_model(progress) | |
| if export_result["status"] != "success": | |
| return ( | |
| f"β οΈ Training complete but export failed: {export_result.get('message', 'Unknown error')}", | |
| preview_grid, | |
| None, | |
| json.dumps(results, indent=2) | |
| ) | |
| results["steps"].append({"name": "Model Export", "status": "β Complete"}) | |
| results["output_file"] = export_result.get("ply_path") | |
| results["file_size_mb"] = export_result.get("file_size_mb") | |
| progress(1.0, desc="β Complete!") | |
| # Generate success message | |
| status_msg = f""" | |
| # β 3D Reconstruction Complete! | |
| ## π Processing Results | |
| **Video Information:** | |
| - Frames extracted: {video_info['frames_extracted']} | |
| - Video resolution: {video_info['resolution']} | |
| - Video duration: {video_info['video_duration']:.2f} seconds | |
| - Video FPS: {video_info['video_fps']:.1f} | |
| **Processing Pipeline:** | |
| {''.join([f"- {step['name']}: {step['status']}" + chr(10) for step in results['steps']])} | |
| **Output:** | |
| - 3D Point Cloud: {results['file_size_mb']:.2f} MB | |
| - Format: PLY (compatible with CloudCompare, MeshLab, Blender) | |
| - GPU Acceleration: {'β Yes' if results.get('gpu_used') else 'β No'} | |
| ## π₯ Next Steps | |
| 1. Download the PLY file below | |
| 2. Import into your GIS software or 3D viewer | |
| 3. Use for spatial analysis and visualization | |
| ## π¬ Suggested Tools | |
| - **CloudCompare**: Point cloud analysis | |
| - **MeshLab**: Mesh processing | |
| - **Blender**: 3D visualization and animation | |
| - **QGIS**: Geographic analysis | |
| """ | |
| return ( | |
| status_msg, | |
| preview_grid, | |
| export_result.get("ply_path"), | |
| json.dumps(results, indent=2) | |
| ) | |
| except Exception as e: | |
| results["error"] = str(e) | |
| return ( | |
| f"β Error during processing: {str(e)}", | |
| None, | |
| None, | |
| json.dumps(results, indent=2) | |
| ) | |
| finally: | |
| # Note: Keep temp files for download, clean up later | |
| pass | |
| # Create Gradio interface | |
| def create_app(): | |
| with gr.Blocks(theme=gr.themes.Soft(), title="NeRF 3D Reconstruction") as app: | |
| gr.Markdown(""" | |
| # π₯ NeRF 3D Reconstruction from Insta360 Video | |
| ### Neural Radiance Fields for Geographic Research | |
| Transform your 360Β° Insta360 videos into detailed 3D models using state-of-the-art NeRF technology. | |
| Powered by COLMAP and Nerfstudio with GPU acceleration. | |
| """) | |
| # Check GPU availability | |
| gpu_info = "" | |
| if torch.cuda.is_available(): | |
| gpu_name = torch.cuda.get_device_name(0) | |
| gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1e9 | |
| gpu_info = f"π **GPU Detected:** {gpu_name} ({gpu_memory:.1f} GB)" | |
| else: | |
| gpu_info = "β οΈ **No GPU detected** - Processing will be slower" | |
| gr.Markdown(gpu_info) | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| gr.Markdown("### π€ Input Configuration") | |
| video_input = gr.Video( | |
| label="Upload Insta360 Video", | |
| height=300 | |
| ) | |
| with gr.Group(): | |
| gr.Markdown("**Frame Extraction Settings**") | |
| fps_slider = gr.Slider( | |
| minimum=1, | |
| maximum=10, | |
| value=2, | |
| step=1, | |
| label="Extraction FPS", | |
| info="Frames per second to extract (2-3 recommended)" | |
| ) | |
| max_frames_slider = gr.Slider( | |
| minimum=20, | |
| maximum=300, | |
| value=100, | |
| step=10, | |
| label="Maximum Frames", | |
| info="More frames = better quality but longer processing" | |
| ) | |
| with gr.Group(): | |
| gr.Markdown("**NeRF Training Settings**") | |
| iterations_slider = gr.Slider( | |
| minimum=500, | |
| maximum=5000, | |
| value=2000, | |
| step=500, | |
| label="Training Iterations", | |
| info="More iterations = better quality (2000 recommended)" | |
| ) | |
| process_btn = gr.Button( | |
| "π Start Full Reconstruction", | |
| variant="primary", | |
| size="lg" | |
| ) | |
| gr.Markdown(""" | |
| ### β±οΈ Expected Processing Time | |
| - Frame extraction: 1-2 minutes | |
| - COLMAP reconstruction: 5-10 minutes | |
| - NeRF training: 10-30 minutes | |
| - **Total: ~20-45 minutes** | |
| ### π‘ Tips for Best Results | |
| - Use well-lit outdoor scenes | |
| - Ensure camera movement has overlap | |
| - Avoid fast movements | |
| - 50-150 frames optimal for most scenes | |
| """) | |
| with gr.Column(scale=1): | |
| gr.Markdown("### π Results") | |
| status_output = gr.Markdown( | |
| value="Ready to process. Upload a video and click 'Start Full Reconstruction'.", | |
| label="Status" | |
| ) | |
| preview_output = gr.Image( | |
| label="Extracted Frames Preview", | |
| height=400 | |
| ) | |
| model_output = gr.File( | |
| label="π₯ Download 3D Model (PLY)", | |
| file_types=[".ply"] | |
| ) | |
| with gr.Accordion("π Technical Details", open=False): | |
| json_output = gr.JSON(label="Processing Log") | |
| gr.Markdown(""" | |
| --- | |
| ## π Geographic Research Applications | |
| ### Landscape Analysis | |
| - Terrain modeling and elevation analysis | |
| - Geomorphological feature detection | |
| - Erosion and landform studies | |
| ### Urban Geography | |
| - 3D city modeling | |
| - Building reconstruction | |
| - Urban planning visualization | |
| ### Environmental Monitoring | |
| - Vegetation structure analysis | |
| - Coastal erosion monitoring | |
| - Land use change detection | |
| ### Cultural Heritage | |
| - Archaeological site documentation | |
| - Historical structure preservation | |
| - Virtual field trips | |
| ## π Technical Pipeline | |
| 1. **Frame Extraction**: Extract key frames from 360Β° video | |
| 2. **COLMAP SfM**: Structure from Motion for camera poses | |
| 3. **NeRF Training**: Neural network learns 3D scene representation | |
| 4. **Export**: Generate point cloud in PLY format | |
| ## π Export Compatibility | |
| The generated PLY files work with: | |
| - CloudCompare (point cloud processing) | |
| - MeshLab (mesh editing) | |
| - Blender (3D modeling) | |
| - QGIS (geographic analysis) | |
| - ArcGIS (spatial analysis) | |
| --- | |
| **Powered by:** Nerfstudio β’ COLMAP β’ PyTorch β’ Gradio | |
| """) | |
| # Event handler | |
| process_btn.click( | |
| fn=process_video_full_pipeline, | |
| inputs=[ | |
| video_input, | |
| fps_slider, | |
| max_frames_slider, | |
| iterations_slider | |
| ], | |
| outputs=[ | |
| status_output, | |
| preview_output, | |
| model_output, | |
| json_output | |
| ] | |
| ) | |
| return app | |
| # Launch app | |
| if __name__ == "__main__": | |
| app = create_app() | |
| app.launch( | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| share=False | |
| ) |