# SF_FastAPI / app.py
# (Hugging Face Space page header retained as a comment so the module parses:
#  repo COCODEDE04, "Update app.py", revision 5f4bae5 verified, 11.2 kB —
#  raw / history / blame links from the hosting page, not program code.)
# app.py
import os, json, glob
from typing import Any, Dict, List, Optional
import numpy as np
import tensorflow as tf
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
# ----------------- CONFIG -----------------
# Candidate filenames searched (repo dir, CWD, then recursively) for each artifact.
DEFAULT_MODEL_CANDIDATES = ["best_model.h5", "best_model.keras"]
DEFAULT_IMPUTER_CANDIDATES = ["imputer.joblib", "imputer.pkl", "imputer.sav"]
DEFAULT_SCALER_CANDIDATES = ["scaler.joblib", "scaler.pkl", "scaler.sav"]
# Fallback per-feature {mean, std} stats used when no fitted scaler is available.
DEFAULT_STATS_PATH = "means_std.json"
# Ordinal class labels; index 0 = Top (best) ... 4 = Low (worst).
CLASSES = ["Top", "Mid-Top", "Mid", "Mid-Low", "Low"]  # index 0=Top ... 4=Low
# When True, CORAL cumulative logits are clamped to be non-increasing before decoding.
APPLY_CORAL_MONOTONE = True  # nudge thresholds to be non-increasing before decode
# ------------------------------------------
HERE = os.path.dirname(os.path.abspath(__file__))
# ---------- utilities: robust file resolving & logging ----------
def resolve_first(*names: str) -> Optional[str]:
    """Locate the first existing file among *names*.

    Search order: each root (repo dir HERE, then the CWD) joined directly
    with each name; if nothing matches, a recursive ``**`` glob under both
    roots (handles artifacts tucked into subfolders). Returns the path of
    the first hit, or None when no candidate exists.
    """
    roots = (HERE, os.getcwd())
    # Cheap direct lookups first.
    for root in roots:
        for candidate in names:
            direct = os.path.join(root, candidate)
            if os.path.isfile(direct):
                return direct
    # Recursive fallback: name-major order, repo dir before CWD for each name.
    for candidate in names:
        for root in roots:
            for hit in glob.glob(os.path.join(root, "**", candidate), recursive=True):
                if os.path.isfile(hit):
                    return hit
    return None
def describe_dir() -> None:
    """Print the CWD, repo directory, and repo listing (best-effort, never raises)."""
    try:
        print("CWD:", os.getcwd())
        print("Repo dir (HERE):", HERE)
        print("Repo listing:", os.listdir(HERE))
    except Exception as exc:  # diagnostics only — swallow and report
        print("listdir error:", exc)
def load_joblib(label: str, candidates: List[str]):
    """Best-effort loader for a joblib artifact.

    Searches *candidates* via resolve_first and returns the deserialized
    object, or None when the file is absent or fails to load (both cases
    are logged rather than raised, so optional artifacts stay optional).
    """
    import joblib
    print(f"Looking for {label} among: {candidates}")
    describe_dir()
    path = resolve_first(*candidates)
    if path is None:
        print(f"⚠️ {label} not found.")
        return None
    try:
        size = os.path.getsize(path)
    except Exception:
        # Size lookup is informational only; fall back to a plain message.
        print(f"Loading {label} from {path}")
    else:
        print(f"Loading {label} from {path} ({size} bytes)")
    try:
        return joblib.load(path)
    except Exception as exc:
        print(f"⚠️ Failed to load {label}: {repr(exc)}")
        return None
def load_model_robust() -> tf.keras.Model:
    """Resolve and load the Keras model, honoring the MODEL_PATH env override.

    Raises FileNotFoundError when no candidate file can be resolved.
    """
    print("Resolving model...")
    env_override = os.getenv("MODEL_PATH")
    candidates = [env_override] if env_override else DEFAULT_MODEL_CANDIDATES
    path = resolve_first(*candidates)
    if path is None:
        raise FileNotFoundError(f"Model file not found. Tried: {candidates}")
    print(f"Loading model from {path} ({os.path.getsize(path)} bytes)")
    # compile=False: inference only, avoids needing custom training objects.
    return tf.keras.models.load_model(path, compile=False)
def load_means_std(stats_path: Optional[str]) -> Optional[Dict[str, Dict[str, float]]]:
    """Load the per-feature mean/std JSON used for manual standardization.

    Resolution order for the filename: explicit argument, STATS_PATH env
    var, then DEFAULT_STATS_PATH. Returns the parsed dict, or None when
    the file cannot be found.
    """
    requested = stats_path or os.getenv("STATS_PATH") or DEFAULT_STATS_PATH
    resolved = resolve_first(requested) if requested else None
    if resolved is None:
        print("⚠️ means_std.json not found.")
        return None
    print(f"Loading means/std from {resolved} ({os.path.getsize(resolved)} bytes)")
    with open(resolved, "r") as fh:
        return json.load(fh)
