| """ |
| Demo: run LiDAR-Perfect Depth on a video sequence with the temporal Kalman |
| filter looped between frames. |
| |
| Usage: |
| python run_lpd_video.py \ |
| --sequence /mnt/sig/datasets/eval_video/bonn_rgbd/rgbd_bonn_balloon \ |
| --dataset bonn \ |
| --weights checkpoints/ppd.pth \ |
| --out outputs/bonn_balloon |
| |
| Requires: PPD weights + DA-V2 weights + RAFT weights under checkpoints/. |
| """ |
| from __future__ import annotations |
|
|
| import argparse |
| import os |
| from pathlib import Path |
|
|
| import imageio.v2 as imageio |
| import numpy as np |
| import torch |
| from omegaconf import OmegaConf |
|
|
| from ppd.lpd.lpd_train import LiDARPerfectDepth |
| from ppd.lpd.lpd_video import run_video, VideoInferenceConfig |
| from ppd.utils.vis_utils import visualize_depth |
|
|
|
|
def _load_pipeline(ppd_weights: str, device: str = "cuda") -> LiDARPerfectDepth:
    """Build a LiDARPerfectDepth pipeline configured for inference.

    Args:
        ppd_weights: Path to the PPD checkpoint loaded by the pipeline.
        device: Torch device string the pipeline is moved to.

    Returns:
        The pipeline on ``device``, switched to eval mode.
    """
    # Sparse-depth sampling pattern (hybrid scanline + grid points).
    sparse_cfg = {
        "pattern": "hybrid",
        "density": 0.005,
        "n_lines": 64,
        "line_density": 0.5,
        "grid_stride": 32,
        "min_points": 16,
        "measurement_noise_std": 0.0,
    }
    # Score-network (transformer) hyperparameters.
    score_cfg = {
        "depth": 24,
        "hidden_size": 1024,
        "patch_size": 8,
        "num_heads": 16,
        "in_channels": 4,
        "out_channels": 1,
    }
    # Diffusion schedule / sampler / timestep settings (4 sampling steps).
    diffusion_cfg = {
        "schedule": {"type": "lerp", "T": 1000},
        "sampler": {"type": "euler", "prediction_type": "v_lerp"},
        "timesteps": {
            "training": {"type": "logitnormal", "loc": 0.0, "scale": 1.0},
            "sampling": {"type": "uniform", "steps": 4},
        },
    }
    config = OmegaConf.create(
        {
            "pretrain": False,
            "semantics_model": "DA2",
            "semantics_pth": "checkpoints/depth_anything_v2_vitl.pth",
            "ppd_weights": ppd_weights,
            "freeze_backbone": True,
            "lambda_anchor": 0.0,
            "R_proj": 0.1,
            "proj_alpha": 0.1,
            "init_P": 1.0,
            "sparse": sparse_cfg,
            "score_model": score_cfg,
            "diffusion": diffusion_cfg,
        }
    )
    return LiDARPerfectDepth(config).to(device).eval()
|
|
|
|
def _yield_bonn(seq_dir: str, max_frames: int | None = None):
    """Yield Bonn RGB-D frames as batched tensors, one dict per frame.

    Pairs the i-th rgb PNG with the i-th depth PNG in sorted filename order.
    Depth PNGs are divided by 5000 to obtain meters (the TUM/Bonn scale
    convention); depths outside (0.01, 10.0) m are masked as invalid.

    Yields:
        dict with keys ``image`` (1,3,H,W float), ``depth`` (1,1,H,W float),
        ``mask`` (1,1,H,W uint8) and ``frame_idx`` (int).
    """
    root = Path(seq_dir)
    rgb_files = sorted(f for f in (root / "rgb").iterdir() if f.suffix == ".png")
    dpt_files = sorted(f for f in (root / "depth").iterdir() if f.suffix == ".png")
    count = min(len(rgb_files), len(dpt_files))
    if max_frames is not None:
        count = min(count, max_frames)
    pairs = zip(rgb_files[:count], dpt_files[:count])
    for idx, (rgb_path, dpt_path) in enumerate(pairs):
        rgb = imageio.imread(rgb_path).astype(np.float32) / 255.0
        depth = imageio.imread(dpt_path).astype(np.float32) / 5000.0
        valid = (depth > 0.01) & (depth < 10.0)
        yield {
            "image": torch.from_numpy(rgb).permute(2, 0, 1)[None],
            "depth": torch.from_numpy(depth)[None, None],
            "mask": torch.from_numpy(valid.astype(np.uint8))[None, None],
            "frame_idx": idx,
        }
|
|
|
|
def main():
    """CLI entry point: load the pipeline, run the sequence through the
    temporal Kalman-filtered video inference, and write per-frame panels
    (rgb | depth | running Kalman variance) to ``--out``."""
    p = argparse.ArgumentParser()
    p.add_argument("--sequence", required=True, help="path to a video sequence")
    p.add_argument("--dataset", default="bonn", choices=["bonn"])
    p.add_argument("--weights", default="checkpoints/ppd.pth")
    p.add_argument("--out", default="outputs/lpd_video")
    p.add_argument("--max-frames", type=int, default=None)
    p.add_argument("--device", default="cuda")
    args = p.parse_args()

    os.makedirs(args.out, exist_ok=True)
    print(f"[lpd_video] loading pipeline from {args.weights}")
    pipeline = _load_pipeline(args.weights, args.device)

    if args.dataset == "bonn":
        frames = list(_yield_bonn(args.sequence, args.max_frames))
    else:
        raise ValueError(f"unknown dataset: {args.dataset}")

    print(f"[lpd_video] running on {len(frames)} frames")
    cfg = VideoInferenceConfig()
    # BUG FIX: device_type was hardcoded to "cuda", contradicting the
    # --device flag (e.g. --device cpu would still request CUDA autocast).
    # Derive the autocast device type from the requested device instead.
    autocast_device = "cuda" if args.device.startswith("cuda") else "cpu"
    # Pure inference: disable autograd so no graph is kept across frames.
    with torch.no_grad(), torch.autocast(device_type=autocast_device, dtype=torch.bfloat16):
        outs = run_video(pipeline, frames, config=cfg)

    print(f"[lpd_video] writing visualizations to {args.out}")
    for i, (frame, out) in enumerate(zip(frames, outs)):
        depth_np = out["depth"][0, 0].float().cpu().numpy()
        var_np = out["kalman_variance_running"][0, 0].float().cpu().numpy()
        vis_d = visualize_depth(depth_np)
        # Normalize variance to its own min/max so the colormap spans the frame.
        vis_v = visualize_depth(var_np, var_np.min(), var_np.max())
        rgb_np = frame["image"][0].permute(1, 2, 0).numpy()
        panel = np.concatenate([rgb_np, vis_d, vis_v], axis=1)
        # Clip before the uint8 cast: values slightly outside [0, 1] would
        # otherwise wrap around (e.g. 1.004 * 255 -> 0) in the saved PNG.
        imageio.imwrite(
            os.path.join(args.out, f"{i:05d}.png"),
            np.clip(panel * 255.0, 0.0, 255.0).astype(np.uint8),
        )

    print("[lpd_video] done")
|
|
|
|
# Run the CLI only when this file is executed directly (not on import).
if __name__ == "__main__":
    main()
|
|