# Zenith-7b-V1 / utils / metrics.py
# Zandy-Wandy's picture
# Upload Zenith-7B model
# 8d18b7c verified
"""Additional Utility Metrics"""
import logging
from typing import Any, Dict, List, Optional
import numpy as np
import torch
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
logger = logging.getLogger(__name__)
def compute_all_metrics(
    predictions: np.ndarray,
    references: np.ndarray,
    task: str = "classification",
) -> Dict[str, float]:
    """Route a prediction/reference pair to the metric routine for ``task``.

    Args:
        predictions: Model outputs, forwarded unchanged to the selected routine.
        references: Ground-truth values, forwarded unchanged.
        task: One of ``"classification"``, ``"regression"``,
            ``"code_generation"`` or ``"reasoning"``.

    Returns:
        The metrics dictionary produced by the selected routine.

    Raises:
        ValueError: If ``task`` is not one of the supported names.
    """
    # Pick the handler first so the actual call happens in exactly one place.
    if task == "classification":
        handler = compute_classification_metrics
    elif task == "regression":
        handler = compute_regression_metrics
    elif task == "code_generation":
        handler = compute_code_metrics
    elif task == "reasoning":
        handler = compute_reasoning_metrics
    else:
        raise ValueError(f"Unknown task: {task}")

    return handler(predictions, references)
def compute_classification_metrics(
    predictions: np.ndarray,
    references: np.ndarray,
    average: str = "macro",
) -> Dict[str, float]:
    """Score label predictions with accuracy, F1, precision and recall.

    Args:
        predictions: Predicted labels.
        references: Ground-truth labels.
        average: Averaging mode handed to the F1, precision and recall scorers.

    Returns:
        A dict with the keys ``"accuracy"``, ``"f1"``, ``"precision"`` and
        ``"recall"``.
    """
    from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

    # The three averaged scorers share the same options; zero_division=0 makes
    # an undefined score come back as 0 rather than triggering a warning.
    averaged_options = {"average": average, "zero_division": 0}

    return {
        "accuracy": accuracy_score(references, predictions),
        "f1": f1_score(references, predictions, **averaged_options),
        "precision": precision_score(references, predictions, **averaged_options),
        "recall": recall_score(references, predictions, **averaged_options),
    }
def compute_regression_metrics(
    predictions: np.ndarray,
    references: np.ndarray,
) -> Dict[str, float]:
    """Score numeric predictions with MAE, MSE, RMSE and R^2.

    Args:
        predictions: Predicted values.
        references: Ground-truth values.

    Returns:
        A dict with the keys ``"mae"``, ``"mse"``, ``"rmse"`` and ``"r2"``.
    """
    metrics = {"mae": mean_absolute_error(references, predictions)}
    metrics["mse"] = mean_squared_error(references, predictions)
    # RMSE is derived from the MSE computed above instead of being rescored.
    metrics["rmse"] = np.sqrt(metrics["mse"])
    metrics["r2"] = r2_score(references, predictions)
    return metrics
def compute_code_metrics(
    predictions: List[str],
    references: List[str],
) -> Dict[str, float]:
    """Compute exact-match rate and corpus-level BLEU for generated code.

    Args:
        predictions: Generated code snippets. Any sized sequence of strings is
            accepted, including a NumPy array of strings.
        references: Reference snippets, aligned with ``predictions``.

    Returns:
        A dict with ``"exact_match"`` (fraction of predictions equal to their
        reference after stripping surrounding whitespace) and ``"bleu"``
        (NLTK corpus BLEU over whitespace tokens). ``"bleu"`` is ``0.0`` when
        NLTK is not installed; a warning is logged in that case. Both values
        are ``0.0`` for empty input.
    """
    # len() instead of truthiness: `if predictions` raises for multi-element
    # NumPy arrays, which compute_all_metrics' annotations advertise.
    num_predictions = len(predictions)
    if num_predictions == 0:
        # Nothing to score; this also avoids handing NLTK an empty corpus.
        return {"exact_match": 0.0, "bleu": 0.0}

    exact_matches = sum(p.strip() == r.strip() for p, r in zip(predictions, references))
    exact_match_rate = exact_matches / num_predictions

    # BLEU score (simplified: whitespace tokenisation, one reference each)
    try:
        from nltk.translate.bleu_score import corpus_bleu
    except ImportError:
        # Keep the best-effort fallback, but make it visible in the logs so a
        # BLEU of 0.0 is not mistaken for a real score.
        logging.getLogger(__name__).warning(
            "nltk is not installed; reporting BLEU as 0.0"
        )
        bleu = 0.0
    else:
        pred_tokens = [p.split() for p in predictions]
        ref_tokens = [[r.split()] for r in references]
        # Cast so the result is always a plain float.
        bleu = float(corpus_bleu(ref_tokens, pred_tokens))

    return {
        "exact_match": exact_match_rate,
        "bleu": bleu,
    }
