# Static Fingerprint API — FastAPI inference service (Keras model + optional
# sklearn imputer/scaler + optional SHAP explanations).
import io
import json
import os
import traceback
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import tensorflow as tf
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
# ---------- SHAP optional import ----------
# SHAP is an optional dependency; the API degrades gracefully without it.
try:
    import shap
except ImportError:
    SHAP_AVAILABLE = False
else:
    SHAP_AVAILABLE = True
# ----------------- CONFIG -----------------
# Paths are overridable via environment variables for deployment flexibility.
MODEL_PATH = os.getenv("MODEL_PATH", "best_model.h5")
STATS_PATH = os.getenv("STATS_PATH", "means_std.json")
# Candidate filenames probed (in order) for the optional sklearn artifacts.
IMPUTER_CANDIDATES = ["imputer.joblib", "imputer.pkl", "imputer.sav"]
SCALER_CANDIDATES = ["scaler.joblib", "scaler.pkl", "scaler.sav"]
# Ordinal class labels, best to worst; index order matters for CORAL decoding.
CLASSES = ["Top", "Mid-Top", "Mid", "Mid-Low", "Low"]
# ⛔ DO NOT CHANGE: exact order used in training
FEATURES: List[str] = [
    "autosuf_oper",
    "improductiva",
    "gastos_fin_over_avg_cart",
    "_equity",
    "grado_absorcion",
    "_cartera_bruta",
    "gastos_oper_over_ing_oper",
    "cartera_vencida_ratio",
    "roe_pre_tax",
    "_assets",
    "_liab",
    "equity_over_assets",
    "_margen_bruto",
    "prov_over_cartera",
    "gastos_oper_over_cart",
    "ing_cartera_over_ing_total",
    "debt_to_equity",
    "prov_gasto_over_cart",
    "cov_improductiva",
    "rend_cart_over_avg_cart",
    "roa_pre_tax",
]
# ------------------------------------------
# --------- helpers: I/O + numeric coercion ---------
def coerce_float(val: Any) -> float:
    """
    Parse a value that may use European or US number separators.

    Accepts numeric types, or strings like:
        "49.709,14" -> 49709.14
        "49,709.14" -> 49709.14
        "0,005"     -> 0.005

    Raises ValueError for empty/unparseable input.
    """
    if isinstance(val, (int, float, np.number)):
        return float(val)
    text = str(val).strip()
    if text == "":
        raise ValueError("empty")
    text = text.replace(" ", "")
    dot_at = text.rfind(".")
    comma_at = text.rfind(",")
    if dot_at >= 0 and comma_at >= 0:
        # Both separators present: whichever appears last is the decimal mark.
        if comma_at > dot_at:
            text = text.replace(".", "").replace(",", ".")
        else:
            text = text.replace(",", "")
    elif comma_at >= 0:
        # Comma only: treat it as the decimal mark.
        text = text.replace(",", ".")
    # Dot-only (or no separator) is already in float() syntax.
    return float(text)
def load_json(path: str) -> dict:
    """
    Read and parse a JSON file.

    Opens with explicit UTF-8 encoding so parsing does not depend on the
    host locale (the original relied on the platform default encoding).
    """
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)
def load_joblib_if_exists(candidates: List[str]):
    """
    Load the first existing joblib/pickle artifact among *candidates*.

    Only the first file found is attempted; a load failure is reported,
    not retried on later candidates.

    Returns:
        (obj, path_str or None, error_str or None)
    """
    cwd = os.getcwd()
    for filename in candidates:
        full_path = os.path.join(cwd, filename)
        if not os.path.isfile(full_path):
            continue
        try:
            # Imported lazily so joblib is not a hard dependency when unused.
            import joblib  # type: ignore
            with open(full_path, "rb") as handle:
                return joblib.load(handle), full_path, None
        except Exception as exc:
            return None, full_path, f"{type(exc).__name__}({exc})"
    return None, None, None
