# Provenance (upload-page metadata, kept as a comment so the file parses):
# uploader rb1337, "Upload 50 files", commit 2cc7f91 (verified)
"""
URL Feature Extraction v2 - IMPROVED VERSION
Improvements:
- Fixed free hosting detection (exact/suffix match instead of substring)
- Added free platform detection (Google Sites, Weebly, Firebase, etc.)
- Added UUID subdomain detection (Replit, Firebase patterns)
- Added platform subdomain length feature
- Added longest_part thresholds (gt_20, gt_30, gt_40)
- Expanded brand list with regional brands
- Improved extension categorization (added archive, image categories)
- Fixed get_feature_names() to be dynamic
- Better URL shortener detection
Key Features:
- Lexical (length, characters, entropy)
- Structural (domain parts, path segments, TLD)
- Statistical (entropy, n-grams, patterns)
- Security indicators (from URL only)
- Brand/phishing patterns
- FREE PLATFORM ABUSE DETECTION (NEW!)
Designed for:
- Fast inference (< 1ms per URL)
- No network dependencies
- Production deployment
"""
import pandas as pd
import numpy as np
from urllib.parse import urlparse, parse_qs, unquote
import re
import math
import socket
import unicodedata
from pathlib import Path
from collections import Counter
import sys
import logging
# Configure module logging: concise timestamped records at INFO level.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%H:%M:%S',
    level=logging.INFO,
)
# Shared logger for this extractor module.
logger = logging.getLogger("url_features_v2")
class URLFeatureExtractorV2:
"""
Fast URL-only feature extractor for Stage 1 phishing detection.
IMPROVED VERSION with better free platform detection.
"""
def __init__(self):
"""Initialize feature extractor with keyword lists."""
# Phishing-related keywords
self.phishing_keywords = [
'login', 'signin', 'sign-in', 'log-in', 'logon', 'signon',
'account', 'accounts', 'update', 'verify', 'verification',
'secure', 'security', 'banking', 'bank', 'confirm', 'password',
'passwd', 'credential', 'suspended', 'locked', 'unusual',
'authenticate', 'auth', 'wallet', 'invoice', 'payment',
'billing', 'expire', 'expired', 'limited', 'restrict',
'urgent', 'immediately', 'alert', 'warning', 'resolve',
'recover', 'restore', 'reactivate', 'unlock', 'validate'
]
# Brand names - EXPANDED with regional brands
self.brand_names = [
# US Tech Giants
'paypal', 'ebay', 'amazon', 'apple', 'microsoft', 'google',
'facebook', 'instagram', 'twitter', 'netflix', 'linkedin',
'dropbox', 'adobe', 'spotify', 'steam', 'zoom', 'docusign',
'salesforce', 'shopify', 'square', 'venmo', 'cashapp', 'zelle',
# US Banks
'chase', 'wellsfargo', 'bankofamerica', 'citibank', 'citi',
'americanexpress', 'amex', 'visa', 'mastercard',
'capitalone', 'usbank', 'pnc', 'truist',
# Email/Communication
'outlook', 'office365', 'office', 'yahoo', 'aol', 'icloud',
'gmail', 'protonmail', 'whatsapp', 'telegram', 'discord',
'signal', 'skype', 'teams',
# Shipping/Logistics
'dhl', 'fedex', 'ups', 'usps', 'amazon', 'alibaba',
# Crypto/Finance
'coinbase', 'binance', 'blockchain', 'metamask', 'kraken',
'gemini', 'robinhood', 'etrade', 'fidelity', 'schwab',
'payoneer', 'stripe', 'wise', 'revolut',
# Social/Entertainment
'tiktok', 'snapchat', 'twitch', 'roblox', 'epic', 'epicgames',
'playstation', 'xbox', 'nintendo', 'blizzard', 'riot',
# REGIONAL BRANDS (from analysis)
# Europe
'allegro', 'allegrolokalnie', # Poland
'olx', # Europe/LatAm
'bol', 'marktplaats', # Netherlands
'leboncoin', # France
'idealo', 'otto', # Germany
'hsbc', 'barclays', 'santander', 'ing', 'revolut', # European banks
# Asia
'rakuten', # Japan
'lazada', 'shopee', # Southeast Asia
'baidu', 'taobao', 'alipay', 'wechat', 'weibo', # China
'paytm', 'phonepe', # India
# Latin America
'mercadolibre', 'mercadopago', # LatAm
# Russia
'yandex', 'vk', 'mailru',
# Other
'uber', 'lyft', 'airbnb', 'booking', 'expedia',
'wetransfer', 'mediafire', 'mega',
]
# URL shorteners - EXACT MATCH ONLY
self.shorteners = {
# Original
'bit.ly', 'bitly.com', 'goo.gl', 'tinyurl.com', 't.co', 'ow.ly',
'is.gd', 'buff.ly', 'adf.ly', 'bit.do', 'short.to', 'tiny.cc',
'j.mp', 'surl.li', 'rb.gy', 'cutt.ly', 'qrco.de', 'v.gd',
'shorturl.at', 'rebrand.ly', 'clck.ru', 's.id', 'shrtco.de',
# NEW from analysis (CRITICAL!)
'qrco.de', # 3,824 occurrences!
'q-r.to', # 2,974
'l.ead.me', # 2,907
'ead.me', # Base domain
'urlz.fr',
'hotm.art',
'reurl.cc',
'did.li',
'zpr.io',
'linkin.bio',
'linqapp.com',
'linktr.ee',
'flow.page',
'campsite.bio',
'qr-codes.io',
'scanned.page',
'l.wl.co',
'wl.co',
'hm.ru',
'flowcode.com',
}
# Suspicious TLDs
self.suspicious_tlds = {
'tk', 'ml', 'ga', 'cf', 'gq', # Free domains
'xyz', 'top', 'club', 'work', 'date', 'racing', 'win',
'loan', 'download', 'stream', 'click', 'link', 'bid',
'review', 'party', 'trade', 'webcam', 'science',
'accountant', 'faith', 'cricket', 'zip', 'mov',
'icu', 'buzz', 'space', 'online', 'site', 'website',
'tech', 'store', 'rest', 'cfd', 'monster', 'sbs'
}
# Trusted TLDs
self.trusted_tlds = {
'com', 'org', 'net', 'edu', 'gov', 'mil',
'uk', 'us', 'ca', 'de', 'fr', 'jp', 'au',
'nl', 'be', 'ch', 'it', 'es', 'se', 'no',
'pl', 'br', 'in', 'mx', 'kr', 'ru', 'cn'
}
# FREE PLATFORMS - EXACT/SUFFIX MATCH (from your PhishTank analysis!)
self.free_platforms = {
# Website Builders
'weebly.com', 'wixsite.com', 'wix.com', 'webflow.io',
'framer.website', 'carrd.co', 'notion.site', 'webwave.me',
'godaddysites.com', 'square.site', 'sites.google.com',
# Google Platforms (HIGH PHISHING RATE from analysis)
'firebaseapp.com', 'web.app', 'appspot.com',
'firebase.app', 'page.link',
# Developer Platforms (from analysis: Replit, Vercel, etc.)
'github.io', 'gitlab.io', 'pages.github.com',
'vercel.app', 'netlify.app', 'netlify.com',
'replit.dev', 'repl.co', 'replit.co',
'glitch.me', 'glitch.com',
'pages.dev', 'workers.dev', # Cloudflare
'herokuapp.com', 'heroku.com',
'onrender.com', 'railway.app', 'fly.dev',
'amplifyapp.com', # AWS Amplify
'surge.sh', 'now.sh',
# Blogging/CMS
'wordpress.com', 'blogspot.com', 'blogger.com',
'tumblr.com', 'medium.com', 'ghost.io',
'substack.com', 'beehiiv.com',
# Adobe/Creative
'adobesites.com', 'myportfolio.com', 'behance.net',
'adobe.com', 'framer.app',
# Forms/Surveys (from analysis: jotform, hsforms)
'jotform.com', 'typeform.com', 'forms.gle',
'hsforms.com', 'hubspot.com', 'surveymonkey.com',
'formstack.com', 'cognito.com',
# File Sharing
'dropboxusercontent.com', 'dl.dropboxusercontent.com',
'sharepoint.com', '1drv.ms', 'onedrive.live.com',
'box.com', 'wetransfer.com', 'we.tl',
# Free Hosting
'000webhostapp.com', 'freehosting.com', 'freehostia.com',
'5gbfree.com', 'x10hosting.com', 'awardspace.com',
'byet.host', 'infinityfree.com',
# Education/Sandbox
'repl.it', 'codepen.io', 'jsfiddle.net', 'codesandbox.io',
'stackblitz.com', 'observablehq.com',
# Other (from analysis)
'webcindario.com', 'gitbook.io', 'tinyurl.com',
'start.page', 'my.site', 'site123.com'
}
# Common English words for dictionary check
self.common_words = {
'about', 'account', 'after', 'again', 'all', 'also', 'america', 'american',
'another', 'answer', 'any', 'app', 'apple', 'area', 'back', 'bank', 'best',
'between', 'book', 'business', 'call', 'can', 'card', 'care', 'case', 'center',
'central', 'change', 'check', 'city', 'class', 'cloud', 'come', 'company',
'contact', 'control', 'country', 'course', 'credit', 'data', 'day', 'dept',
'department', 'different', 'digital', 'doctor', 'down', 'east', 'easy', 'end',
'energy', 'even', 'event', 'every', 'express', 'fact', 'family', 'feel',
'field', 'file', 'find', 'first', 'food', 'form', 'free', 'friend', 'from',
'game', 'general', 'get', 'give', 'global', 'good', 'government', 'great',
'group', 'hand', 'have', 'head', 'health', 'help', 'here', 'high', 'home',
'house', 'how', 'image', 'info', 'information', 'insurance', 'international',
'into', 'just', 'keep', 'kind', 'know', 'large', 'last', 'late', 'leave',
'left', 'legal', 'life', 'like', 'line', 'little', 'local', 'long', 'look',
'love', 'mail', 'main', 'make', 'management', 'manager', 'many', 'map', 'market',
'marketing', 'media', 'medical', 'member', 'message', 'money', 'month', 'more',
'most', 'move', 'music', 'name', 'national', 'need', 'network', 'never', 'new',
'news', 'next', 'north', 'not', 'note', 'number', 'office', 'official', 'old',
'online', 'only', 'open', 'order', 'other', 'over', 'page', 'part', 'party',
'people', 'person', 'personal', 'photo', 'place', 'plan', 'play', 'plus', 'point',
'policy', 'portal', 'post', 'power', 'press', 'price', 'private', 'product',
'program', 'project', 'property', 'public', 'quality', 'question', 'quick', 'rate',
'read', 'real', 'record', 'report', 'research', 'resource', 'result', 'right',
'room', 'sale', 'sales', 'save', 'school', 'search', 'second', 'section',
'security', 'see', 'senior', 'service', 'services', 'set', 'shop', 'show',
'side', 'sign', 'site', 'small', 'social', 'software', 'solution', 'solutions',
'some', 'south', 'space', 'special', 'staff', 'start', 'state', 'store', 'story',
'student', 'study', 'support', 'sure', 'system', 'systems', 'take', 'team', 'tech',
'technology', 'test', 'text', 'than', 'that', 'their', 'them', 'then', 'there',
'these', 'they', 'thing', 'think', 'this', 'those', 'through', 'time', 'today',
'together', 'total', 'trade', 'training', 'travel', 'trust', 'type', 'under',
'university', 'until', 'update', 'upon', 'user', 'value', 'very', 'video',
'view', 'want', 'water', 'website', 'week', 'well', 'west', 'what', 'when',
'where', 'which', 'while', 'white', 'will', 'with', 'within', 'without', 'woman',
'women', 'word', 'work', 'world', 'would', 'write', 'year', 'york', 'young', 'your'
}
# Keyboard patterns
self.keyboard_patterns = [
'qwerty', 'asdfgh', 'zxcvbn', '12345', '123456', '1234567', '12345678',
'qwertyuiop', 'asdfghjkl', 'zxcvbnm'
]
# Lookalike character mappings
self.lookalike_chars = {
'0': 'o', 'o': '0',
'1': 'l', 'l': '1', 'i': '1',
'rn': 'm', 'vv': 'w', 'cl': 'd'
}
self.microsoft_services = {
'forms.office.com',
'sharepoint.com',
'onedrive.live.com',
'1drv.ms',
}
self.zoom_services = {
'docs.zoom.us',
'zoom.us',
}
self.adobe_services = {
'express.adobe.com',
'new.express.adobe.com', # Multi-level!
'spark.adobe.com',
'portfolio.adobe.com',
}
self.google_services = {
'docs.google.com',
'sites.google.com',
'drive.google.com',
'script.google.com',
'storage.googleapis.com',
'storage.cloud.google.com',
'forms.google.com',
'calendar.google.com',
'meet.google.com',
}
def extract_features(self, url: str) -> dict:
    """
    Compute the full URL-only feature dictionary for one URL.
    Args:
        url: URL string (scheme optional; 'http://' is assumed if missing)
    Returns:
        Dictionary of features; on any extraction error a dictionary of
        default values is returned instead.
    """
    try:
        # Normalize: urlparse needs a scheme to populate netloc.
        if not url.startswith(('http://', 'https://')):
            url = 'http://' + url
        parsed = urlparse(url)
        host = parsed.netloc.lower().split(':')[0]  # hostname without port
        path = parsed.path
        query = parsed.query
        # Each helper contributes one independent group of features.
        groups = (
            self._length_features(url, host, path, query),
            self._char_count_features(url, host, path),
            self._ratio_features(url, host),
            self._domain_features(host, parsed),
            self._path_features(path, host),
            self._query_features(query),
            self._statistical_features(url, host, path),
            self._security_features(url, parsed, host),
            self._keyword_features(url, host, path, parsed),
            self._encoding_features(url, host),
        )
        features: dict = {}
        for group in groups:
            features.update(group)
        return features
    except Exception as e:
        logger.error(f"Error extracting features from URL: {url[:50]}... Error: {e}")
        return self._get_default_features()
