"""Additional Utility Metrics"""

import logging
import math
from typing import Any, Dict, List, Optional

import numpy as np
import torch

try:
    from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
except ImportError:
    # scikit-learn is only needed by the sklearn-backed metrics; keep the
    # torch-only helpers in this module importable without it.
    mean_absolute_error = mean_squared_error = r2_score = None

logger = logging.getLogger(__name__)

# Storage size in bytes of one parameter for each supported dtype name.
_BYTES_PER_PARAM = {
    "float32": 4,
    "float16": 2,
    "bfloat16": 2,
    "int8": 1,
    "int4": 0.5,
}


def compute_all_metrics(
    predictions: np.ndarray,
    references: np.ndarray,
    task: str = "classification",
) -> Dict[str, float]:
    """Compute comprehensive metrics based on task type.

    Args:
        predictions: Model outputs, in whatever form the selected task's
            metric function expects.
        references: Ground-truth values aligned with ``predictions``.
        task: One of ``"classification"``, ``"regression"``,
            ``"code_generation"`` or ``"reasoning"``.

    Returns:
        The metric dictionary produced by the task-specific function.

    Raises:
        ValueError: If ``task`` is not one of the supported names.
    """
    dispatch = {
        "classification": compute_classification_metrics,
        "regression": compute_regression_metrics,
        "code_generation": compute_code_metrics,
        "reasoning": compute_reasoning_metrics,
    }
    try:
        metric_fn = dispatch[task]
    except KeyError:
        raise ValueError(f"Unknown task: {task}") from None
    return metric_fn(predictions, references)


def compute_classification_metrics(
    predictions: np.ndarray,
    references: np.ndarray,
    average: str = "macro",
) -> Dict[str, float]:
    """Compute classification metrics.

    Args:
        predictions: Predicted labels.
        references: True labels.
        average: Averaging mode forwarded to the scikit-learn F1, precision
            and recall scorers.

    Returns:
        Dict with ``accuracy``, ``f1``, ``precision`` and ``recall``.
        Undefined scores are reported as 0 (``zero_division=0``).
    """
    from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

    scorer_kwargs = {"average": average, "zero_division": 0}
    return {
        "accuracy": accuracy_score(references, predictions),
        "f1": f1_score(references, predictions, **scorer_kwargs),
        "precision": precision_score(references, predictions, **scorer_kwargs),
        "recall": recall_score(references, predictions, **scorer_kwargs),
    }


def compute_regression_metrics(
    predictions: np.ndarray,
    references: np.ndarray,
) -> Dict[str, float]:
    """Compute regression metrics.

    Args:
        predictions: Predicted values.
        references: Target values.

    Returns:
        Dict with ``mae``, ``mse``, ``rmse`` (square root of ``mse``) and
        ``r2``.

    Raises:
        ImportError: If scikit-learn is not installed.
    """
    if mean_absolute_error is None:
        raise ImportError("scikit-learn is required to compute regression metrics")

    mse = mean_squared_error(references, predictions)
    return {
        "mae": mean_absolute_error(references, predictions),
        "mse": mse,
        "rmse": float(np.sqrt(mse)),
        "r2": r2_score(references, predictions),
    }


def _exact_match_rate(predictions: List[str], references: List[str]) -> float:
    """Return the fraction of predictions equal to their reference after stripping."""
    total = len(predictions)
    if total == 0:
        return 0.0
    # zip() stops at the shorter sequence; the denominator stays the number
    # of predictions, so unmatched predictions count as misses.
    matches = sum(p.strip() == r.strip() for p, r in zip(predictions, references))
    return matches / total


def compute_code_metrics(
    predictions: List[str],
    references: List[str],
) -> Dict[str, float]:
    """Compute code generation metrics.

    Args:
        predictions: Generated code strings.
        references: Reference code strings, aligned with ``predictions``.

    Returns:
        Dict with ``exact_match`` (whitespace-stripped string equality rate)
        and ``bleu`` (corpus BLEU over whitespace tokens). ``bleu`` is 0.0
        when nltk is not installed or when there are no predictions.
    """
    exact_match_rate = _exact_match_rate(predictions, references)

    if len(predictions) == 0:
        # Nothing to score: skip BLEU rather than hand nltk an empty corpus.
        return {"exact_match": exact_match_rate, "bleu": 0.0}

    # BLEU score (simplified: whitespace tokenisation, one reference each)
    try:
        from nltk.translate.bleu_score import corpus_bleu
    except ImportError:
        logger.warning("nltk is not installed; reporting BLEU as 0.0")
        bleu = 0.0
    else:
        hypotheses = [p.split() for p in predictions]
        reference_sets = [[r.split()] for r in references]
        bleu = float(corpus_bleu(reference_sets, hypotheses))

    return {
        "exact_match": exact_match_rate,
        "bleu": bleu,
    }


