"""
PhishLens Header Forensics Feature Module.

Extracts 12 security-critical features from email headers.
All feature extraction is wrapped in try/except blocks to guarantee
no single malformed header crashes the pipeline.

Security rationale:
    Email headers contain the digital fingerprints of message routing.
    Phishing campaigns consistently show specific header anomalies:
    spoofed sender domains, reply-to hijacking, failed SPF/DKIM/DMARC,
    and unusual relay chains. These 12 features capture the most
    reliable signals.
"""

from __future__ import annotations

import re
from typing import Dict, List, Optional, Tuple

import dns.resolver
import dns.exception
import tldextract

from src.utils.config import (
    DEFAULT_CONFIG,
    FREEMAIL_DOMAINS,
    SUSPICIOUS_XMAILER_PATTERNS,
)
from src.utils.logger import get_logger

log = get_logger(__name__)

# Precompiled header injection detection pattern.
# CRLF injection in headers is an email security attack vector.
_HEADER_INJECTION_RE = re.compile(r"[\r\n\x00]")

# Dotted-quad IPv4 address inside a Received header. The repeated group is
# non-capturing on purpose: with a capturing group, ``findall`` returns only
# the last repetition (e.g. "3.") instead of the whole address.
_IP_RE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")

# Numeric UTC offset (group 1) with an optional zone abbreviation (group 2).
_TZ_RE = re.compile(r"([+-]\d{4})\s*\(?([A-Z]{2,5})?\)?")

# Known bulk-sender X-Mailer fingerprints (only flag confirmed bulk-mailers).
_BULK_MAILER_RE = re.compile(
    "|".join(re.escape(p) for p in SUSPICIOUS_XMAILER_PATTERNS if p),
    re.IGNORECASE,
)

# Freemail names with any leading "local@" part removed; empty entries are
# dropped so they cannot turn into a match-everything alternative.
_FREEMAIL_NAMES: Tuple[str, ...] = tuple(
    name for name in (entry.split("@")[-1] for entry in FREEMAIL_DOMAINS) if name
)

# "@<freemail name>" matcher for Reply-To values, built once at import time.
# None when no freemail names are configured (an empty alternation would
# otherwise match every address containing "@").
_FREEMAIL_REPLY_TO_RE: Optional[re.Pattern] = (
    re.compile(
        r"@(" + "|".join(re.escape(name) for name in _FREEMAIL_NAMES) + r")",
        re.IGNORECASE,
    )
    if _FREEMAIL_NAMES
    else None
)

# Inline Authentication-Results result-to-score mapping.
_AUTH_RESULT_SCORES: Dict[str, int] = {
    "pass": 1,
    "softfail": 0,
    "neutral": 0,
    "none": -1,
    "fail": -1,
    "temperror": -1,
    "permerror": -1,
}


