# SF_FastAPI / app.py
# Source: Hugging Face Space "SF_FastAPI" (commit b8d0ae8).
# NOTE(review): the original leading lines were HTML-scrape residue from the
# Hub file viewer ("raw / history / blame", author avatar, file size) and were
# not valid Python; converted to this comment block so the module parses.
import io
import json
import os
import traceback
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import tensorflow as tf
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
# ---------- SHAP optional import ----------
# shap is an optional dependency: when it is absent the API still serves
# predictions but explanation output is disabled.
try:
    import shap
except ImportError:
    SHAP_AVAILABLE = False
else:
    SHAP_AVAILABLE = True
# ----------------- CONFIG -----------------
# Artifact paths are overridable via environment variables for deployment.
MODEL_PATH = os.getenv("MODEL_PATH", "best_model.h5")    # Keras model file
STATS_PATH = os.getenv("STATS_PATH", "means_std.json")   # per-feature mean/std for manual z-scoring
# Candidate filenames probed (in order) for persisted sklearn artifacts.
IMPUTER_CANDIDATES = ["imputer.joblib", "imputer.pkl", "imputer.sav"]
SCALER_CANDIDATES = ["scaler.joblib", "scaler.pkl", "scaler.sav"]
# Ordered class labels: model output index i maps to CLASSES[i].
CLASSES = ["Top", "Mid-Top", "Mid", "Mid-Low", "Low"]
# ⛔ DO NOT CHANGE: exact order used in training
# (the model consumes feature vectors positionally; reordering silently
# corrupts every prediction)
FEATURES: List[str] = [
    "autosuf_oper",
    "improductiva",
    "gastos_fin_over_avg_cart",
    "_equity",
    "grado_absorcion",
    "_cartera_bruta",
    "gastos_oper_over_ing_oper",
    "cartera_vencida_ratio",
    "roe_pre_tax",
    "_assets",
    "_liab",
    "equity_over_assets",
    "_margen_bruto",
    "prov_over_cartera",
    "gastos_oper_over_cart",
    "ing_cartera_over_ing_total",
    "debt_to_equity",
    "prov_gasto_over_cart",
    "cov_improductiva",
    "rend_cart_over_avg_cart",
    "roa_pre_tax",
]
# ------------------------------------------
# --------- helpers: I/O + numeric coercion ---------
def coerce_float(val: Any) -> float:
    """
    Robustly coerce *val* to float.

    Accepts numbers directly, plus strings in common locale formats:
        "49.709,14"  -> 49709.14   (EU: dot thousands, comma decimal)
        "49,709.14"  -> 49709.14   (US: comma thousands, dot decimal)
        "0,005"      -> 0.005      (decimal comma)
        "1,234,567"  -> 1234567.0  (repeated commas = thousands separators)
        "1.234.567"  -> 1234567.0  (repeated dots = thousands separators)

    Raises:
        ValueError: for empty strings or non-numeric text.
    """
    if isinstance(val, (int, float, np.number)):
        return float(val)
    s = str(val).strip()
    if s == "":
        raise ValueError("empty")
    s = s.replace(" ", "")
    has_dot, has_comma = "." in s, "," in s
    if has_dot and has_comma:
        # Both present: whichever separator occurs LAST is the decimal mark.
        if s.rfind(",") > s.rfind("."):
            s = s.replace(".", "")
            s = s.replace(",", ".")
        else:
            s = s.replace(",", "")
    elif has_comma:
        # A single comma is a decimal mark; several commas can only be
        # thousands separators (previously these raised ValueError).
        if s.count(",") == 1:
            s = s.replace(",", ".")
        else:
            s = s.replace(",", "")
    elif has_dot and s.count(".") > 1:
        # Several dots with no comma: thousands separators ("1.234.567").
        s = s.replace(".", "")
    return float(s)
def load_json(path: str) -> dict:
    """Parse the JSON file at *path* and return the decoded object."""
    with open(path, "r") as handle:
        contents = handle.read()
    return json.loads(contents)
