| """Comprehensive Benchmark Suite for Zenith Models"""
|
|
|
| import json
|
| import logging
|
| import time
|
| from dataclasses import dataclass, field
|
| from typing import Any, Dict, List, Optional, Tuple, Union
|
|
|
| import numpy as np
|
| import torch
|
| from torch.utils.data import DataLoader
|
| from tqdm import tqdm
|
|
|
| from .metrics import (
|
| compute_perplexity,
|
| compute_accuracy,
|
| compute_em_score,
|
| compute_f1_score,
|
| compute_eq_metrics,
|
| compute_code_metrics,
|
| compute_reasoning_metrics,
|
| )
|
| from .eval_datasets import EvaluationDataset
|
|
|
| logger = logging.getLogger(__name__)
|
|
|
|
|
@dataclass
class BenchmarkConfig:
    """Configuration for benchmarking.

    Groups runtime settings (batch size, device, precision), per-family
    enable flags, the benchmark dataset list, and result-saving options.
    """

    # --- runtime ---
    batch_size: int = 16
    max_seq_length: int = 8192
    num_samples: Optional[int] = None  # None means: evaluate every sample
    device: str = "cuda"
    dtype: str = "bfloat16"
    use_flash_attention: bool = True

    # --- which benchmark families to run ---
    run_perplexity: bool = True
    run_accuracy: bool = True
    run_code_metrics: bool = True
    run_reasoning: bool = True
    run_eq_metrics: bool = True

    # --- datasets to evaluate on (default_factory avoids a shared mutable default) ---
    datasets: List[str] = field(default_factory=lambda: [
        "human_eval", "mbpp", "gsm8k", "math", "truthfulqa", "emotional_bench",
    ])

    # --- output ---
    save_results: bool = True
    output_dir: str = "./benchmark_results"