def _length_features(self, url: str, domain: str, path: str, query: str) -> dict:
    """Raw and bucketed length features for each URL component."""
    url_len = len(url)
    domain_len = len(domain)
    return {
        'url_length': url_len,
        'domain_length': domain_len,
        'path_length': len(path),
        'query_length': len(query),
        # Bucketed via _categorize_length with the thresholds listed below.
        'url_length_category': self._categorize_length(url_len, [30, 75, 150]),
        'domain_length_category': self._categorize_length(domain_len, [10, 20, 30]),
    }
def _char_count_features(self, url: str, domain: str, path: str) -> dict:
"""Character count features."""
return {
# URL character counts
'num_dots': url.count('.'),
'num_hyphens': url.count('-'),
'num_underscores': url.count('_'),
'num_slashes': url.count('/'),
'num_question_marks': url.count('?'),
'num_ampersands': url.count('&'),
'num_equals': url.count('='),
'num_at': url.count('@'),
'num_percent': url.count('%'),
'num_digits_url': sum(c.isdigit() for c in url),
'num_letters_url': sum(c.isalpha() for c in url),
# Domain character counts
'domain_dots': domain.count('.'),
'domain_hyphens': domain.count('-'),
'domain_digits': sum(c.isdigit() for c in domain),
# Path character counts
'path_slashes': path.count('/'),
'path_dots': path.count('.'),
'path_digits': sum(c.isdigit() for c in path),
}
def _ratio_features(self, url: str, domain: str) -> dict:
"""Ratio-based features."""
url_len = max(len(url), 1)
domain_len = max(len(domain), 1)
return {
'digit_ratio_url': sum(c.isdigit() for c in url) / url_len,
'letter_ratio_url': sum(c.isalpha() for c in url) / url_len,
'special_char_ratio': sum(not c.isalnum() for c in url) / url_len,
'digit_ratio_domain': sum(c.isdigit() for c in domain) / domain_len,
'symbol_ratio_domain': sum(c in '-_.' for c in domain) / domain_len,
}
def _domain_features(self, domain: str, parsed) -> dict:
"""Domain structure features."""
parts = domain.split('.')
tld = parts[-1] if parts else ''
sld = parts[-2] if len(parts) > 1 else ''
num_subdomains = max(0, len(parts) - 2)
longest_part = max((len(p) for p in parts), default=0)
return {
'num_subdomains': num_subdomains,
'num_domain_parts': len(parts),
'tld_length': len(tld),
'sld_length': len(sld),
'longest_domain_part': longest_part,
'avg_domain_part_len': sum(len(p) for p in parts) / max(len(parts), 1),
# NEW: Longest part thresholds (from analysis!)
'longest_part_gt_20': 1 if longest_part > 20 else 0,
'longest_part_gt_30': 1 if longest_part > 30 else 0,
'longest_part_gt_40': 1 if longest_part > 40 else 0,
# TLD indicators
'has_suspicious_tld': 1 if tld in self.suspicious_tlds else 0,
'has_trusted_tld': 1 if tld in self.trusted_tlds else 0,
# Port
'has_port': 1 if parsed.port else 0,
'has_non_std_port': 1 if parsed.port and parsed.port not in [80, 443] else 0,
# Domain randomness features
'domain_randomness_score': self._calculate_domain_randomness(sld),
'sld_consonant_cluster_score': self._consonant_clustering_score(sld),
'sld_keyboard_pattern': self._keyboard_pattern_score(sld),
'sld_has_dictionary_word': self._contains_dictionary_word(sld),
'sld_pronounceability_score': self._pronounceability_score(sld),
'domain_digit_position_suspicious': self._suspicious_digit_position(sld),
}
def _path_features(self, path: str, domain: str) -> dict:
    """Path structure features: depth, extension class, suspicious patterns."""
    segments = [seg for seg in path.split('/') if seg]
    # File extension: final dotted suffix (query stripped), max 10 chars.
    ext = ''
    if '.' in path:
        candidate = path.rsplit('.', 1)[-1].split('?')[0].lower()
        if len(candidate) <= 10:
            ext = candidate
    return {
        'path_depth': len(segments),
        'max_path_segment_len': max((len(seg) for seg in segments), default=0),
        'avg_path_segment_len': sum(len(seg) for seg in segments) / max(len(segments), 1),
        # Extension features
        'has_extension': 1 if ext else 0,
        'extension_category': self._categorize_extension(ext),
        'has_suspicious_extension': int(ext in ('zip', 'exe', 'apk', 'scr', 'bat', 'cmd')),
        'has_exe': int(ext in ('exe', 'bat', 'cmd', 'msi')),
        # Suspicious path patterns
        'has_double_slash': int('//' in path),
        'path_has_brand_not_domain': self._brand_in_path_only(path, domain),
        'path_has_ip_pattern': 1 if re.search(r'\d{1,3}[._-]\d{1,3}[._-]\d{1,3}', path) else 0,
        'suspicious_path_extension_combo': self._suspicious_extension_pattern(path),
    }
