Spaces:
Sleeping
Sleeping
| # LOPO threshold/weight analysis. Run: python -m evaluation.justify_thresholds | |
| import glob | |
| import os | |
| import sys | |
| import numpy as np | |
| import matplotlib | |
| matplotlib.use("Agg") | |
| import matplotlib.pyplot as plt | |
| import joblib | |
| from sklearn.linear_model import LogisticRegression | |
| from sklearn.neural_network import MLPClassifier | |
| from sklearn.preprocessing import StandardScaler | |
| from sklearn.metrics import ( | |
| roc_curve, | |
| roc_auc_score, | |
| f1_score, | |
| precision_score, | |
| recall_score, | |
| accuracy_score, | |
| confusion_matrix, | |
| ) | |
| from xgboost import XGBClassifier | |
| _PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) | |
| sys.path.insert(0, _PROJECT_ROOT) | |
| from data_preparation.prepare_dataset import load_per_person, SELECTED_FEATURES | |
| PLOTS_DIR = os.path.join(os.path.dirname(__file__), "plots") | |
| REPORT_PATH = os.path.join(os.path.dirname(__file__), "THRESHOLD_JUSTIFICATION.md") | |
| SEED = 42 | |
def _youdens_j(y_true, y_prob):
    """Locate the ROC operating point that maximises Youden's J (TPR - FPR).

    Returns a 5-tuple ``(threshold, fpr, tpr, thresholds, auc)``: the chosen
    threshold and the AUC as plain floats, plus the three arrays produced by
    ``roc_curve`` so callers can plot or re-index the curve.
    """
    false_pos, true_pos, cutoffs = roc_curve(y_true, y_prob)
    area = roc_auc_score(y_true, y_prob)
    # argmax returns the first maximum, i.e. the earliest point on the curve.
    best = np.argmax(true_pos - false_pos)
    return float(cutoffs[best]), false_pos, true_pos, cutoffs, float(area)
def _f1_at_threshold(y_true, y_prob, threshold):
    """Return the F1 score after binarising ``y_prob`` with an inclusive ``>= threshold`` cut."""
    hard_labels = (y_prob >= threshold).astype(int)
    return f1_score(y_true, hard_labels, zero_division=0)
def _plot_roc(fpr, tpr, auc, opt_thresh, opt_idx, title, path):
    """Draw a ROC curve with the chosen operating point marked and save it to ``path``.

    ``opt_idx`` indexes into ``fpr``/``tpr`` and selects the point drawn as a
    red dot; ``opt_thresh`` is only used for the legend text.
    """
    figure, axes = plt.subplots(figsize=(6, 5))

    curve_label = f"ROC (AUC = {auc:.4f})"
    marker_label = f"Youden's J optimum (t = {opt_thresh:.3f})"
    axes.plot(fpr, tpr, lw=2, label=curve_label)
    axes.plot(fpr[opt_idx], tpr[opt_idx], "ro", markersize=10, label=marker_label)
    # Unlabelled chance diagonal for visual reference.
    axes.plot([0, 1], [0, 1], "k--", lw=1, alpha=0.5)

    axes.set_xlabel("False Positive Rate")
    axes.set_ylabel("True Positive Rate")
    axes.set_title(title)
    axes.legend(loc="lower right")

    figure.tight_layout()
    figure.savefig(path, dpi=150)
    plt.close(figure)
    print(f" saved {path}")
def run_lopo_models():
    """Run leave-one-person-out evaluation for the MLP and XGBoost classifiers.

    For every person, both models are fitted on the standardised features of
    all other persons and scored on the held-out person.

    Returns a dict with keys ``"mlp"``, ``"xgb"`` and ``"persons"``. Each model
    entry holds pooled arrays ``"y"``/``"p"`` (concatenated in sorted-person
    order) and per-fold lists ``"y_folds"``/``"p_folds"``.
    """
    print("\n=== LOPO: MLP and XGBoost ===")
    by_person, _, _ = load_per_person("face_orientation")
    persons = sorted(by_person.keys())
    n_persons = len(persons)

    results = {
        model: {"y": [], "p": [], "y_folds": [], "p_folds": []}
        for model in ("mlp", "xgb")
    }

    for fold_no, held_out in enumerate(persons, start=1):
        X_test, y_test = by_person[held_out]
        others = [p for p in persons if p != held_out]
        train_X = np.concatenate([by_person[p][0] for p in others])
        train_y = np.concatenate([by_person[p][1] for p in others])

        # The scaler is fitted on the training persons only.
        scaler = StandardScaler().fit(train_X)
        X_tr_sc = scaler.transform(train_X)
        X_te_sc = scaler.transform(X_test)

        classifiers = {
            "mlp": MLPClassifier(
                hidden_layer_sizes=(64, 32), activation="relu",
                max_iter=200, early_stopping=True, validation_fraction=0.15,
                random_state=SEED, verbose=False,
            ),
            "xgb": XGBClassifier(
                n_estimators=600, max_depth=8, learning_rate=0.05,
                subsample=0.8, colsample_bytree=0.8,
                reg_alpha=0.1, reg_lambda=1.0,
                eval_metric="logloss",
                random_state=SEED, verbosity=0,
            ),
        }
        for model, clf in classifiers.items():
            clf.fit(X_tr_sc, train_y)
            held_out_prob = clf.predict_proba(X_te_sc)[:, 1]
            bucket = results[model]
            bucket["y"].append(y_test)
            bucket["p"].append(held_out_prob)
            bucket["y_folds"].append(y_test)
            bucket["p_folds"].append(held_out_prob)

        print(f" fold {fold_no}/{n_persons}: held out {held_out} "
              f"({X_test.shape[0]} samples)")

    results["persons"] = persons
    # Pool the per-fold pieces; the per-fold lists are kept as they are.
    for model in ("mlp", "xgb"):
        for field in ("y", "p"):
            results[model][field] = np.concatenate(results[model][field])
    return results
def analyse_model_thresholds(results):
    """Pick a Youden's-J threshold per model from pooled LOPO predictions.

    For each of the MLP and XGBoost entries in ``results`` (as produced by
    ``run_lopo_models``) this computes the ROC curve, the J-optimal threshold,
    the F1 at that threshold and at 0.50, and saves a ROC plot under
    ``PLOTS_DIR``.

    Returns a dict keyed by ``"mlp"``/``"xgb"`` with ``label``, ``auc``,
    ``opt_threshold``, ``f1_opt`` and ``f1_50``.

    Note: the threshold is selected on the same pooled held-out predictions
    that the F1 values are then reported on.
    """
    print("\n=== Model threshold analysis ===")
    model_stats = {}
    for name, label in [("mlp", "MLP"), ("xgb", "XGBoost")]:
        y, p = results[name]["y"], results[name]["p"]
        opt_t, fpr, tpr, _, auc = _youdens_j(y, p)
        # _youdens_j does not return the index of the optimum, so recover it
        # here for the marker on the plot.
        opt_idx = int(np.argmax(tpr - fpr))
        f1_opt = _f1_at_threshold(y, p, opt_t)
        f1_50 = _f1_at_threshold(y, p, 0.50)

        # Derive the fold/sample counts from the data instead of hard-coding
        # them, so the title stays correct when the dataset changes.
        n_folds = len(results[name]["y_folds"])
        n_samples_k = len(y) / 1000
        path = os.path.join(PLOTS_DIR, f"roc_{name}.png")
        _plot_roc(fpr, tpr, auc, opt_t, opt_idx,
                  f"LOPO ROC — {label} ({n_folds} folds, {n_samples_k:.0f}k samples)", path)

        model_stats[name] = {
            "label": label, "auc": auc,
            "opt_threshold": opt_t, "f1_opt": f1_opt, "f1_50": f1_50,
        }
        print(f" {label}: AUC={auc:.4f}, optimal threshold={opt_t:.3f} "
              f"(F1={f1_opt:.4f}), F1@0.50={f1_50:.4f}")
    return model_stats
