Spaces:
Sleeping
Sleeping
"""
ensemble.py — Ensemble builder for the SAP RPT-1 Benchmarking Web App.

Given individual CV results, this module:
  1. Selects the top-N performing models
  2. Runs a Soft Voting ensemble (works with ALL model types)
  3. Runs a Stacking ensemble (sklearn-native models only)
  4. Returns CV results in the same schema as individual models
"""
| import os, time, warnings | |
| import numpy as np | |
| import pandas as pd | |
| from sklearn.model_selection import StratifiedKFold, KFold | |
| from sklearn.metrics import (accuracy_score, f1_score, roc_auc_score, | |
| r2_score, mean_absolute_error, mean_squared_error) | |
| from sklearn.linear_model import LogisticRegression, Ridge | |
# Silence every warning for the whole process once this module is imported
# (no category or module filter is supplied).
warnings.filterwarnings("ignore")
# Number of cross-validation folds handed to StratifiedKFold / KFold;
# overridable through the N_FOLDS environment variable (default 5).
N_FOLDS = int(os.getenv("N_FOLDS", "5"))
# Seed used as random_state for the splitters and meta-learners in this module;
# overridable through the RANDOM_STATE environment variable (default 42).
RAND = int(os.getenv("RANDOM_STATE", "42"))
# Sklearn-native builders safe to use in StackingClassifier/Regressor
SKLEARN_SAFE = {"XGBoost", "LightGBM", "CatBoost"}
# ── Model selection ──────────────────────────────────────────────────────────
def select_top_models(results: dict, builders: dict, task: str, n: int = 3):
    """
    Return the top-N (name, builder) pairs ranked by the task's primary metric.

    The primary metric is ``roc_auc`` when ``task == "classification"`` and
    ``r2`` for any other task. A model is eligible only when its mean score is
    at least 0.50 ROC-AUC or at least 0.0 R².

    Args:
        results: Mapping of model name to its CV result. Names that are absent,
            entries containing an ``"error"`` key, and entries without a usable
            ``"mean"`` mapping are skipped.
        builders: Mapping of model name to builder. Its iteration order decides
            ties between models with equal scores.
        task: ``"classification"`` ranks by ROC-AUC; anything else ranks by R².
        n: Maximum number of pairs to return. Values <= 0 yield an empty list.

    Returns:
        A list of at most ``n`` ``(name, builder)`` tuples, best score first.
    """
    is_clf = task == "classification"
    primary = "roc_auc" if is_clf else "r2"
    threshold = 0.50 if is_clf else 0.0

    ranked = []
    for name in builders:
        entry = results.get(name)
        if entry is None or "error" in entry:
            continue
        try:
            # A missing or None metric counts as 0, as it always has.
            score = float(entry["mean"].get(primary, 0) or 0)
        except (KeyError, TypeError, AttributeError, ValueError):
            # No "mean" mapping or a non-numeric score: treat the model as
            # errored and skip it rather than aborting the whole selection.
            continue
        # NaN compares False here, so models with an undefined score drop out.
        if score >= threshold:
            ranked.append((name, score))

    # list.sort is stable, so equal scores keep the order of `builders`.
    ranked.sort(key=lambda pair: pair[1], reverse=True)
    # Clamp n so a negative value cannot turn into "all but the last k".
    return [(name, builders[name]) for name, _ in ranked[:max(n, 0)]]
# ── Voting ensemble (manual soft voting) ─────────────────────────────────────
def _align_proba(proba, model_classes, label_to_col):
    """Spread a model's probability columns onto the full, ordered class set."""
    n_cls = len(label_to_col)
    if proba.shape[1] == n_cls:
        # Assumes the columns already follow sorted label order (the sklearn
        # convention) — this is the layout the ensemble averages in.
        return proba
    if model_classes is None or len(model_classes) != proba.shape[1]:
        raise ValueError("cannot align predict_proba columns with class labels")
    full = np.zeros((proba.shape[0], n_cls))
    for src, label in enumerate(np.asarray(model_classes).tolist()):
        full[:, label_to_col[label]] = proba[:, src]
    return full


def _one_hot_votes(labels, label_to_col):
    """Turn hard label predictions into one-hot rows; unknown labels stay zero."""
    votes = np.zeros((len(labels), len(label_to_col)))
    for row, label in enumerate(labels):
        col = label_to_col.get(label)
        if col is not None:
            votes[row, col] = 1.0
    return votes


