"""
Train XGBoost ensemble classifier with ML accuracy improvements:
  - scale_pos_weight for class imbalance
  - Early stopping on validation AUC
  - Regularization: gamma, min_child_weight
  - Separate held-out test evaluation
"""

import csv
import pickle
import logging
import argparse
import numpy as np
from pathlib import Path
import json

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    datefmt="%H:%M:%S",
)
logger = logging.getLogger(__name__)

ROOT = Path(__file__).parents[1]
FEATURES = ROOT / "data" / "features.csv"
MODEL_OUT = ROOT / "data" / "reference" / "ensemble_xgb.pkl"
RESULTS_OUT = ROOT / "data" / "reference" / "ensemble_results.json"

# CSV columns that carry metadata rather than model inputs.
_NON_FEATURE_COLUMNS = ("label", "path", "source")

# Upper bound on the number of cross-validation folds.
_MAX_CV_SPLITS = 5


def _load_feature_matrix(path):
    """Read the feature CSV at *path* into NumPy arrays.

    Every column except ``label``, ``path`` and ``source`` is parsed as a
    float feature; ``label`` is parsed as an int. Rows without a ``source``
    column are assigned the source ``"unknown"``.

    Returns:
        ``(X, y, sources, feature_names)`` where ``X``, ``y`` and ``sources``
        are arrays with one entry per CSV row and ``feature_names`` is the
        list of feature column names in CSV order.

    Raises:
        ValueError: if the file contains no data rows.
    """
    rows, labels, sources = [], [], []
    feature_names = None
    with open(path, newline="", encoding="utf-8") as f:
        for row in csv.DictReader(f):
            if feature_names is None:
                # Column order of the first row fixes the feature order.
                feature_names = [k for k in row if k not in _NON_FEATURE_COLUMNS]
            labels.append(int(row["label"]))
            rows.append([float(row[k]) for k in feature_names])
            sources.append(row.get("source", "unknown"))

    if feature_names is None:
        raise ValueError(f"No data rows found in {path}")
    return np.array(rows), np.array(labels), np.array(sources), feature_names


def _native_gain_importance(model, feature_names):
    """Return ``(name, gain)`` pairs from the booster, highest gain first.

    The model is fitted on a bare NumPy array, so the booster presumably
    reports placeholder names ``f0``, ``f1``, ... (XGBoost's default when no
    feature names are supplied). Those are mapped back to the CSV column
    names; any key that does not match a placeholder is kept as-is.
    """
    gains = model.get_booster().get_score(importance_type="gain")
    placeholder_to_name = {f"f{i}": name for i, name in enumerate(feature_names)}
    named = [(placeholder_to_name.get(k, k), float(v)) for k, v in gains.items()]
    return sorted(named, key=lambda item: item[1], reverse=True)


