# 3d_model/ylff/utils/capture_bundle.py
# Azan
# Clean deployment build (Squashed)
# 7a87926
"""
Capture bundle reader/validator (SPECIFICATIONS.md Appendix C).
This module provides a single entry point (`CaptureBundle`) that:
- loads a bundle manifest,
- validates referenced files exist,
- offers typed accessors to common assets.
"""
from __future__ import annotations
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union
import numpy as np
from ..models.capture_models import CaptureManifest, DetailedAnnotation
from .dataset_layout import CaptureBundleLayout, validate_paths_exist
class CaptureBundleError(ValueError):
    """Raised when a capture bundle is missing, malformed, or references absent files."""
@dataclass(frozen=True)
class _CalibrationCompat:
rig_extrinsics_path: Optional[str] = None
sync_offsets_path: Optional[str] = None
@dataclass(frozen=True)
class _AnnotationsCompat:
quick_annotation_path: Optional[str] = None
detailed_annotation_path: Optional[str] = None
@dataclass(frozen=True)
class _DeviceCompat:
device_id: str
video_path: Optional[str] = None
intrinsics_path: Optional[str] = None
timestamps_path: Optional[str] = None
arkit_poses_path: Optional[str] = None
lidar_depth_dir: Optional[str] = None
# Optional forward-compat passthrough (mirrors pydantic extra)
model_extra: Dict[str, Any] = None # type: ignore[assignment]
@dataclass(frozen=True)
class _ManifestCompat:
schema_version: str
capture_id: str
devices: List[_DeviceCompat]
calibration: Optional[_CalibrationCompat] = None
annotations: Optional[_AnnotationsCompat] = None
teacher_outputs: Optional[Any] = None
metadata: Dict[str, Any] = None # type: ignore[assignment]
@dataclass(frozen=True)
class CaptureBundle:
    """
    Immutable, validated view of a capture bundle directory.

    Create instances with :meth:`load`. It parses ``manifest.json``, dispatches
    on the major number of ``schema_version`` and checks that every path
    referenced by the manifest exists:

    - v1 manifests are validated with the pydantic ``CaptureManifest`` model;
    - v2 (stream-centric) manifests are converted into a v1-like compat view
      (``_ManifestCompat``), so the accessors below work for both.
    """

    root: Path
    # Backward-compatible view of the manifest. For v1 this is derived from
    # the pydantic model; for v2 this is synthesized from the stream registry.
    manifest: Union[CaptureManifest, _ManifestCompat]
    # Raw JSON object parsed from manifest.json.
    manifest_obj: Dict[str, Any]
    schema_version: str
    # v2 only: (device_id, stream kind) -> stream.json path relative to root.
    _v2_stream_index: Dict[Tuple[str, str], str]

    # ---- Loading ---------------------------------------------------------------

    @classmethod
    def load(cls, root: Path) -> CaptureBundle:
        """
        Load the bundle rooted at ``root`` and validate it strictly.

        Args:
            root: Bundle directory (anything ``Path()`` accepts).

        Returns:
            A bundle whose manifest-referenced paths all exist.

        Raises:
            CaptureBundleError: if the root or manifest.json is missing or
                unreadable, the manifest is not a JSON object, its schema
                version is unsupported or invalid, or a referenced path is missing.
        """
        root = Path(root)
        layout = CaptureBundleLayout(root=root)
        if not root.exists():
            raise CaptureBundleError(f"Capture bundle root does not exist: {root}")
        if not layout.manifest_path.exists():
            raise CaptureBundleError(f"Missing manifest.json at: {layout.manifest_path}")
        try:
            manifest_obj = cls._read_json(layout.manifest_path)
        except (OSError, ValueError) as e:
            raise CaptureBundleError(f"Failed to parse manifest.json: {e}") from e
        if not isinstance(manifest_obj, dict):
            # A JSON array/scalar would otherwise blow up with AttributeError below.
            raise CaptureBundleError(
                "Failed to parse manifest.json: expected a JSON object, "
                f"got {type(manifest_obj).__name__}"
            )

        schema_version = str(manifest_obj.get("schema_version", "1.0") or "1.0")
        major = cls._schema_major(schema_version)
        if major == "1":
            bundle = cls._load_v1(root, manifest_obj, schema_version)
        elif major == "2":
            bundle = cls._load_v2(root, manifest_obj, schema_version)
        else:
            raise CaptureBundleError(f"Unsupported manifest schema_version: {schema_version}")
        bundle.validate(repairable_ok=False)
        return bundle

    @staticmethod
    def _read_json(path: Path) -> Any:
        """Parse the JSON file at ``path``. JSON is UTF-8, independent of the OS locale."""
        return json.loads(path.read_text(encoding="utf-8"))

    @staticmethod
    def _schema_major(schema_version: str) -> str:
        """
        Return the leading ASCII digits of ``schema_version`` ("" if there are none).

        Unlike a ``startswith`` check, this keeps "10.0" from being read as v1.
        """
        major = ""
        for ch in schema_version.strip():
            if ch not in "0123456789":
                break
            major += ch
        return major

    @classmethod
    def _load_v1(
        cls, root: Path, manifest_obj: Dict[str, Any], schema_version: str
    ) -> CaptureBundle:
        """Build an (unvalidated) bundle from a v1 manifest using the strict pydantic schema."""
        try:
            manifest = CaptureManifest.model_validate(manifest_obj)
        except Exception as e:
            raise CaptureBundleError(f"Invalid manifest schema: {e}") from e
        return cls(
            root=root,
            manifest=manifest,
            manifest_obj=manifest_obj,
            schema_version=schema_version,
            _v2_stream_index={},
        )

    @classmethod
    def _load_v2(
        cls, root: Path, manifest_obj: Dict[str, Any], schema_version: str
    ) -> CaptureBundle:
        """Build an (unvalidated) bundle by synthesizing a v1-compatible view of a v2 manifest."""
        device_refs = manifest_obj.get("devices", []) or []
        if not isinstance(device_refs, list) or not device_refs:
            raise CaptureBundleError("v2 manifest has no devices[] pointers")

        stream_index = cls._build_v2_stream_index(manifest_obj)

        compat_devices: List[_DeviceCompat] = []
        for ref in device_refs:
            if not isinstance(ref, dict):
                continue
            device_id = ref.get("device_id")
            device_path = ref.get("path")
            # Only entries with both a device_id and a path pointer are usable.
            if not (
                isinstance(device_id, str)
                and device_id
                and isinstance(device_path, str)
                and device_path
            ):
                continue
            compat_devices.append(cls._v2_compat_device(root, stream_index, device_id))
        if not compat_devices:
            raise CaptureBundleError("v2 manifest has no usable device entries")

        # Calibration + annotations compat: map well-known files when declared.
        calib_files = cls._v2_declared_files(manifest_obj, "calibration")
        ann_files = cls._v2_declared_files(manifest_obj, "annotations")

        def declared(rel: str, files: frozenset) -> Optional[str]:
            return rel if rel in files else None

        compat_manifest = _ManifestCompat(
            schema_version=schema_version,
            capture_id=str(manifest_obj.get("capture_id") or ""),
            devices=compat_devices,
            calibration=_CalibrationCompat(
                rig_extrinsics_path=declared("calibration/rig_extrinsics.json", calib_files),
                sync_offsets_path=declared("calibration/sync_offsets.json", calib_files),
            ),
            annotations=_AnnotationsCompat(
                quick_annotation_path=declared("annotations/quick_annotation.json", ann_files),
                detailed_annotation_path=declared(
                    "annotations/detailed_annotation.json", ann_files
                ),
            ),
            metadata={},
        )
        return cls(
            root=root,
            manifest=compat_manifest,
            manifest_obj=manifest_obj,
            schema_version=schema_version,
            _v2_stream_index=stream_index,
        )

    @staticmethod
    def _build_v2_stream_index(manifest_obj: Dict[str, Any]) -> Dict[Tuple[str, str], str]:
        """
        Map (device_id, kind) -> stream.json path for the well-formed ``streams[]``
        entries. Later duplicates win; a non-list ``streams`` yields {}.
        """
        streams = manifest_obj.get("streams", []) or []
        if not isinstance(streams, list):
            return {}
        index: Dict[Tuple[str, str], str] = {}
        for entry in streams:
            if not isinstance(entry, dict):
                continue
            parts = (entry.get("device_id"), entry.get("kind"), entry.get("path"))
            if all(isinstance(v, str) and v for v in parts):
                device_id, kind, path = parts
                index[(device_id, kind)] = path
        return index

    @staticmethod
    def _v2_declared_files(manifest_obj: Dict[str, Any], section: str) -> frozenset:
        """Return the string entries of ``manifest_obj[section]["files"]`` (empty if absent or malformed)."""
        block = manifest_obj.get(section)
        files = block.get("files") if isinstance(block, dict) else None
        if not isinstance(files, list):
            return frozenset()
        return frozenset(x for x in files if isinstance(x, str))

    @staticmethod
    def _resolve_v2_stream_data_rel(
        root: Path,
        stream_index: Dict[Tuple[str, str], str],
        device_id: str,
        kind: str,
    ) -> Optional[str]:
        """
        Return the ``data.path`` of the stream.json registered for (device_id, kind).

        The path is made relative to the bundle root, or left absolute if it lies
        outside the root. Returns None when the stream is not indexed, or its
        stream.json is missing, unreadable, or has no ``data.path``.
        """
        stream_rel = stream_index.get((device_id, kind))
        if not stream_rel:
            return None
        sj_path = (root / stream_rel).resolve()
        if not sj_path.exists():
            return None
        try:
            sj = json.loads(sj_path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # Best effort: a broken stream.json just means "asset unavailable".
            return None
        data = sj.get("data") if isinstance(sj, dict) else None
        data_rel = data.get("path") if isinstance(data, dict) else None
        if not (isinstance(data_rel, str) and data_rel):
            return None
        data_path = (sj_path.parent / data_rel).resolve()
        try:
            # Compare against the *resolved* root. data_path is resolved, so a
            # relative or symlinked root would otherwise never match.
            return str(data_path.relative_to(root.resolve()))
        except ValueError:
            return str(data_path)

    @classmethod
    def _v2_compat_device(
        cls,
        root: Path,
        stream_index: Dict[Tuple[str, str], str],
        device_id: str,
    ) -> _DeviceCompat:
        """
        Build the compat record for one v2 device.

        The video comes from the ``video.rgb`` stream. Other assets are the
        well-known files under ``devices/<device_id>/`` that exist on disk.
        """
        device_dir = root / "devices" / device_id

        def existing_rel(path: Path) -> Optional[str]:
            return str(path.relative_to(root)) if path.exists() else None

        # Intrinsics: prefer the v2 calibration/ location, fall back to the v1 one.
        intrinsics_rel = existing_rel(
            device_dir / "calibration" / "intrinsics.json"
        ) or existing_rel(device_dir / "intrinsics.json")
        depth_rel = existing_rel(device_dir / "depth")
        return _DeviceCompat(
            device_id=device_id,
            video_path=cls._resolve_v2_stream_data_rel(root, stream_index, device_id, "video.rgb"),
            intrinsics_path=intrinsics_rel,
            timestamps_path=existing_rel(device_dir / "timestamps.json"),
            arkit_poses_path=existing_rel(device_dir / "arkit_poses.json"),
            # Depth is exposed via model_extra["streams"]["depth"] instead.
            lidar_depth_dir=None,
            model_extra={"streams": {"depth": {"directory": depth_rel}}} if depth_rel else {},
        )

    @property
    def layout(self) -> CaptureBundleLayout:
        return CaptureBundleLayout(root=self.root)

    # ---- Validation --------------------------------------------------------------

    def validate(self, repairable_ok: bool = True) -> None:
        """
        Check that every path referenced by the manifest exists under ``root``.

        Checked paths: per-device assets (including a ``streams.depth.directory``
        passthrough), calibration files, annotation files and teacher outputs.

        Args:
            repairable_ok: If True, missing paths are tolerated and nothing is
                raised. If False, any missing path raises.

        Raises:
            CaptureBundleError: if ``repairable_ok`` is False and any referenced
                path is missing.
        """
        rels: List[Optional[str]] = []
        for d in getattr(self.manifest, "devices", []) or []:
            rels.extend(
                getattr(d, attr, None)
                for attr in (
                    "video_path",
                    "intrinsics_path",
                    "timestamps_path",
                    "arkit_poses_path",
                    "lidar_depth_dir",
                )
            )
            # WaveformMobile forward-compat: depth may live under
            # device.extra.streams.depth.directory; validate it when present.
            stream_depth = self._stream_depth_rel(d)
            if stream_depth:
                rels.append(stream_depth)

        optional_sections = (
            ("calibration", ("rig_extrinsics_path", "sync_offsets_path")),
            ("annotations", ("quick_annotation_path", "detailed_annotation_path")),
            ("teacher_outputs", ("depth_dir", "uncertainty_dir", "reconstruction_path")),
        )
        for section, attrs in optional_sections:
            section_obj = getattr(self.manifest, section, None)
            if section_obj:
                rels.extend(getattr(section_obj, attr, None) for attr in attrs)

        errors = validate_paths_exist(self.root, rels)
        if errors and not repairable_ok:
            raise CaptureBundleError("Capture bundle validation failed:\n- " + "\n- ".join(errors))

    # ---- Internal path helpers -----------------------------------------------------

    def _rel_to_path(self, rel: Any) -> Optional[Path]:
        """Resolve ``rel`` against the bundle root if it is a non-empty string, else None."""
        return (self.root / rel).resolve() if isinstance(rel, str) and rel else None

    @staticmethod
    def _streams_of(device: Any) -> Dict[str, Any]:
        """Return ``device.model_extra["streams"]`` as a dict ({} when absent or malformed)."""
        extra = getattr(device, "model_extra", None) or {}
        streams = extra.get("streams") if isinstance(extra, dict) else None
        return streams if isinstance(streams, dict) else {}

    @classmethod
    def _stream_depth_rel(cls, device: Any) -> Optional[str]:
        """Return ``streams.depth.directory`` for ``device`` if it is a non-empty string."""
        depth = cls._streams_of(device).get("depth")
        rel = depth.get("directory") if isinstance(depth, dict) else None
        return rel if isinstance(rel, str) and rel else None

    def _required_device_path(self, device_id: str, attr: str, hint: str = "") -> Path:
        """Resolve device attribute ``attr`` against root, raising if it is unset."""
        rel = getattr(self.get_device(device_id), attr, None)
        if not rel:
            raise CaptureBundleError(
                f"No {attr} for device_id={device_id} "
                f"(schema_version={self.schema_version}){hint}"
            )
        return (self.root / str(rel)).resolve()

    def _optional_device_path(self, device_id: str, attr: str) -> Optional[Path]:
        """Resolve device attribute ``attr`` against root, or None if it is unset."""
        rel = getattr(self.get_device(device_id), attr, None)
        return (self.root / str(rel)).resolve() if rel else None

    # ---- Convenience accessors -------------------------------------------------

    def list_devices(self) -> List[str]:
        """Return the manifest's device ids, in manifest order."""
        return [str(d.device_id) for d in getattr(self.manifest, "devices", []) or []]

    def get_device(self, device_id: str) -> Any:
        """Return the first manifest device whose id equals ``device_id``; raise if none matches."""
        for d in getattr(self.manifest, "devices", []) or []:
            if str(d.device_id) == device_id:
                return d
        raise CaptureBundleError(f"Unknown device_id: {device_id}")

    def device_video_path(self, device_id: str) -> Path:
        return self._required_device_path(device_id, "video_path")

    def device_intrinsics_path(self, device_id: str) -> Path:
        return self._required_device_path(device_id, "intrinsics_path")

    def device_timestamps_path(self, device_id: str) -> Path:
        return self._required_device_path(
            device_id,
            "timestamps_path",
            ". Use the timeline.frames stream (v2) instead.",
        )

    def device_arkit_poses_path(self, device_id: str) -> Optional[Path]:
        return self._optional_device_path(device_id, "arkit_poses_path")

    def device_lidar_depth_dir(self, device_id: str) -> Optional[Path]:
        return self._optional_device_path(device_id, "lidar_depth_dir")

    def device_depth_dir_best_effort(self, device_id: str) -> Optional[Path]:
        """
        Prefer canonical `lidar_depth_dir`, but fall back to the WaveformMobile
        packed stream directory stored under `device.streams.depth.directory`
        if present.
        """
        d = self.get_device(device_id)
        canonical = getattr(d, "lidar_depth_dir", None)
        if canonical:
            return (self.root / str(canonical)).resolve()
        return self._rel_to_path(self._stream_depth_rel(d))

    # ---- v2 stream-centric helpers --------------------------------------------

    def v2_has_streams(self) -> bool:
        """True for a v2 bundle whose manifest registered at least one stream."""
        return bool(self._schema_major(self.schema_version) == "2" and self._v2_stream_index)

    def v2_stream_ref(self, *, device_id: str, kind: str) -> Optional[Path]:
        """
        Return the path to `stream.json` for (device_id, kind) when present in v2.
        """
        if self._schema_major(self.schema_version) != "2":
            return None
        rel = self._v2_stream_index.get((device_id, kind))
        return (self.root / rel).resolve() if rel else None

    def v2_stream_json(self, *, device_id: str, kind: str) -> Optional[Dict[str, Any]]:
        """Return the parsed stream.json object, or None if it is missing, unreadable or not an object."""
        ref = self.v2_stream_ref(device_id=device_id, kind=kind)
        if ref is None or not ref.exists():
            return None
        try:
            obj = self._read_json(ref)
        except (OSError, ValueError):
            return None
        return obj if isinstance(obj, dict) else None

    def v2_stream_data_path(self, *, device_id: str, kind: str) -> Optional[Path]:
        """Return the resolved ``data.path`` of a v2 stream (relative to its stream.json), or None."""
        sj = self.v2_stream_json(device_id=device_id, kind=kind)
        if not sj:
            return None
        ref = self.v2_stream_ref(device_id=device_id, kind=kind)
        if ref is None:
            return None
        data = sj.get("data")
        rel = data.get("path") if isinstance(data, dict) else None
        if not (isinstance(rel, str) and rel):
            return None
        # data.path is relative to the directory that holds stream.json.
        return (ref.parent / rel).resolve()

    # ---- WaveformMobile optional sensor streams -----------------------------------------

    def _device_streams_extra(self, device_id: str) -> Dict[str, Any]:
        """Return the device's ``model_extra["streams"]`` dict ({} when absent)."""
        return self._streams_of(self.get_device(device_id))

    def device_imu_paths(
        self, device_id: str
    ) -> tuple[Optional[Path], Optional[Path], Optional[Path]]:
        """
        Return (imu_stream.bin, imu_frames.bin, imu_index.json) paths when present
        in device.streams.imu.
        """
        imu = self._device_streams_extra(device_id).get("imu")
        if not isinstance(imu, dict):
            return (None, None, None)
        return (
            self._rel_to_path(imu.get("stream")),
            self._rel_to_path(imu.get("frames")),
            self._rel_to_path(imu.get("index")),
        )

    def device_barometer_paths(self, device_id: str) -> tuple[Optional[Path], Optional[Path]]:
        """
        Return (barometer_stream.bin, barometer/index.json) paths when present in
        device.streams.barometer.
        """
        bar = self._device_streams_extra(device_id).get("barometer")
        if not isinstance(bar, dict):
            return (None, None)
        return (self._rel_to_path(bar.get("stream")), self._rel_to_path(bar.get("index")))

    # ---- Asset loaders -------------------------------------------------------------

    def load_detailed_annotation(self) -> Optional[DetailedAnnotation]:
        """
        Parse the detailed annotation, or return None when it is not declared or
        not on disk.

        Raises:
            CaptureBundleError: if the file exists but cannot be read or validated.
        """
        ann = getattr(self.manifest, "annotations", None)
        rel = getattr(ann, "detailed_annotation_path", None) if ann else None
        if not rel:
            return None
        p = (self.root / rel).resolve()
        if not p.exists():
            return None
        try:
            return DetailedAnnotation.model_validate(self._read_json(p))
        except Exception as e:
            raise CaptureBundleError(f"Failed to parse detailed annotation: {p}: {e}") from e

    def _read_intrinsics_json(self, device_id: str) -> Tuple[Path, Dict[str, Any]]:
        """Read the device's intrinsics JSON, requiring a top-level object."""
        p = self.device_intrinsics_path(device_id)
        obj = self._read_json(p)
        if not isinstance(obj, dict):
            raise CaptureBundleError(f"Unsupported intrinsics JSON schema: {p}")
        return p, obj

    @staticmethod
    def _intrinsics_matrix_from_obj(obj: Dict[str, Any], p: Path) -> np.ndarray:
        """Build the (3, 3) float32 K matrix from an already-parsed intrinsics object."""
        if "K" in obj:
            K = np.asarray(obj["K"], dtype=np.float32)
        elif "intrinsics" in obj:
            K = np.asarray(obj["intrinsics"], dtype=np.float32)
        elif all(k in obj for k in ("fx", "fy", "cx", "cy")):
            fx, fy = float(obj["fx"]), float(obj["fy"])
            cx, cy = float(obj["cx"]), float(obj["cy"])
            K = np.array([[fx, 0.0, cx], [0.0, fy, cy], [0.0, 0.0, 1.0]], dtype=np.float32)
        else:
            raise CaptureBundleError(f"Unsupported intrinsics JSON schema: {p}")
        if K.shape != (3, 3):
            raise CaptureBundleError(f"Intrinsics must be 3x3, got {K.shape} in {p}")
        return K

    def load_intrinsics_matrix(self, device_id: str) -> np.ndarray:
        """
        Load intrinsics from JSON as a (3, 3) float32 matrix.
        The exact intrinsics JSON schema is intentionally flexible; we accept:
        - {"K": [[...],[...],[...]]} or
        - {"intrinsics": [[...],[...],[...]]} or
        - {"fx":..., "fy":..., "cx":..., "cy":...}

        Raises:
            CaptureBundleError: if the device has no intrinsics path, the JSON
                matches none of these shapes, or the matrix is not 3x3.
        """
        p, obj = self._read_intrinsics_json(device_id)
        return self._intrinsics_matrix_from_obj(obj, p)

    def load_intrinsics_and_distortion(
        self, device_id: str
    ) -> tuple[np.ndarray, Optional[np.ndarray]]:
        """
        Load intrinsics matrix K and (optional) distortion coefficients.
        Distortion support is best-effort and forward-compatible. Accepted keys:
        - {"distortion": [...]} (OpenCV-style coeffs)
        - {"dist": [...]} or {"dist_coeffs": [...]}
        - {"k": [...]} (legacy)
        The first key (in that order) holding a list is used. If it cannot be
        converted to floats, the distortion is None.
        """
        # Parse the file once and derive both K and the distortion from it.
        p, obj = self._read_intrinsics_json(device_id)
        K = self._intrinsics_matrix_from_obj(obj, p)
        dist = None
        for key in ("distortion", "dist", "dist_coeffs", "k"):
            coeffs = obj.get(key)
            if isinstance(coeffs, list):
                try:
                    dist = np.asarray(coeffs, dtype=np.float64).reshape(-1)
                except (TypeError, ValueError):
                    dist = None
                break
        return K.astype(np.float32, copy=False), dist

    def load_sync_offsets(self) -> Optional[Dict[str, float]]:
        """
        Load per-device sync offsets (seconds) if available.

        Raises:
            CaptureBundleError: if the file is not a JSON object.
        """
        cal = getattr(self.manifest, "calibration", None)
        rel = getattr(cal, "sync_offsets_path", None) if cal else None
        if not rel:
            return None
        p = (self.root / rel).resolve()
        if not p.exists():
            return None
        obj = self._read_json(p)
        # Expected {device_id: offset_seconds}
        if not isinstance(obj, dict):
            raise CaptureBundleError(
                f"Sync offsets must be a JSON object of device_id -> seconds: {p}"
            )
        return {str(k): float(v) for k, v in obj.items()}

    def load_rig_extrinsics(self) -> Optional[Dict[str, Any]]:
        """
        Load rig extrinsics if available.
        The exact schema is not locked yet; we return raw JSON for now.
        """
        cal = getattr(self.manifest, "calibration", None)
        rel = getattr(cal, "rig_extrinsics_path", None) if cal else None
        if not rel:
            return None
        p = (self.root / rel).resolve()
        if not p.exists():
            return None
        return self._read_json(p)