|
|
from .configuration_transformer import TransformerConfig |
|
|
import torch |
|
|
from torch import nn |
|
|
from torch.nn import init |
|
|
import torch.nn.functional as F |
|
|
from torch.nn.parameter import Parameter |
|
|
from torch import nn |
|
|
from typing import Tuple, List |
|
|
from itertools import chain |
|
|
from transformers.modeling_outputs import ( |
|
|
BaseModelOutput, |
|
|
MaskedLMOutput, |
|
|
CausalLMOutput, |
|
|
) |
|
|
from torch.utils.checkpoint import checkpoint |
|
|
from transformers.modeling_utils import PreTrainedModel |
|
|
import math |
|
|
try: |
|
|
from flash_attn import flash_attn_varlen_func, flash_attn_func |
|
|
except ImportError: |
|
|
pass |
|
|
|
|
|
|
|
|
# Additive attention-mask fill value: large enough to zero out masked
# positions after softmax without producing inf/NaN in float math.
# NOTE(review): name is misspelled ("NUMER") — kept as-is for compatibility.
A_LARGE_NEGATIVE_NUMER = -1e10
|
|
|
|
|
|
|
|
def create_4d_mask(attn_mask, return_type="bool", x=None, causal=False):
    """Build a (B, 1, L, L) attention mask from a (B, L) segment-id tensor.

    Two positions may attend to each other only when they carry the same
    value in ``attn_mask`` (segment-id semantics, not a plain 0/1 padding
    mask).

    Args:
        attn_mask: (B, L) tensor of segment ids.
        return_type: "bool" for a boolean mask; "float" for an additive mask
            with 0 at allowed positions and a large negative value elsewhere.
        x: reference tensor supplying the dtype of the float mask
            (required when ``return_type == "float"``).
        causal: additionally forbid attending to future positions.

    Returns:
        (B, 1, L, L) mask tensor.

    Raises:
        ValueError: if ``return_type`` is not "bool" or "float".
    """
    B, L = attn_mask.shape
    device = attn_mask.device
    # Allowed iff both positions share the same segment id.
    mask_4d = torch.eq(attn_mask[:, None, :, None], attn_mask[:, None, None, :])
    if causal:
        # Bug fix: the lower-triangular mask must be boolean — the previous
        # float tril raised on `bool & float` in torch.
        causal_mask = (
            torch.tril(torch.ones(L, L, dtype=torch.bool, device=device))
            .unsqueeze(0)
            .unsqueeze(0)
        )
        mask_4d = mask_4d & causal_mask
    if return_type == "bool":
        return mask_4d.to(torch.bool)
    elif return_type == "float":
        mask_4d = mask_4d.to(x.dtype)
        # 0 where allowed, a large negative additive bias where masked.
        return mask_4d * 0 + (1 - mask_4d) * A_LARGE_NEGATIVE_NUMER
    # Previously fell through and returned None silently.
    raise ValueError(f"Unsupported return_type: {return_type!r}")
|
|
|
|
|
|
|
|
def rotate_half(x):
    """Swap the two halves of the last dimension, negating the second half.

    Helper for rotary position embeddings: [a, b] -> [-b, a].
    """
    half = x.shape[-1] // 2
    first, second = x[..., :half], x[..., half:]
    return torch.cat((-second, first), dim=-1)
|
|
|
|
|
|
|
|
def apply_rotary_pos_emb(q, k, cos, sin, unsqueeze_dim=1):
    """Apply rotary position embeddings to query and key tensors.

    ``cos``/``sin`` are expanded along ``unsqueeze_dim`` (the head axis by
    default) so they broadcast against q and k.
    """
    cos, sin = cos.unsqueeze(unsqueeze_dim), sin.unsqueeze(unsqueeze_dim)
    rotated_q = q * cos + rotate_half(q) * sin
    rotated_k = k * cos + rotate_half(k) * sin
    return rotated_q, rotated_k
|
|
|
|
|
|
|
|
def apply_rotary_pos_emb_1(x, cos, sin, unsqueeze_dim=1):
    """Apply rotary position embeddings to a single tensor.

    Same math as apply_rotary_pos_emb but for one input instead of a
    query/key pair.
    """
    cos_b = cos.unsqueeze(unsqueeze_dim)
    sin_b = sin.unsqueeze(unsqueeze_dim)
    return x * cos_b + rotate_half(x) * sin_b
