# gradient_clipping_experiment / extended_experiment_v2.py
# AmberLJC's picture
# Upload extended_experiment_v2.py with huggingface_hub
# ff872cb verified
"""
Extended Gradient Clipping Experiment V2: Testing Physics-of-AI Predictions
Key changes from V1:
1. More epochs (10 instead of 3) to allow rare class learning
2. Smaller learning rate (0.01) for more stable training
3. More frequent tracking to catch dynamics
4. Added loss tracking per class to understand learning dynamics
Predictions being tested:
- Prediction 2: Representation Collapse (effective dimensionality drops without clipping)
- Prediction 4: Rare Sample Learning (clipping improves rare class accuracy)
"""
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
import random
from typing import Dict, List, Tuple
SEED = 42


def set_seeds(seed=SEED):
    """Seed the PyTorch, NumPy and Python ``random`` generators alike.

    Args:
        seed: Value handed to each of the three generators; defaults to
            the module-level ``SEED``.
    """
    # Same order as always: torch first, then NumPy, then the stdlib.
    for seed_fn in (torch.manual_seed, np.random.seed, random.seed):
        seed_fn(seed)
class SimpleNextTokenModel(nn.Module):
    """Tiny next-token predictor: an embedding table feeding one linear layer."""

    def __init__(self, vocab_size=4, embedding_dim=16):
        super().__init__()
        # Creation order (embedding, then linear) determines the order in
        # which the RNG is consumed while parameters are initialised.
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.linear = nn.Linear(embedding_dim, vocab_size)

    def forward(self, x):
        """Look up the embeddings of token ids ``x`` and project them to logits."""
        return self.linear(self.embedding(x))

    def get_embeddings(self):
        """Return a copy of the embedding weight matrix (not tied to autograd)."""
        table = self.embedding.weight.data
        return table.clone()
def compute_effective_dimension(embedding_matrix: torch.Tensor) -> float:
    """Entropy-based effective dimensionality of the rows of a matrix.

    The eigenvalues of the row covariance are normalised into a
    distribution; the exponential of that distribution's entropy is
    returned as a Python float.
    """
    n_rows = embedding_matrix.shape[0]
    deviations = embedding_matrix - embedding_matrix.mean(dim=0, keepdim=True)
    covariance = torch.mm(deviations.T, deviations) / (n_rows - 1)
    # Floor the spectrum so the logarithm below never receives zero.
    spectrum = torch.linalg.eigvalsh(covariance).clamp(min=1e-10)
    weights = spectrum / spectrum.sum()
    spectral_entropy = -(weights * weights.log()).sum()
    return spectral_entropy.exp().item()
def compute_per_class_accuracy(model: nn.Module, inputs: torch.Tensor,
                               targets: torch.Tensor,
                               num_classes: int = 4) -> Dict[int, float]:
    """Compute accuracy for each target class.

    The model is evaluated in eval mode under ``torch.no_grad()`` and its
    previous train/eval mode is restored afterwards, so calling this in the
    middle of a training loop does not leave the model in eval mode.

    Args:
        model: Module whose output for ``inputs`` is treated as logits; the
            prediction is the ``argmax`` over dimension 1.
        inputs: Batch passed to ``model`` in a single forward call.
        targets: Class label of each sample, aligned with ``inputs``.
        num_classes: Class indices ``0 .. num_classes - 1`` are reported.

    Returns:
        Dict mapping each class index to the fraction of samples with that
        target that were predicted correctly, or ``None`` when no sample
        has that target.
    """
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            predictions = model(inputs).argmax(dim=1)
    finally:
        # Hand the model back in whatever mode the caller had it in.
        model.train(was_training)

    accuracies = {}
    for class_idx in range(num_classes):
        mask = targets == class_idx
        if mask.any():
            # Every masked target equals class_idx, so compare against it directly.
            accuracies[class_idx] = (predictions[mask] == class_idx).float().mean().item()
        else:
            accuracies[class_idx] = None
    return accuracies