def compute_reasoning_metrics(
    predictions: List[str],
    references: List[str],
    steps_predictions: Optional[List[List[str]]] = None,
    steps_references: Optional[List[List[str]]] = None,
) -> Dict[str, float]:
    """Compute reasoning-specific metrics.

    Args:
        predictions: Final answers produced by the model. Any sized sequence
            of strings is accepted, including a NumPy array of strings.
        references: Reference answers, aligned with ``predictions``.
        steps_predictions: Optional per-example lists of predicted steps.
        steps_references: Optional per-example lists of reference steps.

    Returns:
        A dict with ``"exact_match"`` (fraction of predictions equal to their
        reference after stripping surrounding whitespace) and
        ``"step_accuracy"`` (mean Jaccard similarity between predicted and
        reference step sets). Examples whose reference step set is empty are
        left out of the mean. ``"step_accuracy"`` is ``0.0`` when no step data
        is supplied or no example could be scored.
    """
    # len() instead of truthiness: `if predictions` raises for multi-element
    # NumPy arrays, which compute_all_metrics' annotations advertise.
    num_predictions = len(predictions)
    exact_matches = sum(p.strip() == r.strip() for p, r in zip(predictions, references))
    exact_match_rate = exact_matches / num_predictions if num_predictions else 0.0

    # Step-level accuracy if available. `is not None` rather than truthiness
    # for the same array-safety reason; empty inputs simply yield no scores.
    step_accuracy = 0.0
    if steps_predictions is not None and steps_references is not None:
        step_scores = []
        for pred_steps, ref_steps in zip(steps_predictions, steps_references):
            ref_set = set(ref_steps)
            if not ref_set:
                continue
            pred_set = set(pred_steps)
            # Jaccard similarity; the union is non-empty because ref_set is.
            step_scores.append(len(pred_set & ref_set) / len(pred_set | ref_set))
        if step_scores:
            # Cast so callers get a plain float rather than np.float64.
            step_accuracy = float(np.mean(step_scores))

    return {
        "exact_match": exact_match_rate,
        "step_accuracy": step_accuracy,
    }
def compute_perplexity_from_loss(loss: float) -> float:
    """Return ``exp(loss)`` as a Python float.

    The exponential is evaluated with torch on a tensor built from ``loss``.
    """
    loss_tensor = torch.tensor(loss)
    perplexity = loss_tensor.exp()
    return float(perplexity.item())
def compute_parameter_count(model: torch.nn.Module) -> Dict[str, Any]:
    """Count a model's parameters overall and per module type.

    Args:
        model: The module whose parameters are counted.

    Returns:
        A dict with:

        * ``"total"``: number of elements across ``model.parameters()``.
        * ``"trainable"``: elements of parameters with ``requires_grad``.
        * ``"frozen"``: ``total - trainable``.
        * ``"by_module"``: mapping from module class name to the number of
          parameter elements registered *directly* on modules of that class.
          Container modules that own no parameters themselves appear with 0.
    """
    total_params = 0
    trainable_params = 0
    for param in model.parameters():
        count = param.numel()
        total_params += count
        if param.requires_grad:
            trainable_params += count

    # Count by module type. recurse=False attributes each parameter only to
    # the module that registers it; the default (recursive) iteration would
    # re-count every child's parameters in each ancestor, so containers and
    # the root module would each report their whole subtree.
    module_counts: Dict[str, int] = {}
    for module in model.modules():
        module_type = type(module).__name__
        own_params = sum(p.numel() for p in module.parameters(recurse=False))
        module_counts[module_type] = module_counts.get(module_type, 0) + own_params

    return {
        "total": total_params,
        "trainable": trainable_params,
        "frozen": total_params - trainable_params,
        "by_module": module_counts,
    }
