# mvppred / src / pipeline.py
# Author: Md Wasi Ul Kabir
# Initial commit (8bb21fb)
# src/pipeline.py
from __future__ import annotations
import os
import joblib
import numpy as np
import pandas as pd
from scipy.stats import pearsonr
from sklearn.impute import KNNImputer
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import KFold
import mlflow
from src.config import K_BY_TARGET, N_SPLITS, RANDOM_STATE
from src.models import build_sm2_stacking
import json
def save_conformal_calibration(residuals_by_target: dict[str, list[float]], out_path: str):
    """
    Persist per-target absolute-residual quantiles for conformal prediction.

    For each target, NaNs are dropped from the residual list, then the
    sample count and the 90%/95% absolute-error quantiles are written as
    a JSON mapping {target: {"n": ..., "q90": ..., "q95": ...}}.
    Targets with no finite residuals get NaN quantiles.
    """
    summary = {}
    for target, residuals in residuals_by_target.items():
        values = np.asarray(residuals, dtype=float)
        values = values[~np.isnan(values)]
        has_data = values.size > 0
        summary[target] = {
            "n": int(values.size),
            # 90% and 95% absolute error quantiles (change as you like)
            "q90": float(np.quantile(values, 0.90)) if has_data else float("nan"),
            "q95": float(np.quantile(values, 0.95)) if has_data else float("nan"),
        }
    with open(out_path, "w") as fh:
        json.dump(summary, fh, indent=2)
