Spaces:
Running
Running
| """ | |
| PhishLens URL Feature Engineering Module. | |
| Extracts lexical, WHOIS, and certificate transparency features from URLs | |
| found in email bodies. All network calls use strict timeouts and fallbacks. | |
| Per-URL features are aggregated (max/mean/count) across all URLs in an email. | |
| Security rationale: URL analysis is the single most reliable phishing signal | |
| category. Phishers cannot easily avoid: newly registered domains, high-entropy | |
| URLs, brand keywords in subdomains, punycode homoglyphs, and Let's Encrypt | |
| certs on <30-day-old domains. Lexical features require zero network calls, | |
| making them zero-day safe — they work even on unknown phishing infrastructure. | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import base64 | |
| import hashlib | |
| import math | |
| import re | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from typing import Dict, List, Optional, Tuple | |
| from urllib.parse import urlparse | |
| import requests | |
| import tldextract | |
| import whois | |
| from src.utils.config import ( | |
| DEFAULT_CONFIG, | |
| BRAND_LIST, | |
| RISK_TLD_LIST, | |
| SAFE_TLD_LIST, | |
| URL_SHORTENER_DOMAINS, | |
| SUSPICIOUS_URL_KEYWORDS, | |
| ABUSE_REGISTRARS, | |
| API_ENDPOINTS, | |
| NETWORK_TIMEOUT, | |
| WHOIS_TIMEOUT, | |
| ) | |
| from src.utils.logger import get_logger | |
| log = get_logger(__name__) | |
# Confusable homoglyph library (Unicode spoofing detection).
# This is an optional dependency: when the import fails the module still loads,
# _CONFUSABLES_AVAILABLE is set to False, and a warning is logged once at
# import time. _has_confusable_homoglyph() checks this flag and returns False
# when the library is missing.
try:
    from confusable_homoglyphs import confusables
    _CONFUSABLES_AVAILABLE = True
except ImportError:
    # Record the absence instead of failing the whole module import.
    _CONFUSABLES_AVAILABLE = False
    log.warning("confusable_homoglyphs not available — homoglyph detection disabled.")
| _IP_URL_RE = re.compile(r"https?://(\d{1,3}\.){3}\d{1,3}") | |
| # --------------------------------------------------------------------------- | |
| # Public interface | |
| # --------------------------------------------------------------------------- | |
def extract_url_features(urls: List[str], config=DEFAULT_CONFIG) -> Dict:
    """Compute email-level lexical URL features from a list of URLs.

    Args:
        urls: URL strings extracted from one email.
        config: Configuration object forwarded to the per-URL extractor.

    Returns:
        Dict holding the full aggregated key set (url_count plus a _max and a
        _mean entry per lexical feature, plus the network feature defaults).
        An empty or missing ``urls`` yields the untouched defaults.
    """
    if not urls:
        return _default_url_features()

    rows: List[Dict] = []
    # Only the first 20 URLs are analysed so a flood of links cannot stall
    # the pipeline; url_count below still reports the true total.
    for url in urls[:20]:
        try:
            row = _extract_single_url_features(url, config)
        except Exception as exc:
            log.debug(f"URL feature extraction error for '{url[:80]}': {exc}")
            row = _default_single_url_features()
        rows.append(row)

    summary = _aggregate_url_features(rows, total_url_count=len(urls))

    # Overlay the computed values on the defaults so the returned schema is
    # always complete, including the network-only keys that are not computed here.
    result = _default_url_features()
    result.update(summary)
    return result
def extract_url_features_with_network(
    urls: List[str],
    config=DEFAULT_CONFIG,
) -> Dict:
    """Extract URL features including WHOIS and certificate transparency data.

    Lexical features are computed first; the WHOIS and crt.sh aggregates are
    then merged on top of them.

    Args:
        urls: List of URL strings.
        config: PhishLensConfig instance.

    Returns:
        Dict with both lexical and network-based features. When ``urls`` is
        empty only the lexical defaults are returned and no lookup is made.
    """
    base_features = extract_url_features(urls, config)
    if not urls:
        return base_features

    # Rate-limit protection: look at the first 10 URLs and keep at most 5
    # distinct registered domains. dict.fromkeys de-duplicates while keeping
    # first-seen order, so the sampled domains really are the *first* five and
    # the selection is reproducible between runs (a set gives arbitrary order).
    candidates = (_get_registered_domain(u) for u in urls[:10])
    domains = list(dict.fromkeys(d for d in candidates if d))[:5]

    base_features.update(_aggregate_whois_features(domains, config))
    base_features.update(_aggregate_cert_features(domains, config))
    return base_features
| # --------------------------------------------------------------------------- | |
| # Single URL feature extraction | |
| # --------------------------------------------------------------------------- | |
def _extract_single_url_features(url: str, config) -> Dict:
    """Extract all lexical (network-free) features for a single URL.

    Host-based features are computed from ``urlparse(url).hostname``, i.e. the
    host with any ``user:pass@`` prefix and ``:port`` suffix removed, so that
    credentials and port digits do not distort the digit ratio or the
    shortener / homoglyph checks.

    Args:
        url: URL string taken from an email body.
        config: Object exposing ``brand_list``, ``risk_tld_list``,
            ``safe_tld_list``, ``url_shortener_domains`` and
            ``suspicious_url_keywords``.

    Returns:
        Dict with the keys of ``_default_single_url_features()``. If an
        exception is raised part-way through, it is logged at debug level and
        the features computed so far (defaults for the rest) are returned.
    """
    import ipaddress

    features = _default_single_url_features()
    try:
        parsed = urlparse(url)
        ext = tldextract.extract(url)
        suffix = ext.suffix or ""
        subdomain = ext.subdomain or ""
        # Newer tldextract releases expose top_domain_under_public_suffix and
        # deprecate registered_domain; older ones only have the latter.
        # Supporting both avoids an AttributeError that would otherwise be
        # swallowed below and silently zero every feature.
        if hasattr(ext, "top_domain_under_public_suffix"):
            registered_domain = ext.top_domain_under_public_suffix or ""
        else:
            registered_domain = ext.registered_domain or ""
        netloc = parsed.netloc or ""
        host = parsed.hostname or ""  # lower-cased, no userinfo, no port
        path = parsed.path or ""

        # domain_length: longer domains = higher phishing probability
        features["domain_length"] = len(registered_domain)

        # subdomain_depth: deep subdomain nesting = obfuscation
        features["subdomain_depth"] = len(subdomain.split(".")) if subdomain else 0

        # hyphen_count: hyphens in domain often mimic legitimate brand names
        features["hyphen_count"] = registered_domain.count("-")

        # digit_ratio: high digit proportion in the hostname = random domain generation
        if host:
            features["digit_ratio"] = sum(c.isdigit() for c in host) / len(host)

        # url_entropy: Shannon entropy of the full URL string
        features["url_entropy"] = _shannon_entropy(url)

        # brand_in_subdomain: e.g., paypal.secure-login.xyz
        features["brand_in_subdomain"] = int(
            _has_brand_in_subdomain(subdomain, registered_domain, config.brand_list)
        )

        # tld_risk_score: .xyz/.tk etc. = 1.0, .com/.ie = 0.0, unknown = 0.5
        tld_with_dot = f".{suffix.lower()}" if suffix else ""
        if tld_with_dot in config.risk_tld_list:
            features["tld_risk_score"] = 1.0
        elif tld_with_dot in config.safe_tld_list:
            features["tld_risk_score"] = 0.0
        else:
            features["tld_risk_score"] = 0.5

        # is_ip_address: the host itself is an IP literal (IPv4 or IPv6).
        # Parsing the host avoids flagging names such as 1.2.3.4.example.com
        # and still catches http://user@1.2.3.4/.
        try:
            ipaddress.ip_address(host)
        except ValueError:
            features["is_ip_address"] = 0
        else:
            features["is_ip_address"] = 1

        # punycode_detected: xn-- = internationalised domain (homoglyph risk)
        features["punycode_detected"] = int(
            "xn--" in url.lower() or _has_confusable_homoglyph(host)
        )

        # url_shortener: bit.ly, tinyurl, etc.
        # NOTE(review): still a substring test, so "bit.ly" also matches
        # "rabbit.ly"; tighten to a label-boundary match once the format of
        # config.url_shortener_domains entries is confirmed.
        features["url_shortener"] = int(
            any(shortener in host for shortener in config.url_shortener_domains)
        )

        # path_depth: /verify/account/reset = 3 levels = suspicious
        features["path_depth"] = len([p for p in path.split("/") if p])

        # suspicious_keywords_in_url
        url_lower = url.lower()
        kw_count = sum(
            1 for kw in config.suspicious_url_keywords if kw in url_lower
        )
        # Credential spoofing trick: http://paypal.com@attacker.com/login
        # urlparse treats everything before @ as credentials; the host is attacker.com
        if "@" in netloc:
            kw_count += 3  # Heavy penalty — this is almost always malicious
        features["suspicious_keywords_in_url"] = kw_count
    except Exception as exc:
        log.debug(f"_extract_single_url_features error: {exc}")
    return features
| def _aggregate_url_features(per_url: List[Dict], total_url_count: int) -> Dict: | |
| """Aggregate per-URL features into email-level max/mean/count statistics.""" | |
| if not per_url: | |
| return _default_url_features() | |
| numeric_keys = [ | |
| "domain_length", "subdomain_depth", "hyphen_count", | |
| "digit_ratio", "url_entropy", "brand_in_subdomain", | |
| "tld_risk_score", "is_ip_address", "punycode_detected", | |
| "url_shortener", "path_depth", "suspicious_keywords_in_url", | |
| ] | |
| aggregated: Dict = {"url_count": total_url_count} | |
| for key in numeric_keys: | |
| vals = [r.get(key, 0) for r in per_url] | |
| aggregated[f"{key}_max"] = max(vals) | |
| aggregated[f"{key}_mean"] = sum(vals) / len(vals) | |
| return aggregated | |
| # --------------------------------------------------------------------------- | |
| # WHOIS features | |
| # --------------------------------------------------------------------------- | |
def _get_whois_features(domain: str, config) -> Dict:
    """Query WHOIS for domain age and registrar risk.

    The creation date and the registrar are evaluated independently, so an
    unusable creation date no longer prevents the registrar check from running.

    Args:
        domain: Registered domain to look up.
        config: Object exposing ``domain_age_risk_days``,
            ``domain_age_warn_days`` and ``abuse_registrars``.

    Returns:
        Dict with ``domain_age_days`` (-1 when unknown), ``domain_age_risk``
        (1.0 new / 0.5 recent or unknown / 0.0 established) and
        ``registrar_risk`` (1.0 when the registrar matches an entry in
        ``config.abuse_registrars``, else 0.0). Never raises: failures are
        logged at debug level and leave the fallback values in place.
    """
    import datetime

    features = {
        "domain_age_days": -1,
        "domain_age_risk": 0.5,
        "registrar_risk": 0.0,
    }
    try:
        record = whois.whois(domain)
    except Exception as exc:
        log.debug(f"WHOIS lookup failed for '{domain}': {exc}")
        return features

    # --- Domain age -------------------------------------------------------
    try:
        raw_created = record.creation_date
        if isinstance(raw_created, (list, tuple)):
            candidates = list(raw_created)
        else:
            candidates = [raw_created]
        utc = datetime.timezone.utc
        # Keep only real datetimes (WHOIS parsers may yield None or raw
        # strings) and normalise aware values to naive UTC so they can be
        # compared and subtracted without a naive/aware TypeError.
        created = [
            c.astimezone(utc).replace(tzinfo=None) if c.tzinfo is not None else c
            for c in candidates
            if isinstance(c, datetime.datetime)
        ]
        if created:
            now_utc = datetime.datetime.now(utc).replace(tzinfo=None)
            # The earliest reported date is the registration date. Clamp at 0
            # so a future-dated record cannot collide with the -1 sentinel.
            age_days = max((now_utc - min(created)).days, 0)
            features["domain_age_days"] = age_days
            if age_days < config.domain_age_risk_days:
                features["domain_age_risk"] = 1.0  # Brand new = high risk
            elif age_days < config.domain_age_warn_days:
                features["domain_age_risk"] = 0.5
            else:
                features["domain_age_risk"] = 0.0
    except Exception as exc:
        log.debug(f"WHOIS creation date unusable for '{domain}': {exc}")

    # --- Registrar reputation --------------------------------------------
    try:
        registrar = str(record.registrar or "").lower()
        features["registrar_risk"] = float(
            any(abuse_reg in registrar for abuse_reg in config.abuse_registrars)
        )
    except Exception as exc:
        log.debug(f"WHOIS registrar unusable for '{domain}': {exc}")
    return features
| def _aggregate_whois_features(domains: List[str], config) -> Dict: | |
| """Run WHOIS lookups in parallel and aggregate results.""" | |
| all_results = [] | |
| with ThreadPoolExecutor(max_workers=3) as executor: | |
| futures = {executor.submit(_get_whois_features, d, config): d for d in domains} | |
| for future in as_completed(futures, timeout=config.whois_timeout + 2): | |
| try: | |
| all_results.append(future.result(timeout=config.whois_timeout)) | |
| except Exception: | |
| all_results.append({"domain_age_days": -1, "domain_age_risk": 0.5, "registrar_risk": 0.0}) | |
| if not all_results: | |
| return {"domain_age_days": -1, "domain_age_risk": 0.5, "registrar_risk": 0.0} | |
| # Use worst-case (highest risk) values across all domains | |
| return { | |
| "domain_age_days": min(r["domain_age_days"] for r in all_results), | |
| "domain_age_risk": max(r["domain_age_risk"] for r in all_results), | |
| "registrar_risk": max(r["registrar_risk"] for r in all_results), | |
| } | |
| # --------------------------------------------------------------------------- | |
| # Certificate transparency features (crt.sh) | |
| # --------------------------------------------------------------------------- | |
| def _get_cert_features(domain: str, config) -> Dict: | |
| """Query crt.sh for certificate transparency data. | |
| Security rationale: Let's Encrypt certs on domains < 30 days old with | |
| brand keywords in their SAN is one of the strongest phishing signals | |
| in modern attack infrastructure. | |
| """ | |
| features = { | |
| "cert_age_days": -1, | |
| "cert_lets_encrypt": 0, | |
| "cert_brand_mismatch": 0, | |
| } | |
| try: | |
| url = API_ENDPOINTS["crtsh"].format(domain=domain) | |
| resp = requests.get(url, timeout=config.network_timeout) | |
| if resp.status_code != 200: | |
| return features | |
| certs = resp.json() | |
| if not certs: | |
| return features | |
| import datetime | |
| # Find oldest cert entry | |
| min_age = None | |
| le_found = False | |
| brand_mismatch = False | |
| for cert in certs[:50]: # Cap at 50 entries | |
| try: | |
| not_before = cert.get("not_before", "") | |
| if not_before: | |
| issued_dt = datetime.datetime.fromisoformat(not_before.replace("T", " ").split(".")[0]) | |
| age = (datetime.datetime.utcnow() - issued_dt).days | |
| if min_age is None or age < min_age: | |
| min_age = age | |
| issuer = cert.get("issuer_name", "").lower() | |
| if "let's encrypt" in issuer or "lets encrypt" in issuer: | |
| le_found = True | |
| # Brand in SAN but not registered domain = impersonation | |
| san = cert.get("name_value", "").lower() | |
| for brand in BRAND_LIST: | |
| if brand in san and brand not in domain.lower(): | |
| brand_mismatch = True | |
| break | |
| except Exception: | |
| continue | |
| features["cert_age_days"] = min_age if min_age is not None else -1 | |
| features["cert_lets_encrypt"] = int(le_found) | |
| features["cert_brand_mismatch"] = int(brand_mismatch) | |
| except Exception as exc: | |
| log.debug(f"crt.sh lookup failed for '{domain}': {exc}") | |
| return features | |
| def _aggregate_cert_features(domains: List[str], config) -> Dict: | |
| """Run crt.sh lookups asynchronously and aggregate results. | |
| Security rationale: Using asyncio + aiohttp for crt.sh HTTP calls reduces | |
| wall-clock time from O(n × timeout) to O(timeout) for n domains by | |
| dispatching all HTTP requests concurrently. WHOIS stays in ThreadPoolExecutor | |
| because the whois library uses blocking socket calls that cannot be adapted | |
| to asyncio without monkey-patching. | |
| """ | |
| if not domains: | |
| return {"cert_age_days": -1, "cert_lets_encrypt": 0, "cert_brand_mismatch": 0} | |
| try: | |
| # Try to get the running loop (works in Jupyter / async contexts) | |
| loop = asyncio.get_event_loop() | |
| if loop.is_running(): | |
| import concurrent.futures | |
| with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: | |
| all_results = pool.submit( | |
| asyncio.run, _dispatch_certs_async(domains, config) | |
| ).result() | |
| else: | |
| all_results = asyncio.run(_dispatch_certs_async(domains, config)) | |
| except Exception as exc: | |
| log.debug(f"Async cert dispatch failed, falling back to sync: {exc}") | |
| all_results = [_get_cert_features(d, config) for d in domains] | |
| if not all_results: | |
| return {"cert_age_days": -1, "cert_lets_encrypt": 0, "cert_brand_mismatch": 0} | |
| return { | |
| "cert_age_days": min((r["cert_age_days"] for r in all_results if r["cert_age_days"] >= 0), default=-1), | |
| "cert_lets_encrypt": max(r["cert_lets_encrypt"] for r in all_results), | |
| "cert_brand_mismatch": max(r["cert_brand_mismatch"] for r in all_results), | |
| } | |
| async def _dispatch_certs_async(domains: List[str], config) -> List[Dict]: | |
| """Dispatch all crt.sh HTTP lookups concurrently using aiohttp. | |
| Security rationale: aiohttp with a 3-second per-request timeout prevents | |
| a slow/malicious crt.sh response from blocking the main pipeline for the | |
| duration of all domain lookups combined. Each coroutine silently returns | |
| -1 defaults on any failure — the pipeline never crashes on network issues. | |
| Args: | |
| domains: List of registered domain strings. | |
| config: PhishLensConfig with network_timeout setting. | |
| Returns: | |
| List of cert feature dicts, one per domain. | |
| """ | |
| try: | |
| import aiohttp | |
| except ImportError: | |
| log.warning("aiohttp not installed — falling back to sync crt.sh lookups") | |
| return [_get_cert_features(d, config) for d in domains] | |
| timeout = aiohttp.ClientTimeout(total=3) | |
| async with aiohttp.ClientSession(timeout=timeout) as session: | |
| tasks = [_fetch_cert_async(d, session, config) for d in domains] | |
| results = await asyncio.gather(*tasks, return_exceptions=True) | |
| processed: List[Dict] = [] | |
| for r in results: | |
| if isinstance(r, Exception) or not isinstance(r, dict): | |
| processed.append({"cert_age_days": -1, "cert_lets_encrypt": 0, "cert_brand_mismatch": 0}) | |
| else: | |
| processed.append(r) | |
| return processed | |
| async def _fetch_cert_async(domain: str, session, config) -> Dict: | |
| """Fetch and parse crt.sh data for a single domain asynchronously. | |
| Args: | |
| domain: Registered domain to query. | |
| session: Shared aiohttp.ClientSession (connection-pooled). | |
| config: PhishLensConfig instance. | |
| Returns: | |
| Dict with cert_age_days, cert_lets_encrypt, cert_brand_mismatch. | |
| Returns -1/-0/0 defaults on any network or parse error. | |
| """ | |
| features = {"cert_age_days": -1, "cert_lets_encrypt": 0, "cert_brand_mismatch": 0} | |
| try: | |
| url = API_ENDPOINTS["crtsh"].format(domain=domain) | |
| async with session.get(url) as resp: | |
| if resp.status != 200: | |
| return features | |
| certs = await resp.json(content_type=None) | |
| if not certs: | |
| return features | |
| import datetime | |
| min_age = None | |
| le_found = False | |
| brand_mismatch = False | |
| for cert in certs[:50]: | |
| try: | |
| not_before = cert.get("not_before", "") | |
| if not_before: | |
| issued_dt = datetime.datetime.fromisoformat( | |
| not_before.replace("T", " ").split(".")[0] | |
| ) | |
| age = (datetime.datetime.utcnow() - issued_dt).days | |
| if min_age is None or age < min_age: | |
| min_age = age | |
| issuer = cert.get("issuer_name", "").lower() | |
| if "let's encrypt" in issuer or "lets encrypt" in issuer: | |
| le_found = True | |
| san = cert.get("name_value", "").lower() | |
| for brand in BRAND_LIST: | |
| if brand in san and brand not in domain.lower(): | |
| brand_mismatch = True | |
| break | |
| except Exception: | |
| continue | |
| features["cert_age_days"] = min_age if min_age is not None else -1 | |
| features["cert_lets_encrypt"] = int(le_found) | |
| features["cert_brand_mismatch"] = int(brand_mismatch) | |
| except Exception as exc: | |
| log.debug(f"crt.sh async fetch failed for '{domain}': {exc}") | |
| return features | |
| # --------------------------------------------------------------------------- | |
| # Helper functions | |
| # --------------------------------------------------------------------------- | |
| def _shannon_entropy(text: str) -> float: | |
| """Compute Shannon entropy of a string. | |
| Security rationale: High-entropy URLs (random character sequences) indicate | |
| algorithmically-generated domains (DGA) or obfuscated phishing infrastructure. | |
| """ | |
| if not text: | |
| return 0.0 | |
| freq = {} | |
| for c in text: | |
| freq[c] = freq.get(c, 0) + 1 | |
| n = len(text) | |
| return -sum((count / n) * math.log2(count / n) for count in freq.values()) | |
| def _has_brand_in_subdomain(subdomain: str, registered_domain: str, brand_list: List[str]) -> bool: | |
| """Detect brand keyword in subdomain but not in registered domain. | |
| Security rationale: paypal.secure-login.xyz — 'paypal' in subdomain but | |
| registered domain is 'secure-login.xyz'. This is the canonical brand | |
| impersonation pattern in phishing URLs. | |
| """ | |
| if not subdomain: | |
| return False | |
| subdomain_lower = subdomain.lower() | |
| registered_lower = registered_domain.lower() | |
| for brand in brand_list: | |
| if brand in subdomain_lower and brand not in registered_lower: | |
| return True | |
| return False | |
def _has_confusable_homoglyph(domain: str) -> bool:
    """Detect Unicode confusable homoglyphs in the domain.

    Security rationale: Cyrillic 'а' vs Latin 'a', Greek 'ο' vs Latin 'o' etc.
    are used to create visually identical but different domain names.

    The whole domain string is passed to ``confusables.is_dangerous``. That
    check requires the string to mix scripts, so testing one character at a
    time can never succeed: a single character is never mixed-script.

    Args:
        domain: Host name to inspect (Unicode form).

    Returns:
        True if the library classifies the domain as dangerous. False if it
        does not, if the domain is empty, if the library is unavailable, or
        if the check raises.
    """
    if not _CONFUSABLES_AVAILABLE or not domain:
        return False
    try:
        return bool(confusables.is_dangerous(domain))
    except Exception:
        # Best-effort signal: a library error must not break feature extraction.
        return False
def _get_registered_domain(url: str) -> Optional[str]:
    """Extract just the registered domain (e.g., google.com) from a URL.

    Prefers tldextract's ``top_domain_under_public_suffix`` attribute (the
    one the single-URL extractor in this module reads) and falls back to the
    older ``registered_domain`` attribute when the former does not exist.

    Args:
        url: URL string to parse.

    Returns:
        The registered domain, or None when none can be determined or the
        extraction raises.
    """
    try:
        ext = tldextract.extract(url)
        if hasattr(ext, "top_domain_under_public_suffix"):
            return ext.top_domain_under_public_suffix or None
        return ext.registered_domain or None
    except Exception:
        return None
| def _default_single_url_features() -> Dict: | |
| """Zero-value defaults for single URL features.""" | |
| return { | |
| "domain_length": 0, | |
| "subdomain_depth": 0, | |
| "hyphen_count": 0, | |
| "digit_ratio": 0.0, | |
| "url_entropy": 0.0, | |
| "brand_in_subdomain": 0, | |
| "tld_risk_score": 0.0, | |
| "is_ip_address": 0, | |
| "punycode_detected": 0, | |
| "url_shortener": 0, | |
| "path_depth": 0, | |
| "suspicious_keywords_in_url": 0, | |
| } | |
def _default_url_features() -> Dict:
    """Build the default email-level URL feature dict.

    Returns:
        New dict containing ``url_count`` = 0, a ``_max`` (0) and ``_mean``
        (0.0) entry for every single-URL feature, and the default values of
        the WHOIS and certificate features.
    """
    features: Dict = {"url_count": 0}
    for name in _default_single_url_features():
        features.update({f"{name}_max": 0, f"{name}_mean": 0.0})

    # Network-derived features: -1 marks an unknown age.
    features["domain_age_days"] = -1
    features["domain_age_risk"] = 0.0
    features["registrar_risk"] = 0.0
    features["cert_age_days"] = -1
    features["cert_lets_encrypt"] = 0
    features["cert_brand_mismatch"] = 0
    return features