# Train and evaluate credit-default models on the Kenyan engineered-feature datasets.
| from __future__ import annotations | |
| import json | |
| from pathlib import Path | |
| from typing import Dict, List, Tuple | |
| import matplotlib | |
| matplotlib.use("Agg") # Use non-GUI backend to avoid Tkinter cleanup warnings | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| import pandas as pd | |
| import seaborn as sns | |
| import joblib | |
| from catboost import CatBoostClassifier | |
| from lightgbm import LGBMClassifier | |
| from sklearn.compose import ColumnTransformer | |
| from sklearn.ensemble import RandomForestClassifier | |
| from sklearn.impute import SimpleImputer | |
| from sklearn.metrics import ( | |
| accuracy_score, | |
| average_precision_score, | |
| classification_report, | |
| confusion_matrix, | |
| f1_score, | |
| precision_recall_curve, | |
| precision_score, | |
| recall_score, | |
| roc_auc_score, | |
| roc_curve, | |
| ) | |
| from sklearn.model_selection import GroupShuffleSplit, train_test_split | |
| from sklearn.pipeline import Pipeline | |
| from sklearn.preprocessing import OneHotEncoder | |
| from xgboost import XGBClassifier | |
| import xgboost as xgb | |
# Fixed seed so splits, sampling and model initialisation are reproducible.
RANDOM_STATE = 42
# Repository root (two levels up from this file: code/model/train_models.py -> repo root)
ROOT = Path(__file__).resolve().parents[2]
# NOTE(review): hard-coded absolute path — this breaks on any other machine.
# Presumably it should be derived from ROOT; confirm the repo layout.
DATA_BASE = Path(
    "/home/name-1/AI-Agent/frankscore/kenyan-dataset-issue/data/feature-generated"
)
# Input CSVs: "full" vs a borrower-side-only feature set (per the file names).
DATASETS: Dict[str, Path] = {
    "full": DATA_BASE / "kenya_engineered_features.csv",
    "borrower": DATA_BASE / "kenya_engineered_features_borrower_side.csv",
}
# All artifacts (metrics, plots, pickled pipelines, manifests) land here.
OUTPUT_DIR = ROOT / "code" / "model" / "outputs_for_demo"
TARGET_COL = "target"  # binary label column expected in every dataset
ID_COLS = ["customer_id", "tbl_loan_id"]  # identifiers, never used as features
# Column-name candidates probed in order when choosing a leakage-safe split.
GROUP_COL_CANDIDATES = ["customer_id", "customerId", "client_id"]
DATE_COL_CANDIDATES = ["pseudo_disb_date", "disb_date", "disbursement_date", "application_date", "loan_date"]
# Features removed before training — presumably leakage-prone or lender-side
# signals; TODO confirm against the feature-generation code.
FEATURES_TO_DROP = {
    "interest_rate",
    "repayment_intensity",
    "lender_risk_profile",
    "pseudo_disb_date",
}
def build_preprocessor(
    feature_frame: pd.DataFrame,
) -> Tuple[ColumnTransformer, List[str], List[str]]:
    """Build the preprocessing transformer for *feature_frame*.

    Object-dtype columns are treated as categorical (mode-imputed, then
    one-hot encoded with unknown categories ignored); all remaining columns
    are treated as numeric (median-imputed).

    Returns the ColumnTransformer plus the numeric and categorical column lists.
    """
    categorical = list(feature_frame.select_dtypes(include=["object"]).columns)
    numeric = [col for col in feature_frame.columns if col not in categorical]

    numeric_steps = [("imputer", SimpleImputer(strategy="median"))]
    categorical_steps = [
        ("imputer", SimpleImputer(strategy="most_frequent")),
        ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
    ]
    transformer = ColumnTransformer(
        transformers=[
            ("num", Pipeline(steps=numeric_steps), numeric),
            ("cat", Pipeline(steps=categorical_steps), categorical),
        ]
    )
    return transformer, numeric, categorical
def find_first_existing_col(df: pd.DataFrame, candidates: List[str]) -> str | None:
    """Return the first name in *candidates* that is a column of *df*, else None."""
    return next((name for name in candidates if name in df.columns), None)
