Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import json | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional | |
| import joblib | |
| import numpy as np | |
| import pandas as pd | |
| from fastapi import FastAPI, HTTPException | |
| from pydantic import BaseModel, Field | |
| import xgboost as xgb | |
| import os | |
| from huggingface_hub import hf_hub_download | |
# Compatibility shim: pickles produced by newer scikit-learn versions reference
# sklearn.compose._column_transformer._RemainderColsList, which older installs
# lack; register a stand-in so joblib.load can resolve the pickle.
import sklearn.compose._column_transformer as _ct  # type: ignore

if not hasattr(_ct, "_RemainderColsList"):

    class _RemainderColsList(list):  # type: ignore
        """Placeholder mirroring the newer sklearn class (a plain list subclass)."""

    _ct._RemainderColsList = _RemainderColsList
ROOT = Path(__file__).resolve().parents[1]
MODEL_DIR = Path(__file__).resolve().parent / "model"
BOOSTER_PATH = MODEL_DIR / "xgboost_booster.json"
META_PATH = MODEL_DIR / "explain_meta.json"

# Hugging Face Hub location of the pickled sklearn pipeline artifact.
HF_MODEL_REPO = os.getenv("HF_MODEL_REPO", "Gutema/frankscore-model-artifact")
HF_MODEL_REVISION = os.getenv("HF_MODEL_REVISION", "main")

try:
    # hf_hub_download returns the local cache path of the downloaded file.
    MODEL_PATH = Path(
        hf_hub_download(
            repo_id=HF_MODEL_REPO,
            filename="xgboost_pipeline.pkl",
            revision=HF_MODEL_REVISION,
        )
    )
except Exception as e:
    raise RuntimeError(
        f"Failed to download model artifact from HF repo={HF_MODEL_REPO}: {e}"
    ) from e

# Fail fast if any required artifact is missing — each path checked exactly once
# (the original repeated the META/BOOSTER checks twice).
if not MODEL_PATH.exists():
    raise FileNotFoundError(f"Model file missing at {MODEL_PATH}")
if not META_PATH.exists():
    raise FileNotFoundError(f"Explainability meta missing at {META_PATH}")
if not BOOSTER_PATH.exists():
    raise FileNotFoundError(f"Booster file missing at {BOOSTER_PATH}")

PIPELINE = joblib.load(MODEL_PATH)
META = json.loads(META_PATH.read_text())

# Raw input columns the pipeline expects, in training order.
EXPECTED_FEATURES = list(getattr(PIPELINE, "feature_names_in_", []))
PREPROCESS = PIPELINE.named_steps.get("preprocess") if hasattr(PIPELINE, "named_steps") else None
if PREPROCESS is None:
    raise RuntimeError("Pipeline missing 'preprocess' step; cannot infer columns.")
if not EXPECTED_FEATURES:
    # Fall back to the preprocessor's recorded input names.
    EXPECTED_FEATURES = list(getattr(PREPROCESS, "feature_names_in_", []))
if not EXPECTED_FEATURES:
    raise RuntimeError("Unable to determine expected feature names from the pipeline.")

# Map ColumnTransformer step name -> the columns it handles ("num"/"cat").
_col_map = {name: cols for name, _, cols in getattr(PREPROCESS, "transformers_", [])}
NUM_FEATURES = list(_col_map.get("num", []))
CAT_FEATURES = list(_col_map.get("cat", []))

# Post-transform feature names (used for SHAP), preferring the persisted meta.
PRE_FEATURE_NAMES = META.get("pre_feature_names") or list(getattr(PREPROCESS, "get_feature_names_out", lambda: [])())
RAW_FEATURE_SET = set((META.get("raw_num_cols") or []) + (META.get("raw_cat_cols") or []))
# Business-level groupings of model features, used to aggregate SHAP values
# into explanations a borrower/lender can read.
FEATURE_GROUPS = {
    "Repayment Activity": [
        "num_previous_defaults",
        "past_default_rate",
        "repayment_consistency",
        "repayment_intensity",
    ],
    "Loan Amount & Burden": [
        "Total_Amount",
        "Total_Amount_to_Repay",
        "amount_bucket",
        "burden_percentile",
        "daily_burden",
        "duration",
        "duration_bucket",
        "interest_rate",
        "amount_ratio",
        "burden_ratio",
        "lender_exposure_ratio",
    ],
    "Borrowing History": [
        "account_age_days",
        "avg_past_amount",
        "avg_past_daily_burden",
        "avg_time_bw_loans",
        "borrower_history_strength",
        "days_since_last_loan",
        "loan_frequency_per_year",
        "num_previous_loans",
        "std_past_amount",
        "std_past_daily_burden",
        "trend_in_amount",
        "trend_in_burden",
        "lender_id",
        "lender_risk_profile",
    ],
    "Spending & Transactions": [
        "latest_amount_ma3",
        "days_to_local_festival",
        "days_to_salary_day",
        "month",
        "quarter",
        "week_of_year",
    ],
}

