File size: 11,836 Bytes
0fd143d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
"""
PhishLens Text & NLP Feature Module.

Extracts TF-IDF sparse features, urgency/social-engineering scores,
semantic embeddings (sentence-transformers), and subject-line features.

Security rationale: Phishing emails are engineered to create fear and urgency.
NLP signals — particularly semantic embeddings from pre-trained transformers —
capture the latent 'threat context' of an email that bag-of-words methods miss.
The 384-dimensional all-MiniLM-L6-v2 embedding is the single highest-impact
feature group, representing deep semantic meaning that cannot be easily evaded
by paraphrasing or synonym substitution.
"""

from __future__ import annotations

import re
from typing import Dict, List, Optional, Tuple

import numpy as np

from src.utils.config import DEFAULT_CONFIG, URGENCY_PHRASES
from src.utils.logger import get_logger

log = get_logger(__name__)

# Sentence-transformers model cache (80MB model — loaded lazily, then reused)
_EMBEDDING_MODEL = None
_DEVICE = "cpu"  # Overwritten with "cuda:0" by get_embedding_model() when a GPU is available


def get_embedding_model(model_name: str = "all-MiniLM-L6-v2"):
    """Load and cache the sentence-transformers embedding model.

    The model is loaded lazily on the first call and stored in the module-level
    ``_EMBEDDING_MODEL`` global, so the (roughly 80MB) model is not reloaded for
    every email during batch processing.

    Device selection: ``torch.cuda.is_available()`` is checked at load time and
    the result is stored in the module-level ``_DEVICE`` ("cuda:0" or "cpu").
    On CUDA the model is converted to fp16.

    Notes:
        * Once a model has been cached, it is returned as-is on later calls,
          even if a different ``model_name`` is passed.
        * If loading fails, the error is logged and ``None`` is returned; the
          cache stays empty, so the next call attempts the load again.

    Args:
        model_name: Hugging Face model identifier.

    Returns:
        SentenceTransformer model instance, or None if loading failed.
    """
    global _EMBEDDING_MODEL, _DEVICE
    if _EMBEDDING_MODEL is not None:
        return _EMBEDDING_MODEL

    try:
        import torch

        use_cuda = torch.cuda.is_available()
        _DEVICE = "cuda:0" if use_cuda else "cpu"
        device_desc = (
            "GPU: " + torch.cuda.get_device_name(0) if use_cuda else "CPU-only build"
        )
        log.info(f"Embedding device: {_DEVICE.upper()} ({device_desc})")

        from sentence_transformers import SentenceTransformer

        log.info(f"Loading sentence-transformer model: {model_name}")
        model = SentenceTransformer(model_name, device=_DEVICE)
        if use_cuda:
            # fp16 lets CUDA Tensor Cores handle the matrix multiplies; the
            # original author reports ~2x throughput vs fp32 with negligible
            # quality loss for this 384-dim model.
            model = model.half()
            log.info("Embedding model converted to fp16 (Tensor Core acceleration).")
        _EMBEDDING_MODEL = model
        log.info("Embedding model loaded successfully.")
    except Exception as exc:
        # Best-effort: callers treat None as "embeddings unavailable".
        log.error(f"Failed to load embedding model: {exc}")
        _EMBEDDING_MODEL = None
    return _EMBEDDING_MODEL


