Spaces:
Sleeping
Sleeping
| import os, json, io, traceback | |
| from typing import Any, Dict, List, Optional | |
| import pandas as pd | |
| from sklearn.impute import SimpleImputer | |
| from sklearn.preprocessing import StandardScaler | |
| import numpy as np | |
| import tensorflow as tf | |
| from fastapi import FastAPI, Request | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse | |
# ---------- SHAP optional import ----------
# SHAP is treated as an optional dependency: the flag below records whether
# the import succeeded so later code can disable explanations instead of
# failing at import time.
try:
    import shap
except ImportError:
    SHAP_AVAILABLE = False
else:
    SHAP_AVAILABLE = True
# ----------------- CONFIG -----------------
# Artifact locations; each can be overridden through an environment variable
# of the same name.
MODEL_PATH = os.environ.get("MODEL_PATH", "best_model.h5")
STATS_PATH = os.environ.get("STATS_PATH", "means_std.json")

# File names probed, in this order, when looking for a serialized
# imputer / scaler.
IMPUTER_CANDIDATES = ["imputer.joblib", "imputer.pkl", "imputer.sav"]
SCALER_CANDIDATES = ["scaler.joblib", "scaler.pkl", "scaler.sav"]

# Output class labels; index i of the decoded probability vector maps to
# CLASSES[i].
CLASSES = ["Top", "Mid-Top", "Mid", "Mid-Low", "Low"]

# ⛔ DO NOT CHANGE: exact order used in training
FEATURES: List[str] = [
    "autosuf_oper",
    "improductiva",
    "gastos_fin_over_avg_cart",
    "_equity",
    "grado_absorcion",
    "_cartera_bruta",
    "gastos_oper_over_ing_oper",
    "cartera_vencida_ratio",
    "roe_pre_tax",
    "_assets",
    "_liab",
    "equity_over_assets",
    "_margen_bruto",
    "prov_over_cartera",
    "gastos_oper_over_cart",
    "ing_cartera_over_ing_total",
    "debt_to_equity",
    "prov_gasto_over_cart",
    "cov_improductiva",
    "rend_cart_over_avg_cart",
    "roa_pre_tax",
]
# ------------------------------------------
| # --------- helpers: I/O + numeric coercion --------- | |
def coerce_float(val: Any) -> float:
    """
    Convert a number, or a localized numeric string, to ``float``.

    Accepts numeric, or strings like:
        "49.709,14" -> 49709.14
        "49,709.14" -> 49709.14
        "0,005"     -> 0.005
        "1,234,567" -> 1234567.0   (repeated separator = thousands grouping)
        "1.234.567" -> 1234567.0

    A single comma with no dot is read as a decimal comma. Spaces inside the
    text are ignored.

    Raises:
        ValueError: if the text is empty or cannot be parsed once the
            separators have been normalized.
    """
    if isinstance(val, (int, float, np.number)):
        return float(val)

    s = str(val).strip()
    if s == "":
        raise ValueError("empty")
    s = s.replace(" ", "")

    n_dots, n_commas = s.count("."), s.count(",")
    if n_dots and n_commas:
        # Both separators present: whichever occurs last is the decimal mark,
        # the other one is thousands grouping.
        if s.rfind(",") > s.rfind("."):
            s = s.replace(".", "").replace(",", ".")
        else:
            s = s.replace(",", "")
    elif n_commas > 1:
        # A decimal mark cannot repeat, so several commas are grouping.
        s = s.replace(",", "")
    elif n_commas == 1:
        s = s.replace(",", ".")
    elif n_dots > 1:
        # Same reasoning for several dots and no comma.
        s = s.replace(".", "")
    # else: at most one dot and no comma, already parseable as-is
    return float(s)
def load_json(path: str) -> dict:
    """
    Read the JSON document stored at ``path`` and return the parsed value.

    The file is decoded as UTF-8 explicitly, so the result does not depend
    on the platform's locale encoding.

    Raises:
        OSError: if the file cannot be opened.
        ValueError: if the content is not valid JSON (``json.JSONDecodeError``)
            or not valid UTF-8.
    """
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)
def load_joblib_if_exists(candidates: List[str]):
    """
    Load the first candidate artifact (imputer/scaler) that exists on disk.

    Each name is resolved against the current working directory. Only the
    first existing file is attempted: if loading it fails, the error is
    reported and later candidates are not tried.

    Returns a 3-tuple ``(obj, path, error)``:
        - ``(obj, path, None)``    when a file was found and loaded
        - ``(None, path, "Type(message)")`` when a file was found but failed
        - ``(None, None, None)``   when no candidate exists
    """
    for filename in candidates:
        candidate_path = os.path.join(os.getcwd(), filename)
        if not os.path.isfile(candidate_path):
            continue
        try:
            # Imported lazily so joblib is only required when an artifact
            # is actually present.
            import joblib  # type: ignore

            # NOTE(review): joblib.load unpickles the file; only trusted
            # artifacts should be placed in the working directory.
            with open(candidate_path, "rb") as stream:
                loaded = joblib.load(stream)
        except Exception as exc:
            return None, candidate_path, f"{type(exc).__name__}({exc})"
        return loaded, candidate_path, None
    return None, None, None