def compute_reasoning_metrics(
    predictions: List[str],
    references: List[str],
    steps_predictions: Optional[List[List[str]]] = None,
    steps_references: Optional[List[List[str]]] = None,
) -> Dict[str, float]:
    """Compute reasoning-specific metrics.

    Args:
        predictions: Final answers produced by the model.
        references: Expected final answers.
        steps_predictions: Optional per-example lists of predicted steps.
        steps_references: Optional per-example lists of reference steps.

    Returns:
        Dict with ``exact_match`` and ``step_accuracy``. ``step_accuracy`` is
        the mean Jaccard similarity between predicted and reference step
        sets, over examples whose reference step set is non-empty; it is 0.0
        when steps are not supplied or no example qualifies.
    """
    exact_match_rate = _exact_match_rate(predictions, references)

    step_accuracy = 0.0
    if steps_predictions and steps_references:
        jaccard_scores = []
        for pred_steps, ref_steps in zip(steps_predictions, steps_references):
            pred_set, ref_set = set(pred_steps), set(ref_steps)
            if not ref_set:
                continue  # no reference steps to compare against
            jaccard_scores.append(len(pred_set & ref_set) / len(pred_set | ref_set))
        if jaccard_scores:
            step_accuracy = float(np.mean(jaccard_scores))

    return {
        "exact_match": exact_match_rate,
        "step_accuracy": step_accuracy,
    }


def compute_perplexity_from_loss(loss: float) -> float:
    """Convert loss to perplexity, i.e. return ``exp(loss)``."""
    return float(torch.exp(torch.tensor(loss)).item())


def compute_parameter_count(model: torch.nn.Module) -> Dict[str, int]:
    """Count parameters by type.

    Args:
        model: Module whose parameters are counted.

    Returns:
        Dict with ``total``, ``trainable`` and ``frozen`` parameter counts,
        plus ``by_module``: a mapping from module class name to the number of
        parameters owned *directly* by modules of that class. Container
        modules that own no parameters themselves appear with a count of 0.
    """
    total_params = 0
    trainable_params = 0
    for param in model.parameters():
        count = param.numel()
        total_params += count
        if param.requires_grad:
            trainable_params += count

    # recurse=False: attribute each parameter to the module that owns it.
    # Recursing here would re-count every child inside each of its ancestors.
    module_counts: Dict[str, int] = {}
    for module in model.modules():
        module_type = type(module).__name__
        own_params = sum(p.numel() for p in module.parameters(recurse=False))
        module_counts[module_type] = module_counts.get(module_type, 0) + own_params

    return {
        "total": total_params,
        "trainable": trainable_params,
        "frozen": total_params - trainable_params,
        "by_module": module_counts,
    }


def compute_flops(
    model: torch.nn.Module,
    input_shape: tuple,
    forward_pass: bool = True,
) -> Dict[str, float]:
    """Estimate FLOPs for a forward pass.

    Uses the rough rule of 2 FLOPs per parameter per token.

    Args:
        model: Module whose parameter count drives the estimate.
        input_shape: Shape whose first two entries are read as
            ``(batch_size, seq_len)``.
        forward_pass: Currently unused; kept for interface compatibility.

    Returns:
        Dict with ``total_flops``, ``flops_per_token`` and ``gflops``.
    """
    total_params = sum(p.numel() for p in model.parameters())

    batch_size, seq_len = input_shape[0], input_shape[1]
    flops_per_token = 2 * total_params
    total_flops = flops_per_token * seq_len * batch_size

    return {
        "total_flops": total_flops,
        "flops_per_token": flops_per_token,
        "gflops": total_flops / 1e9,
    }


