# 3d_model/ylff/services/sensor_adapters.py
# Author: Azan — "Clean deployment build (Squashed)", commit 7a87926
"""
Sensor parsing adapters (Phase 1).
These helpers normalize raw sensor artifacts into numpy arrays with consistent
conventions so downstream teacher/audit/training can be metrologically audited.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
import numpy as np
def _dtype_waveform_imu_sample(*, sample_size: int = 60) -> np.dtype:
"""
NumPy dtype for WaveformMobile `IMUSampleBinary` (little-endian).
Swift layout (packed; record size = MemoryLayout<IMUSampleBinary>.size):
- timestamp: f64
- quaternion: 4xf32 [x,y,z,w]
- rotationRate: 3xf32 [x,y,z] rad/s
- userAcceleration: 3xf32 [x,y,z] g
- gravity: 3xf32 [x,y,z] (CoreMotion gravity vector)
"""
# Offsets derived from the Swift struct field ordering (no padding beyond record_size).
return np.dtype(
{
"names": ["t", "q", "r", "a", "g"],
"formats": ["<f8", ("<f4", (4,)), ("<f4", (3,)), ("<f4", (3,)), ("<f4", (3,))],
"offsets": [0, 8, 24, 36, 48],
"itemsize": int(sample_size),
}
)
def _dtype_waveform_imu_frame(*, frame_size: int, sample_size: int = 60) -> np.dtype:
    """
    Build the NumPy dtype matching WaveformMobile's `FrameIMUData`
    record (little-endian).

    Offsets assume the Swift compiler aligns the f64 timestamp at
    offset 8 (4 bytes of padding after the u32 frame index), with the
    five `IMUSampleBinary` samples packed back to back afterwards:
      frame_index     u32 @ 0
      frame_timestamp f64 @ 8
      interp, before1, before0, after0, after1  IMUSampleBinary @ 16+

    `frame_size` (read from imu_index.json by callers) sets the record
    stride so layout mismatches surface as size errors rather than
    silently mis-parsed fields.
    """
    sample_dt = _dtype_waveform_imu_sample(sample_size=sample_size)
    header_bytes = 16  # u32 + alignment padding + f64
    sample_names = ["interp", "before1", "before0", "after0", "after1"]
    sample_offsets = [header_bytes + i * sample_size for i in range(len(sample_names))]
    return np.dtype(
        {
            "names": ["frame_index", "frame_timestamp"] + sample_names,
            "formats": ["<u4", "<f8"] + [sample_dt] * len(sample_names),
            "offsets": [0, 8] + sample_offsets,
            "itemsize": int(frame_size),
        }
    )
def load_waveform_imu_index(index_path: Path) -> Dict[str, Any]:
    """
    Read WaveformMobile `imu_index.json` and return the parsed JSON object.

    Raises:
        ValueError: when the file's top-level JSON value is not an object.
    """
    path = Path(index_path)
    parsed = json.loads(path.read_text())
    if isinstance(parsed, dict):
        return parsed
    raise ValueError(f"imu_index.json must be an object: {path}")
def load_waveform_imu_frames(
    *,
    frames_bin_path: Path,
    imu_index_path: Optional[Path] = None,
) -> Dict[str, np.ndarray]:
    """
    Parse WaveformMobile per-frame IMU (`imu_frames.bin`) into plain arrays.

    Record and sample sizes are taken from `imu_index.json` when it is
    provided and readable; otherwise conservative defaults are used
    (sample 60 B, frame 320 B).

    Returns a dict of arrays, one entry per frame record:
        "frame_index": (N,) uint32
        "t":           (N,) float64  frame timestamp from the record header
        "q":           (N,4) float32 interpolated quaternion
        "r":           (N,3) float32 rotation rate, rad/s
        "a":           (N,3) float32 user acceleration, g
        "g":           (N,3) float32 gravity vector

    Raises:
        FileNotFoundError: when the binary file is missing.
        ValueError: when the file is too small or not a whole number of records.
    """
    bin_path = Path(frames_bin_path)
    if not bin_path.exists():
        raise FileNotFoundError(bin_path)

    sample_size = 60
    frame_size: Optional[int] = None
    if imu_index_path is not None and Path(imu_index_path).exists():
        idx = load_waveform_imu_index(Path(imu_index_path))
        fmt = idx.get("binaryFormat")
        fmt = fmt if isinstance(fmt, dict) else {}
        try:
            sample_size = int(fmt.get("sampleSize") or sample_size)
        except Exception:
            sample_size = 60
        try:
            frame_size = int(fmt.get("frameSize") or 0) or None
        except Exception:
            frame_size = None
    if frame_size is None:
        # No usable index: assume the typical 320-byte aligned frame record.
        frame_size = 320

    payload = bin_path.read_bytes()
    if frame_size <= 0 or len(payload) < frame_size:
        raise ValueError("imu_frames.bin too small or invalid frame_size")
    count = len(payload) // int(frame_size)
    if count <= 0 or (count * int(frame_size)) != len(payload):
        raise ValueError("imu_frames.bin size is not a multiple of frame record size")

    dt = _dtype_waveform_imu_frame(frame_size=int(frame_size), sample_size=int(sample_size))
    records = np.frombuffer(payload, dtype=dt, count=count)
    interp = records["interp"]
    return {
        "frame_index": records["frame_index"].astype(np.uint32, copy=False),
        "t": records["frame_timestamp"].astype(np.float64, copy=False),
        "q": interp["q"].astype(np.float32, copy=False),
        "r": interp["r"].astype(np.float32, copy=False),
        "a": interp["a"].astype(np.float32, copy=False),
        "g": interp["g"].astype(np.float32, copy=False),
    }
def load_waveform_barometer_index(index_path: Path) -> Dict[str, Any]:
    """
    Read a WaveformMobile barometer `index.json` and return the parsed object.

    Raises:
        ValueError: when the top-level JSON value is not an object.
    """
    path = Path(index_path)
    parsed = json.loads(path.read_text())
    if isinstance(parsed, dict):
        return parsed
    raise ValueError(f"barometer index.json must be an object: {path}")
def load_waveform_barometer_stream(
    *,
    stream_bin_path: Path,
    index_path: Optional[Path] = None,
) -> Dict[str, np.ndarray]:
    """
    Parse WaveformMobile barometer stream (`barometer_stream.bin`) into arrays.

    Record layout (little-endian, packed; 36 bytes by default, overridable
    via the index JSON's `stream.record_size_bytes`):
        u32 sampleIndex
        f64 unixTimestampSeconds
        f64 relativeTimestampSeconds  (seconds since capture start)
        f64 pressureKPa
        f64 relativeAltitudeMeters

    Trailing bytes that do not fill a whole record (footer or partial
    write) are ignored; a file shorter than one record yields empty arrays.
    """
    bin_path = Path(stream_bin_path)
    if not bin_path.exists():
        raise FileNotFoundError(bin_path)

    record_size = 36
    if index_path is not None and Path(index_path).exists():
        idx = load_waveform_barometer_index(Path(index_path))
        stream_meta = idx.get("stream")
        stream_meta = stream_meta if isinstance(stream_meta, dict) else {}
        try:
            record_size = int(stream_meta.get("record_size_bytes") or record_size)
        except Exception:
            record_size = 36

    dt = np.dtype(
        {
            "names": ["sample_index", "unix_ts", "t_rel", "pressure_kpa", "rel_alt_m"],
            "formats": ["<u4", "<f8", "<f8", "<f8", "<f8"],
            "offsets": [0, 4, 12, 20, 28],
            "itemsize": int(record_size),
        }
    )
    payload = bin_path.read_bytes()
    if len(payload) < record_size:
        return {
            "sample_index": np.zeros((0,), dtype=np.uint32),
            "unix_ts": np.zeros((0,), dtype=np.float64),
            "t_rel": np.zeros((0,), dtype=np.float64),
            "pressure_kpa": np.zeros((0,), dtype=np.float64),
            "rel_alt_m": np.zeros((0,), dtype=np.float64),
        }
    count = len(payload) // int(record_size)
    # Best-effort: drop any trailing footer / partial record bytes.
    records = np.frombuffer(payload[: count * int(record_size)], dtype=dt, count=count)
    return {
        "sample_index": records["sample_index"].astype(np.uint32, copy=False),
        "unix_ts": records["unix_ts"].astype(np.float64, copy=False),
        "t_rel": records["t_rel"].astype(np.float64, copy=False),
        "pressure_kpa": records["pressure_kpa"].astype(np.float64, copy=False),
        "rel_alt_m": records["rel_alt_m"].astype(np.float64, copy=False),
    }
def load_lidar_depth_16bit_png(
    path: Path,
    *,
    depth_scale_m: float = 0.001,
) -> np.ndarray:
    """
    Decode a 16-bit depth PNG into a float32 depth map in meters.

    The common convention stores millimeters in uint16, hence the default
    `depth_scale_m=0.001`.  Zero (and negative) raw values are treated as
    "no measurement" and mapped to NaN.

    Raises:
        ImportError: when Pillow is not installed.
    """
    try:
        from PIL import Image  # type: ignore
    except Exception as e:  # pragma: no cover
        raise ImportError(
            "Loading 16-bit PNG depth requires Pillow. Install with: pip install pillow"
        ) from e
    pixels = np.array(Image.open(Path(path)))
    if pixels.dtype != np.uint16:
        # Coerce other decoder modes back to the expected uint16 storage.
        pixels = pixels.astype(np.uint16, copy=False)
    depth_m = pixels.astype(np.float32) * float(depth_scale_m)
    depth_m[depth_m <= 0] = np.nan  # 0 encodes "invalid / no return"
    return depth_m
def align_depth_nearest(
    depth: np.ndarray,
    *,
    out_shape_hw: Tuple[int, int],
) -> np.ndarray:
    """
    Resize a (H,W) depth map to `out_shape_hw` via nearest-neighbor sampling.

    Nearest-neighbor is deliberate: interpolation would invent depth values
    across discontinuities.  Returns float32; when the shape already
    matches, the input is only cast (no copy when already float32).

    Raises:
        ValueError: when `depth` is not 2-dimensional.
    """
    src = np.asarray(depth)
    out_h, out_w = int(out_shape_hw[0]), int(out_shape_hw[1])
    if src.ndim != 2:
        raise ValueError(f"depth must be 2D (H,W), got {src.shape}")
    src_h, src_w = src.shape
    if (src_h, src_w) == (out_h, out_w):
        return src.astype(np.float32, copy=False)
    row_idx = np.linspace(0, src_h - 1, num=out_h).round().astype(int)
    col_idx = np.linspace(0, src_w - 1, num=out_w).round().astype(int)
    return src[np.ix_(row_idx, col_idx)].astype(np.float32, copy=False)
# -----------------------------------------------------------------------------
# v2 stream-centric adapters (Waveform v2 capture container)
# -----------------------------------------------------------------------------
def _strip_wfmfoot1_footer(raw: bytes) -> bytes:
"""
If the buffer ends with a WFMFOOT1 v2 footer, strip it.
This keeps binary parsing robust across "footer present" vs "no footer" streams.
"""
if len(raw) >= 36 and raw[-36:-28] == b"WFMFOOT1":
return raw[:-36]
return raw
def load_v2_timeline_frames(*, data_bin_path: Path) -> Dict[str, np.ndarray]:
    """
    Parse the v2 `timeline.frames` fixed-record stream.

    Record layout (little-endian, 16 bytes):
        u32 frameIndex
        u32 flags
        u64 t_ns

    Any trailing WFMFOOT1 footer is stripped first; partial trailing
    records are dropped.  Returns empty arrays when no full record exists.
    """
    path = Path(data_bin_path)
    if not path.exists():
        raise FileNotFoundError(path)
    payload = _strip_wfmfoot1_footer(path.read_bytes())

    record_size = 16
    count = len(payload) // record_size
    if count == 0:
        return {
            "frame_index": np.zeros((0,), dtype=np.uint32),
            "flags": np.zeros((0,), dtype=np.uint32),
            "t_ns": np.zeros((0,), dtype=np.uint64),
        }
    dt = np.dtype(
        {
            "names": ["frame_index", "flags", "t_ns"],
            "formats": ["<u4", "<u4", "<u8"],
            "offsets": [0, 4, 8],
            "itemsize": record_size,
        }
    )
    records = np.frombuffer(payload[: count * record_size], dtype=dt, count=count)
    return {
        "frame_index": records["frame_index"].astype(np.uint32, copy=False),
        "flags": records["flags"].astype(np.uint32, copy=False),
        "t_ns": records["t_ns"].astype(np.uint64, copy=False),
    }
def load_v2_pose_vio(*, data_bin_path: Path) -> Dict[str, np.ndarray]:
    """
    Load `pose.vio` fixed-record stream.

    Record layout (little-endian, 40 bytes):
        u64 t_ns
        f32 tx ty tz
        f32 qx qy qz qw
        u16 quality
        u16 provider_code

    Returns:
        {
          "t_ns": (N,) uint64,
          "t": (N,3) float32,
          "q": (N,4) float32,            # xyzw
          "quality": (N,) uint16,
          "provider_code": (N,) uint16,
          "T_wc": (N,4,4) float64        # world/odom-from-camera (pose)
        }

    Raises:
        FileNotFoundError: when the stream file is missing.
    """
    p = Path(data_bin_path)
    if not p.exists():
        raise FileNotFoundError(p)
    # Footer-tolerant: drop a trailing WFMFOOT1 block if present.
    raw = _strip_wfmfoot1_footer(p.read_bytes())
    rec = 40  # fixed record size in bytes
    if len(raw) < rec:
        # No full record: return consistently-typed empty arrays.
        return {
            "t_ns": np.zeros((0,), dtype=np.uint64),
            "t": np.zeros((0, 3), dtype=np.float32),
            "q": np.zeros((0, 4), dtype=np.float32),
            "quality": np.zeros((0,), dtype=np.uint16),
            "provider_code": np.zeros((0,), dtype=np.uint16),
            "T_wc": np.zeros((0, 4, 4), dtype=np.float64),
        }
    n = len(raw) // rec
    raw = raw[: n * rec]  # drop any trailing partial record
    dt = np.dtype(
        {
            "names": ["t_ns", "t", "q", "quality", "provider_code"],
            "formats": ["<u8", ("<f4", (3,)), ("<f4", (4,)), "<u2", "<u2"],
            "offsets": [0, 8, 20, 36, 38],
            "itemsize": rec,
        }
    )
    arr = np.frombuffer(raw, dtype=dt, count=n)
    t = arr["t"].astype(np.float32, copy=False)
    q = arr["q"].astype(np.float32, copy=False)
    # Convert quaternion (xyzw) + translation to 4x4 T_wc.
    # Note: This is a pure math conversion; coordinate convention conversion
    # (e.g., ARKit -> OpenCV) happens at callsites (teacher pipeline).
    x, y, z, w = q[:, 0], q[:, 1], q[:, 2], q[:, 3]
    # Normalize defensively; zero-norm quaternions divide by 1 (pass through).
    norm = np.sqrt(x * x + y * y + z * z + w * w).astype(np.float32)
    norm = np.where(norm > 0, norm, 1.0).astype(np.float32)
    x, y, z, w = x / norm, y / norm, z / norm, w / norm
    # Standard unit-quaternion -> rotation-matrix expansion (vectorized over N).
    xx, yy, zz = x * x, y * y, z * z
    xy, xz, yz = x * y, x * z, y * z
    wx, wy, wz = w * x, w * y, w * z
    R = np.zeros((n, 3, 3), dtype=np.float64)
    R[:, 0, 0] = 1.0 - 2.0 * (yy + zz)
    R[:, 0, 1] = 2.0 * (xy - wz)
    R[:, 0, 2] = 2.0 * (xz + wy)
    R[:, 1, 0] = 2.0 * (xy + wz)
    R[:, 1, 1] = 1.0 - 2.0 * (xx + zz)
    R[:, 1, 2] = 2.0 * (yz - wx)
    R[:, 2, 0] = 2.0 * (xz - wy)
    R[:, 2, 1] = 2.0 * (yz + wx)
    R[:, 2, 2] = 1.0 - 2.0 * (xx + yy)
    # Assemble homogeneous transforms: [R | t; 0 0 0 1].
    T = np.zeros((n, 4, 4), dtype=np.float64)
    T[:, 3, 3] = 1.0
    T[:, :3, :3] = R
    T[:, :3, 3] = t.astype(np.float64)
    return {
        "t_ns": arr["t_ns"].astype(np.uint64, copy=False),
        "t": t,
        "q": q,
        "quality": arr["quality"].astype(np.uint16, copy=False),
        "provider_code": arr["provider_code"].astype(np.uint16, copy=False),
        "T_wc": T,
    }
def _load_waveform_depth_index(index_path: Path) -> Dict[str, Any]:
"""
Load Waveform Mobile depth stream index.json.
Expected schema (example_data):
{
"format": {
"depth": {"width": 256, "height": 192, "type": "float32", "units": "meters",
"bytesPerFrame": 196608},
"depth_smoothed": {...},
"confidence": {...}
},
"frames": [
{"frameIndex": 0, "timestamp": 0.0, "depthOffset": 0, "smoothedDepthOffset": 0, ...},
...
]
}
"""
p = Path(index_path)
obj = json.loads(p.read_text())
if not isinstance(obj, dict):
raise ValueError(f"Waveform depth index must be a JSON object: {p}")
if "format" not in obj or "frames" not in obj:
raise ValueError(f"Waveform depth index missing required keys: {p}")
if not isinstance(obj.get("frames"), list):
raise ValueError(f"Waveform depth index frames must be a list: {p}")
return obj
def try_load_waveform_lidar_depth_frame(
    *,
    bundle_root: Path,
    device_id: str,
    frame_index: int,
    out_shape_hw: Optional[Tuple[int, int]] = None,
    prefer_smoothed: bool = True,
) -> Optional[np.ndarray]:
    """
    Best-effort loader for Waveform Mobile LiDAR depth from a packed stream.

    Resolves the per-device depth directory
    (<bundle_root>/devices/<device_id>/depth/) and delegates to
    `try_load_waveform_lidar_depth_frame_from_dir`.

    Returns:
        float32 (H,W) depth in meters with non-positive values as NaN,
        nearest-neighbor resized to `out_shape_hw` when given; or None when
        the stream is missing or unusable.
    """
    depth_dir = Path(bundle_root) / "devices" / str(device_id) / "depth"
    return try_load_waveform_lidar_depth_frame_from_dir(
        depth_dir=depth_dir,
        frame_index=int(frame_index),
        out_shape_hw=out_shape_hw,
        prefer_smoothed=prefer_smoothed,
    )
def try_load_waveform_lidar_depth_frame_from_dir(
    *,
    depth_dir: Path,
    frame_index: int,
    out_shape_hw: Optional[Tuple[int, int]] = None,
    prefer_smoothed: bool = True,
) -> Optional[np.ndarray]:
    """
    Best-effort loader for Waveform Mobile LiDAR depth from a packed stream,
    when you already know the `depth/` directory path.

    Looks for:
        <depth_dir>/index.json
        <depth_dir>/{depth_smoothed.bin, depth.bin}

    Returns:
        float32 (H,W) depth in meters with non-finite/non-positive values
        set to NaN (optionally nearest-neighbor resized to `out_shape_hw`),
        or None when anything is missing, malformed, or unsupported.
    """
    fi = int(frame_index)
    depth_dir = Path(depth_dir)
    index_path = depth_dir / "index.json"
    if not index_path.exists():
        return None
    # Prefer smoothed if requested and present; otherwise fall back to raw depth.
    bin_path = depth_dir / ("depth_smoothed.bin" if prefer_smoothed else "depth.bin")
    if not bin_path.exists():
        bin_path = depth_dir / "depth.bin"
        if not bin_path.exists():
            return None
    try:
        idx = _load_waveform_depth_index(index_path)
        fmt = idx.get("format", {}) if isinstance(idx.get("format"), dict) else {}
        depth_fmt = fmt.get(
            "depth_smoothed" if (prefer_smoothed and "depth_smoothed" in fmt) else "depth"
        )
        if not isinstance(depth_fmt, dict):
            depth_fmt = fmt.get("depth", {}) if isinstance(fmt.get("depth"), dict) else {}
        w = int(depth_fmt.get("width", 0) or 0)
        h = int(depth_fmt.get("height", 0) or 0)
        bpf = int(depth_fmt.get("bytesPerFrame", 0) or 0)
        dtype = str(depth_fmt.get("type", "float32")).lower().strip()
        units = str(depth_fmt.get("units", "meters")).lower().strip()
        if w <= 0 or h <= 0:
            return None
        if dtype not in {"float32", "f32"}:
            # We only support the packed float32 format for now.
            return None
        if units not in {"meters", "meter", "m"}:
            # Unexpected units; refuse to silently mis-scale.
            return None
        expected_bpf = int(w * h * 4)
        if bpf <= 0:
            bpf = expected_bpf
        if bpf != expected_bpf:
            # Index claims a different layout than float32(H*W).
            return None
        # Find record for requested ARFrame/video frame index.
        rec = None
        for r in idx.get("frames", []):
            if isinstance(r, dict) and int(r.get("frameIndex", -1)) == fi:
                rec = r
                break
        if rec is None:
            return None
        # Determine byte offset to read (smoothed offset preferred when asked).
        key = (
            "smoothedDepthOffset"
            if (prefer_smoothed and "smoothedDepthOffset" in rec)
            else "depthOffset"
        )
        off = rec.get(key)
        if off is None:
            off = rec.get("depthOffset")
        if off is None:
            return None
        offset = int(off)
        if offset < 0:
            return None
        with Path(bin_path).open("rb") as f:
            f.seek(offset)
            raw = f.read(bpf)
        if len(raw) != bpf:
            return None
        arr = np.frombuffer(raw, dtype=np.float32, count=w * h)
        if arr.size != w * h:
            return None
        # BUGFIX: `np.frombuffer` over `bytes` yields a read-only array, and
        # `.astype(np.float32, copy=False)` on an already-float32 array hands
        # back that same read-only view, so the in-place NaN masking below
        # raised ValueError ("assignment destination is read-only") — which
        # the enclosing `except` swallowed, making this loader always return
        # None.  `copy=True` forces a fresh, writable buffer.
        depth_m = arr.reshape((h, w)).astype(np.float32, copy=True)
        depth_m[~np.isfinite(depth_m)] = np.nan
        depth_m[depth_m <= 0] = np.nan
        if out_shape_hw is not None:
            depth_m = align_depth_nearest(depth_m, out_shape_hw=out_shape_hw)
        return depth_m
    except Exception:
        # Best-effort contract: any parse/IO problem degrades to "no depth".
        return None
def load_arkit_poses_json(path: Path) -> np.ndarray:
    """
    Load ARKit poses from JSON into (N,4,4) camera-to-world matrices.

    Accepted formats:
        - {"poses": [[[...4x4...]], ...]}
        - [ [[...4x4...]], ... ]

    Returns:
        (N,4,4) float64 array; N may be 0 for an empty poses list.

    Raises:
        ValueError: when the JSON is not a pose list (or object wrapping
            one), or any pose is not 4x4.
    """
    obj = json.loads(Path(path).read_text())
    if isinstance(obj, dict) and "poses" in obj:
        obj = obj["poses"]
    if not isinstance(obj, list):
        raise ValueError("Expected a list of 4x4 poses or {'poses': [...]} JSON")
    mats = []
    for p in obj:
        a = np.asarray(p, dtype=np.float64)
        if a.shape != (4, 4):
            raise ValueError(f"Pose must be 4x4, got {a.shape}")
        mats.append(a)
    if not mats:
        # np.stack([]) raises an unrelated error; an empty capture should
        # yield a well-typed empty (0,4,4) instead.
        return np.zeros((0, 4, 4), dtype=np.float64)
    return np.stack(mats, axis=0).astype(np.float64)
def load_arkit_poses_with_frame_index(path: Path) -> tuple[np.ndarray, Optional[np.ndarray]]:
    """
    Load ARKit poses and (optional) frame_index mapping.

    Accepted formats:
        - {"poses": [4x4,...], "frame_index": [int,...]}  (WaveformMobile writer)
        - {"poses": [4x4,...]}                            (no index)
        - [4x4,...]                                       (no index)

    Returns:
        (poses_c2w: (N,4,4) float64, frame_index: (N,) int64 or None).
        N may be 0 for an empty poses list.  The frame_index is dropped
        (None) when its length does not match N.

    Raises:
        ValueError: when no pose list can be found or a pose is not 4x4.
    """
    obj = json.loads(Path(path).read_text())
    frame_index = None
    poses_obj = obj
    if isinstance(obj, dict):
        poses_obj = obj.get("poses", obj.get("poses_c2w", obj))
        fi = obj.get("frame_index") or obj.get("frameIndex")
        if isinstance(fi, list) and fi:
            try:
                frame_index = np.asarray(fi, dtype=np.int64).reshape(-1)
            except Exception:
                frame_index = None
    # Parse poses list (same validation as load_arkit_poses_json).
    if isinstance(poses_obj, dict) and "poses" in poses_obj:
        poses_obj = poses_obj["poses"]
    if not isinstance(poses_obj, list):
        raise ValueError("Expected poses list or {'poses': [...]} JSON")
    mats = []
    for p in poses_obj:
        a = np.asarray(p, dtype=np.float64)
        if a.shape != (4, 4):
            raise ValueError(f"Pose must be 4x4, got {a.shape}")
        mats.append(a)
    if mats:
        poses = np.stack(mats, axis=0).astype(np.float64)
    else:
        # np.stack([]) raises an unrelated error; represent "no poses" as a
        # well-typed empty (0,4,4) instead.
        poses = np.zeros((0, 4, 4), dtype=np.float64)
    if frame_index is not None and int(frame_index.size) != int(poses.shape[0]):
        # Reject mismatched mapping (better to ignore than silently misalign).
        frame_index = None
    return poses, frame_index
def normalize_arkit_to_w2c(
    c2w_poses: np.ndarray,
    *,
    convert_coords: bool = True,
) -> np.ndarray:
    """
    Convert ARKit camera-to-world poses to world-to-camera in DA3-friendly
    3x4 format, delegating the per-pose conversion (including the optional
    coordinate-convention flip) to `convert_arkit_c2w_to_w2c`.

    Raises:
        ValueError: when the input is not shaped (N,4,4).
    """
    from ..utils.coordinate_utils import convert_arkit_c2w_to_w2c

    c2w = np.asarray(c2w_poses, dtype=np.float64)
    if c2w.ndim != 3 or c2w.shape[1:] != (4, 4):
        raise ValueError(f"Expected (N,4,4), got {c2w.shape}")
    converted = [convert_arkit_c2w_to_w2c(T, convert_coords=convert_coords) for T in c2w]
    return np.asarray(converted, dtype=np.float64)
def load_optional_json(path: Optional[Path]) -> Optional[Dict[str, Any]]:
    """
    Load a JSON object from `path`, or None when the path is None, the file
    does not exist, or the parsed top-level value is not an object.

    Note: parse errors on an existing file still propagate.
    """
    if path is None:
        return None
    candidate = Path(path)
    if not candidate.exists():
        return None
    parsed = json.loads(candidate.read_text())
    return parsed if isinstance(parsed, dict) else None