|
|
|
|
|
|
|
|
class RMSNorm(nn.Module):
    """Root-mean-square layer normalization with a learned scale.

    The normalization is computed in float32 for numerical stability, then
    cast back to the input dtype before applying the weight.
    """

    def __init__(self, dim: int, eps: float = 1e-5):
        super().__init__()
        self.eps = eps
        # Learned per-channel scale, kept in float32 and initialized to ones.
        weight = torch.empty(dim, device=None, dtype=torch.float32)
        self.weight = Parameter(weight)
        init.ones_(self.weight)

    def _norm(self, x):
        # x / sqrt(mean(x^2) + eps), written with rsqrt.
        mean_square = x.pow(2).mean(-1, keepdim=True)
        return x * torch.rsqrt(mean_square + self.eps)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        normalized = self._norm(x.float()).type_as(x)
        return normalized * self.weight
|
|
|
|
|
|
|
|
class TokenEmbedding(nn.Module):
    """Token-id embedding with optional normalization and gradient shrinking.

    NOTE(review): ``dropout``, ``position_embedding_type``, ``token_dropout``
    and ``mask_token_id`` are stored but never used by this forward —
    presumably consumed by callers or kept for config parity; confirm before
    removing.
    """

    def __init__(self, config: TransformerConfig):
        super().__init__()
        self.config = config
        self.word_embeddings = nn.Embedding(config.vocab_size, config.hidden_size)
        if config.embedding_layer_norm:
            # Despite the attribute name, this may hold a plain LayerNorm.
            if config.layernorm_type == "layernorm":
                self.rms_norm = nn.LayerNorm(
                    config.hidden_size, eps=config.layer_norm_eps, bias=False
                )
            elif config.layernorm_type == "rmsnorm":
                self.rms_norm = RMSNorm(config.hidden_size, eps=config.layer_norm_eps)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.position_embedding_type = config.position_embedding_type
        self.padding_idx = config.pad_token_id
        self.token_dropout = config.token_dropout
        self.mask_token_id = config.mask_token_id

    def forward(self, input_ids: torch.Tensor) -> torch.Tensor:
        """Embed token ids, adding a trailing hidden_size dimension."""
        hidden = self.word_embeddings(input_ids)
        if self.training and self.config.embedding_shrinking:
            # Forward value is unchanged; the gradient through the embedding
            # table is scaled down to 10% of its normal magnitude.
            hidden = 0.1 * hidden + 0.9 * hidden.detach()
        if self.config.embedding_layer_norm:
            hidden = self.rms_norm(hidden)
        return hidden
