"""
Based on: https://github.com/lucidrains/flamingo-pytorch
"""

import torch
from einops import rearrange, repeat
from einops_exts import rearrange_many
from torch import einsum, nn


def exists(val):
    return val is not None


def FeedForward(dim, mult=4):
    inner_dim = int(dim * mult)
    return nn.Sequential(
        nn.LayerNorm(dim),
        nn.Linear(dim, inner_dim, bias=False),
        nn.GELU(),
        nn.Linear(inner_dim, dim, bias=False),
    )


class PerceiverAttention(nn.Module):
    def __init__(self, *, dim, dim_head=64, heads=8):
        super().__init__()
        self.scale = dim_head**-0.5
        self.heads = heads
        inner_dim = dim_head * heads

        self.norm_media = nn.LayerNorm(dim)
        self.norm_latents = nn.LayerNorm(dim)

        self.to_q = nn.Linear(dim, inner_dim, bias=False)
        self.to_kv = nn.Linear(dim, inner_dim * 2, bias=False)
        self.to_out = nn.Linear(inner_dim, dim, bias=False)

    def forward(self, x, latents):
        """
        Args:
            x (torch.Tensor): image features
                shape (b, T, n1, D)
            latents (torch.Tensor): latent features
                shape (b, T, n2, D)
        Returns:
            torch.Tensor: updated latents, shape (b, T, n2, D)
        """
        x = self.norm_media(x)
        latents = self.norm_latents(latents)

        h = self.heads

        q = self.to_q(latents)
        kv_input = torch.cat((x, latents), dim=-2)
        k, v = self.to_kv(kv_input).chunk(2, dim=-1)
        q, k, v = rearrange_many((q, k, v), "b t n (h d) -> b h t n d", h=h)
        q = q * self.scale

        # attention: latents attend to the concatenation of media features and latents
        sim = einsum("... i d, ... j d -> ... i j", q, k)
        sim = sim - sim.amax(dim=-1, keepdim=True).detach()
        attn = sim.softmax(dim=-1)

        out = einsum("... i j, ... j d -> ... i d", attn, v)
        out = rearrange(out, "b h t n d -> b t n (h d)", h=h)
        return self.to_out(out)


class PerceiverResampler(nn.Module):
    def __init__(
        self,
        *,
        dim,
        depth=6,
        dim_head=64,
        heads=8,
        num_latents=64,
        max_num_media=None,
        max_num_frames=None,
        ff_mult=4,
    ):
        super().__init__()
        self.latents = nn.Parameter(torch.randn(num_latents, dim))
        self.frame_embs = (
            nn.Parameter(torch.randn(max_num_frames, dim))
            if exists(max_num_frames)
            else None
        )
        self.media_time_embs = (
            nn.Parameter(torch.randn(max_num_media, 1, dim))
            if exists(max_num_media)
            else None
        )

        self.layers = nn.ModuleList([])
        for _ in range(depth):
            self.layers.append(
                nn.ModuleList(
                    [
                        PerceiverAttention(dim=dim, dim_head=dim_head, heads=heads),
                        FeedForward(dim=dim, mult=ff_mult),
                    ]
                )
            )

        self.norm = nn.LayerNorm(dim)

    def forward(self, x):
        """
        Args:
            x (torch.Tensor): image features
                shape (b, T, F, v, D)
        Returns:
            shape (b, T, n, D) where n is self.num_latents
        """
        b, T, F, v = x.shape[:4]

        # frame and media time embeddings
        if exists(self.frame_embs):
            frame_embs = repeat(self.frame_embs[:F], "F d -> b T F v d", b=b, T=T, v=v)
            x = x + frame_embs
        x = rearrange(
            x, "b T F v d -> b T (F v) d"
        )  # flatten the frame and spatial dimensions
        if exists(self.media_time_embs):
            x = x + self.media_time_embs[:T]

        # blocks: alternate perceiver attention and feedforward, with residual connections
        latents = repeat(self.latents, "n d -> b T n d", b=b, T=T)
        for attn, ff in self.layers:
            latents = attn(x, latents) + latents
            latents = ff(latents) + latents
        return self.norm(latents)
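
# Example (sketch; sizes are illustrative, not taken from the original repo): with the
# defaults above, the resampler compresses any number of patch tokens per image into
# num_latents tokens.
#
#   resampler = PerceiverResampler(dim=1024)
#   feats = torch.randn(2, 3, 1, 256, 1024)  # (b, T, F, v, D): 3 single-frame images, 256 patches each
#   out = resampler(feats)                   # -> (2, 3, 64, 1024)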


# gated cross attention
class MaskedCrossAttention(nn.Module):
    def __init__(
        self,
        *,
        dim,
        dim_visual,
        dim_head=64,
        heads=8,
        only_attend_immediate_media=True,
    ):
        super().__init__()
        self.scale = dim_head**-0.5
        self.heads = heads
        inner_dim = dim_head * heads

        self.norm = nn.LayerNorm(dim)

        self.to_q = nn.Linear(dim, inner_dim, bias=False)
        self.to_kv = nn.Linear(dim_visual, inner_dim * 2, bias=False)
        self.to_out = nn.Linear(inner_dim, dim, bias=False)

        # whether text attends only to its immediately preceding image, or to all previous images
        self.only_attend_immediate_media = only_attend_immediate_media

    def forward(self, x, media, media_locations=None, use_cached_media=False):
        """
        Args:
            x (torch.Tensor): text features
                shape (B, T_txt, D_txt)
            media (torch.Tensor): image features
                shape (B, T_img, n, D_img) where n is the number of latents per image
            media_locations: boolean mask identifying the media tokens in x
                shape (B, T_txt)
            use_cached_media: bool
                If True, treat all of x as if it occurs after the last media
                registered in media_locations. T_txt does not need to exactly
                equal media_locations.shape[1] in this case.
        """

        if not use_cached_media:
            assert (
                media_locations.shape[1] == x.shape[1]
            ), f"media_locations.shape is {media_locations.shape} but x.shape is {x.shape}"

        T_txt = x.shape[1]
        _, T_img, n = media.shape[:3]
        h = self.heads

        x = self.norm(x)

        q = self.to_q(x)
        media = rearrange(media, "b t n d -> b (t n) d")

        k, v = self.to_kv(media).chunk(2, dim=-1)
        q, k, v = rearrange_many((q, k, v), "b n (h d) -> b h n d", h=h)

        q = q * self.scale

        sim = einsum("... i d, ... j d -> ... i j", q, k)

        if exists(media_locations):
            # 1-indexed time step of each image in the sequence
            media_time = torch.arange(T_img, device=x.device) + 1

            if use_cached_media:
                # text time is set to the last cached media location
                text_time = repeat(
                    torch.count_nonzero(media_locations, dim=1),
                    "b -> b i",
                    i=T_txt,
                )
            else:
                # at each True, increment the time counter (relative to media time)
                text_time = media_locations.cumsum(dim=-1)

            # text time must equal media time if only attending to the most immediate image;
            # otherwise, text time must be greater than or equal to media time (attend to all previous media)
            mask_op = torch.eq if self.only_attend_immediate_media else torch.ge

            text_to_media_mask = mask_op(
                rearrange(text_time, "b i -> b 1 i 1"),
                repeat(media_time, "j -> 1 1 1 (j n)", n=n),
            )
            sim = sim.masked_fill(~text_to_media_mask, -torch.finfo(sim.dtype).max)

        sim = sim - sim.amax(dim=-1, keepdim=True).detach()
        attn = sim.softmax(dim=-1)

        if exists(media_locations) and self.only_attend_immediate_media:
            # any text without a preceding media needs its attention zeroed out
            text_without_media_mask = text_time == 0
            text_without_media_mask = rearrange(
                text_without_media_mask, "b i -> b 1 i 1"
            )
            attn = attn.masked_fill(text_without_media_mask, 0.0)

        out = einsum("... i j, ... j d -> ... i d", attn, v)
        out = rearrange(out, "b h n d -> b n (h d)")
        return self.to_out(out)
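
# Masking rule (illustrative sketch, not part of the original file): with
# media_locations = [True, False, False, True, False] the cumulative text_time is
# [1, 1, 1, 2, 2] and media_time is [1, 2, ...]; with only_attend_immediate_media=True
# a text token may only attend to latents of the image whose time equals its text_time,
# otherwise (torch.ge) it attends to all images at or before its text_time.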


class GatedCrossAttentionBlock(nn.Module):
    def __init__(
        self,
        *,
        dim,
        dim_visual,
        dim_head=64,
        heads=8,
        ff_mult=4,
        only_attend_immediate_media=True,
    ):
        super().__init__()
        self.attn = MaskedCrossAttention(
            dim=dim,
            dim_visual=dim_visual,
            dim_head=dim_head,
            heads=heads,
            only_attend_immediate_media=only_attend_immediate_media,
        )
        self.attn_gate = nn.Parameter(torch.tensor([0.0]))

        self.ff = FeedForward(dim, mult=ff_mult)
        self.ff_gate = nn.Parameter(torch.tensor([0.0]))

    def forward(
        self,
        x,
        media,
        media_locations=None,
        use_cached_media=False,
    ):
        # tanh gates are initialized to zero, so the block starts out as an identity mapping
        x = (
            self.attn(
                x,
                media,
                media_locations=media_locations,
                use_cached_media=use_cached_media,
            )
            * self.attn_gate.tanh()
            + x
        )
        x = self.ff(x) * self.ff_gate.tanh() + x

        return x
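

if __name__ == "__main__":
    # Minimal smoke test (sketch; shapes and sizes are illustrative, not taken from the
    # original repo): resample image patch features, then cross-attend from text tokens.
    resampler = PerceiverResampler(dim=64, depth=2, num_latents=16)
    vis = torch.randn(2, 3, 1, 49, 64)  # (b, T_img, F, v, D)
    media = resampler(vis)  # -> (2, 3, 16, 64)

    xattn = GatedCrossAttentionBlock(dim=32, dim_visual=64)
    text = torch.randn(2, 10, 32)  # (B, T_txt, D_txt)
    media_locations = torch.zeros(2, 10, dtype=torch.bool)
    media_locations[:, [0, 4, 7]] = True  # three <image> positions per sequence
    out = xattn(text, media, media_locations=media_locations)
    print(media.shape, out.shape)  # torch.Size([2, 3, 16, 64]) torch.Size([2, 10, 32])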