|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| """
|
| GPT-style Language Model Architecture
|
|
|
| This module implements a standard GPT (Generative Pre-trained Transformer) architecture
|
| using pure PyTorch. The model is a decoder-only transformer designed for autoregressive
|
| language modeling (next-token prediction).
|
|
|
| ARCHITECTURE OVERVIEW:
|
| - Token Embedding: Maps token IDs to dense vectors
|
| - Positional Embedding: Adds position information to token embeddings
|
| - Transformer Blocks: Stack of multi-head attention + feed-forward layers
|
| - Layer Normalization: Pre-norm placement for training stability
|
| - Output Head: Linear projection to vocabulary for next-token prediction
|
|
|
| FEATURES:
|
| - Configurable model size (small/medium/large)
|
| - Dropout for regularization
|
| - Causal (autoregressive) attention masking
|
| - Compatible with our SentencePiece tokenizer
|
| - Memory-efficient implementation for training on limited hardware
|
|
|
| Usage:
|
| from model import GPTConfig, GPTModel
|
|
|
| config = GPTConfig(vocab_size=32000, n_layer=12, n_head=12, n_embd=768)
|
| model = GPTModel(config)
|
|
|
| # Forward pass
|
| logits = model(input_ids) # Shape: (batch_size, seq_len, vocab_size)
|
|
|
| Hardware Requirements:
|
| - Small Model (25M params): 4-8GB RAM, CPU/integrated GPU
|
| - Medium Model (117M params): 8-16GB RAM, dedicated GPU recommended
|
| - Large Model (350M params): 16GB+ RAM, high-end GPU required
|
|
|
| Author: Louis Chua Bean Chong
|
| License: GPLv3
|
| """
|
|
|
| import math
|
| import torch
|
| import torch.nn as nn
|
| import torch.nn.functional as F
|
| from dataclasses import dataclass
|
| from typing import Optional, Tuple
|
|
|
|
|
| @dataclass
|
| class GPTConfig:
|
| """
|
| Configuration class for GPT model hyperparameters.
|
|
|
| This class defines all the architectural parameters needed to instantiate
|
| a GPT model. Use the provided class methods to get pre-configured setups
|
| for different model sizes.
|
| """
|
|
|
|
|
| vocab_size: int = 32000
|
| n_layer: int = 12
|
| n_head: int = 12
|
| n_embd: int = 768
|
|
|
|
|
| block_size: int = 1024
|
|
|
|
|
| dropout: float = 0.1
|
| bias: bool = True
|
|
|
|
|
| model_name: str = "gpt-medium"
|
|
|
| @classmethod
|
| def small(cls) -> 'GPTConfig':
|
| """Small model configuration (~25M parameters) - Good for CPU training"""
|
| return cls(
|
| vocab_size=32000,
|
| n_layer=6,
|
| n_head=8,
|
| n_embd=512,
|
| block_size=1024,
|
| dropout=0.1,
|
| model_name="gpt-small"
|
| )
|
|
|
| @classmethod
|
| def medium(cls) -> 'GPTConfig':
|
| """Medium model configuration (~117M parameters) - Balanced performance"""
|
| return cls(
|
| vocab_size=32000,
|
| n_layer=12,
|
| n_head=12,
|
| n_embd=768,
|
| block_size=2048,
|
| dropout=0.1,
|
| model_name="gpt-medium"
|
| )
|
|
|
| @classmethod
|
| def large(cls) -> 'GPTConfig':
|
| """Large model configuration (~350M parameters) - High performance"""
|
| return cls(
|
| vocab_size=32000,
|
| n_layer=24,
|
| n_head=16,
|
| n_embd=1024,
|
| block_size=2048,
|
| dropout=0.1,
|
| model_name="gpt-large"
|
| )
|
|
|
| def estimate_parameters(self) -> int:
|
| """
|
| Estimate the total number of trainable parameters.
|
|
|
| Returns:
|
| int: Estimated parameter count
|
| """
|
|
|
| token_emb = self.vocab_size * self.n_embd
|
|
|
|
|
| pos_emb = self.block_size * self.n_embd
|
|
|
|
|
|
|
| layer_params = self.n_layer * (12 * self.n_embd**2 + 4 * self.n_embd)
|
|
|
|
|
| output_head = self.vocab_size * self.n_embd
|
|
|
| total = token_emb + pos_emb + layer_params + output_head
|
| return total
|
|
|
|
|
| class CausalSelfAttention(nn.Module):
|
| """
|
| Multi-head causal self-attention mechanism.
|
|
|
| This implements the core attention mechanism of the transformer, with causal
|
| masking to ensure autoregressive behavior (tokens can only attend to previous
|
| tokens, not future ones).
|
| """
|
|
|
| def __init__(self, config: GPTConfig):
|
| super().__init__()
|
| assert config.n_embd % config.n_head == 0, "Embedding dim must be divisible by number of heads"
|
|
|
| self.config = config
|
| self.n_head = config.n_head
|
| self.n_embd = config.n_embd
|
| self.head_dim = self.n_embd // self.n_head
|
|
|
|
|
| self.c_attn = nn.Linear(config.n_embd, 3 * config.n_embd, bias=config.bias)
|
|
|
|
|
| self.c_proj = nn.Linear(config.n_embd, config.n_embd, bias=config.bias)
|
|
|
|
|
| self.attn_dropout = nn.Dropout(config.dropout)
|
| self.resid_dropout = nn.Dropout(config.dropout)
|
|
|
|
|
| self.register_buffer(
|
| "bias",
|
| torch.tril(torch.ones(config.block_size, config.block_size))
|
| .view(1, 1, config.block_size, config.block_size)
|
| )
|
|
|
| def forward(self, x: torch.Tensor) -> torch.Tensor:
|
| """
|
| Forward pass of causal self-attention.
|
|
|
| This method implements the scaled dot-product attention mechanism with causal masking.
|
| The attention mechanism allows each token to attend to all previous tokens in the sequence,
|
| but not to future tokens, maintaining the autoregressive property essential for language modeling.
|
|
|
| Mathematical formulation:
|
| Attention(Q, K, V) = softmax(QK^T / sqrt(d_k))V
|
| where Q, K, V are query, key, value matrices derived from input x
|
|
|
| Implementation details:
|
| - Uses batch matrix multiplication for efficiency
|
| - Applies causal mask to prevent future token attention
|
| - Implements multi-head attention by reshaping and parallel processing
|
| - Applies dropout for regularization during training
|
|
|
| Args:
|
| x: Input tensor of shape (batch_size, seq_len, n_embd)
|
| Contains embedded token representations from previous layer
|
|
|
| Returns:
|
| torch.Tensor: Output tensor of shape (batch_size, seq_len, n_embd)
|
| """
|
|
|
|
|
|
|
|
|
| B, T, C = x.size()
|
|
|
|
|
|
|
|
|
|
|
| q, k, v = self.c_attn(x).split(self.n_embd, dim=2)
|
|
|
|
|
|
|
|
|
|
|
|
|
| q = q.view(B, T, self.n_head, self.head_dim).transpose(1, 2)
|
| k = k.view(B, T, self.n_head, self.head_dim).transpose(1, 2)
|
| v = v.view(B, T, self.n_head, self.head_dim).transpose(1, 2)
|
|
|
|
|
|
|
|
|
|
|
|
|
| att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(self.head_dim))
|
|
|
|
|
|
|
|
|
|
|
| att = att.masked_fill(self.bias[:, :, :T, :T] == 0, float('-inf'))
|
|
|
|
|
|
|
|
|
| att = F.softmax(att, dim=-1)
|
|
|
|
|
|
|
| att = self.attn_dropout(att)
|
|
|
|
|
|
|
|
|
|
|
| y = att @ v
|
|
|
|
|
|
|
|
|
|
|
| y = y.transpose(1, 2).contiguous().view(B, T, C)
|
|
|
|
|
|
|
|
|
| y = self.resid_dropout(self.c_proj(y))
|
| return y
|
|
|
|
|
| class MLP(nn.Module):
|
| """
|
| Multi-Layer Perceptron (Feed-Forward Network) for Transformer.
|
|
|
| This implements the position-wise feed-forward network that appears in each transformer layer.
|
| The MLP provides additional non-linear transformation capacity beyond what attention provides.
|
|
|
| Architecture:
|
| Input -> Linear(n_embd -> 4*n_embd) -> GELU -> Linear(4*n_embd -> n_embd) -> Dropout -> Output
|
|
|
| Design rationale:
|
| - 4x expansion is standard in transformers (from "Attention Is All You Need")
|
| - GELU activation provides smoother gradients than ReLU for language modeling
|
| - Dropout prevents overfitting in the feed-forward layers
|
| - Two linear layers allow complex non-linear transformations of attention outputs
|
|
|
| Parameters:
|
| - First linear layer: n_embd * 4*n_embd parameters (expansion)
|
| - Second linear layer: 4*n_embd * n_embd parameters (projection back)
|
| - Total: 8 * n_embd^2 parameters (significant portion of model size)
|
| """
|
|
|
| def __init__(self, config: GPTConfig):
|
| super().__init__()
|
|
|
|
|
|
|
|
|
| self.c_fc = nn.Linear(config.n_embd, 4 * config.n_embd, bias=config.bias)
|
|
|
|
|
|
|
|
|
| self.gelu = nn.GELU()
|
|
|
|
|
|
|
| self.c_proj = nn.Linear(4 * config.n_embd, config.n_embd, bias=config.bias)
|
|
|
|
|
|
|
| self.dropout = nn.Dropout(config.dropout)
|
|
|
| def forward(self, x: torch.Tensor) -> torch.Tensor:
|
| """
|
| Forward pass of the feed-forward network.
|
|
|
| This method applies a two-layer MLP with GELU activation to transform
|
| the attention outputs. The MLP operates independently on each position
|
| in the sequence, providing position-wise non-linear transformations.
|
|
|
| Mathematical operation:
|
| MLP(x) = Dropout(Linear₂(GELU(Linear₁(x))))
|
| where Linear₁: R^n_embd -> R^4*n_embd and Linear₂: R^4*n_embd -> R^n_embd
|
|
|
| Args:
|
| x: Input tensor of shape (batch_size, seq_len, n_embd)
|
| Contains attended representations from the attention layer
|
|
|
| Returns:
|
| torch.Tensor: Output tensor of shape (batch_size, seq_len, n_embd)
|
| Contains transformed representations ready for residual connection
|
| """
|
|
|
|
|
|
|
| x = self.c_fc(x)
|
|
|
|
|
|
|
|
|
| x = self.gelu(x)
|
|
|
|
|
|
|
|
|
| x = self.c_proj(x)
|
|
|
|
|
|
|
|
|
| x = self.dropout(x)
|
|
|
| return x
|
|
|
|
|
| class Block(nn.Module):
|
| """
|
| Single Transformer block.
|
|
|
| Consists of:
|
| 1. Layer normalization
|
| 2. Multi-head causal self-attention
|
| 3. Residual connection
|
| 4. Layer normalization
|
| 5. MLP (feed-forward network)
|
| 6. Residual connection
|
|
|
| Uses pre-norm architecture for better training stability.
|
| """
|
|
|
| def __init__(self, config: GPTConfig):
|
| super().__init__()
|
| self.ln_1 = nn.LayerNorm(config.n_embd)
|
| self.attn = CausalSelfAttention(config)
|
| self.ln_2 = nn.LayerNorm(config.n_embd)
|
| self.mlp = MLP(config)
|
|
|
| def forward(self, x: torch.Tensor) -> torch.Tensor:
|
| """
|
| Forward pass of transformer block.
|
|
|
| Args:
|
| x: Input tensor of shape (batch_size, seq_len, n_embd)
|
|
|
| Returns:
|
| torch.Tensor: Output tensor of shape (batch_size, seq_len, n_embd)
|
| """
|
|
|
| x = x + self.attn(self.ln_1(x))
|
|
|
|
|
| x = x + self.mlp(self.ln_2(x))
|
|
|
| return x
|
|
|
|
|
| class GPTModel(nn.Module):
|
| """
|
| Complete GPT Language Model.
|
|
|
| This is the main model class that combines all components:
|
| - Token and positional embeddings
|
| - Stack of transformer blocks
|
| - Final layer normalization
|
| - Language modeling head
|
|
|
| The model can be used for:
|
| - Training from scratch on text data
|
| - Fine-tuning on downstream tasks
|
| - Text generation (inference)
|
| """
|
|
|
| def __init__(self, config: GPTConfig):
|
| super().__init__()
|
| assert config.vocab_size is not None, "vocab_size must be specified"
|
| assert config.block_size is not None, "block_size must be specified"
|
|
|
| self.config = config
|
|
|
|
|
| self.transformer = nn.ModuleDict(dict(
|
| wte = nn.Embedding(config.vocab_size, config.n_embd),
|
| wpe = nn.Embedding(config.block_size, config.n_embd),
|
| drop = nn.Dropout(config.dropout),
|
| h = nn.ModuleList([Block(config) for _ in range(config.n_layer)]),
|
| ln_f = nn.LayerNorm(config.n_embd),
|
| ))
|
|
|
|
|
| self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False)
|
|
|
|
|
| self.transformer.wte.weight = self.lm_head.weight
|
|
|
|
|
| self.apply(self._init_weights)
|
|
|
|
|
| print(f"Model initialized: {self.config.model_name}")
|
| print(f"Parameters: {self.get_num_params():,}")
|
| print(f"Estimated: {self.config.estimate_parameters():,}")
|
|
|
| def _init_weights(self, module):
|
| """Initialize model weights using standard practices."""
|
| if isinstance(module, nn.Linear):
|
| torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
|
| if module.bias is not None:
|
| torch.nn.init.zeros_(module.bias)
|
| elif isinstance(module, nn.Embedding):
|
| torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
|
|
|
| def get_num_params(self, non_embedding: bool = False) -> int:
|
| """
|
| Count the number of parameters in the model.
|
|
|
| Args:
|
| non_embedding: If True, subtract embedding parameters
|
|
|
| Returns:
|
| int: Number of parameters
|
| """
|
| n_params = sum(p.numel() for p in self.parameters())
|
| if non_embedding:
|
| n_params -= self.transformer.wpe.weight.numel()
|
| n_params -= self.transformer.wte.weight.numel()
|
| return n_params
|
|
|
| def forward(
|
| self,
|
| idx: torch.Tensor,
|
| targets: Optional[torch.Tensor] = None
|
| ) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
|
| """
|
| Forward pass of the GPT model.
|
|
|
| Args:
|
| idx: Input token indices of shape (batch_size, seq_len)
|
| targets: Optional target tokens for loss calculation (batch_size, seq_len)
|
|
|
| Returns:
|
| Tuple containing:
|
| - logits: Output logits of shape (batch_size, seq_len, vocab_size)
|
| - loss: Cross-entropy loss if targets provided, None otherwise
|
| """
|
| device = idx.device
|
| b, t = idx.size()
|
| assert t <= self.config.block_size, f"Sequence length {t} exceeds block size {self.config.block_size}"
|
|
|
|
|
| tok_emb = self.transformer.wte(idx)
|
|
|
|
|
| pos = torch.arange(0, t, dtype=torch.long, device=device)
|
| pos_emb = self.transformer.wpe(pos)
|
|
|
|
|
| x = self.transformer.drop(tok_emb + pos_emb)
|
|
|
|
|
| for block in self.transformer.h:
|
| x = block(x)
|
|
|
|
|
| x = self.transformer.ln_f(x)
|
|
|
|
|
| if targets is not None:
|
|
|
| logits = self.lm_head(x)
|
| loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1), ignore_index=-1)
|
| else:
|
|
|
| logits = self.lm_head(x[:, [-1], :])
|
| loss = None
|
|
|
| return logits, loss
|
|
|
| def generate(
|
| self,
|
| idx: torch.Tensor,
|
| max_new_tokens: int = 100,
|
| temperature: float = 1.0,
|
| top_k: Optional[int] = None
|
| ) -> torch.Tensor:
|
| """
|
| Generate new tokens autoregressively.
|
|
|
| Args:
|
| idx: Starting token indices (batch_size, seq_len)
|
| max_new_tokens: Maximum number of new tokens to generate
|
| temperature: Sampling temperature (higher = more random)
|
| top_k: If set, only sample from top-k most likely tokens
|
|
|
| Returns:
|
| torch.Tensor: Generated sequence (batch_size, seq_len + max_new_tokens)
|
| """
|
| self.eval()
|
| with torch.no_grad():
|
| for _ in range(max_new_tokens):
|
|
|
| idx_cond = idx if idx.size(1) <= self.config.block_size else idx[:, -self.config.block_size:]
|
|
|
|
|
| logits, _ = self(idx_cond)
|
|
|
|
|
| logits = logits[:, -1, :] / temperature
|
|
|
|
|
| if top_k is not None:
|
| v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
|
| logits[logits < v[:, [-1]]] = -float('Inf')
|
|
|
|
|
| probs = F.softmax(logits, dim=-1)
|
| idx_next = torch.multinomial(probs, num_samples=1)
|
|
|
|
|
| idx = torch.cat((idx, idx_next), dim=1)
|
|
|
| self.train()
|
| return idx
|
|
|
| def estimate_memory_usage(self, batch_size: int = 1, seq_len: int = None) -> dict:
|
| """
|
| Estimate memory usage for training and inference.
|
|
|
| Args:
|
| batch_size: Batch size for estimation
|
| seq_len: Sequence length (defaults to block_size)
|
|
|
| Returns:
|
| dict: Memory usage estimates in MB
|
| """
|
| if seq_len is None:
|
| seq_len = self.config.block_size
|
|
|
|
|
| param_memory = self.get_num_params() * 4 / (1024**2)
|
|
|
|
|
| activation_memory = (
|
| batch_size * seq_len * self.config.n_embd * self.config.n_layer * 8
|
| ) / (1024**2)
|
|
|
|
|
| gradient_memory = param_memory
|
|
|
| return {
|
| "parameters_mb": param_memory,
|
| "activations_mb": activation_memory,
|
| "gradients_mb": gradient_memory,
|
| "total_training_mb": param_memory + activation_memory + gradient_memory,
|
| "total_inference_mb": param_memory + activation_memory * 0.5,
|
| }
|
|
|
|
|
| def create_model(model_size: str = "medium") -> GPTModel:
|
| """
|
| Factory function to create a GPT model with predefined configurations.
|
|
|
| Args:
|
| model_size: Size of model to create ("small", "medium", "large")
|
|
|
| Returns:
|
| GPTModel: Initialized model
|
| """
|
| configs = {
|
| "small": GPTConfig.small(),
|
| "medium": GPTConfig.medium(),
|
| "large": GPTConfig.large(),
|
| }
|
|
|
| if model_size not in configs:
|
| raise ValueError(f"Unknown model size: {model_size}. Choose from {list(configs.keys())}")
|
|
|
| config = configs[model_size]
|
| model = GPTModel(config)
|
|
|
| return model
|
|
|
|
|
| if __name__ == "__main__":
|
|
|
| print("🧠 GPT Model Architecture")
|
| print("=" * 50)
|
|
|
|
|
| for size in ["small", "medium", "large"]:
|
| print(f"\n{size.upper()} MODEL:")
|
| model = create_model(size)
|
|
|
|
|
| memory = model.estimate_memory_usage(batch_size=4, seq_len=512)
|
| print(f"Memory (4 batch, 512 seq): {memory['total_training_mb']:.1f}MB training, {memory['total_inference_mb']:.1f}MB inference")
|
|
|
|
|
| x = torch.randint(0, 32000, (2, 64))
|
| with torch.no_grad():
|
| logits, _ = model(x)
|
| print(f"Test forward pass: {x.shape} -> {logits.shape} ✓") |