File size: 3,065 Bytes
a783939
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
"""
Fraud classifier inference wrapper with LRU cache.

The classifier sees the same text many times in normal operation:
identical SMS-spam blasts, repeated retries from the mobile client, etc.
Caching the (normalized text → probability) mapping cheaply absorbs
those duplicates without re-running the TF-IDF + ensemble vote.

Supports two pickle layouts:

* **v1 legacy** — ``{"word", "char", "clf"}`` from the original single
  LogisticRegression training (kept for backward compatibility).
* **v2 ensemble** — ``{"word", "char", "members", "version": 2}`` where
  ``members`` is a list of ``(name, calibrated_classifier)`` tuples whose
  positive-class probabilities are soft-averaged for the final score.
"""
from __future__ import annotations

import logging
from functools import lru_cache
from pathlib import Path
from typing import Any

import joblib
import numpy as np
from scipy.sparse import hstack

from ..services.preprocess import normalize_for_classifier

log = logging.getLogger("fraud.classifier")


class FraudClassifier:
    def __init__(self, bundle: dict[str, Any]):
        self.word_vec = bundle["word"]
        self.char_vec = bundle["char"]
        self.version = int(bundle.get("version", 1))

        if self.version >= 2 and "members" in bundle:
            self._members = [clf for _, clf in bundle["members"]]
        else:
            self._members = [bundle["clf"]]

        # Predictions are deterministic given the model — cache them.
        # 4096 slots = ~1 MB max RAM with our typical string sizes; plenty
        # of headroom for hot duplicate inputs.
        self._predict_proba_cached = lru_cache(maxsize=4096)(
            self._predict_proba_uncached
        )

    def _vectorise(self, normalised: str):
        word = self.word_vec.transform([normalised])
        char = self.char_vec.transform([normalised])
        return hstack([word, char]).tocsr()

    def _predict_proba_uncached(self, normalised: str) -> float:
        if not normalised:
            return 0.0
        features = self._vectorise(normalised)
        probas = np.zeros(1, dtype=float)
        for member in self._members:
            classes = list(member.classes_)
            pos = classes.index(1) if 1 in classes else -1
            probas += member.predict_proba(features)[0][pos]
        probas /= len(self._members)
        return float(probas[0])

    def predict_proba(self, text: str) -> float:
        if not text:
            return 0.0
        normalised = normalize_for_classifier(text)
        return self._predict_proba_cached(normalised)


def load_classifier(path: str | Path) -> FraudClassifier | None:
    p = Path(path)
    if not p.exists():
        log.warning("Classifier model not found at %s — running without ML signal", p)
        return None
    bundle = joblib.load(p)
    layout = "ensemble v2" if int(bundle.get("version", 1)) >= 2 else "legacy v1"
    log.info("Loaded classifier from %s (%s, %d members)",
             p, layout, len(bundle.get("members", [None])))
    return FraudClassifier(bundle)