# NOTE(review): removed stray export artifacts ("Spaces:" / "Runtime error" x2)
# that preceded the module docstring; they were not part of the program.
"""
URL Feature Extraction v1 - URL-Only Features for Stage 1 Model
This extractor focuses ONLY on URL structure and lexical features.
NO HTTP requests, NO external services, NO HTML parsing.
Features:
- Lexical (length, characters, entropy)
- Structural (domain parts, path segments, TLD)
- Statistical (entropy, n-grams, patterns)
- Security indicators (from URL only)
- Brand/phishing patterns
Designed for:
- Fast inference (< 1ms per URL)
- No network dependencies
- Production deployment
"""
# NOTE(review): the docstring says "v1" but the class and logger below are
# named "v2" — confirm which version label is intended.
import pandas as pd
import numpy as np
from urllib.parse import urlparse, parse_qs, unquote
import re
import math
import socket
from pathlib import Path
from collections import Counter
import sys
import logging
# Setup logging: timestamped INFO-level messages for progress reporting.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%H:%M:%S'
)
logger = logging.getLogger("url_features_v2")
class URLFeatureExtractorV2:
    """
    Fast URL-only feature extractor for Stage 1 phishing detection.

    No HTTP requests, no external API calls - pure URL analysis.
    Every feature is computed deterministically from the URL string alone,
    so extraction is safe to run on untrusted input. On any failure the
    extractor falls back to an all-zero feature dict instead of raising.
    """

    def __init__(self):
        """Initialize feature extractor with keyword/brand/TLD lists."""
        # Phishing-related keywords (lowercase, substring-matched)
        self.phishing_keywords = [
            'login', 'signin', 'sign-in', 'log-in', 'logon', 'signon',
            'account', 'accounts', 'update', 'verify', 'verification',
            'secure', 'security', 'banking', 'bank', 'confirm', 'password',
            'passwd', 'credential', 'suspended', 'locked', 'unusual',
            'authenticate', 'auth', 'wallet', 'invoice', 'payment',
            'billing', 'expire', 'expired', 'limited', 'restrict',
            'urgent', 'immediately', 'alert', 'warning', 'resolve',
            'recover', 'restore', 'reactivate', 'unlock', 'validate'
        ]
        # Brand names commonly targeted by phishing campaigns
        self.brand_names = [
            'paypal', 'ebay', 'amazon', 'apple', 'microsoft', 'google',
            'facebook', 'instagram', 'twitter', 'netflix', 'linkedin',
            'dropbox', 'chase', 'wellsfargo', 'bankofamerica', 'citibank',
            'americanexpress', 'amex', 'visa', 'mastercard', 'outlook',
            'office365', 'office', 'yahoo', 'aol', 'icloud', 'adobe',
            'spotify', 'steam', 'dhl', 'fedex', 'ups', 'usps',
            'coinbase', 'binance', 'blockchain', 'metamask', 'whatsapp',
            'telegram', 'discord', 'zoom', 'docusign', 'wetransfer',
            'hsbc', 'barclays', 'santander', 'ing', 'revolut'
        ]
        # Known URL shortener domains
        self.shorteners = [
            'bit.ly', 'bitly.com', 'goo.gl', 'tinyurl.com', 't.co', 'ow.ly',
            'is.gd', 'buff.ly', 'adf.ly', 'bit.do', 'short.to', 'tiny.cc',
            'j.mp', 'surl.li', 'rb.gy', 'cutt.ly', 'qrco.de', 'v.gd',
            'shorturl.at', 'rebrand.ly', 'clck.ru', 's.id', 'shrtco.de'
        ]
        # Suspicious TLDs (over-represented in abuse feeds)
        self.suspicious_tlds = {
            'tk', 'ml', 'ga', 'cf', 'gq',  # Free domains
            'xyz', 'top', 'club', 'work', 'date', 'racing', 'win',
            'loan', 'download', 'stream', 'click', 'link', 'bid',
            'review', 'party', 'trade', 'webcam', 'science',
            'accountant', 'faith', 'cricket', 'zip', 'mov'
        }
        # Trusted TLDs (generally associated with legitimate sites)
        self.trusted_tlds = {
            'com', 'org', 'net', 'edu', 'gov', 'mil',
            'uk', 'us', 'ca', 'de', 'fr', 'jp', 'au',
            'nl', 'be', 'ch', 'it', 'es', 'se', 'no'
        }
        # Free hosting services often abused for phishing pages
        self.free_hosting = [
            'weebly.com', 'wix.com', 'wordpress.com', 'blogspot.com',
            'tumblr.com', 'jimdo.com', 'github.io', 'gitlab.io',
            'netlify.app', 'vercel.app', 'herokuapp.com', 'firebaseapp.com',
            'web.app', 'pages.dev', 'godaddysites.com', 'square.site',
            '000webhostapp.com', 'sites.google.com', 'carrd.co'
        ]

    def extract_features(self, url: str) -> dict:
        """
        Extract all URL-only features from a single URL.

        Args:
            url: URL string (an 'http://' scheme is prepended if missing)

        Returns:
            Dictionary of numeric features; all-zero defaults on failure.
        """
        try:
            # Ensure URL has a scheme so urlparse puts the host in netloc
            if not url.startswith(('http://', 'https://')):
                url = 'http://' + url
            parsed = urlparse(url)
            domain = parsed.netloc.lower()
            domain_no_port = domain.split(':')[0]
            path = parsed.path
            query = parsed.query
            features = {}
            # 1. Length features
            features.update(self._length_features(url, domain_no_port, path, query))
            # 2. Character count features
            features.update(self._char_count_features(url, domain_no_port, path))
            # 3. Ratio features
            features.update(self._ratio_features(url, domain_no_port))
            # 4. Domain structure features
            features.update(self._domain_features(domain_no_port, parsed))
            # 5. Path features
            features.update(self._path_features(path))
            # 6. Query features
            features.update(self._query_features(query))
            # 7. Statistical features (entropy, patterns)
            features.update(self._statistical_features(url, domain_no_port, path))
            # 8. Security indicator features
            features.update(self._security_features(url, parsed, domain_no_port))
            # 9. Keyword/brand features
            features.update(self._keyword_features(url, domain_no_port, path))
            # 10. Encoding features
            features.update(self._encoding_features(url, domain_no_port))
            return features
        except Exception as e:
            # BUGFIX: url may not be a string (e.g. NaN from pandas); slicing
            # it directly used to raise TypeError *inside* this handler,
            # defeating the default-features fallback. str() makes it safe.
            logging.getLogger("url_features_v2").error(
                f"Error extracting features from URL: {str(url)[:50]}... Error: {e}"
            )
            return self._get_default_features()

    def _length_features(self, url: str, domain: str, path: str, query: str) -> dict:
        """Length-based features."""
        return {
            'url_length': len(url),
            'domain_length': len(domain),
            'path_length': len(path),
            'query_length': len(query),
            # Binary indicators at common phishing-length thresholds
            'url_length_gt_75': 1 if len(url) > 75 else 0,
            'url_length_gt_100': 1 if len(url) > 100 else 0,
            'url_length_gt_150': 1 if len(url) > 150 else 0,
            'domain_length_gt_25': 1 if len(domain) > 25 else 0,
        }

    def _char_count_features(self, url: str, domain: str, path: str) -> dict:
        """Character count features."""
        return {
            # URL character counts
            'num_dots': url.count('.'),
            'num_hyphens': url.count('-'),
            'num_underscores': url.count('_'),
            'num_slashes': url.count('/'),
            'num_question_marks': url.count('?'),
            'num_ampersands': url.count('&'),
            'num_equals': url.count('='),
            'num_at': url.count('@'),
            'num_percent': url.count('%'),
            'num_digits_url': sum(c.isdigit() for c in url),
            'num_letters_url': sum(c.isalpha() for c in url),
            # Domain character counts
            'domain_dots': domain.count('.'),
            'domain_hyphens': domain.count('-'),
            'domain_digits': sum(c.isdigit() for c in domain),
            # Path character counts
            'path_slashes': path.count('/'),
            'path_dots': path.count('.'),
            'path_digits': sum(c.isdigit() for c in path),
        }

    def _ratio_features(self, url: str, domain: str) -> dict:
        """Ratio-based features (denominators clamped to avoid division by zero)."""
        url_len = max(len(url), 1)
        domain_len = max(len(domain), 1)
        return {
            'digit_ratio_url': sum(c.isdigit() for c in url) / url_len,
            'letter_ratio_url': sum(c.isalpha() for c in url) / url_len,
            'special_char_ratio': sum(not c.isalnum() for c in url) / url_len,
            'digit_ratio_domain': sum(c.isdigit() for c in domain) / domain_len,
            'symbol_ratio_domain': sum(c in '-_.' for c in domain) / domain_len,
        }

    def _domain_features(self, domain: str, parsed) -> dict:
        """Domain structure features."""
        parts = domain.split('.')
        tld = parts[-1] if parts else ''
        # Get SLD (second level domain)
        sld = parts[-2] if len(parts) > 1 else ''
        # Count subdomains (labels minus registrable domain and TLD)
        num_subdomains = max(0, len(parts) - 2)
        # BUGFIX: accessing .port raises ValueError for a malformed port
        # (e.g. "http://host:abc/"); previously that collapsed the whole
        # extraction to default features. Treat a bad port as "no port".
        try:
            port = parsed.port
        except ValueError:
            port = None
        return {
            'num_subdomains': num_subdomains,
            'num_domain_parts': len(parts),
            'tld_length': len(tld),
            'sld_length': len(sld),
            'longest_domain_part': max((len(p) for p in parts), default=0),
            'avg_domain_part_len': sum(len(p) for p in parts) / max(len(parts), 1),
            # TLD indicators
            'has_suspicious_tld': 1 if tld in self.suspicious_tlds else 0,
            'has_trusted_tld': 1 if tld in self.trusted_tlds else 0,
            # Port
            'has_port': 1 if port else 0,
            'has_non_std_port': 1 if port and port not in [80, 443] else 0,
        }

    def _path_features(self, path: str) -> dict:
        """Path structure features."""
        segments = [s for s in path.split('/') if s]
        # File extension, if present (parsed.path carries no query string;
        # the split('?') is a harmless belt-and-braces safeguard)
        extension = ''
        if '.' in path:
            potential_ext = path.rsplit('.', 1)[-1].split('?')[0].lower()
            if len(potential_ext) <= 10:
                extension = potential_ext
        return {
            'path_depth': len(segments),
            'max_path_segment_len': max((len(s) for s in segments), default=0),
            'avg_path_segment_len': sum(len(s) for s in segments) / max(len(segments), 1),
            # Extension features
            'has_extension': 1 if extension else 0,
            'has_php': 1 if extension == 'php' else 0,
            'has_html': 1 if extension in ['html', 'htm'] else 0,
            'has_exe': 1 if extension in ['exe', 'bat', 'cmd', 'msi'] else 0,
            # Suspicious path patterns
            'has_double_slash': 1 if '//' in path else 0,
        }

    def _query_features(self, query: str) -> dict:
        """Query string features."""
        params = parse_qs(query)
        return {
            'num_params': len(params),
            'has_query': 1 if query else 0,
            'query_value_length': sum(len(''.join(v)) for v in params.values()),
            'max_param_len': max((len(k) + len(''.join(v)) for k, v in params.items()), default=0),
        }

    def _statistical_features(self, url: str, domain: str, path: str) -> dict:
        """Statistical and entropy features."""
        return {
            # Shannon entropy of each component
            'url_entropy': self._entropy(url),
            'domain_entropy': self._entropy(domain),
            'path_entropy': self._entropy(path) if path else 0,
            # Consecutive character runs (long digit runs are suspicious)
            'max_consecutive_digits': self._max_consecutive(url, str.isdigit),
            'max_consecutive_chars': self._max_consecutive(url, str.isalpha),
            'max_consecutive_consonants': self._max_consecutive_consonants(domain),
            # Character variance
            'char_repeat_rate': self._repeat_rate(url),
            # N-gram uniqueness (low ratio = repetitive/generated strings)
            'unique_bigram_ratio': self._unique_ngram_ratio(url, 2),
            'unique_trigram_ratio': self._unique_ngram_ratio(url, 3),
            # Vowel/consonant balance in domain (random strings skew low)
            'vowel_ratio_domain': self._vowel_ratio(domain),
        }

    def _security_features(self, url: str, parsed, domain: str) -> dict:
        """Security indicator features (URL-based only)."""
        return {
            # Protocol
            'is_https': 1 if parsed.scheme == 'https' else 0,
            'is_http': 1 if parsed.scheme == 'http' else 0,
            # Literal IP address instead of a hostname
            'has_ip_address': 1 if self._is_ip(domain) else 0,
            # Suspicious patterns
            'has_at_symbol': 1 if '@' in url else 0,
            'has_redirect': 1 if 'redirect' in url.lower() or 'url=' in url.lower() else 0,
            # URL shortener
            'is_shortened': 1 if any(s in domain for s in self.shorteners) else 0,
            # Free hosting
            'is_free_hosting': 1 if any(h in domain for h in self.free_hosting) else 0,
            # www presence / misuse (www as a mid-label is a spoofing trick)
            'has_www': 1 if domain.startswith('www.') else 0,
            'www_in_middle': 1 if 'www' in domain and not domain.startswith('www') else 0,
        }

    def _keyword_features(self, url: str, domain: str, path: str) -> dict:
        """Keyword and brand detection features."""
        url_lower = url.lower()
        domain_lower = domain.lower()
        path_lower = path.lower()
        # Count phishing keywords (substring matches)
        phishing_in_url = sum(1 for k in self.phishing_keywords if k in url_lower)
        phishing_in_domain = sum(1 for k in self.phishing_keywords if k in domain_lower)
        phishing_in_path = sum(1 for k in self.phishing_keywords if k in path_lower)
        # Count brand names
        brands_in_url = sum(1 for b in self.brand_names if b in url_lower)
        brands_in_domain = sum(1 for b in self.brand_names if b in domain_lower)
        brands_in_path = sum(1 for b in self.brand_names if b in path_lower)
        # Brand impersonation: brand in path but not in domain
        brand_impersonation = 1 if brands_in_path > 0 and brands_in_domain == 0 else 0
        return {
            'num_phishing_keywords': phishing_in_url,
            'phishing_in_domain': phishing_in_domain,
            'phishing_in_path': phishing_in_path,
            'num_brands': brands_in_url,
            'brand_in_domain': 1 if brands_in_domain > 0 else 0,
            'brand_in_path': 1 if brands_in_path > 0 else 0,
            'brand_impersonation': brand_impersonation,
            # Specific high-value keywords
            'has_login': 1 if 'login' in url_lower or 'signin' in url_lower else 0,
            'has_account': 1 if 'account' in url_lower else 0,
            'has_verify': 1 if 'verify' in url_lower or 'confirm' in url_lower else 0,
            'has_secure': 1 if 'secure' in url_lower or 'security' in url_lower else 0,
            'has_update': 1 if 'update' in url_lower else 0,
            'has_bank': 1 if 'bank' in url_lower else 0,
            'has_password': 1 if 'password' in url_lower or 'passwd' in url_lower else 0,
            'has_suspend': 1 if 'suspend' in url_lower or 'locked' in url_lower else 0,
            # Suspicious patterns (classic PayPal-phish artifacts)
            'has_webscr': 1 if 'webscr' in url_lower else 0,
            'has_cmd': 1 if 'cmd=' in url_lower else 0,
            'has_cgi': 1 if 'cgi-bin' in url_lower or 'cgi_bin' in url_lower else 0,
        }

    def _encoding_features(self, url: str, domain: str) -> dict:
        """Encoding-related features (percent-encoding, punycode, hex/base64 blobs)."""
        # Punycode marker for internationalized (possibly homograph) domains
        has_punycode = 'xn--' in domain
        # Decode and measure how much percent-encoding inflated the URL
        try:
            decoded = unquote(url)
            encoding_diff = len(decoded) - len(url)
        except Exception:
            encoding_diff = 0
        # These checks cannot raise on a str, so the former per-check
        # try/except wrappers (bare excepts) were dead code and are removed.
        has_hex = 1 if re.search(r'[0-9a-f]{20,}', url.lower()) else 0
        has_base64 = 1 if re.search(r'[A-Za-z0-9+/]{30,}={0,2}', url) else 0
        has_unicode = 1 if any(ord(c) > 127 for c in url) else 0
        return {
            'has_url_encoding': 1 if '%' in url else 0,
            'encoding_count': url.count('%'),
            'encoding_diff': abs(encoding_diff),
            'has_punycode': 1 if has_punycode else 0,
            'has_unicode': has_unicode,
            'has_hex_string': has_hex,
            'has_base64': has_base64,
        }

    # Helper methods

    def _entropy(self, text: str) -> float:
        """Calculate Shannon entropy (bits per character)."""
        if not text:
            return 0.0
        freq = Counter(text)
        length = len(text)
        return -sum((c / length) * math.log2(c / length) for c in freq.values())

    def _max_consecutive(self, text: str, condition) -> int:
        """Length of the longest run of characters matching `condition`."""
        max_count = count = 0
        for char in text:
            if condition(char):
                count += 1
                max_count = max(max_count, count)
            else:
                count = 0
        return max_count

    def _max_consecutive_consonants(self, text: str) -> int:
        """Length of the longest run of ASCII consonants (case-insensitive)."""
        consonants = set('bcdfghjklmnpqrstvwxyz')
        max_count = count = 0
        for char in text.lower():
            if char in consonants:
                count += 1
                max_count = max(max_count, count)
            else:
                count = 0
        return max_count

    def _repeat_rate(self, text: str) -> float:
        """Rate of repeated adjacent characters."""
        if len(text) < 2:
            return 0.0
        repeats = sum(1 for i in range(len(text) - 1) if text[i] == text[i + 1])
        return repeats / (len(text) - 1)

    def _unique_ngram_ratio(self, text: str, n: int) -> float:
        """Ratio of unique n-grams to total n-grams."""
        if len(text) < n:
            return 0.0
        ngrams = [text[i:i + n] for i in range(len(text) - n + 1)]
        return len(set(ngrams)) / len(ngrams)

    def _vowel_ratio(self, text: str) -> float:
        """Ratio of vowels to alphabetic characters in text."""
        if not text:
            return 0.0
        vowels = sum(1 for c in text.lower() if c in 'aeiou')
        letters = sum(1 for c in text if c.isalpha())
        return vowels / max(letters, 1)

    def _is_ip(self, domain: str) -> bool:
        """Check whether the domain is a literal IPv4/IPv6 address."""
        # IPv4 (heuristic: does not range-check each octet, so 999.1.1.1 matches)
        if re.match(r'^(\d{1,3}\.){3}\d{1,3}$', domain):
            return True
        # IPv6: inet_pton raises OSError on anything that is not valid IPv6.
        # (Was a bare except, which also swallowed KeyboardInterrupt.)
        try:
            socket.inet_pton(socket.AF_INET6, domain.strip('[]'))
            return True
        except OSError:
            return False

    def _get_default_features(self) -> dict:
        """Default (all-zero) feature values for error cases."""
        return {name: 0 for name in self.get_feature_names()}

    def get_feature_names(self) -> list:
        """Return the ordered list of all feature names.

        Derived from a real extraction on a known-good URL so the list can
        never drift out of sync with the individual feature methods (the
        previous hand-maintained 96-entry literal was a duplication hazard).
        Insertion order matches extract_features' update sequence.
        """
        return list(self.extract_features('http://www.example.com/index.html?q=1').keys())

    def extract_batch(self, urls: list, show_progress: bool = True) -> pd.DataFrame:
        """
        Extract features from multiple URLs.

        Args:
            urls: List of URL strings
            show_progress: Show progress messages

        Returns:
            DataFrame with one feature row per input URL (positional order).
        """
        log = logging.getLogger("url_features_v2")
        if show_progress:
            log.info(f"Extracting URL features from {len(urls):,} URLs...")
        features_list = []
        progress_interval = 50000
        for i, url in enumerate(urls):
            if show_progress and i > 0 and i % progress_interval == 0:
                log.info(f" Processed {i:,} / {len(urls):,} ({100 * i / len(urls):.1f}%)")
            features = self.extract_features(url)
            features_list.append(features)
        df = pd.DataFrame(features_list)
        if show_progress:
            log.info(f"✓ Extracted {len(df.columns)} features from {len(df):,} URLs")
        return df