| def _ci_95_t(n): | |
| """95% CI half-width multiplier (t-distribution, df=n-1). Approximate for small n.""" | |
| if n <= 1: | |
| return 0.0 | |
| df = n - 1 | |
| t_975 = [0, 12.71, 4.30, 3.18, 2.78, 2.57, 2.45, 2.37, 2.31] | |
| if df < len(t_975): | |
| return float(t_975[df]) | |
| if df <= 30: | |
| return 2.0 + (30 - df) / 100 | |
| return 1.96 | |
def analyse_precision_recall_confusion(results, model_stats):
    """Precision/recall at optimal threshold, pooled confusion matrix, per-fold metrics, 95% CIs.

    Parameters
    ----------
    results : dict
        Output of ``run_lopo_models``: per-model pooled ``"y"``/``"p"`` arrays,
        per-fold ``"y_folds"``/``"p_folds"`` lists, and ``"persons"``.
    model_stats : dict
        Output of ``analyse_model_thresholds``; only ``"opt_threshold"`` is read.

    Returns
    -------
    dict
        Keyed by ``"mlp"``/``"xgb"``. Each entry holds the pooled precision,
        recall, accuracy and confusion matrix (plus ``tn``/``fp``/``fn``/``tp``
        as ints), a ``per_person`` list of per-fold metric rows, and
        ``<metric>_mean`` / ``<metric>_std`` / ``<metric>_ci_half`` for
        accuracy, f1, precision and recall across folds.
    """
    print("\n=== Precision, recall, confusion matrix, per-person variance ===")
    extended = {}
    persons = results["persons"]
    n_folds = len(persons)
    t_mult = _ci_95_t(n_folds)

    for name, label in [("mlp", "MLP"), ("xgb", "XGBoost")]:
        y_all = results[name]["y"]
        p_all = results[name]["p"]
        opt_t = model_stats[name]["opt_threshold"]

        y_pred = (p_all >= opt_t).astype(int)
        prec_pooled = precision_score(y_all, y_pred, zero_division=0)
        rec_pooled = recall_score(y_all, y_pred, zero_division=0)
        acc_pooled = accuracy_score(y_all, y_pred)
        # Fix the label set so the matrix is always 2x2, even when only one
        # class appears in truth and predictions; otherwise the counts could
        # not be unpacked and would be lost.
        cm = confusion_matrix(y_all, y_pred, labels=[0, 1])
        tn, fp, fn, tp = cm.ravel()

        # Per-fold metrics use the single pooled threshold for every person.
        per_person = []
        for k, (y_f, p_f) in enumerate(zip(results[name]["y_folds"],
                                           results[name]["p_folds"])):
            pred_f = (p_f >= opt_t).astype(int)
            per_person.append({
                "person": persons[k],
                "accuracy": accuracy_score(y_f, pred_f),
                "f1": f1_score(y_f, pred_f, zero_division=0),
                "precision": precision_score(y_f, pred_f, zero_division=0),
                "recall": recall_score(y_f, pred_f, zero_division=0),
            })

        entry = {
            "label": label,
            "opt_threshold": opt_t,
            "precision_pooled": prec_pooled,
            "recall_pooled": rec_pooled,
            "accuracy_pooled": acc_pooled,
            "confusion_matrix": cm,
            "tn": int(tn), "fp": int(fp), "fn": int(fn), "tp": int(tp),
            "per_person": per_person,
        }
        # Mean, sample std (ddof=1) and t-interval half-width across folds.
        for metric in ("accuracy", "f1", "precision", "recall"):
            values = [row[metric] for row in per_person]
            std = np.std(values, ddof=1) if n_folds > 1 else 0.0
            entry[f"{metric}_mean"] = np.mean(values)
            entry[f"{metric}_std"] = std
            entry[f"{metric}_ci_half"] = (
                t_mult * (std / np.sqrt(n_folds)) if n_folds > 1 else 0.0
            )
        entry["n_folds"] = n_folds
        extended[name] = entry

        mean_f1, std_f1 = entry["f1_mean"], entry["f1_std"]
        print(f" {label}: precision={prec_pooled:.4f}, recall={rec_pooled:.4f} | "
              f"per-fold F1 mean={mean_f1:.4f} ± {std_f1:.4f} "
              f"(95% CI [{mean_f1 - entry['f1_ci_half']:.4f}, {mean_f1 + entry['f1_ci_half']:.4f}])")
    return extended
def plot_confusion_matrices(extended_stats):
    """Save confusion matrix heatmaps for MLP and XGBoost.

    Reads ``confusion_matrix``, ``label`` and ``opt_threshold`` from each
    model entry of ``extended_stats`` and writes
    ``confusion_matrix_<name>.png`` under ``PLOTS_DIR``. A matrix that is not
    2x2 is skipped with a message instead of raising, mirroring the shape
    check done when the report table is written.
    """
    for name in ("mlp", "xgb"):
        s = extended_stats[name]
        cm = np.asarray(s["confusion_matrix"])
        if cm.shape != (2, 2):
            print(f" skipped confusion matrix for {name}: expected 2x2, got {cm.shape}")
            continue

        fig, ax = plt.subplots(figsize=(4, 3))
        ax.imshow(cm, cmap="Blues")
        ax.set_xticks([0, 1])
        ax.set_yticks([0, 1])
        ax.set_xticklabels(["Pred 0", "Pred 1"])
        ax.set_yticklabels(["True 0", "True 1"])
        ax.set_ylabel("True label")
        ax.set_xlabel("Predicted label")

        # White text on the dark (high-count) cells, black on the light ones.
        half_max = cm.max() / 2
        for (row, col), count in np.ndenumerate(cm):
            ax.text(col, row, str(count), ha="center", va="center",
                    color="white" if count > half_max else "black",
                    fontweight="bold")

        ax.set_title(f"LOPO {s['label']} @ t={s['opt_threshold']:.3f}")
        fig.tight_layout()
        path = os.path.join(PLOTS_DIR, f"confusion_matrix_{name}.png")
        fig.savefig(path, dpi=150)
        plt.close(fig)
        print(f" saved {path}")
