# PhishSentinel/src/features/header_features.py
# github-actions[bot]
# Deploy to HF Spaces (ci)
# 0fd143d
"""
PhishLens Header Forensics Feature Module.
Extracts 12 security-critical features from email headers.
All feature extraction is wrapped in try/except blocks to guarantee
no single malformed header crashes the pipeline.
Security rationale: Email headers contain the digital fingerprints of
message routing. Phishing campaigns consistently show specific header
anomalies: spoofed sender domains, reply-to hijacking, failed SPF/DKIM/DMARC,
and unusual relay chains. These 12 features capture the most reliable signals.
"""
from __future__ import annotations
import re
from typing import Dict, List, Optional, Tuple
import dns.resolver
import dns.exception
import tldextract
from src.utils.config import DEFAULT_CONFIG, FREEMAIL_DOMAINS, SUSPICIOUS_XMAILER_PATTERNS
from src.utils.logger import get_logger
log = get_logger(__name__)
# Control characters that must never appear inside a single header value.
# CRLF injection in headers is an email security attack vector.
_HEADER_INJECTION_RE = re.compile(r"[\r\n\x00]")

# Dotted-quad IPv4 literal in a Received header. The repeated octet group is
# non-capturing so that findall() returns whole addresses instead of only the
# text of the last "<octet>." repetition.
_IP_RE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")

# Numeric UTC offset (group 1, e.g. "+0530") followed by an optional zone
# abbreviation (group 2, e.g. "PST"), as written in Date / Received headers.
_TZ_RE = re.compile(r"([+-]\d{4})\s*\(?([A-Z]{2,5})?\)?")

# Known bulk-sender X-Mailer fingerprints (only flag confirmed bulk-mailers).
# An empty alternation would compile to a pattern that matches every string,
# so fall back to the never-matching "(?!)" when no fingerprint is configured.
_BULK_MAILER_RE = re.compile(
    "|".join(re.escape(p) for p in SUSPICIOUS_XMAILER_PATTERNS if p) or r"(?!)",
    re.IGNORECASE,
)

# Inline Authentication-Results result-to-score mapping:
# 1 = pass, 0 = neutral/softfail, -1 = fail, absent, or evaluation error.
_AUTH_RESULT_SCORES: Dict[str, int] = {
    "pass": 1,
    "softfail": 0,
    "neutral": 0,
    "none": -1,
    "fail": -1,
    "temperror": -1,
    "permerror": -1,
}
def extract_header_features(parsed_email: Dict, use_network: bool = True) -> Dict:
    """Extract header forensics features from a parsed email dict.

    Every feature is computed inside its own try/except so that one malformed
    header only costs that single feature (which keeps its default) instead of
    aborting the whole extraction.

    Args:
        parsed_email: Dict returned by eml_parser.parse_eml_bytes().
        use_network: When False, DNS lookups (SPF/DKIM/DMARC) are skipped.
            If the message carries no inline Authentication-Results /
            Received-SPF data either, those three features keep their
            default value of 0. Set False during offline training to avoid
            blocking on DNS timeouts (~300-400 ms/email).

    Returns:
        Dict with header features, all with numeric or boolean values.
        Falls back to safe defaults on any extraction failure.
    """
    features: Dict = _default_header_features()

    try:
        features["from_reply_to_mismatch"] = _check_from_reply_mismatch(
            parsed_email.get("from_address", ""),
            parsed_email.get("reply_to", ""),
        )
    except Exception as exc:
        log.debug(f"from_reply_to_mismatch error: {exc}")

    try:
        features["from_return_path_mismatch"] = _check_from_return_path_mismatch(
            parsed_email.get("from_address", ""),
            parsed_email.get("return_path", ""),
        )
    except Exception as exc:
        log.debug(f"from_return_path_mismatch error: {exc}")

    try:
        # reply_to freemail = phisher redirecting replies to attacker mailbox
        features["reply_to_freemail"] = int(
            _reply_to_is_freemail(parsed_email.get("reply_to", ""))
        )
    except Exception as exc:
        log.debug(f"reply_to_freemail error: {exc}")

    try:
        received = parsed_email.get("received_headers", [])
        features["received_hop_count"] = len(received)
        features["received_geo_anomaly"] = int(_check_geo_anomaly(received))
    except Exception as exc:
        log.debug(f"received_headers error: {exc}")

    try:
        auth_header = parsed_email.get("auth_results", "")
        received_spf = parsed_email.get("received_spf_header", "")
        if auth_header or received_spf:
            # Priority 1: parse inline Authentication-Results / Received-SPF
            # headers (no network call needed).
            spf, dkim, dmarc = _parse_auth_results_header(auth_header, received_spf)
            if spf is not None:
                features["spf_result"] = spf
            if dkim is not None:
                features["dkim_result"] = dkim
            if dmarc is not None:
                features["dmarc_result"] = dmarc
            # A DKIM-Signature header being present lifts the DKIM score to
            # at least neutral.
            if parsed_email.get("dkim_signed", 0):
                features["dkim_result"] = max(features["dkim_result"], 0)
        elif use_network:
            # Priority 2: live DNS lookup when no inline results exist and
            # network access is enabled.
            from_domain = _extract_domain(parsed_email.get("from_address", ""))
            if from_domain:
                spf, dkim, dmarc = _check_email_authentication(from_domain)
                features["spf_result"] = spf
                features["dkim_result"] = dkim
                features["dmarc_result"] = dmarc
    except Exception as exc:
        log.debug(f"SPF/DKIM/DMARC check error: {exc}")

    try:
        features["message_id_suspicious"] = int(
            _check_message_id_suspicious(parsed_email.get("message_id", ""))
        )
    except Exception as exc:
        log.debug(f"message_id_suspicious error: {exc}")

    try:
        features["timezone_mismatch"] = int(
            _check_timezone_mismatch(
                parsed_email.get("date", ""),
                parsed_email.get("received_headers", []),
            )
        )
    except Exception as exc:
        log.debug(f"timezone_mismatch error: {exc}")

    try:
        # Only flag confirmed bulk-mailer fingerprints; missing X-Mailer is
        # normal for modern webmail (Gmail, OWA, Apple Mail omit it).
        features["x_mailer_suspicious"] = int(
            _check_xmailer_suspicious(parsed_email.get("x_mailer", ""))
        )
    except Exception as exc:
        log.debug(f"x_mailer_suspicious error: {exc}")

    try:
        # CR / LF / NUL inside the first 2048 characters of header_raw is
        # treated as a header injection attempt.
        # NOTE(review): if header_raw holds the complete raw header block it
        # necessarily contains line breaks and this fires for every message;
        # confirm what the parser stores under "header_raw".
        header_raw = parsed_email.get("header_raw", "")
        features["header_injection_attempt"] = int(
            bool(_HEADER_INJECTION_RE.search(header_raw[:2048]))
        )
    except Exception as exc:
        log.debug(f"header_injection_attempt error: {exc}")

    return features


