Spaces:
Runtime error
Runtime error
| """ | |
| Phishing Detection API Server | |
| FastAPI server combining URL and HTML phishing detection | |
| """ | |
| import os | |
| os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' | |
| os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0' | |
| import sys | |
| from pathlib import Path | |
| from typing import Optional | |
| import warnings | |
| # Suppress warnings before importing other libraries | |
| warnings.filterwarnings('ignore', category=UserWarning) | |
| warnings.filterwarnings('ignore', message='.*XGBoost.*') | |
| warnings.filterwarnings('ignore', message='.*Unverified HTTPS.*') | |
| from fastapi import FastAPI, HTTPException | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.responses import HTMLResponse, JSONResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| import json | |
| import joblib | |
| import pandas as pd | |
| import numpy as np | |
| import requests | |
| from urllib.parse import urlparse | |
| import logging | |
| import urllib3 | |
| urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) | |
| # Add parent directory to path | |
| sys.path.append(str(Path(__file__).parent.parent)) | |
| # Use OPTIMIZED URL feature extractor with normalization | |
| from scripts.feature_extraction.url.url_features_v3 import URLFeatureExtractorOptimized | |
| from scripts.feature_extraction.html.html_feature_extractor import HTMLFeatureExtractor | |
| from scripts.feature_extraction.html.feature_engineering import engineer_features | |
| # Setup logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Helper function to convert numpy/pandas types to Python native types | |
def convert_to_json_serializable(obj):
    """Recursively convert numpy/pandas values into JSON-serializable Python types.

    Handles nested dicts, lists and tuples; numpy integer/float/bool scalars;
    numpy arrays (via ``tolist``); and pandas Series/DataFrames (via
    ``to_dict``). Anything else is returned unchanged.

    Args:
        obj: Arbitrary object, possibly containing numpy/pandas values.

    Returns:
        An equivalent structure built only from JSON-serializable
        Python-native types (dict, list, int, float, bool, ...).
    """
    if isinstance(obj, dict):
        return {key: convert_to_json_serializable(value) for key, value in obj.items()}
    # Tuples are normalised to lists, matching JSON's array type.
    if isinstance(obj, (list, tuple)):
        return [convert_to_json_serializable(item) for item in obj]
    # np.bool_ must be handled explicitly: it is not a subclass of np.integer.
    if isinstance(obj, np.bool_):
        return bool(obj)
    # np.integer / np.floating are the abstract bases covering all widths
    # (int32, int64, float32, float64, ...), so no per-width checks needed.
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.ndarray):
        # tolist() may still yield nested containers of numpy scalars on
        # object arrays, so recurse over the result.
        return convert_to_json_serializable(obj.tolist())
    if isinstance(obj, (pd.Series, pd.DataFrame)):
        # to_dict() values keep their numpy dtypes; recurse to convert them.
        return convert_to_json_serializable(obj.to_dict())
    return obj
# Initialize FastAPI app
app = FastAPI(
    title="Phishing Detection API",
    description="API for detecting phishing URLs and HTML content",
    version="1.0.0"
)

# CORS middleware — wide open for the bundled web UI.
# NOTE(review): browsers reject `allow_origins=["*"]` combined with
# `allow_credentials=True` per the CORS spec; confirm whether credentialed
# cross-origin requests are actually needed here.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Mount static files (web UI assets) under /static
static_dir = Path(__file__).parent / 'static'
app.mount("/static", StaticFiles(directory=str(static_dir)), name="static")
| # Request models | |
class URLRequest(BaseModel):
    """Request body for URL-based prediction endpoints."""
    url: str  # URL to analyse, e.g. "https://example.com/login"
class HTMLRequest(BaseModel):
    """Request body for HTML-content prediction endpoints."""
    html_content: str         # raw HTML document to analyse
    url: Optional[str] = None  # optional source URL, used as the result label
| # Response models | |
class PredictionResult(BaseModel):
    """Single model's verdict; probabilities/confidence are percentages (0-100)."""
    model_name: str            # display name of the model that produced this result
    prediction: str            # "PHISHING" or "LEGITIMATE"
    confidence: float          # confidence in the chosen label, percent
    phishing_probability: float    # P(phishing) * 100
    legitimate_probability: float  # P(legitimate) * 100
