Spaces:
Sleeping
Sleeping
| """ | |
| Fraud classifier inference wrapper with LRU cache. | |
| The classifier sees the same text many times in normal operation: | |
| identical SMS-spam blasts, repeated retries from the mobile client, etc. | |
| Caching the (normalized text β probability) mapping cheaply absorbs | |
| those duplicates without re-running the TF-IDF + ensemble vote. | |
| Supports two pickle layouts: | |
| * **v1 legacy** β ``{"word", "char", "clf"}`` from the original single | |
| LogisticRegression training (kept for backward compatibility). | |
| * **v2 ensemble** β ``{"word", "char", "members", "version": 2}`` where | |
| ``members`` is a list of ``(name, calibrated_classifier)`` tuples whose | |
| positive-class probabilities are soft-averaged for the final score. | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| from functools import lru_cache | |
| from pathlib import Path | |
| from typing import Any | |
| import joblib | |
| import numpy as np | |
| from scipy.sparse import hstack | |
| from ..services.preprocess import normalize_for_classifier | |
| log = logging.getLogger("fraud.classifier") | |
| class FraudClassifier: | |
| def __init__(self, bundle: dict[str, Any]): | |
| self.word_vec = bundle["word"] | |
| self.char_vec = bundle["char"] | |
| self.version = int(bundle.get("version", 1)) | |
| if self.version >= 2 and "members" in bundle: | |
| self._members = [clf for _, clf in bundle["members"]] | |
| else: | |
| self._members = [bundle["clf"]] | |
| # Predictions are deterministic given the model β cache them. | |
| # 4096 slots = ~1 MB max RAM with our typical string sizes; plenty | |
| # of headroom for hot duplicate inputs. | |
| self._predict_proba_cached = lru_cache(maxsize=4096)( | |
| self._predict_proba_uncached | |
| ) | |
| def _vectorise(self, normalised: str): | |
| word = self.word_vec.transform([normalised]) | |
| char = self.char_vec.transform([normalised]) | |
| return hstack([word, char]).tocsr() | |
| def _predict_proba_uncached(self, normalised: str) -> float: | |
| if not normalised: | |
| return 0.0 | |
| features = self._vectorise(normalised) | |
| probas = np.zeros(1, dtype=float) | |
| for member in self._members: | |
| classes = list(member.classes_) | |
| pos = classes.index(1) if 1 in classes else -1 | |
| probas += member.predict_proba(features)[0][pos] | |
| probas /= len(self._members) | |
| return float(probas[0]) | |
| def predict_proba(self, text: str) -> float: | |
| if not text: | |
| return 0.0 | |
| normalised = normalize_for_classifier(text) | |
| return self._predict_proba_cached(normalised) | |
def load_classifier(path: str | Path) -> FraudClassifier | None:
    """Load a pickled model bundle from *path* and wrap it.

    Returns ``None`` (after logging a warning) when the file does not
    exist, so the service can degrade gracefully without the ML signal.
    """
    model_path = Path(path)
    if not model_path.exists():
        log.warning("Classifier model not found at %s β running without ML signal", model_path)
        return None
    bundle = joblib.load(model_path)
    is_ensemble = int(bundle.get("version", 1)) >= 2
    layout = "ensemble v2" if is_ensemble else "legacy v1"
    # Legacy bundles have no "members" key; [None] keeps the count at 1.
    member_count = len(bundle.get("members", [None]))
    log.info("Loaded classifier from %s (%s, %d members)",
             model_path, layout, member_count)
    return FraudClassifier(bundle)