# --------- model / artifacts load ---------
# Everything below runs once at import time and populates the module-level
# globals (model, imputer, scaler, stats) read by the preprocessing helpers.
print("Loading model / imputer / scaler...")

# Model: loaded with compile=False, i.e. without restoring loss/optimizer.
model = tf.keras.models.load_model(MODEL_PATH, compile=False)

# Imputer: a path together with an error means a file was found but could
# not be loaded.
imputer, imputer_path, imputer_err = load_joblib_if_exists(IMPUTER_CANDIDATES)
if imputer_path and imputer_err:
    _status = f"⚠️ Failed to load imputer from {imputer_path}: {imputer_err}"
elif imputer:
    _status = f"Loaded imputer from {imputer_path}"
else:
    _status = "⚠️ No imputer found — skipping median imputation."
print(_status)

# Scaler: same reporting logic as for the imputer.
scaler, scaler_path, scaler_err = load_joblib_if_exists(SCALER_CANDIDATES)
if scaler_path and scaler_err:
    _status = f"⚠️ Failed to load scaler from {scaler_path}: {scaler_err}"
elif scaler:
    _status = f"Loaded scaler from {scaler_path}"
else:
    _status = "⚠️ No scaler found — using manual z-scoring if stats are available."
print(_status)

# Stats (means/std) for the fallback manual z-score; stays empty when the
# stats file is absent.
stats: Dict[str, Dict[str, float]] = {}
if not os.path.isfile(STATS_PATH):
    print("⚠️ No means_std.json found — manual z-scoring will be unavailable if scaler missing.")
else:
    stats = load_json(STATS_PATH)
    print(f"Loaded means/std from {STATS_PATH}")
| # --------- decoding for CORAL vs softmax --------- | |
def coral_probs_from_logits(logits_np: np.ndarray) -> np.ndarray:
    """
    Turn CORAL ordinal logits of shape (N, K-1) into class probabilities
    of shape (N, K).

    With s_k = sigmoid(logit_k), the class probabilities are the successive
    differences [1 - s_1, s_1 - s_2, ..., s_{K-1}], clipped to
    [1e-12, 1] and re-normalized so each row sums to 1.
    """
    cumulative = tf.math.sigmoid(tf.convert_to_tensor(logits_np, dtype=tf.float32))  # (N, K-1)
    edge = cumulative[:, :1]  # only used as a shape template for the padding columns
    upper = tf.concat([tf.ones_like(edge), cumulative], axis=1)
    lower = tf.concat([cumulative, tf.zeros_like(edge)], axis=1)
    class_probs = tf.clip_by_value(upper - lower, 1e-12, 1.0)
    # Clipping can push the row sum slightly off 1, so normalize row-wise.
    row_totals = tf.reduce_sum(class_probs, axis=1, keepdims=True)
    return (class_probs / row_totals).numpy()
def decode_logits(raw: np.ndarray) -> (np.ndarray, str):
    """
    Decode the model output for the first row of ``raw``.

    raw: 2-D array (rows, M); only row 0 is decoded.
    Returns (probs, mode_str), where the mode depends on M versus
    K = len(CLASSES):
        - M == K-1 -> CORAL decoding,            mode "auto_coral"
        - M == K   -> numerically stable softmax, mode "auto_softmax"
        - otherwise -> row divided by the sum of its absolute values
          (uniform if that sum is 0),             mode "fallback_M{M}_K{K}"

    Raises ValueError if ``raw`` is not 2-D.
    """
    if raw.ndim != 2:
        raise ValueError(f"Unexpected raw shape {raw.shape}")

    M = raw.shape[1]
    K = len(CLASSES)

    if M == K - 1:
        # CORAL logits
        return coral_probs_from_logits(raw)[0], "auto_coral"

    first_row = raw[0]
    if M == K:
        # Softmax or unnormalized scores; subtracting the max avoids overflow.
        shifted = np.exp(first_row - np.max(first_row))
        return shifted / np.sum(shifted), "auto_softmax"

    # Fallback: normalize across whatever is there.
    # NOTE(review): negative entries stay negative here, so the result is not
    # guaranteed to be a valid probability vector.
    total = float(np.sum(np.abs(first_row)))
    if total > 0:
        probs = first_row / total
    else:
        probs = np.ones_like(first_row) / len(first_row)
    return probs, f"fallback_M{M}_K{K}"
| # --------- preprocessing pipeline --------- | |
def build_raw_vector(payload: Dict[str, Any]) -> np.ndarray:
    """
    Assemble the float32 feature vector in the order given by FEATURES.

    A feature that is absent from ``payload``, or whose value cannot be
    coerced to float, becomes np.nan so that the imputation step can fill it.
    """

    def _value_or_nan(name: str) -> float:
        if name not in payload:
            return np.nan
        try:
            return coerce_float(payload[name])
        except Exception:
            # Deliberate best-effort: unparseable input is treated as missing.
            return np.nan

    return np.array([_value_or_nan(name) for name in FEATURES], dtype=np.float32)