def compute_flops(
    model: torch.nn.Module,
    input_shape: tuple,
    forward_pass: bool = True,
) -> Dict[str, float]:
    """Give a rough FLOP estimate from the model's parameter count.

    The estimate charges two FLOPs per parameter for every token and scales
    that by the number of tokens in the batch.

    Args:
        model: Module whose parameters are counted.
        input_shape: Shape whose first two entries are read as batch size and
            sequence length.
        forward_pass: Accepted as part of the signature; the estimate does
            not read it.

    Returns:
        A dict with ``"total_flops"``, ``"flops_per_token"`` and ``"gflops"``
        (``total_flops`` divided by 1e9).
    """
    param_total = 0
    for weight in model.parameters():
        param_total += weight.numel()

    batch_size = input_shape[0]
    seq_len = input_shape[1]

    # Rule of thumb: two FLOPs per parameter per token.
    per_token = 2 * param_total
    overall = per_token * seq_len * batch_size

    return {
        "total_flops": overall,
        "flops_per_token": per_token,
        "gflops": overall / 1e9,
    }
def _first_config_value(config: Any, names: tuple, default: int) -> int:
    """Return the first non-None attribute of ``config`` among ``names``, else ``default``."""
    for name in names:
        value = getattr(config, name, None)
        if value is not None:
            return value
    return default


def compute_memory_usage(
    model: torch.nn.Module,
    batch_size: int,
    seq_len: int,
    dtype: str = "bfloat16",
) -> Dict[str, float]:
    """Estimate memory usage in GB (1e9 bytes) for parameters, activations and gradients.

    Args:
        model: Module whose parameters are counted. If it exposes a ``config``
            object, the hidden width is read from ``d_model``, ``hidden_size``
            or ``n_embd`` (first one present) and the depth from
            ``num_hidden_layers``, ``num_layers`` or ``n_layer``. Otherwise
            the defaults 2048 and 24 are used.
        batch_size: Number of sequences per batch.
        seq_len: Tokens per sequence.
        dtype: One of ``"float32"``, ``"float16"``, ``"bfloat16"``, ``"int8"``
            or ``"int4"``; selects the bytes-per-element used throughout.

    Returns:
        A dict with ``"parameters_gb"``, ``"activations_gb"``,
        ``"gradients_gb"`` and ``"total_gb"`` (their sum).

    Raises:
        KeyError: If ``dtype`` is not one of the supported names.
    """
    bytes_per_element = {
        "float32": 4,
        "float16": 2,
        "bfloat16": 2,
        "int8": 1,
        "int4": 0.5,
    }
    param_bytes = bytes_per_element[dtype]

    # Parameter memory
    param_memory = sum(p.numel() for p in model.parameters()) * param_bytes / 1e9  # GB

    # Plain nn.Modules have no `config`; fall back to the defaults instead of
    # raising AttributeError. `d_model` stays first so configs that define it
    # produce the same numbers as before.
    config = getattr(model, "config", None)
    d_model = _first_config_value(config, ("d_model", "hidden_size", "n_embd"), 2048)
    num_layers = _first_config_value(
        config, ("num_hidden_layers", "num_layers", "n_layer"), 24
    )

    # Activation memory (rough estimate): one hidden state per token per
    # layer, doubled as a margin for intermediate activations.
    activation_memory = batch_size * seq_len * d_model * 2 * num_layers * param_bytes / 1e9

    # Gradient memory: taken to be the same size as the parameters.
    gradient_memory = param_memory

    total_memory = param_memory + activation_memory + gradient_memory
    return {
        "parameters_gb": param_memory,
        "activations_gb": activation_memory,
        "gradients_gb": gradient_memory,
        "total_gb": total_memory,
    }
