Spaces:
Runtime error
Runtime error
| """ | |
| Combined URL+HTML Phishing Detector - Interactive Demo | |
| Downloads HTML from URL, extracts both URL and HTML features, | |
| and predicts using the combined model (XGBoost + Random Forest). | |
| Usage: | |
| python scripts/predict_combined.py | |
| python scripts/predict_combined.py https://example.com | |
| """ | |
| import sys | |
| import logging | |
| import warnings | |
| from pathlib import Path | |
| import joblib | |
| import numpy as np | |
| import pandas as pd | |
| import requests | |
| from colorama import init, Fore, Style | |
# Silence the warnings produced by unverified HTTPS requests: pages are
# downloaded with verify=False in CombinedPhishingDetector.predict.
warnings.filterwarnings('ignore', message='.*Unverified HTTPS.*')
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Initialise colorama with autoreset enabled for the coloured terminal output.
init(autoreset=True)
# Root logging configuration: INFO level, short HH:MM:SS timestamps.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%H:%M:%S',
)
logger = logging.getLogger('predict_combined')
# Project imports
# PROJECT_ROOT is the directory one level above the one containing this file.
# It is placed first on sys.path so the `scripts.*` imports below can be
# resolved; these imports must therefore stay after the sys.path.insert call.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(PROJECT_ROOT))
from scripts.feature_extraction.url.url_features_v3 import URLFeatureExtractorOptimized
from scripts.feature_extraction.html.html_feature_extractor import HTMLFeatureExtractor
from scripts.feature_extraction.html.feature_engineering import engineer_features
class CombinedPhishingDetector:
    """Detect phishing using combined URL + HTML features.

    On construction the detector creates the URL and HTML feature extractors
    and loads every combined model found under ``PROJECT_ROOT / 'saved_models'``.
    ``predict`` scores a single URL with each loaded model and
    ``print_results`` renders the dictionary it returns on the terminal.
    """

    # Browser-like headers sent with the page download request.
    HEADERS = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    }

    def __init__(self):
        """Create the feature extractors and load the combined models.

        Raises:
            FileNotFoundError: If none of the combined model files exist.
        """
        models_dir = PROJECT_ROOT / 'saved_models'

        # Feature extractors
        self.url_extractor = URLFeatureExtractorOptimized()
        self.html_extractor = HTMLFeatureExtractor()

        # Load combined models; a missing model file is skipped silently.
        self.models = {}
        self._load_model(models_dir, 'XGBoost Combined',
                         'xgboost_combined.joblib',
                         'xgboost_combined_feature_names.joblib')
        self._load_model(models_dir, 'Random Forest Combined',
                         'random_forest_combined.joblib',
                         'random_forest_combined_feature_names.joblib')

        if not self.models:
            raise FileNotFoundError(
                "No combined models found! Train first:\n"
                " python scripts/merge_url_html_features.py --balance\n"
                " python models/train_combined_models.py")

    def _load_model(self, models_dir: Path, name: str,
                    model_file: str, features_file: str):
        """Load one model and, when present, its feature-name list.

        Args:
            models_dir: Directory holding the saved ``.joblib`` files.
            name: Display name used as the key in ``self.models``.
            model_file: File name of the serialized model.
            features_file: File name of the serialized feature-name list.

        Nothing is registered when ``model_file`` does not exist. When only
        the feature-name file is missing, the model is stored with
        ``'features': None``.
        """
        model_path = models_dir / model_file
        if not model_path.exists():
            return

        feat_path = models_dir / features_file
        # NOTE: joblib.load unpickles; only load model files you trust.
        model = joblib.load(model_path)
        features = joblib.load(feat_path) if feat_path.exists() else None
        self.models[name] = {'model': model, 'features': features}

        # Explicit None/length checks: the saved names may be a NumPy array or
        # pandas Index, whose truth value is ambiguous and would raise.
        has_names = features is not None and len(features) > 0
        n = len(features) if has_names else '?'
        logger.info("Loaded %s (%s features)", name, n)

    def _fetch_html_features(self, url: str):
        """Download ``url`` and build the engineered HTML feature frame.

        Returns:
            A tuple ``(eng_df, raw_features, error)``. On success ``eng_df``
            is the engineered one-row frame with ``html_``-prefixed columns,
            ``raw_features`` is what the HTML extractor returned and
            ``error`` is ``None``. On any failure ``eng_df`` is an empty
            frame, ``raw_features`` is ``None`` and ``error`` is the
            exception text.
        """
        try:
            # verify=False: certificate errors must not stop the download.
            resp = requests.get(url, timeout=10, verify=False, headers=self.HEADERS)
            raw_html_features = self.html_extractor.extract_features(resp.text)
            eng_df = engineer_features(pd.DataFrame([raw_html_features]))
            eng_df = eng_df.rename(columns={c: f'html_{c}' for c in eng_df.columns})
        except Exception as e:
            # Deliberate best effort: any download or extraction failure falls
            # back to URL-only scoring instead of aborting the prediction.
            logger.warning("Could not download HTML: %s", e)
            return pd.DataFrame(), None, str(e)
        return eng_df, raw_html_features, None

    @staticmethod
    def _align_features(combined_df, expected):
        """Return the model input matrix with columns ordered as ``expected``.

        Features the model expects but that are absent from ``combined_df``
        are filled with 0. When ``expected`` is ``None`` or empty the frame's
        values are returned unchanged.
        """
        if expected is None or len(expected) == 0:
            return combined_df.values
        # reindex orders the columns and zero-fills every missing one in a
        # single step, so no missing feature can end up as NaN.
        return combined_df.reindex(columns=list(expected), fill_value=0).values

    @staticmethod
    def _summarize_votes(predictions):
        """Return ``(is_phishing, consensus_text)`` for the model predictions.

        ``is_phishing`` requires a strict majority of PHISHING votes, so a
        tie resolves to ``False``.
        """
        total = len(predictions)
        phishing_votes = sum(p['prediction'] == 'PHISHING' for p in predictions)
        is_phishing = phishing_votes > total / 2

        if phishing_votes == total:
            consensus = "ALL MODELS AGREE: PHISHING"
        elif phishing_votes == 0:
            consensus = "ALL MODELS AGREE: LEGITIMATE"
        else:
            consensus = f"MIXED: {phishing_votes}/{total} models say PHISHING"
        return is_phishing, consensus

    def predict(self, url: str) -> dict:
        """Download HTML, extract features, predict.

        Args:
            url: Absolute URL to analyse.

        Returns:
            A dict with the keys ``url``, ``is_phishing``, ``consensus``,
            ``predictions`` (one entry per loaded model), ``url_features``,
            ``html_features`` (``None`` when the HTML step failed) and
            ``html_error`` (``None`` on success).
        """
        # 1. Extract URL features
        url_features = self.url_extractor.extract_features(url)
        url_df = pd.DataFrame([url_features])
        url_df = url_df.rename(columns={c: f'url_{c}' for c in url_df.columns})

        # 2. Download + extract HTML features (empty frame on failure)
        eng_df, html_features, html_error = self._fetch_html_features(url)

        # 3. Combine; with no HTML columns the URL frame is used as is and
        #    the missing HTML features are zero-filled during alignment.
        combined_df = url_df if eng_df.empty else pd.concat([url_df, eng_df], axis=1)

        # 4. Predict with each model
        predictions = []
        for name, data in self.models.items():
            X = self._align_features(combined_df, data['features'])
            proba = data['model'].predict_proba(X)[0]
            pred = 1 if proba[1] > 0.5 else 0
            predictions.append({
                'model_name': name,
                'prediction': 'PHISHING' if pred else 'LEGITIMATE',
                'confidence': float(proba[pred] * 100),
                'phishing_probability': float(proba[1] * 100),
                'legitimate_probability': float(proba[0] * 100),
            })

        # Consensus
        is_phishing, consensus = self._summarize_votes(predictions)

        return {
            'url': url,
            'is_phishing': is_phishing,
            'consensus': consensus,
            'predictions': predictions,
            'url_features': url_features,
            'html_features': html_features,
            'html_error': html_error,
        }

    def print_results(self, result: dict):
        """Pretty-print results.

        Args:
            result: Dictionary in the shape returned by ``predict``.
        """
        banner = "=" * 80
        rule = "-" * 80

        print("\n" + banner)
        print(f"{Fore.CYAN}{Style.BRIGHT}COMBINED URL+HTML PHISHING DETECTION{Style.RESET_ALL}")
        print(banner)
        print(f"\n{Fore.YELLOW}URL:{Style.RESET_ALL} {result['url']}")

        if result.get('html_error'):
            print(f"{Fore.RED}HTML download failed: {result['html_error']}{Style.RESET_ALL}")
            print(f"{Fore.YELLOW}Using URL features only (HTML features zeroed){Style.RESET_ALL}")

        # Model predictions
        print(f"\n{Fore.CYAN}{Style.BRIGHT}MODEL PREDICTIONS:{Style.RESET_ALL}")
        print(rule)
        for pred in result['predictions']:
            is_safe = pred['prediction'] == 'LEGITIMATE'
            color = Fore.GREEN if is_safe else Fore.RED
            icon = "✓" if is_safe else "⚠"
            print(f"\n{Style.BRIGHT}{pred['model_name']}:{Style.RESET_ALL}")
            print(f" {icon} Prediction: {color}{Style.BRIGHT}{pred['prediction']}{Style.RESET_ALL}")
            print(f" Confidence: {pred['confidence']:.1f}%")
            print(f" Phishing: {Fore.RED}{pred['phishing_probability']:6.2f}%{Style.RESET_ALL}")
            print(f" Legitimate: {Fore.GREEN}{pred['legitimate_probability']:6.2f}%{Style.RESET_ALL}")

        # Consensus
        print(f"\n{Fore.CYAN}{Style.BRIGHT}CONSENSUS:{Style.RESET_ALL}")
        print(rule)
        if result['is_phishing']:
            print(f"🚨 {Fore.RED}{Style.BRIGHT}{result['consensus']}{Style.RESET_ALL}")
        else:
            print(f"✅ {Fore.GREEN}{Style.BRIGHT}{result['consensus']}{Style.RESET_ALL}")

        # Key features; either entry may be missing or None.
        url_feat = result.get('url_features') or {}
        html_feat = result.get('html_features') or {}

        print(f"\n{Fore.CYAN}{Style.BRIGHT}KEY URL FEATURES:{Style.RESET_ALL}")
        print(rule)
        url_keys = [
            ('Domain Length', url_feat.get('domain_length', 0)),
            ('Num Subdomains', url_feat.get('num_subdomains', 0)),
            ('Domain Dots', url_feat.get('domain_dots', 0)),
            ('Is Shortened', 'Yes' if url_feat.get('is_shortened') else 'No'),
            ('Is Free Platform', 'Yes' if url_feat.get('is_free_platform') else 'No'),
            ('Is HTTP', 'Yes' if url_feat.get('is_http') else 'No'),
            ('Has @ Symbol', 'Yes' if url_feat.get('has_at_symbol') else 'No'),
        ]
        for name, val in url_keys:
            print(f" {name:25s}: {val}")

        # The HTML section is shown only when HTML features were extracted.
        if html_feat:
            print(f"\n{Fore.CYAN}{Style.BRIGHT}KEY HTML FEATURES:{Style.RESET_ALL}")
            print(rule)
            html_keys = [
                ('Text Length', html_feat.get('text_length', 0)),
                ('Num Links', html_feat.get('num_links', 0)),
                ('Num Forms', html_feat.get('num_forms', 0)),
                ('Password Fields', html_feat.get('num_password_fields', 0)),
                ('Has Login Form', 'Yes' if html_feat.get('has_login_form') else 'No'),
                ('Has Meta Refresh', 'Yes' if html_feat.get('has_meta_refresh') else 'No'),
                ('Has atob()', 'Yes' if html_feat.get('has_atob') else 'No'),
                ('External Links', html_feat.get('num_external_links', 0)),
            ]
            for name, val in html_keys:
                print(f" {name:25s}: {val}")

        print("\n" + banner + "\n")