def apply_imputer_if_any(x: np.ndarray) -> np.ndarray:
    """
    Fill missing (NaN) entries of a single feature vector.

    When a fitted imputer is loaded it is applied (as a one-row 2-D array)
    and the result is returned as float32. Otherwise each NaN is replaced by
    that feature's "mean" from ``stats`` when available, else by 0.0. The
    input array is not modified in the fallback path.
    """
    if imputer is not None:
        # imputer expects 2D
        as_row = x.reshape(1, -1)
        return imputer.transform(as_row).astype(np.float32)[0]

    filled = x.copy()
    for idx, name in enumerate(FEATURES):
        if not np.isnan(filled[idx]):
            continue
        feature_stats = stats[name] if name in stats else {}
        filled[idx] = float(feature_stats["mean"]) if "mean" in feature_stats else 0.0
    return filled
def apply_scaling_or_stats(raw_vec: np.ndarray) -> (np.ndarray, Dict[str, float], str):
    """
    Standardize one (already imputed) feature vector.

    Returns (z_vec, z_detail_dict, mode_str):
        - with a loaded scaler: ``scaler.transform`` output, mode
          "sklearn_scaler"
        - otherwise: manual (x - mean) / std using ``stats``, mode
          "manual_stats". A feature without stats uses mean 0.0 and std 1.0;
          a falsy std (0 or None) is replaced by 1.0 to avoid dividing by it.

    ``z_detail_dict`` maps each feature name to its z value as a Python float.
    """
    if scaler is None:
        z = np.zeros_like(raw_vec, dtype=np.float32)
        z_detail: Dict[str, float] = {}
        for i, name in enumerate(FEATURES):
            feature_stats = stats.get(name, {})
            mean = feature_stats.get("mean", 0.0)
            sd = feature_stats.get("std", 1.0) or 1.0
            z[i] = (raw_vec[i] - mean) / sd
            z_detail[name] = float(z[i])
        return z, z_detail, "manual_stats"

    z = scaler.transform(raw_vec.reshape(1, -1)).astype(np.float32)[0]
    per_feature = {name: float(z[i]) for i, name in enumerate(FEATURES)}
    return z, per_feature, "sklearn_scaler"
| # --------- SHAP model wrapper & explainer --------- | |
def model_proba_from_z(z_batch_np: np.ndarray) -> np.ndarray:
    """
    Prediction function handed to the SHAP explainer.

    Takes a batch (N, n_features) in z-space, runs the current global
    ``model`` and decodes every row with the same CORAL / softmax / fallback
    rule used for single predictions, returning one row of scores per input
    row ((N, K) in the CORAL and softmax cases).

    Raises ValueError if the model output is not 2-D.
    """
    raw = model.predict(z_batch_np, verbose=0)
    if raw.ndim != 2:
        raise ValueError(f"Unexpected raw shape from model: {raw.shape}")

    M = raw.shape[1]
    K = len(CLASSES)

    if M == K - 1:
        # CORAL
        return coral_probs_from_logits(raw)  # (N, K)

    if M == K:
        # Softmax or scores; per-row max subtraction keeps exp() stable.
        shifted = np.exp(raw - np.max(raw, axis=1, keepdims=True))
        return shifted / np.sum(shifted, axis=1, keepdims=True)

    # Fallback normalize: rows whose absolute sum is 0 keep the uniform
    # value pre-filled through ``out``.
    abs_totals = np.sum(np.abs(raw), axis=1, keepdims=True)
    uniform = np.ones_like(raw) / max(M, 1)
    return np.divide(raw, abs_totals, out=uniform, where=(abs_totals > 0))
# Module-level SHAP explainer; remains None when SHAP is not installed or
# when building the explainer fails.
EXPLAINER = None
if not SHAP_AVAILABLE:
    print("SHAP not installed; explanations disabled.")
else:
    try:
        # Background: 50 all-zero rows in z-space (i.e. every feature at its
        # standardized mean).
        BACKGROUND_Z = np.zeros((50, len(FEATURES)), dtype=np.float32)
        EXPLAINER = shap.KernelExplainer(model_proba_from_z, BACKGROUND_Z)
        print("SHAP KernelExplainer initialized.")
    except Exception as e:
        EXPLAINER = None
        print("⚠️ Failed to initialize SHAP explainer:", repr(e))
| # ----------------- FastAPI ----------------- | |
# Application object plus CORS configuration.
app = FastAPI(title="Static Fingerprint API", version="1.2.0")

# Any origin, method and header is accepted; credentials are not allowed.
_CORS_OPTIONS = {
    "allow_origins": ["*"],
    "allow_credentials": False,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_CORS_OPTIONS)
def root():
    """
    Return a small status payload listing the endpoints worth trying.

    NOTE(review): no route decorator is visible on this function in this
    view of the file — confirm how it is registered with ``app``.
    """
    suggested = ["GET /health", "POST /predict", "POST /debug/z"]
    return {"message": "Static Fingerprint API is running.", "try": suggested}
def health():
    """
    Report which artifacts are loaded and how the service is configured.

    The stats file is re-read on every call to list its top-level keys; any
    failure while doing so is deliberately ignored and yields an empty list.
    """
    try:
        stats_keys = list(load_json(STATS_PATH).keys()) if os.path.isfile(STATS_PATH) else []
    except Exception:
        # Best-effort diagnostic: an unreadable stats file must not break
        # the health check.
        stats_keys = []

    report = {"status": "ok"}
    report["classes"] = CLASSES
    report["features_training_order"] = FEATURES
    report["features_in_means_std"] = stats_keys
    report["model_file"] = MODEL_PATH
    report["imputer"] = bool(imputer)
    report["scaler"] = bool(scaler)
    report["stats_available"] = bool(stats)
    report["shap_available"] = EXPLAINER is not None
    return report
