from __future__ import annotations

import json
import os
from pathlib import Path
from typing import Any, Dict, List, Optional

import joblib
import numpy as np
import pandas as pd
import xgboost as xgb
from fastapi import FastAPI, HTTPException
from huggingface_hub import hf_hub_download
from pydantic import BaseModel, Field

# Compatibility shim for pickles created with newer sklearn that include _RemainderColsList
import sklearn.compose._column_transformer as _ct  # type: ignore

if not hasattr(_ct, "_RemainderColsList"):

    class _RemainderColsList(list):  # type: ignore
        pass

    _ct._RemainderColsList = _RemainderColsList

ROOT = Path(__file__).resolve().parents[1]
MODEL_DIR = Path(__file__).resolve().parent / "model"
# MODEL_PATH = MODEL_DIR / "xgboost_pipeline.pkl"
BOOSTER_PATH = MODEL_DIR / "xgboost_booster.json"
META_PATH = MODEL_DIR / "explain_meta.json"

HF_MODEL_REPO = os.getenv("HF_MODEL_REPO", "Gutema/frankscore-model-artifact")
HF_MODEL_REVISION = os.getenv("HF_MODEL_REVISION", "main")

# Download the fitted sklearn pipeline from the Hugging Face Hub at import time.
try:
    MODEL_PATH = Path(
        hf_hub_download(
            repo_id=HF_MODEL_REPO,
            filename="xgboost_pipeline.pkl",
            revision=HF_MODEL_REVISION,
        )
    )
except Exception as e:
    raise RuntimeError(f"Failed to download model artifact from HF repo={HF_MODEL_REPO}: {e}") from e

if not MODEL_PATH.exists():
    raise FileNotFoundError(f"Model file missing at {MODEL_PATH}")
if not BOOSTER_PATH.exists():
    raise FileNotFoundError(f"Booster file missing at {BOOSTER_PATH}")
if not META_PATH.exists():
    raise FileNotFoundError(f"Explainability meta missing at {META_PATH}")

PIPELINE = joblib.load(MODEL_PATH)
META = json.loads(META_PATH.read_text())

EXPECTED_FEATURES = list(getattr(PIPELINE, "feature_names_in_", []))
PREPROCESS = PIPELINE.named_steps.get("preprocess") if hasattr(PIPELINE, "named_steps") else None
if PREPROCESS is None:
    raise RuntimeError("Pipeline missing 'preprocess' step; cannot infer columns.")
if not EXPECTED_FEATURES:
    EXPECTED_FEATURES = list(getattr(PREPROCESS, "feature_names_in_", []))
if not EXPECTED_FEATURES:
    raise RuntimeError("Unable to determine expected feature names from the pipeline.")

_col_map = {name: cols for name, _, cols in getattr(PREPROCESS, "transformers_", [])}
NUM_FEATURES = list(_col_map.get("num", []))
CAT_FEATURES = list(_col_map.get("cat", []))
PRE_FEATURE_NAMES = META.get("pre_feature_names") or list(getattr(PREPROCESS, "get_feature_names_out", lambda: [])())
RAW_FEATURE_SET = set((META.get("raw_num_cols") or []) + (META.get("raw_cat_cols") or []))

FEATURE_GROUPS = {
    "Repayment Activity": [
        "num_previous_defaults",
        "past_default_rate",
        "repayment_consistency",
        "repayment_intensity",
    ],
    "Loan Amount & Burden": [
        "Total_Amount",
        "Total_Amount_to_Repay",
        "amount_bucket",
        "burden_percentile",
        "daily_burden",
        "duration",
        "duration_bucket",
        "interest_rate",
        "amount_ratio",
        "burden_ratio",
        "lender_exposure_ratio",
    ],
    "Borrowing History": [
        "account_age_days",
        "avg_past_amount",
        "avg_past_daily_burden",
        "avg_time_bw_loans",
        "borrower_history_strength",
        "days_since_last_loan",
        "loan_frequency_per_year",
        "num_previous_loans",
        "std_past_amount",
        "std_past_daily_burden",
        "trend_in_amount",
        "trend_in_burden",
        "lender_id",
        "lender_risk_profile",
    ],
    "Spending & Transactions": [
        "latest_amount_ma3",
        "days_to_local_festival",
        "days_to_salary_day",
        "month",
        "quarter",
        "week_of_year",
    ],
}

FEATURE_GROUP_LOOKUP: Dict[str, str] = {}
for group, variables in FEATURE_GROUPS.items():
    for var in variables:
        FEATURE_GROUP_LOOKUP[var] = group

app = FastAPI(title="FrankScore", version="1.0.0")


class PredictionRequest(BaseModel):
    records: List[Dict[str, Any]] = Field(..., description="List of borrower feature dictionaries")


class PredictionResponse(BaseModel):
    probabilities: List[float]


class ScoreRequest(BaseModel):
    probabilities: List[float] = Field(..., description="Probabilities of default (0-1)")


class ScoreResponse(BaseModel):
    scores: List[float]


class ExplainRequest(BaseModel):
    records: List[Dict[str, Any]]
    top_k: Optional[int] = Field(default=10, ge=1, le=100, description="Number of top features to return per record")


class FeatureContribution(BaseModel):
    feature: str
    shap_value: float


class GroupContribution(BaseModel):
    group: str
    total_shap_value: float
    percentage: float
    direction: str
    label: str
    features: List[FeatureContribution]


class ExplainItem(BaseModel):
    probability: float
    base_value: float
    group_contributions: List[GroupContribution]


class ExplainResponse(BaseModel):
    explanations: List[ExplainItem]


class PredictExplainItem(BaseModel):
    probability: float
    score: float
    base_value: float
    group_contributions: List[GroupContribution]


class PredictExplainResponse(BaseModel):
    results: List[PredictExplainItem]


def prepare_frame(records: List[Dict[str, Any]]) -> pd.DataFrame:
    if not records:
        raise HTTPException(status_code=400, detail="No records provided.")
    df = pd.DataFrame(records)
    # Add any missing expected columns as NaN and enforce the training column order.
    for col in EXPECTED_FEATURES:
        if col not in df.columns:
            df[col] = np.nan
    df = df[EXPECTED_FEATURES]
    if NUM_FEATURES:
        df[NUM_FEATURES] = df[NUM_FEATURES].apply(pd.to_numeric, errors="coerce")
    if CAT_FEATURES:
        df[CAT_FEATURES] = df[CAT_FEATURES].astype("object")
    return df


def pd_to_score(p: np.ndarray, base_score: float = 50, base_odds: float = 9, pdo: float = 20) -> np.ndarray:
    # Map probability of default to a 0-100 scorecard value (higher = safer).
    p = np.clip(p, 1e-6, 1 - 1e-6)
    B = pdo / np.log(2)
    A = base_score - B * np.log(base_odds)
    odds = (1 - p) / p
    score = A + B * np.log(odds)
    return np.clip(score, 0, 100)
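# Illustrative numbers only (derived from the default parameters above, not from the model
# artifact): base_odds = 9 and pdo = 20 pin a probability of default of 0.10 (odds of 9:1 in
# favour of repayment) to a score of 50, and every doubling of those odds adds 20 points
# before the final clip to [0, 100]. For example, pd_to_score(np.array([0.10, 0.05])) returns
# roughly [50.0, 71.6], while very safe borrowers (p around 0.01) clip at 100.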
def _sanitize_feature_name(name: str) -> str:
    sanitized = name
    for ch, repl in {"[": "", "]": "", "<": "lt", ">": "gt", " ": "_", ",": "_", "=": "_"}.items():
        sanitized = sanitized.replace(ch, repl)
    return sanitized


def _base_feature_name(name: str) -> str:
    # Strip the ColumnTransformer prefix (e.g. "num__", "cat__") and any encoded suffix
    # so a processed feature can be mapped back to its raw input column.
    base = name
    if "__" in base:
        base = base.split("__", 1)[1]
    if base in RAW_FEATURE_SET:
        return base
    parts = base.split("_")
    while len(parts) > 1:
        candidate = "_".join(parts[:-1])
        if candidate in RAW_FEATURE_SET:
            return candidate
        parts = parts[:-1]
    return base


def _label_for_percentage(pct: float) -> str:
    if pct >= 30:
        return "Exceptional"
    if pct >= 20:
        return "Very Good"
    if pct >= 10:
        return "Good"
    if pct >= 5:
        return "Bad"
    return "Very Bad"


def _direction_for_value(val: float) -> str:
    if val > 0:
        return "raises risk"
    if val < 0:
        return "reduces risk"
    return "neutral"


def _build_group_contribs(
    group_totals: Dict[str, float],
    group_details: Dict[str, List[FeatureContribution]],
    top_k: Optional[int],
) -> List[GroupContribution]:
    denom = sum(abs(v) for v in group_totals.values())
    if denom == 0:
        denom = 1e-12  # avoid division by zero; all percentages become ~0
    group_contribs: List[GroupContribution] = []
    for grp, total in sorted(group_totals.items(), key=lambda kv: abs(kv[1]), reverse=True):
        feats = sorted(group_details.get(grp, []), key=lambda fc: abs(fc.shap_value), reverse=True)
        if top_k:
            feats = feats[:top_k]
        pct = abs(total) / denom * 100
        group_contribs.append(
            GroupContribution(
                group=grp,
                total_shap_value=total,
                percentage=pct,
                direction=_direction_for_value(total),
                label=_label_for_percentage(pct),
                features=feats,
            )
        )
    return group_contribs


def get_booster():
    # Lazily load the raw booster once and cache it on the function object.
    if not hasattr(get_booster, "_booster"):
        booster = xgb.Booster()
        booster.load_model(str(BOOSTER_PATH))
        # Some saved boosters carry a base_score attribute such as "[0.5]" that does not
        # parse as a float; normalise it so SHAP-style predictions work.
        base_score = booster.attr("base_score")
        if base_score:
            try:
                float(base_score)
            except ValueError:
                cleaned = base_score.strip("[]")
                try:
                    cleaned_val = str(float(cleaned))
                except Exception:
                    cleaned_val = "0.5"
                booster.set_param({"base_score": cleaned_val})
                booster.set_attr(base_score=cleaned_val)
        get_booster._booster = booster
    return get_booster._booster


@app.post("/predict", response_model=PredictionResponse)
def predict(req: PredictionRequest) -> PredictionResponse:
    frame = prepare_frame(req.records)
    probas = PIPELINE.predict_proba(frame)[:, 1]
    return PredictionResponse(probabilities=probas.tolist())


@app.get("/health")
def health() -> Dict[str, str]:
    return {"status": "ok", "model_path": str(MODEL_PATH)}


@app.post("/score", response_model=ScoreResponse)
def score(req: ScoreRequest) -> ScoreResponse:
    if not req.probabilities:
        raise HTTPException(status_code=400, detail="No probabilities provided.")
    arr = np.array(req.probabilities, dtype=float)
    scores = pd_to_score(arr)
    return ScoreResponse(scores=scores.tolist())


@app.post("/explain", response_model=ExplainResponse)
def explain(req: ExplainRequest) -> ExplainResponse:
    if not req.records:
        raise HTTPException(status_code=400, detail="No records provided.")
    frame = prepare_frame(req.records)
    probas = PIPELINE.predict_proba(frame)[:, 1]
    booster = get_booster()
    X_proc = PREPROCESS.transform(frame)
    feat_names = np.array(PRE_FEATURE_NAMES) if PRE_FEATURE_NAMES else np.array([f"f{i}" for i in range(X_proc.shape[1])])
    sanitized_names = [_sanitize_feature_name(n) for n in feat_names]
    dmat = xgb.DMatrix(X_proc, feature_names=sanitized_names)
    # pred_contribs returns one SHAP value per feature plus a trailing bias column.
    contribs = booster.predict(dmat, pred_contribs=True)
    if contribs.shape[1] != X_proc.shape[1] + 1:
        raise HTTPException(status_code=500, detail="Unexpected contribution shape from booster.")
    base_vals = contribs[:, -1]
    feat_contribs = contribs[:, :-1]
    explanations: List[ExplainItem] = []
    for i in range(feat_contribs.shape[0]):
        row_vals = feat_contribs[i]
        group_totals: Dict[str, float] = {}
        group_details: Dict[str, List[FeatureContribution]] = {}
        for name, val in zip(feat_names, row_vals):
            base = _base_feature_name(str(name))
            group = FEATURE_GROUP_LOOKUP.get(base, "Other")
            group_totals[group] = group_totals.get(group, 0.0) + float(val)
            group_details.setdefault(group, []).append(
                FeatureContribution(feature=str(name), shap_value=float(val))
            )
        group_contribs = _build_group_contribs(group_totals, group_details, req.top_k)
        explanations.append(
            ExplainItem(
                probability=float(probas[i]),
                base_value=float(base_vals[i]),
                group_contributions=group_contribs,
            )
        )
    return ExplainResponse(explanations=explanations)
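# Example /explain request (the same body shape works for /predict_explain below).
# Field values are illustrative only; any expected feature omitted from a record is
# filled with NaN by prepare_frame before the pipeline runs:
#
#   POST /explain
#   {
#     "records": [{"Total_Amount": 5000, "duration": 90, "num_previous_loans": 3,
#                  "num_previous_defaults": 0, "lender_id": "L123"}],
#     "top_k": 5
#   }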
@app.post("/predict_explain", response_model=PredictExplainResponse)
def predict_explain(req: ExplainRequest) -> PredictExplainResponse:
    if not req.records:
        raise HTTPException(status_code=400, detail="No records provided.")
    frame = prepare_frame(req.records)
    probas = PIPELINE.predict_proba(frame)[:, 1]
    booster = get_booster()
    X_proc = PREPROCESS.transform(frame)
    feat_names = np.array(PRE_FEATURE_NAMES) if PRE_FEATURE_NAMES else np.array([f"f{i}" for i in range(X_proc.shape[1])])
    sanitized_names = [_sanitize_feature_name(n) for n in feat_names]
    dmat = xgb.DMatrix(X_proc, feature_names=sanitized_names)
    contribs = booster.predict(dmat, pred_contribs=True)
    if contribs.shape[1] != X_proc.shape[1] + 1:
        raise HTTPException(status_code=500, detail="Unexpected contribution shape from booster.")
    base_vals = contribs[:, -1]
    feat_contribs = contribs[:, :-1]
    items: List[PredictExplainItem] = []
    for i in range(feat_contribs.shape[0]):
        row_vals = feat_contribs[i]
        group_totals: Dict[str, float] = {}
        group_details: Dict[str, List[FeatureContribution]] = {}
        for name, val in zip(feat_names, row_vals):
            base = _base_feature_name(str(name))
            group = FEATURE_GROUP_LOOKUP.get(base, "Other")
            group_totals[group] = group_totals.get(group, 0.0) + float(val)
            group_details.setdefault(group, []).append(
                FeatureContribution(feature=str(name), shap_value=float(val))
            )
        group_contribs = _build_group_contribs(group_totals, group_details, req.top_k)
        score_val = int(round(float(pd_to_score(np.array([probas[i]]))[0])))
        items.append(
            PredictExplainItem(
                probability=float(probas[i]),
                score=score_val,
                base_value=float(base_vals[i]),
                group_contributions=group_contribs,
            )
        )
    return PredictExplainResponse(results=items)
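# Local-run convenience; a minimal sketch rather than part of the deployed service.
# It assumes uvicorn is installed and port 8000 is free; in production the app is
# normally served via `uvicorn <module>:app` with the appropriate module path.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)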