""" URL Feature Extraction v2 - IMPROVED VERSION Improvements: - Fixed free hosting detection (exact/suffix match instead of substring) - Added free platform detection (Google Sites, Weebly, Firebase, etc.) - Added UUID subdomain detection (Replit, Firebase patterns) - Added platform subdomain length feature - Added longest_part thresholds (gt_20, gt_30, gt_40) - Expanded brand list with regional brands - Improved extension categorization (added archive, image categories) - Fixed get_feature_names() to be dynamic - Better URL shortener detection Key Features: - Lexical (length, characters, entropy) - Structural (domain parts, path segments, TLD) - Statistical (entropy, n-grams, patterns) - Security indicators (from URL only) - Brand/phishing patterns - FREE PLATFORM ABUSE DETECTION (NEW!) Designed for: - Fast inference (< 1ms per URL) - No network dependencies - Production deployment """ import pandas as pd import numpy as np from urllib.parse import urlparse, parse_qs, unquote import re import math import socket import unicodedata from pathlib import Path from collections import Counter import sys import logging # Setup logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%H:%M:%S' ) logger = logging.getLogger("url_features_v2") class URLFeatureExtractorV2: """ Fast URL-only feature extractor for Stage 1 phishing detection. IMPROVED VERSION with better free platform detection. 
""" def __init__(self): """Initialize feature extractor with keyword lists.""" # Phishing-related keywords self.phishing_keywords = [ 'login', 'signin', 'sign-in', 'log-in', 'logon', 'signon', 'account', 'accounts', 'update', 'verify', 'verification', 'secure', 'security', 'banking', 'bank', 'confirm', 'password', 'passwd', 'credential', 'suspended', 'locked', 'unusual', 'authenticate', 'auth', 'wallet', 'invoice', 'payment', 'billing', 'expire', 'expired', 'limited', 'restrict', 'urgent', 'immediately', 'alert', 'warning', 'resolve', 'recover', 'restore', 'reactivate', 'unlock', 'validate' ] # Brand names - EXPANDED with regional brands self.brand_names = [ # US Tech Giants 'paypal', 'ebay', 'amazon', 'apple', 'microsoft', 'google', 'facebook', 'instagram', 'twitter', 'netflix', 'linkedin', 'dropbox', 'adobe', 'spotify', 'steam', 'zoom', 'docusign', 'salesforce', 'shopify', 'square', 'venmo', 'cashapp', 'zelle', # US Banks 'chase', 'wellsfargo', 'bankofamerica', 'citibank', 'citi', 'americanexpress', 'amex', 'visa', 'mastercard', 'capitalone', 'usbank', 'pnc', 'truist', # Email/Communication 'outlook', 'office365', 'office', 'yahoo', 'aol', 'icloud', 'gmail', 'protonmail', 'whatsapp', 'telegram', 'discord', 'signal', 'skype', 'teams', # Shipping/Logistics 'dhl', 'fedex', 'ups', 'usps', 'amazon', 'alibaba', # Crypto/Finance 'coinbase', 'binance', 'blockchain', 'metamask', 'kraken', 'gemini', 'robinhood', 'etrade', 'fidelity', 'schwab', 'payoneer', 'stripe', 'wise', 'revolut', # Social/Entertainment 'tiktok', 'snapchat', 'twitch', 'roblox', 'epic', 'epicgames', 'playstation', 'xbox', 'nintendo', 'blizzard', 'riot', # REGIONAL BRANDS (from analysis) # Europe 'allegro', 'allegrolokalnie', # Poland 'olx', # Europe/LatAm 'bol', 'marktplaats', # Netherlands 'leboncoin', # France 'idealo', 'otto', # Germany 'hsbc', 'barclays', 'santander', 'ing', 'revolut', # European banks # Asia 'rakuten', # Japan 'lazada', 'shopee', # Southeast Asia 'baidu', 'taobao', 'alipay', 
'wechat', 'weibo', # China 'paytm', 'phonepe', # India # Latin America 'mercadolibre', 'mercadopago', # LatAm # Russia 'yandex', 'vk', 'mailru', # Other 'uber', 'lyft', 'airbnb', 'booking', 'expedia', 'wetransfer', 'mediafire', 'mega', ] # URL shorteners - EXACT MATCH ONLY self.shorteners = { # Original 'bit.ly', 'bitly.com', 'goo.gl', 'tinyurl.com', 't.co', 'ow.ly', 'is.gd', 'buff.ly', 'adf.ly', 'bit.do', 'short.to', 'tiny.cc', 'j.mp', 'surl.li', 'rb.gy', 'cutt.ly', 'qrco.de', 'v.gd', 'shorturl.at', 'rebrand.ly', 'clck.ru', 's.id', 'shrtco.de', # NEW from analysis (CRITICAL!) 'qrco.de', # 3,824 occurrences! 'q-r.to', # 2,974 'l.ead.me', # 2,907 'ead.me', # Base domain 'urlz.fr', 'hotm.art', 'reurl.cc', 'did.li', 'zpr.io', 'linkin.bio', 'linqapp.com', 'linktr.ee', 'flow.page', 'campsite.bio', 'qr-codes.io', 'scanned.page', 'l.wl.co', 'wl.co', 'hm.ru', 'flowcode.com', } # Suspicious TLDs self.suspicious_tlds = { 'tk', 'ml', 'ga', 'cf', 'gq', # Free domains 'xyz', 'top', 'club', 'work', 'date', 'racing', 'win', 'loan', 'download', 'stream', 'click', 'link', 'bid', 'review', 'party', 'trade', 'webcam', 'science', 'accountant', 'faith', 'cricket', 'zip', 'mov', 'icu', 'buzz', 'space', 'online', 'site', 'website', 'tech', 'store', 'rest', 'cfd', 'monster', 'sbs' } # Trusted TLDs self.trusted_tlds = { 'com', 'org', 'net', 'edu', 'gov', 'mil', 'uk', 'us', 'ca', 'de', 'fr', 'jp', 'au', 'nl', 'be', 'ch', 'it', 'es', 'se', 'no', 'pl', 'br', 'in', 'mx', 'kr', 'ru', 'cn' } # FREE PLATFORMS - EXACT/SUFFIX MATCH (from your PhishTank analysis!) self.free_platforms = { # Website Builders 'weebly.com', 'wixsite.com', 'wix.com', 'webflow.io', 'framer.website', 'carrd.co', 'notion.site', 'webwave.me', 'godaddysites.com', 'square.site', 'sites.google.com', # Google Platforms (HIGH PHISHING RATE from analysis) 'firebaseapp.com', 'web.app', 'appspot.com', 'firebase.app', 'page.link', # Developer Platforms (from analysis: Replit, Vercel, etc.) 
'github.io', 'gitlab.io', 'pages.github.com', 'vercel.app', 'netlify.app', 'netlify.com', 'replit.dev', 'repl.co', 'replit.co', 'glitch.me', 'glitch.com', 'pages.dev', 'workers.dev', # Cloudflare 'herokuapp.com', 'heroku.com', 'onrender.com', 'railway.app', 'fly.dev', 'amplifyapp.com', # AWS Amplify 'surge.sh', 'now.sh', # Blogging/CMS 'wordpress.com', 'blogspot.com', 'blogger.com', 'tumblr.com', 'medium.com', 'ghost.io', 'substack.com', 'beehiiv.com', # Adobe/Creative 'adobesites.com', 'myportfolio.com', 'behance.net', 'adobe.com', 'framer.app', # Forms/Surveys (from analysis: jotform, hsforms) 'jotform.com', 'typeform.com', 'forms.gle', 'hsforms.com', 'hubspot.com', 'surveymonkey.com', 'formstack.com', 'cognito.com', # File Sharing 'dropboxusercontent.com', 'dl.dropboxusercontent.com', 'sharepoint.com', '1drv.ms', 'onedrive.live.com', 'box.com', 'wetransfer.com', 'we.tl', # Free Hosting '000webhostapp.com', 'freehosting.com', 'freehostia.com', '5gbfree.com', 'x10hosting.com', 'awardspace.com', 'byet.host', 'infinityfree.com', # Education/Sandbox 'repl.it', 'codepen.io', 'jsfiddle.net', 'codesandbox.io', 'stackblitz.com', 'observablehq.com', # Other (from analysis) 'webcindario.com', 'gitbook.io', 'tinyurl.com', 'start.page', 'my.site', 'site123.com' } # Common English words for dictionary check self.common_words = { 'about', 'account', 'after', 'again', 'all', 'also', 'america', 'american', 'another', 'answer', 'any', 'app', 'apple', 'area', 'back', 'bank', 'best', 'between', 'book', 'business', 'call', 'can', 'card', 'care', 'case', 'center', 'central', 'change', 'check', 'city', 'class', 'cloud', 'come', 'company', 'contact', 'control', 'country', 'course', 'credit', 'data', 'day', 'dept', 'department', 'different', 'digital', 'doctor', 'down', 'east', 'easy', 'end', 'energy', 'even', 'event', 'every', 'express', 'fact', 'family', 'feel', 'field', 'file', 'find', 'first', 'food', 'form', 'free', 'friend', 'from', 'game', 'general', 'get', 'give', 'global', 
'good', 'government', 'great', 'group', 'hand', 'have', 'head', 'health', 'help', 'here', 'high', 'home', 'house', 'how', 'image', 'info', 'information', 'insurance', 'international', 'into', 'just', 'keep', 'kind', 'know', 'large', 'last', 'late', 'leave', 'left', 'legal', 'life', 'like', 'line', 'little', 'local', 'long', 'look', 'love', 'mail', 'main', 'make', 'management', 'manager', 'many', 'map', 'market', 'marketing', 'media', 'medical', 'member', 'message', 'money', 'month', 'more', 'most', 'move', 'music', 'name', 'national', 'need', 'network', 'never', 'new', 'news', 'next', 'north', 'not', 'note', 'number', 'office', 'official', 'old', 'online', 'only', 'open', 'order', 'other', 'over', 'page', 'part', 'party', 'people', 'person', 'personal', 'photo', 'place', 'plan', 'play', 'plus', 'point', 'policy', 'portal', 'post', 'power', 'press', 'price', 'private', 'product', 'program', 'project', 'property', 'public', 'quality', 'question', 'quick', 'rate', 'read', 'real', 'record', 'report', 'research', 'resource', 'result', 'right', 'room', 'sale', 'sales', 'save', 'school', 'search', 'second', 'section', 'security', 'see', 'senior', 'service', 'services', 'set', 'shop', 'show', 'side', 'sign', 'site', 'small', 'social', 'software', 'solution', 'solutions', 'some', 'south', 'space', 'special', 'staff', 'start', 'state', 'store', 'story', 'student', 'study', 'support', 'sure', 'system', 'systems', 'take', 'team', 'tech', 'technology', 'test', 'text', 'than', 'that', 'their', 'them', 'then', 'there', 'these', 'they', 'thing', 'think', 'this', 'those', 'through', 'time', 'today', 'together', 'total', 'trade', 'training', 'travel', 'trust', 'type', 'under', 'university', 'until', 'update', 'upon', 'user', 'value', 'very', 'video', 'view', 'want', 'water', 'website', 'week', 'well', 'west', 'what', 'when', 'where', 'which', 'while', 'white', 'will', 'with', 'within', 'without', 'woman', 'women', 'word', 'work', 'world', 'would', 'write', 'year', 'york', 'young', 
'your' } # Keyboard patterns self.keyboard_patterns = [ 'qwerty', 'asdfgh', 'zxcvbn', '12345', '123456', '1234567', '12345678', 'qwertyuiop', 'asdfghjkl', 'zxcvbnm' ] # Lookalike character mappings self.lookalike_chars = { '0': 'o', 'o': '0', '1': 'l', 'l': '1', 'i': '1', 'rn': 'm', 'vv': 'w', 'cl': 'd' } self.microsoft_services = { 'forms.office.com', 'sharepoint.com', 'onedrive.live.com', '1drv.ms', } self.zoom_services = { 'docs.zoom.us', 'zoom.us', } self.adobe_services = { 'express.adobe.com', 'new.express.adobe.com', # Multi-level! 'spark.adobe.com', 'portfolio.adobe.com', } self.google_services = { 'docs.google.com', 'sites.google.com', 'drive.google.com', 'script.google.com', 'storage.googleapis.com', 'storage.cloud.google.com', 'forms.google.com', 'calendar.google.com', 'meet.google.com', } def extract_features(self, url: str) -> dict: """ Extract all URL-only features from a single URL. Args: url: URL string Returns: Dictionary of features """ try: # Ensure URL has scheme if not url.startswith(('http://', 'https://')): url = 'http://' + url parsed = urlparse(url) domain = parsed.netloc.lower() domain_no_port = domain.split(':')[0] path = parsed.path query = parsed.query features = {} # 1. Length features features.update(self._length_features(url, domain_no_port, path, query)) # 2. Character count features features.update(self._char_count_features(url, domain_no_port, path)) # 3. Ratio features features.update(self._ratio_features(url, domain_no_port)) # 4. Domain structure features features.update(self._domain_features(domain_no_port, parsed)) # 5. Path features features.update(self._path_features(path, domain_no_port)) # 6. Query features features.update(self._query_features(query)) # 7. Statistical features (entropy, patterns) features.update(self._statistical_features(url, domain_no_port, path)) # 8. Security indicator features features.update(self._security_features(url, parsed, domain_no_port)) # 9. 
            # Keyword/brand features
            features.update(self._keyword_features(url, domain_no_port, path, parsed))
            # 10. Encoding features
            features.update(self._encoding_features(url, domain_no_port))

            return features

        except Exception as e:
            # Never raise from feature extraction: log and fall back to the
            # default feature vector so batch processing keeps going.
            logger.error(f"Error extracting features from URL: {url[:50]}... Error: {e}")
            return self._get_default_features()

    def _length_features(self, url: str, domain: str, path: str, query: str) -> dict:
        """Length-based features."""
        return {
            'url_length': len(url),
            'domain_length': len(domain),
            'path_length': len(path),
            'query_length': len(query),
            # Categorical length encoding (bucket index, see _categorize_length)
            'url_length_category': self._categorize_length(len(url), [30, 75, 150]),
            'domain_length_category': self._categorize_length(len(domain), [10, 20, 30]),
        }

    def _char_count_features(self, url: str, domain: str, path: str) -> dict:
        """Character count features."""
        return {
            # URL character counts
            'num_dots': url.count('.'),
            'num_hyphens': url.count('-'),
            'num_underscores': url.count('_'),
            'num_slashes': url.count('/'),
            'num_question_marks': url.count('?'),
            'num_ampersands': url.count('&'),
            'num_equals': url.count('='),
            'num_at': url.count('@'),
            'num_percent': url.count('%'),
            'num_digits_url': sum(c.isdigit() for c in url),
            'num_letters_url': sum(c.isalpha() for c in url),
            # Domain character counts
            'domain_dots': domain.count('.'),
            'domain_hyphens': domain.count('-'),
            'domain_digits': sum(c.isdigit() for c in domain),
            # Path character counts
            'path_slashes': path.count('/'),
            'path_dots': path.count('.'),
            'path_digits': sum(c.isdigit() for c in path),
        }

    def _ratio_features(self, url: str, domain: str) -> dict:
        """Ratio-based features (denominators clamped to avoid div-by-zero)."""
        url_len = max(len(url), 1)
        domain_len = max(len(domain), 1)
        return {
            'digit_ratio_url': sum(c.isdigit() for c in url) / url_len,
            'letter_ratio_url': sum(c.isalpha() for c in url) / url_len,
            'special_char_ratio': sum(not c.isalnum() for c in url) / url_len,
            'digit_ratio_domain': sum(c.isdigit() for c in domain) / domain_len,
            'symbol_ratio_domain': sum(c in '-_.' for c in domain) / domain_len,
        }

    def _domain_features(self, domain: str, parsed) -> dict:
        """Domain structure features (parts, TLD, port, randomness)."""
        parts = domain.split('.')
        tld = parts[-1] if parts else ''
        # sld = second-level domain (the registrable label before the TLD)
        sld = parts[-2] if len(parts) > 1 else ''
        num_subdomains = max(0, len(parts) - 2)
        longest_part = max((len(p) for p in parts), default=0)
        return {
            'num_subdomains': num_subdomains,
            'num_domain_parts': len(parts),
            'tld_length': len(tld),
            'sld_length': len(sld),
            'longest_domain_part': longest_part,
            'avg_domain_part_len': sum(len(p) for p in parts) / max(len(parts), 1),
            # NEW: Longest part thresholds (from analysis!)
            'longest_part_gt_20': 1 if longest_part > 20 else 0,
            'longest_part_gt_30': 1 if longest_part > 30 else 0,
            'longest_part_gt_40': 1 if longest_part > 40 else 0,
            # TLD indicators
            'has_suspicious_tld': 1 if tld in self.suspicious_tlds else 0,
            'has_trusted_tld': 1 if tld in self.trusted_tlds else 0,
            # Port
            # NOTE(review): parsed.port raises ValueError on a malformed
            # port string; the outer except in extract_features catches it.
            'has_port': 1 if parsed.port else 0,
            'has_non_std_port': 1 if parsed.port and parsed.port not in [80, 443] else 0,
            # Domain randomness features (all computed on the SLD only)
            'domain_randomness_score': self._calculate_domain_randomness(sld),
            'sld_consonant_cluster_score': self._consonant_clustering_score(sld),
            'sld_keyboard_pattern': self._keyboard_pattern_score(sld),
            'sld_has_dictionary_word': self._contains_dictionary_word(sld),
            'sld_pronounceability_score': self._pronounceability_score(sld),
            'domain_digit_position_suspicious': self._suspicious_digit_position(sld),
        }

    def _path_features(self, path: str, domain: str) -> dict:
        """Path structure features."""
        segments = [s for s in path.split('/') if s]
        # Get file extension if present (cap at 10 chars to skip junk)
        extension = ''
        if '.' in path:
            potential_ext = path.rsplit('.', 1)[-1].split('?')[0].lower()
            if len(potential_ext) <= 10:
                extension = potential_ext
        return {
            'path_depth': len(segments),
            'max_path_segment_len': max((len(s) for s in segments), default=0),
            'avg_path_segment_len': sum(len(s) for s in segments) / max(len(segments), 1),
            # Extension features
            'has_extension': 1 if extension else 0,
            'extension_category': self._categorize_extension(extension),
            'has_suspicious_extension': 1 if extension in ['zip', 'exe', 'apk', 'scr', 'bat', 'cmd'] else 0,
            'has_exe': 1 if extension in ['exe', 'bat', 'cmd', 'msi'] else 0,
            # Suspicious path patterns
            'has_double_slash': 1 if '//' in path else 0,
            'path_has_brand_not_domain': self._brand_in_path_only(path, domain),
            'path_has_ip_pattern': 1 if re.search(r'\d{1,3}[._-]\d{1,3}[._-]\d{1,3}', path) else 0,
            'suspicious_path_extension_combo': self._suspicious_extension_pattern(path),
        }

    def _query_features(self, query: str) -> dict:
        """Query string features."""
        params = parse_qs(query)
        return {
            'num_params': len(params),
            'has_query': 1 if query else 0,
            'query_value_length': sum(len(''.join(v)) for v in params.values()),
            'max_param_len': max((len(k) + len(''.join(v)) for k, v in params.items()), default=0),
            # Detects a percent-encoded URL embedded in the query (open-redirect style)
            'query_has_url': 1 if re.search(r'https?%3A%2F%2F|http%3A//', query.lower()) else 0,
        }

    def _statistical_features(self, url: str, domain: str, path: str) -> dict:
        """Statistical and entropy features."""
        parts = domain.split('.')
        sld = parts[-2] if len(parts) > 1 else domain
        return {
            # Entropy
            'url_entropy': self._entropy(url),
            'domain_entropy': self._entropy(domain),
            'path_entropy': self._entropy(path) if path else 0,
            # Consecutive character patterns
            'max_consecutive_digits': self._max_consecutive(url, str.isdigit),
            'max_consecutive_chars': self._max_consecutive(url, str.isalpha),
            'max_consecutive_consonants': self._max_consecutive_consonants(domain),
            # Character variance
            'char_repeat_rate': self._repeat_rate(url),
            # N-gram uniqueness
            'unique_bigram_ratio': self._unique_ngram_ratio(url, 2),
            'unique_trigram_ratio': self._unique_ngram_ratio(url, 3),
            # Improved statistical features
            'sld_letter_diversity': self._character_diversity(sld),
            'domain_has_numbers_letters': 1 if any(c.isdigit() for c in domain) and any(c.isalpha() for c in domain) else 0,
            'url_complexity_score': self._calculate_url_complexity(url),
        }

    def _security_features(self, url: str, parsed, domain: str) -> dict:
        """Security indicator features (URL-based only)."""
        # NOTE(review): 'parts' is computed but unused in this method.
        parts = domain.split('.')
        return {
            # IP address
            'has_ip_address': 1 if self._is_ip(domain) else 0,
            # Suspicious patterns
            'has_at_symbol': 1 if '@' in url else 0,
            'has_redirect': 1 if 'redirect' in url.lower() or 'url=' in url.lower() else 0,
            # URL shortener - FIXED: exact match only
            'is_shortened': self._is_url_shortener(domain),
            # Free hosting - DEPRECATED (use is_free_platform instead);
            # kept as a duplicate of is_free_platform for backward compat.
            'is_free_hosting': self._is_free_platform(domain),
            # NEW: Free platform detection (CRITICAL for your dataset!)
            'is_free_platform': self._is_free_platform(domain),
            'platform_subdomain_length': self._get_platform_subdomain_length(domain),
            'has_uuid_subdomain': self._detect_uuid_pattern(domain),
        }

    def _keyword_features(self, url: str, domain: str, path: str, parsed) -> dict:
        """Keyword and brand detection features."""
        url_lower = url.lower()
        domain_lower = domain.lower()
        path_lower = path.lower()

        # Count phishing keywords (substring matches)
        phishing_in_url = sum(1 for k in self.phishing_keywords if k in url_lower)
        phishing_in_domain = sum(1 for k in self.phishing_keywords if k in domain_lower)
        phishing_in_path = sum(1 for k in self.phishing_keywords if k in path_lower)

        # Count brand names (substring matches)
        brands_in_url = sum(1 for b in self.brand_names if b in url_lower)
        brands_in_domain = sum(1 for b in self.brand_names if b in domain_lower)
        brands_in_path = sum(1 for b in self.brand_names if b in path_lower)

        # Brand impersonation: brand mentioned in path but absent from domain
        brand_impersonation = 1 if brands_in_path > 0 and brands_in_domain == 0 else 0

        return {
            'num_phishing_keywords': phishing_in_url,
            'phishing_in_domain': phishing_in_domain,
            'phishing_in_path': phishing_in_path,
            'num_brands': brands_in_url,
            'brand_in_domain': 1 if brands_in_domain > 0 else 0,
            'brand_in_path': 1 if brands_in_path > 0 else 0,
            'brand_impersonation': brand_impersonation,
            # Specific high-value keywords
            'has_login': 1 if 'login' in url_lower or 'signin' in url_lower else 0,
            'has_account': 1 if 'account' in url_lower else 0,
            'has_verify': 1 if 'verify' in url_lower or 'confirm' in url_lower else 0,
            'has_secure': 1 if 'secure' in url_lower or 'security' in url_lower else 0,
            'has_update': 1 if 'update' in url_lower else 0,
            'has_bank': 1 if 'bank' in url_lower else 0,
            'has_password': 1 if 'password' in url_lower or 'passwd' in url_lower else 0,
            'has_suspend': 1 if 'suspend' in url_lower or 'locked' in url_lower else 0,
            # Suspicious patterns
            'has_webscr': 1 if 'webscr' in url_lower else 0,
            'has_cmd': 1 if 'cmd=' in url_lower else 0,
            'has_cgi': 1 if 'cgi-bin' in url_lower or 'cgi_bin' in url_lower else 0,
            # Advanced brand spoofing features
            'brand_in_subdomain_not_domain': self._brand_subdomain_spoofing(parsed),
            'multiple_brands_in_url': 1 if brands_in_url >= 2 else 0,
            'brand_with_hyphen': self._brand_with_hyphen(domain_lower),
            'suspicious_brand_tld': self._suspicious_brand_tld(domain),
            'brand_keyword_combo': self._brand_phishing_keyword_combo(url_lower),
        }

    def _encoding_features(self, url: str, domain: str) -> dict:
        """Encoding-related features."""
        has_punycode = 'xn--' in domain
        try:
            decoded = unquote(url)
            # Difference in length after percent-decoding
            encoding_diff = len(decoded) - len(url)
        except:
            encoding_diff = 0
        try:
            has_hex = 1 if re.search(r'[0-9a-f]{20,}', url.lower()) else 0
        except:
            has_hex = 0
        try:
            has_base64 = 1 if re.search(r'[A-Za-z0-9+/]{30,}={0,2}', url) else 0
        except:
            has_base64 = 0
        try:
            has_unicode = 1 if any(ord(c) > 127 for c in url) else 0
        except:
            has_unicode = 0
        return {
            'has_url_encoding': 1 if '%' in url else 0,
            'encoding_count': url.count('%'),
            'encoding_diff': abs(encoding_diff),
            'has_punycode': 1 if has_punycode else 0,
            'has_unicode': has_unicode,
            'has_hex_string': has_hex,
            'has_base64': has_base64,
            # Homograph & encoding detection
            'has_lookalike_chars': self._detect_lookalike_chars(domain),
            'mixed_script_score': self._mixed_script_detection(domain),
            'homograph_brand_risk': self._homograph_brand_check(domain),
            'suspected_idn_homograph': self._idn_homograph_score(url),
            'double_encoding': self._detect_double_encoding(url),
            'encoding_in_domain': 1 if '%' in domain else 0,
            'suspicious_unicode_category': self._suspicious_unicode_chars(url),
        }

    # ============================================================
    # HELPER METHODS
    # ============================================================

    def _entropy(self, text: str) -> float:
        """Calculate Shannon entropy (bits per character)."""
        if not text:
            return 0.0
        freq = Counter(text)
        length = len(text)
        return -sum((c / length) * math.log2(c / length) for c in freq.values())

    def _max_consecutive(self, text: str, condition) -> int:
        """Max run length of characters matching `condition` (a predicate)."""
        max_count = count = 0
        for char in text:
            if condition(char):
                count += 1
                max_count = max(max_count, count)
            else:
                count = 0
        return max_count

    def _max_consecutive_consonants(self, text: str) -> int:
        """Max run length of consonants (case-insensitive)."""
        consonants = set('bcdfghjklmnpqrstvwxyz')
        max_count = count = 0
        for char in text.lower():
            if char in consonants:
                count += 1
                max_count = max(max_count, count)
            else:
                count = 0
        return max_count

    def _repeat_rate(self, text: str) -> float:
        """Rate of repeated adjacent characters."""
        if len(text) < 2:
            return 0.0
        repeats = sum(1 for i in range(len(text) - 1) if text[i] == text[i + 1])
        return repeats / (len(text) - 1)

    def _unique_ngram_ratio(self, text: str, n: int) -> float:
        """Ratio of unique n-grams to total n-grams."""
        if len(text) < n:
            return 0.0
        ngrams = [text[i:i + n] for i in range(len(text) - n + 1)]
        return len(set(ngrams)) / len(ngrams)

    def _is_ip(self, domain: str) -> bool:
        """Check if domain is IP address."""
        # IPv4
        # NOTE(review): this regex also accepts out-of-range octets like
        # 999.999.999.999 — confirm whether that is acceptable here.
        if re.match(r'^(\d{1,3}\.){3}\d{1,3}$', domain):
            return True
        # IPv6
        try:
            socket.inet_pton(socket.AF_INET6, domain.strip('[]'))
            return True
        except:
            return False

    # ============================================================
    # NEW/IMPROVED METHODS
    # ============================================================

    def _is_url_shortener(self, domain: str) -> int:
        """
        URL shortener detection - EXACT match.
        """
        domain_lower = domain.lower()
        return 1 if domain_lower in self.shorteners else 0

    def _is_free_platform(self, domain: str) -> int:
        """
        Detect if hosted on free platform.
        CRITICAL FIX: Exact or suffix match (not substring!).
        Examples:
        - 'mysite.weebly.com' → 1 (suffix match)
        - 'weebly.com' → 1 (exact match)
        - 'weebly-alternative.com' → 0 (NOT a match!)
        """
        domain_lower = domain.lower()
        # Exact match against all known platform/service sets
        if domain_lower in self.free_platforms:
            return 1
        if domain_lower in self.google_services:
            return 1
        if domain_lower in self.adobe_services:
            return 1
        if domain_lower in self.microsoft_services:
            return 1
        if domain_lower in self.zoom_services:
            return 1
        # Suffix match (subdomain.platform.com) — the leading dot prevents
        # 'weebly-alternative.com' style substring false positives
        for platform in self.free_platforms:
            if domain_lower.endswith('.' + platform):
                return 1
        return 0

    def _get_platform_subdomain_length(self, domain: str) -> int:
        """
        IMPROVED: Handle multi-level subdomains.
        Examples:
        - docs.google.com → subdomain = 'docs' (4 chars)
        - new.express.adobe.com → subdomain = 'new.express' (11 chars)
        - storage.cloud.google.com → subdomain = 'storage.cloud' (13 chars)
        """
        domain_lower = domain.lower()
        # Check Google
        if '.google.com' in domain_lower:
            subdomain = domain_lower.replace('.google.com', '')
            return len(subdomain)
        # Check Adobe
        if '.adobe.com' in domain_lower:
            subdomain = domain_lower.replace('.adobe.com', '')
            return len(subdomain)
        # Check Microsoft
        if '.office.com' in domain_lower:
            subdomain = domain_lower.replace('.office.com', '')
            return len(subdomain)
        # Check free platforms (existing logic)
        for platform in self.free_platforms:
            if domain_lower.endswith('.' + platform):
                subdomain = domain_lower[:-len('.' + platform)]
                return len(subdomain)
        return 0

    def _detect_uuid_pattern(self, domain: str) -> int:
        """
        Detect UUID patterns in subdomain (Replit, Firebase, etc.).
        Example: 'b82dba2b-fde4-4477-b6d5-8b17144e1bee.replit.dev' → 1
        """
        # UUID pattern: 8-4-4-4-12 hex characters
        uuid_pattern = r'[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}'
        return 1 if re.search(uuid_pattern, domain.lower()) else 0

    # ============================================================
    # DOMAIN RANDOMNESS HELPERS
    # ============================================================

    def _calculate_domain_randomness(self, domain: str) -> float:
        """Calculate randomness score for domain (0-1)."""
        if not domain or len(domain) < 4:
            return 0.5
        domain_lower = domain.lower()
        scores = []
        # 1. Vowel distribution: irregular average gaps between vowels
        vowels = 'aeiou'
        vowel_positions = [i for i, c in enumerate(domain_lower) if c in vowels]
        if len(vowel_positions) >= 2:
            avg_gap = sum(vowel_positions[i+1] - vowel_positions[i] for i in range(len(vowel_positions)-1)) / (len(vowel_positions)-1)
            vowel_irregularity = min(abs(avg_gap - 2.5) / 5, 1.0)
            scores.append(vowel_irregularity)
        # 2. Character frequency: share of rare-in-English letters
        char_freq = Counter(domain_lower)
        common_letters = 'etaoinshr'
        common_count = sum(char_freq.get(c, 0) for c in common_letters)
        uncommon_ratio = 1 - (common_count / max(len(domain_lower), 1))
        scores.append(uncommon_ratio)
        # 3. Repeated characters: high uniqueness reads as random
        unique_ratio = len(set(domain_lower)) / max(len(domain_lower), 1)
        if unique_ratio > 0.75:
            scores.append((unique_ratio - 0.75) / 0.25)
        else:
            scores.append(0)
        return min(sum(scores) / max(len(scores), 1), 1.0)

    def _consonant_clustering_score(self, text: str) -> float:
        """Detect unnatural consonant clusters (stepped 0/0.4/0.7/1.0)."""
        if not text:
            return 0
        text_lower = text.lower()
        consonants = 'bcdfghjklmnpqrstvwxyz'
        max_cluster = 0
        current_cluster = 0
        for char in text_lower:
            if char in consonants:
                current_cluster += 1
                max_cluster = max(max_cluster, current_cluster)
            else:
                current_cluster = 0
        if max_cluster >= 5:
            return 1.0
        elif max_cluster >= 4:
            return 0.7
        elif max_cluster >= 3:
            return 0.4
        else:
            return 0.0

    def _keyboard_pattern_score(self, text: str) -> int:
        """Count keyboard walking patterns found in the text."""
        if not text:
            return 0
        text_lower = text.lower()
        count = 0
        for pattern in self.keyboard_patterns:
            if pattern in text_lower:
                count += 1
        return count

    def _contains_dictionary_word(self, text: str) -> int:
        """Check if text contains any common English word (length >= 4)."""
        if not text or len(text) < 4:
            return 0
        text_lower = text.lower()
        if text_lower in self.common_words:
            return 1
        for word in self.common_words:
            if len(word) >= 4 and word in text_lower:
                return 1
        return 0

    def _pronounceability_score(self, text: str) -> float:
        """Score based on bigram frequencies in English (0-1)."""
        if not text or len(text) < 2:
            return 0.5
        text_lower = text.lower()
        common_bigrams = {
            'th', 'he', 'in', 'er', 'an', 're', 'on', 'at', 'en', 'nd',
            'ti', 'es', 'or', 'te', 'of', 'ed', 'is', 'it', 'al', 'ar',
            'st', 'to', 'nt', 'ng', 'se', 'ha', 'as', 'ou', 'io', 've'
        }
        bigrams = [text_lower[i:i+2] for i in range(len(text_lower)-1)]
        if not bigrams:
            return 0.5
        common_count = sum(1 for bg in bigrams if bg in common_bigrams)
        score = common_count / len(bigrams)
        return score

    def _suspicious_digit_position(self, text: str) -> int:
        """Detect suspicious digit positions (leading digit, trailing pair)."""
        if not text:
            return 0
        if text and text[0].isdigit():
            return 1
        if len(text) >= 2 and text[-1].isdigit() and text[-2].isdigit():
            return 1
        return 0

    # ============================================================
    # BRAND SPOOFING HELPERS
    # ============================================================

    def _brand_subdomain_spoofing(self, parsed) -> int:
        """Detect brand in subdomain but not main domain."""
        try:
            parts = parsed.netloc.split('.')
            if len(parts) < 3:
                return 0
            subdomains = '.'.join(parts[:-2]).lower()
            main_domain = '.'.join(parts[-2:]).lower()
            for brand in self.brand_names:
                if brand in subdomains and brand not in main_domain:
                    return 1
            return 0
        except:
            return 0

    def _brand_with_hyphen(self, domain: str) -> int:
        """Detect hyphenated brand names (e.g. 'pay-pal')."""
        if not domain:
            return 0
        domain_lower = domain.lower()
        for brand in self.brand_names:
            if len(brand) >= 4:
                # Try inserting a hyphen at every interior position
                for i in range(1, len(brand)):
                    hyphenated = brand[:i] + '-' + brand[i:]
                    if hyphenated in domain_lower:
                        return 1
        return 0

    def _suspicious_brand_tld(self, domain: str) -> int:
        """Detect brand name paired with a suspicious TLD."""
        if not domain:
            return 0
        domain_lower = domain.lower()
        parts = domain_lower.split('.')
        if len(parts) < 2:
            return 0
        tld = parts[-1]
        domain_without_tld = '.'.join(parts[:-1])
        if tld in self.suspicious_tlds:
            for brand in self.brand_names:
                if brand in domain_without_tld:
                    return 1
        return 0

    def _brand_phishing_keyword_combo(self, url: str) -> int:
        """Detect brand + phishing keyword combination in one URL."""
        if not url:
            return 0
        url_lower = url.lower()
        has_brand = any(brand in url_lower for brand in self.brand_names)
        if has_brand:
            phishing_combo_keywords = [
                'verify', 'security', 'secure', 'account', 'update',
                'login', 'confirm', 'suspended', 'locked'
            ]
            for keyword in phishing_combo_keywords:
                if keyword in url_lower:
                    return 1
        return 0

    # ============================================================
    # PATH & QUERY HELPERS
    # ============================================================

    def _brand_in_path_only(self, path: str, domain: str) -> int:
        """Detect brand in path but not in domain."""
        if not path or not domain:
            return 0
        path_lower = path.lower()
        domain_lower = domain.lower()
        for brand in self.brand_names:
            if brand in path_lower and brand not in domain_lower:
                return 1
        return 0

    def _suspicious_extension_pattern(self, path: str) -> int:
        """Detect suspicious double-extension patterns (doc-then-executable)."""
        if not path:
            return 0
        path_lower = path.lower()
        suspicious_patterns = [
            '.php.exe', '.html.exe', '.pdf.exe', '.doc.exe',
            '.zip.exe', '.rar.exe', '.html.zip', '.pdf.scr'
        ]
        for pattern in suspicious_patterns:
            if pattern in path_lower:
                return 1
        parts = path_lower.split('.')
        if len(parts) >= 3:
            ext1 = parts[-2]
            ext2 = parts[-1]
            doc_exts = ['pdf', 'doc', 'docx', 'xls', 'xlsx', 'html', 'htm']
            exec_exts = ['exe', 'scr', 'bat', 'cmd', 'com', 'pif']
            if ext1 in doc_exts and ext2 in exec_exts:
                return 1
        return 0

    # ============================================================
    # ENCODING HELPERS
    # ============================================================

    def _detect_lookalike_chars(self, domain: str) -> int:
        """Detect lookalike characters (rn→m, vv→w, 0/1 digit homoglyphs)."""
        if not domain:
            return 0
        domain_lower = domain.lower()
        suspicious_patterns = [
            ('rn', 'm'),
            ('vv', 'w'),
            ('cl', 'd'),
        ]
        for pattern, _ in suspicious_patterns:
            if pattern in domain_lower:
                return 1
        if any(c in domain_lower for c in ['0', '1']):
            has_letters = any(c.isalpha() for c in domain_lower)
            if has_letters:
                for lookalike_char in self.lookalike_chars:
                    if lookalike_char in domain_lower:
                        return 1
        return 0

    def _mixed_script_detection(self, domain: str) -> int:
        """Detect mixing of Latin/Cyrillic/Greek scripts in the domain."""
        if not domain:
            return 0
        scripts = set()
        for char in domain:
            if char.isalpha():
                try:
                    # First word of the Unicode name is the script family
                    script = unicodedata.name(char).split()[0]
                    if script in ['LATIN', 'CYRILLIC', 'GREEK']:
                        scripts.add(script)
                except:
                    pass
        return len(scripts) if len(scripts) > 1 else 0

    def _homograph_brand_check(self, domain: str) -> int:
        """Check for homograph attacks on top brands (1-2 char edits)."""
        if not domain:
            return 0
        domain_lower = domain.lower()
        top_brands = ['paypal', 'apple', 'amazon', 'google', 'microsoft', 'facebook']
        for brand in top_brands:
            # Only compare domains within +/-2 chars of the brand length
            if len(domain_lower) < len(brand) - 2 or len(domain_lower) > len(brand) + 2:
                continue
            differences = 0
            for i in range(min(len(domain_lower), len(brand))):
                if i < len(domain_lower) and i < len(brand):
                    if domain_lower[i] != brand[i]:
                        # NOTE(review): both branches increment identically, so
                        # the 0/1 vs o/l homoglyph special-case has no effect —
                        # confirm whether homoglyph diffs were meant to weigh
                        # differently.
                        if (domain_lower[i] in '01' and brand[i] in 'ol') or \
                           (domain_lower[i] in 'ol' and brand[i] in '01'):
                            differences += 1
                        else:
                            differences += 1
            if differences <= 2 and differences > 0:
                return 1
        return 0

    def _idn_homograph_score(self, url: str) -> float:
        """Combined IDN homograph attack score (0-1)."""
        score = 0.0
        count = 0
        if 'xn--' in url.lower():
            score += 0.5
            count += 1
        non_ascii = sum(1 for c in url if ord(c) > 127)
        if non_ascii > 0:
            score += min(non_ascii / 10, 0.3)
            count += 1
        return score / max(count, 1) if count > 0 else 0.0

    def _detect_double_encoding(self, url: str) -> int:
        """Count double URL encoding markers (e.g. '%25...')."""
        if not url:
            return 0
        double_encoded_patterns = ['%25', '%2520', '%252e', '%252f']
        count = sum(url.lower().count(pattern) for pattern in double_encoded_patterns)
        return count

    def _suspicious_unicode_chars(self, url: str) -> int:
        """Count characters in uncommon Unicode categories (marks, controls, symbols)."""
        if not url:
            return 0
        suspicious_count = 0
        for char in url:
            try:
                category = unicodedata.category(char)
                if category in ['Mn', 'Mc', 'Me', 'Zl', 'Zp', 'Cc', 'Cf',
                                'Sm', 'Sc', 'Sk', 'So']:
                    suspicious_count += 1
            except:
                pass
        return suspicious_count

    # ============================================================
    # FEATURE REFINEMENT HELPERS
    # ============================================================

    def _categorize_length(self, length: int, thresholds: list) -> int:
        """Multi-category encoding: index of first threshold >= length."""
        for i, threshold in enumerate(thresholds):
            if length <= threshold:
                return i
        return len(thresholds)

    def _categorize_extension(self, extension: str) -> int:
        """
        Categorize file extension:
        0 = none
        1 = document
        2 = web/script
        3 = executable
        4 = archive
        5 = image
        6 = other
        """
        if not extension:
            return 0
        ext_lower = extension.lower()
        if ext_lower in ['pdf', 'doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx', 'txt', 'rtf']:
            return 1
        if ext_lower in ['html', 'htm', 'php', 'asp', 'aspx', 'jsp', 'js', 'css']:
            return 2
        if ext_lower in ['exe', 'bat', 'cmd', 'scr', 'msi', 'com', 'pif', 'app', 'apk']:
            return 3
        if ext_lower in ['zip', 'rar', '7z', 'tar', 'gz', 'bz2']:
            return 4
        if ext_lower in ['jpg', 'jpeg', 'png', 'gif', 'svg', 'ico', 'webp']:
            return 5
        return 6

    def _character_diversity(self, text: str) -> float:
        """Shannon diversity index for characters (unique/total, capped at 1)."""
        if not text:
            return 0.0
        unique_chars = len(set(text))
        return min(unique_chars / max(len(text), 1), 1.0)

    def _calculate_url_complexity(self, url: str) -> float:
        """Combined URL complexity score (weighted special chars/length/encoding)."""
        if not url:
            return 0.0
        special_chars = sum(1 for c in url if not c.isalnum() and c not in [':', '/', '.'])
        special_ratio = special_chars / max(len(url), 1)
        length_score = min(len(url) / 200, 1.0)
        encoding_score = min(url.count('%') / 10, 1.0)
        complexity = (special_ratio * 0.4 + length_score * 0.3 + encoding_score * 0.3)
        return min(complexity, 1.0)

    # ============================================================
    # UTILITY METHODS
    # ============================================================

    def _get_default_features(self) -> dict:
        """Default feature values for error cases."""
        # Get feature names dynamically by extracting from a known-good URL.
        # NOTE(review): this calls extract_features, whose except-branch calls
        # back into this method — if the dummy URL ever failed to extract this
        # would recurse; confirm that is acceptable.
        dummy_url = "http://example.com"
        try:
            return self.extract_features(dummy_url)
        except:
            return {}

    def get_feature_names(self) -> list:
        """
        Get list of all feature names DYNAMICALLY.
        FIXED: No longer hardcoded!

        Returns:
            Sorted list of feature-dict keys (minus 'label' if present).
        """
        dummy_url = "http://example.com/test"
        dummy_features = self.extract_features(dummy_url)
        # Remove 'label' if present
        feature_names = [k for k in dummy_features.keys() if k != 'label']
        return sorted(feature_names)

    def extract_batch(self, urls: list, show_progress: bool = True) -> pd.DataFrame:
        """
        Extract features from multiple URLs.