async def debug_z(req: Request):
    """
    Run only the preprocessing pipeline and return it feature by feature.

    Body: JSON object mapping feature name -> value.
    Returns ``{"preprocess_mode": ..., "rows": [...]}`` where every row holds
    the parsed input value (None when missing/unparseable), the imputed
    value, the z-score and the mean/std recorded in ``stats`` (None when
    absent).

    Error responses:
        - 400 when the body is not valid JSON or is not a JSON object
        - 500 (with traceback) for any other failure
    """
    try:
        try:
            payload = await req.json()
        except ValueError as e:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueError
            # subclasses: a malformed body is a client error, not a 500.
            return JSONResponse(status_code=400, content={"error": f"Invalid JSON body: {e}"})
        if not isinstance(payload, dict):
            return JSONResponse(status_code=400, content={"error": "Expected JSON object"})

        raw = build_raw_vector(payload)
        raw_imp = apply_imputer_if_any(raw)
        z, _, mode = apply_scaling_or_stats(raw_imp)

        rows = [
            {
                "feature": f,
                "input_value": None if np.isnan(raw[i]) else float(raw[i]),
                "imputed_value": float(raw_imp[i]),
                "z": float(z[i]),
                "mean": stats.get(f, {}).get("mean", None),
                "std": stats.get(f, {}).get("std", None),
            }
            for i, f in enumerate(FEATURES)
        ]
        return {"preprocess_mode": mode, "rows": rows}
    except Exception as e:
        # NOTE(review): the traceback is returned to the client; acceptable
        # for a debug endpoint, but confirm before exposing it publicly.
        return JSONResponse(status_code=500, content={"error": str(e), "trace": traceback.format_exc()})
def _shap_feature_map(vec) -> Dict[str, float]:
    """Map one SHAP vector of length len(FEATURES) to {feature_name: value}."""
    return {name: float(vec[i]) for i, name in enumerate(FEATURES)}


def _shap_per_class_block(vectors: Dict[str, Any]) -> Dict[str, Any]:
    """Build the "per_class" SHAP payload from {class_name: vector}."""
    all_classes = {cls: _shap_feature_map(vec) for cls, vec in vectors.items()}
    return {
        "available": True,
        "mode": "per_class",
        "explained_classes": list(all_classes.keys()),
        "all_classes": all_classes,
    }


def _shap_single_class_block(vec, class_name: str) -> Dict[str, Any]:
    """Build the "single_class" SHAP payload for one explained class."""
    return {
        "available": True,
        "mode": "single_class",
        "explained_class": class_name,
        "values": _shap_feature_map(vec),
    }


def _format_shap_values(shap_vals, pred_idx: int) -> Dict[str, Any]:
    """
    Normalize the output of ``EXPLAINER.shap_values`` for ONE sample.

    The layout of that output is not fixed, so the following are accepted
    (D = len(FEATURES), K = len(CLASSES)), checked in this order:
        - list with one array per class, each (N, D) or (D,)
        - array (1, D, K), (1, K, D) or (K, D)   -> per-class payload
        - array (1, D) or (D,)                   -> single-class payload,
          attributed to the predicted class
    Anything else yields ``{"available": False, "error": ...}``.
    """
    D, K = len(FEATURES), len(CLASSES)

    # ---------- CASE 1: list of per-class arrays ----------
    if isinstance(shap_vals, list):
        vectors: Dict[str, Any] = {}
        # zip() stops at the shorter sequence, so classes without a matching
        # list entry are skipped, as are entries beyond the known classes.
        for class_name, class_vals in zip(CLASSES, shap_vals):
            arr = np.array(class_vals, dtype=float)
            if arr.ndim == 2 and arr.shape[0] >= 1 and arr.shape[1] == D:
                vectors[class_name] = arr[0, :]  # first sample
            elif arr.ndim == 1 and arr.shape[0] == D:
                vectors[class_name] = arr
            # any other shape: class is left out
        if vectors:
            return _shap_per_class_block(vectors)
        return {
            "available": False,
            "error": "No per-class SHAP vectors matched expected shape.",
        }

    # ---------- CASE 2: a single numpy array ----------
    arr = np.array(shap_vals, dtype=float)
    if arr.shape == (1, D, K):
        return _shap_per_class_block({c: arr[0, :, k] for k, c in enumerate(CLASSES)})
    if arr.shape == (1, K, D):
        return _shap_per_class_block({c: arr[0, k, :] for k, c in enumerate(CLASSES)})
    if arr.shape == (K, D):
        return _shap_per_class_block({c: arr[k, :] for k, c in enumerate(CLASSES)})
    if arr.shape == (1, D):
        return _shap_single_class_block(arr[0, :], CLASSES[pred_idx])
    if arr.shape == (D,):
        return _shap_single_class_block(arr, CLASSES[pred_idx])
    return {
        "available": False,
        "error": f"Unexpected SHAP array shape {arr.shape}",
    }


