| import torch |
| import torch.nn.functional as F |
|
|
|
|
class MomentumBuffer:
    """Accumulator that blends each new value with a scaled copy of its history.

    After every ``update`` call the stored state becomes
    ``update_value + momentum * running_average``.

    Args:
        momentum: Factor applied to the previously stored average before the
            new value is added. Defaults to ``-0.75``.
    """

    def __init__(self, momentum: float = -0.75):
        # The state starts as the plain integer 0, so the first update
        # effectively stores ``update_value`` itself.
        self.running_average = 0
        self.momentum = momentum

    def update(self, update_value: torch.Tensor):
        """Fold ``update_value`` into the stored running average in place.

        Args:
            update_value: Value to add on top of the scaled history.
        """
        self.running_average = update_value + self.momentum * self.running_average
|
|
|
|
def project(
    v0: torch.Tensor,
    v1: torch.Tensor,
    dims=(-1,),
):
    """Split ``v0`` into components parallel and orthogonal to ``v1``.

    ``v1`` is L2-normalized over ``dims``; the parallel part is the projection
    of ``v0`` onto that unit direction and the orthogonal part is the
    remainder, so ``parallel + orthogonal`` reconstructs ``v0`` (up to
    rounding). The arithmetic is carried out in float64.

    Args:
        v0: Tensor to decompose.
        v1: Tensor giving the reference direction; must broadcast with ``v0``.
        dims: Dimension or dimensions over which the vectors are defined.

    Returns:
        Tuple ``(v0_parallel, v0_orthogonal)``, both cast back to the dtype of
        ``v0`` and placed on the device ``v0`` was originally on.
    """
    dtype = v0.dtype
    # Remember the full device (type AND index). Restoring from the bare type
    # string (e.g. "cuda") would land on the *current* CUDA device, which is
    # not necessarily the one the input lived on (e.g. "cuda:1").
    device = v0.device
    if device.type == "mps":
        # The math below runs in float64; MPS tensors are moved to the CPU
        # first (presumably because float64 is unavailable on MPS).
        v0, v1 = v0.cpu(), v1.cpu()

    v0, v1 = v0.double(), v1.double()
    v1 = torch.nn.functional.normalize(v1, dim=dims)
    v0_parallel = (v0 * v1).sum(dim=dims, keepdim=True) * v1
    v0_orthogonal = v0 - v0_parallel
    return (
        v0_parallel.to(device=device, dtype=dtype),
        v0_orthogonal.to(device=device, dtype=dtype),
    )
|
|
|
|
def apg_forward(
    pred_cond: torch.Tensor,
    pred_uncond: torch.Tensor,
    guidance_scale: float,
    momentum_buffer: MomentumBuffer = None,
    eta: float = 0.0,
    norm_threshold: float = 2.5,
    dims=[-1],
):
    """Combine conditional and unconditional predictions via a projected update.

    The difference ``pred_cond - pred_uncond`` is optionally passed through a
    momentum buffer, optionally rescaled so its L2 norm over ``dims`` does not
    exceed ``norm_threshold``, and then split (via ``project``) into parts
    parallel and orthogonal to ``pred_cond``. The orthogonal part plus ``eta``
    times the parallel part is added to ``pred_cond`` with weight
    ``guidance_scale - 1``.

    Args:
        pred_cond: Conditional prediction.
        pred_uncond: Unconditional prediction.
        guidance_scale: Guidance strength; the update is weighted by
            ``guidance_scale - 1``.
        momentum_buffer: Optional buffer; when given it is updated with the
            current difference and its running average is used instead.
        eta: Weight of the component parallel to ``pred_cond``.
        norm_threshold: Upper bound on the norm of the difference; values
            ``<= 0`` disable the rescaling.
        dims: Dimensions over which norms and projections are taken.

    Returns:
        The guided prediction.
    """
    guidance_delta = pred_cond - pred_uncond

    if momentum_buffer is not None:
        # The buffer mutates its own state; afterwards use the smoothed value.
        momentum_buffer.update(guidance_delta)
        guidance_delta = momentum_buffer.running_average

    if norm_threshold > 0:
        # Shrink (never enlarge) the difference so its norm stays within
        # norm_threshold: the factor is min(1, threshold / norm).
        delta_norm = guidance_delta.norm(p=2, dim=dims, keepdim=True)
        rescale = torch.minimum(
            torch.ones_like(guidance_delta),
            norm_threshold / delta_norm,
        )
        guidance_delta = guidance_delta * rescale

    parallel_part, orthogonal_part = project(guidance_delta, pred_cond, dims)
    steering = orthogonal_part + eta * parallel_part
    return pred_cond + (guidance_scale - 1) * steering