def replace_minus_one_with_nan(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* where the missing-value sentinel -1 becomes NaN."""
    sentinel = -1
    return df.replace(sentinel, np.nan)
def safe_pcc(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """
    Pearson correlation that degrades gracefully instead of raising.

    Returns NaN whenever the correlation is undefined:
      - fewer than two samples (pearsonr requires n >= 2; with n == 0 the
        original std-based guard itself failed because np.std([]) is NaN),
      - either input is constant (zero standard deviation).

    Args:
        y_true: observed values.
        y_pred: predicted values, same length as y_true.

    Returns:
        Pearson correlation coefficient, or NaN when undefined.
    """
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    # pearsonr raises for fewer than two observations.
    if y_true.size < 2 or y_pred.size < 2:
        return float("nan")
    # A constant vector makes the correlation a 0/0 division.
    if np.std(y_true) == 0 or np.std(y_pred) == 0:
        return float("nan")
    return float(pearsonr(y_true, y_pred)[0])
def evaluate_observed_only(y_true_raw: np.ndarray, y_pred: np.ndarray, observed_mask: np.ndarray) -> tuple[float, float]:
    """
    Score ONLY on entries where the target was originally observed (not missing).
    This avoids giving yourself credit for predicting imputed labels.

    Args:
        y_true_raw: raw target values (NaN where missing).
        y_pred: model predictions, aligned with y_true_raw.
        observed_mask: boolean mask of originally observed entries.

    Returns:
        (mae, pcc). Both are NaN when the mask selects no entries —
        mean_absolute_error raises on empty input, which would otherwise
        crash a CV fold that happens to have no observed test labels.
        NaN folds are already tolerated downstream via np.nanmean.
    """
    y_true = y_true_raw[observed_mask]
    y_hat = y_pred[observed_mask]
    if y_true.size == 0:
        # No observed labels in this fold -> metrics are undefined.
        return float("nan"), float("nan")
    mae = float(mean_absolute_error(y_true, y_hat))
    pcc = safe_pcc(y_true, y_hat)
    return mae, pcc
def run_10fold_cv_paper_like(
    data: pd.DataFrame,
    morph_cols: list[str],
    targets: list[str],
    experiment_tag: str = "",
    max_folds: int | None = None,
) -> pd.DataFrame:
    """
    Paper-like evaluation:
      - 10-fold CV (N_SPLITS folds, shuffled with RANDOM_STATE)
      - KNN imputation per target (K from paper Table 2) fit on the TRAIN
        fold only, so no test-fold information leaks into the imputer
      - SM2-like stacking model trained per fold on observed labels only
      - scoring restricted to originally observed test labels

    Side effects:
      - logs per-fold and per-target metrics to MLflow
      - writes conformal calibration + missingness rates under
        artifacts_inference/ for inference-time confidence estimates

    Args:
        data: raw dataframe; -1 is treated as missing.
        morph_cols: morphology feature column names.
        targets: performance-trait target column names.
        experiment_tag: optional suffix appended to MLflow run names.
        max_folds: if set, stop after this many folds per target (smoke runs).

    Returns:
        DataFrame with one row per target: mean MAE / PCC across folds.
    """
    data = replace_minus_one_with_nan(data).copy()
    kf = KFold(n_splits=N_SPLITS, shuffle=True, random_state=RANDOM_STATE)

    # Observed-only absolute residuals pooled across folds feed the
    # conformal calibration saved at the end; missingness rates are
    # computed once on the full (raw) data.
    residuals_by_target = {t: [] for t in targets}
    missing_rate = {t: float(data[t].isna().mean()) for t in targets}

    results = []
    for target in targets:
        if target not in K_BY_TARGET:
            raise ValueError(f"Target '{target}' missing from K_BY_TARGET (paper Table 2 mapping).")
        K = K_BY_TARGET[target]
        fold_mae, fold_pcc = [], []
        print(f"\n=== CV Target: {target} | KNN K={K} ===")

        for fold, (train_idx, test_idx) in enumerate(kf.split(data), start=1):
            train_df = data.iloc[train_idx].copy()
            test_df = data.iloc[test_idx].copy()

            # Observed mask for proper evaluation: score only where the
            # label truly existed before imputation.
            y_test_raw = test_df[target].values
            y_test_observed_mask = test_df[target].notna().values

            # Impute morphology + all targets (fit on train only -> no leakage).
            impute_cols = morph_cols + targets
            knn = KNNImputer(n_neighbors=K, weights="distance")
            train_imputed = pd.DataFrame(
                knn.fit_transform(train_df[impute_cols]),
                columns=impute_cols,
                index=train_df.index,
            )
            test_imputed = pd.DataFrame(
                knn.transform(test_df[impute_cols]),
                columns=impute_cols,
                index=test_df.index,
            )

            # Features: morphology + other (imputed) performance traits,
            # excluding the current target.
            feature_cols = morph_cols + [t for t in targets if t != target]
            X_train_all = train_imputed[feature_cols].values
            X_test = test_imputed[feature_cols].values

            # Train only on rows whose label was actually observed (raw y,
            # NaN where missing) — never on imputed labels.
            y_train_raw = train_df[target].values
            obs_train = ~np.isnan(y_train_raw)
            model = build_sm2_stacking(random_state=RANDOM_STATE)
            model.fit(X_train_all[obs_train], y_train_raw[obs_train])
            y_pred = model.predict(X_test)

            # Collect observed-only absolute residuals for conformal calibration.
            obs = y_test_observed_mask
            abs_resid = np.abs(y_test_raw[obs] - y_pred[obs])
            residuals_by_target[target].extend(abs_resid.tolist())

            mae, pcc = evaluate_observed_only(y_test_raw, y_pred, y_test_observed_mask)
            fold_mae.append(mae)
            fold_pcc.append(pcc)

            # Per-fold MLflow logging (one experiment per target).
            mlflow.set_experiment(target)
            run_name = f"{target}_fold_{fold}{('_' + experiment_tag) if experiment_tag else ''}"
            with mlflow.start_run(run_name=run_name):
                mlflow.log_param("target", target)
                mlflow.log_param("knn_k", K)
                mlflow.log_param("n_splits", N_SPLITS)
                mlflow.log_param("n_features", len(feature_cols))
                mlflow.log_metric("mae", mae)
                mlflow.log_metric("pcc", pcc)
                mlflow.log_metric("n_test_observed", int(y_test_observed_mask.sum()))
                mlflow.log_metric("n_test_total", int(len(test_idx)))

            print(f"Fold {fold:02d}: MAE={mae:.4f}, PCC={pcc:.4f}, n_obs={y_test_observed_mask.sum()}")
            if max_folds is not None and fold >= max_folds:
                break

        # nanmean: folds with undefined metrics (no observed labels) are skipped.
        target_mae = float(np.nanmean(fold_mae))
        target_pcc = float(np.nanmean(fold_pcc))
        results.append({"target": target, "mae_mean": target_mae, "pcc_mean": target_pcc})
        print(f"--- {target} AVERAGE: MAE={target_mae:.4f}, PCC={target_pcc:.4f}")

        # Per-target summary in a shared experiment.
        mlflow.set_experiment("CombinedResults")
        with mlflow.start_run(run_name=f"{target}_avg{('_' + experiment_tag) if experiment_tag else ''}"):
            mlflow.log_param("target", target)
            mlflow.log_metric("mae_mean", target_mae)
            mlflow.log_metric("pcc_mean", target_pcc)

    results_df = pd.DataFrame(results)
    overall_mae = float(results_df["mae_mean"].mean())
    overall_pcc = float(results_df["pcc_mean"].mean())
    print("\n=== OVERALL AVERAGE (across targets) ===")
    print(f"MAE={overall_mae:.4f}, PCC={overall_pcc:.4f}")

    mlflow.set_experiment("CombinedResults")
    with mlflow.start_run(run_name=f"overall_avg{('_' + experiment_tag) if experiment_tag else ''}"):
        mlflow.log_metric("mae_overall_mean", overall_mae)
        mlflow.log_metric("pcc_overall_mean", overall_pcc)

        # Save conformal calibration + missingness rate for inference-time
        # confidence, and attach both files to this run so log_artifact has
        # an active run to log against.
        os.makedirs("artifacts_inference", exist_ok=True)
        calib_path = "artifacts_inference/calibration_conformal.json"
        save_conformal_calibration(residuals_by_target, calib_path)
        mlflow.log_artifact(calib_path)

        mr_path = "artifacts_inference/missing_rate.json"
        with open(mr_path, "w") as f:
            json.dump(missing_rate, f, indent=2)
        mlflow.log_artifact(mr_path)

    return results_df
def train_and_save_final_models(
    data: pd.DataFrame,
    morph_cols: list[str],
    targets: list[str],
    save_dir: str = "artifacts_inference",
    experiment_name: str = "InferenceModels_SM2_KNN",
    experiment_tag: str = "",
    calibration_path: str = "artifacts_inference/calibration_conformal.json",
    missing_rate_path: str = "artifacts_inference/missing_rate.json",
) -> pd.DataFrame:
    """
    Deployment training:
      - Fit ONE model per target on the full dataset
      - Save a joblib bundle per target:
          {
            imputer, model, feature_cols, impute_cols,
            target, knn_k, calibration, missing_rate
          }

    Notes:
      - Expects calibration_conformal.json and missing_rate.json to have been
        produced by the CV script (train_eval) first.
      - If those files don't exist, it still trains & saves bundles, but
        bundles will have empty calibration/missing_rate.
      - For consistency with the CV evaluation, each model is trained ONLY
        on rows where the target was actually observed — never on
        KNN-imputed labels — so the deployed model matches the one the
        CV metrics and conformal calibration describe.

    Args:
        data: raw dataframe; -1 is treated as missing.
        morph_cols: morphology feature column names.
        targets: performance-trait target column names.
        save_dir: directory for bundles + manifest.
        experiment_name: MLflow experiment to log under.
        experiment_tag: optional suffix appended to the MLflow run name.
        calibration_path: JSON written by the CV run (optional).
        missing_rate_path: JSON written by the CV run (optional).

    Returns:
        DataFrame with one row per saved bundle (target, knn_k, path).
    """
    os.makedirs(save_dir, exist_ok=True)
    data = replace_minus_one_with_nan(data).copy()

    # Load calibration + missingness generated by the CV eval, if present.
    # (with-blocks: the old `json.load(open(...))` leaked file handles.)
    calib: dict = {}
    if os.path.exists(calibration_path):
        with open(calibration_path) as f:
            calib = json.load(f)
    miss: dict = {}
    if os.path.exists(missing_rate_path):
        with open(missing_rate_path) as f:
            miss = json.load(f)

    saved_rows: list[dict] = []
    mlflow.set_experiment(experiment_name)
    run_name = f"train_final_models{('_' + experiment_tag) if experiment_tag else ''}"
    with mlflow.start_run(run_name=run_name):
        for target in targets:
            if target not in K_BY_TARGET:
                raise ValueError(f"Target '{target}' missing from K_BY_TARGET (paper Table 2 mapping).")
            K = K_BY_TARGET[target]

            # Impute morphology + all targets on FULL data (deployment fit).
            impute_cols = morph_cols + targets
            knn = KNNImputer(n_neighbors=K, weights="distance")
            imputed_full = pd.DataFrame(
                knn.fit_transform(data[impute_cols]),
                columns=impute_cols,
                index=data.index,
            )

            # Features: morphology + other performance traits (excluding target).
            feature_cols = morph_cols + [t for t in targets if t != target]
            X = imputed_full[feature_cols].values

            # Train the SM2-like stacking model on observed labels only,
            # mirroring the per-fold training in run_10fold_cv_paper_like.
            y_raw = data[target].values
            obs = ~np.isnan(y_raw)
            model = build_sm2_stacking(random_state=RANDOM_STATE)
            model.fit(X[obs], y_raw[obs])

            # Everything inference needs, in one bundle.
            bundle = {
                "target": target,
                "knn_k": K,
                "imputer": knn,
                "model": model,
                "feature_cols": feature_cols,
                "impute_cols": impute_cols,
                "morph_cols": morph_cols,
                "targets": targets,
                # Uncertainty + confidence inputs:
                "calibration": calib.get(target, {}),  # e.g., {"n":..., "q90":..., "q95":...}
                "missing_rate": miss.get(target, None),  # float like 0.91
            }
            out_path = os.path.join(save_dir, f"{target}_bundle.joblib")
            joblib.dump(bundle, out_path)

            # Log artifact(s)
            mlflow.log_artifact(out_path)
            mlflow.log_param(f"{target}_knn_k", K)
            if bundle["missing_rate"] is not None:
                mlflow.log_metric(f"{target}_missing_rate", float(bundle["missing_rate"]))

            saved_rows.append({"target": target, "knn_k": K, "path": out_path})
            print(f"Saved: {out_path}")

        # Save a manifest for convenience.
        manifest_path = os.path.join(save_dir, "manifest.csv")
        pd.DataFrame(saved_rows).to_csv(manifest_path, index=False)
        mlflow.log_artifact(manifest_path)

        # Also log the calibration/missingness files if present.
        if os.path.exists(calibration_path):
            mlflow.log_artifact(calibration_path)
        if os.path.exists(missing_rate_path):
            mlflow.log_artifact(missing_rate_path)

    return pd.DataFrame(saved_rows)