|
|
|
|
|
|
|
|
class RotaryEmbedding(nn.Module):
    """Computes cos/sin tables for rotary position embeddings.

    Inverse frequencies follow the standard RoPE schedule
    ``1 / b^(2i/dim)`` and are stored as a non-persistent buffer.
    """

    def __init__(self, dim: int, b: int = 10000):
        super().__init__()
        exponents = torch.arange(0, dim, 2, dtype=torch.int64).float() / dim
        inv_freq = (b ** exponents).reciprocal()
        self.register_buffer("inv_freq", inv_freq, persistent=False)

    @torch.no_grad()
    def forward(self, x: torch.Tensor, position_ids: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return (cos, sin) tables of shape (B, L, dim), cast to x's dtype."""
        batch = position_ids.shape[0]
        freq_col = self.inv_freq[None, :, None].float().expand(batch, -1, 1)
        pos_row = position_ids[:, None, :].float()
        # Force float32 math even under autocast for numerical accuracy.
        with torch.autocast(device_type=x.device.type, enabled=False):
            angles = (freq_col.float() @ pos_row.float()).transpose(1, 2)
            # Duplicate along the last dim so the table spans the full head dim.
            table = torch.cat((angles, angles), dim=-1)
            cos, sin = table.cos(), table.sin()
        return cos.to(x.dtype), sin.to(x.dtype)
|
|
|
|
|
|
|
|
class SelfAttention(nn.Module):
    """Multi-head self-attention with three selectable backends.

    The backend is chosen by ``config.attn_impl``:
      * "naive"      — explicit matmul + softmax (can return attention probs)
      * "sdpa"       — torch.nn.functional.scaled_dot_product_attention
      * "flash_attn" — flash-attn kernels (optional dependency imported at
                       module level)

    All four projections (query/key/value/output) are bias-free and
    ``hidden_size`` must be divisible by ``num_attention_heads``.
    """

    def __init__(self, config: TransformerConfig, causal: bool=False):
        super().__init__()
        if config.hidden_size % config.num_attention_heads != 0:
            raise ValueError(
                f"The hidden size ({config.hidden_size}) is not a multiple of the number of attention "
                f"heads ({config.num_attention_heads})"
            )
        self.config = config
        self.num_attention_heads = config.num_attention_heads
        self.attention_head_size = config.hidden_size // config.num_attention_heads
        self.all_head_size = self.num_attention_heads * self.attention_head_size
        self.query = nn.Linear(config.hidden_size, self.all_head_size, bias=False)
        self.key = nn.Linear(config.hidden_size, self.all_head_size, bias=False)
        self.value = nn.Linear(config.hidden_size, self.all_head_size, bias=False)
        self.dropout = nn.Dropout(config.attention_probs_dropout_prob)
        self.output = nn.Linear(config.hidden_size, self.all_head_size, bias=False)
        self.output_dropout = nn.Dropout(config.hidden_dropout_prob)
        # Fix: removed a duplicate `self.config = config` assignment.
        self.causal = causal

    def transpose_for_scores(self, x: torch.Tensor) -> torch.Tensor:
        """Reshape (B, L, D) -> (B, num_heads, L, head_size)."""
        new_x_shape = x.size()[:-1] + (
            self.num_attention_heads,
            self.attention_head_size,
        )
        x = x.view(new_x_shape)
        return x.permute(0, 2, 1, 3)

    def naive_forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: torch.Tensor = None,
        rotary_embeddings: torch.Tensor = None,
        output_attentions: bool = False,
    ) -> Tuple[torch.Tensor]:
        """Explicit attention: scores -> softmax -> weighted sum.

        Args:
            hidden_states: (B, L, D) input.
            attention_mask: optional additive float mask broadcastable to
                (B, num_heads, L, L); added to the raw scores.
            rotary_embeddings: optional (cos, sin) pair applied to q/k.
            output_attentions: also return the (detached) attention probs.

        Returns:
            (context, attention_probs-or-None) tuple.
        """
        B, L, D = hidden_states.size()
        query_states = self.query(hidden_states)
        key_states = self.key(hidden_states)
        value_states = self.value(hidden_states)
        key_states = self.transpose_for_scores(key_states).contiguous()
        query_states = self.transpose_for_scores(query_states).contiguous()
        value_states = self.transpose_for_scores(value_states).contiguous()
        if rotary_embeddings is not None:
            cos, sin = rotary_embeddings
            query_states, key_states = apply_rotary_pos_emb(
                query_states, key_states, cos, sin
            )
        scale_factor = self.attention_head_size**-0.5
        attention_scores = torch.matmul(
            query_states, key_states.transpose(-1, -2)
        )
        attention_scores = attention_scores * scale_factor
        if attention_mask is not None:
            attention_scores = attention_scores + attention_mask
        # Softmax in float32 for numerical stability, then cast back.
        attention_probs = nn.functional.softmax(
            attention_scores, dim=-1, dtype=torch.float32
        ).to(query_states.dtype)
        attention_probs = self.dropout(attention_probs)
        context = torch.matmul(attention_probs, value_states)
        context = context.permute(0, 2, 1, 3).contiguous()
        # all_head_size == D is guaranteed by the divisibility check above.
        context = context.view(B, L, D)
        context = self.output(context)
        context = self.output_dropout(context)
        return_attention_probs = attention_probs.detach() if output_attentions else None
        return context, return_attention_probs

    def sdpa_forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: torch.Tensor = None,
        rotary_embeddings: torch.Tensor = None,
    ) -> Tuple[torch.Tensor]:
        """Attention via F.scaled_dot_product_attention (no probs returned).

        Dropout is applied inside SDPA only while training.
        """
        B, L, D = hidden_states.size()
        query_states = self.query(hidden_states)
        key_states = self.key(hidden_states)
        value_states = self.value(hidden_states)
        key_states = self.transpose_for_scores(key_states).contiguous()
        query_states = self.transpose_for_scores(query_states).contiguous()
        value_states = self.transpose_for_scores(value_states).contiguous()
        if rotary_embeddings is not None:
            cos, sin = rotary_embeddings
            query_states, key_states = apply_rotary_pos_emb(
                query_states, key_states, cos, sin
            )
        scale_factor = self.attention_head_size**-0.5
        dropout_p = self.config.attention_probs_dropout_prob if self.training else 0
        context = F.scaled_dot_product_attention(
            query=query_states,
            key=key_states,
            value=value_states,
            attn_mask=attention_mask,
            dropout_p=dropout_p,
            scale=scale_factor,
        )
        context = context.permute(0, 2, 1, 3).contiguous()
        context = context.view(B, L, D)
        context = self.output(context)
        context = self.output_dropout(context)
        return_attention_probs = None
        return context, return_attention_probs

    def flash_attn_forward(
        self,
        hidden_states: torch.Tensor,
        rotary_embeddings: torch.Tensor = None,
        lengths: List[List[int]] = None,
    ) -> Tuple[torch.Tensor]:
        """Attention via flash-attn kernels.

        When ``lengths`` is given, sequences are treated as packed and
        dispatched to the varlen kernel; otherwise the dense kernel is used.
        NOTE(review): ``lengths`` appears to be per-batch lists of segment
        lengths that are flattened and cumsum'd into cu_seqlens — confirm
        against the data pipeline.
        """
        B, L, D = hidden_states.size()
        NH = self.num_attention_heads
        H = self.attention_head_size

        scale_factor = self.attention_head_size**-0.5
        query_states = self.query(hidden_states)
        key_states = self.key(hidden_states)
        value_states = self.value(hidden_states)

        if lengths is not None:
            # Packed/varlen path: flatten batch and sequence into one axis.
            query_states = query_states.view(B * L, NH, H).contiguous()
            key_states = key_states.view(B * L, NH, H).contiguous()
            value_states = value_states.view(B * L, NH, H).contiguous()
            if rotary_embeddings is not None:
                cos, sin = rotary_embeddings
                cos = cos.view(B * L, 1, H)
                sin = sin.view(B * L, 1, H)
                query_states = (query_states * cos) + (rotate_half(query_states) * sin)
                key_states = (key_states * cos) + (rotate_half(key_states) * sin)
            # Leading 0 so the cumulative sum yields segment start offsets.
            lengths = [0, ] + list(chain(*lengths))
            lengths = torch.tensor(lengths, dtype=torch.int, device=query_states.device)
            max_seqlen = torch.max(lengths)
            cum_seqlen = torch.cumsum(lengths, dim=0, dtype=torch.int)
            context = flash_attn_varlen_func(
                q=query_states,
                k=key_states,
                v=value_states,
                cu_seqlens_q=cum_seqlen,
                cu_seqlens_k=cum_seqlen,
                max_seqlen_q=max_seqlen,
                max_seqlen_k=max_seqlen,
                causal=self.causal,
                return_attn_probs=False,
                softmax_scale=scale_factor,
            )
        else:
            # Dense path: flash_attn_func expects (B, L, NH, H).
            query_states = query_states.view(B, L, NH, H).contiguous()
            key_states = key_states.view(B, L, NH, H).contiguous()
            value_states = value_states.view(B, L, NH, H).contiguous()
            if rotary_embeddings is not None:
                cos, sin = rotary_embeddings
                query_states, key_states = apply_rotary_pos_emb(
                    query_states, key_states, cos, sin, unsqueeze_dim=2
                )
            context = flash_attn_func(
                q=query_states,
                k=key_states,
                v=value_states,
                softmax_scale=scale_factor,
                causal=self.causal,
            )
        context = context.view(B, L, D).contiguous()
        context = self.output(context)
        context = self.output_dropout(context)
        return_attention_probs = None
        return context, return_attention_probs

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: torch.Tensor = None,
        lengths: List[torch.Tensor] = None,
        rotary_embeddings: torch.Tensor = None,
        output_attentions: bool = False,
    ):
        """Dispatch to the backend selected by config.attn_impl.

        Returns a (context, attention_probs-or-None) tuple; probs are only
        available on the "naive" path.
        """
        if self.config.attn_impl == "naive":
            return self.naive_forward(
                hidden_states=hidden_states,
                attention_mask=attention_mask,
                rotary_embeddings=rotary_embeddings,
                output_attentions=output_attentions,
            )
        elif self.config.attn_impl == "sdpa":
            return self.sdpa_forward(
                hidden_states=hidden_states,
                attention_mask=attention_mask,
                rotary_embeddings=rotary_embeddings,
            )
        elif self.config.attn_impl == "flash_attn":
            return self.flash_attn_forward(
                hidden_states=hidden_states,
                rotary_embeddings=rotary_embeddings,
                lengths=lengths,
            )
        # Fix: previously an unknown attn_impl fell through and returned None.
        raise ValueError(f"Unknown attn_impl: {self.config.attn_impl!r}")
