"""
Model training utilities: data loading, CV, model selection,
hyperparameter tuning, evaluation, and artifact persistence.
"""

import os
import json
import warnings
from datetime import datetime, timezone

import joblib
import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.model_selection import (
    train_test_split, StratifiedKFold, cross_validate, GridSearchCV
)
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
)

from .feature_engineering import prepare_feature_matrix, build_preprocessor, FEATURE_NAMES

RANDOM_STATE = 42


# ---------------------------------------------------------------------------
# Data loading
# ---------------------------------------------------------------------------

def load_and_merge_data(households_path: str, gold_path: str,
                        min_rows: int = 50) -> pd.DataFrame:
    """Load the households and gold-label CSVs and join them on ``household_id``.

    Args:
        households_path: Path to the households CSV.
        gold_path: Path to the gold-labels CSV.
        min_rows: Minimum number of merged rows required (default 50).

    Returns:
        The merged DataFrame. ``DataFrame.merge`` defaults to an inner join, so only
        ``household_id`` values present in both files are kept.

    Raises:
        FileNotFoundError: If either input file does not exist.
        ValueError: If the merged dataset has fewer than ``min_rows`` rows.
    """
    if not os.path.exists(households_path):
        raise FileNotFoundError(f"Households file not found: {households_path}")
    if not os.path.exists(gold_path):
        raise FileNotFoundError(f"Gold labels file not found: {gold_path}")

    households = pd.read_csv(households_path)
    gold = pd.read_csv(gold_path)
    # NOTE(review): a household_id duplicated in either file multiplies rows in
    # this join; consider merge(..., validate=...) if IDs are meant to be unique.
    merged = gold.merge(households, on='household_id')

    if len(merged) < min_rows:
        raise ValueError(
            f"Merged dataset has only {len(merged)} rows. Minimum {min_rows} required."
        )
    return merged


def split_data(df: pd.DataFrame):
    """80/20 stratified train/test split on ``stunting_flag``, then fit the preprocessor.

    The preprocessor from ``build_preprocessor()`` is fit on the training rows only
    and then applied to the test rows, so test-set statistics do not leak into it.

    Args:
        df: Merged dataset containing ``stunting_flag`` plus whatever columns
            ``prepare_feature_matrix`` reads.

    Returns:
        ``(X_train, X_test, y_train, y_test, scaler, X_train_raw, X_test_raw)``:
        the preprocessor-transformed train/test features, the train/test label
        arrays (taken from ``df['stunting_flag'].values``, not transformed), the
        fitted preprocessor, and the untransformed train/test feature matrices.
    """
    X_raw = prepare_feature_matrix(df)
    y = df['stunting_flag'].values

    X_train_raw, X_test_raw, y_train, y_test = train_test_split(
        X_raw, y, test_size=0.2, stratify=y, random_state=RANDOM_STATE
    )

    scaler = build_preprocessor()
    X_train = scaler.fit_transform(X_train_raw)
    X_test = scaler.transform(X_test_raw)

    return X_train, X_test, y_train, y_test, scaler, X_train_raw, X_test_raw


# ---------------------------------------------------------------------------
# Candidate models
# ---------------------------------------------------------------------------

def get_candidate_models() -> dict:
    """Return a fresh ``{name: unfitted estimator}`` dict of the candidate classifiers.

    LogisticRegression and RandomForest use ``class_weight='balanced'`` to offset
    class imbalance; all models are seeded with ``RANDOM_STATE``.
    """
    return {
        'LogisticRegression': LogisticRegression(
            C=1.0, class_weight='balanced', random_state=RANDOM_STATE, max_iter=1000
        ),
        'RandomForest': RandomForestClassifier(
            n_estimators=100, class_weight='balanced',
            random_state=RANDOM_STATE, n_jobs=-1
        ),
        'GradientBoosting': GradientBoostingClassifier(
            n_estimators=100, learning_rate=0.1, max_depth=3,
            random_state=RANDOM_STATE
        ),
    }


# ---------------------------------------------------------------------------
# Cross-validation
# ---------------------------------------------------------------------------