def run_geo_weight_search():
    """Grid-search the face/eye blend weight of the geometric score under LOPO.

    For each face weight alpha in 0.2..0.8 (step 0.1) the score
    ``alpha * s_face + (1 - alpha) * s_eye`` is thresholded with the
    Youden's-J optimum found on the training persons, and F1 is measured on
    the held-out person. A bar chart of the mean F1 per alpha is saved.

    Returns ``(mean_f1_by_alpha, best_alpha)``.
    """
    print("\n=== Geometric weight grid search ===")
    by_person, _, _ = load_per_person("face_orientation")
    persons = sorted(by_person.keys())
    feature_names = SELECTED_FEATURES["face_orientation"]
    face_col = feature_names.index("s_face")
    eye_col = feature_names.index("s_eye")

    alphas = np.arange(0.2, 0.85, 0.1).round(1)
    fold_scores = {alpha: [] for alpha in alphas}

    def blend(X, alpha):
        # Weighted sum of the two geometric sub-scores.
        return alpha * X[:, face_col] + (1.0 - alpha) * X[:, eye_col]

    for held_out in persons:
        X_test, y_test = by_person[held_out]
        others = [p for p in persons if p != held_out]
        train_X = np.concatenate([by_person[p][0] for p in others])
        train_y = np.concatenate([by_person[p][1] for p in others])
        for alpha in alphas:
            threshold = _youdens_j(train_y, blend(train_X, alpha))[0]
            fold_scores[alpha].append(
                _f1_at_threshold(y_test, blend(X_test, alpha), threshold)
            )

    mean_f1 = {alpha: np.mean(scores) for alpha, scores in fold_scores.items()}
    best_alpha = max(mean_f1, key=mean_f1.get)

    figure, axes = plt.subplots(figsize=(7, 4))
    bar_labels = [f"{alpha:.1f}" for alpha in alphas]
    bar_heights = [mean_f1[alpha] for alpha in alphas]
    axes.bar(bar_labels, bar_heights, color="steelblue")
    axes.set_xlabel("Face weight (alpha); eye weight = 1 - alpha")
    axes.set_ylabel("Mean LOPO F1")
    axes.set_title("Geometric Pipeline: Face vs Eye Weight Search")
    axes.set_ylim(bottom=max(0, min(mean_f1.values()) - 0.05))
    for position, height in enumerate(bar_heights):
        axes.text(position, height + 0.003, f"{height:.3f}",
                  ha="center", va="bottom", fontsize=8)
    figure.tight_layout()
    path = os.path.join(PLOTS_DIR, "geo_weight_search.png")
    figure.savefig(path, dpi=150)
    plt.close(figure)

    print(f" saved {path}")
    print(f" Best alpha (face weight) = {best_alpha:.1f}, "
          f"mean LOPO F1 = {mean_f1[best_alpha]:.4f}")
    return dict(mean_f1), best_alpha
def run_hybrid_weight_search(lopo_results):
    """Grid-search the MLP-vs-geometric blend weight under LOPO.

    Held-out MLP probabilities are sliced out of the pooled
    ``lopo_results["mlp"]["p"]`` array in sorted-person order. For each fold
    an MLP is re-fitted on the training persons to obtain training
    probabilities, on which the Youden's-J threshold of each blend is chosen.
    A bar chart of the mean held-out F1 per weight is saved.

    Returns ``(mean_f1_by_weight, best_weight)``.
    """
    print("\n=== Hybrid weight grid search ===")
    by_person, _, _ = load_per_person("face_orientation")
    persons = sorted(by_person.keys())
    feature_names = SELECTED_FEATURES["face_orientation"]
    face_col = feature_names.index("s_face")
    eye_col = feature_names.index("s_eye")
    GEO_FACE_W, GEO_EYE_W = 0.7, 0.3

    def geo_score_of(X):
        # Fixed-weight geometric score, clipped to [0, 1].
        return np.clip(GEO_FACE_W * X[:, face_col] + GEO_EYE_W * X[:, eye_col], 0, 1)

    w_mlps = np.arange(0.3, 0.85, 0.1).round(1)
    per_weight_f1 = {w: [] for w in w_mlps}
    pooled_prob = lopo_results["mlp"]["p"]

    start = 0
    for held_out in persons:
        X_test, y_test = by_person[held_out]
        stop = start + X_test.shape[0]
        test_prob = pooled_prob[start:stop]
        start = stop
        test_geo = geo_score_of(X_test)

        others = [p for p in persons if p != held_out]
        train_X = np.concatenate([by_person[p][0] for p in others])
        train_y = np.concatenate([by_person[p][1] for p in others])
        train_geo = geo_score_of(train_X)

        train_scaled = StandardScaler().fit(train_X).transform(train_X)
        fold_mlp = MLPClassifier(
            hidden_layer_sizes=(64, 32), activation="relu",
            max_iter=200, early_stopping=True, validation_fraction=0.15,
            random_state=SEED, verbose=False,
        )
        fold_mlp.fit(train_scaled, train_y)
        train_prob = fold_mlp.predict_proba(train_scaled)[:, 1]

        for w in w_mlps:
            threshold = _youdens_j(train_y, w * train_prob + (1.0 - w) * train_geo)[0]
            blended_test = w * test_prob + (1.0 - w) * test_geo
            per_weight_f1[w].append(_f1_at_threshold(y_test, blended_test, threshold))

    mean_f1 = {w: np.mean(scores) for w, scores in per_weight_f1.items()}
    best_w = max(mean_f1, key=mean_f1.get)

    figure, axes = plt.subplots(figsize=(7, 4))
    bar_heights = [mean_f1[w] for w in w_mlps]
    axes.bar([f"{w:.1f}" for w in w_mlps], bar_heights, color="darkorange")
    axes.set_xlabel("MLP weight (w_mlp); geo weight = 1 - w_mlp")
    axes.set_ylabel("Mean LOPO F1")
    axes.set_title("Hybrid Pipeline: MLP vs Geometric Weight Search")
    axes.set_ylim(bottom=max(0, min(mean_f1.values()) - 0.05))
    for position, height in enumerate(bar_heights):
        axes.text(position, height + 0.003, f"{height:.3f}",
                  ha="center", va="bottom", fontsize=8)
    figure.tight_layout()
    path = os.path.join(PLOTS_DIR, "hybrid_weight_search.png")
    figure.savefig(path, dpi=150)
    plt.close(figure)

    print(f" saved {path}")
    print(f" Best w_mlp = {best_w:.1f}, mean LOPO F1 = {mean_f1[best_w]:.4f}")
    return dict(mean_f1), best_w
