# Source-page scrape residue (upload metadata, not code): uploaded by rb1337,
# commit "Upload 50 files", 2cc7f91 (verified).
"""
URL Feature Extraction v1 - URL-Only Features for Stage 1 Model
This extractor focuses ONLY on URL structure and lexical features.
NO HTTP requests, NO external services, NO HTML parsing.
Features:
- Lexical (length, characters, entropy)
- Structural (domain parts, path segments, TLD)
- Statistical (entropy, n-grams, patterns)
- Security indicators (from URL only)
- Brand/phishing patterns
Designed for:
- Fast inference (< 1ms per URL)
- No network dependencies
- Production deployment
"""
import pandas as pd
import numpy as np
from urllib.parse import urlparse, parse_qs, unquote
import re
import math
import socket
from pathlib import Path
from collections import Counter
import sys
import logging
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%H:%M:%S'
)
logger = logging.getLogger("url_features_v2")
class URLFeatureExtractorV2:
    """
    Fast URL-only feature extractor for Stage 1 phishing detection.

    No HTTP requests, no external API calls - pure URL analysis.
    All features are derived from the URL string alone: lexical,
    structural, statistical, security-indicator and keyword/brand
    features. Designed for fast inference with no network dependencies.
    """

    def __init__(self):
        """Initialize feature extractor with keyword lists."""
        # Phishing-related keywords frequently seen in credential-harvesting URLs
        self.phishing_keywords = [
            'login', 'signin', 'sign-in', 'log-in', 'logon', 'signon',
            'account', 'accounts', 'update', 'verify', 'verification',
            'secure', 'security', 'banking', 'bank', 'confirm', 'password',
            'passwd', 'credential', 'suspended', 'locked', 'unusual',
            'authenticate', 'auth', 'wallet', 'invoice', 'payment',
            'billing', 'expire', 'expired', 'limited', 'restrict',
            'urgent', 'immediately', 'alert', 'warning', 'resolve',
            'recover', 'restore', 'reactivate', 'unlock', 'validate'
        ]
        # Brand names commonly targeted by impersonation campaigns
        self.brand_names = [
            'paypal', 'ebay', 'amazon', 'apple', 'microsoft', 'google',
            'facebook', 'instagram', 'twitter', 'netflix', 'linkedin',
            'dropbox', 'chase', 'wellsfargo', 'bankofamerica', 'citibank',
            'americanexpress', 'amex', 'visa', 'mastercard', 'outlook',
            'office365', 'office', 'yahoo', 'aol', 'icloud', 'adobe',
            'spotify', 'steam', 'dhl', 'fedex', 'ups', 'usps',
            'coinbase', 'binance', 'blockchain', 'metamask', 'whatsapp',
            'telegram', 'discord', 'zoom', 'docusign', 'wetransfer',
            'hsbc', 'barclays', 'santander', 'ing', 'revolut'
        ]
        # URL shorteners (hide the real destination domain)
        self.shorteners = [
            'bit.ly', 'bitly.com', 'goo.gl', 'tinyurl.com', 't.co', 'ow.ly',
            'is.gd', 'buff.ly', 'adf.ly', 'bit.do', 'short.to', 'tiny.cc',
            'j.mp', 'surl.li', 'rb.gy', 'cutt.ly', 'qrco.de', 'v.gd',
            'shorturl.at', 'rebrand.ly', 'clck.ru', 's.id', 'shrtco.de'
        ]
        # Suspicious TLDs (free registrations / abuse-heavy zones)
        self.suspicious_tlds = {
            'tk', 'ml', 'ga', 'cf', 'gq',  # Free domains
            'xyz', 'top', 'club', 'work', 'date', 'racing', 'win',
            'loan', 'download', 'stream', 'click', 'link', 'bid',
            'review', 'party', 'trade', 'webcam', 'science',
            'accountant', 'faith', 'cricket', 'zip', 'mov'
        }
        # Trusted TLDs
        self.trusted_tlds = {
            'com', 'org', 'net', 'edu', 'gov', 'mil',
            'uk', 'us', 'ca', 'de', 'fr', 'jp', 'au',
            'nl', 'be', 'ch', 'it', 'es', 'se', 'no'
        }
        # Free hosting services (cheap throwaway phishing pages)
        self.free_hosting = [
            'weebly.com', 'wix.com', 'wordpress.com', 'blogspot.com',
            'tumblr.com', 'jimdo.com', 'github.io', 'gitlab.io',
            'netlify.app', 'vercel.app', 'herokuapp.com', 'firebaseapp.com',
            'web.app', 'pages.dev', 'godaddysites.com', 'square.site',
            '000webhostapp.com', 'sites.google.com', 'carrd.co'
        ]

    def extract_features(self, url: str) -> dict:
        """
        Extract all URL-only features from a single URL.

        Args:
            url: URL string

        Returns:
            Dictionary of features; all-zero defaults if extraction fails.
        """
        try:
            # Ensure URL has a scheme so urlparse populates netloc/path correctly
            if not url.startswith(('http://', 'https://')):
                url = 'http://' + url
            parsed = urlparse(url)
            domain = parsed.netloc.lower()
            domain_no_port = domain.split(':')[0]
            path = parsed.path
            query = parsed.query

            features = {}
            # 1. Length features
            features.update(self._length_features(url, domain_no_port, path, query))
            # 2. Character count features
            features.update(self._char_count_features(url, domain_no_port, path))
            # 3. Ratio features
            features.update(self._ratio_features(url, domain_no_port))
            # 4. Domain structure features
            features.update(self._domain_features(domain_no_port, parsed))
            # 5. Path features
            features.update(self._path_features(path))
            # 6. Query features
            features.update(self._query_features(query))
            # 7. Statistical features (entropy, patterns)
            features.update(self._statistical_features(url, domain_no_port, path))
            # 8. Security indicator features
            features.update(self._security_features(url, parsed, domain_no_port))
            # 9. Keyword/brand features
            features.update(self._keyword_features(url, domain_no_port, path))
            # 10. Encoding features
            features.update(self._encoding_features(url, domain_no_port))
            return features
        except Exception as e:
            # str(url) so non-string inputs (e.g. NaN read from a CSV) cannot
            # raise a second exception inside the error handler itself.
            logger.error(f"Error extracting features from URL: {str(url)[:50]}... Error: {e}")
            return self._get_default_features()

    def _length_features(self, url: str, domain: str, path: str, query: str) -> dict:
        """Length-based features."""
        return {
            'url_length': len(url),
            'domain_length': len(domain),
            'path_length': len(path),
            'query_length': len(query),
            # Binary threshold indicators for very long URLs/domains
            'url_length_gt_75': 1 if len(url) > 75 else 0,
            'url_length_gt_100': 1 if len(url) > 100 else 0,
            'url_length_gt_150': 1 if len(url) > 150 else 0,
            'domain_length_gt_25': 1 if len(domain) > 25 else 0,
        }

    def _char_count_features(self, url: str, domain: str, path: str) -> dict:
        """Character count features."""
        return {
            # URL character counts
            'num_dots': url.count('.'),
            'num_hyphens': url.count('-'),
            'num_underscores': url.count('_'),
            'num_slashes': url.count('/'),
            'num_question_marks': url.count('?'),
            'num_ampersands': url.count('&'),
            'num_equals': url.count('='),
            'num_at': url.count('@'),
            'num_percent': url.count('%'),
            'num_digits_url': sum(c.isdigit() for c in url),
            'num_letters_url': sum(c.isalpha() for c in url),
            # Domain character counts
            'domain_dots': domain.count('.'),
            'domain_hyphens': domain.count('-'),
            'domain_digits': sum(c.isdigit() for c in domain),
            # Path character counts
            'path_slashes': path.count('/'),
            'path_dots': path.count('.'),
            'path_digits': sum(c.isdigit() for c in path),
        }

    def _ratio_features(self, url: str, domain: str) -> dict:
        """Ratio-based features (max(..., 1) guards against division by zero)."""
        url_len = max(len(url), 1)
        domain_len = max(len(domain), 1)
        return {
            'digit_ratio_url': sum(c.isdigit() for c in url) / url_len,
            'letter_ratio_url': sum(c.isalpha() for c in url) / url_len,
            'special_char_ratio': sum(not c.isalnum() for c in url) / url_len,
            'digit_ratio_domain': sum(c.isdigit() for c in domain) / domain_len,
            'symbol_ratio_domain': sum(c in '-_.' for c in domain) / domain_len,
        }

    def _domain_features(self, domain: str, parsed) -> dict:
        """Domain structure features."""
        parts = domain.split('.')
        tld = parts[-1] if parts else ''
        # Get SLD (second level domain)
        sld = parts[-2] if len(parts) > 1 else ''
        # Count subdomains (parts minus domain and TLD)
        num_subdomains = max(0, len(parts) - 2)
        # parsed.port raises ValueError for malformed ports (e.g. "http://x:abc/");
        # treat those as "no usable port" instead of letting the outer handler
        # zero out every feature of the URL.
        try:
            port = parsed.port
        except ValueError:
            port = None
        return {
            'num_subdomains': num_subdomains,
            'num_domain_parts': len(parts),
            'tld_length': len(tld),
            'sld_length': len(sld),
            'longest_domain_part': max((len(p) for p in parts), default=0),
            'avg_domain_part_len': sum(len(p) for p in parts) / max(len(parts), 1),
            # TLD indicators
            'has_suspicious_tld': 1 if tld in self.suspicious_tlds else 0,
            'has_trusted_tld': 1 if tld in self.trusted_tlds else 0,
            # Port
            'has_port': 1 if port else 0,
            'has_non_std_port': 1 if port and port not in [80, 443] else 0,
        }

    def _path_features(self, path: str) -> dict:
        """Path structure features."""
        segments = [s for s in path.split('/') if s]
        # Get file extension if present (cap at 10 chars to reject junk suffixes)
        extension = ''
        if '.' in path:
            potential_ext = path.rsplit('.', 1)[-1].split('?')[0].lower()
            if len(potential_ext) <= 10:
                extension = potential_ext
        return {
            'path_depth': len(segments),
            'max_path_segment_len': max((len(s) for s in segments), default=0),
            'avg_path_segment_len': sum(len(s) for s in segments) / max(len(segments), 1),
            # Extension features
            'has_extension': 1 if extension else 0,
            'has_php': 1 if extension == 'php' else 0,
            'has_html': 1 if extension in ['html', 'htm'] else 0,
            'has_exe': 1 if extension in ['exe', 'bat', 'cmd', 'msi'] else 0,
            # Suspicious path patterns
            'has_double_slash': 1 if '//' in path else 0,
        }

    def _query_features(self, query: str) -> dict:
        """Query string features."""
        params = parse_qs(query)
        return {
            'num_params': len(params),
            'has_query': 1 if query else 0,
            'query_value_length': sum(len(''.join(v)) for v in params.values()),
            'max_param_len': max((len(k) + len(''.join(v)) for k, v in params.items()), default=0),
        }

    def _statistical_features(self, url: str, domain: str, path: str) -> dict:
        """Statistical and entropy features."""
        return {
            # Shannon entropy of each component
            'url_entropy': self._entropy(url),
            'domain_entropy': self._entropy(domain),
            'path_entropy': self._entropy(path) if path else 0,
            # Consecutive character patterns
            'max_consecutive_digits': self._max_consecutive(url, str.isdigit),
            'max_consecutive_chars': self._max_consecutive(url, str.isalpha),
            'max_consecutive_consonants': self._max_consecutive_consonants(domain),
            # Character variance
            'char_repeat_rate': self._repeat_rate(url),
            # N-gram uniqueness (low values suggest generated/random strings)
            'unique_bigram_ratio': self._unique_ngram_ratio(url, 2),
            'unique_trigram_ratio': self._unique_ngram_ratio(url, 3),
            # Vowel/consonant ratio in domain
            'vowel_ratio_domain': self._vowel_ratio(domain),
        }

    def _security_features(self, url: str, parsed, domain: str) -> dict:
        """Security indicator features (URL-based only)."""
        return {
            # Protocol
            'is_https': 1 if parsed.scheme == 'https' else 0,
            'is_http': 1 if parsed.scheme == 'http' else 0,
            # IP address used instead of hostname
            'has_ip_address': 1 if self._is_ip(domain) else 0,
            # Suspicious patterns ('@' hides real host; 'redirect'/'url=' hint at open redirects)
            'has_at_symbol': 1 if '@' in url else 0,
            'has_redirect': 1 if 'redirect' in url.lower() or 'url=' in url.lower() else 0,
            # URL shortener
            'is_shortened': 1 if any(s in domain for s in self.shorteners) else 0,
            # Free hosting
            'is_free_hosting': 1 if any(h in domain for h in self.free_hosting) else 0,
            # www presence
            'has_www': 1 if domain.startswith('www.') else 0,
            'www_in_middle': 1 if 'www' in domain and not domain.startswith('www') else 0,
        }

    def _keyword_features(self, url: str, domain: str, path: str) -> dict:
        """Keyword and brand detection features."""
        url_lower = url.lower()
        domain_lower = domain.lower()
        path_lower = path.lower()
        # Count phishing keywords per component
        phishing_in_url = sum(1 for k in self.phishing_keywords if k in url_lower)
        phishing_in_domain = sum(1 for k in self.phishing_keywords if k in domain_lower)
        phishing_in_path = sum(1 for k in self.phishing_keywords if k in path_lower)
        # Count brand names per component
        brands_in_url = sum(1 for b in self.brand_names if b in url_lower)
        brands_in_domain = sum(1 for b in self.brand_names if b in domain_lower)
        brands_in_path = sum(1 for b in self.brand_names if b in path_lower)
        # Brand impersonation: brand in path but not in domain
        brand_impersonation = 1 if brands_in_path > 0 and brands_in_domain == 0 else 0
        return {
            'num_phishing_keywords': phishing_in_url,
            'phishing_in_domain': phishing_in_domain,
            'phishing_in_path': phishing_in_path,
            'num_brands': brands_in_url,
            'brand_in_domain': 1 if brands_in_domain > 0 else 0,
            'brand_in_path': 1 if brands_in_path > 0 else 0,
            'brand_impersonation': brand_impersonation,
            # Specific high-value keywords
            'has_login': 1 if 'login' in url_lower or 'signin' in url_lower else 0,
            'has_account': 1 if 'account' in url_lower else 0,
            'has_verify': 1 if 'verify' in url_lower or 'confirm' in url_lower else 0,
            'has_secure': 1 if 'secure' in url_lower or 'security' in url_lower else 0,
            'has_update': 1 if 'update' in url_lower else 0,
            'has_bank': 1 if 'bank' in url_lower else 0,
            'has_password': 1 if 'password' in url_lower or 'passwd' in url_lower else 0,
            'has_suspend': 1 if 'suspend' in url_lower or 'locked' in url_lower else 0,
            # Suspicious patterns (classic PayPal-kit and CGI markers)
            'has_webscr': 1 if 'webscr' in url_lower else 0,
            'has_cmd': 1 if 'cmd=' in url_lower else 0,
            'has_cgi': 1 if 'cgi-bin' in url_lower or 'cgi_bin' in url_lower else 0,
        }

    def _encoding_features(self, url: str, domain: str) -> dict:
        """Encoding-related features (percent-encoding, punycode, hex/base64 blobs)."""
        # Punycode marker in the host indicates an IDN (possible homograph attack)
        has_punycode = 'xn--' in domain
        # Percent-decode and measure size change
        try:
            decoded = unquote(url)
            encoding_diff = len(decoded) - len(url)
        except Exception:
            encoding_diff = 0
        # re.search on a str with these fixed patterns cannot raise, so no
        # try/except is needed (the previous bare excepts were dead code).
        has_hex = 1 if re.search(r'[0-9a-f]{20,}', url.lower()) else 0
        has_base64 = 1 if re.search(r'[A-Za-z0-9+/]{30,}={0,2}', url) else 0
        has_unicode = 1 if any(ord(c) > 127 for c in url) else 0
        return {
            'has_url_encoding': 1 if '%' in url else 0,
            'encoding_count': url.count('%'),
            'encoding_diff': abs(encoding_diff),
            'has_punycode': 1 if has_punycode else 0,
            'has_unicode': has_unicode,
            'has_hex_string': has_hex,
            'has_base64': has_base64,
        }

    # Helper methods
    def _entropy(self, text: str) -> float:
        """Calculate Shannon entropy (bits per character)."""
        if not text:
            return 0.0
        freq = Counter(text)
        length = len(text)
        return -sum((c / length) * math.log2(c / length) for c in freq.values())

    def _max_consecutive(self, text: str, condition) -> int:
        """Max run length of consecutive characters matching `condition`."""
        max_count = count = 0
        for char in text:
            if condition(char):
                count += 1
                max_count = max(max_count, count)
            else:
                count = 0
        return max_count

    def _max_consecutive_consonants(self, text: str) -> int:
        """Max run length of consecutive consonants (case-insensitive)."""
        consonants = set('bcdfghjklmnpqrstvwxyz')
        max_count = count = 0
        for char in text.lower():
            if char in consonants:
                count += 1
                max_count = max(max_count, count)
            else:
                count = 0
        return max_count

    def _repeat_rate(self, text: str) -> float:
        """Rate of repeated adjacent characters."""
        if len(text) < 2:
            return 0.0
        repeats = sum(1 for i in range(len(text) - 1) if text[i] == text[i + 1])
        return repeats / (len(text) - 1)

    def _unique_ngram_ratio(self, text: str, n: int) -> float:
        """Ratio of unique n-grams to total n-grams."""
        if len(text) < n:
            return 0.0
        ngrams = [text[i:i + n] for i in range(len(text) - n + 1)]
        return len(set(ngrams)) / len(ngrams)

    def _vowel_ratio(self, text: str) -> float:
        """Ratio of vowels to alphabetic characters in text."""
        if not text:
            return 0.0
        vowels = sum(1 for c in text.lower() if c in 'aeiou')
        letters = sum(1 for c in text if c.isalpha())
        return vowels / max(letters, 1)

    def _is_ip(self, domain: str) -> bool:
        """Check if domain is a literal IPv4 or IPv6 address."""
        # IPv4: each octet must be 0-255 (the old pattern-only check accepted
        # impossible addresses like 999.999.999.999).
        m = re.match(r'^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$', domain)
        if m and all(int(octet) <= 255 for octet in m.groups()):
            return True
        # IPv6 (possibly bracketed, as in "http://[::1]/")
        try:
            socket.inet_pton(socket.AF_INET6, domain.strip('[]'))
            return True
        except (OSError, ValueError):
            # Narrowed from a bare except: only "not a valid IPv6" errors.
            return False

    def _get_default_features(self) -> dict:
        """Default (all-zero) feature values for error cases."""
        return {name: 0 for name in self.get_feature_names()}

    def get_feature_names(self) -> list:
        """Get list of all feature names (must mirror extract_features keys)."""
        dummy_features = {
            # Length features
            'url_length': 0, 'domain_length': 0, 'path_length': 0, 'query_length': 0,
            'url_length_gt_75': 0, 'url_length_gt_100': 0, 'url_length_gt_150': 0,
            'domain_length_gt_25': 0,
            # Char counts
            'num_dots': 0, 'num_hyphens': 0, 'num_underscores': 0, 'num_slashes': 0,
            'num_question_marks': 0, 'num_ampersands': 0, 'num_equals': 0, 'num_at': 0,
            'num_percent': 0, 'num_digits_url': 0, 'num_letters_url': 0,
            'domain_dots': 0, 'domain_hyphens': 0, 'domain_digits': 0,
            'path_slashes': 0, 'path_dots': 0, 'path_digits': 0,
            # Ratios
            'digit_ratio_url': 0, 'letter_ratio_url': 0, 'special_char_ratio': 0,
            'digit_ratio_domain': 0, 'symbol_ratio_domain': 0,
            # Domain features
            'num_subdomains': 0, 'num_domain_parts': 0, 'tld_length': 0, 'sld_length': 0,
            'longest_domain_part': 0, 'avg_domain_part_len': 0,
            'has_suspicious_tld': 0, 'has_trusted_tld': 0, 'has_port': 0, 'has_non_std_port': 0,
            # Path features
            'path_depth': 0, 'max_path_segment_len': 0, 'avg_path_segment_len': 0,
            'has_extension': 0, 'has_php': 0, 'has_html': 0, 'has_exe': 0, 'has_double_slash': 0,
            # Query features
            'num_params': 0, 'has_query': 0, 'query_value_length': 0, 'max_param_len': 0,
            # Statistical features
            'url_entropy': 0, 'domain_entropy': 0, 'path_entropy': 0,
            'max_consecutive_digits': 0, 'max_consecutive_chars': 0, 'max_consecutive_consonants': 0,
            'char_repeat_rate': 0, 'unique_bigram_ratio': 0, 'unique_trigram_ratio': 0,
            'vowel_ratio_domain': 0,
            # Security features
            'is_https': 0, 'is_http': 0, 'has_ip_address': 0, 'has_at_symbol': 0,
            'has_redirect': 0, 'is_shortened': 0, 'is_free_hosting': 0, 'has_www': 0, 'www_in_middle': 0,
            # Keyword features
            'num_phishing_keywords': 0, 'phishing_in_domain': 0, 'phishing_in_path': 0,
            'num_brands': 0, 'brand_in_domain': 0, 'brand_in_path': 0, 'brand_impersonation': 0,
            'has_login': 0, 'has_account': 0, 'has_verify': 0, 'has_secure': 0, 'has_update': 0,
            'has_bank': 0, 'has_password': 0, 'has_suspend': 0,
            'has_webscr': 0, 'has_cmd': 0, 'has_cgi': 0,
            # Encoding features
            'has_url_encoding': 0, 'encoding_count': 0, 'encoding_diff': 0,
            'has_punycode': 0, 'has_unicode': 0, 'has_hex_string': 0, 'has_base64': 0,
        }
        return list(dummy_features.keys())

    def extract_batch(self, urls: list, show_progress: bool = True) -> pd.DataFrame:
        """
        Extract features from multiple URLs.

        Args:
            urls: List of URL strings
            show_progress: Show progress messages

        Returns:
            DataFrame with one row of features per URL
        """
        if show_progress:
            logger.info(f"Extracting URL features from {len(urls):,} URLs...")
        features_list = []
        progress_interval = 50000
        for i, url in enumerate(urls):
            if show_progress and i > 0 and i % progress_interval == 0:
                logger.info(f" Processed {i:,} / {len(urls):,} ({100 * i / len(urls):.1f}%)")
            features = self.extract_features(url)
            features_list.append(features)
        df = pd.DataFrame(features_list)
        if show_progress:
            logger.info(f"✓ Extracted {len(df.columns)} features from {len(df):,} URLs")
        return df