async def predict(req: Request):
    """
    Body: JSON object mapping feature -> numeric value (strings with commas/points ok).
    Missing features are imputed if imputer present; else filled with means (if stats) or 0.

    Returns:
        - probabilities per state
        - predicted_state
        - z_scores (per feature, after imputation & scaling pipeline)
        - shap: per-class explanations if available
        - input_ok / missing: which features were absent or unparseable
        - debug: raw model output shape, decode mode and first output row

    Error responses:
        - 400 when the body is not valid JSON or is not a JSON object
        - 500 (with traceback) for any other failure
    """
    try:
        try:
            payload = await req.json()
        except ValueError as e:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueError
            # subclasses: a malformed body is a client error, not a 500.
            return JSONResponse(
                status_code=400,
                content={"error": f"Invalid JSON body: {e}"},
            )
        if not isinstance(payload, dict):
            return JSONResponse(
                status_code=400,
                content={"error": "Expected JSON object"},
            )

        # ---------- 1) Preprocess: raw -> imputed -> z ----------
        raw_vec = build_raw_vector(payload)        # may contain NaNs
        raw_imp = apply_imputer_if_any(raw_vec)    # impute missing
        z_vec, z_detail, z_mode = apply_scaling_or_stats(raw_imp)

        # ---------- 2) Model prediction ----------
        X = z_vec.reshape(1, -1).astype(np.float32)
        raw_logits = model.predict(X, verbose=0)
        probs, decode_mode = decode_logits(raw_logits)
        pred_idx = int(np.argmax(probs))
        probs_dict = {CLASSES[i]: float(probs[i]) for i in range(len(CLASSES))}
        missing = [f for i, f in enumerate(FEATURES) if np.isnan(raw_vec[i])]

        # ---------- 3) SHAP explanations (all classes) ----------
        shap_block: Dict[str, Any] = {"available": False}
        if EXPLAINER is not None and SHAP_AVAILABLE:
            try:
                shap_vals = EXPLAINER.shap_values(X, nsamples=50)
                shap_block = _format_shap_values(shap_vals, pred_idx)
            except Exception as e:
                # Explanations are optional: a SHAP failure is reported in
                # the payload instead of failing the prediction itself.
                shap_block = {
                    "available": False,
                    "error": str(e),
                    "trace": traceback.format_exc(),
                }

        # ---------- 4) Build response ----------
        return {
            "input_ok": (len(missing) == 0),
            "missing": missing,
            "preprocess": {
                "imputer": bool(imputer),
                "scaler": bool(scaler),
                "z_mode": z_mode,
            },
            "z_scores": z_detail,          # per feature
            "probabilities": probs_dict,   # per state
            "predicted_state": CLASSES[pred_idx],
            "shap": shap_block,
            "debug": {
                "raw_shape": list(raw_logits.shape),
                "decode_mode": decode_mode,
                "raw_first_row": [float(v) for v in raw_logits[0]],
            },
        }
    except Exception as e:
        return JSONResponse(
            status_code=500,
            content={"error": str(e), "trace": traceback.format_exc()},
        )
| # ============================================================ | |
| # CORAL ORDINAL HELPERS (from training script) | |
| # (we do NOT redefine coral_probs_from_logits here to avoid | |
| # clashing with the one already used by decode_logits) | |
| # ============================================================ | |
def to_cumulative_targets_tf(y_true_int, K_):
    """
    Encode integer labels as cumulative binary targets.

    y_true_int: integer targets 0..K_-1 (flattened to (N,) internally)
    returns a float32 tensor (N, K_-1) whose column for threshold k
    (k = 1..K_-1) is 1.0 where y >= k and 0.0 otherwise.
    """
    labels = tf.cast(tf.reshape(y_true_int, [-1]), tf.int32)
    cutoffs = tf.range(1, K_, dtype=tf.int32)
    # Broadcast (N, 1) against (1, K_-1) to compare every label with every
    # threshold.
    reached = tf.greater_equal(tf.expand_dims(labels, 1), tf.expand_dims(cutoffs, 0))
    return tf.cast(reached, tf.float32)
def coral_loss_tf(y_true, logits):
    """
    CORAL ordinal loss in TensorFlow.

    y_true: (N,) or (N,1) integer labels 0..K-1, with K = len(CLASSES)
    logits: (N, K-1)

    Sigmoid cross-entropy is computed against the cumulative targets for
    each threshold, summed per sample and averaged over the batch.
    """
    labels = tf.cast(tf.reshape(y_true, [-1]), tf.int32)
    targets = to_cumulative_targets_tf(labels, len(CLASSES))  # (N, K-1)
    per_threshold = tf.nn.sigmoid_cross_entropy_with_logits(labels=targets, logits=logits)
    per_sample = tf.reduce_sum(per_threshold, axis=1)
    return tf.reduce_mean(per_sample)
