|
|
|
|
|
""" |
|
|
Smoke test for YLFF pipeline using robot_unitree.mp4 video. |
|
|
""" |
|
|
|
|
|
import logging |
|
|
import sys |
|
|
from pathlib import Path |
|
|
|
|
|
|
|
|
# Import the heavy third-party dependencies up front. If any of them is
# missing, print installation instructions and exit with status 1 instead of
# letting the ImportError propagate.
try:
    import cv2
    import numpy as np
    import torch
except ImportError as missing_dep:
    install_help = (
        f"ERROR: Missing dependency: {missing_dep}",
        "\nPlease install dependencies:",
        " pip install -e .",
        " # Or install manually:",
        " pip install torch torchvision numpy opencv-python",
    )
    for help_line in install_help:
        print(help_line)
    sys.exit(1)
|
|
|
|
|
|
|
|
# Put the parent of the directory containing this file at the front of
# sys.path; presumably this is what lets the function-local `ylff` imports
# used by the tests resolve when the script is run directly.
_script_dir = Path(__file__).parent
project_root = _script_dir.parent
sys.path.insert(0, str(project_root))

# Timestamped, level-tagged logging at INFO for the whole script.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
def extract_frames_from_video(video_path: Path, max_frames: int = 10) -> list:
    """Read up to ``max_frames`` leading frames from a video, converted to RGB.

    Args:
        video_path: Location of the video file; handed to OpenCV as a string.
        max_frames: Upper bound on the number of frames returned. With a value
            of zero or less the read loop never runs and the list is empty.

    Returns:
        The frames in the order they were read, each converted from BGR to
        RGB. The list is shorter than ``max_frames`` when a read fails (for
        example at end of stream) before the limit is reached.

    Raises:
        ValueError: If OpenCV cannot open ``video_path``.
    """
    logger.info(f"Extracting frames from {video_path}")

    cap = cv2.VideoCapture(str(video_path))
    # try/finally guarantees the capture handle is released on every path,
    # including a failed open and an exception raised while converting a frame.
    try:
        if not cap.isOpened():
            raise ValueError(f"Could not open video: {video_path}")

        frames = []
        while len(frames) < max_frames:
            ret, frame = cap.read()
            if not ret:
                # No frame was returned: stop and keep what has been read.
                break
            frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    finally:
        cap.release()

    logger.info(f"Extracted {len(frames)} frames from video")
    return frames
|
|
|
|
|
|
|
|
def test_da3_inference(frames: list):
    """Load the DA3-SMALL model and run a single inference pass over ``frames``.

    The model is placed on CUDA when ``torch.cuda.is_available()`` reports it,
    otherwise on CPU. On success the shapes of the output's depth, extrinsics
    and (if the attribute exists) intrinsics are logged.

    Returns:
        The object returned by ``model.inference(frames)``, or ``None`` when
        the import or any later step fails (the failure is logged, not raised).
    """
    logger.info("Testing DA3 inference...")

    try:
        # Imported here so a missing or broken ylff install is reported as a
        # failed check rather than an error at module import time.
        from ylff.utils.model_loader import load_da3_model

        logger.info("Loading DA3 model...")
        if torch.cuda.is_available():
            device = "cuda"
        else:
            device = "cpu"
        model = load_da3_model("depth-anything/DA3-SMALL", device=device)
        logger.info("β Model loaded")

        logger.info(f"Running inference on {len(frames)} frames...")
        # Gradients are not needed for a forward-only check.
        with torch.no_grad():
            prediction = model.inference(frames)

        logger.info("β Inference complete")
        logger.info(f" - Depth shape: {prediction.depth.shape}")
        logger.info(f" - Poses shape: {prediction.extrinsics.shape}")
        if hasattr(prediction, "intrinsics"):
            intrinsics_shape = prediction.intrinsics.shape
        else:
            intrinsics_shape = "N/A"
        logger.info(f" - Intrinsics shape: {intrinsics_shape}")

        return prediction

    except ImportError as err:
        logger.error(f"Failed to import DA3: {err}")
        logger.error("Make sure DA3 is installed or available from HuggingFace")
        return None
    except Exception as err:
        logger.error(f"DA3 inference failed: {err}")
        import traceback

        traceback.print_exc()
        return None
|
|
|
|
|
|
|
|
def test_ba_validator_structure(frames: list, poses: np.ndarray):
    """Check that a ``BAValidator`` can be built and can compare two pose sets.

    No bundle adjustment is executed: the validator is constructed with
    explicit accept/reject thresholds and its ``_compute_pose_error`` helper is
    called on ``poses`` versus a slightly perturbed copy.

    Args:
        frames: Not used by this check.
        poses: Pose array; must support ``.copy()`` and ``[0, :3, 3]`` indexing.

    Returns:
        ``True`` if every step ran, ``False`` if the validator (or one of its
        dependencies) could not be imported or any step raised.
    """
    logger.info("Testing BA validator structure...")

    try:
        from ylff.services.ba_validator import BAValidator

        ba_validator = BAValidator(accept_threshold=2.0, reject_threshold=30.0)
        logger.info("β BA validator created")

        logger.info("Testing pose error computation...")

        # Build the comparison set: a copy whose first pose has 0.1 added to
        # rows 0-2 of column 3 (presumably the translation component).
        shifted_poses = poses.copy()
        shifted_poses[0, :3, 3] += 0.1

        metrics = ba_validator._compute_pose_error(poses, shifted_poses)
        logger.info("β Pose error computation works")
        logger.info(f" - Max rotation error: {metrics['max_rotation_error_deg']:.2f}Β°")
        logger.info(f" - Mean rotation error: {metrics['mean_rotation_error_deg']:.2f}Β°")

        return True

    except ImportError as err:
        logger.warning(f"BA validator dependencies not available: {err}")
        logger.warning("This is expected if pycolmap/hloc are not installed")
        return False
    except Exception as err:
        logger.error(f"BA validator test failed: {err}")
        import traceback

        traceback.print_exc()
        return False