def _query_features(self, query: str) -> dict:
"""Query string features."""
params = parse_qs(query)
return {
'num_params': len(params),
'has_query': 1 if query else 0,
'query_value_length': sum(len(''.join(v)) for v in params.values()),
'max_param_len': max((len(k) + len(''.join(v)) for k, v in params.items()), default=0),
'query_has_url': 1 if re.search(r'https?%3A%2F%2F|http%3A//', query.lower()) else 0,
}
def _statistical_features(self, url: str, domain: str, path: str) -> dict:
    """Entropy, repetition, and n-gram diversity features."""
    labels = domain.split('.')
    sld = labels[-2] if len(labels) > 1 else domain
    has_digit = any(ch.isdigit() for ch in domain)
    has_alpha = any(ch.isalpha() for ch in domain)
    return {
        # Shannon entropy per component
        'url_entropy': self._entropy(url),
        'domain_entropy': self._entropy(domain),
        'path_entropy': self._entropy(path) if path else 0,
        # Longest runs of each character class
        'max_consecutive_digits': self._max_consecutive(url, str.isdigit),
        'max_consecutive_chars': self._max_consecutive(url, str.isalpha),
        'max_consecutive_consonants': self._max_consecutive_consonants(domain),
        # Adjacent-repeat rate
        'char_repeat_rate': self._repeat_rate(url),
        # N-gram uniqueness
        'unique_bigram_ratio': self._unique_ngram_ratio(url, 2),
        'unique_trigram_ratio': self._unique_ngram_ratio(url, 3),
        # Diversity/complexity helpers
        'sld_letter_diversity': self._character_diversity(sld),
        'domain_has_numbers_letters': int(has_digit and has_alpha),
        'url_complexity_score': self._calculate_url_complexity(url),
    }
