| """ |
| Inference pipeline (SPECIFICATIONS.md Section 9). |
| |
| This module runs: |
| 1) frame sampling from a video (or capture bundle device video) |
| 2) model inference (depth + σ) |
| 3) (optional) GTSAM optimization with reprojection + ray-depth priors |
| 4) outputs reconstruction artifacts and confidence bounds |
| |
| This is a minimal, end-to-end runnable baseline. It is designed so that feature |
| extraction/matching + multi-view track building can be swapped in later without |
| changing the API surface. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| import logging |
| import os |
| from dataclasses import dataclass |
| from pathlib import Path |
| from typing import Any, Dict, List, Optional, Protocol, Tuple |
| import numpy as np |
|
|
| from ..gtsam import has_gtsam |
| from ..gtsam.factors.ray_depth_prior import RayDepthPriorSpec, make_ray_depth_prior_factor |
| from ..utils.artifact_store import ArtifactStore |
| from ..utils.capture_bundle import CaptureBundle |
| from ..utils.dataset_layout import ensure_dir |
| from ..utils.telemetry import span |
| from ..utils.wandb_utils import ensure_wandb_run, log_artifact, log_metrics |
| from .teacher_uncertainty import temporal_consensus_sigma |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| def _apply_sigma_calibration_from_json( |
| sigma: np.ndarray, |
| calibration_json: str, |
| *, |
| operating_regime: Optional[str] = None, |
| ) -> Tuple[np.ndarray, Dict[str, object]]: |
| """ |
| Apply a calibration JSON produced by audit. |
| |
| Supported shapes: |
| - {"a":..., "b":...} |
| - {"calibration": {"a":..., "b":...}} (legacy audit summary) |
| - {"calibration_table": {...}} (audit summary) |
| - SigmaCalibrationTable: {"calibration_version":..., "method":..., "params": {...}} |
| """ |
|
|
| obj = json.loads(Path(calibration_json).read_text()) |
|
|
| |
| if ( |
| isinstance(obj, dict) |
| and "calibration_table" in obj |
| and isinstance(obj["calibration_table"], dict) |
| ): |
| obj = obj["calibration_table"] |
| if isinstance(obj, dict) and "calibration" in obj and isinstance(obj["calibration"], dict): |
| obj = obj["calibration"] |
|
|
| meta: Dict[str, object] = {"source": str(calibration_json)} |
|
|
| |
| if isinstance(obj, dict) and "calibration_version" in obj and "params" in obj: |
| meta["calibration_version"] = str(obj.get("calibration_version")) |
| meta["method"] = str(obj.get("method", "unknown")) |
| params = obj.get("params", {}) if isinstance(obj.get("params"), dict) else {} |
|
|
| if "per_regime" in params and isinstance(params["per_regime"], dict): |
| key = str(operating_regime or "") |
| entry = params["per_regime"].get(key) if key else None |
| if entry is None: |
| entry = {"a": 1.0, "b": 0.0} |
| a = float(entry.get("a", 1.0)) |
| b = float(entry.get("b", 0.0)) |
| meta.update({"applied": "per_regime_affine", "regime": key, "a": a, "b": b}) |
| s2 = (a * sigma.astype(np.float32) + b).astype(np.float32) |
| return np.clip(s2, 1e-6, 1e6), meta |
|
|
| if "x" in params and "y" in params: |
| xs = np.asarray(params.get("x", []), dtype=np.float64) |
| ys = np.asarray(params.get("y", []), dtype=np.float64) |
| if xs.size >= 2 and ys.size == xs.size: |
| meta.update({"applied": "isotonic"}) |
| s = np.asarray(sigma, dtype=np.float64) |
| s2 = np.interp(s, xs, ys).astype(np.float32) |
| return np.clip(s2, 1e-6, 1e6), meta |
|
|
| if "a" in params or "b" in params: |
| a = float(params.get("a", 1.0)) |
| b = float(params.get("b", 0.0)) |
| meta.update({"applied": "affine", "a": a, "b": b}) |
| s2 = (a * sigma.astype(np.float32) + b).astype(np.float32) |
| return np.clip(s2, 1e-6, 1e6), meta |
|
|
| |
| if isinstance(obj, dict): |
| a = float(obj.get("a", 1.0)) |
| b = float(obj.get("b", 0.0)) |
| meta.update({"applied": "affine", "a": a, "b": b}) |
| s2 = (a * sigma.astype(np.float32) + b).astype(np.float32) |
| return np.clip(s2, 1e-6, 1e6), meta |
|
|
| raise ValueError("Unrecognized calibration JSON format") |
|
|
|
|
@dataclass(frozen=True)
class InferenceConfig:
    """Immutable configuration for :func:`run_inference`."""

    # Capture-bundle device to run on; required when the bundle has >1 device.
    device_id: Optional[str] = None
    # Model identifier passed to the model loader (None = loader default).
    model_name: Optional[str] = None
    # Torch device string for inference, e.g. "cuda" or "cpu".
    device: str = "cuda"
    # Maximum number of frames to keep from the video (None = unlimited).
    max_frames: Optional[int] = 60
    # Keep one frame out of every `frame_interval` decoded frames.
    frame_interval: int = 2
    # Run GTSAM bundle adjustment when gtsam is importable.
    enable_gtsam_ba: bool = True
    # Isotropic reprojection noise (pixels) for projection factors.
    reproj_sigma_px: float = 1.5
    # Clamp range applied to the per-pixel depth sigma before use.
    sigma_min: float = 1e-3
    sigma_max: float = 10.0
    # Grid spacing (pixels) for landmark sampling in "grid" BA mode.
    sparse_grid: int = 32
    # Bundle-adjustment mode: "tracks" (ORB multi-view tracks) or "grid".
    ba_mode: str = "tracks"
    # Cap on the number of tracks fed to the teacher BA in "tracks" mode.
    max_tracks: int = 500
    # Use iSAM2 incremental optimization inside the teacher BA.
    use_isam2: bool = True
    # Track-builder selector; presumably only "orb" is wired up — TODO confirm.
    track_builder: str = "orb"
    # Number of iSAM2 refinement iterations in the teacher BA.
    isam2_refinement_steps: int = 3
    # Frame-index offsets used for ORB pairwise matching.
    orb_pair_offsets: Tuple[int, ...] = (1, 2, 3)
    # Filter ORB matches with a RANSAC fundamental-matrix fit.
    orb_use_ransac_fmat: bool = True
    # Minimum number of observations for an ORB track to be kept.
    orb_min_track_length: int = 3

    # Optional ingest validation (quality gates, sync offsets, LiDAR coverage).
    enable_quality_gates: bool = False
    enable_sync_validation: bool = False

    # Path to an audit-produced sigma-calibration JSON; when set, it is
    # applied to the model sigma (see _apply_sigma_calibration_from_json).
    calibration_json: Optional[str] = None

    # Optional export targets; TensorRT export requires export_onnx_path.
    export_onnx_path: Optional[str] = None
    export_tensorrt_dir: Optional[str] = None
|
|
|
|
| def _compute_metrology_bounds( |
| depth: np.ndarray, sigma_z: np.ndarray |
| ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: |
| """ |
| Compute derived metrology-facing quantities: |
| - EAE (expected absolute error) assuming Gaussian: σ * sqrt(2/pi) |
| - 95% bounds: depth ± 1.96σ (nominal; audit may provide calibrated σ) |
| """ |
| eae = sigma_z * np.sqrt(2.0 / np.pi) |
| lower = depth - 1.96 * sigma_z |
| upper = depth + 1.96 * sigma_z |
| return eae.astype(np.float32), lower.astype(np.float32), upper.astype(np.float32) |
|
|
|
|
def _extract_video_frames(
    video_path: Path, max_frames: Optional[int], frame_interval: int
) -> List[np.ndarray]:
    """
    Decode ``video_path`` and return every ``frame_interval``-th frame as RGB.

    Args:
        video_path: Path to a video file readable by OpenCV.
        max_frames: Stop after keeping this many frames (None = no limit).
        frame_interval: Keep one frame out of every ``frame_interval`` decoded
            frames; values < 1 are treated as 1.

    Returns:
        List of HxWx3 RGB frames (uint8 as decoded by OpenCV).

    Raises:
        ValueError: If the video cannot be opened.
    """
    import cv2

    cap = cv2.VideoCapture(str(video_path))
    if not cap.isOpened():
        raise ValueError(f"Could not open video: {video_path}")

    frames: List[np.ndarray] = []
    step = max(int(frame_interval), 1)  # hoisted loop invariant; guards interval <= 0
    idx = 0
    kept = 0
    try:
        while True:
            ok, frame_bgr = cap.read()
            if not ok:
                break
            if idx % step == 0:
                # OpenCV decodes BGR; downstream consumers expect RGB.
                frames.append(cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB))
                kept += 1
                if max_frames is not None and kept >= int(max_frames):
                    break
            idx += 1
    finally:
        # Always release the capture handle, even if decoding raises;
        # previously an exception here leaked the VideoCapture resource.
        cap.release()
    return frames
|
|
|
|
| def _pixel_to_camera_ray(u: float, v: float, K: np.ndarray) -> np.ndarray: |
| fx, fy, cx, cy = float(K[0, 0]), float(K[1, 1]), float(K[0, 2]), float(K[1, 2]) |
| x = (u - cx) / fx |
| y = (v - cy) / fy |
| return np.array([x, y, 1.0], dtype=np.float64) |
|
|
|
|
class _DepthModel(Protocol):
    """Structural interface for depth models accepted by :func:`run_inference`.

    Implementations take a list of RGB frames and return an object exposing a
    ``depth`` array of shape (T, H, W), optionally with a ``sigma_z``/``sigma``/
    ``log_sigma`` uncertainty attribute (see how ``run_inference`` reads it).
    """

    def inference(self, frames: List[np.ndarray]) -> Any:
        # Protocol stub — never invoked directly; concrete models implement it.
        raise NotImplementedError
|
|
|
|
def run_inference(
    input_path: Path,
    output_dir: Path,
    *,
    config: Optional[InferenceConfig] = None,
    model: Optional[_DepthModel] = None,
    wandb_required: bool = False,
    artifact_store: Optional[ArtifactStore] = None,
) -> Dict[str, object]:
    """
    Run the end-to-end inference pipeline on a video or capture bundle.

    Args:
        input_path: Either a video file, or a capture-bundle directory
            (detected by the presence of ``manifest.json``).
        output_dir: Directory receiving per-frame depth/uncertainty arrays,
            metrology bounds, reconstruction points, and metadata JSON.
        config: Pipeline configuration; defaults to ``InferenceConfig()``.
        model: Optional pre-loaded depth model; loaded lazily when None.
        wandb_required: Forwarded to ``ensure_wandb_run`` — if True, a W&B
            run is required rather than best-effort.
        artifact_store: Optional store that receives the metadata JSON.

    Returns:
        The metadata dict (also written to ``inference_metadata.json``),
        augmented with ``metadata_path`` and ``metadata_artifact_uri``.

    Raises:
        ValueError: On invalid inputs, failed quality gates, or <2 frames.
        RuntimeError: When optional export deps are missing or the model
            was not initialized.
    """
    config = config or InferenceConfig()
    output_dir = Path(output_dir)
    ensure_dir(output_dir)

    # --- Input resolution: capture bundle (manifest.json) vs plain video. ---
    input_path = Path(input_path)
    bundle = None
    if input_path.is_dir() and (input_path / "manifest.json").exists():
        bundle = CaptureBundle.load(input_path)
        if not bundle.manifest.devices:
            raise ValueError("Bundle has no devices")
        did = config.device_id
        if did is None:
            # A single-device bundle may omit device_id; ambiguous otherwise.
            if len(bundle.manifest.devices) != 1:
                raise ValueError("device_id required for multi-device bundle inference")
            did = bundle.manifest.devices[0].device_id
        video_path = bundle.device_video_path(did)
        device_id = did
    else:
        video_path = input_path
        device_id = config.device_id or "video"

    with span("inference.run", attributes={"device_id": device_id, "input": str(input_path)}):
        # --- Stage 1: frame sampling. ---
        frames = _extract_video_frames(
            video_path, max_frames=config.max_frames, frame_interval=config.frame_interval
        )
        if len(frames) < 2:
            raise ValueError(f"Need at least 2 frames, got {len(frames)}")

        # --- Scene-constraint selection (best-effort, bundle only). ---
        constraints_meta = None
        if bundle is not None:
            try:
                from .constraints.selection import select_constraints

                selected = select_constraints(
                    scene_type=bundle.manifest.scene_type,
                    confidence=1.0 if bundle.manifest.scene_type else 0.0,
                    operating_regime=bundle.manifest.operating_regime,
                )
                constraints_meta = {
                    "mode": selected.mode,
                    "scene_type": selected.scene_type,
                    "confidence": selected.confidence,
                    "constraints": {
                        "manhattan_weight": selected.constraints.manhattan_weight,
                        "ceiling_prior": (
                            {
                                "mean": selected.constraints.ceiling_prior.mean,
                                "sigma": selected.constraints.ceiling_prior.sigma,
                            }
                            if selected.constraints.ceiling_prior is not None
                            else None
                        ),
                        "room_scale_min_m": selected.constraints.room_scale_min_m,
                        "room_scale_max_m": selected.constraints.room_scale_max_m,
                    },
                }
            except Exception:
                # Constraint selection is optional; record that it was skipped.
                constraints_meta = {"mode": "unavailable"}

        # --- Optional ingest validation (quality gates / sync / LiDAR). ---
        if config.enable_quality_gates:
            from .ingest_validation import QualityGateConfig, lidar_coverage, run_quality_gates

            # Sync-offset validation requires both the flag and bundle metadata.
            if (
                config.enable_sync_validation
                and bundle is not None
                and bundle.manifest.calibration
                and bundle.manifest.calibration.sync_offsets_path
            ):
                from .ingest_validation import validate_sync_offsets_json

                sync_res = validate_sync_offsets_json(
                    bundle.root / bundle.manifest.calibration.sync_offsets_path
                )
                if not sync_res.ok:
                    raise ValueError(f"sync_offsets.json validation failed: {sync_res.details}")

            gates = run_quality_gates(frames, cfg=QualityGateConfig())
            if not gates.passed:
                raise ValueError(f"Quality gates failed: {gates.details}")

            if bundle is not None:
                dev = bundle.get_device(device_id)
                if dev.lidar_depth_dir:
                    cov, cov_details = lidar_coverage(bundle.root / dev.lidar_depth_dir)
                    if cov is not None and cov < QualityGateConfig().min_lidar_coverage:
                        raise ValueError(
                            f"LiDAR coverage gate failed: coverage={cov} details={cov_details}"
                        )

        # --- Stage 2: model load (lazy) and optional ONNX/TensorRT export. ---
        if model is None:
            from ..utils.model_loader import load_da3_model

            model = load_da3_model(
                model_name=config.model_name,
                device=config.device,
                use_case="metric_depth",
                compile_model=False,
            )

        if config.export_onnx_path:
            try:
                from ..utils.onnx_export import export_to_onnx
            except Exception as e:
                raise RuntimeError("ONNX export requires optional deps (torch/onnx).") from e
            onnx_path = Path(config.export_onnx_path)
            # Skip export when a non-empty artifact already exists.
            if not (onnx_path.exists() and onnx_path.stat().st_size > 0):
                export_to_onnx(model, sample_input=[], output_path=onnx_path)

        if config.export_tensorrt_dir:
            try:
                from ..utils.tensorrt_export import build_tensorrt_engine
            except Exception as e:
                raise RuntimeError("TensorRT export requires optional deps (tensorrt).") from e
            if not config.export_onnx_path:
                raise ValueError("export_tensorrt_dir requires export_onnx_path to be set")
            out_dir = Path(config.export_tensorrt_dir)
            out_dir.mkdir(parents=True, exist_ok=True)
            engine_path = out_dir / "model.engine"
            # Skip the (expensive) engine build when one already exists.
            if not (engine_path.exists() and engine_path.stat().st_size > 0):
                build_tensorrt_engine(
                    onnx_path=Path(config.export_onnx_path),
                    engine_path=engine_path,
                    precision="fp16",
                )

        # --- Stage 3: run model inference under no_grad when torch exists. ---
        logger.info(f"Inference: running model on {len(frames)} frames")
        m = model
        if m is None:
            # Defensive: should be unreachable after the lazy load above.
            raise RuntimeError("Inference model was not initialized")
        try:
            import torch

            no_grad = torch.no_grad
        except Exception:
            # Torch is optional here; fall back to a no-op context manager.
            from contextlib import nullcontext

            no_grad = nullcontext

        with no_grad():
            out = m.inference(frames)

        depth = np.asarray(out.depth, dtype=np.float32)
        if depth.ndim != 3:
            raise ValueError(f"Expected depth (T,H,W), got {depth.shape}")

        # --- Uncertainty: prefer model-provided sigma, else temporal consensus.
        # NOTE: `attr` below is reused from the search loop to detect whether
        # the model exposed log-space sigma.
        sigma_model = None
        for attr in ("sigma_z", "sigma", "log_sigma"):
            if hasattr(out, attr):
                sigma_model = getattr(out, attr)
                break
        if sigma_model is not None:
            s = np.asarray(sigma_model, dtype=np.float32)
            if attr == "log_sigma":
                s = np.exp(s).astype(np.float32)
            if s.ndim == 2:
                # A single (H,W) map is broadcast to all T frames.
                s = np.repeat(s[None, :, :], depth.shape[0], axis=0)
            if s.shape != depth.shape:
                raise ValueError(f"Model sigma has wrong shape: {s.shape} vs depth {depth.shape}")
            sigma_z = s
        else:
            sigma_z = temporal_consensus_sigma(depth, window=5)
        sigma_z = np.clip(sigma_z, float(config.sigma_min), float(config.sigma_max)).astype(
            np.float32
        )

        # --- Optional sigma calibration (best-effort; failures only warn). ---
        calib_prov: Optional[Dict[str, object]] = None
        if config.calibration_json:
            try:
                regime = None
                if bundle is not None and bundle.manifest.operating_regime is not None:
                    regime = str(bundle.manifest.operating_regime.value)
                sigma_z, calib_prov = _apply_sigma_calibration_from_json(
                    sigma_z, config.calibration_json, operating_regime=regime
                )
            except Exception as e:
                logger.warning(f"Failed to apply sigma calibration: {e}")

        # --- Persist per-frame depth and uncertainty arrays. ---
        depth_dir = ensure_dir(output_dir / "depth")
        unc_dir = ensure_dir(output_dir / "uncertainty")
        for t in range(depth.shape[0]):
            np.save(depth_dir / f"frame_{t:06d}.npy", depth[t])
            np.save(unc_dir / f"frame_{t:06d}.npy", sigma_z[t])

        # --- Stage 4: optional GTSAM optimization. ---
        recon = {"optimized": False}
        if config.enable_gtsam_ba and has_gtsam():
            from ..gtsam import require_gtsam

            gtsam = require_gtsam()

            # Intrinsics: from the bundle when available, else a nominal
            # pinhole (f = 0.8 * max(H, W), principal point at the center).
            H, W = depth.shape[1:]
            if bundle is not None:
                K = bundle.load_intrinsics_matrix(device_id).astype(np.float64)
                fx = float(K[0, 0])
                fy = float(K[1, 1])
                cx = float(K[0, 2])
                cy = float(K[1, 2])
            else:
                fx = fy = 0.8 * max(H, W)
                cx = W / 2.0
                cy = H / 2.0
                K = np.array([[fx, 0.0, cx], [0.0, fy, cy], [0.0, 0.0, 1.0]], dtype=np.float64)
            calib = gtsam.Cal3_S2(float(fx), float(fy), 0.0, float(cx), float(cy))

            ba_mode = str(getattr(config, "ba_mode", "grid")).lower().strip()
            if ba_mode == "tracks":
                # Tracks mode: ORB multi-view tracks -> teacher BA problem.
                try:
                    from .teacher_gtsam_ba import build_problem_from_tracks, run_teacher_ba
                    from .tracks.orb_track_builder import OrbTrackBuilderConfig, build_orb_tracks

                    tracks = build_orb_tracks(
                        frames,
                        cfg=OrbTrackBuilderConfig(
                            pair_offsets=tuple(getattr(config, "orb_pair_offsets", (1, 2, 3))),
                            use_ransac_fmat=bool(getattr(config, "orb_use_ransac_fmat", True)),
                            min_track_length=int(getattr(config, "orb_min_track_length", 3)),
                        ),
                    )
                    # All poses initialized at identity; BA refines them.
                    poses_init = [np.eye(4, dtype=np.float64) for _ in range(depth.shape[0])]
                    problem, _ = build_problem_from_tracks(
                        tracks=tracks,
                        K=K,
                        poses_init=poses_init,
                        depth_stack=depth,
                        sigma_stack=sigma_z,
                        max_sigma_prior=float(getattr(config, "sigma_max", 10.0)),
                        max_tracks=int(getattr(config, "max_tracks", 500)),
                    )
                    ba = run_teacher_ba(
                        problem,
                        reproj_sigma_px=float(getattr(config, "reproj_sigma_px", 1.5)),
                        use_isam2=bool(getattr(config, "use_isam2", True)),
                        isam2_refinement_steps=int(getattr(config, "isam2_refinement_steps", 3)),
                    )
                    recon = {
                        "optimized": True,
                        "mode": "tracks",
                        "reproj_rmse_px": float(ba.reproj_rmse_px),
                        "num_points": int(len(problem.points_init)),
                        "num_obs": int(len(problem.observations)),
                    }

                    # Export optimized landmark positions.
                    pts = []
                    for j in range(len(problem.points_init)):
                        lk = gtsam.symbol("L", j)
                        if ba.optimized_values.exists(lk):
                            p = np.asarray(
                                ba.optimized_values.atPoint3(lk), dtype=np.float64
                            ).reshape(3)
                            pts.append(p.tolist())
                    (output_dir / "reconstruction_points.json").write_text(
                        json.dumps({"points": pts})
                    )
                except Exception as e:
                    # BA is best-effort: record the failure in metadata.
                    recon = {"optimized": False, "mode": "tracks", "error": str(e)}
            else:
                # Grid mode: single-frame (center) landmarks on a sparse grid,
                # each with a projection factor plus a ray-depth prior.
                graph = gtsam.NonlinearFactorGraph()
                initial = gtsam.Values()

                pose_keys = [gtsam.symbol("P", i) for i in range(depth.shape[0])]
                prior_noise = gtsam.noiseModel.Diagonal.Sigmas(
                    np.array([0.1, 0.1, 0.1, 1.0, 1.0, 1.0], dtype=np.float64)
                )
                for i, pk in enumerate(pose_keys):
                    pose = gtsam.Pose3(np.eye(4))
                    initial.insert(pk, pose)
                    if i == 0:
                        # Anchor the gauge freedom with a prior on pose 0.
                        graph.add(gtsam.PriorFactorPose3(pk, pose, prior_noise))

                center = depth.shape[0] // 2
                grid = max(8, int(config.sparse_grid))
                us = np.arange(grid // 2, W, grid, dtype=np.int32)
                vs = np.arange(grid // 2, H, grid, dtype=np.int32)

                noise_reproj = gtsam.noiseModel.Isotropic.Sigma(
                    2, float(getattr(config, "reproj_sigma_px", 1.5))
                )

                obs_count = 0
                lm_idx = 0
                for v in vs:
                    for u in us:
                        z = float(depth[center, v, u])
                        # Skip invalid / non-positive depths.
                        if not np.isfinite(z) or z <= 0:
                            continue
                        ray = _pixel_to_camera_ray(float(u), float(v), K)
                        ray = ray / (np.linalg.norm(ray) + 1e-12)
                        # Initialize the landmark along the unit ray at depth z.
                        X_w = ray * z

                        lk = gtsam.symbol("L", lm_idx)
                        lm_idx += 1
                        initial.insert(
                            lk, gtsam.Point3(float(X_w[0]), float(X_w[1]), float(X_w[2]))
                        )

                        meas = gtsam.Point2(float(u), float(v))
                        graph.add(
                            gtsam.GenericProjectionFactorCal3_S2(
                                meas, noise_reproj, pose_keys[center], lk, calib
                            )
                        )

                        # Ray-depth prior ties the landmark to the predicted
                        # depth with the model's per-pixel sigma.
                        spec = RayDepthPriorSpec(
                            pixel_uv=(float(u), float(v)),
                            K=K,
                            z_pred=z,
                            sigma_z=float(sigma_z[center, v, u]),
                        )
                        graph.add(
                            make_ray_depth_prior_factor(pose_keys[center], lk, spec, robust=True)
                        )
                        obs_count += 1

                # Only optimize when at least one valid landmark was added;
                # otherwise `recon` remains {"optimized": False}.
                if obs_count > 0:
                    params = gtsam.LevenbergMarquardtParams()
                    params.setMaxIterations(25)
                    optimizer = gtsam.LevenbergMarquardtOptimizer(graph, initial, params)
                    result = optimizer.optimize()
                    recon = {
                        "optimized": True,
                        "mode": "grid",
                        "num_landmarks": int(lm_idx),
                        "num_obs": int(obs_count),
                    }

                    pts = []
                    for j in range(lm_idx):
                        lk = gtsam.symbol("L", j)
                        if result.exists(lk):
                            p = np.asarray(result.atPoint3(lk), dtype=np.float64).reshape(3)
                            pts.append(p.tolist())
                    (output_dir / "reconstruction_points.json").write_text(
                        json.dumps({"points": pts})
                    )

        # --- Stage 5: metrology bounds and metadata artifacts. ---
        eae, lo95, hi95 = _compute_metrology_bounds(depth, sigma_z)
        np.save(output_dir / "eae.npy", eae)
        np.save(output_dir / "depth_lower_95.npy", lo95)
        np.save(output_dir / "depth_upper_95.npy", hi95)

        metadata = {
            "input": str(input_path),
            "device_id": device_id,
            "num_frames": int(depth.shape[0]),
            "depth_shape": [int(depth.shape[1]), int(depth.shape[2])],
            "gtsam_optimized": bool(recon.get("optimized", False)),
            "reconstruction": recon,
            "sigma_calibration": calib_prov,
            "selected_constraints": constraints_meta,
            "operating_regime": (
                str(bundle.manifest.operating_regime.value)
                if (bundle is not None and bundle.manifest.operating_regime is not None)
                else None
            ),
        }
        metadata_path = output_dir / "inference_metadata.json"
        metadata_path.write_text(json.dumps(metadata, indent=2))

        artifact_uri = None
        if artifact_store is not None:
            artifact_uri = artifact_store.put_json(metadata)

        # --- W&B logging (optional unless wandb_required). ---
        run = ensure_wandb_run(
            required=wandb_required,
            project=os.getenv("WANDB_PROJECT", "ylff"),
            entity=os.getenv("WANDB_ENTITY"),
            name=f"infer-{device_id}",
            config={"inference": metadata},
            tags=["inference"],
            mode=os.getenv("WANDB_MODE"),
        )
        if run is not None:
            log_metrics(
                {
                    "inference/num_frames": int(depth.shape[0]),
                    "inference/gtsam_optimized": int(bool(recon.get("optimized", False))),
                }
            )
            log_artifact(str(output_dir), name=f"inference_outputs_{device_id}", type="inference")

        return {
            **metadata,
            "metadata_path": str(metadata_path),
            "metadata_artifact_uri": artifact_uri,
        }
|
|