""" ARKit Data Processor: Extract and process ARKit video and metadata. """ import json import logging from pathlib import Path from typing import Dict, List, Optional, Tuple import cv2 import numpy as np logger = logging.getLogger(__name__) class ARKitProcessor: """Process ARKit video and metadata for BA validation.""" def __init__( self, arkit_dir: Optional[Path] = None, video_path: Optional[Path] = None, metadata_path: Optional[Path] = None, ): """ Initialize ARKit processor. Can be initialized from: 1. Directory structure (arkit_dir with videos/ and json-metadata/ subdirs) 2. Explicit paths (video_path and metadata_path) Args: arkit_dir: Directory containing ARKit data (with videos/ and json-metadata/ subdirs) video_path: Path to ARKit video file (.MOV) - used if arkit_dir not provided metadata_path: Path to ARKit metadata JSON file - used if arkit_dir not provided """ if arkit_dir: # Directory-based initialization arkit_dir = Path(arkit_dir) # Find video file (recursive search for flexibility) video_files = list(arkit_dir.rglob("*.MOV")) + list(arkit_dir.rglob("*.mov")) if not video_files: raise FileNotFoundError(f"No video file found in {arkit_dir}") self.video_path = video_files[0] # Find metadata file (recursive search for flexibility) metadata_files = list(arkit_dir.rglob("*.json")) if not metadata_files: raise FileNotFoundError(f"No metadata file found in {arkit_dir}") self.metadata_path = metadata_files[0] else: # Explicit path initialization if video_path is None or metadata_path is None: raise ValueError( "Either arkit_dir or both video_path and metadata_path must be provided" ) self.video_path = Path(video_path) self.metadata_path = Path(metadata_path) if not self.video_path.exists(): raise FileNotFoundError(f"Video not found: {self.video_path}") if not self.metadata_path.exists(): raise FileNotFoundError(f"Metadata not found: {self.metadata_path}") # Load metadata with open(self.metadata_path) as f: self.metadata = json.load(f) # Support both 'frames' (standard) and 'arkit_poses' (new user format) self.frames_data = self.metadata.get("frames") or self.metadata.get("arkit_poses", []) logger.info(f"Loaded ARKit metadata: {len(self.frames_data)} frames") logger.info(f" Video: {self.video_path.name}") logger.info(f" Metadata: {self.metadata_path.name}") def extract_frames( self, output_dir: Optional[Path] = None, max_frames: Optional[int] = None, frame_interval: int = 1, return_images: bool = True, ) -> List: """ Extract frames from ARKit video. Args: output_dir: Directory to save extracted frames max_frames: Maximum number of frames to extract frame_interval: Extract every Nth frame return_images: Whether to return images in memory (list of numpy arrays) Returns: List of extracted frame paths (if return_images=False) or images (if return_images=True) """ if output_dir: output_dir = Path(output_dir) output_dir.mkdir(parents=True, exist_ok=True) elif not return_images: raise ValueError("output_dir must be provided if return_images is False") cap = cv2.VideoCapture(str(self.video_path)) if not cap.isOpened(): raise ValueError(f"Could not open video: {self.video_path}") total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) fps = cap.get(cv2.CAP_PROP_FPS) logger.info(f"Video: {total_frames} frames, {fps:.2f} fps") extracted_results = [] frame_idx = 0 saved_count = 0 while True: ret, frame = cap.read() if not ret: break if frame_idx % frame_interval == 0: if max_frames and saved_count >= max_frames: break if output_dir: frame_path = output_dir / f"frame_{frame_idx:06d}.jpg" cv2.imwrite(str(frame_path), frame) if not return_images: extracted_results.append(frame_path) if return_images: img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) extracted_results.append(img_rgb) saved_count += 1 frame_idx += 1 cap.release() if return_images: logger.info(f"Extracted {len(extracted_results)} frames") else: logger.info(f"Extracted {len(extracted_results)} frames to {output_dir}") return extracted_results def get_arkit_poses( self, frame_indices: Optional[List[int]] = None ) -> Tuple[np.ndarray, np.ndarray]: """ Extract ARKit poses and intrinsics from metadata. Args: frame_indices: Optional list of frame indices to extract. If None, extracts all frames. Returns: Tuple of (poses, intrinsics) - poses: (N, 4, 4) camera-to-world transformation matrices - intrinsics: (N, 3, 3) camera intrinsics matrices """ if frame_indices is None: frame_indices = list(range(len(self.frames_data))) poses = [] intrinsics = [] # Get video resolution for intrinsic scaling cap = cv2.VideoCapture(str(self.video_path)) video_w = cap.get(cv2.CAP_PROP_FRAME_WIDTH) video_h = cap.get(cv2.CAP_PROP_FRAME_HEIGHT) cap.release() for idx in frame_indices: if idx >= len(self.frames_data): logger.warning(f"Frame index {idx} out of range") continue frame_data = self.frames_data[idx] # Support both 'camera' (standard) and top-level keys (user format) camera = frame_data.get("camera", {}) # Extract view matrix (camera-to-world) # Standard: camera.viewMatrix # User format: camera_pose view_matrix_raw = camera.get("viewMatrix") or frame_data.get("camera_pose") view_matrix = np.array(view_matrix_raw) if view_matrix_raw is not None else np.array([]) if view_matrix.shape == (4, 4): poses.append(view_matrix) else: logger.warning(f"Invalid view matrix for frame {idx}") poses.append(np.eye(4)) # Extract intrinsics # Standard: camera.intrinsics (3x3 array) # User format: intrinsics (object with fx, fy, cx, cy) intrinsics_raw = camera.get("intrinsics") or frame_data.get("intrinsics") if isinstance(intrinsics_raw, dict): # User format object fx = intrinsics_raw.get("fx", 1000) fy = intrinsics_raw.get("fy", 1000) cx = intrinsics_raw.get("cx", 0) cy = intrinsics_raw.get("cy", 0) # Auto-scale intrinsics to video resolution meta_w = intrinsics_raw.get("width", video_w) meta_h = intrinsics_raw.get("height", video_h) if meta_w != video_w and meta_w > 0: scale_x = video_w / meta_w fx *= scale_x cx *= scale_x if meta_h != video_h and meta_h > 0: scale_y = video_h / meta_h fy *= scale_y cy *= scale_y intr_array = np.array([ [fx, 0, cx], [0, fy, cy], [0, 0, 1] ]) intrinsics.append(intr_array) elif isinstance(intrinsics_raw, (list, np.ndarray)) and np.array(intrinsics_raw).shape == (3, 3): # Standard array format intrinsics.append(np.array(intrinsics_raw)) else: logger.warning(f"Invalid intrinsics for frame {idx}") intrinsics.append(np.eye(3) * 1000) poses = np.array(poses) intrinsics = np.array(intrinsics) logger.info(f"Extracted {len(poses)} ARKit poses and intrinsics (scaled to {int(video_w)}x{int(video_h)})") return poses, intrinsics def convert_arkit_to_w2c( self, c2w_poses: np.ndarray, convert_coords: bool = True ) -> np.ndarray: """ Convert ARKit camera-to-world poses to world-to-camera (for DA3 compatibility). Args: c2w_poses: (N, 4, 4) camera-to-world poses (ARKit convention, Y-up) convert_coords: If True, convert from ARKit (Y-up) to OpenCV/DA3 (Z-up) convention Returns: (N, 3, 4) world-to-camera poses (DA3 format, OpenCV convention if convert_coords=True) """ from ..utils.coordinate_utils import convert_arkit_c2w_to_w2c w2c_poses = [] for c2w in c2w_poses: w2c = convert_arkit_c2w_to_w2c(c2w, convert_coords=convert_coords) w2c_poses.append(w2c) return np.array(w2c_poses) def get_tracking_status(self, frame_indices: Optional[List[int]] = None) -> List[Dict]: """ Get tracking status for frames. Args: frame_indices: Optional list of frame indices Returns: List of tracking status dicts with keys: - trackingStateReason: 'normal', 'initializing', 'relocalizing', etc. - worldMappingStatus: 'mapped', 'extending', 'limited', 'notAvailable' - featurePointCount: Number of tracked feature points """ if frame_indices is None: frame_indices = list(range(len(self.frames_data))) statuses = [] for idx in frame_indices: if idx >= len(self.frames_data): continue frame_data = self.frames_data[idx] # Support both 'camera' (standard) and top-level keys (user format) camera = frame_data.get("camera", {}) has_pose_raw = camera.get("viewMatrix") or frame_data.get("camera_pose") status = { "trackingStateReason": camera.get("trackingStateReason", "normal"), # Default to normal "trackingState": camera.get("trackingState", "normal"), "worldMappingStatus": frame_data.get("worldMappingStatus", "mapped"), "featurePointCount": frame_data.get("featurePointCount", 100), # Assume enough points if pose exists "hasPose": has_pose_raw is not None, "frameIndex": frame_data.get("frameIndex", idx), "timestamp": frame_data.get("timestamp", 0), } statuses.append(status) return statuses def filter_good_frames( self, min_feature_points: int = 50, # Lowered default exclude_states: List[str] = ["relocalizing"], # Only exclude relocalizing exclude_tracking_states: List[str] = ["notAvailable"], ) -> List[int]: """ Filter frames with good tracking status. Args: min_feature_points: Minimum number of feature points exclude_states: Tracking state reasons to exclude exclude_tracking_states: Tracking states to exclude (e.g., 'notAvailable') Returns: List of frame indices with good tracking """ good_indices = [] statuses = self.get_tracking_status() for idx, status in enumerate(statuses): # Check tracking state reason if status["trackingStateReason"] in exclude_states and status["trackingStateReason"] != "normal": continue # Check tracking state if status.get("trackingState", "") in exclude_tracking_states and status.get("trackingState", "") != "normal": continue # Check feature points # If we have a pose but no feature count (user format), we assume it's good if status["featurePointCount"] < min_feature_points and not status.get("hasPose", False): continue good_indices.append(idx) logger.info(f"Found {len(good_indices)}/{len(statuses)} frames with good tracking") return good_indices def process_for_ba_validation( self, output_dir: Path, max_frames: Optional[int] = None, frame_interval: int = 1, use_good_tracking_only: bool = True, ) -> Dict: """ Process ARKit data for BA validation. Args: output_dir: Output directory for frames and data max_frames: Maximum frames to process frame_interval: Extract every Nth frame use_good_tracking_only: Only use frames with good tracking Returns: Dictionary with: - image_paths: List of frame paths - arkit_poses: ARKit poses (c2w, 4x4) - arkit_poses_w2c: ARKit poses (w2c, 3x4) for DA3 - arkit_intrinsics: ARKit intrinsics (3x3) - tracking_status: List of tracking status dicts """ output_dir = Path(output_dir) output_dir.mkdir(parents=True, exist_ok=True) # Filter frames if needed if use_good_tracking_only: good_indices = self.filter_good_frames() if len(good_indices) == 0: logger.warning("No frames with good tracking found. Using all frames.") good_indices = None else: good_indices = None # Extract frames image_dir = output_dir / "images" image_paths = self.extract_frames( image_dir, max_frames=max_frames, frame_interval=frame_interval, ) # Map image paths to frame indices # Assuming frames are extracted in order if good_indices: # Filter to only good indices frame_indices = [ good_indices[i] for i in range(len(image_paths)) if i < len(good_indices) ] else: frame_indices = list(range(len(image_paths))) # Get ARKit poses and intrinsics c2w_poses, intrinsics = self.get_arkit_poses(frame_indices) w2c_poses = self.convert_arkit_to_w2c(c2w_poses) # Get tracking status tracking_status = self.get_tracking_status(frame_indices) # Save ARKit data np.save(output_dir / "arkit_poses_c2w.npy", c2w_poses) np.save(output_dir / "arkit_poses_w2c.npy", w2c_poses) np.save(output_dir / "arkit_intrinsics.npy", intrinsics) result = { "image_paths": [str(p) for p in image_paths], "arkit_poses_c2w": c2w_poses, "arkit_poses_w2c": w2c_poses, "arkit_intrinsics": intrinsics, "tracking_status": tracking_status, "frame_indices": frame_indices, } logger.info(f"Processed ARKit data: {len(image_paths)} frames") logger.info(f" - Poses: {c2w_poses.shape}") logger.info(f" - Intrinsics: {intrinsics.shape}") return result def get_lidar_depths(self, frame_indices: Optional[List[int]] = None) -> Optional[np.ndarray]: """ Extract LiDAR depth maps from ARKit metadata (if available). Note: LiDAR depth is typically sparse and may not be available in all frames. This is a placeholder - actual implementation would need to extract from ARKit's depth buffers if available in metadata. Args: frame_indices: Optional list of frame indices Returns: (N, H, W) depth maps or None if not available """ # TODO: Implement actual LiDAR depth extraction from ARKit metadata # ARKit LiDAR depth is typically 256x192 and may be in depth buffers # For now, return None to indicate not available logger.warning("LiDAR depth extraction not yet implemented") return None