Args: urls: List of URL strings show_progress: Show progress messages Returns: DataFrame with features """ if show_progress: logger.info(f"Extracting URL features from {len(urls):,} URLs...") features_list = [] progress_interval = 50000 for i, url in enumerate(urls): if show_progress and i > 0 and i % progress_interval == 0: logger.info(f" Processed {i:,} / {len(urls):,} ({100 * i / len(urls):.1f}%)") features = self.extract_features(url) features_list.append(features) df = pd.DataFrame(features_list) if show_progress: logger.info(f"✓ Extracted {len(df.columns)} features from {len(df):,} URLs") return df def main(): """Extract URL-only features from dataset.""" import argparse parser = argparse.ArgumentParser(description='URL-Only Feature Extraction v2.1 (IMPROVED)') parser.add_argument('--sample', type=int, default=None, help='Sample N URLs') parser.add_argument('--output', type=str, default=None, help='Output filename') args = parser.parse_args() logger.info("=" * 70) logger.info("URL-Only Feature Extraction v2") logger.info("=" * 70) logger.info("") logger.info("NEW Features:") logger.info(" - Fixed free platform detection (exact/suffix match)") logger.info(" - Added platform_subdomain_length") logger.info(" - Added has_uuid_subdomain") logger.info(" - Added longest_part thresholds (gt_20, gt_30, gt_40)") logger.info(" - Expanded brand list with regional brands") logger.info(" - Improved extension categorization") logger.info("") # Load dataset script_dir = Path(__file__).parent data_file = (script_dir / '../../data/processed/clean_dataset.csv').resolve() logger.info(f"Loading: {data_file.name}") df = pd.read_csv(data_file) logger.info(f"Loaded: {len(df):,} URLs") if args.sample and args.sample < len(df): df = df.sample(n=args.sample, random_state=42) logger.info(f"Sampled: {len(df):,} URLs") # Extract features extractor = URLFeatureExtractorV2() features_df = extractor.extract_batch(df['url'].tolist()) features_df['label'] = df['label'].values # Save output_dir 
= (script_dir / '../../data/features').resolve() output_dir.mkdir(parents=True, exist_ok=True) if args.output: output_file = output_dir / args.output else: suffix = f'_sample{args.sample}' if args.sample else '' output_file = output_dir / f'url_features_v2{suffix}.csv' features_df.to_csv(output_file, index=False) logger.info("") logger.info("=" * 70) logger.info(f"✓ Saved: {output_file}") logger.info(f" Shape: {features_df.shape}") logger.info(f" Features: {len(features_df.columns) - 1}") logger.info("=" * 70) # Show feature names print("\nAll Features:") feature_names = extractor.get_feature_names() for i, name in enumerate(feature_names, 1): print(f"{i:3d}. {name}") # Show stats print("\n\nFeature Statistics (first 30):") print(features_df.describe().T.head(30)) # Show new features stats print("\n\nNEW FEATURES Statistics:") new_features = [ 'is_free_platform', 'platform_subdomain_length', 'has_uuid_subdomain', 'longest_part_gt_20', 'longest_part_gt_30', 'longest_part_gt_40' ] for feat in new_features: if feat in features_df.columns: if feat == 'platform_subdomain_length': print(f"\n{feat}:") print(f" Mean: {features_df[feat].mean():.2f}") print(f" Max: {features_df[feat].max()}") print(f" Non-zero: {(features_df[feat] > 0).sum()} ({(features_df[feat] > 0).sum() / len(features_df) * 100:.1f}%)") else: print(f"\n{feat}: {features_df[feat].sum()} / {len(features_df)} ({features_df[feat].mean() * 100:.1f}%)") if __name__ == "__main__": main()