Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import json | |
| import warnings | |
| from functools import lru_cache | |
| from pathlib import Path | |
| from typing import Any | |
| import joblib | |
| import numpy as np | |
# Artifact locations are resolved relative to this file so the module works
# regardless of the process's current working directory.
APP_DIR = Path(__file__).resolve().parent
MODEL_PATH = APP_DIR / "models" / "ensemble_models.joblib"
COST_MATRIX_PATH = APP_DIR / "cost_matrix.npy"
METADATA_PATH = APP_DIR / "model_metadata.json"

# The metadata is read once at import time. The encoding is given explicitly
# because the default for text files depends on the platform locale.
with METADATA_PATH.open(encoding="utf-8") as f:
    _METADATA = json.load(f)

# Values used throughout the module, taken from the metadata file.
N_FEATURES = int(_METADATA["n_features"])
N_CLASSES = int(_METADATA["n_classes"])
# Indexed by integer class id elsewhere in this module.
PARAMETER_SETS = _METADATA["parameter_sets"]
@lru_cache(maxsize=1)
def _load_models() -> list[Any]:
    """Load the model ensemble from ``MODEL_PATH``, reading the file only once.

    The result is memoized, so repeated calls return the same object instead
    of deserializing the artifact again. Call ``_load_models.cache_clear()``
    to force a reload after the file on disk has changed.

    Returns:
        Whatever object ``joblib.load`` produces for ``MODEL_PATH``; callers in
        this module iterate over it and take its ``len()``. The object is
        shared between callers and should be treated as read-only.
    """
    # NOTE: joblib.load unpickles the file, which can execute arbitrary code.
    # Only point MODEL_PATH at artifacts from a trusted source.
    return joblib.load(MODEL_PATH)
@lru_cache(maxsize=1)
def _load_cost_matrix() -> np.ndarray:
    """Load and validate the cost matrix stored at ``COST_MATRIX_PATH``.

    The validated matrix is memoized so the file is read once rather than on
    every prediction. Call ``_load_cost_matrix.cache_clear()`` to force a
    reload.

    Returns:
        A float64 array of shape ``(N_CLASSES, N_CLASSES)``. The same array
        object is returned on every call, so callers must not modify it in
        place.

    Raises:
        ValueError: If the stored array does not have shape
            ``(N_CLASSES, N_CLASSES)``.
    """
    costs = np.load(COST_MATRIX_PATH)
    if costs.shape != (N_CLASSES, N_CLASSES):
        raise ValueError(f"Expected cost matrix {(N_CLASSES, N_CLASSES)}, got {costs.shape}")
    # copy=False avoids duplicating the data when it is already float64.
    return costs.astype(np.float64, copy=False)
def get_metadata() -> dict[str, Any]:
    """Return a summary of the loaded model metadata.

    Returns:
        A new dict with the keys ``model_type``, ``model_source``,
        ``n_models``, ``n_features``, ``n_classes``, ``feature_columns``,
        ``parameter_sets`` and ``cost_matrix_shape``. ``n_features`` and
        ``n_classes`` are the integer module constants; the remaining values
        are taken from the metadata file as loaded.
    """
    summary: dict[str, Any] = {}
    # Entries copied verbatim from the metadata file.
    for key in ("model_type", "model_source", "n_models"):
        summary[key] = _METADATA[key]
    # Entries already normalised into module-level constants.
    summary["n_features"] = N_FEATURES
    summary["n_classes"] = N_CLASSES
    summary["feature_columns"] = _METADATA["feature_columns"]
    summary["parameter_sets"] = PARAMETER_SETS
    summary["cost_matrix_shape"] = _METADATA["cost_matrix_shape"]
    return summary
