""" PhishLens Model Evaluator. Produces comprehensive evaluation metrics, visualisations, and stress tests for trained PhishLens classifiers. All outputs are saved to `reports/figures/` and logged to MLflow. Key security metrics: - False Negative Rate (FNR): Fraction of phishing emails classified as legitimate — the most critical security failure mode. Target FNR < 5%. - False Positive Rate (FPR): Fraction of legitimate emails flagged as phishing. High FPR causes alert fatigue and user trust erosion. - Matthews Correlation Coefficient (MCC): Balanced metric robust to class imbalance. - AUC-ROC: Discrimination ability across all confidence thresholds. - Confusion matrix: Visualised and saved as PNG. Security rationale: A phishing detector with 99% accuracy but 20% FNR is dangerous — it misses 1 in 5 phishing emails. Evaluator explicitly surfaces FNR and FPR as primary dashboard metrics, not just accuracy. """ from __future__ import annotations from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import joblib import matplotlib matplotlib.use("Agg") # Non-interactive backend for server/CI environments import matplotlib.pyplot as plt try: import mlflow _MLFLOW_AVAILABLE = True except ImportError: mlflow = None # type: ignore[assignment] _MLFLOW_AVAILABLE = False import numpy as np import pandas as pd from sklearn.metrics import ( ConfusionMatrixDisplay, classification_report, confusion_matrix, f1_score, matthews_corrcoef, precision_score, recall_score, roc_auc_score, ) from sklearn.model_selection import StratifiedKFold from src.utils.config import DEFAULT_CONFIG from src.utils.logger import get_logger log = get_logger(__name__) FIGURES_DIR = Path("reports/figures") class PhishLensEvaluator: """Evaluation engine for PhishLens classifiers. Args: threshold: Classification threshold (default 0.5). Raise to reduce FPR (at cost of higher FNR). """ def __init__(self, threshold: float = 0.5) -> None: self.threshold = threshold self.results: Dict[str, Dict] = {} def evaluate( self, model: Any, X_test: np.ndarray, y_test: np.ndarray, model_name: str = "model", scaler: Optional[Any] = None, log_to_mlflow: bool = True, ) -> Dict: """Evaluate a classifier and compute all security-relevant metrics. Args: model: Fitted classifier with predict_proba() method. X_test: Test feature matrix. y_test: True labels. model_name: Name for logging and file naming. scaler: Optional StandardScaler (for LR models). log_to_mlflow: Whether to log metrics to MLflow. Returns: Dict of evaluation metrics. """ X_eval = scaler.transform(X_test) if scaler else X_test X_eval = np.nan_to_num(X_eval, nan=0.0, posinf=0.0, neginf=0.0) proba = model.predict_proba(X_eval)[:, 1] # P(phishing) y_pred = (proba >= self.threshold).astype(int) cm = confusion_matrix(y_test, y_pred) tn, fp, fn, tp = cm.ravel() metrics = { "model": model_name, "threshold": self.threshold, "precision": float(precision_score(y_test, y_pred, zero_division=0)), "recall": float(recall_score(y_test, y_pred, zero_division=0)), "f1": float(f1_score(y_test, y_pred, zero_division=0)), "auc_roc": float(roc_auc_score(y_test, proba)), "mcc": float(matthews_corrcoef(y_test, y_pred)), "fnr": float(fn / (fn + tp)) if (fn + tp) > 0 else 0.0, "fpr": float(fp / (fp + tn)) if (fp + tn) > 0 else 0.0, "tp": int(tp), "tn": int(tn), "fp": int(fp), "fn": int(fn), "n_test": len(y_test), } log.info( f"\n[{model_name.upper()}] " f"F1={metrics['f1']:.4f} | " f"AUC={metrics['auc_roc']:.4f} | " f"FNR={metrics['fnr']:.4f} | " f"FPR={metrics['fpr']:.4f} | " f"MCC={metrics['mcc']:.4f}" ) if log_to_mlflow and _MLFLOW_AVAILABLE: for k, v in metrics.items(): if isinstance(v, (int, float)): mlflow.log_metric(f"test_{k}", v) self.results[model_name] = metrics return metrics def plot_confusion_matrix( self, model: Any, X_test: np.ndarray, y_test: np.ndarray, model_name: str = "model", scaler: Optional[Any] = None, ) -> str: """Generate and save a confusion matrix PNG. Returns: File path to the saved PNG. """ X_eval = scaler.transform(X_test) if scaler else X_test X_eval = np.nan_to_num(X_eval, nan=0.0, posinf=0.0, neginf=0.0) y_pred = (model.predict_proba(X_eval)[:, 1] >= self.threshold).astype(int) FIGURES_DIR.mkdir(parents=True, exist_ok=True) fig, ax = plt.subplots(figsize=(6, 5)) disp = ConfusionMatrixDisplay.from_predictions( y_test, y_pred, display_labels=["Legitimate", "Phishing"], cmap="Blues", ax=ax, ) ax.set_title(f"PhishLens — {model_name.upper()} Confusion Matrix") plt.tight_layout() out_path = str(FIGURES_DIR / f"cm_{model_name}.png") fig.savefig(out_path, dpi=150) plt.close(fig) log.info(f"Confusion matrix saved to '{out_path}'") return out_path def compare_models(self) -> pd.DataFrame: """Produce a comparison DataFrame of all evaluated models. Returns: DataFrame sorted by F1 (descending). """ if not self.results: return pd.DataFrame() df = pd.DataFrame(self.results.values()) df = df.sort_values("f1", ascending=False).reset_index(drop=True) return df def stress_test( self, model: Any, X_test: np.ndarray, y_test: np.ndarray, model_name: str = "model", noise_levels: Tuple[float, ...] = (0.0, 0.05, 0.1, 0.2), scaler: Optional[Any] = None, ) -> pd.DataFrame: """Stress test robustness against Gaussian feature noise. Security rationale: Real phishing emails contain natural variation. We simulate this by adding Gaussian noise to feature vectors and measuring F1 degradation. A robust model should degrade gracefully. Args: model: Fitted classifier. X_test: Test feature matrix. y_test: True labels. model_name: For logging. noise_levels: Sigma values for Gaussian noise. scaler: Optional StandardScaler. Returns: DataFrame with noise_level and corresponding F1 score. """ records: List[Dict] = [] rng = np.random.default_rng(seed=42) for sigma in noise_levels: if sigma == 0.0: X_noisy = X_test.copy() else: noise = rng.normal(0, sigma, size=X_test.shape).astype(np.float32) X_noisy = X_test + noise X_eval = scaler.transform(X_noisy) if scaler else X_noisy X_eval = np.nan_to_num(X_eval, nan=0.0, posinf=0.0, neginf=0.0) proba = model.predict_proba(X_eval)[:, 1] y_pred = (proba >= self.threshold).astype(int) f1 = float(f1_score(y_test, y_pred, zero_division=0)) records.append({"model": model_name, "noise_sigma": sigma, "f1": f1}) log.info(f"Stress test [{model_name}] noise={sigma:.2f}: F1={f1:.4f}") return pd.DataFrame(records) def find_failure_modes( self, model: Any, X_test: np.ndarray, y_test: np.ndarray, feature_names: List[str], top_n: int = 20, scaler: Optional[Any] = None, ) -> Dict: """Analyse false negatives (missed phishing) and false positives. Returns the most common feature patterns in misclassified emails to help identify weaknesses and adversarial attack surfaces. Args: model: Fitted classifier. X_test: Test feature matrix. y_test: True labels. feature_names: List of feature names. top_n: Number of top features to show per failure mode. scaler: Optional StandardScaler. Returns: Dict with 'false_negatives' and 'false_positives' feature summaries. """ X_eval = scaler.transform(X_test) if scaler else X_test X_eval = np.nan_to_num(X_eval, nan=0.0, posinf=0.0, neginf=0.0) proba = model.predict_proba(X_eval)[:, 1] y_pred = (proba >= self.threshold).astype(int) fn_mask = (y_test == 1) & (y_pred == 0) # Phishing missed fp_mask = (y_test == 0) & (y_pred == 1) # Legitimate flagged def top_features(X_subset: np.ndarray) -> List[Dict]: if len(X_subset) == 0: return [] means = X_subset.mean(axis=0) top_idx = np.argsort(means)[::-1][:top_n] return [ {"feature": feature_names[i] if i < len(feature_names) else f"feat_{i}", "mean_value": float(means[i])} for i in top_idx ] return { "false_negative_count": int(fn_mask.sum()), "false_positive_count": int(fp_mask.sum()), "false_negatives_top_features": top_features(X_test[fn_mask]), "false_positives_top_features": top_features(X_test[fp_mask]), }