| from PIL import Image |
| import numpy as np |
| import os |
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
| import cv2 |
| import random |
| from omegaconf import DictConfig |
| from ppd.utils.diffusion.timesteps import Timesteps |
| from ppd.utils.diffusion.schedule import LinearSchedule |
| from ppd.utils.diffusion.sampler import EulerSampler |
| from ppd.utils.transform import video2tensor |
| from ppd.utils.align_vda import align_video_depth |
|
|
| from ppd.models.dit_video import DiT_Video |
| from safetensors.torch import load_file |
|
|
| |
# Sliding-window layout used by the video inference loop.
INFER_LEN = 16  # number of frames processed together in one window
OVERLAP = 3  # leading slots of each window that are refilled from the previous window
KEYFRAMES = [0, 8, 15]  # positions in the previous window used to refill those slots
STRIDE = INFER_LEN - OVERLAP  # start-to-start distance between consecutive windows (13)
|
|
class PixelPerfectVideoDepth(nn.Module):
    """Video depth model that denoises fixed-size windows of frames with a DiT.

    A frozen semantics encoder produces a conditioning signal for each window,
    and an Euler sampler driven by ``DiT_Video`` turns Gaussian noise into a
    per-window prediction. Windows are ``INFER_LEN`` frames long and start
    every ``STRIDE`` frames; the per-window results are merged by
    ``align_video_depth``.
    """

    def __init__(
        self,
        semantics_model='Pi3',
        semantics_pth='checkpoints/pi3.safetensors',
        sampling_steps=4,
    ):
        """Build the semantics encoder, the diffusion sampler and the DiT.

        Args:
            semantics_model: Name of the semantics encoder. Only ``'Pi3'`` is
                implemented.
            semantics_pth: Path to the safetensors checkpoint loaded into the
                semantics encoder.
            sampling_steps: Number of sampling steps handed to ``Timesteps``.

        Raises:
            ValueError: If ``semantics_model`` is not a supported encoder.
        """
        super().__init__()
        # Fail early with a clear message instead of an AttributeError on
        # ``self.sem_encoder`` further down.
        if semantics_model != 'Pi3':
            raise ValueError(
                f"Unsupported semantics_model {semantics_model!r}; only 'Pi3' is available."
            )

        self.sampling_steps = sampling_steps
        self.device = torch.device(
            'cuda' if torch.cuda.is_available()
            else 'mps' if torch.backends.mps.is_available()
            else 'cpu'
        )

        # Imported lazily so the Pi3 package is only required when it is used.
        from ppd.models.pi3.models.pi3 import Pi3
        self.sem_encoder = Pi3()
        self.sem_encoder.load_state_dict(load_file(semantics_pth))

        # The encoder is used for inference only: eval mode, no gradients.
        self.sem_encoder = self.sem_encoder.to(self.device).eval()
        self.sem_encoder.requires_grad_(False)

        self.configure_diffusion()
        # NOTE(review): the DiT is created without weights and is not moved to
        # ``self.device`` here; presumably the caller loads a checkpoint and
        # calls ``.to(...)`` on the whole module - confirm.
        self.dit_video = DiT_Video()

    def configure_diffusion(self):
        """Create the schedule, sampling timesteps and Euler sampler."""
        self.schedule = LinearSchedule(T=1000)
        self.sampling_timesteps = Timesteps(
            T=self.schedule.T,
            steps=self.sampling_steps,
            device=self.device,
        )
        self.sampler = EulerSampler(
            schedule=self.schedule,
            timesteps=self.sampling_timesteps,
            prediction_type='velocity',
        )

    @staticmethod
    def _padded_length(num_frames, window, stride):
        """Return the smallest length >= ``num_frames`` that windows tile exactly.

        Valid lengths are ``window + k * stride`` for ``k >= 0``, so a clip
        shorter than one window is padded up to a full window.
        """
        if num_frames <= window:
            return window
        extra_windows = -(-(num_frames - window) // stride)  # ceiling division
        return window + extra_windows * stride

    @torch.no_grad()
    def infer_video(self, images, use_fp16: bool = True):
        """Predict depth for every frame of a video.

        Frames are resized to 512x512, padded by repeating the last frame so
        that the windows cover the clip exactly, denoised window by window,
        resized back to the first frame's spatial size and merged with
        ``align_video_depth``.

        Args:
            images: Video frames in whatever form ``video2tensor`` accepts.
            use_fp16: When True, run the denoising loop under
                ``torch.autocast`` with reduced precision (bfloat16 on CUDA
                devices that support it, float16 otherwise). When False,
                autocast is disabled.

        Returns:
            The output of ``align_video_depth`` truncated to the number of
            input frames (padding frames are dropped).

        Raises:
            ValueError: If the video contains no frames.
        """
        images = video2tensor(images)
        if len(images) == 0:
            raise ValueError('infer_video requires at least one frame.')
        images = [img.to(self.device) for img in images]
        frames = [
            F.interpolate(img, size=(512, 512), mode='bilinear', align_corners=False)
            for img in images
        ]
        num_frames = len(frames)

        # Repeat the last frame until the clip length is INFER_LEN + k * STRIDE.
        # The frames are only read downstream (torch.cat copies them), so the
        # same tensor object can be shared by all padding slots.
        pad_len = self._padded_length(num_frames, INFER_LEN, STRIDE) - num_frames
        frames.extend([frames[-1]] * pad_len)

        # Only query CUDA bf16 support when actually running on CUDA.
        if self.device.type == 'cuda' and torch.cuda.is_bf16_supported():
            autocast_dtype = torch.bfloat16
        else:
            autocast_dtype = torch.float16
        with torch.autocast(
            device_type=self.device.type,
            dtype=autocast_dtype,
            enabled=use_fp16,
        ):
            preds = self.forward_test(frames)

        # Bring each window back to the resolution of the original frames.
        preds = [
            F.interpolate(pred, size=images[0].shape[-2:], mode='bilinear', align_corners=False)
            for pred in preds
        ]
        preds = align_video_depth(preds, INFER_LEN, KEYFRAMES, OVERLAP)
        return preds[:num_frames]

    @torch.no_grad()
    def forward_test(self, imgs):
        """Run the sampler on each window of ``imgs`` and collect the outputs.

        Args:
            imgs: List of frame tensors. Each is concatenated along dim 0 and
                indexed as 4-D (``shape[2]``, ``shape[3]`` are used as the
                spatial size), so a leading batch dimension of 1 is expected
                - presumably ``(1, C, H, W)``; verify against ``video2tensor``.

        Returns:
            A list with one tensor per window (``latent + 0.5`` after the last
            sampling step). The list is empty when fewer than ``INFER_LEN``
            frames are supplied.
        """
        preds = []
        prev_window = None
        # One noise tensor is drawn up front and reused as the starting latent
        # of every window.
        init_latent = torch.randn(
            size=[INFER_LEN, 1, imgs[0].shape[2], imgs[0].shape[3]]
        ).to(self.device)

        for start in range(0, len(imgs) - INFER_LEN + 1, STRIDE):
            window = imgs[start:start + INFER_LEN]  # slicing copies the list
            if prev_window is not None:
                # Replace the first OVERLAP slots with key frames taken from
                # the previous (already substituted) window.
                window[:OVERLAP] = [prev_window[k] for k in KEYFRAMES]
            prev_window = window

            stacked = torch.cat(window, dim=0)
            semantics = self.semantics_prompt(stacked)
            cond = stacked - 0.5
            latent = init_latent

            for timestep in self.sampling_timesteps:
                model_input = torch.cat([latent, cond], dim=1)
                pred = self.dit_video(x=model_input, semantics=semantics, timestep=timestep)
                latent = self.sampler.step(pred=pred, x_t=latent, t=timestep)

            preds.append(latent + 0.5)
        return preds

    @torch.no_grad()
    def semantics_prompt(self, images):
        """Return ``sem_encoder.forward_semantics(images)`` without gradients."""
        return self.sem_encoder.forward_semantics(images)
|
|