# --------- model / artifacts load ---------
# Module-level initialization: runs once at import/startup time.
print("Loading model / imputer / scaler...")
# Model (compile=False: inference only, no optimizer/loss needed)
model = tf.keras.models.load_model(MODEL_PATH, compile=False)
# Imputer — optional; predictions fall back to mean/zero fill without it.
imputer, imputer_path, imputer_err = load_joblib_if_exists(IMPUTER_CANDIDATES)
if imputer_path and imputer_err:
    print(f"⚠️ Failed to load imputer from {imputer_path}: {imputer_err}")
elif imputer:
    print(f"Loaded imputer from {imputer_path}")
else:
    print("⚠️ No imputer found — skipping median imputation.")
# Scaler — optional; manual z-scoring from stats is used as fallback.
scaler, scaler_path, scaler_err = load_joblib_if_exists(SCALER_CANDIDATES)
if scaler_path and scaler_err:
    print(f"⚠️ Failed to load scaler from {scaler_path}: {scaler_err}")
elif scaler:
    print(f"Loaded scaler from {scaler_path}")
else:
    print("⚠️ No scaler found — using manual z-scoring if stats are available.")
# Stats (means/std) for fallback manual z-score; empty dict when absent.
stats: Dict[str, Dict[str, float]] = {}
if os.path.isfile(STATS_PATH):
    stats = load_json(STATS_PATH)
    print(f"Loaded means/std from {STATS_PATH}")
else:
    print("⚠️ No means_std.json found — manual z-scoring will be unavailable if scaler missing.")
# --------- decoding for CORAL vs softmax ---------
def coral_probs_from_logits(logits_np: np.ndarray) -> np.ndarray:
    """
    Convert CORAL ordinal logits to per-class probabilities.

    The original routed this element-wise computation through eager
    TensorFlow tensors; pure NumPy is equivalent and avoids the per-call
    tensor round trip.

    Args:
        logits_np: (N, K-1) array of cumulative logits.

    Returns:
        (N, K) array of probabilities whose rows sum to 1.
    """
    logits = np.asarray(logits_np, dtype=np.float32)
    # sigmoid(logit_k) = P(y > k) for each of the K-1 thresholds
    sig = 1.0 / (1.0 + np.exp(-logits))
    # P(y = k) = P(y > k-1) - P(y > k), with P(y > -1)=1 and P(y > K-1)=0
    left = np.concatenate([np.ones_like(sig[:, :1]), sig], axis=1)
    right = np.concatenate([sig, np.zeros_like(sig[:, :1])], axis=1)
    probs = np.clip(left - right, 1e-12, 1.0)
    # Renormalize row-wise in case clipping perturbed the total mass.
    return probs / probs.sum(axis=1, keepdims=True)
def decode_logits(raw: np.ndarray) -> Tuple[np.ndarray, str]:
    """
    Decode a single-row model output into class probabilities.

    Fix: the return annotation was the tuple literal ``(np.ndarray, str)``,
    which is not a valid type expression — replaced with ``Tuple[...]``.

    Args:
        raw: (1, M) array of raw model outputs.

    Returns:
        (probs, mode) where probs has shape (K,) and mode records how the
        output width M was interpreted:
          - M == K-1 -> CORAL ordinal logits ("auto_coral")
          - M == K   -> softmax over scores ("auto_softmax")
          - other    -> abs-sum normalization ("fallback_M{M}_K{K}")

    Raises:
        ValueError: if raw is not 2-D.
    """
    if raw.ndim != 2:
        raise ValueError(f"Unexpected raw shape {raw.shape}")
    M = raw.shape[1]
    K = len(CLASSES)
    if M == K - 1:
        # CORAL logits
        probs = coral_probs_from_logits(raw)[0]
        return probs, "auto_coral"
    elif M == K:
        # Softmax (max-subtraction for numerical stability)
        row = raw[0]
        exps = np.exp(row - np.max(row))
        probs = exps / np.sum(exps)
        return probs, "auto_softmax"
    else:
        # Fallback: normalize by total absolute mass; uniform when all-zero.
        row = raw[0]
        s = float(np.sum(np.abs(row)))
        probs = (row / s) if s > 0 else np.ones_like(row) / len(row)
        return probs, f"fallback_M{M}_K{K}"
