Instructions to use getachewgetu/stunting-risk-model with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- Scikit-learn
How to use getachewgetu/stunting-risk-model with Scikit-learn:
from huggingface_hub import hf_hub_download
import joblib

model = joblib.load(
    hf_hub_download("getachewgetu/stunting-risk-model", "sklearn_model.joblib")
)
# only load pickle files from sources you trust
# read more about it here https://skops.readthedocs.io/en/stable/persistence.html
- Notebooks
- Google Colab
- Kaggle
| """ | |
| Model training utilities: data loading, CV, model selection, | |
| hyperparameter tuning, evaluation, and artifact persistence. | |
| """ | |
| import os | |
| import json | |
| import warnings | |
| import joblib | |
| import numpy as np | |
| import pandas as pd | |
| from datetime import datetime, timezone | |
| from sklearn.linear_model import LogisticRegression | |
| from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier | |
| from sklearn.model_selection import ( | |
| train_test_split, StratifiedKFold, cross_validate, GridSearchCV | |
| ) | |
| from sklearn.metrics import ( | |
| accuracy_score, precision_score, recall_score, | |
| f1_score, roc_auc_score | |
| ) | |
| from .feature_engineering import prepare_feature_matrix, build_preprocessor, FEATURE_NAMES | |
# Single seed reused by the train/test split, the CV fold shuffling, and every
# candidate estimator in this module, so repeated runs are reproducible.
RANDOM_STATE: int = 42
| # --------------------------------------------------------------------------- | |
| # Data loading | |
| # --------------------------------------------------------------------------- | |
def load_and_merge_data(households_path: str, gold_path: str) -> pd.DataFrame:
    """
    Read the households CSV and the gold-labels CSV and join them.

    The gold labels are the left side of the merge and the join key is
    ``household_id``; no ``how`` argument is passed, so pandas' default join
    type applies.

    Args:
        households_path: Path to the households CSV file.
        gold_path: Path to the gold-labels CSV file.

    Returns:
        The merged DataFrame.

    Raises:
        FileNotFoundError: If either path does not exist (households is
            checked first).
        ValueError: If the merged frame has fewer than 50 rows.
    """
    households_missing = not os.path.exists(households_path)
    if households_missing:
        raise FileNotFoundError(f"Households file not found: {households_path}")
    gold_missing = not os.path.exists(gold_path)
    if gold_missing:
        raise FileNotFoundError(f"Gold labels file not found: {gold_path}")

    household_frame = pd.read_csv(households_path)
    label_frame = pd.read_csv(gold_path)
    combined = label_frame.merge(household_frame, on='household_id')

    # Guard clause inverted: return on the happy path, fail otherwise.
    if len(combined) >= 50:
        return combined
    raise ValueError(
        f"Merged dataset has only {len(combined)} rows. Minimum 50 required."
    )
def split_data(df: pd.DataFrame):
    """
    Build the feature matrix, split it 80/20, and preprocess both partitions.

    The split is stratified on the ``stunting_flag`` column and seeded with
    ``RANDOM_STATE``. The object returned by ``build_preprocessor()`` is
    fitted on the training partition only and then applied to the test
    partition.

    Returns:
        A 7-tuple ``(X_train, X_test, y_train, y_test, scaler, X_train_raw,
        X_test_raw)`` where ``X_train`` / ``X_test`` are the preprocessor
        outputs, ``scaler`` is the fitted preprocessor, and the ``*_raw``
        values are the untransformed partitions.
    """
    features = prepare_feature_matrix(df)
    target = df['stunting_flag'].values

    partitions = train_test_split(
        features,
        target,
        test_size=0.2,
        stratify=target,
        random_state=RANDOM_STATE,
    )
    train_raw, test_raw, y_train, y_test = partitions

    # Fit on the training rows only; the test rows are merely transformed.
    preprocessor = build_preprocessor()
    train_scaled = preprocessor.fit_transform(train_raw)
    test_scaled = preprocessor.transform(test_raw)

    return (
        train_scaled,
        test_scaled,
        y_train,
        y_test,
        preprocessor,
        train_raw,
        test_raw,
    )
| # --------------------------------------------------------------------------- | |
| # Candidate models | |
| # --------------------------------------------------------------------------- | |
def get_candidate_models() -> dict:
    """
    Return new, unfitted candidate classifiers keyed by model name.

    All three estimators are seeded with ``RANDOM_STATE``. The logistic
    regression and random forest use ``class_weight='balanced'``; the
    gradient boosting classifier is constructed without a class-weight
    argument.
    """
    candidates = {}

    candidates['LogisticRegression'] = LogisticRegression(
        C=1.0,
        class_weight='balanced',
        random_state=RANDOM_STATE,
        max_iter=1000,
    )

    candidates['RandomForest'] = RandomForestClassifier(
        n_estimators=100,
        class_weight='balanced',
        random_state=RANDOM_STATE,
        n_jobs=-1,
    )

    candidates['GradientBoosting'] = GradientBoostingClassifier(
        n_estimators=100,
        learning_rate=0.1,
        max_depth=3,
        random_state=RANDOM_STATE,
    )

    return candidates
