"""LOPO threshold/weight analysis.

Run: python -m evaluation.justify_thresholds

Runs leave-one-person-out (LOPO) evaluation of an MLP and an XGBoost
classifier, selects decision thresholds with Youden's J, grid-searches the
geometric and hybrid blend weights, saves plots, a Markdown report, a
logistic-regression combiner and the hybrid config file.
"""

import glob
import json
import os
import sys

import numpy as np
import matplotlib
matplotlib.use("Agg")  # select the non-interactive backend before importing pyplot
import matplotlib.pyplot as plt
import joblib
from sklearn.linear_model import LogisticRegression
from sklearn.neural_network import MLPClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (
    roc_curve,
    roc_auc_score,
    f1_score,
    precision_score,
    recall_score,
    accuracy_score,
    confusion_matrix,
)
from xgboost import XGBClassifier

_PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
sys.path.insert(0, _PROJECT_ROOT)

from data_preparation.prepare_dataset import load_per_person, SELECTED_FEATURES

PLOTS_DIR = os.path.join(os.path.dirname(__file__), "plots")
REPORT_PATH = os.path.join(os.path.dirname(__file__), "THRESHOLD_JUSTIFICATION.md")
SEED = 42

# Face/eye weights of the geometric sub-score used by the hybrid pipelines.
_GEO_FACE_W = 0.7
_GEO_EYE_W = 0.3

# (results key, display label) for the two evaluated models.
_MODELS = (("mlp", "MLP"), ("xgb", "XGBoost"))


# ---------------------------------------------------------------------------
# Small shared helpers
# ---------------------------------------------------------------------------

def _youdens_j(y_true, y_prob):
    """Return (best_threshold, fpr, tpr, thresholds, auc) for a score vector.

    The best threshold is the ROC point maximising J = TPR - FPR.
    """
    fpr, tpr, thresholds = roc_curve(y_true, y_prob)
    idx = (tpr - fpr).argmax()
    auc = roc_auc_score(y_true, y_prob)
    return float(thresholds[idx]), fpr, tpr, thresholds, float(auc)


def _f1_at_threshold(y_true, y_prob, threshold):
    """Return F1 of the predictions ``y_prob >= threshold`` (0 when undefined)."""
    return f1_score(y_true, (y_prob >= threshold).astype(int), zero_division=0)


def _plot_roc(fpr, tpr, auc, opt_thresh, opt_idx, title, path):
    """Save a ROC curve with the Youden's J optimum highlighted to *path*."""
    fig, ax = plt.subplots(figsize=(6, 5))
    ax.plot(fpr, tpr, lw=2, label=f"ROC (AUC = {auc:.4f})")
    ax.plot(fpr[opt_idx], tpr[opt_idx], "ro", markersize=10,
            label=f"Youden's J optimum (t = {opt_thresh:.3f})")
    ax.plot([0, 1], [0, 1], "k--", lw=1, alpha=0.5)
    ax.set_xlabel("False Positive Rate")
    ax.set_ylabel("True Positive Rate")
    ax.set_title(title)
    ax.legend(loc="lower right")
    fig.tight_layout()
    fig.savefig(path, dpi=150)
    plt.close(fig)
    print(f" saved {path}")


def _make_mlp():
    """Return an unfitted MLP with the hyper-parameters used throughout this script."""
    return MLPClassifier(
        hidden_layer_sizes=(64, 32),
        activation="relu",
        max_iter=200,
        early_stopping=True,
        validation_fraction=0.15,
        random_state=SEED,
        verbose=False,
    )


def _make_xgb():
    """Return an unfitted XGBoost model with the hyper-parameters used throughout this script."""
    return XGBClassifier(
        n_estimators=600,
        max_depth=8,
        learning_rate=0.05,
        subsample=0.8,
        colsample_bytree=0.8,
        reg_alpha=0.1,
        reg_lambda=1.0,
        eval_metric="logloss",
        random_state=SEED,
        verbosity=0,
    )


def _train_split(by_person, persons, held_out):
    """Concatenate (X, y) of every person except *held_out*."""
    others = [p for p in persons if p != held_out]
    train_X = np.concatenate([by_person[p][0] for p in others])
    train_y = np.concatenate([by_person[p][1] for p in others])
    return train_X, train_y


def _geo_indices():
    """Return the column indices of ``s_face`` and ``s_eye`` in the feature matrix."""
    features = SELECTED_FEATURES["face_orientation"]
    return features.index("s_face"), features.index("s_eye")


def _geo_score(X, sf_idx, se_idx, face_w=_GEO_FACE_W, eye_w=_GEO_EYE_W):
    """Return the weighted face/eye score of each row of *X*, clipped to [0, 1]."""
    return np.clip(face_w * X[:, sf_idx] + eye_w * X[:, se_idx], 0, 1)


def _fit_in_sample_probs(make_model, train_X, train_y):
    """Fit scaler + model on the training data and return its training-set P(class 1).

    These probabilities are in-sample; callers use them only to choose a
    threshold / combiner on the training side of a fold.
    """
    scaler = StandardScaler().fit(train_X)
    X_sc = scaler.transform(train_X)
    model = make_model()
    model.fit(X_sc, train_y)
    return model.predict_proba(X_sc)[:, 1]


def _plot_weight_bars(weights, mean_f1, color, xlabel, title, filename):
    """Save a bar chart of mean LOPO F1 per candidate weight into PLOTS_DIR."""
    fig, ax = plt.subplots(figsize=(7, 4))
    ax.bar([f"{w:.1f}" for w in weights], [mean_f1[w] for w in weights], color=color)
    ax.set_xlabel(xlabel)
    ax.set_ylabel("Mean LOPO F1")
    ax.set_title(title)
    ax.set_ylim(bottom=max(0, min(mean_f1.values()) - 0.05))
    for i, w in enumerate(weights):
        ax.text(i, mean_f1[w] + 0.003, f"{mean_f1[w]:.3f}",
                ha="center", va="bottom", fontsize=8)
    fig.tight_layout()
    path = os.path.join(PLOTS_DIR, filename)
    fig.savefig(path, dpi=150)
    plt.close(fig)
    print(f" saved {path}")


