SF_FastAPI / app.py
COCODEDE04's picture
Update app.py
c71e704 verified
raw
history blame
15.7 kB
import json
import os
import traceback
from typing import Any, Dict, List, Tuple

import numpy as np
import tensorflow as tf
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
# Optional dependency: SHAP explanations degrade gracefully when the
# library is not installed (endpoints report "available": False instead).
try:
    import shap
    SHAP_AVAILABLE = True
except ImportError:
    SHAP_AVAILABLE = False
# ----------------- CONFIG -----------------
# Artifact locations, overridable via environment variables.
MODEL_PATH = os.getenv("MODEL_PATH", "best_model.h5")
STATS_PATH = os.getenv("STATS_PATH", "means_std.json")
# Preprocessing artifacts are probed under several common extensions;
# the first file found wins (see load_joblib_if_exists).
IMPUTER_CANDIDATES = ["imputer.joblib", "imputer.pkl", "imputer.sav"]
SCALER_CANDIDATES = ["scaler.joblib", "scaler.pkl", "scaler.sav"]
# Rating classes; listed Top → Low (ordering matters for CORAL decoding).
CLASSES = ["Top", "Mid-Top", "Mid", "Mid-Low", "Low"]
# ⛔ DO NOT CHANGE: exact order used in training
FEATURES: List[str] = [
    "autosuf_oper",
    "improductiva",
    "gastos_fin_over_avg_cart",
    "_equity",
    "grado_absorcion",
    "_cartera_bruta",
    "gastos_oper_over_ing_oper",
    "cartera_vencida_ratio",
    "roe_pre_tax",
    "_assets",
    "_liab",
    "equity_over_assets",
    "_margen_bruto",
    "prov_over_cartera",
    "gastos_oper_over_cart",
    "ing_cartera_over_ing_total",
    "debt_to_equity",
    "prov_gasto_over_cart",
    "cov_improductiva",
    "rend_cart_over_avg_cart",
    "roa_pre_tax",
]
# ------------------------------------------
# --------- helpers: I/O + numeric coercion ---------
def coerce_float(val: Any) -> float:
    """
    Robustly convert *val* to float.

    Numbers pass straight through; strings may use either European or
    US thousands/decimal conventions:
        "49.709,14" -> 49709.14
        "49,709.14" -> 49709.14
        "0,005"     -> 0.005

    Raises ValueError for empty strings or unparseable input.
    """
    if isinstance(val, (int, float, np.number)):
        return float(val)
    text = str(val).strip()
    if not text:
        raise ValueError("empty")
    text = text.replace(" ", "")
    dot_pos, comma_pos = text.rfind("."), text.rfind(",")
    if dot_pos != -1 and comma_pos != -1:
        # Both separators present: the one occurring LAST is the decimal mark.
        if comma_pos > dot_pos:
            text = text.replace(".", "").replace(",", ".")
        else:
            text = text.replace(",", "")
    elif comma_pos != -1:
        # Comma only: treat it as the decimal mark.
        text = text.replace(",", ".")
    # Dot only (or neither): Python's float() already handles it.
    return float(text)
def load_json(path: str) -> dict:
    """
    Load and return the JSON document at *path*.

    Reads as UTF-8 explicitly: the previous implicit default encoding is
    platform-dependent and can mis-decode stats files written elsewhere.
    """
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)
def load_joblib_if_exists(candidates: List[str]):
    """
    Try to load the first existing joblib/pickle artifact among
    *candidates*, resolved against the current working directory.

    Returns a 3-tuple ``(obj, path, error)``:
      * ``(artifact, path, None)``          on success,
      * ``(None, path, "ErrType(msg)")``    when a file exists but fails to load,
      * ``(None, None, None)``              when no candidate file exists.
    """
    cwd = os.getcwd()
    for candidate in candidates:
        full_path = os.path.join(cwd, candidate)
        if not os.path.isfile(full_path):
            continue
        try:
            import joblib  # lazy import: app still starts without joblib
            with open(full_path, "rb") as handle:
                return joblib.load(handle), full_path, None
        except Exception as exc:
            return None, full_path, f"{type(exc).__name__}({exc})"
    return None, None, None
# --------- model / artifacts load ---------
print("Loading model / imputer / scaler...")
# Model: compile=False because we only run inference (no optimizer/loss needed).
model = tf.keras.models.load_model(MODEL_PATH, compile=False)
# Imputer (optional). Note: use `is not None` rather than truthiness — an
# arbitrary unpickled artifact could be falsy while still being a valid object.
imputer, imputer_path, imputer_err = load_joblib_if_exists(IMPUTER_CANDIDATES)
if imputer_path and imputer_err:
    print(f"⚠️ Failed to load imputer from {imputer_path}: {imputer_err}")
