Spaces:
Sleeping
Sleeping
| """ | |
| PhishLens Text & NLP Feature Module. | |
| Extracts TF-IDF sparse features, urgency/social-engineering scores, | |
| semantic embeddings (sentence-transformers), and subject-line features. | |
| Security rationale: Phishing emails are engineered to create fear and urgency. | |
| NLP signals — particularly semantic embeddings from pre-trained transformers — | |
| capture the latent 'threat context' of an email that bag-of-words methods miss. | |
| The 384-dimensional all-MiniLM-L6-v2 embedding is the single highest-impact | |
| feature group, representing deep semantic meaning that cannot be easily evaded | |
| by paraphrasing or synonym substitution. | |
| """ | |
| from __future__ import annotations | |
| import re | |
| from typing import Dict, List, Optional, Tuple | |
| import numpy as np | |
| from src.utils.config import DEFAULT_CONFIG, URGENCY_PHRASES | |
| from src.utils.logger import get_logger | |
| log = get_logger(__name__) | |
# Process-wide cache for the sentence-transformers model. It is filled lazily
# by get_embedding_model() so the (~80MB) weights are loaded at most once
# instead of once per email.
_EMBEDDING_MODEL = None
_DEVICE = "cpu"  # Overwritten with 'cuda:0' at load time if a GPU is available


def get_embedding_model(model_name: str = "all-MiniLM-L6-v2"):
    """Return the cached sentence-transformers model, loading it on first use.

    The model is loaded lazily on the first call and stored in the module
    global ``_EMBEDDING_MODEL``; later calls return that same instance.

    Caching caveats:
        * ``model_name`` is only honoured by the call that performs the load.
          Once a model is cached, it is returned regardless of the name
          passed in.
        * A failed load leaves the cache empty, so the next call tries again.

    Device selection: ``torch.cuda.is_available()`` is checked at load time
    and the result is stored in the module global ``_DEVICE`` ("cuda:0" or
    "cpu"). On CUDA the model is converted to fp16 via ``.half()``.

    Args:
        model_name: Model identifier passed to ``SentenceTransformer``.

    Returns:
        The loaded SentenceTransformer instance, or None if loading failed
        (the error is logged, not raised).
    """
    global _EMBEDDING_MODEL, _DEVICE

    # Fast path: something is already cached.
    if _EMBEDDING_MODEL is not None:
        return _EMBEDDING_MODEL

    try:
        # Heavy dependencies are imported here so that importing this module
        # does not require them.
        import torch

        _DEVICE = "cuda:0" if torch.cuda.is_available() else "cpu"
        if "cuda" in _DEVICE:
            device_desc = "GPU: " + torch.cuda.get_device_name(0)
        else:
            device_desc = "CPU-only build"
        log.info(f"Embedding device: {_DEVICE.upper()} ({device_desc})")

        from sentence_transformers import SentenceTransformer

        log.info(f"Loading sentence-transformer model: {model_name}")
        _EMBEDDING_MODEL = SentenceTransformer(model_name, device=_DEVICE)

        # Half precision is only applied on GPU; the CPU path keeps the
        # weights as loaded.
        if "cuda" in _DEVICE:
            _EMBEDDING_MODEL = _EMBEDDING_MODEL.half()
            log.info("Embedding model converted to fp16 (Tensor Core acceleration).")
        log.info("Embedding model loaded successfully.")
    except Exception as exc:
        # Best-effort: callers treat None as "embeddings unavailable".
        # Reset the cache in case the failure happened after assignment
        # (e.g. during the fp16 conversion).
        log.error(f"Failed to load embedding model: {exc}")
        _EMBEDDING_MODEL = None
    return _EMBEDDING_MODEL
