# Hugging Face upload metadata (rb1337, "Upload 50 files", commit 2cc7f91 verified)
# — web-UI residue converted to a comment so the module parses.
"""
Phishing Detection API Server
FastAPI server combining URL and HTML phishing detection
"""
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'
import sys
from pathlib import Path
from typing import Optional
import warnings
# Suppress warnings before importing other libraries
warnings.filterwarnings('ignore', category=UserWarning)
warnings.filterwarnings('ignore', message='.*XGBoost.*')
warnings.filterwarnings('ignore', message='.*Unverified HTTPS.*')
from fastapi import FastAPI, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import json
import joblib
import pandas as pd
import numpy as np
import requests
from urllib.parse import urlparse
import logging
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Add parent directory to path
sys.path.append(str(Path(__file__).parent.parent))
# Use OPTIMIZED URL feature extractor with normalization
from scripts.feature_extraction.url.url_features_v3 import URLFeatureExtractorOptimized
from scripts.feature_extraction.html.html_feature_extractor import HTMLFeatureExtractor
from scripts.feature_extraction.html.feature_engineering import engineer_features
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Helper function to convert numpy/pandas types to Python native types
def convert_to_json_serializable(obj):
    """Recursively convert numpy/pandas values to JSON-serializable Python types.

    Handles dicts, lists and tuples (tuples become lists, matching how
    json.dumps would emit them), numpy scalars and arrays, and pandas
    Series/DataFrames (via their dict form). Anything else is returned
    unchanged.
    """
    if isinstance(obj, dict):
        return {key: convert_to_json_serializable(value) for key, value in obj.items()}
    if isinstance(obj, (list, tuple)):
        # Tuples were previously passed through unconverted, which broke
        # json serialization when they contained numpy scalars.
        return [convert_to_json_serializable(item) for item in obj]
    if isinstance(obj, np.ndarray):
        return convert_to_json_serializable(obj.tolist())
    if isinstance(obj, (pd.Series, pd.DataFrame)):
        return convert_to_json_serializable(obj.to_dict())
    if isinstance(obj, np.bool_):
        return bool(obj)
    # np.integer / np.floating cover all sized variants (int32/int64/float32/...).
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    return obj
# Initialize FastAPI app
app = FastAPI(
    title="Phishing Detection API",
    description="API for detecting phishing URLs and HTML content",
    version="1.0.0"
)
# CORS middleware
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# rejected by browsers per the CORS spec (a wildcard origin cannot be used
# with credentials) — confirm whether credentialed requests are actually needed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Mount static files: serves ./static/* under the /static URL path.
static_dir = Path(__file__).parent / 'static'
app.mount("/static", StaticFiles(directory=str(static_dir)), name="static")
# Request models
class URLRequest(BaseModel):
    """Request body for URL-based prediction endpoints."""
    url: str  # the URL to analyse
class HTMLRequest(BaseModel):
    """Request body for HTML-content prediction."""
    html_content: str  # raw HTML document to analyse
    url: Optional[str] = None  # source URL; used only as the 'source' label in the result
# Response models
class PredictionResult(BaseModel):
    """A single model's verdict on one input."""
    model_name: str
    prediction: str  # 'PHISHING' or 'LEGITIMATE'
    confidence: float  # percentage (0-100) of the winning class
    phishing_probability: float  # percentage (0-100)
    legitimate_probability: float  # percentage (0-100)
class URLPredictionResponse(BaseModel):
    """Aggregated response for /api/predict/url."""
    url: str
    is_phishing: bool  # majority vote across models
    consensus: str  # human-readable consensus summary
    predictions: list[PredictionResult]
    features: dict  # raw extracted URL features
class HTMLPredictionResponse(BaseModel):
    """Aggregated response for /api/predict/html."""
    source: str  # URL or 'HTML Content' when no URL was supplied
    is_phishing: bool  # majority vote across models
    consensus: str  # human-readable consensus summary
    predictions: list[PredictionResult]
    features: dict  # raw extracted HTML features