def load_joblib_if_exists(candidates: List[str]):
    """
    Try loading a joblib/pickle artifact (imputer/scaler) from the CWD.

    Each filename in *candidates* is probed in order; the first file that
    loads successfully wins.  FIX: a candidate that exists but fails to
    load no longer aborts the search — later candidates are still tried
    (the original returned on the first failure).

    Returns (obj, path_str or None, error_str or None):
      - (obj, path, None)    on success
      - (None, path, error)  if file(s) existed but none loaded
                             (first failure is reported)
      - (None, None, None)   if no candidate file exists
    """
    first_failure = None  # (path, error) of the first load that blew up
    for name in candidates:
        p = os.path.join(os.getcwd(), name)
        if not os.path.isfile(p):
            continue
        try:
            # Import inside to avoid a hard dependency when no artifact is shipped
            import joblib  # type: ignore
            with open(p, "rb") as fh:
                obj = joblib.load(fh)
            return obj, p, None
        except Exception as e:
            if first_failure is None:
                first_failure = (p, f"{type(e).__name__}({e})")
    if first_failure is not None:
        return None, first_failure[0], first_failure[1]
    return None, None, None
# --------- model / artifacts load ---------
# All artifacts are loaded once at import time (module startup), before the
# FastAPI app is created, so request handlers can use them as globals.
print("Loading model / imputer / scaler...")
# Model (compile=False: inference only — no optimizer/loss state needed)
model = tf.keras.models.load_model(MODEL_PATH, compile=False)
# Imputer (optional sklearn artifact; None when absent)
imputer, imputer_path, imputer_err = load_joblib_if_exists(IMPUTER_CANDIDATES)
if imputer_path and imputer_err:
    print(f"⚠️ Failed to load imputer from {imputer_path}: {imputer_err}")
elif imputer:
    print(f"Loaded imputer from {imputer_path}")
else:
    print("⚠️ No imputer found — skipping median imputation.")
# Scaler (optional sklearn artifact; None when absent)
scaler, scaler_path, scaler_err = load_joblib_if_exists(SCALER_CANDIDATES)
if scaler_path and scaler_err:
    print(f"⚠️ Failed to load scaler from {scaler_path}: {scaler_err}")
elif scaler:
    print(f"Loaded scaler from {scaler_path}")
else:
    print("⚠️ No scaler found — using manual z-scoring if stats are available.")
# Stats (means/std) for fallback manual z-score when no scaler is present
stats: Dict[str, Dict[str, float]] = {}
if os.path.isfile(STATS_PATH):
    stats = load_json(STATS_PATH)
    print(f"Loaded means/std from {STATS_PATH}")
else:
    print("⚠️ No means_std.json found — manual z-scoring will be unavailable if scaler missing.")
# --------- decoding for CORAL vs softmax ---------
def coral_probs_from_logits(logits_np: np.ndarray) -> np.ndarray:
    """
    Convert CORAL ordinal logits to class probabilities.

    Args:
        logits_np: (N, K-1) array of cumulative logits.

    Returns:
        (N, K) array of per-class probabilities (each row sums to 1).

    Notes:
        sigmoid(logit_k) estimates the cumulative probability of exceeding
        threshold k; adjacent differences of the 1/0-padded cumulative curve
        yield the per-class probabilities.  Implemented in pure NumPy — the
        original converted NumPy -> tf.Tensor -> NumPy, a needless round-trip
        for a host-side helper.
    """
    logits = np.asarray(logits_np, dtype=np.float32)
    sig = 1.0 / (1.0 + np.exp(-logits))  # (N, K-1) cumulative probabilities
    left = np.concatenate([np.ones_like(sig[:, :1]), sig], axis=1)
    right = np.concatenate([sig, np.zeros_like(sig[:, :1])], axis=1)
    # Clip guards tiny negatives from floating-point rounding.
    probs = np.clip(left - right, 1e-12, 1.0)
    # Normalize row-wise just in case clipping perturbed the sums.
    return probs / probs.sum(axis=1, keepdims=True)
