Spaces:
Runtime error
Runtime error
"""
URL Phishing Detector - Interactive Demo
Test any URL with all trained models and see predictions with confidence scores.
"""
import logging
import sys
from pathlib import Path

import joblib
import pandas as pd
from colorama import init, Fore, Style

# Colored terminal output; autoreset returns the style to default after each print.
init(autoreset=True)

# Module-wide logging with a short timestamped format.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%H:%M:%S'
)
logger = logging.getLogger("url_predictor")

# Make the project root importable so the feature-extractor package resolves.
sys.path.append(str(Path(__file__).parent.parent))
from scripts.feature_extraction.url.url_features_v2 import URLFeatureExtractorV2
class URLPhishingDetector:
    """Detect phishing URLs using trained models.

    Loads the URL feature extractor, an optional scaler (needed only by
    Logistic Regression) and every trained model found in ``saved_models``,
    then exposes :meth:`predict_url` for per-URL predictions and
    :meth:`print_results` for colored console reporting.
    """

    def __init__(self):
        """Initialize detector with all models."""
        self.script_dir = Path(__file__).parent.parent
        self.models_dir = (self.script_dir / 'saved_models').resolve()

        # Whitelist of trusted domains. Matching (see predict_url) requires an
        # exact domain or a true subdomain — never a bare suffix match.
        self.trusted_domains = {
            # Tech giants
            'youtube.com', 'facebook.com', 'twitter.com', 'x.com',
            'linkedin.com', 'microsoft.com', 'apple.com', 'amazon.com',
            # Development
            'github.com', 'gitlab.com', 'stackoverflow.com', 'npmjs.com',
            # AI Services
            'claude.ai', 'anthropic.com', 'openai.com', 'chatgpt.com',
            # Education & Info
            'wikipedia.org', 'reddit.com', 'quora.com', 'medium.com',
            # Cloud & Services
            'aws.amazon.com', 'azure.microsoft.com', 'cloud.google.com',
            'vercel.com', 'netlify.com', 'heroku.com',
            # Communication
            'slack.com', 'discord.com', 'zoom.us', 'teams.microsoft.com',
            # Finance (major)
            'paypal.com', 'stripe.com', 'visa.com', 'mastercard.com',
            # E-commerce
            'ebay.com', 'shopify.com', 'etsy.com', 'walmart.com',
        }

        # Custom decision thresholds for each model (reduce false positives).
        self.thresholds = {
            'Logistic Regression': 0.5,  # Standard threshold
            'Random Forest': 0.5,        # Standard threshold
            'XGBoost': 0.5               # Standard threshold
        }

        # Feature extractor shared by all models.
        self.extractor = URLFeatureExtractorV2()

        # Load scaler (only needed for Logistic Regression).
        scaler_path = self.models_dir / 'scaler.joblib'
        if scaler_path.exists():
            self.scaler = joblib.load(scaler_path)
            logger.info("✓ Loaded scaler")
        else:
            self.scaler = None
            logger.warning("✗ Scaler not found (only needed for Logistic Regression)")

        # Loaded models keyed by display name, plus their expected features.
        self.models = {}
        self.feature_names = {}
        self._load_models()

    def _load_models(self):
        """Load all trained models and record each one's expected feature names."""
        model_files = {
            'Logistic Regression': 'logistic_regression.joblib',
            'Random Forest': 'random_forest.joblib',
            'XGBoost': 'xgboost.joblib'
        }
        for name, filename in model_files.items():
            model_path = self.models_dir / filename
            if not model_path.exists():
                # BUGFIX: log the actual missing path (was a '(unknown)' placeholder).
                logger.warning(f"✗ Model not found: {model_path}")
                continue
            model = joblib.load(model_path)
            self.models[name] = model
            # Store expected feature names from the model when available.
            if hasattr(model, 'feature_names_in_'):
                self.feature_names[name] = list(model.feature_names_in_)
                logger.info(f"✓ Loaded {name} ({len(self.feature_names[name])} features)")
            elif self.scaler is not None and hasattr(self.scaler, 'feature_names_in_'):
                # Use scaler's feature names for models without them (like Logistic Regression).
                self.feature_names[name] = list(self.scaler.feature_names_in_)
                logger.info(f"✓ Loaded {name} (using scaler features: {len(self.feature_names[name])} features)")
            else:
                logger.info(f"✓ Loaded {name}")

    def _align_features(self, features_df, expected_features):
        """Return a numpy array whose columns follow ``expected_features``.

        Missing features are filled with 0. Returning ``.values`` (numpy)
        also sidesteps sklearn's feature-name validation at predict time.
        """
        aligned = pd.DataFrame(columns=expected_features)
        for feat in expected_features:
            if feat in features_df.columns:
                aligned[feat] = features_df[feat].values
            else:
                aligned[feat] = 0  # Fill missing features with 0
        return aligned.values

    def predict_url(self, url: str) -> tuple:
        """
        Predict if URL is phishing or legitimate.

        Args:
            url: URL string to analyze

        Returns:
            Tuple of (predictions dict keyed by model name, raw feature dict)
        """
        from urllib.parse import urlparse

        # Hostname only (lowercased, port/userinfo stripped), minus a leading
        # 'www.'. BUGFIX: the old code removed 'www.' anywhere in the netloc,
        # which mangled hosts like 'evilwww.example.com'.
        domain = (urlparse(url).hostname or '').lower()
        if domain.startswith('www.'):
            domain = domain[4:]

        # BUGFIX: require an exact or true-subdomain match so that e.g.
        # 'fakegithub.com' no longer passes as 'github.com'.
        is_whitelisted = any(
            domain == trusted or domain.endswith('.' + trusted)
            for trusted in self.trusted_domains
        )

        # Extract features and drop the training label if present.
        features_dict = self.extractor.extract_features(url)
        features_df = pd.DataFrame([features_dict])
        if 'label' in features_df.columns:
            features_df = features_df.drop('label', axis=1)

        # Get predictions from all models.
        results = {}
        for model_name, model in self.models.items():
            # Trusted domains short-circuit the model entirely.
            if is_whitelisted:
                results[model_name] = {
                    'prediction': 'LEGITIMATE',
                    'prediction_code': 0,
                    'confidence': 99.99,
                    'phishing_probability': 0.01,
                    'legitimate_probability': 99.99,
                    'whitelisted': True
                }
                continue

            # Align features with the model's expected feature order.
            if model_name in self.feature_names:
                features_to_predict = self._align_features(
                    features_df, self.feature_names[model_name])
            elif self.feature_names:
                # Fallback: borrow feature names stored for another model.
                features_to_predict = self._align_features(
                    features_df, next(iter(self.feature_names.values())))
            else:
                features_to_predict = features_df.values

            # Scale features only for Logistic Regression.
            if model_name == 'Logistic Regression' and self.scaler is not None:
                features_to_use = self.scaler.transform(features_to_predict)
            else:
                features_to_use = features_to_predict

            if hasattr(model, 'predict_proba'):
                probabilities = model.predict_proba(features_to_use)[0]
                phishing_prob = probabilities[1] * 100
                legitimate_prob = probabilities[0] * 100
                # Apply the model's custom decision threshold.
                threshold = self.thresholds.get(model_name, 0.5)
                prediction = 1 if probabilities[1] > threshold else 0
                confidence = probabilities[prediction] * 100
            else:
                # For models without predict_proba (fallback).
                prediction = model.predict(features_to_use)[0]
                confidence = 100.0
                phishing_prob = 100.0 if prediction == 1 else 0.0
                legitimate_prob = 0.0 if prediction == 1 else 100.0

            results[model_name] = {
                'prediction': 'PHISHING' if prediction == 1 else 'LEGITIMATE',
                'prediction_code': int(prediction),
                'confidence': confidence,
                'phishing_probability': phishing_prob,
                'legitimate_probability': legitimate_prob,
                'whitelisted': False,
                'threshold': self.thresholds.get(model_name, 0.5)
            }
        return results, features_dict

    def print_results(self, url: str, results: dict, features: dict):
        """Print formatted, color-coded results for one analyzed URL.

        Args:
            url: The URL that was analyzed.
            results: Per-model prediction dicts from :meth:`predict_url`.
            features: Raw feature dict from the extractor.
        """
        print("\n" + "=" * 80)
        print(f"{Fore.CYAN}{Style.BRIGHT}URL PHISHING DETECTION RESULTS{Style.RESET_ALL}")
        print("=" * 80)

        print(f"\n{Fore.YELLOW}URL:{Style.RESET_ALL} {url}")

        # Per-model predictions.
        print(f"\n{Fore.CYAN}{Style.BRIGHT}MODEL PREDICTIONS:{Style.RESET_ALL}")
        print("-" * 80)
        for model_name, result in results.items():
            prediction = result['prediction']
            confidence = result['confidence']
            phishing_prob = result['phishing_probability']
            legitimate_prob = result['legitimate_probability']
            threshold = result.get('threshold', 0.5)

            # Red for phishing verdicts, green for legitimate ones.
            if prediction == 'PHISHING':
                color = Fore.RED
                icon = "⚠️"
            else:
                color = Fore.GREEN
                icon = "✓"

            print(f"\n{Style.BRIGHT}{model_name}:{Style.RESET_ALL}")
            print(f"  {icon} Prediction: {color}{Style.BRIGHT}{prediction}{Style.RESET_ALL}")
            if result.get('whitelisted', False):
                print(f"  {Fore.CYAN}ℹ️ Trusted domain (whitelisted){Style.RESET_ALL}")
            else:
                print(f"  Decision Threshold: {threshold*100:.0f}%")
            print(f"  Confidence: {confidence:.2f}%")
            print(f"  Probabilities:")
            print(f"    • Phishing:   {Fore.RED}{phishing_prob:6.2f}%{Style.RESET_ALL}")
            print(f"    • Legitimate: {Fore.GREEN}{legitimate_prob:6.2f}%{Style.RESET_ALL}")

        # Consensus across models: unanimous phishing, unanimous legitimate, or mixed.
        print(f"\n{Fore.CYAN}{Style.BRIGHT}CONSENSUS:{Style.RESET_ALL}")
        print("-" * 80)
        phishing_votes = sum(1 for r in results.values() if r['prediction'] == 'PHISHING')
        total_models = len(results)
        if phishing_votes == total_models:
            consensus_color = Fore.RED
            consensus_icon = "🚨"
            consensus_text = "ALL MODELS AGREE: PHISHING"
        elif phishing_votes == 0:
            consensus_color = Fore.GREEN
            consensus_icon = "✅"
            consensus_text = "ALL MODELS AGREE: LEGITIMATE"
        else:
            consensus_color = Fore.YELLOW
            consensus_icon = "⚠️"
            consensus_text = f"MIXED RESULTS: {phishing_votes}/{total_models} models say PHISHING"
        print(f"{consensus_icon} {consensus_color}{Style.BRIGHT}{consensus_text}{Style.RESET_ALL}")

        # Key features, chosen from Random Forest / XGBoost importance analysis.
        print(f"\n{Fore.CYAN}{Style.BRIGHT}TOP FEATURES (Model Importance):{Style.RESET_ALL}")
        print("-" * 80)
        # Each entry: (display name, display value, risk flag or None).
        top_features = [
            ('Num Domain Parts', features.get('num_domain_parts', 0), None),
            ('Domain Dots', features.get('domain_dots', 0), None),
            ('URL Shortener', '✓ Yes' if features.get('is_shortened', 0) == 1 else '✗ No',
             features.get('is_shortened', 0)),
            ('Num Subdomains', features.get('num_subdomains', 0), None),
            ('Domain Hyphens', features.get('domain_hyphens', 0), None),
            ('Free Platform', '✓ Yes' if features.get('is_free_platform', 0) == 1 else '✗ No',
             features.get('is_free_platform', 0)),
            ('Free Hosting', '✓ Yes' if features.get('is_free_hosting', 0) == 1 else '✗ No',
             features.get('is_free_hosting', 0)),
            ('Platform Subdomain Len', features.get('platform_subdomain_length', 0), None),
            ('Avg Domain Part Len', f"{features.get('avg_domain_part_len', 0):.2f}", None),
            ('Domain Length Category', features.get('domain_length_category', 0), None),
            ('Path Digits', features.get('path_digits', 0), None),
            ('Is HTTP', '✓ Yes' if features.get('is_http', 0) == 1 else '✗ No',
             features.get('is_http', 0)),
            ('Multiple Brands in URL', '✓ Yes' if features.get('multiple_brands_in_url', 0) == 1 else '✗ No',
             features.get('multiple_brands_in_url', 0)),
            ('Brand in Path', '✓ Yes' if features.get('brand_in_path', 0) == 1 else '✗ No',
             features.get('brand_in_path', 0)),
            ('Path Slashes', features.get('path_slashes', 0), None),
            ('Encoding Diff', f"{features.get('encoding_diff', 0):.3f}", None),
            ('Symbol Ratio (Domain)', f"{features.get('symbol_ratio_domain', 0):.3f}", None),
            ('Domain Length', features.get('domain_length', 0), None),
            ('Has @ Symbol', '✓ Yes' if features.get('has_at_symbol', 0) == 1 else '✗ No',
             features.get('has_at_symbol', 0)),
            ('TLD Length', features.get('tld_length', 0), None),
        ]
        for feature_name, value, risk_flag in top_features:
            # Color code risky binary features: red when present, green when absent.
            if risk_flag is not None:
                if risk_flag == 1:
                    value_display = f"{Fore.RED}{value}{Style.RESET_ALL}"
                else:
                    value_display = f"{Fore.GREEN}{value}{Style.RESET_ALL}"
            else:
                value_display = str(value)
            print(f"  • {feature_name:25s}: {value_display}")
        print("\n" + "=" * 80 + "\n")
