# Hugging Face Spaces page residue (Space status: "Sleeping") — not part of the app code.
| # app.py | |
| import os, json, glob | |
| from typing import Any, Dict, List, Optional | |
| import numpy as np | |
| import tensorflow as tf | |
| from fastapi import FastAPI, Request | |
| from fastapi.middleware.cors import CORSMiddleware | |
# ----------------- CONFIG -----------------
# Candidate filenames searched (in order) for each serialized artifact.
DEFAULT_MODEL_CANDIDATES = ["best_model.h5", "best_model.keras"]
DEFAULT_IMPUTER_CANDIDATES = ["imputer.joblib", "imputer.pkl", "imputer.sav"]
DEFAULT_SCALER_CANDIDATES = ["scaler.joblib", "scaler.pkl", "scaler.sav"]
# JSON file with per-feature {"mean": ..., "std": ...}; fallback when no fitted scaler exists.
DEFAULT_STATS_PATH = "means_std.json"
CLASSES = ["Top", "Mid-Top", "Mid", "Mid-Low", "Low"]  # index 0=Top ... 4=Low
APPLY_CORAL_MONOTONE = True  # nudge thresholds to be non-increasing before decode
# ------------------------------------------
# Directory containing this file; primary search root for model artifacts.
HERE = os.path.dirname(os.path.abspath(__file__))
| # ---------- utilities: robust file resolving & logging ---------- | |
def resolve_first(*names: str) -> Optional[str]:
    """Locate the first existing file among *names*.

    Probes the repo directory (HERE) and the current working directory
    directly, then falls back to a recursive glob under both roots.
    Returns an absolute path, or None when nothing matches.
    """
    search_roots = (HERE, os.getcwd())
    # Direct hits first: cheap and unambiguous.
    for root in search_roots:
        for name in names:
            candidate = os.path.join(root, name)
            if os.path.isfile(candidate):
                return candidate
    # Recursive fallback (handles artifacts nested in subfolders).
    for name in names:
        for root in search_roots:
            pattern = os.path.join(root, "**", name)
            for match in glob.glob(pattern, recursive=True):
                if os.path.isfile(match):
                    return match
    return None
def describe_dir():
    """Print the CWD, the repo directory, and its listing (debug aid)."""
    try:
        cwd = os.getcwd()
        print("CWD:", cwd)
        print("Repo dir (HERE):", HERE)
        print("Repo listing:", os.listdir(HERE))
    except Exception as err:
        print("listdir error:", err)
def load_joblib(label: str, candidates: List[str]):
    """Deserialize a joblib artifact, trying each candidate filename.

    Logs progress and returns None (rather than raising) when the file
    is absent or fails to unpickle, so the app can degrade gracefully.
    """
    import joblib

    print(f"Looking for {label} among: {candidates}")
    describe_dir()
    path = resolve_first(*candidates)
    if not path:
        print(f"⚠️ {label} not found.")
        return None
    try:
        size = os.path.getsize(path)
    except Exception:
        # Size lookup is best-effort; still report the path.
        print(f"Loading {label} from {path}")
    else:
        print(f"Loading {label} from {path} ({size} bytes)")
    try:
        return joblib.load(path)
    except Exception as err:
        print(f"⚠️ Failed to load {label}: {repr(err)}")
        return None
def load_model_robust() -> tf.keras.Model:
    """Resolve the Keras model file (MODEL_PATH env var wins) and load it.

    Raises FileNotFoundError when no candidate path resolves.
    """
    print("Resolving model...")
    env_model = os.getenv("MODEL_PATH")
    # An explicit MODEL_PATH overrides the default candidate list.
    candidates = [env_model] if env_model else DEFAULT_MODEL_CANDIDATES
    path = resolve_first(*candidates)
    if not path:
        raise FileNotFoundError(f"Model file not found. Tried: {candidates}")
    print(f"Loading model from {path} ({os.path.getsize(path)} bytes)")
    # compile=False: inference only, avoids needing custom losses/metrics.
    return tf.keras.models.load_model(path, compile=False)
def load_means_std(stats_path: Optional[str]) -> Optional[Dict[str, Dict[str, float]]]:
    """Load the per-feature mean/std JSON; None when it cannot be found.

    Priority: explicit argument > STATS_PATH env var > default filename.
    """
    requested = stats_path or os.getenv("STATS_PATH") or DEFAULT_STATS_PATH
    path = resolve_first(requested) if requested else None
    if not path:
        print("⚠️ means_std.json not found.")
        return None
    print(f"Loading means/std from {path} ({os.path.getsize(path)} bytes)")
    with open(path, "r") as fh:
        return json.load(fh)
