Spaces:
Sleeping
Sleeping
File size: 15,713 Bytes
0fd143d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 | """
PhishLens Header Forensics Feature Module.
Extracts 12 security-critical features from email headers.
All feature extraction is wrapped in try/except blocks to guarantee
no single malformed header crashes the pipeline.
Security rationale: Email headers contain the digital fingerprints of
message routing. Phishing campaigns consistently show specific header
anomalies: spoofed sender domains, reply-to hijacking, failed SPF/DKIM/DMARC,
and unusual relay chains. These 12 features capture the most reliable signals.
"""
from __future__ import annotations
import re
from typing import Dict, List, Optional, Tuple
import dns.resolver
import dns.exception
import tldextract
from src.utils.config import DEFAULT_CONFIG, FREEMAIL_DOMAINS, SUSPICIOUS_XMAILER_PATTERNS
from src.utils.logger import get_logger
log = get_logger(__name__)
# Precompile header injection detection pattern
# CRLF injection in headers is an email security attack vector
_HEADER_INJECTION_RE = re.compile(r"[\r\n\x00]")
# IP in Received header
_IP_RE = re.compile(r"\b(\d{1,3}\.){3}\d{1,3}\b")
# Country/timezone extraction from Received header
_TZ_RE = re.compile(r"([+-]\d{4})\s*\(?([A-Z]{2,5})?\)?")
# Known bulk-sender X-Mailer fingerprints (only flag confirmed bulk-mailers).
# If the configured list is empty (or holds only blank entries) the joined
# alternation would be "", which matches every string; fall back to "(?!)",
# a pattern that can never match, so an empty config flags nothing.
_BULK_MAILER_RE = re.compile(
    "|".join(re.escape(p) for p in SUSPICIOUS_XMAILER_PATTERNS if p) or r"(?!)",
    re.IGNORECASE,
)
# Inline Authentication-Results result-to-score mapping
_AUTH_RESULT_SCORES: Dict[str, int] = {
"pass": 1,
"softfail": 0,
"neutral": 0,
"none": -1,
"fail": -1,
"temperror": -1,
"permerror": -1,
}
def extract_header_features(parsed_email: Dict, use_network: bool = True) -> Dict:
    """Extract header forensics features from a parsed email dict.

    Each feature is computed inside its own try/except so that one failing
    extractor only leaves that feature at its default; the error is logged
    at debug level and the remaining features are still computed.

    Args:
        parsed_email: Dict returned by eml_parser.parse_eml_bytes(). The keys
            read here are from_address, reply_to, return_path,
            received_headers, auth_results, received_spf_header, dkim_signed,
            message_id, date, x_mailer and header_raw.
        use_network: When False, DNS lookups (SPF/DKIM/DMARC) are skipped.
            The auth features then come only from inline
            Authentication-Results / Received-SPF text; if that is absent
            they keep the value supplied by _default_header_features().
            Set False during offline training to avoid blocking on DNS
            timeouts (~300-400 ms/email).

    Returns:
        Dict with header features, all with numeric or boolean values.
        Falls back to the defaults on any extraction failure.
    """
    features: Dict = _default_header_features()
    try:
        features["from_reply_to_mismatch"] = _check_from_reply_mismatch(
            parsed_email.get("from_address", ""),
            parsed_email.get("reply_to", ""),
        )
    except Exception as exc:
        log.debug(f"from_reply_to_mismatch error: {exc}")
    try:
        features["from_return_path_mismatch"] = _check_from_return_path_mismatch(
            parsed_email.get("from_address", ""),
            parsed_email.get("return_path", ""),
        )
    except Exception as exc:
        log.debug(f"from_return_path_mismatch error: {exc}")
    try:
        # reply_to freemail = phisher redirecting replies to attacker mailbox
        reply_to = parsed_email.get("reply_to", "")
        # Entries may be written as "gmail.com" or "@gmail.com"; keep the
        # host part and drop blanks so the alternation is never empty (an
        # empty group would match any "@").
        freemail_hosts = [
            host
            for host in (d.split("@")[-1] for d in FREEMAIL_DOMAINS if d)
            if host
        ]
        if freemail_hosts:
            # The trailing lookahead rejects look-alike hosts that merely
            # start with a freemail name (e.g. "@gmail.com.evil.net" or
            # "@gmail.community") while still allowing ">" or a final ".".
            freemail_pattern = (
                r"@(?:"
                + "|".join(re.escape(h) for h in freemail_hosts)
                + r")(?![\w-]|\.\w)"
            )
            features["reply_to_freemail"] = int(
                bool(re.search(freemail_pattern, reply_to, re.IGNORECASE))
            )
    except Exception as exc:
        log.debug(f"reply_to_freemail error: {exc}")
    try:
        received = parsed_email.get("received_headers", [])
        features["received_hop_count"] = len(received)
        features["received_geo_anomaly"] = int(_check_geo_anomaly(received))
    except Exception as exc:
        log.debug(f"received_headers error: {exc}")
    # Priority 1: parse inline Authentication-Results / Received-SPF headers
    # (available on almost all emails without any network call)
    try:
        _ar = parsed_email.get("auth_results", "")
        _rspf = parsed_email.get("received_spf_header", "")
        if _ar or _rspf:
            _spf, _dkim, _dmarc = _parse_auth_results_header(_ar, _rspf)
            # None means "not reported inline"; keep the default in that case.
            if _spf is not None:
                features["spf_result"] = _spf
            if _dkim is not None:
                features["dkim_result"] = _dkim
            if _dmarc is not None:
                features["dmarc_result"] = _dmarc
            # If DKIM-Signature header present, override with its presence
            if parsed_email.get("dkim_signed", 0):
                features["dkim_result"] = max(features["dkim_result"], 0)  # at least neutral
        # Priority 2: live DNS lookup when no inline results and network enabled
        elif use_network:
            from_domain = _extract_domain(parsed_email.get("from_address", ""))
            if from_domain:
                spf, dkim, dmarc = _check_email_authentication(from_domain)
                features["spf_result"] = spf
                features["dkim_result"] = dkim
                features["dmarc_result"] = dmarc
    except Exception as exc:
        log.debug(f"SPF/DKIM/DMARC check error: {exc}")
    try:
        features["message_id_suspicious"] = int(
            _check_message_id_suspicious(parsed_email.get("message_id", ""))
        )
    except Exception as exc:
        log.debug(f"message_id_suspicious error: {exc}")
    try:
        features["timezone_mismatch"] = int(
            _check_timezone_mismatch(
                parsed_email.get("date", ""),
                parsed_email.get("received_headers", []),
            )
        )
    except Exception as exc:
        log.debug(f"timezone_mismatch error: {exc}")
    try:
        # Only flag confirmed bulk-mailer fingerprints; missing X-Mailer is
        # normal for modern webmail (Gmail, OWA, Apple Mail omit it).
        features["x_mailer_suspicious"] = int(
            _check_xmailer_suspicious(parsed_email.get("x_mailer", ""))
        )
    except Exception as exc:
        log.debug(f"x_mailer_suspicious error: {exc}")
    try:
        # CRLF injection in any header value = potential header injection attack
        # NOTE(review): only the first 2048 characters are scanned, and if
        # header_raw holds the raw multi-line header block its ordinary line
        # terminators will trigger this on every message - confirm what the
        # parser stores under this key.
        header_raw = parsed_email.get("header_raw", "")
        features["header_injection_attempt"] = int(
            bool(_HEADER_INJECTION_RE.search(header_raw[:2048]))
        )
    except Exception as exc:
        log.debug(f"header_injection_attempt error: {exc}")
    return features
