| import sys |
| from pathlib import Path |
|
|
# Import-time side effect: prepend the directory three levels above this file
# to sys.path. Presumably this lets the `codsworth` imports below resolve when
# the package is not installed -- TODO confirm against the repository layout.
| sys.path.insert(0, str(Path(__file__).parent.parent.parent)) |
|
|
| import logging |
| from typing import Optional, Callable |
|
|
| import torch |
| import torch.nn as nn |
| from torch.utils.data import DataLoader |
|
|
| from codsworth.config import CodsworthConfig |
| from codsworth.model import CodsworthTransformer |
| from codsworth.utils import get_device, AverageMeter |
| from codsworth.eval.perplexity import Perplexity |
|
|
|
|
# Module-level logger. It uses the fixed name "codsworth" rather than __name__,
# so it is the same logger object as any other module requesting that name.
| logger = logging.getLogger("codsworth") |
|
|
|
|
class Evaluator:
    """Evaluator for Codsworth model.

    Wraps a model and tokenizer and offers loss/perplexity evaluation over a
    dataloader, perplexity over raw texts, text generation from a prompt, and
    a side-by-side comparison with a second model.

    On construction the model is moved to ``device`` and put in eval mode.
    """

    def __init__(
        self,
        model: "CodsworthTransformer",
        tokenizer,
        config: Optional["CodsworthConfig"] = None,
        device: Optional[torch.device] = None,
    ):
        """Store the collaborators and prepare the model for inference.

        Args:
            model: Model to evaluate; moved to ``device`` and set to eval mode.
            tokenizer: Tokenizer used here through ``encode``, ``decode``,
                ``pad_id`` and ``eos_id``.
            config: Optional configuration; stored but not read by this class.
            device: Target device. When omitted (or falsy), ``get_device()``
                chooses one.
        """
        self.model = model
        self.tokenizer = tokenizer
        self.config = config
        self.device = device or get_device()

        self.model = self.model.to(self.device)
        self.model.eval()

        # Kept as a public attribute; no method of this class reads it.
        self.perplexity_metric = Perplexity()

    def evaluate(
        self,
        dataloader: DataLoader,
        max_batches: Optional[int] = None,
    ) -> dict:
        """Run the model over ``dataloader`` and summarise loss and perplexity.

        Each batch must be a mapping with ``"input_ids"`` and ``"labels"``
        tensors, and the model output must expose a scalar ``"loss"``.

        Args:
            dataloader: Source of evaluation batches.
            max_batches: Stop after this many batches; ``None`` means no limit.

        Returns:
            A dict with:
              * ``"loss"`` -- ``AverageMeter`` average of the per-batch losses.
              * ``"perplexity"`` -- ``AverageMeter`` average of the per-batch
                ``exp(loss)`` values.
              * ``"avg_perplexity"`` -- ``exp`` of the batch losses weighted by
                their non-pad label counts (this assumes the model's loss is a
                mean over non-pad tokens -- TODO confirm).
              * ``"total_tokens"`` -- number of label positions that differ
                from ``tokenizer.pad_id``.
        """
        self.model.eval()

        loss_meter = AverageMeter("loss")
        perplexity_meter = AverageMeter("perplexity")

        total_tokens = 0
        total_loss = 0.0

        with torch.no_grad():
            for batch_idx, batch in enumerate(dataloader):
                if max_batches is not None and batch_idx >= max_batches:
                    break

                input_ids = batch["input_ids"].to(self.device)
                labels = batch["labels"].to(self.device)

                outputs = self.model(input_ids=input_ids, labels=labels)

                loss = outputs["loss"]
                # Read the scalar once; every .item() forces a device sync.
                loss_value = loss.item()
                loss_meter.update(loss_value)
                perplexity_meter.update(torch.exp(loss).item())

                batch_tokens = (labels != self.tokenizer.pad_id).sum().item()
                total_tokens += batch_tokens
                total_loss += loss_value * batch_tokens

        # max(1, ...) keeps an empty dataloader from dividing by zero.
        avg_loss = total_loss / max(1, total_tokens)
        avg_perplexity = torch.exp(torch.tensor(avg_loss)).item()

        results = {
            "loss": loss_meter.avg,
            "perplexity": perplexity_meter.avg,
            "avg_perplexity": avg_perplexity,
            "total_tokens": total_tokens,
        }

        logger.info("Evaluation Results:")
        logger.info(f" Loss: {results['loss']:.4f}")
        logger.info(f" Perplexity: {results['perplexity']:.4f}")

        return results

    def evaluate_generation(
        self,
        prompt: str,
        max_new_tokens: int = 100,
        temperature: float = 1.0,
        top_k: Optional[int] = None,
        top_p: Optional[float] = None,
    ) -> str:
        """Generate a continuation of ``prompt`` and return the decoded text.

        The prompt is encoded with a BOS token and without an EOS token, and
        generation is delegated to ``model.generate`` with the tokenizer's
        ``eos_id`` as the end-of-sequence token.

        Args:
            prompt: Text to condition on.
            max_new_tokens: Forwarded to ``model.generate``.
            temperature: Forwarded to ``model.generate``.
            top_k: Forwarded to ``model.generate``.
            top_p: Forwarded to ``model.generate``.

        Returns:
            The first generated sequence, decoded with special tokens removed.
        """
        self.model.eval()

        prompt_ids = self.tokenizer.encode(
            prompt,
            add_bos=True,
            add_eos=False,
        )
        # Wrap in a list to add the batch dimension (batch size 1).
        input_ids = torch.tensor(
            [prompt_ids], dtype=torch.long, device=self.device
        )

        with torch.no_grad():
            generated = self.model.generate(
                input_ids=input_ids,
                max_new_tokens=max_new_tokens,
                temperature=temperature,
                top_k=top_k,
                top_p=top_p,
                eos_token_id=self.tokenizer.eos_id,
            )

        return self.tokenizer.decode(
            generated[0],
            remove_special_tokens=True,
        )

    def evaluate_perplexity(
        self,
        texts: list[str],
        context_length: int = 2048,
    ) -> float:
        """Compute perplexity over raw texts, chunked to ``context_length``.

        Each text is encoded without BOS/EOS and cut into consecutive,
        non-overlapping windows. For a window starting at ``start`` the labels
        are the tokens one position to the right of the inputs. The final
        window of a text has one fewer label than tokens, so its inputs are
        trimmed to the number of labels; the model always receives input and
        label tensors of the same length.

        Args:
            texts: Texts to score.
            context_length: Maximum number of input tokens per forward pass.

        Returns:
            ``exp`` of the chunk losses weighted by their non-pad label
            counts, or ``float("inf")`` when no label was scored.

        Raises:
            ValueError: If ``context_length`` is smaller than 1.
        """
        if context_length < 1:
            raise ValueError(
                f"context_length must be >= 1, got {context_length}"
            )

        self.model.eval()

        pad_id = self.tokenizer.pad_id
        total_loss = 0.0
        total_tokens = 0

        with torch.no_grad():
            for text in texts:
                token_ids = list(
                    self.tokenizer.encode(
                        text,
                        add_bos=False,
                        add_eos=False,
                    )
                )

                for start in range(0, len(token_ids), context_length):
                    # Python clamps the slice at the end of the sequence, so
                    # the last window of a text yields fewer labels.
                    targets = token_ids[start + 1 : start + context_length + 1]
                    if not targets:
                        # A lone trailing token has nothing to predict.
                        continue
                    # Keep inputs and labels the same length.
                    inputs = token_ids[start : start + len(targets)]

                    input_tensor = torch.tensor(
                        [inputs], dtype=torch.long, device=self.device
                    )
                    labels_tensor = torch.tensor(
                        [targets], dtype=torch.long, device=self.device
                    )

                    outputs = self.model(
                        input_ids=input_tensor, labels=labels_tensor
                    )

                    chunk_loss = outputs["loss"].item()
                    num_valid_tokens = sum(1 for tok in targets if tok != pad_id)

                    total_loss += chunk_loss * num_valid_tokens
                    total_tokens += num_valid_tokens

        if total_tokens == 0:
            return float("inf")

        avg_loss = total_loss / total_tokens
        return torch.exp(torch.tensor(avg_loss)).item()

    def compare_models(
        self,
        other_model: "CodsworthTransformer",
        dataloader: DataLoader,
        max_batches: Optional[int] = None,
    ) -> dict:
        """Evaluate this model and ``other_model`` on the same dataloader.

        ``other_model`` is wrapped in a fresh ``Evaluator`` that shares this
        instance's tokenizer, config and device, so it is also moved to that
        device and put in eval mode. The dataloader is iterated twice.

        Args:
            other_model: Model to compare against.
            dataloader: Evaluation batches, consumed once per model.
            max_batches: Per-model batch limit; ``None`` means no limit.

        Returns:
            A dict with the two ``evaluate`` results under ``"model_1"`` (this
            model) and ``"model_2"`` (``other_model``), plus ``"difference"``
            holding ``model_1 - model_2`` for ``"loss"`` and ``"perplexity"``.
        """
        self_results = self.evaluate(dataloader, max_batches)

        other_evaluator = Evaluator(
            other_model,
            self.tokenizer,
            self.config,
            self.device,
        )
        other_results = other_evaluator.evaluate(dataloader, max_batches)

        return {
            "model_1": self_results,
            "model_2": other_results,
            "difference": {
                "loss": self_results["loss"] - other_results["loss"],
                "perplexity": (
                    self_results["perplexity"] - other_results["perplexity"]
                ),
            },
        }
|
|
|
|
def evaluate(
    model: CodsworthTransformer,
    tokenizer,
    dataloader: DataLoader,
    device: Optional[torch.device] = None,
    max_batches: Optional[int] = None,
) -> dict:
    """Evaluate ``model`` on ``dataloader`` in a single call.

    Convenience wrapper that builds a throwaway ``Evaluator`` (without a
    config) and returns the result of its ``evaluate`` method.

    Args:
        model: Model to evaluate.
        tokenizer: Tokenizer handed to the ``Evaluator``.
        dataloader: Source of evaluation batches.
        device: Optional target device, forwarded to the ``Evaluator``.
        max_batches: Optional cap on the number of batches processed.

    Returns:
        The results dict produced by ``Evaluator.evaluate``.
    """
    return Evaluator(model, tokenizer, device=device).evaluate(
        dataloader, max_batches
    )