"""BERT encoder with selectable self-attention backends.

Three interchangeable self-attention implementations are provided (plain
PyTorch "eager", ``F.scaled_dot_product_attention`` and FlashAttention-2),
together with the encoder stack, a pooled base model and a masked-LM head.
"""

import math
from typing import Optional, Tuple, Union

import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import PreTrainedModel, PretrainedConfig
from transformers.modeling_outputs import BaseModelOutputWithPooling, MaskedLMOutput

from .configuration_bert_updated import BertUpdatedConfig


class BertSelfAttention(nn.Module):
    """Multi-head self-attention computed with explicit matmul + softmax.

    This is the reference ("eager") implementation; the other backends
    subclass it to reuse the projections and fall back to it whenever the
    attention probabilities have to be returned.
    """

    def __init__(self, config):
        super().__init__()
        self.num_attention_heads = config.num_attention_heads
        self.attention_head_size = config.hidden_size // config.num_attention_heads
        self.all_head_size = self.num_attention_heads * self.attention_head_size

        self.query = nn.Linear(config.hidden_size, self.all_head_size)
        self.key = nn.Linear(config.hidden_size, self.all_head_size)
        self.value = nn.Linear(config.hidden_size, self.all_head_size)
        # Dropout applied to the attention probabilities.
        self.dropout = nn.Dropout(config.attention_probs_dropout_prob)

    def _split_heads(self, x: torch.Tensor) -> torch.Tensor:
        """Reshape ``(B, T, H * D)`` into ``(B, H, T, D)``."""
        B, T, _ = x.shape
        return x.view(B, T, self.num_attention_heads, self.attention_head_size).permute(0, 2, 1, 3)

    def forward(
        self,
        hidden_states: torch.Tensor,
        key_padding_mask: Optional[torch.Tensor] = None,
        output_attentions: bool = False,
    ) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
        """Run self-attention over ``hidden_states``.

        Args:
            hidden_states: Input of shape ``(B, T, hidden_size)``.
            key_padding_mask: Optional boolean mask indexed as ``(B, T)``;
                ``True`` marks key positions that must NOT be attended to.
            output_attentions: When true, also return the (post-dropout)
                attention probabilities.

        Returns:
            ``(context, probs)`` where ``context`` is ``(B, T, all_head_size)``
            and ``probs`` is ``(B, H, T, T)`` or ``None``.
        """
        q = self._split_heads(self.query(hidden_states))
        k = self._split_heads(self.key(hidden_states))
        v = self._split_heads(self.value(hidden_states))

        scale = math.sqrt(self.attention_head_size)
        scores = torch.matmul(q, k.transpose(-1, -2)) / scale
        if key_padding_mask is not None:
            # Broadcast the per-key mask over heads and query positions.
            scores = scores.masked_fill(key_padding_mask[:, None, None, :], float("-inf"))

        probs = F.softmax(scores, dim=-1)
        probs = self.dropout(probs)

        context = torch.matmul(probs, v)
        B, _, T, _ = context.shape
        # (B, H, T, D) -> (B, T, H * D)
        context = context.permute(0, 2, 1, 3).contiguous().view(B, T, self.all_head_size)

        if output_attentions:
            return context, probs
        return context, None


