# Rasel Santillan
# Add application file
# 7a3576b
"""
Model prediction helper module for phishing URL detection.
Handles model loading, feature extraction, and prediction.
"""
import os
import sys
import numpy as np
import pandas as pd
import joblib
import warnings
warnings.filterwarnings("ignore", message="X does not have valid feature names", category=UserWarning)
# Add parent directory to path to import url_feature_extraction module
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..'))
from url_feature_extraction.url_feature_extractor import extract_features
# Module-level cache so repeated calls reuse the already-deserialized model.
# Holds the components dict returned by load_model(), or None before first load.
_model_cache = None
def load_model(model_path="model/url_stacking_model.joblib"):
    """
    Load the saved stacking model from file, caching it across calls.
    The cache is keyed on the resolved absolute path: calling again with the
    same path returns the cached components; a different path triggers a
    fresh load. (Previously the cache ignored model_path entirely, so a
    second call with a different path returned the wrong model.)
    Args:
        model_path (str): Path to the model file relative to the FastAPI app directory
    Returns:
        dict: Dictionary containing model components:
            - base_models: Dictionary of base models
            - meta_scaler: Scaler for meta features
            - meta_model: Meta model for final prediction
            - feature_names: List of feature names
            - model_names: List of model names
            - model_path: Absolute path the model was loaded from (cache key)
    Raises:
        FileNotFoundError: If no file exists at the resolved model path.
    """
    global _model_cache
    # Resolve the absolute path first so the cache can be validated against it.
    current_dir = os.path.dirname(os.path.abspath(__file__))
    full_model_path = os.path.normpath(os.path.join(current_dir, "..", model_path))
    # Reuse the cache only when it was loaded from this exact path.
    if _model_cache is not None and _model_cache.get("model_path") == full_model_path:
        return _model_cache
    if not os.path.exists(full_model_path):
        raise FileNotFoundError(f"Model file not found at: {full_model_path}")
    # Load model
    model_data = joblib.load(full_model_path)
    print(f"✅ Model loaded successfully from: {full_model_path}")
    _model_cache = {
        "base_models": model_data["base_models"],
        "meta_scaler": model_data["meta_scaler"],
        "meta_model": model_data["meta_model"],
        "feature_names": model_data["feature_names"],
        "model_names": model_data["model_names"],
        # Extra key (backward-compatible addition) used to validate the cache.
        "model_path": full_model_path
    }
    return _model_cache
def predict_url(url: str, model_components: dict = None):
    """
    Make prediction for a given URL.
    Pipeline:
    1. Extract features from the raw URL via url_feature_extractor.
    2. Run the stacking model on those features.
    3. Package the outcome (or failure) into a uniform response dict.
    Args:
        url (str): Raw URL to predict
        model_components (dict, optional): Pre-loaded model components.
            If None, will load the model.
    Returns:
        dict: Dictionary containing:
            - url: The input URL
            - predicted_label: 0 (legitimate) or 1 (phishing), None on failure
            - prediction: "legitimate", "phishing", "unknown" or "error"
            - phish_probability: Probability of being phishing (0.0 to 1.0)
            - confidence: Confidence percentage
            - features_extracted: Boolean indicating if features were successfully extracted
            - error: Present only when something went wrong
    """
    components = model_components if model_components is not None else load_model()
    feats = extract_features(url)
    # A missing 'has_title' signals the extractor could not reach the URL.
    if feats.get('has_title') is None:
        return {
            "url": url,
            "predicted_label": None,
            "prediction": "unknown",
            "phish_probability": None,
            "confidence": None,
            "features_extracted": False,
            "error": "Failed to extract features from URL. The URL may be unreachable or invalid."
        }
    try:
        outcome = predict_from_features(feats, components)
        label = outcome["predicted_label"]
        prob = outcome["phish_probability"]
        # Confidence is how far the probability sits from the 50/50 midpoint.
        certainty = max(prob, 1 - prob) * 100
        return {
            "url": url,
            "predicted_label": label,
            "prediction": "legitimate" if label != 1 else "phishing",
            "phish_probability": round(prob, 4),
            "confidence": round(certainty, 2),
            "features_extracted": True
        }
    except Exception as exc:
        # Surface any model-side failure as a structured error response
        # rather than propagating it to the API layer.
        return {
            "url": url,
            "predicted_label": None,
            "prediction": "error",
            "phish_probability": None,
            "confidence": None,
            "features_extracted": True,
            "error": f"Prediction error: {str(exc)}"
        }
def predict_from_features(features_dict: dict, model_components: dict):
    """
    Make predictions given a dictionary of extracted features.
    This function implements the stacking model prediction:
    - Level 0: Base models each output P(phishing) for the sample
    - Level 1: Meta model combines the scaled base-model outputs
    Args:
        features_dict (dict): Dictionary where keys are feature names and values are feature values
        model_components (dict): The loaded components returned by load_model()
    Returns:
        dict: Contains 'predicted_label' (0 or 1) and 'phish_probability' (float)
    Raises:
        ValueError: If any feature listed in the model's feature_names is
            missing from features_dict.
        KeyError: If a name in model_names has no entry in base_models.
    """
    base_models = model_components["base_models"]
    meta_scaler = model_components["meta_scaler"]
    meta_model = model_components["meta_model"]
    feature_names = model_components["feature_names"]
    model_names = model_components["model_names"]
    # Single-row DataFrame keeps feature order/shape consistent with training.
    X = pd.DataFrame([features_dict])
    # Ensure all required columns exist
    missing_cols = set(feature_names) - set(X.columns)
    if missing_cols:
        raise ValueError(f"❌ Missing required features: {missing_cols}")
    # Keep only known features and order them correctly
    X = X[feature_names]
    # ------------------------------
    # Level 0: Base model predictions
    # ------------------------------
    # Iterate model_names (not base_models.items()) so each probability column
    # is guaranteed to line up with its "<name>_pred" label below, even if the
    # base_models dict is ordered differently than model_names.
    meta_features = np.zeros((X.shape[0], len(model_names)))
    for idx, model_name in enumerate(model_names):
        meta_features[:, idx] = base_models[model_name].predict_proba(X)[:, 1]
    meta_features_df = pd.DataFrame(meta_features, columns=[f"{n}_pred" for n in model_names])
    # ------------------------------
    # Level 1: Meta-model prediction
    # ------------------------------
    meta_scaled = meta_scaler.transform(meta_features_df)
    # Re-wrap as a DataFrame so the meta model sees named columns.
    meta_scaled = pd.DataFrame(meta_scaled, columns=meta_features_df.columns)
    final_pred = meta_model.predict(meta_scaled)[0]
    final_prob = meta_model.predict_proba(meta_scaled)[:, 1][0]
    return {
        "predicted_label": int(final_pred),
        "phish_probability": float(final_prob)
    }