| # ---------- TF helper (pure TF CORAL probs) ---------- | |
def _coral_probs_from_logits_tf(logits_tf: tf.Tensor) -> tf.Tensor:
    """
    Tensor-in / tensor-out CORAL probability transform, used by the metric.

    logits_tf: (N, K-1)
    returns (N, K): successive differences of the sigmoid outputs, clipped
    to [1e-12, 1]. Unlike ``coral_probs_from_logits`` the rows are NOT
    re-normalized after clipping.
    """
    cumulative = tf.math.sigmoid(logits_tf)
    edge = cumulative[:, :1]  # shape template for the padding columns
    upper = tf.concat([tf.ones_like(edge), cumulative], axis=1)
    lower = tf.concat([cumulative, tf.zeros_like(edge)], axis=1)
    return tf.clip_by_value(upper - lower, 1e-12, 1.0)
def ordinal_accuracy_metric(y_true, y_pred_logits):
    """
    Exact-class accuracy for CORAL outputs.

    The logits are converted to class probabilities, the arg-max class is
    compared with the flattened integer labels, and the fraction of matches
    is returned as a float32 scalar tensor.
    """
    labels = tf.cast(tf.reshape(y_true, [-1]), tf.int32)
    class_probs = _coral_probs_from_logits_tf(y_pred_logits)
    predicted = tf.argmax(class_probs, axis=1, output_type=tf.int32)
    hits = tf.cast(tf.equal(labels, predicted), tf.float32)
    return tf.reduce_mean(hits)
| # ============================================================ | |
| # IMPORTS FOR RETRAINING / DATA MGMT | |
| # (Ok to import here; Python allows imports anywhere in file) | |
| # ============================================================ | |
| import pandas as pd | |
| from sklearn.impute import SimpleImputer | |
| from sklearn.preprocessing import StandardScaler | |
| # ============================================================ | |
| # LETTER → 5-CLASS GROUP MAPPING (same logic as training code) | |
| # ============================================================ | |
# Rating letters ordered from weakest to strongest; used to pick the stronger
# side of a dual rating such as "AA / AA+".
_RATING_ORDER = [
    "E", "D", "C-", "C", "C+",
    "B-", "B", "B+", "BB-", "BB", "BB+",
    "BBB-", "BBB", "BBB+",
    "A-", "A", "A+",
    "AA-", "AA", "AA+",
    "AAA-", "AAA",
]

# Group boundaries: ordinal group name -> rating letters it contains.
_RATING_GROUPS = {
    "Top": {"AAA", "AAA-", "AA+", "AA"},
    "Mid-Top": {"AA-", "A+", "A", "A-"},
    "Mid": {"BBB+", "BBB", "BBB-", "BB+"},
    "Mid-Low": {"BB", "BB-", "B+", "B", "B-"},
    "Low": {"C+", "C", "C-", "D", "E"},
}


def letter_to_group(letter: str) -> Optional[str]:
    """
    Converts raw rating letters (AAA, A-, BBB+, BB-, etc.)
    into the 5 ordinal groups used by the model:
    Top, Mid-Top, Mid, Mid-Low, Low

    Matching is case-insensitive and ignores surrounding whitespace. A dual
    rating written with "/" (e.g. "AA / AA+") is resolved to its stronger
    known side; if neither side is a known letter, the first side is used.

    Returns None for None, blank input, input consisting only of "/"
    separators, or any letter that belongs to no group.
    """
    if letter is None:
        return None
    s = str(letter).strip().upper()
    if s == "":
        return None

    # Normalise duals like "AA / AA+" by taking the stronger one
    s_clean = s.replace(" ", "")
    if "/" in s_clean:
        parts = [p for p in s_clean.split("/") if p]
        if not parts:
            # Only separators (e.g. "/"): nothing to map. Indexing parts[0]
            # here used to raise IndexError.
            return None
        idxs = [_RATING_ORDER.index(p) for p in parts if p in _RATING_ORDER]
        # stronger = higher index in the weakest-to-strongest order
        s = _RATING_ORDER[max(idxs)] if idxs else parts[0]

    for group, members in _RATING_GROUPS.items():
        if s in members:
            return group
    return None
| # ============================================================ | |
| # RECREATE MODEL FROM BEST HYPERPARAMETERS | |
| # ============================================================ | |
def build_model_from_hparams(hp: dict):
    """
    Build and compile the CORAL dense network described by ``hp``.

    Keys read from ``hp``: "n_hidden", "batchnorm", "activation", "l2",
    "lr", and for each hidden layer i = 1..n_hidden: "units_{i}" and
    "dropout_{i}".

    Each hidden layer is a Dense layer with L2 kernel regularization,
    optionally followed by BatchNormalization and, when its dropout rate is
    > 0, by Dropout. The output layer emits len(CLASSES) - 1 raw logits.
    The model is compiled with Adam, ``coral_loss_tf`` and
    ``ordinal_accuracy_metric``.
    """
    net_input = tf.keras.Input(shape=(len(FEATURES),))

    depth = hp["n_hidden"]
    with_batchnorm = hp["batchnorm"]
    activation = hp["activation"]
    l2_strength = hp["l2"]

    hidden = net_input
    for layer_no in range(1, depth + 1):
        width = hp[f"units_{layer_no}"]
        dropout_rate = hp[f"dropout_{layer_no}"]
        dense = tf.keras.layers.Dense(
            width,
            activation=activation,
            kernel_regularizer=tf.keras.regularizers.l2(l2_strength),
        )
        hidden = dense(hidden)
        if with_batchnorm:
            hidden = tf.keras.layers.BatchNormalization()(hidden)
        if dropout_rate > 0:
            hidden = tf.keras.layers.Dropout(dropout_rate)(hidden)

    # CORAL output: K-1 logits (K = len(CLASSES)), no activation.
    logits = tf.keras.layers.Dense(len(CLASSES) - 1, activation=None)(hidden)

    net = tf.keras.Model(net_input, logits)
    net.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=hp["lr"]),
        loss=coral_loss_tf,
        metrics=[ordinal_accuracy_metric],
    )
    return net
