| """Additional Utility Metrics"""
|
|
|
| import logging
|
| from typing import Any, Dict, List, Optional
|
|
|
| import numpy as np
|
| import torch
|
| from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
|
|
|
| logger = logging.getLogger(__name__)
|
|
|
|
|
def compute_all_metrics(
    predictions: np.ndarray,
    references: np.ndarray,
    task: str = "classification",
) -> Dict[str, float]:
    """Route a predictions/references pair to the metric routine for ``task``.

    Args:
        predictions: Model outputs; forwarded unchanged to the selected routine.
        references: Ground-truth values; forwarded unchanged.
        task: Name of the metric family. Recognised values are
            ``"classification"``, ``"regression"``, ``"code_generation"``
            and ``"reasoning"``.

    Returns:
        Whatever metric dictionary the selected routine produces.

    Raises:
        ValueError: If ``task`` is not one of the recognised names.
    """
    # Guard-clause dispatch: each recognised task returns immediately, so
    # falling off the end can only mean the task name was not recognised.
    if task == "classification":
        return compute_classification_metrics(predictions, references)
    if task == "regression":
        return compute_regression_metrics(predictions, references)
    if task == "code_generation":
        return compute_code_metrics(predictions, references)
    if task == "reasoning":
        return compute_reasoning_metrics(predictions, references)
    raise ValueError(f"Unknown task: {task}")
|
|
|
|
|
def compute_classification_metrics(
    predictions: np.ndarray,
    references: np.ndarray,
    average: str = "macro",
) -> Dict[str, float]:
    """Score predicted labels against reference labels with scikit-learn.

    Args:
        predictions: Predicted labels.
        references: Ground-truth labels.
        average: Averaging mode handed to the F1, precision and recall
            scorers (accuracy does not take it).

    Returns:
        A dictionary with the keys ``"accuracy"``, ``"f1"``, ``"precision"``
        and ``"recall"``.
    """
    from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

    # The three averaged scorers share the same options; zero_division=0
    # makes an undefined score come back as 0.
    scorer_options = {"average": average, "zero_division": 0}

    return {
        "accuracy": accuracy_score(references, predictions),
        "f1": f1_score(references, predictions, **scorer_options),
        "precision": precision_score(references, predictions, **scorer_options),
        "recall": recall_score(references, predictions, **scorer_options),
    }
|
|
|
|
|
def compute_regression_metrics(
    predictions: np.ndarray,
    references: np.ndarray,
) -> Dict[str, float]:
    """Score predicted values against reference values.

    Args:
        predictions: Predicted values.
        references: Ground-truth values.

    Returns:
        A dictionary with the keys ``"mae"``, ``"mse"``, ``"rmse"`` (the
        square root of ``"mse"``) and ``"r2"``.
    """
    metrics = {"mae": mean_absolute_error(references, predictions)}

    squared_error = mean_squared_error(references, predictions)
    metrics["mse"] = squared_error
    # RMSE is derived from the MSE already computed rather than re-scored.
    metrics["rmse"] = np.sqrt(squared_error)

    metrics["r2"] = r2_score(references, predictions)
    return metrics
