# SpliceBERT-510nt / modeling_bert.py
# Hugging Face Hub page header (scraped): "Taykhoom's picture" avatar caption;
# commit message "Upload folder using huggingface_hub"; commit 37b9eb5 (verified).
import math
from typing import Optional, Tuple, Union
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import PreTrainedModel, PretrainedConfig
from transformers.modeling_outputs import BaseModelOutputWithPooling, MaskedLMOutput
from .configuration_bert_updated import BertUpdatedConfig
class BertSelfAttention(nn.Module):
    """Multi-head self-attention computed explicitly with matmul + softmax.

    Reads ``num_attention_heads``, ``hidden_size`` and
    ``attention_probs_dropout_prob`` from ``config``. The per-head width is
    ``hidden_size // num_attention_heads``.
    """

    def __init__(self, config):
        super().__init__()
        self.num_attention_heads = config.num_attention_heads
        self.attention_head_size = config.hidden_size // config.num_attention_heads
        self.all_head_size = self.num_attention_heads * self.attention_head_size
        self.query = nn.Linear(config.hidden_size, self.all_head_size)
        self.key = nn.Linear(config.hidden_size, self.all_head_size)
        self.value = nn.Linear(config.hidden_size, self.all_head_size)
        self.dropout = nn.Dropout(config.attention_probs_dropout_prob)

    def _split_heads(self, x: torch.Tensor) -> torch.Tensor:
        """Reshape ``(B, T, all_head_size)`` into ``(B, heads, T, head_size)``."""
        B, T, _ = x.shape
        return x.view(B, T, self.num_attention_heads, self.attention_head_size).permute(0, 2, 1, 3)

    def forward(
        self,
        hidden_states: torch.Tensor,
        key_padding_mask: Optional[torch.Tensor] = None,
        output_attentions: bool = False,
    ) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
        """Attend every position to every non-masked position.

        Args:
            hidden_states: Input of shape ``(B, T, hidden_size)``.
            key_padding_mask: Optional boolean tensor of shape ``(B, T)``;
                ``True`` marks key positions that must not be attended to.
            output_attentions: When true, also return the attention
                probabilities (after dropout) of shape ``(B, heads, T, T)``.

        Returns:
            ``(context, probs)`` where ``context`` has shape
            ``(B, T, all_head_size)`` and ``probs`` is ``None`` unless
            ``output_attentions`` is set.
        """
        q = self._split_heads(self.query(hidden_states))
        k = self._split_heads(self.key(hidden_states))
        v = self._split_heads(self.value(hidden_states))

        scale = math.sqrt(self.attention_head_size)
        scores = torch.matmul(q, k.transpose(-1, -2)) / scale
        if key_padding_mask is not None:
            # Use the most negative finite value rather than -inf: a row whose
            # keys are all masked would otherwise softmax to NaN (exp(-inf) /
            # sum of zeros), and that NaN would spread through every later
            # layer. For partially masked rows the masked keys still end up
            # with zero probability because exp() underflows.
            fill_value = torch.finfo(scores.dtype).min
            scores = scores.masked_fill(key_padding_mask[:, None, None, :], fill_value)

        probs = F.softmax(scores, dim=-1)
        probs = self.dropout(probs)

        context = torch.matmul(probs, v)
        B, _, T, _ = context.shape
        # Merge the heads back: (B, heads, T, head_size) -> (B, T, all_head_size).
        context = context.permute(0, 2, 1, 3).contiguous().view(B, T, self.all_head_size)

        if output_attentions:
            return context, probs
        return context, None
