Spaces:
Sleeping
Sleeping
| """Tabular classification bundles: Titanic, Heart Disease, Wine Quality. | |
| Each bundle is trained lazily on first access from CSVs in ``sample_data/``. | |
| Per-request contributions use: | |
| - Titanic (LogisticRegression): signed ``coef * (x_scaled)`` per feature | |
| - Heart / Wine (GradientBoostingClassifier): LOCO-style approximation — | |
| substitute each feature with the training mean/mode and compute Δprob. | |
| All three bundles expose a uniform ``predict(payload)`` callable returning the | |
| unified response schema consumed by the ``/tabular/*`` endpoints. | |
| """ | |
| from __future__ import annotations | |
| import os | |
| import threading | |
| from dataclasses import dataclass | |
| from typing import Any, Callable | |
| import numpy as np | |
| import pandas as pd | |
| from sklearn.compose import ColumnTransformer | |
| from sklearn.ensemble import GradientBoostingClassifier | |
| from sklearn.linear_model import LogisticRegression | |
| from sklearn.pipeline import Pipeline | |
| from sklearn.preprocessing import OneHotEncoder, StandardScaler | |
| _DATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "sample_data") | |
| _lock = threading.Lock() | |
| _bundles: dict[str, "TabularBundle"] = {} | |
@dataclass
class TabularBundle:
    """A fitted classification pipeline plus the metadata needed to serve it.

    The class is a dataclass so that the bundle builders can construct it with
    keyword arguments; without the decorator the annotated fields below would
    not generate an ``__init__`` and ``TabularBundle(name=..., ...)`` would
    raise ``TypeError``.
    """

    name: str
    pipeline: Pipeline
    feature_order: list[str]
    categorical: list[str]
    numeric: list[str]
    class_labels: list[str]
    model_label: str
    target_type: str  # "binary" or "multiclass"
    baseline: dict[str, Any]  # training means / modes for LOCO substitution
    positive_class: str | None = None  # for binary display

    def predict(self, payload: dict[str, Any]) -> dict[str, Any]:
        """Run prediction + contributions on a single row payload.

        Args:
            payload: Mapping of feature name to value. Any feature listed in
                ``feature_order`` that is absent from the payload is filled
                with the corresponding ``baseline`` value.

        Returns:
            A dict with the keys ``prediction`` (display label of the most
            probable class), ``confidence`` (its probability),
            ``probabilities`` (display label -> probability),
            ``feature_importance`` (list of ``feature`` / ``value`` /
            ``contribution`` dicts sorted by descending absolute
            contribution), ``feature_order``, ``model`` and ``target_type``.
        """
        # Missing features silently fall back to the training baseline.
        row = {f: payload.get(f, self.baseline[f]) for f in self.feature_order}
        x_df = pd.DataFrame([row], columns=self.feature_order)

        proba = self.pipeline.predict_proba(x_df)[0]
        classes = [str(c) for c in self.pipeline.classes_]
        probabilities = {
            label_for_class(self.class_labels, classes, c): float(p)
            for c, p in zip(classes, proba)
        }
        top_idx = int(np.argmax(proba))
        top_class_raw = classes[top_idx]
        top_class_label = label_for_class(self.class_labels, classes, top_class_raw)

        # Contributions: coefficient-based for logistic regression, baseline
        # substitution for every other classifier.
        if isinstance(self.pipeline.named_steps["clf"], LogisticRegression):
            contributions = _logreg_contributions(
                self.pipeline, x_df, self.feature_order, self.numeric, self.categorical
            )
        else:
            contributions = _loco_contributions(
                self.pipeline,
                x_df,
                self.feature_order,
                self.baseline,
                top_idx,
            )

        # Top-1 probability for gauge
        confidence = float(proba[top_idx])

        # Build feature_importance (signed) sorted by |contribution|
        fi_list = [
            {
                "feature": feat,
                "value": _display_value(row[feat]),
                "contribution": float(contrib),
            }
            for feat, contrib in contributions.items()
        ]
        fi_list.sort(key=lambda d: abs(d["contribution"]), reverse=True)

        return {
            "prediction": top_class_label,
            "confidence": confidence,
            "probabilities": probabilities,
            "feature_importance": fi_list,
            "feature_order": self.feature_order,
            "model": self.model_label,
            "target_type": self.target_type,
        }
