| """ | |
| Utils for Common Transformer Blocks used in UniCeption | |
| References: | |
| HuggingFace PyTorch Image Models (Timm) | |
| CroCoV2 | |
| """ | |
| import collections.abc | |
| import math | |
| from itertools import repeat | |
| from typing import Callable, Optional | |
| import torch | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| from torch.jit import Final | |
| from uniception.models.utils.config import use_fused_attn | |
| torch.backends.cuda.matmul.allow_tf32 = True | |
def _ntuple(n):
    "Helper function to create n-tuple."

    def parse(x):
        if isinstance(x, collections.abc.Iterable) and not isinstance(x, str):
            return x
        return tuple(repeat(x, n))

    return parse


to_2tuple = _ntuple(2)


def drop_path(x, drop_prob: float = 0.0, training: bool = False, scale_by_keep: bool = True):
    """Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks)."""
    if drop_prob == 0.0 or not training:
        return x
    keep_prob = 1 - drop_prob
    shape = (x.shape[0],) + (1,) * (x.ndim - 1)  # work with diff dim tensors, not just 2D ConvNets
    random_tensor = x.new_empty(shape).bernoulli_(keep_prob)
    if keep_prob > 0.0 and scale_by_keep:
        random_tensor.div_(keep_prob)
    return x * random_tensor
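

# Illustrative arithmetic (not part of the original implementation): with drop_prob=0.1 during
# training, each sample's residual branch survives with probability keep_prob=0.9 and, when it
# survives, is scaled by 1 / 0.9 ≈ 1.11 so the expected output matches the undropped branch.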


class DropPath(nn.Module):
    """Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks)."""

    def __init__(self, drop_prob: float = 0.0, scale_by_keep: bool = True):
        super(DropPath, self).__init__()
        self.drop_prob = drop_prob
        self.scale_by_keep = scale_by_keep

    def forward(self, x):
        return drop_path(x, self.drop_prob, self.training, self.scale_by_keep)

    def extra_repr(self):
        return f"drop_prob={round(self.drop_prob,3):0.3f}"


class Mlp(nn.Module):
    """MLP as used in Vision Transformer, MLP-Mixer and related networks"""

    def __init__(self, in_features, hidden_features=None, out_features=None, act_layer=nn.GELU, bias=True, drop=0.0):
        super().__init__()
        out_features = out_features or in_features
        hidden_features = hidden_features or in_features
        bias = to_2tuple(bias)
        drop_probs = to_2tuple(drop)

        self.fc1 = nn.Linear(in_features, hidden_features, bias=bias[0])
        self.act = act_layer()
        self.drop1 = nn.Dropout(drop_probs[0])
        self.fc2 = nn.Linear(hidden_features, out_features, bias=bias[1])
        self.drop2 = nn.Dropout(drop_probs[1])

    def forward(self, x):
        x = self.fc1(x)
        x = self.act(x)
        x = self.drop1(x)
        x = self.fc2(x)
        x = self.drop2(x)
        return x
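

# For reference, the attention blocks below instantiate this class as
# Mlp(in_features=dim, hidden_features=int(dim * mlp_ratio)); e.g. dim=768 with the default
# mlp_ratio=4.0 gives a 768 -> 3072 -> 768 projection with a GELU activation and dropout after
# each linear layer (illustrative numbers, matching the defaults used in this file).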


class Attention(nn.Module):
    "Self-Attention Layer"

    fused_attn: Final[bool]

    def __init__(
        self,
        dim: int,
        latent_attn_dim: Optional[int] = None,
        num_heads: int = 8,
        qkv_bias: bool = False,
        qk_norm: bool = False,
        attn_drop: float = 0.0,
        proj_drop: float = 0.0,
        norm_layer: nn.Module = nn.LayerNorm,
        custom_positional_encoding: Callable = None,
    ):
        """
        Initialize the Attention layer.

        Args:
            dim (int): Dimension of input features
            latent_attn_dim (int): Dimension of latent attention features (default: None)
            num_heads (int): Number of attention heads (default: 8)
            qkv_bias (bool): Whether to include bias in qkv projection (default: False)
            qk_norm (bool): Whether to normalize q and k (default: False)
            attn_drop (float): Dropout rate for attention weights (default: 0.)
            proj_drop (float): Dropout rate for output (default: 0.)
            norm_layer (nn.Module): Normalization layer (default: nn.LayerNorm)
            custom_positional_encoding (Callable): Custom positional encoding function (default: None)
        """
        super().__init__()
        if latent_attn_dim is not None:
            assert latent_attn_dim % num_heads == 0, "latent_attn_dim should be divisible by num_heads"
            self.latent_attn_dim = latent_attn_dim
            self.latent_attn = True
        else:
            self.latent_attn = False
        assert dim % num_heads == 0, "dim should be divisible by num_heads"
        self.num_heads = num_heads
        self.head_dim = dim // num_heads if not self.latent_attn else latent_attn_dim // num_heads
        self.scale = self.head_dim**-0.5
        self.fused_attn = use_fused_attn()

        self.qkv = (
            nn.Linear(dim, dim * 3, bias=qkv_bias)
            if not self.latent_attn
            else nn.Linear(dim, latent_attn_dim * 3, bias=qkv_bias)
        )
        self.q_norm = norm_layer(self.head_dim) if qk_norm else nn.Identity()
        self.k_norm = norm_layer(self.head_dim) if qk_norm else nn.Identity()
        self.attn_drop = nn.Dropout(attn_drop)
        self.proj = nn.Linear(dim, dim) if not self.latent_attn else nn.Linear(latent_attn_dim, dim)
        self.proj_drop = nn.Dropout(proj_drop)
        self.custom_positional_encoding = custom_positional_encoding

    def forward(self, x: torch.Tensor, xpos: torch.Tensor = None) -> torch.Tensor:
        """
        Forward pass of the Attention layer.

        Args:
            x (torch.Tensor): Input features
            xpos (torch.Tensor): Positions of tokens (required when using custom positional encoding)

        Returns:
            torch.Tensor: Output features of same shape as input
        """
        B, N, C = x.shape
        qkv = self.qkv(x).reshape(B, N, 3, self.num_heads, self.head_dim).permute(2, 0, 3, 1, 4)
        q, k, v = qkv[0], qkv[1], qkv[2]
        q, k = self.q_norm(q), self.k_norm(k)

        if self.custom_positional_encoding is not None:
            assert (
                xpos is not None
            ), "Positions of tokens (xpos) are a required input when using custom positional encoding"
            q = self.custom_positional_encoding(q, xpos)
            k = self.custom_positional_encoding(k, xpos)

        if self.fused_attn:
            x = F.scaled_dot_product_attention(
                q, k, v, dropout_p=(self.attn_drop.p if self.training else 0.0), scale=self.scale
            )
        else:
            q = q * self.scale
            attn = q @ k.transpose(-2, -1)
            attn = attn.softmax(dim=-1)
            attn = self.attn_drop(attn)
            x = attn @ v

        x = x.transpose(1, 2).reshape(B, N, -1)
        x = self.proj(x)
        x = self.proj_drop(x)
        return x
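

# Usage sketch (illustrative numbers, mirroring the __main__ check at the end of this file):
# Attention(dim=768) applied to a (1, 256, 768) tensor returns a (1, 256, 768) tensor; internally
# q, k and v each have shape (B, num_heads, N, head_dim) = (1, 8, 256, 96) with the default 8 heads.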


class CrossAttention(nn.Module):
    "Cross-Attention Layer"

    fused_attn: Final[bool]

    def __init__(
        self,
        dim: int,
        num_heads: int = 8,
        qkv_bias: bool = False,
        qk_norm: bool = False,
        attn_drop: float = 0.0,
        proj_drop: float = 0.0,
        norm_layer: nn.Module = nn.LayerNorm,
        custom_positional_encoding: Callable = None,
    ):
        """
        Initialize the Cross-Attention layer.

        Args:
            dim (int): Dimension of input features
            num_heads (int): Number of attention heads (default: 8)
            qkv_bias (bool): Whether to include bias in qkv projection (default: False)
            qk_norm (bool): Whether to normalize q and k (default: False)
            attn_drop (float): Dropout rate for attention weights (default: 0.)
            proj_drop (float): Dropout rate for output (default: 0.)
            norm_layer (nn.Module): Normalization layer (default: nn.LayerNorm)
            custom_positional_encoding (Callable): Custom positional encoding function (default: None)
        """
        super().__init__()
        assert dim % num_heads == 0, "dim should be divisible by num_heads"
        self.num_heads = num_heads
        self.head_dim = dim // num_heads
        self.scale = self.head_dim**-0.5
        self.fused_attn = use_fused_attn()

        self.projq = nn.Linear(dim, dim, bias=qkv_bias)
        self.projk = nn.Linear(dim, dim, bias=qkv_bias)
        self.projv = nn.Linear(dim, dim, bias=qkv_bias)
        self.q_norm = norm_layer(self.head_dim) if qk_norm else nn.Identity()
        self.k_norm = norm_layer(self.head_dim) if qk_norm else nn.Identity()
        self.attn_drop = nn.Dropout(attn_drop)
        self.proj = nn.Linear(dim, dim)
        self.proj_drop = nn.Dropout(proj_drop)
        self.custom_positional_encoding = custom_positional_encoding

    def forward(
        self,
        query: torch.Tensor,
        key: torch.Tensor,
        value: torch.Tensor,
        qpos: torch.Tensor = None,
        kpos: torch.Tensor = None,
    ) -> torch.Tensor:
        """
        Forward pass of the Cross-Attention layer.

        Args:
            query (torch.Tensor): Query features
            key (torch.Tensor): Key features
            value (torch.Tensor): Value features
            qpos (torch.Tensor): Positions of queries (required when using custom positional encoding)
            kpos (torch.Tensor): Positions of keys (required when using custom positional encoding)

        Returns:
            torch.Tensor: Output features of same shape as input
        """
        B, Nq, C = query.shape
        Nk = key.shape[1]
        Nv = value.shape[1]

        q = self.projq(query).reshape(B, Nq, self.num_heads, self.head_dim).permute(0, 2, 1, 3)
        k = self.projk(key).reshape(B, Nk, self.num_heads, self.head_dim).permute(0, 2, 1, 3)
        v = self.projv(value).reshape(B, Nv, self.num_heads, self.head_dim).permute(0, 2, 1, 3)
        q, k = self.q_norm(q), self.k_norm(k)

        if self.custom_positional_encoding is not None:
            assert (
                qpos is not None
            ), "Positions of queries (qpos) are a required input when using custom positional encoding"
            assert (
                kpos is not None
            ), "Positions of keys (kpos) are a required input when using custom positional encoding"
            q = self.custom_positional_encoding(q, qpos)
            k = self.custom_positional_encoding(k, kpos)

        if self.fused_attn:
            x = F.scaled_dot_product_attention(
                q, k, v, dropout_p=(self.attn_drop.p if self.training else 0.0), scale=self.scale
            )
        else:
            q = q * self.scale
            attn = q @ k.transpose(-2, -1)
            attn = attn.softmax(dim=-1)
            attn = self.attn_drop(attn)
            x = attn @ v

        x = x.transpose(1, 2).reshape(B, Nq, C)
        x = self.proj(x)
        x = self.proj_drop(x)
        return x


class LayerScale(nn.Module):
    "Layer Scale Layer"

    def __init__(
        self,
        dim: int,
        init_values: float = 1e-5,
        inplace: bool = False,
    ):
        """
        Initialize the Layer Scale layer

        Args:
            dim (int): Dimension of input features
            init_values (float): Initial value for LayerScale gamma (default: 1e-5)
            inplace (bool): Whether to perform inplace operations (default: False)
        """
        super().__init__()
        self.inplace = inplace
        self.gamma = nn.Parameter(init_values * torch.ones(dim))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        "Forward pass of the Layer Scale layer"
        return x.mul_(self.gamma) if self.inplace else x * self.gamma


class SelfAttentionBlock(nn.Module):
    "Self-Attention Block"

    def __init__(
        self,
        dim: int,
        num_heads: int,
        latent_attn_dim: Optional[int] = None,
        mlp_ratio: float = 4.0,
        qkv_bias: bool = False,
        qk_norm: bool = False,
        proj_drop: float = 0.0,
        attn_drop: float = 0.0,
        init_values: Optional[float] = None,
        drop_path: float = 0.0,
        act_layer: nn.Module = nn.GELU,
        norm_layer: nn.Module = nn.LayerNorm,
        mlp_layer: nn.Module = Mlp,
        custom_positional_encoding: Callable = None,
    ):
| """ | |
| Initialize the Self-Attention Block. | |
| Args: | |
| dim (int): Dimension of input features | |
| num_heads (int): Number of attention heads | |
| mlp_ratio (float): Ratio of hidden to input dimension in MLP (default: 4.) | |
| qkv_bias (bool): Whether to include bias in qkv projection (default: False) | |
| qk_norm (bool): Whether to normalize q and k (default: False) | |
| proj_drop (float): Dropout rate for output (default: 0.) | |
| attn_drop (float): Dropout rate for attention weights (default: 0.) | |
| init_values (float): Initial value for LayerScale gamma (default: None) | |
| drop_path (float): Dropout rate for stochastic depth (default: 0.) | |
| act_layer (nn.Module): Activation layer (default: nn.GELU) | |
| norm_layer (nn.Module): Normalization layer (default: nn.LayerNorm) | |
| mlp_layer (nn.Module): MLP layer (default: Mlp) | |
| custom_positional_encoding (Callable): Custom positional encoding function (default: None) | |
| """ | |
        super().__init__()
        self.norm1 = norm_layer(dim)
        self.attn = Attention(
            dim,
            latent_attn_dim=latent_attn_dim,
            num_heads=num_heads,
            qkv_bias=qkv_bias,
            qk_norm=qk_norm,
            attn_drop=attn_drop,
            proj_drop=proj_drop,
            norm_layer=norm_layer,
            custom_positional_encoding=custom_positional_encoding,
        )
        self.ls1 = LayerScale(dim, init_values=init_values) if init_values else nn.Identity()
        self.drop_path1 = DropPath(drop_path) if drop_path > 0.0 else nn.Identity()

        self.norm2 = norm_layer(dim)
        self.mlp = mlp_layer(
            in_features=dim,
            hidden_features=int(dim * mlp_ratio),
            act_layer=act_layer,
            drop=proj_drop,
        )
        self.ls2 = LayerScale(dim, init_values=init_values) if init_values else nn.Identity()
        self.drop_path2 = DropPath(drop_path) if drop_path > 0.0 else nn.Identity()
        self.custom_positional_encoding = custom_positional_encoding

    def forward(self, x: torch.Tensor, xpos: torch.Tensor = None) -> torch.Tensor:
        """
        Forward pass of the Self-Attention Block.

        Args:
            x (torch.Tensor): Input features
            xpos (torch.Tensor): Positions of tokens (required when using custom positional encoding)

        Returns:
            torch.Tensor: Output features of same shape as input
        """
        if self.custom_positional_encoding is not None:
            assert (
                xpos is not None
            ), "Positions of tokens (xpos) are a required input when using custom positional encoding"
        x = x + self.drop_path1(self.ls1(self.attn(self.norm1(x), xpos)))
        x = x + self.drop_path2(self.ls2(self.mlp(self.norm2(x))))
        return x


class CrossAttentionBlock(nn.Module):
    "Cross-Attention Block"

    def __init__(
        self,
        dim: int,
        num_heads: int,
        mlp_ratio: float = 4.0,
        qkv_bias: bool = False,
        qk_norm: bool = False,
        proj_drop: float = 0.0,
        attn_drop: float = 0.0,
        init_values: Optional[float] = None,
        drop_path: float = 0.0,
        act_layer: nn.Module = nn.GELU,
        norm_layer: nn.Module = nn.LayerNorm,
        mlp_layer: nn.Module = Mlp,
        custom_positional_encoding: Callable = None,
        norm_cross_tokens: bool = True,
    ):
        """
        Initialize the Cross-Attention Block.

        Args:
            dim (int): Dimension of input features
            num_heads (int): Number of attention heads
            mlp_ratio (float): Ratio of hidden to input dimension in MLP (default: 4.)
            qkv_bias (bool): Whether to include bias in qkv projection (default: False)
            qk_norm (bool): Whether to normalize q and k (default: False)
            proj_drop (float): Dropout rate for output (default: 0.)
            attn_drop (float): Dropout rate for attention weights (default: 0.)
            init_values (float): Initial value for LayerScale gamma (default: None)
            drop_path (float): Dropout rate for stochastic depth (default: 0.)
            act_layer (nn.Module): Activation layer (default: nn.GELU)
            norm_layer (nn.Module): Normalization layer (default: nn.LayerNorm)
            mlp_layer (nn.Module): MLP layer (default: Mlp)
            custom_positional_encoding (Callable): Custom positional encoding function (default: None)
            norm_cross_tokens (bool): Whether to normalize cross tokens (default: True)
        """
        super().__init__()
        self.norm1 = norm_layer(dim)
        self.attn = Attention(
            dim,
            num_heads=num_heads,
            qkv_bias=qkv_bias,
            qk_norm=qk_norm,
            attn_drop=attn_drop,
            proj_drop=proj_drop,
            norm_layer=norm_layer,
            custom_positional_encoding=custom_positional_encoding,
        )
        self.ls1 = LayerScale(dim, init_values=init_values) if init_values else nn.Identity()
        self.drop_path1 = DropPath(drop_path) if drop_path > 0.0 else nn.Identity()
        self.norm_y = norm_layer(dim) if norm_cross_tokens else nn.Identity()
        self.custom_positional_encoding = custom_positional_encoding

        self.norm2 = norm_layer(dim)
        self.cross_attn = CrossAttention(
            dim,
            num_heads=num_heads,
            qkv_bias=qkv_bias,
            qk_norm=qk_norm,
            attn_drop=attn_drop,
            proj_drop=proj_drop,
            norm_layer=norm_layer,
            custom_positional_encoding=custom_positional_encoding,
        )
        self.ls2 = LayerScale(dim, init_values=init_values) if init_values else nn.Identity()
        self.drop_path2 = DropPath(drop_path) if drop_path > 0.0 else nn.Identity()

        self.norm3 = norm_layer(dim)
        self.mlp = mlp_layer(
            in_features=dim,
            hidden_features=int(dim * mlp_ratio),
            act_layer=act_layer,
            drop=proj_drop,
        )
        self.ls3 = LayerScale(dim, init_values=init_values) if init_values else nn.Identity()
        self.drop_path3 = DropPath(drop_path) if drop_path > 0.0 else nn.Identity()

    def forward(
        self,
        x: torch.Tensor,
        y: torch.Tensor,
        xpos: torch.Tensor = None,
        ypos: torch.Tensor = None,
    ) -> torch.Tensor:
        """
        Forward pass of the Cross-Attention Block.

        Args:
            x (torch.Tensor): Input features
            y (torch.Tensor): Cross features
            xpos (torch.Tensor): Positions of tokens (required when using custom positional encoding)
            ypos (torch.Tensor): Positions of cross tokens (required when using custom positional encoding)

        Returns:
            torch.Tensor: Output features of same shape as input
        """
        if self.custom_positional_encoding is not None:
            assert (
                xpos is not None
            ), "Positions of tokens (xpos) are a required input when using custom positional encoding"
            assert (
                ypos is not None
            ), "Positions of cross tokens (ypos) are a required input when using custom positional encoding"
        x = x + self.drop_path1(self.ls1(self.attn(self.norm1(x), xpos)))
        y_ = self.norm_y(y)
        x = x + self.drop_path2(self.ls2(self.cross_attn(self.norm2(x), y_, y_, xpos, ypos)))
        x = x + self.drop_path3(self.ls3(self.mlp(self.norm3(x))))
        return x


def dummy_positional_encoding(x, xpos):
    "Dummy positional encoding for tokens: ignores xpos and returns x unchanged"
    return x


# Copied from DiffTransformer (https://arxiv.org/abs/2410.05258)
class RMSNorm(nn.Module):
    "Root Mean Square Layer Normalization"

    def __init__(self, dim: int, eps: float = 1e-6, elementwise_affine=True, memory_efficient=False):
        super().__init__()
        self.dim = dim
        self.eps = eps
        self.elementwise_affine = elementwise_affine
        if self.elementwise_affine:
            self.weight = nn.Parameter(torch.ones(dim))
        else:
            self.register_parameter("weight", None)

    def _norm(self, x):
        return x * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + self.eps)

    def forward(self, x):
        output = self._norm(x.float()).type_as(x)
        if self.weight is not None:
            output = output * self.weight
        return output

    def extra_repr(self) -> str:
        return f"dim={self.dim}, eps={self.eps}, elementwise_affine={self.elementwise_affine}"


def lambda_init_fn(depth):
    return 0.8 - 0.6 * math.exp(-0.3 * depth)  # copied from DiffTransformer
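

# Illustrative values of the schedule above: depth=0 gives 0.8 - 0.6 = 0.2, depth=12 gives
# approximately 0.8 - 0.6 * exp(-3.6) ≈ 0.78, and the value approaches 0.8 for very deep layers.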


class DiffAttention(nn.Module):
    "Differential Self-Attention Layer"

    fused_attn: Final[bool]

    def __init__(
        self,
        dim: int,
        depth: int,
        num_heads: int = 8,
        qkv_bias: bool = False,
        qk_norm: bool = False,
        attn_drop: float = 0.0,
        proj_drop: float = 0.0,
        norm_layer: nn.Module = nn.LayerNorm,
        custom_positional_encoding: Callable = None,
    ):
| """ | |
| Initialize the DiffAttention layer. | |
| Args: | |
| dim (int): Dimension of input features | |
| depth (int): Depth of the current layer, used in lambda initialization (default: 0) | |
| num_heads (int): Number of attention heads (default: 8) | |
| qkv_bias (bool): Whether to include bias in qkv projection (default: False) | |
| qk_norm (bool): Whether to normalize q and k (default: False) | |
| attn_drop (float): Dropout rate for attention weights (default: 0.) | |
| proj_drop (float): Dropout rate for output (default: 0.) | |
| norm_layer (nn.Module): Normalization layer (default: nn.LayerNorm) | |
| custom_positional_encoding (Callable): Custom positional encoding function (default: None) | |
| """ | |
        super().__init__()
        assert dim % num_heads == 0, "dim should be divisible by num_heads"
        self.num_heads = num_heads
        self.head_dim = dim // num_heads // 2
        self.scale = self.head_dim**-0.5
        self.fused_attn = use_fused_attn()

        self.qkv = nn.Linear(dim, dim * 3, bias=qkv_bias)
        self.q_norm = norm_layer(self.head_dim) if qk_norm else nn.Identity()
        self.k_norm = norm_layer(self.head_dim) if qk_norm else nn.Identity()
        self.attn_drop = nn.Dropout(attn_drop)
        self.proj = nn.Linear(dim, dim)
        self.proj_drop = nn.Dropout(proj_drop)
        self.custom_positional_encoding = custom_positional_encoding

        # DiffTransformer specific
        self.lambda_init = lambda_init_fn(depth)
        self.lambda_q1 = nn.Parameter(torch.zeros(self.head_dim, dtype=torch.float32).normal_(mean=0, std=0.1))
        self.lambda_k1 = nn.Parameter(torch.zeros(self.head_dim, dtype=torch.float32).normal_(mean=0, std=0.1))
        self.lambda_q2 = nn.Parameter(torch.zeros(self.head_dim, dtype=torch.float32).normal_(mean=0, std=0.1))
        self.lambda_k2 = nn.Parameter(torch.zeros(self.head_dim, dtype=torch.float32).normal_(mean=0, std=0.1))
        self.subln = RMSNorm(2 * self.head_dim, eps=1e-5, elementwise_affine=True)

    def forward(self, x: torch.Tensor, xpos: torch.Tensor = None) -> torch.Tensor:
        """
        Forward pass of the DiffAttention layer.

        Args:
            x (torch.Tensor): Input features
            xpos (torch.Tensor): Positions of tokens (required when using custom positional encoding)

        Returns:
            torch.Tensor: Output features of same shape as input
        """
        B, N, C = x.shape
        qkv = self.qkv(x).reshape(B, N, 3, self.num_heads, self.head_dim * 2)
        q, k, v = torch.chunk(qkv, 3, dim=2)  # each: B, N, 1, Nh, 2 * Dh
        q = q.view(B, N, 2 * self.num_heads, self.head_dim).permute(0, 2, 1, 3)
        k = k.view(B, N, 2 * self.num_heads, self.head_dim).permute(0, 2, 1, 3)
        v = v.view(B, N, self.num_heads, 2 * self.head_dim).permute(0, 2, 1, 3)
        q, k = self.q_norm(q), self.k_norm(k)

        if self.custom_positional_encoding is not None:
            assert (
                xpos is not None
            ), "Positions of tokens (xpos) are a required input when using custom positional encoding"
            q = self.custom_positional_encoding(q, xpos)
            k = self.custom_positional_encoding(k, xpos)

        q1, q2 = q.chunk(2, dim=1)  # split heads dimension into two
        k1, k2 = k.chunk(2, dim=1)  # split heads dimension into two

        if self.fused_attn:
            attn1 = F.scaled_dot_product_attention(
                q1, k1, v, dropout_p=(self.attn_drop.p if self.training else 0.0), scale=self.scale
            )
            attn2 = F.scaled_dot_product_attention(
                q2, k2, v, dropout_p=(self.attn_drop.p if self.training else 0.0), scale=self.scale
            )
        else:
            q1 = q1 * self.scale
            attn = q1 @ k1.transpose(-2, -1)
            attn = attn.softmax(dim=-1)
            attn = self.attn_drop(attn)
            attn1 = attn @ v

            q2 = q2 * self.scale
            attn = q2 @ k2.transpose(-2, -1)
            attn = attn.softmax(dim=-1)
            attn = self.attn_drop(attn)
            attn2 = attn @ v

        attn1 = attn1.transpose(1, 2)  # B, N, Nh, 2 * Dh (move heads next to the channel dim before merging)
        attn2 = attn2.transpose(1, 2)

        lambda_1 = torch.exp(torch.sum(self.lambda_q1 * self.lambda_k1, dim=-1).float()).type_as(q)
        lambda_2 = torch.exp(torch.sum(self.lambda_q2 * self.lambda_k2, dim=-1).float()).type_as(q)
        lambda_full = lambda_1 - lambda_2 + self.lambda_init
        attn = attn1 - lambda_full * attn2
        attn = self.subln(attn)
        attn = attn * (1 - self.lambda_init)
        attn = attn.reshape(B, N, self.num_heads * 2 * self.head_dim)

        x = self.proj(attn)
        x = self.proj_drop(x)
        return x
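

# The differential attention above follows https://arxiv.org/abs/2410.05258: two softmax attention
# maps are computed from the two halves of the heads and combined as
#   DiffAttn(X) = (softmax(Q1 K1^T / sqrt(d)) - lambda * softmax(Q2 K2^T / sqrt(d))) V,
# with lambda = exp(lambda_q1 . lambda_k1) - exp(lambda_q2 . lambda_k2) + lambda_init,
# followed by a per-head RMSNorm and a (1 - lambda_init) rescaling.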


class DiffCrossAttention(nn.Module):
    "Differential Cross-Attention Layer, following https://arxiv.org/abs/2410.05258"

    fused_attn: Final[bool]

    def __init__(
        self,
        dim: int,
        depth: int,
        num_heads: int = 8,
        qkv_bias: bool = False,
        qk_norm: bool = False,
        attn_drop: float = 0.0,
        proj_drop: float = 0.0,
        norm_layer: nn.Module = nn.LayerNorm,
        custom_positional_encoding: Callable = None,
    ):
| """ | |
| Initialize the Cross-Attention layer. | |
| Args: | |
| dim (int): Dimension of input features | |
| depth (int): Depth of the current layer, used in lambda initialization (default: 0) | |
| num_heads (int): Number of attention heads (default: 8) | |
| qkv_bias (bool): Whether to include bias in qkv projection (default: False) | |
| qk_norm (bool): Whether to normalize q and k (default: False) | |
| attn_drop (float): Dropout rate for attention weights (default: 0.) | |
| proj_drop (float): Dropout rate for output (default: 0.) | |
| norm_layer (nn.Module): Normalization layer (default: nn.LayerNorm) | |
| custom_positional_encoding (Callable): Custom positional encoding function (default: None) | |
| """ | |
        super().__init__()
        assert dim % num_heads == 0, "dim should be divisible by num_heads"
        self.num_heads = num_heads
        self.head_dim = dim // num_heads // 2
        self.scale = self.head_dim**-0.5
        self.fused_attn = use_fused_attn()

        self.projq = nn.Linear(dim, dim, bias=qkv_bias)
        self.projk = nn.Linear(dim, dim, bias=qkv_bias)
        self.projv = nn.Linear(dim, dim, bias=qkv_bias)
        self.q_norm = norm_layer(self.head_dim) if qk_norm else nn.Identity()
        self.k_norm = norm_layer(self.head_dim) if qk_norm else nn.Identity()
        self.attn_drop = nn.Dropout(attn_drop)
        self.proj = nn.Linear(dim, dim)
        self.proj_drop = nn.Dropout(proj_drop)

        # DiffTransformer specific
        self.lambda_init = lambda_init_fn(depth)
        self.lambda_q1 = nn.Parameter(torch.zeros(self.head_dim, dtype=torch.float32).normal_(mean=0, std=0.1))
        self.lambda_k1 = nn.Parameter(torch.zeros(self.head_dim, dtype=torch.float32).normal_(mean=0, std=0.1))
        self.lambda_q2 = nn.Parameter(torch.zeros(self.head_dim, dtype=torch.float32).normal_(mean=0, std=0.1))
        self.lambda_k2 = nn.Parameter(torch.zeros(self.head_dim, dtype=torch.float32).normal_(mean=0, std=0.1))
        self.subln = RMSNorm(2 * self.head_dim, eps=1e-5, elementwise_affine=True)
        self.custom_positional_encoding = custom_positional_encoding

    def lambda_init_fn(self, depth):
        return 0.8 - 0.6 * math.exp(-0.3 * depth)  # copied from DiffTransformer

    def forward(
        self,
        query: torch.Tensor,
        key: torch.Tensor,
        value: torch.Tensor,
        qpos: torch.Tensor = None,
        kpos: torch.Tensor = None,
    ) -> torch.Tensor:
        """
        Forward pass of the Differential Cross-Attention layer.

        Args:
            query (torch.Tensor): Query features
            key (torch.Tensor): Key features
            value (torch.Tensor): Value features
            qpos (torch.Tensor): Positions of queries (required when using custom positional encoding)
            kpos (torch.Tensor): Positions of keys (required when using custom positional encoding)

        Returns:
            torch.Tensor: Output features of same shape as input
        """
        B, Nq, C = query.shape
        Nk = key.shape[1]
        Nv = value.shape[1]

        q = self.projq(query).reshape(B, Nq, 2 * self.num_heads, self.head_dim).permute(0, 2, 1, 3)
        k = self.projk(key).reshape(B, Nk, 2 * self.num_heads, self.head_dim).permute(0, 2, 1, 3)
        v = self.projv(value).reshape(B, Nv, self.num_heads, 2 * self.head_dim).permute(0, 2, 1, 3)
        q, k = self.q_norm(q), self.k_norm(k)

        if self.custom_positional_encoding is not None:
            assert (
                qpos is not None
            ), "Positions of queries (qpos) are a required input when using custom positional encoding"
            assert (
                kpos is not None
            ), "Positions of keys (kpos) are a required input when using custom positional encoding"
            q = self.custom_positional_encoding(q, qpos)
            k = self.custom_positional_encoding(k, kpos)

        q1, q2 = q.chunk(2, dim=1)  # split heads dimension into two
        k1, k2 = k.chunk(2, dim=1)  # split heads dimension into two

        if self.fused_attn:
            attn1 = F.scaled_dot_product_attention(
                q1, k1, v, dropout_p=(self.attn_drop.p if self.training else 0.0), scale=self.scale
            )
            attn2 = F.scaled_dot_product_attention(
                q2, k2, v, dropout_p=(self.attn_drop.p if self.training else 0.0), scale=self.scale
            )
        else:
            q1 = q1 * self.scale
            attn = q1 @ k1.transpose(-2, -1)
            attn = attn.softmax(dim=-1)
            attn = self.attn_drop(attn)
            attn1 = attn @ v

            q2 = q2 * self.scale
            attn = q2 @ k2.transpose(-2, -1)
            attn = attn.softmax(dim=-1)
            attn = self.attn_drop(attn)
            attn2 = attn @ v

        attn1 = attn1.transpose(1, 2)  # B, Nq, Nh, 2 * Dh
        attn2 = attn2.transpose(1, 2)

        lambda_1 = torch.exp(torch.sum(self.lambda_q1 * self.lambda_k1, dim=-1).float()).type_as(q)
        lambda_2 = torch.exp(torch.sum(self.lambda_q2 * self.lambda_k2, dim=-1).float()).type_as(q)
        lambda_full = lambda_1 - lambda_2 + self.lambda_init
        attn = attn1 - lambda_full * attn2
        attn = self.subln(attn)
        attn = attn * (1 - self.lambda_init)
        attn = attn.reshape(B, Nq, self.num_heads * 2 * self.head_dim)

        x = self.proj(attn)
        x = self.proj_drop(x)
        return x


class DiffSelfAttentionBlock(SelfAttentionBlock):
    "Differential Self-Attention Block"

    def __init__(
        self,
        dim: int,
        depth: int,
        num_heads: int,
        mlp_ratio: float = 4.0,
        qkv_bias: bool = False,
        qk_norm: bool = False,
        proj_drop: float = 0.0,
        attn_drop: float = 0.0,
        init_values: Optional[float] = None,
        drop_path: float = 0.0,
        act_layer: nn.Module = nn.GELU,
        norm_layer: nn.Module = nn.LayerNorm,
        mlp_layer: nn.Module = Mlp,
        custom_positional_encoding: Callable = None,
    ):
        super().__init__(
            dim=dim,
            num_heads=num_heads,
            mlp_ratio=mlp_ratio,
            qkv_bias=qkv_bias,
            qk_norm=qk_norm,
            proj_drop=proj_drop,
            attn_drop=attn_drop,
            init_values=init_values,
            drop_path=drop_path,
            act_layer=act_layer,
            norm_layer=norm_layer,
            mlp_layer=mlp_layer,
            custom_positional_encoding=custom_positional_encoding,
        )
        self.attn = DiffAttention(
            dim,
            depth,
            num_heads=num_heads,
            qkv_bias=qkv_bias,
            qk_norm=qk_norm,
            attn_drop=attn_drop,
            proj_drop=proj_drop,
            norm_layer=norm_layer,
            custom_positional_encoding=custom_positional_encoding,
        )


class DiffCrossAttentionBlock(CrossAttentionBlock):
    "Differential Cross-Attention Block"

    def __init__(
        self,
        dim: int,
        depth: int,
        num_heads: int,
        mlp_ratio: float = 4.0,
        qkv_bias: bool = False,
        qk_norm: bool = False,
        proj_drop: float = 0.0,
        attn_drop: float = 0.0,
        init_values: Optional[float] = None,
        drop_path: float = 0.0,
        act_layer: nn.Module = nn.GELU,
        norm_layer: nn.Module = nn.LayerNorm,
        mlp_layer: nn.Module = Mlp,
        custom_positional_encoding: Callable = None,
        norm_cross_tokens: bool = True,
    ):
        super().__init__(
            dim=dim,
            num_heads=num_heads,
            mlp_ratio=mlp_ratio,
            qkv_bias=qkv_bias,
            qk_norm=qk_norm,
            proj_drop=proj_drop,
            attn_drop=attn_drop,
            init_values=init_values,
            drop_path=drop_path,
            act_layer=act_layer,
            norm_layer=norm_layer,
            mlp_layer=mlp_layer,
            custom_positional_encoding=custom_positional_encoding,
            norm_cross_tokens=norm_cross_tokens,
        )
        self.cross_attn = DiffCrossAttention(
            dim,
            depth,
            num_heads=num_heads,
            qkv_bias=qkv_bias,
            qk_norm=qk_norm,
            attn_drop=attn_drop,
            proj_drop=proj_drop,
            norm_layer=norm_layer,
            custom_positional_encoding=custom_positional_encoding,
        )


if __name__ == "__main__":
    # Init Attention & CrossAttention classes
    self_attn = Attention(dim=768, custom_positional_encoding=dummy_positional_encoding)
    cross_attn = CrossAttention(dim=768, custom_positional_encoding=dummy_positional_encoding)

    # Perform dummy inference with the Attention classes
    dummy_input = torch.randn((1, 256, 768))
    dummy_x = torch.arange(16)
    dummy_y = torch.arange(16)
    dummy_xpos = torch.cartesian_prod(dummy_y, dummy_x).view(1, 256, 2).expand(1, -1, 2).clone()
    self_attn_output = self_attn(dummy_input, dummy_xpos)
    cross_attn_output = cross_attn(dummy_input, dummy_input, dummy_input, dummy_xpos, dummy_xpos)
    print("Init of Attention & CrossAttention classes is successful!")

    # Init SelfAttentionBlock & CrossAttentionBlock
    self_attn_block = SelfAttentionBlock(dim=768, num_heads=16, custom_positional_encoding=dummy_positional_encoding)
    cross_attn_block = CrossAttentionBlock(dim=768, num_heads=16, custom_positional_encoding=dummy_positional_encoding)

    # Perform dummy inference with the Attention blocks
    self_attn_block_output = self_attn_block(dummy_input, dummy_xpos)
    cross_attn_block_output = cross_attn_block(dummy_input, dummy_input, dummy_xpos, dummy_xpos)
    print("Init of SelfAttentionBlock & CrossAttentionBlock is successful!")

    # Init DiffAttention & DiffCrossAttention classes
    diff_self_attn = DiffAttention(dim=768, depth=0, custom_positional_encoding=dummy_positional_encoding)
    diff_cross_attn = DiffCrossAttention(dim=768, depth=0, custom_positional_encoding=dummy_positional_encoding)

    # Perform dummy inference with the DiffAttention classes
    diff_self_attn_output = diff_self_attn(dummy_input, dummy_xpos)
    diff_cross_attn_output = diff_cross_attn(dummy_input, dummy_input, dummy_input, dummy_xpos, dummy_xpos)
    print("Init of DiffAttention & DiffCrossAttention classes is successful!")

    # Init DiffSelfAttentionBlock & DiffCrossAttentionBlock
    diff_self_attn_block = DiffSelfAttentionBlock(
        dim=768, depth=0, num_heads=8, custom_positional_encoding=dummy_positional_encoding
    )
    diff_cross_attn_block = DiffCrossAttentionBlock(
        dim=768, depth=0, num_heads=8, custom_positional_encoding=dummy_positional_encoding
    )

    # Perform dummy inference with the DiffAttention blocks
    diff_self_attn_block_output = diff_self_attn_block(dummy_input, dummy_xpos)
    diff_cross_attn_block_output = diff_cross_attn_block(dummy_input, dummy_input, dummy_xpos, dummy_xpos)
    print("Init of DiffSelfAttentionBlock & DiffCrossAttentionBlock is successful!")