# BREATHE / backend / ml / ml_engine.py
# (Hosting-page header preserved as a comment so the file parses:
#  author tannuiscoding, commit 8414ee3,
#  "Fix model path resolution to use correct project root")
"""
ml_engine.py
────────────
Wraps the psychometric tabular model and the RoBERTa text model into a
single predict() function that returns a fused 5-class stress result.
On first import the models are loaded once and cached globally.
If a model file is not found the module operates in DEMO mode, returning
plausible random predictions so the web app can run without GPU weights.
"""
from __future__ import annotations
import os
import re
import logging
import warnings
import numpy as np
import pandas as pd
warnings.filterwarnings("ignore")
logger = logging.getLogger(__name__)
# ── Label architecture ───────────────────────────────────────────────────────
PSYCHO_SCORE = {"High": 1.00, "Medium": 0.50, "Low": 0.00}
TEXT_SCORE = {
"Normal": 0.00,
"Stress": 0.45,
"Personality disorder": 0.60,
"Bipolar": 0.65,
"Anxiety": 0.70,
"Depression": 0.80,
"Suicidal": 1.00,
}
FUSED_BINS = [0.0, 0.2, 0.4, 0.6, 0.8, 1.001]
FUSED_LABELS = ["Minimal", "Mild", "Moderate", "Severe", "Critical"]
TEXT_CLASSES = sorted(TEXT_SCORE.keys())
PSYCHO_CLASSES_DEFAULT = ["High", "Low", "Medium"] # sorted order as LabelEncoder would produce
def _score_to_fused(score: float) -> str:
for lo, hi, lbl in zip(FUSED_BINS[:-1], FUSED_BINS[1:], FUSED_LABELS):
if lo <= score < hi:
return lbl
return FUSED_LABELS[-1]
# ── Model artefacts (lazy-loaded) ────────────────────────────────────────────
# Module-level caches filled in by _load_psycho() / _load_roberta(); they
# keep these sentinel values until init_models() succeeds.
_psycho_model = None  # best available pickled tabular classifier
_base_scaler = None  # fitted scaler; used here only for feature_names_in_ column alignment
_final_scaler = None  # scaler applied only for the model families trained on scaled input
_le_dict = None  # per-column LabelEncoders for categorical features
_le_target = None  # LabelEncoder for the target (expected classes: High/Low/Medium)
_selected_cols = None  # feature subset selected during training
_poly = None  # fitted PolynomialFeatures transformer
_top_num = None  # numeric columns fed into _poly
_loaded_model_name = ""  # filename of the model pickle actually loaded
_roberta_model = None  # fine-tuned RoBERTa classifier (torch nn.Module)
_tokenizer = None  # matching Hugging Face tokenizer
DEMO_MODE = False # flips to True if weights are missing
def _load_psycho(model_dir: str) -> bool:
    """Load the tabular preprocessing artefacts and the best pickled model.

    Fills the module-level caches; returns True only when every artefact
    and one of the candidate model pickles loads cleanly. Any failure is
    logged as a warning and False is returned.
    """
    global _psycho_model, _base_scaler, _final_scaler, _le_dict, _le_target
    global _selected_cols, _poly, _top_num, _loaded_model_name
    try:
        import joblib

        def art(fname):
            # All artefacts live flat inside model_dir.
            return os.path.join(model_dir, fname)

        _base_scaler = joblib.load(art("base_scaler.pkl"))
        _final_scaler = joblib.load(art("final_scaler.pkl"))
        _le_dict = joblib.load(art("le_dict.pkl"))
        _le_target = joblib.load(art("le_target.pkl"))
        _selected_cols = joblib.load(art("selected_cols.pkl"))
        _poly = joblib.load(art("poly.pkl"))
        _top_num = joblib.load(art("top_num.pkl"))

        # Candidate models in preference order; the first one on disk wins.
        for fname in (
            "stacking_ensemble_best_model.pkl",
            "lightgbm_best_model.pkl",
            "catboost_best_model.pkl",
            "xgboost_best_model.pkl",
            "random_forest_best_model.pkl",
            "mlp_sklearn_best_model.pkl",
        ):
            full_path = art(fname)
            if not os.path.exists(full_path):
                continue
            _psycho_model = joblib.load(full_path)
            _loaded_model_name = fname
            logger.info("Loaded psychometric model: %s", fname)
            return True
        logger.warning("Psychometric model pkl not found in %s", model_dir)
    except Exception as exc:
        logger.warning("Failed to load psychometric model: %s", exc)
    return False