# ---------------------------------------------------------------------------
# LOPO evaluation of the ML models
# ---------------------------------------------------------------------------

def run_lopo_models():
    """Train MLP and XGBoost once per held-out person and collect the predictions.

    Returns:
        dict with keys ``"mlp"`` and ``"xgb"`` (each holding pooled arrays
        ``"y"``/``"p"`` and per-fold lists ``"y_folds"``/``"p_folds"``) and
        ``"persons"`` (the sorted person ids, i.e. the fold order).
    """
    print("\n=== LOPO: MLP and XGBoost ===")
    by_person, _, _ = load_per_person("face_orientation")
    persons = sorted(by_person.keys())
    factories = (("mlp", _make_mlp), ("xgb", _make_xgb))
    results = {key: {"y_folds": [], "p_folds": []} for key, _ in factories}

    for i, held_out in enumerate(persons):
        X_test, y_test = by_person[held_out]
        train_X, train_y = _train_split(by_person, persons, held_out)

        # The scaler is fitted on the training persons only.
        scaler = StandardScaler().fit(train_X)
        X_tr_sc = scaler.transform(train_X)
        X_te_sc = scaler.transform(X_test)

        for key, make_model in factories:
            model = make_model()
            model.fit(X_tr_sc, train_y)
            results[key]["y_folds"].append(y_test)
            results[key]["p_folds"].append(model.predict_proba(X_te_sc)[:, 1])

        print(f" fold {i+1}/{len(persons)}: held out {held_out} "
              f"({X_test.shape[0]} samples)")

    results["persons"] = persons
    for key, _ in factories:
        # Pooled arrays follow the fold order, so they can be re-sliced per person.
        results[key]["y"] = np.concatenate(results[key]["y_folds"])
        results[key]["p"] = np.concatenate(results[key]["p_folds"])
    return results


def analyse_model_thresholds(results):
    """Pick the Youden's J threshold per model on pooled LOPO predictions.

    Saves one ROC plot per model and returns ``{name: {"label", "auc",
    "opt_threshold", "f1_opt", "f1_50"}}``.
    """
    print("\n=== Model threshold analysis ===")
    n_folds = len(results.get("persons", ()))
    model_stats = {}
    for name, label in _MODELS:
        y, p = results[name]["y"], results[name]["p"]
        opt_t, fpr, tpr, thresholds, auc = _youdens_j(y, p)
        opt_idx = (tpr - fpr).argmax()  # same index _youdens_j used for opt_t
        f1_opt = _f1_at_threshold(y, p, opt_t)
        f1_50 = _f1_at_threshold(y, p, 0.50)

        # Caption is derived from the data rather than hard-coded counts.
        caption = f"{len(y):,} samples"
        if n_folds:
            caption = f"{n_folds} folds, {caption}"
        path = os.path.join(PLOTS_DIR, f"roc_{name}.png")
        _plot_roc(fpr, tpr, auc, opt_t, opt_idx, f"LOPO ROC — {label} ({caption})", path)

        model_stats[name] = {
            "label": label,
            "auc": auc,
            "opt_threshold": opt_t,
            "f1_opt": f1_opt,
            "f1_50": f1_50,
        }
        print(f" {label}: AUC={auc:.4f}, optimal threshold={opt_t:.3f} "
              f"(F1={f1_opt:.4f}), F1@0.50={f1_50:.4f}")
    return model_stats


# Two-sided 95% Student-t critical values, indexed by degrees of freedom
# (index 0 is a placeholder). Two-decimal precision throughout.
_T_975 = (
    0, 12.71, 4.30, 3.18, 2.78, 2.57, 2.45, 2.37, 2.31, 2.26, 2.23,
    2.20, 2.18, 2.16, 2.14, 2.13, 2.12, 2.11, 2.10, 2.09, 2.09,
    2.08, 2.07, 2.07, 2.06, 2.06, 2.06, 2.05, 2.05, 2.05, 2.04,
)


def _ci_95_t(n):
    """Return the 95% CI half-width multiplier (t-distribution, df = n - 1).

    Returns 0.0 for n <= 1. Values for df <= 30 come from a table; larger df
    use the first-order expansion t ~= z + (z**3 + z) / (4 * df) with z = 1.96,
    which tends to 1.96 as df grows.
    """
    if n <= 1:
        return 0.0
    df = n - 1
    if df < len(_T_975):
        return float(_T_975[df])
    return 1.96 + 2.37 / df


def _fold_summary(values, n_folds, t_mult):
    """Return (mean, sample std, 95% CI half-width) of per-fold metric values."""
    mean = np.mean(values)
    if n_folds <= 1:
        return mean, 0.0, 0.0
    std = np.std(values, ddof=1)
    return mean, std, t_mult * (std / np.sqrt(n_folds))