# Inverted index: raw feature name -> the group it belongs to.
FEATURE_GROUP_LOOKUP: Dict[str, str] = {
    feature: group_name
    for group_name, members in FEATURE_GROUPS.items()
    for feature in members
}
# FastAPI application instance; route registration is presumably done
# elsewhere in the project — TODO confirm (no decorators visible here).
app = FastAPI(title="FrankScore", version="1.0.0")


class PredictionRequest(BaseModel):
    """Request body for prediction: one dict of raw features per borrower."""
    records: List[Dict[str, Any]] = Field(..., description="List of borrower feature dictionaries")


class PredictionResponse(BaseModel):
    """Probability of default (class 1) per input record."""
    probabilities: List[float]


class ScoreRequest(BaseModel):
    """Request body for score conversion."""
    probabilities: List[float] = Field(..., description="Probabilities of default (0-1)")


class ScoreResponse(BaseModel):
    """0-100 credit scores, aligned with the request probabilities."""
    scores: List[float]


class ExplainRequest(BaseModel):
    """Request body for SHAP explanation endpoints."""
    records: List[Dict[str, Any]]
    top_k: Optional[int] = Field(default=10, ge=1, le=100, description="Number of top features to return per record")


class FeatureContribution(BaseModel):
    """A single transformed feature's SHAP contribution."""
    feature: str
    shap_value: float


class GroupContribution(BaseModel):
    """Aggregated SHAP contribution for one business feature group."""
    group: str
    total_shap_value: float
    percentage: float
    direction: str
    label: str
    features: List[FeatureContribution]


class ExplainItem(BaseModel):
    """Explanation for one record: probability plus grouped contributions."""
    probability: float
    base_value: float
    group_contributions: List[GroupContribution]


class ExplainResponse(BaseModel):
    """Explanations aligned with the request records."""
    explanations: List[ExplainItem]


class PredictExplainItem(BaseModel):
    """Combined output: probability, 0-100 score, and grouped SHAP values."""
    probability: float
    score: float
    base_value: float
    group_contributions: List[GroupContribution]


class PredictExplainResponse(BaseModel):
    """Combined predict+explain results aligned with the request records."""
    results: List[PredictExplainItem]
def prepare_frame(records: List[Dict[str, Any]]) -> pd.DataFrame:
    """Build a model-ready DataFrame from raw request records.

    Missing expected columns are added as NaN, extra columns are dropped,
    and the column order is forced to match the training pipeline.

    Args:
        records: one dict of raw feature values per borrower.

    Returns:
        A DataFrame with exactly the EXPECTED_FEATURES columns.

    Raises:
        HTTPException: 400 if *records* is empty.
    """
    if not records:
        raise HTTPException(status_code=400, detail="No records provided.")
    # reindex adds missing expected columns (filled with NaN), drops extras,
    # and enforces the training column order in a single pass — replacing the
    # per-column insert loop (which fragments the frame) plus reselection.
    df = pd.DataFrame(records).reindex(columns=EXPECTED_FEATURES)
    if NUM_FEATURES:
        # Coerce numeric columns; unparseable values become NaN for the imputer.
        df[NUM_FEATURES] = df[NUM_FEATURES].apply(pd.to_numeric, errors="coerce")
    if CAT_FEATURES:
        df[CAT_FEATURES] = df[CAT_FEATURES].astype("object")
    return df
def pd_to_score(p: np.ndarray, base_score: float = 50, base_odds: float = 9, pdo: float = 20) -> np.ndarray:
    """Convert probabilities of default into 0-100 credit scores.

    Standard points-to-double-odds (PDO) scaling: a probability whose
    good:bad odds equal *base_odds* maps to *base_score* points, and every
    *pdo* points doubles the odds. The result is clipped to [0, 100].
    """
    clipped = np.clip(p, 1e-6, 1 - 1e-6)   # keep log() finite at 0 and 1
    factor = pdo / np.log(2)               # points per doubling of the odds
    offset = base_score - factor * np.log(base_odds)
    good_odds = (1 - clipped) / clipped    # odds of repayment vs. default
    raw_scores = offset + factor * np.log(good_odds)
    return np.clip(raw_scores, 0, 100)
