| | """ |
| | Sensor parsing adapters (Phase 1). |
| | |
| | These helpers normalize raw sensor artifacts into numpy arrays with consistent |
| | conventions so downstream teacher/audit/training can be metrologically audited. |
| | """ |
| |
|
| | from __future__ import annotations |
| |
|
| | import json |
| | from pathlib import Path |
| | from typing import Any, Dict, Optional, Tuple |
| | import numpy as np |
| |
|
| |
|
| | def _dtype_waveform_imu_sample(*, sample_size: int = 60) -> np.dtype: |
| | """ |
| | NumPy dtype for WaveformMobile `IMUSampleBinary` (little-endian). |
| | |
| | Swift layout (packed; record size = MemoryLayout<IMUSampleBinary>.size): |
| | - timestamp: f64 |
| | - quaternion: 4xf32 [x,y,z,w] |
| | - rotationRate: 3xf32 [x,y,z] rad/s |
| | - userAcceleration: 3xf32 [x,y,z] g |
| | - gravity: 3xf32 [x,y,z] (CoreMotion gravity vector) |
| | """ |
| | |
| | return np.dtype( |
| | { |
| | "names": ["t", "q", "r", "a", "g"], |
| | "formats": ["<f8", ("<f4", (4,)), ("<f4", (3,)), ("<f4", (3,)), ("<f4", (3,))], |
| | "offsets": [0, 8, 24, 36, 48], |
| | "itemsize": int(sample_size), |
| | } |
| | ) |
| |
|
| |
|
| | def _dtype_waveform_imu_frame(*, frame_size: int, sample_size: int = 60) -> np.dtype: |
| | """ |
| | NumPy dtype for WaveformMobile `FrameIMUData` (little-endian). |
| | |
| | Swift layout (typical; record size comes from imu_index.json): |
| | - frameIndex: u32 (offset 0) |
| | - (padding to 8-byte alignment) |
| | - frameTimestamp: f64 (offset 8) |
| | - interpolated sample: IMUSampleBinary |
| | - before1, before0, after0, after1: IMUSampleBinary |
| | """ |
| | s = _dtype_waveform_imu_sample(sample_size=sample_size) |
| | |
| | |
| | base = 16 |
| | return np.dtype( |
| | { |
| | "names": [ |
| | "frame_index", |
| | "frame_timestamp", |
| | "interp", |
| | "before1", |
| | "before0", |
| | "after0", |
| | "after1", |
| | ], |
| | "formats": ["<u4", "<f8", s, s, s, s, s], |
| | "offsets": [ |
| | 0, |
| | 8, |
| | base, |
| | base + 1 * sample_size, |
| | base + 2 * sample_size, |
| | base + 3 * sample_size, |
| | base + 4 * sample_size, |
| | ], |
| | "itemsize": int(frame_size), |
| | } |
| | ) |
| |
|
| |
|
| | def load_waveform_imu_index(index_path: Path) -> Dict[str, Any]: |
| | """ |
| | Load WaveformMobile `imu_index.json` and return raw JSON. |
| | """ |
| | p = Path(index_path) |
| | obj = json.loads(p.read_text()) |
| | if not isinstance(obj, dict): |
| | raise ValueError(f"imu_index.json must be an object: {p}") |
| | return obj |
| |
|
| |
|
| | def load_waveform_imu_frames( |
| | *, |
| | frames_bin_path: Path, |
| | imu_index_path: Optional[Path] = None, |
| | ) -> Dict[str, np.ndarray]: |
| | """ |
| | Load WaveformMobile per-frame IMU (`imu_frames.bin`) into arrays. |
| | |
| | Returns: |
| | { |
| | "frame_index": (N,) uint32 |
| | "t": (N,) float64 # IMU-relative seconds (CoreMotion domain minus firstSampleTimestamp) |
| | "q": (N,4) float32 |
| | "r": (N,3) float32 # rad/s |
| | "a": (N,3) float32 # g (user acceleration) |
| | "g": (N,3) float32 # gravity vector |
| | } |
| | """ |
| | frames_bin_path = Path(frames_bin_path) |
| | if not frames_bin_path.exists(): |
| | raise FileNotFoundError(frames_bin_path) |
| |
|
| | frame_size = None |
| | sample_size = 60 |
| | if imu_index_path is not None and Path(imu_index_path).exists(): |
| | idx = load_waveform_imu_index(Path(imu_index_path)) |
| | bf = idx.get("binaryFormat") if isinstance(idx.get("binaryFormat"), dict) else {} |
| | try: |
| | sample_size = int(bf.get("sampleSize") or sample_size) |
| | except Exception: |
| | sample_size = 60 |
| | try: |
| | frame_size = int(bf.get("frameSize") or 0) or None |
| | except Exception: |
| | frame_size = None |
| | if frame_size is None: |
| | |
| | frame_size = 320 |
| |
|
| | raw = frames_bin_path.read_bytes() |
| | if frame_size <= 0 or len(raw) < frame_size: |
| | raise ValueError("imu_frames.bin too small or invalid frame_size") |
| | n = len(raw) // int(frame_size) |
| | if n <= 0 or (n * int(frame_size)) != len(raw): |
| | raise ValueError("imu_frames.bin size is not a multiple of frame record size") |
| |
|
| | dt = _dtype_waveform_imu_frame(frame_size=int(frame_size), sample_size=int(sample_size)) |
| | arr = np.frombuffer(raw, dtype=dt, count=n) |
| |
|
| | interp = arr["interp"] |
| | return { |
| | "frame_index": arr["frame_index"].astype(np.uint32, copy=False), |
| | "t": arr["frame_timestamp"].astype(np.float64, copy=False), |
| | "q": interp["q"].astype(np.float32, copy=False), |
| | "r": interp["r"].astype(np.float32, copy=False), |
| | "a": interp["a"].astype(np.float32, copy=False), |
| | "g": interp["g"].astype(np.float32, copy=False), |
| | } |
| |
|
| |
|
| | def load_waveform_barometer_index(index_path: Path) -> Dict[str, Any]: |
| | p = Path(index_path) |
| | obj = json.loads(p.read_text()) |
| | if not isinstance(obj, dict): |
| | raise ValueError(f"barometer index.json must be an object: {p}") |
| | return obj |
| |
|
| |
|
| | def load_waveform_barometer_stream( |
| | *, |
| | stream_bin_path: Path, |
| | index_path: Optional[Path] = None, |
| | ) -> Dict[str, np.ndarray]: |
| | """ |
| | Load WaveformMobile barometer stream (`barometer_stream.bin`) into arrays. |
| | |
| | Record layout (little-endian, packed): |
| | u32 sampleIndex |
| | f64 unixTimestampSeconds |
| | f64 relativeTimestampSeconds (seconds since capture start) |
| | f64 pressureKPa |
| | f64 relativeAltitudeMeters |
| | """ |
| | stream_bin_path = Path(stream_bin_path) |
| | if not stream_bin_path.exists(): |
| | raise FileNotFoundError(stream_bin_path) |
| |
|
| | rec_size = 36 |
| | if index_path is not None and Path(index_path).exists(): |
| | idx = load_waveform_barometer_index(Path(index_path)) |
| | stream = idx.get("stream") if isinstance(idx.get("stream"), dict) else {} |
| | try: |
| | rec_size = int(stream.get("record_size_bytes") or rec_size) |
| | except Exception: |
| | rec_size = 36 |
| |
|
| | dt = np.dtype( |
| | { |
| | "names": ["sample_index", "unix_ts", "t_rel", "pressure_kpa", "rel_alt_m"], |
| | "formats": ["<u4", "<f8", "<f8", "<f8", "<f8"], |
| | "offsets": [0, 4, 12, 20, 28], |
| | "itemsize": int(rec_size), |
| | } |
| | ) |
| |
|
| | raw = stream_bin_path.read_bytes() |
| | if len(raw) < rec_size: |
| | return { |
| | "sample_index": np.zeros((0,), dtype=np.uint32), |
| | "unix_ts": np.zeros((0,), dtype=np.float64), |
| | "t_rel": np.zeros((0,), dtype=np.float64), |
| | "pressure_kpa": np.zeros((0,), dtype=np.float64), |
| | "rel_alt_m": np.zeros((0,), dtype=np.float64), |
| | } |
| | n = len(raw) // int(rec_size) |
| | if (n * int(rec_size)) != len(raw): |
| | |
| | raw = raw[: n * int(rec_size)] |
| | arr = np.frombuffer(raw, dtype=dt, count=n) |
| | return { |
| | "sample_index": arr["sample_index"].astype(np.uint32, copy=False), |
| | "unix_ts": arr["unix_ts"].astype(np.float64, copy=False), |
| | "t_rel": arr["t_rel"].astype(np.float64, copy=False), |
| | "pressure_kpa": arr["pressure_kpa"].astype(np.float64, copy=False), |
| | "rel_alt_m": arr["rel_alt_m"].astype(np.float64, copy=False), |
| | } |
| |
|
| |
|
| | def load_lidar_depth_16bit_png( |
| | path: Path, |
| | *, |
| | depth_scale_m: float = 0.001, |
| | ) -> np.ndarray: |
| | """ |
| | Load a 16-bit depth PNG and convert to meters. |
| | |
| | Common convention: uint16 stores depth in millimeters -> depth_scale_m=0.001. |
| | """ |
| |
|
| | try: |
| | from PIL import Image |
| | except Exception as e: |
| | raise ImportError( |
| | "Loading 16-bit PNG depth requires Pillow. Install with: pip install pillow" |
| | ) from e |
| |
|
| | im = Image.open(Path(path)) |
| | arr = np.array(im) |
| | if arr.dtype != np.uint16: |
| | arr = arr.astype(np.uint16, copy=False) |
| | depth_m = arr.astype(np.float32) * float(depth_scale_m) |
| | |
| | depth_m[depth_m <= 0] = np.nan |
| | return depth_m |
| |
|
| |
|
| | def align_depth_nearest( |
| | depth: np.ndarray, |
| | *, |
| | out_shape_hw: Tuple[int, int], |
| | ) -> np.ndarray: |
| | """ |
| | Nearest-neighbor resize for depth maps (no smoothing). |
| | """ |
| |
|
| | d = np.asarray(depth) |
| | H, W = int(out_shape_hw[0]), int(out_shape_hw[1]) |
| | if d.ndim != 2: |
| | raise ValueError(f"depth must be 2D (H,W), got {d.shape}") |
| | in_h, in_w = d.shape |
| | if (in_h, in_w) == (H, W): |
| | return d.astype(np.float32, copy=False) |
| |
|
| | ys = (np.linspace(0, in_h - 1, num=H)).round().astype(int) |
| | xs = (np.linspace(0, in_w - 1, num=W)).round().astype(int) |
| | out = d[ys[:, None], xs[None, :]].astype(np.float32, copy=False) |
| | return out |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
| | def _strip_wfmfoot1_footer(raw: bytes) -> bytes: |
| | """ |
| | If the buffer ends with a WFMFOOT1 v2 footer, strip it. |
| | This keeps binary parsing robust across "footer present" vs "no footer" streams. |
| | """ |
| | if len(raw) >= 36 and raw[-36:-28] == b"WFMFOOT1": |
| | return raw[:-36] |
| | return raw |
| |
|
| |
|
| | def load_v2_timeline_frames(*, data_bin_path: Path) -> Dict[str, np.ndarray]: |
| | """ |
| | Load `timeline.frames` fixed-record stream. |
| | |
| | Record layout (little-endian, 16 bytes): |
| | u32 frameIndex |
| | u32 flags |
| | u64 t_ns |
| | """ |
| | p = Path(data_bin_path) |
| | if not p.exists(): |
| | raise FileNotFoundError(p) |
| | raw = _strip_wfmfoot1_footer(p.read_bytes()) |
| | rec = 16 |
| | if len(raw) < rec: |
| | return { |
| | "frame_index": np.zeros((0,), dtype=np.uint32), |
| | "flags": np.zeros((0,), dtype=np.uint32), |
| | "t_ns": np.zeros((0,), dtype=np.uint64), |
| | } |
| | n = len(raw) // rec |
| | raw = raw[: n * rec] |
| | dt = np.dtype( |
| | { |
| | "names": ["frame_index", "flags", "t_ns"], |
| | "formats": ["<u4", "<u4", "<u8"], |
| | "offsets": [0, 4, 8], |
| | "itemsize": rec, |
| | } |
| | ) |
| | arr = np.frombuffer(raw, dtype=dt, count=n) |
| | return { |
| | "frame_index": arr["frame_index"].astype(np.uint32, copy=False), |
| | "flags": arr["flags"].astype(np.uint32, copy=False), |
| | "t_ns": arr["t_ns"].astype(np.uint64, copy=False), |
| | } |
| |
|
| |
|
| | def load_v2_pose_vio(*, data_bin_path: Path) -> Dict[str, np.ndarray]: |
| | """ |
| | Load `pose.vio` fixed-record stream. |
| | |
| | Record layout (little-endian, 40 bytes): |
| | u64 t_ns |
| | f32 tx ty tz |
| | f32 qx qy qz qw |
| | u16 quality |
| | u16 provider_code |
| | |
| | Returns: |
| | { |
| | "t_ns": (N,) uint64, |
| | "t": (N,3) float32, |
| | "q": (N,4) float32 # xyzw, |
| | "quality": (N,) uint16, |
| | "provider_code": (N,) uint16, |
| | "T_wc": (N,4,4) float64 # world/odom-from-camera (pose) |
| | } |
| | """ |
| | p = Path(data_bin_path) |
| | if not p.exists(): |
| | raise FileNotFoundError(p) |
| | raw = _strip_wfmfoot1_footer(p.read_bytes()) |
| | rec = 40 |
| | if len(raw) < rec: |
| | return { |
| | "t_ns": np.zeros((0,), dtype=np.uint64), |
| | "t": np.zeros((0, 3), dtype=np.float32), |
| | "q": np.zeros((0, 4), dtype=np.float32), |
| | "quality": np.zeros((0,), dtype=np.uint16), |
| | "provider_code": np.zeros((0,), dtype=np.uint16), |
| | "T_wc": np.zeros((0, 4, 4), dtype=np.float64), |
| | } |
| | n = len(raw) // rec |
| | raw = raw[: n * rec] |
| | dt = np.dtype( |
| | { |
| | "names": ["t_ns", "t", "q", "quality", "provider_code"], |
| | "formats": ["<u8", ("<f4", (3,)), ("<f4", (4,)), "<u2", "<u2"], |
| | "offsets": [0, 8, 20, 36, 38], |
| | "itemsize": rec, |
| | } |
| | ) |
| | arr = np.frombuffer(raw, dtype=dt, count=n) |
| |
|
| | t = arr["t"].astype(np.float32, copy=False) |
| | q = arr["q"].astype(np.float32, copy=False) |
| |
|
| | |
| | |
| | |
| | x, y, z, w = q[:, 0], q[:, 1], q[:, 2], q[:, 3] |
| | |
| | norm = np.sqrt(x * x + y * y + z * z + w * w).astype(np.float32) |
| | norm = np.where(norm > 0, norm, 1.0).astype(np.float32) |
| | x, y, z, w = x / norm, y / norm, z / norm, w / norm |
| |
|
| | xx, yy, zz = x * x, y * y, z * z |
| | xy, xz, yz = x * y, x * z, y * z |
| | wx, wy, wz = w * x, w * y, w * z |
| |
|
| | R = np.zeros((n, 3, 3), dtype=np.float64) |
| | R[:, 0, 0] = 1.0 - 2.0 * (yy + zz) |
| | R[:, 0, 1] = 2.0 * (xy - wz) |
| | R[:, 0, 2] = 2.0 * (xz + wy) |
| | R[:, 1, 0] = 2.0 * (xy + wz) |
| | R[:, 1, 1] = 1.0 - 2.0 * (xx + zz) |
| | R[:, 1, 2] = 2.0 * (yz - wx) |
| | R[:, 2, 0] = 2.0 * (xz - wy) |
| | R[:, 2, 1] = 2.0 * (yz + wx) |
| | R[:, 2, 2] = 1.0 - 2.0 * (xx + yy) |
| |
|
| | T = np.zeros((n, 4, 4), dtype=np.float64) |
| | T[:, 3, 3] = 1.0 |
| | T[:, :3, :3] = R |
| | T[:, :3, 3] = t.astype(np.float64) |
| |
|
| | return { |
| | "t_ns": arr["t_ns"].astype(np.uint64, copy=False), |
| | "t": t, |
| | "q": q, |
| | "quality": arr["quality"].astype(np.uint16, copy=False), |
| | "provider_code": arr["provider_code"].astype(np.uint16, copy=False), |
| | "T_wc": T, |
| | } |
| |
|
| |
|
| | def _load_waveform_depth_index(index_path: Path) -> Dict[str, Any]: |
| | """ |
| | Load Waveform Mobile depth stream index.json. |
| | |
| | Expected schema (example_data): |
| | { |
| | "format": { |
| | "depth": {"width": 256, "height": 192, "type": "float32", "units": "meters", |
| | "bytesPerFrame": 196608}, |
| | "depth_smoothed": {...}, |
| | "confidence": {...} |
| | }, |
| | "frames": [ |
| | {"frameIndex": 0, "timestamp": 0.0, "depthOffset": 0, "smoothedDepthOffset": 0, ...}, |
| | ... |
| | ] |
| | } |
| | """ |
| | p = Path(index_path) |
| | obj = json.loads(p.read_text()) |
| | if not isinstance(obj, dict): |
| | raise ValueError(f"Waveform depth index must be a JSON object: {p}") |
| | if "format" not in obj or "frames" not in obj: |
| | raise ValueError(f"Waveform depth index missing required keys: {p}") |
| | if not isinstance(obj.get("frames"), list): |
| | raise ValueError(f"Waveform depth index frames must be a list: {p}") |
| | return obj |
| |
|
| |
|
| | def try_load_waveform_lidar_depth_frame( |
| | *, |
| | bundle_root: Path, |
| | device_id: str, |
| | frame_index: int, |
| | out_shape_hw: Optional[Tuple[int, int]] = None, |
| | prefer_smoothed: bool = True, |
| | ) -> Optional[np.ndarray]: |
| | """ |
| | Best-effort loader for Waveform Mobile LiDAR depth from a packed stream. |
| | |
| | Looks for: |
| | <bundle_root>/devices/<device_id>/depth/index.json |
| | <bundle_root>/devices/<device_id>/depth/{depth_smoothed.bin, depth.bin} |
| | |
| | Returns: |
| | depth_m: float32 array (H,W) in meters, with non-positive set to NaN. |
| | If out_shape_hw is set, resizes with nearest-neighbor to match. |
| | """ |
| | root = Path(bundle_root) |
| | did = str(device_id) |
| | depth_dir = root / "devices" / did / "depth" |
| | return try_load_waveform_lidar_depth_frame_from_dir( |
| | depth_dir=depth_dir, |
| | frame_index=int(frame_index), |
| | out_shape_hw=out_shape_hw, |
| | prefer_smoothed=prefer_smoothed, |
| | ) |
| |
|
| |
|
| | def try_load_waveform_lidar_depth_frame_from_dir( |
| | *, |
| | depth_dir: Path, |
| | frame_index: int, |
| | out_shape_hw: Optional[Tuple[int, int]] = None, |
| | prefer_smoothed: bool = True, |
| | ) -> Optional[np.ndarray]: |
| | """ |
| | Best-effort loader for Waveform Mobile LiDAR depth from a packed stream, |
| | when you already know the `depth/` directory path. |
| | |
| | Looks for: |
| | <depth_dir>/index.json |
| | <depth_dir>/{depth_smoothed.bin, depth.bin} |
| | """ |
| | fi = int(frame_index) |
| | depth_dir = Path(depth_dir) |
| | index_path = depth_dir / "index.json" |
| | if not index_path.exists(): |
| | return None |
| |
|
| | |
| | bin_path = depth_dir / ("depth_smoothed.bin" if prefer_smoothed else "depth.bin") |
| | if not bin_path.exists(): |
| | bin_path = depth_dir / "depth.bin" |
| | if not bin_path.exists(): |
| | return None |
| |
|
| | try: |
| | idx = _load_waveform_depth_index(index_path) |
| | fmt = idx.get("format", {}) if isinstance(idx.get("format"), dict) else {} |
| | depth_fmt = fmt.get( |
| | "depth_smoothed" if (prefer_smoothed and "depth_smoothed" in fmt) else "depth" |
| | ) |
| | if not isinstance(depth_fmt, dict): |
| | depth_fmt = fmt.get("depth", {}) if isinstance(fmt.get("depth"), dict) else {} |
| | w = int(depth_fmt.get("width", 0) or 0) |
| | h = int(depth_fmt.get("height", 0) or 0) |
| | bpf = int(depth_fmt.get("bytesPerFrame", 0) or 0) |
| | dtype = str(depth_fmt.get("type", "float32")).lower().strip() |
| | units = str(depth_fmt.get("units", "meters")).lower().strip() |
| | if w <= 0 or h <= 0: |
| | return None |
| | if dtype not in {"float32", "f32"}: |
| | |
| | return None |
| | if units not in {"meters", "meter", "m"}: |
| | |
| | return None |
| |
|
| | expected_bpf = int(w * h * 4) |
| | if bpf <= 0: |
| | bpf = expected_bpf |
| | if bpf != expected_bpf: |
| | |
| | return None |
| |
|
| | |
| | rec = None |
| | frames = idx.get("frames", []) |
| | for r in frames: |
| | if not isinstance(r, dict): |
| | continue |
| | if int(r.get("frameIndex", -1)) == fi: |
| | rec = r |
| | break |
| | if rec is None: |
| | return None |
| |
|
| | |
| | key = ( |
| | "smoothedDepthOffset" |
| | if (prefer_smoothed and "smoothedDepthOffset" in rec) |
| | else "depthOffset" |
| | ) |
| | off = rec.get(key) |
| | if off is None: |
| | off = rec.get("depthOffset") |
| | if off is None: |
| | return None |
| | offset = int(off) |
| | if offset < 0: |
| | return None |
| |
|
| | with Path(bin_path).open("rb") as f: |
| | f.seek(offset) |
| | raw = f.read(bpf) |
| | if len(raw) != bpf: |
| | return None |
| | arr = np.frombuffer(raw, dtype=np.float32, count=w * h) |
| | if arr.size != w * h: |
| | return None |
| | depth_m = arr.reshape((h, w)).astype(np.float32, copy=False) |
| | depth_m[~np.isfinite(depth_m)] = np.nan |
| | depth_m[depth_m <= 0] = np.nan |
| |
|
| | if out_shape_hw is not None: |
| | depth_m = align_depth_nearest(depth_m, out_shape_hw=out_shape_hw) |
| | return depth_m |
| | except Exception: |
| | return None |
| |
|
| |
|
| | def load_arkit_poses_json(path: Path) -> np.ndarray: |
| | """ |
| | Load ARKit poses from JSON into (N,4,4) camera-to-world matrices. |
| | |
| | Accepted formats: |
| | - {"poses": [[[...4x4...]], ...]} |
| | - [ [[...4x4...]], ... ] |
| | """ |
| |
|
| | obj = json.loads(Path(path).read_text()) |
| | if isinstance(obj, dict) and "poses" in obj: |
| | obj = obj["poses"] |
| | if not isinstance(obj, list): |
| | raise ValueError("Expected a list of 4x4 poses or {'poses': [...]} JSON") |
| |
|
| | mats = [] |
| | for p in obj: |
| | a = np.asarray(p, dtype=np.float64) |
| | if a.shape != (4, 4): |
| | raise ValueError(f"Pose must be 4x4, got {a.shape}") |
| | mats.append(a) |
| | return np.stack(mats, axis=0).astype(np.float64) |
| |
|
| |
|
| | def load_arkit_poses_with_frame_index(path: Path) -> tuple[np.ndarray, Optional[np.ndarray]]: |
| | """ |
| | Load ARKit poses and (optional) frame_index mapping. |
| | |
| | Accepted formats: |
| | - {"poses": [4x4,...], "frame_index": [int,...]} (WaveformMobile normalization writer) |
| | - {"poses": [4x4,...]} (no index) |
| | - [4x4,...] (no index) |
| | |
| | Returns: (poses_c2w: (N,4,4) float64, frame_index: (N,) int64 or None) |
| | """ |
| | obj = json.loads(Path(path).read_text()) |
| | frame_index = None |
| | poses_obj = obj |
| | if isinstance(obj, dict): |
| | poses_obj = obj.get("poses", obj.get("poses_c2w", obj)) |
| | fi = obj.get("frame_index") or obj.get("frameIndex") |
| | if isinstance(fi, list) and fi: |
| | try: |
| | frame_index = np.asarray(fi, dtype=np.int64).reshape(-1) |
| | except Exception: |
| | frame_index = None |
| | |
| | if isinstance(poses_obj, dict) and "poses" in poses_obj: |
| | poses_obj = poses_obj["poses"] |
| | if not isinstance(poses_obj, list): |
| | raise ValueError("Expected poses list or {'poses': [...]} JSON") |
| | mats = [] |
| | for p in poses_obj: |
| | a = np.asarray(p, dtype=np.float64) |
| | if a.shape != (4, 4): |
| | raise ValueError(f"Pose must be 4x4, got {a.shape}") |
| | mats.append(a) |
| | poses = np.stack(mats, axis=0).astype(np.float64) |
| | if frame_index is not None and int(frame_index.size) != int(poses.shape[0]): |
| | |
| | frame_index = None |
| | return poses, frame_index |
| |
|
| |
|
| | def normalize_arkit_to_w2c( |
| | c2w_poses: np.ndarray, |
| | *, |
| | convert_coords: bool = True, |
| | ) -> np.ndarray: |
| | """ |
| | Convert ARKit camera-to-world to world-to-camera in DA3-friendly 3x4 format. |
| | """ |
| |
|
| | from ..utils.coordinate_utils import convert_arkit_c2w_to_w2c |
| |
|
| | c2w = np.asarray(c2w_poses, dtype=np.float64) |
| | if c2w.ndim != 3 or c2w.shape[1:] != (4, 4): |
| | raise ValueError(f"Expected (N,4,4), got {c2w.shape}") |
| | outs = [] |
| | for T in c2w: |
| | outs.append(convert_arkit_c2w_to_w2c(T, convert_coords=convert_coords)) |
| | return np.asarray(outs, dtype=np.float64) |
| |
|
| |
|
| | def load_optional_json(path: Optional[Path]) -> Optional[Dict[str, Any]]: |
| | if path is None: |
| | return None |
| | p = Path(path) |
| | if not p.exists(): |
| | return None |
| | obj = json.loads(p.read_text()) |
| | if not isinstance(obj, dict): |
| | return None |
| | return obj |
| |
|