3d_model / scripts /tests /smoke_test.py
Azan
Clean deployment build (Squashed)
7a87926
#!/usr/bin/env python3
"""
Smoke test for YLFF pipeline using robot_unitree.mp4 video.
"""
import logging
import sys
from pathlib import Path
# Check third-party dependencies up front so that a missing package produces
# an actionable install hint instead of a bare traceback.
try:
    import cv2
    import numpy as np
    import torch
except ImportError as e:
    # Diagnostics go to stderr so they are not mixed into captured stdout.
    print(f"ERROR: Missing dependency: {e}", file=sys.stderr)
    print("\nPlease install dependencies:", file=sys.stderr)
    print(" pip install -e .", file=sys.stderr)
    print(" # Or install manually:", file=sys.stderr)
    print(" pip install torch torchvision numpy opencv-python", file=sys.stderr)
    # Non-zero exit status signals the failure to the calling shell / CI job.
    sys.exit(1)
# Add project root to path (presumably so the `ylff` imports used by the tests
# below resolve when this script is run directly).
# resolve() first: when __file__ is a bare relative name such as
# "smoke_test.py", Path(__file__).parent.parent collapses to "." instead of
# walking up two directories.
# NOTE(review): this takes the directory two levels above this file as the
# root. If the script lives at <repo>/scripts/tests/, that is <repo>/scripts,
# not <repo> -- confirm against the repository layout.
project_root = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(project_root))

# Timestamped INFO-level logging for every test step in this script.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
def extract_frames_from_video(video_path: Path, max_frames: int = 10) -> list:
    """Extract the leading frames of a video file as RGB images.

    Args:
        video_path: Path of the video file to read.
        max_frames: Upper bound on the number of frames returned. Reading
            stops earlier if the video runs out of frames.

    Returns:
        A list with at most ``max_frames`` frames, each converted from
        OpenCV's BGR channel order to RGB. The list is empty when no frame
        could be read or ``max_frames`` is not positive.

    Raises:
        ValueError: If OpenCV cannot open ``video_path``.
    """
    logger.info(f"Extracting frames from {video_path}")
    cap = cv2.VideoCapture(str(video_path))
    try:
        if not cap.isOpened():
            raise ValueError(f"Could not open video: {video_path}")

        frames = []
        while len(frames) < max_frames:
            ret, frame = cap.read()
            if not ret:
                # End of stream (or a decode failure): keep what we have.
                break
            # Convert BGR to RGB
            frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    finally:
        # Always free the capture handle, even if reading/conversion raised.
        cap.release()

    logger.info(f"Extracted {len(frames)} frames from video")
    return frames
def test_da3_inference(frames: list):
    """Load the DA3-SMALL model and run a single inference pass on ``frames``.

    Args:
        frames: Frames passed directly to ``model.inference``.

    Returns:
        The object returned by ``model.inference`` on success, or ``None``
        when the model loader cannot be imported or loading/inference raises.
        The result is expected to expose ``depth`` and ``extrinsics``
        (``intrinsics`` is optional); only their shapes are logged here.
    """
    logger.info("Testing DA3 inference...")
    try:
        from ylff.utils.model_loader import load_da3_model

        # Load model (GPU when available, otherwise CPU)
        device = "cuda" if torch.cuda.is_available() else "cpu"
        logger.info("Loading DA3 model...")
        model = load_da3_model("depth-anything/DA3-SMALL", device=device)
        logger.info("✓ Model loaded")

        # Run inference; gradients are not needed for a smoke test.
        logger.info(f"Running inference on {len(frames)} frames...")
        with torch.no_grad():
            output = model.inference(frames)

        logger.info("✓ Inference complete")
        logger.info(f" - Depth shape: {output.depth.shape}")
        logger.info(f" - Poses shape: {output.extrinsics.shape}")
        intrinsics_shape = output.intrinsics.shape if hasattr(output, "intrinsics") else "N/A"
        logger.info(f" - Intrinsics shape: {intrinsics_shape}")
        return output
    except ImportError as e:
        logger.error(f"Failed to import DA3: {e}")
        logger.error("Make sure DA3 is installed or available from HuggingFace")
        return None
    except Exception as e:
        # logger.exception records the message together with the traceback
        # through the configured logging handlers.
        logger.exception(f"DA3 inference failed: {e}")
        return None
def test_ba_validator_structure(frames: list, poses: np.ndarray):
    """Test BA validator structure (without full BA execution).

    Constructs a ``BAValidator`` and calls its pose-error computation on
    ``poses`` versus a copy whose first pose has a small translation offset.

    Args:
        frames: Not used by this check; kept so the signature stays stable
            for callers.
        poses: Pose array; indexed as ``poses[0, :3, 3]``, so it must have at
            least one pose with a translation column at index 3.

    Returns:
        ``True`` if the validator was created and the error computation ran,
        ``False`` if an import failed or any other exception was raised.
    """
    logger.info("Testing BA validator structure...")
    try:
        from ylff.services.ba_validator import BAValidator

        # Create validator
        validator = BAValidator(
            accept_threshold=2.0,
            reject_threshold=30.0,
        )
        logger.info("✓ BA validator created")

        # Test pose error computation (without full BA)
        logger.info("Testing pose error computation...")
        # Create dummy target poses (slightly different); copy so the caller's
        # array is not mutated.
        poses_target = poses.copy()
        poses_target[0, :3, 3] += 0.1  # Small translation change
        error_metrics = validator._compute_pose_error(poses, poses_target)

        logger.info("✓ Pose error computation works")
        logger.info(f" - Max rotation error: {error_metrics['max_rotation_error_deg']:.2f}°")
        logger.info(f" - Mean rotation error: {error_metrics['mean_rotation_error_deg']:.2f}°")
        return True
    except ImportError as e:
        logger.warning(f"BA validator dependencies not available: {e}")
        logger.warning("This is expected if pycolmap/hloc are not installed")
        return False
    except Exception as e:
        # logger.exception records the message together with the traceback.
        logger.exception(f"BA validator test failed: {e}")
        return False
