vidfom's picture
Upload folder using huggingface_hub
e00eceb verified
import numbers
from typing import Dict, Optional, Tuple
import torch
import torch.nn as nn
import torch.nn.functional as F
from einops import rearrange
class RMSNorm(nn.Module):
def __init__(self, dim, eps: float, elementwise_affine: bool = True):
super().__init__()
self.eps = eps
if isinstance(dim, numbers.Integral):
dim = (dim,)
self.dim = torch.Size(dim)
if elementwise_affine:
self.weight = nn.Parameter(torch.ones(dim))
else:
self.weight = None
def forward(self, hidden_states):
input_dtype = hidden_states.dtype
variance = hidden_states.to(torch.float32).pow(2).mean(-1, keepdim=True)
hidden_states = hidden_states * torch.rsqrt(variance + self.eps)
if self.weight is not None:
# convert into half-precision if necessary
if self.weight.dtype in [torch.float16, torch.bfloat16]:
hidden_states = hidden_states.to(self.weight.dtype)
hidden_states = hidden_states * self.weight
else:
hidden_states = hidden_states.to(input_dtype)
return hidden_states
class IPAFluxAttnProcessor2_0(nn.Module):
"""Attention processor used typically in processing the SD3-like self-attention projections."""
def __init__(self, hidden_size, cross_attention_dim=None, scale=1.0, num_tokens=4, timestep_range=None):
super().__init__()
self.hidden_size = hidden_size # 3072
self.cross_attention_dim = cross_attention_dim # 4096
self.scale = scale
self.num_tokens = num_tokens
self.to_k_ip = nn.Linear(cross_attention_dim or hidden_size, hidden_size, bias=False)
self.to_v_ip = nn.Linear(cross_attention_dim or hidden_size, hidden_size, bias=False)
self.norm_added_k = RMSNorm(128, eps=1e-5, elementwise_affine=False)
self.norm_added_v = RMSNorm(128, eps=1e-5, elementwise_affine=False)
self.timestep_range = timestep_range
def __call__(
self,
num_heads,
query,
image_emb: torch.FloatTensor,
t: torch.FloatTensor
) -> torch.FloatTensor:
# only apply IPA if timestep is within range
if self.timestep_range is not None:
if t[0] > self.timestep_range[0] or t[0] < self.timestep_range[1]:
return None
# `ip-adapter` projections
ip_hidden_states = image_emb
ip_hidden_states_key_proj = self.to_k_ip(ip_hidden_states)
ip_hidden_states_value_proj = self.to_v_ip(ip_hidden_states)
ip_hidden_states_key_proj = rearrange(ip_hidden_states_key_proj, 'B L (H D) -> B H L D', H=num_heads)
ip_hidden_states_value_proj = rearrange(ip_hidden_states_value_proj, 'B L (H D) -> B H L D', H=num_heads)
ip_hidden_states_key_proj = self.norm_added_k(ip_hidden_states_key_proj)
ip_hidden_states_value_proj = self.norm_added_v(ip_hidden_states_value_proj)
ip_hidden_states = F.scaled_dot_product_attention(query.to(image_emb.device).to(image_emb.dtype),
ip_hidden_states_key_proj,
ip_hidden_states_value_proj,
dropout_p=0.0, is_causal=False)
ip_hidden_states = rearrange(ip_hidden_states, "B H L D -> B L (H D)", H=num_heads)
ip_hidden_states = ip_hidden_states.to(query.dtype).to(query.device)
return self.scale * ip_hidden_states