def analyse_precision_recall_confusion(results, model_stats):
    """Precision/recall at optimal threshold, pooled confusion matrix, per-fold metrics, 95% CIs.

    Returns ``{name: {...}}`` with pooled precision/recall/accuracy, the 2x2
    confusion matrix and its cells, a per-person metric list, and
    ``<metric>_mean`` / ``<metric>_std`` / ``<metric>_ci_half`` for accuracy,
    F1, precision and recall, plus ``n_folds``.
    """
    print("\n=== Precision, recall, confusion matrix, per-person variance ===")
    extended = {}
    persons = results["persons"]
    n_folds = len(persons)
    t_mult = _ci_95_t(n_folds)

    for name, label in _MODELS:
        y_all = results[name]["y"]
        p_all = results[name]["p"]
        opt_t = model_stats[name]["opt_threshold"]

        y_pred = (p_all >= opt_t).astype(int)
        prec_pooled = precision_score(y_all, y_pred, zero_division=0)
        rec_pooled = recall_score(y_all, y_pred, zero_division=0)
        acc_pooled = accuracy_score(y_all, y_pred)
        # labels=[0, 1] keeps the matrix 2x2 even if a class never occurs,
        # which plot_confusion_matrices relies on.
        cm = confusion_matrix(y_all, y_pred, labels=[0, 1])
        tn, fp, fn, tp = cm.ravel()

        # Every fold is scored at the pooled optimal threshold.
        per_person = []
        for k, (y_f, p_f) in enumerate(zip(results[name]["y_folds"], results[name]["p_folds"])):
            pred_f = (p_f >= opt_t).astype(int)
            per_person.append({
                "person": persons[k],
                "accuracy": accuracy_score(y_f, pred_f),
                "f1": f1_score(y_f, pred_f, zero_division=0),
                "precision": precision_score(y_f, pred_f, zero_division=0),
                "recall": recall_score(y_f, pred_f, zero_division=0),
            })

        summary = {}
        for metric in ("accuracy", "f1", "precision", "recall"):
            mean, std, ci_half = _fold_summary(
                [row[metric] for row in per_person], n_folds, t_mult)
            summary[f"{metric}_mean"] = mean
            summary[f"{metric}_std"] = std
            summary[f"{metric}_ci_half"] = ci_half

        extended[name] = {
            "label": label,
            "opt_threshold": opt_t,
            "precision_pooled": prec_pooled,
            "recall_pooled": rec_pooled,
            "accuracy_pooled": acc_pooled,
            "confusion_matrix": cm,
            "tn": int(tn),
            "fp": int(fp),
            "fn": int(fn),
            "tp": int(tp),
            "per_person": per_person,
            **summary,
            "n_folds": n_folds,
        }
        mean_f1, std_f1, f1_ci = summary["f1_mean"], summary["f1_std"], summary["f1_ci_half"]
        print(f" {label}: precision={prec_pooled:.4f}, recall={rec_pooled:.4f} | "
              f"per-fold F1 mean={mean_f1:.4f} ± {std_f1:.4f} "
              f"(95% CI [{mean_f1 - f1_ci:.4f}, {mean_f1 + f1_ci:.4f}])")
    return extended


def plot_confusion_matrices(extended_stats):
    """Save confusion matrix heatmaps for MLP and XGBoost."""
    for name in ("mlp", "xgb"):
        s = extended_stats[name]
        cm = s["confusion_matrix"]
        half = cm.max() / 2  # cells darker than this get white text
        fig, ax = plt.subplots(figsize=(4, 3))
        ax.imshow(cm, cmap="Blues")
        ax.set_xticks([0, 1])
        ax.set_yticks([0, 1])
        ax.set_xticklabels(["Pred 0", "Pred 1"])
        ax.set_yticklabels(["True 0", "True 1"])
        ax.set_ylabel("True label")
        ax.set_xlabel("Predicted label")
        for i in range(2):
            for j in range(2):
                ax.text(j, i, str(cm[i, j]), ha="center", va="center",
                        color="white" if cm[i, j] > half else "black",
                        fontweight="bold")
        ax.set_title(f"LOPO {s['label']} @ t={s['opt_threshold']:.3f}")
        fig.tight_layout()
        path = os.path.join(PLOTS_DIR, f"confusion_matrix_{name}.png")
        fig.savefig(path, dpi=150)
        plt.close(fig)
        print(f" saved {path}")


# ---------------------------------------------------------------------------
# Weight searches
# ---------------------------------------------------------------------------

def run_geo_weight_search():
    """Grid-search the face weight alpha of the geometric score (eye weight = 1 - alpha).

    Per fold, the threshold is chosen by Youden's J on the training persons
    and F1 is measured on the held-out person. Returns ``(mean_f1, best_alpha)``
    where ``mean_f1`` maps alpha to mean LOPO F1.
    """
    print("\n=== Geometric weight grid search ===")
    by_person, _, _ = load_per_person("face_orientation")
    persons = sorted(by_person.keys())
    sf_idx, se_idx = _geo_indices()

    alphas = np.arange(0.2, 0.85, 0.1).round(1)
    alpha_f1 = {a: [] for a in alphas}

    for held_out in persons:
        X_test, y_test = by_person[held_out]
        sf, se = X_test[:, sf_idx], X_test[:, se_idx]
        train_X, train_y = _train_split(by_person, persons, held_out)
        sf_tr, se_tr = train_X[:, sf_idx], train_X[:, se_idx]

        for a in alphas:
            # Unlike the hybrid pipelines, this score is not clipped to [0, 1].
            opt_t, *_ = _youdens_j(train_y, a * sf_tr + (1.0 - a) * se_tr)
            score_te = a * sf + (1.0 - a) * se
            alpha_f1[a].append(_f1_at_threshold(y_test, score_te, opt_t))

    mean_f1 = {a: np.mean(f1s) for a, f1s in alpha_f1.items()}
    best_alpha = max(mean_f1, key=mean_f1.get)

    _plot_weight_bars(
        alphas, mean_f1, color="steelblue",
        xlabel="Face weight (alpha); eye weight = 1 - alpha",
        title="Geometric Pipeline: Face vs Eye Weight Search",
        filename="geo_weight_search.png",
    )
    print(f" Best alpha (face weight) = {best_alpha:.1f}, "
          f"mean LOPO F1 = {mean_f1[best_alpha]:.4f}")
    return dict(mean_f1), best_alpha


