# ModelMatrix / matrix / ensemble.py
# (provenance: commit c4ff02d, "Fix deployment entry point and merge requirements")
"""
ensemble.py β€” Ensemble builder for the SAP RPT-1 Benchmarking Web App.
Given individual CV results, this module:
1. Selects the top-N performing models
2. Runs a Soft Voting ensemble (works with ALL model types)
3. Runs a Stacking ensemble (sklearn-native models only)
4. Returns CV results in the same schema as individual models
"""
import os, time, warnings
import numpy as np
import pandas as pd
from sklearn.model_selection import StratifiedKFold, KFold
from sklearn.metrics import (accuracy_score, f1_score, roc_auc_score,
r2_score, mean_absolute_error, mean_squared_error)
from sklearn.linear_model import LogisticRegression, Ridge
warnings.filterwarnings("ignore")
N_FOLDS = int(os.getenv("N_FOLDS", "5"))
RAND = int(os.getenv("RANDOM_STATE", "42"))
# Sklearn-native builders safe to use in StackingClassifier/Regressor
SKLEARN_SAFE = {"XGBoost", "LightGBM", "CatBoost"}
# ── Model selection ────────────────────────────────────────────────────────────
def select_top_models(results: dict, builders: dict, task: str, n: int = 3):
"""
Return top-N (name, builder) pairs by primary metric, skipping errored models.
Only includes models that have >0.5 ROC-AUC or >0.0 RΒ².
"""
primary = "roc_auc" if task == "classification" else "r2"
threshold = 0.50 if task == "classification" else 0.0
ranked = []
for name in builders:
if name not in results or "error" in results[name]:
continue
score = results[name]["mean"].get(primary, 0) or 0
if score >= threshold:
ranked.append((name, score))
ranked.sort(key=lambda x: x[1], reverse=True)
top = ranked[:n]
return [(name, builders[name]) for name, _ in top]
# ── Voting ensemble (manual soft voting) ──────────────────────────────────────
def run_voting_ensemble(top_pairs: list, X: pd.DataFrame, y: pd.Series,
task: str, prep_fn) -> dict:
"""
Manual soft-voting ensemble. Works with ANY model (sklearn or custom).
Each fold trains all top models and averages probabilities / predictions.
"""
if len(top_pairs) < 2:
raise ValueError("Need at least 2 models to form an ensemble.")
if task == "classification":
splits = list(StratifiedKFold(N_FOLDS, shuffle=True, random_state=RAND).split(X, y))
else:
splits = list(KFold(N_FOLDS, shuffle=True, random_state=RAND).split(X))
n_classes = int(y.nunique()) if task == "classification" else None
fold_results = []
for tr_idx, val_idx in splits:
Xtr, Xval = X.iloc[tr_idx], X.iloc[val_idx]
ytr, yval = y.iloc[tr_idx], y.iloc[val_idx]
Xtr_p, encoders = prep_fn(Xtr)
Xval_p, _ = prep_fn(Xval, encoders=encoders)
t0 = time.perf_counter()
if task == "classification":
n_cls = n_classes or int(np.unique(ytr).size)
all_probas = []
for _, builder in top_pairs:
try:
model = builder(task)
model.fit(Xtr_p, ytr)
try:
proba = model.predict_proba(Xval_p)
# Normalise rows
row_sum = proba.sum(axis=1, keepdims=True) + 1e-9
all_probas.append(proba / row_sum)
except Exception:
# Fallback: one-hot from predict
pred = model.predict(Xval_p).astype(int)
oh = np.zeros((len(pred), n_cls))
for i, p in enumerate(pred):
if 0 <= p < n_cls:
oh[i, p] = 1.0
all_probas.append(oh)
except Exception:
continue # skip failing models within the fold
fit_t = time.perf_counter() - t0
if not all_probas:
continue
avg_proba = np.mean(all_probas, axis=0)
y_pred = np.argmax(avg_proba, axis=1)
acc = accuracy_score(yval, y_pred)
f1 = f1_score(yval, y_pred, average="macro", zero_division=0)
try:
auc = (roc_auc_score(yval, avg_proba[:, 1])
if avg_proba.shape[1] == 2
else roc_auc_score(yval, avg_proba,
multi_class="ovr", average="macro"))
except Exception:
auc = float("nan")
fold_results.append({"accuracy": acc, "f1_macro": f1,
"roc_auc": auc, "fit_time": fit_t})
else: # regression
all_preds = []
for _, builder in top_pairs:
try:
model = builder(task)
model.fit(Xtr_p, ytr)
all_preds.append(model.predict(Xval_p))
except Exception:
continue
fit_t = time.perf_counter() - t0
if not all_preds:
continue
avg_pred = np.mean(all_preds, axis=0)
fold_results.append({
"r2": r2_score(yval, avg_pred),
"mae": mean_absolute_error(yval, avg_pred),
"rmse": float(np.sqrt(mean_squared_error(yval, avg_pred))),
"fit_time": fit_t,
})
if not fold_results:
raise ValueError("All folds failed for voting ensemble.")
df = pd.DataFrame(fold_results)
return {"mean": df.mean().to_dict(), "std": df.std().to_dict(),
"folds": df.to_dict("records")}
# ── Stacking ensemble (sklearn-safe models only) ───────────────────────────────
def run_stacking_ensemble(sklearn_pairs: list, X: pd.DataFrame, y: pd.Series,
                          task: str, prep_fn) -> dict:
    """
    Cross-validated stacking ensemble built on sklearn's
    StackingClassifier / StackingRegressor.

    Only sklearn-native base learners (XGBoost, LightGBM, CatBoost) should
    be passed in. The meta-learner is LogisticRegression for classification
    and Ridge for regression. For every outer fold, encoders are fitted on
    the training split only and a fresh stacker is trained (running its own
    internal 3-fold CV to produce meta-features).

    Returns a dict with "mean", "std" and per-fold "folds" metrics — the
    same schema as the individual-model CV results.

    Raises ValueError when fewer than 2 base models are given or no fold
    produced metrics.
    """
    from sklearn.ensemble import StackingClassifier, StackingRegressor
    if len(sklearn_pairs) < 2:
        raise ValueError("Need at least 2 sklearn-compatible models for stacking.")
    is_clf = task == "classification"
    if is_clf:
        splits = list(StratifiedKFold(N_FOLDS, shuffle=True, random_state=RAND).split(X, y))
        meta = LogisticRegression(max_iter=1000, random_state=RAND, C=1.0)
    else:
        splits = list(KFold(N_FOLDS, shuffle=True, random_state=RAND).split(X))
        meta = Ridge(random_state=RAND)
    stacker_cls = StackingClassifier if is_clf else StackingRegressor
    fold_results = []
    for tr_idx, val_idx in splits:
        ytr, yval = y.iloc[tr_idx], y.iloc[val_idx]
        # Encoders fitted on the training split only, then reused on val.
        Xtr_p, encoders = prep_fn(X.iloc[tr_idx])
        Xval_p, _ = prep_fn(X.iloc[val_idx], encoders=encoders)
        # Fresh base estimators per fold; sklearn clones them (and the
        # shared meta-learner) internally during fit.
        base = [(name, make(task)) for name, make in sklearn_pairs]
        stacker = stacker_cls(
            estimators=base,
            final_estimator=meta,
            cv=3,
            passthrough=False,
            n_jobs=1,
        )
        start = time.perf_counter()
        stacker.fit(Xtr_p, ytr)
        elapsed = time.perf_counter() - start
        y_pred = stacker.predict(Xval_p)
        if is_clf:
            try:
                proba = stacker.predict_proba(Xval_p)
                if proba.shape[1] == 2:
                    auc = roc_auc_score(yval, proba[:, 1])
                else:
                    auc = roc_auc_score(yval, proba,
                                        multi_class="ovr", average="macro")
            except Exception:
                auc = float("nan")
            fold_results.append({
                "accuracy": accuracy_score(yval, y_pred),
                "f1_macro": f1_score(yval, y_pred, average="macro",
                                     zero_division=0),
                "roc_auc": auc,
                "fit_time": elapsed,
            })
        else:
            fold_results.append({
                "r2": r2_score(yval, y_pred),
                "mae": mean_absolute_error(yval, y_pred),
                "rmse": float(np.sqrt(mean_squared_error(yval, y_pred))),
                "fit_time": elapsed,
            })
    if not fold_results:
        raise ValueError("All folds failed for stacking ensemble.")
    metrics = pd.DataFrame(fold_results)
    return {"mean": metrics.mean().to_dict(), "std": metrics.std().to_dict(),
            "folds": metrics.to_dict("records")}