# fraud-detector-api/app/ml/classifier.py
# (commit a783939 — "Deploy fraud detector API", author: chotam)
"""
Fraud classifier inference wrapper with LRU cache.
The classifier sees the same text many times in normal operation:
identical SMS-spam blasts, repeated retries from the mobile client, etc.
Caching the (normalized text β†’ probability) mapping cheaply absorbs
those duplicates without re-running the TF-IDF + ensemble vote.
Supports two pickle layouts:
* **v1 legacy** β€” ``{"word", "char", "clf"}`` from the original single
LogisticRegression training (kept for backward compatibility).
* **v2 ensemble** β€” ``{"word", "char", "members", "version": 2}`` where
``members`` is a list of ``(name, calibrated_classifier)`` tuples whose
positive-class probabilities are soft-averaged for the final score.
"""
from __future__ import annotations
import logging
from functools import lru_cache
from pathlib import Path
from typing import Any
import joblib
import numpy as np
from scipy.sparse import hstack
from ..services.preprocess import normalize_for_classifier
log = logging.getLogger("fraud.classifier")
class FraudClassifier:
    """Ensemble fraud classifier over stacked word + char TF-IDF features.

    Wraps one or more calibrated scikit-learn-style classifiers (see the
    module docstring for the v1/v2 bundle layouts) and exposes a single
    ``predict_proba(text) -> float`` giving the soft-averaged probability
    of the positive (fraud) class.
    """

    def __init__(self, bundle: dict[str, Any]):
        """Build a classifier from an unpickled model *bundle*.

        Required keys: ``"word"`` and ``"char"`` vectorizers, plus either
        ``"members"`` (v2, list of ``(name, clf)`` tuples) or ``"clf"`` (v1).
        """
        self.word_vec = bundle["word"]
        self.char_vec = bundle["char"]
        self.version = int(bundle.get("version", 1))
        if self.version >= 2 and "members" in bundle:
            self._members = [clf for _, clf in bundle["members"]]
        else:
            # v1 legacy layout: a single classifier under "clf".
            self._members = [bundle["clf"]]
        # Predictions are deterministic given the model — cache them.
        # 4096 slots = ~1 MB max RAM with our typical string sizes; plenty
        # of headroom for hot duplicate inputs.  The cache is built
        # per-instance (not as a decorated method) so it dies with the
        # classifier instead of pinning instances alive module-wide.
        self._predict_proba_cached = lru_cache(maxsize=4096)(
            self._predict_proba_uncached
        )

    def _vectorise(self, normalised: str):
        """Return the stacked word+char sparse feature row for one text."""
        word = self.word_vec.transform([normalised])
        char = self.char_vec.transform([normalised])
        return hstack([word, char]).tocsr()

    def _predict_proba_uncached(self, normalised: str) -> float:
        """Soft-average the positive-class probability across all members.

        Empty input (or an empty ensemble) scores 0.0.
        """
        if not normalised or not self._members:
            return 0.0
        features = self._vectorise(normalised)
        total = 0.0
        for member in self._members:
            classes = list(member.classes_)
            # Fix: the previous fallback indexed the *last* class when class
            # 1 was absent, so a member trained without positive samples
            # returned P(class 0) — i.e. ~1.0 fraud score for everything.
            # Such a member has never seen fraud; it contributes 0.0.
            if 1 in classes:
                pos = classes.index(1)
                total += float(member.predict_proba(features)[0][pos])
        return total / len(self._members)

    def predict_proba(self, text: str) -> float:
        """Return the fraud probability for raw *text* (cached by normal form)."""
        if not text:
            return 0.0
        normalised = normalize_for_classifier(text)
        return self._predict_proba_cached(normalised)
def load_classifier(path: str | Path) -> FraudClassifier | None:
    """Load a pickled model bundle from *path* and wrap it.

    Returns ``None`` (after a warning) when the file does not exist, so the
    API can keep serving without the ML signal rather than crash at startup.
    """
    p = Path(path)
    if not p.exists():
        log.warning("Classifier model not found at %s — running without ML signal", p)
        return None
    # NOTE(security): joblib.load unpickles arbitrary objects and can execute
    # code on load — only ever point this at model files we produced ourselves.
    bundle = joblib.load(p)
    version = int(bundle.get("version", 1))
    layout = "ensemble v2" if version >= 2 else "legacy v1"
    # Report the real member count: 1 for the legacy single-"clf" layout
    # (the old `len(bundle.get("members", [None]))` logged a placeholder).
    n_members = len(bundle["members"]) if "members" in bundle else 1
    log.info("Loaded classifier from %s (%s, %d members)", p, layout, n_members)
    return FraudClassifier(bundle)