Spaces:
Runtime error
Runtime error
| """ | |
| Model loading and prediction module for phishing URL detection. | |
| """ | |
| import logging | |
| import numpy as np | |
| import pandas as pd | |
| import joblib | |
| from typing import Dict, Any, Optional, Tuple | |
| import warnings | |
| from huggingface_hub import hf_hub_download | |
| # Import feature extraction function | |
| from .url_feature_extractor import extract_features | |
| warnings.filterwarnings("ignore", message="X does not have valid feature names", category=UserWarning) | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Global variable to cache the loaded model (singleton pattern) | |
| _model_cache: Optional[Dict[str, Any]] = None | |
def get_model_path(
    repo_id: str = "xxemrzru/url-stacking-model",
    filename: str = "url_stacking_model.joblib",
) -> str:
    """
    Download the model artifact from the Hugging Face Hub and return its local path.

    The repository and filename were previously hard-coded; they are now
    keyword parameters with the original values as defaults, so existing
    callers are unaffected while alternate model artifacts become usable.
    ``hf_hub_download`` caches files locally, so repeated calls do not
    re-download the artifact.

    Args:
        repo_id: Hugging Face Hub repository id hosting the model.
        filename: Name of the joblib file inside that repository.

    Returns:
        str: Local filesystem path to the downloaded model file.
    """
    return hf_hub_download(repo_id=repo_id, filename=filename)
def load_model() -> Dict[str, Any]:
    """
    Load the persisted stacking model, downloading it on first use.

    A module-level cache (singleton) guarantees the joblib artifact is
    deserialized at most once per process; later calls return the cached
    components dictionary.

    Returns:
        dict: Model components with keys:
            - base_models: mapping of model name -> fitted base estimator
            - meta_scaler: scaler applied to the meta-features
            - scaler_name: human-readable scaler name (for logging)
            - meta_model: final meta estimator
            - feature_names: ordered list of input feature names
            - model_names: ordered list of base model names

    Raises:
        FileNotFoundError: If the model file cannot be located.
        Exception: Any failure during download or deserialization is
            logged and re-raised.
    """
    global _model_cache

    # Fast path: model already deserialized in this process.
    if _model_cache is not None:
        logger.info("Using cached model")
        return _model_cache

    try:
        path = get_model_path()
        logger.info(f"Loading model from: {path}")
        payload = joblib.load(path)

        # Assemble the cache entry, tolerating an absent scaler_name in
        # older artifacts. Insertion order mirrors the saved layout.
        components: Dict[str, Any] = {
            key: payload[key] for key in ("base_models", "meta_scaler")
        }
        components["scaler_name"] = payload.get("scaler_name", "Unknown")
        for key in ("meta_model", "feature_names", "model_names"):
            components[key] = payload[key]

        _model_cache = components
        scaler_name = components["scaler_name"]
        logger.info(f"β Model loaded successfully (Meta scaler: {scaler_name})")
        return _model_cache
    except Exception as e:
        logger.error(f"β Failed to load model: {str(e)}")
        raise
def predict_from_features(features_dict: Dict[str, Any], model_components: Dict[str, Any]) -> Dict[str, Any]:
    """
    Run the two-level stacking pipeline on one row of extracted features.

    Args:
        features_dict: Mapping of feature name -> value for a single URL.
        model_components: Components dictionary as returned by load_model().

    Returns:
        dict: Contains 'predicted_label' (0 or 1) and 'phish_probability' (float).

    Raises:
        ValueError: If any feature required by the model is absent.
    """
    feature_names = model_components["feature_names"]
    model_names = model_components["model_names"]
    base_models = model_components["base_models"]

    # Single-row frame: preserves column names so estimators see named features.
    row = pd.DataFrame([features_dict])

    absent = set(feature_names) - set(row.columns)
    if absent:
        raise ValueError(f"β Missing required features: {absent}")

    # Drop any extra columns and enforce the training-time column order.
    row = row[feature_names]

    # Level 0: each base model contributes its positive-class probability.
    level0 = np.column_stack(
        [estimator.predict_proba(row)[:, 1] for estimator in base_models.values()]
    )
    level0_df = pd.DataFrame(level0, columns=[f"{n}_pred" for n in model_names])

    # Level 1: scale the meta-features and query the meta model.
    scaled = model_components["meta_scaler"].transform(level0_df)
    scaled = pd.DataFrame(scaled, columns=level0_df.columns)

    label = model_components["meta_model"].predict(scaled)[0]
    prob = model_components["meta_model"].predict_proba(scaled)[:, 1][0]

    return {
        "predicted_label": int(label),
        "phish_probability": float(prob),
    }
