"""Gradio front end for the "Insulin Classifier" model.

Downloads a PyCaret pipeline (a ``.pkl`` file) from the Hugging Face Hub,
serves predictions for manually entered values or rows of a fixed sample
file, and renders SHAP-based global and local explanations.
"""

# ---------- Host/port ----------
# NOTE(review): SHARE=True asks Gradio for a public share link while the
# server also binds to all interfaces -- confirm that exposure is intended.
HOST, PORT, SHARE = "0.0.0.0", 7860, True

import os

# The environment is configured *before* matplotlib/gradio are imported so
# that the settings are already in place when those packages initialise.
os.environ["NO_PROXY"] = "127.0.0.1,localhost,::1"
os.environ["no_proxy"] = "127.0.0.1,localhost,::1"
for _k in ("HTTP_PROXY", "http_proxy", "HTTPS_PROXY", "https_proxy"):
    os.environ.pop(_k, None)
os.environ.setdefault("GRADIO_OPEN_BROWSER", "false")
os.environ["GRADIO_ANALYTICS_ENABLED"] = "False"
os.environ["MPLBACKEND"] = "Agg"

import matplotlib

matplotlib.use("Agg", force=True)

# ---------- Imports ----------
import logging
from typing import Any, Dict, Optional, Tuple, List
import re
import numpy as np
import pandas as pd
import gradio as gr
from pathlib import Path
import matplotlib.pyplot as plt
from matplotlib.figure import Figure
import shap
from pycaret.classification import load_model, predict_model
from huggingface_hub import hf_hub_download

logger = logging.getLogger(__name__)

# ---------- Hub model ----------
REPO = os.getenv("MODEL_REPO", "GDMProjects/my-private-model")
FNAME = os.getenv("MODEL_FILE", "best_insulin_model.pkl")
TOKEN = os.getenv("HF_TOKEN")

# ---------- Data / schema ----------
SAMPLE_FILE = "INS.xlsx"
TARGET_NAME = "insulin"
POS_CLASS = 1

# Column names (including their spelling) are part of the model/sample-file
# schema and must not be "corrected" here.
FEATURES = [
    "age",
    "BMI",
    "history_of_htn",
    "history_infectious_endocrine_metabolic_disease",
    "history_infectious_digestive_disease",
    "history_infectious_cardiovascular_diseae",
    "family_history_dm",
    "family_history_htn",
    "Current_history_obsteric",
    "Previos_Obsteric_History_AB",
    "infertility",
]
NUMERIC_INPUTS = {"age", "BMI", "Previos_Obsteric_History_AB"}
BOOL_FEATURES = [f for f in FEATURES if f not in NUMERIC_INPUTS]  # flags

# (feature name, checkbox label) in the order the checkboxes are displayed.
FLAG_SPECS = [
    ("history_of_htn", "History of hypertension — Yes / No"),
    ("family_history_dm", "Family history of diabetes mellitus — Yes / No"),
    ("family_history_htn", "Family history of hypertension — Yes / No"),
    ("history_infectious_cardiovascular_diseae", "History of cardiovascular diseases — Yes / No"),
    ("history_infectious_endocrine_metabolic_disease", "History of endocrine metabolic disease — Yes / No"),
    ("history_infectious_digestive_disease", "History of digestive disease — Yes / No"),
    ("Current_history_obsteric", "Current obstetric normal — Yes / No"),
    ("infertility", "History of infertility — Yes / No"),
]

_TRUE_TOKENS = frozenset({"1", "true", "yes", "y", "t", "on"})
_LABEL_COLUMNS = ("prediction_label", "label")
_SCORE_COLUMNS = ("prediction_score", "Score", "score")


# -------- Utilities ----------
def normalize(s: str) -> str:
    """Return *s* lower-cased with every non-alphanumeric character removed."""
    return re.sub(r"[^a-z0-9]+", "", str(s).lower())


