from __future__ import annotations

import json
import os

import joblib
import mlflow
import numpy as np
import pandas as pd
from scipy.stats import pearsonr
from sklearn.impute import KNNImputer
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import KFold

from src.config import K_BY_TARGET, N_SPLITS, RANDOM_STATE
from src.models import build_sm2_stacking
|
def save_conformal_calibration(residuals_by_target: dict[str, list[float]], out_path: str):
    """
    Persist empirical absolute-residual quantiles for conformal prediction
    intervals.

    Writes a JSON mapping ``target -> {"n", "q90", "q95"}`` to ``out_path``.
    NaN residuals are dropped before the quantiles are computed; a target
    with no finite residuals gets NaN quantiles and ``n == 0``.
    """

    def _quantile(values: np.ndarray, q: float) -> float:
        # Empty residual sets have no defined quantile.
        return float(np.quantile(values, q)) if values.size else float("nan")

    calib = {}
    for target, residuals in residuals_by_target.items():
        finite = np.asarray(residuals, dtype=float)
        finite = finite[~np.isnan(finite)]
        calib[target] = {
            "n": int(finite.size),
            "q90": _quantile(finite, 0.90),
            "q95": _quantile(finite, 0.95),
        }

    with open(out_path, "w") as fh:
        json.dump(calib, fh, indent=2)
