# app.py
"""Static Fingerprint API.

FastAPI service that loads a Keras model (CORAL ordinal head or softmax
head) plus optional sklearn imputer/scaler artifacts, standardizes an
incoming feature payload, and returns probabilities over the five ordered
states in CLASSES.
"""
import glob
import json
import os
from typing import Any, Dict, List, Optional

import numpy as np
import tensorflow as tf
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware

# ----------------- CONFIG -----------------
DEFAULT_MODEL_CANDIDATES = ["best_model.h5", "best_model.keras"]
DEFAULT_IMPUTER_CANDIDATES = ["imputer.joblib", "imputer.pkl", "imputer.sav"]
DEFAULT_SCALER_CANDIDATES = ["scaler.joblib", "scaler.pkl", "scaler.sav"]
DEFAULT_STATS_PATH = "means_std.json"
CLASSES = ["Top", "Mid-Top", "Mid", "Mid-Low", "Low"]  # index 0=Top ... 4=Low
APPLY_CORAL_MONOTONE = True  # nudge thresholds to be non-increasing before decode
# ------------------------------------------

HERE = os.path.dirname(os.path.abspath(__file__))


# ---------- utilities: robust file resolving & logging ----------
def resolve_first(*names: str) -> Optional[str]:
    """Return absolute path to the first existing file among provided names
    by checking HERE, CWD, then recursive matches."""
    for base in (HERE, os.getcwd()):
        for n in names:
            p = os.path.join(base, n)
            if os.path.isfile(p):
                return p
    # recursive fallback (handles subfolders)
    patterns: List[str] = []
    for n in names:
        patterns += [os.path.join(HERE, "**", n), os.path.join(os.getcwd(), "**", n)]
    for pat in patterns:
        for p in glob.glob(pat, recursive=True):
            if os.path.isfile(p):
                return p
    return None


def describe_dir() -> None:
    """Log CWD and repo-dir contents for deploy-time debugging (best effort)."""
    try:
        print("CWD:", os.getcwd())
        print("Repo dir (HERE):", HERE)
        print("Repo listing:", os.listdir(HERE))
    except Exception as e:
        # diagnostics only — never let logging break startup
        print("listdir error:", e)


def load_joblib(label: str, candidates: List[str]):
    """Locate and deserialize a joblib artifact; return None on any failure.

    `label` is used only in log messages; `candidates` are tried in order via
    resolve_first(). Failures are logged, never raised, so the app can still
    start with degraded functionality (manual z-scoring / mean fill).
    """
    import joblib

    print(f"Looking for {label} among: {candidates}")
    describe_dir()
    path = resolve_first(*candidates)
    if not path:
        print(f"⚠️ {label} not found.")
        return None
    try:
        print(f"Loading {label} from {path} ({os.path.getsize(path)} bytes)")
    except Exception:
        # getsize can race with the file being moved; still report the path
        print(f"Loading {label} from {path}")
    try:
        return joblib.load(path)
    except Exception as e:
        print(f"⚠️ Failed to load {label}: {repr(e)}")
        return None


def load_model_robust() -> tf.keras.Model:
    """Load the Keras model, honoring a MODEL_PATH env override.

    Raises:
        FileNotFoundError: when no candidate model file can be resolved.
    """
    print("Resolving model...")
    # env override supported
    env_model = os.getenv("MODEL_PATH")
    candidates = [env_model] if env_model else DEFAULT_MODEL_CANDIDATES
    path = resolve_first(*candidates)
    if not path:
        raise FileNotFoundError(f"Model file not found. Tried: {candidates}")
    print(f"Loading model from {path} ({os.path.getsize(path)} bytes)")
    # We don't need custom objects for inference; compile=False is safer
    return tf.keras.models.load_model(path, compile=False)


def load_means_std(stats_path: Optional[str]) -> Optional[Dict[str, Dict[str, float]]]:
    """Load per-feature {"mean": ..., "std": ...} stats JSON, or None if absent."""
    path = stats_path or os.getenv("STATS_PATH") or DEFAULT_STATS_PATH
    path = resolve_first(path) if path else None
    if not path:
        print("⚠️ means_std.json not found.")
        return None
    print(f"Loading means/std from {path} ({os.path.getsize(path)} bytes)")
    with open(path, "r") as f:
        return json.load(f)