|
|
|
|
|
|
|
|
def test_data_pipeline_structure(frames: list):
    """Check that a ``BADataPipeline`` can be constructed from its parts.

    Loads the DA3-SMALL model (CUDA if available, else CPU), creates a
    default ``BAValidator`` and wires both into a ``BADataPipeline``, then logs
    the pipeline's ``stats`` attribute. Nothing is run through the pipeline.

    Args:
        frames: Not used by this check.

    Returns:
        ``True`` if construction succeeded; ``False`` if anything raised, in
        which case the check is logged as skipped.
    """
    logger.info("Testing data pipeline structure...")

    try:
        from ylff.services.data_pipeline import BADataPipeline
        from ylff.services.ba_validator import BAValidator
        from ylff.utils.model_loader import load_da3_model

        device = "cuda" if torch.cuda.is_available() else "cpu"
        da3_model = load_da3_model("depth-anything/DA3-SMALL", device=device)
        pipeline = BADataPipeline(da3_model, BAValidator())

        logger.info("β Data pipeline created")
        logger.info(f" - Stats: {pipeline.stats}")

        return True

    except Exception as err:
        # Any failure here is treated as "skipped", not as an error.
        logger.warning(f"Data pipeline test skipped: {err}")
        return False
|
|
|
|
|
|
|
|
def test_loss_functions():
    """Check that the rotation and pose loss functions run on random input.

    Two batches of five random 3x4 tensors are generated, the second being the
    first plus noise scaled by 0.1. ``geodesic_rotation_loss`` is evaluated on
    the leading 3x3 blocks and ``pose_loss`` on the full tensors; both values
    are logged.

    Returns:
        ``True`` if both losses were computed, ``False`` if anything raised.
    """
    logger.info("Testing loss functions...")

    try:
        import torch

        from ylff.utils.losses import geodesic_rotation_loss, pose_loss

        # Random stand-in poses; the second set is a noisy copy of the first.
        poses1 = torch.randn(5, 3, 4)
        noise = torch.randn(5, 3, 4) * 0.1
        poses2 = poses1 + noise

        rotations1 = poses1[:, :3, :3]
        rotations2 = poses2[:, :3, :3]
        rot_loss = geodesic_rotation_loss(rotations1, rotations2)
        logger.info(f"β Rotation loss: {rot_loss.item():.4f}")

        full_loss = pose_loss(poses1, poses2)
        logger.info(f"β Pose loss: {full_loss.item():.4f}")

        return True

    except Exception as err:
        logger.error(f"Loss function test failed: {err}")
        import traceback

        traceback.print_exc()
        return False
|
|
|
|
|
|
|
|
def main():
    """Run the smoke tests in order and return a process exit code.

    Steps:
        1. Extract the first five frames of ``assets/examples/robot_unitree.mp4``
           under ``project_root``.
        2. Run DA3 inference on those frames.
        3. Exercise the loss functions.
        4. Exercise the BA validator structure (non-fatal).
        5. Exercise the data pipeline structure (non-fatal).

    Returns:
        ``1`` if the video is missing, no frames could be extracted, DA3
        inference failed or the loss-function check failed; ``0`` otherwise.
        Failures in steps 4 and 5 do not change the exit code but are
        summarised in a warning before the final banner.
    """
    # NOTE(review): the "β" prefix in several messages below looks like a
    # mis-encoded check/cross mark; left untouched here - confirm the intended
    # glyphs against the original file encoding.
    banner = "=" * 60
    logger.info(banner)
    logger.info("YLFF Smoke Test")
    logger.info(banner)

    video_path = project_root / "assets" / "examples" / "robot_unitree.mp4"

    if not video_path.exists():
        logger.error(f"Video not found: {video_path}")
        return 1

    logger.info("\n[Test 1] Extracting frames from video...")
    try:
        frames = extract_frames_from_video(video_path, max_frames=5)
        logger.info(f"β Extracted {len(frames)} frames")
    except Exception as e:
        logger.error(f"β Frame extraction failed: {e}")
        return 1

    # A video can open successfully yet yield no readable frames; feeding an
    # empty list to the model would only fail later with a less clear message.
    if not frames:
        logger.error(f"No frames could be read from video: {video_path}")
        return 1

    logger.info("\n[Test 2] Testing DA3 inference...")
    output = test_da3_inference(frames)
    if output is None:
        logger.error("β DA3 inference test failed")
        return 1

    logger.info("\n[Test 3] Testing loss functions...")
    if not test_loss_functions():
        logger.error("β Loss function test failed")
        return 1

    # Tests 4 and 5 are non-fatal, but their outcome is recorded so the
    # summary does not silently hide a check that did not pass.
    optional_failures = []

    logger.info("\n[Test 4] Testing BA validator structure...")
    if not test_ba_validator_structure(frames, output.extrinsics):
        optional_failures.append("BA validator structure")

    logger.info("\n[Test 5] Testing data pipeline structure...")
    if not test_data_pipeline_structure(frames):
        optional_failures.append("data pipeline structure")

    if optional_failures:
        logger.warning(
            f"Optional checks did not pass: {', '.join(optional_failures)}"
        )

    logger.info("\n" + banner)
    logger.info("β Smoke test complete!")
    logger.info(banner)

    return 0
|
|
|
|
|
|
|
|
# Script entry point: run the smoke tests and use main()'s return value as
# the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|
|
|