|
|
|
|
|
def compute_code_metrics(
    predictions: List[str],
    references: List[str],
) -> Dict[str, float]:
    """Compute exact-match rate and corpus-level BLEU for generated code.

    Exact match compares each prediction with the reference at the same
    position after stripping surrounding whitespace. Pairs are formed with
    ``zip``, so extra items in the longer list are not compared, while the
    rate is always divided by ``len(predictions)``.

    BLEU is computed with NLTK's ``corpus_bleu`` on whitespace-split tokens,
    using each reference as the single reference for its prediction. NLTK is
    optional: when it cannot be imported, BLEU is reported as ``0.0`` and a
    warning is logged.

    Args:
        predictions: Generated code strings.
        references: Reference code strings, aligned with ``predictions``.

    Returns:
        A dictionary with the keys ``"exact_match"`` and ``"bleu"``, both
        Python floats. Both are ``0.0`` when ``predictions`` is empty.
    """
    # Nothing to score. Return early so an empty corpus is never handed to
    # corpus_bleu, for which BLEU is undefined (NLTK may raise on it).
    if not predictions:
        return {"exact_match": 0.0, "bleu": 0.0}

    exact_matches = sum(p.strip() == r.strip() for p, r in zip(predictions, references))
    exact_match_rate = exact_matches / len(predictions)

    # Keep the try body to the import alone so that only a missing NLTK
    # installation triggers the fallback, never an error raised while scoring.
    try:
        from nltk.translate.bleu_score import corpus_bleu
    except ImportError:
        # Best-effort metric: degrade to 0.0, but say so instead of silently
        # reporting a score that looks like a real (terrible) BLEU.
        logging.getLogger(__name__).warning(
            "nltk is not installed; reporting BLEU as 0.0"
        )
        bleu = 0.0
    else:
        hypothesis_tokens = [p.split() for p in predictions]
        # corpus_bleu expects a list of reference lists per hypothesis.
        reference_tokens = [[r.split()] for r in references]
        # corpus_bleu can return the int 0; normalise to float.
        bleu = float(corpus_bleu(reference_tokens, hypothesis_tokens))

    return {
        "exact_match": exact_match_rate,
        "bleu": bleu,
    }
|
|
|
|
|
def compute_reasoning_metrics(
    predictions: List[str],
    references: List[str],
    steps_predictions: Optional[List[List[str]]] = None,
    steps_references: Optional[List[List[str]]] = None,
) -> Dict[str, float]:
    """Score final answers and, optionally, intermediate reasoning steps.

    Args:
        predictions: Predicted final answers.
        references: Reference final answers, aligned with ``predictions``.
        steps_predictions: Optional per-example lists of predicted steps.
        steps_references: Optional per-example lists of reference steps.

    Returns:
        A dictionary with two keys. ``"exact_match"`` is the share of
        predictions equal to their reference after stripping whitespace
        (``0.0`` for empty ``predictions``). ``"step_accuracy"`` is the mean,
        over examples with a non-empty reference step set, of
        ``|pred ∩ ref| / |pred ∪ ref|`` on the step sets; it is ``0.0`` when
        either step list is missing/empty or no example qualifies.
    """
    # Final-answer exact match, whitespace-insensitive at both ends.
    matched = 0
    for answer, target in zip(predictions, references):
        if answer.strip() == target.strip():
            matched += 1
    exact_match_rate = matched / len(predictions) if predictions else 0.0

    # Per-example set overlap of reasoning steps (order and duplicates ignored).
    overlaps = []
    if steps_predictions and steps_references:
        for produced, expected in zip(steps_predictions, steps_references):
            produced_steps = set(produced)
            expected_steps = set(expected)
            if not expected_steps:
                # Examples without reference steps do not contribute a score.
                continue
            shared = produced_steps & expected_steps
            combined = produced_steps | expected_steps
            overlaps.append(len(shared) / len(combined))

    step_accuracy = np.mean(overlaps) if overlaps else 0.0

    return {
        "exact_match": exact_match_rate,
        "step_accuracy": step_accuracy,
    }
|
|
|
|
|
def compute_perplexity_from_loss(loss: float) -> float:
    """Convert a loss value to perplexity, i.e. return ``exp(loss)``.

    Args:
        loss: Scalar loss value.

    Returns:
        ``exp(loss)`` as a Python float; ``inf`` once the result no longer
        fits in double precision.
    """
    # Evaluate in float64. A default (float32) tensor rounds the result to
    # about seven significant digits and already overflows to inf for losses
    # above roughly 88.7, which is well within reach of a diverging run.
    loss_tensor = torch.tensor(float(loss), dtype=torch.float64)
    return float(torch.exp(loss_tensor).item())
