""" ml_engine.py ──────────── Wraps the psychometric tabular model and the RoBERTa text model into a single predict() function that returns a fused 5-class stress result. On first import the models are loaded once and cached globally. If a model file is not found the module operates in DEMO mode, returning plausible random predictions so the web app can run without GPU weights. """ from __future__ import annotations import os import re import logging import warnings import numpy as np import pandas as pd warnings.filterwarnings("ignore") logger = logging.getLogger(__name__) # ── Label architecture ─────────────────────────────────────────────────────── PSYCHO_SCORE = {"High": 1.00, "Medium": 0.50, "Low": 0.00} TEXT_SCORE = { "Normal": 0.00, "Stress": 0.45, "Personality disorder": 0.60, "Bipolar": 0.65, "Anxiety": 0.70, "Depression": 0.80, "Suicidal": 1.00, } FUSED_BINS = [0.0, 0.2, 0.4, 0.6, 0.8, 1.001] FUSED_LABELS = ["Minimal", "Mild", "Moderate", "Severe", "Critical"] TEXT_CLASSES = sorted(TEXT_SCORE.keys()) PSYCHO_CLASSES_DEFAULT = ["High", "Low", "Medium"] # sorted order as LabelEncoder would produce def _score_to_fused(score: float) -> str: for lo, hi, lbl in zip(FUSED_BINS[:-1], FUSED_BINS[1:], FUSED_LABELS): if lo <= score < hi: return lbl return FUSED_LABELS[-1] # ── Model artefacts (lazy-loaded) ──────────────────────────────────────────── _psycho_model = None _base_scaler = None _final_scaler = None _le_dict = None _le_target = None _selected_cols = None _poly = None _top_num = None _loaded_model_name = "" _roberta_model = None _tokenizer = None DEMO_MODE = False # flips to True if weights are missing def _load_psycho(model_dir: str) -> bool: global _psycho_model, _base_scaler, _final_scaler, _le_dict, _le_target global _selected_cols, _poly, _top_num, _loaded_model_name try: import joblib _base_scaler = joblib.load(os.path.join(model_dir, "base_scaler.pkl")) _final_scaler = joblib.load(os.path.join(model_dir, "final_scaler.pkl")) _le_dict = joblib.load(os.path.join(model_dir, "le_dict.pkl")) _le_target = joblib.load(os.path.join(model_dir, "le_target.pkl")) _selected_cols = joblib.load(os.path.join(model_dir, "selected_cols.pkl")) _poly = joblib.load(os.path.join(model_dir, "poly.pkl")) _top_num = joblib.load(os.path.join(model_dir, "top_num.pkl")) for candidate in [ "stacking_ensemble_best_model.pkl", "lightgbm_best_model.pkl", "catboost_best_model.pkl", "xgboost_best_model.pkl", "random_forest_best_model.pkl", "mlp_sklearn_best_model.pkl", ]: p = os.path.join(model_dir, candidate) if os.path.exists(p): _psycho_model = joblib.load(p) _loaded_model_name = candidate logger.info("Loaded psychometric model: %s", candidate) return True logger.warning("Psychometric model pkl not found in %s", model_dir) except Exception as exc: logger.warning("Failed to load psychometric model: %s", exc) return False def _load_roberta(ckpt_path: str) -> bool: global _roberta_model, _tokenizer try: import torch import torch.nn as nn from transformers import AutoTokenizer, AutoModel MODEL_NAME = "roberta-base" class RobertaClassifier(nn.Module): def __init__(self): super().__init__() self.roberta = AutoModel.from_pretrained(MODEL_NAME) self.dropout = nn.Dropout(0.3) self.fc = nn.Linear(self.roberta.config.hidden_size, 7) for p in self.roberta.parameters(): p.requires_grad = False for layer in self.roberta.encoder.layer[-3:]: for p in layer.parameters(): p.requires_grad = True def forward(self, input_ids, attention_mask): out = self.roberta(input_ids=input_ids, attention_mask=attention_mask) cls_out = out.last_hidden_state[:, 0] return self.fc(self.dropout(cls_out)) device = "cuda" if __import__("torch").cuda.is_available() else "cpu" model = RobertaClassifier().to(device) model.load_state_dict( __import__("torch").load(ckpt_path, map_location=device) ) model.eval() _roberta_model = model _tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME) logger.info("Loaded RoBERTa model from %s", ckpt_path) return True except Exception as exc: logger.warning("Failed to load RoBERTa model: %s", exc) return False def init_models(psycho_model_dir: str, roberta_ckpt: str) -> None: global DEMO_MODE # Resolve relative paths from the project root (where app.py lives) _root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if psycho_model_dir and not os.path.isabs(psycho_model_dir): psycho_model_dir = os.path.join(_root, psycho_model_dir) if roberta_ckpt and not os.path.isabs(roberta_ckpt): roberta_ckpt = os.path.join(_root, roberta_ckpt) ok_p = _load_psycho(psycho_model_dir) ok_r = _load_roberta(roberta_ckpt) if not ok_p and not ok_r: DEMO_MODE = True logger.warning("Both models unavailable — running in DEMO mode.") elif not ok_p: logger.warning("Psychometric model unavailable — text-only mode.") elif not ok_r: logger.warning("RoBERTa model unavailable — psychometric-only mode.") # ── Feature engineering (mirrors notebook) ────────────────────────────────── def _add_interactions(X: pd.DataFrame) -> pd.DataFrame: X = X.copy() if {"Sleep_Duration", "Sleep_Quality"}.issubset(X.columns): X["sleep_score"] = X["Sleep_Duration"] * X["Sleep_Quality"] if {"Sleep_Quality", "Screen_Time"}.issubset(X.columns): X["screen_sleep_ratio"] = X["Screen_Time"] / (X["Sleep_Quality"] + 1e-6) if {"Physical_Activity", "Work_Hours"}.issubset(X.columns): X["activity_work_ratio"] = X["Physical_Activity"] / (X["Work_Hours"] + 1e-6) if {"Social_Interactions", "Travel_Time"}.issubset(X.columns): X["social_travel"] = X["Social_Interactions"] * X["Travel_Time"] if {"Work_Hours", "Travel_Time", "Screen_Time"}.issubset(X.columns): X["daily_burden"] = X["Work_Hours"] + X["Travel_Time"] + X["Screen_Time"] if {"Blood_Pressure", "Cholesterol_Level", "Blood_Sugar_Level"}.issubset(X.columns): X["cardio_risk"] = (X["Blood_Pressure"] + X["Cholesterol_Level"] + X["Blood_Sugar_Level"]) if {"Caffeine_Intake", "Alcohol_Intake"}.issubset(X.columns): X["stimulant_load"] = X["Caffeine_Intake"] + 2 * X["Alcohol_Intake"] if {"Physical_Activity", "Sleep_Duration"}.issubset(X.columns): X["recovery_index"] = X["Physical_Activity"] + X["Sleep_Duration"] return X def _preprocess_psycho(raw_df: pd.DataFrame) -> np.ndarray: """Returns proba array (N, 3) in le_target.classes_ order.""" df = raw_df.copy() if "Stress_Detection" in df.columns: df = df.drop(columns=["Stress_Detection"]) for col, le in _le_dict.items(): if col in df.columns: df[col] = le.transform(df[col]) if hasattr(_base_scaler, "feature_names_in_"): for c in _base_scaler.feature_names_in_: if c not in df.columns: df[c] = 0.0 df = df[_base_scaler.feature_names_in_] df_inter = _add_interactions(df) poly_arr = _poly.transform(df_inter[_top_num]) poly_cols = [f"poly_{i}" for i in range(poly_arr.shape[1])] df_poly = pd.concat( [df_inter.reset_index(drop=True), pd.DataFrame(poly_arr, columns=poly_cols)], axis=1 ) df_sel = df_poly[_selected_cols].copy() needs_scale = {"mlp_sklearn_best_model.pkl", "svm-rbf_best_model.pkl", "logreg_best_model.pkl"} if _loaded_model_name in needs_scale: df_sel = pd.DataFrame( _final_scaler.transform(df_sel), columns=_selected_cols ) return _psycho_model.predict_proba(df_sel) def _get_text_proba(text: str) -> np.ndarray: """Returns proba array (7,) in TEXT_CLASSES order.""" import torch import torch.nn.functional as F device = next(_roberta_model.parameters()).device enc = _tokenizer( re.sub(r"[^a-zA-Z\s]", "", text.lower()), padding="max_length", truncation=True, max_length=128, return_tensors="pt", ) ids = enc["input_ids"].to(device) mask = enc["attention_mask"].to(device) with torch.no_grad(): logits = _roberta_model(ids, mask) return F.softmax(logits, dim=-1).cpu().numpy()[0] # ── Public inference entry point ───────────────────────────────────────────── def predict( psychometric_row: dict | None = None, text_note: str | None = None, psycho_weight: float = 0.5, ) -> dict: """ Run stress prediction. Parameters ---------- psychometric_row : dict mapping feature-name → value (or None) text_note : raw text string (or None) psycho_weight : weight given to psychometric score [0..1] Returns ------- dict with keys: psycho_label, psycho_score, text_label, text_score, fused_label, fused_score, modality_used """ if DEMO_MODE: return _demo_predict(psychometric_row, text_note) psycho_score = None psycho_label = None text_score = None text_label = None # ── Psychometric branch ───────────────────────────────────────────────── if psychometric_row and _psycho_model is not None: try: df_row = pd.DataFrame([psychometric_row]) proba = _preprocess_psycho(df_row)[0] # shape (3,) classes = list(_le_target.classes_) psycho_score = sum(PSYCHO_SCORE[c] * p for c, p in zip(classes, proba)) psycho_label = classes[int(np.argmax(proba))] except Exception as exc: logger.error("Psychometric inference error: %s", exc) # ── Text branch ───────────────────────────────────────────────────────── if text_note and text_note.strip() and _roberta_model is not None: try: proba = _get_text_proba(text_note) # shape (7,) text_score = sum(TEXT_SCORE[c] * p for c, p in zip(TEXT_CLASSES, proba)) text_label = TEXT_CLASSES[int(np.argmax(proba))] except Exception as exc: logger.error("Text inference error: %s", exc) # ── Fusion ─────────────────────────────────────────────────────────────── tw = 1.0 - psycho_weight if psycho_score is not None and text_score is not None: fused_score = psycho_weight * psycho_score + tw * text_score modality_used = "both" elif psycho_score is not None: fused_score = float(psycho_score) modality_used = "psycho" elif text_score is not None: fused_score = float(text_score) modality_used = "text" else: fused_score = 0.0 modality_used = "none" fused_label = _score_to_fused(fused_score) return { "psycho_label": psycho_label, "psycho_score": round(float(psycho_score), 4) if psycho_score is not None else None, "text_label": text_label, "text_score": round(float(text_score), 4) if text_score is not None else None, "fused_label": fused_label, "fused_score": round(float(fused_score), 4), "modality_used": modality_used, } # ── Demo / fallback (no weights needed) ───────────────────────────────────── def _demo_predict(psychometric_row, text_note) -> dict: """Rule-based heuristic demo prediction — no ML weights required.""" score = 0.3 # baseline: mild if psychometric_row: sh = float(psychometric_row.get("Sleep_Duration", 7)) pa = float(psychometric_row.get("Physical_Activity", 3)) wh = float(psychometric_row.get("Work_Hours", 8)) caf = float(psychometric_row.get("Caffeine_Intake", 2)) alc = float(psychometric_row.get("Alcohol_Intake", 0)) slq = float(psychometric_row.get("Sleep_Quality", 3)) score += max(0, (8 - sh) * 0.04) # less sleep → more stress score += max(0, (wh - 8) * 0.03) # overwork score -= pa * 0.02 # activity lowers stress score += caf * 0.01 score += alc * 0.02 score -= slq * 0.015 score = float(np.clip(score, 0, 1)) psycho_label = ("High" if score >= 0.6 else "Medium" if score >= 0.3 else "Low") text_score = None text_label = None KEYWORDS = { "suicid": 0.95, "depress": 0.78, "anxiet": 0.68, "panic": 0.65, "overwhelm": 0.55, "stress": 0.45, "tired": 0.40, "exhaust": 0.50, "happy": 0.05, "fine": 0.1, } if text_note and text_note.strip(): t = text_note.lower() ts = 0.3 for kw, s in KEYWORDS.items(): if kw in t: ts = max(ts, s) text_score = float(np.clip(ts, 0, 1)) text_label = TEXT_CLASSES[min( int(text_score / (1.0 / 7)), len(TEXT_CLASSES) - 1 )] if text_score is not None: fused = 0.5 * score + 0.5 * text_score modality = "both" else: fused = score modality = "psycho" if psychometric_row else "none" return { "psycho_label": psycho_label, "psycho_score": round(score, 4), "text_label": text_label, "text_score": round(text_score, 4) if text_score is not None else None, "fused_label": _score_to_fused(fused), "fused_score": round(fused, 4), "modality_used": modality, }