# --------- preprocessing pipeline ---------
def build_raw_vector(payload: Dict[str, Any]) -> np.ndarray:
    """
    Assemble the raw feature vector in the exact training order.

    Features missing from *payload*, or whose values cannot be coerced to
    a number, become NaN so a downstream imputer can fill them.
    """
    def _value_or_nan(feature: str) -> float:
        # Any coercion failure is treated the same as a missing feature.
        if feature not in payload:
            return np.nan
        try:
            return coerce_float(payload[feature])
        except Exception:
            return np.nan

    return np.array([_value_or_nan(f) for f in FEATURES], dtype=np.float32)
def apply_imputer_if_any(x: np.ndarray) -> np.ndarray:
    """
    Fill NaNs in *x* with the loaded sklearn imputer when available.

    Without an imputer, each NaN falls back to the feature's mean from
    the stats file, or 0.0 when no mean is known.
    """
    if imputer is not None:
        # sklearn transformers operate on 2-D input.
        return imputer.transform(x.reshape(1, -1)).astype(np.float32)[0]
    filled = x.copy()
    for idx, feature in enumerate(FEATURES):
        if not np.isnan(filled[idx]):
            continue
        feature_stats = stats.get(feature, {})
        filled[idx] = float(feature_stats["mean"]) if "mean" in feature_stats else 0.0
    return filled
def apply_scaling_or_stats(raw_vec: np.ndarray) -> Tuple[np.ndarray, Dict[str, float], str]:
    """
    Standardize *raw_vec* to z-scores.

    Fix: the return annotation was the tuple literal
    ``(np.ndarray, Dict[str, float], str)``, not a valid type expression —
    replaced with ``Tuple[...]``.

    Uses the fitted sklearn scaler when present; otherwise applies a
    manual (x - mean) / std from the loaded stats, treating a falsy std
    (0, 0.0, None) as 1 to avoid division by zero.

    Returns:
        (z_vec, {feature: z}, mode) where mode is "sklearn_scaler" or
        "manual_stats".
    """
    if scaler is not None:
        z = scaler.transform(raw_vec.reshape(1, -1)).astype(np.float32)[0]
        z_detail = {f: float(z[i]) for i, f in enumerate(FEATURES)}
        return z, z_detail, "sklearn_scaler"
    z = np.zeros_like(raw_vec, dtype=np.float32)
    z_detail: Dict[str, float] = {}
    for i, f in enumerate(FEATURES):
        mean = stats.get(f, {}).get("mean", 0.0)
        sd = stats.get(f, {}).get("std", 1.0)
        if not sd:
            sd = 1.0  # guard: zero/None std would divide by zero
        z[i] = (raw_vec[i] - mean) / sd
        z_detail[f] = float(z[i])
    return z, z_detail, "manual_stats"
# --------- SHAP model wrapper & explainer ---------
def model_proba_from_z(z_batch_np: np.ndarray) -> np.ndarray:
    """
    SHAP wrapper: map a (N, n_features) z-space batch to (N, K) class
    probabilities, auto-detecting the model head like decode_logits does.

    Raises:
        ValueError: if the model output is not 2-D.
    """
    raw = model.predict(z_batch_np, verbose=0)
    if raw.ndim != 2:
        raise ValueError(f"Unexpected raw shape from model: {raw.shape}")
    N, M = raw.shape
    K = len(CLASSES)
    if M == K - 1:
        # CORAL ordinal head
        return coral_probs_from_logits(raw)
    if M == K:
        # Softmax over raw scores (max-subtraction for stability)
        shifted = np.exp(raw - np.max(raw, axis=1, keepdims=True))
        return shifted / np.sum(shifted, axis=1, keepdims=True)
    # Fallback: abs-sum normalization; rows with zero mass become uniform.
    s = np.sum(np.abs(raw), axis=1, keepdims=True)
    return np.divide(raw, s, out=np.ones_like(raw) / max(M, 1), where=(s > 0))