|
|
|
|
|
|
|
|
class FeedForwardNetwork(nn.Module):
    """Position-wise feed-forward block: hidden -> intermediate -> hidden.

    Both projections are bias-free; the activation between them is selected
    by ``config.act_fn`` ("gelu" or "silu").
    """

    def __init__(self, config: TransformerConfig):
        super().__init__()
        self.w1 = nn.Linear(config.hidden_size, config.intermediate_size, bias=False)
        self.w2 = nn.Linear(config.intermediate_size, config.hidden_size, bias=False)
        if config.act_fn == "gelu":
            self.act_fn = nn.GELU()
        elif config.act_fn == "silu":
            self.act_fn = nn.SiLU()
        else:
            # Fix: previously an unsupported act_fn left self.act_fn unset and
            # only surfaced as an AttributeError on the first forward().
            raise ValueError(f"Unsupported act_fn: {config.act_fn!r}")

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        """Apply w2(act(w1(x)))."""
        return self.w2(self.act_fn(self.w1(hidden_states)))
|
|
|
|
|
|
|
|
class TransFormerLayer(nn.Module):
    """One pre-norm transformer block: attention sublayer, then FFN sublayer.

    Each sublayer normalizes its input first and adds a residual connection
    around itself.
    """

    def __init__(self, config: TransformerConfig, causal=False):
        super().__init__()
        self.config = config
        self.causal = causal
        self.attention = SelfAttention(config, causal=causal)
        self.ffn = FeedForwardNetwork(config)

        def build_norm():
            # Local factory so both norms are built identically.
            if config.layernorm_type == "layernorm":
                return nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps, bias=False)
            return RMSNorm(config.hidden_size, eps=config.layer_norm_eps)

        self.pre_norm = build_norm()
        self.post_norm = build_norm()

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: torch.Tensor = None,
        lengths: List[torch.Tensor] = None,
        rotary_embeddings: torch.Tensor = None,
        output_attentions: bool = False,
    ):
        """Run attention and FFN sublayers; returns (hidden_states, attn_probs)."""
        attn_out, attn_probs = self.attention(
            hidden_states=self.pre_norm(hidden_states),
            attention_mask=attention_mask,
            lengths=lengths,
            rotary_embeddings=rotary_embeddings,
            output_attentions=output_attentions,
        )
        hidden_states = hidden_states + attn_out
        hidden_states = hidden_states + self.ffn(self.post_norm(hidden_states))
        return (hidden_states, attn_probs)
