"""Combined URL+HTML Phishing Detector - Interactive Demo.

Downloads HTML from URL, extracts both URL and HTML features,
and predicts using the combined model (XGBoost + Random Forest).

Usage:
    python scripts/predict_combined.py
    python scripts/predict_combined.py https://example.com
"""
import sys
import logging
import warnings
from pathlib import Path

import joblib
import numpy as np
import pandas as pd
import requests
from colorama import init, Fore, Style

# The page is fetched with verify=False (see CombinedPhishingDetector.predict),
# so the resulting "Unverified HTTPS" warnings are silenced up front.
warnings.filterwarnings('ignore', message='.*Unverified HTTPS.*')
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

init(autoreset=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%H:%M:%S',
)
logger = logging.getLogger('predict_combined')

# Project imports (the project root must be on sys.path before they run)
PROJECT_ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(PROJECT_ROOT))
from scripts.feature_extraction.url.url_features_v3 import URLFeatureExtractorOptimized
from scripts.feature_extraction.html.html_feature_extractor import HTMLFeatureExtractor
from scripts.feature_extraction.html.feature_engineering import engineer_features


class CombinedPhishingDetector:
    """Detect phishing using combined URL + HTML features."""

    # Request headers sent when downloading the page to analyse.
    HEADERS = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    }

    def __init__(self):
        """Create the feature extractors and load every available combined model.

        Raises:
            FileNotFoundError: if none of the combined model files exist
                under ``PROJECT_ROOT / 'saved_models'``.
        """
        models_dir = PROJECT_ROOT / 'saved_models'

        # Feature extractors
        self.url_extractor = URLFeatureExtractorOptimized()
        self.html_extractor = HTMLFeatureExtractor()

        # Load combined models; each entry maps a display name to
        # {'model': ..., 'features': ... or None}.
        self.models = {}
        self._load_model(models_dir, 'XGBoost Combined',
                         'xgboost_combined.joblib',
                         'xgboost_combined_feature_names.joblib')
        self._load_model(models_dir, 'Random Forest Combined',
                         'random_forest_combined.joblib',
                         'random_forest_combined_feature_names.joblib')

        if not self.models:
            raise FileNotFoundError(
                "No combined models found! Train first:\n"
                " python scripts/merge_url_html_features.py --balance\n"
                " python models/train_combined_models.py")

    @staticmethod
    def _has_feature_names(features) -> bool:
        """Return True when *features* is a non-empty collection of names.

        An explicit None/length test is used instead of truthiness so that
        numpy arrays and pandas Index objects (whose truth value is
        ambiguous) are accepted as well as plain lists.
        """
        return features is not None and len(features) > 0

    def _load_model(self, models_dir: Path, name: str,
                    model_file: str, features_file: str):
        """Register the model stored in *model_file* under *name*, if present.

        A missing model file is skipped silently; a missing feature-name file
        is recorded as ``None``.

        Args:
            models_dir: Directory containing the saved model files.
            name: Key (and display name) used in ``self.models``.
            model_file: File name of the serialized model.
            features_file: File name of the serialized feature-name list.
        """
        model_path = models_dir / model_file
        if not model_path.exists():
            return

        feat_path = models_dir / features_file
        # NOTE(security): joblib.load unpickles its input; only point this at
        # model files you produced yourself.
        self.models[name] = {
            'model': joblib.load(model_path),
            'features': joblib.load(feat_path) if feat_path.exists() else None,
        }
        features = self.models[name]['features']
        n = len(features) if self._has_feature_names(features) else '?'
        logger.info("Loaded %s (%s features)", name, n)

    def predict(self, url: str) -> dict:
        """Download HTML, extract features, predict.

        Args:
            url: URL to analyse; it is both featurized and fetched.

        Returns:
            A dict with keys ``url``, ``is_phishing``, ``consensus``,
            ``predictions`` (one entry per loaded model), ``url_features``,
            ``html_features`` (``None`` when the HTML step failed) and
            ``html_error`` (``None`` on success).
        """
        # 1. Extract URL features
        url_features = self.url_extractor.extract_features(url)
        url_df = pd.DataFrame([url_features])
        url_df = url_df.rename(columns={c: f'url_{c}' for c in url_df.columns})

        # 2. Download + extract HTML features
        html_features = None
        html_error = None
        try:
            # verify=False: certificate validation is deliberately skipped so
            # sites with invalid certificates can still be fetched.
            resp = requests.get(url, timeout=10, verify=False, headers=self.HEADERS)
            raw_html_features = self.html_extractor.extract_features(resp.text)
            raw_df = pd.DataFrame([raw_html_features])
            eng_df = engineer_features(raw_df)
            eng_df = eng_df.rename(columns={c: f'html_{c}' for c in eng_df.columns})
            html_features = raw_html_features
        except Exception as e:
            # Best effort: any failure in download or HTML feature extraction
            # falls back to URL-only features.
            html_error = str(e)
            logger.warning("Could not download HTML: %s", e)
            # No HTML columns here; they are zero-filled during alignment below.
            eng_df = pd.DataFrame()

        # 3. Combine
        combined_df = pd.concat([url_df, eng_df], axis=1)

        # 4. Predict with each model
        predictions = []
        for name, data in self.models.items():
            model = data['model']
            expected = data['features']

            if self._has_feature_names(expected):
                # Put columns in the order the model was trained on and fill
                # every feature we could not compute with 0. reindex does this
                # in one step; assigning columns one by one into an empty
                # frame left leading missing features as NaN (or produced a
                # 0-row matrix when all were missing).
                aligned = combined_df.reindex(columns=list(expected), fill_value=0)
                X = aligned.values
            else:
                X = combined_df.values

            proba = model.predict_proba(X)[0]
            pred = 1 if proba[1] > 0.5 else 0

            predictions.append({
                'model_name': name,
                'prediction': 'PHISHING' if pred else 'LEGITIMATE',
                'confidence': float(proba[pred] * 100),
                'phishing_probability': float(proba[1] * 100),
                'legitimate_probability': float(proba[0] * 100),
            })

        # Consensus: strict majority, so an even split is NOT flagged as phishing.
        phishing_votes = sum(1 for p in predictions if p['prediction'] == 'PHISHING')
        total = len(predictions)
        is_phishing = phishing_votes > total / 2

        if phishing_votes == total:
            consensus = "ALL MODELS AGREE: PHISHING"
        elif phishing_votes == 0:
            consensus = "ALL MODELS AGREE: LEGITIMATE"
        else:
            consensus = f"MIXED: {phishing_votes}/{total} models say PHISHING"

        return {
            'url': url,
            'is_phishing': is_phishing,
            'consensus': consensus,
            'predictions': predictions,
            'url_features': url_features,
            'html_features': html_features,
            'html_error': html_error,
        }

    def print_results(self, result: dict):
        """Pretty-print results.

        Args:
            result: A dict shaped like the return value of :meth:`predict`.
        """
        print("\n" + "=" * 80)
        print(f"{Fore.CYAN}{Style.BRIGHT}COMBINED URL+HTML PHISHING DETECTION{Style.RESET_ALL}")
        print("=" * 80)
        print(f"\n{Fore.YELLOW}URL:{Style.RESET_ALL} {result['url']}")

        if result.get('html_error'):
            print(f"{Fore.RED}HTML download failed: {result['html_error']}{Style.RESET_ALL}")
            print(f"{Fore.YELLOW}Using URL features only (HTML features zeroed){Style.RESET_ALL}")

        # Model predictions
        print(f"\n{Fore.CYAN}{Style.BRIGHT}MODEL PREDICTIONS:{Style.RESET_ALL}")
        print("-" * 80)

        for pred in result['predictions']:
            is_safe = pred['prediction'] == 'LEGITIMATE'
            color = Fore.GREEN if is_safe else Fore.RED
            icon = "✓" if is_safe else "⚠"

            print(f"\n{Style.BRIGHT}{pred['model_name']}:{Style.RESET_ALL}")
            print(f" {icon} Prediction: {color}{Style.BRIGHT}{pred['prediction']}{Style.RESET_ALL}")
            print(f" Confidence: {pred['confidence']:.1f}%")
            print(f" Phishing: {Fore.RED}{pred['phishing_probability']:6.2f}%{Style.RESET_ALL}")
            print(f" Legitimate: {Fore.GREEN}{pred['legitimate_probability']:6.2f}%{Style.RESET_ALL}")

        # Consensus
        print(f"\n{Fore.CYAN}{Style.BRIGHT}CONSENSUS:{Style.RESET_ALL}")
        print("-" * 80)
        if result['is_phishing']:
            print(f"🚨 {Fore.RED}{Style.BRIGHT}{result['consensus']}{Style.RESET_ALL}")
        else:
            print(f"✅ {Fore.GREEN}{Style.BRIGHT}{result['consensus']}{Style.RESET_ALL}")

        # Key features ('html_features' is None when the HTML step failed)
        url_feat = result.get('url_features', {})
        html_feat = result.get('html_features', {})

        print(f"\n{Fore.CYAN}{Style.BRIGHT}KEY URL FEATURES:{Style.RESET_ALL}")
        print("-" * 80)
        url_keys = [
            ('Domain Length', url_feat.get('domain_length', 0)),
            ('Num Subdomains', url_feat.get('num_subdomains', 0)),
            ('Domain Dots', url_feat.get('domain_dots', 0)),
            ('Is Shortened', 'Yes' if url_feat.get('is_shortened') else 'No'),
            ('Is Free Platform', 'Yes' if url_feat.get('is_free_platform') else 'No'),
            ('Is HTTP', 'Yes' if url_feat.get('is_http') else 'No'),
            ('Has @ Symbol', 'Yes' if url_feat.get('has_at_symbol') else 'No'),
        ]
        for name, val in url_keys:
            print(f" {name:25s}: {val}")

        if html_feat:
            print(f"\n{Fore.CYAN}{Style.BRIGHT}KEY HTML FEATURES:{Style.RESET_ALL}")
            print("-" * 80)
            html_keys = [
                ('Text Length', html_feat.get('text_length', 0)),
                ('Num Links', html_feat.get('num_links', 0)),
                ('Num Forms', html_feat.get('num_forms', 0)),
                ('Password Fields', html_feat.get('num_password_fields', 0)),
                ('Has Login Form', 'Yes' if html_feat.get('has_login_form') else 'No'),
                ('Has Meta Refresh', 'Yes' if html_feat.get('has_meta_refresh') else 'No'),
                ('Has atob()', 'Yes' if html_feat.get('has_atob') else 'No'),
                ('External Links', html_feat.get('num_external_links', 0)),
            ]
            for name, val in html_keys:
                print(f" {name:25s}: {val}")

        print("\n" + "=" * 80 + "\n")