class BertSdpaSelfAttention(BertSelfAttention):
    """Self-attention backed by ``F.scaled_dot_product_attention``."""

    def forward(
        self,
        hidden_states: torch.Tensor,
        key_padding_mask: Optional[torch.Tensor] = None,
        output_attentions: bool = False,
    ) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
        """Same contract as :meth:`BertSelfAttention.forward`.

        The fused kernel does not expose attention probabilities, so a request
        for them is delegated to the eager implementation. Otherwise the second
        element of the returned tuple is always ``None``.
        """
        if output_attentions:
            return super().forward(hidden_states, key_padding_mask, output_attentions=True)

        B, T, _ = hidden_states.shape
        q = self._split_heads(self.query(hidden_states))
        k = self._split_heads(self.key(hidden_states))
        v = self._split_heads(self.value(hidden_states))

        attn_mask = None
        if key_padding_mask is not None:
            # Additive float mask: 0 where attention is allowed, -inf where the
            # key is padding. Shape (B, 1, 1, T) broadcasts over heads/queries.
            attn_mask = torch.zeros(B, 1, 1, T, dtype=q.dtype, device=q.device)
            attn_mask = attn_mask.masked_fill(key_padding_mask[:, None, None, :], float("-inf"))

        # Keep parity with the eager path, which applies dropout to the
        # attention probabilities while training.
        dropout_p = self.dropout.p if self.training else 0.0
        context = F.scaled_dot_product_attention(q, k, v, attn_mask=attn_mask, dropout_p=dropout_p)
        context = context.permute(0, 2, 1, 3).contiguous().view(B, T, self.all_head_size)
        return context, None


class BertFlashSelfAttention(BertSelfAttention):
    """Self-attention backed by the ``flash_attn`` package."""

    def forward(
        self,
        hidden_states: torch.Tensor,
        key_padding_mask: Optional[torch.Tensor] = None,
        output_attentions: bool = False,
    ) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
        """Same contract as :meth:`BertSelfAttention.forward`.

        A request for attention probabilities is delegated to the eager
        implementation. Inputs that are not fp16/bf16 are cast to bf16 for the
        kernel call and the result is cast back to the original dtype.

        Raises:
            ImportError: If ``flash_attn`` cannot be imported.
        """
        if output_attentions:
            return super().forward(hidden_states, key_padding_mask, output_attentions=True)

        # Imported lazily so the module is usable without flash_attn installed.
        try:
            from flash_attn import flash_attn_func, flash_attn_varlen_func
            from flash_attn.bert_padding import pad_input, unpad_input
        except ImportError as e:
            raise ImportError(
                "flash_attn is required for attn_implementation='flash_attention_2'. "
                "Install with: pip install flash-attn --no-build-isolation"
            ) from e

        B, T, _ = hidden_states.shape
        # The flash kernels are called with a (B, T, H, D) layout, so the
        # projections are viewed directly instead of going through
        # _split_heads and permuting back.
        head_shape = (B, T, self.num_attention_heads, self.attention_head_size)
        q = self.query(hidden_states).view(head_shape)
        k = self.key(hidden_states).view(head_shape)
        v = self.value(hidden_states).view(head_shape)

        orig_dtype = q.dtype
        if orig_dtype not in (torch.float16, torch.bfloat16):
            q, k, v = q.to(torch.bfloat16), k.to(torch.bfloat16), v.to(torch.bfloat16)

        # Keep parity with the eager path, which applies dropout to the
        # attention probabilities while training.
        dropout_p = self.dropout.p if self.training else 0.0

        if key_padding_mask is not None and key_padding_mask.any():
            attend = ~key_padding_mask
            # Star-unpack the tail: the number of values returned by
            # unpad_input is not the same across flash_attn releases.
            q_u, indices, cu_seqlens, max_seqlen, *_ = unpad_input(q, attend)
            k_u, *_ = unpad_input(k, attend)
            v_u, *_ = unpad_input(v, attend)
            out_u = flash_attn_varlen_func(
                q_u,
                k_u,
                v_u,
                cu_seqlens_q=cu_seqlens,
                cu_seqlens_k=cu_seqlens,
                max_seqlen_q=max_seqlen,
                max_seqlen_k=max_seqlen,
                dropout_p=dropout_p,
                causal=False,
            )
            # Scatter the packed tokens back into a padded (B, T, ...) tensor.
            out = pad_input(out_u, indices, B, T)
        else:
            out = flash_attn_func(q, k, v, dropout_p=dropout_p, causal=False)

        out = out.to(orig_dtype).reshape(B, T, self.all_head_size)
        return out, None