def label_for_class(class_labels: list[str], classes: list[str], raw: str) -> str:
    """Translate a raw model class (e.g. ``'0'``/``'1'``) into its display label.

    The raw value is located in ``classes`` and the label at the same position
    in ``class_labels`` is returned. When the raw value is unknown, or there is
    no label at that position, the raw value itself is returned.
    """
    if raw not in classes:
        return raw
    position = classes.index(raw)
    return class_labels[position] if position < len(class_labels) else raw
| def _display_value(v: Any) -> Any: | |
| if isinstance(v, float): | |
| if v != v: # NaN | |
| return None | |
| return round(float(v), 3) | |
| return v | |
| # ─── Contribution helpers ─── | |
def _logreg_contributions(
    pipeline: Pipeline,
    x_df: pd.DataFrame,
    feature_order: list[str],
    numeric: list[str],
    categorical: list[str],
) -> dict[str, float]:
    """Return the signed ``coef * transformed_value`` total per original feature.

    The row is pushed through the pipeline's ``prep`` step, each transformed
    column is multiplied by its coefficient, and the products are summed back
    onto the original feature the column derives from (so all one-hot dummy
    columns of a categorical collapse into one number for the UI). Features
    with no matching transformed column keep a contribution of ``0.0``.
    """
    prep: ColumnTransformer = pipeline.named_steps["prep"]
    model: LogisticRegression = pipeline.named_steps["clf"]

    # Densify if the preprocessor produced a sparse matrix, then flatten the
    # single row into a 1-D vector.
    transformed = prep.transform(x_df)
    if hasattr(transformed, "toarray"):
        transformed = transformed.toarray()
    row_vector = np.asarray(transformed).ravel()

    # A single coefficient row is used as-is; when there are several rows the
    # LAST one is used.
    weight_matrix = model.coef_
    weights = weight_matrix[0] if weight_matrix.shape[0] == 1 else weight_matrix[-1]

    known_features = numeric + categorical
    totals: dict[str, float] = dict.fromkeys(feature_order, 0.0)
    column_names = _get_feature_names_out(prep, numeric, categorical)
    for position, column in enumerate(column_names):
        source = _strip_prefix(column, known_features)
        if source in totals:
            totals[source] += float(weights[position] * row_vector[position])
    return totals