class PhishingDetectorService:
    """Singleton service for phishing detection with pre-loaded models.

    All model artifacts under saved_models/ are loaded once, the first time
    the class is instantiated; every later PhishingDetectorService() call
    returns the same initialized instance. Exposes predict_* methods that
    run one or more model families (classic URL models, HTML models,
    combined URL+HTML models, character-level CNNs) on a URL and/or HTML
    document.
    """
    _instance = None      # the singleton instance
    _initialized = False  # guards against re-running __init__
    # Domains (and their subdomains) that are always reported LEGITIMATE.
    TRUSTED_DOMAINS = frozenset({
        'youtube.com', 'facebook.com', 'twitter.com', 'x.com',
        'linkedin.com', 'microsoft.com', 'apple.com', 'amazon.com',
        'github.com', 'gitlab.com', 'stackoverflow.com',
        'claude.ai', 'anthropic.com', 'openai.com', 'chatgpt.com',
        'wikipedia.org', 'reddit.com', 'instagram.com', 'whatsapp.com',
    })
    # Phishing-probability cut-off above which a model votes PHISHING.
    DEFAULT_THRESHOLD = 0.5
    HTML_DOWNLOAD_HEADERS = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }

    def __new__(cls):
        # Classic singleton: only ever create one instance.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        # __init__ runs on every PhishingDetectorService() call even for the
        # singleton, so bail out after the first full initialization.
        if self._initialized:
            return
        logger.info("Initializing Phishing Detector Service...")
        self.models_dir = Path(__file__).parent.parent / 'saved_models'
        # Initialize feature extractors
        self.url_extractor = URLFeatureExtractorOptimized()
        self.html_extractor = HTMLFeatureExtractor()
        # Load classic URL models (+ scaler)
        self.url_models = {}
        self.url_feature_names = {}
        self.scaler = None
        self._load_url_models()
        # Load HTML-only models
        self.html_models = {}
        self._load_html_models()
        # Load combined URL+HTML models
        self.combined_models = {}
        self._load_combined_models()
        # Character-level CNN models (optional; require tensorflow)
        self.cnn_url_model = None
        self.cnn_url_vocab = None
        self.cnn_html_model = None
        self.cnn_html_vocab = None
        self._load_cnn_url_model()
        self._load_cnn_html_model()
        self._initialized = True
        logger.info("✓ Service initialized successfully")

    def _load_url_models(self):
        """Load URL prediction models and their fitted scaler."""
        # Load scaler
        scaler_path = self.models_dir / 'scaler.joblib'
        if scaler_path.exists():
            self.scaler = joblib.load(scaler_path)
            logger.info("✓ Loaded scaler for URL models")
        # Load models
        url_model_files = {
            'Logistic Regression': 'logistic_regression.joblib',
            'Random Forest': 'random_forest.joblib',
            'XGBoost': 'xgboost.joblib'
        }
        for name, filename in url_model_files.items():
            model_path = self.models_dir / filename
            if model_path.exists():
                model = joblib.load(model_path)
                self.url_models[name] = model
                # Store expected feature names from model
                if hasattr(model, 'feature_names_in_'):
                    self.url_feature_names[name] = list(model.feature_names_in_)
                    logger.info(f"✓ Loaded URL model: {name} ({len(self.url_feature_names[name])} features)")
                elif self.scaler and hasattr(self.scaler, 'feature_names_in_'):
                    # Use scaler's feature names for models without them (like Logistic Regression)
                    self.url_feature_names[name] = list(self.scaler.feature_names_in_)
                    logger.info(f"✓ Loaded URL model: {name} (using scaler features: {len(self.url_feature_names[name])} features)")
                else:
                    logger.info(f"✓ Loaded URL model: {name}")

    def _load_html_models(self):
        """Load HTML prediction models."""
        html_model_files = {
            'Random Forest': ('random_forest_html.joblib', 'random_forest_html_feature_names.joblib'),
            'XGBoost': ('xgboost_html.joblib', 'xgboost_html_feature_names.joblib'),
        }
        for name, (model_file, features_file) in html_model_files.items():
            model_path = self.models_dir / model_file
            features_path = self.models_dir / features_file
            if model_path.exists():
                self.html_models[name] = {
                    'model': joblib.load(model_path),
                    'features': joblib.load(features_path) if features_path.exists() else None,
                }
                logger.info(f"✓ Loaded HTML model: {name}")

    def _load_combined_models(self):
        """Load combined URL+HTML prediction models."""
        combined_model_files = {
            'Random Forest Combined': ('random_forest_combined.joblib', 'random_forest_combined_feature_names.joblib'),
            'XGBoost Combined': ('xgboost_combined.joblib', 'xgboost_combined_feature_names.joblib'),
        }
        for name, (model_file, features_file) in combined_model_files.items():
            model_path = self.models_dir / model_file
            features_path = self.models_dir / features_file
            if model_path.exists():
                self.combined_models[name] = {
                    'model': joblib.load(model_path),
                    'features': joblib.load(features_path) if features_path.exists() else None,
                }
                n = len(self.combined_models[name]['features']) if self.combined_models[name]['features'] else '?'
                logger.info(f"✓ Loaded combined model: {name} ({n} features)")

    def _load_cnn_url_model(self):
        """Load character-level CNN URL model and vocabulary."""
        model_path = self.models_dir / 'cnn_url_model.keras'
        vocab_path = self.models_dir / 'cnn_url_vocab.json'
        if not model_path.exists():
            logger.warning(f"✗ CNN URL model not found: {model_path}")
            return
        if not vocab_path.exists():
            logger.warning(f"✗ CNN URL vocabulary not found: {vocab_path}")
            return
        try:
            # Imported lazily so the service still works without tensorflow.
            import tensorflow as tf
            self.cnn_url_model = tf.keras.models.load_model(str(model_path))
            with open(vocab_path, 'r') as f:
                self.cnn_url_vocab = json.load(f)
            logger.info(f"✓ Loaded CNN URL model (vocab_size={self.cnn_url_vocab['vocab_size']}, max_len={self.cnn_url_vocab['max_len']})")
        except Exception as e:
            logger.warning(f"✗ Failed to load CNN URL model: {e}")
            self.cnn_url_model = None
            self.cnn_url_vocab = None

    def _load_cnn_html_model(self):
        """Load character-level CNN HTML model and vocabulary."""
        model_path = self.models_dir / 'cnn_html_model.keras'
        vocab_path = self.models_dir / 'cnn_html_vocab.json'
        if not model_path.exists():
            logger.warning(f"✗ CNN HTML model not found: {model_path}")
            return
        if not vocab_path.exists():
            logger.warning(f"✗ CNN HTML vocabulary not found: {vocab_path}")
            return
        try:
            # Imported lazily so the service still works without tensorflow.
            import tensorflow as tf
            self.cnn_html_model = tf.keras.models.load_model(str(model_path))
            with open(vocab_path, 'r') as f:
                self.cnn_html_vocab = json.load(f)
            logger.info(f"✓ Loaded CNN HTML model (vocab_size={self.cnn_html_vocab['vocab_size']}, max_len={self.cnn_html_vocab['max_len']})")
        except Exception as e:
            logger.warning(f"✗ Failed to load CNN HTML model: {e}")
            self.cnn_html_model = None
            self.cnn_html_vocab = None

    def _encode_for_cnn(self, text: str, vocab: dict) -> np.ndarray:
        """Encode text to a padded integer sequence for a CNN model.

        Truncates to vocab['max_len'], maps unknown characters to UNK_IDX
        and right-pads with PAD_IDX. Returns shape (1, max_len), int32.
        """
        char_to_idx = vocab['char_to_idx']
        max_len = vocab['max_len']
        PAD_IDX = 0
        UNK_IDX = 1
        encoded = [char_to_idx.get(c, UNK_IDX) for c in text[:max_len]]
        encoded += [PAD_IDX] * (max_len - len(encoded))
        return np.array([encoded], dtype=np.int32)

    # ── Shared helpers ─────────────────────────────────────────────
    def _is_whitelisted(self, url: str) -> bool:
        """Return True when the URL's host is a trusted domain or a subdomain of one.

        Matching is done on label boundaries: 'mail.github.com' matches
        'github.com' but 'fakegithub.com' does not. (The previous plain
        endswith() check let 'fakegithub.com' bypass detection, and
        replace('www.', '') stripped 'www.' anywhere in the host rather
        than only as a prefix.)
        """
        host = urlparse(url).netloc.lower()
        domain = host[4:] if host.startswith('www.') else host
        return any(
            domain == trusted or domain.endswith('.' + trusted)
            for trusted in self.TRUSTED_DOMAINS
        )

    @staticmethod
    def _calculate_consensus(predictions: list[dict]) -> tuple[bool, str]:
        """Return (is_phishing, consensus_text) from a list of prediction dicts.

        is_phishing is a strict majority vote of PHISHING predictions.
        """
        total = len(predictions)
        phishing_votes = sum(1 for p in predictions if p['prediction'] == 'PHISHING')
        is_phishing = phishing_votes > total / 2
        if phishing_votes == total:
            consensus = "ALL MODELS AGREE: PHISHING"
        elif phishing_votes == 0:
            consensus = "ALL MODELS AGREE: LEGITIMATE"
        else:
            consensus = f"MIXED: {phishing_votes}/{total} models say PHISHING"
        return is_phishing, consensus

    def _align_features(self, features_df: pd.DataFrame, model_name: str) -> np.ndarray:
        """Align extracted features to a model's expected feature order.

        Missing features are zero-filled and extra features dropped. Uses
        DataFrame.reindex, which is order-independent — the previous
        column-by-column assignment into an empty DataFrame broke when the
        first expected feature was missing (the scalar 0 assignment left a
        0-row frame).
        """
        expected = self.url_feature_names.get(model_name)
        if expected is None and self.url_feature_names:
            # Fall back to any known feature list when this model has none.
            expected = next(iter(self.url_feature_names.values()))
        if expected is not None:
            return features_df.reindex(columns=expected, fill_value=0).values
        return features_df.values

    @staticmethod
    def _build_prediction(model_name: str, model, features: np.ndarray, threshold: float = 0.5) -> dict:
        """Run a single model and return a standardised prediction dict.

        Probabilities are reported as percentages. Models without
        predict_proba get a hard 100/0 split from their class prediction.
        """
        if hasattr(model, 'predict_proba'):
            probabilities = model.predict_proba(features)[0]
            pred = 1 if probabilities[1] > threshold else 0
            confidence = probabilities[pred] * 100
            phishing_prob = probabilities[1] * 100
            legitimate_prob = probabilities[0] * 100
        else:
            pred = model.predict(features)[0]
            confidence = 100.0
            phishing_prob = 100.0 if pred == 1 else 0.0
            legitimate_prob = 0.0 if pred == 1 else 100.0
        return {
            'model_name': model_name,
            'prediction': 'PHISHING' if pred == 1 else 'LEGITIMATE',
            'confidence': confidence,
            'phishing_probability': phishing_prob,
            'legitimate_probability': legitimate_prob,
        }

    @staticmethod
    def _whitelisted_prediction(model_name: str) -> dict:
        """Return a pre-built LEGITIMATE prediction for whitelisted domains."""
        return {
            'model_name': model_name,
            'prediction': 'LEGITIMATE',
            'confidence': 99.99,
            'phishing_probability': 0.01,
            'legitimate_probability': 99.99,
        }

    # ── URL prediction ────────────────────────────────────────────
    def predict_url(self, url: str) -> dict:
        """Predict if a URL is phishing using all URL models."""
        is_whitelisted = self._is_whitelisted(url)
        # Extract features
        features_dict = self.url_extractor.extract_features(url)
        features_df = pd.DataFrame([features_dict]).drop(columns=['label'], errors='ignore')
        # Get predictions from each URL model
        predictions = []
        for model_name, model in self.url_models.items():
            if is_whitelisted:
                predictions.append(self._whitelisted_prediction(model_name))
                continue
            aligned = self._align_features(features_df, model_name)
            # Logistic Regression was trained on scaled features.
            if model_name == 'Logistic Regression' and self.scaler:
                aligned = self.scaler.transform(aligned)
            predictions.append(
                self._build_prediction(model_name, model, aligned, self.DEFAULT_THRESHOLD)
            )
        is_phishing, consensus = self._calculate_consensus(predictions)
        return {
            'url': url,
            'is_phishing': is_phishing,
            'consensus': consensus,
            'predictions': predictions,
            'features': features_dict,
        }

    # ── HTML prediction ───────────────────────────────────────────
    def predict_html(self, html_content: str, source: str = "") -> dict:
        """Predict if HTML content is phishing using all HTML models."""
        features = self.html_extractor.extract_features(html_content)
        engineered_df = engineer_features(pd.DataFrame([features]))
        predictions = []
        for model_name, model_data in self.html_models.items():
            model = model_data['model']
            feature_names = model_data['features']
            if feature_names:
                feature_list = list(feature_names)
                # Prefer the engineered value; fall back to the raw feature,
                # then 0, when a model expects a feature we didn't derive.
                feature_values = [
                    engineered_df[f].iloc[0] if f in engineered_df.columns else features.get(f, 0)
                    for f in feature_list
                ]
                X = np.array([feature_values])
            else:
                X = engineered_df.values
            predictions.append(self._build_prediction(model_name, model, X))
        is_phishing, consensus = self._calculate_consensus(predictions)
        return {
            'source': source or 'HTML Content',
            'is_phishing': is_phishing,
            'consensus': consensus,
            'predictions': predictions,
            'features': features,
        }

    # ── Full scan (URL + HTML) ─────────────────────────────────────
    def predict_from_url(self, url: str) -> dict:
        """Download HTML from URL and analyse both URL and HTML."""
        url_result = self.predict_url(url)
        try:
            # NOTE: verify=False is deliberate — suspected phishing sites
            # often have invalid certificates; we still want their HTML.
            resp = requests.get(url, timeout=10, verify=False, headers=self.HTML_DOWNLOAD_HEADERS)
            html_result = self.predict_html(resp.text, source=url)
            all_predictions = url_result['predictions'] + html_result['predictions']
            is_phishing, consensus = self._calculate_consensus(all_predictions)
            return {
                'url': url,
                'is_phishing': is_phishing,
                'url_analysis': url_result,
                'html_analysis': html_result,
                'combined_consensus': consensus,
            }
        except Exception as e:
            # Fall back to URL-only analysis when the page can't be fetched.
            logger.warning(f"Could not download HTML: {e}")
            return {
                'url': url,
                'is_phishing': url_result['is_phishing'],
                'url_analysis': url_result,
                'html_analysis': None,
                'error': str(e),
            }

    # ── CNN prediction ─────────────────────────────────────────────
    def predict_cnn(self, url: str, html_content: str | None = None) -> dict:
        """Predict using both character-level CNN models (URL + HTML).

        The HTML CNN only runs when html_content is provided. Raises
        RuntimeError when neither CNN model is loaded.
        """
        is_whitelisted = self._is_whitelisted(url)
        predictions = []
        # CNN URL model
        if self.cnn_url_model is not None and self.cnn_url_vocab is not None:
            if is_whitelisted:
                predictions.append(self._whitelisted_prediction('CNN URL (Char-level)'))
            else:
                X = self._encode_for_cnn(url, self.cnn_url_vocab)
                phishing_prob = float(self.cnn_url_model.predict(X, verbose=0)[0][0])
                legitimate_prob = 1.0 - phishing_prob
                is_phishing_pred = phishing_prob >= self.DEFAULT_THRESHOLD
                confidence = (phishing_prob if is_phishing_pred else legitimate_prob) * 100
                predictions.append({
                    'model_name': 'CNN URL (Char-level)',
                    'prediction': 'PHISHING' if is_phishing_pred else 'LEGITIMATE',
                    'confidence': confidence,
                    'phishing_probability': phishing_prob * 100,
                    'legitimate_probability': legitimate_prob * 100,
                })
        # CNN HTML model
        if self.cnn_html_model is not None and self.cnn_html_vocab is not None and html_content:
            if is_whitelisted:
                predictions.append(self._whitelisted_prediction('CNN HTML (Char-level)'))
            else:
                X = self._encode_for_cnn(html_content, self.cnn_html_vocab)
                phishing_prob = float(self.cnn_html_model.predict(X, verbose=0)[0][0])
                legitimate_prob = 1.0 - phishing_prob
                is_phishing_pred = phishing_prob >= self.DEFAULT_THRESHOLD
                confidence = (phishing_prob if is_phishing_pred else legitimate_prob) * 100
                predictions.append({
                    'model_name': 'CNN HTML (Char-level)',
                    'prediction': 'PHISHING' if is_phishing_pred else 'LEGITIMATE',
                    'confidence': confidence,
                    'phishing_probability': phishing_prob * 100,
                    'legitimate_probability': legitimate_prob * 100,
                })
        if not predictions:
            raise RuntimeError("No CNN models are loaded")
        is_phishing, consensus = self._calculate_consensus(predictions)
        return {
            'url': url,
            'is_phishing': is_phishing,
            'consensus': consensus,
            'predictions': predictions,
            'features': {},
        }

    # ── Combined prediction ────────────────────────────────────────
    def predict_combined(self, url: str) -> dict:
        """Predict using combined URL+HTML models (single ensemble).

        Downloads the page, then delegates to _predict_combined_with_html
        so the feature-alignment logic lives in exactly one place (it was
        previously duplicated here).
        """
        if not self.combined_models:
            raise RuntimeError("No combined models loaded")
        html_content = None
        download_error = None
        try:
            resp = requests.get(url, timeout=10, verify=False, headers=self.HTML_DOWNLOAD_HEADERS)
            html_content = resp.text
        except Exception as e:
            download_error = str(e)
            logger.warning(f"Combined: could not download HTML: {e}")
        result = self._predict_combined_with_html(url, html_content, self._is_whitelisted(url))
        # Surface the download failure (the helper only reports extraction errors).
        if download_error and not result.get('html_error'):
            result['html_error'] = download_error
        return result

    # ── Unified all-models prediction ──────────────────────────────
    def predict_all(self, url: str) -> dict:
        """Run ALL models on a URL and return categorised results."""
        is_whitelisted = self._is_whitelisted(url)
        # ── 1. URL feature-based models ───────────────────────────
        url_result = self.predict_url(url)
        # ── 2. Download HTML (shared across HTML/combined/CNN-HTML) ─
        html_content = None
        html_error = None
        try:
            resp = requests.get(url, timeout=10, verify=False, headers=self.HTML_DOWNLOAD_HEADERS)
            html_content = resp.text
        except Exception as e:
            html_error = str(e)
            logger.warning(f"predict_all: could not download HTML: {e}")
        # ── 3. HTML feature-based models ─────────────────────────
        html_result = None
        if html_content and self.html_models:
            html_result = self.predict_html(html_content, source=url)
        # ── 4. Combined URL+HTML feature-based models ────────────
        combined_result = None
        if self.combined_models:
            try:
                combined_result = self._predict_combined_with_html(url, html_content, is_whitelisted)
            except Exception as e:
                logger.warning(f"predict_all: combined prediction failed: {e}")
        # ── 5. CNN models (URL + HTML) ───────────────────────────
        cnn_result = None
        if self.cnn_url_model is not None or self.cnn_html_model is not None:
            try:
                cnn_result = self.predict_cnn(url, html_content)
            except Exception as e:
                logger.warning(f"predict_all: CNN prediction failed: {e}")
        # ── Aggregate consensus ──────────────────────────────────
        all_predictions = []
        if url_result:
            all_predictions.extend(url_result.get('predictions', []))
        if html_result:
            all_predictions.extend(html_result.get('predictions', []))
        if combined_result:
            all_predictions.extend(combined_result.get('predictions', []))
        if cnn_result:
            all_predictions.extend(cnn_result.get('predictions', []))
        is_phishing, consensus = self._calculate_consensus(all_predictions) if all_predictions else (False, "No models available")
        return {
            'url': url,
            'is_phishing': is_phishing,
            'overall_consensus': consensus,
            'url_models': url_result,
            'html_models': html_result,
            'combined_models': combined_result,
            'cnn_models': cnn_result,
            'html_error': html_error,
        }

    def _predict_combined_with_html(self, url: str, html_content: str | None, is_whitelisted: bool) -> dict:
        """Predict using combined models, optionally with pre-fetched HTML.

        When html_content is None (download failed or skipped), HTML
        features are zero-filled so the combined models can still run.
        """
        # Extract URL features, prefixed 'url_' to match the training schema.
        url_features = self.url_extractor.extract_features(url)
        url_df = pd.DataFrame([url_features]).drop(columns=['label'], errors='ignore')
        url_df = url_df.rename(columns={c: f'url_{c}' for c in url_df.columns})
        # HTML features, prefixed 'html_'.
        html_features = {}
        html_error = None
        eng_df = pd.DataFrame()
        if html_content:
            try:
                html_features = self.html_extractor.extract_features(html_content)
                raw_df = pd.DataFrame([html_features])
                eng_df = engineer_features(raw_df)
                eng_df = eng_df.rename(columns={c: f'html_{c}' for c in eng_df.columns})
            except Exception as e:
                html_error = str(e)
        # Combine
        combined_df = pd.concat([url_df, eng_df], axis=1)
        # Predict
        predictions = []
        for model_name, model_data in self.combined_models.items():
            if is_whitelisted:
                predictions.append(self._whitelisted_prediction(model_name))
                continue
            model = model_data['model']
            expected = model_data['features']
            if expected:
                # reindex keeps the expected order and zero-fills absent
                # columns; robust regardless of which columns are missing.
                X = combined_df.reindex(columns=list(expected), fill_value=0).values
            else:
                X = combined_df.values
            predictions.append(
                self._build_prediction(model_name, model, X, self.DEFAULT_THRESHOLD)
            )
        is_phishing, consensus_text = self._calculate_consensus(predictions)
        return {
            'url': url,
            'is_phishing': is_phishing,
            'consensus': consensus_text,
            'predictions': predictions,
            'url_features': url_features,
            'html_features': html_features,
            'html_error': html_error,
        }
