"""Train and evaluate default-risk classifiers on the Kenyan loan feature sets.

For each configured dataset this script:
  1. loads the engineered-feature CSV and drops ID / leak-prone columns,
  2. builds a leakage-safe train/test split (time split > group split >
     stratified random fallback),
  3. fits RandomForest / XGBoost / LightGBM / CatBoost pipelines,
  4. writes metrics, plots, fitted pipelines, and explainability metadata
     under ``outputs_for_demo/<dataset>/``.
"""

from __future__ import annotations

import json
from pathlib import Path
from typing import Dict, List, Tuple

import matplotlib

matplotlib.use("Agg")  # Use non-GUI backend to avoid Tkinter cleanup warnings

import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import xgboost as xgb
from catboost import CatBoostClassifier
from lightgbm import LGBMClassifier
from sklearn.base import clone
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.impute import SimpleImputer
from sklearn.metrics import (
    accuracy_score,
    average_precision_score,
    classification_report,
    confusion_matrix,
    f1_score,
    precision_recall_curve,
    precision_score,
    recall_score,
    roc_auc_score,
    roc_curve,
)
from sklearn.model_selection import GroupShuffleSplit, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from xgboost import XGBClassifier

RANDOM_STATE = 42

# Repository root (two levels up from this file: code/model/train_models.py -> repo root)
ROOT = Path(__file__).resolve().parents[2]
# TODO(review): machine-specific absolute path — breaks on any other checkout.
# Consider deriving this from ROOT (or an env var) instead.
DATA_BASE = Path(
    "/home/name-1/AI-Agent/frankscore/kenyan-dataset-issue/data/feature-generated"
)
DATASETS: Dict[str, Path] = {
    "full": DATA_BASE / "kenya_engineered_features.csv",
    "borrower": DATA_BASE / "kenya_engineered_features_borrower_side.csv",
}
OUTPUT_DIR = ROOT / "code" / "model" / "outputs_for_demo"

TARGET_COL = "target"
ID_COLS = ["customer_id", "tbl_loan_id"]
GROUP_COL_CANDIDATES = ["customer_id", "customerId", "client_id"]
DATE_COL_CANDIDATES = [
    "pseudo_disb_date",
    "disb_date",
    "disbursement_date",
    "application_date",
    "loan_date",
]
# Columns excluded from modelling inputs (dropped from X before training).
FEATURES_TO_DROP = {
    "interest_rate",
    "repayment_intensity",
    "lender_risk_profile",
    "pseudo_disb_date",
}


def build_preprocessor(
    feature_frame: pd.DataFrame,
) -> Tuple[ColumnTransformer, List[str], List[str]]:
    """Build an (unfitted) impute-and-encode transformer for *feature_frame*.

    Object-dtype columns are treated as categorical (most-frequent impute +
    one-hot encoding that ignores unknown categories at transform time);
    all other columns are numeric (median impute).

    Returns:
        (preprocessor, num_cols, cat_cols) — the transformer plus the numeric
        and categorical column-name lists it was built from.
    """
    cat_cols = feature_frame.select_dtypes(include=["object"]).columns.tolist()
    num_cols = [c for c in feature_frame.columns if c not in cat_cols]

    num_pipe = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="median")),
        ]
    )
    cat_pipe = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="most_frequent")),
            ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
        ]
    )
    preprocessor = ColumnTransformer(
        transformers=[
            ("num", num_pipe, num_cols),
            ("cat", cat_pipe, cat_cols),
        ]
    )
    return preprocessor, num_cols, cat_cols


def find_first_existing_col(df: pd.DataFrame, candidates: List[str]) -> str | None:
    """Return the first candidate column name present in *df*, else None."""
    for c in candidates:
        if c in df.columns:
            return c
    return None


def split_data_leakage_safe(
    df: pd.DataFrame, X: pd.DataFrame, y: pd.Series
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series, str]:
    """Split (X, y) into train/test while avoiding customer/time leakage.

    Priority:
      1) time split if a date column exists
      2) group split on customer id
      3) stratified fallback

    The time split is only used when >80% of the date column parses cleanly;
    the earliest 80% of rows become the training set. Returns
    (X_train, X_test, y_train, y_test, tag) where *tag* names the strategy.
    """
    date_col = find_first_existing_col(df, DATE_COL_CANDIDATES)
    group_col = find_first_existing_col(df, GROUP_COL_CANDIDATES)

    if date_col is not None:
        tmp = df[[date_col]].copy()
        tmp[date_col] = pd.to_datetime(tmp[date_col], errors="coerce")
        if tmp[date_col].notna().mean() > 0.8:
            order = tmp[date_col].sort_values().index
            cutoff = int(len(order) * 0.8)
            train_idx = order[:cutoff]
            test_idx = order[cutoff:]
            return (
                X.loc[train_idx],
                X.loc[test_idx],
                y.loc[train_idx],
                y.loc[test_idx],
                f"time_split({date_col})",
            )

    if group_col is not None:
        groups = df[group_col]
        gss = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=RANDOM_STATE)
        train_idx, test_idx = next(gss.split(X, y, groups=groups))
        return (
            X.iloc[train_idx],
            X.iloc[test_idx],
            y.iloc[train_idx],
            y.iloc[test_idx],
            f"group_split({group_col})",
        )

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, stratify=y, random_state=RANDOM_STATE
    )
    return X_train, X_test, y_train, y_test, "stratified_random_split"