def main():
    """Entry point: load the detector, then prompt for URLs in a loop."""
    # Banner.
    print(f"\n{Fore.CYAN}{Style.BRIGHT}╔══════════════════════════════════════════════════════════════╗")
    print(f"║ URL PHISHING DETECTOR - INTERACTIVE DEMO ║")
    print(f"╚══════════════════════════════════════════════════════════════╝{Style.RESET_ALL}\n")

    # One-time model loading.
    print(f"{Fore.YELLOW}Loading models...{Style.RESET_ALL}")
    detector = URLPhishingDetector()
    print(f"{Fore.GREEN}✓ All models loaded successfully!{Style.RESET_ALL}\n")

    # Prompt-analyze-print loop; exits on quit/exit/q.
    while True:
        print(f"{Fore.CYAN}{'─' * 80}{Style.RESET_ALL}")
        target = input(f"{Fore.YELLOW}Enter URL to test (or 'quit' to exit):{Style.RESET_ALL} ").strip()

        if target.lower() in ('quit', 'exit', 'q'):
            print(f"\n{Fore.GREEN}Thank you for using URL Phishing Detector!{Style.RESET_ALL}\n")
            break
        if not target:
            print(f"{Fore.RED}Please enter a valid URL{Style.RESET_ALL}\n")
            continue

        # Default to http:// when no scheme was given.
        if not target.startswith(('http://', 'https://')):
            target = 'http://' + target

        try:
            predictions, feature_values = detector.predict_url(target)
            detector.print_results(target, predictions, feature_values)
        except Exception as exc:
            # Report the failure but keep the interactive session alive.
            print(f"\n{Fore.RED}Error analyzing URL: {str(exc)}{Style.RESET_ALL}\n")
            logger.error(f"Error: {str(exc)}")


if __name__ == "__main__":
    main()