class URLPredictionResponse(BaseModel):
    """Aggregated response for URL-only prediction."""
    is_phishing: bool                    # majority-vote verdict
    url: str
    consensus: str                       # human-readable vote summary
    predictions: list[PredictionResult]  # one entry per model
    features: dict                       # raw extracted URL features
class HTMLPredictionResponse(BaseModel):
    """Aggregated response for HTML-only prediction."""
    source: str                          # source URL or "HTML Content"
    is_phishing: bool                    # majority-vote verdict
    consensus: str                       # human-readable vote summary
    predictions: list[PredictionResult]  # one entry per model
    features: dict                       # raw extracted HTML features
| class PhishingDetectorService: | |
| """Singleton service for phishing detection with pre-loaded models.""" | |
| _instance = None | |
| _initialized = False | |
| TRUSTED_DOMAINS = frozenset({ | |
| 'youtube.com', 'facebook.com', 'twitter.com', 'x.com', | |
| 'linkedin.com', 'microsoft.com', 'apple.com', 'amazon.com', | |
| 'github.com', 'gitlab.com', 'stackoverflow.com', | |
| 'claude.ai', 'anthropic.com', 'openai.com', 'chatgpt.com', | |
| 'wikipedia.org', 'reddit.com', 'instagram.com', 'whatsapp.com', | |
| }) | |
| DEFAULT_THRESHOLD = 0.5 | |
| HTML_DOWNLOAD_HEADERS = { | |
| 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' | |
| } | |
| def __new__(cls): | |
| if cls._instance is None: | |
| cls._instance = super().__new__(cls) | |
| return cls._instance | |
| def __init__(self): | |
| if self._initialized: | |
| return | |
| logger.info("Initializing Phishing Detector Service...") | |
| self.models_dir = Path(__file__).parent.parent / 'saved_models' | |
| # Initialize extractors | |
| self.url_extractor = URLFeatureExtractorOptimized() | |
| self.html_extractor = HTMLFeatureExtractor() | |
| # Load models | |
| self.url_models = {} | |
| self.url_feature_names = {} | |
| self.scaler = None | |
| self._load_url_models() | |
| self.html_models = {} | |
| self._load_html_models() | |
| self.combined_models = {} | |
| self._load_combined_models() | |
| # CNN models | |
| self.cnn_url_model = None | |
| self.cnn_url_vocab = None | |
| self.cnn_html_model = None | |
| self.cnn_html_vocab = None | |
| self._load_cnn_url_model() | |
| self._load_cnn_html_model() | |
| self._initialized = True | |
| logger.info("✓ Service initialized successfully") | |
| def _load_url_models(self): | |
| """Load URL prediction models""" | |
| # Load scaler | |
| scaler_path = self.models_dir / 'scaler.joblib' | |
| if scaler_path.exists(): | |
| self.scaler = joblib.load(scaler_path) | |
| logger.info("✓ Loaded scaler for URL models") | |
| # Load models | |
| url_model_files = { | |
| 'Logistic Regression': 'logistic_regression.joblib', | |
| 'Random Forest': 'random_forest.joblib', | |
| 'XGBoost': 'xgboost.joblib' | |
| } | |
| for name, filename in url_model_files.items(): | |
| model_path = self.models_dir / filename | |
| if model_path.exists(): | |
| model = joblib.load(model_path) | |
| self.url_models[name] = model | |
| # Store expected feature names from model | |
| if hasattr(model, 'feature_names_in_'): | |
| self.url_feature_names[name] = list(model.feature_names_in_) | |
| logger.info(f"✓ Loaded URL model: {name} ({len(self.url_feature_names[name])} features)") | |
| elif self.scaler and hasattr(self.scaler, 'feature_names_in_'): | |
| # Use scaler's feature names for models without them (like Logistic Regression) | |
| self.url_feature_names[name] = list(self.scaler.feature_names_in_) | |
| logger.info(f"✓ Loaded URL model: {name} (using scaler features: {len(self.url_feature_names[name])} features)") | |
| else: | |
| logger.info(f"✓ Loaded URL model: {name}") | |
| def _load_html_models(self): | |
| """Load HTML prediction models.""" | |
| html_model_files = { | |
| 'Random Forest': ('random_forest_html.joblib', 'random_forest_html_feature_names.joblib'), | |
| 'XGBoost': ('xgboost_html.joblib', 'xgboost_html_feature_names.joblib'), | |
| } | |
| for name, (model_file, features_file) in html_model_files.items(): | |
| model_path = self.models_dir / model_file | |
| features_path = self.models_dir / features_file | |
| if model_path.exists(): | |
| self.html_models[name] = { | |
| 'model': joblib.load(model_path), | |
| 'features': joblib.load(features_path) if features_path.exists() else None, | |
| } | |
| logger.info(f"✓ Loaded HTML model: {name}") | |
| def _load_combined_models(self): | |
| """Load combined URL+HTML prediction models.""" | |
| combined_model_files = { | |
| 'Random Forest Combined': ('random_forest_combined.joblib', 'random_forest_combined_feature_names.joblib'), | |
| 'XGBoost Combined': ('xgboost_combined.joblib', 'xgboost_combined_feature_names.joblib'), | |
| } | |
| for name, (model_file, features_file) in combined_model_files.items(): | |
| model_path = self.models_dir / model_file | |
| features_path = self.models_dir / features_file | |
| if model_path.exists(): | |
| self.combined_models[name] = { | |
| 'model': joblib.load(model_path), | |
| 'features': joblib.load(features_path) if features_path.exists() else None, | |
| } | |
| n = len(self.combined_models[name]['features']) if self.combined_models[name]['features'] else '?' | |
| logger.info(f"✓ Loaded combined model: {name} ({n} features)") | |
| def _load_cnn_url_model(self): | |
| """Load character-level CNN URL model and vocabulary.""" | |
| model_path = self.models_dir / 'cnn_url_model.keras' | |
| vocab_path = self.models_dir / 'cnn_url_vocab.json' | |
| if not model_path.exists(): | |
| logger.warning(f"✗ CNN URL model not found: {model_path}") | |
| return | |
| if not vocab_path.exists(): | |
| logger.warning(f"✗ CNN URL vocabulary not found: {vocab_path}") | |
| return | |
| try: | |
| import tensorflow as tf | |
| self.cnn_url_model = tf.keras.models.load_model(str(model_path)) | |
| with open(vocab_path, 'r') as f: | |
| self.cnn_url_vocab = json.load(f) | |
| logger.info(f"✓ Loaded CNN URL model (vocab_size={self.cnn_url_vocab['vocab_size']}, max_len={self.cnn_url_vocab['max_len']})") | |
| except Exception as e: | |
| logger.warning(f"✗ Failed to load CNN URL model: {e}") | |
| self.cnn_url_model = None | |
| self.cnn_url_vocab = None | |
| def _load_cnn_html_model(self): | |
| """Load character-level CNN HTML model and vocabulary.""" | |
| model_path = self.models_dir / 'cnn_html_model.keras' | |
| vocab_path = self.models_dir / 'cnn_html_vocab.json' | |
| if not model_path.exists(): | |
| logger.warning(f"✗ CNN HTML model not found: {model_path}") | |
| return | |
| if not vocab_path.exists(): | |
| logger.warning(f"✗ CNN HTML vocabulary not found: {vocab_path}") | |
| return | |
| try: | |
| import tensorflow as tf | |
| self.cnn_html_model = tf.keras.models.load_model(str(model_path)) | |
| with open(vocab_path, 'r') as f: | |
| self.cnn_html_vocab = json.load(f) | |
| logger.info(f"✓ Loaded CNN HTML model (vocab_size={self.cnn_html_vocab['vocab_size']}, max_len={self.cnn_html_vocab['max_len']})") | |
| except Exception as e: | |
| logger.warning(f"✗ Failed to load CNN HTML model: {e}") | |
| self.cnn_html_model = None | |
| self.cnn_html_vocab = None | |
| def _encode_for_cnn(self, text: str, vocab: dict) -> np.ndarray: | |
| """Encode text to a padded integer sequence for a CNN model.""" | |
| char_to_idx = vocab['char_to_idx'] | |
| max_len = vocab['max_len'] | |
| PAD_IDX = 0 | |
| UNK_IDX = 1 | |
| encoded = [char_to_idx.get(c, UNK_IDX) for c in text[:max_len]] | |
| encoded += [PAD_IDX] * (max_len - len(encoded)) | |
| return np.array([encoded], dtype=np.int32) | |
| # ── Shared helpers ───────────────────────────────────────────── | |
| def _calculate_consensus(predictions: list[dict]) -> tuple[bool, str]: | |
| """Return (is_phishing, consensus_text) from a list of prediction dicts.""" | |
| total = len(predictions) | |
| phishing_votes = sum(1 for p in predictions if p['prediction'] == 'PHISHING') | |
| is_phishing = phishing_votes > total / 2 | |
| if phishing_votes == total: | |
| consensus = "ALL MODELS AGREE: PHISHING" | |
| elif phishing_votes == 0: | |
| consensus = "ALL MODELS AGREE: LEGITIMATE" | |
| else: | |
| consensus = f"MIXED: {phishing_votes}/{total} models say PHISHING" | |
| return is_phishing, consensus | |
| def _align_features(self, features_df: pd.DataFrame, model_name: str) -> np.ndarray: | |
| """Align extracted features to a model's expected feature order.""" | |
| expected = self.url_feature_names.get(model_name) | |
| if expected is None and self.url_feature_names: | |
| expected = next(iter(self.url_feature_names.values())) | |
| if expected is not None: | |
| aligned = pd.DataFrame(columns=expected) | |
| for feat in expected: | |
| aligned[feat] = features_df[feat].values if feat in features_df.columns else 0 | |
| return aligned.values | |
| return features_df.values | |
| def _build_prediction(model_name: str, model, features: np.ndarray, threshold: float = 0.5) -> dict: | |
| """Run a single model and return a standardised prediction dict.""" | |
| if hasattr(model, 'predict_proba'): | |
| probabilities = model.predict_proba(features)[0] | |
| pred = 1 if probabilities[1] > threshold else 0 | |
| confidence = probabilities[pred] * 100 | |
| phishing_prob = probabilities[1] * 100 | |
| legitimate_prob = probabilities[0] * 100 | |
| else: | |
| pred = model.predict(features)[0] | |
| confidence = 100.0 | |
| phishing_prob = 100.0 if pred == 1 else 0.0 | |
| legitimate_prob = 0.0 if pred == 1 else 100.0 | |
| return { | |
| 'model_name': model_name, | |
| 'prediction': 'PHISHING' if pred == 1 else 'LEGITIMATE', | |
| 'confidence': confidence, | |
| 'phishing_probability': phishing_prob, | |
| 'legitimate_probability': legitimate_prob, | |
| } | |
| def _whitelisted_prediction(model_name: str) -> dict: | |
| """Return a pre-built LEGITIMATE prediction for whitelisted domains.""" | |
| return { | |
| 'model_name': model_name, | |
| 'prediction': 'LEGITIMATE', | |
| 'confidence': 99.99, | |
| 'phishing_probability': 0.01, | |
| 'legitimate_probability': 99.99, | |
| } | |
| # ── URL prediction ──────────────────────────────────────────── | |
| def predict_url(self, url: str) -> dict: | |
| """Predict if a URL is phishing using all URL models.""" | |
| parsed = urlparse(url) | |
| domain = parsed.netloc.lower().replace('www.', '') | |
| is_whitelisted = any(domain.endswith(d) for d in self.TRUSTED_DOMAINS) | |
| # Extract features | |
| features_dict = self.url_extractor.extract_features(url) | |
| features_df = pd.DataFrame([features_dict]).drop(columns=['label'], errors='ignore') | |
| # Get predictions from each URL model | |
| predictions = [] | |
| for model_name, model in self.url_models.items(): | |
| if is_whitelisted: | |
| predictions.append(self._whitelisted_prediction(model_name)) | |
| continue | |
| aligned = self._align_features(features_df, model_name) | |
| if model_name == 'Logistic Regression' and self.scaler: | |
| aligned = self.scaler.transform(aligned) | |
| predictions.append( | |
| self._build_prediction(model_name, model, aligned, self.DEFAULT_THRESHOLD) | |
| ) | |
| is_phishing, consensus = self._calculate_consensus(predictions) | |
| return { | |
| 'url': url, | |
| 'is_phishing': is_phishing, | |
| 'consensus': consensus, | |
| 'predictions': predictions, | |
| 'features': features_dict, | |
| } | |
| # ── HTML prediction ─────────────────────────────────────────── | |
| def predict_html(self, html_content: str, source: str = "") -> dict: | |
| """Predict if HTML content is phishing using all HTML models.""" | |
| features = self.html_extractor.extract_features(html_content) | |
| engineered_df = engineer_features(pd.DataFrame([features])) | |
| predictions = [] | |
| for model_name, model_data in self.html_models.items(): | |
| model = model_data['model'] | |
| feature_names = model_data['features'] | |
| if feature_names: | |
| feature_list = list(feature_names) | |
| feature_values = [ | |
| engineered_df[f].iloc[0] if f in engineered_df.columns else features.get(f, 0) | |
| for f in feature_list | |
| ] | |
| X = np.array([feature_values]) | |
| else: | |
| X = engineered_df.values | |
| predictions.append(self._build_prediction(model_name, model, X)) | |
| is_phishing, consensus = self._calculate_consensus(predictions) | |
| return { | |
| 'source': source or 'HTML Content', | |
| 'is_phishing': is_phishing, | |
| 'consensus': consensus, | |
| 'predictions': predictions, | |
| 'features': features, | |
| } | |
| # ── Full scan (URL + HTML) ───────────────────────────────────── | |
| def predict_from_url(self, url: str) -> dict: | |
| """Download HTML from URL and analyse both URL and HTML.""" | |
| url_result = self.predict_url(url) | |
| try: | |
| resp = requests.get(url, timeout=10, verify=False, headers=self.HTML_DOWNLOAD_HEADERS) | |
| html_result = self.predict_html(resp.text, source=url) | |
| all_predictions = url_result['predictions'] + html_result['predictions'] | |
| is_phishing, consensus = self._calculate_consensus(all_predictions) | |
| return { | |
| 'url': url, | |
| 'is_phishing': is_phishing, | |
| 'url_analysis': url_result, | |
| 'html_analysis': html_result, | |
| 'combined_consensus': consensus, | |
| } | |
| except Exception as e: | |
| logger.warning(f"Could not download HTML: {e}") | |
| return { | |
| 'url': url, | |
| 'is_phishing': url_result['is_phishing'], | |
| 'url_analysis': url_result, | |
| 'html_analysis': None, | |
| 'error': str(e), | |
| } | |
| # ── CNN prediction ───────────────────────────────────────────── | |
| def predict_cnn(self, url: str, html_content: str | None = None) -> dict: | |
| """Predict using both character-level CNN models (URL + HTML).""" | |
| parsed = urlparse(url) | |
| domain = parsed.netloc.lower().replace('www.', '') | |
| is_whitelisted = any(domain.endswith(d) for d in self.TRUSTED_DOMAINS) | |
| predictions = [] | |
| # CNN URL model | |
| if self.cnn_url_model is not None and self.cnn_url_vocab is not None: | |
| if is_whitelisted: | |
| predictions.append(self._whitelisted_prediction('CNN URL (Char-level)')) | |
| else: | |
| X = self._encode_for_cnn(url, self.cnn_url_vocab) | |
| phishing_prob = float(self.cnn_url_model.predict(X, verbose=0)[0][0]) | |
| legitimate_prob = 1.0 - phishing_prob | |
| is_phishing_pred = phishing_prob >= self.DEFAULT_THRESHOLD | |
| confidence = (phishing_prob if is_phishing_pred else legitimate_prob) * 100 | |
| predictions.append({ | |
| 'model_name': 'CNN URL (Char-level)', | |
| 'prediction': 'PHISHING' if is_phishing_pred else 'LEGITIMATE', | |
| 'confidence': confidence, | |
| 'phishing_probability': phishing_prob * 100, | |
| 'legitimate_probability': legitimate_prob * 100, | |
| }) | |
| # CNN HTML model | |
| if self.cnn_html_model is not None and self.cnn_html_vocab is not None and html_content: | |
| if is_whitelisted: | |
| predictions.append(self._whitelisted_prediction('CNN HTML (Char-level)')) | |
| else: | |
| X = self._encode_for_cnn(html_content, self.cnn_html_vocab) | |
| phishing_prob = float(self.cnn_html_model.predict(X, verbose=0)[0][0]) | |
| legitimate_prob = 1.0 - phishing_prob | |
| is_phishing_pred = phishing_prob >= self.DEFAULT_THRESHOLD | |
| confidence = (phishing_prob if is_phishing_pred else legitimate_prob) * 100 | |
| predictions.append({ | |
| 'model_name': 'CNN HTML (Char-level)', | |
| 'prediction': 'PHISHING' if is_phishing_pred else 'LEGITIMATE', | |
| 'confidence': confidence, | |
| 'phishing_probability': phishing_prob * 100, | |
| 'legitimate_probability': legitimate_prob * 100, | |
| }) | |
| if not predictions: | |
| raise RuntimeError("No CNN models are loaded") | |
| is_phishing, consensus = self._calculate_consensus(predictions) | |
| return { | |
| 'url': url, | |
| 'is_phishing': is_phishing, | |
| 'consensus': consensus, | |
| 'predictions': predictions, | |
| 'features': {}, | |
| } | |
| # ── Combined prediction ──────────────────────────────────────── | |
| def predict_combined(self, url: str) -> dict: | |
| """Predict using combined URL+HTML models (single ensemble).""" | |
| if not self.combined_models: | |
| raise RuntimeError("No combined models loaded") | |
| parsed = urlparse(url) | |
| domain = parsed.netloc.lower().replace('www.', '') | |
| is_whitelisted = any(domain.endswith(d) for d in self.TRUSTED_DOMAINS) | |
| # Extract URL features | |
| url_features = self.url_extractor.extract_features(url) | |
| url_df = pd.DataFrame([url_features]).drop(columns=['label'], errors='ignore') | |
| url_df = url_df.rename(columns={c: f'url_{c}' for c in url_df.columns}) | |
| # Download + extract HTML features | |
| html_features = {} | |
| html_error = None | |
| eng_df = pd.DataFrame() | |
| try: | |
| resp = requests.get(url, timeout=10, verify=False, headers=self.HTML_DOWNLOAD_HEADERS) | |
| html_features = self.html_extractor.extract_features(resp.text) | |
| raw_df = pd.DataFrame([html_features]) | |
| eng_df = engineer_features(raw_df) | |
| eng_df = eng_df.rename(columns={c: f'html_{c}' for c in eng_df.columns}) | |
| except Exception as e: | |
| html_error = str(e) | |
| logger.warning(f"Combined: could not download HTML: {e}") | |
| # Combine features | |
| combined_df = pd.concat([url_df, eng_df], axis=1) | |
| # Predict | |
| predictions = [] | |
| for model_name, model_data in self.combined_models.items(): | |
| if is_whitelisted: | |
| predictions.append(self._whitelisted_prediction(model_name)) | |
| continue | |
| model = model_data['model'] | |
| expected = model_data['features'] | |
| if expected: | |
| feature_list = list(expected) | |
| aligned = pd.DataFrame(columns=feature_list) | |
| for f in feature_list: | |
| aligned[f] = combined_df[f].values if f in combined_df.columns else 0 | |
| X = aligned.values | |
| else: | |
| X = combined_df.values | |
| predictions.append( | |
| self._build_prediction(model_name, model, X, self.DEFAULT_THRESHOLD) | |
| ) | |
| is_phishing, consensus = self._calculate_consensus(predictions) | |
| return { | |
| 'url': url, | |
| 'is_phishing': is_phishing, | |
| 'consensus': consensus, | |
| 'predictions': predictions, | |
| 'url_features': url_features, | |
| 'html_features': html_features, | |
| 'html_error': html_error, | |
| } | |
| # ── Unified all-models prediction ────────────────────────────── | |
| def predict_all(self, url: str) -> dict: | |
| """Run ALL models on a URL and return categorised results.""" | |
| parsed = urlparse(url) | |
| domain = parsed.netloc.lower().replace('www.', '') | |
| is_whitelisted = any(domain.endswith(d) for d in self.TRUSTED_DOMAINS) | |
| # ── 1. URL feature-based models ─────────────────────────── | |
| url_result = self.predict_url(url) | |
| # ── 2. Download HTML (shared across HTML/combined/CNN-HTML) ─ | |
| html_content = None | |
| html_error = None | |
| try: | |
| resp = requests.get(url, timeout=10, verify=False, headers=self.HTML_DOWNLOAD_HEADERS) | |
| html_content = resp.text | |
| except Exception as e: | |
| html_error = str(e) | |
| logger.warning(f"predict_all: could not download HTML: {e}") | |
| # ── 3. HTML feature-based models ───────────────────────── | |
| html_result = None | |
| if html_content and self.html_models: | |
| html_result = self.predict_html(html_content, source=url) | |
| # ── 4. Combined URL+HTML feature-based models ──────────── | |
| combined_result = None | |
| if self.combined_models: | |
| try: | |
| combined_result = self._predict_combined_with_html(url, html_content, is_whitelisted) | |
| except Exception as e: | |
| logger.warning(f"predict_all: combined prediction failed: {e}") | |
| # ── 5. CNN models (URL + HTML) ─────────────────────────── | |
| cnn_result = None | |
| if self.cnn_url_model is not None or self.cnn_html_model is not None: | |
| try: | |
| cnn_result = self.predict_cnn(url, html_content) | |
| except Exception as e: | |
| logger.warning(f"predict_all: CNN prediction failed: {e}") | |
| # ── Aggregate consensus ────────────────────────────────── | |
| all_predictions = [] | |
| if url_result: | |
| all_predictions.extend(url_result.get('predictions', [])) | |
| if html_result: | |
| all_predictions.extend(html_result.get('predictions', [])) | |
| if combined_result: | |
| all_predictions.extend(combined_result.get('predictions', [])) | |
| if cnn_result: | |
| all_predictions.extend(cnn_result.get('predictions', [])) | |
| is_phishing, consensus = self._calculate_consensus(all_predictions) if all_predictions else (False, "No models available") | |
| return { | |
| 'url': url, | |
| 'is_phishing': is_phishing, | |
| 'overall_consensus': consensus, | |
| 'url_models': url_result, | |
| 'html_models': html_result, | |
| 'combined_models': combined_result, | |
| 'cnn_models': cnn_result, | |
| 'html_error': html_error, | |
| } | |
| def _predict_combined_with_html(self, url: str, html_content: str | None, is_whitelisted: bool) -> dict: | |
| """Predict using combined models, optionally with pre-fetched HTML.""" | |
| # Extract URL features | |
| url_features = self.url_extractor.extract_features(url) | |
| url_df = pd.DataFrame([url_features]).drop(columns=['label'], errors='ignore') | |
| url_df = url_df.rename(columns={c: f'url_{c}' for c in url_df.columns}) | |
| # HTML features | |
| html_features = {} | |
| html_error = None | |
| eng_df = pd.DataFrame() | |
| if html_content: | |
| try: | |
| html_features = self.html_extractor.extract_features(html_content) | |
| raw_df = pd.DataFrame([html_features]) | |
| eng_df = engineer_features(raw_df) | |
| eng_df = eng_df.rename(columns={c: f'html_{c}' for c in eng_df.columns}) | |
| except Exception as e: | |
| html_error = str(e) | |
| # Combine | |
| combined_df = pd.concat([url_df, eng_df], axis=1) | |
| # Predict | |
| predictions = [] | |
| for model_name, model_data in self.combined_models.items(): | |
| if is_whitelisted: | |
| predictions.append(self._whitelisted_prediction(model_name)) | |
| continue | |
| model = model_data['model'] | |
| expected = model_data['features'] | |
| if expected: | |
| feature_list = list(expected) | |
| aligned = pd.DataFrame(columns=feature_list) | |
| for f in feature_list: | |
| aligned[f] = combined_df[f].values if f in combined_df.columns else 0 | |
| X = aligned.values | |
| else: | |
| X = combined_df.values | |
| predictions.append( | |
| self._build_prediction(model_name, model, X, self.DEFAULT_THRESHOLD) | |
| ) | |
| is_phishing, consensus_text = self._calculate_consensus(predictions) | |
| return { | |
| 'url': url, | |
| 'is_phishing': is_phishing, | |
| 'consensus': consensus_text, | |
| 'predictions': predictions, | |
| 'url_features': url_features, | |
| 'html_features': html_features, | |
| 'html_error': html_error, | |
| } | |
# Initialize service (singleton) — models are loaded at import time so the
# first request doesn't pay the loading cost.
detector = PhishingDetectorService()
| # ── Helpers ─────────────────────────────────────────────────────── | |
def _serve_static_html(filename: str, cache: bool = False) -> HTMLResponse:
    """Serve a file from the static/ directory as an HTML response.

    Args:
        filename: Name of the file inside ``static/``.
        cache: When true, attach a one-day public Cache-Control header.

    Returns:
        HTMLResponse with the file's contents, or a 404 page if missing.
    """
    file_path = Path(__file__).parent / 'static' / filename
    if file_path.exists():
        cache_headers = {"Cache-Control": "public, max-age=86400"} if cache else None
        body = file_path.read_text(encoding='utf-8')
        return HTMLResponse(content=body, headers=cache_headers)
    return HTMLResponse(content="<h1>Page not found</h1>", status_code=404)
