""" PhishLens Model Trainer. Trains an ensemble of classifiers: - Logistic Regression (strong interpretable baseline) - Random Forest (captures non-linear feature interactions) - XGBoost (gradient boosting, best single model for tabular phishing features) - LightGBM (faster alternative to XGBoost for large corpora) - CatBoost (handles categorical-like encoded features well) All models undergo Optuna hyperparameter optimisation (50 trials) with 5-fold stratified cross-validation and are logged to MLflow. Security rationale: An ensemble reduces the risk that a targeted adversarial attack against one model architecture defeats the overall system. Each model learns slightly different feature interactions, so an attacker must simultaneously craft an email that evades all five — significantly harder in practice. """ from __future__ import annotations import time from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import joblib import mlflow import numpy as np import optuna import pandas as pd from catboost import CatBoostClassifier from imblearn.over_sampling import SMOTE from lightgbm import LGBMClassifier from sklearn.base import clone from sklearn.ensemble import RandomForestClassifier from sklearn.linear_model import LogisticRegression from sklearn.metrics import f1_score from sklearn.model_selection import StratifiedKFold from sklearn.preprocessing import StandardScaler from xgboost import XGBClassifier from src.utils.config import DEFAULT_CONFIG, PhishLensConfig from src.utils.logger import get_logger log = get_logger(__name__) optuna.logging.set_verbosity(optuna.logging.WARNING) # Suppress Optuna verbosity # Models available for training AVAILABLE_MODELS = ("lr", "rf", "xgboost", "lightgbm", "catboost") class PhishLensTrainer: """Trains and optimises PhishLens classifiers. Args: config: PhishLensConfig instance with hyperparameter search spaces. model_names: Which models to train (default: all). tune: If True, run Optuna hyperparameter search (50 trials). n_folds: Number of CV folds (default: 5). use_smote: If True, oversample minority class with SMOTE. """ def __init__( self, config: PhishLensConfig = DEFAULT_CONFIG, model_names: Tuple[str, ...] = AVAILABLE_MODELS, tune: bool = True, n_folds: int = 5, use_smote: bool = True, ) -> None: self.config = config self.model_names = model_names self.tune = tune self.n_folds = n_folds self.use_smote = use_smote self.trained_models: Dict[str, Any] = {} self.scalers: Dict[str, StandardScaler] = {} self.cv_scores: Dict[str, float] = {} def train( self, X: np.ndarray, y: np.ndarray, feature_names: Optional[List[str]] = None, experiment_name: str = "PhishLens", save_checkpoint_dir: Optional[str] = None, ) -> Dict[str, Any]: """Train all configured classifiers. Args: X: Feature matrix shape [n_samples, n_features]. y: Binary labels (0=legitimate, 1=phishing). feature_names: Feature names for MLflow logging. experiment_name: MLflow experiment name. Returns: Dict mapping model name to fitted classifier. """ import torch as _torch if _torch.cuda.is_available(): log.info(f"GPU: {_torch.cuda.get_device_name(0)} | VRAM: " f"{_torch.cuda.get_device_properties(0).total_memory // 1024**2:,} MB") log.info("GPU models: XGBoost(cuda:0) LightGBM(gpu) CatBoost(GPU)") else: log.info("CUDA not available — all models will use CPU") log.info(f"Training PhishLens models: {self.model_names}") log.info(f"Dataset: {X.shape[0]:,} samples, {X.shape[1]:,} features, " f"{y.sum():,} phishing, {(y==0).sum():,} legitimate") # Clean NaN/inf from feature matrix X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0).astype(np.float32) # Balance classes with SMOTE before training if self.use_smote: log.info("Applying SMOTE oversampling ...") smote = SMOTE(random_state=self.config.random_state, k_neighbors=5) X, y = smote.fit_resample(X, y) log.info(f"After SMOTE: {X.shape[0]:,} samples") mlflow.set_experiment(experiment_name) from tqdm.auto import tqdm as _tqdm n_models = len(self.model_names) pbar = _tqdm( enumerate(self.model_names, 1), total=n_models, desc=" Training models", unit="model", ncols=100, colour="green", ) for idx, model_name in pbar: pbar.set_description(f" [{idx}/{n_models}] {model_name.upper()}") log.info(f"\n{'='*60}") log.info(f"Training: {model_name.upper()}") self._train_one(model_name, X, y, feature_names, experiment_name, save_checkpoint_dir) pbar.set_postfix(cv_f1=f"{self.cv_scores.get(model_name, 0):.4f}") return self.trained_models def _train_one( self, name: str, X: np.ndarray, y: np.ndarray, feature_names: Optional[List[str]], experiment_name: str, save_checkpoint_dir: Optional[str] = None, ) -> None: """Train a single model with optional Optuna tuning.""" start = time.time() with mlflow.start_run(run_name=f"PhishLens_{name}"): mlflow.log_param("model", name) mlflow.log_param("n_samples", X.shape[0]) mlflow.log_param("n_features", X.shape[1]) mlflow.log_param("tune", self.tune) if self.tune: log.info(f"Running Optuna search ({self.config.optuna_trials} trials) for {name} ...") best_params = self._optuna_tune(name, X, y) mlflow.log_params(best_params) else: best_params = {} model = self._build_model(name, best_params) # Scale features for Logistic Regression if name == "lr": scaler = StandardScaler() X_fit = scaler.fit_transform(X) self.scalers[name] = scaler else: X_fit = X # Stratified K-Fold cross-validation — manual loop so tqdm can show # per-fold progress. 3 folds when not tuning (faster; tune already # did 3-fold CV per Optuna trial). GPU folds run sequentially (no # parallel CUDA context conflicts). n_folds_cv = 3 if not self.tune else self.n_folds cv = StratifiedKFold( n_splits=n_folds_cv, shuffle=True, random_state=self.config.random_state, ) log.info(f"Running {n_folds_cv}-fold CV for {name} ...") from tqdm.auto import tqdm as _tqdm _cv_scores: List[float] = [] _fold_bar = _tqdm( enumerate(cv.split(X_fit, y), 1), total=n_folds_cv, desc=f" {name} CV", unit="fold", ncols=100, leave=True, colour="yellow", ) for _fold_num, (_tr, _val) in _fold_bar: _fm = clone(model) _fm.fit(X_fit[_tr], y[_tr]) _fold_f1 = f1_score(y[_val], _fm.predict(X_fit[_val])) _cv_scores.append(_fold_f1) _fold_bar.set_postfix( fold=f"{_fold_num}/{n_folds_cv}", f1=f"{_fold_f1:.4f}", mean=f"{np.mean(_cv_scores):.4f}", ) cv_f1 = np.array(_cv_scores) self.cv_scores[name] = float(cv_f1.mean()) log.info(f"{name} CV F1: {cv_f1.mean():.4f} ± {cv_f1.std():.4f}") mlflow.log_metric("cv_f1_mean", cv_f1.mean()) mlflow.log_metric("cv_f1_std", cv_f1.std()) # Fit on full dataset — enable verbose progress for tree-based models log.info(f"Fitting {name} on full training set ({X_fit.shape[0]:,} samples) ...") if name == "lightgbm": import lightgbm as _lgb model.fit(X_fit, y, callbacks=[_lgb.log_evaluation(period=50)]) elif name == "xgboost": model.set_params(verbosity=1) model.fit(X_fit, y) model.set_params(verbosity=0) elif name == "catboost": # Flush any CUDA memory held by the embedding model or previous # GPU models before CatBoost allocates its own GPU context. try: import torch as _t if _t.cuda.is_available(): _t.cuda.empty_cache() _free = _t.cuda.get_device_properties(0).total_memory - _t.cuda.memory_reserved(0) log.info(f"CUDA cache cleared. Free VRAM: {_free // 1024**2} MB") except Exception: pass # Note: CatBoost forbids set_params() after fitting, so verbose # is set only before training via _build_model (verbose=0 default). model.set_params(verbose=100) model.fit(X_fit, y) # Do NOT call model.set_params(verbose=0) here — CatBoost raises # CatBoostError: You can't change params of fitted model. else: model.fit(X_fit, y) self.trained_models[name] = model # Checkpoint: save immediately so a crash later doesn't lose this model. if save_checkpoint_dir is not None: _ckpt = Path(save_checkpoint_dir) / f"{name}.pkl" Path(save_checkpoint_dir).mkdir(parents=True, exist_ok=True) joblib.dump(model, _ckpt) log.info(f" Checkpoint saved → {_ckpt.name}") elapsed = time.time() - start log.info(f"{name} trained in {elapsed:.1f}s") mlflow.log_metric("training_time_s", elapsed) def _optuna_tune(self, name: str, X: np.ndarray, y: np.ndarray) -> Dict: """Run Optuna hyperparameter search for the specified model.""" cv = StratifiedKFold(n_splits=3, shuffle=True, random_state=self.config.random_state) # GPU models must run CV folds sequentially (n_jobs=1) — spawning multiple # loky worker processes each initialising a CUDA context on the same device # causes context conflicts and silent hangs. CPU models keep n_jobs=-1. _gpu_models = {"xgboost", "lightgbm", "catboost"} _tune_cv_jobs = 1 if name in _gpu_models else -1 def objective(trial: optuna.Trial) -> float: params = self._suggest_params(trial, name) model = self._build_model(name, params) if name == "lr": scaler = StandardScaler() X_t = scaler.fit_transform(X) else: X_t = X scores = cross_val_score(model, X_t, y, cv=cv, scoring="f1", n_jobs=_tune_cv_jobs) return float(scores.mean()) study = optuna.create_study(direction="maximize") study.optimize( objective, n_trials=self.config.optuna_trials, show_progress_bar=False, ) log.info(f"Optuna best F1 for {name}: {study.best_value:.4f}") return study.best_params def _suggest_params(self, trial: optuna.Trial, name: str) -> Dict: """Define Optuna hyperparameter search space per model.""" spaces = self.config.optuna_search_spaces if name == "lr": return { "C": trial.suggest_float("C", *spaces["lr"]["C"], log=True), "max_iter": trial.suggest_int("max_iter", *spaces["lr"]["max_iter"]), } elif name == "rf": return { "n_estimators": trial.suggest_int("n_estimators", *spaces["rf"]["n_estimators"]), "max_depth": trial.suggest_int("max_depth", *spaces["rf"]["max_depth"]), "min_samples_leaf": trial.suggest_int("min_samples_leaf", *spaces["rf"]["min_samples_leaf"]), } elif name == "xgboost": sp = spaces["xgboost"] return { "n_estimators": trial.suggest_int("n_estimators", *sp["n_estimators"]), "max_depth": trial.suggest_int("max_depth", *sp["max_depth"]), "learning_rate": trial.suggest_float("learning_rate", *sp["learning_rate"], log=True), "subsample": trial.suggest_float("subsample", *sp["subsample"]), "colsample_bytree": trial.suggest_float("colsample_bytree", *sp["colsample_bytree"]), } elif name == "lightgbm": sp = spaces["lightgbm"] return { "n_estimators": trial.suggest_int("n_estimators", *sp["n_estimators"]), "max_depth": trial.suggest_int("max_depth", *sp["max_depth"]), "learning_rate": trial.suggest_float("learning_rate", *sp["learning_rate"], log=True), "num_leaves": trial.suggest_int("num_leaves", *sp["num_leaves"]), } elif name == "catboost": sp = spaces["catboost"] return { "iterations": trial.suggest_int("iterations", *sp["iterations"]), "depth": trial.suggest_int("depth", *sp["depth"]), "learning_rate": trial.suggest_float("learning_rate", *sp["learning_rate"], log=True), } return {} def _build_model(self, name: str, params: Dict) -> Any: """Instantiate a classifier with the given hyperparameters.""" import torch _cuda = torch.cuda.is_available() rs = self.config.random_state if name == "lr": return LogisticRegression( C=params.get("C", 1.0), max_iter=params.get("max_iter", 1000), solver="lbfgs", class_weight="balanced", random_state=rs, ) elif name == "rf": return RandomForestClassifier( n_estimators=params.get("n_estimators", 300), max_depth=params.get("max_depth", 20), min_samples_leaf=params.get("min_samples_leaf", 2), class_weight="balanced", random_state=rs, n_jobs=-1, ) elif name == "xgboost": # tree_method="hist" + device="cuda:0": GPU-accelerated histogram algorithm. # n_jobs=1 when on GPU — XGBoost GPU handles all parallelism via CUDA; # setting n_jobs>1 would launch CPU threads competing with the GPU kernel. xgb_gpu = {"tree_method": "hist", "device": "cuda:0"} if _cuda else {} return XGBClassifier( n_estimators=params.get("n_estimators", 300), max_depth=params.get("max_depth", 6), learning_rate=params.get("learning_rate", 0.05), subsample=params.get("subsample", 0.8), colsample_bytree=params.get("colsample_bytree", 0.8), eval_metric="logloss", scale_pos_weight=1, random_state=rs, n_jobs=1 if _cuda else -1, **xgb_gpu, ) elif name == "lightgbm": # device="gpu": uses OpenCL GPU acceleration. # n_jobs=1 when on GPU — same reason as XGBoost: GPU handles # parallelism internally; CPU thread pool would conflict with it. return LGBMClassifier( n_estimators=params.get("n_estimators", 300), max_depth=params.get("max_depth", -1), learning_rate=params.get("learning_rate", 0.05), num_leaves=params.get("num_leaves", 31), class_weight="balanced", random_state=rs, n_jobs=1 if _cuda else -1, verbose=-1, device="gpu" if _cuda else "cpu", ) elif name == "catboost": import tempfile, os _cb_train_dir = os.path.join(tempfile.gettempdir(), "catboost_info") return CatBoostClassifier( iterations=params.get("iterations", 300), depth=params.get("depth", 6), learning_rate=params.get("learning_rate", 0.05), auto_class_weights="Balanced", random_seed=rs, verbose=0, task_type="GPU" if _cuda else "CPU", devices="0" if _cuda else None, train_dir=_cb_train_dir, gpu_ram_part=0.7 if _cuda else None, ) raise ValueError(f"Unknown model name: {name}") def save_all(self, output_dir: str) -> None: """Save all trained models to disk.""" out = Path(output_dir) out.mkdir(parents=True, exist_ok=True) for name, model in self.trained_models.items(): joblib.dump(model, out / f"{name}.pkl") log.info(f"Saved {name} to {out / f'{name}.pkl'}") for name, scaler in self.scalers.items(): joblib.dump(scaler, out / f"{name}_scaler.pkl") log.info(f"All models saved to '{output_dir}'")