|
|
|
|
|
|
|
|
class TransformerCore(nn.Module):
    """Stack of TransFormerLayers followed by a final normalization.

    Supports non-reentrant gradient checkpointing per layer (enabled via the
    ``gradient_checkpointing`` flag, applied only when grad is enabled and
    the input requires grad).
    """

    def __init__(self, config: TransformerConfig, causal=False):
        super().__init__()
        self.config = config
        self.layer = nn.ModuleList(
            TransFormerLayer(config, causal=causal)
            for _ in range(config.num_hidden_layers)
        )
        if self.config.layernorm_type == "layernorm":
            self.norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps, bias=False)
        else:
            self.norm = RMSNorm(config.hidden_size, eps=config.layer_norm_eps)
        self.gradient_checkpointing = False

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: torch.Tensor = None,
        lengths: List[torch.Tensor] = None,
        rotary_embeddings: torch.Tensor = None,
        output_attentions: bool = False,
        output_hidden_states=False,
    ):
        """Run all layers and the final norm.

        Returns a BaseModelOutput; ``hidden_states``/``attentions`` are
        (possibly empty) lists of detached CPU tensors collected per layer.
        """
        all_hidden_states = []
        all_self_attentions = []
        for layer_module in self.layer:
            if output_hidden_states:
                all_hidden_states.append(hidden_states.detach().cpu())
            if torch.is_grad_enabled() and hidden_states.requires_grad and self.gradient_checkpointing:
                # Bug fix: positional args must match TransFormerLayer.forward's
                # order (hidden_states, attention_mask, lengths,
                # rotary_embeddings, output_attentions); previously `lengths`
                # and `rotary_embeddings` were swapped, silently corrupting the
                # checkpointed path.
                hidden_states, attn_probs = checkpoint(
                    layer_module,
                    hidden_states,
                    attention_mask,
                    lengths,
                    rotary_embeddings,
                    output_attentions,
                    use_reentrant=False,
                )
            else:
                hidden_states, attn_probs = layer_module(
                    hidden_states=hidden_states,
                    attention_mask=attention_mask,
                    lengths=lengths,
                    rotary_embeddings=rotary_embeddings,
                    output_attentions=output_attentions,
                )
            if output_attentions:
                # Probs are None on the sdpa/flash paths; keep the placeholder
                # so indices still line up with layers.
                all_self_attentions.append(
                    attn_probs.detach().cpu() if attn_probs is not None else None
                )
        hidden_states = self.norm(hidden_states)
        if output_hidden_states:
            all_hidden_states.append(hidden_states.detach().cpu())
        return BaseModelOutput(
            last_hidden_state=hidden_states,
            hidden_states=all_hidden_states,
            attentions=all_self_attentions,
        )
