# mvppred/scripts/infer.py — authored by Md Wasi Ul Kabir (initial commit, 8bb21fb)
# src/infer.py
from __future__ import annotations
import numpy as np
import pandas as pd
def _ensure_cols(df: pd.DataFrame, cols: list[str]) -> pd.DataFrame:
"""Add any missing columns in one concat (avoids fragmentation)."""
missing = [c for c in cols if c not in df.columns]
if not missing:
return df
add = pd.DataFrame(np.nan, index=df.index, columns=missing)
out = pd.concat([df, add], axis=1)
return out.copy() # defragment
def _base_model_preds(stacking_model, X: np.ndarray) -> np.ndarray:
"""
Extract base estimator predictions from sklearn StackingRegressor (robustly).
Handles both:
- estimators_ : list of fitted estimators (most common)
- estimators : list of (name, estimator) pairs (pre-fit)
"""
preds = []
# Prefer fitted estimators_
ests = getattr(stacking_model, "estimators_", None)
if ests is None:
# fallback to pre-fit definition (name, estimator)
ests = [e for e in getattr(stacking_model, "estimators", [])]
for item in ests or []:
# item may be an estimator OR (name, estimator)
est = item[1] if isinstance(item, (tuple, list)) and len(item) >= 2 else item
if est is None or est == "drop":
continue
try:
p = est.predict(X)
preds.append(np.asarray(p).reshape(-1))
except Exception:
pass
if not preds:
return np.zeros((X.shape[0], 1))
return np.column_stack(preds) # (n_samples, n_base)
def _safe_minmax_norm(x: np.ndarray) -> np.ndarray:
"""Min-max normalize, but if constant vector -> zeros."""
x = np.asarray(x, dtype=float)
xmin = np.nanmin(x)
xmax = np.nanmax(x)
if not np.isfinite(xmin) or not np.isfinite(xmax) or (xmax - xmin) < 1e-12:
return np.zeros_like(x, dtype=float)
return (x - xmin) / (xmax - xmin)
def _norm_with_training_scale(x: np.ndarray, scale: float | None) -> np.ndarray:
"""
Normalize with a training-derived scale (e.g., p95).
If scale missing/invalid -> fallback to minmax (but stable for 1 row).
"""
x = np.asarray(x, dtype=float)
if scale is not None and np.isfinite(scale) and scale > 1e-12:
return np.clip(x / scale, 0.0, 1.0)
return _safe_minmax_norm(x)
def predict_with_confidence(bundle: dict, new_df: pd.DataFrame, interval: str = "q90") -> pd.DataFrame:
    """
    Predict with a conformal interval plus a heuristic confidence score.

    Parameters
    ----------
    bundle : dict
        Trained artifacts. Required keys: 'model' (a stacking-style
        regressor), 'imputer' (fitted transformer), 'impute_cols',
        'feature_cols'. Optional keys read here: 'calibration'
        (conformal half-widths keyed 'q90'/'q95'), 'missing_rate'
        (training-target missing fraction), 'disagreement_stats'
        (e.g. {'p95': ...}) and 'interval_stats' (e.g. {'q90_p95': ...})
        — presumably computed on training data; verify against the
        training script.
    new_df : pd.DataFrame
        Raw input rows; -1 is treated as a missing-value sentinel
        (assumed upstream encoding — TODO confirm).
    interval : str
        Which calibrated half-width to use: 'q90' or 'q95'.

    Returns
    -------
    pd.DataFrame indexed like *new_df*, with columns: prediction,
    lower, upper (NaN when no calibration is available),
    confidence_score in [0, 1], confidence_label
    ('High'/'Medium'/'Low'), and a free-text 'note' flag.
    """
    model = bundle["model"]
    imputer = bundle["imputer"]
    impute_cols = bundle["impute_cols"]
    feature_cols = bundle["feature_cols"]
    calib = bundle.get("calibration", {})  # conformal half-widths, e.g. {'q90': ..., 'q95': ...}
    missing_rate = bundle.get("missing_rate", None)  # training-target missing fraction, if recorded
    # Replace sentinel missing (-1 -> NaN) so the imputer sees true gaps
    df = new_df.replace(-1, np.nan)
    df = _ensure_cols(df, impute_cols)
    # Impute in the training feature space; keep the caller's index
    imputed = pd.DataFrame(
        imputer.transform(df[impute_cols]),
        columns=impute_cols,
        index=df.index,
    )
    X = imputed[feature_cols].values
    pred = model.predict(X)
    # Conformal interval half-width: one calibrated q expands symmetrically
    q = float(calib.get(interval, np.nan))
    lower = pred - q if np.isfinite(q) else np.full_like(pred, np.nan)
    upper = pred + q if np.isfinite(q) else np.full_like(pred, np.nan)
    # ---- Confidence components (each normalized to [0, 1]) ----
    # 1) Disagreement across base estimators: per-row std of their predictions
    base_preds = _base_model_preds(model, X)
    disagreement = np.std(base_preds, axis=1) if base_preds.shape[1] > 1 else np.zeros(len(pred))
    # If training disagreement stats were saved, use them for stable scaling
    # e.g. bundle["disagreement_stats"] = {"p95": 0.123}
    dis_stats = bundle.get("disagreement_stats", {})
    dis_p95 = dis_stats.get("p95", None)
    d_norm = _norm_with_training_scale(disagreement, dis_p95)
    # 2) Missing input fraction per row (counted BEFORE imputation, so
    #    columns added by _ensure_cols contribute as fully missing)
    missing_input_frac = df[impute_cols].isna().mean(axis=1).values
    m_norm = np.clip(missing_input_frac, 0.0, 1.0)
    # 3) Interval width penalty (bigger q => lower confidence); q is a
    #    scalar, so this is constant across rows unless scaled by stats
    # If you saved training q stats, use them too (recommended)
    # e.g. bundle["interval_stats"] = {"q90_p95": 12.3, "q95_p95": 15.8}
    int_stats = bundle.get("interval_stats", {})
    q_scale = int_stats.get(f"{interval}_p95", None)
    if np.isfinite(q):
        q_norm = _norm_with_training_scale(np.full(len(pred), q, dtype=float), q_scale)
    else:
        q_norm = np.zeros(len(pred), dtype=float)
    # Weighted score (weights are heuristic; tune as needed)
    # - missingness is usually most important
    # - then interval width
    # - then model disagreement
    score = 1.0 - (0.45 * m_norm + 0.35 * q_norm + 0.20 * d_norm)
    # Penalty when the TRAINING target itself was very sparse (>=90% missing):
    # cap the score and force the 'Low' label below
    sparse_flag = (
        missing_rate is not None
        and np.isfinite(missing_rate)
        and missing_rate >= 0.90
    )
    if sparse_flag:
        score = np.minimum(score, 0.35)
    score = np.clip(score, 0.0, 1.0)
    # Labels: High >= 0.75 > Medium >= 0.50 > Low
    label = np.where(score >= 0.75, "High", np.where(score >= 0.50, "Medium", "Low"))
    if sparse_flag:
        label[:] = "Low"
    out = pd.DataFrame(
        {
            "prediction": pred,
            "lower": lower,
            "upper": upper,
            "confidence_score": score,
            "confidence_label": label,
        },
        index=df.index,
    )
    out["note"] = "⚠️ Target had ~90%+ missing in training; treat as noisy." if sparse_flag else ""
    return out