Spaces:
Running
Running
| """ | |
| ml_engine.py | |
| ──────────── | |
| Wraps the psychometric tabular model and the RoBERTa text model into a | |
| single predict() function that returns a fused 5-class stress result. | |
| On first import the models are loaded once and cached globally. | |
| If a model file is not found the module operates in DEMO mode, returning | |
| plausible random predictions so the web app can run without GPU weights. | |
| """ | |
| from __future__ import annotations | |
| import os | |
| import re | |
| import logging | |
| import warnings | |
| import numpy as np | |
| import pandas as pd | |
# NOTE: this silences every warning process-wide as soon as the module is
# imported (kept as-is because callers may rely on the quiet output).
warnings.filterwarnings("ignore")
logger = logging.getLogger(__name__)

# ── Label architecture ───────────────────────────────────────────────────────
# Severity weight of each psychometric class.
PSYCHO_SCORE = {"High": 1.00, "Medium": 0.50, "Low": 0.00}

# Severity weight of each text class.
TEXT_SCORE = {
    "Normal": 0.00,
    "Stress": 0.45,
    "Personality disorder": 0.60,
    "Bipolar": 0.65,
    "Anxiety": 0.70,
    "Depression": 0.80,
    "Suicidal": 1.00,
}

# Edges of the fused-severity bins: bin i covers [FUSED_BINS[i], FUSED_BINS[i+1]).
# The last edge sits just above 1.0 so that a score of exactly 1.0 is still
# inside the final bin.
FUSED_BINS = [0.0, 0.2, 0.4, 0.6, 0.8, 1.001]
FUSED_LABELS = ["Minimal", "Mild", "Moderate", "Severe", "Critical"]

# Alphabetical class order; predict() pairs it position-by-position with the
# text model's output probabilities.
# NOTE(review): presumably the order used when the text model was trained —
# confirm against the training notebook.
TEXT_CLASSES = sorted(TEXT_SCORE)
PSYCHO_CLASSES_DEFAULT = ["High", "Low", "Medium"]  # sorted order as LabelEncoder would produce


def _score_to_fused(score: float) -> str:
    """Map a fused severity score onto one of the FUSED_LABELS.

    Scores are expected in [0, 1] but out-of-range values are clamped to the
    nearest bin: anything below the first edge is "Minimal", anything at or
    above the last edge is "Critical".  (Previously a negative score fell
    through every bin and was reported as "Critical".)

    A NaN score compares false against every edge and therefore still yields
    the last label, as before.
    """
    if score < FUSED_BINS[0]:
        return FUSED_LABELS[0]
    for upper, label in zip(FUSED_BINS[1:], FUSED_LABELS):
        if score < upper:
            return label
    return FUSED_LABELS[-1]
| # ── Model artefacts (lazy-loaded) ──────────────────────────────────────────── | |
# Psychometric pipeline artefacts; assigned by _load_psycho().
_psycho_model = _base_scaler = _final_scaler = None
_le_dict = _le_target = None
_selected_cols = _poly = _top_num = None
_loaded_model_name = ""  # file name of the model pickle that was loaded

# Text-model artefacts; assigned by _load_roberta().
_roberta_model = _tokenizer = None

