Spaces:
Runtime error
Runtime error
| """ | |
| URL Feature Extraction v2 - IMPROVED VERSION | |
| Improvements: | |
| - Fixed free hosting detection (exact/suffix match instead of substring) | |
| - Added free platform detection (Google Sites, Weebly, Firebase, etc.) | |
| - Added UUID subdomain detection (Replit, Firebase patterns) | |
| - Added platform subdomain length feature | |
| - Added longest_part thresholds (gt_20, gt_30, gt_40) | |
| - Expanded brand list with regional brands | |
| - Improved extension categorization (added archive, image categories) | |
| - Fixed get_feature_names() to be dynamic | |
| - Better URL shortener detection | |
| Key Features: | |
| - Lexical (length, characters, entropy) | |
| - Structural (domain parts, path segments, TLD) | |
| - Statistical (entropy, n-grams, patterns) | |
| - Security indicators (from URL only) | |
| - Brand/phishing patterns | |
| - FREE PLATFORM ABUSE DETECTION (NEW!) | |
| Designed for: | |
| - Fast inference (< 1ms per URL) | |
| - No network dependencies | |
| - Production deployment | |
| """ | |
# Third-party: pandas/numpy are used by batch-processing code elsewhere in this module.
import pandas as pd
import numpy as np
from urllib.parse import urlparse, parse_qs, unquote
import re
import math
import socket
import unicodedata
from pathlib import Path
from collections import Counter
import sys
import logging

