# LiDAR-Perfect-Depth / code / run_lpd_video.py
# (source: chenming-wu, commit 436b829, verified)
"""
Demo: run LiDAR-Perfect Depth on a video sequence with the temporal Kalman
filter looped between frames.
Usage:
python run_lpd_video.py \
--sequence /mnt/sig/datasets/eval_video/bonn_rgbd/rgbd_bonn_balloon \
--dataset bonn \
--weights checkpoints/ppd.pth \
--out outputs/bonn_balloon
Requires: PPD weights + DA-V2 weights + RAFT weights under checkpoints/.
"""
from __future__ import annotations
import argparse
import os
from pathlib import Path
import imageio.v2 as imageio
import numpy as np
import torch
from omegaconf import OmegaConf
from ppd.lpd.lpd_train import LiDARPerfectDepth
from ppd.lpd.lpd_video import run_video, VideoInferenceConfig
from ppd.utils.vis_utils import visualize_depth
def _load_pipeline(ppd_weights: str, device: str = "cuda") -> LiDARPerfectDepth:
    """Build a LiDARPerfectDepth pipeline on *device* and switch it to eval mode.

    Only ``ppd_weights`` varies between runs; the remaining config mirrors
    the fixed inference layout (DA-V2 semantics backbone, hybrid sparse
    pattern, 4-step uniform sampling).
    """
    # Sparse-depth sampling pattern (hybrid scanline + grid).
    sparse_cfg = {
        "pattern": "hybrid",
        "density": 0.005,
        "n_lines": 64,
        "line_density": 0.5,
        "grid_stride": 32,
        "min_points": 16,
        "measurement_noise_std": 0.0,
    }
    # Score-network (transformer) dimensions.
    score_cfg = {
        "depth": 24,
        "hidden_size": 1024,
        "patch_size": 8,
        "num_heads": 16,
        "in_channels": 4,
        "out_channels": 1,
    }
    # Diffusion schedule/sampler: lerp schedule, euler sampler, 4 uniform steps.
    diffusion_cfg = {
        "schedule": {"type": "lerp", "T": 1000},
        "sampler": {"type": "euler", "prediction_type": "v_lerp"},
        "timesteps": {
            "training": {"type": "logitnormal", "loc": 0.0, "scale": 1.0},
            "sampling": {"type": "uniform", "steps": 4},
        },
    }
    cfg = OmegaConf.create(
        {
            "pretrain": False,
            "semantics_model": "DA2",
            "semantics_pth": "checkpoints/depth_anything_v2_vitl.pth",
            "ppd_weights": ppd_weights,
            "freeze_backbone": True,
            "lambda_anchor": 0.0,
            "R_proj": 0.1,
            "proj_alpha": 0.1,
            "init_P": 1.0,
            "sparse": sparse_cfg,
            "score_model": score_cfg,
            "diffusion": diffusion_cfg,
        }
    )
    return LiDARPerfectDepth(cfg).to(device).eval()
def _yield_bonn(seq_dir: str, max_frames: int | None = None):
    """Yield Bonn RGB-D frames as dicts of batched tensors, oldest first.

    Each item carries a 1xCxHxW float image scaled to [0, 1], a 1x1xHxW
    depth map (depth PNG values divided by 5000), a uint8 validity mask for
    depths in (0.01, 10.0), and the integer frame index.
    """
    seq = Path(seq_dir)
    rgb_paths = sorted(p for p in (seq / "rgb").iterdir() if p.suffix == ".png")
    depth_paths = sorted(p for p in (seq / "depth").iterdir() if p.suffix == ".png")
    # zip() truncates to the shorter listing; slicing with [:None] is a no-op,
    # so this reproduces the min(len, max_frames) cap of the original loop.
    for idx, (rgb_path, depth_path) in enumerate(list(zip(rgb_paths, depth_paths))[:max_frames]):
        rgb = imageio.imread(rgb_path).astype(np.float32) / 255.0
        depth = imageio.imread(depth_path).astype(np.float32) / 5000.0
        valid = (depth > 0.01) & (depth < 10.0)
        yield {
            "image": torch.from_numpy(rgb).permute(2, 0, 1)[None],
            "depth": torch.from_numpy(depth)[None, None],
            "mask": torch.from_numpy(valid.astype(np.uint8))[None, None],
            "frame_idx": idx,
        }
def main():
    """CLI entry point: load the pipeline, run video inference with the
    temporal Kalman filter, and write RGB | depth | variance panels.

    Side effects: creates ``--out`` if missing and writes one PNG per frame.
    Raises ``ValueError`` for an unknown ``--dataset``.
    """
    p = argparse.ArgumentParser()
    p.add_argument("--sequence", required=True, help="path to a video sequence")
    p.add_argument("--dataset", default="bonn", choices=["bonn"])
    p.add_argument("--weights", default="checkpoints/ppd.pth")
    p.add_argument("--out", default="outputs/lpd_video")
    p.add_argument("--max-frames", type=int, default=None)
    p.add_argument("--device", default="cuda")
    args = p.parse_args()
    os.makedirs(args.out, exist_ok=True)
    print(f"[lpd_video] loading pipeline from {args.weights}")
    pipeline = _load_pipeline(args.weights, args.device)
    if args.dataset == "bonn":
        frames = list(_yield_bonn(args.sequence, args.max_frames))
    else:
        raise ValueError(f"unknown dataset: {args.dataset}")
    print(f"[lpd_video] running on {len(frames)} frames")
    cfg = VideoInferenceConfig()
    # Fix: autocast previously hard-coded device_type="cuda", which broke
    # --device cpu; derive the type ("cuda"/"cpu") from the requested device
    # (torch.device also normalizes forms like "cuda:0").
    autocast_device = torch.device(args.device).type
    with torch.autocast(device_type=autocast_device, dtype=torch.bfloat16):
        outs = run_video(pipeline, frames, config=cfg)
    print(f"[lpd_video] writing visualizations to {args.out}")
    for i, (frame, out) in enumerate(zip(frames, outs)):
        depth_np = out["depth"][0, 0].float().cpu().numpy()
        var_np = out["kalman_variance_running"][0, 0].float().cpu().numpy()
        vis_d = visualize_depth(depth_np)
        # Variance is visualized with its own min/max as the color range.
        vis_v = visualize_depth(var_np, var_np.min(), var_np.max())
        rgb_np = frame["image"][0].permute(1, 2, 0).numpy()
        panel = np.concatenate([rgb_np, vis_d, vis_v], axis=1)
        # Clip before the uint8 cast so values marginally outside [0, 1]
        # don't wrap around (e.g. 1.004 * 255 -> 0 after the cast).
        panel8 = (np.clip(panel, 0.0, 1.0) * 255).astype(np.uint8)
        imageio.imwrite(os.path.join(args.out, f"{i:05d}.png"), panel8)
    print("[lpd_video] done")
if __name__ == "__main__":
    # Run the CLI only when executed as a script, not when imported.
    main()