| import torch |
| from torch import nn |
| from functools import partial |
| from utils.interpolation_modules.attention import SelfAttention, CrossAttention, TemporalAttention |
|
|
|
|
def modulate(x, shift, scale):
    """Return ``x * (1 + scale) + shift`` with shift/scale broadcast over axis 1.

    Both ``shift`` and ``scale`` get a singleton axis inserted at position 1
    before the arithmetic, so one value per sample/feature is applied to every
    entry along axis 1 of ``x``.
    """
    # assumes x is (batch, tokens, dim) and shift/scale are (batch, dim) — TODO confirm
    scale_b = scale.unsqueeze(1)
    shift_b = shift.unsqueeze(1)
    gain = 1 + scale_b
    return x * gain + shift_b
|
|
|
|
class Mlp(nn.Module):
    """Feed-forward block: Linear -> activation -> dropout -> Linear -> dropout.

    Args:
        dim: Size of the input and output feature axis.
        mlp_ratio: Hidden width as a multiple of ``dim`` (truncated with ``int``).
        act_layer: Zero-argument callable that builds the activation module.
        proj_drop_rate: Dropout probability; the same dropout module is applied
            after the activation and again after the second linear layer.
    """

    def __init__(self, dim=768, mlp_ratio=4., act_layer=nn.GELU, proj_drop_rate=0.):
        super().__init__()
        width = int(dim * mlp_ratio)
        # Construction order is kept (fc1, act, fc2, drop) so parameter
        # initialisation and state_dict ordering stay the same.
        self.fc1 = nn.Linear(in_features=dim, out_features=width)
        self.act = act_layer()
        self.fc2 = nn.Linear(in_features=width, out_features=dim)
        self.drop = nn.Dropout(p=proj_drop_rate)

    def forward(self, x):
        """Project ``x`` up to the hidden width and back down to ``dim``."""
        hidden = self.fc1(x)
        hidden = self.act(hidden)
        hidden = self.drop(hidden)
        out = self.fc2(hidden)
        return self.drop(out)
|
|
|
|
class VAEBlock(nn.Module):
    """Self-attention block over concatenated mid/query tokens, with an optional extra attention stage.

    The block always runs self-attention and an MLP. An additional attention
    stage (cross or temporal) is built and used only when
    ``add_attn_encoder`` is set on an encoder block or ``add_attn_decoder`` is
    set on a decoder block.

    Args:
        dim: Feature size passed to the norm, attention and MLP layers.
        num_heads: Head count passed to the attention layers.
        mlp_ratio: Hidden-width multiplier passed to ``Mlp``.
        qkv_bias: Passed through to the attention layers.
        attn_drop_rate: Passed through to the attention layers.
        proj_drop_rate: Passed through to the attention layers and ``Mlp``.
        act_layer: Activation factory passed to ``Mlp``.
        norm_layer: Callable ``norm_layer(dim)`` that builds a normalisation module.
        use_xformers: Passed through to the attention layers.
        is_encoder: Selects which token slice ``forward`` keeps (see ``forward``).
        add_attn_encoder: Enable the extra attention stage when ``is_encoder`` is true.
        add_attn_decoder: Enable the extra attention stage when ``is_encoder`` is false.
        add_attn_type: ``"cross_attn"`` or ``"temporal_attn"``.

    Raises:
        NotImplementedError: If the extra attention stage is enabled and
            ``add_attn_type`` is not one of the supported values.
    """

    _SUPPORTED_ADD_ATTN = ("cross_attn", "temporal_attn")

    def __init__(self, dim=768, num_heads=12, mlp_ratio=4., qkv_bias=False, attn_drop_rate=0., proj_drop_rate=0.,
                 act_layer=nn.GELU, norm_layer=partial(nn.LayerNorm, eps=1e-5), use_xformers=True, is_encoder=True,
                 add_attn_encoder=False, add_attn_decoder=False, add_attn_type="temporal_attn"):
        super().__init__()
        self.is_encoder = is_encoder
        self.add_attn_encoder = add_attn_encoder
        self.add_attn_decoder = add_attn_decoder
        self.add_attn_type = add_attn_type

        # Fail fast, before any sub-module is allocated. The original code did
        # `raise "<str>"`, which in Python 3 surfaces as a TypeError
        # ("exceptions must derive from BaseException") and hides the message.
        if self._extra_attn_enabled and add_attn_type not in self._SUPPORTED_ADD_ATTN:
            raise NotImplementedError(f"No implementation of {add_attn_type}")

        self.norm_self_attn = norm_layer(dim)
        self.self_attn = SelfAttention(dim, num_heads, qkv_bias, attn_drop_rate, proj_drop_rate, use_xformers)
        if self._extra_attn_enabled:
            if add_attn_type == "cross_attn":
                self.norm_cross_attn = norm_layer(dim)
                self.cross_attn = CrossAttention(dim, num_heads, qkv_bias, attn_drop_rate, proj_drop_rate,
                                                 use_xformers)
            else:  # "temporal_attn" (validated above)
                self.norm_temp_attn = norm_layer(dim)
                self.temp_attn = TemporalAttention(dim, num_heads, qkv_bias, attn_drop_rate, proj_drop_rate,
                                                   use_xformers)
        self.norm_mlp = norm_layer(dim)
        self.mlp = Mlp(dim, mlp_ratio, act_layer, proj_drop_rate)

    @property
    def _extra_attn_enabled(self):
        """True when the extra attention stage applies to this block's role."""
        return bool((self.add_attn_encoder and self.is_encoder)
                    or (self.add_attn_decoder and not self.is_encoder))

    def forward(self, query_tokens, mid_tokens, cond_tokens, ph=None, pw=None):
        """Run self-attention on ``[mid_tokens, query_tokens]`` and return the slice for this role.

        Args:
            query_tokens: Tokens concatenated after ``mid_tokens`` along dim 1.
            mid_tokens: 3-D tensor; its dim-1 length ``n`` marks the split point.
            cond_tokens: Conditioning tokens for the extra attention stage. For
                temporal attention they are split into two halves along dim 1.
            ph: Forwarded to temporal attention.
            pw: Forwarded to temporal attention.

        Returns:
            For an encoder, the positions after the first ``n`` (the query
            part); for a decoder, the first ``n`` positions (the mid part),
            after the optional extra attention and the MLP residual.
        """
        # Unpacking three values keeps the original requirement that mid_tokens is 3-D.
        _, n, _ = mid_tokens.shape
        x = torch.cat((mid_tokens, query_tokens), dim=1)
        x = x + self.self_attn(self.norm_self_attn(x))

        y = x[:, n:, :] if self.is_encoder else x[:, :n, :]

        if self._extra_attn_enabled:
            if self.add_attn_type == "cross_attn":
                y = y + self.cross_attn(self.norm_cross_attn(y), y=cond_tokens)
            elif self.add_attn_type == "temporal_attn":
                tokens0, tokens1 = cond_tokens.chunk(2, dim=1)
                y = y + self.temp_attn(self.norm_temp_attn(y), x0=tokens0, x1=tokens1, ph=ph, pw=pw)

        y = y + self.mlp(self.norm_mlp(y))
        return y