def run_hybrid_xgb_weight_search(lopo_results):
    """Grid-search the XGBoost-vs-geometric blend weight under LOPO.

    Held-out XGBoost probabilities are sliced out of the pooled
    ``lopo_results["xgb"]["p"]`` array in sorted-person order. For each fold
    an XGBoost model is re-fitted on the training persons to obtain training
    probabilities, on which the Youden's-J threshold of each blend is chosen.
    A bar chart of the mean held-out F1 per weight is saved.

    Returns ``(mean_f1_by_weight, best_weight)``.
    """
    print("\n=== Hybrid XGBoost weight grid search ===")
    by_person, _, _ = load_per_person("face_orientation")
    persons = sorted(by_person.keys())
    feature_names = SELECTED_FEATURES["face_orientation"]
    face_col = feature_names.index("s_face")
    eye_col = feature_names.index("s_eye")
    GEO_FACE_W, GEO_EYE_W = 0.7, 0.3

    def geo_score_of(X):
        # Fixed-weight geometric score, clipped to [0, 1].
        return np.clip(GEO_FACE_W * X[:, face_col] + GEO_EYE_W * X[:, eye_col], 0, 1)

    w_xgbs = np.arange(0.3, 0.85, 0.1).round(1)
    per_weight_f1 = {w: [] for w in w_xgbs}
    pooled_prob = lopo_results["xgb"]["p"]

    start = 0
    for held_out in persons:
        X_test, y_test = by_person[held_out]
        stop = start + X_test.shape[0]
        test_prob = pooled_prob[start:stop]
        start = stop
        test_geo = geo_score_of(X_test)

        others = [p for p in persons if p != held_out]
        train_X = np.concatenate([by_person[p][0] for p in others])
        train_y = np.concatenate([by_person[p][1] for p in others])
        train_geo = geo_score_of(train_X)

        train_scaled = StandardScaler().fit(train_X).transform(train_X)
        fold_xgb = XGBClassifier(
            n_estimators=600, max_depth=8, learning_rate=0.05,
            subsample=0.8, colsample_bytree=0.8,
            reg_alpha=0.1, reg_lambda=1.0,
            eval_metric="logloss",
            random_state=SEED, verbosity=0,
        )
        fold_xgb.fit(train_scaled, train_y)
        train_prob = fold_xgb.predict_proba(train_scaled)[:, 1]

        for w in w_xgbs:
            threshold = _youdens_j(train_y, w * train_prob + (1.0 - w) * train_geo)[0]
            blended_test = w * test_prob + (1.0 - w) * test_geo
            per_weight_f1[w].append(_f1_at_threshold(y_test, blended_test, threshold))

    mean_f1 = {w: np.mean(scores) for w, scores in per_weight_f1.items()}
    best_w = max(mean_f1, key=mean_f1.get)

    figure, axes = plt.subplots(figsize=(7, 4))
    bar_heights = [mean_f1[w] for w in w_xgbs]
    axes.bar([f"{w:.1f}" for w in w_xgbs], bar_heights, color="steelblue")
    axes.set_xlabel("XGBoost weight (w_xgb); geo weight = 1 - w_xgb")
    axes.set_ylabel("Mean LOPO F1")
    axes.set_title("Hybrid Pipeline: XGBoost vs Geometric Weight Search")
    axes.set_ylim(bottom=max(0, min(mean_f1.values()) - 0.05))
    for position, height in enumerate(bar_heights):
        axes.text(position, height + 0.003, f"{height:.3f}",
                  ha="center", va="bottom", fontsize=8)
    figure.tight_layout()
    path = os.path.join(PLOTS_DIR, "hybrid_xgb_weight_search.png")
    figure.savefig(path, dpi=150)
    plt.close(figure)

    print(f" saved {path}")
    print(f" Best w_xgb = {best_w:.1f}, mean LOPO F1 = {mean_f1[best_w]:.4f}")
    return dict(mean_f1), best_w
def run_hybrid_lr_combiner(lopo_results, use_xgb=True):
    """Evaluate a logistic-regression combiner of model probability and geometric score.

    Per LOPO fold, the base model (XGBoost when ``use_xgb`` is true, otherwise
    the MLP) is re-fitted on the training persons, a logistic regression is
    trained on ``[model_prob, geo_score]`` for those persons, and its
    Youden's-J threshold is applied to the held-out person. Held-out base
    model probabilities come from the pooled array in ``lopo_results``.

    Returns the mean held-out F1 across folds as a float.
    """
    print("\n=== Hybrid LR combiner (LOPO) ===")
    by_person, _, _ = load_per_person("face_orientation")
    persons = sorted(by_person.keys())
    feature_names = SELECTED_FEATURES["face_orientation"]
    face_col = feature_names.index("s_face")
    eye_col = feature_names.index("s_eye")
    GEO_FACE_W, GEO_EYE_W = 0.7, 0.3

    def geo_score_of(X):
        # Fixed-weight geometric score, clipped to [0, 1].
        return np.clip(GEO_FACE_W * X[:, face_col] + GEO_EYE_W * X[:, eye_col], 0, 1)

    pooled_prob = lopo_results["xgb" if use_xgb else "mlp"]["p"]
    fold_f1s = []
    start = 0
    for held_out in persons:
        X_test, y_test = by_person[held_out]
        stop = start + X_test.shape[0]
        meta_te = np.column_stack([pooled_prob[start:stop], geo_score_of(X_test)])
        start = stop

        others = [p for p in persons if p != held_out]
        train_X = np.concatenate([by_person[p][0] for p in others])
        train_y = np.concatenate([by_person[p][1] for p in others])

        X_tr_sc = StandardScaler().fit(train_X).transform(train_X)
        if use_xgb:
            base_model = XGBClassifier(
                n_estimators=600, max_depth=8, learning_rate=0.05,
                subsample=0.8, colsample_bytree=0.8,
                reg_alpha=0.1, reg_lambda=1.0,
                eval_metric="logloss",
                random_state=SEED, verbosity=0,
            )
        else:
            base_model = MLPClassifier(
                hidden_layer_sizes=(64, 32), activation="relu",
                max_iter=200, early_stopping=True, validation_fraction=0.15,
                random_state=SEED, verbose=False,
            )
        base_model.fit(X_tr_sc, train_y)
        prob_tr = base_model.predict_proba(X_tr_sc)[:, 1]

        meta_tr = np.column_stack([prob_tr, geo_score_of(train_X)])
        combiner = LogisticRegression(C=1.0, max_iter=500, random_state=SEED)
        combiner.fit(meta_tr, train_y)

        # Threshold comes from the combiner's own training-set output.
        opt_t = _youdens_j(train_y, combiner.predict_proba(meta_tr)[:, 1])[0]
        f1 = _f1_at_threshold(y_test, combiner.predict_proba(meta_te)[:, 1], opt_t)
        fold_f1s.append(f1)
        print(f" fold {held_out}: F1 = {f1:.4f} (threshold = {opt_t:.3f})")

    mean_f1 = float(np.mean(fold_f1s))
    print(f" LR combiner mean LOPO F1 = {mean_f1:.4f}")
    return mean_f1