def decode_logits(raw: np.ndarray) -> Tuple[np.ndarray, str]:
    """
    Decode a single-row model output into class probabilities.

    Args:
        raw: (1, M) array of raw model outputs.

    Returns:
        (probs, mode): probs is a (K,) probability vector aligned with
        CLASSES; mode names the decoding path ("auto_coral",
        "auto_softmax", or "fallback_M{M}_K{K}").

    Raises:
        ValueError: if raw is not 2-D.

    The head type is inferred from the output width: M == K-1 implies CORAL
    ordinal logits, M == K implies softmax-style scores; any other width
    falls back to L1-magnitude normalization.

    FIX: the original return annotation was the tuple literal
    `(np.ndarray, str)`, which is not a valid PEP 484 type.
    """
    if raw.ndim != 2:
        raise ValueError(f"Unexpected raw shape {raw.shape}")
    M = raw.shape[1]
    K = len(CLASSES)
    if M == K - 1:
        # CORAL logits
        probs = coral_probs_from_logits(raw)[0]
        return probs, "auto_coral"
    if M == K:
        # Softmax or unnormalized scores: numerically-stable softmax (max-shift)
        row = raw[0]
        exps = np.exp(row - np.max(row))
        return exps / np.sum(exps), "auto_softmax"
    # Fallback: normalize across whatever is there; uniform if all-zero.
    row = raw[0]
    s = float(np.sum(np.abs(row)))
    probs = (row / s) if s > 0 else np.ones_like(row) / len(row)
    return probs, f"fallback_M{M}_K{K}"
# --------- preprocessing pipeline ---------
def build_raw_vector(payload: Dict[str, Any]) -> np.ndarray:
    """
    Assemble the raw feature vector in the exact training order.

    Features absent from *payload* — or whose values fail numeric
    coercion — become np.nan so a downstream imputer can handle them.
    """
    def _value_or_nan(name: str) -> float:
        # KeyError (missing feature) and coercion failures both map to NaN.
        try:
            return coerce_float(payload[name])
        except Exception:
            return np.nan

    return np.array([_value_or_nan(feat) for feat in FEATURES], dtype=np.float32)
def apply_imputer_if_any(x: np.ndarray) -> np.ndarray:
    """
    Fill NaNs in the raw feature vector.

    Prefers the persisted sklearn imputer when one was loaded; otherwise
    falls back to the per-feature mean from `stats`, and finally to 0.0.
    """
    if imputer is not None:
        # sklearn transformers expect a 2-D batch; extract row 0 afterwards.
        return imputer.transform(x.reshape(1, -1)).astype(np.float32)[0]
    filled = x.copy()
    for idx, name in enumerate(FEATURES):
        if not np.isnan(filled[idx]):
            continue
        feature_stats = stats.get(name, {})
        filled[idx] = float(feature_stats["mean"]) if "mean" in feature_stats else 0.0
    return filled
def apply_scaling_or_stats(raw_vec: np.ndarray) -> Tuple[np.ndarray, Dict[str, float], str]:
    """
    Standardize the (already imputed) raw feature vector.

    Args:
        raw_vec: (n_features,) array in training feature order.

    Returns:
        (z_vec, z_detail, mode):
          - z_vec: (n_features,) standardized vector
          - z_detail: feature name -> z value (plain floats, JSON-friendly)
          - mode: "sklearn_scaler" when the persisted scaler is used,
                  "manual_stats" for the (x - mean) / std fallback.

    FIX: the original return annotation was a tuple literal, which is not
    a valid PEP 484 type.
    """
    if scaler is not None:
        z = scaler.transform(raw_vec.reshape(1, -1)).astype(np.float32)[0]
        z_detail = {f: float(z[i]) for i, f in enumerate(FEATURES)}
        return z, z_detail, "sklearn_scaler"
    z = np.zeros_like(raw_vec, dtype=np.float32)
    z_detail: Dict[str, float] = {}
    for i, f in enumerate(FEATURES):
        mean = stats.get(f, {}).get("mean", 0.0)
        sd = stats.get(f, {}).get("std", 1.0)
        if not sd:
            # zero/None std would divide by zero; treat as unit variance
            sd = 1.0
        z[i] = (raw_vec[i] - mean) / sd
        z_detail[f] = float(z[i])
    return z, z_detail, "manual_stats"