# Maps the config's attention-implementation name to the module class.
BERT_SELF_ATTENTION_CLASSES = {
    "eager": BertSelfAttention,
    "sdpa": BertSdpaSelfAttention,
    "flash_attention_2": BertFlashSelfAttention,
}


class BertSelfOutput(nn.Module):
    """Projection, dropout and residual LayerNorm applied after self-attention."""

    def __init__(self, config):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        self.LayerNorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, hidden_states: torch.Tensor, input_tensor: torch.Tensor) -> torch.Tensor:
        """Return ``LayerNorm(dropout(dense(hidden_states)) + input_tensor)``."""
        hidden_states = self.dropout(self.dense(hidden_states))
        return self.LayerNorm(hidden_states + input_tensor)


class BertAttention(nn.Module):
    """Self-attention sub-layer: the chosen attention backend plus its output block."""

    def __init__(self, config):
        super().__init__()
        # Fall back to the eager backend when the attribute is absent or None.
        # An unrecognised name still raises KeyError.
        attn_impl = getattr(config, "_attn_implementation", "eager") or "eager"
        attn_cls = BERT_SELF_ATTENTION_CLASSES[attn_impl]
        self.self = attn_cls(config)
        self.output = BertSelfOutput(config)

    def forward(
        self,
        hidden_states: torch.Tensor,
        key_padding_mask: Optional[torch.Tensor],
        output_attentions: bool = False,
    ) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
        """Apply attention with a residual connection.

        Returns:
            ``(output, attn_weights)``; ``attn_weights`` is whatever the
            attention backend returned as its second element.
        """
        self_out, attn_weights = self.self(hidden_states, key_padding_mask, output_attentions)
        return self.output(self_out, hidden_states), attn_weights


class BertIntermediate(nn.Module):
    """Feed-forward expansion: linear projection to ``intermediate_size`` + GELU."""

    def __init__(self, config):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.intermediate_size)

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        """Return ``gelu(dense(hidden_states))``."""
        return F.gelu(self.dense(hidden_states))


class BertOutput(nn.Module):
    """Feed-forward contraction with dropout and residual LayerNorm."""

    def __init__(self, config):
        super().__init__()
        self.dense = nn.Linear(config.intermediate_size, config.hidden_size)
        self.LayerNorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, hidden_states: torch.Tensor, input_tensor: torch.Tensor) -> torch.Tensor:
        """Return ``LayerNorm(dropout(dense(hidden_states)) + input_tensor)``."""
        hidden_states = self.dropout(self.dense(hidden_states))
        return self.LayerNorm(hidden_states + input_tensor)


class BertLayer(nn.Module):
    """One encoder layer: attention sub-layer followed by the feed-forward sub-layer."""

    def __init__(self, config):
        super().__init__()
        self.attention = BertAttention(config)
        self.intermediate = BertIntermediate(config)
        self.output = BertOutput(config)

    def forward(
        self,
        hidden_states: torch.Tensor,
        key_padding_mask: Optional[torch.Tensor],
        output_attentions: bool = False,
    ) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
        """Return ``(layer_output, attn_weights)`` for this layer."""
        attn_out, attn_weights = self.attention(hidden_states, key_padding_mask, output_attentions)
        # The attention output is both the feed-forward input and its residual.
        return self.output(self.intermediate(attn_out), attn_out), attn_weights


class BertEncoder(nn.Module):
    """Stack of ``config.num_hidden_layers`` :class:`BertLayer` modules."""

    def __init__(self, config):
        super().__init__()
        self.layer = nn.ModuleList([BertLayer(config) for _ in range(config.num_hidden_layers)])

    def forward(
        self,
        hidden_states: torch.Tensor,
        key_padding_mask: Optional[torch.Tensor],
        output_hidden_states: bool = False,
        output_attentions: bool = False,
    ) -> Tuple:
        """Run all layers in order.

        Returns:
            ``(last_hidden_state, all_hidden_states, all_attentions)``.
            ``all_hidden_states`` is a tuple starting with the encoder input
            followed by every layer's output, or ``None`` when not requested.
            ``all_attentions`` is a tuple with one entry per layer, or ``None``
            when not requested.
        """
        all_hidden_states = (hidden_states,) if output_hidden_states else None
        all_attentions = () if output_attentions else None

        for layer in self.layer:
            hidden_states, attn_weights = layer(hidden_states, key_padding_mask, output_attentions)
            if output_hidden_states:
                all_hidden_states = all_hidden_states + (hidden_states,)
            if output_attentions:
                all_attentions = all_attentions + (attn_weights,)

        return hidden_states, all_hidden_states, all_attentions


