# File: final/evaluation/justify_thresholds.py  (k22056537, commit e69e3a3, 41.2 kB)
# Last commit message: "evaluation: channel ablation script + feature importance LOPO"
# LOPO threshold/weight analysis. Run: python -m evaluation.justify_thresholds
import glob
import os
import sys
import numpy as np
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import joblib
from sklearn.linear_model import LogisticRegression
from sklearn.neural_network import MLPClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (
roc_curve,
roc_auc_score,
f1_score,
precision_score,
recall_score,
accuracy_score,
confusion_matrix,
)
from xgboost import XGBClassifier
# Parent directory of the folder containing this file. It is pushed to the front
# of sys.path so that the data_preparation import on the next line resolves.
_PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
sys.path.insert(0, _PROJECT_ROOT)
from data_preparation.prepare_dataset import load_per_person, SELECTED_FEATURES
# Output locations: both live next to this script.
PLOTS_DIR = os.path.join(os.path.dirname(__file__), "plots")
REPORT_PATH = os.path.join(os.path.dirname(__file__), "THRESHOLD_JUSTIFICATION.md")
# Passed as random_state to every estimator fitted in this module.
SEED = 42
def _youdens_j(y_true, y_prob):
    """Locate the ROC operating point that maximises Youden's J (TPR - FPR).

    Returns a 5-tuple ``(threshold, fpr, tpr, thresholds, auc)``: the
    ``roc_curve`` threshold at the maximising index (as a Python float), the
    three raw ``roc_curve`` arrays, and the ``roc_auc_score`` of the same
    inputs (as a Python float).
    """
    false_pos, true_pos, cutoffs = roc_curve(y_true, y_prob)
    best = np.argmax(true_pos - false_pos)
    area = roc_auc_score(y_true, y_prob)
    return float(cutoffs[best]), false_pos, true_pos, cutoffs, float(area)
def _f1_at_threshold(y_true, y_prob, threshold):
    """F1 score of the hard labels obtained by marking ``y_prob >= threshold`` as 1.

    ``zero_division=0`` is passed through to ``f1_score``.
    """
    hard_labels = (y_prob >= threshold).astype(int)
    return f1_score(y_true, hard_labels, zero_division=0)
def _plot_roc(fpr, tpr, auc, opt_thresh, opt_idx, title, path):
    """Draw a ROC curve with one highlighted operating point and save it to ``path``.

    ``opt_idx`` indexes into ``fpr``/``tpr`` to place the red marker; ``auc``
    and ``opt_thresh`` only appear in the legend text.
    """
    figure, axes = plt.subplots(figsize=(6, 5))

    axes.plot(fpr, tpr, lw=2, label=f"ROC (AUC = {auc:.4f})")
    marker_label = f"Youden's J optimum (t = {opt_thresh:.3f})"
    axes.plot(fpr[opt_idx], tpr[opt_idx], "ro", markersize=10, label=marker_label)
    # Dashed diagonal from (0, 0) to (1, 1) as a visual reference.
    axes.plot([0, 1], [0, 1], "k--", lw=1, alpha=0.5)

    axes.set_xlabel("False Positive Rate")
    axes.set_ylabel("True Positive Rate")
    axes.set_title(title)
    axes.legend(loc="lower right")

    figure.tight_layout()
    figure.savefig(path, dpi=150)
    plt.close(figure)
    print(f" saved {path}")
def run_lopo_models():
    """Run leave-one-person-out evaluation of an MLP and an XGBoost classifier.

    For every person, both models are fitted on the standardised data of all
    other persons and asked for class-1 probabilities on the held-out person.

    Returns a dict with keys ``"mlp"`` and ``"xgb"`` (each holding pooled
    arrays ``"y"``/``"p"`` plus per-fold lists ``"y_folds"``/``"p_folds"``)
    and ``"persons"`` (the sorted person keys, i.e. the fold order).
    """
    print("\n=== LOPO: MLP and XGBoost ===")
    by_person, _, _ = load_per_person("face_orientation")
    persons = sorted(by_person.keys())
    results = {
        model_key: {"y": [], "p": [], "y_folds": [], "p_folds": []}
        for model_key in ("mlp", "xgb")
    }
    total = len(persons)

    for fold_no, held_out in enumerate(persons, start=1):
        X_test, y_test = by_person[held_out]
        others = [person for person in persons if person != held_out]
        train_X = np.concatenate([by_person[person][0] for person in others])
        train_y = np.concatenate([by_person[person][1] for person in others])

        # The scaler is fitted on the training persons only.
        scaler = StandardScaler().fit(train_X)
        train_scaled = scaler.transform(train_X)
        test_scaled = scaler.transform(X_test)

        estimators = (
            ("mlp", MLPClassifier(
                hidden_layer_sizes=(64, 32), activation="relu",
                max_iter=200, early_stopping=True, validation_fraction=0.15,
                random_state=SEED, verbose=False,
            )),
            ("xgb", XGBClassifier(
                n_estimators=600, max_depth=8, learning_rate=0.05,
                subsample=0.8, colsample_bytree=0.8,
                reg_alpha=0.1, reg_lambda=1.0,
                eval_metric="logloss",
                random_state=SEED, verbosity=0,
            )),
        )
        for model_key, estimator in estimators:
            estimator.fit(train_scaled, train_y)
            held_out_prob = estimator.predict_proba(test_scaled)[:, 1]
            bucket = results[model_key]
            bucket["y"].append(y_test)
            bucket["p"].append(held_out_prob)
            bucket["y_folds"].append(y_test)
            bucket["p_folds"].append(held_out_prob)

        print(f" fold {fold_no}/{total}: held out {held_out} "
              f"({X_test.shape[0]} samples)")

    results["persons"] = persons
    # Pool the per-fold pieces; the *_folds lists stay as lists.
    for model_key in ("mlp", "xgb"):
        for field in ("y", "p"):
            results[model_key][field] = np.concatenate(results[model_key][field])
    return results
