resmlp_comparison / experiment_v2.py
AmberLJC's picture
Upload experiment_v2.py with huggingface_hub
7dbd1cf verified
"""
PlainMLP vs ResMLP Comparison on Distant Identity Task (V2)
This experiment demonstrates the vanishing gradient problem in deep networks
and how residual connections solve it.
Key insight: The identity task Y=X is trivially solvable by a residual network
if it can learn to zero out the residual branch, but a plain network must
learn a complex composition of transformations.
V2 Changes:
- Use proper residual scaling (1/sqrt(num_layers)) to prevent explosion
- Better initialization for residual blocks
"""
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
from typing import Dict, List, Tuple
import json
# Set random seeds for reproducibility
torch.manual_seed(42)
np.random.seed(42)
# Configuration
NUM_LAYERS = 20
HIDDEN_DIM = 64
NUM_SAMPLES = 1024
TRAINING_STEPS = 500
LEARNING_RATE = 1e-3
BATCH_SIZE = 64
print(f"[Config] Layers: {NUM_LAYERS}, Hidden Dim: {HIDDEN_DIM}")
print(f"[Config] Samples: {NUM_SAMPLES}, Steps: {TRAINING_STEPS}, LR: {LEARNING_RATE}")
class PlainMLP(nn.Module):
"""Plain MLP: x = ReLU(Linear(x)) for each layer
This architecture suffers from vanishing gradients in deep networks because:
1. Each ReLU zeros out negative values, losing information
2. Gradients must flow through all layers multiplicatively
3. The network must learn a complex function composition to approximate identity
"""
def __init__(self, dim: int, num_layers: int):
super().__init__()
self.layers = nn.ModuleList()
for _ in range(num_layers):
layer = nn.Linear(dim, dim)
# Kaiming He initialization
nn.init.kaiming_normal_(layer.weight, mode='fan_in', nonlinearity='relu')
nn.init.zeros_(layer.bias)
self.layers.append(layer)
self.activation = nn.ReLU()
def forward(self, x: torch.Tensor) -> torch.Tensor:
for layer in self.layers:
x = self.activation(layer(x))
return x
class ResMLP(nn.Module):
"""Residual MLP: x = x + scale * ReLU(Linear(x)) for each layer
Key advantages for identity learning:
1. Identity shortcut allows gradients to flow directly to early layers
2. Network only needs to learn the residual (deviation from identity)
3. For identity task, optimal solution is to zero the residual branch
Uses scaling factor 1/sqrt(num_layers) to prevent activation explosion.
"""
def __init__(self, dim: int, num_layers: int):
super().__init__()
self.layers = nn.ModuleList()
self.scale = 1.0 / np.sqrt(num_layers) # Scaling to prevent explosion
for _ in range(num_layers):
layer = nn.Linear(dim, dim)
# Kaiming He initialization
nn.init.kaiming_normal_(layer.weight, mode='fan_in', nonlinearity='relu')
nn.init.zeros_(layer.bias)
self.layers.append(layer)
self.activation = nn.ReLU()
def forward(self, x: torch.Tensor) -> torch.Tensor:
for layer in self.layers:
x = x + self.scale * self.activation(layer(x)) # Scaled residual
return x
def generate_identity_data(num_samples: int, dim: int) -> Tuple[torch.Tensor, torch.Tensor]:
"""Generate synthetic data where Y = X, with X ~ U(-1, 1)
This is the "Distant Identity" task - the network must learn to output
exactly what it received as input, which is trivial for a single layer
but challenging for deep networks without skip connections.
"""
X = torch.empty(num_samples, dim).uniform_(-1, 1)
Y = X.clone() # Identity task: target equals input
return X, Y
def train_model(model: nn.Module, X: torch.Tensor, Y: torch.Tensor,
steps: int, lr: float, batch_size: int) -> List[float]:
"""Train model and record loss at each step"""
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
criterion = nn.MSELoss()
losses = []
num_samples = X.shape[0]
for step in range(steps):
# Random batch sampling
indices = torch.randint(0, num_samples, (batch_size,))
batch_x = X[indices]
batch_y = Y[indices]
# Forward pass
optimizer.zero_grad()
output = model(batch_x)
loss = criterion(output, batch_y)
# Backward pass
loss.backward()
optimizer.step()
losses.append(loss.item())
if step % 100 == 0:
print(f" Step {step}/{steps}, Loss: {loss.item():.6f}")
return losses
class ActivationGradientHook:
"""Hook to capture activations and gradients at each layer"""
def __init__(self):
self.activations: List[torch.Tensor] = []
self.gradients: List[torch.Tensor] = []
self.handles = []
def register_hooks(self, model: nn.Module):
"""Register forward and backward hooks on each layer"""
for layer in model.layers:
# Forward hook to capture activations (output of linear layer)
handle_fwd = layer.register_forward_hook(self._forward_hook)
# Backward hook to capture gradients
handle_bwd = layer.register_full_backward_hook(self._backward_hook)
self.handles.extend([handle_fwd, handle_bwd])
def _forward_hook(self, module, input, output):
self.activations.append(output.detach().clone())
def _backward_hook(self, module, grad_input, grad_output):
# grad_output[0] is the gradient w.r.t. the layer's output
self.gradients.append(grad_output[0].detach().clone())
def clear(self):
self.activations = []
self.gradients = []
def remove_hooks(self):
for handle in self.handles:
handle.remove()
self.handles = []
def get_activation_stats(self) -> Tuple[List[float], List[float]]:
"""Get mean and std of activations for each layer"""
means = [act.mean().item() for act in self.activations]
stds = [act.std().item() for act in self.activations]
return means, stds
def get_gradient_norms(self) -> List[float]:
"""Get L2 norm of gradients for each layer"""
# Gradients are captured in reverse order (from output to input)
norms = [grad.norm(2).item() for grad in reversed(self.gradients)]
return norms
def analyze_final_state(model: nn.Module, dim: int, batch_size: int = 64) -> Dict:
"""Perform forward/backward pass and capture activation/gradient stats"""
hook = ActivationGradientHook()
hook.register_hooks(model)
# Generate new random batch
X_test = torch.empty(batch_size, dim).uniform_(-1, 1)
Y_test = X_test.clone()
# Forward pass
model.zero_grad()
output = model(X_test)
loss = nn.MSELoss()(output, Y_test)
# Backward pass
loss.backward()
# Get statistics
act_means, act_stds = hook.get_activation_stats()
grad_norms = hook.get_gradient_norms()
hook.remove_hooks()
return {
'activation_means': act_means,
'activation_stds': act_stds,
'gradient_norms': grad_norms,
'final_loss': loss.item()
}
def plot_training_loss(plain_losses: List[float], res_losses: List[float], save_path: str):
"""Plot training loss curves for both models"""
plt.figure(figsize=(10, 6))
steps = range(len(plain_losses))
plt.plot(steps, plain_losses, label='PlainMLP', color='#e74c3c', alpha=0.8, linewidth=2)
plt.plot(steps, res_losses, label='ResMLP', color='#3498db', alpha=0.8, linewidth=2)
plt.xlabel('Training Steps', fontsize=12)
plt.ylabel('MSE Loss', fontsize=12)
plt.title('Training Loss: PlainMLP vs ResMLP on Identity Task', fontsize=14)
plt.legend(fontsize=11)
plt.grid(True, alpha=0.3)
plt.yscale('log') # Log scale to see differences better
# Add annotation about final losses
final_plain = plain_losses[-1]
final_res = res_losses[-1]
plt.annotate(f'PlainMLP final: {final_plain:.4f}',
xy=(len(plain_losses)-1, final_plain),
xytext=(len(plain_losses)*0.7, final_plain*2),
fontsize=10, color='#e74c3c',
arrowprops=dict(arrowstyle='->', color='#e74c3c', alpha=0.7))
plt.annotate(f'ResMLP final: {final_res:.6f}',
xy=(len(res_losses)-1, final_res),
xytext=(len(res_losses)*0.7, final_res*0.1),
fontsize=10, color='#3498db',
arrowprops=dict(arrowstyle='->', color='#3498db', alpha=0.7))
plt.tight_layout()
plt.savefig(save_path, dpi=150, bbox_inches='tight')
plt.close()
print(f"[Plot] Saved training loss plot to {save_path}")
def plot_gradient_magnitudes(plain_grads: List[float], res_grads: List[float], save_path: str):
"""Plot gradient magnitude vs layer depth"""
plt.figure(figsize=(10, 6))
layers = range(1, len(plain_grads) + 1)
plt.plot(layers, plain_grads, 'o-', label='PlainMLP', color='#e74c3c',
markersize=8, linewidth=2, markeredgecolor='white', markeredgewidth=1)
plt.plot(layers, res_grads, 's-', label='ResMLP', color='#3498db',
markersize=8, linewidth=2, markeredgecolor='white', markeredgewidth=1)
plt.xlabel('Layer Depth', fontsize=12)
plt.ylabel('Gradient L2 Norm', fontsize=12)
plt.title('Gradient Magnitude vs Layer Depth (After Training)', fontsize=14)
plt.legend(fontsize=11)
plt.grid(True, alpha=0.3)
plt.yscale('log')
# Add shaded region to highlight gradient difference
plt.fill_between(layers, plain_grads, res_grads, alpha=0.2, color='gray')
plt.tight_layout()
plt.savefig(save_path, dpi=150, bbox_inches='tight')
plt.close()
print(f"[Plot] Saved gradient magnitude plot to {save_path}")
def plot_activation_means(plain_means: List[float], res_means: List[float], save_path: str):
"""Plot activation mean vs layer depth"""
plt.figure(figsize=(10, 6))
layers = range(1, len(plain_means) + 1)
plt.plot(layers, plain_means, 'o-', label='PlainMLP', color='#e74c3c',
markersize=8, linewidth=2, markeredgecolor='white', markeredgewidth=1)
plt.plot(layers, res_means, 's-', label='ResMLP', color='#3498db',
markersize=8, linewidth=2, markeredgecolor='white', markeredgewidth=1)
plt.axhline(y=0, color='gray', linestyle='--', alpha=0.5, label='Zero baseline')
plt.xlabel('Layer Depth', fontsize=12)
plt.ylabel('Activation Mean', fontsize=12)
plt.title('Activation Mean vs Layer Depth (After Training)', fontsize=14)
plt.legend(fontsize=11)
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(save_path, dpi=150, bbox_inches='tight')
plt.close()
print(f"[Plot] Saved activation mean plot to {save_path}")
def plot_activation_stds(plain_stds: List[float], res_stds: List[float], save_path: str):
"""Plot activation std vs layer depth"""
plt.figure(figsize=(10, 6))
layers = range(1, len(plain_stds) + 1)
plt.plot(layers, plain_stds, 'o-', label='PlainMLP', color='#e74c3c',
markersize=8, linewidth=2, markeredgecolor='white', markeredgewidth=1)
plt.plot(layers, res_stds, 's-', label='ResMLP', color='#3498db',
markersize=8, linewidth=2, markeredgecolor='white', markeredgewidth=1)
plt.xlabel('Layer Depth', fontsize=12)
plt.ylabel('Activation Std', fontsize=12)
plt.title('Activation Standard Deviation vs Layer Depth (After Training)', fontsize=14)
plt.legend(fontsize=11)
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(save_path, dpi=150, bbox_inches='tight')
plt.close()
print(f"[Plot] Saved activation std plot to {save_path}")
def main():
print("=" * 60)
print("PlainMLP vs ResMLP: Distant Identity Task Experiment (V2)")
print("=" * 60)
# Generate synthetic data
print("\n[1] Generating synthetic identity data...")
X, Y = generate_identity_data(NUM_SAMPLES, HIDDEN_DIM)
print(f" Data shape: X={X.shape}, Y={Y.shape}")
print(f" X range: [{X.min():.3f}, {X.max():.3f}]")
# Initialize models
print("\n[2] Initializing models...")
plain_mlp = PlainMLP(HIDDEN_DIM, NUM_LAYERS)
res_mlp = ResMLP(HIDDEN_DIM, NUM_LAYERS)
plain_params = sum(p.numel() for p in plain_mlp.parameters())
res_params = sum(p.numel() for p in res_mlp.parameters())
print(f" PlainMLP parameters: {plain_params:,}")
print(f" ResMLP parameters: {res_params:,}")
print(f" ResMLP residual scale: {res_mlp.scale:.4f}")
# Train PlainMLP
print("\n[3] Training PlainMLP...")
plain_losses = train_model(plain_mlp, X, Y, TRAINING_STEPS, LEARNING_RATE, BATCH_SIZE)
print(f" Final loss: {plain_losses[-1]:.6f}")
# Train ResMLP
print("\n[4] Training ResMLP...")
res_losses = train_model(res_mlp, X, Y, TRAINING_STEPS, LEARNING_RATE, BATCH_SIZE)
print(f" Final loss: {res_losses[-1]:.6f}")
# Final state analysis
print("\n[5] Analyzing final state of trained models...")
print(" Analyzing PlainMLP...")
plain_stats = analyze_final_state(plain_mlp, HIDDEN_DIM)
print(" Analyzing ResMLP...")
res_stats = analyze_final_state(res_mlp, HIDDEN_DIM)
# Print analysis summary
print("\n[6] Analysis Summary:")
print(f" PlainMLP - Final Loss: {plain_stats['final_loss']:.6f}")
print(f" ResMLP - Final Loss: {res_stats['final_loss']:.6f}")
print(f" Loss Improvement: {plain_stats['final_loss'] / res_stats['final_loss']:.1f}x")
print(f"\n PlainMLP - Gradient norm range: [{min(plain_stats['gradient_norms']):.2e}, {max(plain_stats['gradient_norms']):.2e}]")
print(f" ResMLP - Gradient norm range: [{min(res_stats['gradient_norms']):.2e}, {max(res_stats['gradient_norms']):.2e}]")
print(f"\n PlainMLP - Activation std range: [{min(plain_stats['activation_stds']):.4f}, {max(plain_stats['activation_stds']):.4f}]")
print(f" ResMLP - Activation std range: [{min(res_stats['activation_stds']):.4f}, {max(res_stats['activation_stds']):.4f}]")
# Generate plots
print("\n[7] Generating plots...")
plot_training_loss(plain_losses, res_losses, 'plots/training_loss.png')
plot_gradient_magnitudes(plain_stats['gradient_norms'], res_stats['gradient_norms'],
'plots/gradient_magnitude.png')
plot_activation_means(plain_stats['activation_means'], res_stats['activation_means'],
'plots/activation_mean.png')
plot_activation_stds(plain_stats['activation_stds'], res_stats['activation_stds'],
'plots/activation_std.png')
# Save results to JSON for report
results = {
'config': {
'num_layers': NUM_LAYERS,
'hidden_dim': HIDDEN_DIM,
'num_samples': NUM_SAMPLES,
'training_steps': TRAINING_STEPS,
'learning_rate': LEARNING_RATE,
'batch_size': BATCH_SIZE,
'residual_scale': float(res_mlp.scale)
},
'plain_mlp': {
'final_loss': plain_losses[-1],
'initial_loss': plain_losses[0],
'loss_history': plain_losses,
'gradient_norms': plain_stats['gradient_norms'],
'activation_means': plain_stats['activation_means'],
'activation_stds': plain_stats['activation_stds']
},
'res_mlp': {
'final_loss': res_losses[-1],
'initial_loss': res_losses[0],
'loss_history': res_losses,
'gradient_norms': res_stats['gradient_norms'],
'activation_means': res_stats['activation_means'],
'activation_stds': res_stats['activation_stds']
}
}
with open('results.json', 'w') as f:
json.dump(results, f, indent=2)
print("\n[8] Results saved to results.json")
print("\n" + "=" * 60)
print("Experiment completed successfully!")
print("=" * 60)
return results
if __name__ == "__main__":
results = main()