| def _get_feature_names_out( | |
| preprocessor: ColumnTransformer, | |
| numeric: list[str], | |
| categorical: list[str], | |
| ) -> list[str]: | |
| """Best-effort retrieval of column names from fitted ColumnTransformer.""" | |
| try: | |
| return list(preprocessor.get_feature_names_out()) | |
| except Exception: | |
| pass | |
| names: list[str] = [] | |
| for name, transformer, cols in preprocessor.transformers_: | |
| if name == "num": | |
| names.extend([f"num__{c}" for c in cols]) | |
| elif name == "cat": | |
| try: | |
| ohe_names = transformer.get_feature_names_out(cols) | |
| except Exception: | |
| ohe_names = [f"cat__{c}" for c in cols] | |
| names.extend(ohe_names) | |
| return names | |
| def _strip_prefix(col_name: str, originals: list[str]) -> str: | |
| # sklearn formats: "num__age", "cat__sex_female" | |
| for o in originals: | |
| if col_name == f"num__{o}" or col_name.startswith(f"cat__{o}_"): | |
| return o | |
| if col_name == o or col_name.startswith(f"{o}_"): | |
| return o | |
| return col_name.split("__", 1)[-1].split("_", 1)[0] | |
| def _loco_contributions( | |
| pipeline: Pipeline, | |
| x_df: pd.DataFrame, | |
| feature_order: list[str], | |
| baseline: dict[str, Any], | |
| target_idx: int, | |
| ) -> dict[str, float]: | |
| """LOCO approximation — substitute each feature with its baseline value and | |
| compute delta in predicted probability for the predicted class.""" | |
| base_proba = pipeline.predict_proba(x_df)[0][target_idx] | |
| contributions: dict[str, float] = {} | |
| for feat in feature_order: | |
| substituted = x_df.copy() | |
| substituted[feat] = baseline[feat] | |
| new_proba = pipeline.predict_proba(substituted)[0][target_idx] | |
| # contribution = base - new_with_feature_neutralised | |
| # → positive means feature pushed probability UP | |
| contributions[feat] = float(base_proba - new_proba) | |
| return contributions | |
| # ─── Bundle builders ─── | |
def _build_titanic() -> TabularBundle:
    """Train the Titanic survival bundle from ``titanic.csv``.

    Missing ``age`` / ``fare`` values are filled with the column median and
    missing ``embarked`` with the column mode; rows without a ``survived``
    value are dropped. Numeric features are standard-scaled, categorical ones
    one-hot encoded, and a logistic regression is fitted on top.
    """
    numeric = ["age", "sibsp", "parch", "fare"]
    categorical = ["pclass", "sex", "embarked"]
    feature_order = ["pclass", "sex", "age", "sibsp", "parch", "fare", "embarked"]

    raw = pd.read_csv(os.path.join(_DATA_DIR, "titanic.csv"))

    # Clean: keep only the modelled columns, impute, then drop unlabeled rows.
    frame = raw[feature_order + ["survived"]].copy()
    for column in ("age", "fare"):
        frame[column] = frame[column].fillna(frame[column].median())
    frame["embarked"] = frame["embarked"].fillna(frame["embarked"].mode().iloc[0])
    frame = frame.dropna(subset=["survived"])

    scaler_and_encoder = ColumnTransformer(
        transformers=[
            ("num", StandardScaler(), numeric),
            ("cat", OneHotEncoder(handle_unknown="ignore"), categorical),
        ]
    )
    model = Pipeline(
        [
            ("prep", scaler_and_encoder),
            ("clf", LogisticRegression(max_iter=500, C=1.0)),
        ]
    )
    model.fit(frame[feature_order], frame["survived"].astype(int))

    def _mode(column: str) -> Any:
        return frame[column].mode().iloc[0]

    def _median(column: str) -> Any:
        return frame[column].median()

    # Mode for categoricals, median for numerics.
    defaults: dict[str, Any] = {
        "pclass": int(_mode("pclass")),
        "sex": _mode("sex"),
        "age": float(_median("age")),
        "sibsp": int(_median("sibsp")),
        "parch": int(_median("parch")),
        "fare": float(_median("fare")),
        "embarked": _mode("embarked"),
    }

    return TabularBundle(
        name="titanic-survival",
        pipeline=model,
        feature_order=feature_order,
        categorical=categorical,
        numeric=numeric,
        class_labels=["Did not survive", "Survived"],
        model_label="LogisticRegression (Titanic)",
        target_type="binary",
        baseline=defaults,
        positive_class="Survived",
    )
def _build_heart() -> TabularBundle:
    """Train the heart-disease bundle from ``heart.csv``.

    All modelled columns are coerced to numeric and rows with any missing
    value are dropped. The ``num`` column is binarised (``> 0`` becomes 1)
    to form the target. Every feature is standard-scaled and fed to a
    gradient-boosting classifier; the baseline is the per-feature mean.
    """
    feature_order = [
        "age", "sex", "cp", "trestbps", "chol", "fbs",
        "restecg", "thalach", "exang", "oldpeak",
    ]
    numeric = feature_order[:]
    categorical: list[str] = []

    raw = pd.read_csv(os.path.join(_DATA_DIR, "heart.csv"))
    frame = raw[feature_order + ["num"]].copy()
    frame = frame.apply(pd.to_numeric, errors="coerce").dropna()

    # UCI Cleveland: num 0 = no disease, 1-4 = presence — bin to binary
    frame["target"] = (frame["num"] > 0).astype(int)
    frame = frame.drop(columns=["num"])

    model = Pipeline(
        [
            ("prep", ColumnTransformer(transformers=[("num", StandardScaler(), numeric)])),
            ("clf", GradientBoostingClassifier(n_estimators=150, max_depth=3, random_state=0)),
        ]
    )
    model.fit(frame[feature_order], frame["target"])

    feature_means = {feature: float(frame[feature].mean()) for feature in feature_order}

    return TabularBundle(
        name="heart-disease",
        pipeline=model,
        feature_order=feature_order,
        categorical=categorical,
        numeric=numeric,
        class_labels=["Low risk", "Elevated risk"],
        model_label="GradientBoosting (UCI Cleveland Heart)",
        target_type="binary",
        baseline=feature_means,
        positive_class="Elevated risk",
    )