def extract_text_features(
    body_text: str,
    subject: str,
    config=DEFAULT_CONFIG,
    tfidf_vectorizer=None,
    fit_tfidf: bool = False,
    precomputed_embedding: Optional[np.ndarray] = None,
) -> Tuple[np.ndarray, List[str]]:
    """Extract the dense text-based features from an email body and subject.

    The returned vector is laid out as: two urgency features, the subject-line
    features, then the 384 semantic-embedding dimensions.

    Args:
        body_text: Plain text body of the email.
        subject: Email subject line.
        config: Configuration object; this function reads ``urgency_phrases``,
            ``brand_list``, ``embedding_model`` and ``embedding_max_tokens``.
        tfidf_vectorizer: Accepted for interface compatibility; not used by
            this function (TF-IDF is handled by ``extract_tfidf_features``).
        fit_tfidf: Accepted for interface compatibility; not used by this
            function.
        precomputed_embedding: Optional pre-computed embedding for this email.
            Any array-like holding exactly 384 values is accepted (e.g. shape
            ``(384,)`` or a ``(1, 384)`` batch row); it is flattened and used
            directly so the embedding model is not invoked. Inputs of any
            other size are ignored and the embedding is computed instead.

    Returns:
        Tuple of (feature_vector, feature_names), where feature_vector is a
        1-D float32 array and feature_names has one entry per element.
    """
    embedding_dim = 384
    features: List[float] = []
    feature_names: List[str] = []

    # ---- Urgency / Social Engineering Score --------------------------------
    urgency_score, urgency_count = _compute_urgency_score(body_text, config.urgency_phrases)
    features.extend([urgency_score, float(urgency_count)])
    feature_names.extend(["urgency_score_normalised", "urgency_phrase_count"])

    # ---- Subject line features ---------------------------------------------
    subject_feats, subject_names = _extract_subject_features(subject, config.brand_list)
    features.extend(subject_feats)
    feature_names.extend(subject_names)

    # ---- Semantic Embedding (384 dims) -------------------------------------
    # A usable pre-computed embedding (e.g. from a batch cache) is taken as-is,
    # which avoids calling the embedding model for this email.
    embedding = None
    if precomputed_embedding is not None:
        candidate = np.asarray(precomputed_embedding, dtype=np.float32).reshape(-1)
        if candidate.size == embedding_dim:
            embedding = candidate

    if embedding is None:
        model = get_embedding_model(config.embedding_model)
        if model is not None:
            embedding = _compute_embedding(body_text, model, config.embedding_max_tokens)
        else:
            log.warning("Embedding model unavailable — using zeros for embedding features.")
            embedding = np.zeros(embedding_dim, dtype=np.float32)

    features.extend(embedding.tolist())
    feature_names.extend([f"embed_{i}" for i in range(len(embedding))])

    return np.array(features, dtype=np.float32), feature_names


def extract_tfidf_features(
    texts: List[str],
    vectorizer=None,
    config=DEFAULT_CONFIG,
    fit: bool = False,
):
    """Fit or transform texts using a TF-IDF vectorizer.

    A new vectorizer is created and fitted when ``fit`` is True or when no
    vectorizer is supplied; otherwise the supplied vectorizer is only used to
    transform ``texts``.

    Args:
        texts: List of email body texts.
        vectorizer: Fitted TfidfVectorizer or None if fitting from scratch.
        config: Configuration object; this function reads
            ``tfidf_max_features`` and ``tfidf_ngram_range``.
        fit: If True, fits a fresh vectorizer on the provided texts (any
            vectorizer passed in is replaced).

    Returns:
        Tuple of (sparse_matrix, fitted_vectorizer, feature_names), where each
        feature name is the vocabulary term prefixed with ``tfidf_``.
    """
    from sklearn.feature_extraction.text import TfidfVectorizer

    if fit or vectorizer is None:
        # NOTE(review): min_df=2 presumably requires a reasonably sized corpus;
        # confirm callers never fit on a handful of documents.
        vectorizer = TfidfVectorizer(
            max_features=config.tfidf_max_features,
            ngram_range=config.tfidf_ngram_range,
            sublinear_tf=True,          # Log-scaled TF reduces impact of very frequent terms
            strip_accents="unicode",
            decode_error="replace",
            analyzer="word",
            min_df=2,                   # Ignore terms appearing in < 2 docs (noise reduction)
        )
        X = vectorizer.fit_transform(texts)
        # Report the vocabulary size actually learned; it can be smaller than
        # the configured cap once min_df pruning has been applied.
        log.info(
            f"TF-IDF fitted: {X.shape[1]} features "
            f"(max_features={config.tfidf_max_features}), "
            f"ngram_range={config.tfidf_ngram_range}"
        )
    else:
        X = vectorizer.transform(texts)

    feature_names = [f"tfidf_{name}" for name in vectorizer.get_feature_names_out()]
    return X, vectorizer, feature_names


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------


def _compute_urgency_score(text: str, urgency_phrases: List[str]) -> Tuple[float, int]:
    """Score how densely an email body uses urgency/social-engineering phrases.

    Security rationale: Manufactured urgency ("verify now or your account will
    be closed within 24 hours") is a core manipulation technique in phishing.
    Expressing the hit count relative to the body length keeps long emails
    from scoring high merely because they contain more words.

    Matching is a case-insensitive substring test, and each configured phrase
    contributes at most one hit no matter how often it occurs.

    Args:
        text: Email body text.
        urgency_phrases: Urgency phrases to look for.

    Returns:
        Tuple of (normalised_score in 0.0-1.0, raw_count). The score is the
        number of matched phrases per 100 whitespace-separated words, capped
        at 1.0. Empty or missing text yields (0.0, 0).
    """
    if not text:
        return 0.0, 0

    haystack = text.lower()
    hits = 0
    for phrase in urgency_phrases:
        if phrase.lower() in haystack:
            hits += 1

    # Guard against a zero divisor for whitespace-only bodies.
    n_words = max(len(text.split()), 1)
    per_hundred_words = hits / (n_words / 100)
    return min(per_hundred_words, 1.0), hits