def extract_text_features(
    body_text: str,
    subject: str,
    config=DEFAULT_CONFIG,
    tfidf_vectorizer=None,
    fit_tfidf: bool = False,
    precomputed_embedding: Optional[np.ndarray] = None,
) -> Tuple[np.ndarray, List[str]]:
    """Extract the dense text-based features from an email body and subject.

    Args:
        body_text: Plain text body of the email.
        subject: Email subject line.
        config: Configuration object; this function reads
            ``urgency_phrases``, ``brand_list``, ``embedding_model`` and
            ``embedding_max_tokens`` from it.
        tfidf_vectorizer: Not read by this function; kept so existing
            callers that pass it keep working.
        fit_tfidf: Not read by this function; kept so existing callers that
            pass it keep working.
        precomputed_embedding: Optional pre-computed embedding. Any
            array-like holding exactly 384 values is accepted (e.g. a 1-D
            array, a ``(1, 384)`` row, or a list) and used instead of calling
            the model. Anything else is ignored and the embedding is computed
            from ``body_text``.

    Returns:
        Tuple of (feature_vector, feature_names). feature_vector is a float32
        array laid out as: urgency score, urgency phrase count, the subject
        line features, then the embedding values (named ``embed_<i>``).
    """
    features: List[float] = []
    feature_names: List[str] = []

    # ---- Urgency / social-engineering score --------------------------------
    urgency_score, urgency_count = _compute_urgency_score(body_text, config.urgency_phrases)
    features.extend([urgency_score, float(urgency_count)])
    feature_names.extend(["urgency_score_normalised", "urgency_phrase_count"])

    # ---- Subject line features ---------------------------------------------
    subject_feats, subject_names = _extract_subject_features(subject, config.brand_list)
    features.extend(subject_feats)
    feature_names.extend(subject_names)

    # ---- Semantic embedding ------------------------------------------------
    # Prefer a supplied embedding so batch callers can skip the model call.
    # It is flattened first: a (1, 384) row is as usable as a flat vector,
    # and any other shape must not reach ``tolist()`` below, where it would
    # produce nested lists and break the final array construction.
    embedding = None
    if precomputed_embedding is not None:
        try:
            candidate = np.asarray(precomputed_embedding, dtype=np.float32).reshape(-1)
        except (TypeError, ValueError):
            candidate = None  # Not numeric / not array-like: recompute.
        if candidate is not None and candidate.size == 384:
            embedding = candidate

    if embedding is None:
        model = get_embedding_model(config.embedding_model)
        if model is not None:
            embedding = _compute_embedding(body_text, model, config.embedding_max_tokens)
        else:
            log.warning("Embedding model unavailable — using zeros for embedding features.")
            embedding = np.zeros(384, dtype=np.float32)

    features.extend(embedding.tolist())
    feature_names.extend([f"embed_{i}" for i in range(len(embedding))])
    return np.array(features, dtype=np.float32), feature_names
def extract_tfidf_features(
    texts: List[str],
    vectorizer=None,
    config=DEFAULT_CONFIG,
    fit: bool = False,
):
    """Fit or apply a TF-IDF vectorizer over a list of email body texts.

    A fresh vectorizer is built and fitted on ``texts`` when ``fit`` is True
    or when no vectorizer is supplied. Otherwise the supplied vectorizer is
    used to transform ``texts`` without refitting.

    Args:
        texts: List of email body texts.
        vectorizer: Fitted TfidfVectorizer, or None to fit from scratch.
        config: Configuration object; ``tfidf_max_features`` and
            ``tfidf_ngram_range`` are read when a vectorizer is built.
        fit: If True, a new vectorizer is fitted even if one was supplied.

    Returns:
        Tuple of (sparse_matrix, fitted_vectorizer, feature_names), where each
        feature name is the vectorizer's term prefixed with ``tfidf_``.
    """
    from sklearn.feature_extraction.text import TfidfVectorizer

    if fit or vectorizer is None:
        vectorizer = TfidfVectorizer(
            max_features=config.tfidf_max_features,
            ngram_range=config.tfidf_ngram_range,
            sublinear_tf=True,  # Log-scaled TF reduces impact of very frequent terms
            strip_accents="unicode",
            decode_error="replace",
            analyzer="word",
            min_df=2,  # Ignore terms appearing in fewer than 2 docs (noise reduction)
        )
        X = vectorizer.fit_transform(texts)
        # Report the vocabulary size actually learned. max_features is only
        # an upper bound, so logging it alone would overstate the feature
        # count on small corpora.
        log.info(
            f"TF-IDF fitted: {len(vectorizer.vocabulary_)} features "
            f"(max_features={config.tfidf_max_features}), "
            f"ngram_range={config.tfidf_ngram_range}"
        )
    else:
        X = vectorizer.transform(texts)

    feature_names = [f"tfidf_{name}" for name in vectorizer.get_feature_names_out()]
    return X, vectorizer, feature_names
| # --------------------------------------------------------------------------- | |
| # Internal helpers | |
| # --------------------------------------------------------------------------- | |
def _compute_urgency_score(text: str, urgency_phrases: List[str]) -> Tuple[float, int]:
    """Compute a normalised urgency/social-engineering score for a text.

    Security rationale: Urgency creation is a core manipulation technique in
    phishing ('Verify now or your account will be closed within 24 hours').
    Scaling by word count keeps long emails that happen to contain a phrase
    or two from scoring as high as short, phrase-dense ones.

    Matching is a case-insensitive substring test. Each phrase contributes at
    most 1 to the count, however many times it occurs in the text.

    Args:
        text: Email body text. Empty or None yields ``(0.0, 0)``.
        urgency_phrases: Urgency phrases to look for. Blank or
            whitespace-only entries are ignored; None is treated as empty.

    Returns:
        Tuple of (normalised_score, raw_count). normalised_score is the
        number of matched phrases per 100 words, capped at 1.0; raw_count is
        the number of distinct phrases found.
    """
    if not text:
        return 0.0, 0

    text_lower = text.lower()
    # A blank phrase is a substring of (almost) every text, so a stray empty
    # entry in the configured list would otherwise register as a hit on every
    # email and inflate both outputs.
    count = sum(
        1
        for phrase in (urgency_phrases or ())
        if phrase.strip() and phrase.lower() in text_lower
    )
    word_count = max(len(text.split()), 1)  # Guard against whitespace-only text
    normalised = min(count / (word_count / 100), 1.0)  # Phrases per 100 words, capped at 1
    return normalised, count