# Set to True by init_models() when neither model could be loaded.
DEMO_MODE = False
def _load_psycho(model_dir: str) -> bool:
    """Load the psychometric model and its preprocessing artefacts.

    Reads the seven preprocessing pickles from ``model_dir`` and then the
    first model pickle that exists, trying the candidates in the order listed
    below.  Everything is loaded into locals first and the module globals are
    only replaced once the whole set is available, so a failure never leaves
    a half-updated pipeline behind.

    Parameters
    ----------
    model_dir : directory containing the ``*.pkl`` artefacts.

    Returns
    -------
    True when every artefact and a model were loaded, False otherwise (the
    reason is logged as a warning; no exception propagates).
    """
    global _psycho_model, _base_scaler, _final_scaler, _le_dict, _le_target
    global _selected_cols, _poly, _top_num, _loaded_model_name

    candidates = (
        "stacking_ensemble_best_model.pkl",
        "lightgbm_best_model.pkl",
        "catboost_best_model.pkl",
        "xgboost_best_model.pkl",
        "random_forest_best_model.pkl",
        "mlp_sklearn_best_model.pkl",
    )
    try:
        import joblib

        # SECURITY: joblib.load unpickles its input and can execute arbitrary
        # code — only point model_dir at trusted files.
        def _artefact(filename):
            return joblib.load(os.path.join(model_dir, filename))

        base_scaler = _artefact("base_scaler.pkl")
        final_scaler = _artefact("final_scaler.pkl")
        le_dict = _artefact("le_dict.pkl")
        le_target = _artefact("le_target.pkl")
        selected_cols = _artefact("selected_cols.pkl")
        poly = _artefact("poly.pkl")
        top_num = _artefact("top_num.pkl")

        for candidate in candidates:
            path = os.path.join(model_dir, candidate)
            if not os.path.isfile(path):
                continue
            model = joblib.load(path)

            # Commit the complete, consistent set in one go.
            _base_scaler, _final_scaler = base_scaler, final_scaler
            _le_dict, _le_target = le_dict, le_target
            _selected_cols, _poly, _top_num = selected_cols, poly, top_num
            _psycho_model = model
            _loaded_model_name = candidate
            logger.info("Loaded psychometric model: %s", candidate)
            return True

        logger.warning("Psychometric model pkl not found in %s", model_dir)
    except Exception as exc:
        # Best-effort loader: any failure (missing file, missing joblib,
        # incompatible pickle) degrades to "model unavailable".
        logger.warning("Failed to load psychometric model: %s", exc)
    return False
def _load_roberta(ckpt_path: str) -> bool:
    """Load the fine-tuned RoBERTa classifier and its tokenizer.

    The checkpoint is expected to be a ``state_dict`` for the
    ``RobertaClassifier`` defined below (roberta-base encoder, dropout, one
    linear head with one output per entry of TEXT_CLASSES).

    The module globals are assigned only after BOTH the model and the
    tokenizer loaded, so a tokenizer failure can no longer leave a model
    without a tokenizer behind.  A missing checkpoint is detected up front,
    before the roberta-base backbone is instantiated.

    Parameters
    ----------
    ckpt_path : path to the checkpoint file.

    Returns
    -------
    True on success, False otherwise (the reason is logged as a warning; no
    exception propagates).
    """
    global _roberta_model, _tokenizer

    if not ckpt_path or not os.path.isfile(ckpt_path):
        logger.warning("Failed to load RoBERTa model: checkpoint not found at %s",
                       ckpt_path)
        return False

    try:
        import torch
        import torch.nn as nn
        from transformers import AutoModel, AutoTokenizer

        model_name = "roberta-base"
        num_classes = len(TEXT_CLASSES)

        class RobertaClassifier(nn.Module):
            # Attribute names (roberta / dropout / fc) are the state_dict
            # keys and must not be renamed.
            def __init__(self):
                super().__init__()
                self.roberta = AutoModel.from_pretrained(model_name)
                self.dropout = nn.Dropout(0.3)
                self.fc = nn.Linear(self.roberta.config.hidden_size, num_classes)
                # Freeze the encoder except for its last three layers.
                for p in self.roberta.parameters():
                    p.requires_grad = False
                for layer in self.roberta.encoder.layer[-3:]:
                    for p in layer.parameters():
                        p.requires_grad = True

            def forward(self, input_ids, attention_mask):
                out = self.roberta(input_ids=input_ids,
                                   attention_mask=attention_mask)
                cls_out = out.last_hidden_state[:, 0]  # first-token embedding
                return self.fc(self.dropout(cls_out))

        device = "cuda" if torch.cuda.is_available() else "cpu"
        model = RobertaClassifier().to(device)
        # SECURITY: torch.load may unpickle arbitrary objects depending on the
        # installed torch version — only load trusted checkpoints.
        model.load_state_dict(torch.load(ckpt_path, map_location=device))
        model.eval()
        tokenizer = AutoTokenizer.from_pretrained(model_name)

        # Publish both artefacts together.
        _roberta_model = model
        _tokenizer = tokenizer
        logger.info("Loaded RoBERTa model from %s", ckpt_path)
        return True
    except Exception as exc:
        # Best-effort loader: missing torch/transformers, no network, or an
        # incompatible checkpoint all degrade to "model unavailable".
        logger.warning("Failed to load RoBERTa model: %s", exc)
        return False