# SHAP explainer is built once at startup; None disables explanations.
EXPLAINER = None
if SHAP_AVAILABLE:
    try:
        # Background: 50 "average" institutions at z=0
        BACKGROUND_Z = np.zeros((50, len(FEATURES)), dtype=np.float32)
        EXPLAINER = shap.KernelExplainer(model_proba_from_z, BACKGROUND_Z)
        print("SHAP KernelExplainer initialized.")
    except Exception as e:
        # Best-effort: a SHAP failure must not prevent the API from serving.
        EXPLAINER = None
        print("⚠️ Failed to initialize SHAP explainer:", repr(e))
else:
    print("SHAP not installed; explanations disabled.")
# ----------------- FastAPI -----------------
app = FastAPI(title="Static Fingerprint API", version="1.2.0")
# CORS: open to all origins (credentials disabled), so browser clients
# on any host can call the API.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get("/")
def root():
    """Landing endpoint: confirms the API is up and lists the main routes.

    Fix: the route decorator was missing, so the handler was never
    registered with the app (the route paths are those this function
    itself advertises).
    """
    return {
        "message": "Static Fingerprint API is running.",
        "try": ["GET /health", "POST /predict", "POST /debug/z"],
    }
@app.get("/health")
def health():
    """Health/introspection endpoint: reports which artifacts loaded.

    Fix: the route decorator was missing, so the handler was never
    registered (path taken from the listing in `root`).
    """
    stats_keys = []
    try:
        if os.path.isfile(STATS_PATH):
            stats_keys = list(load_json(STATS_PATH).keys())
    except Exception:
        # Best-effort: a malformed stats file must not fail the health check.
        pass
    return {
        "status": "ok",
        "classes": CLASSES,
        "features_training_order": FEATURES,
        "features_in_means_std": stats_keys,
        "model_file": MODEL_PATH,
        "imputer": bool(imputer),
        "scaler": bool(scaler),
        "stats_available": bool(stats),
        "shap_available": bool(EXPLAINER is not None),
    }