| # --------------------------------------------------------------------------- | |
| # Cross-validation | |
| # --------------------------------------------------------------------------- | |
def run_cross_validation(models: dict, X_train: np.ndarray, y_train: np.ndarray) -> dict:
    """
    Score every candidate with 5-fold stratified cross-validation.

    The same shuffled ``StratifiedKFold`` (seeded with ``RANDOM_STATE``) is
    used for all models. Each model is scored on ROC-AUC, F1, precision and
    recall, and a one-line AUC-ROC summary is printed per model.

    Returns:
        ``{model_name: {auc_roc_mean, auc_roc_std, f1_mean, f1_std,
        precision_mean, precision_std, recall_mean, recall_std}}`` with
        every value converted to a plain ``float``.
    """
    def _mean_std(values):
        # Collapse the per-fold scores into (mean, std) as built-in floats.
        return float(np.mean(values)), float(np.std(values))

    folds = StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE)
    summary = {}

    for model_name, clf in models.items():
        fold_scores = cross_validate(
            clf,
            X_train,
            y_train,
            cv=folds,
            scoring=['roc_auc', 'f1', 'precision', 'recall'],
            return_train_score=False,
        )

        auc_mean, auc_std = _mean_std(fold_scores['test_roc_auc'])
        f1_mean, f1_std = _mean_std(fold_scores['test_f1'])
        prec_mean, prec_std = _mean_std(fold_scores['test_precision'])
        rec_mean, rec_std = _mean_std(fold_scores['test_recall'])

        summary[model_name] = {
            'auc_roc_mean': auc_mean,
            'auc_roc_std': auc_std,
            'f1_mean': f1_mean,
            'f1_std': f1_std,
            'precision_mean': prec_mean,
            'precision_std': prec_std,
            'recall_mean': rec_mean,
            'recall_std': rec_std,
        }
        print(f" {model_name}: CV AUC-ROC = {summary[model_name]['auc_roc_mean']:.4f} "
              f"± {summary[model_name]['auc_roc_std']:.4f}")

    return summary
def select_best_model(cv_results: dict) -> str:
    """
    Return the name of the model with the highest mean AUC-ROC.

    Ties on the mean are broken by the lower standard deviation; if both are
    equal, the model that appears first in ``cv_results`` wins.

    A model whose ``auc_roc_mean`` is NaN is ranked below every model with a
    real score. Comparing NaN directly would make the result depend on dict
    order, because every comparison against NaN is False. A NaN
    ``auc_roc_std`` is treated as the worst possible spread.

    Args:
        cv_results: Mapping of model name to a dict holding at least
            ``auc_roc_mean`` and ``auc_roc_std``.

    Returns:
        The key of the winning entry.

    Raises:
        ValueError: If ``cv_results`` is empty.
    """
    if not cv_results:
        raise ValueError("cv_results is empty; no model to select.")

    worst = float('-inf')

    def _rank(item):
        # Sort key: higher mean first, then lower std; NaN always sorts last.
        _, stats = item
        mean = stats['auc_roc_mean']
        std = stats['auc_roc_std']
        if np.isnan(mean):
            return (worst, worst)
        if np.isnan(std):
            return (mean, worst)
        return (mean, -std)

    best_name, _ = max(cv_results.items(), key=_rank)
    return best_name
| # --------------------------------------------------------------------------- | |
| # Hyperparameter tuning | |
| # --------------------------------------------------------------------------- | |
# Grid-search spaces keyed by the same model names that
# get_candidate_models() uses; tune_hyperparameters() looks a grid up by name
# and skips tuning when no entry exists.
PARAM_GRIDS = {
    'LogisticRegression': {'C': [0.01, 0.1, 1.0, 10.0]},
    'RandomForest': {
        'n_estimators': [50, 100, 200],
        'max_depth': [3, 5, 10, None],
        'min_samples_split': [2, 5, 10],
    },
    'GradientBoosting': {
        'n_estimators': [50, 100, 200],
        'learning_rate': [0.01, 0.1, 0.2],
        'max_depth': [3, 5, 7],
    },
}
def tune_hyperparameters(model_name: str, estimator, X_train: np.ndarray, y_train: np.ndarray):
    """
    Grid-search the named model's hyperparameters, optimising ROC-AUC.

    The grid is looked up in ``PARAM_GRIDS`` by ``model_name``. When a grid
    exists, ``GridSearchCV`` runs with a shuffled 5-fold ``StratifiedKFold``
    (seeded with ``RANDOM_STATE``), ``n_jobs=-1`` and ``refit=True``. When no
    grid exists, the estimator that was passed in is fitted directly on the
    training data.

    Returns:
        ``(best_estimator, best_params, best_score)``. Without a grid this
        is ``(estimator, {}, None)``.
    """
    grid = PARAM_GRIDS.get(model_name, {})

    if grid:
        folds = StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE)
        searcher = GridSearchCV(
            estimator=estimator,
            param_grid=grid,
            scoring='roc_auc',
            cv=folds,
            n_jobs=-1,
            refit=True,
        )
        searcher.fit(X_train, y_train)
        print(f" Best params: {searcher.best_params_}")
        print(f" Best CV AUC-ROC: {searcher.best_score_:.4f}")
        return searcher.best_estimator_, searcher.best_params_, searcher.best_score_

    # Nothing to search over: fit the given estimator as-is.
    print(f" No param grid for {model_name}, skipping tuning.")
    estimator.fit(X_train, y_train)
    return estimator, {}, None