|
|
|
|
def cfg_forward(cond_output, uncond_output, cfg_strength):
    """Blend conditional and unconditional outputs.

    Computes ``uncond_output + cfg_strength * (cond_output - uncond_output)``.

    Args:
        cond_output: Conditional output.
        uncond_output: Unconditional output.
        cfg_strength: Multiplier applied to the conditional-minus-unconditional
            difference.

    Returns:
        The blended output.
    """
    guidance_direction = cond_output - uncond_output
    return uncond_output + cfg_strength * guidance_direction
|
|
|
|
def call_cos_tensor(tensor1, tensor2):
    """
    Calculate the cosine similarity of two tensors along dimension 1.

    Both inputs are L2-normalized along dim 1 inside this function (they do
    not need to be normalized beforehand) and their product is summed along
    the same dimension.

    Args:
        tensor1: First tensor [B, D, ...]
        tensor2: Second tensor [B, D, ...]

    Returns:
        Cosine similarity with dim 1 reduced and kept, i.e. [B, 1] for 2-D
        inputs. Values are guaranteed to lie in [-1, 1]; a zero vector on
        either side yields 0 instead of NaN.
    """

    def _unit(vectors):
        norm = torch.linalg.norm(vectors, dim=1, keepdim=True)
        # Guard against division by zero for all-zero vectors. The floor is
        # the smallest normal number of the dtype, so vectors with any
        # ordinary non-zero norm are divided by exactly the same value as
        # without the guard.
        return vectors / norm.clamp_min(torch.finfo(norm.dtype).tiny)

    cosvalue = torch.sum(_unit(tensor1) * _unit(tensor2), dim=1, keepdim=True)
    # Rounding can push the value marginally outside [-1, 1], which would turn
    # into NaN in a subsequent acos; clamp to the mathematically valid range.
    return cosvalue.clamp(-1.0, 1.0)
|
|
|
|
def compute_perpendicular_component(latent_diff, latent_hat_uncond):
    """
    Decompose latent_diff into parallel and perpendicular components relative to latent_hat_uncond.

    The decomposition is done independently for every length-``c`` vector along
    the last dimension, in float32.

    Args:
        latent_diff: Difference tensor [n, t, c]
        latent_hat_uncond: Reference tensor [n, t, c] (same shape as latent_diff)

    Returns:
        projection: Component of latent_diff parallel to latent_hat_uncond, float32 [n, t, c]
        perpendicular_component: Remainder ``latent_diff - projection``, float32 [n, t, c]

    Raises:
        ValueError: If the two inputs differ in shape, or are not 3-D.
    """
    # Validate BEFORE flattening: once both tensors are reshaped to (n * t, c)
    # their sizes are equal by construction, so a later check could never fire
    # and e.g. (2, 3, 4) vs (3, 2, 4) would be silently accepted.
    if latent_diff.size() != latent_hat_uncond.size():
        raise ValueError("latent_diff and latent_hat_uncond must have the same shape [n, d].")

    n, t, c = latent_diff.shape
    # reshape (rather than view) also accepts non-contiguous inputs.
    diff_rows = latent_diff.reshape(n * t, c).float()
    uncond_rows = latent_hat_uncond.reshape(n * t, c).float()

    dot_product = torch.sum(diff_rows * uncond_rows, dim=1, keepdim=True)
    norm_square = torch.sum(uncond_rows * uncond_rows, dim=1, keepdim=True)
    # The small constant keeps the division finite for all-zero reference rows.
    projection = (dot_product / (norm_square + 1e-8)) * uncond_rows
    perpendicular_component = diff_rows - projection

    return projection.reshape(n, t, c), perpendicular_component.reshape(n, t, c)