|
|
|
|
|
|
|
|
class BaseTransformerModel(PreTrainedModel):
    """Shared Hugging Face base for models in this file.

    Wires the custom config class and the prefix under which the backbone's
    weights are stored, so from_pretrained/save_pretrained work.
    """

    # Used by from_pretrained() to instantiate the matching config type.
    config_class = TransformerConfig

    # Attribute name prefix HF uses to locate the backbone submodule.
    base_model_prefix = "transformer"
|
|
|
|
|
|
|
|
class TransformerModel(BaseTransformerModel):
    """Transformer backbone: token embedding + rotary tables + layer stack."""

    def __init__(self, config: TransformerConfig, causal=False):
        super().__init__(config)
        self.config = config
        # Rotary tables span one attention head's dimension.
        head_dim = config.hidden_size // config.num_attention_heads
        self.rotary_embedding = RotaryEmbedding(dim=head_dim)
        self.token_embedding = TokenEmbedding(config)
        self.transformer = TransformerCore(config, causal=causal)
        self.causal = causal

    def enable_gradient_checkpointing(self):
        """Turn on per-layer activation checkpointing in the core stack."""
        self.transformer.gradient_checkpointing = True

    def disable_gradient_checkpointing(self):
        """Turn off per-layer activation checkpointing in the core stack."""
        self.transformer.gradient_checkpointing = False

    def forward(
        self,
        input_ids: torch.Tensor,
        attention_mask: torch.Tensor=None,
        lengths: torch.Tensor=None,
        position_ids: torch.Tensor=None,
        output_attentions=False,
        output_hidden_states=False,
    ) -> BaseModelOutput:
        """Embed tokens, build rotary tables and masks, run the stack.

        ``attention_mask`` (segment-id semantics, see create_4d_mask) is not
        supported on the flash-attn path; use ``lengths`` there instead.
        """
        embeddings = self.token_embedding(input_ids)
        if position_ids is None:
            # Default to 0..L-1, broadcast over the batch.
            seq_positions = torch.arange(input_ids.size(1)).to(input_ids.device)
            position_ids = seq_positions.unsqueeze(0).expand_as(input_ids)
        elif position_ids.shape != input_ids.shape:
            raise ValueError("Position IDs must have the same shape as input_ids")
        rotary_embeddings = self.rotary_embedding(embeddings, position_ids)

        if attention_mask is not None:
            if self.config.attn_impl == "flash_attn":
                raise ValueError("Flash attention does not support specifying attention mask")
            attention_mask = create_4d_mask(
                attention_mask,
                return_type="float",
                x=embeddings,
                causal=self.causal,
            )

        outputs = self.transformer(
            hidden_states=embeddings,
            attention_mask=attention_mask,
            lengths=lengths,
            rotary_embeddings=rotary_embeddings,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
        )

        return BaseModelOutput(
            last_hidden_state=outputs.last_hidden_state,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )
|
|
|
|
|
|
|
|
class TransformerForMaskedLM(BaseTransformerModel):
    """Masked-LM head on top of the bidirectional TransformerModel."""

    def __init__(self, config: TransformerConfig):
        super().__init__(config)
        # MLM uses a non-causal (bidirectional) backbone.
        self.model = TransformerModel(config, causal=False)
        self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False)
        self.post_init()

    def forward(
        self,
        input_ids: torch.Tensor,
        attention_mask: torch.Tensor=None,
        lengths: torch.Tensor=None,
        position_ids: torch.Tensor=None,
        labels: torch.Tensor=None,
        output_attentions=False,
        output_hidden_states=False,
    ) -> MaskedLMOutput:
        """Compute MLM logits and, when labels are given, cross-entropy loss.

        Positions to ignore in the loss should carry -100 (CrossEntropyLoss's
        default ignore_index).
        """
        outputs = self.model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            lengths=lengths,
            position_ids=position_ids,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
        )
        sequence_output = outputs.last_hidden_state
        prediction_scores = self.lm_head(sequence_output)
        loss = None
        if labels is not None:
            loss_fct = nn.CrossEntropyLoss()
            labels = labels.to(prediction_scores.device)
            # Loss is computed in float32 for numerical stability.
            loss = loss_fct(
                prediction_scores.view(-1, self.config.vocab_size).float(), labels.view(-1)
            )
        return MaskedLMOutput(
            loss=loss,
            logits=prediction_scores,
            # Bug fix: previously this field carried the last hidden state;
            # MaskedLMOutput.hidden_states conventionally holds the per-layer
            # states collected by the backbone (as TransformerForCausalLM does).
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )
|
|
|
|
|
|
|
|
class TransformerForCausalLM(BaseTransformerModel):
    """Causal-LM head on top of the causal TransformerModel."""

    def __init__(self, config):
        super().__init__(config)
        self.model = TransformerModel(config, causal=True)
        self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False)
        # Consistency fix: use post_init() as TransformerForMaskedLM does —
        # it runs init_weights() plus HF's backward-compatibility setup.
        self.post_init()

    def forward(
        self,
        input_ids: torch.Tensor,
        attention_mask: torch.Tensor=None,
        lengths: torch.Tensor=None,
        position_ids: torch.Tensor=None,
        labels: torch.Tensor=None,
        output_attentions=False,
        output_hidden_states=False,
        reduction="mean",
    ) -> CausalLMOutput:
        """Compute LM logits and, when labels are given, cross-entropy loss.

        ``reduction`` is forwarded to CrossEntropyLoss (e.g. "mean", "none").

        NOTE(review): labels are compared position-for-position with the
        logits — no one-token shift is applied here. Confirm callers pass
        pre-shifted labels.
        """
        outputs = self.model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            lengths=lengths,
            position_ids=position_ids,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
        )
        sequence_output = outputs.last_hidden_state
        prediction_scores = self.lm_head(sequence_output)
        loss = None
        if labels is not None:
            loss_fct = nn.CrossEntropyLoss(reduction=reduction)
            labels = labels.to(prediction_scores.device)
            # Loss is computed in float32 for numerical stability.
            loss = loss_fct(
                prediction_scores.view(-1, self.config.vocab_size).to(torch.float32),
                labels.view(-1),
            )
        return CausalLMOutput(
            loss=loss,
            logits=prediction_scores,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )
|
|
|
|
|
# Expose the MLM model through AutoModelForMaskedLM (trust_remote_code flows).
# NOTE(review): TransformerForCausalLM is not registered for any auto class —
# confirm this is intentional.
TransformerForMaskedLM.register_for_auto_class("AutoModelForMaskedLM")