import math
from typing import Optional

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.amp as amp


class FeedForwardSwiGLU(nn.Module):
    """SwiGLU feed-forward network: w2(silu(w1(x)) * w3(x))."""

    def __init__(
        self,
        dim: int,
        hidden_dim: int,
        multiple_of: int = 256,
        ffn_dim_multiplier: Optional[float] = None,
    ):
        super().__init__()
        # Shrink the hidden width by 2/3 (SwiGLU uses two input projections),
        # optionally rescale it, then round up to a multiple of `multiple_of`.
        hidden_dim = int(2 * hidden_dim / 3)
        if ffn_dim_multiplier is not None:
            hidden_dim = int(ffn_dim_multiplier * hidden_dim)
        hidden_dim = multiple_of * ((hidden_dim + multiple_of - 1) // multiple_of)

        self.dim = dim
        self.hidden_dim = hidden_dim
        self.ffn_mult = self.hidden_dim / float(self.dim)
        self.w1 = nn.Linear(dim, hidden_dim, bias=False)
        self.w2 = nn.Linear(hidden_dim, dim, bias=False)
        self.w3 = nn.Linear(dim, hidden_dim, bias=False)

    def forward(self, x):
        return self.w2(F.silu(self.w1(x)) * self.w3(x))
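# Usage sketch for FeedForwardSwiGLU (hypothetical sizes, not taken from any
# config in this file):
#   ffn = FeedForwardSwiGLU(dim=1024, hidden_dim=4 * 1024)   # hidden width rounded to 2816
#   out = ffn(torch.randn(2, 16, 1024))                      # -> (2, 16, 1024)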


class RMSNorm_FP32(torch.nn.Module):
    """RMSNorm whose statistics are always computed in float32."""

    def __init__(self, dim: int, eps: float):
        super().__init__()
        self.eps = eps
        self.weight = nn.Parameter(torch.ones(dim))

    def _norm(self, x):
        # x / sqrt(mean(x^2) + eps), computed over the last dimension.
        return x * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + self.eps)

    def forward(self, x):
        output = self._norm(x.float()).type_as(x)
        return output * self.weight.to(output.dtype)


class LayerNorm_FP32(nn.LayerNorm):
    """LayerNorm that upcasts the input and affine parameters to float32."""

    def __init__(self, dim, eps, elementwise_affine):
        super().__init__(dim, eps=eps, elementwise_affine=elementwise_affine)

    def forward(self, inputs: torch.Tensor) -> torch.Tensor:
        origin_dtype = inputs.dtype
        out = F.layer_norm(
            inputs.float(),
            self.normalized_shape,
            None if self.weight is None else self.weight.float(),
            None if self.bias is None else self.bias.float(),
            self.eps,
        ).to(origin_dtype)
        return out
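# Both norms take (..., dim)-shaped inputs and preserve the input dtype while
# normalizing in float32; e.g. (hypothetical sizes):
#   RMSNorm_FP32(dim=96, eps=1e-6)(torch.randn(2, 16, 96, dtype=torch.bfloat16))  # -> bfloat16 (2, 16, 96)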


class PatchEmbed3D(nn.Module):
    """Video to Patch Embedding.

    Args:
        patch_size (tuple[int]): Patch token size (T, H, W). Default: (2, 4, 4).
        in_chans (int): Number of input video channels. Default: 3.
        embed_dim (int): Number of linear projection output channels. Default: 96.
        norm_layer (nn.Module, optional): Normalization layer. Default: None
        flatten (bool): If True, return tokens of shape (B, N, embed_dim). Default: True
    """

    def __init__(
        self,
        patch_size=(2, 4, 4),
        in_chans=3,
        embed_dim=96,
        norm_layer=None,
        flatten=True,
    ):
        super().__init__()
        self.patch_size = patch_size
        self.flatten = flatten

        self.in_chans = in_chans
        self.embed_dim = embed_dim

        self.proj = nn.Conv3d(in_chans, embed_dim, kernel_size=patch_size, stride=patch_size)
        if norm_layer is not None:
            self.norm = norm_layer(embed_dim)
        else:
            self.norm = None

    def forward(self, x):
        """Forward function."""
        # Pad W, H and D on the right so each dimension is divisible by the patch size.
        _, _, D, H, W = x.size()
        if W % self.patch_size[2] != 0:
            x = F.pad(x, (0, self.patch_size[2] - W % self.patch_size[2]))
        if H % self.patch_size[1] != 0:
            x = F.pad(x, (0, 0, 0, self.patch_size[1] - H % self.patch_size[1]))
        if D % self.patch_size[0] != 0:
            x = F.pad(x, (0, 0, 0, 0, 0, self.patch_size[0] - D % self.patch_size[0]))

        B, C, T, H, W = x.shape
        x = self.proj(x)  # (B, embed_dim, D', H', W')
        if self.norm is not None:
            D, Wh, Ww = x.size(2), x.size(3), x.size(4)
            x = x.flatten(2).transpose(1, 2)
            x = self.norm(x)
            x = x.transpose(1, 2).view(-1, self.embed_dim, D, Wh, Ww)
        if self.flatten:
            x = x.flatten(2).transpose(1, 2)  # (B, D' * H' * W', embed_dim)
        return x
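# Shape sketch for PatchEmbed3D (hypothetical sizes, not from any config here):
#   patcher = PatchEmbed3D(patch_size=(2, 4, 4), in_chans=3, embed_dim=96)
#   x = torch.randn(1, 3, 8, 32, 32)        # (B, C, T, H, W)
#   tokens = patcher(x)                     # flatten=True -> (1, 4 * 8 * 8, 96)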


def modulate_fp32(norm_func, x, shift, scale):
    # adaLN modulation in float32: norm(x) * (1 + scale) + shift, then cast back.
    assert shift.dtype == torch.float32 and scale.dtype == torch.float32
    dtype = x.dtype
    x = norm_func(x.to(torch.float32))
    scale = scale + 1.0
    x.mul_(scale).add_(shift)
    return x.to(dtype)


class FinalLayer_FP32(nn.Module):
    """
    The final layer of DiT.
    """

    def __init__(self, hidden_size, num_patch, out_channels, adaln_tembed_dim):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_patch = num_patch
        self.out_channels = out_channels
        self.adaln_tembed_dim = adaln_tembed_dim

        self.norm_final = LayerNorm_FP32(hidden_size, elementwise_affine=False, eps=1e-6)
        self.linear = nn.Linear(hidden_size, num_patch * out_channels, bias=True)
        self.adaLN_modulation = nn.Sequential(nn.SiLU(), nn.Linear(adaln_tembed_dim, 2 * hidden_size, bias=True))

    def forward(self, x, t, latent_shape):
        t = t.to(torch.float32)
        B, N, C = x.shape
        T, _, _ = latent_shape

        with amp.autocast('cuda', dtype=torch.float32):
            # shift/scale have shape (B, T, 1, hidden_size) and broadcast over the
            # per-frame token dimension of x.view(B, T, -1, C).
            shift, scale = self.adaLN_modulation(t).unsqueeze(2).chunk(2, dim=-1)
            x = modulate_fp32(self.norm_final, x.view(B, T, -1, C), shift, scale).view(B, N, C)
            x = self.linear(x)
        return x
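# Usage sketch for FinalLayer_FP32 (hypothetical sizes, not from any config here):
#   head = FinalLayer_FP32(hidden_size=96, num_patch=2 * 4 * 4, out_channels=4, adaln_tembed_dim=96)
#   x = torch.randn(1, 256, 96)                # (B, N, hidden_size), N = T' * H' * W'
#   t = torch.randn(1, 4, 96)                  # per-frame conditioning, (B, T', adaln_tembed_dim)
#   out = head(x, t, latent_shape=(4, 8, 8))   # -> (1, 256, num_patch * out_channels)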


class TimestepEmbedder(nn.Module):
    """
    Embeds scalar timesteps into vector representations.
    """

    def __init__(self, t_embed_dim, frequency_embedding_size=256):
        super().__init__()
        self.t_embed_dim = t_embed_dim
        self.frequency_embedding_size = frequency_embedding_size
        self.mlp = nn.Sequential(
            nn.Linear(frequency_embedding_size, t_embed_dim, bias=True),
            nn.SiLU(),
            nn.Linear(t_embed_dim, t_embed_dim, bias=True),
        )

    @staticmethod
    def timestep_embedding(t, dim, max_period=10000):
        """
        Create sinusoidal timestep embeddings.
        :param t: a 1-D Tensor of N indices, one per batch element.
                  These may be fractional.
        :param dim: the dimension of the output.
        :param max_period: controls the minimum frequency of the embeddings.
        :return: an (N, D) Tensor of positional embeddings.
        """
        half = dim // 2
        freqs = torch.exp(-math.log(max_period) * torch.arange(start=0, end=half, dtype=torch.float32) / half)
        freqs = freqs.to(device=t.device)
        args = t[:, None].float() * freqs[None]
        embedding = torch.cat([torch.cos(args), torch.sin(args)], dim=-1)
        if dim % 2:
            embedding = torch.cat([embedding, torch.zeros_like(embedding[:, :1])], dim=-1)
        return embedding

    def forward(self, t, dtype):
        t_freq = self.timestep_embedding(t, self.frequency_embedding_size)
        if t_freq.dtype != dtype:
            t_freq = t_freq.to(dtype)
        t_emb = self.mlp(t_freq)
        return t_emb
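# The sinusoidal embedding above computes, for i in [0, dim // 2):
#   freqs[i]             = exp(-ln(max_period) * i / (dim // 2))
#   emb[:, i]            = cos(t * freqs[i])
#   emb[:, i + dim // 2] = sin(t * freqs[i])
# Usage sketch (hypothetical sizes):
#   TimestepEmbedder(t_embed_dim=1152)(torch.tensor([0.0, 999.0]), torch.float32)  # -> (2, 1152)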


class CaptionEmbedder(nn.Module):
    """
    Embeds caption (text-encoder) features into vector representations.
    """

    def __init__(self, in_channels, hidden_size):
        super().__init__()
        self.in_channels = in_channels
        self.hidden_size = hidden_size
        self.y_proj = nn.Sequential(
            nn.Linear(in_channels, hidden_size, bias=True),
            nn.GELU(approximate="tanh"),
            nn.Linear(hidden_size, hidden_size, bias=True),
        )

    def forward(self, caption):
        # caption is expected to be 4-D, e.g. (B, 1, N_token, in_channels).
        B, _, N, C = caption.shape
        caption = self.y_proj(caption)
        return caption
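

if __name__ == "__main__":
    # Minimal CPU smoke test with hypothetical sizes (not taken from any config
    # in this file); a quick shape sanity check, not a real pipeline.
    video = torch.randn(1, 3, 8, 32, 32)                     # (B, C, T, H, W)
    patcher = PatchEmbed3D(patch_size=(2, 4, 4), in_chans=3, embed_dim=96)
    tokens = patcher(video)                                  # -> (1, 4 * 8 * 8, 96)

    t_emb = TimestepEmbedder(t_embed_dim=96)(torch.tensor([500.0]), tokens.dtype)          # -> (1, 96)
    ffn_out = FeedForwardSwiGLU(dim=96, hidden_dim=4 * 96)(tokens)                          # -> (1, 256, 96)
    cap = CaptionEmbedder(in_channels=4096, hidden_size=96)(torch.randn(1, 1, 77, 4096))    # -> (1, 1, 77, 96)

    print(tokens.shape, t_emb.shape, ffn_out.shape, cap.shape)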