# ---------------------------------------------------------------------------
# Feature implementation functions
# ---------------------------------------------------------------------------
def _check_from_reply_mismatch(from_addr: str, reply_to: str) -> int:
    """Flag a From / Reply-To pair whose registered domains differ.

    The comparison is made on the registered domain reported by tldextract
    (e.g. paypal.com), not the full hostname, so newsletters.paypal.com and
    paypal.com count as the same sender. When tldextract yields no registered
    domain, or raises, the extracted hostnames are compared instead.

    Returns:
        1 when the two domains differ (case-insensitively); 0 when they are
        equal, when either argument is empty, or when no domain can be
        extracted from either one.
    """
    if not (reply_to and from_addr):
        return 0
    sender_host = _extract_domain(from_addr)
    reply_host = _extract_domain(reply_to)
    if not (sender_host and reply_host):
        return 0
    try:
        sender_reg = tldextract.extract(sender_host).registered_domain or sender_host
        reply_reg = tldextract.extract(reply_host).registered_domain or reply_host
        same_owner = sender_reg.lower() == reply_reg.lower()
        return 0 if same_owner else 1
    except Exception:
        # Fall back to a plain hostname comparison
        same_host = sender_host.lower() == reply_host.lower()
        return 0 if same_host else 1
def _check_from_return_path_mismatch(from_addr: str, return_path: str) -> int:
    """Flag a From / Return-Path pair whose registered domains differ.

    Registered domains are compared so that a bounce-handling subdomain such
    as bounce.paypal.com is not reported as a mismatch against paypal.com.
    When tldextract yields no registered domain, or raises, the extracted
    hostnames are compared instead.

    Returns:
        1 when the two domains differ (case-insensitively); 0 when they are
        equal, when either argument is empty, or when no domain can be
        extracted from either one.
    """
    if not (return_path and from_addr):
        return 0
    sender_host = _extract_domain(from_addr)
    bounce_host = _extract_domain(return_path)
    if not (sender_host and bounce_host):
        return 0
    try:
        sender_reg = tldextract.extract(sender_host).registered_domain or sender_host
        bounce_reg = tldextract.extract(bounce_host).registered_domain or bounce_host
        same_owner = sender_reg.lower() == bounce_reg.lower()
        return 0 if same_owner else 1
    except Exception:
        # Fall back to a plain hostname comparison
        same_host = sender_host.lower() == bounce_host.lower()
        return 0 if same_host else 1