def _load_roberta(ckpt_path: str) -> bool:
    """Load the fine-tuned RoBERTa stress classifier from *ckpt_path*.

    Rebuilds the 7-class head architecture used at training time, restores
    the checkpoint, and caches the model + tokenizer at module level.

    Fix: the original body called ``__import__("torch")`` twice even though
    ``torch`` is imported by name at the top of the same try block; use the
    bound name instead.

    Returns True on success; any failure (missing dependency, missing file,
    state-dict mismatch) is logged as a warning and False is returned.
    """
    global _roberta_model, _tokenizer
    try:
        import torch
        import torch.nn as nn
        from transformers import AutoTokenizer, AutoModel
        MODEL_NAME = "roberta-base"

        class RobertaClassifier(nn.Module):
            """roberta-base backbone + dropout + 7-way linear head."""

            def __init__(self):
                super().__init__()
                self.roberta = AutoModel.from_pretrained(MODEL_NAME)
                self.dropout = nn.Dropout(0.3)
                self.fc = nn.Linear(self.roberta.config.hidden_size, 7)
                # Freeze the backbone, then re-enable gradients on the last
                # 3 encoder layers — mirrors the training-time setup so the
                # checkpoint's parameter set lines up.
                for p in self.roberta.parameters():
                    p.requires_grad = False
                for layer in self.roberta.encoder.layer[-3:]:
                    for p in layer.parameters():
                        p.requires_grad = True

            def forward(self, input_ids, attention_mask):
                out = self.roberta(input_ids=input_ids,
                                   attention_mask=attention_mask)
                cls_out = out.last_hidden_state[:, 0]  # [CLS] token embedding
                return self.fc(self.dropout(cls_out))

        device = "cuda" if torch.cuda.is_available() else "cpu"
        model = RobertaClassifier().to(device)
        # map_location lets CUDA-trained weights load on CPU-only hosts.
        model.load_state_dict(torch.load(ckpt_path, map_location=device))
        model.eval()
        _roberta_model = model
        _tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
        logger.info("Loaded RoBERTa model from %s", ckpt_path)
        return True
    except Exception as exc:
        logger.warning("Failed to load RoBERTa model: %s", exc)
        return False
def init_models(psycho_model_dir: str, roberta_ckpt: str) -> None:
    """Load both models once at startup, resolving relative paths.

    Relative paths are anchored at the project root — the directory two
    levels above this file (where app.py lives). When neither model can be
    loaded the module-level DEMO_MODE flag is flipped so predict() falls
    back to the heuristic demo path.

    Fix: the three warning messages previously contained the mojibake
    sequence "β€”" (a UTF-8 em dash mis-decoded as Latin-1); restored to "—".
    """
    global DEMO_MODE
    project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

    def _anchor(path: str) -> str:
        # Leave absolute (and empty/None) paths untouched.
        if path and not os.path.isabs(path):
            return os.path.join(project_root, path)
        return path

    ok_p = _load_psycho(_anchor(psycho_model_dir))
    ok_r = _load_roberta(_anchor(roberta_ckpt))
    if not ok_p and not ok_r:
        DEMO_MODE = True
        logger.warning("Both models unavailable — running in DEMO mode.")
    elif not ok_p:
        logger.warning("Psychometric model unavailable — text-only mode.")
    elif not ok_r:
        logger.warning("RoBERTa model unavailable — psychometric-only mode.")