def compute_per_class_loss(model: nn.Module, inputs: torch.Tensor,
                           targets: torch.Tensor, criterion: nn.Module,
                           num_classes: int = 4) -> Dict[int, float]:
    """Compute average loss for each target class.

    The model is evaluated in eval mode under ``torch.no_grad()`` and its
    previous train/eval mode is restored afterwards, so calling this in the
    middle of a training loop does not leave the model in eval mode.

    Args:
        model: Module whose output for ``inputs`` is passed to ``criterion``.
        inputs: Batch passed to ``model`` in a single forward call.
        targets: Class label of each sample, aligned with ``inputs``.
        criterion: Loss called as ``criterion(logits_subset, targets_subset)``;
            its result is converted with ``.item()``.
        num_classes: Class indices ``0 .. num_classes - 1`` are reported.

    Returns:
        Dict mapping each class index to the criterion value over the
        samples with that target, or ``None`` when no sample has that target.
    """
    was_training = model.training
    model.eval()
    losses = {}
    try:
        with torch.no_grad():
            logits = model(inputs)
            for class_idx in range(num_classes):
                mask = targets == class_idx
                if mask.any():
                    losses[class_idx] = criterion(logits[mask], targets[mask]).item()
                else:
                    losses[class_idx] = None
    finally:
        # Hand the model back in whatever mode the caller had it in.
        model.train(was_training)
    return losses
def create_imbalanced_dataset(n_samples=1000, n_rare=10, seed=SEED):
    """Build a seeded dataset whose targets are almost all class 0.

    Inputs are random integers in ``[0, 4)``. Every target is 0 except at
    ``n_rare`` positions drawn without replacement, which are set to 1.

    Returns:
        ``(inputs, targets, rare_indices)`` where ``rare_indices`` is the
        ascending list of positions whose target is 1.
    """
    set_seeds(seed)
    token_ids = torch.randint(0, 4, (n_samples,))
    rare_positions = sorted(random.sample(range(n_samples), n_rare))
    labels = torch.zeros(n_samples, dtype=torch.long)
    labels[rare_positions] = 1
    return token_ids, labels, rare_positions
def _format_metric(value) -> str:
    """Render an optional per-class metric for the epoch log ("N/A" when absent)."""
    return "N/A" if value is None else f"{value:.3f}"