def extract_header_features(parsed_email: Dict, use_network: bool = True) -> Dict:
    """Extract header forensics features from a parsed email dict.

    Args:
        parsed_email: Dict returned by eml_parser.parse_eml_bytes().
        use_network: When False, DNS lookups (SPF/DKIM/DMARC) are skipped.
            If the message also carries no inline authentication headers,
            the three auth features keep their default value of 0.
            Set False during offline training to avoid blocking on DNS
            timeouts (~300-400 ms/email).

    Returns:
        Dict with header features, all with numeric or boolean values.
        Any feature whose extraction raises keeps its default from
        ``_default_header_features()``; the error is logged at debug level.
    """
    features: Dict = _default_header_features()

    try:
        features["from_reply_to_mismatch"] = _check_from_reply_mismatch(
            parsed_email.get("from_address", ""),
            parsed_email.get("reply_to", ""),
        )
    except Exception as exc:
        log.debug(f"from_reply_to_mismatch error: {exc}")

    try:
        features["from_return_path_mismatch"] = _check_from_return_path_mismatch(
            parsed_email.get("from_address", ""),
            parsed_email.get("return_path", ""),
        )
    except Exception as exc:
        log.debug(f"from_return_path_mismatch error: {exc}")

    try:
        # reply_to freemail = phisher redirecting replies to attacker mailbox
        reply_to = parsed_email.get("reply_to", "")
        features["reply_to_freemail"] = int(
            _FREEMAIL_REPLY_TO_RE is not None
            and bool(_FREEMAIL_REPLY_TO_RE.search(reply_to))
        )
    except Exception as exc:
        log.debug(f"reply_to_freemail error: {exc}")

    try:
        received = parsed_email.get("received_headers", [])
        features["received_hop_count"] = len(received)
        features["received_geo_anomaly"] = int(_check_geo_anomaly(received))
    except Exception as exc:
        log.debug(f"received_headers error: {exc}")

    try:
        auth_results = parsed_email.get("auth_results", "")
        received_spf = parsed_email.get("received_spf_header", "")
        if auth_results or received_spf:
            # Priority 1: parse inline Authentication-Results / Received-SPF
            # headers (no network call needed).
            spf, dkim, dmarc = _parse_auth_results_header(auth_results, received_spf)
            if spf is not None:
                features["spf_result"] = spf
            if dkim is not None:
                features["dkim_result"] = dkim
            if dmarc is not None:
                features["dmarc_result"] = dmarc
            # A DKIM-Signature header lifts the DKIM score to at least neutral.
            if parsed_email.get("dkim_signed", 0):
                features["dkim_result"] = max(features["dkim_result"], 0)
        elif use_network:
            # Priority 2: live DNS lookup when no inline results are present
            # and network access is enabled.
            from_domain = _extract_domain(parsed_email.get("from_address", ""))
            if from_domain:
                spf, dkim, dmarc = _check_email_authentication(from_domain)
                features["spf_result"] = spf
                features["dkim_result"] = dkim
                features["dmarc_result"] = dmarc
    except Exception as exc:
        log.debug(f"SPF/DKIM/DMARC check error: {exc}")

    try:
        features["message_id_suspicious"] = int(
            _check_message_id_suspicious(parsed_email.get("message_id", ""))
        )
    except Exception as exc:
        log.debug(f"message_id_suspicious error: {exc}")

    try:
        features["timezone_mismatch"] = int(
            _check_timezone_mismatch(
                parsed_email.get("date", ""),
                parsed_email.get("received_headers", []),
            )
        )
    except Exception as exc:
        log.debug(f"timezone_mismatch error: {exc}")

    try:
        # Only flag confirmed bulk-mailer fingerprints; missing X-Mailer is
        # normal for modern webmail (Gmail, OWA, Apple Mail omit it).
        features["x_mailer_suspicious"] = int(
            _check_xmailer_suspicious(parsed_email.get("x_mailer", ""))
        )
    except Exception as exc:
        log.debug(f"x_mailer_suspicious error: {exc}")

    try:
        # CR, LF or NUL in the first 2048 characters of header_raw is treated
        # as a potential header injection attempt.
        # NOTE(review): if header_raw holds the complete multi-line header
        # block, this fires on every message -- confirm what the parser
        # stores under this key.
        header_raw = parsed_email.get("header_raw", "")
        features["header_injection_attempt"] = int(
            bool(_HEADER_INJECTION_RE.search(header_raw[:2048]))
        )
    except Exception as exc:
        log.debug(f"header_injection_attempt error: {exc}")

    return features


# ---------------------------------------------------------------------------
# Feature implementation functions
# ---------------------------------------------------------------------------


def _registered_domains_differ(first_addr: str, second_addr: str) -> int:
    """Return 1 when two addresses belong to different registered domains.

    Returns 0 when either address is empty or yields no domain. If tldextract
    raises, the full hostnames are compared instead.
    """
    if not first_addr or not second_addr:
        return 0

    first_domain = _extract_domain(first_addr)
    second_domain = _extract_domain(second_addr)
    if not first_domain or not second_domain:
        return 0

    try:
        first_reg = tldextract.extract(first_domain).registered_domain or first_domain
        second_reg = tldextract.extract(second_domain).registered_domain or second_domain
        return int(first_reg.lower() != second_reg.lower())
    except Exception:
        return int(first_domain.lower() != second_domain.lower())


def _check_from_reply_mismatch(from_addr: str, reply_to: str) -> int:
    """Detect mismatch between From and Reply-To registered domains.

    Compares registered domains (e.g. paypal.com) not full hostnames so
    that newsletters.paypal.com -> paypal.com does NOT trigger as a mismatch.

    Returns 1 if mismatch detected, 0 if same registered domain or
    Reply-To absent.
    """
    return _registered_domains_differ(from_addr, reply_to)


def _check_from_return_path_mismatch(from_addr: str, return_path: str) -> int:
    """Detect mismatch between From and Return-Path registered domains.

    Compares registered domains so bounce-handling subdomains
    (e.g. bounce.paypal.com) do not falsely flag as a mismatch against
    paypal.com.

    Returns 1 if mismatch detected, 0 if same registered domain or
    Return-Path absent.
    """
    return _registered_domains_differ(from_addr, return_path)


