# src/infer.py
from __future__ import annotations
import numpy as np
import pandas as pd
def _ensure_cols(df: pd.DataFrame, cols: list[str]) -> pd.DataFrame:
"""Add any missing columns in one concat (avoids fragmentation)."""
missing = [c for c in cols if c not in df.columns]
if not missing:
return df
add = pd.DataFrame(np.nan, index=df.index, columns=missing)
out = pd.concat([df, add], axis=1)
return out.copy() # defragment
def _base_model_preds(stacking_model, X: np.ndarray) -> np.ndarray:
"""
Extract base estimator predictions from sklearn StackingRegressor (robustly).
Handles both:
- estimators_ : list of fitted estimators (most common)
- estimators : list of (name, estimator) pairs (pre-fit)
"""
preds = []
# Prefer fitted estimators_
ests = getattr(stacking_model, "estimators_", None)
if ests is None:
# fallback to pre-fit definition (name, estimator)
ests = [e for e in getattr(stacking_model, "estimators", [])]
for item in ests or []:
# item may be an estimator OR (name, estimator)
est = item[1] if isinstance(item, (tuple, list)) and len(item) >= 2 else item
if est is None or est == "drop":
continue
try:
p = est.predict(X)
preds.append(np.asarray(p).reshape(-1))
except Exception:
pass
if not preds:
return np.zeros((X.shape[0], 1))
return np.column_stack(preds) # (n_samples, n_base)
def _safe_minmax_norm(x: np.ndarray) -> np.ndarray:
"""Min-max normalize, but if constant vector -> zeros."""
x = np.asarray(x, dtype=float)
xmin = np.nanmin(x)
xmax = np.nanmax(x)
if not np.isfinite(xmin) or not np.isfinite(xmax) or (xmax - xmin) < 1e-12:
return np.zeros_like(x, dtype=float)
return (x - xmin) / (xmax - xmin)
def _norm_with_training_scale(x: np.ndarray, scale: float | None) -> np.ndarray:
"""
Normalize with a training-derived scale (e.g., p95).
If scale missing/invalid -> fallback to minmax (but stable for 1 row).
"""
x = np.asarray(x, dtype=float)
if scale is not None and np.isfinite(scale) and scale > 1e-12:
return np.clip(x / scale, 0.0, 1.0)
return _safe_minmax_norm(x)
def predict_with_confidence(bundle: dict, new_df: pd.DataFrame, interval: str = "q90") -> pd.DataFrame:
    """
    Predict with a conformal interval and a heuristic per-row confidence score.

    Parameters
    ----------
    bundle : dict
        Saved artifact dict. Required keys: "model" (stacking regressor),
        "imputer" (fitted transformer with .transform), "impute_cols",
        "feature_cols". Optional keys: "calibration" ({"q90"/"q95": half-width}),
        "missing_rate" (training target missing fraction),
        "disagreement_stats" ({"p95": ...}), "interval_stats"
        ({"q90_p95"/"q95_p95": ...}).
    new_df : pd.DataFrame
        Raw input rows; -1 is treated as a missing-value sentinel.
    interval : str
        Which calibrated half-width to use: 'q90' or 'q95'.

    Returns
    -------
    pd.DataFrame with columns: prediction, lower, upper, confidence_score,
    confidence_label ("High"/"Medium"/"Low"), note.
    """
    model = bundle["model"]
    imputer = bundle["imputer"]
    impute_cols = bundle["impute_cols"]
    feature_cols = bundle["feature_cols"]
    calib = bundle.get("calibration", {})  # contains q90/q95 conformal half-widths
    missing_rate = bundle.get("missing_rate", None)
    # Replace the -1 sentinel with NaN so the imputer sees true missingness.
    df = new_df.replace(-1, np.nan)
    df = _ensure_cols(df, impute_cols)
    # Impute using the exact column set/order the imputer was fitted on.
    imputed = pd.DataFrame(
        imputer.transform(df[impute_cols]),
        columns=impute_cols,
        index=df.index,
    )
    X = imputed[feature_cols].values
    pred = model.predict(X)
    # Conformal interval half-width; NaN bounds when no calibration is stored.
    q = float(calib.get(interval, np.nan))
    lower = pred - q if np.isfinite(q) else np.full_like(pred, np.nan)
    upper = pred + q if np.isfinite(q) else np.full_like(pred, np.nan)
    # ---- Confidence components ----
    # 1) Disagreement across base estimators (std over base predictions;
    #    zero when only one usable base model exists).
    base_preds = _base_model_preds(model, X)
    disagreement = np.std(base_preds, axis=1) if base_preds.shape[1] > 1 else np.zeros(len(pred))
    # If training disagreement stats were saved, use them for stable scaling,
    # e.g. bundle["disagreement_stats"] = {"p95": 0.123}
    dis_stats = bundle.get("disagreement_stats", {})
    dis_p95 = dis_stats.get("p95", None)
    d_norm = _norm_with_training_scale(disagreement, dis_p95)
    # 2) Fraction of missing inputs per row, measured BEFORE imputation.
    missing_input_frac = df[impute_cols].isna().mean(axis=1).values
    m_norm = np.clip(missing_input_frac, 0.0, 1.0)
    # 3) Interval width penalty (bigger q => lower confidence).
    #    If training q stats were saved, use them too (recommended),
    #    e.g. bundle["interval_stats"] = {"q90_p95": 12.3, "q95_p95": 15.8}
    int_stats = bundle.get("interval_stats", {})
    q_scale = int_stats.get(f"{interval}_p95", None)
    if np.isfinite(q):
        # q is a scalar so this vector is constant; without q_scale the
        # min-max fallback maps it to zeros (i.e. no width penalty).
        q_norm = _norm_with_training_scale(np.full(len(pred), q, dtype=float), q_scale)
    else:
        q_norm = np.zeros(len(pred), dtype=float)
    # Weighted score (tune weights as you like):
    # missingness is weighted most, then interval width, then disagreement.
    score = 1.0 - (0.45 * m_norm + 0.35 * q_norm + 0.20 * d_norm)
    # Cap the score when the training target itself was >=90% missing —
    # predictions from such a model are treated as inherently noisy.
    sparse_flag = (
        missing_rate is not None
        and np.isfinite(missing_rate)
        and missing_rate >= 0.90
    )
    if sparse_flag:
        score = np.minimum(score, 0.35)
    score = np.clip(score, 0.0, 1.0)
    # Map score to labels; sparse training data forces every row to "Low".
    label = np.where(score >= 0.75, "High", np.where(score >= 0.50, "Medium", "Low"))
    if sparse_flag:
        label[:] = "Low"
    out = pd.DataFrame(
        {
            "prediction": pred,
            "lower": lower,
            "upper": upper,
            "confidence_score": score,
            "confidence_label": label,
        },
        index=df.index,
    )
    out["note"] = "⚠️ Target had ~90%+ missing in training; treat as noisy." if sparse_flag else ""
    return out