# Page residue from the hosting site (not part of the module):
#   author avatar: Yihua7 | commit 6b92ff7 - "Initial commit: AniGen - Animatable 3D Generation"
from typing import *
import torch
import torch.nn as nn
from ..basic import SparseTensor
from ..linear import SparseLinear
from ..nonlinearity import SparseGELU
from ..attention import SparseMultiHeadAttention, SerializeMode
from ...norm import LayerNorm32
class SparseFeedForwardNet(nn.Module):
    """
    Two-layer MLP operating on sparse tensors.

    The features are expanded to ``int(channels * mlp_ratio)`` by a
    ``SparseLinear``, passed through a tanh-approximated ``SparseGELU`` and
    projected back to ``channels`` by a second ``SparseLinear``.

    Args:
        channels: Input and output feature width.
        mlp_ratio: Multiplier giving the hidden width (truncated to int).
    """

    def __init__(self, channels: int, mlp_ratio: float = 4.0):
        super().__init__()
        hidden_channels = int(channels * mlp_ratio)
        layers = [
            SparseLinear(channels, hidden_channels),
            SparseGELU(approximate="tanh"),
            SparseLinear(hidden_channels, channels),
        ]
        self.mlp = nn.Sequential(*layers)

    def forward(self, x: SparseTensor) -> SparseTensor:
        """Run ``x`` through the expand -> GELU -> project stack."""
        return self.mlp(x)
class SparseTransformerBlock(nn.Module):
    """
    Sparse Transformer block (MSA + FFN).

    Pre-norm residual layout: ``x + attn(norm1(x))`` followed by
    ``x + mlp(norm2(x))``. All attention-related arguments are forwarded
    unchanged to ``SparseMultiHeadAttention``.

    Args:
        channels: Feature width of the block.
        num_heads: Number of attention heads.
        mlp_ratio: Hidden-width multiplier of the feed-forward network.
        attn_mode, window_size, shift_sequence, shift_window, serialize_mode,
        use_rope, qk_rms_norm, qkv_bias: Passed through to the attention layer.
        use_checkpoint: If True, ``forward`` runs ``_forward`` under
            ``torch.utils.checkpoint.checkpoint`` (non-reentrant).
        ln_affine: Whether the two layer norms carry learnable affine params.
    """

    def __init__(
        self,
        channels: int,
        num_heads: int,
        mlp_ratio: float = 4.0,
        attn_mode: Literal["full", "shift_window", "shift_sequence", "shift_order", "swin"] = "full",
        window_size: Optional[int] = None,
        shift_sequence: Optional[int] = None,
        shift_window: Optional[Tuple[int, int, int]] = None,
        serialize_mode: Optional[SerializeMode] = None,
        use_checkpoint: bool = False,
        use_rope: bool = False,
        qk_rms_norm: bool = False,
        qkv_bias: bool = True,
        ln_affine: bool = False,
    ):
        super().__init__()
        self.use_checkpoint = use_checkpoint

        # Sub-modules are created in the same order as before so that
        # registration order and parameter initialisation are unchanged.
        norm_kwargs = dict(elementwise_affine=ln_affine, eps=1e-6)
        self.norm1 = LayerNorm32(channels, **norm_kwargs)
        self.norm2 = LayerNorm32(channels, **norm_kwargs)

        attn_kwargs = dict(
            num_heads=num_heads,
            attn_mode=attn_mode,
            window_size=window_size,
            shift_sequence=shift_sequence,
            shift_window=shift_window,
            serialize_mode=serialize_mode,
            qkv_bias=qkv_bias,
            use_rope=use_rope,
            qk_rms_norm=qk_rms_norm,
        )
        self.attn = SparseMultiHeadAttention(channels, **attn_kwargs)
        self.mlp = SparseFeedForwardNet(channels, mlp_ratio=mlp_ratio)

    def _forward(self, x: SparseTensor) -> SparseTensor:
        """Un-checkpointed block body: attention residual, then FFN residual."""
        # Self-attention on the normalised features, added back residually.
        attn_out = self.attn(x.replace(self.norm1(x.feats)))
        x = x + attn_out
        # Feed-forward network on the normalised features, added back residually.
        ffn_out = self.mlp(x.replace(self.norm2(x.feats)))
        x = x + ffn_out
        return x

    def forward(self, x: SparseTensor) -> SparseTensor:
        """Apply the block, optionally under activation checkpointing."""
        if not self.use_checkpoint:
            return self._forward(x)
        return torch.utils.checkpoint.checkpoint(self._forward, x, use_reentrant=False)