| # ---------- numeric coercion ---------- | |
def coerce_float(val: Any) -> float:
    """Parse *val* as a float, accepting locale-formatted strings.

    Handles both European ('49.709,14' -> 49709.14) and Anglo
    ('49,709.14' -> 49709.14) thousands/decimal conventions: when both
    separators appear, whichever comes last is the decimal mark.
    Raises ValueError on empty or unparseable input.
    """
    if isinstance(val, (int, float)):
        return float(val)
    text = str(val).strip()
    if text == "":
        raise ValueError("empty")
    text = text.replace(" ", "")
    dot_pos = text.rfind(".")
    comma_pos = text.rfind(",")
    if dot_pos >= 0 and comma_pos >= 0:
        if comma_pos > dot_pos:
            # European style: '.' groups thousands, ',' is the decimal mark.
            text = text.replace(".", "").replace(",", ".")
        else:
            # Anglo style: ',' groups thousands.
            text = text.replace(",", "")
    elif comma_pos >= 0:
        # Comma only: treat it as the decimal mark.
        text = text.replace(",", ".")
    return float(text)
def z_manual(val: Any, mean: float, sd: float) -> float:
    """Standardize *val* against (mean, sd).

    Returns 0.0 when the value cannot be parsed or when sd is zero/falsy,
    so a single bad feature never aborts preprocessing.
    """
    try:
        parsed = coerce_float(val)
    except Exception:
        return 0.0
    return (parsed - mean) / sd if sd else 0.0
| # ---------- CORAL decoding ---------- | |
| def coral_probs_from_logits(logits_np: np.ndarray, monotone: bool = False) -> np.ndarray: | |
| """ | |
| logits: (N, K-1) cumulative logits. | |
| If monotone=True, enforce non-increasing thresholds per sample before decode. | |
| """ | |
| logits = np.asarray(logits_np, dtype=np.float32) | |
| if monotone: | |
| # clamp each row to be non-increasing: t1 >= t2 >= t3 >= ... | |
| # for Top=0 best to Low=4 worst, cumulative boundary logits | |
| for i in range(logits.shape[0]): | |
| row = logits[i] | |
| # make it non-increasing by cumulative minimum from left to right | |
| for j in range(1, row.shape[0]): | |
| if row[j] > row[j - 1]: | |
| row[j] = row[j - 1] | |
| logits[i] = row | |
| sig = 1.0 / (1.0 + np.exp(-logits)) # sigmoid | |
| left = np.concatenate([np.ones((sig.shape[0], 1), dtype=np.float32), sig], axis=1) | |
| right = np.concatenate([sig, np.zeros((sig.shape[0], 1), dtype=np.float32)], axis=1) | |
| probs = np.clip(left - right, 1e-12, 1.0) | |
| return probs | |
| # ---------- FastAPI app ---------- | |
# ---------- FastAPI app ----------
app = FastAPI(title="Static Fingerprint API", version="1.1.0")
# Open CORS: any origin may call the API; credentials are disabled, which is
# required when allow_origins is the wildcard "*".
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ---------- module-level artifact loading (runs once at import) ----------
print("Loading model / imputer / scaler...")
model = load_model_robust()  # raises if no model file is found
imputer = load_joblib("imputer", DEFAULT_IMPUTER_CANDIDATES)  # may be None
scaler = load_joblib("scaler", DEFAULT_SCALER_CANDIDATES)  # may be None
stats = load_means_std(os.getenv("STATS_PATH"))  # may be None
# Feature order:
# Prefer scaler.feature_names_in_ if present (sklearn >=1.0),
# else imputer.feature_names_in_,
# else the order in means_std.json,
# else fail loudly.
# NOTE: hasattr(None, ...) is False, so a missing artifact simply falls through.
if hasattr(scaler, "feature_names_in_"):
    FEATURES: List[str] = list(scaler.feature_names_in_)
    print("FEATURES from scaler.feature_names_in_")
elif hasattr(imputer, "feature_names_in_"):
    FEATURES = list(imputer.feature_names_in_)
    print("FEATURES from imputer.feature_names_in_")
elif isinstance(stats, dict):
    FEATURES = list(stats.keys())
    print("FEATURES from means_std.json order")
else:
    raise RuntimeError("Cannot determine feature order. Provide scaler/imputer with feature_names_in_ or a means_std.json.")
print("Feature order:", FEATURES)
print("Artifacts present:",
      {"imputer": imputer is not None, "scaler": scaler is not None, "stats": stats is not None})
def root():
    """Landing payload listing the available endpoints.

    NOTE(review): no @app.get("/") decorator is visible in this file —
    presumably lost in extraction; confirm the route is registered.
    """
    endpoints = ["GET /health", "POST /predict", "POST /echo"]
    return {
        "message": "Static Fingerprint API is running.",
        "try": endpoints,
    }
def health():
    """Health-check payload: feature order, class labels, artifact flags.

    NOTE(review): no @app.get("/health") decorator is visible in this
    file — presumably lost in extraction; confirm route registration.
    """
    artifacts = {
        "imputer": imputer is not None,
        "scaler": scaler is not None,
        "means_std": stats is not None,
    }
    return {
        "status": "ok",
        "features": FEATURES,
        "classes": CLASSES,
        "artifacts": artifacts,
    }
