| import faulthandler
|
| faulthandler.enable()
|
|
|
| import gradio as gr
|
| import cv2
|
| import numpy as np
|
| import trimesh
|
| import tempfile
|
| import os
|
| import logging
|
|
|
|
|
# Log at INFO so frame counts and processing progress are visible on the console.
logging.basicConfig(level=logging.INFO)

# Module-level logger used by all processing functions below.
logger = logging.getLogger(__name__)

# NOTE(review): presumably selects software (OSMesa) OpenGL rendering for
# headless servers with no display — must be set before any GL-backed
# library imports OpenGL. Confirm the deployment actually needs this.
os.environ['PYOPENGL_PLATFORM'] = 'osmesa'

# Lazily-created cache of random checkerboard colors; populated on the first
# call to default_checkerboard() so repeated calls reuse the same palette.
_checkerboard_colors = None
|
|
|
|
|
|
|
|
|
def read_video_frames(video_path, start=0, end=None, frame_step=1):
    """Read a range of frames from a video file as an RGB numpy array.

    Args:
        video_path: Path to a video file readable by OpenCV.
        start: Index of the first frame to keep (inclusive).
        end: Index at which to stop reading (exclusive); ``None`` reads to
            the end of the video.
        frame_step: Keep every ``frame_step``-th frame counted from ``start``.

    Returns:
        np.ndarray of shape (n_frames, height, width, 3), dtype uint8, RGB.

    Raises:
        ValueError: If the file cannot be opened, appears empty/corrupted,
            or yields no frames for the requested range.
    """
    try:
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError(f"Cannot open video file: {video_path}")

        try:
            frames = []
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

            if total_frames == 0:
                raise ValueError("Video file appears to be empty or corrupted")

            if end is None or end > total_frames:
                end = total_frames

            # Guard against a zero or negative step, which would otherwise
            # raise ZeroDivisionError in the modulo test below.
            frame_step = max(1, int(frame_step))

            count = 0
            frames_read = 0

            # Check the bound BEFORE decoding: the original loop decoded one
            # extra frame past `end` and then discarded it.
            while count < end:
                ret, frame = cap.read()
                if not ret:
                    break

                if count >= start and (count - start) % frame_step == 0:
                    # OpenCV decodes BGR; convert to RGB for downstream use.
                    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    frames.append(frame)
                    frames_read += 1

                count += 1
        finally:
            # Always release the capture handle, even if conversion fails —
            # the original leaked it when an exception escaped the loop.
            cap.release()

        if not frames:
            raise ValueError("No frames could be read from the video")

        logger.info(f"Successfully read {frames_read} frames")
        return np.array(frames)

    except Exception as e:
        logger.error(f"Error reading video: {str(e)}")
        raise
|
|
|
|
|
|
|
|
|
def downsample_frames(frames, block_size=1, method='stride'):
    """Spatially downsample a stack of frames.

    Args:
        frames: np.ndarray of shape (z, h, w, c), dtype uint8.
        block_size: Downsampling factor; 1 means no downsampling.
        method: 'stride' keeps every block_size-th pixel; 'mean' averages
            each block_size x block_size block. Any other value returns the
            input unchanged.

    Returns:
        The downsampled array. For block_size == 1, empty input, or an
        unknown method, the input array is returned as-is.
    """
    if block_size == 1 or frames.size == 0:
        return frames

    z, h, w, c = frames.shape

    if method == 'stride':
        # Keep every block_size-th pixel along both spatial axes.
        return frames[:, ::block_size, ::block_size]

    elif method == 'mean':
        new_h = h // block_size
        new_w = w // block_size
        # Vectorized block mean: crop to a multiple of block_size, fold each
        # block into its own pair of axes, then average those axes. This
        # replaces the original O(z*h*w) Python triple loop with identical
        # results (float mean truncated back to uint8).
        cropped = frames[:, :new_h * block_size, :new_w * block_size]
        blocks = cropped.reshape(z, new_h, block_size, new_w, block_size, c)
        return blocks.mean(axis=(2, 4)).astype(np.uint8)

    # Unknown method: fall through unchanged (matches original behavior).
    return frames