# Initialize the singleton service at import time (loads all models once,
# so the first request does not pay the loading cost).
detector = PhishingDetectorService()
# ── Helpers ───────────────────────────────────────────────────────
def _serve_static_html(filename: str, cache: bool = False) -> HTMLResponse:
    """Serve a file from the static/ directory as HTML.

    Returns a 404 HTML page when the file is missing; when cache is True,
    a 24-hour Cache-Control header is attached.
    """
    file_path = Path(__file__).parent / 'static' / filename
    if not file_path.exists():
        return HTMLResponse(content="<h1>Page not found</h1>", status_code=404)
    extra_headers = None
    if cache:
        extra_headers = {"Cache-Control": "public, max-age=86400"}
    return HTMLResponse(content=file_path.read_text(encoding='utf-8'), headers=extra_headers)
# ── API Endpoints ─────────────────────────────────────────────────
@app.get("/", response_class=HTMLResponse)
async def root():
    """Serve the main web interface (static/index.html)."""
    return _serve_static_html('index.html')
@app.get("/models", response_class=HTMLResponse)
async def models_page():
    """Serve the model details page (static/models.html, client-cached for 24h)."""
    return _serve_static_html('models.html', cache=True)
async def _safe_predict(label: str, fn, *args) -> JSONResponse:
    """Run a prediction function with uniform error handling.

    Args:
        label: name used in log messages.
        fn: synchronous prediction callable (a detector method).
        *args: positional arguments forwarded to fn.

    Returns:
        JSONResponse wrapping the JSON-serializable form of fn's result.

    Raises:
        HTTPException: 500 with the error message when fn fails.
    """
    # NOTE(review): fn is synchronous and runs on the event loop; slow
    # predictions (model inference + page downloads) block other requests.
    # Consider starlette's run_in_threadpool — TODO confirm.
    try:
        return JSONResponse(content=convert_to_json_serializable(fn(*args)))
    except HTTPException:
        raise  # pass deliberate HTTP errors through unchanged, not as 500
    except Exception as e:
        # logger.exception records the full traceback, not just str(e);
        # chaining with `from e` preserves the original cause.
        logger.exception(f"Error in {label}")
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/api/predict/url", response_model=URLPredictionResponse)
async def predict_url(request: URLRequest):
    """Predict if URL is phishing (URL feature-based models only)."""
    return await _safe_predict("predict_url", detector.predict_url, request.url)
