File size: 15,713 Bytes
0fd143d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
"""
PhishLens Header Forensics Feature Module.

Extracts 12 security-critical features from email headers.
All feature extraction is wrapped in try/except blocks to guarantee
no single malformed header crashes the pipeline.

Security rationale: Email headers contain the digital fingerprints of
message routing. Phishing campaigns consistently show specific header
anomalies: spoofed sender domains, reply-to hijacking, failed SPF/DKIM/DMARC,
and unusual relay chains. These 12 features capture the most reliable signals.
"""

from __future__ import annotations

import re
from typing import Dict, List, Optional, Tuple

import dns.resolver
import dns.exception
import tldextract

from src.utils.config import DEFAULT_CONFIG, FREEMAIL_DOMAINS, SUSPICIOUS_XMAILER_PATTERNS
from src.utils.logger import get_logger

log = get_logger(__name__)

# Precompiled header-injection detection pattern.
# Matches a carriage return, line feed, or NUL byte; CRLF injection in
# header values is an email security attack vector.
_HEADER_INJECTION_RE = re.compile(r"[\r\n\x00]")

# Dotted-quad IPv4 address as it appears in a Received header.  Octet ranges
# are not validated (e.g. 999.1.1.1 also matches).
# NOTE: the repeated "(\d{1,3}\.)" group is a *capturing* group, so
# ``findall()`` returns only the last "NNN." fragment rather than the whole
# address; use ``finditer()`` / ``match.group(0)`` to get the full IP.
_IP_RE = re.compile(r"\b(\d{1,3}\.){3}\d{1,3}\b")

# Timezone extraction: group 1 is the numeric UTC offset (e.g. "+0530"),
# group 2 an optional 2-5 letter upper-case zone abbreviation, optionally
# wrapped in parentheses (e.g. "(PST)").
_TZ_RE = re.compile(r"([+-]\d{4})\s*\(?([A-Z]{2,5})?\)?")

# Known bulk-sender X-Mailer fingerprints (only flag confirmed bulk-mailers).
# Built as a case-insensitive alternation of the escaped, non-empty entries
# of SUSPICIOUS_XMAILER_PATTERNS.
# NOTE(review): if every entry is empty the pattern becomes "" and matches
# any string — confirm the config always supplies at least one pattern.
_BULK_MAILER_RE = re.compile(
    "|".join(re.escape(p) for p in SUSPICIOUS_XMAILER_PATTERNS if p),
    re.IGNORECASE,
)

# Maps a lower-cased Authentication-Results result keyword to a feature
# score: 1 = pass, 0 = softfail/neutral, -1 = none/fail/temperror/permerror.
# Keywords not listed here are treated by the parser as "not found".
_AUTH_RESULT_SCORES: Dict[str, int] = {
    "pass": 1,
    "softfail": 0,
    "neutral": 0,
    "none": -1,
    "fail": -1,
    "temperror": -1,
    "permerror": -1,
}


