# Source: uploaded by rb1337 ("Upload 50 files", commit 2cc7f91, verified).
"""
Optimized HTML Feature Extractor for Phishing Detection
Extracts 77 features from HTML content with single-parse efficiency.
Uses cached tag lookups to avoid redundant find_all() calls.
"""
import re
from urllib.parse import urlparse
from bs4 import BeautifulSoup
import logging
logger = logging.getLogger(__name__)
# Top-level domains that show up disproportionately in phishing links.
# Each entry keeps its leading dot so it can be used as a hostname suffix.
SUSPICIOUS_TLDS = {
    '.bid', '.buzz',
    '.cc', '.cf', '.click', '.club',
    '.ga', '.gq',
    '.icu', '.info',
    '.link',
    '.ml',
    '.online',
    '.pw',
    '.racing',
    '.site', '.stream',
    '.tk', '.top',
    '.work', '.ws',
    '.xyz',
}
# Lower-case names of brands that phishing pages commonly impersonate.
BRAND_KEYWORDS = [
    # Consumer tech, retail and social platforms
    'paypal', 'amazon', 'google', 'microsoft', 'apple', 'facebook',
    'netflix', 'ebay', 'instagram', 'twitter', 'linkedin', 'yahoo',
    # Banking and payment cards
    'bank', 'visa', 'mastercard', 'americanexpress', 'chase', 'wells',
    'citibank',
    # Parcel carriers
    'dhl', 'fedex', 'ups', 'usps',
    # Software, media and messaging services
    'dropbox', 'adobe', 'spotify', 'whatsapp', 'telegram', 'steam',
    # Crypto exchanges
    'coinbase', 'binance',
]
# Lower-case words and phrases typical of urgency / social-engineering copy.
URGENCY_KEYWORDS = [
    # Single words
    'urgent', 'verify', 'suspended', 'locked', 'confirm',
    'security', 'alert', 'warning', 'expire', 'limited',
    'immediately',
    # Calls to action
    'click here', 'act now', 'unusual activity',
    # Account-threat vocabulary
    'unauthorized', 'restricted', 'risk', 'compromised',
    # Second-person account phrases
    'your account', 'update your', 'verify your', 'confirm your',
    # Deadlines and demands
    'within 24', 'within 48', 'action required',
]
class HTMLFeatureExtractor:
"""
High-performance HTML feature extractor.
Parses HTML once and caches all tag lookups for efficiency.
Designed for batch processing of 40k+ files.
"""
def extract_features(self, html_content: str, url: str | None = None) -> dict:
"""
Extract all features from HTML content in a single pass.
Args:
html_content: Raw HTML string
url: Optional source URL for context
Returns:
Dictionary with 67 numeric features
"""
try:
# --- Single parse with fast parser ---
try:
soup = BeautifulSoup(html_content, 'lxml')
except Exception:
soup = BeautifulSoup(html_content, 'html.parser')
# --- Cache tag lookups (done ONCE) ---
cache = self._build_cache(soup)
features = {}
features.update(self._structure_features(soup, cache, html_content))
features.update(self._form_features(cache))
features.update(self._link_features(cache))
features.update(self._script_features(cache))
features.update(self._text_features(soup, cache))
features.update(self._meta_features(soup, cache))
features.update(self._resource_features(cache))
features.update(self._advanced_features(soup, cache))
return features
except Exception as e:
logger.debug(f"Feature extraction error: {e}")
return self._default_features()
# ------------------------------------------------------------------
# Cache builder – avoids redundant find_all() across feature groups
# ------------------------------------------------------------------
@staticmethod
def _build_cache(soup) -> dict:
"""Build a lookup cache of all tags we need. Called once per document."""
all_tags = soup.find_all()
# Classify tags by name in a single pass
by_name: dict[str, list] = {}
for tag in all_tags:
by_name.setdefault(tag.name, []).append(tag)
# Convenience lists used by multiple feature groups
links_a = by_name.get('a', [])
forms = by_name.get('form', [])
inputs = by_name.get('input', [])
scripts = by_name.get('script', [])
images = by_name.get('img', [])
iframes = by_name.get('iframe', [])
meta_tags = by_name.get('meta', [])
style_tags = by_name.get('style', [])
css_links = [t for t in by_name.get('link', [])
if t.get('rel') and 'stylesheet' in t.get('rel', [])]
all_link_tags = by_name.get('link', [])
# Pre-extract hrefs and input types (used in several groups)
hrefs = [a.get('href', '') or '' for a in links_a]
input_types = [(inp, (inp.get('type', '') or '').lower()) for inp in inputs]
return {
'all_tags': all_tags,
'by_name': by_name,
'links_a': links_a,
'hrefs': hrefs,
'forms': forms,
'inputs': inputs,
'input_types': input_types,
'scripts': scripts,
'images': images,
'iframes': iframes,
'meta_tags': meta_tags,
'style_tags': style_tags,
'css_links': css_links,
'all_link_tags': all_link_tags,
}
# ------------------------------------------------------------------
# 1. Structure features (12)
# ------------------------------------------------------------------
@staticmethod
def _structure_features(soup, c: dict, raw_html: str) -> dict:
bn = c['by_name']
# DOM depth – walk just the <body>
body = soup.find('body')
max_depth = 0
if body:
stack = [(body, 0)]
while stack:
node, depth = stack.pop()
if depth > max_depth:
max_depth = depth
for child in getattr(node, 'children', []):
if hasattr(child, 'name') and child.name:
stack.append((child, depth + 1))
return {
'html_length': len(raw_html),
'num_tags': len(c['all_tags']),
'num_divs': len(bn.get('div', [])),
'num_spans': len(bn.get('span', [])),
'num_paragraphs': len(bn.get('p', [])),
'num_headings': sum(len(bn.get(h, []))
for h in ('h1', 'h2', 'h3', 'h4', 'h5', 'h6')),
'num_lists': len(bn.get('ul', [])) + len(bn.get('ol', [])),
'num_images': len(c['images']),
'num_iframes': len(c['iframes']),
'num_tables': len(bn.get('table', [])),
'has_title': 1 if soup.find('title') else 0,
'dom_depth': max_depth,
}
# ------------------------------------------------------------------
# 2. Form features (11)
# ------------------------------------------------------------------
@staticmethod
def _form_features(c: dict) -> dict:
forms = c['forms']
input_types = c['input_types']
n_password = sum(1 for _, t in input_types if t == 'password')
n_email = sum(1 for _, t in input_types if t == 'email')
n_text = sum(1 for _, t in input_types if t == 'text')
n_hidden = sum(1 for _, t in input_types if t == 'hidden')
n_submit = sum(1 for _, t in input_types if t == 'submit')
# Also count <button type="submit">
n_submit += sum(1 for btn in c['by_name'].get('button', [])
if (btn.get('type', '') or '').lower() == 'submit')
form_actions = [f.get('action', '') or '' for f in forms]
n_ext_action = sum(1 for a in form_actions if a.startswith('http'))
n_empty_action = sum(1 for a in form_actions if not a or a == '#')
return {
'num_forms': len(forms),
'num_input_fields': len(c['inputs']),
'num_password_fields': n_password,
'num_email_fields': n_email,
'num_text_fields': n_text,
'num_submit_buttons': n_submit,
'num_hidden_fields': n_hidden,
'has_login_form': 1 if (n_password > 0 and (n_email > 0 or n_text > 0)) else 0,
'has_form': 1 if forms else 0,
'num_external_form_actions': n_ext_action,
'num_empty_form_actions': n_empty_action,
}
# ------------------------------------------------------------------
# 3. Link features (10)
# ------------------------------------------------------------------
@staticmethod
def _link_features(c: dict) -> dict:
hrefs = c['hrefs']
links_a = c['links_a']
n_links = len(links_a)
n_external = sum(1 for h in hrefs if h.startswith('http'))
n_internal = sum(1 for h in hrefs if h.startswith('/') or h.startswith('#'))
n_empty = sum(1 for h in hrefs if not h or h == '#')
n_mailto = sum(1 for h in hrefs if h.startswith('mailto:'))
n_js = sum(1 for h in hrefs if 'javascript:' in h.lower())
n_ip = sum(1 for h in hrefs
if re.search(r'https?://\d+\.\d+\.\d+\.\d+', h))
# Count links pointing to suspicious TLDs
n_suspicious_tld = 0
for h in hrefs:
if h.startswith('http'):
try:
netloc = urlparse(h).netloc.lower()
for tld in SUSPICIOUS_TLDS:
if netloc.endswith(tld):
n_suspicious_tld += 1
break
except Exception:
pass
ratio_ext = n_external / n_links if n_links > 0 else 0.0
return {
'num_links': n_links,
'num_external_links': n_external,
'num_internal_links': n_internal,
'num_empty_links': n_empty,
'num_mailto_links': n_mailto,
'num_javascript_links': n_js,
'ratio_external_links': ratio_ext,
'num_ip_based_links': n_ip,
'num_suspicious_tld_links': n_suspicious_tld,
'num_anchor_text_mismatch': HTMLFeatureExtractor._anchor_mismatch(links_a),
}
@staticmethod
def _anchor_mismatch(links_a: list) -> int:
"""Count links where visible text shows a domain different from href."""
count = 0
url_pattern = re.compile(r'https?://[^\s<>"\']+')
for a in links_a:
href = a.get('href', '') or ''
text = a.get_text(strip=True)
if not href.startswith('http') or not text:
continue
text_urls = url_pattern.findall(text)
if text_urls:
try:
href_domain = urlparse(href).netloc.lower()
text_domain = urlparse(text_urls[0]).netloc.lower()
if href_domain and text_domain and href_domain != text_domain:
count += 1
except Exception:
pass
return count
# ------------------------------------------------------------------
# 4. Script features (7)
# ------------------------------------------------------------------
@staticmethod
def _script_features(c: dict) -> dict:
scripts = c['scripts']
n_inline = 0
n_external = 0
script_text_parts = []
for s in scripts:
if s.get('src'):
n_external += 1
if s.string:
n_inline += 1
script_text_parts.append(s.string)
script_content = ' '.join(script_text_parts)
return {
'num_scripts': len(scripts),
'num_inline_scripts': n_inline,
'num_external_scripts': n_external,
'has_eval': 1 if 'eval(' in script_content else 0,
'has_unescape': 1 if 'unescape(' in script_content else 0,
'has_escape': 1 if 'escape(' in script_content else 0,
'has_document_write': 1 if 'document.write' in script_content else 0,
}
# ------------------------------------------------------------------
# 5. Text content features (8)
# ------------------------------------------------------------------
@staticmethod
def _text_features(soup, c: dict) -> dict:
text = soup.get_text(separator=' ', strip=True).lower()
words = text.split()
n_words = len(words)
html_len = len(str(soup))
return {
'text_length': len(text),
'num_words': n_words,
'text_to_html_ratio': len(text) / html_len if html_len > 0 else 0.0,
'num_brand_mentions': sum(1 for kw in BRAND_KEYWORDS if kw in text),
'num_urgency_keywords': sum(1 for kw in URGENCY_KEYWORDS if kw in text),
'has_copyright': 1 if ('©' in text or 'copyright' in text) else 0,
'has_phone_number': 1 if re.search(
r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', text) else 0,
'has_email_address': 1 if re.search(
r'[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}', text) else 0,
}
# ------------------------------------------------------------------
# 6. Meta tag features (6)
# ------------------------------------------------------------------
@staticmethod
def _meta_features(soup, c: dict) -> dict:
meta_tags = c['meta_tags']
has_refresh = 0
has_desc = 0
has_keywords = 0
has_author = 0
has_viewport = 0
for m in meta_tags:
name_attr = (m.get('name') or '').lower()
http_equiv = (m.get('http-equiv') or '').lower()
if name_attr == 'description':
has_desc = 1
elif name_attr == 'keywords':
has_keywords = 1
elif name_attr == 'author':
has_author = 1
elif name_attr == 'viewport':
has_viewport = 1
if http_equiv == 'refresh':
has_refresh = 1
return {
'num_meta_tags': len(meta_tags),
'has_description': has_desc,
'has_keywords': has_keywords,
'has_author': has_author,
'has_viewport': has_viewport,
'has_meta_refresh': has_refresh,
}
# ------------------------------------------------------------------
# 7. Resource features (7)
# ------------------------------------------------------------------
@staticmethod
def _resource_features(c: dict) -> dict:
css_links = c['css_links']
images = c['images']
style_tags = c['style_tags']
img_srcs = [img.get('src', '') or '' for img in images]
css_content = ''.join(tag.string or '' for tag in style_tags)
has_favicon = 0
for lt in c['all_link_tags']:
rel = lt.get('rel', [])
if 'icon' in rel or 'shortcut' in rel:
has_favicon = 1
break
return {
'num_css_files': len(css_links),
'num_external_css': sum(1 for lk in css_links
if (lk.get('href', '') or '').startswith('http')),
'num_external_images': sum(1 for s in img_srcs if s.startswith('http')),
'num_data_uri_images': sum(1 for s in img_srcs if s.startswith('data:')),
'num_inline_styles': len(style_tags),
'inline_css_length': len(css_content),
'has_favicon': has_favicon,
}
# ------------------------------------------------------------------
# 8. Advanced phishing indicators (16)
# ------------------------------------------------------------------
@staticmethod
def _advanced_features(soup, c: dict) -> dict:
forms = c['forms']
input_types = c['input_types']
hrefs = c['hrefs']
all_text_lower = str(soup).lower()
# Password + external action combo
has_password = any(t == 'password' for _, t in input_types)
has_ext_action = any(
(f.get('action', '') or '').startswith('http') for f in forms)
# Count unique external domains from links
ext_domains = set()
for h in hrefs:
if h.startswith('http'):
try:
d = urlparse(h).netloc
if d:
ext_domains.add(d.lower())
except Exception:
pass
# Forms without labels
n_forms_no_label = sum(
1 for f in forms
if not f.find_all('label') and f.find_all('input')
)
# Event handlers – single pass over all tags
n_onload = 0
n_onerror = 0
n_onclick = 0
for tag in c['all_tags']:
attrs = tag.attrs
if 'onload' in attrs:
n_onload += 1
if 'onerror' in attrs:
n_onerror += 1
if 'onclick' in attrs:
n_onclick += 1
# Iframe with small/zero dimensions (common cloaking)
n_hidden_iframes = 0
for iframe in c['iframes']:
w = iframe.get('width', '')
h = iframe.get('height', '')
style = (iframe.get('style', '') or '').lower()
if w in ('0', '1') or h in ('0', '1') or 'display:none' in style or 'visibility:hidden' in style:
n_hidden_iframes += 1
return {
'password_with_external_action': 1 if (has_password and has_ext_action) else 0,
'has_base64': 1 if 'base64' in all_text_lower else 0,
'has_atob': 1 if 'atob(' in all_text_lower else 0,
'has_fromcharcode': 1 if 'fromcharcode' in all_text_lower else 0,
'num_onload_events': n_onload,
'num_onerror_events': n_onerror,
'num_onclick_events': n_onclick,
'num_unique_external_domains': len(ext_domains),
'num_forms_without_labels': n_forms_no_label,
'has_display_none': 1 if ('display:none' in all_text_lower or
'display: none' in all_text_lower) else 0,
'has_visibility_hidden': 1 if ('visibility:hidden' in all_text_lower or
'visibility: hidden' in all_text_lower) else 0,
'has_window_open': 1 if 'window.open' in all_text_lower else 0,
'has_location_replace': 1 if ('location.replace' in all_text_lower or
'location.href' in all_text_lower) else 0,
'num_hidden_iframes': n_hidden_iframes,
'has_right_click_disabled': 1 if ('oncontextmenu' in all_text_lower and
'return false' in all_text_lower) else 0,
'has_status_bar_customization': 1 if ('window.status' in all_text_lower or
'onmouseover' in all_text_lower) else 0,
}
# ------------------------------------------------------------------
# Default features (all zeros) – used on parse failure
# ------------------------------------------------------------------
def _default_features(self) -> dict:
return {k: 0 for k in self.get_feature_names()}
@staticmethod
def get_feature_names() -> list[str]:
"""Return ordered list of all 67 feature names."""
return [
# Structure (12)
'html_length', 'num_tags', 'num_divs', 'num_spans',
'num_paragraphs', 'num_headings', 'num_lists', 'num_images',
'num_iframes', 'num_tables', 'has_title', 'dom_depth',
# Form (11)
'num_forms', 'num_input_fields', 'num_password_fields',
'num_email_fields', 'num_text_fields', 'num_submit_buttons',
'num_hidden_fields', 'has_login_form', 'has_form',
'num_external_form_actions', 'num_empty_form_actions',
# Link (10)
'num_links', 'num_external_links', 'num_internal_links',
'num_empty_links', 'num_mailto_links', 'num_javascript_links',
'ratio_external_links', 'num_ip_based_links',
'num_suspicious_tld_links', 'num_anchor_text_mismatch',
# Script (7)
'num_scripts', 'num_inline_scripts', 'num_external_scripts',
'has_eval', 'has_unescape', 'has_escape', 'has_document_write',
# Text (8)
'text_length', 'num_words', 'text_to_html_ratio',
'num_brand_mentions', 'num_urgency_keywords',
'has_copyright', 'has_phone_number', 'has_email_address',
# Meta (6)
'num_meta_tags', 'has_description', 'has_keywords',
'has_author', 'has_viewport', 'has_meta_refresh',
# Resource (7)
'num_css_files', 'num_external_css', 'num_external_images',
'num_data_uri_images', 'num_inline_styles',
'inline_css_length', 'has_favicon',
# Advanced (16)
'password_with_external_action', 'has_base64', 'has_atob',
'has_fromcharcode', 'num_onload_events', 'num_onerror_events',
'num_onclick_events', 'num_unique_external_domains',
'num_forms_without_labels', 'has_display_none',
'has_visibility_hidden', 'has_window_open',
'has_location_replace', 'num_hidden_iframes',
'has_right_click_disabled', 'has_status_bar_customization',
]