| # ============================================================ | |
| # RETRAINING LOGIC + DATASET MGMT | |
| # ============================================================ | |
# Master fingerprint database (CSV) that retraining reads and appends to.
FINGERPRINT_CSV = "fingerprints_db.csv"
# JSON file holding the tuned hyperparameters used to rebuild the model.
BEST_HP_JSON = "best_params_and_metrics.json"
def load_best_hparams():
    """
    Return the "best_hyperparams" entry of the tuning JSON (BEST_HP_JSON).

    Raises KeyError if the JSON has no "best_hyperparams" key, and whatever
    ``open`` / ``json.load`` raise for a missing or malformed file.
    """
    with open(BEST_HP_JSON, "r") as handle:
        return json.load(handle)["best_hyperparams"]
def load_fingerprint_dataset():
    """
    Load FINGERPRINT_CSV and turn it into training arrays.

    Columns used:
        - RATING_GROUP, or RATING_RAW from which it is derived when the
          RATING_GROUP column is absent
        - every column named in FEATURES
    Other columns in the file are ignored here.

    Steps:
        - derive RATING_GROUP (Top/Mid-Top/...) if the column is missing
        - drop rows whose RATING_GROUP is NaN
        - map the group names to integer labels 0..K-1 (order of CLASSES)
        - fit a median SimpleImputer and a StandardScaler on the full data

    Returns (X_scaled float32, y int32, fitted_imputer, fitted_scaler).
    """
    frame = pd.read_csv(FINGERPRINT_CSV)

    # Derive 5-class group if not already present
    if "RATING_GROUP" not in frame.columns:
        frame["RATING_GROUP"] = frame["RATING_RAW"].apply(letter_to_group)
    frame = frame[frame["RATING_GROUP"].notna()].copy()

    # Integer labels follow the position of each group name in CLASSES.
    label_ids = {name: idx for idx, name in enumerate(CLASSES)}
    y = frame["RATING_GROUP"].map(label_ids).astype("int32").to_numpy()

    X_raw = frame[FEATURES].to_numpy().astype("float32")

    # Fresh preprocessors fitted on the whole dataset.
    imp = SimpleImputer(strategy="median")
    X_imp = imp.fit_transform(X_raw)
    sc = StandardScaler()
    X_sc = sc.fit_transform(X_imp).astype("float32")

    return X_sc, y, imp, sc
def retrain_model():
    """
    Retrain on the current fingerprints_db.csv with the stored best
    hyperparameters and swap the result into the running service.

    - Rebuilds the model from the hyperparameters
    - Fits on the full (X, y) with early stopping on the training loss
    - Replaces the global model / imputer / scaler
    - Rebuilds the SHAP explainer (when SHAP is installed) so explanations
      refer to the new model

    NOTE: nothing is written to disk here; the new model and preprocessors
    only replace the in-memory globals.

    Returns True on completion; exceptions from loading or fitting propagate.
    """
    global model, imputer, scaler, EXPLAINER

    print(">>> RETRAIN: loading dataset")
    hparams = load_best_hparams()
    features_scaled, labels, fitted_imputer, fitted_scaler = load_fingerprint_dataset()

    print(">>> RETRAIN: building model from best hparams")
    fresh_model = build_model_from_hparams(hparams)

    print(">>> RETRAIN: fitting on fingerprint DB")
    early_stop = tf.keras.callbacks.EarlyStopping(
        monitor="loss",
        patience=15,
        restore_best_weights=True,
        verbose=1,
    )
    fresh_model.fit(
        features_scaled,
        labels,
        epochs=150,
        batch_size=128,
        callbacks=[early_stop],
        verbose=1,
    )

    # Update global model + preprocessors used by /predict
    model = fresh_model
    imputer = fitted_imputer
    scaler = fitted_scaler

    # Rebuild SHAP explainer so explanations match new model
    if SHAP_AVAILABLE:
        try:
            background = np.zeros((50, len(FEATURES)), dtype=np.float32)
            EXPLAINER = shap.KernelExplainer(model_proba_from_z, background)
            print("SHAP explainer rebuilt after retrain.")
        except Exception as exc:
            EXPLAINER = None
            print("⚠️ Failed to rebuild SHAP explainer:", repr(exc))

    print(">>> RETRAIN COMPLETE")
    return True