# ---------- numeric coercion ----------
def coerce_float(val: Any) -> float:
    """Parse *val* into a float, tolerating European-style number strings.

    Real numerics pass through unchanged. For strings, whitespace is
    stripped and the decimal separator inferred: when both '.' and ','
    appear, whichever occurs last is the decimal point (so '49.709,14'
    -> 49709.14 and '1,234.56' -> 1234.56); a lone ',' acts as a decimal
    point.

    Raises ValueError on empty or otherwise unparseable input.
    """
    if isinstance(val, (int, float)):
        return float(val)
    text = str(val).strip()
    if not text:
        raise ValueError("empty")
    text = text.replace(" ", "")
    last_dot = text.rfind(".")
    last_comma = text.rfind(",")
    if last_dot >= 0 and last_comma >= 0:
        if last_comma > last_dot:
            # Comma is the decimal mark; dots are thousands separators.
            text = text.replace(".", "").replace(",", ".")
        else:
            # Dot is the decimal mark; commas are thousands separators.
            text = text.replace(",", "")
    elif last_comma >= 0:
        # Only commas present: treat the comma as a decimal point.
        text = text.replace(",", ".")
    return float(text)
def z_manual(val: Any, mean: float, sd: float) -> float:
    """Standardize *val* against (mean, sd).

    Returns 0.0 when the value cannot be parsed or when sd is falsy
    (zero/None), so a bad input never poisons the feature vector.
    """
    try:
        parsed = coerce_float(val)
    except Exception:
        return 0.0
    return (parsed - mean) / sd if sd else 0.0
# ---------- CORAL decoding ----------
def coral_probs_from_logits(logits_np: np.ndarray, monotone: bool = False) -> np.ndarray:
    """Convert CORAL cumulative logits into per-class probabilities.

    Args:
        logits_np: (N, K-1) array of cumulative logits.
        monotone: when True, enforce non-increasing logits within each row
            (running minimum left-to-right) before decoding, so adjacent
            class probabilities cannot go negative before clipping.

    Returns:
        (N, K) float32 array of probabilities, clipped to [1e-12, 1].
        The input array is never modified.
    """
    logits = np.asarray(logits_np, dtype=np.float32)
    if monotone:
        # Vectorized running minimum replaces the previous per-element
        # Python loop; it also returns a fresh array, so the caller's
        # logits are no longer mutated in place when dtype already matched.
        logits = np.minimum.accumulate(logits, axis=1)
    sig = 1.0 / (1.0 + np.exp(-logits))  # elementwise sigmoid of cumulative logits
    n_rows = sig.shape[0]
    # Adjacent differences of the cumulative sigmoids, with boundary
    # columns of 1 (left) and 0 (right); rows telescope to sum to 1.
    left = np.concatenate([np.ones((n_rows, 1), dtype=np.float32), sig], axis=1)
    right = np.concatenate([sig, np.zeros((n_rows, 1), dtype=np.float32)], axis=1)
    # Clip guards tiny negatives from float error (or non-monotone logits).
    return np.clip(left - right, 1e-12, 1.0)
# ---------- FastAPI app ----------
# FastAPI application object; endpoints below are registered on it via decorators.
app = FastAPI(title="Static Fingerprint API", version="1.1.0")
# Wide-open CORS (any origin/method/header, credentials disabled) so browser
# frontends hosted elsewhere can call this API directly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
print("Loading model / imputer / scaler...")
# Module-level artifact loading — runs once at import time. A missing model
# aborts startup (load_model_robust raises); imputer/scaler/stats are
# optional and come back as None on failure.
model = load_model_robust()
imputer = load_joblib("imputer", DEFAULT_IMPUTER_CANDIDATES)
scaler = load_joblib("scaler", DEFAULT_SCALER_CANDIDATES)
stats = load_means_std(os.getenv("STATS_PATH"))
# Feature order:
# Prefer scaler.feature_names_in_ if present (sklearn >=1.0),
# else imputer.feature_names_in_,
# else the order in means_std.json,
# else fail loudly.
# (hasattr on None is simply False, so a missing artifact falls through.)
if hasattr(scaler, "feature_names_in_"):
    FEATURES: List[str] = list(scaler.feature_names_in_)
    print("FEATURES from scaler.feature_names_in_")
elif hasattr(imputer, "feature_names_in_"):
    FEATURES = list(imputer.feature_names_in_)
    print("FEATURES from imputer.feature_names_in_")
elif isinstance(stats, dict):
    FEATURES = list(stats.keys())
    print("FEATURES from means_std.json order")
else:
    raise RuntimeError("Cannot determine feature order. Provide scaler/imputer with feature_names_in_ or a means_std.json.")
print("Feature order:", FEATURES)
print("Artifacts present:",
      {"imputer": imputer is not None, "scaler": scaler is not None, "stats": stats is not None})
@app.get("/")
def root():
    """Landing endpoint: a short pointer to the useful routes."""
    info = {
        "message": "Static Fingerprint API is running.",
        "try": ["GET /health", "POST /predict", "POST /echo"],
    }
    return info
@app.get("/health")
def health():
    """Health endpoint: reports feature order, class labels, and which artifacts loaded."""
    artifact_flags = {
        "imputer": imputer is not None,
        "scaler": scaler is not None,
        "means_std": stats is not None,
    }
    return {
        "status": "ok",
        "features": FEATURES,
        "classes": CLASSES,
        "artifacts": artifact_flags,
    }