# --------- SHAP model wrapper & explainer ---------
def model_proba_from_z(z_batch_np: np.ndarray) -> np.ndarray:
    """
    Wrapper for SHAP: takes (N, n_features) in z-space and returns (N, K) probabilities.

    Mirrors decode_logits() but vectorized over the batch, since SHAP's
    KernelExplainer calls it with many perturbed samples at once.
    """
    raw = model.predict(z_batch_np, verbose=0)
    if raw.ndim != 2:
        raise ValueError(f"Unexpected raw shape from model: {raw.shape}")
    N, M = raw.shape
    K = len(CLASSES)
    if M == K - 1:
        # CORAL head: K-1 cumulative logits per row
        probs = coral_probs_from_logits(raw)  # (N, K)
    elif M == K:
        # Softmax or scores: numerically-stable softmax (per-row max shift)
        exps = np.exp(raw - np.max(raw, axis=1, keepdims=True))
        probs = exps / np.sum(exps, axis=1, keepdims=True)
    else:
        # Fallback: normalize by L1 mass.  np.divide writes only where
        # `where=` holds, so rows with zero mass keep the uniform 1/M
        # values pre-filled through `out=`.
        s = np.sum(np.abs(raw), axis=1, keepdims=True)
        probs = np.divide(raw, s, out=np.ones_like(raw) / max(M, 1), where=(s > 0))
    return probs
# Module-level SHAP explainer; stays None when shap is missing or init fails,
# and handlers degrade gracefully in that case.
EXPLAINER = None
if SHAP_AVAILABLE:
    try:
        # Background: 50 "average" institutions at z=0
        # (z=0 is every feature at its training mean after standardization)
        BACKGROUND_Z = np.zeros((50, len(FEATURES)), dtype=np.float32)
        EXPLAINER = shap.KernelExplainer(model_proba_from_z, BACKGROUND_Z)
        print("SHAP KernelExplainer initialized.")
    except Exception as e:
        # Explainer init must never prevent the API from starting.
        EXPLAINER = None
        print("⚠️ Failed to initialize SHAP explainer:", repr(e))
else:
    print("SHAP not installed; explanations disabled.")
