"""
Homework 1 (Part I) – Transformer-based sentiment analysis on the IMDB dataset.
This script implements:
- Data loading and preprocessing for the IMDB movie review dataset
- A Transformer-based text classification model
- Training and evaluation loops for binary sentiment analysis
- Saving of the trained model together with vocabulary and configuration
The code is organized into clearly separated sections:
1) Data preparation and tokenization
2) Transformer components (building blocks)
3) Full Transformer classifier
4) Training and evaluation logic
5) Execution example using a train/validation split of IMDB
Model Analysis and Improvement:
1. After evaluation, analyze your model's behavior to identify areas for
   improvement and fine-tuning.
2. Analyze classification errors: Examine specific reviews where the model
   performs poorly and try to understand the reasons behind these errors.
   Are there issues with handling rare words or idiomatic expressions?
3. Explore the impact of model size: Experiment with different Transformer
   model sizes (e.g., small, medium, large) to understand how model
   complexity affects performance.
"""
import os
from datetime import datetime

# Configure CUDA visibility before torch is imported
# (set this as appropriate for your environment).
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "2"

import math
import re
from collections import Counter

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from datasets import load_dataset
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, Dataset
# ==========================================
# 1. Data Preparation & Tokenization
# ==========================================
def tokenize(text):
"""
Tokenize a raw review string into a list of normalized word tokens.
Steps:
- Convert to lowercase
- Remove HTML line breaks
- Remove non-alphanumeric characters (except whitespace)
- Split on whitespace
Args:
text (str): Raw review text.
Returns:
List[str]: List of token strings.
"""
text = text.lower()
text = re.sub(r"
", " ", text) # Remove HTML line breaks
text = re.sub(r"[^a-zA-Z0-9\s]", "", text)
return text.split()
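# Example (illustrative, not executed):
#   tokenize("Great movie!<br />Loved it.") -> ["great", "movie", "loved", "it"]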
class IMDBDataset(Dataset):
"""
Torch Dataset wrapper for IMDB sequences and labels.
Each item corresponds to:
- a fixed-length sequence of token IDs
- a sentiment label (0 = negative, 1 = positive)
"""
def __init__(self, sequences, labels):
self.sequences = torch.tensor(sequences, dtype=torch.long)
self.labels = torch.tensor(labels, dtype=torch.long)
def __len__(self):
return len(self.labels)
def __getitem__(self, idx):
return self.sequences[idx], self.labels[idx]
def build_vocab(texts, max_vocab_size=10000):
"""
Build a word-to-index vocabulary from a collection of texts.
The vocabulary is constructed using token frequency counts from the
training set only to avoid information leakage. Two special tokens
are always included:
- "" mapped to index 0
- "" mapped to index 1
The remaining (max_vocab_size - 2) most frequent tokens are added.
Args:
texts (Iterable[str]): Training texts.
max_vocab_size (int): Maximum size of the vocabulary.
Returns:
Dict[str, int]: Mapping from token string to integer index.
"""
counter = Counter()
for text in texts:
counter.update(tokenize(text))
# Reserve 0 for padding and 1 for unknown tokens
vocab = {"": 0, "": 1}
common_words = counter.most_common(max_vocab_size - 2)
for word, _ in common_words:
vocab[word] = len(vocab)
return vocab
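# Example (illustrative, not executed):
#   build_vocab(["good good film", "bad film"], max_vocab_size=5)
#   -> {"<pad>": 0, "<unk>": 1, "good": 2, "film": 3, "bad": 4}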
def preprocess_data(texts, vocab, max_len=128):
"""
Convert raw texts into padded/truncated sequences of token IDs.
Steps:
- Tokenize each text
    - Map tokens to vocabulary indices (using "<unk>" for OOV tokens)
    - Truncate to max_len, or pad with "<pad>" to reach max_len
Args:
texts (Iterable[str]): Input texts (reviews).
vocab (Dict[str, int]): Token-to-index mapping.
max_len (int): Maximum sequence length in tokens.
Returns:
np.ndarray: Array of shape (num_examples, max_len) with dtype int.
"""
sequences = []
for text in texts:
tokens = tokenize(text)
        token_ids = [vocab.get(token, vocab["<unk>"]) for token in tokens]
# Pad or Truncate
if len(token_ids) < max_len:
token_ids += [vocab[""]] * (max_len - len(token_ids))
else:
token_ids = token_ids[:max_len]
sequences.append(token_ids)
return np.array(sequences)
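# Example (illustrative, not executed): with the vocabulary above and max_len=6,
#   preprocess_data(["good film"], vocab, max_len=6)
#   -> array([[2, 3, 0, 0, 0, 0]])  # trailing zeros are "<pad>" IDs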
# ==========================================
# 2. Transformer Components
# ==========================================
class PositionalEncoding(nn.Module):
"""
Sinusoidal positional encoding module.
Implements the deterministic positional encoding from the original
Transformer paper ("Attention is All You Need"), which is added to
token embeddings to inject information about token positions.
"""
def __init__(self, d_model, max_len=5000):
super().__init__()
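        # Following "Attention Is All You Need":
        #   PE(pos, 2i)   = sin(pos / 10000^(2i / d_model))
        #   PE(pos, 2i+1) = cos(pos / 10000^(2i / d_model))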
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
self.register_buffer('pe', pe.unsqueeze(0))
def forward(self, x):
"""
Add positional encodings to input embeddings.
Args:
x (Tensor): Input tensor of shape [batch_size, seq_len, d_model].
Returns:
Tensor: Positionally encoded representations with same shape as x.
"""
return x + self.pe[:, :x.size(1)]
class MultiHeadAttention(nn.Module):
"""
Multi-head self-attention mechanism.
For each token, attention is computed over all tokens in the sequence
(including itself) using multiple attention heads. Each head operates
in its own subspace and the outputs are concatenated.
"""
def __init__(self, d_model, num_heads):
super().__init__()
assert d_model % num_heads == 0
self.d_model = d_model
self.num_heads = num_heads
self.d_k = d_model // num_heads
self.W_q = nn.Linear(d_model, d_model)
self.W_k = nn.Linear(d_model, d_model)
self.W_v = nn.Linear(d_model, d_model)
self.W_o = nn.Linear(d_model, d_model)
def forward(self, x, mask=None):
"""
Apply multi-head self-attention to the input sequence.
Args:
x (Tensor): Input tensor of shape [batch_size, seq_len, d_model].
mask (Tensor, optional): Attention mask of shape
[batch_size, 1, 1, seq_len] or broadcastable equivalent,
where positions with 0 are masked out.
Returns:
Tensor: Output tensor of shape [batch_size, seq_len, d_model].
"""
batch_size, seq_len, _ = x.shape
# Linear projections
Q = self.W_q(x).view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)
K = self.W_k(x).view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)
V = self.W_v(x).view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)
# Scaled Dot-Product Attention
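        # attention(Q, K, V) = softmax(Q K^T / sqrt(d_k)) V, computed per head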
scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
if mask is not None:
scores = scores.masked_fill(mask == 0, -1e9)
attn = torch.softmax(scores, dim=-1)
context = torch.matmul(attn, V)
# Concatenate heads
context = context.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model)
return self.W_o(context)
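# Shape check (illustrative, not executed): attention output matches the
# input shape, e.g.
#   MultiHeadAttention(d_model=64, num_heads=4)(torch.randn(2, 10, 64)).shape
#   == torch.Size([2, 10, 64])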
class TransformerEncoderBlock(nn.Module):
"""
Single Transformer encoder block consisting of:
- multi-head self-attention sublayer (with residual + layer norm)
- position-wise feed-forward sublayer (with residual + layer norm)
"""
def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
super().__init__()
self.mha = MultiHeadAttention(d_model, num_heads)
self.ffn = nn.Sequential(
nn.Linear(d_model, d_ff),
nn.ReLU(),
nn.Linear(d_ff, d_model)
)
self.layernorm1 = nn.LayerNorm(d_model)
self.layernorm2 = nn.LayerNorm(d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, x, mask=None):
"""
Forward pass through one encoder block.
Args:
x (Tensor): Input tensor of shape [batch_size, seq_len, d_model].
mask (Tensor, optional): Attention mask (see MultiHeadAttention).
Returns:
Tensor: Output tensor of shape [batch_size, seq_len, d_model].
"""
# Sublayer 1: self-attention with residual connection
attn_out = self.mha(x, mask)
x = self.layernorm1(x + self.dropout(attn_out))
# Sublayer 2: position-wise feed-forward network with residual
ffn_out = self.ffn(x)
x = self.layernorm2(x + self.dropout(ffn_out))
return x
# ==========================================
# 3. Full Transformer Classifier
# ==========================================
class TransformerClassifier(nn.Module):
"""
Transformer-based text classifier for IMDB sentiment analysis.
Architecture:
- Token embedding layer
- Sinusoidal positional encoding
- Stack of Transformer encoder blocks
- Global average pooling over sequence dimension
- Linear classification head to predict sentiment label
"""
def __init__(self, vocab_size, d_model, num_heads, num_layers, d_ff, max_len, num_classes=2, dropout=0.1):
super().__init__()
self.embedding = nn.Embedding(vocab_size, d_model)
self.pos_encoding = PositionalEncoding(d_model, max_len)
self.encoder_layers = nn.ModuleList([
TransformerEncoderBlock(d_model, num_heads, d_ff, dropout)
for _ in range(num_layers)
])
self.dropout = nn.Dropout(dropout)
        # Classification head: a single linear layer applied to the pooled representation
self.classifier = nn.Linear(d_model, num_classes)
def forward(self, x, mask=None):
"""
Forward pass for the classifier.
Args:
x (Tensor): Input tensor of token IDs
with shape [batch_size, seq_len].
mask (Tensor, optional): Attention mask (not used in this script).
Returns:
Tensor: Logits of shape [batch_size, num_classes].
"""
x = self.dropout(self.pos_encoding(self.embedding(x)))
for layer in self.encoder_layers:
x = layer(x, mask)
# Global Average Pooling across the sequence dimension
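        # Note: no padding mask is passed anywhere in this script, so <pad>
        # positions also contribute to the average; this is a simplification
        # kept for brevity.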
x = x.mean(dim=1)
return self.classifier(x)
# ==========================================
# 4. Training and Evaluation Logic
# ==========================================
def train_model(model, train_loader, val_loader, epochs, lr, device):
"""
Train the Transformer classifier on the IMDB training split.
Args:
model (nn.Module): TransformerClassifier instance.
train_loader (DataLoader): Batches of (sequence, label) for training.
val_loader (DataLoader): Batches for validation.
epochs (int): Number of full passes through the training set.
lr (float): Initial learning rate for Adam optimizer.
device (torch.device): Device on which to run training.
Uses:
- CrossEntropyLoss for binary sentiment classification.
- Adam optimizer with StepLR scheduler (gamma=0.5 every 2 epochs).
"""
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=2, gamma=0.5)
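    # With step_size=2 and gamma=0.5, the learning rate halves every two
    # epochs (e.g. starting from LR=0.001: 0.001 -> 0.0005 after epoch 2
    # -> 0.00025 after epoch 4).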
model.to(device)
for epoch in range(epochs):
model.train()
total_loss = 0
for batch_seq, batch_lab in train_loader:
batch_seq, batch_lab = batch_seq.to(device), batch_lab.to(device)
optimizer.zero_grad()
outputs = model(batch_seq)
loss = criterion(outputs, batch_lab)
loss.backward()
optimizer.step()
total_loss += loss.item()
scheduler.step()
val_metrics = evaluate_model(model, val_loader, device)
val_acc = val_metrics["accuracy"]
val_p = val_metrics["precision"]
val_r = val_metrics["recall"]
val_f1 = val_metrics["f1"]
print(
f"Epoch {epoch+1}/{epochs} | "
f"Loss: {total_loss/len(train_loader):.4f} | "
f"Val Acc: {val_acc:.4f} | "
f"Val P: {val_p:.4f} | Val R: {val_r:.4f} | Val F1: {val_f1:.4f}"
)
def evaluate_model(model, loader, device):
"""
Evaluate the model on a dataset.
Args:
model (nn.Module): Trained (or partially trained) classifier.
loader (DataLoader): DataLoader for validation or test data.
device (torch.device): Device on which to perform evaluation.
Returns:
Dict[str, float]: Dictionary with accuracy, precision, recall, and F1.
"""
model.eval()
all_preds = []
all_labels = []
with torch.no_grad():
for batch_seq, batch_lab in loader:
batch_seq, batch_lab = batch_seq.to(device), batch_lab.to(device)
outputs = model(batch_seq)
preds = torch.argmax(outputs, dim=1)
all_preds.extend(preds.cpu().numpy())
all_labels.extend(batch_lab.cpu().numpy())
acc = accuracy_score(all_labels, all_preds)
p, r, f1, _ = precision_recall_fscore_support(all_labels, all_preds, average='binary')
return {"accuracy": acc, "precision": p, "recall": r, "f1": f1}
def count_trainable_parameters(model):
"""
Count the number of trainable parameters in a model.
Args:
model (nn.Module): Model whose parameters should be counted.
Returns:
int: Number of trainable parameters.
"""
return sum(p.numel() for p in model.parameters() if p.requires_grad)
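# Note: for the configurations below, the token embedding dominates the count;
# e.g. with MAX_VOCAB=5000 and d_model=64 (the "small" preset) the embedding
# table alone contributes 5000 * 64 = 320,000 trainable weights.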
def write_experiment_report_md(
report_path,
results,
best_result,
device,
train_size,
val_size,
):
"""
Write a Markdown report summarizing model-size experiment results.
Args:
report_path (str): Output Markdown file path.
results (List[Dict]): Per-model experiment outputs.
best_result (Dict): Best-performing entry from `results`.
device (torch.device): Device used during training.
train_size (int): Number of training samples.
val_size (int): Number of validation samples.
"""
lines = []
lines.append("# IMDB Transformer Model-Size Experiment Report")
lines.append("")
lines.append(f"- Generated at: `{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}`")
lines.append(f"- Device: `{device}`")
lines.append(f"- Training samples: `{train_size}`")
lines.append(f"- Validation samples: `{val_size}`")
lines.append(f"- Max vocab size: `{MAX_VOCAB}`")
lines.append(f"- Max sequence length: `{MAX_LEN}`")
lines.append(f"- Batch size: `{BATCH_SIZE}`")
lines.append(f"- Epochs: `{EPOCHS}`")
lines.append(f"- Learning rate: `{LR}`")
lines.append("")
lines.append("## Overall Comparison")
lines.append("")
lines.append("| Model Size | Trainable Params | Accuracy | Precision | Recall | F1 | Checkpoint |")
lines.append("|---|---:|---:|---:|---:|---:|---|")
for item in results:
metrics = item["metrics"]
lines.append(
f"| {item['size']} | {item['params']:,} | "
f"{metrics['accuracy']:.4f} | {metrics['precision']:.4f} | "
f"{metrics['recall']:.4f} | {metrics['f1']:.4f} | "
f"`{item['checkpoint_path']}` |"
)
lines.append("")
lines.append("## Best Model")
lines.append("")
lines.append(f"- Best size by validation F1: `{best_result['size']}`")
lines.append(f"- Checkpoint: `{best_result['checkpoint_path']}`")
lines.append(f"- Trainable parameters: `{best_result['params']:,}`")
lines.append("- Metrics:")
lines.append(f" - Accuracy: `{best_result['metrics']['accuracy']:.4f}`")
lines.append(f" - Precision: `{best_result['metrics']['precision']:.4f}`")
lines.append(f" - Recall: `{best_result['metrics']['recall']:.4f}`")
lines.append(f" - F1: `{best_result['metrics']['f1']:.4f}`")
lines.append("")
lines.append("## Per-Model Details")
lines.append("")
for item in results:
cfg = item["config"]
metrics = item["metrics"]
lines.append(f"### {item['size'].capitalize()} model")
lines.append("")
lines.append("- Architecture:")
lines.append(f" - `d_model`: `{cfg['d_model']}`")
lines.append(f" - `num_heads`: `{cfg['num_heads']}`")
lines.append(f" - `num_layers`: `{cfg['num_layers']}`")
lines.append(f" - `d_ff`: `{cfg['d_ff']}`")
lines.append(f"- Trainable params: `{item['params']:,}`")
lines.append(f"- Checkpoint: `{item['checkpoint_path']}`")
lines.append("- Validation metrics:")
lines.append(f" - Accuracy: `{metrics['accuracy']:.4f}`")
lines.append(f" - Precision: `{metrics['precision']:.4f}`")
lines.append(f" - Recall: `{metrics['recall']:.4f}`")
lines.append(f" - F1: `{metrics['f1']:.4f}`")
lines.append("")
with open(report_path, "w", encoding="utf-8") as f:
f.write("\n".join(lines))
# ==========================================
# 5. Execution Example (IMDB Train/Validation Split)
# ==========================================
# Dataset loading using the real IMDB dataset via HuggingFace datasets.
# Data source:
# HuggingFace Datasets – "imdb" configuration, which originates from the
# Large Movie Review Dataset (Maas et al., 2011).
def load_imdb_texts(split: str = "train"):
"""
Load IMDB dataset texts and labels using `datasets.load_dataset`.
Args:
split (str): Dataset split, e.g. "train" or "test".
Returns:
Tuple[List[str], List[int]]: List of review texts and sentiment labels,
where labels are integers 0 (negative) and 1 (positive).
"""
ds = load_dataset("imdb", split=split)
texts = ds["text"]
labels = ds["label"]
return texts, labels
# ===========================
# Hyperparameters
# ===========================
# MAX_VOCAB: upper bound on vocabulary size. Larger values can capture more
# rare words but increase model size and memory usage.
MAX_VOCAB = 5000
# MAX_LEN: maximum number of tokens per review. Longer sequences capture
# more context but are more expensive to process; here we use 64 for speed.
MAX_LEN = 64
# BATCH_SIZE: number of examples per optimization step. Larger batches yield
# smoother gradients but require more memory.
BATCH_SIZE = 32
# EPOCHS: number of full passes through the training dataset.
EPOCHS = 5
# LR: initial learning rate for the Adam optimizer.
LR = 0.001
# Transformer size presets for model-complexity experiments.
# Each preset controls hidden size, attention heads, number of layers,
# and feed-forward dimension.
MODEL_SIZES = {
"small": {"d_model": 64, "num_heads": 4, "num_layers": 1, "d_ff": 128},
"medium": {"d_model": 128, "num_heads": 8, "num_layers": 2, "d_ff": 256},
"large": {"d_model": 256, "num_heads": 8, "num_layers": 4, "d_ff": 512},
}
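# All presets keep d_model divisible by num_heads (asserted in
# MultiHeadAttention); the per-head dimensions are 16, 16, and 32 respectively.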
# Directory to save trained model and related artifacts (checkpoint, vocab,
# and configuration dictionary for reproducibility).
# Keep output paths relative to the current working directory.
SAVE_DIR = os.path.join(".", "saved_model")
os.makedirs(SAVE_DIR, exist_ok=True)
MODEL_PATH = os.path.join(SAVE_DIR, "transformer_imdb.pt")
REPORT_PATH = os.path.join(SAVE_DIR, "transformer_imdb_experiment_report.md")
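# Illustrative sketch (defined but never called by this script): one way to
# reload a size-specific checkpoint saved by main() below. The helper name
# `load_checkpoint_for_inference` is our own addition, not part of the
# assignment; it assumes the checkpoint layout written by torch.save() in main().
def load_checkpoint_for_inference(checkpoint_path, device="cpu"):
    """Rebuild a TransformerClassifier from a checkpoint saved by main()."""
    ckpt = torch.load(checkpoint_path, map_location=device)
    cfg = ckpt["config"]
    model = TransformerClassifier(
        vocab_size=len(ckpt["vocab"]),
        d_model=cfg["d_model"],
        num_heads=cfg["num_heads"],
        num_layers=cfg["num_layers"],
        d_ff=cfg["d_ff"],
        max_len=cfg["max_len"],
    )
    model.load_state_dict(ckpt["model_state_dict"])
    model.eval()
    return model, ckpt["vocab"], cfg
# Example usage (illustrative):
#   model, vocab, cfg = load_checkpoint_for_inference("saved_model/transformer_imdb_small.pt")
#   seq = torch.tensor(preprocess_data(["a wonderful, heartfelt film"], vocab, cfg["max_len"]))
#   pred = model(seq).argmax(dim=1)  # 0 = negative, 1 = positive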
def main():
"""
Train a Transformer-based sentiment classifier on IMDB and save the model,
vocabulary, and configuration to disk.
"""
# 1) Load IMDB training split and then create train/validation split.
all_train_texts, all_train_labels = load_imdb_texts(split="train")
train_texts, val_texts, train_labels, val_labels = train_test_split(
all_train_texts,
all_train_labels,
test_size=0.2,
random_state=42,
stratify=all_train_labels,
)
# 2) Build vocabulary using training texts only (avoid validation leakage).
vocab = build_vocab(train_texts, MAX_VOCAB)
# 3) Preprocess train and validation data into fixed-length ID sequences.
train_sequences = preprocess_data(train_texts, vocab, MAX_LEN)
val_sequences = preprocess_data(val_texts, vocab, MAX_LEN)
train_dataset = IMDBDataset(train_sequences, train_labels)
val_dataset = IMDBDataset(val_sequences, val_labels)
# DataLoaders for mini-batch training and validation.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
results = []
# Train and evaluate multiple model sizes to analyze how complexity
# changes sentiment-classification performance.
for size_name, size_cfg in MODEL_SIZES.items():
print("\n" + "=" * 72)
print(f"Training {size_name.upper()} model with config: {size_cfg}")
print("=" * 72)
model = TransformerClassifier(
len(vocab),
size_cfg["d_model"],
size_cfg["num_heads"],
size_cfg["num_layers"],
size_cfg["d_ff"],
MAX_LEN,
)
param_count = count_trainable_parameters(model)
print(f"Trainable parameters ({size_name}): {param_count:,}")
train_model(model, train_loader, val_loader, EPOCHS, LR, device)
val_metrics = evaluate_model(model, val_loader, device)
size_model_path = os.path.join(SAVE_DIR, f"transformer_imdb_{size_name}.pt")
results.append(
{
"size": size_name,
"params": param_count,
"config": size_cfg,
"metrics": val_metrics,
"checkpoint_path": size_model_path,
}
)
# Save each trained size-specific model.
torch.save(
{
"model_state_dict": model.state_dict(),
"vocab": vocab,
"config": {
"max_vocab": MAX_VOCAB,
"max_len": MAX_LEN,
"batch_size": BATCH_SIZE,
"epochs": EPOCHS,
"lr": LR,
"size_name": size_name,
**size_cfg,
},
"val_metrics": val_metrics,
},
size_model_path,
)
print(f"Saved {size_name} model to {size_model_path}")
# Print a concise comparison table at the end.
print("\n" + "#" * 72)
print("Model Size Impact Summary (Validation Set)")
print("#" * 72)
print(f"{'Size':<10} {'Params':>12} {'Acc':>8} {'Precision':>10} {'Recall':>8} {'F1':>8}")
for item in results:
m = item["metrics"]
print(
f"{item['size']:<10} "
f"{item['params']:>12,} "
f"{m['accuracy']:>8.4f} "
f"{m['precision']:>10.4f} "
f"{m['recall']:>8.4f} "
f"{m['f1']:>8.4f}"
)
    # Select the best model by validation F1 and save an experiment summary
    # (best size, its checkpoint path, and all results) under the original
    # MODEL_PATH name for compatibility.
best_result = max(results, key=lambda x: x["metrics"]["f1"])
best_model_path = os.path.join(SAVE_DIR, f"transformer_imdb_{best_result['size']}.pt")
torch.save(
{
"best_size": best_result["size"],
"best_model_path": best_model_path,
"all_results": results,
},
MODEL_PATH,
)
print(f"\nBest model by Val F1: {best_result['size']} -> {best_model_path}")
print(f"Experiment summary saved to {MODEL_PATH}")
write_experiment_report_md(
REPORT_PATH,
results,
best_result,
device,
train_size=len(train_texts),
val_size=len(val_texts),
)
print(f"Markdown report saved to {REPORT_PATH}")
if __name__ == "__main__":
main()