def _check_geo_anomaly(received_headers: List[str]) -> bool:
    """Detect an unusually scattered relay chain in the Received headers.

    This is a simplified heuristic, not a real geolocation lookup: it collects
    every IPv4 address found in the Received headers, ignores loopback and
    RFC 1918 private addresses, and reports an anomaly when the remaining
    addresses fall into more than two distinct /16 networks (first two
    octets).

    Args:
        received_headers: Received header values, one string per hop.

    Returns:
        True when more than two distinct public /16 prefixes are seen;
        False otherwise, including when there are fewer than two headers or
        fewer than two addresses in total.
    """
    if len(received_headers) < 2:
        return False
    # Use finditer + group(0) so whole addresses are collected regardless of
    # how the pattern is grouped; findall() would return only a captured
    # fragment when the pattern contains a capturing group.
    ips = [
        match.group(0)
        for header in received_headers
        for match in _IP_RE.finditer(header)
    ]
    if len(ips) < 2:
        return False
    subnets = set()
    for ip in ips:
        # Internal hops say nothing about where the mail travelled.
        if ip.startswith(("10.", "192.168.", "127.")):
            continue
        octets = ip.split(".")
        # 172.16.0.0/12 is the remaining RFC 1918 private block.
        if octets[0] == "172" and 16 <= int(octets[1]) <= 31:
            continue
        subnets.add(".".join(octets[:2]))
    return len(subnets) > 2
def _check_email_authentication(domain: str) -> tuple[int, int, int]:
    """Run the SPF, DKIM and DMARC DNS checks for the sender domain.

    Security rationale: SPF/DKIM/DMARC are the three email authentication
    standards, and all three coming back negative at once is a strong
    phishing signal. This function only delegates to the three per-protocol
    helpers, in SPF, DKIM, DMARC order.

    Args:
        domain: Sender domain to look up.

    Returns:
        Tuple of (spf_result, dkim_result, dmarc_result), each being
        whatever the corresponding helper returned:
        -1 = fail/not found, 0 = neutral/softfail, 1 = pass.
    """
    return _check_spf(domain), _check_dkim(domain), _check_dmarc(domain)
