| """Comparative Evaluation Between Model Sizes and Architectures"""
|
|
|
| import json
|
| import logging
|
| from dataclasses import dataclass, field
|
| from typing import Any, Dict, List, Optional, Tuple
|
|
|
| import numpy as np
|
| import torch
|
| from scipy import stats
|
|
|
| logger = logging.getLogger(__name__)
|
|
|
|
|
@dataclass
class ModelComparison:
    """Results of comparing two models."""

    model_a_name: str
    model_b_name: str
    metrics_a: Dict[str, float]
    metrics_b: Dict[str, float]
    # Each entry maps a metric name to {"absolute": ..., "relative": ...}.
    differences: Dict[str, Dict[str, float]] = field(default_factory=dict)
    statistical_tests: Dict[str, Dict[str, float]] = field(default_factory=dict)
    summary: str = ""

    def compute_differences(self):
        """Compute absolute and relative differences."""
        self.differences = {}
        for key in self.metrics_a:
            if key in self.metrics_b:
                diff = self.metrics_a[key] - self.metrics_b[key]
                rel_diff = diff / (self.metrics_b[key] if self.metrics_b[key] != 0 else 1e-8)
                self.differences[key] = {
                    "absolute": diff,
                    "relative": rel_diff,
                }

    def compute_statistical_tests(self, samples_a: List[float], samples_b: List[float]):
        """Run statistical significance tests."""
        if len(samples_a) < 2 or len(samples_b) < 2:
            return

        # Welch's t-test: does not assume equal variances between the two samples.
        t_stat, t_p = stats.ttest_ind(samples_a, samples_b, equal_var=False)

        # Mann-Whitney U: non-parametric check that makes no normality assumption.
        u_stat, u_p = stats.mannwhitneyu(samples_a, samples_b, alternative="two-sided")

        self.statistical_tests = {
            "t_test": {"statistic": float(t_stat), "p_value": float(t_p)},
            "mann_whitney_u": {"statistic": float(u_stat), "p_value": float(u_p)},
        }

    def generate_summary(self) -> str:
        """Generate human-readable summary."""
        lines = [
            f"Model Comparison: {self.model_a_name} vs {self.model_b_name}",
            "=" * 60,
        ]

        for metric, diffs in self.differences.items():
            a_val = self.metrics_a[metric]
            b_val = self.metrics_b[metric]
            abs_diff = diffs["absolute"]
            rel_diff = diffs["relative"] * 100

            # "Better" here simply means the higher value; for metrics where lower
            # is better (e.g. perplexity), interpret the arrow accordingly.
            if abs_diff != 0:
                better = self.model_a_name if a_val > b_val else self.model_b_name
            else:
                better = "tie"

            lines.append(f"{metric:30s}: {a_val:.4f} vs {b_val:.4f} "
                         f"(diff: {abs_diff:+.4f}, {rel_diff:+.1f}%) -> {better}")

        lines.append("\nStatistical Significance:")
        for test_name, results in self.statistical_tests.items():
            p_val = results["p_value"]
            sig = "significant" if p_val < 0.05 else "not significant"
            lines.append(f" {test_name}: p={p_val:.4f} ({sig})")

        self.summary = "\n".join(lines)
        return self.summary


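# Minimal standalone usage of ModelComparison (a sketch; the metric names and
# values below are illustrative, not outputs of this module):
#
#     comp = ModelComparison(
#         model_a_name="zenith-7b",
#         model_b_name="zenith-32b",
#         metrics_a={"gsm8k_accuracy": 0.41, "wikitext_perplexity": 8.2},
#         metrics_b={"gsm8k_accuracy": 0.63, "wikitext_perplexity": 6.1},
#     )
#     comp.compute_differences()
#     print(comp.generate_summary())

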
class ComparativeEvaluator:
    """Evaluate and compare multiple models."""

    def __init__(
        self,
        models: Dict[str, torch.nn.Module],
        tokenizers: Dict[str, Any],
        benchmark_config: Any,
    ):
        self.models = models
        self.tokenizers = tokenizers
        self.config = benchmark_config

    def compare_models(
        self,
        model_names: List[str],
        benchmark_datasets: Dict[str, Any],
    ) -> ModelComparison:
        """Compare two models on multiple benchmarks."""
        if len(model_names) != 2:
            raise ValueError("Can only compare exactly 2 models")

        name_a, name_b = model_names
        model_a = self.models[name_a]
        model_b = self.models[name_b]
        tokenizer_a = self.tokenizers[name_a]
        tokenizer_b = self.tokenizers[name_b]

        metrics_a = self._run_benchmarks(model_a, tokenizer_a, benchmark_datasets)
        metrics_b = self._run_benchmarks(model_b, tokenizer_b, benchmark_datasets)

        comparison = ModelComparison(
            model_a_name=name_a,
            model_b_name=name_b,
            metrics_a=metrics_a,
            metrics_b=metrics_b,
        )
        comparison.compute_differences()
        comparison.generate_summary()
        return comparison

    def _run_benchmarks(
        self,
        model: torch.nn.Module,
        tokenizer: Any,
        datasets: Dict[str, Any],
    ) -> Dict[str, float]:
        """Run all benchmarks on a model."""
        from .benchmark import BenchmarkSuite, BenchmarkConfig

        config = BenchmarkConfig(
            batch_size=self.config.batch_size,
            max_seq_length=self.config.max_seq_length,
            datasets=list(datasets.keys()),
        )

        suite = BenchmarkSuite(model, tokenizer, config)
        results = suite.run_all_benchmarks()

        # Flatten nested benchmark results into "category_metric" keys, e.g.
        # {"human_eval": {"pass@1": 0.42}} becomes {"human_eval_pass@1": 0.42}.
        flat_metrics = {}
        for category, metrics in results["benchmarks"].items():
            if isinstance(metrics, dict):
                for key, value in metrics.items():
                    if isinstance(value, (int, float)):
                        flat_metrics[f"{category}_{key}"] = value
            elif isinstance(metrics, (int, float)):
                flat_metrics[category] = metrics

        return flat_metrics

    def generate_comparison_report(
        self,
        comparisons: List[ModelComparison],
        output_path: str,
    ):
        """Generate comprehensive comparison report."""
        report = {
            "timestamp": datetime.now().isoformat(),
            "comparisons": [],
        }

        for comp in comparisons:
            report["comparisons"].append({
                "models": [comp.model_a_name, comp.model_b_name],
                "metrics_a": comp.metrics_a,
                "metrics_b": comp.metrics_b,
                "differences": comp.differences,
                "statistical_tests": comp.statistical_tests,
                "summary": comp.summary,
            })

        with open(output_path, 'w') as f:
            json.dump(report, f, indent=2)

        logger.info(f"Comparison report saved to {output_path}")


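# Direct pairwise comparison (sketch; the model/tokenizer dicts, config, and
# datasets mirror compare_model_sizes below and are assumed to be set up by the caller):
#
#     evaluator = ComparativeEvaluator(models, tokenizers, config)
#     comparison = evaluator.compare_models(["zenith-7b", "zenith-32b"], datasets)
#     print(comparison.summary)

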
def compare_model_sizes(
    models: Dict[str, torch.nn.Module],
    tokenizers: Dict[str, Any],
    config: Any,
    output_dir: str,
) -> Dict[str, ModelComparison]:
    """Compare 7B vs 32B vs 70B models."""
    comparisons = {}
    evaluator = ComparativeEvaluator(models, tokenizers, config)

    from .eval_datasets import load_human_eval, load_gsm8k, load_truthfulqa
    datasets = {
        "human_eval": load_human_eval()[:100],
        "gsm8k": load_gsm8k()[:100],
        "truthfulqa": load_truthfulqa()[:100],
    }

    model_names = list(models.keys())
    for i in range(len(model_names)):
        for j in range(i + 1, len(model_names)):
            pair = (model_names[i], model_names[j])
            logger.info(f"Comparing {pair[0]} vs {pair[1]}")

            comparison = evaluator.compare_models(list(pair), datasets)
            comparisons[f"{pair[0]}_vs_{pair[1]}"] = comparison

            output_path = f"{output_dir}/comparison_{pair[0]}_vs_{pair[1]}.json"
            evaluator.generate_comparison_report([comparison], output_path)

    return comparisons


def analyze_scaling_laws(comparisons: Dict[str, ModelComparison]) -> Dict[str, Any]:
    """Analyze scaling laws from model comparisons."""
    sizes = []
    perplexities = []
    accuracies = []

    size_map = {"zenith-7b": 7, "zenith-32b": 32, "zenith-70b": 70}

    for comp in comparisons.values():
        # Pull metrics from both sides of each comparison so a model that only
        # ever appears as model_b still contributes a data point.
        for model_name, metrics in ((comp.model_a_name, comp.metrics_a),
                                    (comp.model_b_name, comp.metrics_b)):
            if model_name not in size_map:
                continue
            for metric, value in metrics.items():
                if "perplexity" in metric:
                    sizes.append(size_map[model_name])
                    perplexities.append(value)
                elif "accuracy" in metric or "pass@1" in metric:
                    accuracies.append((size_map[model_name], value))

    # Fit a power law perplexity ~ C * N^(-alpha) via linear regression in log-log
    # space: log(ppl) = log(C) - alpha * log(N), so alpha is the negated slope.
    if len(sizes) >= 2 and len(perplexities) >= 2:
        log_sizes = np.log(sizes)
        log_ppl = np.log(perplexities)
        slope, intercept, r_value, p_value, std_err = stats.linregress(log_sizes, log_ppl)
        scaling_exponent = -slope
        r_squared = r_value ** 2
    else:
        scaling_exponent = None
        r_squared = None

    return {
        "sizes": sizes,
        "perplexities": perplexities,
        "accuracies": accuracies,
        "scaling_exponent": scaling_exponent,
        "r_squared": r_squared,
    }
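

# End-to-end usage sketch. `load_model` / `load_tokenizer` and the config object
# are hypothetical here; this module assumes the caller provides them:
#
#     models = {name: load_model(name) for name in ("zenith-7b", "zenith-32b", "zenith-70b")}
#     tokenizers = {name: load_tokenizer(name) for name in models}
#     comparisons = compare_model_sizes(models, tokenizers, config, output_dir="eval/comparisons")
#     scaling = analyze_scaling_laws(comparisons)
#     logger.info(f"Scaling exponent: {scaling['scaling_exponent']}, R^2: {scaling['r_squared']}")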