class SparseTransformerCrossBlock(nn.Module):
    """
    Sparse Transformer cross-attention block (MSA + MCA + FFN).

    Pre-norm residual layout:
        1. ``x + self_attn(norm1(x))``
        2. ``x + cross_attn(norm2(x), context)``
        3. ``x + mlp(norm3(x))``

    When ``context_is_dual`` is True the context features are additionally
    normalised by ``norm4`` before being used in cross-attention.

    Args:
        channels: Feature width of ``x``.
        ctx_channels: Feature width of the context, forwarded to the
            cross-attention layer and used to size ``norm4``.
        num_heads: Number of attention heads (self and cross).
        mlp_ratio: Hidden-width multiplier of the feed-forward network.
        attn_mode: Attention mode forwarded to the self-attention layer.
        attn_mode_cross: Attention mode forwarded to the cross-attention layer.
        window_size, shift_sequence, shift_window, serialize_mode, qkv_bias:
            Forwarded to both attention layers.
        use_checkpoint: If True, ``forward`` runs ``_forward`` under
            ``torch.utils.checkpoint.checkpoint`` (non-reentrant).
        use_rope: Forwarded to the self-attention layer only.
        qk_rms_norm: Forwarded to the self-attention layer.
        qk_rms_norm_cross: Forwarded to the cross-attention layer.
        ln_affine: Whether the layer norms carry learnable affine params.
        context_is_dual: If True, create ``norm4`` and normalise the context
            with it before cross-attention.
    """

    def __init__(
        self,
        channels: int,
        ctx_channels: int,
        num_heads: int,
        mlp_ratio: float = 4.0,
        attn_mode: Literal["full", "serialized", "windowed"] = "full",
        attn_mode_cross: Literal["full", "serialized", "windowed"] = "full",
        window_size: Optional[int] = None,
        shift_sequence: Optional[int] = None,
        shift_window: Optional[Tuple[int, int, int]] = None,
        serialize_mode: Optional[SerializeMode] = None,
        use_checkpoint: bool = False,
        use_rope: bool = False,
        qk_rms_norm: bool = False,
        qk_rms_norm_cross: bool = False,
        qkv_bias: bool = True,
        ln_affine: bool = False,
        context_is_dual: bool = False,
    ):
        super().__init__()
        self.use_checkpoint = use_checkpoint
        self.context_is_dual = context_is_dual
        self.norm1 = LayerNorm32(channels, elementwise_affine=ln_affine, eps=1e-6)
        self.norm2 = LayerNorm32(channels, elementwise_affine=ln_affine, eps=1e-6)
        self.norm3 = LayerNorm32(channels, elementwise_affine=ln_affine, eps=1e-6)
        if context_is_dual:
            # norm4 normalises context.feats, so it must be sized by the
            # context width rather than the width of x. The two only coincide
            # when ctx_channels == channels, in which case this is unchanged.
            # (Assumes LayerNorm32 follows nn.LayerNorm's normalized_shape rule.)
            self.norm4 = LayerNorm32(ctx_channels, elementwise_affine=ln_affine, eps=1e-6)
        self.self_attn = SparseMultiHeadAttention(
            channels,
            num_heads=num_heads,
            type="self",
            attn_mode=attn_mode,
            window_size=window_size,
            shift_sequence=shift_sequence,
            shift_window=shift_window,
            serialize_mode=serialize_mode,
            qkv_bias=qkv_bias,
            use_rope=use_rope,
            qk_rms_norm=qk_rms_norm,
        )
        self.cross_attn = SparseMultiHeadAttention(
            channels,
            ctx_channels=ctx_channels,
            num_heads=num_heads,
            type="cross",
            attn_mode=attn_mode_cross,
            window_size=window_size,
            shift_sequence=shift_sequence,
            shift_window=shift_window,
            serialize_mode=serialize_mode,
            qkv_bias=qkv_bias,
            qk_rms_norm=qk_rms_norm_cross,
        )
        self.mlp = SparseFeedForwardNet(
            channels,
            mlp_ratio=mlp_ratio,
        )

    def _forward(self, x: SparseTensor, context: SparseTensor) -> SparseTensor:
        """Un-checkpointed block body: self-attn, cross-attn, then FFN."""
        # Self-attention
        h = x.replace(self.norm1(x.feats))
        h = self.self_attn(h)
        x = x + h
        # Cross-attention
        h = x.replace(self.norm2(x.feats))
        if self.context_is_dual:
            # Only the dual-context configuration normalises the context.
            context = context.replace(self.norm4(context.feats))
        h = self.cross_attn(h, context)
        x = x + h
        # Feed-forward network
        h = x.replace(self.norm3(x.feats))
        h = self.mlp(h)
        x = x + h
        return x

    def forward(self, x: SparseTensor, context: SparseTensor) -> SparseTensor:
        """Apply the block, optionally under activation checkpointing."""
        if self.use_checkpoint:
            return torch.utils.checkpoint.checkpoint(self._forward, x, context, use_reentrant=False)
        else:
            return self._forward(x, context)