| # ── API Endpoints ───────────────────────────────────────────────── | |
async def root():
    """Serve the main web interface (static/index.html)."""
    # NOTE(review): no @app.get("/") decorator is visible on this handler in
    # this file — confirm the route is registered elsewhere.
    return _serve_static_html('index.html')
async def models_page():
    """Serve the model details page (static/models.html, cached for a day)."""
    # NOTE(review): no @app.get route decorator is visible on this handler in
    # this file — confirm the route is registered elsewhere.
    return _serve_static_html('models.html', cache=True)
async def _safe_predict(label: str, fn, *args) -> JSONResponse:
    """Run a prediction function with uniform error handling.

    Args:
        label: Name used in the error log entry.
        fn: Synchronous prediction callable on the detector service.
        *args: Positional arguments forwarded to ``fn``.

    Returns:
        JSONResponse wrapping the JSON-serializable prediction payload.

    Raises:
        HTTPException: 500 with the underlying error message on any failure.
    """
    try:
        return JSONResponse(content=convert_to_json_serializable(fn(*args)))
    except Exception as e:
        # logger.exception records the full traceback, not just the message.
        logger.exception(f"Error in {label}: {e}")
        # Chain the original exception so debugging keeps the real cause.
        raise HTTPException(status_code=500, detail=str(e)) from e