def _hybrid_weight_search(lopo_results, key, make_model):
    """Grid-search the blend weight between a model probability and the geometric score.

    Held-out probabilities come from ``lopo_results[key]["p"]`` (sliced per
    person in sorted order); the threshold per fold is Youden's J on the blended
    training scores. Returns ``(weights, mean_f1, best_w)``.
    """
    by_person, _, _ = load_per_person("face_orientation")
    persons = sorted(by_person.keys())
    sf_idx, se_idx = _geo_indices()

    weights = np.arange(0.3, 0.85, 0.1).round(1)
    fold_f1 = {w: [] for w in weights}
    pooled_p = lopo_results[key]["p"]
    offset = 0

    for held_out in persons:
        X_test, y_test = by_person[held_out]
        n = X_test.shape[0]
        prob_fold = pooled_p[offset:offset + n]
        offset += n
        geo_te = _geo_score(X_test, sf_idx, se_idx)

        train_X, train_y = _train_split(by_person, persons, held_out)
        geo_tr = _geo_score(train_X, sf_idx, se_idx)
        prob_tr = _fit_in_sample_probs(make_model, train_X, train_y)

        for w in weights:
            opt_t, *_ = _youdens_j(train_y, w * prob_tr + (1.0 - w) * geo_tr)
            combo_te = w * prob_fold + (1.0 - w) * geo_te
            fold_f1[w].append(_f1_at_threshold(y_test, combo_te, opt_t))

    mean_f1 = {w: np.mean(f1s) for w, f1s in fold_f1.items()}
    best_w = max(mean_f1, key=mean_f1.get)
    return weights, mean_f1, best_w


def run_hybrid_weight_search(lopo_results):
    """Grid search: MLP prob + geometric. Returns ``(mean_f1 by w_mlp, best w_mlp)``."""
    print("\n=== Hybrid weight grid search ===")
    weights, mean_f1, best_w = _hybrid_weight_search(lopo_results, "mlp", _make_mlp)
    _plot_weight_bars(
        weights, mean_f1, color="darkorange",
        xlabel="MLP weight (w_mlp); geo weight = 1 - w_mlp",
        title="Hybrid Pipeline: MLP vs Geometric Weight Search",
        filename="hybrid_weight_search.png",
    )
    print(f" Best w_mlp = {best_w:.1f}, mean LOPO F1 = {mean_f1[best_w]:.4f}")
    return dict(mean_f1), best_w


def run_hybrid_xgb_weight_search(lopo_results):
    """Grid search: XGBoost prob + geometric. Same structure as MLP hybrid."""
    print("\n=== Hybrid XGBoost weight grid search ===")
    weights, mean_f1, best_w = _hybrid_weight_search(lopo_results, "xgb", _make_xgb)
    _plot_weight_bars(
        weights, mean_f1, color="steelblue",
        xlabel="XGBoost weight (w_xgb); geo weight = 1 - w_xgb",
        title="Hybrid Pipeline: XGBoost vs Geometric Weight Search",
        filename="hybrid_xgb_weight_search.png",
    )
    print(f" Best w_xgb = {best_w:.1f}, mean LOPO F1 = {mean_f1[best_w]:.4f}")
    return dict(mean_f1), best_w


def run_hybrid_lr_combiner(lopo_results, use_xgb=True):
    """LR combiner: meta-features = [model_prob, geo_score], learned weights instead of grid search.

    Per fold, a logistic regression is fitted on the training persons'
    (in-sample model probability, geometric score) pairs, thresholded by
    Youden's J on its training output, and scored on the held-out person.
    Returns the mean LOPO F1 as a float.
    """
    print("\n=== Hybrid LR combiner (LOPO) ===")
    by_person, _, _ = load_per_person("face_orientation")
    persons = sorted(by_person.keys())
    sf_idx, se_idx = _geo_indices()

    key = "xgb" if use_xgb else "mlp"
    make_model = _make_xgb if use_xgb else _make_mlp
    model_p = lopo_results[key]["p"]
    offset = 0
    fold_f1s = []

    for held_out in persons:
        X_test, y_test = by_person[held_out]
        n = X_test.shape[0]
        prob_fold = model_p[offset:offset + n]
        offset += n
        meta_te = np.column_stack([prob_fold, _geo_score(X_test, sf_idx, se_idx)])

        train_X, train_y = _train_split(by_person, persons, held_out)
        geo_tr = _geo_score(train_X, sf_idx, se_idx)
        prob_tr = _fit_in_sample_probs(make_model, train_X, train_y)
        meta_tr = np.column_stack([prob_tr, geo_tr])

        lr = LogisticRegression(C=1.0, max_iter=500, random_state=SEED)
        lr.fit(meta_tr, train_y)
        opt_t, *_ = _youdens_j(train_y, lr.predict_proba(meta_tr)[:, 1])
        f1 = _f1_at_threshold(y_test, lr.predict_proba(meta_te)[:, 1], opt_t)
        fold_f1s.append(f1)
        print(f" fold {held_out}: F1 = {f1:.4f} (threshold = {opt_t:.3f})")

    mean_f1 = float(np.mean(fold_f1s))
    print(f" LR combiner mean LOPO F1 = {mean_f1:.4f}")
    return mean_f1


