# PII-Detection / transformer.py
# AmitHirpara - "add comments" (commit f53fac9)
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
def scaled_dot_product_attention(q, k, v, mask=None, dropout=None):
    """Compute scaled dot-product attention.

    Args:
        q: Query tensor. The size of its last dimension is the scaling
            factor ``d_k``.
        k: Key tensor. Its last two dimensions are transposed before the
            dot product with ``q``.
        v: Value tensor, combined using the attention weights.
        mask: Optional tensor broadcastable to the score matrix. Positions
            where ``mask == 0`` are excluded from attention.
        dropout: Optional callable (for example ``nn.Dropout``) applied to
            the attention weights.

    Returns:
        Tuple ``(output, attention_weights)`` where ``output`` is
        ``attention_weights @ v``.
    """
    d_k = q.size(-1)
    scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(d_k)

    if mask is not None:
        # Fill with the most negative finite value rather than -inf: a row
        # whose keys are ALL masked (e.g. an all-padding sequence) would
        # otherwise become softmax([-inf, ...]) == NaN and poison the
        # output and gradients. Rows with at least one visible key are
        # unaffected because exp() of this value still underflows to 0.
        scores = scores.masked_fill(mask == 0, torch.finfo(scores.dtype).min)

    attention_weights = F.softmax(scores, dim=-1)

    if dropout is not None:
        attention_weights = dropout(attention_weights)

    output = torch.matmul(attention_weights, v)
    return output, attention_weights
class MultiHeadAttention(nn.Module):
    """Multi-head attention built on ``scaled_dot_product_attention``.

    The inputs are linearly projected, split into ``num_heads`` slices of
    width ``d_model // num_heads``, attended per head, merged back and
    passed through a final linear projection.
    """

    def __init__(self, d_model, num_heads, dropout=0.1):
        super().__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        self.d_model = d_model
        self.num_heads = num_heads
        # Width of each individual head.
        self.d_k = d_model // num_heads

        # Input projections for queries, keys and values.
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        # Projection applied after the heads are merged.
        self.w_o = nn.Linear(d_model, d_model)

        # Handed to the attention function and applied to the weights.
        self.dropout = nn.Dropout(dropout)

    def _split_heads(self, projected, n_batch):
        """Reshape (batch, seq, d_model) into (batch, heads, seq, d_k)."""
        per_head = projected.view(
            n_batch, projected.size(1), self.num_heads, self.d_k
        )
        return per_head.transpose(1, 2)

    def forward(self, query, key, value, mask=None):
        """Attend from ``query`` positions to ``key``/``value`` positions.

        Args:
            query: (batch_size, seq_len_q, d_model)
            key: (batch_size, seq_len_k, d_model)
            value: (batch_size, seq_len_v, d_model)
            mask: (batch_size, 1, 1, seq_len_k) or None

        Returns:
            Tuple ``(output, attention_weights)``; ``output`` has shape
            (batch_size, seq_len_q, d_model).
        """
        n_batch = query.size(0)
        n_query = query.size(1)

        heads_q = self._split_heads(self.w_q(query), n_batch)
        heads_k = self._split_heads(self.w_k(key), n_batch)
        heads_v = self._split_heads(self.w_v(value), n_batch)

        context, attention_weights = scaled_dot_product_attention(
            heads_q, heads_k, heads_v, mask=mask, dropout=self.dropout
        )

        # (batch, heads, seq_q, d_k) -> (batch, seq_q, d_model)
        merged = context.transpose(1, 2).contiguous()
        merged = merged.view(n_batch, n_query, self.d_model)

        return self.w_o(merged), attention_weights
class PositionwiseFeedForward(nn.Module):
    """Two-layer feed-forward network acting on the last input dimension.

    Computes ``w_2(dropout(relu(w_1(x))))``, expanding from ``d_model`` to
    ``d_ff`` and projecting back to ``d_model``.
    """

    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        self.w_1 = nn.Linear(d_model, d_ff)   # expansion
        self.w_2 = nn.Linear(d_ff, d_model)   # projection back
        self.dropout = nn.Dropout(dropout)
        self.activation = nn.ReLU()

    def forward(self, x):
        """Return the transformed tensor; the last dimension stays ``d_model``."""
        hidden = self.w_1(x)
        hidden = self.activation(hidden)
        hidden = self.dropout(hidden)
        return self.w_2(hidden)
