|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import logging |
|
|
from typing import Dict, Optional, Union |
|
|
|
|
|
import numpy as np |
|
|
import torch |
|
|
import torch.nn as nn |
|
|
import torch.nn.functional as F |
|
|
from diffusers import ( |
|
|
AutoencoderKL, |
|
|
DDIMScheduler, |
|
|
DiffusionPipeline, |
|
|
LCMScheduler, |
|
|
UNet2DConditionModel, |
|
|
) |
|
|
from diffusers.utils import BaseOutput |
|
|
from PIL import Image |
|
|
from torch.utils.data import DataLoader, TensorDataset |
|
|
from torchvision.transforms import InterpolationMode |
|
|
from torchvision.transforms.functional import pil_to_tensor, resize |
|
|
from tqdm.auto import tqdm |
|
|
from transformers import CLIPTextModel, CLIPTokenizer |
|
|
|
|
|
from .util.batchsize import find_batch_size |
|
|
from .util.ensemble import ensemble_depth |
|
|
from .util.image_util import ( |
|
|
chw2hwc, |
|
|
colorize_depth_maps, |
|
|
get_tv_resample_method, |
|
|
resize_max_res, |
|
|
) |
|
|
from DA2.depth_anything_v2.dpt import DepthAnythingV2 |
|
|
|
|
|
class MarigoldDepthOutput(BaseOutput):
    """
    Output class for Marigold monocular depth prediction pipeline.

    Args:
        depth_np (`np.ndarray`):
            Predicted depth map, with depth values in the range of [0, 1].
        depth_colored (`PIL.Image.Image` or `None`):
            Colorized depth map as a PIL image (H x W x 3, uint8), or `None`
            when colorization was skipped (`color_map=None`).
        uncertainty (`None` or `np.ndarray`):
            Uncalibrated uncertainty (MAD, median absolute deviation) coming
            from ensembling; `None` when `ensemble_size == 1`.
    """

    # Predicted depth in [0, 1].
    depth_np: np.ndarray
    # Colorized visualization, or None if colorization was skipped.
    depth_colored: Optional[Image.Image]
    # Per-pixel MAD from ensembling, or None without ensembling.
    uncertainty: Optional[np.ndarray]
