File size: 12,924 Bytes
4a71291 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 |
# handler.py — Quantium insights Inference Endpoint (fixes XGBWrappedModel unpickle + Residence_type)
import os
import sys
import types
import json
import traceback
from typing import Any, Dict, List, Tuple
import joblib
import numpy as np
import pandas as pd
# =========================
# Re-declare the custom wrapper class and register it where pickle expects it
# =========================
class XGBWrappedModel:
    """
    Wrapper re-declared here so joblib/pickle can resolve the class that was
    saved at training time in model.joblib.

    Pickle restores these instance attributes:
      - preprocessor_: sklearn ColumnTransformer
      - model_: XGBClassifier (or similar exposing predict_proba)
      - explainer_: optional SHAP explainer
      - feature_names_out_: feature names after preprocessing

    Provides:
      - predict_proba(X_df)
      - top_contrib(X_df, k): top-k original features ranked by total |SHAP|
    """

    # Canonical raw column names, used to invert one-hot feature names such as
    # "cat__ever_married_Yes" back to "ever_married".  Longest match wins so
    # multi-token columns are not truncated at their first underscore.
    _KNOWN_COLS = (
        "avg_glucose_level", "smoking_status", "Residence_type",
        "heart_disease", "ever_married", "hypertension",
        "work_type", "gender", "age", "bmi",
    )

    def __init__(self, preprocessor=None, booster=None, explainer=None,
                 feat_names_out=None, cat_prefix="cat__", num_prefix="num__"):
        self.preprocessor_ = preprocessor
        self.model_ = booster
        self.explainer_ = explainer
        # Store as a str array so downstream prefix checks are uniform.
        self.feature_names_out_ = np.array(feat_names_out).astype(str) if feat_names_out is not None else None
        self.cat_prefix = cat_prefix
        self.num_prefix = num_prefix

    def predict_proba(self, X_df: pd.DataFrame):
        """Transform raw rows and return class probabilities.

        XGBoost exposes predict_proba for binary tasks: shape (n, 2).
        """
        Z = self.preprocessor_.transform(X_df)
        return self.model_.predict_proba(Z)

    def _to_orig(self, name: str) -> str:
        """Map a post-preprocessing feature name back to its raw column name.

        Fix over the previous naive split("_", 1): columns whose names contain
        underscores (ever_married, work_type, smoking_status, Residence_type)
        are matched by longest known prefix instead of being truncated.
        """
        if name.startswith(self.num_prefix):
            return name[len(self.num_prefix):]
        stem = name[len(self.cat_prefix):] if name.startswith(self.cat_prefix) else name
        for col in sorted(self._KNOWN_COLS, key=len, reverse=True):
            if stem == col or stem.startswith(col + "_"):
                return col
        # Unknown column: fall back to the old first-token behavior.
        return stem.split("_", 1)[0]

    def top_contrib(self, X_df: pd.DataFrame, k: int = 5) -> Tuple[List[str], List[float]]:
        """Return (names, signed_sums) of the k original features with the
        largest total |SHAP| contribution for the FIRST row of X_df.

        Returns ([], []) when no explainer was exported with the model.
        """
        if self.explainer_ is None:
            return [], []
        Z = self.preprocessor_.transform(X_df)
        try:
            sv = self.explainer_.shap_values(Z)
            # Older SHAP returns a per-class list; take the positive class.
            if isinstance(sv, list):
                sv = sv[1] if len(sv) > 1 else sv[0]
        except Exception:
            # Newer SHAP explainers are callable and return an Explanation.
            res = self.explainer_(Z)
            sv = res.values
        sv_row = np.array(sv[0], dtype=float)
        if self.feature_names_out_ is None:
            names_out = [f"f{i}" for i in range(len(sv_row))]
        else:
            names_out = list(self.feature_names_out_)
        orig_names = [self._to_orig(n) for n in names_out]
        # Aggregate one-hot columns back onto their source feature.
        abs_sum: Dict[str, float] = {}
        signed_sum: Dict[str, float] = {}
        for n, v in zip(orig_names, sv_row):
            abs_sum[n] = abs_sum.get(n, 0.0) + abs(float(v))
            signed_sum[n] = signed_sum.get(n, 0.0) + float(v)
        ranked = sorted(abs_sum.items(), key=lambda kv: kv[1], reverse=True)[:k]
        names = [n for n, _ in ranked]
        values = [signed_sum[n] for n, _ in ranked]
        return names, values
# Make the wrapper resolvable under every module path pickle may reference:
# the training run saved it from __main__, and sometimes from 'train_export_xgb'.
setattr(sys.modules['__main__'], 'XGBWrappedModel', XGBWrappedModel)
if 'train_export_xgb' not in sys.modules:
    sys.modules['train_export_xgb'] = types.ModuleType('train_export_xgb')