# ----------------- FastAPI -----------------
app = FastAPI(title="Static Fingerprint API", version="1.2.0")
# Wide-open CORS so browser frontends on any origin can call the API.
# Credentials stay disabled: browsers reject credentialed requests when
# allow_origins is the "*" wildcard.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get("/")
def root():
return {
"message": "Static Fingerprint API is running.",
"try": ["GET /health", "POST /predict", "POST /debug/z"],
}
@app.get("/health")
def health():
stats_keys = []
try:
if os.path.isfile(STATS_PATH):
stats_keys = list(load_json(STATS_PATH).keys())
except Exception:
pass
return {
"status": "ok",
"classes": CLASSES,
"features_training_order": FEATURES,
"features_in_means_std": stats_keys,
"model_file": MODEL_PATH,
"imputer": bool(imputer),
"scaler": bool(scaler),
"stats_available": bool(stats),
"shap_available": bool(EXPLAINER is not None),
}
@app.post("/debug/z")
async def debug_z(req: Request):
try:
payload = await req.json()
if not isinstance(payload, dict):
return JSONResponse(status_code=400, content={"error": "Expected JSON object"})
raw = build_raw_vector(payload)
raw_imp = apply_imputer_if_any(raw)
z, z_detail, mode = apply_scaling_or_stats(raw_imp)
rows = []
for i, f in enumerate(FEATURES):
rows.append({
"feature": f,
"input_value": None if np.isnan(raw[i]) else float(raw[i]),
"imputed_value": float(raw_imp[i]),
"z": float(z[i]),
"mean": stats.get(f, {}).get("mean", None),
"std": stats.get(f, {}).get("std", None),
})
return {"preprocess_mode": mode, "rows": rows}
except Exception as e:
return JSONResponse(status_code=500, content={"error": str(e), "trace": traceback.format_exc()})
@app.post("/predict")
async def predict(req: Request):
"""
Body: JSON object mapping feature -> numeric value (strings with commas/points ok).
Missing features are imputed if imputer present; else filled with means (if stats) or 0.
Now also returns SHAP values for the predicted_state (if SHAP is available).
"""
try:
payload = await req.json()
if not isinstance(payload, dict):
return JSONResponse(status_code=400, content={"error": "Expected JSON object"})
# Build in EXACT training order
raw = build_raw_vector(payload) # may contain NaNs
raw_imp = apply_imputer_if_any(raw) # impute
z_vec, z_detail, z_mode = apply_scaling_or_stats(raw_imp) # scale / z-score
# Predict
X = z_vec.reshape(1, -1).astype(np.float32)
raw_logits = model.predict(X, verbose=0)
probs, mode = decode_logits(raw_logits)
pred_idx = int(np.argmax(probs))
probs_dict = {CLASSES[i]: float(probs[i]) for i in range(len(CLASSES))}
missing = [f for i, f in enumerate(FEATURES) if np.isnan(raw[i])]
return {
"input_ok": (len(missing) == 0),
"missing": missing,
"preprocess": {
"imputer": bool(imputer),
"scaler": bool(scaler),
"z_mode": z_mode,
},
"z_scores": z_detail,
"probabilities": probs_dict,
"predicted_state": CLASSES[pred_idx],
"shap": shap_out,
"debug": {
"raw_shape": list(raw_logits.shape),
"decode_mode": mode,
"raw_first_row": [float(v) for v in raw_logits[0]],
},
}
pred_idx = int(np.argmax(probs))
probs_dict = {CLASSES[i]: float(probs[i]) for i in range(len(CLASSES))}
missing = [f for i, f in enumerate(FEATURES) if np.isnan(raw[i])]
# ---- SHAP explanation for predicted class ----
# -------- SHAP EXPLANATION (predicted class only) --------
shap_out = None
if EXPLAINER is not None:
try:
# X is already z-space: shape (1, n_features)
shap_vals = EXPLAINER.shap_values(X, nsamples=100)
# Case 1: multi-output -> list of length K, each (1, n_features)
if isinstance(shap_vals, list):
shap_vec = np.array(shap_vals[pred_idx][0], dtype=float)
# expected_value may also be a list per class
exp_val_raw = EXPLAINER.expected_value
if isinstance(exp_val_raw, (list, np.ndarray)):
exp_val = float(exp_val_raw[pred_idx])
else:
exp_val = float(exp_val_raw)
# Case 2: single-output -> ndarray (1, n_features)
elif isinstance(shap_vals, np.ndarray):
shap_vec = np.array(shap_vals[0], dtype=float)
exp_val_raw = EXPLAINER.expected_value
if isinstance(exp_val_raw, (list, np.ndarray)):
exp_val = float(exp_val_raw[0])
else:
exp_val = float(exp_val_raw)
else:
raise TypeError(f"Unsupported SHAP return type: {type(shap_vals)}")
# Map feature -> SHAP contribution (for the predicted class)
shap_feature_contribs = {
FEATURES[i]: float(shap_vec[i])
for i in range(len(FEATURES))
}
shap_out = {
"explained_class": CLASSES[pred_idx],
"expected_value": exp_val,
"shap_values": shap_feature_contribs,
}
except Exception as e:
shap_out = {
"error": str(e),
"trace": traceback.format_exc()
}
else:
shap_out = {"error": "SHAP not available on server"}
except Exception as e:
return JSONResponse(status_code=500, content={"error": str(e), "trace": traceback.format_exc()})