| """ |
| OpenLLM Model Evaluation Script |
| |
| This script implements comprehensive evaluation for trained OpenLLM models, |
| including intrinsic evaluation (perplexity, loss) and text generation quality |
| assessment as specified in Step 5 of the training pipeline. |
| |
| Usage: |
| python core/src/evaluate_model.py \ |
| --model_dir models/openllm-medium \ |
| --eval_data data/clean/validation_data.txt \ |
| --metrics perplexity,loss |
| |
| Features: |
| - Perplexity calculation on held-out data |
| - Text generation quality assessment |
| - Multiple evaluation metrics |
| - Comprehensive quality benchmarks |
| - JSON output for downstream analysis |
| |
| Author: Louis Chua Bean Chong |
| License: GPLv3 |
| """ |

import argparse
import json
import math
import os
import sys
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import sentencepiece as spm
import torch

# Make sibling modules importable when the script is run directly
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from model import GPTModel, create_model


class ModelEvaluator:
    """
    Comprehensive evaluator for OpenLLM models.

    Implements intrinsic evaluation metrics and text generation quality
    assessment following the training pipeline specifications.
    """

    def __init__(self, model: GPTModel, tokenizer_path: str, device: str = "cpu"):
        """
        Initialize the model evaluator.

        Args:
            model: Trained GPT model
            tokenizer_path: Path to tokenizer model file
            device: Device to run evaluation on
        """
        self.model = model.to(device)
        self.device = device

        # Load the SentencePiece tokenizer produced by the training pipeline
        self.tokenizer = spm.SentencePieceProcessor()
        self.tokenizer.load(tokenizer_path)

        print("🔧 ModelEvaluator initialized")
        print(f"   Device: {device}")
        print(f"   Model parameters: {model.get_num_params():,}")
        print(f"   Vocabulary size: {self.tokenizer.vocab_size():,}")

    def evaluate_perplexity(
        self, eval_data: List[str], max_seq_len: int = 512, batch_size: int = 1
    ) -> Dict[str, float]:
        """
        Calculate perplexity on evaluation data.

        Args:
            eval_data: List of text passages for evaluation
            max_seq_len: Maximum sequence length for evaluation
            batch_size: Batch size for evaluation

        Returns:
            Dictionary with loss and perplexity metrics
        """
        self.model.eval()
        total_loss = 0.0
        total_tokens = 0
        num_sequences = 0

        print(f"📊 Calculating perplexity on {len(eval_data)} passages...")

        with torch.no_grad():
            for i, text in enumerate(eval_data):
                if i % 100 == 0:
                    print(f"   Progress: {i}/{len(eval_data)} passages")

                # Tokenize and skip passages that are too short to score
                tokens = self.tokenizer.encode(text)
                if len(tokens) < 2:
                    continue

                # Truncate to the maximum evaluation sequence length
                if len(tokens) > max_seq_len:
                    tokens = tokens[:max_seq_len]

                # Shift by one position: predict token t+1 from tokens up to t
                input_ids = torch.tensor([tokens[:-1]], dtype=torch.long, device=self.device)
                target_ids = torch.tensor([tokens[1:]], dtype=torch.long, device=self.device)

                # Forward pass returns logits and the mean cross-entropy loss
                logits, loss = self.model(input_ids, target_ids)

                # Accumulate token-weighted loss for the corpus-level average
                seq_length = len(tokens) - 1
                total_loss += loss.item() * seq_length
                total_tokens += seq_length
                num_sequences += 1

        # Average loss per token; cap the exponent to avoid overflow in exp()
        avg_loss = total_loss / total_tokens if total_tokens > 0 else float("inf")
        perplexity = math.exp(min(avg_loss, 10))

        return {
            "loss": avg_loss,
            "perplexity": perplexity,
            "total_tokens": total_tokens,
            "num_sequences": num_sequences,
        }

    def evaluate_text_generation(
        self,
        prompts: List[str],
        max_length: int = 256,
        temperature: float = 0.7,
        top_k: Optional[int] = 40,
        num_samples: int = 1,
    ) -> List[Dict[str, Any]]:
        """
        Evaluate text generation quality.

        Args:
            prompts: List of input prompts
            max_length: Maximum generation length
            temperature: Sampling temperature
            top_k: Top-k sampling parameter
            num_samples: Number of samples per prompt

        Returns:
            List of generation results with quality metrics
        """
        self.model.eval()
        results = []

        print(f"✍️ Evaluating text generation on {len(prompts)} prompts...")

        with torch.no_grad():
            for prompt in prompts:
                prompt_results = []

                for sample_idx in range(num_samples):
                    # Encode the prompt and move it to the evaluation device
                    input_ids = self.tokenizer.encode(prompt)
                    input_tensor = torch.tensor([input_ids], dtype=torch.long, device=self.device)

                    start_time = time.time()

                    # Sample a continuation from the model
                    output = self.model.generate(
                        input_tensor,
                        max_new_tokens=max_length,
                        temperature=temperature,
                        top_k=top_k,
                    )

                    generation_time = time.time() - start_time

                    # Decode the full sequence and the newly generated suffix
                    generated_ids = output[0].tolist()
                    full_text = self.tokenizer.decode(generated_ids)
                    generated_text = self.tokenizer.decode(generated_ids[len(input_ids) :])

                    # Score the generated text with simple surface-level metrics
                    quality_metrics = self._assess_generation_quality(generated_text)

                    num_new_tokens = len(generated_ids) - len(input_ids)
                    prompt_results.append(
                        {
                            "prompt": prompt,
                            "generated_text": generated_text,
                            "full_text": full_text,
                            "generation_time": generation_time,
                            "tokens_generated": num_new_tokens,
                            "tokens_per_second": (
                                num_new_tokens / generation_time if generation_time > 0 else 0.0
                            ),
                            "quality_metrics": quality_metrics,
                        }
                    )

                results.extend(prompt_results)

        return results

    def _assess_generation_quality(self, text: str) -> Dict[str, float]:
        """
        Assess basic quality metrics for generated text.

        Args:
            text: Generated text to assess

        Returns:
            Dictionary of quality metrics
        """
        if not text.strip():
            return {
                "length": 0,
                "avg_word_length": 0,
                "repetition_rate": 1.0,
                "coherence_score": 0.0,
            }

        words = text.split()

        # Basic length statistics
        length = len(words)
        avg_word_length = sum(len(word) for word in words) / len(words) if words else 0

        # Repetition: share of bigrams that duplicate earlier bigrams
        bigrams = [f"{words[i]} {words[i + 1]}" for i in range(len(words) - 1)]
        unique_bigrams = len(set(bigrams))
        repetition_rate = 1 - (unique_bigrams / len(bigrams) if bigrams else 0)

        # Coherence proxy: fraction of sentences with more than three words
        sentences = text.split(".")
        valid_sentences = [s for s in sentences if len(s.strip().split()) > 3]
        coherence_score = len(valid_sentences) / len(sentences) if sentences else 0

        return {
            "length": length,
            "avg_word_length": avg_word_length,
            "repetition_rate": repetition_rate,
            "coherence_score": coherence_score,
        }

    def evaluate_downstream_tasks(self) -> Dict[str, Any]:
        """
        Evaluate model performance on downstream tasks.

        This function implements basic downstream task evaluation including:
        - Reading comprehension (simplified SQuAD-style)
        - Sentiment analysis (few-shot)
        - Common sense reasoning
        - Text completion

        Returns:
            Dictionary of downstream task results
        """
        results = {}

        # Task 1: Reading comprehension (simplified SQuAD-style)
        results["reading_comprehension"] = self._evaluate_reading_comprehension()

        # Task 2: Few-shot sentiment analysis
        results["sentiment_analysis"] = self._evaluate_sentiment_analysis()

        # Task 3: Common sense reasoning
        results["reasoning"] = self._evaluate_reasoning()

        # Task 4: Text completion
        results["text_completion"] = self._evaluate_text_completion()

        return results

    def _evaluate_reading_comprehension(self) -> Dict[str, Any]:
        """Simplified reading comprehension evaluation."""
        # Small built-in set of context/question/answer triples
        tasks = [
            {
                "context": "The Eiffel Tower is a wrought-iron lattice tower on the Champ de Mars in Paris, France. It is named after the engineer Gustave Eiffel, whose company designed and built the tower.",
                "question": "Who is the Eiffel Tower named after?",
                "expected": "Gustave Eiffel",
            },
            {
                "context": "Python is a high-level programming language. It was created by Guido van Rossum and first released in 1991.",
                "question": "When was Python first released?",
                "expected": "1991",
            },
            {
                "context": "Machine learning is a subset of artificial intelligence that enables computers to learn without being explicitly programmed.",
                "question": "What is machine learning a subset of?",
                "expected": "artificial intelligence",
            },
        ]

        correct = 0
        total = len(tasks)

        for task in tasks:
            prompt = f"Context: {task['context']}\nQuestion: {task['question']}\nAnswer:"

            # Generate a short, low-temperature answer
            input_ids = self.tokenizer.encode(prompt)
            input_tensor = torch.tensor([input_ids], dtype=torch.long, device=self.device)

            with torch.no_grad():
                output = self.model.generate(input_tensor, max_new_tokens=20, temperature=0.1)

            generated_ids = output[0].tolist()
            answer = self.tokenizer.decode(generated_ids[len(input_ids) :]).strip().lower()

            # Credit the answer if it contains the expected span
            if task["expected"].lower() in answer:
                correct += 1

        return {
            "accuracy": correct / total,
            "correct": correct,
            "total": total,
            "score": correct / total,
        }

    def _evaluate_sentiment_analysis(self) -> Dict[str, Any]:
        """Few-shot sentiment analysis evaluation."""
        # Few-shot examples prepended to every test prompt
        examples = "Examples:\nText: 'I love this movie!' Sentiment: Positive\nText: 'This is terrible.' Sentiment: Negative\nText: 'It was okay.' Sentiment: Neutral\n\n"

        test_cases = [
            {"text": "This is amazing!", "expected": "positive"},
            {"text": "I hate this.", "expected": "negative"},
            {"text": "This is wonderful.", "expected": "positive"},
            {"text": "This is awful.", "expected": "negative"},
            {"text": "It was fine.", "expected": "neutral"},
        ]

        correct = 0
        total = len(test_cases)

        for case in test_cases:
            prompt = f"{examples}Text: '{case['text']}' Sentiment:"

            # Generate a short, low-temperature label
            input_ids = self.tokenizer.encode(prompt)
            input_tensor = torch.tensor([input_ids], dtype=torch.long, device=self.device)

            with torch.no_grad():
                output = self.model.generate(input_tensor, max_new_tokens=5, temperature=0.1)

            generated_ids = output[0].tolist()
            sentiment = self.tokenizer.decode(generated_ids[len(input_ids) :]).strip().lower()

            # Credit the prediction if it contains the expected label
            if case["expected"] in sentiment:
                correct += 1

        return {
            "accuracy": correct / total,
            "correct": correct,
            "total": total,
            "score": correct / total,
        }

    def _evaluate_reasoning(self) -> Dict[str, Any]:
        """Simple reasoning evaluation."""
        tasks = [
            {
                "question": "If all birds can fly and a penguin is a bird, can a penguin fly?",
                "expected": "no",
            },
            {
                "question": "If it is raining outside, should you take an umbrella?",
                "expected": "yes",
            },
            {"question": "What comes after Monday?", "expected": "tuesday"},
            {"question": "Is the sun larger than the earth?", "expected": "yes"},
        ]

        correct = 0
        total = len(tasks)

        for task in tasks:
            prompt = f"Question: {task['question']}\nAnswer:"

            # Generate a short, low-temperature answer
            input_ids = self.tokenizer.encode(prompt)
            input_tensor = torch.tensor([input_ids], dtype=torch.long, device=self.device)

            with torch.no_grad():
                output = self.model.generate(input_tensor, max_new_tokens=10, temperature=0.1)

            generated_ids = output[0].tolist()
            answer = self.tokenizer.decode(generated_ids[len(input_ids) :]).strip().lower()

            # Credit the answer if it contains the expected keyword
            if task["expected"] in answer:
                correct += 1

        return {
            "accuracy": correct / total,
            "correct": correct,
            "total": total,
            "score": correct / total,
        }

    def _evaluate_text_completion(self) -> Dict[str, Any]:
        """Evaluate text completion quality."""
        completions = [
            {"prompt": "The capital of France is", "expected_word": "paris"},
            {"prompt": "Two plus two equals", "expected_word": "four"},
            {"prompt": "The largest planet in our solar system is", "expected_word": "jupiter"},
            {"prompt": "Water boils at", "expected_word": "100"},
        ]

        correct = 0
        total = len(completions)

        for completion in completions:
            # Generate a short, low-temperature completion
            input_ids = self.tokenizer.encode(completion["prompt"])
            input_tensor = torch.tensor([input_ids], dtype=torch.long, device=self.device)

            with torch.no_grad():
                output = self.model.generate(input_tensor, max_new_tokens=5, temperature=0.1)

            generated_ids = output[0].tolist()
            generated_text = self.tokenizer.decode(generated_ids[len(input_ids) :]).strip().lower()

            # Credit the completion if it contains the expected word
            if completion["expected_word"] in generated_text:
                correct += 1

        return {
            "accuracy": correct / total,
            "correct": correct,
            "total": total,
            "score": correct / total,
        }

    def run_comprehensive_evaluation(
        self,
        eval_data_path: str,
        metrics: Optional[List[str]] = None,
        generation_prompts: Optional[List[str]] = None,
    ) -> Dict[str, Any]:
        """
        Run comprehensive model evaluation.

        Args:
            eval_data_path: Path to evaluation text file
            metrics: List of metrics to compute
            generation_prompts: Prompts for text generation evaluation

        Returns:
            Complete evaluation results
        """
        if metrics is None:
            metrics = ["perplexity", "loss", "generation"]

        if generation_prompts is None:
            generation_prompts = [
                "The history of artificial intelligence",
                "Machine learning algorithms",
                "The future of technology",
                "In a world where",
                "Scientists have discovered",
            ]

        results = {
            "model_info": {
                "parameters": self.model.get_num_params(),
                "device": self.device,
                "vocab_size": self.tokenizer.vocab_size(),
            },
            "evaluation_timestamp": time.time(),
        }

        # Load evaluation data, falling back to sample texts if the file is missing
        print(f"📂 Loading evaluation data from {eval_data_path}")
        if os.path.exists(eval_data_path):
            with open(eval_data_path, "r", encoding="utf-8") as f:
                eval_texts = [line.strip() for line in f if line.strip()]
        else:
            print("⚠️ Evaluation file not found, using sample texts")
            eval_texts = [
                "Artificial intelligence is a rapidly growing field of computer science.",
                "Machine learning algorithms can learn patterns from data automatically.",
                "Natural language processing helps computers understand human language.",
                "Deep learning uses neural networks with multiple layers for complex tasks.",
                "The development of large language models has transformed AI applications.",
            ]

        # Intrinsic evaluation: perplexity and loss on the evaluation texts
        if "perplexity" in metrics or "loss" in metrics:
            perplexity_results = self.evaluate_perplexity(eval_texts)
            results["intrinsic_evaluation"] = perplexity_results

        # Text generation evaluation
        if "generation" in metrics:
            generation_results = self.evaluate_text_generation(generation_prompts)
            results["generation_evaluation"] = {
                "results": generation_results,
                "summary": self._summarize_generation_results(generation_results),
            }

        # Downstream task evaluation
        results["downstream_evaluation"] = self.evaluate_downstream_tasks()

        # Overall quality assessment and recommendations
        results["quality_assessment"] = self._assess_overall_quality(results)

        return results

    def _summarize_generation_results(self, results: List[Dict[str, Any]]) -> Dict[str, float]:
        """Summarize text generation results."""
        if not results:
            return {}

        total_time = sum(r["generation_time"] for r in results)
        total_tokens = sum(r["tokens_generated"] for r in results)

        quality_metrics = [r["quality_metrics"] for r in results]

        return {
            "avg_generation_time": total_time / len(results),
            "avg_tokens_per_second": total_tokens / total_time if total_time > 0 else 0,
            "avg_length": sum(q["length"] for q in quality_metrics) / len(quality_metrics),
            "avg_repetition_rate": sum(q["repetition_rate"] for q in quality_metrics)
            / len(quality_metrics),
            "avg_coherence_score": sum(q["coherence_score"] for q in quality_metrics)
            / len(quality_metrics),
        }

    def _assess_overall_quality(self, results: Dict[str, Any]) -> Dict[str, Any]:
        """Assess overall model quality based on evaluation results."""
        assessment = {"quality_level": "unknown", "recommendations": []}

        # Grade perplexity against rough quality thresholds
        if "intrinsic_evaluation" in results:
            perplexity = results["intrinsic_evaluation"].get("perplexity", float("inf"))

            if perplexity < 12:
                assessment["quality_level"] = "good"
                assessment["recommendations"].append("Model shows good perplexity scores")
            elif perplexity < 50:
                assessment["quality_level"] = "fair"
                assessment["recommendations"].append(
                    "Model shows fair performance, could benefit from more training"
                )
            else:
                assessment["quality_level"] = "poor"
                assessment["recommendations"].append(
                    "Model needs significantly more training or data improvements"
                )

        # Flag generation issues such as high repetition or low coherence
        if "generation_evaluation" in results:
            summary = results["generation_evaluation"].get("summary", {})
            repetition_rate = summary.get("avg_repetition_rate", 1.0)
            coherence_score = summary.get("avg_coherence_score", 0.0)

            if repetition_rate > 0.7:
                assessment["recommendations"].append(
                    "High repetition rate - consider training longer or adjusting data"
                )
            if coherence_score < 0.3:
                assessment["recommendations"].append(
                    "Low coherence - model may need more training steps"
                )

        return assessment


def load_model_from_directory(model_dir: str, device: str = "cpu") -> Tuple[GPTModel, str]:
    """
    Load model from directory containing checkpoints.

    Args:
        model_dir: Directory containing model files
        device: Device to load model on

    Returns:
        Tuple of (model, tokenizer_path)
    """
    model_dir = Path(model_dir)

    best_model_path = model_dir / "best_model.pt"
    if not best_model_path.exists():
        # Fall back to the most recent training checkpoint
        checkpoints = list(model_dir.glob("checkpoint_step_*.pt"))
        if not checkpoints:
            raise FileNotFoundError(f"No model checkpoints found in {model_dir}")

        # Pick the checkpoint with the highest step number
        latest_checkpoint = max(checkpoints, key=lambda p: int(p.stem.split("_")[-1]))
        best_model_path = latest_checkpoint

    print(f"📂 Loading model from {best_model_path}")

    # Load the checkpoint onto the requested device
    checkpoint = torch.load(best_model_path, map_location=device)

    # Infer the model size from the layer count stored in the checkpoint config
    config = checkpoint.get("config", {})
    n_layer = config.get("n_layer", 12)

    if n_layer <= 6:
        model_size = "small"
    elif n_layer <= 12:
        model_size = "medium"
    else:
        model_size = "large"

    # Rebuild the architecture and restore the trained weights
    model = create_model(model_size)
    model.load_state_dict(checkpoint["model_state_dict"])

    print(f"✅ Model loaded successfully ({model_size}, {model.get_num_params():,} parameters)")

    # Locate the tokenizer next to the model directory, falling back to the default path
    tokenizer_path = model_dir.parent / "tokenizer" / "tokenizer.model"
    if not tokenizer_path.exists():
        tokenizer_path = Path("data/tokenizer/tokenizer.model")

    if not tokenizer_path.exists():
        raise FileNotFoundError(f"Tokenizer not found at {tokenizer_path}")

    return model, str(tokenizer_path)
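

# Example programmatic usage (paths are illustrative; adjust to your setup):
#
#     model, tokenizer_path = load_model_from_directory("models/openllm-medium")
#     evaluator = ModelEvaluator(model, tokenizer_path, device="cpu")
#     results = evaluator.run_comprehensive_evaluation("data/clean/validation_data.txt")
#     print(results["intrinsic_evaluation"]["perplexity"])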


def main():
    """Main evaluation function."""
    parser = argparse.ArgumentParser(
        description="Evaluate OpenLLM model performance",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Basic evaluation
  python core/src/evaluate_model.py \\
      --model_dir models/small-extended-4k \\
      --eval_data data/clean/training_data.txt

  # Specific metrics
  python core/src/evaluate_model.py \\
      --model_dir models/small-extended-4k \\
      --metrics perplexity,generation \\
      --output results.json
""",
    )

    parser.add_argument("--model_dir", required=True, help="Directory containing trained model")

    parser.add_argument(
        "--eval_data", help="Path to evaluation text file (default: use sample texts)"
    )

    parser.add_argument(
        "--metrics",
        default="perplexity,loss,generation",
        help="Comma-separated list of metrics to evaluate (default: perplexity,loss,generation)",
    )

    parser.add_argument("--output", help="Output JSON file for results (default: print to console)")

    parser.add_argument(
        "--device",
        choices=["cpu", "cuda", "auto"],
        default="auto",
        help="Device for evaluation (default: auto)",
    )

    parser.add_argument(
        "--generation_prompts", help="File containing prompts for text generation evaluation"
    )

    args = parser.parse_args()

| print("π OpenLLM Model Evaluation") |
| print("=" * 50) |
|
|
| |
| if args.device == "auto": |
| device = "cuda" if torch.cuda.is_available() else "cpu" |
| else: |
| device = args.device |
|
|
| print(f"Using device: {device}") |
|
|
| try: |
| |
| model, tokenizer_path = load_model_from_directory(args.model_dir, device) |
|
|
| |
| evaluator = ModelEvaluator(model, tokenizer_path, device) |
|
|
| |
| metrics = [m.strip() for m in args.metrics.split(",")] |
|
|
| |
| generation_prompts = None |
| if args.generation_prompts and os.path.exists(args.generation_prompts): |
| with open(args.generation_prompts, "r", encoding="utf-8") as f: |
| generation_prompts = [line.strip() for line in f if line.strip()] |
|
|
| |
| eval_data_path = args.eval_data or "data/clean/training_data.txt" |
| results = evaluator.run_comprehensive_evaluation( |
| eval_data_path, metrics, generation_prompts |
| ) |
|
|
| |
| if args.output: |
| with open(args.output, "w", encoding="utf-8") as f: |
| json.dump(results, f, indent=2) |
| print(f"\nπΎ Results saved to {args.output}") |
| else: |
| print("\nπ Evaluation Results:") |
| print("=" * 50) |
|
|
| |
| if "intrinsic_evaluation" in results: |
| intrinsic = results["intrinsic_evaluation"] |
| print("π Intrinsic Metrics:") |
| print(f" Loss: {intrinsic['loss']:.4f}") |
| print(f" Perplexity: {intrinsic['perplexity']:.2f}") |
| print(f" Sequences evaluated: {intrinsic['num_sequences']:,}") |
|
|
| if "generation_evaluation" in results: |
| gen_summary = results["generation_evaluation"]["summary"] |
| print("\nβοΈ Generation Quality:") |
| print( |
| f" Avg generation speed: {gen_summary['avg_tokens_per_second']:.1f} tokens/sec" |
| ) |
| print(f" Avg text length: {gen_summary['avg_length']:.1f} words") |
| print(f" Repetition rate: {gen_summary['avg_repetition_rate']:.3f}") |
| print(f" Coherence score: {gen_summary['avg_coherence_score']:.3f}") |
|
|
| |
| if "quality_assessment" in results: |
| assessment = results["quality_assessment"] |
| print("\nπ― Overall Assessment:") |
| print(f" Quality Level: {assessment['quality_level'].upper()}") |
| for rec in assessment["recommendations"]: |
| print(f" β’ {rec}") |
|
|
| print("\nπ Evaluation completed successfully!") |
|
|
| except Exception as e: |
| print(f"\nβ Evaluation failed: {e}") |
| import traceback |
|
|
| traceback.print_exc() |
| return False |
|
|
| return True |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|