def analyse_model_thresholds(results):
    """Pick a Youden's J threshold per model from pooled LOPO predictions.

    For ``"mlp"`` and ``"xgb"`` this reads ``results[name]["y"]`` and
    ``results[name]["p"]``, saves a ROC plot to ``PLOTS_DIR/roc_<name>.png``
    and prints a one-line summary.

    Returns a dict keyed by model name; each value holds ``label``, ``auc``,
    ``opt_threshold``, ``f1_opt`` (F1 at the optimal threshold) and ``f1_50``
    (F1 at a threshold of 0.50).
    """
    print("\n=== Model threshold analysis ===")
    model_stats = {}
    for name, label in [("mlp", "MLP"), ("xgb", "XGBoost")]:
        y, p = results[name]["y"], results[name]["p"]
        opt_t, fpr, tpr, _, auc = _youdens_j(y, p)
        # Position of the optimum on the ROC arrays, used to place the marker.
        opt_idx = (tpr - fpr).argmax()
        f1_opt = _f1_at_threshold(y, p, opt_t)
        f1_50 = _f1_at_threshold(y, p, 0.50)

        # Describe the data actually evaluated instead of hard-coding counts.
        # The fold count is omitted when the caller supplied no per-fold lists.
        n_folds = len(results[name].get("y_folds", ()))
        fold_part = f"{n_folds} folds, " if n_folds else ""
        title = f"LOPO ROC — {label} ({fold_part}{len(y):,} samples)"

        path = os.path.join(PLOTS_DIR, f"roc_{name}.png")
        _plot_roc(fpr, tpr, auc, opt_t, opt_idx, title, path)
        model_stats[name] = {
            "label": label, "auc": auc,
            "opt_threshold": opt_t, "f1_opt": f1_opt, "f1_50": f1_50,
        }
        print(f" {label}: AUC={auc:.4f}, optimal threshold={opt_t:.3f} "
              f"(F1={f1_opt:.4f}), F1@0.50={f1_50:.4f}")
    return model_stats
def _ci_95_t(n):
    """Return the two-sided 95% Student-t critical value for ``n`` samples.

    The value is t(0.975, df = n - 1), i.e. the multiplier applied to the
    standard error to get a 95% confidence-interval half-width.

    * ``n <= 1``  -> ``0.0`` (no interval can be formed).
    * ``df`` 1..30 -> tabulated value (3 decimals).
    * ``df > 30`` -> ``1.96`` (normal approximation).
    """
    if n <= 1:
        return 0.0
    # Index = degrees of freedom; slot 0 is a placeholder so t_975[df] works.
    t_975 = (
        0.0,
        12.706, 4.303, 3.182, 2.776, 2.571, 2.447, 2.365, 2.306, 2.262, 2.228,
        2.201, 2.179, 2.160, 2.145, 2.131, 2.120, 2.110, 2.101, 2.093, 2.086,
        2.080, 2.074, 2.069, 2.064, 2.060, 2.056, 2.052, 2.048, 2.045, 2.042,
    )
    df = n - 1
    if df < len(t_975):
        return float(t_975[df])
    return 1.96
def _summarise_folds(values, n_folds, t_mult):
    """Return (mean, sample std, 95% CI half-width) for one per-fold metric list."""
    mean = np.mean(values)
    if n_folds > 1:
        std = np.std(values, ddof=1)
        ci_half = t_mult * (std / np.sqrt(n_folds))
    else:
        # A single fold has no spread to estimate.
        std = 0.0
        ci_half = 0.0
    return mean, std, ci_half


def analyse_precision_recall_confusion(results, model_stats):
    """Compute pooled and per-fold classification metrics at each model's optimal threshold.

    Args:
        results: LOPO output providing, per model key, pooled ``"y"``/``"p"``
            arrays and per-fold ``"y_folds"``/``"p_folds"`` lists, plus
            ``results["persons"]`` giving the fold order.
        model_stats: per-model dict that supplies ``"opt_threshold"``.

    Returns:
        Dict keyed by ``"mlp"``/``"xgb"``. Each value holds pooled
        precision/recall/accuracy, the 2x2 confusion matrix and its
        ``tn``/``fp``/``fn``/``tp`` counts, a ``per_person`` list of metric
        rows, and ``<metric>_mean`` / ``<metric>_std`` / ``<metric>_ci_half``
        for accuracy, F1, precision and recall, together with ``n_folds``.
    """
    print("\n=== Precision, recall, confusion matrix, per-person variance ===")
    extended = {}
    persons = results["persons"]
    n_folds = len(persons)
    t_mult = _ci_95_t(n_folds)

    for name, label in [("mlp", "MLP"), ("xgb", "XGBoost")]:
        y_all = results[name]["y"]
        p_all = results[name]["p"]
        opt_t = model_stats[name]["opt_threshold"]

        y_pred = (p_all >= opt_t).astype(int)
        prec_pooled = precision_score(y_all, y_pred, zero_division=0)
        rec_pooled = recall_score(y_all, y_pred, zero_division=0)
        acc_pooled = accuracy_score(y_all, y_pred)
        # Fixing the label set guarantees a 2x2 matrix even when only one class
        # occurs in truth and prediction; the heatmap and the report index it
        # as 2x2.
        cm = confusion_matrix(y_all, y_pred, labels=[0, 1])
        tn, fp, fn, tp = cm.ravel()

        fold_metrics = {"accuracy": [], "f1": [], "precision": [], "recall": []}
        per_person = []
        fold_pairs = zip(results[name]["y_folds"], results[name]["p_folds"])
        for k, (y_f, p_f) in enumerate(fold_pairs):
            pred_f = (p_f >= opt_t).astype(int)
            row = {
                "person": persons[k],
                "accuracy": accuracy_score(y_f, pred_f),
                "f1": f1_score(y_f, pred_f, zero_division=0),
                "precision": precision_score(y_f, pred_f, zero_division=0),
                "recall": recall_score(y_f, pred_f, zero_division=0),
            }
            per_person.append(row)
            for metric, values in fold_metrics.items():
                values.append(row[metric])

        stats = {
            "label": label,
            "opt_threshold": opt_t,
            "precision_pooled": prec_pooled,
            "recall_pooled": rec_pooled,
            "accuracy_pooled": acc_pooled,
            "confusion_matrix": cm,
            "tn": int(tn), "fp": int(fp), "fn": int(fn), "tp": int(tp),
            "per_person": per_person,
        }
        for metric, values in fold_metrics.items():
            mean, std, ci_half = _summarise_folds(values, n_folds, t_mult)
            stats[f"{metric}_mean"] = mean
            stats[f"{metric}_std"] = std
            stats[f"{metric}_ci_half"] = ci_half
        stats["n_folds"] = n_folds
        extended[name] = stats

        mean_f1, std_f1 = stats["f1_mean"], stats["f1_std"]
        f1_half = stats["f1_ci_half"]
        print(f" {label}: precision={prec_pooled:.4f}, recall={rec_pooled:.4f} | "
              f"per-fold F1 mean={mean_f1:.4f} ± {std_f1:.4f} "
              f"(95% CI [{mean_f1 - f1_half:.4f}, {mean_f1 + f1_half:.4f}])")
    return extended