def train_and_save_hybrid_combiner(lopo_results, use_xgb, geo_face_weight=0.7,
                                   geo_eye_weight=0.3, combiner_path=None):
    """Build OOS meta-dataset from LOPO predictions, train one LR, save joblib + optimal threshold.

    Returns ``(opt_threshold, combiner_path)``. When *combiner_path* is None
    the file goes to ``<project root>/checkpoints/hybrid_combiner.joblib``.
    """
    by_person, _, _ = load_per_person("face_orientation")
    persons = sorted(by_person.keys())
    sf_idx, se_idx = _geo_indices()

    key = "xgb" if use_xgb else "mlp"
    model_p = lopo_results[key]["p"]
    meta_y = lopo_results[key]["y"]

    # Same person order as the pooled LOPO arrays, so rows line up.
    geo_all = np.concatenate([
        _geo_score(by_person[person][0], sf_idx, se_idx, geo_face_weight, geo_eye_weight)
        for person in persons
    ])
    meta_X = np.column_stack([model_p, geo_all])

    lr = LogisticRegression(C=1.0, max_iter=500, random_state=SEED)
    lr.fit(meta_X, meta_y)
    opt_threshold, *_ = _youdens_j(meta_y, lr.predict_proba(meta_X)[:, 1])

    if combiner_path is None:
        combiner_path = os.path.join(_PROJECT_ROOT, "checkpoints", "hybrid_combiner.joblib")
    os.makedirs(os.path.dirname(combiner_path), exist_ok=True)
    joblib.dump({
        "combiner": lr,
        "threshold": float(opt_threshold),
        "use_xgb": bool(use_xgb),
        "geo_face_weight": geo_face_weight,
        "geo_eye_weight": geo_eye_weight,
    }, combiner_path)
    print(f" Saved combiner to {combiner_path} (threshold={opt_threshold:.3f})")
    return opt_threshold, combiner_path


# ---------------------------------------------------------------------------
# EAR / MAR distributions
# ---------------------------------------------------------------------------

def plot_distributions():
    """Plot per-class EAR and MAR histograms and return threshold coverage stats.

    Reads every ``data/collected_*/*.npz`` under the project root. Returns a
    dict with the percentage of samples below/above each EAR/MAR constant and
    ``n_samples``.

    Raises:
        ValueError: if no ``.npz`` file is found.
    """
    print("\n=== EAR / MAR distributions ===")
    pattern = os.path.join(_PROJECT_ROOT, "data", "collected_*", "*.npz")
    npz_files = sorted(glob.glob(pattern))
    if not npz_files:
        raise ValueError(f"no .npz files found matching {pattern}")

    all_ear_l, all_ear_r, all_mar, all_labels = [], [], [], []
    for f in npz_files:
        # NOTE: allow_pickle=True can execute arbitrary code from a crafted
        # file; only load recordings from trusted sources.
        with np.load(f, allow_pickle=True) as d:
            names = list(d["feature_names"])
            feat = d["features"].astype(np.float32)
            lab = d["labels"].astype(np.int64)
        all_ear_l.append(feat[:, names.index("ear_left")])
        all_ear_r.append(feat[:, names.index("ear_right")])
        all_mar.append(feat[:, names.index("mar")])
        all_labels.append(lab)

    ear_min = np.minimum(np.concatenate(all_ear_l), np.concatenate(all_ear_r))
    mar = np.concatenate(all_mar)
    labels = np.concatenate(all_labels)
    n_samples = len(ear_min)

    # Clip only for display so outliers do not stretch the histogram axes.
    ear_plot = np.clip(ear_min, 0, 0.85)
    mar_plot = np.clip(mar, 0, 1.5)

    fig, ax = plt.subplots(figsize=(7, 4))
    ax.hist(ear_plot[labels == 1], bins=100, alpha=0.6, label="Focused (1)", density=True)
    ax.hist(ear_plot[labels == 0], bins=100, alpha=0.6, label="Unfocused (0)", density=True)
    for val, lbl, c in [
        (0.16, "ear_closed = 0.16", "red"),
        (0.21, "EAR_BLINK = 0.21", "orange"),
        (0.30, "ear_open = 0.30", "green"),
    ]:
        ax.axvline(val, color=c, ls="--", lw=1.5, label=lbl)
    ax.set_xlabel("min(left_EAR, right_EAR)")
    ax.set_ylabel("Density")
    ax.set_title(f"EAR Distribution by Class ({n_samples:,} samples)")
    ax.legend(fontsize=8)
    fig.tight_layout()
    path = os.path.join(PLOTS_DIR, "ear_distribution.png")
    fig.savefig(path, dpi=150)
    plt.close(fig)
    print(f" saved {path}")

    fig, ax = plt.subplots(figsize=(7, 4))
    ax.hist(mar_plot[labels == 1], bins=100, alpha=0.6, label="Focused (1)", density=True)
    ax.hist(mar_plot[labels == 0], bins=100, alpha=0.6, label="Unfocused (0)", density=True)
    ax.axvline(0.55, color="red", ls="--", lw=1.5, label="MAR_YAWN = 0.55")
    ax.set_xlabel("Mouth Aspect Ratio (MAR)")
    ax.set_ylabel("Density")
    ax.set_title(f"MAR Distribution by Class ({n_samples:,} samples)")
    ax.legend(fontsize=8)
    fig.tight_layout()
    path = os.path.join(PLOTS_DIR, "mar_distribution.png")
    fig.savefig(path, dpi=150)
    plt.close(fig)
    print(f" saved {path}")

    closed_pct = np.mean(ear_min < 0.16) * 100
    blink_pct = np.mean(ear_min < 0.21) * 100
    open_pct = np.mean(ear_min >= 0.30) * 100
    yawn_pct = np.mean(mar > 0.55) * 100

    stats = {
        "ear_below_016": closed_pct,
        "ear_below_021": blink_pct,
        "ear_above_030": open_pct,
        "mar_above_055": yawn_pct,
        "n_samples": n_samples,
    }
    print(f" EAR<0.16 (closed): {closed_pct:.1f}% | EAR<0.21 (blink): {blink_pct:.1f}% | "
          f"EAR>=0.30 (open): {open_pct:.1f}%")
    print(f" MAR>0.55 (yawn): {yawn_pct:.1f}%")
    return stats


# ---------------------------------------------------------------------------
# Report and config output
# ---------------------------------------------------------------------------

