""" Sensor parsing adapters (Phase 1). These helpers normalize raw sensor artifacts into numpy arrays with consistent conventions so downstream teacher/audit/training can be metrologically audited. """ from __future__ import annotations import json from pathlib import Path from typing import Any, Dict, Optional, Tuple import numpy as np def _dtype_waveform_imu_sample(*, sample_size: int = 60) -> np.dtype: """ NumPy dtype for WaveformMobile `IMUSampleBinary` (little-endian). Swift layout (packed; record size = MemoryLayout.size): - timestamp: f64 - quaternion: 4xf32 [x,y,z,w] - rotationRate: 3xf32 [x,y,z] rad/s - userAcceleration: 3xf32 [x,y,z] g - gravity: 3xf32 [x,y,z] (CoreMotion gravity vector) """ # Offsets derived from the Swift struct field ordering (no padding beyond record_size). return np.dtype( { "names": ["t", "q", "r", "a", "g"], "formats": [" np.dtype: """ NumPy dtype for WaveformMobile `FrameIMUData` (little-endian). Swift layout (typical; record size comes from imu_index.json): - frameIndex: u32 (offset 0) - (padding to 8-byte alignment) - frameTimestamp: f64 (offset 8) - interpolated sample: IMUSampleBinary - before1, before0, after0, after1: IMUSampleBinary """ s = _dtype_waveform_imu_sample(sample_size=sample_size) # Offsets assume the compiler aligns the f64 at offset 8, then samples contiguous. # We still gate on `frame_size` read from imu_index.json to avoid mismatches. base = 16 return np.dtype( { "names": [ "frame_index", "frame_timestamp", "interp", "before1", "before0", "after0", "after1", ], "formats": [" Dict[str, Any]: """ Load WaveformMobile `imu_index.json` and return raw JSON. """ p = Path(index_path) obj = json.loads(p.read_text()) if not isinstance(obj, dict): raise ValueError(f"imu_index.json must be an object: {p}") return obj def load_waveform_imu_frames( *, frames_bin_path: Path, imu_index_path: Optional[Path] = None, ) -> Dict[str, np.ndarray]: """ Load WaveformMobile per-frame IMU (`imu_frames.bin`) into arrays. Returns: { "frame_index": (N,) uint32 "t": (N,) float64 # IMU-relative seconds (CoreMotion domain minus firstSampleTimestamp) "q": (N,4) float32 "r": (N,3) float32 # rad/s "a": (N,3) float32 # g (user acceleration) "g": (N,3) float32 # gravity vector } """ frames_bin_path = Path(frames_bin_path) if not frames_bin_path.exists(): raise FileNotFoundError(frames_bin_path) frame_size = None sample_size = 60 if imu_index_path is not None and Path(imu_index_path).exists(): idx = load_waveform_imu_index(Path(imu_index_path)) bf = idx.get("binaryFormat") if isinstance(idx.get("binaryFormat"), dict) else {} try: sample_size = int(bf.get("sampleSize") or sample_size) except Exception: sample_size = 60 try: frame_size = int(bf.get("frameSize") or 0) or None except Exception: frame_size = None if frame_size is None: # Conservative default: assume typical alignment to 320 bytes. frame_size = 320 raw = frames_bin_path.read_bytes() if frame_size <= 0 or len(raw) < frame_size: raise ValueError("imu_frames.bin too small or invalid frame_size") n = len(raw) // int(frame_size) if n <= 0 or (n * int(frame_size)) != len(raw): raise ValueError("imu_frames.bin size is not a multiple of frame record size") dt = _dtype_waveform_imu_frame(frame_size=int(frame_size), sample_size=int(sample_size)) arr = np.frombuffer(raw, dtype=dt, count=n) interp = arr["interp"] return { "frame_index": arr["frame_index"].astype(np.uint32, copy=False), "t": arr["frame_timestamp"].astype(np.float64, copy=False), "q": interp["q"].astype(np.float32, copy=False), "r": interp["r"].astype(np.float32, copy=False), "a": interp["a"].astype(np.float32, copy=False), "g": interp["g"].astype(np.float32, copy=False), } def load_waveform_barometer_index(index_path: Path) -> Dict[str, Any]: p = Path(index_path) obj = json.loads(p.read_text()) if not isinstance(obj, dict): raise ValueError(f"barometer index.json must be an object: {p}") return obj def load_waveform_barometer_stream( *, stream_bin_path: Path, index_path: Optional[Path] = None, ) -> Dict[str, np.ndarray]: """ Load WaveformMobile barometer stream (`barometer_stream.bin`) into arrays. Record layout (little-endian, packed): u32 sampleIndex f64 unixTimestampSeconds f64 relativeTimestampSeconds (seconds since capture start) f64 pressureKPa f64 relativeAltitudeMeters """ stream_bin_path = Path(stream_bin_path) if not stream_bin_path.exists(): raise FileNotFoundError(stream_bin_path) rec_size = 36 if index_path is not None and Path(index_path).exists(): idx = load_waveform_barometer_index(Path(index_path)) stream = idx.get("stream") if isinstance(idx.get("stream"), dict) else {} try: rec_size = int(stream.get("record_size_bytes") or rec_size) except Exception: rec_size = 36 dt = np.dtype( { "names": ["sample_index", "unix_ts", "t_rel", "pressure_kpa", "rel_alt_m"], "formats": [" np.ndarray: """ Load a 16-bit depth PNG and convert to meters. Common convention: uint16 stores depth in millimeters -> depth_scale_m=0.001. """ try: from PIL import Image # type: ignore except Exception as e: # pragma: no cover raise ImportError( "Loading 16-bit PNG depth requires Pillow. Install with: pip install pillow" ) from e im = Image.open(Path(path)) arr = np.array(im) if arr.dtype != np.uint16: arr = arr.astype(np.uint16, copy=False) depth_m = arr.astype(np.float32) * float(depth_scale_m) # Treat 0 as invalid depth_m[depth_m <= 0] = np.nan return depth_m def align_depth_nearest( depth: np.ndarray, *, out_shape_hw: Tuple[int, int], ) -> np.ndarray: """ Nearest-neighbor resize for depth maps (no smoothing). """ d = np.asarray(depth) H, W = int(out_shape_hw[0]), int(out_shape_hw[1]) if d.ndim != 2: raise ValueError(f"depth must be 2D (H,W), got {d.shape}") in_h, in_w = d.shape if (in_h, in_w) == (H, W): return d.astype(np.float32, copy=False) ys = (np.linspace(0, in_h - 1, num=H)).round().astype(int) xs = (np.linspace(0, in_w - 1, num=W)).round().astype(int) out = d[ys[:, None], xs[None, :]].astype(np.float32, copy=False) return out # ----------------------------------------------------------------------------- # v2 stream-centric adapters (Waveform v2 capture container) # ----------------------------------------------------------------------------- def _strip_wfmfoot1_footer(raw: bytes) -> bytes: """ If the buffer ends with a WFMFOOT1 v2 footer, strip it. This keeps binary parsing robust across "footer present" vs "no footer" streams. """ if len(raw) >= 36 and raw[-36:-28] == b"WFMFOOT1": return raw[:-36] return raw def load_v2_timeline_frames(*, data_bin_path: Path) -> Dict[str, np.ndarray]: """ Load `timeline.frames` fixed-record stream. Record layout (little-endian, 16 bytes): u32 frameIndex u32 flags u64 t_ns """ p = Path(data_bin_path) if not p.exists(): raise FileNotFoundError(p) raw = _strip_wfmfoot1_footer(p.read_bytes()) rec = 16 if len(raw) < rec: return { "frame_index": np.zeros((0,), dtype=np.uint32), "flags": np.zeros((0,), dtype=np.uint32), "t_ns": np.zeros((0,), dtype=np.uint64), } n = len(raw) // rec raw = raw[: n * rec] dt = np.dtype( { "names": ["frame_index", "flags", "t_ns"], "formats": [" Dict[str, np.ndarray]: """ Load `pose.vio` fixed-record stream. Record layout (little-endian, 40 bytes): u64 t_ns f32 tx ty tz f32 qx qy qz qw u16 quality u16 provider_code Returns: { "t_ns": (N,) uint64, "t": (N,3) float32, "q": (N,4) float32 # xyzw, "quality": (N,) uint16, "provider_code": (N,) uint16, "T_wc": (N,4,4) float64 # world/odom-from-camera (pose) } """ p = Path(data_bin_path) if not p.exists(): raise FileNotFoundError(p) raw = _strip_wfmfoot1_footer(p.read_bytes()) rec = 40 if len(raw) < rec: return { "t_ns": np.zeros((0,), dtype=np.uint64), "t": np.zeros((0, 3), dtype=np.float32), "q": np.zeros((0, 4), dtype=np.float32), "quality": np.zeros((0,), dtype=np.uint16), "provider_code": np.zeros((0,), dtype=np.uint16), "T_wc": np.zeros((0, 4, 4), dtype=np.float64), } n = len(raw) // rec raw = raw[: n * rec] dt = np.dtype( { "names": ["t_ns", "t", "q", "quality", "provider_code"], "formats": [" OpenCV) happens at callsites (teacher pipeline). x, y, z, w = q[:, 0], q[:, 1], q[:, 2], q[:, 3] # Normalize defensively norm = np.sqrt(x * x + y * y + z * z + w * w).astype(np.float32) norm = np.where(norm > 0, norm, 1.0).astype(np.float32) x, y, z, w = x / norm, y / norm, z / norm, w / norm xx, yy, zz = x * x, y * y, z * z xy, xz, yz = x * y, x * z, y * z wx, wy, wz = w * x, w * y, w * z R = np.zeros((n, 3, 3), dtype=np.float64) R[:, 0, 0] = 1.0 - 2.0 * (yy + zz) R[:, 0, 1] = 2.0 * (xy - wz) R[:, 0, 2] = 2.0 * (xz + wy) R[:, 1, 0] = 2.0 * (xy + wz) R[:, 1, 1] = 1.0 - 2.0 * (xx + zz) R[:, 1, 2] = 2.0 * (yz - wx) R[:, 2, 0] = 2.0 * (xz - wy) R[:, 2, 1] = 2.0 * (yz + wx) R[:, 2, 2] = 1.0 - 2.0 * (xx + yy) T = np.zeros((n, 4, 4), dtype=np.float64) T[:, 3, 3] = 1.0 T[:, :3, :3] = R T[:, :3, 3] = t.astype(np.float64) return { "t_ns": arr["t_ns"].astype(np.uint64, copy=False), "t": t, "q": q, "quality": arr["quality"].astype(np.uint16, copy=False), "provider_code": arr["provider_code"].astype(np.uint16, copy=False), "T_wc": T, } def _load_waveform_depth_index(index_path: Path) -> Dict[str, Any]: """ Load Waveform Mobile depth stream index.json. Expected schema (example_data): { "format": { "depth": {"width": 256, "height": 192, "type": "float32", "units": "meters", "bytesPerFrame": 196608}, "depth_smoothed": {...}, "confidence": {...} }, "frames": [ {"frameIndex": 0, "timestamp": 0.0, "depthOffset": 0, "smoothedDepthOffset": 0, ...}, ... ] } """ p = Path(index_path) obj = json.loads(p.read_text()) if not isinstance(obj, dict): raise ValueError(f"Waveform depth index must be a JSON object: {p}") if "format" not in obj or "frames" not in obj: raise ValueError(f"Waveform depth index missing required keys: {p}") if not isinstance(obj.get("frames"), list): raise ValueError(f"Waveform depth index frames must be a list: {p}") return obj def try_load_waveform_lidar_depth_frame( *, bundle_root: Path, device_id: str, frame_index: int, out_shape_hw: Optional[Tuple[int, int]] = None, prefer_smoothed: bool = True, ) -> Optional[np.ndarray]: """ Best-effort loader for Waveform Mobile LiDAR depth from a packed stream. Looks for: /devices//depth/index.json /devices//depth/{depth_smoothed.bin, depth.bin} Returns: depth_m: float32 array (H,W) in meters, with non-positive set to NaN. If out_shape_hw is set, resizes with nearest-neighbor to match. """ root = Path(bundle_root) did = str(device_id) depth_dir = root / "devices" / did / "depth" return try_load_waveform_lidar_depth_frame_from_dir( depth_dir=depth_dir, frame_index=int(frame_index), out_shape_hw=out_shape_hw, prefer_smoothed=prefer_smoothed, ) def try_load_waveform_lidar_depth_frame_from_dir( *, depth_dir: Path, frame_index: int, out_shape_hw: Optional[Tuple[int, int]] = None, prefer_smoothed: bool = True, ) -> Optional[np.ndarray]: """ Best-effort loader for Waveform Mobile LiDAR depth from a packed stream, when you already know the `depth/` directory path. Looks for: /index.json /{depth_smoothed.bin, depth.bin} """ fi = int(frame_index) depth_dir = Path(depth_dir) index_path = depth_dir / "index.json" if not index_path.exists(): return None # Prefer smoothed if requested and present; otherwise fall back to raw depth. bin_path = depth_dir / ("depth_smoothed.bin" if prefer_smoothed else "depth.bin") if not bin_path.exists(): bin_path = depth_dir / "depth.bin" if not bin_path.exists(): return None try: idx = _load_waveform_depth_index(index_path) fmt = idx.get("format", {}) if isinstance(idx.get("format"), dict) else {} depth_fmt = fmt.get( "depth_smoothed" if (prefer_smoothed and "depth_smoothed" in fmt) else "depth" ) if not isinstance(depth_fmt, dict): depth_fmt = fmt.get("depth", {}) if isinstance(fmt.get("depth"), dict) else {} w = int(depth_fmt.get("width", 0) or 0) h = int(depth_fmt.get("height", 0) or 0) bpf = int(depth_fmt.get("bytesPerFrame", 0) or 0) dtype = str(depth_fmt.get("type", "float32")).lower().strip() units = str(depth_fmt.get("units", "meters")).lower().strip() if w <= 0 or h <= 0: return None if dtype not in {"float32", "f32"}: # We only support the packed float32 format for now. return None if units not in {"meters", "meter", "m"}: # Unexpected units; refuse to silently mis-scale. return None expected_bpf = int(w * h * 4) if bpf <= 0: bpf = expected_bpf if bpf != expected_bpf: # Index claims a different layout than float32(H*W). return None # Find record for requested ARFrame/video frame index. rec = None frames = idx.get("frames", []) for r in frames: if not isinstance(r, dict): continue if int(r.get("frameIndex", -1)) == fi: rec = r break if rec is None: return None # Determine byte offset to read. key = ( "smoothedDepthOffset" if (prefer_smoothed and "smoothedDepthOffset" in rec) else "depthOffset" ) off = rec.get(key) if off is None: off = rec.get("depthOffset") if off is None: return None offset = int(off) if offset < 0: return None with Path(bin_path).open("rb") as f: f.seek(offset) raw = f.read(bpf) if len(raw) != bpf: return None arr = np.frombuffer(raw, dtype=np.float32, count=w * h) if arr.size != w * h: return None depth_m = arr.reshape((h, w)).astype(np.float32, copy=False) depth_m[~np.isfinite(depth_m)] = np.nan depth_m[depth_m <= 0] = np.nan if out_shape_hw is not None: depth_m = align_depth_nearest(depth_m, out_shape_hw=out_shape_hw) return depth_m except Exception: return None def load_arkit_poses_json(path: Path) -> np.ndarray: """ Load ARKit poses from JSON into (N,4,4) camera-to-world matrices. Accepted formats: - {"poses": [[[...4x4...]], ...]} - [ [[...4x4...]], ... ] """ obj = json.loads(Path(path).read_text()) if isinstance(obj, dict) and "poses" in obj: obj = obj["poses"] if not isinstance(obj, list): raise ValueError("Expected a list of 4x4 poses or {'poses': [...]} JSON") mats = [] for p in obj: a = np.asarray(p, dtype=np.float64) if a.shape != (4, 4): raise ValueError(f"Pose must be 4x4, got {a.shape}") mats.append(a) return np.stack(mats, axis=0).astype(np.float64) def load_arkit_poses_with_frame_index(path: Path) -> tuple[np.ndarray, Optional[np.ndarray]]: """ Load ARKit poses and (optional) frame_index mapping. Accepted formats: - {"poses": [4x4,...], "frame_index": [int,...]} (WaveformMobile normalization writer) - {"poses": [4x4,...]} (no index) - [4x4,...] (no index) Returns: (poses_c2w: (N,4,4) float64, frame_index: (N,) int64 or None) """ obj = json.loads(Path(path).read_text()) frame_index = None poses_obj = obj if isinstance(obj, dict): poses_obj = obj.get("poses", obj.get("poses_c2w", obj)) fi = obj.get("frame_index") or obj.get("frameIndex") if isinstance(fi, list) and fi: try: frame_index = np.asarray(fi, dtype=np.int64).reshape(-1) except Exception: frame_index = None # Parse poses list (reuse validation from load_arkit_poses_json) if isinstance(poses_obj, dict) and "poses" in poses_obj: poses_obj = poses_obj["poses"] if not isinstance(poses_obj, list): raise ValueError("Expected poses list or {'poses': [...]} JSON") mats = [] for p in poses_obj: a = np.asarray(p, dtype=np.float64) if a.shape != (4, 4): raise ValueError(f"Pose must be 4x4, got {a.shape}") mats.append(a) poses = np.stack(mats, axis=0).astype(np.float64) if frame_index is not None and int(frame_index.size) != int(poses.shape[0]): # Reject mismatched mapping (better to ignore than silently misalign). frame_index = None return poses, frame_index def normalize_arkit_to_w2c( c2w_poses: np.ndarray, *, convert_coords: bool = True, ) -> np.ndarray: """ Convert ARKit camera-to-world to world-to-camera in DA3-friendly 3x4 format. """ from ..utils.coordinate_utils import convert_arkit_c2w_to_w2c c2w = np.asarray(c2w_poses, dtype=np.float64) if c2w.ndim != 3 or c2w.shape[1:] != (4, 4): raise ValueError(f"Expected (N,4,4), got {c2w.shape}") outs = [] for T in c2w: outs.append(convert_arkit_c2w_to_w2c(T, convert_coords=convert_coords)) return np.asarray(outs, dtype=np.float64) def load_optional_json(path: Optional[Path]) -> Optional[Dict[str, Any]]: if path is None: return None p = Path(path) if not p.exists(): return None obj = json.loads(p.read_text()) if not isinstance(obj, dict): return None return obj