# RippleGPT-Nano/validation/benchmarks/comparative_benchmark.py
# (uploaded via huggingface_hub by Tavernari, revision 148b631 verified)
"""
comparative_benchmark.py - Main benchmark script for RippleGPT vs Baseline comparison.
This script runs standardized benchmarks comparing:
1. RippleGPT (ALiBi + SwiGLU)
2. VanillaGPT2 (Absolute Pos Emb + GELU MLP)
Metrics collected:
- Parameter count (iso-parameter verification)
- Training loss convergence
- Validation perplexity
- Training speed (samples/sec)
- Memory usage (peak)
- Extrapolation capability (RippleGPT only)
Usage:
python comparative_benchmark.py --dataset tinystories --size small
python comparative_benchmark.py --dataset python --size medium
"""
import argparse
import json
import os
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import gc
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
# Add parent paths
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from src.config import RippleConfig
from src.model import RippleGPT
from validation.benchmarks.baseline_gpt2 import VanillaGPT2, GPT2Config
from validation.benchmarks.data_loaders import (
TinyStoriesDataset,
PythonCodeDataset,
BenchmarkDataConfig,
create_dataloader
)
# ============================================================================
# BENCHMARK CONFIGURATIONS
# ============================================================================
# Architecture presets shared by BOTH models, so RippleGPT and VanillaGPT2
# are compared at (near-)identical parameter counts.
MODEL_SIZES = {
    "small": {
        "n_layer": 6,       # transformer blocks
        "n_head": 6,        # attention heads per block
        "n_embd": 384,      # embedding / hidden width
        "block_size": 256,  # training context length (tokens)
        "dropout": 0.1
    },
    "medium": {
        "n_layer": 8,
        "n_head": 8,
        "n_embd": 512,
        "block_size": 512,
        "dropout": 0.1
    },
    "large": {
        "n_layer": 12,
        "n_head": 12,
        "n_embd": 768,
        "block_size": 1024,
        "dropout": 0.1
    }
}
# Per-dataset sample budgets, keyed first by dataset then by run size.
# Larger runs see more samples; the split is always "train" here.
DATASET_CONFIGS = {
    "tinystories": {
        "small": {"split": "train", "max_samples": 2000},
        "medium": {"split": "train", "max_samples": 10000},
        "large": {"split": "train", "max_samples": 50000}
    },
    "python": {
        "small": {"split": "train", "max_samples": 1000},
        "medium": {"split": "train", "max_samples": 5000},
        "large": {"split": "train", "max_samples": 25000}
    }
}
# Training hyperparameters (same for both models for fair comparison)
# Note: batch size shrinks and iteration count grows with model size so each
# configuration fits in memory while still converging meaningfully.
TRAINING_CONFIG = {
    "small": {
        "batch_size": 32,
        "learning_rate": 1e-3,
        "max_iters": 500,
        "eval_interval": 50,    # iterations between loss log points
        "eval_samples": 100     # validation batches for perplexity eval
    },
    "medium": {
        "batch_size": 16,
        "learning_rate": 6e-4,
        "max_iters": 1000,
        "eval_interval": 100,
        "eval_samples": 200
    },
    "large": {
        "batch_size": 8,
        "learning_rate": 3e-4,
        "max_iters": 2000,
        "eval_interval": 200,
        "eval_samples": 300
    }
}
# ============================================================================
# UTILITY FUNCTIONS
# ============================================================================
def get_device() -> torch.device:
    """Pick the best available compute device: CUDA, then MPS, then CPU."""
    if torch.cuda.is_available():
        return torch.device("cuda")
    mps_backend = getattr(torch.backends, "mps", None)
    if mps_backend is not None and mps_backend.is_available():
        return torch.device("mps")
    return torch.device("cpu")
def get_memory_usage() -> float:
    """Return memory usage in MB, best-effort per device type.

    CUDA: peak allocated bytes since the last ``reset_peak_memory_stats()``.
    MPS:  current process RSS (no peak counter exists), via ``psutil``.
    CPU / psutil unavailable: 0.0, so callers never crash on the metric.
    """
    device = get_device()
    if device.type == "cuda":
        return torch.cuda.max_memory_allocated() / 1024 / 1024
    elif device.type == "mps":
        # MPS doesn't have direct memory tracking, estimate from system.
        # psutil is a third-party optional dependency; report "unknown"
        # (0.0) instead of crashing the whole benchmark if it is missing.
        try:
            import psutil
        except ImportError:
            return 0.0
        return psutil.Process().memory_info().rss / 1024 / 1024
    return 0.0
