"""Model loading and prediction service helpers."""
from __future__ import annotations
import json
from dataclasses import dataclass
from functools import lru_cache
from pathlib import Path
from typing import Any
import joblib
import pandas as pd
import yaml
from src.data_ingestion import EXPECTED_COLUMNS
DEFAULT_MODEL_PATH = Path("models/model.pkl")
DEFAULT_PREPROCESSOR_PATH = Path("models/preprocessor.pkl")
DEFAULT_TRAINING_REPORT_PATH = Path("artifacts/model_training_report.json")
DEFAULT_MODEL_REPORT_PATH = Path("artifacts/model_report.json")
DEFAULT_CONFIG_PATH = Path("configs/train.yaml")
FEATURE_COLUMNS = [column for column in EXPECTED_COLUMNS if column != "Class"]
@dataclass
class InferenceService:
    """Encapsulate model/preprocessor runtime and prediction logic.

    Instances are built by :func:`load_inference_service`; the attributes are
    the loaded artifacts plus the resolved decision threshold.
    """

    model: Any  # fitted estimator exposing predict_proba
    preprocessor: Any  # fitted transformer exposing transform
    threshold: float  # decision threshold for the positive (fraud) class
    model_path: Path  # artifact the model was loaded from
    preprocessor_path: Path  # artifact the preprocessor was loaded from
    feature_columns: list[str]  # column order expected by the preprocessor

    def predict_records(self, records: list[dict[str, float]]) -> list[dict[str, Any]]:
        """Predict fraud labels/probabilities for input transaction records.

        Args:
            records: One dict per transaction mapping feature name -> value.

        Returns:
            One dict per input record with keys ``is_fraud``,
            ``fraud_probability``, ``risk_level`` and ``threshold``.

        Raises:
            KeyError: If any required feature column is missing from the input.
        """
        if not records:
            # An empty DataFrame has no columns, so the column selection
            # below would raise; short-circuit to an empty result instead.
            return []
        frame = pd.DataFrame(records)
        # Fail with an explicit message (still KeyError, matching pandas'
        # exception type for missing columns) rather than an opaque one.
        missing = [col for col in self.feature_columns if col not in frame.columns]
        if missing:
            raise KeyError(f"Missing feature columns: {missing}")
        frame = frame[self.feature_columns]
        transformed = self.preprocessor.transform(frame)
        probabilities = self.model.predict_proba(transformed)[:, 1]
        outputs: list[dict[str, Any]] = []
        for prob in probabilities:
            probability = float(prob)
            outputs.append(
                {
                    "is_fraud": bool(probability >= self.threshold),
                    "fraud_probability": probability,
                    "risk_level": _risk_level(probability),
                    "threshold": float(self.threshold),
                }
            )
        return outputs
def _risk_level(probability: float) -> str:
if probability >= 0.7:
return "high"
if probability >= 0.3:
return "medium"
return "low"
def _threshold_from_training_report(training_report_path: Path) -> float | None:
if not training_report_path.exists():
return None
payload = json.loads(training_report_path.read_text(encoding="utf-8"))
best = payload.get("best_model", {})
threshold = best.get("selected_threshold")
return float(threshold) if threshold is not None else None
def _threshold_from_model_report(model_report_path: Path) -> float | None:
if not model_report_path.exists():
return None
payload = json.loads(model_report_path.read_text(encoding="utf-8"))
selection = payload.get("threshold_selection", {})
threshold = selection.get("selected_threshold")
return float(threshold) if threshold is not None else None
def _threshold_from_config(config_path: Path) -> float | None:
if not config_path.exists():
return None
config = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}
threshold_cfg = config.get("threshold", {})
threshold = threshold_cfg.get("decision_threshold")
return float(threshold) if threshold is not None else None
def resolve_threshold(
    *,
    training_report_path: Path = DEFAULT_TRAINING_REPORT_PATH,
    model_report_path: Path = DEFAULT_MODEL_REPORT_PATH,
    config_path: Path = DEFAULT_CONFIG_PATH,
) -> float:
    """Resolve runtime threshold from artifacts, then fallback config/default."""
    # Readers are tried in priority order; each is only invoked if the
    # previous ones yielded nothing (preserves lazy, short-circuit lookup).
    readers = (
        (_threshold_from_training_report, training_report_path),
        (_threshold_from_model_report, model_report_path),
        (_threshold_from_config, config_path),
    )
    for reader, path in readers:
        resolved = reader(path)
        if resolved is not None:
            return resolved
    # No artifact or config supplied a threshold: fall back to 0.5.
    return 0.5
@lru_cache(maxsize=1)
def load_inference_service(
    *,
    model_path: str = str(DEFAULT_MODEL_PATH),
    preprocessor_path: str = str(DEFAULT_PREPROCESSOR_PATH),
    training_report_path: str = str(DEFAULT_TRAINING_REPORT_PATH),
    model_report_path: str = str(DEFAULT_MODEL_REPORT_PATH),
    config_path: str = str(DEFAULT_CONFIG_PATH),
) -> InferenceService:
    """Load model + preprocessor + threshold and cache service singleton."""
    # Validate both artifacts up front (model first) before loading anything.
    artifacts = {
        "Model": Path(model_path),
        "Preprocessor": Path(preprocessor_path),
    }
    for label, artifact in artifacts.items():
        if not artifact.exists():
            raise FileNotFoundError(f"{label} artifact not found: {artifact}")
    model_file = artifacts["Model"]
    preprocessor_file = artifacts["Preprocessor"]
    loaded_model = joblib.load(model_file)
    loaded_preprocessor = joblib.load(preprocessor_file)
    decision_threshold = resolve_threshold(
        training_report_path=Path(training_report_path),
        model_report_path=Path(model_report_path),
        config_path=Path(config_path),
    )
    # Prefer the column order the fitted preprocessor recorded; fall back to
    # the module-level expected feature list.
    columns = list(getattr(loaded_preprocessor, "feature_names_in_", FEATURE_COLUMNS))
    return InferenceService(
        model=loaded_model,
        preprocessor=loaded_preprocessor,
        threshold=decision_threshold,
        model_path=model_file,
        preprocessor_path=preprocessor_file,
        feature_columns=columns,
    )