def init_models(psycho_model_dir: str, roberta_ckpt: str) -> None:
    """Load both models and decide which inference mode the module runs in.

    Relative paths are resolved against the directory two levels above this
    file (the project root, where app.py lives).  DEMO_MODE is recomputed on
    every call: it is True only when neither model could be loaded, so a
    later successful call switches the module back out of demo mode.

    Parameters
    ----------
    psycho_model_dir : directory holding the psychometric ``*.pkl`` artefacts.
    roberta_ckpt     : path to the RoBERTa checkpoint file.
    """
    global DEMO_MODE

    # Resolve relative paths from the project root (where app.py lives)
    project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    if psycho_model_dir and not os.path.isabs(psycho_model_dir):
        psycho_model_dir = os.path.join(project_root, psycho_model_dir)
    if roberta_ckpt and not os.path.isabs(roberta_ckpt):
        roberta_ckpt = os.path.join(project_root, roberta_ckpt)

    ok_p = _load_psycho(psycho_model_dir)
    ok_r = _load_roberta(roberta_ckpt)

    DEMO_MODE = not (ok_p or ok_r)
    if DEMO_MODE:
        logger.warning("Both models unavailable — running in DEMO mode.")
    elif not ok_p:
        logger.warning("Psychometric model unavailable — text-only mode.")
    elif not ok_r:
        logger.warning("RoBERTa model unavailable — psychometric-only mode.")