def _predict_probabilities(features: list[float]) -> np.ndarray:
    """Average the class probabilities predicted by every model in the ensemble.

    Args:
        features: Feature values for a single sample; reshaped to one row
            before being passed to each model's ``predict_proba``.

    Returns:
        A float64 vector of length ``N_CLASSES``. It is the mean of the
        per-model probability vectors, rescaled to sum to 1 whenever the mean
        has a positive sum.

    Raises:
        ValueError: If the loaded ensemble contains no models.
    """
    x = np.asarray(features, dtype=np.float64).reshape(1, -1)

    # Load the ensemble once and reuse it for both the loop and the average.
    models = _load_models()
    n_models = len(models)
    if n_models == 0:
        # Dividing by zero models would silently produce a vector of NaNs.
        raise ValueError("Model ensemble is empty; cannot compute probabilities")

    probs = np.zeros(N_CLASSES, dtype=np.float64)
    for model in models:
        with warnings.catch_warnings():
            # x is a bare array, so silence the feature-name warning that
            # models emit for it (presumably estimators fitted on named
            # columns; confirm against the training code).
            warnings.filterwarnings("ignore", message="X does not have valid feature names")
            model_probs = np.asarray(model.predict_proba(x)[0], dtype=np.float64)

        if model_probs.shape[0] == N_CLASSES:
            probs += model_probs
            continue

        # The model reports fewer (or more) columns than N_CLASSES: scatter
        # each column into the slot named by the model's ``classes_`` entry,
        # leaving classes the model does not report at probability 0.
        full_probs = np.zeros(N_CLASSES, dtype=np.float64)
        classes = getattr(model, "classes_", [])
        for src_idx, class_id in enumerate(classes):
            full_probs[int(class_id)] = model_probs[src_idx]
        probs += full_probs

    probs /= n_models
    # Scattered vectors may not sum to 1, so renormalise when possible.
    total = probs.sum()
    if total > 0:
        probs /= total
    return probs
def _ranked_entries(indices: np.ndarray, probs: np.ndarray, risks: np.ndarray) -> list[dict[str, Any]]:
    """Build one result record per class index, preserving the given order.

    Args:
        indices: Class indices to report, already in the desired order.
        probs: Per-class probabilities, indexed by class index.
        risks: Per-class expected risks, indexed by class index.

    Returns:
        A list of dicts with the keys ``class``, ``probability``,
        ``expected_risk`` and ``params``, where ``params`` is the
        ``PARAMETER_SETS`` entry for that class.
    """
    entries: list[dict[str, Any]] = []
    for raw_index in indices:
        # Convert NumPy scalars to plain Python numbers for the output dict.
        class_id = int(raw_index)
        probability = float(probs[raw_index])
        expected_risk = float(risks[raw_index])
        entries.append(
            {
                "class": class_id,
                "probability": probability,
                "expected_risk": expected_risk,
                "params": PARAMETER_SETS[class_id],
            }
        )
    return entries
def predict_from_features(features: list[float], top_k: int = 3) -> dict[str, Any]:
    """Select a parameter set for one sample by minimum expected risk.

    Ensemble probabilities are computed for the sample and multiplied by the
    cost matrix, so ``risks[i]`` is the dot product of cost-matrix row ``i``
    with the probability vector. The class with the smallest risk is selected.

    Args:
        features: Feature values for a single sample; each is converted with
            ``float()``.
        top_k: Maximum number of entries in each ranked list. Negative values
            are treated as 0, giving empty lists.

    Returns:
        A dict with the keys ``selected_class``, ``selected_params``,
        ``selection_method``, ``probability_argmax_class``,
        ``probability_argmax_params``, ``top_by_probability`` (highest
        probability first) and ``top_by_expected_risk`` (lowest risk first).
    """
    probs = _predict_probabilities([float(v) for v in features])
    risks = _load_cost_matrix() @ probs

    selected_idx = int(np.argmin(risks))
    probability_argmax = int(np.argmax(probs))

    # A negative slice bound would mean "all but the last k"; clamp it to 0.
    limit = top_k if top_k is None else max(top_k, 0)

    # Stable sorts break ties by lowest class index, the same rule argmax and
    # argmin use, so the first ranked entry agrees with the reported
    # argmax/argmin class when values tie.
    probability_idx = np.argsort(-probs, kind="stable")[:limit]
    risk_idx = np.argsort(risks, kind="stable")[:limit]

    return {
        "selected_class": selected_idx,
        "selected_params": PARAMETER_SETS[selected_idx],
        "selection_method": "minimum_expected_risk",
        "probability_argmax_class": probability_argmax,
        "probability_argmax_params": PARAMETER_SETS[probability_argmax],
        "top_by_probability": _ranked_entries(probability_idx, probs, risks),
        "top_by_expected_risk": _ranked_entries(risk_idx, probs, risks),
    }