def write_report(model_stats, extended_stats, geo_f1, best_alpha,
                 hybrid_mlp_f1, best_w_mlp, hybrid_xgb_f1, best_w_xgb,
                 use_xgb_for_hybrid, dist_stats, lr_combiner_f1=None):
    """Write the Markdown threshold-justification report to REPORT_PATH.

    Participant, degrees-of-freedom and sample counts in the text are derived
    from *extended_stats* so the prose always matches the numbers reported.
    """
    ref = extended_stats["mlp"]
    n_folds = len(ref["per_person"])
    # Pooled LOPO sample count = sum of the confusion-matrix cells.
    n_samples = ref["tn"] + ref["fp"] + ref["fn"] + ref["tp"]

    lines = []
    add = lines.append

    add("# Threshold Justification Report")
    add("")
    add("Auto-generated by `evaluation/justify_thresholds.py` using LOPO cross-validation "
        f"over {n_folds} participants ({n_samples:,} samples).")
    add("")

    add("## 1. ML Model Decision Thresholds")
    add("")
    add("Thresholds selected via **Youden's J statistic** (J = sensitivity + specificity - 1) "
        "on pooled LOPO held-out predictions.")
    add("")
    add("| Model | LOPO AUC | Optimal Threshold (Youden's J) | F1 @ Optimal | F1 @ 0.50 |")
    add("|-------|----------|-------------------------------|--------------|-----------|")
    for key in ("mlp", "xgb"):
        s = model_stats[key]
        add(f"| {s['label']} | {s['auc']:.4f} | **{s['opt_threshold']:.3f}** | "
            f"{s['f1_opt']:.4f} | {s['f1_50']:.4f} |")
    add("")
    add("![MLP ROC](plots/roc_mlp.png)")
    add("")
    # analyse_model_thresholds saves this plot as roc_xgb.png.
    add("![XGBoost ROC](plots/roc_xgb.png)")
    add("")

    add("## 2. Precision, Recall and Tradeoff")
    add("")
    add("At the optimal threshold (Youden's J), pooled over all LOPO held-out predictions:")
    add("")
    add("| Model | Threshold | Precision | Recall | F1 | Accuracy |")
    add("|-------|----------:|----------:|-------:|---:|---------:|")
    for key in ("mlp", "xgb"):
        s = extended_stats[key]
        add(f"| {s['label']} | {s['opt_threshold']:.3f} | {s['precision_pooled']:.4f} | "
            f"{s['recall_pooled']:.4f} | {model_stats[key]['f1_opt']:.4f} | {s['accuracy_pooled']:.4f} |")
    add("")
    add("Higher threshold → fewer positive predictions → higher precision, lower recall. "
        "Youden's J picks the threshold that balances sensitivity and specificity (recall for the positive class and true negative rate).")
    add("")

    add("## 3. Confusion Matrix (Pooled LOPO)")
    add("")
    add("At optimal threshold. Rows = true label, columns = predicted label (0 = unfocused, 1 = focused).")
    add("")
    for key in ("mlp", "xgb"):
        s = extended_stats[key]
        add(f"### {s['label']}")
        add("")
        add("| | Pred 0 | Pred 1 |")
        add("|--|-------:|-------:|")
        cm = s["confusion_matrix"]
        if cm.shape == (2, 2):
            add(f"| **True 0** | {cm[0,0]} (TN) | {cm[0,1]} (FP) |")
            add(f"| **True 1** | {cm[1,0]} (FN) | {cm[1,1]} (TP) |")
        add("")
        add(f"TN={s['tn']}, FP={s['fp']}, FN={s['fn']}, TP={s['tp']}. ")
        add("")
    add("![Confusion MLP](plots/confusion_matrix_mlp.png)")
    add("")
    add("![Confusion XGBoost](plots/confusion_matrix_xgb.png)")
    add("")

    add("## 4. Per-Person Performance Variance (LOPO)")
    add("")
    add("One fold per left-out person; metrics at optimal threshold.")
    add("")
    for key in ("mlp", "xgb"):
        s = extended_stats[key]
        add(f"### {s['label']} — per held-out person")
        add("")
        add("| Person | Accuracy | F1 | Precision | Recall |")
        add("|--------|---------:|---:|----------:|-------:|")
        for row in s["per_person"]:
            add(f"| {row['person']} | {row['accuracy']:.4f} | {row['f1']:.4f} | {row['precision']:.4f} | {row['recall']:.4f} |")
        add("")
    add("### Summary across persons")
    add("")
    add("| Model | Accuracy mean ± std | F1 mean ± std | Precision mean ± std | Recall mean ± std |")
    add("|-------|---------------------|---------------|----------------------|-------------------|")
    for key in ("mlp", "xgb"):
        s = extended_stats[key]
        add(f"| {s['label']} | {s['accuracy_mean']:.4f} ± {s['accuracy_std']:.4f} | "
            f"{s['f1_mean']:.4f} ± {s['f1_std']:.4f} | "
            f"{s['precision_mean']:.4f} ± {s['precision_std']:.4f} | "
            f"{s['recall_mean']:.4f} ± {s['recall_std']:.4f} |")
    add("")

    add(f"## 5. Confidence Intervals (95%, LOPO over {n_folds} persons)")
    add("")
    add(f"Mean ± half-width of 95% t-interval (df={n_folds - 1}) for each metric "
        f"across the {n_folds} left-out persons.")
    add("")
    add("| Model | F1 | Accuracy | Precision | Recall |")
    add("|-------|---:|--------:|----------:|-------:|")
    for key in ("mlp", "xgb"):
        s = extended_stats[key]
        cells = []
        for metric in ("f1", "accuracy", "precision", "recall"):
            mean, half = s[f"{metric}_mean"], s[f"{metric}_ci_half"]
            cells.append(f"{mean:.4f} [{mean - half:.4f}, {mean + half:.4f}]")
        add(f"| {s['label']} | " + " | ".join(cells) + " |")
    add("")

    add("## 6. Geometric Pipeline Weights (s_face vs s_eye)")
    add("")
    add("Grid search over face weight alpha in {0.2 ... 0.8}. "
        "Eye weight = 1 - alpha. Threshold per fold via Youden's J.")
    add("")
    add("| Face Weight (alpha) | Mean LOPO F1 |")
    add("|--------------------:|-------------:|")
    for a in sorted(geo_f1.keys()):
        marker = " **<-- selected**" if a == best_alpha else ""
        add(f"| {a:.1f} | {geo_f1[a]:.4f}{marker} |")
    add("")
    add(f"**Best:** alpha = {best_alpha:.1f} (face {best_alpha*100:.0f}%, "
        f"eye {(1-best_alpha)*100:.0f}%)")
    add("")
    add("![Geometric weight search](plots/geo_weight_search.png)")
    add("")

    add("## 7. Hybrid Pipeline: MLP vs Geometric")
    add("")
    add("Grid search over w_mlp in {0.3 ... 0.8}. w_geo = 1 - w_mlp. "
        "Geometric sub-score uses same weights as geometric pipeline (face=0.7, eye=0.3).")
    add("")
    add("| MLP Weight (w_mlp) | Mean LOPO F1 |")
    add("|-------------------:|-------------:|")
    for w in sorted(hybrid_mlp_f1.keys()):
        marker = " **<-- selected**" if w == best_w_mlp else ""
        add(f"| {w:.1f} | {hybrid_mlp_f1[w]:.4f}{marker} |")
    add("")
    add(f"**Best:** w_mlp = {best_w_mlp:.1f} (MLP {best_w_mlp*100:.0f}%, "
        f"geometric {(1-best_w_mlp)*100:.0f}%) → mean LOPO F1 = {hybrid_mlp_f1[best_w_mlp]:.4f}")
    add("")
    add("![Hybrid MLP weight search](plots/hybrid_weight_search.png)")
    add("")

    add("## 8. Hybrid Pipeline: XGBoost vs Geometric")
    add("")
    add("Same grid over w_xgb in {0.3 ... 0.8}. w_geo = 1 - w_xgb.")
    add("")
    add("| XGBoost Weight (w_xgb) | Mean LOPO F1 |")
    add("|-----------------------:|-------------:|")
    for w in sorted(hybrid_xgb_f1.keys()):
        marker = " **<-- selected**" if w == best_w_xgb else ""
        add(f"| {w:.1f} | {hybrid_xgb_f1[w]:.4f}{marker} |")
    add("")
    add(f"**Best:** w_xgb = {best_w_xgb:.1f} → mean LOPO F1 = {hybrid_xgb_f1[best_w_xgb]:.4f}")
    add("")
    add("![Hybrid XGBoost weight search](plots/hybrid_xgb_weight_search.png)")
    add("")

    f1_mlp = hybrid_mlp_f1[best_w_mlp]
    f1_xgb = hybrid_xgb_f1[best_w_xgb]
    add("### Which hybrid is used in the app?")
    add("")
    if use_xgb_for_hybrid:
        add(f"**XGBoost hybrid is better** (F1 = {f1_xgb:.4f} vs MLP hybrid F1 = {f1_mlp:.4f}).")
    else:
        add(f"**MLP hybrid is better** (F1 = {f1_mlp:.4f} vs XGBoost hybrid F1 = {f1_xgb:.4f}).")
    add("")
    if lr_combiner_f1 is not None:
        add("### Logistic regression combiner (replaces heuristic weights)")
        add("")
        add("Instead of a fixed linear blend (e.g. 0.3·ML + 0.7·geo), a **logistic regression** "
            "combines model probability and geometric score: meta-features = [model_prob, geo_score], "
            "trained on the same LOPO splits. Threshold from Youden's J on combiner output.")
        add("")
        add("| Method | Mean LOPO F1 |")
        add("|--------|-------------:|")
        add(f"| Heuristic weight grid (best w) | {(f1_xgb if use_xgb_for_hybrid else f1_mlp):.4f} |")
        add(f"| **LR combiner** | **{lr_combiner_f1:.4f}** |")
        add("")
        add("The app uses the saved LR combiner when `combiner_path` is set in `hybrid_focus_config.json`.")
        add("")
    else:
        if use_xgb_for_hybrid:
            add("The app uses **XGBoost + geometric** with the weights above.")
        else:
            add("The app uses **MLP + geometric** with the weights above.")
        add("")

    add("## 9. Eye and Mouth Aspect Ratio Thresholds")
    add("")
    add("### EAR (Eye Aspect Ratio)")
    add("")
    add("Reference: Soukupova & Cech, \"Real-Time Eye Blink Detection Using Facial "
        "Landmarks\" (2016) established EAR ~ 0.2 as a blink threshold.")
    add("")
    add("Our thresholds define a linear interpolation zone around this established value:")
    add("")
    add("| Constant | Value | Justification |")
    add("|----------|------:|---------------|")
    add(f"| `ear_closed` | 0.16 | Below this, eyes are fully shut. "
        f"{dist_stats['ear_below_016']:.1f}% of samples fall here. |")
    add(f"| `EAR_BLINK_THRESH` | 0.21 | Blink detection point; close to the 0.2 reference. "
        f"{dist_stats['ear_below_021']:.1f}% of samples below. |")
    add(f"| `ear_open` | 0.30 | Above this, eyes are fully open. "
        f"{dist_stats['ear_above_030']:.1f}% of samples here. |")
    add("")
    add("Between 0.16 and 0.30 the `_ear_score` function linearly interpolates from 0 to 1, "
        "providing a smooth transition rather than a hard binary cutoff.")
    add("")
    add("![EAR distribution](plots/ear_distribution.png)")
    add("")
    add("### MAR (Mouth Aspect Ratio)")
    add("")
    add("| Constant | Value | Justification |")
    add("|----------|------:|---------------|")
    add(f"| `MAR_YAWN_THRESHOLD` | 0.55 | Only {dist_stats['mar_above_055']:.1f}% of "
        f"samples exceed this, confirming it captures genuine yawns without false positives. |")
    add("")
    add("![MAR distribution](plots/mar_distribution.png)")
    add("")

    add("## 10. Other Constants")
    add("")
    add("| Constant | Value | Rationale |")
    add("|----------|------:|-----------|")
    add("| `gaze_max_offset` | 0.28 | Max iris displacement (normalised) before gaze score "
        "drops to zero. Corresponds to ~56% of the eye width; beyond this the iris is at "
        "the extreme edge. |")
    add("| `max_angle` | 22.0 deg | Head deviation beyond which face score = 0. Based on "
        "typical monitor-viewing cone: at 60 cm distance and a 24\" monitor, the viewing "
        "angle is ~20-25 degrees. |")
    add("| `roll_weight` | 0.5 | Roll is less indicative of inattention than yaw/pitch "
        "(tilting head doesn't mean looking away), so it's down-weighted by 50%. |")
    add("| `EMA alpha` | 0.3 | Smoothing factor for focus score. "
        "Gives ~3-4 frame effective window; balances responsiveness vs flicker. |")
    add("| `grace_frames` | 15 | ~0.5 s at 30 fps before penalising no-face. Allows brief "
        "occlusions (e.g. hand gesture) without dropping score. |")
    add("| `PERCLOS_WINDOW` | 60 frames | 2 s at 30 fps; standard PERCLOS measurement "
        "window (Dinges & Grace, 1998). |")
    add("| `BLINK_WINDOW_SEC` | 30 s | Blink rate measured over 30 s; typical spontaneous "
        "blink rate is 15-20/min (Bentivoglio et al., 1997). |")
    add("")

    with open(REPORT_PATH, "w", encoding="utf-8") as f:
        f.write("\n".join(lines))
    print(f"\nReport written to {REPORT_PATH}")


