"""
models/anomaly-detection/src/utils/metrics.py

Clustering and anomaly detection metrics for model evaluation.
"""

import logging
from typing import Any, Dict, Optional

import numpy as np

logger = logging.getLogger("metrics")

try:
    from sklearn.metrics import (
        silhouette_score,
        calinski_harabasz_score,
        davies_bouldin_score,
        adjusted_rand_score,
        normalized_mutual_info_score,
    )
    SKLEARN_AVAILABLE = True
except ImportError:
    SKLEARN_AVAILABLE = False
    logger.warning("scikit-learn not available for metrics")


def calculate_clustering_metrics(
    X: np.ndarray,
    labels: np.ndarray,
    true_labels: Optional[np.ndarray] = None
) -> Dict[str, Any]:
    """
    Calculate comprehensive clustering quality metrics.

    Args:
        X: Feature matrix (n_samples, n_features)
        labels: Predicted cluster labels (-1 denotes noise points)
        true_labels: Optional ground truth labels for supervised metrics

    Returns:
        Dict of metric_name -> metric_value
    """
    if not SKLEARN_AVAILABLE:
        logger.warning("sklearn not available, returning empty metrics")
        return {}

    metrics: Dict[str, Any] = {}

    # Noise points (label -1) are excluded from the internal quality metrics.
    valid_mask = labels >= 0
    n_clusters = len(set(labels[valid_mask]))

    # Internal metrics are undefined for fewer than two clusters or two valid samples.
    if n_clusters < 2 or np.sum(valid_mask) < 2:
        metrics["n_clusters"] = n_clusters
        metrics["n_noise_points"] = int(np.sum(labels == -1))
        metrics["error"] = "insufficient_clusters"
        return metrics

    # Silhouette: intra-cluster cohesion vs. nearest-cluster separation, in [-1, 1].
    try:
        metrics["silhouette_score"] = float(silhouette_score(
            X[valid_mask], labels[valid_mask]
        ))
    except Exception as e:
        logger.debug(f"Silhouette score failed: {e}")
        metrics["silhouette_score"] = None

    # Calinski-Harabasz: ratio of between- to within-cluster dispersion (higher is better).
    try:
        metrics["calinski_harabasz_score"] = float(calinski_harabasz_score(
            X[valid_mask], labels[valid_mask]
        ))
    except Exception as e:
        logger.debug(f"Calinski-Harabasz failed: {e}")
        metrics["calinski_harabasz_score"] = None

    # Davies-Bouldin: average similarity of each cluster to its closest one (lower is better).
    try:
        metrics["davies_bouldin_score"] = float(davies_bouldin_score(
            X[valid_mask], labels[valid_mask]
        ))
    except Exception as e:
        logger.debug(f"Davies-Bouldin failed: {e}")
        metrics["davies_bouldin_score"] = None

    # Basic cluster statistics.
    metrics["n_clusters"] = n_clusters
    metrics["n_samples"] = len(labels)
    metrics["n_noise_points"] = int(np.sum(labels == -1))
    metrics["noise_ratio"] = float(np.sum(labels == -1) / len(labels))

    # Cluster sizes, iterating over the actual label values so that
    # non-contiguous label sets are handled correctly.
    cluster_sizes = [int(np.sum(labels == c)) for c in sorted(set(labels[valid_mask]))]
    metrics["min_cluster_size"] = int(min(cluster_sizes)) if cluster_sizes else 0
    metrics["max_cluster_size"] = int(max(cluster_sizes)) if cluster_sizes else 0
    metrics["mean_cluster_size"] = float(np.mean(cluster_sizes)) if cluster_sizes else 0.0

    # Supervised metrics, available only when ground truth labels are provided.
    if true_labels is not None:
        try:
            metrics["adjusted_rand_score"] = float(adjusted_rand_score(
                true_labels, labels
            ))
        except Exception as e:
            logger.debug(f"ARI failed: {e}")
            metrics["adjusted_rand_score"] = None

        try:
            metrics["normalized_mutual_info"] = float(normalized_mutual_info_score(
                true_labels, labels
            ))
        except Exception as e:
            logger.debug(f"NMI failed: {e}")
            metrics["normalized_mutual_info"] = None

    return metrics
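
# Hypothetical usage sketch (assumes scikit-learn's DBSCAN; any clusterer that
# produces integer labels with -1 for noise works the same way):
#
#     from sklearn.cluster import DBSCAN
#
#     X = np.random.rand(500, 16)
#     labels = DBSCAN(eps=0.5, min_samples=5).fit_predict(X)
#     metrics = calculate_clustering_metrics(X, labels)
#     print(metrics["silhouette_score"], metrics["noise_ratio"])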


def calculate_anomaly_metrics(
    labels: np.ndarray,
    predicted_anomalies: np.ndarray,
    true_anomalies: Optional[np.ndarray] = None
) -> Dict[str, float]:
    """
    Calculate anomaly detection metrics.

    Args:
        labels: Cluster labels, with -1 marking anomalies
        predicted_anomalies: Boolean array of predicted anomaly flags
        true_anomalies: Optional ground truth anomaly flags

    Returns:
        Dict of metric_name -> metric_value
    """
    metrics = {}

    # Coerce to boolean so the bitwise operators below act as logical AND/NOT.
    predicted_anomalies = np.asarray(predicted_anomalies, dtype=bool)

    n_samples = len(labels)
    n_predicted_anomalies = int(np.sum(predicted_anomalies))

    metrics["n_samples"] = n_samples
    metrics["n_predicted_anomalies"] = n_predicted_anomalies
    metrics["anomaly_rate"] = float(n_predicted_anomalies / n_samples) if n_samples > 0 else 0.0

    # Supervised metrics, available only when ground truth flags are provided.
    if true_anomalies is not None:
        true_anomalies = np.asarray(true_anomalies, dtype=bool)
        metrics["n_true_anomalies"] = int(np.sum(true_anomalies))

        # Confusion matrix counts, treating anomalies as the positive class.
        tp = int(np.sum(predicted_anomalies & true_anomalies))
        fp = int(np.sum(predicted_anomalies & ~true_anomalies))
        fn = int(np.sum(~predicted_anomalies & true_anomalies))
        tn = int(np.sum(~predicted_anomalies & ~true_anomalies))

        metrics["true_positives"] = tp
        metrics["false_positives"] = fp
        metrics["false_negatives"] = fn
        metrics["true_negatives"] = tn

        # Precision, recall, and F1, guarding against division by zero.
        metrics["precision"] = float(tp / (tp + fp)) if (tp + fp) > 0 else 0.0
        metrics["recall"] = float(tp / (tp + fn)) if (tp + fn) > 0 else 0.0

        if metrics["precision"] + metrics["recall"] > 0:
            metrics["f1_score"] = float(
                2 * metrics["precision"] * metrics["recall"] /
                (metrics["precision"] + metrics["recall"])
            )
        else:
            metrics["f1_score"] = 0.0

    return metrics
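
# Hypothetical usage sketch: noise points from a clusterer are treated as the
# predicted anomalies (the variable names below are illustrative only):
#
#     predicted = labels == -1                      # noise label -> anomaly flag
#     ground_truth = y_anomaly_flags.astype(bool)   # assumed ground truth array
#     report = calculate_anomaly_metrics(labels, predicted, ground_truth)
#     print(report["precision"], report["recall"], report["f1_score"])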


def calculate_optuna_objective(
    X: np.ndarray,
    labels: np.ndarray,
    objective_type: str = "silhouette"
) -> float:
    """
    Calculate the objective value for Optuna optimization.

    Args:
        X: Feature matrix
        labels: Predicted cluster labels
        objective_type: 'silhouette', 'calinski', or 'combined'

    Returns:
        Objective value (higher is better)
    """
    metrics = calculate_clustering_metrics(X, labels)

    # Penalize degenerate clusterings (fewer than two clusters or valid samples).
    if "error" in metrics:
        return -1.0

    if objective_type == "silhouette":
        score = metrics.get("silhouette_score")
        return score if score is not None else -1.0

    elif objective_type == "calinski":
        score = metrics.get("calinski_harabasz_score")
        # Squash the unbounded Calinski-Harabasz score into (0, 1].
        return min(score / 1000, 1.0) if score is not None else -1.0

    elif objective_type == "combined":
        # Weighted blend of the three internal metrics, each mapped to a comparable scale.
        silhouette = metrics.get("silhouette_score")
        calinski = metrics.get("calinski_harabasz_score")
        davies = metrics.get("davies_bouldin_score")

        silhouette = silhouette if silhouette is not None else -1.0
        calinski = min(calinski / 1000, 1.0) if calinski is not None else 0.0
        # Davies-Bouldin is lower-is-better, so invert it into (0, 1].
        davies_inv = 1.0 / (1.0 + davies) if davies is not None else 0.0

        combined = 0.4 * silhouette + 0.3 * calinski + 0.3 * davies_inv
        return float(combined)

    return -1.0
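
# Hypothetical Optuna sketch (assumes optuna and scikit-learn are installed and
# X is a precomputed feature matrix; the search space is illustrative only):
#
#     import optuna
#     from sklearn.cluster import DBSCAN
#
#     def objective(trial):
#         eps = trial.suggest_float("eps", 0.1, 2.0)
#         min_samples = trial.suggest_int("min_samples", 3, 20)
#         labels = DBSCAN(eps=eps, min_samples=min_samples).fit_predict(X)
#         return calculate_optuna_objective(X, labels, objective_type="combined")
#
#     study = optuna.create_study(direction="maximize")
#     study.optimize(objective, n_trials=50)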


def format_metrics_report(metrics: Dict[str, Any]) -> str:
    """
    Format a metrics dictionary as a readable report.

    Args:
        metrics: Dictionary of metric values

    Returns:
        Formatted string report
    """
    lines = ["=" * 50]
    lines.append("CLUSTERING METRICS REPORT")
    lines.append("=" * 50)

    for key, value in metrics.items():
        if value is None:
            value_str = "N/A"
        elif isinstance(value, float):
            value_str = f"{value:.4f}"
        else:
            value_str = str(value)

        lines.append(f"{key:30s}: {value_str}")

    lines.append("=" * 50)
    return "\n".join(lines)
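

# Minimal smoke-test sketch on synthetic data; run the module directly to print
# a formatted report. KMeans is only an example clusterer, assumed available
# because scikit-learn is installed.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    rng = np.random.default_rng(42)
    # Two well-separated Gaussian blobs plus a few scattered outliers.
    X_demo = np.vstack([
        rng.normal(0.0, 0.5, size=(100, 4)),
        rng.normal(5.0, 0.5, size=(100, 4)),
        rng.uniform(-10.0, 10.0, size=(5, 4)),
    ])

    if SKLEARN_AVAILABLE:
        from sklearn.cluster import KMeans

        demo_labels = KMeans(n_clusters=2, n_init=10, random_state=0).fit_predict(X_demo)
        demo_metrics = calculate_clustering_metrics(X_demo, demo_labels)
        print(format_metrics_report(demo_metrics))
    else:
        print("scikit-learn is required for the demo")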