def split_data_leakage_safe(
    df: pd.DataFrame, X: pd.DataFrame, y: pd.Series
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series, str]:
    """Split (X, y) into an 80/20 train/test pair while avoiding leakage.

    Strategy priority:
      1) chronological split when a mostly-parseable date column exists
      2) GroupShuffleSplit on a customer-identifier column
      3) stratified random split as a last resort

    Returns X_train, X_test, y_train, y_test and a tag naming the strategy used.
    """
    date_col = find_first_existing_col(df, DATE_COL_CANDIDATES)
    group_col = find_first_existing_col(df, GROUP_COL_CANDIDATES)

    if date_col is not None:
        parsed = pd.to_datetime(df[date_col], errors="coerce")
        # Only trust a time split when more than 80% of the dates parse.
        if parsed.notna().mean() > 0.8:
            ordered = parsed.sort_values().index
            boundary = int(len(ordered) * 0.8)
            train_labels, test_labels = ordered[:boundary], ordered[boundary:]
            return (
                X.loc[train_labels],
                X.loc[test_labels],
                y.loc[train_labels],
                y.loc[test_labels],
                f"time_split({date_col})",
            )

    if group_col is not None:
        splitter = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=RANDOM_STATE)
        train_pos, test_pos = next(splitter.split(X, y, groups=df[group_col]))
        return (
            X.iloc[train_pos],
            X.iloc[test_pos],
            y.iloc[train_pos],
            y.iloc[test_pos],
            f"group_split({group_col})",
        )

    parts = train_test_split(X, y, test_size=0.2, stratify=y, random_state=RANDOM_STATE)
    return (*parts, "stratified_random_split")
def get_models(scale_pos_weight: float) -> Dict[str, object]:
    """Return the candidate classifiers keyed by short name.

    Hyperparameters are moderate defaults chosen to keep runtime reasonable.
    *scale_pos_weight* (negative/positive ratio) counters class imbalance for
    XGBoost; random forest and LightGBM use class_weight="balanced" instead.
    """
    models: Dict[str, object] = {}
    models["random_forest"] = RandomForestClassifier(
        n_estimators=300,
        max_depth=None,
        n_jobs=-1,
        class_weight="balanced",
        random_state=RANDOM_STATE,
    )
    models["xgboost"] = XGBClassifier(
        n_estimators=300,
        max_depth=6,
        learning_rate=0.05,
        subsample=0.8,
        colsample_bytree=0.8,
        eval_metric="logloss",
        n_jobs=-1,
        random_state=RANDOM_STATE,
        scale_pos_weight=scale_pos_weight,
    )
    models["lightgbm"] = LGBMClassifier(
        n_estimators=400,
        learning_rate=0.05,
        max_depth=-1,
        subsample=0.9,
        colsample_bytree=0.9,
        random_state=RANDOM_STATE,
        n_jobs=-1,
        class_weight="balanced",
    )
    models["catboost"] = CatBoostClassifier(
        iterations=400,
        depth=8,
        learning_rate=0.05,
        loss_function="Logloss",
        eval_metric="AUC",
        verbose=0,
        random_seed=RANDOM_STATE,
    )
    return models
def plot_roc(y_true: np.ndarray, y_score: np.ndarray, title: str, path: Path) -> None:
    """Plot a ROC curve (AUC shown in the legend) and save the figure to *path*."""
    fpr, tpr, _thresholds = roc_curve(y_true, y_score)
    auc_value = roc_auc_score(y_true, y_score)
    fig, ax = plt.subplots()
    ax.plot(fpr, tpr, label=f"AUC = {auc_value:.3f}")
    ax.plot([0, 1], [0, 1], linestyle="--", color="grey")  # chance diagonal
    ax.set_xlabel("False Positive Rate")
    ax.set_ylabel("True Positive Rate")
    ax.set_title(title)
    ax.legend(loc="lower right")
    fig.tight_layout()
    fig.savefig(path, dpi=150)
    plt.close(fig)