|
|
|
|
|
|
|
|
|
def frames_to_voxels(frames, threshold=10):
    """Threshold a frame stack into a boolean voxel occupancy grid.

    A voxel is occupied when the sum of its color channels exceeds
    ``threshold``.

    Args:
        frames: np.ndarray of shape (z, h, w, c).
        threshold: Channel-sum cutoff for occupancy.

    Returns:
        Boolean np.ndarray of shape (z, h, w), or an empty array for
        empty input.

    Raises:
        ValueError: If ``frames`` is non-empty but not 4-dimensional.
    """
    if frames.size == 0:
        return np.array([])

    if frames.ndim != 4:
        raise ValueError("Frames must be 4D array (z, h, w, c)")

    channel_sum = frames.sum(axis=3)
    return channel_sum > threshold
|
|
|
|
|
|
|
|
|
def voxels_to_mesh(frames, voxels, voxel_size=1.0):
    """Build a colored cube mesh from a voxel occupancy grid.

    Args:
        frames: np.ndarray of shape (z, h, w, 3), uint8 RGB colors per voxel.
        voxels: Boolean np.ndarray of shape (z, h, w) marking occupied voxels.
        voxel_size: Edge length of each cube.

    Returns:
        A single concatenated trimesh mesh, or an empty trimesh.Scene when
        there is nothing to build.
    """
    if voxels.size == 0 or frames.size == 0:
        return trimesh.Scene()

    meshes = []
    z_len, h, w = voxels.shape

    for z in range(z_len):
        for y in range(h):
            for x in range(w):
                if not voxels[z, y, x]:
                    continue

                # Skip voxels that fall outside the color volume. The
                # original code fell through with `color` unbound (NameError)
                # or stale from a previous iteration.
                if not (z < frames.shape[0] and y < frames.shape[1] and x < frames.shape[2]):
                    continue

                # Row index is flipped when sampling the color — presumably
                # because image rows grow downward while mesh y grows upward;
                # TODO(review): confirm intended orientation.
                color = frames[z, frames.shape[1] - 1 - y, x].astype(np.uint8)

                try:
                    cube = trimesh.creation.box(extents=[voxel_size] * 3)
                    cube.apply_translation([x, y, z])

                    # A box mesh has 12 triangular faces; color each one.
                    rgba = np.append(color, 255)
                    cube.visual.face_colors = np.tile(rgba, (12, 1))

                    meshes.append(cube)
                except Exception as e:
                    logger.warning(f"Could not create cube at position ({x}, {y}, {z}): {str(e)}")

    if meshes:
        try:
            return trimesh.util.concatenate(meshes)
        except Exception as e:
            logger.warning(f"Could not concatenate meshes: {str(e)}")
            return meshes[0] if meshes else trimesh.Scene()

    return trimesh.Scene()
|
|
|
|
|
|
|
|
|
def default_checkerboard():
    """Build and export a small two-layer checkerboard voxel mesh.

    Cells alternate between black and random colors. The random palette is
    generated once and cached in the module-level ``_checkerboard_colors``,
    so repeated calls reproduce the same pattern.

    Returns:
        (obj_path, glb_path, glb_path) — paths of the exported files in the
        system temp directory (GLB repeated for the 3D preview slot).

    Raises:
        Exception: Re-raised after logging if mesh creation or export fails.
    """
    global _checkerboard_colors

    z_len, h, w = 2, 10, 10
    frames = np.zeros((z_len, h, w, 3), dtype=np.uint8)

    if _checkerboard_colors is None:
        _checkerboard_colors = np.random.randint(
            0, 256, size=(z_len, h, w, 3), dtype=np.uint8
        )

    # Cells with odd (x + y + z) parity get the cached random color;
    # even-parity cells stay black (the array is zero-initialized).
    zz, yy, xx = np.indices((z_len, h, w))
    odd_parity = (xx + yy + zz) % 2 == 1
    frames[odd_parity] = _checkerboard_colors[odd_parity]

    try:
        voxels = frames_to_voxels(frames, threshold=1)
        mesh = voxels_to_mesh(frames, voxels, voxel_size=2)

        tmp_dir = tempfile.gettempdir()
        obj_path = os.path.join(tmp_dir, "checkerboard.obj")
        glb_path = os.path.join(tmp_dir, "checkerboard.glb")

        mesh.export(obj_path)
        mesh.export(glb_path)

        return obj_path, glb_path, glb_path
    except Exception as e:
        logger.error(f"Error creating checkerboard: {str(e)}")
        raise