def _check_geo_anomaly(received_headers: List[str]) -> bool:
    """Detect an unusually diverse relay chain in the Received headers.

    Security rationale: A legitimate corporate email typically routes
    through a predictable set of servers. This is a simplified heuristic:
    the number of distinct /16 prefixes among the IPv4 addresses found in
    the Received headers is used as a proxy for geographic diversity.

    Returns True when more than two distinct /16 prefixes are seen, after
    dropping 10.x, 192.168.x and 127.x addresses. Returns False when there
    are fewer than two headers or fewer than two addresses.
    """
    if len(received_headers) < 2:
        return False

    ips: List[str] = []
    for header in received_headers:
        ips.extend(_IP_RE.findall(header))
    if len(ips) < 2:
        return False

    # NOTE(review): 172.16.0.0/12 is not excluded here -- confirm whether
    # that is intentional.
    internal_prefixes = ("10.", "192.168.", "127.")
    subnets = {
        ".".join(ip.split(".")[:2])
        for ip in ips
        if not ip.startswith(internal_prefixes)
    }
    return len(subnets) > 2


def _check_email_authentication(domain: str) -> Tuple[int, int, int]:
    """Check SPF, DKIM, and DMARC DNS records for the sender domain.

    Security rationale: SPF/DKIM/DMARC are the three email authentication
    standards. All three failing simultaneously is a strong phishing
    signal. These lookups inspect what the domain publishes in DNS; they
    do not verify the individual message.

    Returns:
        Tuple of (spf_result, dkim_result, dmarc_result).
        -1 = fail/not found, 0 = neutral/softfail, 1 = pass
    """
    spf_result = _check_spf(domain)
    dkim_result = _check_dkim(domain)
    dmarc_result = _check_dmarc(domain)
    return spf_result, dkim_result, dmarc_result


def _check_spf(domain: str) -> int:
    """Query the domain's SPF TXT record.

    Returns:
        1 if an SPF record exists and does not end in a soft qualifier,
        0 if the record uses ``~all`` (softfail) or ``?all`` (neutral),
        -1 if no SPF record is found or the DNS query fails.
    """
    try:
        answers = dns.resolver.resolve(domain, "TXT", lifetime=3.0)
        for rdata in answers:
            # Lowercase once so both the version tag and the "all"
            # mechanism are matched case-insensitively.
            txt = "".join(
                s.decode("utf-8", errors="ignore") for s in rdata.strings
            ).lower()
            if "v=spf1" not in txt:
                continue
            if "~all" in txt or "?all" in txt:
                return 0  # Softfail / neutral
            return 1  # Record exists (hard "-all" or any other form)
        return -1  # No SPF record
    except Exception:
        # Best-effort lookup: any DNS or decoding failure counts as not found.
        return -1


def _check_dkim(domain: str) -> int:
    """Query common DKIM selector TXT records.

    Only a fixed list of common selectors is probed; a domain that signs
    with a different selector is reported as not found.

    Returns:
        1 if a TXT record resolves for at least one selector, otherwise -1.
    """
    common_selectors = ["default", "google", "mail", "k1", "s1", "s2"]
    for selector in common_selectors:
        try:
            dkim_domain = f"{selector}._domainkey.{domain}"
            dns.resolver.resolve(dkim_domain, "TXT", lifetime=2.0)
            return 1  # DKIM record found for at least one common selector
        except Exception:
            continue
    return -1  # No DKIM record found


def _check_dmarc(domain: str) -> int:
    """Query the domain's DMARC TXT record.

    Returns:
        1 for ``p=reject``, 0 for ``p=quarantine``, and -1 for ``p=none``,
        a missing policy tag, a missing record, or a failed DNS query.
    """
    try:
        answers = dns.resolver.resolve(f"_dmarc.{domain}", "TXT", lifetime=3.0)
        for rdata in answers:
            txt = "".join(
                s.decode("utf-8", errors="ignore") for s in rdata.strings
            ).lower()
            if "v=dmarc1" not in txt:
                continue
            # Match the "p" tag at the start or after a ";" so the
            # subdomain tag "sp=" is not mistaken for the domain policy.
            policy = re.search(r"(?:^|;)\s*p\s*=\s*(\w+)", txt)
            if policy:
                if policy.group(1) == "reject":
                    return 1  # Strong policy
                if policy.group(1) == "quarantine":
                    return 0  # Moderate policy
            return -1  # p=none = no enforcement
        return -1
    except Exception:
        return -1


def _check_message_id_suspicious(message_id: str) -> bool:
    """Detect suspicious or missing Message-ID headers.

    Security rationale: Legitimate mail servers generate a Message-ID in
    the format <unique-id@domain>. A missing, malformed, or freemail-domain
    Message-ID is treated as a sign of relay abuse or spoofing.
    """
    if not message_id:
        return True  # Missing = suspicious

    # Check format: should be <local-part@domain>
    if not re.match(r"<[^@]+@[^>]+>", message_id.strip()):
        return True  # Malformed

    # Freemail domain in Message-ID (prefix match against the configured
    # freemail names; empty names were filtered out at import time).
    mid_domain = message_id.split("@")[-1].rstrip(">").lower()
    return mid_domain.startswith(_FREEMAIL_NAMES)


