# Rasel Santillan
# Update
# badc9ad
"""
Model loading and prediction module for phishing URL detection.
"""
import logging
import numpy as np
import pandas as pd
import joblib
from typing import Dict, Any, Optional, Tuple
import warnings
from huggingface_hub import hf_hub_download
# Import feature extraction function
from .url_feature_extractor import extract_features
# Suppress sklearn's "X does not have valid feature names" warning: the
# pipeline intentionally mixes DataFrames and plain arrays in places.
warnings.filterwarnings("ignore", message="X does not have valid feature names", category=UserWarning)
# Configure logging
# NOTE(review): basicConfig at import time configures the process-wide root
# logger — fine for an application entry point, heavy-handed for a library.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Global variable to cache the loaded model (singleton pattern)
_model_cache: Optional[Dict[str, Any]] = None
def get_model_path(
    repo_id: str = "xxemrzru/url-stacking-model",
    filename: str = "url_stacking_model.joblib",
) -> str:
    """
    Download the model from the Hugging Face Hub and return the local path.

    Generalized to accept the repository id and filename as parameters;
    the defaults preserve the original hard-coded artifact location, so
    existing callers are unaffected. huggingface_hub caches the download,
    so repeated calls do not re-fetch an unchanged file.

    Args:
        repo_id: Hugging Face Hub repository id hosting the model.
        filename: Name of the model file inside the repository.

    Returns:
        str: Local path to the downloaded model file.
    """
    model_path = hf_hub_download(
        repo_id=repo_id,
        filename=filename
    )
    return model_path
def load_model() -> Dict[str, Any]:
    """
    Load the saved stacking model, caching it after the first successful load.

    Returns:
        dict: Model components:
            - base_models: mapping of base model name -> fitted model
            - meta_scaler: scaler applied to the meta features
            - scaler_name: human-readable scaler name (for logging)
            - meta_model: final meta model
            - feature_names: ordered list of input feature names
            - model_names: ordered list of base model names

    Raises:
        FileNotFoundError: If the model file doesn't exist.
        Exception: Propagated (after logging) if loading fails for any reason.
    """
    global _model_cache

    # Singleton: hand back the already-loaded components on repeat calls.
    if _model_cache is not None:
        logger.info("Using cached model")
        return _model_cache

    try:
        path = get_model_path()
        logger.info(f"Loading model from: {path}")
        payload = joblib.load(path)

        # Cache the components; scaler_name is optional in older artifacts.
        _model_cache = {
            "base_models": payload["base_models"],
            "meta_scaler": payload["meta_scaler"],
            "scaler_name": payload.get("scaler_name", "Unknown"),
            "meta_model": payload["meta_model"],
            "feature_names": payload["feature_names"],
            "model_names": payload["model_names"]
        }
        logger.info(f"βœ… Model loaded successfully (Meta scaler: {_model_cache['scaler_name']})")
        return _model_cache
    except Exception as e:
        logger.error(f"❌ Failed to load model: {str(e)}")
        raise
def predict_from_features(features_dict: Dict[str, Any], model_components: Dict[str, Any]) -> Dict[str, Any]:
    """
    Run the two-level stacking model on a single feature dictionary.

    Args:
        features_dict: Mapping of feature name -> feature value for one URL.
        model_components: The loaded components returned by load_model().

    Returns:
        dict: Contains 'predicted_label' (0 or 1) and 'phish_probability' (float).

    Raises:
        ValueError: If any feature listed in feature_names is absent.
    """
    base_models = model_components["base_models"]
    meta_scaler = model_components["meta_scaler"]
    meta_model = model_components["meta_model"]
    feature_names = model_components["feature_names"]
    model_names = model_components["model_names"]

    # A single-row frame keeps column names attached to the values.
    frame = pd.DataFrame([features_dict])

    missing_cols = set(feature_names) - set(frame.columns)
    if missing_cols:
        raise ValueError(f"❌ Missing required features: {missing_cols}")

    # Discard unknown columns and enforce training-time feature order.
    frame = frame[feature_names]

    # Level 0: each base model contributes its phishing probability column.
    level0 = np.column_stack(
        [estimator.predict_proba(frame)[:, 1] for estimator in base_models.values()]
    )
    level0_df = pd.DataFrame(level0, columns=[f"{name}_pred" for name in model_names])

    # Level 1: scale the stacked probabilities and apply the meta model.
    scaled = pd.DataFrame(meta_scaler.transform(level0_df), columns=level0_df.columns)
    label = meta_model.predict(scaled)[0]
    probability = meta_model.predict_proba(scaled)[:, 1][0]

    return {
        "predicted_label": int(label),
        "phish_probability": float(probability)
    }