# ---------- numeric coercion ----------
def coerce_float(val: Any) -> float:
    """Accepts numeric, or locale strings like '49.709,14' -> 49709.14

    When both '.' and ',' appear, whichever occurs last is treated as the
    decimal separator and the other as a thousands separator.

    Raises:
        ValueError: on empty or unparseable input.
    """
    if isinstance(val, (int, float)):
        return float(val)
    s = str(val).strip()
    if s == "":
        raise ValueError("empty")
    s = s.replace(" ", "")
    has_dot = "." in s
    has_comma = "," in s
    if has_dot and has_comma:
        last_dot = s.rfind(".")
        last_comma = s.rfind(",")
        if last_comma > last_dot:
            # comma is decimal: drop dots, then turn comma into dot
            s = s.replace(".", "")
            s = s.replace(",", ".")
        else:
            # dot is decimal: commas are thousands separators
            s = s.replace(",", "")
    elif has_comma and not has_dot:
        s = s.replace(",", ".")
    return float(s)


def z_manual(val: Any, mean: float, sd: float) -> float:
    """Standardize val with (mean, sd); returns 0.0 on bad input or zero sd."""
    try:
        v = coerce_float(val)
    except Exception:
        return 0.0
    if not sd:
        return 0.0
    return (v - mean) / sd


# ---------- CORAL decoding ----------
def coral_probs_from_logits(logits_np: np.ndarray, monotone: bool = False) -> np.ndarray:
    """
    Convert CORAL cumulative logits (N, K-1) into class probabilities (N, K).

    sigmoid(logit[k]) is read as the cumulative P(y > k); per-class
    probabilities are adjacent differences of that curve. If monotone=True,
    enforce non-increasing thresholds per sample before decode.
    """
    logits = np.asarray(logits_np, dtype=np.float32)
    if monotone:
        # clamp each row to be non-increasing: t1 >= t2 >= t3 >= ...
        # Cumulative minimum left->right is exactly the per-element clamp;
        # rebinding (rather than in-place edits) also avoids mutating the
        # caller's array when np.asarray returned a view.
        logits = np.minimum.accumulate(logits, axis=1)
    sig = 1.0 / (1.0 + np.exp(-logits))  # sigmoid
    left = np.concatenate([np.ones((sig.shape[0], 1), dtype=np.float32), sig], axis=1)
    right = np.concatenate([sig, np.zeros((sig.shape[0], 1), dtype=np.float32)], axis=1)
    probs = np.clip(left - right, 1e-12, 1.0)
    return probs


# ---------- FastAPI app ----------
app = FastAPI(title="Static Fingerprint API", version="1.1.0")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

print("Loading model / imputer / scaler...")
model = load_model_robust()
imputer = load_joblib("imputer", DEFAULT_IMPUTER_CANDIDATES)
scaler = load_joblib("scaler", DEFAULT_SCALER_CANDIDATES)
stats = load_means_std(os.getenv("STATS_PATH"))

# Feature order:
# Prefer scaler.feature_names_in_ if present (sklearn >=1.0),
# else imputer.feature_names_in_,
# else the order in means_std.json,
# else fail loudly.
if hasattr(scaler, "feature_names_in_"):
    FEATURES: List[str] = list(scaler.feature_names_in_)
    print("FEATURES from scaler.feature_names_in_")
elif hasattr(imputer, "feature_names_in_"):
    FEATURES = list(imputer.feature_names_in_)
    print("FEATURES from imputer.feature_names_in_")
elif isinstance(stats, dict):
    FEATURES = list(stats.keys())
    print("FEATURES from means_std.json order")
else:
    raise RuntimeError(
        "Cannot determine feature order. "
        "Provide scaler/imputer with feature_names_in_ or a means_std.json."
    )

print("Feature order:", FEATURES)
print("Artifacts present:", {"imputer": imputer is not None, "scaler": scaler is not None, "stats": stats is not None})


@app.get("/")
def root():
    """Landing route with pointers to the useful endpoints."""
    return {
        "message": "Static Fingerprint API is running.",
        "try": ["GET /health", "POST /predict", "POST /echo"],
    }


@app.get("/health")
def health():
    """Report liveness, the feature order in use, and which artifacts loaded."""
    return {
        "status": "ok",
        "features": FEATURES,
        "classes": CLASSES,
        "artifacts": {
            "imputer": bool(imputer is not None),
            "scaler": bool(scaler is not None),
            "means_std": bool(stats is not None),
        },
    }


@app.post("/echo")
async def echo(req: Request):
    """Echo the JSON payload back — handy for debugging client requests."""
    payload = await req.json()
    return {"received": payload}


