File size: 4,380 Bytes
a783939
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
"""
Keyword-based fraud feature extraction.

The previous version did N substring checks per feature type per message
(Python's ``substring in text`` looped for each lexicon entry, ~150
checks per request). For longer transcripts that's O(L*N) characters
scanned — fine for short SMS but wasteful at scale.

This version pre-compiles each lexicon into one alternation regex per
feature type at import time. A single ``regex.findall(lowered_text)``
sweep then locates every matching phrase in one linear scan over the
text. ~10x faster on long transcripts, and the API stays identical.
"""
from __future__ import annotations

import re
from functools import lru_cache

from ..ml.lexicons import kk, ru
from ..schemas import DetectedFeature, FeatureType, Lang

# Weight attached to every detected feature of the given type (passed through
# as ``DetectedFeature.weight`` by this module).
# SUSPICIOUS_LINK has a weight but no phrase lexicon: it is detected with the
# ``ru.SUSPICIOUS_LINK`` pattern instead of a phrase list.
_WEIGHTS: dict[FeatureType, float] = {
    FeatureType.URGENCY: 0.22,
    FeatureType.DATA_REQUEST: 0.30,
    FeatureType.THREAT: 0.25,
    FeatureType.IMPERSONATION: 0.14,
    FeatureType.BENEFIT_PROMISE: 0.10,
    FeatureType.SECRECY_REQUEST: 0.12,
    FeatureType.SUSPICIOUS_LINK: 0.20,
}

# Russian phrase lexicons, keyed by the feature type they signal.
# NOTE: insertion order matters — the compiled catalog is iterated in this
# order, so it determines the order in which detected features are reported.
_LEX_RU: dict[FeatureType, list[str]] = {
    FeatureType.URGENCY: ru.URGENCY_RU,
    FeatureType.DATA_REQUEST: ru.DATA_REQUEST_RU,
    FeatureType.THREAT: ru.THREAT_RU,
    FeatureType.IMPERSONATION: ru.IMPERSONATION_RU,
    FeatureType.BENEFIT_PROMISE: ru.BENEFIT_PROMISE_RU,
    FeatureType.SECRECY_REQUEST: ru.SECRECY_REQUEST_RU,
}

# Phrase lexicons from the ``kk`` module (presumably Kazakh), keyed by the
# feature type they signal. Mirrors the Russian table key-for-key.
# NOTE: insertion order matters — the compiled catalog is iterated in this
# order, so it determines the order in which detected features are reported.
_LEX_KK: dict[FeatureType, list[str]] = {
    FeatureType.URGENCY: kk.URGENCY_KK,
    FeatureType.DATA_REQUEST: kk.DATA_REQUEST_KK,
    FeatureType.THREAT: kk.THREAT_KK,
    FeatureType.IMPERSONATION: kk.IMPERSONATION_KK,
    FeatureType.BENEFIT_PROMISE: kk.BENEFIT_PROMISE_KK,
    FeatureType.SECRECY_REQUEST: kk.SECRECY_REQUEST_KK,
}


def _compile_alternation(phrases: list[str]) -> re.Pattern[str]:
    # Sort longer first so the regex engine prefers longer matches when
    # short phrases are prefixes of longer ones.
    ordered = sorted(set(phrases), key=len, reverse=True)
    return re.compile("|".join(re.escape(p) for p in ordered))


# One pre-compiled alternation per feature type, built once at import time.
# Keys (and their order) mirror the corresponding phrase tables.
_LEX_RU_RE: dict[FeatureType, re.Pattern[str]] = {
    feature: _compile_alternation(_LEX_RU[feature]) for feature in _LEX_RU
}
_LEX_KK_RE: dict[FeatureType, re.Pattern[str]] = {
    feature: _compile_alternation(_LEX_KK[feature]) for feature in _LEX_KK
}


def _dedup_preserve_order(items):
    seen = set()
    out = []
    for item in items:
        if item not in seen:
            seen.add(item)
            out.append(item)
    return out


@lru_cache(maxsize=2048)
def _extract_cached(text: str, lang: Lang) -> tuple[tuple[FeatureType, float, tuple[str, ...]], ...]:
    """Detect fraud features in *text* and return them as nested tuples.

    The result is memoised per ``(text, lang)`` pair, so everything returned
    is an immutable tuple that can safely be shared between callers.

    Args:
        text: Message or transcript to analyse. Empty text yields ``()``.
        lang: ``"kk"`` selects the ``kk`` lexicons; any other value falls back
            to the Russian ones.

    Returns:
        A tuple of ``(feature_type, weight, evidence)`` entries, where
        ``evidence`` holds at most three distinct matched strings in order of
        first appearance (a single string for a suspicious link).
    """
    if not text:
        return ()

    # Lexicon matching is done against the lower-cased text.
    haystack = text.lower()
    patterns = _LEX_RU_RE
    if lang == "kk":
        patterns = _LEX_KK_RE

    found: list[tuple[FeatureType, float, tuple[str, ...]]] = []
    for feature, regex in patterns.items():
        matches = regex.findall(haystack)
        if not matches:
            continue
        evidence = tuple(_dedup_preserve_order(matches)[:3])
        found.append((feature, _WEIGHTS[feature], evidence))

    # The link pattern runs on the original text, not the lower-cased copy,
    # so the evidence keeps the link exactly as it was written.
    link = ru.SUSPICIOUS_LINK.search(text)
    if link:
        link_entry = (
            FeatureType.SUSPICIOUS_LINK,
            _WEIGHTS[FeatureType.SUSPICIOUS_LINK],
            (link.group(0),),
        )
        found.append(link_entry)

    # A bare organisation name counts as impersonation only when the lexicon
    # did not already flag impersonation AND at least one core fraud signal
    # (urgency, data request, threat) fired. Without that guard a legitimate
    # bank notification that merely mentions "Kaspi" would be flagged.
    seen_types = {entry[0] for entry in found}
    core_signals = (FeatureType.URGENCY, FeatureType.DATA_REQUEST, FeatureType.THREAT)
    already_impersonation = FeatureType.IMPERSONATION in seen_types
    has_core_signal = any(signal in seen_types for signal in core_signals)
    if has_core_signal and not already_impersonation:
        # The organisation list always comes from the ``ru`` module, whatever
        # ``lang`` is.
        mentioned = [org for org in ru.ORG_MENTIONS_RU if org in haystack]
        if mentioned:
            found.append((
                FeatureType.IMPERSONATION,
                _WEIGHTS[FeatureType.IMPERSONATION],
                tuple(mentioned[:3]),
            ))

    return tuple(found)


def extract(text: str, lang: Lang) -> list[DetectedFeature]:
    """Return the fraud features detected in *text* as ``DetectedFeature`` objects.

    Detection itself is delegated to the cached ``_extract_cached``; this
    wrapper only converts its immutable tuples into fresh model objects, so
    each call hands back a new list with new ``evidence`` lists.

    Args:
        text: Message or transcript to analyse.
        lang: Language selector forwarded unchanged to ``_extract_cached``.

    Returns:
        One ``DetectedFeature`` per detected feature, in detection order.
    """
    features: list[DetectedFeature] = []
    for ftype, weight, evidence in _extract_cached(text, lang):
        # Copy the cached evidence tuple into a new list for every result.
        features.append(
            DetectedFeature(type=ftype, weight=weight, evidence=list(evidence))
        )
    return features