def _security_features(self, url: str, parsed, domain: str) -> dict:
"""Security indicator features (URL-based only)."""
parts = domain.split('.')
return {
# IP address
'has_ip_address': 1 if self._is_ip(domain) else 0,
# Suspicious patterns
'has_at_symbol': 1 if '@' in url else 0,
'has_redirect': 1 if 'redirect' in url.lower() or 'url=' in url.lower() else 0,
# URL shortener - FIXED: exact match only
'is_shortened': self._is_url_shortener(domain),
# Free hosting - DEPRECATED (use is_free_platform instead)
'is_free_hosting': self._is_free_platform(domain),
# NEW: Free platform detection (CRITICAL for your dataset!)
'is_free_platform': self._is_free_platform(domain),
'platform_subdomain_length': self._get_platform_subdomain_length(domain),
'has_uuid_subdomain': self._detect_uuid_pattern(domain),
}
def _keyword_features(self, url: str, domain: str, path: str, parsed) -> dict:
"""Keyword and brand detection features."""
url_lower = url.lower()
domain_lower = domain.lower()
path_lower = path.lower()
# Count phishing keywords
phishing_in_url = sum(1 for k in self.phishing_keywords if k in url_lower)
phishing_in_domain = sum(1 for k in self.phishing_keywords if k in domain_lower)
phishing_in_path = sum(1 for k in self.phishing_keywords if k in path_lower)
# Count brand names
brands_in_url = sum(1 for b in self.brand_names if b in url_lower)
brands_in_domain = sum(1 for b in self.brand_names if b in domain_lower)
brands_in_path = sum(1 for b in self.brand_names if b in path_lower)
# Brand impersonation
brand_impersonation = 1 if brands_in_path > 0 and brands_in_domain == 0 else 0
return {
'num_phishing_keywords': phishing_in_url,
'phishing_in_domain': phishing_in_domain,
'phishing_in_path': phishing_in_path,
'num_brands': brands_in_url,
'brand_in_domain': 1 if brands_in_domain > 0 else 0,
'brand_in_path': 1 if brands_in_path > 0 else 0,
'brand_impersonation': brand_impersonation,
# Specific high-value keywords
'has_login': 1 if 'login' in url_lower or 'signin' in url_lower else 0,
'has_account': 1 if 'account' in url_lower else 0,
'has_verify': 1 if 'verify' in url_lower or 'confirm' in url_lower else 0,
'has_secure': 1 if 'secure' in url_lower or 'security' in url_lower else 0,
'has_update': 1 if 'update' in url_lower else 0,
'has_bank': 1 if 'bank' in url_lower else 0,
'has_password': 1 if 'password' in url_lower or 'passwd' in url_lower else 0,
'has_suspend': 1 if 'suspend' in url_lower or 'locked' in url_lower else 0,
# Suspicious patterns
'has_webscr': 1 if 'webscr' in url_lower else 0,
'has_cmd': 1 if 'cmd=' in url_lower else 0,
'has_cgi': 1 if 'cgi-bin' in url_lower or 'cgi_bin' in url_lower else 0,
# Advanced brand spoofing features
'brand_in_subdomain_not_domain': self._brand_subdomain_spoofing(parsed),
'multiple_brands_in_url': 1 if brands_in_url >= 2 else 0,
'brand_with_hyphen': self._brand_with_hyphen(domain_lower),
'suspicious_brand_tld': self._suspicious_brand_tld(domain),
'brand_keyword_combo': self._brand_phishing_keyword_combo(url_lower),
}
def _encoding_features(self, url: str, domain: str) -> dict:
"""Encoding-related features."""
has_punycode = 'xn--' in domain
try:
decoded = unquote(url)
encoding_diff = len(decoded) - len(url)
except:
encoding_diff = 0
try:
has_hex = 1 if re.search(r'[0-9a-f]{20,}', url.lower()) else 0
except:
has_hex = 0
try:
has_base64 = 1 if re.search(r'[A-Za-z0-9+/]{30,}={0,2}', url) else 0
except:
has_base64 = 0
try:
has_unicode = 1 if any(ord(c) > 127 for c in url) else 0
except:
has_unicode = 0
return {
'has_url_encoding': 1 if '%' in url else 0,
'encoding_count': url.count('%'),
'encoding_diff': abs(encoding_diff),
'has_punycode': 1 if has_punycode else 0,
'has_unicode': has_unicode,
'has_hex_string': has_hex,
'has_base64': has_base64,
# Homograph & encoding detection
'has_lookalike_chars': self._detect_lookalike_chars(domain),
'mixed_script_score': self._mixed_script_detection(domain),
'homograph_brand_risk': self._homograph_brand_check(domain),
'suspected_idn_homograph': self._idn_homograph_score(url),
'double_encoding': self._detect_double_encoding(url),
'encoding_in_domain': 1 if '%' in domain else 0,
'suspicious_unicode_category': self._suspicious_unicode_chars(url),
}
# ============================================================
# HELPER METHODS
# ============================================================
def _entropy(self, text: str) -> float:
"""Calculate Shannon entropy."""
if not text:
return 0.0
freq = Counter(text)
length = len(text)
return -sum((c / length) * math.log2(c / length) for c in freq.values())
def _max_consecutive(self, text: str, condition) -> int:
"""Max consecutive characters matching condition."""
max_count = count = 0
for char in text:
if condition(char):
count += 1
max_count = max(max_count, count)
else:
count = 0
return max_count
def _max_consecutive_consonants(self, text: str) -> int:
"""Max consecutive consonants."""
consonants = set('bcdfghjklmnpqrstvwxyz')
max_count = count = 0
for char in text.lower():
if char in consonants:
count += 1
max_count = max(max_count, count)
else:
count = 0
return max_count
def _repeat_rate(self, text: str) -> float:
"""Rate of repeated adjacent characters."""
if len(text) < 2:
return 0.0
repeats = sum(1 for i in range(len(text) - 1) if text[i] == text[i + 1])
return repeats / (len(text) - 1)
def _unique_ngram_ratio(self, text: str, n: int) -> float:
"""Ratio of unique n-grams to total n-grams."""
if len(text) < n:
return 0.0
ngrams = [text[i:i + n] for i in range(len(text) - n + 1)]
return len(set(ngrams)) / len(ngrams)
def _is_ip(self, domain: str) -> bool:
"""Check if domain is IP address."""
# IPv4
if re.match(r'^(\d{1,3}\.){3}\d{1,3}$', domain):
return True
# IPv6
try:
socket.inet_pton(socket.AF_INET6, domain.strip('[]'))
return True
except:
return False
# ============================================================
# NEW/IMPROVED METHODS
# ============================================================
def _is_url_shortener(self, domain: str) -> int:
"""
URL shortener detection - EXACT match.
"""
domain_lower = domain.lower()
return 1 if domain_lower in self.shorteners else 0
def _is_free_platform(self, domain: str) -> int:
"""
Detect if hosted on free platform.
CRITICAL FIX: Exact or suffix match (not substring!).
Examples:
- 'mysite.weebly.com' → 1 (suffix match)
- 'weebly.com' → 1 (exact match)
- 'weebly-alternative.com' → 0 (NOT a match!)
"""
domain_lower = domain.lower()
# Exact match
if domain_lower in self.free_platforms:
return 1
if domain_lower in self.google_services:
return 1
if domain_lower in self.adobe_services:
return 1
if domain_lower in self.microsoft_services:
return 1
if domain_lower in self.zoom_services:
return 1
# Suffix match (subdomain.platform.com)
for platform in self.free_platforms:
if domain_lower.endswith('.' + platform):
return 1
return 0
def _get_platform_subdomain_length(self, domain: str) -> int:
"""
IMPROVED: Handle multi-level subdomains.
Examples:
- docs.google.com → subdomain = 'docs' (4 chars)
- new.express.adobe.com → subdomain = 'new.express' (11 chars)
- storage.cloud.google.com → subdomain = 'storage.cloud' (13 chars)
"""
domain_lower = domain.lower()
# Check Google
if '.google.com' in domain_lower:
subdomain = domain_lower.replace('.google.com', '')
return len(subdomain)
# Check Adobe
if '.adobe.com' in domain_lower:
subdomain = domain_lower.replace('.adobe.com', '')
return len(subdomain)
# Check Microsoft
if '.office.com' in domain_lower:
subdomain = domain_lower.replace('.office.com', '')
return len(subdomain)
# Check free platforms (existing logic)
for platform in self.free_platforms:
if domain_lower.endswith('.' + platform):
subdomain = domain_lower[:-len('.' + platform)]
return len(subdomain)
return 0
def _detect_uuid_pattern(self, domain: str) -> int:
"""
Detect UUID patterns in subdomain (Replit, Firebase, etc.).
Example:
'b82dba2b-fde4-4477-b6d5-8b17144e1bee.replit.dev' → 1
"""
# UUID pattern: 8-4-4-4-12 hex characters
uuid_pattern = r'[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}'
return 1 if re.search(uuid_pattern, domain.lower()) else 0
# ============================================================
# DOMAIN RANDOMNESS HELPERS
# ============================================================
def _calculate_domain_randomness(self, domain: str) -> float:
"""Calculate randomness score for domain (0-1)."""
if not domain or len(domain) < 4:
return 0.5
domain_lower = domain.lower()
scores = []
# 1. Vowel distribution
vowels = 'aeiou'
vowel_positions = [i for i, c in enumerate(domain_lower) if c in vowels]
if len(vowel_positions) >= 2:
avg_gap = sum(vowel_positions[i+1] - vowel_positions[i]
for i in range(len(vowel_positions)-1)) / (len(vowel_positions)-1)
vowel_irregularity = min(abs(avg_gap - 2.5) / 5, 1.0)
scores.append(vowel_irregularity)
# 2. Character frequency
char_freq = Counter(domain_lower)
common_letters = 'etaoinshr'
common_count = sum(char_freq.get(c, 0) for c in common_letters)
uncommon_ratio = 1 - (common_count / max(len(domain_lower), 1))
scores.append(uncommon_ratio)
# 3. Repeated characters
unique_ratio = len(set(domain_lower)) / max(len(domain_lower), 1)
if unique_ratio > 0.75:
scores.append((unique_ratio - 0.75) / 0.25)
else:
scores.append(0)
return min(sum(scores) / max(len(scores), 1), 1.0)
def _consonant_clustering_score(self, text: str) -> float:
"""Detect unnatural consonant clusters."""
if not text:
return 0
text_lower = text.lower()
consonants = 'bcdfghjklmnpqrstvwxyz'
max_cluster = 0
current_cluster = 0
for char in text_lower:
if char in consonants:
current_cluster += 1
max_cluster = max(max_cluster, current_cluster)
else:
current_cluster = 0
if max_cluster >= 5:
return 1.0
elif max_cluster >= 4:
return 0.7
elif max_cluster >= 3:
return 0.4
else:
return 0.0
def _keyboard_pattern_score(self, text: str) -> int:
"""Detect keyboard walking patterns."""
if not text:
return 0
text_lower = text.lower()
count = 0
for pattern in self.keyboard_patterns:
if pattern in text_lower:
count += 1
return count
def _contains_dictionary_word(self, text: str) -> int:
"""Check if text contains any common English word."""
if not text or len(text) < 4:
return 0
text_lower = text.lower()
if text_lower in self.common_words:
return 1
for word in self.common_words:
if len(word) >= 4 and word in text_lower:
return 1
return 0
def _pronounceability_score(self, text: str) -> float:
"""Score based on bigram frequencies in English."""
if not text or len(text) < 2:
return 0.5
text_lower = text.lower()
common_bigrams = {
'th', 'he', 'in', 'er', 'an', 're', 'on', 'at', 'en', 'nd',
'ti', 'es', 'or', 'te', 'of', 'ed', 'is', 'it', 'al', 'ar',
'st', 'to', 'nt', 'ng', 'se', 'ha', 'as', 'ou', 'io', 've'
}
bigrams = [text_lower[i:i+2] for i in range(len(text_lower)-1)]
if not bigrams:
return 0.5
common_count = sum(1 for bg in bigrams if bg in common_bigrams)
score = common_count / len(bigrams)
return score
def _suspicious_digit_position(self, text: str) -> int:
"""Detect suspicious digit positions."""
if not text:
return 0
if text and text[0].isdigit():
return 1
if len(text) >= 2 and text[-1].isdigit() and text[-2].isdigit():
return 1
return 0
# ============================================================
# BRAND SPOOFING HELPERS
# ============================================================
def _brand_subdomain_spoofing(self, parsed) -> int:
"""Detect brand in subdomain but not main domain."""
try:
parts = parsed.netloc.split('.')
if len(parts) < 3:
return 0
subdomains = '.'.join(parts[:-2]).lower()
main_domain = '.'.join(parts[-2:]).lower()
for brand in self.brand_names:
if brand in subdomains and brand not in main_domain:
return 1
return 0
except:
return 0
def _brand_with_hyphen(self, domain: str) -> int:
"""Detect hyphenated brand names."""
if not domain:
return 0
domain_lower = domain.lower()
for brand in self.brand_names:
if len(brand) >= 4:
for i in range(1, len(brand)):
hyphenated = brand[:i] + '-' + brand[i:]
if hyphenated in domain_lower:
return 1
return 0
def _suspicious_brand_tld(self, domain: str) -> int:
"""Detect brand name with suspicious TLD."""
if not domain:
return 0
domain_lower = domain.lower()
parts = domain_lower.split('.')
if len(parts) < 2:
return 0
tld = parts[-1]
domain_without_tld = '.'.join(parts[:-1])
if tld in self.suspicious_tlds:
for brand in self.brand_names:
if brand in domain_without_tld:
return 1
return 0
def _brand_phishing_keyword_combo(self, url: str) -> int:
"""Detect brand + phishing keyword combination."""
if not url:
return 0
url_lower = url.lower()
has_brand = any(brand in url_lower for brand in self.brand_names)
if has_brand:
phishing_combo_keywords = [
'verify', 'security', 'secure', 'account', 'update',
'login', 'confirm', 'suspended', 'locked'
]
for keyword in phishing_combo_keywords:
if keyword in url_lower:
return 1
return 0
# ============================================================
# PATH & QUERY HELPERS
# ============================================================
def _brand_in_path_only(self, path: str, domain: str) -> int:
"""Detect brand in path but not in domain."""
if not path or not domain:
return 0
path_lower = path.lower()
domain_lower = domain.lower()
for brand in self.brand_names:
if brand in path_lower and brand not in domain_lower:
return 1
return 0
def _suspicious_extension_pattern(self, path: str) -> int:
"""Detect suspicious extension patterns."""
if not path:
return 0
path_lower = path.lower()
suspicious_patterns = [
'.php.exe', '.html.exe', '.pdf.exe', '.doc.exe',
'.zip.exe', '.rar.exe', '.html.zip', '.pdf.scr'
]
for pattern in suspicious_patterns:
if pattern in path_lower:
return 1
parts = path_lower.split('.')
if len(parts) >= 3:
ext1 = parts[-2]
ext2 = parts[-1]
doc_exts = ['pdf', 'doc', 'docx', 'xls', 'xlsx', 'html', 'htm']
exec_exts = ['exe', 'scr', 'bat', 'cmd', 'com', 'pif']
if ext1 in doc_exts and ext2 in exec_exts:
return 1
return 0
# ============================================================
# ENCODING HELPERS
# ============================================================
def _detect_lookalike_chars(self, domain: str) -> int:
"""Detect lookalike characters."""
if not domain:
return 0
domain_lower = domain.lower()
suspicious_patterns = [
('rn', 'm'),
('vv', 'w'),
('cl', 'd'),
]
for pattern, _ in suspicious_patterns:
if pattern in domain_lower:
return 1
if any(c in domain_lower for c in ['0', '1']):
has_letters = any(c.isalpha() for c in domain_lower)
if has_letters:
for lookalike_char in self.lookalike_chars:
if lookalike_char in domain_lower:
return 1
return 0
def _mixed_script_detection(self, domain: str) -> int:
"""Detect mixing of scripts."""
if not domain:
return 0
scripts = set()
for char in domain:
if char.isalpha():
try:
script = unicodedata.name(char).split()[0]
if script in ['LATIN', 'CYRILLIC', 'GREEK']:
scripts.add(script)
except:
pass
return len(scripts) if len(scripts) > 1 else 0
def _homograph_brand_check(self, domain: str) -> int:
"""Check for homograph attacks on brands."""
if not domain:
return 0
domain_lower = domain.lower()
top_brands = ['paypal', 'apple', 'amazon', 'google', 'microsoft', 'facebook']
for brand in top_brands:
if len(domain_lower) < len(brand) - 2 or len(domain_lower) > len(brand) + 2:
continue
differences = 0
for i in range(min(len(domain_lower), len(brand))):
if i < len(domain_lower) and i < len(brand):
if domain_lower[i] != brand[i]:
if (domain_lower[i] in '01' and brand[i] in 'ol') or \
(domain_lower[i] in 'ol' and brand[i] in '01'):
differences += 1
else:
differences += 1
if differences <= 2 and differences > 0:
return 1
return 0
def _idn_homograph_score(self, url: str) -> float:
"""Combined IDN homograph attack score."""
score = 0.0
count = 0
if 'xn--' in url.lower():
score += 0.5
count += 1
non_ascii = sum(1 for c in url if ord(c) > 127)
if non_ascii > 0:
score += min(non_ascii / 10, 0.3)
count += 1
return score / max(count, 1) if count > 0 else 0.0
def _detect_double_encoding(self, url: str) -> int:
"""Detect double URL encoding."""
if not url:
return 0
double_encoded_patterns = ['%25', '%2520', '%252e', '%252f']
count = sum(url.lower().count(pattern) for pattern in double_encoded_patterns)
return count
def _suspicious_unicode_chars(self, url: str) -> int:
"""Detect uncommon Unicode categories."""
if not url:
return 0
suspicious_count = 0
for char in url:
try:
category = unicodedata.category(char)
if category in ['Mn', 'Mc', 'Me', 'Zl', 'Zp',
'Cc', 'Cf', 'Sm', 'Sc', 'Sk', 'So']:
suspicious_count += 1
except:
pass
return suspicious_count
# ============================================================
# FEATURE REFINEMENT HELPERS
# ============================================================
def _categorize_length(self, length: int, thresholds: list) -> int:
"""Multi-category encoding for length features."""
for i, threshold in enumerate(thresholds):
if length <= threshold:
return i
return len(thresholds)
def _categorize_extension(self, extension: str) -> int:
"""
Categorize file extension:
0 = none
1 = document
2 = web/script
3 = executable
4 = archive
5 = image
6 = other
"""
if not extension:
return 0
ext_lower = extension.lower()
if ext_lower in ['pdf', 'doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx', 'txt', 'rtf']:
return 1
if ext_lower in ['html', 'htm', 'php', 'asp', 'aspx', 'jsp', 'js', 'css']:
return 2
if ext_lower in ['exe', 'bat', 'cmd', 'scr', 'msi', 'com', 'pif', 'app', 'apk']:
return 3
if ext_lower in ['zip', 'rar', '7z', 'tar', 'gz', 'bz2']:
return 4
if ext_lower in ['jpg', 'jpeg', 'png', 'gif', 'svg', 'ico', 'webp']:
return 5
return 6
def _character_diversity(self, text: str) -> float:
"""Shannon diversity index for characters."""
if not text:
return 0.0
unique_chars = len(set(text))
return min(unique_chars / max(len(text), 1), 1.0)
def _calculate_url_complexity(self, url: str) -> float:
"""Combined URL complexity score."""
if not url:
return 0.0
special_chars = sum(1 for c in url if not c.isalnum() and c not in [':', '/', '.'])
special_ratio = special_chars / max(len(url), 1)
length_score = min(len(url) / 200, 1.0)
encoding_score = min(url.count('%') / 10, 1.0)
complexity = (special_ratio * 0.4 + length_score * 0.3 + encoding_score * 0.3)
return min(complexity, 1.0)
# ============================================================
# UTILITY METHODS
# ============================================================
def _get_default_features(self) -> dict:
    """Fallback feature dict for URLs whose extraction failed.

    Extracts features from a known-good dummy URL so the returned dict
    carries the full feature schema (with benign values); returns an
    empty dict only when even that fails.
    """
    dummy_url = "http://example.com"
    try:
        return self.extract_features(dummy_url)
    except Exception:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
        # are not swallowed during batch processing.
        return {}