class SparseTransformerMultiContextCrossBlock(nn.Module):
    """
    Sparse Transformer block with one cross-attention layer per context
    (MSA + N x MCA + FFN).

    For every entry ``ctx_channels[i]`` a ``cross_attn_{i}`` attention layer
    and a ``ctx_norm_{i}`` layer norm are registered as attributes. In the
    forward pass all cross-attention layers share the same normalised query
    (``norm2`` applied once to the post-self-attention ``x``), and each of
    their outputs is added to ``x`` residually.

    Args:
        channels: Feature width of ``x``.
        ctx_channels: Feature width of each context, one entry per context.
        num_heads: Number of attention heads (self and cross).
        mlp_ratio: Hidden-width multiplier of the feed-forward network.
        attn_mode: Attention mode forwarded to the self-attention layer.
        attn_mode_cross: Attention mode forwarded to every cross-attention layer.
        window_size, shift_sequence, shift_window, serialize_mode, qkv_bias:
            Forwarded to all attention layers.
        use_checkpoint: If True, ``forward`` runs ``_forward`` under
            ``torch.utils.checkpoint.checkpoint`` (non-reentrant).
        use_rope: Forwarded to the self-attention layer only.
        qk_rms_norm: Forwarded to the self-attention layer.
        qk_rms_norm_cross: Forwarded to every cross-attention layer.
        ln_affine: Affine flag for ``norm1`` and ``norm3``; ``norm2`` and the
            context norms are always affine.
        cross_attn_cache_suffix: Prefix of the per-context cache suffix
            (``..._modality_{i}``) handed to each cross-attention layer.
    """

    def __init__(
        self,
        channels: int,
        ctx_channels: List[int],
        num_heads: int,
        mlp_ratio: float = 4.0,
        attn_mode: Literal["full", "serialized", "windowed"] = "full",
        attn_mode_cross: Literal["full", "serialized", "windowed"] = "full",
        window_size: Optional[int] = None,
        shift_sequence: Optional[int] = None,
        shift_window: Optional[Tuple[int, int, int]] = None,
        serialize_mode: Optional[SerializeMode] = None,
        use_checkpoint: bool = False,
        use_rope: bool = False,
        qk_rms_norm: bool = False,
        qk_rms_norm_cross: bool = False,
        qkv_bias: bool = True,
        ln_affine: bool = False,
        cross_attn_cache_suffix: str = '',
    ):
        super().__init__()
        self.context_num = len(ctx_channels)
        self.use_checkpoint = use_checkpoint

        # Layer norms (norm2 exists only when there is at least one context).
        self.norm1 = LayerNorm32(channels, elementwise_affine=ln_affine, eps=1e-6)
        if self.context_num > 0:
            self.norm2 = LayerNorm32(channels, elementwise_affine=True, eps=1e-6)
        self.norm3 = LayerNorm32(channels, elementwise_affine=ln_affine, eps=1e-6)

        # Keyword arguments common to the self- and cross-attention layers.
        shared_attn_kwargs = dict(
            num_heads=num_heads,
            window_size=window_size,
            shift_sequence=shift_sequence,
            shift_window=shift_window,
            serialize_mode=serialize_mode,
            qkv_bias=qkv_bias,
        )

        self.self_attn = SparseMultiHeadAttention(
            channels,
            type="self",
            attn_mode=attn_mode,
            use_rope=use_rope,
            qk_rms_norm=qk_rms_norm,
            **shared_attn_kwargs,
        )

        # One (cross-attention, context-norm) pair per context, registered
        # under the attribute names the forward pass looks up by index.
        for idx, ctx_dim in enumerate(ctx_channels):
            cross_layer = SparseMultiHeadAttention(
                channels,
                ctx_channels=ctx_dim,
                type="cross",
                attn_mode=attn_mode_cross,
                qk_rms_norm=qk_rms_norm_cross,
                cross_attn_cache_suffix=cross_attn_cache_suffix + f'_modality_{idx}',
                **shared_attn_kwargs,
            )
            setattr(self, f'cross_attn_{idx}', cross_layer)
            ctx_norm = LayerNorm32(ctx_dim, elementwise_affine=True, eps=1e-6)
            setattr(self, f'ctx_norm_{idx}', ctx_norm)

        self.mlp = SparseFeedForwardNet(channels, mlp_ratio=mlp_ratio)

    def _forward(self, x: SparseTensor, contexts: List[SparseTensor]) -> SparseTensor:
        """Un-checkpointed block body: self-attn, per-context cross-attn, FFN."""
        # Self-attention
        x = x + self.self_attn(x.replace(self.norm1(x.feats)))

        # Cross-attention
        run_cross = self.context_num > 0 and len(contexts) > 0
        if run_cross:
            # The query is normalised once; every context attends against this
            # same tensor while the residuals accumulate into x.
            query = x.replace(self.norm2(x.feats))
            for idx, ctx in enumerate(contexts):
                ctx_norm = getattr(self, f'ctx_norm_{idx}')
                cross_layer = getattr(self, f'cross_attn_{idx}')
                normed_ctx = ctx.replace(ctx_norm(ctx.feats))
                x = x + cross_layer(query, normed_ctx)

        # Feed-forward network
        x = x + self.mlp(x.replace(self.norm3(x.feats)))
        return x

    def forward(self, x: SparseTensor, contexts: List[SparseTensor]) -> SparseTensor:
        """Apply the block, optionally under activation checkpointing."""
        if not self.use_checkpoint:
            return self._forward(x, contexts)
        return torch.utils.checkpoint.checkpoint(self._forward, x, contexts, use_reentrant=False)