|
|
|
|
|
|
|
|
class MarigoldPipeline(DiffusionPipeline):
    """
    Pipeline for monocular depth estimation using Marigold: https://marigoldmonodepth.github.io.

    This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods the
    library implements for all the pipelines (such as downloading or saving, running on a particular device, etc.)

    NOTE(review): an earlier version of this docstring documented a `scheduler`
    argument, but `__init__` neither accepts nor registers one. `self.scheduler`
    (read by `_check_inference_step`) must therefore be attached externally
    (e.g. via `from_pretrained`) — confirm the intended usage.

    Args:
        unet (`UNet2DConditionModel`):
            Conditional U-Net to denoise the depth latent, conditioned on image latent.
        vae (`AutoencoderKL`):
            Variational Auto-Encoder (VAE) Model to encode and decode images and depth maps
            to and from latent representations.
        text_encoder (`CLIPTextModel`):
            Text-encoder, for empty text embedding.
        tokenizer (`CLIPTokenizer`):
            CLIP tokenizer.
        scale_invariant (`bool`, *optional*):
            A model property specifying whether the predicted depth maps are scale-invariant. This value must be set in
            the model config. When used together with the `shift_invariant=True` flag, the model is also called
            "affine-invariant". NB: overriding this value is not supported.
        shift_invariant (`bool`, *optional*):
            A model property specifying whether the predicted depth maps are shift-invariant. This value must be set in
            the model config. When used together with the `scale_invariant=True` flag, the model is also called
            "affine-invariant". NB: overriding this value is not supported.
        default_denoising_steps (`int`, *optional*):
            The minimum number of denoising diffusion steps that are required to produce a prediction of reasonable
            quality with the given model. This value must be set in the model config. When the pipeline is called
            without explicitly setting `num_inference_steps`, the default value is used. This is required to ensure
            reasonable results with various model flavors compatible with the pipeline, such as those relying on very
            short denoising schedules (`LCMScheduler`) and those with full diffusion schedules (`DDIMScheduler`).
        default_processing_resolution (`int`, *optional*):
            The recommended value of the `processing_resolution` parameter of the pipeline. This value must be set in
            the model config. When the pipeline is called without explicitly setting `processing_resolution`, the
            default value is used. This is required to ensure reasonable results with various model flavors trained
            with varying optimal processing resolution values.
    """

    # Stable Diffusion VAE latent scaling factor; applied symmetrically in
    # `encode_rgb` and `decode_depth`.
    rgb_latent_scale_factor = 0.18215
    depth_latent_scale_factor = 0.18215

    def __init__(
        self,
        unet: UNet2DConditionModel,
        vae: AutoencoderKL,
        text_encoder: CLIPTextModel,
        tokenizer: CLIPTokenizer,
        scale_invariant: Optional[bool] = True,
        shift_invariant: Optional[bool] = True,
        default_denoising_steps: Optional[int] = None,
        default_processing_resolution: Optional[int] = None,
    ):
        super().__init__()
        self.register_modules(
            unet=unet,
            vae=vae,
            text_encoder=text_encoder,
            tokenizer=tokenizer,
        )
        self.register_to_config(
            scale_invariant=scale_invariant,
            shift_invariant=shift_invariant,
            default_denoising_steps=default_denoising_steps,
            default_processing_resolution=default_processing_resolution,
        )

        # Mirror config values as plain attributes for convenient access.
        self.scale_invariant = scale_invariant
        self.shift_invariant = shift_invariant
        self.default_denoising_steps = default_denoising_steps
        self.default_processing_resolution = default_processing_resolution

        # Lazily computed by `encode_empty_text` on first inference.
        self.empty_text_embed = None

        # NOTE(review): `_fft_masks` is never read or written elsewhere in
        # this file — possibly vestigial; confirm before removing.
        self._fft_masks = {}

        # Hard-coded Depth-Anything-V2 (ViT-B) configuration used as the
        # monocular depth prior in `single_infer`.
        da2_config = {
            'encoder': 'vitb',
            'features': 128,
            'out_channels': [96, 192, 384, 768],
        }

        # BUGFIX: the original guarded this with `if da2_config is not None`,
        # but `da2_config` is the literal dict defined just above, so the
        # `else: self.da2 = None` branch was unreachable dead code.
        self.da2 = DepthAnythingV2(**da2_config)
        # NOTE(review): the checkpoint path is hard-coded and relative to the
        # current working directory; `torch.load` without `weights_only=True`
        # unpickles arbitrary objects — only load trusted checkpoints.
        self.da2.load_state_dict(
            torch.load(
                f'./DA2/checkpoints/depth_anything_v2_{da2_config["encoder"]}.pth',
                map_location='cpu',
            )
        )
        self.da2.to(device="cpu").eval()