def compute_memory_usage(
    model: torch.nn.Module,
    batch_size: int,
    seq_len: int,
    dtype: str = "bfloat16",
) -> Dict[str, float]:
    """Estimate memory usage.

    Args:
        model: Module to estimate for. If it has a ``config`` attribute,
            ``d_model`` (or ``hidden_size``) and ``num_hidden_layers`` are
            read from it; otherwise 2048 and 24 are assumed.
        batch_size: Batch size used for the activation estimate.
        seq_len: Sequence length used for the activation estimate.
        dtype: One of ``"float32"``, ``"float16"``, ``"bfloat16"``,
            ``"int8"``, ``"int4"``.

    Returns:
        Dict with ``parameters_gb``, ``activations_gb``, ``gradients_gb`` and
        ``total_gb`` (all in units of 1e9 bytes).

    Raises:
        KeyError: If ``dtype`` is not a supported name.
    """
    try:
        param_bytes = _BYTES_PER_PARAM[dtype]
    except KeyError:
        raise KeyError(
            f"Unsupported dtype {dtype!r}; expected one of {sorted(_BYTES_PER_PARAM)}"
        ) from None

    param_memory = sum(p.numel() for p in model.parameters()) * param_bytes / 1e9  # GB

    # Plain nn.Modules have no ``config``; fall back to the defaults then.
    config = getattr(model, "config", None)
    d_model = getattr(config, "d_model", None)
    if d_model is None:
        d_model = getattr(config, "hidden_size", 2048)
    num_layers = getattr(config, "num_hidden_layers", 24)

    # Rough activation estimate: batch * seq * d_model per layer, times 2.
    activation_memory = batch_size * seq_len * d_model * 2 * num_layers * param_bytes / 1e9

    # One gradient value per parameter, assumed stored in the same dtype.
    gradient_memory = param_memory

    return {
        "parameters_gb": param_memory,
        "activations_gb": activation_memory,
        "gradients_gb": gradient_memory,
        "total_gb": param_memory + activation_memory + gradient_memory,
    }


def track_gradient_norms(
    model: torch.nn.Module,
    norm_type: float = 2.0,
) -> Dict[str, float]:
    """Compute gradient norms for debugging.

    Args:
        model: Module whose parameter gradients are inspected. Parameters
            without a gradient are skipped.
        norm_type: Order ``p`` of the norm; ``float("inf")`` is supported.

    Returns:
        Dict with ``total_grad_norm`` (the ``p``-norm over all gradients,
        0.0 when no parameter has a gradient) and ``param_grad_norms``
        (parameter name -> norm of its gradient).
    """
    param_norms: Dict[str, float] = {}
    for name, param in model.named_parameters():
        if param.grad is None:
            continue
        param_norms[name] = param.grad.detach().norm(norm_type).item()

    if not param_norms:
        total_norm = 0.0
    elif norm_type == math.inf:
        # The power-sum formula degenerates for p = inf (x ** (1 / inf) == 1);
        # the infinity norm of the whole gradient is the largest per-tensor one.
        total_norm = max(param_norms.values())
    else:
        power_sum = sum(norm ** norm_type for norm in param_norms.values())
        total_norm = power_sum ** (1.0 / norm_type)

    return {
        "total_grad_norm": total_norm,
        "param_grad_norms": param_norms,
    }


def compute_parameter_distribution(model: torch.nn.Module) -> Dict[str, Any]:
    """Analyze parameter distribution (mean, std, min, max).

    Only non-empty parameters with ``requires_grad=True`` are considered.

    Returns:
        Dict with:
          * ``overall_mean`` / ``overall_std``: unweighted averages of the
            per-parameter-tensor means and standard deviations,
          * ``overall_min`` / ``overall_max``: extremes over all tensors,
          * ``total_zeros``: number of exactly-zero entries,
          * ``zero_percentage``: ``total_zeros`` divided by the number of
            entries (a fraction in [0, 1]).
        When there is nothing to analyze, the four statistics are NaN and the
        zero counts are 0.
    """
    means: List[float] = []
    stds: List[float] = []
    mins: List[float] = []
    maxs: List[float] = []
    zero_count = 0
    element_count = 0

    for param in model.parameters():
        if not param.requires_grad or param.numel() == 0:
            continue
        tensor = param.detach().cpu()
        if tensor.dtype == torch.bfloat16:
            # NumPy has no bfloat16; widen losslessly to float32 first.
            tensor = tensor.float()
        values = tensor.numpy().ravel()
        means.append(float(np.mean(values)))
        stds.append(float(np.std(values)))
        mins.append(float(np.min(values)))
        maxs.append(float(np.max(values)))
        zero_count += int(np.sum(values == 0))
        element_count += values.size

    if not means:
        nan = float("nan")
        return {
            "overall_mean": nan,
            "overall_std": nan,
            "overall_min": nan,
            "overall_max": nan,
            "total_zeros": 0,
            "zero_percentage": 0.0,
        }

    return {
        "overall_mean": float(np.mean(means)),
        "overall_std": float(np.mean(stds)),
        "overall_min": float(np.min(mins)),
        "overall_max": float(np.max(maxs)),
        "total_zeros": zero_count,
        "zero_percentage": zero_count / element_count,
    }