def run_cross_validation(models: dict, X_train: np.ndarray,
                         y_train: np.ndarray) -> dict:
    """Run 5-fold stratified CV for each model and summarise per-metric scores.

    NOTE(review): if ``X_train`` was already transformed by a preprocessor fit on
    the whole training set (as ``split_data`` returns it), every validation fold
    has seen that preprocessing, which can bias CV scores slightly upward.

    Args:
        models: Mapping of model name -> unfitted sklearn estimator.
        X_train: Training features.
        y_train: Training labels.

    Returns:
        ``{model_name: {auc_roc_mean, auc_roc_std, f1_mean, f1_std,
        precision_mean, precision_std, recall_mean, recall_std}}`` as floats.
    """
    # (result-key prefix, sklearn scorer name); order fixes the result key order.
    metric_map = (
        ('auc_roc', 'roc_auc'),
        ('f1', 'f1'),
        ('precision', 'precision'),
        ('recall', 'recall'),
    )
    scoring = [scorer for _, scorer in metric_map]

    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE)
    results = {}

    for name, estimator in models.items():
        scores = cross_validate(
            estimator, X_train, y_train,
            cv=cv,
            scoring=scoring,
            return_train_score=False
        )
        summary = {}
        for key, scorer in metric_map:
            fold_scores = scores[f'test_{scorer}']
            summary[f'{key}_mean'] = float(np.mean(fold_scores))
            summary[f'{key}_std'] = float(np.std(fold_scores))
        results[name] = summary

        print(f" {name}: CV AUC-ROC = {results[name]['auc_roc_mean']:.4f} "
              f"± {results[name]['auc_roc_std']:.4f}")

    return results


def select_best_model(cv_results: dict) -> str:
    """Return the name of the model with the highest mean CV AUC-ROC.

    Ties on the mean are broken by the lower standard deviation; a full tie keeps
    the first entry in iteration order (``max`` semantics).

    Raises:
        ValueError: If ``cv_results`` is empty.
    """
    if not cv_results:
        raise ValueError("cv_results is empty; no model to select.")
    best_name, _ = max(
        cv_results.items(),
        key=lambda kv: (kv[1]['auc_roc_mean'], -kv[1]['auc_roc_std'])
    )
    return best_name


# ---------------------------------------------------------------------------
# Hyperparameter tuning
# ---------------------------------------------------------------------------

PARAM_GRIDS = {
    'LogisticRegression': {
        'C': [0.01, 0.1, 1.0, 10.0],
    },
    'RandomForest': {
        'n_estimators': [50, 100, 200],
        'max_depth': [3, 5, 10, None],
        'min_samples_split': [2, 5, 10],
    },
    'GradientBoosting': {
        'n_estimators': [50, 100, 200],
        'learning_rate': [0.01, 0.1, 0.2],
        'max_depth': [3, 5, 7],
    },
}


def tune_hyperparameters(model_name: str, estimator, X_train: np.ndarray,
                         y_train: np.ndarray):
    """Grid-search ``estimator`` with 5-fold stratified CV, scoring on ROC AUC.

    If ``PARAM_GRIDS`` has no grid for ``model_name``, the estimator is simply
    fit in place on the full training data and returned untuned.

    Returns:
        ``(best_estimator, best_params, best_score)``; ``best_params`` is ``{}`` and
        ``best_score`` is ``None`` when tuning is skipped. With a grid,
        ``best_estimator`` is refit on all of ``X_train`` (``refit=True``).
    """
    param_grid = PARAM_GRIDS.get(model_name, {})
    if not param_grid:
        print(f" No param grid for {model_name}, skipping tuning.")
        estimator.fit(X_train, y_train)
        return estimator, {}, None

    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE)
    search = GridSearchCV(
        estimator,
        param_grid,
        scoring='roc_auc',
        cv=cv,
        n_jobs=-1,
        refit=True
    )
    search.fit(X_train, y_train)

    print(f" Best params: {search.best_params_}")
    print(f" Best CV AUC-ROC: {search.best_score_:.4f}")

    return search.best_estimator_, search.best_params_, search.best_score_


# ---------------------------------------------------------------------------
# Evaluation
# ---------------------------------------------------------------------------