|
|
|
|
|
|
|
|
|
def generate_voxel_files(
    video_file,
    start_frame,
    end_frame,
    frame_step,
    block_size,
    downsample_method
):
    """Turn an uploaded video into voxel mesh files (OBJ + GLB).

    Falls back to the default checkerboard when no file is supplied or when
    any processing stage fails.

    Args:
        video_file: Uploaded file object (or plain path string), or None.
        start_frame / end_frame / frame_step: Frame-range controls passed
            through to read_video_frames.
        block_size / downsample_method: Spatial downsampling controls passed
            through to downsample_frames.

    Returns:
        (obj_path, glb_path, glb_path) — the GLB path is repeated for the
        3D preview slot.
    """
    try:
        if video_file is None:
            logger.info("No video file provided, generating checkerboard")
            return default_checkerboard()

        # Gradio may hand us either a file-like object or a plain path.
        path = getattr(video_file, 'name', video_file)
        if not path or not os.path.exists(path):
            raise ValueError("Invalid video file path")

        logger.info(f"Processing video: {path}")

        frame_stack = read_video_frames(
            path,
            start=start_frame,
            end=end_frame,
            frame_step=frame_step
        )

        if frame_stack.size == 0:
            raise ValueError("No frames could be processed")

        frame_stack = downsample_frames(
            frame_stack,
            block_size=block_size,
            method=downsample_method
        )

        occupancy = frames_to_voxels(frame_stack)
        mesh = voxels_to_mesh(frame_stack, occupancy)

        tmp_dir = tempfile.gettempdir()
        obj_path = os.path.join(tmp_dir, "output.obj")
        glb_path = os.path.join(tmp_dir, "output.glb")

        mesh.export(obj_path)
        mesh.export(glb_path)

        logger.info("Successfully generated voxel files")
        return obj_path, glb_path, glb_path

    except Exception as e:
        logger.error(f"Error in generate_voxel_files: {str(e)}")
        # Best-effort fallback so the UI always has something to display.
        return default_checkerboard()
