Spaces:
Runtime error
Runtime error
"""
Optimized HTML Feature Extractor for Phishing Detection
Extracts 77 features from HTML content with single-parse efficiency.
Uses cached tag lookups to avoid redundant find_all() calls.
"""
| import re | |
| from urllib.parse import urlparse | |
| from bs4 import BeautifulSoup | |
| import logging | |
| logger = logging.getLogger(__name__) | |
| # Suspicious TLDs commonly used in phishing | |
| SUSPICIOUS_TLDS = { | |
| '.tk', '.ml', '.ga', '.cf', '.gq', '.top', '.xyz', '.buzz', | |
| '.club', '.online', '.site', '.icu', '.work', '.click', '.link', | |
| '.info', '.pw', '.cc', '.ws', '.bid', '.stream', '.racing', | |
| } | |
| # Brand keywords phishers commonly impersonate | |
| BRAND_KEYWORDS = [ | |
| 'paypal', 'amazon', 'google', 'microsoft', 'apple', 'facebook', | |
| 'netflix', 'ebay', 'instagram', 'twitter', 'linkedin', 'yahoo', | |
| 'bank', 'visa', 'mastercard', 'americanexpress', 'chase', 'wells', | |
| 'citibank', 'dhl', 'fedex', 'ups', 'usps', 'dropbox', 'adobe', | |
| 'spotify', 'whatsapp', 'telegram', 'steam', 'coinbase', 'binance', | |
| ] | |
| # Urgency / social engineering keywords | |
| URGENCY_KEYWORDS = [ | |
| 'urgent', 'verify', 'suspended', 'locked', 'confirm', | |
| 'security', 'alert', 'warning', 'expire', 'limited', | |
| 'immediately', 'click here', 'act now', 'unusual activity', | |
| 'unauthorized', 'restricted', 'risk', 'compromised', | |
| 'your account', 'update your', 'verify your', 'confirm your', | |
| 'within 24', 'within 48', 'action required', | |
| ] | |
| class HTMLFeatureExtractor: | |
| """ | |
| High-performance HTML feature extractor. | |
| Parses HTML once and caches all tag lookups for efficiency. | |
| Designed for batch processing of 40k+ files. | |
| """ | |
| def extract_features(self, html_content: str, url: str | None = None) -> dict: | |
| """ | |
| Extract all features from HTML content in a single pass. | |
| Args: | |
| html_content: Raw HTML string | |
| url: Optional source URL for context | |
| Returns: | |
| Dictionary with 67 numeric features | |
| """ | |
| try: | |
| # --- Single parse with fast parser --- | |
| try: | |
| soup = BeautifulSoup(html_content, 'lxml') | |
| except Exception: | |
| soup = BeautifulSoup(html_content, 'html.parser') | |
| # --- Cache tag lookups (done ONCE) --- | |
| cache = self._build_cache(soup) | |
| features = {} | |
| features.update(self._structure_features(soup, cache, html_content)) | |
| features.update(self._form_features(cache)) | |
| features.update(self._link_features(cache)) | |
| features.update(self._script_features(cache)) | |
| features.update(self._text_features(soup, cache)) | |
| features.update(self._meta_features(soup, cache)) | |
| features.update(self._resource_features(cache)) | |
| features.update(self._advanced_features(soup, cache)) | |
| return features | |
| except Exception as e: | |
| logger.debug(f"Feature extraction error: {e}") | |
| return self._default_features() | |
| # ------------------------------------------------------------------ | |
| # Cache builder – avoids redundant find_all() across feature groups | |
| # ------------------------------------------------------------------ | |
| def _build_cache(soup) -> dict: | |
| """Build a lookup cache of all tags we need. Called once per document.""" | |
| all_tags = soup.find_all() | |
| # Classify tags by name in a single pass | |
| by_name: dict[str, list] = {} | |
| for tag in all_tags: | |
| by_name.setdefault(tag.name, []).append(tag) | |
| # Convenience lists used by multiple feature groups | |
| links_a = by_name.get('a', []) | |
| forms = by_name.get('form', []) | |
| inputs = by_name.get('input', []) | |
| scripts = by_name.get('script', []) | |
| images = by_name.get('img', []) | |
| iframes = by_name.get('iframe', []) | |
| meta_tags = by_name.get('meta', []) | |
| style_tags = by_name.get('style', []) | |
| css_links = [t for t in by_name.get('link', []) | |
| if t.get('rel') and 'stylesheet' in t.get('rel', [])] | |
| all_link_tags = by_name.get('link', []) | |
| # Pre-extract hrefs and input types (used in several groups) | |
| hrefs = [a.get('href', '') or '' for a in links_a] | |
| input_types = [(inp, (inp.get('type', '') or '').lower()) for inp in inputs] | |
| return { | |
| 'all_tags': all_tags, | |
| 'by_name': by_name, | |
| 'links_a': links_a, | |
| 'hrefs': hrefs, | |
| 'forms': forms, | |
| 'inputs': inputs, | |
| 'input_types': input_types, | |
| 'scripts': scripts, | |
| 'images': images, | |
| 'iframes': iframes, | |
| 'meta_tags': meta_tags, | |
| 'style_tags': style_tags, | |
| 'css_links': css_links, | |
| 'all_link_tags': all_link_tags, | |
| } | |
| # ------------------------------------------------------------------ | |
| # 1. Structure features (12) | |
| # ------------------------------------------------------------------ | |
| def _structure_features(soup, c: dict, raw_html: str) -> dict: | |
| bn = c['by_name'] | |
| # DOM depth – walk just the <body> | |
| body = soup.find('body') | |
| max_depth = 0 | |
| if body: | |
| stack = [(body, 0)] | |
| while stack: | |
| node, depth = stack.pop() | |
| if depth > max_depth: | |
| max_depth = depth | |
| for child in getattr(node, 'children', []): | |
| if hasattr(child, 'name') and child.name: | |
| stack.append((child, depth + 1)) | |
| return { | |
| 'html_length': len(raw_html), | |
| 'num_tags': len(c['all_tags']), | |
| 'num_divs': len(bn.get('div', [])), | |
| 'num_spans': len(bn.get('span', [])), | |
| 'num_paragraphs': len(bn.get('p', [])), | |
| 'num_headings': sum(len(bn.get(h, [])) | |
| for h in ('h1', 'h2', 'h3', 'h4', 'h5', 'h6')), | |
| 'num_lists': len(bn.get('ul', [])) + len(bn.get('ol', [])), | |
| 'num_images': len(c['images']), | |
| 'num_iframes': len(c['iframes']), | |
| 'num_tables': len(bn.get('table', [])), | |
| 'has_title': 1 if soup.find('title') else 0, | |
| 'dom_depth': max_depth, | |
| } | |
| # ------------------------------------------------------------------ | |
| # 2. Form features (11) | |
| # ------------------------------------------------------------------ | |
| def _form_features(c: dict) -> dict: | |
| forms = c['forms'] | |
| input_types = c['input_types'] | |
| n_password = sum(1 for _, t in input_types if t == 'password') | |
| n_email = sum(1 for _, t in input_types if t == 'email') | |
| n_text = sum(1 for _, t in input_types if t == 'text') | |
| n_hidden = sum(1 for _, t in input_types if t == 'hidden') | |
| n_submit = sum(1 for _, t in input_types if t == 'submit') | |
| # Also count <button type="submit"> | |
| n_submit += sum(1 for btn in c['by_name'].get('button', []) | |
| if (btn.get('type', '') or '').lower() == 'submit') | |
| form_actions = [f.get('action', '') or '' for f in forms] | |
| n_ext_action = sum(1 for a in form_actions if a.startswith('http')) | |
| n_empty_action = sum(1 for a in form_actions if not a or a == '#') | |
| return { | |
| 'num_forms': len(forms), | |
| 'num_input_fields': len(c['inputs']), | |
| 'num_password_fields': n_password, | |
| 'num_email_fields': n_email, | |
| 'num_text_fields': n_text, | |
| 'num_submit_buttons': n_submit, | |
| 'num_hidden_fields': n_hidden, | |
| 'has_login_form': 1 if (n_password > 0 and (n_email > 0 or n_text > 0)) else 0, | |
| 'has_form': 1 if forms else 0, | |
| 'num_external_form_actions': n_ext_action, | |
| 'num_empty_form_actions': n_empty_action, | |
| } | |
| # ------------------------------------------------------------------ | |
| # 3. Link features (10) | |
| # ------------------------------------------------------------------ | |
| def _link_features(c: dict) -> dict: | |
| hrefs = c['hrefs'] | |
| links_a = c['links_a'] | |
| n_links = len(links_a) | |
| n_external = sum(1 for h in hrefs if h.startswith('http')) | |
| n_internal = sum(1 for h in hrefs if h.startswith('/') or h.startswith('#')) | |
| n_empty = sum(1 for h in hrefs if not h or h == '#') | |
| n_mailto = sum(1 for h in hrefs if h.startswith('mailto:')) | |
| n_js = sum(1 for h in hrefs if 'javascript:' in h.lower()) | |
| n_ip = sum(1 for h in hrefs | |
| if re.search(r'https?://\d+\.\d+\.\d+\.\d+', h)) | |
| # Count links pointing to suspicious TLDs | |
| n_suspicious_tld = 0 | |
| for h in hrefs: | |
| if h.startswith('http'): | |
| try: | |
| netloc = urlparse(h).netloc.lower() | |
| for tld in SUSPICIOUS_TLDS: | |
| if netloc.endswith(tld): | |
| n_suspicious_tld += 1 | |
| break | |
| except Exception: | |
| pass | |
| ratio_ext = n_external / n_links if n_links > 0 else 0.0 | |
| return { | |
| 'num_links': n_links, | |
| 'num_external_links': n_external, | |
| 'num_internal_links': n_internal, | |
| 'num_empty_links': n_empty, | |
| 'num_mailto_links': n_mailto, | |
| 'num_javascript_links': n_js, | |
| 'ratio_external_links': ratio_ext, | |
| 'num_ip_based_links': n_ip, | |
| 'num_suspicious_tld_links': n_suspicious_tld, | |
| 'num_anchor_text_mismatch': HTMLFeatureExtractor._anchor_mismatch(links_a), | |
| } | |
| def _anchor_mismatch(links_a: list) -> int: | |
| """Count links where visible text shows a domain different from href.""" | |
| count = 0 | |
| url_pattern = re.compile(r'https?://[^\s<>"\']+') | |
| for a in links_a: | |
| href = a.get('href', '') or '' | |
| text = a.get_text(strip=True) | |
| if not href.startswith('http') or not text: | |
| continue | |
| text_urls = url_pattern.findall(text) | |
| if text_urls: | |
| try: | |
| href_domain = urlparse(href).netloc.lower() | |
| text_domain = urlparse(text_urls[0]).netloc.lower() | |
| if href_domain and text_domain and href_domain != text_domain: | |
| count += 1 | |
| except Exception: | |
| pass | |
| return count | |
| # ------------------------------------------------------------------ | |
| # 4. Script features (7) | |
| # ------------------------------------------------------------------ | |
| def _script_features(c: dict) -> dict: | |
| scripts = c['scripts'] | |
| n_inline = 0 | |
| n_external = 0 | |
| script_text_parts = [] | |
| for s in scripts: | |
| if s.get('src'): | |
| n_external += 1 | |
| if s.string: | |
| n_inline += 1 | |
| script_text_parts.append(s.string) | |
| script_content = ' '.join(script_text_parts) | |
| return { | |
| 'num_scripts': len(scripts), | |
| 'num_inline_scripts': n_inline, | |
| 'num_external_scripts': n_external, | |
| 'has_eval': 1 if 'eval(' in script_content else 0, | |
| 'has_unescape': 1 if 'unescape(' in script_content else 0, | |
| 'has_escape': 1 if 'escape(' in script_content else 0, | |
| 'has_document_write': 1 if 'document.write' in script_content else 0, | |
| } | |
| # ------------------------------------------------------------------ | |
| # 5. Text content features (8) | |
| # ------------------------------------------------------------------ | |
| def _text_features(soup, c: dict) -> dict: | |
| text = soup.get_text(separator=' ', strip=True).lower() | |
| words = text.split() | |
| n_words = len(words) | |
| html_len = len(str(soup)) | |
| return { | |
| 'text_length': len(text), | |
| 'num_words': n_words, | |
| 'text_to_html_ratio': len(text) / html_len if html_len > 0 else 0.0, | |
| 'num_brand_mentions': sum(1 for kw in BRAND_KEYWORDS if kw in text), | |
| 'num_urgency_keywords': sum(1 for kw in URGENCY_KEYWORDS if kw in text), | |
| 'has_copyright': 1 if ('©' in text or 'copyright' in text) else 0, | |
| 'has_phone_number': 1 if re.search( | |
| r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', text) else 0, | |
| 'has_email_address': 1 if re.search( | |
| r'[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}', text) else 0, | |
| } | |
| # ------------------------------------------------------------------ | |
| # 6. Meta tag features (6) | |
| # ------------------------------------------------------------------ | |
| def _meta_features(soup, c: dict) -> dict: | |
| meta_tags = c['meta_tags'] | |
| has_refresh = 0 | |
| has_desc = 0 | |
| has_keywords = 0 | |
| has_author = 0 | |
| has_viewport = 0 | |
| for m in meta_tags: | |
| name_attr = (m.get('name') or '').lower() | |
| http_equiv = (m.get('http-equiv') or '').lower() | |
| if name_attr == 'description': | |
| has_desc = 1 | |
| elif name_attr == 'keywords': | |
| has_keywords = 1 | |
| elif name_attr == 'author': | |
| has_author = 1 | |
| elif name_attr == 'viewport': | |
| has_viewport = 1 | |
| if http_equiv == 'refresh': | |
| has_refresh = 1 | |
| return { | |
| 'num_meta_tags': len(meta_tags), | |
| 'has_description': has_desc, | |
| 'has_keywords': has_keywords, | |
| 'has_author': has_author, | |
| 'has_viewport': has_viewport, | |
| 'has_meta_refresh': has_refresh, | |
| } | |
| # ------------------------------------------------------------------ | |
| # 7. Resource features (7) | |
| # ------------------------------------------------------------------ | |
| def _resource_features(c: dict) -> dict: | |
| css_links = c['css_links'] | |
| images = c['images'] | |
| style_tags = c['style_tags'] | |
| img_srcs = [img.get('src', '') or '' for img in images] | |
| css_content = ''.join(tag.string or '' for tag in style_tags) | |
| has_favicon = 0 | |
| for lt in c['all_link_tags']: | |
| rel = lt.get('rel', []) | |
| if 'icon' in rel or 'shortcut' in rel: | |
| has_favicon = 1 | |
| break | |
| return { | |
| 'num_css_files': len(css_links), | |
| 'num_external_css': sum(1 for lk in css_links | |
| if (lk.get('href', '') or '').startswith('http')), | |
| 'num_external_images': sum(1 for s in img_srcs if s.startswith('http')), | |
| 'num_data_uri_images': sum(1 for s in img_srcs if s.startswith('data:')), | |
| 'num_inline_styles': len(style_tags), | |
| 'inline_css_length': len(css_content), | |
| 'has_favicon': has_favicon, | |
| } | |
| # ------------------------------------------------------------------ | |
| # 8. Advanced phishing indicators (16) | |
| # ------------------------------------------------------------------ | |
| def _advanced_features(soup, c: dict) -> dict: | |
| forms = c['forms'] | |
| input_types = c['input_types'] | |
| hrefs = c['hrefs'] | |
| all_text_lower = str(soup).lower() | |
| # Password + external action combo | |
| has_password = any(t == 'password' for _, t in input_types) | |
| has_ext_action = any( | |
| (f.get('action', '') or '').startswith('http') for f in forms) | |
| # Count unique external domains from links | |
| ext_domains = set() | |
| for h in hrefs: | |
| if h.startswith('http'): | |
| try: | |
| d = urlparse(h).netloc | |
| if d: | |
| ext_domains.add(d.lower()) | |
| except Exception: | |
| pass | |
| # Forms without labels | |
| n_forms_no_label = sum( | |
| 1 for f in forms | |
| if not f.find_all('label') and f.find_all('input') | |
| ) | |
| # Event handlers – single pass over all tags | |
| n_onload = 0 | |
| n_onerror = 0 | |
| n_onclick = 0 | |
| for tag in c['all_tags']: | |
| attrs = tag.attrs | |
| if 'onload' in attrs: | |
| n_onload += 1 | |
| if 'onerror' in attrs: | |
| n_onerror += 1 | |
| if 'onclick' in attrs: | |
| n_onclick += 1 | |
| # Iframe with small/zero dimensions (common cloaking) | |
| n_hidden_iframes = 0 | |
| for iframe in c['iframes']: | |
| w = iframe.get('width', '') | |
| h = iframe.get('height', '') | |
| style = (iframe.get('style', '') or '').lower() | |
| if w in ('0', '1') or h in ('0', '1') or 'display:none' in style or 'visibility:hidden' in style: | |
| n_hidden_iframes += 1 | |
| return { | |
| 'password_with_external_action': 1 if (has_password and has_ext_action) else 0, | |
| 'has_base64': 1 if 'base64' in all_text_lower else 0, | |
| 'has_atob': 1 if 'atob(' in all_text_lower else 0, | |
| 'has_fromcharcode': 1 if 'fromcharcode' in all_text_lower else 0, | |
| 'num_onload_events': n_onload, | |
| 'num_onerror_events': n_onerror, | |
| 'num_onclick_events': n_onclick, | |
| 'num_unique_external_domains': len(ext_domains), | |
| 'num_forms_without_labels': n_forms_no_label, | |
| 'has_display_none': 1 if ('display:none' in all_text_lower or | |
| 'display: none' in all_text_lower) else 0, | |
| 'has_visibility_hidden': 1 if ('visibility:hidden' in all_text_lower or | |
| 'visibility: hidden' in all_text_lower) else 0, | |
| 'has_window_open': 1 if 'window.open' in all_text_lower else 0, | |
| 'has_location_replace': 1 if ('location.replace' in all_text_lower or | |
| 'location.href' in all_text_lower) else 0, | |
| 'num_hidden_iframes': n_hidden_iframes, | |
| 'has_right_click_disabled': 1 if ('oncontextmenu' in all_text_lower and | |
| 'return false' in all_text_lower) else 0, | |
| 'has_status_bar_customization': 1 if ('window.status' in all_text_lower or | |
| 'onmouseover' in all_text_lower) else 0, | |
| } | |
| # ------------------------------------------------------------------ | |
| # Default features (all zeros) – used on parse failure | |
| # ------------------------------------------------------------------ | |
| def _default_features(self) -> dict: | |
| return {k: 0 for k in self.get_feature_names()} | |
| def get_feature_names() -> list[str]: | |
| """Return ordered list of all 67 feature names.""" | |
| return [ | |
| # Structure (12) | |
| 'html_length', 'num_tags', 'num_divs', 'num_spans', | |
| 'num_paragraphs', 'num_headings', 'num_lists', 'num_images', | |
| 'num_iframes', 'num_tables', 'has_title', 'dom_depth', | |
| # Form (11) | |
| 'num_forms', 'num_input_fields', 'num_password_fields', | |
| 'num_email_fields', 'num_text_fields', 'num_submit_buttons', | |
| 'num_hidden_fields', 'has_login_form', 'has_form', | |
| 'num_external_form_actions', 'num_empty_form_actions', | |
| # Link (10) | |
| 'num_links', 'num_external_links', 'num_internal_links', | |
| 'num_empty_links', 'num_mailto_links', 'num_javascript_links', | |
| 'ratio_external_links', 'num_ip_based_links', | |
| 'num_suspicious_tld_links', 'num_anchor_text_mismatch', | |
| # Script (7) | |
| 'num_scripts', 'num_inline_scripts', 'num_external_scripts', | |
| 'has_eval', 'has_unescape', 'has_escape', 'has_document_write', | |
| # Text (8) | |
| 'text_length', 'num_words', 'text_to_html_ratio', | |
| 'num_brand_mentions', 'num_urgency_keywords', | |
| 'has_copyright', 'has_phone_number', 'has_email_address', | |
| # Meta (6) | |
| 'num_meta_tags', 'has_description', 'has_keywords', | |
| 'has_author', 'has_viewport', 'has_meta_refresh', | |
| # Resource (7) | |
| 'num_css_files', 'num_external_css', 'num_external_images', | |
| 'num_data_uri_images', 'num_inline_styles', | |
| 'inline_css_length', 'has_favicon', | |
| # Advanced (16) | |
| 'password_with_external_action', 'has_base64', 'has_atob', | |
| 'has_fromcharcode', 'num_onload_events', 'num_onerror_events', | |
| 'num_onclick_events', 'num_unique_external_domains', | |
| 'num_forms_without_labels', 'has_display_none', | |
| 'has_visibility_hidden', 'has_window_open', | |
| 'has_location_replace', 'num_hidden_iframes', | |
| 'has_right_click_disabled', 'has_status_bar_customization', | |
| ] | |