def get_models(scale_pos_weight: float) -> Dict[str, object]:
    """Return the candidate classifiers keyed by short name.

    *scale_pos_weight* (neg/pos ratio of the training labels) is applied to
    XGBoost; RandomForest and LightGBM use ``class_weight="balanced"`` for
    the same class-imbalance correction.
    """
    # Using moderate defaults to keep runtime reasonable.
    return {
        "random_forest": RandomForestClassifier(
            n_estimators=300,
            max_depth=None,
            n_jobs=-1,
            class_weight="balanced",
            random_state=RANDOM_STATE,
        ),
        "xgboost": XGBClassifier(
            n_estimators=300,
            max_depth=6,
            learning_rate=0.05,
            subsample=0.8,
            colsample_bytree=0.8,
            eval_metric="logloss",
            n_jobs=-1,
            random_state=RANDOM_STATE,
            scale_pos_weight=scale_pos_weight,
        ),
        "lightgbm": LGBMClassifier(
            n_estimators=400,
            learning_rate=0.05,
            max_depth=-1,
            subsample=0.9,
            colsample_bytree=0.9,
            random_state=RANDOM_STATE,
            n_jobs=-1,
            class_weight="balanced",
        ),
        "catboost": CatBoostClassifier(
            iterations=400,
            depth=8,
            learning_rate=0.05,
            loss_function="Logloss",
            eval_metric="AUC",
            verbose=0,
            random_seed=RANDOM_STATE,
        ),
    }


def plot_roc(y_true: np.ndarray, y_score: np.ndarray, title: str, path: Path) -> None:
    """Save a ROC curve (with AUC in the legend) for *y_score* to *path*."""
    fpr, tpr, _ = roc_curve(y_true, y_score)
    auc_val = roc_auc_score(y_true, y_score)
    plt.figure()
    plt.plot(fpr, tpr, label=f"AUC = {auc_val:.3f}")
    plt.plot([0, 1], [0, 1], linestyle="--", color="grey")
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.title(title)
    plt.legend(loc="lower right")
    plt.tight_layout()
    plt.savefig(path, dpi=150)
    plt.close()


def plot_pr(y_true: np.ndarray, y_score: np.ndarray, title: str, path: Path) -> None:
    """Save a precision-recall curve (with average precision) to *path*."""
    precision, recall, _ = precision_recall_curve(y_true, y_score)
    ap = average_precision_score(y_true, y_score)
    plt.figure()
    plt.plot(recall, precision, label=f"AP = {ap:.3f}")
    plt.xlabel("Recall")
    plt.ylabel("Precision")
    plt.title(title)
    plt.legend(loc="lower left")
    plt.tight_layout()
    plt.savefig(path, dpi=150)
    plt.close()


def plot_confusion(
    y_true: np.ndarray, y_pred: np.ndarray, title: str, path: Path
) -> None:
    """Save an annotated confusion-matrix heatmap to *path*."""
    cm = confusion_matrix(y_true, y_pred)
    plt.figure()
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", cbar=False)
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.title(title)
    plt.tight_layout()
    plt.savefig(path, dpi=150)
    plt.close()