def extract_header_features(parsed_email: Dict, use_network: bool = True) -> Dict:
    """Build the header-forensics feature dict for one parsed email.

    Each feature is computed inside its own ``try`` block, so a failure in
    one extractor is logged at debug level, leaves that feature at its
    default, and does not affect the remaining features.

    Args:
        parsed_email: Dict returned by eml_parser.parse_eml_bytes().
        use_network: When False, no DNS lookups (SPF/DKIM/DMARC) are made.
            The authentication features are then filled only from inline
            Authentication-Results / Received-SPF headers and otherwise keep
            the defaults supplied by ``_default_header_features()``.  Set
            False during offline training to avoid blocking on DNS timeouts.

    Returns:
        Dict with header features, all with numeric values, seeded from
        ``_default_header_features()`` and overwritten where extraction
        succeeded.
    """
    features: Dict = _default_header_features()

    # --- From vs Reply-To registered domain --------------------------------
    try:
        sender = parsed_email.get("from_address", "")
        reply_target = parsed_email.get("reply_to", "")
        features["from_reply_to_mismatch"] = _check_from_reply_mismatch(
            sender, reply_target
        )
    except Exception as exc:
        log.debug(f"from_reply_to_mismatch error: {exc}")

    # --- From vs Return-Path registered domain ------------------------------
    try:
        sender = parsed_email.get("from_address", "")
        bounce_target = parsed_email.get("return_path", "")
        features["from_return_path_mismatch"] = _check_from_return_path_mismatch(
            sender, bounce_target
        )
    except Exception as exc:
        log.debug(f"from_return_path_mismatch error: {exc}")

    # --- Reply-To pointing at a freemail provider ---------------------------
    # A freemail Reply-To means replies are redirected to an attacker mailbox.
    try:
        reply_to = parsed_email.get("reply_to", "")
        freemail_alternatives = "|".join(
            re.escape(entry.split("@")[-1]) for entry in FREEMAIL_DOMAINS
        )
        freemail_pattern = r"@(" + freemail_alternatives + r")"
        freemail_hit = re.search(freemail_pattern, reply_to, re.IGNORECASE)
        features["reply_to_freemail"] = int(bool(freemail_hit))
    except Exception as exc:
        log.debug(f"reply_to_freemail error: {exc}")

    # --- Relay chain: hop count and subnet-diversity heuristic --------------
    try:
        hops = parsed_email.get("received_headers", [])
        features["received_hop_count"] = len(hops)
        features["received_geo_anomaly"] = int(_check_geo_anomaly(hops))
    except Exception as exc:
        log.debug(f"received_headers error: {exc}")

    # --- SPF / DKIM / DMARC -------------------------------------------------
    try:
        inline_auth = parsed_email.get("auth_results", "")
        inline_spf = parsed_email.get("received_spf_header", "")
        has_inline_results = bool(inline_auth or inline_spf)
        if has_inline_results:
            # Priority 1: inline Authentication-Results / Received-SPF
            # headers, which need no network call.
            spf_score, dkim_score, dmarc_score = _parse_auth_results_header(
                inline_auth, inline_spf
            )
            inline_scores = (
                ("spf_result", spf_score),
                ("dkim_result", dkim_score),
                ("dmarc_result", dmarc_score),
            )
            for feature_name, score in inline_scores:
                if score is not None:
                    features[feature_name] = score
            # A DKIM-Signature header lifts dkim_result to at least neutral.
            if parsed_email.get("dkim_signed", 0):
                features["dkim_result"] = max(features["dkim_result"], 0)
        elif use_network:
            # Priority 2: live DNS lookup when there are no inline results.
            from_domain = _extract_domain(parsed_email.get("from_address", ""))
            if from_domain:
                spf_score, dkim_score, dmarc_score = _check_email_authentication(
                    from_domain
                )
                features.update(
                    spf_result=spf_score,
                    dkim_result=dkim_score,
                    dmarc_result=dmarc_score,
                )
    except Exception as exc:
        log.debug(f"SPF/DKIM/DMARC check error: {exc}")

    # --- Message-ID sanity --------------------------------------------------
    try:
        msg_id = parsed_email.get("message_id", "")
        features["message_id_suspicious"] = int(_check_message_id_suspicious(msg_id))
    except Exception as exc:
        log.debug(f"message_id_suspicious error: {exc}")

    # --- Date header offset vs relay chain offset ---------------------------
    try:
        date_value = parsed_email.get("date", "")
        relay_chain = parsed_email.get("received_headers", [])
        features["timezone_mismatch"] = int(
            _check_timezone_mismatch(date_value, relay_chain)
        )
    except Exception as exc:
        log.debug(f"timezone_mismatch error: {exc}")

    # --- X-Mailer fingerprint -----------------------------------------------
    # Only confirmed bulk-mailer fingerprints are flagged; a missing X-Mailer
    # is normal for modern webmail (Gmail, OWA, Apple Mail omit it).
    try:
        mailer = parsed_email.get("x_mailer", "")
        features["x_mailer_suspicious"] = int(_check_xmailer_suspicious(mailer))
    except Exception as exc:
        log.debug(f"x_mailer_suspicious error: {exc}")

    # --- Header injection ---------------------------------------------------
    # CR/LF/NUL in the first 2048 characters of header_raw is treated as a
    # potential header injection attack.
    try:
        raw_headers = parsed_email.get("header_raw", "")
        injected = _HEADER_INJECTION_RE.search(raw_headers[:2048]) is not None
        features["header_injection_attempt"] = int(injected)
    except Exception as exc:
        log.debug(f"header_injection_attempt error: {exc}")

    return features


