# 3d_model/ylff/services/arkit_processor.py
"""
ARKit Data Processor: Extract and process ARKit video and metadata.
"""
import json
import logging
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import cv2
import numpy as np
logger = logging.getLogger(__name__)
class ARKitProcessor:
"""Process ARKit video and metadata for BA validation."""
def __init__(
self,
arkit_dir: Optional[Path] = None,
video_path: Optional[Path] = None,
metadata_path: Optional[Path] = None,
):
"""
Initialize ARKit processor. Can be initialized from:
1. Directory structure (arkit_dir with videos/ and json-metadata/ subdirs)
2. Explicit paths (video_path and metadata_path)
Args:
arkit_dir: Directory containing ARKit data (with videos/ and json-metadata/ subdirs)
video_path: Path to ARKit video file (.MOV) - used if arkit_dir not provided
metadata_path: Path to ARKit metadata JSON file - used if arkit_dir not provided
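        Example (illustrative paths; the capture layout is an assumption):
            # Directory-based: uses the first .MOV and .json found under the dir
            processor = ARKitProcessor(arkit_dir=Path("captures/session_01"))

            # Explicit paths:
            processor = ARKitProcessor(
                video_path=Path("captures/video.MOV"),
                metadata_path=Path("captures/metadata.json"),
            )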
"""
if arkit_dir:
# Directory-based initialization
arkit_dir = Path(arkit_dir)
# Find video file (recursive search for flexibility)
video_files = list(arkit_dir.rglob("*.MOV")) + list(arkit_dir.rglob("*.mov"))
if not video_files:
raise FileNotFoundError(f"No video file found in {arkit_dir}")
self.video_path = video_files[0]
# Find metadata file (recursive search for flexibility)
metadata_files = list(arkit_dir.rglob("*.json"))
if not metadata_files:
raise FileNotFoundError(f"No metadata file found in {arkit_dir}")
self.metadata_path = metadata_files[0]
else:
# Explicit path initialization
if video_path is None or metadata_path is None:
raise ValueError(
"Either arkit_dir or both video_path and metadata_path must be provided"
)
self.video_path = Path(video_path)
self.metadata_path = Path(metadata_path)
if not self.video_path.exists():
raise FileNotFoundError(f"Video not found: {self.video_path}")
if not self.metadata_path.exists():
raise FileNotFoundError(f"Metadata not found: {self.metadata_path}")
# Load metadata
with open(self.metadata_path) as f:
self.metadata = json.load(f)
# Support both 'frames' (standard) and 'arkit_poses' (new user format)
self.frames_data = self.metadata.get("frames") or self.metadata.get("arkit_poses", [])
logger.info(f"Loaded ARKit metadata: {len(self.frames_data)} frames")
logger.info(f" Video: {self.video_path.name}")
logger.info(f" Metadata: {self.metadata_path.name}")
def extract_frames(
self,
output_dir: Optional[Path] = None,
max_frames: Optional[int] = None,
frame_interval: int = 1,
return_images: bool = True,
) -> List:
"""
Extract frames from ARKit video.
Args:
output_dir: Directory to save extracted frames
max_frames: Maximum number of frames to extract
frame_interval: Extract every Nth frame
return_images: Whether to return images in memory (list of numpy arrays)
Returns:
List of extracted frame paths (if return_images=False) or images (if return_images=True)
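        Example (a minimal sketch; the output path is illustrative):
            # Every 5th frame, saved to disk and returned as paths
            paths = processor.extract_frames(
                output_dir=Path("out/frames"),
                frame_interval=5,
                return_images=False,
            )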
"""
if output_dir:
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
elif not return_images:
raise ValueError("output_dir must be provided if return_images is False")
cap = cv2.VideoCapture(str(self.video_path))
if not cap.isOpened():
raise ValueError(f"Could not open video: {self.video_path}")
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = cap.get(cv2.CAP_PROP_FPS)
logger.info(f"Video: {total_frames} frames, {fps:.2f} fps")
extracted_results = []
frame_idx = 0
saved_count = 0
while True:
ret, frame = cap.read()
if not ret:
break
if frame_idx % frame_interval == 0:
if max_frames and saved_count >= max_frames:
break
if output_dir:
frame_path = output_dir / f"frame_{frame_idx:06d}.jpg"
cv2.imwrite(str(frame_path), frame)
if not return_images:
extracted_results.append(frame_path)
if return_images:
img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
extracted_results.append(img_rgb)
saved_count += 1
frame_idx += 1
cap.release()
if return_images:
logger.info(f"Extracted {len(extracted_results)} frames")
else:
logger.info(f"Extracted {len(extracted_results)} frames to {output_dir}")
return extracted_results
def get_arkit_poses(
self, frame_indices: Optional[List[int]] = None
) -> Tuple[np.ndarray, np.ndarray]:
"""
Extract ARKit poses and intrinsics from metadata.
Args:
frame_indices: Optional list of frame indices to extract.
If None, extracts all frames.
Returns:
Tuple of (poses, intrinsics)
- poses: (N, 4, 4) camera-to-world transformation matrices
- intrinsics: (N, 3, 3) camera intrinsics matrices
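        Example (shape check only; assumes metadata was loaded at init):
            poses, intrinsics = processor.get_arkit_poses()
            assert poses.shape[1:] == (4, 4)
            assert intrinsics.shape[1:] == (3, 3)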
"""
if frame_indices is None:
frame_indices = list(range(len(self.frames_data)))
poses = []
intrinsics = []
# Get video resolution for intrinsic scaling
        cap = cv2.VideoCapture(str(self.video_path))
        if not cap.isOpened():
            raise ValueError(f"Could not open video: {self.video_path}")
        video_w = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
        video_h = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
cap.release()
for idx in frame_indices:
if idx >= len(self.frames_data):
logger.warning(f"Frame index {idx} out of range")
continue
frame_data = self.frames_data[idx]
# Support both 'camera' (standard) and top-level keys (user format)
camera = frame_data.get("camera", {})
# Extract view matrix (camera-to-world)
# Standard: camera.viewMatrix
# User format: camera_pose
view_matrix_raw = camera.get("viewMatrix") or frame_data.get("camera_pose")
view_matrix = np.array(view_matrix_raw) if view_matrix_raw is not None else np.array([])
if view_matrix.shape == (4, 4):
poses.append(view_matrix)
else:
logger.warning(f"Invalid view matrix for frame {idx}")
poses.append(np.eye(4))
# Extract intrinsics
# Standard: camera.intrinsics (3x3 array)
# User format: intrinsics (object with fx, fy, cx, cy)
intrinsics_raw = camera.get("intrinsics") or frame_data.get("intrinsics")
if isinstance(intrinsics_raw, dict):
# User format object
fx = intrinsics_raw.get("fx", 1000)
fy = intrinsics_raw.get("fy", 1000)
cx = intrinsics_raw.get("cx", 0)
cy = intrinsics_raw.get("cy", 0)
# Auto-scale intrinsics to video resolution
meta_w = intrinsics_raw.get("width", video_w)
meta_h = intrinsics_raw.get("height", video_h)
if meta_w != video_w and meta_w > 0:
scale_x = video_w / meta_w
fx *= scale_x
cx *= scale_x
if meta_h != video_h and meta_h > 0:
scale_y = video_h / meta_h
fy *= scale_y
cy *= scale_y
intr_array = np.array([
[fx, 0, cx],
[0, fy, cy],
[0, 0, 1]
])
intrinsics.append(intr_array)
elif isinstance(intrinsics_raw, (list, np.ndarray)) and np.array(intrinsics_raw).shape == (3, 3):
# Standard array format
intrinsics.append(np.array(intrinsics_raw))
            else:
                logger.warning(f"Invalid intrinsics for frame {idx}")
                # Fallback: plausible focal length with a unit homogeneous entry
                # (np.eye(3) * 1000 would wrongly set K[2, 2] = 1000).
                intrinsics.append(np.diag([1000.0, 1000.0, 1.0]))
poses = np.array(poses)
intrinsics = np.array(intrinsics)
logger.info(f"Extracted {len(poses)} ARKit poses and intrinsics (scaled to {int(video_w)}x{int(video_h)})")
return poses, intrinsics
def convert_arkit_to_w2c(
self, c2w_poses: np.ndarray, convert_coords: bool = True
) -> np.ndarray:
"""
Convert ARKit camera-to-world poses to world-to-camera (for DA3 compatibility).
Args:
c2w_poses: (N, 4, 4) camera-to-world poses (ARKit convention, Y-up)
            convert_coords: If True, convert from the ARKit camera convention
                (Y-up, -Z forward) to the OpenCV/DA3 convention (Y-down, Z-forward)
Returns:
(N, 3, 4) world-to-camera poses (DA3 format, OpenCV convention if convert_coords=True)
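        Example (typical chaining with get_arkit_poses):
            c2w, _ = processor.get_arkit_poses()
            w2c = processor.convert_arkit_to_w2c(c2w)  # (N, 3, 4)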
"""
from ..utils.coordinate_utils import convert_arkit_c2w_to_w2c
w2c_poses = []
for c2w in c2w_poses:
w2c = convert_arkit_c2w_to_w2c(c2w, convert_coords=convert_coords)
w2c_poses.append(w2c)
return np.array(w2c_poses)
def get_tracking_status(self, frame_indices: Optional[List[int]] = None) -> List[Dict]:
"""
Get tracking status for frames.
Args:
frame_indices: Optional list of frame indices
Returns:
List of tracking status dicts with keys:
- trackingStateReason: 'normal', 'initializing', 'relocalizing', etc.
- worldMappingStatus: 'mapped', 'extending', 'limited', 'notAvailable'
- featurePointCount: Number of tracked feature points
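        Example (illustrative; counts frames that are still relocalizing):
            statuses = processor.get_tracking_status()
            n_reloc = sum(s["trackingStateReason"] == "relocalizing" for s in statuses)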
"""
if frame_indices is None:
frame_indices = list(range(len(self.frames_data)))
statuses = []
for idx in frame_indices:
if idx >= len(self.frames_data):
continue
frame_data = self.frames_data[idx]
# Support both 'camera' (standard) and top-level keys (user format)
camera = frame_data.get("camera", {})
has_pose_raw = camera.get("viewMatrix") or frame_data.get("camera_pose")
status = {
"trackingStateReason": camera.get("trackingStateReason", "normal"), # Default to normal
"trackingState": camera.get("trackingState", "normal"),
"worldMappingStatus": frame_data.get("worldMappingStatus", "mapped"),
"featurePointCount": frame_data.get("featurePointCount", 100), # Assume enough points if pose exists
"hasPose": has_pose_raw is not None,
"frameIndex": frame_data.get("frameIndex", idx),
"timestamp": frame_data.get("timestamp", 0),
}
statuses.append(status)
return statuses
    def filter_good_frames(
        self,
        min_feature_points: int = 50,  # Lowered default
        exclude_states: Optional[List[str]] = None,  # Defaults to ["relocalizing"]
        exclude_tracking_states: Optional[List[str]] = None,  # Defaults to ["notAvailable"]
    ) -> List[int]:
"""
Filter frames with good tracking status.
Args:
min_feature_points: Minimum number of feature points
exclude_states: Tracking state reasons to exclude
exclude_tracking_states: Tracking states to exclude (e.g., 'notAvailable')
Returns:
List of frame indices with good tracking
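        Example (stricter thresholds than the defaults; values are illustrative):
            good = processor.filter_good_frames(
                min_feature_points=100,
                exclude_states=["relocalizing", "initializing"],
            )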
"""
        # Apply the documented defaults here to avoid mutable default arguments.
        if exclude_states is None:
            exclude_states = ["relocalizing"]  # Only exclude relocalizing
        if exclude_tracking_states is None:
            exclude_tracking_states = ["notAvailable"]
        good_indices = []
statuses = self.get_tracking_status()
for idx, status in enumerate(statuses):
            # Check tracking state reason and tracking state. The explicit
            # != "normal" guards ensure 'normal' frames are never excluded,
            # even if a caller lists 'normal' by mistake.
            if status["trackingStateReason"] in exclude_states and status["trackingStateReason"] != "normal":
                continue
            if status.get("trackingState", "") in exclude_tracking_states and status.get("trackingState", "") != "normal":
                continue
# Check feature points
# If we have a pose but no feature count (user format), we assume it's good
if status["featurePointCount"] < min_feature_points and not status.get("hasPose", False):
continue
good_indices.append(idx)
logger.info(f"Found {len(good_indices)}/{len(statuses)} frames with good tracking")
return good_indices
def process_for_ba_validation(
self,
output_dir: Path,
max_frames: Optional[int] = None,
frame_interval: int = 1,
use_good_tracking_only: bool = True,
) -> Dict:
"""
Process ARKit data for BA validation.
Args:
output_dir: Output directory for frames and data
max_frames: Maximum frames to process
frame_interval: Extract every Nth frame
use_good_tracking_only: Only use frames with good tracking
Returns:
Dictionary with:
- image_paths: List of frame paths
- arkit_poses: ARKit poses (c2w, 4x4)
- arkit_poses_w2c: ARKit poses (w2c, 3x4) for DA3
- arkit_intrinsics: ARKit intrinsics (3x3)
- tracking_status: List of tracking status dicts
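        Example (a minimal end-to-end sketch; paths and limits are illustrative):
            result = processor.process_for_ba_validation(
                output_dir=Path("out/ba"),
                max_frames=50,
                frame_interval=3,
            )
            print(result["arkit_poses_w2c"].shape)  # (N, 3, 4)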
"""
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
# Filter frames if needed
if use_good_tracking_only:
good_indices = self.filter_good_frames()
if len(good_indices) == 0:
logger.warning("No frames with good tracking found. Using all frames.")
good_indices = None
else:
good_indices = None
# Extract frames
image_dir = output_dir / "images"
        image_paths = self.extract_frames(
            image_dir,
            max_frames=max_frames,
            frame_interval=frame_interval,
            return_images=False,  # We need paths on disk, not in-memory arrays
        )
        # Map extracted images to metadata frame indices. extract_frames()
        # walks the video sequentially, so the i-th extracted image comes from
        # video (and metadata) frame i * frame_interval.
        frame_indices = [i * frame_interval for i in range(len(image_paths))]
        if good_indices is not None:
            # Keep only extracted frames that passed the tracking filter,
            # dropping their image paths too so that images, poses, and
            # intrinsics stay aligned.
            good_set = set(good_indices)
            kept = [i for i, fi in enumerate(frame_indices) if fi in good_set]
            image_paths = [image_paths[i] for i in kept]
            frame_indices = [frame_indices[i] for i in kept]
# Get ARKit poses and intrinsics
c2w_poses, intrinsics = self.get_arkit_poses(frame_indices)
w2c_poses = self.convert_arkit_to_w2c(c2w_poses)
# Get tracking status
tracking_status = self.get_tracking_status(frame_indices)
# Save ARKit data
np.save(output_dir / "arkit_poses_c2w.npy", c2w_poses)
np.save(output_dir / "arkit_poses_w2c.npy", w2c_poses)
np.save(output_dir / "arkit_intrinsics.npy", intrinsics)
result = {
"image_paths": [str(p) for p in image_paths],
"arkit_poses_c2w": c2w_poses,
"arkit_poses_w2c": w2c_poses,
"arkit_intrinsics": intrinsics,
"tracking_status": tracking_status,
"frame_indices": frame_indices,
}
logger.info(f"Processed ARKit data: {len(image_paths)} frames")
logger.info(f" - Poses: {c2w_poses.shape}")
logger.info(f" - Intrinsics: {intrinsics.shape}")
return result
def get_lidar_depths(self, frame_indices: Optional[List[int]] = None) -> Optional[np.ndarray]:
"""
Extract LiDAR depth maps from ARKit metadata (if available).
Note: LiDAR depth is typically sparse and may not be available in all frames.
        This is a placeholder: an actual implementation would need to extract
        depth from ARKit's depth buffers, if present in the metadata.
Args:
frame_indices: Optional list of frame indices
Returns:
(N, H, W) depth maps or None if not available
"""
# TODO: Implement actual LiDAR depth extraction from ARKit metadata
# ARKit LiDAR depth is typically 256x192 and may be in depth buffers
# For now, return None to indicate not available
logger.warning("LiDAR depth extraction not yet implemented")
return None
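

if __name__ == "__main__":
    # Minimal usage sketch for manual testing, not part of the pipeline.
    # The capture directory below is illustrative; point it at a real capture.
    logging.basicConfig(level=logging.INFO)
    demo_dir = Path("captures/session_01")  # hypothetical capture directory
    if demo_dir.exists():
        processor = ARKitProcessor(arkit_dir=demo_dir)
        result = processor.process_for_ba_validation(
            output_dir=Path("out/ba_validation"),
            max_frames=50,
            frame_interval=3,
        )
        print(f"Frames processed: {len(result['image_paths'])}")
        print(f"w2c poses: {result['arkit_poses_w2c'].shape}")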