class BertEmbeddings(nn.Module):
    """Sum of word, position and token-type embeddings, then LayerNorm and dropout."""

    def __init__(self, config):
        super().__init__()
        self.word_embeddings = nn.Embedding(config.vocab_size, config.hidden_size, padding_idx=config.pad_token_id)
        self.position_embeddings = nn.Embedding(config.max_position_embeddings, config.hidden_size)
        self.token_type_embeddings = nn.Embedding(config.type_vocab_size, config.hidden_size)
        self.LayerNorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        # Position indices 0..max_position_embeddings-1, shape (1, max_pos).
        # Registered as non-persistent, so it is not part of the state dict.
        self.register_buffer(
            "position_ids",
            torch.arange(config.max_position_embeddings).expand((1, -1)),
            persistent=False,
        )

    def forward(self, input_ids: torch.LongTensor, token_type_ids: Optional[torch.LongTensor] = None) -> torch.Tensor:
        """Embed ``input_ids`` of shape ``(B, T)``.

        ``token_type_ids`` defaults to all zeros. Positions are the first ``T``
        entries of the ``position_ids`` buffer.
        """
        B, T = input_ids.shape
        if token_type_ids is None:
            token_type_ids = torch.zeros_like(input_ids)

        x = self.word_embeddings(input_ids)
        x = x + self.position_embeddings(self.position_ids[:, :T])
        x = x + self.token_type_embeddings(token_type_ids)
        return self.dropout(self.LayerNorm(x))


class BertPooler(nn.Module):
    """Pools a sequence by passing its first token through a dense + tanh layer."""

    def __init__(self, config):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        self.activation = nn.Tanh()

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        """Return ``tanh(dense(hidden_states[:, 0]))``."""
        return self.activation(self.dense(hidden_states[:, 0]))


class BertPredictionHeadTransform(nn.Module):
    """Dense + GELU + LayerNorm transform applied before the vocabulary projection."""

    def __init__(self, config):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        self.LayerNorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        """Return ``LayerNorm(gelu(dense(hidden_states)))``."""
        return self.LayerNorm(F.gelu(self.dense(hidden_states)))