class EncoderLayer(nn.Module):
    """One encoder block: self-attention followed by a feed-forward network.

    Each sublayer's output goes through dropout, is added to the sublayer
    input, and the sum is layer-normalized.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        # The two sublayers.
        self.self_attention = MultiHeadAttention(d_model, num_heads, dropout)
        self.feed_forward = PositionwiseFeedForward(d_model, d_ff, dropout)
        # One normalization per sublayer.
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        # Shared dropout applied to each sublayer output before the residual add.
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Run both sublayers on ``x``; ``mask`` is forwarded to the attention."""
        # Attention weights are returned by the sublayer but not needed here.
        attended, _weights = self.self_attention(x, x, x, mask)
        after_attention = self.norm1(x + self.dropout(attended))

        transformed = self.feed_forward(after_attention)
        return self.norm2(after_attention + self.dropout(transformed))
class TransformerEncoder(nn.Module):
    """A sequence of ``num_layers`` encoder layers followed by a LayerNorm."""

    def __init__(self, num_layers, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        # Build the stack one identically configured layer at a time.
        self.layers = nn.ModuleList()
        for _ in range(num_layers):
            self.layers.append(EncoderLayer(d_model, num_heads, d_ff, dropout))
        # Normalization applied to the output of the last layer.
        self.norm = nn.LayerNorm(d_model)

    def forward(self, x, mask=None):
        """Feed ``x`` through every layer in order, then normalize the result."""
        hidden = x
        for block in self.layers:
            hidden = block(hidden, mask)
        return self.norm(hidden)
class PositionalEncoding(nn.Module):
    """Fixed sinusoidal positional encoding added to the input, then dropout.

    A table of shape (1, max_len, d_model) is precomputed and stored as the
    buffer ``pe``: even feature indices hold sines, odd indices hold cosines.
    """

    def __init__(self, d_model, max_len=5000, dropout=0.1):
        super().__init__()
        self.dropout = nn.Dropout(dropout)

        # Column vector of positions 0 .. max_len - 1.
        positions = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        # One frequency per even feature index.
        frequencies = torch.exp(
            torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
        )
        angles = positions * frequencies

        table = torch.zeros(max_len, d_model)
        table[:, 0::2] = torch.sin(angles)
        # There are d_model // 2 odd indices; for odd d_model that is one
        # fewer than the number of frequencies, so the last one is dropped.
        table[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Leading batch dimension so the table broadcasts over the batch.
        self.register_buffer('pe', table.unsqueeze(0))

    def forward(self, x):
        """Add the encodings for the first ``x.size(1)`` positions and apply dropout."""
        return self.dropout(x + self.pe[:, :x.size(1), :])
class TransformerPII(nn.Module):
    """Transformer encoder model for PII detection (token classification).

    Token ids are embedded, scaled by ``sqrt(d_model)``, combined with
    positional encodings, passed through the encoder stack and finally
    projected to ``num_classes`` logits per token.
    """

    def __init__(self, vocab_size, num_classes, d_model=256, num_heads=8,
                 d_ff=512, num_layers=4, dropout=0.1, max_len=512, pad_idx=0):
        """Build the model.

        Args:
            vocab_size: Number of rows in the embedding table.
            num_classes: Output size of the per-token classifier.
            d_model: Embedding / hidden width.
            num_heads: Attention heads per encoder layer.
            d_ff: Inner width of the feed-forward sublayers.
            num_layers: Number of encoder layers.
            dropout: Dropout probability used throughout the model.
            max_len: Number of positions in the positional-encoding table.
            pad_idx: Token id treated as padding, or ``None`` for no padding.
        """
        super(TransformerPII, self).__init__()
        self.d_model = d_model
        self.pad_idx = pad_idx
        # Embedding layer for input tokens
        self.embedding = nn.Embedding(vocab_size, d_model, padding_idx=pad_idx)
        # Add positional information to embeddings
        self.positional_encoding = PositionalEncoding(d_model, max_len, dropout)
        # Stack of transformer encoder layers
        self.encoder = TransformerEncoder(num_layers, d_model, num_heads, d_ff, dropout)
        # Classification head for token-level predictions
        self.classifier = nn.Linear(d_model, num_classes)
        # Dropout applied to the encoder output before classification
        self.dropout = nn.Dropout(dropout)
        # Initialize model weights
        self._init_weights()

    def _init_weights(self):
        """Initialize the embedding table and the classifier head."""
        # Normal init with std = d_model ** -0.5
        nn.init.normal_(self.embedding.weight, mean=0, std=self.d_model**-0.5)
        # The normal init above overwrote the padding row; zero it again.
        if self.pad_idx is not None:
            nn.init.constant_(self.embedding.weight[self.pad_idx], 0)
        # Xavier-uniform weights and zero bias for the classifier
        nn.init.xavier_uniform_(self.classifier.weight)
        if self.classifier.bias is not None:
            nn.init.constant_(self.classifier.bias, 0)

    def create_padding_mask(self, x):
        """Create the attention padding mask for token ids ``x``.

        Args:
            x: Token-id tensor of shape (batch_size, seq_len).

        Returns:
            Float tensor of shape (batch_size, 1, 1, seq_len) holding 1 for
            tokens to attend to and 0 for padding. When ``pad_idx`` is
            ``None`` no token is treated as padding and the mask is all ones.
        """
        if self.pad_idx is None:
            # Comparing a tensor with None does not yield a tensor, so the
            # "nothing is padding" case must be built explicitly.
            keep = torch.ones_like(x, dtype=torch.bool)
        else:
            keep = x != self.pad_idx
        return keep.unsqueeze(1).unsqueeze(2).float()

    def _prepare_mask(self, x, mask):
        """Return a mask that broadcasts against (batch, heads, seq_q, seq_k) scores.

        ``None`` is replaced by the padding mask derived from ``x``. A 2-D
        (batch, seq_len) mask becomes (batch, 1, 1, seq_len) and a 3-D
        (batch, seq_q, seq_k) mask becomes (batch, 1, seq_q, seq_k). Without
        this, right-aligned broadcasting would line the batch dimension up
        with the query or head dimension. Any other rank is returned as is.
        """
        if mask is None:
            return self.create_padding_mask(x)
        if mask.dim() == 2:
            return mask.unsqueeze(1).unsqueeze(2)
        if mask.dim() == 3:
            return mask.unsqueeze(1)
        return mask

    def forward(self, x, mask=None):
        """Forward pass for token classification.

        Args:
            x: Token ids of shape (batch_size, seq_len).
            mask: Optional attention mask where 0 marks positions to ignore.
                Accepted shapes are (batch_size, 1, 1, seq_len),
                (batch_size, seq_len) and (batch_size, seq_len, seq_len).
                When omitted, a padding mask is built from ``x``.

        Returns:
            Logits of shape (batch_size, seq_len, num_classes).

        Raises:
            ValueError: If ``x`` is not 2-dimensional.
        """
        if x.dim() != 2:
            raise ValueError(f"Expected input to have 2 dimensions [batch_size, seq_len], got {x.dim()}")

        mask = self._prepare_mask(x, mask)

        # Embed and scale by sqrt(d_model)
        x = self.embedding(x) * math.sqrt(self.d_model)
        # Add positional encoding
        x = self.positional_encoding(x)
        # Pass through transformer encoder stack
        encoder_output = self.encoder(x, mask)
        # Apply dropout before classification
        encoder_output = self.dropout(encoder_output)
        # Per-token class logits
        return self.classifier(encoder_output)

    def predict(self, x):
        """Return the arg-max class index for every token of ``x``.

        NOTE: this switches the module to evaluation mode via ``self.eval()``
        and does not restore the previous mode; call ``train()`` again before
        resuming training.
        """
        self.eval()
        with torch.no_grad():
            logits = self.forward(x)
            predictions = torch.argmax(logits, dim=-1)
        return predictions
def create_transformer_pii_model(vocab_size, num_classes, d_model=256, num_heads=8,
                                 d_ff=512, num_layers=4, dropout=0.1, max_len=512,
                                 pad_idx=0):
    """Factory function to create a transformer model for PII detection.

    Every argument is forwarded unchanged to ``TransformerPII``.

    Args:
        vocab_size: Size of the token vocabulary.
        num_classes: Number of output classes per token.
        d_model: Embedding / hidden width.
        num_heads: Attention heads per encoder layer.
        d_ff: Inner width of the feed-forward sublayers.
        num_layers: Number of encoder layers.
        dropout: Dropout probability.
        max_len: Number of positions in the positional-encoding table.
        pad_idx: Padding token id. Defaults to 0, the value this factory
            previously hard-coded, so existing callers are unaffected.

    Returns:
        A newly constructed ``TransformerPII`` instance.
    """
    return TransformerPII(
        vocab_size=vocab_size,
        num_classes=num_classes,
        d_model=d_model,
        num_heads=num_heads,
        d_ff=d_ff,
        num_layers=num_layers,
        dropout=dropout,
        max_len=max_len,
        pad_idx=pad_idx,
    )