def _reply_to_is_freemail(reply_to: str) -> bool:
    """Return True when any Reply-To address uses a configured freemail domain.

    The host after each "@" must equal a FREEMAIL_DOMAINS entry exactly
    (case-insensitive). A prefix test would flag "user@mail.company.com"
    for the entry "mail.com", and an empty list must match nothing.
    NOTE(review): assumes FREEMAIL_DOMAINS entries are complete domains,
    optionally written with a leading "@" - confirm against the config.
    """
    if not reply_to:
        return False
    freemail = {entry.split("@")[-1].strip().lower() for entry in FREEMAIL_DOMAINS}
    freemail.discard("")
    hosts = re.findall(r"@([\w.-]+)", reply_to)
    return any(host.rstrip(".").lower() in freemail for host in hosts)
# ---------------------------------------------------------------------------
# Feature implementation functions
# ---------------------------------------------------------------------------
def _check_from_reply_mismatch(from_addr: str, reply_to: str) -> int:
"""Detect mismatch between From and Reply-To registered domains.
Compares registered domains (e.g. paypal.com) not full hostnames so that
newsletters.paypal.com → paypal.com does NOT trigger as a mismatch.
Returns 1 if mismatch detected, 0 if same registered domain or Reply-To absent.
"""
if not reply_to or not from_addr:
return 0
from_domain = _extract_domain(from_addr)
reply_domain = _extract_domain(reply_to)
if not from_domain or not reply_domain:
return 0
try:
from_reg = tldextract.extract(from_domain).registered_domain or from_domain
reply_reg = tldextract.extract(reply_domain).registered_domain or reply_domain
return int(from_reg.lower() != reply_reg.lower())
except Exception:
return int(from_domain.lower() != reply_domain.lower())
def _check_from_return_path_mismatch(from_addr: str, return_path: str) -> int:
"""Detect mismatch between From and Return-Path registered domains.
Compares registered domains so bounce-handling subdomains (e.g.
bounce.paypal.com) do not falsely flag as a mismatch against paypal.com.
"""
if not return_path or not from_addr:
return 0
from_domain = _extract_domain(from_addr)
rp_domain = _extract_domain(return_path)
if not from_domain or not rp_domain:
return 0
try:
from_reg = tldextract.extract(from_domain).registered_domain or from_domain
rp_reg = tldextract.extract(rp_domain).registered_domain or rp_domain
return int(from_reg.lower() != rp_reg.lower())
except Exception:
return int(from_domain.lower() != rp_domain.lower())
def _check_geo_anomaly(received_headers: List[str]) -> bool:
"""Detect geographic anomalies in the Received header relay chain.
Security rationale: A legitimate corporate email typically routes through
a predictable set of servers. A relay chain jumping between 3+ distinct
continents or containing Tor exit nodes signals relay abuse.
"""
if len(received_headers) < 2:
return False
# Extract IPs from Received headers and check for >2 distinct /8 subnets
# as a proxy for geographic diversity (simplified heuristic)
ips = []
for h in received_headers:
matches = _IP_RE.findall(h)
ips.extend(matches)
if len(ips) < 2:
return False
# Check if IPs span more than 2 distinct /16 subnets (class B)
subnets = {".".join(ip.split(".")[:2]) for ip in ips if not ip.startswith("10.")
and not ip.startswith("192.168.") and not ip.startswith("127.")}
return len(subnets) > 2
def _check_email_authentication(domain: str) -> tuple[int, int, int]:
    """Look up the SPF, DKIM and DMARC DNS records of the sender domain.

    Security rationale: SPF, DKIM and DMARC are the three email
    authentication standards. A domain that publishes none of them gives
    receivers no way to verify its mail, which is a strong phishing signal.

    The three lookups run in SPF, DKIM, DMARC order via _check_spf,
    _check_dkim and _check_dmarc.

    Args:
        domain: Sender domain to query.

    Returns:
        Tuple of (spf_result, dkim_result, dmarc_result), where
        -1 = fail/not found, 0 = neutral/softfail, 1 = pass.
    """
    return _check_spf(domain), _check_dkim(domain), _check_dmarc(domain)