def plot_confusion_matrices(extended_stats):
    """Write one annotated confusion-matrix heatmap per model into ``PLOTS_DIR``.

    Reads ``confusion_matrix``, ``label`` and ``opt_threshold`` from
    ``extended_stats["mlp"]`` and ``extended_stats["xgb"]``; files are named
    ``confusion_matrix_<key>.png``.
    """
    for model_key in ("mlp", "xgb"):
        stats = extended_stats[model_key]
        matrix = stats["confusion_matrix"]

        figure, axes = plt.subplots(figsize=(4, 3))
        axes.imshow(matrix, cmap="Blues")
        axes.set_xticks([0, 1])
        axes.set_yticks([0, 1])
        axes.set_xticklabels(["Pred 0", "Pred 1"])
        axes.set_yticklabels(["True 0", "True 1"])
        axes.set_ylabel("True label")
        axes.set_xlabel("Predicted label")

        # Cells above half the maximum count get white text, the rest black.
        midpoint = matrix.max() / 2
        for row, col in np.ndindex(2, 2):
            count = matrix[row, col]
            text_colour = "white" if count > midpoint else "black"
            axes.text(col, row, str(count), ha="center", va="center",
                      color=text_colour, fontweight="bold")

        axes.set_title(f"LOPO {stats['label']} @ t={stats['opt_threshold']:.3f}")
        figure.tight_layout()
        out_path = os.path.join(PLOTS_DIR, f"confusion_matrix_{model_key}.png")
        figure.savefig(out_path, dpi=150)
        plt.close(figure)
        print(f" saved {out_path}")
def run_geo_weight_search():
    """Grid-search the face/eye blend weight of the geometric score using LOPO folds.

    For each held-out person and each alpha in 0.2..0.8 (step 0.1), the score
    ``alpha * s_face + (1 - alpha) * s_eye`` is thresholded at the Youden's J
    optimum found on the training persons, and F1 is measured on the held-out
    person. A bar chart is saved as ``geo_weight_search.png``.

    Returns ``(mean_f1_by_alpha, best_alpha)``.
    """
    print("\n=== Geometric weight grid search ===")
    by_person, _, _ = load_per_person("face_orientation")
    persons = sorted(by_person.keys())
    feature_names = SELECTED_FEATURES["face_orientation"]
    face_col = feature_names.index("s_face")
    eye_col = feature_names.index("s_eye")

    alphas = np.arange(0.2, 0.85, 0.1).round(1)
    alpha_f1 = {a: [] for a in alphas}

    for held_out in persons:
        X_test, y_test = by_person[held_out]
        others = [person for person in persons if person != held_out]
        train_X = np.concatenate([by_person[person][0] for person in others])
        train_y = np.concatenate([by_person[person][1] for person in others])

        face_te, eye_te = X_test[:, face_col], X_test[:, eye_col]
        face_tr, eye_tr = train_X[:, face_col], train_X[:, eye_col]

        for a in alphas:
            # The threshold is chosen on training persons only, then applied as-is.
            opt_t, *_ = _youdens_j(train_y, a * face_tr + (1.0 - a) * eye_tr)
            held_out_score = a * face_te + (1.0 - a) * eye_te
            alpha_f1[a].append(_f1_at_threshold(y_test, held_out_score, opt_t))

    mean_f1 = {a: np.mean(scores) for a, scores in alpha_f1.items()}
    best_alpha = max(mean_f1, key=lambda a: mean_f1[a])

    figure, axes = plt.subplots(figsize=(7, 4))
    bar_heights = [mean_f1[a] for a in alphas]
    axes.bar([f"{a:.1f}" for a in alphas], bar_heights, color="steelblue")
    axes.set_xlabel("Face weight (alpha); eye weight = 1 - alpha")
    axes.set_ylabel("Mean LOPO F1")
    axes.set_title("Geometric Pipeline: Face vs Eye Weight Search")
    axes.set_ylim(bottom=max(0, min(mean_f1.values()) - 0.05))
    for position, height in enumerate(bar_heights):
        axes.text(position, height + 0.003, f"{height:.3f}",
                  ha="center", va="bottom", fontsize=8)
    figure.tight_layout()
    out_path = os.path.join(PLOTS_DIR, "geo_weight_search.png")
    figure.savefig(out_path, dpi=150)
    plt.close(figure)
    print(f" saved {out_path}")

    print(f" Best alpha (face weight) = {best_alpha:.1f}, "
          f"mean LOPO F1 = {mean_f1[best_alpha]:.4f}")
    return dict(mean_f1), best_alpha
def run_hybrid_weight_search(lopo_results):
    """Grid-search the MLP-vs-geometric blend weight using LOPO folds.

    Held-out MLP probabilities are sliced out of the pooled
    ``lopo_results["mlp"]["p"]`` array in sorted-person order. For each fold an
    MLP is refitted on the training persons to obtain training-set
    probabilities, on which the per-weight Youden's J threshold is chosen.
    A bar chart is saved as ``hybrid_weight_search.png``.

    Returns ``(mean_f1_by_weight, best_w_mlp)``.
    """
    print("\n=== Hybrid weight grid search ===")
    by_person, _, _ = load_per_person("face_orientation")
    persons = sorted(by_person.keys())
    feature_names = SELECTED_FEATURES["face_orientation"]
    face_col = feature_names.index("s_face")
    eye_col = feature_names.index("s_eye")
    face_weight, eye_weight = 0.7, 0.3

    def geo_score_of(X):
        # Fixed-weight geometric score, clipped to [0, 1].
        return np.clip(face_weight * X[:, face_col] + eye_weight * X[:, eye_col], 0, 1)

    w_mlps = np.arange(0.3, 0.85, 0.1).round(1)
    fold_f1 = {w: [] for w in w_mlps}
    pooled_prob = lopo_results["mlp"]["p"]

    start = 0
    for held_out in persons:
        X_test, y_test = by_person[held_out]
        stop = start + X_test.shape[0]
        test_prob = pooled_prob[start:stop]
        start = stop
        test_geo = geo_score_of(X_test)

        others = [person for person in persons if person != held_out]
        train_X = np.concatenate([by_person[person][0] for person in others])
        train_y = np.concatenate([by_person[person][1] for person in others])
        train_geo = geo_score_of(train_X)

        scaler = StandardScaler().fit(train_X)
        train_scaled = scaler.transform(train_X)
        fold_mlp = MLPClassifier(
            hidden_layer_sizes=(64, 32), activation="relu",
            max_iter=200, early_stopping=True, validation_fraction=0.15,
            random_state=SEED, verbose=False,
        )
        fold_mlp.fit(train_scaled, train_y)
        train_prob = fold_mlp.predict_proba(train_scaled)[:, 1]

        for w in w_mlps:
            opt_t, *_ = _youdens_j(train_y, w * train_prob + (1.0 - w) * train_geo)
            blended_test = w * test_prob + (1.0 - w) * test_geo
            fold_f1[w].append(_f1_at_threshold(y_test, blended_test, opt_t))

    mean_f1 = {w: np.mean(scores) for w, scores in fold_f1.items()}
    best_w = max(mean_f1, key=lambda w: mean_f1[w])

    figure, axes = plt.subplots(figsize=(7, 4))
    bar_heights = [mean_f1[w] for w in w_mlps]
    axes.bar([f"{w:.1f}" for w in w_mlps], bar_heights, color="darkorange")
    axes.set_xlabel("MLP weight (w_mlp); geo weight = 1 - w_mlp")
    axes.set_ylabel("Mean LOPO F1")
    axes.set_title("Hybrid Pipeline: MLP vs Geometric Weight Search")
    axes.set_ylim(bottom=max(0, min(mean_f1.values()) - 0.05))
    for position, height in enumerate(bar_heights):
        axes.text(position, height + 0.003, f"{height:.3f}",
                  ha="center", va="bottom", fontsize=8)
    figure.tight_layout()
    out_path = os.path.join(PLOTS_DIR, "hybrid_weight_search.png")
    figure.savefig(out_path, dpi=150)
    plt.close(figure)
    print(f" saved {out_path}")

    print(f" Best w_mlp = {best_w:.1f}, mean LOPO F1 = {mean_f1[best_w]:.4f}")
    return dict(mean_f1), best_w
