File size: 16,686 Bytes
2cc7f91
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
"""

URL Phishing Detector - Interactive Demo



Test any URL with all trained models and see predictions with confidence scores.

"""

import sys
import pandas as pd
import joblib
from pathlib import Path
from colorama import init, Fore, Style

# Initialize colorama for colored output
# (autoreset re-enables default styling after every print)
init(autoreset=True)
import logging

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%H:%M:%S'
)
logger = logging.getLogger("url_predictor")

# Make the project root importable so the feature extractor package resolves.
sys.path.append(str(Path(__file__).parent.parent))
from scripts.feature_extraction.url.url_features_v2 import URLFeatureExtractorV2


class URLPhishingDetector:
    """Detect phishing URLs using trained models.

    Loads a URL feature extractor, an optional scaler and up to three
    classifiers (Logistic Regression, Random Forest, XGBoost) from the
    ``saved_models`` directory, and exposes :meth:`predict_url` to score a
    URL with every loaded model plus :meth:`print_results` to pretty-print
    the outcome for the interactive demo.
    """

    def __init__(self):
        """Initialize detector: whitelist, thresholds, extractor, scaler, models."""
        self.script_dir = Path(__file__).parent.parent
        self.models_dir = (self.script_dir / 'saved_models').resolve()

        # Whitelist of trusted domains. Predictions are short-circuited to
        # LEGITIMATE for these domains and their true subdomains.
        self.trusted_domains = {
            # Tech giants
            'youtube.com', 'facebook.com', 'twitter.com', 'x.com',
            'linkedin.com', 'microsoft.com', 'apple.com', 'amazon.com',
            # Development
            'github.com', 'gitlab.com', 'stackoverflow.com', 'npmjs.com',
            # AI Services
            'claude.ai', 'anthropic.com', 'openai.com', 'chatgpt.com',
            # Education & Info
            'wikipedia.org', 'reddit.com', 'quora.com', 'medium.com',
            # Cloud & Services
            'aws.amazon.com', 'azure.microsoft.com', 'cloud.google.com',
            'vercel.com', 'netlify.com', 'heroku.com',
            # Communication
            'slack.com', 'discord.com', 'zoom.us', 'teams.microsoft.com',
            # Finance (major)
            'paypal.com', 'stripe.com', 'visa.com', 'mastercard.com',
            # E-commerce
            'ebay.com', 'shopify.com', 'etsy.com', 'walmart.com',
        }

        # Per-model decision threshold on the phishing probability.
        # All at the standard 0.5 for now; raise to reduce false positives.
        self.thresholds = {
            'Logistic Regression': 0.5,
            'Random Forest': 0.5,
            'XGBoost': 0.5,
        }

        # Feature extractor applied to every incoming URL.
        self.extractor = URLFeatureExtractorV2()

        # Scaler is only required by Logistic Regression; tree models
        # consume raw feature values.
        scaler_path = self.models_dir / 'scaler.joblib'
        if scaler_path.exists():
            self.scaler = joblib.load(scaler_path)
            logger.info("✓ Loaded scaler")
        else:
            self.scaler = None
            logger.warning("✗ Scaler not found (only needed for Logistic Regression)")

        # model name -> fitted estimator, and the feature ordering it expects
        self.models = {}
        self.feature_names = {}
        self._load_models()

    def _load_models(self):
        """Load all trained models from ``self.models_dir``.

        Missing model files are logged (with their path) and skipped.
        Expected feature names are recorded per model when the estimator
        exposes ``feature_names_in_``, falling back to the scaler's columns
        for models trained on bare numpy arrays.
        """
        model_files = {
            'Logistic Regression': 'logistic_regression.joblib',
            'Random Forest': 'random_forest.joblib',
            'XGBoost': 'xgboost.joblib'
        }

        for name, filename in model_files.items():
            model_path = self.models_dir / filename
            if not model_path.exists():
                # Fix: original message logged a placeholder instead of the path.
                logger.warning(f"✗ Model not found: {model_path}")
                continue

            model = joblib.load(model_path)
            self.models[name] = model

            if hasattr(model, 'feature_names_in_'):
                self.feature_names[name] = list(model.feature_names_in_)
                logger.info(f"✓ Loaded {name} ({len(self.feature_names[name])} features)")
            elif self.scaler is not None and hasattr(self.scaler, 'feature_names_in_'):
                # Use scaler's feature names for models without them
                # (e.g. Logistic Regression trained on a numpy array).
                self.feature_names[name] = list(self.scaler.feature_names_in_)
                logger.info(f"✓ Loaded {name} (using scaler features: {len(self.feature_names[name])} features)")
            else:
                logger.info(f"✓ Loaded {name}")

    @staticmethod
    def _normalize_domain(url: str) -> str:
        """Return the host of *url*, lower-cased, without a leading ``www.``.

        Fix: the original used ``replace('www.', '')``, which also mangles
        hosts that merely *contain* ``www.`` (e.g. ``evilwww.example.com``);
        only a leading prefix is stripped here.
        """
        from urllib.parse import urlparse
        host = urlparse(url).netloc.lower()
        return host[4:] if host.startswith('www.') else host

    def _is_whitelisted(self, domain: str) -> bool:
        """True if *domain* is a trusted domain or a true subdomain of one.

        Fix: the original bare ``endswith(trusted)`` check was bypassable —
        ``notgithub.com`` ends with ``github.com``. Matching requires either
        equality or a dot-boundary suffix (``sub.github.com``).
        """
        return any(
            domain == trusted or domain.endswith('.' + trusted)
            for trusted in self.trusted_domains
        )

    def _align_features(self, model_name: str, features_df):
        """Return a numpy array of features ordered as *model_name* expects.

        Missing columns are filled with 0. Falls back to any other model's
        feature ordering when this model recorded none, then to the raw
        DataFrame. Returning numpy sidesteps sklearn's feature-name
        validation at predict time.
        """
        expected = self.feature_names.get(model_name)
        if expected is None and self.feature_names:
            # Fallback: borrow the ordering from any model that has one.
            expected = next(iter(self.feature_names.values()))
        if expected is None:
            return features_df.values

        aligned = pd.DataFrame(columns=expected)
        for feat in expected:
            if feat in features_df.columns:
                aligned[feat] = features_df[feat].values
            else:
                aligned[feat] = 0  # fill missing features with 0
        return aligned.values

    def predict_url(self, url: str) -> 'tuple[dict, dict]':
        """Predict if URL is phishing or legitimate.

        Args:
            url: URL string to analyze.

        Returns:
            ``(results, features)`` — a dict of per-model prediction dicts
            and the raw extracted feature dict.
            (Fix: the original annotation claimed ``dict`` while actually
            returning this 2-tuple.)
        """
        domain = self._normalize_domain(url)
        is_whitelisted = self._is_whitelisted(domain)

        # Extract features and drop the label column if the extractor adds one.
        features_dict = self.extractor.extract_features(url)
        features_df = pd.DataFrame([features_dict])
        if 'label' in features_df.columns:
            features_df = features_df.drop('label', axis=1)

        results = {}
        for model_name, model in self.models.items():
            # Trusted domains bypass the models entirely.
            if is_whitelisted:
                results[model_name] = {
                    'prediction': 'LEGITIMATE',
                    'prediction_code': 0,
                    'confidence': 99.99,
                    'phishing_probability': 0.01,
                    'legitimate_probability': 99.99,
                    'whitelisted': True
                }
                continue

            features_to_predict = self._align_features(model_name, features_df)

            # Scale features only for Logistic Regression.
            if model_name == 'Logistic Regression' and self.scaler is not None:
                features_to_use = self.scaler.transform(features_to_predict)
            else:
                features_to_use = features_to_predict

            if hasattr(model, 'predict_proba'):
                probabilities = model.predict_proba(features_to_use)[0]
                phishing_prob = probabilities[1] * 100
                legitimate_prob = probabilities[0] * 100

                # Apply the per-model decision threshold.
                threshold = self.thresholds.get(model_name, 0.5)
                prediction = 1 if probabilities[1] > threshold else 0
                confidence = probabilities[prediction] * 100
            else:
                # Fallback for models without probability estimates.
                prediction = model.predict(features_to_use)[0]
                confidence = 100.0
                phishing_prob = 100.0 if prediction == 1 else 0.0
                legitimate_prob = 0.0 if prediction == 1 else 100.0

            results[model_name] = {
                'prediction': 'PHISHING' if prediction == 1 else 'LEGITIMATE',
                'prediction_code': int(prediction),
                'confidence': confidence,
                'phishing_probability': phishing_prob,
                'legitimate_probability': legitimate_prob,
                'whitelisted': False,
                'threshold': self.thresholds.get(model_name, 0.5)
            }

        return results, features_dict

    def print_results(self, url: str, results: dict, features: dict):
        """Print formatted per-model predictions, consensus, and top features."""
        print("\n" + "=" * 80)
        print(f"{Fore.CYAN}{Style.BRIGHT}URL PHISHING DETECTION RESULTS{Style.RESET_ALL}")
        print("=" * 80)

        print(f"\n{Fore.YELLOW}URL:{Style.RESET_ALL} {url}")

        # Per-model predictions
        print(f"\n{Fore.CYAN}{Style.BRIGHT}MODEL PREDICTIONS:{Style.RESET_ALL}")
        print("-" * 80)

        for model_name, result in results.items():
            prediction = result['prediction']
            confidence = result['confidence']
            phishing_prob = result['phishing_probability']
            legitimate_prob = result['legitimate_probability']
            threshold = result.get('threshold', 0.5)

            # Red for phishing, green for legitimate.
            if prediction == 'PHISHING':
                color = Fore.RED
                icon = "⚠️"
            else:
                color = Fore.GREEN
                icon = "✓"

            print(f"\n{Style.BRIGHT}{model_name}:{Style.RESET_ALL}")
            print(f"  {icon} Prediction: {color}{Style.BRIGHT}{prediction}{Style.RESET_ALL}")

            if result.get('whitelisted', False):
                print(f"  {Fore.CYAN}ℹ️  Trusted domain (whitelisted){Style.RESET_ALL}")
            else:
                print(f"  Decision Threshold: {threshold*100:.0f}%")

            print(f"  Confidence: {confidence:.2f}%")
            print("  Probabilities:")
            print(f"    • Phishing:   {Fore.RED}{phishing_prob:6.2f}%{Style.RESET_ALL}")
            print(f"    • Legitimate: {Fore.GREEN}{legitimate_prob:6.2f}%{Style.RESET_ALL}")

        # Consensus across models
        print(f"\n{Fore.CYAN}{Style.BRIGHT}CONSENSUS:{Style.RESET_ALL}")
        print("-" * 80)

        phishing_votes = sum(1 for r in results.values() if r['prediction'] == 'PHISHING')
        total_models = len(results)

        if phishing_votes == total_models:
            consensus_color = Fore.RED
            consensus_icon = "🚨"
            consensus_text = "ALL MODELS AGREE: PHISHING"
        elif phishing_votes == 0:
            consensus_color = Fore.GREEN
            consensus_icon = "✅"
            consensus_text = "ALL MODELS AGREE: LEGITIMATE"
        else:
            consensus_color = Fore.YELLOW
            consensus_icon = "⚠️"
            consensus_text = f"MIXED RESULTS: {phishing_votes}/{total_models} models say PHISHING"

        print(f"{consensus_icon} {consensus_color}{Style.BRIGHT}{consensus_text}{Style.RESET_ALL}")

        # Key features (selection based on model feature-importance analysis)
        print(f"\n{Fore.CYAN}{Style.BRIGHT}TOP FEATURES (Model Importance):{Style.RESET_ALL}")
        print("-" * 80)

        # (name, display value, risk flag) — risk flag of 1 renders red, 0 green,
        # None uncolored.
        top_features = [
            ('Num Domain Parts', features.get('num_domain_parts', 0), None),
            ('Domain Dots', features.get('domain_dots', 0), None),
            ('URL Shortener', '✓ Yes' if features.get('is_shortened', 0) == 1 else '✗ No',
             features.get('is_shortened', 0)),
            ('Num Subdomains', features.get('num_subdomains', 0), None),
            ('Domain Hyphens', features.get('domain_hyphens', 0), None),
            ('Free Platform', '✓ Yes' if features.get('is_free_platform', 0) == 1 else '✗ No',
             features.get('is_free_platform', 0)),
            ('Free Hosting', '✓ Yes' if features.get('is_free_hosting', 0) == 1 else '✗ No',
             features.get('is_free_hosting', 0)),
            ('Platform Subdomain Len', features.get('platform_subdomain_length', 0), None),
            ('Avg Domain Part Len', f"{features.get('avg_domain_part_len', 0):.2f}", None),
            ('Domain Length Category', features.get('domain_length_category', 0), None),
            ('Path Digits', features.get('path_digits', 0), None),
            ('Is HTTP', '✓ Yes' if features.get('is_http', 0) == 1 else '✗ No',
             features.get('is_http', 0)),
            ('Multiple Brands in URL', '✓ Yes' if features.get('multiple_brands_in_url', 0) == 1 else '✗ No',
             features.get('multiple_brands_in_url', 0)),
            ('Brand in Path', '✓ Yes' if features.get('brand_in_path', 0) == 1 else '✗ No',
             features.get('brand_in_path', 0)),
            ('Path Slashes', features.get('path_slashes', 0), None),
            ('Encoding Diff', f"{features.get('encoding_diff', 0):.3f}", None),
            ('Symbol Ratio (Domain)', f"{features.get('symbol_ratio_domain', 0):.3f}", None),
            ('Domain Length', features.get('domain_length', 0), None),
            ('Has @ Symbol', '✓ Yes' if features.get('has_at_symbol', 0) == 1 else '✗ No',
             features.get('has_at_symbol', 0)),
            ('TLD Length', features.get('tld_length', 0), None),
        ]

        for feature_name, value, risk_flag in top_features:
            # Color-code binary risk features; leave numeric values plain.
            if risk_flag is not None:
                if risk_flag == 1:  # risky feature is present
                    value_display = f"{Fore.RED}{value}{Style.RESET_ALL}"
                else:
                    value_display = f"{Fore.GREEN}{value}{Style.RESET_ALL}"
            else:
                value_display = str(value)

            print(f"  • {feature_name:25s}: {value_display}")

        print("\n" + "=" * 80 + "\n")