|
|
|
|
|
|
|
|
    @torch.no_grad()
    def __call__(
        self,
        input_image: Union[Image.Image, torch.Tensor],
        denoising_steps: Optional[int] = None,
        ensemble_size: int = 1,
        processing_res: Optional[int] = None,
        match_input_res: bool = True,
        resample_method: str = "bilinear",
        batch_size: int = 0,
        color_map: str = "Spectral",
        show_progress_bar: bool = True,
        ensemble_kwargs: Dict = None,
    ) -> MarigoldDepthOutput:
        """
        Function invoked when calling the pipeline.

        Args:
            input_image (`Image.Image` or `torch.Tensor`):
                Input RGB image: either a PIL image, or a tensor of shape
                [1, 3, H, W] with values in the 0-255 range.
            denoising_steps (`int`, *optional*, defaults to `None`):
                Accepted for API compatibility but currently UNUSED in this
                variant: `single_infer` runs one deterministic U-Net pass
                instead of an iterative denoising loop.
            ensemble_size (`int`, *optional*, defaults to `1`):
                Number of predictions to be ensembled.
                NOTE(review): only one copy of the input is ever created
                (`expand(1, ...)` below), so values > 1 just run
                `ensemble_depth` over a single prediction — confirm whether
                `expand(ensemble_size, ...)` was intended.
            processing_res (`int`, *optional*, defaults to `None`):
                Effective processing resolution. When set to `0`, processes at the original image resolution. This
                produces crisper predictions, but may also lead to the overall loss of global context. The default
                value `None` resolves to `self.default_processing_resolution` from the model config.
            match_input_res (`bool`, *optional*, defaults to `True`):
                Resize depth prediction to match input resolution.
                Only valid if `processing_res` > 0.
            resample_method (`str`, *optional*, defaults to `bilinear`):
                Resampling method used to resize images and depth predictions. One of `bilinear`, `bicubic` or
                `nearest`.
            batch_size (`int`, *optional*, defaults to `0`):
                Inference batch size. If set to 0, a batch size of 1 is used
                (the automatic `find_batch_size` heuristic is not wired in here).
            color_map (`str`, *optional*, defaults to `"Spectral"`, pass `None` to skip colorized depth map generation):
                Colormap used to colorize the depth map.
            show_progress_bar (`bool`, *optional*, defaults to `True`):
                Display a progress bar over inference batches.
            ensemble_kwargs (`dict`, *optional*, defaults to `None`):
                Extra keyword arguments forwarded to `ensemble_depth`.

        Returns:
            `MarigoldDepthOutput`: Output class for Marigold monocular depth prediction pipeline, including:
            - **depth_np** (`np.ndarray`) Predicted depth map, with depth values in the range of [0, 1]
            - **depth_colored** (`PIL.Image.Image`) Colorized depth map, None if `color_map` is `None`
            - **uncertainty** (`None` or `np.ndarray`) Uncalibrated uncertainty (MAD, median absolute deviation)
              coming from ensembling. None if `ensemble_size = 1`
        """

        # Resolve the processing resolution from the model config if not given.
        if processing_res is None:
            processing_res = self.default_processing_resolution

        # NOTE(review): if both `processing_res` and the config default are
        # None, this raises TypeError rather than AssertionError — confirm the
        # model config always provides `default_processing_resolution`.
        assert processing_res >= 0

        # Map the string name to a torchvision InterpolationMode.
        resample_method: InterpolationMode = get_tv_resample_method(resample_method)

        # ----------------- Image Preprocess -----------------
        if isinstance(input_image, Image.Image):
            input_image = input_image.convert("RGB")

            # [H, W, rgb] PIL image -> [1, rgb, H, W] tensor
            rgb = pil_to_tensor(input_image)
            rgb = rgb.unsqueeze(0)
        elif isinstance(input_image, torch.Tensor):
            rgb = input_image
        else:
            raise TypeError(f"Unknown input type: {type(input_image) = }")
        input_size = rgb.shape
        assert (
            4 == rgb.dim() and 3 == input_size[-3]
        ), f"Wrong input shape {input_size}, expected [1, rgb, H, W]"

        # Downscale so the longer edge fits `processing_res` (0 = keep as-is).
        if processing_res > 0:
            rgb = resize_max_res(
                rgb,
                max_edge_resolution=processing_res,
                resample_method=resample_method,
            )

        # Normalize 0-255 pixel values to [-1, 1] for the VAE encoder.
        rgb_norm: torch.Tensor = rgb / 255.0 * 2.0 - 1.0
        rgb_norm = rgb_norm.to(self.dtype)
        assert rgb_norm.min() >= -1.0 and rgb_norm.max() <= 1.0

        # ----------------- Predicting depth -----------------
        # NOTE(review): only a single copy regardless of `ensemble_size`.
        duplicated_rgb = rgb_norm.expand(1, -1, -1, -1)
        single_rgb_dataset = TensorDataset(duplicated_rgb)
        if batch_size > 0:
            _bs = batch_size
        else:
            # Fall back to batch size 1 (no automatic sizing here).
            _bs = 1

        single_rgb_loader = DataLoader(
            single_rgb_dataset, batch_size=_bs, shuffle=False
        )

        # Run single-pass inference per batch and collect raw predictions.
        depth_pred_ls = []
        if show_progress_bar:
            iterable = tqdm(
                single_rgb_loader, desc=" " * 2 + "Inference batches", leave=False
            )
        else:
            iterable = single_rgb_loader
        for batch in iterable:
            (batched_img,) = batch
            depth_pred_raw = self.single_infer(
                rgb_in=batched_img,
            )
            depth_pred_ls.append(depth_pred_raw.detach())
        depth_preds = torch.concat(depth_pred_ls, dim=0)
        torch.cuda.empty_cache()

        # ----------------- Ensembling -----------------
        if ensemble_size > 1:
            depth_pred, pred_uncert = ensemble_depth(
                depth_preds,
                scale_invariant=self.scale_invariant,
                shift_invariant=self.shift_invariant,
                max_res=50,
                **(ensemble_kwargs or {}),
            )
        else:
            depth_pred = depth_preds
            pred_uncert = None

        # Resize the prediction back to the original input resolution.
        if match_input_res:
            depth_pred = resize(
                depth_pred,
                input_size[-2:],
                interpolation=resample_method,
                antialias=True,
            )

        # Convert to a NumPy array on CPU.
        depth_pred = depth_pred.squeeze()
        depth_pred = depth_pred.cpu().numpy()
        if pred_uncert is not None:
            pred_uncert = pred_uncert.squeeze().cpu().numpy()

        # Clip out-of-range values introduced by resizing/ensembling.
        depth_pred = depth_pred.clip(0, 1)

        # Colorize the depth map into an HWC uint8 PIL image, if requested.
        if color_map is not None:
            depth_colored = colorize_depth_maps(
                depth_pred, 0, 1, cmap=color_map
            ).squeeze()  # [3, H, W], values in (0, 1)
            depth_colored = (depth_colored * 255).astype(np.uint8)
            depth_colored_hwc = chw2hwc(depth_colored)
            depth_colored_img = Image.fromarray(depth_colored_hwc)
        else:
            depth_colored_img = None

        return MarigoldDepthOutput(
            depth_np=depth_pred,
            depth_colored=depth_colored_img,
            uncertainty=pred_uncert,
        )