| def _sanitize_feature_name(name: str) -> str: | |
| sanitized = name | |
| for ch, repl in {"[": "", "]": "", "<": "lt", ">": "gt", " ": "_", ",": "_", "=": "_"}.items(): | |
| sanitized = sanitized.replace(ch, repl) | |
| return sanitized | |
def _base_feature_name(name: str) -> str:
    """Map a transformed feature name back to its raw input column.

    Strips the ColumnTransformer "<step>__" prefix, then — for one-hot
    style names like "col_value" — trims trailing "_"-separated tokens
    until a known raw column is found. Falls back to the stripped name.
    """
    stripped = name.split("__", 1)[1] if "__" in name else name
    if stripped in RAW_FEATURE_SET:
        return stripped
    tokens = stripped.split("_")
    # Try progressively shorter prefixes: tokens[:-1], tokens[:-2], ...
    for end in range(len(tokens) - 1, 0, -1):
        candidate = "_".join(tokens[:end])
        if candidate in RAW_FEATURE_SET:
            return candidate
    return stripped
| def _label_for_percentage(pct: float) -> str: | |
| if pct >= 30: | |
| return "Exceptional" | |
| if pct >= 20: | |
| return "Very Good" | |
| if pct >= 10: | |
| return "Good" | |
| if pct >= 5: | |
| return "Bad" | |
| return "Very Bad" | |
| def _direction_for_value(val: float) -> str: | |
| if val > 0: | |
| return "raises risk" | |
| if val < 0: | |
| return "reduces risk" | |
| return "neutral" | |
def _build_group_contribs(
    group_totals: Dict[str, float],
    group_details: Dict[str, List[FeatureContribution]],
    top_k: Optional[int],
) -> List[GroupContribution]:
    """Turn per-group SHAP totals into sorted GroupContribution objects.

    Groups are ordered by absolute total contribution (largest first); each
    group's share is expressed as a percentage of the summed absolute totals,
    and its per-feature detail is truncated to *top_k* entries when given.
    """
    # Guard against all-zero totals: fall back to a tiny denominator so
    # every percentage becomes ~0 instead of dividing by zero.
    total_magnitude = sum(abs(v) for v in group_totals.values()) or 1e-12
    ordered = sorted(group_totals.items(), key=lambda kv: abs(kv[1]), reverse=True)
    contribs: List[GroupContribution] = []
    for grp, total in ordered:
        members = sorted(
            group_details.get(grp, []),
            key=lambda fc: abs(fc.shap_value),
            reverse=True,
        )
        if top_k:
            members = members[:top_k]
        share = abs(total) / total_magnitude * 100
        contribs.append(
            GroupContribution(
                group=grp,
                total_shap_value=total,
                percentage=share,
                direction=_direction_for_value(total),
                label=_label_for_percentage(share),
                features=members,
            )
        )
    return contribs
def get_booster():
    """Return the raw XGBoost booster, loading it from BOOSTER_PATH once.

    The booster is cached as a function attribute so repeated calls reuse
    the same object. On first load it also repairs a malformed
    ``base_score`` attribute (e.g. a bracketed value like "[0.53]") that
    some serialization paths leave behind; if the value cannot be parsed
    even after stripping brackets, it falls back to "0.5".
    """
    if not hasattr(get_booster, "_booster"):
        booster = xgb.Booster()
        booster.load_model(str(BOOSTER_PATH))
        base_score = booster.attr("base_score")
        if base_score:
            try:
                # Parses as a plain float -> nothing to repair.
                float(base_score)
            except ValueError:
                # Strip list-style brackets such as "[0.53]".
                cleaned = base_score.strip("[]")
                try:
                    cleaned_val = str(float(cleaned))
                except Exception:
                    cleaned_val = "0.5"  # last-resort default
                # NOTE(review): indentation was lost in this source; these two
                # calls are placed inside the except branch (repair only when
                # the stored value was malformed) — confirm against original.
                booster.set_param({"base_score": cleaned_val})
                booster.set_attr(base_score=cleaned_val)
        get_booster._booster = booster
    return get_booster._booster
def predict(req: PredictionRequest) -> PredictionResponse:
    """Run the full pipeline and return the default probability per record."""
    features = prepare_frame(req.records)
    # Column 1 of predict_proba is the positive (default) class.
    default_probs = PIPELINE.predict_proba(features)[:, 1]
    return PredictionResponse(probabilities=default_probs.tolist())
def health() -> Dict[str, str]:
    """Liveness probe: report service status and the loaded model path."""
    return {"status": "ok", "model_path": str(MODEL_PATH)}
def score(req: ScoreRequest) -> ScoreResponse:
    """Convert default probabilities into 0-100 credit scores.

    Raises:
        HTTPException: 400 if no probabilities were supplied.
    """
    if not req.probabilities:
        raise HTTPException(status_code=400, detail="No probabilities provided.")
    probs = np.array(req.probabilities, dtype=float)
    return ScoreResponse(scores=pd_to_score(probs).tolist())