def _extract_subject_features(subject: str, brand_list: List[str]) -> Tuple[List[float], List[str]]:
    """Extract numeric features from the email subject line.

    Security rationale: Subject lines are crafted to provoke urgency and
    impersonate brands. All-caps words, heavy punctuation, RE:/FW: prefixes
    and brand keywords are used as phishing signals.

    Args:
        subject: Email subject string. None is treated as an empty subject.
        brand_list: Brand keywords to look for. Matching is a
            case-insensitive substring test; empty entries are ignored and
            None is treated as an empty list.

    Returns:
        Tuple of (feature_values, feature_names), index-aligned, in this
        order: subject_length, subject_exclamation_count,
        subject_question_count, subject_caps_ratio, subject_spoofed_re_fw,
        subject_brand_keyword, subject_urgency_keyword, subject_dollar_count.
    """
    subject = subject or ""
    subj_lower = subject.lower()
    words = subject.split()

    # Share of words that are fully upper-case ("URGENT ACTION REQUIRED").
    # Single-character words such as "I" or "A" are not counted.
    caps_words = sum(1 for w in words if w.isupper() and len(w) > 1)
    caps_ratio = caps_words / max(len(words), 1)

    # Flags any subject that starts with a reply/forward prefix. Whether the
    # prefix is genuinely spoofed cannot be determined from the subject
    # alone; the feature name is kept for downstream compatibility.
    has_reply_prefix = subj_lower.startswith(("re:", "fw:", "fwd:"))

    # Brands are lowercased before comparison so that a mixed-case entry
    # such as "PayPal" can match the lowercased subject. Empty entries are
    # skipped because "" is a substring of every subject.
    has_brand = any(
        brand and brand.lower() in subj_lower for brand in (brand_list or ())
    )

    urgency_keywords = ("urgent", "action required", "verify", "suspended", "alert")
    has_urgency = any(keyword in subj_lower for keyword in urgency_keywords)

    named_values = [
        ("subject_length", float(len(subject))),
        ("subject_exclamation_count", float(subject.count("!"))),
        ("subject_question_count", float(subject.count("?"))),
        ("subject_caps_ratio", caps_ratio),
        ("subject_spoofed_re_fw", float(has_reply_prefix)),
        ("subject_brand_keyword", float(has_brand)),
        ("subject_urgency_keyword", float(has_urgency)),
        # Count of '$' characters (prize/lottery phishing pattern).
        ("subject_dollar_count", float(subject.count("$"))),
    ]
    names = [name for name, _ in named_values]
    features = [value for _, value in named_values]
    return features, names
def _compute_embedding(
    text: str,
    model,
    max_tokens: int = 512,
) -> np.ndarray:
    """Encode email body text into a float32 semantic embedding vector.

    Security rationale: Semantic embeddings capture meaning beyond surface
    vocabulary, so the signal does not depend on specific keywords being
    present in the email.

    Args:
        text: Email body text. Only the first ``max_tokens``
            whitespace-separated words are passed to the model.
        model: Loaded model object exposing ``encode``.
        max_tokens: Word-count threshold above which the text is truncated.
            This is a word count, used as a rough stand-in for tokens.

    Returns:
        The model's embedding cast to float32. A zero vector of length 384 is
        returned when the text is empty/whitespace-only or when encoding
        fails (the failure is logged as a warning).
    """
    # Nothing to encode: skip the model call entirely.
    if not (text and text.strip()):
        return np.zeros(384, dtype=np.float32)

    # Word-level truncation. Text within the limit is passed through
    # unchanged, with its original whitespace.
    tokens = text.split()
    clipped = " ".join(tokens[:max_tokens]) if len(tokens) > max_tokens else text

    encode_options = {
        "convert_to_numpy": True,
        "show_progress_bar": False,
        "batch_size": 256,
        "device": _DEVICE,
    }
    try:
        vector = model.encode(clipped, **encode_options)
        result = vector.astype(np.float32)
    except Exception as exc:
        # Best-effort: one bad email must not abort feature extraction.
        log.warning(f"Embedding encode error: {exc}")
        result = np.zeros(384, dtype=np.float32)
    return result