def _member_proba(model, Xval_p, label_to_col):
    """Return one fitted model's class-probability matrix for the fold."""
    try:
        proba = np.asarray(model.predict_proba(Xval_p), dtype=float)
        # Normalise rows; the epsilon guards against all-zero rows.
        proba = proba / (proba.sum(axis=1, keepdims=True) + 1e-9)
        return _align_proba(proba, getattr(model, "classes_", None), label_to_col)
    except Exception:
        # Fallback: models without usable probabilities cast a hard vote.
        labels = np.asarray(model.predict(Xval_p)).ravel().tolist()
        return _one_hot_votes(labels, label_to_col)


def run_voting_ensemble(top_pairs: list, X: pd.DataFrame, y: pd.Series,
                        task: str, prep_fn) -> dict:
    """
    Manual soft-voting ensemble. Works with ANY model (sklearn or custom).

    Each fold trains every model in ``top_pairs`` and averages their class
    probabilities (classification) or their predictions (any other task).
    A model that raises while being built, fitted or queried is left out of
    that fold; a fold in which no model survives is dropped.

    Args:
        top_pairs: ``(name, builder)`` pairs; ``builder(task)`` must return an
            object with ``fit`` and ``predict`` (and ideally ``predict_proba``).
        X: Feature frame, indexed positionally per fold.
        y: Target series aligned with ``X``.
        task: ``"classification"`` for stratified CV and probability voting;
            anything else is treated as regression.
        prep_fn: Called as ``prep_fn(X_train)`` returning ``(X, encoders)`` and
            as ``prep_fn(X_val, encoders=encoders)`` for the validation part.

    Returns:
        ``{"mean": ..., "std": ..., "folds": [...]}`` where each fold holds
        ``accuracy``/``f1_macro``/``roc_auc`` or ``r2``/``mae``/``rmse``, plus
        ``fit_time`` (seconds spent fitting and predicting all members).

    Raises:
        ValueError: fewer than two models were supplied, or every fold failed.
    """
    if len(top_pairs) < 2:
        raise ValueError("Need at least 2 models to form an ensemble.")

    is_clf = task == "classification"
    if is_clf:
        splitter = StratifiedKFold(N_FOLDS, shuffle=True, random_state=RAND)
        splits = list(splitter.split(X, y))
        # Sorted label values: column i of every probability matrix stands for
        # classes[i], so argmax indices can be mapped back to real labels.
        classes = np.unique(np.asarray(y.dropna()))
        label_to_col = {label: col for col, label in enumerate(classes.tolist())}
    else:
        splits = list(KFold(N_FOLDS, shuffle=True, random_state=RAND).split(X))

    fold_results = []
    for tr_idx, val_idx in splits:
        Xtr, Xval = X.iloc[tr_idx], X.iloc[val_idx]
        ytr, yval = y.iloc[tr_idx], y.iloc[val_idx]
        # Encoders are fitted on the training part only, then reused.
        Xtr_p, encoders = prep_fn(Xtr)
        Xval_p, _ = prep_fn(Xval, encoders=encoders)
        n_val = len(yval)

        t0 = time.perf_counter()
        member_outputs = []
        for _, builder in top_pairs:
            try:
                model = builder(task)
                model.fit(Xtr_p, ytr)
                if is_clf:
                    out = _member_proba(model, Xval_p, label_to_col)
                else:
                    out = np.asarray(model.predict(Xval_p), dtype=float).ravel()
                if out.shape[0] != n_val:
                    raise ValueError("model returned a wrong number of rows")
                member_outputs.append(out)
            except Exception:
                continue  # skip failing models within the fold
        fit_t = time.perf_counter() - t0

        if not member_outputs:
            continue

        blended = np.mean(member_outputs, axis=0)
        if is_clf:
            # Map winning columns back to labels before scoring against yval.
            y_pred = classes[np.argmax(blended, axis=1)]
            try:
                if blended.shape[1] == 2:
                    auc = roc_auc_score(yval, blended[:, 1])
                else:
                    auc = roc_auc_score(yval, blended,
                                        multi_class="ovr", average="macro")
            except Exception:
                auc = float("nan")
            fold_results.append({
                "accuracy": accuracy_score(yval, y_pred),
                "f1_macro": f1_score(yval, y_pred, average="macro",
                                     zero_division=0),
                "roc_auc": auc,
                "fit_time": fit_t,
            })
        else:
            fold_results.append({
                "r2": r2_score(yval, blended),
                "mae": mean_absolute_error(yval, blended),
                "rmse": float(np.sqrt(mean_squared_error(yval, blended))),
                "fit_time": fit_t,
            })

    if not fold_results:
        raise ValueError("All folds failed for voting ensemble.")

    df = pd.DataFrame(fold_results)
    return {"mean": df.mean().to_dict(), "std": df.std().to_dict(),
            "folds": df.to_dict("records")}