class BertSdpaSelfAttention(BertSelfAttention):
    """Self-attention backed by ``F.scaled_dot_product_attention``.

    Falls back to the parent implementation when attention probabilities are
    requested, because this path only produces the attended context.
    """

    def forward(
        self,
        hidden_states: torch.Tensor,
        key_padding_mask: Optional[torch.Tensor] = None,
        output_attentions: bool = False,
    ) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
        """Compute the attended context with the fused SDPA kernel.

        Args:
            hidden_states: Input of shape ``(B, T, hidden_size)``.
            key_padding_mask: Optional boolean tensor of shape ``(B, T)``;
                ``True`` marks key positions that must not be attended to.
            output_attentions: When true, delegate to the parent ``forward``
                so that attention probabilities can be returned.

        Returns:
            ``(context, None)`` with ``context`` of shape
            ``(B, T, all_head_size)``, or the parent's result when
            ``output_attentions`` is set.
        """
        if output_attentions:
            return super().forward(hidden_states, key_padding_mask, output_attentions=True)

        B, T, _ = hidden_states.shape
        q = self._split_heads(self.query(hidden_states))
        k = self._split_heads(self.key(hidden_states))
        v = self._split_heads(self.value(hidden_states))

        attn_mask = None
        if key_padding_mask is not None:
            # Additive float mask broadcast over heads and query positions.
            # A finite minimum (not -inf) keeps rows whose keys are all
            # masked from turning into NaN inside the softmax.
            attn_mask = torch.zeros(B, 1, 1, T, dtype=q.dtype, device=q.device)
            attn_mask = attn_mask.masked_fill(
                key_padding_mask[:, None, None, :], torch.finfo(q.dtype).min
            )

        # Apply attention dropout in training mode only, using the rate held
        # by the inherited nn.Dropout; without this the configured attention
        # dropout would be silently skipped on this path.
        dropout_p = self.dropout.p if self.training else 0.0
        context = F.scaled_dot_product_attention(
            q, k, v, attn_mask=attn_mask, dropout_p=dropout_p
        )
        # Merge the heads back: (B, heads, T, head_size) -> (B, T, all_head_size).
        context = context.permute(0, 2, 1, 3).contiguous().view(B, T, self.all_head_size)
        return context, None
class BertFlashSelfAttention(BertSelfAttention):
    """Self-attention backed by the ``flash_attn`` kernels.

    Falls back to the parent implementation when attention probabilities are
    requested. ``flash_attn`` is imported lazily inside ``forward`` so the
    module can be loaded without it.
    """

    def forward(
        self,
        hidden_states: torch.Tensor,
        key_padding_mask: Optional[torch.Tensor] = None,
        output_attentions: bool = False,
    ) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
        """Compute the attended context with FlashAttention.

        Args:
            hidden_states: Input of shape ``(B, T, hidden_size)``.
            key_padding_mask: Optional boolean tensor of shape ``(B, T)``;
                ``True`` marks padded positions.
            output_attentions: When true, delegate to the parent ``forward``
                so that attention probabilities can be returned.

        Returns:
            ``(context, None)`` with ``context`` of shape
            ``(B, T, all_head_size)`` in the dtype of the projected queries,
            or the parent's result when ``output_attentions`` is set.

        Raises:
            ImportError: If ``flash_attn`` is not installed.
        """
        if output_attentions:
            return super().forward(hidden_states, key_padding_mask, output_attentions=True)

        try:
            from flash_attn import flash_attn_func, flash_attn_varlen_func
            from flash_attn.bert_padding import pad_input, unpad_input
        except ImportError as e:
            raise ImportError(
                "flash_attn is required for attn_implementation='flash_attention_2'. "
                "Install with: pip install flash-attn --no-build-isolation"
            ) from e

        B, T, _ = hidden_states.shape
        # _split_heads yields (B, heads, T, head_size); permute again to the
        # (B, T, heads, head_size) layout passed to the flash_attn functions.
        q = self._split_heads(self.query(hidden_states)).permute(0, 2, 1, 3)
        k = self._split_heads(self.key(hidden_states)).permute(0, 2, 1, 3)
        v = self._split_heads(self.value(hidden_states)).permute(0, 2, 1, 3)

        orig_dtype = q.dtype
        if orig_dtype not in (torch.float16, torch.bfloat16):
            # Any other dtype is run in bfloat16 and cast back at the end.
            q, k, v = q.to(torch.bfloat16), k.to(torch.bfloat16), v.to(torch.bfloat16)

        # Apply attention dropout in training mode only, using the rate held
        # by the inherited nn.Dropout; without this the configured attention
        # dropout would be silently skipped on this path.
        dropout_p = self.dropout.p if self.training else 0.0

        if key_padding_mask is not None and key_padding_mask.any():
            attend = ~key_padding_mask
            # Star-unpacking tolerates flash-attn releases that return either
            # four or five values from unpad_input; only the leading ones
            # (tensor, indices, cu_seqlens, max_seqlen) are used here.
            q_u, indices, cu_seqlens, max_seqlen, *_ = unpad_input(q, attend)
            k_u, *_ = unpad_input(k, attend)
            v_u, *_ = unpad_input(v, attend)
            out_u = flash_attn_varlen_func(
                q_u, k_u, v_u,
                cu_seqlens_q=cu_seqlens, cu_seqlens_k=cu_seqlens,
                max_seqlen_q=max_seqlen, max_seqlen_k=max_seqlen,
                dropout_p=dropout_p,
                causal=False,
            )
            # Scatter the packed rows back into a (B, T, ...) padded layout.
            out = pad_input(out_u, indices, B, T)
        else:
            out = flash_attn_func(q, k, v, dropout_p=dropout_p, causal=False)

        out = out.to(orig_dtype).reshape(B, T, self.all_head_size)
        return out, None