def get_feature_names(self) -> list:
    """Return the sorted list of feature names, derived dynamically.

    Runs a throwaway extraction on a dummy URL and reads the resulting
    keys, so the list always matches what extract_features() actually
    produces (the 'label' column, if present, is excluded).
    """
    sample = self.extract_features("http://example.com/test")
    return sorted(name for name in sample if name != 'label')
def extract_batch(self, urls: list, show_progress: bool = True) -> pd.DataFrame:
    """Extract features for a list of URLs into a DataFrame.

    Args:
        urls: URL strings to featurize.
        show_progress: When True, log start/finish and progress every
            50,000 URLs.

    Returns:
        pd.DataFrame with one row per URL and one column per feature.
    """
    total = len(urls)
    if show_progress:
        logger.info(f"Extracting URL features from {total:,} URLs...")
    step = 50000
    rows = []
    for index, url in enumerate(urls):
        if show_progress and index > 0 and index % step == 0:
            logger.info(f" Processed {index:,} / {total:,} ({100 * index / total:.1f}%)")
        rows.append(self.extract_features(url))
    frame = pd.DataFrame(rows)
    if show_progress:
        logger.info(f"✓ Extracted {len(frame.columns)} features from {len(frame):,} URLs")
    return frame
def main():
    """Extract URL-only features from dataset.

    Pipeline: parse CLI args -> load the cleaned URL dataset (expects
    'url' and 'label' columns) -> optionally subsample -> extract
    features -> write a CSV and print a feature report.
    """
    import argparse
    parser = argparse.ArgumentParser(description='URL-Only Feature Extraction v2.1 (IMPROVED)')
    parser.add_argument('--sample', type=int, default=None, help='Sample N URLs')
    parser.add_argument('--output', type=str, default=None, help='Output filename')
    args = parser.parse_args()
    logger.info("=" * 70)
    logger.info("URL-Only Feature Extraction v2")
    logger.info("=" * 70)
    logger.info("")
    logger.info("NEW Features:")
    logger.info(" - Fixed free platform detection (exact/suffix match)")
    logger.info(" - Added platform_subdomain_length")
    logger.info(" - Added has_uuid_subdomain")
    logger.info(" - Added longest_part thresholds (gt_20, gt_30, gt_40)")
    logger.info(" - Expanded brand list with regional brands")
    logger.info(" - Improved extension categorization")
    logger.info("")
    # Load dataset (path is fixed relative to this script's location)
    script_dir = Path(__file__).parent
    data_file = (script_dir / '../../data/processed/clean_dataset.csv').resolve()
    logger.info(f"Loading: {data_file.name}")
    df = pd.read_csv(data_file)
    logger.info(f"Loaded: {len(df):,} URLs")
    if args.sample and args.sample < len(df):
        # Fixed seed so repeated runs sample the same subset
        df = df.sample(n=args.sample, random_state=42)
        logger.info(f"Sampled: {len(df):,} URLs")
    # Extract features and re-attach the ground-truth label column
    extractor = URLFeatureExtractorV2()
    features_df = extractor.extract_batch(df['url'].tolist())
    features_df['label'] = df['label'].values
    # Save (output filename encodes the sample size unless overridden)
    output_dir = (script_dir / '../../data/features').resolve()
    output_dir.mkdir(parents=True, exist_ok=True)
    if args.output:
        output_file = output_dir / args.output
    else:
        suffix = f'_sample{args.sample}' if args.sample else ''
        output_file = output_dir / f'url_features_v2{suffix}.csv'
    features_df.to_csv(output_file, index=False)
    logger.info("")
    logger.info("=" * 70)
    logger.info(f"✓ Saved: {output_file}")
    logger.info(f" Shape: {features_df.shape}")
    logger.info(f" Features: {len(features_df.columns) - 1}")
    logger.info("=" * 70)
    # Show feature names (derived dynamically from the extractor)
    print("\nAll Features:")
    feature_names = extractor.get_feature_names()
    for i, name in enumerate(feature_names, 1):
        print(f"{i:3d}. {name}")
    # Show stats
    print("\n\nFeature Statistics (first 30):")
    print(features_df.describe().T.head(30))
    # Show new features stats (only those actually present in the output)
    print("\n\nNEW FEATURES Statistics:")
    new_features = [
        'is_free_platform', 'platform_subdomain_length', 'has_uuid_subdomain',
        'longest_part_gt_20', 'longest_part_gt_30', 'longest_part_gt_40'
    ]
    for feat in new_features:
        if feat in features_df.columns:
            if feat == 'platform_subdomain_length':
                # Numeric feature: report distribution summary
                print(f"\n{feat}:")
                print(f" Mean: {features_df[feat].mean():.2f}")
                print(f" Max: {features_df[feat].max()}")
                print(f" Non-zero: {(features_df[feat] > 0).sum()} ({(features_df[feat] > 0).sum() / len(features_df) * 100:.1f}%)")
            else:
                # Binary features: report hit count and rate
                print(f"\n{feat}: {features_df[feat].sum()} / {len(features_df)} ({features_df[feat].mean() * 100:.1f}%)")
# Run the extraction pipeline only when executed as a script (not on import).
if __name__ == "__main__":
    main()