""" comparative_benchmark.py - Main benchmark script for RippleGPT vs Baseline comparison. This script runs standardized benchmarks comparing: 1. RippleGPT (ALiBi + SwiGLU) 2. VanillaGPT2 (Absolute Pos Emb + GELU MLP) Metrics collected: - Parameter count (iso-parameter verification) - Training loss convergence - Validation perplexity - Training speed (samples/sec) - Memory usage (peak) - Extrapolation capability (RippleGPT only) Usage: python comparative_benchmark.py --dataset tinystories --size small python comparative_benchmark.py --dataset python --size medium """ import argparse import json import os import sys import time from datetime import datetime from pathlib import Path from typing import Dict, List, Optional, Tuple import gc import torch import torch.nn as nn from torch.utils.data import DataLoader # Add parent paths sys.path.insert(0, str(Path(__file__).parent.parent.parent)) from src.config import RippleConfig from src.model import RippleGPT from validation.benchmarks.baseline_gpt2 import VanillaGPT2, GPT2Config from validation.benchmarks.data_loaders import ( TinyStoriesDataset, PythonCodeDataset, BenchmarkDataConfig, create_dataloader ) # ============================================================================ # BENCHMARK CONFIGURATIONS # ============================================================================ MODEL_SIZES = { "small": { "n_layer": 6, "n_head": 6, "n_embd": 384, "block_size": 256, "dropout": 0.1 }, "medium": { "n_layer": 8, "n_head": 8, "n_embd": 512, "block_size": 512, "dropout": 0.1 }, "large": { "n_layer": 12, "n_head": 12, "n_embd": 768, "block_size": 1024, "dropout": 0.1 } } DATASET_CONFIGS = { "tinystories": { "small": {"split": "train", "max_samples": 2000}, "medium": {"split": "train", "max_samples": 10000}, "large": {"split": "train", "max_samples": 50000} }, "python": { "small": {"split": "train", "max_samples": 1000}, "medium": {"split": "train", "max_samples": 5000}, "large": {"split": "train", "max_samples": 25000} } } # Training hyperparameters (same for both models for fair comparison) TRAINING_CONFIG = { "small": { "batch_size": 32, "learning_rate": 1e-3, "max_iters": 500, "eval_interval": 50, "eval_samples": 100 }, "medium": { "batch_size": 16, "learning_rate": 6e-4, "max_iters": 1000, "eval_interval": 100, "eval_samples": 200 }, "large": { "batch_size": 8, "learning_rate": 3e-4, "max_iters": 2000, "eval_interval": 200, "eval_samples": 300 } } # ============================================================================ # UTILITY FUNCTIONS # ============================================================================ def get_device() -> torch.device: """Get the best available device.""" if torch.cuda.is_available(): return torch.device("cuda") elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available(): return torch.device("mps") return torch.device("cpu") def get_memory_usage() -> float: """Get current memory usage in MB.""" device = get_device() if device.type == "cuda": return torch.cuda.max_memory_allocated() / 1024 / 1024 elif device.type == "mps": # MPS doesn't have direct memory tracking, estimate from system import psutil return psutil.Process().memory_info().rss / 1024 / 1024 return 0.0 def reset_memory(): """Reset memory counters.""" gc.collect() device = get_device() if device.type == "cuda": torch.cuda.reset_peak_memory_stats() torch.cuda.empty_cache() elif device.type == "mps": torch.mps.empty_cache() # ============================================================================ # MODEL CREATION # ============================================================================ def create_ripple_model(size: str, vocab_size: int = 50257) -> RippleGPT: """Create a RippleGPT model for the given size.""" cfg = MODEL_SIZES[size] config = RippleConfig( vocab_size=vocab_size, n_layer=cfg["n_layer"], n_head=cfg["n_head"], n_embd=cfg["n_embd"], block_size=cfg["block_size"], dropout=cfg["dropout"], use_absolute_pos_emb=False # KEY: No absolute pos embeddings! ) return RippleGPT(config) def create_baseline_model(size: str, vocab_size: int = 50257) -> VanillaGPT2: """Create a VanillaGPT2 baseline for the given size.""" cfg = MODEL_SIZES[size] config = GPT2Config( vocab_size=vocab_size, n_layer=cfg["n_layer"], n_head=cfg["n_head"], n_embd=cfg["n_embd"], block_size=cfg["block_size"], dropout=cfg["dropout"] ) return VanillaGPT2(config) # ============================================================================ # TRAINING LOOP # ============================================================================ def train_model( model: nn.Module, dataloader, config: dict, model_name: str, device: torch.device ) -> Dict: """ Train a model and collect metrics. Returns dict with: - train_losses: List of (iter, loss) tuples - final_loss: Last training loss - samples_per_sec: Training throughput - peak_memory_mb: Peak memory usage - total_time_sec: Total training time """ model = model.to(device) optimizer = torch.optim.AdamW(model.parameters(), lr=config["learning_rate"]) # Cosine annealing scheduler scheduler = torch.optim.lr_scheduler.CosineAnnealingLR( optimizer, T_max=config["max_iters"] ) train_losses = [] total_samples = 0 start_time = time.time() reset_memory() print(f"\nšŸ‹ļø Training {model_name}...") print(f" Max iterations: {config['max_iters']}") print(f" Batch size: {config['batch_size']}") print(f" Learning rate: {config['learning_rate']}") model.train() data_iter = iter(dataloader) for iteration in range(config["max_iters"]): # Get batch try: x, y = next(data_iter) except StopIteration: data_iter = iter(dataloader) x, y = next(data_iter) x, y = x.to(device), y.to(device) # Forward + backward optimizer.zero_grad() _, loss = model(x, y) loss.backward() # Gradient clipping torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0) optimizer.step() scheduler.step() total_samples += x.size(0) # Log progress if iteration % config["eval_interval"] == 0 or iteration == config["max_iters"] - 1: train_losses.append((iteration, loss.item())) elapsed = time.time() - start_time samples_sec = total_samples / elapsed if elapsed > 0 else 0 print(f" [{iteration:5d}/{config['max_iters']}] " f"loss: {loss.item():.4f} | " f"lr: {scheduler.get_last_lr()[0]:.2e} | " f"{samples_sec:.1f} samples/sec") elapsed_time = time.time() - start_time peak_memory = get_memory_usage() return { "train_losses": train_losses, "final_loss": train_losses[-1][1] if train_losses else float('inf'), "samples_per_sec": total_samples / elapsed_time, "peak_memory_mb": peak_memory, "total_time_sec": elapsed_time } # ============================================================================ # EVALUATION # ============================================================================ @torch.no_grad() def evaluate_perplexity( model: nn.Module, dataloader, num_samples: int, device: torch.device ) -> float: """Compute perplexity on validation data.""" model.eval() total_loss = 0.0 count = 0 data_iter = iter(dataloader) for _ in range(num_samples): try: x, y = next(data_iter) except StopIteration: break x, y = x.to(device), y.to(device) _, loss = model(x, y) total_loss += loss.item() count += 1 avg_loss = total_loss / count if count > 0 else float('inf') return torch.exp(torch.tensor(avg_loss)).item() @torch.no_grad() def test_extrapolation( model: nn.Module, base_data, train_block_size: int, test_sizes: List[int], device: torch.device, model_name: str ) -> Dict[int, float]: """ Test model on sequences longer than training length. Only meaningful for RippleGPT (VanillaGPT2 will fail/clip). Returns dict mapping context_size -> perplexity. """ results = {} model.eval() print(f"\nšŸ“ Testing extrapolation for {model_name}...") for test_size in test_sizes: if test_size <= train_block_size: continue # For RippleGPT, we can test longer sequences # For VanillaGPT2, this will be clipped to block_size try: # Create a dataset with the larger block size if isinstance(model, RippleGPT): # RippleGPT can handle longer sequences test_ds = TinyStoriesDataset( split="validation", block_size=test_size, max_samples=50 ) test_dl = create_dataloader(test_ds, batch_size=4) total_loss = 0.0 count = 0 for x, y in test_dl: if count >= 20: break x, y = x.to(device), y.to(device) _, loss = model(x, y) total_loss += loss.item() count += 1 if count > 0: ppl = torch.exp(torch.tensor(total_loss / count)).item() results[test_size] = ppl ratio = test_size / train_block_size print(f" {test_size} tokens ({ratio:.1f}x train): PPL = {ppl:.2f}") else: # VanillaGPT2 cannot extrapolate results[test_size] = float('inf') print(f" {test_size} tokens: āŒ Cannot extrapolate (VanillaGPT2)") except Exception as e: print(f" {test_size} tokens: āŒ Error: {e}") results[test_size] = float('inf') return results # ============================================================================ # MAIN BENCHMARK # ============================================================================ def run_benchmark( dataset_name: str, size: str, output_dir: Optional[str] = None ) -> Dict: """ Run complete benchmark comparing RippleGPT vs VanillaGPT2. Returns comprehensive results dict. """ device = get_device() print(f"\n{'='*70}") print(f"šŸš€ RippleGPT COMPARATIVE BENCHMARK") print(f"{'='*70}") print(f"Dataset: {dataset_name}") print(f"Size: {size}") print(f"Device: {device}") print(f"{'='*70}") # Load dataset configuration model_cfg = MODEL_SIZES[size] data_cfg = DATASET_CONFIGS[dataset_name][size] train_cfg = TRAINING_CONFIG[size] # Create dataset print("\nšŸ“š Loading dataset...") if dataset_name == "tinystories": train_ds = TinyStoriesDataset( split=data_cfg["split"], block_size=model_cfg["block_size"], max_samples=data_cfg["max_samples"] ) else: # python train_ds = PythonCodeDataset( split=data_cfg["split"], block_size=model_cfg["block_size"], max_samples=data_cfg["max_samples"] ) vocab_size = train_ds.vocab_size train_dl = create_dataloader(train_ds, batch_size=train_cfg["batch_size"]) print(f" Vocab size: {vocab_size}") print(f" Block size: {model_cfg['block_size']}") print(f" Max samples: {data_cfg['max_samples']}") # Create models print("\nšŸ”§ Creating models...") ripple_model = create_ripple_model(size, vocab_size) baseline_model = create_baseline_model(size, vocab_size) ripple_params = ripple_model.get_num_params() baseline_params = baseline_model.get_num_params() print(f" RippleGPT: {ripple_params:,} parameters") print(f" VanillaGPT2: {baseline_params:,} parameters") print(f" Difference: {baseline_params - ripple_params:+,} ({(baseline_params/ripple_params - 1)*100:+.1f}%)") # Collect results results = { "metadata": { "dataset": dataset_name, "size": size, "device": str(device), "timestamp": datetime.now().isoformat(), "model_config": model_cfg, "train_config": train_cfg }, "parameters": { "ripple": ripple_params, "baseline": baseline_params, "difference_pct": (baseline_params / ripple_params - 1) * 100 }, "ripple": {}, "baseline": {} } # Train RippleGPT print("\n" + "="*50) ripple_results = train_model( ripple_model, train_dl, train_cfg, "RippleGPT", device ) results["ripple"]["training"] = { "final_loss": ripple_results["final_loss"], "samples_per_sec": ripple_results["samples_per_sec"], "peak_memory_mb": ripple_results["peak_memory_mb"], "total_time_sec": ripple_results["total_time_sec"], "loss_curve": ripple_results["train_losses"] } # Preloaded datasets can be reused - just create new DataLoaders train_dl = create_dataloader(train_ds, batch_size=train_cfg["batch_size"]) # Train VanillaGPT2 print("\n" + "="*50) baseline_results = train_model( baseline_model, train_dl, train_cfg, "VanillaGPT2", device ) results["baseline"]["training"] = { "final_loss": baseline_results["final_loss"], "samples_per_sec": baseline_results["samples_per_sec"], "peak_memory_mb": baseline_results["peak_memory_mb"], "total_time_sec": baseline_results["total_time_sec"], "loss_curve": baseline_results["train_losses"] } # Extrapolation test (RippleGPT only) train_block = model_cfg["block_size"] extrap_sizes = [train_block * 2, train_block * 4] ripple_extrap = test_extrapolation( ripple_model, train_ds, train_block, extrap_sizes, device, "RippleGPT" ) results["ripple"]["extrapolation"] = ripple_extrap baseline_extrap = test_extrapolation( baseline_model, train_ds, train_block, extrap_sizes, device, "VanillaGPT2" ) results["baseline"]["extrapolation"] = baseline_extrap # Summary print("\n" + "="*70) print("šŸ“Š BENCHMARK RESULTS SUMMARY") print("="*70) print(f"\n{'Metric':<25} {'RippleGPT':<20} {'VanillaGPT2':<20} {'Winner':<10}") print("-"*70) # Parameters (lower is better) param_winner = "RippleGPT" if ripple_params < baseline_params else "VanillaGPT2" print(f"{'Parameters':<25} {ripple_params:,<20} {baseline_params:,<20} {param_winner:<10}") # Final loss (lower is better) r_loss = results["ripple"]["training"]["final_loss"] b_loss = results["baseline"]["training"]["final_loss"] loss_winner = "RippleGPT" if r_loss < b_loss else "VanillaGPT2" print(f"{'Final Loss':<25} {r_loss:<20.4f} {b_loss:<20.4f} {loss_winner:<10}") # Speed (higher is better) r_speed = results["ripple"]["training"]["samples_per_sec"] b_speed = results["baseline"]["training"]["samples_per_sec"] speed_winner = "RippleGPT" if r_speed > b_speed else "VanillaGPT2" print(f"{'Speed (samples/sec)':<25} {r_speed:<20.1f} {b_speed:<20.1f} {speed_winner:<10}") # Memory (lower is better) r_mem = results["ripple"]["training"]["peak_memory_mb"] b_mem = results["baseline"]["training"]["peak_memory_mb"] mem_winner = "RippleGPT" if r_mem < b_mem else "VanillaGPT2" print(f"{'Memory (MB)':<25} {r_mem:<20.1f} {b_mem:<20.1f} {mem_winner:<10}") # Extrapolation print(f"\n{'Extrapolation (2x):':<25} ", end="") r_ext = results["ripple"]["extrapolation"].get(train_block * 2, float('inf')) b_ext = results["baseline"]["extrapolation"].get(train_block * 2, float('inf')) if r_ext < float('inf'): print(f"{'āœ… PPL=' + f'{r_ext:.2f}':<20}", end="") else: print(f"{'āŒ':<20}", end="") print(f"{'āŒ Cannot':<20} {'RippleGPT':<10}") print("="*70) # Save results if output_dir: output_path = Path(output_dir) output_path.mkdir(parents=True, exist_ok=True) result_file = output_path / f"benchmark_{dataset_name}_{size}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" with open(result_file, "w") as f: json.dump(results, f, indent=2, default=str) print(f"\nšŸ’¾ Results saved to: {result_file}") return results # ============================================================================ # ENTRY POINT # ============================================================================ def parse_args(): parser = argparse.ArgumentParser( description="RippleGPT Comparative Benchmark", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Quick test with TinyStories python comparative_benchmark.py --dataset tinystories --size small # Full benchmark with Python code python comparative_benchmark.py --dataset python --size medium # Save results python comparative_benchmark.py --dataset tinystories --size small --output results/ """ ) parser.add_argument( "--dataset", type=str, choices=["tinystories", "python"], default="tinystories", help="Dataset to use for benchmark" ) parser.add_argument( "--size", type=str, choices=["small", "medium", "large"], default="small", help="Model size configuration" ) parser.add_argument( "--output", type=str, default="validation/benchmarks/results", help="Output directory for results" ) return parser.parse_args() if __name__ == '__main__': args = parse_args() run_benchmark( dataset_name=args.dataset, size=args.size, output_dir=args.output )