def run_hybrid_xgb_weight_search(lopo_results):
    """Grid-search the XGBoost-vs-geometric blend weight using LOPO folds.

    Mirrors the MLP hybrid search: held-out XGBoost probabilities are sliced
    from the pooled ``lopo_results["xgb"]["p"]`` array in sorted-person order,
    and an XGBoost model is refitted per fold to get the training-set
    probabilities on which each weight's Youden's J threshold is chosen.
    A bar chart is saved as ``hybrid_xgb_weight_search.png``.

    Returns ``(mean_f1_by_weight, best_w_xgb)``.
    """
    print("\n=== Hybrid XGBoost weight grid search ===")
    by_person, _, _ = load_per_person("face_orientation")
    persons = sorted(by_person.keys())
    feature_names = SELECTED_FEATURES["face_orientation"]
    face_col = feature_names.index("s_face")
    eye_col = feature_names.index("s_eye")
    face_weight, eye_weight = 0.7, 0.3

    def geo_score_of(X):
        # Fixed-weight geometric score, clipped to [0, 1].
        return np.clip(face_weight * X[:, face_col] + eye_weight * X[:, eye_col], 0, 1)

    w_xgbs = np.arange(0.3, 0.85, 0.1).round(1)
    fold_f1 = {w: [] for w in w_xgbs}
    pooled_prob = lopo_results["xgb"]["p"]

    start = 0
    for held_out in persons:
        X_test, y_test = by_person[held_out]
        stop = start + X_test.shape[0]
        test_prob = pooled_prob[start:stop]
        start = stop
        test_geo = geo_score_of(X_test)

        others = [person for person in persons if person != held_out]
        train_X = np.concatenate([by_person[person][0] for person in others])
        train_y = np.concatenate([by_person[person][1] for person in others])
        train_geo = geo_score_of(train_X)

        scaler = StandardScaler().fit(train_X)
        train_scaled = scaler.transform(train_X)
        fold_xgb = XGBClassifier(
            n_estimators=600, max_depth=8, learning_rate=0.05,
            subsample=0.8, colsample_bytree=0.8,
            reg_alpha=0.1, reg_lambda=1.0,
            eval_metric="logloss",
            random_state=SEED, verbosity=0,
        )
        fold_xgb.fit(train_scaled, train_y)
        train_prob = fold_xgb.predict_proba(train_scaled)[:, 1]

        for w in w_xgbs:
            opt_t, *_ = _youdens_j(train_y, w * train_prob + (1.0 - w) * train_geo)
            blended_test = w * test_prob + (1.0 - w) * test_geo
            fold_f1[w].append(_f1_at_threshold(y_test, blended_test, opt_t))

    mean_f1 = {w: np.mean(scores) for w, scores in fold_f1.items()}
    best_w = max(mean_f1, key=lambda w: mean_f1[w])

    figure, axes = plt.subplots(figsize=(7, 4))
    bar_heights = [mean_f1[w] for w in w_xgbs]
    axes.bar([f"{w:.1f}" for w in w_xgbs], bar_heights, color="steelblue")
    axes.set_xlabel("XGBoost weight (w_xgb); geo weight = 1 - w_xgb")
    axes.set_ylabel("Mean LOPO F1")
    axes.set_title("Hybrid Pipeline: XGBoost vs Geometric Weight Search")
    axes.set_ylim(bottom=max(0, min(mean_f1.values()) - 0.05))
    for position, height in enumerate(bar_heights):
        axes.text(position, height + 0.003, f"{height:.3f}",
                  ha="center", va="bottom", fontsize=8)
    figure.tight_layout()
    out_path = os.path.join(PLOTS_DIR, "hybrid_xgb_weight_search.png")
    figure.savefig(out_path, dpi=150)
    plt.close(figure)
    print(f" saved {out_path}")

    print(f" Best w_xgb = {best_w:.1f}, mean LOPO F1 = {mean_f1[best_w]:.4f}")
    return dict(mean_f1), best_w
def run_hybrid_lr_combiner(lopo_results, use_xgb=True):
    """Evaluate a logistic-regression combiner of [model_prob, geo_score] with LOPO.

    Per fold, the base model (XGBoost when ``use_xgb`` is true, otherwise the
    MLP) is refitted on the training persons; a logistic regression is fitted
    on the training meta-features and thresholded at its Youden's J optimum.
    Held-out base-model probabilities come from the pooled ``lopo_results``
    array, sliced in sorted-person order.

    Returns the mean held-out F1 across folds as a Python float.
    """
    print("\n=== Hybrid LR combiner (LOPO) ===")
    by_person, _, _ = load_per_person("face_orientation")
    persons = sorted(by_person.keys())
    feature_names = SELECTED_FEATURES["face_orientation"]
    face_col = feature_names.index("s_face")
    eye_col = feature_names.index("s_eye")
    face_weight, eye_weight = 0.7, 0.3

    def geo_score_of(X):
        # Fixed-weight geometric score, clipped to [0, 1].
        return np.clip(face_weight * X[:, face_col] + eye_weight * X[:, eye_col], 0, 1)

    pooled_prob = lopo_results["xgb" if use_xgb else "mlp"]["p"]
    fold_f1s = []
    start = 0
    for held_out in persons:
        X_test, y_test = by_person[held_out]
        stop = start + X_test.shape[0]
        test_prob = pooled_prob[start:stop]
        start = stop
        meta_test = np.column_stack([test_prob, geo_score_of(X_test)])

        others = [person for person in persons if person != held_out]
        train_X = np.concatenate([by_person[person][0] for person in others])
        train_y = np.concatenate([by_person[person][1] for person in others])
        train_geo = geo_score_of(train_X)

        scaler = StandardScaler().fit(train_X)
        train_scaled = scaler.transform(train_X)
        if use_xgb:
            base_model = XGBClassifier(
                n_estimators=600, max_depth=8, learning_rate=0.05,
                subsample=0.8, colsample_bytree=0.8,
                reg_alpha=0.1, reg_lambda=1.0,
                eval_metric="logloss",
                random_state=SEED, verbosity=0,
            )
        else:
            base_model = MLPClassifier(
                hidden_layer_sizes=(64, 32), activation="relu",
                max_iter=200, early_stopping=True, validation_fraction=0.15,
                random_state=SEED, verbose=False,
            )
        base_model.fit(train_scaled, train_y)
        train_prob = base_model.predict_proba(train_scaled)[:, 1]

        meta_train = np.column_stack([train_prob, train_geo])
        combiner = LogisticRegression(C=1.0, max_iter=500, random_state=SEED)
        combiner.fit(meta_train, train_y)

        # Threshold is taken from the combiner's training-set output.
        opt_t, *_ = _youdens_j(train_y, combiner.predict_proba(meta_train)[:, 1])
        held_out_prob = combiner.predict_proba(meta_test)[:, 1]
        f1 = _f1_at_threshold(y_test, held_out_prob, opt_t)
        fold_f1s.append(f1)
        print(f" fold {held_out}: F1 = {f1:.4f} (threshold = {opt_t:.3f})")

    mean_f1 = float(np.mean(fold_f1s))
    print(f" LR combiner mean LOPO F1 = {mean_f1:.4f}")
    return mean_f1
def train_and_save_hybrid_combiner(lopo_results, use_xgb, geo_face_weight=0.7, geo_eye_weight=0.3,
                                   combiner_path=None):
    """Fit one logistic-regression combiner on pooled LOPO predictions and save it.

    The meta-features are ``[model_prob, geo_score]``. ``model_prob`` is the
    pooled held-out probability array from ``lopo_results`` (XGBoost when
    ``use_xgb`` is true, otherwise MLP); ``geo_score`` is rebuilt per person
    in sorted-person order so that it lines up with that pooled array.

    Args:
        lopo_results: dict providing ``[key]["p"]`` and ``[key]["y"]``.
        use_xgb: selects the ``"xgb"`` or ``"mlp"`` entry of ``lopo_results``.
        geo_face_weight: weight of ``s_face`` in the geometric score.
        geo_eye_weight: weight of ``s_eye`` in the geometric score.
        combiner_path: output file; defaults to
            ``<project root>/checkpoints/hybrid_combiner.joblib``.

    Returns:
        ``(opt_threshold, combiner_path)`` where the threshold is the Youden's
        J optimum of the fitted combiner on the same meta-dataset.

    Raises:
        ValueError: if the rebuilt geometric scores and the pooled
            predictions differ in length.
    """
    by_person, _, _ = load_per_person("face_orientation")
    persons = sorted(by_person.keys())
    features = SELECTED_FEATURES["face_orientation"]
    sf_idx = features.index("s_face")
    se_idx = features.index("s_eye")
    key = "xgb" if use_xgb else "mlp"
    model_p = lopo_results[key]["p"]
    meta_y = lopo_results[key]["y"]

    geo_list = []
    for person in persons:
        X, _ = by_person[person]
        geo = geo_face_weight * X[:, sf_idx] + geo_eye_weight * X[:, se_idx]
        geo_list.append(np.clip(geo, 0, 1))
    geo_all = np.concatenate(geo_list)

    # Fail with a clear message rather than column_stack's shape error when the
    # dataset loaded here no longer matches the one the predictions came from.
    if geo_all.shape[0] != len(model_p):
        raise ValueError(
            f"geometric scores ({geo_all.shape[0]}) and pooled '{key}' predictions "
            f"({len(model_p)}) have different lengths; re-run the LOPO models"
        )

    meta_X = np.column_stack([model_p, geo_all])
    lr = LogisticRegression(C=1.0, max_iter=500, random_state=SEED)
    lr.fit(meta_X, meta_y)
    combined_prob = lr.predict_proba(meta_X)[:, 1]
    opt_threshold, *_ = _youdens_j(meta_y, combined_prob)

    if combiner_path is None:
        combiner_path = os.path.join(_PROJECT_ROOT, "checkpoints", "hybrid_combiner.joblib")
    os.makedirs(os.path.dirname(combiner_path), exist_ok=True)
    joblib.dump({
        "combiner": lr,
        "threshold": float(opt_threshold),
        "use_xgb": bool(use_xgb),
        "geo_face_weight": geo_face_weight,
        "geo_eye_weight": geo_eye_weight,
    }, combiner_path)
    print(f" Saved combiner to {combiner_path} (threshold={opt_threshold:.3f})")
    return opt_threshold, combiner_path
def _plot_class_histogram(values, labels, markers, xlabel, title, filename):
    """Save a per-class density histogram of ``values`` with dashed threshold lines."""
    fig, ax = plt.subplots(figsize=(7, 4))
    ax.hist(values[labels == 1], bins=100, alpha=0.6, label="Focused (1)", density=True)
    ax.hist(values[labels == 0], bins=100, alpha=0.6, label="Unfocused (0)", density=True)
    for val, lbl, colour in markers:
        ax.axvline(val, color=colour, ls="--", lw=1.5, label=lbl)
    ax.set_xlabel(xlabel)
    ax.set_ylabel("Density")
    ax.set_title(title)
    ax.legend(fontsize=8)
    fig.tight_layout()
    path = os.path.join(PLOTS_DIR, filename)
    fig.savefig(path, dpi=150)
    plt.close(fig)
    print(f" saved {path}")