def _extract_subject_features(subject: str, brand_list: List[str]) -> Tuple[List[float], List[str]]:
    """Extract features from the email subject line.

    Security rationale: Subject lines are crafted to provoke urgency and
    impersonate brands. All-caps words, excessive punctuation, RE:/FW:
    prefixes, and brand keywords are treated as phishing signals.

    Features, in order:
        subject_length: Number of characters in the subject.
        subject_exclamation_count: Number of "!" characters.
        subject_question_count: Number of "?" characters.
        subject_caps_ratio: Share of whitespace-separated words that are
            longer than one character and fully upper-case.
        subject_spoofed_re_fw: 1.0 if the subject starts with "re:", "fw:" or
            "fwd:" (case-insensitive). Only the prefix is detected here;
            whether it is actually spoofed is not checked.
        subject_brand_keyword: 1.0 if any brand keyword occurs in the subject
            (case-insensitive substring match).
        subject_urgency_keyword: 1.0 if one of a fixed set of urgency words
            occurs in the subject.
        subject_dollar_count: Number of "$" characters.

    Args:
        subject: Email subject string; None is treated as an empty subject.
        brand_list: List of brand keywords to check.

    Returns:
        Tuple of (feature_values, feature_names) with matching order.
    """
    subject = subject or ""
    subj_lower = subject.lower()

    # "URGENT ACTION REQUIRED" pattern: ratio of shouted words.
    words = subject.split()
    caps_words = sum(1 for word in words if len(word) > 1 and word.isupper())
    caps_ratio = caps_words / max(len(words), 1)

    has_reply_prefix = subj_lower.startswith(("re:", "fw:", "fwd:"))

    # Lower-case each brand so mixed-case config entries (e.g. "PayPal") can
    # match the lower-cased subject.
    has_brand = any(brand.lower() in subj_lower for brand in brand_list)

    urgency_keywords = ("urgent", "action required", "verify", "suspended", "alert")
    has_urgency = any(keyword in subj_lower for keyword in urgency_keywords)

    named_features = [
        ("subject_length", float(len(subject))),
        ("subject_exclamation_count", float(subject.count("!"))),
        ("subject_question_count", float(subject.count("?"))),
        ("subject_caps_ratio", caps_ratio),
        ("subject_spoofed_re_fw", float(has_reply_prefix)),
        ("subject_brand_keyword", float(has_brand)),
        ("subject_urgency_keyword", float(has_urgency)),
        # Prize/lottery phishing pattern: count of dollar signs.
        ("subject_dollar_count", float(subject.count("$"))),
    ]

    features = [value for _, value in named_features]
    names = [name for name, _ in named_features]
    return features, names


def _compute_embedding(
    text: str,
    model,
    max_tokens: int = 512,
) -> np.ndarray:
    """Turn an email body into a 384-dimensional semantic embedding.

    Security rationale: Embeddings reflect meaning rather than exact wording,
    so a phishing email rewritten with synonyms is intended to still land near
    other credential-request / urgency / impersonation messages.

    Args:
        text: Email body text. Only the first ``max_tokens`` whitespace-
            separated words are passed to the model.
        model: Loaded SentenceTransformer instance.
        max_tokens: Word-count limit applied before encoding (an approximation
            of the token limit, which the model enforces itself).

    Returns:
        float32 numpy array. A 384-element zero vector is returned for empty
        or whitespace-only text, and when encoding raises an exception.
    """
    dim = 384

    is_blank = not text or not text.strip()
    if is_blank:
        return np.zeros(dim, dtype=np.float32)

    # Word-level truncation; text within the limit is passed through untouched.
    tokens = text.split()
    if len(tokens) > max_tokens:
        text = " ".join(tokens[:max_tokens])

    encode_options = {
        "convert_to_numpy": True,
        "show_progress_bar": False,
        "batch_size": 256,
        "device": _DEVICE,
    }
    try:
        vector = model.encode(text, **encode_options)
        result = vector.astype(np.float32)
    except Exception as exc:
        log.warning(f"Embedding encode error: {exc}")
        result = np.zeros(dim, dtype=np.float32)
    return result