def explain(req: ExplainRequest) -> ExplainResponse:
    """Return per-record, group-level SHAP explanations.

    Contributions come from the raw booster (``pred_contribs=True``) on the
    preprocessed matrix; per-feature values are aggregated into business
    feature groups via FEATURE_GROUP_LOOKUP.

    Raises:
        HTTPException: 400 on empty input; 500 on a contribution-shape mismatch.
    """
    if not req.records:
        raise HTTPException(status_code=400, detail="No records provided.")
    frame = prepare_frame(req.records)
    probas = PIPELINE.predict_proba(frame)[:, 1]
    X_proc = PREPROCESS.transform(frame)
    # Prefer the persisted post-transform names; otherwise use positional ones.
    if PRE_FEATURE_NAMES:
        feat_names = np.array(PRE_FEATURE_NAMES)
    else:
        feat_names = np.array([f"f{i}" for i in range(X_proc.shape[1])])
    dmat = xgb.DMatrix(X_proc, feature_names=[_sanitize_feature_name(n) for n in feat_names])
    contribs = get_booster().predict(dmat, pred_contribs=True)
    # pred_contribs yields one column per feature plus a trailing bias column.
    if contribs.shape[1] != X_proc.shape[1] + 1:
        raise HTTPException(status_code=500, detail="Unexpected contribution shape from booster.")
    explanations: List[ExplainItem] = []
    for row_idx, row_vals in enumerate(contribs[:, :-1]):
        totals: Dict[str, float] = {}
        details: Dict[str, List[FeatureContribution]] = {}
        for name, val in zip(feat_names, row_vals):
            grp = FEATURE_GROUP_LOOKUP.get(_base_feature_name(str(name)), "Other")
            totals[grp] = totals.get(grp, 0.0) + float(val)
            details.setdefault(grp, []).append(
                FeatureContribution(feature=str(name), shap_value=float(val))
            )
        explanations.append(
            ExplainItem(
                probability=float(probas[row_idx]),
                base_value=float(contribs[row_idx, -1]),
                group_contributions=_build_group_contribs(totals, details, req.top_k),
            )
        )
    return ExplainResponse(explanations=explanations)
def predict_explain(req: ExplainRequest) -> PredictExplainResponse:
    """Combined endpoint: probability, 0-100 score, and SHAP group explanation.

    Raises:
        HTTPException: 400 on empty input; 500 on a contribution-shape mismatch.
    """
    if not req.records:
        raise HTTPException(status_code=400, detail="No records provided.")
    frame = prepare_frame(req.records)
    probas = PIPELINE.predict_proba(frame)[:, 1]
    # Convert all probabilities to scores in one vectorized call, instead of
    # building a one-element array per record inside the loop below.
    scores = pd_to_score(probas)
    booster = get_booster()
    X_proc = PREPROCESS.transform(frame)
    feat_names = np.array(PRE_FEATURE_NAMES) if PRE_FEATURE_NAMES else np.array([f"f{i}" for i in range(X_proc.shape[1])])
    sanitized_names = [_sanitize_feature_name(n) for n in feat_names]
    dmat = xgb.DMatrix(X_proc, feature_names=sanitized_names)
    contribs = booster.predict(dmat, pred_contribs=True)
    # pred_contribs yields one column per feature plus a trailing bias column.
    if contribs.shape[1] != X_proc.shape[1] + 1:
        raise HTTPException(status_code=500, detail="Unexpected contribution shape from booster.")
    base_vals = contribs[:, -1]
    feat_contribs = contribs[:, :-1]
    items: List[PredictExplainItem] = []
    for i in range(feat_contribs.shape[0]):
        row_vals = feat_contribs[i]
        group_totals: Dict[str, float] = {}
        group_details: Dict[str, List[FeatureContribution]] = {}
        for name, val in zip(feat_names, row_vals):
            base = _base_feature_name(str(name))
            group = FEATURE_GROUP_LOOKUP.get(base, "Other")
            group_totals[group] = group_totals.get(group, 0.0) + float(val)
            group_details.setdefault(group, []).append(
                FeatureContribution(feature=str(name), shap_value=float(val))
            )
        group_contribs = _build_group_contribs(group_totals, group_details, req.top_k)
        items.append(
            PredictExplainItem(
                probability=float(probas[i]),
                # Rounded whole-number score for display; the model field coerces to float.
                score=int(round(float(scores[i]))),
                base_value=float(base_vals[i]),
                group_contributions=group_contribs,
            )
        )
    return PredictExplainResponse(results=items)