| """ |
| Scam Detection Module using IndicBERT. |
| |
| Provides hybrid scam detection combining: |
| - IndicBERT transformer model for semantic classification |
| - Keyword matching for known scam patterns |
| - Multi-language support (English, Hindi, Hinglish) |
| |
| Acceptance Criteria: |
| - AC-1.2.1: Achieves >90% accuracy on test dataset |
| - AC-1.2.2: False positive rate <5% |
| - AC-1.2.3: Inference time <500ms per message |
| - AC-1.2.4: Handles messages up to 5000 characters |
| - AC-1.2.5: Returns calibrated confidence scores (not just 0/1) |
| """ |
|
|
| import os |
| import re |
| import time |
| from typing import Dict, List, Optional, Tuple |
|
|
| import torch |
|
|
| from app.config import settings |
| from app.utils.logger import get_logger |
| from app.utils.preprocessing import clean_text, convert_devanagari_digits |
|
|
| logger = get_logger(__name__) |
|
|
| |
| |
| |
| BERT_WEIGHT_FINETUNED = 0.6 |
| BERT_WEIGHT_BASE = 0.2 |
| KEYWORD_WEIGHT_FINETUNED = 0.4 |
| KEYWORD_WEIGHT_BASE = 0.8 |
|
|
| |
| SCAM_THRESHOLD = 0.6 |
|
|
| |
| MAX_MESSAGE_LENGTH = 5000 |
|
|
|
|
| class ScamDetector: |
| """ |
| Hybrid scam detection using IndicBERT and keyword matching. |
| |
| Combines transformer-based semantic analysis with rule-based |
| keyword matching for robust scam detection across English, |
| Hindi, and Hinglish messages. |
| |
| Attributes: |
| model: IndicBERT model for sequence classification |
| tokenizer: Tokenizer for IndicBERT |
| en_keywords: English scam keyword list |
| hi_keywords: Hindi scam keyword list |
| _model_loaded: Flag indicating if BERT model is available |
| """ |
| |
| |
| _cached_model = None |
| _cached_tokenizer = None |
| _model_load_attempted = False |
| |
| def __init__(self, load_model: bool = True) -> None: |
| """ |
| Initialize the ScamDetector with IndicBERT model and keywords. |
| |
| Args: |
| load_model: Whether to load the BERT model (can be False for testing) |
| """ |
| self._model_loaded = False |
| self._model_finetuned = False |
| self.model = None |
| self.tokenizer = None |
| |
| |
| self.en_keywords: List[str] = [ |
| |
| "won", "winner", "prize", "lottery", "congratulations", "claim", |
| "selected", "lucky", "reward", "jackpot", "lakh", "crore", |
| |
| "otp", "bank", "account", "transfer", "payment", "upi", |
| "verify", "blocked", "suspended", "deactivated", "kyc", |
| "credit card", "debit card", "cvv", "pin", |
| |
| "police", "arrest", "court", "legal", "investigation", |
| "warrant", "fine", "penalty", "department", |
| |
| "electricity", "electric bill", "power bill", "power cut", |
| "disconnection", "utility", "gas bill", "water bill", |
| "pending bill", "overdue", "outstanding dues", |
| |
| "job offer", "work from home", "earn from home", "hiring", |
| "salary", "employment opportunity", |
| |
| "income tax", "tax notice", "tax department", "it department", |
| |
| "tech support", "computer virus", "microsoft support", |
| |
| "government scheme", "subsidy", "pm scheme", "govt scheme", |
| |
| "urgent", "immediately", "now", "today", "expire", "last chance", |
| "limited time", "hurry", "before", "deadline", |
| |
| "click", "call", "send", "share", "confirm", "update", |
| "reactivate", "unblock", "incomplete", |
| |
| "iphone", "samsung", "free", "gift", |
| ] |
| |
| |
| self.hi_keywords: List[str] = [ |
| |
| "जीत", "जीता", "जीते", "विजेता", "इनाम", "लॉटरी", "बधाई", "पुरस्कार", |
| |
| "ओटीपी", "बैंक", "खाता", "ट्रांसफर", "भुगतान", "यूपीआई", |
| "वेरिफाई", "ब्लॉक", "सस्पेंड", "बंद", |
| |
| "पुलिस", "गिरफ्तार", "गिरफ्तारी", "कोर्ट", "कानूनी", "जांच", |
| "वारंट", "जुर्माना", |
| |
| "तुरंत", "अभी", "आज", "जल्दी", "फौरन", |
| |
| "भेजें", "शेयर", "कॉल", "क्लिक", |
| ] |
| |
| |
| self.hinglish_keywords: List[str] = [ |
| "jeeta", "jeete", "jeet", "inaam", "lottery", |
| "otp", "bank", "account", "paisa", "paise", "rupees", "rupaye", |
| "police", "giraftar", "arrest", "court", |
| "turant", "abhi", "jaldi", "foran", |
| "bhejo", "share", "call", "click", |
| ] |
| |
| |
| self.scam_patterns = [ |
| r"₹\s*\d+\s*(lakh|crore|lac|cr)", |
| r"\d+\s*(lakh|crore|lac|cr)\s*(rupees?)?", |
| r"won\s+.*?(prize|lottery|reward)", |
| r"(send|share)\s+.*?otp", |
| r"account\s+.*?(block|suspend|deactivat)", |
| r"(arrest|गिरफ्तार)", |
| r"call\s+.*?\+?91[\s-]?\d{10}", |
| ] |
| |
| |
| if load_model: |
| self._load_model() |
| |
| |
| _cached_model_finetuned = False |
| |
| def _load_model(self) -> None: |
| """ |
| Load IndicBERT model and tokenizer. |
| |
| Prioritizes loading fine-tuned model from local directory. |
| Falls back to base IndicBERT model from HuggingFace. |
| Falls back to keyword-only detection if model unavailable. |
| """ |
| |
| if ScamDetector._cached_model is not None: |
| self.model = ScamDetector._cached_model |
| self.tokenizer = ScamDetector._cached_tokenizer |
| self._model_loaded = True |
| self._model_finetuned = ScamDetector._cached_model_finetuned |
| logger.debug(f"Using cached model (fine-tuned: {self._model_finetuned})") |
| return |
| |
| |
| if ScamDetector._model_load_attempted: |
| logger.debug("Skipping model load (previous attempt failed)") |
| return |
| |
| ScamDetector._model_load_attempted = True |
| |
| try: |
| from transformers import AutoModel, AutoModelForSequenceClassification, AutoTokenizer |
| |
| |
| finetuned_path = os.path.join( |
| os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), |
| "models", |
| "scam_detector", |
| "latest" |
| ) |
| |
| if os.path.exists(finetuned_path): |
| logger.info(f"Loading fine-tuned model from: {finetuned_path}") |
| start_time = time.time() |
| |
| self.tokenizer = AutoTokenizer.from_pretrained(finetuned_path) |
| self.model = AutoModelForSequenceClassification.from_pretrained(finetuned_path) |
| self.model.eval() |
| self._model_finetuned = True |
| |
| |
| ScamDetector._cached_model = self.model |
| ScamDetector._cached_tokenizer = self.tokenizer |
| ScamDetector._cached_model_finetuned = True |
| |
| load_time = time.time() - start_time |
| logger.info(f"Fine-tuned model loaded in {load_time:.2f}s") |
| self._model_loaded = True |
| return |
| |
| |
| model_name = settings.INDICBERT_MODEL |
| token = settings.HUGGINGFACE_TOKEN |
| token_kwargs = {"token": token} if token else {} |
| |
| logger.info(f"Loading base IndicBERT model: {model_name}") |
| start_time = time.time() |
| |
| self.tokenizer = AutoTokenizer.from_pretrained(model_name, **token_kwargs) |
| self.model = AutoModel.from_pretrained(model_name, **token_kwargs) |
| self.model.eval() |
| self._model_finetuned = False |
| |
| |
| ScamDetector._cached_model = self.model |
| ScamDetector._cached_tokenizer = self.tokenizer |
| ScamDetector._cached_model_finetuned = False |
| |
| load_time = time.time() - start_time |
| logger.info(f"Base IndicBERT loaded in {load_time:.2f}s") |
| self._model_loaded = True |
| |
| except ImportError as e: |
| logger.warning(f"transformers not installed: {e}") |
| logger.warning("Falling back to keyword-only detection") |
| except Exception as e: |
| error_msg = str(e).lower() |
| if "gated" in error_msg or "access" in error_msg: |
| logger.warning("IndicBERT requires HuggingFace authentication") |
| logger.warning("Set HUGGINGFACE_TOKEN environment variable") |
| else: |
| logger.warning(f"Failed to load IndicBERT: {e}") |
| logger.warning("Falling back to keyword-only detection") |
| |
| def detect(self, message: str, language: str = "auto") -> Dict: |
| """ |
| Detect if a message is a scam. |
| |
| Uses hybrid approach combining: |
| 1. IndicBERT semantic classification (60% weight) |
| 2. Keyword matching (40% weight) |
| |
| Args: |
| message: Input text to analyze (max 5000 chars) |
| language: Language code ('auto', 'en', 'hi', 'hinglish') |
| |
| Returns: |
| Dict containing: |
| - scam_detected: bool (True if confidence > 0.7) |
| - confidence: float (0.0-1.0) |
| - language: str (detected or provided language) |
| - indicators: List[str] (matched keywords/patterns) |
| """ |
| start_time = time.time() |
| |
| |
| if not message or not message.strip(): |
| logger.debug("Empty message, returning not scam") |
| return { |
| "scam_detected": False, |
| "confidence": 0.0, |
| "language": language if language != "auto" else "en", |
| "indicators": [], |
| } |
| |
| |
| message = clean_text(message) |
| if len(message) > MAX_MESSAGE_LENGTH: |
| message = message[:MAX_MESSAGE_LENGTH] |
| logger.debug(f"Message truncated to {MAX_MESSAGE_LENGTH} chars") |
| |
| |
| detected_language = language |
| if language == "auto": |
| from app.models.language import detect_language |
| detected_language, _ = detect_language(message) |
| |
| |
| keyword_score, indicators = self._keyword_match(message, detected_language) |
| |
| |
| if self._model_loaded: |
| bert_score = self._bert_classify(message) |
| |
| if self._model_finetuned: |
| final_confidence = BERT_WEIGHT_FINETUNED * bert_score + KEYWORD_WEIGHT_FINETUNED * keyword_score |
| else: |
| |
| final_confidence = BERT_WEIGHT_BASE * bert_score + KEYWORD_WEIGHT_BASE * keyword_score |
| else: |
| |
| final_confidence = keyword_score |
| |
| |
| pattern_indicators = self._pattern_match(message) |
| indicators.extend(pattern_indicators) |
| |
| |
| if pattern_indicators: |
| pattern_boost = min(len(pattern_indicators) * 0.1, 0.2) |
| final_confidence = min(1.0, final_confidence + pattern_boost) |
| |
| |
| scam_detected = final_confidence >= SCAM_THRESHOLD |
| |
| |
| elapsed_ms = (time.time() - start_time) * 1000 |
| logger.debug( |
| f"Detection: scam={scam_detected}, conf={final_confidence:.2f}, " |
| f"lang={detected_language}, time={elapsed_ms:.0f}ms" |
| ) |
| |
| return { |
| "scam_detected": scam_detected, |
| "confidence": float(round(final_confidence, 4)), |
| "language": detected_language, |
| "indicators": list(set(indicators)), |
| } |
| |
| def _keyword_match(self, message: str, language: str) -> Tuple[float, List[str]]: |
| """ |
| Calculate keyword-based scam score. |
| |
| Args: |
| message: Input text |
| language: Language code ('en', 'hi', 'hinglish') |
| |
| Returns: |
| Tuple of (score, matched_keywords) |
| Score is normalized to 0.0-1.0 |
| """ |
| |
| message_lower = message.lower() |
| message_normalized = convert_devanagari_digits(message_lower) |
| |
| matched_keywords = [] |
| |
| |
| for kw in self.en_keywords: |
| if kw.lower() in message_lower: |
| matched_keywords.append(kw) |
| |
| |
| if language in ["hi", "hinglish"] or self._has_devanagari(message): |
| for kw in self.hi_keywords: |
| if kw in message: |
| matched_keywords.append(kw) |
| |
| |
| for kw in self.hinglish_keywords: |
| if kw in message_lower: |
| matched_keywords.append(kw) |
| |
| |
| |
| match_count = len(set(matched_keywords)) |
| |
| if match_count == 0: |
| score = 0.0 |
| elif match_count == 1: |
| score = 0.3 |
| elif match_count == 2: |
| score = 0.5 |
| elif match_count == 3: |
| score = 0.7 |
| elif match_count == 4: |
| score = 0.85 |
| else: |
| score = min(0.95, 0.85 + (match_count - 4) * 0.02) |
| |
| return score, matched_keywords |
| |
| def _bert_classify(self, message: str) -> float: |
| """ |
| Classify message using BERT model. |
| |
| If model is fine-tuned for sequence classification, uses direct prediction. |
| Otherwise, uses embedding-based heuristic approach. |
| |
| Args: |
| message: Input text |
| |
| Returns: |
| Scam probability between 0.0 and 1.0 |
| """ |
| if not self._model_loaded: |
| return 0.0 |
| |
| try: |
| |
| inputs = self.tokenizer( |
| message, |
| return_tensors="pt", |
| truncation=True, |
| max_length=512, |
| padding=True, |
| ) |
| |
| with torch.no_grad(): |
| outputs = self.model(**inputs) |
| |
| |
| if self._model_finetuned and hasattr(outputs, 'logits'): |
| logits = outputs.logits |
| probs = torch.softmax(logits, dim=-1) |
| |
| scam_prob = probs[0, 1].item() |
| return scam_prob |
| |
| |
| |
| |
| last_hidden = outputs.last_hidden_state |
| |
| |
| attention_mask = inputs["attention_mask"] |
| mask_expanded = attention_mask.unsqueeze(-1).expand(last_hidden.size()).float() |
| sum_embeddings = torch.sum(last_hidden * mask_expanded, dim=1) |
| sum_mask = torch.clamp(mask_expanded.sum(dim=1), min=1e-9) |
| embeddings = sum_embeddings / sum_mask |
| |
| |
| |
| embedding_norm = torch.norm(embeddings, dim=-1).item() |
| |
| |
| |
| normalized_score = min(1.0, max(0.0, (embedding_norm - 5.0) / 15.0)) |
| |
| return normalized_score |
| |
| except Exception as e: |
| logger.warning(f"BERT classification error: {e}") |
| return 0.0 |
| |
| def _pattern_match(self, message: str) -> List[str]: |
| """ |
| Match scam patterns using regex. |
| |
| Args: |
| message: Input text |
| |
| Returns: |
| List of matched pattern descriptions |
| """ |
| matched_patterns = [] |
| message_lower = message.lower() |
| |
| for pattern in self.scam_patterns: |
| try: |
| if re.search(pattern, message_lower, re.IGNORECASE): |
| |
| if "lakh" in pattern or "crore" in pattern: |
| matched_patterns.append("money_amount") |
| elif "prize" in pattern or "lottery" in pattern: |
| matched_patterns.append("prize_winning") |
| elif "otp" in pattern: |
| matched_patterns.append("otp_request") |
| elif "block" in pattern or "suspend" in pattern: |
| matched_patterns.append("account_threat") |
| elif "arrest" in pattern or "गिरफ्तार" in pattern: |
| matched_patterns.append("arrest_threat") |
| elif "call" in pattern: |
| matched_patterns.append("phone_number") |
| except re.error as e: |
| logger.warning(f"Regex error for pattern {pattern}: {e}") |
| |
| return matched_patterns |
| |
| def _extract_indicators(self, message: str, language: str) -> List[str]: |
| """ |
| Extract scam indicators found in message. |
| |
| Args: |
| message: Input text |
| language: Language code |
| |
| Returns: |
| List of matched keywords/indicators |
| """ |
| _, indicators = self._keyword_match(message, language) |
| pattern_indicators = self._pattern_match(message) |
| indicators.extend(pattern_indicators) |
| return list(set(indicators)) |
| |
| def _has_devanagari(self, text: str) -> bool: |
| """Check if text contains Devanagari characters.""" |
| return any("\u0900" <= char <= "\u097F" for char in text) |
|
|
|
|
| def detect_scam(message: str, language: str = "auto") -> Tuple[bool, float, List[str]]: |
| """ |
| Convenience function for scam detection. |
| |
| Args: |
| message: Input text |
| language: Language code ('auto', 'en', 'hi', 'hinglish') |
| |
| Returns: |
| Tuple of (scam_detected, confidence, indicators) |
| """ |
| |
| if not hasattr(detect_scam, "_detector"): |
| detect_scam._detector = ScamDetector() |
| |
| result = detect_scam._detector.detect(message, language) |
| return result["scam_detected"], result["confidence"], result["indicators"] |
|
|
|
|
| def reset_detector_cache() -> None: |
| """ |
| Reset the detector model cache. |
| |
| Useful for testing or when model needs to be reloaded. |
| """ |
| global _singleton_detector |
| ScamDetector._cached_model = None |
| ScamDetector._cached_tokenizer = None |
| ScamDetector._model_load_attempted = False |
| if hasattr(detect_scam, "_detector"): |
| delattr(detect_scam, "_detector") |
| _singleton_detector = None |
| logger.info("Detector cache reset") |
|
|
|
|
| |
| _singleton_detector: Optional[ScamDetector] = None |
|
|
|
|
| def get_detector() -> ScamDetector: |
| """ |
| Get singleton ScamDetector instance. |
| |
| Returns: |
| ScamDetector instance |
| """ |
| global _singleton_detector |
| if _singleton_detector is None: |
| _singleton_detector = ScamDetector() |
| return _singleton_detector |
|
|