def reset_memory():
    """Collect garbage, drop cached allocations, and zero peak-memory stats."""
    gc.collect()
    dev_type = get_device().type
    if dev_type == "cuda":
        # Zero the peak counter so get_memory_usage() measures this run only.
        torch.cuda.reset_peak_memory_stats()
        torch.cuda.empty_cache()
    elif dev_type == "mps":
        torch.mps.empty_cache()
# ============================================================================
# MODEL CREATION
# ============================================================================
def create_ripple_model(size: str, vocab_size: int = 50257) -> RippleGPT:
    """Build a RippleGPT (ALiBi + SwiGLU) sized from MODEL_SIZES[size]."""
    dims = MODEL_SIZES[size]
    ripple_cfg = RippleConfig(
        vocab_size=vocab_size,
        n_layer=dims["n_layer"],
        n_head=dims["n_head"],
        n_embd=dims["n_embd"],
        block_size=dims["block_size"],
        dropout=dims["dropout"],
        # KEY: ALiBi supplies positions, so absolute embeddings stay off.
        use_absolute_pos_emb=False,
    )
    return RippleGPT(ripple_cfg)
def create_baseline_model(size: str, vocab_size: int = 50257) -> VanillaGPT2:
    """Build the VanillaGPT2 baseline sized from MODEL_SIZES[size]."""
    dims = MODEL_SIZES[size]
    baseline_cfg = GPT2Config(
        vocab_size=vocab_size,
        n_layer=dims["n_layer"],
        n_head=dims["n_head"],
        n_embd=dims["n_embd"],
        block_size=dims["block_size"],
        dropout=dims["dropout"],
    )
    return VanillaGPT2(baseline_cfg)
# ============================================================================
# TRAINING LOOP
# ============================================================================
def train_model(
    model: nn.Module,
    dataloader,
    config: dict,
    model_name: str,
    device: torch.device
) -> Dict:
    """
    Train a model and collect metrics.

    Args:
        model: Module whose ``forward(x, y)`` returns ``(logits, loss)``.
        dataloader: Iterable of ``(x, y)`` batches; re-iterated when exhausted.
        config: Needs "learning_rate", "max_iters", "batch_size", "eval_interval".
        model_name: Label used only in log output.
        device: Device the model and batches are moved to.

    Returns dict with:
    - train_losses: List of (iter, loss) tuples
    - final_loss: Last training loss
    - samples_per_sec: Training throughput
    - peak_memory_mb: Peak memory usage
    - total_time_sec: Total training time
    """
    model = model.to(device)
    optimizer = torch.optim.AdamW(model.parameters(), lr=config["learning_rate"])
    # Cosine annealing scheduler — identical schedule for both models keeps
    # the comparison fair.
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
        optimizer,
        T_max=config["max_iters"]
    )
    train_losses = []
    total_samples = 0
    start_time = time.time()
    reset_memory()  # zero peak-memory counters before measuring this run
    print(f"\n🏋️ Training {model_name}...")
    print(f" Max iterations: {config['max_iters']}")
    print(f" Batch size: {config['batch_size']}")
    print(f" Learning rate: {config['learning_rate']}")
    model.train()
    data_iter = iter(dataloader)
    for iteration in range(config["max_iters"]):
        # Get batch; restart the iterator when the dataloader runs dry.
        try:
            x, y = next(data_iter)
        except StopIteration:
            data_iter = iter(dataloader)
            x, y = next(data_iter)
        x, y = x.to(device), y.to(device)
        # Forward + backward
        optimizer.zero_grad()
        _, loss = model(x, y)
        loss.backward()
        # Gradient clipping guards both models against loss spikes equally.
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        scheduler.step()
        total_samples += x.size(0)
        # Log progress at each eval interval and on the final iteration.
        if iteration % config["eval_interval"] == 0 or iteration == config["max_iters"] - 1:
            train_losses.append((iteration, loss.item()))
            elapsed = time.time() - start_time
            samples_sec = total_samples / elapsed if elapsed > 0 else 0
            print(f" [{iteration:5d}/{config['max_iters']}] "
                  f"loss: {loss.item():.4f} | "
                  f"lr: {scheduler.get_last_lr()[0]:.2e} | "
                  f"{samples_sec:.1f} samples/sec")
    elapsed_time = time.time() - start_time
    peak_memory = get_memory_usage()
    return {
        "train_losses": train_losses,
        "final_loss": train_losses[-1][1] if train_losses else float('inf'),
        # Guard the division: max_iters == 0 (or sub-resolution timing)
        # would otherwise raise ZeroDivisionError.
        "samples_per_sec": total_samples / elapsed_time if elapsed_time > 0 else 0.0,
        "peak_memory_mb": peak_memory,
        "total_time_sec": elapsed_time
    }
