# src/infer.py from __future__ import annotations import numpy as np import pandas as pd def _ensure_cols(df: pd.DataFrame, cols: list[str]) -> pd.DataFrame: """Add any missing columns in one concat (avoids fragmentation).""" missing = [c for c in cols if c not in df.columns] if not missing: return df add = pd.DataFrame(np.nan, index=df.index, columns=missing) out = pd.concat([df, add], axis=1) return out.copy() # defragment def _base_model_preds(stacking_model, X: np.ndarray) -> np.ndarray: """ Extract base estimator predictions from sklearn StackingRegressor (robustly). Handles both: - estimators_ : list of fitted estimators (most common) - estimators : list of (name, estimator) pairs (pre-fit) """ preds = [] # Prefer fitted estimators_ ests = getattr(stacking_model, "estimators_", None) if ests is None: # fallback to pre-fit definition (name, estimator) ests = [e for e in getattr(stacking_model, "estimators", [])] for item in ests or []: # item may be an estimator OR (name, estimator) est = item[1] if isinstance(item, (tuple, list)) and len(item) >= 2 else item if est is None or est == "drop": continue try: p = est.predict(X) preds.append(np.asarray(p).reshape(-1)) except Exception: pass if not preds: return np.zeros((X.shape[0], 1)) return np.column_stack(preds) # (n_samples, n_base) def _safe_minmax_norm(x: np.ndarray) -> np.ndarray: """Min-max normalize, but if constant vector -> zeros.""" x = np.asarray(x, dtype=float) xmin = np.nanmin(x) xmax = np.nanmax(x) if not np.isfinite(xmin) or not np.isfinite(xmax) or (xmax - xmin) < 1e-12: return np.zeros_like(x, dtype=float) return (x - xmin) / (xmax - xmin) def _norm_with_training_scale(x: np.ndarray, scale: float | None) -> np.ndarray: """ Normalize with a training-derived scale (e.g., p95). If scale missing/invalid -> fallback to minmax (but stable for 1 row). """ x = np.asarray(x, dtype=float) if scale is not None and np.isfinite(scale) and scale > 1e-12: return np.clip(x / scale, 0.0, 1.0) return _safe_minmax_norm(x) def predict_with_confidence(bundle: dict, new_df: pd.DataFrame, interval: str = "q90") -> pd.DataFrame: """ Returns a DF with prediction, interval, confidence_score, confidence_label, flags. interval: 'q90' or 'q95' """ model = bundle["model"] imputer = bundle["imputer"] impute_cols = bundle["impute_cols"] feature_cols = bundle["feature_cols"] calib = bundle.get("calibration", {}) # contains q90/q95, etc. missing_rate = bundle.get("missing_rate", None) # Replace sentinel missing df = new_df.replace(-1, np.nan) df = _ensure_cols(df, impute_cols) # Impute imputed = pd.DataFrame( imputer.transform(df[impute_cols]), columns=impute_cols, index=df.index, ) X = imputed[feature_cols].values pred = model.predict(X) # Conformal interval half-width q = float(calib.get(interval, np.nan)) lower = pred - q if np.isfinite(q) else np.full_like(pred, np.nan) upper = pred + q if np.isfinite(q) else np.full_like(pred, np.nan) # ---- Confidence components ---- # 1) Disagreement across base estimators base_preds = _base_model_preds(model, X) disagreement = np.std(base_preds, axis=1) if base_preds.shape[1] > 1 else np.zeros(len(pred)) # If you saved training disagreement stats, use them for stable scaling # e.g. bundle["disagreement_stats"] = {"p95": 0.123} dis_stats = bundle.get("disagreement_stats", {}) dis_p95 = dis_stats.get("p95", None) d_norm = _norm_with_training_scale(disagreement, dis_p95) # 2) Missing input fraction missing_input_frac = df[impute_cols].isna().mean(axis=1).values m_norm = np.clip(missing_input_frac, 0.0, 1.0) # 3) Interval width penalty (bigger q => lower confidence) # If you saved training q stats, use them too (recommended) # e.g. bundle["interval_stats"] = {"q90_p95": 12.3, "q95_p95": 15.8} int_stats = bundle.get("interval_stats", {}) q_scale = int_stats.get(f"{interval}_p95", None) if np.isfinite(q): q_norm = _norm_with_training_scale(np.full(len(pred), q, dtype=float), q_scale) else: q_norm = np.zeros(len(pred), dtype=float) # Weighted score (tune weights as you like) # - missingness is usually most important # - then interval width # - then model disagreement score = 1.0 - (0.45 * m_norm + 0.35 * q_norm + 0.20 * d_norm) # Apply training missingness penalty (your existing logic) sparse_flag = ( missing_rate is not None and np.isfinite(missing_rate) and missing_rate >= 0.90 ) if sparse_flag: score = np.minimum(score, 0.35) score = np.clip(score, 0.0, 1.0) # Labels label = np.where(score >= 0.75, "High", np.where(score >= 0.50, "Medium", "Low")) if sparse_flag: label[:] = "Low" out = pd.DataFrame( { "prediction": pred, "lower": lower, "upper": upper, "confidence_score": score, "confidence_label": label, }, index=df.index, ) out["note"] = "⚠️ Target had ~90%+ missing in training; treat as noisy." if sparse_flag else "" return out