def plot_pr(y_true: np.ndarray, y_score: np.ndarray, title: str, path: Path) -> None:
    """Plot a precision-recall curve (AP in the legend) and save it to *path*."""
    precision, recall, _thresholds = precision_recall_curve(y_true, y_score)
    ap_value = average_precision_score(y_true, y_score)
    fig, ax = plt.subplots()
    ax.plot(recall, precision, label=f"AP = {ap_value:.3f}")
    ax.set_xlabel("Recall")
    ax.set_ylabel("Precision")
    ax.set_title(title)
    ax.legend(loc="lower left")
    fig.tight_layout()
    fig.savefig(path, dpi=150)
    plt.close(fig)
def plot_confusion(y_true: np.ndarray, y_pred: np.ndarray, title: str, path: Path) -> None:
    """Plot a confusion-matrix heatmap and save the figure to *path*."""
    matrix = confusion_matrix(y_true, y_pred)
    fig, ax = plt.subplots()
    sns.heatmap(matrix, annot=True, fmt="d", cmap="Blues", cbar=False, ax=ax)
    ax.set_xlabel("Predicted")
    ax.set_ylabel("Actual")
    ax.set_title(title)
    fig.tight_layout()
    fig.savefig(path, dpi=150)
    plt.close(fig)
def evaluate_models(dataset_name: str, data_path: Path) -> None:
    """Train every candidate model on one dataset and write all artifacts.

    Loads the CSV at *data_path*, builds a leakage-safe train/test split,
    fits each model from get_models() inside a preprocessing pipeline, and
    saves per-model metrics, classification reports, ROC/PR/confusion plots,
    pickled pipelines and explainability metadata under
    OUTPUT_DIR/<dataset_name>.

    Raises SystemExit if the target column is missing from the CSV.
    """
    print(f"=== Training on {dataset_name} dataset ===")
    df = pd.read_csv(data_path)
    if TARGET_COL not in df.columns:
        raise SystemExit(f"target column missing in {data_path}")
    # Feature matrix: drop the label, the identifier columns and the known
    # leakage-prone features (errors="ignore" tolerates absent columns).
    X = df.drop(columns=[TARGET_COL] + ID_COLS, errors="ignore")
    X = X.drop(columns=[c for c in FEATURES_TO_DROP if c in X.columns], errors="ignore")
    y = df[TARGET_COL]
    preprocessor, num_cols, cat_cols = build_preprocessor(X)
    X_train, X_test, y_train, y_test, split_tag = split_data_leakage_safe(df, X, y)
    print(f"Split used: {split_tag}")
    # Negative/positive ratio on the TRAIN fold feeds XGBoost's
    # scale_pos_weight to counter class imbalance (1.0 if no positives).
    pos = y_train.sum()
    neg = len(y_train) - pos
    scale_pos_weight = float(neg / pos) if pos > 0 else 1.0
    models = get_models(scale_pos_weight)
    ds_out = OUTPUT_DIR / dataset_name
    ds_out.mkdir(parents=True, exist_ok=True)
    # Save a small background sample for downstream explainability tooling.
    background_path = ds_out / "explain_background.csv"
    df.sample(min(len(df), 200), random_state=RANDOM_STATE).to_csv(background_path, index=False)
    metrics_rows = []  # one metrics dict per trained model
    report_manifest = {}  # artifact name -> file path; written to artifacts.json
    pre_feature_names = None  # post-preprocessing feature names, captured once
    for model_name, model in models.items():
        print(f"Training {model_name}...")
        # NOTE(review): the same preprocessor instance is shared across all
        # pipelines; each fit refits it on the identical X_train, so the saved
        # artifacts are consistent — but clone() would be safer. TODO confirm.
        clf = Pipeline(steps=[("preprocess", preprocessor), ("model", model)])
        clf.fit(X_train, y_train)
        if pre_feature_names is None:
            pre_feature_names = clf.named_steps["preprocess"].get_feature_names_out().tolist()
        # Probability of the positive class; 0.5 is the decision threshold.
        probas = clf.predict_proba(X_test)[:, 1]
        preds = (probas >= 0.5).astype(int)
        metrics = {
            "dataset": dataset_name,
            "split": split_tag,
            "model": model_name,
            "auc_roc": roc_auc_score(y_test, probas),
            "auc_pr": average_precision_score(y_test, probas),
            "accuracy": accuracy_score(y_test, preds),
            "precision": precision_score(y_test, preds, zero_division=0),
            "recall": recall_score(y_test, preds, zero_division=0),
            "f1": f1_score(y_test, preds, zero_division=0),
        }
        metrics_rows.append(metrics)
        # Classification report (per-class precision/recall/F1) as plain text.
        cls_report = classification_report(
            y_test,
            preds,
            target_names=["non_default", "default"],
            digits=3,
            zero_division=0,
        )
        report_path = ds_out / f"classification_report_{model_name}.txt"
        report_path.write_text(cls_report)
        report_manifest[f"classification_report_{model_name}"] = str(report_path)
        # Plots and the pickled pipeline, all registered in the manifest.
        roc_path = ds_out / f"roc_{model_name}.png"
        pr_path = ds_out / f"pr_{model_name}.png"
        cm_path = ds_out / f"confusion_matrix_{model_name}.png"
        model_path = ds_out / f"{model_name}_pipeline.pkl"
        plot_roc(y_test, probas, f"{dataset_name.upper()} - {model_name} ROC", roc_path)
        plot_pr(y_test, probas, f"{dataset_name.upper()} - {model_name} PR", pr_path)
        plot_confusion(
            y_test, preds, f"{dataset_name.upper()} - {model_name} Confusion", cm_path
        )
        joblib.dump(clf, model_path)
        report_manifest[f"roc_{model_name}"] = str(roc_path)
        report_manifest[f"pr_{model_name}"] = str(pr_path)
        report_manifest[f"confusion_{model_name}"] = str(cm_path)
        report_manifest[f"model_{model_name}"] = str(model_path)
        if model_name == "xgboost":
            # Also export the raw booster as JSON. The stored base_score
            # attribute is sanitized first — presumably it can come back as a
            # bracketed string (e.g. "[0.5]") that breaks reloading the saved
            # booster; confirm against the xgboost version in use.
            booster = clf.named_steps["model"].get_booster()
            base_score = booster.attr("base_score")
            if base_score:
                try:
                    float(base_score)  # parses cleanly -> leave it untouched
                except ValueError:
                    cleaned = base_score.strip("[]")
                    try:
                        cleaned_val = str(float(cleaned))
                    except Exception:
                        cleaned_val = "0.5"  # last-resort fallback value
                    booster.set_param({"base_score": cleaned_val})
                    booster.set_attr(base_score=cleaned_val)
            booster_path = ds_out / f"{model_name}_booster.json"
            booster.save_model(str(booster_path))
            report_manifest[f"booster_{model_name}"] = str(booster_path)
    if pre_feature_names is None:
        pre_feature_names = []
    # Metadata consumed by the downstream explainability tooling.
    explain_meta = {
        "dataset": dataset_name,
        "target_col": TARGET_COL,
        "raw_num_cols": num_cols,
        "raw_cat_cols": cat_cols,
        "pre_feature_names": pre_feature_names,
        "id_cols": ID_COLS,
        "dropped_features": sorted(list(FEATURES_TO_DROP)),
        "split_used": split_tag,
    }
    meta_path = ds_out / "explain_meta.json"
    meta_path.write_text(json.dumps(explain_meta, indent=2))
    report_manifest["explain_meta"] = str(meta_path)
    report_manifest["explain_background"] = str(background_path)
    # Metrics summary, best AUC-ROC first within the dataset.
    metrics_df = pd.DataFrame(metrics_rows).sort_values(
        ["dataset", "auc_roc"], ascending=[True, False]
    )
    metrics_path = ds_out / "metrics_summary.csv"
    metrics_df.to_csv(metrics_path, index=False)
    print(f"Saved metrics -> {metrics_path}")
    manifest_path = ds_out / "artifacts.json"
    manifest_path.write_text(json.dumps(report_manifest, indent=2))
def main() -> None:
    """Train and evaluate models on every configured dataset whose CSV exists."""
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    for dataset_name, csv_path in DATASETS.items():
        if not csv_path.exists():
            print(f"Skipping {dataset_name}, missing file: {csv_path}")
            continue
        evaluate_models(dataset_name, csv_path)


if __name__ == "__main__":
    main()