async def predict_url(request: URLRequest):
    """Predict if URL is phishing (URL feature models only)."""
    # NOTE(review): no @app.post route decorator is visible on this handler
    # in this file — confirm the route is registered elsewhere.
    return await _safe_predict("predict_url", detector.predict_url, request.url)
async def predict_html(request: HTMLRequest):
    """Predict if HTML content is phishing (HTML feature models only)."""
    # NOTE(review): no @app.post route decorator is visible on this handler
    # in this file — confirm the route is registered elsewhere.
    return await _safe_predict("predict_html", detector.predict_html, request.html_content, request.url or "")
async def predict_full(request: URLRequest):
    """Analyse URL and download HTML for complete analysis."""
    # NOTE(review): no @app.post route decorator is visible on this handler
    # in this file — confirm the route is registered elsewhere.
    return await _safe_predict("predict_full", detector.predict_from_url, request.url)
async def predict_combined(request: URLRequest):
    """Predict using combined URL+HTML model."""
    # NOTE(review): no @app.post route decorator is visible on this handler
    # in this file — confirm the route is registered elsewhere.
    return await _safe_predict("predict_combined", detector.predict_combined, request.url)
async def predict_cnn(request: URLRequest):
    """Predict using character-level CNN models (URL model only — no HTML)."""
    # NOTE(review): no @app.post route decorator is visible on this handler
    # in this file — confirm the route is registered elsewhere.
    return await _safe_predict("predict_cnn", detector.predict_cnn, request.url, None)
async def predict_all(request: URLRequest):
    """Run ALL models on a URL — unified endpoint."""
    # NOTE(review): no @app.post route decorator is visible on this handler
    # in this file — confirm the route is registered elsewhere.
    return await _safe_predict("predict_all", detector.predict_all, request.url)
async def health():
    """Health check endpoint reporting which model families are loaded."""
    # NOTE(review): no @app.get("/health") decorator is visible on this
    # handler in this file — confirm the route is registered elsewhere.
    return {
        "status": "healthy",
        "url_models": len(detector.url_models),
        "html_models": len(detector.html_models),
        "combined_models": len(detector.combined_models),
        "cnn_url_model": detector.cnn_url_model is not None,
        "cnn_html_model": detector.cnn_html_model is not None,
    }
if __name__ == "__main__":
    import uvicorn
    # Port 7860 is the default port expected by Hugging Face Spaces.
    uvicorn.run(app, host="0.0.0.0", port=7860)