def test_data_pipeline_structure(frames: list):
    """Test that the BA data pipeline can be constructed.

    Loads the DA3-SMALL model, builds a default ``BAValidator`` and wires
    both into a ``BADataPipeline``; the pipeline is not executed.

    Args:
        frames: Not used by this check; kept so the signature stays stable
            for callers.

    Returns:
        ``True`` if the pipeline was constructed, ``False`` if anything
        raised (the failure is logged as a warning and treated as a skip).
    """
    logger.info("Testing data pipeline structure...")
    try:
        from ylff.services.data_pipeline import BADataPipeline
        from ylff.services.ba_validator import BAValidator
        from ylff.utils.model_loader import load_da3_model

        # Create pipeline components
        device = "cuda" if torch.cuda.is_available() else "cpu"
        model = load_da3_model("depth-anything/DA3-SMALL", device=device)
        validator = BAValidator()
        pipeline = BADataPipeline(model, validator)

        logger.info("✓ Data pipeline created")
        logger.info(f" - Stats: {pipeline.stats}")
        return True
    except Exception as e:
        # Deliberately best-effort: optional dependencies may be missing.
        logger.warning(f"Data pipeline test skipped: {e}")
        return False
def test_loss_functions():
    """Test that the rotation and pose loss functions run on dummy input.

    Returns:
        ``True`` if both losses were computed and logged, ``False`` if the
        import or either computation raised.
    """
    logger.info("Testing loss functions...")
    try:
        from ylff.utils.losses import geodesic_rotation_loss, pose_loss

        # Create dummy poses: five random 3x4 matrices plus a perturbed copy.
        # NOTE(review): the 3x3 blocks are random matrices, not valid
        # rotations, so this only checks that the losses execute -- confirm
        # whether geodesic_rotation_loss needs orthonormal input to be finite.
        poses1 = torch.randn(5, 3, 4)
        poses2 = poses1 + torch.randn(5, 3, 4) * 0.1

        # Test rotation loss
        rot_loss = geodesic_rotation_loss(poses1[:, :3, :3], poses2[:, :3, :3])
        logger.info(f"✓ Rotation loss: {rot_loss.item():.4f}")

        # Test pose loss
        pose_loss_val = pose_loss(poses1, poses2)
        logger.info(f"✓ Pose loss: {pose_loss_val.item():.4f}")
        return True
    except Exception as e:
        # logger.exception records the message together with the traceback.
        logger.exception(f"Loss function test failed: {e}")
        return False
def main():
    """Run smoke tests.

    Tests 1-3 (frame extraction, DA3 inference, loss functions) are
    mandatory: the first failure aborts the run. Tests 4-5 (BA validator and
    data pipeline structure) are optional: a failure is reported as a warning
    but does not change the exit status.

    Returns:
        ``0`` when all mandatory tests passed, ``1`` otherwise (including
        when the example video is missing or yields no frames).
    """
    logger.info("=" * 60)
    logger.info("YLFF Smoke Test")
    logger.info("=" * 60)

    video_path = project_root / "assets" / "examples" / "robot_unitree.mp4"
    if not video_path.exists():
        logger.error(f"Video not found: {video_path}")
        return 1

    # Test 1: Extract frames
    logger.info("\n[Test 1] Extracting frames from video...")
    try:
        frames = extract_frames_from_video(video_path, max_frames=5)
    except Exception as e:
        logger.error(f"✗ Frame extraction failed: {e}")
        return 1
    if not frames:
        # An openable video with zero readable frames would otherwise be
        # reported as a success and fail later with a less clear message.
        logger.error("✗ Frame extraction failed: no frames could be read")
        return 1
    logger.info(f"✓ Extracted {len(frames)} frames")

    # Test 2: DA3 inference
    logger.info("\n[Test 2] Testing DA3 inference...")
    output = test_da3_inference(frames)
    if output is None:
        logger.error("✗ DA3 inference test failed")
        return 1

    # Test 3: Loss functions
    logger.info("\n[Test 3] Testing loss functions...")
    if not test_loss_functions():
        logger.error("✗ Loss function test failed")
        return 1

    # Test 4: BA validator structure (optional; does not affect exit status)
    logger.info("\n[Test 4] Testing BA validator structure...")
    if not test_ba_validator_structure(frames, output.extrinsics):
        logger.warning("BA validator structure test did not pass (optional)")

    # Test 5: Data pipeline structure (optional; does not affect exit status)
    logger.info("\n[Test 5] Testing data pipeline structure...")
    if not test_data_pipeline_structure(frames):
        logger.warning("Data pipeline structure test did not pass (optional)")

    logger.info("\n" + "=" * 60)
    logger.info("✓ Smoke test complete!")
    logger.info("=" * 60)
    return 0
# Script entry point: run the smoke tests and use main()'s return value as
# the process exit status.
if __name__ == "__main__":
    exit_status = main()
    sys.exit(exit_status)