# ============================================================================
# EVALUATION
# ============================================================================
@torch.no_grad()
def evaluate_perplexity(
    model: nn.Module,
    dataloader,
    num_samples: int,
    device: torch.device
) -> float:
    """Average the loss over up to ``num_samples`` batches and exponentiate.

    Returns ``inf`` when the dataloader yields no batches at all.
    """
    model.eval()
    batch_losses = []
    batches = iter(dataloader)
    for _ in range(num_samples):
        try:
            x, y = next(batches)
        except StopIteration:
            break
        _, loss = model(x.to(device), y.to(device))
        batch_losses.append(loss.item())
    if not batch_losses:
        return float('inf')
    mean_loss = sum(batch_losses) / len(batch_losses)
    # Perplexity is exp(mean cross-entropy).
    return torch.exp(torch.tensor(mean_loss)).item()
@torch.no_grad()
def test_extrapolation(
    model: nn.Module,
    base_data,
    train_block_size: int,
    test_sizes: List[int],
    device: torch.device,
    model_name: str
) -> Dict[int, float]:
    """
    Test model on sequences longer than training length.
    Only meaningful for RippleGPT (VanillaGPT2 will fail/clip).
    Returns dict mapping context_size -> perplexity.

    NOTE(review): ``base_data`` is currently unused — the extrapolation set is
    always drawn from TinyStories regardless of the benchmark dataset.
    """
    model.eval()
    results: Dict[int, float] = {}
    print(f"\n📏 Testing extrapolation for {model_name}...")
    # Only context sizes strictly beyond the training window are informative.
    for target_len in (s for s in test_sizes if s > train_block_size):
        try:
            if not isinstance(model, RippleGPT):
                # Absolute position embeddings cap the context at block_size,
                # so the baseline cannot extrapolate at all.
                results[target_len] = float('inf')
                print(f" {target_len} tokens: ❌ Cannot extrapolate (VanillaGPT2)")
                continue
            # ALiBi lets RippleGPT run on longer sequences than it trained on.
            eval_ds = TinyStoriesDataset(
                split="validation",
                block_size=target_len,
                max_samples=50
            )
            loss_sum = 0.0
            seen = 0
            for x, y in create_dataloader(eval_ds, batch_size=4):
                if seen >= 20:  # cap evaluation cost per context size
                    break
                _, loss = model(x.to(device), y.to(device))
                loss_sum += loss.item()
                seen += 1
            if seen > 0:
                ppl = torch.exp(torch.tensor(loss_sum / seen)).item()
                results[target_len] = ppl
                scale = target_len / train_block_size
                print(f" {target_len} tokens ({scale:.1f}x train): PPL = {ppl:.2f}")
        except Exception as e:
            # An OOM or dataset failure at one size shouldn't abort the rest.
            print(f" {target_len} tokens: ❌ Error: {e}")
            results[target_len] = float('inf')
    return results