|
|
|
|
def adg_forward(
    latents: torch.Tensor,
    noise_pred_cond: torch.Tensor,
    noise_pred_uncond: torch.Tensor,
    sigma: torch.Tensor,
    guidance_scale: float,
    angle_clip: float = 3.14 / 6,
    apply_norm: bool = False,
    apply_clip: bool = True,
):
    """
    ADG (Angle-based Dynamic Guidance) forward pass.

    Both predictions are converted to latent estimates
    ``latents - sigma * prediction``. For every length-``C`` vector, the angle
    between the conditional and unconditional estimate is scaled by the
    guidance weight (and optionally clipped), a new estimate is built from the
    conditional estimate and the component of the difference perpendicular to
    the unconditional estimate, and the result is mapped back to a prediction
    via ``(latents - new_estimate) / sigma``.

    Args:
        latents: Current state [N, T, C]
        noise_pred_cond: Conditional prediction [N, T, C]
        noise_pred_uncond: Unconditional prediction [N, T, C]
        sigma: Python number, or tensor with 1 or N elements; broadcast to [N, 1, 1].
            Must be non-zero because the result is divided by it.
        guidance_scale: Guidance strength; the angle weight is
            ``max(guidance_scale - 1, 0) + 1e-3``.
        angle_clip: Maximum absolute angle when clipping (default: 3.14 / 6)
        apply_norm: Whether to rescale each new estimate vector to the norm of
            the corresponding conditional estimate vector
        apply_clip: Whether to clip the scaled angle to [-angle_clip, angle_clip]

    Returns:
        Guided prediction [N, T, C] in the dtype of ``latents``

    Raises:
        ValueError: If a tensor ``sigma`` has neither 1 nor N elements.
        TypeError: If ``sigma`` is neither a number nor a tensor.
    """
    n, t, c = noise_pred_cond.shape

    # Bring sigma to shape [N, 1, 1] so it broadcasts over T and C.
    if isinstance(sigma, (int, float)):
        sigma = torch.tensor(sigma, device=latents.device, dtype=latents.dtype)
        sigma = sigma.view(1, 1, 1).expand(n, 1, 1)
    elif torch.is_tensor(sigma):
        if sigma.numel() == 1:
            sigma = sigma.reshape(1, 1, 1).expand(n, 1, 1)
        elif sigma.numel() == n:
            sigma = sigma.reshape(n, 1, 1)
        else:
            raise ValueError(f"sigma has incompatible shape. Expected scalar or size {n}, got {sigma.shape}")
    else:
        raise TypeError(f"sigma must be a number or tensor, got {type(sigma)}")

    # Non-positive weights are zeroed; the small offset keeps the weight > 0.
    weight = guidance_scale - 1
    weight = weight * (weight > 0) + 1e-3

    latent_hat_text = latents - sigma * noise_pred_cond
    latent_hat_uncond = latents - sigma * noise_pred_uncond
    latent_diff = latent_hat_text - latent_hat_uncond

    # Per-vector angle between the two estimates, computed in float64.
    cos_theta = call_cos_tensor(
        latent_hat_text.reshape(-1, c).to(torch.float64),
        latent_hat_uncond.reshape(-1, c).to(torch.float64),
    )
    # Rounding can leave |cos| marginally above 1, for which acos returns NaN.
    cos_theta = cos_theta.clamp(-1.0, 1.0)
    # Restore the [N, T, 1] layout. Left as [N*T, 1] the angle only broadcasts
    # correctly against [N, T, C] tensors when N == 1; for larger batches it
    # either raises or silently produces a wrongly shaped result.
    latent_theta = torch.acos(cos_theta).reshape(n, t, 1)

    scaled_theta = weight * latent_theta
    if apply_clip:
        latent_theta_new = torch.clip(scaled_theta, -angle_clip, angle_clip)
    else:
        latent_theta_new = scaled_theta

    _, perp = compute_perpendicular_component(
        latent_diff.contiguous(), latent_hat_uncond.contiguous()
    )
    latent_v_new = torch.cos(latent_theta_new) * latent_hat_text

    # Scale factor for the perpendicular part: sin(theta_new) / sin(theta),
    # falling back to `weight` when sin(theta) is tiny. torch.where selects
    # the branch instead of multiplying by a boolean mask, because
    # 0 * (x / 0) is NaN and would leak through a multiplicative mask.
    sin_theta = torch.sin(latent_theta)
    degenerate = sin_theta <= 1e-3
    safe_sin = torch.where(degenerate, torch.ones_like(sin_theta), sin_theta)
    perp_scale = torch.where(
        degenerate,
        weight * torch.ones_like(sin_theta),
        torch.sin(latent_theta_new) / safe_sin,
    )
    latent_p_new = perp * perp_scale
    latent_new = latent_v_new + latent_p_new

    if apply_norm:
        # Norms are taken along the last (C) dimension, i.e. over the same
        # vectors the angle was computed for. Taking them along dim=1 of an
        # [N, T, C] tensor would instead reduce over T.
        target_norm = torch.linalg.norm(latent_hat_text, dim=-1, keepdim=True)
        current_norm = torch.linalg.norm(latent_new, dim=-1, keepdim=True)
        latent_new = latent_new * target_norm / current_norm

    noise_pred = (latents - latent_new) / sigma
    noise_pred = noise_pred.reshape(n, t, c).to(latents.dtype)
    return noise_pred