def track_gradient_norms(
    model: torch.nn.Module,
    norm_type: float = 2.0,
) -> Dict[str, Any]:
    """Compute per-parameter and overall gradient norms for debugging.

    Args:
        model: Module whose parameter gradients are inspected. Parameters
            without a gradient are skipped.
        norm_type: Order of the p-norm. ``float("inf")`` selects the max norm.

    Returns:
        A dict with ``"total_grad_norm"`` (the ``norm_type``-norm over all
        gradients, ``0.0`` if none exist) and ``"param_grad_norms"`` (mapping
        from parameter name to that parameter's gradient norm).
    """
    # The sum-of-powers formula breaks down for the infinity norm
    # (x ** inf, then total ** 0), so that case is combined with max() instead.
    use_max = norm_type == float("inf")

    param_norms = {}
    accumulated = 0.0
    for name, param in model.named_parameters():
        if param.grad is None:
            continue
        param_norm = param.grad.detach().norm(norm_type).item()
        param_norms[name] = param_norm
        if use_max:
            accumulated = max(accumulated, param_norm)
        else:
            accumulated += param_norm ** norm_type

    total_norm = accumulated if use_max else accumulated ** (1.0 / norm_type)
    return {
        "total_grad_norm": total_norm,
        "param_grad_norms": param_norms,
    }
def compute_parameter_distribution(model: torch.nn.Module) -> Dict[str, Any]:
    """Summarise the value distribution of a model's trainable parameters.

    Statistics are computed per parameter tensor and then aggregated.

    Args:
        model: Module to inspect. Only parameters with ``requires_grad`` are
            considered.

    Returns:
        A dict with:

        * ``"overall_mean"``: unweighted average of the per-tensor means.
        * ``"overall_std"``: unweighted average of the per-tensor standard
          deviations.
        * ``"overall_min"`` / ``"overall_max"``: extremes across all tensors.
        * ``"total_zeros"``: number of elements exactly equal to zero.
        * ``"zero_percentage"``: ``total_zeros`` divided by the number of
          trainable elements (a fraction in [0, 1], despite the key name).

        When the model has no trainable elements the four statistics are
        ``nan``, ``total_zeros`` is 0 and ``zero_percentage`` is 0.0.
    """
    means: List[float] = []
    stds: List[float] = []
    mins: List[float] = []
    maxs: List[float] = []
    total_zeros = 0
    total_elements = 0

    for _, param in model.named_parameters():
        if not param.requires_grad:
            continue
        tensor = param.detach().cpu()
        if tensor.dtype == torch.bfloat16:
            # NumPy has no bfloat16 dtype; widen to float32 before converting.
            tensor = tensor.float()
        data = tensor.numpy().flatten()
        if data.size == 0:
            # min/max are undefined for an empty tensor.
            continue
        means.append(float(np.mean(data)))
        stds.append(float(np.std(data)))
        mins.append(float(np.min(data)))
        maxs.append(float(np.max(data)))
        total_zeros += int(np.sum(data == 0))
        total_elements += int(data.size)

    if not means:
        # No trainable values: report undefined statistics instead of letting
        # the reductions fail on empty lists or dividing by zero.
        undefined = float("nan")
        return {
            "overall_mean": undefined,
            "overall_std": undefined,
            "overall_min": undefined,
            "overall_max": undefined,
            "total_zeros": 0,
            "zero_percentage": 0.0,
        }

    # Aggregate
    return {
        "overall_mean": float(np.mean(means)),
        "overall_std": float(np.mean(stds)),
        "overall_min": float(np.min(mins)),
        "overall_max": float(np.max(maxs)),
        "total_zeros": total_zeros,
        "zero_percentage": total_zeros / total_elements,
    }