# Lookup table from an attention-implementation name to the self-attention
# class that provides it.
BERT_SELF_ATTENTION_CLASSES = dict(
    eager=BertSelfAttention,
    sdpa=BertSdpaSelfAttention,
    flash_attention_2=BertFlashSelfAttention,
)
class BertSelfOutput(nn.Module):
    """Post-attention projection: dense -> dropout -> residual add -> LayerNorm."""

    def __init__(self, config):
        super().__init__()
        width = config.hidden_size
        self.dense = nn.Linear(width, width)
        self.LayerNorm = nn.LayerNorm(width, eps=config.layer_norm_eps)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, hidden_states: torch.Tensor, input_tensor: torch.Tensor) -> torch.Tensor:
        """Project ``hidden_states``, add ``input_tensor`` as residual, normalize."""
        projected = self.dense(hidden_states)
        projected = self.dropout(projected)
        residual_sum = projected + input_tensor
        return self.LayerNorm(residual_sum)
class BertAttention(nn.Module):
    """Self-attention followed by its output projection and residual block.

    The self-attention class is chosen from ``BERT_SELF_ATTENTION_CLASSES``
    using ``config._attn_implementation``; a missing or ``None`` value selects
    ``"eager"``.
    """

    def __init__(self, config):
        super().__init__()
        # `or "eager"` also covers an attribute that exists but is None, which
        # would otherwise fail the lookup with an opaque KeyError(None).
        impl = getattr(config, "_attn_implementation", None) or "eager"
        try:
            attn_cls = BERT_SELF_ATTENTION_CLASSES[impl]
        except KeyError:
            # Same exception type as a plain dict lookup, but with a message
            # that names the offending value and the valid choices.
            raise KeyError(
                f"Unsupported attention implementation {impl!r}; "
                f"expected one of {sorted(BERT_SELF_ATTENTION_CLASSES)}"
            ) from None
        self.self = attn_cls(config)
        self.output = BertSelfOutput(config)

    def forward(
        self,
        hidden_states: torch.Tensor,
        key_padding_mask: Optional[torch.Tensor],
        output_attentions: bool = False,
    ) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
        """Run self-attention, then project and normalize with a residual.

        Args:
            hidden_states: Input to the attention sub-layer; also used as the
                residual passed to the output block.
            key_padding_mask: Forwarded unchanged to the self-attention module.
            output_attentions: Forwarded unchanged to the self-attention module.

        Returns:
            ``(attention_output, attn_weights)``; ``attn_weights`` is whatever
            the self-attention module returned as its second value.
        """
        self_out, attn_weights = self.self(hidden_states, key_padding_mask, output_attentions)
        return self.output(self_out, hidden_states), attn_weights