setattr(sys.modules['train_export_xgb'], 'XGBWrappedModel', XGBWrappedModel)
# =========================
# Feature schema (canonical)
# =========================
# Numeric model inputs; the two 0/1 flags are coerced to ints, then the whole
# numeric group is cast to float64 in _coerce_dataframe.
NUMERIC_COLS = ["age", "avg_glucose_level", "bmi", "hypertension", "heart_disease"]
# Canonical Residence key uses capital R
CAT_COLS = ["gender", "ever_married", "work_type", "smoking_status", "Residence_type"]
# Full canonical column order for the frame handed to the model.
ALL_CANON = NUMERIC_COLS + CAT_COLS
# Display order for per-feature SHAP contributions in the response.
EXPLAIN_ORDER = [
    "age", "avg_glucose_level", "bmi", "hypertension", "heart_disease",
    "gender", "ever_married", "work_type", "smoking_status", "Residence_type"
]
# =========================
# Utility: dtype coercion
# =========================
def _to_int01(x: Any) -> int:
if isinstance(x, (bool, np.bool_)):
return int(bool(x))
try:
if isinstance(x, str):
s = x.strip().lower()
if s in {"1", "true", "t", "yes", "y"}:
return 1
if s in {"0", "false", "f", "no", "n"}:
return 0
return int(float(x))
except Exception:
return 0
def _coerce_dataframe(rows: List[Dict[str, Any]]) -> pd.DataFrame:
    """
    Normalize raw request records into the model's canonical DataFrame.

    - Canonical Residence key is 'Residence_type' (capital R); a lowercase
      'residence_type' key is accepted and promoted to the canonical one.
    - Numerics end up float64 (the 0/1 flags are coerced to ints first).
    - Categoricals become plain object-dtype strings with no NA ("Unknown").
    - A lowercase 'residence_type' mirror column is appended for legacy models.
    """
    cleaned: List[Dict[str, Any]] = []
    for raw in rows:
        record = dict(raw or {})
        if "Residence_type" not in record and "residence_type" in record:
            record["Residence_type"] = record["residence_type"]
        cleaned.append({key: record.get(key) for key in ALL_CANON})
    df = pd.DataFrame(cleaned, columns=ALL_CANON)
    for flag in ("hypertension", "heart_disease"):
        df[flag] = df[flag].map(_to_int01)
    for num in ("age", "avg_glucose_level", "bmi"):
        df[num] = pd.to_numeric(df[num], errors="coerce")
    df[NUMERIC_COLS] = df[NUMERIC_COLS].astype("float64")
    for cat in CAT_COLS:
        df[cat] = df[cat].where(df[cat].notna(), "Unknown")
        df[cat] = df[cat].map(lambda v: "Unknown" if v is None else str(v)).astype(object)
    # Mirror lowercase for backward compatibility
    df["residence_type"] = df["Residence_type"].astype(object)
    return df
# =========================
# Safety patches for OHE
# =========================
def _iter_estimators(est):
yield est
if hasattr(est, "named_steps"):
for step in est.named_steps.values():
yield from _iter_estimators(step)
if hasattr(est, "transformers"):
for _, tr, _ in est.transformers:
yield from _iter_estimators(tr)
def _numeric_like(x) -> bool:
if x is None:
return True
if isinstance(x, (int, np.integer, float, np.floating)):
return True
if isinstance(x, str):
try:
float(x)
return True
except Exception:
return False
return False
def _sanitize_onehot_categories(model):
    """Coerce OneHotEncoder.categories_ to consistent dtypes to avoid np.isnan crashes."""
    try:
        from sklearn.preprocessing import OneHotEncoder  # type: ignore
    except Exception:
        return  # sklearn unavailable: nothing to sanitize
    for node in _iter_estimators(model):
        if not (isinstance(node, OneHotEncoder) and hasattr(node, "categories_")):
            continue
        sanitized = []
        for cats in node.categories_:
            arr = np.asarray(cats, dtype=object)
            if all(_numeric_like(v) for v in arr):
                # Homogeneous numeric column: force float, mapping None/junk to NaN.
                floats = []
                for v in arr:
                    try:
                        floats.append(np.nan if v is None else float(v))
                    except Exception:
                        floats.append(np.nan)
                sanitized.append(np.asarray(floats, dtype=float))
            else:
                # String column: replace None/NaN with "Unknown", stringify the rest.
                cleaned = [
                    "Unknown" if (v is None or (isinstance(v, float) and np.isnan(v))) else str(v)
                    for v in arr
                ]
                sanitized.append(np.asarray(cleaned, dtype=object))
        node.categories_ = sanitized
        if hasattr(node, "handle_unknown"):
            node.handle_unknown = "ignore"