def sanitize_features(features_dict: Dict[str, Any]) -> Dict[str, Any]:
    """
    Return a copy of the features with every -1 sentinel replaced by 0.

    A -1 marks an individual feature whose extraction failed even though the
    page itself was reachable (e.g. Playwright fetched the page but one
    extractor errored). Zero is the conservative replacement: it reads as
    "not present" for binary has_* flags and as an empty value for
    number_of_* / length_of_* features.

    Args:
        features_dict: Dictionary of extracted features.

    Returns:
        dict: New mapping with all -1 values zeroed; the input is not mutated.
    """
    cleaned = dict(features_dict)
    failed = [name for name, value in cleaned.items() if value == -1]
    for name in failed:
        cleaned[name] = 0
        logger.debug(f"Sanitized feature '{name}': -1 -> 0")
    return cleaned
def predict_url(url: str) -> Dict[str, Any]:
    """
    Analyze a raw URL and classify it as phishing or legitimate.

    Pipeline:
        1. Load the stacking model (cached after the first call).
        2. Extract features from the URL via url_feature_extractor.
        3. Sanitize features (-1 sentinels become 0).
        4. Run the stacking model.

    Args:
        url: Raw URL string to analyze.

    Returns:
        dict: Prediction result containing:
            - url: the input URL
            - prediction: "phishing" or "legitimate" ("unknown" when the URL
              could not be fetched at all)
            - confidence: probability of the predicted class (0-1)
            - predicted_label: 1 phishing, 0 legitimate (-1 unknown)
            - phish_probability: raw phishing probability
            - error: present only when feature extraction failed entirely

    Raises:
        Exception: Propagated (after logging) if extraction or prediction fails.
    """
    try:
        components = load_model()

        logger.info(f"Extracting features from URL: {url}")
        features = extract_features(url)

        # Every value being -1 signals total extraction failure (the URL is
        # likely unreachable) — return a neutral "unknown" verdict instead
        # of feeding garbage to the model.
        if all(value == -1 for value in features.values()):
            logger.warning(f"Feature extraction failed for URL: {url}")
            return {
                "url": url,
                "prediction": "unknown",
                "confidence": 0.0,
                "predicted_label": -1,
                "phish_probability": 0.0,
                "error": "Failed to extract features - URL may be unreachable"
            }

        # Partial failures are tolerated: warn, then zero out the sentinels
        # so a prediction can still be produced.
        failure_count = sum(1 for value in features.values() if value == -1)
        if failure_count > 0:
            logger.warning(f"⚠ {failure_count} features failed extraction, using defaults")
        features = sanitize_features(features)

        logger.info("Making prediction...")
        outcome = predict_from_features(features, components)

        label = outcome["predicted_label"]
        phish_prob = outcome["phish_probability"]
        is_phishing = label == 1
        result = {
            "url": url,
            "prediction": "phishing" if is_phishing else "legitimate",
            # Confidence is the probability of the class that was predicted.
            "confidence": phish_prob if is_phishing else (1 - phish_prob),
            "predicted_label": label,
            "phish_probability": phish_prob
        }
        logger.info(f"βœ… Prediction complete: {result['prediction']} (confidence: {result['confidence']:.2%})")
        return result
    except Exception as e:
        logger.error(f"❌ Prediction failed: {str(e)}")
        raise