# ---------------------------------------------------------------------------
# Feature implementation functions
# ---------------------------------------------------------------------------


def _check_from_reply_mismatch(from_addr: str, reply_to: str) -> int:
    """Detect mismatch between From and Reply-To registered domains.

    Compares registered domains (e.g. paypal.com) not full hostnames so that
    newsletters.paypal.com → paypal.com does NOT trigger as a mismatch.
    Returns 1 if mismatch detected, 0 if same registered domain or Reply-To absent.
    """
    if not reply_to or not from_addr:
        return 0
    from_domain = _extract_domain(from_addr)
    reply_domain = _extract_domain(reply_to)
    if not from_domain or not reply_domain:
        return 0
    try:
        from_reg = tldextract.extract(from_domain).registered_domain or from_domain
        reply_reg = tldextract.extract(reply_domain).registered_domain or reply_domain
        return int(from_reg.lower() != reply_reg.lower())
    except Exception:
        return int(from_domain.lower() != reply_domain.lower())


def _check_from_return_path_mismatch(from_addr: str, return_path: str) -> int:
    """Detect mismatch between From and Return-Path registered domains.

    Compares registered domains so bounce-handling subdomains (e.g.
    bounce.paypal.com) do not falsely flag as a mismatch against paypal.com.
    """
    if not return_path or not from_addr:
        return 0
    from_domain = _extract_domain(from_addr)
    rp_domain = _extract_domain(return_path)
    if not from_domain or not rp_domain:
        return 0
    try:
        from_reg = tldextract.extract(from_domain).registered_domain or from_domain
        rp_reg = tldextract.extract(rp_domain).registered_domain or rp_domain
        return int(from_reg.lower() != rp_reg.lower())
    except Exception:
        return int(from_domain.lower() != rp_domain.lower())


def _check_geo_anomaly(received_headers: List[str]) -> bool:
    """Detect geographic anomalies in the Received header relay chain.

    Security rationale: A legitimate corporate email typically routes through
    a predictable set of servers. A relay chain jumping between 3+ distinct
    continents or containing Tor exit nodes signals relay abuse.
    """
    if len(received_headers) < 2:
        return False
    # Extract IPs from Received headers and check for >2 distinct /8 subnets
    # as a proxy for geographic diversity (simplified heuristic)
    ips = []
    for h in received_headers:
        matches = _IP_RE.findall(h)
        ips.extend(matches)
    if len(ips) < 2:
        return False
    # Check if IPs span more than 2 distinct /16 subnets (class B)
    subnets = {".".join(ip.split(".")[:2]) for ip in ips if not ip.startswith("10.")
               and not ip.startswith("192.168.") and not ip.startswith("127.")}
    return len(subnets) > 2


def _check_email_authentication(domain: str) -> tuple[int, int, int]:
    """Run the SPF, DKIM and DMARC DNS checks for the sender domain.

    Security rationale: SPF/DKIM/DMARC are the three email authentication
    standards.  All three failing simultaneously is the strongest possible
    phishing signal.

    The checks are delegated, in this order, to ``_check_spf``,
    ``_check_dkim`` and ``_check_dmarc``.

    Args:
        domain: Sender domain to query.

    Returns:
        Tuple of (spf_result, dkim_result, dmarc_result).
        -1 = fail/not found, 0 = neutral/softfail, 1 = pass
    """
    return (
        _check_spf(domain),
        _check_dkim(domain),
        _check_dmarc(domain),
    )