def sanitize_features(features_dict: Dict[str, Any]) -> Dict[str, Any]:
    """
    Replace -1 sentinel values with conservative defaults.

    The feature extractor signals a per-feature failure with -1 (e.g. when
    a page loads but one individual extractor throws). Mapping those to 0
    treats the feature as simply "not present" — binary has_* flags, counts
    and lengths all default sensibly — so partially extracted URLs can
    still be scored.

    Args:
        features_dict: Raw extracted features keyed by feature name.

    Returns:
        dict: A new dictionary with every -1 value replaced by 0.
    """
    cleaned: Dict[str, Any] = {}
    for name, value in features_dict.items():
        if value == -1:
            cleaned[name] = 0
            logger.debug(f"Sanitized feature '{name}': -1 -> 0")
        else:
            cleaned[name] = value
    return cleaned
def predict_url(url: str) -> Dict[str, Any]:
    """
    End-to-end prediction for a raw URL.

    Pipeline: load the (cached) model, extract features from the URL,
    sanitize partial failures (-1 -> 0), then score with the stacking model.
    If every feature is -1 the URL is treated as unreachable and a neutral
    "unknown" result with an 'error' key is returned instead of raising.

    Args:
        url: Raw URL string to analyze.

    Returns:
        dict: With keys 'url', 'prediction' ("phishing"/"legitimate"/"unknown"),
            'confidence' (0-1), 'predicted_label' (1/0, or -1 when unknown),
            'phish_probability', and — only on extraction failure — 'error'.

    Raises:
        Exception: Any feature-extraction or prediction failure is logged
            and re-raised.
    """
    try:
        components = load_model()

        logger.info(f"Extracting features from URL: {url}")
        features = extract_features(url)

        # All-sentinel output means extraction failed outright (URL unreachable).
        if all(v == -1 for v in features.values()):
            logger.warning(f"Feature extraction failed for URL: {url}")
            return {
                "url": url,
                "prediction": "unknown",
                "confidence": 0.0,
                "predicted_label": -1,
                "phish_probability": 0.0,
                "error": "Failed to extract features - URL may be unreachable",
            }

        # Partial failures are tolerated: log how many, then default them to 0.
        failed_count = sum(v == -1 for v in features.values())
        if failed_count:
            logger.warning(f"β {failed_count} features failed extraction, using defaults")
        features = sanitize_features(features)

        logger.info("Making prediction...")
        raw = predict_from_features(features, components)

        label = raw["predicted_label"]
        prob = raw["phish_probability"]
        is_phish = label == 1
        result = {
            "url": url,
            "prediction": "phishing" if is_phish else "legitimate",
            # Confidence is always expressed for the predicted class.
            "confidence": prob if is_phish else (1 - prob),
            "predicted_label": label,
            "phish_probability": prob,
        }
        logger.info(f"β Prediction complete: {result['prediction']} (confidence: {result['confidence']:.2%})")
        return result
    except Exception as e:
        logger.error(f"β Prediction failed: {str(e)}")
        raise