def _check_spf(domain: str) -> int:
    """Score the SPF policy published in the domain's TXT records.

    Only the published record is inspected; no sending IP is evaluated
    against it. The first TXT record containing "v=spf1" decides the result.

    Args:
        domain: Domain whose TXT records are queried (3 second lifetime).

    Returns:
        1 if an SPF record exists and does not end in a soft/neutral "all",
        0 if the record uses "~all" (softfail) or "?all" (neutral),
        -1 if no SPF record is found or the lookup fails for any reason.
    """
    try:
        answers = dns.resolver.resolve(domain, "TXT", lifetime=3.0)
        for rdata in answers:
            # A TXT record may be split into several byte strings; join them
            # and lower-case once, because SPF terms are case-insensitive
            # ("~ALL" must be treated like "~all").
            txt = "".join(
                s.decode("utf-8", errors="ignore") for s in rdata.strings
            ).lower()
            if "v=spf1" not in txt:
                continue
            if "~all" in txt or "?all" in txt:
                return 0  # Softfail / neutral
            return 1  # Record exists (hard "-all" or any other form)
        return -1  # No SPF record
    except Exception:
        # Covers dns.exception.DNSException (NXDOMAIN, timeout, ...) as well
        # as any unexpected decoding problem.
        return -1
def _check_dkim(domain: str) -> int:
    """Probe a fixed list of common DKIM selectors for a TXT record.

    This checks only that a `<selector>._domainkey.<domain>` TXT lookup
    succeeds (2 second lifetime each); it does not verify any signature.

    Returns:
        1 as soon as one selector resolves, -1 when none of them does.
        0 is never returned by this function.
    """
    for selector in ("default", "google", "mail", "k1", "s1", "s2"):
        try:
            record_name = f"{selector}._domainkey.{domain}"
            dns.resolver.resolve(record_name, "TXT", lifetime=2.0)
        except Exception:
            # Missing selector or lookup failure: try the next one
            continue
        else:
            return 1  # DKIM record found for at least one common selector
    return -1  # No DKIM record found
def _check_dmarc(domain: str) -> int:
    """Score the DMARC policy published at _dmarc.<domain>.

    The record is split into its `tag=value` pairs so that only the `p` tag
    is read. A plain substring test for "p=reject" would also match the
    subdomain tag "sp=reject" and over-score a "p=none; sp=reject" record.

    Args:
        domain: Domain whose DMARC TXT record is queried (3 second lifetime).

    Returns:
        1 for p=reject, 0 for p=quarantine, -1 for any other policy
        (including p=none), for a missing record, or when the lookup fails.
    """
    try:
        answers = dns.resolver.resolve(f"_dmarc.{domain}", "TXT", lifetime=3.0)
        for rdata in answers:
            txt = "".join(s.decode("utf-8", errors="ignore") for s in rdata.strings)
            tags = {}
            for part in txt.split(";"):
                name, sep, value = part.partition("=")
                if sep:
                    # Keep the first occurrence of a tag, compared case-insensitively
                    tags.setdefault(name.strip().lower(), value.strip().lower())
            if tags.get("v") != "dmarc1":
                continue  # Not a DMARC record
            policy = tags.get("p")
            if policy == "reject":
                return 1  # Strong policy
            if policy == "quarantine":
                return 0  # Moderate policy
            return -1  # p=none = no enforcement
        return -1
    except Exception:
        return -1
def _check_message_id_suspicious(message_id: str) -> bool:
    """Detect suspicious or missing Message-ID headers.

    Security rationale: mail servers normally generate a Message-ID of the
    form <random@domain>. A missing or malformed Message-ID, or one whose
    domain is a freemail domain, is treated as a relay-abuse / spoofing hint.

    Args:
        message_id: Raw Message-ID header value; surrounding whitespace is
            ignored.

    Returns:
        True when the value is empty, does not start with `<local@domain>`,
        or its domain equals an entry of FREEMAIL_DOMAINS; False otherwise.
    """
    if not message_id:
        return True  # Missing = suspicious
    # Check format: should be <something@domain>
    shape = re.match(r"<[^@]+@([^>]+)>", message_id.strip())
    if shape is None:
        return True  # Malformed
    # Take the domain from the matched brackets (after the last "@") so that
    # trailing whitespace or text after ">" cannot leak into it.
    mid_domain = shape.group(1).split("@")[-1].strip().lower()
    # Compare whole domains. A prefix test would also flag unrelated hosts
    # such as "gmail.company.com", and a blank config entry would match
    # every Message-ID.
    freemail_hosts = {fm.split("@")[-1].strip().lower() for fm in FREEMAIL_DOMAINS if fm}
    freemail_hosts.discard("")
    return mid_domain in freemail_hosts
