"""Ensemble training and inference utilities. This module exposes a production-safe weighted soft-voting ensemble with optional XGBoost and LightGBM backends and calibrated probabilities. """ from __future__ import annotations import json import logging from pathlib import Path from typing import Any, Dict, List, Tuple import joblib import numpy as np from sklearn.calibration import CalibratedClassifierCV from sklearn.ensemble import RandomForestClassifier, VotingClassifier from sklearn.linear_model import LogisticRegression from sklearn.metrics import accuracy_score, f1_score, recall_score from sklearn.model_selection import train_test_split from sklearn.neural_network import MLPClassifier # Optional imports try: from xgboost import XGBClassifier except Exception: # pragma: no cover XGBClassifier = None # type: ignore try: from lightgbm import LGBMClassifier except Exception: # pragma: no cover LGBMClassifier = None # type: ignore logger = logging.getLogger("medcare_ddi.ensemble") def _normalize_weights(weights: np.ndarray) -> np.ndarray: clipped = np.clip(weights.astype(np.float64), 1e-8, None) return (clipped / clipped.sum()).astype(np.float64) def _blend_probabilities(prob_list: List[np.ndarray], weights: np.ndarray) -> np.ndarray: if not prob_list: raise ValueError('prob_list cannot be empty') w = _normalize_weights(weights) out = np.zeros_like(prob_list[0], dtype=np.float64) for idx, probs in enumerate(prob_list): out += w[idx] * probs return out def _score_healthcare(y_true: np.ndarray, probs: np.ndarray, severe_index: int) -> float: preds = np.argmax(probs, axis=1) macro_f1 = f1_score(y_true, preds, average='macro', zero_division=0) severe_recall = recall_score(y_true, preds, labels=[severe_index], average='macro', zero_division=0) return float(severe_recall + 0.5 * macro_f1) def _optimize_blend_weights( y_val: np.ndarray, prob_list: List[np.ndarray], severe_index: int, random_state: int, ) -> np.ndarray: if len(prob_list) == 1: return np.array([1.0], dtype=np.float64) rng = np.random.default_rng(random_state) best_w = np.ones((len(prob_list),), dtype=np.float64) / float(len(prob_list)) best_score = _score_healthcare(y_val, _blend_probabilities(prob_list, best_w), severe_index) for _ in range(500): candidate = rng.dirichlet(np.ones(len(prob_list), dtype=np.float64)) score = _score_healthcare(y_val, _blend_probabilities(prob_list, candidate), severe_index) if score > best_score: best_score = score best_w = candidate return best_w def _make_mlp(hidden_dim: int = 256) -> MLPClassifier: return MLPClassifier( hidden_layer_sizes=(hidden_dim, hidden_dim // 2), activation='relu', alpha=1e-4, batch_size=128, learning_rate_init=1e-3, max_iter=300, early_stopping=True, n_iter_no_change=15, random_state=42, ) def _make_estimators(num_classes: int) -> List[Tuple[str, Any]]: estimators: List[Tuple[str, Any]] = [] if XGBClassifier is not None: estimators.append( ( 'xgb', XGBClassifier( n_estimators=220, max_depth=6, learning_rate=0.05, subsample=0.9, colsample_bytree=0.9, objective='multi:softprob', num_class=num_classes, reg_lambda=1.0, random_state=42, n_jobs=4, ), ) ) if LGBMClassifier is not None: estimators.append( ( 'lgbm', LGBMClassifier( n_estimators=320, learning_rate=0.04, num_leaves=63, max_depth=-1, subsample=0.9, colsample_bytree=0.9, objective='multiclass', class_weight='balanced', random_state=42, n_jobs=4, ), ) ) estimators.append(('mlp', _make_mlp())) estimators.append( ( 'rf', RandomForestClassifier( n_estimators=400, max_depth=None, min_samples_leaf=2, class_weight='balanced_subsample', n_jobs=4, random_state=42, ), ) ) return estimators def train_base_models(X: np.ndarray, y: np.ndarray, output_dir: Path, random_state: int = 42) -> Dict[str, Any]: output_dir.mkdir(parents=True, exist_ok=True) models: Dict[str, Any] = {} X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=random_state, stratify=y) num_classes = int(len(np.unique(y))) estimators = _make_estimators(num_classes) for name, estimator in estimators: logger.info('Training %s...', name) estimator.fit(X_train, y_train) models[name] = estimator joblib.dump(estimator, output_dir / f'{name}.joblib') # Build weighted soft-voting ensemble favoring severe recall via tree models vote_estimators = [(n, models[n]) for n in models if n in {'xgb', 'lgbm', 'mlp', 'rf'}] vote_weights = [2.0 if n in {'xgb', 'lgbm'} else 1.0 for n, _ in vote_estimators] voting = VotingClassifier(estimators=vote_estimators, voting='soft', weights=vote_weights) voting.fit(X_train, y_train) models['voting'] = voting joblib.dump(voting, output_dir / 'voting.joblib') # Calibrate final voting model probabilities calib = CalibratedClassifierCV(voting, cv='prefit', method='sigmoid') calib.fit(X_val, y_val) models['calibrated_voting'] = calib joblib.dump(calib, output_dir / 'calibrated_voting.joblib') # Learn a blend of base model probabilities to optimize severe-recall-aware objective. blend_names: List[str] = [] blend_probs: List[np.ndarray] = [] for name, estimator in models.items(): if name in {'voting', 'calibrated_voting'}: continue if hasattr(estimator, 'predict_proba'): blend_names.append(name) blend_probs.append(estimator.predict_proba(X_val)) severe_index = int(num_classes - 1) blend_weights = _optimize_blend_weights(y_val, blend_probs, severe_index=severe_index, random_state=random_state) blend_val = _blend_probabilities(blend_probs, blend_weights) # Stacking meta-learner on concatenated probabilities. stacked_features = np.hstack(blend_probs) stacker = LogisticRegression( max_iter=2000, class_weight='balanced', multi_class='multinomial', n_jobs=1, random_state=random_state, ) stacker.fit(stacked_features, y_val) models['stacker'] = stacker joblib.dump(stacker, output_dir / 'stacker.joblib') # Persist a lightweight metrics summary for selection stack_pred = stacker.predict(stacked_features) blend_pred = np.argmax(blend_val, axis=1) val_pred = calib.predict(X_val) summary = { 'accuracy': float(accuracy_score(y_val, val_pred)), 'macro_f1': float(f1_score(y_val, val_pred, average='macro', zero_division=0)), 'severe_recall': float(recall_score(y_val, val_pred, labels=[num_classes - 1], average='macro', zero_division=0)), 'blend_accuracy': float(accuracy_score(y_val, blend_pred)), 'blend_macro_f1': float(f1_score(y_val, blend_pred, average='macro', zero_division=0)), 'blend_severe_recall': float(recall_score(y_val, blend_pred, labels=[num_classes - 1], average='macro', zero_division=0)), 'stack_accuracy': float(accuracy_score(y_val, stack_pred)), 'stack_macro_f1': float(f1_score(y_val, stack_pred, average='macro', zero_division=0)), 'stack_severe_recall': float(recall_score(y_val, stack_pred, labels=[num_classes - 1], average='macro', zero_division=0)), 'blend_model_names': blend_names, 'blend_weights': [float(w) for w in blend_weights.tolist()], 'models': list(models.keys()), 'num_classes': num_classes, } (output_dir / 'ensemble_summary.json').write_text(json.dumps(summary, indent=2), encoding='utf-8') bundle = { 'model_names': blend_names, 'weights': [float(w) for w in blend_weights.tolist()], 'num_classes': num_classes, } (output_dir / 'blend_weights.json').write_text(json.dumps(bundle, indent=2), encoding='utf-8') return models class EnsemblePredictor: def __init__(self, model_dir: Path): self.model_dir = Path(model_dir) self.models: Dict[str, Any] = {} self.load_models() def load_models(self) -> None: for artifact in ['calibrated_voting.joblib', 'voting.joblib', 'mlp.joblib', 'xgb.joblib', 'lgbm.joblib', 'rf.joblib', 'stacker.joblib']: p = self.model_dir / artifact if p.exists(): self.models[artifact.replace('.joblib', '')] = joblib.load(p) blend_weights_path = self.model_dir / 'blend_weights.json' self.blend_weights: Dict[str, Any] | None = None if blend_weights_path.exists(): self.blend_weights = json.loads(blend_weights_path.read_text(encoding='utf-8')) def _base_probabilities(self, X: np.ndarray) -> tuple[List[str], List[np.ndarray]]: names: List[str] = [] probs: List[np.ndarray] = [] for key in ['xgb', 'lgbm', 'mlp', 'rf']: model = self.models.get(key) if model is None: continue if hasattr(model, 'predict_proba'): names.append(key) probs.append(model.predict_proba(X)) return names, probs def _predict_proba_blend(self, X: np.ndarray) -> np.ndarray: if not self.blend_weights: raise RuntimeError('Blend weights are unavailable') names, probs = self._base_probabilities(X) name_to_probs = {n: p for n, p in zip(names, probs)} ordered_names = [str(n) for n in self.blend_weights.get('model_names', [])] selected_probs = [name_to_probs[name] for name in ordered_names if name in name_to_probs] if not selected_probs: raise RuntimeError('No base probabilities available for blend inference') weights = np.array(self.blend_weights.get('weights', [1.0] * len(selected_probs)), dtype=np.float64) return _blend_probabilities(selected_probs, weights) def _predict_proba_stacker(self, X: np.ndarray) -> np.ndarray: stacker = self.models.get('stacker') if stacker is None: raise RuntimeError('Stacker model unavailable') _, probs = self._base_probabilities(X) if not probs: raise RuntimeError('No base probabilities for stacker features') stacked = np.hstack(probs) return stacker.predict_proba(stacked) def predict_proba(self, X: np.ndarray) -> np.ndarray: if 'stacker' in self.models: try: return self._predict_proba_stacker(X) except Exception: pass if self.blend_weights is not None: try: return self._predict_proba_blend(X) except Exception: pass if 'calibrated_voting' in self.models: return self.models['calibrated_voting'].predict_proba(X) if 'voting' in self.models: return self.models['voting'].predict_proba(X) if 'mlp' in self.models: return self.models['mlp'].predict_proba(X) raise RuntimeError('No ensemble models available') def predict(self, X: np.ndarray) -> Dict[str, Any]: probs = self.predict_proba(X) preds = np.argmax(probs, axis=1) return {'preds': preds, 'probs': probs} if __name__ == '__main__': print('Ensemble module loaded')