|
|
|
|
|
def _check_inference_step(self, n_step: int) -> None: |
|
|
""" |
|
|
Check if denoising step is reasonable |
|
|
Args: |
|
|
n_step (`int`): denoising steps |
|
|
""" |
|
|
assert n_step >= 1 |
|
|
|
|
|
if isinstance(self.scheduler, DDIMScheduler): |
|
|
if n_step < 10: |
|
|
logging.warning( |
|
|
f"Too few denoising steps: {n_step}. Recommended to use the LCM checkpoint for few-step inference." |
|
|
) |
|
|
elif isinstance(self.scheduler, LCMScheduler): |
|
|
if not 1 <= n_step <= 4: |
|
|
logging.warning( |
|
|
f"Non-optimal setting of denoising steps: {n_step}. Recommended setting is 1-4 steps." |
|
|
) |
|
|
else: |
|
|
raise RuntimeError(f"Unsupported scheduler type: {type(self.scheduler)}") |
|
|
|
|
|
def encode_empty_text(self): |
|
|
""" |
|
|
Encode text embedding for empty prompt |
|
|
""" |
|
|
prompt = "" |
|
|
text_inputs = self.tokenizer( |
|
|
prompt, |
|
|
padding="do_not_pad", |
|
|
max_length=self.tokenizer.model_max_length, |
|
|
truncation=True, |
|
|
return_tensors="pt", |
|
|
) |
|
|
text_input_ids = text_inputs.input_ids.to(self.text_encoder.device) |
|
|
self.empty_text_embed = self.text_encoder(text_input_ids)[0].to(self.dtype) |
|
|
|
|
|
@torch.no_grad() |
|
|
def single_infer( |
|
|
self, |
|
|
rgb_in: torch.Tensor, |
|
|
) -> torch.Tensor: |
|
|
""" |
|
|
Perform an individual depth prediction without ensembling. |
|
|
|
|
|
Args: |
|
|
rgb_in (`torch.Tensor`): |
|
|
Input RGB image. |
|
|
num_inference_steps (`int`): |
|
|
Number of diffusion denoisign steps (DDIM) during inference. |
|
|
show_pbar (`bool`): |
|
|
Display a progress bar of diffusion denoising. |
|
|
generator (`torch.Generator`) |
|
|
Random generator for initial noise generation. |
|
|
Returns: |
|
|
`torch.Tensor`: Predicted depth map. |
|
|
""" |
|
|
device = self.device |
|
|
rgb_in = rgb_in.to(device) |
|
|
depth_da2 = self.da2.infer_batch(rgb_in).to(device) |
|
|
|
|
|
with torch.no_grad(): |
|
|
|
|
|
rgb_latent = self.encode_rgb(rgb_in) |
|
|
depth_da2_latent = self.encode_rgb(depth_da2) |
|
|
|
|
|
|
|
|
if self.empty_text_embed is None: |
|
|
self.encode_empty_text() |
|
|
batch_empty_text_embed = self.empty_text_embed.repeat( |
|
|
(rgb_latent.shape[0], 1, 1) |
|
|
).to(device) |
|
|
|
|
|
|
|
|
unet_input = torch.cat( |
|
|
[depth_da2_latent, rgb_latent],dim=1 |
|
|
) |
|
|
|
|
|
depth_latent = self.unet( |
|
|
unet_input, 1, encoder_hidden_states=batch_empty_text_embed |
|
|
).sample |
|
|
|
|
|
depth = self.decode_depth(depth_latent) |
|
|
|
|
|
|
|
|
depth = torch.clip(depth, -1.0, 1.0) |
|
|
|
|
|
depth = (depth + 1.0) / 2.0 |
|
|
|
|
|
return depth |
|
|
|
|
|
def encode_rgb(self, rgb_in: torch.Tensor) -> torch.Tensor: |
|
|
""" |
|
|
Encode RGB image into latent. |
|
|
|
|
|
Args: |
|
|
rgb_in (`torch.Tensor`): |
|
|
Input RGB image to be encoded. |
|
|
|
|
|
Returns: |
|
|
`torch.Tensor`: Image latent. |
|
|
""" |
|
|
|
|
|
h = self.vae.encoder(rgb_in) |
|
|
moments = self.vae.quant_conv(h) |
|
|
mean, logvar = torch.chunk(moments, 2, dim=1) |
|
|
|
|
|
rgb_latent = mean * self.rgb_latent_scale_factor |
|
|
return rgb_latent |
|
|
|
|
|
def decode_depth(self, depth_latent: torch.Tensor) -> torch.Tensor: |
|
|
""" |
|
|
Decode depth latent into depth map. |
|
|
|
|
|
Args: |
|
|
depth_latent (`torch.Tensor`): |
|
|
Depth latent to be decoded. |
|
|
|
|
|
Returns: |
|
|
`torch.Tensor`: Decoded depth map. |
|
|
""" |
|
|
|
|
|
depth_latent = depth_latent / self.depth_latent_scale_factor |
|
|
|
|
|
z = self.vae.post_quant_conv(depth_latent) |
|
|
stacked = self.vae.decoder(z) |
|
|
|
|
|
depth_mean = stacked.mean(dim=1, keepdim=True) |
|
|
return depth_mean |
|
|
|