# Spaces: Sleeping | File size: 3,428 Bytes | 2767c41
from __future__ import annotations
import json
import warnings
from functools import lru_cache
from pathlib import Path
from typing import Any
import joblib
import numpy as np
# Every bundled artifact is located relative to the directory of this file.
APP_DIR = Path(__file__).resolve().parent
MODEL_PATH = APP_DIR / "models" / "ensemble_models.joblib"
COST_MATRIX_PATH = APP_DIR / "cost_matrix.npy"
METADATA_PATH = APP_DIR / "model_metadata.json"

# The metadata is read eagerly at import time, so a missing or malformed file
# fails the import itself.  The encoding is pinned explicitly because the
# default for a text-mode open() depends on the platform locale.
with METADATA_PATH.open(encoding="utf-8") as f:
    _METADATA = json.load(f)

# Values derived from the metadata and used by the functions in this module.
N_FEATURES = int(_METADATA["n_features"])
N_CLASSES = int(_METADATA["n_classes"])
PARAMETER_SETS = _METADATA["parameter_sets"]
@lru_cache(maxsize=1)
def _load_models() -> list[Any]:
    """Deserialize the object stored at ``MODEL_PATH`` and cache it.

    Because of ``lru_cache(maxsize=1)`` on a zero-argument function, the file
    is read on the first call only; later calls return the same object.

    Returns:
        Whatever ``joblib.load`` yields for ``MODEL_PATH``; callers in this
        module iterate over it and call ``predict_proba`` on each element.
    """
    # NOTE(review): joblib.load unpickles the file -- presumably a trusted
    # artifact shipped with the app; confirm it is never user-supplied.
    ensemble = joblib.load(MODEL_PATH)
    return ensemble
@lru_cache(maxsize=1)
def _load_cost_matrix() -> np.ndarray:
    """Load the cost matrix from ``COST_MATRIX_PATH`` once and cache it.

    Returns:
        The stored array as ``float64``.  ``copy=False`` lets NumPy skip the
        copy when the array already has that dtype.

    Raises:
        ValueError: If the stored array's shape is not
            ``(N_CLASSES, N_CLASSES)``.
    """
    matrix = np.load(COST_MATRIX_PATH)
    expected_shape = (N_CLASSES, N_CLASSES)
    if matrix.shape == expected_shape:
        return matrix.astype(np.float64, copy=False)
    raise ValueError(f"Expected cost matrix {expected_shape}, got {matrix.shape}")
def get_metadata() -> dict[str, Any]:
    """Return a summary of the model metadata loaded at import time.

    ``n_features`` and ``n_classes`` come from the module-level integer
    constants; every other value is taken from the loaded metadata mapping
    as-is (the returned containers are not copied).
    """
    summary: dict[str, Any] = {}
    for key in ("model_type", "model_source", "n_models"):
        summary[key] = _METADATA[key]
    summary["n_features"] = N_FEATURES
    summary["n_classes"] = N_CLASSES
    summary["feature_columns"] = _METADATA["feature_columns"]
    summary["parameter_sets"] = PARAMETER_SETS
    summary["cost_matrix_shape"] = _METADATA["cost_matrix_shape"]
    return summary
def _predict_probabilities(features: list[float]) -> np.ndarray:
    """Average the ensemble's class probabilities for one feature vector.

    Args:
        features: Feature values for a single sample; they are converted to a
            ``float64`` array and reshaped to one row.

    Returns:
        A ``float64`` array of length ``N_CLASSES``.  It is the mean of the
        per-model probability vectors, rescaled to sum to 1 whenever the sum
        is positive.

    Raises:
        ValueError: If the loaded ensemble contains no models.  Without this
            check the division by the model count would yield NaNs that flow
            silently into the result.
    """
    x = np.asarray(features, dtype=np.float64).reshape(1, -1)

    # Load once and reuse for both the loop and the averaging divisor.
    models = _load_models()
    n_models = len(models)
    if n_models == 0:
        raise ValueError("Model ensemble is empty; cannot compute probabilities")

    probs = np.zeros(N_CLASSES, dtype=np.float64)
    for model in models:
        with warnings.catch_warnings():
            # Input is a bare ndarray, so silence the feature-name warning
            # for the duration of this call only.
            warnings.filterwarnings("ignore", message="X does not have valid feature names")
            model_probs = np.asarray(model.predict_proba(x)[0], dtype=np.float64)

        if model_probs.shape[0] == N_CLASSES:
            probs += model_probs
            continue

        # The model reports fewer/more columns than N_CLASSES: scatter each
        # column into the slot named by the model's ``classes_`` entry.  A
        # model without ``classes_`` contributes an all-zero vector.
        # NOTE(review): assumes class labels are integer indices in
        # [0, N_CLASSES) -- confirm against the training code.
        full_probs = np.zeros(N_CLASSES, dtype=np.float64)
        classes = getattr(model, "classes_", [])
        for src_idx, class_id in enumerate(classes):
            full_probs[int(class_id)] = model_probs[src_idx]
        probs += full_probs

    probs /= n_models
    # Renormalize so that models which covered only a subset of classes (or
    # contributed zeros) do not leave the vector summing to less than 1.
    total = probs.sum()
    if total > 0:
        probs /= total
    return probs
def _ranked_entries(indices: np.ndarray, probs: np.ndarray, risks: np.ndarray) -> list[dict[str, Any]]:
return [
{
"class": int(i),
"probability": float(probs[i]),
"expected_risk": float(risks[i]),
"params": PARAMETER_SETS[int(i)],
}
for i in indices
]
def predict_from_features(features: list[float], top_k: int = 3) -> dict[str, Any]:
    """Pick a parameter set for one sample by minimum expected risk.

    The ensemble probabilities ``p`` are multiplied by the cost matrix ``C``
    to give ``risks = C @ p``; the class with the smallest risk is selected.
    The plain probability argmax is reported alongside for comparison.

    Args:
        features: Feature values for a single sample; each is passed through
            ``float()`` before prediction.
        top_k: Maximum number of entries in each ranked list.  Must be
            non-negative; values larger than the number of classes simply
            return every class.

    Returns:
        A dict with the keys ``selected_class``, ``selected_params``,
        ``selection_method``, ``probability_argmax_class``,
        ``probability_argmax_params``, ``top_by_probability`` (descending
        probability) and ``top_by_expected_risk`` (ascending risk).

    Raises:
        ValueError: If ``top_k`` is negative.  A negative value would
            otherwise be treated as a slice bound counted from the end and
            return "all but the last k" entries.
    """
    if top_k < 0:
        raise ValueError(f"top_k must be non-negative, got {top_k}")

    probs = _predict_probabilities([float(v) for v in features])
    risks = _load_cost_matrix() @ probs

    selected_idx = int(np.argmin(risks))
    probability_argmax = int(np.argmax(probs))

    # argsort is ascending: reverse it for "most probable first", keep it
    # as-is for "lowest risk first".
    probability_idx = np.argsort(probs)[::-1][:top_k]
    risk_idx = np.argsort(risks)[:top_k]

    return {
        "selected_class": selected_idx,
        "selected_params": PARAMETER_SETS[selected_idx],
        "selection_method": "minimum_expected_risk",
        "probability_argmax_class": probability_argmax,
        "probability_argmax_params": PARAMETER_SETS[probability_argmax],
        "top_by_probability": _ranked_entries(probability_idx, probs, risks),
        "top_by_expected_risk": _ranked_entries(risk_idx, probs, risks),
    }
|