# ── Feature engineering (mirrors notebook) ──────────────────────────────────
def _add_interactions(X: pd.DataFrame) -> pd.DataFrame:
X = X.copy()
if {"Sleep_Duration", "Sleep_Quality"}.issubset(X.columns):
X["sleep_score"] = X["Sleep_Duration"] * X["Sleep_Quality"]
if {"Sleep_Quality", "Screen_Time"}.issubset(X.columns):
X["screen_sleep_ratio"] = X["Screen_Time"] / (X["Sleep_Quality"] + 1e-6)
if {"Physical_Activity", "Work_Hours"}.issubset(X.columns):
X["activity_work_ratio"] = X["Physical_Activity"] / (X["Work_Hours"] + 1e-6)
if {"Social_Interactions", "Travel_Time"}.issubset(X.columns):
X["social_travel"] = X["Social_Interactions"] * X["Travel_Time"]
if {"Work_Hours", "Travel_Time", "Screen_Time"}.issubset(X.columns):
X["daily_burden"] = X["Work_Hours"] + X["Travel_Time"] + X["Screen_Time"]
if {"Blood_Pressure", "Cholesterol_Level", "Blood_Sugar_Level"}.issubset(X.columns):
X["cardio_risk"] = (X["Blood_Pressure"] + X["Cholesterol_Level"]
+ X["Blood_Sugar_Level"])
if {"Caffeine_Intake", "Alcohol_Intake"}.issubset(X.columns):
X["stimulant_load"] = X["Caffeine_Intake"] + 2 * X["Alcohol_Intake"]
if {"Physical_Activity", "Sleep_Duration"}.issubset(X.columns):
X["recovery_index"] = X["Physical_Activity"] + X["Sleep_Duration"]
return X
def _preprocess_psycho(raw_df: pd.DataFrame) -> np.ndarray:
    """Run the training-time preprocessing pipeline and return class probas.

    Step order must mirror the notebook exactly: drop the target column if
    present, label-encode categoricals, align columns to the base scaler's
    training schema, add interaction features, append polynomial features,
    select the trained feature subset, and scale only for model families
    that were trained on scaled input.

    Returns proba array (N, 3) in le_target.classes_ order.
    """
    df = raw_df.copy()
    # Inference input may still carry the training target; drop it.
    if "Stress_Detection" in df.columns:
        df = df.drop(columns=["Stress_Detection"])
    # Apply the fitted per-column LabelEncoders to any categorical columns.
    for col, le in _le_dict.items():
        if col in df.columns:
            df[col] = le.transform(df[col])
    # Align to the exact column set/order the base scaler was fitted on,
    # zero-filling anything the caller omitted.
    # NOTE(review): _base_scaler is used only for its feature_names_in_
    # schema here — it never transforms; confirm that matches the notebook.
    if hasattr(_base_scaler, "feature_names_in_"):
        for c in _base_scaler.feature_names_in_:
            if c not in df.columns:
                df[c] = 0.0
        df = df[_base_scaler.feature_names_in_]
    df_inter = _add_interactions(df)
    # Polynomial expansion over the top numeric features chosen in training.
    poly_arr = _poly.transform(df_inter[_top_num])
    poly_cols = [f"poly_{i}" for i in range(poly_arr.shape[1])]
    df_poly = pd.concat(
        [df_inter.reset_index(drop=True),
         pd.DataFrame(poly_arr, columns=poly_cols)], axis=1
    )
    df_sel = df_poly[_selected_cols].copy()
    # Only these model families were trained on final-scaled features.
    needs_scale = {"mlp_sklearn_best_model.pkl", "svm-rbf_best_model.pkl",
                   "logreg_best_model.pkl"}
    if _loaded_model_name in needs_scale:
        df_sel = pd.DataFrame(
            _final_scaler.transform(df_sel), columns=_selected_cols
        )
    return _psycho_model.predict_proba(df_sel)
def _get_text_proba(text: str) -> np.ndarray:
    """Classify *text* with the cached RoBERTa model.

    Lower-cases and strips non-letter characters (same cleaning as at
    training time), tokenizes to a fixed 128-token window, and returns the
    softmax probability vector of shape (7,) in TEXT_CLASSES order.
    """
    import torch
    import torch.nn.functional as F

    dev = next(_roberta_model.parameters()).device
    cleaned = re.sub(r"[^a-zA-Z\s]", "", text.lower())
    encoded = _tokenizer(
        cleaned,
        padding="max_length",
        truncation=True,
        max_length=128,
        return_tensors="pt",
    )
    input_ids = encoded["input_ids"].to(dev)
    attention = encoded["attention_mask"].to(dev)
    with torch.no_grad():
        raw_logits = _roberta_model(input_ids, attention)
    return F.softmax(raw_logits, dim=-1).cpu().numpy()[0]
