EmCoder / modeling_emcoder.py
yezdata's picture
ADD EmCoder V1.5
326e148 verified
Raw
History Blame
4.68 kB
import torch
import torch.nn as nn
from transformers import PreTrainedModel
from .configuration_emcoder import EmCoderConfig
class EmCoderCore(nn.Module):
"""The core encoder architecture of EmCoder, without the classification head."""
def __init__(self, config: EmCoderConfig):
super().__init__()
self.token_embedding = nn.Embedding(config.vocab_size, config.d_model)
self.pos_embedding = nn.Embedding(config.max_seq_len, config.d_model)
self.embed_norm = nn.LayerNorm(config.d_model)
encoder_layer = nn.TransformerEncoderLayer(
d_model=config.d_model,
nhead=config.n_head,
dim_feedforward=config.d_ffn,
dropout=config.dropout,
activation="gelu",
norm_first=True,
batch_first=True,
)
self.encoder = nn.TransformerEncoder(
encoder_layer=encoder_layer, num_layers=config.n_layers
)
self.final_norm = nn.LayerNorm(config.d_model)
self.dropout = nn.Dropout(config.dropout)
def forward(self, x: torch.Tensor, mask: torch.Tensor) -> torch.Tensor:
"""Standard forward pass through the encoder."""
seq_len = x.size(1)
pos_ids = torch.arange(seq_len, device=x.device).unsqueeze(0)
x = self.token_embedding(x) + self.pos_embedding(pos_ids)
x = self.embed_norm(x)
x = self.dropout(x)
padding_mask = mask == 0
encoded = self.encoder(x, src_key_padding_mask=padding_mask)
return self.final_norm(encoded)
class EmCoder(PreTrainedModel):
"""The full EmCoder model, including the classification head."""
config_class = EmCoderConfig
def __init__(self, config: EmCoderConfig):
super().__init__(config)
self.encoder = EmCoderCore(config)
self.classifier = nn.Sequential(
nn.Linear(config.d_model, config.d_model),
nn.GELU(),
nn.Dropout(config.dropout),
nn.Linear(config.d_model, config.num_labels),
)
self.post_init()
def _set_mc_dropout(self, active: bool = True):
for m in self.modules():
if isinstance(m, nn.Dropout) or isinstance(m, nn.MultiheadAttention):
m.train(active)
@staticmethod
def _masked_mean_pooling(
features: torch.Tensor, mask: torch.Tensor
) -> torch.Tensor:
mask = mask.unsqueeze(-1) # (B, S, 1)
masked_features = features * mask # (B, S, D)
sum_masked_features = masked_features.sum(dim=1) # (B, D)
count_tokens = torch.clamp(mask.sum(dim=1), min=1e-9) # (B, 1)
return sum_masked_features / count_tokens # (B, D)
def mc_forward(
self,
x: torch.Tensor,
mask: torch.Tensor,
n_samples: int,
max_batch_size: int | None = None,
) -> torch.Tensor:
"""
Performs Monte Carlo Dropout inference to quantify epistemic uncertainty.
Args:
x: Input token IDs of shape (B, S).
mask: Attention mask of shape (B, S).
n_samples: Total number of Monte Carlo samples.
max_batch_size: Maximum number of samples in one forward pass.
Returns:
Logits of shape (n_samples, B, num_labels).
"""
if max_batch_size is None:
max_batch_size = n_samples
B, S = x.shape
num_labels = self.classifier[-1].out_features
all_logits = torch.empty((n_samples, B, num_labels), device=x.device)
is_training = self.training
self._set_mc_dropout(active=True)
try:
for i in range(0, n_samples, max_batch_size):
batch_samples = min(max_batch_size, n_samples - i)
x_stacked = x.repeat(batch_samples, 1) # (batch_samples * B, S)
mask_stacked = mask.repeat(batch_samples, 1) # (batch_samples * B, S)
features = self.encoder(
x_stacked, mask_stacked
) # (batch_samples * B, S, D)
pooled = self._masked_mean_pooling(features, mask_stacked)
logits = self.classifier(pooled) # (n_samples * B, num_labels)
all_logits[i : i + batch_samples] = logits.view(batch_samples, B, -1)
finally:
self._set_mc_dropout(active=is_training)
return all_logits
def forward(self, x: torch.Tensor, mask: torch.Tensor) -> torch.Tensor:
"""Standard forward pass without MC Dropout."""
features = self.encoder(x, mask)
pooled = self._masked_mean_pooling(features, mask)
return self.classifier(pooled)