""" Capture bundle reader/validator (SPECIFICATIONS.md Appendix C). This module provides a single entry point (`CaptureBundle`) that: - loads a bundle manifest, - validates referenced files exist, - offers typed accessors to common assets. """ from __future__ import annotations import json from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, List, Optional, Tuple, Union import numpy as np from ..models.capture_models import CaptureManifest, DetailedAnnotation from .dataset_layout import CaptureBundleLayout, validate_paths_exist class CaptureBundleError(ValueError): pass @dataclass(frozen=True) class _CalibrationCompat: rig_extrinsics_path: Optional[str] = None sync_offsets_path: Optional[str] = None @dataclass(frozen=True) class _AnnotationsCompat: quick_annotation_path: Optional[str] = None detailed_annotation_path: Optional[str] = None @dataclass(frozen=True) class _DeviceCompat: device_id: str video_path: Optional[str] = None intrinsics_path: Optional[str] = None timestamps_path: Optional[str] = None arkit_poses_path: Optional[str] = None lidar_depth_dir: Optional[str] = None # Optional forward-compat passthrough (mirrors pydantic extra) model_extra: Dict[str, Any] = None # type: ignore[assignment] @dataclass(frozen=True) class _ManifestCompat: schema_version: str capture_id: str devices: List[_DeviceCompat] calibration: Optional[_CalibrationCompat] = None annotations: Optional[_AnnotationsCompat] = None teacher_outputs: Optional[Any] = None metadata: Dict[str, Any] = None # type: ignore[assignment] @dataclass(frozen=True) class CaptureBundle: root: Path # Backward-compatible view of the manifest. For v1 this is derived from # the pydantic model; for v2 this is synthesized from the stream registry. manifest: Union[CaptureManifest, _ManifestCompat] manifest_obj: Dict[str, Any] schema_version: str _v2_stream_index: Dict[Tuple[str, str], str] @classmethod def load(cls, root: Path) -> CaptureBundle: root = Path(root) layout = CaptureBundleLayout(root=root) if not root.exists(): raise CaptureBundleError(f"Capture bundle root does not exist: {root}") if not layout.manifest_path.exists(): raise CaptureBundleError(f"Missing manifest.json at: {layout.manifest_path}") try: manifest_obj = json.loads(layout.manifest_path.read_text()) except Exception as e: raise CaptureBundleError(f"Failed to parse manifest.json: {e}") from e schema_version = str(manifest_obj.get("schema_version", "1.0") or "1.0") # v1: strict pydantic schema if schema_version.startswith("1"): try: manifest = CaptureManifest.model_validate(manifest_obj) except Exception as e: raise CaptureBundleError(f"Invalid manifest schema: {e}") from e bundle = cls( root=root, manifest=manifest, manifest_obj=manifest_obj, schema_version=schema_version, _v2_stream_index={}, ) bundle.validate(repairable_ok=False) return bundle # v2: stream-centric schema. Synthesize a backward-compatible manifest view. if not schema_version.startswith("2"): raise CaptureBundleError(f"Unsupported manifest schema_version: {schema_version}") devices = manifest_obj.get("devices", []) or [] if not isinstance(devices, list) or not devices: raise CaptureBundleError("v2 manifest has no devices[] pointers") # Build stream index: (device_id, kind) -> stream.json path v2_stream_index: Dict[Tuple[str, str], str] = {} for s in manifest_obj.get("streams", []) or []: if not isinstance(s, dict): continue did = s.get("device_id") kind = s.get("kind") path = s.get("path") if ( isinstance(did, str) and isinstance(kind, str) and isinstance(path, str) and did and kind and path ): v2_stream_index[(did, kind)] = path def _resolve_stream_data_rel(did: str, kind: str) -> Optional[str]: sp = v2_stream_index.get((did, kind)) if not sp: return None sj_path = (root / sp).resolve() if not sj_path.exists(): return None try: sj = json.loads(sj_path.read_text()) except Exception: return None data = sj.get("data") if isinstance(sj, dict) else None if isinstance(data, dict) and isinstance(data.get("path"), str) and data["path"]: # Convert to bundle-root-relative path rel = (sj_path.parent / data["path"]).resolve() try: return str(rel.relative_to(root)) except Exception: return str(rel) return None compat_devices: List[_DeviceCompat] = [] for dref in devices: if not isinstance(dref, dict): continue did = dref.get("device_id") p = dref.get("path") if not (isinstance(did, str) and did and isinstance(p, str) and p): continue # Prefer stream registry for primary assets. video_rel = _resolve_stream_data_rel(did, "video.rgb") # Intrinsics: v2 prefers per-device calibration/intrinsics.json; # fall back to v1 intrinsics.json. intr_v2 = root / "devices" / did / "calibration" / "intrinsics.json" intr_v1 = root / "devices" / did / "intrinsics.json" intr_rel = None if intr_v2.exists(): intr_rel = str(intr_v2.relative_to(root)) elif intr_v1.exists(): intr_rel = str(intr_v1.relative_to(root)) ts_v1 = root / "devices" / did / "timestamps.json" ts_rel = str(ts_v1.relative_to(root)) if ts_v1.exists() else None poses_v1 = root / "devices" / did / "arkit_poses.json" poses_rel = str(poses_v1.relative_to(root)) if poses_v1.exists() else None depth_dir_v1 = root / "devices" / did / "depth" depth_dir_rel = str(depth_dir_v1.relative_to(root)) if depth_dir_v1.exists() else None extra_streams: Dict[str, Any] = {} if depth_dir_rel: extra_streams["depth"] = {"directory": depth_dir_rel} compat_devices.append( _DeviceCompat( device_id=did, video_path=video_rel, intrinsics_path=intr_rel, timestamps_path=ts_rel, arkit_poses_path=poses_rel, lidar_depth_dir=None, model_extra={"streams": extra_streams} if extra_streams else {}, ) ) if not compat_devices: raise CaptureBundleError("v2 manifest has no usable device entries") # Calibration + annotations compat: map well-known files when present. calib_files = [] calib = ( manifest_obj.get("calibration") if isinstance(manifest_obj.get("calibration"), dict) else {} ) if isinstance(calib, dict) and isinstance(calib.get("files"), list): calib_files = [str(x) for x in calib.get("files") if isinstance(x, str)] cal = _CalibrationCompat( rig_extrinsics_path=( "calibration/rig_extrinsics.json" if "calibration/rig_extrinsics.json" in calib_files else None ), sync_offsets_path=( "calibration/sync_offsets.json" if "calibration/sync_offsets.json" in calib_files else None ), ) ann_files = [] anns = ( manifest_obj.get("annotations") if isinstance(manifest_obj.get("annotations"), dict) else {} ) if isinstance(anns, dict) and isinstance(anns.get("files"), list): ann_files = [str(x) for x in anns.get("files") if isinstance(x, str)] ann = _AnnotationsCompat( quick_annotation_path=( "annotations/quick_annotation.json" if "annotations/quick_annotation.json" in ann_files else None ), detailed_annotation_path=( "annotations/detailed_annotation.json" if "annotations/detailed_annotation.json" in ann_files else None ), ) compat_manifest = _ManifestCompat( schema_version=schema_version, capture_id=str(manifest_obj.get("capture_id") or ""), devices=compat_devices, calibration=cal, annotations=ann, metadata={}, ) bundle = cls( root=root, manifest=compat_manifest, manifest_obj=manifest_obj, schema_version=schema_version, _v2_stream_index=v2_stream_index, ) bundle.validate(repairable_ok=False) return bundle @property def layout(self) -> CaptureBundleLayout: return CaptureBundleLayout(root=self.root) def validate(self, repairable_ok: bool = True) -> None: """ Validate that referenced paths exist. If `repairable_ok` is True, missing optional assets do not fail validation. """ rels: List[Optional[str]] = [] devices = getattr(self.manifest, "devices", []) or [] for d in devices: rels.extend( [ getattr(d, "video_path", None), getattr(d, "intrinsics_path", None), getattr(d, "timestamps_path", None), getattr(d, "arkit_poses_path", None), ] ) rels.append(getattr(d, "lidar_depth_dir", None)) # WaveformMobile forward-compat: some manifests store depth under # device.extra.streams.depth. # Prefer lidar_depth_dir, but validate stream directory when present. extra = getattr(d, "model_extra", None) or {} streams = extra.get("streams") if isinstance(extra, dict) else None if isinstance(streams, dict): depth = streams.get("depth") if isinstance(depth, dict): rel = depth.get("directory") if isinstance(rel, str) and rel: rels.append(rel) if getattr(self.manifest, "calibration", None): cal = self.manifest.calibration # type: ignore[union-attr] rels.extend( [ getattr(cal, "rig_extrinsics_path", None), getattr(cal, "sync_offsets_path", None), ] ) if getattr(self.manifest, "annotations", None): ann = self.manifest.annotations # type: ignore[union-attr] rels.extend( [ getattr(ann, "quick_annotation_path", None), getattr(ann, "detailed_annotation_path", None), ] ) if getattr(self.manifest, "teacher_outputs", None): tout = self.manifest.teacher_outputs # type: ignore[union-attr] rels.extend( [ getattr(tout, "depth_dir", None), getattr(tout, "uncertainty_dir", None), getattr(tout, "reconstruction_path", None), ] ) errors = validate_paths_exist(self.root, rels) if errors and not repairable_ok: raise CaptureBundleError("Capture bundle validation failed:\n- " + "\n- ".join(errors)) # ---- Convenience accessors ------------------------------------------------- def list_devices(self) -> List[str]: devices = getattr(self.manifest, "devices", []) or [] return [str(getattr(d, "device_id")) for d in devices] def get_device(self, device_id: str): devices = getattr(self.manifest, "devices", []) or [] for d in devices: if str(getattr(d, "device_id")) == device_id: return d raise CaptureBundleError(f"Unknown device_id: {device_id}") def device_video_path(self, device_id: str) -> Path: d = self.get_device(device_id) if not getattr(d, "video_path", None): raise CaptureBundleError( f"No video_path for device_id={device_id} (schema_version={self.schema_version})" ) return (self.root / str(getattr(d, "video_path"))).resolve() def device_intrinsics_path(self, device_id: str) -> Path: d = self.get_device(device_id) if not getattr(d, "intrinsics_path", None): raise CaptureBundleError( f"No intrinsics_path for device_id={device_id} " f"(schema_version={self.schema_version})" ) return (self.root / str(getattr(d, "intrinsics_path"))).resolve() def device_timestamps_path(self, device_id: str) -> Path: d = self.get_device(device_id) if not getattr(d, "timestamps_path", None): raise CaptureBundleError( f"No timestamps_path for device_id={device_id} " f"(schema_version={self.schema_version}). " "Use the timeline.frames stream (v2) instead." ) return (self.root / str(getattr(d, "timestamps_path"))).resolve() def device_arkit_poses_path(self, device_id: str) -> Optional[Path]: d = self.get_device(device_id) if not getattr(d, "arkit_poses_path", None): return None return (self.root / str(getattr(d, "arkit_poses_path"))).resolve() def device_lidar_depth_dir(self, device_id: str) -> Optional[Path]: d = self.get_device(device_id) if not getattr(d, "lidar_depth_dir", None): return None return (self.root / str(getattr(d, "lidar_depth_dir"))).resolve() def device_depth_dir_best_effort(self, device_id: str) -> Optional[Path]: """ Prefer canonical `lidar_depth_dir`, but fall back to WaveformMobile packed stream directory stored under `device.streams.depth.directory` if present. """ d = self.get_device(device_id) if getattr(d, "lidar_depth_dir", None): p = (self.root / str(getattr(d, "lidar_depth_dir"))).resolve() return p extra = getattr(d, "model_extra", None) or {} if not isinstance(extra, dict): return None streams = extra.get("streams") if not isinstance(streams, dict): return None depth = streams.get("depth") if not isinstance(depth, dict): return None rel = depth.get("directory") if not isinstance(rel, str) or not rel: return None return (self.root / rel).resolve() # ---- v2 stream-centric helpers -------------------------------------------- def v2_has_streams(self) -> bool: return bool(self.schema_version.startswith("2") and self._v2_stream_index) def v2_stream_ref(self, *, device_id: str, kind: str) -> Optional[Path]: """ Return the path to `stream.json` for (device_id, kind) when present in v2. """ if not self.schema_version.startswith("2"): return None rel = self._v2_stream_index.get((device_id, kind)) return (self.root / rel).resolve() if rel else None def v2_stream_json(self, *, device_id: str, kind: str) -> Optional[Dict[str, Any]]: p = self.v2_stream_ref(device_id=device_id, kind=kind) if not p or not p.exists(): return None try: obj = json.loads(p.read_text()) return obj if isinstance(obj, dict) else None except Exception: return None def v2_stream_data_path(self, *, device_id: str, kind: str) -> Optional[Path]: sj = self.v2_stream_json(device_id=device_id, kind=kind) if not sj: return None sp = self.v2_stream_ref(device_id=device_id, kind=kind) if not sp: return None data = sj.get("data") if isinstance(sj.get("data"), dict) else None if not isinstance(data, dict): return None rel = data.get("path") if not isinstance(rel, str) or not rel: return None return (sp.parent / rel).resolve() # ---- WaveformMobile optional sensor streams ----------------------------------------- def _device_streams_extra(self, device_id: str) -> Dict[str, Any]: d = self.get_device(device_id) extra = getattr(d, "model_extra", None) or {} if not isinstance(extra, dict): return {} streams = extra.get("streams") return streams if isinstance(streams, dict) else {} def device_imu_paths( self, device_id: str ) -> tuple[Optional[Path], Optional[Path], Optional[Path]]: """ Return (imu_stream.bin, imu_frames.bin, imu_index.json) paths when present in device.streams.imu. """ streams = self._device_streams_extra(device_id) imu = streams.get("imu") if not isinstance(imu, dict): return (None, None, None) stream = imu.get("stream") frames = imu.get("frames") index = imu.get("index") sp = (self.root / str(stream)).resolve() if isinstance(stream, str) and stream else None fp = (self.root / str(frames)).resolve() if isinstance(frames, str) and frames else None ip = (self.root / str(index)).resolve() if isinstance(index, str) and index else None return (sp, fp, ip) def device_barometer_paths(self, device_id: str) -> tuple[Optional[Path], Optional[Path]]: """ Return (barometer_stream.bin, barometer/index.json) paths when present in device.streams.barometer. """ streams = self._device_streams_extra(device_id) bar = streams.get("barometer") if not isinstance(bar, dict): return (None, None) stream = bar.get("stream") index = bar.get("index") sp = (self.root / str(stream)).resolve() if isinstance(stream, str) and stream else None ip = (self.root / str(index)).resolve() if isinstance(index, str) and index else None return (sp, ip) def load_detailed_annotation(self) -> Optional[DetailedAnnotation]: if not self.manifest.annotations or not self.manifest.annotations.detailed_annotation_path: return None p = (self.root / self.manifest.annotations.detailed_annotation_path).resolve() if not p.exists(): return None try: obj = json.loads(p.read_text()) return DetailedAnnotation.model_validate(obj) except Exception as e: raise CaptureBundleError(f"Failed to parse detailed annotation: {p}: {e}") from e def load_intrinsics_matrix(self, device_id: str) -> np.ndarray: """ Load intrinsics from JSON as a (3, 3) float32 matrix. The exact intrinsics JSON schema is intentionally flexible; we accept: - {"K": [[...],[...],[...]]} or - {"intrinsics": [[...],[...],[...]]} or - {"fx":..., "fy":..., "cx":..., "cy":...} """ p = self.device_intrinsics_path(device_id) obj = json.loads(p.read_text()) if "K" in obj: K = np.asarray(obj["K"], dtype=np.float32) elif "intrinsics" in obj: K = np.asarray(obj["intrinsics"], dtype=np.float32) elif all(k in obj for k in ("fx", "fy", "cx", "cy")): fx, fy, cx, cy = float(obj["fx"]), float(obj["fy"]), float(obj["cx"]), float(obj["cy"]) K = np.array([[fx, 0.0, cx], [0.0, fy, cy], [0.0, 0.0, 1.0]], dtype=np.float32) else: raise CaptureBundleError(f"Unsupported intrinsics JSON schema: {p}") if K.shape != (3, 3): raise CaptureBundleError(f"Intrinsics must be 3x3, got {K.shape} in {p}") return K def load_intrinsics_and_distortion( self, device_id: str ) -> tuple[np.ndarray, Optional[np.ndarray]]: """ Load intrinsics matrix K and (optional) distortion coefficients. Distortion support is best-effort and forward-compatible. Accepted keys: - {"distortion": [...]} (OpenCV-style coeffs) - {"dist": [...]} or {"dist_coeffs": [...]} - {"k": [...]} (legacy) """ p = self.device_intrinsics_path(device_id) obj = json.loads(p.read_text()) K = self.load_intrinsics_matrix(device_id) dist = None for key in ("distortion", "dist", "dist_coeffs", "k"): if key in obj and isinstance(obj[key], list): try: dist = np.asarray(obj[key], dtype=np.float64).reshape(-1) except Exception: dist = None break return K.astype(np.float32, copy=False), dist def load_sync_offsets(self) -> Optional[Dict[str, float]]: """ Load per-device sync offsets (seconds) if available. """ if not self.manifest.calibration or not self.manifest.calibration.sync_offsets_path: return None p = (self.root / self.manifest.calibration.sync_offsets_path).resolve() if not p.exists(): return None obj = json.loads(p.read_text()) # Expected {device_id: offset_seconds} return {str(k): float(v) for k, v in obj.items()} def load_rig_extrinsics(self) -> Optional[Dict[str, Any]]: """ Load rig extrinsics if available. The exact schema is not locked yet; we return raw JSON for now. """ if not self.manifest.calibration or not self.manifest.calibration.rig_extrinsics_path: return None p = (self.root / self.manifest.calibration.rig_extrinsics_path).resolve() if not p.exists(): return None return json.loads(p.read_text())