def train_and_save_hybrid_combiner(lopo_results, use_xgb, geo_face_weight=0.7, geo_eye_weight=0.3,
                                   combiner_path=None):
    """Build OOS meta-dataset from LOPO predictions, train one LR, save joblib + optimal threshold.

    Parameters
    ----------
    lopo_results : dict
        Output of ``run_lopo_models``; the pooled ``"p"`` and ``"y"`` arrays of
        the chosen model are used as meta-feature and target.
    use_xgb : bool
        Use the XGBoost predictions when true, otherwise the MLP predictions.
    geo_face_weight, geo_eye_weight : float
        Weights of ``s_face`` and ``s_eye`` in the geometric score, which is
        clipped to [0, 1].
    combiner_path : str or None
        Destination file. Defaults to ``checkpoints/hybrid_combiner.joblib``
        under the project root.

    Returns
    -------
    tuple
        ``(opt_threshold, combiner_path)``.

    Raises
    ------
    ValueError
        If the number of pooled predictions differs from the number of
        samples loaded from disk.

    Note: the Youden's-J threshold is chosen on the same meta-dataset the
    logistic regression was fitted on.
    """
    by_person, _, _ = load_per_person("face_orientation")
    persons = sorted(by_person.keys())
    features = SELECTED_FEATURES["face_orientation"]
    sf_idx = features.index("s_face")
    se_idx = features.index("s_eye")

    key = "xgb" if use_xgb else "mlp"
    model_p = lopo_results[key]["p"]
    meta_y = lopo_results[key]["y"]

    # Rebuild the geometric score in the same sorted-person order in which the
    # pooled predictions were concatenated, so the rows line up.
    geo_all = np.concatenate([
        np.clip(
            geo_face_weight * by_person[person][0][:, sf_idx]
            + geo_eye_weight * by_person[person][0][:, se_idx],
            0, 1,
        )
        for person in persons
    ])
    if len(geo_all) != len(model_p):
        raise ValueError(
            f"LOPO predictions ({len(model_p)}) and loaded samples ({len(geo_all)}) "
            "differ in length; rerun the LOPO evaluation on the current dataset."
        )

    meta_X = np.column_stack([model_p, geo_all])
    lr = LogisticRegression(C=1.0, max_iter=500, random_state=SEED)
    lr.fit(meta_X, meta_y)
    combined_prob = lr.predict_proba(meta_X)[:, 1]
    opt_threshold, *_ = _youdens_j(meta_y, combined_prob)

    if combiner_path is None:
        combiner_path = os.path.join(_PROJECT_ROOT, "checkpoints", "hybrid_combiner.joblib")
    # A bare filename has an empty dirname, which os.makedirs rejects.
    out_dir = os.path.dirname(combiner_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    joblib.dump({
        "combiner": lr,
        "threshold": float(opt_threshold),
        "use_xgb": bool(use_xgb),
        "geo_face_weight": geo_face_weight,
        "geo_eye_weight": geo_eye_weight,
    }, combiner_path)
    print(f" Saved combiner to {combiner_path} (threshold={opt_threshold:.3f})")
    return opt_threshold, combiner_path
def plot_distributions():
    """Plot EAR and MAR class-conditional histograms and summarise threshold coverage.

    Loads every ``data/collected_*/*.npz`` file under the project root, reads
    the ``ear_left``, ``ear_right`` and ``mar`` feature columns plus the
    labels, and saves two histograms (``ear_distribution.png`` and
    ``mar_distribution.png``) under ``PLOTS_DIR`` with the fixed thresholds
    drawn as vertical lines.

    Returns
    -------
    dict
        Percentages of samples with min-EAR below 0.16, below 0.21, at or
        above 0.30, and MAR above 0.55, plus ``n_samples``.

    Raises
    ------
    ValueError
        If no matching ``.npz`` files are found.
    """
    print("\n=== EAR / MAR distributions ===")
    pattern = os.path.join(_PROJECT_ROOT, "data", "collected_*", "*.npz")
    npz_files = sorted(glob.glob(pattern))
    if not npz_files:
        # Fail with a clear message instead of numpy's opaque concatenate error.
        raise ValueError(f"no .npz files found matching {pattern}")

    all_ear_l, all_ear_r, all_mar, all_labels = [], [], [], []
    for f in npz_files:
        # NOTE(review): allow_pickle=True executes pickled payloads; only safe
        # as long as these files are produced by this project itself.
        # The context manager closes the underlying archive after reading.
        with np.load(f, allow_pickle=True) as d:
            names = list(d["feature_names"])
            feat = d["features"].astype(np.float32)
            lab = d["labels"].astype(np.int64)
        all_ear_l.append(feat[:, names.index("ear_left")])
        all_ear_r.append(feat[:, names.index("ear_right")])
        all_mar.append(feat[:, names.index("mar")])
        all_labels.append(lab)

    ear_l = np.concatenate(all_ear_l)
    ear_r = np.concatenate(all_ear_r)
    mar = np.concatenate(all_mar)
    labels = np.concatenate(all_labels)
    ear_min = np.minimum(ear_l, ear_r)
    # Clip only for plotting so outliers do not stretch the x-axis; the
    # statistics below use the unclipped values.
    ear_plot = np.clip(ear_min, 0, 0.85)
    mar_plot = np.clip(mar, 0, 1.5)
    # Derived from the data so the titles stay correct as the dataset changes.
    sample_tag = f"{len(labels) / 1000:.0f}k samples"

    fig, ax = plt.subplots(figsize=(7, 4))
    ax.hist(ear_plot[labels == 1], bins=100, alpha=0.6, label="Focused (1)", density=True)
    ax.hist(ear_plot[labels == 0], bins=100, alpha=0.6, label="Unfocused (0)", density=True)
    for val, lbl, c in [
        (0.16, "ear_closed = 0.16", "red"),
        (0.21, "EAR_BLINK = 0.21", "orange"),
        (0.30, "ear_open = 0.30", "green"),
    ]:
        ax.axvline(val, color=c, ls="--", lw=1.5, label=lbl)
    ax.set_xlabel("min(left_EAR, right_EAR)")
    ax.set_ylabel("Density")
    ax.set_title(f"EAR Distribution by Class ({sample_tag})")
    ax.legend(fontsize=8)
    fig.tight_layout()
    path = os.path.join(PLOTS_DIR, "ear_distribution.png")
    fig.savefig(path, dpi=150)
    plt.close(fig)
    print(f" saved {path}")

    fig, ax = plt.subplots(figsize=(7, 4))
    ax.hist(mar_plot[labels == 1], bins=100, alpha=0.6, label="Focused (1)", density=True)
    ax.hist(mar_plot[labels == 0], bins=100, alpha=0.6, label="Unfocused (0)", density=True)
    ax.axvline(0.55, color="red", ls="--", lw=1.5, label="MAR_YAWN = 0.55")
    ax.set_xlabel("Mouth Aspect Ratio (MAR)")
    ax.set_ylabel("Density")
    ax.set_title(f"MAR Distribution by Class ({sample_tag})")
    ax.legend(fontsize=8)
    fig.tight_layout()
    path = os.path.join(PLOTS_DIR, "mar_distribution.png")
    fig.savefig(path, dpi=150)
    plt.close(fig)
    print(f" saved {path}")

    closed_pct = np.mean(ear_min < 0.16) * 100
    blink_pct = np.mean(ear_min < 0.21) * 100
    open_pct = np.mean(ear_min >= 0.30) * 100
    yawn_pct = np.mean(mar > 0.55) * 100
    stats = {
        "ear_below_016": closed_pct,
        "ear_below_021": blink_pct,
        "ear_above_030": open_pct,
        "mar_above_055": yawn_pct,
        "n_samples": len(ear_min),
    }
    print(f" EAR<0.16 (closed): {closed_pct:.1f}% | EAR<0.21 (blink): {blink_pct:.1f}% | "
          f"EAR>=0.30 (open): {open_pct:.1f}%")
    print(f" MAR>0.55 (yawn): {yawn_pct:.1f}%")
    return stats
| def write_report(model_stats, extended_stats, geo_f1, best_alpha, | |
| hybrid_mlp_f1, best_w_mlp, | |
| hybrid_xgb_f1, best_w_xgb, | |
| use_xgb_for_hybrid, dist_stats, | |
| lr_combiner_f1=None): | |
| lines = [] | |
| lines.append("# Threshold Justification Report") | |
| lines.append("") | |
| lines.append("Auto-generated by `evaluation/justify_thresholds.py` using LOPO cross-validation " | |
| "over 9 participants (~145k samples).") | |
| lines.append("") | |
| lines.append("## 1. ML Model Decision Thresholds") | |
| lines.append("") | |
| lines.append("Thresholds selected via **Youden's J statistic** (J = sensitivity + specificity - 1) " | |
| "on pooled LOPO held-out predictions.") | |
| lines.append("") | |
| lines.append("| Model | LOPO AUC | Optimal Threshold (Youden's J) | F1 @ Optimal | F1 @ 0.50 |") | |
| lines.append("|-------|----------|-------------------------------|--------------|-----------|") | |
| for key in ("mlp", "xgb"): | |
| s = model_stats[key] | |
| lines.append(f"| {s['label']} | {s['auc']:.4f} | **{s['opt_threshold']:.3f}** | " | |
| f"{s['f1_opt']:.4f} | {s['f1_50']:.4f} |") | |
| lines.append("") | |
| lines.append("") | |
| lines.append("") | |
| lines.append("") | |
| lines.append("") | |
| lines.append("## 2. Precision, Recall and Tradeoff") | |
| lines.append("") | |
| lines.append("At the optimal threshold (Youden's J), pooled over all LOPO held-out predictions:") | |
| lines.append("") | |
| lines.append("| Model | Threshold | Precision | Recall | F1 | Accuracy |") | |
| lines.append("|-------|----------:|----------:|-------:|---:|---------:|") | |
| for key in ("mlp", "xgb"): | |
| s = extended_stats[key] | |
| lines.append(f"| {s['label']} | {s['opt_threshold']:.3f} | {s['precision_pooled']:.4f} | " | |
| f"{s['recall_pooled']:.4f} | {model_stats[key]['f1_opt']:.4f} | {s['accuracy_pooled']:.4f} |") | |
| lines.append("") | |
| lines.append("Higher threshold → fewer positive predictions → higher precision, lower recall. " | |
| "Youden's J picks the threshold that balances sensitivity and specificity (recall for the positive class and true negative rate).") | |
| lines.append("") | |
| lines.append("## 3. Confusion Matrix (Pooled LOPO)") | |
| lines.append("") | |
| lines.append("At optimal threshold. Rows = true label, columns = predicted label (0 = unfocused, 1 = focused).") | |
| lines.append("") | |
| for key in ("mlp", "xgb"): | |
| s = extended_stats[key] | |
| lines.append(f"### {s['label']}") | |
| lines.append("") | |
| lines.append("| | Pred 0 | Pred 1 |") | |
| lines.append("|--|-------:|-------:|") | |
| cm = s["confusion_matrix"] | |
| if cm.shape == (2, 2): | |
| lines.append(f"| **True 0** | {cm[0,0]} (TN) | {cm[0,1]} (FP) |") | |
| lines.append(f"| **True 1** | {cm[1,0]} (FN) | {cm[1,1]} (TP) |") | |
| lines.append("") | |
| lines.append(f"TN={s['tn']}, FP={s['fp']}, FN={s['fn']}, TP={s['tp']}. ") | |
| lines.append("") | |
| lines.append("") | |
| lines.append("") | |
| lines.append("") | |
| lines.append("") | |
| lines.append("## 4. Per-Person Performance Variance (LOPO)") | |
| lines.append("") | |
| lines.append("One fold per left-out person; metrics at optimal threshold.") | |
| lines.append("") | |
| for key in ("mlp", "xgb"): | |
| s = extended_stats[key] | |
| lines.append(f"### {s['label']} — per held-out person") | |
| lines.append("") | |
| lines.append("| Person | Accuracy | F1 | Precision | Recall |") | |
| lines.append("|--------|---------:|---:|----------:|-------:|") | |
| for row in s["per_person"]: | |
| lines.append(f"| {row['person']} | {row['accuracy']:.4f} | {row['f1']:.4f} | {row['precision']:.4f} | {row['recall']:.4f} |") | |
| lines.append("") | |
| lines.append("### Summary across persons") | |
| lines.append("") | |
| lines.append("| Model | Accuracy mean ± std | F1 mean ± std | Precision mean ± std | Recall mean ± std |") | |
| lines.append("|-------|---------------------|---------------|----------------------|-------------------|") | |
| for key in ("mlp", "xgb"): | |
| s = extended_stats[key] | |
| lines.append(f"| {s['label']} | {s['accuracy_mean']:.4f} ± {s['accuracy_std']:.4f} | " | |
| f"{s['f1_mean']:.4f} ± {s['f1_std']:.4f} | " | |
| f"{s['precision_mean']:.4f} ± {s['precision_std']:.4f} | " | |
| f"{s['recall_mean']:.4f} ± {s['recall_std']:.4f} |") | |
| lines.append("") | |
| lines.append("## 5. Confidence Intervals (95%, LOPO over 9 persons)") | |
| lines.append("") | |
| lines.append("Mean ± half-width of 95% t-interval (df=8) for each metric across the 9 left-out persons.") | |
| lines.append("") | |
| lines.append("| Model | F1 | Accuracy | Precision | Recall |") | |
| lines.append("|-------|---:|--------:|----------:|-------:|") | |
| for key in ("mlp", "xgb"): | |
| s = extended_stats[key] | |
| f1_lo = s["f1_mean"] - s["f1_ci_half"] | |
| f1_hi = s["f1_mean"] + s["f1_ci_half"] | |
| acc_lo = s["accuracy_mean"] - s["accuracy_ci_half"] | |
| acc_hi = s["accuracy_mean"] + s["accuracy_ci_half"] | |
| prec_lo = s["precision_mean"] - s["precision_ci_half"] | |
| prec_hi = s["precision_mean"] + s["precision_ci_half"] | |
| rec_lo = s["recall_mean"] - s["recall_ci_half"] | |
| rec_hi = s["recall_mean"] + s["recall_ci_half"] | |
| lines.append(f"| {s['label']} | {s['f1_mean']:.4f} [{f1_lo:.4f}, {f1_hi:.4f}] | " | |
| f"{s['accuracy_mean']:.4f} [{acc_lo:.4f}, {acc_hi:.4f}] | " | |
| f"{s['precision_mean']:.4f} [{prec_lo:.4f}, {prec_hi:.4f}] | " | |
| f"{s['recall_mean']:.4f} [{rec_lo:.4f}, {rec_hi:.4f}] |") | |
| lines.append("") | |
| lines.append("## 6. Geometric Pipeline Weights (s_face vs s_eye)") | |
| lines.append("") | |
| lines.append("Grid search over face weight alpha in {0.2 ... 0.8}. " | |
| "Eye weight = 1 - alpha. Threshold per fold via Youden's J.") | |
| lines.append("") | |
| lines.append("| Face Weight (alpha) | Mean LOPO F1 |") | |
| lines.append("|--------------------:|-------------:|") | |
| for a in sorted(geo_f1.keys()): | |
| marker = " **<-- selected**" if a == best_alpha else "" | |
| lines.append(f"| {a:.1f} | {geo_f1[a]:.4f}{marker} |") | |
| lines.append("") | |
| lines.append(f"**Best:** alpha = {best_alpha:.1f} (face {best_alpha*100:.0f}%, " | |
| f"eye {(1-best_alpha)*100:.0f}%)") | |
| lines.append("") | |
| lines.append("") | |
| lines.append("") | |
| lines.append("## 7. Hybrid Pipeline: MLP vs Geometric") | |
| lines.append("") | |
| lines.append("Grid search over w_mlp in {0.3 ... 0.8}. w_geo = 1 - w_mlp. " | |
| "Geometric sub-score uses same weights as geometric pipeline (face=0.7, eye=0.3).") | |
| lines.append("") | |
| lines.append("| MLP Weight (w_mlp) | Mean LOPO F1 |") | |
| lines.append("|-------------------:|-------------:|") | |
| for w in sorted(hybrid_mlp_f1.keys()): | |
| marker = " **<-- selected**" if w == best_w_mlp else "" | |
| lines.append(f"| {w:.1f} | {hybrid_mlp_f1[w]:.4f}{marker} |") | |
| lines.append("") | |
| lines.append(f"**Best:** w_mlp = {best_w_mlp:.1f} (MLP {best_w_mlp*100:.0f}%, " | |
| f"geometric {(1-best_w_mlp)*100:.0f}%) → mean LOPO F1 = {hybrid_mlp_f1[best_w_mlp]:.4f}") | |
| lines.append("") | |
| lines.append("") | |
| lines.append("") | |
| lines.append("## 8. Hybrid Pipeline: XGBoost vs Geometric") | |
| lines.append("") | |
| lines.append("Same grid over w_xgb in {0.3 ... 0.8}. w_geo = 1 - w_xgb.") | |
| lines.append("") | |
| lines.append("| XGBoost Weight (w_xgb) | Mean LOPO F1 |") | |
| lines.append("|-----------------------:|-------------:|") | |
| for w in sorted(hybrid_xgb_f1.keys()): | |
| marker = " **<-- selected**" if w == best_w_xgb else "" | |
| lines.append(f"| {w:.1f} | {hybrid_xgb_f1[w]:.4f}{marker} |") | |
| lines.append("") | |
| lines.append(f"**Best:** w_xgb = {best_w_xgb:.1f} → mean LOPO F1 = {hybrid_xgb_f1[best_w_xgb]:.4f}") | |
| lines.append("") | |
| lines.append("") | |
| lines.append("") | |
| f1_mlp = hybrid_mlp_f1[best_w_mlp] | |
| f1_xgb = hybrid_xgb_f1[best_w_xgb] | |
| lines.append("### Which hybrid is used in the app?") | |
| lines.append("") | |
| if use_xgb_for_hybrid: | |
| lines.append(f"**XGBoost hybrid is better** (F1 = {f1_xgb:.4f} vs MLP hybrid F1 = {f1_mlp:.4f}).") | |
| else: | |
| lines.append(f"**MLP hybrid is better** (F1 = {f1_mlp:.4f} vs XGBoost hybrid F1 = {f1_xgb:.4f}).") | |
| lines.append("") | |
| if lr_combiner_f1 is not None: | |
| lines.append("### Logistic regression combiner (replaces heuristic weights)") | |
| lines.append("") | |
| lines.append("Instead of a fixed linear blend (e.g. 0.3·ML + 0.7·geo), a **logistic regression** " | |
| "combines model probability and geometric score: meta-features = [model_prob, geo_score], " | |
| "trained on the same LOPO splits. Threshold from Youden's J on combiner output.") | |
| lines.append("") | |
| lines.append(f"| Method | Mean LOPO F1 |") | |
| lines.append("|--------|-------------:|") | |
| lines.append(f"| Heuristic weight grid (best w) | {(f1_xgb if use_xgb_for_hybrid else f1_mlp):.4f} |") | |
| lines.append(f"| **LR combiner** | **{lr_combiner_f1:.4f}** |") | |
| lines.append("") | |
| lines.append("The app uses the saved LR combiner when `combiner_path` is set in `hybrid_focus_config.json`.") | |
| lines.append("") | |
| else: | |
| if use_xgb_for_hybrid: | |
| lines.append("The app uses **XGBoost + geometric** with the weights above.") | |
| else: | |
| lines.append("The app uses **MLP + geometric** with the weights above.") | |
| lines.append("") | |
| lines.append("## 5. Eye and Mouth Aspect Ratio Thresholds") | |
| lines.append("") | |
| lines.append("### EAR (Eye Aspect Ratio)") | |
| lines.append("") | |
| lines.append("Reference: Soukupova & Cech, \"Real-Time Eye Blink Detection Using Facial " | |
| "Landmarks\" (2016) established EAR ~ 0.2 as a blink threshold.") | |
| lines.append("") | |
| lines.append("Our thresholds define a linear interpolation zone around this established value:") | |
| lines.append("") | |
| lines.append("| Constant | Value | Justification |") | |
| lines.append("|----------|------:|---------------|") | |
| lines.append(f"| `ear_closed` | 0.16 | Below this, eyes are fully shut. " | |
| f"{dist_stats['ear_below_016']:.1f}% of samples fall here. |") | |
| lines.append(f"| `EAR_BLINK_THRESH` | 0.21 | Blink detection point; close to the 0.2 reference. " | |
| f"{dist_stats['ear_below_021']:.1f}% of samples below. |") | |
| lines.append(f"| `ear_open` | 0.30 | Above this, eyes are fully open. " | |
| f"{dist_stats['ear_above_030']:.1f}% of samples here. |") | |
| lines.append("") | |
| lines.append("Between 0.16 and 0.30 the `_ear_score` function linearly interpolates from 0 to 1, " | |
| "providing a smooth transition rather than a hard binary cutoff.") | |
| lines.append("") | |
| lines.append("") | |
| lines.append("") | |
| lines.append("### MAR (Mouth Aspect Ratio)") | |
| lines.append("") | |
| lines.append(f"| Constant | Value | Justification |") | |
| lines.append("|----------|------:|---------------|") | |
| lines.append(f"| `MAR_YAWN_THRESHOLD` | 0.55 | Only {dist_stats['mar_above_055']:.1f}% of " | |
| f"samples exceed this, confirming it captures genuine yawns without false positives. |") | |
| lines.append("") | |
| lines.append("") | |
| lines.append("") | |
| lines.append("## 10. Other Constants") | |
| lines.append("") | |
| lines.append("| Constant | Value | Rationale |") | |
| lines.append("|----------|------:|-----------|") | |
| lines.append("| `gaze_max_offset` | 0.28 | Max iris displacement (normalised) before gaze score " | |
| "drops to zero. Corresponds to ~56% of the eye width; beyond this the iris is at " | |
| "the extreme edge. |") | |
| lines.append("| `max_angle` | 22.0 deg | Head deviation beyond which face score = 0. Based on " | |
| "typical monitor-viewing cone: at 60 cm distance and a 24\" monitor, the viewing " | |
| "angle is ~20-25 degrees. |") | |
| lines.append("| `roll_weight` | 0.5 | Roll is less indicative of inattention than yaw/pitch " | |
| "(tilting head doesn't mean looking away), so it's down-weighted by 50%. |") | |
| lines.append("| `EMA alpha` | 0.3 | Smoothing factor for focus score. " | |
| "Gives ~3-4 frame effective window; balances responsiveness vs flicker. |") | |
| lines.append("| `grace_frames` | 15 | ~0.5 s at 30 fps before penalising no-face. Allows brief " | |
| "occlusions (e.g. hand gesture) without dropping score. |") | |
| lines.append("| `PERCLOS_WINDOW` | 60 frames | 2 s at 30 fps; standard PERCLOS measurement " | |
| "window (Dinges & Grace, 1998). |") | |
| lines.append("| `BLINK_WINDOW_SEC` | 30 s | Blink rate measured over 30 s; typical spontaneous " | |
| "blink rate is 15-20/min (Bentivoglio et al., 1997). |") | |
| lines.append("") | |
| with open(REPORT_PATH, "w", encoding="utf-8") as f: | |
| f.write("\n".join(lines)) | |
| print(f"\nReport written to {REPORT_PATH}") | |
def write_hybrid_config(use_xgb, best_w_mlp, best_w_xgb, config_path,
                        combiner_path=None, combiner_threshold=None):
    """Write hybrid_focus_config.json.

    If ``combiner_path`` is set, the app uses the LR combiner instead of the
    heuristic weights.

    Args:
        use_xgb: Truthy to record the XGBoost hybrid as selected; otherwise
            the MLP hybrid is recorded.
        best_w_mlp: Best MLP weight from the grid search (used when
            ``use_xgb`` is falsy). Any float-convertible value is accepted.
        best_w_xgb: Best XGBoost weight from the grid search (used when
            ``use_xgb`` is truthy). Any float-convertible value is accepted.
        config_path: Destination path of the JSON file (overwritten).
        combiner_path: Optional path of the saved combiner; only its base
            name is stored in the config.
        combiner_threshold: Optional decision threshold; 0.35 is written
            when it is ``None``.
    """
    import json

    # Coerce the selected weight to a builtin float *before* any arithmetic.
    # The grid-search result may be a NumPy scalar; rounding e.g. a
    # numpy.float32 yields another numpy.float32, which json.dump rejects.
    if use_xgb:
        model_weight = float(best_w_xgb)
        w_xgb = round(model_weight, 2)
        w_mlp = 0.3  # fixed value written for the non-selected MLP weight
    else:
        model_weight = float(best_w_mlp)
        w_mlp = round(model_weight, 2)
        w_xgb = 0.0  # fixed value written for the non-selected XGBoost weight
    # Geometric weight is the complement of the selected model weight.
    w_geo = round(1.0 - model_weight, 2)

    cfg = {
        "use_xgb": bool(use_xgb),
        "w_mlp": w_mlp,
        "w_xgb": w_xgb,
        "w_geo": w_geo,
        "threshold": float(combiner_threshold) if combiner_threshold is not None else 0.35,
        "use_yawn_veto": True,
        "geo_face_weight": 0.7,
        "geo_eye_weight": 0.3,
        "mar_yawn_threshold": 0.55,
        "metric": "f1",
    }
    if combiner_path:
        cfg["combiner"] = "logistic"
        # Store only the file name, not the full path passed in.
        cfg["combiner_path"] = os.path.basename(combiner_path)

    with open(config_path, "w", encoding="utf-8") as f:
        json.dump(cfg, f, indent=2)
    print(f" Written {config_path} (use_xgb={cfg['use_xgb']}, combiner={cfg.get('combiner', 'heuristic')})")
def main():
    """Run the full LOPO threshold/weight analysis and write its outputs.

    Steps, in order: evaluate the models, derive threshold and confusion
    statistics, run the geometric and hybrid weight searches, plot feature
    distributions, pick the better hybrid by F1, fit and save the combiner,
    then write the hybrid config JSON and the report.
    """
    os.makedirs(PLOTS_DIR, exist_ok=True)

    # Model evaluation and threshold statistics.
    lopo = run_lopo_models()
    threshold_stats = analyse_model_thresholds(lopo)
    pr_stats = analyse_precision_recall_confusion(lopo, threshold_stats)
    plot_confusion_matrices(pr_stats)

    # Weight grid searches (geometric, MLP hybrid, XGBoost hybrid).
    geo_scores, alpha_star = run_geo_weight_search()
    mlp_scores, mlp_weight = run_hybrid_weight_search(lopo)
    xgb_scores, xgb_weight = run_hybrid_xgb_weight_search(lopo)
    distributions = plot_distributions()

    # XGBoost is chosen only when its best F1 is strictly higher than MLP's.
    mlp_best_f1 = mlp_scores[mlp_weight]
    xgb_best_f1 = xgb_scores[xgb_weight]
    prefer_xgb = xgb_best_f1 > mlp_best_f1
    winner = "XGBoost" if prefer_xgb else "MLP"
    print(f"\n Hybrid comparison: MLP F1 = {mlp_best_f1:.4f}, XGBoost F1 = {xgb_best_f1:.4f} → "
          f"use {winner}")

    # Combiner evaluation, then fit-and-save into the checkpoints directory.
    checkpoints_dir = os.path.join(_PROJECT_ROOT, "checkpoints")
    combiner_f1 = run_hybrid_lr_combiner(lopo, use_xgb=prefer_xgb)
    saved_threshold, saved_combiner = train_and_save_hybrid_combiner(
        lopo,
        prefer_xgb,
        combiner_path=os.path.join(checkpoints_dir, "hybrid_combiner.joblib"),
    )

    write_hybrid_config(
        prefer_xgb,
        mlp_weight,
        xgb_weight,
        os.path.join(checkpoints_dir, "hybrid_focus_config.json"),
        combiner_path=saved_combiner,
        combiner_threshold=saved_threshold,
    )

    write_report(
        threshold_stats, pr_stats, geo_scores, alpha_star,
        mlp_scores, mlp_weight,
        xgb_scores, xgb_weight,
        prefer_xgb, distributions,
        lr_combiner_f1=combiner_f1,
    )
    print("\nDone.")


# Entry point: run the analysis only when executed as a script/module,
# not when this file is imported.
if __name__ == "__main__":
    main()