"""
URL Phishing Detector - Interactive Demo
Test any URL with all trained models and see predictions with confidence scores.
"""
import sys
import pandas as pd
import joblib
from pathlib import Path
from colorama import init, Fore, Style
# Initialize colorama for colored output
# (autoreset=True restores the default terminal color after every print)
init(autoreset=True)
import logging
# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%H:%M:%S'
)
logger = logging.getLogger("url_predictor")
# Make the project root importable so the feature extractor below resolves.
sys.path.append(str(Path(__file__).parent.parent))
from scripts.feature_extraction.url.url_features_v2 import URLFeatureExtractorV2
class URLPhishingDetector:
    """Detect phishing URLs using trained models.

    Loads the URL feature extractor, an optional scaler, and every trained
    model found in ``saved_models`` at construction time; predictions for a
    URL are then served by :meth:`predict_url` and rendered with
    :meth:`print_results`.
    """

    def __init__(self):
        """Initialize detector with all models."""
        self.script_dir = Path(__file__).parent.parent
        self.models_dir = (self.script_dir / 'saved_models').resolve()
        # Whitelist of trusted domains. predict_url treats a URL as
        # legitimate when its host equals one of these or is a true
        # subdomain of one of these.
        self.trusted_domains = {
            # Tech giants
            'youtube.com', 'facebook.com', 'twitter.com', 'x.com',
            'linkedin.com', 'microsoft.com', 'apple.com', 'amazon.com',
            # Development
            'github.com', 'gitlab.com', 'stackoverflow.com', 'npmjs.com',
            # AI Services
            'claude.ai', 'anthropic.com', 'openai.com', 'chatgpt.com',
            # Education & Info
            'wikipedia.org', 'reddit.com', 'quora.com', 'medium.com',
            # Cloud & Services
            'aws.amazon.com', 'azure.microsoft.com', 'cloud.google.com',
            'vercel.com', 'netlify.com', 'heroku.com',
            # Communication
            'slack.com', 'discord.com', 'zoom.us', 'teams.microsoft.com',
            # Finance (major)
            'paypal.com', 'stripe.com', 'visa.com', 'mastercard.com',
            # E-commerce
            'ebay.com', 'shopify.com', 'etsy.com', 'walmart.com',
        }
        # Custom decision thresholds for each model (reduce false positives).
        self.thresholds = {
            'Logistic Regression': 0.5,  # Standard threshold
            'Random Forest': 0.5,        # Standard threshold
            'XGBoost': 0.5               # Standard threshold
        }
        # Feature extractor turns a raw URL string into a feature dict.
        self.extractor = URLFeatureExtractorV2()
        # Load scaler (only needed for Logistic Regression).
        scaler_path = self.models_dir / 'scaler.joblib'
        if scaler_path.exists():
            self.scaler = joblib.load(scaler_path)
            logger.info("✓ Loaded scaler")
        else:
            self.scaler = None
            logger.warning("✗ Scaler not found (only needed for Logistic Regression)")
        # Load all models.
        self.models = {}          # model name -> fitted estimator
        self.feature_names = {}   # model name -> expected feature order
        self._load_models()

    def _load_models(self):
        """Load all trained models from the models directory.

        Also records, per model, the feature column order it expects so
        predict-time features can be aligned to it.
        """
        model_files = {
            'Logistic Regression': 'logistic_regression.joblib',
            'Random Forest': 'random_forest.joblib',
            'XGBoost': 'xgboost.joblib'
        }
        for name, filename in model_files.items():
            model_path = self.models_dir / filename
            if not model_path.exists():
                # FIX: original logged the literal placeholder "(unknown)"
                # instead of the missing path.
                logger.warning(f"✗ Model not found: {model_path}")
                continue
            model = joblib.load(model_path)
            self.models[name] = model
            # Store expected feature names from the model itself, ...
            if hasattr(model, 'feature_names_in_'):
                self.feature_names[name] = list(model.feature_names_in_)
                logger.info(f"✓ Loaded {name} ({len(self.feature_names[name])} features)")
            elif self.scaler is not None and hasattr(self.scaler, 'feature_names_in_'):
                # ... or fall back to the scaler's feature names for models
                # that do not record them (like Logistic Regression).
                self.feature_names[name] = list(self.scaler.feature_names_in_)
                logger.info(f"✓ Loaded {name} (using scaler features: {len(self.feature_names[name])} features)")
            else:
                logger.info(f"✓ Loaded {name}")

    @staticmethod
    def _align_features(features_df, expected_features):
        """Return a (1, n) numpy array with columns ordered as expected_features.

        Features missing from ``features_df`` are filled with 0 so the model
        always receives exactly the column set it was trained on.  (Extracted
        from the duplicated alignment loops in predict_url; also avoids the
        empty-column quirk when the first expected feature is missing.)
        """
        row = {
            feat: (features_df[feat].iloc[0] if feat in features_df.columns else 0)
            for feat in expected_features
        }
        # .values returns a plain numpy array, sidestepping sklearn's
        # feature-name validation at predict time.
        return pd.DataFrame([row], columns=expected_features).values

    def predict_url(self, url: str) -> tuple:
        """
        Predict if URL is phishing or legitimate.

        Args:
            url: URL string to analyze

        Returns:
            Tuple of (results, features_dict): per-model prediction dicts
            keyed by model name, plus the raw extracted feature dict.
            (FIX: annotation previously claimed a bare dict.)
        """
        from urllib.parse import urlparse
        # Normalize the host: lowercase, then strip a single leading "www."
        # FIX: replace('www.', '') removed the substring anywhere in the
        # host, mangling e.g. "awww.example.com".
        domain = urlparse(url).netloc.lower()
        if domain.startswith('www.'):
            domain = domain[len('www.'):]
        # Whitelist match must be exact or a true subdomain.
        # FIX: a bare endswith(trusted) wrongly whitelisted lookalike
        # hosts such as "evilgithub.com" for "github.com".
        is_whitelisted = any(
            domain == trusted or domain.endswith('.' + trusted)
            for trusted in self.trusted_domains
        )
        # Extract features and drop the label column if present.
        features_dict = self.extractor.extract_features(url)
        features_df = pd.DataFrame([features_dict])
        if 'label' in features_df.columns:
            features_df = features_df.drop('label', axis=1)
        # Get predictions from all models.
        results = {}
        for model_name, model in self.models.items():
            # Trusted domains short-circuit the models entirely.
            if is_whitelisted:
                results[model_name] = {
                    'prediction': 'LEGITIMATE',
                    'prediction_code': 0,
                    'confidence': 99.99,
                    'phishing_probability': 0.01,
                    'legitimate_probability': 99.99,
                    'whitelisted': True
                }
                continue
            # Align features with the model's expected order; fall back to
            # any other model's feature list, then to the raw column order.
            if model_name in self.feature_names:
                features_to_predict = self._align_features(
                    features_df, self.feature_names[model_name])
            elif self.feature_names:
                features_to_predict = self._align_features(
                    features_df, list(self.feature_names.values())[0])
            else:
                features_to_predict = features_df.values
            # Scale features only for Logistic Regression (the only model
            # trained on scaled inputs).
            if model_name == 'Logistic Regression' and self.scaler is not None:
                features_to_use = self.scaler.transform(features_to_predict)
            else:
                features_to_use = features_to_predict
            if hasattr(model, 'predict_proba'):
                probabilities = model.predict_proba(features_to_use)[0]
                phishing_prob = probabilities[1] * 100
                legitimate_prob = probabilities[0] * 100
                # Apply the per-model decision threshold.
                threshold = self.thresholds.get(model_name, 0.5)
                prediction = 1 if probabilities[1] > threshold else 0
                confidence = probabilities[prediction] * 100
            else:
                # Fallback for models without probability estimates.
                prediction = model.predict(features_to_use)[0]
                confidence = 100.0
                phishing_prob = 100.0 if prediction == 1 else 0.0
                legitimate_prob = 0.0 if prediction == 1 else 100.0
            results[model_name] = {
                'prediction': 'PHISHING' if prediction == 1 else 'LEGITIMATE',
                'prediction_code': int(prediction),
                'confidence': confidence,
                'phishing_probability': phishing_prob,
                'legitimate_probability': legitimate_prob,
                'whitelisted': False,
                'threshold': self.thresholds.get(model_name, 0.5)
            }
        return results, features_dict

    def print_results(self, url: str, results: dict, features: dict):
        """Print formatted results.

        Args:
            url: the analyzed URL (echoed back in the report)
            results: per-model prediction dicts from predict_url()
            features: raw extracted feature dict, used for the feature table
        """
        print("\n" + "=" * 80)
        print(f"{Fore.CYAN}{Style.BRIGHT}URL PHISHING DETECTION RESULTS{Style.RESET_ALL}")
        print("=" * 80)
        # Print URL
        print(f"\n{Fore.YELLOW}URL:{Style.RESET_ALL} {url}")
        # Print model predictions
        print(f"\n{Fore.CYAN}{Style.BRIGHT}MODEL PREDICTIONS:{Style.RESET_ALL}")
        print("-" * 80)
        for model_name, result in results.items():
            prediction = result['prediction']
            confidence = result['confidence']
            phishing_prob = result['phishing_probability']
            legitimate_prob = result['legitimate_probability']
            threshold = result.get('threshold', 0.5)
            # Color based on prediction
            if prediction == 'PHISHING':
                color = Fore.RED
                icon = "⚠️"
            else:
                color = Fore.GREEN
                icon = "✓"
            print(f"\n{Style.BRIGHT}{model_name}:{Style.RESET_ALL}")
            print(f" {icon} Prediction: {color}{Style.BRIGHT}{prediction}{Style.RESET_ALL}")
            # Show if whitelisted
            if result.get('whitelisted', False):
                print(f" {Fore.CYAN}ℹ️ Trusted domain (whitelisted){Style.RESET_ALL}")
            else:
                print(f" Decision Threshold: {threshold*100:.0f}%")
            print(f" Confidence: {confidence:.2f}%")
            print(f" Probabilities:")
            print(f" • Phishing: {Fore.RED}{phishing_prob:6.2f}%{Style.RESET_ALL}")
            print(f" • Legitimate: {Fore.GREEN}{legitimate_prob:6.2f}%{Style.RESET_ALL}")
        # Consensus: unanimous phishing / unanimous legitimate / mixed.
        print(f"\n{Fore.CYAN}{Style.BRIGHT}CONSENSUS:{Style.RESET_ALL}")
        print("-" * 80)
        phishing_votes = sum(1 for r in results.values() if r['prediction'] == 'PHISHING')
        total_models = len(results)
        if phishing_votes == total_models:
            consensus_color = Fore.RED
            consensus_icon = "🚨"
            consensus_text = "ALL MODELS AGREE: PHISHING"
        elif phishing_votes == 0:
            consensus_color = Fore.GREEN
            consensus_icon = "✅"
            consensus_text = "ALL MODELS AGREE: LEGITIMATE"
        else:
            consensus_color = Fore.YELLOW
            consensus_icon = "⚠️"
            consensus_text = f"MIXED RESULTS: {phishing_votes}/{total_models} models say PHISHING"
        print(f"{consensus_icon} {consensus_color}{Style.BRIGHT}{consensus_text}{Style.RESET_ALL}")
        # Key features (based on top features from models)
        print(f"\n{Fore.CYAN}{Style.BRIGHT}TOP FEATURES (Model Importance):{Style.RESET_ALL}")
        print("-" * 80)
        # Each entry: (display name, display value, risk flag or None).
        top_features = [
            ('Num Domain Parts', features.get('num_domain_parts', 0), None),
            ('Domain Dots', features.get('domain_dots', 0), None),
            ('URL Shortener', '✓ Yes' if features.get('is_shortened', 0) == 1 else '✗ No',
             features.get('is_shortened', 0)),
            ('Num Subdomains', features.get('num_subdomains', 0), None),
            ('Domain Hyphens', features.get('domain_hyphens', 0), None),
            ('Free Platform', '✓ Yes' if features.get('is_free_platform', 0) == 1 else '✗ No',
             features.get('is_free_platform', 0)),
            ('Free Hosting', '✓ Yes' if features.get('is_free_hosting', 0) == 1 else '✗ No',
             features.get('is_free_hosting', 0)),
            ('Platform Subdomain Len', features.get('platform_subdomain_length', 0), None),
            ('Avg Domain Part Len', f"{features.get('avg_domain_part_len', 0):.2f}", None),
            ('Domain Length Category', features.get('domain_length_category', 0), None),
            ('Path Digits', features.get('path_digits', 0), None),
            ('Is HTTP', '✓ Yes' if features.get('is_http', 0) == 1 else '✗ No',
             features.get('is_http', 0)),
            ('Multiple Brands in URL', '✓ Yes' if features.get('multiple_brands_in_url', 0) == 1 else '✗ No',
             features.get('multiple_brands_in_url', 0)),
            ('Brand in Path', '✓ Yes' if features.get('brand_in_path', 0) == 1 else '✗ No',
             features.get('brand_in_path', 0)),
            ('Path Slashes', features.get('path_slashes', 0), None),
            ('Encoding Diff', f"{features.get('encoding_diff', 0):.3f}", None),
            ('Symbol Ratio (Domain)', f"{features.get('symbol_ratio_domain', 0):.3f}", None),
            ('Domain Length', features.get('domain_length', 0), None),
            ('Has @ Symbol', '✓ Yes' if features.get('has_at_symbol', 0) == 1 else '✗ No',
             features.get('has_at_symbol', 0)),
            ('TLD Length', features.get('tld_length', 0), None),
        ]
        for feature_name, value, risk_flag in top_features:
            # Color code risky features
            if risk_flag is not None:
                if risk_flag == 1:  # Risky feature is present
                    value_display = f"{Fore.RED}{value}{Style.RESET_ALL}"
                else:
                    value_display = f"{Fore.GREEN}{value}{Style.RESET_ALL}"
            else:
                value_display = str(value)
            print(f" • {feature_name:25s}: {value_display}")
        print("\n" + "=" * 80 + "\n")