def main():
    """Run the interactive loop: prompt for URLs, predict, and print results.

    Exits cleanly on 'quit'/'exit'/'q', Ctrl+C, or EOF (Ctrl+D). Per-URL
    analysis errors are reported and logged without ending the session.
    """
    print(f"\n{Fore.CYAN}{Style.BRIGHT}╔══════════════════════════════════════════════════════════════╗")
    print(f"║          URL PHISHING DETECTOR - INTERACTIVE DEMO            ║")
    print(f"╚══════════════════════════════════════════════════════════════╝{Style.RESET_ALL}\n")

    # Initialize detector (loads extractor, scaler and models from disk).
    print(f"{Fore.YELLOW}Loading models...{Style.RESET_ALL}")
    detector = URLPhishingDetector()
    print(f"{Fore.GREEN}✓ All models loaded successfully!{Style.RESET_ALL}\n")

    while True:
        print(f"{Fore.CYAN}{'─' * 80}{Style.RESET_ALL}")
        try:
            url = input(f"{Fore.YELLOW}Enter URL to test (or 'quit' to exit):{Style.RESET_ALL} ").strip()
        except (EOFError, KeyboardInterrupt):
            # Fix: Ctrl+D / Ctrl+C previously crashed with a traceback.
            print(f"\n{Fore.GREEN}Thank you for using URL Phishing Detector!{Style.RESET_ALL}\n")
            break

        if url.lower() in ['quit', 'exit', 'q']:
            print(f"\n{Fore.GREEN}Thank you for using URL Phishing Detector!{Style.RESET_ALL}\n")
            break

        if not url:
            print(f"{Fore.RED}Please enter a valid URL{Style.RESET_ALL}\n")
            continue

        # Default to http:// when no scheme was given so urlparse sees a netloc.
        if not url.startswith(('http://', 'https://')):
            url = 'http://' + url

        try:
            results, features = detector.predict_url(url)
            detector.print_results(url, results, features)
        except Exception as e:
            # Top-level boundary: report, keep the session alive, and log
            # with traceback (fix: logger.error dropped the traceback).
            print(f"\n{Fore.RED}Error analyzing URL: {str(e)}{Style.RESET_ALL}\n")
            logger.exception(f"Error: {str(e)}")


if __name__ == "__main__":
    main()