""" Demo: run LiDAR-Perfect Depth on a video sequence with the temporal Kalman filter looped between frames. Usage: python run_lpd_video.py \ --sequence /mnt/sig/datasets/eval_video/bonn_rgbd/rgbd_bonn_balloon \ --dataset bonn \ --weights checkpoints/ppd.pth \ --out outputs/bonn_balloon Requires: PPD weights + DA-V2 weights + RAFT weights under checkpoints/. """ from __future__ import annotations import argparse import os from pathlib import Path import imageio.v2 as imageio import numpy as np import torch from omegaconf import OmegaConf from ppd.lpd.lpd_train import LiDARPerfectDepth from ppd.lpd.lpd_video import run_video, VideoInferenceConfig from ppd.utils.vis_utils import visualize_depth def _load_pipeline(ppd_weights: str, device: str = "cuda") -> LiDARPerfectDepth: cfg = OmegaConf.create( { "pretrain": False, "semantics_model": "DA2", "semantics_pth": "checkpoints/depth_anything_v2_vitl.pth", "ppd_weights": ppd_weights, "freeze_backbone": True, "lambda_anchor": 0.0, "R_proj": 0.1, "proj_alpha": 0.1, "init_P": 1.0, "sparse": {"pattern": "hybrid", "density": 0.005, "n_lines": 64, "line_density": 0.5, "grid_stride": 32, "min_points": 16, "measurement_noise_std": 0.0}, "score_model": {"depth": 24, "hidden_size": 1024, "patch_size": 8, "num_heads": 16, "in_channels": 4, "out_channels": 1}, "diffusion": { "schedule": {"type": "lerp", "T": 1000}, "sampler": {"type": "euler", "prediction_type": "v_lerp"}, "timesteps": { "training": {"type": "logitnormal", "loc": 0.0, "scale": 1.0}, "sampling": {"type": "uniform", "steps": 4}, }, }, } ) pipeline = LiDARPerfectDepth(cfg).to(device).eval() return pipeline def _yield_bonn(seq_dir: str, max_frames: int | None = None): rgb_dir = Path(seq_dir) / "rgb" dpt_dir = Path(seq_dir) / "depth" rgb_files = sorted(p for p in rgb_dir.iterdir() if p.suffix == ".png") dpt_files = sorted(p for p in dpt_dir.iterdir() if p.suffix == ".png") n = min(len(rgb_files), len(dpt_files)) if max_frames is not None: n = min(n, max_frames) for i in range(n): rgb = imageio.imread(rgb_files[i]).astype(np.float32) / 255.0 depth = imageio.imread(dpt_files[i]).astype(np.float32) / 5000.0 mask = (depth > 0.01) & (depth < 10.0) yield { "image": torch.from_numpy(rgb).permute(2, 0, 1)[None], "depth": torch.from_numpy(depth)[None, None], "mask": torch.from_numpy(mask.astype(np.uint8))[None, None], "frame_idx": i, } def main(): p = argparse.ArgumentParser() p.add_argument("--sequence", required=True, help="path to a video sequence") p.add_argument("--dataset", default="bonn", choices=["bonn"]) p.add_argument("--weights", default="checkpoints/ppd.pth") p.add_argument("--out", default="outputs/lpd_video") p.add_argument("--max-frames", type=int, default=None) p.add_argument("--device", default="cuda") args = p.parse_args() os.makedirs(args.out, exist_ok=True) print(f"[lpd_video] loading pipeline from {args.weights}") pipeline = _load_pipeline(args.weights, args.device) if args.dataset == "bonn": frames = list(_yield_bonn(args.sequence, args.max_frames)) else: raise ValueError(f"unknown dataset: {args.dataset}") print(f"[lpd_video] running on {len(frames)} frames") cfg = VideoInferenceConfig() with torch.autocast(device_type="cuda", dtype=torch.bfloat16): outs = run_video(pipeline, frames, config=cfg) print(f"[lpd_video] writing visualizations to {args.out}") for i, (frame, out) in enumerate(zip(frames, outs)): depth_np = out["depth"][0, 0].float().cpu().numpy() var_np = out["kalman_variance_running"][0, 0].float().cpu().numpy() vis_d = visualize_depth(depth_np) vis_v = visualize_depth(var_np, var_np.min(), var_np.max()) rgb_np = frame["image"][0].permute(1, 2, 0).numpy() panel = np.concatenate([rgb_np, vis_d, vis_v], axis=1) imageio.imwrite(os.path.join(args.out, f"{i:05d}.png"), (panel * 255).astype(np.uint8)) print("[lpd_video] done") if __name__ == "__main__": main()