def _check_spf(domain: str) -> int:
    """Score the SPF TXT record published for *domain*.

    Only the published policy is inspected; this does not evaluate whether a
    particular sending host is authorised.  SPF terms are matched
    case-insensitively, so "~ALL" is treated the same as "~all".

    Args:
        domain: Domain whose TXT records are queried (3 second lifetime).

    Returns:
        1 if an SPF record exists and does not end in a soft/neutral "all",
        0 if the record contains "~all" or "?all",
        -1 if no SPF record is found or the DNS lookup fails.
    """
    try:
        answers = dns.resolver.resolve(domain, "TXT", lifetime=3.0)
        for rdata in answers:
            # A TXT record may be split into several character-strings;
            # join them and normalise case once for all checks below.
            txt = "".join(
                s.decode("utf-8", errors="ignore") for s in rdata.strings
            ).lower()
            if "v=spf1" not in txt:
                continue
            if "~all" in txt or "?all" in txt:
                return 0    # Softfail / neutral
            return 1        # Record exists (including hard-reject "-all")
        return -1           # No SPF record
    except Exception:
        # Any resolver error (NXDOMAIN, timeout, ...) counts as "not found".
        return -1


def _check_dkim(domain: str) -> int:
    """Probe a fixed list of common DKIM selectors for *domain*.

    For each selector, the TXT record at ``<selector>._domainkey.<domain>``
    is queried (2 second lifetime).  Any error for a selector is ignored and
    the next selector is tried.

    Returns:
        1 as soon as one selector resolves, -1 if none of them do.
    """
    for selector in ("default", "google", "mail", "k1", "s1", "s2"):
        try:
            record_name = f"{selector}._domainkey.{domain}"
            dns.resolver.resolve(record_name, "TXT", lifetime=2.0)
        except Exception:
            continue
        else:
            return 1    # DKIM record found for at least one common selector
    return -1           # No DKIM record found


def _check_dmarc(domain: str) -> int:
    """Score the DMARC policy published at ``_dmarc.<domain>``.

    The policy is read from the record's ``p`` tag specifically.  A plain
    substring search for "p=reject" would also match the subdomain policy
    tag "sp=reject", so the record is split into its ``tag=value`` pairs.

    Args:
        domain: Domain whose DMARC TXT record is queried (3 second lifetime).

    Returns:
        1 for ``p=reject``, 0 for ``p=quarantine``, and -1 for ``p=none``,
        a missing/unknown policy, no DMARC record, or a failed DNS lookup.
    """
    try:
        answers = dns.resolver.resolve(f"_dmarc.{domain}", "TXT", lifetime=3.0)
        for rdata in answers:
            txt = "".join(
                s.decode("utf-8", errors="ignore") for s in rdata.strings
            ).lower()
            if "v=dmarc1" not in txt:
                continue

            # Locate the "p" tag; whitespace around tag names, "=" and
            # values is tolerated.
            policy = None
            for tag in txt.split(";"):
                name, sep, value = tag.partition("=")
                if sep and name.strip() == "p":
                    policy = value.strip()
                    break

            if policy == "reject":
                return 1    # Strong policy
            if policy == "quarantine":
                return 0    # Moderate policy
            return -1       # p=none (or unknown) = no enforcement
        return -1
    except Exception:
        # Any resolver error counts as "no DMARC record".
        return -1