def evaluate_on_test_set(model, X_test: np.ndarray, y_test: np.ndarray,
                         auc_warning_threshold: float = 0.70) -> dict:
    """Compute accuracy, precision, recall, F1 and AUC-ROC on the held-out test set.

    Args:
        model: Fitted binary classifier exposing ``predict`` and ``predict_proba``;
            column 1 of ``predict_proba`` is taken as the positive-class score.
        X_test: Test features.
        y_test: Test labels.
        auc_warning_threshold: Emit a ``UserWarning`` when test AUC-ROC falls below
            this value (default 0.70).

    Returns:
        ``{'accuracy', 'precision', 'recall', 'f1_score', 'auc_roc'}`` as floats.
        Precision/recall/F1 report 0 instead of raising when undefined.

    Raises:
        ValueError: If ``y_test`` contains fewer than two classes (AUC undefined).
    """
    if np.unique(y_test).size < 2:
        raise ValueError(
            "y_test contains a single class; AUC-ROC is undefined on this test set."
        )

    y_pred = model.predict(X_test)
    y_prob = model.predict_proba(X_test)[:, 1]

    metrics = {
        'accuracy': float(accuracy_score(y_test, y_pred)),
        'precision': float(precision_score(y_test, y_pred, zero_division=0)),
        'recall': float(recall_score(y_test, y_pred, zero_division=0)),
        'f1_score': float(f1_score(y_test, y_pred, zero_division=0)),
        'auc_roc': float(roc_auc_score(y_test, y_prob)),
    }

    if metrics['auc_roc'] < auc_warning_threshold:
        warnings.warn(
            f"Test AUC-ROC is {metrics['auc_roc']:.4f} (< {auc_warning_threshold:.2f}). "
            "Review feature engineering or data quality.",
            UserWarning,
            stacklevel=2,
        )

    return metrics


# ---------------------------------------------------------------------------
# Artifact persistence
# ---------------------------------------------------------------------------

def _atomic_write(path: str, write_fn) -> None:
    """Call ``write_fn(tmp_path)`` on a sibling temp file, then ``os.replace`` it onto ``path``."""
    # The temp file lives in the same directory, so os.replace is a same-filesystem
    # rename (atomic on POSIX). Readers never see a half-written file, and a normal
    # open() keeps the usual umask-based permissions.
    tmp_path = f"{path}.{os.getpid()}.tmp"
    try:
        write_fn(tmp_path)
        os.replace(tmp_path, path)
    finally:
        # Still present only if write_fn or os.replace failed; don't leave debris.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)


def save_artifact(artifact: dict, output_dir: str) -> str:
    """Save a versioned and a canonical ``.pkl`` and append an entry to the registry.

    Writes ``risk_model_<version_tag>.pkl`` and ``risk_model.pkl`` into ``output_dir``
    (created if missing), then appends an entry to ``model_registry.json`` there.
    Each file is written to a temp sibling and moved into place, so a crash cannot
    leave a truncated canonical model or registry behind.

    The version tag (``YYYYmmdd_HHMMSS``) and the registry ``timestamp`` come from a
    single UTC instant, so they always agree. If a versioned file with that tag
    already exists (e.g. two saves within the same second), a numeric suffix
    (``_1``, ``_2``, ...) is appended rather than overwriting it. This guards
    sequential saves only, not concurrent writers.

    Note: ``artifact`` is mutated in place; ``artifact['version_tag']`` is set.

    Args:
        artifact: Picklable dict. ``model_type`` and ``metrics['auc_roc']`` are read
            (if present) for the registry entry.
        output_dir: Directory for the artifacts and the registry.

    Returns:
        The versioned artifact path.
    """
    os.makedirs(output_dir, exist_ok=True)

    saved_at = datetime.now(timezone.utc)
    base_tag = saved_at.strftime('%Y%m%d_%H%M%S')
    version_tag = base_tag
    versioned_path = os.path.join(output_dir, f"risk_model_{version_tag}.pkl")
    collision = 0
    while os.path.exists(versioned_path):
        collision += 1
        version_tag = f"{base_tag}_{collision}"
        versioned_path = os.path.join(output_dir, f"risk_model_{version_tag}.pkl")
    canonical_path = os.path.join(output_dir, "risk_model.pkl")

    # Set before dumping so both pickles carry their own version tag.
    artifact['version_tag'] = version_tag

    _atomic_write(versioned_path, lambda tmp: joblib.dump(artifact, tmp))
    _atomic_write(canonical_path, lambda tmp: joblib.dump(artifact, tmp))

    print(f" Saved versioned artifact: {versioned_path}")
    print(f" Saved canonical artifact: {canonical_path}")

    # Update registry
    registry_path = os.path.join(output_dir, "model_registry.json")
    registry = []
    if os.path.exists(registry_path):
        with open(registry_path, 'r', encoding='utf-8') as f:
            registry = json.load(f)

    # `or {}` also covers an explicit metrics=None entry.
    metrics = artifact.get('metrics') or {}
    registry.append({
        'version_tag': version_tag,
        'model_type': artifact.get('model_type', 'Unknown'),
        'test_auc_roc': metrics.get('auc_roc', None),
        'timestamp': saved_at.isoformat(),
        'artifact_path': versioned_path,
    })

    def _write_registry(tmp_path: str) -> None:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(registry, f, indent=2)

    _atomic_write(registry_path, _write_registry)
    print(f" Registry updated: {registry_path}")

    return versioned_path