| # --------------------------------------------------------------------------- | |
| # Evaluation | |
| # --------------------------------------------------------------------------- | |
def evaluate_on_test_set(model, X_test: np.ndarray, y_test: np.ndarray) -> dict:
    """
    Score a fitted classifier on the held-out test set.

    Accuracy, precision, recall and F1 are computed from ``model.predict``;
    AUC-ROC is computed from column 1 of ``model.predict_proba``. Precision,
    recall and F1 are called with ``zero_division=0``.

    Returns:
        Dict with keys ``accuracy``, ``precision``, ``recall``, ``f1_score``
        and ``auc_roc``, all plain floats.

    Warns:
        UserWarning: If the test AUC-ROC is below 0.70.
    """
    predicted_labels = model.predict(X_test)
    positive_scores = model.predict_proba(X_test)[:, 1]

    report = {'accuracy': float(accuracy_score(y_test, predicted_labels))}

    # These three scorers share a signature, so compute them in one pass.
    label_scorers = (
        ('precision', precision_score),
        ('recall', recall_score),
        ('f1_score', f1_score),
    )
    for key, scorer in label_scorers:
        report[key] = float(scorer(y_test, predicted_labels, zero_division=0))

    report['auc_roc'] = float(roc_auc_score(y_test, positive_scores))

    if report['auc_roc'] < 0.70:
        warnings.warn(
            f"Test AUC-ROC is {report['auc_roc']:.4f} (< 0.70). "
            "Review feature engineering or data quality.",
            UserWarning
        )
    return report
| # --------------------------------------------------------------------------- | |
| # Artifact persistence | |
| # --------------------------------------------------------------------------- | |
def save_artifact(artifact: dict, output_dir: str) -> str:
    """
    Persist a model artifact and record it in ``model_registry.json``.

    Two copies are written with ``joblib.dump``: a versioned file
    ``risk_model_<version_tag>.pkl`` and a canonical ``risk_model.pkl`` that
    is overwritten on every call. One entry is then appended to the JSON
    registry in ``output_dir``.

    Side effects:
        * ``artifact`` is mutated in place: ``artifact['version_tag']`` is
          set before dumping, so the tag is stored inside both files.
        * ``output_dir`` is created if it does not exist.

    Args:
        artifact: Dict to persist. ``model_type`` and ``metrics['auc_roc']``
            are copied into the registry entry when present.
        output_dir: Directory that receives the artifacts and the registry.

    Returns:
        Path of the versioned artifact file.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Read the clock once so the file-name tag and the registry timestamp
    # describe the same instant. The tag is still rendered in local time (as
    # before); the registry timestamp stays in UTC.
    saved_at = datetime.now(timezone.utc)
    version_tag = saved_at.astimezone().strftime('%Y%m%d_%H%M%S')
    artifact['version_tag'] = version_tag

    versioned_path = os.path.join(output_dir, f"risk_model_{version_tag}.pkl")
    canonical_path = os.path.join(output_dir, "risk_model.pkl")
    joblib.dump(artifact, versioned_path)
    joblib.dump(artifact, canonical_path)
    print(f" Saved versioned artifact: {versioned_path}")
    print(f" Saved canonical artifact: {canonical_path}")

    # Update registry
    registry_path = os.path.join(output_dir, "model_registry.json")
    registry = []
    if os.path.exists(registry_path):
        with open(registry_path, 'r', encoding='utf-8') as f:
            registry = json.load(f)

    registry.append({
        'version_tag': version_tag,
        'model_type': artifact.get('model_type', 'Unknown'),
        'test_auc_roc': artifact.get('metrics', {}).get('auc_roc', None),
        'timestamp': saved_at.isoformat(),
        'artifact_path': versioned_path,
    })

    # Serialise before touching the file: if an entry is not JSON-serialisable
    # the existing registry is left intact instead of being truncated.
    payload = json.dumps(registry, indent=2)

    # Write to a sibling temp file and swap it in, so an interrupted write
    # cannot leave a half-written registry behind.
    tmp_path = f"{registry_path}.tmp"
    with open(tmp_path, 'w', encoding='utf-8') as f:
        f.write(payload)
    os.replace(tmp_path, registry_path)

    print(f" Registry updated: {registry_path}")
    return versioned_path