|
|
|
|
|
class BenchmarkSuite:
    """Comprehensive benchmarking suite for LLMs.

    Wraps a causal LM + tokenizer and runs the benchmark families enabled
    in :class:`BenchmarkConfig` (perplexity, multiple-choice accuracy, code
    generation, math reasoning, emotional intelligence), optionally saving
    the aggregated results to a timestamped JSON file.
    """

    def __init__(
        self,
        model: torch.nn.Module,
        tokenizer: Any,
        config: "BenchmarkConfig",
    ):
        """Move *model* to the configured device, switch it to eval mode,
        and build a reusable autocast context for mixed-precision inference.
        """
        self.model = model
        self.tokenizer = tokenizer
        self.config = config
        # Fall back to CPU when CUDA was requested but is unavailable.
        self.device = torch.device(config.device if torch.cuda.is_available() else "cpu")

        self.model.to(self.device)
        self.model.eval()

        # BUG FIX: the original checked ``dtype in ["fp16", "bf16"]`` (so the
        # config default "bfloat16" silently disabled autocast) and then did
        # ``getattr(torch, dtype)``, which raises AttributeError for the very
        # aliases it accepted — torch has no ``torch.fp16``/``torch.bf16``.
        # Also use device-agnostic ``torch.autocast`` instead of the
        # deprecated ``torch.cuda.amp.autocast`` (this suite may run on CPU).
        amp_dtype = self._resolve_autocast_dtype(config.dtype)
        self.autocast_ctx = torch.autocast(
            device_type=self.device.type,
            dtype=amp_dtype,
            enabled=amp_dtype in (torch.float16, torch.bfloat16),
        )

        logger.info(f"BenchmarkSuite initialized on {self.device}")

    @staticmethod
    def _resolve_autocast_dtype(name: str) -> torch.dtype:
        """Map a config dtype string (including the 'fp16'/'bf16' aliases)
        to a real ``torch.dtype``; unknown names fall back to float32."""
        aliases = {"fp16": "float16", "bf16": "bfloat16"}
        return getattr(torch, aliases.get(name, name), torch.float32)

    def run_all_benchmarks(self) -> Dict[str, Any]:
        """Run every benchmark family enabled in the config.

        Returns:
            Dict with ``model_name``, ``config``, ``timestamp`` and a
            ``"benchmarks"`` sub-dict keyed by benchmark family.
        """
        results: Dict[str, Any] = {
            "model_name": getattr(self.model, "name", "unknown"),
            "config": self.config.__dict__,
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "benchmarks": {},
        }

        if self.config.run_perplexity:
            logger.info("Running perplexity benchmark...")
            results["benchmarks"]["perplexity"] = self._run_perplexity_benchmark()

        if self.config.run_accuracy:
            logger.info("Running accuracy benchmark...")
            results["benchmarks"]["accuracy"] = self._run_accuracy_benchmark()

        if self.config.run_code_metrics:
            logger.info("Running code generation benchmarks...")
            results["benchmarks"]["code"] = self._run_code_benchmarks()

        if self.config.run_reasoning:
            logger.info("Running reasoning benchmarks...")
            results["benchmarks"]["reasoning"] = self._run_reasoning_benchmarks()

        if self.config.run_eq_metrics:
            logger.info("Running EQ benchmarks...")
            results["benchmarks"]["emotional_intelligence"] = self._run_eq_benchmarks()

        if self.config.save_results:
            self._save_results(results)

        return results

    def _run_perplexity_benchmark(self) -> Dict[str, float]:
        """Compute corpus perplexity on WikiText-2 validation text.

        Perplexity = exp(sum of token NLLs / number of scored tokens).
        """
        from datasets import load_dataset  # local import: optional heavy dependency

        try:
            ds = load_dataset("wikitext", "wikitext-2-raw-v1", split="validation")
            # Skip empty/near-empty lines; they carry no signal.
            texts = [ex["text"] for ex in ds if ex["text"].strip() and len(ex["text"].split()) > 10]
            if self.config.num_samples:
                texts = texts[: self.config.num_samples]
        except Exception as e:
            logger.warning(f"Failed to load WikiText: {e}. Using dummy data.")
            texts = ["This is a sample text for perplexity evaluation."] * 100

        total_loss = 0.0
        total_tokens = 0

        # BUG FIX: the original summed the loss over *all* label positions
        # (padding included) but counted only non-pad tokens in the
        # denominator, inflating perplexity. Ignore pad positions in the
        # loss as well so numerator and denominator agree.
        pad_id = self.tokenizer.pad_token_id
        ignore_id = pad_id if pad_id is not None else -100  # -100: ignore nothing real
        loss_fct = torch.nn.CrossEntropyLoss(reduction="sum", ignore_index=ignore_id)

        with torch.no_grad():
            for batch in tqdm(self._create_batches(texts, self.config.batch_size), desc="Perplexity"):
                input_ids = batch["input_ids"].to(self.device)
                attention_mask = batch["attention_mask"].to(self.device)

                with self.autocast_ctx:
                    outputs = self.model(input_ids=input_ids, attention_mask=attention_mask)
                logits = outputs.logits if hasattr(outputs, "logits") else outputs

                # Shift so that position t predicts token t+1.
                shift_logits = logits[..., :-1, :].contiguous()
                shift_labels = input_ids[..., 1:].contiguous()

                loss = loss_fct(
                    shift_logits.view(-1, shift_logits.size(-1)),
                    shift_labels.view(-1),
                )

                total_loss += loss.item()
                total_tokens += (shift_labels != ignore_id).sum().item()

        # max(..., 1) guards against an empty corpus.
        perplexity = float(torch.exp(torch.tensor(total_loss / max(total_tokens, 1))))
        return {"perplexity": perplexity}

    def _run_accuracy_benchmark(self) -> Dict[str, float]:
        """Zero-shot multiple-choice accuracy on TruthfulQA (mc1).

        Scores each candidate answer by the (negated) mean token NLL of the
        "Q: ...\\nA: <choice>" prompt and predicts the highest-scoring choice.
        """
        # BUG FIX: ``load_dataset`` was only imported inside
        # ``_run_perplexity_benchmark``; this method raised NameError.
        from datasets import load_dataset

        try:
            ds = load_dataset("truthful_qa", "multiple_choice", split="validation")
        except Exception:
            logger.warning("TruthfulQA not available, using dummy data")
            return {"accuracy": 0.0, "num_samples": 0}

        correct = 0
        total = 0

        with torch.no_grad():
            for ex in tqdm(ds, desc="Accuracy"):
                question = ex["question"]
                # BUG FIX: the HF ``truthful_qa`` multiple_choice split exposes
                # ``mc1_targets`` = {"choices": [...], "labels": [...]}; the
                # original read non-existent "mc1_choices"/"mc1_idx" keys.
                targets = ex["mc1_targets"]
                choices = targets["choices"]
                correct_idx = targets["labels"].index(1)

                scores = []
                for choice in choices:
                    text = f"Q: {question}\nA: {choice}"
                    inputs = self.tokenizer(
                        text,
                        return_tensors="pt",
                        truncation=True,
                        max_length=self.config.max_seq_length,
                    ).to(self.device)
                    with self.autocast_ctx:
                        outputs = self.model(**inputs)
                    logits = outputs.logits if hasattr(outputs, "logits") else outputs

                    # Mean NLL over the whole sequence; negate so that
                    # higher score == more likely.
                    loss = torch.nn.functional.cross_entropy(
                        logits[0, :-1],
                        inputs["input_ids"][0, 1:],
                        reduction="mean",
                    )
                    scores.append(-loss.item())

                if int(np.argmax(scores)) == correct_idx:
                    correct += 1
                total += 1

        accuracy = correct / total if total > 0 else 0.0
        return {"accuracy": accuracy, "num_samples": total}

    def _run_code_benchmarks(self) -> Dict[str, Any]:
        """Run code-generation benchmarks (HumanEval, MBPP)."""
        from datasets import load_dataset  # BUG FIX: was never imported in this method

        results: Dict[str, Any] = {}
        for dataset_name in ["human_eval", "mbpp"]:
            hf_name = "openai_humaneval" if dataset_name == "human_eval" else "mbpp"
            try:
                ds = load_dataset(hf_name, split="test")
            except Exception:
                logger.warning(f"{dataset_name} not available")
                continue

            results[dataset_name] = self._evaluate_code_completion(ds, dataset_name)

        return results

    def _evaluate_code_completion(self, dataset: Any, dataset_name: str, k: int = 1) -> Dict[str, float]:
        """Approximate pass@k for code completion.

        NOTE: a problem counts as "passing" if any of the *k* sampled
        prompt+completion strings merely *compiles* — a syntactic proxy, not
        true execution-based pass@k (which would require running the hidden
        unit tests in a sandbox).
        """
        correct = 0
        total = 0

        for ex in tqdm(dataset, desc=f"Code eval ({dataset_name})"):
            # HumanEval exposes "prompt"; MBPP exposes "text".
            prompt = ex["prompt"] if "prompt" in ex else ex["text"]

            with torch.no_grad():
                inputs = self.tokenizer(
                    prompt,
                    return_tensors="pt",
                    truncation=True,
                    max_length=self.config.max_seq_length,
                ).to(self.device)
                with self.autocast_ctx:
                    generated = self.model.generate(
                        **inputs,
                        max_new_tokens=256,
                        temperature=0.2,
                        do_sample=True,
                        num_return_sequences=k,
                        pad_token_id=self.tokenizer.pad_token_id,
                    )

            prompt_len = inputs["input_ids"].shape[1]
            for gen in generated:
                # Decode only the newly generated tail, not the echoed prompt.
                completion = self.tokenizer.decode(gen[prompt_len:], skip_special_tokens=True)
                code = prompt + completion
                try:
                    compile(code, "<string>", "exec")
                    correct += 1
                    break  # pass@k: one passing sample suffices
                except SyntaxError:
                    pass

            total += 1

        pass_at_k = correct / total if total > 0 else 0.0
        return {f"pass@{k}": pass_at_k, "num_samples": total}

    def _run_reasoning_benchmarks(self) -> Dict[str, Any]:
        """Run math-reasoning benchmarks (GSM8K, MATH)."""
        from datasets import load_dataset  # BUG FIX: was never imported in this method

        results: Dict[str, Any] = {}
        for dataset_name in ["gsm8k", "math"]:
            try:
                if dataset_name == "gsm8k":
                    ds = load_dataset("gsm8k", "main", split="test")
                else:
                    ds = load_dataset("hendrycks_math", split="test")
            except Exception:
                logger.warning(f"{dataset_name} not available")
                continue

            results[dataset_name] = self._evaluate_reasoning(ds, dataset_name)

        return results

    def _evaluate_reasoning(self, dataset: Any, dataset_name: str) -> Dict[str, float]:
        """Greedy-decode each problem and compare extracted final answers."""
        correct = 0
        total = 0

        for ex in tqdm(dataset, desc=f"Reasoning ({dataset_name})"):
            # BUG FIX: MATH examples use "problem"/"solution", not
            # "question"/"answer"; the original raised KeyError on them.
            question = ex["question"] if "question" in ex else ex["problem"]
            answer = ex["answer"] if "answer" in ex else ex["solution"]

            with torch.no_grad():
                inputs = self.tokenizer(
                    question,
                    return_tensors="pt",
                    truncation=True,
                    max_length=self.config.max_seq_length,
                ).to(self.device)
                with self.autocast_ctx:
                    # Greedy decoding; the original also passed temperature,
                    # which is ignored (with a warning) when do_sample=False.
                    generated = self.model.generate(
                        **inputs,
                        max_new_tokens=512,
                        do_sample=False,
                        pad_token_id=self.tokenizer.pad_token_id,
                    )

            prediction = self.tokenizer.decode(
                generated[0][inputs["input_ids"].shape[1]:],
                skip_special_tokens=True,
            )

            if self._extract_answer(prediction) == self._extract_answer(answer):
                correct += 1
            total += 1

        accuracy = correct / total if total > 0 else 0.0
        return {"accuracy": accuracy, "num_samples": total}

    def _extract_answer(self, text: str) -> str:
        """Extract the final answer from model output or reference text.

        Recognizes GSM8K-style ``#### <answer>`` markers and LaTeX
        ``\\boxed{...}``; otherwise falls back to the last line.
        """
        import re

        # BUG FIX: GSM8K references end with "#### <answer>"; the original
        # returned the whole "#### N" line via the last-line fallback while
        # predictions rarely contain it, so answers never matched.
        match = re.search(r"####\s*(.+)", text)
        if match:
            return match.group(1).strip()

        # MATH-style boxed answers (braces escaped for clarity in the regex).
        match = re.search(r"\\boxed\{(.+?)\}", text)
        if match:
            return match.group(1).strip()

        lines = text.strip().split("\n")
        return lines[-1].strip() if lines else ""

    def _run_eq_benchmarks(self) -> Dict[str, Any]:
        """Run emotional-intelligence benchmarks via ``compute_eq_metrics``."""
        from datasets import load_dataset  # BUG FIX: was never imported in this method

        try:
            ds = load_dataset("emotion", split="test")
        except Exception:
            logger.warning("Emotion dataset not available")
            return {"accuracy": 0.0}

        return compute_eq_metrics(self.model, ds, self.tokenizer, self.device)

    def _create_batches(self, texts: List[str], batch_size: int) -> DataLoader:
        """Wrap *texts* in a DataLoader that tokenizes and pads each item.

        NOTE(review): ``padding="max_length"`` requires the tokenizer to have
        a pad token configured — confirm for tokenizers without one (e.g. GPT-2).
        """
        from torch.utils.data import Dataset

        class TextDataset(Dataset):
            """Lazily tokenizes one text per item."""

            def __init__(self, texts, tokenizer, max_length):
                self.texts = texts
                self.tokenizer = tokenizer
                self.max_length = max_length

            def __len__(self):
                return len(self.texts)

            def __getitem__(self, idx):
                encoded = self.tokenizer(
                    self.texts[idx],
                    truncation=True,
                    max_length=self.max_length,
                    padding="max_length",
                    return_tensors="pt",
                )
                # Drop the batch dim added by return_tensors="pt".
                return {k: v.squeeze(0) for k, v in encoded.items()}

        dataset = TextDataset(texts, self.tokenizer, self.config.max_seq_length)
        return DataLoader(dataset, batch_size=batch_size, shuffle=False)

    def _save_results(self, results: Dict[str, Any]):
        """Write *results* as pretty-printed JSON into the output directory."""
        import os

        os.makedirs(self.config.output_dir, exist_ok=True)

        timestamp = time.strftime("%Y%m%d_%H%M%S")
        filename = f"{self.config.output_dir}/benchmark_{timestamp}.json"

        with open(filename, "w") as f:
            json.dump(results, f, indent=2)

        # BUG FIX: the original logged the literal string "(unknown)" instead
        # of the actual output path.
        logger.info(f"Benchmark results saved to {filename}")
|
|
|
|
|
def run_benchmark(
    model: torch.nn.Module,
    tokenizer: Any,
    config: Optional[BenchmarkConfig] = None,
) -> Dict[str, Any]:
    """Convenience entry point: build a :class:`BenchmarkSuite` (using a
    default :class:`BenchmarkConfig` when none is supplied) and run every
    enabled benchmark, returning the aggregated results dict."""
    effective_config = BenchmarkConfig() if config is None else config
    return BenchmarkSuite(model, tokenizer, effective_config).run_all_benchmarks()
|
|
|