elif imputer is not None:
    print(f"Loaded imputer from {imputer_path}")
else:
    print("⚠️ No imputer found — skipping median imputation.")
# Scaler (optional); same identity-check rationale as above.
scaler, scaler_path, scaler_err = load_joblib_if_exists(SCALER_CANDIDATES)
if scaler_path and scaler_err:
    print(f"⚠️ Failed to load scaler from {scaler_path}: {scaler_err}")
elif scaler is not None:
    print(f"Loaded scaler from {scaler_path}")
else:
    print("⚠️ No scaler found — using manual z-scoring if stats are available.")
# Per-feature means/std used for manual z-scoring when no scaler is present.
stats: Dict[str, Dict[str, float]] = {}
if os.path.isfile(STATS_PATH):
    stats = load_json(STATS_PATH)
    print(f"Loaded means/std from {STATS_PATH}")
else:
    print("⚠️ No means_std.json found — manual z-scoring will be unavailable if scaler missing.")
# --------- decoding for CORAL vs softmax ---------
def coral_probs_from_logits(logits_np: np.ndarray) -> np.ndarray:
"""
(N, K-1) logits -> (N, K) probabilities for CORAL ordinal output.
"""
logits = tf.convert_to_tensor(logits_np, dtype=tf.float32)
sig = tf.math.sigmoid(logits) # (N, K-1)
left = tf.concat([tf.ones_like(sig[:, :1]), sig], axis=1)
right = tf.concat([sig, tf.zeros_like(sig[:, :1])], axis=1)
probs = tf.clip_by_value(left - right, 1e-12, 1.0)
return probs.numpy()
def decode_logits(raw: np.ndarray) -> Tuple[np.ndarray, str]:
    """
    Decode a raw (1, M) model output into class probabilities.

    The head type is inferred from the output width M:
      * M == K - 1 -> CORAL ordinal logits
      * M == K     -> softmax over logits (max-shifted for stability)
      * otherwise  -> L1-normalised fallback (uniform if all-zero)

    Returns (probs of shape (K,), mode string describing the decode path).
    Raises ValueError when *raw* is not 2-D.

    Note: the return annotation was previously the invalid tuple literal
    ``(np.ndarray, str)``; ``Tuple[np.ndarray, str]`` is the correct form.
    """
    if raw.ndim != 2:
        raise ValueError(f"Unexpected raw shape {raw.shape}")
    M = raw.shape[1]
    K = len(CLASSES)
    if M == K - 1:
        probs = coral_probs_from_logits(raw)[0]
        return probs, "auto_coral"
    elif M == K:
        row = raw[0]
        exps = np.exp(row - np.max(row))  # shift by max for numerical stability
        probs = exps / np.sum(exps)
        return probs, "auto_softmax"
    else:
        # Unknown head width: normalise by absolute mass, or go uniform.
        row = raw[0]
        s = float(np.sum(np.abs(row)))
        probs = (row / s) if s > 0 else np.ones_like(row) / len(row)
        return probs, f"fallback_M{M}_K{K}"
# --------- preprocessing pipeline ---------
def build_raw_vector(payload: Dict[str, Any]) -> np.ndarray:
    """
    Assemble the raw feature vector in the exact training order.

    Features absent from *payload* — or present but not coercible to a
    number — become np.nan so the downstream imputation step can fill them.
    """
    def _value_for(name: str) -> float:
        if name not in payload:
            return np.nan
        try:
            return coerce_float(payload[name])
        except Exception:
            return np.nan

    return np.array([_value_for(f) for f in FEATURES], dtype=np.float32)
def apply_imputer_if_any(x: np.ndarray) -> np.ndarray:
    """
    Fill NaNs in *x* with the fitted imputer when one was loaded;
    otherwise substitute each missing feature's training mean from
    `stats`, defaulting to 0.0 when no mean is recorded.
    """
    if imputer is not None:
        return imputer.transform(x.reshape(1, -1)).astype(np.float32)[0]
    filled = x.copy()
    for idx, name in enumerate(FEATURES):
        if not np.isnan(filled[idx]):
            continue
        feature_stats = stats.get(name, {})
        filled[idx] = float(feature_stats["mean"]) if "mean" in feature_stats else 0.0
    return filled