@app.post("/echo")
async def echo(req: Request):
    """Debug endpoint: parrot the JSON body back to the client."""
    body = await req.json()
    return {"received": body}
def preprocess_payload_to_X(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Turn a feature->value JSON payload into a model-ready input row.

    Returns a dict with:
        X:        np.ndarray of shape (1, n_features), standardized
        z_scores: feature -> z value (when mean/scale info is available)
        missing:  features absent from the payload (filled by imputation)
        used:     feature -> raw value actually used (after imputation)

    Raises:
        ValueError: when a provided value cannot be coerced to float.
        RuntimeError: when neither a scaler nor means_std.json is available.
    """
    missing: List[str] = []
    z_scores: Dict[str, float] = {}
    # Build the raw vector in canonical FEATURES order; absent features
    # become NaN so the imputer (or the fallback below) can fill them.
    raw_vec: List[float] = []
    for f in FEATURES:
        if f in payload:
            raw_vec.append(coerce_float(payload[f]))
        else:
            missing.append(f)
            raw_vec.append(np.nan)
    raw = np.array([raw_vec], dtype=np.float32)
    # Impute missing values: trained imputer when present, else per-feature
    # mean from means_std.json, else 0.0.
    if imputer is not None:
        raw_imp = imputer.transform(raw)
    else:
        raw_imp = raw.copy()
        for j, f in enumerate(FEATURES):
            if np.isnan(raw_imp[0, j]):
                raw_imp[0, j] = stats[f].get("mean", 0.0) if stats and f in stats else 0.0
    # Standardize: trained scaler when present, else manual z-scores.
    if scaler is not None:
        X = scaler.transform(raw_imp).astype(np.float32)
        # Report z-scores when the scaler exposes its fitted parameters.
        if hasattr(scaler, "mean_") and hasattr(scaler, "scale_"):
            for j, f in enumerate(FEATURES):
                mu = float(scaler.mean_[j])
                sd = float(scaler.scale_[j])
                z_scores[f] = 0.0 if sd == 0 else (float(raw_imp[0, j]) - mu) / sd
    else:
        if not stats:
            raise RuntimeError("No scaler and no means_std.json — cannot standardize.")
        z_list: List[float] = []
        for j, f in enumerate(FEATURES):
            z = z_manual(raw_imp[0, j], float(stats[f]["mean"]), float(stats[f]["std"]))
            z_list.append(z)
            z_scores[f] = float(z)
        X = np.array([z_list], dtype=np.float32)
    # Raw values actually fed to the model (post-imputation), for transparency.
    # (The previous version also accumulated an unused `used_vals` list — removed.)
    used_raw = {f: float(raw_imp[0, j]) for j, f in enumerate(FEATURES)}
    return {
        "X": X,
        "z_scores": z_scores,
        "missing": missing,
        "used": used_raw,
    }
@app.post("/predict")
async def predict(req: Request):
    """Score one sample and return class probabilities plus diagnostics.

    Expects a JSON object mapping feature name -> value (numbers or
    locale-formatted number strings). Returns probabilities over CLASSES,
    the argmax label, the post-imputation values used, z-scores, and
    debug info about how the model output was decoded.
    """
    payload = await req.json()
    if not isinstance(payload, dict):
        return {"error": "Expected a JSON object mapping feature -> value."}
    # A non-coercible value (e.g. "abc") previously escaped as an unhandled
    # 500; surface it as a structured error, mirroring the check above.
    try:
        prep = preprocess_payload_to_X(payload)
    except ValueError as exc:
        return {"error": f"Invalid feature value: {exc}"}
    X: np.ndarray = prep["X"]
    raw = model.predict(X, verbose=0)
    debug: Dict[str, Any] = {"raw_shape": list(raw.shape)}
    # Head auto-detection: K-1 outputs -> CORAL cumulative logits,
    # anything else is treated as a softmax over K classes.
    if raw.ndim == 2 and raw.shape[1] == (len(CLASSES) - 1):
        decode_mode = "auto_coral_monotone" if APPLY_CORAL_MONOTONE else "auto_coral"
        probs = coral_probs_from_logits(raw, monotone=APPLY_CORAL_MONOTONE)[0]
    else:
        decode_mode = "auto_softmax"
        probs = raw[0]
        s = float(np.sum(probs))
        if s > 0:
            # Renormalize in case the head's outputs aren't exact probabilities.
            probs = probs / s
    debug["decode_mode"] = decode_mode
    debug["raw_first_row"] = [float(x) for x in np.array(raw[0]).ravel().tolist()]
    pred_idx = int(np.argmax(probs))
    return {
        "input_ok": (len(prep["missing"]) == 0),
        "missing": prep["missing"],
        "used_raw": prep["used"],      # values after imputation
        "z_scores": prep["z_scores"],  # standardized (from scaler or stats)
        "probabilities": {CLASSES[i]: float(probs[i]) for i in range(len(CLASSES))},
        "predicted_state": CLASSES[pred_idx],
        "debug": debug,
    }