|
|
|
|
|
|
|
|
|
def create_interface():
    """Create Gradio 6.0+ compatible interface using Blocks.

    Lays out the full UI — upload control, frame/processing sliders,
    convert button, file outputs, 3D preview and status box — and wires
    processing to both the button click and a fresh file upload.

    Returns:
        The constructed (not yet launched) gr.Blocks interface.
    """

    with gr.Blocks(
        title="MP4 → Voxels → 3D",
        theme=gr.themes.Soft(),
        # Inline CSS: cap the app width and center it on the page.
        css="""
        .gradio-container {max-width: 1200px !important; margin: auto !important;}
        .output-file {margin: 10px 0;}
        """
    ) as interface:

        gr.Markdown("# 📹 MP4 → Voxels → 3D")
        gr.Markdown("Convert video files into voxelized 3D meshes. If no file is uploaded, a random-color checkerboard appears.")

        with gr.Row():
            # Left column: all input controls.
            with gr.Column(scale=1):
                video_input = gr.File(
                    label="Upload MP4 Video",
                    file_types=["video"],
                    file_count="single"
                )

                gr.Markdown("### Frame Settings")
                start_frame = gr.Slider(
                    minimum=0,
                    maximum=500,
                    value=0,
                    step=1,
                    label="Start Frame"
                )

                end_frame = gr.Slider(
                    minimum=0,
                    maximum=500,
                    value=50,
                    step=1,
                    label="End Frame"
                )

                frame_step = gr.Slider(
                    minimum=1,
                    maximum=10,
                    value=1,
                    step=1,
                    label="Frame Step"
                )

                gr.Markdown("### Processing Settings")
                block_size = gr.Slider(
                    minimum=1,
                    maximum=32,
                    value=1,
                    step=1,
                    label="Pixel Block Size"
                )

                downsample_method = gr.Radio(
                    choices=["stride", "mean"],
                    value="stride",
                    label="Downsample Method"
                )

                process_btn = gr.Button("🔄 Convert to Voxels", variant="primary")

            # Right column (wider): outputs and live status.
            with gr.Column(scale=2):
                with gr.Row():
                    obj_output = gr.File(label="OBJ File", file_types=[".obj"])
                    glb_output = gr.File(label="GLB File", file_types=[".glb"])

                model_3d = gr.Model3D(
                    label="3D Preview",
                    height=600,
                    # NOTE(review): [0, 0, 0] places the camera at the origin;
                    # confirm this renders as intended in the target Gradio version.
                    camera_position=[0, 0, 0]
                )

                status = gr.Textbox(
                    label="Status",
                    value="Ready to process...",
                    interactive=False
                )

        # NOTE(review): update_status is defined but never wired to any event.
        def update_status(message):
            return gr.update(value=message)

        # Generator callback: the first yield shows a "processing" status
        # immediately; later yields deliver results (or an error message).
        # Output order matches the `outputs=` lists below:
        # [status, obj file, glb file, 3D preview].
        def process_with_status(video_file, start, end, step, block, method):
            status_update = gr.update(value="Processing video...")
            yield [status_update, None, None, None]

            try:
                result = generate_voxel_files(video_file, start, end, step, block, method)
                if result and len(result) == 3:
                    obj_path, glb_path, glb_preview = result
                    status_update = gr.update(value="✅ Processing complete!")
                    yield [status_update, obj_path, glb_path, glb_preview]
                else:
                    status_update = gr.update(value="❌ Processing failed")
                    yield [status_update, None, None, None]
            except Exception as e:
                logger.error(f"Processing error: {str(e)}")
                status_update = gr.update(value=f"❌ Error: {str(e)}")
                yield [status_update, None, None, None]

        # Run processing when the convert button is clicked...
        process_btn.click(
            fn=process_with_status,
            inputs=[
                video_input,
                start_frame,
                end_frame,
                frame_step,
                block_size,
                downsample_method
            ],
            outputs=[
                status,
                obj_output,
                glb_output,
                model_3d
            ]
        )

        # ...and also immediately whenever a new file is uploaded.
        video_input.upload(
            fn=process_with_status,
            inputs=[
                video_input,
                start_frame,
                end_frame,
                frame_step,
                block_size,
                downsample_method
            ],
            outputs=[
                status,
                obj_output,
                glb_output,
                model_3d
            ]
        )

        # One no-upload example that demonstrates the checkerboard fallback.
        gr.Examples(
            examples=[
                [None, 0, 50, 1, 1, "stride"]
            ],
            inputs=[
                video_input,
                start_frame,
                end_frame,
                frame_step,
                block_size,
                downsample_method
            ],
            label="Example Configurations"
        )

    return interface
|
|
|
if __name__ == "__main__":
    # Script entry point: build the UI and serve it on all network interfaces.
    try:
        app = create_interface()
        app.launch(
            server_name="0.0.0.0",
            server_port=7860,
            debug=True,
            share=False,
            show_error=True
        )
    except Exception as exc:
        # Log the failure before propagating so a non-zero exit is recorded.
        logger.error(f"Failed to launch Gradio interface: {str(exc)}")
        raise