# Setup logging: timestamped INFO-level messages for extraction errors/progress.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%H:%M:%S'
)
# Module-level logger; name identifies this extractor version in shared logs.
logger = logging.getLogger("url_features_v2")
| class URLFeatureExtractorV2: | |
| """ | |
| Fast URL-only feature extractor for Stage 1 phishing detection. | |
| IMPROVED VERSION with better free platform detection. | |
| """ | |
| def __init__(self): | |
| """Initialize feature extractor with keyword lists.""" | |
| # Phishing-related keywords | |
| self.phishing_keywords = [ | |
| 'login', 'signin', 'sign-in', 'log-in', 'logon', 'signon', | |
| 'account', 'accounts', 'update', 'verify', 'verification', | |
| 'secure', 'security', 'banking', 'bank', 'confirm', 'password', | |
| 'passwd', 'credential', 'suspended', 'locked', 'unusual', | |
| 'authenticate', 'auth', 'wallet', 'invoice', 'payment', | |
| 'billing', 'expire', 'expired', 'limited', 'restrict', | |
| 'urgent', 'immediately', 'alert', 'warning', 'resolve', | |
| 'recover', 'restore', 'reactivate', 'unlock', 'validate' | |
| ] | |
| # Brand names - EXPANDED with regional brands | |
| self.brand_names = [ | |
| # US Tech Giants | |
| 'paypal', 'ebay', 'amazon', 'apple', 'microsoft', 'google', | |
| 'facebook', 'instagram', 'twitter', 'netflix', 'linkedin', | |
| 'dropbox', 'adobe', 'spotify', 'steam', 'zoom', 'docusign', | |
| 'salesforce', 'shopify', 'square', 'venmo', 'cashapp', 'zelle', | |
| # US Banks | |
| 'chase', 'wellsfargo', 'bankofamerica', 'citibank', 'citi', | |
| 'americanexpress', 'amex', 'visa', 'mastercard', | |
| 'capitalone', 'usbank', 'pnc', 'truist', | |
| # Email/Communication | |
| 'outlook', 'office365', 'office', 'yahoo', 'aol', 'icloud', | |
| 'gmail', 'protonmail', 'whatsapp', 'telegram', 'discord', | |
| 'signal', 'skype', 'teams', | |
| # Shipping/Logistics | |
| 'dhl', 'fedex', 'ups', 'usps', 'amazon', 'alibaba', | |
| # Crypto/Finance | |
| 'coinbase', 'binance', 'blockchain', 'metamask', 'kraken', | |
| 'gemini', 'robinhood', 'etrade', 'fidelity', 'schwab', | |
| 'payoneer', 'stripe', 'wise', 'revolut', | |
| # Social/Entertainment | |
| 'tiktok', 'snapchat', 'twitch', 'roblox', 'epic', 'epicgames', | |
| 'playstation', 'xbox', 'nintendo', 'blizzard', 'riot', | |
| # REGIONAL BRANDS (from analysis) | |
| # Europe | |
| 'allegro', 'allegrolokalnie', # Poland | |
| 'olx', # Europe/LatAm | |
| 'bol', 'marktplaats', # Netherlands | |
| 'leboncoin', # France | |
| 'idealo', 'otto', # Germany | |
| 'hsbc', 'barclays', 'santander', 'ing', 'revolut', # European banks | |
| # Asia | |
| 'rakuten', # Japan | |
| 'lazada', 'shopee', # Southeast Asia | |
| 'baidu', 'taobao', 'alipay', 'wechat', 'weibo', # China | |
| 'paytm', 'phonepe', # India | |
| # Latin America | |
| 'mercadolibre', 'mercadopago', # LatAm | |
| # Russia | |
| 'yandex', 'vk', 'mailru', | |
| # Other | |
| 'uber', 'lyft', 'airbnb', 'booking', 'expedia', | |
| 'wetransfer', 'mediafire', 'mega', | |
| ] | |
| # URL shorteners - EXACT MATCH ONLY | |
| self.shorteners = { | |
| # Original | |
| 'bit.ly', 'bitly.com', 'goo.gl', 'tinyurl.com', 't.co', 'ow.ly', | |
| 'is.gd', 'buff.ly', 'adf.ly', 'bit.do', 'short.to', 'tiny.cc', | |
| 'j.mp', 'surl.li', 'rb.gy', 'cutt.ly', 'qrco.de', 'v.gd', | |
| 'shorturl.at', 'rebrand.ly', 'clck.ru', 's.id', 'shrtco.de', | |
| # NEW from analysis (CRITICAL!) | |
| 'qrco.de', # 3,824 occurrences! | |
| 'q-r.to', # 2,974 | |
| 'l.ead.me', # 2,907 | |
| 'ead.me', # Base domain | |
| 'urlz.fr', | |
| 'hotm.art', | |
| 'reurl.cc', | |
| 'did.li', | |
| 'zpr.io', | |
| 'linkin.bio', | |
| 'linqapp.com', | |
| 'linktr.ee', | |
| 'flow.page', | |
| 'campsite.bio', | |
| 'qr-codes.io', | |
| 'scanned.page', | |
| 'l.wl.co', | |
| 'wl.co', | |
| 'hm.ru', | |
| 'flowcode.com', | |
| } | |
| # Suspicious TLDs | |
| self.suspicious_tlds = { | |
| 'tk', 'ml', 'ga', 'cf', 'gq', # Free domains | |
| 'xyz', 'top', 'club', 'work', 'date', 'racing', 'win', | |
| 'loan', 'download', 'stream', 'click', 'link', 'bid', | |
| 'review', 'party', 'trade', 'webcam', 'science', | |
| 'accountant', 'faith', 'cricket', 'zip', 'mov', | |
| 'icu', 'buzz', 'space', 'online', 'site', 'website', | |
| 'tech', 'store', 'rest', 'cfd', 'monster', 'sbs' | |
| } | |
| # Trusted TLDs | |
| self.trusted_tlds = { | |
| 'com', 'org', 'net', 'edu', 'gov', 'mil', | |
| 'uk', 'us', 'ca', 'de', 'fr', 'jp', 'au', | |
| 'nl', 'be', 'ch', 'it', 'es', 'se', 'no', | |
| 'pl', 'br', 'in', 'mx', 'kr', 'ru', 'cn' | |
| } | |
| # FREE PLATFORMS - EXACT/SUFFIX MATCH (from your PhishTank analysis!) | |
| self.free_platforms = { | |
| # Website Builders | |
| 'weebly.com', 'wixsite.com', 'wix.com', 'webflow.io', | |
| 'framer.website', 'carrd.co', 'notion.site', 'webwave.me', | |
| 'godaddysites.com', 'square.site', 'sites.google.com', | |
| # Google Platforms (HIGH PHISHING RATE from analysis) | |
| 'firebaseapp.com', 'web.app', 'appspot.com', | |
| 'firebase.app', 'page.link', | |
| # Developer Platforms (from analysis: Replit, Vercel, etc.) | |
| 'github.io', 'gitlab.io', 'pages.github.com', | |
| 'vercel.app', 'netlify.app', 'netlify.com', | |
| 'replit.dev', 'repl.co', 'replit.co', | |
| 'glitch.me', 'glitch.com', | |
| 'pages.dev', 'workers.dev', # Cloudflare | |
| 'herokuapp.com', 'heroku.com', | |
| 'onrender.com', 'railway.app', 'fly.dev', | |
| 'amplifyapp.com', # AWS Amplify | |
| 'surge.sh', 'now.sh', | |
| # Blogging/CMS | |
| 'wordpress.com', 'blogspot.com', 'blogger.com', | |
| 'tumblr.com', 'medium.com', 'ghost.io', | |
| 'substack.com', 'beehiiv.com', | |
| # Adobe/Creative | |
| 'adobesites.com', 'myportfolio.com', 'behance.net', | |
| 'adobe.com', 'framer.app', | |
| # Forms/Surveys (from analysis: jotform, hsforms) | |
| 'jotform.com', 'typeform.com', 'forms.gle', | |
| 'hsforms.com', 'hubspot.com', 'surveymonkey.com', | |
| 'formstack.com', 'cognito.com', | |
| # File Sharing | |
| 'dropboxusercontent.com', 'dl.dropboxusercontent.com', | |
| 'sharepoint.com', '1drv.ms', 'onedrive.live.com', | |
| 'box.com', 'wetransfer.com', 'we.tl', | |
| # Free Hosting | |
| '000webhostapp.com', 'freehosting.com', 'freehostia.com', | |
| '5gbfree.com', 'x10hosting.com', 'awardspace.com', | |
| 'byet.host', 'infinityfree.com', | |
| # Education/Sandbox | |
| 'repl.it', 'codepen.io', 'jsfiddle.net', 'codesandbox.io', | |
| 'stackblitz.com', 'observablehq.com', | |
| # Other (from analysis) | |
| 'webcindario.com', 'gitbook.io', 'tinyurl.com', | |
| 'start.page', 'my.site', 'site123.com' | |
| } | |
| # Common English words for dictionary check | |
| self.common_words = { | |
| 'about', 'account', 'after', 'again', 'all', 'also', 'america', 'american', | |
| 'another', 'answer', 'any', 'app', 'apple', 'area', 'back', 'bank', 'best', | |
| 'between', 'book', 'business', 'call', 'can', 'card', 'care', 'case', 'center', | |
| 'central', 'change', 'check', 'city', 'class', 'cloud', 'come', 'company', | |
| 'contact', 'control', 'country', 'course', 'credit', 'data', 'day', 'dept', | |
| 'department', 'different', 'digital', 'doctor', 'down', 'east', 'easy', 'end', | |
| 'energy', 'even', 'event', 'every', 'express', 'fact', 'family', 'feel', | |
| 'field', 'file', 'find', 'first', 'food', 'form', 'free', 'friend', 'from', | |
| 'game', 'general', 'get', 'give', 'global', 'good', 'government', 'great', | |
| 'group', 'hand', 'have', 'head', 'health', 'help', 'here', 'high', 'home', | |
| 'house', 'how', 'image', 'info', 'information', 'insurance', 'international', | |
| 'into', 'just', 'keep', 'kind', 'know', 'large', 'last', 'late', 'leave', | |
| 'left', 'legal', 'life', 'like', 'line', 'little', 'local', 'long', 'look', | |
| 'love', 'mail', 'main', 'make', 'management', 'manager', 'many', 'map', 'market', | |
| 'marketing', 'media', 'medical', 'member', 'message', 'money', 'month', 'more', | |
| 'most', 'move', 'music', 'name', 'national', 'need', 'network', 'never', 'new', | |
| 'news', 'next', 'north', 'not', 'note', 'number', 'office', 'official', 'old', | |
| 'online', 'only', 'open', 'order', 'other', 'over', 'page', 'part', 'party', | |
| 'people', 'person', 'personal', 'photo', 'place', 'plan', 'play', 'plus', 'point', | |
| 'policy', 'portal', 'post', 'power', 'press', 'price', 'private', 'product', | |
| 'program', 'project', 'property', 'public', 'quality', 'question', 'quick', 'rate', | |
| 'read', 'real', 'record', 'report', 'research', 'resource', 'result', 'right', | |
| 'room', 'sale', 'sales', 'save', 'school', 'search', 'second', 'section', | |
| 'security', 'see', 'senior', 'service', 'services', 'set', 'shop', 'show', | |
| 'side', 'sign', 'site', 'small', 'social', 'software', 'solution', 'solutions', | |
| 'some', 'south', 'space', 'special', 'staff', 'start', 'state', 'store', 'story', | |
| 'student', 'study', 'support', 'sure', 'system', 'systems', 'take', 'team', 'tech', | |
| 'technology', 'test', 'text', 'than', 'that', 'their', 'them', 'then', 'there', | |
| 'these', 'they', 'thing', 'think', 'this', 'those', 'through', 'time', 'today', | |
| 'together', 'total', 'trade', 'training', 'travel', 'trust', 'type', 'under', | |
| 'university', 'until', 'update', 'upon', 'user', 'value', 'very', 'video', | |
| 'view', 'want', 'water', 'website', 'week', 'well', 'west', 'what', 'when', | |
| 'where', 'which', 'while', 'white', 'will', 'with', 'within', 'without', 'woman', | |
| 'women', 'word', 'work', 'world', 'would', 'write', 'year', 'york', 'young', 'your' | |
| } | |
| # Keyboard patterns | |
| self.keyboard_patterns = [ | |
| 'qwerty', 'asdfgh', 'zxcvbn', '12345', '123456', '1234567', '12345678', | |
| 'qwertyuiop', 'asdfghjkl', 'zxcvbnm' | |
| ] | |
| # Lookalike character mappings | |
| self.lookalike_chars = { | |
| '0': 'o', 'o': '0', | |
| '1': 'l', 'l': '1', 'i': '1', | |
| 'rn': 'm', 'vv': 'w', 'cl': 'd' | |
| } | |
| self.microsoft_services = { | |
| 'forms.office.com', | |
| 'sharepoint.com', | |
| 'onedrive.live.com', | |
| '1drv.ms', | |
| } | |
| self.zoom_services = { | |
| 'docs.zoom.us', | |
| 'zoom.us', | |
| } | |
| self.adobe_services = { | |
| 'express.adobe.com', | |
| 'new.express.adobe.com', # Multi-level! | |
| 'spark.adobe.com', | |
| 'portfolio.adobe.com', | |
| } | |
| self.google_services = { | |
| 'docs.google.com', | |
| 'sites.google.com', | |
| 'drive.google.com', | |
| 'script.google.com', | |
| 'storage.googleapis.com', | |
| 'storage.cloud.google.com', | |
| 'forms.google.com', | |
| 'calendar.google.com', | |
| 'meet.google.com', | |
| } | |
| def extract_features(self, url: str) -> dict: | |
| """ | |
| Extract all URL-only features from a single URL. | |
| Args: | |
| url: URL string | |
| Returns: | |
| Dictionary of features | |
| """ | |
| try: | |
| # Ensure URL has scheme | |
| if not url.startswith(('http://', 'https://')): | |
| url = 'http://' + url | |
| parsed = urlparse(url) | |
| domain = parsed.netloc.lower() | |
| domain_no_port = domain.split(':')[0] | |
| path = parsed.path | |
| query = parsed.query | |
| features = {} | |
| # 1. Length features | |
| features.update(self._length_features(url, domain_no_port, path, query)) | |
| # 2. Character count features | |
| features.update(self._char_count_features(url, domain_no_port, path)) | |
| # 3. Ratio features | |
| features.update(self._ratio_features(url, domain_no_port)) | |
| # 4. Domain structure features | |
| features.update(self._domain_features(domain_no_port, parsed)) | |
| # 5. Path features | |
| features.update(self._path_features(path, domain_no_port)) | |
| # 6. Query features | |
| features.update(self._query_features(query)) | |
| # 7. Statistical features (entropy, patterns) | |
| features.update(self._statistical_features(url, domain_no_port, path)) | |
| # 8. Security indicator features | |
| features.update(self._security_features(url, parsed, domain_no_port)) | |
| # 9. Keyword/brand features | |
| features.update(self._keyword_features(url, domain_no_port, path, parsed)) | |
| # 10. Encoding features | |
| features.update(self._encoding_features(url, domain_no_port)) | |
| return features | |
| except Exception as e: | |
| logger.error(f"Error extracting features from URL: {url[:50]}... Error: {e}") | |
| return self._get_default_features() | |
| def _length_features(self, url: str, domain: str, path: str, query: str) -> dict: | |
| """Length-based features.""" | |
| return { | |
| 'url_length': len(url), | |
| 'domain_length': len(domain), | |
| 'path_length': len(path), | |
| 'query_length': len(query), | |
| # Categorical length encoding | |
| 'url_length_category': self._categorize_length(len(url), [30, 75, 150]), | |
| 'domain_length_category': self._categorize_length(len(domain), [10, 20, 30]), | |
| } | |
| def _char_count_features(self, url: str, domain: str, path: str) -> dict: | |
| """Character count features.""" | |
| return { | |
| # URL character counts | |
| 'num_dots': url.count('.'), | |
| 'num_hyphens': url.count('-'), | |
| 'num_underscores': url.count('_'), | |
| 'num_slashes': url.count('/'), | |
| 'num_question_marks': url.count('?'), | |
| 'num_ampersands': url.count('&'), | |
| 'num_equals': url.count('='), | |
| 'num_at': url.count('@'), | |
| 'num_percent': url.count('%'), | |
| 'num_digits_url': sum(c.isdigit() for c in url), | |
| 'num_letters_url': sum(c.isalpha() for c in url), | |
| # Domain character counts | |
| 'domain_dots': domain.count('.'), | |
| 'domain_hyphens': domain.count('-'), | |
| 'domain_digits': sum(c.isdigit() for c in domain), | |
| # Path character counts | |
| 'path_slashes': path.count('/'), | |
| 'path_dots': path.count('.'), | |
| 'path_digits': sum(c.isdigit() for c in path), | |
| } | |
| def _ratio_features(self, url: str, domain: str) -> dict: | |
| """Ratio-based features.""" | |
| url_len = max(len(url), 1) | |
| domain_len = max(len(domain), 1) | |
| return { | |
| 'digit_ratio_url': sum(c.isdigit() for c in url) / url_len, | |
| 'letter_ratio_url': sum(c.isalpha() for c in url) / url_len, | |
| 'special_char_ratio': sum(not c.isalnum() for c in url) / url_len, | |
| 'digit_ratio_domain': sum(c.isdigit() for c in domain) / domain_len, | |
| 'symbol_ratio_domain': sum(c in '-_.' for c in domain) / domain_len, | |
| } | |
| def _domain_features(self, domain: str, parsed) -> dict: | |
| """Domain structure features.""" | |
| parts = domain.split('.') | |
| tld = parts[-1] if parts else '' | |
| sld = parts[-2] if len(parts) > 1 else '' | |
| num_subdomains = max(0, len(parts) - 2) | |
| longest_part = max((len(p) for p in parts), default=0) | |
| return { | |
| 'num_subdomains': num_subdomains, | |
| 'num_domain_parts': len(parts), | |
| 'tld_length': len(tld), | |
| 'sld_length': len(sld), | |
| 'longest_domain_part': longest_part, | |
| 'avg_domain_part_len': sum(len(p) for p in parts) / max(len(parts), 1), | |
| # NEW: Longest part thresholds (from analysis!) | |
| 'longest_part_gt_20': 1 if longest_part > 20 else 0, | |
| 'longest_part_gt_30': 1 if longest_part > 30 else 0, | |
| 'longest_part_gt_40': 1 if longest_part > 40 else 0, | |
| # TLD indicators | |
| 'has_suspicious_tld': 1 if tld in self.suspicious_tlds else 0, | |
| 'has_trusted_tld': 1 if tld in self.trusted_tlds else 0, | |
| # Port | |
| 'has_port': 1 if parsed.port else 0, | |
| 'has_non_std_port': 1 if parsed.port and parsed.port not in [80, 443] else 0, | |
| # Domain randomness features | |
| 'domain_randomness_score': self._calculate_domain_randomness(sld), | |
| 'sld_consonant_cluster_score': self._consonant_clustering_score(sld), | |
| 'sld_keyboard_pattern': self._keyboard_pattern_score(sld), | |
| 'sld_has_dictionary_word': self._contains_dictionary_word(sld), | |
| 'sld_pronounceability_score': self._pronounceability_score(sld), | |
| 'domain_digit_position_suspicious': self._suspicious_digit_position(sld), | |
| } | |
| def _path_features(self, path: str, domain: str) -> dict: | |
| """Path structure features.""" | |
| segments = [s for s in path.split('/') if s] | |
| # Get file extension if present | |
| extension = '' | |
| if '.' in path: | |
| potential_ext = path.rsplit('.', 1)[-1].split('?')[0].lower() | |
| if len(potential_ext) <= 10: | |
| extension = potential_ext | |
| return { | |
| 'path_depth': len(segments), | |
| 'max_path_segment_len': max((len(s) for s in segments), default=0), | |
| 'avg_path_segment_len': sum(len(s) for s in segments) / max(len(segments), 1), | |
| # Extension features | |
| 'has_extension': 1 if extension else 0, | |
| 'extension_category': self._categorize_extension(extension), | |
| 'has_suspicious_extension': 1 if extension in ['zip', 'exe', 'apk', 'scr', 'bat', 'cmd'] else 0, | |
| 'has_exe': 1 if extension in ['exe', 'bat', 'cmd', 'msi'] else 0, | |
| # Suspicious path patterns | |
| 'has_double_slash': 1 if '//' in path else 0, | |
| 'path_has_brand_not_domain': self._brand_in_path_only(path, domain), | |
| 'path_has_ip_pattern': 1 if re.search(r'\d{1,3}[._-]\d{1,3}[._-]\d{1,3}', path) else 0, | |
| 'suspicious_path_extension_combo': self._suspicious_extension_pattern(path), | |
| } | |
| def _query_features(self, query: str) -> dict: | |
| """Query string features.""" | |
| params = parse_qs(query) | |
| return { | |
| 'num_params': len(params), | |
| 'has_query': 1 if query else 0, | |
| 'query_value_length': sum(len(''.join(v)) for v in params.values()), | |
| 'max_param_len': max((len(k) + len(''.join(v)) for k, v in params.items()), default=0), | |
| 'query_has_url': 1 if re.search(r'https?%3A%2F%2F|http%3A//', query.lower()) else 0, | |
| } | |
| def _statistical_features(self, url: str, domain: str, path: str) -> dict: | |
| """Statistical and entropy features.""" | |
| parts = domain.split('.') | |
| sld = parts[-2] if len(parts) > 1 else domain | |
| return { | |
| # Entropy | |
| 'url_entropy': self._entropy(url), | |
| 'domain_entropy': self._entropy(domain), | |
| 'path_entropy': self._entropy(path) if path else 0, | |
| # Consecutive character patterns | |
| 'max_consecutive_digits': self._max_consecutive(url, str.isdigit), | |
| 'max_consecutive_chars': self._max_consecutive(url, str.isalpha), | |
| 'max_consecutive_consonants': self._max_consecutive_consonants(domain), | |
| # Character variance | |
| 'char_repeat_rate': self._repeat_rate(url), | |
| # N-gram uniqueness | |
| 'unique_bigram_ratio': self._unique_ngram_ratio(url, 2), | |
| 'unique_trigram_ratio': self._unique_ngram_ratio(url, 3), | |
| # Improved statistical features | |
| 'sld_letter_diversity': self._character_diversity(sld), | |
| 'domain_has_numbers_letters': 1 if any(c.isdigit() for c in domain) and any(c.isalpha() for c in domain) else 0, | |
| 'url_complexity_score': self._calculate_url_complexity(url), | |
| } | |
| def _security_features(self, url: str, parsed, domain: str) -> dict: | |
| """Security indicator features (URL-based only).""" | |
| parts = domain.split('.') | |
| return { | |
| # IP address | |
| 'has_ip_address': 1 if self._is_ip(domain) else 0, | |
| # Suspicious patterns | |
| 'has_at_symbol': 1 if '@' in url else 0, | |
| 'has_redirect': 1 if 'redirect' in url.lower() or 'url=' in url.lower() else 0, | |
| # URL shortener - FIXED: exact match only | |
| 'is_shortened': self._is_url_shortener(domain), | |
| # Free hosting - DEPRECATED (use is_free_platform instead) | |
| 'is_free_hosting': self._is_free_platform(domain), | |
| # NEW: Free platform detection (CRITICAL for your dataset!) | |
| 'is_free_platform': self._is_free_platform(domain), | |
| 'platform_subdomain_length': self._get_platform_subdomain_length(domain), | |
| 'has_uuid_subdomain': self._detect_uuid_pattern(domain), | |
| } | |
| def _keyword_features(self, url: str, domain: str, path: str, parsed) -> dict: | |
| """Keyword and brand detection features.""" | |
| url_lower = url.lower() | |
| domain_lower = domain.lower() | |
| path_lower = path.lower() | |
| # Count phishing keywords | |
| phishing_in_url = sum(1 for k in self.phishing_keywords if k in url_lower) | |
| phishing_in_domain = sum(1 for k in self.phishing_keywords if k in domain_lower) | |
| phishing_in_path = sum(1 for k in self.phishing_keywords if k in path_lower) | |
| # Count brand names | |
| brands_in_url = sum(1 for b in self.brand_names if b in url_lower) | |
| brands_in_domain = sum(1 for b in self.brand_names if b in domain_lower) | |
| brands_in_path = sum(1 for b in self.brand_names if b in path_lower) | |
| # Brand impersonation | |
| brand_impersonation = 1 if brands_in_path > 0 and brands_in_domain == 0 else 0 | |
| return { | |
| 'num_phishing_keywords': phishing_in_url, | |
| 'phishing_in_domain': phishing_in_domain, | |
| 'phishing_in_path': phishing_in_path, | |
| 'num_brands': brands_in_url, | |
| 'brand_in_domain': 1 if brands_in_domain > 0 else 0, | |
| 'brand_in_path': 1 if brands_in_path > 0 else 0, | |
| 'brand_impersonation': brand_impersonation, | |
| # Specific high-value keywords | |
| 'has_login': 1 if 'login' in url_lower or 'signin' in url_lower else 0, | |
| 'has_account': 1 if 'account' in url_lower else 0, | |
| 'has_verify': 1 if 'verify' in url_lower or 'confirm' in url_lower else 0, | |
| 'has_secure': 1 if 'secure' in url_lower or 'security' in url_lower else 0, | |
| 'has_update': 1 if 'update' in url_lower else 0, | |
| 'has_bank': 1 if 'bank' in url_lower else 0, | |
| 'has_password': 1 if 'password' in url_lower or 'passwd' in url_lower else 0, | |
| 'has_suspend': 1 if 'suspend' in url_lower or 'locked' in url_lower else 0, | |
| # Suspicious patterns | |
| 'has_webscr': 1 if 'webscr' in url_lower else 0, | |
| 'has_cmd': 1 if 'cmd=' in url_lower else 0, | |
| 'has_cgi': 1 if 'cgi-bin' in url_lower or 'cgi_bin' in url_lower else 0, | |
| # Advanced brand spoofing features | |
| 'brand_in_subdomain_not_domain': self._brand_subdomain_spoofing(parsed), | |
| 'multiple_brands_in_url': 1 if brands_in_url >= 2 else 0, | |
| 'brand_with_hyphen': self._brand_with_hyphen(domain_lower), | |
| 'suspicious_brand_tld': self._suspicious_brand_tld(domain), | |
| 'brand_keyword_combo': self._brand_phishing_keyword_combo(url_lower), | |
| } | |
| def _encoding_features(self, url: str, domain: str) -> dict: | |
| """Encoding-related features.""" | |
| has_punycode = 'xn--' in domain | |
| try: | |
| decoded = unquote(url) | |
| encoding_diff = len(decoded) - len(url) | |
| except: | |
| encoding_diff = 0 | |
| try: | |
| has_hex = 1 if re.search(r'[0-9a-f]{20,}', url.lower()) else 0 | |
| except: | |
| has_hex = 0 | |
| try: | |
| has_base64 = 1 if re.search(r'[A-Za-z0-9+/]{30,}={0,2}', url) else 0 | |
| except: | |
| has_base64 = 0 | |
| try: | |
| has_unicode = 1 if any(ord(c) > 127 for c in url) else 0 | |
| except: | |
| has_unicode = 0 | |
| return { | |
| 'has_url_encoding': 1 if '%' in url else 0, | |
| 'encoding_count': url.count('%'), | |
| 'encoding_diff': abs(encoding_diff), | |
| 'has_punycode': 1 if has_punycode else 0, | |
| 'has_unicode': has_unicode, | |
| 'has_hex_string': has_hex, | |
| 'has_base64': has_base64, | |
| # Homograph & encoding detection | |
| 'has_lookalike_chars': self._detect_lookalike_chars(domain), | |
| 'mixed_script_score': self._mixed_script_detection(domain), | |
| 'homograph_brand_risk': self._homograph_brand_check(domain), | |
| 'suspected_idn_homograph': self._idn_homograph_score(url), | |
| 'double_encoding': self._detect_double_encoding(url), | |
| 'encoding_in_domain': 1 if '%' in domain else 0, | |
| 'suspicious_unicode_category': self._suspicious_unicode_chars(url), | |
| } | |
| # ============================================================ | |
| # HELPER METHODS | |
| # ============================================================ | |
| def _entropy(self, text: str) -> float: | |
| """Calculate Shannon entropy.""" | |
| if not text: | |
| return 0.0 | |
| freq = Counter(text) | |
| length = len(text) | |
| return -sum((c / length) * math.log2(c / length) for c in freq.values()) | |
| def _max_consecutive(self, text: str, condition) -> int: | |
| """Max consecutive characters matching condition.""" | |
| max_count = count = 0 | |
| for char in text: | |
| if condition(char): | |
| count += 1 | |
| max_count = max(max_count, count) | |
| else: | |
| count = 0 | |
| return max_count | |
| def _max_consecutive_consonants(self, text: str) -> int: | |
| """Max consecutive consonants.""" | |
| consonants = set('bcdfghjklmnpqrstvwxyz') | |
| max_count = count = 0 | |
| for char in text.lower(): | |
| if char in consonants: | |
| count += 1 | |
| max_count = max(max_count, count) | |
| else: | |
| count = 0 | |
| return max_count | |
| def _repeat_rate(self, text: str) -> float: | |
| """Rate of repeated adjacent characters.""" | |
| if len(text) < 2: | |
| return 0.0 | |
| repeats = sum(1 for i in range(len(text) - 1) if text[i] == text[i + 1]) | |
| return repeats / (len(text) - 1) | |
| def _unique_ngram_ratio(self, text: str, n: int) -> float: | |
| """Ratio of unique n-grams to total n-grams.""" | |
| if len(text) < n: | |
| return 0.0 | |
| ngrams = [text[i:i + n] for i in range(len(text) - n + 1)] | |
| return len(set(ngrams)) / len(ngrams) | |
| def _is_ip(self, domain: str) -> bool: | |
| """Check if domain is IP address.""" | |
| # IPv4 | |
| if re.match(r'^(\d{1,3}\.){3}\d{1,3}$', domain): | |
| return True | |
| # IPv6 | |
| try: | |
| socket.inet_pton(socket.AF_INET6, domain.strip('[]')) | |
| return True | |
| except: | |
| return False | |
| # ============================================================ | |
| # NEW/IMPROVED METHODS | |
| # ============================================================ | |
| def _is_url_shortener(self, domain: str) -> int: | |
| """ | |
| URL shortener detection - EXACT match. | |
| """ | |
| domain_lower = domain.lower() | |
| return 1 if domain_lower in self.shorteners else 0 | |
| def _is_free_platform(self, domain: str) -> int: | |
| """ | |
| Detect if hosted on free platform. | |
| CRITICAL FIX: Exact or suffix match (not substring!). | |
| Examples: | |
| - 'mysite.weebly.com' → 1 (suffix match) | |
| - 'weebly.com' → 1 (exact match) | |
| - 'weebly-alternative.com' → 0 (NOT a match!) | |
| """ | |
| domain_lower = domain.lower() | |
| # Exact match | |
| if domain_lower in self.free_platforms: | |
| return 1 | |
| if domain_lower in self.google_services: | |
| return 1 | |
| if domain_lower in self.adobe_services: | |
| return 1 | |
| if domain_lower in self.microsoft_services: | |
| return 1 | |
| if domain_lower in self.zoom_services: | |
| return 1 | |
| # Suffix match (subdomain.platform.com) | |
| for platform in self.free_platforms: | |
| if domain_lower.endswith('.' + platform): | |
| return 1 | |
| return 0 | |
| def _get_platform_subdomain_length(self, domain: str) -> int: | |
| """ | |
| IMPROVED: Handle multi-level subdomains. | |
| Examples: | |
| - docs.google.com → subdomain = 'docs' (4 chars) | |
| - new.express.adobe.com → subdomain = 'new.express' (11 chars) | |
| - storage.cloud.google.com → subdomain = 'storage.cloud' (13 chars) | |
| """ | |
| domain_lower = domain.lower() | |
| # Check Google | |
| if '.google.com' in domain_lower: | |
| subdomain = domain_lower.replace('.google.com', '') | |
| return len(subdomain) | |
| # Check Adobe | |
| if '.adobe.com' in domain_lower: | |
| subdomain = domain_lower.replace('.adobe.com', '') | |
| return len(subdomain) | |
| # Check Microsoft | |
| if '.office.com' in domain_lower: | |
| subdomain = domain_lower.replace('.office.com', '') | |
| return len(subdomain) | |
| # Check free platforms (existing logic) | |
| for platform in self.free_platforms: | |
| if domain_lower.endswith('.' + platform): | |
| subdomain = domain_lower[:-len('.' + platform)] | |
| return len(subdomain) | |
| return 0 | |
| def _detect_uuid_pattern(self, domain: str) -> int: | |
| """ | |
| Detect UUID patterns in subdomain (Replit, Firebase, etc.). | |
| Example: | |
| 'b82dba2b-fde4-4477-b6d5-8b17144e1bee.replit.dev' → 1 | |
| """ | |
| # UUID pattern: 8-4-4-4-12 hex characters | |
| uuid_pattern = r'[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}' | |
| return 1 if re.search(uuid_pattern, domain.lower()) else 0 | |
| # ============================================================ | |
| # DOMAIN RANDOMNESS HELPERS | |
| # ============================================================ | |
| def _calculate_domain_randomness(self, domain: str) -> float: | |
| """Calculate randomness score for domain (0-1).""" | |
| if not domain or len(domain) < 4: | |
| return 0.5 | |
| domain_lower = domain.lower() | |
| scores = [] | |
| # 1. Vowel distribution | |
| vowels = 'aeiou' | |
| vowel_positions = [i for i, c in enumerate(domain_lower) if c in vowels] | |
| if len(vowel_positions) >= 2: | |
| avg_gap = sum(vowel_positions[i+1] - vowel_positions[i] | |
| for i in range(len(vowel_positions)-1)) / (len(vowel_positions)-1) | |
| vowel_irregularity = min(abs(avg_gap - 2.5) / 5, 1.0) | |
| scores.append(vowel_irregularity) | |
| # 2. Character frequency | |
| char_freq = Counter(domain_lower) | |
| common_letters = 'etaoinshr' | |
| common_count = sum(char_freq.get(c, 0) for c in common_letters) | |
| uncommon_ratio = 1 - (common_count / max(len(domain_lower), 1)) | |
| scores.append(uncommon_ratio) | |
| # 3. Repeated characters | |
| unique_ratio = len(set(domain_lower)) / max(len(domain_lower), 1) | |
| if unique_ratio > 0.75: | |
| scores.append((unique_ratio - 0.75) / 0.25) | |
| else: | |
| scores.append(0) | |
| return min(sum(scores) / max(len(scores), 1), 1.0) | |
| def _consonant_clustering_score(self, text: str) -> float: | |
| """Detect unnatural consonant clusters.""" | |
| if not text: | |
| return 0 | |
| text_lower = text.lower() | |
| consonants = 'bcdfghjklmnpqrstvwxyz' | |
| max_cluster = 0 | |
| current_cluster = 0 | |
| for char in text_lower: | |
| if char in consonants: | |
| current_cluster += 1 | |
| max_cluster = max(max_cluster, current_cluster) | |
| else: | |
| current_cluster = 0 | |
| if max_cluster >= 5: | |
| return 1.0 | |
| elif max_cluster >= 4: | |
| return 0.7 | |
| elif max_cluster >= 3: | |
| return 0.4 | |
| else: | |
| return 0.0 | |
| def _keyboard_pattern_score(self, text: str) -> int: | |
| """Detect keyboard walking patterns.""" | |
| if not text: | |
| return 0 | |
| text_lower = text.lower() | |
| count = 0 | |
| for pattern in self.keyboard_patterns: | |
| if pattern in text_lower: | |
| count += 1 | |
| return count | |
| def _contains_dictionary_word(self, text: str) -> int: | |
| """Check if text contains any common English word.""" | |
| if not text or len(text) < 4: | |
| return 0 | |
| text_lower = text.lower() | |
| if text_lower in self.common_words: | |
| return 1 | |
| for word in self.common_words: | |
| if len(word) >= 4 and word in text_lower: | |
| return 1 | |
| return 0 | |
| def _pronounceability_score(self, text: str) -> float: | |
| """Score based on bigram frequencies in English.""" | |
| if not text or len(text) < 2: | |
| return 0.5 | |
| text_lower = text.lower() | |
| common_bigrams = { | |
| 'th', 'he', 'in', 'er', 'an', 're', 'on', 'at', 'en', 'nd', | |
| 'ti', 'es', 'or', 'te', 'of', 'ed', 'is', 'it', 'al', 'ar', | |
| 'st', 'to', 'nt', 'ng', 'se', 'ha', 'as', 'ou', 'io', 've' | |
| } | |
| bigrams = [text_lower[i:i+2] for i in range(len(text_lower)-1)] | |
| if not bigrams: | |
| return 0.5 | |
| common_count = sum(1 for bg in bigrams if bg in common_bigrams) | |
| score = common_count / len(bigrams) | |
| return score | |
| def _suspicious_digit_position(self, text: str) -> int: | |
| """Detect suspicious digit positions.""" | |
| if not text: | |
| return 0 | |
| if text and text[0].isdigit(): | |
| return 1 | |
| if len(text) >= 2 and text[-1].isdigit() and text[-2].isdigit(): | |
| return 1 | |
| return 0 | |
| # ============================================================ | |
| # BRAND SPOOFING HELPERS | |
| # ============================================================ | |
| def _brand_subdomain_spoofing(self, parsed) -> int: | |
| """Detect brand in subdomain but not main domain.""" | |
| try: | |
| parts = parsed.netloc.split('.') | |
| if len(parts) < 3: | |
| return 0 | |
| subdomains = '.'.join(parts[:-2]).lower() | |
| main_domain = '.'.join(parts[-2:]).lower() | |
| for brand in self.brand_names: | |
| if brand in subdomains and brand not in main_domain: | |
| return 1 | |
| return 0 | |
| except: | |
| return 0 | |
| def _brand_with_hyphen(self, domain: str) -> int: | |
| """Detect hyphenated brand names.""" | |
| if not domain: | |
| return 0 | |
| domain_lower = domain.lower() | |
| for brand in self.brand_names: | |
| if len(brand) >= 4: | |
| for i in range(1, len(brand)): | |
| hyphenated = brand[:i] + '-' + brand[i:] | |
| if hyphenated in domain_lower: | |
| return 1 | |
| return 0 | |
| def _suspicious_brand_tld(self, domain: str) -> int: | |
| """Detect brand name with suspicious TLD.""" | |
| if not domain: | |
| return 0 | |
| domain_lower = domain.lower() | |
| parts = domain_lower.split('.') | |
| if len(parts) < 2: | |
| return 0 | |
| tld = parts[-1] | |
| domain_without_tld = '.'.join(parts[:-1]) | |
| if tld in self.suspicious_tlds: | |
| for brand in self.brand_names: | |
| if brand in domain_without_tld: | |
| return 1 | |
| return 0 | |
| def _brand_phishing_keyword_combo(self, url: str) -> int: | |
| """Detect brand + phishing keyword combination.""" | |
| if not url: | |
| return 0 | |
| url_lower = url.lower() | |
| has_brand = any(brand in url_lower for brand in self.brand_names) | |
| if has_brand: | |
| phishing_combo_keywords = [ | |
| 'verify', 'security', 'secure', 'account', 'update', | |
| 'login', 'confirm', 'suspended', 'locked' | |
| ] | |
| for keyword in phishing_combo_keywords: | |
| if keyword in url_lower: | |
| return 1 | |
| return 0 | |
| # ============================================================ | |
| # PATH & QUERY HELPERS | |
| # ============================================================ | |
| def _brand_in_path_only(self, path: str, domain: str) -> int: | |
| """Detect brand in path but not in domain.""" | |
| if not path or not domain: | |
| return 0 | |
| path_lower = path.lower() | |
| domain_lower = domain.lower() | |
| for brand in self.brand_names: | |
| if brand in path_lower and brand not in domain_lower: | |
| return 1 | |
| return 0 | |
| def _suspicious_extension_pattern(self, path: str) -> int: | |
| """Detect suspicious extension patterns.""" | |
| if not path: | |
| return 0 | |
| path_lower = path.lower() | |
| suspicious_patterns = [ | |
| '.php.exe', '.html.exe', '.pdf.exe', '.doc.exe', | |
| '.zip.exe', '.rar.exe', '.html.zip', '.pdf.scr' | |
| ] | |
| for pattern in suspicious_patterns: | |
| if pattern in path_lower: | |
| return 1 | |
| parts = path_lower.split('.') | |
| if len(parts) >= 3: | |
| ext1 = parts[-2] | |
| ext2 = parts[-1] | |
| doc_exts = ['pdf', 'doc', 'docx', 'xls', 'xlsx', 'html', 'htm'] | |
| exec_exts = ['exe', 'scr', 'bat', 'cmd', 'com', 'pif'] | |
| if ext1 in doc_exts and ext2 in exec_exts: | |
| return 1 | |
| return 0 | |
| # ============================================================ | |
| # ENCODING HELPERS | |
| # ============================================================ | |
| def _detect_lookalike_chars(self, domain: str) -> int: | |
| """Detect lookalike characters.""" | |
| if not domain: | |
| return 0 | |
| domain_lower = domain.lower() | |
| suspicious_patterns = [ | |
| ('rn', 'm'), | |
| ('vv', 'w'), | |
| ('cl', 'd'), | |
| ] | |
| for pattern, _ in suspicious_patterns: | |
| if pattern in domain_lower: | |
| return 1 | |
| if any(c in domain_lower for c in ['0', '1']): | |
| has_letters = any(c.isalpha() for c in domain_lower) | |
| if has_letters: | |
| for lookalike_char in self.lookalike_chars: | |
| if lookalike_char in domain_lower: | |
| return 1 | |
| return 0 | |
| def _mixed_script_detection(self, domain: str) -> int: | |
| """Detect mixing of scripts.""" | |
| if not domain: | |
| return 0 | |
| scripts = set() | |
| for char in domain: | |
| if char.isalpha(): | |
| try: | |
| script = unicodedata.name(char).split()[0] | |
| if script in ['LATIN', 'CYRILLIC', 'GREEK']: | |
| scripts.add(script) | |
| except: | |
| pass | |
| return len(scripts) if len(scripts) > 1 else 0 | |
| def _homograph_brand_check(self, domain: str) -> int: | |
| """Check for homograph attacks on brands.""" | |
| if not domain: | |
| return 0 | |
| domain_lower = domain.lower() | |
| top_brands = ['paypal', 'apple', 'amazon', 'google', 'microsoft', 'facebook'] | |
| for brand in top_brands: | |
| if len(domain_lower) < len(brand) - 2 or len(domain_lower) > len(brand) + 2: | |
| continue | |
| differences = 0 | |
| for i in range(min(len(domain_lower), len(brand))): | |
| if i < len(domain_lower) and i < len(brand): | |
| if domain_lower[i] != brand[i]: | |
| if (domain_lower[i] in '01' and brand[i] in 'ol') or \ | |
| (domain_lower[i] in 'ol' and brand[i] in '01'): | |
| differences += 1 | |
| else: | |
| differences += 1 | |
| if differences <= 2 and differences > 0: | |
| return 1 | |
| return 0 | |
| def _idn_homograph_score(self, url: str) -> float: | |
| """Combined IDN homograph attack score.""" | |
| score = 0.0 | |
| count = 0 | |
| if 'xn--' in url.lower(): | |
| score += 0.5 | |
| count += 1 | |
| non_ascii = sum(1 for c in url if ord(c) > 127) | |
| if non_ascii > 0: | |
| score += min(non_ascii / 10, 0.3) | |
| count += 1 | |
| return score / max(count, 1) if count > 0 else 0.0 | |
| def _detect_double_encoding(self, url: str) -> int: | |
| """Detect double URL encoding.""" | |
| if not url: | |
| return 0 | |
| double_encoded_patterns = ['%25', '%2520', '%252e', '%252f'] | |
| count = sum(url.lower().count(pattern) for pattern in double_encoded_patterns) | |
| return count | |
| def _suspicious_unicode_chars(self, url: str) -> int: | |
| """Detect uncommon Unicode categories.""" | |
| if not url: | |
| return 0 | |
| suspicious_count = 0 | |
| for char in url: | |
| try: | |
| category = unicodedata.category(char) | |
| if category in ['Mn', 'Mc', 'Me', 'Zl', 'Zp', | |
| 'Cc', 'Cf', 'Sm', 'Sc', 'Sk', 'So']: | |
| suspicious_count += 1 | |
| except: | |
| pass | |
| return suspicious_count | |
| # ============================================================ | |
| # FEATURE REFINEMENT HELPERS | |
| # ============================================================ | |
| def _categorize_length(self, length: int, thresholds: list) -> int: | |
| """Multi-category encoding for length features.""" | |
| for i, threshold in enumerate(thresholds): | |
| if length <= threshold: | |
| return i | |
| return len(thresholds) | |
| def _categorize_extension(self, extension: str) -> int: | |
| """ | |
| Categorize file extension: | |
| 0 = none | |
| 1 = document | |
| 2 = web/script | |
| 3 = executable | |
| 4 = archive | |
| 5 = image | |
| 6 = other | |
| """ | |
| if not extension: | |
| return 0 | |
| ext_lower = extension.lower() | |
| if ext_lower in ['pdf', 'doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx', 'txt', 'rtf']: | |
| return 1 | |
| if ext_lower in ['html', 'htm', 'php', 'asp', 'aspx', 'jsp', 'js', 'css']: | |
| return 2 | |
| if ext_lower in ['exe', 'bat', 'cmd', 'scr', 'msi', 'com', 'pif', 'app', 'apk']: | |
| return 3 | |
| if ext_lower in ['zip', 'rar', '7z', 'tar', 'gz', 'bz2']: | |
| return 4 | |
| if ext_lower in ['jpg', 'jpeg', 'png', 'gif', 'svg', 'ico', 'webp']: | |
| return 5 | |
| return 6 | |
| def _character_diversity(self, text: str) -> float: | |
| """Shannon diversity index for characters.""" | |
| if not text: | |
| return 0.0 | |
| unique_chars = len(set(text)) | |
| return min(unique_chars / max(len(text), 1), 1.0) | |
| def _calculate_url_complexity(self, url: str) -> float: | |
| """Combined URL complexity score.""" | |
| if not url: | |
| return 0.0 | |
| special_chars = sum(1 for c in url if not c.isalnum() and c not in [':', '/', '.']) | |
| special_ratio = special_chars / max(len(url), 1) | |
| length_score = min(len(url) / 200, 1.0) | |
| encoding_score = min(url.count('%') / 10, 1.0) | |
| complexity = (special_ratio * 0.4 + length_score * 0.3 + encoding_score * 0.3) | |
| return min(complexity, 1.0) | |
| # ============================================================ | |
| # UTILITY METHODS | |
| # ============================================================ | |
| def _get_default_features(self) -> dict: | |
| """Default feature values for error cases.""" | |
| # Get feature names dynamically | |
| dummy_url = "http://example.com" | |
| try: | |
| return self.extract_features(dummy_url) | |
| except: | |
| return {} | |
| def get_feature_names(self) -> list: | |
| """ | |
| Get list of all feature names DYNAMICALLY. | |
| FIXED: No longer hardcoded! | |
| """ | |
| dummy_url = "http://example.com/test" | |
| dummy_features = self.extract_features(dummy_url) | |
| # Remove 'label' if present | |
| feature_names = [k for k in dummy_features.keys() if k != 'label'] | |
| return sorted(feature_names) | |
| def extract_batch(self, urls: list, show_progress: bool = True) -> pd.DataFrame: | |
| """ | |
| Extract features from multiple URLs. | |
| Args: | |
| urls: List of URL strings | |
| show_progress: Show progress messages | |
| Returns: | |
| DataFrame with features | |
| """ | |
| if show_progress: | |
| logger.info(f"Extracting URL features from {len(urls):,} URLs...") | |
| features_list = [] | |
| progress_interval = 50000 | |
| for i, url in enumerate(urls): | |
| if show_progress and i > 0 and i % progress_interval == 0: | |
| logger.info(f" Processed {i:,} / {len(urls):,} ({100 * i / len(urls):.1f}%)") | |
| features = self.extract_features(url) | |
| features_list.append(features) | |
| df = pd.DataFrame(features_list) | |
| if show_progress: | |
| logger.info(f"✓ Extracted {len(df.columns)} features from {len(df):,} URLs") | |
| return df | |
def main():
    """Extract URL-only features from the cleaned dataset and save to CSV.

    CLI flags:
        --sample N   process only a random sample of N URLs
        --output F   override the default output filename
    """
    import argparse
    parser = argparse.ArgumentParser(description='URL-Only Feature Extraction v2.1 (IMPROVED)')
    parser.add_argument('--sample', type=int, default=None, help='Sample N URLs')
    parser.add_argument('--output', type=str, default=None, help='Output filename')
    args = parser.parse_args()

    # Banner describing the v2 improvements.
    logger.info("=" * 70)
    logger.info("URL-Only Feature Extraction v2")
    logger.info("=" * 70)
    logger.info("")
    logger.info("NEW Features:")
    logger.info(" - Fixed free platform detection (exact/suffix match)")
    logger.info(" - Added platform_subdomain_length")
    logger.info(" - Added has_uuid_subdomain")
    logger.info(" - Added longest_part thresholds (gt_20, gt_30, gt_40)")
    logger.info(" - Expanded brand list with regional brands")
    logger.info(" - Improved extension categorization")
    logger.info("")

    # Load dataset (path is relative to this script's location).
    script_dir = Path(__file__).parent
    data_file = (script_dir / '../../data/processed/clean_dataset.csv').resolve()
    logger.info(f"Loading: {data_file.name}")
    df = pd.read_csv(data_file)
    logger.info(f"Loaded: {len(df):,} URLs")
    if args.sample and args.sample < len(df):
        # Fixed seed so sampled runs are reproducible.
        df = df.sample(n=args.sample, random_state=42)
        logger.info(f"Sampled: {len(df):,} URLs")

    # Extract features and re-attach the labels column.
    extractor = URLFeatureExtractorV2()
    features_df = extractor.extract_batch(df['url'].tolist())
    features_df['label'] = df['label'].values

    # Save
    output_dir = (script_dir / '../../data/features').resolve()
    output_dir.mkdir(parents=True, exist_ok=True)
    if args.output:
        output_file = output_dir / args.output
    else:
        suffix = f'_sample{args.sample}' if args.sample else ''
        output_file = output_dir / f'url_features_v2{suffix}.csv'
    features_df.to_csv(output_file, index=False)
    logger.info("")
    logger.info("=" * 70)
    logger.info(f"✓ Saved: {output_file}")
    logger.info(f" Shape: {features_df.shape}")
    logger.info(f" Features: {len(features_df.columns) - 1}")
    logger.info("=" * 70)

    # Show feature names
    print("\nAll Features:")
    for i, name in enumerate(extractor.get_feature_names(), 1):
        print(f"{i:3d}. {name}")

    # Show stats
    print("\n\nFeature Statistics (first 30):")
    print(features_df.describe().T.head(30))

    # Show new features stats
    print("\n\nNEW FEATURES Statistics:")
    new_features = [
        'is_free_platform', 'platform_subdomain_length', 'has_uuid_subdomain',
        'longest_part_gt_20', 'longest_part_gt_30', 'longest_part_gt_40'
    ]
    for feat in new_features:
        if feat not in features_df.columns:
            continue
        if feat == 'platform_subdomain_length':
            # FIX: the non-zero count was computed twice per feature;
            # compute it once and reuse.
            nonzero = int((features_df[feat] > 0).sum())
            print(f"\n{feat}:")
            print(f" Mean: {features_df[feat].mean():.2f}")
            print(f" Max: {features_df[feat].max()}")
            print(f" Non-zero: {nonzero} ({nonzero / len(features_df) * 100:.1f}%)")
        else:
            print(f"\n{feat}: {features_df[feat].sum()} / {len(features_df)} ({features_df[feat].mean() * 100:.1f}%)")
| if __name__ == "__main__": | |
| main() |