def _check_message_id_suspicious(message_id: str) -> bool:
    """Detect suspicious or missing Message-ID headers.

    Security rationale: Legitimate mail servers always generate a
    Message-ID in the format <random@domain>. A missing, malformed,
    or freemail-domain Message-ID indicates relay abuse or spoofing.
    """
    if not message_id:
        return True     # Missing = suspicious
    # Check format: should be <something@domain>
    if not re.match(r"<[^@]+@[^>]+>", message_id.strip()):
        return True     # Malformed
    # Freemail domain in Message-ID
    mid_domain = message_id.split("@")[-1].rstrip(">").lower()
    for fm in FREEMAIL_DOMAINS:
        if mid_domain.startswith(fm.split("@")[-1]):
            return True
    return False


def _check_timezone_mismatch(date_header: str, received_headers: List[str]) -> bool:
    """Detect timezone mismatch between Date header and relay chain.

    Security rationale: Automated phishing tools often use incorrect
    timezones or copy timestamps, producing obvious mismatches.
    """
    if not date_header or not received_headers:
        return False
    try:
        date_tz_match = _TZ_RE.search(date_header)
        if not date_tz_match:
            return False
        date_tz = date_tz_match.group(1)
        # Check last received header (closest to origin)
        last_received = received_headers[-1] if received_headers else ""
        recv_tz_match = _TZ_RE.search(last_received)
        if not recv_tz_match:
            return False
        recv_tz = recv_tz_match.group(1)
        return date_tz != recv_tz
    except Exception:
        return False


def _check_xmailer_suspicious(x_mailer: str) -> bool:
    """Detect confirmed bulk-sender X-Mailer fingerprints.

    A MISSING X-Mailer is NOT suspicious — modern webmail clients (Gmail,
    Outlook Web Access, Apple Mail, Yahoo Mail) do not send X-Mailer at all.
    Only flag strings that match known mass-mailing software.
    """
    if not x_mailer:
        return False  # Missing = normal for modern webmail
    return bool(_BULK_MAILER_RE.search(x_mailer))


def _extract_domain(address: str) -> Optional[str]:
    """Extract the domain from an email address or bare domain."""
    if not address:
        return None
    # Handle formats: "Name <email@domain.com>", "email@domain.com", "<email@domain>"
    match = re.search(r"[\w.+-]+@([\w.-]+\.[a-zA-Z]{2,})", address)
    if match:
        return match.group(1)
    # Bare domain
    if re.match(r"^[\w.-]+\.[a-zA-Z]{2,}$", address.strip()):
        return address.strip()
    return None


def _default_header_features() -> Dict:
    """Return zero-value defaults for all header features."""
    return {
        "from_reply_to_mismatch": 0,
        "from_return_path_mismatch": 0,
        "reply_to_freemail": 0,
        "received_hop_count": 0,
        "received_geo_anomaly": 0,
        "spf_result": 0,
        "dkim_result": 0,
        "dmarc_result": 0,
        "message_id_suspicious": 0,
        "timezone_mismatch": 0,
        "x_mailer_suspicious": 0,
        "header_injection_attempt": 0,
    }


def _parse_auth_results_header(
    auth_results: str, received_spf: str = ""
) -> Tuple[Optional[int], Optional[int], Optional[int]]:
    """Parse SPF/DKIM/DMARC results from inline Authentication-Results / Received-SPF.

    Returns (spf, dkim, dmarc) each as 1/0/-1, or None if not found in headers.
    This lets us populate auth features without any DNS network call.
    """
    spf: Optional[int] = None
    dkim: Optional[int] = None
    dmarc: Optional[int] = None
    text = (auth_results + " " + received_spf).lower()
    m = re.search(r"\bspf\s*=\s*(\w+)", text)
    if m:
        spf = _AUTH_RESULT_SCORES.get(m.group(1))
    m = re.search(r"\bdkim\s*=\s*(\w+)", text)
    if m:
        dkim = _AUTH_RESULT_SCORES.get(m.group(1))
    m = re.search(r"\bdmarc\s*=\s*(\w+)", text)
    if m:
        dmarc = _AUTH_RESULT_SCORES.get(m.group(1))
    return spf, dkim, dmarc