|
|
|
|
|
def compute_parameter_count(model: torch.nn.Module) -> Dict[str, int]:
    """Count a model's parameters overall and per module class name.

    Args:
        model: Module whose parameters are counted.

    Returns:
        A dictionary with ``"total"``, ``"trainable"`` (parameters with
        ``requires_grad`` set) and ``"frozen"`` (the difference) as ints, plus
        ``"by_module"``, a nested dictionary mapping each module class name to
        a parameter count. The per-class figures are inclusive: every module,
        the root included, contributes all parameters found beneath it, so
        containers repeat the parameters of their children.
    """
    total_params = 0
    trainable_params = 0
    for param in model.parameters():
        size = param.numel()
        total_params += size
        if param.requires_grad:
            trainable_params += size

    # Walk every module in the tree and accumulate its subtree size under
    # the module's class name.
    module_counts = {}
    for module in model.modules():
        class_name = type(module).__name__
        subtree_size = sum(p.numel() for p in module.parameters())
        module_counts[class_name] = module_counts.get(class_name, 0) + subtree_size

    return {
        "total": total_params,
        "trainable": trainable_params,
        "frozen": total_params - trainable_params,
        "by_module": module_counts,
    }
|
|
|
|
|
def compute_flops(
    model: torch.nn.Module,
    input_shape: tuple,
    forward_pass: bool = True,
) -> Dict[str, float]:
    """Estimate FLOPs from the parameter count and the input size.

    The estimate is ``2 * parameter_count`` FLOPs per token, multiplied by
    the number of tokens (``input_shape[0] * input_shape[1]``).

    Args:
        model: Module whose parameters are counted.
        input_shape: Shape whose first two entries are read as batch size and
            sequence length; any further entries are ignored.
        forward_pass: Accepted but not used by the current implementation.

    Returns:
        A dictionary with ``"total_flops"``, ``"flops_per_token"`` and
        ``"gflops"`` (``total_flops / 1e9``).
    """
    param_total = 0
    for param in model.parameters():
        param_total += param.numel()

    batch_size = input_shape[0]
    seq_len = input_shape[1]

    per_token = 2 * param_total
    overall = per_token * seq_len * batch_size

    return {
        "total_flops": overall,
        "flops_per_token": per_token,
        "gflops": overall / 1e9,
    }
|
|
|
|
|
def compute_memory_usage(
    model: torch.nn.Module,
    batch_size: int,
    seq_len: int,
    dtype: str = "bfloat16",
) -> Dict[str, float]:
    """Estimate memory for parameters, activations and gradients, in GB.

    All figures use 1 GB = 1e9 bytes and the same per-element width for every
    component:

    * parameters: ``parameter_count * bytes_per_element``
    * activations: ``batch_size * seq_len * d_model * 2 * num_layers *
      bytes_per_element``
    * gradients: taken to be the same size as the parameters

    ``d_model`` and ``num_layers`` are read from ``model.config.d_model`` and
    ``model.config.num_hidden_layers``; when the model has no ``config`` or
    the attribute is missing, 2048 and 24 are used instead.

    Args:
        model: Module whose parameters are counted.
        batch_size: Batch size used for the activation estimate.
        seq_len: Sequence length used for the activation estimate.
        dtype: One of ``"float32"``, ``"float16"``, ``"bfloat16"``,
            ``"int8"`` or ``"int4"``.

    Returns:
        A dictionary with ``"parameters_gb"``, ``"activations_gb"``,
        ``"gradients_gb"`` and ``"total_gb"`` (their sum).

    Raises:
        KeyError: If ``dtype`` is not one of the supported names.
    """
    bytes_per_element = {
        "float32": 4,
        "float16": 2,
        "bfloat16": 2,
        "int8": 1,
        "int4": 0.5,
    }
    param_bytes = bytes_per_element[dtype]

    param_memory = sum(p.numel() for p in model.parameters()) * param_bytes / 1e9

    # Plain nn.Modules carry no ``config``. Looking it up defensively lets
    # them use the same fallback sizes that a config lacking these attributes
    # already gets, instead of failing with AttributeError.
    config = getattr(model, "config", None)
    d_model = getattr(config, "d_model", 2048)
    num_layers = getattr(config, "num_hidden_layers", 24)
    activation_memory = batch_size * seq_len * d_model * 2 * num_layers * param_bytes / 1e9

    # One gradient element per parameter element, at the same width.
    gradient_memory = param_memory

    total_memory = param_memory + activation_memory + gradient_memory

    return {
        "parameters_gb": param_memory,
        "activations_gb": activation_memory,
        "gradients_gb": gradient_memory,
        "total_gb": total_memory,
    }
