| |
| from __future__ import annotations |
| import numpy as np |
| import pandas as pd |
|
|
| def _ensure_cols(df: pd.DataFrame, cols: list[str]) -> pd.DataFrame: |
| """Add any missing columns in one concat (avoids fragmentation).""" |
| missing = [c for c in cols if c not in df.columns] |
| if not missing: |
| return df |
|
|
| add = pd.DataFrame(np.nan, index=df.index, columns=missing) |
| out = pd.concat([df, add], axis=1) |
| return out.copy() |
|
|
| def _base_model_preds(stacking_model, X: np.ndarray) -> np.ndarray: |
| """ |
| Extract base estimator predictions from sklearn StackingRegressor (robustly). |
| Handles both: |
| - estimators_ : list of fitted estimators (most common) |
| - estimators : list of (name, estimator) pairs (pre-fit) |
| """ |
| preds = [] |
|
|
| |
| ests = getattr(stacking_model, "estimators_", None) |
| if ests is None: |
| |
| ests = [e for e in getattr(stacking_model, "estimators", [])] |
|
|
| for item in ests or []: |
| |
| est = item[1] if isinstance(item, (tuple, list)) and len(item) >= 2 else item |
|
|
| if est is None or est == "drop": |
| continue |
|
|
| try: |
| p = est.predict(X) |
| preds.append(np.asarray(p).reshape(-1)) |
| except Exception: |
| pass |
|
|
| if not preds: |
| return np.zeros((X.shape[0], 1)) |
|
|
| return np.column_stack(preds) |
|
|
|
|
|
|
|
|
| def _safe_minmax_norm(x: np.ndarray) -> np.ndarray: |
| """Min-max normalize, but if constant vector -> zeros.""" |
| x = np.asarray(x, dtype=float) |
| xmin = np.nanmin(x) |
| xmax = np.nanmax(x) |
| if not np.isfinite(xmin) or not np.isfinite(xmax) or (xmax - xmin) < 1e-12: |
| return np.zeros_like(x, dtype=float) |
| return (x - xmin) / (xmax - xmin) |
|
|
|
|
| def _norm_with_training_scale(x: np.ndarray, scale: float | None) -> np.ndarray: |
| """ |
| Normalize with a training-derived scale (e.g., p95). |
| If scale missing/invalid -> fallback to minmax (but stable for 1 row). |
| """ |
| x = np.asarray(x, dtype=float) |
| if scale is not None and np.isfinite(scale) and scale > 1e-12: |
| return np.clip(x / scale, 0.0, 1.0) |
| return _safe_minmax_norm(x) |
|
|
|
|
def predict_with_confidence(bundle: dict, new_df: pd.DataFrame, interval: str = "q90") -> pd.DataFrame:
    """Predict with the bundled model and attach an interval and a confidence score.

    The confidence score is a heuristic: it starts at 1.0 and is reduced by a
    weighted sum of three penalties, each scaled to [0, 1]:

    * the fraction of ``impute_cols`` missing in the row (weight 0.45),
    * the interval half-width relative to its training scale (weight 0.35),
    * the spread of the base estimators' predictions (weight 0.20).

    Args:
        bundle: Dict with required keys ``"model"``, ``"imputer"``,
            ``"impute_cols"`` and ``"feature_cols"``, and optional keys
            ``"calibration"`` (interval name -> half-width),
            ``"missing_rate"``, ``"disagreement_stats"`` (``"p95"``) and
            ``"interval_stats"`` (``"<interval>_p95"``). Optional entries may
            be absent or None.
        new_df: Rows to score. Values equal to -1 are treated as missing, and
            any ``impute_cols`` not present are added as all-missing.
        interval: Key looked up in the calibration dict, e.g. ``'q90'`` or
            ``'q95'``. An unknown key yields NaN bounds rather than an error.

    Returns:
        DataFrame indexed like ``new_df`` with columns ``prediction``,
        ``lower``, ``upper``, ``confidence_score``, ``confidence_label``
        (``"High"`` >= 0.75, ``"Medium"`` >= 0.50, else ``"Low"``) and
        ``note``.

    Raises:
        KeyError: If a required bundle key is missing.
    """
    # Weights of the three penalties and the label / sparse-target cut-offs.
    w_missing, w_interval, w_disagree = 0.45, 0.35, 0.20
    high_cut, medium_cut = 0.75, 0.50
    sparse_rate_cut, sparse_score_cap = 0.90, 0.35

    model = bundle["model"]
    imputer = bundle["imputer"]
    impute_cols = bundle["impute_cols"]
    feature_cols = bundle["feature_cols"]

    # ``or {}`` also covers bundles that store an explicit None for these keys.
    calib = bundle.get("calibration") or {}
    dis_stats = bundle.get("disagreement_stats") or {}
    int_stats = bundle.get("interval_stats") or {}
    missing_rate = bundle.get("missing_rate", None)

    # -1 is converted to NaN so the imputer sees it as a missing value.
    df = new_df.replace(-1, np.nan)
    df = _ensure_cols(df, impute_cols)

    imputed = pd.DataFrame(
        imputer.transform(df[impute_cols]),
        columns=impute_cols,
        index=df.index,
    )

    X = imputed[feature_cols].values
    pred = model.predict(X)
    n_rows = len(pred)

    # Symmetric interval: prediction +/- the calibrated half-width.
    raw_q = calib.get(interval)
    q = float(raw_q) if raw_q is not None else float("nan")
    if np.isfinite(q):
        lower = pred - q
        upper = pred + q
    else:
        # Explicit float dtype: np.full_like would inherit pred's dtype, and an
        # integer array cannot hold NaN.
        lower = np.full(np.shape(pred), np.nan, dtype=float)
        upper = lower.copy()

    # Penalty 1: spread across base-estimator predictions for each row.
    base_preds = _base_model_preds(model, X)
    if base_preds.shape[1] > 1:
        disagreement = np.std(base_preds, axis=1)
    else:
        disagreement = np.zeros(n_rows)
    d_norm = _norm_with_training_scale(disagreement, dis_stats.get("p95", None))

    # Penalty 2: share of imputed inputs per row, measured before imputation.
    missing_input_frac = df[impute_cols].isna().mean(axis=1).values
    m_norm = np.clip(missing_input_frac, 0.0, 1.0)

    # Penalty 3: interval half-width; the same q applies to every row.
    q_scale = int_stats.get(f"{interval}_p95", None)
    if np.isfinite(q):
        q_norm = _norm_with_training_scale(np.full(n_rows, q, dtype=float), q_scale)
    else:
        q_norm = np.zeros(n_rows, dtype=float)

    score = 1.0 - (w_missing * m_norm + w_interval * q_norm + w_disagree * d_norm)

    # A target that was mostly missing in training caps confidence for all rows.
    sparse_flag = bool(
        missing_rate is not None
        and np.isfinite(missing_rate)
        and missing_rate >= sparse_rate_cut
    )
    if sparse_flag:
        score = np.minimum(score, sparse_score_cap)

    score = np.clip(score, 0.0, 1.0)

    label = np.where(score >= high_cut, "High", np.where(score >= medium_cut, "Medium", "Low"))
    if sparse_flag:
        label[:] = "Low"

    out = pd.DataFrame(
        {
            "prediction": pred,
            "lower": lower,
            "upper": upper,
            "confidence_score": score,
            "confidence_label": label,
        },
        index=df.index,
    )

    out["note"] = "⚠️ Target had ~90%+ missing in training; treat as noisy." if sparse_flag else ""
    return out
|
|