async def echo(req: Request):
    """Return the posted JSON body unchanged (connectivity/debug aid)."""
    return {"received": await req.json()}
def preprocess_payload_to_X(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Turn a raw feature->value payload into a model-ready input row.

    Returns dict with:
      - X: np.ndarray shape (1, n_features) ready for model
      - z_scores: dict feature -> z value (if available)
      - missing: list of features not provided
      - used: dict feature -> raw value used (after imputation)

    Raises:
        ValueError: (from coerce_float) when a provided value is not numeric.
        RuntimeError: when neither a scaler nor means_std stats exist.

    Note: the original also accumulated a ``used_vals`` list that was never
    returned or read — dead code, removed here.
    """
    missing: List[str] = []
    z_scores: Dict[str, float] = {}
    used_raw: Dict[str, float] = {}
    # Build the raw feature vector in canonical FEATURES order; absent
    # features become NaN so the imputer (or the fallback below) fills them.
    raw_vec: List[float] = []
    for f in FEATURES:
        if f in payload:
            v = coerce_float(payload[f])
        else:
            missing.append(f)
            v = np.nan
        raw_vec.append(v)
    raw = np.array([raw_vec], dtype=np.float32)
    # Impute if available
    if imputer is not None:
        raw_imp = imputer.transform(raw)
    else:
        # No imputer: fall back to the feature mean from means_std.json,
        # or zero when no stats exist for the feature.
        raw_imp = raw.copy()
        for j, f in enumerate(FEATURES):
            if np.isnan(raw_imp[0, j]):
                if stats and f in stats:
                    raw_imp[0, j] = stats[f].get("mean", 0.0)
                else:
                    raw_imp[0, j] = 0.0
    # Standardize: prefer the fitted scaler, else z-scores from means_std.json.
    if scaler is not None:
        X = scaler.transform(raw_imp).astype(np.float32)
        # Scaler-based z-scores when it exposes mean_/scale_ (StandardScaler-style).
        if hasattr(scaler, "mean_") and hasattr(scaler, "scale_"):
            for j, f in enumerate(FEATURES):
                mu = float(scaler.mean_[j])
                sd = float(scaler.scale_[j])
                z = 0.0 if sd == 0 else (float(raw_imp[0, j]) - mu) / sd
                z_scores[f] = float(z)
    else:
        if not stats:
            raise RuntimeError("No scaler and no means_std.json — cannot standardize.")
        z_list: List[float] = []
        for j, f in enumerate(FEATURES):
            mu = float(stats[f]["mean"])
            sd = float(stats[f]["std"])
            z = z_manual(raw_imp[0, j], mu, sd)
            z_list.append(z)
            z_scores[f] = float(z)
        X = np.array([z_list], dtype=np.float32)
    # Capture the post-imputation raw values actually fed to the model.
    for j, f in enumerate(FEATURES):
        used_raw[f] = float(raw_imp[0, j])
    return {
        "X": X,
        "z_scores": z_scores,
        "missing": missing,
        "used": used_raw,
    }
async def predict(req: Request):
    """Score one sample posted as a JSON object mapping feature -> value.

    NOTE(review): no @app.post("/predict") decorator is visible in this
    file — presumably lost in extraction; confirm route registration.
    NOTE(review): indentation was reconstructed from a mangled source; the
    renormalization below is placed in the softmax branch, which matches the
    CORAL-vs-softmax split — confirm against the original file.
    """
    payload = await req.json()
    if not isinstance(payload, dict):
        # Soft validation error rather than a 500.
        return {"error": "Expected a JSON object mapping feature -> value."}
    prep = preprocess_payload_to_X(payload)
    X: np.ndarray = prep["X"]
    raw = model.predict(X, verbose=0)
    # CORAL (K-1) vs softmax (K)
    debug: Dict[str, Any] = {"raw_shape": list(raw.shape)}
    if raw.ndim == 2 and raw.shape[1] == (len(CLASSES) - 1):
        # (N, K-1) head: decode CORAL cumulative logits.
        decode_mode = "auto_coral_monotone" if APPLY_CORAL_MONOTONE else "auto_coral"
        probs = coral_probs_from_logits(raw, monotone=APPLY_CORAL_MONOTONE)[0]
    else:
        # Assume a K-wide probability head; renormalize defensively.
        decode_mode = "auto_softmax"
        probs = raw[0]
        s = float(np.sum(probs))
        if s > 0:
            probs = probs / s
    debug["decode_mode"] = decode_mode
    debug["raw_first_row"] = [float(x) for x in np.array(raw[0]).ravel().tolist()]
    pred_idx = int(np.argmax(probs))
    return {
        "input_ok": (len(prep["missing"]) == 0),
        "missing": prep["missing"],
        "used_raw": prep["used"],  # values after imputation
        "z_scores": prep["z_scores"],  # standardized (from scaler or stats)
        "probabilities": {CLASSES[i]: float(probs[i]) for i in range(len(CLASSES))},
        "predicted_state": CLASSES[pred_idx],
        "debug": debug,
    }