# ============================================================================
# MAIN BENCHMARK
# ============================================================================
def run_benchmark(
    dataset_name: str,
    size: str,
    output_dir: Optional[str] = None
) -> Dict:
    """
    Run complete benchmark comparing RippleGPT vs VanillaGPT2.

    Args:
        dataset_name: "tinystories" or "python" (key into DATASET_CONFIGS).
        size: "small" | "medium" | "large" (key into MODEL_SIZES / TRAINING_CONFIG).
        output_dir: If given, results are also saved there as timestamped JSON.

    Returns comprehensive results dict.
    """
    device = get_device()
    print(f"\n{'='*70}")
    print(f"🚀 RippleGPT COMPARATIVE BENCHMARK")
    print(f"{'='*70}")
    print(f"Dataset: {dataset_name}")
    print(f"Size: {size}")
    print(f"Device: {device}")
    print(f"{'='*70}")
    # Load dataset configuration
    model_cfg = MODEL_SIZES[size]
    data_cfg = DATASET_CONFIGS[dataset_name][size]
    train_cfg = TRAINING_CONFIG[size]
    # Create dataset
    print("\n📚 Loading dataset...")
    if dataset_name == "tinystories":
        train_ds = TinyStoriesDataset(
            split=data_cfg["split"],
            block_size=model_cfg["block_size"],
            max_samples=data_cfg["max_samples"]
        )
    else:  # python
        train_ds = PythonCodeDataset(
            split=data_cfg["split"],
            block_size=model_cfg["block_size"],
            max_samples=data_cfg["max_samples"]
        )
    vocab_size = train_ds.vocab_size
    train_dl = create_dataloader(train_ds, batch_size=train_cfg["batch_size"])
    print(f" Vocab size: {vocab_size}")
    print(f" Block size: {model_cfg['block_size']}")
    print(f" Max samples: {data_cfg['max_samples']}")
    # Create models — both built from the same size preset so the parameter
    # counts are directly comparable (iso-parameter verification below).
    print("\n🔧 Creating models...")
    ripple_model = create_ripple_model(size, vocab_size)
    baseline_model = create_baseline_model(size, vocab_size)
    ripple_params = ripple_model.get_num_params()
    baseline_params = baseline_model.get_num_params()
    print(f" RippleGPT: {ripple_params:,} parameters")
    print(f" VanillaGPT2: {baseline_params:,} parameters")
    print(f" Difference: {baseline_params - ripple_params:+,} ({(baseline_params/ripple_params - 1)*100:+.1f}%)")
    # Collect results
    results = {
        "metadata": {
            "dataset": dataset_name,
            "size": size,
            "device": str(device),
            "timestamp": datetime.now().isoformat(),
            "model_config": model_cfg,
            "train_config": train_cfg
        },
        "parameters": {
            "ripple": ripple_params,
            "baseline": baseline_params,
            "difference_pct": (baseline_params / ripple_params - 1) * 100
        },
        "ripple": {},
        "baseline": {}
    }
    # Train RippleGPT
    print("\n" + "="*50)
    ripple_results = train_model(
        ripple_model, train_dl, train_cfg, "RippleGPT", device
    )
    results["ripple"]["training"] = {
        "final_loss": ripple_results["final_loss"],
        "samples_per_sec": ripple_results["samples_per_sec"],
        "peak_memory_mb": ripple_results["peak_memory_mb"],
        "total_time_sec": ripple_results["total_time_sec"],
        "loss_curve": ripple_results["train_losses"]
    }
    # Preloaded datasets can be reused - just create new DataLoaders
    train_dl = create_dataloader(train_ds, batch_size=train_cfg["batch_size"])
    # Train VanillaGPT2
    print("\n" + "="*50)
    baseline_results = train_model(
        baseline_model, train_dl, train_cfg, "VanillaGPT2", device
    )
    results["baseline"]["training"] = {
        "final_loss": baseline_results["final_loss"],
        "samples_per_sec": baseline_results["samples_per_sec"],
        "peak_memory_mb": baseline_results["peak_memory_mb"],
        "total_time_sec": baseline_results["total_time_sec"],
        "loss_curve": baseline_results["train_losses"]
    }
    # Extrapolation test — run for both models; the baseline always reports inf.
    train_block = model_cfg["block_size"]
    extrap_sizes = [train_block * 2, train_block * 4]
    ripple_extrap = test_extrapolation(
        ripple_model, train_ds, train_block, extrap_sizes, device, "RippleGPT"
    )
    results["ripple"]["extrapolation"] = ripple_extrap
    baseline_extrap = test_extrapolation(
        baseline_model, train_ds, train_block, extrap_sizes, device, "VanillaGPT2"
    )
    results["baseline"]["extrapolation"] = baseline_extrap
    # Summary
    print("\n" + "="*70)
    print("📊 BENCHMARK RESULTS SUMMARY")
    print("="*70)
    print(f"\n{'Metric':<25} {'RippleGPT':<20} {'VanillaGPT2':<20} {'Winner':<10}")
    print("-"*70)
    # Parameters (lower is better)
    param_winner = "RippleGPT" if ripple_params < baseline_params else "VanillaGPT2"
    # BUGFIX: the original spec "{:,<20}" parses "," as the FILL character and
    # pads the number with commas ("1234,,,,..."); "{:<20,}" is the intended
    # left-aligned, width-20, thousands-separated format.
    print(f"{'Parameters':<25} {ripple_params:<20,} {baseline_params:<20,} {param_winner:<10}")
    # Final loss (lower is better)
    r_loss = results["ripple"]["training"]["final_loss"]
    b_loss = results["baseline"]["training"]["final_loss"]
    loss_winner = "RippleGPT" if r_loss < b_loss else "VanillaGPT2"
    print(f"{'Final Loss':<25} {r_loss:<20.4f} {b_loss:<20.4f} {loss_winner:<10}")
    # Speed (higher is better)
    r_speed = results["ripple"]["training"]["samples_per_sec"]
    b_speed = results["baseline"]["training"]["samples_per_sec"]
    speed_winner = "RippleGPT" if r_speed > b_speed else "VanillaGPT2"
    print(f"{'Speed (samples/sec)':<25} {r_speed:<20.1f} {b_speed:<20.1f} {speed_winner:<10}")
    # Memory (lower is better)
    r_mem = results["ripple"]["training"]["peak_memory_mb"]
    b_mem = results["baseline"]["training"]["peak_memory_mb"]
    mem_winner = "RippleGPT" if r_mem < b_mem else "VanillaGPT2"
    print(f"{'Memory (MB)':<25} {r_mem:<20.1f} {b_mem:<20.1f} {mem_winner:<10}")
    # Extrapolation — only the 2x size is shown in the summary table.
    print(f"\n{'Extrapolation (2x):':<25} ", end="")
    r_ext = results["ripple"]["extrapolation"].get(train_block * 2, float('inf'))
    if r_ext < float('inf'):
        print(f"{'✅ PPL=' + f'{r_ext:.2f}':<20}", end="")
    else:
        print(f"{'❌':<20}", end="")
    print(f"{'❌ Cannot':<20} {'RippleGPT':<10}")
    print("="*70)
    # Save results — default=str handles non-JSON types (e.g. datetime) by
    # stringifying them, which is acceptable for a human-readable report.
    if output_dir:
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        result_file = output_path / f"benchmark_{dataset_name}_{size}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(result_file, "w") as f:
            json.dump(results, f, indent=2, default=str)
        print(f"\n💾 Results saved to: {result_file}")
    return results
# ============================================================================
# ENTRY POINT
# ============================================================================
def parse_args():
    """Build the benchmark CLI parser and return the parsed arguments."""
    cli = argparse.ArgumentParser(
        description="RippleGPT Comparative Benchmark",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Quick test with TinyStories
python comparative_benchmark.py --dataset tinystories --size small
# Full benchmark with Python code
python comparative_benchmark.py --dataset python --size medium
# Save results
python comparative_benchmark.py --dataset tinystories --size small --output results/
"""
    )
    cli.add_argument(
        "--dataset",
        type=str,
        choices=["tinystories", "python"],
        default="tinystories",
        help="Dataset to use for benchmark",
    )
    cli.add_argument(
        "--size",
        type=str,
        choices=["small", "medium", "large"],
        default="small",
        help="Model size configuration",
    )
    cli.add_argument(
        "--output",
        type=str,
        default="validation/benchmarks/results",
        help="Output directory for results",
    )
    return cli.parse_args()
# Script entry point: parse CLI flags and run one full benchmark pass.
if __name__ == '__main__':
    args = parse_args()
    run_benchmark(
        dataset_name=args.dataset,
        size=args.size,
        output_dir=args.output
    )