def apply_scaling_or_stats(raw_vec: np.ndarray) -> Tuple[np.ndarray, Dict[str, float], str]:
    """
    Standardise *raw_vec* (already imputed) into z-space.

    Uses the fitted sklearn scaler when available; otherwise falls back
    to manual (x - mean) / std from the loaded stats. Missing stats
    default to mean=0, std=1; a zero/None std is coerced to 1 to avoid
    division by zero.

    Returns (z_vec, {feature: z}, mode) where mode is "sklearn_scaler"
    or "manual_stats".

    Note: the return annotation was previously the invalid tuple literal
    ``(np.ndarray, Dict[str, float], str)``; fixed to ``Tuple[...]``.
    """
    if scaler is not None:
        z = scaler.transform(raw_vec.reshape(1, -1)).astype(np.float32)[0]
        z_detail = {f: float(z[i]) for i, f in enumerate(FEATURES)}
        return z, z_detail, "sklearn_scaler"
    else:
        z = np.zeros_like(raw_vec, dtype=np.float32)
        z_detail: Dict[str, float] = {}
        for i, f in enumerate(FEATURES):
            mean = stats.get(f, {}).get("mean", 0.0)
            sd = stats.get(f, {}).get("std", 1.0)
            if not sd:  # guard: zero or None std would blow up the division
                sd = 1.0
            z[i] = (raw_vec[i] - mean) / sd
            z_detail[f] = float(z[i])
        return z, z_detail, "manual_stats"
# --------- SHAP: model wrapper & explainer ---------
def model_proba_from_z(z_batch_np: np.ndarray) -> np.ndarray:
    """
    Batch-safe wrapper for SHAP and other callers.

    Input:
        z_batch_np: (N, n_features) or (n_features,) in z-space
    Output:
        probs: (N, K) matrix of class probabilities

    Mirrors the decoding logic of decode_logits but vectorised over a
    batch, since SHAP's KernelExplainer calls this with many rows.
    """
    z = np.array(z_batch_np, dtype=np.float32)
    # Ensure 2D: (N, D)
    if z.ndim == 1:
        z = z.reshape(1, -1)
    raw = model.predict(z, verbose=0)  # shape: (N, M)
    if raw.ndim != 2:
        raise ValueError(f"Unexpected raw shape from model: {raw.shape}")
    N, M = raw.shape
    K = len(CLASSES)
    if M == K - 1:
        # CORAL: logits for K-1 thresholds → K probabilities
        probs = coral_probs_from_logits(raw)  # (N, K)
    elif M == K:
        # Softmax or unnormalized scores, per row (max-shifted for stability)
        exps = np.exp(raw - np.max(raw, axis=1, keepdims=True))
        probs = exps / np.sum(exps, axis=1, keepdims=True)  # (N, K)
    else:
        # Fallback: row-wise L1 normalization. np.divide only writes where
        # `where=` is True, so rows with zero absolute mass keep the uniform
        # values pre-filled via `out=`.
        s = np.sum(np.abs(raw), axis=1, keepdims=True)  # (N, 1)
        probs = np.divide(
            raw,
            s,
            out=np.ones_like(raw) / max(M, 1),
            where=(s > 0),
        )  # (N, M)
    return probs
# Module-level SHAP explainer over the full (N, K) probability output.
# NOTE(review): /predict builds its own per-request scalar explainer for the
# predicted class; this global EXPLAINER appears only to feed the /health
# "shap_available" flag — confirm before removing or reusing it.
EXPLAINER = None
if SHAP_AVAILABLE:
    try:
        # Background of 50 all-zero rows = "average institution" in z-space.
        BACKGROUND_Z = np.zeros((50, len(FEATURES)), dtype=np.float32)
        EXPLAINER = shap.KernelExplainer(model_proba_from_z, BACKGROUND_Z)
        print("SHAP KernelExplainer initialized.")
    except Exception as e:
        # Explanations are optional: keep serving predictions without SHAP.
        EXPLAINER = None
        print("⚠️ Failed to initialize SHAP explainer:", repr(e))
else:
    print("SHAP not installed; explanations disabled.")
