"""
Inference pipeline (SPECIFICATIONS.md Section 9).
This module runs:
1) frame sampling from a video (or a capture bundle's device video)
2) model inference (depth + σ)
3) (optional) GTSAM optimization with reprojection + ray-depth priors
4) export of reconstruction artifacts and confidence bounds
This is a minimal, end-to-end runnable baseline. It is designed so that feature
extraction/matching + multi-view track building can be swapped in later without
changing the API surface.
"""
from __future__ import annotations
import json
import logging
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Protocol, Tuple
import numpy as np
from ..gtsam import has_gtsam
from ..gtsam.factors.ray_depth_prior import RayDepthPriorSpec, make_ray_depth_prior_factor
from ..utils.artifact_store import ArtifactStore
from ..utils.capture_bundle import CaptureBundle
from ..utils.dataset_layout import ensure_dir
from ..utils.telemetry import span
from ..utils.wandb_utils import ensure_wandb_run, log_artifact, log_metrics
from .teacher_uncertainty import temporal_consensus_sigma
logger = logging.getLogger(__name__)
def _apply_sigma_calibration_from_json(
sigma: np.ndarray,
calibration_json: str,
*,
operating_regime: Optional[str] = None,
) -> Tuple[np.ndarray, Dict[str, object]]:
"""
Apply a calibration JSON produced by audit.
Supported shapes:
- {"a":..., "b":...}
- {"calibration": {"a":..., "b":...}} (legacy audit summary)
- {"calibration_table": {...}} (audit summary)
- SigmaCalibrationTable: {"calibration_version":..., "method":..., "params": {...}}
"""
obj = json.loads(Path(calibration_json).read_text())
# Audit summary JSON wrapper
if (
isinstance(obj, dict)
and "calibration_table" in obj
and isinstance(obj["calibration_table"], dict)
):
obj = obj["calibration_table"]
if isinstance(obj, dict) and "calibration" in obj and isinstance(obj["calibration"], dict):
obj = obj["calibration"]
meta: Dict[str, object] = {"source": str(calibration_json)}
# Versioned table
if isinstance(obj, dict) and "calibration_version" in obj and "params" in obj:
meta["calibration_version"] = str(obj.get("calibration_version"))
meta["method"] = str(obj.get("method", "unknown"))
params = obj.get("params", {}) if isinstance(obj.get("params"), dict) else {}
if "per_regime" in params and isinstance(params["per_regime"], dict):
key = str(operating_regime or "")
entry = params["per_regime"].get(key) if key else None
if entry is None:
entry = {"a": 1.0, "b": 0.0}
a = float(entry.get("a", 1.0))
b = float(entry.get("b", 0.0))
meta.update({"applied": "per_regime_affine", "regime": key, "a": a, "b": b})
s2 = (a * sigma.astype(np.float32) + b).astype(np.float32)
return np.clip(s2, 1e-6, 1e6), meta
if "x" in params and "y" in params:
xs = np.asarray(params.get("x", []), dtype=np.float64)
ys = np.asarray(params.get("y", []), dtype=np.float64)
if xs.size >= 2 and ys.size == xs.size:
meta.update({"applied": "isotonic"})
s = np.asarray(sigma, dtype=np.float64)
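                # np.interp assumes xs is sorted ascending, which an isotonic
                # calibration table should satisfy.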
s2 = np.interp(s, xs, ys).astype(np.float32)
return np.clip(s2, 1e-6, 1e6), meta
if "a" in params or "b" in params:
a = float(params.get("a", 1.0))
b = float(params.get("b", 0.0))
meta.update({"applied": "affine", "a": a, "b": b})
s2 = (a * sigma.astype(np.float32) + b).astype(np.float32)
return np.clip(s2, 1e-6, 1e6), meta
# Plain affine
if isinstance(obj, dict):
a = float(obj.get("a", 1.0))
b = float(obj.get("b", 0.0))
meta.update({"applied": "affine", "a": a, "b": b})
s2 = (a * sigma.astype(np.float32) + b).astype(np.float32)
return np.clip(s2, 1e-6, 1e6), meta
raise ValueError("Unrecognized calibration JSON format")
@dataclass(frozen=True)
class InferenceConfig:
device_id: Optional[str] = None
model_name: Optional[str] = None
device: str = "cuda"
max_frames: Optional[int] = 60
frame_interval: int = 2
enable_gtsam_ba: bool = True
reproj_sigma_px: float = 1.5
sigma_min: float = 1e-3
sigma_max: float = 10.0
sparse_grid: int = 32 # pixels between sampled points (approx)
ba_mode: str = "tracks" # "tracks" (default) | "grid" (fallback)
max_tracks: int = 500
use_isam2: bool = True
track_builder: str = "orb"
isam2_refinement_steps: int = 3
orb_pair_offsets: Tuple[int, ...] = (1, 2, 3)
orb_use_ransac_fmat: bool = True
orb_min_track_length: int = 3
# Default off so unit tests / minimal smoke runs don't require gate tuning.
# API/CLI entrypoints should enable this for real runs.
enable_quality_gates: bool = False
enable_sync_validation: bool = False
# Optional post-hoc sigma calibration (from audit)
calibration_json: Optional[str] = None # JSON with {"a":..., "b":...} or full summary
# Acceleration exports (optional)
export_onnx_path: Optional[str] = None
export_tensorrt_dir: Optional[str] = None
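# Example (illustrative): force the legacy grid-based BA fallback with a denser
# landmark grid (one sample roughly every 16 px):
#     cfg = InferenceConfig(ba_mode="grid", sparse_grid=16)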
def _compute_metrology_bounds(
depth: np.ndarray, sigma_z: np.ndarray
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Compute derived metrology-facing quantities:
- EAE (expected absolute error) assuming Gaussian: σ * sqrt(2/pi)
- 95% bounds: depth ± 1.96σ (nominal; audit may provide calibrated σ)
"""
eae = sigma_z * np.sqrt(2.0 / np.pi)
lower = depth - 1.96 * sigma_z
upper = depth + 1.96 * sigma_z
return eae.astype(np.float32), lower.astype(np.float32), upper.astype(np.float32)
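# Sanity check for the formulas above: with sigma_z = 0.10 m, EAE ≈ 0.0798 m
# (0.10 * sqrt(2/pi)) and the 95% half-width is 1.96 * 0.10 = 0.196 m.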
def _extract_video_frames(
video_path: Path, max_frames: Optional[int], frame_interval: int
) -> List[np.ndarray]:
import cv2 # type: ignore
cap = cv2.VideoCapture(str(video_path))
if not cap.isOpened():
raise ValueError(f"Could not open video: {video_path}")
frames: List[np.ndarray] = []
idx = 0
kept = 0
while True:
ok, frame_bgr = cap.read()
if not ok:
break
if idx % max(int(frame_interval), 1) == 0:
frames.append(cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB))
kept += 1
if max_frames is not None and kept >= int(max_frames):
break
idx += 1
cap.release()
return frames
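# Sampling note: frames are kept at indices 0, frame_interval, 2*frame_interval, ...
# until max_frames are collected, e.g. frame_interval=2 with max_frames=60 keeps
# frames 0, 2, ..., 118 of the source video.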
def _pixel_to_camera_ray(u: float, v: float, K: np.ndarray) -> np.ndarray:
fx, fy, cx, cy = float(K[0, 0]), float(K[1, 1]), float(K[0, 2]), float(K[1, 2])
x = (u - cx) / fx
y = (v - cy) / fy
return np.array([x, y, 1.0], dtype=np.float64)
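# Example: the principal point maps onto the optical axis, i.e.
#   _pixel_to_camera_ray(cx, cy, K) -> array([0., 0., 1.])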
class _DepthModel(Protocol):
    def inference(self, frames: List[np.ndarray]) -> Any: ...
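# Any object with an `inference(frames)` method satisfies this protocol; the
# returned object must expose a `.depth` array of shape (T, H, W) and may expose
# `sigma_z`, `sigma`, or `log_sigma` for model-provided uncertainty (see below).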
def run_inference(
input_path: Path,
output_dir: Path,
*,
config: Optional[InferenceConfig] = None,
model: Optional[_DepthModel] = None,
wandb_required: bool = False,
artifact_store: Optional[ArtifactStore] = None,
) -> Dict[str, object]:
config = config or InferenceConfig()
output_dir = Path(output_dir)
ensure_dir(output_dir)
# Resolve input: either a capture bundle dir or a raw video file.
input_path = Path(input_path)
bundle = None
if input_path.is_dir() and (input_path / "manifest.json").exists():
bundle = CaptureBundle.load(input_path)
if not bundle.manifest.devices:
raise ValueError("Bundle has no devices")
did = config.device_id
if did is None:
if len(bundle.manifest.devices) != 1:
raise ValueError("device_id required for multi-device bundle inference")
did = bundle.manifest.devices[0].device_id
video_path = bundle.device_video_path(did)
device_id = did
else:
video_path = input_path
device_id = config.device_id or "video"
with span("inference.run", attributes={"device_id": device_id, "input": str(input_path)}):
frames = _extract_video_frames(
video_path, max_frames=config.max_frames, frame_interval=config.frame_interval
)
if len(frames) < 2:
raise ValueError(f"Need at least 2 frames, got {len(frames)}")
# Semantic constraints (SPEC §7): record selected constraint set for provenance.
constraints_meta = None
if bundle is not None:
try:
from .constraints.selection import select_constraints
selected = select_constraints(
scene_type=bundle.manifest.scene_type,
confidence=1.0 if bundle.manifest.scene_type else 0.0,
operating_regime=bundle.manifest.operating_regime,
)
constraints_meta = {
"mode": selected.mode,
"scene_type": selected.scene_type,
"confidence": selected.confidence,
"constraints": {
"manhattan_weight": selected.constraints.manhattan_weight,
"ceiling_prior": (
{
"mean": selected.constraints.ceiling_prior.mean,
"sigma": selected.constraints.ceiling_prior.sigma,
}
if selected.constraints.ceiling_prior is not None
else None
),
"room_scale_min_m": selected.constraints.room_scale_min_m,
"room_scale_max_m": selected.constraints.room_scale_max_m,
},
}
except Exception:
constraints_meta = {"mode": "unavailable"}
if config.enable_quality_gates:
from .ingest_validation import QualityGateConfig, lidar_coverage, run_quality_gates
# Multi-device sync sanity check (if provided)
if (
config.enable_sync_validation
and bundle is not None
and bundle.manifest.calibration
and bundle.manifest.calibration.sync_offsets_path
):
from .ingest_validation import validate_sync_offsets_json
sync_res = validate_sync_offsets_json(
bundle.root / bundle.manifest.calibration.sync_offsets_path
)
if not sync_res.ok:
raise ValueError(f"sync_offsets.json validation failed: {sync_res.details}")
gates = run_quality_gates(frames, cfg=QualityGateConfig())
if not gates.passed:
raise ValueError(f"Quality gates failed: {gates.details}")
if bundle is not None:
dev = bundle.get_device(device_id)
if dev.lidar_depth_dir:
cov, cov_details = lidar_coverage(bundle.root / dev.lidar_depth_dir)
if cov is not None and cov < QualityGateConfig().min_lidar_coverage:
raise ValueError(
f"LiDAR coverage gate failed: coverage={cov} details={cov_details}"
)
if model is None:
from ..utils.model_loader import load_da3_model
model = load_da3_model(
model_name=config.model_name,
device=config.device,
use_case="metric_depth",
compile_model=False,
)
# Optional exports for production acceleration.
if config.export_onnx_path:
try:
from ..utils.onnx_export import export_to_onnx # type: ignore
except Exception as e:
raise RuntimeError("ONNX export requires optional deps (torch/onnx).") from e
onnx_path = Path(config.export_onnx_path)
if not (onnx_path.exists() and onnx_path.stat().st_size > 0):
export_to_onnx(model, sample_input=[], output_path=onnx_path)
if config.export_tensorrt_dir:
try:
from ..utils.tensorrt_export import build_tensorrt_engine # type: ignore
except Exception as e:
raise RuntimeError("TensorRT export requires optional deps (tensorrt).") from e
if not config.export_onnx_path:
raise ValueError("export_tensorrt_dir requires export_onnx_path to be set")
out_dir = Path(config.export_tensorrt_dir)
out_dir.mkdir(parents=True, exist_ok=True)
engine_path = out_dir / "model.engine"
if not (engine_path.exists() and engine_path.stat().st_size > 0):
build_tensorrt_engine(
onnx_path=Path(config.export_onnx_path),
engine_path=engine_path,
precision="fp16",
)
logger.info(f"Inference: running model on {len(frames)} frames")
m = model
if m is None: # pragma: no cover (defensive for type-checkers)
raise RuntimeError("Inference model was not initialized")
try:
import torch # type: ignore
no_grad = torch.no_grad
except Exception:
from contextlib import nullcontext
no_grad = nullcontext
with no_grad():
out = m.inference(frames)
depth = np.asarray(out.depth, dtype=np.float32) # (T,H,W)
if depth.ndim != 3:
raise ValueError(f"Expected depth (T,H,W), got {depth.shape}")
# Prefer model-provided σ_z (meters) if available; otherwise fall back to
# temporal consensus.
        sigma_model = None
        sigma_attr = None
        for attr in ("sigma_z", "sigma", "log_sigma"):
            if hasattr(out, attr):
                sigma_model = getattr(out, attr)
                sigma_attr = attr
                break
        if sigma_model is not None:
            s = np.asarray(sigma_model, dtype=np.float32)
            if sigma_attr == "log_sigma":
                s = np.exp(s).astype(np.float32)
if s.ndim == 2:
s = np.repeat(s[None, :, :], depth.shape[0], axis=0)
if s.shape != depth.shape:
raise ValueError(f"Model sigma has wrong shape: {s.shape} vs depth {depth.shape}")
sigma_z = s
else:
sigma_z = temporal_consensus_sigma(depth, window=5)
sigma_z = np.clip(sigma_z, float(config.sigma_min), float(config.sigma_max)).astype(
np.float32
)
calib_prov: Optional[Dict[str, object]] = None
if config.calibration_json:
try:
regime = None
if bundle is not None and bundle.manifest.operating_regime is not None:
regime = str(bundle.manifest.operating_regime.value)
sigma_z, calib_prov = _apply_sigma_calibration_from_json(
sigma_z, config.calibration_json, operating_regime=regime
)
except Exception as e:
logger.warning(f"Failed to apply sigma calibration: {e}")
# Save per-frame outputs
depth_dir = ensure_dir(output_dir / "depth")
unc_dir = ensure_dir(output_dir / "uncertainty")
for t in range(depth.shape[0]):
np.save(depth_dir / f"frame_{t:06d}.npy", depth[t])
np.save(unc_dir / f"frame_{t:06d}.npy", sigma_z[t])
# Optionally run a minimal GTSAM optimization with ray-depth priors.
recon = {"optimized": False}
if config.enable_gtsam_ba and has_gtsam():
from ..gtsam import require_gtsam
gtsam = require_gtsam()
            # Intrinsics: use the bundle's device intrinsics if available; otherwise approximate.
H, W = depth.shape[1:]
if bundle is not None:
K = bundle.load_intrinsics_matrix(device_id).astype(np.float64)
fx = float(K[0, 0])
fy = float(K[1, 1])
cx = float(K[0, 2])
cy = float(K[1, 2])
else:
fx = fy = 0.8 * max(H, W)
cx = W / 2.0
cy = H / 2.0
K = np.array([[fx, 0.0, cx], [0.0, fy, cy], [0.0, 0.0, 1.0]], dtype=np.float64)
calib = gtsam.Cal3_S2(float(fx), float(fy), 0.0, float(cx), float(cy))
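            # gtsam.Cal3_S2 takes (fx, fy, skew, u0, v0); skew is zero here.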
ba_mode = str(getattr(config, "ba_mode", "grid")).lower().strip()
if ba_mode == "tracks":
# Phase 5 scaffold: track-based BA
try:
from .teacher_gtsam_ba import build_problem_from_tracks, run_teacher_ba
from .tracks.orb_track_builder import OrbTrackBuilderConfig, build_orb_tracks
tracks = build_orb_tracks(
frames,
cfg=OrbTrackBuilderConfig(
pair_offsets=tuple(getattr(config, "orb_pair_offsets", (1, 2, 3))),
use_ransac_fmat=bool(getattr(config, "orb_use_ransac_fmat", True)),
min_track_length=int(getattr(config, "orb_min_track_length", 3)),
),
)
poses_init = [np.eye(4, dtype=np.float64) for _ in range(depth.shape[0])]
problem, _ = build_problem_from_tracks(
tracks=tracks,
K=K,
poses_init=poses_init,
depth_stack=depth,
sigma_stack=sigma_z,
max_sigma_prior=float(getattr(config, "sigma_max", 10.0)),
max_tracks=int(getattr(config, "max_tracks", 500)),
)
ba = run_teacher_ba(
problem,
reproj_sigma_px=float(getattr(config, "reproj_sigma_px", 1.5)),
use_isam2=bool(getattr(config, "use_isam2", True)),
isam2_refinement_steps=int(getattr(config, "isam2_refinement_steps", 3)),
)
recon = {
"optimized": True,
"mode": "tracks",
"reproj_rmse_px": float(ba.reproj_rmse_px),
"num_points": int(len(problem.points_init)),
"num_obs": int(len(problem.observations)),
}
pts = []
for j in range(len(problem.points_init)):
lk = gtsam.symbol("L", j)
if ba.optimized_values.exists(lk):
p = np.asarray(
ba.optimized_values.atPoint3(lk), dtype=np.float64
).reshape(3)
pts.append(p.tolist())
(output_dir / "reconstruction_points.json").write_text(
json.dumps({"points": pts})
)
except Exception as e:
recon = {"optimized": False, "mode": "tracks", "error": str(e)}
else:
# Grid-sampled landmarks around the center frame (legacy baseline)
graph = gtsam.NonlinearFactorGraph()
initial = gtsam.Values()
pose_keys = [gtsam.symbol("P", i) for i in range(depth.shape[0])]
prior_noise = gtsam.noiseModel.Diagonal.Sigmas(
np.array([0.1, 0.1, 0.1, 1.0, 1.0, 1.0], dtype=np.float64)
)
for i, pk in enumerate(pose_keys):
pose = gtsam.Pose3(np.eye(4))
initial.insert(pk, pose)
if i == 0:
graph.add(gtsam.PriorFactorPose3(pk, pose, prior_noise))
center = depth.shape[0] // 2
grid = max(8, int(config.sparse_grid))
us = np.arange(grid // 2, W, grid, dtype=np.int32)
vs = np.arange(grid // 2, H, grid, dtype=np.int32)
noise_reproj = gtsam.noiseModel.Isotropic.Sigma(
2, float(getattr(config, "reproj_sigma_px", 1.5))
)
obs_count = 0
lm_idx = 0
for v in vs:
for u in us:
z = float(depth[center, v, u])
if not np.isfinite(z) or z <= 0:
continue
ray = _pixel_to_camera_ray(float(u), float(v), K)
ray = ray / (np.linalg.norm(ray) + 1e-12)
                        # Pose is identity, so the landmark initializes at range z
                        # along the unit camera ray in world coordinates.
                        X_w = ray * z
lk = gtsam.symbol("L", lm_idx)
lm_idx += 1
initial.insert(
lk, gtsam.Point3(float(X_w[0]), float(X_w[1]), float(X_w[2]))
)
meas = gtsam.Point2(float(u), float(v))
graph.add(
gtsam.GenericProjectionFactorCal3_S2(
meas, noise_reproj, pose_keys[center], lk, calib
)
)
spec = RayDepthPriorSpec(
pixel_uv=(float(u), float(v)),
K=K,
z_pred=z,
sigma_z=float(sigma_z[center, v, u]),
)
graph.add(
make_ray_depth_prior_factor(pose_keys[center], lk, spec, robust=True)
)
obs_count += 1
if obs_count > 0:
params = gtsam.LevenbergMarquardtParams()
params.setMaxIterations(25)
optimizer = gtsam.LevenbergMarquardtOptimizer(graph, initial, params)
result = optimizer.optimize()
recon = {
"optimized": True,
"mode": "grid",
"num_landmarks": int(lm_idx),
"num_obs": int(obs_count),
}
pts = []
for j in range(lm_idx):
lk = gtsam.symbol("L", j)
if result.exists(lk):
p = np.asarray(result.atPoint3(lk), dtype=np.float64).reshape(3)
pts.append(p.tolist())
(output_dir / "reconstruction_points.json").write_text(
json.dumps({"points": pts})
)
        # Derived metrology quantities for reporting (per-pixel EAE and 95% interval)
eae, lo95, hi95 = _compute_metrology_bounds(depth, sigma_z)
np.save(output_dir / "eae.npy", eae)
np.save(output_dir / "depth_lower_95.npy", lo95)
np.save(output_dir / "depth_upper_95.npy", hi95)
metadata = {
"input": str(input_path),
"device_id": device_id,
"num_frames": int(depth.shape[0]),
"depth_shape": [int(depth.shape[1]), int(depth.shape[2])],
"gtsam_optimized": bool(recon.get("optimized", False)),
"reconstruction": recon,
"sigma_calibration": calib_prov,
"selected_constraints": constraints_meta,
"operating_regime": (
str(bundle.manifest.operating_regime.value)
if (bundle is not None and bundle.manifest.operating_regime is not None)
else None
),
}
metadata_path = output_dir / "inference_metadata.json"
metadata_path.write_text(json.dumps(metadata, indent=2))
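        # Output directory layout at this point (one run):
        #   depth/frame_*.npy            per-frame depth (float32, meters)
        #   uncertainty/frame_*.npy      per-frame sigma_z (float32, meters)
        #   eae.npy, depth_lower_95.npy, depth_upper_95.npy
        #   reconstruction_points.json   (only when GTSAM BA produced points)
        #   inference_metadata.json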
artifact_uri = None
if artifact_store is not None:
artifact_uri = artifact_store.put_json(metadata)
run = ensure_wandb_run(
required=wandb_required,
project=os.getenv("WANDB_PROJECT", "ylff"),
entity=os.getenv("WANDB_ENTITY"),
name=f"infer-{device_id}",
config={"inference": metadata},
tags=["inference"],
mode=os.getenv("WANDB_MODE"),
)
if run is not None:
log_metrics(
{
"inference/num_frames": int(depth.shape[0]),
"inference/gtsam_optimized": int(bool(recon.get("optimized", False))),
}
)
log_artifact(str(output_dir), name=f"inference_outputs_{device_id}", type="inference")
return {
**metadata,
"metadata_path": str(metadata_path),
"metadata_artifact_uri": artifact_uri,
}