File size: 3,428 Bytes
2767c41
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
from __future__ import annotations

import json
import warnings
from functools import lru_cache
from pathlib import Path
from typing import Any

import joblib
import numpy as np


# Locations of the bundled artifacts, resolved relative to this module's directory.
APP_DIR = Path(__file__).resolve().parent
MODEL_PATH = APP_DIR / "models" / "ensemble_models.joblib"
COST_MATRIX_PATH = APP_DIR / "cost_matrix.npy"
METADATA_PATH = APP_DIR / "model_metadata.json"


# The metadata file is read once, at import time; a missing or malformed file
# therefore fails the import itself. It is decoded as UTF-8 explicitly so the
# result does not depend on the platform's locale encoding.
with METADATA_PATH.open(encoding="utf-8") as f:
    _METADATA = json.load(f)

# Dimensions and per-class parameter sets taken from the metadata file.
N_FEATURES = int(_METADATA["n_features"])
N_CLASSES = int(_METADATA["n_classes"])
PARAMETER_SETS = _METADATA["parameter_sets"]


@lru_cache(maxsize=1)
def _load_models() -> list[Any]:
    """Load the serialized ensemble from ``MODEL_PATH``.

    The result is memoized, so the file is read on the first call only and
    every later call returns the same object.

    NOTE(review): ``joblib.load`` unpickles the file, so ``MODEL_PATH`` must
    only ever point at a trusted artifact — confirm how the file is produced.
    """
    ensemble = joblib.load(MODEL_PATH)
    return ensemble


@lru_cache(maxsize=1)
def _load_cost_matrix() -> np.ndarray:
    """Load the cost matrix from ``COST_MATRIX_PATH`` as a float64 array.

    The result is memoized, so the file is read on the first call only.

    Raises:
        ValueError: If the stored array is not ``(N_CLASSES, N_CLASSES)``.
    """
    matrix = np.load(COST_MATRIX_PATH)
    expected_shape = (N_CLASSES, N_CLASSES)
    if matrix.shape == expected_shape:
        # copy=False avoids a copy when the file already stores float64.
        return matrix.astype(np.float64, copy=False)
    raise ValueError(f"Expected cost matrix {expected_shape}, got {matrix.shape}")


def get_metadata() -> dict[str, Any]:
    """Return a summary of the model metadata loaded at import time.

    The returned dict holds the values parsed from the metadata file, with
    ``n_features`` and ``n_classes`` given as ints. Container values are the
    same objects held by this module, not copies.
    """
    summary: dict[str, Any] = {}
    for key in ("model_type", "model_source", "n_models"):
        summary[key] = _METADATA[key]
    summary["n_features"] = N_FEATURES
    summary["n_classes"] = N_CLASSES
    summary["feature_columns"] = _METADATA["feature_columns"]
    summary["parameter_sets"] = PARAMETER_SETS
    summary["cost_matrix_shape"] = _METADATA["cost_matrix_shape"]
    return summary


def _predict_probabilities(features: list[float]) -> np.ndarray:
    """Average the ensemble's class probabilities for a single feature vector.

    Each model's ``predict_proba`` output is accumulated into a vector of
    length ``N_CLASSES``. A model whose output has a different length is
    remapped through its ``classes_`` attribute; classes it does not report
    contribute zero. The mean is then rescaled to sum to one when its sum is
    positive.

    Args:
        features: The feature values for one sample.

    Returns:
        A float64 array of length ``N_CLASSES``.

    Raises:
        ValueError: If ``features`` does not contain ``N_FEATURES`` values, or
            if the loaded ensemble is empty.
    """
    x = np.asarray(features, dtype=np.float64).reshape(1, -1)
    if x.shape[1] != N_FEATURES:
        raise ValueError(f"Expected {N_FEATURES} features, got {x.shape[1]}")

    models = _load_models()
    n_models = len(models)
    if n_models == 0:
        # Dividing the accumulator by zero would yield NaN probabilities, and
        # the caller's argmin/argmax over NaN would silently pick class 0.
        raise ValueError("Ensemble contains no models")

    probs = np.zeros(N_CLASSES, dtype=np.float64)
    for model in models:
        with warnings.catch_warnings():
            # The input is a bare array, so models fitted with named columns
            # would otherwise warn on every call.
            warnings.filterwarnings("ignore", message="X does not have valid feature names")
            model_probs = np.asarray(model.predict_proba(x)[0], dtype=np.float64)
        if model_probs.shape[0] == N_CLASSES:
            probs += model_probs
            continue

        # Output length differs from N_CLASSES: place each column at the class
        # id the model reports for it.
        # NOTE(review): assumes classes_ holds integer ids in [0, N_CLASSES) — confirm.
        full_probs = np.zeros(N_CLASSES, dtype=np.float64)
        classes = getattr(model, "classes_", [])
        for src_idx, class_id in enumerate(classes):
            full_probs[int(class_id)] = model_probs[src_idx]
        probs += full_probs

    probs /= n_models
    total = probs.sum()
    if total > 0:
        probs /= total
    return probs


def _ranked_entries(indices: np.ndarray, probs: np.ndarray, risks: np.ndarray) -> list[dict[str, Any]]:
    """Build one report entry per class index, in the order given.

    Args:
        indices: Class indices to report.
        probs: Per-class probabilities, indexed by class.
        risks: Per-class expected risks, indexed by class.

    Returns:
        A list of dicts with the class id, its probability, its expected risk
        and the matching entry of ``PARAMETER_SETS``.
    """
    entries: list[dict[str, Any]] = []
    for raw_idx in indices:
        class_id = int(raw_idx)
        entry = {
            "class": class_id,
            "probability": float(probs[raw_idx]),
            "expected_risk": float(risks[raw_idx]),
            "params": PARAMETER_SETS[class_id],
        }
        entries.append(entry)
    return entries


def predict_from_features(features: list[float], top_k: int = 3) -> dict[str, Any]:
    """Select a parameter set for one feature vector by minimum expected risk.

    Args:
        features: Feature values; each is converted with ``float`` before
            prediction.
        top_k: Maximum number of entries in each ranked list. Negative values
            are treated as zero.

    Returns:
        A dict with the minimum-risk class and its parameters, the most
        probable class and its parameters, and up to ``top_k`` classes ranked
        by descending probability and by ascending expected risk.
    """
    # A negative slice bound would drop entries from the END of each ranking
    # rather than limit its length, so clamp it. None keeps meaning "no limit".
    if top_k is not None:
        top_k = max(top_k, 0)

    probs = _predict_probabilities([float(v) for v in features])
    # risks[i] = sum_j cost[i, j] * probs[j]
    # NOTE(review): assumes cost-matrix rows index the chosen class and
    # columns the true class — confirm against how the matrix was built.
    risks = _load_cost_matrix() @ probs
    selected_idx = int(np.argmin(risks))
    probability_idx = np.argsort(probs)[::-1][:top_k]
    risk_idx = np.argsort(risks)[:top_k]
    probability_argmax = int(np.argmax(probs))

    return {
        "selected_class": selected_idx,
        "selected_params": PARAMETER_SETS[selected_idx],
        "selection_method": "minimum_expected_risk",
        "probability_argmax_class": probability_argmax,
        "probability_argmax_params": PARAMETER_SETS[probability_argmax],
        "top_by_probability": _ranked_entries(probability_idx, probs, risks),
        "top_by_expected_risk": _ranked_entries(risk_idx, probs, risks),
    }