@app.post("/debug/z")
async def debug_z(req: Request):
    """Debug endpoint: per-feature raw/imputed/z breakdown for a payload.

    Fix: the route decorator was missing, so the handler was never
    registered (path taken from the listing in `root`).

    Body: JSON object mapping feature -> value. Returns one row per
    training feature with the parsed input, imputed value, z-score, and
    the mean/std used (when stats are loaded).
    """
    try:
        payload = await req.json()
        if not isinstance(payload, dict):
            return JSONResponse(status_code=400, content={"error": "Expected JSON object"})
        raw = build_raw_vector(payload)
        raw_imp = apply_imputer_if_any(raw)
        z, z_detail, mode = apply_scaling_or_stats(raw_imp)
        rows = []
        for i, f in enumerate(FEATURES):
            rows.append({
                "feature": f,
                # NaN is not JSON-serializable; expose missing input as null.
                "input_value": None if np.isnan(raw[i]) else float(raw[i]),
                "imputed_value": float(raw_imp[i]),
                "z": float(z[i]),
                "mean": stats.get(f, {}).get("mean", None),
                "std": stats.get(f, {}).get("std", None),
            })
        return {"preprocess_mode": mode, "rows": rows}
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": str(e), "trace": traceback.format_exc()})
@app.post("/predict")
async def predict(req: Request):
    """
    Predict the institution's state from a JSON feature payload.

    Body: JSON object mapping feature -> numeric value (strings with
    commas/points ok). Missing features are imputed if an imputer is
    present; else filled with means (if stats) or 0. Also returns SHAP
    values for the predicted class when SHAP is available.

    Fixes vs. original:
      - `shap_out` was referenced in the response before it was computed
        (the SHAP section sat *after* the return and was unreachable),
        so every request raised NameError -> HTTP 500. SHAP is now
        computed before the response is built.
      - Removed the duplicated pred_idx/probs_dict/missing computation
        that followed the return.
      - Restored the missing route decorator (path from `root`'s listing).
    """
    try:
        payload = await req.json()
        if not isinstance(payload, dict):
            return JSONResponse(status_code=400, content={"error": "Expected JSON object"})

        # Build in EXACT training order, then impute and standardize.
        raw = build_raw_vector(payload)                          # may contain NaNs
        raw_imp = apply_imputer_if_any(raw)                      # impute
        z_vec, z_detail, z_mode = apply_scaling_or_stats(raw_imp)  # scale / z-score

        # Predict
        X = z_vec.reshape(1, -1).astype(np.float32)
        raw_logits = model.predict(X, verbose=0)
        probs, mode = decode_logits(raw_logits)
        pred_idx = int(np.argmax(probs))
        probs_dict = {CLASSES[i]: float(probs[i]) for i in range(len(CLASSES))}
        missing = [f for i, f in enumerate(FEATURES) if np.isnan(raw[i])]

        # SHAP explanation must be computed before the response embeds it.
        shap_out = _explain_prediction(X, pred_idx)

        return {
            "input_ok": (len(missing) == 0),
            "missing": missing,
            "preprocess": {
                "imputer": bool(imputer),
                "scaler": bool(scaler),
                "z_mode": z_mode,
            },
            "z_scores": z_detail,
            "probabilities": probs_dict,
            "predicted_state": CLASSES[pred_idx],
            "shap": shap_out,
            "debug": {
                "raw_shape": list(raw_logits.shape),
                "decode_mode": mode,
                "raw_first_row": [float(v) for v in raw_logits[0]],
            },
        }
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": str(e), "trace": traceback.format_exc()})


def _explain_prediction(X: np.ndarray, pred_idx: int) -> Dict[str, Any]:
    """
    Compute SHAP contributions for the predicted class (helper for predict).

    Args:
        X: (1, n_features) batch already in z-space.
        pred_idx: index of the predicted class in CLASSES.

    Returns a dict with explained_class / expected_value / shap_values,
    or an {"error": ...} dict when SHAP is unavailable or fails — SHAP
    problems must never fail the prediction itself.
    """
    if EXPLAINER is None:
        return {"error": "SHAP not available on server"}
    try:
        shap_vals = EXPLAINER.shap_values(X, nsamples=100)
        exp_val_raw = EXPLAINER.expected_value
        if isinstance(shap_vals, list):
            # Multi-output: list of length K, each entry (1, n_features);
            # expected_value may likewise be per-class.
            shap_vec = np.array(shap_vals[pred_idx][0], dtype=float)
            if isinstance(exp_val_raw, (list, np.ndarray)):
                exp_val = float(exp_val_raw[pred_idx])
            else:
                exp_val = float(exp_val_raw)
        elif isinstance(shap_vals, np.ndarray):
            # Single-output: ndarray (1, n_features)
            shap_vec = np.array(shap_vals[0], dtype=float)
            if isinstance(exp_val_raw, (list, np.ndarray)):
                exp_val = float(exp_val_raw[0])
            else:
                exp_val = float(exp_val_raw)
        else:
            raise TypeError(f"Unsupported SHAP return type: {type(shap_vals)}")
        return {
            "explained_class": CLASSES[pred_idx],
            "expected_value": exp_val,
            "shap_values": {
                FEATURES[i]: float(shap_vec[i]) for i in range(len(FEATURES))
            },
        }
    except Exception as e:
        return {"error": str(e), "trace": traceback.format_exc()}