def train_with_tracking(inputs: torch.Tensor, targets: torch.Tensor,
                        rare_indices: List[int], clip_grad: bool = False,
                        max_norm: float = 1.0, n_epochs: int = 10,
                        lr: float = 0.01, init_weights=None,
                        track_every: int = 50) -> Dict:
    """
    Extended training with comprehensive tracking.

    Trains a fresh ``SimpleNextTokenModel`` with plain SGD, one sample per
    optimizer step, visiting the samples in dataset order every epoch.

    Args:
        inputs: Token ids, one per sample.
        targets: Target class per sample, aligned with ``inputs``.
        rare_indices: Dataset positions whose per-step loss is additionally
            recorded under ``rare_sample_losses`` / ``rare_sample_steps``.
        clip_grad: When True, gradients are clipped to ``max_norm`` before
            every optimizer step.
        max_norm: Clipping threshold (only applied when ``clip_grad``).
        n_epochs: Number of passes over the dataset.
        lr: SGD learning rate.
        init_weights: Optional state dict loaded into the model before
            training; tensors are cloned so the caller's copy is untouched.
        track_every: Effective dimension and per-class accuracy/loss are
            recorded whenever ``step % track_every == 0``.

    Returns:
        Dict of tracked series. ``losses``, ``grad_norms`` (pre-clipping
        total gradient norm) and ``weight_norms`` have one entry per step.
        ``effective_dims``, ``class_accuracies`` and ``class_losses`` have
        one entry per tracked step (see ``effective_dim_steps`` and
        ``accuracy_steps``); classes with no samples are recorded as 0.0.
    """
    num_classes = 4

    set_seeds(SEED)
    model = SimpleNextTokenModel(vocab_size=num_classes, embedding_dim=16)
    # Truthiness check kept on purpose: None and an empty dict both mean
    # "keep the freshly initialised weights".
    if init_weights:
        model.load_state_dict({k: v.clone() for k, v in init_weights.items()})
    optimizer = optim.SGD(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    metrics = {
        'losses': [],
        'grad_norms': [],
        'weight_norms': [],
        'effective_dims': [],
        'effective_dim_steps': [],
        'class_accuracies': {cls_idx: [] for cls_idx in range(num_classes)},
        'class_losses': {cls_idx: [] for cls_idx in range(num_classes)},
        'accuracy_steps': [],
        'rare_sample_losses': [],  # Track loss specifically at rare samples
        'rare_sample_steps': [],
    }

    mode = "WITH" if clip_grad else "WITHOUT"
    print(f"\n{'='*60}")
    print(f"Training {mode} gradient clipping (max_norm={max_norm})")
    print(f"Learning rate: {lr}, Epochs: {n_epochs}")
    print(f"{'='*60}")

    # Set membership is checked once per step; build the set once.
    rare_positions = set(rare_indices)
    # clip_grad_norm_ returns the total norm measured before clipping, so a
    # single call both measures and (optionally) clips. An infinite
    # threshold leaves the gradients unchanged.
    clip_threshold = max_norm if clip_grad else float('inf')

    step = 0
    n_samples = len(inputs)
    for epoch in range(n_epochs):
        model.train()
        epoch_losses = []
        for i in range(n_samples):
            x = inputs[i:i+1]
            y = targets[i:i+1]

            optimizer.zero_grad()
            loss = criterion(model(x), y)
            loss.backward()
            grad_norm = torch.nn.utils.clip_grad_norm_(model.parameters(), clip_threshold)
            optimizer.step()

            loss_value = loss.item()
            metrics['losses'].append(loss_value)
            metrics['grad_norms'].append(grad_norm.item())
            total_norm = sum(p.data.norm(2).item() ** 2 for p in model.parameters()) ** 0.5
            metrics['weight_norms'].append(total_norm)
            epoch_losses.append(loss_value)

            # Track at rare sample positions
            if i in rare_positions:
                metrics['rare_sample_losses'].append(loss_value)
                metrics['rare_sample_steps'].append(step)

            # Track embedding stats and accuracy periodically. This runs
            # after optimizer.step(), so it reflects the updated weights.
            if step % track_every == 0:
                eff_dim = compute_effective_dimension(model.get_embeddings())
                metrics['effective_dims'].append(eff_dim)
                metrics['effective_dim_steps'].append(step)

                class_acc = compute_per_class_accuracy(model, inputs, targets)
                class_loss = compute_per_class_loss(model, inputs, targets, criterion)
                for cls_idx in range(num_classes):
                    acc = class_acc[cls_idx]
                    cls_loss = class_loss[cls_idx]
                    # Absent classes are stored as 0.0 so every series keeps
                    # the same length as 'accuracy_steps'.
                    metrics['class_accuracies'][cls_idx].append(acc if acc is not None else 0.0)
                    metrics['class_losses'][cls_idx].append(cls_loss if cls_loss is not None else 0.0)
                metrics['accuracy_steps'].append(step)

                # The evaluation helpers call model.eval(); make sure the
                # remaining steps of this epoch run in training mode.
                model.train()

            step += 1

        avg_loss = np.mean(epoch_losses)
        class_acc = compute_per_class_accuracy(model, inputs, targets)
        class_loss = compute_per_class_loss(model, inputs, targets, criterion)
        eff_dim = compute_effective_dimension(model.get_embeddings())
        # Either class may be absent from `targets`, so format both safely.
        print(f"Epoch {epoch+1:2d}/{n_epochs}: Loss={avg_loss:.4f} | "
              f"Acc A={_format_metric(class_acc[0])} B={_format_metric(class_acc[1])} | "
              f"Loss A={_format_metric(class_loss[0])} B={_format_metric(class_loss[1])} | "
              f"EffDim={eff_dim:.3f}")

    return metrics
def _plot_class_panel(ax, metrics_no_clip, metrics_with_clip, metric_key, class_idx,
                      title, ylabel=None, ylim=None, title_color=None):
    """Overlay one per-class series from both runs (red: no clip, green: clip)."""
    ax.plot(metrics_no_clip['accuracy_steps'], metrics_no_clip[metric_key][class_idx],
            'r-', linewidth=2, alpha=0.7, label='Without Clip')
    ax.plot(metrics_with_clip['accuracy_steps'], metrics_with_clip[metric_key][class_idx],
            'g-', linewidth=2, alpha=0.7, label='With Clip')
    if ylabel is not None:
        ax.set_ylabel(ylabel, fontsize=11)
    title_kwargs = {'fontsize': 12, 'fontweight': 'bold'}
    if title_color is not None:
        title_kwargs['color'] = title_color
    ax.set_title(title, **title_kwargs)
    ax.legend()
    ax.grid(True, alpha=0.3)
    if ylim is not None:
        ax.set_ylim(ylim)


def plot_comprehensive_analysis(metrics_no_clip: Dict, metrics_with_clip: Dict,
                                rare_indices: List[int], filename: str,
                                n_samples: int = 1000, lr: float = 0.01,
                                max_norm: float = 1.0):
    """Create comprehensive 8-panel analysis.

    Rows, top to bottom: effective dimension (one panel per run), per-class
    accuracy (class 0 / class 1), per-class loss (class 0 / class 1), and
    per-step gradient norms / weight norms.

    Args:
        metrics_no_clip: Metrics dict of the run without clipping, with the
            keys produced by ``train_with_tracking``.
        metrics_with_clip: Metrics dict of the run with clipping.
        rare_indices: Rare-sample positions; only their count is used, for
            the figure title.
        filename: Path the figure is saved to.
        n_samples: Samples per epoch; used to derive the epoch count shown
            in the title from the number of recorded steps.
        lr: Learning rate shown in the title.
        max_norm: Height of the "Clip threshold" reference line.
    """
    fig = plt.figure(figsize=(20, 16))
    gs = fig.add_gridspec(4, 2, hspace=0.35, wspace=0.25)
    n_epochs = len(metrics_no_clip['losses']) // n_samples
    n_rare = len(rare_indices)

    # Row 1: Effective Dimension (same fixed y-range on both panels)
    ax1 = fig.add_subplot(gs[0, 0])
    ax2 = fig.add_subplot(gs[0, 1])
    eff_dim_panels = (
        (ax1, metrics_no_clip, 'b-', 'Effective Dim - WITHOUT Clipping', 'red'),
        (ax2, metrics_with_clip, 'g-', 'Effective Dim - WITH Clipping', 'green'),
    )
    for ax, run_metrics, line_fmt, title, title_color in eff_dim_panels:
        ax.plot(run_metrics['effective_dim_steps'], run_metrics['effective_dims'],
                line_fmt, linewidth=2, marker='o', markersize=3)
        ax.set_title(title, fontsize=12, fontweight='bold', color=title_color)
        ax.grid(True, alpha=0.3)
        ax.set_ylim([2.0, 3.5])
    ax1.set_ylabel('Effective Dimension', fontsize=11)

    # Row 2: Class Accuracies
    _plot_class_panel(fig.add_subplot(gs[1, 0]), metrics_no_clip, metrics_with_clip,
                      'class_accuracies', 0, "Common Class 'A' Accuracy",
                      ylabel='Accuracy', ylim=[0, 1.05])
    _plot_class_panel(fig.add_subplot(gs[1, 1]), metrics_no_clip, metrics_with_clip,
                      'class_accuracies', 1, "Rare Class 'B' Accuracy [KEY PREDICTION]",
                      ylim=[0, 1.05], title_color='purple')

    # Row 3: Class Losses
    _plot_class_panel(fig.add_subplot(gs[2, 0]), metrics_no_clip, metrics_with_clip,
                      'class_losses', 0, "Common Class 'A' Loss", ylabel='Loss')
    _plot_class_panel(fig.add_subplot(gs[2, 1]), metrics_no_clip, metrics_with_clip,
                      'class_losses', 1, "Rare Class 'B' Loss")

    # Row 4: Gradient Norms and Weight Norms
    ax7 = fig.add_subplot(gs[3, 0])
    ax8 = fig.add_subplot(gs[3, 1])

    # Each series gets its own x-range so runs of different length still plot.
    grads_no = metrics_no_clip['grad_norms']
    grads_with = metrics_with_clip['grad_norms']
    ax7.plot(range(len(grads_no)), grads_no, 'r-', alpha=0.3, linewidth=0.5, label='Without Clip')
    ax7.plot(range(len(grads_with)), grads_with, 'g-', alpha=0.3, linewidth=0.5, label='With Clip')
    ax7.axhline(y=max_norm, color='black', linestyle='--', linewidth=2, label='Clip threshold')
    ax7.set_ylabel('Gradient Norm', fontsize=11)
    ax7.set_xlabel('Training Step', fontsize=11)
    ax7.set_title('Gradient Norms', fontsize=12, fontweight='bold')
    ax7.legend()
    ax7.grid(True, alpha=0.3)

    weights_no = metrics_no_clip['weight_norms']
    weights_with = metrics_with_clip['weight_norms']
    ax8.plot(range(len(weights_no)), weights_no, 'r-', alpha=0.7, linewidth=1, label='Without Clip')
    ax8.plot(range(len(weights_with)), weights_with, 'g-', alpha=0.7, linewidth=1, label='With Clip')
    ax8.set_xlabel('Training Step', fontsize=11)
    ax8.set_title('Weight Norms', fontsize=12, fontweight='bold')
    ax8.legend()
    ax8.grid(True, alpha=0.3)

    # Title values are derived from the arguments so they cannot drift from
    # the run that is actually being plotted.
    fig.suptitle('Extended Gradient Clipping Analysis: Testing Physics-of-AI Predictions\n'
                 f'({n_epochs} epochs, lr={lr}, {n_samples - n_rare} common / {n_rare} rare samples)',
                 fontsize=14, fontweight='bold', y=1.01)

    fig.savefig(filename, dpi=150, bbox_inches='tight')
    plt.close(fig)
    print(f"Comprehensive analysis saved to: {filename}")
def plot_rare_sample_dynamics(metrics_no_clip: Dict, metrics_with_clip: Dict,
                              filename: str, max_norm: float = 1.0):
    """Plot dynamics specifically at rare sample positions.

    Produces a 2x2 figure: loss at rare samples over time, a histogram of
    those losses, gradient norms at the rare-sample steps, and a text
    summary panel.

    Args:
        metrics_no_clip: Metrics dict of the run without clipping, with the
            keys produced by ``train_with_tracking``.
        metrics_with_clip: Metrics dict of the run with clipping.
        filename: Path the figure is saved to.
        max_norm: Height of the "Clip threshold" reference line.
    """
    fig, axes = plt.subplots(2, 2, figsize=(14, 10))

    rare_losses_no = metrics_no_clip['rare_sample_losses']
    rare_losses_with = metrics_with_clip['rare_sample_losses']
    mean_rare_loss_no = np.mean(rare_losses_no)
    mean_rare_loss_with = np.mean(rare_losses_with)

    # Rare sample losses over time
    ax1 = axes[0, 0]
    ax1.plot(metrics_no_clip['rare_sample_steps'], rare_losses_no,
             'ro-', alpha=0.7, markersize=3, linewidth=0.5, label='Without Clip')
    ax1.plot(metrics_with_clip['rare_sample_steps'], rare_losses_with,
             'go-', alpha=0.7, markersize=3, linewidth=0.5, label='With Clip')
    ax1.set_ylabel('Loss at Rare Sample', fontsize=11)
    ax1.set_title('Loss When Encountering Rare Samples', fontsize=12, fontweight='bold')
    ax1.legend()
    ax1.grid(True, alpha=0.3)

    # Histogram of rare sample losses
    ax2 = axes[0, 1]
    ax2.hist(rare_losses_no, bins=30, alpha=0.5, color='red',
             label=f"Without Clip (mean={mean_rare_loss_no:.3f})")
    ax2.hist(rare_losses_with, bins=30, alpha=0.5, color='green',
             label=f"With Clip (mean={mean_rare_loss_with:.3f})")
    ax2.set_xlabel('Loss', fontsize=11)
    ax2.set_ylabel('Count', fontsize=11)
    ax2.set_title('Distribution of Rare Sample Losses', fontsize=12, fontweight='bold')
    ax2.legend()
    ax2.grid(True, alpha=0.3)

    # Gradient norms at rare positions
    ax3 = axes[1, 0]
    # The training loop already recorded the step of every rare sample, and
    # 'grad_norms' has one entry per step, so look the norms up by step
    # instead of relying on a hard-coded index list and dataset size.
    grad_norms_no = metrics_no_clip['grad_norms']
    grad_norms_with = metrics_with_clip['grad_norms']
    n_tracked = min(len(grad_norms_no), len(grad_norms_with))
    rare_steps = [s for s in metrics_no_clip['rare_sample_steps'] if s < n_tracked]
    rare_grad_norms_no = [grad_norms_no[s] for s in rare_steps]
    rare_grad_norms_with = [grad_norms_with[s] for s in rare_steps]

    ax3.scatter(rare_steps, rare_grad_norms_no, c='red', alpha=0.6, s=20, label='Without Clip')
    ax3.scatter(rare_steps, rare_grad_norms_with, c='green', alpha=0.6, s=20, label='With Clip')
    ax3.axhline(y=max_norm, color='black', linestyle='--', linewidth=2, label='Clip threshold')
    ax3.set_xlabel('Training Step', fontsize=11)
    ax3.set_ylabel('Gradient Norm', fontsize=11)
    ax3.set_title('Gradient Norms at Rare Sample Positions', fontsize=12, fontweight='bold')
    ax3.legend()
    ax3.grid(True, alpha=0.3)

    # Summary statistics
    ax4 = axes[1, 1]
    ax4.axis('off')

    mean_rare_grad_no = np.mean(rare_grad_norms_no)
    mean_rare_grad_with = np.mean(rare_grad_norms_with)

    # Final class B accuracy
    final_acc_b_no = metrics_no_clip['class_accuracies'][1][-1] if metrics_no_clip['class_accuracies'][1] else 0
    final_acc_b_with = metrics_with_clip['class_accuracies'][1][-1] if metrics_with_clip['class_accuracies'][1] else 0

    # The template lines sit at column 0 because their leading whitespace
    # is part of the rendered text.
    summary_text = f"""
RARE SAMPLE DYNAMICS SUMMARY
════════════════════════════════════════════════════
At Rare Sample Positions:
─────────────────────────────────────────────────────
Mean Loss (WITHOUT Clipping): {mean_rare_loss_no:.4f}
Mean Loss (WITH Clipping): {mean_rare_loss_with:.4f}
Loss Reduction: {(mean_rare_loss_no - mean_rare_loss_with) / mean_rare_loss_no * 100:+.1f}%
Mean Gradient Norm (WITHOUT): {mean_rare_grad_no:.4f}
Mean Gradient Norm (WITH): {mean_rare_grad_with:.4f}
Gradient Reduction: {(mean_rare_grad_no - mean_rare_grad_with) / mean_rare_grad_no * 100:+.1f}%
Final Rare Class Accuracy:
─────────────────────────────────────────────────────
WITHOUT Clipping: {final_acc_b_no:.1%}
WITH Clipping: {final_acc_b_with:.1%}
════════════════════════════════════════════════════
PHYSICS-OF-AI INTERPRETATION:
Gradient clipping acts as a "velocity limiter" in
weight space, preventing the model from making
sudden large updates when encountering rare samples.
This allows the model to gradually learn the rare
class pattern rather than overshooting and forgetting.
"""
    ax4.text(0.05, 0.5, summary_text, transform=ax4.transAxes,
             fontsize=10, verticalalignment='center', fontfamily='monospace',
             bbox=dict(boxstyle='round', facecolor='lightyellow', alpha=0.9))

    fig.suptitle('Rare Sample Dynamics Analysis\n'
                 '(How the model behaves when encountering rare class B samples)',
                 fontsize=14, fontweight='bold', y=1.01)
    plt.tight_layout()
    fig.savefig(filename, dpi=150, bbox_inches='tight')
    plt.close(fig)
    print(f"Rare sample dynamics plot saved to: {filename}")
def main():
    """Run both training configurations, save the plots and print verdicts.

    Builds the imbalanced dataset, trains once without and once with
    gradient clipping from identical initial weights, writes
    ``extended_analysis_v2.png`` and ``rare_sample_dynamics.png``, and
    prints a summary for Predictions 2 and 4.

    Returns:
        Dict with keys ``metrics_no_clip``, ``metrics_with_clip`` and
        ``rare_indices``.
    """
    print("="*70)
    print("EXTENDED GRADIENT CLIPPING EXPERIMENT V2")
    print("Testing Physics-of-AI Predictions with Extended Training")
    print("="*70)

    # Create dataset
    inputs, targets, rare_indices = create_imbalanced_dataset(n_samples=1000, n_rare=10, seed=SEED)
    print(f"\nDataset: {len(inputs)} samples ({(targets == 0).sum().item()} common, {(targets == 1).sum().item()} rare)")
    print(f"Rare indices: {rare_indices}")

    # Get initial weights (shared by both runs so they start identically)
    set_seeds(SEED)
    init_model = SimpleNextTokenModel(vocab_size=4, embedding_dim=16)
    init_weights = {name: param.clone() for name, param in init_model.state_dict().items()}
    init_eff_dim = compute_effective_dimension(init_model.get_embeddings())
    print(f"Initial effective dimension: {init_eff_dim:.3f}")

    # Training parameters
    n_epochs = 10
    lr = 0.01

    # Run training WITHOUT gradient clipping
    metrics_no_clip = train_with_tracking(
        inputs, targets, rare_indices,
        clip_grad=False, n_epochs=n_epochs, lr=lr,
        init_weights=init_weights, track_every=100
    )

    # Run training WITH gradient clipping
    metrics_with_clip = train_with_tracking(
        inputs, targets, rare_indices,
        clip_grad=True, max_norm=1.0, n_epochs=n_epochs, lr=lr,
        init_weights=init_weights, track_every=100
    )

    # Generate plots
    print("\n" + "="*70)
    print("GENERATING ANALYSIS PLOTS")
    print("="*70)
    plot_comprehensive_analysis(
        metrics_no_clip, metrics_with_clip, rare_indices,
        "extended_analysis_v2.png",
        n_samples=len(inputs),  # keep the plot in sync with the dataset size
    )
    plot_rare_sample_dynamics(
        metrics_no_clip, metrics_with_clip,
        "rare_sample_dynamics.png"
    )

    # Final summary
    print("\n" + "="*70)
    print("FINAL PREDICTION TEST RESULTS")
    print("="*70)

    # Prediction 2: judged by comparing the spread (standard deviation) of
    # the tracked effective dimension between the two runs.
    std_no = np.std(metrics_no_clip['effective_dims'])
    std_with = np.std(metrics_with_clip['effective_dims'])
    print("\n[PREDICTION 2] Representation Collapse:")
    # The printed value is a standard deviation, so label it as one.
    print(f" Effective Dim Std Dev (WITHOUT): {std_no:.6f}")
    print(f" Effective Dim Std Dev (WITH): {std_with:.6f}")
    print(f" Verdict: {'SUPPORTED' if std_no > std_with else 'NOT SUPPORTED'}")

    # Prediction 4
    final_acc_b_no = metrics_no_clip['class_accuracies'][1][-1]
    final_acc_b_with = metrics_with_clip['class_accuracies'][1][-1]
    print("\n[PREDICTION 4] Rare Sample Learning:")
    print(f" Final Rare Class Accuracy (WITHOUT): {final_acc_b_no:.1%}")
    print(f" Final Rare Class Accuracy (WITH): {final_acc_b_with:.1%}")
    # The prediction is that clipping *improves* rare-class accuracy, so a
    # tie (e.g. both runs at 0%) must not count as support.
    print(f" Verdict: {'SUPPORTED' if final_acc_b_with > final_acc_b_no else 'NOT SUPPORTED'}")

    return {
        'metrics_no_clip': metrics_no_clip,
        'metrics_with_clip': metrics_with_clip,
        'rare_indices': rare_indices,
    }
if __name__ == "__main__":
    # Script entry point: run the experiment, then print the closing banner.
    results = main()
    print(f"\n{'=' * 70}\nEXPERIMENT COMPLETE!\n{'=' * 70}")