class BertModel(PreTrainedModel):
    """Embeddings + encoder + pooler, returning hidden states and a pooled vector."""

    config_class = BertUpdatedConfig
    base_model_prefix = "bert"
    _supports_sdpa = True
    _supports_flash_attn_2 = True

    def __init__(self, config):
        super().__init__(config)
        self.embeddings = BertEmbeddings(config)
        self.encoder = BertEncoder(config)
        self.pooler = BertPooler(config)
        self.post_init()

    def get_input_embeddings(self):
        """Return the word-embedding module."""
        return self.embeddings.word_embeddings

    def set_input_embeddings(self, value):
        """Replace the word-embedding module."""
        self.embeddings.word_embeddings = value

    def forward(
        self,
        input_ids: torch.LongTensor,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.LongTensor] = None,
        output_hidden_states: Optional[bool] = None,
        output_attentions: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple, BaseModelOutputWithPooling]:
        """Encode a batch of token ids.

        Args:
            input_ids: Token ids of shape ``(B, T)``.
            attention_mask: Optional mask with the same shape as ``input_ids``;
                entries equal to 0 are treated as padding. Defaults to all ones.
            token_type_ids: Optional segment ids; defaults to zeros.
            output_hidden_states: Overrides ``config.output_hidden_states``.
            output_attentions: Overrides ``config.output_attentions``.
            return_dict: Overrides ``config.use_return_dict``.

        Returns:
            A ``BaseModelOutputWithPooling``, or (when ``return_dict`` is false)
            a tuple of the same fields with ``None`` entries dropped.
        """
        output_hidden_states = (
            output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
        )
        output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        if attention_mask is None:
            attention_mask = torch.ones_like(input_ids)

        # Internally the layers use the inverse convention: True == padding.
        key_padding_mask = attention_mask.eq(0)
        if not key_padding_mask.any():
            # Nothing is masked, so let the attention layers skip masking.
            key_padding_mask = None

        x = self.embeddings(input_ids, token_type_ids)
        last_hidden_state, all_hidden_states, all_attentions = self.encoder(
            x,
            key_padding_mask,
            output_hidden_states=output_hidden_states,
            output_attentions=output_attentions,
        )
        pooled = self.pooler(last_hidden_state)

        if not return_dict:
            return tuple(
                v for v in [last_hidden_state, pooled, all_hidden_states, all_attentions] if v is not None
            )

        return BaseModelOutputWithPooling(
            last_hidden_state=last_hidden_state,
            pooler_output=pooled,
            hidden_states=all_hidden_states,
            attentions=all_attentions,
        )


class BertForMaskedLM(PreTrainedModel):
    """:class:`BertModel` with a masked-language-modelling head on top."""

    config_class = BertUpdatedConfig
    base_model_prefix = "bert"
    _supports_sdpa = True
    _supports_flash_attn_2 = True

    def __init__(self, config):
        super().__init__(config)
        self.bert = BertModel(config)
        self.transform = BertPredictionHeadTransform(config)
        # Projects transformed hidden states onto the vocabulary.
        self.cls = nn.Linear(config.hidden_size, config.vocab_size)
        self.post_init()

    def get_input_embeddings(self):
        """Return the word-embedding module of the wrapped :class:`BertModel`."""
        return self.bert.embeddings.word_embeddings

    def forward(
        self,
        input_ids: torch.LongTensor,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.LongTensor] = None,
        labels: Optional[torch.LongTensor] = None,
        output_hidden_states: Optional[bool] = None,
        output_attentions: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple, MaskedLMOutput]:
        """Compute vocabulary logits and, when ``labels`` is given, the MLM loss.

        Args:
            input_ids: Token ids of shape ``(B, T)``.
            attention_mask: Forwarded to :class:`BertModel`.
            token_type_ids: Forwarded to :class:`BertModel`.
            labels: Optional target ids; positions equal to ``-100`` are
                ignored by the cross-entropy loss.
            output_hidden_states: Forwarded to :class:`BertModel`.
            output_attentions: Forwarded to :class:`BertModel`.
            return_dict: Overrides ``config.use_return_dict``.

        Returns:
            A ``MaskedLMOutput``, or (when ``return_dict`` is false) a tuple
            ``(loss?, logits, hidden_states?, attentions?)`` where optional
            entries are present only when they were computed.
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        # Always request a ModelOutput from the backbone so fields can be
        # accessed by name regardless of the caller's return_dict choice.
        outputs = self.bert(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            output_hidden_states=output_hidden_states,
            output_attentions=output_attentions,
            return_dict=True,
        )
        logits = self.cls(self.transform(outputs.last_hidden_state))

        loss = None
        if labels is not None:
            # reshape (not view) so non-contiguous label tensors are accepted.
            loss = F.cross_entropy(
                logits.view(-1, self.config.vocab_size),
                labels.reshape(-1),
                ignore_index=-100,
            )

        if not return_dict:
            # Skip last_hidden_state and pooler_output; keep any extras.
            output = (logits,) + outputs[2:]
            return (loss,) + output if loss is not None else output

        return MaskedLMOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )