final / ui /pipeline.py
k22056537
evaluation: channel ablation script + feature importance LOPO
e69e3a3
raw
history blame
23.6 kB
import collections
import glob
import json
import math
import os
import sys
import numpy as np
import joblib
import torch
import torch.nn as nn
_PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _PROJECT_ROOT not in sys.path:
sys.path.insert(0, _PROJECT_ROOT)
from data_preparation.prepare_dataset import SELECTED_FEATURES
from models.face_mesh import FaceMeshDetector
from models.head_pose import HeadPoseEstimator
from models.eye_scorer import EyeBehaviourScorer, compute_mar, MAR_YAWN_THRESHOLD
from models.collect_features import FEATURE_NAMES, TemporalTracker, extract_features
# Same 10 features used for MLP training (prepare_dataset) and inference
MLP_FEATURE_NAMES = SELECTED_FEATURES["face_orientation"]
# Name -> position lookup into FEATURE_NAMES, so individual entries of a
# feature vector can be addressed by name instead of by hard-coded index.
_FEAT_IDX = {name: i for i, name in enumerate(FEATURE_NAMES)}
def _clip_features(vec):
    """Return a copy of ``vec`` with every known feature clamped to a fixed range.

    The input is not modified. Positions are looked up by name through
    ``_FEAT_IDX``. ``head_deviation`` is not clamped; it is recomputed as the
    Euclidean norm of the already-clamped yaw and pitch.
    """
    clipped = vec.copy()
    pos = _FEAT_IDX

    def _clamp(name, low, high):
        slot = pos[name]
        clipped[slot] = np.clip(clipped[slot], low, high)

    # Head angles go first: head_deviation below is derived from the clamped
    # yaw/pitch, not from the raw ones.
    _clamp("yaw", -45, 45)
    _clamp("pitch", -30, 30)
    _clamp("roll", -30, 30)
    yaw = float(clipped[pos["yaw"]])
    pitch = float(clipped[pos["pitch"]])
    clipped[pos["head_deviation"]] = math.sqrt(yaw ** 2 + pitch ** 2)

    remaining_bounds = (
        ("ear_left", 0, 0.85),
        ("ear_right", 0, 0.85),
        ("ear_avg", 0, 0.85),
        ("mar", 0, 1.0),
        ("gaze_offset", 0, 0.50),
        ("perclos", 0, 0.80),
        ("blink_rate", 0, 30.0),
        ("closure_duration", 0, 10.0),
        ("yawn_duration", 0, 10.0),
    )
    for name, low, high in remaining_bounds:
        _clamp(name, low, high)
    return clipped
class _OutputSmoother:
def __init__(self, alpha: float = 0.3, grace_frames: int = 15):
self._alpha = alpha
self._grace = grace_frames
self._score = 0.5
self._no_face = 0
def reset(self):
self._score = 0.5
self._no_face = 0
def update(self, raw_score: float, face_detected: bool) -> float:
if face_detected:
self._no_face = 0
self._score += self._alpha * (raw_score - self._score)
else:
self._no_face += 1
if self._no_face > self._grace:
self._score *= 0.85
return self._score
# Baseline settings for HybridFocusPipeline. _load_hybrid_config copies this
# dict and overrides only the keys listed here with values from the JSON file.
DEFAULT_HYBRID_CONFIG = {
    # True: blend XGBoost with the geometric score; False: blend the MLP.
    "use_xgb": False,
    # Blend weights; the active model weight and w_geo are re-normalised to
    # sum to 1 when a config file is loaded.
    "w_mlp": 0.3,
    "w_xgb": 0.0,
    "w_geo": 0.7,
    # Smoothed focus_score >= threshold means "focused".
    "threshold": 0.35,
    # When true, a detected yawn forces the geometric score to 0.
    "use_yawn_veto": True,
    # Weights of s_face / s_eye inside the geometric score.
    "geo_face_weight": 0.7,
    "geo_eye_weight": 0.3,
    # MAR above this value is treated as a yawn.
    "mar_yawn_threshold": float(MAR_YAWN_THRESHOLD),
    # "logistic" plus a combiner_path enables a learned combiner that
    # replaces the fixed weights above.
    "combiner": None,
    "combiner_path": None,
}
class _RuntimeFeatureEngine:
_MAG_FEATURES = ["pitch", "yaw", "head_deviation", "gaze_offset", "v_gaze", "h_gaze"]
_VEL_FEATURES = ["pitch", "yaw", "h_gaze", "v_gaze", "head_deviation", "gaze_offset"]
_VAR_FEATURES = ["h_gaze", "v_gaze", "pitch"]
_VAR_WINDOW = 30
_WARMUP = 15
def __init__(self, base_feature_names, norm_features=None):
self._base_names = list(base_feature_names)
self._norm_features = list(norm_features) if norm_features else []
tracked = set(self._MAG_FEATURES) | set(self._norm_features)
self._ema_mean = {f: 0.0 for f in tracked}
self._ema_var = {f: 1.0 for f in tracked}
self._n = 0
self._prev = None
self._var_bufs = {
f: collections.deque(maxlen=self._VAR_WINDOW) for f in self._VAR_FEATURES
}
self._ext_names = (
list(self._base_names)
+ [f"{f}_mag" for f in self._MAG_FEATURES]
+ [f"{f}_vel" for f in self._VEL_FEATURES]
+ [f"{f}_var" for f in self._VAR_FEATURES]
)
@property
def extended_names(self):
return list(self._ext_names)
def transform(self, base_vec):
self._n += 1
raw = {name: float(base_vec[i]) for i, name in enumerate(self._base_names)}
alpha = 2.0 / (min(self._n, 120) + 1)
for feat in self._ema_mean:
if feat not in raw:
continue
v = raw[feat]
if self._n == 1:
self._ema_mean[feat] = v
self._ema_var[feat] = 0.0
else:
self._ema_mean[feat] += alpha * (v - self._ema_mean[feat])
self._ema_var[feat] += alpha * (
(v - self._ema_mean[feat]) ** 2 - self._ema_var[feat]
)
out = base_vec.copy().astype(np.float32)
if self._n > self._WARMUP:
for feat in self._norm_features:
if feat in raw:
idx = self._base_names.index(feat)
std = max(math.sqrt(self._ema_var[feat]), 1e-6)
out[idx] = (raw[feat] - self._ema_mean[feat]) / std
mag = np.zeros(len(self._MAG_FEATURES), dtype=np.float32)
for i, feat in enumerate(self._MAG_FEATURES):
if feat in raw:
mag[i] = abs(raw[feat] - self._ema_mean.get(feat, raw[feat]))
vel = np.zeros(len(self._VEL_FEATURES), dtype=np.float32)
if self._prev is not None:
for i, feat in enumerate(self._VEL_FEATURES):
if feat in raw and feat in self._prev:
vel[i] = abs(raw[feat] - self._prev[feat])
self._prev = dict(raw)
for feat in self._VAR_FEATURES:
if feat in raw:
self._var_bufs[feat].append(raw[feat])
var = np.zeros(len(self._VAR_FEATURES), dtype=np.float32)
for i, feat in enumerate(self._VAR_FEATURES):
buf = self._var_bufs[feat]
if len(buf) >= 2:
arr = np.array(buf)
var[i] = float(arr.var())
return np.concatenate([out, mag, vel, var])
class FaceMeshPipeline:
    """Geometry-only focus pipeline: head-pose score blended with eye score.

    ``alpha`` and ``beta`` weight the face and eye scores respectively; the
    blended score is smoothed over time and compared against ``threshold``.
    A detector passed in by the caller is used as-is and is not closed by
    this object; otherwise a ``FaceMeshDetector`` is created and owned.
    """

    def __init__(
        self,
        max_angle: float = 22.0,
        alpha: float = 0.7,
        beta: float = 0.3,
        threshold: float = 0.55,
        detector=None,
    ):
        self._owns_detector = detector is None
        self.detector = detector or FaceMeshDetector()
        self.head_pose = HeadPoseEstimator(max_angle=max_angle)
        self.eye_scorer = EyeBehaviourScorer()
        self.alpha, self.beta = alpha, beta
        self.threshold = threshold
        self._smoother = _OutputSmoother()

    def process_frame(self, bgr_frame: np.ndarray) -> dict:
        """Analyse one frame and return a result dict.

        ``raw_score`` in the result holds the *smoothed* score, and
        ``is_focused`` is that score compared against the threshold. When no
        face is found the remaining fields keep their neutral defaults.
        """
        landmarks = self.detector.process(bgr_frame)
        height, width = bgr_frame.shape[:2]
        result = {
            "landmarks": landmarks,
            "s_face": 0.0,
            "s_eye": 0.0,
            "raw_score": 0.0,
            "is_focused": False,
            "yaw": None,
            "pitch": None,
            "roll": None,
            "mar": None,
            "is_yawning": False,
            "left_bbox": None,
            "right_bbox": None,
        }

        if landmarks is None:
            # No face: let the smoother hold/decay the previous score.
            decayed = self._smoother.update(0.0, False)
            result["raw_score"] = decayed
            result["is_focused"] = decayed >= self.threshold
            return result

        pose = self.head_pose.estimate(landmarks, width, height)
        if pose is not None:
            result["yaw"], result["pitch"], result["roll"] = pose

        face_score = self.head_pose.score(landmarks, width, height)
        eye_score = self.eye_scorer.score(landmarks)
        mouth_ratio = compute_mar(landmarks)
        yawning = mouth_ratio > MAR_YAWN_THRESHOLD
        result["s_face"] = face_score
        result["s_eye"] = eye_score
        result["mar"] = mouth_ratio
        result["is_yawning"] = yawning

        blended = self.alpha * face_score + self.beta * eye_score
        if yawning:
            # A yawn vetoes the frame regardless of pose/eye scores.
            blended = 0.0

        smoothed = self._smoother.update(blended, True)
        result["raw_score"] = smoothed
        result["is_focused"] = smoothed >= self.threshold
        return result

    def reset_session(self):
        """Forget smoothing history so a new session starts fresh."""
        self._smoother.reset()

    def close(self):
        """Close the detector, but only if this pipeline created it."""
        if not self._owns_detector:
            return
        self.detector.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
# PyTorch MLP matching models/mlp/train.py BaseModel (10 -> 64 -> 32 -> 2)
class _FocusMLP(nn.Module):
def __init__(self, num_features: int, num_classes: int = 2):
super().__init__()
self.network = nn.Sequential(
nn.Linear(num_features, 64),
nn.ReLU(),
nn.Linear(64, 32),
nn.ReLU(),
nn.Linear(32, num_classes),
)
def forward(self, x):
return self.network(x)
def _mlp_artifacts_available(model_dir: str) -> bool:
pt_path = os.path.join(model_dir, "mlp_best.pt")
scaler_path = os.path.join(model_dir, "scaler_mlp.joblib")
return os.path.isfile(pt_path) and os.path.isfile(scaler_path)
def _load_mlp_artifacts(model_dir: str):
"""Load PyTorch MLP + scaler from checkpoints. Returns (model, scaler, feature_names)."""
pt_path = os.path.join(model_dir, "mlp_best.pt")
scaler_path = os.path.join(model_dir, "scaler_mlp.joblib")
if not os.path.isfile(pt_path):
raise FileNotFoundError(f"No MLP checkpoint at {pt_path}")
if not os.path.isfile(scaler_path):
raise FileNotFoundError(f"No scaler at {scaler_path}")
num_features = len(MLP_FEATURE_NAMES)
num_classes = 2
model = _FocusMLP(num_features, num_classes)
model.load_state_dict(torch.load(pt_path, map_location="cpu", weights_only=True))
model.eval()
scaler = joblib.load(scaler_path)
return model, scaler, list(MLP_FEATURE_NAMES)
def _load_hybrid_config(model_dir: str, config_path: str | None = None):
    """Build the hybrid-pipeline configuration.

    Starts from ``DEFAULT_HYBRID_CONFIG`` and overlays the recognised keys of
    the JSON file at ``config_path`` (default:
    ``<model_dir>/hybrid_focus_config.json``). Values are coerced to their
    expected types, and the active model weight plus ``w_geo`` are rescaled
    to sum to 1.

    Returns:
        ``(config, path)`` where ``path`` is the file that was read, or
        ``None`` when no file exists and the untouched defaults are returned.

    Raises:
        ValueError: if the active model weight plus ``w_geo`` is not positive.
    """
    settings = dict(DEFAULT_HYBRID_CONFIG)
    source = config_path or os.path.join(model_dir, "hybrid_focus_config.json")
    if not os.path.isfile(source):
        print(f"[HYBRID] No config found at {source}; using defaults")
        return settings, None

    with open(source, "r", encoding="utf-8") as handle:
        overrides = json.load(handle)
    # Unknown keys in the file are ignored on purpose.
    settings.update(
        {key: overrides[key] for key in DEFAULT_HYBRID_CONFIG if key in overrides}
    )

    settings["use_xgb"] = bool(settings.get("use_xgb", False))
    settings["w_mlp"] = float(settings.get("w_mlp", 0.3))
    settings["w_xgb"] = float(settings.get("w_xgb", 0.0))
    settings["w_geo"] = float(settings["w_geo"])

    # Only the weight of the model actually in use takes part in normalisation.
    model_key = "w_xgb" if settings["use_xgb"] else "w_mlp"
    total = settings[model_key] + settings["w_geo"]
    if total <= 0:
        raise ValueError(f"[HYBRID] Invalid config: {model_key} + w_geo must be > 0")
    settings[model_key] /= total
    settings["w_geo"] /= total

    coercions = (
        ("threshold", float),
        ("use_yawn_veto", bool),
        ("geo_face_weight", float),
        ("geo_eye_weight", float),
        ("mar_yawn_threshold", float),
    )
    for key, cast in coercions:
        settings[key] = cast(settings[key])
    # Empty strings and other falsy values mean "no combiner".
    for key in ("combiner", "combiner_path"):
        settings[key] = settings.get(key) or None

    print(f"[HYBRID] Loaded config: {source}")
    return settings, source
class MLPPipeline:
    """Focus pipeline driven by the PyTorch MLP classifier.

    Each frame is turned into a clipped feature vector, the MLP's selected
    features are scaled and classified, and the "focused" class probability
    is smoothed over time and compared against ``threshold``. A detector
    supplied by the caller is not closed by this object.
    """

    def __init__(self, model_dir=None, detector=None, threshold=0.23):
        if model_dir is None:
            model_dir = os.path.join(_PROJECT_ROOT, "checkpoints")

        # Load artifacts first so a missing checkpoint fails before any
        # detector is created.
        artifacts = _load_mlp_artifacts(model_dir)
        self._mlp, self._scaler, self._feature_names = artifacts
        self._indices = [FEATURE_NAMES.index(name) for name in self._feature_names]

        self._owns_detector = detector is None
        self._detector = detector or FaceMeshDetector()
        self._head_pose = HeadPoseEstimator()
        self.head_pose = self._head_pose
        self._eye_scorer = EyeBehaviourScorer()
        self._temporal = TemporalTracker()
        self._smoother = _OutputSmoother()
        self._threshold = threshold
        print(f"[MLP] Loaded PyTorch MLP from {model_dir} | {len(self._feature_names)} features | threshold={threshold}")

    def process_frame(self, bgr_frame):
        """Analyse one frame and return a result dict.

        ``mlp_prob`` is the per-frame probability of the "focused" class;
        ``raw_score`` is its smoothed value and drives ``is_focused``.
        """
        landmarks = self._detector.process(bgr_frame)
        height, width = bgr_frame.shape[:2]
        result = {
            "landmarks": landmarks,
            "is_focused": False,
            "s_face": 0.0,
            "s_eye": 0.0,
            "raw_score": 0.0,
            "mlp_prob": 0.0,
            "mar": None,
            "yaw": None,
            "pitch": None,
            "roll": None,
        }

        if landmarks is None:
            # No face: let the smoother hold/decay the previous score.
            decayed = self._smoother.update(0.0, False)
            result["raw_score"] = decayed
            result["is_focused"] = decayed >= self._threshold
            return result

        features = _clip_features(
            extract_features(
                landmarks, width, height, self._head_pose, self._eye_scorer, self._temporal
            )
        )
        for key in ("yaw", "pitch", "roll", "s_face", "s_eye", "mar"):
            result[key] = float(features[_FEAT_IDX[key]])

        model_input = features[self._indices].reshape(1, -1).astype(np.float32)
        if self._scaler is not None:
            model_input = self._scaler.transform(model_input)
        with torch.no_grad():
            logits = self._mlp(torch.from_numpy(model_input).float())
            class_probs = torch.softmax(logits, dim=1)
            focused_prob = float(class_probs[0, 1])

        result["mlp_prob"] = float(np.clip(focused_prob, 0.0, 1.0))
        result["raw_score"] = self._smoother.update(result["mlp_prob"], True)
        result["is_focused"] = result["raw_score"] >= self._threshold
        return result

    def reset_session(self):
        """Drop temporal feature state and smoothing history."""
        self._temporal = TemporalTracker()
        self._smoother.reset()

    def close(self):
        """Close the detector, but only if this pipeline created it."""
        if not self._owns_detector:
            return
        self._detector.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
def _resolve_xgb_path():
    """Return the default XGBoost checkpoint path under the project's checkpoints dir."""
    checkpoint_name = "xgboost_face_orientation_best.json"
    return os.path.join(_PROJECT_ROOT, "checkpoints", checkpoint_name)
class HybridFocusPipeline:
def __init__(
self,
model_dir=None,
config_path: str | None = None,
max_angle: float = 22.0,
detector=None,
):
if model_dir is None:
model_dir = os.path.join(_PROJECT_ROOT, "checkpoints")
self._cfg, self._cfg_path = _load_hybrid_config(model_dir=model_dir, config_path=config_path)
self._use_xgb = self._cfg["use_xgb"]
self._detector = detector or FaceMeshDetector()
self._owns_detector = detector is None
self._head_pose = HeadPoseEstimator(max_angle=max_angle)
self._eye_scorer = EyeBehaviourScorer()
self._temporal = TemporalTracker()
self.head_pose = self._head_pose
self._smoother = _OutputSmoother()
self._combiner = None
combiner_path = self._cfg.get("combiner_path")
if combiner_path and self._cfg.get("combiner") == "logistic":
resolved_combiner = combiner_path if os.path.isabs(combiner_path) else os.path.join(model_dir, combiner_path)
if not os.path.isfile(resolved_combiner):
resolved_combiner = os.path.join(_PROJECT_ROOT, combiner_path)
if os.path.isfile(resolved_combiner):
blob = joblib.load(resolved_combiner)
self._combiner = blob.get("combiner")
if self._combiner is None:
self._combiner = blob
print(f"[HYBRID] LR combiner loaded from {resolved_combiner}")
else:
print(f"[HYBRID] combiner_path not found: {resolved_combiner}, using heuristic weights")
if self._use_xgb:
from xgboost import XGBClassifier
xgb_path = _resolve_xgb_path()
if not os.path.isfile(xgb_path):
raise FileNotFoundError(f"No XGBoost checkpoint at {xgb_path}")
self._xgb_model = XGBClassifier()
self._xgb_model.load_model(xgb_path)
self._xgb_indices = [FEATURE_NAMES.index(n) for n in XGBoostPipeline.SELECTED]
self._mlp = None
self._scaler = None
self._indices = None
self._feature_names = list(XGBoostPipeline.SELECTED)
mode = "LR combiner" if self._combiner else f"w_xgb={self._cfg['w_xgb']:.2f}, w_geo={self._cfg['w_geo']:.2f}"
print(f"[HYBRID] XGBoost+geo | {xgb_path} | {mode}, threshold={self._cfg['threshold']:.2f}")
else:
self._mlp, self._scaler, self._feature_names = _load_mlp_artifacts(model_dir)
self._indices = [FEATURE_NAMES.index(n) for n in self._feature_names]
self._xgb_model = None
self._xgb_indices = None
mode = "LR combiner" if self._combiner else f"w_mlp={self._cfg['w_mlp']:.2f}, w_geo={self._cfg['w_geo']:.2f}"
print(f"[HYBRID] MLP+geo | {len(self._feature_names)} features | {mode}, threshold={self._cfg['threshold']:.2f}")
@property
def config(self) -> dict:
return dict(self._cfg)
def process_frame(self, bgr_frame: np.ndarray) -> dict:
landmarks = self._detector.process(bgr_frame)
h, w = bgr_frame.shape[:2]
out = {
"landmarks": landmarks,
"is_focused": False,
"focus_score": 0.0,
"mlp_prob": 0.0,
"geo_score": 0.0,
"raw_score": 0.0,
"s_face": 0.0,
"s_eye": 0.0,
"mar": None,
"is_yawning": False,
"yaw": None,
"pitch": None,
"roll": None,
"left_bbox": None,
"right_bbox": None,
}
if landmarks is None:
smoothed = self._smoother.update(0.0, False)
out["focus_score"] = smoothed
out["raw_score"] = smoothed
out["is_focused"] = smoothed >= self._cfg["threshold"]
return out
angles = self._head_pose.estimate(landmarks, w, h)
if angles is not None:
out["yaw"], out["pitch"], out["roll"] = angles
out["s_face"] = self._head_pose.score(landmarks, w, h)
out["s_eye"] = self._eye_scorer.score(landmarks)
s_eye_geo = out["s_eye"]
geo_score = (
self._cfg["geo_face_weight"] * out["s_face"] +
self._cfg["geo_eye_weight"] * out["s_eye"]
)
geo_score = float(np.clip(geo_score, 0.0, 1.0))
out["mar"] = compute_mar(landmarks)
out["is_yawning"] = out["mar"] > self._cfg["mar_yawn_threshold"]
if self._cfg["use_yawn_veto"] and out["is_yawning"]:
geo_score = 0.0
out["geo_score"] = geo_score
pre = {
"angles": angles,
"s_face": out["s_face"],
"s_eye": s_eye_geo,
"mar": out["mar"],
}
vec = extract_features(landmarks, w, h, self._head_pose, self._eye_scorer, self._temporal, _pre=pre)
vec = _clip_features(vec)
if self._use_xgb:
X = vec[self._xgb_indices].reshape(1, -1).astype(np.float32)
prob = self._xgb_model.predict_proba(X)[0]
model_prob = float(np.clip(prob[1], 0.0, 1.0))
out["mlp_prob"] = model_prob
if self._combiner is not None:
meta = np.array([[model_prob, out["geo_score"]]], dtype=np.float32)
focus_score = float(self._combiner.predict_proba(meta)[0, 1])
else:
focus_score = self._cfg["w_xgb"] * model_prob + self._cfg["w_geo"] * out["geo_score"]
else:
X = vec[self._indices].reshape(1, -1).astype(np.float32)
X_sc = self._scaler.transform(X) if self._scaler is not None else X
with torch.no_grad():
x_t = torch.from_numpy(X_sc).float()
logits = self._mlp(x_t)
probs = torch.softmax(logits, dim=1)
mlp_prob = float(probs[0, 1])
out["mlp_prob"] = float(np.clip(mlp_prob, 0.0, 1.0))
if self._combiner is not None:
meta = np.array([[out["mlp_prob"], out["geo_score"]]], dtype=np.float32)
focus_score = float(self._combiner.predict_proba(meta)[0, 1])
else:
focus_score = self._cfg["w_mlp"] * out["mlp_prob"] + self._cfg["w_geo"] * out["geo_score"]
out["focus_score"] = self._smoother.update(float(np.clip(focus_score, 0.0, 1.0)), True)
out["raw_score"] = out["focus_score"]
out["is_focused"] = out["focus_score"] >= self._cfg["threshold"]
return out
def reset_session(self):
self._temporal = TemporalTracker()
self._smoother.reset()
def close(self):
if self._owns_detector:
self._detector.close()
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
class XGBoostPipeline:
    """Focus pipeline driven by an XGBoost classifier.

    Each frame is turned into a clipped feature vector, the ``SELECTED``
    features are classified, and the "focused" class probability is smoothed
    over time and compared against ``threshold``.
    """

    # Feature columns fed to the model, in model input order.
    SELECTED = [
        'head_deviation', 's_face', 's_eye', 'h_gaze', 'pitch',
        'ear_left', 'ear_avg', 'ear_right', 'gaze_offset', 'perclos',
    ]

    def __init__(self, model_path=None, threshold=0.38, detector=None):
        """Load the XGBoost checkpoint and set up per-frame helpers.

        Args:
            model_path: checkpoint file; defaults to the project's standard
                XGBoost checkpoint.
            threshold: smoothed probability at or above which a frame counts
                as focused.
            detector: optional shared face-mesh detector. When given it is
                used as-is and is NOT closed by this pipeline; otherwise a
                ``FaceMeshDetector`` is created and owned.

        Raises:
            FileNotFoundError: if the checkpoint file does not exist.
        """
        from xgboost import XGBClassifier
        if model_path is None:
            model_path = _resolve_xgb_path()
        if not os.path.isfile(model_path):
            raise FileNotFoundError(f"No XGBoost checkpoint at {model_path}")
        self._model = XGBClassifier()
        self._model.load_model(model_path)
        self._threshold = threshold
        # Track ownership so a detector shared with other pipelines is not
        # closed from under them.
        self._owns_detector = detector is None
        self._detector = FaceMeshDetector() if detector is None else detector
        self._head_pose = HeadPoseEstimator()
        self.head_pose = self._head_pose
        self._eye_scorer = EyeBehaviourScorer()
        self._temporal = TemporalTracker()
        self._smoother = _OutputSmoother()
        self._indices = [FEATURE_NAMES.index(n) for n in self.SELECTED]
        print(f"[XGB] Loaded {model_path} | {len(self.SELECTED)} features, threshold={threshold}")

    def process_frame(self, bgr_frame):
        """Analyse one frame and return a result dict.

        ``raw_score`` holds the smoothed "focused" probability and drives
        ``is_focused``. When no face is found the remaining fields keep
        their neutral defaults.
        """
        landmarks = self._detector.process(bgr_frame)
        h, w = bgr_frame.shape[:2]
        out = {
            "landmarks": landmarks,
            "is_focused": False,
            "s_face": 0.0,
            "s_eye": 0.0,
            "raw_score": 0.0,
            "mar": None,
            "yaw": None,
            "pitch": None,
            "roll": None,
        }
        if landmarks is None:
            # No face: let the smoother hold/decay the previous score.
            smoothed = self._smoother.update(0.0, False)
            out["raw_score"] = smoothed
            out["is_focused"] = smoothed >= self._threshold
            return out

        vec = extract_features(landmarks, w, h, self._head_pose, self._eye_scorer, self._temporal)
        vec = _clip_features(vec)
        for key in ("yaw", "pitch", "roll", "s_face", "s_eye", "mar"):
            out[key] = float(vec[_FEAT_IDX[key]])

        X = vec[self._indices].reshape(1, -1).astype(np.float32)
        prob = self._model.predict_proba(X)[0]  # [prob_unfocused, prob_focused]
        out["raw_score"] = self._smoother.update(float(prob[1]), True)
        out["is_focused"] = out["raw_score"] >= self._threshold
        return out

    def reset_session(self):
        """Drop temporal feature state and smoothing history."""
        self._temporal = TemporalTracker()
        self._smoother.reset()

    def close(self):
        """Close the detector, but only if this pipeline created it."""
        if self._owns_detector:
            self._detector.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()