def evaluate_models(dataset_name: str, data_path: Path) -> None:
    """Train every candidate model on one dataset and write all artifacts.

    Artifacts (metrics CSV, per-model plots, classification reports, fitted
    pipelines, explainability metadata, and an ``artifacts.json`` manifest)
    are written under ``OUTPUT_DIR / dataset_name``.

    Raises:
        SystemExit: if the target column is missing from the CSV.
    """
    print(f"=== Training on {dataset_name} dataset ===")
    df = pd.read_csv(data_path)
    if TARGET_COL not in df.columns:
        raise SystemExit(f"target column missing in {data_path}")

    X = df.drop(columns=[TARGET_COL] + ID_COLS, errors="ignore")
    X = X.drop(columns=[c for c in FEATURES_TO_DROP if c in X.columns], errors="ignore")
    y = df[TARGET_COL]

    preprocessor, num_cols, cat_cols = build_preprocessor(X)
    X_train, X_test, y_train, y_test, split_tag = split_data_leakage_safe(df, X, y)
    print(f"Split used: {split_tag}")

    pos = y_train.sum()
    neg = len(y_train) - pos
    scale_pos_weight = float(neg / pos) if pos > 0 else 1.0
    models = get_models(scale_pos_weight)

    ds_out = OUTPUT_DIR / dataset_name
    ds_out.mkdir(parents=True, exist_ok=True)

    # Save a small background sample for downstream explainability tooling.
    background_path = ds_out / "explain_background.csv"
    df.sample(min(len(df), 200), random_state=RANDOM_STATE).to_csv(
        background_path, index=False
    )

    metrics_rows = []
    report_manifest = {}
    pre_feature_names = None

    for model_name, model in models.items():
        print(f"Training {model_name}...")
        # clone() so each pipeline owns its own fitted preprocessor: sklearn
        # Pipeline does NOT copy its steps, so without the clone all four
        # saved pipelines would share (and re-fit) one ColumnTransformer.
        clf = Pipeline(steps=[("preprocess", clone(preprocessor)), ("model", model)])
        clf.fit(X_train, y_train)
        if pre_feature_names is None:
            pre_feature_names = (
                clf.named_steps["preprocess"].get_feature_names_out().tolist()
            )

        probas = clf.predict_proba(X_test)[:, 1]
        preds = (probas >= 0.5).astype(int)

        metrics = {
            "dataset": dataset_name,
            "split": split_tag,
            "model": model_name,
            "auc_roc": roc_auc_score(y_test, probas),
            "auc_pr": average_precision_score(y_test, probas),
            "accuracy": accuracy_score(y_test, preds),
            "precision": precision_score(y_test, preds, zero_division=0),
            "recall": recall_score(y_test, preds, zero_division=0),
            "f1": f1_score(y_test, preds, zero_division=0),
        }
        metrics_rows.append(metrics)

        # Classification report
        cls_report = classification_report(
            y_test,
            preds,
            target_names=["non_default", "default"],
            digits=3,
            zero_division=0,
        )
        report_path = ds_out / f"classification_report_{model_name}.txt"
        report_path.write_text(cls_report)
        report_manifest[f"classification_report_{model_name}"] = str(report_path)

        # Plots
        roc_path = ds_out / f"roc_{model_name}.png"
        pr_path = ds_out / f"pr_{model_name}.png"
        cm_path = ds_out / f"confusion_matrix_{model_name}.png"
        model_path = ds_out / f"{model_name}_pipeline.pkl"

        plot_roc(y_test, probas, f"{dataset_name.upper()} - {model_name} ROC", roc_path)
        plot_pr(y_test, probas, f"{dataset_name.upper()} - {model_name} PR", pr_path)
        plot_confusion(
            y_test, preds, f"{dataset_name.upper()} - {model_name} Confusion", cm_path
        )
        joblib.dump(clf, model_path)

        report_manifest[f"roc_{model_name}"] = str(roc_path)
        report_manifest[f"pr_{model_name}"] = str(pr_path)
        report_manifest[f"confusion_{model_name}"] = str(cm_path)
        report_manifest[f"model_{model_name}"] = str(model_path)

        if model_name == "xgboost":
            booster = clf.named_steps["model"].get_booster()
            base_score = booster.attr("base_score")
            if base_score:
                try:
                    float(base_score)
                except ValueError:
                    # Some builds store base_score as e.g. "[5E-1]"; normalise
                    # it so the saved booster JSON round-trips cleanly.
                    cleaned = base_score.strip("[]")
                    try:
                        cleaned_val = str(float(cleaned))
                    except Exception:
                        cleaned_val = "0.5"
                    booster.set_param({"base_score": cleaned_val})
                    booster.set_attr(base_score=cleaned_val)
            booster_path = ds_out / f"{model_name}_booster.json"
            booster.save_model(str(booster_path))
            report_manifest[f"booster_{model_name}"] = str(booster_path)

    if pre_feature_names is None:
        pre_feature_names = []

    # Metadata consumed by downstream explainability tooling (e.g. SHAP).
    explain_meta = {
        "dataset": dataset_name,
        "target_col": TARGET_COL,
        "raw_num_cols": num_cols,
        "raw_cat_cols": cat_cols,
        "pre_feature_names": pre_feature_names,
        "id_cols": ID_COLS,
        "dropped_features": sorted(list(FEATURES_TO_DROP)),
        "split_used": split_tag,
    }
    meta_path = ds_out / "explain_meta.json"
    meta_path.write_text(json.dumps(explain_meta, indent=2))
    report_manifest["explain_meta"] = str(meta_path)
    report_manifest["explain_background"] = str(background_path)

    metrics_df = pd.DataFrame(metrics_rows).sort_values(
        ["dataset", "auc_roc"], ascending=[True, False]
    )
    metrics_path = ds_out / "metrics_summary.csv"
    metrics_df.to_csv(metrics_path, index=False)
    print(f"Saved metrics -> {metrics_path}")

    manifest_path = ds_out / "artifacts.json"
    manifest_path.write_text(json.dumps(report_manifest, indent=2))


def main() -> None:
    """Train on every configured dataset, skipping any whose CSV is absent."""
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    for name, path in DATASETS.items():
        if not path.exists():
            print(f"Skipping {name}, missing file: {path}")
            continue
        evaluate_models(name, path)


if __name__ == "__main__":
    main()