def main():
    """Extract URL-only features from dataset."""
    import argparse

    cli = argparse.ArgumentParser(description='URL-Only Feature Extraction (Stage 1)')
    cli.add_argument('--sample', type=int, default=None, help='Sample N URLs')
    cli.add_argument('--output', type=str, default=None, help='Output filename')
    opts = cli.parse_args()

    banner = "=" * 70
    # Startup banner
    for msg in (banner,
                "URL-Only Feature Extraction v1",
                banner,
                "",
                "Features: URL structure, lexical, statistical",
                "NO HTTP requests, NO external APIs",
                ""):
        logger.info(msg)

    # Load dataset (path is resolved relative to this script's location)
    here = Path(__file__).parent
    data_file = (here / '../../data/processed/clean_dataset.csv').resolve()
    logger.info(f"Loading: {data_file.name}")
    dataset = pd.read_csv(data_file)
    logger.info(f"Loaded: {len(dataset):,} URLs")

    # Optional deterministic subsampling for quick experiments
    if opts.sample and opts.sample < len(dataset):
        dataset = dataset.sample(n=opts.sample, random_state=42)
        logger.info(f"Sampled: {len(dataset):,} URLs")

    # Extract features and re-attach labels positionally
    extractor = URLFeatureExtractorV2()
    features_df = extractor.extract_batch(dataset['url'].tolist())
    features_df['label'] = dataset['label'].values

    # Save next to the other feature artifacts
    out_dir = (here / '../../data/features').resolve()
    out_dir.mkdir(parents=True, exist_ok=True)
    if opts.output:
        out_file = out_dir / opts.output
    else:
        tag = f'_sample{opts.sample}' if opts.sample else ''
        out_file = out_dir / f'url_features{tag}.csv'
    features_df.to_csv(out_file, index=False)

    # Completion summary
    logger.info("")
    logger.info(banner)
    logger.info(f"✓ Saved: {out_file}")
    logger.info(f" Shape: {features_df.shape}")
    logger.info(f" Features: {len(features_df.columns) - 1}")
    logger.info(banner)

    # Show stats
    print("\nFeature Statistics (sample):")
    print(features_df.describe().T.head(20))


if __name__ == "__main__":
    main()