Spaces:
Running
Running
| """Ensemble training and inference utilities. | |
| This module exposes a production-safe weighted soft-voting ensemble with | |
| optional XGBoost and LightGBM backends and calibrated probabilities. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import logging | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Tuple | |
| import joblib | |
| import numpy as np | |
| from sklearn.calibration import CalibratedClassifierCV | |
| from sklearn.ensemble import RandomForestClassifier, VotingClassifier | |
| from sklearn.linear_model import LogisticRegression | |
| from sklearn.metrics import accuracy_score, f1_score, recall_score | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.neural_network import MLPClassifier | |
| # Optional imports | |
| try: | |
| from xgboost import XGBClassifier | |
| except Exception: # pragma: no cover | |
| XGBClassifier = None # type: ignore | |
| try: | |
| from lightgbm import LGBMClassifier | |
| except Exception: # pragma: no cover | |
| LGBMClassifier = None # type: ignore | |
| logger = logging.getLogger("medcare_ddi.ensemble") | |
def _normalize_weights(weights: np.ndarray) -> np.ndarray:
    """Rescale raw blend weights into a float64 vector that sums to 1.

    Args:
        weights: Array-like of raw weights. Entries below ``1e-8`` (zeros and
            negatives included) are raised to ``1e-8`` before rescaling, so
            every entry keeps a strictly positive share.

    Returns:
        A float64 array with the same shape as ``weights`` whose entries sum
        to 1.

    Raises:
        ValueError: If ``weights`` is empty or contains NaN/inf.
    """
    values = np.asarray(weights, dtype=np.float64)
    if values.size == 0:
        raise ValueError('weights cannot be empty')
    # A single NaN/inf would turn every blended probability into NaN, and
    # argmax over NaNs silently reports class 0 -- fail loudly instead.
    if not np.all(np.isfinite(values)):
        raise ValueError('weights must be finite')
    clipped = np.clip(values, 1e-8, None)
    return clipped / clipped.sum()
def _blend_probabilities(prob_list: List[np.ndarray], weights: np.ndarray) -> np.ndarray:
    """Return the weighted average of several probability arrays.

    Args:
        prob_list: One probability array per model; all arrays must be
            broadcast-compatible with the first one.
        weights: One raw weight per entry of ``prob_list``. They are
            normalised with ``_normalize_weights`` before use.

    Returns:
        A float64 array shaped like ``prob_list[0]``.

    Raises:
        ValueError: If ``prob_list`` is empty or the number of weights does
            not match the number of probability arrays.
    """
    if not prob_list:
        raise ValueError('prob_list cannot be empty')
    w = _normalize_weights(weights)
    # Without this check, surplus weights are normalised but never applied
    # (the blend no longer sums to 1) and missing weights surface as a bare
    # IndexError.
    if w.ndim != 1 or w.shape[0] != len(prob_list):
        raise ValueError(
            f'expected {len(prob_list)} blend weights, got weights of shape {w.shape}'
        )
    out = np.zeros_like(prob_list[0], dtype=np.float64)
    for weight, probs in zip(w, prob_list):
        out += weight * probs
    return out
def _score_healthcare(y_true: np.ndarray, probs: np.ndarray, severe_index: int) -> float:
    """Score class probabilities as severe-class recall plus half the macro-F1.

    Args:
        y_true: Ground-truth labels.
        probs: Per-sample class probabilities; the predicted label is the
            argmax along axis 1.
        severe_index: Label whose recall forms the dominant term of the score.

    Returns:
        ``severe_recall + 0.5 * macro_f1`` as a Python float.
    """
    predicted = np.argmax(probs, axis=1)
    recall_on_severe = recall_score(
        y_true,
        predicted,
        labels=[severe_index],
        average='macro',
        zero_division=0,
    )
    f1_macro = f1_score(y_true, predicted, average='macro', zero_division=0)
    return float(recall_on_severe + 0.5 * f1_macro)
def _optimize_blend_weights(
    y_val: np.ndarray,
    prob_list: List[np.ndarray],
    severe_index: int,
    random_state: int,
    n_trials: int = 500,
) -> np.ndarray:
    """Random-search blend weights that maximise ``_score_healthcare``.

    The search starts from uniform weights and then draws ``n_trials``
    candidates from a flat Dirichlet distribution, keeping a candidate only
    when it strictly improves the score.

    Args:
        y_val: Ground-truth labels for the rows behind ``prob_list``.
        prob_list: One probability array per model.
        severe_index: Label forwarded to ``_score_healthcare``.
        random_state: Seed for the NumPy generator, so the search is
            reproducible.
        n_trials: Number of random candidates to evaluate (default 500, the
            previously hard-coded value).

    Returns:
        A float64 weight vector with one entry per model.

    Raises:
        ValueError: If ``prob_list`` is empty.
    """
    n_models = len(prob_list)
    if n_models == 0:
        raise ValueError('prob_list cannot be empty')
    if n_models == 1:
        # Nothing to search over: the only model takes the full weight.
        return np.array([1.0], dtype=np.float64)

    rng = np.random.default_rng(random_state)
    alpha = np.ones(n_models, dtype=np.float64)  # loop-invariant, built once
    best_w = alpha / float(n_models)
    best_score = _score_healthcare(y_val, _blend_probabilities(prob_list, best_w), severe_index)
    for _ in range(n_trials):
        candidate = rng.dirichlet(alpha)
        score = _score_healthcare(y_val, _blend_probabilities(prob_list, candidate), severe_index)
        if score > best_score:
            best_score = score
            best_w = candidate
    return best_w
def _make_mlp(hidden_dim: int = 256) -> MLPClassifier:
    """Build the unfitted two-hidden-layer MLP used as a base model.

    Args:
        hidden_dim: Width of the first hidden layer; the second layer is half
            as wide (integer division).

    Returns:
        An ``MLPClassifier`` configured with early stopping and a fixed seed.
    """
    layer_widths = (hidden_dim, hidden_dim // 2)
    settings = {
        'hidden_layer_sizes': layer_widths,
        'activation': 'relu',
        'alpha': 1e-4,
        'batch_size': 128,
        'learning_rate_init': 1e-3,
        'max_iter': 300,
        'early_stopping': True,
        'n_iter_no_change': 15,
        'random_state': 42,
    }
    return MLPClassifier(**settings)
def _make_estimators(num_classes: int) -> List[Tuple[str, Any]]:
    """Assemble the unfitted base models as ``(name, estimator)`` pairs.

    XGBoost and LightGBM are included only when their optional imports
    succeeded; the MLP and the random forest are always present. The order is
    ``xgb``, ``lgbm``, ``mlp``, ``rf``.

    Args:
        num_classes: Number of target classes, passed to XGBoost as
            ``num_class``.

    Returns:
        The list of ``(name, estimator)`` pairs.
    """
    roster: List[Tuple[str, Any]] = []

    if XGBClassifier is not None:
        booster = XGBClassifier(
            n_estimators=220,
            max_depth=6,
            learning_rate=0.05,
            subsample=0.9,
            colsample_bytree=0.9,
            objective='multi:softprob',
            num_class=num_classes,
            reg_lambda=1.0,
            random_state=42,
            n_jobs=4,
        )
        roster.append(('xgb', booster))

    if LGBMClassifier is not None:
        light_gbm = LGBMClassifier(
            n_estimators=320,
            learning_rate=0.04,
            num_leaves=63,
            max_depth=-1,
            subsample=0.9,
            colsample_bytree=0.9,
            objective='multiclass',
            class_weight='balanced',
            random_state=42,
            n_jobs=4,
        )
        roster.append(('lgbm', light_gbm))

    roster.append(('mlp', _make_mlp()))

    forest = RandomForestClassifier(
        n_estimators=400,
        max_depth=None,
        min_samples_leaf=2,
        class_weight='balanced_subsample',
        n_jobs=4,
        random_state=42,
    )
    roster.append(('rf', forest))
    return roster
def _prefixed_metrics(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    severe_index: int,
    prefix: str = '',
) -> Dict[str, float]:
    """Return accuracy, macro-F1 and severe-class recall under ``prefix``-ed keys."""
    return {
        f'{prefix}accuracy': float(accuracy_score(y_true, y_pred)),
        f'{prefix}macro_f1': float(f1_score(y_true, y_pred, average='macro', zero_division=0)),
        f'{prefix}severe_recall': float(
            recall_score(y_true, y_pred, labels=[severe_index], average='macro', zero_division=0)
        ),
    }


def train_base_models(X: np.ndarray, y: np.ndarray, output_dir: Path, random_state: int = 42) -> Dict[str, Any]:
    """Train every base model plus the voting, calibrated, blend and stacked ensembles.

    The data is split 80/20 (stratified on ``y``). Base models and the
    soft-voting ensemble are fit on the training part; the sigmoid calibrator,
    the blend weights and the logistic-regression stacker are fit on the
    validation part. Each fitted model is written to ``output_dir`` as
    ``<name>.joblib``, together with ``ensemble_summary.json`` and
    ``blend_weights.json``.

    NOTE: the summary metrics are computed on the same validation split that
    the calibrator, blend weights and stacker were fit on, so they are
    optimistic (in-sample) estimates for those three.

    NOTE(review): the "severe" class is taken to be label ``num_classes - 1``
    and blend predictions are argmax column indices, which presumes labels
    are encoded as ``0..K-1`` -- confirm against the caller.

    Args:
        X: Feature matrix.
        y: Target labels.
        output_dir: Directory for the artifacts; created if missing. Anything
            accepted by ``Path()`` works.
        random_state: Seed for the split, the blend-weight search and the
            stacker.

    Returns:
        Mapping from model name to fitted estimator: the base models plus
        ``voting``, ``calibrated_voting`` and ``stacker``.
    """
    # Coerce like EnsemblePredictor does, so a plain string path also works.
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=0.2, random_state=random_state, stratify=y
    )
    num_classes = int(len(np.unique(y)))
    severe_index = int(num_classes - 1)

    models: Dict[str, Any] = {}
    for name, estimator in _make_estimators(num_classes):
        logger.info('Training %s...', name)
        estimator.fit(X_train, y_train)
        models[name] = estimator
        joblib.dump(estimator, output_dir / f'{name}.joblib')

    # Weighted soft-voting ensemble; the tree boosters get double weight.
    vote_estimators = [(n, models[n]) for n in models if n in {'xgb', 'lgbm', 'mlp', 'rf'}]
    vote_weights = [2.0 if n in {'xgb', 'lgbm'} else 1.0 for n, _ in vote_estimators]
    voting = VotingClassifier(estimators=vote_estimators, voting='soft', weights=vote_weights)
    voting.fit(X_train, y_train)
    models['voting'] = voting
    joblib.dump(voting, output_dir / 'voting.joblib')

    # Calibrate the already-fitted voting model on the held-out split.
    # NOTE(review): cv='prefit' is deprecated in recent scikit-learn releases
    # -- confirm against the pinned version before upgrading.
    calib = CalibratedClassifierCV(voting, cv='prefit', method='sigmoid')
    calib.fit(X_val, y_val)
    models['calibrated_voting'] = calib
    joblib.dump(calib, output_dir / 'calibrated_voting.joblib')

    # Validation probabilities of the base models only (ensembles excluded).
    blend_names: List[str] = []
    blend_probs: List[np.ndarray] = []
    for name, estimator in models.items():
        if name in {'voting', 'calibrated_voting'}:
            continue
        if hasattr(estimator, 'predict_proba'):
            blend_names.append(name)
            blend_probs.append(estimator.predict_proba(X_val))

    blend_weights = _optimize_blend_weights(
        y_val, blend_probs, severe_index=severe_index, random_state=random_state
    )
    blend_val = _blend_probabilities(blend_probs, blend_weights)

    # Stacking meta-learner on the concatenated base probabilities; the column
    # order follows blend_names.
    # NOTE(review): multi_class is deprecated in recent scikit-learn releases
    # -- confirm against the pinned version before upgrading.
    stacked_features = np.hstack(blend_probs)
    stacker = LogisticRegression(
        max_iter=2000,
        class_weight='balanced',
        multi_class='multinomial',
        n_jobs=1,
        random_state=random_state,
    )
    stacker.fit(stacked_features, y_val)
    models['stacker'] = stacker
    joblib.dump(stacker, output_dir / 'stacker.joblib')

    # Lightweight metrics summary: calibrated voting (unprefixed keys), then
    # the weighted blend, then the stacker.
    weight_values = [float(w) for w in blend_weights.tolist()]
    summary: Dict[str, Any] = {}
    summary.update(_prefixed_metrics(y_val, calib.predict(X_val), severe_index))
    summary.update(_prefixed_metrics(y_val, np.argmax(blend_val, axis=1), severe_index, prefix='blend_'))
    summary.update(_prefixed_metrics(y_val, stacker.predict(stacked_features), severe_index, prefix='stack_'))
    summary.update(
        {
            'blend_model_names': blend_names,
            'blend_weights': weight_values,
            'models': list(models.keys()),
            'num_classes': num_classes,
        }
    )
    (output_dir / 'ensemble_summary.json').write_text(json.dumps(summary, indent=2), encoding='utf-8')

    bundle = {
        'model_names': blend_names,
        'weights': weight_values,
        'num_classes': num_classes,
    }
    (output_dir / 'blend_weights.json').write_text(json.dumps(bundle, indent=2), encoding='utf-8')
    return models
class EnsemblePredictor:
    """Load persisted ensemble artifacts and serve predictions from them.

    ``predict_proba`` tries, in order: the stacker, the weighted blend, the
    calibrated voting model, the plain voting model and finally the MLP.
    A failing stacker or blend is logged and the next strategy is used.
    """

    # Artifact stems looked up as ``<stem>.joblib`` inside ``model_dir``.
    _ARTIFACT_NAMES = ('calibrated_voting', 'voting', 'mlp', 'xgb', 'lgbm', 'rf', 'stacker')
    # Base models in the column order used to build stacker features.
    _BASE_MODEL_NAMES = ('xgb', 'lgbm', 'mlp', 'rf')

    def __init__(self, model_dir: Path):
        """Remember ``model_dir`` and load every artifact found in it."""
        self.model_dir = Path(model_dir)
        self.models: Dict[str, Any] = {}
        self.load_models()

    def load_models(self) -> None:
        """Load available ``.joblib`` models and the optional blend-weight file.

        Missing artifacts are skipped. ``self.blend_weights`` is the parsed
        ``blend_weights.json`` content, or ``None`` when the file is absent.
        """
        # SECURITY: joblib artifacts are pickle-based and can execute code on
        # load; model_dir must only ever point at trusted artifacts.
        for stem in self._ARTIFACT_NAMES:
            artifact_path = self.model_dir / f'{stem}.joblib'
            if artifact_path.exists():
                self.models[stem] = joblib.load(artifact_path)

        blend_weights_path = self.model_dir / 'blend_weights.json'
        self.blend_weights: Dict[str, Any] | None = None
        if blend_weights_path.exists():
            self.blend_weights = json.loads(blend_weights_path.read_text(encoding='utf-8'))

    def _base_probabilities(self, X: np.ndarray) -> Tuple[List[str], List[np.ndarray]]:
        """Return names and ``predict_proba`` outputs of the loaded base models."""
        names: List[str] = []
        probs: List[np.ndarray] = []
        for key in self._BASE_MODEL_NAMES:
            model = self.models.get(key)
            if model is None or not hasattr(model, 'predict_proba'):
                continue
            names.append(key)
            probs.append(model.predict_proba(X))
        return names, probs

    def _predict_proba_blend(self, X: np.ndarray) -> np.ndarray:
        """Blend base-model probabilities with the persisted weights.

        Each weight is paired with its model name before filtering, so a base
        model that is missing at inference time drops its own weight instead
        of shifting the remaining weights onto the wrong models.

        Raises:
            RuntimeError: If no blend weights are loaded, the weight list does
                not match the model-name list, or none of the blended models
                is available.
        """
        if not self.blend_weights:
            raise RuntimeError('Blend weights are unavailable')
        names, probs = self._base_probabilities(X)
        name_to_probs = dict(zip(names, probs))

        ordered_names = [str(n) for n in self.blend_weights.get('model_names', [])]
        raw_weights = self.blend_weights.get('weights')
        if raw_weights is None:
            raw_weights = [1.0] * len(ordered_names)
        if len(raw_weights) != len(ordered_names):
            raise RuntimeError('Blend weights do not match blend model names')

        paired = [
            (name_to_probs[name], float(weight))
            for name, weight in zip(ordered_names, raw_weights)
            if name in name_to_probs
        ]
        if not paired:
            raise RuntimeError('No base probabilities available for blend inference')

        selected_probs = [p for p, _ in paired]
        weights = np.array([w for _, w in paired], dtype=np.float64)
        return _blend_probabilities(selected_probs, weights)

    def _predict_proba_stacker(self, X: np.ndarray) -> np.ndarray:
        """Run the stacker on the horizontally stacked base probabilities.

        Raises:
            RuntimeError: If the stacker or every base model is missing, or
                the feature width differs from what the stacker reports via
                ``n_features_in_`` (e.g. a base model present at training
                time was not loaded).
        """
        stacker = self.models.get('stacker')
        if stacker is None:
            raise RuntimeError('Stacker model unavailable')
        _, probs = self._base_probabilities(X)
        if not probs:
            raise RuntimeError('No base probabilities for stacker features')
        stacked = np.hstack(probs)
        expected_width = getattr(stacker, 'n_features_in_', None)
        if expected_width is not None and stacked.shape[1] != expected_width:
            raise RuntimeError(
                f'Stacker expects {expected_width} features but base models produced {stacked.shape[1]}'
            )
        return stacker.predict_proba(stacked)

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return class probabilities from the best available strategy.

        Raises:
            RuntimeError: If no usable model is loaded.
        """
        if 'stacker' in self.models:
            try:
                return self._predict_proba_stacker(X)
            except Exception:
                # Best-effort fallback is intentional, but never silent.
                logger.warning('Stacker inference failed; falling back', exc_info=True)
        if self.blend_weights is not None:
            try:
                return self._predict_proba_blend(X)
            except Exception:
                logger.warning('Blend inference failed; falling back', exc_info=True)
        for key in ('calibrated_voting', 'voting', 'mlp'):
            if key in self.models:
                return self.models[key].predict_proba(X)
        raise RuntimeError('No ensemble models available')

    def predict(self, X: np.ndarray) -> Dict[str, Any]:
        """Return ``{'preds': argmax labels, 'probs': class probabilities}``."""
        probs = self.predict_proba(X)
        preds = np.argmax(probs, axis=1)
        return {'preds': preds, 'probs': probs}
# When the file is executed directly (not imported), print a one-line
# confirmation; no training or inference is started from here.
if __name__ == '__main__':
    print('Ensemble module loaded')