PhishSentinel / src /models /evaluator.py
github-actions[bot]
Deploy to HF Spaces (ci)
0fd143d
"""
PhishLens Model Evaluator.
Produces comprehensive evaluation metrics, visualisations, and stress tests
for trained PhishLens classifiers. Figures are saved to `reports/figures/`
and numeric metrics are logged to MLflow when it is installed.
Key security metrics:
- False Negative Rate (FNR): Fraction of phishing emails classified as
legitimate — the most critical security failure mode. Target FNR < 5%.
- False Positive Rate (FPR): Fraction of legitimate emails flagged as phishing.
High FPR causes alert fatigue and user trust erosion.
- Matthews Correlation Coefficient (MCC): Balanced metric robust to class imbalance.
- AUC-ROC: Discrimination ability across all confidence thresholds.
- Confusion matrix: Visualised and saved as PNG.
Security rationale: A phishing detector with 99% accuracy but 20% FNR is
dangerous — it misses 1 in 5 phishing emails. Evaluator explicitly surfaces FNR
and FPR as primary dashboard metrics, not just accuracy.
"""
from __future__ import annotations
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import joblib
import matplotlib
matplotlib.use("Agg") # Non-interactive backend for server/CI environments
import matplotlib.pyplot as plt
try:
import mlflow
_MLFLOW_AVAILABLE = True
except ImportError:
mlflow = None # type: ignore[assignment]
_MLFLOW_AVAILABLE = False
import numpy as np
import pandas as pd
from sklearn.metrics import (
ConfusionMatrixDisplay,
classification_report,
confusion_matrix,
f1_score,
matthews_corrcoef,
precision_score,
recall_score,
roc_auc_score,
)
from sklearn.model_selection import StratifiedKFold
from src.utils.config import DEFAULT_CONFIG
from src.utils.logger import get_logger
# Module-level logger, created through the project's logging helper.
log = get_logger(__name__)
# Directory that generated figures are written to (a relative path).
FIGURES_DIR = Path("reports/figures")
class PhishLensEvaluator:
    """Evaluation engine for PhishLens classifiers.

    Labels are treated as binary: ``1`` is phishing and ``0`` is legitimate.
    Column 1 of ``model.predict_proba`` is read as P(phishing).

    Args:
        threshold: Classification threshold (default 0.5). A sample is
            predicted as phishing when its probability is ``>= threshold``.
            Raise to reduce FPR (at cost of higher FNR).

    Attributes:
        threshold: The decision threshold used by every method.
        results: Metrics from each ``evaluate`` call, keyed by model name.
    """

    def __init__(self, threshold: float = 0.5) -> None:
        self.threshold = threshold
        self.results: Dict[str, Dict[str, Any]] = {}

    # ------------------------------------------------------------------
    # Shared inference helpers
    # ------------------------------------------------------------------
    def _prepare_features(self, X: Any, scaler: Optional[Any]) -> np.ndarray:
        """Apply the optional scaler, then replace NaN/+inf/-inf with 0.0."""
        # Compare against None explicitly so the decision never depends on
        # the scaler object's truthiness.
        X_eval = scaler.transform(X) if scaler is not None else X
        return np.nan_to_num(X_eval, nan=0.0, posinf=0.0, neginf=0.0)

    def _predict(
        self,
        model: Any,
        X: Any,
        scaler: Optional[Any],
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Return ``(P(phishing), thresholded 0/1 predictions)`` for ``X``."""
        proba = model.predict_proba(self._prepare_features(X, scaler))[:, 1]
        y_pred = (proba >= self.threshold).astype(int)
        return proba, y_pred

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def evaluate(
        self,
        model: Any,
        X_test: np.ndarray,
        y_test: np.ndarray,
        model_name: str = "model",
        scaler: Optional[Any] = None,
        log_to_mlflow: bool = True,
    ) -> Dict[str, Any]:
        """Evaluate a classifier and compute all security-relevant metrics.

        Args:
            model: Fitted classifier with predict_proba() method.
            X_test: Test feature matrix.
            y_test: True labels (0 = legitimate, 1 = phishing).
            model_name: Name for logging and for the ``results`` key.
            scaler: Optional fitted scaler applied to ``X_test`` first.
            log_to_mlflow: Whether to log metrics to MLflow. Ignored when
                MLflow could not be imported.

        Returns:
            Dict with keys ``model``, ``threshold``, ``precision``,
            ``recall``, ``f1``, ``auc_roc``, ``mcc``, ``fnr``, ``fpr``,
            ``tp``, ``tn``, ``fp``, ``fn`` and ``n_test``. The same dict is
            stored in ``self.results[model_name]``.

        Note:
            AUC-ROC is undefined when ``y_test`` contains a single class;
            how scikit-learn reports that case (error or warning) depends on
            the installed version.
        """
        proba, y_pred = self._predict(model, X_test, scaler)

        # Pin both labels so the matrix is always 2x2, even when the slice
        # being evaluated contains only one class; otherwise the four-way
        # unpack below fails on a 1x1 matrix.
        tn, fp, fn, tp = confusion_matrix(y_test, y_pred, labels=[0, 1]).ravel()

        metrics = {
            "model": model_name,
            "threshold": self.threshold,
            "precision": float(precision_score(y_test, y_pred, zero_division=0)),
            "recall": float(recall_score(y_test, y_pred, zero_division=0)),
            "f1": float(f1_score(y_test, y_pred, zero_division=0)),
            "auc_roc": float(roc_auc_score(y_test, proba)),
            "mcc": float(matthews_corrcoef(y_test, y_pred)),
            # Rates fall back to 0.0 when their denominator is empty.
            "fnr": float(fn / (fn + tp)) if (fn + tp) > 0 else 0.0,
            "fpr": float(fp / (fp + tn)) if (fp + tn) > 0 else 0.0,
            "tp": int(tp), "tn": int(tn), "fp": int(fp), "fn": int(fn),
            "n_test": len(y_test),
        }
        log.info(
            f"\n[{model_name.upper()}] "
            f"F1={metrics['f1']:.4f} | "
            f"AUC={metrics['auc_roc']:.4f} | "
            f"FNR={metrics['fnr']:.4f} | "
            f"FPR={metrics['fpr']:.4f} | "
            f"MCC={metrics['mcc']:.4f}"
        )
        if log_to_mlflow and _MLFLOW_AVAILABLE:
            # Only numeric entries are logged; the model name is skipped.
            for k, v in metrics.items():
                if isinstance(v, (int, float)):
                    mlflow.log_metric(f"test_{k}", v)
        self.results[model_name] = metrics
        return metrics

    def plot_confusion_matrix(
        self,
        model: Any,
        X_test: np.ndarray,
        y_test: np.ndarray,
        model_name: str = "model",
        scaler: Optional[Any] = None,
    ) -> str:
        """Generate and save a confusion matrix PNG.

        Args:
            model: Fitted classifier with predict_proba() method.
            X_test: Test feature matrix.
            y_test: True labels (0 = legitimate, 1 = phishing).
            model_name: Used in the plot title and the file name
                ``cm_<model_name>.png``.
            scaler: Optional fitted scaler applied to ``X_test`` first.

        Returns:
            File path to the saved PNG.
        """
        _, y_pred = self._predict(model, X_test, scaler)
        FIGURES_DIR.mkdir(parents=True, exist_ok=True)
        out_path = str(FIGURES_DIR / f"cm_{model_name}.png")

        fig, ax = plt.subplots(figsize=(6, 5))
        try:
            # labels=[0, 1] keeps the matrix aligned with the two display
            # labels even when only one class occurs in the data.
            ConfusionMatrixDisplay.from_predictions(
                y_test, y_pred,
                labels=[0, 1],
                display_labels=["Legitimate", "Phishing"],
                cmap="Blues",
                ax=ax,
            )
            ax.set_title(f"PhishLens — {model_name.upper()} Confusion Matrix")
            fig.tight_layout()
            fig.savefig(out_path, dpi=150)
        finally:
            # Always release the figure, including when plotting or saving
            # raises, so repeated calls do not accumulate open figures.
            plt.close(fig)
        log.info(f"Confusion matrix saved to '{out_path}'")
        return out_path

    def compare_models(self) -> pd.DataFrame:
        """Produce a comparison DataFrame of all evaluated models.

        Returns:
            One row per entry in ``self.results``, sorted by F1 (descending)
            with a fresh 0..n-1 index; an empty DataFrame when nothing has
            been evaluated yet.
        """
        if not self.results:
            return pd.DataFrame()
        df = pd.DataFrame(list(self.results.values()))
        return df.sort_values("f1", ascending=False).reset_index(drop=True)

    def stress_test(
        self,
        model: Any,
        X_test: np.ndarray,
        y_test: np.ndarray,
        model_name: str = "model",
        noise_levels: Tuple[float, ...] = (0.0, 0.05, 0.1, 0.2),
        scaler: Optional[Any] = None,
    ) -> pd.DataFrame:
        """Stress test robustness against Gaussian feature noise.

        Security rationale: Real phishing emails contain natural variation.
        We simulate this by adding Gaussian noise to feature vectors and
        measuring F1 degradation. A robust model should degrade gracefully.

        Args:
            model: Fitted classifier.
            X_test: Test feature matrix.
            y_test: True labels.
            model_name: For logging.
            noise_levels: Sigma values for Gaussian noise.
            scaler: Optional fitted scaler, applied after the noise is added.

        Returns:
            DataFrame with columns ``model``, ``noise_sigma`` and ``f1``,
            one row per noise level.
        """
        records: List[Dict[str, Any]] = []
        # Fixed seed so repeated runs draw identical noise.
        rng = np.random.default_rng(seed=42)
        for sigma in noise_levels:
            if sigma == 0.0:
                X_noisy = X_test.copy()
            else:
                noise = rng.normal(0, sigma, size=X_test.shape).astype(np.float32)
                X_noisy = X_test + noise
            _, y_pred = self._predict(model, X_noisy, scaler)
            f1 = float(f1_score(y_test, y_pred, zero_division=0))
            records.append({"model": model_name, "noise_sigma": sigma, "f1": f1})
            log.info(f"Stress test [{model_name}] noise={sigma:.2f}: F1={f1:.4f}")
        return pd.DataFrame(records)

    def find_failure_modes(
        self,
        model: Any,
        X_test: np.ndarray,
        y_test: np.ndarray,
        feature_names: List[str],
        top_n: int = 20,
        scaler: Optional[Any] = None,
    ) -> Dict[str, Any]:
        """Analyse false negatives (missed phishing) and false positives.

        Returns the features with the highest mean value among misclassified
        emails to help identify weaknesses and adversarial attack surfaces.
        Means are taken over the unscaled features, with NaN/+inf/-inf
        replaced by 0.0 (the same cleaning applied before prediction).

        Args:
            model: Fitted classifier.
            X_test: Test feature matrix.
            y_test: True labels (0 = legitimate, 1 = phishing); any
                array-like is accepted.
            feature_names: List of feature names. Columns beyond its length
                are reported as ``feat_<index>``.
            top_n: Number of top features to show per failure mode.
            scaler: Optional fitted scaler applied before prediction.

        Returns:
            Dict with ``false_negative_count``, ``false_positive_count``,
            ``false_negatives_top_features`` and
            ``false_positives_top_features``; each feature list holds
            ``{"feature", "mean_value"}`` dicts in descending mean order.
        """
        # Coerce to an array: comparing a plain list with ``== 1`` yields a
        # single bool, which would silently produce all-False masks.
        y_true = np.asarray(y_test)
        _, y_pred = self._predict(model, X_test, scaler)

        fn_mask = (y_true == 1) & (y_pred == 0)  # Phishing missed
        fp_mask = (y_true == 0) & (y_pred == 1)  # Legitimate flagged

        # Clean the raw features before averaging. A single NaN would make
        # a column's mean NaN, and NaN sorts last in argsort, so the reversed
        # ordering would rank that column first.
        X_raw = np.nan_to_num(
            np.asarray(X_test, dtype=float), nan=0.0, posinf=0.0, neginf=0.0
        )

        def top_features(X_subset: np.ndarray) -> List[Dict[str, Any]]:
            if len(X_subset) == 0:
                return []
            means = X_subset.mean(axis=0)
            top_idx = np.argsort(means)[::-1][:top_n]
            return [
                {
                    "feature": (
                        feature_names[i] if i < len(feature_names) else f"feat_{i}"
                    ),
                    "mean_value": float(means[i]),
                }
                for i in top_idx
            ]

        return {
            "false_negative_count": int(fn_mask.sum()),
            "false_positive_count": int(fp_mask.sum()),
            "false_negatives_top_features": top_features(X_raw[fn_mask]),
            "false_positives_top_features": top_features(X_raw[fp_mask]),
        }