# ── Stacking ensemble (sklearn-safe models only) ─────────────────────────────
def run_stacking_ensemble(sklearn_pairs: list, X: pd.DataFrame, y: pd.Series,
                          task: str, prep_fn) -> dict:
    """
    Stacking ensemble using sklearn StackingClassifier / StackingRegressor.

    The base learners are whatever ``sklearn_pairs`` contains; the caller is
    expected to pass sklearn-compatible builders only. The meta-learner is
    LogisticRegression for classification and Ridge for any other task.
    A fold whose stacker fails to fit, predict or score is dropped, so the
    remaining folds still produce a result.

    Args:
        sklearn_pairs: ``(name, builder)`` pairs; ``builder(task)`` must return
            an sklearn-compatible estimator. Names become the estimator names.
        X: Feature frame, indexed positionally per fold.
        y: Target series aligned with ``X``.
        task: ``"classification"`` for stratified CV and a stacking
            classifier; anything else is treated as regression.
        prep_fn: Called as ``prep_fn(X_train)`` returning ``(X, encoders)`` and
            as ``prep_fn(X_val, encoders=encoders)`` for the validation part.

    Returns:
        ``{"mean": ..., "std": ..., "folds": [...]}`` where each fold holds
        ``accuracy``/``f1_macro``/``roc_auc`` or ``r2``/``mae``/``rmse``, plus
        ``fit_time`` (seconds spent in ``stacker.fit``).

    Raises:
        ValueError: fewer than two models were supplied, or every fold failed
            (the last fold error is attached as ``__cause__``).
    """
    from sklearn.ensemble import StackingClassifier, StackingRegressor

    if len(sklearn_pairs) < 2:
        raise ValueError("Need at least 2 sklearn-compatible models for stacking.")

    is_clf = task == "classification"
    if is_clf:
        splitter = StratifiedKFold(N_FOLDS, shuffle=True, random_state=RAND)
        splits = list(splitter.split(X, y))
        meta = LogisticRegression(max_iter=1000, random_state=RAND, C=1.0)
        stacker_cls = StackingClassifier
    else:
        splits = list(KFold(N_FOLDS, shuffle=True, random_state=RAND).split(X))
        meta = Ridge(random_state=RAND)
        stacker_cls = StackingRegressor

    fold_results = []
    last_error = None
    for tr_idx, val_idx in splits:
        Xtr, Xval = X.iloc[tr_idx], X.iloc[val_idx]
        ytr, yval = y.iloc[tr_idx], y.iloc[val_idx]
        # Encoders are fitted on the training part only, then reused.
        Xtr_p, encoders = prep_fn(Xtr)
        Xval_p, _ = prep_fn(Xval, encoders=encoders)

        try:
            # Fresh base estimators per fold; `meta` can be shared because the
            # stacker fits a clone of final_estimator rather than the instance.
            stacker = stacker_cls(
                estimators=[(name, builder(task))
                            for name, builder in sklearn_pairs],
                final_estimator=meta,
                cv=3,
                passthrough=False,
                n_jobs=1,
            )

            t0 = time.perf_counter()
            stacker.fit(Xtr_p, ytr)
            fit_t = time.perf_counter() - t0

            y_pred = stacker.predict(Xval_p)
            if is_clf:
                try:
                    proba = stacker.predict_proba(Xval_p)
                    if proba.shape[1] == 2:
                        auc = roc_auc_score(yval, proba[:, 1])
                    else:
                        auc = roc_auc_score(yval, proba,
                                            multi_class="ovr", average="macro")
                except Exception:
                    auc = float("nan")
                fold_metrics = {
                    "accuracy": accuracy_score(yval, y_pred),
                    "f1_macro": f1_score(yval, y_pred, average="macro",
                                         zero_division=0),
                    "roc_auc": auc,
                    "fit_time": fit_t,
                }
            else:
                fold_metrics = {
                    "r2": r2_score(yval, y_pred),
                    "mae": mean_absolute_error(yval, y_pred),
                    "rmse": float(np.sqrt(mean_squared_error(yval, y_pred))),
                    "fit_time": fit_t,
                }
        except Exception as exc:
            # Best-effort like the voting ensemble: drop this fold, but keep
            # the error so a total failure can report its cause.
            last_error = exc
            continue
        fold_results.append(fold_metrics)

    if not fold_results:
        raise ValueError("All folds failed for stacking ensemble.") from last_error

    df = pd.DataFrame(fold_results)
    return {"mean": df.mean().to_dict(), "std": df.std().to_dict(),
            "folds": df.to_dict("records")}