| import numbers |
| from typing import Dict, Optional, Tuple |
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
| from einops import rearrange |
|
|
| class RMSNorm(nn.Module): |
| def __init__(self, dim, eps: float, elementwise_affine: bool = True): |
| super().__init__() |
|
|
| self.eps = eps |
|
|
| if isinstance(dim, numbers.Integral): |
| dim = (dim,) |
|
|
| self.dim = torch.Size(dim) |
|
|
| if elementwise_affine: |
| self.weight = nn.Parameter(torch.ones(dim)) |
| else: |
| self.weight = None |
|
|
| def forward(self, hidden_states): |
| input_dtype = hidden_states.dtype |
| variance = hidden_states.to(torch.float32).pow(2).mean(-1, keepdim=True) |
| hidden_states = hidden_states * torch.rsqrt(variance + self.eps) |
|
|
| if self.weight is not None: |
| |
| if self.weight.dtype in [torch.float16, torch.bfloat16]: |
| hidden_states = hidden_states.to(self.weight.dtype) |
| hidden_states = hidden_states * self.weight |
| else: |
| hidden_states = hidden_states.to(input_dtype) |
|
|
| return hidden_states |
|
|
| class IPAFluxAttnProcessor2_0(nn.Module): |
| """Attention processor used typically in processing the SD3-like self-attention projections.""" |
|
|
| def __init__(self, hidden_size, cross_attention_dim=None, scale=1.0, num_tokens=4, timestep_range=None): |
| super().__init__() |
|
|
| self.hidden_size = hidden_size |
| self.cross_attention_dim = cross_attention_dim |
| self.scale = scale |
| self.num_tokens = num_tokens |
|
|
| self.to_k_ip = nn.Linear(cross_attention_dim or hidden_size, hidden_size, bias=False) |
| self.to_v_ip = nn.Linear(cross_attention_dim or hidden_size, hidden_size, bias=False) |
|
|
| self.norm_added_k = RMSNorm(128, eps=1e-5, elementwise_affine=False) |
| self.norm_added_v = RMSNorm(128, eps=1e-5, elementwise_affine=False) |
| self.timestep_range = timestep_range |
|
|
| def __call__( |
| self, |
| num_heads, |
| query, |
| image_emb: torch.FloatTensor, |
| t: torch.FloatTensor |
| ) -> torch.FloatTensor: |
| |
| if self.timestep_range is not None: |
| if t[0] > self.timestep_range[0] or t[0] < self.timestep_range[1]: |
| return None |
| |
| ip_hidden_states = image_emb |
| ip_hidden_states_key_proj = self.to_k_ip(ip_hidden_states) |
| ip_hidden_states_value_proj = self.to_v_ip(ip_hidden_states) |
|
|
| ip_hidden_states_key_proj = rearrange(ip_hidden_states_key_proj, 'B L (H D) -> B H L D', H=num_heads) |
| ip_hidden_states_value_proj = rearrange(ip_hidden_states_value_proj, 'B L (H D) -> B H L D', H=num_heads) |
|
|
| ip_hidden_states_key_proj = self.norm_added_k(ip_hidden_states_key_proj) |
| ip_hidden_states_value_proj = self.norm_added_v(ip_hidden_states_value_proj) |
|
|
| ip_hidden_states = F.scaled_dot_product_attention(query.to(image_emb.device).to(image_emb.dtype), |
| ip_hidden_states_key_proj, |
| ip_hidden_states_value_proj, |
| dropout_p=0.0, is_causal=False) |
|
|
| ip_hidden_states = rearrange(ip_hidden_states, "B H L D -> B L (H D)", H=num_heads) |
| ip_hidden_states = ip_hidden_states.to(query.dtype).to(query.device) |
|
|
| return self.scale * ip_hidden_states |