|
|
|
|
def adg_w_norm_forward(
    latents: torch.Tensor,
    noise_pred_cond: torch.Tensor,
    noise_pred_uncond: torch.Tensor,
    sigma: float,
    guidance_scale: float,
    angle_clip: float = 3.14 / 3,
):
    """
    ADG variant with normalization and angle clipping both enabled.

    Thin wrapper that forwards every argument to ``adg_forward`` with
    ``apply_norm=True`` and ``apply_clip=True``. Note that the default
    ``angle_clip`` here is ``3.14 / 3``.

    Returns:
        Whatever ``adg_forward`` returns for these settings.
    """
    return adg_forward(
        latents=latents,
        noise_pred_cond=noise_pred_cond,
        noise_pred_uncond=noise_pred_uncond,
        sigma=sigma,
        guidance_scale=guidance_scale,
        angle_clip=angle_clip,
        apply_norm=True,
        apply_clip=True,
    )
|
|
|
|
def adg_wo_clip_forward(
    latents: torch.Tensor,
    noise_pred_cond: torch.Tensor,
    noise_pred_uncond: torch.Tensor,
    sigma: float,
    guidance_scale: float,
):
    """
    ADG variant with angle clipping and normalization both disabled.

    Thin wrapper that forwards its arguments to ``adg_forward`` with
    ``apply_norm=False`` and ``apply_clip=False``, so the scaled angle is used
    without being bounded.

    Returns:
        Whatever ``adg_forward`` returns for these settings.
    """
    return adg_forward(
        latents=latents,
        noise_pred_cond=noise_pred_cond,
        noise_pred_uncond=noise_pred_uncond,
        sigma=sigma,
        guidance_scale=guidance_scale,
        apply_norm=False,
        apply_clip=False,
    )
|
|