class BertIntermediate(nn.Module):
    """Feed-forward expansion: linear map to ``intermediate_size`` then GELU."""

    def __init__(self, config):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.intermediate_size)

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        """Return ``gelu(dense(hidden_states))``."""
        expanded = self.dense(hidden_states)
        return F.gelu(expanded)
class BertOutput(nn.Module):
    """Feed-forward contraction: dense -> dropout -> residual add -> LayerNorm."""

    def __init__(self, config):
        super().__init__()
        out_width = config.hidden_size
        self.dense = nn.Linear(config.intermediate_size, out_width)
        self.LayerNorm = nn.LayerNorm(out_width, eps=config.layer_norm_eps)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, hidden_states: torch.Tensor, input_tensor: torch.Tensor) -> torch.Tensor:
        """Project back to ``hidden_size``, add ``input_tensor``, normalize."""
        contracted = self.dense(hidden_states)
        contracted = self.dropout(contracted)
        residual_sum = contracted + input_tensor
        return self.LayerNorm(residual_sum)
class BertLayer(nn.Module):
    """One encoder block: attention sub-layer followed by a feed-forward sub-layer."""

    def __init__(self, config):
        super().__init__()
        self.attention = BertAttention(config)
        self.intermediate = BertIntermediate(config)
        self.output = BertOutput(config)

    def forward(
        self,
        hidden_states: torch.Tensor,
        key_padding_mask: Optional[torch.Tensor],
        output_attentions: bool = False,
    ) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
        """Apply attention, then the feed-forward pair with the attention output as residual.

        Returns:
            ``(layer_output, attn_weights)`` where ``attn_weights`` is the
            second value returned by the attention sub-layer.
        """
        attended, weights = self.attention(hidden_states, key_padding_mask, output_attentions)
        expanded = self.intermediate(attended)
        layer_output = self.output(expanded, attended)
        return layer_output, weights
class BertEncoder(nn.Module):
    """Stack of ``config.num_hidden_layers`` ``BertLayer`` blocks applied in order."""

    def __init__(self, config):
        super().__init__()
        blocks = [BertLayer(config) for _ in range(config.num_hidden_layers)]
        self.layer = nn.ModuleList(blocks)

    def forward(
        self,
        hidden_states: torch.Tensor,
        key_padding_mask: Optional[torch.Tensor],
        output_hidden_states: bool = False,
        output_attentions: bool = False,
    ) -> Tuple:
        """Run every layer and optionally collect per-layer outputs.

        Returns:
            ``(last_hidden_state, all_hidden_states, all_attentions)``.
            ``all_hidden_states`` is a tuple starting with the input followed
            by each layer's output, or ``None`` when not requested.
            ``all_attentions`` is a tuple with each layer's attention weights,
            or ``None`` when not requested.
        """
        hidden_trace = [hidden_states] if output_hidden_states else None
        attention_trace = [] if output_attentions else None

        for block in self.layer:
            hidden_states, weights = block(hidden_states, key_padding_mask, output_attentions)
            if hidden_trace is not None:
                hidden_trace.append(hidden_states)
            if attention_trace is not None:
                attention_trace.append(weights)

        all_hidden_states = None if hidden_trace is None else tuple(hidden_trace)
        all_attentions = None if attention_trace is None else tuple(attention_trace)
        return hidden_states, all_hidden_states, all_attentions