| # ============================================================ | |
| # API ENDPOINT: APPEND + RETRAIN | |
| # ============================================================ | |
def append_and_retrain(payload: dict):
    """
    Appends a new fingerprint row to fingerprints_db.csv
    and retrains the model.

    Expected payload:
    {
        "qtr": "2014Q4",
        "company": "COAC Ambato Ltda",
        "supervisor": "SEPS",
        "rating_raw": "B",
        "features": {
            "autosuf_oper": 0.536154555,
            "improductiva": null,
            "gastos_fin_over_avg_cart": 1.200803646,
            "_equity": ...,
            ...
            "roa_pre_tax": 1.580296249
        }
    }

    - rating_raw is the letter rating (AAA, A-, BBB+, BB-, ...)
    - we derive RATING_GROUP (Top / Mid-Top / Mid / Mid-Low / Low)
      with letter_to_group.
    - "features" must be an object with exactly the names in FEATURES. Each
      value must be null / blank (stored as missing) or coercible by
      coerce_float; everything is validated BEFORE the CSV is touched so a
      bad value cannot corrupt the database and break later retraining.

    Returns {"ok": True, "message": ...} on success, or
    {"ok": False, "error": ...} when validation fails. Exceptions raised by
    the retraining itself propagate (the row has already been appended at
    that point).
    """
    qtr = payload.get("qtr")
    company = payload.get("company")
    supervisor = payload.get("supervisor")
    rating_raw = payload.get("rating_raw")
    feats = payload.get("features", {})

    if not qtr or not company or not rating_raw:
        return {"ok": False, "error": "Missing qtr/company/rating_raw"}

    # A non-dict "features" (null, list, ...) used to raise AttributeError.
    if not isinstance(feats, dict) or set(feats.keys()) != set(FEATURES):
        return {"ok": False, "error": "features must contain all 21 ratio names"}

    rating_group = letter_to_group(rating_raw)
    if rating_group is None:
        return {"ok": False, "error": f"Cannot map rating_raw '{rating_raw}' to 5-class group"}

    # Coerce every feature to float (or None for missing) up front.
    clean_feats: Dict[str, Any] = {}
    rejected: List[str] = []
    for name in FEATURES:
        value = feats[name]
        if value is None or (isinstance(value, str) and value.strip() == ""):
            clean_feats[name] = None  # written as an empty CSV cell
            continue
        try:
            clean_feats[name] = coerce_float(value)
        except (TypeError, ValueError):
            rejected.append(name)
    if rejected:
        return {"ok": False, "error": f"Non-numeric feature values for: {rejected}"}

    # Build new row matching the CSV schema
    row = {
        "QTR": qtr,
        "COMPANY": company,
        "Supervisor": supervisor,
        "RATING_RAW": rating_raw,
        "RATING_GROUP": rating_group,
        **clean_feats,
        "rating_score": None,  # optional, can be filled later
    }

    # Append row to CSV (or create the file with this single row)
    if os.path.exists(FINGERPRINT_CSV):
        df = pd.read_csv(FINGERPRINT_CSV)
        df = pd.concat([df, pd.DataFrame([row])], ignore_index=True)
    else:
        df = pd.DataFrame([row])
    df.to_csv(FINGERPRINT_CSV, index=False)

    # Retrain model on full updated DB
    retrain_model()

    return {"ok": True, "message": "Fingerprint appended and model retrained"}
def debug_db_head(n: int = 5):
    """
    Return the first ``n`` rows of FINGERPRINT_CSV plus basic file info.

    When the file does not exist, returns ``{"exists": False, "message": ...}``.
    Otherwise returns the file name, total row count, column names and the
    head rows as a list of dicts. Missing cells (NaN) are returned as None,
    because NaN is not valid JSON and makes strict JSON encoders fail.

    Exceptions raised while reading the CSV propagate to the caller.
    """
    if not os.path.exists(FINGERPRINT_CSV):
        return {
            "exists": False,
            "message": f"{FINGERPRINT_CSV} not found in current working dir."
        }

    df = pd.read_csv(FINGERPRINT_CSV)
    head = df.head(n)
    # Cast to object first so None is kept as-is instead of being turned
    # back into NaN by a float column.
    head = head.astype(object).where(head.notna(), None)
    return {
        "exists": True,
        "file": FINGERPRINT_CSV,
        "rows": int(len(df)),
        "head": head.to_dict(orient="records"),
        "columns": list(df.columns),
    }
| import pandas as pd # make sure this is at the top of the file if not already | |
| # from here: after append_and_retrain | |
def debug_db_tail(n: int = 10):
    """
    Returns the last n rows of fingerprints_db.csv so you can verify
    that new points are really being appended inside the container.

    Returns ``{"ok": False, "error": ...}`` when the file is missing or
    cannot be read; otherwise ``{"ok": True, "rows": [...],
    "n_rows_total": ..., "n_returned": ...}``. Missing cells (NaN) are
    returned as None, because NaN is not valid JSON and makes strict JSON
    encoders fail (appended rows always carry an empty rating_score).
    """
    if not os.path.exists(FINGERPRINT_CSV):
        return {"ok": False, "error": f"{FINGERPRINT_CSV} not found"}

    try:
        df = pd.read_csv(FINGERPRINT_CSV)
    except Exception as e:
        return {"ok": False, "error": f"Failed to read CSV: {e}"}

    tail = df.tail(n)
    # Cast to object first so None is kept as-is instead of being turned
    # back into NaN by a float column.
    tail_records = tail.astype(object).where(tail.notna(), None).to_dict(orient="records")
    return {
        "ok": True,
        "rows": tail_records,
        "n_rows_total": int(df.shape[0]),
        "n_returned": int(tail.shape[0]),
    }