File size: 4,535 Bytes
81f9834
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import torch
from torch import nn
from functools import partial
from utils.interpolation_modules.attention import SelfAttention, CrossAttention, TemporalAttention


def modulate(x, shift, scale):
    return x * (1 + scale.unsqueeze(1)) + shift.unsqueeze(1)


class Mlp(nn.Module):
    def __init__(self, dim=768, mlp_ratio=4., act_layer=nn.GELU, proj_drop_rate=0.):
        super().__init__()
        hidden_dim = int(dim * mlp_ratio)
        self.fc1 = nn.Linear(dim, hidden_dim)
        self.act = act_layer()
        self.fc2 = nn.Linear(hidden_dim, dim)
        self.drop = nn.Dropout(proj_drop_rate)

    def forward(self, x):
        x = self.drop(self.act(self.fc1(x)))
        x = self.drop(self.fc2(x))
        return x


class VAEBlock(nn.Module):
    def __init__(self, dim=768, num_heads=12, mlp_ratio=4., qkv_bias=False, attn_drop_rate=0., proj_drop_rate=0., act_layer=nn.GELU,
                 norm_layer=partial(nn.LayerNorm, eps=1e-5), use_xformers=True, is_encoder=True, add_attn_encoder=False,
                 add_attn_decoder=False, add_attn_type="temporal_attn"):
        super().__init__()
        self.is_encoder = is_encoder
        self.add_attn_encoder = add_attn_encoder
        self.add_attn_decoder = add_attn_decoder
        self.add_attn_type = add_attn_type
        self.norm_self_attn = norm_layer(dim)
        self.self_attn = SelfAttention(dim, num_heads, qkv_bias, attn_drop_rate, proj_drop_rate, use_xformers)
        if (add_attn_encoder and is_encoder) or (add_attn_decoder and not is_encoder):
            if add_attn_type == "cross_attn":
                self.norm_cross_attn = norm_layer(dim)
                self.cross_attn = CrossAttention(dim, num_heads, qkv_bias, attn_drop_rate, proj_drop_rate, use_xformers)
            elif add_attn_type == "temporal_attn":
                self.norm_temp_attn = norm_layer(dim)
                self.temp_attn = TemporalAttention(dim, num_heads, qkv_bias, attn_drop_rate, proj_drop_rate, use_xformers)
            else:
                raise f"No implementation of {add_attn_type}"
        self.norm_mlp = norm_layer(dim)
        self.mlp = Mlp(dim, mlp_ratio, act_layer, proj_drop_rate)

    def forward(self, query_tokens, mid_tokens, cond_tokens, ph=None, pw=None):
        b, n, d = mid_tokens.shape
        x = torch.cat((mid_tokens, query_tokens), dim=1)
        x = x + self.self_attn(self.norm_self_attn(x))
        if self.is_encoder:
            y = x[:, n:, :]
        else:
            y = x[:, :n, :]
        if (self.add_attn_encoder and self.is_encoder) or (self.add_attn_decoder and not self.is_encoder):
            if self.add_attn_type == "cross_attn":
                y = y + self.cross_attn(self.norm_cross_attn(y), y=cond_tokens)
            elif self.add_attn_type == "temporal_attn":
                tokens0, tokens1 = cond_tokens.chunk(2, dim=1)
                y = y + self.temp_attn(self.norm_temp_attn(y), x0=tokens0, x1=tokens1, ph=ph, pw=pw)
        y = y + self.mlp(self.norm_mlp(y))
        return y


class DiTBlock(nn.Module):
    def __init__(self, dim=768, num_heads=12, mlp_ratio=4., qkv_bias=False, attn_drop_rate=0., proj_drop_rate=0., act_layer=nn.GELU,
                 norm_layer=partial(nn.LayerNorm, eps=1e-5), use_xformers=True):
        super().__init__()
        self.norm_self_attn = norm_layer(dim)
        self.self_attn = SelfAttention(dim, num_heads, qkv_bias, attn_drop_rate, proj_drop_rate, use_xformers)
        self.norm_temp_attn = norm_layer(dim)
        self.temp_attn = TemporalAttention(dim, num_heads, qkv_bias, attn_drop_rate, proj_drop_rate, use_xformers)
        self.norm_mlp = norm_layer(dim)
        self.mlp = Mlp(dim, mlp_ratio, act_layer, proj_drop_rate)

    def forward(self, query_tokens, tokens0, tokens1, ph, pw, modulations):
        shift_msa, scale_msa, gate_msa, shift_mlp, scale_mlp, gate_mlp = modulations.chunk(6, dim=1)
        query_tokens = query_tokens + gate_msa.unsqueeze(1) * self.self_attn(modulate(self.norm_self_attn(query_tokens),
                                                                                      shift_msa, scale_msa))
        query_tokens = query_tokens + self.temp_attn(self.norm_temp_attn(query_tokens), x0=tokens0, x1=tokens1,
                                                     ph=ph, pw=pw)
        query_tokens = query_tokens + gate_mlp.unsqueeze(1) * self.mlp(modulate(self.norm_mlp(query_tokens),
                                                                                shift_mlp, scale_mlp))
        return query_tokens