""" Fraud classifier inference wrapper with LRU cache. The classifier sees the same text many times in normal operation: identical SMS-spam blasts, repeated retries from the mobile client, etc. Caching the (normalized text → probability) mapping cheaply absorbs those duplicates without re-running the TF-IDF + ensemble vote. Supports two pickle layouts: * **v1 legacy** — ``{"word", "char", "clf"}`` from the original single LogisticRegression training (kept for backward compatibility). * **v2 ensemble** — ``{"word", "char", "members", "version": 2}`` where ``members`` is a list of ``(name, calibrated_classifier)`` tuples whose positive-class probabilities are soft-averaged for the final score. """ from __future__ import annotations import logging from functools import lru_cache from pathlib import Path from typing import Any import joblib import numpy as np from scipy.sparse import hstack from ..services.preprocess import normalize_for_classifier log = logging.getLogger("fraud.classifier") class FraudClassifier: def __init__(self, bundle: dict[str, Any]): self.word_vec = bundle["word"] self.char_vec = bundle["char"] self.version = int(bundle.get("version", 1)) if self.version >= 2 and "members" in bundle: self._members = [clf for _, clf in bundle["members"]] else: self._members = [bundle["clf"]] # Predictions are deterministic given the model — cache them. # 4096 slots = ~1 MB max RAM with our typical string sizes; plenty # of headroom for hot duplicate inputs. self._predict_proba_cached = lru_cache(maxsize=4096)( self._predict_proba_uncached ) def _vectorise(self, normalised: str): word = self.word_vec.transform([normalised]) char = self.char_vec.transform([normalised]) return hstack([word, char]).tocsr() def _predict_proba_uncached(self, normalised: str) -> float: if not normalised: return 0.0 features = self._vectorise(normalised) probas = np.zeros(1, dtype=float) for member in self._members: classes = list(member.classes_) pos = classes.index(1) if 1 in classes else -1 probas += member.predict_proba(features)[0][pos] probas /= len(self._members) return float(probas[0]) def predict_proba(self, text: str) -> float: if not text: return 0.0 normalised = normalize_for_classifier(text) return self._predict_proba_cached(normalised) def load_classifier(path: str | Path) -> FraudClassifier | None: p = Path(path) if not p.exists(): log.warning("Classifier model not found at %s — running without ML signal", p) return None bundle = joblib.load(p) layout = "ensemble v2" if int(bundle.get("version", 1)) >= 2 else "legacy v1" log.info("Loaded classifier from %s (%s, %d members)", p, layout, len(bundle.get("members", [None]))) return FraudClassifier(bundle)