def _normalize_url(url: str) -> str:
    """Prefix *url* with ``https://`` unless it already has an http(s) scheme.

    The scheme test is case-insensitive, so ``HTTP://host`` is left alone
    rather than being turned into ``https://HTTP://host``.
    """
    if url.lower().startswith(('http://', 'https://')):
        return url
    return 'https://' + url


def main():
    """Run the detector once for a command-line URL, or start the prompt loop."""
    print(f"\n{Fore.CYAN}{Style.BRIGHT}")
    print("╔══════════════════════════════════════════════════════════════╗")
    print("║ COMBINED URL+HTML PHISHING DETECTOR ║")
    print("╚══════════════════════════════════════════════════════════════╝")
    print(f"{Style.RESET_ALL}")

    print(f"{Fore.YELLOW}Loading models...{Style.RESET_ALL}")
    detector = CombinedPhishingDetector()
    print(f"{Fore.GREEN}✓ Models loaded!{Style.RESET_ALL}\n")

    # Single URL from command line
    if len(sys.argv) > 1:
        url = _normalize_url(sys.argv[1])
        result = detector.predict(url)
        detector.print_results(result)
        return

    # Interactive loop
    while True:
        print(f"{Fore.CYAN}{'─' * 80}{Style.RESET_ALL}")
        try:
            url = input(f"{Fore.YELLOW}Enter URL (or 'quit'):{Style.RESET_ALL} ").strip()
        except (EOFError, KeyboardInterrupt):
            # Ctrl-D / Ctrl-C at the prompt ends the session like 'quit'.
            print(f"\n{Fore.GREEN}Goodbye!{Style.RESET_ALL}\n")
            break

        if url.lower() in ('quit', 'exit', 'q'):
            print(f"\n{Fore.GREEN}Goodbye!{Style.RESET_ALL}\n")
            break
        if not url:
            continue

        url = _normalize_url(url)

        try:
            result = detector.predict(url)
            detector.print_results(result)
        except Exception as e:
            # Keep the session alive; report the failure and prompt again.
            print(f"\n{Fore.RED}Error: {e}{Style.RESET_ALL}\n")


if __name__ == '__main__':
    main()