def coerce_numeric(val: Any) -> Optional[float]:
    """Convert *val* to ``float``; return ``None`` when it is missing or invalid.

    ``None``, blank strings, NaN and values ``float()`` rejects all map to
    ``None``.
    """
    if val is None:
        return None
    if isinstance(val, str) and not val.strip():
        return None
    try:
        num = float(val)
    except (TypeError, ValueError):
        return None
    return None if np.isnan(num) else num


def truthy(val: Any) -> bool:
    """Interpret *val* as a yes/no flag.

    True for ``True``, the number 1 (also as text, e.g. ``"1.0"``) and the
    tokens in ``_TRUE_TOKENS`` (case-insensitive); missing values are False.
    """
    if val is None:
        return False
    try:
        if pd.isna(val):
            return False
    except (TypeError, ValueError):
        # Non-scalar input: pd.isna returned an array with no single truth value.
        pass
    if isinstance(val, str):
        text = val.strip().lower()
        if text in _TRUE_TOKENS:
            return True
        try:
            return float(text) == 1.0
        except ValueError:
            return False
    try:
        return bool(val == 1)
    except (TypeError, ValueError):
        return False


def _labels_equal(a: Any, b: Any) -> bool:
    """Compare two class labels numerically when possible, else as text."""
    num_a, num_b = coerce_numeric(a), coerce_numeric(b)
    if num_a is not None and num_b is not None:
        return num_a == num_b
    return str(a).strip() == str(b).strip()


def _format_target(value: Any) -> str:
    """Render a target value for display, showing integral floats as ints."""
    num = coerce_numeric(value)
    if num is not None and np.isfinite(num) and num == int(num):
        return str(int(num))
    return str(value)


def extract_probability_for_positive(preds: pd.DataFrame, positive_label=1) -> Optional[float]:
    """Return P(positive_label) for the first row of a ``predict_model`` frame.

    Lookup order:
      1. a column named exactly like the label, or ending in ``_<label>``
         (the per-class columns produced with ``raw_score=True``);
      2. a single score column. That score belongs to the *predicted* class,
         so when the predicted label is not the positive one it is flipped to
         ``1 - score``. The flip is only meaningful for binary problems.

    Returns ``None`` when no usable column is found.
    """
    if preds is None or preds.empty:
        return None

    pos = str(positive_label)
    exact = [c for c in preds.columns if str(c) == pos]
    suffixed = [c for c in preds.columns if str(c).endswith("_" + pos)]
    for col in exact + suffixed:
        value = coerce_numeric(preds[col].iloc[0])
        if value is not None:
            return value

    label_col = next((c for c in preds.columns if str(c).lower() in _LABEL_COLUMNS), None)
    for cname in _SCORE_COLUMNS:
        if cname not in preds.columns:
            continue
        score = coerce_numeric(preds[cname].iloc[0])
        if score is None:
            continue
        if label_col is not None and not _labels_equal(preds[label_col].iloc[0], positive_label):
            return round(1.0 - score, 6)
        return score
    return None


def _final_estimator(pipe):
    """Return the last step of a pipeline-like object, or *pipe* itself."""
    try:
        named = getattr(pipe, "named_steps", None)
        if named is not None:
            return named.get("trained_model", list(named.values())[-1])
        if hasattr(pipe, "steps"):
            return pipe.steps[-1][1]
    except (AttributeError, IndexError, KeyError, TypeError):
        # Unexpected pipeline shape: fall back to treating it as the estimator.
        pass
    return pipe