# ── Public inference entry point ─────────────────────────────────────────────
def predict(
    psychometric_row: dict | None = None,
    text_note: str | None = None,
    psycho_weight: float = 0.5,
) -> dict:
    """Fuse psychometric and text-based stress estimates into one result.

    Parameters
    ----------
    psychometric_row : optional dict of feature-name -> value
    text_note : optional free-text note from the user
    psycho_weight : blending weight for the psychometric score in [0, 1];
        the text score receives ``1 - psycho_weight``

    Returns
    -------
    dict with keys psycho_label, psycho_score, text_label, text_score,
    fused_label, fused_score, modality_used. Scores are rounded to 4
    decimals; missing modalities yield None label/score.
    """
    if DEMO_MODE:
        return _demo_predict(psychometric_row, text_note)

    p_score = p_label = None
    t_score = t_label = None

    # Tabular branch: only when a row was supplied and the model is loaded.
    if psychometric_row and _psycho_model is not None:
        try:
            proba = _preprocess_psycho(pd.DataFrame([psychometric_row]))[0]
            classes = list(_le_target.classes_)
            # Expected stress level = probability-weighted label scores.
            p_score = sum(PSYCHO_SCORE[c] * pr
                          for c, pr in zip(classes, proba))
            p_label = classes[int(np.argmax(proba))]
        except Exception as exc:
            logger.error("Psychometric inference error: %s", exc)

    # Text branch: only for a non-blank note with a loaded RoBERTa model.
    if text_note and text_note.strip() and _roberta_model is not None:
        try:
            proba = _get_text_proba(text_note)
            t_score = sum(TEXT_SCORE[c] * pr
                          for c, pr in zip(TEXT_CLASSES, proba))
            t_label = TEXT_CLASSES[int(np.argmax(proba))]
        except Exception as exc:
            logger.error("Text inference error: %s", exc)

    # Fusion: weighted blend when both modalities produced a score,
    # otherwise fall back to whichever single modality is available.
    if p_score is not None and t_score is not None:
        fused_score = psycho_weight * p_score + (1.0 - psycho_weight) * t_score
        modality = "both"
    elif p_score is not None:
        fused_score, modality = float(p_score), "psycho"
    elif t_score is not None:
        fused_score, modality = float(t_score), "text"
    else:
        fused_score, modality = 0.0, "none"

    def _round(v):
        # Round for presentation; None passes through untouched.
        return None if v is None else round(float(v), 4)

    return {
        "psycho_label": p_label,
        "psycho_score": _round(p_score),
        "text_label": t_label,
        "text_score": _round(t_score),
        "fused_label": _score_to_fused(fused_score),
        "fused_score": round(float(fused_score), 4),
        "modality_used": modality,
    }
# ── Demo / fallback (no weights needed) ─────────────────────────────────────
def _demo_predict(psychometric_row, text_note) -> dict:
    """Heuristic fallback prediction used when no model weights are loaded.

    Produces the same response schema as predict(): a lifestyle-based
    psychometric score plus a keyword-based text score, fused 50/50.

    Fix: the demo text label is now the TEXT_SCORE class whose canonical
    score is closest to the heuristic score. The previous code indexed the
    alphabetically sorted TEXT_CLASSES by score bucket, which produced
    semantically wrong labels (e.g. a "happy" note, score 0.05, landed on
    "Anxiety").
    """
    score = 0.3  # baseline: mild
    if psychometric_row:
        sh = float(psychometric_row.get("Sleep_Duration", 7))
        pa = float(psychometric_row.get("Physical_Activity", 3))
        wh = float(psychometric_row.get("Work_Hours", 8))
        caf = float(psychometric_row.get("Caffeine_Intake", 2))
        alc = float(psychometric_row.get("Alcohol_Intake", 0))
        slq = float(psychometric_row.get("Sleep_Quality", 3))
        score += max(0, (8 - sh) * 0.04)   # less sleep -> more stress
        score += max(0, (wh - 8) * 0.03)   # overwork
        score -= pa * 0.02                 # activity lowers stress
        score += caf * 0.01
        score += alc * 0.02
        score -= slq * 0.015
    score = float(np.clip(score, 0, 1))
    psycho_label = ("High" if score >= 0.6
                    else "Medium" if score >= 0.3 else "Low")
    text_score = None
    text_label = None
    # Keyword stem -> heuristic stress score; the strongest match wins.
    KEYWORDS = {
        "suicid": 0.95, "depress": 0.78, "anxiet": 0.68,
        "panic": 0.65, "overwhelm": 0.55, "stress": 0.45,
        "tired": 0.40, "exhaust": 0.50, "happy": 0.05, "fine": 0.1,
    }
    if text_note and text_note.strip():
        t = text_note.lower()
        ts = 0.3
        for kw, s in KEYWORDS.items():
            if kw in t:
                ts = max(ts, s)
        text_score = float(np.clip(ts, 0, 1))
        # BUGFIX: choose the label whose canonical score is nearest to the
        # heuristic score (ties resolved by TEXT_SCORE insertion order).
        text_label = min(TEXT_SCORE,
                         key=lambda c: abs(TEXT_SCORE[c] - text_score))
    if text_score is not None:
        fused = 0.5 * score + 0.5 * text_score
        modality = "both"
    else:
        fused = score
        modality = "psycho" if psychometric_row else "none"
    return {
        "psycho_label": psycho_label,
        "psycho_score": round(score, 4),
        "text_label": text_label,
        "text_score": round(text_score, 4) if text_score is not None else None,
        "fused_label": _score_to_fused(fused),
        "fused_score": round(fused, 4),
        "modality_used": modality,
    }