class BertEmbeddings(nn.Module):
    """Sum of word, position and token-type embeddings, then LayerNorm and dropout."""

    def __init__(self, config):
        super().__init__()
        width = config.hidden_size
        self.word_embeddings = nn.Embedding(config.vocab_size, width, padding_idx=config.pad_token_id)
        self.position_embeddings = nn.Embedding(config.max_position_embeddings, width)
        self.token_type_embeddings = nn.Embedding(config.type_vocab_size, width)
        self.LayerNorm = nn.LayerNorm(width, eps=config.layer_norm_eps)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        # Row vector 0..max_position_embeddings-1; registered as a
        # non-persistent buffer, so it is not part of the state dict.
        all_positions = torch.arange(config.max_position_embeddings).expand((1, -1))
        self.register_buffer("position_ids", all_positions, persistent=False)

    def forward(self, input_ids: torch.LongTensor, token_type_ids: Optional[torch.LongTensor] = None) -> torch.Tensor:
        """Embed ``input_ids`` of shape ``(B, T)``.

        ``token_type_ids`` defaults to all zeros. Positions are taken as the
        first ``T`` entries of the stored ``position_ids`` buffer.
        """
        _, seq_len = input_ids.shape
        if token_type_ids is None:
            token_type_ids = torch.zeros_like(input_ids)

        word_part = self.word_embeddings(input_ids)
        position_part = self.position_embeddings(self.position_ids[:, :seq_len])
        type_part = self.token_type_embeddings(token_type_ids)

        summed = word_part + position_part
        summed = summed + type_part
        normalized = self.LayerNorm(summed)
        return self.dropout(normalized)
class BertPooler(nn.Module):
    """Pool a sequence by passing its first position through dense + tanh."""

    def __init__(self, config):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        self.activation = nn.Tanh()

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        """Return ``tanh(dense(hidden_states[:, 0]))``."""
        first_position = hidden_states[:, 0]
        projected = self.dense(first_position)
        return self.activation(projected)
class BertPredictionHeadTransform(nn.Module):
    """Prediction-head transform: dense -> GELU -> LayerNorm, width unchanged."""

    def __init__(self, config):
        super().__init__()
        width = config.hidden_size
        self.dense = nn.Linear(width, width)
        self.LayerNorm = nn.LayerNorm(width, eps=config.layer_norm_eps)

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        """Return ``LayerNorm(gelu(dense(hidden_states)))``."""
        projected = self.dense(hidden_states)
        activated = F.gelu(projected)
        return self.LayerNorm(activated)
class BertModel(PreTrainedModel):
    """Embeddings + encoder stack + pooler, wrapped as a ``PreTrainedModel``."""

    config_class = BertUpdatedConfig
    base_model_prefix = "bert"
    _supports_sdpa = True
    _supports_flash_attn_2 = True

    def __init__(self, config):
        super().__init__(config)
        self.embeddings = BertEmbeddings(config)
        self.encoder = BertEncoder(config)
        self.pooler = BertPooler(config)
        self.post_init()

    def get_input_embeddings(self):
        """Return the word-embedding module."""
        return self.embeddings.word_embeddings

    def set_input_embeddings(self, value):
        """Replace the word-embedding module with ``value``."""
        self.embeddings.word_embeddings = value

    def forward(
        self,
        input_ids: torch.LongTensor,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.LongTensor] = None,
        output_hidden_states: Optional[bool] = None,
        output_attentions: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple, BaseModelOutputWithPooling]:
        """Encode ``input_ids`` and pool the final hidden states.

        Args:
            input_ids: Token ids of shape ``(B, T)``.
            attention_mask: Optional tensor of shape ``(B, T)``; entries equal
                to 0 mark positions to be ignored as attention keys. ``None``
                means nothing is masked.
            token_type_ids: Optional segment ids passed to the embeddings.
            output_hidden_states: Return per-layer hidden states; defaults to
                ``config.output_hidden_states``.
            output_attentions: Return per-layer attention weights; defaults to
                ``config.output_attentions``.
            return_dict: Return a ``BaseModelOutputWithPooling`` instead of a
                tuple; defaults to ``config.use_return_dict``.

        Returns:
            A ``BaseModelOutputWithPooling``, or a tuple of its non-``None``
            fields in the order (last_hidden_state, pooled, hidden_states,
            attentions) when ``return_dict`` is false.
        """
        output_hidden_states = output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
        output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        # No mask supplied means nothing is padded: skip building a ones
        # tensor and the `.any()` reduction just to arrive back at "no mask".
        key_padding_mask = None
        if attention_mask is not None:
            padded = attention_mask.eq(0)
            # Pass a mask down only when something is actually padded.
            if padded.any():
                key_padding_mask = padded

        x = self.embeddings(input_ids, token_type_ids)
        last_hidden_state, all_hidden_states, all_attentions = self.encoder(
            x, key_padding_mask,
            output_hidden_states=output_hidden_states,
            output_attentions=output_attentions,
        )
        pooled = self.pooler(last_hidden_state)

        if not return_dict:
            return tuple(v for v in [last_hidden_state, pooled, all_hidden_states, all_attentions] if v is not None)
        return BaseModelOutputWithPooling(
            last_hidden_state=last_hidden_state,
            pooler_output=pooled,
            hidden_states=all_hidden_states,
            attentions=all_attentions,
        )