def get_global_importance_table(model) -> Optional[pd.DataFrame]:
    """Fallback (non-SHAP) importances/coefficients from the final estimator.

    Returns a frame with columns ``feature`` and either ``importance`` or
    ``coefficient``, sorted by decreasing magnitude, or ``None`` when the
    estimator exposes neither attribute.
    """
    est = _final_estimator(model)
    # Prefer the estimator's own input names: after preprocessing they can
    # differ from the names the pipeline as a whole was fitted on.
    names = getattr(est, "feature_names_in_", None)
    if names is None:
        names = getattr(model, "feature_names_in_", None)

    def _table(values: np.ndarray, value_col: str) -> pd.DataFrame:
        if names is not None and len(names) == len(values):
            labels = list(names)
        else:
            labels = [f"f{i}" for i in range(len(values))]
        return pd.DataFrame({"feature": labels, value_col: values})

    if hasattr(est, "feature_importances_"):
        table = _table(np.asarray(est.feature_importances_), "importance")
        return table.sort_values("importance", ascending=False).reset_index(drop=True)

    if hasattr(est, "coef_"):
        coef = np.asarray(est.coef_)
        if coef.ndim > 1:
            coef = coef[0]  # first row only, as in a binary linear model
        table = _table(np.ravel(coef), "coefficient")
        order = table["coefficient"].abs().sort_values(ascending=False).index
        return table.reindex(order).reset_index(drop=True)

    return None


# ---------- Load model ----------
# SECURITY NOTE: load_model unpickles the downloaded file; only point
# MODEL_REPO / MODEL_FILE at artifacts you trust.
local_path = hf_hub_download(repo_id=REPO, filename=FNAME, token=TOKEN)
# load_model expects the path without its ".pkl" suffix.
MODEL = load_model(str(Path(local_path).with_suffix("")))


# ---------- Helpers to find positive-class index for predict_proba ----------
def _get_pos_index_and_classes(pipe, pos_label=1):
    """Return ``(index of pos_label in classes_, classes list or None)``.

    When the label is absent, index 1 is assumed for two-class models and
    -1 ("unknown") is returned otherwise.
    """
    classes = None
    for candidate in (_final_estimator(pipe), pipe):
        classes = getattr(candidate, "classes_", None)
        if classes is not None:
            break
    if classes is None:
        return -1, None

    class_list = list(classes)
    if pos_label in class_list:
        return class_list.index(pos_label), class_list
    # fallback: assume last column is positive if 2-class
    if len(class_list) == 2:
        return 1, class_list
    return -1, class_list


POS_IDX, _CLASSES = _get_pos_index_and_classes(MODEL, POS_CLASS)