def main():
    """Train, evaluate and persist the XGBoost ensemble classifier.

    Steps: load ``FEATURES``; hold out a stratified test split; cross-validate
    on the remaining development data (grouped by source when more than one
    source is present); fit the final model with early stopping on a
    validation split; score it on the held-out test split; compute feature
    importance (SHAP, falling back to XGBoost gain); write the pickled model
    to ``MODEL_OUT`` and the metrics to ``RESULTS_OUT``.

    Command-line options:
        --test-size   fraction of samples held out for testing (default 0.15)
        --early-stop  early-stopping patience in boosting rounds (default 20)

    Raises:
        ValueError: if the feature CSV has no data rows.
    """
    import xgboost as xgb
    from sklearn.model_selection import StratifiedKFold, GroupKFold, cross_validate, train_test_split
    from sklearn.metrics import accuracy_score, f1_score, roc_auc_score

    parser = argparse.ArgumentParser()
    parser.add_argument("--test-size", type=float, default=0.15)
    parser.add_argument("--early-stop", type=int, default=20)
    args = parser.parse_args()

    logger.info("Loading feature matrix")
    X, y, sources_arr, feature_names = _load_feature_matrix(FEATURES)
    logger.info("Feature matrix: %s | Positives: %d/%d", X.shape, y.sum(), len(y))

    neg_count = (y == 0).sum()
    pos_count = y.sum()
    # max(..., 1) avoids division by zero when there are no positives.
    scale_pos_weight = neg_count / max(pos_count, 1)
    logger.info("scale_pos_weight: %.3f", scale_pos_weight)

    # Split the sources together with X and y so the group labels stay
    # aligned with the development rows used for cross-validation.
    X_dev, X_test, y_dev, y_test, groups_dev, _ = train_test_split(
        X, y, sources_arr, test_size=args.test_size, stratify=y, random_state=42
    )

    cv_model = xgb.XGBClassifier(
        n_estimators=300,
        max_depth=4,
        learning_rate=0.05,
        subsample=0.8,
        colsample_bytree=0.8,
        min_child_weight=3,
        gamma=0.1,
        scale_pos_weight=scale_pos_weight,
        eval_metric="logloss",
        random_state=42,
    )

    unique_sources = np.unique(groups_dev)
    if len(unique_sources) > 1:
        # GroupKFold cannot make more folds than there are groups.
        n_splits = min(_MAX_CV_SPLITS, len(unique_sources))
        cv = GroupKFold(n_splits=n_splits)
        cv_groups = groups_dev
        cv_method = "GroupKFold"
        logger.info(
            "Using GroupKFold by source (%d unique sources: %s) to prevent "
            "generator-family leakage from inflating CV scores.",
            len(unique_sources), list(unique_sources)[:10],
        )
    else:
        n_splits = _MAX_CV_SPLITS
        cv = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
        cv_groups = None
        cv_method = "StratifiedKFold"
        logger.info("Only one source present — falling back to StratifiedKFold.")

    logger.info("Cross-validating (%d-fold %s)", n_splits, cv_method)
    # NOTE(review): if a source contains a single class, a grouped fold may
    # have a one-class test set and its AUC may come back undefined — confirm
    # against the actual source/label distribution.
    scores = cross_validate(
        cv_model, X_dev, y_dev,
        groups=cv_groups,  # required by GroupKFold; ignored by StratifiedKFold
        cv=cv,
        scoring=["roc_auc", "f1"],
        return_train_score=True,
    )
    auc_cv = float(scores["test_roc_auc"].mean())
    auc_cv_std = float(scores["test_roc_auc"].std())
    f1_cv = float(scores["test_f1"].mean())
    f1_cv_std = float(scores["test_f1"].std())
    logger.info("CV AUC: %.4f +/- %.4f", auc_cv, auc_cv_std)
    logger.info("CV F1: %.4f +/- %.4f", f1_cv, f1_cv_std)

    # Carve a validation split out of the development data for early stopping,
    # keeping the test split untouched until the final evaluation.
    X_tr, X_val, y_tr, y_val = train_test_split(
        X_dev, y_dev, test_size=0.15, stratify=y_dev, random_state=0
    )

    model = xgb.XGBClassifier(
        n_estimators=500,
        max_depth=4,
        learning_rate=0.05,
        subsample=0.8,
        colsample_bytree=0.8,
        min_child_weight=3,
        gamma=0.1,
        scale_pos_weight=scale_pos_weight,
        eval_metric="auc",
        early_stopping_rounds=args.early_stop,
        random_state=42,
    )
    logger.info("Fitting final model with early stopping")
    model.fit(X_tr, y_tr, eval_set=[(X_val, y_val)], verbose=50)
    logger.info("Best iteration: %s", model.best_iteration)

    y_score = model.predict_proba(X_test)[:, 1]
    y_pred = (y_score >= 0.5).astype(int)
    test_auc = float(roc_auc_score(y_test, y_score))
    test_f1 = float(f1_score(y_test, y_pred, zero_division=0))
    test_acc = float(accuracy_score(y_test, y_pred))
    logger.info("Test AUC: %.4f F1: %.4f Acc: %.4f", test_auc, test_f1, test_acc)

    logger.info("Computing SHAP values")
    try:
        # Imported here so that a missing or broken shap install triggers the
        # native-importance fallback instead of aborting the whole run.
        import shap

        # shap.Explainer handles XGBoost 2.x base_score format correctly
        explainer = shap.Explainer(model)
        shap_values = explainer(X_dev).values
        if shap_values.ndim == 3:
            shap_values = shap_values[:, :, 1]  # binary: take class-1 slice
        mean_shap = np.abs(shap_values).mean(axis=0)
        signal_importance = sorted(
            zip(feature_names, mean_shap.tolist()),
            key=lambda item: item[1],
            reverse=True,
        )
        importance_kind = "SHAP"
    except Exception as exc:
        # Deliberate best-effort: importance reporting must not block saving
        # the trained model.
        logger.warning("SHAP failed (%s) — falling back to XGBoost native importance", exc)
        explainer = None
        signal_importance = _native_gain_importance(model, feature_names)
        importance_kind = "gain"

    logger.info("Top 10 signals by %s importance:", importance_kind)
    for name, imp in signal_importance[:10]:
        logger.info(" %-45s %.4f", name, imp)

    MODEL_OUT.parent.mkdir(parents=True, exist_ok=True)
    with open(MODEL_OUT, "wb") as f:
        pickle.dump({"model": model, "feature_names": feature_names, "explainer": explainer}, f)
    logger.info("Model saved to %s", MODEL_OUT)

    results = {
        "cv_auc_mean": round(auc_cv, 4),
        "cv_auc_std": round(auc_cv_std, 4),
        "cv_f1_mean": round(f1_cv, 4),
        "cv_f1_std": round(f1_cv_std, 4),
        "test_auc": round(test_auc, 4),
        "test_f1": round(test_f1, 4),
        "test_accuracy": round(test_acc, 4),
        "best_iteration": int(model.best_iteration),
        "scale_pos_weight": round(float(scale_pos_weight), 4),
        "n_features": len(feature_names),
        "n_samples": len(y),
        "feature_importance": {k: round(v, 6) for k, v in signal_importance},
    }
    RESULTS_OUT.parent.mkdir(parents=True, exist_ok=True)
    with open(RESULTS_OUT, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)

    if results["cv_auc_mean"] > 0.995:
        logger.warning(
            "CV AUC = %.4f (> 0.995) — likely data leakage. "
            "CIFAKE images are 32x32 while COCO images are large JPEGs. "
            "The model may be learning resolution/compression, not AI signals. "
            "Consider adding more diverse datasets (ArtiFact, Defactify) with "
            "matched resolutions before trusting these results in production.",
            results["cv_auc_mean"],
        )

    logger.info("Results saved to %s", RESULTS_OUT)


if __name__ == "__main__":
    main()