| # ── Feature engineering (mirrors notebook) ─────────────────────────────────── | |
def _add_interactions(X: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``X`` extended with hand-crafted interaction features.

    Each derived column is added only when all of the raw columns it needs
    are present; the input frame itself is never modified.
    """
    # (required raw columns, derived column name, formula), in insertion order.
    recipes = (
        (("Sleep_Duration", "Sleep_Quality"), "sleep_score",
         lambda d: d["Sleep_Duration"] * d["Sleep_Quality"]),
        # The 1e-6 terms keep the two ratios finite when the divisor is 0.
        (("Sleep_Quality", "Screen_Time"), "screen_sleep_ratio",
         lambda d: d["Screen_Time"] / (d["Sleep_Quality"] + 1e-6)),
        (("Physical_Activity", "Work_Hours"), "activity_work_ratio",
         lambda d: d["Physical_Activity"] / (d["Work_Hours"] + 1e-6)),
        (("Social_Interactions", "Travel_Time"), "social_travel",
         lambda d: d["Social_Interactions"] * d["Travel_Time"]),
        (("Work_Hours", "Travel_Time", "Screen_Time"), "daily_burden",
         lambda d: d["Work_Hours"] + d["Travel_Time"] + d["Screen_Time"]),
        (("Blood_Pressure", "Cholesterol_Level", "Blood_Sugar_Level"), "cardio_risk",
         lambda d: (d["Blood_Pressure"] + d["Cholesterol_Level"]
                    + d["Blood_Sugar_Level"])),
        (("Caffeine_Intake", "Alcohol_Intake"), "stimulant_load",
         lambda d: d["Caffeine_Intake"] + 2 * d["Alcohol_Intake"]),
        (("Physical_Activity", "Sleep_Duration"), "recovery_index",
         lambda d: d["Physical_Activity"] + d["Sleep_Duration"]),
    )

    enriched = X.copy()
    for required, derived, formula in recipes:
        if set(required).issubset(enriched.columns):
            enriched[derived] = formula(enriched)
    return enriched
def _preprocess_psycho(raw_df: pd.DataFrame) -> np.ndarray:
    """Turn raw psychometric rows into class probabilities.

    Applies the loaded label encoders, aligns the columns to the names stored
    on the base scaler (missing ones are filled with 0.0), adds interaction
    and polynomial features, keeps the selected columns and returns
    ``_psycho_model.predict_proba`` for them — one row of probabilities per
    input row.  predict() pairs the columns with ``_le_target.classes_``.
    """
    features = raw_df.copy()

    # The target column must not reach the model.
    if "Stress_Detection" in features.columns:
        features = features.drop(columns=["Stress_Detection"])

    # Encode categorical columns with the fitted encoders.
    for name, encoder in _le_dict.items():
        if name not in features.columns:
            continue
        features[name] = encoder.transform(features[name])

    # Align to the column set/order recorded on the base scaler.
    if hasattr(_base_scaler, "feature_names_in_"):
        for name in _base_scaler.feature_names_in_:
            if name not in features.columns:
                features[name] = 0.0
        features = features[_base_scaler.feature_names_in_]

    engineered = _add_interactions(features)

    poly_values = _poly.transform(engineered[_top_num])
    poly_frame = pd.DataFrame(
        poly_values,
        columns=[f"poly_{i}" for i in range(poly_values.shape[1])],
    )
    combined = pd.concat(
        [engineered.reset_index(drop=True), poly_frame], axis=1
    )
    selected = combined[_selected_cols].copy()

    # Only these model files get the final scaling step applied.
    scaled_models = {"mlp_sklearn_best_model.pkl", "svm-rbf_best_model.pkl",
                     "logreg_best_model.pkl"}
    if _loaded_model_name in scaled_models:
        selected = pd.DataFrame(
            _final_scaler.transform(selected), columns=_selected_cols
        )

    return _psycho_model.predict_proba(selected)
def _get_text_proba(text: str) -> np.ndarray:
    """Return the text model's class probabilities for a single note.

    The note is lower-cased and stripped of everything except ASCII letters
    and whitespace, tokenized to a fixed length of 128 and run through the
    loaded model; the softmax of the first (only) row of logits is returned.
    predict() pairs the entries with TEXT_CLASSES.
    """
    import torch
    import torch.nn.functional as F

    cleaned = re.sub(r"[^a-zA-Z\s]", "", text.lower())
    encoded = _tokenizer(
        cleaned,
        padding="max_length",
        truncation=True,
        max_length=128,
        return_tensors="pt",
    )

    # Run on whatever device the model's weights live on.
    target = next(_roberta_model.parameters()).device
    input_ids = encoded["input_ids"].to(target)
    attention_mask = encoded["attention_mask"].to(target)

    with torch.no_grad():
        logits = _roberta_model(input_ids, attention_mask)
    probabilities = F.softmax(logits, dim=-1)
    return probabilities.cpu().numpy()[0]
| # ── Public inference entry point ───────────────────────────────────────────── | |
def predict(
    psychometric_row: dict | None = None,
    text_note: str | None = None,
    psycho_weight: float = 0.5,
) -> dict:
    """
    Run stress prediction.

    Parameters
    ----------
    psychometric_row : dict mapping feature-name → value (or None)
    text_note        : raw text string (or None)
    psycho_weight    : weight given to psychometric score [0..1].  Values
                       outside the range are clamped and NaN falls back to
                       0.5.  Only used when both modalities produce a score;
                       ignored in DEMO mode.

    Returns
    -------
    dict with keys:
        psycho_label, psycho_score, text_label, text_score,
        fused_label, fused_score, modality_used

    The psycho_* / text_* entries are None when that modality was not
    supplied, its model is not loaded, or its inference failed (the error is
    logged, never raised).  modality_used is one of "both", "psycho", "text"
    or "none"; with "none" the fused score is 0.0.
    """
    if DEMO_MODE:
        return _demo_predict(psychometric_row, text_note)

    # Keep the fusion weights inside [0, 1] so the fused score stays in range.
    weight = float(psycho_weight)
    if not np.isfinite(weight):
        weight = 0.5
    weight = min(max(weight, 0.0), 1.0)

    psycho_score = None
    psycho_label = None
    text_score = None
    text_label = None

    # ── Psychometric branch ──────────────────────────────────────────────────
    if psychometric_row and _psycho_model is not None:
        try:
            df_row = pd.DataFrame([psychometric_row])
            proba = _preprocess_psycho(df_row)[0]
            classes = list(_le_target.classes_)
            # Expected severity = probability-weighted class severity.
            score = sum(PSYCHO_SCORE[c] * p for c, p in zip(classes, proba))
            label = classes[int(np.argmax(proba))]
            psycho_score, psycho_label = score, label
        except Exception as exc:
            # Best-effort: a failing modality is dropped, not fatal.
            logger.exception("Psychometric inference error: %s", exc)

    # ── Text branch ──────────────────────────────────────────────────────────
    has_text = isinstance(text_note, str) and bool(text_note.strip())
    if has_text and _roberta_model is not None:
        try:
            proba = _get_text_proba(text_note)
            score = sum(TEXT_SCORE[c] * p for c, p in zip(TEXT_CLASSES, proba))
            label = TEXT_CLASSES[int(np.argmax(proba))]
            text_score, text_label = score, label
        except Exception as exc:
            logger.exception("Text inference error: %s", exc)

    # ── Fusion ───────────────────────────────────────────────────────────────
    if psycho_score is not None and text_score is not None:
        fused_score = weight * psycho_score + (1.0 - weight) * text_score
        modality_used = "both"
    elif psycho_score is not None:
        fused_score = float(psycho_score)
        modality_used = "psycho"
    elif text_score is not None:
        fused_score = float(text_score)
        modality_used = "text"
    else:
        fused_score = 0.0
        modality_used = "none"

    return {
        "psycho_label": psycho_label,
        "psycho_score": round(float(psycho_score), 4) if psycho_score is not None else None,
        "text_label": text_label,
        "text_score": round(float(text_score), 4) if text_score is not None else None,
        "fused_label": _score_to_fused(fused_score),
        "fused_score": round(float(fused_score), 4),
        "modality_used": modality_used,
    }
| # ── Demo / fallback (no weights needed) ────────────────────────────────────── | |
def _demo_predict(psychometric_row, text_note) -> dict:
    """Rule-based heuristic demo prediction — no ML weights required.

    Returns the same dict shape as predict().  As in predict(), a modality
    that was not supplied is reported as None and does not take part in the
    fusion; when both are present they are averaged with equal weight.

    Parameters
    ----------
    psychometric_row : dict of feature-name → value (or None / empty).
                       Missing, non-numeric or non-finite values fall back to
                       the per-feature defaults below.
    text_note        : free-text note (or None / blank).
    """
    # Substring → severity.  The values sit close to the TEXT_SCORE entries
    # so the nearest-class lookup below lands on a sensible label.
    keyword_scores = {
        "suicid": 0.95, "depress": 0.78, "anxiet": 0.68,
        "panic": 0.65, "overwhelm": 0.55, "stress": 0.45,
        "tired": 0.40, "exhaust": 0.50, "happy": 0.05, "fine": 0.1,
    }

    def _num(key, default):
        """Read one numeric feature, tolerating junk input."""
        try:
            value = float(psychometric_row.get(key, default))
        except (TypeError, ValueError):
            return float(default)
        return value if np.isfinite(value) else float(default)

    psycho_score = None
    psycho_label = None
    if psychometric_row:
        sleep_hours = _num("Sleep_Duration", 7)
        activity = _num("Physical_Activity", 3)
        work_hours = _num("Work_Hours", 8)
        caffeine = _num("Caffeine_Intake", 2)
        alcohol = _num("Alcohol_Intake", 0)
        sleep_quality = _num("Sleep_Quality", 3)

        score = 0.3                                   # baseline: mild
        score += max(0, (8 - sleep_hours) * 0.04)     # less sleep → more stress
        score += max(0, (work_hours - 8) * 0.03)      # overwork
        score -= activity * 0.02                      # activity lowers stress
        score += caffeine * 0.01
        score += alcohol * 0.02
        score -= sleep_quality * 0.015
        psycho_score = float(np.clip(score, 0, 1))
        psycho_label = ("High" if psycho_score >= 0.6
                        else "Medium" if psycho_score >= 0.3 else "Low")

    text_score = None
    text_label = None
    if isinstance(text_note, str) and text_note.strip():
        lowered = text_note.lower()
        hits = [s for kw, s in keyword_scores.items() if kw in lowered]
        # Most severe matching keyword wins; 0.3 only when nothing matched,
        # so low-severity keywords ("happy", "fine") can actually take effect.
        text_score = float(np.clip(max(hits) if hits else 0.3, 0, 1))
        # Label = text class whose severity is closest to the score
        # (TEXT_CLASSES is alphabetical, so it cannot be indexed by severity).
        text_label = min(TEXT_SCORE,
                         key=lambda c: abs(TEXT_SCORE[c] - text_score))

    if psycho_score is not None and text_score is not None:
        fused = 0.5 * psycho_score + 0.5 * text_score
        modality = "both"
    elif psycho_score is not None:
        fused = psycho_score
        modality = "psycho"
    elif text_score is not None:
        fused = text_score
        modality = "text"
    else:
        fused = 0.0
        modality = "none"

    return {
        "psycho_label": psycho_label,
        "psycho_score": round(psycho_score, 4) if psycho_score is not None else None,
        "text_label": text_label,
        "text_score": round(text_score, 4) if text_score is not None else None,
        "fused_label": _score_to_fused(fused),
        "fused_score": round(fused, 4),
        "modality_used": modality,
    }