World_Model / URSA /diffnext /schedulers /scheduling_ddpm.py
BryanW's picture
Add files using upload-large-folder tool
b6ff324 verified
# Copyright 2024 UC Berkeley Team and The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# DISCLAIMER: This file is strongly influenced by https://github.com/ermongroup/ddim
import math
from dataclasses import dataclass
from typing import List, Optional, Tuple, Union
import numpy as np
import torch
from diffusers.configuration_utils import ConfigMixin, register_to_config
from diffusers.models.modeling_outputs import BaseOutput
from diffusers.utils.torch_utils import randn_tensor
from diffusers.schedulers.scheduling_utils import KarrasDiffusionSchedulers, SchedulerMixin
@dataclass
class DDPMSchedulerOutput(BaseOutput):
"""Output class for the scheduler's `step` function output."""
prev_sample: torch.Tensor
pred_original_sample: Optional[torch.Tensor] = None
def betas_for_alpha_bar(num_diffusion_timesteps, max_beta=0.999, alpha_transform_type="cosine"):
"""Create a beta schedule that discretizes the given alpha_t_bar function."""
if alpha_transform_type == "cosine":
alpha_bar_fn = lambda t: math.cos((t + 0.008) / 1.008 * math.pi / 2) ** 2 # noqa
elif alpha_transform_type == "exp":
alpha_bar_fn = lambda t: math.exp(t * -12.0) # noqa
else:
raise ValueError(f"Unsupported alpha_transform_type: {alpha_transform_type}")
betas = []
for i in range(num_diffusion_timesteps):
t1 = i / num_diffusion_timesteps
t2 = (i + 1) / num_diffusion_timesteps
betas.append(min(1 - alpha_bar_fn(t2) / alpha_bar_fn(t1), max_beta))
return torch.tensor(betas, dtype=torch.float32)
def rescale_zero_terminal_snr(betas):
"""Rescales betas to have zero terminal SNR."""
# Convert betas to alphas_bar_sqrt
alphas = 1.0 - betas
alphas_cumprod = torch.cumprod(alphas, dim=0)
alphas_bar_sqrt = alphas_cumprod.sqrt()
# Store old values.
alphas_bar_sqrt_0 = alphas_bar_sqrt[0].clone()
alphas_bar_sqrt_T = alphas_bar_sqrt[-1].clone()
# Shift so the last timestep is zero.
alphas_bar_sqrt -= alphas_bar_sqrt_T
# Scale so the first timestep is back to the old value.
alphas_bar_sqrt *= alphas_bar_sqrt_0 / (alphas_bar_sqrt_0 - alphas_bar_sqrt_T)
# Convert alphas_bar_sqrt to betas
alphas_bar = alphas_bar_sqrt**2 # Revert sqrt
alphas = alphas_bar[1:] / alphas_bar[:-1] # Revert cumprod
alphas = torch.cat([alphas_bar[0:1], alphas])
betas = 1 - alphas
return betas
class DDPMScheduler(SchedulerMixin, ConfigMixin):
"""
`DDPMScheduler` explores the connections between denoising score matching and Langevin dynamics sampling.
This model inherits from [`SchedulerMixin`] and [`ConfigMixin`]. Check the superclass documentation for the generic
methods the library implements for all schedulers such as loading and saving.
Args:
num_train_timesteps (`int`, defaults to 1000):
The number of diffusion steps to train the model.
beta_start (`float`, defaults to 0.0001):
The starting `beta` value of inference.
beta_end (`float`, defaults to 0.02):
The final `beta` value.
beta_schedule (`str`, defaults to `"linear"`):
The beta schedule, a mapping from a beta range to a sequence of betas for stepping the model. Choose from
`linear`, `scaled_linear`, or `squaredcos_cap_v2`.
trained_betas (`np.ndarray`, *optional*):
An array of betas to pass directly to the constructor without using `beta_start` and `beta_end`.
variance_type (`str`, defaults to `"fixed_small"`):
Clip the variance when adding noise to the denoised sample. Choose from `fixed_small`, `fixed_small_log`,
`fixed_large`, `fixed_large_log`, `learned` or `learned_range`.
clip_sample (`bool`, defaults to `True`):
Clip the predicted sample for numerical stability.
clip_sample_range (`float`, defaults to 1.0):
The maximum magnitude for sample clipping. Valid only when `clip_sample=True`.
prediction_type (`str`, defaults to `epsilon`, *optional*):
Prediction type of the scheduler function; can be `epsilon` (predicts the noise of the diffusion process),
`sample` (directly predicts the noisy sample`) or `v_prediction` (see section 2.4 of [Imagen
Video](https://imagen.research.google/video/paper.pdf) paper).
thresholding (`bool`, defaults to `False`):
Whether to use the "dynamic thresholding" method. This is unsuitable for latent-space diffusion models such
as Stable Diffusion.
dynamic_thresholding_ratio (`float`, defaults to 0.995):
The ratio for the dynamic thresholding method. Valid only when `thresholding=True`.
sample_max_value (`float`, defaults to 1.0):
The threshold value for dynamic thresholding. Valid only when `thresholding=True`.
timestep_spacing (`str`, defaults to `"leading"`):
The way the timesteps should be scaled. Refer to Table 2 of the [Common Diffusion Noise Schedules and
Sample Steps are Flawed](https://huggingface.co/papers/2305.08891) for more information.
steps_offset (`int`, defaults to 0):
An offset added to the inference steps, as required by some model families.
rescale_betas_zero_snr (`bool`, defaults to `False`):
Whether to rescale the betas to have zero terminal SNR. This enables the model to generate very bright and
dark samples instead of limiting it to samples with medium brightness. Loosely related to
[`--offset_noise`](https://github.com/huggingface/diffusers/blob/74fd735eb073eb1d774b1ab4154a0876eb82f055/examples/dreambooth/train_dreambooth.py#L506).
""" # noqa
_compatibles = [e.name for e in KarrasDiffusionSchedulers]
order = 1
@register_to_config
def __init__(
self,
num_train_timesteps: int = 1000,
beta_start: float = 0.0001,
beta_end: float = 0.02,
beta_schedule: str = "linear",
trained_betas: Optional[Union[np.ndarray, List[float]]] = None,
variance_type: str = "fixed_small",
clip_sample: bool = True,
prediction_type: str = "epsilon",
thresholding: bool = False,
dynamic_thresholding_ratio: float = 0.995,
clip_sample_range: float = 1.0,
sample_max_value: float = 1.0,
timestep_spacing: str = "leading",
steps_offset: int = 0,
rescale_betas_zero_snr: int = False,
):
if trained_betas is not None:
self.betas = torch.tensor(trained_betas, dtype=torch.float32)
elif beta_schedule == "linear":
self.betas = torch.linspace(
beta_start, beta_end, num_train_timesteps, dtype=torch.float32
)
elif beta_schedule == "scaled_linear":
a, b = beta_start**0.5, beta_end**0.5
self.betas = torch.linspace(a, b, num_train_timesteps, dtype=torch.float32) ** 2
elif beta_schedule == "squaredcos_cap_v2": # Glide cosine schedule
self.betas = betas_for_alpha_bar(num_train_timesteps)
elif beta_schedule == "sigmoid": # GeoDiff sigmoid schedule
betas = torch.linspace(-6, 6, num_train_timesteps)
self.betas = torch.sigmoid(betas) * (beta_end - beta_start) + beta_start
else:
raise NotImplementedError(f"{beta_schedule} is not implemented for {self.__class__}")
# Rescale for zero SNR
if rescale_betas_zero_snr:
self.betas = rescale_zero_terminal_snr(self.betas)
self.alphas = 1.0 - self.betas
self.alphas_cumprod = torch.cumprod(self.alphas, dim=0)
self.one = torch.tensor(1.0)
self.init_noise_sigma = 1.0
self.custom_timesteps = False
self.num_inference_steps = None
self.timesteps = torch.from_numpy(np.arange(num_train_timesteps)[::-1].copy())
self.variance_type = variance_type
def scale_model_input(
self, sample: torch.Tensor, timestep: Optional[int] = None
) -> torch.Tensor:
"""Scale the denoising model input depending on the current timestep."""
return sample
def sample_timesteps(self, size, device=None):
return torch.randint(0, self.config.num_train_timesteps, size, device=device)
def set_timesteps(
self,
num_inference_steps: Optional[int] = None,
device: Union[str, torch.device] = None,
timesteps: Optional[List[int]] = None,
):
"""Sets the discrete timesteps used for the diffusion chain (to be run before inference)."""
if num_inference_steps is not None and timesteps is not None:
raise ValueError("Can only pass one of `num_inference_steps` or `custom_timesteps`.")
self.custom_timesteps = timesteps is not None
self.num_inference_steps = num_inference_steps
if timesteps is not None:
timesteps = np.array(timesteps, dtype=np.int64)
# See Table 2. of https://arxiv.org/abs/2305.08891
elif self.config.timestep_spacing == "linspace":
timesteps = np.linspace(0, self.config.num_train_timesteps - 1, num_inference_steps)
timesteps = timesteps.round()[::-1].copy().astype(np.int64)
elif self.config.timestep_spacing == "leading":
step_ratio = self.config.num_train_timesteps // self.num_inference_steps
timesteps = np.arange(0, num_inference_steps) * step_ratio
timesteps = timesteps.round()[::-1].copy().astype(np.int64) + self.config.steps_offset
elif self.config.timestep_spacing == "trailing":
step_ratio = self.config.num_train_timesteps / self.num_inference_steps
timesteps = np.arange(self.config.num_train_timesteps, 0, -step_ratio)
timesteps = timesteps.round().astype(np.int64) - 1
else:
raise ValueError(f"{self.config.timestep_spacing} is not supported.")
self.timesteps = torch.as_tensor(timesteps, device=device)
def _get_variance(self, t, predicted_variance=None):
prev_t = self.previous_timestep(t)
alpha_prod_t = self.alphas_cumprod[t]
alpha_prod_t_prev = self.alphas_cumprod[prev_t] if prev_t >= 0 else self.one
current_beta_t = 1 - alpha_prod_t / alpha_prod_t_prev
# For t > 0, compute predicted variance βt (see formula (6) and (7) from https://arxiv.org/pdf/2006.11239.pdf) # noqa
# and sample from it to get previous sample
# x_{t-1} ~ N(pred_prev_sample, variance) == add variance to pred_sample
variance = (1 - alpha_prod_t_prev) / (1 - alpha_prod_t) * current_beta_t
# we always take the log of variance, so clamp it to ensure it's not 0
variance = torch.clamp(variance, min=1e-20)
if self.config.variance_type == "fixed_small_log": # for rl-diffuser
return torch.exp(0.5 * variance.log())
elif self.config.variance_type == "fixed_large":
return current_beta_t
elif self.config.variance_type == "fixed_large_log": # Glide max_log
return torch.log(current_beta_t)
elif self.config.variance_type == "learned":
return predicted_variance
elif self.config.variance_type == "learned_range":
frac = (predicted_variance + 1) / 2
min_log, max_log = variance.log(), torch.log(current_beta_t)
return frac * max_log + (1 - frac) * min_log
return variance
def step(
self,
model_output: torch.Tensor,
timestep: int,
sample: torch.Tensor,
generator=None,
return_dict: bool = True,
) -> Union[DDPMSchedulerOutput, Tuple]:
"""
Predict the sample from the previous timestep by reversing the SDE. This function propagates the diffusion
process from the learned model outputs (most often the predicted noise).
Args:
model_output (`torch.Tensor`):
The direct output from learned diffusion model.
timestep (`float`):
The current discrete timestep in the diffusion chain.
sample (`torch.Tensor`):
A current instance of a sample created by the diffusion process.
generator (`torch.Generator`, *optional*):
A random number generator.
return_dict (`bool`, *optional*, defaults to `True`):
Whether or not to return a [`~schedulers.scheduling_ddpm.DDPMSchedulerOutput`] or `tuple`.
Returns:
[`~schedulers.scheduling_ddpm.DDPMSchedulerOutput`] or `tuple`:
If return_dict is `True`, [`~schedulers.scheduling_ddpm.DDPMSchedulerOutput`] is returned, otherwise a
tuple is returned where the first element is the sample tensor.
""" # noqa
t = timestep
prev_t = self.previous_timestep(t)
predicted_variance = None
if self.variance_type in ("learned", "learned_range"):
if model_output.shape[1] == sample.shape[1] * 2:
model_output, predicted_variance = model_output.chunk(2, dim=1)
# 1. compute alphas, betas
alpha_prod_t = self.alphas_cumprod[t]
alpha_prod_t_prev = self.alphas_cumprod[prev_t] if prev_t >= 0 else self.one
beta_prod_t = 1 - alpha_prod_t
beta_prod_t_prev = 1 - alpha_prod_t_prev
current_alpha_t = alpha_prod_t / alpha_prod_t_prev
current_beta_t = 1 - current_alpha_t
# 2. compute predicted original sample from predicted noise also called
# "predicted x_0" of formula (15) from https://arxiv.org/pdf/2006.11239.pdf
if self.config.prediction_type == "epsilon":
pred_sample = (sample - beta_prod_t**0.5 * model_output) / alpha_prod_t**0.5
elif self.config.prediction_type == "sample":
pred_sample = model_output
elif self.config.prediction_type == "v_prediction":
pred_sample = alpha_prod_t**0.5 * sample - beta_prod_t**0.5 * model_output
else:
raise ValueError(f"Unsupported prediction type given as {self.config.prediction_type}.")
# 4. Compute coefficients for pred_sample x_0 and current sample x_t
# See formula (7) from https://arxiv.org/pdf/2006.11239.pdf
pred_sample_coeff = alpha_prod_t_prev**0.5 * current_beta_t / beta_prod_t
current_sample_coeff = current_alpha_t**0.5 * beta_prod_t_prev / beta_prod_t
# 5. Compute predicted previous sample µ_t
# See formula (7) from https://arxiv.org/pdf/2006.11239.pdf
prev_sample = pred_sample_coeff * pred_sample + current_sample_coeff * sample
# 6. Add noise
if t > 0:
device, dtype = model_output.device, model_output.dtype
noise = randn_tensor(sample.shape, generator=generator, device=device, dtype=dtype)
if self.variance_type == "fixed_small_log":
variance = self._get_variance(t, predicted_variance)
elif self.variance_type == "learned_range":
variance = self._get_variance(t, predicted_variance).mul(0.5).exp()
else:
variance = self._get_variance(t, predicted_variance) ** 0.5
prev_sample.add_(noise.mul_(variance))
if not return_dict:
return (prev_sample,)
return DDPMSchedulerOutput(prev_sample=prev_sample)
def previous_timestep(self, timestep):
if self.custom_timesteps:
index = (self.timesteps == timestep).nonzero(as_tuple=True)[0][0]
if index == self.timesteps.shape[0] - 1:
return torch.tensor(-1)
return self.timesteps[index + 1]
num_inference_steps = self.num_inference_steps or self.config.num_train_timesteps
return timestep - self.config.num_train_timesteps // num_inference_steps
def add_noise(
self, original_samples: torch.Tensor, noise: torch.Tensor, timesteps: torch.Tensor
) -> torch.Tensor:
timesteps = timesteps.to(device=original_samples.device)
self.alphas_cumprod = self.alphas_cumprod.to(device=original_samples.device)
alphas_cumprod = self.alphas_cumprod.to(dtype=original_samples.dtype)
sqrt_alpha_prod = alphas_cumprod[timesteps] ** 0.5
sqrt_one_minus_alpha_prod = (1 - alphas_cumprod[timesteps]) ** 0.5
expand_shape = timesteps.shape + (1,) * (noise.dim() - timesteps.dim())
sqrt_alpha_prod = sqrt_alpha_prod.view(expand_shape)
sqrt_one_minus_alpha_prod = sqrt_one_minus_alpha_prod.view(expand_shape)
return sqrt_alpha_prod * original_samples + sqrt_one_minus_alpha_prod * noise
def get_velocity(
self, sample: torch.Tensor, noise: torch.Tensor, timesteps: torch.Tensor
) -> torch.Tensor:
timesteps = timesteps.to(sample.device)
self.alphas_cumprod = self.alphas_cumprod.to(device=sample.device)
alphas_cumprod = self.alphas_cumprod.to(dtype=sample.dtype)
sqrt_alpha_prod = alphas_cumprod[timesteps] ** 0.5
sqrt_one_minus_alpha_prod = (1 - alphas_cumprod[timesteps]) ** 0.5
expand_shape = timesteps.shape + (1,) * (noise.dim() - timesteps.dim())
sqrt_alpha_prod = sqrt_alpha_prod.view(expand_shape)
sqrt_one_minus_alpha_prod = sqrt_one_minus_alpha_prod.view(expand_shape)
return sqrt_alpha_prod * noise - sqrt_one_minus_alpha_prod * sample
def __len__(self):
return self.config.num_train_timesteps