| from typing import Optional |
|
|
| import torch |
|
|
|
|
| def _make_causal_mask( |
| attention_mask: torch.Tensor, dtype: torch.dtype, device: torch.device |
| ): |
| """ |
| Make causal mask used for bi-directional self-attention. |
| """ |
| bsz, tgt_len = attention_mask.shape |
| mask = torch.full((tgt_len, tgt_len), torch.tensor(torch.finfo(dtype).min, device=device), device=device) |
| mask_cond = torch.arange(mask.size(-1), device=device) |
| mask.masked_fill_(mask_cond < (mask_cond + 1).view(mask.size(-1), 1), 0) |
| mask = mask.to(dtype) |
|
|
| return mask[None, None, :, :].expand(bsz, 1, tgt_len, tgt_len) |
|
|
|
|
| def _make_2dvison_mask(column_mask, dtype: torch.dtype, device: torch.device): |
| """ |
| """ |
| bsz, seq_length = column_mask.shape |
| cross_mask = torch.zeros((bsz, 1, seq_length, seq_length), dtype=dtype, device=device) |
|
|
| |
| start = None |
| for bsz_idx in range(bsz): |
| for i in range(seq_length): |
| if column_mask[bsz_idx, i] == 1: |
| if start is None: |
| start = i |
| else: |
| if start is not None: |
| |
| cross_mask[bsz_idx, 0, start:i, start:i] = 1 |
| start = None |
|
|
| |
| if start is not None: |
| cross_mask[bsz_idx, 0, start:seq_length, start:seq_length] = 1 |
|
|
| return cross_mask |
|
|
|
|
| def _expand_mask(mask: torch.Tensor, dtype: torch.dtype, tgt_len: Optional[int] = None): |
| """ |
| Expands attention_mask from `[bsz, seq_len]` to `[bsz, 1, tgt_seq_len, src_seq_len]`. |
| """ |
| bsz, src_len = mask.size() |
| tgt_len = tgt_len if tgt_len is not None else src_len |
|
|
| expanded_mask = mask[:, None, None, :].expand(bsz, 1, tgt_len, src_len).to(dtype) |
|
|
| inverted_mask = 1.0 - expanded_mask |
|
|
| return inverted_mask.masked_fill_(inverted_mask.to(torch.bool), torch.finfo(dtype).min) |
|
|
|
|
| def make_mask(attention_mask: torch.Tensor, dtype: torch.dtype=None, device: torch.device=None, mode: str="default", vision_mask: torch.Tensor=None, ): |
| if dtype is None: |
| dtype = attention_mask.dtype |
| if device is None: |
| device = attention_mask.device |
| expanded_attn_mask = _expand_mask(attention_mask, dtype).to(device) |
| causal_mask = _make_causal_mask(attention_mask, dtype, device).to(device) |
| if mode == "default": |
| return attention_mask |
| else: |
| assert vision_mask is not None, "vision_mask is None" |
| vision_mask = vision_mask.to(device) |
| bsz, seq_length = attention_mask.shape |
| vision_mask_bg = vision_mask[:, None, :, None] |
| vision_mask_2d = _make_2dvison_mask(vision_mask, dtype, device) |
| if mode == "bidirectional": |
| mask = expanded_attn_mask + causal_mask |
| mask = mask.clone().masked_fill_(vision_mask_2d.to(torch.bool), 0) |
| return mask |
| else: |
| raise NotImplementedError(f"mode {mode} is not implemented") |
|
|