# ---------- Load fixed sample file (+ normalizer) ----------
def load_sample_dataframe(path: str) -> Tuple[pd.DataFrame, str]:
    """Read the sample file and return ``(frame, target column name)``.

    Columns are matched to ``FEATURES`` / ``TARGET_NAME`` after ``normalize``
    (case- and punctuation-insensitive) and renamed to the exact feature
    names. The frame holds ``FEATURES`` followed by the target column.

    Raises:
        FileNotFoundError: *path* does not exist.
        ValueError: the target or a required feature column is missing.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"Sample file not found: {path}")

    if path.lower().endswith((".xlsx", ".xls")):
        sdf = pd.read_excel(path)
    else:
        sdf = pd.read_csv(path)

    # Find target col case-insensitively
    cols_norm = {normalize(c): c for c in sdf.columns}
    target_col = cols_norm.get(normalize(TARGET_NAME))
    if target_col is None:
        raise ValueError(f"Target column '{TARGET_NAME}' not found in sample file (case-insensitive).")

    # Map to exact FEATURES (case-insensitive)
    rename_map, missing = {}, []
    for feat in FEATURES:
        src = cols_norm.get(normalize(feat))
        if src is None:
            missing.append(feat)
        else:
            rename_map[src] = feat
    if missing:
        raise ValueError(f"Missing required feature columns in sample file: {missing}")

    sdf2 = sdf.rename(columns=rename_map)[FEATURES + [target_col]]
    return sdf2, target_col


# A missing/invalid sample file must not prevent manual predictions, so the
# failure is recorded and shown in the UI instead of being raised.
try:
    SAMPLE_DF, SAMPLE_TARGET = load_sample_dataframe(SAMPLE_FILE)
except Exception as e:
    SAMPLE_DF, SAMPLE_TARGET = pd.DataFrame(columns=FEATURES + [TARGET_NAME]), TARGET_NAME
    SAMPLE_ERROR = f"⚠️ Could not load sample file: {e}"
else:
    SAMPLE_ERROR = ""


def build_sample_choices(df: pd.DataFrame, tgt: str, flt: str = "All") -> List[str]:
    """Return dropdown labels ``"<row position>: y=<target>"`` for *df*.

    *flt* is ``"All"`` or an integer class label given as text; in the latter
    case only rows whose target equals that class are listed.
    """
    if df.empty:
        return []

    # Read the target through its column: row-wise access (df.iloc[i][tgt])
    # upcasts numeric rows to float, which turns 1 into "1.0".
    target = df[tgt]
    if flt == "All":
        positions = range(len(df))
    else:
        want = float(int(flt))
        numeric = pd.to_numeric(target, errors="coerce")
        positions = np.flatnonzero((numeric == want).to_numpy())
    return [f"{int(i)}: y={_format_target(target.iloc[int(i)])}" for i in positions]


# ---------- SHAP background / explainer ----------
def _prepare_background(df_samples: Optional[pd.DataFrame], max_rows: int = 200) -> pd.DataFrame:
    """Build the all-float background matrix used by the SHAP explainer.

    Without sample data a 50-row all-zero frame is used. Otherwise numeric
    inputs are coerced to numbers, flags to 0.0/1.0, gaps are filled with the
    column median (0.0 when a column has no values at all), and at most
    *max_rows* rows are kept (sampled with a fixed seed).
    """
    if df_samples is None or df_samples.empty:
        bg = pd.DataFrame(0.0, index=range(50), columns=FEATURES)
    else:
        # reindex adds any missing feature column as NaN instead of raising.
        bg = df_samples.reindex(columns=FEATURES)

    for col in FEATURES:
        if col in NUMERIC_INPUTS:
            bg[col] = pd.to_numeric(bg[col], errors="coerce")
        else:
            bg[col] = bg[col].apply(lambda v: 1.0 if truthy(v) else 0.0)

    bg = bg.fillna(bg.median(numeric_only=True)).fillna(0.0)
    if len(bg) > max_rows:
        bg = bg.sample(max_rows, random_state=42)
    return bg.reset_index(drop=True).astype(float)


BACKGROUND = _prepare_background(SAMPLE_DF)


def _f_proba_pos(X_np: np.ndarray) -> np.ndarray:
    """Model function handed to SHAP: P(positive class) per row of *X_np*."""
    X_df = pd.DataFrame(np.asarray(X_np, dtype=float), columns=FEATURES)
    proba = np.asarray(MODEL.predict_proba(X_df))
    if 0 <= POS_IDX < proba.shape[1]:
        return proba[:, POS_IDX]
    # fallback: second column if present, else the only column
    return proba[:, 1] if proba.shape[1] >= 2 else proba[:, 0]


# Explanations are optional: a SHAP failure disables them but keeps the app up.
try:
    EXPLAINER = shap.Explainer(_f_proba_pos, BACKGROUND.values)
except Exception as e:
    logger.warning("SHAP explainer init failed: %s", e)
    EXPLAINER = None


def _barh_figure(values: np.ndarray, title: str, xlabel: str, *, zero_line: bool = False) -> Figure:
    """Horizontal bar chart of per-feature *values*, ordered by magnitude.

    The figure is created directly rather than through pyplot, so it is not
    kept in pyplot's global registry and is freed once the caller drops it.
    """
    order = np.argsort(np.abs(values))
    fig = Figure(figsize=(7, 4.5))
    ax = fig.subplots()
    ax.barh(np.array(FEATURES)[order], values[order])
    if zero_line:
        ax.axvline(0, linewidth=1)
    ax.set_title(title)
    ax.set_xlabel(xlabel)
    fig.tight_layout()
    return fig


def _plot_local_shap(row_dict: dict):
    """Return a bar chart of SHAP values for one input row, or ``None``.

    ``None`` is returned when no explainer is available. Missing inputs are
    passed on as NaN.
    """
    if EXPLAINER is None:
        return None
    X = pd.DataFrame([row_dict], columns=FEATURES).astype(float)
    exp = EXPLAINER(X.values)  # (1, n_features)
    vals = np.asarray(exp.values)[0]
    return _barh_figure(
        vals,
        "Local SHAP values (current input)",
        f"Impact on P(class=={POS_CLASS})",
        zero_line=True,
    )


def _plot_global_shap():
    """Return a bar chart of mean |SHAP| over ``BACKGROUND``, or ``None``."""
    if EXPLAINER is None:
        return None
    try:
        exp = EXPLAINER(BACKGROUND.values)
    except Exception as e:
        # Same policy as explainer construction: degrade, don't crash at import.
        logger.warning("Global SHAP computation failed: %s", e)
        return None
    mean_abs = np.mean(np.abs(np.asarray(exp.values)), axis=0)
    return _barh_figure(
        mean_abs,
        "Global feature importance (mean |SHAP|)",
        f"Mean |impact on P(class=={POS_CLASS})|",
    )


GLOBAL_FIG = _plot_global_shap()
fi_df = get_global_importance_table(MODEL)
GLOBAL_FI_TEXT = fi_df if (fi_df is not None) else pd.DataFrame()


# ---------- Shared prediction helpers ----------
def _build_row(age: Any, bmi: Any, prev_ab_cnt: Any, flag_values) -> Dict[str, Optional[float]]:
    """Assemble one model input row; *flag_values* follow ``BOOL_FEATURES`` order."""
    row: Dict[str, Optional[float]] = {c: None for c in FEATURES}
    row["age"] = coerce_numeric(age)
    row["BMI"] = coerce_numeric(bmi)
    row["Previos_Obsteric_History_AB"] = coerce_numeric(prev_ab_cnt)
    for feat, val in zip(BOOL_FEATURES, flag_values):
        row[feat] = 1.0 if truthy(val) else 0.0
    return row


def _row_frame(row: Dict[str, Optional[float]]) -> pd.DataFrame:
    """One-row float frame in ``FEATURES`` order; missing values become NaN."""
    return pd.DataFrame([row], columns=FEATURES).astype(float)


def _predict_row(df_row: pd.DataFrame, threshold: float):
    """Run the model on *df_row*.

    Returns ``(summary text, P(positive) or NaN, decision text, raw label)``.
    """
    # raw_score=True yields one score column per class, so the positive-class
    # probability is available even when the predicted label is the other class.
    preds = predict_model(MODEL, data=df_row.copy(), raw_score=True)
    label_col = next((c for c in preds.columns if str(c).lower() in _LABEL_COLUMNS), None)
    label = preds[label_col].iloc[0] if label_col is not None else None

    p = extract_probability_for_positive(preds, positive_label=POS_CLASS)
    if p is None:
        return str(label), float("nan"), str(label), label

    dec = 1 if p >= float(threshold) else 0
    pretty = f"{label} (threshold {float(threshold):.2f} ⇒ decision={dec})"
    return pretty, float(p), str(dec), label


def _parse_sample_index(sample_choice: Any) -> int:
    """Extract the row position from a ``"<position>: y=..."`` dropdown label.

    Raises:
        gr.Error: nothing is selected, the sample frame is empty, or the
            label does not refer to an existing row.
    """
    if SAMPLE_DF.empty or sample_choice is None or str(sample_choice).strip() == "":
        raise gr.Error("Sample file is empty or no row selected. Check SAMPLE_FILE path.")
    try:
        idx = int(str(sample_choice).split(":")[0])
    except ValueError as err:
        raise gr.Error(f"Could not read a row number from selection: {sample_choice!r}") from err
    if not 0 <= idx < len(SAMPLE_DF):
        raise gr.Error(f"Selected row {idx} is not in the sample file.")
    return idx


# ---------- Gradio UI ----------
# Adjacent literals reproduce the stylesheet exactly, one rule per line.
_CSS = (
    " * { font-family: Inter, ui-sans-serif, system-ui, -apple-system, Segoe UI; }"
    " .gradio-container { max-width: 1040px !important; margin: 0 auto; }"
    " .card { border: 1px solid #e5e7eb; border-radius: 16px; padding: 16px; background: white; box-shadow: 0 1px 8px rgba(0,0,0,0.04); }"
    " h1.title { font-size: 28px; font-weight: 800; margin: 10px 0 2px; }"
    " .badge { display:inline-block; padding: 2px 10px; border-radius: 999px; background:#eef2ff; color:#3730a3; font-size: 12px; font-weight:700; }"
    " .small { font-size: 12px; color:#6b7280; }"
    " hr.sep { border: none; border-top: 1px solid #e5e7eb; margin: 8px 0 14px; }"
    " "
)

# NOTE(review): the Markdown strings below contain only text and newlines,
# while the stylesheet defines h1.title / .small / hr.sep that nothing uses.
# Presumably the HTML markup was lost from this copy of the file -- TODO
# restore it from the original source.
with gr.Blocks(theme=gr.themes.Soft(), css=_CSS) as demo:
    gr.Markdown("\n\nInsulin Classifier\n\n")
    if SAMPLE_ERROR:
        gr.Markdown(f"\n{SAMPLE_ERROR}\n")

    with gr.Row():
        # -------- Left: Manual inputs + Sample picker --------
        with gr.Column(scale=1):
            gr.Markdown("### 1) Manual input")
            age_in = gr.Number(label="Age — 19–48 years", value=None, precision=2)
            bmi_in = gr.Number(label="BMI — 16–169 kg/m²", value=None, precision=3)
            prev_ab = gr.Number(
                label="History of abortion in previous pregnancies — count (0–6)",
                value=None,
                precision=0,
            )

            gr.Markdown("\n")
            gr.Markdown("#### Clinical flags")
            checkbox_map: Dict[str, gr.Checkbox] = {}
            for feat, nice_label in FLAG_SPECS:
                checkbox_map[feat] = gr.Checkbox(label=nice_label, value=False)

            gr.Markdown("\n")
            thr = gr.Slider(
                0.05, 0.95, value=0.50, step=0.01,
                label=f"Decision threshold for class '{POS_CLASS}'",
            )
            with gr.Row():
                run_btn = gr.Button("🚀 Predict (manual)", variant="primary")
                explain_btn = gr.Button("🧠 Explain (SHAP for current input)")

            # -------- Sample picker (fixed file) --------
            gr.Markdown("\n")
            gr.Markdown("### 2) Sample picker (from fixed file)")
            grp_dd = gr.Dropdown(label="Filter by target", choices=["All", "0", "1"], value="All")
            choices0 = build_sample_choices(SAMPLE_DF, SAMPLE_TARGET, "All")
            sample_dd = gr.Dropdown(
                label="Choose sample row",
                choices=choices0,
                value=(choices0[0] if choices0 else None),
            )
            with gr.Row():
                load_btn = gr.Button("📥 Load sample into manual inputs", variant="secondary")
                pred_btn = gr.Button("🎯 Predict & compare (sample)", variant="primary")

        # -------- Right: Results --------
        with gr.Column(scale=1):
            gr.Markdown("### 3) Results")
            pred_label = gr.Textbox(label="Predicted label (with threshold decision)", interactive=False)
            with gr.Row():
                prob_out = gr.Number(label=f"P(class=={POS_CLASS})", interactive=False, precision=6)
                decision = gr.Textbox(label="Decision @ threshold", interactive=False)
            with gr.Row():
                gt_out = gr.Textbox(label="Ground truth (sample)", interactive=False)
                match_out = gr.Textbox(label="Correct vs. ground truth?", interactive=False)
            with gr.Accordion("Echoed input (row sent to model)", open=False):
                echoed = gr.Dataframe(wrap=True)
            with gr.Accordion("Global feature importance (SHAP)", open=False):
                gr.Plot(value=GLOBAL_FIG)
                if isinstance(GLOBAL_FI_TEXT, pd.DataFrame) and not GLOBAL_FI_TEXT.empty:
                    gr.Markdown("> Text fallback (native model importances/coefficients):")
                    gr.Dataframe(value=GLOBAL_FI_TEXT, interactive=False, wrap=True)
            with gr.Accordion("Local explanation (SHAP) for current input", open=False):
                local_plot = gr.Plot()

    # Checkbox components in BOOL_FEATURES order, i.e. the order in which the
    # callbacks receive their *flag_values.
    flag_inputs = [checkbox_map[f] for f in BOOL_FEATURES]
    result_outputs = [pred_label, prob_out, decision, gt_out, match_out, echoed]

    # -------- Manual predict --------
    def do_predict_manual(age, bmi, prev_ab_cnt, threshold, *flag_values):
        """Predict for the manual inputs; ground-truth fields are cleared."""
        df_row = _row_frame(_build_row(age, bmi, prev_ab_cnt, flag_values))
        pretty, prob, dec, _label = _predict_row(df_row, threshold)
        return pretty, prob, dec, "", "", df_row

    run_btn.click(
        do_predict_manual,
        inputs=[age_in, bmi_in, prev_ab, thr] + flag_inputs,
        outputs=result_outputs,
    )

    # -------- Local SHAP for current manual input --------
    def do_explain_local(age, bmi, prev_ab_cnt, *flag_values):
        """Return the local SHAP figure for the manual inputs."""
        return _plot_local_shap(_build_row(age, bmi, prev_ab_cnt, flag_values))

    explain_btn.click(
        do_explain_local,
        inputs=[age_in, bmi_in, prev_ab] + flag_inputs,
        outputs=[local_plot],
    )

    # -------- Update sample choices on filter change --------
    def update_choices(group_value):
        """Refresh the sample dropdown for the selected target filter."""
        ch = build_sample_choices(SAMPLE_DF, SAMPLE_TARGET, group_value)
        return gr.update(choices=ch, value=(ch[0] if ch else None))

    grp_dd.change(update_choices, inputs=[grp_dd], outputs=[sample_dd])

    # -------- Load selected sample INTO manual inputs --------
    def load_into_manual(sample_choice):
        """Copy the chosen sample row into the manual inputs and ground truth."""
        idx = _parse_sample_index(sample_choice)
        updates = [
            gr.update(value=coerce_numeric(SAMPLE_DF[col].iloc[idx]))
            for col in ("age", "BMI", "Previos_Obsteric_History_AB")
        ]
        updates.extend(
            gr.update(value=truthy(SAMPLE_DF[feat].iloc[idx])) for feat in BOOL_FEATURES
        )
        # also surface ground truth to the Results panel
        updates.append(gr.update(value=_format_target(SAMPLE_DF[SAMPLE_TARGET].iloc[idx])))
        return updates

    load_into_outputs = [age_in, bmi_in, prev_ab] + flag_inputs + [gt_out]
    load_btn.click(load_into_manual, inputs=[sample_dd], outputs=load_into_outputs)

    # -------- Predict & compare for selected sample --------
    def predict_sample(sample_choice, threshold):
        """Predict for the chosen sample row and compare with its ground truth."""
        idx = _parse_sample_index(sample_choice)
        row = _build_row(
            SAMPLE_DF["age"].iloc[idx],
            SAMPLE_DF["BMI"].iloc[idx],
            SAMPLE_DF["Previos_Obsteric_History_AB"].iloc[idx],
            [SAMPLE_DF[feat].iloc[idx] for feat in BOOL_FEATURES],
        )
        df_row = _row_frame(row)
        pretty, prob, dec, label = _predict_row(df_row, threshold)

        gt = SAMPLE_DF[SAMPLE_TARGET].iloc[idx]
        # The comparison uses the model's own label (not the threshold
        # decision) and tolerates int/float/str representation differences.
        match = "✅ Correct" if _labels_equal(gt, label) else "❌ Incorrect"
        return pretty, prob, dec, _format_target(gt), match, df_row

    pred_btn.click(
        predict_sample,
        inputs=[sample_dd, thr],
        outputs=result_outputs,
    )

# ---------- Launch ----------
if __name__ == "__main__":
    demo.launch(server_name=HOST, server_port=PORT, share=SHARE)