|
|
|
|
|
def track_gradient_norms(
    model: torch.nn.Module,
    norm_type: float = 2.0,
) -> Dict[str, float]:
    """Compute per-parameter and overall gradient norms for debugging.

    Parameters whose ``grad`` is ``None`` are skipped.

    Args:
        model: Module whose parameter gradients are inspected.
        norm_type: Order ``p`` of the norm. ``float("inf")`` selects the
            maximum-magnitude norm.

    Returns:
        A dictionary with ``"total_grad_norm"``, the ``p``-norm over all
        gradients taken together, and ``"param_grad_norms"``, a nested
        dictionary mapping each parameter name to the norm of its own
        gradient. With no gradients present the total is ``0.0``.
    """
    param_norms = {}
    for name, param in model.named_parameters():
        if param.grad is None:
            continue
        param_norms[name] = param.grad.detach().norm(norm_type).item()

    if norm_type == float("inf"):
        # The sum-of-powers formula degenerates for p = inf (the final
        # exponent 1/p is 0, so any total would collapse to 1.0). The
        # infinity norm of the whole gradient is the largest per-tensor one.
        total_norm = max(param_norms.values(), default=0.0)
    else:
        # ||g||_p = (sum_i ||g_i||_p ** p) ** (1 / p)
        powered_sum = sum((norm ** norm_type for norm in param_norms.values()), 0.0)
        total_norm = powered_sum ** (1.0 / norm_type)

    return {
        "total_grad_norm": total_norm,
        "param_grad_norms": param_norms,
    }
|
|
|
|
|
def compute_parameter_distribution(model: torch.nn.Module) -> Dict[str, Any]:
    """Summarise the value distribution of a model's trainable parameters.

    Only parameters with ``requires_grad`` set are inspected. Statistics are
    first taken per parameter tensor and then combined.

    Args:
        model: Module whose trainable parameters are analysed.

    Returns:
        A dictionary with:

        * ``"overall_mean"``: unweighted mean of the per-tensor means.
        * ``"overall_std"``: unweighted mean of the per-tensor standard
          deviations (not the standard deviation of all values pooled).
        * ``"overall_min"`` / ``"overall_max"``: extremes over all tensors.
        * ``"total_zeros"``: number of elements exactly equal to zero.
        * ``"zero_percentage"``: ``total_zeros`` divided by the number of
          trainable elements, i.e. a fraction between 0 and 1.

    Raises:
        ValueError: If the model has no trainable parameters.
    """
    means: List[float] = []
    stds: List[float] = []
    mins: List[float] = []
    maxs: List[float] = []
    zero_count = 0
    element_count = 0

    for param in model.parameters():
        if not param.requires_grad:
            continue

        tensor = param.detach()
        if tensor.dtype == torch.bfloat16:
            # NumPy has no bfloat16 dtype, so .numpy() would raise. Widening
            # to float32 is exact because float32 keeps every bfloat16 value.
            tensor = tensor.float()
        values = tensor.cpu().numpy().ravel()

        means.append(float(np.mean(values)))
        stds.append(float(np.std(values)))
        mins.append(float(np.min(values)))
        maxs.append(float(np.max(values)))
        zero_count += int(np.sum(values == 0))
        element_count += param.numel()

    if not means:
        # Fail with a clear message rather than an opaque NumPy
        # "zero-size array" error from the reductions below.
        raise ValueError("model has no trainable parameters to analyze")

    return {
        "overall_mean": float(np.mean(means)),
        "overall_std": float(np.mean(stds)),
        "overall_min": float(np.min(mins)),
        "overall_max": float(np.max(maxs)),
        "total_zeros": zero_count,
        "zero_percentage": zero_count / element_count,
    }
|
|
|