# ----------------- FastAPI -----------------
app = FastAPI(title="Static Fingerprint API", version="1.2.0")
# Fully permissive CORS (any origin/method/header, no credentials) so
# browser-based clients on other hosts can call the API directly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get("/")
def root():
return {
"message": "Static Fingerprint API is running.",
"try": ["GET /health", "POST /predict", "POST /debug/z"],
}
@app.get("/health")
def health():
stats_keys = []
try:
if os.path.isfile(STATS_PATH):
stats_keys = list(load_json(STATS_PATH).keys())
except Exception:
pass
return {
"status": "ok",
"classes": CLASSES,
"features_training_order": FEATURES,
"features_in_means_std": stats_keys,
"model_file": MODEL_PATH,
"imputer": bool(imputer),
"scaler": bool(scaler),
"stats_available": bool(stats),
"shap_available": bool(EXPLAINER),
}
@app.post("/debug/z")
async def debug_z(req: Request):
try:
payload = await req.json()
if not isinstance(payload, dict):
return JSONResponse(status_code=400, content={"error": "Expected JSON object"})
raw = build_raw_vector(payload)
raw_imp = apply_imputer_if_any(raw)
z, z_detail, mode = apply_scaling_or_stats(raw_imp)
rows = []
for i, f in enumerate(FEATURES):
rows.append({
"feature": f,
"input_value": None if np.isnan(raw[i]) else float(raw[i]),
"imputed_value": float(raw_imp[i]),
"z": float(z[i]),
"mean": stats.get(f, {}).get("mean", None),
"std": stats.get(f, {}).get("std", None),
})
return {"preprocess_mode": mode, "rows": rows}
except Exception as e:
return JSONResponse(status_code=500, content={"error": str(e), "trace": traceback.format_exc()})
@app.post("/predict")
async def predict(req: Request):
"""
Body: JSON object mapping feature -> numeric value (strings with commas/points ok).
Missing features are imputed if imputer present; else filled with means (if stats) or 0.
This endpoint ALSO computes SHAP values for the *predicted class only*,
returning one SHAP value per feature (21 in total) when SHAP is available.
"""
try:
payload = await req.json()
if not isinstance(payload, dict):
return JSONResponse(status_code=400, content={"error": "Expected JSON object"})
# ---------- 1) Build features in EXACT training order ----------
raw = build_raw_vector(payload) # may contain NaNs
raw_imp = apply_imputer_if_any(raw) # median / training imputer
z_vec, z_detail, z_mode = apply_scaling_or_stats(raw_imp) # scaler or manual z-score
# ---------- 2) Model prediction ----------
X_z = z_vec.reshape(1, -1).astype(np.float32)
raw_logits = model.predict(X_z, verbose=0)
probs, decode_mode = decode_logits(raw_logits)
pred_idx = int(np.argmax(probs))
pred_class = CLASSES[pred_idx]
probs_dict = {CLASSES[i]: float(probs[i]) for i in range(len(CLASSES))}
missing = [f for i, f in enumerate(FEATURES) if np.isnan(raw[i])]
# ---------- 3) SHAP explanation for the predicted class ----------
shap_payload: Dict[str, Any]
if not SHAP_AVAILABLE:
shap_payload = {
"available": False,
"reason": "SHAP library not installed in this environment.",
}
else:
try:
# Scalar function: probability of the *predicted* class only
def f_scalar(z_batch):
"""
z_batch: (N, D) or (D,)
returns: (N,) probability of the predicted class
"""
probs_batch = model_proba_from_z(z_batch) # (N, K)
return probs_batch[:, pred_idx] # (N,)
# Background: 50 "average" institutions at z=0
background_z = np.zeros((50, len(FEATURES)), dtype=np.float32)
# KernelExplainer for a scalar-output model
explainer = shap.KernelExplainer(f_scalar, background_z)
# SHAP for this one observation (in z-space)
shap_vals = explainer.shap_values(X_z, nsamples=50)
# For scalar output, shap_vals is usually a 2D array (N, D),
# but some versions wrap it in a list. Handle both:
if isinstance(shap_vals, list):
shap_mat = np.array(shap_vals[0])
else:
shap_mat = np.array(shap_vals)
# Expect (1, n_features)
if shap_mat.ndim == 1:
shap_mat = shap_mat.reshape(1, -1)
if shap_mat.shape[0] != 1:
raise ValueError(f"Unexpected SHAP batch size {shap_mat.shape[0]} (expected 1)")
if shap_mat.shape[1] != len(FEATURES):
raise ValueError(
f"Unexpected SHAP vector length {shap_mat.shape[1]} "
f"(expected {len(FEATURES)})"
)
shap_vec = shap_mat[0] # (n_features,)
shap_feature_contribs = {
FEATURES[i]: float(shap_vec[i]) for i in range(len(FEATURES))
}
shap_payload = {
"available": True,
"class": pred_class,
"values": shap_feature_contribs,
}
except Exception as e:
shap_payload = {
"available": False,
"error": str(e),
"trace": traceback.format_exc(),
}
# ---------- 4) Final JSON response ----------
return {
"input_ok": (len(missing) == 0),
"missing": missing,
"preprocess": {
"imputer": bool(imputer),
"scaler": bool(scaler),
"z_mode": z_mode,
},
"z_scores": z_detail, # per feature (model input)
"probabilities": probs_dict, # state → probability
"predicted_state": pred_class,
"shap": shap_payload, # explanation for predicted class only
"debug": {
"raw_shape": list(raw_logits.shape),
"decode_mode": decode_mode,
"raw_first_row": [float(v) for v in raw_logits[0]],
},
}
except Exception as e:
return JSONResponse(
status_code=500,
content={"error": str(e), "trace": traceback.format_exc()},
)