| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | import math |
| | from dataclasses import dataclass |
| | from typing import List, Optional, Tuple, Union |
| |
|
| | import torch |
| |
|
| | from diffusers.configuration_utils import ConfigMixin, register_to_config |
| | from diffusers.utils import BaseOutput |
| | from diffusers.schedulers.scheduling_utils import SchedulerMixin |
| | import torch.nn.functional as F |
| |
|
| | def gumbel_noise(t, generator=None): |
| | device = generator.device if generator is not None else t.device |
| | noise = torch.zeros_like(t, device=device).uniform_(0, 1, generator=generator).to(t.device) |
| | return -torch.log((-torch.log(noise.clamp(1e-20))).clamp(1e-20)) |
| |
|
| |
|
| | def mask_by_random_topk(mask_len, probs, temperature=1.0, generator=None): |
| | confidence = torch.log(probs.clamp(1e-20)) + temperature * gumbel_noise(probs, generator=generator) |
| | sorted_confidence = torch.sort(confidence, dim=-1).values |
| | cut_off = torch.gather(sorted_confidence, 1, mask_len.long()) |
| | masking = confidence < cut_off |
| | return masking |
| |
|
| |
|
| | @dataclass |
| | class SchedulerOutput(BaseOutput): |
| | """ |
| | Output class for the scheduler's `step` function output. |
| | |
| | Args: |
| | prev_sample (`torch.Tensor` of shape `(batch_size, num_channels, height, width)` for images): |
| | Computed sample `(x_{t-1})` of previous timestep. `prev_sample` should be used as next model input in the |
| | denoising loop. |
| | pred_original_sample (`torch.Tensor` of shape `(batch_size, num_channels, height, width)` for images): |
| | The predicted denoised sample `(x_{0})` based on the model output from the current timestep. |
| | `pred_original_sample` can be used to preview progress or for guidance. |
| | """ |
| |
|
| | prev_sample: torch.Tensor |
| | pred_original_sample: torch.Tensor = None |
| |
|
| |
|
| | class Scheduler(SchedulerMixin, ConfigMixin): |
| | order = 1 |
| |
|
| | temperatures: torch.Tensor |
| |
|
| | @register_to_config |
| | def __init__( |
| | self, |
| | mask_token_id: int, |
| | masking_schedule: str = "cosine", |
| | ): |
| | self.temperatures = None |
| | self.timesteps = None |
| |
|
| | def set_timesteps( |
| | self, |
| | num_inference_steps: int, |
| | temperature: Union[int, Tuple[int, int], List[int]] = (2, 0), |
| | device: Union[str, torch.device] = None, |
| | ): |
| | self.timesteps = torch.arange(num_inference_steps, device=device).flip(0) |
| |
|
| | if isinstance(temperature, (tuple, list)): |
| | self.temperatures = torch.linspace(temperature[0], temperature[1], num_inference_steps, device=device) |
| | else: |
| | self.temperatures = torch.linspace(temperature, 0.01, num_inference_steps, device=device) |
| |
|
| |
|
| | |
| | def top_k_top_p_filtering( |
| | self, |
| | logits, |
| | top_k: int = 0, |
| | top_p: float = 1.0, |
| | filter_value: float = -float("Inf"), |
| | min_tokens_to_keep: int = 1, |
| | ): |
| | """Filter a distribution of logits using top-k and/or nucleus (top-p) filtering |
| | Args: |
| | logits: logits distribution shape (batch size, vocabulary size) |
| | if top_k > 0: keep only top k tokens with highest probability (top-k filtering). |
| | if top_p < 1.0: keep the top tokens with cumulative probability >= top_p (nucleus filtering). |
| | Nucleus filtering is described in Holtzman et al. (http://arxiv.org/abs/1904.09751) |
| | Make sure we keep at least min_tokens_to_keep per batch example in the output |
| | From: https://gist.github.com/thomwolf/1a5a29f6962089e871b94cbd09daf317 |
| | """ |
| | if top_k > 0: |
| | top_k = min(max(top_k, min_tokens_to_keep), logits.size(-1)) |
| | indices_to_remove = logits < torch.topk(logits, top_k)[0][..., -1, None] |
| | logits[indices_to_remove] = filter_value |
| |
|
| | if top_p < 1.0: |
| | sorted_logits, sorted_indices = torch.sort(logits, descending=True) |
| | cumulative_probs = torch.cumsum(F.softmax(sorted_logits, dim=-1), dim=-1) |
| |
|
| | |
| | sorted_indices_to_remove = cumulative_probs > top_p |
| | if min_tokens_to_keep > 1: |
| | sorted_indices_to_remove[..., :min_tokens_to_keep] = 0 |
| | sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone() |
| | sorted_indices_to_remove[..., 0] = 0 |
| |
|
| | indices_to_remove = torch.zeros_like(logits, dtype=torch.bool).scatter_(-1, sorted_indices, sorted_indices_to_remove) |
| | logits[indices_to_remove] = filter_value |
| |
|
| | return logits |
| |
|
| |
|
| | def step( |
| | self, |
| | model_output: torch.Tensor, |
| | timestep: torch.long, |
| | sample: torch.LongTensor, |
| | starting_mask_ratio: int = 1, |
| | generator: Optional[torch.Generator] = None, |
| | return_dict: bool = True, |
| | using_topk_topp: Optional[bool] = False, |
| | sampling_temperature: Optional[float] = 1.0, |
| | ) -> Union[SchedulerOutput, Tuple]: |
| | two_dim_input = sample.ndim == 3 and model_output.ndim == 4 |
| |
|
| | if two_dim_input: |
| | batch_size, codebook_size, height, width = model_output.shape |
| | sample = sample.reshape(batch_size, height * width) |
| | model_output = model_output.reshape(batch_size, codebook_size, height * width).permute(0, 2, 1) |
| |
|
| | unknown_map = sample == self.config.mask_token_id |
| |
|
| | if using_topk_topp: |
| | model_output = model_output / max(sampling_temperature, 1e-5) |
| |
|
| | if using_topk_topp: |
| | top_k=8192 |
| | top_p=0.2 |
| | if top_k > 0 or top_p < 1.0: |
| | model_output = self.top_k_top_p_filtering(model_output, top_k=top_k, top_p=top_p) |
| |
|
| | probs = model_output.softmax(dim=-1) |
| |
|
| | device = probs.device |
| | probs_ = probs.to(generator.device) if generator is not None else probs |
| | if probs_.device.type == "cpu" and probs_.dtype != torch.float32: |
| | probs_ = probs_.float() |
| | probs_ = probs_.reshape(-1, probs.size(-1)) |
| | pred_original_sample = torch.multinomial(probs_, 1, generator=generator).to(device=device) |
| | pred_original_sample = pred_original_sample[:, 0].view(*probs.shape[:-1]) |
| | pred_original_sample = torch.where(unknown_map, pred_original_sample, sample) |
| |
|
| | if timestep == 0: |
| | prev_sample = pred_original_sample |
| | else: |
| | seq_len = sample.shape[1] |
| | step_idx = (self.timesteps == timestep).nonzero() |
| | ratio = (step_idx + 1) / len(self.timesteps) |
| |
|
| | if self.config.masking_schedule == "cosine": |
| | mask_ratio = torch.cos(ratio * math.pi / 2) |
| | elif self.config.masking_schedule == "linear": |
| | mask_ratio = 1 - ratio |
| | else: |
| | raise ValueError(f"unknown masking schedule {self.config.masking_schedule}") |
| |
|
| | mask_ratio = starting_mask_ratio * mask_ratio |
| |
|
| | mask_len = (seq_len * mask_ratio).floor() |
| | |
| | mask_len = torch.min(unknown_map.sum(dim=-1, keepdim=True) - 1, mask_len) |
| | |
| | mask_len = torch.max(torch.tensor([1], device=model_output.device), mask_len) |
| |
|
| | selected_probs = torch.gather(probs, -1, pred_original_sample[:, :, None])[:, :, 0] |
| | |
| | selected_probs = torch.where(unknown_map, selected_probs, torch.finfo(selected_probs.dtype).max) |
| |
|
| | masking = mask_by_random_topk(mask_len, selected_probs, self.temperatures[step_idx], generator) |
| |
|
| | |
| | prev_sample = torch.where(masking, self.config.mask_token_id, pred_original_sample) |
| |
|
| | if two_dim_input: |
| | prev_sample = prev_sample.reshape(batch_size, height, width) |
| | pred_original_sample = pred_original_sample.reshape(batch_size, height, width) |
| |
|
| | if not return_dict: |
| | return (prev_sample, pred_original_sample) |
| |
|
| | return SchedulerOutput(prev_sample, pred_original_sample) |
| |
|
| | def add_noise(self, sample, timesteps, generator=None): |
| | step_idx = (self.timesteps == timesteps).nonzero() |
| | ratio = (step_idx + 1) / len(self.timesteps) |
| |
|
| | if self.config.masking_schedule == "cosine": |
| | mask_ratio = torch.cos(ratio * math.pi / 2) |
| | elif self.config.masking_schedule == "linear": |
| | mask_ratio = 1 - ratio |
| | else: |
| | raise ValueError(f"unknown masking schedule {self.config.masking_schedule}") |
| |
|
| | mask_indices = ( |
| | torch.rand( |
| | sample.shape, device=generator.device if generator is not None else sample.device, generator=generator |
| | ).to(sample.device) |
| | < mask_ratio |
| | ) |
| |
|
| | masked_sample = sample.clone() |
| |
|
| | masked_sample[mask_indices] = self.config.mask_token_id |
| |
|
| | return masked_sample |
| |
|