def preprocess_payload_to_X(payload: Dict[str, Any]) -> Dict[str, Any]:
    """
    Returns dict with:
      - X: np.ndarray shape (1, n_features) ready for model
      - z_scores: dict feature -> z value (if available)
      - missing: list of features not provided
      - used: dict feature -> raw value used (after imputation)

    Raises RuntimeError when neither a scaler nor means_std.json is
    available (no way to standardize). coerce_float errors propagate
    to the caller for provided-but-unparseable values.
    """
    missing: List[str] = []
    z_scores: Dict[str, float] = {}
    used_raw: Dict[str, float] = {}

    # Build raw feature vector in correct order
    raw_vec: List[float] = []
    for f in FEATURES:
        if f in payload:
            v = coerce_float(payload[f])
        else:
            missing.append(f)
            v = np.nan  # let imputer handle it (median), or we'll fill below
        raw_vec.append(v)
    raw = np.array([raw_vec], dtype=np.float32)

    # Impute if available
    if imputer is not None:
        raw_imp = imputer.transform(raw)
    else:
        # If no imputer, simple fill using means_std mean or zero
        raw_imp = raw.copy()
        for j, f in enumerate(FEATURES):
            if np.isnan(raw_imp[0, j]):
                if stats and f in stats:
                    raw_imp[0, j] = stats[f].get("mean", 0.0)
                else:
                    raw_imp[0, j] = 0.0

    # Scale if available
    if scaler is not None:
        X = scaler.transform(raw_imp).astype(np.float32)
        # we can still compute z-scores from scaler if it exposes scale_ and mean_
        if hasattr(scaler, "mean_") and hasattr(scaler, "scale_"):
            for j, f in enumerate(FEATURES):
                mu = float(scaler.mean_[j])
                sd = float(scaler.scale_[j])
                z = 0.0 if sd == 0 else (float(raw_imp[0, j]) - mu) / sd
                z_scores[f] = float(z)
    else:
        # manual z-score using means_std.json
        if not stats:
            raise RuntimeError("No scaler and no means_std.json — cannot standardize.")
        z_list: List[float] = []
        for j, f in enumerate(FEATURES):
            mu = float(stats[f]["mean"])
            sd = float(stats[f]["std"])
            z = z_manual(raw_imp[0, j], mu, sd)
            z_list.append(z)
            z_scores[f] = float(z)
        X = np.array([z_list], dtype=np.float32)

    # capture used raw values (after imputation)
    for j, f in enumerate(FEATURES):
        used_raw[f] = float(raw_imp[0, j])

    return {
        "X": X,
        "z_scores": z_scores,
        "missing": missing,
        "used": used_raw,
    }


@app.post("/predict")
async def predict(req: Request):
    """Score one sample: JSON object {feature: value} -> probabilities + debug."""
    payload = await req.json()
    if not isinstance(payload, dict):
        return {"error": "Expected a JSON object mapping feature -> value."}

    prep = preprocess_payload_to_X(payload)
    X: np.ndarray = prep["X"]

    raw = model.predict(X, verbose=0)

    # CORAL (K-1) outputs vs softmax (K) outputs — decide by width
    debug: Dict[str, Any] = {"raw_shape": list(raw.shape)}
    if raw.ndim == 2 and raw.shape[1] == (len(CLASSES) - 1):
        decode_mode = "auto_coral_monotone" if APPLY_CORAL_MONOTONE else "auto_coral"
        probs = coral_probs_from_logits(raw, monotone=APPLY_CORAL_MONOTONE)[0]
    else:
        decode_mode = "auto_softmax"
        probs = raw[0]
    # Renormalize so the reported distribution sums to 1 regardless of head
    # type (CORAL clipping / non-softmax outputs can drift); argmax unchanged.
    s = float(np.sum(probs))
    if s > 0:
        probs = probs / s
    debug["decode_mode"] = decode_mode
    debug["raw_first_row"] = [float(x) for x in np.array(raw[0]).ravel().tolist()]

    pred_idx = int(np.argmax(probs))
    return {
        "input_ok": (len(prep["missing"]) == 0),
        "missing": prep["missing"],
        "used_raw": prep["used"],  # values after imputation
        "z_scores": prep["z_scores"],  # standardized (from scaler or stats)
        "probabilities": {CLASSES[i]: float(probs[i]) for i in range(len(CLASSES))},
        "predicted_state": CLASSES[pred_idx],
        "debug": debug,
    }