def get_meta_features_and_update(url: str, true_label: int) -> Tuple[Optional[np.ndarray], bool]:
    """
    Extract meta-features from URL and update the SGD meta model using partial_fit.

    This function:
    1. Extracts features from the URL
    2. Generates meta-features using base models (probability outputs)
    3. Updates the SGD meta model with partial_fit(meta_features, true_label)
    4. Saves the updated model to disk

    Args:
        url: Raw URL string to extract features from
        true_label: True label (0 for legitimate, 1 for phishing)

    Returns:
        tuple: (meta_features_array, success_flag)
            - meta_features_array: numpy array of the scaled meta features
              used for the update, or None on failure
            - success_flag: boolean indicating if update was successful

    Note:
        All exceptions are caught, logged, and reported as (None, False);
        this function never raises.
    """
    try:
        # Load model components (cached after the first successful load)
        model_components = load_model()
        base_models = model_components["base_models"]
        meta_scaler = model_components["meta_scaler"]
        meta_model = model_components["meta_model"]
        feature_names = model_components["feature_names"]
        model_names = model_components["model_names"]

        # Extract features from URL
        logger.info(f"Extracting features for update from URL: {url}")
        features_dict = extract_features(url)

        # All values being -1 signals total extraction failure (URL unreachable)
        if all(v == -1 for v in features_dict.values()):
            logger.warning(f"Feature extraction failed for URL update: {url}")
            return None, False

        # Sanitize features: replace -1 values with sensible defaults
        failed_features = sum(1 for v in features_dict.values() if v == -1)
        if failed_features > 0:
            logger.warning(f"⚠ {failed_features} features failed extraction during update, using defaults")
        features_dict = sanitize_features(features_dict)

        # Convert to DataFrame and ensure proper ordering
        X = pd.DataFrame([features_dict])
        missing_cols = set(feature_names) - set(X.columns)
        if missing_cols:
            raise ValueError(f"Missing required features: {missing_cols}")
        X = X[feature_names]

        # Level 0: one phishing-probability column per base model
        meta_features = np.zeros((X.shape[0], len(base_models)))
        for idx, (model_name, model) in enumerate(base_models.items()):
            meta_features[:, idx] = model.predict_proba(X)[:, 1]
        meta_features_df = pd.DataFrame(meta_features, columns=[f"{n}_pred" for n in model_names])

        # Scale meta-features
        # NOTE(review): unlike predict_from_features, the scaled result is not
        # wrapped back into a DataFrame here; the resulting "valid feature
        # names" warning is suppressed at module level.
        meta_scaled = meta_scaler.transform(meta_features_df)

        # Update the SGD meta model using partial_fit.
        # classes is passed on every call; scikit-learn accepts this as long
        # as it matches the classes seen on the first partial_fit call.
        logger.info(f"Updating meta model with partial_fit for label: {true_label}")
        meta_model.partial_fit(meta_scaled, [true_label], classes=[0, 1])

        # Update the cached model with the new meta model so subsequent
        # predictions in this process use the refreshed weights.
        global _model_cache
        if _model_cache is not None:
            _model_cache["meta_model"] = meta_model

        # Save the updated model to disk
        save_updated_model(model_components, meta_model)

        logger.info(f"βœ… Model updated successfully for URL: {url}")
        # Return the single scaled meta-feature row used for the update
        return meta_scaled[0], True
    except Exception as e:
        # Any failure (extraction, scaling, fit, save) is downgraded to a
        # soft failure rather than propagated to the caller.
        logger.error(f"❌ Failed to update model: {str(e)}")
        return None, False
def save_updated_model(model_components: Dict[str, Any], updated_meta_model) -> None:
    """
    Persist the model components to disk, swapping in the updated meta model.

    The payload is written back to the local path returned by
    get_model_path() — presumably the huggingface_hub cache location, so the
    update survives restarts but is not pushed back to the Hub.

    Args:
        model_components: Dictionary containing model components.
        updated_meta_model: The updated SGD meta model.

    Raises:
        Exception: Re-raised (after logging) if the dump fails.
    """
    try:
        destination = get_model_path()

        # Rebuild the payload explicitly so only the meta model changes.
        payload = {
            "base_models": model_components["base_models"],
            "meta_scaler": model_components["meta_scaler"],
            "scaler_name": model_components.get("scaler_name", "Unknown"),
            "meta_model": updated_meta_model,  # Use the updated meta model
            "feature_names": model_components["feature_names"],
            "model_names": model_components["model_names"]
        }

        joblib.dump(payload, destination)
        logger.info(f"βœ… Updated model saved to: {destination}")
    except Exception as e:
        logger.error(f"❌ Failed to save updated model: {str(e)}")
        raise