def plot_distributions():
    """Plot EAR and MAR class distributions and report how many samples cross each threshold.

    Reads every ``data/collected_*/*.npz`` file under the project root, using
    its ``feature_names``, ``features`` and ``labels`` arrays, and saves
    ``ear_distribution.png`` and ``mar_distribution.png`` to ``PLOTS_DIR``.

    Returns:
        Dict with the percentage of samples with min-EAR below 0.16, below
        0.21 and at/above 0.30, the percentage with MAR above 0.55, and
        ``n_samples``.

    Raises:
        ValueError: if no ``.npz`` file matches the search pattern.
    """
    print("\n=== EAR / MAR distributions ===")
    pattern = os.path.join(_PROJECT_ROOT, "data", "collected_*", "*.npz")
    npz_files = sorted(glob.glob(pattern))
    if not npz_files:
        # np.concatenate on empty lists would raise a far less helpful ValueError.
        raise ValueError(f"No .npz recordings found matching {pattern}")

    all_ear_l, all_ear_r, all_mar, all_labels = [], [], [], []
    for f in npz_files:
        # NOTE(security): allow_pickle=True lets np.load unpickle object arrays,
        # which can execute arbitrary code; only point this at trusted recordings.
        # The context manager closes the archive's file handle after reading.
        with np.load(f, allow_pickle=True) as d:
            names = list(d["feature_names"])
            feat = d["features"].astype(np.float32)
            lab = d["labels"].astype(np.int64)
        all_ear_l.append(feat[:, names.index("ear_left")])
        all_ear_r.append(feat[:, names.index("ear_right")])
        all_mar.append(feat[:, names.index("mar")])
        all_labels.append(lab)

    ear_l = np.concatenate(all_ear_l)
    ear_r = np.concatenate(all_ear_r)
    mar = np.concatenate(all_mar)
    labels = np.concatenate(all_labels)
    ear_min = np.minimum(ear_l, ear_r)

    # Values are clipped for plotting only; the statistics below use raw values.
    _plot_class_histogram(
        np.clip(ear_min, 0, 0.85), labels,
        [
            (0.16, "ear_closed = 0.16", "red"),
            (0.21, "EAR_BLINK = 0.21", "orange"),
            (0.30, "ear_open = 0.30", "green"),
        ],
        "min(left_EAR, right_EAR)",
        "EAR Distribution by Class (144k samples)",
        "ear_distribution.png",
    )
    _plot_class_histogram(
        np.clip(mar, 0, 1.5), labels,
        [(0.55, "MAR_YAWN = 0.55", "red")],
        "Mouth Aspect Ratio (MAR)",
        "MAR Distribution by Class (144k samples)",
        "mar_distribution.png",
    )

    closed_pct = np.mean(ear_min < 0.16) * 100
    blink_pct = np.mean(ear_min < 0.21) * 100
    open_pct = np.mean(ear_min >= 0.30) * 100
    yawn_pct = np.mean(mar > 0.55) * 100
    stats = {
        "ear_below_016": closed_pct,
        "ear_below_021": blink_pct,
        "ear_above_030": open_pct,
        "mar_above_055": yawn_pct,
        "n_samples": len(ear_min),
    }
    print(f" EAR<0.16 (closed): {closed_pct:.1f}% | EAR<0.21 (blink): {blink_pct:.1f}% | "
          f"EAR>=0.30 (open): {open_pct:.1f}%")
    print(f" MAR>0.55 (yawn): {yawn_pct:.1f}%")
    return stats
def write_report(model_stats, extended_stats, geo_f1, best_alpha,
                 hybrid_mlp_f1, best_w_mlp,
                 hybrid_xgb_f1, best_w_xgb,
                 use_xgb_for_hybrid, dist_stats,
                 lr_combiner_f1=None):
    """Render the Markdown threshold-justification report and write it to ``REPORT_PATH``.

    Args:
        model_stats: per-model AUC / optimal threshold / F1 values.
        extended_stats: per-model pooled metrics, confusion matrix, per-person
            rows and mean/std/CI summaries.
        geo_f1, best_alpha: geometric weight search results (mean F1 per alpha
            and the selected alpha).
        hybrid_mlp_f1, best_w_mlp: MLP hybrid weight search results.
        hybrid_xgb_f1, best_w_xgb: XGBoost hybrid weight search results.
        use_xgb_for_hybrid: whether the XGBoost hybrid was selected.
        dist_stats: EAR/MAR percentage statistics.
        lr_combiner_f1: mean LOPO F1 of the LR combiner; when ``None`` the
            combiner subsection is replaced by a note on the heuristic blend.
    """
    model_keys = ("mlp", "xgb")
    # Take the fold count from the per-person rows so the prose and the stated
    # degrees of freedom always match the tables.
    n_folds = len(extended_stats["mlp"]["per_person"])
    lines = []
    add = lines.append

    add("# Threshold Justification Report")
    add("")
    # NOTE(review): "~145k samples" is still a fixed figure; none of this
    # function's inputs carries the size of the LOPO dataset.
    add("Auto-generated by `evaluation/justify_thresholds.py` using LOPO cross-validation "
        f"over {n_folds} participants (~145k samples).")
    add("")

    add("## 1. ML Model Decision Thresholds")
    add("")
    add("Thresholds selected via **Youden's J statistic** (J = sensitivity + specificity - 1) "
        "on pooled LOPO held-out predictions.")
    add("")
    add("| Model | LOPO AUC | Optimal Threshold (Youden's J) | F1 @ Optimal | F1 @ 0.50 |")
    add("|-------|----------|-------------------------------|--------------|-----------|")
    for key in model_keys:
        s = model_stats[key]
        add(f"| {s['label']} | {s['auc']:.4f} | **{s['opt_threshold']:.3f}** | "
            f"{s['f1_opt']:.4f} | {s['f1_50']:.4f} |")
    add("")
    add("![MLP ROC](plots/roc_mlp.png)")
    add("")
    # analyse_model_thresholds saves ROC plots as roc_<key>.png, i.e. roc_xgb.png.
    add("![XGBoost ROC](plots/roc_xgb.png)")
    add("")

    add("## 2. Precision, Recall and Tradeoff")
    add("")
    add("At the optimal threshold (Youden's J), pooled over all LOPO held-out predictions:")
    add("")
    add("| Model | Threshold | Precision | Recall | F1 | Accuracy |")
    add("|-------|----------:|----------:|-------:|---:|---------:|")
    for key in model_keys:
        s = extended_stats[key]
        add(f"| {s['label']} | {s['opt_threshold']:.3f} | {s['precision_pooled']:.4f} | "
            f"{s['recall_pooled']:.4f} | {model_stats[key]['f1_opt']:.4f} | {s['accuracy_pooled']:.4f} |")
    add("")
    add("Higher threshold → fewer positive predictions → higher precision, lower recall. "
        "Youden's J picks the threshold that balances sensitivity and specificity (recall for the positive class and true negative rate).")
    add("")

    add("## 3. Confusion Matrix (Pooled LOPO)")
    add("")
    add("At optimal threshold. Rows = true label, columns = predicted label (0 = unfocused, 1 = focused).")
    add("")
    for key in model_keys:
        s = extended_stats[key]
        add(f"### {s['label']}")
        add("")
        add("| | Pred 0 | Pred 1 |")
        add("|--|-------:|-------:|")
        cm = s["confusion_matrix"]
        if cm.shape == (2, 2):
            add(f"| **True 0** | {cm[0,0]} (TN) | {cm[0,1]} (FP) |")
            add(f"| **True 1** | {cm[1,0]} (FN) | {cm[1,1]} (TP) |")
        add("")
        add(f"TN={s['tn']}, FP={s['fp']}, FN={s['fn']}, TP={s['tp']}. ")
        add("")
    add("![Confusion MLP](plots/confusion_matrix_mlp.png)")
    add("")
    add("![Confusion XGBoost](plots/confusion_matrix_xgb.png)")
    add("")

    add("## 4. Per-Person Performance Variance (LOPO)")
    add("")
    add("One fold per left-out person; metrics at optimal threshold.")
    add("")
    for key in model_keys:
        s = extended_stats[key]
        add(f"### {s['label']} — per held-out person")
        add("")
        add("| Person | Accuracy | F1 | Precision | Recall |")
        add("|--------|---------:|---:|----------:|-------:|")
        for row in s["per_person"]:
            add(f"| {row['person']} | {row['accuracy']:.4f} | {row['f1']:.4f} | "
                f"{row['precision']:.4f} | {row['recall']:.4f} |")
        add("")
    add("### Summary across persons")
    add("")
    add("| Model | Accuracy mean ± std | F1 mean ± std | Precision mean ± std | Recall mean ± std |")
    add("|-------|---------------------|---------------|----------------------|-------------------|")
    for key in model_keys:
        s = extended_stats[key]
        add(f"| {s['label']} | {s['accuracy_mean']:.4f} ± {s['accuracy_std']:.4f} | "
            f"{s['f1_mean']:.4f} ± {s['f1_std']:.4f} | "
            f"{s['precision_mean']:.4f} ± {s['precision_std']:.4f} | "
            f"{s['recall_mean']:.4f} ± {s['recall_std']:.4f} |")
    add("")

    add(f"## 5. Confidence Intervals (95%, LOPO over {n_folds} persons)")
    add("")
    add(f"Mean ± half-width of 95% t-interval (df={n_folds - 1}) for each metric "
        f"across the {n_folds} left-out persons.")
    add("")
    add("| Model | F1 | Accuracy | Precision | Recall |")
    add("|-------|---:|--------:|----------:|-------:|")
    for key in model_keys:
        s = extended_stats[key]
        cells = []
        for metric in ("f1", "accuracy", "precision", "recall"):
            mean = s[f"{metric}_mean"]
            half = s[f"{metric}_ci_half"]
            cells.append(f"{mean:.4f} [{mean - half:.4f}, {mean + half:.4f}]")
        add(f"| {s['label']} | " + " | ".join(cells) + " |")
    add("")

    add("## 6. Geometric Pipeline Weights (s_face vs s_eye)")
    add("")
    add("Grid search over face weight alpha in {0.2 ... 0.8}. "
        "Eye weight = 1 - alpha. Threshold per fold via Youden's J.")
    add("")
    add("| Face Weight (alpha) | Mean LOPO F1 |")
    add("|--------------------:|-------------:|")
    for a in sorted(geo_f1.keys()):
        marker = " **<-- selected**" if a == best_alpha else ""
        add(f"| {a:.1f} | {geo_f1[a]:.4f}{marker} |")
    add("")
    add(f"**Best:** alpha = {best_alpha:.1f} (face {best_alpha*100:.0f}%, "
        f"eye {(1-best_alpha)*100:.0f}%)")
    add("")
    add("![Geometric weight search](plots/geo_weight_search.png)")
    add("")

    add("## 7. Hybrid Pipeline: MLP vs Geometric")
    add("")
    add("Grid search over w_mlp in {0.3 ... 0.8}. w_geo = 1 - w_mlp. "
        "Geometric sub-score uses same weights as geometric pipeline (face=0.7, eye=0.3).")
    add("")
    add("| MLP Weight (w_mlp) | Mean LOPO F1 |")
    add("|-------------------:|-------------:|")
    for w in sorted(hybrid_mlp_f1.keys()):
        marker = " **<-- selected**" if w == best_w_mlp else ""
        add(f"| {w:.1f} | {hybrid_mlp_f1[w]:.4f}{marker} |")
    add("")
    add(f"**Best:** w_mlp = {best_w_mlp:.1f} (MLP {best_w_mlp*100:.0f}%, "
        f"geometric {(1-best_w_mlp)*100:.0f}%) → mean LOPO F1 = {hybrid_mlp_f1[best_w_mlp]:.4f}")
    add("")
    add("![Hybrid MLP weight search](plots/hybrid_weight_search.png)")
    add("")

    add("## 8. Hybrid Pipeline: XGBoost vs Geometric")
    add("")
    add("Same grid over w_xgb in {0.3 ... 0.8}. w_geo = 1 - w_xgb.")
    add("")
    add("| XGBoost Weight (w_xgb) | Mean LOPO F1 |")
    add("|-----------------------:|-------------:|")
    for w in sorted(hybrid_xgb_f1.keys()):
        marker = " **<-- selected**" if w == best_w_xgb else ""
        add(f"| {w:.1f} | {hybrid_xgb_f1[w]:.4f}{marker} |")
    add("")
    add(f"**Best:** w_xgb = {best_w_xgb:.1f} → mean LOPO F1 = {hybrid_xgb_f1[best_w_xgb]:.4f}")
    add("")
    add("![Hybrid XGBoost weight search](plots/hybrid_xgb_weight_search.png)")
    add("")

    f1_mlp = hybrid_mlp_f1[best_w_mlp]
    f1_xgb = hybrid_xgb_f1[best_w_xgb]
    add("### Which hybrid is used in the app?")
    add("")
    if use_xgb_for_hybrid:
        add(f"**XGBoost hybrid is better** (F1 = {f1_xgb:.4f} vs MLP hybrid F1 = {f1_mlp:.4f}).")
    else:
        add(f"**MLP hybrid is better** (F1 = {f1_mlp:.4f} vs XGBoost hybrid F1 = {f1_xgb:.4f}).")
    add("")
    if lr_combiner_f1 is not None:
        add("### Logistic regression combiner (replaces heuristic weights)")
        add("")
        add("Instead of a fixed linear blend (e.g. 0.3·ML + 0.7·geo), a **logistic regression** "
            "combines model probability and geometric score: meta-features = [model_prob, geo_score], "
            "trained on the same LOPO splits. Threshold from Youden's J on combiner output.")
        add("")
        add("| Method | Mean LOPO F1 |")
        add("|--------|-------------:|")
        add(f"| Heuristic weight grid (best w) | {(f1_xgb if use_xgb_for_hybrid else f1_mlp):.4f} |")
        add(f"| **LR combiner** | **{lr_combiner_f1:.4f}** |")
        add("")
        add("The app uses the saved LR combiner when `combiner_path` is set in `hybrid_focus_config.json`.")
        add("")
    else:
        if use_xgb_for_hybrid:
            add("The app uses **XGBoost + geometric** with the weights above.")
        else:
            add("The app uses **MLP + geometric** with the weights above.")
        add("")

    # Numbered 9 so it follows section 8; the heading previously repeated "5".
    add("## 9. Eye and Mouth Aspect Ratio Thresholds")
    add("")
    add("### EAR (Eye Aspect Ratio)")
    add("")
    add("Reference: Soukupova & Cech, \"Real-Time Eye Blink Detection Using Facial "
        "Landmarks\" (2016) established EAR ~ 0.2 as a blink threshold.")
    add("")
    add("Our thresholds define a linear interpolation zone around this established value:")
    add("")
    add("| Constant | Value | Justification |")
    add("|----------|------:|---------------|")
    add(f"| `ear_closed` | 0.16 | Below this, eyes are fully shut. "
        f"{dist_stats['ear_below_016']:.1f}% of samples fall here. |")
    add(f"| `EAR_BLINK_THRESH` | 0.21 | Blink detection point; close to the 0.2 reference. "
        f"{dist_stats['ear_below_021']:.1f}% of samples below. |")
    add(f"| `ear_open` | 0.30 | Above this, eyes are fully open. "
        f"{dist_stats['ear_above_030']:.1f}% of samples here. |")
    add("")
    add("Between 0.16 and 0.30 the `_ear_score` function linearly interpolates from 0 to 1, "
        "providing a smooth transition rather than a hard binary cutoff.")
    add("")
    add("![EAR distribution](plots/ear_distribution.png)")
    add("")
    add("### MAR (Mouth Aspect Ratio)")
    add("")
    add("| Constant | Value | Justification |")
    add("|----------|------:|---------------|")
    add(f"| `MAR_YAWN_THRESHOLD` | 0.55 | Only {dist_stats['mar_above_055']:.1f}% of "
        f"samples exceed this, confirming it captures genuine yawns without false positives. |")
    add("")
    add("![MAR distribution](plots/mar_distribution.png)")
    add("")

    add("## 10. Other Constants")
    add("")
    add("| Constant | Value | Rationale |")
    add("|----------|------:|-----------|")
    add("| `gaze_max_offset` | 0.28 | Max iris displacement (normalised) before gaze score "
        "drops to zero. Corresponds to ~56% of the eye width; beyond this the iris is at "
        "the extreme edge. |")
    add("| `max_angle` | 22.0 deg | Head deviation beyond which face score = 0. Based on "
        "typical monitor-viewing cone: at 60 cm distance and a 24\" monitor, the viewing "
        "angle is ~20-25 degrees. |")
    add("| `roll_weight` | 0.5 | Roll is less indicative of inattention than yaw/pitch "
        "(tilting head doesn't mean looking away), so it's down-weighted by 50%. |")
    add("| `EMA alpha` | 0.3 | Smoothing factor for focus score. "
        "Gives ~3-4 frame effective window; balances responsiveness vs flicker. |")
    add("| `grace_frames` | 15 | ~0.5 s at 30 fps before penalising no-face. Allows brief "
        "occlusions (e.g. hand gesture) without dropping score. |")
    add("| `PERCLOS_WINDOW` | 60 frames | 2 s at 30 fps; standard PERCLOS measurement "
        "window (Dinges & Grace, 1998). |")
    add("| `BLINK_WINDOW_SEC` | 30 s | Blink rate measured over 30 s; typical spontaneous "
        "blink rate is 15-20/min (Bentivoglio et al., 1997). |")
    add("")

    with open(REPORT_PATH, "w", encoding="utf-8") as f:
        f.write("\n".join(lines))
    print(f"\nReport written to {REPORT_PATH}")