|
|
|
|
class DiTBlock(nn.Module):
    """Block with modulated self-attention, temporal attention, and a modulated MLP.

    The self-attention and MLP branches are shifted/scaled via ``modulate`` and
    multiplied by a gate before being added back; the temporal-attention
    branch is a plain pre-norm residual with no modulation or gate.

    Args:
        dim: Feature size passed to the norm, attention and MLP layers.
        num_heads: Head count passed to the attention layers.
        mlp_ratio: Hidden-width multiplier passed to ``Mlp``.
        qkv_bias: Passed through to the attention layers.
        attn_drop_rate: Passed through to the attention layers.
        proj_drop_rate: Passed through to the attention layers and ``Mlp``.
        act_layer: Activation factory passed to ``Mlp``.
        norm_layer: Callable ``norm_layer(dim)`` that builds a normalisation module.
        use_xformers: Passed through to the attention layers.
    """

    def __init__(self, dim=768, num_heads=12, mlp_ratio=4., qkv_bias=False, attn_drop_rate=0., proj_drop_rate=0.,
                 act_layer=nn.GELU, norm_layer=partial(nn.LayerNorm, eps=1e-5), use_xformers=True):
        super().__init__()
        # Both attention layers take the same positional configuration.
        attn_args = (dim, num_heads, qkv_bias, attn_drop_rate, proj_drop_rate, use_xformers)

        self.norm_self_attn = norm_layer(dim)
        self.self_attn = SelfAttention(*attn_args)

        self.norm_temp_attn = norm_layer(dim)
        self.temp_attn = TemporalAttention(*attn_args)

        self.norm_mlp = norm_layer(dim)
        self.mlp = Mlp(dim, mlp_ratio, act_layer, proj_drop_rate)

    def forward(self, query_tokens, tokens0, tokens1, ph, pw, modulations):
        """Update ``query_tokens`` through the three residual branches.

        Args:
            query_tokens: Tokens being updated.
            tokens0: Forwarded to temporal attention as ``x0``.
            tokens1: Forwarded to temporal attention as ``x1``.
            ph: Forwarded to temporal attention.
            pw: Forwarded to temporal attention.
            modulations: Tensor split into six equal chunks along dim 1, in the
                order shift/scale/gate for self-attention, then
                shift/scale/gate for the MLP.

        Returns:
            The updated ``query_tokens``.
        """
        (shift_msa, scale_msa, gate_msa,
         shift_mlp, scale_mlp, gate_mlp) = modulations.chunk(6, dim=1)

        # 1) gated, modulated self-attention
        attn_in = modulate(self.norm_self_attn(query_tokens), shift_msa, scale_msa)
        attn_out = self.self_attn(attn_in)
        query_tokens = query_tokens + gate_msa.unsqueeze(1) * attn_out

        # 2) temporal attention against the two token sets (no modulation, no gate)
        temp_out = self.temp_attn(self.norm_temp_attn(query_tokens), x0=tokens0, x1=tokens1, ph=ph, pw=pw)
        query_tokens = query_tokens + temp_out

        # 3) gated, modulated MLP
        mlp_in = modulate(self.norm_mlp(query_tokens), shift_mlp, scale_mlp)
        mlp_out = self.mlp(mlp_in)
        query_tokens = query_tokens + gate_mlp.unsqueeze(1) * mlp_out

        return query_tokens
|
|