def get_meta_features_and_update(url: str, true_label: int) -> Tuple[Optional[np.ndarray], bool]:
    """
    Extract meta-features for a URL and incrementally update the meta model.

    Steps: extract URL features, sanitize partial failures, build the
    Level-0 meta-features from the base models' probabilities, scale them,
    then call partial_fit on the meta model with the supplied true label.
    The in-process cache and the on-disk artifact are both refreshed.

    Args:
        url: Raw URL string to extract features from.
        true_label: Ground-truth label (0 = legitimate, 1 = phishing).

    Returns:
        tuple: (meta_features_array, success_flag)
            - meta_features_array: scaled meta-feature row used for the
              update, or None on failure
            - success_flag: True if the update (and save) succeeded

    Note:
        All exceptions are caught, logged, and converted to (None, False)
        — this function is deliberately best-effort and never raises.
    """
    try:
        # Load components (cached after the first call).
        model_components = load_model()
        base_models = model_components["base_models"]
        meta_scaler = model_components["meta_scaler"]
        meta_model = model_components["meta_model"]
        feature_names = model_components["feature_names"]
        model_names = model_components["model_names"]

        logger.info(f"Extracting features for update from URL: {url}")
        features_dict = extract_features(url)

        # All-sentinel output means extraction failed outright; skip the update.
        if all(v == -1 for v in features_dict.values()):
            logger.warning(f"Feature extraction failed for URL update: {url}")
            return None, False

        # Partial failures are tolerated: log how many, then default them to 0.
        failed_features = sum(1 for v in features_dict.values() if v == -1)
        if failed_features > 0:
            logger.warning(f"β {failed_features} features failed extraction during update, using defaults")
        features_dict = sanitize_features(features_dict)

        # Single-row frame in the training-time column order.
        X = pd.DataFrame([features_dict])
        missing_cols = set(feature_names) - set(X.columns)
        if missing_cols:
            raise ValueError(f"Missing required features: {missing_cols}")
        X = X[feature_names]

        # Level 0: positive-class probability from each base model.
        meta_features = np.zeros((X.shape[0], len(base_models)))
        for idx, (model_name, model) in enumerate(base_models.items()):
            meta_features[:, idx] = model.predict_proba(X)[:, 1]
        meta_features_df = pd.DataFrame(meta_features, columns=[f"{n}_pred" for n in model_names])

        # Scale meta-features. NOTE: transform() returns a plain ndarray here
        # (unlike predict_from_features, which re-wraps it in a DataFrame).
        meta_scaled = meta_scaler.transform(meta_features_df)

        # Incremental update of the meta model. Passing classes=[0, 1] on
        # every call is safe as long as the class set never changes — it is
        # only strictly required on the first partial_fit.
        logger.info(f"Updating meta model with partial_fit for label: {true_label}")
        meta_model.partial_fit(meta_scaled, [true_label], classes=[0, 1])

        # Keep the in-process singleton in sync with the updated estimator.
        global _model_cache
        if _model_cache is not None:
            _model_cache["meta_model"] = meta_model

        # Persist the updated components so future processes see the update.
        save_updated_model(model_components, meta_model)

        logger.info(f"β Model updated successfully for URL: {url}")
        # Return the single scaled meta-feature row that was used.
        return meta_scaled[0], True
    except Exception as e:
        logger.error(f"β Failed to update model: {str(e)}")
        return None, False
def save_updated_model(model_components: Dict[str, Any], updated_meta_model) -> None:
    """
    Persist the model components to disk with a replacement meta model.

    Args:
        model_components: Components dictionary (as produced by load_model()).
        updated_meta_model: The freshly updated meta estimator to store in
            place of the cached one.

    Raises:
        Exception: Any failure during path resolution or serialization is
            logged and re-raised.
    """
    try:
        # NOTE(review): get_model_path() resolves into the Hugging Face
        # download cache, so this overwrites the cached artifact in place —
        # presumably intentional, but confirm the cache dir is writable in
        # the deployment environment.
        target_path = get_model_path()

        # Rebuild the payload in the canonical key order, swapping in the
        # updated meta model and tolerating a missing scaler_name.
        payload: Dict[str, Any] = {
            key: model_components[key] for key in ("base_models", "meta_scaler")
        }
        payload["scaler_name"] = model_components.get("scaler_name", "Unknown")
        payload["meta_model"] = updated_meta_model
        for key in ("feature_names", "model_names"):
            payload[key] = model_components[key]

        joblib.dump(payload, target_path)
        logger.info(f"β Updated model saved to: {target_path}")
    except Exception as e:
        logger.error(f"β Failed to save updated model: {str(e)}")
        raise