| |
|
|
| import math |
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
| from typing import Optional |
|
|
| from michelangelo.models.modules.checkpoint import checkpoint |
|
|
|
|
| def init_linear(l, stddev): |
| nn.init.normal_(l.weight, std=stddev) |
| if l.bias is not None: |
| nn.init.constant_(l.bias, 0.0) |
|
|
|
|
| class MultiheadAttention(nn.Module): |
| def __init__( |
| self, |
| *, |
| device: torch.device, |
| dtype: torch.dtype, |
| n_ctx: int, |
| width: int, |
| heads: int, |
| init_scale: float, |
| qkv_bias: bool, |
| flash: bool = False |
| ): |
| super().__init__() |
| self.n_ctx = n_ctx |
| self.width = width |
| self.heads = heads |
| self.c_qkv = nn.Linear(width, width * 3, bias=qkv_bias, device=device, dtype=dtype) |
| self.c_proj = nn.Linear(width, width, device=device, dtype=dtype) |
| self.attention = QKVMultiheadAttention(device=device, dtype=dtype, heads=heads, n_ctx=n_ctx, flash=flash) |
| init_linear(self.c_qkv, init_scale) |
| init_linear(self.c_proj, init_scale) |
|
|
| def forward(self, x): |
| x = self.c_qkv(x) |
| x = checkpoint(self.attention, (x,), (), True) |
| x = self.c_proj(x) |
| return x |
|
|
|
|
| class QKVMultiheadAttention(nn.Module): |
| def __init__(self, *, device: torch.device, dtype: torch.dtype, heads: int, n_ctx: int, flash: bool = False): |
| super().__init__() |
| self.device = device |
| self.dtype = dtype |
| self.heads = heads |
| self.n_ctx = n_ctx |
| self.flash = flash |
|
|
| def forward(self, qkv): |
| bs, n_ctx, width = qkv.shape |
| attn_ch = width // self.heads // 3 |
| scale = 1 / math.sqrt(math.sqrt(attn_ch)) |
| qkv = qkv.view(bs, n_ctx, self.heads, -1) |
| q, k, v = torch.split(qkv, attn_ch, dim=-1) |
|
|
| if self.flash: |
| out = F.scaled_dot_product_attention(q, k, v) |
| else: |
| weight = torch.einsum( |
| "bthc,bshc->bhts", q * scale, k * scale |
| ) |
| wdtype = weight.dtype |
| weight = torch.softmax(weight.float(), dim=-1).type(wdtype) |
| out = torch.einsum("bhts,bshc->bthc", weight, v).reshape(bs, n_ctx, -1) |
|
|
| return out |
|
|
|
|
| class ResidualAttentionBlock(nn.Module): |
| def __init__( |
| self, |
| *, |
| device: torch.device, |
| dtype: torch.dtype, |
| n_ctx: int, |
| width: int, |
| heads: int, |
| init_scale: float = 1.0, |
| qkv_bias: bool = True, |
| flash: bool = False, |
| use_checkpoint: bool = False |
| ): |
| super().__init__() |
|
|
| self.use_checkpoint = use_checkpoint |
|
|
| self.attn = MultiheadAttention( |
| device=device, |
| dtype=dtype, |
| n_ctx=n_ctx, |
| width=width, |
| heads=heads, |
| init_scale=init_scale, |
| qkv_bias=qkv_bias, |
| flash=flash |
| ) |
| self.ln_1 = nn.LayerNorm(width, device=device, dtype=dtype) |
| self.mlp = MLP(device=device, dtype=dtype, width=width, init_scale=init_scale) |
| self.ln_2 = nn.LayerNorm(width, device=device, dtype=dtype) |
|
|
| def _forward(self, x: torch.Tensor): |
| x = x + self.attn(self.ln_1(x)) |
| x = x + self.mlp(self.ln_2(x)) |
| return x |
|
|
| def forward(self, x: torch.Tensor): |
| return checkpoint(self._forward, (x,), self.parameters(), self.use_checkpoint) |
|
|
|
|
| class MultiheadCrossAttention(nn.Module): |
| def __init__( |
| self, |
| *, |
| device: torch.device, |
| dtype: torch.dtype, |
| width: int, |
| heads: int, |
| init_scale: float, |
| qkv_bias: bool = True, |
| flash: bool = False, |
| n_data: Optional[int] = None, |
| data_width: Optional[int] = None, |
| ): |
| super().__init__() |
| self.n_data = n_data |
| self.width = width |
| self.heads = heads |
| self.data_width = width if data_width is None else data_width |
| self.c_q = nn.Linear(width, width, bias=qkv_bias, device=device, dtype=dtype) |
| self.c_kv = nn.Linear(self.data_width, width * 2, bias=qkv_bias, device=device, dtype=dtype) |
| self.c_proj = nn.Linear(width, width, device=device, dtype=dtype) |
| self.attention = QKVMultiheadCrossAttention( |
| device=device, dtype=dtype, heads=heads, n_data=n_data, flash=flash |
| ) |
| init_linear(self.c_q, init_scale) |
| init_linear(self.c_kv, init_scale) |
| init_linear(self.c_proj, init_scale) |
|
|
| def forward(self, x, data): |
| x = self.c_q(x) |
| data = self.c_kv(data) |
| x = checkpoint(self.attention, (x, data), (), True) |
| x = self.c_proj(x) |
| return x |
|
|
|
|
| class QKVMultiheadCrossAttention(nn.Module): |
| def __init__(self, *, device: torch.device, dtype: torch.dtype, heads: int, |
| flash: bool = False, n_data: Optional[int] = None): |
|
|
| super().__init__() |
| self.device = device |
| self.dtype = dtype |
| self.heads = heads |
| self.n_data = n_data |
| self.flash = flash |
|
|
| def forward(self, q, kv): |
| _, n_ctx, _ = q.shape |
| bs, n_data, width = kv.shape |
| attn_ch = width // self.heads // 2 |
| scale = 1 / math.sqrt(math.sqrt(attn_ch)) |
| q = q.view(bs, n_ctx, self.heads, -1) |
| kv = kv.view(bs, n_data, self.heads, -1) |
| k, v = torch.split(kv, attn_ch, dim=-1) |
|
|
| if self.flash: |
| out = F.scaled_dot_product_attention(q, k, v) |
| else: |
| weight = torch.einsum( |
| "bthc,bshc->bhts", q * scale, k * scale |
| ) |
| wdtype = weight.dtype |
| weight = torch.softmax(weight.float(), dim=-1).type(wdtype) |
| out = torch.einsum("bhts,bshc->bthc", weight, v).reshape(bs, n_ctx, -1) |
|
|
| return out |
|
|
|
|
| class ResidualCrossAttentionBlock(nn.Module): |
| def __init__( |
| self, |
| *, |
| device: Optional[torch.device], |
| dtype: Optional[torch.dtype], |
| n_data: Optional[int] = None, |
| width: int, |
| heads: int, |
| data_width: Optional[int] = None, |
| init_scale: float = 0.25, |
| qkv_bias: bool = True, |
| flash: bool = False |
| ): |
| super().__init__() |
|
|
| if data_width is None: |
| data_width = width |
|
|
| self.attn = MultiheadCrossAttention( |
| device=device, |
| dtype=dtype, |
| n_data=n_data, |
| width=width, |
| heads=heads, |
| data_width=data_width, |
| init_scale=init_scale, |
| qkv_bias=qkv_bias, |
| flash=flash, |
| ) |
| self.ln_1 = nn.LayerNorm(width, device=device, dtype=dtype) |
| self.ln_2 = nn.LayerNorm(data_width, device=device, dtype=dtype) |
| self.mlp = MLP(device=device, dtype=dtype, width=width, init_scale=init_scale) |
| self.ln_3 = nn.LayerNorm(width, device=device, dtype=dtype) |
|
|
| def forward(self, x: torch.Tensor, data: torch.Tensor): |
| x = x + self.attn(self.ln_1(x), self.ln_2(data)) |
| x = x + self.mlp(self.ln_3(x)) |
| return x |
|
|
|
|
| class MLP(nn.Module): |
| def __init__(self, *, |
| device: Optional[torch.device], |
| dtype: Optional[torch.dtype], |
| width: int, |
| init_scale: float): |
| super().__init__() |
| self.width = width |
| self.c_fc = nn.Linear(width, width * 4, device=device, dtype=dtype) |
| self.c_proj = nn.Linear(width * 4, width, device=device, dtype=dtype) |
| self.gelu = nn.GELU() |
| init_linear(self.c_fc, init_scale) |
| init_linear(self.c_proj, init_scale) |
|
|
| def forward(self, x): |
| return self.c_proj(self.gelu(self.c_fc(x))) |
|
|
|
|
| class Transformer(nn.Module): |
| def __init__( |
| self, |
| *, |
| device: Optional[torch.device], |
| dtype: Optional[torch.dtype], |
| n_ctx: int, |
| width: int, |
| layers: int, |
| heads: int, |
| init_scale: float = 0.25, |
| qkv_bias: bool = True, |
| flash: bool = False, |
| use_checkpoint: bool = False |
| ): |
| super().__init__() |
| self.n_ctx = n_ctx |
| self.width = width |
| self.layers = layers |
| self.resblocks = nn.ModuleList( |
| [ |
| ResidualAttentionBlock( |
| device=device, |
| dtype=dtype, |
| n_ctx=n_ctx, |
| width=width, |
| heads=heads, |
| init_scale=init_scale, |
| qkv_bias=qkv_bias, |
| flash=flash, |
| use_checkpoint=use_checkpoint |
| ) |
| for _ in range(layers) |
| ] |
| ) |
|
|
| def forward(self, x: torch.Tensor): |
| for block in self.resblocks: |
| x = block(x) |
| return x |
|
|