@app.post("/api/predict/html")
async def predict_html(request: HTMLRequest):
    """Predict if HTML content is phishing (HTML feature-based models only)."""
    return await _safe_predict("predict_html", detector.predict_html, request.html_content, request.url or "")
@app.post("/api/predict/full")
async def predict_full(request: URLRequest):
    """Analyse URL and download HTML for complete analysis."""
    return await _safe_predict("predict_full", detector.predict_from_url, request.url)
@app.post("/api/predict/combined")
async def predict_combined(request: URLRequest):
    """Predict using combined URL+HTML model."""
    return await _safe_predict("predict_combined", detector.predict_combined, request.url)
@app.post("/api/predict/cnn")
async def predict_cnn(request: URLRequest):
    """Predict using character-level CNN models.

    Passes None as html_content, so only the URL CNN runs here.
    """
    return await _safe_predict("predict_cnn", detector.predict_cnn, request.url, None)
@app.post("/api/predict/all")
async def predict_all(request: URLRequest):
    """Run ALL models on a URL — unified endpoint."""
    return await _safe_predict("predict_all", detector.predict_all, request.url)
@app.get("/api/health")
async def health():
    """Health check: report service status and which models are loaded."""
    model_counts = {
        "url_models": len(detector.url_models),
        "html_models": len(detector.html_models),
        "combined_models": len(detector.combined_models),
    }
    return {
        "status": "healthy",
        **model_counts,
        "cnn_url_model": detector.cnn_url_model is not None,
        "cnn_html_model": detector.cnn_html_model is not None,
    }
if __name__ == "__main__":
    import uvicorn
    # Listen on all interfaces; port 7860 is presumably the deployment
    # platform's expected port (Hugging Face Spaces convention) — TODO confirm.
    uvicorn.run(app, host="0.0.0.0", port=7860)