Spaces:
Sleeping
Sleeping
| # Copyright (c) 2022, Tri Dao. | |
| import torch | |
| import torch.nn as nn | |
| from einops import rearrange | |
| from torch import Tensor | |
| from flash_attn.utils.distributed import all_reduce, reduce_scatter | |
| class GPT2Embeddings(nn.Module): | |
| def __init__( | |
| self, | |
| embed_dim, | |
| vocab_size, | |
| max_position_embeddings, | |
| padding_idx=None, | |
| word_embed_proj_dim=None, | |
| device=None, | |
| dtype=None, | |
| ): | |
| """ | |
| If max_position_embeddings <= 0, there's no position embeddings | |
| If word_embe_proj_dim is not None (e.g., OPT-350m), we embed to that dimension | |
| the project up to embed_dim | |
| """ | |
| factory_kwargs = {"device": device, "dtype": dtype} | |
| super().__init__() | |
| if word_embed_proj_dim is None: | |
| self.word_embeddings = nn.Embedding( | |
| vocab_size, embed_dim, padding_idx=padding_idx, **factory_kwargs | |
| ) | |
| self.project_in = None | |
| else: | |
| self.word_embeddings = nn.Embedding( | |
| vocab_size, word_embed_proj_dim, padding_idx=padding_idx, **factory_kwargs | |
| ) | |
| self.project_in = nn.Linear( | |
| word_embed_proj_dim, embed_dim, bias=False, **factory_kwargs | |
| ) | |
| self.max_position_embeddings = max_position_embeddings | |
| if self.max_position_embeddings > 0: | |
| self.position_embeddings = nn.Embedding( | |
| max_position_embeddings, embed_dim, **factory_kwargs | |
| ) | |
| def forward(self, input_ids, position_ids=None): | |
| """ | |
| input_ids: (batch, seqlen) | |
| position_ids: (batch, seqlen) | |
| """ | |
| batch_size, seqlen = input_ids.shape | |
| embeddings = self.word_embeddings(input_ids) | |
| if self.project_in is not None: | |
| embeddings = self.project_in(embeddings) | |
| if self.max_position_embeddings > 0: | |
| if position_ids is None: | |
| position_ids = torch.arange(seqlen, dtype=torch.long, device=input_ids.device) | |
| position_embeddings = self.position_embeddings(position_ids) | |
| embeddings = embeddings + position_embeddings | |
| return embeddings | |
| class BertEmbeddings(nn.Module): | |
| def __init__( | |
| self, | |
| embed_dim, | |
| vocab_size, | |
| max_position_embeddings, | |
| type_vocab_size, | |
| padding_idx=None, | |
| device=None, | |
| dtype=None, | |
| ): | |
| """ | |
| If max_position_embeddings <= 0, there's no position embeddings | |
| If type_vocab_size <= 0, there's no token type embeddings | |
| """ | |
| factory_kwargs = {"device": device, "dtype": dtype} | |
| super().__init__() | |
| self.word_embeddings = nn.Embedding( | |
| vocab_size, embed_dim, padding_idx=padding_idx, **factory_kwargs | |
| ) | |
| self.max_position_embeddings = max_position_embeddings | |
| self.type_vocab_size = type_vocab_size | |
| if self.max_position_embeddings > 0: | |
| self.position_embeddings = nn.Embedding( | |
| max_position_embeddings, embed_dim, **factory_kwargs | |
| ) | |
| if self.type_vocab_size > 0: | |
| self.token_type_embeddings = nn.Embedding(type_vocab_size, embed_dim, **factory_kwargs) | |
| def forward(self, input_ids, position_ids=None, token_type_ids=None): | |
| """ | |
| input_ids: (batch, seqlen) | |
| position_ids: (batch, seqlen) | |
| token_type_ids: (batch, seqlen) | |
| """ | |
| batch_size, seqlen = input_ids.shape | |
| embeddings = self.word_embeddings(input_ids) | |
| if self.max_position_embeddings > 0: | |
| if position_ids is None: | |
| position_ids = torch.arange(seqlen, dtype=torch.long, device=input_ids.device) | |
| position_embeddings = self.position_embeddings(position_ids) | |
| embeddings = embeddings + position_embeddings | |
| if self.type_vocab_size > 0: | |
| if token_type_ids is None: | |
| token_type_ids = torch.zeros(seqlen, dtype=torch.long, device=input_ids.device) | |
| token_type_embeddings = self.token_type_embeddings(token_type_ids) | |
| embeddings = embeddings + token_type_embeddings | |
| return embeddings | |
| class VocabParallelEmbedding(nn.Embedding): | |
| def __init__(self, num_embeddings, *args, process_group=None, padding_idx=None, **kwargs): | |
| self.process_group = process_group | |
| if process_group is not None: | |
| world_size = torch.distributed.get_world_size(process_group) | |
| if num_embeddings % world_size != 0: | |
| raise ValueError( | |
| f"num_embeddings ({num_embeddings}) must be divisible by " | |
| f"world_size ({world_size})" | |
| ) | |
| if world_size > 1 and padding_idx is not None: | |
| raise RuntimeError("ParallelEmbedding does not support padding_idx") | |
| else: | |
| world_size = 1 | |
| super().__init__(num_embeddings // world_size, *args, padding_idx=padding_idx, **kwargs) | |
| def forward(self, input: Tensor) -> Tensor: | |
| if self.process_group is None: | |
| return super().forward(input) | |
| else: | |
| rank = torch.distributed.get_rank(self.process_group) | |
| vocab_size = self.num_embeddings | |
| vocab_start_index, vocab_end_index = rank * vocab_size, (rank + 1) * vocab_size | |
| # Create a mask of valid vocab ids (1 means it needs to be masked). | |
| input_ids_mask = (input < vocab_start_index) | (input >= vocab_end_index) | |
| input = input - vocab_start_index | |
| input[input_ids_mask] = 0 | |
| embeddings = super().forward(input) | |
| embeddings[input_ids_mask] = 0.0 | |
| return embeddings | |
| class ColumnParallelEmbedding(nn.Embedding): | |
| def __init__(self, num_embeddings, embedding_dim, *args, process_group=None, **kwargs): | |
| self.process_group = process_group | |
| if process_group is not None: | |
| world_size = torch.distributed.get_world_size(process_group) | |
| if embedding_dim % world_size != 0: | |
| raise ValueError( | |
| f"embedding_dim ({embedding_dim}) must be divisible by " | |
| f"world_size ({world_size})" | |
| ) | |
| else: | |
| world_size = 1 | |
| super().__init__(num_embeddings, embedding_dim // world_size, *args, **kwargs) | |
| class ParallelGPT2Embeddings(nn.Module): | |
| def __init__( | |
| self, | |
| embed_dim, | |
| vocab_size, | |
| max_position_embeddings, | |
| process_group, | |
| padding_idx=None, | |
| sequence_parallel=True, | |
| device=None, | |
| dtype=None, | |
| ): | |
| """ | |
| If max_position_embeddings <= 0, there's no position embeddings | |
| """ | |
| factory_kwargs = {"device": device, "dtype": dtype} | |
| super().__init__() | |
| self.process_group = process_group | |
| self.sequence_parallel = sequence_parallel | |
| self.word_embeddings = VocabParallelEmbedding( | |
| vocab_size, | |
| embed_dim, | |
| padding_idx=padding_idx, | |
| process_group=process_group, | |
| **factory_kwargs, | |
| ) | |
| self.max_position_embeddings = max_position_embeddings | |
| if self.max_position_embeddings > 0: | |
| self.position_embeddings = ColumnParallelEmbedding( | |
| max_position_embeddings, embed_dim, process_group=process_group, **factory_kwargs | |
| ) | |
| def forward(self, input_ids, position_ids=None, combine_batch_seqlen_dim=False): | |
| """ | |
| input_ids: (batch, seqlen) | |
| position_ids: (batch, seqlen) | |
| """ | |
| batch_size, seqlen = input_ids.shape | |
| world_size = torch.distributed.get_world_size(self.process_group) | |
| embeddings = self.word_embeddings(input_ids) | |
| if self.max_position_embeddings > 0: | |
| if position_ids is None: | |
| position_ids = torch.arange(seqlen, dtype=torch.long, device=input_ids.device) | |
| position_embeddings = self.position_embeddings(position_ids) | |
| if world_size <= 1: | |
| embeddings = embeddings + position_embeddings | |
| else: | |
| partition_dim = self.position_embeddings.embedding_dim | |
| rank = torch.distributed.get_rank(self.process_group) | |
| embeddings[ | |
| ..., rank * partition_dim : (rank + 1) * partition_dim | |
| ] += position_embeddings | |
| if combine_batch_seqlen_dim: | |
| embeddings = rearrange(embeddings, "b s d -> (b s) d") | |
| reduce_fn = reduce_scatter if self.sequence_parallel else all_reduce | |
| return embeddings if world_size <= 1 else reduce_fn(embeddings, self.process_group) | |