import torch
import math


class LatentDiffusionScheduler:
    """Noise schedule plus a deterministic DDIM-style sampler for latents.

    The constructor precomputes ``betas``, ``alphas`` (``1 - betas``) and
    ``alphas_cumprod`` (running product of ``alphas``) as float32 tensors of
    length ``num_train_timesteps``.
    """

    def __init__(
        self,
        num_train_timesteps=1000,
        beta_start=0.00085,
        beta_end=0.012,
        beta_schedule="scaled_linear",
    ):
        """Build the beta schedule and its derived cumulative products.

        Args:
            num_train_timesteps: Number of discrete diffusion timesteps.
            beta_start: Beta value at the first timestep.
            beta_end: Beta value at the last timestep.
            beta_schedule: ``"scaled_linear"`` interpolates linearly between
                the square roots of the endpoints and squares the result.
                Any other value selects plain linear interpolation between
                ``beta_start`` and ``beta_end``.
        """
        self.num_train_timesteps = num_train_timesteps

        if beta_schedule == "scaled_linear":
            # standard SD schedule: linear in sqrt(beta), then squared
            self.betas = (
                torch.linspace(
                    beta_start**0.5,
                    beta_end**0.5,
                    num_train_timesteps,
                    dtype=torch.float32,
                )
                ** 2
            )
        else:
            self.betas = torch.linspace(
                beta_start, beta_end, num_train_timesteps, dtype=torch.float32
            )

        self.alphas = 1.0 - self.betas
        self.alphas_cumprod = torch.cumprod(self.alphas, dim=0)

    def add_noise(
        self,
        original_samples: torch.Tensor,
        noise: torch.Tensor,
        timesteps: torch.Tensor,
    ) -> torch.Tensor:
        """Apply the forward diffusion process to clean samples.

        Computes ``sqrt(a_t) * original_samples + sqrt(1 - a_t) * noise`` where
        ``a_t`` is ``alphas_cumprod`` looked up at ``timesteps``.

        Args:
            original_samples: Clean samples with the batch on the first axis.
                Any number of trailing axes is supported.
            noise: Noise tensor broadcastable against ``original_samples``.
            timesteps: Integer timestep indices, one per batch element (or a
                single scalar shared by the whole batch). A tensor, a Python
                int, or a sequence of ints is accepted.

        Returns:
            The noised samples.
        """
        alphas_cumprod = self.alphas_cumprod.to(
            device=original_samples.device, dtype=original_samples.dtype
        )
        # as_tensor moves an existing tensor to the right device and also
        # lets callers pass plain ints / lists.
        timesteps = torch.as_tensor(timesteps, device=original_samples.device)

        # One singleton axis per non-batch dimension, so the per-sample
        # coefficients broadcast correctly for inputs of any rank instead of
        # only 4-D (B, C, H, W) tensors.
        trailing = (1,) * max(original_samples.ndim - 1, 0)
        alpha_prod = alphas_cumprod[timesteps]
        sqrt_alpha_prod = (alpha_prod**0.5).reshape(-1, *trailing)
        sqrt_one_minus_alpha_prod = ((1 - alpha_prod) ** 0.5).reshape(-1, *trailing)

        noisy_samples = (
            sqrt_alpha_prod * original_samples + sqrt_one_minus_alpha_prod * noise
        )
        return noisy_samples

    def step(
        self,
        model_output: torch.Tensor,
        timestep: int,
        sample: torch.Tensor,
        prev_timestep: int,
        clip_sample: bool = True,
        clip_range: float = 2.0,
    ) -> torch.Tensor:
        """Take one deterministic denoising step from ``timestep`` to ``prev_timestep``.

        ``model_output`` is treated as the predicted noise (epsilon). No fresh
        noise is added, so the update is deterministic.

        Args:
            model_output: Predicted noise for ``sample`` at ``timestep``.
            timestep: Current timestep index into ``alphas_cumprod``.
            sample: Current noisy sample.
            prev_timestep: Target timestep index. A negative value means
                "past the final step" and uses a cumulative alpha of 1.
            clip_sample: If true, clamp the predicted clean sample to
                ``[-clip_range, clip_range]``.
            clip_range: Symmetric clamp bound used when ``clip_sample`` is set.

        Returns:
            The sample at ``prev_timestep``.
        """
        device = sample.device
        dtype = sample.dtype
        alphas_cumprod = self.alphas_cumprod.to(device=device, dtype=dtype)

        alpha_prod_t = alphas_cumprod[timestep]
        if prev_timestep >= 0:
            alpha_prod_t_prev = alphas_cumprod[prev_timestep]
        else:
            # Stepping past t=0: the target is the fully denoised sample.
            alpha_prod_t_prev = torch.tensor(1.0, device=device, dtype=dtype)

        beta_prod_t = 1 - alpha_prod_t
        beta_prod_t_prev = 1 - alpha_prod_t_prev

        # DDIM step for epsilon prediction
        # 1. Predict clean latent (x0)
        pred_original_sample = (
            sample - beta_prod_t**0.5 * model_output
        ) / alpha_prod_t**0.5

        # 2. Optional clamp. Training supervises wider SDXL VAE latents, so
        #    generation can disable this to avoid washing out valid latent detail.
        if clip_sample:
            pred_original_sample = pred_original_sample.clamp(-clip_range, clip_range)

        # 3. Compute direction pointing to xt and combine with the x0 estimate
        pred_prev_sample = (
            alpha_prod_t_prev**0.5 * pred_original_sample
            + beta_prod_t_prev**0.5 * model_output
        )
        return pred_prev_sample

    def set_timesteps(self, num_inference_steps: int, device: torch.device):
        """Return the inference timestep sequence, from high to low.

        Args:
            num_inference_steps: Number of timesteps to produce.
            device: Device on which to place the returned tensor.

        Returns:
            An int64 tensor of ``num_inference_steps`` indices linearly spaced
            from ``num_train_timesteps - 1`` down to ``0`` and rounded to the
            nearest integer.
        """
        # Linear spacing of timesteps
        timesteps = (
            torch.linspace(self.num_train_timesteps - 1, 0, num_inference_steps)
            .round()
            .long()
        )
        return timesteps.to(device)