def write_hybrid_config(use_xgb, best_w_mlp, best_w_xgb, config_path,
                        combiner_path=None, combiner_threshold=None):
    """Write hybrid_focus_config.json. If combiner_path set, app uses LR combiner instead of heuristic weights.

    Only the basename of *combiner_path* is stored. ``threshold`` falls back
    to 0.35 when *combiner_threshold* is None.
    """
    if use_xgb:
        w_xgb = round(float(best_w_xgb), 2)
        w_geo = round(1.0 - best_w_xgb, 2)
        # NOTE(review): the MLP branch zeroes w_xgb, but this branch leaves
        # w_mlp at 0.3 — confirm against how the app reads the config.
        w_mlp = 0.3
    else:
        w_mlp = round(float(best_w_mlp), 2)
        w_geo = round(1.0 - best_w_mlp, 2)
        w_xgb = 0.0

    cfg = {
        "use_xgb": bool(use_xgb),
        "w_mlp": w_mlp,
        "w_xgb": w_xgb,
        "w_geo": w_geo,
        "threshold": float(combiner_threshold) if combiner_threshold is not None else 0.35,
        "use_yawn_veto": True,
        "geo_face_weight": 0.7,
        "geo_eye_weight": 0.3,
        "mar_yawn_threshold": 0.55,
        "metric": "f1",
    }
    if combiner_path:
        cfg["combiner"] = "logistic"
        cfg["combiner_path"] = os.path.basename(combiner_path)

    with open(config_path, "w", encoding="utf-8") as f:
        json.dump(cfg, f, indent=2)
    print(f" Written {config_path} (use_xgb={cfg['use_xgb']}, combiner={cfg.get('combiner', 'heuristic')})")