def _check_spf(domain: str) -> int:
    """Query the domain's TXT records and score its published SPF policy.

    Only a record whose first term is the version tag "v=spf1" counts as an
    SPF record; a TXT record that merely mentions that text elsewhere is
    ignored. Terms are compared case-insensitively.

    Args:
        domain: Domain whose TXT records are queried (3 second lifetime).

    Returns:
        1 when an SPF record exists and does not end in a soft/neutral "all",
        0 when it contains "~all" or "?all",
        -1 when no SPF record exists or the lookup fails for any reason.
    """
    try:
        answers = dns.resolver.resolve(domain, "TXT", lifetime=3.0)
        for rdata in answers:
            # A TXT record may be split into several character-strings.
            record = "".join(
                chunk.decode("utf-8", errors="ignore") for chunk in rdata.strings
            )
            terms = record.lower().split()
            if not terms or terms[0] != "v=spf1":
                continue
            if any(term in ("~all", "?all") for term in terms[1:]):
                return 0  # Softfail / neutral
            # NOTE(review): a permissive "+all" record is scored 1 here as
            # well, matching the previous behaviour - confirm this is wanted.
            return 1  # Record exists (including strict "-all")
        return -1  # No SPF record
    except Exception:
        # Best effort: timeouts, NXDOMAIN and malformed answers count as
        # "not found" rather than failing the feature extraction.
        return -1
def _check_dkim(domain: str) -> int:
    """Probe a fixed list of common DKIM selectors for a TXT record.

    Each selector is queried as "<selector>._domainkey.<domain>" with a
    2 second lifetime; a failed lookup just moves on to the next selector.

    Args:
        domain: Sender domain to probe.

    Returns:
        1 as soon as any selector resolves, -1 when none of them does.
    """
    for selector in ("default", "google", "mail", "k1", "s1", "s2"):
        try:
            dns.resolver.resolve(f"{selector}._domainkey.{domain}", "TXT", lifetime=2.0)
        except Exception:
            continue
        return 1  # DKIM record found for at least one common selector
    return -1  # No DKIM record found
def _check_dmarc(domain: str) -> int:
    """Query the _dmarc TXT record and score the domain's DMARC policy.

    The record is split into its "tag=value" pairs and the policy is read
    from the "p" tag only. A substring test for "p=reject" would also match
    the subdomain-policy tag "sp=reject" and misreport a "p=none" domain as
    strictly enforced.

    Args:
        domain: Organisational domain; "_dmarc.<domain>" is queried with a
            3 second lifetime.

    Returns:
        1 for p=reject, 0 for p=quarantine, -1 for p=none, a missing policy,
        no DMARC record, or any lookup failure.
    """
    try:
        answers = dns.resolver.resolve(f"_dmarc.{domain}", "TXT", lifetime=3.0)
        for rdata in answers:
            # A TXT record may be split into several character-strings.
            record = "".join(
                chunk.decode("utf-8", errors="ignore") for chunk in rdata.strings
            ).lower()
            tags = {}
            for part in record.split(";"):
                tag, sep, value = part.partition("=")
                if sep:
                    # Keep the first occurrence of a repeated tag.
                    tags.setdefault(tag.strip(), value.strip())
            if tags.get("v") != "dmarc1":
                continue
            policy = tags.get("p")
            if policy == "reject":
                return 1  # Strong policy
            if policy == "quarantine":
                return 0  # Moderate policy
            return -1  # p=none = no enforcement
        return -1
    except Exception:
        # Best effort: any DNS failure counts as "no DMARC record".
        return -1
