"""Tabular classification bundles: Titanic, Heart Disease, Wine Quality. Each bundle is trained lazily on first access from CSVs in ``sample_data/``. Per-request contributions use: - Titanic (LogisticRegression): signed ``coef * (x_scaled)`` per feature - Heart / Wine (GradientBoostingClassifier): LOCO-style approximation — substitute each feature with the training mean/mode and compute Δprob. All three bundles expose a uniform ``predict(payload)`` callable returning the unified response schema consumed by the ``/tabular/*`` endpoints. """ from __future__ import annotations import os import threading from dataclasses import dataclass from typing import Any, Callable import numpy as np import pandas as pd from sklearn.compose import ColumnTransformer from sklearn.ensemble import GradientBoostingClassifier from sklearn.linear_model import LogisticRegression from sklearn.pipeline import Pipeline from sklearn.preprocessing import OneHotEncoder, StandardScaler _DATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "sample_data") _lock = threading.Lock() _bundles: dict[str, "TabularBundle"] = {} @dataclass class TabularBundle: name: str pipeline: Pipeline feature_order: list[str] categorical: list[str] numeric: list[str] class_labels: list[str] model_label: str target_type: str # "binary" or "multiclass" baseline: dict[str, Any] # training means / modes for LOCO substitution positive_class: str | None = None # for binary display def predict(self, payload: dict[str, Any]) -> dict[str, Any]: """Run prediction + contributions on a single row payload.""" row = {f: payload.get(f, self.baseline[f]) for f in self.feature_order} x_df = pd.DataFrame([row], columns=self.feature_order) proba = self.pipeline.predict_proba(x_df)[0] classes = [str(c) for c in self.pipeline.classes_] probabilities = { label_for_class(self.class_labels, classes, c): float(p) for c, p in zip(classes, proba) } top_idx = int(np.argmax(proba)) top_class_raw = classes[top_idx] top_class_label = label_for_class(self.class_labels, classes, top_class_raw) # Contributions if isinstance(self.pipeline.named_steps["clf"], LogisticRegression): contributions = _logreg_contributions( self.pipeline, x_df, self.feature_order, self.numeric, self.categorical ) else: contributions = _loco_contributions( self.pipeline, x_df, self.feature_order, self.baseline, top_idx, ) # Top-1 probability for gauge confidence = float(proba[top_idx]) # Build feature_importance (signed) sorted by |contribution| fi_list = [ { "feature": feat, "value": _display_value(row[feat]), "contribution": float(contrib), } for feat, contrib in contributions.items() ] fi_list.sort(key=lambda d: abs(d["contribution"]), reverse=True) return { "prediction": top_class_label, "confidence": confidence, "probabilities": probabilities, "feature_importance": fi_list, "feature_order": self.feature_order, "model": self.model_label, "target_type": self.target_type, } def label_for_class(class_labels: list[str], classes: list[str], raw: str) -> str: """Map model class output (may be '0','1' or string) to display label.""" try: idx = classes.index(raw) if idx < len(class_labels): return class_labels[idx] except ValueError: pass return raw def _display_value(v: Any) -> Any: if isinstance(v, float): if v != v: # NaN return None return round(float(v), 3) return v # ─── Contribution helpers ─── def _logreg_contributions( pipeline: Pipeline, x_df: pd.DataFrame, feature_order: list[str], numeric: list[str], categorical: list[str], ) -> dict[str, float]: """Compute signed coef * x_scaled contribution per original feature. For one-hot encoded categoricals we sum over all dummy columns tied to the original feature so the contribution aggregates cleanly for the UI. """ preprocessor: ColumnTransformer = pipeline.named_steps["prep"] clf: LogisticRegression = pipeline.named_steps["clf"] x_trans = preprocessor.transform(x_df) if hasattr(x_trans, "toarray"): x_trans = x_trans.toarray() x_trans = np.asarray(x_trans).ravel() coef = clf.coef_ # Binary LR → single row of coefs; multiclass → use class 1 row (positive) if coef.shape[0] == 1: coefs = coef[0] else: coefs = coef[-1] # Map transformed column index → original feature name feature_names_out = _get_feature_names_out(preprocessor, numeric, categorical) per_feat: dict[str, float] = {f: 0.0 for f in feature_order} for i, col_name in enumerate(feature_names_out): original = _strip_prefix(col_name, numeric + categorical) if original in per_feat: per_feat[original] += float(coefs[i] * x_trans[i]) return per_feat def _get_feature_names_out( preprocessor: ColumnTransformer, numeric: list[str], categorical: list[str], ) -> list[str]: """Best-effort retrieval of column names from fitted ColumnTransformer.""" try: return list(preprocessor.get_feature_names_out()) except Exception: pass names: list[str] = [] for name, transformer, cols in preprocessor.transformers_: if name == "num": names.extend([f"num__{c}" for c in cols]) elif name == "cat": try: ohe_names = transformer.get_feature_names_out(cols) except Exception: ohe_names = [f"cat__{c}" for c in cols] names.extend(ohe_names) return names def _strip_prefix(col_name: str, originals: list[str]) -> str: # sklearn formats: "num__age", "cat__sex_female" for o in originals: if col_name == f"num__{o}" or col_name.startswith(f"cat__{o}_"): return o if col_name == o or col_name.startswith(f"{o}_"): return o return col_name.split("__", 1)[-1].split("_", 1)[0] def _loco_contributions( pipeline: Pipeline, x_df: pd.DataFrame, feature_order: list[str], baseline: dict[str, Any], target_idx: int, ) -> dict[str, float]: """LOCO approximation — substitute each feature with its baseline value and compute delta in predicted probability for the predicted class.""" base_proba = pipeline.predict_proba(x_df)[0][target_idx] contributions: dict[str, float] = {} for feat in feature_order: substituted = x_df.copy() substituted[feat] = baseline[feat] new_proba = pipeline.predict_proba(substituted)[0][target_idx] # contribution = base - new_with_feature_neutralised # → positive means feature pushed probability UP contributions[feat] = float(base_proba - new_proba) return contributions # ─── Bundle builders ─── def _build_titanic() -> TabularBundle: path = os.path.join(_DATA_DIR, "titanic.csv") df = pd.read_csv(path) # Clean df = df[["pclass", "sex", "age", "sibsp", "parch", "fare", "embarked", "survived"]].copy() df["age"] = df["age"].fillna(df["age"].median()) df["fare"] = df["fare"].fillna(df["fare"].median()) df["embarked"] = df["embarked"].fillna(df["embarked"].mode().iloc[0]) df = df.dropna(subset=["survived"]) numeric = ["age", "sibsp", "parch", "fare"] categorical = ["pclass", "sex", "embarked"] feature_order = ["pclass", "sex", "age", "sibsp", "parch", "fare", "embarked"] preprocessor = ColumnTransformer( transformers=[ ("num", StandardScaler(), numeric), ("cat", OneHotEncoder(handle_unknown="ignore"), categorical), ] ) pipeline = Pipeline([ ("prep", preprocessor), ("clf", LogisticRegression(max_iter=500, C=1.0)), ]) pipeline.fit(df[feature_order], df["survived"].astype(int)) baseline: dict[str, Any] = { "pclass": int(df["pclass"].mode().iloc[0]), "sex": df["sex"].mode().iloc[0], "age": float(df["age"].median()), "sibsp": int(df["sibsp"].median()), "parch": int(df["parch"].median()), "fare": float(df["fare"].median()), "embarked": df["embarked"].mode().iloc[0], } return TabularBundle( name="titanic-survival", pipeline=pipeline, feature_order=feature_order, categorical=categorical, numeric=numeric, class_labels=["Did not survive", "Survived"], model_label="LogisticRegression (Titanic)", target_type="binary", baseline=baseline, positive_class="Survived", ) def _build_heart() -> TabularBundle: path = os.path.join(_DATA_DIR, "heart.csv") df = pd.read_csv(path) # UCI Cleveland: num 0 = no disease, 1-4 = presence — bin to binary df = df[[ "age", "sex", "cp", "trestbps", "chol", "fbs", "restecg", "thalach", "exang", "oldpeak", "num", ]].copy() df = df.apply(pd.to_numeric, errors="coerce").dropna() df["target"] = (df["num"] > 0).astype(int) df = df.drop(columns=["num"]) feature_order = [ "age", "sex", "cp", "trestbps", "chol", "fbs", "restecg", "thalach", "exang", "oldpeak", ] numeric = feature_order[:] categorical: list[str] = [] preprocessor = ColumnTransformer( transformers=[("num", StandardScaler(), numeric)] ) pipeline = Pipeline([ ("prep", preprocessor), ("clf", GradientBoostingClassifier(n_estimators=150, max_depth=3, random_state=0)), ]) pipeline.fit(df[feature_order], df["target"]) baseline = {f: float(df[f].mean()) for f in feature_order} return TabularBundle( name="heart-disease", pipeline=pipeline, feature_order=feature_order, categorical=categorical, numeric=numeric, class_labels=["Low risk", "Elevated risk"], model_label="GradientBoosting (UCI Cleveland Heart)", target_type="binary", baseline=baseline, positive_class="Elevated risk", ) def _build_wine() -> TabularBundle: path = os.path.join(_DATA_DIR, "wine_red.csv") df = pd.read_csv(path, sep=";") df.columns = [c.strip().replace(" ", "_") for c in df.columns] df = df.dropna() # Bin quality → 3 classes def _bin(q: float) -> int: if q <= 4: return 0 # low if q <= 6: return 1 # medium return 2 # high df["target"] = df["quality"].apply(_bin) feature_order = [ "alcohol", "volatile_acidity", "sulphates", "citric_acid", "residual_sugar", "chlorides", "pH", "density", ] numeric = feature_order[:] categorical: list[str] = [] preprocessor = ColumnTransformer( transformers=[("num", StandardScaler(), numeric)] ) pipeline = Pipeline([ ("prep", preprocessor), ("clf", GradientBoostingClassifier(n_estimators=200, max_depth=3, random_state=0)), ]) pipeline.fit(df[feature_order], df["target"]) baseline = {f: float(df[f].mean()) for f in feature_order} return TabularBundle( name="wine-quality", pipeline=pipeline, feature_order=feature_order, categorical=categorical, numeric=numeric, class_labels=["Low (≤4)", "Medium (5–6)", "High (≥7)"], model_label="GradientBoosting (Wine Quality — Red)", target_type="multiclass", baseline=baseline, positive_class=None, ) _BUILDERS: dict[str, Callable[[], TabularBundle]] = { "titanic-survival": _build_titanic, "heart-disease": _build_heart, "wine-quality": _build_wine, } def get_bundle(name: str) -> TabularBundle: if name in _bundles: return _bundles[name] with _lock: if name in _bundles: return _bundles[name] if name not in _BUILDERS: raise KeyError(f"Unknown tabular bundle: {name}") _bundles[name] = _BUILDERS[name]() return _bundles[name] def schema_summary(name: str) -> dict[str, Any]: """Return baseline/metadata for warmup introspection.""" bundle = get_bundle(name) return { "name": bundle.name, "model": bundle.model_label, "features": bundle.feature_order, "numeric": bundle.numeric, "categorical": bundle.categorical, "classes": bundle.class_labels, "target_type": bundle.target_type, "baseline": bundle.baseline, }