def _check_timezone_mismatch(date_header: str, received_headers: List[str]) -> bool:
    """Compare the UTC offset of the Date header with the relay chain.

    Security rationale: Automated phishing tools often use incorrect
    timezones or copy timestamps, producing obvious mismatches.

    The first numeric offset (e.g. +0530) found in the Date header is
    compared with the first one found in the last entry of
    `received_headers`.

    Returns:
        True when both offsets are found and differ. False when either input
        is empty, either offset is missing, or anything raises.
    """
    if not (date_header and received_headers):
        return False
    try:
        date_hit = _TZ_RE.search(date_header)
        if date_hit is None:
            return False
        # Last Received entry is the one closest to the origin
        origin_hit = _TZ_RE.search(received_headers[-1])
        if origin_hit is None:
            return False
        offsets_agree = date_hit.group(1) == origin_hit.group(1)
        return not offsets_agree
    except Exception:
        return False
def _check_xmailer_suspicious(x_mailer: str) -> bool:
    """Report whether X-Mailer matches a known bulk-sender fingerprint.

    An absent X-Mailer is deliberately NOT treated as suspicious: modern
    webmail clients (Gmail, Outlook Web Access, Apple Mail, Yahoo Mail) do
    not send the header at all. Only values matching the configured
    mass-mailing patterns are flagged.

    Returns:
        True when `x_mailer` is non-empty and the bulk-mailer pattern matches
        somewhere in it; False otherwise.
    """
    # Missing header short-circuits to False; normal for modern webmail
    return bool(x_mailer) and _BULK_MAILER_RE.search(x_mailer) is not None
def _extract_domain(address: str) -> Optional[str]:
"""Extract the domain from an email address or bare domain."""
if not address:
return None
# Handle formats: "Name <email@domain.com>", "email@domain.com", "<email@domain>"
match = re.search(r"[\w.+-]+@([\w.-]+\.[a-zA-Z]{2,})", address)
if match:
return match.group(1)
# Bare domain
if re.match(r"^[\w.-]+\.[a-zA-Z]{2,}$", address.strip()):
return address.strip()
return None
def _default_header_features() -> Dict:
"""Return zero-value defaults for all header features."""
return {
"from_reply_to_mismatch": 0,
"from_return_path_mismatch": 0,
"reply_to_freemail": 0,
"received_hop_count": 0,
"received_geo_anomaly": 0,
"spf_result": 0,
"dkim_result": 0,
"dmarc_result": 0,
"message_id_suspicious": 0,
"timezone_mismatch": 0,
"x_mailer_suspicious": 0,
"header_injection_attempt": 0,
}
def _parse_auth_results_header(
    auth_results: str, received_spf: str = ""
) -> Tuple[Optional[int], Optional[int], Optional[int]]:
    """Parse SPF/DKIM/DMARC results from inline Authentication-Results / Received-SPF.

    Both inputs are searched, case-insensitively, for the first `spf=`,
    `dkim=` and `dmarc=` token, and the word after it is scored through
    _AUTH_RESULT_SCORES. A Received-SPF value normally starts with the bare
    result word ("pass (...) client-ip=...") rather than an `spf=` token, so
    when no SPF score was obtained that leading word is used instead.

    This lets us populate auth features without any DNS network call.

    Args:
        auth_results: Authentication-Results header text; None is treated as
            empty.
        received_spf: Received-SPF header text, with or without the
            "Received-SPF:" field name; None is treated as empty.

    Returns:
        (spf, dkim, dmarc), each 1/0/-1, or None when the method is not
        reported or its result word is not a known one.
    """
    auth_text = auth_results or ""
    spf_text = received_spf or ""
    text = (auth_text + " " + spf_text).lower()

    def _score(method: str) -> Optional[int]:
        # First "<method>=<word>" occurrence decides; unknown words give None
        hit = re.search(r"\b" + method + r"\s*=\s*(\w+)", text)
        return _AUTH_RESULT_SCORES.get(hit.group(1)) if hit else None

    spf: Optional[int] = _score("spf")
    dkim: Optional[int] = _score("dkim")
    dmarc: Optional[int] = _score("dmarc")
    if spf is None:
        # Fall back to the leading result word of the Received-SPF value
        lead = re.match(r"\s*(?:received-spf\s*:\s*)?(\w+)", spf_text.lower())
        if lead:
            spf = _AUTH_RESULT_SCORES.get(lead.group(1))
    return spf, dkim, dmarc
|