def main():
    """Main interactive function."""
    # Banner is assembled once and echoed line by line.
    banner = (
        f"\n{Fore.CYAN}{Style.BRIGHT}╔══════════════════════════════════════════════════════════════╗",
        f"║ URL PHISHING DETECTOR - INTERACTIVE DEMO ║",
        f"╚══════════════════════════════════════════════════════════════╝{Style.RESET_ALL}\n",
    )
    for banner_line in banner:
        print(banner_line)
    # Construct the detector up front so model loading happens exactly once.
    print(f"{Fore.YELLOW}Loading models...{Style.RESET_ALL}")
    detector = URLPhishingDetector()
    print(f"{Fore.GREEN}✓ All models loaded successfully!{Style.RESET_ALL}\n")
    # Prompt loop: one URL per iteration until the user asks to quit.
    while True:
        print(f"{Fore.CYAN}{'─' * 80}{Style.RESET_ALL}")
        url = input(f"{Fore.YELLOW}Enter URL to test (or 'quit' to exit):{Style.RESET_ALL} ").strip()
        if url.lower() in ('quit', 'exit', 'q'):
            print(f"\n{Fore.GREEN}Thank you for using URL Phishing Detector!{Style.RESET_ALL}\n")
            break
        if not url:
            print(f"{Fore.RED}Please enter a valid URL{Style.RESET_ALL}\n")
            continue
        # Default to http:// when the user supplied no scheme.
        if not url.startswith(('http://', 'https://')):
            url = 'http://' + url
        try:
            predictions, feature_values = detector.predict_url(url)
            detector.print_results(url, predictions, feature_values)
        except Exception as exc:
            print(f"\n{Fore.RED}Error analyzing URL: {str(exc)}{Style.RESET_ALL}\n")
            logger.error(f"Error: {str(exc)}")
# Run the interactive demo only when executed as a script (not on import).
if __name__ == "__main__":
    main()
|