def main():
    """Run the full analysis: LOPO models, searches, plots, combiner, config and report."""
    os.makedirs(PLOTS_DIR, exist_ok=True)

    lopo_results = run_lopo_models()
    model_stats = analyse_model_thresholds(lopo_results)
    extended_stats = analyse_precision_recall_confusion(lopo_results, model_stats)
    plot_confusion_matrices(extended_stats)

    geo_f1, best_alpha = run_geo_weight_search()
    hybrid_mlp_f1, best_w_mlp = run_hybrid_weight_search(lopo_results)
    hybrid_xgb_f1, best_w_xgb = run_hybrid_xgb_weight_search(lopo_results)
    dist_stats = plot_distributions()

    # The hybrid with the higher mean LOPO F1 is used (MLP wins ties).
    f1_mlp = hybrid_mlp_f1[best_w_mlp]
    f1_xgb = hybrid_xgb_f1[best_w_xgb]
    use_xgb_for_hybrid = f1_xgb > f1_mlp
    print(f"\n Hybrid comparison: MLP F1 = {f1_mlp:.4f}, XGBoost F1 = {f1_xgb:.4f} → "
          f"use {'XGBoost' if use_xgb_for_hybrid else 'MLP'}")

    lr_combiner_f1 = run_hybrid_lr_combiner(lopo_results, use_xgb=use_xgb_for_hybrid)
    combiner_threshold, combiner_path = train_and_save_hybrid_combiner(
        lopo_results,
        use_xgb_for_hybrid,
        combiner_path=os.path.join(_PROJECT_ROOT, "checkpoints", "hybrid_combiner.joblib"),
    )

    config_path = os.path.join(_PROJECT_ROOT, "checkpoints", "hybrid_focus_config.json")
    write_hybrid_config(use_xgb_for_hybrid, best_w_mlp, best_w_xgb, config_path,
                        combiner_path=combiner_path, combiner_threshold=combiner_threshold)

    write_report(model_stats, extended_stats, geo_f1, best_alpha,
                 hybrid_mlp_f1, best_w_mlp, hybrid_xgb_f1, best_w_xgb,
                 use_xgb_for_hybrid, dist_stats, lr_combiner_f1=lr_combiner_f1)
    print("\nDone.")


if __name__ == "__main__":
    main()