def _patch_check_unknown():
"""Patch sklearn _check_unknown to avoid np.isnan on object arrays (older builds)."""
try:
from sklearn.utils import _encode # type: ignore
_orig = _encode._check_unknown
def _safe_check_unknown(values, known_values, return_mask=False):
try:
return _orig(values, known_values, return_mask=return_mask)
except TypeError:
vals = np.asarray(values, dtype=object)
known = np.asarray(known_values, dtype=object)
mask = np.isin(vals, known, assume_unique=False)
diff = vals[~mask]
if return_mask:
return diff, mask
return diff
_encode._check_unknown = _safe_check_unknown # type: ignore[attr-defined]
print("[handler] Patched sklearn.utils._encode._check_unknown", flush=True)
except Exception as e:
print(f"[handler] Patch for _check_unknown not applied: {e}", flush=True)
# =========================
# Model introspection (debug)
# =========================
def _introspect_model(model) -> Dict[str, Any]:
info: Dict[str, Any] = {"type": str(type(model))}
try:
if hasattr(model, "named_steps"):
info["pipeline_steps"] = list(model.named_steps.keys())
for name, step in model.named_steps.items():
if step.__class__.__name__ == "ColumnTransformer":
info["column_transformer"] = str(step)
try:
info["transformers_"] = [(n, str(t.__class__), cols) for (n, t, cols) in step.transformers]
except Exception:
pass
except Exception:
pass
try:
info["feature_names_in_"] = list(getattr(model, "feature_names_in_", []))
except Exception:
pass
return info
# =========================
# Handler
# =========================
class EndpointHandler:
    """Hugging Face Inference Endpoint entry point.

    Loads model.joblib from the repository path at startup and serves
    predictions (probability + thresholded label) with optional SHAP
    explanations and debug introspection.
    """

    def __init__(self, path: str = "/repository") -> None:
        _patch_check_unknown()  # apply safety patch early
        model_path = os.path.join(path, "model.joblib")
        self.model = joblib.load(model_path)
        # Decision threshold, overridable via the THRESHOLD env var;
        # falls back to 0.38 if the value is unparsable.
        try:
            self.threshold = float(os..getenv("THRESHOLD", "0.38"))
        except Exception:
            self.threshold = 0.38
        # Optional SHAP explainer attached by the training export (may be None).
        self.explainer = getattr(self.model, "explainer_", None)
        _sanitize_onehot_categories(self.model)
        print("[handler] Model loaded", flush=True)
        print(f"[handler] Using threshold: {self.threshold}", flush=True)

    def __call__(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Handle one request.

        Expects data = {"inputs": record-or-list-of-records,
                        "explain": bool, "debug": bool}.
        Returns risk_probability / risk_label / threshold, plus optional
        "shap" and "debug" sections, or an {"error": ...} payload.
        """
        debug = bool(data.get("debug", False))
        explain = bool(data.get("explain", False))
        rows = data.get("inputs") or []
        # Accept a bare record dict as a one-element batch.
        if isinstance(rows, dict):
            rows = [rows]
        if not isinstance(rows, list) or not rows:
            return {"error": "inputs must be a non-empty list of records", "threshold": self.threshold}
        df = _coerce_dataframe(rows)
        # Assembled unconditionally so it can accompany error payloads too.
        debug_info = {
            "columns": list(df.columns),
            "dtypes": {c: str(df[c].dtype) for c in df.columns},
            "threshold": self.threshold,
            "model": _introspect_model(self.model),
            "head": df.head(1).to_dict(orient="records"),
        }
        # Predict
        try:
            if hasattr(self.model, "predict_proba"):
                proba = self.model.predict_proba(df)[:, 1].astype(float)
            else:
                # No predict_proba: squash raw predict() scores through a sigmoid.
                raw = self.model.predict(df).astype(float)
                proba = 1.0 / (1.0 + np.exp(-raw))
        except Exception as e:
            return {
                "error": f"model.predict failed: {e}",
                "trace": traceback.format_exc(),
                "debug": debug_info,
                "threshold": self.threshold,
            }
        # NOTE(review): only the FIRST row's prediction is returned even when
        # multiple records are supplied — confirm single-record usage upstream.
        p = float(proba[0])
        label = int(p >= self.threshold)
        resp: Dict[str, Any] = {
            "risk_probability": p,
            "risk_label": label,
            "threshold": self.threshold,
        }
        if explain:
            if hasattr(self.model, "top_contrib"):
                try:
                    names, vals = self.model.top_contrib(df, k=5)
                    if names:
                        resp["shap"] = {"feature_names": names, "values": vals}
                except Exception as e:
                    resp["shap_error"] = f"top_contrib failed: {e}"
            elif self.explainer is not None:
                # NOTE(review): this fallback calls the explainer on the RAW
                # frame, not the preprocessed matrix — verify the explainer
                # was fit on raw columns, otherwise values may be misaligned.
                try:
                    shap_vals = self.explainer(df)
                    vals = shap_vals.values[0] if hasattr(shap_vals, "values") else shap_vals[0]
                    contrib = []
                    for feat in EXPLAIN_ORDER:
                        if feat in df.columns:
                            idx = list(df.columns).index(feat)
                            contrib.append({"feature": feat, "effect": float(vals[idx])})
                    resp["shap"] = {"contrib": contrib}
                except Exception as e:
                    resp["shap_error"] = f"explainer failed: {e}"
        if debug:
            resp["debug"] = debug_info
        # Logging is best-effort; never let a print failure break the response.
        try:
            print(f"[handler] prob={p:.4f} label={label}", flush=True)
        except Exception:
            pass
        return resp
|