|
|
""" |
|
|
ARKit Data Processor: Extract and process ARKit video and metadata. |
|
|
""" |
|
|
|
|
|
import json |
|
|
import logging |
|
|
from pathlib import Path |
|
|
from typing import Dict, List, Optional, Tuple |
|
|
import cv2 |
|
|
import numpy as np |
|
|
|
|
|
logger = logging.getLogger(__name__)  # module-level logger (standard per-module pattern)
|
|
|
|
|
|
|
|
class ARKitProcessor:
    """Process ARKit video and metadata for BA validation.

    Loads an ARKit capture (a .MOV video plus a JSON metadata file containing
    per-frame camera poses/intrinsics/tracking state) and exposes helpers to
    extract frames, poses, intrinsics, and tracking quality information.
    """

    def __init__(
        self,
        arkit_dir: Optional[Path] = None,
        video_path: Optional[Path] = None,
        metadata_path: Optional[Path] = None,
    ):
        """
        Initialize ARKit processor. Can be initialized from:
        1. Directory structure (arkit_dir with videos/ and json-metadata/ subdirs)
        2. Explicit paths (video_path and metadata_path)

        Args:
            arkit_dir: Directory containing ARKit data (with videos/ and json-metadata/ subdirs)
            video_path: Path to ARKit video file (.MOV) - used if arkit_dir not provided
            metadata_path: Path to ARKit metadata JSON file - used if arkit_dir not provided

        Raises:
            FileNotFoundError: If the video or metadata file cannot be located.
            ValueError: If neither arkit_dir nor both explicit paths are provided.
        """
        if arkit_dir:
            arkit_dir = Path(arkit_dir)

            # Recursive search so nested videos/ subdirectories are found;
            # check both upper- and lower-case extensions. First match wins.
            video_files = list(arkit_dir.rglob("*.MOV")) + list(arkit_dir.rglob("*.mov"))
            if not video_files:
                raise FileNotFoundError(f"No video file found in {arkit_dir}")
            self.video_path = video_files[0]

            metadata_files = list(arkit_dir.rglob("*.json"))
            if not metadata_files:
                raise FileNotFoundError(f"No metadata file found in {arkit_dir}")
            self.metadata_path = metadata_files[0]
        else:
            if video_path is None or metadata_path is None:
                raise ValueError(
                    "Either arkit_dir or both video_path and metadata_path must be provided"
                )
            self.video_path = Path(video_path)
            self.metadata_path = Path(metadata_path)

        if not self.video_path.exists():
            raise FileNotFoundError(f"Video not found: {self.video_path}")
        if not self.metadata_path.exists():
            raise FileNotFoundError(f"Metadata not found: {self.metadata_path}")

        # Explicit encoding for deterministic JSON decoding across platforms.
        with open(self.metadata_path, encoding="utf-8") as f:
            self.metadata = json.load(f)

        # Support both metadata schemas: "frames" (per-frame dicts) and the
        # legacy "arkit_poses" key; fall back to an empty list if neither exists.
        self.frames_data = self.metadata.get("frames") or self.metadata.get("arkit_poses", [])
        logger.info(f"Loaded ARKit metadata: {len(self.frames_data)} frames")
        logger.info(f" Video: {self.video_path.name}")
        logger.info(f" Metadata: {self.metadata_path.name}")

    def extract_frames(
        self,
        output_dir: Optional[Path] = None,
        max_frames: Optional[int] = None,
        frame_interval: int = 1,
        return_images: bool = True,
    ) -> List:
        """
        Extract frames from ARKit video.

        Args:
            output_dir: Directory to save extracted frames
            max_frames: Maximum number of frames to extract
            frame_interval: Extract every Nth frame
            return_images: Whether to return images in memory (list of numpy arrays)

        Returns:
            List of extracted frame paths (if return_images=False) or images (if return_images=True)

        Raises:
            ValueError: If neither output_dir nor return_images is usable, or
                the video cannot be opened.
        """
        if output_dir:
            output_dir = Path(output_dir)
            output_dir.mkdir(parents=True, exist_ok=True)
        elif not return_images:
            # With no output dir and no in-memory return there is nowhere to
            # put the frames.
            raise ValueError("output_dir must be provided if return_images is False")

        cap = cv2.VideoCapture(str(self.video_path))
        if not cap.isOpened():
            raise ValueError(f"Could not open video: {self.video_path}")

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        logger.info(f"Video: {total_frames} frames, {fps:.2f} fps")

        extracted_results = []
        frame_idx = 0
        saved_count = 0

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            if frame_idx % frame_interval == 0:
                if max_frames and saved_count >= max_frames:
                    break

                if output_dir:
                    # File names carry the *video* frame index, not the
                    # sequential extraction count.
                    frame_path = output_dir / f"frame_{frame_idx:06d}.jpg"
                    cv2.imwrite(str(frame_path), frame)
                    if not return_images:
                        extracted_results.append(frame_path)

                if return_images:
                    # OpenCV decodes BGR; callers expect RGB arrays.
                    img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    extracted_results.append(img_rgb)

                saved_count += 1

            frame_idx += 1

        cap.release()

        if return_images:
            logger.info(f"Extracted {len(extracted_results)} frames")
        else:
            logger.info(f"Extracted {len(extracted_results)} frames to {output_dir}")

        return extracted_results

    def get_arkit_poses(
        self, frame_indices: Optional[List[int]] = None
    ) -> Tuple[np.ndarray, np.ndarray]:
        """
        Extract ARKit poses and intrinsics from metadata.

        Args:
            frame_indices: Optional list of frame indices to extract.
                If None, extracts all frames.

        Returns:
            Tuple of (poses, intrinsics)
            - poses: (N, 4, 4) camera-to-world transformation matrices
            - intrinsics: (N, 3, 3) camera intrinsics matrices
        """
        if frame_indices is None:
            frame_indices = list(range(len(self.frames_data)))

        poses = []
        intrinsics = []

        # Query the actual video resolution so intrinsics recorded at a
        # different capture resolution can be rescaled to match the frames.
        cap = cv2.VideoCapture(str(self.video_path))
        video_w = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
        video_h = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
        cap.release()

        for idx in frame_indices:
            if idx >= len(self.frames_data):
                logger.warning(f"Frame index {idx} out of range")
                continue

            frame_data = self.frames_data[idx]

            camera = frame_data.get("camera", {})

            # Pose may live under camera.viewMatrix (newer schema) or the
            # top-level camera_pose key (legacy schema).
            view_matrix_raw = camera.get("viewMatrix") or frame_data.get("camera_pose")
            view_matrix = np.array(view_matrix_raw) if view_matrix_raw is not None else np.array([])

            if view_matrix.shape == (4, 4):
                poses.append(view_matrix)
            else:
                # Keep list lengths aligned with frame_indices by inserting
                # an identity placeholder for invalid/missing poses.
                logger.warning(f"Invalid view matrix for frame {idx}")
                poses.append(np.eye(4))

            # Intrinsics may be a dict of fx/fy/cx/cy or a full 3x3 matrix.
            intrinsics_raw = camera.get("intrinsics") or frame_data.get("intrinsics")

            if isinstance(intrinsics_raw, dict):
                fx = intrinsics_raw.get("fx", 1000)
                fy = intrinsics_raw.get("fy", 1000)
                cx = intrinsics_raw.get("cx", 0)
                cy = intrinsics_raw.get("cy", 0)

                # Resolution the intrinsics were recorded at; default to the
                # video resolution (i.e. no rescale) when absent.
                meta_w = intrinsics_raw.get("width", video_w)
                meta_h = intrinsics_raw.get("height", video_h)

                # Rescale focal lengths/principal point if the metadata
                # resolution differs from the decoded video resolution.
                if meta_w != video_w and meta_w > 0:
                    scale_x = video_w / meta_w
                    fx *= scale_x
                    cx *= scale_x
                if meta_h != video_h and meta_h > 0:
                    scale_y = video_h / meta_h
                    fy *= scale_y
                    cy *= scale_y

                intr_array = np.array([
                    [fx, 0, cx],
                    [0, fy, cy],
                    [0, 0, 1]
                ])
                intrinsics.append(intr_array)
            elif isinstance(intrinsics_raw, (list, np.ndarray)) and np.array(intrinsics_raw).shape == (3, 3):
                intrinsics.append(np.array(intrinsics_raw))
            else:
                # Placeholder intrinsics (fx=fy=1000, cx=cy=0) keep arrays aligned.
                logger.warning(f"Invalid intrinsics for frame {idx}")
                intrinsics.append(np.eye(3) * 1000)

        poses = np.array(poses)
        intrinsics = np.array(intrinsics)

        logger.info(f"Extracted {len(poses)} ARKit poses and intrinsics (scaled to {int(video_w)}x{int(video_h)})")
        return poses, intrinsics

    def convert_arkit_to_w2c(
        self, c2w_poses: np.ndarray, convert_coords: bool = True
    ) -> np.ndarray:
        """
        Convert ARKit camera-to-world poses to world-to-camera (for DA3 compatibility).

        Args:
            c2w_poses: (N, 4, 4) camera-to-world poses (ARKit convention, Y-up)
            convert_coords: If True, convert from ARKit (Y-up) to OpenCV/DA3 (Z-up) convention

        Returns:
            (N, 3, 4) world-to-camera poses (DA3 format, OpenCV convention if convert_coords=True)
        """
        # Local import keeps the module importable when the utils package is
        # unavailable and avoids a potential circular import.
        from ..utils.coordinate_utils import convert_arkit_c2w_to_w2c

        w2c_poses = []
        for c2w in c2w_poses:
            w2c = convert_arkit_c2w_to_w2c(c2w, convert_coords=convert_coords)
            w2c_poses.append(w2c)

        return np.array(w2c_poses)

    def get_tracking_status(self, frame_indices: Optional[List[int]] = None) -> List[Dict]:
        """
        Get tracking status for frames.

        Args:
            frame_indices: Optional list of frame indices

        Returns:
            List of tracking status dicts with keys:
            - trackingStateReason: 'normal', 'initializing', 'relocalizing', etc.
            - worldMappingStatus: 'mapped', 'extending', 'limited', 'notAvailable'
            - featurePointCount: Number of tracked feature points
        """
        if frame_indices is None:
            frame_indices = list(range(len(self.frames_data)))

        statuses = []
        for idx in frame_indices:
            # Silently skip out-of-range indices (mirrors get_arkit_poses).
            if idx >= len(self.frames_data):
                continue

            frame_data = self.frames_data[idx]

            camera = frame_data.get("camera", {})
            # A frame "has a pose" if either pose representation is present.
            has_pose_raw = camera.get("viewMatrix") or frame_data.get("camera_pose")

            # Optimistic defaults: missing fields are treated as good tracking.
            status = {
                "trackingStateReason": camera.get("trackingStateReason", "normal"),
                "trackingState": camera.get("trackingState", "normal"),
                "worldMappingStatus": frame_data.get("worldMappingStatus", "mapped"),
                "featurePointCount": frame_data.get("featurePointCount", 100),
                "hasPose": has_pose_raw is not None,
                "frameIndex": frame_data.get("frameIndex", idx),
                "timestamp": frame_data.get("timestamp", 0),
            }
            statuses.append(status)

        return statuses

    def filter_good_frames(
        self,
        min_feature_points: int = 50,
        exclude_states: Optional[List[str]] = None,
        exclude_tracking_states: Optional[List[str]] = None,
    ) -> List[int]:
        """
        Filter frames with good tracking status.

        Args:
            min_feature_points: Minimum number of feature points
            exclude_states: Tracking state reasons to exclude (default: ['relocalizing'])
            exclude_tracking_states: Tracking states to exclude (default: ['notAvailable'])

        Returns:
            List of frame indices with good tracking
        """
        # Use None sentinels instead of mutable list defaults, which would be
        # shared (and mutable) across calls.
        if exclude_states is None:
            exclude_states = ["relocalizing"]
        if exclude_tracking_states is None:
            exclude_tracking_states = ["notAvailable"]

        good_indices = []
        statuses = self.get_tracking_status()

        for idx, status in enumerate(statuses):
            # Excluded tracking-state reason (a 'normal' reason is never excluded).
            if status["trackingStateReason"] in exclude_states and status["trackingStateReason"] != "normal":
                continue

            # Excluded tracking state (a 'normal' state is never excluded).
            if status.get("trackingState", "") in exclude_tracking_states and status.get("trackingState", "") != "normal":
                continue

            # Too few feature points is only disqualifying when the frame
            # also has no recorded pose.
            if status["featurePointCount"] < min_feature_points and not status.get("hasPose", False):
                continue

            good_indices.append(idx)

        logger.info(f"Found {len(good_indices)}/{len(statuses)} frames with good tracking")
        return good_indices

    def process_for_ba_validation(
        self,
        output_dir: Path,
        max_frames: Optional[int] = None,
        frame_interval: int = 1,
        use_good_tracking_only: bool = True,
    ) -> Dict:
        """
        Process ARKit data for BA validation.

        Args:
            output_dir: Output directory for frames and data
            max_frames: Maximum frames to process
            frame_interval: Extract every Nth frame
            use_good_tracking_only: Only use frames with good tracking

        Returns:
            Dictionary with:
            - image_paths: List of frame paths
            - arkit_poses: ARKit poses (c2w, 4x4)
            - arkit_poses_w2c: ARKit poses (w2c, 3x4) for DA3
            - arkit_intrinsics: ARKit intrinsics (3x3)
            - tracking_status: List of tracking status dicts
        """
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        if use_good_tracking_only:
            good_indices = self.filter_good_frames()
            if len(good_indices) == 0:
                logger.warning("No frames with good tracking found. Using all frames.")
                good_indices = None
        else:
            good_indices = None

        image_dir = output_dir / "images"
        # BUG FIX: request file paths explicitly. The previous call relied on
        # the default return_images=True, so "image_paths" was actually a list
        # of numpy arrays and str(p) below stringified whole images.
        image_paths = self.extract_frames(
            image_dir,
            max_frames=max_frames,
            frame_interval=frame_interval,
            return_images=False,
        )

        if good_indices:
            # NOTE(review): frames are extracted sequentially regardless of
            # tracking quality, so pairing the i-th extracted frame with
            # good_indices[i] assumes the two orderings correspond — confirm
            # against the downstream BA pipeline.
            frame_indices = [
                good_indices[i] for i in range(len(image_paths)) if i < len(good_indices)
            ]
        else:
            frame_indices = list(range(len(image_paths)))

        c2w_poses, intrinsics = self.get_arkit_poses(frame_indices)
        w2c_poses = self.convert_arkit_to_w2c(c2w_poses)

        tracking_status = self.get_tracking_status(frame_indices)

        # Persist arrays next to the extracted frames for later inspection.
        np.save(output_dir / "arkit_poses_c2w.npy", c2w_poses)
        np.save(output_dir / "arkit_poses_w2c.npy", w2c_poses)
        np.save(output_dir / "arkit_intrinsics.npy", intrinsics)

        result = {
            "image_paths": [str(p) for p in image_paths],
            "arkit_poses_c2w": c2w_poses,
            "arkit_poses_w2c": w2c_poses,
            "arkit_intrinsics": intrinsics,
            "tracking_status": tracking_status,
            "frame_indices": frame_indices,
        }

        logger.info(f"Processed ARKit data: {len(image_paths)} frames")
        logger.info(f" - Poses: {c2w_poses.shape}")
        logger.info(f" - Intrinsics: {intrinsics.shape}")

        return result

    def get_lidar_depths(self, frame_indices: Optional[List[int]] = None) -> Optional[np.ndarray]:
        """
        Extract LiDAR depth maps from ARKit metadata (if available).

        Note: LiDAR depth is typically sparse and may not be available in all frames.
        This is a placeholder - actual implementation would need to extract from
        ARKit's depth buffers if available in metadata.

        Args:
            frame_indices: Optional list of frame indices

        Returns:
            (N, H, W) depth maps or None if not available
        """
        # Placeholder: returns None until depth-buffer extraction is implemented.
        logger.warning("LiDAR depth extraction not yet implemented")
        return None
|
|
|