def main():
    """Extract URL-only features from dataset."""
    import argparse

    arg_parser = argparse.ArgumentParser(description='URL-Only Feature Extraction (Stage 1)')
    arg_parser.add_argument('--sample', type=int, default=None, help='Sample N URLs')
    arg_parser.add_argument('--output', type=str, default=None, help='Output filename')
    opts = arg_parser.parse_args()

    banner = "=" * 70
    logger.info(banner)
    logger.info("URL-Only Feature Extraction v1")
    logger.info(banner)
    logger.info("")
    logger.info("Features: URL structure, lexical, statistical")
    logger.info("NO HTTP requests, NO external APIs")
    logger.info("")

    # Locate the cleaned dataset relative to this script's directory.
    here = Path(__file__).parent
    dataset_path = (here / '../../data/processed/clean_dataset.csv').resolve()
    logger.info(f"Loading: {dataset_path.name}")
    frame = pd.read_csv(dataset_path)
    logger.info(f"Loaded: {len(frame):,} URLs")

    # Optional down-sampling (fixed seed for reproducibility).
    if opts.sample and opts.sample < len(frame):
        frame = frame.sample(n=opts.sample, random_state=42)
        logger.info(f"Sampled: {len(frame):,} URLs")

    # Run feature extraction and carry the label column through.
    extractor = URLFeatureExtractorV2()
    result = extractor.extract_batch(frame['url'].tolist())
    result['label'] = frame['label'].values

    # Persist to the features directory, creating it if necessary.
    out_dir = (here / '../../data/features').resolve()
    out_dir.mkdir(parents=True, exist_ok=True)
    if opts.output:
        destination = out_dir / opts.output
    else:
        tag = f'_sample{opts.sample}' if opts.sample else ''
        destination = out_dir / f'url_features{tag}.csv'
    result.to_csv(destination, index=False)

    logger.info("")
    logger.info(banner)
    logger.info(f"✓ Saved: {destination}")
    logger.info(f" Shape: {result.shape}")
    logger.info(f" Features: {len(result.columns) - 1}")
    logger.info(banner)

    # Quick sanity-check summary on stdout.
    print("\nFeature Statistics (sample):")
    print(result.describe().T.head(20))


if __name__ == "__main__":
    main()