def _build_wine() -> TabularBundle:
    """Train the red-wine quality bundle from ``wine_red.csv``.

    The file is read as ``;``-separated, column names are stripped and have
    spaces replaced by underscores, and rows with missing values are dropped.
    ``quality`` is binned into three classes (``<= 4`` -> 0, ``<= 6`` -> 1,
    otherwise 2). Every feature is standard-scaled and fed to a
    gradient-boosting classifier; the baseline is the per-feature mean.
    """
    feature_order = [
        "alcohol", "volatile_acidity", "sulphates", "citric_acid",
        "residual_sugar", "chlorides", "pH", "density",
    ]
    numeric = feature_order[:]
    categorical: list[str] = []

    frame = pd.read_csv(os.path.join(_DATA_DIR, "wine_red.csv"), sep=";")
    frame.columns = [header.strip().replace(" ", "_") for header in frame.columns]
    frame = frame.dropna()

    # Bin quality → 3 classes: 0 = low, 1 = medium, 2 = high
    frame["target"] = frame["quality"].apply(
        lambda q: 0 if q <= 4 else (1 if q <= 6 else 2)
    )

    model = Pipeline(
        [
            ("prep", ColumnTransformer(transformers=[("num", StandardScaler(), numeric)])),
            ("clf", GradientBoostingClassifier(n_estimators=200, max_depth=3, random_state=0)),
        ]
    )
    model.fit(frame[feature_order], frame["target"])

    feature_means = {feature: float(frame[feature].mean()) for feature in feature_order}

    return TabularBundle(
        name="wine-quality",
        pipeline=model,
        feature_order=feature_order,
        categorical=categorical,
        numeric=numeric,
        class_labels=["Low (≤4)", "Medium (5–6)", "High (≥7)"],
        model_label="GradientBoosting (Wine Quality — Red)",
        target_type="multiclass",
        baseline=feature_means,
        positive_class=None,
    )
# Registry of bundle name -> zero-argument builder. ``get_bundle`` looks the
# name up here the first time a bundle is requested.
_BUILDERS: dict[str, Callable[[], TabularBundle]] = {
    "titanic-survival": _build_titanic,  # LogisticRegression
    "heart-disease": _build_heart,  # GradientBoostingClassifier
    "wine-quality": _build_wine,  # GradientBoostingClassifier
}
def get_bundle(name: str) -> TabularBundle:
    """Return the bundle registered under ``name``, building it on first use.

    An already-built bundle is returned straight from the cache. Otherwise the
    lock is taken, the cache is checked again, and the registered builder is
    run and its result stored.

    Raises:
        KeyError: If no builder is registered under ``name``.
    """
    cached = _bundles.get(name)
    if cached is not None:
        return cached

    with _lock:
        # Another caller may have finished the build while we waited.
        cached = _bundles.get(name)
        if cached is None:
            builder = _BUILDERS.get(name)
            if builder is None:
                raise KeyError(f"Unknown tabular bundle: {name}")
            cached = _bundles[name] = builder()
        return cached
def schema_summary(name: str) -> dict[str, Any]:
    """Return baseline/metadata for warmup introspection.

    The bundle is fetched through ``get_bundle`` (and therefore built if it
    has not been yet), then its descriptive attributes are exposed under the
    response keys listed below.
    """
    # (response key, bundle attribute) pairs, in output order.
    exposed = (
        ("name", "name"),
        ("model", "model_label"),
        ("features", "feature_order"),
        ("numeric", "numeric"),
        ("categorical", "categorical"),
        ("classes", "class_labels"),
        ("target_type", "target_type"),
        ("baseline", "baseline"),
    )
    bundle = get_bundle(name)
    return {key: getattr(bundle, attribute) for key, attribute in exposed}