|
|
|
|
def replace_minus_one_with_nan(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* in which every -1 sentinel value becomes NaN."""
    return df.mask(df == -1)
|
|
|
|
def safe_pcc(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """
    Pearson correlation coefficient that degrades gracefully.

    Returns NaN instead of raising when the correlation is undefined:
    fewer than two samples, or either input is constant (zero variance).

    Parameters
    ----------
    y_true, y_pred : np.ndarray
        1-D arrays of the same length.

    Returns
    -------
    float
        Pearson r, or NaN when undefined.
    """
    # pearsonr raises for n < 2, and np.std([]) is NaN (not 0), so the
    # zero-variance check alone did not cover empty/single-sample inputs.
    if len(y_true) < 2 or len(y_pred) < 2:
        return float("nan")
    if np.std(y_true) == 0 or np.std(y_pred) == 0:
        return float("nan")
    return float(pearsonr(y_true, y_pred)[0])
|
|
|
|
def evaluate_observed_only(y_true_raw: np.ndarray, y_pred: np.ndarray, observed_mask: np.ndarray) -> tuple[float, float]:
    """
    Score predictions only on entries whose target was originally observed.

    Rows whose label was missing (and later imputed) are excluded, so the
    metrics never reward predicting imputed labels.

    Returns
    -------
    tuple[float, float]
        (MAE, Pearson correlation) over the observed subset.
    """
    observed_truth = y_true_raw[observed_mask]
    observed_pred = y_pred[observed_mask]
    return (
        float(mean_absolute_error(observed_truth, observed_pred)),
        safe_pcc(observed_truth, observed_pred),
    )
|
|
|
|
def run_10fold_cv_paper_like(
    data: pd.DataFrame,
    morph_cols: list[str],
    targets: list[str],
    experiment_tag: str = "",
    max_folds: int | None = None,
) -> pd.DataFrame:
    """
    Paper-like evaluation:
      - N_SPLITS-fold CV (10-fold in the paper setup)
      - KNN imputation per target (K from Table 2) fit on the TRAIN fold only
      - SM2-like stacking model trained per fold
      - scoring only on observed test labels

    Parameters
    ----------
    data : pd.DataFrame
        Raw dataset; -1 entries are treated as the missing-value sentinel.
    morph_cols : list[str]
        Morphological feature column names.
    targets : list[str]
        Target column names; each must appear in K_BY_TARGET.
    experiment_tag : str
        Optional suffix appended to every MLflow run name.
    max_folds : int | None
        If set, stop after this many folds per target (useful for smoke tests).

    Returns
    -------
    pd.DataFrame
        One row per target with columns: target, mae_mean, pcc_mean.

    Raises
    ------
    ValueError
        If a target has no K entry in K_BY_TARGET.

    Side effects: logs per-fold and aggregate metrics to MLflow, and writes
    conformal-calibration and missing-rate JSON artifacts under
    ``artifacts_inference/``.
    """
    # -1 is the dataset's missing-value sentinel; convert before any stats.
    data = replace_minus_one_with_nan(data).copy()
    kf = KFold(n_splits=N_SPLITS, shuffle=True, random_state=RANDOM_STATE)

    # Absolute test-set residuals pooled across folds, consumed later by
    # save_conformal_calibration for prediction-interval calibration.
    residuals_by_target = {t: [] for t in targets}
    # Fraction of missing labels per target, computed on the full dataset.
    missing_rate = {t: float(data[t].isna().mean()) for t in targets}

    results = []

    for target in targets:
        if target not in K_BY_TARGET:
            raise ValueError(f"Target '{target}' missing from K_BY_TARGET (paper Table 2 mapping).")

        K = K_BY_TARGET[target]
        fold_mae, fold_pcc = [], []

        print(f"\n=== CV Target: {target} | KNN K={K} ===")

        for fold, (train_idx, test_idx) in enumerate(kf.split(data), start=1):
            train_df = data.iloc[train_idx].copy()
            test_df = data.iloc[test_idx].copy()

            # Keep the raw (possibly NaN) test labels and remember which
            # entries were actually observed — scoring uses only those.
            y_test_raw = test_df[target].values
            y_test_observed_mask = test_df[target].notna().values

            # Impute morphology + all targets jointly; the imputer is fit on
            # the TRAIN fold only to avoid train/test leakage.
            impute_cols = morph_cols + targets
            knn = KNNImputer(n_neighbors=K, weights="distance")

            train_imputed = pd.DataFrame(
                knn.fit_transform(train_df[impute_cols]),
                columns=impute_cols,
                index=train_df.index,
            )
            test_imputed = pd.DataFrame(
                knn.transform(test_df[impute_cols]),
                columns=impute_cols,
                index=test_df.index,
            )

            # Features = morphology + the OTHER targets (never the current one).
            feature_cols = morph_cols + [t for t in targets if t != target]

            X_train_all = train_imputed[feature_cols].values
            X_test = test_imputed[feature_cols].values

            y_train_raw = train_df[target].values
            obs_train = ~np.isnan(y_train_raw)

            # Train only on rows whose label was actually observed (never on
            # imputed labels).
            model = build_sm2_stacking(random_state=RANDOM_STATE)
            model.fit(X_train_all[obs_train], y_train_raw[obs_train])

            y_pred = model.predict(X_test)

            # Pool absolute residuals (observed test rows only) for the
            # conformal-calibration artifact written after the CV loop.
            obs = y_test_observed_mask
            abs_resid = np.abs(y_test_raw[obs] - y_pred[obs])
            residuals_by_target[target].extend(abs_resid.tolist())

            mae, pcc = evaluate_observed_only(y_test_raw, y_pred, y_test_observed_mask)
            fold_mae.append(mae)
            fold_pcc.append(pcc)

            # One MLflow run per (target, fold); the experiment is named
            # after the target.
            mlflow.set_experiment(target)
            run_name = f"{target}_fold_{fold}{('_' + experiment_tag) if experiment_tag else ''}"
            with mlflow.start_run(run_name=run_name):
                mlflow.log_param("target", target)
                mlflow.log_param("knn_k", K)
                mlflow.log_param("n_splits", N_SPLITS)
                mlflow.log_param("n_features", len(feature_cols))
                mlflow.log_metric("mae", mae)
                mlflow.log_metric("pcc", pcc)
                mlflow.log_metric("n_test_observed", int(y_test_observed_mask.sum()))
                mlflow.log_metric("n_test_total", int(len(test_idx)))

            print(f"Fold {fold:02d}: MAE={mae:.4f}, PCC={pcc:.4f}, n_obs={y_test_observed_mask.sum()}")
            if max_folds is not None and fold >= max_folds:
                break

        # nanmean so folds with undefined PCC (e.g. constant labels) are
        # skipped rather than poisoning the average.
        target_mae = float(np.nanmean(fold_mae))
        target_pcc = float(np.nanmean(fold_pcc))

        results.append({"target": target, "mae_mean": target_mae, "pcc_mean": target_pcc})

        print(f"--- {target} AVERAGE: MAE={target_mae:.4f}, PCC={target_pcc:.4f}")

        # Per-target average logged to a shared experiment.
        mlflow.set_experiment("CombinedResults")
        with mlflow.start_run(run_name=f"{target}_avg{('_' + experiment_tag) if experiment_tag else ''}"):
            mlflow.log_param("target", target)
            mlflow.log_metric("mae_mean", target_mae)
            mlflow.log_metric("pcc_mean", target_pcc)

    results_df = pd.DataFrame(results)
    overall_mae = float(results_df["mae_mean"].mean())
    overall_pcc = float(results_df["pcc_mean"].mean())

    print("\n=== OVERALL AVERAGE (across targets) ===")
    print(f"MAE={overall_mae:.4f}, PCC={overall_pcc:.4f}")

    mlflow.set_experiment("CombinedResults")
    with mlflow.start_run(run_name=f"overall_avg{('_' + experiment_tag) if experiment_tag else ''}"):
        mlflow.log_metric("mae_overall_mean", overall_mae)
        mlflow.log_metric("pcc_overall_mean", overall_pcc)

    # Artifacts consumed later by train_and_save_final_models.
    os.makedirs("artifacts_inference", exist_ok=True)

    calib_path = "artifacts_inference/calibration_conformal.json"
    save_conformal_calibration(residuals_by_target, calib_path)
    # NOTE(review): log_artifact is called outside an active run here, so
    # MLflow attaches it to an implicitly created run — confirm intended.
    mlflow.log_artifact(calib_path)

    mr_path = "artifacts_inference/missing_rate.json"
    with open(mr_path, "w") as f:
        json.dump(missing_rate, f, indent=2)
    mlflow.log_artifact(mr_path)

    return results_df
|
|
|
|
def train_and_save_final_models(
    data: pd.DataFrame,
    morph_cols: list[str],
    targets: list[str],
    save_dir: str = "artifacts_inference",
    experiment_name: str = "InferenceModels_SM2_KNN",
    experiment_tag: str = "",
    calibration_path: str = "artifacts_inference/calibration_conformal.json",
    missing_rate_path: str = "artifacts_inference/missing_rate.json",
) -> pd.DataFrame:
    """
    Deployment training: fit ONE model per target on the full dataset and
    save a joblib bundle per target.

    Each bundle is a dict of:
        {
            imputer, model, feature_cols, impute_cols,
            target, knn_k, calibration, missing_rate
        }

    Parameters
    ----------
    data : pd.DataFrame
        Raw dataset; -1 entries are treated as the missing-value sentinel.
    morph_cols : list[str]
        Morphological feature column names.
    targets : list[str]
        Target column names; each must appear in K_BY_TARGET.
    save_dir : str
        Directory for bundles and the manifest (created if absent).
    experiment_name : str
        MLflow experiment that receives the training run.
    experiment_tag : str
        Optional suffix appended to the MLflow run name.
    calibration_path, missing_rate_path : str
        JSON artifacts produced by the CV script (train_eval).

    Returns
    -------
    pd.DataFrame
        One row per saved bundle: target, knn_k, path.

    Raises
    ------
    ValueError
        If a target has no K entry in K_BY_TARGET.

    Notes
    -----
    - Expects calibration_conformal.json and missing_rate.json to have been
      produced by the CV script (train_eval) first.
    - If those files don't exist, it will still train & save bundles, but
      bundles will have empty calibration/missing_rate.
    """
    os.makedirs(save_dir, exist_ok=True)
    data = replace_minus_one_with_nan(data).copy()

    # Load CV-produced artifacts if present. Context managers close the
    # handles deterministically (the original left json.load(open(...))
    # handles dangling).
    calib: dict = {}
    if os.path.exists(calibration_path):
        with open(calibration_path) as f:
            calib = json.load(f)
    miss: dict = {}
    if os.path.exists(missing_rate_path):
        with open(missing_rate_path) as f:
            miss = json.load(f)

    saved_rows: list[dict] = []

    mlflow.set_experiment(experiment_name)
    run_name = f"train_final_models{('_' + experiment_tag) if experiment_tag else ''}"

    with mlflow.start_run(run_name=run_name):
        for target in targets:
            if target not in K_BY_TARGET:
                raise ValueError(f"Target '{target}' missing from K_BY_TARGET (paper Table 2 mapping).")

            K = K_BY_TARGET[target]

            # Impute morphology + all targets jointly on the full dataset;
            # the fitted imputer ships in the bundle for inference-time use.
            impute_cols = morph_cols + targets
            knn = KNNImputer(n_neighbors=K, weights="distance")

            imputed_full = pd.DataFrame(
                knn.fit_transform(data[impute_cols]),
                columns=impute_cols,
                index=data.index,
            )

            # Features = morphology + the OTHER targets (never the current one).
            feature_cols = morph_cols + [t for t in targets if t != target]
            X = imputed_full[feature_cols].values
            # NOTE(review): unlike the CV routine (which trains on observed
            # labels only), y here includes KNN-imputed labels — confirm
            # this is the intended deployment behavior.
            y = imputed_full[target].values

            model = build_sm2_stacking(random_state=RANDOM_STATE)
            model.fit(X, y)

            # Everything needed at inference time travels in one bundle.
            bundle = {
                "target": target,
                "knn_k": K,
                "imputer": knn,
                "model": model,
                "feature_cols": feature_cols,
                "impute_cols": impute_cols,
                "morph_cols": morph_cols,
                "targets": targets,
                "calibration": calib.get(target, {}),
                "missing_rate": miss.get(target, None),
            }

            out_path = os.path.join(save_dir, f"{target}_bundle.joblib")
            joblib.dump(bundle, out_path)

            mlflow.log_artifact(out_path)
            mlflow.log_param(f"{target}_knn_k", K)
            if bundle["missing_rate"] is not None:
                mlflow.log_metric(f"{target}_missing_rate", float(bundle["missing_rate"]))

            saved_rows.append({"target": target, "knn_k": K, "path": out_path})
            print(f"Saved: {out_path}")

        # Manifest of all saved bundles for quick inspection.
        manifest_path = os.path.join(save_dir, "manifest.csv")
        pd.DataFrame(saved_rows).to_csv(manifest_path, index=False)
        mlflow.log_artifact(manifest_path)

        # Re-log the CV artifacts alongside the final models, if available.
        if os.path.exists(calibration_path):
            mlflow.log_artifact(calibration_path)
        if os.path.exists(missing_rate_path):
            mlflow.log_artifact(missing_rate_path)

    return pd.DataFrame(saved_rows)
|
|
|
|