# recomendation/utils/advanced_metrics.py
"""
Advanced metrics calculation for outfit recommendation system.
Includes accuracy, precision, recall, F1 score, and other research-grade metrics.
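
Typical usage (a minimal sketch with synthetic tensors; see __main__ below):

    metrics = AdvancedMetrics()
    metrics.add_batch(predictions, targets, scores, embeddings)
    results = metrics.calculate_all_metrics()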
"""
import numpy as np
import torch
import torch.nn.functional as F
from typing import Dict, List, Any, Optional
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, roc_auc_score
import json
from pathlib import Path
class AdvancedMetrics:
"""Calculate comprehensive metrics for outfit recommendation models."""
def __init__(self):
self.reset()
def reset(self):
"""Reset all metrics."""
self.predictions = []
self.targets = []
self.scores = []
self.embeddings = []
self.outfit_scores = []
    def add_batch(self, predictions: torch.Tensor, targets: torch.Tensor,
                  scores: Optional[torch.Tensor] = None,
                  embeddings: Optional[torch.Tensor] = None):
"""Add a batch of predictions and targets."""
self.predictions.extend(predictions.detach().cpu().numpy())
self.targets.extend(targets.detach().cpu().numpy())
if scores is not None:
self.scores.extend(scores.detach().cpu().numpy())
if embeddings is not None:
self.embeddings.extend(embeddings.detach().cpu().numpy())
def add_outfit_scores(self, outfit_scores: List[float]):
"""Add outfit compatibility scores."""
self.outfit_scores.extend(outfit_scores)
def calculate_classification_metrics(self) -> Dict[str, float]:
"""Calculate classification metrics."""
if not self.predictions or not self.targets:
return {}
        # Flatten to 1-D so sklearn treats the inputs as single-label arrays
        preds = np.asarray(self.predictions).ravel()
        targets = np.asarray(self.targets).ravel()
        # Threshold continuous outputs at 0.5 if they are not already binary
        # (checking max() > 1 would miss probabilities, which never exceed 1)
        if not np.isin(preds, (0, 1)).all():
            preds = (preds > 0.5).astype(int)
        if not np.isin(targets, (0, 1)).all():
            targets = (targets > 0.5).astype(int)
accuracy = accuracy_score(targets, preds)
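        # Weighted averages weight each class by its support (handles imbalance)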
precision, recall, f1, _ = precision_recall_fscore_support(
targets, preds, average='weighted', zero_division=0
)
        # Macro-averaged metrics (unweighted mean over classes)
        precision_macro, recall_macro, f1_macro, _ = precision_recall_fscore_support(
            targets, preds, average='macro', zero_division=0
        )
# Calculate AUC if we have scores
auc = None
        if self.scores:
            try:
                scores_array = np.asarray(self.scores).ravel()
                if len(np.unique(targets)) > 1:  # AUC needs both classes present
                    auc = roc_auc_score(targets, scores_array)
            except ValueError:
                auc = None
return {
"accuracy": float(accuracy),
"precision_weighted": float(precision),
"recall_weighted": float(recall),
"f1_weighted": float(f1),
"precision_macro": float(precision_macro),
"recall_macro": float(recall_macro),
"f1_macro": float(f1_macro),
"auc": float(auc) if auc is not None else None
}
def calculate_embedding_metrics(self) -> Dict[str, float]:
"""Calculate embedding quality metrics."""
if not self.embeddings:
return {}
embeddings = np.array(self.embeddings)
# Calculate embedding statistics
mean_norm = np.mean(np.linalg.norm(embeddings, axis=1))
std_norm = np.std(np.linalg.norm(embeddings, axis=1))
# Calculate intra-class and inter-class distances
        # Class-separation metrics need exactly one label per embedding
        targets = np.asarray(self.targets).ravel()
        if len(targets) == len(embeddings) and len(targets) > 1:
            unique_classes = np.unique(targets)
intra_class_distances = []
inter_class_distances = []
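            # Brute-force pairwise L2 distances: O(n^2) Python loops, acceptable
            # for evaluation-sized sets but slow for very large ones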
for class_label in unique_classes:
class_embeddings = embeddings[targets == class_label]
if len(class_embeddings) > 1:
# Intra-class distances
for i in range(len(class_embeddings)):
for j in range(i + 1, len(class_embeddings)):
dist = np.linalg.norm(class_embeddings[i] - class_embeddings[j])
intra_class_distances.append(dist)
# Inter-class distances
other_embeddings = embeddings[targets != class_label]
if len(other_embeddings) > 0:
for class_emb in class_embeddings:
for other_emb in other_embeddings:
dist = np.linalg.norm(class_emb - other_emb)
inter_class_distances.append(dist)
avg_intra_class = np.mean(intra_class_distances) if intra_class_distances else 0
avg_inter_class = np.mean(inter_class_distances) if inter_class_distances else 0
# Separation ratio (higher is better)
separation_ratio = avg_inter_class / (avg_intra_class + 1e-8)
else:
avg_intra_class = 0
avg_inter_class = 0
separation_ratio = 0
return {
"embedding_mean_norm": float(mean_norm),
"embedding_std_norm": float(std_norm),
"avg_intra_class_distance": float(avg_intra_class),
"avg_inter_class_distance": float(avg_inter_class),
"separation_ratio": float(separation_ratio)
}
def calculate_outfit_metrics(self) -> Dict[str, float]:
"""Calculate outfit-specific metrics."""
if not self.outfit_scores:
return {}
scores = np.array(self.outfit_scores)
return {
"outfit_score_mean": float(np.mean(scores)),
"outfit_score_std": float(np.std(scores)),
"outfit_score_min": float(np.min(scores)),
"outfit_score_max": float(np.max(scores)),
"outfit_score_median": float(np.median(scores))
}
def calculate_all_metrics(self) -> Dict[str, Any]:
"""Calculate all available metrics."""
metrics = {
"classification": self.calculate_classification_metrics(),
"embeddings": self.calculate_embedding_metrics(),
"outfits": self.calculate_outfit_metrics()
}
# Add summary statistics
metrics["summary"] = {
"total_predictions": len(self.predictions),
"total_targets": len(self.targets),
"total_scores": len(self.scores),
"total_embeddings": len(self.embeddings),
"total_outfit_scores": len(self.outfit_scores)
}
return metrics
    def save_metrics(self, filepath: str, additional_info: Optional[Dict[str, Any]] = None):
"""Save metrics to JSON file."""
metrics = self.calculate_all_metrics()
if additional_info:
metrics["additional_info"] = additional_info
# Ensure directory exists
Path(filepath).parent.mkdir(parents=True, exist_ok=True)
with open(filepath, 'w') as f:
json.dump(metrics, f, indent=2)
return metrics
def calculate_triplet_metrics(anchor_emb: torch.Tensor, positive_emb: torch.Tensor,
negative_emb: torch.Tensor, margin: float = 0.2) -> Dict[str, float]:
"""Calculate triplet-specific metrics."""
# Calculate distances
pos_dist = F.pairwise_distance(anchor_emb, positive_emb, p=2)
neg_dist = F.pairwise_distance(anchor_emb, negative_emb, p=2)
# Triplet loss
triplet_loss = F.relu(pos_dist - neg_dist + margin).mean()
# Accuracy: positive distance < negative distance
correct = (pos_dist < neg_dist).float().mean()
    # Fraction of triplets that still violate the margin (i.e. incur nonzero loss)
    margin_violations = (pos_dist - neg_dist + margin > 0).float().mean()
# Distance statistics
pos_dist_mean = pos_dist.mean()
neg_dist_mean = neg_dist.mean()
distance_ratio = neg_dist_mean / (pos_dist_mean + 1e-8)
return {
"triplet_loss": float(triplet_loss),
"triplet_accuracy": float(correct),
"margin_violations": float(margin_violations),
"positive_distance_mean": float(pos_dist_mean),
"negative_distance_mean": float(neg_dist_mean),
"distance_ratio": float(distance_ratio)
}
def calculate_outfit_compatibility_metrics(outfit_scores: torch.Tensor,
labels: torch.Tensor) -> Dict[str, float]:
"""Calculate outfit compatibility specific metrics."""
# Convert to numpy for sklearn compatibility
scores_np = outfit_scores.detach().cpu().numpy()
labels_np = labels.detach().cpu().numpy()
# Binary classification metrics
pred_binary = (scores_np > 0.5).astype(int)
accuracy = accuracy_score(labels_np, pred_binary)
precision, recall, f1, _ = precision_recall_fscore_support(
labels_np, pred_binary, average='weighted', zero_division=0
)
# AUC if we have both classes
auc = None
if len(np.unique(labels_np)) > 1:
try:
auc = roc_auc_score(labels_np, scores_np)
except ValueError:
auc = None
# Score distribution metrics
compatible_scores = scores_np[labels_np == 1]
incompatible_scores = scores_np[labels_np == 0]
return {
"compatibility_accuracy": float(accuracy),
"compatibility_precision": float(precision),
"compatibility_recall": float(recall),
"compatibility_f1": float(f1),
"compatibility_auc": float(auc) if auc is not None else None,
"compatible_score_mean": float(np.mean(compatible_scores)) if len(compatible_scores) > 0 else 0,
"incompatible_score_mean": float(np.mean(incompatible_scores)) if len(incompatible_scores) > 0 else 0,
"score_separation": float(np.mean(compatible_scores) - np.mean(incompatible_scores)) if len(compatible_scores) > 0 and len(incompatible_scores) > 0 else 0
}
if __name__ == "__main__":
    # Example usage with synthetic data
    metrics = AdvancedMetrics()
    logits = torch.randn(100, 1)
    scores = torch.sigmoid(logits)
    predictions = (scores > 0.5).float()  # binarize so predictions match the scores
    targets = torch.randint(0, 2, (100, 1)).float()
    embeddings = torch.randn(100, 512)
    metrics.add_batch(predictions, targets, scores, embeddings)
    metrics.add_outfit_scores(scores.flatten().tolist())
# Calculate and save metrics
all_metrics = metrics.calculate_all_metrics()
print("Calculated metrics:")
print(json.dumps(all_metrics, indent=2))
# Save to file
metrics.save_metrics("test_metrics.json", {"model": "test", "epoch": 1})
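
    # Illustrative checks of the standalone helpers on random tensors; the
    # values are synthetic and only demonstrate the call signatures, not the
    # behavior of a trained model.
    anchor = torch.randn(32, 512)
    positive = torch.randn(32, 512)
    negative = torch.randn(32, 512)
    triplet_metrics = calculate_triplet_metrics(anchor, positive, negative, margin=0.2)
    print("Triplet metrics:")
    print(json.dumps(triplet_metrics, indent=2))

    outfit_scores = torch.rand(64)
    outfit_labels = torch.randint(0, 2, (64,))
    compat_metrics = calculate_outfit_compatibility_metrics(outfit_scores, outfit_labels)
    print("Compatibility metrics:")
    print(json.dumps(compat_metrics, indent=2))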