class BertForMaskedLM(PreTrainedModel):
    """``BertModel`` with a masked-language-modeling head.

    The head is a ``BertPredictionHeadTransform`` followed by a linear
    projection to ``config.vocab_size`` logits.
    """

    config_class = BertUpdatedConfig
    base_model_prefix = "bert"
    _supports_sdpa = True
    _supports_flash_attn_2 = True

    def __init__(self, config):
        super().__init__(config)
        self.bert = BertModel(config)
        self.transform = BertPredictionHeadTransform(config)
        self.cls = nn.Linear(config.hidden_size, config.vocab_size)
        self.post_init()

    def get_input_embeddings(self):
        """Return the word-embedding module of the wrapped ``BertModel``."""
        return self.bert.embeddings.word_embeddings

    def forward(
        self,
        input_ids: torch.LongTensor,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.LongTensor] = None,
        labels: Optional[torch.LongTensor] = None,
        output_hidden_states: Optional[bool] = None,
        output_attentions: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple, MaskedLMOutput]:
        """Compute per-token vocabulary logits and, optionally, the MLM loss.

        Args:
            input_ids: Token ids of shape ``(B, T)``.
            attention_mask: Optional mask forwarded to the wrapped model.
            token_type_ids: Optional segment ids forwarded to the wrapped model.
            labels: Optional target ids with one entry per logit position;
                positions equal to ``-100`` are ignored by the loss.
            output_hidden_states: Forwarded to the wrapped model.
            output_attentions: Forwarded to the wrapped model.
            return_dict: Return a ``MaskedLMOutput`` instead of a tuple;
                defaults to ``config.use_return_dict``.

        Returns:
            A ``MaskedLMOutput``, or the tuple ``(loss?, logits, *extras)``
            when ``return_dict`` is false; ``loss`` is included only when
            ``labels`` is given.
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        # Always request the structured output internally; the tuple form, if
        # wanted, is assembled below.
        outputs = self.bert(
            input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids,
            output_hidden_states=output_hidden_states, output_attentions=output_attentions,
            return_dict=True,
        )
        logits = self.cls(self.transform(outputs.last_hidden_state))

        loss = None
        if labels is not None:
            # reshape (not view) so non-contiguous labels are accepted, and
            # move labels to the logits' device so the loss does not fail when
            # they arrive on a different device. The class dimension is read
            # from the logits themselves.
            flat_logits = logits.reshape(-1, logits.size(-1))
            flat_labels = labels.reshape(-1).to(logits.device)
            loss = F.cross_entropy(flat_logits, flat_labels, ignore_index=-100)

        if not return_dict:
            # outputs[2:] drops last_hidden_state and pooler_output, leaving
            # whichever of hidden_states / attentions were produced.
            output = (logits,) + outputs[2:]
            return (loss,) + output if loss is not None else output
        return MaskedLMOutput(
            loss=loss, logits=logits,
            hidden_states=outputs.hidden_states, attentions=outputs.attentions,
        )