# model_trainer.py import os import sys import json import numpy as np from dataclasses import dataclass import optuna from optuna.samplers import TPESampler optuna.logging.set_verbosity(optuna.logging.WARNING) import mlflow import mlflow.sklearn import matplotlib matplotlib.use('Agg') from sklearn.linear_model import LogisticRegression from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier, BaggingClassifier from sklearn.tree import DecisionTreeClassifier from sklearn.feature_selection import RFECV from sklearn.metrics import ( recall_score, roc_auc_score, classification_report ) from lightgbm import LGBMClassifier from xgboost import XGBClassifier from src.exception import CustomException from src.logger import logging from src.utils import save_object, evaluate_models, evaluate_final_model, save_best_params, load_best_params @dataclass class ModelTrainerConfig: trained_model_file_path: str = os.path.join("artifacts", "model.pkl") eval_report_path: str = os.path.join("artifacts", "eval_report.png") def tune_lightgbm(X_train, y_train, X_val, y_val, n_trials=20, params_path="artifacts/params/lightgbm_best_params.json"): if os.path.exists(params_path): logging.info("LightGBM: found saved params, skipping Tuning.") return LGBMClassifier(**load_best_params(params_path), random_state=42, verbosity=-1) def objective(trial): params = { 'n_estimators': trial.suggest_int('n_estimators', 200, 1000), 'num_leaves': trial.suggest_int('num_leaves', 20, 80), 'max_depth': trial.suggest_int('max_depth', 5, 9), 'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.1, log=True), 'min_child_samples': trial.suggest_int('min_child_samples', 20, 100), 'reg_lambda': trial.suggest_float('reg_lambda', 0.0, 10.0), 'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0), } model = LGBMClassifier(**params, random_state=42, verbosity=-1) model.fit(X_train, y_train) return roc_auc_score(y_val, model.predict_proba(X_val)[:, 1]) study = optuna.create_study(direction="maximize", sampler=TPESampler(seed=42)) study.optimize(objective, n_trials=n_trials, show_progress_bar=True) logging.info(f"LightGBM best params: {study.best_params} | AUC: {study.best_value: .4f}") save_best_params(study.best_params, params_path) return LGBMClassifier(**study.best_params, random_state=42, verbosity=-1) def tune_xgboost(X_train, y_train, X_val, y_val, n_trials=20, params_path="artifacts/params/xgboost_best_params.json"): if os.path.exists(params_path): logging.info("XGBoost: found saved params, skipping Optuna.") return XGBClassifier(**load_best_params(params_path), random_state=42, eval_metric='logloss', verbosity=0) def objective(trial): params = { 'n_estimators': trial.suggest_int('n_estimators', 200, 1000), 'max_depth': trial.suggest_int('max_depth', 3, 8), 'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.1, log=True), 'subsample': trial.suggest_float('subsample', 0.6, 1.0), 'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0), 'reg_lambda': trial.suggest_float('reg_lambda', 0.0, 5.0), 'min_child_weight': trial.suggest_int('min_child_weight', 1, 10), } model = XGBClassifier(**params, random_state=42, eval_metric='logloss', verbosity=0) model.fit(X_train, y_train) return roc_auc_score(y_val, model.predict_proba(X_val)[:, 1]) study = optuna.create_study(direction='maximize', sampler=TPESampler(seed=42)) study.optimize(objective, n_trials=n_trials, show_progress_bar=True) logging.info(f"XGBoost best params: {study.best_params} | AUC: {study.best_value:.4f}") save_best_params(study.best_params, params_path) return XGBClassifier(**study.best_params, random_state=42, eval_metric='logloss', verbosity=0) def tune_random_forest(X_train, y_train, X_val, y_val, n_trials=20, params_path="artifacts/params/rf_best_params.json"): if os.path.exists(params_path): logging.info("Random Forest: found saved params, skipping Optuna.") return RandomForestClassifier(**load_best_params(params_path), random_state=42) def objective(trial): params = { 'n_estimators': trial.suggest_int('n_estimators', 100, 500), 'max_depth': trial.suggest_int('max_depth', 5, 15), 'min_samples_leaf': trial.suggest_int('min_samples_leaf', 5, 30), 'max_features': trial.suggest_categorical('max_features', ['sqrt', 'log2', 0.5]), } model = RandomForestClassifier(**params, random_state=42) model.fit(X_train, y_train) return roc_auc_score(y_val, model.predict_proba(X_val)[:, 1]) study = optuna.create_study(direction='maximize', sampler=TPESampler(seed=42)) study.optimize(objective, n_trials=n_trials, show_progress_bar=True) logging.info(f"Random Forest best params: {study.best_params} | AUC: {study.best_value:.4f}") save_best_params(study.best_params, params_path) return RandomForestClassifier(**study.best_params, random_state=42) TUNING_FUNCTIONS = { 'LightGBM': tune_lightgbm, 'XGBoost': tune_xgboost, 'Random Forest': tune_random_forest, } mlflow.set_experiment("Threat_Forecaster") class ModelTrainer: def __init__(self): self.model_trainer_config = ModelTrainerConfig() def initiate_model_trainer(self, train_array, val_array): try: logging.info("Splitting train and val arrays into features and target.") X_train = train_array[:, :-1] y_train = train_array[:, -1] X_val = val_array[:, :-1] y_val = val_array[:, -1] models = { 'Logistic Regression': LogisticRegression(random_state=1, max_iter=1000), 'Random Forest': RandomForestClassifier(random_state=1), 'LightGBM': LGBMClassifier(random_state=1, verbosity=-1), 'XGBoost': XGBClassifier(random_state=1, verbosity=0, eval_metric='logloss'), 'Decision Tree': DecisionTreeClassifier(random_state=1), 'AdaBoost': AdaBoostClassifier(random_state=1), 'Bagging': BaggingClassifier(random_state=1), } model_report = evaluate_models( X_train=X_train, y_train=y_train, X_val=X_val, y_val=y_val, models=models ) logging.info(f"Baseline model report: {model_report}") for name, model in models.items(): baseline_metrics = evaluate_final_model( model = model, X_val = X_val, y_val = y_val, selected = None, name = f"Baseline_{name}", report_dir = "artifacts/eval/baseline" ) with mlflow.start_run(run_name=f"Baseline_{name}"): mlflow.set_tag("stage", "baseline") mlflow.log_metric("val_roc_auc", baseline_metrics['auc']) mlflow.log_metric("val_recall", baseline_metrics['recall']) mlflow.log_metric("val_precision", baseline_metrics['precision']) mlflow.log_metric("val_f1", baseline_metrics['f1']) mlflow.log_metric("val_ap", baseline_metrics['ap']) top_3_names = sorted( model_report.keys(), key=lambda k: model_report[k]['val_roc_auc'], reverse=True )[:3] logging.info(f"Top 3 models by AUC: {top_3_names}") print(f"\nTop 3 selected for tuning: {top_3_names}") tuned_results = {} for name in top_3_names: if name not in TUNING_FUNCTIONS: logging.warning(f"No tuning function for {name} — using baseline model as-is.") tuned_model = models[name] else: tune_fn = TUNING_FUNCTIONS[name] params_path = f"artifacts/params/{name.replace(' ', '_').lower()}_best_params.json" tuned_model = tune_fn(X_train, y_train, X_val, y_val, params_path=params_path) tuned_model.fit(X_train, y_train) tuned_metrics = evaluate_final_model( model = tuned_model, X_val = X_val, y_val = y_val, selected = None, name = f"Tuned_{name}", report_dir = "artifacts/eval/tuned" ) logging.info(f"Tuned {name} → AUC: {tuned_metrics['auc']:.4f} | Recall: {tuned_metrics['recall']:.4f}") tuned_results[name] = { 'model': tuned_model, 'val_roc_auc': tuned_metrics['auc'], 'recall': tuned_metrics['recall'], } with mlflow.start_run(run_name=f"Tuned_{name}"): mlflow.set_tag("stage", "tuning") mlflow.log_params(tuned_model.get_params()) mlflow.log_metric("val_roc_auc", tuned_metrics['auc']) mlflow.log_metric("val_recall", tuned_metrics['recall']) mlflow.log_metric("val_precision", tuned_metrics['precision']) mlflow.log_metric("val_f1", tuned_metrics['f1']) mlflow.log_metric("val_ap", tuned_metrics['ap']) best_name = max(tuned_results, key=lambda k: tuned_results[k]['val_roc_auc']) best = tuned_results[best_name] if best['val_roc_auc'] < 0.5: raise CustomException("No best model found — AUC below 0.5.", sys) logging.info(f"Best model: {best_name} | AUC: {best['val_roc_auc']:.4f}") print(f"\nBest model: {best_name} — running RFECV for feature selection.") rfecv = RFECV( estimator = best['model'], step = 2, cv = 5, scoring = 'roc_auc', n_jobs = -1 ) rfecv.fit(X_train, y_train) selected = rfecv.support_ logging.info(f"RFECV selected {selected.sum()} / {len(selected)} features.") X_train_sel = X_train[:, selected] X_val_sel = X_val[:, selected] best['model'].fit(X_train_sel, y_train) final_metrics = evaluate_final_model( model = best['model'], X_val = X_val, y_val = y_val, selected = selected, name = f"Final_{best_name}", report_dir = "artifacts/eval/final" ) with mlflow.start_run(run_name=f"Final_{best_name}"): mlflow.set_tag("stage", "final") mlflow.log_params(best['model'].get_params()) mlflow.log_metric("val_roc_auc", final_metrics['auc']) mlflow.log_metric("val_recall", final_metrics['recall']) mlflow.log_metric("val_precision", final_metrics['precision']) mlflow.log_metric("val_f1", final_metrics['f1']) mlflow.log_metric("val_ap", final_metrics['ap']) mlflow.log_metric("features_selected", int(selected.sum())) mlflow.sklearn.log_model( best['model'], artifact_path="model", registered_model_name="Threat_Forecaster" ) save_object( file_path = self.model_trainer_config.trained_model_file_path, obj = { 'model': best['model'], 'selected': selected, 'name': best_name } ) logging.info("Best model saved to artifacts/.") return best['val_roc_auc'] except Exception as e: raise CustomException(e, sys)