# sohv's picture
# Upload src/moe.py
# f52cfc5 verified
"""
Mixture of Experts (MoE) Implementation for nanoKimi
This module implements the MoE layer used in Kimi-K2, which allows
for efficient scaling by routing tokens to different expert networks.
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
class MoELayer(nn.Module):
    """
    Mixture of Experts Layer

    Routes input tokens to different expert networks based on a learned
    gating function. Only the top-k experts are activated for each token,
    making the computation sparse.

    Args:
        n_embd: embedding dimension
        num_experts: number of expert networks
        expert_capacity: maximum number of tokens a single expert processes
            per forward call (counted over the flattened ``B * T`` tokens);
            when more tokens select an expert, a random subset is kept and
            the rest receive no contribution from that expert
        top_k: number of experts to route each token to
        dropout: dropout probability passed to every expert
        bias: whether to use bias in linear layers
        load_balance_loss_coef: multiplier applied to the auxiliary
            load-balancing loss returned by ``forward``

    Raises:
        ValueError: if ``top_k`` is not in ``[1, num_experts]``.
    """

    def __init__(self, n_embd, num_experts=8, expert_capacity=32, top_k=2,
                 dropout=0.0, bias=True, load_balance_loss_coef=0.01):
        super().__init__()
        # Fail at construction time instead of deep inside torch.topk.
        if not 1 <= top_k <= num_experts:
            raise ValueError(
                f"top_k must be between 1 and num_experts ({num_experts}), got {top_k}"
            )
        self.n_embd = n_embd
        self.num_experts = num_experts
        self.expert_capacity = expert_capacity
        self.top_k = top_k
        # Gating network - decides which experts to use
        self.gate = nn.Linear(n_embd, num_experts, bias=bias)
        # Expert networks - simple FFN for each expert
        self.experts = nn.ModuleList([
            ExpertFFN(n_embd, dropout=dropout, bias=bias)
            for _ in range(num_experts)
        ])
        # Load balancing loss coefficient
        self.load_balance_loss_coef = load_balance_loss_coef

    def forward(self, x):
        """
        Route every token through its top-k experts and mix the results.

        Args:
            x: tensor of shape (B, T, C); it does not need to be contiguous

        Returns:
            A tuple ``(output, load_balance_loss)`` where ``output`` has the
            same shape as ``x`` and ``load_balance_loss`` is the scaled
            auxiliary loss from ``_compute_load_balance_loss``.
        """
        B, T, C = x.shape
        # reshape (not view) so non-contiguous inputs are accepted
        x_flat = x.reshape(B * T, C)

        # Compute gating scores
        gate_logits = self.gate(x_flat)  # (B*T, num_experts)
        gate_scores = F.softmax(gate_logits, dim=-1)

        # Select top-k experts for each token and renormalize their scores
        top_k_scores, top_k_indices = torch.topk(gate_scores, self.top_k, dim=-1)
        top_k_scores = top_k_scores / top_k_scores.sum(dim=-1, keepdim=True)

        output = torch.zeros_like(x_flat)

        for expert_idx, expert in enumerate(self.experts):
            # (B*T, top_k) mask of the routing slots that picked this expert
            selected = top_k_indices == expert_idx
            token_ids = selected.any(dim=-1).nonzero(as_tuple=True)[0]
            if token_ids.numel() == 0:
                continue

            # Apply capacity constraint: keep a random subset on overflow
            if token_ids.numel() > self.expert_capacity:
                keep = torch.randperm(
                    token_ids.numel(), device=token_ids.device
                )[:self.expert_capacity]
                token_ids = token_ids[keep]

            expert_output = expert(x_flat[token_ids])

            # topk indices are distinct per token, so at most one slot
            # matches and the masked sum is exactly that slot's score.
            slot_mask = selected[token_ids].to(top_k_scores.dtype)
            weights = (top_k_scores[token_ids] * slot_mask).sum(dim=-1, keepdim=True)

            # Cast guards against a dtype mismatch (e.g. under autocast).
            contribution = (weights * expert_output).to(output.dtype)
            output.index_add_(0, token_ids, contribution)

        # Reshape back to original shape
        output = output.reshape(B, T, C)

        # Compute load balancing loss
        load_balance_loss = self._compute_load_balance_loss(gate_scores)
        return output, load_balance_loss

    def _compute_load_balance_loss(self, gate_scores):
        """
        Compute load balancing loss to encourage equal usage of experts.

        The "usage" of an expert is its mean gate probability over all
        tokens (a soft measure, not the hard top-k routing counts). The loss
        is the mean squared error between that vector and the uniform
        distribution, multiplied by ``self.load_balance_loss_coef``.

        Args:
            gate_scores: softmax gate probabilities, shape (N, num_experts)

        Returns:
            Scalar tensor with the scaled loss; zero when there are no tokens.
        """
        if gate_scores.size(0) == 0:
            # Mean over zero tokens would be NaN; return a graph-connected 0.
            return gate_scores.sum() * 0.0
        # Mean gate probability assigned to each expert
        expert_usage = gate_scores.mean(dim=0)  # (num_experts,)
        # Target is uniform distribution
        target_usage = torch.ones_like(expert_usage) / self.num_experts
        # L2 loss between actual and target usage
        load_balance_loss = F.mse_loss(expert_usage, target_usage)
        return self.load_balance_loss_coef * load_balance_loss
class ExpertFFN(nn.Module):
    """
    Expert Feed-Forward Network

    A two-layer MLP used as one expert of the MoE layer: project up to four
    times the embedding width, apply GELU, project back down, then dropout.

    Args:
        n_embd: embedding dimension (input and output width)
        dropout: dropout probability applied after the second projection
        bias: whether the two linear layers use a bias term
    """

    def __init__(self, n_embd, dropout=0.0, bias=True):
        super().__init__()
        hidden_dim = 4 * n_embd  # GPT-style 4x expansion
        self.fc1 = nn.Linear(n_embd, hidden_dim, bias=bias)
        self.fc2 = nn.Linear(hidden_dim, n_embd, bias=bias)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Return ``dropout(fc2(gelu(fc1(x))))``."""
        hidden = F.gelu(self.fc1(x))
        return self.dropout(self.fc2(hidden))
class StandardFFN(nn.Module):
    """
    Standard (dense) Feed-Forward Network for comparison with MoE.

    Applies ``fc1 -> GELU -> fc2 -> dropout`` to every token and returns the
    result together with a constant ``0.0`` in the load-balance-loss slot.

    Args:
        n_embd: embedding dimension (input and output width)
        dropout: dropout probability applied after the second projection
        bias: whether the two linear layers use a bias term
    """

    def __init__(self, n_embd, dropout=0.0, bias=True):
        super().__init__()
        inner_width = 4 * n_embd
        self.fc1 = nn.Linear(n_embd, inner_width, bias=bias)
        self.fc2 = nn.Linear(inner_width, n_embd, bias=bias)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Return ``(ffn_output, 0.0)``; the float stands in for an auxiliary loss."""
        activated = F.gelu(self.fc1(x))
        projected = self.dropout(self.fc2(activated))
        # A dense FFN has no routing, so the auxiliary loss is always 0.0
        return projected, 0.0