def write_hybrid_config(use_xgb, best_w_mlp, best_w_xgb, config_path,
                        combiner_path=None, combiner_threshold=None):
    """Serialise the hybrid-pipeline settings as JSON at ``config_path``.

    The selected model's weight and its complement ``w_geo`` are rounded to
    two decimals; the other model's weight is a fixed placeholder (``w_mlp``
    0.3 when XGBoost is selected, ``w_xgb`` 0.0 when the MLP is). ``threshold``
    is ``combiner_threshold`` when given, otherwise 0.35. A truthy
    ``combiner_path`` adds ``combiner = "logistic"`` and the path's base name.
    """
    import json

    selected_weight = best_w_xgb if use_xgb else best_w_mlp
    model_weight = round(float(selected_weight), 2)
    w_geo = round(1.0 - selected_weight, 2)
    w_mlp, w_xgb = (0.3, model_weight) if use_xgb else (model_weight, 0.0)

    if combiner_threshold is None:
        threshold = 0.35
    else:
        threshold = float(combiner_threshold)

    cfg = {
        "use_xgb": bool(use_xgb),
        "w_mlp": w_mlp,
        "w_xgb": w_xgb,
        "w_geo": w_geo,
        "threshold": threshold,
        "use_yawn_veto": True,
        "geo_face_weight": 0.7,
        "geo_eye_weight": 0.3,
        "mar_yawn_threshold": 0.55,
        "metric": "f1",
    }
    if combiner_path:
        cfg.update(combiner="logistic", combiner_path=os.path.basename(combiner_path))

    with open(config_path, "w", encoding="utf-8") as handle:
        json.dump(cfg, handle, indent=2)
    print(f" Written {config_path} (use_xgb={cfg['use_xgb']}, combiner={cfg.get('combiner', 'heuristic')})")
def main():
    """Run the whole analysis: LOPO models, weight searches, combiner, config and report."""
    os.makedirs(PLOTS_DIR, exist_ok=True)

    # Base models and their threshold / metric analysis.
    lopo_results = run_lopo_models()
    model_stats = analyse_model_thresholds(lopo_results)
    extended_stats = analyse_precision_recall_confusion(lopo_results, model_stats)
    plot_confusion_matrices(extended_stats)

    # Weight searches and feature distributions.
    geo_f1, best_alpha = run_geo_weight_search()
    hybrid_mlp_f1, best_w_mlp = run_hybrid_weight_search(lopo_results)
    hybrid_xgb_f1, best_w_xgb = run_hybrid_xgb_weight_search(lopo_results)
    dist_stats = plot_distributions()

    # XGBoost is chosen only when its best hybrid F1 is strictly higher.
    f1_mlp = hybrid_mlp_f1[best_w_mlp]
    f1_xgb = hybrid_xgb_f1[best_w_xgb]
    use_xgb_for_hybrid = f1_xgb > f1_mlp
    winner = "XGBoost" if use_xgb_for_hybrid else "MLP"
    print(f"\n Hybrid comparison: MLP F1 = {f1_mlp:.4f}, XGBoost F1 = {f1_xgb:.4f} → "
          f"use {winner}")

    lr_combiner_f1 = run_hybrid_lr_combiner(lopo_results, use_xgb=use_xgb_for_hybrid)

    checkpoint_dir = os.path.join(_PROJECT_ROOT, "checkpoints")
    combiner_threshold, combiner_path = train_and_save_hybrid_combiner(
        lopo_results, use_xgb_for_hybrid,
        combiner_path=os.path.join(checkpoint_dir, "hybrid_combiner.joblib"),
    )
    write_hybrid_config(
        use_xgb_for_hybrid, best_w_mlp, best_w_xgb,
        os.path.join(checkpoint_dir, "hybrid_focus_config.json"),
        combiner_path=combiner_path, combiner_threshold=combiner_threshold,
    )

    write_report(model_stats, extended_stats, geo_f1, best_alpha,
                 hybrid_mlp_f1, best_w_mlp,
                 hybrid_xgb_f1, best_w_xgb,
                 use_xgb_for_hybrid, dist_stats,
                 lr_combiner_f1=lr_combiner_f1)
    print("\nDone.")
# Run the analysis only when this module is executed as a script
# (e.g. ``python -m evaluation.justify_thresholds``), not when it is imported.
if __name__ == "__main__":
    main()