def _normalize_url(url: str) -> str:
    """Return ``url`` unchanged if it has an http(s) scheme, else prefix ``https://``."""
    # Compare case-insensitively so that e.g. 'HTTP://host' is left alone.
    if url.lower().startswith(('http://', 'https://')):
        return url
    return 'https://' + url


def main():
    """Run the demo: score ``sys.argv[1]`` if given, otherwise prompt in a loop.

    With a command-line argument a single URL is analysed and the function
    returns. Without one, URLs are read interactively until the user types
    ``quit``/``exit``/``q`` or the input stream ends.
    """
    print(f"\n{Fore.CYAN}{Style.BRIGHT}")
    print("╔══════════════════════════════════════════════════════════════╗")
    print("║ COMBINED URL+HTML PHISHING DETECTOR ║")
    print("╚══════════════════════════════════════════════════════════════╝")
    print(f"{Style.RESET_ALL}")

    print(f"{Fore.YELLOW}Loading models...{Style.RESET_ALL}")
    detector = CombinedPhishingDetector()
    print(f"{Fore.GREEN}✓ Models loaded!{Style.RESET_ALL}\n")

    # Single URL from command line
    if len(sys.argv) > 1:
        result = detector.predict(_normalize_url(sys.argv[1]))
        detector.print_results(result)
        return

    # Interactive loop
    while True:
        print(f"{Fore.CYAN}{'─' * 80}{Style.RESET_ALL}")
        try:
            url = input(f"{Fore.YELLOW}Enter URL (or 'quit'):{Style.RESET_ALL} ").strip()
        except (EOFError, KeyboardInterrupt):
            # No stdin (non-interactive run) or Ctrl-C/Ctrl-D: leave cleanly
            # instead of dying with a traceback.
            print(f"\n{Fore.GREEN}Goodbye!{Style.RESET_ALL}\n")
            break

        if url.lower() in ('quit', 'exit', 'q'):
            print(f"\n{Fore.GREEN}Goodbye!{Style.RESET_ALL}\n")
            break
        if not url:
            continue

        try:
            result = detector.predict(_normalize_url(url))
            detector.print_results(result)
        except Exception as e:
            # Keep the session alive; report the failure for this URL only.
            print(f"\n{Fore.RED}Error: {e}{Style.RESET_ALL}\n")
# Start the demo only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()