chotam's picture
Deploy fraud detector API
a783939
"""
Keyword-based fraud feature extraction.
The previous version did N substring checks per feature type per message
(Python's ``substring in text`` looped for each lexicon entry, ~150
checks per request). For longer transcripts that's O(L*N) characters
scanned — fine for short SMS but wasteful at scale.
This version pre-compiles each lexicon into one alternation regex per
feature type at import time. A single ``regex.findall(lowered_text)``
sweep then locates every matching phrase in one linear scan over the
text. ~10x faster on long transcripts, and the API stays identical.
"""
from __future__ import annotations
import re
from functools import lru_cache
from ..ml.lexicons import kk, ru
from ..schemas import DetectedFeature, FeatureType, Lang
_WEIGHTS: dict[FeatureType, float] = {
FeatureType.URGENCY: 0.22,
FeatureType.DATA_REQUEST: 0.30,
FeatureType.THREAT: 0.25,
FeatureType.IMPERSONATION: 0.14,
FeatureType.BENEFIT_PROMISE: 0.10,
FeatureType.SECRECY_REQUEST: 0.12,
FeatureType.SUSPICIOUS_LINK: 0.20,
}
_LEX_RU: dict[FeatureType, list[str]] = {
FeatureType.URGENCY: ru.URGENCY_RU,
FeatureType.DATA_REQUEST: ru.DATA_REQUEST_RU,
FeatureType.THREAT: ru.THREAT_RU,
FeatureType.IMPERSONATION: ru.IMPERSONATION_RU,
FeatureType.BENEFIT_PROMISE: ru.BENEFIT_PROMISE_RU,
FeatureType.SECRECY_REQUEST: ru.SECRECY_REQUEST_RU,
}
_LEX_KK: dict[FeatureType, list[str]] = {
FeatureType.URGENCY: kk.URGENCY_KK,
FeatureType.DATA_REQUEST: kk.DATA_REQUEST_KK,
FeatureType.THREAT: kk.THREAT_KK,
FeatureType.IMPERSONATION: kk.IMPERSONATION_KK,
FeatureType.BENEFIT_PROMISE: kk.BENEFIT_PROMISE_KK,
FeatureType.SECRECY_REQUEST: kk.SECRECY_REQUEST_KK,
}
def _compile_alternation(phrases: list[str]) -> re.Pattern[str]:
# Sort longer first so the regex engine prefers longer matches when
# short phrases are prefixes of longer ones.
ordered = sorted(set(phrases), key=len, reverse=True)
return re.compile("|".join(re.escape(p) for p in ordered))
_LEX_RU_RE: dict[FeatureType, re.Pattern[str]] = {
ftype: _compile_alternation(phrases) for ftype, phrases in _LEX_RU.items()
}
_LEX_KK_RE: dict[FeatureType, re.Pattern[str]] = {
ftype: _compile_alternation(phrases) for ftype, phrases in _LEX_KK.items()
}
def _dedup_preserve_order(items):
seen = set()
out = []
for item in items:
if item not in seen:
seen.add(item)
out.append(item)
return out
@lru_cache(maxsize=2048)
def _extract_cached(text: str, lang: Lang) -> tuple[tuple[FeatureType, float, tuple[str, ...]], ...]:
"""Inner cached implementation. Returns immutable tuples so lru_cache works."""
if not text:
return ()
lowered = text.lower()
catalog = _LEX_KK_RE if lang == "kk" else _LEX_RU_RE
detected: list[tuple[FeatureType, float, tuple[str, ...]]] = []
for ftype, pattern in catalog.items():
hits = pattern.findall(lowered)
if hits:
unique_hits = tuple(_dedup_preserve_order(hits)[:3])
detected.append((ftype, _WEIGHTS[ftype], unique_hits))
link_match = ru.SUSPICIOUS_LINK.search(text)
if link_match:
detected.append((
FeatureType.SUSPICIOUS_LINK,
_WEIGHTS[FeatureType.SUSPICIOUS_LINK],
(link_match.group(0),),
))
# Conditional brand-mention impersonation: only fire if other fraud
# signals are already present, otherwise legitimate bank notifications
# that mention "Kaspi" would falsely trip.
has_impersonation = any(f[0] == FeatureType.IMPERSONATION for f in detected)
fraud_signal_count = sum(
1 for f in detected
if f[0] in (FeatureType.URGENCY, FeatureType.DATA_REQUEST, FeatureType.THREAT)
)
if not has_impersonation and fraud_signal_count >= 1:
org_hits = tuple(org for org in ru.ORG_MENTIONS_RU if org in lowered)
if org_hits:
detected.append((
FeatureType.IMPERSONATION,
_WEIGHTS[FeatureType.IMPERSONATION],
org_hits[:3],
))
return tuple(detected)
def extract(text: str, lang: Lang) -> list[DetectedFeature]:
raw = _extract_cached(text, lang)
return [
DetectedFeature(type=ftype, weight=weight, evidence=list(evidence))
for ftype, weight, evidence in raw
]