def _check_message_id_suspicious(message_id: str) -> bool:
"""Detect suspicious or missing Message-ID headers.
Security rationale: Legitimate mail servers always generate a
Message-ID in the format <random@domain>. A missing, malformed,
or freemail-domain Message-ID indicates relay abuse or spoofing.
"""
if not message_id:
return True # Missing = suspicious
# Check format: should be <something@domain>
if not re.match(r"<[^@]+@[^>]+>", message_id.strip()):
return True # Malformed
# Freemail domain in Message-ID
mid_domain = message_id.split("@")[-1].rstrip(">").lower()
for fm in FREEMAIL_DOMAINS:
if mid_domain.startswith(fm.split("@")[-1]):
return True
return False
def _check_timezone_mismatch(date_header: str, received_headers: List[str]) -> bool:
"""Detect timezone mismatch between Date header and relay chain.
Security rationale: Automated phishing tools often use incorrect
timezones or copy timestamps, producing obvious mismatches.
"""
if not date_header or not received_headers:
return False
try:
date_tz_match = _TZ_RE.search(date_header)
if not date_tz_match:
return False
date_tz = date_tz_match.group(1)
# Check last received header (closest to origin)
last_received = received_headers[-1] if received_headers else ""
recv_tz_match = _TZ_RE.search(last_received)
if not recv_tz_match:
return False
recv_tz = recv_tz_match.group(1)
return date_tz != recv_tz
except Exception:
return False
def _check_xmailer_suspicious(x_mailer: str) -> bool:
"""Detect confirmed bulk-sender X-Mailer fingerprints.
A MISSING X-Mailer is NOT suspicious — modern webmail clients (Gmail,
Outlook Web Access, Apple Mail, Yahoo Mail) do not send X-Mailer at all.
Only flag strings that match known mass-mailing software.
"""
if not x_mailer:
return False # Missing = normal for modern webmail
return bool(_BULK_MAILER_RE.search(x_mailer))
def _extract_domain(address: str) -> Optional[str]:
"""Extract the domain from an email address or bare domain."""
if not address:
return None
# Handle formats: "Name <email@domain.com>", "email@domain.com", "<email@domain>"
match = re.search(r"[\w.+-]+@([\w.-]+\.[a-zA-Z]{2,})", address)
if match:
return match.group(1)
# Bare domain
if re.match(r"^[\w.-]+\.[a-zA-Z]{2,}$", address.strip()):
return address.strip()
return None
def _default_header_features() -> Dict:
"""Return zero-value defaults for all header features."""
return {
"from_reply_to_mismatch": 0,
"from_return_path_mismatch": 0,
"reply_to_freemail": 0,
"received_hop_count": 0,
"received_geo_anomaly": 0,
"spf_result": 0,
"dkim_result": 0,
"dmarc_result": 0,
"message_id_suspicious": 0,
"timezone_mismatch": 0,
"x_mailer_suspicious": 0,
"header_injection_attempt": 0,
}
def _parse_auth_results_header(
auth_results: str, received_spf: str = ""
) -> Tuple[Optional[int], Optional[int], Optional[int]]:
"""Parse SPF/DKIM/DMARC results from inline Authentication-Results / Received-SPF.
Returns (spf, dkim, dmarc) each as 1/0/-1, or None if not found in headers.
This lets us populate auth features without any DNS network call.
"""
spf: Optional[int] = None
dkim: Optional[int] = None
dmarc: Optional[int] = None
text = (auth_results + " " + received_spf).lower()
m = re.search(r"\bspf\s*=\s*(\w+)", text)
if m:
spf = _AUTH_RESULT_SCORES.get(m.group(1))
m = re.search(r"\bdkim\s*=\s*(\w+)", text)
if m:
dkim = _AUTH_RESULT_SCORES.get(m.group(1))
m = re.search(r"\bdmarc\s*=\s*(\w+)", text)
if m:
dmarc = _AUTH_RESULT_SCORES.get(m.group(1))
return spf, dkim, dmarc