def _check_timezone_mismatch(date_header: str, received_headers: List[str]) -> bool:
    """Detect timezone mismatch between Date header and relay chain.

    Security rationale: Automated phishing tools often use incorrect
    timezones or copy timestamps, producing obvious mismatches.

    Compares the numeric UTC offset of the Date header with that of the
    last entry in ``received_headers``. Returns False when either offset
    cannot be found.
    """
    if not date_header or not received_headers:
        return False
    try:
        date_tz_match = _TZ_RE.search(date_header)
        if not date_tz_match:
            return False
        date_tz = date_tz_match.group(1)

        # Check last received header (closest to origin). The timestamp
        # follows the final ";", so only that tail is searched; otherwise a
        # queue id such as "id 1AbC-123456" could be read as an offset.
        origin_stamp = received_headers[-1].rpartition(";")[2]
        recv_tz_match = _TZ_RE.search(origin_stamp)
        if not recv_tz_match:
            return False
        recv_tz = recv_tz_match.group(1)

        return date_tz != recv_tz
    except Exception:
        return False


def _check_xmailer_suspicious(x_mailer: str) -> bool:
    """Detect confirmed bulk-sender X-Mailer fingerprints.

    A MISSING X-Mailer is NOT suspicious -- modern webmail clients (Gmail,
    Outlook Web Access, Apple Mail, Yahoo Mail) do not send X-Mailer at all.
    Only flag strings that match known mass-mailing software.
    """
    if not x_mailer:
        return False  # Missing = normal for modern webmail
    if not _BULK_MAILER_RE.pattern:
        # No fingerprints configured: an empty pattern would match anything.
        return False
    return bool(_BULK_MAILER_RE.search(x_mailer))


def _extract_domain(address: str) -> Optional[str]:
    """Extract the domain from an email address or bare domain.

    Returns None when the input is empty or contains neither an address
    nor a bare domain.
    """
    if not address:
        return None

    # Handle formats: "Name <email@domain.com>", "email@domain.com",
    # "<email@domain.com>"
    match = re.search(r"[\w.+-]+@([\w.-]+\.[a-zA-Z]{2,})", address)
    if match:
        return match.group(1)

    # Bare domain
    bare = address.strip()
    if re.match(r"^[\w.-]+\.[a-zA-Z]{2,}$", bare):
        return bare
    return None


def _default_header_features() -> Dict:
    """Return zero-value defaults for all header features."""
    return {
        "from_reply_to_mismatch": 0,
        "from_return_path_mismatch": 0,
        "reply_to_freemail": 0,
        "received_hop_count": 0,
        "received_geo_anomaly": 0,
        "spf_result": 0,
        "dkim_result": 0,
        "dmarc_result": 0,
        "message_id_suspicious": 0,
        "timezone_mismatch": 0,
        "x_mailer_suspicious": 0,
        "header_injection_attempt": 0,
    }


def _parse_auth_results_header(
    auth_results: str, received_spf: str = ""
) -> Tuple[Optional[int], Optional[int], Optional[int]]:
    """Parse SPF/DKIM/DMARC results from inline Authentication-Results / Received-SPF.

    Returns (spf, dkim, dmarc) each as 1/0/-1, or None if not found in
    headers (or if the result keyword is not in ``_AUTH_RESULT_SCORES``).
    This lets us populate auth features without any DNS network call.
    """
    spf: Optional[int] = None
    dkim: Optional[int] = None
    dmarc: Optional[int] = None

    ar_text = (auth_results or "").lower()
    rspf_text = (received_spf or "").lower()
    text = ar_text + " " + rspf_text

    m = re.search(r"\bspf\s*=\s*(\w+)", text)
    if m:
        spf = _AUTH_RESULT_SCORES.get(m.group(1))
    if spf is None:
        # A Received-SPF value starts with the bare result keyword
        # ("pass (...) client-ip=..."), with no "spf=" prefix to match.
        m = re.match(r"\s*(?:received-spf\s*:\s*)?(\w+)", rspf_text)
        if m:
            spf = _AUTH_RESULT_SCORES.get(m.group(1))

    m = re.search(r"\bdkim\s*=\s*(\w+)", text)
    if m:
        dkim = _AUTH_RESULT_SCORES.get(m.group(1))

    m = re.search(r"\bdmarc\s*=\s*(\w+)", text)
    if m:
        dmarc = _AUTH_RESULT_SCORES.get(m.group(1))

    return spf, dkim, dmarc